# nex_docus/backend/app/api/v1/users.py
#
# NOTE: the code-hosting UI flagged "ambiguous Unicode characters" in this
# file. The CJK / full-width characters in the user-facing strings below
# appear to be the cause; keep those strings byte-for-byte when editing.

"""
用户管理 API
"""
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, delete, or_
from typing import List, Optional
from pydantic import BaseModel, EmailStr
from app.core.database import get_db
from app.core.deps import get_current_user
from app.core.security import get_password_hash
from app.core.config import settings
from app.models.user import User
from app.models.role import Role, UserRole
from app.models.project import Project, ProjectMember
from app.schemas.response import success_response, error_response
# Router that collects every user-management endpoint declared in this module.
# NOTE(review): presumably mounted under a users prefix by the v1 API package - confirm.
router = APIRouter()
# === Pydantic Schemas ===
class UserCreateRequest(BaseModel):
    """Request body for creating a user."""
    # Login name; the create endpoint rejects a name that is already taken.
    username: str
    # Display name; the create endpoint falls back to the username when omitted.
    nickname: Optional[str] = None
    email: Optional[EmailStr] = None
    phone: Optional[str] = None
    role_ids: List[int] = []  # IDs of the roles to assign to the new user
class UserUpdateRequest(BaseModel):
    """Request body for updating a user.

    Every field is optional; the update endpoint leaves a field untouched
    when it is ``None``.
    """
    nickname: Optional[str] = None
    email: Optional[EmailStr] = None
    phone: Optional[str] = None
    # Account status; the list/status endpoints describe it as 0 = disabled, 1 = enabled.
    status: Optional[int] = None
class UserRolesUpdateRequest(BaseModel):
    """Request body for replacing the roles assigned to a user."""
    # Complete list of role IDs the user should hold; an empty list removes all roles.
    role_ids: List[int]
