295 lines
9.1 KiB
PHP
295 lines
9.1 KiB
PHP
<?php
|
||
if (php_sapi_name() !== 'cli') {
|
||
die("Error: This script can only be run from the command line.\n");
|
||
}
|
||
// 设置错误显示
|
||
ini_set('display_errors', 1);
|
||
error_reporting(E_ALL);
|
||
|
||
// 定义路径常量
|
||
require_once 'index.php';
|
||
|
||
// 加载自定义控制器
|
||
$controller_path = APPPATH . 'core/HD_WS_Controller.php';
|
||
if (!file_exists($controller_path)) {
|
||
die("错误: 找不到 HD_WS_Controller\n");
|
||
}
|
||
|
||
require_once $controller_path;
|
||
|
||
class WebSocketServer
|
||
{
|
||
private $server;
|
||
private $ci;
|
||
private $db_timer_id;
|
||
|
||
public function __construct()
|
||
{
|
||
$this->server = new swoole_websocket_server("0.0.0.0", 9501);
|
||
|
||
$this->server->set([
|
||
'worker_num' => 2,
|
||
'daemonize' => 0,
|
||
'max_request' => 1000,
|
||
'heartbeat_check_interval' => 60,
|
||
'heartbeat_idle_time' => 600,
|
||
'log_file' => '/tmp/websocket.log'
|
||
]);
|
||
|
||
$this->bindEvents();
|
||
|
||
echo "[" . date('Y-m-d H:i:s') . "] WebSocket 服务器初始化完成\n";
|
||
}
|
||
|
||
private function bindEvents()
|
||
{
|
||
$this->server->on('start', [$this, 'onStart']);
|
||
$this->server->on('workerStart', [$this, 'onWorkerStart']);
|
||
$this->server->on('open', [$this, 'onOpen']);
|
||
$this->server->on('message', [$this, 'onMessage']);
|
||
$this->server->on('close', [$this, 'onClose']);
|
||
$this->server->on('workerStop', [$this, 'onWorkerStop']);
|
||
}
|
||
|
||
public function onStart($server)
|
||
{
|
||
echo "[" . date('Y-m-d H:i:s') . "] WebSocket 服务启动: ws://0.0.0.0:9501\n";
|
||
}
|
||
|
||
public function onWorkerStart($server, $worker_id)
|
||
{
|
||
echo "[" . date('Y-m-d H:i:s') . "] Worker进程 {$worker_id} 启动\n";
|
||
|
||
// 在每个worker中创建独立的CI实例
|
||
$this->ci = new HD_WS_Controller();
|
||
|
||
// 每个worker都设置定时器
|
||
$this->setupTimers();
|
||
}
|
||
|
||
private function setupTimers()
|
||
{
|
||
// 数据库心跳 - 每15秒执行一次(小于MySQL的wait_timeout)
|
||
$this->db_timer_id = swoole_timer_tick(15000, function () {
|
||
$this->databaseHeartbeat();
|
||
});
|
||
|
||
echo "[" . date('Y-m-d H:i:s') . "] 数据库心跳定时器已启动\n";
|
||
|
||
// 清理过期连接 - 每60秒执行一次
|
||
swoole_timer_tick(60000, function () {
|
||
$this->cleanupExpiredConnections();
|
||
});
|
||
}
|
||
|
||
/**
|
||
* 数据库心跳检测
|
||
*/
|
||
private function databaseHeartbeat()
|
||
{
|
||
try {
|
||
if (!$this->ci) {
|
||
echo "[" . date('Y-m-d H:i:s') . "] CI实例未初始化\n";
|
||
return;
|
||
}
|
||
|
||
$this->ci->safeDbQuery(function ($db) {
|
||
$db->query('SELECT 1');
|
||
});
|
||
|
||
echo "[" . date('Y-m-d H:i:s') . "] 数据库心跳检测成功\n";
|
||
|
||
} catch (Exception $e) {
|
||
echo "[" . date('Y-m-d H:i:s') . "] 数据库心跳检测失败: " . $e->getMessage() . "\n";
|
||
|
||
// 如果是连接错误,尝试重新初始化CI实例
|
||
if (strpos($e->getMessage(), 'MySQL server has gone away') !== false) {
|
||
echo "[" . date('Y-m-d H:i:s') . "] 尝试重新初始化数据库连接\n";
|
||
$this->ci = new WebSocket_CI_Controller();
|
||
}
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 清理过期连接
|
||
*/
|
||
private function cleanupExpiredConnections()
|
||
{
|
||
try {
|
||
$this->ci->safeDbQuery(function ($db) {
|
||
// 删除超过10分钟没有活动的连接
|
||
$expire_time = date('Y-m-d H:i:s', time() - 600);
|
||
$db->where('last_activity <', $expire_time);
|
||
$db->where('status', 1);
|
||
$deleted = $db->delete('lc_ws_conn');
|
||
|
||
if ($deleted > 0) {
|
||
echo "[" . date('Y-m-d H:i:s') . "] 清理了 {$deleted} 个过期连接\n";
|
||
}
|
||
});
|
||
} catch (Exception $e) {
|
||
echo "[" . date('Y-m-d H:i:s') . "] 清理过期连接错误: " . $e->getMessage() . "\n";
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 客户端连接
|
||
* @param $server
|
||
* @param $request
|
||
* @return void
|
||
*/
|
||
public function onOpen($server, $request)
|
||
{
|
||
$fd = $request->fd;
|
||
$params = $request->get;
|
||
$token = isset($params['token']) ? $params['token'] : '';
|
||
$platform = isset($params['platform']) ? $params['platform'] : 0;
|
||
$client_ip = $request->header['x-real-ip'] ?: 'unknown';
|
||
echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$fd} 连接,Token: {$token}, Platform: {$platform}\n";
|
||
try {
|
||
if (empty($token)) {
|
||
$server->push($fd, WsResponse::error(WsResponse::MST_TYPE_CONNECT, 'Token不能为空', WsResponse::CODE_ERROR));
|
||
$server->close($fd);
|
||
return;
|
||
}
|
||
/** @var MyResponse $res */
|
||
$validToken = new ValidToken();
|
||
$res = $validToken->validateToken($token, $platform);
|
||
|
||
if (!$res->isSuccess()) {
|
||
throw new Exception("token错误【" . $res->getMessage() . "】");
|
||
}
|
||
$user_id = $res->getData()['uid'] ?: 0;
|
||
// 记录连接到数据库
|
||
$connection_id = $this->ci->recordConnection($fd, $user_id, $platform, $token, $client_ip);
|
||
echo "[" . date('Y-m-d H:i:s') . "] 用户 {$user_id} 连接成功,FD: {$fd}\n";
|
||
// 发送连接成功消息
|
||
// $server->push($fd, WsResponse::success(WsResponse::MST_TYPE_CONNECT, '连接成功'));
|
||
} catch (Exception $e) {
|
||
echo "[" . date('Y-m-d H:i:s') . "] 连接处理错误: " . $e->getMessage() . "\n";
|
||
$server->push($fd, WsResponse::error(WsResponse::MST_TYPE_CONNECT, $e->getMessage()));
|
||
$server->close($fd);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 收到消息
|
||
* @param $server
|
||
* @param $frame
|
||
* @return void
|
||
*/
|
||
public function onMessage($server, $frame)
|
||
{
|
||
$fd = $frame->fd;
|
||
|
||
echo "[" . date('Y-m-d H:i:s') . "] 收到消息 from {$fd}: {$frame->data}\n";
|
||
try {
|
||
// 更新活动时间
|
||
$this->ci->updateActivity($fd);
|
||
$mes = new Message();
|
||
/** @var MyResponse $result */
|
||
$result = $mes->handleMessage($server, $fd, $frame->data);
|
||
if (!$result->isSuccess()) {
|
||
throw new Exception($result->getMessage());
|
||
}
|
||
echo "[" . date('Y-m-d H:i:s') . "] 处理消息成功\n";
|
||
} catch (Exception $e) {
|
||
echo "[" . date('Y-m-d H:i:s') . "] 处理消息错误: " . $e->getMessage() . "\n";
|
||
$server->push($fd, WsResponse::error(WsResponse::MST_TYPE_MSG,
|
||
'处理失败: ' . $e->getMessage(), WsResponse::CODE_ERROR, $frame->data));
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 处理消息
|
||
* @param $fd
|
||
* @param $data
|
||
* @return array
|
||
*/
|
||
private function handleMessage($fd, $data)
|
||
{
|
||
$type = $data['type'] ?: 'unknown';
|
||
|
||
switch ($type) {
|
||
case 'heartbeat':
|
||
return [
|
||
'code' => 0,
|
||
'type' => 'ping',
|
||
'msg' => 'pong'
|
||
];
|
||
case 'chat':
|
||
$content = $data['content'] ?: '';
|
||
return [
|
||
'code' => 0,
|
||
'msg' => '消息已发送',
|
||
'type' => 'chat_success',
|
||
'data' => ['content' => $content]
|
||
];
|
||
default:
|
||
return [
|
||
'code' => 400,
|
||
'msg' => '未知消息类型',
|
||
'type' => 'error'
|
||
];
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 客户端断开
|
||
* @param $server
|
||
* @param $fd
|
||
* @return void
|
||
*/
|
||
public function onClose($server, $fd)
|
||
{
|
||
echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$fd} 断开连接\n";
|
||
|
||
try {
|
||
$deleted = $this->ci->removeConnection($fd);
|
||
|
||
if ($deleted) {
|
||
echo "[" . date('Y-m-d H:i:s') . "] 成功删除客户端 {$fd} 的连接记录\n";
|
||
} else {
|
||
echo "[" . date('Y-m-d H:i:s') . "] 删除客户端 {$fd} 的连接记录失败或记录不存在\n";
|
||
}
|
||
|
||
} catch (Exception $e) {
|
||
echo "[" . date('Y-m-d H:i:s') . "] 处理断开连接时错误: " . $e->getMessage() . "\n";
|
||
}
|
||
}
|
||
|
||
/**
|
||
* Worker进程停止
|
||
* @param $server
|
||
* @param $worker_id
|
||
* @return void
|
||
*/
|
||
public function onWorkerStop($server, $worker_id)
|
||
{
|
||
echo "[" . date('Y-m-d H:i:s') . "] Worker进程 {$worker_id} 停止\n";
|
||
|
||
// 清理定时器
|
||
if (isset($this->db_timer_id)) {
|
||
swoole_timer_clear($this->db_timer_id);
|
||
}
|
||
|
||
// 关闭数据库连接
|
||
if ($this->ci) {
|
||
$this->ci->closeDbConnection();
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 启动
|
||
* @return void
|
||
*/
|
||
public function start()
|
||
{
|
||
echo "[" . date('Y-m-d H:i:s') . "] 启动 WebSocket 服务...\n";
|
||
$this->server->start();
|
||
}
|
||
}
|
||
|
||
// 启动服务器
|
||
$server = new WebSocketServer();
|
||
$server->start(); |