如何从Java程序创建对ibm mq amqp主题的持久订阅?

如何解决如何从Java程序创建对ibm mq amqp主题的持久订阅?

我们通过提供createDurableSubscriber和订户名称,以编程方式创建了clientId的IBM MQ AMQP TOPIC订户。

我们启动了程序,因此它订阅了TOPIC并停止了该程序。然后将消息发送到主题,然后再次启动接收器程序,但是我们无法接收发送的消息并释放消息,这对于持久订阅是不应该发生的。

当使用mqsc命令DISPLAY TOPICDISPLAY TPSTATUSDISPLAY TPSTATUS SUBDISPLAY SUB SUBID连接订户时,但未看到订户程序停止时,我们可以看到amqp主题及其持久订阅。我们已经定义了属性DEFPSIST(YES),并且客户端(生产者到主题)正在发送持久消息。

消息消失了,因为我们无法在订阅者的持久队列中看到消息?是否取决于到期属性?

DISPLAY SUB SUBID为订户连接后的输出。

AMQ8096: WebSphere MQ subscription inquired.


SUBID("hex sub id")
   SUB(:private:CLINET01:TOPIC01)            TOPICSTR(TOPIC01)
   TOPICOBJ(SYSTEM.BASE.TOPIC)             DISTYPE(RESOLVED)
   DEST(SYSTEM.MANAGED.DURABLE.5F6B5C2524FB9AED)
   DESTQMGR(qm.name)                   PUBAPPID( )
   SELECTOR( )                             SELTYPE(NONE)
   USERDATA(010)
   PUBACCT(***************************************************)
   DESTCORL(***************************************************)
   DESTCLAS(MANAGED)                       DURABLE(YES)
   EXPIRY(0)                               PSPROP(MSGPROP)
   PUBPRTY(ASPUB)                          REQONLY(NO)
   SUBSCOPE(ALL)                           SUBLEVEL(1)
   SUBTYPE(API)                            VARUSER(FIXED)
   WSCHEMA(TOPIC)                          SUBUSER(mqm)
   CRDATE(2020-09-28)                      CRTIME(04:14:09)
   ALTDATE(2020-09-28)                     ALTTIME(04:14:09)

订户ID具有私有(不确定原因)和客户端ID,但没有订户名称Sub4。

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Topic;
import javax.jms.Queue;

import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.String;
import javax.jms.Destination;
import javax.naming.Context;
import org.apache.qpid.jms.JmsConnectionFactory;
import javax.jms.DeliveryMode;
import javax.naming.InitialContext;
import javax.jms.Message;

public class AMQPQueueExample1 implements Runnable  {
private static final int DELIVERY_MODE = DeliveryMode.PERSISTENT;

public void run(){
try{
 Connection connection = null;
 Context context = new InitialContext();
 ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("myFactoryLookup");
 connection = connectionFactory.createConnection();
 connection.setClientID("123");//("WHATS_MY_PURPOSE3"); // Why do we need clientID while publishing the TOPIC from consumer / publisher
 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
 Topic priceTopic = (Topic) context.lookup("myTopicLookup1");
 MessageConsumer subscriber1 = session.createDurableSubscriber(priceTopic,"sub420"); //"sub3");
System.out.println("TOPIC "+priceTopic);

connection.start();
while(true){
TextMessage   message1 = (TextMessage) subscriber1.receive(1000);
if(message1!=null)
           System.out.println("Subscriber 1 received : " + message1.getText());


}
}catch(Exception e){
e.printStackTrace();
}
}

 public static void main(String[] args)  {

AMQPQueueExample1 amp=new AMQPQueueExample1();
 Thread thread = new Thread(amp);
thread.start();


 }
}

值从jndi.properties文件中获取上下文工厂和提供者URL。

解决方法

从注释中看起来好像您正在使用MQ 8.0.0.5?如果是这种情况,则该版本的MQ不支持Apache Qpid JMS客户端。我相信使用该版本可以实现非常基本的非持久订阅 ,但是其他任何JMS方法都不太可能。

我怀疑那个版本的MQ无法完全理解Qpid JMS的AMQP 1.0流,因此订阅的有效期设置为0,而不是无限。

MQ 9.2增加了对更多JMS 2.0规范的支持-尽管不是每个JMS功能。这里提供了有关所支持方法的更多信息:

https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.2.0/com.ibm.mq.dev.doc/q125050_.htm

创建持久的订户和/或消费者应该可以按预期工作。

,

马修·怀特海德(Matthew Whitehead)“ MQ Light messaging from Microsoft®.NET™ (Part 4)”的一篇文章指出:

AMQP频道不支持为MQ Light订阅设置无限的到期时间。虽然可以创建具有很长生存期的订阅,但无法创建永久存在的订阅。

如果您希望创建永不过期的订阅,则可以通过创建MQ管理的订阅并让MQ Light客户端加入并退出订阅来实现。这也有助于确保在第一个订阅者连接之前发布到某个主题的任何消息都不会完全丢失。阅读my previous article,了解如何将MQ Light客户端加入托管订阅。

AMQP相关字段

为提供上述到期功能,MQ Light使用AMQP 1.0的2个功能:

  • 源超时
  • 源有效期政策

源超时用于指定订阅将过期的时间(以秒为单位)。

源到期策略用于确定导致到期计时器开始的原因。 MQ AMQP通道仅支持链接分离的过期策略,这意味着计时器将在最后一个链接从预订分离后立即启动。


我搜索了有关如何在Apache QPID中设置“源超时”或“源过期策略”的参考,但是链接的博客参考通过管理定义的订阅来设置过期。根据您问题中的信息,我认为您可以提前定义类似的内容。我没有指定EXPIRY,因为这将从EXPIRY(UNLIMITED)中提取SYSTEM.DEFAULT.SUB

DEFINE SUB(':private:CLINET01:TOPIC01') TOPICOBJ(SYSTEM.BASE.TOPIC) TOPICSTR('TOPIC01') DESTCLAS(MANAGED)

然后,当您连接AMQP订户时,它将恢复该现有的订户,并将到期时间设置为UNLIMITED

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)> insert overwrite table dwd_trade_cart_add_inc > select data.id, > data.user_id, > data.course_id, > date_format(
错误1 hive (edu)> insert into huanhuan values(1,'haoge'); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive> show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 <configuration> <property> <name>yarn.nodemanager.res