微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在PySpark中使用foreach来编写kafka主题?

如何解决如何在PySpark中使用foreach来编写kafka主题?

我正在尝试通过foreach插入在每一行上创建的精致日志,并希望将其存储到Kafka主题中,如下所示-

def refine(df):
    log = df.value
    event_logs = json.dumps(get_event_logs(log)) #A function to refine the row/log
    pdf = pd.DataFrame({"value": event_logs},index=[0])

    spark = SparkSession.builder.appName("myAPP").getorCreate() 
    df = spark.createDataFrame(pdf)

    query = df.selectExpr("CAST(value AS STRING)") \
       .write \
       .format("kafka") \
       .option("kafka.bootstrap.servers","localhost:9092") \
       .option("topic","intest") \
       .save()

我正在使用以下代码进行调用

query = streaming_df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")  \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .foreach(refine)\
    .start()
query.awaitTermination()

但是refine函数在某种程度上无法获得我在提交代码时发送的Kafka软件包。我相信the子手无法访问通过以下命令发送的Kafka软件包-

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 ...

因为提交代码时,我收到以下错误消息,

pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".; 

所以,我的问题是如何将数据存储到foreach内的Kafka中?另外,我想知道在foreach内创建另一个会话是否是个好主意;我不得不在foreach内重新声明会话,因为主驱动程序的退出会话无法在foreach中用于某些与可序列化有关的问题。

P.S:如果我尝试将其沉入...format("console")内的控制台(foreach)中,则它将正常工作。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。