Files
meijiaka-zy/python-api/app/services/sms_service.py
T

251 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
B2M 短信服务封装
================
接入 B2M 短信平台(https://www.b2m.cn),提供单发短信能力。
安全接口特点:
- 请求体 AES 加密(AES/ECB/PKCS5Padding
- 可选 gzip 压缩
- appId 通过 HTTP Header 传输
- 响应体同样 AES 加密
使用示例:
from app.services.sms_service import get_sms_service
sms = get_sms_service()
result = await sms.send_single_sms(mobile="13800138000", content="您的验证码是 123456")
"""
from __future__ import annotations
import gzip
import json
import logging
import time
from typing import Any
import httpx
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from app.config import get_settings
logger = logging.getLogger(__name__)
class SMSError(Exception):
"""短信服务异常"""
def __init__(self, message: str, *, code: str | None = None):
super().__init__(message)
self.code = code
class B2MSMSService:
"""
B2M 短信服务
安全接口请求流程:
1. 参数拼装 JSON
2. UTF-8 编码 → byte 数组
3. 可选 gzip 压缩
4. AES 加密(ECB/PKCS5Padding
5. HTTP POST 发送
响应流程(逆向):
1. 从 Header 读取 result 状态码
2. 从 Body 读取加密 byte 数组
3. AES 解密
4. 可选 gzip 解压
5. UTF-8 解码 → JSON
"""
def __init__(self):
settings = get_settings()
self.app_id = settings.SMS_APP_ID
self.secret_key = settings.SMS_SECRET_KEY
self.base_url = settings.SMS_BASE_URL
if not all([self.app_id, self.secret_key, self.base_url]):
raise SMSError("B2M 短信配置不完整:SMS_APP_ID / SMS_SECRET_KEY / SMS_BASE_URL 未配置")
# 预计算密钥 byte 数组,并校验长度(AES 要求 16/24/32 字节)
self._secret_key_bytes: bytes | None = None
if self.secret_key:
key_bytes = self.secret_key.encode("utf-8")
if len(key_bytes) not in (16, 24, 32):
raise SMSError(
f"AES 密钥长度必须是 16/24/32 字节,当前 {len(key_bytes)} 字节"
)
self._secret_key_bytes = key_bytes
if not all([self.app_id, self.secret_key, self.base_url]):
logger.warning("B2M 短信配置不完整,短信功能将不可用")
# ── AES 加解密 ──────────────────────────────────────
def _aes_encrypt(self, plaintext: bytes) -> bytes:
"""AES/ECB/PKCS5Padding 加密"""
if not self._secret_key_bytes:
raise SMSError("AES 密钥未加载")
# PKCS5Padding 在 128 位块大小下与 PKCS7 等价
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(plaintext) + padder.finalize()
# ECB 模式
cipher = Cipher(
algorithms.AES(self._secret_key_bytes),
modes.ECB(),
)
encryptor = cipher.encryptor()
return encryptor.update(padded_data) + encryptor.finalize()
def _aes_decrypt(self, ciphertext: bytes) -> bytes:
"""AES/ECB/PKCS5Padding 解密"""
if not self._secret_key_bytes:
raise SMSError("AES 密钥未加载")
cipher = Cipher(
algorithms.AES(self._secret_key_bytes),
modes.ECB(),
)
decryptor = cipher.decryptor()
padded_data = decryptor.update(ciphertext) + decryptor.finalize()
unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
return unpadder.update(padded_data) + unpadder.finalize()
# ── 核心发送方法 ────────────────────────────────────
async def send_single_sms(
self,
client: httpx.AsyncClient,
*,
mobile: str,
content: str,
extended_code: str | None = None,
timer_time: str | None = None,
custom_sms_id: str | None = None,
use_gzip: bool = False,
) -> dict[str, Any]:
"""
发送单条短信(安全接口)
:param mobile: 手机号
:param content: 短信内容(需包含签名,如【美家卡】您的验证码是...)
:param extended_code: 扩展码(不同短信内容搭配不同扩展码,在调用方指定)
:param timer_time: 定时发送时间(格式:yyyy-MM-dd HH:mm:ss),不填为即时发送
:param custom_sms_id: 自定义消息 ID(最长 32 位)
:param use_gzip: 是否启用 gzip 压缩
:return: {"mobile": "...", "smsId": "...", "customSmsId": "..."}
"""
if not all([self.app_id, self.secret_key, self.base_url]):
raise SMSError("B2M 短信配置不完整,无法发送短信")
# 1. 拼装 JSON 参数
payload = {
"mobile": mobile,
"content": content,
"requestTime": int(time.time() * 1000),
"requestValidPeriod": 30,
}
if timer_time:
payload["timerTime"] = timer_time
if custom_sms_id:
payload["customSmsId"] = custom_sms_id
if extended_code:
payload["extendedCode"] = extended_code
json_str = json.dumps(payload, ensure_ascii=False)
data = json_str.encode("utf-8")
# 2. 可选 gzip 压缩
headers = {
"appId": self.app_id,
"Content-Type": "application/octet-stream",
}
if use_gzip:
data = gzip.compress(data)
headers["gzip"] = "on"
# 3. AES 加密
encrypted_data = self._aes_encrypt(data)
# 4. 发送请求
url = f"{self.base_url.rstrip('/')}/inter/sendSingleSMS"
logger.debug(f"[SMS] 发送短信到 {mobile}, url={url}")
response = await client.post(
url,
headers=headers,
content=encrypted_data,
timeout=httpx.Timeout(30.0, connect=10.0),
)
# 5. 处理响应
result_code = response.headers.get("result", "UNKNOWN")
if result_code != "SUCCESS":
logger.error(f"[SMS] B2M 接口返回错误: {result_code}")
raise SMSError(
message=f"短信发送失败: {result_code}",
code=result_code,
)
# 6. 解密响应
encrypted_response = response.content
decrypted_data = self._aes_decrypt(encrypted_response)
# 7. 可选 gzip 解压
if use_gzip:
decrypted_data = gzip.decompress(decrypted_data)
# 8. 解析 JSON
response_json = json.loads(decrypted_data.decode("utf-8"))
logger.info(f"[SMS] 短信发送成功: {mobile}, smsId={response_json.get('smsId')}")
return response_json
# ── 便捷方法:发送验证码 ─────────────────────────────
# ── 扩展码配置(按短信内容类型分配,不同内容搭配不同扩展码)──
# 验证码短信扩展码
EXT_CODE_VERIFICATION: str | None = "11"
async def send_verification_code(
self,
client: httpx.AsyncClient,
*,
mobile: str,
code: str,
expire_minutes: int = 5,
) -> dict[str, Any]:
"""
发送验证码短信
:param mobile: 手机号
:param code: 验证码
:param expire_minutes: 有效期(分钟),用于短信文案
:return: 发送结果
"""
# 短信内容需包含签名,格式:【签名】正文
content = f"【厦门美家卡科技】您的验证码是 {code}{expire_minutes}分钟内有效,请勿泄露。"
return await self.send_single_sms(
client,
mobile=mobile,
content=content,
extended_code=self.EXT_CODE_VERIFICATION,
custom_sms_id=f"verify_{mobile}_{int(time.time())}",
)
# ── 便捷函数 ──────────────────────────────────────────
def get_sms_service() -> B2MSMSService:
"""获取短信服务实例"""
return B2MSMSService()