from typing import Any, List from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.ext.asyncio import AsyncSession from app.core.database import get_db from app.schemas import peak as peak_schema from app.services.crud_peak import peak as crud_peak import json from geoalchemy2.shape import to_shape from shapely.geometry import mapping router = APIRouter() @router.get("/", response_model=List[peak_schema.Peak]) async def read_peaks( db: AsyncSession = Depends(get_db), skip: int = 0, limit: int = 100, ): """ Retrieve all peaks. """ peaks = await crud_peak.get_multi(db, skip=skip, limit=limit) # Manually convert WKBElement to GeoJSON dict for response # This logic usually goes into a serializer or custom Pydantic validator results = [] for p in peaks: if p.location is not None: # Create a copy or simple dict to avoid mutating SQLAlchemy object state inappropriately if attached p_dict = p.__dict__.copy() # Convert WKBElement to Shapely object then to GeoJSON dict sh = to_shape(p.location) p_dict['location'] = mapping(sh) results.append(p_dict) else: results.append(p) return results @router.post("/", response_model=peak_schema.Peak) async def create_peak( *, db: AsyncSession = Depends(get_db), peak_in: peak_schema.PeakCreate, ): """ Create new peak. """ peak = await crud_peak.create(db=db, obj_in=peak_in) # Handle response serialization p_dict = peak.__dict__.copy() if peak.location is not None: sh = to_shape(peak.location) p_dict['location'] = mapping(sh) return p_dict @router.get("/{peak_id}", response_model=peak_schema.Peak) async def read_peak( *, db: AsyncSession = Depends(get_db), peak_id: int, ): """ Get peak by ID. """ peak = await crud_peak.get(db=db, id=peak_id) if not peak: raise HTTPException(status_code=404, detail="Peak not found") p_dict = peak.__dict__.copy() if peak.location is not None: sh = to_shape(peak.location) p_dict['location'] = mapping(sh) return p_dict