如何解决使用RocketMQ时,我通过异步发送消息,有时会抛出ConcurrentModificationException
我只是发送一个字符串,我不知道为什么会这样。 客户端版本:4.5.2 服务器版本:4.5.2
希望有人能帮助我!
private void sendMission(String pushId) {
try {
Message message = new Message();
message.setTopic(missionTpoic);
message.setBody(pushId.getBytes("utf-8"));
log.info("sendMission to MQ begin,message : {}",message);
rocketMQService.getProducer().send(message,new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info( "======== onSuccess ========== {}",sendResult);
}
@Override
public void onException(Throwable e) {
log.error( "======== onException ==========",e);
}
},10000L);
log.info("sendMission to MQ end,message);
} catch (Exception e) {
throw new BusinessRuntimeException(BaseExceptionCode.SYstem_ERROR.getAdminCode(),e,"Occur a error when push message into mq.");
}
log.info("sendMissionDone!!! pushId : {}",pushId);
}
异常如下:
2021-03-26 16:06:13 {:} 错误 [AsyncSenderExecutor_1] c.p.b.p.c.SuperPushMissionExecutor:138 -- ======== onException ==========
java.util.ConcurrentModificationException: null
在 java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
在 java.util.HashMap$EntryIterator.next(HashMap.java:1479)
在 java.util.HashMap$EntryIterator.next(HashMap.java:1477)
在 org.apache.rocketmq.common.message.MessageDecoder.messageProperties2String(MessageDecoder.java:387)
在 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:767)
在 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:557)
在 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.access$300(DefaultMQProducerImpl.java:90)
在 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl$3.run(DefaultMQProducerImpl.java:491)
在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
在 java.util.concurrent.FutureTask.run(FutureTask.java:266)
在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
在 java.lang.Thread.run(Thread.java:748)
解决方法
我不知道问题是否解决了。看起来“rocketMQService.getProducer()”启动了一个threadPool并使用多线程发送相同的“Message”,而“message.properties”基于HashMap,当使用线程访问HashMap会抛出ConcurrentModificationException。
所以解决方案是检查“rocketMQService.getProducer()”上的代码,不要使用多线程发送相同的“消息”。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。