微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 Scala 将字符串作为 json 沉入 flink kinesis 流中?

如何解决如何使用 Scala 将字符串作为 json 沉入 flink kinesis 流中?

我们如何将 pojo 变量作为 json 生成到 kinesis flink 流中:

val inputStream: DataStream[Array[Byte]] = env.addSource {
  loadConsumerOrFail(config,jobName)
}
inputStream.print()
val transformedStream: DataStream[String] = inputStream.map { jsonstr =>
  val sJson = JsonMethods.parse((jsonstr.map(_.tochar)).mkString)
  val payloadJsonValue = sJson \ "line"

  implicit val formats = DefaultFormats
  val payvalue = JsonMethods.compact(JsonMethods.render(payloadJsonValue)).replace("\"","")
  val payloadBytes = base64Decoder.decode(payvalue)

  val collectorPayload  = new CollectorPayload
  thriftDeserializer.deserialize(collectorPayload,payloadBytes)

  badStream(collectorPayload.ipAddress,collectorPayload.userAgent,collectorPayload.timestamp,collectorPayload.refererUri,collectorPayload.hostname,(sJson \ "failure_tstamp").extract[String],collectorPayload.body,collectorPayload.toString)

}
transformedStream.addSink(loadProducerOrFail(config,jobName))

这里将transformedStream沉入另一个kinesis但作为json但如何转换为json

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。