import { useCallback, useEffect, useRef } from 'react';
import axios from 'axios';
import { useAppStore } from '../store/appStore';
import { APP_ENDPOINTS } from '../config/env';
import type { BotState, ChatMessage } from '../types/bot';
import {
  normalizeAssistantMessageText,
  normalizeUserMessageText,
  summarizeProgressText,
} from '../shared/text/messageText';
import { pickLocale } from '../i18n';
import { botsSyncZhCn } from '../i18n/bots-sync.zh-cn';
import { botsSyncEn } from '../i18n/bots-sync.en';
import { buildMonitorWsUrl, notifyBotAuthInvalid } from '../utils/botAccess';

/** One row of the message-page endpoint; every field is validated before use. */
interface BotMessageRow {
  id?: unknown;
  role?: unknown;
  text?: unknown;
  media?: unknown;
  ts?: unknown;
  feedback?: unknown;
}

/** Response body of `GET /bots/:id/messages/page`. */
interface BotMessagesPageResponse {
  items?: BotMessageRow[];
}

/** Minimal shape read from a failed request to extract the HTTP status. */
interface RequestErrorShape {
  response?: {
    status?: number;
  };
}

/** Optional nested payload of a monitor WebSocket frame. */
interface MonitorEventPayload {
  state?: unknown;
  action_msg?: unknown;
  msg?: unknown;
  text?: unknown;
  content?: unknown;
  media?: unknown;
  message_id?: unknown;
  command?: unknown;
}

/** A monitor WebSocket frame; fields may live on the frame itself or in `payload`. */
interface MonitorWsMessage {
  type?: unknown;
  channel?: unknown;
  source?: unknown;
  payload?: MonitorEventPayload | null;
  state?: unknown;
  action_msg?: unknown;
  msg?: unknown;
  text?: unknown;
  media?: unknown;
  message_id?: unknown;
  content?: unknown;
  is_progress?: unknown;
  is_tool?: unknown;
}

/** Long and short renderings of one progress update. */
interface ProgressMessageText {
  detailText: string;
  summaryText: string;
}

/** Text and timestamp of the last message seen, used for de-duplication. */
interface LastSeenText {
  text: string;
  ts: number;
}

/** Maximum number of messages kept per bot after a sync. */
const MAX_SYNCED_MESSAGES = 300;
/** Interval between bot list polls. */
const BOT_POLL_INTERVAL_MS = 5000;
/** Interval between `ping` frames sent on an open monitor socket. */
const HEARTBEAT_INTERVAL_MS = 15000;
/** An identical progress text is suppressed unless more than this much time has passed. */
const PROGRESS_DEDUPE_WINDOW_MS = 1200;
/** An identical assistant text is suppressed within this window. */
const ASSISTANT_DEDUPE_WINDOW_MS = 5000;
/** An identical user text is suppressed within this window. */
const USER_ECHO_DEDUPE_WINDOW_MS = 10000;
/** WebSocket close code that triggers `notifyBotAuthInvalid` for the forced bot. */
const WS_CLOSE_AUTH_INVALID = 4401;

/** Maps a raw state string to one of the known event states; anything else becomes `INFO`. */
function normalizeState(v: string): 'THINKING' | 'TOOL_CALL' | 'SUCCESS' | 'ERROR' | 'INFO' {
  const s = (v || '').toUpperCase();
  if (s === 'THINKING' || s === 'TOOL_CALL' || s === 'SUCCESS' || s === 'ERROR') return s;
  return 'INFO';
}

/** State reported for a bus progress event. */
function normalizeBusState(isTool: boolean): 'THINKING' | 'TOOL_CALL' {
  return isTool ? 'TOOL_CALL' : 'THINKING';
}

/** Converts a raw media value into a list of non-empty trimmed strings (`[]` for non-arrays). */
function normalizeMedia(raw: unknown): string[] {
  if (!Array.isArray(raw)) return [];
  return raw.map((v) => String(v || '').trim()).filter((v) => v.length > 0);
}

/** Accepts only `up` / `down` (case-insensitive, trimmed); everything else is `null`. */
function normalizeFeedback(raw: unknown): 'up' | 'down' | null {
  const v = String(raw || '').trim().toLowerCase();
  if (v === 'up' || v === 'down') return v;
  return null;
}

