#!/usr/bin/env python3 """ JWT Token 过期测试脚本 用于测试JWT token的过期、撤销机制,可以模拟指定用户的token失效 运行方法: cd /Users/jiliu/工作/projects/imeeting/backend source venv/bin/activate # 激活虚拟环境 python test/test_token_expiration.py 功能: 1. 登录指定用户并获取token 2. 验证token有效性 3. 撤销指定用户的所有token(模拟失效) 4. 验证撤销后token失效 期望结果:在网页上登录的用户执行失效命令后,网页会自动登出 """ import sys import os import requests import json import time from datetime import datetime # 添加项目根目录到Python路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) BASE_URL = "http://127.0.0.1:8000" # 禁用代理以避免本地请求被代理 PROXIES = {'http': None, 'https': None} def invalidate_user_tokens(): """模拟指定用户的token失效""" print("模拟用户Token失效工具") print("=" * 40) # 获取要失效的用户名 target_username = input("请输入要失效token的用户名 (默认: mula): ").strip() if not target_username: target_username = "mula" # 获取管理员凭据来执行失效操作 admin_username = input("请输入管理员用户名 (默认: mula): ").strip() admin_password = input("请输入管理员密码 (默认: 781126): ").strip() if not admin_username: admin_username = "mula" if not admin_password: admin_password = "781126" try: # 1. 管理员登录获取token print(f"\n步骤1: 管理员登录 ({admin_username})") admin_login_data = { "username": admin_username, "password": admin_password } response = requests.post(f"{BASE_URL}/api/auth/login", json=admin_login_data, proxies=PROXIES) if response.status_code != 200: print(f"❌ 管理员登录失败") print(f"状态码: {response.status_code}") print(f"响应内容: {response.text}") return admin_data = response.json() admin_token = admin_data["token"] admin_headers = {"Authorization": f"Bearer {admin_token}"} print(f"✅ 管理员登录成功: {admin_data['username']} ({admin_data['caption']})") # 2. 如果目标用户不是管理员,先登录目标用户验证token存在 if target_username != admin_username: print(f"\n步骤2: 验证目标用户 ({target_username}) 是否存在") target_password = input(f"请输入 {target_username} 的密码 (用于验证): ").strip() if not target_password: print("❌ 需要提供目标用户的密码来验证") return target_login_data = { "username": target_username, "password": target_password } response = requests.post(f"{BASE_URL}/api/auth/login", json=target_login_data, proxies=PROXIES) if response.status_code != 200: print(f"❌ 目标用户登录失败,无法验证用户存在") return target_data = response.json() print(f"✅ 目标用户验证成功: {target_data['username']} ({target_data['caption']})") target_user_id = target_data['user_id'] else: target_user_id = admin_data['user_id'] # 3. 撤销目标用户的所有token print(f"\n步骤3: 撤销用户 {target_username} (ID: {target_user_id}) 的所有token") # 使用管理员权限调用新的admin API response = requests.post(f"{BASE_URL}/api/auth/admin/revoke-user-tokens/{target_user_id}", headers=admin_headers, proxies=PROXIES) if response.status_code == 200: result = response.json() print(f"✅ Token撤销成功: {result.get('message', '已撤销所有token')}") # 4. 验证token是否真的失效了 print(f"\n步骤4: 验证token失效") if target_username != admin_username: # 尝试使用目标用户的token访问protected API target_token = target_data["token"] target_headers = {"Authorization": f"Bearer {target_token}"} response = requests.get(f"{BASE_URL}/api/auth/me", headers=target_headers, proxies=PROXIES) if response.status_code == 401: print(f"✅ 验证成功:用户 {target_username} 的token已失效") else: print(f"❌ 验证失败:用户 {target_username} 的token仍然有效") else: # 如果目标用户就是管理员,验证当前管理员token是否失效 response = requests.get(f"{BASE_URL}/api/auth/me", headers=admin_headers, proxies=PROXIES) if response.status_code == 401: print(f"✅ 验证成功:用户 {target_username} 的token已失效") else: print(f"❌ 验证失败:用户 {target_username} 的token仍然有效") print(f"\n🌟 操作完成!") print(f"如果用户 {target_username} 在网页上已登录,现在应该会自动登出。") print(f"你可以在网页上验证是否自动跳转到登录页面。") else: print(f"❌ Token撤销失败: {response.status_code}") print(f"响应内容: {response.text}") except requests.exceptions.ConnectionError: print("❌ 无法连接到后端服务器,请确保服务器正在运行") except Exception as e: print(f"❌ 操作失败: {e}") def test_token_expiration(): """测试token过期机制""" print("JWT Token 过期测试") print("=" * 40) # 1. 登录获取token username = input("请输入用户名 (默认: test): ").strip() password = input("请输入密码 (默认: test): ").strip() # 使用默认值如果输入为空 if not username: username = "test" if not password: password = "test" login_data = { "username": username, "password": password } try: # 登录 print(f"正在尝试登录用户: {login_data['username']}") response = requests.post(f"{BASE_URL}/api/auth/login", json=login_data, proxies=PROXIES) if response.status_code != 200: print(f"❌ 登录失败") print(f"状态码: {response.status_code}") print(f"响应内容: {response.text}") print(f"请求URL: {BASE_URL}/api/auth/login") print("请检查:") print("1. 后端服务是否正在运行") print("2. 用户名和密码是否正确") print("3. 数据库连接是否正常") return user_data = response.json() token = user_data["token"] print(f"✅ 登录成功,获得token: {token[:20]}...") # 2. 测试token有效性 headers = {"Authorization": f"Bearer {token}"} print("\n测试1: 验证token有效性") response = requests.get(f"{BASE_URL}/api/auth/me", headers=headers, proxies=PROXIES) if response.status_code == 200: user_info = response.json() print(f"✅ Token有效,用户: {user_info.get('username')}") else: print(f"❌ Token无效: {response.status_code}") return # 3. 测试受保护的API print("\n测试2: 访问受保护的API") response = requests.get(f"{BASE_URL}/api/meetings", headers=headers, proxies=PROXIES) if response.status_code == 200: print("✅ 成功访问会议列表API") else: print(f"❌ 访问受保护API失败: {response.status_code}") # 4. 登出token print("\n测试3: 登出token") response = requests.post(f"{BASE_URL}/api/auth/logout", headers=headers, proxies=PROXIES) if response.status_code == 200: print("✅ 登出成功") # 5. 验证登出后token失效 print("\n测试4: 验证登出后token失效") response = requests.get(f"{BASE_URL}/api/auth/me", headers=headers, proxies=PROXIES) if response.status_code == 401: print("✅ Token已失效,登出成功") else: print(f"❌ Token仍然有效,登出失败: {response.status_code}") else: print(f"❌ 登出失败: {response.status_code}") except requests.exceptions.ConnectionError: print("❌ 无法连接到后端服务器,请确保服务器正在运行") except Exception as e: print(f"❌ 测试失败: {e}") def check_token_format(): """检查token格式是否为JWT""" token = input("\n请粘贴JWT token (或按Enter跳过): ").strip() if not token: return print(f"\nJWT格式检查:") # JWT应该有三个部分,用.分隔 parts = token.split('.') if len(parts) != 3: print("❌ 不是有效的JWT格式 (应该有3个部分用.分隔)") return try: import base64 import json # 解码header header_padding = parts[0] + '=' * (4 - len(parts[0]) % 4) header = json.loads(base64.urlsafe_b64decode(header_padding)) # 解码payload payload_padding = parts[1] + '=' * (4 - len(parts[1]) % 4) payload = json.loads(base64.urlsafe_b64decode(payload_padding)) print("✅ JWT格式有效") print(f"算法: {header.get('alg')}") print(f"类型: {header.get('typ')}") print(f"用户ID: {payload.get('user_id')}") print(f"用户名: {payload.get('username')}") if 'exp' in payload: exp_time = datetime.fromtimestamp(payload['exp']) print(f"过期时间: {exp_time}") if datetime.now() > exp_time: print("❌ Token已过期") else: print("✅ Token未过期") except Exception as e: print(f"❌ JWT解码失败: {e}") if __name__ == "__main__": print("JWT Token 测试工具") print("=" * 50) print(f"工作目录: {os.getcwd()}") print(f"测试脚本路径: {__file__}") print() print("请选择功能:") print("1. 模拟指定用户Token失效 (推荐)") print("2. 完整Token过期测试") print("3. JWT格式检查") choice = input("\n请输入选项 (1-3, 默认: 1): ").strip() if choice == "2": test_token_expiration() check_token_format() elif choice == "3": check_token_format() else: # 默认选择1 invalidate_user_tokens() print("\n=== 测试完成 ===") print("如果测试失败,请检查:") print("1. 确保后端服务正在运行: python main.py") print("2. 确保在 backend 目录下运行测试") print("3. 确保Redis服务正在运行") print("4. 如果选择了选项1,请在网页上验证用户是否自动登出")