first commit

This commit is contained in:
annnj-company
2026-04-17 18:29:53 +08:00
parent e49fa5a215
commit 130c1026c4
5615 changed files with 1639145 additions and 0 deletions

View File

@@ -0,0 +1,124 @@
<?php
namespace PHPSocketIO;
class ChannelAdapter extends DefaultAdapter
{
protected $_channelId = null;
public static $ip = '127.0.0.1';
public static $port = 2206;
public function __construct($nsp)
{
parent::__construct($nsp);
$this->_channelId = (function_exists('random_int') ? random_int(1, 10000000): rand(1, 10000000)) . "-" . (function_exists('posix_getpid') ? posix_getpid(): 1);
\Channel\Client::connect(self::$ip, self::$port);
\Channel\Client::$onMessage = array($this, 'onChannelMessage');
\Channel\Client::subscribe("socket.io#/#");
Debug::debug('ChannelAdapter __construct');
}
public function __destruct()
{
Debug::debug('ChannelAdapter __destruct');
}
public function add($id ,$room)
{
$this->sids[$id][$room] = true;
$this->rooms[$room][$id] = true;
$channel = "socket.io#/#$room#";
\Channel\Client::subscribe($channel);
}
public function del($id, $room)
{
unset($this->sids[$id][$room]);
unset($this->rooms[$room][$id]);
if(!empty($this->rooms[$room]))
{
unset($this->rooms[$room]);
$channel = "socket.io#/#$room#";
\Channel\Client::unsubscribe($channel);
}
}
public function delAll($id)
{
$rooms = isset($this->sids[$id]) ? $this->sids[$id] : array();
if($rooms)
{
foreach($rooms as $room)
{
if(isset($this->rooms[$room][$id]))
{
unset($this->rooms[$room][$id]);
$channel = "socket.io#/#$room#";
\Channel\Client::unsubscribe($channel);
}
}
}
if(empty($this->rooms[$room]))
{
unset($this->rooms[$room]);
}
unset($this->sids[$id]);
}
public function onChannelMessage($channel, $msg)
{
if($this->_channelId === array_shift($msg))
{
//echo "ignore same channel_id \n";
return;
}
$packet = $msg[0];
$opts = $msg[1];
if(!$packet)
{
echo "invalid channel:$channel packet \n";
return;
}
if(empty($packet['nsp']))
{
$packet['nsp'] = '/';
}
if($packet['nsp'] != $this->nsp->name)
{
echo "ignore different namespace {$packet['nsp']} != {$this->nsp->name}\n";
return;
}
$this->broadcast($packet, $opts, true);
}
public function broadcast($packet, $opts, $remote = false)
{
parent::broadcast($packet, $opts);
if (!$remote)
{
$packet['nsp'] = '/';
if(!empty($opts['rooms']))
{
foreach($opts['rooms'] as $room)
{
$chn = "socket.io#/#$room#";
$msg = array($this->_channelId, $packet, $opts);
\Channel\Client::publish($chn, $msg);
}
}
else
{
$chn = "socket.io#/#";
$msg = array($this->_channelId, $packet, $opts);
\Channel\Client::publish($chn, $msg);
}
}
}
}

View File

@@ -0,0 +1,260 @@
<?php
namespace PHPSocketIO;
use PHPSocketIO\Parser\Parser;
class Client
{
public $server = null;
public $conn = null;
public $encoder = null;
public $decoder = null;
public $id = null;
public $request = null;
public $nsps = array();
public $connectBuffer = array();
public function __construct($server, $conn)
{
$this->server = $server;
$this->conn = $conn;
$this->encoder = new \PHPSocketIO\Parser\Encoder();
$this->decoder = new \PHPSocketIO\Parser\Decoder();
$this->id = $conn->id;
$this->request = $conn->request;
$this->setup();
Debug::debug('Client __construct');
}
public function __destruct()
{
Debug::debug('Client __destruct');
}
/**
* Sets up event listeners.
*
* @api private
*/
public function setup(){
$this->decoder->on('decoded', array($this,'ondecoded'));
$this->conn->on('data', array($this,'ondata'));
$this->conn->on('error', array($this, 'onerror'));
$this->conn->on('close' ,array($this, 'onclose'));
}
/**
* Connects a client to a namespace.
*
* @param {String} namespace name
* @api private
*/
public function connect($name){
if (!isset($this->server->nsps[$name]))
{
$this->packet(array('type'=> Parser::ERROR, 'nsp'=> $name, 'data'=> 'Invalid namespace'));
return;
}
$nsp = $this->server->of($name);
if ('/' !== $name && !isset($this->nsps['/']))
{
$this->connectBuffer[$name] = $name;
return;
}
$nsp->add($this, $nsp, array($this, 'nspAdd'));
}
public function nspAdd($socket, $nsp)
{
$this->sockets[$socket->id] = $socket;
$this->nsps[$nsp->name] = $socket;
if ('/' === $nsp->name && $this->connectBuffer)
{
foreach($this->connectBuffer as $name)
{
$this->connect($name);
}
$this->connectBuffer = array();
}
}
/**
* Disconnects from all namespaces and closes transport.
*
* @api private
*/
public function disconnect()
{
foreach($this->sockets as $socket)
{
$socket->disconnect();
}
$this->sockets = array();
$this->close();
}
/**
* Removes a socket. Called by each `Socket`.
*
* @api private
*/
public function remove($socket)
{
if(isset($this->sockets[$socket->id]))
{
$nsp = $this->sockets[$socket->id]->nsp->name;
unset($this->sockets[$socket->id]);
unset($this->nsps[$nsp]);
} else {
//echo('ignoring remove for '. $socket->id);
}
}
/**
* Closes the underlying connection.
*
* @api private
*/
public function close()
{
if (empty($this->conn)) return;
if('open' === $this->conn->readyState)
{
//echo('forcing transport close');
$this->conn->close();
$this->onclose('forced server close');
}
}
/**
* Writes a packet to the transport.
*
* @param {Object} packet object
* @param {Object} options
* @api private
*/
public function packet($packet, $preEncoded = false, $volatile = false)
{
if('open' === $this->conn->readyState)
{
if (!$preEncoded)
{
// not broadcasting, need to encode
$encodedPackets = $this->encoder->encode($packet);
$this->writeToEngine($encodedPackets, $volatile);
} else { // a broadcast pre-encodes a packet
$this->writeToEngine($packet);
}
} else {
// todo check
// echo('ignoring packet write ' . var_export($packet, true));
}
}
public function writeToEngine($encodedPackets, $volatile = false)
{
if($volatile)echo new \Exception('volatile');
if ($volatile && !$this->conn->transport->writable) return;
// todo check
if(isset($encodedPackets['nsp']))unset($encodedPackets['nsp']);
foreach($encodedPackets as $packet)
{
$this->conn->write($packet);
}
}
/**
* Called with incoming transport data.
*
* @api private
*/
public function ondata($data)
{
try {
// todo chek '2["chat message","2"]' . "\0" . ''
$this->decoder->add(trim($data));
} catch(\Exception $e) {
$this->onerror($e);
}
}
/**
* Called when parser fully decodes a packet.
*
* @api private
*/
public function ondecoded($packet)
{
if(Parser::CONNECT === $packet['type'])
{
$this->connect($packet->nsp);
} else {
if(isset($this->nsps[$packet['nsp']]))
{
$this->nsps[$packet['nsp']]->onpacket($packet);
} else {
//echo('no socket for namespace ' . $packet['nsp']);
}
}
}
/**
* Handles an error.
*
* @param {Objcet} error object
* @api private
*/
public function onerror($err)
{
foreach($this->sockets as $socket)
{
$socket->onerror($err);
}
$this->onclose('client error');
}
/**
* Called upon transport close.
*
* @param {String} reason
* @api private
*/
public function onclose($reason)
{
if (empty($this->conn)) return;
// ignore a potential subsequent `close` event
$this->destroy();
// `nsps` and `sockets` are cleaned up seamlessly
foreach($this->sockets as $socket)
{
$socket->onclose($reason);
}
$this->sockets = null;
}
/**
* Cleans up event listeners.
*
* @api private
*/
public function destroy()
{
if (!$this->conn) return;
$this->conn->removeAllListeners();
$this->decoder->removeAllListeners();
$this->encoder->removeAllListeners();
$this->server = $this->conn = $this->encoder = $this->decoder = $this->request = $this->nsps = null;
}
}

View File

@@ -0,0 +1,12 @@
<?php
namespace PHPSocketIO;
class Debug
{
public static function debug($var)
{
global $debug;
if($debug)
echo var_export($var, true)."\n";
}
}

View File

@@ -0,0 +1,96 @@
<?php
namespace PHPSocketIO;
class DefaultAdapter
{
public $nsp = null;
public $rooms = array();
public $sids = array();
public $encoder = null;
public function __construct($nsp)
{
$this->nsp = $nsp;
$this->encoder = new Parser\Encoder();
Debug::debug('DefaultAdapter __construct');
}
public function __destruct()
{
Debug::debug('DefaultAdapter __destruct');
}
public function add($id, $room)
{
$this->sids[$id][$room] = true;
$this->rooms[$room][$id] = true;
}
public function del($id, $room)
{
unset($this->sids[$id][$room]);
unset($this->rooms[$room][$id]);
if(empty($this->rooms[$room]))
{
unset($this->rooms[$room]);
}
}
public function delAll($id)
{
$rooms = array_keys(isset($this->sids[$id]) ? $this->sids[$id] : array());
foreach($rooms as $room)
{
$this->del($id, $room);
}
unset($this->sids[$id]);
}
public function broadcast($packet, $opts, $remote = false)
{
$rooms = isset($opts['rooms']) ? $opts['rooms'] : array();
$except = isset($opts['except']) ? $opts['except'] : array();
$flags = isset($opts['flags']) ? $opts['flags'] : array();
$packetOpts = array(
'preEncoded' => true,
'volatile' => isset($flags['volatile']) ? $flags['volatile'] : null,
'compress' => isset($flags['compress']) ? $flags['compress'] : null
);
$packet['nsp'] = $this->nsp->name;
$encodedPackets = $this->encoder->encode($packet);
if($rooms)
{
$ids = array();
foreach($rooms as $i=>$room)
{
if(!isset($this->rooms[$room]))
{
continue;
}
$room = $this->rooms[$room];
foreach($room as $id=>$item)
{
if(isset($ids[$id]) || isset($except[$id]))
{
continue;
}
if(isset($this->nsp->connected[$id]))
{
$ids[$id] = true;
$this->nsp->connected[$id]->packet($encodedPackets, $packetOpts);
}
}
}
} else {
foreach($this->sids as $id=>$sid)
{
if(isset($except[$id])) continue;
if(isset($this->nsp->connected[$id]))
{
$socket = $this->nsp->connected[$id];
$volatile = isset($flags['volatile']) ? $flags['volatile'] : null;
$socket->packet($encodedPackets, true, $volatile);
}
}
}
}
}

View File

@@ -0,0 +1,299 @@
<?php
namespace PHPSocketIO\Engine;
use \PHPSocketIO\Engine\Transports\Polling;
use \PHPSocketIO\Engine\Transports\PollingXHR;
use \PHPSocketIO\Engine\Transports\WebSocket;
use \PHPSocketIO\Event\Emitter;
use \PHPSocketIO\Debug;
class Engine extends Emitter
{
public $pingTimeout = 60;
public $pingInterval = 25;
public $upgradeTimeout = 5;
public $transports = array();
public $allowUpgrades = array();
public $allowRequest = array();
public $clients = array();
public $origins = '*:*';
public static $allowTransports = array(
'polling' => 'polling',
'websocket' => 'websocket'
);
public static $errorMessages = array(
'Transport unknown',
'Session ID unknown',
'Bad handshake method',
'Bad request'
);
const ERROR_UNKNOWN_TRANSPORT = 0;
const ERROR_UNKNOWN_SID = 1;
const ERROR_BAD_HANDSHAKE_METHOD = 2;
const ERROR_BAD_REQUEST = 3;
public function __construct($opts = array())
{
$ops_map = array(
'pingTimeout',
'pingInterval',
'upgradeTimeout',
'transports',
'allowUpgrades',
'allowRequest'
);
foreach($ops_map as $key)
{
if(isset($opts[$key]))
{
$this->$key = $opts[$key];
}
}
Debug::debug('Engine __construct');
}
public function __destruct()
{
Debug::debug('Engine __destruct');
}
public function handleRequest($req, $res)
{
$this->prepare($req);
$req->res = $res;
$this->verify($req, $res, false, array($this, 'dealRequest'));
}
public function dealRequest($err, $success, $req)
{
if (!$success)
{
self::sendErrorMessage($req, $req->res, $err);
return;
}
if(isset($req->_query['sid']))
{
$this->clients[$req->_query['sid']]->transport->onRequest($req);
}
else
{
$this->handshake($req->_query['transport'], $req);
}
}
protected function sendErrorMessage($req, $res, $code)
{
$headers = array('Content-Type'=> 'application/json');
if(isset($req->headers['origin']))
{
$headers['Access-Control-Allow-Credentials'] = 'true';
$headers['Access-Control-Allow-Origin'] = $req->headers['origin'];
}
else
{
$headers['Access-Control-Allow-Origin'] = '*';
}
$res->writeHead(403, '', $headers);
$res->end(json_encode(array(
'code' => $code,
'message' => isset(self::$errorMessages[$code]) ? self::$errorMessages[$code] : $code
)));
}
protected function verify($req, $res, $upgrade, $fn)
{
if(!isset($req->_query['transport']) || !isset(self::$allowTransports[$req->_query['transport']]))
{
return call_user_func($fn, self::ERROR_UNKNOWN_TRANSPORT, false, $req, $res);
}
$transport = $req->_query['transport'];
$sid = isset($req->_query['sid']) ? $req->_query['sid'] : '';
if($sid)
{
if(!isset($this->clients[$sid]))
{
return call_user_func($fn, self::ERROR_UNKNOWN_SID, false, $req, $res);
}
if(!$upgrade && $this->clients[$sid]->transport->name !== $transport)
{
return call_user_func($fn, self::ERROR_BAD_REQUEST, false, $req, $res);
}
}
else
{
if('GET' !== $req->method)
{
return call_user_func($fn, self::ERROR_BAD_HANDSHAKE_METHOD, false, $req, $res);
}
return $this->checkRequest($req, $fn);
}
call_user_func($fn, null, true, $req, $res);
}
public function checkRequest($req, $fn)
{
if ($this->origins === "*:*" || empty($this->origins))
{
return call_user_func($fn, null, true, $req);
}
$origin = null;
if (isset($req->headers['origin']))
{
$origin = $req->headers['origin'];
}
else if(isset($req->headers['referer']))
{
$origin = $req->headers['referer'];
}
// file:// URLs produce a null Origin which can't be authorized via echo-back
if ('null' === $origin || null === $origin) {
return call_user_func($fn, null, true, $req);
}
if ($origin)
{
$parts = parse_url($origin);
$defaultPort = 'https:' === $parts['scheme'] ? 443 : 80;
$parts['port'] = isset($parts['port']) ? $parts['port'] : $defaultPort;
$allowed_origins = explode(' ', $this->origins);
foreach( $allowed_origins as $allow_origin ){
$ok =
$allow_origin === $parts['scheme'] . '://' . $parts['host'] . ':' . $parts['port'] ||
$allow_origin === $parts['scheme'] . '://' . $parts['host'] ||
$allow_origin === $parts['scheme'] . '://' . $parts['host'] . ':*' ||
$allow_origin === '*:' . $parts['port'];
return call_user_func($fn, null, $ok, $req);
}
}
call_user_func($fn, null, false, $req);
}
protected function prepare($req)
{
if(!isset($req->_query))
{
$info = parse_url($req->url);
if(isset($info['query']))
{
parse_str($info['query'], $req->_query);
}
}
}
public function handshake($transport, $req)
{
$id = bin2hex(pack('d', microtime(true)).pack('N', function_exists('random_int') ? random_int(1, 100000000): rand(1, 100000000)));
if (isset($req->_query['j']))
{
$transport = '\\PHPSocketIO\\Engine\\Transports\\PollingJsonp';
}
else
{
$transport = '\\PHPSocketIO\\Engine\\Transports\\PollingXHR';
}
$transport = new $transport($req);
$transport->supportsBinary = !isset($req->_query['b64']);
$socket = new Socket($id, $this, $transport, $req);
/* $transport->on('headers', function(&$headers)use($id)
{
$headers['Set-Cookie'] = "io=$id";
}); */
$transport->onRequest($req);
$this->clients[$id] = $socket;
$socket->once('close', array($this, 'onSocketClose'));
$this->emit('connection', $socket);
}
public function onSocketClose($id)
{
unset($this->clients[$id]);
}
public function attach($worker)
{
$this->server = $worker;
$worker->onConnect = array($this, 'onConnect');
}
public function onConnect($connection)
{
$connection->onRequest = array($this, 'handleRequest');
$connection->onWebSocketConnect = array($this, 'onWebSocketConnect');
// clean
$connection->onClose = function($connection)
{
if(!empty($connection->httpRequest))
{
$connection->httpRequest->destroy();
$connection->httpRequest = null;
}
if(!empty($connection->httpResponse))
{
$connection->httpResponse->destroy();
$connection->httpResponse = null;
}
if(!empty($connection->onRequest))
{
$connection->onRequest = null;
}
if(!empty($connection->onWebSocketConnect))
{
$connection->onWebSocketConnect = null;
}
};
}
public function onWebSocketConnect($connection, $req, $res)
{
$this->prepare($req);
$this->verify($req, $res, true, array($this, 'dealWebSocketConnect'));
}
public function dealWebSocketConnect($err, $success, $req, $res)
{
if (!$success)
{
self::sendErrorMessage($req, $res, $err);
return;
}
if(isset($req->_query['sid']))
{
if(!isset($this->clients[$req->_query['sid']]))
{
self::sendErrorMessage($req, $res, 'upgrade attempt for closed client');
return;
}
$client = $this->clients[$req->_query['sid']];
if($client->upgrading)
{
self::sendErrorMessage($req, $res, 'transport has already been trying to upgrade');
return;
}
if($client->upgraded)
{
self::sendErrorMessage($req, $res, 'transport had already been upgraded');
return;
}
$transport = new WebSocket($req);
$client->maybeUpgrade($transport);
}
else
{
$this->handshake($req->_query['transport'], $req);
}
}
}

View File

@@ -0,0 +1,303 @@
<?php
namespace PHPSocketIO\Engine;
use \PHPSocketIO\Debug;
class Parser
{
public function __construct()
{
Debug::debug('Engine/Parser __construct');
}
public static $packets=array(
'open'=> 0 // non-ws
, 'close'=> 1 // non-ws
, 'ping'=> 2
, 'pong'=> 3
, 'message'=> 4
, 'upgrade'=> 5
, 'noop'=> 6
);
public static $packetsList = array(
'open',
'close',
'ping',
'pong',
'message',
'upgrade',
'noop'
);
public static $err = array(
'type' => 'error',
'data' => 'parser error'
);
public static function encodePacket($packet)
{
$data = !isset($packet['data']) ? '' : $packet['data'];
return self::$packets[$packet['type']].$data;
}
/**
* Encodes a packet with binary data in a base64 string
*
* @param {Object} packet, has `type` and `data`
* @return {String} base64 encoded message
*/
public static function encodeBase64Packet($packet)
{
$data = isset($packet['data']) ? '' : $packet['data'];
return $message = 'b' . self::$packets[$packet['type']] . base64_encode($packet['data']);
}
/**
* Decodes a packet. Data also available as an ArrayBuffer if requested.
*
* @return {Object} with `type` and `data` (if any)
* @api private
*/
public static function decodePacket($data, $binaryType = null, $utf8decode = true)
{
// String data todo check if (typeof data == 'string' || data === undefined)
if ($data[0] === 'b')
{
return self::decodeBase64Packet(substr($data, 1), $binaryType);
}
$type = $data[0];
if (!isset(self::$packetsList[$type]))
{
return self::$err;
}
if (isset($data[1]))
{
return array('type'=> self::$packetsList[$type], 'data'=> substr($data, 1));
}
else
{
return array('type'=> self::$packetsList[$type]);
}
}
/**
* Decodes a packet encoded in a base64 string.
*
* @param {String} base64 encoded message
* @return {Object} with `type` and `data` (if any)
*/
public static function decodeBase64Packet($msg, $binaryType)
{
$type = self::$packetsList[$msg[0]];
$data = base64_decode(substr($data, 1));
return array('type'=> $type, 'data'=> $data);
}
/**
* Encodes multiple messages (payload).
*
* <length>:data
*
* Example:
*
* 11:hello world2:hi
*
* If any contents are binary, they will be encoded as base64 strings. Base64
* encoded strings are marked with a b before the length specifier
*
* @param {Array} packets
* @api private
*/
public static function encodePayload($packets, $supportsBinary = null)
{
if ($supportsBinary)
{
return self::encodePayloadAsBinary($packets);
}
if (!$packets)
{
return '0:';
}
$results = '';
foreach($packets as $msg)
{
$results .= self::encodeOne($msg, $supportsBinary);
}
return $results;
}
public static function encodeOne($packet, $supportsBinary = null, $result = null)
{
$message = self::encodePacket($packet, $supportsBinary, true);
return strlen($message) . ':' . $message;
}
/*
* Decodes data when a payload is maybe expected. Possible binary contents are
* decoded from their base64 representation
*
* @api public
*/
public static function decodePayload($data, $binaryType = null)
{
if(!preg_match('/^\d+:\d/',$data))
{
return self::decodePayloadAsBinary($data, $binaryType);
}
if ($data === '')
{
// parser error - ignoring payload
return self::$err;
}
$length = '';//, n, msg;
for ($i = 0, $l = strlen($data); $i < $l; $i++)
{
$chr = $data[$i];
if (':' != $chr)
{
$length .= $chr;
}
else
{
if ('' == $length || ($length != ($n = intval($length))))
{
// parser error - ignoring payload
return self::$err;
}
$msg = substr($data, $i + 1, $n);
if ($length != strlen($msg))
{
// parser error - ignoring payload
return self::$err;
}
if (isset($msg[0]))
{
$packet = self::decodePacket($msg, $binaryType, true);
if (self::$err['type'] == $packet['type'] && self::$err['data'] == $packet['data'])
{
// parser error in individual packet - ignoring payload
return self::$err;
}
return $packet;
}
// advance cursor
$i += $n;
$length = '';
}
}
if ($length !== '')
{
// parser error - ignoring payload
echo new \Exception('parser error');
return self::$err;
}
}
/**
* Encodes multiple messages (payload) as binary.
*
* <1 = binary, 0 = string><number from 0-9><number from 0-9>[...]<number
* 255><data>
*
* Example:
* 1 3 255 1 2 3, if the binary contents are interpreted as 8 bit integers
*
* @param {Array} packets
* @return {Buffer} encoded payload
* @api private
*/
public static function encodePayloadAsBinary($packets)
{
$results = '';
foreach($packets as $msg)
{
$results .= self::encodeOneAsBinary($msg);
}
return $results;
}
public static function encodeOneAsBinary($p)
{
// todo is string or arraybuf
$packet = self::encodePacket($p, true, true);
$encodingLength = ''.strlen($packet);
$sizeBuffer = chr(0);
for ($i = 0; $i < strlen($encodingLength); $i++)
{
$sizeBuffer .= chr($encodingLength[$i]);
}
$sizeBuffer .= chr(255);
return $sizeBuffer.$packet;
}
/*
* Decodes data when a payload is maybe expected. Strings are decoded by
* interpreting each byte as a key code for entries marked to start with 0. See
* description of encodePayloadAsBinary
* @api public
*/
public static function decodePayloadAsBinary($data, $binaryType = null)
{
$bufferTail = $data;
$buffers = array();
while (strlen($bufferTail) > 0)
{
$strLen = '';
$isString = $bufferTail[0] == 0;
$numberTooLong = false;
for ($i = 1; ; $i++)
{
$tail = ord($bufferTail[$i]);
if ($tail === 255) break;
// 310 = char length of Number.MAX_VALUE
if (strlen($strLen) > 310)
{
$numberTooLong = true;
break;
}
$strLen .= $tail;
}
if($numberTooLong) return self::$err;
$bufferTail = substr($bufferTail, strlen($strLen) + 1);
$msgLength = intval($strLen, 10);
$msg = substr($bufferTail, 1, $msgLength + 1);
$buffers[] = $msg;
$bufferTail = substr($bufferTail, $msgLength + 1);
}
$total = count($buffers);
$packets = array();
foreach($buffers as $i => $buffer)
{
$packets[] = self::decodePacket($buffer, $binaryType, true);
}
return $packets;
}
}

View File

@@ -0,0 +1,51 @@
<?php
namespace PHPSocketIO\Engine\Protocols\Http;
class Request
{
public $onData = null;
public $onEnd = null;
public $httpVersion = null;
public $headers = array();
public $rawHeaders = null;
public $method = null;
public $url = null;
public $connection = null;
public function __construct($connection, $raw_head)
{
$this->connection = $connection;
$this->parseHead($raw_head);
}
public function parseHead($raw_head)
{
$header_data = explode("\r\n", $raw_head);
list($this->method, $this->url, $protocol) = explode(' ', $header_data[0]);
list($null, $this->httpVersion) = explode('/', $protocol);
unset($header_data[0]);
foreach($header_data as $content)
{
if(empty($content))
{
continue;
}
$this->rawHeaders[] = $content;
list($key, $value) = explode(':', $content, 2);
$this->headers[strtolower($key)] = trim($value);
}
}
public function destroy()
{
$this->onData = $this->onEnd = $this->onClose = null;
$this->connection = null;
}
}

View File

@@ -0,0 +1,203 @@
<?php
namespace PHPSocketIO\Engine\Protocols\Http;
class Response
{
public $statusCode = 200;
protected $_statusPhrase = null;
protected $_connection = null;
protected $_headers = array();
public $headersSent = false;
public $writable = true;
protected $_buffer = '';
public function __construct($connection)
{
$this->_connection = $connection;
}
protected function initHeader()
{
$this->_headers['Connection'] = 'keep-alive';
$this->_headers['Content-Type'] = 'Content-Type: text/html;charset=utf-8';
}
public function writeHead($status_code, $reason_phrase = '', $headers = null)
{
if($this->headersSent)
{
echo "header has already send\n";
return false;
}
$this->statusCode = $status_code;
if($reason_phrase)
{
$this->_statusPhrase = $reason_phrase;
}
if($headers)
{
foreach($headers as $key=>$val)
{
$this->_headers[$key] = $val;
}
}
$this->_buffer = $this->getHeadBuffer();
$this->headersSent = true;
}
public function getHeadBuffer()
{
if(!$this->_statusPhrase)
{
$this->_statusPhrase = isset(self::$codes[$this->statusCode]) ? self::$codes[$this->statusCode] : '';
}
$head_buffer = "HTTP/1.1 $this->statusCode $this->_statusPhrase\r\n";
if(!isset($this->_headers['Content-Length']) && !isset($this->_headers['Transfer-Encoding']))
{
$head_buffer .= "Transfer-Encoding: chunked\r\n";
}
if(!isset($this->_headers['Connection']))
{
$head_buffer .= "Connection: keep-alive\r\n";
}
foreach($this->_headers as $key=>$val)
{
if($key === 'Set-Cookie' && is_array($val))
{
foreach($val as $v)
{
$head_buffer .= "Set-Cookie: $v\r\n";
}
continue;
}
$head_buffer .= "$key: $val\r\n";
}
return $head_buffer."\r\n";
}
public function setHeader($key, $val)
{
$this->_headers[$key] = $val;
}
public function getHeader($name)
{
return isset($this->_headers[$name]) ? $this->_headers[$name] : '';
}
public function removeHeader($name)
{
unset($this->_headers[$name]);
}
public function write($chunk)
{
if(!isset($this->_headers['Content-Length']))
{
$chunk = dechex(strlen($chunk)) . "\r\n" . $chunk . "\r\n";
}
if(!$this->headersSent)
{
$head_buffer = $this->getHeadBuffer();
$this->_buffer = $head_buffer . $chunk;
$this->headersSent = true;
}
else
{
$this->_buffer .= $chunk;
}
}
public function end($data = null)
{
if(!$this->writable)
{
echo new \Exception('unwirtable');
return false;
}
if($data !== null)
{
$this->write($data);
}
if(!$this->headersSent)
{
$head_buffer = $this->getHeadBuffer();
$this->_buffer = $head_buffer;
$this->headersSent = true;
}
if(!isset($this->_headers['Content-Length']))
{
$ret = $this->_connection->send($this->_buffer . "0\r\n\r\n", true);
$this->destroy();
return $ret;
}
$ret = $this->_connection->send($this->_buffer, true);
$this->destroy();
return $ret;
}
public function destroy()
{
if(!empty($this->_connection->httpRequest))
{
$this->_connection->httpRequest->destroy();
}
$this->_connection->httpResponse = $this->_connection->httpRequest = null;
$this->_connection = null;
$this->writable = false;
}
public static $codes = array(
100 => 'Continue',
101 => 'Switching Protocols',
200 => 'OK',
201 => 'Created',
202 => 'Accepted',
203 => 'Non-Authoritative Information',
204 => 'No Content',
205 => 'Reset Content',
206 => 'Partial Content',
300 => 'Multiple Choices',
301 => 'Moved Permanently',
302 => 'Found',
303 => 'See Other',
304 => 'Not Modified',
305 => 'Use Proxy',
306 => '(Unused)',
307 => 'Temporary Redirect',
400 => 'Bad Request',
401 => 'Unauthorized',
402 => 'Payment Required',
403 => 'Forbidden',
404 => 'Not Found',
405 => 'Method Not Allowed',
406 => 'Not Acceptable',
407 => 'Proxy Authentication Required',
408 => 'Request Timeout',
409 => 'Conflict',
410 => 'Gone',
411 => 'Length Required',
412 => 'Precondition Failed',
413 => 'Request Entity Too Large',
414 => 'Request-URI Too Long',
415 => 'Unsupported Media Type',
416 => 'Requested Range Not Satisfiable',
417 => 'Expectation Failed',
422 => 'Unprocessable Entity',
423 => 'Locked',
500 => 'Internal Server Error',
501 => 'Not Implemented',
502 => 'Bad Gateway',
503 => 'Service Unavailable',
504 => 'Gateway Timeout',
505 => 'HTTP Version Not Supported',
);
}

View File

@@ -0,0 +1,209 @@
<?php
namespace PHPSocketIO\Engine\Protocols;
use \PHPSocketIO\Engine\Protocols\WebSocket;
use \PHPSocketIO\Engine\Protocols\Http\Request;
use \PHPSocketIO\Engine\Protocols\Http\Response;
use \Workerman\Connection\TcpConnection;
class SocketIO
{
public static function input($http_buffer, $connection)
{
if(!empty($connection->hasReadedHead))
{
return strlen($http_buffer);
}
$pos = strpos($http_buffer, "\r\n\r\n");
if(!$pos)
{
if(strlen($http_buffer)>=TcpConnection::$maxPackageSize)
{
$connection->close("HTTP/1.1 400 bad request\r\n\r\nheader too long");
return 0;
}
return 0;
}
$head_len = $pos + 4;
$raw_head = substr($http_buffer, 0, $head_len);
$raw_body = substr($http_buffer, $head_len);
$req = new Request($connection, $raw_head);
$res = new Response($connection);
$connection->httpRequest = $req;
$connection->httpResponse = $res;
$connection->hasReadedHead = true;
TcpConnection::$statistics['total_request']++;
$connection->onClose = '\PHPSocketIO\Engine\Protocols\SocketIO::emitClose';
if(isset($req->headers['upgrade']) && $req->headers['upgrade'] == 'websocket')
{
$connection->consumeRecvBuffer(strlen($http_buffer));
WebSocket::dealHandshake($connection, $req, $res);
self::cleanup($connection);
return 0;
}
if(!empty($connection->onRequest))
{
$connection->consumeRecvBuffer(strlen($http_buffer));
self::emitRequest($connection, $req, $res);
if($req->method === 'GET' || $req->method === 'OPTIONS')
{
self::emitEnd($connection, $req);
return 0;
}
// POST
if('\PHPSocketIO\Engine\Protocols\SocketIO::onData' !== $connection->onMessage)
{
$connection->onMessage = '\PHPSocketIO\Engine\Protocols\SocketIO::onData';
}
if(!$raw_body)
{
return 0;
}
self::onData($connection, $raw_body);
return 0;
}
else
{
if($req->method === 'GET')
{
return $pos + 4;
}
elseif(isset($req->headers['content-length']))
{
return $req->headers['content-length'];
}
else
{
$connection->close("HTTP/1.1 400 bad request\r\n\r\ntrunk not support");
return 0;
}
}
}
public static function onData($connection, $data)
{
$req = $connection->httpRequest;
self::emitData($connection, $req, $data);
if((isset($req->headers['content-length']) && $req->headers['content-length'] <= strlen($data))
|| substr($data, -5) === "0\r\n\r\n")
{
self::emitEnd($connection, $req);
}
}
protected static function emitRequest($connection, $req, $res)
{
try
{
call_user_func($connection->onRequest, $req, $res);
}
catch(\Exception $e)
{
echo $e;
}
}
public static function emitClose($connection)
{
$req = $connection->httpRequest;
if(isset($req->onClose))
{
try
{
call_user_func($req->onClose, $req);
}
catch(\Exception $e)
{
echo $e;
}
}
$res = $connection->httpResponse;
if(isset($res->onClose))
{
try
{
call_user_func($res->onClose, $res);
}
catch(\Exception $e)
{
echo $e;
}
}
self::cleanup($connection);
}
public static function cleanup($connection)
{
if(!empty($connection->onRequest))
{
$connection->onRequest = null;
}
if(!empty($connection->onWebSocketConnect))
{
$connection->onWebSocketConnect = null;
}
if(!empty($connection->httpRequest))
{
$connection->httpRequest->destroy();
$connection->httpRequest = null;
}
if(!empty($connection->httpResponse))
{
$connection->httpResponse->destroy();
$connection->httpResponse = null;
}
}
public static function emitData($connection, $req, $data)
{
if(isset($req->onData))
{
try
{
call_user_func($req->onData, $req, $data);
}
catch(\Exception $e)
{
echo $e;
}
}
}
public static function emitEnd($connection, $req)
{
if(isset($req->onEnd))
{
try
{
call_user_func($req->onEnd, $req);
}
catch(\Exception $e)
{
echo $e;
}
}
$connection->hasReadedHead = false;
}
public static function encode($buffer, $connection)
{
if(!isset($connection->onRequest))
{
$connection->httpResponse->setHeader('Content-Length', strlen($buffer));
return $connection->httpResponse->getHeadBuffer() . $buffer;
}
return $buffer;
}
public static function decode($http_buffer, $connection)
{
if(isset($connection->onRequest))
{
return $http_buffer;
}
else
{
list($head, $body) = explode("\r\n\r\n", $http_buffer, 2);
return $body;
}
}
}

View File

@@ -0,0 +1,86 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace PHPSocketIO\Engine\Protocols;
use \PHPSocketIO\Engine\Protocols\Http\Request;
use \PHPSocketIO\Engine\Protocols\Http\Response;
use \PHPSocketIO\Engine\Protocols\WebSocket\RFC6455;
use \Workerman\Connection\TcpConnection;
/**
* WebSocket 协议服务端解包和打包
*/
class WebSocket
{
/**
* 最小包头
* @var int
*/
const MIN_HEAD_LEN = 7;
/**
* 检查包的完整性
* @param string $buffer
*/
public static function input($buffer, $connection)
{
if(strlen($buffer) < self::MIN_HEAD_LEN)
{
return 0;
}
// flash policy file
if(0 === strpos($buffer,'<policy'))
{
$policy_xml = '<?xml version="1.0"?><cross-domain-policy><site-control permitted-cross-domain-policies="all"/><allow-access-from domain="*" to-ports="*"/></cross-domain-policy>'."\0";
$connection->send($policy_xml, true);
$connection->consumeRecvBuffer(strlen($buffer));
return 0;
}
// http head
$pos = strpos($buffer, "\r\n\r\n");
if(!$pos)
{
if(strlen($buffer)>=TcpConnection::$maxPackageSize)
{
$connection->close("HTTP/1.1 400 bad request\r\n\r\nheader too long");
return 0;
}
return 0;
}
$req = new Request($connection, $buffer);
$res = new Response($connection);
$connection->consumeRecvBuffer(strlen($buffer));
return self::dealHandshake($connection, $req, $res);
$connection->consumeRecvBuffer($pos+4);
return 0;
}
/**
* 处理websocket握手
* @param string $buffer
* @param TcpConnection $connection
* @return int
*/
public static function dealHandshake($connection, $req, $res)
{
if(isset($req->headers['sec-websocket-key1']))
{
$res->writeHead(400);
$res->end("Not support");
return 0;
}
$connection->protocol = 'PHPSocketIO\Engine\Protocols\WebSocket\RFC6455';
return RFC6455::dealHandshake($connection, $req, $res);
}
}

View File

@@ -0,0 +1,335 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace PHPSocketIO\Engine\Protocols\WebSocket;
use Workerman\Connection\ConnectionInterface;
/**
* WebSocket 协议服务端解包和打包
*/
class RFC6455 implements \Workerman\Protocols\ProtocolInterface
{
/**
* websocket头部最小长度
* @var int
*/
const MIN_HEAD_LEN = 6;
/**
* websocket blob类型
* @var char
*/
const BINARY_TYPE_BLOB = "\x81";
/**
* websocket arraybuffer类型
* @var char
*/
const BINARY_TYPE_ARRAYBUFFER = "\x82";
/**
* 检查包的完整性
* @param string $buffer
*/
public static function input($buffer, ConnectionInterface $connection)
{
// 数据长度
$recv_len = strlen($buffer);
// 长度不够
if($recv_len < self::MIN_HEAD_LEN)
{
return 0;
}
// $connection->websocketCurrentFrameLength有值说明当前fin为0则缓冲websocket帧数据
if($connection->websocketCurrentFrameLength)
{
// 如果当前帧数据未收全,则继续收
if($connection->websocketCurrentFrameLength > $recv_len)
{
// 返回0因为不清楚完整的数据包长度需要等待fin=1的帧
return 0;
}
}
else
{
$data_len = ord($buffer[1]) & 127;
$firstbyte = ord($buffer[0]);
$is_fin_frame = $firstbyte>>7;
$opcode = $firstbyte & 0xf;
switch($opcode)
{
// 附加数据帧 @todo 实现附加数据帧
case 0x0:
break;
// 文本数据帧
case 0x1:
break;
// 二进制数据帧
case 0x2:
break;
// 关闭的包
case 0x8:
// 如果有设置onWebSocketClose回调尝试执行
if(isset($connection->onWebSocketClose))
{
call_user_func($connection->onWebSocketClose, $connection);
}
// 默认行为是关闭连接
else
{
$connection->close();
}
return 0;
// ping的包
case 0x9:
// 如果有设置onWebSocketPing回调尝试执行
if(isset($connection->onWebSocketPing))
{
call_user_func($connection->onWebSocketPing, $connection);
}
// 默认发送pong
else
{
$connection->send(pack('H*', '8a00'), true);
}
// 从接受缓冲区中消费掉该数据包
if(!$data_len)
{
$connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
return 0;
}
break;
// pong的包
case 0xa:
// 如果有设置onWebSocketPong回调尝试执行
if(isset($connection->onWebSocketPong))
{
call_user_func($connection->onWebSocketPong, $connection);
}
// 从接受缓冲区中消费掉该数据包
if(!$data_len)
{
$connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
return 0;
}
break;
// 错误的opcode
default :
echo "error opcode $opcode and close websocket connection\n";
$connection->close();
return 0;
}
// websocket二进制数据
$head_len = self::MIN_HEAD_LEN;
if ($data_len === 126) {
$head_len = 8;
if($head_len > $recv_len)
{
return 0;
}
$pack = unpack('ntotal_len', substr($buffer, 2, 2));
$data_len = $pack['total_len'];
} else if ($data_len === 127) {
$head_len = 14;
if($head_len > $recv_len)
{
return 0;
}
$arr = unpack('N2', substr($buffer, 2, 8));
$data_len = $arr[1]*4294967296 + $arr[2];
}
$current_frame_length = $head_len + $data_len;
if($is_fin_frame)
{
return $current_frame_length;
}
else
{
$connection->websocketCurrentFrameLength = $current_frame_length;
}
}
// 收到的数据刚好是一个frame
if($connection->websocketCurrentFrameLength == $recv_len)
{
self::decode($buffer, $connection);
$connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
$connection->websocketCurrentFrameLength = 0;
return 0;
}
// 收到的数据大于一个frame
elseif($connection->websocketCurrentFrameLength < $recv_len)
{
self::decode(substr($buffer, 0, $connection->websocketCurrentFrameLength), $connection);
$connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
$current_frame_length = $connection->websocketCurrentFrameLength;
$connection->websocketCurrentFrameLength = 0;
// 继续读取下一个frame
return self::input(substr($buffer, $current_frame_length), $connection);
}
// 收到的数据不足一个frame
else
{
return 0;
}
}
/**
* 打包
* @param string $buffer
* @return string
*/
public static function encode($buffer, ConnectionInterface $connection)
{
$len = strlen($buffer);
if(empty($connection->websocketHandshake))
{
// 默认是utf8文本格式
$connection->websocketType = self::BINARY_TYPE_BLOB;
}
$first_byte = $connection->websocketType;
if($len<=125)
{
$encode_buffer = $first_byte.chr($len).$buffer;
}
else if($len<=65535)
{
$encode_buffer = $first_byte.chr(126).pack("n", $len).$buffer;
}
else
{
$encode_buffer = $first_byte.chr(127).pack("xxxxN", $len).$buffer;
}
// 还没握手不能发数据,先将数据缓冲起来,等握手完毕后发送
if(empty($connection->websocketHandshake))
{
if(empty($connection->websocketTmpData))
{
// 临时数据缓冲
$connection->websocketTmpData = '';
}
$connection->websocketTmpData .= $encode_buffer;
// 返回空,阻止发送
return '';
}
return $encode_buffer;
}
/**
* 解包
* @param string $buffer
* @return string
*/
public static function decode($buffer, ConnectionInterface $connection)
{
$len = $masks = $data = $decoded = null;
$len = ord($buffer[1]) & 127;
if ($len === 126) {
$masks = substr($buffer, 4, 4);
$data = substr($buffer, 8);
} else if ($len === 127) {
$masks = substr($buffer, 10, 4);
$data = substr($buffer, 14);
} else {
$masks = substr($buffer, 2, 4);
$data = substr($buffer, 6);
}
for ($index = 0; $index < strlen($data); $index++) {
$decoded .= $data[$index] ^ $masks[$index % 4];
}
if($connection->websocketCurrentFrameLength)
{
$connection->websocketDataBuffer .= $decoded;
return $connection->websocketDataBuffer;
}
else
{
$decoded = $connection->websocketDataBuffer . $decoded;
$connection->websocketDataBuffer = '';
return $decoded;
}
}
/**
* 处理websocket握手
* @param string $buffer
* @param TcpConnection $connection
* @return int
*/
public static function dealHandshake($connection, $req, $res)
{
$headers = array();
if(isset($connection->onWebSocketConnect))
{
try
{
call_user_func_array($connection->onWebSocketConnect, array($connection, $req, $res));
}
catch (\Exception $e)
{
echo $e;
}
if(!$res->writable)
{
return false;
}
}
if(isset($req->headers['sec-websocket-key']))
{
$sec_websocket_key = $req->headers['sec-websocket-key'];
}
else
{
$res->writeHead(400);
$res->end('<b>400 Bad Request</b><br>Upgrade to websocket but Sec-WebSocket-Key not found.');
return 0;
}
// 标记已经握手
$connection->websocketHandshake = true;
// 缓冲fin为0的包直到fin为1
$connection->websocketDataBuffer = '';
// 当前数据帧的长度可能是fin为0的帧也可能是fin为1的帧
$connection->websocketCurrentFrameLength = 0;
// 当前帧的数据缓冲
$connection->websocketCurrentFrameBuffer = '';
// blob or arraybuffer
$connection->websocketType = self::BINARY_TYPE_BLOB;
$sec_websocket_accept = base64_encode(sha1($sec_websocket_key.'258EAFA5-E914-47DA-95CA-C5AB0DC85B11',true));
$headers['Content-Length'] = 0;
$headers['Upgrade'] = 'websocket';
$headers['Sec-WebSocket-Version'] = 13;
$headers['Connection'] = 'Upgrade';
$headers['Sec-WebSocket-Accept'] = $sec_websocket_accept;
$res->writeHead(101, '', $headers);
$res->end();
// 握手后有数据要发送
if(!empty($connection->websocketTmpData))
{
$connection->send($connection->websocketTmpData, true);
$connection->websocketTmpData = '';
}
return 0;
}
}