# === API Endpoints ===
@router.get("/", response_model=dict)
async def get_users(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
    keyword: Optional[str] = Query(None, description="搜索关键词(用户名、昵称、邮箱)"),
    status: Optional[int] = Query(None, description="状态筛选0-禁用 1-启用"),
    role_id: Optional[int] = Query(None, description="角色ID筛选"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """List users with pagination and optional filters.

    Args:
        page: 1-based page number.
        page_size: Number of rows per page (1-100).
        keyword: Substring matched literally against username, nickname and email.
        status: When given, only users with this status value are returned.
        role_id: When given, only users holding this role are returned.
        current_user: Not read by the handler; declaring the dependency makes
            the route require whatever ``get_current_user`` enforces.
        db: Async database session.

    Returns:
        A dict with ``code``, ``message``, ``data`` (one dict per user,
        including its ``roles``), ``total``, ``page`` and ``page_size``.
    """
    # Build one list of filters that both the count and the page query share,
    # so ``total`` can never disagree with the rows actually returned.
    filters = []
    if keyword:
        # autoescape=True makes "%" and "_" typed by the caller match
        # literally instead of acting as LIKE wildcards.
        filters.append(
            or_(
                User.username.contains(keyword, autoescape=True),
                User.nickname.contains(keyword, autoescape=True),
                User.email.contains(keyword, autoescape=True),
            )
        )
    if status is not None:
        filters.append(User.status == status)
    if role_id is not None:
        # A subquery (rather than a JOIN) keeps each user to a single row even
        # if the association table holds duplicate (user, role) pairs.
        filters.append(
            User.id.in_(
                select(UserRole.user_id).where(UserRole.role_id == role_id)
            )
        )

    count_query = select(func.count(User.id))
    page_query = select(User)
    if filters:
        count_query = count_query.where(*filters)
        page_query = page_query.where(*filters)

    total = (await db.execute(count_query)).scalar()

    # User.id is a tie-breaker so rows sharing a created_at value keep a
    # stable position across pages.
    page_query = (
        page_query
        .order_by(User.created_at.desc(), User.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    users = (await db.execute(page_query)).scalars().all()

    # Load the roles of every user on this page with a single query instead
    # of issuing one query per user.
    roles_by_user = {}
    if users:
        role_rows = await db.execute(
            select(UserRole.user_id, Role)
            .select_from(Role)
            .join(UserRole, UserRole.role_id == Role.id)
            .where(UserRole.user_id.in_([user.id for user in users]))
        )
        for owner_id, role in role_rows.all():
            roles_by_user.setdefault(owner_id, []).append(
                {"id": role.id, "role_name": role.role_name, "role_code": role.role_code}
            )

    users_data = [
        {
            "id": user.id,
            "username": user.username,
            "nickname": user.nickname,
            "email": user.email,
            "phone": user.phone,
            "avatar": user.avatar,
            "status": user.status,
            "is_superuser": user.is_superuser,
            "last_login_at": user.last_login_at.isoformat() if user.last_login_at else None,
            "created_at": user.created_at.isoformat() if user.created_at else None,
            "roles": roles_by_user.get(user.id, []),
        }
        for user in users
    ]

    return {
        "code": 200,
        "message": "success",
        "data": users_data,
        "total": total,
        "page": page,
        "page_size": page_size
    }
@router.post("/", response_model=dict)
async def create_user(
    user_data: UserCreateRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Create a user with the configured default password and optional roles.

    Args:
        user_data: Username, optional profile fields and role IDs to assign.
        current_user: Not read by the handler; declaring the dependency makes
            the route require whatever ``get_current_user`` enforces.
        db: Async database session.

    Returns:
        ``success_response`` carrying the new user's id, username and the
        default password that was set.

    Raises:
        HTTPException: 400 when the username or email is already in use, or
            when any requested role ID does not exist.
    """
    # Reject a username that is already taken. ``first()`` is used so that
    # pre-existing duplicate rows produce the 400 below instead of a 500.
    username_hit = await db.execute(
        select(User).where(User.username == user_data.username)
    )
    if username_hit.scalars().first():
        raise HTTPException(status_code=400, detail="用户名已存在")

    # Reject an email that is already taken.
    if user_data.email:
        email_hit = await db.execute(
            select(User).where(User.email == user_data.email)
        )
        if email_hit.scalars().first():
            raise HTTPException(status_code=400, detail="邮箱已被使用")

    # Validate the requested roles BEFORE inserting anything, and reject
    # unknown IDs the same way update_user_roles does rather than silently
    # creating the user without them. dict.fromkeys de-duplicates while
    # keeping the caller's order.
    requested_role_ids = list(dict.fromkeys(user_data.role_ids))
    if requested_role_ids:
        roles_result = await db.execute(
            select(Role.id).where(Role.id.in_(requested_role_ids))
        )
        existing_role_ids = {row[0] for row in roles_result.all()}
        invalid_ids = [
            role_id for role_id in requested_role_ids
            if role_id not in existing_role_ids
        ]
        if invalid_ids:
            raise HTTPException(
                status_code=400,
                detail=f"以下角色ID不存在: {', '.join(map(str, invalid_ids))}"
            )

    new_user = User(
        username=user_data.username,
        password_hash=get_password_hash(settings.DEFAULT_USER_PASSWORD),
        nickname=user_data.nickname or user_data.username,
        email=user_data.email,
        phone=user_data.phone,
        status=1,
        is_superuser=0
    )
    db.add(new_user)
    # Flush so new_user.id is populated before the role links reference it.
    await db.flush()

    for role_id in requested_role_ids:
        db.add(UserRole(user_id=new_user.id, role_id=role_id))

    await db.commit()
    await db.refresh(new_user)

    # NOTE(review): the shared default password is echoed back to the caller;
    # confirm this is intended.
    return success_response(
        data={
            "id": new_user.id,
            "username": new_user.username,
            "default_password": settings.DEFAULT_USER_PASSWORD
        },
        message="用户创建成功,请记住初始密码"
    )
@router.get("/{user_id}", response_model=dict)
async def get_user(
    user_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return one user's profile together with the roles assigned to it.

    Args:
        user_id: Primary key of the user to fetch.
        current_user: Not read by the handler; declaring the dependency makes
            the route require whatever ``get_current_user`` enforces.
        db: Async database session.

    Returns:
        ``success_response`` whose data holds the user's fields and a
        ``roles`` list of ``{id, role_name, role_code}`` dicts.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    user_stmt = select(User).where(User.id == user_id)
    user = (await db.execute(user_stmt)).scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")

    # Roles are reached through the UserRole association table.
    role_stmt = (
        select(Role)
        .join(UserRole, UserRole.role_id == Role.id)
        .where(UserRole.user_id == user.id)
    )
    assigned_roles = (await db.execute(role_stmt)).scalars().all()

    # Timestamps are serialised to ISO-8601 strings, or None when unset.
    last_login = user.last_login_at.isoformat() if user.last_login_at else None
    created = user.created_at.isoformat() if user.created_at else None

    role_items = []
    for role in assigned_roles:
        role_items.append(
            {"id": role.id, "role_name": role.role_name, "role_code": role.role_code}
        )

    payload = {
        "id": user.id,
        "username": user.username,
        "nickname": user.nickname,
        "email": user.email,
        "phone": user.phone,
        "avatar": user.avatar,
        "status": user.status,
        "is_superuser": user.is_superuser,
        "last_login_at": last_login,
        "created_at": created,
        "roles": role_items,
    }
    return success_response(data=payload)
@router.put("/{user_id}", response_model=dict)
async def update_user(
    user_id: int,
    user_data: UserUpdateRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Update a user's nickname, email, phone and/or status.

    Fields left as ``None`` in the request are not modified. All validation
    runs before any attribute is changed, so a rejected request leaves the
    loaded user object untouched.

    Args:
        user_id: Primary key of the user to update.
        user_data: Fields to change.
        current_user: Authenticated caller; used to stop callers from
            disabling their own account.
        db: Async database session.

    Returns:
        ``success_response`` carrying the updated user's id.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the email
            belongs to another user, the status value is not 0 or 1, or the
            change would disable a superuser or the caller's own account.
    """
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")

    # --- validate ---------------------------------------------------------
    email_changed = user_data.email is not None and user_data.email != user.email
    if email_changed:
        email_result = await db.execute(
            select(User).where(User.email == user_data.email, User.id != user_id)
        )
        # first() rather than scalar_one_or_none(): pre-existing duplicate
        # rows must yield the 400 below, not an unhandled exception.
        if email_result.scalars().first():
            raise HTTPException(status_code=400, detail="邮箱已被其他用户使用")

    # Only an actual status change is checked, so clients that echo the
    # current status back along with other edits keep working.
    status_changed = user_data.status is not None and user_data.status != user.status
    if status_changed:
        if user_data.status not in (0, 1):
            raise HTTPException(status_code=400, detail="状态值只能为 0(禁用)或 1(启用)")
        if user_data.status == 0:
            # Same guards as the dedicated status endpoint, so this route
            # cannot be used to bypass them.
            if user.is_superuser == 1:
                raise HTTPException(status_code=400, detail="不允许停用超级管理员")
            if user_id == current_user.id:
                raise HTTPException(status_code=400, detail="不允许停用当前登录用户")

    # --- apply ------------------------------------------------------------
    if user_data.nickname is not None:
        user.nickname = user_data.nickname
    if email_changed:
        user.email = user_data.email
    if user_data.phone is not None:
        user.phone = user_data.phone
    if status_changed:
        user.status = user_data.status

    await db.commit()
    await db.refresh(user)
    return success_response(data={"id": user.id}, message="用户信息更新成功")
@router.delete("/{user_id}", response_model=dict)
async def delete_user(
    user_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Delete a user along with its role and project-membership links.

    Args:
        user_id: Primary key of the user to delete.
        current_user: Authenticated caller; a caller cannot delete itself.
        db: Async database session.

    Returns:
        ``success_response`` with a confirmation message.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the target
            is a superuser, is the caller, or still owns projects.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    user = lookup.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")

    # Superusers can never be deleted through this endpoint.
    if user.is_superuser == 1:
        raise HTTPException(status_code=400, detail="不允许删除超级管理员")

    # The caller cannot delete the account it is logged in with.
    if user_id == current_user.id:
        raise HTTPException(status_code=400, detail="不允许删除当前登录用户")

    # Owned projects must be transferred or removed first.
    owned_stmt = select(func.count(Project.id)).where(Project.owner_id == user_id)
    owned_projects_count = (await db.execute(owned_stmt)).scalar()
    if owned_projects_count > 0:
        raise HTTPException(
            status_code=400,
            detail=f"该用户拥有 {owned_projects_count} 个项目,无法删除。请先转移或删除这些项目。"
        )

    # Remove the association rows (roles first, then project memberships)
    # before deleting the user row itself.
    for link_model in (UserRole, ProjectMember):
        await db.execute(
            delete(link_model).where(link_model.user_id == user_id)
        )

    await db.delete(user)
    await db.commit()
    return success_response(message="用户删除成功")
@router.put("/{user_id}/status", response_model=dict)
async def update_user_status(
    user_id: int,
    status: int = Query(..., ge=0, le=1, description="状态0-禁用 1-启用"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Enable or disable a user account.

    Args:
        user_id: Primary key of the user to change.
        status: New status, 0 to disable or 1 to enable.
        current_user: Authenticated caller; a caller cannot disable itself.
        db: Async database session.

    Returns:
        ``success_response`` whose message states whether the user was
        enabled or disabled.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the request
            would disable a superuser or the caller's own account.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    user = lookup.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")

    # The restrictions only apply when disabling; enabling is always allowed.
    if status == 0:
        if user.is_superuser == 1:
            raise HTTPException(status_code=400, detail="不允许停用超级管理员")
        if user_id == current_user.id:
            raise HTTPException(status_code=400, detail="不允许停用当前登录用户")

    user.status = status
    await db.commit()

    if status == 1:
        status_text = "启用"
    else:
        status_text = "停用"
    return success_response(message=f"用户已{status_text}")
@router.put("/{user_id}/roles", response_model=dict)
async def update_user_roles(
    user_id: int,
    roles_data: UserRolesUpdateRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Replace the complete set of roles assigned to a user.

    Args:
        user_id: Primary key of the user whose roles are replaced.
        roles_data: The role IDs the user should hold afterwards; an empty
            list removes every role.
        current_user: Not read by the handler; declaring the dependency makes
            the route require whatever ``get_current_user`` enforces.
        db: Async database session.

    Returns:
        ``success_response`` with a confirmation message.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when any
            requested role ID does not exist.
    """
    user_lookup = await db.execute(select(User).where(User.id == user_id))
    if not user_lookup.scalar_one_or_none():
        raise HTTPException(status_code=404, detail="用户不存在")

    # Start from "no roles"; the list is only filled when IDs were requested.
    valid_role_ids = []
    if roles_data.role_ids:
        role_lookup = await db.execute(
            select(Role.id).where(Role.id.in_(roles_data.role_ids))
        )
        valid_role_ids = [row[0] for row in role_lookup.all()]

        # Anything requested but not found in the roles table is an error.
        invalid_ids = set(roles_data.role_ids) - set(valid_role_ids)
        if invalid_ids:
            raise HTTPException(
                status_code=400,
                detail=f"以下角色ID不存在: {', '.join(map(str, invalid_ids))}"
            )

    # Drop every existing link, then insert the new ones.
    await db.execute(
        delete(UserRole).where(UserRole.user_id == user_id)
    )
    for role_id in valid_role_ids:
        db.add(UserRole(user_id=user_id, role_id=role_id))

    await db.commit()
    return success_response(message="用户角色更新成功")
@router.post("/{user_id}/reset-password", response_model=dict)
async def reset_user_password(
    user_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Reset a user's password to the configured default password.

    Args:
        user_id: Primary key of the user whose password is reset.
        current_user: Not read by the handler; declaring the dependency makes
            the route require whatever ``get_current_user`` enforces.
        db: Async database session.

    Returns:
        ``success_response`` whose data contains the default password.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    # NOTE(review): no permission check is visible in this block; confirm
    # that access is restricted elsewhere (e.g. in get_current_user or where
    # the router is mounted).
    lookup = await db.execute(select(User).where(User.id == user_id))
    user = lookup.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")

    default_password = settings.DEFAULT_USER_PASSWORD
    user.password_hash = get_password_hash(default_password)
    await db.commit()

    return success_response(
        data={"default_password": default_password},
        message="密码已重置为初始密码"
    )