Files
spacestation/www/api/websocket.php
T
2025-11-29 10:40:00 +08:00

295 lines
9.1 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
// This script starts a long-running server process, so refuse to run under any web SAPI.
if (php_sapi_name() !== 'cli') {
    die("Error: This script can only be run from the command line.\n");
}

// Report and display every error.
ini_set('display_errors', 1);
error_reporting(E_ALL);

// Load index.php; APPPATH (used below) is expected to be defined once it has run.
require_once 'index.php';

// Load the custom controller used by the WebSocket server.
$controller_path = APPPATH . 'core/HD_WS_Controller.php';
if (!file_exists($controller_path)) {
    // die()/exit() with a string message terminates with exit status 0, which makes a
    // failed start look like success to the caller. Print the message, then exit non-zero.
    echo "错误: 找不到 HD_WS_Controller\n";
    exit(1);
}
require_once $controller_path;
/**
 * Swoole WebSocket server wrapper.
 *
 * Configures a swoole_websocket_server on 0.0.0.0:9501, binds the lifecycle and
 * message callbacks to methods of this class, and uses an HD_WS_Controller
 * instance (created in each worker) for all database-related work.
 */
class WebSocketServer
{
    /** @var swoole_websocket_server Underlying Swoole server instance. */
    private $server;

    /** @var HD_WS_Controller|null Controller instance, created in onWorkerStart(). */
    private $ci;

    /** @var int|null Id of the database heartbeat timer. */
    private $db_timer_id;

    /** @var int|null Id of the expired-connection cleanup timer. */
    private $cleanup_timer_id;

    /**
     * Create and configure the Swoole server and register the event callbacks.
     */
    public function __construct()
    {
        $this->server = new swoole_websocket_server("0.0.0.0", 9501);
        $this->server->set([
            'worker_num' => 2,
            'daemonize' => 0,
            'max_request' => 1000,
            'heartbeat_check_interval' => 60,
            'heartbeat_idle_time' => 600,
            'log_file' => '/tmp/websocket.log'
        ]);
        $this->bindEvents();
        $this->log("WebSocket 服务器初始化完成");
    }

    /**
     * Write one timestamped line to standard output.
     *
     * @param string $message Text to print after the "[Y-m-d H:i:s] " prefix.
     * @return void
     */
    private function log($message)
    {
        echo "[" . date('Y-m-d H:i:s') . "] " . $message . "\n";
    }

    /**
     * Register this object's methods as the Swoole event callbacks.
     *
     * @return void
     */
    private function bindEvents()
    {
        $this->server->on('start', [$this, 'onStart']);
        $this->server->on('workerStart', [$this, 'onWorkerStart']);
        $this->server->on('open', [$this, 'onOpen']);
        $this->server->on('message', [$this, 'onMessage']);
        $this->server->on('close', [$this, 'onClose']);
        $this->server->on('workerStop', [$this, 'onWorkerStop']);
    }

    /**
     * 'start' callback: log the listening address.
     *
     * @param $server
     * @return void
     */
    public function onStart($server)
    {
        $this->log("WebSocket 服务启动: ws://0.0.0.0:9501");
    }

    /**
     * 'workerStart' callback: create this worker's controller and start its timers.
     *
     * @param $server
     * @param $worker_id
     * @return void
     */
    public function onWorkerStart($server, $worker_id)
    {
        $this->log("Worker进程 {$worker_id} 启动");
        // Each worker gets its own controller instance.
        $this->ci = new HD_WS_Controller();
        // Each worker sets up its own timers.
        $this->setupTimers();
    }

    /**
     * Start the periodic database heartbeat and expired-connection cleanup timers.
     *
     * Both timer ids are stored so that onWorkerStop() can clear them.
     *
     * @return void
     */
    private function setupTimers()
    {
        // Database heartbeat every 15 seconds (intended to stay below MySQL's wait_timeout).
        $this->db_timer_id = swoole_timer_tick(15000, function () {
            $this->databaseHeartbeat();
        });
        $this->log("数据库心跳定时器已启动");
        // Expired-connection cleanup every 60 seconds.
        $this->cleanup_timer_id = swoole_timer_tick(60000, function () {
            $this->cleanupExpiredConnections();
        });
    }

