#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Manual smoke test for the Qiniu credentials defined in app.core.config.

Run as a script. It prints the outcome of each check and exits with status 0
when every check passes, 1 otherwise.
"""
import os
import sys
import traceback

from qiniu import Auth, put_file_v2, BucketManager

# Add app path
sys.path.append(os.path.join(os.path.dirname(__file__), 'app'))

from app.core.config import QINIU_ACCESS_KEY, QINIU_SECRET_KEY, QINIU_BUCKET, QINIU_DOMAIN

# Local file uploaded by the third check when the caller does not supply one.
DEFAULT_TEST_FILE = "/Users/jiliu/工作/projects/imeeting/backend/uploads/result.json"


def test_qiniu_connection(test_file=DEFAULT_TEST_FILE):
    """Run three sequential checks against Qiniu and report each on stdout.

    The checks are: (1) generate an upload token, (2) list the configured
    bucket, (3) upload ``test_file`` under the key ``test/result1.json``.
    The function stops at the first failing check.

    Args:
        test_file: Path of the local file to upload in the third check.
            Defaults to ``DEFAULT_TEST_FILE``.

    Returns:
        True if all three checks succeed, False otherwise (including when
        ``test_file`` does not exist).
    """
    print("=== 七牛云连接测试 ===")
    print(f"Access Key: {QINIU_ACCESS_KEY[:10]}...")
    # Never echo any part of the secret key: console output ends up in
    # terminals, CI logs and bug reports.
    print("Secret Key: **********...")
    print(f"Bucket: {QINIU_BUCKET}")
    print(f"Domain: {QINIU_DOMAIN}")

    # Build the credentials object shared by all checks.
    q = Auth(QINIU_ACCESS_KEY, QINIU_SECRET_KEY)

    # Check 1: generate an upload token.
    try:
        key = "test/connection-test.txt"
        token = q.upload_token(QINIU_BUCKET, key, 3600)
        print(f"✓ Token生成成功: {token[:50]}...")
    except Exception as e:
        print(f"✗ Token生成失败: {e}")
        return False

    # Check 2: list the bucket to verify that the credentials are accepted.
    try:
        bucket_manager = BucketManager(q)
        ret, _eof, info = bucket_manager.list(QINIU_BUCKET, limit=100)
    except Exception as e:
        print(f"✗ Bucket访问失败: {e}")
        return False
    # The SDK reports HTTP-level failures through `info` instead of raising,
    # so the status code must be checked explicitly; otherwise rejected
    # credentials would be reported as a success.
    if info.status_code != 200:
        print(f"✗ Bucket访问失败: {info.status_code}")
        print(f" text_body: {info.text_body}")
        return False
    print(f"✓ Bucket访问成功, status_code: {info.status_code}")
    if ret:
        print(f" 存储空间中有文件: {len(ret.get('items', []))} 个")

    # Check 3: upload a small file.
    if not os.path.exists(test_file):
        print(f"✗ 测试文件不存在: {test_file}")
        return False

    try:
        key = "test/result1.json"
        token = q.upload_token(QINIU_BUCKET, key, 3600)
        ret, info = put_file_v2(token, key, test_file, version='v2')

        # Dump the full response to help diagnose a failed upload.
        print("上传结果:")
        print(f" ret: {ret}")
        print(f" status_code: {info.status_code}")
        print(f" text_body: {info.text_body}")
        print(f" url: {info.url}")
        print(f" req_id: {info.req_id}")
        print(f" x_log: {info.x_log}")

        if info.status_code != 200:
            print(f"✗ 文件上传失败: {info.status_code}")
            return False

        print("✓ 文件上传成功")
        url = f"http://{QINIU_DOMAIN}/{key}"
        print(f" 访问URL: {url}")
        return True
    except Exception as e:
        print(f"✗ 文件上传异常: {e}")
        traceback.print_exc()
        return False


if __name__ == "__main__":
    success = test_qiniu_connection()
    if success:
        print("\n🎉 七牛云连接测试成功!")
    else:
        print("\n❌ 七牛云连接测试失败!")
    # Propagate the outcome so shells and CI jobs can detect a failure.
    sys.exit(0 if success else 1)