imetting_backend/test/test_token_expiration.py

296 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
JWT Token 过期测试脚本
用于测试JWT token的过期、撤销机制可以模拟指定用户的token失效
运行方法:
cd /Users/jiliu/工作/projects/imeeting/backend
source venv/bin/activate # 激活虚拟环境
python test/test_token_expiration.py
功能:
1. 登录指定用户并获取token
2. 验证token有效性
3. 撤销指定用户的所有token模拟失效
4. 验证撤销后token失效
期望结果:在网页上登录的用户执行失效命令后,网页会自动登出
"""
import sys
import os
import requests
import json
import time
from datetime import datetime
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
BASE_URL = "http://127.0.0.1:8000"
# 禁用代理以避免本地请求被代理
PROXIES = {'http': None, 'https': None}
def invalidate_user_tokens():
"""模拟指定用户的token失效"""
print("模拟用户Token失效工具")
print("=" * 40)
# 获取要失效的用户名
target_username = input("请输入要失效token的用户名 (默认: mula): ").strip()
if not target_username:
target_username = "mula"
# 获取管理员凭据来执行失效操作
admin_username = input("请输入管理员用户名 (默认: mula): ").strip()
admin_password = input("请输入管理员密码 (默认: 781126): ").strip()
if not admin_username:
admin_username = "mula"
if not admin_password:
admin_password = "781126"
try:
# 1. 管理员登录获取token
print(f"\n步骤1: 管理员登录 ({admin_username})")
admin_login_data = {
"username": admin_username,
"password": admin_password
}
response = requests.post(f"{BASE_URL}/api/auth/login", json=admin_login_data, proxies=PROXIES)
if response.status_code != 200:
print(f"❌ 管理员登录失败")
print(f"状态码: {response.status_code}")
print(f"响应内容: {response.text}")
return
admin_data = response.json()
admin_token = admin_data["token"]
admin_headers = {"Authorization": f"Bearer {admin_token}"}
print(f"✅ 管理员登录成功: {admin_data['username']} ({admin_data['caption']})")
# 2. 如果目标用户不是管理员先登录目标用户验证token存在
if target_username != admin_username:
print(f"\n步骤2: 验证目标用户 ({target_username}) 是否存在")
target_password = input(f"请输入 {target_username} 的密码 (用于验证): ").strip()
if not target_password:
print("❌ 需要提供目标用户的密码来验证")
return
target_login_data = {
"username": target_username,
"password": target_password
}
response = requests.post(f"{BASE_URL}/api/auth/login", json=target_login_data, proxies=PROXIES)
if response.status_code != 200:
print(f"❌ 目标用户登录失败,无法验证用户存在")
return
target_data = response.json()
print(f"✅ 目标用户验证成功: {target_data['username']} ({target_data['caption']})")
target_user_id = target_data['user_id']
else:
target_user_id = admin_data['user_id']
# 3. 撤销目标用户的所有token
print(f"\n步骤3: 撤销用户 {target_username} (ID: {target_user_id}) 的所有token")
# 使用管理员权限调用新的admin API
response = requests.post(f"{BASE_URL}/api/auth/admin/revoke-user-tokens/{target_user_id}",
headers=admin_headers, proxies=PROXIES)
if response.status_code == 200:
result = response.json()
print(f"✅ Token撤销成功: {result.get('message', '已撤销所有token')}")
# 4. 验证token是否真的失效了
print(f"\n步骤4: 验证token失效")
if target_username != admin_username:
# 尝试使用目标用户的token访问protected API
target_token = target_data["token"]
target_headers = {"Authorization": f"Bearer {target_token}"}
response = requests.get(f"{BASE_URL}/api/auth/me", headers=target_headers, proxies=PROXIES)
if response.status_code == 401:
print(f"✅ 验证成功:用户 {target_username} 的token已失效")
else:
print(f"❌ 验证失败:用户 {target_username} 的token仍然有效")
else:
# 如果目标用户就是管理员验证当前管理员token是否失效
response = requests.get(f"{BASE_URL}/api/auth/me", headers=admin_headers, proxies=PROXIES)
if response.status_code == 401:
print(f"✅ 验证成功:用户 {target_username} 的token已失效")
else:
print(f"❌ 验证失败:用户 {target_username} 的token仍然有效")
print(f"\n🌟 操作完成!")
print(f"如果用户 {target_username} 在网页上已登录,现在应该会自动登出。")
print(f"你可以在网页上验证是否自动跳转到登录页面。")
else:
print(f"❌ Token撤销失败: {response.status_code}")
print(f"响应内容: {response.text}")
except requests.exceptions.ConnectionError:
print("❌ 无法连接到后端服务器,请确保服务器正在运行")
except Exception as e:
print(f"❌ 操作失败: {e}")
def test_token_expiration():
"""测试token过期机制"""
print("JWT Token 过期测试")
print("=" * 40)
# 1. 登录获取token
username = input("请输入用户名 (默认: test): ").strip()
password = input("请输入密码 (默认: test): ").strip()
# 使用默认值如果输入为空
if not username:
username = "test"
if not password:
password = "test"
login_data = {
"username": username,
"password": password
}
try:
# 登录
print(f"正在尝试登录用户: {login_data['username']}")
response = requests.post(f"{BASE_URL}/api/auth/login", json=login_data, proxies=PROXIES)
if response.status_code != 200:
print(f"❌ 登录失败")
print(f"状态码: {response.status_code}")
print(f"响应内容: {response.text}")
print(f"请求URL: {BASE_URL}/api/auth/login")
print("请检查:")
print("1. 后端服务是否正在运行")
print("2. 用户名和密码是否正确")
print("3. 数据库连接是否正常")
return
user_data = response.json()
token = user_data["token"]
print(f"✅ 登录成功获得token: {token[:20]}...")
# 2. 测试token有效性
headers = {"Authorization": f"Bearer {token}"}
print("\n测试1: 验证token有效性")
response = requests.get(f"{BASE_URL}/api/auth/me", headers=headers, proxies=PROXIES)
if response.status_code == 200:
user_info = response.json()
print(f"✅ Token有效用户: {user_info.get('username')}")
else:
print(f"❌ Token无效: {response.status_code}")
return
# 3. 测试受保护的API
print("\n测试2: 访问受保护的API")
response = requests.get(f"{BASE_URL}/api/meetings", headers=headers, proxies=PROXIES)
if response.status_code == 200:
print("✅ 成功访问会议列表API")
else:
print(f"❌ 访问受保护API失败: {response.status_code}")
# 4. 登出token
print("\n测试3: 登出token")
response = requests.post(f"{BASE_URL}/api/auth/logout", headers=headers, proxies=PROXIES)
if response.status_code == 200:
print("✅ 登出成功")
# 5. 验证登出后token失效
print("\n测试4: 验证登出后token失效")
response = requests.get(f"{BASE_URL}/api/auth/me", headers=headers, proxies=PROXIES)
if response.status_code == 401:
print("✅ Token已失效登出成功")
else:
print(f"❌ Token仍然有效登出失败: {response.status_code}")
else:
print(f"❌ 登出失败: {response.status_code}")
except requests.exceptions.ConnectionError:
print("❌ 无法连接到后端服务器,请确保服务器正在运行")
except Exception as e:
print(f"❌ 测试失败: {e}")
def check_token_format():
"""检查token格式是否为JWT"""
token = input("\n请粘贴JWT token (或按Enter跳过): ").strip()
if not token:
return
print(f"\nJWT格式检查:")
# JWT应该有三个部分用.分隔
parts = token.split('.')
if len(parts) != 3:
print("❌ 不是有效的JWT格式 (应该有3个部分用.分隔)")
return
try:
import base64
import json
# 解码header
header_padding = parts[0] + '=' * (4 - len(parts[0]) % 4)
header = json.loads(base64.urlsafe_b64decode(header_padding))
# 解码payload
payload_padding = parts[1] + '=' * (4 - len(parts[1]) % 4)
payload = json.loads(base64.urlsafe_b64decode(payload_padding))
print("✅ JWT格式有效")
print(f"算法: {header.get('alg')}")
print(f"类型: {header.get('typ')}")
print(f"用户ID: {payload.get('user_id')}")
print(f"用户名: {payload.get('username')}")
if 'exp' in payload:
exp_time = datetime.fromtimestamp(payload['exp'])
print(f"过期时间: {exp_time}")
if datetime.now() > exp_time:
print("❌ Token已过期")
else:
print("✅ Token未过期")
except Exception as e:
print(f"❌ JWT解码失败: {e}")
if __name__ == "__main__":
print("JWT Token 测试工具")
print("=" * 50)
print(f"工作目录: {os.getcwd()}")
print(f"测试脚本路径: {__file__}")
print()
print("请选择功能:")
print("1. 模拟指定用户Token失效 (推荐)")
print("2. 完整Token过期测试")
print("3. JWT格式检查")
choice = input("\n请输入选项 (1-3, 默认: 1): ").strip()
if choice == "2":
test_token_expiration()
check_token_format()
elif choice == "3":
check_token_format()
else:
# 默认选择1
invalidate_user_tokens()
print("\n=== 测试完成 ===")
print("如果测试失败,请检查:")
print("1. 确保后端服务正在运行: python main.py")
print("2. 确保在 backend 目录下运行测试")
print("3. 确保Redis服务正在运行")
print("4. 如果选择了选项1请在网页上验证用户是否自动登出")