from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File from sqlalchemy.orm import Session from pathlib import Path import shutil from app.core.db import get_db from app.core.deps import get_current_user from app.schemas.user import UserOut, UserMe, UserCreate, UserUpdate, PasswordChange from app.models import User, UserRole, Role from app.core.security import hash_password, verify_password router = APIRouter(prefix="/users", tags=["users"]) @router.get("/me", response_model=UserMe) def get_me(current_user: User = Depends(get_current_user)): # 优先返回 role_code 用于前端权限判断 roles = [ur.role.role_code for ur in current_user.roles] user_dict = UserOut.model_validate(current_user).model_dump() user_dict["roles"] = roles return user_dict @router.put("/me/password") def change_password( payload: PasswordChange, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): if not verify_password(payload.old_password, current_user.password_hash): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Current password incorrect" ) current_user.password_hash = hash_password(payload.new_password) db.commit() return {"status": "ok"} @router.post("/me/avatar") def upload_avatar( file: UploadFile = File(...), current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): if not file.content_type.startswith("image/"): raise HTTPException(status_code=400, detail="Invalid file type") # Path resolution: backend/app/api/v1/endpoints/users.py -> root BASE_DIR = Path(__file__).resolve().parents[5] # parents[0] is endpoints, [1] v1, [2] api, [3] app, [4] backend, [5] root? # Let's verify: # file: .../backend/app/api/v1/endpoints/users.py # parent: .../backend/app/api/v1/endpoints # parents[0]: endpoints # parents[1]: v1 # parents[2]: api # parents[3]: app # parents[4]: backend # parents[5]: root (where storage is) STORAGE_DIR = BASE_DIR / "storage" user_dir = STORAGE_DIR / "users" / str(current_user.user_id) / "avatar" user_dir.mkdir(parents=True, exist_ok=True) file_path = user_dir / file.filename with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) # Assuming backend serves /storage avatar_url = f"/storage/users/{current_user.user_id}/avatar/{file.filename}" current_user.avatar = avatar_url db.commit() db.refresh(current_user) return {"avatar": avatar_url} @router.get("", response_model=list[UserOut]) def list_users(db: Session = Depends(get_db)): return db.query(User).filter(User.is_deleted.is_(False)).all() @router.post("", response_model=UserOut) def create_user(payload: UserCreate, db: Session = Depends(get_db)): exists = db.query(User).filter(User.username == payload.username).first() if exists: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username exists") password = payload.password or "123456" user = User( username=payload.username, display_name=payload.display_name, email=payload.email, phone=payload.phone, password_hash=hash_password(password), status=payload.status, is_deleted=False, ) db.add(user) db.flush() for role_id in payload.role_ids: db.add(UserRole(user_id=user.user_id, role_id=role_id)) db.commit() db.refresh(user) return user @router.put("/{user_id}", response_model=UserOut) def update_user(user_id: int, payload: UserUpdate, db: Session = Depends(get_db)): user = db.query(User).filter(User.user_id == user_id, User.is_deleted.is_(False)).first() if not user: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") if payload.display_name is not None: user.display_name = payload.display_name if payload.email is not None: user.email = payload.email if payload.phone is not None: user.phone = payload.phone if payload.status is not None: user.status = payload.status if payload.role_ids is not None: db.query(UserRole).filter(UserRole.user_id == user.user_id).delete() for role_id in payload.role_ids: db.add(UserRole(user_id=user.user_id, role_id=role_id)) db.commit() db.refresh(user) return user @router.delete("/{user_id}") def delete_user(user_id: int, db: Session = Depends(get_db)): user = db.query(User).filter(User.user_id == user_id, User.is_deleted.is_(False)).first() if not user: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") user.is_deleted = True db.commit() return {"status": "ok"} @router.get("/{user_id}/roles") def get_user_roles(user_id: int, db: Session = Depends(get_db)): roles = ( db.query(Role.role_id) .join(UserRole, UserRole.role_id == Role.role_id) .filter(UserRole.user_id == user_id) .all() ) return [r[0] for r in roles] @router.put("/{user_id}/roles") def update_user_roles(user_id: int, role_ids: list[int], db: Session = Depends(get_db)): db.query(UserRole).filter(UserRole.user_id == user_id).delete() for role_id in role_ids: db.add(UserRole(user_id=user_id, role_id=role_id)) db.commit() return {"status": "ok"} @router.post("/{user_id}/reset-password") def reset_password(user_id: int, password: str, db: Session = Depends(get_db)): user = db.query(User).filter(User.user_id == user_id, User.is_deleted.is_(False)).first() if not user: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") user.password_hash = hash_password(password) db.commit() return {"status": "ok"}