diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java index b6652f6..fdcf686 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java @@ -24,6 +24,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import java.net.URI; import java.net.URLEncoder; @@ -60,30 +61,27 @@ public class AiTaskServiceImpl extends ServiceImpl impleme @Override @Async public void dispatchTasks(Long meetingId) { - // 尝试获取轮询锁,防止并发执行 String lockKey = RedisKeys.meetingPollingLockKey(meetingId); Boolean acquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", 30, TimeUnit.MINUTES); if (Boolean.FALSE.equals(acquired)) { - log.warn("Meeting {} task is already being processed by another thread", meetingId); + log.warn("Meeting {} is already being processed", meetingId); return; } try { - log.info("Starting real AI processing for meeting ID: {}", meetingId); Meeting meeting = meetingMapper.selectById(meetingId); if (meeting == null) return; - // 1. 执行 ASR 识别 + // 1. 执行 ASR 识别 (含接管逻辑) String asrText = processAsrTask(meeting); // 2. 执行 LLM 总结 processSummaryTask(meeting, asrText); - // 完成后清除进度 redisTemplate.delete(RedisKeys.meetingProgressKey(meetingId)); } catch (Exception e) { log.error("Meeting {} AI Task Flow failed", meetingId, e); - updateMeetingStatus(meetingId, 4); // Overall Failed + updateMeetingStatus(meetingId, 4); updateProgress(meetingId, -1, "分析失败: " + e.getMessage(), 0); } finally { redisTemplate.delete(lockKey); @@ -95,13 +93,15 @@ public class AiTaskServiceImpl extends ServiceImpl impleme public void dispatchSummaryTask(Long meetingId) { Meeting meeting = meetingMapper.selectById(meetingId); if (meeting == null) return; - try { - // 获取已有转录全文 List transcripts = transcriptMapper.selectList(new LambdaQueryWrapper() .eq(MeetingTranscript::getMeetingId, meetingId) .orderByAsc(MeetingTranscript::getStartTime)); + if (transcripts.isEmpty()) { + throw new RuntimeException("没有找到可用的转录文本,无法生成总结"); + } + String asrText = transcripts.stream() .map(t -> (t.getSpeakerName() != null ? t.getSpeakerName() : t.getSpeakerId()) + ": " + t.getContent()) .collect(Collectors.joining("\n")); @@ -110,64 +110,73 @@ public class AiTaskServiceImpl extends ServiceImpl impleme } catch (Exception e) { log.error("Re-summary failed for meeting {}", meetingId, e); updateMeetingStatus(meetingId, 4); + updateProgress(meetingId, -1, "总结失败: " + e.getMessage(), 0); } } private String processAsrTask(Meeting meeting) throws Exception { - updateMeetingStatus(meeting.getId(), 1); // 识别中 - updateProgress(meeting.getId(), 5, "已提交识别请求...", 0); + updateMeetingStatus(meeting.getId(), 1); AiModel asrModel = aiModelService.getById(meeting.getAsrModelId()); - if (asrModel == null) throw new RuntimeException("ASR Model config not found"); + if (asrModel == null) throw new RuntimeException("ASR模型配置不存在"); - // 构建请求参数并转码 - Map req = new HashMap<>(); - String rawAudioUrl = meeting.getAudioUrl(); - String encodedAudioUrl = Arrays.stream(rawAudioUrl.split("/")) - .map(part -> { - try { - return URLEncoder.encode(part, StandardCharsets.UTF_8).replace("+", "%20"); - } catch (Exception e) { return part; } - }) - .collect(Collectors.joining("/")); - String fullAudioUrl = serverBaseUrl + (encodedAudioUrl.startsWith("/") ? "" : "/") + encodedAudioUrl; - - req.put("file_path", fullAudioUrl); - req.put("use_spk_id", meeting.getUseSpkId() != null && meeting.getUseSpkId() == 1); + String submitUrl = asrModel.getBaseUrl().endsWith("/") ? asrModel.getBaseUrl() + "api/tasks/recognition" : asrModel.getBaseUrl() + "/api/tasks/recognition"; + String taskId = null; + AiTask taskRecord = null; - // 处理热词权重 - List> formattedHotwords = new ArrayList<>(); - if (meeting.getHotWords() != null && !meeting.getHotWords().isEmpty()) { - List entities = hotWordService.list(new LambdaQueryWrapper() - .eq(HotWord::getTenantId, meeting.getTenantId()) - .in(HotWord::getWord, meeting.getHotWords())); - Map weightMap = entities.stream().collect(Collectors.toMap(HotWord::getWord, HotWord::getWeight, (v1, v2) -> v1)); - for (String w : meeting.getHotWords()) { - formattedHotwords.add(Map.of("hotword", w, "weight", weightMap.getOrDefault(w, 10) / 10.0)); + // --- 核心:接管逻辑 --- + List existingTasks = this.list(new LambdaQueryWrapper() + .eq(AiTask::getMeetingId, meeting.getId()) + .eq(AiTask::getTaskType, "ASR") + .orderByDesc(AiTask::getStartedAt)); + + if (!existingTasks.isEmpty()) { + AiTask lastTask = existingTasks.get(0); + if (lastTask.getResponseData() != null && lastTask.getResponseData().get("task_id") != null) { + String potentialTaskId = lastTask.getResponseData().get("task_id").toString(); + log.info("Attempting to resume ASR task {} for meeting {}", potentialTaskId, meeting.getId()); + + // 验证旧 taskId 是否依然有效 + String checkUrl = asrModel.getBaseUrl().endsWith("/") ? asrModel.getBaseUrl() + "api/tasks/" + potentialTaskId : asrModel.getBaseUrl() + "/api/tasks/" + potentialTaskId; + try { + String checkResp = get(checkUrl); + JsonNode node = objectMapper.readTree(checkResp); + if (node.path("code").asInt() == 200) { + taskId = potentialTaskId; + taskRecord = lastTask; + log.info("Successfully resumed ASR task {}", taskId); + } + } catch (Exception e) { + log.warn("Old task ID {} is invalid, will start a new one", potentialTaskId); + } } } - req.put("hotwords", formattedHotwords); - AiTask taskRecord = createAiTask(meeting.getId(), "ASR", req); - - // 提交 - String submitUrl = asrModel.getBaseUrl().endsWith("/") ? asrModel.getBaseUrl() + "api/tasks/recognition" : asrModel.getBaseUrl() + "/api/tasks/recognition"; - String respBody = postJson(submitUrl, req); - JsonNode submitNode = objectMapper.readTree(respBody); - if (submitNode.path("code").asInt() != 200) { - updateAiTaskFail(taskRecord, "Submission Failed: " + respBody); - throw new RuntimeException("ASR引擎拒绝请求: " + submitNode.path("msg").asText()); + // 如果没有可接管的任务,则发起新任务 + if (taskId == null) { + updateProgress(meeting.getId(), 5, "正在提交识别请求...", 0); + Map req = buildAsrRequest(meeting); + taskRecord = createAiTask(meeting.getId(), "ASR", req); + + String respBody = postJson(submitUrl, req); + JsonNode submitNode = objectMapper.readTree(respBody); + if (submitNode.path("code").asInt() != 200) { + updateAiTaskFail(taskRecord, "Submission Failed: " + respBody); + throw new RuntimeException("ASR引擎拒绝请求: " + submitNode.path("msg").asText()); + } + taskId = submitNode.path("data").path("task_id").asText(); + taskRecord.setResponseData(Map.of("task_id", taskId)); + this.updateById(taskRecord); } - String taskId = submitNode.path("data").path("task_id").asText(); String queryUrl = asrModel.getBaseUrl().endsWith("/") ? asrModel.getBaseUrl() + "api/tasks/" + taskId : asrModel.getBaseUrl() + "/api/tasks/" + taskId; - // 轮询带防护 + // 轮询逻辑 (带防卡死防护) JsonNode resultNode = null; int lastPercent = -1; int unchangedCount = 0; - for (int i = 0; i < 600; i++) { // Max 20 minutes + for (int i = 0; i < 600; i++) { Thread.sleep(2000); String queryResp = get(queryUrl); JsonNode statusNode = objectMapper.readTree(queryResp); @@ -177,25 +186,17 @@ public class AiTaskServiceImpl extends ServiceImpl impleme if ("completed".equalsIgnoreCase(status)) { resultNode = data.path("result"); updateAiTaskSuccess(taskRecord, statusNode); - updateProgress(meeting.getId(), 85, "语音转录完成,准备进行总结...", 0); break; } else if ("failed".equalsIgnoreCase(status)) { - updateAiTaskFail(taskRecord, "ASR reported failure: " + queryResp); - throw new RuntimeException("ASR处理失败: " + data.path("message").asText()); + updateAiTaskFail(taskRecord, "ASR engine reported failure: " + queryResp); + throw new RuntimeException("ASR引擎处理失败: " + data.path("message").asText()); } else { - // 处理中:同步进度到 Redis int currentPercent = data.path("percentage").asInt(); - String message = data.path("message").asText(); int eta = data.path("eta").asInt(0); - - // 缩放到 0-85% 范围 - int scaledPercent = (int)(currentPercent * 0.85); - updateProgress(meeting.getId(), Math.max(5, scaledPercent), message, eta); + updateProgress(meeting.getId(), (int)(currentPercent * 0.85), data.path("message").asText(), eta); - // 防死循环逻辑:如果进度长时间不动且不是 0 if (currentPercent > 0 && currentPercent == lastPercent) { - unchangedCount++; - if (unchangedCount > 40) throw new RuntimeException("ASR处理停滞,自动超时"); + if (++unchangedCount > 45) throw new RuntimeException("识别任务长时间无进度增长,自动强制超时"); } else { unchangedCount = 0; } @@ -205,7 +206,40 @@ public class AiTaskServiceImpl extends ServiceImpl impleme if (resultNode == null) throw new RuntimeException("ASR轮询超时"); - // 解析并入库 (适配新 Speaker 格式) + // 解析并入库 (防御性清理旧数据) + return saveTranscripts(meeting, resultNode); + } + + private Map buildAsrRequest(Meeting meeting) { + Map req = new HashMap<>(); + String rawAudioUrl = meeting.getAudioUrl(); + String encodedAudioUrl = Arrays.stream(rawAudioUrl.split("/")) + .map(part -> { + try { return URLEncoder.encode(part, StandardCharsets.UTF_8).replace("+", "%20"); } + catch (Exception e) { return part; } + }) + .collect(Collectors.joining("/")); + req.put("file_path", serverBaseUrl + (encodedAudioUrl.startsWith("/") ? "" : "/") + encodedAudioUrl); + req.put("use_spk_id", meeting.getUseSpkId() != null && meeting.getUseSpkId() == 1); + + List> hotwords = new ArrayList<>(); + if (meeting.getHotWords() != null && !meeting.getHotWords().isEmpty()) { + List entities = hotWordService.list(new LambdaQueryWrapper() + .eq(HotWord::getTenantId, meeting.getTenantId()).in(HotWord::getWord, meeting.getHotWords())); + Map weightMap = entities.stream().collect(Collectors.toMap(HotWord::getWord, HotWord::getWeight, (v1, v2) -> v1)); + for (String w : meeting.getHotWords()) { + hotwords.add(Map.of("hotword", w, "weight", weightMap.getOrDefault(w, 10) / 10.0)); + } + } + req.put("hotwords", hotwords); + return req; + } + + @Transactional(rollbackFor = Exception.class) + protected String saveTranscripts(Meeting meeting, JsonNode resultNode) { + // 关键:入库前清理旧记录,防止恢复任务导致数据重复 + transcriptMapper.delete(new LambdaQueryWrapper().eq(MeetingTranscript::getMeetingId, meeting.getId())); + StringBuilder sb = new StringBuilder(); JsonNode segments = resultNode.path("segments"); if (segments.isArray()) { @@ -214,12 +248,10 @@ public class AiTaskServiceImpl extends ServiceImpl impleme MeetingTranscript mt = new MeetingTranscript(); mt.setMeetingId(meeting.getId()); - // 解析 Speaker 对象 JsonNode spkNode = seg.path("speaker"); String spkId = spkNode.path("user_id").asText("spk_0"); String spkName = spkNode.path("name").asText(spkId); - // 用户名称转换逻辑 if (spkId.matches("\\d+")) { SysUser user = sysUserMapper.selectById(Long.parseLong(spkId)); if (user != null) spkName = user.getDisplayName() != null ? user.getDisplayName() : user.getUsername(); @@ -241,8 +273,8 @@ public class AiTaskServiceImpl extends ServiceImpl impleme } private void processSummaryTask(Meeting meeting, String asrText) throws Exception { - updateMeetingStatus(meeting.getId(), 2); // 总结中 - updateProgress(meeting.getId(), 90, "正在进行 AI 智能总结...", 0); + updateMeetingStatus(meeting.getId(), 2); + updateProgress(meeting.getId(), 90, "正在生成智能总结纪要...", 0); AiModel llmModel = aiModelService.getById(meeting.getSummaryModelId()); if (llmModel == null) return; @@ -252,7 +284,7 @@ public class AiTaskServiceImpl extends ServiceImpl impleme req.put("temperature", llmModel.getTemperature()); req.put("messages", List.of( Map.of("role", "system", "content", meeting.getPromptContent()), - Map.of("role", "user", "content", "请总结以下内容:\n" + asrText) + Map.of("role", "user", "content", "请总结以下会议内容:\n" + asrText) )); AiTask taskRecord = createAiTask(meeting.getId(), "SUMMARY", req); @@ -271,13 +303,13 @@ public class AiTaskServiceImpl extends ServiceImpl impleme if (response.statusCode() == 200 && respNode.has("choices")) { String content = respNode.path("choices").path(0).path("message").path("content").asText(); meeting.setSummaryContent(content); - meeting.setStatus(3); // Finished + meeting.setStatus(3); meetingMapper.updateById(meeting); updateAiTaskSuccess(taskRecord, respNode); - updateProgress(meeting.getId(), 100, "分析已完成", 0); + updateProgress(meeting.getId(), 100, "全流程分析完成", 0); } else { - updateAiTaskFail(taskRecord, "LLM failed: " + response.body()); - throw new RuntimeException("AI总结生成失败"); + updateAiTaskFail(taskRecord, "LLM Summary failed: " + response.body()); + throw new RuntimeException("AI总结生成异常"); } } @@ -291,53 +323,38 @@ public class AiTaskServiceImpl extends ServiceImpl impleme redisTemplate.opsForValue().set(RedisKeys.meetingProgressKey(meetingId), objectMapper.writeValueAsString(progress), 1, TimeUnit.HOURS); } catch (Exception e) { - log.error("Update progress to redis failed", e); + log.error("Redis progress update error", e); } } private String postJson(String url, Object body) throws Exception { - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(url)) - .header("Content-Type", "application/json") - .POST(HttpRequest.BodyPublishers.ofString(objectMapper.writeValueAsString(body))) - .build(); - return httpClient.send(request, HttpResponse.BodyHandlers.ofString()).body(); + return httpClient.send(HttpRequest.newBuilder().uri(URI.create(url)).header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(objectMapper.writeValueAsString(body))).build(), + HttpResponse.BodyHandlers.ofString()).body(); } private String get(String url) throws Exception { - HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).GET().build(); - return httpClient.send(request, HttpResponse.BodyHandlers.ofString()).body(); + return httpClient.send(HttpRequest.newBuilder().uri(URI.create(url)).GET().build(), HttpResponse.BodyHandlers.ofString()).body(); } private void updateMeetingStatus(Long id, int status) { - Meeting m = new Meeting(); - m.setId(id); - m.setStatus(status); - meetingMapper.updateById(m); + Meeting m = new Meeting(); m.setId(id); m.setStatus(status); meetingMapper.updateById(m); } private AiTask createAiTask(Long meetingId, String type, Map req) { AiTask task = new AiTask(); - task.setMeetingId(meetingId); - task.setTaskType(type); - task.setStatus(1); - task.setRequestData(req); - task.setStartedAt(LocalDateTime.now()); - this.save(task); - return task; + task.setMeetingId(meetingId); task.setTaskType(type); task.setStatus(1); + task.setRequestData(req); task.setStartedAt(LocalDateTime.now()); + this.save(task); return task; } private void updateAiTaskSuccess(AiTask task, JsonNode resp) { - task.setStatus(2); - task.setResponseData(objectMapper.convertValue(resp, Map.class)); - task.setCompletedAt(LocalDateTime.now()); - this.updateById(task); + task.setStatus(2); task.setResponseData(objectMapper.convertValue(resp, Map.class)); + task.setCompletedAt(LocalDateTime.now()); this.updateById(task); } private void updateAiTaskFail(AiTask task, String error) { - task.setStatus(3); - task.setErrorMsg(error); - task.setCompletedAt(LocalDateTime.now()); - this.updateById(task); + task.setStatus(3); task.setErrorMsg(error); + task.setCompletedAt(LocalDateTime.now()); this.updateById(task); } } diff --git a/frontend/src/pages/business/HotWords.tsx b/frontend/src/pages/business/HotWords.tsx index decce9c..c8d982c 100644 --- a/frontend/src/pages/business/HotWords.tsx +++ b/frontend/src/pages/business/HotWords.tsx @@ -8,6 +8,7 @@ import { PlusOutlined, EditOutlined, DeleteOutlined, SearchOutlined, UserOutlined, GlobalOutlined } from '@ant-design/icons'; +import { useTranslation } from 'react-i18next'; import { useDict } from '../../hooks/useDict'; import { getHotWordPage, @@ -23,6 +24,7 @@ const { Option } = Select; const { Text } = Typography; const HotWords: React.FC = () => { + const { t } = useTranslation(); const [form] = Form.useForm(); const { items: categories, loading: dictLoading } = useDict('biz_hotword_category'); const [loading, setLoading] = useState(false); diff --git a/frontend/src/pages/business/MeetingDetail.tsx b/frontend/src/pages/business/MeetingDetail.tsx index 2e5e905..a7e8025 100644 --- a/frontend/src/pages/business/MeetingDetail.tsx +++ b/frontend/src/pages/business/MeetingDetail.tsx @@ -41,7 +41,7 @@ const MeetingProgressDisplay: React.FC<{ meetingId: number; onComplete: () => vo // 格式化剩余时间 (ETA) const formatETA = (seconds?: number) => { - if (!seconds || seconds <= 0) return '即将完成'; + if (!seconds || seconds <= 0) return '正在分析中'; if (seconds < 60) return `${seconds}秒`; const m = Math.floor(seconds / 60); const s = seconds % 60; @@ -67,7 +67,7 @@ const MeetingProgressDisplay: React.FC<{ meetingId: number; onComplete: () => vo {progress?.message || '正在准备计算资源...'} - 分析过程大约需要 1-3 分钟,请耐心等待,您可以先去处理其他工作 + 分析过程中,请耐心等待,您可以先去处理其他工作 @@ -304,7 +304,28 @@ const MeetingDetail: React.FC = () => { - {isOwner && } + {isOwner && meeting.status === 3 && ( + + )} + {isOwner && meeting.status === 2 && ( + + )} diff --git a/frontend/src/pages/business/PromptTemplates.tsx b/frontend/src/pages/business/PromptTemplates.tsx index 88a80a8..0f99116 100644 --- a/frontend/src/pages/business/PromptTemplates.tsx +++ b/frontend/src/pages/business/PromptTemplates.tsx @@ -241,10 +241,10 @@ const PromptTemplates: React.FC = () => { -
- {item.templateName} - 使用次数: {item.usageCount || 0} -
+ {/*
*/} + {/* {item.templateName}*/} + {/* 使用次数: {item.usageCount || 0}*/} + {/*
*/}
{item.tags?.map(tag => {