如何解决Spark Streaming 丰富数据帧模式问题
我的主要目的是:
- 来自 kafka 主题的 ReadStream
- 用Redis丰富并“添加新列”
- WriteStream 到 kafkatopic“带有新列”
经过一些问答后,我的代码如下所示,主要问题是在控制台输出中看不到添加的列。那么,如何才能真正将这个Redis查找到的数据添加到writestream部分内的数据集中???
非常感谢任何帮助,谢谢
注意: 尝试过 spark.sqlContext.createDataFrame... 但这给出了错误: 必须使用 writestream.start() 执行对流源的查询
val my_raw_events_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","aa.bbb.ccc.yyy:9092")
.option("subscribe","my-raw-event")
.option("failOnDataLoss","false")
.option("startingOffsets","earliest")
.option("maxOffsetsPerTrigger",1000)
.load()
.select(from_json($"value".cast("string"),rawEventSchema,Map.empty[String,String])
.alias("C"))
//***
//some extra operations
//***
val query =
my_raw_events_df
.writeStream
.foreachBatch((dataset,batchId) =>
{
dataset.foreachPartition(rows =>
{
val redisConn =new RedisClient("172.xx.xx.xx",6379,Option("*******"))
rows.foreach(row =>
{
var redisTarget = "SomeParameterKey"
var valueFromRedis = redisConn.get(redisTarget).getOrElse("")
Row.fromSeq(row.toSeq ++ Array[Any](valueFromRedis))
})
redisConn.close
})
dataset
.select("*")
.write
.format("console")
.save()
})
.trigger(Trigger.ProcessingTime("5 seconds"))
.outputMode("append")
.start()
query.awaitTermination()
query.stop()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。