"""
Project preview API endpoints (supports both public and private projects).
"""
import hmac
import logging
import mimetypes
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Header
from fastapi.responses import FileResponse
from fastapi.security import HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.core.database import get_db
from app.core.deps import get_current_user_optional, security_optional
from app.core.security import decode_access_token
from app.core.redis_client import TokenCache
from app.models.project import Project, ProjectMember
from app.models.user import User
from app.schemas.response import success_response
from app.services.storage import storage_service

logger = logging.getLogger(__name__)

router = APIRouter()


async def _get_project_or_404(project_id: int, db: AsyncSession) -> Project:
    """Load a project by primary key or raise HTTP 404 if it does not exist."""
    result = await db.execute(select(Project).where(Project.id == project_id))
    project = result.scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=404, detail="项目不存在")
    return project


def _password_matches(expected: str, provided: Optional[str]) -> bool:
    """Return True when ``provided`` equals ``expected``.

    The comparison uses ``hmac.compare_digest`` so that the time taken does
    not reveal how many leading characters of a guess were correct. Both
    values are encoded to UTF-8 first because ``compare_digest`` only accepts
    ASCII-only ``str`` arguments.

    NOTE(review): assumes ``expected`` (``Project.access_pass``) is a ``str``
    -- confirm against the model definition.
    """
    if not provided:
        return False
    return hmac.compare_digest(
        expected.encode("utf-8"), provided.encode("utf-8")
    )


def _require_access_password(project: Project, provided: Optional[str]) -> None:
    """Raise HTTP 403 unless the project has no password or ``provided`` matches it."""
    if project.access_pass and not _password_matches(project.access_pass, provided):
        raise HTTPException(status_code=403, detail="需要提供正确的访问密码")


async def _resolve_user_from_token(
    token_str: str,
    db: AsyncSession
) -> Optional[User]:
    """Best-effort lookup of the user identified by an access token.

    The token must be known to ``TokenCache`` and must decode to a payload
    carrying a ``sub`` claim; the claim is converted to ``int`` and used as
    the user id. Any failure along the way (unknown token, undecodable token,
    non-numeric ``sub``, lookup error) yields ``None`` so the caller can
    continue treating the request as anonymous.
    """
    try:
        user_id_from_redis = await TokenCache.get_user_id(token_str)
        if not user_id_from_redis:
            return None
        payload = decode_access_token(token_str)
        if not payload:
            return None
        user_id_str = payload.get("sub")
        if not user_id_str:
            return None
        result = await db.execute(
            select(User).where(User.id == int(user_id_str))
        )
        return result.scalar_one_or_none()
    except Exception:
        # Deliberately best-effort: a bad token must not fail the request,
        # but the failure is logged instead of being silently discarded.
        logger.debug(
            "Token validation failed; continuing as anonymous user",
            exc_info=True,
        )
        return None


async def check_preview_access(
    project: Project,
    current_user: Optional[User],
    db: AsyncSession
) -> bool:
    """Check whether ``current_user`` may preview ``project``.

    Args:
        project: The project being previewed.
        current_user: The authenticated user, or ``None`` when anonymous.
        db: Session used to look up project membership.

    Returns:
        ``True`` when access is allowed.

    Raises:
        HTTPException: 401 when the project is private and there is no user;
            403 when the user is neither the owner nor a member.
    """
    # Public project: anyone may access it.
    if project.is_public == 1:
        return True

    # Private project: the caller must be logged in.
    if not current_user:
        raise HTTPException(status_code=401, detail="私密项目需要登录才能访问")

    # The project owner always has access.
    if project.owner_id == current_user.id:
        return True

    # Otherwise the user must be a registered project member.
    member_result = await db.execute(
        select(ProjectMember).where(
            ProjectMember.project_id == project.id,
            ProjectMember.user_id == current_user.id
        )
    )
    member = member_result.scalar_one_or_none()
    if not member:
        raise HTTPException(status_code=403, detail="无权访问该私密项目")

    return True


@router.get("/{project_id}/info", response_model=dict)
async def get_preview_info(
    project_id: int,
    current_user: Optional[User] = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db)
):
    """Return basic information about a previewable project.

    Raises:
        HTTPException: 404 when the project does not exist; 401/403 when the
            caller may not access a private project.
    """
    project = await _get_project_or_404(project_id, db)
    await check_preview_access(project, current_user, db)

    # Only expose whether a password is set, never the password itself.
    info = {
        "id": project.id,
        "name": project.name,
        "description": project.description,
        "is_public": project.is_public,
        "has_password": bool(project.access_pass),
    }
    return success_response(data=info)


