微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

foreachRDD 在 Twitter API 的 J8 Spark Streaming 中为每个 RDD 提取平均单词和字符数

如何解决foreachRDD 在 Twitter API 的 J8 Spark Streaming 中为每个 RDD 提取平均单词和字符数

我正在尝试使用 Java 8 中的 spark 获取从 Twitter API 提取的每个 RDD 中的平均单词和字符数。但是,我无法使用流来实现这一点。我的代码如下:

//Create the stream.
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);
//Outputs the text of tweets to a JavaDStream.
JavaDStream<String> statuses = twitterStream.map(Status::getText);
//Get the average number of words & characters in each RDD pulled during streaming.
statuses.foreachRDD(rdd -> {
            long c = rdd.count();
            long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
            long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
            long avgWc = wc / c;
            long avgCc = cc / c;
            System.out.println(wc / c);
            System.out.println(cc / c);
        return avgWc,avgCc;});

我得到的错误foreachRDD 的预期返回类型是无效的,而我的返回是长格式。

我该如何解决这个问题?我需要另一种方法解决这个问题吗?

解决方法

一个可能的解决方案是使用 JavaDStream.transform。此函数允许留在 SparkStreaming-API 中:

JavaDStream<String> statuses = ...
JavaDStream<Tuple2<Long,Long>> avgs = statuses.transform(rdd -> {
            long c = rdd.count();
            long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
            long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
            long avgWc = wc / c;
            long avgCc = cc / c;
            //System.out.println(wc / c);
            //System.out.println(cc / c);
            return jssc.sparkContext().parallelize(Collections.singletonList(Tuple2.apply(avgWc,avgCc)));
        }
);
avgs.print();
,

如果返回类型为 void,则无法返回数据。 您可以在“foreachRDD”函数之外创建一个列表并传递如下所示的值:

List<Data> listData=new ArrayList();
statuses.foreachRDD(rdd -> {
            long c = rdd.count();
            long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
            long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
            long avgWc = wc / c;
            long avgCc = cc / c;
            System.out.println(wc / c);
            System.out.println(cc / c);
            Data data=new Data();
            data.setAvgCc(avgCc);
            data.setAvgWc(avgWc);
            listData.add(data);
        });

Data 是一个包含两个变量 avgCc 和 AvgWc 的类,如下图

public class Data {
    long avgWc;
    long avgCc;
    public long getAvgWc() {
        return avgWc;
    }
    public void setAvgWc(long avgWc) {
        this.avgWc = avgWc;
    }
    public long getAvgCc() {
        return avgCc;
    }
    public void setAvgCc(long avgCc) {
        this.avgCc = avgCc;
    }
    public Data(long avgWc,long avgCc) {
        super();
        this.avgWc = avgWc;
        this.avgCc = avgCc;
    }
    public Data() {
        super();
    }
}

如果这有帮助,请告诉我。或者您需要更多说明。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。