refactor: 移除 Android gRPC 服务和相关测试代码

- 删除 `AndroidGatewayGrpcService` 及其依赖的 DTO 和接口
- 删除 `AndroidRealtimeGrpcManualTest` 手动测试类
- 删除与 Android 实时会议 gRPC 会话相关的数据传输对象和业务逻辑类
dev_na
chenhao 2026-04-30 16:47:30 +08:00
parent 4904526e09
commit f20be9bcc6
40 changed files with 1143 additions and 2368 deletions

View File

@ -91,22 +91,6 @@ public final class RedisKeys {
return "biz:android:device:topics:" + deviceId;
}
public static String androidDeviceOutboxKey(String deviceId) {
return "biz:android:device:outbox:" + deviceId;
}
public static String androidDeviceMessageSeqKey(String deviceId) {
return "biz:android:device:message-seq:" + deviceId;
}
public static String realtimeMeetingGrpcSessionKey(String streamToken) {
return "biz:meeting:realtime:grpc-session:" + streamToken;
}
public static String realtimeMeetingGrpcConnectionKey(String connectionId) {
return "biz:meeting:realtime:grpc-conn:" + connectionId;
}
public static String realtimeMeetingEventSeqKey(Long meetingId) {
return "biz:meeting:realtime:event-seq:" + meetingId;
}

View File

@ -5,13 +5,11 @@ import com.imeeting.config.grpc.GrpcServerProperties;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidCreateRealtimeMeetingCommand;
import com.imeeting.dto.android.AndroidCreateRealtimeMeetingVO;
import com.imeeting.dto.android.AndroidOpenRealtimeGrpcSessionCommand;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionVO;
import com.imeeting.dto.biz.CreateRealtimeMeetingCommand;
import com.imeeting.dto.biz.MeetingTranscriptVO;
import com.imeeting.dto.biz.RealtimeMeetingRuntimeProfile;
import com.imeeting.dto.biz.MeetingVO;
import com.imeeting.dto.biz.RealtimeMeetingCompleteDTO;
import com.imeeting.dto.biz.RealtimeMeetingRuntimeProfile;
import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO;
import com.imeeting.entity.biz.Meeting;
import com.imeeting.service.android.AndroidAuthService;
@ -21,12 +19,11 @@ import com.imeeting.service.biz.MeetingCommandService;
import com.imeeting.service.biz.MeetingQueryService;
import com.imeeting.service.biz.MeetingRuntimeProfileResolver;
import com.imeeting.service.biz.RealtimeMeetingSessionStateService;
import com.imeeting.service.realtime.AndroidRealtimeSessionTicketService;
import com.unisbase.common.ApiResponse;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.ArraySchema;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletRequest;
@ -56,15 +53,14 @@ public class AndroidMeetingRealtimeController {
private final MeetingQueryService meetingQueryService;
private final MeetingCommandService meetingCommandService;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final AndroidRealtimeSessionTicketService androidRealtimeSessionTicketService;
private final MeetingRuntimeProfileResolver meetingRuntimeProfileResolver;
private final GrpcServerProperties grpcServerProperties;
@Operation(summary = "创建Android实时会议")
@Operation(summary = "创建 Android 实时会议")
@ApiResponses({
@io.swagger.v3.oas.annotations.responses.ApiResponse(
responseCode = "200",
description = "返回实时会议创建结果与本次生效的运行时参数",
description = "返回实时会议创建结果和当前生效的运行时参数",
content = @Content(schema = @Schema(implementation = AndroidCreateRealtimeMeetingVO.class))
)
})
@ -117,11 +113,11 @@ public class AndroidMeetingRealtimeController {
return ApiResponse.ok(vo);
}
@Operation(summary = "查询Android实时会议状态")
@Operation(summary = "查询 Android 实时会议状态")
@ApiResponses({
@io.swagger.v3.oas.annotations.responses.ApiResponse(
responseCode = "200",
description = "返回实时会议当前状态、恢复信息和连接状态",
description = "返回实时会议当前状态和恢复信息",
content = @Content(schema = @Schema(implementation = RealtimeMeetingSessionStatusVO.class))
)
})
@ -133,7 +129,7 @@ public class AndroidMeetingRealtimeController {
return ApiResponse.ok(realtimeMeetingSessionStateService.getStatus(id));
}
@Operation(summary = "查询Android会议转写")
@Operation(summary = "查询 Android 会议转写")
@ApiResponses({
@io.swagger.v3.oas.annotations.responses.ApiResponse(
responseCode = "200",
@ -149,7 +145,7 @@ public class AndroidMeetingRealtimeController {
return ApiResponse.ok(meetingQueryService.getTranscripts(id));
}
@Operation(summary = "暂停Android实时会议")
@Operation(summary = "暂停 Android 实时会议")
@ApiResponses({
@io.swagger.v3.oas.annotations.responses.ApiResponse(
responseCode = "200",
@ -165,7 +161,7 @@ public class AndroidMeetingRealtimeController {
return ApiResponse.ok(realtimeMeetingSessionStateService.pause(id));
}
@Operation(summary = "完成Android实时会议")
@Operation(summary = "完成 Android 实时会议")
@ApiResponses({
@io.swagger.v3.oas.annotations.responses.ApiResponse(
responseCode = "200",
@ -188,24 +184,6 @@ public class AndroidMeetingRealtimeController {
return ApiResponse.ok(true);
}
@Operation(summary = "打开Android实时会议gRPC会话")
@ApiResponses({
@io.swagger.v3.oas.annotations.responses.ApiResponse(
responseCode = "200",
description = "返回实时会议 gRPC 会话信息,包括连接参数和状态",
content = @Content(schema = @Schema(implementation = AndroidRealtimeGrpcSessionVO.class))
)
})
@PostMapping("/{id}/realtime/grpc-session")
public ApiResponse<AndroidRealtimeGrpcSessionVO> openRealtimeGrpcSession(@PathVariable Long id,
HttpServletRequest request,
@RequestBody(required = false) AndroidOpenRealtimeGrpcSessionCommand command) {
AndroidAuthContext authContext = androidAuthService.authenticateHttp(request);
Meeting meeting = meetingAccessService.requireMeeting(id);
meetingAuthorizationService.assertCanControlRealtimeMeeting(meeting, authContext, MeetingConstants.SOURCE_ANDROID);
return ApiResponse.ok(androidRealtimeSessionTicketService.createSession(id, command, authContext));
}
private CreateRealtimeMeetingCommand buildCreateCommand(AndroidCreateRealtimeMeetingCommand command,
AndroidAuthContext authContext,
RealtimeMeetingRuntimeProfile runtimeProfile) {

View File

@ -0,0 +1,62 @@
package com.imeeting.controller.biz;
import com.imeeting.dto.biz.DeviceAdminUpdateCommand;
import com.imeeting.dto.biz.DeviceOnlineAdminVO;
import com.imeeting.service.biz.DeviceOnlineManagementService;
import com.unisbase.common.ApiResponse;
import com.unisbase.security.LoginUser;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.ArraySchema;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@Tag(name = "设备在线管理")
@RestController
@RequestMapping("/api/admin/devices")
@RequiredArgsConstructor
public class DeviceManagementController {
private final DeviceOnlineManagementService deviceOnlineManagementService;
@Operation(summary = "查询设备在线管理列表")
@ApiResponses({
@io.swagger.v3.oas.annotations.responses.ApiResponse(
responseCode = "200",
description = "返回设备在线管理列表",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = DeviceOnlineAdminVO.class)))
)
})
@GetMapping
public ApiResponse<List<DeviceOnlineAdminVO>> list() {
return ApiResponse.ok(deviceOnlineManagementService.listForAdmin(currentLoginUser()));
}
@Operation(summary = "更新设备管理信息")
@PutMapping("/{id}")
public ApiResponse<DeviceOnlineAdminVO> update(@PathVariable Long id, @RequestBody DeviceAdminUpdateCommand command) {
return ApiResponse.ok(deviceOnlineManagementService.update(id, command, currentLoginUser()));
}
@Operation(summary = "踢下线设备")
@PostMapping("/{id}/kick")
public ApiResponse<Boolean> kick(@PathVariable Long id) {
return ApiResponse.ok(deviceOnlineManagementService.kick(id, currentLoginUser()));
}
private LoginUser currentLoginUser() {
return (LoginUser) SecurityContextHolder.getContext().getAuthentication().getPrincipal();
}
}

View File

@ -1,19 +0,0 @@
package com.imeeting.dto.android;
import lombok.Data;
import java.util.List;
import java.util.Map;
@Data
public class AndroidOpenRealtimeGrpcSessionCommand {
private Long asrModelId;
private String mode;
private String language;
private Integer useSpkId;
private Boolean enablePunctuation;
private Boolean enableItn;
private Boolean enableTextRefine;
private Boolean saveAudio;
private List<Map<String, Object>> hotwords;
}

View File

@ -1,16 +0,0 @@
package com.imeeting.dto.android;
import com.imeeting.dto.biz.RealtimeMeetingResumeConfig;
import lombok.Data;
@Data
public class AndroidRealtimeGrpcSessionData {
private Long meetingId;
private Long tenantId;
private Long userId;
private String deviceId;
private Long asrModelId;
private String targetWsUrl;
private String startMessageJson;
private RealtimeMeetingResumeConfig resumeConfig;
}

View File

@ -1,27 +0,0 @@
package com.imeeting.dto.android;
import com.imeeting.dto.biz.RealtimeMeetingResumeConfig;
import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@Schema(description = "Android 实时会议 gRPC 会话信息")
@Data
public class AndroidRealtimeGrpcSessionVO {
@Schema(description = "会议 ID")
private Long meetingId;
@Schema(description = "实时流会话令牌")
private String streamToken;
@Schema(description = "令牌剩余有效秒数")
private Long expiresInSeconds;
@Schema(description = "实时音频采样率")
private Integer sampleRate;
@Schema(description = "音频通道数")
private Integer channels;
@Schema(description = "音频编码格式")
private String encoding;
@Schema(description = "恢复会议时使用的运行时参数")
private RealtimeMeetingResumeConfig resumeConfig;
@Schema(description = "当前实时会议状态")
private RealtimeMeetingSessionStatusVO status;
}

View File

@ -0,0 +1,15 @@
package com.imeeting.dto.biz;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@Data
@Schema(description = "设备管理更新请求")
public class DeviceAdminUpdateCommand {
@Schema(description = "设备名称")
private String deviceName;
@Schema(description = "状态1启用0停用")
private Integer status;
}

View File

@ -0,0 +1,53 @@
package com.imeeting.dto.biz;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@Schema(description = "设备在线管理视图")
public class DeviceOnlineAdminVO {
@Schema(description = "设备ID")
private Long deviceId;
@Schema(description = "绑定帐号用户ID")
private Long userId;
@Schema(description = "绑定帐号用户名")
private String username;
@Schema(description = "绑定帐号显示名")
private String displayName;
@Schema(description = "设备编码,对应 Android deviceId")
private String deviceCode;
@Schema(description = "设备名称")
private String deviceName;
@Schema(description = "终端类型")
private String terminalType;
@Schema(description = "终端版本")
private String terminalVersion;
@Schema(description = "是否在线")
private Boolean online;
@Schema(description = "最后一次在线时间")
private LocalDateTime lastOnlineAt;
@Schema(description = "状态1启用0停用")
private Integer status;
@Schema(description = "创建时间")
private LocalDateTime createdAt;
@Schema(description = "更新时间")
private LocalDateTime updatedAt;
@Schema(description = "租户ID")
private Long tenantId;
}

View File

@ -0,0 +1,31 @@
package com.imeeting.entity.biz;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.unisbase.entity.BaseEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.time.LocalDateTime;
@Data
@EqualsAndHashCode(callSuper = true)
@TableName("biz_device_info")
public class DeviceInfoEntity extends BaseEntity {
@TableId(value = "device_id", type = IdType.AUTO)
private Long deviceId;
private Long userId;
private String deviceCode;
private String deviceName;
private String terminalType;
private String terminalVersion;
private LocalDateTime lastOnlineAt;
}

View File

@ -1,116 +0,0 @@
package com.imeeting.grpc.gateway;
import com.imeeting.config.grpc.GrpcServerProperties;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidDeviceSessionState;
import com.imeeting.grpc.common.ErrorEvent;
import com.imeeting.service.android.AndroidAuthService;
import com.imeeting.service.android.AndroidDeviceSessionService;
import com.imeeting.service.android.AndroidGatewayPushService;
import io.grpc.BindableService;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class AndroidGatewayGrpcService extends AndroidGatewayServiceGrpc.AndroidGatewayServiceImplBase implements BindableService {
private final AndroidAuthService androidAuthService;
private final AndroidDeviceSessionService androidDeviceSessionService;
private final AndroidGatewayPushService androidGatewayPushService;
private final GrpcServerProperties grpcServerProperties;
@Override
public StreamObserver<GatewayClientPacket> connect(StreamObserver<GatewayServerPacket> responseObserver) {
return new StreamObserver<>() {
private String connectionId;
private String deviceId;
private AndroidAuthContext authContext;
@Override
public void onNext(GatewayClientPacket packet) {
try {
switch (packet.getBodyCase()) {
case HELLO -> handleHello(packet);
case HEARTBEAT -> handleHeartbeat(packet.getHeartbeat());
case SUBSCRIBE -> handleSubscribe(packet.getSubscribe());
case ACK, BODY_NOT_SET -> {
}
}
} catch (Exception ex) {
sendError(responseObserver, ex.getMessage());
}
}
@Override
public void onError(Throwable t) {
cleanup();
}
@Override
public void onCompleted() {
cleanup();
responseObserver.onCompleted();
}
private void handleHello(GatewayClientPacket packet) {
authContext = androidAuthService.authenticateGrpc(packet.getAuth(), packet.getHello().getDeviceId());
AndroidDeviceSessionState sessionState = androidDeviceSessionService.openSession(authContext, packet.getHello());
connectionId = sessionState.getConnectionId();
deviceId = sessionState.getDeviceId();
androidGatewayPushService.register(connectionId, deviceId, responseObserver);
responseObserver.onNext(GatewayServerPacket.newBuilder()
.setRequestId(packet.getRequestId())
.setHelloAck(HelloAck.newBuilder()
.setConnectionId(connectionId)
.setAuthMode(authContext.getAuthMode())
.setServerTime(System.currentTimeMillis())
.setHeartbeatIntervalSeconds(grpcServerProperties.getGateway().getHeartbeatIntervalSeconds())
.build())
.build());
}
private void handleHeartbeat(Heartbeat heartbeat) {
if (heartbeat == null || connectionId == null) {
return;
}
AndroidDeviceSessionState state = androidDeviceSessionService.refreshHeartbeat(connectionId, heartbeat.getClientTime());
if (state == null) {
sendError(responseObserver, "未找到安卓设备会话");
return;
}
responseObserver.onNext(GatewayServerPacket.newBuilder()
.setPong(Pong.newBuilder()
.setConnectionId(connectionId)
.setServerTime(System.currentTimeMillis())
.build())
.build());
}
private void handleSubscribe(Subscribe subscribe) {
if (deviceId == null) {
return;
}
androidDeviceSessionService.updateTopics(deviceId, subscribe.getTopicsList());
}
private void cleanup() {
if (connectionId != null) {
androidGatewayPushService.unregister(connectionId);
androidDeviceSessionService.closeSession(connectionId);
}
}
};
}
private void sendError(StreamObserver<GatewayServerPacket> responseObserver, String message) {
responseObserver.onNext(GatewayServerPacket.newBuilder()
.setError(ErrorEvent.newBuilder()
.setCode("ANDROID_GATEWAY_ERROR")
.setMessage(message == null ? "网关处理失败" : message)
.setRetryable(false)
.build())
.build());
}
}

View File

@ -0,0 +1,177 @@
package com.imeeting.grpc.push;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidDeviceSessionState;
import com.imeeting.service.android.AndroidAuthService;
import com.imeeting.service.android.AndroidDeviceSessionService;
import com.imeeting.service.android.AndroidGatewayPushService;
import com.imeeting.service.biz.DeviceOnlineManagementService;
import com.unisbase.common.exception.BusinessException;
import io.grpc.BindableService;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class AndroidPushGrpcService extends PushServiceGrpc.PushServiceImplBase implements BindableService {
private final AndroidAuthService androidAuthService;
private final AndroidDeviceSessionService androidDeviceSessionService;
private final AndroidGatewayPushService androidGatewayPushService;
private final DeviceOnlineManagementService deviceOnlineManagementService;
@Override
public StreamObserver<ClientMessage> communicate(StreamObserver<ServerMessage> responseObserver) {
return new StreamObserver<>() {
private String connectionId;
private String deviceId;
private boolean connected;
@Override
public void onNext(ClientMessage message) {
try {
switch (message.getPayloadCase()) {
case CONNECT -> handleConnect(message.getConnect());
case HEARTBEAT -> handleHeartbeat(message.getHeartbeat());
case ACK -> handleAck(message.getAck());
case PAYLOAD_NOT_SET -> sendError(responseObserver, "PUSH_BAD_REQUEST", "Missing push payload", false);
}
} catch (BusinessException ex) {
log.warn("Android push gRPC business rejection, connectionId={}", connectionId, ex);
sendError(responseObserver, ex.getCode(), ex.getMessage(), false);
} catch (Exception ex) {
log.warn("Android push gRPC request handling failed, connectionId={}", connectionId, ex);
sendError(responseObserver, "PUSH_PROCESSING_ERROR", ex.getMessage(), false);
}
}
@Override
public void onError(Throwable throwable) {
log.warn("Android push gRPC stream failed, connectionId={}", connectionId, throwable);
cleanup();
}
@Override
public void onCompleted() {
cleanup();
responseObserver.onCompleted();
}
private void handleConnect(ConnectRequest request) {
if (connected) {
sendError(responseObserver, "PUSH_ALREADY_CONNECTED", "Push connection already established", false);
return;
}
AndroidAuthContext authContext = androidAuthService.authenticateGrpc(
request.getDeviceId(),
request.getAppVersion(),
resolvePlatform(request.getPlatform())
);
authContext.setUserId(parseNullableLong(request.getUserId(), "user_id"));
authContext.setTenantId(parseNullableLong(request.getTenantId(), "tenant_id"));
AndroidDeviceSessionState sessionState = androidDeviceSessionService.openSession(authContext, request.getConnectionId());
connectionId = sessionState.getConnectionId();
deviceId = sessionState.getDeviceId();
deviceOnlineManagementService.recordConnected(authContext);
connected = true;
String replacedConnectionId = androidGatewayPushService.register(connectionId, deviceId, responseObserver);
if (replacedConnectionId != null && !replacedConnectionId.equals(connectionId)) {
androidDeviceSessionService.closeSession(replacedConnectionId);
}
responseObserver.onNext(ServerMessage.newBuilder()
.setConnectAck(ConnectResponse.newBuilder()
.setSuccess(true)
.setMessage(connectionId)
.build())
.build());
}
private void handleHeartbeat(HeartbeatRequest request) {
if (!validateConnected()) {
return;
}
if (!request.getConnectionId().isBlank() && !request.getConnectionId().equals(connectionId)) {
sendError(responseObserver, "PUSH_CONNECTION_MISMATCH", "Connection id does not match active session", false);
return;
}
if (!request.getDeviceId().isBlank() && !request.getDeviceId().equals(deviceId)) {
sendError(responseObserver, "PUSH_DEVICE_MISMATCH", "Device id does not match active session", false);
return;
}
AndroidDeviceSessionState state = androidDeviceSessionService.refreshHeartbeat(connectionId, request.getTimestamp());
responseObserver.onNext(ServerMessage.newBuilder()
.setHeartbeat(HeartbeatResponse.newBuilder()
.setTimestamp(System.currentTimeMillis())
.setOk(state != null)
.build())
.build());
}
private void handleAck(AckRequest request) {
if (!validateConnected()) {
return;
}
if (!request.getConnectionId().isBlank() && !request.getConnectionId().equals(connectionId)) {
sendError(responseObserver, "PUSH_CONNECTION_MISMATCH", "Connection id does not match active session", false);
return;
}
if (!request.getDeviceId().isBlank() && !request.getDeviceId().equals(deviceId)) {
sendError(responseObserver, "PUSH_DEVICE_MISMATCH", "Device id does not match active session", false);
}
}
private boolean validateConnected() {
if (connected) {
return true;
}
sendError(responseObserver, "PUSH_NOT_CONNECTED", "Push connection has not been established", false);
return false;
}
private void cleanup() {
if (connectionId == null) {
return;
}
AndroidDeviceSessionState state = androidDeviceSessionService.getByConnectionId(connectionId);
androidGatewayPushService.unregister(connectionId);
androidDeviceSessionService.closeSession(connectionId);
deviceOnlineManagementService.recordDisconnected(deviceId, state == null ? null : state.getLastSeenAt());
connectionId = null;
deviceId = null;
connected = false;
}
};
}
private void sendError(StreamObserver<ServerMessage> responseObserver, String code, String message, boolean retryable) {
responseObserver.onNext(ServerMessage.newBuilder()
.setError(ErrorEvent.newBuilder()
.setCode(code)
.setMessage(message == null || message.isBlank() ? "Push request failed" : message)
.setRetryable(retryable)
.build())
.build());
}
private String resolvePlatform(Platform platform) {
return switch (platform) {
case IOS -> "ios";
case ANDROID -> "android";
case PLATFORM_UNKNOWN, UNRECOGNIZED -> "android";
};
}
private Long parseNullableLong(String value, String fieldName) {
if (value == null || value.isBlank()) {
return null;
}
try {
return Long.parseLong(value.trim());
} catch (NumberFormatException ex) {
throw new RuntimeException("Invalid " + fieldName);
}
}
}

View File

@ -1,139 +0,0 @@
package com.imeeting.grpc.realtime;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.grpc.common.ErrorEvent;
import com.imeeting.service.android.AndroidAuthService;
import com.imeeting.service.realtime.RealtimeMeetingGrpcSessionService;
import io.grpc.BindableService;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class RealtimeMeetingGrpcService extends RealtimeMeetingServiceGrpc.RealtimeMeetingServiceImplBase implements BindableService {
private final AndroidAuthService androidAuthService;
private final RealtimeMeetingGrpcSessionService realtimeMeetingGrpcSessionService;
@Override
public StreamObserver<RealtimeClientPacket> streamMeetingAudio(StreamObserver<RealtimeServerPacket> responseObserver) {
return new StreamObserver<>() {
private String connectionId;
private AndroidAuthContext authContext;
private boolean completed;
@Override
public void onNext(RealtimeClientPacket packet) {
if (completed) {
return;
}
try {
switch (packet.getBodyCase()) {
case OPEN -> handleOpen(packet);
case AUDIO -> handleAudio(packet.getAudio());
case CONTROL -> handleControl(packet.getControl());
case BODY_NOT_SET -> {
}
}
} catch (Exception ex) {
handleProcessingError(packet, ex);
}
}
@Override
public void onError(Throwable t) {
completed = true;
log.warn("Realtime meeting gRPC client stream failed, connectionId={}", connectionId, t);
safeCloseStream("client_error", false);
}
@Override
public void onCompleted() {
completed = true;
if (connectionId != null) {
safeCloseStream("client_completed", false);
} else {
safeCompleteResponse();
}
}
private void handleOpen(RealtimeClientPacket packet) {
authContext = androidAuthService.authenticateGrpc(packet.getAuth(), null);
connectionId = realtimeMeetingGrpcSessionService.openStream(
packet.getOpen().getMeetingId(),
packet.getOpen().getStreamToken(),
authContext,
responseObserver
);
}
private void handleAudio(AudioChunk audioChunk) {
if (connectionId == null) {
throw new RuntimeException("实时 gRPC 流未打开");
}
realtimeMeetingGrpcSessionService.onAudio(connectionId, audioChunk.getPcm16().toByteArray(), audioChunk.getSeq(), audioChunk.getLastChunk());
}
private void handleControl(RealtimeControl control) {
if (connectionId == null) {
return;
}
switch (control.getType()) {
case STOP_SPEAKING, END_INPUT -> realtimeMeetingGrpcSessionService.onStopSpeaking(connectionId);
case CLOSE_STREAM -> safeCloseStream("client_close_stream", true);
case START, CONTROL_TYPE_UNSPECIFIED -> {
}
}
}
private void handleProcessingError(RealtimeClientPacket packet, Exception ex) {
String requestId = packet == null ? "" : packet.getRequestId();
log.error("Realtime meeting gRPC packet processing failed, requestId={}, connectionId={}", requestId, connectionId, ex);
safeSendError(requestId, ex.getMessage());
completed = true;
if (connectionId != null) {
safeCloseStream("grpc_processing_error", true);
} else {
safeCompleteResponse();
}
}
private void safeSendError(String requestId, String message) {
try {
responseObserver.onNext(RealtimeServerPacket.newBuilder()
.setRequestId(requestId == null ? "" : requestId)
.setError(ErrorEvent.newBuilder()
.setCode("REALTIME_GRPC_ERROR")
.setMessage(message == null || message.isBlank() ? "实时会议 gRPC 处理失败" : message)
.setRetryable(false)
.build())
.build());
} catch (Exception observerEx) {
log.warn("Failed to deliver realtime gRPC error packet, requestId={}, connectionId={}", requestId, connectionId, observerEx);
}
}
private void safeCloseStream(String reason, boolean notifyClient) {
if (connectionId == null) {
return;
}
try {
realtimeMeetingGrpcSessionService.closeStream(connectionId, reason, notifyClient);
} catch (Exception closeEx) {
log.error("Failed to close realtime gRPC stream, connectionId={}, reason={}", connectionId, reason, closeEx);
}
}
private void safeCompleteResponse() {
try {
responseObserver.onCompleted();
} catch (Exception observerEx) {
log.warn("Failed to complete realtime gRPC response, connectionId={}", connectionId, observerEx);
}
}
};
}
}

View File

@ -0,0 +1,66 @@
package com.imeeting.mapper;
import com.baomidou.mybatisplus.annotation.InterceptorIgnore;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.imeeting.dto.biz.DeviceOnlineAdminVO;
import com.imeeting.entity.biz.DeviceInfoEntity;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
@Mapper
public interface DeviceInfoMapper extends BaseMapper<DeviceInfoEntity> {
@InterceptorIgnore(tenantLine = "true")
@Select("""
SELECT *
FROM biz_device_info
WHERE device_code = #{deviceCode}
AND is_deleted = 0
ORDER BY updated_at DESC, device_id DESC
LIMIT 1
""")
DeviceInfoEntity selectByDeviceCodeIgnoreTenant(@Param("deviceCode") String deviceCode);
@InterceptorIgnore(tenantLine = "true")
@Select("""
SELECT *
FROM biz_device_info
WHERE device_id = #{deviceId}
AND is_deleted = 0
LIMIT 1
""")
DeviceInfoEntity selectByIdIgnoreTenant(@Param("deviceId") Long deviceId);
@InterceptorIgnore(tenantLine = "true")
@Select("""
<script>
SELECT
d.device_id AS deviceId,
d.user_id AS userId,
d.device_code AS deviceCode,
d.device_name AS deviceName,
d.terminal_type AS terminalType,
d.terminal_version AS terminalVersion,
d.last_online_at AS lastOnlineAt,
d.status AS status,
d.created_at AS createdAt,
d.updated_at AS updatedAt,
d.tenant_id AS tenantId,
u.username AS username,
u.display_name AS displayName
FROM biz_device_info d
LEFT JOIN sys_user u
ON u.user_id = d.user_id
AND u.is_deleted = 0
WHERE d.is_deleted = 0
<if test="!platformAdmin and tenantId != null">
AND d.tenant_id = #{tenantId}
</if>
ORDER BY d.updated_at DESC, d.device_id DESC
</script>
""")
List<DeviceOnlineAdminVO> selectAdminList(@Param("tenantId") Long tenantId, @Param("platformAdmin") boolean platformAdmin);
}

View File

@ -1,11 +1,10 @@
package com.imeeting.service.android;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.grpc.common.ClientAuth;
import jakarta.servlet.http.HttpServletRequest;
public interface AndroidAuthService {
AndroidAuthContext authenticateGrpc(ClientAuth auth, String fallbackDeviceId);
AndroidAuthContext authenticateGrpc(String deviceId, String appVersion, String platform);
AndroidAuthContext authenticateHttp(HttpServletRequest request);
}

View File

@ -2,17 +2,20 @@ package com.imeeting.service.android;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidDeviceSessionState;
import com.imeeting.grpc.gateway.DeviceHello;
import java.util.List;
public interface AndroidDeviceSessionService {
AndroidDeviceSessionState openSession(AndroidAuthContext authContext, DeviceHello hello);
AndroidDeviceSessionState openSession(AndroidAuthContext authContext, String requestedConnectionId);
AndroidDeviceSessionState refreshHeartbeat(String connectionId, long clientTime);
AndroidDeviceSessionState getByConnectionId(String connectionId);
AndroidDeviceSessionState getByDeviceId(String deviceId);
String getActiveConnectionId(String deviceId);
void updateTopics(String deviceId, List<String> topics);
void closeSession(String connectionId);

View File

@ -1,14 +1,17 @@
package com.imeeting.service.android;
import com.imeeting.grpc.gateway.GatewayServerPacket;
import com.imeeting.grpc.push.PushMessage;
import com.imeeting.grpc.push.ServerMessage;
import io.grpc.stub.StreamObserver;
public interface AndroidGatewayPushService {
void register(String connectionId, String deviceId, StreamObserver<GatewayServerPacket> observer);
String register(String connectionId, String deviceId, StreamObserver<ServerMessage> observer);
void unregister(String connectionId);
boolean pushToConnection(String connectionId, GatewayServerPacket packet);
boolean pushToConnection(String connectionId, PushMessage message);
int pushToDevice(String deviceId, GatewayServerPacket packet);
int pushToDevice(String deviceId, PushMessage message);
String disconnectDevice(String deviceId);
}

View File

@ -2,8 +2,10 @@ package com.imeeting.service.android.impl;
import com.imeeting.config.grpc.AndroidGrpcAuthProperties;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.grpc.common.ClientAuth;
import com.imeeting.entity.biz.DeviceInfoEntity;
import com.imeeting.mapper.DeviceInfoMapper;
import com.imeeting.service.android.AndroidAuthService;
import com.unisbase.common.exception.BusinessException;
import com.unisbase.dto.InternalAuthCheckResponse;
import com.unisbase.security.LoginUser;
import com.unisbase.service.TokenValidationService;
@ -27,31 +29,15 @@ public class AndroidAuthServiceImpl implements AndroidAuthService {
private final AndroidGrpcAuthProperties properties;
private final TokenValidationService tokenValidationService;
private final DeviceInfoMapper deviceInfoMapper;
@Override
public AndroidAuthContext authenticateGrpc(ClientAuth auth, String fallbackDeviceId) {
ClientAuth.AuthType authType = auth == null ? ClientAuth.AuthType.AUTH_TYPE_UNSPECIFIED : auth.getAuthType();
if (authType == ClientAuth.AuthType.USER_JWT) {
InternalAuthCheckResponse authResult = validateToken(auth == null ? null : auth.getAccessToken());
return buildContext(authType.name(), false, auth == null ? null : auth.getDeviceId(),auth == null ? null : auth.getAppId(),
auth == null ? null : auth.getAppVersion(), auth == null ? null : auth.getPlatform(), auth == null ? null : auth.getAccessToken(), fallbackDeviceId, authResult, null);
}
if (authType == ClientAuth.AuthType.DEVICE_TOKEN) {
return buildContext(authType.name(), false, auth == null ? null : auth.getDeviceId(), auth == null ? null : auth.getAppId(),
auth == null ? null : auth.getAppVersion(), auth == null ? null : auth.getPlatform(), auth == null ? null : auth.getAccessToken(), fallbackDeviceId, null, null);
}
public AndroidAuthContext authenticateGrpc(String deviceId, String appVersion, String platform) {
if (properties.isEnabled() && !properties.isAllowAnonymous()) {
throw new RuntimeException("缺少 Android gRPC 认证信息");
throw new RuntimeException("Android gRPC push does not allow anonymous access");
}
return buildContext("NONE", true,
auth == null ? null : auth.getDeviceId(),
auth == null ? null : auth.getAppId(),
auth == null ? null : auth.getAppVersion(),
auth == null ? null : auth.getPlatform(),
auth == null ? null : auth.getAccessToken(),
fallbackDeviceId,
null,
null);
assertDeviceEnabled(deviceId);
return buildContext("NONE", true, deviceId, null, appVersion, platform, null, null, null, null);
}
@Override
@ -64,6 +50,7 @@ public class AndroidAuthServiceImpl implements AndroidAuthService {
String platform = request.getHeader(HEADER_PLATFORM);
requireAndroidHttpHeaders(deviceId, appVersion, platform);
assertDeviceEnabled(deviceId);
if (loginUser != null) {
return buildContext("USER_JWT", false,
@ -100,7 +87,7 @@ public class AndroidAuthServiceImpl implements AndroidAuthService {
null,
null);
}
throw new RuntimeException("缺少 Android HTTP 访问令牌");
throw new RuntimeException("Missing Android HTTP access token");
}
private AndroidAuthContext buildContext(String authMode, boolean anonymous, String deviceId,
@ -108,7 +95,7 @@ public class AndroidAuthServiceImpl implements AndroidAuthService {
String fallbackDeviceId, InternalAuthCheckResponse authResult, LoginUser loginUser) {
String resolvedDeviceId = StringUtils.hasText(deviceId) ? deviceId : fallbackDeviceId;
if (!StringUtils.hasText(resolvedDeviceId)) {
throw new RuntimeException("缺少 Android deviceId");
throw new RuntimeException("Missing Android deviceId");
}
AndroidAuthContext context = new AndroidAuthContext();
context.setAuthMode(authMode);
@ -148,14 +135,14 @@ public class AndroidAuthServiceImpl implements AndroidAuthService {
private InternalAuthCheckResponse validateToken(String token) {
String resolvedToken = normalizeToken(token);
if (!StringUtils.hasText(resolvedToken)) {
throw new RuntimeException("缺少 Android 访问令牌");
throw new RuntimeException("Missing Android access token");
}
InternalAuthCheckResponse authResult = tokenValidationService.validateAccessToken(resolvedToken);
if (authResult == null || !authResult.isValid()) {
throw new RuntimeException(authResult == null || !StringUtils.hasText(authResult.getMessage()) ? "Android 访问令牌无效" : authResult.getMessage());
throw new RuntimeException(authResult == null || !StringUtils.hasText(authResult.getMessage()) ? "Android access token is invalid" : authResult.getMessage());
}
if (authResult.getUserId() == null || authResult.getTenantId() == null) {
throw new RuntimeException("Android 访问令牌缺少用户或租户上下文");
throw new RuntimeException("Android access token is missing user or tenant context");
}
return authResult;
}
@ -166,7 +153,7 @@ public class AndroidAuthServiceImpl implements AndroidAuthService {
return null;
}
if (!authorization.startsWith(BEARER_PREFIX)) {
throw new RuntimeException("Android HTTP 访问令牌无效");
throw new RuntimeException("Android HTTP access token format is invalid");
}
return authorization.substring(BEARER_PREFIX.length()).trim();
}
@ -183,13 +170,23 @@ public class AndroidAuthServiceImpl implements AndroidAuthService {
private void requireAndroidHttpHeaders(String deviceId, String appVersion, String platform) {
if (!StringUtils.hasText(deviceId)) {
throw new RuntimeException("缺少 Android device_id");
throw new RuntimeException("Missing Android device_id");
}
if (!StringUtils.hasText(appVersion)) {
throw new RuntimeException("缺少 Android-App-Version 请求头");
throw new RuntimeException("Missing X-Android-App-Version header");
}
if (!StringUtils.hasText(platform)) {
throw new RuntimeException("缺少 X-Android-Platform 请求头");
throw new RuntimeException("Missing X-Android-Platform header");
}
}
private void assertDeviceEnabled(String deviceId) {
if (!StringUtils.hasText(deviceId)) {
return;
}
DeviceInfoEntity device = deviceInfoMapper.selectByDeviceCodeIgnoreTenant(deviceId.trim());
if (device != null && device.getStatus() != null && device.getStatus() == 0) {
throw new BusinessException("403", "设备被禁用");
}
}

View File

@ -5,7 +5,6 @@ import com.imeeting.common.RedisKeys;
import com.imeeting.config.grpc.GrpcServerProperties;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidDeviceSessionState;
import com.imeeting.grpc.gateway.DeviceHello;
import com.imeeting.service.android.AndroidDeviceSessionService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -26,13 +25,13 @@ public class AndroidDeviceSessionServiceImpl implements AndroidDeviceSessionServ
private final GrpcServerProperties grpcServerProperties;
@Override
public AndroidDeviceSessionState openSession(AndroidAuthContext authContext, DeviceHello hello) {
public AndroidDeviceSessionState openSession(AndroidAuthContext authContext, String requestedConnectionId) {
AndroidDeviceSessionState state = new AndroidDeviceSessionState();
state.setConnectionId("android_" + UUID.randomUUID().toString().replace("-", ""));
state.setConnectionId(nonBlank(requestedConnectionId, "android_" + UUID.randomUUID().toString().replace("-", "")));
state.setDeviceId(authContext.getDeviceId());
state.setStatus("ONLINE");
state.setLastSeenAt(System.currentTimeMillis());
state.setAppVersion(nonBlank(hello.getAppVersion(), authContext.getAppVersion()));
state.setAppVersion(authContext.getAppVersion());
state.setPlatform(nonBlank(authContext.getPlatform(), "android"));
state.setTenantCode(authContext.getTenantCode());
writeState(state);
@ -65,6 +64,26 @@ public class AndroidDeviceSessionServiceImpl implements AndroidDeviceSessionServ
}
}
@Override
public AndroidDeviceSessionState getByDeviceId(String deviceId) {
String raw = redisTemplate.opsForValue().get(RedisKeys.androidDeviceOnlineKey(deviceId));
if (raw == null || raw.isBlank()) {
return null;
}
try {
return objectMapper.readValue(raw, AndroidDeviceSessionState.class);
} catch (Exception ex) {
log.warn("Failed to read android device online state, deviceId={}", deviceId, ex);
return null;
}
}
@Override
public String getActiveConnectionId(String deviceId) {
String value = redisTemplate.opsForValue().get(RedisKeys.androidDeviceActiveConnectionKey(deviceId));
return value == null || value.isBlank() ? null : value;
}
@Override
public void updateTopics(String deviceId, List<String> topics) {
try {
@ -73,7 +92,7 @@ public class AndroidDeviceSessionServiceImpl implements AndroidDeviceSessionServ
objectMapper.writeValueAsString(topics == null ? List.of() : topics)
);
} catch (Exception ex) {
throw new RuntimeException("更新安卓设备主题失败", ex);
throw new RuntimeException("Failed to update android device topics", ex);
}
}
@ -84,11 +103,11 @@ public class AndroidDeviceSessionServiceImpl implements AndroidDeviceSessionServ
redisTemplate.delete(RedisKeys.androidDeviceConnectionKey(connectionId));
return;
}
String activeConn = redisTemplate.opsForValue().get(RedisKeys.androidDeviceActiveConnectionKey(state.getDeviceId()));
String activeConn = getActiveConnectionId(state.getDeviceId());
if (connectionId.equals(activeConn)) {
redisTemplate.delete(RedisKeys.androidDeviceActiveConnectionKey(state.getDeviceId()));
}
redisTemplate.delete(RedisKeys.androidDeviceOnlineKey(state.getDeviceId()));
}
redisTemplate.delete(RedisKeys.androidDeviceConnectionKey(connectionId));
}
@ -100,7 +119,7 @@ public class AndroidDeviceSessionServiceImpl implements AndroidDeviceSessionServ
redisTemplate.opsForValue().set(RedisKeys.androidDeviceActiveConnectionKey(state.getDeviceId()), state.getConnectionId(), ttl);
redisTemplate.opsForValue().set(RedisKeys.androidDeviceConnectionKey(state.getConnectionId()), json, ttl);
} catch (Exception ex) {
throw new RuntimeException("写入安卓设备会话状态失败", ex);
throw new RuntimeException("Failed to store android device session", ex);
}
}

View File

@ -1,24 +1,38 @@
package com.imeeting.service.android.impl;
import com.imeeting.grpc.gateway.GatewayServerPacket;
import com.imeeting.grpc.push.PushMessage;
import com.imeeting.grpc.push.ServerMessage;
import com.imeeting.service.android.AndroidGatewayPushService;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Service
public class AndroidGatewayPushServiceImpl implements AndroidGatewayPushService {
private final Map<String, Binding> byConnectionId = new ConcurrentHashMap<>();
private final Map<String, Set<String>> byDeviceId = new ConcurrentHashMap<>();
private final Map<String, String> connectionByDeviceId = new ConcurrentHashMap<>();
@Override
public void register(String connectionId, String deviceId, StreamObserver<GatewayServerPacket> observer) {
byConnectionId.put(connectionId, new Binding(deviceId, observer));
byDeviceId.computeIfAbsent(deviceId, key -> ConcurrentHashMap.newKeySet()).add(connectionId);
public String register(String connectionId, String deviceId, StreamObserver<ServerMessage> observer) {
Binding newBinding = new Binding(deviceId, observer);
Binding previousBinding = byConnectionId.put(connectionId, newBinding);
if (previousBinding != null && !previousBinding.deviceId().equals(deviceId)) {
connectionByDeviceId.remove(previousBinding.deviceId(), connectionId);
}
String previousConnectionId = connectionByDeviceId.put(deviceId, connectionId);
if (previousConnectionId != null && !previousConnectionId.equals(connectionId)) {
Binding replacedBinding = byConnectionId.remove(previousConnectionId);
if (replacedBinding != null) {
safeComplete(previousConnectionId, replacedBinding);
}
}
return previousConnectionId;
}
@Override
@ -27,43 +41,62 @@ public class AndroidGatewayPushServiceImpl implements AndroidGatewayPushService
if (binding == null) {
return;
}
Set<String> connectionIds = byDeviceId.get(binding.deviceId());
if (connectionIds == null) {
return;
}
connectionIds.remove(connectionId);
if (connectionIds.isEmpty()) {
byDeviceId.remove(binding.deviceId());
}
connectionByDeviceId.remove(binding.deviceId(), connectionId);
}
@Override
public boolean pushToConnection(String connectionId, GatewayServerPacket packet) {
public boolean pushToConnection(String connectionId, PushMessage message) {
Binding binding = byConnectionId.get(connectionId);
if (binding == null) {
return false;
}
synchronized (binding) {
binding.observer().onNext(packet);
try {
binding.observer().onNext(ServerMessage.newBuilder().setPush(message).build());
} catch (Exception ex) {
log.warn("Failed to push android message, connectionId={}, deviceId={}", connectionId, binding.deviceId(), ex);
unregister(connectionId);
return false;
}
}
return true;
}
@Override
public int pushToDevice(String deviceId, GatewayServerPacket packet) {
Set<String> connectionIds = byDeviceId.get(deviceId);
if (connectionIds == null || connectionIds.isEmpty()) {
public int pushToDevice(String deviceId, PushMessage message) {
String connectionId = connectionByDeviceId.get(deviceId);
if (connectionId == null || connectionId.isBlank()) {
return 0;
}
int pushed = 0;
for (String connectionId : connectionIds) {
if (pushToConnection(connectionId, packet)) {
pushed++;
}
}
return pushed;
return pushToConnection(connectionId, message) ? 1 : 0;
}
private record Binding(String deviceId, StreamObserver<GatewayServerPacket> observer) {
@Override
public String disconnectDevice(String deviceId) {
String connectionId = connectionByDeviceId.get(deviceId);
if (connectionId == null || connectionId.isBlank()) {
return null;
}
Binding binding = byConnectionId.get(connectionId);
if (binding == null) {
connectionByDeviceId.remove(deviceId, connectionId);
return null;
}
unregister(connectionId);
safeComplete(connectionId, binding);
return connectionId;
}
private void safeComplete(String connectionId, Binding binding) {
synchronized (binding) {
try {
binding.observer().onCompleted();
} catch (Exception ex) {
log.debug("Failed to complete replaced android push stream, connectionId={}, deviceId={}", connectionId, binding.deviceId(), ex);
}
}
}
private record Binding(String deviceId, StreamObserver<ServerMessage> observer) {
}
}

View File

@ -0,0 +1,21 @@
package com.imeeting.service.biz;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.biz.DeviceAdminUpdateCommand;
import com.imeeting.dto.biz.DeviceOnlineAdminVO;
import com.unisbase.security.LoginUser;
import java.util.List;
public interface DeviceOnlineManagementService {
void recordConnected(AndroidAuthContext authContext);
void recordDisconnected(String deviceCode, Long lastSeenAtMillis);
List<DeviceOnlineAdminVO> listForAdmin(LoginUser loginUser);
DeviceOnlineAdminVO update(Long id, DeviceAdminUpdateCommand command, LoginUser loginUser);
boolean kick(Long id, LoginUser loginUser);
}

View File

@ -0,0 +1,160 @@
package com.imeeting.service.biz.impl;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidDeviceSessionState;
import com.imeeting.dto.biz.DeviceAdminUpdateCommand;
import com.imeeting.dto.biz.DeviceOnlineAdminVO;
import com.imeeting.entity.biz.DeviceInfoEntity;
import com.imeeting.mapper.DeviceInfoMapper;
import com.imeeting.service.android.AndroidDeviceSessionService;
import com.imeeting.service.android.AndroidGatewayPushService;
import com.imeeting.service.biz.DeviceOnlineManagementService;
import com.unisbase.security.LoginUser;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
@Service
@RequiredArgsConstructor
public class DeviceOnlineManagementServiceImpl implements DeviceOnlineManagementService {
private final DeviceInfoMapper deviceInfoMapper;
private final AndroidDeviceSessionService androidDeviceSessionService;
private final AndroidGatewayPushService androidGatewayPushService;
@Override
public void recordConnected(AndroidAuthContext authContext) {
if (authContext == null || !StringUtils.hasText(authContext.getDeviceId())) {
return;
}
String deviceCode = authContext.getDeviceId().trim();
DeviceInfoEntity existing = deviceInfoMapper.selectByDeviceCodeIgnoreTenant(deviceCode);
LocalDateTime now = LocalDateTime.now();
if (existing == null) {
DeviceInfoEntity created = new DeviceInfoEntity();
created.setTenantId(authContext.getTenantId());
created.setUserId(authContext.getUserId());
created.setDeviceCode(deviceCode);
created.setDeviceName(null);
created.setTerminalType(normalizeTerminalType(authContext.getPlatform()));
created.setTerminalVersion(normalize(authContext.getAppVersion()));
created.setLastOnlineAt(now);
created.setStatus(1);
deviceInfoMapper.insert(created);
return;
}
existing.setTerminalType(normalizeTerminalType(authContext.getPlatform()));
existing.setTerminalVersion(normalize(authContext.getAppVersion()));
existing.setLastOnlineAt(now);
if (existing.getUserId() == null && authContext.getUserId() != null) {
existing.setUserId(authContext.getUserId());
}
if (existing.getTenantId() == null && authContext.getTenantId() != null) {
existing.setTenantId(authContext.getTenantId());
}
deviceInfoMapper.updateById(existing);
}
@Override
public void recordDisconnected(String deviceCode, Long lastSeenAtMillis) {
if (!StringUtils.hasText(deviceCode)) {
return;
}
DeviceInfoEntity existing = deviceInfoMapper.selectByDeviceCodeIgnoreTenant(deviceCode.trim());
if (existing == null) {
return;
}
existing.setLastOnlineAt(toLocalDateTime(lastSeenAtMillis));
deviceInfoMapper.updateById(existing);
}
@Override
public List<DeviceOnlineAdminVO> listForAdmin(LoginUser loginUser) {
List<DeviceOnlineAdminVO> devices = deviceInfoMapper.selectAdminList(loginUser == null ? null : loginUser.getTenantId(), isPlatformAdmin(loginUser));
for (DeviceOnlineAdminVO device : devices) {
AndroidDeviceSessionState state = androidDeviceSessionService.getByDeviceId(device.getDeviceCode());
if (state != null) {
device.setOnline(true);
device.setLastOnlineAt(toLocalDateTime(state.getLastSeenAt()));
} else {
device.setOnline(false);
}
}
return devices;
}
@Override
public DeviceOnlineAdminVO update(Long id, DeviceAdminUpdateCommand command, LoginUser loginUser) {
DeviceInfoEntity existing = requireVisibleDevice(id, loginUser);
existing.setDeviceName(normalize(command.getDeviceName()));
boolean disableAfterUpdate = command.getStatus() != null && command.getStatus() == 0;
if (command.getStatus() != null) {
existing.setStatus(command.getStatus());
}
deviceInfoMapper.updateById(existing);
if (disableAfterUpdate) {
disconnectDevice(existing.getDeviceCode());
}
return listForAdmin(loginUser).stream()
.filter(item -> id.equals(item.getDeviceId()))
.findFirst()
.orElseThrow(() -> new RuntimeException("Device not found after update"));
}
@Override
public boolean kick(Long id, LoginUser loginUser) {
DeviceInfoEntity existing = requireVisibleDevice(id, loginUser);
disconnectDevice(existing.getDeviceCode());
return true;
}
private DeviceInfoEntity requireVisibleDevice(Long id, LoginUser loginUser) {
DeviceInfoEntity existing = deviceInfoMapper.selectByIdIgnoreTenant(id);
if (existing == null) {
throw new RuntimeException("Device not found");
}
if (!isPlatformAdmin(loginUser) && loginUser != null && loginUser.getTenantId() != null) {
if (existing.getTenantId() == null || !loginUser.getTenantId().equals(existing.getTenantId())) {
throw new RuntimeException("Device is not visible in current tenant");
}
}
return existing;
}
private boolean isPlatformAdmin(LoginUser loginUser) {
return loginUser != null && Boolean.TRUE.equals(loginUser.getIsPlatformAdmin());
}
private String normalizeTerminalType(String platform) {
String normalized = normalize(platform);
return normalized == null ? null : normalized.toLowerCase();
}
private String normalize(String value) {
if (!StringUtils.hasText(value)) {
return null;
}
return value.trim();
}
private void disconnectDevice(String deviceCode) {
String activeConnectionId = androidDeviceSessionService.getActiveConnectionId(deviceCode);
AndroidDeviceSessionState state = activeConnectionId == null ? null : androidDeviceSessionService.getByConnectionId(activeConnectionId);
androidGatewayPushService.disconnectDevice(deviceCode);
if (activeConnectionId != null) {
androidDeviceSessionService.closeSession(activeConnectionId);
}
recordDisconnected(deviceCode, state == null ? null : state.getLastSeenAt());
}
private LocalDateTime toLocalDateTime(Long millis) {
long timestamp = millis != null && millis > 0 ? millis : System.currentTimeMillis();
return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
}
}

View File

@ -1,14 +0,0 @@
package com.imeeting.service.realtime;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidOpenRealtimeGrpcSessionCommand;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionVO;
public interface AndroidRealtimeSessionTicketService {
AndroidRealtimeGrpcSessionVO createSession(Long meetingId, AndroidOpenRealtimeGrpcSessionCommand command, AndroidAuthContext authContext);
AndroidRealtimeGrpcSessionData prepareSessionData(Long meetingId, AndroidAuthContext authContext);
AndroidRealtimeGrpcSessionData getSessionData(String streamToken);
}

View File

@ -1,71 +0,0 @@
package com.imeeting.service.realtime;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData;
public interface AsrUpstreamBridgeService {
AsrUpstreamSession openSession(AndroidRealtimeGrpcSessionData sessionData, String connectionId, AsrUpstreamEventListener listener);
interface AsrUpstreamSession {
boolean isReady();
void sendAudio(byte[] payload);
void sendStopSpeaking();
void close(String reason);
}
interface AsrUpstreamEventListener {
void onReady();
void onTranscript(AsrTranscriptResult result);
void onError(String code, String message, boolean retryable);
void onClosed(String reason);
}
class AsrTranscriptResult {
private final boolean finalResult;
private final String text;
private final String speakerId;
private final String speakerName;
private final Integer startTime;
private final Integer endTime;
public AsrTranscriptResult(boolean finalResult, String text, String speakerId, String speakerName,
Integer startTime, Integer endTime) {
this.finalResult = finalResult;
this.text = text;
this.speakerId = speakerId;
this.speakerName = speakerName;
this.startTime = startTime;
this.endTime = endTime;
}
public boolean isFinalResult() {
return finalResult;
}
public String getText() {
return text;
}
public String getSpeakerId() {
return speakerId;
}
public String getSpeakerName() {
return speakerName;
}
public Integer getStartTime() {
return startTime;
}
public Integer getEndTime() {
return endTime;
}
}
}

View File

@ -1,15 +0,0 @@
package com.imeeting.service.realtime;
import com.imeeting.dto.android.AndroidAuthContext;
import io.grpc.stub.StreamObserver;
import com.imeeting.grpc.realtime.RealtimeServerPacket;
public interface RealtimeMeetingGrpcSessionService {
String openStream(Long meetingId, String streamToken, AndroidAuthContext authContext, StreamObserver<RealtimeServerPacket> responseObserver);
void onAudio(String connectionId, byte[] payload, long seq, boolean lastChunk);
void onStopSpeaking(String connectionId);
void closeStream(String connectionId, String reason, boolean notifyClient);
}

View File

@ -1,251 +0,0 @@
package com.imeeting.service.realtime.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.imeeting.common.MeetingConstants;
import com.imeeting.common.RedisKeys;
import com.imeeting.config.grpc.GrpcServerProperties;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidOpenRealtimeGrpcSessionCommand;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionVO;
import com.imeeting.dto.biz.AiModelVO;
import com.imeeting.dto.biz.RealtimeMeetingResumeConfig;
import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO;
import com.imeeting.entity.biz.Meeting;
import com.imeeting.service.biz.AiModelService;
import com.imeeting.service.biz.MeetingAccessService;
import com.imeeting.service.biz.MeetingAuthorizationService;
import com.imeeting.service.biz.RealtimeMeetingSessionStateService;
import com.imeeting.service.realtime.AndroidRealtimeSessionTicketService;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Service
@RequiredArgsConstructor
public class AndroidRealtimeSessionTicketServiceImpl implements AndroidRealtimeSessionTicketService {
private final ObjectMapper objectMapper;
private final StringRedisTemplate redisTemplate;
private final MeetingAccessService meetingAccessService;
private final MeetingAuthorizationService meetingAuthorizationService;
private final AiModelService aiModelService;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final GrpcServerProperties grpcServerProperties;
@Override
public AndroidRealtimeGrpcSessionVO createSession(Long meetingId, AndroidOpenRealtimeGrpcSessionCommand command, AndroidAuthContext authContext) {
PreparedRealtimeSession prepared = prepareSession(meetingId, command, authContext);
String streamToken = UUID.randomUUID().toString().replace("-", "");
Duration ttl = Duration.ofSeconds(grpcServerProperties.getRealtime().getSessionTtlSeconds());
try {
redisTemplate.opsForValue().set(
RedisKeys.realtimeMeetingGrpcSessionKey(streamToken),
objectMapper.writeValueAsString(prepared.sessionData()),
ttl
);
} catch (Exception ex) {
throw new RuntimeException("创建实时会议 gRPC 会话失败", ex);
}
AndroidRealtimeGrpcSessionVO vo = new AndroidRealtimeGrpcSessionVO();
vo.setMeetingId(meetingId);
vo.setStreamToken(streamToken);
vo.setExpiresInSeconds(ttl.toSeconds());
vo.setSampleRate(grpcServerProperties.getRealtime().getSampleRate());
vo.setChannels(grpcServerProperties.getRealtime().getChannels());
vo.setEncoding(grpcServerProperties.getRealtime().getEncoding());
vo.setResumeConfig(prepared.resumeConfig());
vo.setStatus(prepared.status());
return vo;
}
@Override
public AndroidRealtimeGrpcSessionData prepareSessionData(Long meetingId, AndroidAuthContext authContext) {
return prepareSession(meetingId, null, authContext).sessionData();
}
@Override
public AndroidRealtimeGrpcSessionData getSessionData(String streamToken) {
if (streamToken == null || streamToken.isBlank()) {
return null;
}
String raw = redisTemplate.opsForValue().get(RedisKeys.realtimeMeetingGrpcSessionKey(streamToken));
if (raw == null || raw.isBlank()) {
return null;
}
try {
return objectMapper.readValue(raw, AndroidRealtimeGrpcSessionData.class);
} catch (Exception ex) {
throw new RuntimeException("读取实时会议 gRPC 会话失败", ex);
}
}
private PreparedRealtimeSession prepareSession(Long meetingId, AndroidOpenRealtimeGrpcSessionCommand command, AndroidAuthContext authContext) {
if (meetingId == null) {
throw new RuntimeException("会议 ID 不能为空");
}
Meeting meeting = meetingAccessService.requireMeeting(meetingId);
meetingAuthorizationService.assertCanControlRealtimeMeeting(meeting, authContext, MeetingConstants.SOURCE_ANDROID);
realtimeMeetingSessionStateService.initSessionIfAbsent(meetingId, meeting.getTenantId(), meeting.getCreatorId());
RealtimeMeetingSessionStatusVO currentStatus = realtimeMeetingSessionStateService.getStatus(meetingId);
RealtimeMeetingResumeConfig currentResumeConfig = currentStatus == null ? null : currentStatus.getResumeConfig();
Long asrModelId = firstNonNull(
command == null ? null : command.getAsrModelId(),
currentResumeConfig == null ? null : currentResumeConfig.getAsrModelId(),
resolveDefaultAsrModelId(meeting.getTenantId())
);
if (asrModelId == null) {
throw new RuntimeException("ASR 模型 ID 不能为空");
}
realtimeMeetingSessionStateService.assertCanOpenSession(meetingId);
AiModelVO asrModel = aiModelService.getModelById(asrModelId, "ASR");
if (asrModel == null) {
throw new RuntimeException("ASR 模型不存在");
}
String targetWsUrl = resolveWsUrl(asrModel);
if (targetWsUrl == null || targetWsUrl.isBlank()) {
throw new RuntimeException("ASR 模型未配置 WebSocket 地址");
}
RealtimeMeetingResumeConfig resumeConfig = buildResumeConfig(command, currentResumeConfig, asrModelId);
realtimeMeetingSessionStateService.rememberResumeConfig(meetingId, resumeConfig);
RealtimeMeetingSessionStatusVO latestStatus = realtimeMeetingSessionStateService.getStatus(meetingId);
Map<String, Object> startMessage = buildStartMessage(asrModel, meetingId, resumeConfig);
AndroidRealtimeGrpcSessionData sessionData = new AndroidRealtimeGrpcSessionData();
sessionData.setMeetingId(meetingId);
sessionData.setTenantId(authContext != null && authContext.getTenantId() != null ? authContext.getTenantId() : meeting.getTenantId());
sessionData.setUserId(authContext != null && authContext.getUserId() != null ? authContext.getUserId() : meeting.getCreatorId());
sessionData.setDeviceId(authContext == null ? null : authContext.getDeviceId());
sessionData.setAsrModelId(asrModelId);
sessionData.setTargetWsUrl(targetWsUrl);
sessionData.setResumeConfig(resumeConfig);
try {
sessionData.setStartMessageJson(objectMapper.writeValueAsString(startMessage));
} catch (Exception ex) {
throw new RuntimeException("序列化实时启动消息失败", ex);
}
return new PreparedRealtimeSession(sessionData, resumeConfig, latestStatus);
}
private Long resolveDefaultAsrModelId(Long tenantId) {
AiModelVO defaultModel = aiModelService.getDefaultModel("ASR", tenantId == null ? 0L : tenantId);
return defaultModel == null ? null : defaultModel.getId();
}
private RealtimeMeetingResumeConfig buildResumeConfig(AndroidOpenRealtimeGrpcSessionCommand command,
RealtimeMeetingResumeConfig currentResumeConfig,
Long asrModelId) {
RealtimeMeetingResumeConfig config = new RealtimeMeetingResumeConfig();
config.setAsrModelId(asrModelId);
config.setMode(nonBlank(command == null ? null : command.getMode(), currentResumeConfig == null ? null : currentResumeConfig.getMode(), "2pass"));
config.setLanguage(nonBlank(command == null ? null : command.getLanguage(), currentResumeConfig == null ? null : currentResumeConfig.getLanguage(), "auto"));
config.setUseSpkId(firstNonNull(command == null ? null : command.getUseSpkId(), currentResumeConfig == null ? null : currentResumeConfig.getUseSpkId(), 1));
config.setEnablePunctuation(firstNonNull(command == null ? null : command.getEnablePunctuation(), currentResumeConfig == null ? null : currentResumeConfig.getEnablePunctuation(), Boolean.TRUE));
config.setEnableItn(firstNonNull(command == null ? null : command.getEnableItn(), currentResumeConfig == null ? null : currentResumeConfig.getEnableItn(), Boolean.TRUE));
config.setEnableTextRefine(firstNonNull(command == null ? null : command.getEnableTextRefine(), currentResumeConfig == null ? null : currentResumeConfig.getEnableTextRefine(), Boolean.FALSE));
config.setSaveAudio(firstNonNull(command == null ? null : command.getSaveAudio(), currentResumeConfig == null ? null : currentResumeConfig.getSaveAudio(), Boolean.FALSE));
config.setHotwords(command != null && command.getHotwords() != null
? command.getHotwords()
: currentResumeConfig == null || currentResumeConfig.getHotwords() == null
? List.of()
: currentResumeConfig.getHotwords());
return config;
}
private String resolveWsUrl(AiModelVO model) {
if (model.getWsUrl() != null && !model.getWsUrl().isBlank()) {
return model.getWsUrl();
}
if (model.getBaseUrl() == null || model.getBaseUrl().isBlank()) {
return "";
}
return model.getBaseUrl()
.replaceFirst("^http://", "ws://")
.replaceFirst("^https://", "wss://");
}
private Map<String, Object> buildStartMessage(AiModelVO model, Long meetingId, RealtimeMeetingResumeConfig resumeConfig) {
Map<String, Object> root = new HashMap<>();
root.put("type", "start");
root.put("request_id", "android_" + System.currentTimeMillis() + "_" + meetingId);
root.put("authorization", buildAuthorization(model.getApiKey()));
Map<String, Object> config = new HashMap<>();
Map<String, Object> audio = new HashMap<>();
audio.put("format", "pcm");
audio.put("sample_rate", grpcServerProperties.getRealtime().getSampleRate());
audio.put("channels", grpcServerProperties.getRealtime().getChannels());
config.put("audio", audio);
Map<String, Object> recognition = new HashMap<>();
recognition.put("language", nonBlank(resumeConfig.getLanguage(), "auto"));
recognition.put("enable_punctuation", boolOrDefault(resumeConfig.getEnablePunctuation(), true));
recognition.put("enable_itn", boolOrDefault(resumeConfig.getEnableItn(), true));
recognition.put("enable_speaker", Integer.valueOf(1).equals(resumeConfig.getUseSpkId()));
recognition.put("enable_two_pass", !"online".equalsIgnoreCase(resumeConfig.getMode()));
recognition.put("enable_text_refine", boolOrDefault(resumeConfig.getEnableTextRefine(), false));
recognition.put("speaker_threshold", readSpeakerThreshold(model.getMediaConfig()));
recognition.put("hotwords", resumeConfig.getHotwords() == null ? List.of() : resumeConfig.getHotwords());
config.put("recognition", recognition);
config.put("model", model.getModelCode());
config.put("save_audio", boolOrDefault(resumeConfig.getSaveAudio(), false));
root.put("config", config);
return root;
}
private String buildAuthorization(String apiKey) {
if (apiKey == null || apiKey.isBlank()) {
return "";
}
return apiKey.startsWith("Bearer ") ? apiKey : "Bearer " + apiKey;
}
private Object readSpeakerThreshold(Map<String, Object> mediaConfig) {
return mediaConfig == null ? null : mediaConfig.get("svThreshold");
}
private boolean boolOrDefault(Boolean value, boolean defaultValue) {
return value != null ? value : defaultValue;
}
@SafeVarargs
private <T> T firstNonNull(T... values) {
for (T value : values) {
if (value != null) {
return value;
}
}
return null;
}
private String nonBlank(String value, String defaultValue) {
return nonBlank(value, defaultValue, null);
}
private String nonBlank(String value, String fallbackValue, String defaultValue) {
if (value != null && !value.isBlank()) {
return value.trim();
}
if (fallbackValue != null && !fallbackValue.isBlank()) {
return fallbackValue.trim();
}
return defaultValue;
}
private record PreparedRealtimeSession(AndroidRealtimeGrpcSessionData sessionData,
RealtimeMeetingResumeConfig resumeConfig,
RealtimeMeetingSessionStatusVO status) {
}
}

View File

@ -1,262 +0,0 @@
package com.imeeting.service.realtime.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData;
import com.imeeting.service.realtime.AsrUpstreamBridgeService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Service
@RequiredArgsConstructor
public class AsrUpstreamBridgeServiceImpl implements AsrUpstreamBridgeService {
private final ObjectMapper objectMapper;
@Override
public AsrUpstreamSession openSession(AndroidRealtimeGrpcSessionData sessionData, String connectionId,
AsrUpstreamEventListener listener) {
BridgeSession session = new BridgeSession(sessionData, connectionId, listener, objectMapper);
session.connect();
return session;
}
private static final class BridgeSession implements AsrUpstreamSession {
private final AndroidRealtimeGrpcSessionData sessionData;
private final String connectionId;
private final AsrUpstreamEventListener listener;
private final ObjectMapper objectMapper;
private final HttpClient httpClient = HttpClient.newHttpClient();
private final Queue<byte[]> pendingAudio = new ConcurrentLinkedQueue<>();
private final AtomicBoolean ready = new AtomicBoolean(false);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final StringBuilder textBuffer = new StringBuilder();
private volatile WebSocket webSocket;
private CompletableFuture<Void> sendChain = CompletableFuture.completedFuture(null);
private BridgeSession(AndroidRealtimeGrpcSessionData sessionData, String connectionId,
AsrUpstreamEventListener listener, ObjectMapper objectMapper) {
this.sessionData = sessionData;
this.connectionId = connectionId;
this.listener = listener;
this.objectMapper = objectMapper;
}
private void connect() {
try {
WebSocket socket = httpClient.newWebSocketBuilder()
.buildAsync(URI.create(sessionData.getTargetWsUrl()), new ListenerImpl())
.get();
this.webSocket = socket;
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
listener.onError("REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接第三方识别服务时被中断", true);
} catch (ExecutionException ex) {
listener.onError("REALTIME_UPSTREAM_CONNECT_FAILED", "连接第三方识别服务失败,请检查模型 WebSocket 配置或服务状态", true);
}
}
@Override
public boolean isReady() {
return ready.get();
}
@Override
public void sendAudio(byte[] payload) {
if (payload == null || payload.length == 0 || closed.get()) {
return;
}
if (!ready.get() || webSocket == null) {
pendingAudio.add(payload);
return;
}
sendOrdered(() -> webSocket.sendBinary(ByteBuffer.wrap(payload), true));
}
@Override
public void sendStopSpeaking() {
if (closed.get() || webSocket == null) {
return;
}
sendOrdered(() -> webSocket.sendText("{\"is_speaking\":false}", true));
}
@Override
public void close(String reason) {
if (!closed.compareAndSet(false, true)) {
return;
}
if (webSocket != null) {
webSocket.sendClose(1000, reason == null ? "" : reason);
}
}
private void sendOrdered(java.util.function.Supplier<CompletableFuture<WebSocket>> action) {
synchronized (this) {
sendChain = sendChain.exceptionally(ex -> null)
.thenCompose(ignored -> action.get().thenApply(ws -> null));
}
}
private void flushPendingAudio() {
byte[] payload;
while ((payload = pendingAudio.poll()) != null) {
sendAudio(payload);
}
}
private void handleTextMessage(String payload) {
try {
JsonNode root = objectMapper.readTree(payload);
if ((root.hasNonNull("code") || "error".equals(root.path("type").asText())) && root.hasNonNull("message")) {
listener.onError(
root.path("code").asText("REALTIME_UPSTREAM_ERROR"),
root.path("message").asText("第三方识别服务连接异常"),
true
);
return;
}
AsrTranscriptResult result = normalizeTranscript(root);
if (result != null) {
listener.onTranscript(result);
}
} catch (Exception ex) {
log.debug("Ignore invalid upstream ASR payload, connectionId={}, payload={}", connectionId, payload, ex);
}
}
private AsrTranscriptResult normalizeTranscript(JsonNode root) {
String type = root.path("type").asText();
if ("partial".equals(type) || "segment".equals(type)) {
JsonNode data = root.path("data");
String text = data.path("text").asText("");
if (text.isBlank()) {
return null;
}
boolean isFinal = "segment".equals(type) || data.path("is_final").asBoolean(false);
String speakerId = textValue(data, "speaker_id");
String speakerName = textValue(data, "speaker_name");
if (speakerName == null) {
speakerName = speakerId;
}
Integer startTime = toMs(data.path("start"));
Integer endTime = toMs(data.path("end"));
return new AsrTranscriptResult(isFinal, text, speakerId, speakerName, startTime, endTime);
}
String text = root.path("text").asText("");
if (text.isBlank()) {
return null;
}
boolean isFinal = root.path("is_final").asBoolean(false);
JsonNode speaker = root.path("speaker");
String speakerId = null;
String speakerName = null;
if (speaker.isTextual()) {
speakerId = speaker.asText();
speakerName = speaker.asText();
} else if (speaker.isObject()) {
speakerId = textValue(speaker, "user_id");
if (speakerId == null) {
speakerId = textValue(speaker, "speaker_id");
}
speakerName = textValue(speaker, "name");
if (speakerName == null) {
speakerName = speakerId;
}
}
Integer startTime = null;
Integer endTime = null;
JsonNode timestamp = root.path("timestamp");
if (timestamp.isArray() && timestamp.size() > 0) {
JsonNode first = timestamp.get(0);
JsonNode last = timestamp.get(timestamp.size() - 1);
if (first.isArray() && first.size() > 0) {
startTime = first.get(0).isInt() ? first.get(0).asInt() : null;
}
if (last.isArray() && last.size() > 1) {
endTime = last.get(1).isInt() ? last.get(1).asInt() : null;
}
}
return new AsrTranscriptResult(isFinal, text, speakerId, speakerName, startTime, endTime);
}
private String textValue(JsonNode node, String fieldName) {
JsonNode target = node.path(fieldName);
if (target.isMissingNode() || target.isNull()) {
return null;
}
String value = target.asText();
return value == null || value.isBlank() ? null : value;
}
private Integer toMs(JsonNode node) {
if (!node.isNumber()) {
return null;
}
return (int) Math.round(node.asDouble() * 1000D);
}
private final class ListenerImpl implements WebSocket.Listener {
@Override
public void onOpen(WebSocket webSocket) {
sendOrdered(() -> webSocket.sendText(sessionData.getStartMessageJson(), true));
ready.set(true);
flushPendingAudio();
listener.onReady();
webSocket.request(1);
}
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
textBuffer.append(data);
if (last) {
handleTextMessage(textBuffer.toString());
textBuffer.setLength(0);
}
webSocket.request(1);
return CompletableFuture.completedFuture(null);
}
@Override
public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
webSocket.request(1);
return CompletableFuture.completedFuture(null);
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
if (closed.compareAndSet(false, true)) {
listener.onClosed(reason == null || reason.isBlank() ? "upstream_closed" : reason);
}
return CompletableFuture.completedFuture(null);
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
if (closed.compareAndSet(false, true)) {
listener.onError(
"REALTIME_UPSTREAM_ERROR",
error == null || error.getMessage() == null || error.getMessage().isBlank()
? "第三方识别服务连接异常"
: "第三方识别服务连接异常: " + error.getMessage(),
true
);
}
}
}
}
}

View File

@ -1,338 +0,0 @@
package com.imeeting.service.realtime.impl;
import com.imeeting.common.RedisKeys;
import com.imeeting.config.grpc.GrpcServerProperties;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData;
import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO;
import com.imeeting.dto.biz.RealtimeTranscriptItemDTO;
import com.imeeting.grpc.common.ErrorEvent;
import com.imeeting.grpc.realtime.RealtimeServerPacket;
import com.imeeting.grpc.realtime.SessionStatusEvent;
import com.imeeting.grpc.realtime.StreamClosed;
import com.imeeting.grpc.realtime.StreamReady;
import com.imeeting.grpc.realtime.TranscriptEvent;
import com.imeeting.service.biz.MeetingCommandService;
import com.imeeting.service.biz.RealtimeMeetingSessionStateService;
import com.imeeting.service.realtime.AndroidRealtimeSessionTicketService;
import com.imeeting.service.realtime.AsrUpstreamBridgeService;
import com.imeeting.service.realtime.RealtimeMeetingAudioStorageService;
import com.imeeting.service.realtime.RealtimeMeetingGrpcSessionService;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Service
@RequiredArgsConstructor
public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrpcSessionService {
private final AndroidRealtimeSessionTicketService ticketService;
private final AsrUpstreamBridgeService asrUpstreamBridgeService;
private final MeetingCommandService meetingCommandService;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService;
private final StringRedisTemplate redisTemplate;
private final GrpcServerProperties grpcServerProperties;
private final ConcurrentMap<String, SessionRuntime> sessions = new ConcurrentHashMap<>();
@Override
public String openStream(Long meetingId, String streamToken, AndroidAuthContext authContext, StreamObserver<RealtimeServerPacket> responseObserver) {
AndroidRealtimeGrpcSessionData sessionData;
if (meetingId != null && meetingId > 0) {
sessionData = ticketService.prepareSessionData(meetingId, authContext);
streamToken = "";
} else if (streamToken != null && !streamToken.isBlank()) {
sessionData = ticketService.getSessionData(streamToken);
if (sessionData == null) {
throw new RuntimeException("实时会议 gRPC 会话令牌无效");
}
if (sessionData.getDeviceId() != null && !sessionData.getDeviceId().isBlank()
&& authContext.getDeviceId() != null
&& !sessionData.getDeviceId().equals(authContext.getDeviceId())) {
throw new RuntimeException("实时会议 gRPC 会话令牌与 deviceId 不匹配");
}
} else {
throw new RuntimeException("会议 ID 不能为空");
}
String connectionId = "grpc_" + java.util.UUID.randomUUID().toString().replace("-", "");
SessionRuntime runtime = new SessionRuntime(connectionId, streamToken, sessionData, responseObserver);
SessionRuntime previous = sessions.putIfAbsent(connectionId, runtime);
if (previous != null) {
throw new RuntimeException("实时会议 gRPC connectionId 重复");
}
try {
writeConnectionState(runtime);
realtimeMeetingAudioStorageService.openSession(sessionData.getMeetingId(), connectionId);
runtime.upstreamSession = asrUpstreamBridgeService.openSession(sessionData, connectionId, new UpstreamCallback(runtime));
return connectionId;
} catch (Exception ex) {
sessions.remove(connectionId);
cleanupFailedOpen(runtime);
throw ex;
}
}
@Override
public void onAudio(String connectionId, byte[] payload, long seq, boolean lastChunk) {
SessionRuntime runtime = sessions.get(connectionId);
if (runtime == null || runtime.closed.get()) {
return;
}
touchConnectionState(runtime);
realtimeMeetingAudioStorageService.append(connectionId, payload);
runtime.upstreamSession.sendAudio(payload);
if (lastChunk) {
runtime.upstreamSession.sendStopSpeaking();
}
}
@Override
public void onStopSpeaking(String connectionId) {
SessionRuntime runtime = sessions.get(connectionId);
if (runtime == null || runtime.closed.get()) {
return;
}
touchConnectionState(runtime);
runtime.upstreamSession.sendStopSpeaking();
}
@Override
public void closeStream(String connectionId, String reason, boolean notifyClient) {
SessionRuntime runtime = sessions.remove(connectionId);
if (runtime == null) {
return;
}
if (!runtime.closed.compareAndSet(false, true)) {
return;
}
try {
if (runtime.upstreamSession != null) {
runtime.upstreamSession.close(reason == null ? "closed" : reason);
}
} catch (Exception ex) {
log.warn("Failed to close upstream realtime session, connectionId={}", connectionId, ex);
}
try {
redisTemplate.delete(RedisKeys.realtimeMeetingGrpcConnectionKey(connectionId));
} catch (Exception ex) {
log.error("Failed to delete realtime gRPC connection state, connectionId={}", connectionId, ex);
}
try {
realtimeMeetingAudioStorageService.closeSession(connectionId);
} catch (Exception ex) {
log.error("Failed to close realtime gRPC audio session, connectionId={}", connectionId, ex);
}
try {
realtimeMeetingSessionStateService.pauseByDisconnect(runtime.sessionData.getMeetingId(), connectionId);
} catch (Exception ex) {
log.error("Failed to pause realtime meeting by disconnect, meetingId={}, connectionId={}",
runtime.sessionData.getMeetingId(), connectionId, ex);
}
if (notifyClient) {
runtime.send(RealtimeServerPacket.newBuilder()
.setClosed(StreamClosed.newBuilder()
.setMeetingId(runtime.sessionData.getMeetingId())
.setReason(reason == null ? "closed" : reason)
.build())
.build());
}
runtime.complete();
}
private void cleanupFailedOpen(SessionRuntime runtime) {
try {
redisTemplate.delete(RedisKeys.realtimeMeetingGrpcConnectionKey(runtime.connectionId));
} catch (Exception ex) {
log.warn("Failed to rollback realtime gRPC connection state, connectionId={}", runtime.connectionId, ex);
}
try {
realtimeMeetingAudioStorageService.closeSession(runtime.connectionId);
} catch (Exception ex) {
log.warn("Failed to rollback realtime gRPC audio session, connectionId={}", runtime.connectionId, ex);
}
try {
if (runtime.upstreamSession != null) {
runtime.upstreamSession.close("open_failed");
}
} catch (Exception ex) {
log.warn("Failed to rollback realtime gRPC upstream session, connectionId={}", runtime.connectionId, ex);
}
}
private void writeConnectionState(SessionRuntime runtime) {
Duration ttl = Duration.ofSeconds(grpcServerProperties.getRealtime().getConnectionTtlSeconds());
String value = runtime.sessionData.getMeetingId() + ":" + runtime.sessionData.getDeviceId() + ":" + runtime.streamToken;
redisTemplate.opsForValue().set(RedisKeys.realtimeMeetingGrpcConnectionKey(runtime.connectionId), value, ttl);
}
private void touchConnectionState(SessionRuntime runtime) {
writeConnectionState(runtime);
}
private long nextEventSeq(Long meetingId) {
Long value = redisTemplate.opsForValue().increment(RedisKeys.realtimeMeetingEventSeqKey(meetingId));
return value == null ? 1L : value;
}
private TranscriptEvent.TranscriptType toTranscriptType(boolean finalResult) {
return finalResult ? TranscriptEvent.TranscriptType.FINAL : TranscriptEvent.TranscriptType.PARTIAL;
}
private SessionStatusEvent buildStatusEvent(Long meetingId) {
RealtimeMeetingSessionStatusVO status = realtimeMeetingSessionStateService.getStatus(meetingId);
if (status == null) {
return SessionStatusEvent.newBuilder().setMeetingId(meetingId).build();
}
return SessionStatusEvent.newBuilder()
.setMeetingId(meetingId)
.setStatus(nullToEmpty(status.getStatus()))
.setHasTranscript(Boolean.TRUE.equals(status.getHasTranscript()))
.setCanResume(Boolean.TRUE.equals(status.getCanResume()))
.setRemainingSeconds(status.getRemainingSeconds() == null ? 0L : status.getRemainingSeconds())
.setActiveConnection(Boolean.TRUE.equals(status.getActiveConnection()))
.build();
}
private String nullToEmpty(String value) {
return value == null ? "" : value;
}
private final class UpstreamCallback implements AsrUpstreamBridgeService.AsrUpstreamEventListener {
private final SessionRuntime runtime;
private UpstreamCallback(SessionRuntime runtime) {
this.runtime = runtime;
}
@Override
public void onReady() {
if (runtime.closed.get()) {
return;
}
if (!realtimeMeetingSessionStateService.activate(runtime.sessionData.getMeetingId(), runtime.connectionId)) {
runtime.sendError("REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃实时连接,请先关闭旧连接后再继续", false);
closeStream(runtime.connectionId, "active_connection_exists", true);
return;
}
touchConnectionState(runtime);
runtime.send(RealtimeServerPacket.newBuilder()
.setReady(StreamReady.newBuilder()
.setConnectionId(runtime.connectionId)
.setMeetingId(runtime.sessionData.getMeetingId())
.setServerTime(System.currentTimeMillis())
.build())
.build());
runtime.send(RealtimeServerPacket.newBuilder().setStatus(buildStatusEvent(runtime.sessionData.getMeetingId())).build());
}
@Override
public void onTranscript(AsrUpstreamBridgeService.AsrTranscriptResult result) {
if (runtime.closed.get() || result == null || !result.isFinalResult() || result.getText() == null || result.getText().isBlank()) {
return;
}
RealtimeTranscriptItemDTO item = new RealtimeTranscriptItemDTO();
item.setSpeakerId(result.getSpeakerId());
item.setSpeakerName(result.getSpeakerName());
item.setContent(result.getText());
item.setStartTime(result.getStartTime());
item.setEndTime(result.getEndTime());
meetingCommandService.saveRealtimeTranscriptSnapshot(runtime.sessionData.getMeetingId(), item, result.isFinalResult());
runtime.send(RealtimeServerPacket.newBuilder()
.setTranscript(TranscriptEvent.newBuilder()
.setMeetingId(runtime.sessionData.getMeetingId())
.setEventSeq(nextEventSeq(runtime.sessionData.getMeetingId()))
.setType(toTranscriptType(result.isFinalResult()))
.setText(result.getText())
.setSpeakerId(nullToEmpty(result.getSpeakerId()))
.setSpeakerName(nullToEmpty(result.getSpeakerName()))
.setStartTime(result.getStartTime() == null ? 0 : result.getStartTime())
.setEndTime(result.getEndTime() == null ? 0 : result.getEndTime())
.build())
.build());
}
@Override
public void onError(String code, String message, boolean retryable) {
if (runtime.closed.get()) {
return;
}
runtime.sendError(code, message, retryable);
closeStream(runtime.connectionId, code == null ? "upstream_error" : code, true);
}
@Override
public void onClosed(String reason) {
closeStream(runtime.connectionId, reason == null ? "upstream_closed" : reason, true);
}
}
private static final class SessionRuntime {
private final String connectionId;
private final String streamToken;
private final AndroidRealtimeGrpcSessionData sessionData;
private final StreamObserver<RealtimeServerPacket> responseObserver;
private final AtomicBoolean closed = new AtomicBoolean(false);
private AsrUpstreamBridgeService.AsrUpstreamSession upstreamSession;
private SessionRuntime(String connectionId, String streamToken, AndroidRealtimeGrpcSessionData sessionData,
StreamObserver<RealtimeServerPacket> responseObserver) {
this.connectionId = connectionId;
this.streamToken = streamToken;
this.sessionData = sessionData;
this.responseObserver = responseObserver;
}
private void send(RealtimeServerPacket packet) {
try {
synchronized (responseObserver) {
responseObserver.onNext(packet);
}
} catch (Exception ex) {
RealtimeMeetingGrpcSessionServiceImpl.log.debug(
"Ignore downstream realtime gRPC delivery failure, connectionId={}",
connectionId,
ex
);
}
}
private void sendError(String code, String message, boolean retryable) {
send(RealtimeServerPacket.newBuilder()
.setError(ErrorEvent.newBuilder()
.setCode(code == null ? "REALTIME_ERROR" : code)
.setMessage(message == null ? "未知错误" : message)
.setRetryable(retryable)
.build())
.build());
}
private void complete() {
try {
responseObserver.onCompleted();
} catch (Exception ex) {
RealtimeMeetingGrpcSessionServiceImpl.log.debug(
"Ignore downstream realtime gRPC completion failure, connectionId={}",
connectionId,
ex
);
}
}
}
}

View File

@ -1,36 +0,0 @@
syntax = "proto3";
package imeeting.android.common;
option java_multiple_files = true;
option java_package = "com.imeeting.grpc.common";
option java_outer_classname = "AndroidCommonProto";
message ClientAuth {
enum AuthType {
AUTH_TYPE_UNSPECIFIED = 0;
NONE = 1;
DEVICE_TOKEN = 2;
USER_JWT = 3;
STREAM_TOKEN = 4;
}
AuthType auth_type = 1;
string access_token = 2;
string device_id = 3;
string tenant_code = 4;
string app_id = 5;
string app_version = 6;
string platform = 7;
}
message ErrorEvent {
string code = 1;
string message = 2;
bool retryable = 3;
}
message JsonPayload {
string topic = 1;
string json = 2;
}

View File

@ -1,85 +0,0 @@
syntax = "proto3";
package imeeting.android.gateway;
option java_multiple_files = true;
option java_package = "com.imeeting.grpc.gateway";
option java_outer_classname = "AndroidGatewayProto";
import "android/common.proto";
service AndroidGatewayService {
rpc Connect(stream GatewayClientPacket) returns (stream GatewayServerPacket);
}
message GatewayClientPacket {
string request_id = 1;
imeeting.android.common.ClientAuth auth = 2;
oneof body {
DeviceHello hello = 10;
Heartbeat heartbeat = 11;
Ack ack = 12;
Subscribe subscribe = 13;
}
}
message GatewayServerPacket {
string request_id = 1;
oneof body {
HelloAck hello_ack = 10;
ServerPush push = 11;
DevicePresenceEvent device_presence = 12;
imeeting.android.common.ErrorEvent error = 13;
Pong pong = 14;
}
}
message DeviceHello {
string device_id = 1;
string device_name = 2;
string device_model = 3;
string os_version = 4;
string app_version = 5;
string network_type = 6;
}
message HelloAck {
string connection_id = 1;
string auth_mode = 2;
int64 server_time = 3;
int64 heartbeat_interval_seconds = 4;
}
message Heartbeat {
string connection_id = 1;
int64 client_time = 2;
}
message Pong {
string connection_id = 1;
int64 server_time = 2;
}
message Ack {
string message_id = 1;
}
message Subscribe {
repeated string topics = 1;
}
message ServerPush {
string message_id = 1;
string topic = 2;
string type = 3;
string json = 4;
int64 server_time = 5;
}
message DevicePresenceEvent {
string device_id = 1;
string status = 2;
int64 last_seen_time = 3;
}

View File

@ -0,0 +1,117 @@
syntax = "proto3";
package imeeting.push.v1;
option java_multiple_files = true;
option java_package = "com.imeeting.grpc.push";
option java_outer_classname = "PushProto";
// =========================
//
// =========================
enum Platform {
PLATFORM_UNKNOWN = 0;
ANDROID = 1;
IOS = 2;
}
// =========================
//
// =========================
message ConnectRequest {
Platform platform = 1;
string app_version = 2;
string device_id = 3;
string user_id = 4;
string tenant_id = 5;
string connection_id = 6;
}
// =========================
//
// =========================
message ConnectResponse {
bool success = 1;
string message = 2;
}
// =========================
//
// =========================
message PushMessage {
string message_id = 1;
int64 timestamp = 2;
string type = 3;
string title = 4;
string content = 5;
bool need_ack = 6;
}
// =========================
// ACK
// =========================
message AckRequest {
string message_id = 1;
string device_id = 2;
string connection_id = 3;
}
// =========================
//
// =========================
message HeartbeatRequest {
string device_id = 1;
string connection_id = 2;
int64 timestamp = 3;
}
message HeartbeatResponse {
int64 timestamp = 1;
bool ok = 2;
}
// =========================
//
// =========================
message ErrorEvent {
string code = 1;
string message = 2;
bool retryable = 3;
}
// =========================
//
// =========================
message ClientMessage {
oneof payload {
ConnectRequest connect = 1; //
AckRequest ack = 2; //
HeartbeatRequest heartbeat = 3; //
}
}
// =========================
//
// =========================
message ServerMessage {
oneof payload {
ConnectResponse connect_ack = 1; //
PushMessage push = 2; //
HeartbeatResponse heartbeat = 3; //
ErrorEvent error = 4; //
}
}
// =========================
//
// =========================
service PushService {
//
rpc Communicate(stream ClientMessage)
returns (stream ServerMessage);
}

View File

@ -1,100 +0,0 @@
syntax = "proto3";
package imeeting.android.realtime;
option java_multiple_files = true;
option java_package = "com.imeeting.grpc.realtime";
option java_outer_classname = "RealtimeMeetingProto";
import "android/common.proto";
service RealtimeMeetingService {
rpc StreamMeetingAudio(stream RealtimeClientPacket) returns (stream RealtimeServerPacket);
}
message RealtimeClientPacket {
string request_id = 1;
imeeting.android.common.ClientAuth auth = 2;
oneof body {
OpenMeetingStream open = 10;
AudioChunk audio = 11;
RealtimeControl control = 12;
}
}
message RealtimeServerPacket {
string request_id = 1;
oneof body {
StreamReady ready = 10;
TranscriptEvent transcript = 11;
SessionStatusEvent status = 12;
imeeting.android.common.ErrorEvent error = 13;
StreamClosed closed = 14;
}
}
message OpenMeetingStream {
string stream_token = 1;
int64 meeting_id = 2;
int32 sample_rate = 3;
int32 channels = 4;
string encoding = 5;
}
message AudioChunk {
bytes pcm16 = 1;
int64 seq = 2;
int64 client_time = 3;
bool last_chunk = 4;
}
message RealtimeControl {
enum ControlType {
CONTROL_TYPE_UNSPECIFIED = 0;
START = 1;
STOP_SPEAKING = 2;
END_INPUT = 3;
CLOSE_STREAM = 4;
}
ControlType type = 1;
}
message StreamReady {
string connection_id = 1;
int64 meeting_id = 2;
int64 server_time = 3;
}
message TranscriptEvent {
enum TranscriptType {
TRANSCRIPT_TYPE_UNSPECIFIED = 0;
PARTIAL = 1;
FINAL = 2;
}
TranscriptType type = 1;
int64 meeting_id = 2;
int64 event_seq = 3;
string text = 4;
string speaker_id = 5;
string speaker_name = 6;
int32 start_time = 7;
int32 end_time = 8;
}
message SessionStatusEvent {
int64 meeting_id = 1;
string status = 2;
bool has_transcript = 3;
bool can_resume = 4;
int64 remaining_seconds = 5;
bool active_connection = 6;
}
message StreamClosed {
int64 meeting_id = 1;
string reason = 2;
}

View File

@ -1,110 +0,0 @@
package com.imeeting.grpc.realtime;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.grpc.common.ClientAuth;
import com.imeeting.service.android.AndroidAuthService;
import com.imeeting.service.realtime.RealtimeMeetingGrpcSessionService;
import io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class RealtimeMeetingGrpcServiceTest {
@Test
void streamMeetingAudioShouldReturnErrorEventWhenAuthenticationFails() {
AndroidAuthService authService = mock(AndroidAuthService.class);
RealtimeMeetingGrpcSessionService sessionService = mock(RealtimeMeetingGrpcSessionService.class);
RealtimeMeetingGrpcService service = new RealtimeMeetingGrpcService(authService, sessionService);
CapturingObserver responseObserver = new CapturingObserver();
when(authService.authenticateGrpc(any(ClientAuth.class), isNull()))
.thenThrow(new RuntimeException("缺少 Android deviceId"));
StreamObserver<RealtimeClientPacket> requestObserver = service.streamMeetingAudio(responseObserver);
RealtimeClientPacket openPacket = RealtimeClientPacket.newBuilder()
.setRequestId("rt-open-001")
.setOpen(OpenMeetingStream.newBuilder().setMeetingId(1001L).build())
.build();
assertDoesNotThrow(() -> requestObserver.onNext(openPacket));
assertEquals(1, responseObserver.values.size());
RealtimeServerPacket errorPacket = responseObserver.values.get(0);
assertEquals("rt-open-001", errorPacket.getRequestId());
assertTrue(errorPacket.hasError());
assertEquals("REALTIME_GRPC_ERROR", errorPacket.getError().getCode());
assertEquals("缺少 Android deviceId", errorPacket.getError().getMessage());
assertTrue(responseObserver.completed);
assertNull(responseObserver.error);
verify(sessionService, never()).closeStream(anyString(), anyString(), anyBoolean());
}
@Test
void streamMeetingAudioShouldReturnErrorEventWhenSessionOpenFails() {
AndroidAuthService authService = mock(AndroidAuthService.class);
RealtimeMeetingGrpcSessionService sessionService = mock(RealtimeMeetingGrpcSessionService.class);
RealtimeMeetingGrpcService service = new RealtimeMeetingGrpcService(authService, sessionService);
CapturingObserver responseObserver = new CapturingObserver();
AndroidAuthContext authContext = new AndroidAuthContext();
authContext.setDeviceId("android-test-001");
when(authService.authenticateGrpc(any(ClientAuth.class), isNull())).thenReturn(authContext);
when(sessionService.openStream(1001L, "", authContext, responseObserver))
.thenThrow(new RuntimeException("ASR 模型未配置 WebSocket 地址"));
StreamObserver<RealtimeClientPacket> requestObserver = service.streamMeetingAudio(responseObserver);
RealtimeClientPacket openPacket = RealtimeClientPacket.newBuilder()
.setRequestId("rt-open-002")
.setAuth(ClientAuth.newBuilder().setDeviceId("android-test-001").build())
.setOpen(OpenMeetingStream.newBuilder().setMeetingId(1001L).build())
.build();
assertDoesNotThrow(() -> requestObserver.onNext(openPacket));
assertEquals(1, responseObserver.values.size());
RealtimeServerPacket errorPacket = responseObserver.values.get(0);
assertEquals("rt-open-002", errorPacket.getRequestId());
assertTrue(errorPacket.hasError());
assertEquals("REALTIME_GRPC_ERROR", errorPacket.getError().getCode());
assertEquals("ASR 模型未配置 WebSocket 地址", errorPacket.getError().getMessage());
assertTrue(responseObserver.completed);
assertNull(responseObserver.error);
verify(sessionService, never()).closeStream(anyString(), anyString(), anyBoolean());
}
private static final class CapturingObserver implements StreamObserver<RealtimeServerPacket> {
private final List<RealtimeServerPacket> values = new ArrayList<>();
private Throwable error;
private boolean completed;
@Override
public void onNext(RealtimeServerPacket value) {
values.add(value);
}
@Override
public void onError(Throwable t) {
error = t;
}
@Override
public void onCompleted() {
completed = true;
}
}
}

View File

@ -1,430 +0,0 @@
package com.imeeting.manual;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import com.imeeting.grpc.common.ClientAuth;
import com.imeeting.grpc.realtime.AudioChunk;
import com.imeeting.grpc.realtime.OpenMeetingStream;
import com.imeeting.grpc.realtime.RealtimeClientPacket;
import com.imeeting.grpc.realtime.RealtimeControl;
import com.imeeting.grpc.realtime.RealtimeMeetingServiceGrpc;
import com.imeeting.grpc.realtime.RealtimeServerPacket;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled("Manual realtime integration test; requires a running local backend, gRPC service, and PCM fixture.")
public class AndroidRealtimeGrpcManualTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Test
void shouldRunAndroidRealtimeFlow() throws Exception {
TestConfig config = TestConfig.load();
System.out.println("[manual-test] config=" + config);
CreatedMeetingInfo meetingInfo = createRealtimeMeeting(config);
System.out.println("[manual-test] created meetingId=" + meetingInfo.meetingId());
ManualRealtimeClient client = new ManualRealtimeClient(config, meetingInfo);
try {
client.open();
assertTrue(client.awaitReady(config.readyTimeoutSeconds(), TimeUnit.SECONDS),
"Did not receive StreamReady within timeout");
client.streamPcmFile(config.pcmFile(), config.chunkMs());
client.sendStopSpeaking();
if (config.afterStopWaitSeconds() > 0) {
TimeUnit.SECONDS.sleep(config.afterStopWaitSeconds());
}
client.sendCloseStream();
client.completeClientStream();
client.awaitClosed(Math.max(5, config.afterStopWaitSeconds()), TimeUnit.SECONDS);
assertTrue(client.isReadyReceived(), "StreamReady was not received");
assertTrue(client.getErrorMessage() == null, "Realtime stream returned error: " + client.getErrorMessage());
if (config.requireTranscript()) {
assertTrue(client.getTranscriptCount() > 0,
"No transcript events received. Check ASR config, PCM format, and upstream websocket connectivity.");
}
} finally {
client.shutdown();
}
if (config.beforeCompleteWaitSeconds() > 0) {
TimeUnit.SECONDS.sleep(config.beforeCompleteWaitSeconds());
}
completeRealtimeMeeting(config, meetingInfo.meetingId());
if (config.afterCompleteWaitSeconds() > 0) {
TimeUnit.SECONDS.sleep(config.afterCompleteWaitSeconds());
}
JsonNode transcripts = queryTranscripts(config, meetingInfo.meetingId());
if (config.requireTranscript()) {
assertTrue(transcripts.isArray() && transcripts.size() > 0,
"No persisted transcripts found after complete. meetingId=" + meetingInfo.meetingId());
}
}
private CreatedMeetingInfo createRealtimeMeeting(TestConfig config) throws Exception {
HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(10))
.build();
Map<String, Object> body = new LinkedHashMap<>();
body.put("title", config.meetingTitle());
if (config.tags() != null && !config.tags().isBlank()) {
body.put("tags", config.tags());
}
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(config.baseUrl() + "/api/android/meeting/realtime/create"))
.timeout(Duration.ofSeconds(20))
.header("Content-Type", "application/json")
.header("X-Android-Device-Id", config.deviceId())
.header("X-Android-Platform", "android")
.POST(HttpRequest.BodyPublishers.ofString(OBJECT_MAPPER.writeValueAsString(body), StandardCharsets.UTF_8))
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
assertTrue(response.statusCode() >= 200 && response.statusCode() < 300,
"Create realtime meeting failed, httpStatus=" + response.statusCode() + ", body=" + response.body());
JsonNode data = readData(response.body());
long meetingId = data.path("meetingId").asLong(0L);
assertTrue(meetingId > 0, "Invalid meetingId in response: " + response.body());
int sampleRate = data.path("sampleRate").asInt(16000);
int channels = data.path("channels").asInt(1);
String encoding = text(data, "encoding");
assertNotNull(encoding, "encoding is null in response: " + response.body());
return new CreatedMeetingInfo(meetingId, sampleRate, channels, encoding);
}
private void completeRealtimeMeeting(TestConfig config, long meetingId) throws Exception {
HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(10))
.build();
Map<String, Object> body = new LinkedHashMap<>();
body.put("overwriteAudio", config.overwriteAudio());
if (config.completeAudioUrl() != null && !config.completeAudioUrl().isBlank()) {
body.put("audioUrl", config.completeAudioUrl());
}
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(config.baseUrl() + "/api/android/meeting/" + meetingId + "/realtime/complete"))
.timeout(Duration.ofSeconds(20))
.header("Content-Type", "application/json")
.header("X-Android-Device-Id", config.deviceId())
.header("X-Android-Platform", "android")
.POST(HttpRequest.BodyPublishers.ofString(OBJECT_MAPPER.writeValueAsString(body), StandardCharsets.UTF_8))
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
assertTrue(response.statusCode() >= 200 && response.statusCode() < 300,
"Complete realtime meeting failed, httpStatus=" + response.statusCode() + ", body=" + response.body());
JsonNode data = readData(response.body());
assertTrue(data.asBoolean(false), "Complete realtime meeting did not return true: " + response.body());
}
private JsonNode queryTranscripts(TestConfig config, long meetingId) throws Exception {
HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(10))
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(config.baseUrl() + "/api/android/meeting/" + meetingId + "/transcripts"))
.timeout(Duration.ofSeconds(20))
.header("X-Android-Device-Id", config.deviceId())
.header("X-Android-Platform", "android")
.GET()
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
assertTrue(response.statusCode() >= 200 && response.statusCode() < 300,
"Query transcripts failed, httpStatus=" + response.statusCode() + ", body=" + response.body());
return readData(response.body());
}
private JsonNode readData(String responseBody) throws Exception {
JsonNode root = OBJECT_MAPPER.readTree(responseBody);
JsonNode data = root.path("data");
assertTrue(!data.isMissingNode() && !data.isNull(), "Invalid REST response, missing data: " + responseBody);
return data;
}
private String text(JsonNode node, String fieldName) {
JsonNode value = node.path(fieldName);
return value.isMissingNode() || value.isNull() ? null : value.asText();
}
private record CreatedMeetingInfo(long meetingId, int sampleRate, int channels, String encoding) {
}
private record TestConfig(String baseUrl,
String grpcHost,
int grpcPort,
String deviceId,
String meetingTitle,
String tags,
Path pcmFile,
int chunkMs,
long readyTimeoutSeconds,
long afterStopWaitSeconds,
long beforeCompleteWaitSeconds,
long afterCompleteWaitSeconds,
boolean overwriteAudio,
String completeAudioUrl,
boolean requireTranscript) {
private static final String DEFAULT_BASE_URL = "http://127.0.0.1:8081";
private static final String DEFAULT_GRPC_HOST = "127.0.0.1";
private static final int DEFAULT_GRPC_PORT = 19090;
private static final String DEFAULT_DEVICE_ID = "android-local-test-001";
private static final String DEFAULT_MEETING_TITLE = "android-realtime-manual-test";
private static final String DEFAULT_TAGS = "android,grpc,manual";
private static final String DEFAULT_PCM_FILE = "C:\\Users\\85206\\Downloads\\no_recoder_audio.pcm";
private static final int DEFAULT_CHUNK_MS = 40;
private static final long DEFAULT_READY_TIMEOUT_SECONDS = 15L;
private static final long DEFAULT_AFTER_STOP_WAIT_SECONDS = 8L;
private static final long DEFAULT_BEFORE_COMPLETE_WAIT_SECONDS = 3L;
private static final long DEFAULT_AFTER_COMPLETE_WAIT_SECONDS = 1L;
private static final boolean DEFAULT_OVERWRITE_AUDIO = false;
private static final String DEFAULT_COMPLETE_AUDIO_URL = "";
private static final boolean DEFAULT_REQUIRE_TRANSCRIPT = true;
private static TestConfig load() {
String baseUrl = stringProperty("manualRealtime.baseUrl", DEFAULT_BASE_URL);
String grpcHost = stringProperty("manualRealtime.grpcHost", DEFAULT_GRPC_HOST);
int grpcPort = intProperty("manualRealtime.grpcPort", DEFAULT_GRPC_PORT);
String deviceId = stringProperty("manualRealtime.deviceId", DEFAULT_DEVICE_ID);
String meetingTitle = stringProperty("manualRealtime.meetingTitle", DEFAULT_MEETING_TITLE);
String tags = stringProperty("manualRealtime.tags", DEFAULT_TAGS);
Path pcmFile = Path.of(stringProperty("manualRealtime.pcmFile", DEFAULT_PCM_FILE));
int chunkMs = intProperty("manualRealtime.chunkMs", DEFAULT_CHUNK_MS);
long readyTimeoutSeconds = longProperty("manualRealtime.readyTimeoutSeconds", DEFAULT_READY_TIMEOUT_SECONDS);
long afterStopWaitSeconds = longProperty("manualRealtime.afterStopWaitSeconds", DEFAULT_AFTER_STOP_WAIT_SECONDS);
long beforeCompleteWaitSeconds = longProperty("manualRealtime.beforeCompleteWaitSeconds", DEFAULT_BEFORE_COMPLETE_WAIT_SECONDS);
long afterCompleteWaitSeconds = longProperty("manualRealtime.afterCompleteWaitSeconds", DEFAULT_AFTER_COMPLETE_WAIT_SECONDS);
boolean overwriteAudio = booleanProperty("manualRealtime.overwriteAudio", DEFAULT_OVERWRITE_AUDIO);
String completeAudioUrl = stringProperty("manualRealtime.completeAudioUrl", DEFAULT_COMPLETE_AUDIO_URL);
boolean requireTranscript = booleanProperty("manualRealtime.requireTranscript", DEFAULT_REQUIRE_TRANSCRIPT);
assertTrue(Files.exists(pcmFile),
"PCM file does not exist: " + pcmFile + ". Please set -DmanualRealtime.pcmFile=<your pcm file path>.");
assertTrue(Files.isRegularFile(pcmFile), "PCM file is not a regular file: " + pcmFile);
return new TestConfig(baseUrl, grpcHost, grpcPort, deviceId, meetingTitle, tags, pcmFile,
chunkMs, readyTimeoutSeconds, afterStopWaitSeconds,
beforeCompleteWaitSeconds, afterCompleteWaitSeconds, overwriteAudio, completeAudioUrl, requireTranscript);
}
private static String stringProperty(String key, String defaultValue) {
String value = System.getProperty(key);
return value == null || value.isBlank() ? defaultValue : value.trim();
}
private static int intProperty(String key, int defaultValue) {
return Integer.parseInt(stringProperty(key, Integer.toString(defaultValue)));
}
private static long longProperty(String key, long defaultValue) {
return Long.parseLong(stringProperty(key, Long.toString(defaultValue)));
}
private static boolean booleanProperty(String key, boolean defaultValue) {
return Boolean.parseBoolean(stringProperty(key, Boolean.toString(defaultValue)));
}
}
private static final class ManualRealtimeClient {
private final TestConfig config;
private final CreatedMeetingInfo meetingInfo;
private final CountDownLatch readyLatch = new CountDownLatch(1);
private final CountDownLatch closedLatch = new CountDownLatch(1);
private final AtomicInteger transcriptCount = new AtomicInteger();
private final AtomicReference<String> errorMessage = new AtomicReference<>();
private final ManagedChannel channel;
private final StreamObserver<RealtimeClientPacket> requestObserver;
private volatile boolean readyReceived;
private ManualRealtimeClient(TestConfig config, CreatedMeetingInfo meetingInfo) {
this.config = config;
this.meetingInfo = meetingInfo;
this.channel = ManagedChannelBuilder.forAddress(config.grpcHost(), config.grpcPort())
.usePlaintext()
.build();
this.requestObserver = RealtimeMeetingServiceGrpc.newStub(channel)
.streamMeetingAudio(new StreamObserver<>() {
@Override
public void onNext(RealtimeServerPacket packet) {
switch (packet.getBodyCase()) {
case READY -> {
readyReceived = true;
readyLatch.countDown();
System.out.println("[manual-test] READY connectionId=" + packet.getReady().getConnectionId());
}
case STATUS -> System.out.println("[manual-test] STATUS status=" + packet.getStatus().getStatus()
+ ", active=" + packet.getStatus().getActiveConnection());
case TRANSCRIPT -> {
transcriptCount.incrementAndGet();
System.out.println("[manual-test] TRANSCRIPT type=" + packet.getTranscript().getType()
+ ", text=" + packet.getTranscript().getText());
}
case ERROR -> {
String message = packet.getError().getCode() + ": " + packet.getError().getMessage();
errorMessage.compareAndSet(null, message);
System.out.println("[manual-test] ERROR " + message);
}
case CLOSED -> {
System.out.println("[manual-test] CLOSED reason=" + packet.getClosed().getReason());
closedLatch.countDown();
}
case BODY_NOT_SET -> System.out.println("[manual-test] EMPTY packet received");
}
}
@Override
public void onError(Throwable throwable) {
errorMessage.compareAndSet(null, throwable.getMessage());
readyLatch.countDown();
closedLatch.countDown();
System.out.println("[manual-test] gRPC onError: " + throwable.getMessage());
}
@Override
public void onCompleted() {
closedLatch.countDown();
System.out.println("[manual-test] gRPC onCompleted");
}
});
}
private void open() {
ClientAuth auth = ClientAuth.newBuilder()
.setAuthType(ClientAuth.AuthType.NONE)
.setDeviceId(config.deviceId())
.setPlatform("android")
.setAppVersion("manual-test")
.build();
OpenMeetingStream open = OpenMeetingStream.newBuilder()
.setStreamToken("")
.setMeetingId(meetingInfo.meetingId())
.setSampleRate(meetingInfo.sampleRate())
.setChannels(meetingInfo.channels())
.setEncoding(meetingInfo.encoding())
.build();
requestObserver.onNext(RealtimeClientPacket.newBuilder()
.setRequestId("manual-open")
.setAuth(auth)
.setOpen(open)
.build());
}
private boolean awaitReady(long timeout, TimeUnit timeUnit) throws InterruptedException {
return readyLatch.await(timeout, timeUnit);
}
private void streamPcmFile(Path pcmFile, int chunkMs) throws Exception {
int bytesPerSample = 2;
int chunkSize = meetingInfo.sampleRate() * meetingInfo.channels() * bytesPerSample * chunkMs / 1000;
byte[] buffer = new byte[chunkSize];
long seq = 1L;
try (InputStream inputStream = Files.newInputStream(pcmFile)) {
while (true) {
int read = inputStream.read(buffer);
if (read <= 0) {
break;
}
AudioChunk audioChunk = AudioChunk.newBuilder()
.setPcm16(ByteString.copyFrom(buffer, 0, read))
.setSeq(seq)
.setClientTime(System.currentTimeMillis())
.setLastChunk(false)
.build();
requestObserver.onNext(RealtimeClientPacket.newBuilder()
.setRequestId("manual-audio-" + seq)
.setAudio(audioChunk)
.build());
seq++;
Thread.sleep(chunkMs);
}
}
}
private void sendStopSpeaking() {
requestObserver.onNext(RealtimeClientPacket.newBuilder()
.setRequestId("manual-stop-speaking")
.setControl(RealtimeControl.newBuilder()
.setType(RealtimeControl.ControlType.STOP_SPEAKING)
.build())
.build());
}
private void sendCloseStream() {
requestObserver.onNext(RealtimeClientPacket.newBuilder()
.setRequestId("manual-close-stream")
.setControl(RealtimeControl.newBuilder()
.setType(RealtimeControl.ControlType.CLOSE_STREAM)
.build())
.build());
}
private void completeClientStream() {
requestObserver.onCompleted();
}
private boolean awaitClosed(long timeout, TimeUnit timeUnit) throws InterruptedException {
return closedLatch.await(timeout, timeUnit);
}
private boolean isReadyReceived() {
return readyReceived;
}
private int getTranscriptCount() {
return transcriptCount.get();
}
private String getErrorMessage() {
return errorMessage.get();
}
private void shutdown() throws InterruptedException {
channel.shutdownNow();
channel.awaitTermination(5, TimeUnit.SECONDS);
}
}
}

View File

@ -1,123 +0,0 @@
package com.imeeting.service.realtime.impl;
import com.imeeting.config.grpc.GrpcServerProperties;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData;
import com.imeeting.grpc.realtime.RealtimeServerPacket;
import com.imeeting.grpc.realtime.TranscriptEvent;
import com.imeeting.service.biz.MeetingCommandService;
import com.imeeting.service.biz.RealtimeMeetingSessionStateService;
import com.imeeting.service.realtime.AndroidRealtimeSessionTicketService;
import com.imeeting.service.realtime.AsrUpstreamBridgeService;
import com.imeeting.service.realtime.RealtimeMeetingAudioStorageService;
import io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import java.util.ArrayList;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class RealtimeMeetingGrpcSessionServiceImplTest {
@Test
void openStreamShouldOnlyForwardAndPersistFinalTranscript() {
AndroidRealtimeSessionTicketService ticketService = mock(AndroidRealtimeSessionTicketService.class);
AsrUpstreamBridgeService asrUpstreamBridgeService = mock(AsrUpstreamBridgeService.class);
MeetingCommandService meetingCommandService = mock(MeetingCommandService.class);
RealtimeMeetingSessionStateService sessionStateService = mock(RealtimeMeetingSessionStateService.class);
RealtimeMeetingAudioStorageService audioStorageService = mock(RealtimeMeetingAudioStorageService.class);
StringRedisTemplate redisTemplate = mock(StringRedisTemplate.class);
@SuppressWarnings("unchecked")
ValueOperations<String, String> valueOperations = mock(ValueOperations.class);
when(redisTemplate.opsForValue()).thenReturn(valueOperations);
GrpcServerProperties grpcServerProperties = new GrpcServerProperties();
AndroidRealtimeGrpcSessionData sessionData = new AndroidRealtimeGrpcSessionData();
sessionData.setMeetingId(1001L);
sessionData.setDeviceId("android-test-001");
sessionData.setTargetWsUrl("ws://localhost/mock-asr");
when(ticketService.prepareSessionData(eq(1001L), any(AndroidAuthContext.class))).thenReturn(sessionData);
AsrUpstreamBridgeService.AsrUpstreamSession upstreamSession = mock(AsrUpstreamBridgeService.AsrUpstreamSession.class);
ArgumentCaptor<AsrUpstreamBridgeService.AsrUpstreamEventListener> listenerCaptor =
ArgumentCaptor.forClass(AsrUpstreamBridgeService.AsrUpstreamEventListener.class);
when(asrUpstreamBridgeService.openSession(eq(sessionData), anyString(), listenerCaptor.capture()))
.thenReturn(upstreamSession);
RealtimeMeetingGrpcSessionServiceImpl service = new RealtimeMeetingGrpcSessionServiceImpl(
ticketService,
asrUpstreamBridgeService,
meetingCommandService,
sessionStateService,
audioStorageService,
redisTemplate,
grpcServerProperties
);
CapturingObserver responseObserver = new CapturingObserver();
AndroidAuthContext authContext = new AndroidAuthContext();
authContext.setDeviceId("android-test-001");
service.openStream(1001L, "", authContext, responseObserver);
AsrUpstreamBridgeService.AsrUpstreamEventListener listener = listenerCaptor.getValue();
listener.onTranscript(new AsrUpstreamBridgeService.AsrTranscriptResult(
false,
"partial transcript",
"spk-1",
"Speaker 1",
100,
500
));
assertTrue(responseObserver.values.isEmpty());
verify(meetingCommandService, never()).saveRealtimeTranscriptSnapshot(anyLong(), any(), eq(false));
listener.onTranscript(new AsrUpstreamBridgeService.AsrTranscriptResult(
true,
"final transcript",
"spk-1",
"Speaker 1",
100,
800
));
assertEquals(1, responseObserver.values.size());
RealtimeServerPacket packet = responseObserver.values.get(0);
assertTrue(packet.hasTranscript());
assertEquals(TranscriptEvent.TranscriptType.FINAL, packet.getTranscript().getType());
assertEquals("final transcript", packet.getTranscript().getText());
verify(meetingCommandService, times(1)).saveRealtimeTranscriptSnapshot(eq(1001L), any(), eq(true));
}
private static final class CapturingObserver implements StreamObserver<RealtimeServerPacket> {
private final List<RealtimeServerPacket> values = new ArrayList<>();
@Override
public void onNext(RealtimeServerPacket value) {
values.add(value);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
}
}

View File

@ -154,6 +154,21 @@ export async function deleteDevice(id: number) {
return resp.data.data as boolean;
}
export async function listManagedDevices() {
const resp = await http.get("/api/admin/devices");
return resp.data.data as DeviceInfo[];
}
export async function updateManagedDevice(id: number, payload: Partial<DeviceInfo>) {
const resp = await http.put(`/api/admin/devices/${id}`, payload);
return resp.data.data as DeviceInfo;
}
export async function kickManagedDevice(id: number) {
const resp = await http.post(`/api/admin/devices/${id}/kick`);
return resp.data.data as boolean;
}
export async function listUserRoles(userId: number) {
const resp = await http.get(`/sys/api/users/${userId}/roles`);
return resp.data.data as number[];

View File

@ -325,12 +325,28 @@
"searchLabel": "Search devices",
"newDevice": "New Device",
"device": "Device",
"totalDevices": "Total Devices",
"totalDevicesHint": "Registered terminals in the device registry",
"onlineDevices": "Online Devices",
"onlineDevicesHint": "Currently connected through gRPC",
"enabledDevices": "Enabled Devices",
"enabledDevicesHint": "Allowed to access Android APIs",
"unnamedDevice": "Unnamed device",
"unboundAccount": "Unbound",
"ownerId": "ID",
"terminalType": "Terminal Type",
"terminalVersion": "Terminal Version",
"onlineStatus": "Online Status",
"online": "Online",
"offline": "Offline",
"lastOnlineAt": "Last Online",
"enabled": "Enabled",
"disabled": "Disabled",
"updatedAt": "Updated At",
"editDevice": "Edit device",
"kickDevice": "Kick device",
"kickDeviceConfirm": "Kick this device offline?",
"kickSucceeded": "Device has been kicked offline",
"deleteDevice": "Delete this device?",
"drawerTitleCreate": "New Device",
"drawerTitleEdit": "Edit Device",

View File

@ -325,12 +325,28 @@
"searchLabel": "搜索设备",
"newDevice": "新建设备",
"device": "设备",
"totalDevices": "设备总数",
"totalDevicesHint": "已登记到设备管理中的终端数量",
"onlineDevices": "在线数量",
"onlineDevicesHint": "当前通过 gRPC 保持连接的设备",
"enabledDevices": "启用设备",
"enabledDevicesHint": "允许访问安卓接口的设备数量",
"unnamedDevice": "未命名设备",
"unboundAccount": "未绑定",
"ownerId": "ID",
"terminalType": "终端类型",
"terminalVersion": "终端版本",
"onlineStatus": "在线状态",
"online": "在线",
"offline": "离线",
"lastOnlineAt": "最后在线时间",
"enabled": "启用",
"disabled": "禁用",
"updatedAt": "更新时间",
"editDevice": "编辑设备",
"kickDevice": "踢下线",
"kickDeviceConfirm": "确认踢该设备下线?",
"kickSucceeded": "设备已踢下线",
"deleteDevice": "确认删除该设备?",
"drawerTitleCreate": "新建设备",
"drawerTitleEdit": "编辑设备",

View File

@ -2,6 +2,109 @@
padding: 24px;
}
.devices-metrics {
margin-bottom: 16px;
}
.devices-metric-card {
position: relative;
overflow: hidden;
border-radius: 18px;
box-shadow: 0 14px 36px rgba(16, 24, 40, 0.08);
min-height: 144px;
.ant-card-body {
position: relative;
z-index: 1;
display: flex;
align-items: center;
gap: 16px;
padding: 22px 24px;
}
}
.devices-metric-card::before {
content: "";
position: absolute;
inset: 0;
opacity: 1;
}
.devices-metric-card::after {
content: "";
position: absolute;
right: -28px;
top: -28px;
width: 120px;
height: 120px;
border-radius: 999px;
background: rgba(255, 255, 255, 0.3);
}
.devices-metric-card--total::before {
background: linear-gradient(135deg, #f7f9ff 0%, #eef4ff 100%);
}
.devices-metric-card--online::before {
background: linear-gradient(135deg, #f0fff8 0%, #dcfce7 100%);
}
.devices-metric-card--enabled::before {
background: linear-gradient(135deg, #fff8ed 0%, #ffedd5 100%);
}
.devices-metric-card__icon {
width: 54px;
height: 54px;
border-radius: 16px;
display: flex;
align-items: center;
justify-content: center;
font-size: 24px;
flex: 0 0 auto;
}
.devices-metric-card--total .devices-metric-card__icon {
background: rgba(49, 110, 255, 0.12);
color: #275df5;
}
.devices-metric-card--online .devices-metric-card__icon {
background: rgba(22, 163, 74, 0.12);
color: #15803d;
}
.devices-metric-card--enabled .devices-metric-card__icon {
background: rgba(234, 88, 12, 0.12);
color: #c2410c;
}
.devices-metric-card__content {
min-width: 0;
}
.devices-metric-card__label {
font-size: 13px;
font-weight: 600;
letter-spacing: 0.04em;
text-transform: uppercase;
color: rgba(15, 23, 42, 0.62);
margin-bottom: 6px;
}
.devices-metric-card__value {
font-size: 34px;
line-height: 1;
font-weight: 700;
color: #0f172a;
margin-bottom: 10px;
}
.devices-metric-card__hint {
font-size: 13px;
color: rgba(15, 23, 42, 0.68);
}
.devices-header {
display: flex;
justify-content: space-between;
@ -14,7 +117,8 @@
}
.devices-table-card {
border-radius: 8px;
border-radius: 18px;
box-shadow: 0 12px 28px rgba(15, 23, 42, 0.06);
}
.devices-table-toolbar {
@ -28,12 +132,12 @@
.device-icon-placeholder {
width: 40px;
height: 40px;
background-color: #f0f5ff;
border-radius: 8px;
background: linear-gradient(135deg, #f1f5ff 0%, #e0e7ff 100%);
border-radius: 12px;
display: flex;
align-items: center;
justify-content: center;
color: #1890ff;
color: #315efb;
font-size: 20px;
}
@ -57,3 +161,21 @@
.tabular-nums {
font-variant-numeric: tabular-nums;
}
@media (max-width: 768px) {
.devices-page {
padding: 16px;
}
.devices-metric-card {
min-height: auto;
}
.devices-metric-card .ant-card-body {
padding: 18px;
}
.devices-metric-card__value {
font-size: 30px;
}
}

View File

@ -1,20 +1,18 @@
import { Button, Card, Drawer, Form, Input, Popconfirm, Select, Space, Table, Tag, Typography, message } from "antd";
import { DeleteOutlined, DesktopOutlined, EditOutlined, PlusOutlined, SearchOutlined, UserOutlined } from "@ant-design/icons";
import { Button, Card, Col, Drawer, Form, Input, Popconfirm, Row, Select, Space, Table, Tag, Typography, message } from "antd";
import { CheckCircleOutlined, DesktopOutlined, DisconnectOutlined, EditOutlined, SearchOutlined, ThunderboltOutlined, UserOutlined } from "@ant-design/icons";
import { useEffect, useMemo, useState } from "react";
import { useTranslation } from "react-i18next";
import { createDevice, deleteDevice, listDevices, listUsers, updateDevice } from "@/api";
import { kickManagedDevice, listManagedDevices, updateManagedDevice } from "@/api";
import PageHeader from "@/components/shared/PageHeader";
import { useDict } from "@/hooks/useDict";
import { usePermission } from "@/hooks/usePermission";
import type { DeviceInfo, SysUser } from "@/types";
import type { DeviceInfo } from "@/types";
import { getStandardPagination } from "@/utils/pagination";
import "./index.less";
const { Text } = Typography;
type DeviceFormValues = {
userId: number;
deviceCode: string;
deviceName?: string;
status: number;
};
@ -26,14 +24,7 @@ export default function Devices() {
const [loading, setLoading] = useState(false);
const [saving, setSaving] = useState(false);
const [searchText, setSearchText] = useState("");
const handleSearch = () => {};
const handleResetSearch = () => {
setSearchText("");
};
const [devices, setDevices] = useState<DeviceInfo[]>([]);
const [users, setUsers] = useState<SysUser[]>([]);
const [open, setOpen] = useState(false);
const [editing, setEditing] = useState<DeviceInfo | null>(null);
const [form] = Form.useForm<DeviceFormValues>();
@ -41,9 +32,8 @@ export default function Devices() {
const loadData = async () => {
setLoading(true);
try {
const [deviceList, userList] = await Promise.all([listDevices(), listUsers()]);
const deviceList = await listManagedDevices();
setDevices(deviceList || []);
setUsers(userList || []);
} finally {
setLoading(false);
}
@ -53,42 +43,35 @@ export default function Devices() {
loadData();
}, []);
const userMap = useMemo(() => {
const map: Record<number, SysUser> = {};
users.forEach((user) => {
map[user.userId] = user;
});
return map;
}, [users]);
const filteredData = useMemo(() => {
if (!searchText) {
return devices;
}
const lower = searchText.toLowerCase();
return devices.filter((device) => {
const owner = userMap[device.userId];
const ownerName = device.displayName || "";
const ownerUsername = device.username || "";
return (
device.deviceCode.toLowerCase().includes(lower) ||
(device.deviceName || "").toLowerCase().includes(lower) ||
(owner?.displayName || "").toLowerCase().includes(lower) ||
String(device.userId).includes(lower)
(device.terminalType || "").toLowerCase().includes(lower) ||
(device.terminalVersion || "").toLowerCase().includes(lower) ||
ownerName.toLowerCase().includes(lower) ||
ownerUsername.toLowerCase().includes(lower)
);
});
}, [devices, searchText, userMap]);
}, [devices, searchText]);
const openCreate = () => {
setEditing(null);
form.resetFields();
form.setFieldsValue({ status: 1 });
setOpen(true);
};
const stats = useMemo(() => {
const total = devices.length;
const online = devices.filter((device) => device.online).length;
const enabled = devices.filter((device) => device.status === 1).length;
return { total, online, enabled };
}, [devices]);
const openEdit = (record: DeviceInfo) => {
setEditing(record);
form.setFieldsValue({
userId: record.userId,
deviceCode: record.deviceCode,
deviceName: record.deviceName,
status: record.status ?? 1
});
@ -96,22 +79,16 @@ export default function Devices() {
};
const submit = async () => {
if (!editing) {
return;
}
const values = await form.validateFields();
setSaving(true);
try {
const payload: Partial<DeviceInfo> = {
userId: values.userId,
deviceCode: values.deviceCode,
await updateManagedDevice(editing.deviceId, {
deviceName: values.deviceName,
status: values.status
};
if (editing) {
await updateDevice(editing.deviceId, payload);
} else {
await createDevice(payload);
}
});
message.success(t("devicesExt.operationSucceeded"));
setOpen(false);
await loadData();
@ -120,9 +97,9 @@ export default function Devices() {
}
};
const remove = async (id: number) => {
await deleteDevice(id);
message.success(t("devicesExt.operationSucceeded"));
const kick = async (record: DeviceInfo) => {
await kickManagedDevice(record.deviceId);
message.success(t("devicesExt.kickSucceeded"));
await loadData();
};
@ -130,26 +107,57 @@ export default function Devices() {
<div className="app-page devices-page">
<PageHeader title={t("devices.title")} subtitle={t("devices.subtitle")} />
<Row gutter={[16, 16]} className="devices-metrics">
<Col xs={24} md={8}>
<Card className="devices-metric-card devices-metric-card--total" bordered={false}>
<div className="devices-metric-card__icon">
<DesktopOutlined aria-hidden="true" />
</div>
<div className="devices-metric-card__content">
<div className="devices-metric-card__label">{t("devicesExt.totalDevices")}</div>
<div className="devices-metric-card__value tabular-nums">{stats.total}</div>
<div className="devices-metric-card__hint">{t("devicesExt.totalDevicesHint")}</div>
</div>
</Card>
</Col>
<Col xs={24} md={8}>
<Card className="devices-metric-card devices-metric-card--online" bordered={false}>
<div className="devices-metric-card__icon">
<ThunderboltOutlined aria-hidden="true" />
</div>
<div className="devices-metric-card__content">
<div className="devices-metric-card__label">{t("devicesExt.onlineDevices")}</div>
<div className="devices-metric-card__value tabular-nums">{stats.online}</div>
<div className="devices-metric-card__hint">{t("devicesExt.onlineDevicesHint")}</div>
</div>
</Card>
</Col>
<Col xs={24} md={8}>
<Card className="devices-metric-card devices-metric-card--enabled" bordered={false}>
<div className="devices-metric-card__icon">
<CheckCircleOutlined aria-hidden="true" />
</div>
<div className="devices-metric-card__content">
<div className="devices-metric-card__label">{t("devicesExt.enabledDevices")}</div>
<div className="devices-metric-card__value tabular-nums">{stats.enabled}</div>
<div className="devices-metric-card__hint">{t("devicesExt.enabledDevicesHint")}</div>
</div>
</Card>
</Col>
</Row>
<Card className="devices-table-card app-page__filter-card" styles={{ body: { padding: "16px" } }}>
<div className="app-page__toolbar" style={{ justifyContent: "space-between", width: "100%" }}>
<div className="app-page__toolbar">
<Input
placeholder={t("devicesExt.searchPlaceholder")}
prefix={<SearchOutlined aria-hidden="true" />}
style={{ width: 360 }}
style={{ width: 420 }}
value={searchText}
onChange={(event) => setSearchText(event.target.value)}
allowClear
aria-label={t("devicesExt.searchLabel")}
/>
<Button type="primary" icon={<SearchOutlined aria-hidden="true" />} onClick={handleSearch}>{t("common.search")}</Button>
<Button onClick={handleResetSearch}>{t("common.reset")}</Button>
</div>
{can("device:create") ? (
<Button type="primary" icon={<PlusOutlined aria-hidden="true" />} onClick={openCreate}>
{t("common.create")}
</Button>
) : null}
<Button onClick={loadData}>{t("common.refresh")}</Button>
</div>
</Card>
@ -159,12 +167,13 @@ export default function Devices() {
dataSource={filteredData}
loading={loading}
size="middle"
scroll={{ y: "calc(100vh - 350px)" }}
scroll={{ y: "calc(100vh - 350px)", x: 1200 }}
pagination={getStandardPagination(filteredData.length, 1, 1000)}
columns={[
{
title: t("devicesExt.device"),
key: "device",
width: 280,
render: (_value: unknown, record) => (
<Space>
<div className="device-icon-placeholder">
@ -180,21 +189,52 @@ export default function Devices() {
{
title: t("devices.owner"),
key: "user",
width: 220,
render: (_value: unknown, record) => {
const owner = userMap[record.userId];
return owner ? (
if (!record.userId) {
return <Text type="secondary">{t("devicesExt.unboundAccount")}</Text>;
}
return (
<Space>
<UserOutlined aria-hidden="true" style={{ color: "#8c8c8c" }} />
<span>{owner.displayName}</span>
<span>{record.displayName || record.username || `#${record.userId}`}</span>
<Text type="secondary" style={{ fontSize: "12px" }} className="tabular-nums">
({t("devicesExt.ownerId")}: {record.userId})
</Text>
</Space>
) : (
<span className="tabular-nums">{t("devicesExt.ownerId")}: {record.userId}</span>
);
}
},
{
title: t("devicesExt.terminalType"),
dataIndex: "terminalType",
width: 140,
render: (text: string) => <Tag>{text || "-"}</Tag>
},
{
title: t("devicesExt.terminalVersion"),
dataIndex: "terminalVersion",
width: 160,
render: (text: string) => <Text className="tabular-nums">{text || "-"}</Text>
},
{
title: t("devicesExt.onlineStatus"),
dataIndex: "online",
width: 120,
render: (online: boolean) => (
<Tag color={online ? "green" : "default"}>{online ? t("devicesExt.online") : t("devicesExt.offline")}</Tag>
)
},
{
title: t("devicesExt.lastOnlineAt"),
dataIndex: "lastOnlineAt",
width: 180,
render: (text: string) => (
<Text type="secondary" className="tabular-nums">
{text ? text.replace("T", " ").substring(0, 19) : "-"}
</Text>
)
},
{
title: t("common.status"),
dataIndex: "status",
@ -217,16 +257,21 @@ export default function Devices() {
{
title: t("common.action"),
key: "action",
width: 120,
width: 140,
fixed: "right",
render: (_value: unknown, record) => (
<Space>
{can("device:update") ? (
<Button type="text" icon={<EditOutlined aria-hidden="true" />} onClick={() => openEdit(record)} aria-label={t("devicesExt.editDevice")} />
) : null}
{can("device:delete") ? (
<Popconfirm title={t("devicesExt.deleteDevice")} onConfirm={() => remove(record.deviceId)}>
<Button type="text" danger icon={<DeleteOutlined aria-hidden="true" />} aria-label={t("common.delete")} />
{can("device:update") && record.online ? (
<Popconfirm title={t("devicesExt.kickDeviceConfirm")} onConfirm={() => kick(record)}>
<Button
type="text"
danger
icon={<DisconnectOutlined aria-hidden="true" />}
aria-label={t("devicesExt.kickDevice")}
/>
</Popconfirm>
) : null}
</Space>
@ -240,7 +285,7 @@ export default function Devices() {
title={
<div className="device-drawer-title">
<DesktopOutlined className="mr-2" aria-hidden="true" />
{editing ? t("devices.drawerTitleEdit") : t("devicesExt.drawerTitleCreate")}
{t("devices.drawerTitleEdit")}
</div>
}
open={open}
@ -257,17 +302,6 @@ export default function Devices() {
}
>
<Form form={form} layout="vertical">
<Form.Item label={t("devices.owner")} name="userId" rules={[{ required: true, message: t("devicesExt.selectOwner") }]}>
<Select
showSearch
placeholder={t("devicesExt.searchSelectUser")}
optionFilterProp="label"
options={users.map((user) => ({ label: `${user.displayName} (@${user.username})`, value: user.userId }))}
/>
</Form.Item>
<Form.Item label={t("devices.deviceCode")} name="deviceCode" rules={[{ required: true, message: t("devicesExt.deviceCodeRequired") }]}>
<Input placeholder={t("devicesExt.deviceCodePlaceholder")} />
</Form.Item>
<Form.Item label={t("devices.deviceName")} name="deviceName">
<Input placeholder={t("devicesExt.deviceNamePlaceholder")} />
</Form.Item>

View File

@ -84,9 +84,15 @@ export interface PermissionNode extends SysPermission {
export interface DeviceInfo extends BaseEntity {
deviceId: number;
userId: number;
userId?: number;
username?: string;
displayName?: string;
deviceCode: string;
deviceName?: string;
terminalType?: string;
terminalVersion?: string;
online?: boolean;
lastOnlineAt?: string;
}
export interface SysDictType extends BaseEntity {