如何解决MessageListenerContainer
嗨,我正在尝试从 mongo oplog 实现 MessageListener,它应该从上次停止的流文档中检索文档。 目前我的代码设置如下。不知道如何从最后一个文档中检索 resumetoken 并设置它,以便如果侦听器应用程序关闭并重新联机,它应该在最后一次读取后读取。
@Bean
MessageListenerContainer candidateMessageListenerContainer(MongoTemplate mongoTemplate,@Qualifier("candidateMessageListener") MessageListener documentMessageListener)
{
Executor executor = Executors.newSingleThreadExecutor();
MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(mongoTemplate,executor)
{
@Override
public boolean isAutoStartup()
{
return true;
}
};
ChangeStreamRequest<Candidate> request = ChangeStreamRequest.builder(documentMessageListener)
.collection("candidate") // The name of the collection to listen to,Do not specify the default listening database
.filter(newAggregation(match(where("operationType").in("insert","update","replace","delete")))) // Filter the types of operations that need to be monitored,You can specify filter conditions according to your needs
.fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
// When not set,When the document is updated,Only information about the changed fields will be sent,Set up UPDATE_LOOKUP All information of the document will be returned
.build();
messageListenerContainer.register(request,Candidate.class);
return messageListenerContainer;
}
解决方法
您可以在 getResumeToken()
上使用 getTimestamp()
或 ChangeStreamEvent
方法来获取此信息。有关详细信息,请参阅 Reference Documentation - Resuming Change Streams。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。