Files
meijiaka-zy/python-api/app/services/sms_service.py
T
小鱼开发 51521fc0dd feat(payment): 微信支付 APIv2 + 积分充值 + SMS 短信 + 双 Token 认证
- 微信支付从 APIv3 降级为 APIv2(MD5/XML)
- 积分系统:充值下单、微信回调、消费冻结/结算/退款
- SMS B2M 短信验证码服务
- 双 Token 认证(Access 30min + Refresh 30days)
- SSE 单设备踢人
- 用户设备管理、积分账户模型
- Alembic 迁移脚本
2026-05-07 18:43:02 +08:00

245 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
B2M 短信服务封装
================
接入 B2M 短信平台(https://www.b2m.cn),提供单发短信能力。
安全接口特点:
- 请求体 AES 加密(AES/ECB/PKCS5Padding
- 可选 gzip 压缩
- appId 通过 HTTP Header 传输
- 响应体同样 AES 加密
使用示例:
from app.services.sms_service import get_sms_service
sms = get_sms_service()
result = await sms.send_single_sms(mobile="13800138000", content="您的验证码是 123456")
"""
from __future__ import annotations
import gzip
import json
import logging
import time
from typing import Any
import httpx
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from app.config import get_settings
logger = logging.getLogger(__name__)
class SMSError(Exception):
"""短信服务异常"""
def __init__(self, message: str, *, code: str | None = None):
super().__init__(message)
self.code = code
class B2MSMSService:
"""
B2M 短信服务
安全接口请求流程:
1. 参数拼装 JSON
2. UTF-8 编码 → byte 数组
3. 可选 gzip 压缩
4. AES 加密(ECB/PKCS5Padding
5. HTTP POST 发送
响应流程(逆向):
1. 从 Header 读取 result 状态码
2. 从 Body 读取加密 byte 数组
3. AES 解密
4. 可选 gzip 解压
5. UTF-8 解码 → JSON
"""
def __init__(self):
settings = get_settings()
self.app_id = settings.SMS_APP_ID
self.secret_key = settings.SMS_SECRET_KEY
self.base_url = settings.SMS_BASE_URL
self.extended_code = settings.SMS_EXTENDED_CODE
if not all([self.app_id, self.secret_key, self.base_url]):
raise SMSError("B2M 短信配置不完整:SMS_APP_ID / SMS_SECRET_KEY / SMS_BASE_URL 未配置")
# 预计算密钥 byte 数组,并校验长度(AES 要求 16/24/32 字节)
self._secret_key_bytes: bytes | None = None
if self.secret_key:
key_bytes = self.secret_key.encode("utf-8")
if len(key_bytes) not in (16, 24, 32):
raise SMSError(
f"AES 密钥长度必须是 16/24/32 字节,当前 {len(key_bytes)} 字节"
)
self._secret_key_bytes = key_bytes
if not all([self.app_id, self.secret_key, self.base_url]):
logger.warning("B2M 短信配置不完整,短信功能将不可用")
# ── AES 加解密 ──────────────────────────────────────
def _aes_encrypt(self, plaintext: bytes) -> bytes:
"""AES/ECB/PKCS5Padding 加密"""
if not self._secret_key_bytes:
raise SMSError("AES 密钥未加载")
# PKCS5Padding 在 128 位块大小下与 PKCS7 等价
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(plaintext) + padder.finalize()
# ECB 模式
cipher = Cipher(
algorithms.AES(self._secret_key_bytes),
modes.ECB(),
)
encryptor = cipher.encryptor()
return encryptor.update(padded_data) + encryptor.finalize()
def _aes_decrypt(self, ciphertext: bytes) -> bytes:
"""AES/ECB/PKCS5Padding 解密"""
if not self._secret_key_bytes:
raise SMSError("AES 密钥未加载")
cipher = Cipher(
algorithms.AES(self._secret_key_bytes),
modes.ECB(),
)
decryptor = cipher.decryptor()
padded_data = decryptor.update(ciphertext) + decryptor.finalize()
unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
return unpadder.update(padded_data) + unpadder.finalize()
# ── 核心发送方法 ────────────────────────────────────
async def send_single_sms(
self,
client: httpx.AsyncClient,
*,
mobile: str,
content: str,
timer_time: str | None = None,
custom_sms_id: str | None = None,
use_gzip: bool = False,
) -> dict[str, Any]:
"""
发送单条短信(安全接口)
:param mobile: 手机号
:param content: 短信内容(需包含签名,如【美家卡】您的验证码是...)
:param timer_time: 定时发送时间(格式:yyyy-MM-dd HH:mm:ss),不填为即时发送
:param custom_sms_id: 自定义消息 ID(最长 32 位)
:param use_gzip: 是否启用 gzip 压缩
:return: {"mobile": "...", "smsId": "...", "customSmsId": "..."}
"""
if not all([self.app_id, self.secret_key, self.base_url]):
raise SMSError("B2M 短信配置不完整,无法发送短信")
# 1. 拼装 JSON 参数
payload = {
"mobile": mobile,
"content": content,
"requestTime": int(time.time() * 1000),
"requestValidPeriod": 30,
}
if timer_time:
payload["timerTime"] = timer_time
if custom_sms_id:
payload["customSmsId"] = custom_sms_id
if self.extended_code:
payload["extendedCode"] = self.extended_code
json_str = json.dumps(payload, ensure_ascii=False)
data = json_str.encode("utf-8")
# 2. 可选 gzip 压缩
headers = {
"appId": self.app_id,
"Content-Type": "application/octet-stream",
}
if use_gzip:
data = gzip.compress(data)
headers["gzip"] = "on"
# 3. AES 加密
encrypted_data = self._aes_encrypt(data)
# 4. 发送请求
url = f"{self.base_url.rstrip('/')}/inter/sendSingleSMS"
logger.debug(f"[SMS] 发送短信到 {mobile}, url={url}")
response = await client.post(
url,
headers=headers,
content=encrypted_data,
timeout=httpx.Timeout(30.0, connect=10.0),
)
# 5. 处理响应
result_code = response.headers.get("result", "UNKNOWN")
if result_code != "SUCCESS":
logger.error(f"[SMS] B2M 接口返回错误: {result_code}")
raise SMSError(
message=f"短信发送失败: {result_code}",
code=result_code,
)
# 6. 解密响应
encrypted_response = response.content
decrypted_data = self._aes_decrypt(encrypted_response)
# 7. 可选 gzip 解压
if use_gzip:
decrypted_data = gzip.decompress(decrypted_data)
# 8. 解析 JSON
response_json = json.loads(decrypted_data.decode("utf-8"))
logger.info(f"[SMS] 短信发送成功: {mobile}, smsId={response_json.get('smsId')}")
return response_json
# ── 便捷方法:发送验证码 ─────────────────────────────
async def send_verification_code(
self,
client: httpx.AsyncClient,
*,
mobile: str,
code: str,
expire_minutes: int = 5,
) -> dict[str, Any]:
"""
发送验证码短信
:param mobile: 手机号
:param code: 验证码
:param expire_minutes: 有效期(分钟),用于短信文案
:return: 发送结果
"""
# 短信内容需包含签名,格式:【签名】正文
content = f"【美家卡】您的验证码是 {code}{expire_minutes}分钟内有效,请勿泄露。"
return await self.send_single_sms(
client,
mobile=mobile,
content=content,
custom_sms_id=f"verify_{mobile}_{int(time.time())}",
)
# ── 便捷函数 ──────────────────────────────────────────
def get_sms_service() -> B2MSMSService:
"""获取短信服务实例"""
return B2MSMSService()