feat(grpc): 优化客户端连接管理并添加镜像更新通知功能

- 改进客户端连接键生成逻辑,加入时间戳以区分不同连接
- 优化客户端连接和断开连接的处理逻辑- 添加镜像更新通知功能,在更新镜像时发送通知给客户端
- 重构代码,移除不必要的方法,提高代码可维护性
master
chenhao 2025-09-09 15:00:49 +08:00
parent 13dff08c82
commit 1d729350ec
9 changed files with 38 additions and 44 deletions

View File

@ -62,6 +62,10 @@ public class DeviceImageMappingController {
boolean removedByIds = deviceImageMappingService.removeByIds(collect);
log.info("终端镜像映射新增接口删除了 {} 条旧数据ID列表: {}", removedByIds, collect);
}
//发送消息
notificationService.sendNotification(deviceImageMappingReq.getDeviceId(), NotificationMessage.newBuilder()
.setType(GrpcTypeEnum.IMAGE_UPDATE.getType())
.setContent(GrpcTypeEnum.IMAGE_UPDATE.getDesc()).build());
return Result.successResult();
}
List<DeviceImageMappingReq> reqData = deviceImageMappingReq.getData();

View File

@ -34,6 +34,9 @@ public class ImageDesktop extends Model<ImageDesktop> {
@TableField(value = "desktop_name")
@ApiModelProperty("名称")
private String desktopName;
@TableField(value = "clone_name")
@ApiModelProperty("显示名称")
private String cloneName;
/**
* 1VHD 2VHDX 3QCOW2
**/

View File

@ -27,6 +27,9 @@ public class ImageDesktopReq {
@JsonProperty("desktop_name")
@ApiModelProperty("名称")
private String desktopName;
@JsonProperty("clone_name")
@ApiModelProperty("显示名称")
private String cloneName;
/**
* 1VHD 2VHDX 3QCOW2
**/

View File

@ -72,6 +72,9 @@ public class ImageVirtualMachinesReq {
@ApiModelProperty("驱动名称")
private String imageToolName;
@JsonProperty("clone_name")
@ApiModelProperty("克隆名称")
private String cloneName;
/**

View File

@ -29,6 +29,9 @@ public class ImageDesktopRes implements Serializable {
@JsonProperty("desktop_name")
@ApiModelProperty("名称")
private String desktopName;
@JsonProperty("clone_name")
@ApiModelProperty("显示名称")
private String cloneName;
/**
* 1VHD 2VHDX 3QCOW2
**/

View File

@ -44,7 +44,7 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
public static final String DEFAULT_USER = "NexOS";
// 生成客户端连接key的工具方法
private String generateClientKey(String clientId, String clientUser) {
return clientId + "#" + clientUser;
return clientId + "#" + clientUser + "#" + System.currentTimeMillis();
}
@ -74,7 +74,7 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
log.info("客户端连接client_sn:[{}],client_user: [{}]", clientSn, clientUser);
addClientConnection(responseObserver, clientSn, clientUser);
String clientKey = addClientConnection(responseObserver, clientSn, clientUser);
responseObserver.onNext(NotificationMessage.newBuilder()
@ -85,12 +85,11 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
final io.grpc.Context context = io.grpc.Context.current();
// 注册一个取消监听器来监听客户端断开连接
String finalClientUser = clientUser;
context.addListener(context1 -> {
// 客户端断开连接时的处理逻辑
log.info("通过Context监听到客户端断开连接: {}, 用户: {}", clientSn, finalClientUser);
removeClientConnection(clientSn, finalClientUser);
log.info("清理客户端连接: {}#{}, 剩余客户端数: {}", clientSn, finalClientUser, clientStreamMap.size());
log.info("通过Context监听到客户端断开连接: {}", clientKey);
removeClientConnection(clientSn, clientKey);
log.info("清理客户端连接: {}, 剩余客户端数: {}", clientKey, clientStreamMap.size());
}, executorService);
}
@ -114,8 +113,7 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
return remove;
}
private StreamObserver<NotificationMessage> removeClientConnection(String clientId, String clientUser) {
String clientKey = generateClientKey(clientId, clientUser);
private StreamObserver<NotificationMessage> removeClientConnection(String clientId, String clientKey) {
StreamObserver<NotificationMessage> remove = clientStreamMap.remove(clientKey);
//更新设备状态
LambdaUpdateWrapper<Device> updateWrapper = new LambdaUpdateWrapper<>();
@ -126,7 +124,7 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
return remove;
}
private void addClientConnection(StreamObserver<NotificationMessage> responseObserver, String clientId, String clientUser) {
private String addClientConnection(StreamObserver<NotificationMessage> responseObserver, String clientId, String clientUser) {
if (clientStreamMap.size() >= MAX_CLIENT_COUNT) {
NotificationMessage errMsg = NotificationMessage.newBuilder()
.setCode(HttpStatus.TOO_MANY_REQUESTS.value())
@ -134,7 +132,7 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
.build();
responseObserver.onNext(errMsg);
responseObserver.onCompleted();
return;
return null;
}
String clientKey = generateClientKey(clientId, clientUser);
@ -164,6 +162,7 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
updateWrapper.set(Device::getDeviceStatus, 1);
updateWrapper.eq(Device::getDeviceId, clientId);
deviceService.update(updateWrapper);
return clientKey;
}
@Override
@ -227,7 +226,7 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
found = true;
} catch (Exception e) {
log.error("发送消息失败,错误信息:{},错误详情:{}", e.getMessage(), e.getStackTrace());
removeClientConnection(clientId, key.split("#")[1]);
removeClientConnection(clientId, key);
}
}
}
@ -238,29 +237,7 @@ public class ClientNotificationServiceImpl extends NotificationServiceGrpc.Notif
}
return Result.successResult();
}
/**
*
* @param clientId ID
* @param clientUser
* @param notification
* @return
*/
public Result<?> sendNotification(String clientId, String clientUser, NotificationMessage notification) {
String clientKey = generateClientKey(clientId, clientUser);
StreamObserver<NotificationMessage> notificationStream = clientStreamMap.get(clientKey);
if (notificationStream == null) {
return Result.errorResult(BaseErrorCode.CONNECTION_FAILURE, "未找到对应的客户端连接");
}
try {
notificationStream.onNext(notification);
} catch (Exception e) {
log.error("发送消息失败,错误信息:{},错误详情:{}", e.getMessage(), e.getStackTrace());
removeClientConnection(clientId, clientUser);
return Result.errorResult(BaseErrorCode.CONNECTION_FAILURE, "对应的客户端连接失效");
}
return Result.successResult();
}
}

