nex_docus/backend/app/api/v1/preview.py

234 lines
7.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
项目预览相关 API支持公开和私密项目
"""
from fastapi import APIRouter, Depends, HTTPException, Header
from fastapi.responses import FileResponse
from fastapi.security import HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import Optional
import mimetypes
from app.core.database import get_db
from app.core.deps import get_current_user_optional, security_optional
from app.core.security import decode_access_token
from app.core.redis_client import TokenCache
from app.models.project import Project, ProjectMember
from app.models.user import User
from app.schemas.response import success_response
from app.services.storage import storage_service
router = APIRouter()
async def check_preview_access(
project: Project,
current_user: Optional[User],
db: AsyncSession
):
"""检查预览访问权限"""
# 公开项目:任何人都可以访问
if project.is_public == 1:
return True
# 私密项目:必须是项目成员
if not current_user:
raise HTTPException(status_code=401, detail="私密项目需要登录才能访问")
# 检查是否是项目所有者
if project.owner_id == current_user.id:
return True
# 检查是否是项目成员
member_result = await db.execute(
select(ProjectMember).where(
ProjectMember.project_id == project.id,
ProjectMember.user_id == current_user.id
)
)
member = member_result.scalar_one_or_none()
if not member:
raise HTTPException(status_code=403, detail="无权访问该私密项目")
return True
@router.get("/{project_id}/info", response_model=dict)
async def get_preview_info(
project_id: int,
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db)
):
"""获取预览项目基本信息"""
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 检查访问权限
await check_preview_access(project, current_user, db)
# 返回基本信息
info = {
"id": project.id,
"name": project.name,
"description": project.description,
"is_public": project.is_public,
"has_password": bool(project.access_pass),
}
return success_response(data=info)
@router.post("/{project_id}/verify", response_model=dict)
async def verify_access_password(
project_id: int,
password: str = Header(..., alias="X-Access-Password"),
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db)
):
"""验证访问密码"""
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 检查访问权限
await check_preview_access(project, current_user, db)
# 验证密码
if not project.access_pass:
return success_response(message="该项目无需密码访问")
if project.access_pass != password:
raise HTTPException(status_code=403, detail="访问密码错误")
return success_response(message="验证成功")
@router.get("/{project_id}/tree", response_model=dict)
async def get_preview_tree(
project_id: int,
password: Optional[str] = Header(None, alias="X-Access-Password"),
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db)
):
"""获取预览项目的文档树"""
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 检查访问权限
await check_preview_access(project, current_user, db)
# 如果设置了密码,需要验证
if project.access_pass:
if not password or project.access_pass != password:
raise HTTPException(status_code=403, detail="需要提供正确的访问密码")
# 获取文档树
project_path = storage_service.get_secure_path(project.storage_key)
tree = storage_service.generate_tree(project_path)
return success_response(data=tree)
@router.get("/{project_id}/file", response_model=dict)
async def get_preview_file(
project_id: int,
path: str,
password: Optional[str] = Header(None, alias="X-Access-Password"),
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db)
):
"""获取预览项目的文件内容"""
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 检查访问权限
await check_preview_access(project, current_user, db)
# 如果设置了密码,需要验证
if project.access_pass:
if not password or project.access_pass != password:
raise HTTPException(status_code=403, detail="需要提供正确的访问密码")
# 获取文件内容
file_path = storage_service.get_secure_path(project.storage_key, path)
content = await storage_service.read_file(file_path)
return success_response(data={"content": content})
@router.get("/{project_id}/document/{path:path}")
async def get_preview_document(
project_id: int,
path: str,
password: Optional[str] = Header(None, alias="X-Access-Password"),
access_pass: Optional[str] = None, # 支持密码查询参数
token: Optional[str] = None, # 支持token查询参数
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security_optional),
db: AsyncSession = Depends(get_db)
):
"""获取预览项目的文档文件PDF等- 返回文件流"""
# 获取当前用户支持header或query参数
current_user = None
token_str = None
if credentials:
token_str = credentials.credentials
elif token:
token_str = token
if token_str:
try:
user_id_from_redis = await TokenCache.get_user_id(token_str)
if user_id_from_redis:
payload = decode_access_token(token_str)
if payload:
user_id_str = payload.get("sub")
if user_id_str:
user_id = int(user_id_str)
result = await db.execute(select(User).where(User.id == user_id))
current_user = result.scalar_one_or_none()
except Exception:
pass # 忽略token验证失败继续作为未登录用户
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 检查访问权限
await check_preview_access(project, current_user, db)
# 如果设置了密码需要验证优先使用header其次使用query参数
provided_password = password or access_pass
if project.access_pass:
if not provided_password or project.access_pass != provided_password:
raise HTTPException(status_code=403, detail="需要提供正确的访问密码")
# 获取文件
file_path = storage_service.get_secure_path(project.storage_key, path)
if not file_path.exists() or not file_path.is_file():
raise HTTPException(status_code=404, detail="文件不存在")
content_type, _ = mimetypes.guess_type(str(file_path))
return FileResponse(path=str(file_path), media_type=content_type, filename=file_path.name)