什么是发布/订阅
Pub/Sub功能(means Publish,Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与23种设计模式中的观察者模式极为相似。
同样,Redis的pub/sub是一种消息通信模式,主要的目的是解除消息发布者和消息订阅者之间的耦合,Redis作为一个pub/sub的server,在订阅者和发布者之间起到了消息路由的功能。
测试说明
Redis 提供了消息发布(pub)与订阅(sub)功能。即一个 client 发布消息,其他多个 client 订阅消息。
需要注意的是 阿里Redis 发布的消息是“非持久”的,即消息发布者只负责发送消息,而不管消息是否有接收方,也不会保存之前发送的消息,即发布的消息“即发即失”;消息订阅者也只能得到订阅之后的消息,频道(channel)中此前的消息将无从获得。
此外,消息发布者(即 publish 客户端)无需独占与服务器端的连接,您可以在发布消息的同时,使用同一个客户端连接进行其他操作(例如 List 操作等)。但是,消息订阅者(即 subscribe 客户端)需要独占与服务器端的连接,即进行 subscribe 期间,该客户端无法执行其他操作,而是以阻塞的方式等待频道(channel)中的消息;因此消息订阅者需要使用单独的服务器连接,或者需要在单独的线程中使用,消息发送端和订阅端是独立的不耦合的。
测试目的
我们的应用服务器和redis都是阿里提供的redis,我们这里采用内网访问redis。那是不是说外网不行呢,肯定是可以的,但是不建议这么做。因为都知道内网肯定快很多嘛。因此这里只针对内网进行测试。看一下在单线程情况下发布订阅10000次的时间消耗。当然实际上发布者何订阅者可以多个,并且消息的channel也可以是多个的。本地测试的目的很简单:一是加强对redis的了解和使用,二是能在项目开发中发挥redis作用
测试工具
使用java操作redis的api jedis 分别定义好发布者何订阅者,将代码部署到线上测试服进行测试。
5017cf8990fb4288.m.cnhza.kvstore.aliyuncs.com:6379> psubscribe fs
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "fs"
3) (integer) 1
1) "pmessage"
2) "fs"
3) "fs"
4) "im a fish1"
1) "pmessage"
2) "fs"
3) "fs"
4) "im a fish1"
发布者发布消息
5017cf8990fb4288.m.cnhza.kvstore.aliyuncs.com:6379> publish fs 'im a fish1'
(integer) 0
5017cf8990fb4288.m.cnhza.kvstore.aliyuncs.com:6379> publish fs2 'im a fish2'
(integer) 0
代码如下
发布者
package com.study.redis.pubsub;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
/**
-
-
@author wangkai
-
@2016年11月18日 下午5:20:42
-
@desc:消息发布者 (即 publish client)
*/
public class KVStorePubClient {
private Logger LOG = Logger.getLogger(KVStorePubClient.class.getName());
public KVStorePubClient(String host,int port,String password) {
LOG.info("消息发布端------");
jedis = new Jedis(host,port);
// KVStore的实例ID及密码
String authString = jedis.auth(password);// kvstore_instance_id:password
if (!authString.equals("OK")) {
LOG.error("AUTH Failed: " + authString);
return;
}
}
public void pub(String channel,String message) {
LOG.info(" >>> 发布(PUBLISH) > Channel:" + channel
- " > 发送出的Message:" + message);
jedis.publish(channel,message);
}
public void close(String channel) {
LOG.info(" >>> 发布(PUBLISH)结束 > Channel:" + channel
}
订阅者
package com.study.redis.pubsub;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
/**
-
- @author wangkai
- @2016年11月18日 下午5:36:40
- @desc:消息订阅者 (即 subscribe client)
*/
public class KVStoreSubClient extends Thread {
private Logger LOG = Logger.getLogger(this.getClass().getName());
private Jedis jedis;
private String channel;
private JedisPubSub listener;
public KVStoreSubClient(String host,String password){
LOG.info("消息订阅端--------");
jedis = new Jedis(host,port);
//ApSaraDB for Redis的实例ID及密码
String authString = jedis.auth(password);// kvstore_instance_id:password
if (!authString.equals("OK")) {
LOG.error("AUTH Failed: " + authString);
return;
}
}
public void setChannelAndListener(JedisPubSub listener,String channel){
this.listener=listener;
this.channel=channel;
}
private void subscribe(){
if(listener==null || channel==null){
LOG.error("Error:SubClient> listener or channel is null");
}
LOG.info(" >>> 订阅(SUBSCRIBE) > Channel:"+channel);
//接收者在侦听订阅的消息时,将会阻塞进程,直至接收到quit消息(被动方式),或主动取消订阅
jedis.subscribe(listener,channel);
}
// 主动取消订阅
public void unsubscribe(String channel){
LOG.info(" >>> 取消订阅(UNSUBSCRIBE) > Channel:"+channel);
listener.unsubscribe(channel);
}
@Override
public void run() {
try{
LOG.info("----------订阅消息SUBSCRIBE 开始-------");
subscribe();
LOG.info("----------订阅消息SUBSCRIBE 结束-------");
}catch(Exception e){
LOG.error("消息订阅失败",e);
unsubscribe(channel);// 异常发生时主动取消订阅
}
}
}
监听器
package com.study.redis.pubsub;
import org.apache.log4j.Logger;
import redis.clients.jedis.JedisPubSub;
/**
-
-
@author wangkai
-
@2016年11月18日 下午5:36:18
-
@desc:消息监听者
*/
public class KVStoreMessageListener extends JedisPubSub {
private Logger LOG = Logger.getLogger(KVStoreMessageListener.class.getName());
//监听到订阅频道接受到消息时的回调
@Override
public void onMessage(String channel,String message) {
LOG.info(" <<< 订阅(SUBSCRIBE)< Channel:" + channel + " >接收到的Message:" + message );
//当接收到的message为quit时,取消订阅(被动方式)
if(message.equalsIgnoreCase("quit")){
this.unsubscribe(channel);
}
}
// 监听到订阅模式接受到消息时的回调
@Override
public void onPMessage(String pattern,String channel,String message) {
// Todo Auto-generated method stub
}
// 订阅频道时的回调
@Override
public void onSubscribe(String channel,int subscribedChannels) {
// Todo Auto-generated method stub
// 取消订阅频道时的回调
@Override
public void onUnsubscribe(String channel,int subscribedChannels) {
// Todo Auto-generated method stub
}
// 取消订阅模式时的回调
@Override
public void onPUnsubscribe(String pattern,int subscribedChannels) {
// Todo Auto-generated method stub
}
// 订阅频道模式时的回调
@Override
public void onPSubscribe(String pattern,int subscribedChannels) {
// Todo Auto-generated method stub
}
}
测试入口
package com.study.redis.pubsub;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import redis.clients.jedis.JedisPubSub;
public class RedisApp {
private static Logger LOG = Logger.getLogger(Re<a href="https://www.jb51.cc/tag/dis/" target="_blank" class="keywords">dis</a>App.class.getName());
// Ap<a href="https://www.jb51.cc/tag/Sara/" target="_blank" class="keywords">Sara</a>DB for Re<a href="https://www.jb51.cc/tag/dis/" target="_blank" class="keywords">dis</a>的连接信息,从控制台可以获得
// 外网地址
// static final String host = "xxx.xxx.xxx.xxx";
static final String host = "*******.m.cnhza.kvstore.aliyuncs.com";
static final int port = 6379;
static final String password = "xxxxxxxxx";// password
public static void main(String[] args) throws Exception {
String log4jConfPath = "./log4j.properties";
PropertyCon<a href="https://www.jb51.cc/tag/fig/" target="_blank" class="keywords">fig</a>urator.con<a href="https://www.jb51.cc/tag/fig/" target="_blank" class="keywords">fig</a>ureAndWatch(log4jConfPath);
LOG.info("测试开始---------->>");
KVStorePubClient pubClient = new KVStorePubClient(host,port,password);
final String channel = "KVStore频道-A";
// 消息发送者开始<a href="https://www.jb51.cc/tag/faxiaoxi/" target="_blank" class="keywords">发消息</a>,此时还无人<a href="https://www.jb51.cc/tag/dingyue/" target="_blank" class="keywords">订阅</a>,所以此消息不会被接收
pubClient.pub(channel,"Aliyun消息1:(此时还无人<a href="https://www.jb51.cc/tag/dingyue/" target="_blank" class="keywords">订阅</a>,所以此消息不会被接收)");
// 消息接收者
KVStoreSubClient subClient = new KVStoreSubClient(host,password);
Je<a href="https://www.jb51.cc/tag/dis/" target="_blank" class="keywords">dis</a>PubSub listener = new KVStoreMessageListener();
subClient.setChannelAndListener(listener,channel);
// 消息接收者开始<a href="https://www.jb51.cc/tag/dingyue/" target="_blank" class="keywords">订阅</a>
subClient.start();
// 从main入口<a href="https://www.jb51.cc/tag/huoqu/" target="_blank" class="keywords">获取</a>参数
Integer count = 100;
if (args != null && args.length != 0) {
String string = args[0];
count = Integer.valueOf(string);
}
// 消息发送者继续<a href="https://www.jb51.cc/tag/faxiaoxi/" target="_blank" class="keywords">发消息</a>
long start = Sy<a href="https://www.jb51.cc/tag/stem/" target="_blank" class="keywords">stem</a>.currentTimeMillis();
for (int i = 0; i < count; i++) {
String message = UUID.randomUUID().toString();
pubClient.pub(channel,message);
// Thread.sleep(1000);
}
long end = Sy<a href="https://www.jb51.cc/tag/stem/" target="_blank" class="keywords">stem</a>.currentTimeMillis();
LOG.info(count + "次发布<a href="https://www.jb51.cc/tag/dingyue/" target="_blank" class="keywords">订阅</a>总共耗时: " + (end - start) + "ms");
LOG.info(count + "次发布<a href="https://www.jb51.cc/tag/dingyue/" target="_blank" class="keywords">订阅</a>QPS: " + count/((end - start)/1000));
// 消息接收者主动取消<a href="https://www.jb51.cc/tag/dingyue/" target="_blank" class="keywords">订阅</a>
subClient.unsubscribe(channel);
Thread.sleep(1000);
pubClient.pub(channel,"Aliyun消息2:(此时<a href="https://www.jb51.cc/tag/dingyue/" target="_blank" class="keywords">订阅</a>取消,所以此消息不会被接收)");
// 消息发布者结束发送,即发送<a href="https://www.jb51.cc/tag/yige/" target="_blank" class="keywords">一个</a>“quit”消息;
// 此时如果有其他的消息接收者,那么在listener.onMessage()中接收到“quit”时,将执行“unsubscribe”操作。
pubClient.close(channel);
}
}
测试结果
单线程单channel 10000次发布订阅时间消耗
27382 INFO 2016-11-29 10:39:40965 com.study.redis.pubsub.RedisApp 10000次发布订阅总共耗时: 27268ms
发布订阅的不足之处归纳
数据可靠性无法保证
一个redis-cli发布消息n个redis-cli接受消息。消息的发布是无状态的,即发布完消息后该redis-cli便在理会该消息是否被接受到,是否在传输过程中丢失,即对于发布者来说,消息是”即发即失”的.因此在使用时要多加考虑。
扩展性太差
不能通过增加消费者来加快消耗发布者的写入的数据,如果发布者发布的消息很多,则数据阻塞在通道中已等待被消费着来消耗。阻塞时间越久,数据丢失的风险越大(网络或者服务器的一个不稳定就会导致数据的丢失)。
资源消耗较高
在pub/sub中 消息发布者不需要独占一个Redis的链接,而消费者则需要单独占用一个Redis的链接,在Java中便不得独立出分出一个线程来处理消费者。这种场景一般对应这多个消费者,此时则有着过高的资源消耗。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。