微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark流数据集

如何解决Spark流数据集

我对Databricks来说还比较陌生,正在尝试读取传入的传感器数据并根据每组数据触发一个规则集。寻找有关进一步操作的帮助和指导

val connectionString = ConnectionStringBuilder("ConnectionString")   
    .setEventHubName("EventHubname")
    .build

  val eventHubsConf = EventHubsConf(connectionString)
    .setStartingPosition(EventPosition.fromEndOfStream)
    .setConsumerGroup("azuredatabricks")

  val eventhubs = spark.readStream
    .format("eventhubs")
    .options(eventHubsConf.toMap)
    .load() 

 /*
  val df = eventhubs.select(($"enqueuedTime").as("Enqueued_Time"),($"systemProperties.iothub- 
           connection-device-id") .as("Device_ID"),($"body".cast("string")).as("telemetry_json")) 

  df.createOrReplaceTempView("tel_table")
*/

上面的代码从EventHub读取传入的流数据,并将数据也加载到数据帧中(代码的注释部分)。现在,我需要遍历数据的每一行,并将其传递给规则集。由于它是流数据,因此spark sql会引发错误。有人可以指导如何依次读取这些流数据。

我一直在寻求帮助的下一部分是如何从消息正文中推断模式。由于这些是传感器数据,因此架构不会是静态的。一个消息可以具有deviceid,Temperature,pH,其他则可以是Device,Voltage,Current。对于这种情况,有一种方法可以在Apache Spark中动态推断模式。

非常感谢您的帮助。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。