微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

为什么这个风暴流的显式定义不起作用,而隐式定义呢?

如何解决为什么这个风暴流的显式定义不起作用,而隐式定义呢?

给定一个使用 Stream API 的简单 Apache Storm Topology,有两种初始化流的方法


版本 1 - 隐式声明

    StreamBuilder builder = new StreamBuilder();
    builder
        .newStream(new Intspout(),new ValueMapper<Integer>(0),1)
        .filter(x -> x > 5)
        .print();

结果:这按预期工作,它只打印大于 5 的整数。


版本 2 - 显式声明

    Stream<Integer> integerStream = builder.newStream(new Intspout(),1);
    integerStream.filter(x -> x > 5);
    integerStream.print();

结果:这不起作用 - 所有元组都被打印出来,包括整数


问题:为什么这个显式声明不能正常工作以及如何解决这个问题?


拓扑在本地集群上运行,其中 Intspout 只是一个简单的 spout,它使用以下命令发出随机整数

    StormTopology topo = builder.build();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test",new HashMap<>(),topo);

解决方法

那是因为 integerStream.filter(x -> x > 5); 返回一个您忽略的新流。

这有效:

    Stream<Integer> integerStream = builder.newStream(new IntSpout(),new ValueMapper<Integer>(0),1);
    Stream<Integer> filteredStream = integerStream.filter(x -> x > 5);
    filteredStream.print();

您的第一个示例中也存在语法错误。在第四行的末尾有一个额外的分号。

StreamBuilder builder = new StreamBuilder();
builder
    .newStream(new IntSpout(),1)
    .filter(x -> x > 5) // <= there was a semicolon here 
    .print();

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。