我正在尝试编写一个执行以下操作的拓扑:
>订阅一个twitter feed(基于关键字)
>一个聚合螺栓,用于聚合收集中的一些tweets(例如N),并将它们发送给打印机螺栓
>一个简单的螺栓,将集合一次打印到控制台.
在现实中,我想对收藏进行一些更多的处理.
我在本地测试,看起来像是在工作.但是,我不知道我是否正确设置了螺栓上的分组,并且在部署在实际的风暴集群上时可以正常工作.如果有人可以帮助您查看此拓扑并提出任何错误,更改或改进,我将不胜感激.
谢谢.
这是我的拓扑结构.
builder.setSpout("spout",new TwitterFilterSpout("pittsburgh")); builder.setBolt("sampleaggregate",new SampleAggregatorBolt()) .shuffleGrouping("spout"); builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");
聚合螺栓
public class SampleAggregatorBolt implements IRichBolt { protected OutputCollector collector; protected Tuple currentTuple; protected Logger log; /** * Holds the messages in the bolt till you are ready to send them out */ protected List<Status> statusCache; @Override public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) { this.collector = collector; log = Logger.getLogger(getClass().getName()); statusCache = new ArrayList<Status>(); } @Override public void execute(Tuple tuple) { currentTuple = tuple; Status currentStatus = null; try { currentStatus = (Status) tuple.getValue(0); } catch (ClassCastException e) { } if (currentStatus != null) { //add it to the status cache statusCache.add(currentStatus); collector.ack(tuple); //check the size of the status cache and pass it to the next stage if you have enough messages to emit if (statusCache.size() > 10) { collector.emit(new Values(statusCache)); } } } @Override public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tweets")); } @Override public Map<String,Object> getComponentConfiguration() { return null; //To change body of implemented methods use File | Settings | File Templates. } protected void setupNonSerializableAttributes() { } }
打印机螺栓
public class PrinterBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple,BasicOutputCollector collector) { System.out.println(tuple.size() + " " + tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer ofd) { } }
解决方法
从我可以看到它看起来不错.魔鬼的细节虽然如此.我不知道你的聚合器螺栓是什么,但是如果对传递给它的值做出任何假设,那么你应该考虑一个适当的字段分组.当您使用默认的并行性提示1时,这可能不会有太大的差异,但是如果您决定使用多个聚合螺栓实例进行缩放隐式逻辑假设,则可能需要非随机分组.
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。