server = new swoole_websocket_server("0.0.0.0", 9501); $this->server->set([ 'worker_num' => 2, 'daemonize' => 0, 'max_request' => 1000, 'heartbeat_check_interval' => 60, 'heartbeat_idle_time' => 600, 'log_file' => '/tmp/websocket.log' ]); $this->bindEvents(); echo "[" . date('Y-m-d H:i:s') . "] WebSocket 服务器初始化完成\n"; } private function bindEvents() { $this->server->on('start', [$this, 'onStart']); $this->server->on('workerStart', [$this, 'onWorkerStart']); $this->server->on('open', [$this, 'onOpen']); $this->server->on('message', [$this, 'onMessage']); $this->server->on('close', [$this, 'onClose']); $this->server->on('workerStop', [$this, 'onWorkerStop']); } public function onStart($server) { echo "[" . date('Y-m-d H:i:s') . "] WebSocket 服务启动: ws://0.0.0.0:9501\n"; } public function onWorkerStart($server, $worker_id) { echo "[" . date('Y-m-d H:i:s') . "] Worker进程 {$worker_id} 启动\n"; // 在每个worker中创建独立的CI实例 $this->ci = new HD_WS_Controller(); // 每个worker都设置定时器 $this->setupTimers(); } private function setupTimers() { // 数据库心跳 - 每15秒执行一次(小于MySQL的wait_timeout) $this->db_timer_id = swoole_timer_tick(15000, function () { $this->databaseHeartbeat(); }); echo "[" . date('Y-m-d H:i:s') . "] 数据库心跳定时器已启动\n"; // 清理过期连接 - 每60秒执行一次 swoole_timer_tick(60000, function () { $this->cleanupExpiredConnections(); }); } /** * 数据库心跳检测 */ private function databaseHeartbeat() { try { if (!$this->ci) { echo "[" . date('Y-m-d H:i:s') . "] CI实例未初始化\n"; return; } $this->ci->safeDbQuery(function ($db) { $db->query('SELECT 1'); }); echo "[" . date('Y-m-d H:i:s') . "] 数据库心跳检测成功\n"; } catch (Exception $e) { echo "[" . date('Y-m-d H:i:s') . "] 数据库心跳检测失败: " . $e->getMessage() . "\n"; // 如果是连接错误,尝试重新初始化CI实例 if (strpos($e->getMessage(), 'MySQL server has gone away') !== false) { echo "[" . date('Y-m-d H:i:s') . "] 尝试重新初始化数据库连接\n"; $this->ci = new WebSocket_CI_Controller(); } } } /** * 清理过期连接 */ private function cleanupExpiredConnections() { try { $this->ci->safeDbQuery(function ($db) { // 删除超过10分钟没有活动的连接 $expire_time = date('Y-m-d H:i:s', time() - 600); $db->where('last_activity <', $expire_time); $db->where('status', 1); $deleted = $db->delete('lc_ws_conn'); if ($deleted > 0) { echo "[" . date('Y-m-d H:i:s') . "] 清理了 {$deleted} 个过期连接\n"; } }); } catch (Exception $e) { echo "[" . date('Y-m-d H:i:s') . "] 清理过期连接错误: " . $e->getMessage() . "\n"; } } /** * 客户端连接 * @param $server * @param $request * @return void */ public function onOpen($server, $request) { $fd = $request->fd; $params = $request->get; $token = isset($params['token']) ? $params['token'] : ''; $platform = isset($params['platform']) ? $params['platform'] : 0; $client_ip = $request->header['x-real-ip'] ?: 'unknown'; echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$fd} 连接,Token: {$token}, Platform: {$platform}\n"; try { if (empty($token)) { $server->push($fd, WsResponse::error(WsResponse::MST_TYPE_CONNECT, 'Token不能为空', WsResponse::CODE_ERROR)); $server->close($fd); return; } /** @var MyResponse $res */ $validToken = new ValidToken(); $res = $validToken->validateToken($token, $platform); if (!$res->isSuccess()) { throw new Exception("token错误【" . $res->getMessage() . "】"); } $user_id = $res->getData()['uid'] ?: 0; // 记录连接到数据库 $connection_id = $this->ci->recordConnection($fd, $user_id, $platform, $token, $client_ip); echo "[" . date('Y-m-d H:i:s') . "] 用户 {$user_id} 连接成功,FD: {$fd}\n"; // 发送连接成功消息 // $server->push($fd, WsResponse::success(WsResponse::MST_TYPE_CONNECT, '连接成功')); } catch (Exception $e) { echo "[" . date('Y-m-d H:i:s') . "] 连接处理错误: " . $e->getMessage() . "\n"; $server->push($fd, WsResponse::error(WsResponse::MST_TYPE_CONNECT, $e->getMessage())); $server->close($fd); } } /** * 收到消息 * @param $server * @param $frame * @return void */ public function onMessage($server, $frame) { $fd = $frame->fd; echo "[" . date('Y-m-d H:i:s') . "] 收到消息 from {$fd}: {$frame->data}\n"; try { // 更新活动时间 $this->ci->updateActivity($fd); $mes = new Message(); /** @var MyResponse $result */ $result = $mes->handleMessage($server, $fd, $frame->data); if (!$result->isSuccess()) { throw new Exception($result->getMessage()); } echo "[" . date('Y-m-d H:i:s') . "] 处理消息成功\n"; } catch (Exception $e) { echo "[" . date('Y-m-d H:i:s') . "] 处理消息错误: " . $e->getMessage() . "\n"; $server->push($fd, WsResponse::error(WsResponse::MST_TYPE_MSG, '处理失败: ' . $e->getMessage(), WsResponse::CODE_ERROR, $frame->data)); } } /** * 处理消息 * @param $fd * @param $data * @return array */ private function handleMessage($fd, $data) { $type = $data['type'] ?: 'unknown'; switch ($type) { case 'heartbeat': return [ 'code' => 0, 'type' => 'ping', 'msg' => 'pong' ]; case 'chat': $content = $data['content'] ?: ''; return [ 'code' => 0, 'msg' => '消息已发送', 'type' => 'chat_success', 'data' => ['content' => $content] ]; default: return [ 'code' => 400, 'msg' => '未知消息类型', 'type' => 'error' ]; } } /** * 客户端断开 * @param $server * @param $fd * @return void */ public function onClose($server, $fd) { echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$fd} 断开连接\n"; try { $deleted = $this->ci->removeConnection($fd); if ($deleted) { echo "[" . date('Y-m-d H:i:s') . "] 成功删除客户端 {$fd} 的连接记录\n"; } else { echo "[" . date('Y-m-d H:i:s') . "] 删除客户端 {$fd} 的连接记录失败或记录不存在\n"; } } catch (Exception $e) { echo "[" . date('Y-m-d H:i:s') . "] 处理断开连接时错误: " . $e->getMessage() . "\n"; } } /** * Worker进程停止 * @param $server * @param $worker_id * @return void */ public function onWorkerStop($server, $worker_id) { echo "[" . date('Y-m-d H:i:s') . "] Worker进程 {$worker_id} 停止\n"; // 清理定时器 if (isset($this->db_timer_id)) { swoole_timer_clear($this->db_timer_id); } // 关闭数据库连接 if ($this->ci) { $this->ci->closeDbConnection(); } } /** * 启动 * @return void */ public function start() { echo "[" . date('Y-m-d H:i:s') . "] 启动 WebSocket 服务...\n"; $this->server->start(); } } // 启动服务器 $server = new WebSocketServer(); $server->start();