Apache Beam: ReadFromKafka throws grpc._channel._InactiveRpcError

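I am trying to consume data from Kafka and process it. First I tried Kafka with Spark Streaming. Spark Streaming works fine with the Kafka producer running at "localhost:9092", so I am confident that Spark on my local machine works with the Kafka producer in Docker.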

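But when I try Apache Beam with Kafka, I get an error. I verified that the Spark job server is running successfully in Docker. However, I still get the error shown below. Thanks for your help.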

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.kafka import ReadFromKafka
from environment_variables import *  # TOPICS is expected to come from here

def run_pipeline():
    # Portable Spark runner: job server on port 8099, Python workers run in-process (LOOPBACK).
    options = PipelineOptions(runner='SparkRunner', job_endpoint='localhost:8099', environment_type='LOOPBACK')
    options.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=options) as p:
        (p
         | "kafka" >> ReadFromKafka(consumer_config={"bootstrap.servers": 'localhost:9092'}, topics=[TOPICS], expansion_service='localhost:8097')
         # Chain onto the Kafka output rather than applying Map to the pipeline object.
         | "print" >> beam.Map(print))

run_pipeline()

Error

Traceback (most recent call last):
  File "X:/Git_repo/project_red/Beam_streaming/junk6.py",line 18,in <module>
    run_pipeline()
  File "X:/Git_repo/project_red/Beam_streaming/junk6.py",line 16,in run_pipeline
    p | "kafka" >> beam.io.kafka.ReadFromKafka(consumer_config={"bootstrap.servers": 'localhost:9092'},expansion_service='localhost:8097')
  File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\transforms\ptransform.py",line 1058,in __ror__
    return self.transform.__ror__(pvalueish,self.label)
  File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\transforms\ptransform.py",line 573,in __ror__
    result = p.apply(self,pvalueish,label)
  File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\pipeline.py",line 646,in apply
    return self.apply(transform,pvalueish)
  File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\pipeline.py",line 689,in apply
    pvalueish_result = self.runner.apply(transform,self._options)
  File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\runners\runner.py",line 188,in apply
    return m(transform,input,options)
  File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\runners\runner.py",line 218,in apply_PTransform
    return transform.expand(input)
  File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\transforms\external.py",line 316,in expand
    response = service.Expand(request)
  File "X:\Git_repo\project_red\venv\lib\site-packages\grpc\_channel.py",line 923,in __call__
    return _end_unary_response_blocking(state,call,False,None)
  File "X:\Git_repo\project_red\venv\lib\site-packages\grpc\_channel.py",line 826,in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "Failed to connect to all addresses"
    debug_error_string = "{"created":"@1615358925.981000000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":5397,"referenced_errors":[{"created":"@1615358925.981000000","description":"Failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":398,"grpc_status":14}]}"
>
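
The traceback ends in `service.Expand(request)` inside `external.py`, which suggests the failing call is the gRPC request to the expansion service at `localhost:8097`; `StatusCode.UNAVAILABLE` with "Failed to connect to all addresses" means no connection could be established to that address. Below is a minimal sketch, using only the standard library, for checking from the machine that runs the pipeline whether each address in the script accepts TCP connections. The names `is_port_open` and `ENDPOINTS` are hypothetical helpers introduced here for illustration, and an open port only shows that something is listening, not that it is the expected service.

```python
import socket

# Addresses taken from the script above; the labels are only for display.
ENDPOINTS = {
    "Kafka broker": ("localhost", 9092),
    "Spark job server": ("localhost", 8099),
    "Expansion service": ("localhost", 8097),
}


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    for label, (host, port) in ENDPOINTS.items():
        state = "reachable" if is_port_open(host, port) else "NOT reachable"
        print(f"{label} at {host}:{port} is {state}")
```

If the expansion service port shows as not reachable while the job server port is, one thing worth checking (an assumption, not confirmed by the question) is whether the Docker container publishes port 8097 to the host in addition to 8099.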

Job Server running at Docker
