""" 通知管理 API (Redis版) """ from fastapi import APIRouter, Depends, HTTPException from typing import List, Union from datetime import datetime from app.core.database import get_db from app.core.deps import get_current_user from app.models.user import User from app.schemas.notification import ( NotificationResponse, NotificationCreate, ) from app.schemas.response import success_response from app.services.notification_service import notification_service router = APIRouter() @router.get("/", response_model=dict) async def get_notifications( page: int = 1, page_size: int = 20, unread_only: bool = False, current_user: User = Depends(get_current_user), ): """获取当前用户的通知列表 (从 Redis 读取)""" try: skip = (page - 1) * page_size notifications = await notification_service.get_user_notifications( user_id=current_user.id, limit=page_size, skip=skip, unread_only=unread_only ) # 获取总数 (ZCARD) from app.core.redis_client import get_redis redis = get_redis() total = 0 if redis: if not unread_only: key = notification_service._get_order_key(current_user.id) total = await redis.zcard(key) else: # 对于未读过滤,总数即为过滤后的列表长度 # 这里为了简单直接使用返回列表的长度(如果没满一页)或者重新计算 total = await notification_service.get_unread_count(current_user.id) return { "code": 200, "message": "success", "data": notifications, "total": total, "page": page, "page_size": page_size } except Exception as e: # 降级处理,防止 500 print(f"Error fetching notifications: {e}") return { "code": 200, "message": "success", "data": [], "total": 0, "page": page, "page_size": page_size } @router.get("/unread-count", response_model=dict) async def get_unread_count( current_user: User = Depends(get_current_user), ): """获取未读通知数量""" try: count = await notification_service.get_unread_count(current_user.id) return success_response(data={"unread_count": count}) except Exception as e: print(f"Error fetching unread count: {e}") return success_response(data={"unread_count": 0}) @router.put("/{notification_id}/read", response_model=dict) async def mark_as_read( notification_id: str, current_user: User = Depends(get_current_user), ): """标记单条通知为已读""" await notification_service.mark_read(current_user.id, notification_id) return success_response(message="已标记为已读") @router.put("/read-all", response_model=dict) async def mark_all_as_read( current_user: User = Depends(get_current_user), ): """标记所有通知为已读""" await notification_service.mark_all_read(current_user.id) return success_response(message="全部标记为已读") @router.post("/system", response_model=dict) async def send_system_notification( notification_in: NotificationCreate, current_user: User = Depends(get_current_user), ): """发送系统通知(仅限超级管理员)""" if not current_user.is_superuser: raise HTTPException(status_code=403, detail="只有管理员可以发送系统通知") # 這裡我们传 db=None 因为 service 已经不需要 db 写操作了 await notification_service.create_notification( db=None, user_id=notification_in.user_id, title=notification_in.title, content=notification_in.content, type=notification_in.type, category="system", link=notification_in.link ) return success_response(message="系统通知发送成功")