37 lines
823 B
Python
37 lines
823 B
Python
#!/usr/bin/env python3
|
||
"""
|
||
测试worker线程是否正常工作
|
||
"""
|
||
import sys
|
||
import os
|
||
import time
|
||
import threading
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
from app.services.async_meeting_service import AsyncMeetingService
|
||
|
||
# 创建服务实例
|
||
service = AsyncMeetingService()
|
||
|
||
# 直接调用处理任务方法测试
|
||
print("测试直接调用_process_tasks方法...")
|
||
|
||
# 设置worker_running为True
|
||
service.worker_running = True
|
||
|
||
# 创建线程并启动
|
||
thread = threading.Thread(target=service._process_tasks)
|
||
thread.daemon = False # 不设置为daemon,确保能看到输出
|
||
thread.start()
|
||
|
||
print(f"线程是否活动: {thread.is_alive()}")
|
||
print("等待5秒...")
|
||
|
||
# 等待一段时间
|
||
time.sleep(5)
|
||
|
||
# 停止worker
|
||
service.worker_running = False
|
||
thread.join(timeout=10)
|
||
|
||
print("测试完成") |