Files
meijiaka-zy/docs/qiniu-kodo-python-sdk-guide.md

18 KiB
Raw Permalink Blame History

七牛云对象存储 (Kodo) Python SDK 开发规范

概述

本文档规范美家卡智影项目中使用七牛云对象存储 (Kodo) Python SDK 的开发标准,涵盖文件上传、下载、管理和 CDN 操作等核心功能。

SDK 版本: v5.0.0+
Python 版本: 3.8+ (兼容 2.7 和 3.3+)
官方文档: https://developer.qiniu.com/kodo/1242/python


1. 安装与初始化

1.1 安装 SDK

pip install qiniu

1.2 初始化配置

from qiniu import Auth

# 从环境变量读取密钥(推荐)
import os
access_key = os.getenv('QINIU_ACCESS_KEY')
secret_key = os.getenv('QINIU_SECRET_KEY')

# 构建鉴权对象
q = Auth(access_key, secret_key)

环境变量配置 (.env 文件):

QINIU_ACCESS_KEY=your-access-key
QINIU_SECRET_KEY=your-secret-key
QINIU_BUCKET_NAME=your-bucket-name
QINIU_BUCKET_DOMAIN=your-domain.com

2. 文件上传

2.1 上传方式选择

场景 推荐方式 说明
小文件 (< 100MB) 表单上传 (put_file) 简单快速,一次请求完成
大文件 (> 100MB) 分片上传 v2 (put_file_v2) 支持断点续传,适应弱网环境
网络不稳定 分片上传 v2 自动重试,更可靠

2.2 服务端生成上传 Token

from qiniu import Auth

def generate_upload_token(
    bucket_name: str,
    key: str = None,
    expires: int = 3600,
    policy: dict = None
) -> str:
    """
    生成上传凭证
    
    Args:
        bucket_name: 存储空间名称
        key: 指定文件名(可选)
        expires: Token 有效期(秒),默认 3600
        policy: 上传策略配置(可选)
    
    Returns:
        上传 Token 字符串
    """
    q = Auth(access_key, secret_key)
    
    # 自定义上传策略(可选)
    if policy is None:
        policy = {}
    
    token = q.upload_token(bucket_name, key, expires, policy)
    return token

2.3 客户端直传(推荐)

服务端生成 Token,客户端直传到七牛云:

# 服务端 API
from fastapi import APIRouter
from pydantic import BaseModel

router = APIRouter(prefix="/qiniu", tags=["Qiniu"])

class UploadTokenRequest(BaseModel):
    key: str           # 文件名
    expires: int = 3600  # Token 有效期

class UploadTokenResponse(BaseModel):
    token: str
    key: str
    upload_url: str = "https://upload.qiniup.com"

@router.post("/upload-token", response_model=UploadTokenResponse)
async def get_upload_token(request: UploadTokenRequest):
    """获取上传凭证,客户端直传"""
    token = generate_upload_token(
        bucket_name=os.getenv('QINIU_BUCKET_NAME'),
        key=request.key,
        expires=request.expires
    )
    return UploadTokenResponse(token=token, key=request.key)

2.4 服务端上传文件(保留场景)

from qiniu import Auth, put_file_v2, etag
import qiniu.config

def upload_file(
    local_file_path: str,
    key: str,
    bucket_name: str = None
) -> dict:
    """
    服务端上传文件到七牛云
    
    Args:
        local_file_path: 本地文件路径
        key: 存储的文件名(如 "audios/voice.mp3"
        bucket_name: 存储空间名称
    
    Returns:
        {"key": str, "hash": str, "url": str}
    """
    bucket_name = bucket_name or os.getenv('QINIU_BUCKET_NAME')
    
    # 生成上传 Token
    token = q.upload_token(bucket_name, key, 3600)
    
    # 使用分片上传 v2(推荐)
    ret, info = put_file_v2(
        up_token=token,
        key=key,
        file_path=local_file_path,
        version='v2'  # 指定分片上传 v2 版本
    )
    
    if ret is None:
        raise Exception(f"上传失败: {info}")
    
    # 验证文件完整性
    assert ret['key'] == key
    assert ret['hash'] == etag(local_file_path)
    
    # 构建访问 URL
    domain = os.getenv('QINIU_BUCKET_DOMAIN')
    url = f"https://{domain}/{key}"
    
    return {
        "key": ret['key'],
        "hash": ret['hash'],
        "url": url
    }

2.5 上传策略 (PutPolicy)

常用策略配置:

# 1. 限制文件大小 (10MB ~ 100MB)
policy = {
    "fsizeMin": 1024 * 1024 * 10,       # 最小 10MB
    "fsizeLimit": 1024 * 1024 * 100,    # 最大 100MB
    "mimeLimit": "audio/*;video/*"       # 限制文件类型
}

