nex_docus/backend/app/api/v1/files.py

311 lines
9.9 KiB
Python

"""
文件系统操作相关 API
"""
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from fastapi.responses import StreamingResponse, FileResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List
import os
import zipfile
import io
from pathlib import Path
from app.core.database import get_db
from app.core.deps import get_current_user, get_user_from_token_or_query
from app.models.user import User
from app.models.project import Project, ProjectMember
from app.schemas.file import (
FileTreeNode,
FileSaveRequest,
FileOperateRequest,
FileUploadResponse,
)
from app.schemas.response import success_response
from app.services.storage import storage_service
# Router on which the endpoint decorators in this module register their handlers.
router = APIRouter()
async def check_project_access(
    project_id: int,
    current_user: User,
    db: AsyncSession,
    require_write: bool = False
):
    """Load a project and verify that the user may access it.

    Access rules, in the order they are applied:

    * the project owner always passes;
    * a project member passes, unless write access is requested and the
      member's role is ``"viewer"``;
    * a non-member passes only for read access to a project whose
      ``is_public`` equals 1.

    Args:
        project_id: Primary key of the project to look up.
        current_user: The user on whose behalf the request is made.
        db: Async database session used for the lookups.
        require_write: When true, read-only access is not sufficient.

    Returns:
        The ``Project`` row that was looked up.

    Raises:
        HTTPException: 404 when the project does not exist, 403 when the
            user lacks the required access.
    """
    project_query = select(Project).where(Project.id == project_id)
    project = (await db.execute(project_query)).scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=404, detail="项目不存在")

    # The owner needs no membership lookup.
    if project.owner_id == current_user.id:
        return project

    membership_query = select(ProjectMember).where(
        ProjectMember.project_id == project_id,
        ProjectMember.user_id == current_user.id
    )
    membership = (await db.execute(membership_query)).scalar_one_or_none()

    if membership:
        # Members may always read; viewers are refused write access.
        if require_write and membership.role == "viewer":
            raise HTTPException(status_code=403, detail="无写入权限")
        return project

    # Non-members only get read access, and only to public projects.
    public_read = project.is_public == 1 and not require_write
    if public_read:
        return project
    raise HTTPException(status_code=403, detail="无权访问该项目")