    /**
     * Database heartbeat: run a trivial query to keep the connection in use.
     *
     * If the query fails with a "MySQL server has gone away" message, the
     * controller is re-created.
     *
     * @return void
     */
    private function databaseHeartbeat()
    {
        try {
            if (!$this->ci) {
                $this->log("CI实例未初始化");
                return;
            }
            $this->ci->safeDbQuery(function ($db) {
                $db->query('SELECT 1');
            });
            $this->log("数据库心跳检测成功");
        } catch (Exception $e) {
            $this->log("数据库心跳检测失败: " . $e->getMessage());
            // On a lost connection, rebuild the controller. Use the same class that
            // onWorkerStart() instantiates; it is the only controller this file loads.
            if (strpos($e->getMessage(), 'MySQL server has gone away') !== false) {
                $this->log("尝试重新初始化数据库连接");
                $this->ci = new HD_WS_Controller();
            }
        }
    }

    /**
     * Delete connection rows from lc_ws_conn that have status 1 and whose
     * last_activity is more than 10 minutes old.
     *
     * @return void
     */
    private function cleanupExpiredConnections()
    {
        try {
            $this->ci->safeDbQuery(function ($db) {
                // Rows with no activity for more than 10 minutes are considered expired.
                $expire_time = date('Y-m-d H:i:s', time() - 600);
                $db->where('last_activity <', $expire_time);
                $db->where('status', 1);
                $deleted = $db->delete('lc_ws_conn');
                // NOTE(review): $db looks like a CodeIgniter query builder, whose delete()
                // reports success as a boolean rather than a row count - confirm. When
                // that is the case, ask the driver for the real number of deleted rows.
                if ($deleted === true && method_exists($db, 'affected_rows')) {
                    $deleted = $db->affected_rows();
                }
                if ($deleted > 0) {
                    $this->log("清理了 {$deleted} 个过期连接");
                }
            });
        } catch (Exception $e) {
            $this->log("清理过期连接错误: " . $e->getMessage());
        }
    }

    /**
     * 'open' callback: validate the client's token and record the connection.
     *
     * Reads "token" and "platform" from the query string and the client IP from
     * the x-real-ip header. On an empty or invalid token an error frame is pushed
     * and the connection is closed.
     *
     * @param $server
     * @param $request
     * @return void
     */
    public function onOpen($server, $request)
    {
        $fd = $request->fd;
        $params = $request->get;
        $token = isset($params['token']) ? $params['token'] : '';
        $platform = isset($params['platform']) ? $params['platform'] : 0;
        // The header may be absent; !empty() keeps the original "falsy => 'unknown'"
        // rule without raising an undefined-index warning.
        $client_ip = !empty($request->header['x-real-ip']) ? $request->header['x-real-ip'] : 'unknown';
        // NOTE(review): the raw token is written to the log here - consider masking it.
        $this->log("客户端 {$fd} 连接,Token: {$token}, Platform: {$platform}");
        try {
            if (empty($token)) {
                $server->push($fd, WsResponse::error(WsResponse::MST_TYPE_CONNECT, 'Token不能为空', WsResponse::CODE_ERROR));
                $server->close($fd);
                return;
            }
            $validToken = new ValidToken();
            /** @var MyResponse $res */
            $res = $validToken->validateToken($token, $platform);
            if (!$res->isSuccess()) {
                throw new Exception("token错误【" . $res->getMessage() . "】");
            }
            // Fall back to user id 0 when the validated payload carries no usable uid.
            $payload = $res->getData();
            $user_id = !empty($payload['uid']) ? $payload['uid'] : 0;
            // Record the connection in the database.
            $this->ci->recordConnection($fd, $user_id, $platform, $token, $client_ip);
            $this->log("用户 {$user_id} 连接成功,FD: {$fd}");
        } catch (Exception $e) {
            $this->log("连接处理错误: " . $e->getMessage());
            $server->push($fd, WsResponse::error(WsResponse::MST_TYPE_CONNECT, $e->getMessage()));
            $server->close($fd);
        }
    }

