// File: vdi/pc-fe/src/main/grpc/MockBTService.ts
//
// NOTE: the text that preceded the code here was repository web-viewer residue
// (a "323 lines / 10 KiB / TypeScript" summary, "Raw / Blame / History" links,
// and a warning that the file contains ambiguous Unicode characters that may be
// confused with others in the reader's locale). It is not part of the program
// and is kept only as this comment.

// Local gRPC server for testing; it simulates BT downloads.
// src/grpc/MockBTService.ts
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import path from 'path';
/**
 * Progress message delivered to every progress subscriber.
 *
 * Field names are snake_case because the object is written to the gRPC
 * stream as-is.
 */
interface ProgressUpdate {
  /** Identifier of the download this update belongs to. */
  download_id: string;
  /** Optional display name of the item being downloaded. */
  item_name?: string;
  /** State of the download at the time the update was produced. */
  state: string;
  /** Completion percentage, from 0 to 100. */
  progress: number;
  /** Total size of the item (the mock logs it as bytes). */
  total_size: number;
  /** Amount downloaded so far, in the same unit as `total_size`. */
  downloaded_size: number;
  /** Current download speed (presumably bytes per second; confirm against the proto). */
  download_speed: number;
  /** Current upload speed (presumably bytes per second; confirm against the proto). */
  upload_speed: number;
  /** Estimated time remaining. NOTE(review): the unit is not established in this file. */
  eta: number;
}
/**
 * In-memory record of one simulated download.
 */
interface MockDownloadTask {
  /** Download identifier; also the key under which the task is stored. */
  id: string;
  /** Torrent URL supplied by the client that requested the download. */
  torrentUrl: string;
  /** Display name of the item being downloaded. */
  itemName: string;
  /** Lifecycle state of the simulated download. */
  state: 'downloading' | 'completed' | 'error' | 'paused';
  /** Completion percentage, from 0 to 100. */
  progress: number;
  /** Simulated total size of the item. */
  totalSize: number;
  /** Simulated amount downloaded so far, in the same unit as `totalSize`. */
  downloadedSize: number;
  /** Simulated download speed. */
  downloadSpeed: number;
  /** Timestamp at which the task was created (a `Date.now()` value). */
  startTime: number;
}
/**
 * Mock gRPC server that simulates BT downloads for local testing.
 *
 * It registers handlers for the `BTDownloadService` defined in
 * `protos/bittorrent.proto`, keeps every download in memory, and advances each
 * running download by a fixed percentage once per second, pushing the new
 * state to all clients subscribed through `SubscribeProgress`.
 */
export class MockBTService {
  /** Percentage points added to a running download on every simulation tick. */
  private static readonly PROGRESS_STEP_PERCENT = 1.5;
  /** Delay between two simulation ticks, in milliseconds. */
  private static readonly TICK_INTERVAL_MS = 1000;

  private server: grpc.Server;
  /** All known downloads, keyed by download ID. */
  private activeDownloads: Map<string, MockDownloadTask> = new Map();
  /** Simulation timer of each download that is currently being advanced. */
  private progressIntervals: Map<string, NodeJS.Timeout> = new Map();
  /** One callback per subscribed client; each writes an update to that client's stream. */
  private progressCallbacks: ((progress: ProgressUpdate) => void)[] = [];
  /** Open `SubscribeProgress` streams, tracked so that `stop()` can end them. */
  private subscriberStreams: Set<grpc.ServerWritableStream<any, any>> = new Set();

  constructor() {
    this.server = new grpc.Server();
    this.setupService();
  }

