我已经开始使用风暴,所以我使用
this tutorial创建简单的拓扑
当我使用LocalCluster运行我的拓扑结构,似乎都很好,
我的问题是我没有在元组上得到ACK,这意味着我的spout ack不会被调用.
我的代码在下面 – 你知道为什么不叫Ack吗?
所以我的拓扑看起来像这样
public StormTopology build() { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(HelloWorldSpout.class.getSimpleName(),helloWorldSpout,spoutParallelism); HelloWorldBolt bolt = new HelloWorldBolt(); builder.setBolt(HelloWorldBolt.class.getSimpleName(),bolt,boltParallelism) .shuffleGrouping(HelloWorldSpout.class.getSimpleName()); }
我的水嘴看起来像这样
public class HelloWorldSpout extends BaseRichSpout implements ISpout { private SpoutOutputCollector collector; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("int")); } public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) { this.collector = collector; } private static Boolean flag = false; public void nextTuple() { Utils.sleep(5000); //emit only 1 tuple - for testing if (!flag){ this.collector.emit(new Values(6)); flag = true; } } @Override public void ack(Object msgId) { System.out.println("[HelloWorldSpout] ack on msgId" + msgId); } public void fail(Object msgId){ System.out.println("[HelloWorldSpout] fail on msgId" + msgId); } }
我的螺栓看起来像这样
@SuppressWarnings("serial") public class HelloWorldBolt extends BaseRichBolt{ private OutputCollector collector; public void prepare(Map conf,OutputCollector collector) { this.collector = collector; logger.info("preparing HelloWorldBolt"); } public void execute(Tuple tuple) { System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0)); this.collector.ack(tuple); } public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } }
解决方法
您的喷嘴中的emit()方法只有一个参数,因此元组不被锚定.这就是为什么你没有得到一个回调到ack()方法在喷口,即使你是在螺栓中的元组.
要使其正常工作,您需要修改您的喷口以发出第二个参数,即消息ID.正是这个id被传回给spout中的ack()方法:
public void nextTuple() { Utils.sleep(5000); //emit only 1 tuple - for testing if (!flag){ Object msgId = "ID 6"; // this can be any object this.collector.emit(new Values(6),msgId); flag = true; } } @Override public void ack(Object msgId) { // msgId should be "ID 6" System.out.println("[HelloWorldSpout] ack on msgId" + msgId); }
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。