diff --git a/nex-be/src/main/java/com/unisinsight/project/config/GrpcServerConfig.java b/nex-be/src/main/java/com/unisinsight/project/config/GrpcServerConfig.java new file mode 100644 index 0000000..5157dd4 --- /dev/null +++ b/nex-be/src/main/java/com/unisinsight/project/config/GrpcServerConfig.java @@ -0,0 +1,45 @@ +package com.unisinsight.project.config; + +import io.grpc.ServerBuilder; +import net.devh.boot.grpc.server.serverfactory.GrpcServerConfigurer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.TimeUnit; + +/** + * @author : ch + * @version : 1.0 + * @ClassName : GrpcServerConfig + * @Description : gRPC服务器配置类,用于配置心跳检测等参数 + * @DATE : Created in 10:30 2025/9/5 + *
       Copyright: Copyright(c) 2025     
+ *
       Company :   紫光汇智信息技术有限公司           
+ * Modification History: + * Date Author Version Discription + * -------------------------------------------------------------------------- + * 2025/09/05 ch 1.0 Why & What is modified: 创建配置类以支持心跳检测 * + */ +@Configuration +public class GrpcServerConfig { + + /** + * 配置gRPC服务器的心跳检测参数 + * 用于检测客户端物理断开连接(如关机、拔网线等) + * + * @return GrpcServerConfigurer + */ + @Bean + public GrpcServerConfigurer keepAliveServerConfigurer() { + return serverBuilder -> { + if (serverBuilder instanceof ServerBuilder) { + ServerBuilder builder = (ServerBuilder) serverBuilder; + // 启用keepAlive + builder.keepAliveTime(30, TimeUnit.SECONDS) // 每30秒发送一次keepalive探测 + .keepAliveTimeout(5, TimeUnit.SECONDS) // keepalive探测超时时间为5秒 + .permitKeepAliveTime(10, TimeUnit.SECONDS) // 允许客户端每10秒发送一次keepalive探测 + .permitKeepAliveWithoutCalls(true); // 允许客户端在没有活跃调用时发送keepalive探测 + } + }; + } +} \ No newline at end of file diff --git a/nex-be/src/main/java/com/unisinsight/project/grpc/service/ClientNotificationServiceImpl.java b/nex-be/src/main/java/com/unisinsight/project/grpc/service/ClientNotificationServiceImpl.java index 3a2124f..bcc8fec 100644 --- a/nex-be/src/main/java/com/unisinsight/project/grpc/service/ClientNotificationServiceImpl.java +++ b/nex-be/src/main/java/com/unisinsight/project/grpc/service/ClientNotificationServiceImpl.java @@ -27,19 +27,25 @@ import java.util.concurrent.ExecutorService; * @Description : * @DATE : Created in 9:17 2025/8/21 *
       Copyright: Copyright(c) 2025     
- *
       Company :   	紫光汇智信息技术有限公司		           
+ *
       Company :  	紫光汇智信息技术有限公司	           
* Modification History: * Date Author Version Discription * -------------------------------------------------------------------------- * 2025/08/21 ch 1.0 Why & What is modified: <修改原因描述> * + * 2025/09/05 ch 1.1 修改客户端连接标识,使用client_id和client_user组合作为唯一标识 * */ @GrpcService @Slf4j public class ClientNotificationServiceImpl extends NotificationServiceGrpc.NotificationServiceImplBase implements SendNotificationService { - // 保存客户端连接的映射表 + // 保存客户端连接的映射表,使用client_id和client_user组合作为key private final ConcurrentHashMap> clientStreamMap = new ConcurrentHashMap<>(); private final static int MAX_CLIENT_COUNT = 1000; + + // 生成客户端连接key的工具方法 + private String generateClientKey(String clientId, String clientUser) { + return clientId + "#" + clientUser; + } @Autowired @@ -89,15 +95,35 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif // 注册一个取消监听器来监听客户端断开连接 context.addListener(context1 -> { // 客户端断开连接时的处理逻辑 - log.info("通过Context监听到客户端断开连接: {}", clientSn); - removeClientConnection(clientSn); - log.info("清理客户端连接: {}, 剩余客户端数: {}", clientSn, clientStreamMap.size()); + log.info("通过Context监听到客户端断开连接: {}, 用户: {}", clientSn, clientUser); + removeClientConnection(clientSn, clientUser); + log.info("清理客户端连接: {}#{}, 剩余客户端数: {}", clientSn, clientUser, clientStreamMap.size()); }, executorService); } private StreamObserver removeClientConnection(String clientId) { - StreamObserver remove = clientStreamMap.remove(clientId); + // 遍历所有连接,移除匹配的客户端 + StreamObserver remove = null; + for (String key : clientStreamMap.keySet()) { + if (key.startsWith(clientId + "#")) { + remove = clientStreamMap.remove(key); + break; + } + } + + //更新设备状态 + LambdaUpdateWrapper updateWrapper = new LambdaUpdateWrapper<>(); + //离线 + updateWrapper.set(Device::getDeviceStatus, 0); + updateWrapper.eq(Device::getDeviceId, clientId); + deviceService.update(updateWrapper); + return remove; + } + + private StreamObserver removeClientConnection(String clientId, String clientUser) { + String clientKey = generateClientKey(clientId, clientUser); + StreamObserver remove = clientStreamMap.remove(clientKey); //更新设备状态 LambdaUpdateWrapper updateWrapper = new LambdaUpdateWrapper<>(); //离线 @@ -117,8 +143,11 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif responseObserver.onCompleted(); return; } + + String clientKey = generateClientKey(clientId, clientUser); + // 使用 putIfAbsent 确保线程安全的注册 - StreamObserver previousObserver = clientStreamMap.putIfAbsent(clientId, responseObserver); + StreamObserver previousObserver = clientStreamMap.putIfAbsent(clientKey, responseObserver); if (previousObserver != null) { // 如果已经存在连接,关闭旧连接 @@ -132,10 +161,10 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif log.warn("关闭旧连接时出错: {}", e.getMessage()); } // 更新为新的连接 - clientStreamMap.put(clientId, responseObserver); + clientStreamMap.put(clientKey, responseObserver); } - log.info("客户端注册: {}, 总客户端: {}", clientId, clientStreamMap.size()); + log.info("客户端注册: {}, 总客户端: {}", clientKey, clientStreamMap.size()); // 连接成功后更新设备状态 LambdaUpdateWrapper updateWrapper = new LambdaUpdateWrapper<>(); //在线 @@ -171,7 +200,7 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif log.info("客户端注销:client_sn:[{}],client_user: [{}]", clientSn, clientUser); // 安全地移除并关闭连接 - StreamObserver notificationStream = removeClientConnection(clientSn); + StreamObserver notificationStream = removeClientConnection(clientSn, clientUser); if (notificationStream != null) { try { notificationStream.onNext(NotificationMessage.newBuilder() @@ -194,7 +223,39 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif @Override public Result sendNotification(String clientId, NotificationMessage notification) { - StreamObserver notificationStream = clientStreamMap.get(clientId); + // 遍历所有匹配的客户端连接并发送消息 + boolean found = false; + for (String key : clientStreamMap.keySet()) { + if (key.startsWith(clientId + "#")) { + StreamObserver notificationStream = clientStreamMap.get(key); + if (notificationStream != null) { + try { + notificationStream.onNext(notification); + found = true; + } catch (Exception e) { + log.error("发送消息失败,错误信息:{},错误详情:{}", e.getMessage(), e.getStackTrace()); + removeClientConnection(clientId, key.split("#")[1]); + } + } + } + } + + if (!found) { + return Result.errorResult(BaseErrorCode.CONNECTION_FAILURE, "未找到对应的客户端连接"); + } + return Result.successResult(); + } + + /** + * 向指定的客户端用户发送通知 + * @param clientId 客户端ID + * @param clientUser 客户端用户 + * @param notification 通知消息 + * @return 发送结果 + */ + public Result sendNotification(String clientId, String clientUser, NotificationMessage notification) { + String clientKey = generateClientKey(clientId, clientUser); + StreamObserver notificationStream = clientStreamMap.get(clientKey); if (notificationStream == null) { return Result.errorResult(BaseErrorCode.CONNECTION_FAILURE, "未找到对应的客户端连接"); } @@ -202,7 +263,7 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif notificationStream.onNext(notification); } catch (Exception e) { log.error("发送消息失败,错误信息:{},错误详情:{}", e.getMessage(), e.getStackTrace()); - removeClientConnection(clientId); + removeClientConnection(clientId, clientUser); return Result.errorResult(BaseErrorCode.CONNECTION_FAILURE, "对应的客户端连接失效"); } return Result.successResult(); diff --git a/nex-be/src/main/java/com/unisinsight/project/service/impl/UserServiceImpl.java b/nex-be/src/main/java/com/unisinsight/project/service/impl/UserServiceImpl.java index 4f04dcf..0e47869 100644 --- a/nex-be/src/main/java/com/unisinsight/project/service/impl/UserServiceImpl.java +++ b/nex-be/src/main/java/com/unisinsight/project/service/impl/UserServiceImpl.java @@ -41,6 +41,7 @@ import java.util.stream.Collectors; public class UserServiceImpl extends ServiceImpl implements UserService { + public static final String DEFAULT_USER = "NexOS"; @Resource private UserMapper userMapper; @@ -52,6 +53,9 @@ public class UserServiceImpl extends ServiceImpl @Override public Result insert(UserReq userReq) { + if (DEFAULT_USER.equals(userReq.getUserName())) { + return Result.errorResult(BaseErrorCode.USER_NAME_EXIST, "用户名不能定义为" + DEFAULT_USER); + } User user = BeanUtil.copyProperties(userReq, User.class); user.setCreateUser("admin"); user.setUpdateUser("admin"); @@ -66,6 +70,9 @@ public class UserServiceImpl extends ServiceImpl @Override public Result update(UserReq userReq) { + if (DEFAULT_USER.equals(userReq.getUserName())) { + return Result.errorResult(BaseErrorCode.USER_NAME_EXIST, "用户名不能定义为" + DEFAULT_USER); + } User user = BeanUtil.copyProperties(userReq, User.class); user.setUpdateUser("admin"); int updated = userMapper.updateById(user); diff --git a/nex-be/src/main/resources/application.yml b/nex-be/src/main/resources/application.yml index ed69508..7efd7d3 100644 --- a/nex-be/src/main/resources/application.yml +++ b/nex-be/src/main/resources/application.yml @@ -53,6 +53,11 @@ grpc: server: # 指定Grpc暴露的端口,后续客户端通过这个端口访问 port: 50051 + # gRPC服务器心跳检测配置 + keep-alive-time: 30s + keep-alive-timeout: 5s + permit-keep-alive-time: 10s + permit-keep-alive-without-calls: true # Feign 配置 feign: