18 KiB
18 KiB
七牛云对象存储 (Kodo) Python SDK 开发规范
概述
本文档规范美家卡智影项目中使用七牛云对象存储 (Kodo) Python SDK 的开发标准,涵盖文件上传、下载、管理和 CDN 操作等核心功能。
SDK 版本: v5.0.0+
Python 版本: 3.8+ (兼容 2.7 和 3.3+)
官方文档: https://developer.qiniu.com/kodo/1242/python
1. 安装与初始化
1.1 安装 SDK
pip install qiniu
1.2 初始化配置
from qiniu import Auth
# 从环境变量读取密钥(推荐)
import os
access_key = os.getenv('QINIU_ACCESS_KEY')
secret_key = os.getenv('QINIU_SECRET_KEY')
# 构建鉴权对象
q = Auth(access_key, secret_key)
环境变量配置 (.env 文件):
QINIU_ACCESS_KEY=your-access-key
QINIU_SECRET_KEY=your-secret-key
QINIU_BUCKET_NAME=your-bucket-name
QINIU_BUCKET_DOMAIN=your-domain.com
2. 文件上传
2.1 上传方式选择
| 场景 | 推荐方式 | 说明 |
|---|---|---|
| 小文件 (< 100MB) | 表单上传 (put_file) | 简单快速,一次请求完成 |
| 大文件 (> 100MB) | 分片上传 v2 (put_file_v2) | 支持断点续传,适应弱网环境 |
| 网络不稳定 | 分片上传 v2 | 自动重试,更可靠 |
2.2 服务端生成上传 Token
from qiniu import Auth
def generate_upload_token(
bucket_name: str,
key: str = None,
expires: int = 3600,
policy: dict = None
) -> str:
"""
生成上传凭证
Args:
bucket_name: 存储空间名称
key: 指定文件名(可选)
expires: Token 有效期(秒),默认 3600
policy: 上传策略配置(可选)
Returns:
上传 Token 字符串
"""
q = Auth(access_key, secret_key)
# 自定义上传策略(可选)
if policy is None:
policy = {}
token = q.upload_token(bucket_name, key, expires, policy)
return token
2.3 客户端直传(推荐)
服务端生成 Token,客户端直传到七牛云:
# 服务端 API
from fastapi import APIRouter
from pydantic import BaseModel
router = APIRouter(prefix="/qiniu", tags=["Qiniu"])
class UploadTokenRequest(BaseModel):
key: str # 文件名
expires: int = 3600 # Token 有效期
class UploadTokenResponse(BaseModel):
token: str
key: str
upload_url: str = "https://upload.qiniup.com"
@router.post("/upload-token", response_model=UploadTokenResponse)
async def get_upload_token(request: UploadTokenRequest):
"""获取上传凭证,客户端直传"""
token = generate_upload_token(
bucket_name=os.getenv('QINIU_BUCKET_NAME'),
key=request.key,
expires=request.expires
)
return UploadTokenResponse(token=token, key=request.key)
2.4 服务端上传文件(保留场景)
from qiniu import Auth, put_file_v2, etag
import qiniu.config
def upload_file(
local_file_path: str,
key: str,
bucket_name: str = None
) -> dict:
"""
服务端上传文件到七牛云
Args:
local_file_path: 本地文件路径
key: 存储的文件名(如 "audios/voice.mp3")
bucket_name: 存储空间名称
Returns:
{"key": str, "hash": str, "url": str}
"""
bucket_name = bucket_name or os.getenv('QINIU_BUCKET_NAME')
# 生成上传 Token
token = q.upload_token(bucket_name, key, 3600)
# 使用分片上传 v2(推荐)
ret, info = put_file_v2(
up_token=token,
key=key,
file_path=local_file_path,
version='v2' # 指定分片上传 v2 版本
)
if ret is None:
raise Exception(f"上传失败: {info}")
# 验证文件完整性
assert ret['key'] == key
assert ret['hash'] == etag(local_file_path)
# 构建访问 URL
domain = os.getenv('QINIU_BUCKET_DOMAIN')
url = f"https://{domain}/{key}"
return {
"key": ret['key'],
"hash": ret['hash'],
"url": url
}
2.5 上传策略 (PutPolicy)
常用策略配置:
# 1. 限制文件大小 (10MB ~ 100MB)
policy = {
"fsizeMin": 1024 * 1024 * 10, # 最小 10MB
"fsizeLimit": 1024 * 1024 * 100, # 最大 100MB
"mimeLimit": "audio/*;video/*" # 限制文件类型
}
# 2. 上传后回调业务服务器
policy = {
"callbackUrl": "https://your-api.com/callback",
"callbackBody": "key=$(key)&hash=$(etag)&fname=$(fname)&fsize=$(fsize)",
"callbackBodyType": "application/x-www-form-urlencoded"
}
# 3. 上传后转码(持久化处理)
import base64
fops = 'avthumb/mp4/s/640x360/vb/1.25m'
saveas_key = base64.urlsafe_b64encode(f'{bucket_name}:output.mp4'.encode()).decode()
policy = {
"persistentOps": f"{fops}|saveas/{saveas_key}",
"persistentPipeline": "transcoding", # 队列名称
"persistentNotifyUrl": "https://your-api.com/pfop/callback"
}
3. 文件下载
3.1 公有空间下载
公有空间文件可直接访问:
def get_public_url(key: str, domain: str = None) -> str:
"""获取公有空间文件 URL"""
domain = domain or os.getenv('QINIU_BUCKET_DOMAIN')
return f"https://{domain}/{key}"
3.2 私有空间下载(临时 URL)
import requests
def get_private_url(key: str, expires: int = 3600) -> str:
"""
生成私有空间文件的临时下载 URL
Args:
key: 文件 Key
expires: 链接有效期(秒)
Returns:
带签名的临时 URL
"""
domain = os.getenv('QINIU_BUCKET_DOMAIN')
base_url = f"https://{domain}/{key}"
# 生成私有下载链接
private_url = q.private_download_url(base_url, expires=expires)
return private_url
# 使用示例
def download_file(key: str, local_path: str):
"""下载私有空间文件到本地"""
private_url = get_private_url(key, expires=3600)
response = requests.get(private_url)
if response.status_code == 200:
with open(local_path, 'wb') as f:
f.write(response.content)
return True
return False
4. 文件管理 (BucketManager)
4.1 初始化管理器
from qiniu import Auth, BucketManager
q = Auth(access_key, secret_key)
bucket = BucketManager(q)
4.2 获取文件信息
def get_file_info(bucket_name: str, key: str) -> dict:
"""
获取文件元信息
Returns:
{
"fsize": 文件大小(字节),
"hash": 文件哈希,
"mimeType": MIME类型,
"putTime": 上传时间(100纳秒时间戳),
"type": 存储类型(0=标准,1=低频,2=归档,3=深度归档)
}
"""
ret, info = bucket.stat(bucket_name, key)
if ret is None:
raise Exception(f"获取文件信息失败: {info}")
return ret
4.3 列举文件列表
from typing import List, Optional
def list_files(
bucket_name: str,
prefix: str = None, # 前缀筛选
limit: int = 100, # 每页数量
marker: str = None # 分页标记
) -> dict:
"""
列举空间文件列表
Returns:
{
"items": [{"key": ..., "fsize": ..., ...}],
"marker": "分页标记",
"commonPrefixes": ["公共前缀列表"]
}
"""
ret, eof, info = bucket.list(
bucket_name,
prefix=prefix,
marker=marker,
limit=limit,
delimiter=None # 不指定分隔符
)
return {
"items": ret.get('items', []),
"marker": ret.get('marker'),
"eof": eof # 是否已列举完
}
# 遍历所有文件
def list_all_files(bucket_name: str, prefix: str = None) -> List[dict]:
"""遍历获取所有文件"""
files = []
marker = None
while True:
result = list_files(bucket_name, prefix, limit=1000, marker=marker)
files.extend(result['items'])
if result['eof'] or not result['marker']:
break
marker = result['marker']
return files
4.4 删除文件
def delete_file(bucket_name: str, key: str) -> bool:
"""删除单个文件"""
ret, info = bucket.delete(bucket_name, key)
return ret == {}
def delete_files_batch(bucket_name: str, keys: List[str]) -> dict:
"""批量删除文件"""
from qiniu import build_batch_delete
ops = build_batch_delete(bucket_name, keys)
ret, info = bucket.batch(ops)
return ret
4.5 复制和移动文件
def copy_file(
src_bucket: str,
src_key: str,
dest_bucket: str,
dest_key: str,
force: bool = True
) -> bool:
"""复制文件"""
ret, info = bucket.copy(
src_bucket, src_key,
dest_bucket, dest_key,
force=force # 强制覆盖
)
return ret is not None
def move_file(
src_bucket: str,
src_key: str,
dest_bucket: str,
dest_key: str,
force: bool = True
) -> bool:
"""移动/重命名文件"""
ret, info = bucket.move(
src_bucket, src_key,
dest_bucket, dest_key,
force=force
)
return ret is not None
4.6 修改文件元信息
def change_mime(bucket_name: str, key: str, mime_type: str):
"""修改文件 MIME 类型"""
ret, info = bucket.change_mime(bucket_name, key, mime_type)
return ret is not None
def change_type(bucket_name: str, key: str, file_type: int):
"""
修改文件存储类型
file_type:
0 = 标准存储
1 = 低频存储
2 = 归档存储
3 = 深度归档存储
"""
ret, info = bucket.change_type(bucket_name, key, file_type)
return ret is not None
4.7 批量操作
from qiniu import (
build_batch_stat,
build_batch_copy,
build_batch_move,
build_batch_rename,
build_batch_delete
)
def batch_stat(bucket_name: str, keys: List[str]) -> List[dict]:
"""批量查询文件信息"""
ops = build_batch_stat(bucket_name, keys)
ret, info = bucket.batch(ops)
return ret
def batch_rename(
bucket_name: str,
key_map: dict, # {"old_key": "new_key", ...}
force: bool = True
):
"""批量重命名"""
ops = build_batch_rename(bucket_name, key_map, force=force)
ret, info = bucket.batch(ops)
return ret
def batch_copy(
src_bucket: str,
key_map: dict, # {"src_key": "dest_key", ...}
dest_bucket: str = None,
force: bool = True
):
"""批量复制"""
dest_bucket = dest_bucket or src_bucket
ops = build_batch_copy(src_bucket, key_map, dest_bucket, force=force)
ret, info = bucket.batch(ops)
return ret
4.8 抓取网络资源
def fetch_remote_file(
remote_url: str,
key: str,
bucket_name: str = None
) -> dict:
"""
抓取远程文件到七牛云
Args:
remote_url: 远程文件 URL
key: 保存的文件名
bucket_name: 目标空间
Returns:
{"key": ..., "hash": ..., "fsize": ...}
"""
bucket_name = bucket_name or os.getenv('QINIU_BUCKET_NAME')
ret, info = bucket.fetch(remote_url, bucket_name, key)
return ret
5. CDN 操作
5.1 初始化 CDN Manager
from qiniu import CdnManager
cdn_manager = CdnManager(q)
5.2 刷新 CDN 缓存
def refresh_urls(urls: List[str]) -> dict:
"""刷新指定 URL 的 CDN 缓存"""
ret, info = cdn_manager.refresh_urls(urls)
return ret
def refresh_dirs(dirs: List[str]) -> dict:
"""刷新整个目录的 CDN 缓存"""
ret, info = cdn_manager.refresh_dirs(dirs)
return ret
5.3 预取资源
def prefetch_urls(urls: List[str]) -> dict:
"""预取资源到 CDN 节点"""
ret, info = cdn_manager.prefetch_urls(urls)
return ret
5.4 获取 CDN 日志
def get_cdn_log_list(domains: List[str], log_date: str) -> List[dict]:
"""
获取 CDN 日志下载链接
Args:
domains: 域名列表
log_date: 日期 (YYYY-MM-DD)
Returns:
[{"name": ..., "url": ..., "size": ..., "mtime": ...}]
"""
ret, info = cdn_manager.get_log_list_data(domains, log_date)
return ret.get('data', [])
6. 项目集成方案
6.1 服务端封装模块
# app/services/qiniu_service.py
"""
七牛云对象存储服务封装
"""
import os
from typing import List, Optional
from qiniu import Auth, BucketManager, CdnManager, put_file_v2, etag
class QiniuService:
"""七牛云服务封装"""
def __init__(self):
access_key = os.getenv('QINIU_ACCESS_KEY')
secret_key = os.getenv('QINIU_SECRET_KEY')
self.bucket_name = os.getenv('QINIU_BUCKET_NAME')
self.domain = os.getenv('QINIU_BUCKET_DOMAIN')
self.auth = Auth(access_key, secret_key)
self.bucket = BucketManager(self.auth)
self.cdn = CdnManager(self.auth)
def get_upload_token(self, key: str, expires: int = 3600, policy: dict = None) -> str:
"""生成上传 Token"""
return self.auth.upload_token(self.bucket_name, key, expires, policy)
def get_file_url(self, key: str, private: bool = False, expires: int = 3600) -> str:
"""获取文件访问 URL"""
base_url = f"https://{self.domain}/{key}"
if private:
return self.auth.private_download_url(base_url, expires)
return base_url
def upload_file(self, local_path: str, key: str) -> dict:
"""服务端上传文件"""
token = self.get_upload_token(key)
ret, info = put_file_v2(token, key, local_path, version='v2')
if ret is None:
raise Exception(f"上传失败: {info}")
return {
"key": ret['key'],
"hash": ret['hash'],
"url": self.get_file_url(key)
}
def delete_file(self, key: str) -> bool:
"""删除文件"""
ret, info = self.bucket.delete(self.bucket_name, key)
return ret == {}
def refresh_cdn(self, keys: List[str]) -> dict:
"""刷新 CDN 缓存"""
urls = [self.get_file_url(key) for key in keys]
return self.cdn.refresh_urls(urls)
# 全局单例
_qiniu_service: Optional[QiniuService] = None
def get_qiniu_service() -> QiniuService:
global _qiniu_service
if _qiniu_service is None:
_qiniu_service = QiniuService()
return _qiniu_service
6.2 FastAPI 路由集成
# app/api/v1/qiniu.py
from fastapi import APIRouter, UploadFile, File
from app.services.qiniu_service import get_qiniu_service
router = APIRouter(prefix="/qiniu", tags=["Qiniu"])
@router.post("/upload-token")
async def get_upload_token(key: str, expires: int = 3600):
"""获取客户端直传 Token"""
service = get_qiniu_service()
token = service.get_upload_token(key, expires)
return {"token": token, "key": key}
@router.post("/upload")
async def upload_file(file: UploadFile = File(...), key: str = None):
"""服务端上传文件(小文件场景)"""
import tempfile
import shutil
service = get_qiniu_service()
# 生成唯一文件名
if key is None:
import uuid
ext = file.filename.split('.')[-1] if '.' in file.filename else ''
key = f"uploads/{uuid.uuid4()}.{ext}" if ext else f"uploads/{uuid.uuid4()}"
# 保存临时文件
with tempfile.NamedTemporaryFile(delete=False) as tmp:
shutil.copyfileobj(file.file, tmp)
tmp_path = tmp.name
try:
result = service.upload_file(tmp_path, key)
return result
finally:
os.unlink(tmp_path)
@router.delete("/files/{key:path}")
async def delete_file(key: str):
"""删除文件"""
service = get_qiniu_service()
success = service.delete_file(key)
return {"success": success}
7. 最佳实践
7.1 文件名规范
def generate_key(file_type: str, user_id: str, filename: str) -> str:
"""
生成规范的文件存储路径
格式: {type}/{user_id}/{date}/{uuid}.{ext}
"""
import uuid
from datetime import datetime
ext = filename.split('.')[-1] if '.' in filename else 'bin'
date = datetime.now().strftime('%Y%m')
unique_id = str(uuid.uuid4())[:8]
return f"{file_type}/{user_id}/{date}/{unique_id}.{ext}"
# 使用示例
key = generate_key("voices", "user_123", "my-voice.mp3")
# 结果: voices/user_123/202501/a1b2c3d4.mp3
7.2 错误处理
from qiniu import AuthError, HTTPError
def handle_qiniu_error(func):
"""七牛云操作错误处理装饰器"""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except AuthError as e:
raise Exception(f"认证失败: {e}")
except HTTPError as e:
raise Exception(f"请求失败: {e}")
except Exception as e:
raise Exception(f"操作失败: {e}")
return wrapper
7.3 安全配置
- 密钥管理: 使用环境变量,禁止硬编码
- Token 有效期: 上传 Token 建议 1 小时,下载 Token 根据场景设置
- 上传策略: 限制文件大小和 MIME 类型
- 私有空间: 敏感文件使用私有空间 + 临时 URL
8. 常见问题
Q1: 上传失败,返回 401 错误?
A: 检查 AccessKey 和 SecretKey 是否正确,以及 Token 是否过期。
Q2: 如何支持大文件上传?
A: 使用分片上传 v2 (put_file_v2),SDK 会自动处理分片和断点续传。
Q3: 文件上传后如何获取访问 URL?
A: 公有空间直接拼接 https://{domain}/{key},私有空间使用 auth.private_download_url() 生成临时 URL。
Q4: 如何刷新 CDN 缓存?
A: 使用 CdnManager.refresh_urls() 或 refresh_dirs(),注意目录刷新有每日限额。
Q5: 上传回调不生效?
A: 确保 callbackUrl 是公网可访问的 HTTPS 地址,且返回 Content-Type: application/json。