微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

屏蔽来自Kafka流的数据

如何解决屏蔽来自Kafka流的数据

我正在使用Spark结构化流技术来从kafka流数据,这为我提供了具有以下架构的数据框

Column     Type
key        binary
value      binary
topic      string
partition  int
offset     long
timestamp  long
timestampType   int

Value Colum以二进制格式出现在这里,但是它实际上是具有struct类型的json字符串,并且要求读取json struct并屏蔽其中的一些字段并写入数据。

解决方法

您可以按照Structured Streaming + Kafka Integration Guide中给出的准则来了解如何将二进制值转换为字符串值。

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers","host1:port1,host2:port2")
  .option("subscribe","topic1")
  .load()

df.selectExpr("CAST(value AS STRING)")
  .as[String]

然后,您可以根据实际的json结构定义模式,例如:

val schema: StructType = new StructType()
    .add("field1",StringType)
    .add("field2",ArrayType(new StructType()
      .add("f2",StringType)
      .add("f2",DoubleType)
    ))

然后使用from_json函数将允许您处理JSON字符串中的数据,请参考documentation,例如:

df.selectExpr("CAST(value AS STRING)")
  .select(from_json('json,schema).as("data"))

然后您可以通过使用withColumndrop等结构化API替换列来开始屏蔽。

如果您不想定义整个架构,则可以考虑使用get_json_object

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。