
How to replicate all changes from a source database to a target database using Debezium and the Confluent JDBC sink connector running on Docker

Below is my Dockerfile for Kafka Connect with the JDBC sink connector and the MySQL driver:

FROM debezium/connect:1.3
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
ENV MYSQL_DRIVER_VERSION=8.0.20
ARG KAFKA_JDBC_VERSION=5.5.0
# Download the MySQL JDBC driver and extract only the jar into Kafka's libs directory
RUN curl -k -SL "https://dev.mysql.com/get/Downloads/connector-j/mysql-connector-java-${MYSQL_DRIVER_VERSION}.tar.gz" \
    | tar -xzf - -C /kafka/libs --strip-components=1 mysql-connector-java-${MYSQL_DRIVER_VERSION}/mysql-connector-java-${MYSQL_DRIVER_VERSION}.jar
# Download the Confluent JDBC connector into its own plugin directory
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR && \
    curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar

The image is built with:

docker build . --tag kafka-connect-sink
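The tar invocation in the Dockerfile extracts a single jar from inside the tarball's versioned directory. A small local sketch (using a throwaway tarball, not the real MySQL download) shows what --strip-components=1 does:

```shell
# Build a throwaway tarball with the same layout as the MySQL connector archive.
mkdir -p demo/mysql-connector-java-8.0.20 out
touch demo/mysql-connector-java-8.0.20/mysql-connector-java-8.0.20.jar
tar -czf demo.tar.gz -C demo mysql-connector-java-8.0.20

# --strip-components=1 drops the leading versioned directory, so the jar
# lands directly in out/ (just as it lands directly in /kafka/libs above).
tar -xzf demo.tar.gz -C out --strip-components=1 \
    mysql-connector-java-8.0.20/mysql-connector-java-8.0.20.jar
ls out
```

Without --strip-components=1 the jar would end up in out/mysql-connector-java-8.0.20/, a nested directory Kafka would not scan for drivers.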

Below is the registration request for my source database connector:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.inventory"
    }
}'

Below are the registration requests for my target database sink connectors:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
    "name": "inventory-connector-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
        "connection.user": "pavan",
        "connection.password": "root",
        "topics": "dbserver1.inventory.customers",
        "table.name.format": "pk.customers",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_key",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite"
    }
}'
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
    "name": "inventory-connector-sink-addresses",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
        "connection.user": "pavan",
        "connection.password": "root",
        "topics": "dbserver1.inventory.addresses",
        "table.name.format": "pk.addresses",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_key",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite"
    }
}'

With this configuration I have to register a separate sink for every topic, but I have over 100 tables to replicate into the target database. Is there any way to do this in a single JSON configuration so that I can subscribe to all the topics at once?

Solution

You can use the topics (or topics.regex) property to define the list of topics to consume, and then use either the JDBC sink connector's table.name.format property or the RegexRouter SMT (or a combination of both) to override the destination table names:
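The RegexRouter regex used in the config below splits a topic name into three dot-separated groups and keeps only the last one as the table name. The rewrite can be previewed locally with sed (a sketch; the topic name is taken from the configs above):

```shell
# Preview how RegexRouter's ([^.]+)\.([^.]+)\.([^.]+) -> pk.$3 rule
# maps a Debezium topic name onto a destination table name.
topic="dbserver1.inventory.customers"
table=$(echo "$topic" | sed -E 's/([^.]+)\.([^.]+)\.([^.]+)/pk.\3/')
echo "$table"   # prints pk.customers
```

Because only the third group is kept, every server.database.table topic collapses onto a table in the pk schema, regardless of how many tables there are.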

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
    "name": "inventory-connector-sink-addresses",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
        "connection.user": "pavan",
        "connection.password": "root",
        "topics": "dbserver1.inventory.addresses,dbserver1.inventory.customers",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "pk.fields": "",
        "pk.mode": "record_key",
        "transforms": "route,unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "pk.$3"
    }
}'
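With 100+ tables, even one explicit topics list becomes unwieldy. The same sink can instead subscribe by pattern: replace the "topics" line with a "topics.regex" entry (a sketch; the pattern assumes all of your change topics share the dbserver1.inventory. prefix, and topics and topics.regex are mutually exclusive):

```
"topics.regex": "dbserver1\\.inventory\\..*",
```

Since topics.regex is a subscription pattern rather than a fixed list, the connector also picks up topics for new matching tables as Debezium creates them, without any change to the sink configuration.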
