imeeting/backend/src/main/java/com/imeeting/grpc/gateway/AndroidGatewayGrpcService.java

117 lines
5.1 KiB
Java

package com.imeeting.grpc.gateway;
import com.imeeting.config.grpc.GrpcServerProperties;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidDeviceSessionState;
import com.imeeting.grpc.common.ErrorEvent;
import com.imeeting.service.android.AndroidAuthService;
import com.imeeting.service.android.AndroidDeviceSessionService;
import com.imeeting.service.android.AndroidGatewayPushService;
import io.grpc.BindableService;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
/**
 * gRPC implementation of the Android gateway's bidirectional {@code connect} stream.
 *
 * <p>Every call to {@link #connect(StreamObserver)} creates a request observer that keeps the
 * per-stream state (connection id, device id, auth context) established by the client's HELLO
 * packet. HEARTBEAT and SUBSCRIBE packets received before a successful HELLO are ignored.
 *
 * <p>NOTE(review): the response observer is written from this class and is also handed to
 * {@link AndroidGatewayPushService#register}. gRPC stream observers are not thread-safe, so
 * confirm that pushes and the replies written here are serialized somewhere.
 */
@Service
@RequiredArgsConstructor
public class AndroidGatewayGrpcService extends AndroidGatewayServiceGrpc.AndroidGatewayServiceImplBase implements BindableService {

    // Field order matters: @RequiredArgsConstructor derives the constructor parameter order from it.
    private final AndroidAuthService androidAuthService;
    private final AndroidDeviceSessionService androidDeviceSessionService;
    private final AndroidGatewayPushService androidGatewayPushService;
    private final GrpcServerProperties grpcServerProperties;

    /**
     * Opens a gateway stream for one Android client.
     *
     * @param responseObserver observer used to send server packets back to the client
     * @return observer that consumes the packets sent by the client on this stream
     */
    @Override
    public StreamObserver<GatewayClientPacket> connect(StreamObserver<GatewayServerPacket> responseObserver) {
        return new StreamObserver<>() {
            // Set by a successful HELLO; null while the stream has no open session.
            private String connectionId;
            private String deviceId;
            private AndroidAuthContext authContext;

            /**
             * Dispatches one client packet by body type. Any exception raised by a handler is
             * reported to the client as an error packet and the stream stays open.
             */
            @Override
            public void onNext(GatewayClientPacket packet) {
                try {
                    switch (packet.getBodyCase()) {
                        case HELLO -> handleHello(packet);
                        case HEARTBEAT -> handleHeartbeat(packet.getHeartbeat());
                        case SUBSCRIBE -> handleSubscribe(packet.getSubscribe());
                        case ACK, BODY_NOT_SET -> {
                            // Intentionally ignored: no server-side action for these packets.
                        }
                    }
                } catch (Exception ex) {
                    sendError(responseObserver, ex.getMessage());
                }
            }

            /** The client stream failed: release the session; the response stream is not touched. */
            @Override
            public void onError(Throwable t) {
                cleanup();
            }

            /** The client half-closed the stream: release the session and complete the response. */
            @Override
            public void onCompleted() {
                try {
                    cleanup();
                } finally {
                    // Complete the response stream even when releasing the session fails.
                    responseObserver.onCompleted();
                }
            }

            /**
             * Authenticates the client, opens a device session, registers the stream for pushes
             * and answers with a HelloAck.
             */
            private void handleHello(GatewayClientPacket packet) {
                AndroidAuthContext authenticated =
                        androidAuthService.authenticateGrpc(packet.getAuth(), packet.getHello().getDeviceId());

                // A repeated HELLO on the same stream must not orphan the session and push
                // registration created by the previous one, so release them first. This happens
                // only after authentication succeeded, so a rejected re-HELLO keeps the old session.
                cleanup();

                authContext = authenticated;
                AndroidDeviceSessionState sessionState =
                        androidDeviceSessionService.openSession(authContext, packet.getHello());
                connectionId = sessionState.getConnectionId();
                deviceId = sessionState.getDeviceId();
                androidGatewayPushService.register(connectionId, deviceId, responseObserver);

                HelloAck helloAck = HelloAck.newBuilder()
                        .setConnectionId(connectionId)
                        .setAuthMode(authContext.getAuthMode())
                        .setServerTime(System.currentTimeMillis())
                        .setHeartbeatIntervalSeconds(grpcServerProperties.getGateway().getHeartbeatIntervalSeconds())
                        .build();
                responseObserver.onNext(GatewayServerPacket.newBuilder()
                        .setRequestId(packet.getRequestId())
                        .setHelloAck(helloAck)
                        .build());
            }

            /**
             * Refreshes the session heartbeat and answers with a Pong; sends an error packet when
             * the session service returns no state for this connection.
             */
            private void handleHeartbeat(Heartbeat heartbeat) {
                if (heartbeat == null || connectionId == null) {
                    // No session yet (no successful HELLO): nothing to refresh.
                    return;
                }
                AndroidDeviceSessionState state =
                        androidDeviceSessionService.refreshHeartbeat(connectionId, heartbeat.getClientTime());
                if (state == null) {
                    sendError(responseObserver, "未找到安卓设备会话");
                    return;
                }
                Pong pong = Pong.newBuilder()
                        .setConnectionId(connectionId)
                        .setServerTime(System.currentTimeMillis())
                        .build();
                responseObserver.onNext(GatewayServerPacket.newBuilder()
                        .setPong(pong)
                        .build());
            }

            /** Stores the topic list sent by the client for the device bound to this stream. */
            private void handleSubscribe(Subscribe subscribe) {
                if (deviceId == null) {
                    // No session yet (no successful HELLO): ignore the subscription.
                    return;
                }
                androidDeviceSessionService.updateTopics(deviceId, subscribe.getTopicsList());
            }

            /**
             * Unregisters the push target and closes the session of the current connection, if
             * any. Safe to call repeatedly: the ids are cleared before the release calls run.
             */
            private void cleanup() {
                if (connectionId == null) {
                    return;
                }
                String releasedConnectionId = connectionId;
                connectionId = null;
                deviceId = null;
                try {
                    androidGatewayPushService.unregister(releasedConnectionId);
                } finally {
                    // Close the session even if unregistering the push target fails.
                    androidDeviceSessionService.closeSession(releasedConnectionId);
                }
            }
        };
    }

    /**
     * Sends a non-retryable {@code ANDROID_GATEWAY_ERROR} packet to the client.
     *
     * @param responseObserver observer of the stream to write the error to
     * @param message error text; a generic default is used when {@code null}
     */
    private void sendError(StreamObserver<GatewayServerPacket> responseObserver, String message) {
        ErrorEvent errorEvent = ErrorEvent.newBuilder()
                .setCode("ANDROID_GATEWAY_ERROR")
                .setMessage(message == null ? "网关处理失败" : message)
                .setRetryable(false)
                .build();
        responseObserver.onNext(GatewayServerPacket.newBuilder()
                .setError(errorEvent)
                .build());
    }
}