微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送

思路

利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。

 

WebSocket 服务

composer require hyperf/websocket-server

  

配置文件 [config/autoload/server.PHP]

<?PHP

return [
    'mode' => SWOOLE_PROCESS,'servers' => [
        [
            'name' => 'http','type' => Server::SERVER_HTTP,'host' => '0.0.0.0','port' => 11111,'sock_type' => SWOOLE_SOCK_TCP,'callbacks' => [
                SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class,'onRequest'],],[
            'name' => 'ws','type' => Server::SERVER_WEBSOCKET,'port' => 12222,'callbacks' => [
                SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class,'onHandShake'],SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class,'onMessage'],SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class,'onClose'],

  

WebSocket 服务器端代码示例

<?PHP

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://doc.hyperf.io
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
 */

namespace App\Controller;

use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnopenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;

class WebSocketController extends Controller implements OnMessageInterface,OnopenInterface,OnCloseInterface
{

    /**
     * 发送消息
     * @param WebSocketServer $server
     * @param Frame $frame
     */
    public function onMessage(WebSocketServer $server,Frame $frame): void
    {
        //心跳刷新缓存
        $redis = $this->container->get(\Redis::class);
        //获取所有的客户端id
        $fdList = $redis->sMembers('websocket_sjd_1');
        //如果当前客户端在客户端集合中,就刷新
        if (in_array($frame->fd,$fdList)) {
            $redis->sAdd('websocket_sjd_1',$frame->fd);
            $redis->expire('websocket_sjd_1',7200);
        }
        $server->push($frame->fd,'Recv: ' . $frame->data);

    }

    /**
     * 客户端失去链接
     * @param Server $server
     * @param int $fd
     * @param int $reactorId
     */
    public function onClose(Server $server,int $fd,int $reactorId): void
    {
        //删掉客户端id
        $redis = $this->container->get(\Redis::class);
        //移除集合中指定的value
        $redis->sRem('websocket_sjd_1',$fd);
        var_dump('closed');

    }

    /**
     * 客户端链接
     * @param WebSocketServer $server
     * @param Request $request
     */
    public function onopen(WebSocketServer $server,Request $request): void
    {
        //保存客户端id
        $redis = $this->container->get(\Redis::class);

        $res1 = $redis->sAdd('websocket_sjd_1',$request->fd);
        var_dump($res1);

        $res = $redis->expire('websocket_sjd_1',7200);
        var_dump($res);

        $server->push($request->fd,'Opened');

    }
}

  

WebSocket 前端代码

function WebSockettest() {
 if ("WebSocket" in window) {
            console.log("您的浏览器支持 WebSocket!");
            var num = 0

            // 打开一个 web socket
            var ws = new WebSocket("ws://127.0.0.1:12222");

            ws.onopen = function () {
                // Web Socket 已连接上,使用 send() 方法发送数据
                //alert("数据发送中...");
                //ws.send("发送数据");
            };

            window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开
                var ping = {"type": "ping"};
                ws.send(JSON.stringify(ping));
            },5000);

            ws.onmessage = function (evt) {
                var d = JSON.parse(evt.data);
                console.log(d);
                if (d.code == 300) {
                    $(".address").text(d.address)
                }
                if (d.code == 200) {
                    var v = d.data
                    console.log(v);
                    num++
                    var str = `<div class="item">
                                    <p>${v.recordOutTime}</p>
                                    <p>${v.userOutName}</p>
                                    <p>${v.userOutNum}</p>
                                    <p>${v.doorOutName}</p>
                                </div>`
                    $(".tableHead").after(str)
                    if (num > 7) {
                        num--
                        $(".table .item:nth-last-child(1)").remove()
                    }
                }
            };

            ws.error = function (e) {
                console.log(e)
                alert(e)
            }
            ws.onclose = function () {
                // 关闭 websocket
                alert("连接已关闭...");
            };
        } else {
            alert("您的浏览器不支持 WebSocket!");
        }
    }

  

AMQP 组件

composer require hyperf/amqp

配置文件 [config/autoload/amqp.PHP]

<?PHP
return [
    'default' => [
        'host' => 'localhost','port' => 5672,'user' => 'guest','password' => 'guest','vhost' => '/','pool' => [
            'min_connections' => 1,'max_connections' => 10,'connect_timeout' => 10.0,'wait_timeout' => 3.0,'heartbeat' => -1,'params' => [
            'insist' => false,'login_method' => 'AMQPLAIN','login_response' => null,'locale' => 'en_US','connection_timeout' => 3.0,'read_write_timeout' => 6.0,'context' => null,'keepalive' => false,'heartbeat' => 3,];

  

MQ 消费者代码

<?PHP
declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;

/**
 * @Consumer(exchange="hyperf",routingKey="hyperf",queue="hyperf",nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    /**
     * rabbmitMQ消费端代码
     * @param $data
     * @return string
     */
    public function consume($data): string
    {
        print_r($data);

        //获取集合中所有的value
        $redis = $this->container->get(\Redis::class);
        $fdList=$redis->sMembers('websocket_sjd_1');

        $server=$this->container->get(ServerFactory::class)->getServer()->getServer();
        foreach($fdList as $key=>$v){
            if(!empty($v)){
                $server->push((int)$v,$data);
            }
        }

        return Result::ACK;
    }

  

} 

控制器代码

/**
    * test
     * @return array
     */
    public function test()
    {
        $data = array(
            'code' => 200,'data' => [
                'userOutName' => 'ccflow','userOutNum' => '9999','recordOutTime' => date("Y-m-d H:i:s",time()),'doorOutName' => '教师公寓',]
        );
        $data = \GuzzleHttp\json_encode($data);
        $message = new DemoProducer($data);
        $producer = ApplicationContext::getContainer()->get(Producer::class);
        $result = $producer->produce($message);
        var_dump($result);

        $user = $this->request->input('user','Hyperf');
        $method = $this->request->getmethod();

        return [
            'method' => $method,'message' => "{$user}.",];
    }

  

最终效果

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