
Camel-Kafka security protocol SASL_SSL not working

How to fix the Camel-Kafka security protocol SASL_SSL not working

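Starting from the official example provided by Camel-Quarkus, I changed the logic so that it writes to a Kafka broker. As long as the Camel Kafka component points to a local broker, everything works fine.

Things get more complicated when I try to reach our Confluent Cloud broker. The security protocol we use is SASL_SSL. The following snippet produces the log appended at the end of this question. To reproduce, the complete code is available at https://github.com/LeonardoBonacci/camel-kafka-sasl
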
final String brokers = "the-kafka-host.confluent.cloud:9092";
final String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD";

from("direct:start")
    .setBody(exchange -> "I do not arrive")
    .log(LoggingLevel.INFO,"Sending to Kafka: ${body}")
    .to("kafka:foo-topic?" 
         + "brokers=" + brokers
         + "&saslMechanism=PLAIN"  
         + "&securityProtocol=SASL_SSL"
         + "&sslEndpointAlgorithm=HTTPS"
         + "&saslJaasConfig=" + saslJaasConfig);

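The logged ProducerConfig looks correct. When I use the same credentials in a plain Kafka Producer, it writes records to the Kafka topic and also prints an almost identical ProducerConfig. This suggests that the configuration values are propagated correctly to the underlying producer.

Reading the log, the SSL handshake appears to succeed. The next step is less successful: the SaslClientAuthenticator attempt fails.

From various contradictory blog posts and the official documentation, I cannot work out whether SASL_SSL is actually supported.

Can anyone help me solve this? Many thanks!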

2020-11-02 07:16:03,244 DEBUG [org.apa.cam.sup.DefaultComponent] (Quarkus Main Thread) Creating endpoint uri=[direct://start],path=[start]
2020-11-02 07:16:03,246 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) direct://start converted to endpoint: direct://start by component: org.apache.camel.component.direct.DirectComponent@28f69db6
2020-11-02 07:16:03,261 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Resolving language: simple
2020-11-02 07:16:03,266 DEBUG [org.apa.cam.rei.LogReifier] (Quarkus Main Thread) LogName is not configured,using route id as logName: route1
2020-11-02 07:16:03,267 DEBUG [org.apa.cam.imp.con.CoreTypeConverterRegistry] (Quarkus Main Thread) Promoting fallback type converter as a known type converter to convert from: org.apache.camel.LoggingLevel to: java.lang.String for the fallback converter: org.apache.camel.impl.converter.EnumTypeConverter@1fecfc7b
2020-11-02 07:16:03,269 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Using ComponentResolver: org.apache.camel.quarkus.core.FastCamelContext$$Lambda$693/0x0000000840394040@51356a7e to resolve component with name: kafka
2020-11-02 07:16:03,269 DEBUG [org.apa.cam.sup.ResolverHelper] (Quarkus Main Thread) Lookup Component with name kafka in registry. Found: org.apache.camel.component.kafka.KafkaComponent@a614e14
2020-11-02 07:16:03,270 DEBUG [org.apa.cam.imp.eng.DefaultConfigurerResolver] (Quarkus Main Thread) Found configurer: kafka-component via type: org.apache.camel.component.kafka.KafkaComponentConfigurer via: META-INF/services/org/apache/camel/configurer/kafka-component
2020-11-02 07:16:03,270 DEBUG [org.apa.cam.imp.eng.DefaultConfigurerResolver] (Quarkus Main Thread) Found configurer: kafka-endpoint via type: org.apache.camel.component.kafka.KafkaEndpointConfigurer via: META-INF/services/org/apache/camel/configurer/kafka-endpoint
2020-11-02 07:16:03,272 DEBUG [org.apa.cam.sup.DefaultComponent] (Quarkus Main Thread) Creating endpoint uri=[kafka://foo-topic?brokers=the-kafka-host.confluent.cloud%3A9092&saslJaasConfig=xxxxxx&saslMechanism=PLAIN&securityProtocol=SASL_SSL&sslEndpointAlgorithm=HTTPS],path=[foo-topic]
2020-11-02 07:16:03,278 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) kafka://foo-topic?brokers=the-kafka-host.confluent.cloud%3A9092&saslJaasConfig=xxxxxx&saslMechanism=PLAIN&securityProtocol=SASL_SSL&sslEndpointAlgorithm=HTTPS converted to endpoint: kafka://foo-topic?brokers=the-kafka-host.confluent.cloud%3A9092&saslJaasConfig=xxxxxx&saslMechanism=PLAIN&securityProtocol=SASL_SSL&sslEndpointAlgorithm=HTTPS by component: org.apache.camel.component.kafka.KafkaComponent@a614e14
2020-11-02 07:16:03,282 DEBUG [org.apa.cam.sup.EventHelper] (Quarkus Main Thread) Ignoring notifying event Initialized CamelContext: camel-1. The EventNotifier has not been started yet: org.apache.camel.quarkus.core.CamelManagementEventBridge@7650b836
2020-11-02 07:16:03,282 DEBUG [org.apa.cam.sup.EventHelper] (Quarkus Main Thread) Ignoring notifying event Initialized CamelContext: camel-1. The EventNotifier has not been started yet: org.apache.camel.quarkus.core.CamelContextRuntime$1@519e862a
2020-11-02 07:16:03,283 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Resolving language: simple
2020-11-02 07:16:03,285 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Apache Camel 3.6.0 (camel-1) is starting
2020-11-02 07:16:03,287 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Using ClassResolver=org.apache.camel.impl.engine.DefaultClassResolver@35084cf5,PackageScanClassResolver=org.apache.camel.impl.engine.DefaultPackageScanClassResolver@2b7ebe50,ApplicationContextClassLoader=null,RouteController=org.apache.camel.impl.engine.DefaultRouteController@5bfb4540
2020-11-02 07:16:03,289 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2020-11-02 07:16:03,289 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Using HeadersMapFactory: org.apache.camel.impl.engine.DefaultHeadersMapFactory@1ae00ddf
2020-11-02 07:16:03,290 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Using ReactiveExecutor: org.apache.camel.impl.engine.DefaultReactiveExecutor@1621a5c3
2020-11-02 07:16:03,290 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Using ThreadPoolFactory: org.apache.camel.support.DefaultThreadPoolFactory@205f397
2020-11-02 07:16:03,293 DEBUG [org.apa.cam.imp.eng.InternalRouteStartupManager] (Quarkus Main Thread) Warming up route id: route1 having autoStartup=true
2020-11-02 07:16:03,304 INFO  [org.apa.kaf.cli.pro.ProducerConfig] (Quarkus Main Thread) ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [the-kafka-host.confluent.cloud:9092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id = producer-1
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = [hidden]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = PLAIN
        security.protocol = SASL_SSL
        security.providers = null
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = HTTPS
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2020-11-02 07:16:03,515 INFO  [org.apa.kaf.com.sec.aut.AbstractLogin] (Quarkus Main Thread) Successfully logged in.
2020-11-02 07:16:03,585 DEBUG [org.apa.kaf.com.sec.ssl.SslEngineBuilder] (Quarkus Main Thread) Created SSL context with keystore null,truststore null,provider SunJSSE.
2020-11-02 07:16:03,622 DEBUG [org.apa.kaf.cli.pro.int.Sender] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Starting Kafka producer I/O thread.
2020-11-02 07:16:03,623 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Quarkus Main Thread) Kafka version: 2.5.0
2020-11-02 07:16:03,625 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Initialize connection to node the-kafka-host.confluent.cloud:9092 (id: -1 rack: null) for sending Metadata request
2020-11-02 07:16:03,625 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Quarkus Main Thread) Kafka commitId: 66563e712b0b9f84
2020-11-02 07:16:03,630 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Quarkus Main Thread) Kafka startTimeMs: 1604254563622
2020-11-02 07:16:03,635 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Initiating connection to node the-kafka-host.confluent.cloud:9092 (id: -1 rack: null) using address the-kafka-host.confluent.cloud/123.123.123.123
2020-11-02 07:16:03,636 DEBUG [org.apa.kaf.cli.pro.KafkaProducer] (Quarkus Main Thread) [Producer clientId=producer-1] Kafka producer started
2020-11-02 07:16:03,636 DEBUG [org.apa.cam.com.kaf.KafkaProducer] (Quarkus Main Thread) Created KafkaProducer: org.apache.kafka.clients.producer.KafkaProducer@1939e92
2020-11-02 07:16:03,644 DEBUG [org.apa.cam.imp.eng.BaseExecutorServiceManager] (Quarkus Main Thread) Created new ThreadPool for source: kafka://foo-topic?brokers=the-kafka-host.confluent.cloud%3A9092&saslJaasConfig=xxxxxx&saslMechanism=PLAIN&securityProtocol=SASL_SSL&sslEndpointAlgorithm=HTTPS with name: KafkaProducer[foo-topic]. -> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@4a80dbc9[Running,pool size = 0,active threads = 0,queued tasks = 0,completed tasks = 0][KafkaProducer[foo-topic]]
2020-11-02 07:16:03,646 DEBUG [org.apa.cam.imp.eng.InternalRouteStartupManager] (Quarkus Main Thread) Route: route1 >>> Route[direct://start -> null]
2020-11-02 07:16:03,646 DEBUG [org.apa.cam.imp.eng.InternalRouteStartupManager] (Quarkus Main Thread) Starting consumer (order: 1000) on route: route1
2020-11-02 07:16:03,648 DEBUG [org.apa.cam.sup.DefaultConsumer] (Quarkus Main Thread) Init consumer: Consumer[direct://start]
2020-11-02 07:16:03,648 DEBUG [org.apa.cam.sup.DefaultConsumer] (Quarkus Main Thread) Starting consumer: Consumer[direct://start]
2020-11-02 07:16:03,649 INFO  [org.apa.cam.imp.eng.InternalRouteStartupManager] (Quarkus Main Thread) Route: route1 started and consuming from: direct://start
2020-11-02 07:16:03,652 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Total 1 routes,of which 1 are started
2020-11-02 07:16:03,652 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Apache Camel 3.6.0 (camel-1) started in 0.365 seconds
2020-11-02 07:16:03,655 INFO  [io.quarkus] (Quarkus Main Thread) sasl 1.0 on JVM (powered by Quarkus 1.9.0.Final) started in 2.528s.
2020-11-02 07:16:03,656 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2020-11-02 07:16:03,661 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [camel-core,camel-direct,camel-kafka,camel-support-common,cdi]
2020-11-02 07:16:03,661 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to SEND_APIVERSIONS_REQUEST
2020-11-02 07:16:03,662 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Creating SaslClient: client=null;service=kafka;serviceHostname=the-kafka-host.confluent.cloud;mechs=[PLAIN]
2020-11-02 07:16:03,672 DEBUG [org.apa.kaf.com.net.Selector] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Created socket with SO_RCVBUF = 65536,SO_SNDBUF = 131072,SO_TIMEOUT = 0 to node -1
2020-11-02 07:16:03,740 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
2020-11-02 07:16:03,859 DEBUG [org.apa.kaf.com.net.SslTransportLayer] (kafka-producer-network-thread | producer-1) [SslTransportLayer channelId=-1 key=channel=java.nio.channels.socketChannel[connection-pending remote=the-kafka-host.confluent.cloud/123.123.123.123:9092],selector=sun.nio.ch.WindowsSelectorImpl@652ed71e,interestOps=8,readyOps=0] SSL handshake completed successfully with peerHost 'the-kafka-host.confluent.cloud' peerPort 9092 peerPrincipal 'CN=*.ap-southeast-2.aws.confluent.cloud' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384'
2020-11-02 07:16:03,895 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
2020-11-02 07:16:03,935 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to SEND_HANDSHAKE_REQUEST
2020-11-02 07:16:03,936 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
2020-11-02 07:16:03,974 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to INITIAL
2020-11-02 07:16:03,977 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to INTERMEDIATE
2020-11-02 07:16:05,155 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to Failed
2020-11-02 07:16:05,155 INFO  [org.apa.kaf.com.net.Selector] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Failed authentication with the-kafka-host.confluent.cloud/123.123.123.123 (Authentication Failed)
2020-11-02 07:16:05,160 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Node -1 disconnected.
2020-11-02 07:16:05,160 ERROR [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Connection to node -1 (the-kafka-host.confluent.cloud/123.123.123.123:9092) Failed authentication due to: Authentication Failed
2020-11-02 07:16:05,161 WARN  [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Bootstrap broker the-kafka-host.confluent.cloud:9092 (id: -1 rack: null) disconnected
2020-11-02 07:16:05,261 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Give up sending Metadata request since no node is available
2020-11-02 07:16:05,311 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Initialize connection to node the-kafka-host.confluent.cloud:9092 (id: -1 rack: null) for sending Metadata request
2020-11-02 07:16:05,311 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Initiating connection to node the-kafka-host.confluent.cloud:9092 (id: -1 rack: null) using address the-kafka-host.confluent.cloud/123.123.123.123
2020-11-02 07:16:05,314 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to SEND_APIVERSIONS_REQUEST
2020-11-02 07:16:05,317 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Creating SaslClient: client=null;service=kafka;serviceHostname=the-kafka-host.confluent.cloud;mechs=[PLAIN]
2020-11-02 07:16:05,349 DEBUG [org.apa.kaf.com.net.Selector] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Created socket with SO_RCVBUF = 65536,SO_TIMEOUT = 0 to node -1
2020-11-02 07:16:05,357 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
2020-11-02 07:16:05,400 DEBUG [org.apa.kaf.com.net.SslTransportLayer] (kafka-producer-network-thread | producer-1) [SslTransportLayer channelId=-1 key=channel=java.nio.channels.socketChannel[connection-pending remote=the-kafka-host.confluent.cloud/123.123.123.123:9092],readyOps=0] SSL handshake completed successfully with peerHost 'the-kafka-host.confluent.cloud' peerPort 9092 peerPrincipal 'CN=*.ap-southeast-2.aws.confluent.cloud' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384'
2020-11-02 07:16:05,400 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
2020-11-02 07:16:05,440 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to SEND_HANDSHAKE_REQUEST
2020-11-02 07:16:05,441 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
2020-11-02 07:16:05,476 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to INITIAL
2020-11-02 07:16:05,477 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to INTERMEDIATE
2020-11-02 07:16:06,722 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to Failed
2020-11-02 07:16:06,722 INFO  [org.apa.kaf.com.net.Selector] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Failed authentication with the-kafka-host.confluent.cloud/123.123.123.123 (Authentication Failed)
2020-11-02 07:16:06,724 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Node -1 disconnected.
2020-11-02 07:16:06,724 ERROR [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Connection to node -1 (the-kafka-host.confluent.cloud/123.123.123.123:9092) Failed authentication due to: Authentication Failed
2020-11-02 07:16:06,727 WARN  [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Bootstrap broker the-kafka-host.confluent.cloud:9092 (id: -1 rack: null) disconnected
2020-11-02 07:16:06,824 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Give up sending Metadata request since no node is available
2020-11-02 07:16:06,876 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Give up sending Metadata request since no node is available
2020-11-02 07:16:06,927 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Give up sending Metadata request since no node is available
2020-11-02 07:16:06,978 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Give up sending Metadata request since no node is available

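Solution

First, make sure that in your saslJaasConfig:

  1. The username is enclosed in escaped double quotes or in single quotes:

    required username=\"USERNAME\"
    required username='USERNAME'

  2. The password is enclosed in escaped double quotes or in single quotes:

    password=\"PASSWORD\"
    password='PASSWORD'

  3. The value ends with a semicolon (;):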

    String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
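
The three rules above can be folded into a small helper so the quoting and the trailing semicolon cannot be forgotten. This is only a minimal sketch: the class name JaasConfigs and its methods are hypothetical and not part of Camel or Kafka, and the backslash escaping of embedded quotes is an assumption about how the JAAS value is tokenized.

```java
// Hypothetical helper (not part of Camel or Kafka): builds a sasl.jaas.config
// value for the PLAIN mechanism following the three rules listed above.
final class JaasConfigs {

    private static final String PLAIN_MODULE =
            "org.apache.kafka.common.security.plain.PlainLoginModule";

    static String plain(String username, String password) {
        return PLAIN_MODULE + " required"
                + " username=" + quote(username)
                + " password=" + quote(password)
                + ";"; // rule 3: the entry ends with a semicolon
    }

    // Rules 1 and 2: wrap the value in double quotes.
    // Assumption: embedded backslashes and double quotes are backslash-escaped.
    private static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        System.out.println(plain("USERNAME", "PASSWORD"));
    }
}
```

The returned string can be used wherever the hand-written saslJaasConfig literal appears.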

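Important: if the password contains special characters (for example & or +), this can cause problems, because Camel's URI parser may confuse them with the delimiters of the other parameters in the endpoint URI. If your password contains such characters, you can pass saslJaasConfig as RAW: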

String saslJaasConfig = "RAW(org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";)";

In my case this solved the problem. It also explains why using the DSL fixes it: with the DSL the values are set declaratively, so no string parsing is needed.

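
To make the RAW approach concrete, here is a minimal sketch that assembles the same endpoint URI as in the question, with the JAAS value wrapped in RAW(...). The class and method names are hypothetical; only the option names (brokers, saslMechanism, securityProtocol, sslEndpointAlgorithm, saslJaasConfig) come from the question.

```java
// Hypothetical helper: assembles the camel-kafka endpoint URI used in the question,
// wrapping the JAAS value in RAW(...) so Camel takes it literally.
final class KafkaUris {

    static String saslSslUri(String topic, String brokers, String saslJaasConfig) {
        return "kafka:" + topic
                + "?brokers=" + brokers
                + "&saslMechanism=PLAIN"
                + "&securityProtocol=SASL_SSL"
                + "&sslEndpointAlgorithm=HTTPS"
                // RAW(...) keeps characters such as & or + in the password intact
                + "&saslJaasConfig=RAW(" + saslJaasConfig + ")";
    }

    public static void main(String[] args) {
        String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"USERNAME\" password=\"P&SS+WORD\";";
        System.out.println(saslSslUri("foo-topic", "the-kafka-host.confluent.cloud:9092", jaas));
    }
}
```

The resulting string would be passed to .to(...) in the route, in place of the concatenated URI shown in the question.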

I think the problem is with the truststore file. You have to specify the truststore on the client side; please check the sslTruststoreLocation property.


Using the endpoint DSL solves the problem:

// kafka(...) is the endpoint DSL builder available when the route extends EndpointRouteBuilder
.to(kafka(topic)
        .brokers(props.getProperty("bootstrap.servers"))
        .saslMechanism(props.getProperty("sasl.mechanism"))
        .securityProtocol(props.getProperty("security.protocol"))
        .sslEndpointAlgorithm(props.getProperty("ssl.endpoint.identification.algorithm"))
        .saslJaasConfig(props.getProperty("sasl.jaas.config")))
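
The answer does not show where props comes from. One plausible shape, sketched here as an assumption, is a java.util.Properties object loaded from text that uses the standard Kafka client keys referenced in the snippet. The class name ClientProps and the sample values are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical loader for the props object used by the DSL snippet.
final class ClientProps {

    // Parses properties-file text; the keys are the standard Kafka client property names.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "bootstrap.servers=the-kafka-host.confluent.cloud:9092",
                "security.protocol=SASL_SSL",
                "sasl.mechanism=PLAIN",
                "ssl.endpoint.identification.algorithm=HTTPS",
                "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule"
                        + " required username=\"USERNAME\" password=\"PASSWORD\";");
        Properties props = load(text);
        // Avoid printing the credentials themselves.
        System.out.println(props.getProperty("security.protocol"));
        System.out.println(props.getProperty("sasl.mechanism"));
    }
}
```

Because each value is handed to a typed builder method, the JAAS string never passes through Camel's URI parsing, so no RAW(...) wrapper is needed in this variant.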
