如何解决为什么这个风暴流的显式定义不起作用,而隐式定义呢?
给定一个使用 Stream API 的简单 Apache Storm Topology,有两种初始化流的方法:
版本 1 - 隐式声明
StreamBuilder builder = new StreamBuilder();
builder
.newStream(new Intspout(),new ValueMapper<Integer>(0),1)
.filter(x -> x > 5)
.print();
结果:这按预期工作,它只打印大于 5 的整数。
版本 2 - 显式声明
Stream<Integer> integerStream = builder.newStream(new Intspout(),1);
integerStream.filter(x -> x > 5);
integerStream.print();
问题:为什么这个显式声明不能正常工作以及如何解决这个问题?
拓扑在本地集群上运行,其中 Intspout
只是一个简单的 spout,它使用以下命令发出随机整数:
StormTopology topo = builder.build();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test",new HashMap<>(),topo);
解决方法
那是因为 integerStream.filter(x -> x > 5);
返回一个您忽略的新流。
这有效:
Stream<Integer> integerStream = builder.newStream(new IntSpout(),new ValueMapper<Integer>(0),1);
Stream<Integer> filteredStream = integerStream.filter(x -> x > 5);
filteredStream.print();
您的第一个示例中也存在语法错误。在第四行的末尾有一个额外的分号。
StreamBuilder builder = new StreamBuilder();
builder
.newStream(new IntSpout(),1)
.filter(x -> x > 5) // <= there was a semicolon here
.print();
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。