feat(grpc): 增加客户端心跳检测和连接管理

- 在 application.yml 中添加 gRPC服务器心跳检测配置
- 重构 ClientNotificationServiceImpl 类,使用 client_id 和 client_user 组合作为唯一标识
- 新增 GrpcServerConfig 配置类,用于配置心跳检测参数
- 更新 UserServiceImpl,禁止将用户名设置为默认用户
master
chenhao 2025-09-05 15:37:10 +08:00
parent de1823ec28
commit 52becc4f3f
4 changed files with 130 additions and 12 deletions

View File

@ -0,0 +1,45 @@
package com.unisinsight.project.config;
import io.grpc.ServerBuilder;
import net.devh.boot.grpc.server.serverfactory.GrpcServerConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
/**
* @author : ch
* @version : 1.0
* @ClassName : GrpcServerConfig
* @Description : gRPC
* @DATE : Created in 10:30 2025/9/5
* <pre> Copyright: Copyright(c) 2025 </pre>
* <pre> Company : </pre>
* Modification History:
* Date Author Version Discription
* --------------------------------------------------------------------------
* 2025/09/05 ch 1.0 Why & What is modified: *
*/
@Configuration
public class GrpcServerConfig {
/**
* gRPC
* 线
*
* @return GrpcServerConfigurer
*/
@Bean
public GrpcServerConfigurer keepAliveServerConfigurer() {
return serverBuilder -> {
if (serverBuilder instanceof ServerBuilder) {
ServerBuilder<?> builder = (ServerBuilder<?>) serverBuilder;
// 启用keepAlive
builder.keepAliveTime(30, TimeUnit.SECONDS) // 每30秒发送一次keepalive探测
.keepAliveTimeout(5, TimeUnit.SECONDS) // keepalive探测超时时间为5秒
.permitKeepAliveTime(10, TimeUnit.SECONDS) // 允许客户端每10秒发送一次keepalive探测
.permitKeepAliveWithoutCalls(true); // 允许客户端在没有活跃调用时发送keepalive探测
}
};
}
}

View File