  /**
   * Loads `bittorrent.proto` from the `protos` directory next to this module
   * and registers the handler for every service method on the server.
   */
  private setupService() {
    const PROTO_PATH = path.join(__dirname, 'protos', 'bittorrent.proto');
    const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
      keepCase: true,
      longs: String,
      enums: String,
      defaults: true,
      oneofs: true,
    });
    const protoDescriptor = grpc.loadPackageDefinition(packageDefinition);
    const bittorrent = protoDescriptor.bittorrent as any;

    this.server.addService(bittorrent.BTDownloadService.service, {
      StartDownload: this.startDownload.bind(this),
      StopDownload: this.stopDownload.bind(this),
      GetDownloadStatus: this.getDownloadStatus.bind(this),
      ListDownloads: this.listDownloads.bind(this),
      SubscribeProgress: this.subscribeProgress.bind(this),
    });
  }

  /**
   * `StartDownload` handler: creates a simulated task and starts advancing it.
   *
   * The simulated size is chosen from keywords in the item name (4 GB for
   * "large", 1 GB for "medium", 100 MB otherwise). Starting a download whose
   * ID already exists replaces the previous task and restarts it from 0%.
   *
   * @param call     Unary call whose request carries `torrent_url`, `item_name` and `item_id`.
   * @param callback Receives `{ success, message, download_id }`.
   */
  private startDownload(call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) {
    const { torrent_url, item_name, item_id } = call.request;
    // `||` (not `??`) on purpose: an empty ID must also fall back to a generated one.
    const downloadId = item_id || `download-${Date.now()}`;
    // Guard against a missing name so the keyword checks below cannot throw.
    const itemName: string = item_name ?? '';
    console.log(`模拟开始下载: ${itemName}, ID: ${downloadId}`);

    let fileSize = 1024 * 1024 * 100; // default: 100 MB
    if (itemName.includes('large') || itemName.includes('大文件')) {
      fileSize = 4 * 1024 * 1024 * 1024; // 4 GB
    } else if (itemName.includes('medium') || itemName.includes('中等')) {
      fileSize = 1024 * 1024 * 1024; // 1 GB
    } else if (itemName.includes('small') || itemName.includes('小文件')) {
      fileSize = 100 * 1024 * 1024; // 100 MB
    }

    const task: MockDownloadTask = {
      id: downloadId,
      itemName,
      torrentUrl: torrent_url,
      progress: 0,
      totalSize: fileSize,
      downloadedSize: 0,
      downloadSpeed: 1024 * 1024 * 2, // 2 MB/s
      state: 'downloading',
      startTime: Date.now(),
    };
    this.activeDownloads.set(downloadId, task);
    console.log(`已创建下载任务: ${downloadId}, 大小: ${fileSize} bytes`);

    this.startProgressSimulation(downloadId);

    callback(null, {
      success: true,
      message: '下载已开始',
      download_id: downloadId,
    });
  }

  /**
   * `StopDownload` handler: pauses a running download and stops its timer.
   *
   * A task that is not currently downloading (for example one that already
   * completed) keeps its state; the call still reports success. An unknown ID
   * is answered with a `NOT_FOUND` error.
   *
   * @param call     Unary call whose request carries `download_id`.
   * @param callback Receives `{ success, message }` or the `NOT_FOUND` error.
   */
  private stopDownload(call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) {
    const { download_id } = call.request;
    const task = this.activeDownloads.get(download_id);
    if (!task) {
      callback({
        code: grpc.status.NOT_FOUND,
        message: `下载任务不存在: ${download_id}`
      });
      return;
    }

    // Only a running download can become paused; a finished one stays finished.
    if (task.state === 'downloading') {
      task.state = 'paused';
    }
    this.clearProgressInterval(download_id);
    console.log(`模拟停止下载: ${download_id}`);
    callback(null, { success: true, message: '下载已停止' });
  }

  /**
   * `GetDownloadStatus` handler: returns a snapshot of one download.
   *
   * @param call     Unary call whose request carries `download_id`.
   * @param callback Receives the snapshot, or a `NOT_FOUND` error for an unknown ID.
   */
  private getDownloadStatus(call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) {
    const { download_id } = call.request;
    const task = this.activeDownloads.get(download_id);
    if (!task) {
      callback({
        code: grpc.status.NOT_FOUND,
        message: `下载任务不存在: ${download_id}`
      });
      return;
    }

    callback(null, {
      download_id: task.id,
      progress: task.progress,
      download_speed: task.downloadSpeed,
      state: task.state,
      total_size: task.totalSize,
      downloaded_size: task.downloadedSize,
    });
  }

  /**
   * `ListDownloads` handler: returns a snapshot of every known download.
   *
   * @param _call    Unary call; its request is not read.
   * @param callback Receives `{ downloads }`.
   */
  private listDownloads(_call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) {
    const downloads = Array.from(this.activeDownloads.values()).map(task => ({
      download_id: task.id,
      item_name: task.itemName,
      progress: task.progress,
      download_speed: task.downloadSpeed,
      state: task.state,
      total_size: task.totalSize,
      downloaded_size: task.downloadedSize,
    }));
    callback(null, { downloads });
  }

  /**
   * `SubscribeProgress` handler (server streaming): registers the client so
   * that every later progress update is written to its stream.
   *
   * The registration is removed when the stream emits `cancelled`, `error`
   * or `end`.
   *
   * @param call Writable stream of the subscribing client.
   */
  private subscribeProgress(call: grpc.ServerWritableStream<any, any>) {
    console.log('客户端订阅了进度更新');

    const callback = (progress: ProgressUpdate) => {
      try {
        call.write(progress);
      } catch (error) {
        console.error('发送进度更新失败:', error);
      }
    };
    this.progressCallbacks.push(callback);
    this.subscriberStreams.add(call);

    // Single cleanup path shared by all three stream events; safe to run twice.
    const unsubscribe = () => {
      const index = this.progressCallbacks.indexOf(callback);
      if (index > -1) {
        this.progressCallbacks.splice(index, 1);
      }
      this.subscriberStreams.delete(call);
    };

    call.on('cancelled', () => {
      console.log('客户端取消了进度订阅');
      unsubscribe();
    });
    call.on('error', (error) => {
      console.error('进度流错误:', error);
      unsubscribe();
    });
    call.on('end', () => {
      console.log('客户端断开连接');
      unsubscribe();
    });
  }

  /**
   * Starts the once-per-second timer that advances a download and broadcasts
   * each new state.
   *
   * Any timer already registered for the same ID is cancelled first, so a
   * restarted download is never driven by two timers at once. The timer stops
   * itself when the download reaches 100% or the task no longer exists.
   *
   * @param downloadId ID of the task to advance.
   */
  private startProgressSimulation(downloadId: string) {
    this.clearProgressInterval(downloadId);

    const interval = setInterval(() => {
      const releaseTimer = () => {
        clearInterval(interval);
        if (this.progressIntervals.get(downloadId) === interval) {
          this.progressIntervals.delete(downloadId);
        }
      };

      const task = this.activeDownloads.get(downloadId);
      if (!task) {
        releaseTimer();
        return;
      }
      if (task.state !== 'downloading' || task.progress >= 100) {
        return;
      }

      // Fixed step instead of a random one, to keep the simulated download steady.
      task.progress += MockBTService.PROGRESS_STEP_PERCENT;
      if (task.progress >= 100) {
        // Clamp, and make the byte count agree with the total on completion.
        task.progress = 100;
        task.state = 'completed';
        task.downloadedSize = task.totalSize;
        console.log(`下载完成: ${downloadId}`);
        releaseTimer();
      } else {
        task.downloadedSize = (task.totalSize * task.progress) / 100;
      }

      this.sendProgressUpdate({
        download_id: task.id,
        item_name: task.itemName,
        progress: task.progress,
        download_speed: task.downloadSpeed,
        upload_speed: 1024 * 512, // 512 KB/s
        // NOTE(review): the divisor 2 does not match the 1.5%-per-tick step and the
        // unit of `eta` is not established in this file; the emitted value is kept
        // as it was so existing clients see no change.
        eta: task.progress >= 100 ? 0 : Math.round(((100 - task.progress) / 2) * 1000),
        total_size: task.totalSize,
        downloaded_size: task.downloadedSize,
        state: task.state,
      });
    }, MockBTService.TICK_INTERVAL_MS);

    this.progressIntervals.set(downloadId, interval);
  }

  /** Cancels and forgets the simulation timer registered for `downloadId`, if any. */
  private clearProgressInterval(downloadId: string) {
    const existing = this.progressIntervals.get(downloadId);
    if (existing !== undefined) {
      clearInterval(existing);
      this.progressIntervals.delete(downloadId);
    }
  }

  /**
   * Delivers one progress update to every subscriber. A failure in one
   * subscriber's callback is logged and does not stop delivery to the others.
   *
   * @param progress Update to broadcast.
   */
  private sendProgressUpdate(progress: ProgressUpdate) {
    console.log('发送进度更新:', progress);
    this.progressCallbacks.forEach((callback, index) => {
      try {
        callback(progress);
      } catch (error) {
        console.error(`发送进度更新给回调 ${index} 失败:`, error);
      }
    });
  }

  /**
   * Binds the server to `0.0.0.0:<port>` with insecure credentials and starts it.
   *
   * NOTE(review): `0.0.0.0` listens on all network interfaces; confirm that is
   * intended for a local test server.
   *
   * @param port Port to listen on; defaults to 50051.
   * @returns Promise that resolves once the server is running and rejects if binding fails.
   */
  start(port: number = 50051): Promise<void> {
    return new Promise((resolve, reject) => {
      this.server.bindAsync(
        `0.0.0.0:${port}`,
        grpc.ServerCredentials.createInsecure(),
        (error, boundPort) => {
          if (error) {
            reject(error);
            return;
          }
          this.server.start();
          console.log(`Mock gRPC 服务器运行在端口 ${boundPort}`);
          resolve();
        }
      );
    });
  }

  /**
   * Stops all simulations, ends every open progress stream, and shuts the
   * server down.
   *
   * The streams are ended first because a graceful shutdown waits for pending
   * calls, and a progress subscription otherwise stays open until the client
   * cancels it. If the graceful shutdown reports an error the server is
   * force-closed.
   *
   * @returns Promise that resolves once the shutdown callback has run.
   */
  stop(): Promise<void> {
    return new Promise((resolve) => {
      this.progressIntervals.forEach(interval => clearInterval(interval));
      this.progressIntervals.clear();
      this.activeDownloads.clear();

      // Iterate over a copy: ending a stream may trigger its cleanup handler.
      for (const stream of Array.from(this.subscriberStreams)) {
        try {
          stream.end();
        } catch (error) {
          console.error('关闭进度流失败:', error);
        }
      }
      this.subscriberStreams.clear();
      this.progressCallbacks = [];

      this.server.tryShutdown((error) => {
        if (error) {
          console.error('Mock gRPC 服务器优雅关闭失败, 强制关闭:', error);
          this.server.forceShutdown();
        }
        console.log('Mock gRPC 服务器已停止');
        resolve();
      });
    });
  }
}