WebSocket сервер на PHP

10.10.2018

Теги: JavaScriptPHPWeb-разработкаWebSocketСокет

Протокол WebSocket предназначен для решения разных задач и снятия ограничений обмена данными между браузером и сервером. Он позволяет пересылать любые данные, на любой домен, безопасно и почти без лишнего сетевого трафика. Для установления соединения WebSocket клиент и сервер используют протокол, похожий на HTTP. Клиент формирует особый HTTP-запрос, на который сервер отвечает определенным образом.

Простой сокет-сервер

В первую очередь надо в файле php.ini расскомментировать строку, позволяющую работать с сокетами и перезапустить сервер:

extension = php_sockets.dll

Вот как выглядит простейший сокет-сервер:

<?php
function SocketServer($limit = 0) {
    $starttime = time();
    echo 'SERVER START' . PHP_EOL;

    echo 'Socket create...' . PHP_EOL;
    $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

    if (false === $socket) {
        die('Error: ' . socket_strerror(socket_last_error()) . PHP_EOL);
    }

    echo 'Socket bind...' . PHP_EOL;
    $bind = socket_bind($socket, '127.0.0.1', 7777); // привязываем к ip и порту
    if (false === $bind) {
        die('Error: ' . socket_strerror(socket_last_error()) . PHP_EOL);
    }

    echo 'Set options...' . PHP_EOL;
    // разрешаем использовать один порт для нескольких соединений
    $option = socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);
    if (false === $option) {
        die('Error: ' . socket_strerror(socket_last_error()) . PHP_EOL);
    }

    echo 'Listening socket...' . PHP_EOL;
    $listen = socket_listen($socket); // слушаем сокет
    if (false === $listen) {
        die('Error: ' . socket_strerror(socket_last_error()) . PHP_EOL);
    }

    while (true) { // бесконечный цикл ожидания подключений
        echo 'Waiting for connections...' . PHP_EOL;
        $connect = socket_accept($socket); // зависаем, пока не получим ответа
        if ($connect !== false) {
            echo 'Client connected...' . PHP_EOL;
            echo 'Send message to client...' . PHP_EOL;
            socket_write($connect, 'Hello, Client!');
        } else {
            echo 'Error: ' . socket_strerror(socket_last_error()) . PHP_EOL;
            usleep(1000);
        }

        // останавливаем сервер после $limit секунд
        if ($limit && (time() - $starttime > $limit)) {
            echo 'Closing connection...' . PHP_EOL;
            socket_close($socket);
            echo 'SERVER STOP' . PHP_EOL;
            return;
        }
    }
}

error_reporting(E_ALL); // выводим все ошибки и предупреждения
set_time_limit(0);      // бесконечное время работы скрипта
ob_implicit_flush();    // включаем вывод без буферизации

// Запускаем сервер в работу, завершение работы через 60 секунд
SocketServer(60);

Запустим его в работу:

> php.exe -f simple.php
SERVER START
Socket create...
Socket bind...
Set option...
Listening socket...
Waiting for connections...

Попробуем пообщаться с сервером с помощью telnet:

> telnet

Получив приглашение telnet, даем команду:

> open 127.0.0.1 7777

И видим сообщение от сервера:

Наш сервер в другом окне тоже встрепенулся:

WebSocket сервер

Протокол WebSocket работает над TCP. Это означает, что при соединении браузер отправляет по HTTP специальные заголовки, спрашивая: «Поддерживает ли сервер WebSocket?». Если сервер в ответных заголовках отвечает «Да, поддерживаю», то дальше HTTP прекращается и общение идёт на специальном протоколе WebSocket, который уже не имеет с HTTP ничего общего.

GET /chat HTTP/1.1
Host: websocket.server.com
Upgrade: websocket
Connection: Upgrade
Origin: http://www.example.com
Sec-WebSocket-Key: Iv8io/9s+lYFgZWcXczP8Q==
Sec-WebSocket-Version: 13

Здесь GET и Host — стандартные HTTP-заголовки, а Upgrade и Connection указывают, что браузер хочет перейти на WebSocket.

Сервер может проанализировать эти заголовки и решить, разрешает ли он WebSocket с данного домена Origin. Ответ сервера, если он понимает и разрешает WebSocket-подключение:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: hsBlbuDTkk24srzEOTBUlZAlC2g=