View File

@@ -0,0 +1,380 @@
<?php
namespace PHPSocketIO\Engine;
use \PHPSocketIO\Event\Emitter;
use \Workerman\Lib\Timer;
use \PHPSocketIO\Debug;
class Socket extends Emitter
{
public $id = 0;
public $server = null;
public $upgrading = false;
public $upgraded = false;
public $readyState = 'opening';
public $writeBuffer = array();
public $packetsFn = array();
public $sentCallbackFn = array();
public $request = null;
public $remoteAddress = '';
public $checkIntervalTimer = null;
public $upgradeTimeoutTimer = null;
public $pingTimeoutTimer = null;
public function __construct($id, $server, $transport, $req)
{
$this->id = $id;
$this->server = $server;
$this->request = $req;
$this->remoteAddress = $req->connection->getRemoteIp().':'.$req->connection->getRemotePort();
$this->setTransport($transport);
$this->onOpen();
Debug::debug('Engine/Socket __construct');
}
public function __destruct()
{
Debug::debug('Engine/Socket __destruct');
}
public function maybeUpgrade($transport)
{
$this->upgrading = true;
$this->upgradeTimeoutTimer = Timer::add(
$this->server->upgradeTimeout,
array($this, 'upgradeTimeoutCallback'),
array($transport), false
);
$this->upgradeTransport = $transport;
$transport->on('packet', array($this, 'onUpgradePacket'));
$transport->once('close', array($this, 'onUpgradeTransportClose'));
$transport->once('error', array($this, 'onUpgradeTransportError'));
$this->once('close', array($this, 'onUpgradeTransportClose'));
}
public function onUpgradePacket($packet)
{
if(empty($this->upgradeTransport))
{
$this->onError('upgradeTransport empty');
return;
}
if('ping' === $packet['type'] && (isset($packet['data']) && 'probe' === $packet['data']))
{
$this->upgradeTransport->send(array(array('type'=> 'pong', 'data'=> 'probe')));
//$this->transport->shouldClose = function(){};
if ($this->checkIntervalTimer) {
Timer::del($this->checkIntervalTimer);
}
$this->checkIntervalTimer = Timer::add(0.5, array($this, 'check'));
}
else if('upgrade' === $packet['type'] && $this->readyState !== 'closed')
{
$this->upgradeCleanup();
$this->upgraded = true;
$this->clearTransport();
$this->transport->destroy();
$this->setTransport($this->upgradeTransport);
$this->emit('upgrade', $this->upgradeTransport);
$this->upgradeTransport = null;
$this->setPingTimeout();
$this->flush();
if($this->readyState === 'closing')
{
$this->transport->close(array($this, 'onClose'));
}
}
else
{
if(!empty($this->upgradeTransport))
{
$this->upgradeCleanup();
$this->upgradeTransport->close();
$this->upgradeTransport = null;
}
}
}
public function upgradeCleanup()
{
$this->upgrading = false;
Timer::del($this->checkIntervalTimer);
Timer::del($this->upgradeTimeoutTimer);
if(!empty($this->upgradeTransport))
{
$this->upgradeTransport->removeListener('packet', array($this, 'onUpgradePacket'));
$this->upgradeTransport->removeListener('close', array($this, 'onUpgradeTransportClose'));
$this->upgradeTransport->removeListener('error', array($this, 'onUpgradeTransportError'));
}
$this->removeListener('close', array($this, 'onUpgradeTransportClose'));
}
public function onUpgradeTransportClose()
{
$this->onUpgradeTransportError('transport closed');
}
public function onUpgradeTransportError($err)
{
//echo $err;
$this->upgradeCleanup();
if($this->upgradeTransport)
{
$this->upgradeTransport->close();
$this->upgradeTransport = null;
}
}
public function upgradeTimeoutCallback($transport)
{
//echo("client did not complete upgrade - closing transport\n");
$this->upgradeCleanup();
if('open' === $transport->readyState)
{
$transport->close();
}
}
public function setTransport($transport)
{
$this->transport = $transport;
$this->transport->once('error', array($this, 'onError'));
$this->transport->on('packet', array($this, 'onPacket'));
$this->transport->on('drain', array($this, 'flush'));
$this->transport->once('close', array($this, 'onClose'));
//this function will manage packet events (also message callbacks)
$this->setupSendCallback();
}
public function onOpen()
{
$this->readyState = 'open';
// sends an `open` packet
$this->transport->sid = $this->id;
$this->sendPacket('open', json_encode(array(
'sid'=> $this->id
, 'upgrades' => $this->getAvailableUpgrades()
, 'pingInterval'=> $this->server->pingInterval*1000
, 'pingTimeout'=> $this->server->pingTimeout*1000
)));
$this->emit('open');
$this->setPingTimeout();
}
public function onPacket($packet)
{
if ('open' === $this->readyState) {
// export packet event
$this->emit('packet', $packet);
// Reset ping timeout on any packet, incoming data is a good sign of
// other side's liveness
$this->setPingTimeout();
switch ($packet['type']) {
case 'ping':
$this->sendPacket('pong');
$this->emit('heartbeat');
break;
case 'error':
$this->onClose('parse error');
break;
case 'message':
$this->emit('data', $packet['data']);
$this->emit('message', $packet['data']);
break;
}
}
else
{
echo('packet received with closed socket');
}
}
public function check()
{
if('polling' == $this->transport->name && $this->transport->writable)
{
$this->transport->send(array(array('type' => 'noop')));
}
}
public function onError($err)
{
$this->onClose('transport error', $err);
}
public function setPingTimeout()
{
if ($this->pingTimeoutTimer) {
Timer::del($this->pingTimeoutTimer);
}
$this->pingTimeoutTimer = Timer::add(
$this->server->pingInterval + $this->server->pingTimeout ,
array($this, 'pingTimeoutCallback'), null, false);
}
public function pingTimeoutCallback()
{
$this->transport->close();
$this->onClose('ping timeout');
}
public function clearTransport()
{
$this->transport->close();
Timer::del($this->pingTimeoutTimer);
}
public function onClose($reason = '', $description = null)
{
if ('closed' !== $this->readyState)
{
Timer::del($this->pingTimeoutTimer);
Timer::del($this->checkIntervalTimer);
$this->checkIntervalTimer = null;
Timer::del($this->upgradeTimeoutTimer);
// clean writeBuffer in next tick, so developers can still
// grab the writeBuffer on 'close' event
$this->writeBuffer = array();
$this->packetsFn = array();
$this->sentCallbackFn = array();
$this->clearTransport();
$this->readyState = 'closed';
$this->emit('close', $this->id, $reason, $description);
$this->server = null;
$this->request = null;
$this->upgradeTransport = null;
$this->removeAllListeners();
if(!empty($this->transport))
{
$this->transport->removeAllListeners();
$this->transport = null;
}
}
}
public function send($data, $options, $callback)
{
$this->sendPacket('message', $data, $options, $callback);
return $this;
}
public function write($data, $options = array(), $callback = null)
{
return $this->send($data, $options, $callback);
}
public function sendPacket($type, $data = null, $callback = null)
{
if('closing' !== $this->readyState)
{
$packet = array(
'type'=> $type
);
if($data !== null)
{
$packet['data'] = $data;
}
// exports packetCreate event
$this->emit('packetCreate', $packet);
$this->writeBuffer[] = $packet;
//add send callback to object
if($callback)
{
$this->packetsFn[] = $callback;
}
$this->flush();
}
}
public function flush()
{
if ('closed' !== $this->readyState && $this->transport->writable
&& $this->writeBuffer)
{
$this->emit('flush', $this->writeBuffer);
$this->server->emit('flush', $this, $this->writeBuffer);
$wbuf = $this->writeBuffer;
$this->writeBuffer = array();
if($this->packetsFn)
{
if(!empty($this->transport->supportsFraming))
{
$this->sentCallbackFn[] = $this->packetsFn;
}
else
{
// @todo check
$this->sentCallbackFn[]=$this->packetsFn;
}
}
$this->packetsFn = array();
$this->transport->send($wbuf);
$this->emit('drain');
if($this->server)
{
$this->server->emit('drain', $this);
}
}
}
public function getAvailableUpgrades()
{
return array('websocket');
}
public function close()
{
if ('open' !== $this->readyState)
{
return;
}
$this->readyState = 'closing';
if ($this->writeBuffer) {
$this->once('drain', array($this, 'closeTransport'));
return;
}
$this->closeTransport();
}
public function closeTransport()
{
//todo onClose.bind(this, 'forced close'));
$this->transport->close(array($this, 'onClose'));
}
public function setupSendCallback()
{
$self = $this;
//the message was sent successfully, execute the callback
$this->transport->on('drain', array($this, 'onDrainCallback'));
}
public function onDrainCallback()
{
if ($this->sentCallbackFn)
{
$seqFn = array_shift($this->sentCallbackFn);
if(is_callable($seqFn))
{
echo('executing send callback');
call_user_func($seqFn, $this->transport);
}else if (is_array($seqFn)) {
echo('executing batch send callback');
foreach($seqFn as $fn)
{
call_user_func($fn, $this->transport);
}
}
}
}
}

View File