@router.get("/{project_id}/tree", response_model=dict)
async def get_project_tree(
    project_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return the directory tree of a project.

    Read access to the project is checked first; the tree is then generated
    by the storage service from the project's root path and wrapped in the
    standard success response.
    """
    project = await check_project_access(project_id, current_user, db)
    root_dir = storage_service.get_secure_path(project.storage_key)
    return success_response(data=storage_service.generate_tree(root_dir))
@router.get("/{project_id}/file", response_model=dict)
async def get_file_content(
    project_id: int,
    path: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return the content of one file inside a project.

    Args:
        project_id: Project that owns the file.
        path: Path of the file as sent by the client; it is resolved through
            the storage service against the project's storage key.

    Returns:
        A success response whose data holds the requested ``path`` and the
        file ``content`` read by the storage service.
    """
    project = await check_project_access(project_id, current_user, db)
    resolved = storage_service.get_secure_path(project.storage_key, path)
    text = await storage_service.read_file(resolved)
    payload = {"path": path, "content": text}
    return success_response(data=payload)
@router.post("/{project_id}/file", response_model=dict)
async def save_file(
    project_id: int,
    file_data: FileSaveRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Write the submitted content to a file inside a project.

    Write access to the project is required. The target is
    ``file_data.path`` resolved through the storage service, and
    ``file_data.content`` is handed to the storage service for writing.
    """
    project = await check_project_access(
        project_id, current_user, db, require_write=True
    )
    destination = storage_service.get_secure_path(
        project.storage_key, file_data.path
    )
    await storage_service.write_file(destination, file_data.content)
    return success_response(message="文件保存成功")
@router.post("/{project_id}/file/operate", response_model=dict)
async def operate_file(
    project_id: int,
    operation: FileOperateRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Perform a file operation: delete, rename, move, create_dir or create_file.

    Write access to the project is required. ``operation.path`` is resolved
    first, before the action is inspected. ``rename`` and ``move``
    additionally need ``operation.new_path`` and both delegate to the
    storage service's ``rename_file``.

    Raises:
        HTTPException: 400 when ``new_path`` is missing for rename/move or
            when the action is not one of the supported values.
    """
    project = await check_project_access(
        project_id, current_user, db, require_write=True
    )
    storage_key = project.storage_key
    source = storage_service.get_secure_path(storage_key, operation.path)
    action = operation.action

    if action == "delete":
        await storage_service.delete_file(source)
        return success_response(message="删除成功")

    if action in ("rename", "move"):
        # Both actions share one implementation; only the messages differ.
        is_rename = action == "rename"
        if not operation.new_path:
            missing = "缺少新路径参数" if is_rename else "缺少目标路径参数"
            raise HTTPException(status_code=400, detail=missing)
        destination = storage_service.get_secure_path(
            storage_key, operation.new_path
        )
        await storage_service.rename_file(source, destination)
        done = "重命名成功" if is_rename else "移动成功"
        return success_response(message=done)

    if action == "create_dir":
        await storage_service.create_directory(source)
        return success_response(message="目录创建成功")

    if action == "create_file":
        # A missing/empty content field creates an empty file.
        initial_content = operation.content or ""
        await storage_service.write_file(source, initial_content)
        return success_response(message="文件创建成功")

    raise HTTPException(status_code=400, detail="不支持的操作类型")
@router.post("/{project_id}/upload", response_model=dict)
async def upload_file(
    project_id: int,
    file: UploadFile = File(...),
    subfolder: str = "images",
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Upload an image or attachment into a project.

    Write access is required. The storage service performs the upload and
    returns a mapping that must contain a ``filename`` key; a ``url`` entry
    pointing at this module's asset endpoint is added to it before it is
    returned in the success response.
    """
    project = await check_project_access(
        project_id, current_user, db, require_write=True
    )
    uploaded = await storage_service.upload_file(
        project.storage_key, file, subfolder=subfolder
    )
    # Build the URL under which the stored file can be fetched.
    stored_name = uploaded["filename"]
    uploaded["url"] = f"/api/v1/files/{project_id}/assets/{subfolder}/{stored_name}"
    return success_response(data=uploaded, message="文件上传成功")
@router.get("/{project_id}/assets/{subfolder}/{filename}")
async def get_asset_file(
    project_id: int,
    subfolder: str,
    filename: str,
    db: AsyncSession = Depends(get_db)
):
    """Serve an asset file of a project (public access, supports sharing).

    No user dependency is declared on this endpoint; only the existence of
    the project and of the file is checked. The file is looked up under
    ``_assets/<subfolder>/<filename>`` relative to the project's storage key.

    Raises:
        HTTPException: 404 when the project does not exist or the resolved
            path is not an existing regular file.
    """
    lookup = await db.execute(select(Project).where(Project.id == project_id))
    project = lookup.scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=404, detail="项目不存在")

    relative_asset = "/".join(("_assets", subfolder, filename))
    file_path = storage_service.get_secure_path(project.storage_key, relative_asset)
    if not (file_path.exists() and file_path.is_file()):
        raise HTTPException(status_code=404, detail="文件不存在")

    return FileResponse(
        path=file_path,
        filename=filename,
        media_type="application/octet-stream"
    )
@router.post("/{project_id}/import-documents", response_model=dict)
async def import_documents(
    project_id: int,
    files: List[UploadFile] = File(...),
    target_path: str = "",
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Import a batch of Markdown documents into a project directory.

    Write access to the project is required. Every upload is validated
    before anything is written, so one rejected file aborts the whole batch
    without leaving earlier files on disk.

    Args:
        project_id: Project that receives the documents.
        files: Uploaded files; each name must end with ``.md``.
        target_path: Directory (relative to the project) to import into;
            empty means the project root. It is created when missing.

    Returns:
        A success response whose data lists the project-relative paths of
        the files that were written.

    Raises:
        HTTPException: 400 when an upload has no name or its name does not
            end with ``.md``.
    """
    project = await check_project_access(project_id, current_user, db, require_write=True)

    # Validate first and derive the on-disk name for each upload.
    safe_names = []
    for file in files:
        # The client may omit the filename entirely; treat that as invalid
        # instead of crashing on ``None.endswith``.
        raw_name = file.filename or ""
        if not raw_name.endswith('.md'):
            raise HTTPException(status_code=400, detail=f"文件 {file.filename} 不是Markdown格式")
        # The name is client-controlled and may carry directory parts such as
        # "../../x.md". Keeping only the final component guarantees the file
        # is written directly inside the target directory.
        safe_names.append(Path(raw_name).name)

    target_dir = storage_service.get_secure_path(project.storage_key, target_path)
    target_dir.mkdir(parents=True, exist_ok=True)

    imported_files = []
    for file, safe_name in zip(files, safe_names):
        content = await file.read()
        (target_dir / safe_name).write_bytes(content)
        # Report the path that was actually written, relative to the project.
        relative_path = f"{target_path}/{safe_name}" if target_path else safe_name
        imported_files.append(relative_path)

    return success_response(
        data={"imported_files": imported_files},
        message=f"成功导入 {len(imported_files)} 个文档"
    )
@router.get("/{project_id}/export-directory")
async def export_directory(
    project_id: int,
    directory_path: str = "",
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Export a project directory as a ZIP download.

    Read access to the project is required. All regular files below the
    directory are added to an in-memory archive using paths relative to
    that directory.

    Args:
        project_id: Project to export from.
        directory_path: Directory (relative to the project) to export;
            empty means the project root.

    Returns:
        A streaming ``application/zip`` response with an attachment
        ``Content-Disposition`` header.

    Raises:
        HTTPException: 404 when the resolved path is not an existing
            directory.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    project = await check_project_access(project_id, current_user, db)
    source_dir = storage_service.get_secure_path(project.storage_key, directory_path)
    # ``is_dir`` also rejects a regular file, which would otherwise yield an
    # empty archive because ``rglob`` finds nothing beneath it.
    if not source_dir.is_dir():
        raise HTTPException(status_code=404, detail="目录不存在")

    # Build the archive in memory.
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for file_path in source_dir.rglob('*'):
            if file_path.is_file():
                zip_file.write(file_path, file_path.relative_to(source_dir))
    # Rewind so the response streams from the start of the archive.
    zip_buffer.seek(0)

    suffix = directory_path.replace('/', '_') if directory_path else 'root'
    zip_filename = f"{project.name}_{suffix}.zip"

    # A raw non-ASCII name (e.g. a Chinese project name) cannot be sent in a
    # header value, and spaces or quotes would corrupt the parameter. Names
    # needing escaping are sent percent-encoded via the RFC 6266 ``filename*``
    # form; plain names keep the original header format.
    encoded_name = quote(zip_filename, safe="")
    if encoded_name == zip_filename:
        content_disposition = f"attachment; filename={zip_filename}"
    else:
        content_disposition = f"attachment; filename*=utf-8''{encoded_name}"

    return StreamingResponse(
        zip_buffer,
        media_type="application/zip",
        headers={
            "Content-Disposition": content_disposition
        }
    )