@ -27,20 +27,26 @@ import java.util.concurrent.ExecutorService;
* @Description :
* @DATE : Created in 9:17 2025/8/21
* <pre> Copyright: Copyright(c) 2025 </pre>
* <pre> Company : </pre>
* <pre> Company : </pre>
* Modification History:
* Date Author Version Discription
* --------------------------------------------------------------------------
* 2025/08/21 ch 1.0 Why & What is modified: <> *
* 2025/09/05 ch 1.1 使client_idclient_user *
*/
@GrpcService
@Slf4j
public class ClientNotificationServiceImpl extends NotificationServiceGrpc.NotificationServiceImplBase implements SendNotificationService {
// 保存客户端连接的映射表
// 保存客户端连接的映射表使用client_id和client_user组合作为key
private final ConcurrentHashMap<String, StreamObserver<NotificationMessage>> clientStreamMap =
new ConcurrentHashMap<>();
private final static int MAX_CLIENT_COUNT = 1000;
// 生成客户端连接key的工具方法
private String generateClientKey(String clientId, String clientUser) {
return clientId + "#" + clientUser;
}
@Autowired
@Qualifier("grpcConnectionListenerExecutor")
@ -89,15 +95,35 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
// 注册一个取消监听器来监听客户端断开连接
context.addListener(context1 -> {
// 客户端断开连接时的处理逻辑
log.info("通过Context监听到客户端断开连接: {}", clientSn);
removeClientConnection(clientSn);
log.info("清理客户端连接: {}, 剩余客户端数: {}", clientSn, clientStreamMap.size());
log.info("通过Context监听到客户端断开连接: {}, 用户: {}", clientSn, clientUser);
removeClientConnection(clientSn, clientUser);
log.info("清理客户端连接: {}#{}, 剩余客户端数: {}", clientSn, clientUser, clientStreamMap.size());
}, executorService);
}
private StreamObserver<NotificationMessage> removeClientConnection(String clientId) {
StreamObserver<NotificationMessage> remove = clientStreamMap.remove(clientId);
// 遍历所有连接,移除匹配的客户端
StreamObserver<NotificationMessage> remove = null;
for (String key : clientStreamMap.keySet()) {
if (key.startsWith(clientId + "#")) {
remove = clientStreamMap.remove(key);
break;
}
}
//更新设备状态
LambdaUpdateWrapper<Device> updateWrapper = new LambdaUpdateWrapper<>();
//离线
updateWrapper.set(Device::getDeviceStatus, 0);
updateWrapper.eq(Device::getDeviceId, clientId);
deviceService.update(updateWrapper);
return remove;
}
private StreamObserver<NotificationMessage> removeClientConnection(String clientId, String clientUser) {
String clientKey = generateClientKey(clientId, clientUser);
StreamObserver<NotificationMessage> remove = clientStreamMap.remove(clientKey);
//更新设备状态
LambdaUpdateWrapper<Device> updateWrapper = new LambdaUpdateWrapper<>();
//离线
@ -117,8 +143,11 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
responseObserver.onCompleted();
return;
}
String clientKey = generateClientKey(clientId, clientUser);
// 使用 putIfAbsent 确保线程安全的注册
StreamObserver<NotificationMessage> previousObserver = clientStreamMap.putIfAbsent(clientId, responseObserver);
StreamObserver<NotificationMessage> previousObserver = clientStreamMap.putIfAbsent(clientKey, responseObserver);
if (previousObserver != null) {
// 如果已经存在连接,关闭旧连接
@ -132,10 +161,10 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
log.warn("关闭旧连接时出错: {}", e.getMessage());
}
// 更新为新的连接
clientStreamMap.put(clientId, responseObserver);
clientStreamMap.put(clientKey, responseObserver);
}
log.info("客户端注册: {}, 总客户端: {}", clientId, clientStreamMap.size());
log.info("客户端注册: {}, 总客户端: {}", clientKey, clientStreamMap.size());
// 连接成功后更新设备状态
LambdaUpdateWrapper<Device> updateWrapper = new LambdaUpdateWrapper<>();
//在线
@ -171,7 +200,7 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
log.info("客户端注销client_sn:[{}],client_user: [{}]", clientSn, clientUser);
// 安全地移除并关闭连接
StreamObserver<NotificationMessage> notificationStream = removeClientConnection(clientSn);
StreamObserver<NotificationMessage> notificationStream = removeClientConnection(clientSn, clientUser);
if (notificationStream != null) {
try {
notificationStream.onNext(NotificationMessage.newBuilder()
@ -194,7 +223,39 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
@Override
public Result<?> sendNotification(String clientId, NotificationMessage notification) {
StreamObserver<NotificationMessage> notificationStream = clientStreamMap.get(clientId);
// 遍历所有匹配的客户端连接并发送消息
boolean found = false;
for (String key : clientStreamMap.keySet()) {
if (key.startsWith(clientId + "#")) {
StreamObserver<NotificationMessage> notificationStream = clientStreamMap.get(key);
if (notificationStream != null) {
try {
notificationStream.onNext(notification);
found = true;
} catch (Exception e) {
log.error("发送消息失败,错误信息:{},错误详情:{}", e.getMessage(), e.getStackTrace());
removeClientConnection(clientId, key.split("#")[1]);
}
}
}
}
if (!found) {
return Result.errorResult(BaseErrorCode.CONNECTION_FAILURE, "未找到对应的客户端连接");
}
return Result.successResult();
}
/**
*
* @param clientId ID
* @param clientUser
* @param notification
* @return
*/
public Result<?> sendNotification(String clientId, String clientUser, NotificationMessage notification) {
String clientKey = generateClientKey(clientId, clientUser);
StreamObserver<NotificationMessage> notificationStream = clientStreamMap.get(clientKey);
if (notificationStream == null) {
return Result.errorResult(BaseErrorCode.CONNECTION_FAILURE, "未找到对应的客户端连接");
}
@ -202,7 +263,7 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
notificationStream.onNext(notification);
} catch (Exception e) {
log.error("发送消息失败,错误信息:{},错误详情:{}", e.getMessage(), e.getStackTrace());
removeClientConnection(clientId);
removeClientConnection(clientId, clientUser);
return Result.errorResult(BaseErrorCode.CONNECTION_FAILURE, "对应的客户端连接失效");
}
return Result.successResult();

View File

@ -41,6 +41,7 @@ import java.util.stream.Collectors;
public class UserServiceImpl extends ServiceImpl<UserMapper, User>
implements UserService {
public static final String DEFAULT_USER = "NexOS";
@Resource
private UserMapper userMapper;
@ -52,6 +53,9 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User>
@Override
public Result<?> insert(UserReq userReq) {
if (DEFAULT_USER.equals(userReq.getUserName())) {
return Result.errorResult(BaseErrorCode.USER_NAME_EXIST, "用户名不能定义为" + DEFAULT_USER);
}
User user = BeanUtil.copyProperties(userReq, User.class);
user.setCreateUser("admin");
user.setUpdateUser("admin");
@ -66,6 +70,9 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User>
@Override
public Result<?> update(UserReq userReq) {
if (DEFAULT_USER.equals(userReq.getUserName())) {
return Result.errorResult(BaseErrorCode.USER_NAME_EXIST, "用户名不能定义为" + DEFAULT_USER);
}
User user = BeanUtil.copyProperties(userReq, User.class);
user.setUpdateUser("admin");
int updated = userMapper.updateById(user);

View File

@ -53,6 +53,11 @@ grpc:
server:
# 指定Grpc暴露的端口后续客户端通过这个端口访问
port: 50051
# gRPC服务器心跳检测配置
keep-alive-time: 30s
keep-alive-timeout: 5s
permit-keep-alive-time: 10s
permit-keep-alive-without-calls: true
# Feign 配置
feign: