如何解决Apache Beam 中具有键值状态的状态处理
我正在尝试使用 Apache Beam 实现一个有状态的进程。我已经阅读了 Kenneth KNowles 的两篇文章(Stateful processing with Apache Beam 和 Timely (and Stateful) Processing with Apache Beam),但是我没有找到解决我的问题的方法。我使用的是 Python SDK。
特别是,我试图拥有一个包含键值对象的有状态 DoFn,我需要添加新元素,有时还需要删除一些元素。
我在 DoFn 类中看到 a solution may be to use a SetStateSpec 和 Tuple coder。问题是 SetSpaceSpec 没有类似“pop”的功能选项。在我看来,删除元素的唯一方法是使用 .clear()
将它们全部删除。
看起来您不能仅指定要使用此功能擦除的元素。
克服这个问题的一个机会可能是在我需要删除状态中的元素时清除和重写状态,但这对我来说看起来效率很低。
你知道如何有效地做到这一点吗?
Python 版本 3.8.7
阿帕奇光束==2.29.0
解决方法
我遵循了@TudorPlugaru 的建议,并提出了这个建议。希望对其他人有用。
import json
from apache_beam.coders import Coder
class MyDictCoder(Coder):
""" My custom dictionary coders """
def encode(self,o):
return json.dumps(o).encode()
def decode(self,o):
return json.loads(o.decode())
def is_deterministic(self) -> bool:
return True
在 DoFn 声明中
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
class MyDoFn(beam.DoFn):
DICTSTATE= ReadModifyWriteStateSpec(name='dictstate',coder=MyDictCoder())
def process(self,element,DictState=beam.DoFn.StateParam(DICTSTATE)):
# Do something
yield DictState
并在管道内添加这一行(如 Beam example 中所做的那样)
beam.coders.registry.register_coder(typing.Dict,MyDictCoder)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。