
Dataflow job does not emit messages after GroupByKey()


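I have a streaming Dataflow pipeline that writes to BigQuery, and I want to window all the failed rows and do some further analysis on them. The pipeline looks like the code below. I receive all the error messages in the second step, but every message gets stuck in beam.GroupByKey(), and nothing moves downstream after that. Does anyone know how to fix this?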

data = (
    pipeline  # assumed name of the beam.Pipeline object; it is missing from the original snippet
    | "Read PubSub Messages" >> beam.io.ReadFromPubSub(
        subscription=options.input_subscription,
        with_attributes=True,
    )
    ...
    | "write to BQ" >> beam.io.WriteToBigQuery(
        table=f"{options.bq_dataset}.{options.bq_table}",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method="STREAMING_INSERTS",
        insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER,
    )
)

(
    # Rows that BigQuery rejected are exposed under the FAILED_ROWS output tag.
    data[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | f"Window into: {options.window_size}m" >> GroupWindowsIntoBatches(options.window_size)
    | "Failed Rows" >> beam.ParDo(BadRows(options.bq_dataset, 'table'))
)

class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its publish timestamp.
            # Note: the window length is hard-coded to 10 s; self.window_size is not used here.
            | "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(10))
            # If the windowed elements do not fit into memory please consider using `beam.util.BatchElements`.
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )

Also, I don't know whether it is relevant, but beam.DoFn.TimestampParam inside my GroupWindowsIntoBatches reports an invalid (negative) timestamp.

Solution

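OK, the problem was that the messages coming from the BigQuery FAILED_ROWS output have no timestamp, which matches the negative value seen through beam.DoFn.TimestampParam. Adding | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, time.time())) before the windowing step seems to fix the grouping.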

import time

import apache_beam as beam
from apache_beam import window


class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Added this line: stamp each failed row with the current wall-clock time,
            # because the FAILED_ROWS output carries no usable timestamp.
            | "Add Timestamps" >> beam.Map(lambda x: beam.window.TimestampedValue(x, time.time()))
            # Note: the window length is hard-coded to 30 s; self.window_size is not used here.
            | "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(30))
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )
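
To illustrate why the element timestamp matters for the windowing step, here is a minimal plain-Python sketch of how a fixed window can be chosen from a timestamp. It does not use Beam at all: the helper assign_fixed_window and the value NEGATIVE_TS are hypothetical names introduced only for this illustration, and the arithmetic assumes the usual definition of fixed windows (aligned to multiples of the window size), not the exact code path Dataflow runs.

```python
import time
from typing import Tuple


def assign_fixed_window(timestamp: float, size: float) -> Tuple[float, float]:
    """Return the [start, end) window of length `size` that contains `timestamp`.

    Assumption: windows are aligned to multiples of `size` with no offset.
    """
    start = timestamp - (timestamp % size)
    return start, start + size


# Hypothetical stand-in for an element that carries no meaningful event time.
NEGATIVE_TS = -1_000_000.0

# An element stamped with the wall clock, as the 'Add Timestamps' step does.
now = time.time()

print("window for negative timestamp:", assign_fixed_window(NEGATIVE_TS, 30))
print("window for current timestamp: ", assign_fixed_window(now, 30))
```

Under these assumptions the first window lies far in the past, while the second one contains the current time. That is consistent with the negative timestamp reported in the question, and it may explain why the grouping only produced output once the rows were stamped with time.time().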
