nex_docus/backend/app/api/v1/logs.py

165 lines
5.1 KiB
Python

"""
系统日志相关 API
"""
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_
from typing import Optional
from datetime import datetime
from app.core.database import get_db
from app.core.deps import get_current_user
from app.models.user import User
from app.models.log import OperationLog
from app.models.project import Project
from app.schemas.response import success_response
router = APIRouter()
@router.get("/", response_model=dict)
async def get_operation_logs(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
operation_type: Optional[str] = None,
resource_type: Optional[str] = None,
user_id: Optional[int] = None,
project_id: Optional[int] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""
获取操作日志列表
支持过滤:
- operation_type: 操作类型
- resource_type: 资源类型
- user_id: 用户ID
- project_id: 项目ID
- start_date: 开始日期 (YYYY-MM-DD)
- end_date: 结束日期 (YYYY-MM-DD)
"""
# 构建查询条件
conditions = []
if operation_type:
conditions.append(OperationLog.operation_type == operation_type)
if resource_type:
conditions.append(OperationLog.resource_type == resource_type)
if user_id:
conditions.append(OperationLog.user_id == user_id)
if project_id:
conditions.append(OperationLog.resource_id == project_id)
if start_date:
try:
start_datetime = datetime.strptime(start_date, "%Y-%m-%d")
conditions.append(OperationLog.created_at >= start_datetime)
except ValueError:
raise HTTPException(status_code=400, detail="开始日期格式错误")
if end_date:
try:
end_datetime = datetime.strptime(f"{end_date} 23:59:59", "%Y-%m-%d %H:%M:%S")
conditions.append(OperationLog.created_at <= end_datetime)
except ValueError:
raise HTTPException(status_code=400, detail="结束日期格式错误")
# 查询总数
count_query = select(func.count(OperationLog.id))
if conditions:
count_query = count_query.where(and_(*conditions))
total_result = await db.execute(count_query)
total = total_result.scalar()
# 查询日志列表
query = select(OperationLog).order_by(OperationLog.created_at.desc())
if conditions:
query = query.where(and_(*conditions))
# 分页
offset = (page - 1) * page_size
query = query.offset(offset).limit(page_size)
result = await db.execute(query)
logs = result.scalars().all()
# 转换为字典格式
logs_data = []
for log in logs:
log_dict = {
"id": log.id,
"user_id": log.user_id,
"username": log.username,
"operation_type": log.operation_type,
"resource_type": log.resource_type,
"resource_id": log.resource_id,
"detail": log.detail,
"ip_address": log.ip_address,
"user_agent": log.user_agent,
"status": log.status,
"error_message": log.error_message,
"created_at": log.created_at.isoformat() if log.created_at else None,
}
logs_data.append(log_dict)
return success_response(data={
"items": logs_data,
"total": total,
"page": page,
"page_size": page_size,
"total_pages": (total + page_size - 1) // page_size
})
@router.get("/stats", response_model=dict)
async def get_log_stats(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""获取日志统计信息"""
# 统计各类操作数量
operation_stats_query = select(
OperationLog.operation_type,
func.count(OperationLog.id).label('count')
).group_by(OperationLog.operation_type)
operation_result = await db.execute(operation_stats_query)
operation_stats = {row.operation_type: row.count for row in operation_result}
# 统计各资源类型数量
resource_stats_query = select(
OperationLog.resource_type,
func.count(OperationLog.id).label('count')
).group_by(OperationLog.resource_type)
resource_result = await db.execute(resource_stats_query)
resource_stats = {row.resource_type: row.count for row in resource_result}
# 统计今天的操作数量
today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
today_count_query = select(func.count(OperationLog.id)).where(
OperationLog.created_at >= today_start
)
today_result = await db.execute(today_count_query)
today_count = today_result.scalar()
# 总日志数量
total_query = select(func.count(OperationLog.id))
total_result = await db.execute(total_query)
total_count = total_result.scalar()
return success_response(data={
"operation_stats": operation_stats,
"resource_stats": resource_stats,
"today_count": today_count,
"total_count": total_count,
})