# 2. 上传后回调业务服务器
policy = {
    "callbackUrl": "https://your-api.com/callback",
    "callbackBody": "key=$(key)&hash=$(etag)&fname=$(fname)&fsize=$(fsize)",
    "callbackBodyType": "application/x-www-form-urlencoded"
}

# 3. 上传后转码(持久化处理)
import base64
fops = 'avthumb/mp4/s/640x360/vb/1.25m'
saveas_key = base64.urlsafe_b64encode(f'{bucket_name}:output.mp4'.encode()).decode()

policy = {
    "persistentOps": f"{fops}|saveas/{saveas_key}",
    "persistentPipeline": "transcoding",  # 队列名称
    "persistentNotifyUrl": "https://your-api.com/pfop/callback"
}

3. 文件下载

3.1 公有空间下载

公有空间文件可直接访问:

def get_public_url(key: str, domain: str = None) -> str:
    """获取公有空间文件 URL"""
    domain = domain or os.getenv('QINIU_BUCKET_DOMAIN')
    return f"https://{domain}/{key}"

3.2 私有空间下载(临时 URL

import requests

def get_private_url(key: str, expires: int = 3600) -> str:
    """
    生成私有空间文件的临时下载 URL
    
    Args:
        key: 文件 Key
        expires: 链接有效期(秒)
    
    Returns:
        带签名的临时 URL
    """
    domain = os.getenv('QINIU_BUCKET_DOMAIN')
    base_url = f"https://{domain}/{key}"
    
    # 生成私有下载链接
    private_url = q.private_download_url(base_url, expires=expires)
    return private_url

# 使用示例
def download_file(key: str, local_path: str):
    """下载私有空间文件到本地"""
    private_url = get_private_url(key, expires=3600)
    
    response = requests.get(private_url)
    if response.status_code == 200:
        with open(local_path, 'wb') as f:
            f.write(response.content)
        return True
    return False

4. 文件管理 (BucketManager)

4.1 初始化管理器

from qiniu import Auth, BucketManager

q = Auth(access_key, secret_key)
bucket = BucketManager(q)

4.2 获取文件信息

def get_file_info(bucket_name: str, key: str) -> dict:
    """
    获取文件元信息
    
    Returns:
        {
            "fsize": 文件大小(字节),
            "hash": 文件哈希,
            "mimeType": MIME类型,
            "putTime": 上传时间(100纳秒时间戳),
            "type": 存储类型(0=标准,1=低频,2=归档,3=深度归档)
        }
    """
    ret, info = bucket.stat(bucket_name, key)
    if ret is None:
        raise Exception(f"获取文件信息失败: {info}")
    return ret

4.3 列举文件列表

from typing import List, Optional

def list_files(
    bucket_name: str,
    prefix: str = None,      # 前缀筛选
    limit: int = 100,        # 每页数量
    marker: str = None       # 分页标记
) -> dict:
    """
    列举空间文件列表
    
    Returns:
        {
            "items": [{"key": ..., "fsize": ..., ...}],
            "marker": "分页标记",
            "commonPrefixes": ["公共前缀列表"]
        }
    """
    ret, eof, info = bucket.list(
        bucket_name,
        prefix=prefix,
        marker=marker,
        limit=limit,
        delimiter=None  # 不指定分隔符
    )
    
    return {
        "items": ret.get('items', []),
        "marker": ret.get('marker'),
        "eof": eof  # 是否已列举完
    }

# 遍历所有文件
def list_all_files(bucket_name: str, prefix: str = None) -> List[dict]:
    """遍历获取所有文件"""
    files = []
    marker = None
    
    while True:
        result = list_files(bucket_name, prefix, limit=1000, marker=marker)
        files.extend(result['items'])
        
        if result['eof'] or not result['marker']:
            break
        marker = result['marker']
    
    return files

4.4 删除文件

def delete_file(bucket_name: str, key: str) -> bool:
    """删除单个文件"""
    ret, info = bucket.delete(bucket_name, key)
    return ret == {}

def delete_files_batch(bucket_name: str, keys: List[str]) -> dict:
    """批量删除文件"""
    from qiniu import build_batch_delete
    
    ops = build_batch_delete(bucket_name, keys)
    ret, info = bucket.batch(ops)
    return ret

4.5 复制和移动文件

def copy_file(
    src_bucket: str,
    src_key: str,
    dest_bucket: str,
    dest_key: str,
    force: bool = True
) -> bool:
    """复制文件"""
    ret, info = bucket.copy(
        src_bucket, src_key,
        dest_bucket, dest_key,
        force=force  # 强制覆盖
    )
    return ret is not None

def move_file(
    src_bucket: str,
    src_key: str,
    dest_bucket: str,
    dest_key: str,
    force: bool = True
) -> bool:
    """移动/重命名文件"""
    ret, info = bucket.move(
        src_bucket, src_key,
        dest_bucket, dest_key,
        force=force
    )
    return ret is not None

4.6 修改文件元信息

def change_mime(bucket_name: str, key: str, mime_type: str):
    """修改文件 MIME 类型"""
    ret, info = bucket.change_mime(bucket_name, key, mime_type)
    return ret is not None

def change_type(bucket_name: str, key: str, file_type: int):
    """
    修改文件存储类型
    
    file_type:
        0 = 标准存储
        1 = 低频存储
        2 = 归档存储
        3 = 深度归档存储
    """
    ret, info = bucket.change_type(bucket_name, key, file_type)
    return ret is not None

4.7 批量操作

from qiniu import (
    build_batch_stat,
    build_batch_copy,
    build_batch_move,
    build_batch_rename,
    build_batch_delete
)

def batch_stat(bucket_name: str, keys: List[str]) -> List[dict]:
    """批量查询文件信息"""
    ops = build_batch_stat(bucket_name, keys)
    ret, info = bucket.batch(ops)
    return ret

def batch_rename(
    bucket_name: str,
    key_map: dict,  # {"old_key": "new_key", ...}
    force: bool = True
):
    """批量重命名"""
    ops = build_batch_rename(bucket_name, key_map, force=force)
    ret, info = bucket.batch(ops)
    return ret

def batch_copy(
    src_bucket: str,
    key_map: dict,  # {"src_key": "dest_key", ...}
    dest_bucket: str = None,
    force: bool = True
):
    """批量复制"""
    dest_bucket = dest_bucket or src_bucket
    ops = build_batch_copy(src_bucket, key_map, dest_bucket, force=force)
    ret, info = bucket.batch(ops)
    return ret

4.8 抓取网络资源

def fetch_remote_file(
    remote_url: str,
    key: str,
    bucket_name: str = None
) -> dict:
    """
    抓取远程文件到七牛云
    
    Args:
        remote_url: 远程文件 URL
        key: 保存的文件名
        bucket_name: 目标空间
    
    Returns:
        {"key": ..., "hash": ..., "fsize": ...}
    """
    bucket_name = bucket_name or os.getenv('QINIU_BUCKET_NAME')
    ret, info = bucket.fetch(remote_url, bucket_name, key)
    return ret

5. CDN 操作

5.1 初始化 CDN Manager

from qiniu import CdnManager

cdn_manager = CdnManager(q)

5.2 刷新 CDN 缓存

def refresh_urls(urls: List[str]) -> dict:
    """刷新指定 URL 的 CDN 缓存"""
    ret, info = cdn_manager.refresh_urls(urls)
    return ret

def refresh_dirs(dirs: List[str]) -> dict:
    """刷新整个目录的 CDN 缓存"""
    ret, info = cdn_manager.refresh_dirs(dirs)
    return ret

5.3 预取资源

def prefetch_urls(urls: List[str]) -> dict:
    """预取资源到 CDN 节点"""
    ret, info = cdn_manager.prefetch_urls(urls)
    return ret

5.4 获取 CDN 日志

def get_cdn_log_list(domains: List[str], log_date: str) -> List[dict]:
    """
    获取 CDN 日志下载链接
    
    Args:
        domains: 域名列表
        log_date: 日期 (YYYY-MM-DD)
    
    Returns:
        [{"name": ..., "url": ..., "size": ..., "mtime": ...}]
    """
    ret, info = cdn_manager.get_log_list_data(domains, log_date)
    return ret.get('data', [])

6. 项目集成方案

6.1 服务端封装模块

# app/services/qiniu_service.py
"""
七牛云对象存储服务封装
"""

import os
from typing import List, Optional
from qiniu import Auth, BucketManager, CdnManager, put_file_v2, etag

class QiniuService:
    """七牛云服务封装"""
    
    def __init__(self):
        access_key = os.getenv('QINIU_ACCESS_KEY')
        secret_key = os.getenv('QINIU_SECRET_KEY')
        self.bucket_name = os.getenv('QINIU_BUCKET_NAME')
        self.domain = os.getenv('QINIU_BUCKET_DOMAIN')
        
        self.auth = Auth(access_key, secret_key)
        self.bucket = BucketManager(self.auth)
        self.cdn = CdnManager(self.auth)
    
    def get_upload_token(self, key: str, expires: int = 3600, policy: dict = None) -> str:
        """生成上传 Token"""
        return self.auth.upload_token(self.bucket_name, key, expires, policy)
    
    def get_file_url(self, key: str, private: bool = False, expires: int = 3600) -> str:
        """获取文件访问 URL"""
        base_url = f"https://{self.domain}/{key}"
        if private:
            return self.auth.private_download_url(base_url, expires)
        return base_url
    
    def upload_file(self, local_path: str, key: str) -> dict:
        """服务端上传文件"""
        token = self.get_upload_token(key)
        ret, info = put_file_v2(token, key, local_path, version='v2')
        
        if ret is None:
            raise Exception(f"上传失败: {info}")
        
        return {
            "key": ret['key'],
            "hash": ret['hash'],
            "url": self.get_file_url(key)
        }
    
    def delete_file(self, key: str) -> bool:
        """删除文件"""
        ret, info = self.bucket.delete(self.bucket_name, key)
        return ret == {}
    
    def refresh_cdn(self, keys: List[str]) -> dict:
        """刷新 CDN 缓存"""
        urls = [self.get_file_url(key) for key in keys]
        return self.cdn.refresh_urls(urls)

# 全局单例
_qiniu_service: Optional[QiniuService] = None

def get_qiniu_service() -> QiniuService:
    global _qiniu_service
    if _qiniu_service is None:
        _qiniu_service = QiniuService()
    return _qiniu_service

6.2 FastAPI 路由集成

# app/api/v1/qiniu.py

from fastapi import APIRouter, UploadFile, File
from app.services.qiniu_service import get_qiniu_service

router = APIRouter(prefix="/qiniu", tags=["Qiniu"])

@router.post("/upload-token")
async def get_upload_token(key: str, expires: int = 3600):
    """获取客户端直传 Token"""
    service = get_qiniu_service()
    token = service.get_upload_token(key, expires)
    return {"token": token, "key": key}

@router.post("/upload")
async def upload_file(file: UploadFile = File(...), key: str = None):
    """服务端上传文件(小文件场景)"""
    import tempfile
    import shutil
    
    service = get_qiniu_service()
    
    # 生成唯一文件名
    if key is None:
        import uuid
        ext = file.filename.split('.')[-1] if '.' in file.filename else ''
        key = f"uploads/{uuid.uuid4()}.{ext}" if ext else f"uploads/{uuid.uuid4()}"
    
    # 保存临时文件
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        shutil.copyfileobj(file.file, tmp)
        tmp_path = tmp.name
    
    try:
        result = service.upload_file(tmp_path, key)
        return result
    finally:
        os.unlink(tmp_path)

@router.delete("/files/{key:path}")
async def delete_file(key: str):
    """删除文件"""
    service = get_qiniu_service()
    success = service.delete_file(key)
    return {"success": success}

7. 最佳实践

7.1 文件名规范

def generate_key(file_type: str, user_id: str, filename: str) -> str:
    """
    生成规范的文件存储路径
    
    格式: {type}/{user_id}/{date}/{uuid}.{ext}
    """
    import uuid
    from datetime import datetime
    
    ext = filename.split('.')[-1] if '.' in filename else 'bin'
    date = datetime.now().strftime('%Y%m')
    unique_id = str(uuid.uuid4())[:8]
    
    return f"{file_type}/{user_id}/{date}/{unique_id}.{ext}"

# 使用示例
key = generate_key("voices", "user_123", "my-voice.mp3")
# 结果: voices/user_123/202501/a1b2c3d4.mp3

7.2 错误处理

from qiniu import AuthError, HTTPError

def handle_qiniu_error(func):
    """七牛云操作错误处理装饰器"""
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except AuthError as e:
            raise Exception(f"认证失败: {e}")
        except HTTPError as e:
            raise Exception(f"请求失败: {e}")
        except Exception as e:
            raise Exception(f"操作失败: {e}")
    return wrapper

7.3 安全配置

  1. 密钥管理: 使用环境变量,禁止硬编码
  2. Token 有效期: 上传 Token 建议 1 小时,下载 Token 根据场景设置
  3. 上传策略: 限制文件大小和 MIME 类型
  4. 私有空间: 敏感文件使用私有空间 + 临时 URL

8. 常见问题

Q1: 上传失败,返回 401 错误?

A: 检查 AccessKey 和 SecretKey 是否正确,以及 Token 是否过期。

Q2: 如何支持大文件上传?

A: 使用分片上传 v2 (put_file_v2)SDK 会自动处理分片和断点续传。

Q3: 文件上传后如何获取访问 URL?

A: 公有空间直接拼接 https://{domain}/{key},私有空间使用 auth.private_download_url() 生成临时 URL。

Q4: 如何刷新 CDN 缓存?

A: 使用 CdnManager.refresh_urls()refresh_dirs(),注意目录刷新有每日限额。

Q5: 上传回调不生效?

A: 确保 callbackUrl 是公网可访问的 HTTPS 地址,且返回 Content-Type: application/json。


9. 参考资料