How to fix a Dataflow job that never emits messages after GroupByKey()
I have a streaming Dataflow pipeline that writes to BQ, and I want to window all the failed rows and do some further analysis on them. The pipeline looks like this. I do receive all the error messages in the second step, but everything gets stuck at beam.GroupByKey(): nothing moves downstream after it. Does anyone know how to fix this?
data = (
    pipeline
    | "Read PubSub Messages" >> beam.io.ReadFromPubSub(
        subscription=options.input_subscription, with_attributes=True)
    ...
    | "Write to BQ" >> beam.io.WriteToBigQuery(
        table=f"{options.bq_dataset}.{options.bq_table}",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method='STREAMING_INSERTS',
        insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER,
    )
)

(
    data[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | f"Window into: {options.window_size}m" >> GroupWindowsIntoBatches(options.window_size)
    | "Failed Rows" >> beam.ParDo(BadRows(options.bq_dataset, 'table'))
)
and
class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its publish timestamp.
            | "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(10))
            # If the windowed elements do not fit into memory please consider using `beam.util.BatchElements`.
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )
Also, I don't know if it's relevant, but the beam.DoFn.TimestampParam inside my GroupWindowsIntoBatches has an invalid (negative) timestamp.
Solution
OK, the problem was that the messages coming from the BigQuery FAILED_ROWS output have no timestamp. Adding | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, time.time())) seems to fix the grouping.
class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, time.time()))  # <----- Added this line
            | "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(30))
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )