微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

指定要通过GroupByKey传递的TaggedOutput的类型作为CombinePerKey的一部分

如何解决指定要通过GroupByKey传递的TaggedOutput的类型作为CombinePerKey的一部分

当我尝试基于apap Beam管道将项目从python 3.7迁移到3.8时,类型提示检查在该位置开始失败:

pcoll = (
    wrong_pcoll,some_pcoll_1,some_pcoll_2,some_pcoll_3,) | beam.Flatten(pipeline=pipeline)
pcoll | beam.CombinePerKey(MyCombineFn())  # << here

出现此错误

apache_beam.typehints.decorators.TypeCheckerror: Input type hint violation at GroupByKey: expected Tuple[TypeVariable[K],TypeVariable[V]],got Union[TaggedOutput,Tuple[Any,Any],_MyType1],_MyType2]]

wrong_pcoll实际上是一个TaggedOutput,因为它是从以前的ptransform上的一个标记输出接收的。

当作为TaggedOutput的wrong_pcoll类型(属于pcoll类型的一部分(与异常Union[TaggedOutput,_MyType2]]对应)的一部分传递给GrouByKey时,类型提示检查失败在CombinePerKey内部使用。

所以我有两个问题:

  1. 为什么它在python 3.7中起作用而在3.8上不起作用?
  2. 如何为标记输出指定类型?我尝试为PTransform的process()方法指定类型,该方法将它生成为它产生的所有输出类型的并集,但是由于某种原因,类型提示检查被选择为错误的类型。然后,我严格指定了所需的类型:Tuple[Any,Any],它已经起作用了。但这不是一种方法,因为process()还会产生其他类型,例如简单的str

作为一种解决方法,我可以将此wrong_pcoll通过带有beam.Maplambda x: x的简单.with_output_types(Tuple[Any,Any])进行传递,但这似乎不是解决它的明确方法

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。