82 lines
2.8 KiB
Python
82 lines
2.8 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import sys
|
|
from qiniu import Auth, put_file_v2, BucketManager
|
|
|
|
# Add app path
|
|
sys.path.append(os.path.join(os.path.dirname(__file__), 'app'))
|
|
|
|
from app.core.config import QINIU_ACCESS_KEY, QINIU_SECRET_KEY, QINIU_BUCKET, QINIU_DOMAIN
|
|
|
|
def test_qiniu_connection():
|
|
print("=== 七牛云连接测试 ===")
|
|
print(f"Access Key: {QINIU_ACCESS_KEY[:10]}...")
|
|
print(f"Secret Key: {QINIU_SECRET_KEY[:10]}...")
|
|
print(f"Bucket: {QINIU_BUCKET}")
|
|
print(f"Domain: {QINIU_DOMAIN}")
|
|
|
|
# 创建认证对象
|
|
q = Auth(QINIU_ACCESS_KEY, QINIU_SECRET_KEY)
|
|
|
|
# 测试1: 生成上传token
|
|
try:
|
|
key = "test/connection-test.txt"
|
|
token = q.upload_token(QINIU_BUCKET, key, 3600)
|
|
print(f"✓ Token生成成功: {token[:50]}...")
|
|
except Exception as e:
|
|
print(f"✗ Token生成失败: {e}")
|
|
return False
|
|
|
|
# 测试2: 列举存储空间 (测试认证是否正确)
|
|
try:
|
|
bucket_manager = BucketManager(q)
|
|
ret, eof, info = bucket_manager.list(QINIU_BUCKET, limit=100)
|
|
print(f"✓ Bucket访问成功, status_code: {info.status_code}")
|
|
if ret:
|
|
print(f" 存储空间中有文件: {len(ret.get('items', []))} 个")
|
|
except Exception as e:
|
|
print(f"✗ Bucket访问失败: {e}")
|
|
return False
|
|
|
|
# 测试3: 上传一个小文件
|
|
test_file = "/Users/jiliu/工作/projects/imeeting/backend/uploads/result.json"
|
|
if os.path.exists(test_file):
|
|
try:
|
|
key = "test/result1.json"
|
|
token = q.upload_token(QINIU_BUCKET, key, 3600)
|
|
ret, info = put_file_v2(token, key, test_file, version='v2')
|
|
|
|
print(f"上传结果:")
|
|
print(f" ret: {ret}")
|
|
print(f" status_code: {info.status_code}")
|
|
print(f" text_body: {info.text_body}")
|
|
print(f" url: {info.url}")
|
|
print(f" req_id: {info.req_id}")
|
|
print(f" x_log: {info.x_log}")
|
|
|
|
if info.status_code == 200:
|
|
print("✓ 文件上传成功")
|
|
url = f"http://{QINIU_DOMAIN}/{key}"
|
|
print(f" 访问URL: {url}")
|
|
return True
|
|
else:
|
|
print(f"✗ 文件上传失败: {info.status_code}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"✗ 文件上传异常: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return False
|
|
else:
|
|
print(f"✗ 测试文件不存在: {test_file}")
|
|
return False
|
|
|
|
if __name__ == "__main__":
|
|
success = test_qiniu_connection()
|
|
if success:
|
|
print("\n🎉 七牛云连接测试成功!")
|
|
else:
|
|
print("\n❌ 七牛云连接测试失败!") |