on('connection', function($socket) use (&$push_value){
    // Send the current value to the client as soon as it connects.
    $socket->emit('push_value', ['value' => $push_value]);

    // On connect, also push the number of pg_inquiry rows whose
    // return_price_status is 2 (logged below as the "unreturned price" count).
    // NOTE(review): this opens a new, synchronous database connection for
    // every socket connection and uses hard-coded root credentials with an
    // empty password - consider moving the DSN/credentials to configuration.
    try {
        $dsn = 'mysql:host=localhost;dbname=pgserver;charset=utf8';
        // Request exceptions explicitly: without this, PHP versions before 8
        // return false from query() and the fetch() call below would raise an
        // Error that the catch clause does not handle.
        $pdo = new \PDO($dsn, 'root', '', [
            \PDO::ATTR_ERRMODE => \PDO::ERRMODE_EXCEPTION,
        ]);
        $stmt = $pdo->query('SELECT COUNT(*) as count FROM pg_inquiry WHERE return_price_status = 2');
        $result = $stmt->fetch(\PDO::FETCH_ASSOC);
        // Cast so the emitted payload is an integer regardless of whether the
        // driver returns the count as a string.
        $count = (int)($result['count'] ?? 0);

        $socket->emit('updateUnreturnedCount', ['count' => $count]);
        error_log("WebSocket连接时推送未回价数量: $count");
    } catch (\Exception $e) {
        // Best effort: a database failure must not break the connection.
        error_log("WebSocket连接时获取未回价数量失败: " . $e->getMessage());
    }

    // 'login': bind this socket to a uid, join the room named after the uid
    // and bump the per-uid connection counter.
    $socket->on('login', function ($uid) use ($socket) {
        global $uidConnectionMap;

        // A socket may only log in once.
        if (isset($socket->uid)) {
            return;
        }
        // Ignore payloads that cannot be used as a uid (arrays, objects,
        // null, empty string) instead of casting them to "Array" / "".
        if (!is_scalar($uid) || (string)$uid === '') {
            return;
        }

        $uid = (string)$uid;
        if (!isset($uidConnectionMap[$uid])) {
            $uidConnectionMap[$uid] = 0;
        }
        ++$uidConnectionMap[$uid];
        $socket->join($uid);
        $socket->uid = $uid;
    });

    // 'disconnect': decrement the uid's connection counter and drop the entry
    // once no connection for that uid remains.
    $socket->on('disconnect', function () use ($socket) {
        global $uidConnectionMap;

        if (!isset($socket->uid)) {
            return;
        }
        $uid = $socket->uid;
        // Guard against a missing entry so the decrement never touches an
        // undefined key.
        if (!isset($uidConnectionMap[$uid])) {
            return;
        }
        if (--$uidConnectionMap[$uid] <= 0) {
            unset($uidConnectionMap[$uid]);
        }
    });
});

// Listen on an HTTP port that is used to push messages and update the value.
$sender_io->on('workerStart', function () use ($sender_io) {
    // NOTE(review): the endpoint listens on all interfaces and answers with a
    // wildcard CORS origin; confirm it is not reachable from untrusted hosts.
    $inner_http_worker = new Worker('http://0.0.0.0:2121');

    /*
     * Handles one HTTP request. GET and POST parameters are merged (POST
     * wins) and dispatched on the 'type' parameter:
     *   update_value - store 'value' (or a random 100..1000) and push it to all clients
     *   broadcast    - emit 'event' with 'data' to all clients
     *   publish      - emit 'newMsg' with 'content' to room 'to', or to all clients
     * The response body is 'ok', 'offline' or 'fail'.
     */
    $inner_http_worker->onMessage = function ($http_connection, $request_data) use ($sender_io) {
        global $uidConnectionMap, $push_value;

        // CORS headers.
        // NOTE(review): confirm that the connection object exposes header()
        // in the installed Workerman version.
        $http_connection->header('Access-Control-Allow-Origin: *');
        $http_connection->header('Access-Control-Allow-Methods: GET, POST, OPTIONS');
        $http_connection->header('Access-Control-Allow-Headers: Content-Type');

        // Answer OPTIONS preflight requests right away.
        if (isset($request_data['server']['REQUEST_METHOD']) && $request_data['server']['REQUEST_METHOD'] === 'OPTIONS') {
            return $http_connection->send('ok');
        }

        // This code treats $request_data as an array with 'get', 'post',
        // 'cookie', 'server', ... keys. Merge GET then POST parameters.
        $msgContent = [];
        if (isset($request_data['get']) && is_array($request_data['get'])) {
            $msgContent = array_merge($msgContent, $request_data['get']);
        }
        if (isset($request_data['post']) && is_array($request_data['post'])) {
            $msgContent = array_merge($msgContent, $request_data['post']);
        }

        // Debug: log everything that was received.
        $logData = [
            'msgContent' => $msgContent,
            'get' => $request_data['get'] ?? [],
            'post' => $request_data['post'] ?? [],
        ];
        error_log("WebSocket HTTP Request: " . json_encode($logData));

        $type = $msgContent['type'] ?? null;

        // Update the value and push it to all clients.
        if ($type === 'update_value') {
            $push_value = isset($msgContent['value']) ? intval($msgContent['value']) : rand(100, 1000);
            $sender_io->emit('push_value', ['value' => $push_value]);
            return $http_connection->send('ok');
        }

        // Broadcast an arbitrary event to all clients (generic push).
        if ($type === 'broadcast') {
            // Only a string can be an event name; anything else is treated as missing.
            $event = (isset($msgContent['event']) && is_string($msgContent['event'])) ? $msgContent['event'] : '';
            $eventData = $msgContent['data'] ?? '';

            // 'data' may arrive as an array (data[key]=...); encode it for the
            // log rather than interpolating an array into a string.
            $eventDataForLog = is_string($eventData) ? $eventData : json_encode($eventData);
            error_log("Broadcast check - event: '$event', data: '$eventDataForLog'");

            if ($event !== '') {
                if (is_string($eventData)) {
                    // Decode JSON payloads; keep the raw string when it is not
                    // valid JSON (or decodes to null).
                    $eventData = json_decode($eventData, true) ?? $eventData;
                }
                $sender_io->emit($event, $eventData);
                error_log("Broadcast sent - event: $event, data: " . print_r($eventData, true));
                return $http_connection->send('ok');
            }
            // An empty event name falls through to the 'fail' response below.
        }

        // Push a message to one user (room named after the uid) or to everyone.
        if ($type === 'publish') {
            $to = $msgContent['to'] ?? '';
            $content = $msgContent['content'] ?? '';

            // A non-scalar target (e.g. to[]=...) cannot name a room; reject
            // it rather than accidentally broadcasting to every client.
            if (!is_scalar($to)) {
                return $http_connection->send('fail');
            }
            $to = (string)$to;

            // Compare against '' instead of relying on truthiness so that the
            // uid "0" is addressed individually and not broadcast to everyone.
            if ($to !== '') {
                $sender_io->to($to)->emit('newMsg', $content);
                return $http_connection->send(isset($uidConnectionMap[$to]) ? 'ok' : 'offline');
            }

            $sender_io->emit('newMsg', $content);
            return $http_connection->send('ok');
        }

        return $http_connection->send('fail');
    };

    $inner_http_worker->listen();
});

Worker::runAll();