微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Azure 认知搜索接收器 Kafka 连接器不工作

如何解决Azure 认知搜索接收器 Kafka 连接器不工作

我正在为 Azure 认知搜索创建 Kafka Connect 连接器,但出现错误。不确定我是否缺少任何配置。

连接器的 Json 配置:

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
      "name": "azure-search","config": {
        "topics": "mytopic","tasks.max": "1","connector.class": "io.confluent.connect.azure.search.AzureSearchSinkConnector","confluent.topic.bootstrap.servers": "localhost:9092","confluent.topic.replication.factor": "1","azure.search.service.name": "myazuresearch","index.name": "property-hub-final-out","azure.search.api.key": "<my key>","reporter.bootstrap.servers": "localhost:9092","reporter.error.topic.name": "test-error","reporter.error.topic.replication.factor": "1","reporter.error.topic.key.format": "string","reporter.error.topic.value.format": "string","reporter.result.topic.name": "test-result","reporter.result.topic.key.format": "string","reporter.result.topic.value.format": "string","reporter.result.topic.replication.factor": "1","key.converter": "org.apache.kafka.connect.storage.StringConverter","key.converter.schemas.enable": "false","value.converter": "io.confluent.connect.avro.AvroConverter","value.converter.schema.registry.url": "http://localhost:8081"
      }
    }'

错误

    tasks": [
    {
    "id": 0,"state": "Failed","worker_id": "192.168.245.73:8083","trace": "java.lang.NoClassDefFoundError: org/codehaus/jackson/node/ValueNode\n\tat io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:71)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:298)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:546)\n\tat org.apache.kafka.connect.runtime.distributed.distributedHerder.startTask(distributedHerder.java:1290)\n\tat org.apache.kafka.connect.runtime.distributed.distributedHerder.access$1700(distributedHerder.java:130)\n\tat org.apache.kafka.connect.runtime.distributed.distributedHerder$10.call(distributedHerder.java:1305)\n\tat org.apache.kafka.connect.runtime.distributed.distributedHerder$10.call(distributedHerder.java:1301)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.classNotFoundException: org.codehaus.jackson.node.ValueNode\n\tat java.base/java.net.urlclassloader.findClass(urlclassloader.java:471)\n\tat java.base/java.lang.classLoader.loadClass(ClassLoader.java:589)\n\tat org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)\n\tat java.base/java.lang.classLoader.loadClass(ClassLoader.java:522)\n\t... 11 more\n"
    }
    ],

我尝试将 jackson-core-asl-1.9.13.jar jar 添加

/confluent-6.1.1/share/java/kafka
/confluent-6.1.1/share/java/schema-registry
/confluent-6.1.1/share/java/kafka-connect-replicator

但仍然出现相同的错误

以下是Kafka消息的值模式:

    {
      "type": "record","name": "Test","namespace": "com.test.schema","fields": [
        {"name": "client_id","type": ["null","string"]},{"name": "client_name","string"]}
      ]
    }

解决方法

我在使用最新的连接器版本 (1.0.4) 时遇到了同样的问题,您可以改用 1.0.1 版本的连接器,它没有这个问题。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。