    /**
     * 'message' callback: refresh the connection's activity time and hand the
     * frame payload to Message::handleMessage(); push an error frame on failure.
     *
     * @param $server
     * @param $frame
     * @return void
     */
    public function onMessage($server, $frame)
    {
        $fd = $frame->fd;
        $this->log("收到消息 from {$fd}: {$frame->data}");
        try {
            // Update the last-activity time for this connection.
            $this->ci->updateActivity($fd);
            $mes = new Message();
            /** @var MyResponse $result */
            $result = $mes->handleMessage($server, $fd, $frame->data);
            if (!$result->isSuccess()) {
                throw new Exception($result->getMessage());
            }
            $this->log("处理消息成功");
        } catch (Exception $e) {
            $this->log("处理消息错误: " . $e->getMessage());
            $server->push($fd, WsResponse::error(WsResponse::MST_TYPE_MSG,
                '处理失败: ' . $e->getMessage(), WsResponse::CODE_ERROR, $frame->data));
        }
    }

    /**
     * Build a reply array for a decoded message based on its "type" key.
     *
     * Not called from within this class; onMessage() delegates to the Message
     * class instead.
     *
     * @param $fd
     * @param $data array Decoded message; "type" and "content" keys are optional.
     * @return array Reply with "code", "msg" and "type" keys (plus "data" for chat).
     */
    private function handleMessage($fd, $data)
    {
        // A missing or falsy type is treated as 'unknown' without raising a warning.
        $type = !empty($data['type']) ? $data['type'] : 'unknown';
        switch ($type) {
            case 'heartbeat':
                return [
                    'code' => 0,
                    'type' => 'ping',
                    'msg' => 'pong'
                ];
            case 'chat':
                $content = !empty($data['content']) ? $data['content'] : '';
                return [
                    'code' => 0,
                    'msg' => '消息已发送',
                    'type' => 'chat_success',
                    'data' => ['content' => $content]
                ];
            default:
                return [
                    'code' => 400,
                    'msg' => '未知消息类型',
                    'type' => 'error'
                ];
        }
    }

    /**
     * 'close' callback: remove the connection record for the closed client.
     *
     * @param $server
     * @param $fd
     * @return void
     */
    public function onClose($server, $fd)
    {
        $this->log("客户端 {$fd} 断开连接");
        try {
            $deleted = $this->ci->removeConnection($fd);
            if ($deleted) {
                $this->log("成功删除客户端 {$fd} 的连接记录");
            } else {
                $this->log("删除客户端 {$fd} 的连接记录失败或记录不存在");
            }
        } catch (Exception $e) {
            $this->log("处理断开连接时错误: " . $e->getMessage());
        }
    }

    /**
     * 'workerStop' callback: clear both timers and close the database connection.
     *
     * @param $server
     * @param $worker_id
     * @return void
     */
    public function onWorkerStop($server, $worker_id)
    {
        $this->log("Worker进程 {$worker_id} 停止");
        // Clear the timers started in setupTimers().
        if (isset($this->db_timer_id)) {
            swoole_timer_clear($this->db_timer_id);
            $this->db_timer_id = null;
        }
        if (isset($this->cleanup_timer_id)) {
            swoole_timer_clear($this->cleanup_timer_id);
            $this->cleanup_timer_id = null;
        }
        // Close the database connection.
        if ($this->ci) {
            $this->ci->closeDbConnection();
        }
    }

    /**
     * Start the underlying Swoole server.
     *
     * @return void
     */
    public function start()
    {
        $this->log("启动 WebSocket 服务...");
        $this->server->start();
    }
}
// Start the server: the constructor configures the Swoole server and binds the
// event callbacks; start() then hands control to the Swoole server's start().
$server = new WebSocketServer();
$server->start();