微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

MessageListenerContainer

如何解决MessageListenerContainer

嗨,我正在尝试从 mongo oplog 实现 MessageListener,它应该从上次停止的流文档中检索文档。 目前我的代码设置如下。不知道如何从最后一个文档中检索 resumetoken 并设置它,以便如果侦听器应用程序关闭并重新联机,它应该在最后一次读取后读取。

    @Bean
    MessageListenerContainer candidateMessageListenerContainer(MongoTemplate mongoTemplate,@Qualifier("candidateMessageListener") MessageListener documentMessageListener)
    {
    Executor executor = Executors.newSingleThreadExecutor();
    MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(mongoTemplate,executor)
    {
        @Override
        public boolean isAutoStartup()
        {
        return true;
        }
    };
    ChangeStreamRequest<Candidate> request = ChangeStreamRequest.builder(documentMessageListener)
                    .collection("candidate") // The name of the collection to listen to,Do not specify the default listening database
                    .filter(newAggregation(match(where("operationType").in("insert","update","replace","delete")))) // Filter the types of operations that need to be monitored,You can specify filter conditions according to your needs
                    .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
                    // When not set,When the document is updated,Only information about the changed fields will be sent,Set up UPDATE_LOOKUP All information of the document will be returned
                    .build();
    messageListenerContainer.register(request,Candidate.class);
    return messageListenerContainer;
    }

解决方法

您可以在 getResumeToken() 上使用 getTimestamp()ChangeStreamEvent 方法来获取此信息。有关详细信息,请参阅 Reference Documentation - Resuming Change Streams

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。