/** Returns the truncated positive integer form of `raw`, or `undefined` when it is not one. */
function normalizeMessageId(raw: unknown): number | undefined {
  const n = Number(raw);
  if (!Number.isFinite(n)) return undefined;
  const i = Math.trunc(n);
  return i > 0 ? i : undefined;
}

/**
 * Parses a row timestamp. Falsy values fall back to the current time (as before);
 * values that do not parse to a finite number also fall back instead of producing
 * `NaN`, which would make the sort comparator in the merge step inconsistent.
 */
function normalizeTimestamp(raw: unknown): number {
  const parsed = Number(raw || Date.now());
  return Number.isFinite(parsed) ? parsed : Date.now();
}

/** Lower-cases and trims a channel name, folding the dashboard aliases into `dashboard`. */
function normalizeChannelName(raw: unknown): string {
  const channel = String(raw || '').trim().toLowerCase();
  if (channel === 'dashboard_channel' || channel === 'dashboard-channel') return 'dashboard';
  return channel;
}

/**
 * Heuristic: does a progress line merely repeat what the user just typed?
 *
 * Both texts are normalized, whitespace-collapsed and lower-cased. An exact match
 * always counts. Otherwise the user text must be at least 8 characters, must be
 * contained in the progress text, and the progress text must either carry a
 * "processing/received message"-style marker or be at most 40 characters longer.
 */
function isLikelyEchoOfUserInput(progressText: string, userText: string): boolean {
  const progress = normalizeAssistantMessageText(progressText).replace(/\s+/g, ' ').trim().toLowerCase();
  const user = normalizeUserMessageText(userText).replace(/\s+/g, ' ').trim().toLowerCase();
  if (!progress || !user) return false;
  if (progress === user) return true;
  if (user.length < 8) return false;
  const hasProcessingPrefix =
    /processing message|message from|received message|收到消息|处理消息|用户输入|command/i.test(progress);
  return progress.includes(user) && (hasProcessingPrefix || progress.length <= user.length + 40);
}

/**
 * Extracts a short `tool("target")` preview from text that looks like a serialized
 * tool call (contains both a `"name"` and an `"arguments"` key).
 *
 * The target is the first `"query"` value, else the first `"path"` value, cut to
 * 80 characters with an ellipsis. Returns the bare tool name when there is no
 * target, and `null` when the text does not look like a tool call.
 */
function extractToolCallProgressPreview(raw: string): string | null {
  const text = String(raw || '').replace(/<\/?tool_call>/gi, '').trim();
  if (!text) return null;
  const hasToolCallSignal = /"name"\s*:/.test(text) && /"arguments"\s*:/.test(text);
  if (!hasToolCallSignal) return null;
  const nameMatch = text.match(/"name"\s*:\s*"([^"]+)"/);
  const toolName = String(nameMatch?.[1] || '').trim();
  if (!toolName) return null;
  const queryMatch = text.match(/"query"\s*:\s*"([^"]+)"/);
  const pathMatch = text.match(/"path"\s*:\s*"([^"]+)"/);
  const target = String(queryMatch?.[1] || pathMatch?.[1] || '').trim();
  return target ? `${toolName}("${target.slice(0, 80)}${target.length > 80 ? '…' : ''}")` : toolName;
}

/**
 * Builds the detail (chat) and summary (status line) texts for a progress update.
 *
 * Non-tool updates use the normalized text (or a localized "Processing..." fallback)
 * as detail and its summary as summary. Tool updates are prefixed with a localized
 * "Tool Call" title; the detail additionally appends the full normalized text unless
 * that would only repeat the summary.
 */
function buildProgressMessageText(raw: string, isZh: boolean, isTool: boolean): ProgressMessageText {
  const normalized = normalizeAssistantMessageText(raw);
  const fallback = isZh ? '处理中...' : 'Processing...';
  if (!isTool) {
    const detailText = normalized || fallback;
    return {
      detailText,
      summaryText: summarizeProgressText(detailText, isZh),
    };
  }
  const title = isZh ? '工具调用' : 'Tool Call';
  const preview = extractToolCallProgressPreview(normalized);
  const fallbackSummary = normalized ? summarizeProgressText(normalized, isZh) : '';
  const summaryText = preview
    ? `${title} · ${preview}`
    : (fallbackSummary ? `${title} · ${fallbackSummary}` : title);
  if (!normalized || (!preview && fallbackSummary === normalized)) {
    return { detailText: summaryText, summaryText };
  }
  return {
    detailText: `${summaryText}\n\n${normalized}`,
    summaryText,
  };
}

