微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

当源连接器无法暂停时,应如何为现有主题的新使用者组配置偏移量

如何解决当源连接器无法暂停时,应如何为现有主题的新使用者组配置偏移量

我们有一个现有的主题,其中JDBC源连接器使用增量+时间戳记模式发布数据(源连接器使用增量+时间戳记(https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/index.html#incremental-query-modes

我们有现有的消费群体,这些消费群体消费一些现有主题的数据。现在,我们引入一个新的使用者组(称为组k),该组应使用相同主题的数据并写入数据库。首先,我们有一个初始的数据迁移工作流程,以获取数据库的转储并将转储复制到目标数据库,然后再开始使用现有主题中的消息。

现在,当消费群体开始时,我想知道应该从哪个偏移量开始?

一种选择是使用最新的。但是问题在于,当为这个新的使用者组进行初始数据迁移时,现有的源连接器会将数据发布到现有的主题中。在我们的例子中,我们要迁移10个表,在进行表转储时可能会有间隙,但是仍对源数据库进行了一些更改,因此数据将添加主题中。因此,我们可能会错过处理某些记录的机会。

我们没有选择暂停源连接器的选项,这将为我们解决问题。

如果我们最早使用偏移量,那么最终将处理kafka主题中的所有旧数据,因为我们已经完成了初始数据迁移,因此不需要。

我们希望仅维护一个源连接器,而不考虑使用者组的数量

我正在浏览kafka消费者API,例如seek,它需要时间戳。我可以记下初始数据迁移之前的时间,并在使用者组启动并分配了分区后致电Consumer.seek。但是我找不到任何文档说时间戳基于GMT或其他。可以通过传递从纪元经过的毫秒数的时间来使用此API吗?

解决方法

如果我正确理解了这一句话:“如果我们使用最新偏移量,则可能会丢失一些数据,因为源连接器可能会在初始数据迁移期间向该主题写入了一些数据”,该主题最终将拥有一些来自初始加载和CDC的数据数据混合在一起,因此没有偏移可以清楚地区分这一点。因此,您将无法设置任何特定的偏移量。

我看到以下选项:

  • 让您的消费者组K过滤出初始负载数据并最早读取
  • 将初始加载数据生成为专用主题
  • 如果可能,请在工作时间之外执行初始加载,以免CDC数据流过(可能是周末或银行假期)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。