diff --git a/backend/src/main/java/com/imeeting/common/RedisKeys.java b/backend/src/main/java/com/imeeting/common/RedisKeys.java index 27921f2..aad9f48 100644 --- a/backend/src/main/java/com/imeeting/common/RedisKeys.java +++ b/backend/src/main/java/com/imeeting/common/RedisKeys.java @@ -91,22 +91,6 @@ public final class RedisKeys { return "biz:android:device:topics:" + deviceId; } - public static String androidDeviceOutboxKey(String deviceId) { - return "biz:android:device:outbox:" + deviceId; - } - - public static String androidDeviceMessageSeqKey(String deviceId) { - return "biz:android:device:message-seq:" + deviceId; - } - - public static String realtimeMeetingGrpcSessionKey(String streamToken) { - return "biz:meeting:realtime:grpc-session:" + streamToken; - } - - public static String realtimeMeetingGrpcConnectionKey(String connectionId) { - return "biz:meeting:realtime:grpc-conn:" + connectionId; - } - public static String realtimeMeetingEventSeqKey(Long meetingId) { return "biz:meeting:realtime:event-seq:" + meetingId; } diff --git a/backend/src/main/java/com/imeeting/controller/android/AndroidMeetingRealtimeController.java b/backend/src/main/java/com/imeeting/controller/android/AndroidMeetingRealtimeController.java index 46f58f8..5f442f3 100644 --- a/backend/src/main/java/com/imeeting/controller/android/AndroidMeetingRealtimeController.java +++ b/backend/src/main/java/com/imeeting/controller/android/AndroidMeetingRealtimeController.java @@ -5,13 +5,11 @@ import com.imeeting.config.grpc.GrpcServerProperties; import com.imeeting.dto.android.AndroidAuthContext; import com.imeeting.dto.android.AndroidCreateRealtimeMeetingCommand; import com.imeeting.dto.android.AndroidCreateRealtimeMeetingVO; -import com.imeeting.dto.android.AndroidOpenRealtimeGrpcSessionCommand; -import com.imeeting.dto.android.AndroidRealtimeGrpcSessionVO; import com.imeeting.dto.biz.CreateRealtimeMeetingCommand; import com.imeeting.dto.biz.MeetingTranscriptVO; -import com.imeeting.dto.biz.RealtimeMeetingRuntimeProfile; import com.imeeting.dto.biz.MeetingVO; import com.imeeting.dto.biz.RealtimeMeetingCompleteDTO; +import com.imeeting.dto.biz.RealtimeMeetingRuntimeProfile; import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO; import com.imeeting.entity.biz.Meeting; import com.imeeting.service.android.AndroidAuthService; @@ -21,12 +19,11 @@ import com.imeeting.service.biz.MeetingCommandService; import com.imeeting.service.biz.MeetingQueryService; import com.imeeting.service.biz.MeetingRuntimeProfileResolver; import com.imeeting.service.biz.RealtimeMeetingSessionStateService; -import com.imeeting.service.realtime.AndroidRealtimeSessionTicketService; import com.unisbase.common.ApiResponse; +import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.media.ArraySchema; import io.swagger.v3.oas.annotations.media.Content; import io.swagger.v3.oas.annotations.media.Schema; -import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.responses.ApiResponses; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.servlet.http.HttpServletRequest; @@ -56,15 +53,14 @@ public class AndroidMeetingRealtimeController { private final MeetingQueryService meetingQueryService; private final MeetingCommandService meetingCommandService; private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; - private final AndroidRealtimeSessionTicketService androidRealtimeSessionTicketService; private final MeetingRuntimeProfileResolver meetingRuntimeProfileResolver; private final GrpcServerProperties grpcServerProperties; - @Operation(summary = "创建Android实时会议") + @Operation(summary = "创建 Android 实时会议") @ApiResponses({ @io.swagger.v3.oas.annotations.responses.ApiResponse( responseCode = "200", - description = "返回实时会议创建结果与本次生效的运行时参数", + description = "返回实时会议创建结果和当前生效的运行时参数", content = @Content(schema = @Schema(implementation = AndroidCreateRealtimeMeetingVO.class)) ) }) @@ -117,11 +113,11 @@ public class AndroidMeetingRealtimeController { return ApiResponse.ok(vo); } - @Operation(summary = "查询Android实时会议状态") + @Operation(summary = "查询 Android 实时会议状态") @ApiResponses({ @io.swagger.v3.oas.annotations.responses.ApiResponse( responseCode = "200", - description = "返回实时会议当前状态、恢复信息和连接状态", + description = "返回实时会议当前状态和恢复信息", content = @Content(schema = @Schema(implementation = RealtimeMeetingSessionStatusVO.class)) ) }) @@ -133,7 +129,7 @@ public class AndroidMeetingRealtimeController { return ApiResponse.ok(realtimeMeetingSessionStateService.getStatus(id)); } - @Operation(summary = "查询Android会议转写") + @Operation(summary = "查询 Android 会议转写") @ApiResponses({ @io.swagger.v3.oas.annotations.responses.ApiResponse( responseCode = "200", @@ -149,7 +145,7 @@ public class AndroidMeetingRealtimeController { return ApiResponse.ok(meetingQueryService.getTranscripts(id)); } - @Operation(summary = "暂停Android实时会议") + @Operation(summary = "暂停 Android 实时会议") @ApiResponses({ @io.swagger.v3.oas.annotations.responses.ApiResponse( responseCode = "200", @@ -165,7 +161,7 @@ public class AndroidMeetingRealtimeController { return ApiResponse.ok(realtimeMeetingSessionStateService.pause(id)); } - @Operation(summary = "完成Android实时会议") + @Operation(summary = "完成 Android 实时会议") @ApiResponses({ @io.swagger.v3.oas.annotations.responses.ApiResponse( responseCode = "200", @@ -188,24 +184,6 @@ public class AndroidMeetingRealtimeController { return ApiResponse.ok(true); } - @Operation(summary = "打开Android实时会议gRPC会话") - @ApiResponses({ - @io.swagger.v3.oas.annotations.responses.ApiResponse( - responseCode = "200", - description = "返回实时会议 gRPC 会话信息,包括连接参数和状态", - content = @Content(schema = @Schema(implementation = AndroidRealtimeGrpcSessionVO.class)) - ) - }) - @PostMapping("/{id}/realtime/grpc-session") - public ApiResponse openRealtimeGrpcSession(@PathVariable Long id, - HttpServletRequest request, - @RequestBody(required = false) AndroidOpenRealtimeGrpcSessionCommand command) { - AndroidAuthContext authContext = androidAuthService.authenticateHttp(request); - Meeting meeting = meetingAccessService.requireMeeting(id); - meetingAuthorizationService.assertCanControlRealtimeMeeting(meeting, authContext, MeetingConstants.SOURCE_ANDROID); - return ApiResponse.ok(androidRealtimeSessionTicketService.createSession(id, command, authContext)); - } - private CreateRealtimeMeetingCommand buildCreateCommand(AndroidCreateRealtimeMeetingCommand command, AndroidAuthContext authContext, RealtimeMeetingRuntimeProfile runtimeProfile) { diff --git a/backend/src/main/java/com/imeeting/controller/biz/DeviceManagementController.java b/backend/src/main/java/com/imeeting/controller/biz/DeviceManagementController.java new file mode 100644 index 0000000..e3c3272 --- /dev/null +++ b/backend/src/main/java/com/imeeting/controller/biz/DeviceManagementController.java @@ -0,0 +1,62 @@ +package com.imeeting.controller.biz; + +import com.imeeting.dto.biz.DeviceAdminUpdateCommand; +import com.imeeting.dto.biz.DeviceOnlineAdminVO; +import com.imeeting.service.biz.DeviceOnlineManagementService; +import com.unisbase.common.ApiResponse; +import com.unisbase.security.LoginUser; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.ArraySchema; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.RequiredArgsConstructor; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +@Tag(name = "设备在线管理") +@RestController +@RequestMapping("/api/admin/devices") +@RequiredArgsConstructor +public class DeviceManagementController { + + private final DeviceOnlineManagementService deviceOnlineManagementService; + + @Operation(summary = "查询设备在线管理列表") + @ApiResponses({ + @io.swagger.v3.oas.annotations.responses.ApiResponse( + responseCode = "200", + description = "返回设备在线管理列表", + content = @Content(array = @ArraySchema(schema = @Schema(implementation = DeviceOnlineAdminVO.class))) + ) + }) + @GetMapping + public ApiResponse> list() { + return ApiResponse.ok(deviceOnlineManagementService.listForAdmin(currentLoginUser())); + } + + @Operation(summary = "更新设备管理信息") + @PutMapping("/{id}") + public ApiResponse update(@PathVariable Long id, @RequestBody DeviceAdminUpdateCommand command) { + return ApiResponse.ok(deviceOnlineManagementService.update(id, command, currentLoginUser())); + } + + @Operation(summary = "踢下线设备") + @PostMapping("/{id}/kick") + public ApiResponse kick(@PathVariable Long id) { + return ApiResponse.ok(deviceOnlineManagementService.kick(id, currentLoginUser())); + } + + private LoginUser currentLoginUser() { + return (LoginUser) SecurityContextHolder.getContext().getAuthentication().getPrincipal(); + } +} diff --git a/backend/src/main/java/com/imeeting/dto/android/AndroidOpenRealtimeGrpcSessionCommand.java b/backend/src/main/java/com/imeeting/dto/android/AndroidOpenRealtimeGrpcSessionCommand.java deleted file mode 100644 index b583fe4..0000000 --- a/backend/src/main/java/com/imeeting/dto/android/AndroidOpenRealtimeGrpcSessionCommand.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.imeeting.dto.android; - -import lombok.Data; - -import java.util.List; -import java.util.Map; - -@Data -public class AndroidOpenRealtimeGrpcSessionCommand { - private Long asrModelId; - private String mode; - private String language; - private Integer useSpkId; - private Boolean enablePunctuation; - private Boolean enableItn; - private Boolean enableTextRefine; - private Boolean saveAudio; - private List> hotwords; -} diff --git a/backend/src/main/java/com/imeeting/dto/android/AndroidRealtimeGrpcSessionData.java b/backend/src/main/java/com/imeeting/dto/android/AndroidRealtimeGrpcSessionData.java deleted file mode 100644 index da12ec9..0000000 --- a/backend/src/main/java/com/imeeting/dto/android/AndroidRealtimeGrpcSessionData.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.imeeting.dto.android; - -import com.imeeting.dto.biz.RealtimeMeetingResumeConfig; -import lombok.Data; - -@Data -public class AndroidRealtimeGrpcSessionData { - private Long meetingId; - private Long tenantId; - private Long userId; - private String deviceId; - private Long asrModelId; - private String targetWsUrl; - private String startMessageJson; - private RealtimeMeetingResumeConfig resumeConfig; -} diff --git a/backend/src/main/java/com/imeeting/dto/android/AndroidRealtimeGrpcSessionVO.java b/backend/src/main/java/com/imeeting/dto/android/AndroidRealtimeGrpcSessionVO.java deleted file mode 100644 index 5bf028f..0000000 --- a/backend/src/main/java/com/imeeting/dto/android/AndroidRealtimeGrpcSessionVO.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.imeeting.dto.android; - -import com.imeeting.dto.biz.RealtimeMeetingResumeConfig; -import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO; -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.Data; - -@Schema(description = "Android 实时会议 gRPC 会话信息") -@Data -public class AndroidRealtimeGrpcSessionVO { - @Schema(description = "会议 ID") - private Long meetingId; - @Schema(description = "实时流会话令牌") - private String streamToken; - @Schema(description = "令牌剩余有效秒数") - private Long expiresInSeconds; - @Schema(description = "实时音频采样率") - private Integer sampleRate; - @Schema(description = "音频通道数") - private Integer channels; - @Schema(description = "音频编码格式") - private String encoding; - @Schema(description = "恢复会议时使用的运行时参数") - private RealtimeMeetingResumeConfig resumeConfig; - @Schema(description = "当前实时会议状态") - private RealtimeMeetingSessionStatusVO status; -} diff --git a/backend/src/main/java/com/imeeting/dto/biz/DeviceAdminUpdateCommand.java b/backend/src/main/java/com/imeeting/dto/biz/DeviceAdminUpdateCommand.java new file mode 100644 index 0000000..41dee98 --- /dev/null +++ b/backend/src/main/java/com/imeeting/dto/biz/DeviceAdminUpdateCommand.java @@ -0,0 +1,15 @@ +package com.imeeting.dto.biz; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +@Data +@Schema(description = "设备管理更新请求") +public class DeviceAdminUpdateCommand { + + @Schema(description = "设备名称") + private String deviceName; + + @Schema(description = "状态:1启用,0停用") + private Integer status; +} diff --git a/backend/src/main/java/com/imeeting/dto/biz/DeviceOnlineAdminVO.java b/backend/src/main/java/com/imeeting/dto/biz/DeviceOnlineAdminVO.java new file mode 100644 index 0000000..d363461 --- /dev/null +++ b/backend/src/main/java/com/imeeting/dto/biz/DeviceOnlineAdminVO.java @@ -0,0 +1,53 @@ +package com.imeeting.dto.biz; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import java.time.LocalDateTime; + +@Data +@Schema(description = "设备在线管理视图") +public class DeviceOnlineAdminVO { + + @Schema(description = "设备ID") + private Long deviceId; + + @Schema(description = "绑定帐号用户ID") + private Long userId; + + @Schema(description = "绑定帐号用户名") + private String username; + + @Schema(description = "绑定帐号显示名") + private String displayName; + + @Schema(description = "设备编码,对应 Android deviceId") + private String deviceCode; + + @Schema(description = "设备名称") + private String deviceName; + + @Schema(description = "终端类型") + private String terminalType; + + @Schema(description = "终端版本") + private String terminalVersion; + + @Schema(description = "是否在线") + private Boolean online; + + @Schema(description = "最后一次在线时间") + private LocalDateTime lastOnlineAt; + + @Schema(description = "状态:1启用,0停用") + private Integer status; + + @Schema(description = "创建时间") + private LocalDateTime createdAt; + + @Schema(description = "更新时间") + private LocalDateTime updatedAt; + + @Schema(description = "租户ID") + private Long tenantId; +} diff --git a/backend/src/main/java/com/imeeting/entity/biz/DeviceInfoEntity.java b/backend/src/main/java/com/imeeting/entity/biz/DeviceInfoEntity.java new file mode 100644 index 0000000..10c89ba --- /dev/null +++ b/backend/src/main/java/com/imeeting/entity/biz/DeviceInfoEntity.java @@ -0,0 +1,31 @@ +package com.imeeting.entity.biz; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.unisbase.entity.BaseEntity; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.time.LocalDateTime; + +@Data +@EqualsAndHashCode(callSuper = true) +@TableName("biz_device_info") +public class DeviceInfoEntity extends BaseEntity { + + @TableId(value = "device_id", type = IdType.AUTO) + private Long deviceId; + + private Long userId; + + private String deviceCode; + + private String deviceName; + + private String terminalType; + + private String terminalVersion; + + private LocalDateTime lastOnlineAt; +} diff --git a/backend/src/main/java/com/imeeting/grpc/gateway/AndroidGatewayGrpcService.java b/backend/src/main/java/com/imeeting/grpc/gateway/AndroidGatewayGrpcService.java deleted file mode 100644 index 3ec68ed..0000000 --- a/backend/src/main/java/com/imeeting/grpc/gateway/AndroidGatewayGrpcService.java +++ /dev/null @@ -1,116 +0,0 @@ -package com.imeeting.grpc.gateway; - -import com.imeeting.config.grpc.GrpcServerProperties; -import com.imeeting.dto.android.AndroidAuthContext; -import com.imeeting.dto.android.AndroidDeviceSessionState; -import com.imeeting.grpc.common.ErrorEvent; -import com.imeeting.service.android.AndroidAuthService; -import com.imeeting.service.android.AndroidDeviceSessionService; -import com.imeeting.service.android.AndroidGatewayPushService; -import io.grpc.BindableService; -import io.grpc.stub.StreamObserver; -import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Service; - -@Service -@RequiredArgsConstructor -public class AndroidGatewayGrpcService extends AndroidGatewayServiceGrpc.AndroidGatewayServiceImplBase implements BindableService { - - private final AndroidAuthService androidAuthService; - private final AndroidDeviceSessionService androidDeviceSessionService; - private final AndroidGatewayPushService androidGatewayPushService; - private final GrpcServerProperties grpcServerProperties; - - @Override - public StreamObserver connect(StreamObserver responseObserver) { - return new StreamObserver<>() { - private String connectionId; - private String deviceId; - private AndroidAuthContext authContext; - - @Override - public void onNext(GatewayClientPacket packet) { - try { - switch (packet.getBodyCase()) { - case HELLO -> handleHello(packet); - case HEARTBEAT -> handleHeartbeat(packet.getHeartbeat()); - case SUBSCRIBE -> handleSubscribe(packet.getSubscribe()); - case ACK, BODY_NOT_SET -> { - } - } - } catch (Exception ex) { - sendError(responseObserver, ex.getMessage()); - } - } - - @Override - public void onError(Throwable t) { - cleanup(); - } - - @Override - public void onCompleted() { - cleanup(); - responseObserver.onCompleted(); - } - - private void handleHello(GatewayClientPacket packet) { - authContext = androidAuthService.authenticateGrpc(packet.getAuth(), packet.getHello().getDeviceId()); - AndroidDeviceSessionState sessionState = androidDeviceSessionService.openSession(authContext, packet.getHello()); - connectionId = sessionState.getConnectionId(); - deviceId = sessionState.getDeviceId(); - androidGatewayPushService.register(connectionId, deviceId, responseObserver); - responseObserver.onNext(GatewayServerPacket.newBuilder() - .setRequestId(packet.getRequestId()) - .setHelloAck(HelloAck.newBuilder() - .setConnectionId(connectionId) - .setAuthMode(authContext.getAuthMode()) - .setServerTime(System.currentTimeMillis()) - .setHeartbeatIntervalSeconds(grpcServerProperties.getGateway().getHeartbeatIntervalSeconds()) - .build()) - .build()); - } - - private void handleHeartbeat(Heartbeat heartbeat) { - if (heartbeat == null || connectionId == null) { - return; - } - AndroidDeviceSessionState state = androidDeviceSessionService.refreshHeartbeat(connectionId, heartbeat.getClientTime()); - if (state == null) { - sendError(responseObserver, "未找到安卓设备会话"); - return; - } - responseObserver.onNext(GatewayServerPacket.newBuilder() - .setPong(Pong.newBuilder() - .setConnectionId(connectionId) - .setServerTime(System.currentTimeMillis()) - .build()) - .build()); - } - - private void handleSubscribe(Subscribe subscribe) { - if (deviceId == null) { - return; - } - androidDeviceSessionService.updateTopics(deviceId, subscribe.getTopicsList()); - } - - private void cleanup() { - if (connectionId != null) { - androidGatewayPushService.unregister(connectionId); - androidDeviceSessionService.closeSession(connectionId); - } - } - }; - } - - private void sendError(StreamObserver responseObserver, String message) { - responseObserver.onNext(GatewayServerPacket.newBuilder() - .setError(ErrorEvent.newBuilder() - .setCode("ANDROID_GATEWAY_ERROR") - .setMessage(message == null ? "网关处理失败" : message) - .setRetryable(false) - .build()) - .build()); - } -} diff --git a/backend/src/main/java/com/imeeting/grpc/push/AndroidPushGrpcService.java b/backend/src/main/java/com/imeeting/grpc/push/AndroidPushGrpcService.java new file mode 100644 index 0000000..5cc0da6 --- /dev/null +++ b/backend/src/main/java/com/imeeting/grpc/push/AndroidPushGrpcService.java @@ -0,0 +1,177 @@ +package com.imeeting.grpc.push; + +import com.imeeting.dto.android.AndroidAuthContext; +import com.imeeting.dto.android.AndroidDeviceSessionState; +import com.imeeting.service.android.AndroidAuthService; +import com.imeeting.service.android.AndroidDeviceSessionService; +import com.imeeting.service.android.AndroidGatewayPushService; +import com.imeeting.service.biz.DeviceOnlineManagementService; +import com.unisbase.common.exception.BusinessException; +import io.grpc.BindableService; +import io.grpc.stub.StreamObserver; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class AndroidPushGrpcService extends PushServiceGrpc.PushServiceImplBase implements BindableService { + + private final AndroidAuthService androidAuthService; + private final AndroidDeviceSessionService androidDeviceSessionService; + private final AndroidGatewayPushService androidGatewayPushService; + private final DeviceOnlineManagementService deviceOnlineManagementService; + + @Override + public StreamObserver communicate(StreamObserver responseObserver) { + return new StreamObserver<>() { + private String connectionId; + private String deviceId; + private boolean connected; + + @Override + public void onNext(ClientMessage message) { + try { + switch (message.getPayloadCase()) { + case CONNECT -> handleConnect(message.getConnect()); + case HEARTBEAT -> handleHeartbeat(message.getHeartbeat()); + case ACK -> handleAck(message.getAck()); + case PAYLOAD_NOT_SET -> sendError(responseObserver, "PUSH_BAD_REQUEST", "Missing push payload", false); + } + } catch (BusinessException ex) { + log.warn("Android push gRPC business rejection, connectionId={}", connectionId, ex); + sendError(responseObserver, ex.getCode(), ex.getMessage(), false); + } catch (Exception ex) { + log.warn("Android push gRPC request handling failed, connectionId={}", connectionId, ex); + sendError(responseObserver, "PUSH_PROCESSING_ERROR", ex.getMessage(), false); + } + } + + @Override + public void onError(Throwable throwable) { + log.warn("Android push gRPC stream failed, connectionId={}", connectionId, throwable); + cleanup(); + } + + @Override + public void onCompleted() { + cleanup(); + responseObserver.onCompleted(); + } + + private void handleConnect(ConnectRequest request) { + if (connected) { + sendError(responseObserver, "PUSH_ALREADY_CONNECTED", "Push connection already established", false); + return; + } + AndroidAuthContext authContext = androidAuthService.authenticateGrpc( + request.getDeviceId(), + request.getAppVersion(), + resolvePlatform(request.getPlatform()) + ); + authContext.setUserId(parseNullableLong(request.getUserId(), "user_id")); + authContext.setTenantId(parseNullableLong(request.getTenantId(), "tenant_id")); + AndroidDeviceSessionState sessionState = androidDeviceSessionService.openSession(authContext, request.getConnectionId()); + connectionId = sessionState.getConnectionId(); + deviceId = sessionState.getDeviceId(); + deviceOnlineManagementService.recordConnected(authContext); + connected = true; + String replacedConnectionId = androidGatewayPushService.register(connectionId, deviceId, responseObserver); + if (replacedConnectionId != null && !replacedConnectionId.equals(connectionId)) { + androidDeviceSessionService.closeSession(replacedConnectionId); + } + responseObserver.onNext(ServerMessage.newBuilder() + .setConnectAck(ConnectResponse.newBuilder() + .setSuccess(true) + .setMessage(connectionId) + .build()) + .build()); + } + + private void handleHeartbeat(HeartbeatRequest request) { + if (!validateConnected()) { + return; + } + if (!request.getConnectionId().isBlank() && !request.getConnectionId().equals(connectionId)) { + sendError(responseObserver, "PUSH_CONNECTION_MISMATCH", "Connection id does not match active session", false); + return; + } + if (!request.getDeviceId().isBlank() && !request.getDeviceId().equals(deviceId)) { + sendError(responseObserver, "PUSH_DEVICE_MISMATCH", "Device id does not match active session", false); + return; + } + AndroidDeviceSessionState state = androidDeviceSessionService.refreshHeartbeat(connectionId, request.getTimestamp()); + responseObserver.onNext(ServerMessage.newBuilder() + .setHeartbeat(HeartbeatResponse.newBuilder() + .setTimestamp(System.currentTimeMillis()) + .setOk(state != null) + .build()) + .build()); + } + + private void handleAck(AckRequest request) { + if (!validateConnected()) { + return; + } + if (!request.getConnectionId().isBlank() && !request.getConnectionId().equals(connectionId)) { + sendError(responseObserver, "PUSH_CONNECTION_MISMATCH", "Connection id does not match active session", false); + return; + } + if (!request.getDeviceId().isBlank() && !request.getDeviceId().equals(deviceId)) { + sendError(responseObserver, "PUSH_DEVICE_MISMATCH", "Device id does not match active session", false); + } + } + + private boolean validateConnected() { + if (connected) { + return true; + } + sendError(responseObserver, "PUSH_NOT_CONNECTED", "Push connection has not been established", false); + return false; + } + + private void cleanup() { + if (connectionId == null) { + return; + } + AndroidDeviceSessionState state = androidDeviceSessionService.getByConnectionId(connectionId); + androidGatewayPushService.unregister(connectionId); + androidDeviceSessionService.closeSession(connectionId); + deviceOnlineManagementService.recordDisconnected(deviceId, state == null ? null : state.getLastSeenAt()); + connectionId = null; + deviceId = null; + connected = false; + } + }; + } + + private void sendError(StreamObserver responseObserver, String code, String message, boolean retryable) { + responseObserver.onNext(ServerMessage.newBuilder() + .setError(ErrorEvent.newBuilder() + .setCode(code) + .setMessage(message == null || message.isBlank() ? "Push request failed" : message) + .setRetryable(retryable) + .build()) + .build()); + } + + private String resolvePlatform(Platform platform) { + return switch (platform) { + case IOS -> "ios"; + case ANDROID -> "android"; + case PLATFORM_UNKNOWN, UNRECOGNIZED -> "android"; + }; + } + + private Long parseNullableLong(String value, String fieldName) { + if (value == null || value.isBlank()) { + return null; + } + try { + return Long.parseLong(value.trim()); + } catch (NumberFormatException ex) { + throw new RuntimeException("Invalid " + fieldName); + } + } +} diff --git a/backend/src/main/java/com/imeeting/grpc/realtime/RealtimeMeetingGrpcService.java b/backend/src/main/java/com/imeeting/grpc/realtime/RealtimeMeetingGrpcService.java deleted file mode 100644 index ed3bd43..0000000 --- a/backend/src/main/java/com/imeeting/grpc/realtime/RealtimeMeetingGrpcService.java +++ /dev/null @@ -1,139 +0,0 @@ -package com.imeeting.grpc.realtime; - -import com.imeeting.dto.android.AndroidAuthContext; -import com.imeeting.grpc.common.ErrorEvent; -import com.imeeting.service.android.AndroidAuthService; -import com.imeeting.service.realtime.RealtimeMeetingGrpcSessionService; -import io.grpc.BindableService; -import io.grpc.stub.StreamObserver; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -@Slf4j -@Service -@RequiredArgsConstructor -public class RealtimeMeetingGrpcService extends RealtimeMeetingServiceGrpc.RealtimeMeetingServiceImplBase implements BindableService { - - private final AndroidAuthService androidAuthService; - private final RealtimeMeetingGrpcSessionService realtimeMeetingGrpcSessionService; - - @Override - public StreamObserver streamMeetingAudio(StreamObserver responseObserver) { - return new StreamObserver<>() { - private String connectionId; - private AndroidAuthContext authContext; - private boolean completed; - - @Override - public void onNext(RealtimeClientPacket packet) { - if (completed) { - return; - } - try { - switch (packet.getBodyCase()) { - case OPEN -> handleOpen(packet); - case AUDIO -> handleAudio(packet.getAudio()); - case CONTROL -> handleControl(packet.getControl()); - case BODY_NOT_SET -> { - } - } - } catch (Exception ex) { - handleProcessingError(packet, ex); - } - } - - @Override - public void onError(Throwable t) { - completed = true; - log.warn("Realtime meeting gRPC client stream failed, connectionId={}", connectionId, t); - safeCloseStream("client_error", false); - } - - @Override - public void onCompleted() { - completed = true; - if (connectionId != null) { - safeCloseStream("client_completed", false); - } else { - safeCompleteResponse(); - } - } - - private void handleOpen(RealtimeClientPacket packet) { - authContext = androidAuthService.authenticateGrpc(packet.getAuth(), null); - connectionId = realtimeMeetingGrpcSessionService.openStream( - packet.getOpen().getMeetingId(), - packet.getOpen().getStreamToken(), - authContext, - responseObserver - ); - } - - private void handleAudio(AudioChunk audioChunk) { - if (connectionId == null) { - throw new RuntimeException("实时 gRPC 流未打开"); - } - realtimeMeetingGrpcSessionService.onAudio(connectionId, audioChunk.getPcm16().toByteArray(), audioChunk.getSeq(), audioChunk.getLastChunk()); - } - - private void handleControl(RealtimeControl control) { - if (connectionId == null) { - return; - } - switch (control.getType()) { - case STOP_SPEAKING, END_INPUT -> realtimeMeetingGrpcSessionService.onStopSpeaking(connectionId); - case CLOSE_STREAM -> safeCloseStream("client_close_stream", true); - case START, CONTROL_TYPE_UNSPECIFIED -> { - } - } - } - - private void handleProcessingError(RealtimeClientPacket packet, Exception ex) { - String requestId = packet == null ? "" : packet.getRequestId(); - log.error("Realtime meeting gRPC packet processing failed, requestId={}, connectionId={}", requestId, connectionId, ex); - safeSendError(requestId, ex.getMessage()); - completed = true; - if (connectionId != null) { - safeCloseStream("grpc_processing_error", true); - } else { - safeCompleteResponse(); - } - } - - private void safeSendError(String requestId, String message) { - try { - responseObserver.onNext(RealtimeServerPacket.newBuilder() - .setRequestId(requestId == null ? "" : requestId) - .setError(ErrorEvent.newBuilder() - .setCode("REALTIME_GRPC_ERROR") - .setMessage(message == null || message.isBlank() ? "实时会议 gRPC 处理失败" : message) - .setRetryable(false) - .build()) - .build()); - } catch (Exception observerEx) { - log.warn("Failed to deliver realtime gRPC error packet, requestId={}, connectionId={}", requestId, connectionId, observerEx); - } - } - - private void safeCloseStream(String reason, boolean notifyClient) { - if (connectionId == null) { - return; - } - try { - realtimeMeetingGrpcSessionService.closeStream(connectionId, reason, notifyClient); - } catch (Exception closeEx) { - log.error("Failed to close realtime gRPC stream, connectionId={}, reason={}", connectionId, reason, closeEx); - } - } - - private void safeCompleteResponse() { - try { - responseObserver.onCompleted(); - } catch (Exception observerEx) { - log.warn("Failed to complete realtime gRPC response, connectionId={}", connectionId, observerEx); - } - } - }; - } -} diff --git a/backend/src/main/java/com/imeeting/mapper/DeviceInfoMapper.java b/backend/src/main/java/com/imeeting/mapper/DeviceInfoMapper.java new file mode 100644 index 0000000..8357312 --- /dev/null +++ b/backend/src/main/java/com/imeeting/mapper/DeviceInfoMapper.java @@ -0,0 +1,66 @@ +package com.imeeting.mapper; + +import com.baomidou.mybatisplus.annotation.InterceptorIgnore; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.imeeting.dto.biz.DeviceOnlineAdminVO; +import com.imeeting.entity.biz.DeviceInfoEntity; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; + +import java.util.List; + +@Mapper +public interface DeviceInfoMapper extends BaseMapper { + + @InterceptorIgnore(tenantLine = "true") + @Select(""" + SELECT * + FROM biz_device_info + WHERE device_code = #{deviceCode} + AND is_deleted = 0 + ORDER BY updated_at DESC, device_id DESC + LIMIT 1 + """) + DeviceInfoEntity selectByDeviceCodeIgnoreTenant(@Param("deviceCode") String deviceCode); + + @InterceptorIgnore(tenantLine = "true") + @Select(""" + SELECT * + FROM biz_device_info + WHERE device_id = #{deviceId} + AND is_deleted = 0 + LIMIT 1 + """) + DeviceInfoEntity selectByIdIgnoreTenant(@Param("deviceId") Long deviceId); + + @InterceptorIgnore(tenantLine = "true") + @Select(""" + + """) + List selectAdminList(@Param("tenantId") Long tenantId, @Param("platformAdmin") boolean platformAdmin); +} diff --git a/backend/src/main/java/com/imeeting/service/android/AndroidAuthService.java b/backend/src/main/java/com/imeeting/service/android/AndroidAuthService.java index 5ec8ba5..2849e7f 100644 --- a/backend/src/main/java/com/imeeting/service/android/AndroidAuthService.java +++ b/backend/src/main/java/com/imeeting/service/android/AndroidAuthService.java @@ -1,11 +1,10 @@ package com.imeeting.service.android; import com.imeeting.dto.android.AndroidAuthContext; -import com.imeeting.grpc.common.ClientAuth; import jakarta.servlet.http.HttpServletRequest; public interface AndroidAuthService { - AndroidAuthContext authenticateGrpc(ClientAuth auth, String fallbackDeviceId); + AndroidAuthContext authenticateGrpc(String deviceId, String appVersion, String platform); AndroidAuthContext authenticateHttp(HttpServletRequest request); } diff --git a/backend/src/main/java/com/imeeting/service/android/AndroidDeviceSessionService.java b/backend/src/main/java/com/imeeting/service/android/AndroidDeviceSessionService.java index 2743453..c671efc 100644 --- a/backend/src/main/java/com/imeeting/service/android/AndroidDeviceSessionService.java +++ b/backend/src/main/java/com/imeeting/service/android/AndroidDeviceSessionService.java @@ -2,17 +2,20 @@ package com.imeeting.service.android; import com.imeeting.dto.android.AndroidAuthContext; import com.imeeting.dto.android.AndroidDeviceSessionState; -import com.imeeting.grpc.gateway.DeviceHello; import java.util.List; public interface AndroidDeviceSessionService { - AndroidDeviceSessionState openSession(AndroidAuthContext authContext, DeviceHello hello); + AndroidDeviceSessionState openSession(AndroidAuthContext authContext, String requestedConnectionId); AndroidDeviceSessionState refreshHeartbeat(String connectionId, long clientTime); AndroidDeviceSessionState getByConnectionId(String connectionId); + AndroidDeviceSessionState getByDeviceId(String deviceId); + + String getActiveConnectionId(String deviceId); + void updateTopics(String deviceId, List topics); void closeSession(String connectionId); diff --git a/backend/src/main/java/com/imeeting/service/android/AndroidGatewayPushService.java b/backend/src/main/java/com/imeeting/service/android/AndroidGatewayPushService.java index 471d341..c4cdb6c 100644 --- a/backend/src/main/java/com/imeeting/service/android/AndroidGatewayPushService.java +++ b/backend/src/main/java/com/imeeting/service/android/AndroidGatewayPushService.java @@ -1,14 +1,17 @@ package com.imeeting.service.android; -import com.imeeting.grpc.gateway.GatewayServerPacket; +import com.imeeting.grpc.push.PushMessage; +import com.imeeting.grpc.push.ServerMessage; import io.grpc.stub.StreamObserver; public interface AndroidGatewayPushService { - void register(String connectionId, String deviceId, StreamObserver observer); + String register(String connectionId, String deviceId, StreamObserver observer); void unregister(String connectionId); - boolean pushToConnection(String connectionId, GatewayServerPacket packet); + boolean pushToConnection(String connectionId, PushMessage message); - int pushToDevice(String deviceId, GatewayServerPacket packet); + int pushToDevice(String deviceId, PushMessage message); + + String disconnectDevice(String deviceId); } diff --git a/backend/src/main/java/com/imeeting/service/android/impl/AndroidAuthServiceImpl.java b/backend/src/main/java/com/imeeting/service/android/impl/AndroidAuthServiceImpl.java index f1a55f8..bdab808 100644 --- a/backend/src/main/java/com/imeeting/service/android/impl/AndroidAuthServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/android/impl/AndroidAuthServiceImpl.java @@ -2,8 +2,10 @@ package com.imeeting.service.android.impl; import com.imeeting.config.grpc.AndroidGrpcAuthProperties; import com.imeeting.dto.android.AndroidAuthContext; -import com.imeeting.grpc.common.ClientAuth; +import com.imeeting.entity.biz.DeviceInfoEntity; +import com.imeeting.mapper.DeviceInfoMapper; import com.imeeting.service.android.AndroidAuthService; +import com.unisbase.common.exception.BusinessException; import com.unisbase.dto.InternalAuthCheckResponse; import com.unisbase.security.LoginUser; import com.unisbase.service.TokenValidationService; @@ -27,31 +29,15 @@ public class AndroidAuthServiceImpl implements AndroidAuthService { private final AndroidGrpcAuthProperties properties; private final TokenValidationService tokenValidationService; + private final DeviceInfoMapper deviceInfoMapper; @Override - public AndroidAuthContext authenticateGrpc(ClientAuth auth, String fallbackDeviceId) { - ClientAuth.AuthType authType = auth == null ? ClientAuth.AuthType.AUTH_TYPE_UNSPECIFIED : auth.getAuthType(); - if (authType == ClientAuth.AuthType.USER_JWT) { - InternalAuthCheckResponse authResult = validateToken(auth == null ? null : auth.getAccessToken()); - return buildContext(authType.name(), false, auth == null ? null : auth.getDeviceId(),auth == null ? null : auth.getAppId(), - auth == null ? null : auth.getAppVersion(), auth == null ? null : auth.getPlatform(), auth == null ? null : auth.getAccessToken(), fallbackDeviceId, authResult, null); - } - if (authType == ClientAuth.AuthType.DEVICE_TOKEN) { - return buildContext(authType.name(), false, auth == null ? null : auth.getDeviceId(), auth == null ? null : auth.getAppId(), - auth == null ? null : auth.getAppVersion(), auth == null ? null : auth.getPlatform(), auth == null ? null : auth.getAccessToken(), fallbackDeviceId, null, null); - } + public AndroidAuthContext authenticateGrpc(String deviceId, String appVersion, String platform) { if (properties.isEnabled() && !properties.isAllowAnonymous()) { - throw new RuntimeException("缺少 Android gRPC 认证信息"); + throw new RuntimeException("Android gRPC push does not allow anonymous access"); } - return buildContext("NONE", true, - auth == null ? null : auth.getDeviceId(), - auth == null ? null : auth.getAppId(), - auth == null ? null : auth.getAppVersion(), - auth == null ? null : auth.getPlatform(), - auth == null ? null : auth.getAccessToken(), - fallbackDeviceId, - null, - null); + assertDeviceEnabled(deviceId); + return buildContext("NONE", true, deviceId, null, appVersion, platform, null, null, null, null); } @Override @@ -64,6 +50,7 @@ public class AndroidAuthServiceImpl implements AndroidAuthService { String platform = request.getHeader(HEADER_PLATFORM); requireAndroidHttpHeaders(deviceId, appVersion, platform); + assertDeviceEnabled(deviceId); if (loginUser != null) { return buildContext("USER_JWT", false, @@ -100,7 +87,7 @@ public class AndroidAuthServiceImpl implements AndroidAuthService { null, null); } - throw new RuntimeException("缺少 Android HTTP 访问令牌"); + throw new RuntimeException("Missing Android HTTP access token"); } private AndroidAuthContext buildContext(String authMode, boolean anonymous, String deviceId, @@ -108,7 +95,7 @@ public class AndroidAuthServiceImpl implements AndroidAuthService { String fallbackDeviceId, InternalAuthCheckResponse authResult, LoginUser loginUser) { String resolvedDeviceId = StringUtils.hasText(deviceId) ? deviceId : fallbackDeviceId; if (!StringUtils.hasText(resolvedDeviceId)) { - throw new RuntimeException("缺少 Android deviceId"); + throw new RuntimeException("Missing Android deviceId"); } AndroidAuthContext context = new AndroidAuthContext(); context.setAuthMode(authMode); @@ -148,14 +135,14 @@ public class AndroidAuthServiceImpl implements AndroidAuthService { private InternalAuthCheckResponse validateToken(String token) { String resolvedToken = normalizeToken(token); if (!StringUtils.hasText(resolvedToken)) { - throw new RuntimeException("缺少 Android 访问令牌"); + throw new RuntimeException("Missing Android access token"); } InternalAuthCheckResponse authResult = tokenValidationService.validateAccessToken(resolvedToken); if (authResult == null || !authResult.isValid()) { - throw new RuntimeException(authResult == null || !StringUtils.hasText(authResult.getMessage()) ? "Android 访问令牌无效" : authResult.getMessage()); + throw new RuntimeException(authResult == null || !StringUtils.hasText(authResult.getMessage()) ? "Android access token is invalid" : authResult.getMessage()); } if (authResult.getUserId() == null || authResult.getTenantId() == null) { - throw new RuntimeException("Android 访问令牌缺少用户或租户上下文"); + throw new RuntimeException("Android access token is missing user or tenant context"); } return authResult; } @@ -166,7 +153,7 @@ public class AndroidAuthServiceImpl implements AndroidAuthService { return null; } if (!authorization.startsWith(BEARER_PREFIX)) { - throw new RuntimeException("Android HTTP 访问令牌无效"); + throw new RuntimeException("Android HTTP access token format is invalid"); } return authorization.substring(BEARER_PREFIX.length()).trim(); } @@ -183,13 +170,23 @@ public class AndroidAuthServiceImpl implements AndroidAuthService { private void requireAndroidHttpHeaders(String deviceId, String appVersion, String platform) { if (!StringUtils.hasText(deviceId)) { - throw new RuntimeException("缺少 Android device_id"); + throw new RuntimeException("Missing Android device_id"); } if (!StringUtils.hasText(appVersion)) { - throw new RuntimeException("缺少 Android-App-Version 请求头"); + throw new RuntimeException("Missing X-Android-App-Version header"); } if (!StringUtils.hasText(platform)) { - throw new RuntimeException("缺少 X-Android-Platform 请求头"); + throw new RuntimeException("Missing X-Android-Platform header"); + } + } + + private void assertDeviceEnabled(String deviceId) { + if (!StringUtils.hasText(deviceId)) { + return; + } + DeviceInfoEntity device = deviceInfoMapper.selectByDeviceCodeIgnoreTenant(deviceId.trim()); + if (device != null && device.getStatus() != null && device.getStatus() == 0) { + throw new BusinessException("403", "设备被禁用"); } } diff --git a/backend/src/main/java/com/imeeting/service/android/impl/AndroidDeviceSessionServiceImpl.java b/backend/src/main/java/com/imeeting/service/android/impl/AndroidDeviceSessionServiceImpl.java index cf4b15c..9f24047 100644 --- a/backend/src/main/java/com/imeeting/service/android/impl/AndroidDeviceSessionServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/android/impl/AndroidDeviceSessionServiceImpl.java @@ -5,7 +5,6 @@ import com.imeeting.common.RedisKeys; import com.imeeting.config.grpc.GrpcServerProperties; import com.imeeting.dto.android.AndroidAuthContext; import com.imeeting.dto.android.AndroidDeviceSessionState; -import com.imeeting.grpc.gateway.DeviceHello; import com.imeeting.service.android.AndroidDeviceSessionService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -26,13 +25,13 @@ public class AndroidDeviceSessionServiceImpl implements AndroidDeviceSessionServ private final GrpcServerProperties grpcServerProperties; @Override - public AndroidDeviceSessionState openSession(AndroidAuthContext authContext, DeviceHello hello) { + public AndroidDeviceSessionState openSession(AndroidAuthContext authContext, String requestedConnectionId) { AndroidDeviceSessionState state = new AndroidDeviceSessionState(); - state.setConnectionId("android_" + UUID.randomUUID().toString().replace("-", "")); + state.setConnectionId(nonBlank(requestedConnectionId, "android_" + UUID.randomUUID().toString().replace("-", ""))); state.setDeviceId(authContext.getDeviceId()); state.setStatus("ONLINE"); state.setLastSeenAt(System.currentTimeMillis()); - state.setAppVersion(nonBlank(hello.getAppVersion(), authContext.getAppVersion())); + state.setAppVersion(authContext.getAppVersion()); state.setPlatform(nonBlank(authContext.getPlatform(), "android")); state.setTenantCode(authContext.getTenantCode()); writeState(state); @@ -65,6 +64,26 @@ public class AndroidDeviceSessionServiceImpl implements AndroidDeviceSessionServ } } + @Override + public AndroidDeviceSessionState getByDeviceId(String deviceId) { + String raw = redisTemplate.opsForValue().get(RedisKeys.androidDeviceOnlineKey(deviceId)); + if (raw == null || raw.isBlank()) { + return null; + } + try { + return objectMapper.readValue(raw, AndroidDeviceSessionState.class); + } catch (Exception ex) { + log.warn("Failed to read android device online state, deviceId={}", deviceId, ex); + return null; + } + } + + @Override + public String getActiveConnectionId(String deviceId) { + String value = redisTemplate.opsForValue().get(RedisKeys.androidDeviceActiveConnectionKey(deviceId)); + return value == null || value.isBlank() ? null : value; + } + @Override public void updateTopics(String deviceId, List topics) { try { @@ -73,7 +92,7 @@ public class AndroidDeviceSessionServiceImpl implements AndroidDeviceSessionServ objectMapper.writeValueAsString(topics == null ? List.of() : topics) ); } catch (Exception ex) { - throw new RuntimeException("更新安卓设备主题失败", ex); + throw new RuntimeException("Failed to update android device topics", ex); } } @@ -84,11 +103,11 @@ public class AndroidDeviceSessionServiceImpl implements AndroidDeviceSessionServ redisTemplate.delete(RedisKeys.androidDeviceConnectionKey(connectionId)); return; } - String activeConn = redisTemplate.opsForValue().get(RedisKeys.androidDeviceActiveConnectionKey(state.getDeviceId())); + String activeConn = getActiveConnectionId(state.getDeviceId()); if (connectionId.equals(activeConn)) { redisTemplate.delete(RedisKeys.androidDeviceActiveConnectionKey(state.getDeviceId())); + redisTemplate.delete(RedisKeys.androidDeviceOnlineKey(state.getDeviceId())); } - redisTemplate.delete(RedisKeys.androidDeviceOnlineKey(state.getDeviceId())); redisTemplate.delete(RedisKeys.androidDeviceConnectionKey(connectionId)); } @@ -100,7 +119,7 @@ public class AndroidDeviceSessionServiceImpl implements AndroidDeviceSessionServ redisTemplate.opsForValue().set(RedisKeys.androidDeviceActiveConnectionKey(state.getDeviceId()), state.getConnectionId(), ttl); redisTemplate.opsForValue().set(RedisKeys.androidDeviceConnectionKey(state.getConnectionId()), json, ttl); } catch (Exception ex) { - throw new RuntimeException("写入安卓设备会话状态失败", ex); + throw new RuntimeException("Failed to store android device session", ex); } } diff --git a/backend/src/main/java/com/imeeting/service/android/impl/AndroidGatewayPushServiceImpl.java b/backend/src/main/java/com/imeeting/service/android/impl/AndroidGatewayPushServiceImpl.java index ed27d63..8bfee0a 100644 --- a/backend/src/main/java/com/imeeting/service/android/impl/AndroidGatewayPushServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/android/impl/AndroidGatewayPushServiceImpl.java @@ -1,24 +1,38 @@ package com.imeeting.service.android.impl; -import com.imeeting.grpc.gateway.GatewayServerPacket; +import com.imeeting.grpc.push.PushMessage; +import com.imeeting.grpc.push.ServerMessage; import com.imeeting.service.android.AndroidGatewayPushService; import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +@Slf4j @Service public class AndroidGatewayPushServiceImpl implements AndroidGatewayPushService { private final Map byConnectionId = new ConcurrentHashMap<>(); - private final Map> byDeviceId = new ConcurrentHashMap<>(); + private final Map connectionByDeviceId = new ConcurrentHashMap<>(); @Override - public void register(String connectionId, String deviceId, StreamObserver observer) { - byConnectionId.put(connectionId, new Binding(deviceId, observer)); - byDeviceId.computeIfAbsent(deviceId, key -> ConcurrentHashMap.newKeySet()).add(connectionId); + public String register(String connectionId, String deviceId, StreamObserver observer) { + Binding newBinding = new Binding(deviceId, observer); + Binding previousBinding = byConnectionId.put(connectionId, newBinding); + if (previousBinding != null && !previousBinding.deviceId().equals(deviceId)) { + connectionByDeviceId.remove(previousBinding.deviceId(), connectionId); + } + + String previousConnectionId = connectionByDeviceId.put(deviceId, connectionId); + if (previousConnectionId != null && !previousConnectionId.equals(connectionId)) { + Binding replacedBinding = byConnectionId.remove(previousConnectionId); + if (replacedBinding != null) { + safeComplete(previousConnectionId, replacedBinding); + } + } + return previousConnectionId; } @Override @@ -27,43 +41,62 @@ public class AndroidGatewayPushServiceImpl implements AndroidGatewayPushService if (binding == null) { return; } - Set connectionIds = byDeviceId.get(binding.deviceId()); - if (connectionIds == null) { - return; - } - connectionIds.remove(connectionId); - if (connectionIds.isEmpty()) { - byDeviceId.remove(binding.deviceId()); - } + connectionByDeviceId.remove(binding.deviceId(), connectionId); } @Override - public boolean pushToConnection(String connectionId, GatewayServerPacket packet) { + public boolean pushToConnection(String connectionId, PushMessage message) { Binding binding = byConnectionId.get(connectionId); if (binding == null) { return false; } synchronized (binding) { - binding.observer().onNext(packet); + try { + binding.observer().onNext(ServerMessage.newBuilder().setPush(message).build()); + } catch (Exception ex) { + log.warn("Failed to push android message, connectionId={}, deviceId={}", connectionId, binding.deviceId(), ex); + unregister(connectionId); + return false; + } } return true; } @Override - public int pushToDevice(String deviceId, GatewayServerPacket packet) { - Set connectionIds = byDeviceId.get(deviceId); - if (connectionIds == null || connectionIds.isEmpty()) { + public int pushToDevice(String deviceId, PushMessage message) { + String connectionId = connectionByDeviceId.get(deviceId); + if (connectionId == null || connectionId.isBlank()) { return 0; } - int pushed = 0; - for (String connectionId : connectionIds) { - if (pushToConnection(connectionId, packet)) { - pushed++; - } - } - return pushed; + return pushToConnection(connectionId, message) ? 1 : 0; } - private record Binding(String deviceId, StreamObserver observer) { + @Override + public String disconnectDevice(String deviceId) { + String connectionId = connectionByDeviceId.get(deviceId); + if (connectionId == null || connectionId.isBlank()) { + return null; + } + Binding binding = byConnectionId.get(connectionId); + if (binding == null) { + connectionByDeviceId.remove(deviceId, connectionId); + return null; + } + unregister(connectionId); + safeComplete(connectionId, binding); + return connectionId; + } + + private void safeComplete(String connectionId, Binding binding) { + synchronized (binding) { + try { + binding.observer().onCompleted(); + } catch (Exception ex) { + log.debug("Failed to complete replaced android push stream, connectionId={}, deviceId={}", connectionId, binding.deviceId(), ex); + } + } + } + + private record Binding(String deviceId, StreamObserver observer) { } } diff --git a/backend/src/main/java/com/imeeting/service/biz/DeviceOnlineManagementService.java b/backend/src/main/java/com/imeeting/service/biz/DeviceOnlineManagementService.java new file mode 100644 index 0000000..aceb337 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/biz/DeviceOnlineManagementService.java @@ -0,0 +1,21 @@ +package com.imeeting.service.biz; + +import com.imeeting.dto.android.AndroidAuthContext; +import com.imeeting.dto.biz.DeviceAdminUpdateCommand; +import com.imeeting.dto.biz.DeviceOnlineAdminVO; +import com.unisbase.security.LoginUser; + +import java.util.List; + +public interface DeviceOnlineManagementService { + + void recordConnected(AndroidAuthContext authContext); + + void recordDisconnected(String deviceCode, Long lastSeenAtMillis); + + List listForAdmin(LoginUser loginUser); + + DeviceOnlineAdminVO update(Long id, DeviceAdminUpdateCommand command, LoginUser loginUser); + + boolean kick(Long id, LoginUser loginUser); +} diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/DeviceOnlineManagementServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/DeviceOnlineManagementServiceImpl.java new file mode 100644 index 0000000..74333b4 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/biz/impl/DeviceOnlineManagementServiceImpl.java @@ -0,0 +1,160 @@ +package com.imeeting.service.biz.impl; + +import com.imeeting.dto.android.AndroidAuthContext; +import com.imeeting.dto.android.AndroidDeviceSessionState; +import com.imeeting.dto.biz.DeviceAdminUpdateCommand; +import com.imeeting.dto.biz.DeviceOnlineAdminVO; +import com.imeeting.entity.biz.DeviceInfoEntity; +import com.imeeting.mapper.DeviceInfoMapper; +import com.imeeting.service.android.AndroidDeviceSessionService; +import com.imeeting.service.android.AndroidGatewayPushService; +import com.imeeting.service.biz.DeviceOnlineManagementService; +import com.unisbase.security.LoginUser; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; + +@Service +@RequiredArgsConstructor +public class DeviceOnlineManagementServiceImpl implements DeviceOnlineManagementService { + + private final DeviceInfoMapper deviceInfoMapper; + private final AndroidDeviceSessionService androidDeviceSessionService; + private final AndroidGatewayPushService androidGatewayPushService; + + @Override + public void recordConnected(AndroidAuthContext authContext) { + if (authContext == null || !StringUtils.hasText(authContext.getDeviceId())) { + return; + } + String deviceCode = authContext.getDeviceId().trim(); + DeviceInfoEntity existing = deviceInfoMapper.selectByDeviceCodeIgnoreTenant(deviceCode); + LocalDateTime now = LocalDateTime.now(); + if (existing == null) { + DeviceInfoEntity created = new DeviceInfoEntity(); + created.setTenantId(authContext.getTenantId()); + created.setUserId(authContext.getUserId()); + created.setDeviceCode(deviceCode); + created.setDeviceName(null); + created.setTerminalType(normalizeTerminalType(authContext.getPlatform())); + created.setTerminalVersion(normalize(authContext.getAppVersion())); + created.setLastOnlineAt(now); + created.setStatus(1); + deviceInfoMapper.insert(created); + return; + } + + existing.setTerminalType(normalizeTerminalType(authContext.getPlatform())); + existing.setTerminalVersion(normalize(authContext.getAppVersion())); + existing.setLastOnlineAt(now); + if (existing.getUserId() == null && authContext.getUserId() != null) { + existing.setUserId(authContext.getUserId()); + } + if (existing.getTenantId() == null && authContext.getTenantId() != null) { + existing.setTenantId(authContext.getTenantId()); + } + deviceInfoMapper.updateById(existing); + } + + @Override + public void recordDisconnected(String deviceCode, Long lastSeenAtMillis) { + if (!StringUtils.hasText(deviceCode)) { + return; + } + DeviceInfoEntity existing = deviceInfoMapper.selectByDeviceCodeIgnoreTenant(deviceCode.trim()); + if (existing == null) { + return; + } + existing.setLastOnlineAt(toLocalDateTime(lastSeenAtMillis)); + deviceInfoMapper.updateById(existing); + } + + @Override + public List listForAdmin(LoginUser loginUser) { + List devices = deviceInfoMapper.selectAdminList(loginUser == null ? null : loginUser.getTenantId(), isPlatformAdmin(loginUser)); + for (DeviceOnlineAdminVO device : devices) { + AndroidDeviceSessionState state = androidDeviceSessionService.getByDeviceId(device.getDeviceCode()); + if (state != null) { + device.setOnline(true); + device.setLastOnlineAt(toLocalDateTime(state.getLastSeenAt())); + } else { + device.setOnline(false); + } + } + return devices; + } + + @Override + public DeviceOnlineAdminVO update(Long id, DeviceAdminUpdateCommand command, LoginUser loginUser) { + DeviceInfoEntity existing = requireVisibleDevice(id, loginUser); + existing.setDeviceName(normalize(command.getDeviceName())); + boolean disableAfterUpdate = command.getStatus() != null && command.getStatus() == 0; + if (command.getStatus() != null) { + existing.setStatus(command.getStatus()); + } + deviceInfoMapper.updateById(existing); + if (disableAfterUpdate) { + disconnectDevice(existing.getDeviceCode()); + } + return listForAdmin(loginUser).stream() + .filter(item -> id.equals(item.getDeviceId())) + .findFirst() + .orElseThrow(() -> new RuntimeException("Device not found after update")); + } + + @Override + public boolean kick(Long id, LoginUser loginUser) { + DeviceInfoEntity existing = requireVisibleDevice(id, loginUser); + disconnectDevice(existing.getDeviceCode()); + return true; + } + + private DeviceInfoEntity requireVisibleDevice(Long id, LoginUser loginUser) { + DeviceInfoEntity existing = deviceInfoMapper.selectByIdIgnoreTenant(id); + if (existing == null) { + throw new RuntimeException("Device not found"); + } + if (!isPlatformAdmin(loginUser) && loginUser != null && loginUser.getTenantId() != null) { + if (existing.getTenantId() == null || !loginUser.getTenantId().equals(existing.getTenantId())) { + throw new RuntimeException("Device is not visible in current tenant"); + } + } + return existing; + } + + private boolean isPlatformAdmin(LoginUser loginUser) { + return loginUser != null && Boolean.TRUE.equals(loginUser.getIsPlatformAdmin()); + } + + private String normalizeTerminalType(String platform) { + String normalized = normalize(platform); + return normalized == null ? null : normalized.toLowerCase(); + } + + private String normalize(String value) { + if (!StringUtils.hasText(value)) { + return null; + } + return value.trim(); + } + + private void disconnectDevice(String deviceCode) { + String activeConnectionId = androidDeviceSessionService.getActiveConnectionId(deviceCode); + AndroidDeviceSessionState state = activeConnectionId == null ? null : androidDeviceSessionService.getByConnectionId(activeConnectionId); + androidGatewayPushService.disconnectDevice(deviceCode); + if (activeConnectionId != null) { + androidDeviceSessionService.closeSession(activeConnectionId); + } + recordDisconnected(deviceCode, state == null ? null : state.getLastSeenAt()); + } + + private LocalDateTime toLocalDateTime(Long millis) { + long timestamp = millis != null && millis > 0 ? millis : System.currentTimeMillis(); + return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()); + } +} diff --git a/backend/src/main/java/com/imeeting/service/realtime/AndroidRealtimeSessionTicketService.java b/backend/src/main/java/com/imeeting/service/realtime/AndroidRealtimeSessionTicketService.java deleted file mode 100644 index 21686aa..0000000 --- a/backend/src/main/java/com/imeeting/service/realtime/AndroidRealtimeSessionTicketService.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.imeeting.service.realtime; - -import com.imeeting.dto.android.AndroidAuthContext; -import com.imeeting.dto.android.AndroidOpenRealtimeGrpcSessionCommand; -import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData; -import com.imeeting.dto.android.AndroidRealtimeGrpcSessionVO; - -public interface AndroidRealtimeSessionTicketService { - AndroidRealtimeGrpcSessionVO createSession(Long meetingId, AndroidOpenRealtimeGrpcSessionCommand command, AndroidAuthContext authContext); - - AndroidRealtimeGrpcSessionData prepareSessionData(Long meetingId, AndroidAuthContext authContext); - - AndroidRealtimeGrpcSessionData getSessionData(String streamToken); -} diff --git a/backend/src/main/java/com/imeeting/service/realtime/AsrUpstreamBridgeService.java b/backend/src/main/java/com/imeeting/service/realtime/AsrUpstreamBridgeService.java deleted file mode 100644 index 21a0265..0000000 --- a/backend/src/main/java/com/imeeting/service/realtime/AsrUpstreamBridgeService.java +++ /dev/null @@ -1,71 +0,0 @@ -package com.imeeting.service.realtime; - -import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData; - -public interface AsrUpstreamBridgeService { - - AsrUpstreamSession openSession(AndroidRealtimeGrpcSessionData sessionData, String connectionId, AsrUpstreamEventListener listener); - - interface AsrUpstreamSession { - boolean isReady(); - - void sendAudio(byte[] payload); - - void sendStopSpeaking(); - - void close(String reason); - } - - interface AsrUpstreamEventListener { - void onReady(); - - void onTranscript(AsrTranscriptResult result); - - void onError(String code, String message, boolean retryable); - - void onClosed(String reason); - } - - class AsrTranscriptResult { - private final boolean finalResult; - private final String text; - private final String speakerId; - private final String speakerName; - private final Integer startTime; - private final Integer endTime; - - public AsrTranscriptResult(boolean finalResult, String text, String speakerId, String speakerName, - Integer startTime, Integer endTime) { - this.finalResult = finalResult; - this.text = text; - this.speakerId = speakerId; - this.speakerName = speakerName; - this.startTime = startTime; - this.endTime = endTime; - } - - public boolean isFinalResult() { - return finalResult; - } - - public String getText() { - return text; - } - - public String getSpeakerId() { - return speakerId; - } - - public String getSpeakerName() { - return speakerName; - } - - public Integer getStartTime() { - return startTime; - } - - public Integer getEndTime() { - return endTime; - } - } -} diff --git a/backend/src/main/java/com/imeeting/service/realtime/RealtimeMeetingGrpcSessionService.java b/backend/src/main/java/com/imeeting/service/realtime/RealtimeMeetingGrpcSessionService.java deleted file mode 100644 index 8425263..0000000 --- a/backend/src/main/java/com/imeeting/service/realtime/RealtimeMeetingGrpcSessionService.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.imeeting.service.realtime; - -import com.imeeting.dto.android.AndroidAuthContext; -import io.grpc.stub.StreamObserver; -import com.imeeting.grpc.realtime.RealtimeServerPacket; - -public interface RealtimeMeetingGrpcSessionService { - String openStream(Long meetingId, String streamToken, AndroidAuthContext authContext, StreamObserver responseObserver); - - void onAudio(String connectionId, byte[] payload, long seq, boolean lastChunk); - - void onStopSpeaking(String connectionId); - - void closeStream(String connectionId, String reason, boolean notifyClient); -} diff --git a/backend/src/main/java/com/imeeting/service/realtime/impl/AndroidRealtimeSessionTicketServiceImpl.java b/backend/src/main/java/com/imeeting/service/realtime/impl/AndroidRealtimeSessionTicketServiceImpl.java deleted file mode 100644 index 747a3d2..0000000 --- a/backend/src/main/java/com/imeeting/service/realtime/impl/AndroidRealtimeSessionTicketServiceImpl.java +++ /dev/null @@ -1,251 +0,0 @@ -package com.imeeting.service.realtime.impl; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.imeeting.common.MeetingConstants; -import com.imeeting.common.RedisKeys; -import com.imeeting.config.grpc.GrpcServerProperties; -import com.imeeting.dto.android.AndroidAuthContext; -import com.imeeting.dto.android.AndroidOpenRealtimeGrpcSessionCommand; -import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData; -import com.imeeting.dto.android.AndroidRealtimeGrpcSessionVO; -import com.imeeting.dto.biz.AiModelVO; -import com.imeeting.dto.biz.RealtimeMeetingResumeConfig; -import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO; -import com.imeeting.entity.biz.Meeting; -import com.imeeting.service.biz.AiModelService; -import com.imeeting.service.biz.MeetingAccessService; -import com.imeeting.service.biz.MeetingAuthorizationService; -import com.imeeting.service.biz.RealtimeMeetingSessionStateService; -import com.imeeting.service.realtime.AndroidRealtimeSessionTicketService; -import lombok.RequiredArgsConstructor; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Service; - -import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -@Service -@RequiredArgsConstructor -public class AndroidRealtimeSessionTicketServiceImpl implements AndroidRealtimeSessionTicketService { - - private final ObjectMapper objectMapper; - private final StringRedisTemplate redisTemplate; - private final MeetingAccessService meetingAccessService; - private final MeetingAuthorizationService meetingAuthorizationService; - private final AiModelService aiModelService; - private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; - private final GrpcServerProperties grpcServerProperties; - - @Override - public AndroidRealtimeGrpcSessionVO createSession(Long meetingId, AndroidOpenRealtimeGrpcSessionCommand command, AndroidAuthContext authContext) { - PreparedRealtimeSession prepared = prepareSession(meetingId, command, authContext); - - String streamToken = UUID.randomUUID().toString().replace("-", ""); - Duration ttl = Duration.ofSeconds(grpcServerProperties.getRealtime().getSessionTtlSeconds()); - try { - redisTemplate.opsForValue().set( - RedisKeys.realtimeMeetingGrpcSessionKey(streamToken), - objectMapper.writeValueAsString(prepared.sessionData()), - ttl - ); - } catch (Exception ex) { - throw new RuntimeException("创建实时会议 gRPC 会话失败", ex); - } - - AndroidRealtimeGrpcSessionVO vo = new AndroidRealtimeGrpcSessionVO(); - vo.setMeetingId(meetingId); - vo.setStreamToken(streamToken); - vo.setExpiresInSeconds(ttl.toSeconds()); - vo.setSampleRate(grpcServerProperties.getRealtime().getSampleRate()); - vo.setChannels(grpcServerProperties.getRealtime().getChannels()); - vo.setEncoding(grpcServerProperties.getRealtime().getEncoding()); - vo.setResumeConfig(prepared.resumeConfig()); - vo.setStatus(prepared.status()); - return vo; - } - - @Override - public AndroidRealtimeGrpcSessionData prepareSessionData(Long meetingId, AndroidAuthContext authContext) { - return prepareSession(meetingId, null, authContext).sessionData(); - } - - @Override - public AndroidRealtimeGrpcSessionData getSessionData(String streamToken) { - if (streamToken == null || streamToken.isBlank()) { - return null; - } - String raw = redisTemplate.opsForValue().get(RedisKeys.realtimeMeetingGrpcSessionKey(streamToken)); - if (raw == null || raw.isBlank()) { - return null; - } - try { - return objectMapper.readValue(raw, AndroidRealtimeGrpcSessionData.class); - } catch (Exception ex) { - throw new RuntimeException("读取实时会议 gRPC 会话失败", ex); - } - } - - private PreparedRealtimeSession prepareSession(Long meetingId, AndroidOpenRealtimeGrpcSessionCommand command, AndroidAuthContext authContext) { - if (meetingId == null) { - throw new RuntimeException("会议 ID 不能为空"); - } - Meeting meeting = meetingAccessService.requireMeeting(meetingId); - meetingAuthorizationService.assertCanControlRealtimeMeeting(meeting, authContext, MeetingConstants.SOURCE_ANDROID); - realtimeMeetingSessionStateService.initSessionIfAbsent(meetingId, meeting.getTenantId(), meeting.getCreatorId()); - RealtimeMeetingSessionStatusVO currentStatus = realtimeMeetingSessionStateService.getStatus(meetingId); - RealtimeMeetingResumeConfig currentResumeConfig = currentStatus == null ? null : currentStatus.getResumeConfig(); - - Long asrModelId = firstNonNull( - command == null ? null : command.getAsrModelId(), - currentResumeConfig == null ? null : currentResumeConfig.getAsrModelId(), - resolveDefaultAsrModelId(meeting.getTenantId()) - ); - if (asrModelId == null) { - throw new RuntimeException("ASR 模型 ID 不能为空"); - } - - realtimeMeetingSessionStateService.assertCanOpenSession(meetingId); - AiModelVO asrModel = aiModelService.getModelById(asrModelId, "ASR"); - if (asrModel == null) { - throw new RuntimeException("ASR 模型不存在"); - } - String targetWsUrl = resolveWsUrl(asrModel); - if (targetWsUrl == null || targetWsUrl.isBlank()) { - throw new RuntimeException("ASR 模型未配置 WebSocket 地址"); - } - - RealtimeMeetingResumeConfig resumeConfig = buildResumeConfig(command, currentResumeConfig, asrModelId); - realtimeMeetingSessionStateService.rememberResumeConfig(meetingId, resumeConfig); - RealtimeMeetingSessionStatusVO latestStatus = realtimeMeetingSessionStateService.getStatus(meetingId); - - Map startMessage = buildStartMessage(asrModel, meetingId, resumeConfig); - AndroidRealtimeGrpcSessionData sessionData = new AndroidRealtimeGrpcSessionData(); - sessionData.setMeetingId(meetingId); - sessionData.setTenantId(authContext != null && authContext.getTenantId() != null ? authContext.getTenantId() : meeting.getTenantId()); - sessionData.setUserId(authContext != null && authContext.getUserId() != null ? authContext.getUserId() : meeting.getCreatorId()); - sessionData.setDeviceId(authContext == null ? null : authContext.getDeviceId()); - sessionData.setAsrModelId(asrModelId); - sessionData.setTargetWsUrl(targetWsUrl); - sessionData.setResumeConfig(resumeConfig); - try { - sessionData.setStartMessageJson(objectMapper.writeValueAsString(startMessage)); - } catch (Exception ex) { - throw new RuntimeException("序列化实时启动消息失败", ex); - } - return new PreparedRealtimeSession(sessionData, resumeConfig, latestStatus); - } - - private Long resolveDefaultAsrModelId(Long tenantId) { - AiModelVO defaultModel = aiModelService.getDefaultModel("ASR", tenantId == null ? 0L : tenantId); - return defaultModel == null ? null : defaultModel.getId(); - } - - private RealtimeMeetingResumeConfig buildResumeConfig(AndroidOpenRealtimeGrpcSessionCommand command, - RealtimeMeetingResumeConfig currentResumeConfig, - Long asrModelId) { - RealtimeMeetingResumeConfig config = new RealtimeMeetingResumeConfig(); - config.setAsrModelId(asrModelId); - config.setMode(nonBlank(command == null ? null : command.getMode(), currentResumeConfig == null ? null : currentResumeConfig.getMode(), "2pass")); - config.setLanguage(nonBlank(command == null ? null : command.getLanguage(), currentResumeConfig == null ? null : currentResumeConfig.getLanguage(), "auto")); - config.setUseSpkId(firstNonNull(command == null ? null : command.getUseSpkId(), currentResumeConfig == null ? null : currentResumeConfig.getUseSpkId(), 1)); - config.setEnablePunctuation(firstNonNull(command == null ? null : command.getEnablePunctuation(), currentResumeConfig == null ? null : currentResumeConfig.getEnablePunctuation(), Boolean.TRUE)); - config.setEnableItn(firstNonNull(command == null ? null : command.getEnableItn(), currentResumeConfig == null ? null : currentResumeConfig.getEnableItn(), Boolean.TRUE)); - config.setEnableTextRefine(firstNonNull(command == null ? null : command.getEnableTextRefine(), currentResumeConfig == null ? null : currentResumeConfig.getEnableTextRefine(), Boolean.FALSE)); - config.setSaveAudio(firstNonNull(command == null ? null : command.getSaveAudio(), currentResumeConfig == null ? null : currentResumeConfig.getSaveAudio(), Boolean.FALSE)); - config.setHotwords(command != null && command.getHotwords() != null - ? command.getHotwords() - : currentResumeConfig == null || currentResumeConfig.getHotwords() == null - ? List.of() - : currentResumeConfig.getHotwords()); - return config; - } - - private String resolveWsUrl(AiModelVO model) { - if (model.getWsUrl() != null && !model.getWsUrl().isBlank()) { - return model.getWsUrl(); - } - if (model.getBaseUrl() == null || model.getBaseUrl().isBlank()) { - return ""; - } - return model.getBaseUrl() - .replaceFirst("^http://", "ws://") - .replaceFirst("^https://", "wss://"); - } - - private Map buildStartMessage(AiModelVO model, Long meetingId, RealtimeMeetingResumeConfig resumeConfig) { - Map root = new HashMap<>(); - root.put("type", "start"); - root.put("request_id", "android_" + System.currentTimeMillis() + "_" + meetingId); - root.put("authorization", buildAuthorization(model.getApiKey())); - - Map config = new HashMap<>(); - Map audio = new HashMap<>(); - audio.put("format", "pcm"); - audio.put("sample_rate", grpcServerProperties.getRealtime().getSampleRate()); - audio.put("channels", grpcServerProperties.getRealtime().getChannels()); - config.put("audio", audio); - - Map recognition = new HashMap<>(); - recognition.put("language", nonBlank(resumeConfig.getLanguage(), "auto")); - recognition.put("enable_punctuation", boolOrDefault(resumeConfig.getEnablePunctuation(), true)); - recognition.put("enable_itn", boolOrDefault(resumeConfig.getEnableItn(), true)); - recognition.put("enable_speaker", Integer.valueOf(1).equals(resumeConfig.getUseSpkId())); - recognition.put("enable_two_pass", !"online".equalsIgnoreCase(resumeConfig.getMode())); - recognition.put("enable_text_refine", boolOrDefault(resumeConfig.getEnableTextRefine(), false)); - recognition.put("speaker_threshold", readSpeakerThreshold(model.getMediaConfig())); - recognition.put("hotwords", resumeConfig.getHotwords() == null ? List.of() : resumeConfig.getHotwords()); - config.put("recognition", recognition); - - config.put("model", model.getModelCode()); - config.put("save_audio", boolOrDefault(resumeConfig.getSaveAudio(), false)); - root.put("config", config); - return root; - } - - private String buildAuthorization(String apiKey) { - if (apiKey == null || apiKey.isBlank()) { - return ""; - } - return apiKey.startsWith("Bearer ") ? apiKey : "Bearer " + apiKey; - } - - private Object readSpeakerThreshold(Map mediaConfig) { - return mediaConfig == null ? null : mediaConfig.get("svThreshold"); - } - - private boolean boolOrDefault(Boolean value, boolean defaultValue) { - return value != null ? value : defaultValue; - } - - @SafeVarargs - private T firstNonNull(T... values) { - for (T value : values) { - if (value != null) { - return value; - } - } - return null; - } - - private String nonBlank(String value, String defaultValue) { - return nonBlank(value, defaultValue, null); - } - - private String nonBlank(String value, String fallbackValue, String defaultValue) { - if (value != null && !value.isBlank()) { - return value.trim(); - } - if (fallbackValue != null && !fallbackValue.isBlank()) { - return fallbackValue.trim(); - } - return defaultValue; - } - - private record PreparedRealtimeSession(AndroidRealtimeGrpcSessionData sessionData, - RealtimeMeetingResumeConfig resumeConfig, - RealtimeMeetingSessionStatusVO status) { - } -} diff --git a/backend/src/main/java/com/imeeting/service/realtime/impl/AsrUpstreamBridgeServiceImpl.java b/backend/src/main/java/com/imeeting/service/realtime/impl/AsrUpstreamBridgeServiceImpl.java deleted file mode 100644 index a599452..0000000 --- a/backend/src/main/java/com/imeeting/service/realtime/impl/AsrUpstreamBridgeServiceImpl.java +++ /dev/null @@ -1,262 +0,0 @@ -package com.imeeting.service.realtime.impl; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData; -import com.imeeting.service.realtime.AsrUpstreamBridgeService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.WebSocket; -import java.nio.ByteBuffer; -import java.util.Queue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; - -@Slf4j -@Service -@RequiredArgsConstructor -public class AsrUpstreamBridgeServiceImpl implements AsrUpstreamBridgeService { - - private final ObjectMapper objectMapper; - - @Override - public AsrUpstreamSession openSession(AndroidRealtimeGrpcSessionData sessionData, String connectionId, - AsrUpstreamEventListener listener) { - BridgeSession session = new BridgeSession(sessionData, connectionId, listener, objectMapper); - session.connect(); - return session; - } - - private static final class BridgeSession implements AsrUpstreamSession { - private final AndroidRealtimeGrpcSessionData sessionData; - private final String connectionId; - private final AsrUpstreamEventListener listener; - private final ObjectMapper objectMapper; - private final HttpClient httpClient = HttpClient.newHttpClient(); - private final Queue pendingAudio = new ConcurrentLinkedQueue<>(); - private final AtomicBoolean ready = new AtomicBoolean(false); - private final AtomicBoolean closed = new AtomicBoolean(false); - private final StringBuilder textBuffer = new StringBuilder(); - private volatile WebSocket webSocket; - private CompletableFuture sendChain = CompletableFuture.completedFuture(null); - - private BridgeSession(AndroidRealtimeGrpcSessionData sessionData, String connectionId, - AsrUpstreamEventListener listener, ObjectMapper objectMapper) { - this.sessionData = sessionData; - this.connectionId = connectionId; - this.listener = listener; - this.objectMapper = objectMapper; - } - - private void connect() { - try { - WebSocket socket = httpClient.newWebSocketBuilder() - .buildAsync(URI.create(sessionData.getTargetWsUrl()), new ListenerImpl()) - .get(); - this.webSocket = socket; - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - listener.onError("REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接第三方识别服务时被中断", true); - } catch (ExecutionException ex) { - listener.onError("REALTIME_UPSTREAM_CONNECT_FAILED", "连接第三方识别服务失败,请检查模型 WebSocket 配置或服务状态", true); - } - } - - @Override - public boolean isReady() { - return ready.get(); - } - - @Override - public void sendAudio(byte[] payload) { - if (payload == null || payload.length == 0 || closed.get()) { - return; - } - if (!ready.get() || webSocket == null) { - pendingAudio.add(payload); - return; - } - sendOrdered(() -> webSocket.sendBinary(ByteBuffer.wrap(payload), true)); - } - - @Override - public void sendStopSpeaking() { - if (closed.get() || webSocket == null) { - return; - } - sendOrdered(() -> webSocket.sendText("{\"is_speaking\":false}", true)); - } - - @Override - public void close(String reason) { - if (!closed.compareAndSet(false, true)) { - return; - } - if (webSocket != null) { - webSocket.sendClose(1000, reason == null ? "" : reason); - } - } - - private void sendOrdered(java.util.function.Supplier> action) { - synchronized (this) { - sendChain = sendChain.exceptionally(ex -> null) - .thenCompose(ignored -> action.get().thenApply(ws -> null)); - } - } - - private void flushPendingAudio() { - byte[] payload; - while ((payload = pendingAudio.poll()) != null) { - sendAudio(payload); - } - } - - private void handleTextMessage(String payload) { - try { - JsonNode root = objectMapper.readTree(payload); - if ((root.hasNonNull("code") || "error".equals(root.path("type").asText())) && root.hasNonNull("message")) { - listener.onError( - root.path("code").asText("REALTIME_UPSTREAM_ERROR"), - root.path("message").asText("第三方识别服务连接异常"), - true - ); - return; - } - AsrTranscriptResult result = normalizeTranscript(root); - if (result != null) { - listener.onTranscript(result); - } - } catch (Exception ex) { - log.debug("Ignore invalid upstream ASR payload, connectionId={}, payload={}", connectionId, payload, ex); - } - } - - private AsrTranscriptResult normalizeTranscript(JsonNode root) { - String type = root.path("type").asText(); - if ("partial".equals(type) || "segment".equals(type)) { - JsonNode data = root.path("data"); - String text = data.path("text").asText(""); - if (text.isBlank()) { - return null; - } - boolean isFinal = "segment".equals(type) || data.path("is_final").asBoolean(false); - String speakerId = textValue(data, "speaker_id"); - String speakerName = textValue(data, "speaker_name"); - if (speakerName == null) { - speakerName = speakerId; - } - Integer startTime = toMs(data.path("start")); - Integer endTime = toMs(data.path("end")); - return new AsrTranscriptResult(isFinal, text, speakerId, speakerName, startTime, endTime); - } - - String text = root.path("text").asText(""); - if (text.isBlank()) { - return null; - } - boolean isFinal = root.path("is_final").asBoolean(false); - JsonNode speaker = root.path("speaker"); - String speakerId = null; - String speakerName = null; - if (speaker.isTextual()) { - speakerId = speaker.asText(); - speakerName = speaker.asText(); - } else if (speaker.isObject()) { - speakerId = textValue(speaker, "user_id"); - if (speakerId == null) { - speakerId = textValue(speaker, "speaker_id"); - } - speakerName = textValue(speaker, "name"); - if (speakerName == null) { - speakerName = speakerId; - } - } - Integer startTime = null; - Integer endTime = null; - JsonNode timestamp = root.path("timestamp"); - if (timestamp.isArray() && timestamp.size() > 0) { - JsonNode first = timestamp.get(0); - JsonNode last = timestamp.get(timestamp.size() - 1); - if (first.isArray() && first.size() > 0) { - startTime = first.get(0).isInt() ? first.get(0).asInt() : null; - } - if (last.isArray() && last.size() > 1) { - endTime = last.get(1).isInt() ? last.get(1).asInt() : null; - } - } - return new AsrTranscriptResult(isFinal, text, speakerId, speakerName, startTime, endTime); - } - - private String textValue(JsonNode node, String fieldName) { - JsonNode target = node.path(fieldName); - if (target.isMissingNode() || target.isNull()) { - return null; - } - String value = target.asText(); - return value == null || value.isBlank() ? null : value; - } - - private Integer toMs(JsonNode node) { - if (!node.isNumber()) { - return null; - } - return (int) Math.round(node.asDouble() * 1000D); - } - - private final class ListenerImpl implements WebSocket.Listener { - @Override - public void onOpen(WebSocket webSocket) { - sendOrdered(() -> webSocket.sendText(sessionData.getStartMessageJson(), true)); - ready.set(true); - flushPendingAudio(); - listener.onReady(); - webSocket.request(1); - } - - @Override - public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { - textBuffer.append(data); - if (last) { - handleTextMessage(textBuffer.toString()); - textBuffer.setLength(0); - } - webSocket.request(1); - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletionStage onBinary(WebSocket webSocket, ByteBuffer data, boolean last) { - webSocket.request(1); - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletionStage onClose(WebSocket webSocket, int statusCode, String reason) { - if (closed.compareAndSet(false, true)) { - listener.onClosed(reason == null || reason.isBlank() ? "upstream_closed" : reason); - } - return CompletableFuture.completedFuture(null); - } - - @Override - public void onError(WebSocket webSocket, Throwable error) { - if (closed.compareAndSet(false, true)) { - listener.onError( - "REALTIME_UPSTREAM_ERROR", - error == null || error.getMessage() == null || error.getMessage().isBlank() - ? "第三方识别服务连接异常" - : "第三方识别服务连接异常: " + error.getMessage(), - true - ); - } - } - } - } -} diff --git a/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingGrpcSessionServiceImpl.java b/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingGrpcSessionServiceImpl.java deleted file mode 100644 index d55ccb9..0000000 --- a/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingGrpcSessionServiceImpl.java +++ /dev/null @@ -1,338 +0,0 @@ -package com.imeeting.service.realtime.impl; - -import com.imeeting.common.RedisKeys; -import com.imeeting.config.grpc.GrpcServerProperties; -import com.imeeting.dto.android.AndroidAuthContext; -import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData; -import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO; -import com.imeeting.dto.biz.RealtimeTranscriptItemDTO; -import com.imeeting.grpc.common.ErrorEvent; -import com.imeeting.grpc.realtime.RealtimeServerPacket; -import com.imeeting.grpc.realtime.SessionStatusEvent; -import com.imeeting.grpc.realtime.StreamClosed; -import com.imeeting.grpc.realtime.StreamReady; -import com.imeeting.grpc.realtime.TranscriptEvent; -import com.imeeting.service.biz.MeetingCommandService; -import com.imeeting.service.biz.RealtimeMeetingSessionStateService; -import com.imeeting.service.realtime.AndroidRealtimeSessionTicketService; -import com.imeeting.service.realtime.AsrUpstreamBridgeService; -import com.imeeting.service.realtime.RealtimeMeetingAudioStorageService; -import com.imeeting.service.realtime.RealtimeMeetingGrpcSessionService; -import io.grpc.stub.StreamObserver; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Service; - -import java.time.Duration; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; - -@Slf4j -@Service -@RequiredArgsConstructor -public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrpcSessionService { - - private final AndroidRealtimeSessionTicketService ticketService; - private final AsrUpstreamBridgeService asrUpstreamBridgeService; - private final MeetingCommandService meetingCommandService; - private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; - private final RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService; - private final StringRedisTemplate redisTemplate; - private final GrpcServerProperties grpcServerProperties; - - private final ConcurrentMap sessions = new ConcurrentHashMap<>(); - - @Override - public String openStream(Long meetingId, String streamToken, AndroidAuthContext authContext, StreamObserver responseObserver) { - AndroidRealtimeGrpcSessionData sessionData; - if (meetingId != null && meetingId > 0) { - sessionData = ticketService.prepareSessionData(meetingId, authContext); - streamToken = ""; - } else if (streamToken != null && !streamToken.isBlank()) { - sessionData = ticketService.getSessionData(streamToken); - if (sessionData == null) { - throw new RuntimeException("实时会议 gRPC 会话令牌无效"); - } - if (sessionData.getDeviceId() != null && !sessionData.getDeviceId().isBlank() - && authContext.getDeviceId() != null - && !sessionData.getDeviceId().equals(authContext.getDeviceId())) { - throw new RuntimeException("实时会议 gRPC 会话令牌与 deviceId 不匹配"); - } - } else { - throw new RuntimeException("会议 ID 不能为空"); - } - - String connectionId = "grpc_" + java.util.UUID.randomUUID().toString().replace("-", ""); - SessionRuntime runtime = new SessionRuntime(connectionId, streamToken, sessionData, responseObserver); - SessionRuntime previous = sessions.putIfAbsent(connectionId, runtime); - if (previous != null) { - throw new RuntimeException("实时会议 gRPC connectionId 重复"); - } - - try { - writeConnectionState(runtime); - realtimeMeetingAudioStorageService.openSession(sessionData.getMeetingId(), connectionId); - runtime.upstreamSession = asrUpstreamBridgeService.openSession(sessionData, connectionId, new UpstreamCallback(runtime)); - return connectionId; - } catch (Exception ex) { - sessions.remove(connectionId); - cleanupFailedOpen(runtime); - throw ex; - } - } - - @Override - public void onAudio(String connectionId, byte[] payload, long seq, boolean lastChunk) { - SessionRuntime runtime = sessions.get(connectionId); - if (runtime == null || runtime.closed.get()) { - return; - } - touchConnectionState(runtime); - realtimeMeetingAudioStorageService.append(connectionId, payload); - runtime.upstreamSession.sendAudio(payload); - if (lastChunk) { - runtime.upstreamSession.sendStopSpeaking(); - } - } - - @Override - public void onStopSpeaking(String connectionId) { - SessionRuntime runtime = sessions.get(connectionId); - if (runtime == null || runtime.closed.get()) { - return; - } - touchConnectionState(runtime); - runtime.upstreamSession.sendStopSpeaking(); - } - - @Override - public void closeStream(String connectionId, String reason, boolean notifyClient) { - SessionRuntime runtime = sessions.remove(connectionId); - if (runtime == null) { - return; - } - if (!runtime.closed.compareAndSet(false, true)) { - return; - } - - try { - if (runtime.upstreamSession != null) { - runtime.upstreamSession.close(reason == null ? "closed" : reason); - } - } catch (Exception ex) { - log.warn("Failed to close upstream realtime session, connectionId={}", connectionId, ex); - } - - try { - redisTemplate.delete(RedisKeys.realtimeMeetingGrpcConnectionKey(connectionId)); - } catch (Exception ex) { - log.error("Failed to delete realtime gRPC connection state, connectionId={}", connectionId, ex); - } - - try { - realtimeMeetingAudioStorageService.closeSession(connectionId); - } catch (Exception ex) { - log.error("Failed to close realtime gRPC audio session, connectionId={}", connectionId, ex); - } - - try { - realtimeMeetingSessionStateService.pauseByDisconnect(runtime.sessionData.getMeetingId(), connectionId); - } catch (Exception ex) { - log.error("Failed to pause realtime meeting by disconnect, meetingId={}, connectionId={}", - runtime.sessionData.getMeetingId(), connectionId, ex); - } - - if (notifyClient) { - runtime.send(RealtimeServerPacket.newBuilder() - .setClosed(StreamClosed.newBuilder() - .setMeetingId(runtime.sessionData.getMeetingId()) - .setReason(reason == null ? "closed" : reason) - .build()) - .build()); - } - runtime.complete(); - } - - private void cleanupFailedOpen(SessionRuntime runtime) { - try { - redisTemplate.delete(RedisKeys.realtimeMeetingGrpcConnectionKey(runtime.connectionId)); - } catch (Exception ex) { - log.warn("Failed to rollback realtime gRPC connection state, connectionId={}", runtime.connectionId, ex); - } - try { - realtimeMeetingAudioStorageService.closeSession(runtime.connectionId); - } catch (Exception ex) { - log.warn("Failed to rollback realtime gRPC audio session, connectionId={}", runtime.connectionId, ex); - } - try { - if (runtime.upstreamSession != null) { - runtime.upstreamSession.close("open_failed"); - } - } catch (Exception ex) { - log.warn("Failed to rollback realtime gRPC upstream session, connectionId={}", runtime.connectionId, ex); - } - } - - private void writeConnectionState(SessionRuntime runtime) { - Duration ttl = Duration.ofSeconds(grpcServerProperties.getRealtime().getConnectionTtlSeconds()); - String value = runtime.sessionData.getMeetingId() + ":" + runtime.sessionData.getDeviceId() + ":" + runtime.streamToken; - redisTemplate.opsForValue().set(RedisKeys.realtimeMeetingGrpcConnectionKey(runtime.connectionId), value, ttl); - } - - private void touchConnectionState(SessionRuntime runtime) { - writeConnectionState(runtime); - } - - private long nextEventSeq(Long meetingId) { - Long value = redisTemplate.opsForValue().increment(RedisKeys.realtimeMeetingEventSeqKey(meetingId)); - return value == null ? 1L : value; - } - - private TranscriptEvent.TranscriptType toTranscriptType(boolean finalResult) { - return finalResult ? TranscriptEvent.TranscriptType.FINAL : TranscriptEvent.TranscriptType.PARTIAL; - } - - private SessionStatusEvent buildStatusEvent(Long meetingId) { - RealtimeMeetingSessionStatusVO status = realtimeMeetingSessionStateService.getStatus(meetingId); - if (status == null) { - return SessionStatusEvent.newBuilder().setMeetingId(meetingId).build(); - } - return SessionStatusEvent.newBuilder() - .setMeetingId(meetingId) - .setStatus(nullToEmpty(status.getStatus())) - .setHasTranscript(Boolean.TRUE.equals(status.getHasTranscript())) - .setCanResume(Boolean.TRUE.equals(status.getCanResume())) - .setRemainingSeconds(status.getRemainingSeconds() == null ? 0L : status.getRemainingSeconds()) - .setActiveConnection(Boolean.TRUE.equals(status.getActiveConnection())) - .build(); - } - - private String nullToEmpty(String value) { - return value == null ? "" : value; - } - - private final class UpstreamCallback implements AsrUpstreamBridgeService.AsrUpstreamEventListener { - private final SessionRuntime runtime; - - private UpstreamCallback(SessionRuntime runtime) { - this.runtime = runtime; - } - - @Override - public void onReady() { - if (runtime.closed.get()) { - return; - } - if (!realtimeMeetingSessionStateService.activate(runtime.sessionData.getMeetingId(), runtime.connectionId)) { - runtime.sendError("REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃实时连接,请先关闭旧连接后再继续", false); - closeStream(runtime.connectionId, "active_connection_exists", true); - return; - } - touchConnectionState(runtime); - runtime.send(RealtimeServerPacket.newBuilder() - .setReady(StreamReady.newBuilder() - .setConnectionId(runtime.connectionId) - .setMeetingId(runtime.sessionData.getMeetingId()) - .setServerTime(System.currentTimeMillis()) - .build()) - .build()); - runtime.send(RealtimeServerPacket.newBuilder().setStatus(buildStatusEvent(runtime.sessionData.getMeetingId())).build()); - } - - @Override - public void onTranscript(AsrUpstreamBridgeService.AsrTranscriptResult result) { - if (runtime.closed.get() || result == null || !result.isFinalResult() || result.getText() == null || result.getText().isBlank()) { - return; - } - RealtimeTranscriptItemDTO item = new RealtimeTranscriptItemDTO(); - item.setSpeakerId(result.getSpeakerId()); - item.setSpeakerName(result.getSpeakerName()); - item.setContent(result.getText()); - item.setStartTime(result.getStartTime()); - item.setEndTime(result.getEndTime()); - meetingCommandService.saveRealtimeTranscriptSnapshot(runtime.sessionData.getMeetingId(), item, result.isFinalResult()); - runtime.send(RealtimeServerPacket.newBuilder() - .setTranscript(TranscriptEvent.newBuilder() - .setMeetingId(runtime.sessionData.getMeetingId()) - .setEventSeq(nextEventSeq(runtime.sessionData.getMeetingId())) - .setType(toTranscriptType(result.isFinalResult())) - .setText(result.getText()) - .setSpeakerId(nullToEmpty(result.getSpeakerId())) - .setSpeakerName(nullToEmpty(result.getSpeakerName())) - .setStartTime(result.getStartTime() == null ? 0 : result.getStartTime()) - .setEndTime(result.getEndTime() == null ? 0 : result.getEndTime()) - .build()) - .build()); - } - - @Override - public void onError(String code, String message, boolean retryable) { - if (runtime.closed.get()) { - return; - } - runtime.sendError(code, message, retryable); - closeStream(runtime.connectionId, code == null ? "upstream_error" : code, true); - } - - @Override - public void onClosed(String reason) { - closeStream(runtime.connectionId, reason == null ? "upstream_closed" : reason, true); - } - } - - private static final class SessionRuntime { - private final String connectionId; - private final String streamToken; - private final AndroidRealtimeGrpcSessionData sessionData; - private final StreamObserver responseObserver; - private final AtomicBoolean closed = new AtomicBoolean(false); - private AsrUpstreamBridgeService.AsrUpstreamSession upstreamSession; - - private SessionRuntime(String connectionId, String streamToken, AndroidRealtimeGrpcSessionData sessionData, - StreamObserver responseObserver) { - this.connectionId = connectionId; - this.streamToken = streamToken; - this.sessionData = sessionData; - this.responseObserver = responseObserver; - } - - private void send(RealtimeServerPacket packet) { - try { - synchronized (responseObserver) { - responseObserver.onNext(packet); - } - } catch (Exception ex) { - RealtimeMeetingGrpcSessionServiceImpl.log.debug( - "Ignore downstream realtime gRPC delivery failure, connectionId={}", - connectionId, - ex - ); - } - } - - private void sendError(String code, String message, boolean retryable) { - send(RealtimeServerPacket.newBuilder() - .setError(ErrorEvent.newBuilder() - .setCode(code == null ? "REALTIME_ERROR" : code) - .setMessage(message == null ? "未知错误" : message) - .setRetryable(retryable) - .build()) - .build()); - } - - private void complete() { - try { - responseObserver.onCompleted(); - } catch (Exception ex) { - RealtimeMeetingGrpcSessionServiceImpl.log.debug( - "Ignore downstream realtime gRPC completion failure, connectionId={}", - connectionId, - ex - ); - } - } - } -} diff --git a/backend/src/main/proto/android/common.proto b/backend/src/main/proto/android/common.proto deleted file mode 100644 index 20ae3ac..0000000 --- a/backend/src/main/proto/android/common.proto +++ /dev/null @@ -1,36 +0,0 @@ -syntax = "proto3"; - -package imeeting.android.common; - -option java_multiple_files = true; -option java_package = "com.imeeting.grpc.common"; -option java_outer_classname = "AndroidCommonProto"; - -message ClientAuth { - enum AuthType { - AUTH_TYPE_UNSPECIFIED = 0; - NONE = 1; - DEVICE_TOKEN = 2; - USER_JWT = 3; - STREAM_TOKEN = 4; - } - - AuthType auth_type = 1; - string access_token = 2; - string device_id = 3; - string tenant_code = 4; - string app_id = 5; - string app_version = 6; - string platform = 7; -} - -message ErrorEvent { - string code = 1; - string message = 2; - bool retryable = 3; -} - -message JsonPayload { - string topic = 1; - string json = 2; -} diff --git a/backend/src/main/proto/android/gateway.proto b/backend/src/main/proto/android/gateway.proto deleted file mode 100644 index 10de1b8..0000000 --- a/backend/src/main/proto/android/gateway.proto +++ /dev/null @@ -1,85 +0,0 @@ -syntax = "proto3"; - -package imeeting.android.gateway; - -option java_multiple_files = true; -option java_package = "com.imeeting.grpc.gateway"; -option java_outer_classname = "AndroidGatewayProto"; - -import "android/common.proto"; - -service AndroidGatewayService { - rpc Connect(stream GatewayClientPacket) returns (stream GatewayServerPacket); -} - -message GatewayClientPacket { - string request_id = 1; - imeeting.android.common.ClientAuth auth = 2; - - oneof body { - DeviceHello hello = 10; - Heartbeat heartbeat = 11; - Ack ack = 12; - Subscribe subscribe = 13; - } -} - -message GatewayServerPacket { - string request_id = 1; - - oneof body { - HelloAck hello_ack = 10; - ServerPush push = 11; - DevicePresenceEvent device_presence = 12; - imeeting.android.common.ErrorEvent error = 13; - Pong pong = 14; - } -} - -message DeviceHello { - string device_id = 1; - string device_name = 2; - string device_model = 3; - string os_version = 4; - string app_version = 5; - string network_type = 6; -} - -message HelloAck { - string connection_id = 1; - string auth_mode = 2; - int64 server_time = 3; - int64 heartbeat_interval_seconds = 4; -} - -message Heartbeat { - string connection_id = 1; - int64 client_time = 2; -} - -message Pong { - string connection_id = 1; - int64 server_time = 2; -} - -message Ack { - string message_id = 1; -} - -message Subscribe { - repeated string topics = 1; -} - -message ServerPush { - string message_id = 1; - string topic = 2; - string type = 3; - string json = 4; - int64 server_time = 5; -} - -message DevicePresenceEvent { - string device_id = 1; - string status = 2; - int64 last_seen_time = 3; -} diff --git a/backend/src/main/proto/android/push.proto b/backend/src/main/proto/android/push.proto new file mode 100644 index 0000000..fcc64f3 --- /dev/null +++ b/backend/src/main/proto/android/push.proto @@ -0,0 +1,117 @@ +syntax = "proto3"; + +package imeeting.push.v1; + +option java_multiple_files = true; +option java_package = "com.imeeting.grpc.push"; +option java_outer_classname = "PushProto"; + +// ========================= +// 平台 +// ========================= +enum Platform { + PLATFORM_UNKNOWN = 0; + ANDROID = 1; + IOS = 2; +} + +// ========================= +// 连接请求(流内握手) +// ========================= +message ConnectRequest { + Platform platform = 1; + string app_version = 2; + + string device_id = 3; + string user_id = 4; + string tenant_id = 5; + + string connection_id = 6; +} + +// ========================= +// 连接响应(服务端返回) +// ========================= +message ConnectResponse { + bool success = 1; + string message = 2; +} + +// ========================= +// 推送消息(服务端 → 客户端) +// ========================= +message PushMessage { + string message_id = 1; + int64 timestamp = 2; + + string type = 3; + string title = 4; + string content = 5; + + bool need_ack = 6; +} + +// ========================= +// ACK(客户端 → 服务端) +// ========================= +message AckRequest { + string message_id = 1; + string device_id = 2; + string connection_id = 3; +} + +// ========================= +// 心跳 +// ========================= +message HeartbeatRequest { + string device_id = 1; + string connection_id = 2; + int64 timestamp = 3; +} + +message HeartbeatResponse { + int64 timestamp = 1; + bool ok = 2; +} + +// ========================= +// 错误 +// ========================= +message ErrorEvent { + string code = 1; + string message = 2; + bool retryable = 3; +} + +// ========================= +// 客户端消息封装 +// ========================= +message ClientMessage { + oneof payload { + ConnectRequest connect = 1; // 首包:建立连接 + AckRequest ack = 2; // 消息确认 + HeartbeatRequest heartbeat = 3; // 心跳 + } +} + +// ========================= +// 服务端消息封装 +// ========================= +message ServerMessage { + oneof payload { + ConnectResponse connect_ack = 1; // 连接结果 + PushMessage push = 2; // 推送消息 + HeartbeatResponse heartbeat = 3; // 心跳响应 + ErrorEvent error = 4; // 错误信息 + } +} + +// ========================= +// 单一双向流服务 +// ========================= +service PushService { + + // 唯一通信通道(双向流) + rpc Communicate(stream ClientMessage) + returns (stream ServerMessage); +} \ No newline at end of file diff --git a/backend/src/main/proto/android/realtime_meeting.proto b/backend/src/main/proto/android/realtime_meeting.proto deleted file mode 100644 index 9f6f8ec..0000000 --- a/backend/src/main/proto/android/realtime_meeting.proto +++ /dev/null @@ -1,100 +0,0 @@ -syntax = "proto3"; - -package imeeting.android.realtime; - -option java_multiple_files = true; -option java_package = "com.imeeting.grpc.realtime"; -option java_outer_classname = "RealtimeMeetingProto"; - -import "android/common.proto"; - -service RealtimeMeetingService { - rpc StreamMeetingAudio(stream RealtimeClientPacket) returns (stream RealtimeServerPacket); -} - -message RealtimeClientPacket { - string request_id = 1; - imeeting.android.common.ClientAuth auth = 2; - - oneof body { - OpenMeetingStream open = 10; - AudioChunk audio = 11; - RealtimeControl control = 12; - } -} - -message RealtimeServerPacket { - string request_id = 1; - - oneof body { - StreamReady ready = 10; - TranscriptEvent transcript = 11; - SessionStatusEvent status = 12; - imeeting.android.common.ErrorEvent error = 13; - StreamClosed closed = 14; - } -} - -message OpenMeetingStream { - string stream_token = 1; - int64 meeting_id = 2; - int32 sample_rate = 3; - int32 channels = 4; - string encoding = 5; -} - -message AudioChunk { - bytes pcm16 = 1; - int64 seq = 2; - int64 client_time = 3; - bool last_chunk = 4; -} - -message RealtimeControl { - enum ControlType { - CONTROL_TYPE_UNSPECIFIED = 0; - START = 1; - STOP_SPEAKING = 2; - END_INPUT = 3; - CLOSE_STREAM = 4; - } - - ControlType type = 1; -} - -message StreamReady { - string connection_id = 1; - int64 meeting_id = 2; - int64 server_time = 3; -} - -message TranscriptEvent { - enum TranscriptType { - TRANSCRIPT_TYPE_UNSPECIFIED = 0; - PARTIAL = 1; - FINAL = 2; - } - - TranscriptType type = 1; - int64 meeting_id = 2; - int64 event_seq = 3; - string text = 4; - string speaker_id = 5; - string speaker_name = 6; - int32 start_time = 7; - int32 end_time = 8; -} - -message SessionStatusEvent { - int64 meeting_id = 1; - string status = 2; - bool has_transcript = 3; - bool can_resume = 4; - int64 remaining_seconds = 5; - bool active_connection = 6; -} - -message StreamClosed { - int64 meeting_id = 1; - string reason = 2; -} diff --git a/backend/src/test/java/com/imeeting/grpc/realtime/RealtimeMeetingGrpcServiceTest.java b/backend/src/test/java/com/imeeting/grpc/realtime/RealtimeMeetingGrpcServiceTest.java deleted file mode 100644 index 040d3d3..0000000 --- a/backend/src/test/java/com/imeeting/grpc/realtime/RealtimeMeetingGrpcServiceTest.java +++ /dev/null @@ -1,110 +0,0 @@ -package com.imeeting.grpc.realtime; - -import com.imeeting.dto.android.AndroidAuthContext; -import com.imeeting.grpc.common.ClientAuth; -import com.imeeting.service.android.AndroidAuthService; -import com.imeeting.service.realtime.RealtimeMeetingGrpcSessionService; -import io.grpc.stub.StreamObserver; -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -class RealtimeMeetingGrpcServiceTest { - - @Test - void streamMeetingAudioShouldReturnErrorEventWhenAuthenticationFails() { - AndroidAuthService authService = mock(AndroidAuthService.class); - RealtimeMeetingGrpcSessionService sessionService = mock(RealtimeMeetingGrpcSessionService.class); - RealtimeMeetingGrpcService service = new RealtimeMeetingGrpcService(authService, sessionService); - CapturingObserver responseObserver = new CapturingObserver(); - - when(authService.authenticateGrpc(any(ClientAuth.class), isNull())) - .thenThrow(new RuntimeException("缺少 Android deviceId")); - - StreamObserver requestObserver = service.streamMeetingAudio(responseObserver); - RealtimeClientPacket openPacket = RealtimeClientPacket.newBuilder() - .setRequestId("rt-open-001") - .setOpen(OpenMeetingStream.newBuilder().setMeetingId(1001L).build()) - .build(); - - assertDoesNotThrow(() -> requestObserver.onNext(openPacket)); - - assertEquals(1, responseObserver.values.size()); - RealtimeServerPacket errorPacket = responseObserver.values.get(0); - assertEquals("rt-open-001", errorPacket.getRequestId()); - assertTrue(errorPacket.hasError()); - assertEquals("REALTIME_GRPC_ERROR", errorPacket.getError().getCode()); - assertEquals("缺少 Android deviceId", errorPacket.getError().getMessage()); - assertTrue(responseObserver.completed); - assertNull(responseObserver.error); - verify(sessionService, never()).closeStream(anyString(), anyString(), anyBoolean()); - } - - @Test - void streamMeetingAudioShouldReturnErrorEventWhenSessionOpenFails() { - AndroidAuthService authService = mock(AndroidAuthService.class); - RealtimeMeetingGrpcSessionService sessionService = mock(RealtimeMeetingGrpcSessionService.class); - RealtimeMeetingGrpcService service = new RealtimeMeetingGrpcService(authService, sessionService); - CapturingObserver responseObserver = new CapturingObserver(); - AndroidAuthContext authContext = new AndroidAuthContext(); - authContext.setDeviceId("android-test-001"); - - when(authService.authenticateGrpc(any(ClientAuth.class), isNull())).thenReturn(authContext); - when(sessionService.openStream(1001L, "", authContext, responseObserver)) - .thenThrow(new RuntimeException("ASR 模型未配置 WebSocket 地址")); - - StreamObserver requestObserver = service.streamMeetingAudio(responseObserver); - RealtimeClientPacket openPacket = RealtimeClientPacket.newBuilder() - .setRequestId("rt-open-002") - .setAuth(ClientAuth.newBuilder().setDeviceId("android-test-001").build()) - .setOpen(OpenMeetingStream.newBuilder().setMeetingId(1001L).build()) - .build(); - - assertDoesNotThrow(() -> requestObserver.onNext(openPacket)); - - assertEquals(1, responseObserver.values.size()); - RealtimeServerPacket errorPacket = responseObserver.values.get(0); - assertEquals("rt-open-002", errorPacket.getRequestId()); - assertTrue(errorPacket.hasError()); - assertEquals("REALTIME_GRPC_ERROR", errorPacket.getError().getCode()); - assertEquals("ASR 模型未配置 WebSocket 地址", errorPacket.getError().getMessage()); - assertTrue(responseObserver.completed); - assertNull(responseObserver.error); - verify(sessionService, never()).closeStream(anyString(), anyString(), anyBoolean()); - } - - private static final class CapturingObserver implements StreamObserver { - private final List values = new ArrayList<>(); - private Throwable error; - private boolean completed; - - @Override - public void onNext(RealtimeServerPacket value) { - values.add(value); - } - - @Override - public void onError(Throwable t) { - error = t; - } - - @Override - public void onCompleted() { - completed = true; - } - } -} \ No newline at end of file diff --git a/backend/src/test/java/com/imeeting/manual/AndroidRealtimeGrpcManualTest.java b/backend/src/test/java/com/imeeting/manual/AndroidRealtimeGrpcManualTest.java deleted file mode 100644 index 59f773b..0000000 --- a/backend/src/test/java/com/imeeting/manual/AndroidRealtimeGrpcManualTest.java +++ /dev/null @@ -1,430 +0,0 @@ -package com.imeeting.manual; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.protobuf.ByteString; -import com.imeeting.grpc.common.ClientAuth; -import com.imeeting.grpc.realtime.AudioChunk; -import com.imeeting.grpc.realtime.OpenMeetingStream; -import com.imeeting.grpc.realtime.RealtimeClientPacket; -import com.imeeting.grpc.realtime.RealtimeControl; -import com.imeeting.grpc.realtime.RealtimeMeetingServiceGrpc; -import com.imeeting.grpc.realtime.RealtimeServerPacket; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.stub.StreamObserver; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -import java.io.InputStream; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Duration; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -@Disabled("Manual realtime integration test; requires a running local backend, gRPC service, and PCM fixture.") -public class AndroidRealtimeGrpcManualTest { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - @Test - void shouldRunAndroidRealtimeFlow() throws Exception { - TestConfig config = TestConfig.load(); - System.out.println("[manual-test] config=" + config); - - CreatedMeetingInfo meetingInfo = createRealtimeMeeting(config); - System.out.println("[manual-test] created meetingId=" + meetingInfo.meetingId()); - - ManualRealtimeClient client = new ManualRealtimeClient(config, meetingInfo); - try { - client.open(); - assertTrue(client.awaitReady(config.readyTimeoutSeconds(), TimeUnit.SECONDS), - "Did not receive StreamReady within timeout"); - - client.streamPcmFile(config.pcmFile(), config.chunkMs()); - client.sendStopSpeaking(); - - if (config.afterStopWaitSeconds() > 0) { - TimeUnit.SECONDS.sleep(config.afterStopWaitSeconds()); - } - - client.sendCloseStream(); - client.completeClientStream(); - client.awaitClosed(Math.max(5, config.afterStopWaitSeconds()), TimeUnit.SECONDS); - - assertTrue(client.isReadyReceived(), "StreamReady was not received"); - assertTrue(client.getErrorMessage() == null, "Realtime stream returned error: " + client.getErrorMessage()); - if (config.requireTranscript()) { - assertTrue(client.getTranscriptCount() > 0, - "No transcript events received. Check ASR config, PCM format, and upstream websocket connectivity."); - } - } finally { - client.shutdown(); - } - - if (config.beforeCompleteWaitSeconds() > 0) { - TimeUnit.SECONDS.sleep(config.beforeCompleteWaitSeconds()); - } - - completeRealtimeMeeting(config, meetingInfo.meetingId()); - - if (config.afterCompleteWaitSeconds() > 0) { - TimeUnit.SECONDS.sleep(config.afterCompleteWaitSeconds()); - } - - JsonNode transcripts = queryTranscripts(config, meetingInfo.meetingId()); - if (config.requireTranscript()) { - assertTrue(transcripts.isArray() && transcripts.size() > 0, - "No persisted transcripts found after complete. meetingId=" + meetingInfo.meetingId()); - } - } - - private CreatedMeetingInfo createRealtimeMeeting(TestConfig config) throws Exception { - HttpClient httpClient = HttpClient.newBuilder() - .connectTimeout(Duration.ofSeconds(10)) - .build(); - - Map body = new LinkedHashMap<>(); - body.put("title", config.meetingTitle()); - if (config.tags() != null && !config.tags().isBlank()) { - body.put("tags", config.tags()); - } - - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(config.baseUrl() + "/api/android/meeting/realtime/create")) - .timeout(Duration.ofSeconds(20)) - .header("Content-Type", "application/json") - .header("X-Android-Device-Id", config.deviceId()) - .header("X-Android-Platform", "android") - .POST(HttpRequest.BodyPublishers.ofString(OBJECT_MAPPER.writeValueAsString(body), StandardCharsets.UTF_8)) - .build(); - - HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - assertTrue(response.statusCode() >= 200 && response.statusCode() < 300, - "Create realtime meeting failed, httpStatus=" + response.statusCode() + ", body=" + response.body()); - - JsonNode data = readData(response.body()); - long meetingId = data.path("meetingId").asLong(0L); - assertTrue(meetingId > 0, "Invalid meetingId in response: " + response.body()); - - int sampleRate = data.path("sampleRate").asInt(16000); - int channels = data.path("channels").asInt(1); - String encoding = text(data, "encoding"); - assertNotNull(encoding, "encoding is null in response: " + response.body()); - - return new CreatedMeetingInfo(meetingId, sampleRate, channels, encoding); - } - - private void completeRealtimeMeeting(TestConfig config, long meetingId) throws Exception { - HttpClient httpClient = HttpClient.newBuilder() - .connectTimeout(Duration.ofSeconds(10)) - .build(); - - Map body = new LinkedHashMap<>(); - body.put("overwriteAudio", config.overwriteAudio()); - if (config.completeAudioUrl() != null && !config.completeAudioUrl().isBlank()) { - body.put("audioUrl", config.completeAudioUrl()); - } - - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(config.baseUrl() + "/api/android/meeting/" + meetingId + "/realtime/complete")) - .timeout(Duration.ofSeconds(20)) - .header("Content-Type", "application/json") - .header("X-Android-Device-Id", config.deviceId()) - .header("X-Android-Platform", "android") - .POST(HttpRequest.BodyPublishers.ofString(OBJECT_MAPPER.writeValueAsString(body), StandardCharsets.UTF_8)) - .build(); - - HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - assertTrue(response.statusCode() >= 200 && response.statusCode() < 300, - "Complete realtime meeting failed, httpStatus=" + response.statusCode() + ", body=" + response.body()); - - JsonNode data = readData(response.body()); - assertTrue(data.asBoolean(false), "Complete realtime meeting did not return true: " + response.body()); - } - - private JsonNode queryTranscripts(TestConfig config, long meetingId) throws Exception { - HttpClient httpClient = HttpClient.newBuilder() - .connectTimeout(Duration.ofSeconds(10)) - .build(); - - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(config.baseUrl() + "/api/android/meeting/" + meetingId + "/transcripts")) - .timeout(Duration.ofSeconds(20)) - .header("X-Android-Device-Id", config.deviceId()) - .header("X-Android-Platform", "android") - .GET() - .build(); - - HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - assertTrue(response.statusCode() >= 200 && response.statusCode() < 300, - "Query transcripts failed, httpStatus=" + response.statusCode() + ", body=" + response.body()); - return readData(response.body()); - } - - private JsonNode readData(String responseBody) throws Exception { - JsonNode root = OBJECT_MAPPER.readTree(responseBody); - JsonNode data = root.path("data"); - assertTrue(!data.isMissingNode() && !data.isNull(), "Invalid REST response, missing data: " + responseBody); - return data; - } - - private String text(JsonNode node, String fieldName) { - JsonNode value = node.path(fieldName); - return value.isMissingNode() || value.isNull() ? null : value.asText(); - } - - private record CreatedMeetingInfo(long meetingId, int sampleRate, int channels, String encoding) { - } - - private record TestConfig(String baseUrl, - String grpcHost, - int grpcPort, - String deviceId, - String meetingTitle, - String tags, - Path pcmFile, - int chunkMs, - long readyTimeoutSeconds, - long afterStopWaitSeconds, - long beforeCompleteWaitSeconds, - long afterCompleteWaitSeconds, - boolean overwriteAudio, - String completeAudioUrl, - boolean requireTranscript) { - - private static final String DEFAULT_BASE_URL = "http://127.0.0.1:8081"; - private static final String DEFAULT_GRPC_HOST = "127.0.0.1"; - private static final int DEFAULT_GRPC_PORT = 19090; - private static final String DEFAULT_DEVICE_ID = "android-local-test-001"; - private static final String DEFAULT_MEETING_TITLE = "android-realtime-manual-test"; - private static final String DEFAULT_TAGS = "android,grpc,manual"; - private static final String DEFAULT_PCM_FILE = "C:\\Users\\85206\\Downloads\\no_recoder_audio.pcm"; - private static final int DEFAULT_CHUNK_MS = 40; - private static final long DEFAULT_READY_TIMEOUT_SECONDS = 15L; - private static final long DEFAULT_AFTER_STOP_WAIT_SECONDS = 8L; - private static final long DEFAULT_BEFORE_COMPLETE_WAIT_SECONDS = 3L; - private static final long DEFAULT_AFTER_COMPLETE_WAIT_SECONDS = 1L; - private static final boolean DEFAULT_OVERWRITE_AUDIO = false; - private static final String DEFAULT_COMPLETE_AUDIO_URL = ""; - private static final boolean DEFAULT_REQUIRE_TRANSCRIPT = true; - - private static TestConfig load() { - String baseUrl = stringProperty("manualRealtime.baseUrl", DEFAULT_BASE_URL); - String grpcHost = stringProperty("manualRealtime.grpcHost", DEFAULT_GRPC_HOST); - int grpcPort = intProperty("manualRealtime.grpcPort", DEFAULT_GRPC_PORT); - String deviceId = stringProperty("manualRealtime.deviceId", DEFAULT_DEVICE_ID); - String meetingTitle = stringProperty("manualRealtime.meetingTitle", DEFAULT_MEETING_TITLE); - String tags = stringProperty("manualRealtime.tags", DEFAULT_TAGS); - Path pcmFile = Path.of(stringProperty("manualRealtime.pcmFile", DEFAULT_PCM_FILE)); - int chunkMs = intProperty("manualRealtime.chunkMs", DEFAULT_CHUNK_MS); - long readyTimeoutSeconds = longProperty("manualRealtime.readyTimeoutSeconds", DEFAULT_READY_TIMEOUT_SECONDS); - long afterStopWaitSeconds = longProperty("manualRealtime.afterStopWaitSeconds", DEFAULT_AFTER_STOP_WAIT_SECONDS); - long beforeCompleteWaitSeconds = longProperty("manualRealtime.beforeCompleteWaitSeconds", DEFAULT_BEFORE_COMPLETE_WAIT_SECONDS); - long afterCompleteWaitSeconds = longProperty("manualRealtime.afterCompleteWaitSeconds", DEFAULT_AFTER_COMPLETE_WAIT_SECONDS); - boolean overwriteAudio = booleanProperty("manualRealtime.overwriteAudio", DEFAULT_OVERWRITE_AUDIO); - String completeAudioUrl = stringProperty("manualRealtime.completeAudioUrl", DEFAULT_COMPLETE_AUDIO_URL); - boolean requireTranscript = booleanProperty("manualRealtime.requireTranscript", DEFAULT_REQUIRE_TRANSCRIPT); - - assertTrue(Files.exists(pcmFile), - "PCM file does not exist: " + pcmFile + ". Please set -DmanualRealtime.pcmFile=."); - assertTrue(Files.isRegularFile(pcmFile), "PCM file is not a regular file: " + pcmFile); - - return new TestConfig(baseUrl, grpcHost, grpcPort, deviceId, meetingTitle, tags, pcmFile, - chunkMs, readyTimeoutSeconds, afterStopWaitSeconds, - beforeCompleteWaitSeconds, afterCompleteWaitSeconds, overwriteAudio, completeAudioUrl, requireTranscript); - } - - private static String stringProperty(String key, String defaultValue) { - String value = System.getProperty(key); - return value == null || value.isBlank() ? defaultValue : value.trim(); - } - - private static int intProperty(String key, int defaultValue) { - return Integer.parseInt(stringProperty(key, Integer.toString(defaultValue))); - } - - private static long longProperty(String key, long defaultValue) { - return Long.parseLong(stringProperty(key, Long.toString(defaultValue))); - } - - private static boolean booleanProperty(String key, boolean defaultValue) { - return Boolean.parseBoolean(stringProperty(key, Boolean.toString(defaultValue))); - } - } - - private static final class ManualRealtimeClient { - private final TestConfig config; - private final CreatedMeetingInfo meetingInfo; - private final CountDownLatch readyLatch = new CountDownLatch(1); - private final CountDownLatch closedLatch = new CountDownLatch(1); - private final AtomicInteger transcriptCount = new AtomicInteger(); - private final AtomicReference errorMessage = new AtomicReference<>(); - private final ManagedChannel channel; - private final StreamObserver requestObserver; - private volatile boolean readyReceived; - - private ManualRealtimeClient(TestConfig config, CreatedMeetingInfo meetingInfo) { - this.config = config; - this.meetingInfo = meetingInfo; - this.channel = ManagedChannelBuilder.forAddress(config.grpcHost(), config.grpcPort()) - .usePlaintext() - .build(); - this.requestObserver = RealtimeMeetingServiceGrpc.newStub(channel) - .streamMeetingAudio(new StreamObserver<>() { - @Override - public void onNext(RealtimeServerPacket packet) { - switch (packet.getBodyCase()) { - case READY -> { - readyReceived = true; - readyLatch.countDown(); - System.out.println("[manual-test] READY connectionId=" + packet.getReady().getConnectionId()); - } - case STATUS -> System.out.println("[manual-test] STATUS status=" + packet.getStatus().getStatus() - + ", active=" + packet.getStatus().getActiveConnection()); - case TRANSCRIPT -> { - transcriptCount.incrementAndGet(); - System.out.println("[manual-test] TRANSCRIPT type=" + packet.getTranscript().getType() - + ", text=" + packet.getTranscript().getText()); - } - case ERROR -> { - String message = packet.getError().getCode() + ": " + packet.getError().getMessage(); - errorMessage.compareAndSet(null, message); - System.out.println("[manual-test] ERROR " + message); - } - case CLOSED -> { - System.out.println("[manual-test] CLOSED reason=" + packet.getClosed().getReason()); - closedLatch.countDown(); - } - case BODY_NOT_SET -> System.out.println("[manual-test] EMPTY packet received"); - } - } - - @Override - public void onError(Throwable throwable) { - errorMessage.compareAndSet(null, throwable.getMessage()); - readyLatch.countDown(); - closedLatch.countDown(); - System.out.println("[manual-test] gRPC onError: " + throwable.getMessage()); - } - - @Override - public void onCompleted() { - closedLatch.countDown(); - System.out.println("[manual-test] gRPC onCompleted"); - } - }); - } - - private void open() { - ClientAuth auth = ClientAuth.newBuilder() - .setAuthType(ClientAuth.AuthType.NONE) - .setDeviceId(config.deviceId()) - .setPlatform("android") - .setAppVersion("manual-test") - .build(); - - OpenMeetingStream open = OpenMeetingStream.newBuilder() - .setStreamToken("") - .setMeetingId(meetingInfo.meetingId()) - .setSampleRate(meetingInfo.sampleRate()) - .setChannels(meetingInfo.channels()) - .setEncoding(meetingInfo.encoding()) - .build(); - - requestObserver.onNext(RealtimeClientPacket.newBuilder() - .setRequestId("manual-open") - .setAuth(auth) - .setOpen(open) - .build()); - } - - private boolean awaitReady(long timeout, TimeUnit timeUnit) throws InterruptedException { - return readyLatch.await(timeout, timeUnit); - } - - private void streamPcmFile(Path pcmFile, int chunkMs) throws Exception { - int bytesPerSample = 2; - int chunkSize = meetingInfo.sampleRate() * meetingInfo.channels() * bytesPerSample * chunkMs / 1000; - byte[] buffer = new byte[chunkSize]; - long seq = 1L; - - try (InputStream inputStream = Files.newInputStream(pcmFile)) { - while (true) { - int read = inputStream.read(buffer); - if (read <= 0) { - break; - } - AudioChunk audioChunk = AudioChunk.newBuilder() - .setPcm16(ByteString.copyFrom(buffer, 0, read)) - .setSeq(seq) - .setClientTime(System.currentTimeMillis()) - .setLastChunk(false) - .build(); - requestObserver.onNext(RealtimeClientPacket.newBuilder() - .setRequestId("manual-audio-" + seq) - .setAudio(audioChunk) - .build()); - seq++; - Thread.sleep(chunkMs); - } - } - } - - private void sendStopSpeaking() { - requestObserver.onNext(RealtimeClientPacket.newBuilder() - .setRequestId("manual-stop-speaking") - .setControl(RealtimeControl.newBuilder() - .setType(RealtimeControl.ControlType.STOP_SPEAKING) - .build()) - .build()); - } - - private void sendCloseStream() { - requestObserver.onNext(RealtimeClientPacket.newBuilder() - .setRequestId("manual-close-stream") - .setControl(RealtimeControl.newBuilder() - .setType(RealtimeControl.ControlType.CLOSE_STREAM) - .build()) - .build()); - } - - private void completeClientStream() { - requestObserver.onCompleted(); - } - - private boolean awaitClosed(long timeout, TimeUnit timeUnit) throws InterruptedException { - return closedLatch.await(timeout, timeUnit); - } - - private boolean isReadyReceived() { - return readyReceived; - } - - private int getTranscriptCount() { - return transcriptCount.get(); - } - - private String getErrorMessage() { - return errorMessage.get(); - } - - private void shutdown() throws InterruptedException { - channel.shutdownNow(); - channel.awaitTermination(5, TimeUnit.SECONDS); - } - } -} diff --git a/backend/src/test/java/com/imeeting/service/realtime/impl/RealtimeMeetingGrpcSessionServiceImplTest.java b/backend/src/test/java/com/imeeting/service/realtime/impl/RealtimeMeetingGrpcSessionServiceImplTest.java deleted file mode 100644 index 6d0d546..0000000 --- a/backend/src/test/java/com/imeeting/service/realtime/impl/RealtimeMeetingGrpcSessionServiceImplTest.java +++ /dev/null @@ -1,123 +0,0 @@ -package com.imeeting.service.realtime.impl; - -import com.imeeting.config.grpc.GrpcServerProperties; -import com.imeeting.dto.android.AndroidAuthContext; -import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData; -import com.imeeting.grpc.realtime.RealtimeServerPacket; -import com.imeeting.grpc.realtime.TranscriptEvent; -import com.imeeting.service.biz.MeetingCommandService; -import com.imeeting.service.biz.RealtimeMeetingSessionStateService; -import com.imeeting.service.realtime.AndroidRealtimeSessionTicketService; -import com.imeeting.service.realtime.AsrUpstreamBridgeService; -import com.imeeting.service.realtime.RealtimeMeetingAudioStorageService; -import io.grpc.stub.StreamObserver; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.data.redis.core.ValueOperations; - -import java.util.ArrayList; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -class RealtimeMeetingGrpcSessionServiceImplTest { - - @Test - void openStreamShouldOnlyForwardAndPersistFinalTranscript() { - AndroidRealtimeSessionTicketService ticketService = mock(AndroidRealtimeSessionTicketService.class); - AsrUpstreamBridgeService asrUpstreamBridgeService = mock(AsrUpstreamBridgeService.class); - MeetingCommandService meetingCommandService = mock(MeetingCommandService.class); - RealtimeMeetingSessionStateService sessionStateService = mock(RealtimeMeetingSessionStateService.class); - RealtimeMeetingAudioStorageService audioStorageService = mock(RealtimeMeetingAudioStorageService.class); - StringRedisTemplate redisTemplate = mock(StringRedisTemplate.class); - @SuppressWarnings("unchecked") - ValueOperations valueOperations = mock(ValueOperations.class); - when(redisTemplate.opsForValue()).thenReturn(valueOperations); - - GrpcServerProperties grpcServerProperties = new GrpcServerProperties(); - AndroidRealtimeGrpcSessionData sessionData = new AndroidRealtimeGrpcSessionData(); - sessionData.setMeetingId(1001L); - sessionData.setDeviceId("android-test-001"); - sessionData.setTargetWsUrl("ws://localhost/mock-asr"); - when(ticketService.prepareSessionData(eq(1001L), any(AndroidAuthContext.class))).thenReturn(sessionData); - - AsrUpstreamBridgeService.AsrUpstreamSession upstreamSession = mock(AsrUpstreamBridgeService.AsrUpstreamSession.class); - ArgumentCaptor listenerCaptor = - ArgumentCaptor.forClass(AsrUpstreamBridgeService.AsrUpstreamEventListener.class); - when(asrUpstreamBridgeService.openSession(eq(sessionData), anyString(), listenerCaptor.capture())) - .thenReturn(upstreamSession); - - RealtimeMeetingGrpcSessionServiceImpl service = new RealtimeMeetingGrpcSessionServiceImpl( - ticketService, - asrUpstreamBridgeService, - meetingCommandService, - sessionStateService, - audioStorageService, - redisTemplate, - grpcServerProperties - ); - - CapturingObserver responseObserver = new CapturingObserver(); - AndroidAuthContext authContext = new AndroidAuthContext(); - authContext.setDeviceId("android-test-001"); - - service.openStream(1001L, "", authContext, responseObserver); - - AsrUpstreamBridgeService.AsrUpstreamEventListener listener = listenerCaptor.getValue(); - listener.onTranscript(new AsrUpstreamBridgeService.AsrTranscriptResult( - false, - "partial transcript", - "spk-1", - "Speaker 1", - 100, - 500 - )); - - assertTrue(responseObserver.values.isEmpty()); - verify(meetingCommandService, never()).saveRealtimeTranscriptSnapshot(anyLong(), any(), eq(false)); - - listener.onTranscript(new AsrUpstreamBridgeService.AsrTranscriptResult( - true, - "final transcript", - "spk-1", - "Speaker 1", - 100, - 800 - )); - - assertEquals(1, responseObserver.values.size()); - RealtimeServerPacket packet = responseObserver.values.get(0); - assertTrue(packet.hasTranscript()); - assertEquals(TranscriptEvent.TranscriptType.FINAL, packet.getTranscript().getType()); - assertEquals("final transcript", packet.getTranscript().getText()); - verify(meetingCommandService, times(1)).saveRealtimeTranscriptSnapshot(eq(1001L), any(), eq(true)); - } - - private static final class CapturingObserver implements StreamObserver { - private final List values = new ArrayList<>(); - - @Override - public void onNext(RealtimeServerPacket value) { - values.add(value); - } - - @Override - public void onError(Throwable t) { - } - - @Override - public void onCompleted() { - } - } -} diff --git a/frontend/src/api/index.ts b/frontend/src/api/index.ts index ea8f8aa..cdb905a 100644 --- a/frontend/src/api/index.ts +++ b/frontend/src/api/index.ts @@ -154,6 +154,21 @@ export async function deleteDevice(id: number) { return resp.data.data as boolean; } +export async function listManagedDevices() { + const resp = await http.get("/api/admin/devices"); + return resp.data.data as DeviceInfo[]; +} + +export async function updateManagedDevice(id: number, payload: Partial) { + const resp = await http.put(`/api/admin/devices/${id}`, payload); + return resp.data.data as DeviceInfo; +} + +export async function kickManagedDevice(id: number) { + const resp = await http.post(`/api/admin/devices/${id}/kick`); + return resp.data.data as boolean; +} + export async function listUserRoles(userId: number) { const resp = await http.get(`/sys/api/users/${userId}/roles`); return resp.data.data as number[]; diff --git a/frontend/src/locales/en-US.json b/frontend/src/locales/en-US.json index d2f2985..89ac721 100644 --- a/frontend/src/locales/en-US.json +++ b/frontend/src/locales/en-US.json @@ -325,12 +325,28 @@ "searchLabel": "Search devices", "newDevice": "New Device", "device": "Device", + "totalDevices": "Total Devices", + "totalDevicesHint": "Registered terminals in the device registry", + "onlineDevices": "Online Devices", + "onlineDevicesHint": "Currently connected through gRPC", + "enabledDevices": "Enabled Devices", + "enabledDevicesHint": "Allowed to access Android APIs", "unnamedDevice": "Unnamed device", + "unboundAccount": "Unbound", "ownerId": "ID", + "terminalType": "Terminal Type", + "terminalVersion": "Terminal Version", + "onlineStatus": "Online Status", + "online": "Online", + "offline": "Offline", + "lastOnlineAt": "Last Online", "enabled": "Enabled", "disabled": "Disabled", "updatedAt": "Updated At", "editDevice": "Edit device", + "kickDevice": "Kick device", + "kickDeviceConfirm": "Kick this device offline?", + "kickSucceeded": "Device has been kicked offline", "deleteDevice": "Delete this device?", "drawerTitleCreate": "New Device", "drawerTitleEdit": "Edit Device", diff --git a/frontend/src/locales/zh-CN.json b/frontend/src/locales/zh-CN.json index 03fad42..a390822 100644 --- a/frontend/src/locales/zh-CN.json +++ b/frontend/src/locales/zh-CN.json @@ -325,12 +325,28 @@ "searchLabel": "搜索设备", "newDevice": "新建设备", "device": "设备", + "totalDevices": "设备总数", + "totalDevicesHint": "已登记到设备管理中的终端数量", + "onlineDevices": "在线数量", + "onlineDevicesHint": "当前通过 gRPC 保持连接的设备", + "enabledDevices": "启用设备", + "enabledDevicesHint": "允许访问安卓接口的设备数量", "unnamedDevice": "未命名设备", + "unboundAccount": "未绑定", "ownerId": "ID", + "terminalType": "终端类型", + "terminalVersion": "终端版本", + "onlineStatus": "在线状态", + "online": "在线", + "offline": "离线", + "lastOnlineAt": "最后在线时间", "enabled": "启用", "disabled": "禁用", "updatedAt": "更新时间", "editDevice": "编辑设备", + "kickDevice": "踢下线", + "kickDeviceConfirm": "确认踢该设备下线?", + "kickSucceeded": "设备已踢下线", "deleteDevice": "确认删除该设备?", "drawerTitleCreate": "新建设备", "drawerTitleEdit": "编辑设备", diff --git a/frontend/src/pages/devices/index.less b/frontend/src/pages/devices/index.less index 619dd7d..a03cc4b 100644 --- a/frontend/src/pages/devices/index.less +++ b/frontend/src/pages/devices/index.less @@ -2,6 +2,109 @@ padding: 24px; } +.devices-metrics { + margin-bottom: 16px; +} + +.devices-metric-card { + position: relative; + overflow: hidden; + border-radius: 18px; + box-shadow: 0 14px 36px rgba(16, 24, 40, 0.08); + min-height: 144px; + + .ant-card-body { + position: relative; + z-index: 1; + display: flex; + align-items: center; + gap: 16px; + padding: 22px 24px; + } +} + +.devices-metric-card::before { + content: ""; + position: absolute; + inset: 0; + opacity: 1; +} + +.devices-metric-card::after { + content: ""; + position: absolute; + right: -28px; + top: -28px; + width: 120px; + height: 120px; + border-radius: 999px; + background: rgba(255, 255, 255, 0.3); +} + +.devices-metric-card--total::before { + background: linear-gradient(135deg, #f7f9ff 0%, #eef4ff 100%); +} + +.devices-metric-card--online::before { + background: linear-gradient(135deg, #f0fff8 0%, #dcfce7 100%); +} + +.devices-metric-card--enabled::before { + background: linear-gradient(135deg, #fff8ed 0%, #ffedd5 100%); +} + +.devices-metric-card__icon { + width: 54px; + height: 54px; + border-radius: 16px; + display: flex; + align-items: center; + justify-content: center; + font-size: 24px; + flex: 0 0 auto; +} + +.devices-metric-card--total .devices-metric-card__icon { + background: rgba(49, 110, 255, 0.12); + color: #275df5; +} + +.devices-metric-card--online .devices-metric-card__icon { + background: rgba(22, 163, 74, 0.12); + color: #15803d; +} + +.devices-metric-card--enabled .devices-metric-card__icon { + background: rgba(234, 88, 12, 0.12); + color: #c2410c; +} + +.devices-metric-card__content { + min-width: 0; +} + +.devices-metric-card__label { + font-size: 13px; + font-weight: 600; + letter-spacing: 0.04em; + text-transform: uppercase; + color: rgba(15, 23, 42, 0.62); + margin-bottom: 6px; +} + +.devices-metric-card__value { + font-size: 34px; + line-height: 1; + font-weight: 700; + color: #0f172a; + margin-bottom: 10px; +} + +.devices-metric-card__hint { + font-size: 13px; + color: rgba(15, 23, 42, 0.68); +} + .devices-header { display: flex; justify-content: space-between; @@ -14,7 +117,8 @@ } .devices-table-card { - border-radius: 8px; + border-radius: 18px; + box-shadow: 0 12px 28px rgba(15, 23, 42, 0.06); } .devices-table-toolbar { @@ -28,12 +132,12 @@ .device-icon-placeholder { width: 40px; height: 40px; - background-color: #f0f5ff; - border-radius: 8px; + background: linear-gradient(135deg, #f1f5ff 0%, #e0e7ff 100%); + border-radius: 12px; display: flex; align-items: center; justify-content: center; - color: #1890ff; + color: #315efb; font-size: 20px; } @@ -57,3 +161,21 @@ .tabular-nums { font-variant-numeric: tabular-nums; } + +@media (max-width: 768px) { + .devices-page { + padding: 16px; + } + + .devices-metric-card { + min-height: auto; + } + + .devices-metric-card .ant-card-body { + padding: 18px; + } + + .devices-metric-card__value { + font-size: 30px; + } +} diff --git a/frontend/src/pages/devices/index.tsx b/frontend/src/pages/devices/index.tsx index d1a44e1..018884e 100644 --- a/frontend/src/pages/devices/index.tsx +++ b/frontend/src/pages/devices/index.tsx @@ -1,20 +1,18 @@ -import { Button, Card, Drawer, Form, Input, Popconfirm, Select, Space, Table, Tag, Typography, message } from "antd"; -import { DeleteOutlined, DesktopOutlined, EditOutlined, PlusOutlined, SearchOutlined, UserOutlined } from "@ant-design/icons"; +import { Button, Card, Col, Drawer, Form, Input, Popconfirm, Row, Select, Space, Table, Tag, Typography, message } from "antd"; +import { CheckCircleOutlined, DesktopOutlined, DisconnectOutlined, EditOutlined, SearchOutlined, ThunderboltOutlined, UserOutlined } from "@ant-design/icons"; import { useEffect, useMemo, useState } from "react"; import { useTranslation } from "react-i18next"; -import { createDevice, deleteDevice, listDevices, listUsers, updateDevice } from "@/api"; +import { kickManagedDevice, listManagedDevices, updateManagedDevice } from "@/api"; import PageHeader from "@/components/shared/PageHeader"; import { useDict } from "@/hooks/useDict"; import { usePermission } from "@/hooks/usePermission"; -import type { DeviceInfo, SysUser } from "@/types"; +import type { DeviceInfo } from "@/types"; import { getStandardPagination } from "@/utils/pagination"; import "./index.less"; const { Text } = Typography; type DeviceFormValues = { - userId: number; - deviceCode: string; deviceName?: string; status: number; }; @@ -26,14 +24,7 @@ export default function Devices() { const [loading, setLoading] = useState(false); const [saving, setSaving] = useState(false); const [searchText, setSearchText] = useState(""); - - const handleSearch = () => {}; - - const handleResetSearch = () => { - setSearchText(""); - }; const [devices, setDevices] = useState([]); - const [users, setUsers] = useState([]); const [open, setOpen] = useState(false); const [editing, setEditing] = useState(null); const [form] = Form.useForm(); @@ -41,9 +32,8 @@ export default function Devices() { const loadData = async () => { setLoading(true); try { - const [deviceList, userList] = await Promise.all([listDevices(), listUsers()]); + const deviceList = await listManagedDevices(); setDevices(deviceList || []); - setUsers(userList || []); } finally { setLoading(false); } @@ -53,42 +43,35 @@ export default function Devices() { loadData(); }, []); - const userMap = useMemo(() => { - const map: Record = {}; - users.forEach((user) => { - map[user.userId] = user; - }); - return map; - }, [users]); - const filteredData = useMemo(() => { if (!searchText) { return devices; } const lower = searchText.toLowerCase(); return devices.filter((device) => { - const owner = userMap[device.userId]; + const ownerName = device.displayName || ""; + const ownerUsername = device.username || ""; return ( device.deviceCode.toLowerCase().includes(lower) || (device.deviceName || "").toLowerCase().includes(lower) || - (owner?.displayName || "").toLowerCase().includes(lower) || - String(device.userId).includes(lower) + (device.terminalType || "").toLowerCase().includes(lower) || + (device.terminalVersion || "").toLowerCase().includes(lower) || + ownerName.toLowerCase().includes(lower) || + ownerUsername.toLowerCase().includes(lower) ); }); - }, [devices, searchText, userMap]); + }, [devices, searchText]); - const openCreate = () => { - setEditing(null); - form.resetFields(); - form.setFieldsValue({ status: 1 }); - setOpen(true); - }; + const stats = useMemo(() => { + const total = devices.length; + const online = devices.filter((device) => device.online).length; + const enabled = devices.filter((device) => device.status === 1).length; + return { total, online, enabled }; + }, [devices]); const openEdit = (record: DeviceInfo) => { setEditing(record); form.setFieldsValue({ - userId: record.userId, - deviceCode: record.deviceCode, deviceName: record.deviceName, status: record.status ?? 1 }); @@ -96,22 +79,16 @@ export default function Devices() { }; const submit = async () => { + if (!editing) { + return; + } const values = await form.validateFields(); setSaving(true); try { - const payload: Partial = { - userId: values.userId, - deviceCode: values.deviceCode, + await updateManagedDevice(editing.deviceId, { deviceName: values.deviceName, status: values.status - }; - - if (editing) { - await updateDevice(editing.deviceId, payload); - } else { - await createDevice(payload); - } - + }); message.success(t("devicesExt.operationSucceeded")); setOpen(false); await loadData(); @@ -120,9 +97,9 @@ export default function Devices() { } }; - const remove = async (id: number) => { - await deleteDevice(id); - message.success(t("devicesExt.operationSucceeded")); + const kick = async (record: DeviceInfo) => { + await kickManagedDevice(record.deviceId); + message.success(t("devicesExt.kickSucceeded")); await loadData(); }; @@ -130,26 +107,57 @@ export default function Devices() {
+ + + +
+
+
+
{t("devicesExt.totalDevices")}
+
{stats.total}
+
{t("devicesExt.totalDevicesHint")}
+
+
+ + + +
+
+
+
{t("devicesExt.onlineDevices")}
+
{stats.online}
+
{t("devicesExt.onlineDevicesHint")}
+
+
+ + + +
+
+
+
{t("devicesExt.enabledDevices")}
+
{stats.enabled}
+
{t("devicesExt.enabledDevicesHint")}
+
+
+ +
+
-
- } - style={{ width: 360 }} - value={searchText} - onChange={(event) => setSearchText(event.target.value)} - allowClear - aria-label={t("devicesExt.searchLabel")} - /> - - -
- {can("device:create") ? ( - - ) : null} + } + style={{ width: 420 }} + value={searchText} + onChange={(event) => setSearchText(event.target.value)} + allowClear + aria-label={t("devicesExt.searchLabel")} + /> +
@@ -159,12 +167,13 @@ export default function Devices() { dataSource={filteredData} loading={loading} size="middle" - scroll={{ y: "calc(100vh - 350px)" }} + scroll={{ y: "calc(100vh - 350px)", x: 1200 }} pagination={getStandardPagination(filteredData.length, 1, 1000)} columns={[ { title: t("devicesExt.device"), key: "device", + width: 280, render: (_value: unknown, record) => (
@@ -180,21 +189,52 @@ export default function Devices() { { title: t("devices.owner"), key: "user", + width: 220, render: (_value: unknown, record) => { - const owner = userMap[record.userId]; - return owner ? ( + if (!record.userId) { + return {t("devicesExt.unboundAccount")}; + } + return ( - ) : ( - {t("devicesExt.ownerId")}: {record.userId} ); } }, + { + title: t("devicesExt.terminalType"), + dataIndex: "terminalType", + width: 140, + render: (text: string) => {text || "-"} + }, + { + title: t("devicesExt.terminalVersion"), + dataIndex: "terminalVersion", + width: 160, + render: (text: string) => {text || "-"} + }, + { + title: t("devicesExt.onlineStatus"), + dataIndex: "online", + width: 120, + render: (online: boolean) => ( + {online ? t("devicesExt.online") : t("devicesExt.offline")} + ) + }, + { + title: t("devicesExt.lastOnlineAt"), + dataIndex: "lastOnlineAt", + width: 180, + render: (text: string) => ( + + {text ? text.replace("T", " ").substring(0, 19) : "-"} + + ) + }, { title: t("common.status"), dataIndex: "status", @@ -217,16 +257,21 @@ export default function Devices() { { title: t("common.action"), key: "action", - width: 120, + width: 140, fixed: "right", render: (_value: unknown, record) => ( {can("device:update") ? (
); -} \ No newline at end of file +} diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index 0c3be24..261eefe 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -84,9 +84,15 @@ export interface PermissionNode extends SysPermission { export interface DeviceInfo extends BaseEntity { deviceId: number; - userId: number; + userId?: number; + username?: string; + displayName?: string; deviceCode: string; deviceName?: string; + terminalType?: string; + terminalVersion?: string; + online?: boolean; + lastOnlineAt?: string; } export interface SysDictType extends BaseEntity {