微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

当我提交 jar 文件时,来自 Kafka 的数据没有打印在控制台中 Spark 流 + Kafka 集成 3.1.1

如何解决当我提交 jar 文件时,来自 Kafka 的数据没有打印在控制台中 Spark 流 + Kafka 集成 3.1.1

我提交jar文件时没有错误

但是当我使用 HTTP 协议发送数据时,没有打印数据。

(当我使用“kafka-console-consumer.sh”检查时数据打印良好)

[图片,提交了一个jar文件:数据没有打印]

jar 文件中的代码和依赖项如下。

enter image description here

[图片,Kafka-console-consumer.sh:打印数据]

命令:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group test-consumer --topic test01 --from-beginning

enter image description here

[JAVA 文件]

2-1,依赖

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.1.1</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.1.1</version>
    </dependency>
</dependencies>

2-2,代码

package SparkTest.SparkStreaming;

import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;


public final class JavaWordCount {
    public static void main(String[] args) throws Exception {
        // Create a local StreamingContext with two working thread and batch interval of 1 second
        SparkConf conf = new SparkConf().setMaster("yarn").setAppName("JavaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(1));
        
        // load a topic from broker
        Map<String,Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers","localhost:9092");
        kafkaParams.put("key.deserializer",StringDeserializer.class);
        kafkaParams.put("value.deserializer",StringDeserializer.class);
        kafkaParams.put("group.id","test-consumer");
        kafkaParams.put("auto.offset.reset","latest");
        kafkaParams.put("enable.auto.commit",false);

        Collection<String> topics = Arrays.asList("test01");

        JavaInputDStream<ConsumerRecord<String,String>> stream =
          KafkaUtils.createDirectStream(
            jssc,LocationStrategies.Preferbrokers(),ConsumerStrategies.<String,String>Subscribe(topics,kafkaParams)
          );
        
        JavaDStream<String> data = stream.map(v -> {
            return v.value();    // mapping to convert into spark D-Stream 
        });
      
        data.print();
        
        jssc.start();
        jssc.awaitTermination();
    }
}

解决方法

您在控制台使用者中使用 --from-beginning,但在 Spark 代码中使用 auto.offset.reset=latest

因此,如果您想查看任何数据,则需要运行生产者 而 Spark 运行

您还需要考虑使用 spark-sql-kafka-0-10 Structured Streaming 依赖项,as you can find in the KafkaWordCount example

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。