
Unable to receive data from Kafka into Spark Streaming


I am trying to generate some random data with a Kafka producer written in Java in the Eclipse IDE. I receive the same data with a Kafka consumer, also written in Java in the same IDE. My work depends on streaming data, so I need Spark Streaming to receive the random data generated through Kafka. For the Spark Streaming part I use Python code in a Jupyter notebook. To integrate Kafka with Spark, the "spark-streaming-kafka-0-10_2.12-3.0.0.jar" file has to be added to the Spark jars, and I have also tried adding the jar file in PySpark. Here is my Spark code:

import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

n_secs = 3          # batch interval in seconds
topic = "generate"

spark = SparkSession.builder.master("local[*]") \
        .appName("kafkaStreaming") \
        .config("spark.jars", "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/pyspark/spark-streaming-kafka-0-10_2.12-3.0.0.jar") \
        .getOrCreate()
sc = spark.sparkContext

ssc = StreamingContext(sc, n_secs)
kStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers': 'localhost:9092',
                        'group.id': 'test-group',
                        'auto.offset.reset': 'latest'})

# Each record is a (key, value) tuple; keep the value and split it into words.
lines = kStream.map(lambda x: x[1])
words = lines.flatMap(lambda line: line.split(" "))
print(words)  # prints the DStream object itself, not its contents

ssc.start()
time.sleep(100)
ssc.stop(stopSparkContext=True, stopGraceFully=True)
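
The Java producer code itself is not shown here. As a rough stand-in, a minimal producer doing the same job might look like the sketch below (assuming the kafka-python package, the topic "generate", and a broker on localhost:9092; none of these details come from the original Java code):

import random
import time

from kafka import KafkaProducer  # pip install kafka-python

# Hypothetical stand-in for the Java producer: sends a line of random
# numbers to the "generate" topic once per second.
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for _ in range(100):
    line = " ".join(str(random.randint(0, 99)) for _ in range(5))
    producer.send('generate', line.encode('utf-8'))
    time.sleep(1)
producer.flush()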

In the Spark code above, I add the jar file using the SparkSession config() method. After creating the DStream, I try to receive data from Kafka with KafkaUtils.createDirectStream() by supplying the topic name, bootstrap servers, and so on. After that, I convert the data into an RDD and print the result. That is the overall flow of my work. First, I execute the Kafka producer code in Java; it generates some data, which is consumed by the Kafka consumer, and up to that point everything works fine. When I execute the Spark Streaming code in Python, it shows an error like this:

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving

Py4JError                                 Traceback (most recent call last)
<ipython-input-17-873ece723182> in <module>
     36                         'bootstrap.servers':'localhost:9092',
     37                         'group.id':'test-group',
---> 38                         'auto.offset.reset':'latest'})
     39 
     40 lines = kStream.map(lambda x: x[1])

~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/pyspark/streaming/kafka.py in createDirectStream(ssc, topics, kafkaParams, fromOffsets, keyDecoder, valueDecoder, messageHandler)
    144             func = funcWithoutMessageHandler
    145             jstream = helper.createDirectStreamWithoutMessageHandler(
--> 146                 ssc._jssc, set(topics), jfromOffsets)
    147         else:
    148             ser = AutoBatchedSerializer(PickleSerializer())

~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    334             raise Py4JError(
    335                 "An error occurred while calling {0}{1}{2}".
--> 336                 format(target_id, ".", name))
    337     else:
    338         type = answer[1]

Py4JError: An error occurred while calling o270.createDirectStreamWithoutMessageHandler

Can anyone please help me get rid of this problem?

Solution

Several things can be seen from the code itself:

  • Your jar artifact is built for Spark 3.0, while you are running Spark version 2.4.6. (Hint: the last three digits of the file name are the Spark version.)
  • You have added the jar file through a config option. I would suggest first verifying the jar file you are using by passing it to the spark-submit command as --jars <jar-file-path>. A version-matched alternative is sketched after this list.
  • Try printing the direct stream first, before applying any transformations to it. You can do that like this:
kStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers': 'localhost:9092',
                        'group.id': 'test-group',
                        'auto.offset.reset': 'latest'})
# Print the first few records of each batch to the driver console.
kStream.pprint()

ssc.start()
# stream will run for 50 sec
ssc.awaitTerminationOrTimeout(50)
ssc.stop()
sc.stop()
  • Once you have confirmed that you are getting the data, you can process it with foreachRDD, transform, or other APIs; two sketches follow below.
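
Expanding on the first two points: the default Spark 2.4.6 build uses Scala 2.11, so an artifact tagged _2.12 and versioned 3.0.0 cannot load on it. Note also that the pyspark.streaming.kafka module imported in the question belongs to the kafka-0-8 integration; the 0-10 integration has no Python API. A minimal sketch of pulling in matching coordinates, assuming the default Scala 2.11 build (the exact artifact name is an assumption about your setup):

import os

# Hypothetical fix: must run *before* the SparkSession/SparkContext is
# created, because it controls how the driver JVM is launched.
# spark-streaming-kafka-0-8_2.11:2.4.6 matches both the Spark version
# (2.4.6) and the Scala build (2.11), and it provides the helper class
# behind pyspark.streaming.kafka.KafkaUtils.createDirectStream.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.6 '
    'pyspark-shell'
)

Equivalently, pass the same coordinates to spark-submit with --packages, or a local jar with --jars, instead of setting them from Python.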
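
For the last point, once pprint() confirms that batches are arriving, a minimal foreachRDD sketch could look like this (the process handler is an illustrative name, not part of any API):

def process(time, rdd):
    # Called once per batch interval with that batch's RDD.
    if not rdd.isEmpty():
        print("Batch at", time, "->", rdd.take(5))

# Each Kafka record arrives as a (key, value) tuple; keep the value
# and split it into words.
words = kStream.map(lambda x: x[1]) \
               .flatMap(lambda line: line.split(" "))
words.foreachRDD(process)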
