imetting_backend/test/test_worker_thread.py

37 lines
823 B
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
Manual smoke test: check that the worker thread runs properly.

Runs ``AsyncMeetingService._process_tasks`` in a background thread for five
seconds, then clears ``worker_running`` and waits up to ten seconds for the
thread to finish. The result is reported on stdout (and stderr on failure).
"""
import sys
import os
import time
import threading

# Make the project importable when this file is executed directly.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, _SCRIPT_DIR)
# NOTE(review): this file appears to live in a ``test/`` sub-directory, so the
# ``app`` package is presumably one level up -- confirm. The parent directory
# is added *after* the original entry so existing import precedence is kept.
_PARENT_DIR = os.path.dirname(_SCRIPT_DIR)
if _PARENT_DIR not in sys.path:
    sys.path.insert(1, _PARENT_DIR)

from app.services.async_meeting_service import AsyncMeetingService

# Create the service instance.
service = AsyncMeetingService()

# Exercise the task-processing method directly.
print("测试直接调用_process_tasks方法...")

# Set the run flag before the worker starts.
# (Presumably _process_tasks loops while this flag is true -- verify.)
service.worker_running = True

# Create and start the worker thread.
thread = threading.Thread(target=service._process_tasks)
thread.daemon = False  # non-daemon so the worker's output can be seen
thread.start()

try:
    print(f"线程是否活动: {thread.is_alive()}")
    print("等待5秒...")
    # Give the worker some time to run.
    time.sleep(5)
finally:
    # Always request shutdown, even on Ctrl-C or an unexpected error;
    # otherwise the non-daemon thread would keep the interpreter alive.
    service.worker_running = False
    thread.join(timeout=10)

# join() returns silently on timeout, so check explicitly whether the worker
# really stopped before declaring the test finished.
if thread.is_alive():
    print("警告: worker线程在10秒内未退出", file=sys.stderr)
else:
    print("测试完成")