如何解决debezium 如何手动提交偏移量
我使用 debezium 将数据从 Postgres 同步到 flink,并使用此代码创建引擎
this.engine = DebeziumEngine.create(Connect.class)
.using(properties)
.notifying(debeziumConsumer)
.using((success,message,error) -> {
if (!success && error != null) {
this.reportError(error);
}
})
.build();
我想在 flink 执行检查点时调用 ChangeEventSourceCoordinator#commitOffset
,但是 coordinator
在 BaseSourceTask
中是私有的,而 task
在 EmbeddedEngine
中是私有的,所以我可以' t 在我的代码中调用 commitOffset
,还有其他方法可以实现手动提交吗?
public final class EmbeddedEngine implements DebeziumEngine<SourceRecord>{
private SourceTask task;
}
public abstract class BaseSourceTask extends SourceTask {
private ChangeEventSourceCoordinator coordinator;
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。