如何解决如何在PySpark中使用foreach来编写kafka主题?
我正在尝试通过foreach
插入在每一行上创建的精致日志,并希望将其存储到Kafka主题中,如下所示-
def refine(df):
log = df.value
event_logs = json.dumps(get_event_logs(log)) #A function to refine the row/log
pdf = pd.DataFrame({"value": event_logs},index=[0])
spark = SparkSession.builder.appName("myAPP").getorCreate()
df = spark.createDataFrame(pdf)
query = df.selectExpr("CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers","localhost:9092") \
.option("topic","intest") \
.save()
query = streaming_df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)") \
.writeStream \
.outputMode("append") \
.format("console") \
.foreach(refine)\
.start()
query.awaitTermination()
但是refine
函数在某种程度上无法获得我在提交代码时发送的Kafka软件包。我相信the子手无法访问通过以下命令发送的Kafka软件包-
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 ...
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
所以,我的问题是如何将数据存储到foreach
内的Kafka中?另外,我想知道在foreach
内创建另一个会话是否是个好主意;我不得不在foreach
内重新声明会话,因为主驱动程序的退出会话无法在foreach中用于某些与可序列化有关的问题。
P.S:如果我尝试将其沉入...format("console")
内的控制台(foreach
)中,则它将正常工作。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。