""" 用户管理 API """ from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, delete, or_ from typing import List, Optional from pydantic import BaseModel, EmailStr from app.core.database import get_db from app.core.deps import get_current_user from app.core.security import get_password_hash from app.core.config import settings from app.models.user import User from app.models.role import Role, UserRole from app.models.project import Project, ProjectMember from app.schemas.response import success_response, error_response router = APIRouter() # === Pydantic Schemas === class UserCreateRequest(BaseModel): """创建用户请求""" username: str nickname: Optional[str] = None email: Optional[EmailStr] = None phone: Optional[str] = None role_ids: List[int] = [] # 分配的角色ID列表 class UserUpdateRequest(BaseModel): """更新用户请求""" nickname: Optional[str] = None email: Optional[EmailStr] = None phone: Optional[str] = None status: Optional[int] = None class UserRolesUpdateRequest(BaseModel): """更新用户角色请求""" role_ids: List[int] # === API Endpoints === @router.get("/", response_model=dict) async def get_users( page: int = Query(1, ge=1), page_size: int = Query(10, ge=1, le=100), keyword: Optional[str] = Query(None, description="搜索关键词(用户名、昵称、邮箱)"), status: Optional[int] = Query(None, description="状态筛选:0-禁用 1-启用"), role_id: Optional[int] = Query(None, description="角色ID筛选"), current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """获取用户列表(分页)""" # 构建查询条件 conditions = [] if keyword: conditions.append( or_( User.username.like(f"%{keyword}%"), User.nickname.like(f"%{keyword}%"), User.email.like(f"%{keyword}%") ) ) if status is not None: conditions.append(User.status == status) # 如果需要按角色筛选 if role_id is not None: # 查询用户列表时需要JOIN UserRole表 query = ( select(User) .join(UserRole, UserRole.user_id == User.id) .where(UserRole.role_id == role_id) ) if conditions: query = query.where(*conditions) query = query.order_by(User.created_at.desc()) # 查询总数 count_query = ( select(func.count(User.id.distinct())) .join(UserRole, UserRole.user_id == User.id) .where(UserRole.role_id == role_id) ) if conditions: count_query = count_query.where(*conditions) total_result = await db.execute(count_query) total = total_result.scalar() else: # 查询总数 count_query = select(func.count(User.id)) if conditions: count_query = count_query.where(*conditions) total_result = await db.execute(count_query) total = total_result.scalar() # 查询用户列表 query = select(User).order_by(User.created_at.desc()) if conditions: query = query.where(*conditions) query = query.offset((page - 1) * page_size).limit(page_size) result = await db.execute(query) users = result.scalars().all() # 获取每个用户的角色信息 users_data = [] for user in users: # 获取用户角色 roles_result = await db.execute( select(Role) .join(UserRole, UserRole.role_id == Role.id) .where(UserRole.user_id == user.id) ) roles = roles_result.scalars().all() users_data.append({ "id": user.id, "username": user.username, "nickname": user.nickname, "email": user.email, "phone": user.phone, "avatar": user.avatar, "status": user.status, "is_superuser": user.is_superuser, "last_login_at": user.last_login_at.isoformat() if user.last_login_at else None, "created_at": user.created_at.isoformat() if user.created_at else None, "roles": [{"id": r.id, "role_name": r.role_name, "role_code": r.role_code} for r in roles] }) return { "code": 200, "message": "success", "data": users_data, "total": total, "page": page, "page_size": page_size } @router.post("/", response_model=dict) async def create_user( user_data: UserCreateRequest, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """创建新用户""" # 检查用户名是否已存在 result = await db.execute( select(User).where(User.username == user_data.username) ) existing_user = result.scalar_one_or_none() if existing_user: raise HTTPException(status_code=400, detail="用户名已存在") # 检查邮箱是否已存在 if user_data.email: result = await db.execute( select(User).where(User.email == user_data.email) ) existing_email = result.scalar_one_or_none() if existing_email: raise HTTPException(status_code=400, detail="邮箱已被使用") # 创建用户 new_user = User( username=user_data.username, password_hash=get_password_hash(settings.DEFAULT_USER_PASSWORD), nickname=user_data.nickname or user_data.username, email=user_data.email, phone=user_data.phone, status=1, is_superuser=0 ) db.add(new_user) await db.flush() # 分配角色 if user_data.role_ids: # 验证角色ID是否存在 roles_result = await db.execute( select(Role.id).where(Role.id.in_(user_data.role_ids)) ) valid_role_ids = [row[0] for row in roles_result.all()] for role_id in valid_role_ids: user_role = UserRole(user_id=new_user.id, role_id=role_id) db.add(user_role) await db.commit() await db.refresh(new_user) return success_response( data={ "id": new_user.id, "username": new_user.username, "default_password": settings.DEFAULT_USER_PASSWORD }, message="用户创建成功,请记住初始密码" ) @router.get("/{user_id}", response_model=dict) async def get_user( user_id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """获取用户详情""" # 查询用户 result = await db.execute( select(User).where(User.id == user_id) ) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="用户不存在") # 获取用户角色 roles_result = await db.execute( select(Role) .join(UserRole, UserRole.role_id == Role.id) .where(UserRole.user_id == user.id) ) roles = roles_result.scalars().all() return success_response(data={ "id": user.id, "username": user.username, "nickname": user.nickname, "email": user.email, "phone": user.phone, "avatar": user.avatar, "status": user.status, "is_superuser": user.is_superuser, "last_login_at": user.last_login_at.isoformat() if user.last_login_at else None, "created_at": user.created_at.isoformat() if user.created_at else None, "roles": [{"id": r.id, "role_name": r.role_name, "role_code": r.role_code} for r in roles] }) @router.put("/{user_id}", response_model=dict) async def update_user( user_id: int, user_data: UserUpdateRequest, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """更新用户信息""" # 查询用户 result = await db.execute( select(User).where(User.id == user_id) ) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="用户不存在") # 更新字段 if user_data.nickname is not None: user.nickname = user_data.nickname if user_data.email is not None: # 检查邮箱是否被其他用户使用 if user_data.email != user.email: email_result = await db.execute( select(User).where(User.email == user_data.email, User.id != user_id) ) if email_result.scalar_one_or_none(): raise HTTPException(status_code=400, detail="邮箱已被其他用户使用") user.email = user_data.email if user_data.phone is not None: user.phone = user_data.phone if user_data.status is not None: user.status = user_data.status await db.commit() await db.refresh(user) return success_response(data={"id": user.id}, message="用户信息更新成功") @router.delete("/{user_id}", response_model=dict) async def delete_user( user_id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """删除用户(需要检查是否有归属项目)""" # 查询用户 result = await db.execute( select(User).where(User.id == user_id) ) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="用户不存在") # 不允许删除超级管理员 if user.is_superuser == 1: raise HTTPException(status_code=400, detail="不允许删除超级管理员") # 不允许删除自己 if user_id == current_user.id: raise HTTPException(status_code=400, detail="不允许删除当前登录用户") # 检查用户是否拥有项目 owned_projects_result = await db.execute( select(func.count(Project.id)).where(Project.owner_id == user_id) ) owned_projects_count = owned_projects_result.scalar() if owned_projects_count > 0: raise HTTPException( status_code=400, detail=f"该用户拥有 {owned_projects_count} 个项目,无法删除。请先转移或删除这些项目。" ) # 删除用户的角色关联 await db.execute( delete(UserRole).where(UserRole.user_id == user_id) ) # 删除用户的项目成员关联 await db.execute( delete(ProjectMember).where(ProjectMember.user_id == user_id) ) # 删除用户 await db.delete(user) await db.commit() return success_response(message="用户删除成功") @router.put("/{user_id}/status", response_model=dict) async def update_user_status( user_id: int, status: int = Query(..., ge=0, le=1, description="状态:0-禁用 1-启用"), current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """更新用户状态(停用/启用)""" # 查询用户 result = await db.execute( select(User).where(User.id == user_id) ) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="用户不存在") # 不允许停用超级管理员 if user.is_superuser == 1 and status == 0: raise HTTPException(status_code=400, detail="不允许停用超级管理员") # 不允许停用自己 if user_id == current_user.id and status == 0: raise HTTPException(status_code=400, detail="不允许停用当前登录用户") user.status = status await db.commit() status_text = "启用" if status == 1 else "停用" return success_response(message=f"用户已{status_text}") @router.put("/{user_id}/roles", response_model=dict) async def update_user_roles( user_id: int, roles_data: UserRolesUpdateRequest, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """更新用户角色""" # 查询用户 result = await db.execute( select(User).where(User.id == user_id) ) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="用户不存在") # 验证角色ID是否存在 if roles_data.role_ids: roles_result = await db.execute( select(Role.id).where(Role.id.in_(roles_data.role_ids)) ) valid_role_ids = [row[0] for row in roles_result.all()] invalid_ids = set(roles_data.role_ids) - set(valid_role_ids) if invalid_ids: raise HTTPException( status_code=400, detail=f"以下角色ID不存在: {', '.join(map(str, invalid_ids))}" ) else: valid_role_ids = [] # 删除原有角色关联 await db.execute( delete(UserRole).where(UserRole.user_id == user_id) ) # 添加新角色关联 for role_id in valid_role_ids: user_role = UserRole(user_id=user_id, role_id=role_id) db.add(user_role) await db.commit() return success_response(message="用户角色更新成功") @router.post("/{user_id}/reset-password", response_model=dict) async def reset_user_password( user_id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """重置用户密码为初始密码""" # 查询用户 result = await db.execute( select(User).where(User.id == user_id) ) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="用户不存在") # 重置密码 user.password_hash = get_password_hash(settings.DEFAULT_USER_PASSWORD) await db.commit() return success_response( data={"default_password": settings.DEFAULT_USER_PASSWORD}, message="密码已重置为初始密码" )