如何解决Apache Kafka的mongoSourceConnector
我正在使用Apache Kafka和mongoSourceConnector来接收数据库更改(例如文档插入),并希望将其发送到kafka。在我的文档字段中,我将发送一个主题密钥,该密钥将把默认的kafka主题重写为文档的字段主题值。
为此,我尝试了smt ExtractTopic $ Value(io.confluent.connect.transforms.ExtractTopic)
我的配置如下:
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=<my_connection_uri>
database=<my_db>
collection=messages
#topic.prefix=
poll.max.batch.size=1000
poll.await.time.ms=5000
# Change stream options
pipeline=[{"$match": { "$or": [{"operationType": "insert"},{"operationType": "update"}]}}]
batch.size=0
change.stream.full.document=updateLookup
publish.full.document.only=true
collation=
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# key.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter=org.apache.kafka.connect.json.JsonConverter
transforms=ValueFieldExample
transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ValueFieldExample.field=topic
kafkacat返回时不带smt(暂时在此处搜索默认主题
{"_id": {"$oid": "5f719f1d559f00326a405b22"},"part_id": "","topic": "my_custom_topic","payload": "some payload data","__v": 0}
但是如果正在使用smt转换(ExtractTopic),则会抛出该错误:
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field extraction],found: java.lang.String
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。