微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

状态是否也会在事件超时时通过 Spark 结构化流删除?

如何解决状态是否也会在事件超时时通过 Spark 结构化流删除?


问。状态是否超时并同时被删除
要么
只有状态超时,ProcessingTimeout 和 EventTimeout 的状态仍然保持?

我正在对 mapGroupsWithState/flatmapGroupsWithState 进行一些实验,并且对状态超时有些困惑。

考虑到我正在维护一个带有 10 秒水印的状态并根据事件时间应用超时说:

ds.withWatermark("timestamp","10 seconds")
  .groupByKey(...)
  .mapGroupsWithState(
    GroupStateTimeout.EventTimeTimeout)( //event timed out
    ...)(my_mapping_function)

在我的映射函数中说

我正在根据状态的存在执行一些操作。
我正在检查它:

//Considering it my_mapping_function for mapGroupsWithState/flatmapGroupsWithState

if(state.hasTimeout){
  println("State has timedout")
  state.remove()
}
else 
{
   val newState = state.getoption match {
                  case Some(s) => 
                               ....//some operations
                  case _ =>
                            println("no state")
                            ..return some state
    
    state.update(newState)

    //set the timeout,Does state also gets removed automatically when state has timed out?
    state.setTimeoutTimestamp(state.getCurrentWatermarkMs,"10 seconds")

}

现在考虑一个水印设置为(10秒)的例子:
带有 ts 12 seconds
的传入数据 (data1) 带有 ts 20 seconds
的传入数据 (data1) 所以水印到这里将是 (20-10) = 10 秒

带有 ts 12 seconds 的传入数据 (data2)
(data2) 的状态将在 20 seconds 超时
(如 10 秒(水印时间)+ 10 秒(我们设置了额外的超时时间)

所以如果传入的数据(data1)带有 ts 20 seconds
lly,传入数据 (data1) 与 ts 30 seconds
lly,传入数据 (data1) 与 ts 40 seconds
到这里,水印现在是20秒。 (40-10)

所以 data2 的状态是超时,因为最后一个数据长达 12 秒

问。当 data2 的状态超时时,状态只是超时还是状态被删除


因为它没有打印 println("State has timedout")
它打印了println("no state")

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。