微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

无法通过 Kafka Connect 将数据从 MongoDB 发送到 neo4j

如何解决无法通过 Kafka Connect 将数据从 MongoDB 发送到 neo4j

我正在尝试使用 Confluent 社区平台以及 MongoDB 和 neo4j 连接器将数据从 MongoDB 发送到 neo4j。我使用的是 neo4j 社区版 4.2.3。

我将 Mongo 配置为源,将 neo4j 配置为接收器。这些是我的连接器配置:

curl -X PUT http://localhost:8083/connectors/source-mongodb-test/config -H "Content-Type: application/json" -d '{
      "tasks.max":1,"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter","connection.uri":"<my-connection.uri>","database":"data","collection":"test","pipeline":"","topic.namespace.map":"{\"data\": \"default-topic\",\"data.test\": \"my-mongotesttopic\"}"
}'
curl -X PUT http://localhost:8083/connectors/sink-neo4j-test/config -H "Content-Type: application/json" -d '{
    "topics": "my-mongotesttopic","connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector","kafka.key.deserializer":"org.apache.kafka.common.serialization.ByteArrayDeserializer","kafka.value.deserializer":"org.apache.kafka.common.serialization.ByteArrayDeserializer","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","errors.retry.timeout": "-1","errors.retry.delay.max.ms": "1000","errors.tolerance": "all","errors.log.enable": true,"errors.log.include.messages": true,"neo4j.server.uri": "bolt://<myIP>:7687","neo4j.authentication.basic.username": "neo4j","neo4j.authentication.basic.password": "<password>","neo4j.topic.cypher.my-mongotesttopic":
         "CREATE (t:Object {name: event.title})"
}'

出于测试目的,我实际上可以创建一个从 MongoDB 继承标题数据的节点以获取 neo4j 名称属性

这是kafkacat显示的消息:

{
    "_id": {"_data": "82608B06B9000000012B022C0100296E5A1004B52723604929469DA4FBB4F1CD6C528B46645F69640064608B06B9941B272EACC036740004"},"operationType": "insert","clusterTime": {
        "$timestamp": {
            "t": 1619723961,"i": 1
        }
    },"fullDocument": {
        "_id": {"$oid": "608b06b9941b272eacc03674"},"title": "Titel 9"
    },"ns": {
        "db": "data","coll": "test"
    },"documentKey": {
        "_id": {"$oid": "608b06b9941b272eacc03674"}
    }
}

但是我无法将标题放入neo4j。我还尝试将 sinkconfig 中的密码查询设置为 CREATE (t:Object),当我将数据插入 mongoDB 时,它创建了一个带有对象标签的简单节点。所以连接 kafka --> neo4j 似乎工作。我还尝试了不同的方法来定义名称属性内容,如 event.title、event.fulldocument.title 等,但没有任何效果。是不是我在接收器配置中忘记了一些东西,或者密码查询有什么问题?

提前致谢:-)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。