feat: Implement Danmaku (bullet chat) feature with Redis storage and configurable TTL
parent
8585f89c53
commit
cb1f03794e
|
|
@ -0,0 +1,47 @@
|
|||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from pydantic import BaseModel, constr
|
||||
from typing import List
|
||||
|
||||
from app.database import get_db
|
||||
from app.models.db import User
|
||||
from app.services.auth_deps import get_current_user
|
||||
from app.services.danmaku_service import danmaku_service
|
||||
|
||||
router = APIRouter(prefix="/danmaku", tags=["danmaku"])
|
||||
|
||||
class DanmakuCreate(BaseModel):
|
||||
text: constr(max_length=20, min_length=1)
|
||||
|
||||
class DanmakuResponse(BaseModel):
|
||||
id: str
|
||||
uid: str
|
||||
username: str
|
||||
text: str
|
||||
ts: float
|
||||
|
||||
@router.post("/send", response_model=DanmakuResponse)
|
||||
async def send_danmaku(
|
||||
data: DanmakuCreate,
|
||||
current_user: User = Depends(get_current_user),
|
||||
db: AsyncSession = Depends(get_db)
|
||||
):
|
||||
"""Send a short danmaku message (max 20 chars)"""
|
||||
try:
|
||||
result = await danmaku_service.add_danmaku(
|
||||
user_id=current_user.id,
|
||||
username=current_user.username,
|
||||
text=data.text,
|
||||
db=db
|
||||
)
|
||||
return result
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@router.get("/list", response_model=List[DanmakuResponse])
|
||||
async def get_danmaku_list(
|
||||
db: AsyncSession = Depends(get_db)
|
||||
):
|
||||
"""Get all active danmaku messages"""
|
||||
# This endpoint is public (or could be protected if needed)
|
||||
return await danmaku_service.get_active_danmaku(db)
|
||||
|
|
@ -21,6 +21,7 @@ from app.api.routes import router as celestial_router
|
|||
from app.api.auth import router as auth_router
|
||||
from app.api.user import router as user_router
|
||||
from app.api.system import router as system_router
|
||||
from app.api.danmaku import router as danmaku_router
|
||||
from app.services.redis_cache import redis_cache
|
||||
from app.services.cache_preheat import preheat_all_caches
|
||||
from app.database import close_db
|
||||
|
|
@ -104,6 +105,7 @@ app.include_router(celestial_router, prefix=settings.api_prefix)
|
|||
app.include_router(auth_router, prefix=settings.api_prefix)
|
||||
app.include_router(user_router, prefix=settings.api_prefix)
|
||||
app.include_router(system_router, prefix=settings.api_prefix)
|
||||
app.include_router(danmaku_router, prefix=settings.api_prefix)
|
||||
|
||||
# Mount static files for uploaded resources
|
||||
upload_dir = Path(__file__).parent.parent / "upload"
|
||||
|
|
|
|||
|
|
@ -0,0 +1,98 @@
|
|||
import json
|
||||
import time
|
||||
import logging
|
||||
from typing import List, Dict
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.services.redis_cache import redis_cache
|
||||
from app.services.system_settings_service import system_settings_service
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class DanmakuService:
|
||||
def __init__(self):
|
||||
self.redis_key = "cosmo:danmaku:stream"
|
||||
self.default_ttl = 86400 # 24 hours fallback
|
||||
|
||||
async def get_ttl(self, db: AsyncSession) -> int:
|
||||
"""Fetch TTL from system settings or use default"""
|
||||
try:
|
||||
setting = await system_settings_service.get_setting_by_key("danmaku_ttl", db)
|
||||
if setting:
|
||||
return int(setting.value)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to fetch danmaku_ttl: {e}")
|
||||
return self.default_ttl
|
||||
|
||||
async def add_danmaku(self, user_id: int, username: str, text: str, db: AsyncSession) -> Dict:
|
||||
"""Add a new danmaku message"""
|
||||
# Validate length (double check server side)
|
||||
if len(text) > 20:
|
||||
text = text[:20]
|
||||
|
||||
now = time.time()
|
||||
ttl = await self.get_ttl(db)
|
||||
expire_time = now - ttl
|
||||
|
||||
# Create message object
|
||||
# Add unique timestamp/random to value to ensure uniqueness in Set if user spams same msg?
|
||||
# Actually ZSET handles unique values. If same user sends "Hi" twice, it updates score.
|
||||
# To allow same msg multiple times, we can append a unique ID or timestamp to the JSON.
|
||||
message = {
|
||||
"uid": str(user_id),
|
||||
"username": username,
|
||||
"text": text,
|
||||
"ts": now,
|
||||
"id": f"{user_id}_{now}" # Unique ID for React keys
|
||||
}
|
||||
|
||||
serialized = json.dumps(message)
|
||||
|
||||
# 1. Remove expired messages first
|
||||
# ZREMRANGEBYSCORE key -inf (now - ttl)
|
||||
if redis_cache.client:
|
||||
try:
|
||||
# Clean up old
|
||||
await redis_cache.client.zremrangebyscore(self.redis_key, 0, expire_time)
|
||||
|
||||
# Add new
|
||||
await redis_cache.client.zadd(self.redis_key, {serialized: now})
|
||||
|
||||
# Optional: Set key expiry to max TTL just in case (but ZADD keeps it alive)
|
||||
await redis_cache.client.expire(self.redis_key, ttl)
|
||||
|
||||
logger.info(f"Danmaku added by {username}: {text}")
|
||||
return message
|
||||
except Exception as e:
|
||||
logger.error(f"Redis error adding danmaku: {e}")
|
||||
raise e
|
||||
else:
|
||||
logger.warning("Redis not connected, danmaku lost")
|
||||
return message
|
||||
|
||||
async def get_active_danmaku(self, db: AsyncSession) -> List[Dict]:
|
||||
"""Get all active danmaku messages"""
|
||||
now = time.time()
|
||||
ttl = await self.get_ttl(db)
|
||||
min_score = now - ttl
|
||||
|
||||
if redis_cache.client:
|
||||
try:
|
||||
# Get messages from (now - ttl) to +inf
|
||||
# ZRANGEBYSCORE key min max
|
||||
results = await redis_cache.client.zrangebyscore(self.redis_key, min_score, "+inf")
|
||||
|
||||
messages = []
|
||||
for res in results:
|
||||
try:
|
||||
messages.append(json.loads(res))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
return messages
|
||||
except Exception as e:
|
||||
logger.error(f"Redis error getting danmaku: {e}")
|
||||
return []
|
||||
return []
|
||||
|
||||
danmaku_service = DanmakuService()
|
||||
|
|
@ -0,0 +1,4 @@
|
|||
-- Add danmaku_ttl setting (default 24 hours = 86400 seconds)
|
||||
INSERT INTO system_settings (key, value, value_type, category, label, description, is_public)
|
||||
SELECT 'danmaku_ttl', '86400', 'int', 'platform', '弹幕保留时间', '用户发送的弹幕在系统中保留的时间(秒)', true
|
||||
WHERE NOT EXISTS (SELECT 1 FROM system_settings WHERE key = 'danmaku_ttl');
|
||||
|
|
@ -1,10 +1,6 @@
|
|||
/**
|
||||
* Cosmo - Deep Space Explorer
|
||||
* Main application component
|
||||
*/
|
||||
import { useState, useCallback, useEffect } from 'react';
|
||||
import { useNavigate } from 'react-router-dom';
|
||||
import { message } from 'antd';
|
||||
import { message, Modal, Input } from 'antd';
|
||||
import { useSpaceData } from './hooks/useSpaceData';
|
||||
import { useHistoricalData } from './hooks/useHistoricalData';
|
||||
import { useTrajectory } from './hooks/useTrajectory';
|
||||
|
|
@ -17,7 +13,9 @@ import { Loading } from './components/Loading';
|
|||
import { InterstellarTicker } from './components/InterstellarTicker';
|
||||
import { ControlPanel } from './components/ControlPanel';
|
||||
import { AuthModal } from './components/AuthModal';
|
||||
import { DanmakuLayer } from './components/DanmakuLayer';
|
||||
import { auth } from './utils/auth';
|
||||
import { request } from './utils/request';
|
||||
import type { CelestialBody } from './types';
|
||||
|
||||
// Timeline configuration - will be fetched from backend later
|
||||
|
|
@ -67,6 +65,10 @@ function App() {
|
|||
const [user, setUser] = useState<any>(auth.getUser());
|
||||
const [showAuthModal, setShowAuthModal] = useState(false);
|
||||
|
||||
// Danmaku state
|
||||
const [isDanmakuInputVisible, setIsDanmakuInputVisible] = useState(false);
|
||||
const [danmakuText, setDanmakuText] = useState('');
|
||||
|
||||
// Use real-time data or historical data based on mode
|
||||
const { bodies: realTimeBodies, loading: realTimeLoading, error: realTimeError } = useSpaceData();
|
||||
const { bodies: historicalBodies, loading: historicalLoading, error: historicalError } = useHistoricalData(selectedDate);
|
||||
|
|
@ -116,6 +118,29 @@ function App() {
|
|||
takeScreenshot(nickname);
|
||||
}, [user, takeScreenshot]);
|
||||
|
||||
// Danmaku send handler
|
||||
const handleSendDanmaku = async () => {
|
||||
if (!danmakuText.trim()) return;
|
||||
if (danmakuText.length > 20) {
|
||||
message.warning("弹幕内容不能超过20字");
|
||||
return;
|
||||
}
|
||||
if (!user) {
|
||||
message.warning("请先登录");
|
||||
setShowAuthModal(true);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await request.post('/danmaku/send', { text: danmakuText });
|
||||
message.success("发送成功");
|
||||
setDanmakuText('');
|
||||
setIsDanmakuInputVisible(false);
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
message.error("发送失败");
|
||||
}
|
||||
};
|
||||
|
||||
// Auth handlers
|
||||
const handleLoginSuccess = (userData: any) => {
|
||||
setUser(userData);
|
||||
|
|
@ -159,6 +184,9 @@ function App() {
|
|||
onNavigateToAdmin={() => navigate('/admin')}
|
||||
/>
|
||||
|
||||
{/* Danmaku Layer */}
|
||||
<DanmakuLayer enabled={showDanmaku} />
|
||||
|
||||
{/* Right Control Panel */}
|
||||
<ControlPanel
|
||||
isTimelineMode={isTimelineMode}
|
||||
|
|
@ -169,6 +197,14 @@ function App() {
|
|||
onToggleSound={() => setIsSoundOn(!isSoundOn)}
|
||||
showDanmaku={showDanmaku}
|
||||
onToggleDanmaku={() => setShowDanmaku(!showDanmaku)}
|
||||
onOpenDanmakuInput={() => {
|
||||
if (!user) {
|
||||
message.warning("请先登录");
|
||||
setShowAuthModal(true);
|
||||
} else {
|
||||
setIsDanmakuInputVisible(true);
|
||||
}
|
||||
}}
|
||||
onScreenshot={handleScreenshot}
|
||||
/>
|
||||
|
||||
|
|
@ -179,6 +215,27 @@ function App() {
|
|||
onLoginSuccess={handleLoginSuccess}
|
||||
/>
|
||||
|
||||
{/* Danmaku Input Modal */}
|
||||
<Modal
|
||||
title="发送星际弹幕"
|
||||
open={isDanmakuInputVisible}
|
||||
onOk={handleSendDanmaku}
|
||||
onCancel={() => setIsDanmakuInputVisible(false)}
|
||||
okText="发射"
|
||||
cancelText="取消"
|
||||
centered
|
||||
>
|
||||
<Input
|
||||
placeholder="输入想说的话 (20字以内)"
|
||||
maxLength={20}
|
||||
showCount
|
||||
value={danmakuText}
|
||||
onChange={e => setDanmakuText(e.target.value)}
|
||||
onPressEnter={handleSendDanmaku}
|
||||
autoFocus
|
||||
/>
|
||||
</Modal>
|
||||
|
||||
{/* Probe List Sidebar */}
|
||||
<ProbeList
|
||||
probes={probes}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,8 @@ import {
|
|||
MessageSquare,
|
||||
Eye,
|
||||
EyeOff,
|
||||
Camera
|
||||
Camera,
|
||||
Send
|
||||
} from 'lucide-react';
|
||||
|
||||
interface ControlPanelProps {
|
||||
|
|
@ -18,6 +19,7 @@ interface ControlPanelProps {
|
|||
onToggleSound: () => void;
|
||||
showDanmaku: boolean;
|
||||
onToggleDanmaku: () => void;
|
||||
onOpenDanmakuInput: () => void;
|
||||
onScreenshot: () => void;
|
||||
}
|
||||
|
||||
|
|
@ -30,6 +32,7 @@ export function ControlPanel({
|
|||
onToggleSound,
|
||||
showDanmaku,
|
||||
onToggleDanmaku,
|
||||
onOpenDanmakuInput,
|
||||
onScreenshot,
|
||||
}: ControlPanelProps) {
|
||||
const buttonClass = (isActive: boolean) => `
|
||||
|
|
@ -77,16 +80,29 @@ export function ControlPanel({
|
|||
</div>
|
||||
</button>
|
||||
|
||||
{/* Danmaku Toggle (Mock) */}
|
||||
<button
|
||||
onClick={onToggleDanmaku}
|
||||
className={buttonClass(showDanmaku)}
|
||||
>
|
||||
<MessageSquare size={20} />
|
||||
<div className={tooltipClass}>
|
||||
{showDanmaku ? '关闭弹幕' : '开启弹幕'}
|
||||
</div>
|
||||
</button>
|
||||
{/* Danmaku Toggle */}
|
||||
<div className="relative flex items-center">
|
||||
{showDanmaku && (
|
||||
<button
|
||||
onClick={onOpenDanmakuInput}
|
||||
className="mr-2 p-2 rounded-lg bg-white/10 text-gray-300 hover:bg-white/20 border border-white/5 transition-all duration-200 relative group"
|
||||
>
|
||||
<Send size={16} />
|
||||
<div className={tooltipClass}>
|
||||
发送弹幕
|
||||
</div>
|
||||
</button>
|
||||
)}
|
||||
<button
|
||||
onClick={onToggleDanmaku}
|
||||
className={buttonClass(showDanmaku)}
|
||||
>
|
||||
<MessageSquare size={20} />
|
||||
<div className={tooltipClass}>
|
||||
{showDanmaku ? '关闭弹幕' : '开启弹幕'}
|
||||
</div>
|
||||
</button>
|
||||
</div>
|
||||
|
||||
{/* Screenshot Button */}
|
||||
<button
|
||||
|
|
|
|||
|
|
@ -0,0 +1,113 @@
|
|||
import { useEffect, useState, useRef } from 'react';
|
||||
import { request } from '../utils/request';
|
||||
|
||||
interface DanmakuMessage {
|
||||
id: string;
|
||||
uid: string;
|
||||
username: string;
|
||||
text: string;
|
||||
ts: number;
|
||||
// Runtime properties for animation
|
||||
top?: number;
|
||||
duration?: number;
|
||||
startTime?: number;
|
||||
}
|
||||
|
||||
interface DanmakuLayerProps {
|
||||
enabled: boolean;
|
||||
}
|
||||
|
||||
export function DanmakuLayer({ enabled }: DanmakuLayerProps) {
|
||||
const [visibleMessages, setVisibleMessages] = useState<DanmakuMessage[]>([]);
|
||||
const processedIds = useRef<Set<string>>(new Set());
|
||||
const containerRef = useRef<HTMLDivElement>(null);
|
||||
|
||||
// Polling for new messages
|
||||
useEffect(() => {
|
||||
if (!enabled) return;
|
||||
|
||||
const fetchDanmaku = async () => {
|
||||
try {
|
||||
const { data } = await request.get('/danmaku/list');
|
||||
if (Array.isArray(data)) {
|
||||
// Filter out messages we've already seen or processed locally
|
||||
// Actually, for a "live" feel, we only want *recent* messages or messages we haven't shown in this session.
|
||||
// But since the backend returns a 24h window, we don't want to replay all 24h history at once on load.
|
||||
// Strategy: On first load, maybe only show last 20? Or just start listening for new ones?
|
||||
// Let's show recent ones (last 1 min) on load, then polling.
|
||||
|
||||
const now = Date.now() / 1000;
|
||||
const newMessages = data.filter((msg: DanmakuMessage) => {
|
||||
if (processedIds.current.has(msg.id)) return false;
|
||||
// Only show messages from the last 5 minutes to avoid flooding history on reload
|
||||
if (now - msg.ts > 300) return false;
|
||||
return true;
|
||||
});
|
||||
|
||||
if (newMessages.length > 0) {
|
||||
newMessages.forEach((msg: DanmakuMessage) => processedIds.current.add(msg.id));
|
||||
// Add to queue
|
||||
addMessagesToTrack(newMessages);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error("Failed to fetch danmaku", err);
|
||||
}
|
||||
};
|
||||
|
||||
fetchDanmaku(); // Initial fetch
|
||||
const interval = setInterval(fetchDanmaku, 3000); // Poll every 3s
|
||||
|
||||
return () => clearInterval(interval);
|
||||
}, [enabled]);
|
||||
|
||||
const addMessagesToTrack = (newMsgs: DanmakuMessage[]) => {
|
||||
// Assign random vertical position and duration
|
||||
const tracks = newMsgs.map(msg => ({
|
||||
...msg,
|
||||
top: Math.floor(Math.random() * 60) + 10, // 10% to 70% height
|
||||
duration: Math.floor(Math.random() * 5) + 8, // 8-13 seconds duration
|
||||
startTime: Date.now()
|
||||
}));
|
||||
|
||||
setVisibleMessages(prev => [...prev, ...tracks]);
|
||||
};
|
||||
|
||||
// Cleanup finished animations
|
||||
const handleAnimationEnd = (id: string) => {
|
||||
setVisibleMessages(prev => prev.filter(m => m.id !== id));
|
||||
};
|
||||
|
||||
if (!enabled) return null;
|
||||
|
||||
return (
|
||||
<div
|
||||
ref={containerRef}
|
||||
className="absolute inset-0 pointer-events-none z-30 overflow-hidden"
|
||||
style={{ userSelect: 'none' }}
|
||||
>
|
||||
{visibleMessages.map(msg => (
|
||||
<div
|
||||
key={msg.id}
|
||||
className="absolute whitespace-nowrap text-white font-bold text-shadow-md will-change-transform"
|
||||
style={{
|
||||
top: `${msg.top}%`,
|
||||
left: '100%',
|
||||
fontSize: '1.2rem',
|
||||
textShadow: '0 0 4px rgba(0,0,0,0.8)',
|
||||
animation: `danmaku-move ${msg.duration}s linear forwards`
|
||||
}}
|
||||
onAnimationEnd={() => handleAnimationEnd(msg.id)}
|
||||
>
|
||||
{msg.text}
|
||||
</div>
|
||||
))}
|
||||
<style>{`
|
||||
@keyframes danmaku-move {
|
||||
0% { transform: translateX(0); }
|
||||
100% { transform: translateX(-100vw - 100%); }
|
||||
}
|
||||
`}</style>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
Loading…
Reference in New Issue