/**
 * Keeps the app store in sync with the backend: polls the bot list, hydrates and
 * refreshes chat history, and (optionally) maintains one monitor WebSocket per
 * running bot whose frames are turned into state updates, events, logs and messages.
 *
 * @param forcedBotId When non-empty, only this bot is fetched; a 401 for it clears
 *   the bot list, and a 4401 socket close for it calls `notifyBotAuthInvalid`.
 * @param enableMonitorSockets When `false`, all monitor sockets are closed and none
 *   are opened.
 */
export function useBotsSync(forcedBotId?: string, enableMonitorSockets: boolean = true) {
  const { activeBots, setBots, updateBotState, addBotLog, addBotMessage, addBotEvent, setBotMessages } = useAppStore();
  // NOTE(review): the type arguments on these refs were missing in the reviewed
  // source and are restored from usage. `Record<string, BotState>` is inferred from
  // the otherwise-unused `BotState` import — confirm it matches the store's type.
  const socketsRef = useRef<Record<string, WebSocket>>({});
  const heartbeatsRef = useRef<Record<string, number>>({});
  const activeBotsRef = useRef<Record<string, BotState>>({});
  const lastUserEchoRef = useRef<Record<string, LastSeenText>>({});
  const lastAssistantRef = useRef<Record<string, LastSeenText>>({});
  const lastProgressRef = useRef<Record<string, LastSeenText>>({});
  const hydratedMessagesRef = useRef<Record<string, boolean>>({});
  const isZh = useAppStore((s) => s.locale === 'zh');
  const locale = useAppStore((s) => s.locale);
  const t = pickLocale(locale, { 'zh-cn': botsSyncZhCn, en: botsSyncEn });
  const forced = String(forcedBotId || '').trim();

  // Socket handlers are created once per connection and outlive locale changes, so
  // they read the current language/strings through this ref instead of capturing them.
  const localeRef = useRef({ isZh, t });
  useEffect(() => {
    localeRef.current = { isZh, t };
  }, [isZh, t]);

  useEffect(() => {
    activeBotsRef.current = activeBots;
  }, [activeBots]);

  // Fetches the latest message page for one bot and merges it into the store.
  const syncBotMessages = useCallback(
    async (botId: string) => {
      const target = String(botId || '').trim();
      if (!target) return;
      try {
        // The id is encoded like in the bot fetch below so it cannot alter the path.
        const res = await axios.get<BotMessagesPageResponse>(
          `${APP_ENDPOINTS.apiBase}/bots/${encodeURIComponent(target)}/messages/page`,
        );
        const rows = Array.isArray(res.data?.items) ? res.data.items : [];
        const latestPage: ChatMessage[] = rows
          .map((row) => {
            const roleRaw = String(row?.role || '').toLowerCase();
            const role: ChatMessage['role'] =
              roleRaw === 'user' || roleRaw === 'assistant' || roleRaw === 'system' ? roleRaw : 'assistant';
            return {
              id: normalizeMessageId(row?.id),
              role,
              text: String(row?.text || ''),
              attachments: normalizeMedia(row?.media),
              ts: normalizeTimestamp(row?.ts),
              feedback: normalizeFeedback(row?.feedback),
            };
          })
          .filter((msg) => msg.text.trim().length > 0 || (msg.attachments || []).length > 0)
          .slice(-MAX_SYNCED_MESSAGES);

        // Keep already lazy-loaded history; only merge/refresh the latest page.
        // Transient progress bubbles are dropped here.
        const existing = (activeBotsRef.current[target]?.messages || []).filter(
          (m) => (m.kind || 'final') !== 'progress',
        );
        const mergedMap = new Map<string, ChatMessage>();
        [...existing, ...latestPage].forEach((msg) => {
          // Messages without an id are keyed by role + timestamp + text.
          const key = msg.id ? `id:${msg.id}` : `k:${msg.role}:${msg.ts}:${msg.text}`;
          // First occurrence wins, so an existing entry is not replaced by the page copy.
          if (!mergedMap.has(key)) mergedMap.set(key, msg);
        });
        const messages = Array.from(mergedMap.values())
          .sort((a, b) => {
            if (a.ts !== b.ts) return a.ts - b.ts;
            return Number(a.id || 0) - Number(b.id || 0);
          })
          .slice(-MAX_SYNCED_MESSAGES);
        setBotMessages(target, messages);

        // Seed the de-duplication state used by the socket handlers.
        const newestFirst = [...messages].reverse();
        const lastUser = newestFirst.find((m) => m.role === 'user');
        if (lastUser) lastUserEchoRef.current[target] = { text: lastUser.text, ts: lastUser.ts };
        const lastAssistant = newestFirst.find((m) => m.role === 'assistant');
        if (lastAssistant) lastAssistantRef.current[target] = { text: lastAssistant.text, ts: lastAssistant.ts };
      } catch (error) {
        console.error(`Failed to sync bot messages for ${target}`, error);
      }
    },
    [setBotMessages],
  );

  // Poll the bot list (or the single forced bot).
  useEffect(() => {
    const fetchBots = async () => {
      try {
        if (forced) {
          const res = await axios.get(`${APP_ENDPOINTS.apiBase}/bots/${encodeURIComponent(forced)}`);
          setBots(res.data ? [res.data] : []);
          return;
        }
        const res = await axios.get(`${APP_ENDPOINTS.apiBase}/bots`);
        setBots(res.data);
      } catch (error: unknown) {
        const resolvedError = (error && typeof error === 'object' ? error : {}) as RequestErrorShape;
        const status = Number(resolvedError.response?.status || 0);
        if (forced && status === 401) {
          // A 401 for the forced bot clears the list instead of logging an error.
          setBots([]);
          return;
        }
        console.error(forced ? `Failed to fetch bot ${forced}` : 'Failed to fetch bots', error);
      }
    };
    void fetchBots();
    const interval = window.setInterval(() => {
      void fetchBots();
    }, BOT_POLL_INTERVAL_MS);
    return () => {
      window.clearInterval(interval);
    };
  }, [forced, setBots]);

  // Hydrate chat history once per bot; forget bots that disappeared so they
  // are hydrated again if they come back.
  useEffect(() => {
    const botIds = Object.keys(activeBots);
    const aliveIds = new Set(botIds);
    Object.keys(hydratedMessagesRef.current).forEach((botId) => {
      if (!aliveIds.has(botId)) {
        delete hydratedMessagesRef.current[botId];
      }
    });
    botIds.forEach((botId) => {
      if (hydratedMessagesRef.current[botId]) return;
      hydratedMessagesRef.current[botId] = true;
      void syncBotMessages(botId);
    });
  }, [activeBots, syncBotMessages]);

  // Refresh every bot's messages when the page regains focus or becomes visible.
  useEffect(() => {
    const syncVisibleBots = () => {
      if (typeof document !== 'undefined' && document.visibilityState !== 'visible') return;
      Object.keys(activeBotsRef.current || {}).forEach((botId) => {
        void syncBotMessages(botId);
      });
    };
    window.addEventListener('focus', syncVisibleBots);
    window.addEventListener('pageshow', syncVisibleBots);
    document.addEventListener('visibilitychange', syncVisibleBots);
    return () => {
      window.removeEventListener('focus', syncVisibleBots);
      window.removeEventListener('pageshow', syncVisibleBots);
      document.removeEventListener('visibilitychange', syncVisibleBots);
    };
  }, [syncBotMessages]);

  // Maintain one monitor socket per RUNNING bot.
  useEffect(() => {
    if (!enableMonitorSockets) {
      Object.values(socketsRef.current).forEach((ws) => ws.close());
      Object.values(heartbeatsRef.current).forEach((timerId) => window.clearInterval(timerId));
      heartbeatsRef.current = {};
      socketsRef.current = {};
      return;
    }

    const runningIds = new Set(
      Object.values(activeBots)
        .filter((bot) => bot.docker_status === 'RUNNING')
        .map((bot) => bot.id),
    );

    // Drop sockets of bots that are no longer running.
    Object.keys(socketsRef.current).forEach((botId) => {
      if (!runningIds.has(botId)) {
        socketsRef.current[botId]?.close();
        delete socketsRef.current[botId];
      }
    });

    Object.values(activeBots).forEach((bot) => {
      if (bot.docker_status !== 'RUNNING') return;
      if (socketsRef.current[bot.id]) return;

      const botId = bot.id;
      const ws = new WebSocket(buildMonitorWsUrl(APP_ENDPOINTS.wsBase, botId));
      // Owned by this socket, so a late close event from an old socket can never
      // clear the heartbeat of a newer socket opened for the same bot.
      let heartbeatId: number | undefined;

      // Appends a progress bubble unless the same text was just shown.
      const appendProgressMessage = (chatText: string) => {
        const now = Date.now();
        const prev = lastProgressRef.current[botId];
        if (!prev || prev.text !== chatText || now - prev.ts > PROGRESS_DEDUPE_WINDOW_MS) {
          addBotMessage(botId, {
            role: 'assistant',
            text: chatText,
            ts: now,
            kind: 'progress',
          });
          lastProgressRef.current[botId] = { text: chatText, ts: now };
        }
      };

      ws.onopen = () => {
        heartbeatId = window.setInterval(() => {
          if (ws.readyState === WebSocket.OPEN) {
            ws.send('ping');
          }
        }, HEARTBEAT_INTERVAL_MS);
        heartbeatsRef.current[botId] = heartbeatId;
        // Backfill messages after (re)connect to avoid missing outputs while tab was backgrounded.
        void syncBotMessages(botId);
      };

      ws.onmessage = (event) => {
        let data: MonitorWsMessage | null;
        try {
          data = JSON.parse(event.data) as MonitorWsMessage | null;
        } catch {
          // Non-JSON frames are ignored.
          return;
        }
        // `JSON.parse` can yield `null` or a primitive; only object frames are handled.
        if (!data || typeof data !== 'object') return;

        const { isZh: zh, t: texts } = localeRef.current;
        const sourceChannel = normalizeChannelName(data.channel || data.source);
        const isDashboardChannel = sourceChannel === 'dashboard';
        const payload: MonitorEventPayload = data.payload && typeof data.payload === 'object' ? data.payload : {};

        if (data.type === 'AGENT_STATE') {
          const state = String(payload.state || data.state || 'INFO');
          const messageRaw = String(payload.action_msg || payload.msg || data.action_msg || data.msg || '');
          const normalizedState = normalizeState(state);
          const fullMessage = normalizeAssistantMessageText(messageRaw);
          const toolProgress =
            normalizedState === 'TOOL_CALL' ? buildProgressMessageText(fullMessage || messageRaw, zh, true) : null;
          const message =
            toolProgress?.summaryText || fullMessage || summarizeProgressText(messageRaw, zh) || texts.stateUpdated;
          updateBotState(botId, state, message);
          addBotEvent(botId, {
            state: normalizedState,
            text: message || texts.stateUpdated,
            ts: Date.now(),
            channel: sourceChannel || undefined,
          });
          // Only dashboard tool calls are mirrored into the chat as progress bubbles.
          if (isDashboardChannel && fullMessage && normalizedState === 'TOOL_CALL') {
            appendProgressMessage(toolProgress?.detailText || fullMessage);
          }
          return;
        }

        if (data.type === 'ASSISTANT_MESSAGE') {
          if (!isDashboardChannel) return;
          const text = normalizeAssistantMessageText(String(data.text || payload.text || payload.content || ''));
          const attachments = normalizeMedia(data.media || payload.media);
          const messageId = normalizeMessageId(data.message_id || payload.message_id);
          if (!text && attachments.length === 0) return;
          const now = Date.now();
          const prev = lastAssistantRef.current[botId];
          // Skip a text-only repeat of the last assistant message.
          if (prev && prev.text === text && now - prev.ts < ASSISTANT_DEDUPE_WINDOW_MS && attachments.length === 0) {
            return;
          }
          lastAssistantRef.current[botId] = { text, ts: now };
          addBotMessage(botId, {
            id: messageId,
            role: 'assistant',
            text,
            attachments,
            ts: now,
            kind: 'final',
            feedback: null,
          });
          updateBotState(botId, 'IDLE', '');
          addBotEvent(botId, {
            state: 'SUCCESS',
            text: texts.replied,
            ts: Date.now(),
            channel: sourceChannel || undefined,
          });
          return;
        }

        if (data.type === 'BUS_EVENT') {
          const content = normalizeAssistantMessageText(String(data.content || payload.content || ''));
          const isProgress = Boolean(data.is_progress);
          // Content that looks like a serialized tool call counts as a tool event
          // even when the frame does not flag it.
          const isTool = Boolean(data.is_tool) || Boolean(extractToolCallProgressPreview(content));

          if (isProgress) {
            const state = normalizeBusState(isTool);
            const progressMessage = buildProgressMessageText(content, zh, isTool);
            updateBotState(botId, state, progressMessage.summaryText || texts.progress);
            addBotEvent(botId, {
              state,
              text: progressMessage.summaryText || texts.progress,
              ts: Date.now(),
              channel: sourceChannel || undefined,
            });
            if (isDashboardChannel) {
              const lastUserText = lastUserEchoRef.current[botId]?.text || '';
              // Do not show progress lines that merely repeat the user's input.
              if (!isTool && isLikelyEchoOfUserInput(progressMessage.detailText, lastUserText)) {
                return;
              }
              appendProgressMessage(progressMessage.detailText);
            }
            return;
          }

          if (!isDashboardChannel) return;
          if (content) {
            const messageId = normalizeMessageId(data.message_id || payload.message_id);
            const now = Date.now();
            const prev = lastAssistantRef.current[botId];
            if (!prev || prev.text !== content || now - prev.ts >= ASSISTANT_DEDUPE_WINDOW_MS) {
              addBotMessage(botId, {
                id: messageId,
                role: 'assistant',
                text: content,
                ts: now,
                kind: 'final',
                feedback: null,
              });
              lastAssistantRef.current[botId] = { text: content, ts: now };
            }
            updateBotState(botId, 'IDLE', summarizeProgressText(content, zh));
            addBotEvent(botId, {
              state: 'SUCCESS',
              text: texts.replied,
              ts: Date.now(),
              channel: sourceChannel || undefined,
            });
          }
          return;
        }

        if (data.type === 'USER_COMMAND') {
          if (!isDashboardChannel) return;
          const rawText = String(data.text || payload.text || payload.command || '');
          const text = normalizeUserMessageText(rawText);
          const attachments = normalizeMedia(data.media || payload.media);
          const messageId = normalizeMessageId(data.message_id || payload.message_id);
          if (!text && attachments.length === 0) return;
          const now = Date.now();
          const prev = lastUserEchoRef.current[botId];
          // Skip a text-only repeat of the last user message.
          if (prev && prev.text === text && now - prev.ts < USER_ECHO_DEDUPE_WINDOW_MS && attachments.length === 0) {
            return;
          }
          lastUserEchoRef.current[botId] = { text, ts: now };
          // The stored message keeps the raw text; only the comparison uses the normalized form.
          addBotMessage(botId, { id: messageId, role: 'user', text: rawText, attachments, ts: now, kind: 'final' });
          return;
        }

        if (data.type === 'RAW_LOG') {
          addBotLog(botId, String(data.text || ''));
        }
      };

      ws.onclose = (event) => {
        if (heartbeatId !== undefined) {
          window.clearInterval(heartbeatId);
          if (heartbeatsRef.current[botId] === heartbeatId) {
            delete heartbeatsRef.current[botId];
          }
          heartbeatId = undefined;
        }
        // Only unregister if the registry still points at this socket; a newer
        // socket for the same bot may already have replaced it.
        if (socketsRef.current[botId] === ws) {
          delete socketsRef.current[botId];
        }
        if (event.code === WS_CLOSE_AUTH_INVALID && forced === botId) {
          notifyBotAuthInvalid(botId);
        }
      };

      socketsRef.current[botId] = ws;
    });
    // No cleanup here: sockets must survive re-runs of this effect and are
    // closed by the unmount effect below.
  }, [activeBots, addBotEvent, addBotLog, addBotMessage, enableMonitorSockets, forced, syncBotMessages, updateBotState]);

  // Close every socket and stop every heartbeat on unmount.
  useEffect(() => {
    return () => {
      Object.values(socketsRef.current).forEach((ws) => ws.close());
      Object.values(heartbeatsRef.current).forEach((timerId) => window.clearInterval(timerId));
      heartbeatsRef.current = {};
      socketsRef.current = {};
    };
  }, []);
}