Для тестирования работы сервера нам нужен клиент:

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8" />
    <title>Простой WebSocket клиент</title>
    <link rel="stylesheet" href="style.css" type="text/css" />
    <script src="socket.js" type="text/javascript"></script>
</head>
<body>
    <div>
        <span>Сервер</span>
        <input id="server" type="text" value="" />
    </div>
    <div>
        <input id="connect" type="button" value="Установить соединение" />
        <input id="disconnect" type="button" value="Разорвать соединение" />
    </div>
    <div>
        <span>Сообщение</span>
        <input id="message" type="text" value="" />
        <input id="send-msg" type="button" value="Отправить сообщение" />
    </div>
    <div>
        <span>Информация</span>
        <div id="socket-info"></div>
    </div>
</body>
</html>
window.addEventListener('DOMContentLoaded', function () {

    var socket;

    // показать сообщение в #socket-info
    function showMessage(message) {
        var div = document.createElement('div');
        div.appendChild(document.createTextNode(message));
        document.getElementById('socket-info').appendChild(div);
    }

    /*
     * Установить соединение с сервером и назначить обработчики событий
     */
    document.getElementById('connect').onclick = function () {
        // новое соединение открываем, если старое соединение закрыто
        if (socket === undefined || socket.readyState !== 1) {
            socket = new WebSocket(document.getElementById('server').value);
        } else {
            showMessage('Надо закрыть уже имеющееся соединение');
        }

        /*
         * четыре функции обратного вызова: одна при получении данных и три – при изменениях в состоянии соединения
         */
        socket.onmessage = function (event) { // при получении данных от сервера
            showMessage('Получено сообщение от сервера: ' + event.data);
        }
        socket.onopen = function () { // при установке соединения с сервером
            showMessage('Соединение с сервером установлено');
        }
        socket.onerror = function(error) { // если произошла какая-то ошибка
            showMessage('Произошла ошибка: ' + error.message);
        };
        socket.onclose = function(event) { // при закрытии соединения с сервером
            showMessage('Соединение с сервером закрыто');
            if (event.wasClean) {
                showMessage('Соединение закрыто чисто');
            } else {
                showMessage('Обрыв соединения'); // например, «убит» процесс сервера
            }
            showMessage('Код: ' + event.code + ', причина: ' + event.reason);
        };
    };

    /*
     * Отправка сообщения серверу
     */
    document.getElementById('send-msg').onclick = function () {
        if (socket !== undefined && socket.readyState === 1) {
            var message = document.getElementById('message').value;
            socket.send(message);
            showMessage('Отправлено сообщение серверу: ' + message);
        } else {
            showMessage('Невозможно отправить сообщение, нет соединения');
        }
    };

    /*
     * Закрыть соединение с сервером
     */
    document.getElementById('disconnect').onclick = function () {
        if (socket !== undefined && socket.readyState === 1) {
            socket.close();
        } else {
            showMessage('Соединение с сервером уже было закрыто');
        }
    };

});
body > div {
    margin-bottom: 15px;
    overflow: hidden;
}
span {
    display: block;
    margin-bottom: 2px;
}
input {
    padding: 5px;
    box-sizing: border-box;
}
input[type="text"] {
    width: 100%;
}
input[type="button"] {
    width: 25%;
    float: left;
    margin-top: 5px;
    margin-right: 5px;
}
div#socket-info {
    padding: 5px;
    border: 1px solid #ddd;
}

Проверим его в работе. Открываем HTML-страницу в браузере и заполняем первое поле «Сервер»:

ws://echo.websocket.org

Это гарантированно работающий WebSocket echo-сервер, которые отправляет все сообщения обратно. Жмем кнопку «Установить соединение», набираем текст сообщения в поле «Сообщение», жмем кнопку «Отправить сообщение»:

А теперь код WebSocket сервера на PHP:

<?php
/**
 * Класс WebSocket сервера
 */
class WebSocketServer {

    /**
     * Функция вызывается, когда получено сообщение от клиента
     */
    public $handler;

    /**
     * IP адрес сервера
     */
    private $ip;
    /**
     * Порт сервера
     */
    private $port;
    /**
     * Сокет для принятия новых соединений, прослушивает указанный порт
     */
    private $connection;
    /**
     * Для хранения всех подключений, принятых слушающим сокетом
     */
    private $connects;

    /**
     * Ограничение по времени работы сервера
     */
    private $timeLimit = 0;
    /**
     * Время начала работы сервера
     */
    private $startTime;
    /**
     * Выводить сообщения в консоль?
     */
    private $verbose = false;
    /**
     * Записывать сообщения в log-файл?
     */
    private $logging = false;
    /**
     * Имя log-файла
     */
    private $logFile = 'ws-log.txt';
    /**
     * Ресурс log-файла
     */
    private $resource;


    public function __construct($ip = '127.0.0.1', $port = 7777) {
        $this->ip = $ip;
        $this->port = $port;

        // эта функция вызывается, когда получено сообщение от клиента;
        // при создании экземпляра класса должна быть переопределена
        $this->handler = function($connection, $data) {
            $message = '[' . date('r') . '] Получено сообщение от клиента: ' . $data . PHP_EOL;
            if ($this->verbose) {
                echo $message;
            }
            if ($this->logging) {
                fwrite($this->resource, $message);
            }
        };
    }

    public function __destruct() {
        if (is_resource($this->connection)) {
            $this->stopServer();
        }
        if ($this->logging) {
            fclose($this->resource);
        }
    }

    /**
     * Дополнительные настройки для отладки
     */
    public function settings($timeLimit = 0, $verbose = false, $logging = false, $logFile = 'ws-log.txt') {
        $this->timeLimit = $timeLimit;
        $this->verbose = $verbose;
        $this->logging = $logging;
        $this->logFile = $logFile;
        if ($this->logging) {
            $this->resource = fopen($this->logFile, 'a');
        }
    }

    /**
     * Выводит сообщение в консоль и/или записывает в лог-файл
     */
    private function debug($message) {
        $message = '[' . date('r') . '] ' . $message . PHP_EOL;
        if ($this->verbose) {
            echo $message;
        }
        if ($this->logging) {
            fwrite($this->resource, $message);
        }
    }

    /**
     * Отправляет сообщение клиенту
     */
    public static function response($connect, $data) {
        socket_write($connect, self::encode($data));
    }

    /**
     * Запускает сервер в работу
     */
    public function startServer() {

        $this->debug('Try start server...');

        $this->connection = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

        if (false === $this->connection) {
            $this->debug('Error socket_create(): ' . socket_strerror(socket_last_error()));
            return;
        }

        $bind = socket_bind($this->connection, $this->ip, $this->port); // привязываем к ip и порту
        if (false === $bind) {
            $this->debug('Error socket_bind(): ' . socket_strerror(socket_last_error()));
            return;
        }

        // разрешаем использовать один порт для нескольких соединений
        $option = socket_set_option($this->connection, SOL_SOCKET, SO_REUSEADDR, 1);
        if (false === $option) {
            $this->debug('Error socket_set_option(): ' . socket_strerror(socket_last_error()));
            return;
        }

        $listen = socket_listen($this->connection); // слушаем сокет
        if (false === $listen) {
            $this->debug('Error socket_listen(): ' . socket_strerror(socket_last_error()));
            return;
        }

        $this->debug('Server is running...');

        $this->connects = array($this->connection);
        $this->startTime = time();

        while (true) {

            $this->debug('Waiting for connections...');

            // создаем копию массива, так что массив $this->connects не будет изменен функцией socket_select()
            $read = $this->connects;
            $write = $except = null;

            /*
             * Сокет $this->connection только прослушивает порт на предмет новых соединений. Как только поступило
             * новое соединение, мы создаем новый ресурс сокета с помощью socket_accept() и помещаем его в массив
             * $this->connects для дальнейшего чтения из него.
             */

            if ( ! socket_select($read, $write, $except, null)) { // ожидаем сокеты, доступные для чтения (без таймаута)
                break;
            }

            // если слушающий сокет есть в массиве чтения, значит было новое соединение
            if (in_array($this->connection, $read)) {
                // принимаем новое соединение и производим рукопожатие
                if (($connect = socket_accept($this->connection)) && $this->handshake($connect)) {
                    $this->debug('New connection accepted');
                    $this->connects[] = $connect; // добавляем его в список необходимых для обработки
                }
                // удаляем слушающий сокет из массива для чтения
                unset($read[ array_search($this->connection, $read) ]);
            }

            foreach ($read as $connect) { // обрабатываем все соединения, в которых есть данные для чтения
                $data = socket_read($connect, 100000);
                $decoded = self::decode($data);
                // если клиент не прислал данных или хочет разорвать соединение
                if (false === $decoded || 'close' === $decoded['type']) {
                    $this->debug('Connection closing');
                    socket_write($connect, self::encode('  Closed on client demand', 'close'));
                    socket_shutdown($connect);
                    socket_close($connect);
                    unset($this->connects[ array_search($connect, $this->connects) ]);
                    $this->debug('Closed successfully');
                    continue;
                }
                // получено сообщение от клиента, вызываем пользовательскую
                // функцию, чтобы обработать полученные данные
                if (is_callable($this->handler)) {
                    call_user_func($this->handler, $connect, $decoded['payload']);
                }
            }

            // если истекло ограничение по времени, останавливаем сервер
            if ($this->timeLimit && time() - $this->startTime > $this->timeLimit) {
                $this->debug('Time limit. Stopping server.');
                $this->stopServer();
                return;
            }

        }

    }

    /**
     * Останавливает работу сервера
     */
    public function stopServer() {
        // закрываем слушающий сокет
        socket_close($this->connection);
        if (!empty($this->connects)) { // отправляем все клиентам сообщение о разрыве соединения
            foreach ($this->connects as $connect) {
                if (is_resource($connect)) {
                    socket_write($connect, self::encode('  Closed on server demand', 'close'));
                    socket_shutdown($connect);
                    socket_close($connect);
                }
            }
        }
    }

    /**
     * Для кодирования сообщений перед отправкой клиенту
     */
    private static function encode($payload, $type = 'text', $masked = false) {
        $frameHead = array();
        $payloadLength = strlen($payload);

        switch ($type) {
            case 'text':
                // first byte indicates FIN, Text-Frame (10000001):
                $frameHead[0] = 129;
                break;
            case 'close':
                // first byte indicates FIN, Close Frame(10001000):
                $frameHead[0] = 136;
                break;
            case 'ping':
                // first byte indicates FIN, Ping frame (10001001):
                $frameHead[0] = 137;
                break;
            case 'pong':
                // first byte indicates FIN, Pong frame (10001010):
                $frameHead[0] = 138;
                break;
        }

        // set mask and payload length (using 1, 3 or 9 bytes)
        if ($payloadLength > 65535) {
            $payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
            $frameHead[1] = ($masked === true) ? 255 : 127;
            for ($i = 0; $i < 8; $i++) {
                $frameHead[$i + 2] = bindec($payloadLengthBin[$i]);
            }
            // most significant bit MUST be 0
            if ($frameHead[2] > 127) {
                return array('type' => '', 'payload' => '', 'error' => 'frame too large (1004)');
            }
        } elseif ($payloadLength > 125) {
            $payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
            $frameHead[1] = ($masked === true) ? 254 : 126;
            $frameHead[2] = bindec($payloadLengthBin[0]);
            $frameHead[3] = bindec($payloadLengthBin[1]);
        } else {
            $frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength;
        }

        // convert frame-head to string:
        foreach (array_keys($frameHead) as $i) {
            $frameHead[$i] = chr($frameHead[$i]);
        }
        if ($masked === true) {
            // generate a random mask:
            $mask = array();
            for ($i = 0; $i < 4; $i++) {
                $mask[$i] = chr(rand(0, 255));
            }
            $frameHead = array_merge($frameHead, $mask);
        }
        $frame = implode('', $frameHead);

        // append payload to frame:
        for ($i = 0; $i < $payloadLength; $i++) {
            $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
        }

        return $frame;
    }

    /**
     * Для декодирования сообщений, полученных от клиента
     */
    private static function decode($data) {
        if ( ! strlen($data)) {
            return false;
        }

        $unmaskedPayload = '';
        $decodedData = array();

        // estimate frame type:
        $firstByteBinary = sprintf('%08b', ord($data[0]));
        $secondByteBinary = sprintf('%08b', ord($data[1]));
        $opcode = bindec(substr($firstByteBinary, 4, 4));
        $isMasked = ($secondByteBinary[0] == '1') ? true : false;
        $payloadLength = ord($data[1]) & 127;

        // unmasked frame is received:
        if (!$isMasked) {
            return array('type' => '', 'payload' => '', 'error' => 'protocol error (1002)');
        }

        switch ($opcode) {
            // text frame:
            case 1:
                $decodedData['type'] = 'text';
                break;
            case 2:
                $decodedData['type'] = 'binary';
                break;
            // connection close frame:
            case 8:
                $decodedData['type'] = 'close';
                break;
            // ping frame:
            case 9:
                $decodedData['type'] = 'ping';
                break;
            // pong frame:
            case 10:
                $decodedData['type'] = 'pong';
                break;
            default:
                return array('type' => '', 'payload' => '', 'error' => 'unknown opcode (1003)');
        }

        if ($payloadLength === 126) {
            $mask = substr($data, 4, 4);
            $payloadOffset = 8;
            $dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3]))) + $payloadOffset;
        } elseif ($payloadLength === 127) {
            $mask = substr($data, 10, 4);
            $payloadOffset = 14;
            $tmp = '';
            for ($i = 0; $i < 8; $i++) {
                $tmp .= sprintf('%08b', ord($data[$i + 2]));
            }
            $dataLength = bindec($tmp) + $payloadOffset;
            unset($tmp);
        } else {
            $mask = substr($data, 2, 4);
            $payloadOffset = 6;
            $dataLength = $payloadLength + $payloadOffset;
        }

        /**
         * We have to check for large frames here. socket_recv cuts at 1024 bytes
         * so if websocket-frame is > 1024 bytes we have to wait until whole
         * data is transferd.
         */
        if (strlen($data) < $dataLength) {
            return false;
        }

        if ($isMasked) {
            for ($i = $payloadOffset; $i < $dataLength; $i++) {
                $j = $i - $payloadOffset;
                if (isset($data[$i])) {
                    $unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
                }
            }
            $decodedData['payload'] = $unmaskedPayload;
        } else {
            $payloadOffset = $payloadOffset - 4;
            $decodedData['payload'] = substr($data, $payloadOffset);
        }

        return $decodedData;
    }

    /**
     * «Рукопожатие», т.е. отправка заголовков согласно протоколу WebSocket
     */
    private function handshake($connect) {

        $info = array();

        $data = socket_read($connect, 1000);
        $lines = explode("\r\n", $data);
        foreach ($lines as $i => $line) {
            if ($i) {
                if (preg_match('/\A(\S+): (.*)\z/', $line, $matches)) {
                    $info[$matches[1]] = $matches[2];
                }
            } else {
                $header = explode(' ', $line);
                $info['method'] = $header[0];
                $info['uri'] = $header[1];
            }
            if (empty(trim($line))) break;
        }

        // получаем адрес клиента
        $ip = $port = null;
        if ( ! socket_getpeername($connect, $ip, $port)) {
            return false;
        }
        $info['ip'] = $ip;
        $info['port'] = $port;

        if (empty($info['Sec-WebSocket-Key'])) {
            return false;
        }

        // отправляем заголовок согласно протоколу вебсокета
        $SecWebSocketAccept = 
            base64_encode(pack('H*', sha1($info['Sec-WebSocket-Key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
        $upgrade = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
                   "Upgrade: websocket\r\n" .
                   "Connection: Upgrade\r\n" .
                   "Sec-WebSocket-Accept:".$SecWebSocketAccept."\r\n\r\n";
        socket_write($connect, $upgrade);

        return true;

    }

}

Для тестирования напишем небольшой PHP-скрипт, который запускает в работу сервер и все сообщения клиента отправляет обратно (echo-сервер):

<?php 
error_reporting(E_ALL);
set_time_limit(0);
ob_implicit_flush();

require 'WebSocketServer.class.php';

$server = new WebSocketServer('127.0.0.1', 7777);
// максимальное время работы 100 секунд, выводить сообщения в консоль
$server->settings(100, true);

// эта функция вызывается, когда получено сообщение от клиента
$server->handler = function($connect, $data) {
    // полученные от клиента данные отправляем обратно
    WebSocketServer::response($connect, $data);
};

$server->startServer();

Запускаем скрипт в работу:

> php.exe -f echo-server.php
[Fri, 12 Oct 2018 15:08:13 +0300] Try start server...
[Fri, 12 Oct 2018 15:08:13 +0300] Server is running...
[Fri, 12 Oct 2018 15:08:13 +0300] Waiting for connections...

Еще один пример использования сервера — клиент отправляет команды, а сервер их выполняет:

<?php 
error_reporting(E_ALL);
set_time_limit(0);
ob_implicit_flush();

require 'WebSocketServer.class.php';

$server = new WebSocketServer('127.0.0.1', 7777);
// максимальное время работы 100 секунд, выводить сообщения в консоль
$server->settings(100, true);

// эта функция вызывается, когда получено сообщение от клиента
$server->handler = function($connect, $data) {
    // анализируем поступившую команду и даем ответ
    if ( ! in_array($data, array('date', 'time', 'country', 'city'))) {
        WebSocketServer::response($connect, 'Неизвестная команда');
        return;
    }
    switch ($data) {
        case 'date'   : $response = date('d.m.Y'); break;
        case 'time'   : $response = date('H:i:s'); break;
        case 'country': $response = 'Россия';      break;
        case 'city'   : $response = 'Москва';      break;
    }
    WebSocketServer::response($connect, $response);
};

$server->startServer();

Альтернативная реализация WebSocket сервера с использованием функций для работы с потоками:

<?php
/**
 * Класс WebSocket сервера
 */
class WebSocketServer {

    /**
     * Функция вызывается, когда получено сообщение от клиента
     */
    public $handler;

    /**
     * IP адрес сервера
     */
    private $ip;
    /**
     * Порт сервера
     */
    private $port;
    /**
     * Для хранения слушающего сокета потока
     */
    private $connection;
    /**
     * Для хранения всех подключений
     */
    private $connects;

    /**
     * Ограничение по времени работы сервера
     */
    private $timeLimit = 0;
    /**
     * Время начала работы сервера
     */
    private $startTime;
    /**
     * Выводить сообщения в консоль?
     */
    private $verbose = false;
    /**
     * Записывать сообщения в log-файл?
     */
    private $logging = false;
    /**
     * Имя log-файла
     */
    private $logFile = 'ws-log.txt';
    /**
     * Ресурс log-файла
     */
    private $resource;


    public function __construct($ip = '127.0.0.1', $port = 7777) {
        $this->ip = $ip;
        $this->port = $port;

        // эта функция вызывается, когда получено сообщение от клиента;
        // при создании экземпляра класса должна быть переопределена
        $this->handler = function($connection, $data) {
            $message = '[' . date('r') . '] Получено сообщение от клиента: ' . $data . PHP_EOL;
            if ($this->verbose) {
                echo $message;
            }
            if ($this->logging) {
                fwrite($this->resource, $message);
            }
        };
    }

    public function __destruct() {
        if (is_resource($this->connection)) {
            $this->stopServer();
        }
        if ($this->logging) {
            fclose($this->resource);
        }
    }

    /**
     * Дополнительные настройки для отладки
     */
    public function settings($timeLimit = 0, $verbose = false, $logging = false, $logFile = 'ws-log.txt') {
        $this->timeLimit = $timeLimit;
        $this->verbose = $verbose;
        $this->logging = $logging;
        $this->logFile = $logFile;
        if ($this->logging) {
            $this->resource = fopen($this->logFile, 'a');
        }
    }

    /**
     * Выводит сообщение в консоль или записывает в лог-файл
     */
    private function debug($message) {
        $message = '[' . date('r') . '] ' . $message . PHP_EOL;
        if ($this->verbose) {
            echo $message;
        }
        if ($this->logging) {
            fwrite($this->resource, $message);
        }
    }

    /**
     * Отправляет сообщение клиенту
     */
    public static function response($connect, $data) {
        fwrite($connect, self::encode($data));
    }

    /**
     * Запускает сервер в работу
     */
    public function startServer() {
        
        $this->debug('Try start server...');

        $this->connection = stream_socket_server('tcp://' . $this->ip . ':' . $this->port, $errno, $errstr);
        
        if ( ! $this->connection) {
            $this->debug('Cannot start server: ' .$errstr. '(' .$errno. ')');
            return false;
        }

        $this->debug('Server is running...');

        $this->connects = array();
        $this->startTime = time();

        while (true) {

            $this->debug('Waiting for connections...');

            // формируем массив прослушиваемых сокетов
            $read = $this->connects;
            $read[] = $this->connection;
            $write = $except = null;

            if ( ! stream_select($read, $write, $except, null)) { // ожидаем сокеты доступные для чтения (без таймаута)
                break;
            }

            if (in_array($this->connection, $read)) { // есть новое соединение
                // принимаем новое соединение и производим рукопожатие
                if (($connect = stream_socket_accept($this->connection, -1)) && $this->handshake($connect)) {
                    $this->debug('New connection accepted');
                    $this->connects[] = $connect; // добавляем его в список необходимых для обработки
                }
                unset($read[ array_search($this->connection, $read) ]);
            }

            foreach ($read as $connect) { // обрабатываем все соединения
                $data = fread($connect, 100000);
                $decoded = self::decode($data);
                // если клиент не прислал данных или хочет разорвать соединение
                if (false === $decoded || 'close' === $decoded['type']) {
                    $this->debug('Connection closing');
                    fwrite($connect, self::encode('  Closed on client demand', 'close'));
                    fclose($connect);
                    unset($this->connects[ array_search($connect, $this->connects) ]);
                    $this->debug('Closed successfully');
                    continue;
                }
                // получено сообщение от клиента, вызываем пользовательскую
                // функцию, чтобы обработать полученные данные
                if (is_callable($this->handler)) {
                    call_user_func($this->handler, $connect, $decoded['payload']);
                }
            }

            // если истекло ограничение по времени, останавливаем сервер
            if ($this->timeLimit && time() - $this->startTime > $this->timeLimit) {
                $this->debug('Time limit. Stopping server.');
                $this->stopServer();
                return;
            }
        }
    }

    /**
     * Останавливает работу сервера
     */
    public function stopServer() {
        fclose($this->connection); // закрываем слушающий сокет
        if (!empty($this->connects)) { // отправляем все клиентам сообщение о разрыве соединения
            foreach ($this->connects as $connect) {
                if (is_resource($connect)) {
                    fwrite($connect, self::encode('  Closed on server demand', 'close'));
                    fclose($connect);
                }
            }
        }
    }

    /**
     * Для кодирования сообщений перед отправкой клиенту
     */
    private static function encode($payload, $type = 'text', $masked = false) {
        $frameHead = array();
        $payloadLength = strlen($payload);

        switch ($type) {
            case 'text':
                // first byte indicates FIN, Text-Frame (10000001):
                $frameHead[0] = 129;
                break;
            case 'close':
                // first byte indicates FIN, Close Frame(10001000):
                $frameHead[0] = 136;
                break;
            case 'ping':
                // first byte indicates FIN, Ping frame (10001001):
                $frameHead[0] = 137;
                break;
            case 'pong':
                // first byte indicates FIN, Pong frame (10001010):
                $frameHead[0] = 138;
                break;
        }

        // set mask and payload length (using 1, 3 or 9 bytes)
        if ($payloadLength > 65535) {
            $payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
            $frameHead[1] = ($masked === true) ? 255 : 127;
            for ($i = 0; $i < 8; $i++) {
                $frameHead[$i + 2] = bindec($payloadLengthBin[$i]);
            }
            // most significant bit MUST be 0
            if ($frameHead[2] > 127) {
                return array('type' => '', 'payload' => '', 'error' => 'frame too large (1004)');
            }
        } elseif ($payloadLength > 125) {
            $payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
            $frameHead[1] = ($masked === true) ? 254 : 126;
            $frameHead[2] = bindec($payloadLengthBin[0]);
            $frameHead[3] = bindec($payloadLengthBin[1]);
        } else {
            $frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength;
        }

        // convert frame-head to string:
        foreach (array_keys($frameHead) as $i) {
            $frameHead[$i] = chr($frameHead[$i]);
        }
        if ($masked === true) {
            // generate a random mask:
            $mask = array();
            for ($i = 0; $i < 4; $i++) {
                $mask[$i] = chr(rand(0, 255));
            }
            $frameHead = array_merge($frameHead, $mask);
        }
        $frame = implode('', $frameHead);

        // append payload to frame:
        for ($i = 0; $i < $payloadLength; $i++) {
            $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
        }

        return $frame;
    }

    /**
     * Для декодирования сообщений, полученных от клиента
     */
    private static function decode($data) {
        if ( ! strlen($data)) {
            return false;
        }

        $unmaskedPayload = '';
        $decodedData = array();

        // estimate frame type:
        $firstByteBinary = sprintf('%08b', ord($data[0]));
        $secondByteBinary = sprintf('%08b', ord($data[1]));
        $opcode = bindec(substr($firstByteBinary, 4, 4));
        $isMasked = ($secondByteBinary[0] == '1') ? true : false;
        $payloadLength = ord($data[1]) & 127;

        // unmasked frame is received:
        if (!$isMasked) {
            return array('type' => '', 'payload' => '', 'error' => 'protocol error (1002)');
        }

        switch ($opcode) {
            // text frame:
            case 1:
                $decodedData['type'] = 'text';
                break;
            case 2:
                $decodedData['type'] = 'binary';
                break;
            // connection close frame:
            case 8:
                $decodedData['type'] = 'close';
                break;
            // ping frame:
            case 9:
                $decodedData['type'] = 'ping';
                break;
            // pong frame:
            case 10:
                $decodedData['type'] = 'pong';
                break;
            default:
                return array('type' => '', 'payload' => '', 'error' => 'unknown opcode (1003)');
        }

        if ($payloadLength === 126) {
            $mask = substr($data, 4, 4);
            $payloadOffset = 8;
            $dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3]))) + $payloadOffset;
        } elseif ($payloadLength === 127) {
            $mask = substr($data, 10, 4);
            $payloadOffset = 14;
            $tmp = '';
            for ($i = 0; $i < 8; $i++) {
                $tmp .= sprintf('%08b', ord($data[$i + 2]));
            }
            $dataLength = bindec($tmp) + $payloadOffset;
            unset($tmp);
        } else {
            $mask = substr($data, 2, 4);
            $payloadOffset = 6;
            $dataLength = $payloadLength + $payloadOffset;
        }

        /**
         * We have to check for large frames here. socket_recv cuts at 1024 bytes
         * so if websocket-frame is > 1024 bytes we have to wait until whole
         * data is transferd.
         */
        if (strlen($data) < $dataLength) {
            return false;
        }

        if ($isMasked) {
            for ($i = $payloadOffset; $i < $dataLength; $i++) {
                $j = $i - $payloadOffset;
                if (isset($data[$i])) {
                    $unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
                }
            }
            $decodedData['payload'] = $unmaskedPayload;
        } else {
            $payloadOffset = $payloadOffset - 4;
            $decodedData['payload'] = substr($data, $payloadOffset);
        }

        return $decodedData;
    }

    /**
     * «Рукопожатие», т.е. отправка заголовков согласно протоколу WebSocket
     */
    private function handshake($connect) {
        $info = array();

        $line = fgets($connect);
        $header = explode(' ', $line);
        $info['method'] = $header[0];
        $info['uri'] = $header[1];

        // считываем заголовки из соединения
        while ($line = rtrim(fgets($connect))) {
            if (preg_match('/\A(\S+): (.*)\z/', $line, $matches)) {
                $info[$matches[1]] = $matches[2];
            } else {
                break;
            }
        }

        // получаем адрес клиента
        $address = explode(':', stream_socket_get_name($connect, true));
        $info['ip'] = $address[0];
        $info['port'] = $address[1];

        if (empty($info['Sec-WebSocket-Key'])) {
            return false;
        }

        // отправляем заголовок согласно протоколу вебсокета
        $SecWebSocketAccept = 
            base64_encode(pack('H*', sha1($info['Sec-WebSocket-Key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
        $upgrade = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
                   "Upgrade: websocket\r\n" .
                   "Connection: Upgrade\r\n" .
                   "Sec-WebSocket-Accept:".$SecWebSocketAccept."\r\n\r\n";
        fwrite($connect, $upgrade);

        return $info;
    }

}

Дополнительно

Поиск: HandShake • JavaScript • PHP • Server • Socket • Web-разработка • WebSocket • Клиент • Протокол • Сервер • Сокет

Каталог оборудования
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.
Производители
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.
Функциональные группы
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.