微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

java – Disruptor – 未调用EventHandlers

我正在玩 Disruptor框架,并且发现我的事件处理程序没有被调用.

这是我的设置代码

private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService  EXECUTOR = Executors.newSingleThreadExecutor();

private void initializedisruptor() {
    if (disruptor != null)
        return;

    disruptor = 
            new disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY,EXECUTOR,new SingleThreadedClaimstrategy(BUFFER_SIZE),new SleepingWaitStrategy());
    disruptor.handleEventsWith(searchTermMatchingHandler)
        .then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);

    this.ringBuffer = disruptor.start();
}

在其他地方,我发布了一些事件.我尝试过以下两种方法

事件发布方法A:

private void handleStatus(final Status status)
{

    long sequence = ringBuffer.next();
    TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
    event.setStatus(status);
    event.setSearchInstruments(searchInstruments);
    ringBuffer.publish(sequence);
}

在这种情况下,我发现第一个EventHandler被调用,但除此之外从未发生任何事情.

事件发布方法B:

private void handleStatus(final Status status)
{
    disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {

        @Override
        public TwitterStatusReceivedEvent translateto(
                TwitterStatusReceivedEvent event,long sequence) {
            event.setStatus(status);
            event.setSearchInstruments(searchInstruments);
            return event;
        }
    });
}

在这种情况下,我发现根本没有调用任何事件处理程序.

我究竟做错了什么?

更新

这是我的EventHandler.我应该如何表示处理已完成?

public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {

    @Override
    public void onEvent(TwitterStatusReceivedEvent event,long sequence,boolean endOfBatch) throws Exception {
        String statusText = event.getStatus().getText();
        for (Instrument instrument : event.getSearchInstruments())
        {
            if (statusText.contains(instrument.getSearchTerm()))
            {
                event.setMatchedInstrument(instrument);
                break;
            }
        }
    }

}

解决方法

每个事件处理程序都需要在自己的线程中运行,该线程在关闭破坏程序之前不会退出.由于您使用的是单线程执行程序,因此只会执行恰好执行的第一个事件处理程序. (disruptor类将每个处理程序存储在一个hashmap中,以便处理程序最终运行会有所不同)

如果你切换到cachedThreadPool,你会发现它都开始运行.您不需要对序列号进行任何管理,因为这些都是由disruptor类为您设置和管理的EventProcessor处理的.只处理你得到的每个事件是完全正确的.

原文地址:https://www.jb51.cc/java/120554.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