微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用PySpark进行Spark流式处理

如何解决使用PySpark进行Spark流式处理

我有一个Spark Streaming应用程序(pyspark),在其中我从后端数据中心监视应用程序通过Kafka端点接收消息(警报),如下所示。

时间T:{操作:“插入”,alarmId:.....,alarmName:.....,alarmType:.....,///} 时间T + 1:{操作:“更新”,与上面相同...} 时间T + 2:/// .... 时间T + n:{操作:“删除”,与上面相同...}

肯定会插入和删除,而更新则发生0 ..许多次。

如果有新警报,它首先以“插入”消息的形式出现,然后随后的“更新”消息也将在“插入”之后到达相同警报ID的流中,以提供同一警报的更新。清除警报后,删除就会到达。

我的业务问题是,每当有新警报消息“插入”到达时,我都需要检查以下“更新”,以及是否在下一个 Y 分钟,我需要创建一个票证(新消息)并将其自动发送到另一个工作流应用程序(使用REST API)。

我的问题是,如果要通过Spark Streaming解决此问题,应在此处应用哪种类型的窗口?如何确保此资格认证流程运行稳定?例如我不希望在X + 1次之后再次提出票证,因为它在X次发生时已经被提出。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。