@@ -0,0 +1,79 @@
<?php
namespace PHPSocketIO\Engine;
use \PHPSocketIO\Event\Emitter;
use \PHPSocketIO\Debug;
class Transport extends Emitter
{
public $readyState = 'opening';
public $req = null;
public $res = null;
public function __construct()
{
Debug::debug('Transport __construct no access !!!!');
}
public function __destruct()
{
Debug::debug('Transport __destruct');
}
public function noop()
{
}
public function onRequest($req)
{
$this->req = $req;
}
public function close($fn = null)
{
$this->readyState = 'closing';
$fn = $fn ? $fn : array($this, 'noop');
$this->doClose($fn);
}
public function onError($msg, $desc = '')
{
if ($this->listeners('error'))
{
$err = array(
'type' => 'TransportError',
'description' => $desc,
);
$this->emit('error', $err);
}
else
{
echo("ignored transport error $msg $desc\n");
}
}
public function onPacket($packet)
{
$this->emit('packet', $packet);
}
public function onData($data)
{
$this->onPacket(Parser::decodePacket($data));
}
public function onClose()
{
$this->req = $this->res = null;
$this->readyState = 'closed';
$this->emit('close');
$this->removeAllListeners();
}
public function destroy()
{
$this->req = $this->res = null;
$this->readyState = 'closed';
$this->removeAllListeners();
$this->shouldClose = null;
}
}

View File

@@ -0,0 +1,208 @@
<?php
namespace PHPSocketIO\Engine\Transports;
use PHPSocketIO\Engine\Transport;
use PHPSocketIO\Engine\Parser;
use \PHPSocketIO\Debug;
class Polling extends Transport
{
public $name = 'polling';
public $chunks = '';
public $shouldClose = null;
public $writable = false;
public function onRequest($req)
{
$res = $req->res;
if ('GET' === $req->method)
{
$this->onPollRequest($req, $res);
}
else if('POST' === $req->method)
{
$this->onDataRequest($req, $res);
}
else
{
$res->writeHead(500);
$res->end();
}
}
public function onPollRequest($req, $res)
{
if($this->req)
{
echo ('request overlap');
// assert: this.res, '.req and .res should be (un)set together'
$this->onError('overlap from client');
$res->writeHead(500);
return;
}
$this->req = $req;
$this->res = $res;
$req->onClose = array($this, 'pollRequestOnClose');
$req->cleanup = array($this, 'pollRequestClean');
$this->writable = true;
$this->emit('drain');
// if we're still writable but had a pending close, trigger an empty send
if ($this->writable && $this->shouldClose)
{
echo('triggering empty send to append close packet');
$this->send(array(array('type'=>'noop')));
}
}
public function pollRequestOnClose()
{
$this->onError('poll connection closed prematurely');
$this->pollRequestClean();
}
public function pollRequestClean()
{
if(isset($this->req))
{
$this->req->res = null;
$this->req->onClose = $this->req->cleanup = null;
$this->req = $this->res = null;
}
}
public function onDataRequest($req, $res)
{
if(isset($this->dataReq))
{
// assert: this.dataRes, '.dataReq and .dataRes should be (un)set together'
$this->onError('data request overlap from client');
$res->writeHead(500);
return;
}
$this->dataReq = $req;
$this->dataRes = $res;
$req->onClose = array($this, 'dataRequestOnClose');
$req->onData = array($this, 'dataRequestOnData');
$req->onEnd = array($this, 'dataRequestOnEnd');
}
public function dataRequestCleanup()
{
$this->chunks = '';
$this->dataReq->res = null;
$this->dataReq->onClose = $this->dataReq->onData = $this->dataReq->onEnd = null;
$this->dataReq = $this->dataRes = null;
}
public function dataRequestClose()
{
$this->dataRequestCleanup();
$this->onError('data request connection closed prematurely');
}
public function dataRequestOnData($req, $data)
{
$this->chunks .= $data;
// todo maxHttpBufferSize
/*if(strlen($this->chunks) > $this->maxHttpBufferSize)
{
$this->chunks = '';
$req->connection->destroy();
}*/
}
public function dataRequestOnEnd ()
{
$this->onData($this->chunks);
$headers = array(
'Content-Type'=> 'text/html',
'Content-Length'=> 2,
'X-XSS-Protection' => '0',
);
$this->dataRes->writeHead(200, '', $this->headers($this->dataReq, $headers));
$this->dataRes->end('ok');
$this->dataRequestCleanup();
}
public function onData($data)
{
$packets = Parser::decodePayload($data);
if(isset($packets['type']))
{
if('close' === $packets['type'])
{
$this->onClose();
return false;
}
else
{
$packets = array($packets);
}
}
foreach($packets as $packet)
{
$this->onPacket($packet);
}
}
public function onClose()
{
if($this->writable)
{
// close pending poll request
$this->send(array(array('type'=> 'noop')));
}
parent::onClose();
}
public function send($packets)
{
$this->writable = false;
if($this->shouldClose)
{
echo('appending close packet to payload');
$packets[] = array('type'=>'close');
call_user_func($this->shouldClose);
$this->shouldClose = null;
}
$data = Parser::encodePayload($packets, $this->supportsBinary);
$this->write($data);
}
public function write($data)
{
$this->doWrite($data);
if(!empty($this->req->cleanup))
{
call_user_func($this->req->cleanup);
}
}
public function doClose($fn)
{
if(!empty($this->dataReq))
{
//echo('aborting ongoing data request');
$this->dataReq->destroy();
}
if($this->writable)
{
//echo('transport writable - closing right away');
$this->send(array(array('type'=> 'close')));
call_user_func($fn);
}
else
{
//echo("transport not writable - buffering orderly close\n");
$this->shouldClose = $fn;
}
}
}

View File

@@ -0,0 +1,63 @@
<?php
namespace PHPSocketIO\Engine\Transports;
use \PHPSocketIO\Debug;
class PollingJsonp extends Polling
{
public $head = null;
public $foot = ');';
public function __construct($req)
{
$j = isset($req->_query['j']) ? preg_replace('/[^0-9]/', '', $req->_query['j']) : '';
$this->head = "___eio[ $j ](";
Debug::debug('PollingJsonp __construct');
}
public function __destruct()
{
Debug::debug('PollingJsonp __destruct');
}
public function onData($data)
{
$parsed_data = null;
parse_str($data, $parsed_data);
$data = $parsed_data['d'];
// todo check
//client will send already escaped newlines as \\\\n and newlines as \\n
// \\n must be replaced with \n and \\\\n with \\n
/*data = data.replace(rSlashes, function(match, slashes) {
return slashes ? match : '\n';
});*/
call_user_func(array($this, 'parent::onData'), preg_replace('/\\\\n/', '\\n', $data));
}
public function doWrite($data)
{
$js = json_encode($data);
//$js = preg_replace(array('/\u2028/', '/\u2029/'), array('\\u2028', '\\u2029'), $js);
// prepare response
$data = $this->head . $js . $this->foot;
// explicit UTF-8 is required for pages not served under utf
$headers = array(
'Content-Type'=> 'text/javascript; charset=UTF-8',
'Content-Length'=> strlen($data),
'X-XSS-Protection'=>'0'
);
if(empty($this->res)){echo new \Exception('empty $this->res');return;}
$this->res->writeHead(200, '',$this->headers($this->req, $headers));
$this->res->end($data);
}
public function headers($req, $headers = array())
{
$listeners = $this->listeners('headers');
foreach($listeners as $listener)
{
$listener($headers);
}
return $headers;
}
}

View File

@@ -0,0 +1,70 @@
<?php
namespace PHPSocketIO\Engine\Transports;
use \PHPSocketIO\Debug;
class PollingXHR extends Polling
{
public function __construct()
{
Debug::debug('PollingXHR __construct');
}
public function __destruct()
{
Debug::debug('PollingXHR __destruct');
}
public function onRequest($req)
{
if('OPTIONS' === $req->method)
{
$res = $req->res;
$headers = $this->headers($req);
$headers['Access-Control-Allow-Headers'] = 'Content-Type';
$res->writeHead(200, '', $headers);
$res->end();
}
else
{
parent::onRequest($req);
}
}
public function doWrite($data)
{
// explicit UTF-8 is required for pages not served under utf todo
//$content_type = $isString
// ? 'text/plain; charset=UTF-8'
// : 'application/octet-stream';
$content_type = preg_match('/^\d+:/', $data) ? 'text/plain; charset=UTF-8' : 'application/octet-stream';
$content_length = strlen($data);
$headers = array(
'Content-Type'=> $content_type,
'Content-Length'=> $content_length,
'X-XSS-Protection' => '0',
);
if(empty($this->res)){echo new \Exception('empty this->res');return;}
$this->res->writeHead(200, '', $this->headers($this->req, $headers));
$this->res->end($data);
}
public function headers($req, $headers = array())
{
if(isset($req->headers['origin']))
{
$headers['Access-Control-Allow-Credentials'] = 'true';
$headers['Access-Control-Allow-Origin'] = $req->headers['origin'];
}
else
{
$headers['Access-Control-Allow-Origin'] = '*';
}
$listeners = $this->listeners('headers');
foreach($listeners as $listener)
{
$listener($headers);
}
return $headers;
}
}

View File

@@ -0,0 +1,56 @@
<?php
namespace PHPSocketIO\Engine\Transports;
use \PHPSocketIO\Engine\Transport;
use \PHPSocketIO\Engine\Parser;
use \PHPSocketIO\Debug;
class WebSocket extends Transport
{
public $writable = true;
public $supportsFraming = true;
public $supportsBinary = true;
public $name = 'websocket';
public function __construct($req)
{
$this->socket = $req->connection;
$this->socket->onMessage = array($this, 'onData2');
$this->socket->onClose = array($this, 'onClose');
$this->socket->onError = array($this, 'onError2');
Debug::debug('WebSocket __construct');
}
public function __destruct()
{
Debug::debug('WebSocket __destruct');
}
public function onData2($connection, $data)
{
call_user_func(array($this, 'parent::onData'), $data);
}
public function onError2($conection, $code, $msg)
{
call_user_func(array($this, 'parent::onClose'), $code, $msg);
}
public function send($packets)
{
foreach($packets as $packet)
{
$data = Parser::encodePacket($packet, $this->supportsBinary);
$this->socket->send($data);
$this->emit('drain');
}
}
public function doClose($fn = null)
{
if($this->socket)
{
$this->socket->close();
$this->socket = null;
if(!empty($fn))
{
call_user_func($fn);
}
}
}
}

View File

