nex_docus/backend/app/api/v1/preview.py

116 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
项目预览相关 API公开访问支持分享
"""
from fastapi import APIRouter, Depends, HTTPException, Header
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import Optional
from app.core.database import get_db
from app.models.project import Project
from app.schemas.response import success_response
from app.services.storage import storage_service
router = APIRouter()
@router.get("/{project_id}/info", response_model=dict)
async def get_preview_info(
project_id: int,
db: AsyncSession = Depends(get_db)
):
"""获取预览项目基本信息(公开访问)"""
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 返回基本信息
info = {
"id": project.id,
"name": project.name,
"description": project.description,
"has_password": bool(project.access_pass),
}
return success_response(data=info)
@router.post("/{project_id}/verify", response_model=dict)
async def verify_access_password(
project_id: int,
password: str = Header(..., alias="X-Access-Password"),
db: AsyncSession = Depends(get_db)
):
"""验证访问密码"""
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 验证密码
if not project.access_pass:
return success_response(message="该项目无需密码访问")
if project.access_pass != password:
raise HTTPException(status_code=403, detail="访问密码错误")
return success_response(message="验证成功")
@router.get("/{project_id}/tree", response_model=dict)
async def get_preview_tree(
project_id: int,
password: Optional[str] = Header(None, alias="X-Access-Password"),
db: AsyncSession = Depends(get_db)
):
"""获取预览项目的文档树(公开访问,需验证密码)"""
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 如果设置了密码,需要验证
if project.access_pass:
if not password or project.access_pass != password:
raise HTTPException(status_code=403, detail="需要提供正确的访问密码")
# 获取文档树
project_path = storage_service.get_secure_path(project.storage_key)
tree = storage_service.generate_tree(project_path)
return success_response(data=tree)
@router.get("/{project_id}/file", response_model=dict)
async def get_preview_file(
project_id: int,
path: str,
password: Optional[str] = Header(None, alias="X-Access-Password"),
db: AsyncSession = Depends(get_db)
):
"""获取预览项目的文件内容(公开访问,需验证密码)"""
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 如果设置了密码,需要验证
if project.access_pass:
if not password or project.access_pass != password:
raise HTTPException(status_code=403, detail="需要提供正确的访问密码")
# 获取文件内容
file_path = storage_service.get_secure_path(project.storage_key, path)
content = await storage_service.read_file(file_path)
return success_response(data={"content": content})