250 lines
8.3 KiB
Python
250 lines
8.3 KiB
Python
"""
|
||
B2M 短信服务封装
|
||
================
|
||
|
||
接入 B2M 短信平台(https://www.b2m.cn),提供单发短信能力。
|
||
|
||
安全接口特点:
|
||
- 请求体 AES 加密(AES/ECB/PKCS5Padding)
|
||
- 可选 gzip 压缩
|
||
- appId 通过 HTTP Header 传输
|
||
- 响应体同样 AES 加密
|
||
|
||
使用示例:
|
||
from app.services.sms_service import get_sms_service
|
||
|
||
sms = get_sms_service()
|
||
result = await sms.send_single_sms(mobile="13800138000", content="您的验证码是 123456")
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import gzip
|
||
import json
|
||
import logging
|
||
import time
|
||
from typing import Any
|
||
|
||
import httpx
|
||
from cryptography.hazmat.primitives import padding
|
||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||
|
||
from app.config import get_settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class SMSError(Exception):
|
||
"""短信服务异常"""
|
||
|
||
def __init__(self, message: str, *, code: str | None = None):
|
||
super().__init__(message)
|
||
self.code = code
|
||
|
||
|
||
class B2MSMSService:
|
||
"""
|
||
B2M 短信服务
|
||
|
||
安全接口请求流程:
|
||
1. 参数拼装 JSON
|
||
2. UTF-8 编码 → byte 数组
|
||
3. 可选 gzip 压缩
|
||
4. AES 加密(ECB/PKCS5Padding)
|
||
5. HTTP POST 发送
|
||
|
||
响应流程(逆向):
|
||
1. 从 Header 读取 result 状态码
|
||
2. 从 Body 读取加密 byte 数组
|
||
3. AES 解密
|
||
4. 可选 gzip 解压
|
||
5. UTF-8 解码 → JSON
|
||
"""
|
||
|
||
def __init__(self):
|
||
settings = get_settings()
|
||
|
||
self.app_id = settings.SMS_APP_ID
|
||
self.secret_key = settings.SMS_SECRET_KEY
|
||
self.base_url = settings.SMS_BASE_URL
|
||
|
||
if not all([self.app_id, self.secret_key, self.base_url]):
|
||
raise SMSError("B2M 短信配置不完整:SMS_APP_ID / SMS_SECRET_KEY / SMS_BASE_URL 未配置")
|
||
|
||
# 预计算密钥 byte 数组,并校验长度(AES 要求 16/24/32 字节)
|
||
self._secret_key_bytes: bytes | None = None
|
||
if self.secret_key:
|
||
key_bytes = self.secret_key.encode("utf-8")
|
||
if len(key_bytes) not in (16, 24, 32):
|
||
raise SMSError(f"AES 密钥长度必须是 16/24/32 字节,当前 {len(key_bytes)} 字节")
|
||
self._secret_key_bytes = key_bytes
|
||
|
||
if not all([self.app_id, self.secret_key, self.base_url]):
|
||
logger.warning("B2M 短信配置不完整,短信功能将不可用")
|
||
|
||
# ── AES 加解密 ──────────────────────────────────────
|
||
|
||
def _aes_encrypt(self, plaintext: bytes) -> bytes:
|
||
"""AES/ECB/PKCS5Padding 加密"""
|
||
if not self._secret_key_bytes:
|
||
raise SMSError("AES 密钥未加载")
|
||
|
||
# PKCS5Padding 在 128 位块大小下与 PKCS7 等价
|
||
padder = padding.PKCS7(algorithms.AES.block_size).padder()
|
||
padded_data = padder.update(plaintext) + padder.finalize()
|
||
|
||
# ECB 模式
|
||
cipher = Cipher(
|
||
algorithms.AES(self._secret_key_bytes),
|
||
modes.ECB(), # nosec: B305 — B2M 短信平台协议强制使用 ECB 模式
|
||
)
|
||
encryptor = cipher.encryptor()
|
||
return encryptor.update(padded_data) + encryptor.finalize()
|
||
|
||
def _aes_decrypt(self, ciphertext: bytes) -> bytes:
|
||
"""AES/ECB/PKCS5Padding 解密"""
|
||
if not self._secret_key_bytes:
|
||
raise SMSError("AES 密钥未加载")
|
||
|
||
cipher = Cipher(
|
||
algorithms.AES(self._secret_key_bytes),
|
||
modes.ECB(), # nosec: B305 — B2M 短信平台协议强制使用 ECB 模式
|
||
)
|
||
decryptor = cipher.decryptor()
|
||
padded_data = decryptor.update(ciphertext) + decryptor.finalize()
|
||
|
||
unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
|
||
return unpadder.update(padded_data) + unpadder.finalize()
|
||
|
||
# ── 核心发送方法 ────────────────────────────────────
|
||
|
||
async def send_single_sms(
|
||
self,
|
||
client: httpx.AsyncClient,
|
||
*,
|
||
mobile: str,
|
||
content: str,
|
||
extended_code: str | None = None,
|
||
timer_time: str | None = None,
|
||
custom_sms_id: str | None = None,
|
||
use_gzip: bool = False,
|
||
) -> dict[str, Any]:
|
||
"""
|
||
发送单条短信(安全接口)
|
||
|
||
:param mobile: 手机号
|
||
:param content: 短信内容(需包含签名,如【美家卡】您的验证码是...)
|
||
:param extended_code: 扩展码(不同短信内容搭配不同扩展码,在调用方指定)
|
||
:param timer_time: 定时发送时间(格式:yyyy-MM-dd HH:mm:ss),不填为即时发送
|
||
:param custom_sms_id: 自定义消息 ID(最长 32 位)
|
||
:param use_gzip: 是否启用 gzip 压缩
|
||
:return: {"mobile": "...", "smsId": "...", "customSmsId": "..."}
|
||
"""
|
||
if not all([self.app_id, self.secret_key, self.base_url]):
|
||
raise SMSError("B2M 短信配置不完整,无法发送短信")
|
||
|
||
# 1. 拼装 JSON 参数
|
||
payload = {
|
||
"mobile": mobile,
|
||
"content": content,
|
||
"requestTime": int(time.time() * 1000),
|
||
"requestValidPeriod": 30,
|
||
}
|
||
if timer_time:
|
||
payload["timerTime"] = timer_time
|
||
if custom_sms_id:
|
||
payload["customSmsId"] = custom_sms_id
|
||
if extended_code:
|
||
payload["extendedCode"] = extended_code
|
||
|
||
json_str = json.dumps(payload, ensure_ascii=False)
|
||
data = json_str.encode("utf-8")
|
||
|
||
# 2. 可选 gzip 压缩
|
||
headers = {
|
||
"appId": self.app_id,
|
||
"Content-Type": "application/octet-stream",
|
||
}
|
||
if use_gzip:
|
||
data = gzip.compress(data)
|
||
headers["gzip"] = "on"
|
||
|
||
# 3. AES 加密
|
||
encrypted_data = self._aes_encrypt(data)
|
||
|
||
# 4. 发送请求
|
||
url = f"{self.base_url.rstrip('/')}/inter/sendSingleSMS"
|
||
logger.debug(f"[SMS] 发送短信到 {mobile}, url={url}")
|
||
|
||
response = await client.post(
|
||
url,
|
||
headers=headers,
|
||
content=encrypted_data,
|
||
timeout=httpx.Timeout(30.0, connect=10.0),
|
||
)
|
||
|
||
# 5. 处理响应
|
||
result_code = response.headers.get("result", "UNKNOWN")
|
||
|
||
if result_code != "SUCCESS":
|
||
logger.error(f"[SMS] B2M 接口返回错误: {result_code}")
|
||
raise SMSError(
|
||
message=f"短信发送失败: {result_code}",
|
||
code=result_code,
|
||
)
|
||
|
||
# 6. 解密响应
|
||
encrypted_response = response.content
|
||
decrypted_data = self._aes_decrypt(encrypted_response)
|
||
|
||
# 7. 可选 gzip 解压
|
||
if use_gzip:
|
||
decrypted_data = gzip.decompress(decrypted_data)
|
||
|
||
# 8. 解析 JSON
|
||
response_json = json.loads(decrypted_data.decode("utf-8"))
|
||
logger.info(f"[SMS] 短信发送成功: {mobile}, smsId={response_json.get('smsId')}")
|
||
|
||
return response_json
|
||
|
||
# ── 便捷方法:发送验证码 ─────────────────────────────
|
||
|
||
# ── 扩展码配置(按短信内容类型分配,不同内容搭配不同扩展码)──
|
||
# 验证码短信扩展码
|
||
EXT_CODE_VERIFICATION: str | None = "11"
|
||
|
||
async def send_verification_code(
|
||
self,
|
||
client: httpx.AsyncClient,
|
||
*,
|
||
mobile: str,
|
||
code: str,
|
||
expire_minutes: int = 5,
|
||
) -> dict[str, Any]:
|
||
"""
|
||
发送验证码短信
|
||
|
||
:param mobile: 手机号
|
||
:param code: 验证码
|
||
:param expire_minutes: 有效期(分钟),用于短信文案
|
||
:return: 发送结果
|
||
"""
|
||
# 短信内容需包含签名,格式:【签名】正文
|
||
content = f"【厦门美家卡科技】您的验证码是 {code},{expire_minutes}分钟内有效,请勿泄露。"
|
||
|
||
return await self.send_single_sms(
|
||
client,
|
||
mobile=mobile,
|
||
content=content,
|
||
extended_code=self.EXT_CODE_VERIFICATION,
|
||
custom_sms_id=f"verify_{mobile}_{int(time.time())}",
|
||
)
|
||
|
||
|
||
# ── 便捷函数 ──────────────────────────────────────────
|
||
|
||
|
||
def get_sms_service() -> B2MSMSService:
|
||
"""获取短信服务实例"""
|
||
return B2MSMSService()
|