296 lines
11 KiB
Python
296 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
JWT Token 过期测试脚本
|
||
用于测试JWT token的过期、撤销机制,可以模拟指定用户的token失效
|
||
|
||
运行方法:
|
||
cd /Users/jiliu/工作/projects/imeeting/backend
|
||
source venv/bin/activate # 激活虚拟环境
|
||
python test/test_token_expiration.py
|
||
|
||
功能:
|
||
1. 登录指定用户并获取token
|
||
2. 验证token有效性
|
||
3. 撤销指定用户的所有token(模拟失效)
|
||
4. 验证撤销后token失效
|
||
|
||
期望结果:在网页上登录的用户执行失效命令后,网页会自动登出
|
||
"""
|
||
import sys
|
||
import os
|
||
import requests
|
||
import json
|
||
import time
|
||
from datetime import datetime
|
||
|
||
# 添加项目根目录到Python路径
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
BASE_URL = "http://127.0.0.1:8000"
|
||
|
||
# 禁用代理以避免本地请求被代理
|
||
PROXIES = {'http': None, 'https': None}
|
||
|
||
def invalidate_user_tokens():
|
||
"""模拟指定用户的token失效"""
|
||
print("模拟用户Token失效工具")
|
||
print("=" * 40)
|
||
|
||
# 获取要失效的用户名
|
||
target_username = input("请输入要失效token的用户名 (默认: mula): ").strip()
|
||
if not target_username:
|
||
target_username = "mula"
|
||
|
||
# 获取管理员凭据来执行失效操作
|
||
admin_username = input("请输入管理员用户名 (默认: mula): ").strip()
|
||
admin_password = input("请输入管理员密码 (默认: 781126): ").strip()
|
||
|
||
if not admin_username:
|
||
admin_username = "mula"
|
||
if not admin_password:
|
||
admin_password = "781126"
|
||
|
||
try:
|
||
# 1. 管理员登录获取token
|
||
print(f"\n步骤1: 管理员登录 ({admin_username})")
|
||
admin_login_data = {
|
||
"username": admin_username,
|
||
"password": admin_password
|
||
}
|
||
|
||
response = requests.post(f"{BASE_URL}/api/auth/login", json=admin_login_data, proxies=PROXIES)
|
||
if response.status_code != 200:
|
||
print(f"❌ 管理员登录失败")
|
||
print(f"状态码: {response.status_code}")
|
||
print(f"响应内容: {response.text}")
|
||
return
|
||
|
||
admin_data = response.json()
|
||
admin_token = admin_data["token"]
|
||
admin_headers = {"Authorization": f"Bearer {admin_token}"}
|
||
|
||
print(f"✅ 管理员登录成功: {admin_data['username']} ({admin_data['caption']})")
|
||
|
||
# 2. 如果目标用户不是管理员,先登录目标用户验证token存在
|
||
if target_username != admin_username:
|
||
print(f"\n步骤2: 验证目标用户 ({target_username}) 是否存在")
|
||
target_password = input(f"请输入 {target_username} 的密码 (用于验证): ").strip()
|
||
if not target_password:
|
||
print("❌ 需要提供目标用户的密码来验证")
|
||
return
|
||
|
||
target_login_data = {
|
||
"username": target_username,
|
||
"password": target_password
|
||
}
|
||
|
||
response = requests.post(f"{BASE_URL}/api/auth/login", json=target_login_data, proxies=PROXIES)
|
||
if response.status_code != 200:
|
||
print(f"❌ 目标用户登录失败,无法验证用户存在")
|
||
return
|
||
|
||
target_data = response.json()
|
||
print(f"✅ 目标用户验证成功: {target_data['username']} ({target_data['caption']})")
|
||
target_user_id = target_data['user_id']
|
||
else:
|
||
target_user_id = admin_data['user_id']
|
||
|
||
# 3. 撤销目标用户的所有token
|
||
print(f"\n步骤3: 撤销用户 {target_username} (ID: {target_user_id}) 的所有token")
|
||
|
||
# 使用管理员权限调用新的admin API
|
||
response = requests.post(f"{BASE_URL}/api/auth/admin/revoke-user-tokens/{target_user_id}",
|
||
headers=admin_headers, proxies=PROXIES)
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
print(f"✅ Token撤销成功: {result.get('message', '已撤销所有token')}")
|
||
|
||
# 4. 验证token是否真的失效了
|
||
print(f"\n步骤4: 验证token失效")
|
||
if target_username != admin_username:
|
||
# 尝试使用目标用户的token访问protected API
|
||
target_token = target_data["token"]
|
||
target_headers = {"Authorization": f"Bearer {target_token}"}
|
||
|
||
response = requests.get(f"{BASE_URL}/api/auth/me", headers=target_headers, proxies=PROXIES)
|
||
if response.status_code == 401:
|
||
print(f"✅ 验证成功:用户 {target_username} 的token已失效")
|
||
else:
|
||
print(f"❌ 验证失败:用户 {target_username} 的token仍然有效")
|
||
else:
|
||
# 如果目标用户就是管理员,验证当前管理员token是否失效
|
||
response = requests.get(f"{BASE_URL}/api/auth/me", headers=admin_headers, proxies=PROXIES)
|
||
if response.status_code == 401:
|
||
print(f"✅ 验证成功:用户 {target_username} 的token已失效")
|
||
else:
|
||
print(f"❌ 验证失败:用户 {target_username} 的token仍然有效")
|
||
|
||
print(f"\n🌟 操作完成!")
|
||
print(f"如果用户 {target_username} 在网页上已登录,现在应该会自动登出。")
|
||
print(f"你可以在网页上验证是否自动跳转到登录页面。")
|
||
|
||
else:
|
||
print(f"❌ Token撤销失败: {response.status_code}")
|
||
print(f"响应内容: {response.text}")
|
||
|
||
except requests.exceptions.ConnectionError:
|
||
print("❌ 无法连接到后端服务器,请确保服务器正在运行")
|
||
except Exception as e:
|
||
print(f"❌ 操作失败: {e}")
|
||
|
||
def test_token_expiration():
|
||
"""测试token过期机制"""
|
||
print("JWT Token 过期测试")
|
||
print("=" * 40)
|
||
|
||
# 1. 登录获取token
|
||
username = input("请输入用户名 (默认: test): ").strip()
|
||
password = input("请输入密码 (默认: test): ").strip()
|
||
|
||
# 使用默认值如果输入为空
|
||
if not username:
|
||
username = "test"
|
||
if not password:
|
||
password = "test"
|
||
|
||
login_data = {
|
||
"username": username,
|
||
"password": password
|
||
}
|
||
|
||
try:
|
||
# 登录
|
||
print(f"正在尝试登录用户: {login_data['username']}")
|
||
response = requests.post(f"{BASE_URL}/api/auth/login", json=login_data, proxies=PROXIES)
|
||
if response.status_code != 200:
|
||
print(f"❌ 登录失败")
|
||
print(f"状态码: {response.status_code}")
|
||
print(f"响应内容: {response.text}")
|
||
print(f"请求URL: {BASE_URL}/api/auth/login")
|
||
print("请检查:")
|
||
print("1. 后端服务是否正在运行")
|
||
print("2. 用户名和密码是否正确")
|
||
print("3. 数据库连接是否正常")
|
||
return
|
||
|
||
user_data = response.json()
|
||
token = user_data["token"]
|
||
|
||
print(f"✅ 登录成功,获得token: {token[:20]}...")
|
||
|
||
# 2. 测试token有效性
|
||
headers = {"Authorization": f"Bearer {token}"}
|
||
|
||
print("\n测试1: 验证token有效性")
|
||
response = requests.get(f"{BASE_URL}/api/auth/me", headers=headers, proxies=PROXIES)
|
||
if response.status_code == 200:
|
||
user_info = response.json()
|
||
print(f"✅ Token有效,用户: {user_info.get('username')}")
|
||
else:
|
||
print(f"❌ Token无效: {response.status_code}")
|
||
return
|
||
|
||
# 3. 测试受保护的API
|
||
print("\n测试2: 访问受保护的API")
|
||
response = requests.get(f"{BASE_URL}/api/meetings", headers=headers, proxies=PROXIES)
|
||
if response.status_code == 200:
|
||
print("✅ 成功访问会议列表API")
|
||
else:
|
||
print(f"❌ 访问受保护API失败: {response.status_code}")
|
||
|
||
# 4. 登出token
|
||
print("\n测试3: 登出token")
|
||
response = requests.post(f"{BASE_URL}/api/auth/logout", headers=headers, proxies=PROXIES)
|
||
if response.status_code == 200:
|
||
print("✅ 登出成功")
|
||
|
||
# 5. 验证登出后token失效
|
||
print("\n测试4: 验证登出后token失效")
|
||
response = requests.get(f"{BASE_URL}/api/auth/me", headers=headers, proxies=PROXIES)
|
||
if response.status_code == 401:
|
||
print("✅ Token已失效,登出成功")
|
||
else:
|
||
print(f"❌ Token仍然有效,登出失败: {response.status_code}")
|
||
else:
|
||
print(f"❌ 登出失败: {response.status_code}")
|
||
|
||
except requests.exceptions.ConnectionError:
|
||
print("❌ 无法连接到后端服务器,请确保服务器正在运行")
|
||
except Exception as e:
|
||
print(f"❌ 测试失败: {e}")
|
||
|
||
def check_token_format():
|
||
"""检查token格式是否为JWT"""
|
||
token = input("\n请粘贴JWT token (或按Enter跳过): ").strip()
|
||
|
||
if not token:
|
||
return
|
||
|
||
print(f"\nJWT格式检查:")
|
||
|
||
# JWT应该有三个部分,用.分隔
|
||
parts = token.split('.')
|
||
if len(parts) != 3:
|
||
print("❌ 不是有效的JWT格式 (应该有3个部分用.分隔)")
|
||
return
|
||
|
||
try:
|
||
import base64
|
||
import json
|
||
|
||
# 解码header
|
||
header_padding = parts[0] + '=' * (4 - len(parts[0]) % 4)
|
||
header = json.loads(base64.urlsafe_b64decode(header_padding))
|
||
|
||
# 解码payload
|
||
payload_padding = parts[1] + '=' * (4 - len(parts[1]) % 4)
|
||
payload = json.loads(base64.urlsafe_b64decode(payload_padding))
|
||
|
||
print("✅ JWT格式有效")
|
||
print(f"算法: {header.get('alg')}")
|
||
print(f"类型: {header.get('typ')}")
|
||
print(f"用户ID: {payload.get('user_id')}")
|
||
print(f"用户名: {payload.get('username')}")
|
||
|
||
if 'exp' in payload:
|
||
exp_time = datetime.fromtimestamp(payload['exp'])
|
||
print(f"过期时间: {exp_time}")
|
||
|
||
if datetime.now() > exp_time:
|
||
print("❌ Token已过期")
|
||
else:
|
||
print("✅ Token未过期")
|
||
|
||
except Exception as e:
|
||
print(f"❌ JWT解码失败: {e}")
|
||
|
||
if __name__ == "__main__":
|
||
print("JWT Token 测试工具")
|
||
print("=" * 50)
|
||
print(f"工作目录: {os.getcwd()}")
|
||
print(f"测试脚本路径: {__file__}")
|
||
print()
|
||
|
||
print("请选择功能:")
|
||
print("1. 模拟指定用户Token失效 (推荐)")
|
||
print("2. 完整Token过期测试")
|
||
print("3. JWT格式检查")
|
||
|
||
choice = input("\n请输入选项 (1-3, 默认: 1): ").strip()
|
||
|
||
if choice == "2":
|
||
test_token_expiration()
|
||
check_token_format()
|
||
elif choice == "3":
|
||
check_token_format()
|
||
else:
|
||
# 默认选择1
|
||
invalidate_user_tokens()
|
||
|
||
print("\n=== 测试完成 ===")
|
||
print("如果测试失败,请检查:")
|
||
print("1. 确保后端服务正在运行: python main.py")
|
||
print("2. 确保在 backend 目录下运行测试")
|
||
print("3. 确保Redis服务正在运行")
|
||
print("4. 如果选择了选项1,请在网页上验证用户是否自动登出") |