思路
- 首先理解websocket建立连接的过程, 可以参考这篇博文。
- 可以从swoole源码中看到,swoole_http_server类的监听事件列表相比swoole_server额外接受1种新的事件类型onRequest:
/**
* 注册事件回调函数,与swoole_server->on相同。swoole_http_server->on的不同之处是:
*
* * swoole_http_server->on不接受onConnect/onReceive回调设置
* * swoole_http_server->on 额外接受1种新的事件类型onRequest
*
* 事件列表
*
* * onStart
* * onShutdown
* * onWorkerStart
* * onWorkerStop
* * onTimer
* * onConnect
* * onReceive
* * onClose
* * onTask
* * onFinish
* * onPipeMessage
* * onWorkerError
* * onManagerStart
* * onManagerStop
* WebSocket
* * onOpen
* * onHandshake
* * onMessage
*
*
* $http_server->on('request', function(swoole_http_request $request, swoole_http_response $response) {
* $response->end("<h1>hello swoole</h1>");
* })
*
*
* 在收到一个完整的Http请求后,会回调此函数。回调函数共有2个参数:
*
* * $request,Http请求信息对象,包含了header/get/post/cookie等相关信息
* * $response,Http响应对象,支持cookie/header/status等Http操作
*
*
* !! $response/$request 对象传递给其他函数时,不要加&引用符号
*
* @param string $event
* @param callable $callback
*/
public function on($event, $callback)
{
} - websocketServer虽然只显示提供了onOpen,onHandshake,onMessage三个事件, 但是websocketServer是继承自httpServer的, 所以其可以onRequests。
<?php
namespace Swoole\WebSocket;
/**
* Class swoole_http_server
*
* 内置 Websocket 服务器
*/
class Server extends \Swoole\Http\Server
{
/**
* 向某个WebSocket客户端连接推送数据
* @param $fd
* @param $data
* @param bool $binary_data
* @param bool $finish
* @return bool
*/
function push($fd, $data, $binary_data = false, $finish = true)
{
}
/**
* @param $data
* @param $opcode
* @param bool $finish
* @param bool $mask
* @return string
*/
static function pack($data, $opcode = WEBSOCKET_OPCODE_TEXT, $finish = true, $mask = false)
{
}
/**
* 主动向websocket客户端发送关闭帧并关闭该连接
* @param int $fd
* @param int $code 关闭连接的状态码,根据RFC6455,对于应用程序关闭连接状态码,取值范围为1000或4000-4999之间
* @param string $reason 关闭连接的原因,utf-8格式字符串,字节长度不超过125
* @return bool
*/
public function disconnect($fd, $code, $reason)
{
}
/**
* 检查连接是否为有效的WebSocket客户端连接
* @param $fd
* @return bool
*/
public function isEstablished($fd)
{
}
} 实现
- 安装swoole:pecl install swoole
- 在laravel中创建swoole命令:php artisan make:command swoole, 该文件如下:
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Symfony\Component\Console\Input\InputArgument;
use App\Handlers\SwooleHandler;
use Swoole\WebSocket\Server;
class swoole extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'swoole:action {action}';
/**
* The console command description.
*
* @var string
*/
protected $description = 'test swoole socket';
private $server = '';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
$this->fire();
}
/*
* 获取命令行参数
*/
protected function getArguments(){
return array(
'action',InputArgument::REQUIRED,'start|stop|restart'
);
}
/*
* 执行相应命令功能
*/
public function fire(){
$arg = $this->argument('action');
switch($arg){
case 'start':
$this->line('启动websocket服务, 监听端口: ' . config('swoole.port'));
$this->start();
break;
case 'stop':
$this->line('停止websocket服务...');
break;
case 'restart':
$this->line('重启websocket服务...');
break;
}
}
/*
* 开启swoole服务
*/
public function start(){
$this->server = new Server(config('swoole.host'), config('swoole.port'));
$this->server->set(
array(
'worker_num' => config('swoole.worker_num'),
'daemonize' => config('swoole.daemonize'),
'log_file' => config('swoole.log_file'),
'max_request' => config('swoole.max_request'),
'dispatch_mode' => config('swoole.dispatch_mode'),
'debug_mode' => config('swoole.debug_mode'),
'max_connection' => config('swoole.max_connection')
)
);
$handler=SwooleHandler::getInstance();
$this->server->on('open', [$handler,'onOpen']);
$this->server->on('workerstart', [$handler,'onWorkStart']);
$this->server->on('message', [$handler,'onMessage']);
// $this->server->on('close', [$handler,'onClose']);
// websocket类是基于http的, 所以有http回调
$this->server->on('request', function ($request, $response) use($handler){
$handler->onRequest($this->server, $request, $response);
});
$this->server->start();
}
/*
* 停止swoole服务
*/
public function stop()
{
// kill -QUIT xxx.pid 平滑终止
}
/*
* 重启swoole服务
*/
public function restart()
{
// 可通过kill -USR2 xx.pid 来平滑重启
}
} - 通过基类Server的on方法来绑定websocket的onRequest方法:
$this->server->on('request', function ($request, $response) use($handler){
$handler->onRequest($this->server, $request, $response);
}); - SwooleHandler.php代码如下:
namespace App\Handlers;
/*
* websocket事件监听处理
*/
use App\Logic\Api\TaskLogic;
use Illuminate\Support\Facades\Redis;
use App\Logic\Api\WebsocketLogic;
class SwooleHandler
{
private static $_instance; //保存类实例的私有静态成员变量
private $redis;
public function __construct()
{
}
//定义私有的__clone()方法,确保单例类不能被复制或克隆
private function __clone() {}
//对外提供获取唯一实例的方法
public static function getInstance()
{
//检测类是否被实例化
if ( ! (self::$_instance instanceof self) ) {
self::$_instance = new SwooleHandler();
}
return self::$_instance;
}
/**
* 监听websocket启动事件
* @param $server
* @param $request
*/
public function onWorkStart($server, $request)
{
$redis = Redis::connection();
$server->redis = $redis;
}
/**
* 监听websocket连接事件
* @param $server
* @param $request
*/
public function onOpen($server, $request)
{
$params = $request->get;
$is_validate = is_validate_sign($params); // 校验websocket客户端签名
if ($is_validate && array_key_exists('canteen_id', $params))
{
$canteen_id = $params['canteen_id'];
// 连接成功之后将websocket fd缓存起来
WebsocketLogic::saveFd($canteen_id, $request->fd, $server->redis);
$response = [
'status' => 1,
'msg' => '连接成功',
'code' => 0
];
$server->push($request->fd, json_encode($response));
return;
}
else {
$response = [
'status' => 0,
'code' => 40000,
'msg' => '校验签名不通过',
];
$server->push($request->fd, json_encode($response));
$server->close($request->fd);
return;
}
}
/**
* 监听消息事件
* @param $server
* @param $frame
*/
public function onMessage($server, $frame)
{
// 省略
}
/**
* 监听http请求事件
* @param $server
* @param $request
* @param $response
*/
public function onRequest($server, $request, $response)
{
$response->header('content-type', 'application/json', true);
$remote_addr = $request->server['remote_addr'];
if ($remote_addr != '127.0.0.1') {
$resp = [
"error" => 1001,
"msg" => "非法调用",
];
$response->end(json_encode($resp));
return;
}
$post_data = $request->post;
// 从redis获取websocket fd
$canteen_id = $post_data['canteen_id']; // 食堂ID
$fd = WebsocketLogic::getFd($canteen_id, $server->redis);
$websocket_status = $server->connection_info($fd)['websocket_status'];
if ($websocket_status != 3) // 判断该websocket通道是否为连接成功的状态
{
$resp = [
"errno" => 50000,
"msg" => "客户端未连接",
];
$response->end(json_encode($resp));
return;
}
$response->end(json_encode($resp));
}
} -
webscoket成功建立连接后将fd缓存起来,onRequest发送推送数据给客户端时根据缓存起来的fd进行消息推送。
-
通过http请求发送websocket数据:
/*
* 发送数据给websocket服务端
*/
function push_data_to_ws($data)
{
$url = 'http://' . config('swoole.host') . ':' . config('swoole.port');
$headers = array('Accept' => 'application/json');
$request = Requests::post($url, $headers, $data);
$resp = $request->body;
return json_decode($resp, true);
} -
使用了模仿python的requests库实现的Requests For PHP, 使用curl实现也是一样的。
-
为需要推送数据至websocket客户端的业务写个接口,在其中调用push_data_to_ws方法即可。

京公网安备 11010502036488号