refactor: 使用 ModelProviderEnum 替代硬编码字符串并优化实时会议 WebSocket 处理逻辑

- 在 `AiModelServiceImpl`、`AiModelController` 和 `MeetingCommandServiceImpl` 中使用 `ModelProviderEnum` 替代硬编码的 "custom" 字符串
- 移除 `appendRealtimeTranscripts` 方法,并在 `RealtimeMeetingProxyWebSocketHandler` 中添加对 `RealtimeAsrChannel` 的支持
- 优化前端 `RealtimeAsrSession` 组件,移除不必要的方法并更新状态管理逻辑
- 添加 `RealtimeMeetingTranscriptCacheService` 和 `RealtimeMeetingTranscriptCacheState` 类,用于缓存和处理实时转录数据
dev_na
chenhao 2026-06-25 10:17:51 +08:00
parent 2bab042ca0
commit 31a4c5c767
24 changed files with 1450 additions and 549 deletions

View File

@ -71,6 +71,10 @@ public final class RedisKeys {
return "biz:meeting:realtime:state:" + meetingId;
}
public static String realtimeMeetingTranscriptCacheKey(Long meetingId) {
return "biz:meeting:realtime:transcript-cache:" + meetingId;
}
public static String realtimeMeetingResumeTimeoutKey(Long meetingId) {
return realtimeMeetingResumeTimeoutPrefix() + meetingId;
}

View File

@ -3,6 +3,7 @@ package com.imeeting.controller.biz;
import com.imeeting.dto.biz.AiLocalProfileVO;
import com.imeeting.dto.biz.AiModelDTO;
import com.imeeting.dto.biz.AiModelVO;
import com.imeeting.enums.ModelProviderEnum;
import com.imeeting.service.biz.AiModelService;
import com.unisbase.common.ApiResponse;
import com.unisbase.common.annotation.Log;
@ -121,7 +122,7 @@ public class AiModelController {
@PostMapping("/llm-connectivity-test")
@PreAuthorize("isAuthenticated()")
public ApiResponse<Boolean> testLlmConnectivity(@RequestBody AiModelDTO dto) {
if ("custom".equalsIgnoreCase(dto.getProvider()) && (dto.getBaseUrl() == null || dto.getBaseUrl().isBlank())) {
if (ModelProviderEnum.LOCAL.getCode().equalsIgnoreCase(dto.getProvider()) && (dto.getBaseUrl() == null || dto.getBaseUrl().isBlank())) {
return ApiResponse.error("基础地址不能为空");
}
if (dto.getModelCode() == null || dto.getModelCode().isBlank()) {

View File

@ -399,17 +399,6 @@ public class MeetingController {
return ApiResponse.ok(result);
}
@Operation(summary = "追加实时转写片段")
@PostMapping("/{id}/realtime/transcripts")
@PreAuthorize("isAuthenticated()")
public ApiResponse<Boolean> appendRealtimeTranscripts(@PathVariable Long id, @RequestBody List<RealtimeTranscriptItemDTO> items) {
LoginUser loginUser = currentLoginUser();
Meeting meeting = meetingAccessService.requireMeeting(id);
meetingAccessService.assertCanControlRealtimeMeeting(meeting, loginUser, MeetingConstants.SOURCE_WEB);
meetingCommandService.appendRealtimeTranscripts(id, items);
return ApiResponse.ok(true);
}
@Operation(summary = "暂停实时会议")
@PostMapping("/{id}/realtime/pause")
@PreAuthorize("isAuthenticated()")

View File

@ -0,0 +1,21 @@
package com.imeeting.dto.biz;
import lombok.Data;
@Data
public class RealtimeMeetingTranscriptCacheItem {
private String sentenceKey;
private Integer sentenceId;
private Integer sentenceType;
private String speakerId;
private String speakerName;
private String userId;
private Integer startTime;
private Integer endTime;
private String content;
private Integer sortOrder;
private Boolean finalResult;
private Long transcriptId;
private Long firstReceivedAt;
private Long updatedAt;
}

View File

@ -0,0 +1,15 @@
package com.imeeting.dto.biz;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
@Data
public class RealtimeMeetingTranscriptCacheState {
private Long meetingId;
private Integer nextSortOrder;
private Integer nextLegacySequence;
private List<RealtimeMeetingTranscriptCacheItem> items = new ArrayList<>();
private Long updatedAt;
}

View File

@ -8,5 +8,6 @@ public class RealtimeSocketSessionData {
private Long userId;
private Long tenantId;
private Long asrModelId;
private String provider;
private String targetWsUrl;
}

View File

@ -0,0 +1,18 @@
package com.imeeting.enums;
import lombok.Getter;
@Getter
public enum ModelProviderEnum {
LOCAL("local", "本地"),
;
private final String code;
private final String description;
ModelProviderEnum(String code, String description) {
this.code = code;
this.description = description;
}
}

View File

@ -28,8 +28,6 @@ public interface MeetingCommandService {
void deleteMeeting(Long id);
void appendRealtimeTranscripts(Long meetingId, List<RealtimeTranscriptItemDTO> items);
void saveRealtimeTranscriptSnapshot(Long meetingId, RealtimeTranscriptItemDTO item, boolean finalResult);
void completeRealtimeMeeting(Long meetingId, String audioUrl, boolean overwriteAudio);

View File

@ -25,6 +25,8 @@ public interface RealtimeMeetingSessionStateService {
void refreshAfterTranscript(Long meetingId);
void refreshAfterTranscriptCapture(Long meetingId, long transcriptCount);
boolean markCompletingIfResumeExpired(Long meetingId);
void expireEmptySession(Long meetingId);

View File

@ -11,6 +11,7 @@ import com.imeeting.dto.biz.AiLocalProfileVO;
import com.imeeting.dto.biz.AiModelVO;
import com.imeeting.entity.biz.AsrModel;
import com.imeeting.entity.biz.LlmModel;
import com.imeeting.enums.ModelProviderEnum;
import com.imeeting.mapper.biz.AsrModelMapper;
import com.imeeting.mapper.biz.LlmModelMapper;
import com.imeeting.service.biz.AiModelService;
@ -169,7 +170,7 @@ public class AiModelServiceImpl implements AiModelService {
if (resolvedBaseUrl == null || resolvedBaseUrl.isBlank()) {
return Collections.emptyList();
}
if ("custom".equals(providerKey)) {
if (ModelProviderEnum.LOCAL.getCode().equals(providerKey)) {
return fetchLocalProfile(resolvedBaseUrl, apiKey).getAsrModels();
}
String targetUrl = resolveModelListUrl(providerKey, resolvedBaseUrl, apiKey);
@ -464,7 +465,7 @@ public class AiModelServiceImpl implements AiModelService {
}
private String resolveModelListUrl(String providerKey, String baseUrl, String apiKey) {
if ("Custom".equalsIgnoreCase(providerKey)) {
if (ModelProviderEnum.LOCAL.getCode().equalsIgnoreCase(providerKey)) {
return baseUrl+"/api/asrconfig";
}
if ("gemini".equals(providerKey) || "google".equals(providerKey)) {
@ -832,7 +833,7 @@ public class AiModelServiceImpl implements AiModelService {
}
private void pushAsrConfig(AsrModel entity) {
if ("custom".equals(normalizeProvider(entity.getProvider()))) {
if (ModelProviderEnum.LOCAL.getCode().equals(normalizeProvider(entity.getProvider()))) {
if (entity.getApiKey() == null || entity.getApiKey().isBlank()) {
log.info("Skip syncing local ASR profile because apiKey is blank, modelName={}", entity.getModelName());
return;

View File

@ -49,6 +49,7 @@ import com.imeeting.support.redis.MeetingAsrPermitCache;
import com.imeeting.support.redis.MeetingLockCache;
import com.unisbase.common.exception.BusinessException;
import com.unisbase.common.exception.ErrorCodeEnum;
import com.imeeting.websocket.RealtimeMeetingProxyWebSocketHandler;
import com.unisbase.service.SysParamService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -82,7 +83,8 @@ public class MeetingCommandServiceImpl implements MeetingCommandService {
private final MeetingDomainSupport meetingDomainSupport;
private final MeetingRuntimeProfileResolver meetingRuntimeProfileResolver;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService;
private final RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService;
private final RealtimeMeetingProxyWebSocketHandler realtimeMeetingProxyWebSocketHandler;
private final MeetingProgressService meetingProgressService;
private final MeetingPointsService meetingPointsService;
private final MeetingSummaryPromptAssembler meetingSummaryPromptAssembler;
@ -109,7 +111,7 @@ public class MeetingCommandServiceImpl implements MeetingCommandService {
MeetingDomainSupport meetingDomainSupport,
MeetingRuntimeProfileResolver meetingRuntimeProfileResolver,
RealtimeMeetingSessionStateService realtimeMeetingSessionStateService,
RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService,
RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService, RealtimeMeetingProxyWebSocketHandler realtimeMeetingProxyWebSocketHandler,
MeetingProgressService meetingProgressService,
MeetingPointsService meetingPointsService,
MeetingSummaryPromptAssembler meetingSummaryPromptAssembler,
@ -131,7 +133,8 @@ public class MeetingCommandServiceImpl implements MeetingCommandService {
this.meetingDomainSupport = meetingDomainSupport;
this.meetingRuntimeProfileResolver = meetingRuntimeProfileResolver;
this.realtimeMeetingSessionStateService = realtimeMeetingSessionStateService;
this.realtimeMeetingAudioStorageService = realtimeMeetingAudioStorageService;
this.realtimeMeetingAudioStorageService = realtimeMeetingAudioStorageService;
this.realtimeMeetingProxyWebSocketHandler = realtimeMeetingProxyWebSocketHandler;
this.meetingProgressService = meetingProgressService;
this.meetingPointsService = meetingPointsService;
this.meetingSummaryPromptAssembler = meetingSummaryPromptAssembler;
@ -347,58 +350,6 @@ public class MeetingCommandServiceImpl implements MeetingCommandService {
deleteMeetingArtifactsAfterCommit(id);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void appendRealtimeTranscripts(Long meetingId, List<RealtimeTranscriptItemDTO> items) {
if (items == null || items.isEmpty()) {
return;
}
Integer maxSortOrder = transcriptMapper.selectList(new LambdaQueryWrapper<MeetingTranscript>()
.eq(MeetingTranscript::getMeetingId, meetingId)
.orderByDesc(MeetingTranscript::getSortOrder)
.last("LIMIT 1"))
.stream()
.findFirst()
.map(MeetingTranscript::getSortOrder)
.orElse(0);
int nextSortOrder = maxSortOrder == null ? 0 : maxSortOrder + 1;
boolean inserted = false;
for (RealtimeTranscriptItemDTO item : items) {
if (item.getContent() == null || item.getContent().isBlank()) {
continue;
}
MeetingTranscript existing = transcriptMapper.selectOne(new LambdaQueryWrapper<MeetingTranscript>()
.eq(MeetingTranscript::getMeetingId, meetingId)
.eq(MeetingTranscript::getContent, item.getContent().trim())
.eq(item.getSpeakerId() != null && !item.getSpeakerId().isBlank(), MeetingTranscript::getSpeakerId, item.getSpeakerId())
.eq(item.getStartTime() != null, MeetingTranscript::getStartTime, item.getStartTime())
.eq(item.getEndTime() != null, MeetingTranscript::getEndTime, item.getEndTime())
.last("LIMIT 1"));
if (existing != null) {
continue;
}
MeetingTranscript transcript = new MeetingTranscript();
transcript.setMeetingId(meetingId);
transcript.setSpeakerId(meetingDomainSupport.resolveSpeakerId(item.getSpeakerId()));
transcript.setSpeakerName(meetingDomainSupport.resolveSpeakerName(item.getSpeakerId(), item.getSpeakerName()));
transcript.setContent(item.getContent().trim());
transcript.setStartTime(item.getStartTime());
transcript.setEndTime(item.getEndTime());
transcript.setSortOrder(nextSortOrder++);
transcriptMapper.insert(transcript);
inserted = true;
}
if (inserted) {
meetingTranscriptFileService.initializeTranscriptFileIfAbsent(meetingId);
realtimeMeetingSessionStateService.refreshAfterTranscript(meetingId);
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public void saveRealtimeTranscriptSnapshot(Long meetingId, RealtimeTranscriptItemDTO item, boolean finalResult) {
@ -457,11 +408,11 @@ public class MeetingCommandServiceImpl implements MeetingCommandService {
if (meeting == null) {
throw new RuntimeException("会议不存在");
}
realtimeMeetingProxyWebSocketHandler.closeMeetingSession(meetingId);
RealtimeMeetingSessionStatusVO currentStatus = realtimeMeetingSessionStateService.getStatus(meetingId);
if (overwriteAudio) {
if (audioUrl == null || audioUrl.isBlank()) {
throw new RuntimeException("overwriteAudio 为 true 时必须提供音频地址");
throw new RuntimeException("overwriteAudio=true requires audioUrl");
}
meetingDomainSupport.applyMeetingAudioMetadata(
meeting,

View File

@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.imeeting.dto.biz.RealtimeMeetingResumeConfig;
import com.imeeting.dto.biz.RealtimeMeetingSessionState;
import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO;
import com.imeeting.dto.biz.RealtimeMeetingTranscriptCacheState;
import com.imeeting.entity.biz.Meeting;
import com.imeeting.entity.biz.MeetingTranscript;
import com.imeeting.enums.MeetingStatusEnum;
@ -12,6 +13,7 @@ import com.imeeting.mapper.biz.MeetingTranscriptMapper;
import com.imeeting.service.biz.RealtimeMeetingSessionStateService;
import com.imeeting.support.redis.MeetingLockCache;
import com.imeeting.support.redis.RealtimeMeetingSessionCache;
import com.imeeting.support.redis.RealtimeMeetingTranscriptCache;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@ -31,6 +33,7 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
private final MeetingLockCache meetingLockCache;
private final MeetingTranscriptMapper transcriptMapper;
private final MeetingMapper meetingMapper;
private final RealtimeMeetingTranscriptCache realtimeMeetingTranscriptCache;
@Value("${imeeting.realtime.resume-window-minutes:30}")
private String resumeWindowMinutesValue;
@ -49,8 +52,9 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
next.setTenantId(tenantId);
next.setUserId(userId);
next.setStatus("IDLE");
next.setHasTranscript(countTranscripts(meetingId) > 0);
next.setTranscriptCountSnapshot(countTranscripts(meetingId));
long transcriptCount = countCapturedTranscripts(meetingId);
next.setHasTranscript(transcriptCount > 0);
next.setTranscriptCountSnapshot(transcriptCount);
next.setUpdatedAt(System.currentTimeMillis());
writeState(next);
}
@ -72,16 +76,16 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
String currentStatus = status.getStatus();
if ("COMPLETING".equals(currentStatus)) {
throw new RuntimeException("实时会议正在结束处理中");
throw new RuntimeException("Realtime meeting is completing");
}
if ("COMPLETED".equals(currentStatus)) {
throw new RuntimeException("实时会议已结束");
throw new RuntimeException("Realtime meeting is already completed");
}
if ("ACTIVE".equals(currentStatus) || Boolean.TRUE.equals(status.getActiveConnection())) {
throw new RuntimeException("实时会议已存在活动连接");
throw new RuntimeException("Realtime meeting already has an active connection");
}
if ("PAUSED_RESUMABLE".equals(currentStatus) && !Boolean.TRUE.equals(status.getCanResume())) {
throw new RuntimeException("实时会议恢复窗口已过期");
throw new RuntimeException("Realtime meeting resume window has expired");
}
}
@ -102,7 +106,7 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
}
long now = System.currentTimeMillis();
long transcriptCount = countTranscripts(meetingId);
long transcriptCount = countCapturedTranscripts(meetingId);
state.setStatus("ACTIVE");
state.setHasTranscript(transcriptCount > 0);
state.setTranscriptCountSnapshot(transcriptCount);
@ -182,9 +186,13 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
@Override
public void refreshAfterTranscript(Long meetingId) {
refreshAfterTranscriptCapture(meetingId, countTranscripts(meetingId));
}
@Override
public void refreshAfterTranscriptCapture(Long meetingId, long transcriptCount) {
RealtimeMeetingSessionState state = getOrCreateState(meetingId);
long now = System.currentTimeMillis();
long transcriptCount = countTranscripts(meetingId);
state.setHasTranscript(transcriptCount > 0);
state.setTranscriptCountSnapshot(transcriptCount);
@ -214,7 +222,7 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
return false;
}
long transcriptCount = countTranscripts(meetingId);
long transcriptCount = countCapturedTranscripts(meetingId);
if (transcriptCount <= 0) {
clear(meetingId);
return false;
@ -256,7 +264,7 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
}
private RealtimeMeetingSessionStatusVO pauseState(Long meetingId, RealtimeMeetingSessionState state) {
long transcriptCount = countTranscripts(meetingId);
long transcriptCount = countCapturedTranscripts(meetingId);
long now = System.currentTimeMillis();
state.setHasTranscript(transcriptCount > 0);
@ -307,7 +315,7 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
} else {
vo.setStatus("IDLE");
}
vo.setHasTranscript(countTranscripts(meetingId) > 0);
vo.setHasTranscript(countCapturedTranscripts(meetingId) > 0);
vo.setCanResume(false);
vo.setRemainingSeconds(0L);
vo.setActiveConnection(false);
@ -362,8 +370,9 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
RealtimeMeetingSessionState next = new RealtimeMeetingSessionState();
next.setMeetingId(meetingId);
next.setStatus("IDLE");
next.setHasTranscript(countTranscripts(meetingId) > 0);
next.setTranscriptCountSnapshot(countTranscripts(meetingId));
long transcriptCount = countCapturedTranscripts(meetingId);
next.setHasTranscript(transcriptCount > 0);
next.setTranscriptCountSnapshot(transcriptCount);
next.setUpdatedAt(System.currentTimeMillis());
return next;
}
@ -400,4 +409,21 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
return transcriptMapper.selectCount(new LambdaQueryWrapper<MeetingTranscript>()
.eq(MeetingTranscript::getMeetingId, meetingId));
}
private long countCapturedTranscripts(Long meetingId) {
return Math.max(countTranscripts(meetingId), countCachedTranscripts(meetingId));
}
private long countCachedTranscripts(Long meetingId) {
if (meetingId == null) {
return 0L;
}
RealtimeMeetingTranscriptCacheState state = realtimeMeetingTranscriptCache.getState(meetingId);
if (state == null || state.getItems() == null || state.getItems().isEmpty()) {
return 0L;
}
return state.getItems().stream()
.filter(item -> item.getContent() != null && !item.getContent().isBlank())
.count();
}
}

View File

@ -10,12 +10,13 @@ import com.imeeting.service.biz.AiModelService;
import com.imeeting.service.biz.MeetingAccessService;
import com.imeeting.service.biz.RealtimeMeetingSessionStateService;
import com.imeeting.service.biz.RealtimeMeetingSocketSessionService;
import com.imeeting.service.realtime.RealtimeAsrChannel;
import com.imeeting.service.realtime.RealtimeAsrChannelFactory;
import com.imeeting.support.redis.RealtimeMeetingSocketSessionCache;
import com.unisbase.security.LoginUser;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -30,6 +31,7 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS
private final MeetingAccessService meetingAccessService;
private final AiModelService aiModelService;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final RealtimeAsrChannelFactory realtimeAsrChannelFactory;
@Override
public RealtimeSocketSessionVO createSession(Long meetingId, Long asrModelId, String mode, String language,
@ -37,10 +39,10 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS
Boolean enableTextRefine, Boolean saveAudio,
List<Map<String, Object>> hotwords, LoginUser loginUser) {
if (meetingId == null) {
throw new RuntimeException("会议 ID 不能为空");
throw new RuntimeException("浼氳 ID 涓嶈兘涓虹┖");
}
if (asrModelId == null) {
throw new RuntimeException("ASR 模型 ID 不能为空");
throw new RuntimeException("ASR 妯″瀷 ID 涓嶈兘涓虹┖");
}
Meeting meeting = meetingAccessService.requireMeeting(meetingId);
@ -51,12 +53,13 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS
AiModelVO asrModel = aiModelService.getModelById(asrModelId, "ASR");
if (asrModel == null) {
throw new RuntimeException("ASR 模型不存在");
throw new RuntimeException("ASR 妯″瀷涓嶅瓨鍦?");
}
String targetWsUrl = resolveWsUrl(asrModel);
RealtimeAsrChannel realtimeAsrChannel = realtimeAsrChannelFactory.getRequired(asrModel.getProvider());
String targetWsUrl = realtimeAsrChannel.resolveTargetWsUrl(asrModel);
if (targetWsUrl == null || targetWsUrl.isBlank()) {
throw new RuntimeException("ASR 模型未配置 WebSocket 地址");
throw new RuntimeException("ASR 妯″瀷鏈厤缃?WebSocket 鍦板潃");
}
RealtimeMeetingResumeConfig resumeConfig = new RealtimeMeetingResumeConfig();
@ -82,6 +85,7 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS
sessionData.setUserId(loginUser.getUserId());
sessionData.setTenantId(loginUser.getTenantId());
sessionData.setAsrModelId(asrModelId);
sessionData.setProvider(realtimeAsrChannelFactory.normalizeProvider(asrModel.getProvider()));
sessionData.setTargetWsUrl(targetWsUrl);
String sessionToken = UUID.randomUUID().toString().replace("-", "");
@ -91,9 +95,8 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS
vo.setSessionToken(sessionToken);
vo.setPath(WS_PATH);
vo.setExpiresInSeconds(socketSessionCache.getSessionTtlSeconds());
vo.setStartMessage(buildStartMessage(
vo.setStartMessage(realtimeAsrChannel.buildStartMessage(
asrModel,
meetingId,
mode,
language,
useSpkId,
@ -110,74 +113,4 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS
public RealtimeSocketSessionData getSessionData(String sessionToken) {
return socketSessionCache.get(sessionToken);
}
private String resolveWsUrl(AiModelVO model) {
if (model.getWsUrl() != null && !model.getWsUrl().isBlank()) {
return model.getWsUrl();
}
if (model.getBaseUrl() == null || model.getBaseUrl().isBlank()) {
return "";
}
return model.getBaseUrl()
.replaceFirst("^http://", "ws://")
.replaceFirst("^https://", "wss://");
}
private Map<String, Object> buildStartMessage(AiModelVO model, Long meetingId, String mode, String language,
Integer useSpkId, Boolean enablePunctuation, Boolean enableItn,
Boolean enableTextRefine, Boolean saveAudio,
List<Map<String, Object>> hotwords) {
Map<String, Object> root = new HashMap<>();
root.put("type", "start");
root.put("request_id", "web_" + System.currentTimeMillis() + "_" + meetingId);
root.put("authorization", buildAuthorization(model.getApiKey()));
Map<String, Object> config = new HashMap<>();
Map<String, Object> audio = new HashMap<>();
audio.put("format", "pcm");
audio.put("sample_rate", 16000);
audio.put("channels", 1);
config.put("audio", audio);
Map<String, Object> recognition = new HashMap<>();
recognition.put("language", normalizeLanguage(language));
recognition.put("enable_punctuation", boolOrDefault(enablePunctuation, true));
recognition.put("enable_itn", boolOrDefault(enableItn, true));
recognition.put("enable_speaker", Integer.valueOf(1).equals(useSpkId));
recognition.put("enable_two_pass", !"online".equalsIgnoreCase(mode));
recognition.put("enable_text_refine", boolOrDefault(enableTextRefine, false));
recognition.put("speaker_threshold", readSpeakerThreshold(model.getMediaConfig()));
recognition.put("hotwords", hotwords == null ? List.of() : hotwords);
config.put("recognition", recognition);
config.put("model", model.getModelCode());
config.put("save_audio", boolOrDefault(saveAudio, false));
root.put("config", config);
return root;
}
private String buildAuthorization(String apiKey) {
if (apiKey == null || apiKey.isBlank()) {
return "";
}
return apiKey.startsWith("Bearer ") ? apiKey : "Bearer " + apiKey;
}
private Object readSpeakerThreshold(Map<String, Object> mediaConfig) {
if (mediaConfig == null) {
return null;
}
return mediaConfig.get("svThreshold");
}
private String normalizeLanguage(String language) {
if (language == null || language.isBlank()) {
return "auto";
}
return language.trim();
}
private boolean boolOrDefault(Boolean value, boolean defaultValue) {
return value != null ? value : defaultValue;
}
}

View File

@ -0,0 +1,32 @@
package com.imeeting.service.realtime;
import com.imeeting.dto.biz.AiModelVO;
import java.util.List;
import java.util.Map;
public interface RealtimeAsrChannel {
boolean supports(String provider);
String resolveTargetWsUrl(AiModelVO model);
Map<String, Object> buildStartMessage(AiModelVO model,
String mode,
String language,
Integer useSpkId,
Boolean enablePunctuation,
Boolean enableItn,
Boolean enableTextRefine,
Boolean saveAudio,
List<Map<String, Object>> hotwords);
void connect(RealtimeAsrChannelContext context) throws Exception;
void handleFrontendText(RealtimeAsrChannelContext context, String payload);
void handleFrontendBinary(RealtimeAsrChannelContext context, byte[] payload);
void closeMeeting(RealtimeAsrChannelContext context);
boolean isOpen(RealtimeAsrChannelContext context);
}

View File

@ -0,0 +1,17 @@
package com.imeeting.service.realtime;
import org.springframework.web.socket.CloseStatus;
public interface RealtimeAsrChannelCallback {
void onChannelOpen(Long meetingId) throws Exception;
void sendFrontendText(Long meetingId, String payload) throws Exception;
void sendFrontendBinary(Long meetingId, byte[] payload) throws Exception;
void sendFrontendError(Long meetingId, String code, String message);
void removeMeetingSession(Long meetingId);
void closeFrontend(Long meetingId, CloseStatus status);
}

View File

@ -0,0 +1,26 @@
package com.imeeting.service.realtime;
import lombok.Data;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Data
public class RealtimeAsrChannelContext {
private Long meetingId;
private String provider;
private String targetWsUrl;
private WebSocketSession rawSession;
private ConcurrentWebSocketSessionDecorator frontendSession;
private RealtimeAsrChannelCallback callback;
private final ConcurrentMap<String, Object> channelState = new ConcurrentHashMap<>();
private volatile ConcurrentMap<String, Object> frontendState = new ConcurrentHashMap<>();
public void bindFrontendSession(WebSocketSession rawSession, ConcurrentWebSocketSessionDecorator frontendSession) {
this.rawSession = rawSession;
this.frontendSession = frontendSession;
this.frontendState = new ConcurrentHashMap<>();
}
}

View File

@ -0,0 +1,29 @@
package com.imeeting.service.realtime;
import com.imeeting.enums.ModelProviderEnum;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@RequiredArgsConstructor
public class RealtimeAsrChannelFactory {
private final List<RealtimeAsrChannel> channels;
public RealtimeAsrChannel getRequired(String provider) {
String normalizedProvider = normalizeProvider(provider);
return channels.stream()
.filter(channel -> channel.supports(normalizedProvider))
.findFirst()
.orElseThrow(() -> new RuntimeException("暂不支持的实时 ASR 渠道: " + provider));
}
public String normalizeProvider(String provider) {
if (provider == null || provider.isBlank()) {
return ModelProviderEnum.LOCAL.getCode();
}
return provider.trim().toLowerCase();
}
}

View File

@ -0,0 +1,13 @@
package com.imeeting.service.realtime;
import com.imeeting.dto.biz.RealtimeMeetingTranscriptCacheItem;
import java.util.List;
public interface RealtimeMeetingTranscriptCacheService {
void mergeUpstreamMessage(Long meetingId, String payload);
List<RealtimeMeetingTranscriptCacheItem> listOrderedItems(Long meetingId);
void clear(Long meetingId);
}

View File

@ -0,0 +1,538 @@
package com.imeeting.service.realtime.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.imeeting.dto.biz.AiModelVO;
import com.imeeting.dto.biz.RealtimeMeetingTranscriptCacheItem;
import com.imeeting.enums.ModelProviderEnum;
import com.imeeting.service.biz.RealtimeMeetingSessionStateService;
import com.imeeting.service.realtime.RealtimeAsrChannel;
import com.imeeting.service.realtime.RealtimeAsrChannelContext;
import com.imeeting.service.realtime.RealtimeMeetingTranscriptCacheService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@Slf4j
@Component
@RequiredArgsConstructor
public class LocalRealtimeAsrChannel implements RealtimeAsrChannel {
private static final String STATE_UPSTREAM_SOCKET = "upstreamSocket";
private static final String STATE_CLOSE_AFTER_END = "closeAfterEnd";
private static final String STATE_START_MESSAGE_FORWARDED = "startMessageForwarded";
private static final String STATE_UPSTREAM_SEND_CHAIN = "upstreamSendChain";
private static final String STATE_START_MESSAGE_SENT = "startMessageSent";
private static final String STATE_PENDING_AUDIO_FRAMES = "pendingAudioFrames";
private static final CompletableFuture<Void> COMPLETED = CompletableFuture.completedFuture(null);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String STOP_MESSAGE = "{\"type\":\"stop\"}";
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final RealtimeMeetingTranscriptCacheService realtimeMeetingTranscriptCacheService;
@Override
public boolean supports(String provider) {
return ModelProviderEnum.LOCAL.getCode().equalsIgnoreCase(provider);
}
@Override
public String resolveTargetWsUrl(AiModelVO model) {
if (model.getWsUrl() != null && !model.getWsUrl().isBlank()) {
return model.getWsUrl();
}
if (model.getBaseUrl() == null || model.getBaseUrl().isBlank()) {
return "";
}
return model.getBaseUrl()
.replaceFirst("^http://", "ws://")
.replaceFirst("^https://", "wss://");
}
@Override
public Map<String, Object> buildStartMessage(AiModelVO model,
String mode,
String language,
Integer useSpkId,
Boolean enablePunctuation,
Boolean enableItn,
Boolean enableTextRefine,
Boolean saveAudio,
List<Map<String, Object>> hotwords) {
Map<String, Object> root = new HashMap<>();
root.put("type", "start");
Map<String, Object> payload = new HashMap<>();
payload.put("format", "pcm");
payload.put("sample_rate", 16000);
payload.put("language", normalizeProtocolLanguage(language));
payload.put("context", "");
payload.put("enable_inverse_text_normalization", boolOrDefault(enableItn, true));
payload.put("unfixed_token_num", 3);
payload.put("silence_duration_ms", 800);
payload.put("min_partial_sec", 0.3D);
payload.put("pre_roll_ms", 240);
payload.put("max_sentence_count", 8);
payload.put("partial_holdback_chars", 2);
payload.put("enable_native_partial_stream", false);
payload.put("enable_speaker", Integer.valueOf(1).equals(useSpkId));
payload.put("match_speaker_registry", false);
payload.put("speaker_threshold", readSpeakerThreshold(model.getMediaConfig()));
payload.put("enable_realtime_longform", false);
payload.put("enable_realtime_vad_split", false);
payload.put("force_stable_segment_sec", 6);
payload.put("force_stable_min_chars", 24);
payload.put("max_segment_sec", 12);
payload.put("hotwords", hotwords == null ? List.of() : hotwords);
root.put("payload", payload);
return root;
}
@Override
public void connect(RealtimeAsrChannelContext context) throws Exception {
initializeFrontendState(context);
java.net.http.WebSocket upstreamSocket = java.net.http.HttpClient.newHttpClient()
.newWebSocketBuilder()
.buildAsync(URI.create(context.getTargetWsUrl()), new UpstreamListener(context))
.get();
context.getChannelState().put(STATE_UPSTREAM_SOCKET, upstreamSocket);
}
@Override
public void handleFrontendText(RealtimeAsrChannelContext context, String payload) {
java.net.http.WebSocket upstreamSocket = getUpstreamSocket(context);
if (upstreamSocket == null) {
return;
}
initializeFrontendState(context);
if (looksLikeStartMessage(payload)) {
context.getFrontendState().put(STATE_START_MESSAGE_SENT, Boolean.TRUE);
if (!Boolean.TRUE.equals(context.getChannelState().get(STATE_START_MESSAGE_FORWARDED))) {
context.getChannelState().put(STATE_START_MESSAGE_FORWARDED, Boolean.TRUE);
sendUpstreamOrdered(context, () -> upstreamSocket.sendText(payload, true), "text-start");
}
flushPendingAudioFrames(context, upstreamSocket);
return;
}
if (looksLikeStopMessage(payload)) {
context.getChannelState().put(STATE_CLOSE_AFTER_END, Boolean.TRUE);
}
sendUpstreamOrdered(context, () -> upstreamSocket.sendText(payload, true), "text");
}
@Override
public void handleFrontendBinary(RealtimeAsrChannelContext context, byte[] payload) {
java.net.http.WebSocket upstreamSocket = getUpstreamSocket(context);
if (upstreamSocket == null) {
return;
}
initializeFrontendState(context);
if (!Boolean.TRUE.equals(context.getFrontendState().get(STATE_START_MESSAGE_SENT))) {
queuePendingAudioFrame(context, payload);
return;
}
sendUpstreamOrdered(context, () -> upstreamSocket.sendBinary(ByteBuffer.wrap(payload), true), "binary");
}
@Override
public void closeMeeting(RealtimeAsrChannelContext context) {
if (context == null) {
return;
}
java.net.http.WebSocket upstreamSocket = getUpstreamSocket(context);
if (upstreamSocket == null) {
return;
}
context.getChannelState().put(STATE_CLOSE_AFTER_END, Boolean.TRUE);
upstreamSocket.sendText(STOP_MESSAGE, true);
}
@Override
public boolean isOpen(RealtimeAsrChannelContext context) {
return getUpstreamSocket(context) != null;
}
public static String buildFrontendTranscriptMessage(RealtimeMeetingTranscriptCacheItem item) throws JsonProcessingException {
ObjectNode root = OBJECT_MAPPER.createObjectNode();
boolean isFinal = Boolean.TRUE.equals(item.getFinalResult());
root.put("type", isFinal ? "segment" : "partial");
ObjectNode data = root.putObject("data");
data.put("text", item.getContent());
data.put("is_final", isFinal);
if (item.getSentenceId() != null) {
data.put("sentence_id", item.getSentenceId());
}
if (item.getStartTime() != null) {
data.put("start", item.getStartTime() / 1000D);
}
if (item.getEndTime() != null) {
data.put("end", item.getEndTime() / 1000D);
}
if (item.getSpeakerId() != null && !item.getSpeakerId().isBlank()) {
data.put("speaker_id", item.getSpeakerId());
}
if (item.getSpeakerName() != null && !item.getSpeakerName().isBlank()) {
data.put("speaker_name", item.getSpeakerName());
}
if (item.getUserId() != null && !item.getUserId().isBlank()) {
data.put("user_id", item.getUserId());
}
return OBJECT_MAPPER.writeValueAsString(root);
}
private Object readSpeakerThreshold(Map<String, Object> mediaConfig) {
if (mediaConfig == null) {
return null;
}
return mediaConfig.get("svThreshold");
}
private String normalizeProtocolLanguage(String language) {
if (language == null || language.isBlank()) {
return null;
}
String normalized = language.trim();
if ("auto".equalsIgnoreCase(normalized)) {
return null;
}
return normalized;
}
private boolean boolOrDefault(Boolean value, boolean defaultValue) {
return value != null ? value : defaultValue;
}
private void initializeFrontendState(RealtimeAsrChannelContext context) {
context.getFrontendState().putIfAbsent(STATE_UPSTREAM_SEND_CHAIN, COMPLETED);
context.getFrontendState().putIfAbsent(STATE_START_MESSAGE_SENT, Boolean.FALSE);
context.getFrontendState().putIfAbsent(STATE_PENDING_AUDIO_FRAMES, new ArrayList<byte[]>());
}
private java.net.http.WebSocket getUpstreamSocket(RealtimeAsrChannelContext context) {
Object value = context.getChannelState().get(STATE_UPSTREAM_SOCKET);
return value instanceof java.net.http.WebSocket socket ? socket : null;
}
@SuppressWarnings("unchecked")
private void sendUpstreamOrdered(RealtimeAsrChannelContext context,
Supplier<CompletableFuture<?>> sendAction,
String messageType) {
ConcurrentMap<String, Object> frontendState = context.getFrontendState();
synchronized (frontendState) {
CompletableFuture<Void> chain = (CompletableFuture<Void>) frontendState.getOrDefault(STATE_UPSTREAM_SEND_CHAIN, COMPLETED);
CompletableFuture<Void> nextChain = chain
.exceptionally(ex -> null)
.thenCompose(ignored -> sendAction.get().thenApply(ignoredResult -> null));
nextChain = nextChain.whenComplete((ignored, ex) -> {
if (ex != null) {
log.error("顺序发送上游消息失败meetingId={}, sessionId={}, type={}",
context.getMeetingId(), currentConnectionId(context), messageType, ex);
}
});
frontendState.put(STATE_UPSTREAM_SEND_CHAIN, nextChain);
}
}
@SuppressWarnings("unchecked")
private void queuePendingAudioFrame(RealtimeAsrChannelContext context, byte[] payload) {
ConcurrentMap<String, Object> frontendState = context.getFrontendState();
synchronized (frontendState) {
List<byte[]> pendingFrames = (List<byte[]>) frontendState.get(STATE_PENDING_AUDIO_FRAMES);
if (pendingFrames == null) {
pendingFrames = new ArrayList<>();
frontendState.put(STATE_PENDING_AUDIO_FRAMES, pendingFrames);
}
pendingFrames.add(payload);
}
}
@SuppressWarnings("unchecked")
private void flushPendingAudioFrames(RealtimeAsrChannelContext context, java.net.http.WebSocket upstreamSocket) {
List<byte[]> pendingFrames;
ConcurrentMap<String, Object> frontendState = context.getFrontendState();
synchronized (frontendState) {
pendingFrames = (List<byte[]>) frontendState.get(STATE_PENDING_AUDIO_FRAMES);
if (pendingFrames == null || pendingFrames.isEmpty()) {
return;
}
frontendState.put(STATE_PENDING_AUDIO_FRAMES, new ArrayList<byte[]>());
}
log.info("start 后开始补发排队音频帧meetingId={}, sessionId={}, frameCount={}",
context.getMeetingId(), currentConnectionId(context), pendingFrames.size());
for (byte[] frame : pendingFrames) {
sendUpstreamOrdered(context, () -> upstreamSocket.sendBinary(ByteBuffer.wrap(frame), true), "binary-flush");
}
}
private String currentConnectionId(RealtimeAsrChannelContext context) {
return context.getRawSession() == null ? null : context.getRawSession().getId();
}
private static ByteBuffer copyBuffer(ByteBuffer source) {
ByteBuffer duplicate = source.asReadOnlyBuffer();
byte[] bytes = new byte[duplicate.remaining()];
duplicate.get(bytes);
return ByteBuffer.wrap(bytes);
}
private static boolean shouldLogBinaryFrame(int count) {
return count <= 3 || count % 25 == 0;
}
private static String summarizeText(String payload) {
if (payload == null) {
return "";
}
String normalized = payload.replaceAll("\\s+", " ").trim();
if (normalized.length() <= 240) {
return normalized;
}
return normalized.substring(0, 240) + "...";
}
private static boolean looksLikeStartMessage(String payload) {
if (payload == null || payload.isBlank()) {
return false;
}
String normalized = payload.replaceAll("\\s+", "");
return normalized.contains("\"type\":\"start\"");
}
private static boolean looksLikeStopMessage(String payload) {
if (payload == null || payload.isBlank()) {
return false;
}
String normalized = payload.replaceAll("\\s+", "");
return normalized.contains("\"type\":\"stop\"");
}
private static boolean looksLikeEndMessage(String payload) {
if (payload == null || payload.isBlank()) {
return false;
}
try {
JsonNode root = OBJECT_MAPPER.readTree(payload);
return "end".equals(root.path("type").asText(""));
} catch (Exception ex) {
return false;
}
}
private static List<String> normalizeFrontendMessages(String upstreamPayload) {
try {
JsonNode root = OBJECT_MAPPER.readTree(upstreamPayload);
String type = root.path("type").asText("");
if (!"sentences".equals(type) && !"end".equals(type)) {
return List.of(upstreamPayload);
}
List<String> normalizedMessages = new ArrayList<>();
JsonNode sentences = root.path("sentences");
if (sentences.isArray()) {
for (JsonNode sentence : sentences) {
String text = sentence.path("sentence").asText("").trim();
if (text.isEmpty()) {
continue;
}
boolean isFinal = sentence.path("sentence_type").asInt(0) != 0 || "end".equals(type);
normalizedMessages.add(buildFrontendTranscriptMessage(sentence, text, isFinal));
}
}
if (normalizedMessages.isEmpty()) {
String fallbackText = root.path("result").path("voice_text_str").asText("").trim();
if (!fallbackText.isEmpty()) {
normalizedMessages.add(buildFrontendTranscriptMessage((JsonNode) null, fallbackText, true));
}
}
return normalizedMessages.isEmpty() ? List.of(upstreamPayload) : normalizedMessages;
} catch (Exception ex) {
return List.of(upstreamPayload);
}
}
private static String buildFrontendTranscriptMessage(JsonNode sentence, String text, boolean isFinal) throws JsonProcessingException {
ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("type", isFinal ? "segment" : "partial");
ObjectNode data = root.putObject("data");
data.put("text", text);
data.put("is_final", isFinal);
if (sentence != null && sentence.has("sentence_id") && sentence.get("sentence_id").canConvertToInt()) {
data.put("sentence_id", sentence.get("sentence_id").asInt());
}
if (sentence != null) {
copyOptionalTimeSeconds(sentence, data, "start_time", "start");
copyOptionalTimeSeconds(sentence, data, "end_time", "end");
copyOptionalText(sentence, data, "speaker_id");
copyOptionalText(sentence, data, "speaker_name");
copyOptionalText(sentence, data, "user_id");
}
return OBJECT_MAPPER.writeValueAsString(root);
}
private static void copyOptionalTimeSeconds(JsonNode source, ObjectNode target, String sourceFieldName, String targetFieldName) {
if (source == null) {
return;
}
if (source.has(sourceFieldName) && source.get(sourceFieldName).isNumber()) {
target.put(targetFieldName, source.get(sourceFieldName).asDouble() / 1000D);
return;
}
if (source.has(targetFieldName) && source.get(targetFieldName).isNumber()) {
target.put(targetFieldName, source.get(targetFieldName).asDouble());
}
}
private static void copyOptionalText(JsonNode source, ObjectNode target, String fieldName) {
if (source == null || !source.has(fieldName) || source.get(fieldName).isNull()) {
return;
}
String value = source.get(fieldName).asText("").trim();
if (!value.isEmpty()) {
target.put(fieldName, value);
}
}
private final class UpstreamListener implements java.net.http.WebSocket.Listener {
private final RealtimeAsrChannelContext context;
private final StringBuilder textBuffer = new StringBuilder();
private final ByteArrayOutputStream binaryBuffer = new ByteArrayOutputStream();
private final AtomicInteger upstreamTextCount = new AtomicInteger();
private final AtomicInteger upstreamBinaryCount = new AtomicInteger();
private UpstreamListener(RealtimeAsrChannelContext context) {
this.context = context;
}
@Override
public void onOpen(java.net.http.WebSocket webSocket) {
context.getChannelState().put(STATE_UPSTREAM_SOCKET, webSocket);
log.info("上游 ASR websocket 已打开meetingId={}, sessionId={}, upstream={}",
context.getMeetingId(), currentConnectionId(context), context.getTargetWsUrl());
String connectionId = currentConnectionId(context);
if (connectionId == null || !realtimeMeetingSessionStateService.activate(context.getMeetingId(), connectionId)) {
context.getCallback().sendFrontendError(context.getMeetingId(), "REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议无法激活这条前端连接");
webSocket.sendClose(CloseStatus.POLICY_VIOLATION.getCode(), "当前会议无法激活这条前端连接");
context.getCallback().closeFrontend(context.getMeetingId(), CloseStatus.POLICY_VIOLATION.withReason("当前会议无法激活这条前端连接"));
return;
}
try {
context.getCallback().onChannelOpen(context.getMeetingId());
} catch (Exception ex) {
log.error("通知前端上游就绪失败meetingId={}, sessionId={}", context.getMeetingId(), currentConnectionId(context), ex);
context.getCallback().closeFrontend(context.getMeetingId(), CloseStatus.SERVER_ERROR);
return;
}
webSocket.request(1);
}
@Override
public java.util.concurrent.CompletionStage<?> onText(java.net.http.WebSocket webSocket, CharSequence data, boolean last) {
textBuffer.append(data);
if (last) {
int count = upstreamTextCount.incrementAndGet();
String upstreamPayload = textBuffer.toString();
realtimeMeetingTranscriptCacheService.mergeUpstreamMessage(context.getMeetingId(), upstreamPayload);
try {
for (String frontendPayload : normalizeFrontendMessages(upstreamPayload)) {
context.getCallback().sendFrontendText(context.getMeetingId(), frontendPayload);
}
log.info("上游 ASR 文本 -> 前端meetingId={}, sessionId={}, count={}, payload={}",
context.getMeetingId(), currentConnectionId(context), count, summarizeText(upstreamPayload));
if (Boolean.TRUE.equals(context.getChannelState().get(STATE_CLOSE_AFTER_END))
&& looksLikeEndMessage(upstreamPayload)) {
webSocket.sendClose(CloseStatus.NORMAL.getCode(), "meeting-complete");
}
} catch (Exception ex) {
log.error("转发上游 ASR 文本失败meetingId={}, sessionId={}", context.getMeetingId(), currentConnectionId(context), ex);
context.getCallback().closeFrontend(context.getMeetingId(), CloseStatus.SERVER_ERROR);
} finally {
textBuffer.setLength(0);
}
}
webSocket.request(1);
return COMPLETED;
}
@Override
public java.util.concurrent.CompletionStage<?> onBinary(java.net.http.WebSocket webSocket, ByteBuffer data, boolean last) {
byte[] chunk = new byte[data.remaining()];
data.get(chunk);
binaryBuffer.writeBytes(chunk);
if (last) {
int count = upstreamBinaryCount.incrementAndGet();
try {
context.getCallback().sendFrontendBinary(context.getMeetingId(), binaryBuffer.toByteArray());
if (shouldLogBinaryFrame(count)) {
log.info("上游 ASR 二进制消息 -> 前端meetingId={}, sessionId={}, count={}, bytes={}",
context.getMeetingId(), currentConnectionId(context), count, binaryBuffer.size());
}
} catch (Exception ex) {
log.error("转发上游 ASR 二进制消息失败meetingId={}, sessionId={}", context.getMeetingId(), currentConnectionId(context), ex);
context.getCallback().closeFrontend(context.getMeetingId(), CloseStatus.SERVER_ERROR);
} finally {
binaryBuffer.reset();
}
}
webSocket.request(1);
return COMPLETED;
}
@Override
public java.util.concurrent.CompletionStage<?> onPing(java.net.http.WebSocket webSocket, ByteBuffer message) {
webSocket.sendPong(copyBuffer(message));
log.info("上游 ASR ping 已本地响应meetingId={}, sessionId={}, bytes={}",
context.getMeetingId(), currentConnectionId(context), message.remaining());
webSocket.request(1);
return COMPLETED;
}
@Override
public java.util.concurrent.CompletionStage<?> onPong(java.net.http.WebSocket webSocket, ByteBuffer message) {
log.debug("上游 ASR pong 已本地忽略meetingId={}, sessionId={}, bytes={}",
context.getMeetingId(), currentConnectionId(context), message.remaining());
webSocket.request(1);
return COMPLETED;
}
@Override
public java.util.concurrent.CompletionStage<?> onClose(java.net.http.WebSocket webSocket, int statusCode, String reason) {
log.info("上游 ASR websocket 已关闭meetingId={}, sessionId={}, code={}, reason={}",
context.getMeetingId(), currentConnectionId(context), statusCode, reason);
context.getChannelState().remove(STATE_UPSTREAM_SOCKET);
context.getCallback().removeMeetingSession(context.getMeetingId());
context.getCallback().sendFrontendError(context.getMeetingId(),
"REALTIME_UPSTREAM_CLOSED",
reason == null || reason.isBlank() ? "上游 ASR WebSocket 已断开" : "上游 ASR WebSocket 已断开: " + reason);
context.getCallback().closeFrontend(context.getMeetingId(), new CloseStatus(statusCode, reason));
return COMPLETED;
}
@Override
public void onError(java.net.http.WebSocket webSocket, Throwable error) {
log.error("上游 ASR websocket 异常meetingId={}, sessionId={}, upstream={}",
context.getMeetingId(), currentConnectionId(context), context.getTargetWsUrl(), error);
context.getChannelState().remove(STATE_UPSTREAM_SOCKET);
context.getCallback().removeMeetingSession(context.getMeetingId());
context.getCallback().sendFrontendError(context.getMeetingId(),
"REALTIME_UPSTREAM_ERROR",
error == null || error.getMessage() == null || error.getMessage().isBlank()
? "上游 ASR WebSocket 连接异常"
: "上游 ASR WebSocket 连接异常: " + error.getMessage());
context.getCallback().closeFrontend(context.getMeetingId(), CloseStatus.SERVER_ERROR);
}
}
}

View File

@ -0,0 +1,286 @@
package com.imeeting.service.realtime.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.imeeting.dto.biz.RealtimeMeetingTranscriptCacheItem;
import com.imeeting.dto.biz.RealtimeMeetingTranscriptCacheState;
import com.imeeting.entity.biz.MeetingTranscript;
import com.imeeting.mapper.biz.MeetingTranscriptMapper;
import com.imeeting.service.biz.MeetingTranscriptFileService;
import com.imeeting.service.biz.RealtimeMeetingSessionStateService;
import com.imeeting.service.realtime.RealtimeMeetingTranscriptCacheService;
import com.imeeting.support.redis.RealtimeMeetingTranscriptCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class RealtimeMeetingTranscriptCacheServiceImpl implements RealtimeMeetingTranscriptCacheService {
private final RealtimeMeetingTranscriptCache transcriptCache;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final MeetingTranscriptMapper transcriptMapper;
private final MeetingTranscriptFileService meetingTranscriptFileService;
private final ObjectMapper objectMapper = new ObjectMapper();
private final Map<Long, Object> meetingLocks = new ConcurrentHashMap<>();
@Autowired
public RealtimeMeetingTranscriptCacheServiceImpl(RealtimeMeetingTranscriptCache transcriptCache,
RealtimeMeetingSessionStateService realtimeMeetingSessionStateService,
MeetingTranscriptMapper transcriptMapper,
MeetingTranscriptFileService meetingTranscriptFileService) {
this.transcriptCache = transcriptCache;
this.realtimeMeetingSessionStateService = realtimeMeetingSessionStateService;
this.transcriptMapper = transcriptMapper;
this.meetingTranscriptFileService = meetingTranscriptFileService;
}
@Override
public void mergeUpstreamMessage(Long meetingId, String payload) {
if (meetingId == null || payload == null || payload.isBlank()) {
return;
}
synchronized (lockForMeeting(meetingId)) {
RealtimeMeetingTranscriptCacheState state = getOrCreateState(meetingId);
try {
JsonNode root = objectMapper.readTree(payload);
String type = root.path("type").asText("");
if (!"sentences".equals(type) && !"end".equals(type)) {
return;
}
JsonNode sentences = root.path("sentences");
if (!sentences.isArray()) {
return;
}
for (JsonNode sentence : sentences) {
RealtimeMeetingTranscriptCacheItem item = mergeSentenceNode(state, sentence, "end".equals(type));
persistFinalSentence(meetingId, item);
}
state.setUpdatedAt(System.currentTimeMillis());
transcriptCache.saveState(state);
realtimeMeetingSessionStateService.refreshAfterTranscriptCapture(meetingId, countNonEmptyItems(state));
} catch (Exception ignored) {
// ignore malformed upstream payload
}
}
}
@Override
public List<RealtimeMeetingTranscriptCacheItem> listOrderedItems(Long meetingId) {
RealtimeMeetingTranscriptCacheState state = transcriptCache.getState(meetingId);
if (state == null || state.getItems() == null || state.getItems().isEmpty()) {
return List.of();
}
return state.getItems().stream()
.filter(item -> item.getContent() != null && !item.getContent().isBlank())
.sorted(Comparator.comparing(item -> item.getSortOrder() == null ? Integer.MAX_VALUE : item.getSortOrder()))
.toList();
}
@Override
public void clear(Long meetingId) {
transcriptCache.clear(meetingId);
}
private RealtimeMeetingTranscriptCacheItem mergeSentenceNode(RealtimeMeetingTranscriptCacheState state,
JsonNode sentence,
boolean fromEndMessage) {
String text = sentence.path("sentence").asText("").trim();
if (text.isEmpty()) {
return null;
}
Integer sentenceId = sentence.has("sentence_id") && sentence.get("sentence_id").canConvertToInt()
? sentence.get("sentence_id").asInt()
: null;
String sentenceKey = sentenceId == null ? "sentence-" + nextLegacySequence(state) : "sentence-" + sentenceId;
RealtimeMeetingTranscriptCacheItem item = findBySentenceKey(state, sentenceKey);
long now = System.currentTimeMillis();
if (item == null) {
item = new RealtimeMeetingTranscriptCacheItem();
item.setSentenceKey(sentenceKey);
item.setSentenceId(sentenceId);
item.setSortOrder(nextSortOrder(state));
item.setFirstReceivedAt(now);
state.getItems().add(item);
}
item.setSentenceType(readInteger(sentence, "sentence_type"));
item.setSpeakerId(resolveSpeakerId(sentence));
item.setSpeakerName(readText(sentence, "speaker_name"));
item.setUserId(readText(sentence, "user_id"));
item.setStartTime(readTimeMilliseconds(sentence, "start_time", "start"));
item.setEndTime(readTimeMilliseconds(sentence, "end_time", "end"));
item.setContent(text);
item.setFinalResult(fromEndMessage || Objects.equals(item.getSentenceType(), 1));
item.setUpdatedAt(now);
return item;
}
private void persistFinalSentence(Long meetingId, RealtimeMeetingTranscriptCacheItem item) {
if (meetingId == null || item == null || !Boolean.TRUE.equals(item.getFinalResult())) {
return;
}
String content = item.getContent() == null ? null : item.getContent().trim();
if (content == null || content.isBlank()) {
return;
}
String speakerId = resolveSpeakerId(item);
String speakerName = resolveSpeakerName(item);
if (item.getTranscriptId() != null) {
transcriptMapper.update(null, new LambdaUpdateWrapper<MeetingTranscript>()
.eq(MeetingTranscript::getId, item.getTranscriptId())
.set(MeetingTranscript::getSpeakerId, speakerId)
.set(MeetingTranscript::getSpeakerName, speakerName)
.set(MeetingTranscript::getContent, content)
.set(item.getStartTime() != null, MeetingTranscript::getStartTime, item.getStartTime())
.set(item.getEndTime() != null, MeetingTranscript::getEndTime, item.getEndTime()));
meetingTranscriptFileService.initializeTranscriptFileIfAbsent(meetingId);
return;
}
MeetingTranscript transcript = new MeetingTranscript();
transcript.setMeetingId(meetingId);
transcript.setSpeakerId(speakerId);
transcript.setSpeakerName(speakerName);
transcript.setContent(content);
transcript.setStartTime(item.getStartTime());
transcript.setEndTime(item.getEndTime());
transcript.setSortOrder(nextPersistedSortOrder(meetingId));
transcriptMapper.insert(transcript);
item.setTranscriptId(transcript.getId());
meetingTranscriptFileService.initializeTranscriptFileIfAbsent(meetingId);
}
private int nextPersistedSortOrder(Long meetingId) {
Integer maxSortOrder = transcriptMapper.selectList(new LambdaQueryWrapper<MeetingTranscript>()
.eq(MeetingTranscript::getMeetingId, meetingId)
.orderByDesc(MeetingTranscript::getSortOrder)
.last("LIMIT 1"))
.stream()
.findFirst()
.map(MeetingTranscript::getSortOrder)
.orElse(0);
return maxSortOrder == null ? 0 : maxSortOrder + 1;
}
private RealtimeMeetingTranscriptCacheItem findBySentenceKey(RealtimeMeetingTranscriptCacheState state, String sentenceKey) {
if (state.getItems() == null || state.getItems().isEmpty()) {
return null;
}
return state.getItems().stream()
.filter(item -> sentenceKey.equals(item.getSentenceKey()))
.findFirst()
.orElse(null);
}
private String resolveSpeakerId(RealtimeMeetingTranscriptCacheItem item) {
if (item == null) {
return null;
}
if (item.getUserId() != null && !item.getUserId().isBlank()) {
return item.getUserId().trim();
}
if (item.getSpeakerId() == null || item.getSpeakerId().isBlank() || "-1".equals(item.getSpeakerId().trim())) {
return null;
}
return item.getSpeakerId().trim();
}
private String resolveSpeakerName(RealtimeMeetingTranscriptCacheItem item) {
if (item == null) {
return null;
}
if (item.getSpeakerName() != null && !item.getSpeakerName().isBlank()) {
return item.getSpeakerName().trim();
}
String speakerId = resolveSpeakerId(item);
return speakerId == null || speakerId.isBlank() ? null : speakerId;
}
private String resolveSpeakerId(JsonNode sentence) {
String userId = readText(sentence, "user_id");
if (userId != null && !userId.isBlank()) {
return userId;
}
return readText(sentence, "speaker_id");
}
private Integer readInteger(JsonNode node, String fieldName) {
if (node == null || !node.has(fieldName) || !node.get(fieldName).canConvertToInt()) {
return null;
}
return node.get(fieldName).asInt();
}
private Integer readTimeMilliseconds(JsonNode node, String primaryField, String fallbackField) {
if (node == null) {
return null;
}
if (node.has(primaryField) && node.get(primaryField).canConvertToInt()) {
return node.get(primaryField).asInt();
}
if (node.has(fallbackField) && node.get(fallbackField).isNumber()) {
return Math.round((float) (node.get(fallbackField).asDouble() * 1000));
}
return null;
}
private String readText(JsonNode node, String fieldName) {
if (node == null || !node.has(fieldName) || node.get(fieldName).isNull()) {
return null;
}
String value = node.get(fieldName).asText("");
return value == null ? null : value.trim();
}
private long countNonEmptyItems(RealtimeMeetingTranscriptCacheState state) {
if (state.getItems() == null || state.getItems().isEmpty()) {
return 0L;
}
return state.getItems().stream()
.filter(item -> item.getContent() != null && !item.getContent().isBlank())
.count();
}
private int nextSortOrder(RealtimeMeetingTranscriptCacheState state) {
Integer current = state.getNextSortOrder();
int next = current == null ? 0 : current;
state.setNextSortOrder(next + 1);
return next;
}
private int nextLegacySequence(RealtimeMeetingTranscriptCacheState state) {
Integer current = state.getNextLegacySequence();
int next = current == null ? 0 : current;
state.setNextLegacySequence(next + 1);
return next;
}
private RealtimeMeetingTranscriptCacheState getOrCreateState(Long meetingId) {
RealtimeMeetingTranscriptCacheState state = transcriptCache.getState(meetingId);
if (state != null) {
if (state.getItems() == null) {
state.setItems(new ArrayList<>());
}
return state;
}
RealtimeMeetingTranscriptCacheState next = new RealtimeMeetingTranscriptCacheState();
next.setMeetingId(meetingId);
next.setItems(new ArrayList<>());
next.setNextSortOrder(0);
next.setNextLegacySequence(0);
next.setUpdatedAt(System.currentTimeMillis());
return next;
}
private Object lockForMeeting(Long meetingId) {
return meetingLocks.computeIfAbsent(meetingId, ignored -> new Object());
}
}

View File

@ -0,0 +1,39 @@
package com.imeeting.support.redis;
import com.imeeting.common.RedisKeys;
import com.imeeting.dto.biz.RealtimeMeetingTranscriptCacheState;
import com.imeeting.support.RedisSupport;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.time.Duration;
@Component
@RequiredArgsConstructor
public class RealtimeMeetingTranscriptCache {
private static final Duration CACHE_TTL = Duration.ofHours(12);
private final RedisSupport redisSupport;
public RealtimeMeetingTranscriptCacheState getState(Long meetingId) {
if (meetingId == null) {
return null;
}
return redisSupport.getJsonQuietly(RedisKeys.realtimeMeetingTranscriptCacheKey(meetingId), RealtimeMeetingTranscriptCacheState.class);
}
public void saveState(RealtimeMeetingTranscriptCacheState state) {
if (state == null || state.getMeetingId() == null) {
return;
}
redisSupport.setJson(RedisKeys.realtimeMeetingTranscriptCacheKey(state.getMeetingId()), state, CACHE_TTL);
}
public void clear(Long meetingId) {
if (meetingId == null) {
return;
}
redisSupport.deleteQuietly(RedisKeys.realtimeMeetingTranscriptCacheKey(meetingId));
}
}

View File

@ -1,173 +1,133 @@
package com.imeeting.websocket;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.imeeting.dto.biz.RealtimeMeetingTranscriptCacheItem;
import com.imeeting.dto.biz.RealtimeSocketSessionData;
import com.imeeting.service.biz.RealtimeMeetingSessionStateService;
import com.imeeting.service.biz.RealtimeMeetingSocketSessionService;
import com.imeeting.service.realtime.RealtimeAsrChannel;
import com.imeeting.service.realtime.RealtimeAsrChannelCallback;
import com.imeeting.service.realtime.RealtimeAsrChannelContext;
import com.imeeting.service.realtime.RealtimeAsrChannelFactory;
import com.imeeting.service.realtime.RealtimeMeetingAudioStorageService;
import com.imeeting.service.realtime.RealtimeMeetingTranscriptCacheService;
import com.imeeting.service.realtime.impl.LocalRealtimeAsrChannel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PingMessage;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@Slf4j
@Component
@RequiredArgsConstructor
public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandler {
private static final String ATTR_FRONTEND_SESSION = "frontendSession";
private static final String ATTR_UPSTREAM_SOCKET = "upstreamSocket";
private static final String ATTR_MEETING_ID = "meetingId";
private static final String ATTR_TARGET_WS_URL = "targetWsUrl";
private static final String ATTR_PROVIDER = "provider";
private static final String ATTR_FRONTEND_TEXT_COUNT = "frontendTextCount";
private static final String ATTR_FRONTEND_BINARY_COUNT = "frontendBinaryCount";
private static final String ATTR_UPSTREAM_SEND_CHAIN = "upstreamSendChain";
private static final String ATTR_START_MESSAGE_SENT = "startMessageSent";
private static final String ATTR_PENDING_AUDIO_FRAMES = "pendingAudioFrames";
private static final CompletableFuture<Void> COMPLETED = CompletableFuture.completedFuture(null);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService;
private final RealtimeMeetingTranscriptCacheService realtimeMeetingTranscriptCacheService;
private final RealtimeAsrChannelFactory realtimeAsrChannelFactory;
private final ConcurrentMap<Long, MeetingChannelSession> meetingSessions = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, Object> meetingLocks = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String sessionToken = extractQueryParam(session.getUri(), "sessionToken");
RealtimeSocketSessionData sessionData = realtimeMeetingSocketSessionService.getSessionData(sessionToken);
if (sessionData == null) {
log.warn("Realtime websocket rejected: invalid session token, sessionId={}", session.getId());
log.warn("实时会议 websocket 拒绝连接:会话令牌无效,sessionId={}", session.getId());
session.close(CloseStatus.POLICY_VIOLATION.withReason("实时 Socket 会话无效"));
return;
}
ConcurrentWebSocketSessionDecorator frontendSession =
new ConcurrentWebSocketSessionDecorator(session, (int) Duration.ofSeconds(15).toMillis(), 1024 * 1024);
session.getAttributes().put(ATTR_FRONTEND_SESSION, frontendSession);
session.getAttributes().put(ATTR_MEETING_ID, sessionData.getMeetingId());
session.getAttributes().put(ATTR_TARGET_WS_URL, sessionData.getTargetWsUrl());
session.getAttributes().put(ATTR_PROVIDER, sessionData.getProvider());
session.getAttributes().put(ATTR_FRONTEND_TEXT_COUNT, new AtomicInteger());
session.getAttributes().put(ATTR_FRONTEND_BINARY_COUNT, new AtomicInteger());
session.getAttributes().put(ATTR_UPSTREAM_SEND_CHAIN, COMPLETED);
session.getAttributes().put(ATTR_START_MESSAGE_SENT, Boolean.FALSE);
session.getAttributes().put(ATTR_PENDING_AUDIO_FRAMES, new ArrayList<byte[]>());
realtimeMeetingAudioStorageService.openSession(sessionData.getMeetingId(), session.getId());
log.info("Realtime websocket accepted: meetingId={}, sessionId={}, upstream={}",
sessionData.getMeetingId(), session.getId(), sessionData.getTargetWsUrl());
log.info("实时会议 websocket 已接入meetingId={}, sessionId={}, provider={}, upstream={}",
sessionData.getMeetingId(), session.getId(), sessionData.getProvider(), sessionData.getTargetWsUrl());
java.net.http.WebSocket upstreamSocket;
try {
upstreamSocket = java.net.http.HttpClient.newHttpClient()
.newWebSocketBuilder()
.buildAsync(URI.create(sessionData.getTargetWsUrl()),
new UpstreamListener(
frontendSession,
session,
sessionData.getMeetingId(),
sessionData.getTargetWsUrl(),
realtimeMeetingSessionStateService
))
.get();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
log.error("Realtime websocket upstream connect interrupted: meetingId={}, sessionId={}",
sessionData.getMeetingId(), session.getId(), ex);
sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接第三方识别服务时被中断");
realtimeMeetingAudioStorageService.closeSession(session.getId());
frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接上游服务时被中断"));
return;
} catch (ExecutionException | CompletionException ex) {
log.warn("Failed to connect upstream websocket, meetingId={}, target={}", sessionData.getMeetingId(), sessionData.getTargetWsUrl(), ex);
sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_FAILED", "连接第三方识别服务失败,请检查模型 WebSocket 配置或服务状态");
realtimeMeetingAudioStorageService.closeSession(session.getId());
frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接 ASR WebSocket 失败"));
return;
}
session.getAttributes().put(ATTR_UPSTREAM_SOCKET, upstreamSocket);
attachFrontendSession(sessionData, session, frontendSession);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
java.net.http.WebSocket upstreamSocket = getUpstreamSocket(session);
if (upstreamSocket == null) {
log.warn("Frontend text ignored because upstream socket is unavailable, meetingId={}, sessionId={}",
MeetingChannelSession meetingSession = getMeetingSession(session);
if (meetingSession == null || !meetingSession.isChannelOpen()) {
log.warn("前端文本消息已忽略:上游 ASR 连接不可用meetingId={}, sessionId={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId());
return;
}
int count = nextCount(session, ATTR_FRONTEND_TEXT_COUNT);
log.info("Frontend text -> upstream: meetingId={}, sessionId={}, count={}, payload={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), count, summarizeText(message.getPayload()));
sendUpstreamOrdered(session, () -> upstreamSocket.sendText(message.getPayload(), true), "text");
if (looksLikeStartMessage(message.getPayload())) {
session.getAttributes().put(ATTR_START_MESSAGE_SENT, Boolean.TRUE);
flushPendingAudioFrames(session, upstreamSocket);
}
String payload = message.getPayload();
log.info("前端文本 -> ASR 渠道meetingId={}, sessionId={}, provider={}, count={}, payload={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, summarizeText(payload));
meetingSession.channel.handleFrontendText(meetingSession.context, payload);
}
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
java.net.http.WebSocket upstreamSocket = getUpstreamSocket(session);
if (upstreamSocket == null) {
log.warn("Frontend binary ignored because upstream socket is unavailable, meetingId={}, sessionId={}",
MeetingChannelSession meetingSession = getMeetingSession(session);
if (meetingSession == null || !meetingSession.isChannelOpen()) {
log.warn("前端音频帧已忽略:上游 ASR 连接不可用meetingId={}, sessionId={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId());
return;
}
int count = nextCount(session, ATTR_FRONTEND_BINARY_COUNT);
int bytes = message.getPayloadLength();
if (shouldLogBinaryFrame(count)) {
log.info("Frontend binary -> upstream: meetingId={}, sessionId={}, count={}, bytes={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), count, bytes);
log.info("前端音频帧 -> ASR 渠道meetingId={}, sessionId={}, provider={}, count={}, bytes={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_PROVIDER), count, bytes);
}
byte[] payload = toByteArray(message.getPayload());
realtimeMeetingAudioStorageService.append(session.getId(), payload);
if (!Boolean.TRUE.equals(session.getAttributes().get(ATTR_START_MESSAGE_SENT))) {
queuePendingAudioFrame(session, payload);
if (shouldLogBinaryFrame(count)) {
log.warn("Frontend binary queued before start message: meetingId={}, sessionId={}, count={}, bytes={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), count, bytes);
}
return;
}
sendUpstreamOrdered(session, () -> upstreamSocket.sendBinary(ByteBuffer.wrap(payload), true), "binary");
meetingSession.channel.handleFrontendBinary(meetingSession.context, payload);
}
@Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) {
java.net.http.WebSocket upstreamSocket = getUpstreamSocket(session);
if (upstreamSocket == null) {
if (getMeetingSession(session) == null) {
return;
}
sendUpstreamOrdered(session, () -> upstreamSocket.sendPong(copyBuffer(message.getPayload())), "pong");
log.debug("前端 pong 已在本地忽略meetingId={}, sessionId={}, bytes={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), message.getPayloadLength());
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
log.error("Realtime websocket transport error: meetingId={}, sessionId={}, upstream={}",
log.error("实时会议 websocket 传输异常:meetingId={}, sessionId={}, upstream={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), session.getAttributes().get(ATTR_TARGET_WS_URL), exception);
closeUpstreamSocket(session, CloseStatus.SERVER_ERROR);
detachFrontend(session);
realtimeMeetingAudioStorageService.closeSession(session.getId());
if (session.isOpen()) {
session.close(CloseStatus.SERVER_ERROR);
}
@ -175,32 +135,149 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
log.info("Realtime websocket closed: meetingId={}, sessionId={}, code={}, reason={}",
log.info("实时会议 websocket 已关闭:meetingId={}, sessionId={}, code={}, reason={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), status.getCode(), status.getReason());
Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID);
if (meetingIdValue instanceof Long meetingId) {
detachFrontend(meetingId, session.getId());
realtimeMeetingSessionStateService.pauseByDisconnect(meetingId, session.getId());
}
realtimeMeetingAudioStorageService.closeSession(session.getId());
closeUpstreamSocket(session, status);
}
private java.net.http.WebSocket getUpstreamSocket(WebSocketSession session) {
Object socket = session.getAttributes().get(ATTR_UPSTREAM_SOCKET);
if (socket instanceof java.net.http.WebSocket webSocket) {
return webSocket;
public void closeMeetingSession(Long meetingId) {
if (meetingId == null) {
return;
}
MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null || meetingSession.channel == null) {
return;
}
meetingSession.channel.closeMeeting(meetingSession.context);
}
private void attachFrontendSession(RealtimeSocketSessionData sessionData,
WebSocketSession rawSession,
ConcurrentWebSocketSessionDecorator frontendSession) throws Exception {
Long meetingId = sessionData.getMeetingId();
MeetingChannelSession meetingSession;
boolean reused = false;
synchronized (lockForMeeting(meetingId)) {
meetingSession = meetingSessions.get(meetingId);
if (meetingSession != null && meetingSession.isChannelOpen()) {
meetingSession.clearFrontendIfClosed();
if (meetingSession.hasOpenFrontend()) {
sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃前端连接");
frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("已存在活跃的前端连接"));
realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
return;
}
return null;
if (!realtimeMeetingSessionStateService.activate(meetingId, rawSession.getId())) {
sendFrontendError(frontendSession, "REALTIME_ACTIVE_CONNECTION_REJECTED", "当前状态下无法继续会议");
frontendSession.close(CloseStatus.POLICY_VIOLATION.withReason("当前状态下无法继续会议"));
realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
return;
}
meetingSession.bindFrontend(rawSession, frontendSession);
reused = true;
} else {
RealtimeAsrChannel channel = realtimeAsrChannelFactory.getRequired(sessionData.getProvider());
RealtimeAsrChannelContext context = new RealtimeAsrChannelContext();
context.setMeetingId(meetingId);
context.setProvider(realtimeAsrChannelFactory.normalizeProvider(sessionData.getProvider()));
context.setTargetWsUrl(sessionData.getTargetWsUrl());
context.setCallback(new HandlerChannelCallback());
context.bindFrontendSession(rawSession, frontendSession);
meetingSession = new MeetingChannelSession(meetingId, channel, context);
meetingSessions.put(meetingId, meetingSession);
}
}
private void closeUpstreamSocket(WebSocketSession session, CloseStatus status) {
java.net.http.WebSocket upstreamSocket = getUpstreamSocket(session);
if (upstreamSocket != null) {
upstreamSocket.sendClose(status.getCode(), status.getReason() == null ? "" : status.getReason());
session.getAttributes().remove(ATTR_UPSTREAM_SOCKET);
if (reused) {
sendProxyReady(frontendSession);
replayCachedMessages(meetingId, frontendSession);
return;
}
try {
meetingSession.channel.connect(meetingSession.context);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
removeMeetingSession(meetingId, meetingSession);
log.error("连接上游 ASR websocket 时被中断meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex);
sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接上游 ASR 服务时被中断");
realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接上游服务时被中断"));
} catch (Exception ex) {
removeMeetingSession(meetingId, meetingSession);
log.warn("连接上游 ASR websocket 失败meetingId={}, provider={}, target={}",
meetingId, sessionData.getProvider(), sessionData.getTargetWsUrl(), ex);
sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_FAILED", "连接上游 ASR 服务失败");
realtimeMeetingAudioStorageService.closeSession(rawSession.getId());
frontendSession.close(CloseStatus.SERVER_ERROR.withReason("连接 ASR WebSocket 失败"));
}
}
private void replayCachedMessages(Long meetingId, ConcurrentWebSocketSessionDecorator frontendSession) {
try {
if (!frontendSession.isOpen()) {
return;
}
for (RealtimeMeetingTranscriptCacheItem item : realtimeMeetingTranscriptCacheService.listOrderedItems(meetingId)) {
frontendSession.sendMessage(new TextMessage(LocalRealtimeAsrChannel.buildFrontendTranscriptMessage(item)));
}
} catch (Exception ex) {
log.warn("回放缓存转写消息失败meetingId={}", meetingId, ex);
}
}
private void sendProxyReady(ConcurrentWebSocketSessionDecorator frontendSession) throws Exception {
if (frontendSession.isOpen()) {
frontendSession.sendMessage(new TextMessage("{\"type\":\"proxy_ready\"}"));
}
}
private void detachFrontend(WebSocketSession session) {
Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID);
if (meetingIdValue instanceof Long meetingId) {
detachFrontend(meetingId, session.getId());
}
}
private void detachFrontend(Long meetingId, String sessionId) {
MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null) {
return;
}
synchronized (lockForMeeting(meetingId)) {
meetingSession.detachFrontend(sessionId);
}
}
private MeetingChannelSession getMeetingSession(WebSocketSession session) {
Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID);
if (!(meetingIdValue instanceof Long meetingId)) {
return null;
}
return meetingSessions.get(meetingId);
}
void removeMeetingSession(Long meetingId) {
synchronized (lockForMeeting(meetingId)) {
meetingSessions.remove(meetingId);
}
}
void removeMeetingSession(Long meetingId, MeetingChannelSession meetingSession) {
synchronized (lockForMeeting(meetingId)) {
meetingSessions.remove(meetingId, meetingSession);
}
}
private Object lockForMeeting(Long meetingId) {
return meetingLocks.computeIfAbsent(meetingId, ignored -> new Object());
}
private String extractQueryParam(URI uri, String key) {
if (uri == null || uri.getQuery() == null || uri.getQuery().isBlank()) {
return null;
@ -213,13 +290,6 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
.orElse(null);
}
private ByteBuffer copyBuffer(ByteBuffer source) {
ByteBuffer duplicate = source.asReadOnlyBuffer();
byte[] bytes = new byte[duplicate.remaining()];
duplicate.get(bytes);
return ByteBuffer.wrap(bytes);
}
private byte[] toByteArray(ByteBuffer source) {
ByteBuffer duplicate = source.asReadOnlyBuffer();
byte[] bytes = new byte[duplicate.remaining()];
@ -235,21 +305,18 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
return 0;
}
@SuppressWarnings("unchecked")
private void sendUpstreamOrdered(WebSocketSession session, Supplier<CompletableFuture<?>> sendAction, String messageType) {
synchronized (session) {
CompletableFuture<Void> chain = (CompletableFuture<Void>) session.getAttributes()
.getOrDefault(ATTR_UPSTREAM_SEND_CHAIN, COMPLETED);
CompletableFuture<Void> nextChain = chain
.exceptionally(ex -> null)
.thenCompose(ignored -> sendAction.get().thenApply(ignoredResult -> null));
nextChain = nextChain.whenComplete((ignored, ex) -> {
if (ex != null) {
log.error("Ordered upstream send failed: meetingId={}, sessionId={}, type={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), messageType, ex);
}
});
session.getAttributes().put(ATTR_UPSTREAM_SEND_CHAIN, nextChain);
private void sendFrontendError(ConcurrentWebSocketSessionDecorator frontendSession, String code, String message) {
try {
if (!frontendSession.isOpen()) {
return;
}
Map<String, Object> payload = new HashMap<>();
payload.put("type", "error");
payload.put("code", code);
payload.put("message", message);
frontendSession.sendMessage(new TextMessage(OBJECT_MAPPER.writeValueAsString(payload)));
} catch (Exception ex) {
log.warn("向前端发送实时代理错误消息失败code={}", code, ex);
}
}
@ -268,237 +335,113 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl
return normalized.substring(0, 240) + "...";
}
private boolean looksLikeStartMessage(String payload) {
if (payload == null || payload.isBlank()) {
return false;
}
String normalized = payload.replaceAll("\\s+", "");
return normalized.contains("\"type\":\"start\"");
}
private void sendFrontendError(ConcurrentWebSocketSessionDecorator frontendSession, String code, String message) {
try {
if (!frontendSession.isOpen()) {
return;
}
Map<String, Object> payload = new HashMap<>();
payload.put("type", "error");
payload.put("code", code);
payload.put("message", message);
frontendSession.sendMessage(new TextMessage(new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(payload)));
} catch (Exception ex) {
log.warn("Failed to send realtime proxy error to frontend: code={}", code, ex);
}
}
@SuppressWarnings("unchecked")
private void queuePendingAudioFrame(WebSocketSession session, byte[] payload) {
synchronized (session) {
List<byte[]> pendingFrames = (List<byte[]>) session.getAttributes().get(ATTR_PENDING_AUDIO_FRAMES);
if (pendingFrames == null) {
pendingFrames = new ArrayList<>();
session.getAttributes().put(ATTR_PENDING_AUDIO_FRAMES, pendingFrames);
}
pendingFrames.add(payload);
}
}
@SuppressWarnings("unchecked")
private void flushPendingAudioFrames(WebSocketSession session, java.net.http.WebSocket upstreamSocket) {
List<byte[]> pendingFrames;
synchronized (session) {
pendingFrames = (List<byte[]>) session.getAttributes().get(ATTR_PENDING_AUDIO_FRAMES);
if (pendingFrames == null || pendingFrames.isEmpty()) {
return;
}
session.getAttributes().put(ATTR_PENDING_AUDIO_FRAMES, new ArrayList<byte[]>());
}
log.info("Flushing queued audio frames after start message: meetingId={}, sessionId={}, frameCount={}",
session.getAttributes().get(ATTR_MEETING_ID), session.getId(), pendingFrames.size());
for (byte[] frame : pendingFrames) {
sendUpstreamOrdered(session, () -> upstreamSocket.sendBinary(ByteBuffer.wrap(frame), true), "binary-flush");
}
}
private static final class UpstreamListener implements java.net.http.WebSocket.Listener {
private final ConcurrentWebSocketSessionDecorator frontendSession;
private final WebSocketSession rawSession;
static final class MeetingChannelSession {
private final Long meetingId;
private final String targetWsUrl;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final StringBuilder textBuffer = new StringBuilder();
private final ByteArrayOutputStream binaryBuffer = new ByteArrayOutputStream();
private final AtomicInteger upstreamTextCount = new AtomicInteger();
private final AtomicInteger upstreamBinaryCount = new AtomicInteger();
private final RealtimeAsrChannel channel;
private final RealtimeAsrChannelContext context;
private UpstreamListener(ConcurrentWebSocketSessionDecorator frontendSession, WebSocketSession rawSession,
Long meetingId, String targetWsUrl,
RealtimeMeetingSessionStateService realtimeMeetingSessionStateService) {
this.frontendSession = frontendSession;
this.rawSession = rawSession;
private MeetingChannelSession(Long meetingId, RealtimeAsrChannel channel, RealtimeAsrChannelContext context) {
this.meetingId = meetingId;
this.targetWsUrl = targetWsUrl;
this.realtimeMeetingSessionStateService = realtimeMeetingSessionStateService;
this.channel = channel;
this.context = context;
}
private void bindFrontend(WebSocketSession rawSession, ConcurrentWebSocketSessionDecorator frontendSession) {
context.bindFrontendSession(rawSession, frontendSession);
}
private void detachFrontend(String sessionId) {
if (context.getRawSession() != null && context.getRawSession().getId().equals(sessionId)) {
context.bindFrontendSession(null, null);
}
}
private void clearFrontendIfClosed() {
if (context.getRawSession() != null && !context.getRawSession().isOpen()) {
context.bindFrontendSession(null, null);
}
}
private boolean hasOpenFrontend() {
return context.getFrontendSession() != null
&& context.getFrontendSession().isOpen()
&& context.getRawSession() != null
&& context.getRawSession().isOpen();
}
private boolean isChannelOpen() {
return channel != null && channel.isOpen(context);
}
}
private final class HandlerChannelCallback implements RealtimeAsrChannelCallback {
@Override
public void onOpen(java.net.http.WebSocket webSocket) {
log.info("Upstream websocket opened: meetingId={}, sessionId={}, upstream={}",
meetingId, rawSession.getId(), targetWsUrl);
if (!realtimeMeetingSessionStateService.activate(meetingId, rawSession.getId())) {
sendFrontendError("REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃实时连接,请先关闭旧连接后再继续");
webSocket.sendClose(CloseStatus.POLICY_VIOLATION.getCode(), "Active realtime connection already exists");
closeFrontend(CloseStatus.POLICY_VIOLATION.withReason("已存在活动的实时连接"));
public void onChannelOpen(Long meetingId) throws Exception {
MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null) {
return;
}
try {
if (frontendSession.isOpen()) {
frontendSession.sendMessage(new TextMessage("{\"type\":\"proxy_ready\"}"));
}
} catch (Exception ex) {
log.error("Failed to notify frontend that upstream websocket is ready: meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex);
closeFrontend(CloseStatus.SERVER_ERROR);
ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
if (frontendSession != null && frontendSession.isOpen()) {
sendProxyReady(frontendSession);
}
}
@Override
public void sendFrontendText(Long meetingId, String payload) throws Exception {
MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null) {
return;
}
webSocket.request(1);
ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
if (frontendSession != null && frontendSession.isOpen()) {
frontendSession.sendMessage(new TextMessage(payload));
}
}
@Override
public java.util.concurrent.CompletionStage<?> onText(java.net.http.WebSocket webSocket, CharSequence data, boolean last) {
textBuffer.append(data);
if (last) {
int count = upstreamTextCount.incrementAndGet();
try {
if (frontendSession.isOpen()) {
frontendSession.sendMessage(new TextMessage(textBuffer.toString()));
}
log.info("Upstream text -> frontend: meetingId={}, sessionId={}, count={}, payload={}",
meetingId, rawSession.getId(), count, summarizeText(textBuffer.toString()));
} catch (Exception ex) {
log.error("Failed to forward upstream text: meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex);
closeFrontend(CloseStatus.SERVER_ERROR);
} finally {
textBuffer.setLength(0);
}
}
webSocket.request(1);
return COMPLETED;
public void sendFrontendBinary(Long meetingId, byte[] payload) throws Exception {
MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null) {
return;
}
ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
if (frontendSession != null && frontendSession.isOpen()) {
frontendSession.sendMessage(new BinaryMessage(payload));
}
}
@Override
public java.util.concurrent.CompletionStage<?> onBinary(java.net.http.WebSocket webSocket, ByteBuffer data, boolean last) {
byte[] chunk = new byte[data.remaining()];
data.get(chunk);
binaryBuffer.writeBytes(chunk);
if (last) {
int count = upstreamBinaryCount.incrementAndGet();
try {
if (frontendSession.isOpen()) {
frontendSession.sendMessage(new BinaryMessage(binaryBuffer.toByteArray()));
}
if (shouldLogBinaryFrame(count)) {
log.info("Upstream binary -> frontend: meetingId={}, sessionId={}, count={}, bytes={}",
meetingId, rawSession.getId(), count, binaryBuffer.size());
}
} catch (Exception ex) {
log.error("Failed to forward upstream binary: meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex);
closeFrontend(CloseStatus.SERVER_ERROR);
} finally {
binaryBuffer.reset();
}
}
webSocket.request(1);
return COMPLETED;
public void sendFrontendError(Long meetingId, String code, String message) {
MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null) {
return;
}
ConcurrentWebSocketSessionDecorator frontendSession = meetingSession.context.getFrontendSession();
if (frontendSession != null) {
RealtimeMeetingProxyWebSocketHandler.this.sendFrontendError(frontendSession, code, message);
}
}
@Override
public java.util.concurrent.CompletionStage<?> onPing(java.net.http.WebSocket webSocket, ByteBuffer message) {
public void removeMeetingSession(Long meetingId) {
RealtimeMeetingProxyWebSocketHandler.this.removeMeetingSession(meetingId);
}
@Override
public void closeFrontend(Long meetingId, CloseStatus status) {
MeetingChannelSession meetingSession = meetingSessions.get(meetingId);
if (meetingSession == null) {
return;
}
try {
if (frontendSession.isOpen()) {
frontendSession.sendMessage(new PingMessage(copyBuffer(message)));
}
log.info("Upstream ping -> frontend: meetingId={}, sessionId={}, bytes={}",
meetingId, rawSession.getId(), message.remaining());
} catch (Exception ex) {
log.error("Failed to forward upstream ping: meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex);
closeFrontend(CloseStatus.SERVER_ERROR);
}
webSocket.request(1);
return COMPLETED;
}
@Override
public java.util.concurrent.CompletionStage<?> onPong(java.net.http.WebSocket webSocket, ByteBuffer message) {
try {
if (frontendSession.isOpen()) {
frontendSession.sendMessage(new PongMessage(copyBuffer(message)));
}
log.info("Upstream pong -> frontend: meetingId={}, sessionId={}, bytes={}",
meetingId, rawSession.getId(), message.remaining());
} catch (Exception ex) {
log.error("Failed to forward upstream pong: meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex);
closeFrontend(CloseStatus.SERVER_ERROR);
}
webSocket.request(1);
return COMPLETED;
}
@Override
public java.util.concurrent.CompletionStage<?> onClose(java.net.http.WebSocket webSocket, int statusCode, String reason) {
log.info("Upstream websocket closed: meetingId={}, sessionId={}, code={}, reason={}",
meetingId, rawSession.getId(), statusCode, reason);
sendFrontendError("REALTIME_UPSTREAM_CLOSED", reason == null || reason.isBlank() ? "第三方识别服务已断开连接" : "第三方识别服务已断开: " + reason);
closeFrontend(new CloseStatus(statusCode, reason));
return COMPLETED;
}
@Override
public void onError(java.net.http.WebSocket webSocket, Throwable error) {
log.error("Upstream websocket error: meetingId={}, sessionId={}, upstream={}",
meetingId, rawSession.getId(), targetWsUrl, error);
sendFrontendError("REALTIME_UPSTREAM_ERROR", error == null || error.getMessage() == null || error.getMessage().isBlank()
? "第三方识别服务连接异常"
: "第三方识别服务连接异常: " + error.getMessage());
closeFrontend(CloseStatus.SERVER_ERROR);
}
private void sendFrontendError(String code, String message) {
try {
if (!frontendSession.isOpen()) {
return;
}
frontendSession.sendMessage(new TextMessage("{\"type\":\"error\",\"code\":\"" + code + "\",\"message\":\"" + escapeJson(message) + "\"}"));
} catch (Exception ex) {
log.warn("Failed to send upstream error to frontend: meetingId={}, sessionId={}, code={}", meetingId, rawSession.getId(), code, ex);
}
}
private String escapeJson(String value) {
if (value == null) {
return "";
}
return value
.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\r", "\\r")
.replace("\n", "\\n");
}
private void closeFrontend(CloseStatus status) {
try {
if (rawSession.isOpen()) {
WebSocketSession rawSession = meetingSession.context.getRawSession();
if (rawSession != null && rawSession.isOpen()) {
rawSession.close(status);
}
} catch (Exception ignored) {
// ignore close failure
}
}
private ByteBuffer copyBuffer(ByteBuffer source) {
ByteBuffer duplicate = source.asReadOnlyBuffer();
byte[] bytes = new byte[duplicate.remaining()];
duplicate.get(bytes);
return ByteBuffer.wrap(bytes);
}
}
}

View File

@ -249,12 +249,6 @@ export const createRealtimeMeeting = (data: CreateRealtimeMeetingCommand) => {
);
};
export const appendRealtimeTranscripts = (meetingId: number, data: RealtimeTranscriptItemDTO[]) => {
return http.post<{ code: string; data: boolean; msg: string }>(
`/api/biz/meeting/${meetingId}/realtime/transcripts`,
data
);
};
export const getRealtimeMeetingSessionStatus = (meetingId: number) => {
return http.get<{ code: string; data: RealtimeMeetingSessionStatus; msg: string }>(

View File

@ -18,7 +18,6 @@ import dayjs from "dayjs";
import PageHeader from "../../components/shared/PageHeader";
import PageContainer from "@/components/shared/PageContainer";
import {
appendRealtimeTranscripts,
completeRealtimeMeeting,
getMeetingDetail,
getRealtimeMeetingSessionStatus,
@ -28,7 +27,6 @@ import {
type MeetingTranscriptVO,
type MeetingVO,
type RealtimeMeetingSessionStatus,
type RealtimeTranscriptItemDTO,
type RealtimeSocketSessionVO,
} from "../../api/business/meeting";
const { Text, Title } = Typography;
@ -44,6 +42,7 @@ type WsMessage = {
data?: {
text?: string;
is_final?: boolean;
sentence_id?: number;
start?: number;
end?: number;
speaker_id?: string;
@ -58,6 +57,7 @@ type WsMessage = {
type TranscriptCard = {
id: string;
sentenceId?: number;
speakerName: string;
userId?: string | number;
text: string;
@ -66,6 +66,15 @@ type TranscriptCard = {
final: boolean;
};
type NormalizedWsMessage = {
text: string;
isFinal: boolean;
sentenceId?: number;
speaker?: WsSpeaker;
startTime?: number;
endTime?: number;
};
type RealtimeMeetingSessionDraft = {
meetingId: number;
meetingTitle: string;
@ -123,12 +132,18 @@ function resolveSpeaker(speaker?: WsSpeaker) {
return { speakerId: "spk_0", speakerName: "Unknown", userId: undefined };
}
if (typeof speaker === "string") {
return { speakerId: speaker, speakerName: speaker, userId: undefined };
const normalized = speaker.trim();
if (!normalized || normalized === "-1") {
return {speakerId: "spk_0", speakerName: "Unknown", userId: undefined};
}
return {speakerId: normalized, speakerName: normalized, userId: undefined};
}
const rawUserId = speaker.user_id === null || speaker.user_id === undefined ? undefined : String(speaker.user_id).trim();
const speakerId = rawUserId && rawUserId !== "-1" ? rawUserId : "spk_0";
return {
speakerId: speaker.user_id ? String(speaker.user_id) : "spk_0",
speakerName: speaker.name || (speaker.user_id ? String(speaker.user_id) : "Unknown"),
userId: speaker.user_id,
speakerId,
speakerName: speaker.name?.trim() || (speakerId !== "spk_0" ? speakerId : "Unknown"),
userId: speakerId === "spk_0" ? undefined : rawUserId,
};
}
@ -159,17 +174,25 @@ function toMs(value?: number) {
return Math.round(value * 1000);
}
function buildTranscriptCardId(sentenceId?: number) {
if (sentenceId === undefined || sentenceId === null) {
return `live-${Date.now()}-${Math.random()}`;
}
return `sentence-${sentenceId}`;
}
function buildRealtimeProxyWsUrl(socketSession: RealtimeSocketSessionVO) {
const protocol = window.location.protocol === "https:" ? "wss" : "ws";
return `${protocol}://${window.location.host}${socketSession.path}?sessionToken=${encodeURIComponent(socketSession.sessionToken)}`;
}
function normalizeWsMessage(payload: WsMessage) {
function normalizeWsMessage(payload: WsMessage): NormalizedWsMessage | null {
if (payload.type === "partial" || payload.type === "segment") {
const data = payload.data || {};
return {
text: data.text || "",
isFinal: payload.type === "segment" || !!data.is_final,
sentenceId: data.sentence_id,
speaker: {
name: data.speaker_name,
user_id: data.user_id ?? data.speaker_id,
@ -212,7 +235,6 @@ export function RealtimeAsrSession() {
const [audioLevel, setAudioLevel] = useState(0);
const [elapsedSeconds, setElapsedSeconds] = useState(0);
const [sessionStatus, setSessionStatus] = useState<RealtimeMeetingSessionStatus | null>(null);
const transcriptRef = useRef<HTMLDivElement | null>(null);
const wsRef = useRef<WebSocket | null>(null);
const audioContextRef = useRef<AudioContext | null>(null);
@ -223,6 +245,7 @@ export function RealtimeAsrSession() {
const completeOnceRef = useRef(false);
const startedAtRef = useRef<number | null>(null);
const sessionStartedRef = useRef(false);
const elapsedOffsetRef = useRef(0);
const finalTranscriptCount = transcripts.length;
const totalTranscriptChars = useMemo(
@ -281,7 +304,7 @@ export function RealtimeAsrSession() {
}
setTranscripts(
(transcriptRes.data.data || []).map((item: MeetingTranscriptVO) => ({
id: String(item.id),
id: `persisted-${item.id}`,
speakerName: item.speakerName || item.speakerId || "发言人",
text: item.content,
startTime: item.startTime,
@ -301,12 +324,12 @@ export function RealtimeAsrSession() {
useEffect(() => {
if (!recording) {
setElapsedSeconds(0);
setElapsedSeconds(elapsedOffsetRef.current);
return;
}
const timer = window.setInterval(() => {
if (startedAtRef.current) {
setElapsedSeconds(Math.floor((Date.now() - startedAtRef.current) / 1000));
setElapsedSeconds(elapsedOffsetRef.current + Math.floor((Date.now() - startedAtRef.current) / 1000));
}
}, 1000);
return () => window.clearInterval(timer);
@ -326,7 +349,7 @@ export function RealtimeAsrSession() {
}
const token = localStorage.getItem("accessToken");
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify({ is_speaking: false }));
wsRef.current.close();
}
fetch(`/api/biz/meeting/${meetingId}/realtime/pause`, {
method: "POST",
@ -360,6 +383,43 @@ export function RealtimeAsrSession() {
setAudioLevel(0);
};
const closeFrontendSocket = async (sendStop: boolean) => {
const socket = wsRef.current;
if (sendStop && socket?.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify({type: "stop"}));
await new Promise((resolve) => window.setTimeout(resolve, 150));
}
socket?.close();
wsRef.current = null;
sessionStartedRef.current = false;
};
const upsertTranscriptCard = (normalized: NormalizedWsMessage, speaker: ReturnType<typeof resolveSpeaker>) => {
setTranscripts((prev) => {
const next = [...prev];
const cardId = buildTranscriptCardId(normalized.sentenceId);
const nextCard: TranscriptCard = {
id: cardId,
sentenceId: normalized.sentenceId,
speakerName: speaker.speakerName,
userId: speaker.userId,
text: normalized.text,
startTime: normalized.startTime,
endTime: normalized.endTime,
final: true,
};
if (normalized.sentenceId !== undefined && normalized.sentenceId !== null) {
const index = next.findIndex((item) => item.id === cardId);
if (index >= 0) {
next[index] = {...next[index], ...nextCard};
return next;
}
}
next.push(nextCard);
return next;
});
};
const handleFatalRealtimeError = async (errorMessage: string) => {
setConnecting(false);
setRecording(false);
@ -369,6 +429,8 @@ export function RealtimeAsrSession() {
wsRef.current = null;
await shutdownAudioPipeline();
startedAtRef.current = null;
elapsedOffsetRef.current = 0;
setElapsedSeconds(0);
message.error(errorMessage);
};
@ -419,26 +481,6 @@ export function RealtimeAsrSession() {
processor.connect(audioContext.destination);
};
const saveFinalTranscript = async (normalized: {
text: string;
speaker?: WsSpeaker;
startTime?: number;
endTime?: number;
}) => {
if (!normalized.text || !meetingId) {
return;
}
const speaker = resolveSpeaker(normalized.speaker);
const item: RealtimeTranscriptItemDTO = {
speakerId: speaker.speakerId,
speakerName: speaker.speakerName,
content: normalized.text,
startTime: normalized.startTime,
endTime: normalized.endTime,
};
await appendRealtimeTranscripts(meetingId, [item]);
};
const handlePause = async () => {
if (!meetingId || pausing || finishing || (!recording && !connecting)) {
return;
@ -451,20 +493,19 @@ export function RealtimeAsrSession() {
setPausing(true);
setStatusText("暂停识别中...");
try {
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify({ is_speaking: false }));
if (recording && startedAtRef.current) {
elapsedOffsetRef.current += Math.floor((Date.now() - startedAtRef.current) / 1000);
}
wsRef.current?.close();
wsRef.current = null;
sessionStartedRef.current = false;
await closeFrontendSocket(false);
await shutdownAudioPipeline();
const pauseRes = await pauseRealtimeMeeting(meetingId);
setSessionStatus(pauseRes.data.data);
setRecording(false);
setConnecting(false);
startedAtRef.current = null;
setStatusText(pauseRes.data.data?.hasTranscript ? "已暂停,可继续识别" : "已暂停,当前还没有转录内容");
message.success("实时识别已暂停");
setElapsedSeconds(elapsedOffsetRef.current);
setStatusText("已暂停,可继续会议并等待说话人修正");
message.success("实时会议已暂停");
} catch (error) {
setStatusText("暂停失败");
message.error(error instanceof Error ? error.message : "暂停实时识别失败");
@ -547,21 +588,9 @@ export function RealtimeAsrSession() {
const speaker = resolveSpeaker(normalized.speaker);
if (normalized.isFinal) {
setTranscripts((prev) => [
...prev,
{
id: `${Date.now()}-${Math.random()}`,
speakerName: speaker.speakerName,
userId: speaker.userId,
text: normalized.text,
startTime: normalized.startTime,
endTime: normalized.endTime,
final: true,
},
]);
upsertTranscriptCard(normalized, speaker);
setStreamingText("");
setStreamingSpeaker("Unknown");
void saveFinalTranscript(normalized);
} else {
setStreamingText(normalized.text);
setStreamingSpeaker(speaker.speakerName);
@ -602,12 +631,7 @@ export function RealtimeAsrSession() {
setFinishing(true);
setStatusText("结束会议中...");
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify({ is_speaking: false }));
}
wsRef.current?.close();
wsRef.current = null;
sessionStartedRef.current = false;
await closeFrontendSocket(true);
await shutdownAudioPipeline();