nex_docus/backend/app/api/v1/preview.py

168 lines
5.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
项目预览相关 API支持公开和私密项目
"""
from fastapi import APIRouter, Depends, HTTPException, Header
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import Optional
from app.core.database import get_db
from app.core.deps import get_current_user_optional
from app.models.project import Project, ProjectMember
from app.models.user import User
from app.schemas.response import success_response
from app.services.storage import storage_service
router = APIRouter()
async def check_preview_access(
project: Project,
current_user: Optional[User],
db: AsyncSession
):
"""检查预览访问权限"""
# 公开项目:任何人都可以访问
if project.is_public == 1:
return True
# 私密项目:必须是项目成员
if not current_user:
raise HTTPException(status_code=401, detail="私密项目需要登录才能访问")
# 检查是否是项目所有者
if project.owner_id == current_user.id:
return True
# 检查是否是项目成员
member_result = await db.execute(
select(ProjectMember).where(
ProjectMember.project_id == project.id,
ProjectMember.user_id == current_user.id
)
)
member = member_result.scalar_one_or_none()
if not member:
raise HTTPException(status_code=403, detail="无权访问该私密项目")
return True
@router.get("/{project_id}/info", response_model=dict)
async def get_preview_info(
project_id: int,
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db)
):
"""获取预览项目基本信息"""
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 检查访问权限
await check_preview_access(project, current_user, db)
# 返回基本信息
info = {
"id": project.id,
"name": project.name,
"description": project.description,
"is_public": project.is_public,
"has_password": bool(project.access_pass),
}
return success_response(data=info)
@router.post("/{project_id}/verify", response_model=dict)
async def verify_access_password(
project_id: int,
password: str = Header(..., alias="X-Access-Password"),
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db)
):
"""验证访问密码"""
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 检查访问权限
await check_preview_access(project, current_user, db)
# 验证密码
if not project.access_pass:
return success_response(message="该项目无需密码访问")
if project.access_pass != password:
raise HTTPException(status_code=403, detail="访问密码错误")
return success_response(message="验证成功")
@router.get("/{project_id}/tree", response_model=dict)
async def get_preview_tree(
project_id: int,
password: Optional[str] = Header(None, alias="X-Access-Password"),
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db)
):
"""获取预览项目的文档树"""
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 检查访问权限
await check_preview_access(project, current_user, db)
# 如果设置了密码,需要验证
if project.access_pass:
if not password or project.access_pass != password:
raise HTTPException(status_code=403, detail="需要提供正确的访问密码")
# 获取文档树
project_path = storage_service.get_secure_path(project.storage_key)
tree = storage_service.generate_tree(project_path)
return success_response(data=tree)
@router.get("/{project_id}/file", response_model=dict)
async def get_preview_file(
project_id: int,
path: str,
password: Optional[str] = Header(None, alias="X-Access-Password"),
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db)
):
"""获取预览项目的文件内容"""
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 检查访问权限
await check_preview_access(project, current_user, db)
# 如果设置了密码,需要验证
if project.access_pass:
if not password or project.access_pass != password:
raise HTTPException(status_code=403, detail="需要提供正确的访问密码")
# 获取文件内容
file_path = storage_service.get_secure_path(project.storage_key, path)
content = await storage_service.read_file(file_path)
return success_response(data={"content": content})