微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

debezium 如何手动提交偏移量

如何解决debezium 如何手动提交偏移量

我使用 debezium 将数据从 Postgres 同步到 flink,并使用此代码创建引擎

this.engine = DebeziumEngine.create(Connect.class)
            .using(properties)
            .notifying(debeziumConsumer)
            .using((success,message,error) -> {
                if (!success && error != null) {
                    this.reportError(error);
                }
            })
            .build();

我想在 flink 执行检查点时调用 ChangeEventSourceCoordinator#commitOffset,但是 coordinatorBaseSourceTask 中是私有的,而 taskEmbeddedEngine 中是私有的,所以我可以' t 在我的代码调用 commitOffset,还有其他方法可以实现手动提交吗?

public final class EmbeddedEngine implements DebeziumEngine<SourceRecord>{
       private SourceTask task;
}
public abstract class BaseSourceTask extends SourceTask {
       private ChangeEventSourceCoordinator coordinator;
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。