思路
- 首先理解websocket建立连接的过程, 可以参考这篇博文。
- 可以从swoole源码中看到,swoole_http_server类的监听事件列表相比swoole_server额外接受1种新的事件类型onRequest:
/** * 注册事件回调函数,与swoole_server->on相同。swoole_http_server->on的不同之处是: * * * swoole_http_server->on不接受onConnect/onReceive回调设置 * * swoole_http_server->on 额外接受1种新的事件类型onRequest * * 事件列表 * * * onStart * * onShutdown * * onWorkerStart * * onWorkerStop * * onTimer * * onConnect * * onReceive * * onClose * * onTask * * onFinish * * onPipeMessage * * onWorkerError * * onManagerStart * * onManagerStop * WebSocket * * onOpen * * onHandshake * * onMessage * * * $http_server->on('request', function(swoole_http_request $request, swoole_http_response $response) { * $response->end("<h1>hello swoole</h1>"); * }) * * * 在收到一个完整的Http请求后,会回调此函数。回调函数共有2个参数: * * * $request,Http请求信息对象,包含了header/get/post/cookie等相关信息 * * $response,Http响应对象,支持cookie/header/status等Http操作 * * * !! $response/$request 对象传递给其他函数时,不要加&引用符号 * * @param string $event * @param callable $callback */ public function on($event, $callback) { }
- websocketServer虽然只显示提供了onOpen,onHandshake,onMessage三个事件, 但是websocketServer是继承自httpServer的, 所以其可以onRequests。
<?php namespace Swoole\WebSocket; /** * Class swoole_http_server * * 内置 Websocket 服务器 */ class Server extends \Swoole\Http\Server { /** * 向某个WebSocket客户端连接推送数据 * @param $fd * @param $data * @param bool $binary_data * @param bool $finish * @return bool */ function push($fd, $data, $binary_data = false, $finish = true) { } /** * @param $data * @param $opcode * @param bool $finish * @param bool $mask * @return string */ static function pack($data, $opcode = WEBSOCKET_OPCODE_TEXT, $finish = true, $mask = false) { } /** * 主动向websocket客户端发送关闭帧并关闭该连接 * @param int $fd * @param int $code 关闭连接的状态码,根据RFC6455,对于应用程序关闭连接状态码,取值范围为1000或4000-4999之间 * @param string $reason 关闭连接的原因,utf-8格式字符串,字节长度不超过125 * @return bool */ public function disconnect($fd, $code, $reason) { } /** * 检查连接是否为有效的WebSocket客户端连接 * @param $fd * @return bool */ public function isEstablished($fd) { } }
实现
- 安装swoole:pecl install swoole
- 在laravel中创建swoole命令:php artisan make:command swoole, 该文件如下:
<?php namespace App\Console\Commands; use Illuminate\Console\Command; use Symfony\Component\Console\Input\InputArgument; use App\Handlers\SwooleHandler; use Swoole\WebSocket\Server; class swoole extends Command { /** * The name and signature of the console command. * * @var string */ protected $signature = 'swoole:action {action}'; /** * The console command description. * * @var string */ protected $description = 'test swoole socket'; private $server = ''; /** * Create a new command instance. * * @return void */ public function __construct() { parent::__construct(); } /** * Execute the console command. * * @return mixed */ public function handle() { $this->fire(); } /* * 获取命令行参数 */ protected function getArguments(){ return array( 'action',InputArgument::REQUIRED,'start|stop|restart' ); } /* * 执行相应命令功能 */ public function fire(){ $arg = $this->argument('action'); switch($arg){ case 'start': $this->line('启动websocket服务, 监听端口: ' . config('swoole.port')); $this->start(); break; case 'stop': $this->line('停止websocket服务...'); break; case 'restart': $this->line('重启websocket服务...'); break; } } /* * 开启swoole服务 */ public function start(){ $this->server = new Server(config('swoole.host'), config('swoole.port')); $this->server->set( array( 'worker_num' => config('swoole.worker_num'), 'daemonize' => config('swoole.daemonize'), 'log_file' => config('swoole.log_file'), 'max_request' => config('swoole.max_request'), 'dispatch_mode' => config('swoole.dispatch_mode'), 'debug_mode' => config('swoole.debug_mode'), 'max_connection' => config('swoole.max_connection') ) ); $handler=SwooleHandler::getInstance(); $this->server->on('open', [$handler,'onOpen']); $this->server->on('workerstart', [$handler,'onWorkStart']); $this->server->on('message', [$handler,'onMessage']); // $this->server->on('close', [$handler,'onClose']); // websocket类是基于http的, 所以有http回调 $this->server->on('request', function ($request, $response) use($handler){ $handler->onRequest($this->server, $request, $response); }); $this->server->start(); } /* * 停止swoole服务 */ public function stop() { // kill -QUIT xxx.pid 平滑终止 } /* * 重启swoole服务 */ public function restart() { // 可通过kill -USR2 xx.pid 来平滑重启 } }
- 通过基类Server的on方法来绑定websocket的onRequest方法:
$this->server->on('request', function ($request, $response) use($handler){ $handler->onRequest($this->server, $request, $response); });
- SwooleHandler.php代码如下:
namespace App\Handlers; /* * websocket事件监听处理 */ use App\Logic\Api\TaskLogic; use Illuminate\Support\Facades\Redis; use App\Logic\Api\WebsocketLogic; class SwooleHandler { private static $_instance; //保存类实例的私有静态成员变量 private $redis; public function __construct() { } //定义私有的__clone()方法,确保单例类不能被复制或克隆 private function __clone() {} //对外提供获取唯一实例的方法 public static function getInstance() { //检测类是否被实例化 if ( ! (self::$_instance instanceof self) ) { self::$_instance = new SwooleHandler(); } return self::$_instance; } /** * 监听websocket启动事件 * @param $server * @param $request */ public function onWorkStart($server, $request) { $redis = Redis::connection(); $server->redis = $redis; } /** * 监听websocket连接事件 * @param $server * @param $request */ public function onOpen($server, $request) { $params = $request->get; $is_validate = is_validate_sign($params); // 校验websocket客户端签名 if ($is_validate && array_key_exists('canteen_id', $params)) { $canteen_id = $params['canteen_id']; // 连接成功之后将websocket fd缓存起来 WebsocketLogic::saveFd($canteen_id, $request->fd, $server->redis); $response = [ 'status' => 1, 'msg' => '连接成功', 'code' => 0 ]; $server->push($request->fd, json_encode($response)); return; } else { $response = [ 'status' => 0, 'code' => 40000, 'msg' => '校验签名不通过', ]; $server->push($request->fd, json_encode($response)); $server->close($request->fd); return; } } /** * 监听消息事件 * @param $server * @param $frame */ public function onMessage($server, $frame) { // 省略 } /** * 监听http请求事件 * @param $server * @param $request * @param $response */ public function onRequest($server, $request, $response) { $response->header('content-type', 'application/json', true); $remote_addr = $request->server['remote_addr']; if ($remote_addr != '127.0.0.1') { $resp = [ "error" => 1001, "msg" => "非法调用", ]; $response->end(json_encode($resp)); return; } $post_data = $request->post; // 从redis获取websocket fd $canteen_id = $post_data['canteen_id']; // 食堂ID $fd = WebsocketLogic::getFd($canteen_id, $server->redis); $websocket_status = $server->connection_info($fd)['websocket_status']; if ($websocket_status != 3) // 判断该websocket通道是否为连接成功的状态 { $resp = [ "errno" => 50000, "msg" => "客户端未连接", ]; $response->end(json_encode($resp)); return; } $response->end(json_encode($resp)); } }
-
webscoket成功建立连接后将fd缓存起来,onRequest发送推送数据给客户端时根据缓存起来的fd进行消息推送。
-
通过http请求发送websocket数据:
/* * 发送数据给websocket服务端 */ function push_data_to_ws($data) { $url = 'http://' . config('swoole.host') . ':' . config('swoole.port'); $headers = array('Accept' => 'application/json'); $request = Requests::post($url, $headers, $data); $resp = $request->body; return json_decode($resp, true); }
-
使用了模仿python的requests库实现的Requests For PHP, 使用curl实现也是一样的。
-
为需要推送数据至websocket客户端的业务写个接口,在其中调用push_data_to_ws方法即可。