diff --git a/backend/src/main/java/com/imeeting/common/RedisKeys.java b/backend/src/main/java/com/imeeting/common/RedisKeys.java index 4cb6545..9faf822 100644 --- a/backend/src/main/java/com/imeeting/common/RedisKeys.java +++ b/backend/src/main/java/com/imeeting/common/RedisKeys.java @@ -71,6 +71,10 @@ public final class RedisKeys { return "biz:meeting:realtime:state:" + meetingId; } + public static String realtimeMeetingTranscriptCacheKey(Long meetingId) { + return "biz:meeting:realtime:transcript-cache:" + meetingId; + } + public static String realtimeMeetingResumeTimeoutKey(Long meetingId) { return realtimeMeetingResumeTimeoutPrefix() + meetingId; } diff --git a/backend/src/main/java/com/imeeting/controller/biz/AiModelController.java b/backend/src/main/java/com/imeeting/controller/biz/AiModelController.java index 9bd38a1..71af7c6 100644 --- a/backend/src/main/java/com/imeeting/controller/biz/AiModelController.java +++ b/backend/src/main/java/com/imeeting/controller/biz/AiModelController.java @@ -3,6 +3,7 @@ package com.imeeting.controller.biz; import com.imeeting.dto.biz.AiLocalProfileVO; import com.imeeting.dto.biz.AiModelDTO; import com.imeeting.dto.biz.AiModelVO; +import com.imeeting.enums.ModelProviderEnum; import com.imeeting.service.biz.AiModelService; import com.unisbase.common.ApiResponse; import com.unisbase.common.annotation.Log; @@ -121,7 +122,7 @@ public class AiModelController { @PostMapping("/llm-connectivity-test") @PreAuthorize("isAuthenticated()") public ApiResponse testLlmConnectivity(@RequestBody AiModelDTO dto) { - if ("custom".equalsIgnoreCase(dto.getProvider()) && (dto.getBaseUrl() == null || dto.getBaseUrl().isBlank())) { + if (ModelProviderEnum.LOCAL.getCode().equalsIgnoreCase(dto.getProvider()) && (dto.getBaseUrl() == null || dto.getBaseUrl().isBlank())) { return ApiResponse.error("基础地址不能为空"); } if (dto.getModelCode() == null || dto.getModelCode().isBlank()) { diff --git 
a/backend/src/main/java/com/imeeting/controller/biz/MeetingController.java b/backend/src/main/java/com/imeeting/controller/biz/MeetingController.java index 7a8d354..ffdbc8e 100644 --- a/backend/src/main/java/com/imeeting/controller/biz/MeetingController.java +++ b/backend/src/main/java/com/imeeting/controller/biz/MeetingController.java @@ -399,17 +399,6 @@ public class MeetingController { return ApiResponse.ok(result); } - @Operation(summary = "追加实时转写片段") - @PostMapping("/{id}/realtime/transcripts") - @PreAuthorize("isAuthenticated()") - public ApiResponse appendRealtimeTranscripts(@PathVariable Long id, @RequestBody List items) { - LoginUser loginUser = currentLoginUser(); - Meeting meeting = meetingAccessService.requireMeeting(id); - meetingAccessService.assertCanControlRealtimeMeeting(meeting, loginUser, MeetingConstants.SOURCE_WEB); - meetingCommandService.appendRealtimeTranscripts(id, items); - return ApiResponse.ok(true); - } - @Operation(summary = "暂停实时会议") @PostMapping("/{id}/realtime/pause") @PreAuthorize("isAuthenticated()") diff --git a/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingTranscriptCacheItem.java b/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingTranscriptCacheItem.java new file mode 100644 index 0000000..4cfcbd1 --- /dev/null +++ b/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingTranscriptCacheItem.java @@ -0,0 +1,21 @@ +package com.imeeting.dto.biz; + +import lombok.Data; + +@Data +public class RealtimeMeetingTranscriptCacheItem { + private String sentenceKey; + private Integer sentenceId; + private Integer sentenceType; + private String speakerId; + private String speakerName; + private String userId; + private Integer startTime; + private Integer endTime; + private String content; + private Integer sortOrder; + private Boolean finalResult; + private Long transcriptId; + private Long firstReceivedAt; + private Long updatedAt; +} diff --git 
a/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingTranscriptCacheState.java b/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingTranscriptCacheState.java new file mode 100644 index 0000000..8beaf3a --- /dev/null +++ b/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingTranscriptCacheState.java @@ -0,0 +1,15 @@ +package com.imeeting.dto.biz; + +import lombok.Data; + +import java.util.ArrayList; +import java.util.List; + +@Data +public class RealtimeMeetingTranscriptCacheState { + private Long meetingId; + private Integer nextSortOrder; + private Integer nextLegacySequence; + private List items = new ArrayList<>(); + private Long updatedAt; +} diff --git a/backend/src/main/java/com/imeeting/dto/biz/RealtimeSocketSessionData.java b/backend/src/main/java/com/imeeting/dto/biz/RealtimeSocketSessionData.java index c9f6ead..bc56143 100644 --- a/backend/src/main/java/com/imeeting/dto/biz/RealtimeSocketSessionData.java +++ b/backend/src/main/java/com/imeeting/dto/biz/RealtimeSocketSessionData.java @@ -8,5 +8,6 @@ public class RealtimeSocketSessionData { private Long userId; private Long tenantId; private Long asrModelId; + private String provider; private String targetWsUrl; } diff --git a/backend/src/main/java/com/imeeting/enums/ModelProviderEnum.java b/backend/src/main/java/com/imeeting/enums/ModelProviderEnum.java new file mode 100644 index 0000000..fe5de13 --- /dev/null +++ b/backend/src/main/java/com/imeeting/enums/ModelProviderEnum.java @@ -0,0 +1,18 @@ +package com.imeeting.enums; + +import lombok.Getter; + +@Getter +public enum ModelProviderEnum { + LOCAL("local", "本地"), + + ; + + private final String code; + private final String description; + + ModelProviderEnum(String code, String description) { + this.code = code; + this.description = description; + } +} diff --git a/backend/src/main/java/com/imeeting/service/biz/MeetingCommandService.java b/backend/src/main/java/com/imeeting/service/biz/MeetingCommandService.java index 6ed8965..f781e87 
100644 --- a/backend/src/main/java/com/imeeting/service/biz/MeetingCommandService.java +++ b/backend/src/main/java/com/imeeting/service/biz/MeetingCommandService.java @@ -28,8 +28,6 @@ public interface MeetingCommandService { void deleteMeeting(Long id); - void appendRealtimeTranscripts(Long meetingId, List items); - void saveRealtimeTranscriptSnapshot(Long meetingId, RealtimeTranscriptItemDTO item, boolean finalResult); void completeRealtimeMeeting(Long meetingId, String audioUrl, boolean overwriteAudio); diff --git a/backend/src/main/java/com/imeeting/service/biz/RealtimeMeetingSessionStateService.java b/backend/src/main/java/com/imeeting/service/biz/RealtimeMeetingSessionStateService.java index 7f4c273..3f56ce6 100644 --- a/backend/src/main/java/com/imeeting/service/biz/RealtimeMeetingSessionStateService.java +++ b/backend/src/main/java/com/imeeting/service/biz/RealtimeMeetingSessionStateService.java @@ -25,6 +25,8 @@ public interface RealtimeMeetingSessionStateService { void refreshAfterTranscript(Long meetingId); + void refreshAfterTranscriptCapture(Long meetingId, long transcriptCount); + boolean markCompletingIfResumeExpired(Long meetingId); void expireEmptySession(Long meetingId); diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/AiModelServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/AiModelServiceImpl.java index 1f552a2..ddf3a05 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/AiModelServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/AiModelServiceImpl.java @@ -11,6 +11,7 @@ import com.imeeting.dto.biz.AiLocalProfileVO; import com.imeeting.dto.biz.AiModelVO; import com.imeeting.entity.biz.AsrModel; import com.imeeting.entity.biz.LlmModel; +import com.imeeting.enums.ModelProviderEnum; import com.imeeting.mapper.biz.AsrModelMapper; import com.imeeting.mapper.biz.LlmModelMapper; import com.imeeting.service.biz.AiModelService; @@ -169,7 +170,7 @@ public class AiModelServiceImpl 
implements AiModelService { if (resolvedBaseUrl == null || resolvedBaseUrl.isBlank()) { return Collections.emptyList(); } - if ("custom".equals(providerKey)) { + if (ModelProviderEnum.LOCAL.getCode().equals(providerKey)) { return fetchLocalProfile(resolvedBaseUrl, apiKey).getAsrModels(); } String targetUrl = resolveModelListUrl(providerKey, resolvedBaseUrl, apiKey); @@ -464,7 +465,7 @@ public class AiModelServiceImpl implements AiModelService { } private String resolveModelListUrl(String providerKey, String baseUrl, String apiKey) { - if ("Custom".equalsIgnoreCase(providerKey)) { + if (ModelProviderEnum.LOCAL.getCode().equalsIgnoreCase(providerKey)) { return baseUrl+"/api/asrconfig"; } if ("gemini".equals(providerKey) || "google".equals(providerKey)) { @@ -832,7 +833,7 @@ public class AiModelServiceImpl implements AiModelService { } private void pushAsrConfig(AsrModel entity) { - if ("custom".equals(normalizeProvider(entity.getProvider()))) { + if (ModelProviderEnum.LOCAL.getCode().equals(normalizeProvider(entity.getProvider()))) { if (entity.getApiKey() == null || entity.getApiKey().isBlank()) { log.info("Skip syncing local ASR profile because apiKey is blank, modelName={}", entity.getModelName()); return; diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java index 19bc0c7..17a61aa 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java @@ -49,6 +49,7 @@ import com.imeeting.support.redis.MeetingAsrPermitCache; import com.imeeting.support.redis.MeetingLockCache; import com.unisbase.common.exception.BusinessException; import com.unisbase.common.exception.ErrorCodeEnum; +import com.imeeting.websocket.RealtimeMeetingProxyWebSocketHandler; import com.unisbase.service.SysParamService; import lombok.extern.slf4j.Slf4j; 
import org.springframework.beans.factory.annotation.Autowired; @@ -82,7 +83,8 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { private final MeetingDomainSupport meetingDomainSupport; private final MeetingRuntimeProfileResolver meetingRuntimeProfileResolver; private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; - private final RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService; + private final RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService; + private final RealtimeMeetingProxyWebSocketHandler realtimeMeetingProxyWebSocketHandler; private final MeetingProgressService meetingProgressService; private final MeetingPointsService meetingPointsService; private final MeetingSummaryPromptAssembler meetingSummaryPromptAssembler; @@ -109,7 +111,7 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { MeetingDomainSupport meetingDomainSupport, MeetingRuntimeProfileResolver meetingRuntimeProfileResolver, RealtimeMeetingSessionStateService realtimeMeetingSessionStateService, - RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService, + RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService, RealtimeMeetingProxyWebSocketHandler realtimeMeetingProxyWebSocketHandler, MeetingProgressService meetingProgressService, MeetingPointsService meetingPointsService, MeetingSummaryPromptAssembler meetingSummaryPromptAssembler, @@ -131,7 +133,8 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { this.meetingDomainSupport = meetingDomainSupport; this.meetingRuntimeProfileResolver = meetingRuntimeProfileResolver; this.realtimeMeetingSessionStateService = realtimeMeetingSessionStateService; - this.realtimeMeetingAudioStorageService = realtimeMeetingAudioStorageService; + this.realtimeMeetingAudioStorageService = realtimeMeetingAudioStorageService; + this.realtimeMeetingProxyWebSocketHandler = 
realtimeMeetingProxyWebSocketHandler; this.meetingProgressService = meetingProgressService; this.meetingPointsService = meetingPointsService; this.meetingSummaryPromptAssembler = meetingSummaryPromptAssembler; @@ -347,58 +350,6 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { deleteMeetingArtifactsAfterCommit(id); } - @Override - @Transactional(rollbackFor = Exception.class) - public void appendRealtimeTranscripts(Long meetingId, List items) { - if (items == null || items.isEmpty()) { - return; - } - - Integer maxSortOrder = transcriptMapper.selectList(new LambdaQueryWrapper() - .eq(MeetingTranscript::getMeetingId, meetingId) - .orderByDesc(MeetingTranscript::getSortOrder) - .last("LIMIT 1")) - .stream() - .findFirst() - .map(MeetingTranscript::getSortOrder) - .orElse(0); - - int nextSortOrder = maxSortOrder == null ? 0 : maxSortOrder + 1; - boolean inserted = false; - for (RealtimeTranscriptItemDTO item : items) { - if (item.getContent() == null || item.getContent().isBlank()) { - continue; - } - - MeetingTranscript existing = transcriptMapper.selectOne(new LambdaQueryWrapper() - .eq(MeetingTranscript::getMeetingId, meetingId) - .eq(MeetingTranscript::getContent, item.getContent().trim()) - .eq(item.getSpeakerId() != null && !item.getSpeakerId().isBlank(), MeetingTranscript::getSpeakerId, item.getSpeakerId()) - .eq(item.getStartTime() != null, MeetingTranscript::getStartTime, item.getStartTime()) - .eq(item.getEndTime() != null, MeetingTranscript::getEndTime, item.getEndTime()) - .last("LIMIT 1")); - if (existing != null) { - continue; - } - - MeetingTranscript transcript = new MeetingTranscript(); - transcript.setMeetingId(meetingId); - transcript.setSpeakerId(meetingDomainSupport.resolveSpeakerId(item.getSpeakerId())); - transcript.setSpeakerName(meetingDomainSupport.resolveSpeakerName(item.getSpeakerId(), item.getSpeakerName())); - transcript.setContent(item.getContent().trim()); - transcript.setStartTime(item.getStartTime()); - 
transcript.setEndTime(item.getEndTime()); - transcript.setSortOrder(nextSortOrder++); - transcriptMapper.insert(transcript); - inserted = true; - } - - if (inserted) { - meetingTranscriptFileService.initializeTranscriptFileIfAbsent(meetingId); - realtimeMeetingSessionStateService.refreshAfterTranscript(meetingId); - } - } - @Override @Transactional(rollbackFor = Exception.class) public void saveRealtimeTranscriptSnapshot(Long meetingId, RealtimeTranscriptItemDTO item, boolean finalResult) { @@ -457,11 +408,11 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { if (meeting == null) { throw new RuntimeException("会议不存在"); } - + realtimeMeetingProxyWebSocketHandler.closeMeetingSession(meetingId); RealtimeMeetingSessionStatusVO currentStatus = realtimeMeetingSessionStateService.getStatus(meetingId); if (overwriteAudio) { if (audioUrl == null || audioUrl.isBlank()) { - throw new RuntimeException("overwriteAudio 为 true 时必须提供音频地址"); + throw new RuntimeException("overwriteAudio=true requires audioUrl"); } meetingDomainSupport.applyMeetingAudioMetadata( meeting, diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java index 38578a1..14e9406 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java @@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.imeeting.dto.biz.RealtimeMeetingResumeConfig; import com.imeeting.dto.biz.RealtimeMeetingSessionState; import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO; +import com.imeeting.dto.biz.RealtimeMeetingTranscriptCacheState; import com.imeeting.entity.biz.Meeting; import com.imeeting.entity.biz.MeetingTranscript; import com.imeeting.enums.MeetingStatusEnum; 
@@ -12,6 +13,7 @@ import com.imeeting.mapper.biz.MeetingTranscriptMapper; import com.imeeting.service.biz.RealtimeMeetingSessionStateService; import com.imeeting.support.redis.MeetingLockCache; import com.imeeting.support.redis.RealtimeMeetingSessionCache; +import com.imeeting.support.redis.RealtimeMeetingTranscriptCache; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -31,6 +33,7 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe private final MeetingLockCache meetingLockCache; private final MeetingTranscriptMapper transcriptMapper; private final MeetingMapper meetingMapper; + private final RealtimeMeetingTranscriptCache realtimeMeetingTranscriptCache; @Value("${imeeting.realtime.resume-window-minutes:30}") private String resumeWindowMinutesValue; @@ -49,8 +52,9 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe next.setTenantId(tenantId); next.setUserId(userId); next.setStatus("IDLE"); - next.setHasTranscript(countTranscripts(meetingId) > 0); - next.setTranscriptCountSnapshot(countTranscripts(meetingId)); + long transcriptCount = countCapturedTranscripts(meetingId); + next.setHasTranscript(transcriptCount > 0); + next.setTranscriptCountSnapshot(transcriptCount); next.setUpdatedAt(System.currentTimeMillis()); writeState(next); } @@ -72,16 +76,16 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe String currentStatus = status.getStatus(); if ("COMPLETING".equals(currentStatus)) { - throw new RuntimeException("实时会议正在结束处理中"); + throw new RuntimeException("Realtime meeting is completing"); } if ("COMPLETED".equals(currentStatus)) { - throw new RuntimeException("实时会议已结束"); + throw new RuntimeException("Realtime meeting is already completed"); } if ("ACTIVE".equals(currentStatus) || Boolean.TRUE.equals(status.getActiveConnection())) { - throw new RuntimeException("实时会议已存在活动连接"); + throw 
new RuntimeException("Realtime meeting already has an active connection"); } if ("PAUSED_RESUMABLE".equals(currentStatus) && !Boolean.TRUE.equals(status.getCanResume())) { - throw new RuntimeException("实时会议恢复窗口已过期"); + throw new RuntimeException("Realtime meeting resume window has expired"); } } @@ -102,7 +106,7 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe } long now = System.currentTimeMillis(); - long transcriptCount = countTranscripts(meetingId); + long transcriptCount = countCapturedTranscripts(meetingId); state.setStatus("ACTIVE"); state.setHasTranscript(transcriptCount > 0); state.setTranscriptCountSnapshot(transcriptCount); @@ -182,9 +186,13 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe @Override public void refreshAfterTranscript(Long meetingId) { + refreshAfterTranscriptCapture(meetingId, countTranscripts(meetingId)); + } + + @Override + public void refreshAfterTranscriptCapture(Long meetingId, long transcriptCount) { RealtimeMeetingSessionState state = getOrCreateState(meetingId); long now = System.currentTimeMillis(); - long transcriptCount = countTranscripts(meetingId); state.setHasTranscript(transcriptCount > 0); state.setTranscriptCountSnapshot(transcriptCount); @@ -214,7 +222,7 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe return false; } - long transcriptCount = countTranscripts(meetingId); + long transcriptCount = countCapturedTranscripts(meetingId); if (transcriptCount <= 0) { clear(meetingId); return false; @@ -256,7 +264,7 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe } private RealtimeMeetingSessionStatusVO pauseState(Long meetingId, RealtimeMeetingSessionState state) { - long transcriptCount = countTranscripts(meetingId); + long transcriptCount = countCapturedTranscripts(meetingId); long now = System.currentTimeMillis(); state.setHasTranscript(transcriptCount > 0); @@ -307,7 +315,7 @@ public 
class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe } else { vo.setStatus("IDLE"); } - vo.setHasTranscript(countTranscripts(meetingId) > 0); + vo.setHasTranscript(countCapturedTranscripts(meetingId) > 0); vo.setCanResume(false); vo.setRemainingSeconds(0L); vo.setActiveConnection(false); @@ -362,8 +370,9 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe RealtimeMeetingSessionState next = new RealtimeMeetingSessionState(); next.setMeetingId(meetingId); next.setStatus("IDLE"); - next.setHasTranscript(countTranscripts(meetingId) > 0); - next.setTranscriptCountSnapshot(countTranscripts(meetingId)); + long transcriptCount = countCapturedTranscripts(meetingId); + next.setHasTranscript(transcriptCount > 0); + next.setTranscriptCountSnapshot(transcriptCount); next.setUpdatedAt(System.currentTimeMillis()); return next; } @@ -400,4 +409,21 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe return transcriptMapper.selectCount(new LambdaQueryWrapper() .eq(MeetingTranscript::getMeetingId, meetingId)); } + + private long countCapturedTranscripts(Long meetingId) { + return Math.max(countTranscripts(meetingId), countCachedTranscripts(meetingId)); + } + + private long countCachedTranscripts(Long meetingId) { + if (meetingId == null) { + return 0L; + } + RealtimeMeetingTranscriptCacheState state = realtimeMeetingTranscriptCache.getState(meetingId); + if (state == null || state.getItems() == null || state.getItems().isEmpty()) { + return 0L; + } + return state.getItems().stream() + .filter(item -> item.getContent() != null && !item.getContent().isBlank()) + .count(); + } } diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSocketSessionServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSocketSessionServiceImpl.java index 6477014..1bb3e81 100644 --- 
a/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSocketSessionServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSocketSessionServiceImpl.java @@ -10,12 +10,13 @@ import com.imeeting.service.biz.AiModelService; import com.imeeting.service.biz.MeetingAccessService; import com.imeeting.service.biz.RealtimeMeetingSessionStateService; import com.imeeting.service.biz.RealtimeMeetingSocketSessionService; +import com.imeeting.service.realtime.RealtimeAsrChannel; +import com.imeeting.service.realtime.RealtimeAsrChannelFactory; import com.imeeting.support.redis.RealtimeMeetingSocketSessionCache; import com.unisbase.security.LoginUser; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -30,6 +31,7 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS private final MeetingAccessService meetingAccessService; private final AiModelService aiModelService; private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; + private final RealtimeAsrChannelFactory realtimeAsrChannelFactory; @Override public RealtimeSocketSessionVO createSession(Long meetingId, Long asrModelId, String mode, String language, @@ -37,10 +39,10 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS Boolean enableTextRefine, Boolean saveAudio, List> hotwords, LoginUser loginUser) { if (meetingId == null) { - throw new RuntimeException("会议 ID 不能为空"); + throw new RuntimeException("会议 ID 不能为空"); } if (asrModelId == null) { - throw new RuntimeException("ASR 模型 ID 不能为空"); + throw new RuntimeException("ASR 模型 ID 不能为空"); } Meeting meeting = meetingAccessService.requireMeeting(meetingId); @@ -51,12 +53,13 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS AiModelVO asrModel = 
aiModelService.getModelById(asrModelId, "ASR"); if (asrModel == null) { - throw new RuntimeException("ASR 模型不存在"); + throw new RuntimeException("ASR 模型不存在"); } - String targetWsUrl = resolveWsUrl(asrModel); + RealtimeAsrChannel realtimeAsrChannel = realtimeAsrChannelFactory.getRequired(asrModel.getProvider()); + String targetWsUrl = realtimeAsrChannel.resolveTargetWsUrl(asrModel); if (targetWsUrl == null || targetWsUrl.isBlank()) { - throw new RuntimeException("ASR 模型未配置 WebSocket 地址"); + throw new RuntimeException("ASR 模型未配置 WebSocket 地址"); } RealtimeMeetingResumeConfig resumeConfig = new RealtimeMeetingResumeConfig(); @@ -82,6 +85,7 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS sessionData.setUserId(loginUser.getUserId()); sessionData.setTenantId(loginUser.getTenantId()); sessionData.setAsrModelId(asrModelId); + sessionData.setProvider(realtimeAsrChannelFactory.normalizeProvider(asrModel.getProvider())); sessionData.setTargetWsUrl(targetWsUrl); String sessionToken = UUID.randomUUID().toString().replace("-", ""); @@ -91,9 +95,8 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS vo.setSessionToken(sessionToken); vo.setPath(WS_PATH); vo.setExpiresInSeconds(socketSessionCache.getSessionTtlSeconds()); - vo.setStartMessage(buildStartMessage( + vo.setStartMessage(realtimeAsrChannel.buildStartMessage( asrModel, - meetingId, mode, language, useSpkId, @@ -110,74 +113,4 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS public RealtimeSocketSessionData getSessionData(String sessionToken) { return socketSessionCache.get(sessionToken); } - - private String resolveWsUrl(AiModelVO model) { - if (model.getWsUrl() != null && !model.getWsUrl().isBlank()) { - return model.getWsUrl(); - } - if (model.getBaseUrl() == null || model.getBaseUrl().isBlank()) { - return ""; - } - return model.getBaseUrl() - .replaceFirst("^http://", "ws://") - .replaceFirst("^https://", "wss://"); 
- } - - private Map buildStartMessage(AiModelVO model, Long meetingId, String mode, String language, - Integer useSpkId, Boolean enablePunctuation, Boolean enableItn, - Boolean enableTextRefine, Boolean saveAudio, - List> hotwords) { - Map root = new HashMap<>(); - root.put("type", "start"); - root.put("request_id", "web_" + System.currentTimeMillis() + "_" + meetingId); - root.put("authorization", buildAuthorization(model.getApiKey())); - - Map config = new HashMap<>(); - Map audio = new HashMap<>(); - audio.put("format", "pcm"); - audio.put("sample_rate", 16000); - audio.put("channels", 1); - config.put("audio", audio); - - Map recognition = new HashMap<>(); - recognition.put("language", normalizeLanguage(language)); - recognition.put("enable_punctuation", boolOrDefault(enablePunctuation, true)); - recognition.put("enable_itn", boolOrDefault(enableItn, true)); - recognition.put("enable_speaker", Integer.valueOf(1).equals(useSpkId)); - recognition.put("enable_two_pass", !"online".equalsIgnoreCase(mode)); - recognition.put("enable_text_refine", boolOrDefault(enableTextRefine, false)); - recognition.put("speaker_threshold", readSpeakerThreshold(model.getMediaConfig())); - recognition.put("hotwords", hotwords == null ? List.of() : hotwords); - config.put("recognition", recognition); - - config.put("model", model.getModelCode()); - config.put("save_audio", boolOrDefault(saveAudio, false)); - root.put("config", config); - return root; - } - - private String buildAuthorization(String apiKey) { - if (apiKey == null || apiKey.isBlank()) { - return ""; - } - return apiKey.startsWith("Bearer ") ? 
apiKey : "Bearer " + apiKey; - } - - private Object readSpeakerThreshold(Map mediaConfig) { - if (mediaConfig == null) { - return null; - } - return mediaConfig.get("svThreshold"); - } - - private String normalizeLanguage(String language) { - if (language == null || language.isBlank()) { - return "auto"; - } - return language.trim(); - } - - private boolean boolOrDefault(Boolean value, boolean defaultValue) { - return value != null ? value : defaultValue; - } } diff --git a/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannel.java b/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannel.java new file mode 100644 index 0000000..382fbf1 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannel.java @@ -0,0 +1,32 @@ +package com.imeeting.service.realtime; + +import com.imeeting.dto.biz.AiModelVO; + +import java.util.List; +import java.util.Map; + +public interface RealtimeAsrChannel { + boolean supports(String provider); + + String resolveTargetWsUrl(AiModelVO model); + + Map buildStartMessage(AiModelVO model, + String mode, + String language, + Integer useSpkId, + Boolean enablePunctuation, + Boolean enableItn, + Boolean enableTextRefine, + Boolean saveAudio, + List> hotwords); + + void connect(RealtimeAsrChannelContext context) throws Exception; + + void handleFrontendText(RealtimeAsrChannelContext context, String payload); + + void handleFrontendBinary(RealtimeAsrChannelContext context, byte[] payload); + + void closeMeeting(RealtimeAsrChannelContext context); + + boolean isOpen(RealtimeAsrChannelContext context); +} diff --git a/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannelCallback.java b/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannelCallback.java new file mode 100644 index 0000000..e5697f6 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannelCallback.java @@ -0,0 +1,17 @@ +package com.imeeting.service.realtime; + 
+import org.springframework.web.socket.CloseStatus; + +public interface RealtimeAsrChannelCallback { + void onChannelOpen(Long meetingId) throws Exception; + + void sendFrontendText(Long meetingId, String payload) throws Exception; + + void sendFrontendBinary(Long meetingId, byte[] payload) throws Exception; + + void sendFrontendError(Long meetingId, String code, String message); + + void removeMeetingSession(Long meetingId); + + void closeFrontend(Long meetingId, CloseStatus status); +} diff --git a/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannelContext.java b/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannelContext.java new file mode 100644 index 0000000..0098dfa --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannelContext.java @@ -0,0 +1,26 @@ +package com.imeeting.service.realtime; + +import lombok.Data; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +@Data +public class RealtimeAsrChannelContext { + private Long meetingId; + private String provider; + private String targetWsUrl; + private WebSocketSession rawSession; + private ConcurrentWebSocketSessionDecorator frontendSession; + private RealtimeAsrChannelCallback callback; + private final ConcurrentMap channelState = new ConcurrentHashMap<>(); + private volatile ConcurrentMap frontendState = new ConcurrentHashMap<>(); + + public void bindFrontendSession(WebSocketSession rawSession, ConcurrentWebSocketSessionDecorator frontendSession) { + this.rawSession = rawSession; + this.frontendSession = frontendSession; + this.frontendState = new ConcurrentHashMap<>(); + } +} diff --git a/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannelFactory.java b/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannelFactory.java 
new file mode 100644 index 0000000..8b305ae --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/realtime/RealtimeAsrChannelFactory.java @@ -0,0 +1,29 @@ +package com.imeeting.service.realtime; + +import com.imeeting.enums.ModelProviderEnum; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Component +@RequiredArgsConstructor +public class RealtimeAsrChannelFactory { + + private final List channels; + + public RealtimeAsrChannel getRequired(String provider) { + String normalizedProvider = normalizeProvider(provider); + return channels.stream() + .filter(channel -> channel.supports(normalizedProvider)) + .findFirst() + .orElseThrow(() -> new RuntimeException("暂不支持的实时 ASR 渠道: " + provider)); + } + + public String normalizeProvider(String provider) { + if (provider == null || provider.isBlank()) { + return ModelProviderEnum.LOCAL.getCode(); + } + return provider.trim().toLowerCase(); + } +} diff --git a/backend/src/main/java/com/imeeting/service/realtime/RealtimeMeetingTranscriptCacheService.java b/backend/src/main/java/com/imeeting/service/realtime/RealtimeMeetingTranscriptCacheService.java new file mode 100644 index 0000000..a272936 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/realtime/RealtimeMeetingTranscriptCacheService.java @@ -0,0 +1,13 @@ +package com.imeeting.service.realtime; + +import com.imeeting.dto.biz.RealtimeMeetingTranscriptCacheItem; + +import java.util.List; + +public interface RealtimeMeetingTranscriptCacheService { + void mergeUpstreamMessage(Long meetingId, String payload); + + List listOrderedItems(Long meetingId); + + void clear(Long meetingId); +} diff --git a/backend/src/main/java/com/imeeting/service/realtime/impl/LocalRealtimeAsrChannel.java b/backend/src/main/java/com/imeeting/service/realtime/impl/LocalRealtimeAsrChannel.java new file mode 100644 index 0000000..5b03acc --- /dev/null +++ 
b/backend/src/main/java/com/imeeting/service/realtime/impl/LocalRealtimeAsrChannel.java @@ -0,0 +1,538 @@ +package com.imeeting.service.realtime.impl; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.imeeting.dto.biz.AiModelVO; +import com.imeeting.dto.biz.RealtimeMeetingTranscriptCacheItem; +import com.imeeting.enums.ModelProviderEnum; +import com.imeeting.service.biz.RealtimeMeetingSessionStateService; +import com.imeeting.service.realtime.RealtimeAsrChannel; +import com.imeeting.service.realtime.RealtimeAsrChannelContext; +import com.imeeting.service.realtime.RealtimeMeetingTranscriptCacheService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.CloseStatus; + +import java.io.ByteArrayOutputStream; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +@Slf4j +@Component +@RequiredArgsConstructor +public class LocalRealtimeAsrChannel implements RealtimeAsrChannel { + + private static final String STATE_UPSTREAM_SOCKET = "upstreamSocket"; + private static final String STATE_CLOSE_AFTER_END = "closeAfterEnd"; + private static final String STATE_START_MESSAGE_FORWARDED = "startMessageForwarded"; + private static final String STATE_UPSTREAM_SEND_CHAIN = "upstreamSendChain"; + private static final String STATE_START_MESSAGE_SENT = "startMessageSent"; + private static final String STATE_PENDING_AUDIO_FRAMES = "pendingAudioFrames"; + private static final CompletableFuture COMPLETED = 
CompletableFuture.completedFuture(null); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String STOP_MESSAGE = "{\"type\":\"stop\"}"; + + private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; + private final RealtimeMeetingTranscriptCacheService realtimeMeetingTranscriptCacheService; + + @Override + public boolean supports(String provider) { + return ModelProviderEnum.LOCAL.getCode().equalsIgnoreCase(provider); + } + + @Override + public String resolveTargetWsUrl(AiModelVO model) { + if (model.getWsUrl() != null && !model.getWsUrl().isBlank()) { + return model.getWsUrl(); + } + if (model.getBaseUrl() == null || model.getBaseUrl().isBlank()) { + return ""; + } + return model.getBaseUrl() + .replaceFirst("^http://", "ws://") + .replaceFirst("^https://", "wss://"); + } + + @Override + public Map buildStartMessage(AiModelVO model, + String mode, + String language, + Integer useSpkId, + Boolean enablePunctuation, + Boolean enableItn, + Boolean enableTextRefine, + Boolean saveAudio, + List> hotwords) { + Map root = new HashMap<>(); + root.put("type", "start"); + + Map payload = new HashMap<>(); + payload.put("format", "pcm"); + payload.put("sample_rate", 16000); + payload.put("language", normalizeProtocolLanguage(language)); + payload.put("context", ""); + payload.put("enable_inverse_text_normalization", boolOrDefault(enableItn, true)); + payload.put("unfixed_token_num", 3); + payload.put("silence_duration_ms", 800); + payload.put("min_partial_sec", 0.3D); + payload.put("pre_roll_ms", 240); + payload.put("max_sentence_count", 8); + payload.put("partial_holdback_chars", 2); + payload.put("enable_native_partial_stream", false); + payload.put("enable_speaker", Integer.valueOf(1).equals(useSpkId)); + payload.put("match_speaker_registry", false); + payload.put("speaker_threshold", readSpeakerThreshold(model.getMediaConfig())); + payload.put("enable_realtime_longform", false); + 
payload.put("enable_realtime_vad_split", false); + payload.put("force_stable_segment_sec", 6); + payload.put("force_stable_min_chars", 24); + payload.put("max_segment_sec", 12); + payload.put("hotwords", hotwords == null ? List.of() : hotwords); + root.put("payload", payload); + return root; + } + + @Override + public void connect(RealtimeAsrChannelContext context) throws Exception { + initializeFrontendState(context); + java.net.http.WebSocket upstreamSocket = java.net.http.HttpClient.newHttpClient() + .newWebSocketBuilder() + .buildAsync(URI.create(context.getTargetWsUrl()), new UpstreamListener(context)) + .get(); + context.getChannelState().put(STATE_UPSTREAM_SOCKET, upstreamSocket); + } + + @Override + public void handleFrontendText(RealtimeAsrChannelContext context, String payload) { + java.net.http.WebSocket upstreamSocket = getUpstreamSocket(context); + if (upstreamSocket == null) { + return; + } + initializeFrontendState(context); + if (looksLikeStartMessage(payload)) { + context.getFrontendState().put(STATE_START_MESSAGE_SENT, Boolean.TRUE); + if (!Boolean.TRUE.equals(context.getChannelState().get(STATE_START_MESSAGE_FORWARDED))) { + context.getChannelState().put(STATE_START_MESSAGE_FORWARDED, Boolean.TRUE); + sendUpstreamOrdered(context, () -> upstreamSocket.sendText(payload, true), "text-start"); + } + flushPendingAudioFrames(context, upstreamSocket); + return; + } + if (looksLikeStopMessage(payload)) { + context.getChannelState().put(STATE_CLOSE_AFTER_END, Boolean.TRUE); + } + sendUpstreamOrdered(context, () -> upstreamSocket.sendText(payload, true), "text"); + } + + @Override + public void handleFrontendBinary(RealtimeAsrChannelContext context, byte[] payload) { + java.net.http.WebSocket upstreamSocket = getUpstreamSocket(context); + if (upstreamSocket == null) { + return; + } + initializeFrontendState(context); + if (!Boolean.TRUE.equals(context.getFrontendState().get(STATE_START_MESSAGE_SENT))) { + queuePendingAudioFrame(context, payload); + return; 
+ } + sendUpstreamOrdered(context, () -> upstreamSocket.sendBinary(ByteBuffer.wrap(payload), true), "binary"); + } + + @Override + public void closeMeeting(RealtimeAsrChannelContext context) { + if (context == null) { + return; + } + java.net.http.WebSocket upstreamSocket = getUpstreamSocket(context); + if (upstreamSocket == null) { + return; + } + context.getChannelState().put(STATE_CLOSE_AFTER_END, Boolean.TRUE); + upstreamSocket.sendText(STOP_MESSAGE, true); + } + + @Override + public boolean isOpen(RealtimeAsrChannelContext context) { + return getUpstreamSocket(context) != null; + } + + public static String buildFrontendTranscriptMessage(RealtimeMeetingTranscriptCacheItem item) throws JsonProcessingException { + ObjectNode root = OBJECT_MAPPER.createObjectNode(); + boolean isFinal = Boolean.TRUE.equals(item.getFinalResult()); + root.put("type", isFinal ? "segment" : "partial"); + ObjectNode data = root.putObject("data"); + data.put("text", item.getContent()); + data.put("is_final", isFinal); + if (item.getSentenceId() != null) { + data.put("sentence_id", item.getSentenceId()); + } + if (item.getStartTime() != null) { + data.put("start", item.getStartTime() / 1000D); + } + if (item.getEndTime() != null) { + data.put("end", item.getEndTime() / 1000D); + } + if (item.getSpeakerId() != null && !item.getSpeakerId().isBlank()) { + data.put("speaker_id", item.getSpeakerId()); + } + if (item.getSpeakerName() != null && !item.getSpeakerName().isBlank()) { + data.put("speaker_name", item.getSpeakerName()); + } + if (item.getUserId() != null && !item.getUserId().isBlank()) { + data.put("user_id", item.getUserId()); + } + return OBJECT_MAPPER.writeValueAsString(root); + } + + private Object readSpeakerThreshold(Map mediaConfig) { + if (mediaConfig == null) { + return null; + } + return mediaConfig.get("svThreshold"); + } + + private String normalizeProtocolLanguage(String language) { + if (language == null || language.isBlank()) { + return null; + } + String normalized = 
language.trim(); + if ("auto".equalsIgnoreCase(normalized)) { + return null; + } + return normalized; + } + + private boolean boolOrDefault(Boolean value, boolean defaultValue) { + return value != null ? value : defaultValue; + } + + private void initializeFrontendState(RealtimeAsrChannelContext context) { + context.getFrontendState().putIfAbsent(STATE_UPSTREAM_SEND_CHAIN, COMPLETED); + context.getFrontendState().putIfAbsent(STATE_START_MESSAGE_SENT, Boolean.FALSE); + context.getFrontendState().putIfAbsent(STATE_PENDING_AUDIO_FRAMES, new ArrayList()); + } + + private java.net.http.WebSocket getUpstreamSocket(RealtimeAsrChannelContext context) { + Object value = context.getChannelState().get(STATE_UPSTREAM_SOCKET); + return value instanceof java.net.http.WebSocket socket ? socket : null; + } + + @SuppressWarnings("unchecked") + private void sendUpstreamOrdered(RealtimeAsrChannelContext context, + Supplier> sendAction, + String messageType) { + ConcurrentMap frontendState = context.getFrontendState(); + synchronized (frontendState) { + CompletableFuture chain = (CompletableFuture) frontendState.getOrDefault(STATE_UPSTREAM_SEND_CHAIN, COMPLETED); + CompletableFuture nextChain = chain + .exceptionally(ex -> null) + .thenCompose(ignored -> sendAction.get().thenApply(ignoredResult -> null)); + nextChain = nextChain.whenComplete((ignored, ex) -> { + if (ex != null) { + log.error("顺序发送上游消息失败:meetingId={}, sessionId={}, type={}", + context.getMeetingId(), currentConnectionId(context), messageType, ex); + } + }); + frontendState.put(STATE_UPSTREAM_SEND_CHAIN, nextChain); + } + } + + @SuppressWarnings("unchecked") + private void queuePendingAudioFrame(RealtimeAsrChannelContext context, byte[] payload) { + ConcurrentMap frontendState = context.getFrontendState(); + synchronized (frontendState) { + List pendingFrames = (List) frontendState.get(STATE_PENDING_AUDIO_FRAMES); + if (pendingFrames == null) { + pendingFrames = new ArrayList<>(); + 
frontendState.put(STATE_PENDING_AUDIO_FRAMES, pendingFrames); + } + pendingFrames.add(payload); + } + } + + @SuppressWarnings("unchecked") + private void flushPendingAudioFrames(RealtimeAsrChannelContext context, java.net.http.WebSocket upstreamSocket) { + List pendingFrames; + ConcurrentMap frontendState = context.getFrontendState(); + synchronized (frontendState) { + pendingFrames = (List) frontendState.get(STATE_PENDING_AUDIO_FRAMES); + if (pendingFrames == null || pendingFrames.isEmpty()) { + return; + } + frontendState.put(STATE_PENDING_AUDIO_FRAMES, new ArrayList()); + } + log.info("start 后开始补发排队音频帧:meetingId={}, sessionId={}, frameCount={}", + context.getMeetingId(), currentConnectionId(context), pendingFrames.size()); + for (byte[] frame : pendingFrames) { + sendUpstreamOrdered(context, () -> upstreamSocket.sendBinary(ByteBuffer.wrap(frame), true), "binary-flush"); + } + } + + private String currentConnectionId(RealtimeAsrChannelContext context) { + return context.getRawSession() == null ? 
null : context.getRawSession().getId(); + } + + private static ByteBuffer copyBuffer(ByteBuffer source) { + ByteBuffer duplicate = source.asReadOnlyBuffer(); + byte[] bytes = new byte[duplicate.remaining()]; + duplicate.get(bytes); + return ByteBuffer.wrap(bytes); + } + + private static boolean shouldLogBinaryFrame(int count) { + return count <= 3 || count % 25 == 0; + } + + private static String summarizeText(String payload) { + if (payload == null) { + return ""; + } + String normalized = payload.replaceAll("\\s+", " ").trim(); + if (normalized.length() <= 240) { + return normalized; + } + return normalized.substring(0, 240) + "..."; + } + + private static boolean looksLikeStartMessage(String payload) { + if (payload == null || payload.isBlank()) { + return false; + } + String normalized = payload.replaceAll("\\s+", ""); + return normalized.contains("\"type\":\"start\""); + } + + private static boolean looksLikeStopMessage(String payload) { + if (payload == null || payload.isBlank()) { + return false; + } + String normalized = payload.replaceAll("\\s+", ""); + return normalized.contains("\"type\":\"stop\""); + } + + private static boolean looksLikeEndMessage(String payload) { + if (payload == null || payload.isBlank()) { + return false; + } + try { + JsonNode root = OBJECT_MAPPER.readTree(payload); + return "end".equals(root.path("type").asText("")); + } catch (Exception ex) { + return false; + } + } + + private static List normalizeFrontendMessages(String upstreamPayload) { + try { + JsonNode root = OBJECT_MAPPER.readTree(upstreamPayload); + String type = root.path("type").asText(""); + if (!"sentences".equals(type) && !"end".equals(type)) { + return List.of(upstreamPayload); + } + List normalizedMessages = new ArrayList<>(); + JsonNode sentences = root.path("sentences"); + if (sentences.isArray()) { + for (JsonNode sentence : sentences) { + String text = sentence.path("sentence").asText("").trim(); + if (text.isEmpty()) { + continue; + } + boolean isFinal = 
sentence.path("sentence_type").asInt(0) != 0 || "end".equals(type); + normalizedMessages.add(buildFrontendTranscriptMessage(sentence, text, isFinal)); + } + } + if (normalizedMessages.isEmpty()) { + String fallbackText = root.path("result").path("voice_text_str").asText("").trim(); + if (!fallbackText.isEmpty()) { + normalizedMessages.add(buildFrontendTranscriptMessage((JsonNode) null, fallbackText, true)); + } + } + return normalizedMessages.isEmpty() ? List.of(upstreamPayload) : normalizedMessages; + } catch (Exception ex) { + return List.of(upstreamPayload); + } + } + + private static String buildFrontendTranscriptMessage(JsonNode sentence, String text, boolean isFinal) throws JsonProcessingException { + ObjectNode root = OBJECT_MAPPER.createObjectNode(); + root.put("type", isFinal ? "segment" : "partial"); + ObjectNode data = root.putObject("data"); + data.put("text", text); + data.put("is_final", isFinal); + if (sentence != null && sentence.has("sentence_id") && sentence.get("sentence_id").canConvertToInt()) { + data.put("sentence_id", sentence.get("sentence_id").asInt()); + } + if (sentence != null) { + copyOptionalTimeSeconds(sentence, data, "start_time", "start"); + copyOptionalTimeSeconds(sentence, data, "end_time", "end"); + copyOptionalText(sentence, data, "speaker_id"); + copyOptionalText(sentence, data, "speaker_name"); + copyOptionalText(sentence, data, "user_id"); + } + return OBJECT_MAPPER.writeValueAsString(root); + } + + private static void copyOptionalTimeSeconds(JsonNode source, ObjectNode target, String sourceFieldName, String targetFieldName) { + if (source == null) { + return; + } + if (source.has(sourceFieldName) && source.get(sourceFieldName).isNumber()) { + target.put(targetFieldName, source.get(sourceFieldName).asDouble() / 1000D); + return; + } + if (source.has(targetFieldName) && source.get(targetFieldName).isNumber()) { + target.put(targetFieldName, source.get(targetFieldName).asDouble()); + } + } + + private static void 
copyOptionalText(JsonNode source, ObjectNode target, String fieldName) { + if (source == null || !source.has(fieldName) || source.get(fieldName).isNull()) { + return; + } + String value = source.get(fieldName).asText("").trim(); + if (!value.isEmpty()) { + target.put(fieldName, value); + } + } + + private final class UpstreamListener implements java.net.http.WebSocket.Listener { + private final RealtimeAsrChannelContext context; + private final StringBuilder textBuffer = new StringBuilder(); + private final ByteArrayOutputStream binaryBuffer = new ByteArrayOutputStream(); + private final AtomicInteger upstreamTextCount = new AtomicInteger(); + private final AtomicInteger upstreamBinaryCount = new AtomicInteger(); + + private UpstreamListener(RealtimeAsrChannelContext context) { + this.context = context; + } + + @Override + public void onOpen(java.net.http.WebSocket webSocket) { + context.getChannelState().put(STATE_UPSTREAM_SOCKET, webSocket); + log.info("上游 ASR websocket 已打开:meetingId={}, sessionId={}, upstream={}", + context.getMeetingId(), currentConnectionId(context), context.getTargetWsUrl()); + String connectionId = currentConnectionId(context); + if (connectionId == null || !realtimeMeetingSessionStateService.activate(context.getMeetingId(), connectionId)) { + context.getCallback().sendFrontendError(context.getMeetingId(), "REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议无法激活这条前端连接"); + webSocket.sendClose(CloseStatus.POLICY_VIOLATION.getCode(), "当前会议无法激活这条前端连接"); + context.getCallback().closeFrontend(context.getMeetingId(), CloseStatus.POLICY_VIOLATION.withReason("当前会议无法激活这条前端连接")); + return; + } + try { + context.getCallback().onChannelOpen(context.getMeetingId()); + } catch (Exception ex) { + log.error("通知前端上游就绪失败:meetingId={}, sessionId={}", context.getMeetingId(), currentConnectionId(context), ex); + context.getCallback().closeFrontend(context.getMeetingId(), CloseStatus.SERVER_ERROR); + return; + } + webSocket.request(1); + } + + @Override + public 
java.util.concurrent.CompletionStage onText(java.net.http.WebSocket webSocket, CharSequence data, boolean last) { + textBuffer.append(data); + if (last) { + int count = upstreamTextCount.incrementAndGet(); + String upstreamPayload = textBuffer.toString(); + realtimeMeetingTranscriptCacheService.mergeUpstreamMessage(context.getMeetingId(), upstreamPayload); + try { + for (String frontendPayload : normalizeFrontendMessages(upstreamPayload)) { + context.getCallback().sendFrontendText(context.getMeetingId(), frontendPayload); + } + log.info("上游 ASR 文本 -> 前端:meetingId={}, sessionId={}, count={}, payload={}", + context.getMeetingId(), currentConnectionId(context), count, summarizeText(upstreamPayload)); + if (Boolean.TRUE.equals(context.getChannelState().get(STATE_CLOSE_AFTER_END)) + && looksLikeEndMessage(upstreamPayload)) { + webSocket.sendClose(CloseStatus.NORMAL.getCode(), "meeting-complete"); + } + } catch (Exception ex) { + log.error("转发上游 ASR 文本失败:meetingId={}, sessionId={}", context.getMeetingId(), currentConnectionId(context), ex); + context.getCallback().closeFrontend(context.getMeetingId(), CloseStatus.SERVER_ERROR); + } finally { + textBuffer.setLength(0); + } + } + webSocket.request(1); + return COMPLETED; + } + + @Override + public java.util.concurrent.CompletionStage onBinary(java.net.http.WebSocket webSocket, ByteBuffer data, boolean last) { + byte[] chunk = new byte[data.remaining()]; + data.get(chunk); + binaryBuffer.writeBytes(chunk); + if (last) { + int count = upstreamBinaryCount.incrementAndGet(); + try { + context.getCallback().sendFrontendBinary(context.getMeetingId(), binaryBuffer.toByteArray()); + if (shouldLogBinaryFrame(count)) { + log.info("上游 ASR 二进制消息 -> 前端:meetingId={}, sessionId={}, count={}, bytes={}", + context.getMeetingId(), currentConnectionId(context), count, binaryBuffer.size()); + } + } catch (Exception ex) { + log.error("转发上游 ASR 二进制消息失败:meetingId={}, sessionId={}", context.getMeetingId(), currentConnectionId(context), ex); + 
context.getCallback().closeFrontend(context.getMeetingId(), CloseStatus.SERVER_ERROR); + } finally { + binaryBuffer.reset(); + } + } + webSocket.request(1); + return COMPLETED; + } + + @Override + public java.util.concurrent.CompletionStage onPing(java.net.http.WebSocket webSocket, ByteBuffer message) { + webSocket.sendPong(copyBuffer(message)); + log.info("上游 ASR ping 已本地响应:meetingId={}, sessionId={}, bytes={}", + context.getMeetingId(), currentConnectionId(context), message.remaining()); + webSocket.request(1); + return COMPLETED; + } + + @Override + public java.util.concurrent.CompletionStage onPong(java.net.http.WebSocket webSocket, ByteBuffer message) { + log.debug("上游 ASR pong 已本地忽略:meetingId={}, sessionId={}, bytes={}", + context.getMeetingId(), currentConnectionId(context), message.remaining()); + webSocket.request(1); + return COMPLETED; + } + + @Override + public java.util.concurrent.CompletionStage onClose(java.net.http.WebSocket webSocket, int statusCode, String reason) { + log.info("上游 ASR websocket 已关闭:meetingId={}, sessionId={}, code={}, reason={}", + context.getMeetingId(), currentConnectionId(context), statusCode, reason); + context.getChannelState().remove(STATE_UPSTREAM_SOCKET); + context.getCallback().removeMeetingSession(context.getMeetingId()); + context.getCallback().sendFrontendError(context.getMeetingId(), + "REALTIME_UPSTREAM_CLOSED", + reason == null || reason.isBlank() ? 
"上游 ASR WebSocket 已断开" : "上游 ASR WebSocket 已断开: " + reason); + context.getCallback().closeFrontend(context.getMeetingId(), new CloseStatus(statusCode, reason)); + return COMPLETED; + } + + @Override + public void onError(java.net.http.WebSocket webSocket, Throwable error) { + log.error("上游 ASR websocket 异常:meetingId={}, sessionId={}, upstream={}", + context.getMeetingId(), currentConnectionId(context), context.getTargetWsUrl(), error); + context.getChannelState().remove(STATE_UPSTREAM_SOCKET); + context.getCallback().removeMeetingSession(context.getMeetingId()); + context.getCallback().sendFrontendError(context.getMeetingId(), + "REALTIME_UPSTREAM_ERROR", + error == null || error.getMessage() == null || error.getMessage().isBlank() + ? "上游 ASR WebSocket 连接异常" + : "上游 ASR WebSocket 连接异常: " + error.getMessage()); + context.getCallback().closeFrontend(context.getMeetingId(), CloseStatus.SERVER_ERROR); + } + } +} diff --git a/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingTranscriptCacheServiceImpl.java b/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingTranscriptCacheServiceImpl.java new file mode 100644 index 0000000..57b26da --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingTranscriptCacheServiceImpl.java @@ -0,0 +1,286 @@ +package com.imeeting.service.realtime.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.imeeting.dto.biz.RealtimeMeetingTranscriptCacheItem; +import com.imeeting.dto.biz.RealtimeMeetingTranscriptCacheState; +import com.imeeting.entity.biz.MeetingTranscript; +import com.imeeting.mapper.biz.MeetingTranscriptMapper; +import com.imeeting.service.biz.MeetingTranscriptFileService; +import 
com.imeeting.service.biz.RealtimeMeetingSessionStateService; +import com.imeeting.service.realtime.RealtimeMeetingTranscriptCacheService; +import com.imeeting.support.redis.RealtimeMeetingTranscriptCache; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +@Service +public class RealtimeMeetingTranscriptCacheServiceImpl implements RealtimeMeetingTranscriptCacheService { + + private final RealtimeMeetingTranscriptCache transcriptCache; + private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; + private final MeetingTranscriptMapper transcriptMapper; + private final MeetingTranscriptFileService meetingTranscriptFileService; + private final ObjectMapper objectMapper = new ObjectMapper(); + private final Map meetingLocks = new ConcurrentHashMap<>(); + + @Autowired + public RealtimeMeetingTranscriptCacheServiceImpl(RealtimeMeetingTranscriptCache transcriptCache, + RealtimeMeetingSessionStateService realtimeMeetingSessionStateService, + MeetingTranscriptMapper transcriptMapper, + MeetingTranscriptFileService meetingTranscriptFileService) { + this.transcriptCache = transcriptCache; + this.realtimeMeetingSessionStateService = realtimeMeetingSessionStateService; + this.transcriptMapper = transcriptMapper; + this.meetingTranscriptFileService = meetingTranscriptFileService; + } + + @Override + public void mergeUpstreamMessage(Long meetingId, String payload) { + if (meetingId == null || payload == null || payload.isBlank()) { + return; + } + synchronized (lockForMeeting(meetingId)) { + RealtimeMeetingTranscriptCacheState state = getOrCreateState(meetingId); + try { + JsonNode root = objectMapper.readTree(payload); + String type = root.path("type").asText(""); + if (!"sentences".equals(type) && 
!"end".equals(type)) { + return; + } + JsonNode sentences = root.path("sentences"); + if (!sentences.isArray()) { + return; + } + for (JsonNode sentence : sentences) { + RealtimeMeetingTranscriptCacheItem item = mergeSentenceNode(state, sentence, "end".equals(type)); + persistFinalSentence(meetingId, item); + } + state.setUpdatedAt(System.currentTimeMillis()); + transcriptCache.saveState(state); + realtimeMeetingSessionStateService.refreshAfterTranscriptCapture(meetingId, countNonEmptyItems(state)); + } catch (Exception ignored) { + // ignore malformed upstream payload + } + } + } + + @Override + public List listOrderedItems(Long meetingId) { + RealtimeMeetingTranscriptCacheState state = transcriptCache.getState(meetingId); + if (state == null || state.getItems() == null || state.getItems().isEmpty()) { + return List.of(); + } + return state.getItems().stream() + .filter(item -> item.getContent() != null && !item.getContent().isBlank()) + .sorted(Comparator.comparing(item -> item.getSortOrder() == null ? Integer.MAX_VALUE : item.getSortOrder())) + .toList(); + } + + @Override + public void clear(Long meetingId) { + transcriptCache.clear(meetingId); + } + + private RealtimeMeetingTranscriptCacheItem mergeSentenceNode(RealtimeMeetingTranscriptCacheState state, + JsonNode sentence, + boolean fromEndMessage) { + String text = sentence.path("sentence").asText("").trim(); + if (text.isEmpty()) { + return null; + } + Integer sentenceId = sentence.has("sentence_id") && sentence.get("sentence_id").canConvertToInt() + ? sentence.get("sentence_id").asInt() + : null; + String sentenceKey = sentenceId == null ? 
"sentence-" + nextLegacySequence(state) : "sentence-" + sentenceId; + RealtimeMeetingTranscriptCacheItem item = findBySentenceKey(state, sentenceKey); + long now = System.currentTimeMillis(); + if (item == null) { + item = new RealtimeMeetingTranscriptCacheItem(); + item.setSentenceKey(sentenceKey); + item.setSentenceId(sentenceId); + item.setSortOrder(nextSortOrder(state)); + item.setFirstReceivedAt(now); + state.getItems().add(item); + } + item.setSentenceType(readInteger(sentence, "sentence_type")); + item.setSpeakerId(resolveSpeakerId(sentence)); + item.setSpeakerName(readText(sentence, "speaker_name")); + item.setUserId(readText(sentence, "user_id")); + item.setStartTime(readTimeMilliseconds(sentence, "start_time", "start")); + item.setEndTime(readTimeMilliseconds(sentence, "end_time", "end")); + item.setContent(text); + item.setFinalResult(fromEndMessage || Objects.equals(item.getSentenceType(), 1)); + item.setUpdatedAt(now); + return item; + } + + private void persistFinalSentence(Long meetingId, RealtimeMeetingTranscriptCacheItem item) { + if (meetingId == null || item == null || !Boolean.TRUE.equals(item.getFinalResult())) { + return; + } + String content = item.getContent() == null ? 
null : item.getContent().trim(); + if (content == null || content.isBlank()) { + return; + } + String speakerId = resolveSpeakerId(item); + String speakerName = resolveSpeakerName(item); + if (item.getTranscriptId() != null) { + transcriptMapper.update(null, new LambdaUpdateWrapper() + .eq(MeetingTranscript::getId, item.getTranscriptId()) + .set(MeetingTranscript::getSpeakerId, speakerId) + .set(MeetingTranscript::getSpeakerName, speakerName) + .set(MeetingTranscript::getContent, content) + .set(item.getStartTime() != null, MeetingTranscript::getStartTime, item.getStartTime()) + .set(item.getEndTime() != null, MeetingTranscript::getEndTime, item.getEndTime())); + meetingTranscriptFileService.initializeTranscriptFileIfAbsent(meetingId); + return; + } + + MeetingTranscript transcript = new MeetingTranscript(); + transcript.setMeetingId(meetingId); + transcript.setSpeakerId(speakerId); + transcript.setSpeakerName(speakerName); + transcript.setContent(content); + transcript.setStartTime(item.getStartTime()); + transcript.setEndTime(item.getEndTime()); + transcript.setSortOrder(nextPersistedSortOrder(meetingId)); + transcriptMapper.insert(transcript); + item.setTranscriptId(transcript.getId()); + meetingTranscriptFileService.initializeTranscriptFileIfAbsent(meetingId); + } + + private int nextPersistedSortOrder(Long meetingId) { + Integer maxSortOrder = transcriptMapper.selectList(new LambdaQueryWrapper() + .eq(MeetingTranscript::getMeetingId, meetingId) + .orderByDesc(MeetingTranscript::getSortOrder) + .last("LIMIT 1")) + .stream() + .findFirst() + .map(MeetingTranscript::getSortOrder) + .orElse(0); + return maxSortOrder == null ? 
0 : maxSortOrder + 1; + } + + private RealtimeMeetingTranscriptCacheItem findBySentenceKey(RealtimeMeetingTranscriptCacheState state, String sentenceKey) { + if (state.getItems() == null || state.getItems().isEmpty()) { + return null; + } + return state.getItems().stream() + .filter(item -> sentenceKey.equals(item.getSentenceKey())) + .findFirst() + .orElse(null); + } + + private String resolveSpeakerId(RealtimeMeetingTranscriptCacheItem item) { + if (item == null) { + return null; + } + if (item.getUserId() != null && !item.getUserId().isBlank()) { + return item.getUserId().trim(); + } + if (item.getSpeakerId() == null || item.getSpeakerId().isBlank() || "-1".equals(item.getSpeakerId().trim())) { + return null; + } + return item.getSpeakerId().trim(); + } + + private String resolveSpeakerName(RealtimeMeetingTranscriptCacheItem item) { + if (item == null) { + return null; + } + if (item.getSpeakerName() != null && !item.getSpeakerName().isBlank()) { + return item.getSpeakerName().trim(); + } + String speakerId = resolveSpeakerId(item); + return speakerId == null || speakerId.isBlank() ? 
null : speakerId; + } + + private String resolveSpeakerId(JsonNode sentence) { + String userId = readText(sentence, "user_id"); + if (userId != null && !userId.isBlank()) { + return userId; + } + return readText(sentence, "speaker_id"); + } + + private Integer readInteger(JsonNode node, String fieldName) { + if (node == null || !node.has(fieldName) || !node.get(fieldName).canConvertToInt()) { + return null; + } + return node.get(fieldName).asInt(); + } + + private Integer readTimeMilliseconds(JsonNode node, String primaryField, String fallbackField) { + if (node == null) { + return null; + } + if (node.has(primaryField) && node.get(primaryField).canConvertToInt()) { + return node.get(primaryField).asInt(); + } + if (node.has(fallbackField) && node.get(fallbackField).isNumber()) { + return Math.round((float) (node.get(fallbackField).asDouble() * 1000)); + } + return null; + } + + private String readText(JsonNode node, String fieldName) { + if (node == null || !node.has(fieldName) || node.get(fieldName).isNull()) { + return null; + } + String value = node.get(fieldName).asText(""); + return value == null ? null : value.trim(); + } + + private long countNonEmptyItems(RealtimeMeetingTranscriptCacheState state) { + if (state.getItems() == null || state.getItems().isEmpty()) { + return 0L; + } + return state.getItems().stream() + .filter(item -> item.getContent() != null && !item.getContent().isBlank()) + .count(); + } + + private int nextSortOrder(RealtimeMeetingTranscriptCacheState state) { + Integer current = state.getNextSortOrder(); + int next = current == null ? 0 : current; + state.setNextSortOrder(next + 1); + return next; + } + + private int nextLegacySequence(RealtimeMeetingTranscriptCacheState state) { + Integer current = state.getNextLegacySequence(); + int next = current == null ? 
0 : current; + state.setNextLegacySequence(next + 1); + return next; + } + + private RealtimeMeetingTranscriptCacheState getOrCreateState(Long meetingId) { + RealtimeMeetingTranscriptCacheState state = transcriptCache.getState(meetingId); + if (state != null) { + if (state.getItems() == null) { + state.setItems(new ArrayList<>()); + } + return state; + } + RealtimeMeetingTranscriptCacheState next = new RealtimeMeetingTranscriptCacheState(); + next.setMeetingId(meetingId); + next.setItems(new ArrayList<>()); + next.setNextSortOrder(0); + next.setNextLegacySequence(0); + next.setUpdatedAt(System.currentTimeMillis()); + return next; + } + + private Object lockForMeeting(Long meetingId) { + return meetingLocks.computeIfAbsent(meetingId, ignored -> new Object()); + } +} diff --git a/backend/src/main/java/com/imeeting/support/redis/RealtimeMeetingTranscriptCache.java b/backend/src/main/java/com/imeeting/support/redis/RealtimeMeetingTranscriptCache.java new file mode 100644 index 0000000..f432915 --- /dev/null +++ b/backend/src/main/java/com/imeeting/support/redis/RealtimeMeetingTranscriptCache.java @@ -0,0 +1,39 @@ +package com.imeeting.support.redis; + +import com.imeeting.common.RedisKeys; +import com.imeeting.dto.biz.RealtimeMeetingTranscriptCacheState; +import com.imeeting.support.RedisSupport; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.time.Duration; + +@Component +@RequiredArgsConstructor +public class RealtimeMeetingTranscriptCache { + + private static final Duration CACHE_TTL = Duration.ofHours(12); + + private final RedisSupport redisSupport; + + public RealtimeMeetingTranscriptCacheState getState(Long meetingId) { + if (meetingId == null) { + return null; + } + return redisSupport.getJsonQuietly(RedisKeys.realtimeMeetingTranscriptCacheKey(meetingId), RealtimeMeetingTranscriptCacheState.class); + } + + public void saveState(RealtimeMeetingTranscriptCacheState state) { + if (state == null || 
state.getMeetingId() == null) { + return; + } + redisSupport.setJson(RedisKeys.realtimeMeetingTranscriptCacheKey(state.getMeetingId()), state, CACHE_TTL); + } + + public void clear(Long meetingId) { + if (meetingId == null) { + return; + } + redisSupport.deleteQuietly(RedisKeys.realtimeMeetingTranscriptCacheKey(meetingId)); + } +} diff --git a/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java b/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java index 0ffee12..58f8e3d 100644 --- a/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java +++ b/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java @@ -1,173 +1,133 @@ package com.imeeting.websocket; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.imeeting.dto.biz.RealtimeMeetingTranscriptCacheItem; import com.imeeting.dto.biz.RealtimeSocketSessionData; import com.imeeting.service.biz.RealtimeMeetingSessionStateService; import com.imeeting.service.biz.RealtimeMeetingSocketSessionService; +import com.imeeting.service.realtime.RealtimeAsrChannel; +import com.imeeting.service.realtime.RealtimeAsrChannelCallback; +import com.imeeting.service.realtime.RealtimeAsrChannelContext; +import com.imeeting.service.realtime.RealtimeAsrChannelFactory; import com.imeeting.service.realtime.RealtimeMeetingAudioStorageService; +import com.imeeting.service.realtime.RealtimeMeetingTranscriptCacheService; +import com.imeeting.service.realtime.impl.LocalRealtimeAsrChannel; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.web.socket.BinaryMessage; import org.springframework.web.socket.CloseStatus; -import org.springframework.web.socket.PingMessage; import org.springframework.web.socket.PongMessage; import org.springframework.web.socket.TextMessage; import 
org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.AbstractWebSocketHandler; import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; -import java.io.ByteArrayOutputStream; import java.net.URI; import java.net.URLDecoder; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; @Slf4j @Component @RequiredArgsConstructor public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandler { - private static final String ATTR_FRONTEND_SESSION = "frontendSession"; - private static final String ATTR_UPSTREAM_SOCKET = "upstreamSocket"; private static final String ATTR_MEETING_ID = "meetingId"; private static final String ATTR_TARGET_WS_URL = "targetWsUrl"; + private static final String ATTR_PROVIDER = "provider"; private static final String ATTR_FRONTEND_TEXT_COUNT = "frontendTextCount"; private static final String ATTR_FRONTEND_BINARY_COUNT = "frontendBinaryCount"; - private static final String ATTR_UPSTREAM_SEND_CHAIN = "upstreamSendChain"; - private static final String ATTR_START_MESSAGE_SENT = "startMessageSent"; - private static final String ATTR_PENDING_AUDIO_FRAMES = "pendingAudioFrames"; - private static final CompletableFuture COMPLETED = CompletableFuture.completedFuture(null); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService; private final RealtimeMeetingSessionStateService 
realtimeMeetingSessionStateService; private final RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService; + private final RealtimeMeetingTranscriptCacheService realtimeMeetingTranscriptCacheService; + private final RealtimeAsrChannelFactory realtimeAsrChannelFactory; + private final ConcurrentMap meetingSessions = new ConcurrentHashMap<>(); + private final ConcurrentMap meetingLocks = new ConcurrentHashMap<>(); @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String sessionToken = extractQueryParam(session.getUri(), "sessionToken"); RealtimeSocketSessionData sessionData = realtimeMeetingSocketSessionService.getSessionData(sessionToken); if (sessionData == null) { - log.warn("Realtime websocket rejected: invalid session token, sessionId={}", session.getId()); + log.warn("实时会议 websocket 拒绝连接:会话令牌无效,sessionId={}", session.getId()); session.close(CloseStatus.POLICY_VIOLATION.withReason("实时 Socket 会话无效")); return; } ConcurrentWebSocketSessionDecorator frontendSession = new ConcurrentWebSocketSessionDecorator(session, (int) Duration.ofSeconds(15).toMillis(), 1024 * 1024); - session.getAttributes().put(ATTR_FRONTEND_SESSION, frontendSession); session.getAttributes().put(ATTR_MEETING_ID, sessionData.getMeetingId()); session.getAttributes().put(ATTR_TARGET_WS_URL, sessionData.getTargetWsUrl()); + session.getAttributes().put(ATTR_PROVIDER, sessionData.getProvider()); session.getAttributes().put(ATTR_FRONTEND_TEXT_COUNT, new AtomicInteger()); session.getAttributes().put(ATTR_FRONTEND_BINARY_COUNT, new AtomicInteger()); - session.getAttributes().put(ATTR_UPSTREAM_SEND_CHAIN, COMPLETED); - session.getAttributes().put(ATTR_START_MESSAGE_SENT, Boolean.FALSE); - session.getAttributes().put(ATTR_PENDING_AUDIO_FRAMES, new ArrayList()); realtimeMeetingAudioStorageService.openSession(sessionData.getMeetingId(), session.getId()); - log.info("Realtime websocket accepted: meetingId={}, sessionId={}, upstream={}", - 
sessionData.getMeetingId(), session.getId(), sessionData.getTargetWsUrl()); + log.info("实时会议 websocket 已接入:meetingId={}, sessionId={}, provider={}, upstream={}", + sessionData.getMeetingId(), session.getId(), sessionData.getProvider(), sessionData.getTargetWsUrl()); - java.net.http.WebSocket upstreamSocket; - try { - upstreamSocket = java.net.http.HttpClient.newHttpClient() - .newWebSocketBuilder() - .buildAsync(URI.create(sessionData.getTargetWsUrl()), - new UpstreamListener( - frontendSession, - session, - sessionData.getMeetingId(), - sessionData.getTargetWsUrl(), - realtimeMeetingSessionStateService - )) - .get(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - log.error("Realtime websocket upstream connect interrupted: meetingId={}, sessionId={}", - sessionData.getMeetingId(), session.getId(), ex); - sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接第三方识别服务时被中断"); - realtimeMeetingAudioStorageService.closeSession(session.getId()); - frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接上游服务时被中断")); - return; - } catch (ExecutionException | CompletionException ex) { - log.warn("Failed to connect upstream websocket, meetingId={}, target={}", sessionData.getMeetingId(), sessionData.getTargetWsUrl(), ex); - sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_FAILED", "连接第三方识别服务失败,请检查模型 WebSocket 配置或服务状态"); - realtimeMeetingAudioStorageService.closeSession(session.getId()); - frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接 ASR WebSocket 失败")); - return; - } - - session.getAttributes().put(ATTR_UPSTREAM_SOCKET, upstreamSocket); + attachFrontendSession(sessionData, session, frontendSession); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) { - java.net.http.WebSocket upstreamSocket = getUpstreamSocket(session); - if (upstreamSocket == null) { - log.warn("Frontend text ignored because upstream socket is unavailable, meetingId={}, 
sessionId={}", + MeetingChannelSession meetingSession = getMeetingSession(session); + if (meetingSession == null || !meetingSession.isChannelOpen()) { + log.warn("前端文本消息已忽略:上游 ASR 连接不可用,meetingId={}, sessionId={}", session.getAttributes().get(ATTR_MEETING_ID), session.getId()); return; } int count = nextCount(session, ATTR_FRONTEND_TEXT_COUNT); - log.info("Frontend text -> upstream: meetingId={}, sessionId={}, count={}, payload={}", - session.getAttributes().get(ATTR_MEETING_ID), session.getId(), count, summarizeText(message.getPayload())); - sendUpstreamOrdered(session, () -> upstreamSocket.sendText(message.getPayload(), true), "text"); - if (looksLikeStartMessage(message.getPayload())) { - session.getAttributes().put(ATTR_START_MESSAGE_SENT, Boolean.TRUE); - flushPendingAudioFrames(session, upstreamSocket); - } + String payload = message.getPayload(); + log.info("前端文本 -> ASR 渠道:meetingId={}, sessionId={}, provider={}, count={}, payload={}", + session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, summarizeText(payload)); + meetingSession.channel.handleFrontendText(meetingSession.context, payload); } @Override protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) { - java.net.http.WebSocket upstreamSocket = getUpstreamSocket(session); - if (upstreamSocket == null) { - log.warn("Frontend binary ignored because upstream socket is unavailable, meetingId={}, sessionId={}", + MeetingChannelSession meetingSession = getMeetingSession(session); + if (meetingSession == null || !meetingSession.isChannelOpen()) { + log.warn("前端音频帧已忽略:上游 ASR 连接不可用,meetingId={}, sessionId={}", session.getAttributes().get(ATTR_MEETING_ID), session.getId()); return; } int count = nextCount(session, ATTR_FRONTEND_BINARY_COUNT); int bytes = message.getPayloadLength(); if (shouldLogBinaryFrame(count)) { - log.info("Frontend binary -> upstream: meetingId={}, sessionId={}, count={}, bytes={}", - 
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), count, bytes); + log.info("前端音频帧 -> ASR 渠道:meetingId={}, sessionId={}, provider={}, count={}, bytes={}", + session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, bytes); } byte[] payload = toByteArray(message.getPayload()); realtimeMeetingAudioStorageService.append(session.getId(), payload); - if (!Boolean.TRUE.equals(session.getAttributes().get(ATTR_START_MESSAGE_SENT))) { - queuePendingAudioFrame(session, payload); - if (shouldLogBinaryFrame(count)) { - log.warn("Frontend binary queued before start message: meetingId={}, sessionId={}, count={}, bytes={}", - session.getAttributes().get(ATTR_MEETING_ID), session.getId(), count, bytes); - } - return; - } - sendUpstreamOrdered(session, () -> upstreamSocket.sendBinary(ByteBuffer.wrap(payload), true), "binary"); + meetingSession.channel.handleFrontendBinary(meetingSession.context, payload); } @Override protected void handlePongMessage(WebSocketSession session, PongMessage message) { - java.net.http.WebSocket upstreamSocket = getUpstreamSocket(session); - if (upstreamSocket == null) { + if (getMeetingSession(session) == null) { return; } - sendUpstreamOrdered(session, () -> upstreamSocket.sendPong(copyBuffer(message.getPayload())), "pong"); + log.debug("前端 pong 已在本地忽略:meetingId={}, sessionId={}, bytes={}", + session.getAttributes().get(ATTR_MEETING_ID), session.getId(), message.getPayloadLength()); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { - log.error("Realtime websocket transport error: meetingId={}, sessionId={}, upstream={}", + log.error("实时会议 websocket 传输异常:meetingId={}, sessionId={}, upstream={}", session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_TARGET_WS_URL), exception); - closeUpstreamSocket(session, CloseStatus.SERVER_ERROR); + detachFrontend(session); + 
realtimeMeetingAudioStorageService.closeSession(session.getId()); if (session.isOpen()) { session.close(CloseStatus.SERVER_ERROR); } @@ -175,32 +135,149 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { - log.info("Realtime websocket closed: meetingId={}, sessionId={}, code={}, reason={}", + log.info("实时会议 websocket 已关闭:meetingId={}, sessionId={}, code={}, reason={}", session.getAttributes().get(ATTR_MEETING_ID), session.getId(), status.getCode(), status.getReason()); Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID); if (meetingIdValue instanceof Long meetingId) { + detachFrontend(meetingId, session.getId()); realtimeMeetingSessionStateService.pauseByDisconnect(meetingId, session.getId()); } realtimeMeetingAudioStorageService.closeSession(session.getId()); - closeUpstreamSocket(session, status); } - private java.net.http.WebSocket getUpstreamSocket(WebSocketSession session) { - Object socket = session.getAttributes().get(ATTR_UPSTREAM_SOCKET); - if (socket instanceof java.net.http.WebSocket webSocket) { - return webSocket; + public void closeMeetingSession(Long meetingId) { + if (meetingId == null) { + return; + } + MeetingChannelSession meetingSession = meetingSessions.get(meetingId); + if (meetingSession == null || meetingSession.channel == null) { + return; + } + meetingSession.channel.closeMeeting(meetingSession.context); + } + + private void attachFrontendSession(RealtimeSocketSessionData sessionData, + WebSocketSession rawSession, + ConcurrentWebSocketSessionDecorator frontendSession) throws Exception { + Long meetingId = sessionData.getMeetingId(); + MeetingChannelSession meetingSession; + boolean reused = false; + synchronized (lockForMeeting(meetingId)) { + meetingSession = meetingSessions.get(meetingId); + if (meetingSession != null && meetingSession.isChannelOpen()) { + meetingSession.clearFrontendIfClosed(); + 
if (meetingSession.hasOpenFrontend()) { + sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃前端连接"); + frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("已存在活跃的前端连接")); + realtimeMeetingAudioStorageService.closeSession(rawSession.getId()); + return; } - return null; + if (!realtimeMeetingSessionStateService.activate(meetingId, rawSession.getId())) { + sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_REJECTED", "当前状态下无法继续会议"); + frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("当前状态下无法继续会议")); + realtimeMeetingAudioStorageService.closeSession(rawSession.getId()); + return; + } + meetingSession.bindFrontend(rawSession, frontendSession); + reused = true; + } else { + RealtimeAsrChannel channel = realtimeAsrChannelFactory.getRequired(sessionData.getProvider()); + RealtimeAsrChannelContext context = new RealtimeAsrChannelContext(); + context.setMeetingId(meetingId); + context.setProvider(realtimeAsrChannelFactory.normalizeProvider(sessionData.getProvider())); + context.setTargetWsUrl(sessionData.getTargetWsUrl()); + context.setCallback(new HandlerChannelCallback()); + context.bindFrontendSession(rawSession, frontendSession); + meetingSession = new MeetingChannelSession(meetingId, channel, context); + meetingSessions.put(meetingId, meetingSession); + } } - private void closeUpstreamSocket(WebSocketSession session, CloseStatus status) { - java.net.http.WebSocket upstreamSocket = getUpstreamSocket(session); - if (upstreamSocket != null) { - upstreamSocket.sendClose(status.getCode(), status.getReason() == null ? 
"" : status.getReason()); - session.getAttributes().remove(ATTR_UPSTREAM_SOCKET); + if (reused) { + sendProxyReady(frontendSession); + replayCachedMessages(meetingId, frontendSession); + return; + } + + try { + meetingSession.channel.connect(meetingSession.context); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + removeMeetingSession(meetingId, meetingSession); + log.error("连接上游 ASR websocket 时被中断:meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex); + sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接上游 ASR 服务时被中断"); + realtimeMeetingAudioStorageService.closeSession(rawSession.getId()); + frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接上游服务时被中断")); + } catch (Exception ex) { + removeMeetingSession(meetingId, meetingSession); + log.warn("连接上游 ASR websocket 失败:meetingId={}, provider={}, target={}", + meetingId, sessionData.getProvider(), sessionData.getTargetWsUrl(), ex); + sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_FAILED", "连接上游 ASR 服务失败"); + realtimeMeetingAudioStorageService.closeSession(rawSession.getId()); + frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接 ASR WebSocket 失败")); + } + } + + private void replayCachedMessages(Long meetingId, ConcurrentWebSocketSessionDecorator frontendSession) { + try { + if (!frontendSession.isOpen()) { + return; + } + for (RealtimeMeetingTranscriptCacheItem item : realtimeMeetingTranscriptCacheService.listOrderedItems(meetingId)) { + frontendSession.sendMessage(new TextMessage(LocalRealtimeAsrChannel.buildFrontendTranscriptMessage(item))); + } + } catch (Exception ex) { + log.warn("回放缓存转写消息失败:meetingId={}", meetingId, ex); + } + } + + private void sendProxyReady(ConcurrentWebSocketSessionDecorator frontendSession) throws Exception { + if (frontendSession.isOpen()) { + frontendSession.sendMessage(new TextMessage("{\"type\":\"proxy_ready\"}")); + } + } + + private void detachFrontend(WebSocketSession session) { + 
Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID); + if (meetingIdValue instanceof Long meetingId) { + detachFrontend(meetingId, session.getId()); + } + } + + private void detachFrontend(Long meetingId, String sessionId) { + MeetingChannelSession meetingSession = meetingSessions.get(meetingId); + if (meetingSession == null) { + return; + } + synchronized (lockForMeeting(meetingId)) { + meetingSession.detachFrontend(sessionId); + } + } + + private MeetingChannelSession getMeetingSession(WebSocketSession session) { + Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID); + if (!(meetingIdValue instanceof Long meetingId)) { + return null; + } + return meetingSessions.get(meetingId); + } + + void removeMeetingSession(Long meetingId) { + synchronized (lockForMeeting(meetingId)) { + meetingSessions.remove(meetingId); + } + } + + void removeMeetingSession(Long meetingId, MeetingChannelSession meetingSession) { + synchronized (lockForMeeting(meetingId)) { + meetingSessions.remove(meetingId, meetingSession); } } + private Object lockForMeeting(Long meetingId) { + return meetingLocks.computeIfAbsent(meetingId, ignored -> new Object()); + } + private String extractQueryParam(URI uri, String key) { if (uri == null || uri.getQuery() == null || uri.getQuery().isBlank()) { return null; @@ -213,13 +290,6 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl .orElse(null); } - private ByteBuffer copyBuffer(ByteBuffer source) { - ByteBuffer duplicate = source.asReadOnlyBuffer(); - byte[] bytes = new byte[duplicate.remaining()]; - duplicate.get(bytes); - return ByteBuffer.wrap(bytes); - } - private byte[] toByteArray(ByteBuffer source) { ByteBuffer duplicate = source.asReadOnlyBuffer(); byte[] bytes = new byte[duplicate.remaining()]; @@ -235,21 +305,18 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl return 0; } - @SuppressWarnings("unchecked") - private void 
sendUpstreamOrdered(WebSocketSession session, Supplier> sendAction, String messageType) { - synchronized (session) { - CompletableFuture chain = (CompletableFuture) session.getAttributes() - .getOrDefault(ATTR_UPSTREAM_SEND_CHAIN, COMPLETED); - CompletableFuture nextChain = chain - .exceptionally(ex -> null) - .thenCompose(ignored -> sendAction.get().thenApply(ignoredResult -> null)); - nextChain = nextChain.whenComplete((ignored, ex) -> { - if (ex != null) { - log.error("Ordered upstream send failed: meetingId={}, sessionId={}, type={}", - session.getAttributes().get(ATTR_MEETING_ID), session.getId(), messageType, ex); - } - }); - session.getAttributes().put(ATTR_UPSTREAM_SEND_CHAIN, nextChain); + private void sendFrontendError(ConcurrentWebSocketSessionDecorator frontendSession, String code, String message) { + try { + if (!frontendSession.isOpen()) { + return; + } + Map payload = new HashMap<>(); + payload.put("type", "error"); + payload.put("code", code); + payload.put("message", message); + frontendSession.sendMessage(new TextMessage(OBJECT_MAPPER.writeValueAsString(payload))); + } catch (Exception ex) { + log.warn("向前端发送实时代理错误消息失败:code={}", code, ex); } } @@ -268,237 +335,113 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl return normalized.substring(0, 240) + "..."; } - private boolean looksLikeStartMessage(String payload) { - if (payload == null || payload.isBlank()) { - return false; - } - String normalized = payload.replaceAll("\\s+", ""); - return normalized.contains("\"type\":\"start\""); - } - - private void sendFrontendError(ConcurrentWebSocketSessionDecorator frontendSession, String code, String message) { - try { - if (!frontendSession.isOpen()) { - return; - } - Map payload = new HashMap<>(); - payload.put("type", "error"); - payload.put("code", code); - payload.put("message", message); - frontendSession.sendMessage(new TextMessage(new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(payload))); - 
} catch (Exception ex) { - log.warn("Failed to send realtime proxy error to frontend: code={}", code, ex); - } - } - - @SuppressWarnings("unchecked") - private void queuePendingAudioFrame(WebSocketSession session, byte[] payload) { - synchronized (session) { - List pendingFrames = (List) session.getAttributes().get(ATTR_PENDING_AUDIO_FRAMES); - if (pendingFrames == null) { - pendingFrames = new ArrayList<>(); - session.getAttributes().put(ATTR_PENDING_AUDIO_FRAMES, pendingFrames); - } - pendingFrames.add(payload); - } - } - - @SuppressWarnings("unchecked") - private void flushPendingAudioFrames(WebSocketSession session, java.net.http.WebSocket upstreamSocket) { - List pendingFrames; - synchronized (session) { - pendingFrames = (List) session.getAttributes().get(ATTR_PENDING_AUDIO_FRAMES); - if (pendingFrames == null || pendingFrames.isEmpty()) { - return; - } - session.getAttributes().put(ATTR_PENDING_AUDIO_FRAMES, new ArrayList()); - } - log.info("Flushing queued audio frames after start message: meetingId={}, sessionId={}, frameCount={}", - session.getAttributes().get(ATTR_MEETING_ID), session.getId(), pendingFrames.size()); - for (byte[] frame : pendingFrames) { - sendUpstreamOrdered(session, () -> upstreamSocket.sendBinary(ByteBuffer.wrap(frame), true), "binary-flush"); - } - } - - private static final class UpstreamListener implements java.net.http.WebSocket.Listener { - private final ConcurrentWebSocketSessionDecorator frontendSession; - private final WebSocketSession rawSession; + static final class MeetingChannelSession { private final Long meetingId; - private final String targetWsUrl; - private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; - private final StringBuilder textBuffer = new StringBuilder(); - private final ByteArrayOutputStream binaryBuffer = new ByteArrayOutputStream(); - private final AtomicInteger upstreamTextCount = new AtomicInteger(); - private final AtomicInteger upstreamBinaryCount = new AtomicInteger(); 
+ private final RealtimeAsrChannel channel; + private final RealtimeAsrChannelContext context; - private UpstreamListener(ConcurrentWebSocketSessionDecorator frontendSession, WebSocketSession rawSession, - Long meetingId, String targetWsUrl, - RealtimeMeetingSessionStateService realtimeMeetingSessionStateService) { - this.frontendSession = frontendSession; - this.rawSession = rawSession; + private MeetingChannelSession(Long meetingId, RealtimeAsrChannel channel, RealtimeAsrChannelContext context) { this.meetingId = meetingId; - this.targetWsUrl = targetWsUrl; - this.realtimeMeetingSessionStateService = realtimeMeetingSessionStateService; + this.channel = channel; + this.context = context; + } + + private void bindFrontend(WebSocketSession rawSession, ConcurrentWebSocketSessionDecorator frontendSession) { + context.bindFrontendSession(rawSession, frontendSession); } + private void detachFrontend(String sessionId) { + if (context.getRawSession() != null && context.getRawSession().getId().equals(sessionId)) { + context.bindFrontendSession(null, null); + } + } + + private void clearFrontendIfClosed() { + if (context.getRawSession() != null && !context.getRawSession().isOpen()) { + context.bindFrontendSession(null, null); + } + } + + private boolean hasOpenFrontend() { + return context.getFrontendSession() != null + && context.getFrontendSession().isOpen() + && context.getRawSession() != null + && context.getRawSession().isOpen(); + } + + private boolean isChannelOpen() { + return channel != null && channel.isOpen(context); + } + } + + private final class HandlerChannelCallback implements RealtimeAsrChannelCallback { @Override - public void onOpen(java.net.http.WebSocket webSocket) { - log.info("Upstream websocket opened: meetingId={}, sessionId={}, upstream={}", - meetingId, rawSession.getId(), targetWsUrl); - if (!realtimeMeetingSessionStateService.activate(meetingId, rawSession.getId())) { - sendFrontendError("REALTIME_ACTIVE_CONNECTION_EXISTS", 
"当前会议已有活跃实时连接,请先关闭旧连接后再继续"); - webSocket.sendClose(CloseStatus.POLICY_VIOLATION.getCode(), "Active realtime connection already exists"); - closeFrontend(CloseStatus.POLICY_VIOLATION.withReason("已存在活动的实时连接")); + public void onChannelOpen(Long meetingId) throws Exception { + MeetingChannelSession meetingSession = meetingSessions.get(meetingId); + if (meetingSession == null) { return; } - try { - if (frontendSession.isOpen()) { - frontendSession.sendMessage(new TextMessage("{\"type\":\"proxy_ready\"}")); - } - } catch (Exception ex) { - log.error("Failed to notify frontend that upstream websocket is ready: meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex); - closeFrontend(CloseStatus.SERVER_ERROR); + ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); + if (frontendSession != null && frontendSession.isOpen()) { + sendProxyReady(frontendSession); + } + } + + @Override + public void sendFrontendText(Long meetingId, String payload) throws Exception { + MeetingChannelSession meetingSession = meetingSessions.get(meetingId); + if (meetingSession == null) { return; } - webSocket.request(1); + ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); + if (frontendSession != null && frontendSession.isOpen()) { + frontendSession.sendMessage(new TextMessage(payload)); + } } @Override - public java.util.concurrent.CompletionStage onText(java.net.http.WebSocket webSocket, CharSequence data, boolean last) { - textBuffer.append(data); - if (last) { - int count = upstreamTextCount.incrementAndGet(); - try { - if (frontendSession.isOpen()) { - frontendSession.sendMessage(new TextMessage(textBuffer.toString())); - } - log.info("Upstream text -> frontend: meetingId={}, sessionId={}, count={}, payload={}", - meetingId, rawSession.getId(), count, summarizeText(textBuffer.toString())); - } catch (Exception ex) { - log.error("Failed to forward upstream text: meetingId={}, sessionId={}", 
meetingId, rawSession.getId(), ex); - closeFrontend(CloseStatus.SERVER_ERROR); - } finally { - textBuffer.setLength(0); - } - } - webSocket.request(1); - return COMPLETED; + public void sendFrontendBinary(Long meetingId, byte[] payload) throws Exception { + MeetingChannelSession meetingSession = meetingSessions.get(meetingId); + if (meetingSession == null) { + return; + } + ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); + if (frontendSession != null && frontendSession.isOpen()) { + frontendSession.sendMessage(new BinaryMessage(payload)); + } } @Override - public java.util.concurrent.CompletionStage onBinary(java.net.http.WebSocket webSocket, ByteBuffer data, boolean last) { - byte[] chunk = new byte[data.remaining()]; - data.get(chunk); - binaryBuffer.writeBytes(chunk); - if (last) { - int count = upstreamBinaryCount.incrementAndGet(); - try { - if (frontendSession.isOpen()) { - frontendSession.sendMessage(new BinaryMessage(binaryBuffer.toByteArray())); - } - if (shouldLogBinaryFrame(count)) { - log.info("Upstream binary -> frontend: meetingId={}, sessionId={}, count={}, bytes={}", - meetingId, rawSession.getId(), count, binaryBuffer.size()); - } - } catch (Exception ex) { - log.error("Failed to forward upstream binary: meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex); - closeFrontend(CloseStatus.SERVER_ERROR); - } finally { - binaryBuffer.reset(); - } - } - webSocket.request(1); - return COMPLETED; + public void sendFrontendError(Long meetingId, String code, String message) { + MeetingChannelSession meetingSession = meetingSessions.get(meetingId); + if (meetingSession == null) { + return; + } + ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession(); + if (frontendSession != null) { + RealtimeMeetingProxyWebSocketHandler.this.sendFrontendError(frontendSession, code, message); + } } @Override - public java.util.concurrent.CompletionStage 
onPing(java.net.http.WebSocket webSocket, ByteBuffer message) { + public void removeMeetingSession(Long meetingId) { + RealtimeMeetingProxyWebSocketHandler.this.removeMeetingSession(meetingId); + } + + @Override + public void closeFrontend(Long meetingId, CloseStatus status) { + MeetingChannelSession meetingSession = meetingSessions.get(meetingId); + if (meetingSession == null) { + return; + } try { - if (frontendSession.isOpen()) { - frontendSession.sendMessage(new PingMessage(copyBuffer(message))); - } - log.info("Upstream ping -> frontend: meetingId={}, sessionId={}, bytes={}", - meetingId, rawSession.getId(), message.remaining()); - } catch (Exception ex) { - log.error("Failed to forward upstream ping: meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex); - closeFrontend(CloseStatus.SERVER_ERROR); - } - webSocket.request(1); - return COMPLETED; - } - - @Override - public java.util.concurrent.CompletionStage onPong(java.net.http.WebSocket webSocket, ByteBuffer message) { - try { - if (frontendSession.isOpen()) { - frontendSession.sendMessage(new PongMessage(copyBuffer(message))); - } - log.info("Upstream pong -> frontend: meetingId={}, sessionId={}, bytes={}", - meetingId, rawSession.getId(), message.remaining()); - } catch (Exception ex) { - log.error("Failed to forward upstream pong: meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex); - closeFrontend(CloseStatus.SERVER_ERROR); - } - webSocket.request(1); - return COMPLETED; - } - - @Override - public java.util.concurrent.CompletionStage onClose(java.net.http.WebSocket webSocket, int statusCode, String reason) { - log.info("Upstream websocket closed: meetingId={}, sessionId={}, code={}, reason={}", - meetingId, rawSession.getId(), statusCode, reason); - sendFrontendError("REALTIME_UPSTREAM_CLOSED", reason == null || reason.isBlank() ? 
"第三方识别服务已断开连接" : "第三方识别服务已断开: " + reason); - closeFrontend(new CloseStatus(statusCode, reason)); - return COMPLETED; - } - - @Override - public void onError(java.net.http.WebSocket webSocket, Throwable error) { - log.error("Upstream websocket error: meetingId={}, sessionId={}, upstream={}", - meetingId, rawSession.getId(), targetWsUrl, error); - sendFrontendError("REALTIME_UPSTREAM_ERROR", error == null || error.getMessage() == null || error.getMessage().isBlank() - ? "第三方识别服务连接异常" - : "第三方识别服务连接异常: " + error.getMessage()); - closeFrontend(CloseStatus.SERVER_ERROR); - } - - private void sendFrontendError(String code, String message) { - try { - if (!frontendSession.isOpen()) { - return; - } - frontendSession.sendMessage(new TextMessage("{\"type\":\"error\",\"code\":\"" + code + "\",\"message\":\"" + escapeJson(message) + "\"}")); - } catch (Exception ex) { - log.warn("Failed to send upstream error to frontend: meetingId={}, sessionId={}, code={}", meetingId, rawSession.getId(), code, ex); - } - } - - private String escapeJson(String value) { - if (value == null) { - return ""; - } - return value - .replace("\\", "\\\\") - .replace("\"", "\\\"") - .replace("\r", "\\r") - .replace("\n", "\\n"); - } - - private void closeFrontend(CloseStatus status) { - try { - if (rawSession.isOpen()) { + WebSocketSession rawSession = meetingSession.context.getRawSession(); + if (rawSession != null && rawSession.isOpen()) { rawSession.close(status); } } catch (Exception ignored) { // ignore close failure } } - - private ByteBuffer copyBuffer(ByteBuffer source) { - ByteBuffer duplicate = source.asReadOnlyBuffer(); - byte[] bytes = new byte[duplicate.remaining()]; - duplicate.get(bytes); - return ByteBuffer.wrap(bytes); - } } } diff --git a/frontend/src/api/business/meeting.ts b/frontend/src/api/business/meeting.ts index 15bf03d..df798ec 100644 --- a/frontend/src/api/business/meeting.ts +++ b/frontend/src/api/business/meeting.ts @@ -249,12 +249,6 @@ export const createRealtimeMeeting = 
(data: CreateRealtimeMeetingCommand) => { ); }; -export const appendRealtimeTranscripts = (meetingId: number, data: RealtimeTranscriptItemDTO[]) => { - return http.post<{ code: string; data: boolean; msg: string }>( - `/api/biz/meeting/${meetingId}/realtime/transcripts`, - data - ); -}; export const getRealtimeMeetingSessionStatus = (meetingId: number) => { return http.get<{ code: string; data: RealtimeMeetingSessionStatus; msg: string }>( diff --git a/frontend/src/pages/business/RealtimeAsrSession.tsx b/frontend/src/pages/business/RealtimeAsrSession.tsx index abac51e..886d4e0 100644 --- a/frontend/src/pages/business/RealtimeAsrSession.tsx +++ b/frontend/src/pages/business/RealtimeAsrSession.tsx @@ -18,7 +18,6 @@ import dayjs from "dayjs"; import PageHeader from "../../components/shared/PageHeader"; import PageContainer from "@/components/shared/PageContainer"; import { - appendRealtimeTranscripts, completeRealtimeMeeting, getMeetingDetail, getRealtimeMeetingSessionStatus, @@ -28,7 +27,6 @@ import { type MeetingTranscriptVO, type MeetingVO, type RealtimeMeetingSessionStatus, - type RealtimeTranscriptItemDTO, type RealtimeSocketSessionVO, } from "../../api/business/meeting"; const { Text, Title } = Typography; @@ -44,6 +42,7 @@ type WsMessage = { data?: { text?: string; is_final?: boolean; + sentence_id?: number; start?: number; end?: number; speaker_id?: string; @@ -58,6 +57,7 @@ type WsMessage = { type TranscriptCard = { id: string; + sentenceId?: number; speakerName: string; userId?: string | number; text: string; @@ -66,6 +66,15 @@ type TranscriptCard = { final: boolean; }; +type NormalizedWsMessage = { + text: string; + isFinal: boolean; + sentenceId?: number; + speaker?: WsSpeaker; + startTime?: number; + endTime?: number; +}; + type RealtimeMeetingSessionDraft = { meetingId: number; meetingTitle: string; @@ -123,12 +132,18 @@ function resolveSpeaker(speaker?: WsSpeaker) { return { speakerId: "spk_0", speakerName: "Unknown", userId: undefined }; } if (typeof 
speaker === "string") { - return { speakerId: speaker, speakerName: speaker, userId: undefined }; + const normalized = speaker.trim(); + if (!normalized || normalized === "-1") { + return {speakerId: "spk_0", speakerName: "Unknown", userId: undefined}; + } + return {speakerId: normalized, speakerName: normalized, userId: undefined}; } + const rawUserId = speaker.user_id === null || speaker.user_id === undefined ? undefined : String(speaker.user_id).trim(); + const speakerId = rawUserId && rawUserId !== "-1" ? rawUserId : "spk_0"; return { - speakerId: speaker.user_id ? String(speaker.user_id) : "spk_0", - speakerName: speaker.name || (speaker.user_id ? String(speaker.user_id) : "Unknown"), - userId: speaker.user_id, + speakerId, + speakerName: speaker.name?.trim() || (speakerId !== "spk_0" ? speakerId : "Unknown"), + userId: speakerId === "spk_0" ? undefined : rawUserId, }; } @@ -159,17 +174,25 @@ function toMs(value?: number) { return Math.round(value * 1000); } +function buildTranscriptCardId(sentenceId?: number) { + if (sentenceId === undefined || sentenceId === null) { + return `live-${Date.now()}-${Math.random()}`; + } + return `sentence-${sentenceId}`; +} + function buildRealtimeProxyWsUrl(socketSession: RealtimeSocketSessionVO) { const protocol = window.location.protocol === "https:" ? "wss" : "ws"; return `${protocol}://${window.location.host}${socketSession.path}?sessionToken=${encodeURIComponent(socketSession.sessionToken)}`; } -function normalizeWsMessage(payload: WsMessage) { +function normalizeWsMessage(payload: WsMessage): NormalizedWsMessage | null { if (payload.type === "partial" || payload.type === "segment") { const data = payload.data || {}; return { text: data.text || "", isFinal: payload.type === "segment" || !!data.is_final, + sentenceId: data.sentence_id, speaker: { name: data.speaker_name, user_id: data.user_id ?? 
data.speaker_id, @@ -212,7 +235,6 @@ export function RealtimeAsrSession() { const [audioLevel, setAudioLevel] = useState(0); const [elapsedSeconds, setElapsedSeconds] = useState(0); const [sessionStatus, setSessionStatus] = useState(null); - const transcriptRef = useRef(null); const wsRef = useRef(null); const audioContextRef = useRef(null); @@ -223,6 +245,7 @@ export function RealtimeAsrSession() { const completeOnceRef = useRef(false); const startedAtRef = useRef(null); const sessionStartedRef = useRef(false); + const elapsedOffsetRef = useRef(0); const finalTranscriptCount = transcripts.length; const totalTranscriptChars = useMemo( @@ -281,7 +304,7 @@ export function RealtimeAsrSession() { } setTranscripts( (transcriptRes.data.data || []).map((item: MeetingTranscriptVO) => ({ - id: String(item.id), + id: `persisted-${item.id}`, speakerName: item.speakerName || item.speakerId || "发言人", text: item.content, startTime: item.startTime, @@ -301,12 +324,12 @@ export function RealtimeAsrSession() { useEffect(() => { if (!recording) { - setElapsedSeconds(0); + setElapsedSeconds(elapsedOffsetRef.current); return; } const timer = window.setInterval(() => { if (startedAtRef.current) { - setElapsedSeconds(Math.floor((Date.now() - startedAtRef.current) / 1000)); + setElapsedSeconds(elapsedOffsetRef.current + Math.floor((Date.now() - startedAtRef.current) / 1000)); } }, 1000); return () => window.clearInterval(timer); @@ -326,7 +349,7 @@ export function RealtimeAsrSession() { } const token = localStorage.getItem("accessToken"); if (wsRef.current?.readyState === WebSocket.OPEN) { - wsRef.current.send(JSON.stringify({ is_speaking: false })); + wsRef.current.close(); } fetch(`/api/biz/meeting/${meetingId}/realtime/pause`, { method: "POST", @@ -360,6 +383,43 @@ export function RealtimeAsrSession() { setAudioLevel(0); }; + const closeFrontendSocket = async (sendStop: boolean) => { + const socket = wsRef.current; + if (sendStop && socket?.readyState === WebSocket.OPEN) { + 
socket.send(JSON.stringify({type: "stop"})); + await new Promise((resolve) => window.setTimeout(resolve, 150)); + } + socket?.close(); + wsRef.current = null; + sessionStartedRef.current = false; + }; + + const upsertTranscriptCard = (normalized: NormalizedWsMessage, speaker: ReturnType) => { + setTranscripts((prev) => { + const next = [...prev]; + const cardId = buildTranscriptCardId(normalized.sentenceId); + const nextCard: TranscriptCard = { + id: cardId, + sentenceId: normalized.sentenceId, + speakerName: speaker.speakerName, + userId: speaker.userId, + text: normalized.text, + startTime: normalized.startTime, + endTime: normalized.endTime, + final: true, + }; + if (normalized.sentenceId !== undefined && normalized.sentenceId !== null) { + const index = next.findIndex((item) => item.id === cardId); + if (index >= 0) { + next[index] = {...next[index], ...nextCard}; + return next; + } + } + next.push(nextCard); + return next; + }); + }; + const handleFatalRealtimeError = async (errorMessage: string) => { setConnecting(false); setRecording(false); @@ -369,6 +429,8 @@ export function RealtimeAsrSession() { wsRef.current = null; await shutdownAudioPipeline(); startedAtRef.current = null; + elapsedOffsetRef.current = 0; + setElapsedSeconds(0); message.error(errorMessage); }; @@ -419,26 +481,6 @@ export function RealtimeAsrSession() { processor.connect(audioContext.destination); }; - const saveFinalTranscript = async (normalized: { - text: string; - speaker?: WsSpeaker; - startTime?: number; - endTime?: number; - }) => { - if (!normalized.text || !meetingId) { - return; - } - const speaker = resolveSpeaker(normalized.speaker); - const item: RealtimeTranscriptItemDTO = { - speakerId: speaker.speakerId, - speakerName: speaker.speakerName, - content: normalized.text, - startTime: normalized.startTime, - endTime: normalized.endTime, - }; - await appendRealtimeTranscripts(meetingId, [item]); - }; - const handlePause = async () => { if (!meetingId || pausing || finishing || 
(!recording && !connecting)) { return; @@ -451,20 +493,19 @@ export function RealtimeAsrSession() { setPausing(true); setStatusText("暂停识别中..."); try { - if (wsRef.current?.readyState === WebSocket.OPEN) { - wsRef.current.send(JSON.stringify({ is_speaking: false })); + if (recording && startedAtRef.current) { + elapsedOffsetRef.current += Math.floor((Date.now() - startedAtRef.current) / 1000); } - wsRef.current?.close(); - wsRef.current = null; - sessionStartedRef.current = false; + await closeFrontendSocket(false); await shutdownAudioPipeline(); const pauseRes = await pauseRealtimeMeeting(meetingId); setSessionStatus(pauseRes.data.data); setRecording(false); setConnecting(false); startedAtRef.current = null; - setStatusText(pauseRes.data.data?.hasTranscript ? "已暂停,可继续识别" : "已暂停,当前还没有转录内容"); - message.success("实时识别已暂停"); + setElapsedSeconds(elapsedOffsetRef.current); + setStatusText("已暂停,可继续会议并等待说话人修正"); + message.success("实时会议已暂停"); } catch (error) { setStatusText("暂停失败"); message.error(error instanceof Error ? 
error.message : "暂停实时识别失败"); @@ -547,21 +588,9 @@ export function RealtimeAsrSession() { const speaker = resolveSpeaker(normalized.speaker); if (normalized.isFinal) { - setTranscripts((prev) => [ - ...prev, - { - id: `${Date.now()}-${Math.random()}`, - speakerName: speaker.speakerName, - userId: speaker.userId, - text: normalized.text, - startTime: normalized.startTime, - endTime: normalized.endTime, - final: true, - }, - ]); + upsertTranscriptCard(normalized, speaker); setStreamingText(""); setStreamingSpeaker("Unknown"); - void saveFinalTranscript(normalized); } else { setStreamingText(normalized.text); setStreamingSpeaker(speaker.speakerName); @@ -602,12 +631,7 @@ export function RealtimeAsrSession() { setFinishing(true); setStatusText("结束会议中..."); - if (wsRef.current?.readyState === WebSocket.OPEN) { - wsRef.current.send(JSON.stringify({ is_speaking: false })); - } - wsRef.current?.close(); - wsRef.current = null; - sessionStartedRef.current = false; + await closeFrontendSocket(true); await shutdownAudioPipeline();