@@ -0,0 +1,105 @@
<?php
namespace PHPSocketIO\Event;
use \PHPSocketIO\Debug;
class Emitter
{
public function __construct()
{
Debug::debug('Emitter __construct');
}
public function __destruct()
{
Debug::debug('Emitter __destruct');
}
/**
* [event=>[[listener1, once?], [listener2,once?], ..], ..]
*/
protected $_eventListenerMap = array();
public function on($event_name, $listener)
{
$this->emit('newListener', $event_name, $listener);
$this->_eventListenerMap[$event_name][] = array($listener, 0);
return $this;
}
public function once($event_name, $listener)
{
$this->_eventListenerMap[$event_name][] = array($listener, 1);
return $this;
}
public function removeListener($event_name, $listener)
{
if(!isset($this->_eventListenerMap[$event_name]))
{
return $this;
}
foreach($this->_eventListenerMap[$event_name] as $key=>$item)
{
if($item[0] === $listener)
{
$this->emit('removeListener', $event_name, $listener);
unset($this->_eventListenerMap[$event_name][$key]);
}
}
if(empty($this->_eventListenerMap[$event_name]))
{
unset($this->_eventListenerMap[$event_name]);
}
return $this;
}
public function removeAllListeners($event_name = null)
{
$this->emit('removeListener', $event_name);
if(null === $event_name)
{
$this->_eventListenerMap = array();
return $this;
}
unset($this->_eventListenerMap[$event_name]);
return $this;
}
public function listeners($event_name)
{
if(empty($this->_eventListenerMap[$event_name]))
{
return array();
}
$listeners = array();
foreach($this->_eventListenerMap[$event_name] as $item)
{
$listeners[] = $item[0];
}
return $listeners;
}
public function emit($event_name = null)
{
if(empty($event_name) || empty($this->_eventListenerMap[$event_name]))
{
return false;
}
foreach($this->_eventListenerMap[$event_name] as $key=>$item)
{
$args = func_get_args();
unset($args[0]);
call_user_func_array($item[0], $args);
// once ?
if($item[1])
{
unset($this->_eventListenerMap[$event_name][$key]);
if(empty($this->_eventListenerMap[$event_name]))
{
unset($this->_eventListenerMap[$event_name]);
}
}
}
return true;
}
}

View File

@@ -0,0 +1,162 @@
<?php
namespace PHPSocketIO;
use PHPSocketIO\Event\Emitter;
use PHPSocketIO\Parser\Parser;
class Nsp extends Emitter
{
public $name = null;
public $server = null;
public $rooms = array();
public $flags = array();
public $sockets = array();
public $connected = array();
public $fns = array();
public $ids = 0;
public $acks = array();
public static $events = array(
'connect' => 'connect', // for symmetry with client
'connection' => 'connection',
'newListener' => 'newListener'
);
//public static $flags = array('json','volatile');
public function __construct($server, $name)
{
$this->name = $name;
$this->server = $server;
$this->initAdapter();
Debug::debug('Nsp __construct');
}
public function __destruct()
{
Debug::debug('Nsp __destruct');
}
public function initAdapter()
{
$adapter_name = $this->server->adapter();
$this->adapter = new $adapter_name($this);
}
public function to($name)
{
if(!isset($this->rooms[$name]))
{
$this->rooms[$name] = $name;
}
return $this;
}
public function in($name)
{
return $this->to($name);
}
public function add($client, $nsp, $fn)
{
$socket = new Socket($this, $client);
if('open' === $client->conn->readyState)
{
$this->sockets[$socket->id]=$socket;
$socket->onconnect();
if(!empty($fn)) call_user_func($fn, $socket, $nsp);
$this->emit('connect', $socket);
$this->emit('connection', $socket);
}
else
{
echo('next called after client was closed - ignoring socket');
}
}
/**
* Removes a client. Called by each `Socket`.
*
* @api private
*/
public function remove($socket)
{
// todo $socket->id
unset($this->sockets[$socket->id]);
}
/**
* Emits to all clients.
*
* @return {Namespace} self
* @api public
*/
public function emit($ev = null)
{
$args = func_get_args();
if (isset(self::$events[$ev]))
{
call_user_func_array(array($this, 'parent::emit'), $args);
}
else
{
// set up packet object
$parserType = Parser::EVENT; // default
//if (self::hasBin($args)) { $parserType = Parser::BINARY_EVENT; } // binary
$packet = array('type'=> $parserType, 'data'=> $args );
if (is_callable(end($args)))
{
echo('Callbacks are not supported when broadcasting');
return;
}
$this->adapter->broadcast($packet, array(
'rooms'=> $this->rooms,
'flags'=> $this->flags
));
$this->rooms = array();
$this->flags = array();;
}
return $this;
}
public function send()
{
$args = func_get_args();
array_unshift($args, 'message');
$this->emit($args);
return $this;
}
public function write()
{
$args = func_get_args();
return call_user_func_array(array($this, 'send'), $args);
}
public function clients($fn)
{
$this->adapter->clients($this->rooms, $fn);
return $this;
}
/**
* Sets the compress flag.
*
* @param {Boolean} if `true`, compresses the sending data
* @return {Socket} self
* @api public
*/
public function compress($compress)
{
$this->flags['compress'] = $compress;
return $this;
}
}

View File

@@ -0,0 +1,140 @@
<?php
namespace PHPSocketIO\Parser;
use \PHPSocketIO\Parser\Parser;
use \PHPSocketIO\Event\Emitter;
use \PHPSocketIO\Debug;
class Decoder extends Emitter
{
public function __construct()
{
Debug::debug('Decoder __construct');
}
public function __destruct()
{
Debug::debug('Decoder __destruct');
}
public function add($obj)
{
if (is_string($obj))
{
$packet = self::decodeString($obj);
if(Parser::BINARY_EVENT == $packet['type'] || Parser::BINARY_ACK == $packet['type'])
{
// binary packet's json todo BinaryReconstructor
$this->reconstructor = new BinaryReconstructor(packet);
// no attachments, labeled binary but no binary data to follow
if ($this->reconstructor->reconPack->attachments === 0)
{
$this->emit('decoded', $packet);
}
} else { // non-binary full packet
$this->emit('decoded', $packet);
}
}
else if (isBuf(obj) || !empty($obj['base64']))
{ // raw binary data
if (!$this->reconstructor)
{
throw new \Exception('got binary data when not reconstructing a packet');
} else {
$packet = $this->reconstructor->takeBinaryData($obj);
if ($packet)
{ // received final buffer
$this->reconstructor = null;
$this->emit('decoded', $packet);
}
}
}
else {
throw new \Exception('Unknown type: ' + obj);
}
}
public function decodeString($str)
{
$p = array();
$i = 0;
// look up type
$p['type'] = $str[0];
if(!isset(Parser::$types[$p['type']])) return self::error();
// look up attachments if type binary
if(Parser::BINARY_EVENT == $p['type'] || Parser::BINARY_ACK == $p['type'])
{
$buf = '';
while ($str[++$i] != '-')
{
$buf .= $str[$i];
if($i == strlen(str)) break;
}
if ($buf != intval($buf) || $str[$i] != '-')
{
throw new \Exception('Illegal attachments');
}
$p['attachments'] = intval($buf);
}
// look up namespace (if any)
if(isset($str[$i + 1]) && '/' === $str[$i + 1])
{
$p['nsp'] = '';
while (++$i)
{
if ($i === strlen($str)) break;
$c = $str[$i];
if (',' === $c) break;
$p['nsp'] .= $c;
}
} else {
$p['nsp'] = '/';
}
// look up id
if(isset($str[$i+1]))
{
$next = $str[$i+1];
if ('' !== $next && strval((int)$next) === strval($next))
{
$p['id'] = '';
while (++$i)
{
$c = $str[$i];
if (null == $c || strval((int)$c) != strval($c))
{
--$i;
break;
}
$p['id'] .= $str[$i];
if($i == strlen($str)) break;
}
$p['id'] = (int)$p['id'];
}
}
// look up json data
if (isset($str[++$i]))
{
// todo try
$p['data'] = json_decode(substr($str, $i), true);
}
return $p;
}
public static function error()
{
return array(
'type'=> Parser::ERROR,
'data'=> 'parser error'
);
}
public function destroy()
{
}
}

View File

@@ -0,0 +1,76 @@
<?php
namespace PHPSocketIO\Parser;
use \PHPSocketIO\Parser\Parser;
use \PHPSocketIO\Event\Emitter;
use \PHPSocketIO\Debug;
class Encoder extends Emitter
{
public function __construct()
{
Debug::debug('Encoder __construct');
}
public function __destruct()
{
Debug::debug('Encoder __destruct');
}
public function encode($obj)
{
if(Parser::BINARY_EVENT == $obj['type'] || Parser::BINARY_ACK == $obj['type'])
{
echo new \Exception("not support BINARY_EVENT BINARY_ACK");
return array();
}
else
{
$encoding = self::encodeAsString($obj);
return array($encoding);
}
}
public static function encodeAsString($obj) {
$str = '';
$nsp = false;
// first is type
$str .= $obj['type'];
// attachments if we have them
if (Parser::BINARY_EVENT == $obj['type'] || Parser::BINARY_ACK == $obj['type'])
{
$str .= $obj['attachments'];
$str .= '-';
}
// if we have a namespace other than `/`
// we append it followed by a comma `,`
if (!empty($obj['nsp']) && '/' !== $obj['nsp'])
{
$nsp = true;
$str .= $obj['nsp'];
}
// immediately followed by the id
if (isset($obj['id']))
{
if($nsp)
{
$str .= ',';
$nsp = false;
}
$str .= $obj['id'];
}
// json data
if(isset($obj['data']))
{
if ($nsp) $str .= ',';
$str .= json_encode($obj['data']);
}
return $str;
}
}

View File

@@ -0,0 +1,63 @@
<?php
namespace PHPSocketIO\Parser;
class Parser
{
/**
* Packet type `connect`.
*
* @api public
*/
const CONNECT = 0;
/**
* Packet type `disconnect`.
*
* @api public
*/
const DISCONNECT = 1;
/**
* Packet type `event`.
*
* @api public
*/
const EVENT = 2;
/**
* Packet type `ack`.
*
* @api public
*/
const ACK = 3;
/**
* Packet type `error`.
*
* @api public
*/
const ERROR = 4;
/**
* Packet type 'binary event'
*
* @api public
*/
const BINARY_EVENT = 5;
/**
* Packet type `binary ack`. For acks with binary arguments.
*
* @api public
*/
const BINARY_ACK = 6;
public static $types = array(
'CONNECT',
'DISCONNECT',
'EVENT',
'BINARY_EVENT',
'ACK',
'BINARY_ACK',
'ERROR'
);
}

View File

