Files
pgserver3.0/pgserver/push.php
2026-04-27 10:10:35 +08:00

174 lines
7.1 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
// 直接启动WebSocket服务不通过thinkphp容器
require __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
use PHPSocketIO\SocketIO;
// 加载环境变量第二个参数true表示解析sections
$env = parse_ini_file(__DIR__ . '/.env', true);
$push_config = isset($env['push']) ? $env['push'] : [];
$socket_port = isset($push_config['socket_port']) ? intval($push_config['socket_port']) : 22121;
$http_port = isset($push_config['http_port']) ? intval($push_config['http_port']) : 22120;
// 全局数组保存uid在线数据
$uidConnectionMap = [];
// 模拟推送的数值
$push_value = rand(100, 1000);
// PHPSocketIO服务
// 添加CORS支持配置
$sender_io = new SocketIO($socket_port, array(
'allowOrigin' => '*',
'allowMethods' => 'GET, POST, OPTIONS',
'allowHeaders' => 'Content-Type',
));
// 客户端发起连接事件
$sender_io->on('connection', function($socket) use (&$push_value){
// 客户端连接时立即发送当前数值
$socket->emit('push_value', ['value' => $push_value]);
// 连接时发送刷新事件,让客户端主动获取所有数量(避免重复查询逻辑)
error_log("WebSocket客户端连接发送4个刷新事件");
$socket->emit('updateUnreturnedCount', ['refresh' => true]);
$socket->emit('updateEstimatePendingCount', ['refresh' => true]);
$socket->emit('updateReportProduceCount', ['refresh' => true]);
$socket->emit('updateSurveyFollowCount', ['refresh' => true]);
$socket->on('login', function ($uid)use($socket){
global $uidConnectionMap;
if(isset($socket->uid)){
return;
}
$uid = (string)$uid;
if(!isset($uidConnectionMap[$uid])){
$uidConnectionMap[$uid] = 0;
}
++$uidConnectionMap[$uid];
$socket->join($uid);
$socket->uid = $uid;
// 登录时推送刷新事件,让客户端主动获取所有数量
$socket->emit('updateSurveyFollowCount', ['refresh' => true]);
$socket->emit('updateUnreturnedCount', ['refresh' => true]);
$socket->emit('updateEstimatePendingCount', ['refresh' => true]);
$socket->emit('updateReportProduceCount', ['refresh' => true]);
error_log("WebSocket登录时推送用户{$uid}的4个刷新事件");
});
// 处理客户端主动刷新请求
$socket->on('refreshAllCounts', function()use($socket){
error_log("收到客户端刷新请求");
// 发送刷新事件给当前客户端
$socket->emit('updateSurveyFollowCount', ['refresh' => true]);
$socket->emit('updateUnreturnedCount', ['refresh' => true]);
$socket->emit('updateEstimatePendingCount', ['refresh' => true]);
$socket->emit('updateReportProduceCount', ['refresh' => true]);
error_log("已向客户端发送刷新事件(包含报告数量)");
});
$socket->on('disconnect', function () use($socket) {
if(!isset($socket->uid)){
return;
}
global $uidConnectionMap;
if(--$uidConnectionMap[$socket->uid] <= 0){
unset($uidConnectionMap[$socket->uid]);
}
});
});
// 监听http端口用于推送和更新数值
$sender_io->on('workerStart', function()use ($sender_io, $http_port){
$inner_http_worker = new Worker("http://0.0.0.0:$http_port");
$inner_http_worker->onMessage = function($http_connection, $request_data)use ($sender_io){
global $uidConnectionMap, $push_value;
// 添加跨域头
$http_connection->header('Access-Control-Allow-Origin: *');
$http_connection->header('Access-Control-Allow-Methods: GET, POST, OPTIONS');
$http_connection->header('Access-Control-Allow-Headers: Content-Type');
// 处理OPTIONS预检请求
if(isset($request_data['server']['REQUEST_METHOD']) && $request_data['server']['REQUEST_METHOD'] === 'OPTIONS'){
return $http_connection->send('ok');
}
// Workerman HTTP Worker 的 $request_data 是一个数组
// 包含 get, post, cookie, server 等键
$msgContent = array();
// 解析 GET 参数
if(isset($request_data['get']) && is_array($request_data['get'])){
$msgContent = array_merge($msgContent, $request_data['get']);
}
// 解析 POST 参数
if(isset($request_data['post']) && is_array($request_data['post'])){
$msgContent = array_merge($msgContent, $request_data['post']);
}
// 调试:输出所有信息
$logData = array(
'msgContent' => $msgContent,
'get' => $request_data['get'] ?? [],
'post' => $request_data['post'] ?? []
);
error_log("WebSocket HTTP Request: " . json_encode($logData));
// 更新数值并推送给所有客户端
if(isset($msgContent['type']) && $msgContent['type'] == 'update_value'){
$push_value = isset($msgContent['value']) ? intval($msgContent['value']) : rand(100, 1000);
$sender_io->emit('push_value', ['value' => $push_value]);
return $http_connection->send('ok');
}
// 广播事件给所有客户端(通用推送)
if(isset($msgContent['type']) && $msgContent['type'] == 'broadcast'){
$event = isset($msgContent['event']) ? $msgContent['event'] : '';
$eventData = isset($msgContent['data']) ? $msgContent['data'] : '';
error_log("=== WebSocket广播调试 ===");
error_log("事件名称: '$event'");
error_log("原始数据: '$eventData'");
if(!empty($event)){
if(is_string($eventData)){
$decoded = json_decode($eventData, true);
$eventData = $decoded !== null ? $decoded : $eventData;
}
error_log("解码后数据: " . print_r($eventData, true));
// 获取所有连接的客户端并发送消息
// PHPSocketIO 1.x 使用 connections 数组获取所有连接
foreach ($sender_io->connections as $connection) {
$connection->emit($event, $eventData);
}
error_log("广播已发送到所有客户端 - 事件: $event");
return $http_connection->send('ok');
}
}
// 推送消息给指定用户或所有用户
if(isset($msgContent['type']) && $msgContent['type'] == 'publish'){
$to = isset($msgContent['to']) ? $msgContent['to'] : '';
$content = isset($msgContent['content']) ? $msgContent['content'] : '';
if($to){
$sender_io->to($to)->emit('newMsg', $content);
}else{
$sender_io->emit('newMsg', $content);
}
if($to && !isset($uidConnectionMap[$to])){
return $http_connection->send('offline');
}else{
return $http_connection->send('ok');
}
}
return $http_connection->send('fail');
};
$inner_http_worker->listen();
});
Worker::runAll();