Files
pgserver3.0/pgserver/push.php
2026-04-23 18:25:35 +08:00

145 lines
5.5 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
// 直接启动WebSocket服务不通过thinkphp容器
require __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
use PHPSocketIO\SocketIO;
// 全局数组保存uid在线数据
$uidConnectionMap = [];
// 模拟推送的数值
$push_value = rand(100, 1000);
// PHPSocketIO服务
$sender_io = new SocketIO(2120);
// 客户端发起连接事件
$sender_io->on('connection', function($socket) use (&$push_value){
// 客户端连接时立即发送当前数值
$socket->emit('push_value', ['value' => $push_value]);
// 连接时自动推送最新的未回价数量
try {
// 使用PDO连接数据库查询最新数量
$dsn = 'mysql:host=localhost;dbname=pgserver;charset=utf8';
$pdo = new PDO($dsn, 'root', '');
$stmt = $pdo->query('SELECT COUNT(*) as count FROM pg_inquiry WHERE return_price_status = 2');
$result = $stmt->fetch(PDO::FETCH_ASSOC);
$count = $result['count'] ?? 0;
// 推送未回价数量
$socket->emit('updateUnreturnedCount', ['count' => $count]);
error_log("WebSocket连接时推送未回价数量: $count");
} catch (Exception $e) {
error_log("WebSocket连接时获取未回价数量失败: " . $e->getMessage());
}
$socket->on('login', function ($uid)use($socket){
global $uidConnectionMap;
if(isset($socket->uid)){
return;
}
$uid = (string)$uid;
if(!isset($uidConnectionMap[$uid])){
$uidConnectionMap[$uid] = 0;
}
++$uidConnectionMap[$uid];
$socket->join($uid);
$socket->uid = $uid;
});
$socket->on('disconnect', function () use($socket) {
if(!isset($socket->uid)){
return;
}
global $uidConnectionMap;
if(--$uidConnectionMap[$socket->uid] <= 0){
unset($uidConnectionMap[$socket->uid]);
}
});
});
// 监听http端口用于推送和更新数值
$sender_io->on('workerStart', function()use ($sender_io){
$inner_http_worker = new Worker('http://0.0.0.0:2121');
$inner_http_worker->onMessage = function($http_connection, $request_data)use ($sender_io){
global $uidConnectionMap, $push_value;
// 添加跨域头
$http_connection->header('Access-Control-Allow-Origin: *');
$http_connection->header('Access-Control-Allow-Methods: GET, POST, OPTIONS');
$http_connection->header('Access-Control-Allow-Headers: Content-Type');
// 处理OPTIONS预检请求
if(isset($request_data['server']['REQUEST_METHOD']) && $request_data['server']['REQUEST_METHOD'] === 'OPTIONS'){
return $http_connection->send('ok');
}
// Workerman HTTP Worker 的 $request_data 是一个数组
// 包含 get, post, cookie, server 等键
$msgContent = array();
// 解析 GET 参数
if(isset($request_data['get']) && is_array($request_data['get'])){
$msgContent = array_merge($msgContent, $request_data['get']);
}
// 解析 POST 参数
if(isset($request_data['post']) && is_array($request_data['post'])){
$msgContent = array_merge($msgContent, $request_data['post']);
}
// 调试:输出所有信息
$logData = array(
'msgContent' => $msgContent,
'get' => $request_data['get'] ?? [],
'post' => $request_data['post'] ?? []
);
error_log("WebSocket HTTP Request: " . json_encode($logData));
// 更新数值并推送给所有客户端
if(isset($msgContent['type']) && $msgContent['type'] == 'update_value'){
$push_value = isset($msgContent['value']) ? intval($msgContent['value']) : rand(100, 1000);
$sender_io->emit('push_value', ['value' => $push_value]);
return $http_connection->send('ok');
}
// 广播事件给所有客户端(通用推送)
if(isset($msgContent['type']) && $msgContent['type'] == 'broadcast'){
$event = isset($msgContent['event']) ? $msgContent['event'] : '';
$eventData = isset($msgContent['data']) ? $msgContent['data'] : '';
error_log("Broadcast check - event: '$event', data: '$eventData'");
if(!empty($event)){
if(is_string($eventData)){
$decoded = json_decode($eventData, true);
$eventData = $decoded !== null ? $decoded : $eventData;
}
$sender_io->emit($event, $eventData);
error_log("Broadcast sent - event: $event, data: " . print_r($eventData, true));
return $http_connection->send('ok');
}
}
// 推送消息给指定用户或所有用户
if(isset($msgContent['type']) && $msgContent['type'] == 'publish'){
$to = isset($msgContent['to']) ? $msgContent['to'] : '';
$content = isset($msgContent['content']) ? $msgContent['content'] : '';
if($to){
$sender_io->to($to)->emit('newMsg', $content);
}else{
$sender_io->emit('newMsg', $content);
}
if($to && !isset($uidConnectionMap[$to])){
return $http_connection->send('offline');
}else{
return $http_connection->send('ok');
}
}
return $http_connection->send('fail');
};
$inner_http_worker->listen();
});
Worker::runAll();