View File

@ -69,7 +69,7 @@ public class DeviceImageMappingServiceImpl extends ServiceImpl<DeviceImageMappin
if (CollectionUtil.isNotEmpty(images)) {
deviceImageMappingRes.forEach(deviceImage -> images.forEach(image -> {
if (ObjectUtil.isNotEmpty(deviceImage.getImageId()) && image.getId().equals(deviceImage.getImageId())) {
deviceImage.setImageName(image.getDesktopName());
deviceImage.setImageName(image.getCloneName());
deviceImage.setImageFileName(image.getDesktopName()+"."+image.getDesktopType());
deviceImage.setCreateTime(image.getCreateTime());
// 设置父镜像名称

View File

@ -54,7 +54,7 @@ public class ImageDesktopServiceImpl extends ServiceImpl<ImageDesktopMapper, Ima
public Result<PageResult<ImageDesktopRes>> selectPage(ImageDesktopReq imageDesktopReq) {
Page<ImageDesktop> page = new Page<>(imageDesktopReq.getPageNum(), imageDesktopReq.getPageSize());
LambdaQueryWrapper<ImageDesktop> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.like(StringUtils.isNotBlank(imageDesktopReq.getDesktopName()), ImageDesktop::getDesktopName, imageDesktopReq.getDesktopName());
queryWrapper.like(StringUtils.isNotBlank(imageDesktopReq.getCloneName()), ImageDesktop::getCloneName, imageDesktopReq.getCloneName());
queryWrapper.orderByAsc(ImageDesktop::getId);
Page<ImageDesktop> imageDesktopPage = mapper.selectPage(page, queryWrapper);
log.info("分页查询桌面镜像返回:{}", JSONUtil.toJsonStr(imageDesktopPage));
@ -130,7 +130,7 @@ public class ImageDesktopServiceImpl extends ServiceImpl<ImageDesktopMapper, Ima
ImageDesktop imageDesktop = mapper.selectById(deleteIdReq.getId());
boolean b = externalTorrentClient.deleteFile(imageDesktop.getStoragePath());
if (!b) {
return Result.errorResult(BaseErrorCode.HTTP_ERROR_CODE_500, "删除文件失败");
return Result.errorResultMessage(BaseErrorCode.HTTP_ERROR_CODE_500, "删除文件失败");
}
int deleted = mapper.deleteById(deleteIdReq.getId());
log.info("桌面镜像删除:{}", deleted);

View File

@ -253,13 +253,13 @@ public class ImageVirtualMachinesServiceImpl extends ServiceImpl<ImageVirtualMac
ImageVirtualMachines imageVirtualMachines = machinesMapper.selectById(deleteIdReq.getId());
if (ObjectUtils.isEmpty(imageVirtualMachines)) {
log.info("查询镜像虚拟机返回为空");
return Result.errorResult(BaseErrorCode.HTTP_ERROR_CODE_500, "虚拟机不存在");
return Result.errorResultMessage(BaseErrorCode.HTTP_ERROR_CODE_500, "虚拟机不存在");
}
LambdaQueryWrapper<ImageDesktop> desktopLambdaQueryWrapper=new LambdaQueryWrapper<>();
desktopLambdaQueryWrapper.eq(ImageDesktop::getImageVirtualId, imageVirtualMachines.getId());
int count = imageDesktopService.count(desktopLambdaQueryWrapper);
if (count>0){
return Result.errorResult(BaseErrorCode.HTTP_ERROR_CODE_500, "该镜像有对应的克隆桌面,不允许删除");
return Result.errorResultMessage(BaseErrorCode.HTTP_ERROR_CODE_500, "该镜像有对应的克隆桌面,不允许删除");
}
// 调用镜像删除服务
@ -269,7 +269,7 @@ public class ImageVirtualMachinesServiceImpl extends ServiceImpl<ImageVirtualMac
.build();
ApiResponse<ImageStatusRes> response = externalApiClient.deleteImage(deleteReq);
if (response == null || !"200".equals(response.getCode())) {
return Result.errorResult(BaseErrorCode.HTTP_REQUEST_FAILURE, "删除虚拟机失败");
return Result.errorResultMessage(BaseErrorCode.HTTP_REQUEST_FAILURE, "删除虚拟机失败");
}
// 删除数据库记录
@ -290,14 +290,14 @@ public class ImageVirtualMachinesServiceImpl extends ServiceImpl<ImageVirtualMac
ImageVirtualMachines imageVirtualMachines = getOne(queryWrapper);
if (ObjectUtils.isEmpty(imageVirtualMachines)) {
log.info("查询镜像虚拟机返回为空");
return Result.errorResult(BaseErrorCode.HTTP_ERROR_CODE_500, "虚拟机不存在");
return Result.errorResultMessage(BaseErrorCode.HTTP_ERROR_CODE_500, "虚拟机不存在");
}
// 调用远程接口获取虚拟机详细信息
ApiResponse<VmInfoDTO> vmInfoResponse = externalApiClient.getVmInfo(imageVirtualMachines.getImageName());
if (vmInfoResponse == null || !"200".equals(vmInfoResponse.getCode()) || vmInfoResponse.getData() == null) {
log.error("获取虚拟机信息失败: {}", vmInfoResponse);
return Result.errorResult(BaseErrorCode.HTTP_REQUEST_FAILURE, "获取虚拟机信息失败");
return Result.errorResultMessage(BaseErrorCode.HTTP_REQUEST_FAILURE, "获取虚拟机信息失败");
}
// 从返回数据中提取需要的信息
@ -316,12 +316,13 @@ public class ImageVirtualMachinesServiceImpl extends ServiceImpl<ImageVirtualMac
imageVirtualMachines.getImageName() + ".json", type);
if (!response) {
return Result.errorResult(BaseErrorCode.HTTP_REQUEST_FAILURE, "克隆虚拟机到桌面镜像失败");
return Result.errorResultMessage(BaseErrorCode.HTTP_REQUEST_FAILURE, "克隆虚拟机到桌面镜像失败");
}
// 克隆成功后生成桌面镜像数据
ImageDesktop imageDesktop = new ImageDesktop();
imageDesktop.setDesktopName(fileName);
imageDesktop.setCloneName(req.getCloneName());
imageDesktop.setDesktopType(type);
//制作中
imageDesktop.setPublishStatus(0);
@ -335,7 +336,7 @@ public class ImageVirtualMachinesServiceImpl extends ServiceImpl<ImageVirtualMac
boolean saved = imageDesktopService.save(imageDesktop);
if (!saved) {
log.error("保存桌面镜像数据失败");
return Result.errorResult(BaseErrorCode.HTTP_ERROR_CODE_500, "保存桌面镜像数据失败");
return Result.errorResultMessage(BaseErrorCode.HTTP_ERROR_CODE_500, "保存桌面镜像数据失败");
}
// 启动定时任务来检查进度