nex_docus/backend/app/api/v1/dashboard.py

376 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
管理员仪表盘相关 API
"""
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_
from datetime import datetime, date
from typing import Optional
import os
import glob
import json
from app.core.database import get_db
from app.core.deps import get_current_user
from app.core.config import settings
from app.models.user import User
from app.models.project import Project, ProjectMember
from app.models.log import OperationLog
from app.core.enums import OperationType, ResourceType
from app.schemas.response import success_response
# Router on which the dashboard endpoints defined in this module are registered.
router = APIRouter()
@router.get("/stats", response_model=dict)
async def get_dashboard_stats(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return dashboard statistics; restricted to superusers.

    The payload passed to ``success_response`` contains:

    * ``stats`` - total user count, total project count and the number of
      ``*.md`` files found on disk under ``settings.PROJECTS_PATH``.
    * ``recent_users`` - the five most recently created users.
    * ``recent_projects`` - the five most recently created projects together
      with the username of their owner.

    Raises:
        HTTPException: 403 when ``current_user.is_superuser`` is not ``1``.
    """

    def _iso(moment):
        # Timestamps are serialized as ISO-8601 text; falsy values become None.
        return moment.isoformat() if moment else None

    # Guard clause: only superusers may see the dashboard.
    if current_user.is_superuser != 1:
        raise HTTPException(status_code=403, detail="仅管理员可访问")

    total_users = (await db.execute(select(func.count(User.id)))).scalar()
    total_projects = (await db.execute(select(func.count(Project.id)))).scalar()

    # Documents are counted on disk: every *.md file found recursively under
    # each directory that sits directly inside PROJECTS_PATH.
    total_documents = 0
    if os.path.exists(settings.PROJECTS_PATH):
        candidate_dirs = (
            os.path.join(settings.PROJECTS_PATH, entry)
            for entry in os.listdir(settings.PROJECTS_PATH)
        )
        total_documents = sum(
            len(glob.glob(os.path.join(directory, "**/*.md"), recursive=True))
            for directory in candidate_dirs
            if os.path.isdir(directory)
        )

    # Five newest users.
    newest_users_query = select(User).order_by(User.created_at.desc()).limit(5)
    newest_users = (await db.execute(newest_users_query)).scalars().all()
    recent_users_data = []
    for account in newest_users:
        recent_users_data.append({
            "id": account.id,
            "username": account.username,
            "email": account.email,
            "created_at": _iso(account.created_at),
        })

    # Five newest projects, joined to their owner so the username can be shown.
    newest_projects_query = (
        select(Project, User)
        .join(User, Project.owner_id == User.id)
        .order_by(Project.created_at.desc())
        .limit(5)
    )
    newest_projects = (await db.execute(newest_projects_query)).all()
    recent_projects_data = []
    for proj, owner in newest_projects:
        recent_projects_data.append({
            "id": proj.id,
            "name": proj.name,
            "description": proj.description,
            "owner_name": owner.username,
            "created_at": _iso(proj.created_at),
        })

    payload = {
        "stats": {
            "user_count": total_users,
            "project_count": total_projects,
            "document_count": total_documents,
        },
        "recent_users": recent_users_data,
        "recent_projects": recent_projects_data,
    }
    return success_response(data=payload)
@router.get("/personal-stats", response_model=dict)
async def get_personal_stats(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return the personal desktop statistics of the current user.

    The payload passed to ``success_response`` contains:

    * ``user_info`` - id, username, email and creation time of the caller.
    * ``stats`` - number of projects the caller owns, number of projects the
      caller is a member of but does not own, and the number of ``*.md`` files
      on disk inside the owned projects.
    * ``recent_personal_projects`` - the five newest owned projects.
    * ``recent_shared_projects`` - the five most recently joined projects owned
      by someone else, with the caller's role in each.
    """

    def _iso(moment):
        # Timestamps are serialized as ISO-8601 text; falsy values become None.
        return moment.isoformat() if moment else None

    uid = current_user.id
    owned_by_me = Project.owner_id == uid
    # "Shared" means: the caller has a membership row but is not the owner.
    shared_with_me = and_(ProjectMember.user_id == uid, Project.owner_id != uid)
    membership_join = Project.id == ProjectMember.project_id

    owned_total = (
        await db.execute(select(func.count(Project.id)).where(owned_by_me))
    ).scalar()

    shared_total = (
        await db.execute(
            select(func.count(Project.id))
            .join(ProjectMember, membership_join)
            .where(shared_with_me)
        )
    ).scalar()

    # Count *.md files recursively inside each owned project's storage directory.
    owned_projects = (
        await db.execute(select(Project).where(owned_by_me))
    ).scalars().all()
    storage_dirs = [
        os.path.join(settings.PROJECTS_PATH, owned.storage_key)
        for owned in owned_projects
    ]
    document_total = sum(
        len(glob.glob(os.path.join(directory, "**/*.md"), recursive=True))
        for directory in storage_dirs
        if os.path.isdir(directory)
    )

    # Five newest owned projects.
    newest_owned = (
        await db.execute(
            select(Project)
            .where(owned_by_me)
            .order_by(Project.created_at.desc())
            .limit(5)
        )
    ).scalars().all()
    recent_personal_projects_data = []
    for owned in newest_owned:
        recent_personal_projects_data.append({
            "id": owned.id,
            "name": owned.name,
            "description": owned.description,
            "created_at": _iso(owned.created_at),
        })

    # Five most recently joined projects, read through the membership rows.
    newest_shared = (
        await db.execute(
            select(Project, ProjectMember)
            .join(ProjectMember, membership_join)
            .where(shared_with_me)
            .order_by(ProjectMember.joined_at.desc())
            .limit(5)
        )
    ).all()
    recent_shared_projects_data = []
    for shared, membership in newest_shared:
        recent_shared_projects_data.append({
            "id": shared.id,
            "name": shared.name,
            "description": shared.description,
            "role": membership.role,
            "joined_at": _iso(membership.joined_at),
        })

    payload = {
        "user_info": {
            "id": current_user.id,
            "username": current_user.username,
            "email": current_user.email,
            "created_at": _iso(current_user.created_at),
        },
        "stats": {
            "personal_projects_count": owned_total,
            "shared_projects_count": shared_total,
            "document_count": document_total,
        },
        "recent_personal_projects": recent_personal_projects_data,
        "recent_shared_projects": recent_shared_projects_data,
    }
    return success_response(data=payload)
@router.get("/document-activity-dates", response_model=dict)
async def get_document_activity_dates(
    year: int = Query(..., description="年份"),
    month: int = Query(..., description="月份"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return the days of a given month on which the user touched documents.

    Only the current user's operation logs are considered, and only the file
    operations create, save, delete, rename and move. The result is one entry
    per day with activity, ordered by day, each holding the ISO date and the
    number of matching log rows.

    Args:
        year: Calendar year of the month to inspect.
        month: Calendar month (1-12) to inspect.

    Raises:
        HTTPException: 400 when ``year``/``month`` do not form a valid date.
    """
    # Half-open range [first day of month, first day of next month).
    # Building the bounds can fail for out-of-range input (e.g. month=13);
    # report that as a client error instead of an unhandled exception.
    try:
        start_date = datetime(year, month, 1)
        if month == 12:
            end_date = datetime(year + 1, 1, 1)
        else:
            end_date = datetime(year, month + 1, 1)
    except (ValueError, OverflowError):
        raise HTTPException(status_code=400, detail="年份或月份无效") from None

    # Operation types that count as document activity.
    document_operations = [
        OperationType.CREATE_FILE,
        OperationType.SAVE_FILE,
        OperationType.DELETE_FILE,
        OperationType.RENAME_FILE,
        OperationType.MOVE_FILE,
    ]

    # Group the matching log rows by calendar day.
    activity_day = func.date(OperationLog.created_at)
    result = await db.execute(
        select(
            activity_day.label('activity_date'),
            func.count(OperationLog.id).label('count'),
        )
        .where(
            and_(
                OperationLog.user_id == current_user.id,
                OperationLog.operation_type.in_(document_operations),
                OperationLog.created_at >= start_date,
                OperationLog.created_at < end_date,
            )
        )
        .group_by(activity_day)
        .order_by(activity_day)
    )

    dates_data = []
    for activity_date, count in result.all():
        # Depending on the database driver, DATE(...) may come back as a date
        # object or as already-formatted text; accept both.
        if hasattr(activity_date, "isoformat"):
            day_text = activity_date.isoformat()
        else:
            day_text = str(activity_date)
        dates_data.append({"date": day_text, "count": count})

    return success_response(data={"dates": dates_data})
def _parse_log_detail(raw_detail):
    """Best-effort decode of an operation log ``detail`` value into a dict.

    Returns an empty dict when the value is empty, is not valid JSON, or
    decodes to something other than a JSON object, so that one bad log row
    cannot fail the whole request.
    """
    if not raw_detail:
        return {}
    if isinstance(raw_detail, dict):
        # Already decoded; nothing to parse.
        return raw_detail
    try:
        parsed = json.loads(raw_detail)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers non-text input.
        return {}
    return parsed if isinstance(parsed, dict) else {}


@router.get("/document-activity", response_model=dict)
async def get_document_activity(
    date_str: str = Query(..., description="日期YYYY-MM-DD"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return the user's document operations for one day, aggregated.

    The current user's file operation logs (create, save, delete, rename,
    move) for the given day are grouped by (project id, file path, operation
    type). Each group reports how often the operation happened, its first and
    last timestamp, the project name, and whether the file currently exists
    on disk (never checked for delete operations). Groups are returned newest
    first by last operation time.

    Args:
        date_str: Day to inspect, formatted ``YYYY-MM-DD``.

    Raises:
        HTTPException: 400 when ``date_str`` is not a valid ``YYYY-MM-DD`` date.
    """
    try:
        target_date = datetime.strptime(date_str, "%Y-%m-%d").date()
    except ValueError:
        raise HTTPException(status_code=400, detail="日期格式错误,应为 YYYY-MM-DD")

    # Inclusive bounds covering the whole day.
    start_datetime = datetime.combine(target_date, datetime.min.time())
    end_datetime = datetime.combine(target_date, datetime.max.time())

    # Operation types that count as document activity.
    document_operations = [
        OperationType.CREATE_FILE,
        OperationType.SAVE_FILE,
        OperationType.DELETE_FILE,
        OperationType.RENAME_FILE,
        OperationType.MOVE_FILE,
    ]
    result = await db.execute(
        select(OperationLog)
        .where(
            and_(
                OperationLog.user_id == current_user.id,
                OperationLog.operation_type.in_(document_operations),
                OperationLog.created_at >= start_datetime,
                OperationLog.created_at <= end_datetime,
            )
        )
        .order_by(OperationLog.created_at.desc())
    )
    logs = result.scalars().all()

    # Display text for each operation type (user-facing, kept as-is).
    operation_map = {
        OperationType.CREATE_FILE: "创建",
        OperationType.SAVE_FILE: "保存",
        OperationType.DELETE_FILE: "删除",
        OperationType.RENAME_FILE: "重命名",
        OperationType.MOVE_FILE: "移动",
    }

    # Aggregate the logs by (project_id, file_path, operation_type).
    aggregated = {}
    for log in logs:
        detail = _parse_log_detail(log.detail)
        project_id = detail.get('project_id')
        file_path = detail.get('path') or detail.get('file_path') or detail.get('old_path')

        key = (project_id, file_path, log.operation_type)
        entry = aggregated.get(key)
        if entry is None:
            entry = aggregated[key] = {
                'project_id': project_id,
                'file_path': file_path,
                'operation_type': log.operation_type,
                'count': 0,
                'first_time': log.created_at,
                'last_time': log.created_at,
            }
        entry['count'] += 1
        # Track the earliest and latest occurrence within the group.
        if log.created_at < entry['first_time']:
            entry['first_time'] = log.created_at
        if log.created_at > entry['last_time']:
            entry['last_time'] = log.created_at

    # Several groups usually point at the same project; look each project up
    # once instead of issuing one query per group.
    project_cache = {}

    logs_data = []
    for agg in aggregated.values():
        project_id = agg['project_id']
        file_path = agg['file_path']
        operation_type = agg['operation_type']

        project_name = None
        project_storage_key = None
        if project_id:
            if project_id not in project_cache:
                project_result = await db.execute(
                    select(Project).where(Project.id == project_id)
                )
                project_cache[project_id] = project_result.scalar_one_or_none()
            project = project_cache[project_id]
            if project:
                project_name = project.name
                project_storage_key = project.storage_key

        # Existence is only checked for non-delete operations.
        # NOTE(review): file_path comes from the stored log detail and is
        # joined unchecked; an absolute or ".." path would probe outside the
        # project directory - confirm the log writer normalizes paths.
        file_exists = False
        if project_storage_key and file_path and operation_type != OperationType.DELETE_FILE:
            full_path = os.path.join(settings.PROJECTS_PATH, project_storage_key, file_path)
            file_exists = os.path.isfile(full_path)

        # Description: operation text, followed by the count when repeated.
        operation_text = operation_map.get(operation_type, operation_type)
        if agg['count'] > 1:
            description = f"{operation_text} {agg['count']}"
        else:
            description = operation_text

        first_time = agg['first_time'].isoformat() if agg['first_time'] else None
        last_time = agg['last_time'].isoformat() if agg['last_time'] else None
        logs_data.append({
            # Identifier derived from the aggregation key.
            "id": f"{project_id}_{file_path}_{operation_type}",
            "operation_type": description,
            "operation_count": agg['count'],
            "project_id": project_id,
            "project_name": project_name or "未知项目",
            "file_path": file_path or "未知文件",
            "file_exists": file_exists,
            "first_time": first_time,
            "last_time": last_time,
            # created_at mirrors the last operation time.
            "created_at": last_time,
        })

    # Newest group first, by last operation time.
    logs_data.sort(key=lambda item: item['last_time'] if item['last_time'] else '', reverse=True)
    return success_response(data={"logs": logs_data})