微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 Beam Python SDK 和 PortableRunner 通过 SSL 连接到 Kafka

如何解决使用 Beam Python SDK 和 PortableRunner 通过 SSL 连接到 Kafka

我有以下使用 python beam sdk 连接到 kafka 的代码。我知道 ReadFromKafka 转换是在 java sdk 工具(docker 容器)中运行的,但我无法弄清楚如何在 sdk 工具中使 ssl.truststore.locationssl.keystore.location 可访问'码头环境。 job_endpoint 参数指向 java -jar beam-runners-flink-1.10-job-server-2.27.0.jar --flink-master localhost:8081

pipeline_args.extend([
    '--job_name=paul_test','--runner=PortableRunner','--sdk_location=container','--job_endpoint=localhost:8099','--streaming',"--environment_type=DOCKER",f"--sdk_harness_container_image_overrides=.*java.*,{my_beam_sdk_docker_image}:{my_beam_docker_tag}",])

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline:
    kafka = pipeline | ReadFromKafka(
        consumer_config={
            "bootstrap.servers": "bootstrap-server:17032","security.protocol": "SSL","ssl.truststore.location": "/opt/keys/client.truststore.jks",# how do I make this available to the Java SDK harness 
            "ssl.truststore.password": "password","ssl.keystore.type": "PKCS12","ssl.keystore.location": "/opt/keys/client.keystore.p12",# how do I make this available to the Java SDK harness 
            "ssl.keystore.password": "password","group.id": "group","basic.auth.credentials.source": "USER_INFO","schema.registry.basic.auth.user.info": "user:password"
        },topics=["topic"],max_num_records=2,# expansion_service="localhost:56938"
    )

    kafka | beam.Map(lambda x: print(x))

我尝试将 image override 选项指定为 --sdk_harness_container_image_overrides='.*java.*,beam_java_sdk:latest' - 其中 beam_java_sdk:latest 是我基于 apache/beam_java11_sdk:2.27.0 的 docker 映像,并在其 entrypoint.sh 中提取凭证。但是 Beam 似乎没有使用它,我明白了

INFO  org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory  - Still waiting for startup of environment apache/beam_java11_sdk:2.27.0 for worker id 1-1

在日志中。很快就会不可避免地紧随其后

Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /opt/keys/client.keystore.p12 of type PKCS12

总而言之,我的问题是,在 Apache Beam 中,是否可以从 python 梁 sdk 中使文件在 java sdk 工具 docker 容器中可用?如果是这样,怎么做?

非常感谢。

解决方法

目前,没有直接的方法来实现这一点。正在进行讨论和跟踪问题以提供对此类扩展服务定制的支持(请参阅 herehereBEAM-12538BEAM-12539)。这就是简短的回答。

长答案是肯定的,你可以这样做。您必须将 ExpansionService.java 复制并粘贴到您的代码库中并构建您的自定义扩展服务,您可以在其中指定默认环境 (DOCKER) 和默认环境配置(您的图像)here。然后,您必须手动运行此扩展服务并使用 ReadFromKafka 的 expansion_service 参数指定其地址。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。