@router.post("/{project_id}/verify", response_model=dict)
async def verify_access_password(
    project_id: int,
    password: str = Header(..., alias="X-Access-Password"),
    current_user: Optional[User] = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db)
):
    """Verify the access password supplied in the ``X-Access-Password`` header.

    Raises:
        HTTPException: 404 when the project does not exist; 401/403 when the
            caller may not access a private project; 403 when the password
            is wrong.
    """
    project = await _get_project_or_404(project_id, db)
    await check_preview_access(project, current_user, db)

    # A project without a password needs no verification.
    if not project.access_pass:
        return success_response(message="该项目无需密码访问")

    if not _password_matches(project.access_pass, password):
        raise HTTPException(status_code=403, detail="访问密码错误")

    return success_response(message="验证成功")


@router.get("/{project_id}/tree", response_model=dict)
async def get_preview_tree(
    project_id: int,
    password: Optional[str] = Header(None, alias="X-Access-Password"),
    current_user: Optional[User] = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db)
):
    """Return the document tree of a previewable project.

    Raises:
        HTTPException: 404 when the project does not exist; 401/403 when the
            caller may not access a private project; 403 when the project
            has a password and the supplied one is missing or wrong.
    """
    project = await _get_project_or_404(project_id, db)
    await check_preview_access(project, current_user, db)
    _require_access_password(project, password)

    # Build the tree from the project's storage location.
    project_path = storage_service.get_secure_path(project.storage_key)
    tree = storage_service.generate_tree(project_path)

    return success_response(data=tree)


@router.get("/{project_id}/file", response_model=dict)
async def get_preview_file(
    project_id: int,
    path: str,
    password: Optional[str] = Header(None, alias="X-Access-Password"),
    current_user: Optional[User] = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db)
):
    """Return the content of one file of a previewable project.

    Raises:
        HTTPException: 404 when the project does not exist; 401/403 when the
            caller may not access a private project; 403 when the project
            has a password and the supplied one is missing or wrong.
    """
    project = await _get_project_or_404(project_id, db)
    await check_preview_access(project, current_user, db)
    _require_access_password(project, password)

    # NOTE(review): get_secure_path presumably confines ``path`` to the
    # project's storage root -- confirm in storage_service.
    file_path = storage_service.get_secure_path(project.storage_key, path)
    content = await storage_service.read_file(file_path)

    return success_response(data={"content": content})


@router.get("/{project_id}/document/{path:path}")
async def get_preview_document(
    project_id: int,
    path: str,
    password: Optional[str] = Header(None, alias="X-Access-Password"),
    access_pass: Optional[str] = None,  # password may also be a query parameter
    token: Optional[str] = None,  # token may also be a query parameter
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security_optional),
    db: AsyncSession = Depends(get_db)
):
    """Return a document file (PDF etc.) of a previewable project as a file response.

    The access token is taken from the ``credentials`` dependency when present,
    otherwise from the ``token`` query parameter. The access password is taken
    from the ``X-Access-Password`` header first and from the ``access_pass``
    query parameter second.

    Raises:
        HTTPException: 404 when the project or the file does not exist;
            401/403 when the caller may not access a private project; 403
            when the project has a password and the supplied one is missing
            or wrong.
    """
    # Resolve the current user; header credentials win over the query token.
    if credentials:
        token_str = credentials.credentials
    elif token:
        token_str = token
    else:
        token_str = None

    current_user = None
    if token_str:
        current_user = await _resolve_user_from_token(token_str, db)

    project = await _get_project_or_404(project_id, db)
    await check_preview_access(project, current_user, db)

    # Header password takes precedence over the query-parameter password.
    provided_password = password or access_pass
    _require_access_password(project, provided_password)

    file_path = storage_service.get_secure_path(project.storage_key, path)
    if not file_path.exists() or not file_path.is_file():
        raise HTTPException(status_code=404, detail="文件不存在")

    content_type, _ = mimetypes.guess_type(str(file_path))
    return FileResponse(
        path=str(file_path),
        media_type=content_type,
        filename=file_path.name
    )