如何解决屏蔽来自Kafka流的数据
我正在使用Spark结构化流技术来从kafka流数据,这为我提供了具有以下架构的数据框
Column Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int
Value Colum以二进制格式出现在这里,但是它实际上是具有struct类型的json字符串,并且要求读取json struct并屏蔽其中的一些字段并写入数据。
解决方法
您可以按照Structured Streaming + Kafka Integration Guide中给出的准则来了解如何将二进制值转换为字符串值。
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribe","topic1")
.load()
df.selectExpr("CAST(value AS STRING)")
.as[String]
然后,您可以根据实际的json结构定义模式,例如:
val schema: StructType = new StructType()
.add("field1",StringType)
.add("field2",ArrayType(new StructType()
.add("f2",StringType)
.add("f2",DoubleType)
))
然后使用from_json
函数将允许您处理JSON字符串中的数据,请参考documentation,例如:
df.selectExpr("CAST(value AS STRING)")
.select(from_json('json,schema).as("data"))
然后您可以通过使用withColumn
和drop
等结构化API替换列来开始屏蔽。
如果您不想定义整个架构,则可以考虑使用get_json_object
。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。