summit/backend/app/api/peaks.py

76 lines
2.2 KiB
Python

from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.schemas import peak as peak_schema
from app.services.crud_peak import peak as crud_peak
import json
from geoalchemy2.shape import to_shape
from shapely.geometry import mapping
router = APIRouter()
@router.get("/", response_model=List[peak_schema.Peak])
async def read_peaks(
db: AsyncSession = Depends(get_db),
skip: int = 0,
limit: int = 100,
):
"""
Retrieve all peaks.
"""
peaks = await crud_peak.get_multi(db, skip=skip, limit=limit)
# Manually convert WKBElement to GeoJSON dict for response
# This logic usually goes into a serializer or custom Pydantic validator
results = []
for p in peaks:
if p.location is not None:
# Create a copy or simple dict to avoid mutating SQLAlchemy object state inappropriately if attached
p_dict = p.__dict__.copy()
# Convert WKBElement to Shapely object then to GeoJSON dict
sh = to_shape(p.location)
p_dict['location'] = mapping(sh)
results.append(p_dict)
else:
results.append(p)
return results
@router.post("/", response_model=peak_schema.Peak)
async def create_peak(
*,
db: AsyncSession = Depends(get_db),
peak_in: peak_schema.PeakCreate,
):
"""
Create new peak.
"""
peak = await crud_peak.create(db=db, obj_in=peak_in)
# Handle response serialization
p_dict = peak.__dict__.copy()
if peak.location is not None:
sh = to_shape(peak.location)
p_dict['location'] = mapping(sh)
return p_dict
@router.get("/{peak_id}", response_model=peak_schema.Peak)
async def read_peak(
*,
db: AsyncSession = Depends(get_db),
peak_id: int,
):
"""
Get peak by ID.
"""
peak = await crud_peak.get(db=db, id=peak_id)
if not peak:
raise HTTPException(status_code=404, detail="Peak not found")
p_dict = peak.__dict__.copy()
if peak.location is not None:
sh = to_shape(peak.location)
p_dict['location'] = mapping(sh)
return p_dict