nex_docus/backend/app/api/v1/files.py

686 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
文件系统操作相关 API
"""
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Request
from fastapi.responses import StreamingResponse, FileResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List
import os
import zipfile
import io
import mimetypes
import json
from pathlib import Path
from app.core.database import get_db
from app.core.deps import get_current_user, get_user_from_token_or_query
from app.models.user import User
from app.models.project import Project, ProjectMember
from app.models.log import OperationLog
from app.schemas.file import (
FileTreeNode,
FileSaveRequest,
FileOperateRequest,
FileUploadResponse,
)
from app.schemas.response import success_response
from app.services.storage import storage_service
from app.services.log_service import log_service
from app.services.notification_service import notification_service
from app.services.search_service import search_service
from app.core.enums import OperationType
router = APIRouter()
async def check_project_access(
project_id: int,
current_user: User,
db: AsyncSession,
require_write: bool = False
):
"""检查项目访问权限"""
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 检查是否是项目所有者
if project.owner_id == current_user.id:
return project
# 检查是否是项目成员
member_result = await db.execute(
select(ProjectMember).where(
ProjectMember.project_id == project_id,
ProjectMember.user_id == current_user.id
)
)
member = member_result.scalar_one_or_none()
if not member:
if project.is_public == 1 and not require_write:
return project
raise HTTPException(status_code=403, detail="无权访问该项目")
# 如果需要写权限,检查成员角色
if require_write and member.role == "viewer":
raise HTTPException(status_code=403, detail="无写入权限")
return project
@router.get("/{project_id}/tree", response_model=dict)
async def get_project_tree(
project_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""获取项目目录树"""
project = await check_project_access(project_id, current_user, db)
# 获取项目根目录
project_root = storage_service.get_secure_path(project.storage_key)
# 生成目录树
tree = storage_service.generate_tree(project_root)
# 获取当前用户角色
user_role = "owner" # 默认是所有者
if project.owner_id != current_user.id:
# 查询成员角色
member_result = await db.execute(
select(ProjectMember).where(
ProjectMember.project_id == project_id,
ProjectMember.user_id == current_user.id
)
)
member = member_result.scalar_one_or_none()
if member:
user_role = member.role
return success_response(data={
"tree": tree,
"user_role": user_role,
"project_name": project.name,
"project_description": project.description
})
@router.get("/{project_id}/file", response_model=dict)
async def get_file_content(
project_id: int,
path: str,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""获取文件内容"""
project = await check_project_access(project_id, current_user, db)
# 获取文件路径
file_path = storage_service.get_secure_path(project.storage_key, path)
# 读取文件内容
content = await storage_service.read_file(file_path)
return success_response(data={"path": path, "content": content})
@router.post("/{project_id}/file", response_model=dict)
async def save_file(
project_id: int,
file_data: FileSaveRequest,
request: Request,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""保存文件内容"""
project = await check_project_access(project_id, current_user, db, require_write=True)
# 获取文件路径
file_path = storage_service.get_secure_path(project.storage_key, file_data.path)
# 写入文件内容
await storage_service.write_file(file_path, file_data.content)
# 更新搜索索引 (仅限 Markdown)
if file_data.path.endswith('.md'):
file_title = Path(file_data.path).stem
await search_service.update_doc(project_id, file_data.path, file_title, file_data.content)
# 记录操作日志
await log_service.log_file_operation(
db=db,
operation_type=OperationType.SAVE_FILE,
project_id=project_id,
file_path=file_data.path,
user=current_user,
detail={"content_length": len(file_data.content)},
request=request,
)
# 发送通知给其他成员
await notification_service.notify_project_members(
db=db,
project_id=project_id,
exclude_user_id=current_user.id,
title=f"项目文档更新",
content=f"项目 [{project.name}] 中的文档 [{file_data.path}] 已被 {current_user.nickname or current_user.username} 更新。",
link=f"/projects/{project_id}/docs?file={file_data.path}",
category="project"
)
await db.commit()
return success_response(message="文件保存成功")
@router.post("/{project_id}/file/operate", response_model=dict)
async def operate_file(
project_id: int,
operation: FileOperateRequest,
request: Request,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""文件操作(重命名、删除、创建目录、创建文件、移动)"""
project = await check_project_access(project_id, current_user, db, require_write=True)
# 获取当前路径
current_path = storage_service.get_secure_path(project.storage_key, operation.path)
if operation.action == "delete":
# 删除文件或文件夹
await storage_service.delete_file(current_path)
# 删除索引
if operation.path.endswith('.md'):
await search_service.remove_doc(project_id, operation.path)
# 记录日志
await log_service.log_file_operation(
db=db,
operation_type=OperationType.DELETE_FILE,
project_id=project_id,
file_path=operation.path,
user=current_user,
request=request,
)
# 发送通知
await notification_service.notify_project_members(
db=db,
project_id=project_id,
exclude_user_id=current_user.id,
title=f"项目文档删除",
content=f"项目 [{project.name}] 中的文档/目录 [{operation.path}] 已被 {current_user.nickname or current_user.username} 删除。",
category="project"
)
await db.commit()
return success_response(message="删除成功")
elif operation.action == "rename":
# 重命名
if not operation.new_path:
raise HTTPException(status_code=400, detail="缺少新路径参数")
new_path = storage_service.get_secure_path(project.storage_key, operation.new_path)
await storage_service.rename_file(current_path, new_path)
# 更新索引 (删除旧的,添加新的 - 如果内容未变也需要重新读取内容吗?
# 优化Whoosh 更新需要内容。我们可以尝试读取文件内容。
# 如果是目录重命名,比较复杂,暂时忽略目录重命名的递归索引更新,或者后续实现重建索引功能)
if operation.path.endswith('.md') and operation.new_path.endswith('.md'):
# 简单处理:读取新文件内容并更新索引
try:
content = await storage_service.read_file(new_path)
file_title = Path(operation.new_path).stem
await search_service.remove_doc(project_id, operation.path)
await search_service.update_doc(project_id, operation.new_path, file_title, content)
except Exception as e:
# 忽略索引更新错误
pass
# 记录日志
await log_service.log_file_operation(
db=db,
operation_type=OperationType.RENAME_FILE,
project_id=project_id,
file_path=operation.path,
user=current_user,
detail={"new_path": operation.new_path},
request=request,
)
# 发送通知
await notification_service.notify_project_members(
db=db,
project_id=project_id,
exclude_user_id=current_user.id,
title=f"项目文档重命名",
content=f"项目 [{project.name}] 中的文档 [{operation.path}] 已被重命名为 [{operation.new_path}]。",
category="project"
)
await db.commit()
return success_response(message="重命名成功")
elif operation.action == "move":
# 移动文件或文件夹
if not operation.new_path:
raise HTTPException(status_code=400, detail="缺少目标路径参数")
new_path = storage_service.get_secure_path(project.storage_key, operation.new_path)
await storage_service.rename_file(current_path, new_path)
# 更新索引
if operation.path.endswith('.md') and operation.new_path.endswith('.md'):
try:
content = await storage_service.read_file(new_path)
file_title = Path(operation.new_path).stem
await search_service.remove_doc(project_id, operation.path)
await search_service.update_doc(project_id, operation.new_path, file_title, content)
except Exception:
pass
# 记录日志
await log_service.log_file_operation(
db=db,
operation_type=OperationType.MOVE_FILE,
project_id=project_id,
file_path=operation.path,
user=current_user,
detail={"new_path": operation.new_path},
request=request,
)
# 发送通知
await notification_service.notify_project_members(
db=db,
project_id=project_id,
exclude_user_id=current_user.id,
title=f"项目文档移动",
content=f"项目 [{project.name}] 中的文档 [{operation.path}] 已移动到 [{operation.new_path}]。",
category="project"
)
await db.commit()
return success_response(message="移动成功")
elif operation.action == "create_dir":
# 创建目录
await storage_service.create_directory(current_path)
# 记录日志
await log_service.log_file_operation(
db=db,
operation_type=OperationType.CREATE_DIR,
project_id=project_id,
file_path=operation.path,
user=current_user,
request=request,
)
# 发送通知
await notification_service.notify_project_members(
db=db,
project_id=project_id,
exclude_user_id=current_user.id,
title=f"创建新目录",
content=f"{current_user.nickname or current_user.username} 在项目 [{project.name}] 中创建了新目录 [{operation.path}]。",
category="project"
)
await db.commit()
return success_response(message="目录创建成功")
elif operation.action == "create_file":
# 创建文件
content = operation.content or ""
await storage_service.write_file(current_path, content)
# 更新索引
if operation.path.endswith('.md'):
file_title = Path(operation.path).stem
await search_service.update_doc(project_id, operation.path, file_title, content)
# 记录日志
await log_service.log_file_operation(
db=db,
operation_type=OperationType.CREATE_FILE,
project_id=project_id,
file_path=operation.path,
user=current_user,
request=request,
)
# 发送通知
await notification_service.notify_project_members(
db=db,
project_id=project_id,
exclude_user_id=current_user.id,
title=f"创建新文档",
content=f"{current_user.nickname or current_user.username} 在项目 [{project.name}] 中创建了新文档 [{operation.path}]。",
link=f"/projects/{project_id}/docs?file={operation.path}",
category="project"
)
await db.commit()
return success_response(message="文件创建成功")
else:
raise HTTPException(status_code=400, detail="不支持的操作类型")
@router.post("/{project_id}/upload", response_model=dict)
async def upload_file(
project_id: int,
file: UploadFile = File(...),
subfolder: str = "images",
request: Request = None,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""上传文件(图片/附件)"""
project = await check_project_access(project_id, current_user, db, require_write=True)
# 上传文件
file_info = await storage_service.upload_file(
project.storage_key,
file,
subfolder=subfolder
)
# 构建访问 URL
file_info["url"] = f"/api/v1/files/{project_id}/assets/{subfolder}/{file_info['filename']}"
# 记录日志
await log_service.log_file_operation(
db=db,
operation_type=OperationType.UPLOAD_IMAGE,
project_id=project_id,
file_path=f"_assets/{subfolder}/{file_info['filename']}",
user=current_user,
detail={
"original_filename": file.filename,
"file_size": file_info.get("size", 0),
},
request=request,
)
return success_response(data=file_info, message="文件上传成功")
@router.post("/{project_id}/upload-document", response_model=dict)
async def upload_document(
project_id: int,
file: UploadFile = File(...),
target_dir: str = "",
request: Request = None,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""
上传文档文件PDF等到项目目录
"""
project = await check_project_access(project_id, current_user, db, require_write=True)
# 只允许PDF文件
allowed_extensions = [".pdf"]
# 上传文件
file_info = await storage_service.upload_document(
project.storage_key,
file,
target_dir=target_dir,
allowed_extensions=allowed_extensions
)
# 记录日志
await log_service.log_file_operation(
db=db,
operation_type=OperationType.UPLOAD_IMAGE, # 复用上传图片的日志类型
project_id=project_id,
file_path=file_info['path'],
user=current_user,
detail={
"original_filename": file_info['original_filename'],
"size": file_info['size'],
"target_dir": target_dir
},
request=request,
)
return success_response(data=file_info, message="文档上传成功")
@router.get("/{project_id}/document/{path:path}")
async def get_document_file(
project_id: int,
path: str,
request: Request,
current_user: User = Depends(get_user_from_token_or_query),
db: AsyncSession = Depends(get_db)
):
"""
获取文档文件PDF等- 支持 HTTP Range 请求
"""
import re
import aiofiles
project = await check_project_access(project_id, current_user, db)
# 获取文件路径
file_path = storage_service.get_secure_path(project.storage_key, path)
if not file_path.exists() or not file_path.is_file():
raise HTTPException(status_code=404, detail="文件不存在")
# 判断文件类型
content_type, _ = mimetypes.guess_type(str(file_path))
if not content_type:
content_type = "application/octet-stream"
# 获取文件大小
file_size = file_path.stat().st_size
# 检查是否有 Range 请求头
range_header = request.headers.get("range")
if range_header:
# 解析 Range 头: bytes=start-end
range_match = re.match(r"bytes=(\d+)-(\d*)", range_header)
if range_match:
start = int(range_match.group(1))
end = int(range_match.group(2)) if range_match.group(2) else file_size - 1
end = min(end, file_size - 1)
# 读取指定范围的文件内容
async def file_iterator():
async with aiofiles.open(file_path, 'rb') as f:
await f.seek(start)
chunk_size = 1024 * 1024 # 1MB chunks
remaining = end - start + 1
while remaining > 0:
chunk = await f.read(min(chunk_size, remaining))
if not chunk:
break
remaining -= len(chunk)
yield chunk
headers = {
"Content-Range": f"bytes {start}-{end}/{file_size}",
"Accept-Ranges": "bytes",
"Content-Length": str(end - start + 1),
}
return StreamingResponse(
file_iterator(),
status_code=206, # Partial Content
headers=headers,
media_type=content_type
)
# 正常请求,返回完整文件(添加 Accept-Ranges 头)
return FileResponse(
path=str(file_path),
media_type=content_type,
filename=file_path.name,
headers={"Accept-Ranges": "bytes"}
)
@router.get("/{project_id}/assets/{subfolder}/{filename}")
async def get_asset_file(
project_id: int,
subfolder: str,
filename: str,
db: AsyncSession = Depends(get_db)
):
"""获取资源文件(公开访问,支持分享)"""
# 验证项目是否存在
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 获取文件路径
asset_path = f"_assets/{subfolder}/{filename}"
file_path = storage_service.get_secure_path(project.storage_key, asset_path)
if not file_path.exists() or not file_path.is_file():
raise HTTPException(status_code=404, detail="文件不存在")
# 根据文件扩展名确定 MIME 类型
mime_type, _ = mimetypes.guess_type(filename)
if mime_type is None:
mime_type = "application/octet-stream"
# 返回文件(流式响应)
return FileResponse(
path=file_path,
filename=filename,
media_type=mime_type
)
@router.post("/{project_id}/import-documents", response_model=dict)
async def import_documents(
project_id: int,
files: List[UploadFile] = File(...),
target_path: str = "",
request: Request = None,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""批量导入Markdown文档"""
project = await check_project_access(project_id, current_user, db, require_write=True)
# 验证所有文件都是.md格式
for file in files:
if not file.filename.endswith('.md'):
raise HTTPException(status_code=400, detail=f"文件 {file.filename} 不是Markdown格式")
# 获取目标目录路径
target_dir = storage_service.get_secure_path(project.storage_key, target_path)
# 确保目标目录存在
if not target_dir.exists():
target_dir.mkdir(parents=True, exist_ok=True)
# 保存所有文件
imported_files = []
for file in files:
file_path = target_dir / file.filename
content = await file.read()
# 写入文件
with open(file_path, 'wb') as f:
f.write(content)
# 构建相对路径
relative_path = f"{target_path}/{file.filename}" if target_path else file.filename
imported_files.append(relative_path)
# 更新索引
try:
text_content = content.decode('utf-8')
file_title = Path(file.filename).stem
await search_service.update_doc(project_id, relative_path, file_title, text_content)
except Exception:
pass
# 记录日志
await log_service.log_file_operation(
db=db,
operation_type=OperationType.IMPORT_DOCUMENTS,
project_id=project_id,
file_path=target_path or "/",
user=current_user,
detail={
"file_count": len(imported_files),
"files": imported_files,
},
request=request,
)
return success_response(
data={"imported_files": imported_files},
message=f"成功导入 {len(imported_files)} 个文档"
)
@router.get("/{project_id}/export-directory")
async def export_directory(
project_id: int,
directory_path: str = "",
request: Request = None,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""导出目录为ZIP包"""
project = await check_project_access(project_id, current_user, db)
# 获取目标目录路径
source_dir = storage_service.get_secure_path(project.storage_key, directory_path)
if not source_dir.exists():
raise HTTPException(status_code=404, detail="目录不存在")
# 创建ZIP文件在内存中
zip_buffer = io.BytesIO()
file_count = 0
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
# 遍历目录添加所有文件
for file_path in source_dir.rglob('*'):
if file_path.is_file():
# 计算相对路径
arcname = file_path.relative_to(source_dir)
zip_file.write(file_path, arcname)
file_count += 1
# 重置buffer位置
zip_buffer.seek(0)
# 生成ZIP文件名
zip_filename = f"{project.name}_{directory_path.replace('/', '_') if directory_path else 'root'}.zip"
# 记录日志
await log_service.log_file_operation(
db=db,
operation_type=OperationType.EXPORT_DOCUMENTS,
project_id=project_id,
file_path=directory_path or "/",
user=current_user,
detail={
"file_count": file_count,
"zip_filename": zip_filename,
},
request=request,
)
return StreamingResponse(
zip_buffer,
media_type="application/zip",
headers={
"Content-Disposition": f"attachment; filename={zip_filename}"
}
)