微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在Spark Streaming中合并两个流?

如何解决如何在Spark Streaming中合并两个流?

我正在尝试结合两个流媒体火花。主要流是由多个参数值组成的数据记录,另一个流是模型上下文。因此,在加入这两个流之后,我们可以使用模型上下文来计算新的单个值。请注意,在我的场景中,主流是连续数据记录,每秒钟来自源,而另一流是事件触发的流。这意味着只要用户通过网络单击,模型上下文就会进入流中。

我可以通过join方法来实现自己的方案。但是,我想做的是我想在火花流中的各个小批处理中保持模型上下文。 join方法有效,但是直到下一批开始执行。

我已经搜索了这个问题,发现mapWithState是我的案例的解决方案,我使用命令对其进行了编码,但似乎效果不佳。

假定所有数据都是从kafka主题接收的,如下所示。

下面是我的代码

JavaStreamingContext streamingContext = SparkConfig.initSparkConfig();

//two different topic
Map<String,Object> kafkaParams = KafkaConfig.createKafkaConfig();
        Collection<String> tracetopic = Arrays.asList("trace_test");
        Collection<String> modelTopic = Arrays.asList("model_test");


        //two different streams.
        JavaInputDStream<ConsumerRecord<String,String>> traceStream = KafkaUtils.createDirectStream(
                streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.<String,String>Subscribe(tracetopic,kafkaParams));

        JavaInputDStream<ConsumerRecord<String,String>> modelStream = KafkaUtils.createDirectStream(
                streamingContext,String>Subscribe(modelTopic,kafkaParams));


        //transformation
        JavaPairDStream<String,String> tracePairstream = traceStream.mapToPair(new ConsumerRecordToTupleString());
        JavaPairDStream<String,String> modelPairstream = modelStream.mapToPair(new ConsumerRecordToTupleString());


        //Applying mapWithState
        StateSpec<String,String,Tuple2<String,String>> stateSpec = StateSpec.function(modelUpdateFunc).timeout(Durations.seconds(3600));

      
        JavaMapWithStateDStream<String,String>> modelStateStream = modelPairstream.mapWithState(stateSpec);


        JavaPairDStream<String,String> modelPairWithStateStream = modelStateStream.mapToPair(new PairFunction<Tuple2<String,String>,String>() {
            @Override
            public Tuple2<String,String> call(Tuple2<String,String> stringStringTuple2) throws Exception {
                return new Tuple2<>(stringStringTuple2._1,stringStringTuple2._2);
            }
        });


        //joining two streams
        JavaPairDStream<String,String>> traceModelStream = modelPairWithStateStream.join(tracePairstream);
        traceModelStream.print();

但是,当我在模型流中打印上下文时,仅当我通过Kafka生成数据时才打印第一批的值。我希望它显示批次之间的模型上下文,并在控制台中为每个批次打印模型上下文,但这没用。

如果您能帮助我,我非常感谢:)

谢谢。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。