@@ -0,0 +1,460 @@
<?php
namespace PHPSocketIO;
use PHPSocketIO\Event\Emitter;
use PHPSocketIO\Parser\Parser;
class Socket extends Emitter
{
public $nsp = null;
public $server = null;
public $adapter = null;
public $id = null;
public $path = '/';
public $request = null;
public $client = null;
public $conn = null;
public $rooms = array();
public $_rooms = array();
public $flags = array();
public $acks = array();
public $connected = true;
public $disconnected = false;
public static $events = array(
'error'=>'error',
'connect' => 'connect',
'disconnect' => 'disconnect',
'newListener' => 'newListener',
'removeListener' => 'removeListener'
);
public static $flagsMap = array(
'json' => 'json',
'volatile' => 'volatile',
'broadcast' => 'broadcast'
);
public function __construct($nsp, $client)
{
$this->nsp = $nsp;
$this->server = $nsp->server;
$this->adapter = $this->nsp->adapter;
$this->id = $client->id;
$this->request = $client->request;
$this->client = $client;
$this->conn = $client->conn;
$this->handshake = $this->buildHandshake();
Debug::debug('IO Socket __construct');
}
public function __destruct()
{
Debug::debug('IO Socket __destruct');
}
public function buildHandshake()
{
//todo check this->request->_query
$info = !empty($this->request->url) ? parse_url($this->request->url) : array();
$query = array();
if(isset($info['query']))
{
parse_str($info['query'], $query);
}
return array(
'headers' => isset($this->request->headers) ? $this->request->headers : array(),
'time'=> date('D M d Y H:i:s') . ' GMT',
'address'=> $this->conn->remoteAddress,
'xdomain'=> isset($this->request->headers['origin']),
'secure' => !empty($this->request->connection->encrypted),
'issued' => time(),
'url' => isset($this->request->url) ? $this->request->url : '',
'query' => $query,
);
}
public function __get($name)
{
if($name === 'broadcast')
{
$this->flags['broadcast'] = true;
return $this;
}
return null;
}
public function emit($ev = null)
{
$args = func_get_args();
if (isset(self::$events[$ev]))
{
call_user_func_array(array($this, 'parent::emit'), $args);
}
else
{
$packet = array();
// todo check
//$packet['type'] = hasBin($args) ? Parser::BINARY_EVENT : Parser::EVENT;
$packet['type'] = Parser::EVENT;
$packet['data'] = $args;
$flags = $this->flags;
// access last argument to see if it's an ACK callback
if (is_callable(end($args)))
{
if ($this->_rooms || isset($flags['broadcast']))
{
throw new Exception('Callbacks are not supported when broadcasting');
}
echo('emitting packet with ack id ' . $this->nsp->ids);
$this->acks[$this->nsp->ids] = array_pop($args);
$packet['id'] = $this->nsp->ids++;
}
if ($this->_rooms || !empty($flags['broadcast']))
{
$this->adapter->broadcast($packet, array(
'except' => array($this->id => $this->id),
'rooms'=> $this->_rooms,
'flags' => $flags
));
}
else
{
// dispatch packet
$this->packet($packet);
}
// reset flags
$this->_rooms = array();
$this->flags = array();
}
return $this;
}
/**
* Targets a room when broadcasting.
*
* @param {String} name
* @return {Socket} self
* @api public
*/
public function to($name)
{
if(!isset($this->_rooms[$name]))
{
$this->_rooms[$name] = $name;
}
return $this;
}
public function in($name)
{
return $this->to($name);
}
/**
* Sends a `message` event.
*
* @return {Socket} self
* @api public
*/
public function send()
{
$args = func_get_args();
array_unshift($args, 'message');
call_user_func_array(array($this, 'emit'), $args);
return $this;
}
public function write()
{
$args = func_get_args();
array_unshift($args, 'message');
call_user_func_array(array($this, 'emit'), $args);
return $this;
}
/**
* Writes a packet.
*
* @param {Object} packet object
* @param {Object} options
* @api private
*/
public function packet($packet, $preEncoded = false)
{
if (!$this->nsp || !$this->client) return;
$packet['nsp'] = $this->nsp->name;
//$volatile = !empty(self::$flagsMap['volatile']);
$volatile = false;
$this->client->packet($packet, $preEncoded, $volatile);
}
/**
* Joins a room.
*
* @param {String} room
* @param {Function} optional, callback
* @return {Socket} self
* @api private
*/
public function join($room)
{
if(isset($this->rooms[$room])) return $this;
$this->adapter->add($this->id, $room);
$this->rooms[$room] = $room;
return $this;
}
/**
* Leaves a room.
*
* @param {String} room
* @param {Function} optional, callback
* @return {Socket} self
* @api private
*/
public function leave($room)
{
$this->adapter->del($this->id, $room);
unset($this->rooms[$room]);
return $this;
}
/**
* Leave all rooms.
*
* @api private
*/
public function leaveAll()
{
$this->adapter->delAll($this->id);
$this->rooms = array();
}
/**
* Called by `Namespace` upon succesful
* middleware execution (ie: authorization).
*
* @api private
*/
public function onconnect()
{
$this->nsp->connected[$this->id] = $this;
$this->join($this->id);
$this->packet(array(
'type' => Parser::CONNECT)
);
}
/**
* Called with each packet. Called by `Client`.
*
* @param {Object} packet
* @api private
*/
public function onpacket($packet)
{
switch ($packet['type'])
{
case Parser::EVENT:
$this->onevent($packet);
break;
case Parser::BINARY_EVENT:
$this->onevent($packet);
break;
case Parser::ACK:
$this->onack($packet);
break;
case Parser::BINARY_ACK:
$this->onack($packet);
break;
case Parser::DISCONNECT:
$this->ondisconnect();
break;
case Parser::ERROR:
$this->emit('error', $packet['data']);
}
}
/**
* Called upon event packet.
*
* @param {Object} packet object
* @api private
*/
public function onevent($packet)
{
$args = isset($packet['data']) ? $packet['data'] : array();
if (!empty($packet['id']))
{
$args[] = $this->ack($packet['id']);
}
call_user_func_array(array($this, 'parent::emit'), $args);
}
/**
* Produces an ack callback to emit with an event.
*
* @param {Number} packet id
* @api private
*/
public function ack($id)
{
$self = $this;
$sent = false;
return function()use(&$sent, $id, $self){
// prevent double callbacks
if ($sent) return;
$args = func_get_args();
$type = hasBin($args) ? Parser::BINARY_ACK : Parser::ACK;
$self->packet(array(
'id' => $id,
'type' => $type,
'data' => $args
));
};
}
/**
* Called upon ack packet.
*
* @api private
*/
public function onack($packet)
{
$ack = $this->acks[$packet['id']];
if (is_callable($ack))
{
call_user_func($ack, $packet['data']);
unset($this->acks[$packet['id']]);
} else {
echo ('bad ack '. packet.id);
}
}
/**
* Called upon client disconnect packet.
*
* @api private
*/
public function ondisconnect()
{
echo('got disconnect packet');
$this->onclose('client namespace disconnect');
}
/**
* Handles a client error.
*
* @api private
*/
public function onerror($err)
{
if ($this->listeners('error'))
{
$this->emit('error', $err);
}
else
{
//echo('Missing error handler on `socket`.');
}
}
/**
* Called upon closing. Called by `Client`.
*
* @param {String} reason
* @param {Error} optional error object
* @api private
*/
public function onclose($reason)
{
if (!$this->connected) return $this;
$this->emit('disconnect', $reason);
$this->leaveAll();
$this->nsp->remove($this);
$this->client->remove($this);
$this->connected = false;
$this->disconnected = true;
unset($this->nsp->connected[$this->id]);
// ....
$this->nsp = null;
$this->server = null;
$this->adapter = null;
$this->request = null;
$this->client = null;
$this->conn = null;
$this->removeAllListeners();
}
/**
* Produces an `error` packet.
*
* @param {Object} error object
* @api private
*/
public function error($err)
{
$this->packet(array(
'type' => Parser::ERROR, 'data' => $err )
);
}
/**
* Disconnects this client.
*
* @param {Boolean} if `true`, closes the underlying connection
* @return {Socket} self
* @api public
*/
public function disconnect( $close = false )
{
if (!$this->connected) return $this;
if ($close)
{
$this->client->disconnect();
} else {
$this->packet(array(
'type'=> Parser::DISCONNECT
));
$this->onclose('server namespace disconnect');
}
return $this;
}
/**
* Sets the compress flag.
*
* @param {Boolean} if `true`, compresses the sending data
* @return {Socket} self
* @api public
*/
public function compress($compress)
{
$this->flags['compress'] = $compress;
return $this;
}
}

View File

@@ -0,0 +1,138 @@
<?php
namespace PHPSocketIO;
use Workerman\Worker;
use PHPSocketIO\Engine\Engine;
class SocketIO
{
public $nsps = array();
protected $_adapter = null;
public $eio = null;
public $engine = null;
protected $_origins = '*:*';
protected $_path = null;
public function __construct($port = null, $opts = array())
{
$adapter = isset($opts['adapter']) ? $opts['adapter'] : '\PHPSocketIO\DefaultAdapter';
$this->adapter($adapter);
if(isset($opts['origins']))
{
$this->origins($opts['origins']);
}
$this->sockets = $this->of('/');
if(!class_exists('Protocols\SocketIO'))
{
class_alias('PHPSocketIO\Engine\Protocols\SocketIO', 'Protocols\SocketIO');
}
if($port)
{
$worker = new Worker('SocketIO://0.0.0.0:'.$port);
$worker->name = 'PHPSocketIO';
$this->attach($worker);
}
}
public function adapter($v = null)
{
if (empty($v)) return $this->_adapter;
$this->_adapter = $v;
foreach($this->nsps as $nsp)
{
$nsp->initAdapter();
}
return $this;
}
public function origins($v = null)
{
if ($v === null) return $this->_origins;
$this->_origins = $v;
if(isset($this->engine)) {
$this->engine->origins = $this->_origins;
}
return $this;
}
public function attach($srv, $opts = array())
{
$engine = new Engine();
$this->eio = $engine->attach($srv, $opts);
// Export http server
$this->worker = $srv;
// bind to engine events
$this->bind($engine);
return $this;
}
public function bind($engine)
{
$this->engine = $engine;
$this->engine->on('connection', array($this, 'onConnection'));
$this->engine->origins = $this->_origins;
return $this;
}
public function of($name, $fn = null)
{
if($name[0] !== '/')
{
$name = "/$name";
}
if(empty($this->nsps[$name]))
{
$this->nsps[$name] = new Nsp($this, $name);
}
if ($fn)
{
$this->nsps[$name]->on('connect', $fn);
}
return $this->nsps[$name];
}
public function onConnection($engine_socket)
{
$client = new Client($this, $engine_socket);
$client->connect('/');
return $this;
}
public function on()
{
if(func_get_arg(0) === 'workerStart')
{
$this->worker->onWorkerStart = func_get_arg(1);
return;
}
return call_user_func_array(array($this->sockets, 'on'), func_get_args());
}
public function in()
{
return call_user_func_array(array($this->sockets, 'in'), func_get_args());
}
public function to()
{
return call_user_func_array(array($this->sockets, 'to'), func_get_args());
}
public function emit()
{
return call_user_func_array(array($this->sockets, 'emit'), func_get_args());
}
public function send()
{
return call_user_func_array(array($this->sockets, 'send'), func_get_args());
}
public function write()
{
return call_user_func_array(array($this->sockets, 'write'), func_get_args());
}
}

View File

@@ -0,0 +1,15 @@
<?php
spl_autoload_register(function($name){
$path = str_replace('\\', DIRECTORY_SEPARATOR ,$name);
$path = str_replace('PHPSocketIO', '', $path);
if(is_file($class_file = __DIR__ . "/$path.php"))
{
require_once($class_file);
if(class_exists($name, false))
{
return true;
}
}
return false;
});