From 52becc4f3f7ccd1912e2decad9cbafd39b774134 Mon Sep 17 00:00:00 2001
From: chenhao <852066789@qq.com>
Date: Fri, 5 Sep 2025 15:37:10 +0800
Subject: [PATCH] =?UTF-8?q?feat(grpc):=20=E5=A2=9E=E5=8A=A0=E5=AE=A2?=
=?UTF-8?q?=E6=88=B7=E7=AB=AF=E5=BF=83=E8=B7=B3=E6=A3=80=E6=B5=8B=E5=92=8C?=
=?UTF-8?q?=E8=BF=9E=E6=8E=A5=E7=AE=A1=E7=90=86?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 在 application.yml 中添加 gRPC服务器心跳检测配置
- 重构 ClientNotificationServiceImpl 类,使用 client_id 和 client_user 组合作为唯一标识
- 新增 GrpcServerConfig 配置类,用于配置心跳检测参数
- 更新 UserServiceImpl,禁止将用户名设置为默认用户
---
.../project/config/GrpcServerConfig.java | 45 ++++++++++
.../ClientNotificationServiceImpl.java | 85 ++++++++++++++++---
.../project/service/impl/UserServiceImpl.java | 7 ++
nex-be/src/main/resources/application.yml | 5 ++
4 files changed, 130 insertions(+), 12 deletions(-)
create mode 100644 nex-be/src/main/java/com/unisinsight/project/config/GrpcServerConfig.java
diff --git a/nex-be/src/main/java/com/unisinsight/project/config/GrpcServerConfig.java b/nex-be/src/main/java/com/unisinsight/project/config/GrpcServerConfig.java
new file mode 100644
index 0000000..5157dd4
--- /dev/null
+++ b/nex-be/src/main/java/com/unisinsight/project/config/GrpcServerConfig.java
@@ -0,0 +1,45 @@
+package com.unisinsight.project.config;
+
+import io.grpc.ServerBuilder;
+import net.devh.boot.grpc.server.serverfactory.GrpcServerConfigurer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author : ch
+ * @version : 1.0
+ * @ClassName : GrpcServerConfig
+ * @Description : gRPC服务器配置类,用于配置心跳检测等参数
+ * @DATE : Created in 10:30 2025/9/5
+ *
Copyright: Copyright(c) 2025
+ * Company : 紫光汇智信息技术有限公司
+ * Modification History:
+ * Date Author Version Discription
+ * --------------------------------------------------------------------------
+ * 2025/09/05 ch 1.0 Why & What is modified: 创建配置类以支持心跳检测 *
+ */
+@Configuration
+public class GrpcServerConfig {
+
+ /**
+ * 配置gRPC服务器的心跳检测参数
+ * 用于检测客户端物理断开连接(如关机、拔网线等)
+ *
+ * @return GrpcServerConfigurer
+ */
+ @Bean
+ public GrpcServerConfigurer keepAliveServerConfigurer() {
+ return serverBuilder -> {
+ if (serverBuilder instanceof ServerBuilder) {
+ ServerBuilder> builder = (ServerBuilder>) serverBuilder;
+ // 启用keepAlive
+ builder.keepAliveTime(30, TimeUnit.SECONDS) // 每30秒发送一次keepalive探测
+ .keepAliveTimeout(5, TimeUnit.SECONDS) // keepalive探测超时时间为5秒
+ .permitKeepAliveTime(10, TimeUnit.SECONDS) // 允许客户端每10秒发送一次keepalive探测
+ .permitKeepAliveWithoutCalls(true); // 允许客户端在没有活跃调用时发送keepalive探测
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/nex-be/src/main/java/com/unisinsight/project/grpc/service/ClientNotificationServiceImpl.java b/nex-be/src/main/java/com/unisinsight/project/grpc/service/ClientNotificationServiceImpl.java
index 3a2124f..bcc8fec 100644
--- a/nex-be/src/main/java/com/unisinsight/project/grpc/service/ClientNotificationServiceImpl.java
+++ b/nex-be/src/main/java/com/unisinsight/project/grpc/service/ClientNotificationServiceImpl.java
@@ -27,19 +27,25 @@ import java.util.concurrent.ExecutorService;
* @Description :
* @DATE : Created in 9:17 2025/8/21
* Copyright: Copyright(c) 2025
- * Company : 紫光汇智信息技术有限公司
+ * Company : 紫光汇智信息技术有限公司
* Modification History:
* Date Author Version Discription
* --------------------------------------------------------------------------
* 2025/08/21 ch 1.0 Why & What is modified: <修改原因描述> *
+ * 2025/09/05 ch 1.1 修改客户端连接标识,使用client_id和client_user组合作为唯一标识 *
*/
@GrpcService
@Slf4j
public class ClientNotificationServiceImpl extends NotificationServiceGrpc.NotificationServiceImplBase implements SendNotificationService {
- // 保存客户端连接的映射表
+ // 保存客户端连接的映射表,使用client_id和client_user组合作为key
private final ConcurrentHashMap> clientStreamMap =
new ConcurrentHashMap<>();
private final static int MAX_CLIENT_COUNT = 1000;
+
+ // 生成客户端连接key的工具方法
+ private String generateClientKey(String clientId, String clientUser) {
+ return clientId + "#" + clientUser;
+ }
@Autowired
@@ -89,15 +95,35 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
// 注册一个取消监听器来监听客户端断开连接
context.addListener(context1 -> {
// 客户端断开连接时的处理逻辑
- log.info("通过Context监听到客户端断开连接: {}", clientSn);
- removeClientConnection(clientSn);
- log.info("清理客户端连接: {}, 剩余客户端数: {}", clientSn, clientStreamMap.size());
+ log.info("通过Context监听到客户端断开连接: {}, 用户: {}", clientSn, clientUser);
+ removeClientConnection(clientSn, clientUser);
+ log.info("清理客户端连接: {}#{}, 剩余客户端数: {}", clientSn, clientUser, clientStreamMap.size());
}, executorService);
}
private StreamObserver removeClientConnection(String clientId) {
- StreamObserver remove = clientStreamMap.remove(clientId);
+ // 遍历所有连接,移除匹配的客户端
+ StreamObserver remove = null;
+ for (String key : clientStreamMap.keySet()) {
+ if (key.startsWith(clientId + "#")) {
+ remove = clientStreamMap.remove(key);
+ break;
+ }
+ }
+
+ //更新设备状态
+ LambdaUpdateWrapper updateWrapper = new LambdaUpdateWrapper<>();
+ //离线
+ updateWrapper.set(Device::getDeviceStatus, 0);
+ updateWrapper.eq(Device::getDeviceId, clientId);
+ deviceService.update(updateWrapper);
+ return remove;
+ }
+
+ private StreamObserver removeClientConnection(String clientId, String clientUser) {
+ String clientKey = generateClientKey(clientId, clientUser);
+ StreamObserver remove = clientStreamMap.remove(clientKey);
//更新设备状态
LambdaUpdateWrapper updateWrapper = new LambdaUpdateWrapper<>();
//离线
@@ -117,8 +143,11 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
responseObserver.onCompleted();
return;
}
+
+ String clientKey = generateClientKey(clientId, clientUser);
+
// 使用 putIfAbsent 确保线程安全的注册
- StreamObserver previousObserver = clientStreamMap.putIfAbsent(clientId, responseObserver);
+ StreamObserver previousObserver = clientStreamMap.putIfAbsent(clientKey, responseObserver);
if (previousObserver != null) {
// 如果已经存在连接,关闭旧连接
@@ -132,10 +161,10 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
log.warn("关闭旧连接时出错: {}", e.getMessage());
}
// 更新为新的连接
- clientStreamMap.put(clientId, responseObserver);
+ clientStreamMap.put(clientKey, responseObserver);
}
- log.info("客户端注册: {}, 总客户端: {}", clientId, clientStreamMap.size());
+ log.info("客户端注册: {}, 总客户端: {}", clientKey, clientStreamMap.size());
// 连接成功后更新设备状态
LambdaUpdateWrapper updateWrapper = new LambdaUpdateWrapper<>();
//在线
@@ -171,7 +200,7 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
log.info("客户端注销:client_sn:[{}],client_user: [{}]", clientSn, clientUser);
// 安全地移除并关闭连接
- StreamObserver notificationStream = removeClientConnection(clientSn);
+ StreamObserver notificationStream = removeClientConnection(clientSn, clientUser);
if (notificationStream != null) {
try {
notificationStream.onNext(NotificationMessage.newBuilder()
@@ -194,7 +223,39 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
@Override
public Result> sendNotification(String clientId, NotificationMessage notification) {
- StreamObserver notificationStream = clientStreamMap.get(clientId);
+ // 遍历所有匹配的客户端连接并发送消息
+ boolean found = false;
+ for (String key : clientStreamMap.keySet()) {
+ if (key.startsWith(clientId + "#")) {
+ StreamObserver notificationStream = clientStreamMap.get(key);
+ if (notificationStream != null) {
+ try {
+ notificationStream.onNext(notification);
+ found = true;
+ } catch (Exception e) {
+ log.error("发送消息失败,错误信息:{},错误详情:{}", e.getMessage(), e.getStackTrace());
+ removeClientConnection(clientId, key.split("#")[1]);
+ }
+ }
+ }
+ }
+
+ if (!found) {
+ return Result.errorResult(BaseErrorCode.CONNECTION_FAILURE, "未找到对应的客户端连接");
+ }
+ return Result.successResult();
+ }
+
+ /**
+ * 向指定的客户端用户发送通知
+ * @param clientId 客户端ID
+ * @param clientUser 客户端用户
+ * @param notification 通知消息
+ * @return 发送结果
+ */
+ public Result> sendNotification(String clientId, String clientUser, NotificationMessage notification) {
+ String clientKey = generateClientKey(clientId, clientUser);
+ StreamObserver notificationStream = clientStreamMap.get(clientKey);
if (notificationStream == null) {
return Result.errorResult(BaseErrorCode.CONNECTION_FAILURE, "未找到对应的客户端连接");
}
@@ -202,7 +263,7 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
notificationStream.onNext(notification);
} catch (Exception e) {
log.error("发送消息失败,错误信息:{},错误详情:{}", e.getMessage(), e.getStackTrace());
- removeClientConnection(clientId);
+ removeClientConnection(clientId, clientUser);
return Result.errorResult(BaseErrorCode.CONNECTION_FAILURE, "对应的客户端连接失效");
}
return Result.successResult();
diff --git a/nex-be/src/main/java/com/unisinsight/project/service/impl/UserServiceImpl.java b/nex-be/src/main/java/com/unisinsight/project/service/impl/UserServiceImpl.java
index 4f04dcf..0e47869 100644
--- a/nex-be/src/main/java/com/unisinsight/project/service/impl/UserServiceImpl.java
+++ b/nex-be/src/main/java/com/unisinsight/project/service/impl/UserServiceImpl.java
@@ -41,6 +41,7 @@ import java.util.stream.Collectors;
public class UserServiceImpl extends ServiceImpl
implements UserService {
+ public static final String DEFAULT_USER = "NexOS";
@Resource
private UserMapper userMapper;
@@ -52,6 +53,9 @@ public class UserServiceImpl extends ServiceImpl
@Override
public Result> insert(UserReq userReq) {
+ if (DEFAULT_USER.equals(userReq.getUserName())) {
+ return Result.errorResult(BaseErrorCode.USER_NAME_EXIST, "用户名不能定义为" + DEFAULT_USER);
+ }
User user = BeanUtil.copyProperties(userReq, User.class);
user.setCreateUser("admin");
user.setUpdateUser("admin");
@@ -66,6 +70,9 @@ public class UserServiceImpl extends ServiceImpl
@Override
public Result> update(UserReq userReq) {
+ if (DEFAULT_USER.equals(userReq.getUserName())) {
+ return Result.errorResult(BaseErrorCode.USER_NAME_EXIST, "用户名不能定义为" + DEFAULT_USER);
+ }
User user = BeanUtil.copyProperties(userReq, User.class);
user.setUpdateUser("admin");
int updated = userMapper.updateById(user);
diff --git a/nex-be/src/main/resources/application.yml b/nex-be/src/main/resources/application.yml
index ed69508..7efd7d3 100644
--- a/nex-be/src/main/resources/application.yml
+++ b/nex-be/src/main/resources/application.yml
@@ -53,6 +53,11 @@ grpc:
server:
# 指定Grpc暴露的端口,后续客户端通过这个端口访问
port: 50051
+ # gRPC服务器心跳检测配置
+ keep-alive-time: 30s
+ keep-alive-timeout: 5s
+ permit-keep-alive-time: 10s
+ permit-keep-alive-without-calls: true
# Feign 配置
feign: