微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

阿里云-redis性能测试(代码实现)

什么是发布/订阅

Pub/Sub功能(means Publish,Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与23种设计模式中的观察者模式极为相似。
同样,Redis的pub/sub是一种消息通信模式,主要的目的是解除消息发布者和消息订阅者之间的耦合,Redis作为一个pub/sub的server,在订阅者和发布者之间起到了消息路由的功能

测试说明

Redis 提供了消息发布(pub)与订阅(sub)功能。即一个 client 发布消息,其他多个 client 订阅消息。
需要注意的是 阿里Redis 发布的消息是“非持久”的,即消息发布者只负责发送消息,而不管消息是否有接收方,也不会保存之前发送的消息,即发布的消息“即发即失”;消息订阅者也只能得到订阅之后的消息,频道(channel)中此前的消息将无从获得。
此外,消息发布者(即 publish 客户端)无需独占与服务器端的连接,您可以在发布消息的同时,使用同一个客户端连接进行其他操作(例如 List 操作等)。但是,消息订阅者(即 subscribe 客户端)需要独占与服务器端的连接,即进行 subscribe 期间,该客户端无法执行其他操作,而是以阻塞的方式等待频道(channel)中的消息;因此消息订阅者需要使用单独的服务器连接,或者需要在单独的线程中使用,消息发送端和订阅端是独立的不耦合的。

测试目的

我们的应用服务器和redis都是阿里提供的redis,我们这里采用内网访问redis。那是不是说外网不行呢,肯定是可以的,但是不建议这么做。因为都知道内网肯定快很多嘛。因此这里只针对内网进行测试。看一下在单线程情况下发布订阅10000次的时间消耗。当然实际上发布者何订阅者可以多个,并且消息的channel也可以是多个的。本地测试的目的很简单:一是加强对redis的了解和使用,二是能在项目开发中发挥redis作用

测试工具

使用java操作redis的api jedis 分别定义好发布者何订阅者,将代码部署到线上测试服进行测试。

利用redis自带的命令简单使用
开启订阅

5017cf8990fb4288.m.cnhza.kvstore.aliyuncs.com:6379> psubscribe fs
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "fs"
3) (integer) 1
1) "pmessage"
2) "fs"
3) "fs"
4) "im a fish1"
1) "pmessage"
2) "fs"
3) "fs"
4) "im a fish1"

发布者发布消息

5017cf8990fb4288.m.cnhza.kvstore.aliyuncs.com:6379> publish fs 'im a fish1'
(integer) 0
5017cf8990fb4288.m.cnhza.kvstore.aliyuncs.com:6379> publish fs2 'im a fish2'
(integer) 0

代码如下

发布者

package com.study.redis.pubsub;

import org.apache.log4j.Logger;

import redis.clients.jedis.Jedis;

/**

  • @author wangkai

  • @2016年11月18日 下午5:20:42

  • @desc:消息发布者 (即 publish client)
    */
    public class KVStorePubClient {

    private Logger LOG = Logger.getLogger(KVStorePubClient.class.getName());

    private Jedis jedis;//

    public KVStorePubClient(String host,int port,String password) {
    LOG.info("消息发布端------");
    jedis = new Jedis(host,port);
    // KVStore的实例ID及密码
    String authString = jedis.auth(password);// kvstore_instance_id:password
    if (!authString.equals("OK")) {
    LOG.error("AUTH Failed: " + authString);
    return;
    }
    }

    public void pub(String channel,String message) {
    LOG.info(" >>> 发布(PUBLISH) > Channel:" + channel

    • " > 发送出的Message:" + message);
      jedis.publish(channel,message);

    }

    public void close(String channel) {
    LOG.info(" >>> 发布(PUBLISH)结束 > Channel:" + channel

    • " > Message:quit");
      // 消息发布者结束发送,即发送一个“quit”消息;
      jedis.publish(channel,"quit");
      }

}

订阅

package com.study.redis.pubsub;

import org.apache.log4j.Logger;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/**

  • @author wangkai
  • @2016年11月18日 下午5:36:40
  • @desc:消息订阅者 (即 subscribe client)
    */
    public class KVStoreSubClient extends Thread {
    private Logger LOG = Logger.getLogger(this.getClass().getName());
    private Jedis jedis;
    private String channel;
    private JedisPubSub listener;
    public KVStoreSubClient(String host,String password){
    LOG.info("消息订阅端--------");
    jedis = new Jedis(host,port);
    //ApSaraDB for Redis的实例ID及密码
    String authString = jedis.auth(password);// kvstore_instance_id:password
    if (!authString.equals("OK")) {
    LOG.error("AUTH Failed: " + authString);
    return;
    }
    }
    public void setChannelAndListener(JedisPubSub listener,String channel){
    this.listener=listener;
    this.channel=channel;
    }
    private void subscribe(){
    if(listener==null || channel==null){
    LOG.error("Error:SubClient> listener or channel is null");
    }
    LOG.info(" >>> 订阅(SUBSCRIBE) > Channel:"+channel);
    //接收者在侦听订阅的消息时,将会阻塞进程,直至接收到quit消息(被动方式),或主动取消订阅
    jedis.subscribe(listener,channel);
    }
    // 主动取消订阅
    public void unsubscribe(String channel){
    LOG.info(" >>> 取消订阅(UNSUBSCRIBE) > Channel:"+channel);
    listener.unsubscribe(channel);
    }
    @Override
    public void run() {
    try{
    LOG.info("----------订阅消息SUBSCRIBE 开始-------");
    subscribe();
    LOG.info("----------订阅消息SUBSCRIBE 结束-------");
    }catch(Exception e){
    LOG.error("消息订阅失败",e);
    unsubscribe(channel);// 异常发生时主动取消订阅
    }
    }
    }

监听器

package com.study.redis.pubsub;


import org.apache.log4j.Logger;

import redis.clients.jedis.JedisPubSub;

/**

  • @author wangkai

  • @2016年11月18日 下午5:36:18

  • @desc:消息监听者
    */
    public class KVStoreMessageListener extends JedisPubSub {

    private Logger LOG = Logger.getLogger(KVStoreMessageListener.class.getName());

    //监听到订阅频道接受到消息时的回调
    @Override
    public void onMessage(String channel,String message) {
    LOG.info(" <<< 订阅(SUBSCRIBE)< Channel:" + channel + " >接收到的Message:" + message );
    //当接收到的message为quit时,取消订阅(被动方式)
    if(message.equalsIgnoreCase("quit")){
    this.unsubscribe(channel);
    }
    }
    // 监听到订阅模式接受到消息时的回调
    @Override
    public void onPMessage(String pattern,String channel,String message) {
    // Todo Auto-generated method stub
    }
    // 订阅频道时的回调
    @Override
    public void onSubscribe(String channel,int subscribedChannels) {
    // Todo Auto-generated method stub

    // 取消订阅频道时的回调
    @Override
    public void onUnsubscribe(String channel,int subscribedChannels) {
    // Todo Auto-generated method stub
    }
    // 取消订阅模式时的回调
    @Override
    public void onPUnsubscribe(String pattern,int subscribedChannels) {
    // Todo Auto-generated method stub
    }
    // 订阅频道模式时的回调
    @Override
    public void onPSubscribe(String pattern,int subscribedChannels) {
    // Todo Auto-generated method stub
    }
    }

测试入口

package com.study.redis.pubsub;


import java.util.UUID;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

import redis.clients.jedis.JedisPubSub;

public class RedisApp {

private static Logger LOG = Logger.getLogger(Re<a href="https://www.jb51.cc/tag/dis/" target="_blank" class="keywords">dis</a>App.class.getName());
// Ap<a href="https://www.jb51.cc/tag/Sara/" target="_blank" class="keywords">Sara</a>DB for Re<a href="https://www.jb51.cc/tag/dis/" target="_blank" class="keywords">dis</a>的连接信息,从控制台可以获得
// 外网地址
// static final String host = "xxx.xxx.xxx.xxx";
static final String host = "*******.m.cnhza.kvstore.aliyuncs.com";
static final int port = 6379;
static final String password = "xxxxxxxxx";// password

public static void main(String[] args) throws Exception {
    String log4jConfPath = "./log4j.properties";
    PropertyCon<a href="https://www.jb51.cc/tag/fig/" target="_blank" class="keywords">fig</a>urator.con<a href="https://www.jb51.cc/tag/fig/" target="_blank" class="keywords">fig</a>ureAndWatch(log4jConfPath);
    LOG.info("测试开始---------->>");
    KVStorePubClient pubClient = new KVStorePubClient(host,port,password);
    final String channel = "KVStore频道-A";
    // 消息发送者开始<a href="https://www.jb51.cc/tag/faxiaoxi/" target="_blank" class="keywords">发消息</a>,此时还无人<a href="https://www.jb51.cc/tag/dingyue/" target="_blank" class="keywords">订阅</a>,所以此消息不会被接收
    pubClient.pub(channel,"Aliyun消息1:(此时还无人<a href="https://www.jb51.cc/tag/dingyue/" target="_blank" class="keywords">订阅</a>,所以此消息不会被接收)");
    // 消息接收者
    KVStoreSubClient subClient = new KVStoreSubClient(host,password);
    Je<a href="https://www.jb51.cc/tag/dis/" target="_blank" class="keywords">dis</a>PubSub listener = new KVStoreMessageListener();
    subClient.setChannelAndListener(listener,channel);
    // 消息接收者开始<a href="https://www.jb51.cc/tag/dingyue/" target="_blank" class="keywords">订阅</a>
    subClient.start();
    // 从main入口<a href="https://www.jb51.cc/tag/huoqu/" target="_blank" class="keywords">获取</a>参数
    Integer count = 100;
    if (args != null &amp;&amp; args.length != 0) {
        String string = args[0];
        count = Integer.valueOf(string);
    }
    // 消息发送者继续<a href="https://www.jb51.cc/tag/faxiaoxi/" target="_blank" class="keywords">发消息</a>
    long start = Sy<a href="https://www.jb51.cc/tag/stem/" target="_blank" class="keywords">stem</a>.currentTimeMillis();
    for (int i = 0; i < count; i++) {
        String message = UUID.randomUUID().toString();
        pubClient.pub(channel,message);
        // Thread.sleep(1000);
    }
    long end = Sy<a href="https://www.jb51.cc/tag/stem/" target="_blank" class="keywords">stem</a>.currentTimeMillis();
    LOG.info(count + "次发布<a href="https://www.jb51.cc/tag/dingyue/" target="_blank" class="keywords">订阅</a>总共耗时: " + (end - start) + "ms");
    LOG.info(count + "次发布<a href="https://www.jb51.cc/tag/dingyue/" target="_blank" class="keywords">订阅</a>QPS: " + count/((end - start)/1000));
    // 消息接收者主动取消<a href="https://www.jb51.cc/tag/dingyue/" target="_blank" class="keywords">订阅</a>
    subClient.unsubscribe(channel);

    Thread.sleep(1000);
    pubClient.pub(channel,"Aliyun消息2:(此时<a href="https://www.jb51.cc/tag/dingyue/" target="_blank" class="keywords">订阅</a>取消,所以此消息不会被接收)");
    // 消息发布者结束发送,即发送<a href="https://www.jb51.cc/tag/yige/" target="_blank" class="keywords">一个</a>“quit”消息;
    // 此时如果有其他的消息接收者,那么在listener.onMessage()中接收到“quit”时,将执行“unsubscribe”操作。
    pubClient.close(channel);
}

}

测试结果

单线程单channel 10000次发布订阅时间消耗

27382 INFO  2016-11-29 10:39:40965 com.study.redis.pubsub.RedisApp 10000次发布订阅总共耗时: 27268ms

发布订阅的不足之处归纳

  • 数据可靠性无法保证

一个redis-cli发布消息n个redis-cli接受消息。消息的发布是无状态的,即发布完消息后该redis-cli便在理会该消息是否被接受到,是否在传输过程中丢失,即对于发布者来说,消息是”即发即失”的.因此在使用时要多加考虑。

  • 扩展性太差

不能通过增加消费者来加快消耗发布者的写入的数据,如果发布者发布的消息很多,则数据阻塞在通道中已等待被消费着来消耗。阻塞时间越久,数据丢失的风险越大(网络或者服务器的一个不稳定就会导致数据的丢失)。

  • 资源消耗较高

在pub/sub中 消息发布者不需要独占一个Redis链接,而消费者则需要单独占用一个Redis链接,在Java中便不得独立出分出一个线程来处理消费者。这种场景一般对应这多个消费者,此时则有着过高的资源消耗。

那有没有方法解决这一不足呢?值得思考...

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