
KeyError: 'beam:coders:javasdk:0.1' with KafkaIO and Python external transform


I am trying to set up an Apache Beam Java pipeline that:

  • reads messages from Kafka
  • calls an external Python transform
  • writes the output back to Kafka

Before this, I tried a simpler pipeline without Kafka: for example, generating some test values in Java with `Create` and passing them to a dummy Python transform. That worked fine.

Here is an excerpt of the pipeline code:

    public static void main(String[] args) {
        // ...
        pipeline
            .apply(KafkaIO.<String,String>read()
                .withBootstrapServers(servers)
                .withTopic(readTopic)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata()
            )
            .apply(new CrossLanguageTransform(options.getExpansionServiceURL()))
            .apply(KafkaIO.<String,String>write()
                .withBootstrapServers(servers)
                .withTopic(writeTopic)
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class)
            );

        pipeline.run().waitUntilFinish();
    }

Here is the external transform wrapper:

public class CrossLanguageTransform extends PTransform<PCollection<KV<String,String>>,PCollection<KV<String,String>>> {
    private static final String URN = "beam:transforms:xlang:pythonwordcount";

    // Note: this must be an instance field, not static; a static field
    // assigned from the constructor is a latent bug if the transform is
    // ever instantiated more than once.
    private final String expansionAddress;

    public CrossLanguageTransform(String expansionAddress) {
        this.expansionAddress = expansionAddress;
    }

    @Override
    public PCollection<KV<String,String>> expand(PCollection<KV<String,String>> pcoll) {
        return pcoll.apply(
            "PythonWordCount",External.of(URN,new byte [] {},expansionAddress)
        );
    }
}

Here is an excerpt of my Python transform:

URN = 'beam:transforms:xlang:pythonwordcount'

@PTransform.register_urn(URN,None)
class PythonWordCount(PTransform):

    def expand(self,pcoll: PCollection[Tuple[str,str]]) -> PCollection[Tuple[str,str]]:
        return (
            pcoll |
            'GetValues' >> Values() |
            'Split' >> FlatMap(lambda x: re.findall(r'[A-Za-z\']+',x)).with_output_types(str) |
            'PairWithOne' >> Map(lambda x: (x,1)) |
            'GroupAndSumWindow' >> WindowInto(FixedWindows(10)) |
            'GroupAndSum' >> CombinePerKey(sum) |
            'Format' >> Map(format_result)
        )

    def to_runner_api_parameter(self,unused_context):
        return URN,None

    @staticmethod
    def from_runner_api_parameter(unused_ptransform, unused_parameter, unused_context):
        return PythonWordCount()
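As a sanity check on the transform's logic itself (independent of the cross-language machinery), the core of what `PythonWordCount` computes per window can be sketched in plain Python with the standard library. This is only an illustration of the regex split / pair-with-one / sum-per-key steps, not Beam code:

```python
import re
from collections import Counter


def word_count(lines):
    """Plain-Python sketch of the per-window computation in PythonWordCount:
    split each value into words, pair each word with 1, and sum the counts
    per key (Counter does the pairing and summing in one step)."""
    words = []
    for line in lines:
        words.extend(re.findall(r"[A-Za-z']+", line))
    return dict(Counter(words))


# word_count(["the quick fox", "the fox"]) yields
# {'the': 2, 'quick': 1, 'fox': 2}
```

Running this against a few sample lines confirms the transform body is sound, which suggests the failure is in the coder negotiation rather than the transform logic.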

I start the Python expansion service like this:

./expansion_service.py -p 9097

And the Flink job server like this:

# From Apache Beam's repository
./gradlew :runners:flink:1.13:job-server:runShadow

When I run the Java pipeline like this:

mvn exec:java -Dexec.mainClass=org.apache.beam.examples.CrossLanguagePipeline -Pportable-runner -Dexec.args="--runner=PortableRunner --jobEndpoint=localhost:8099 --useExternal=true --expansionServiceURL=localhost:9097 --experiments=beam_fn_api"

I get the following stack trace:

java.lang.RuntimeException: expansion service error: Traceback (most recent call last):
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/portability/expansion_service.py", line 58, in Expand
    producers = {
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/portability/expansion_service.py", line 59, in <dictcomp>
    pcoll_id: (context.transforms.get_by_id(t_id), pcoll_tag)
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 114, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1350, in from_runner_api
    result.outputs = {
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1351, in <dictcomp>
    None if tag == 'None' else tag: context.pcollections.get_by_id(id)
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/pvalue.py", line 207, in from_runner_api
    element_type=context.element_type_from_coder_id(proto.coder_id),
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 267, in element_type_from_coder_id
    self.coders[coder_id].to_type_hint())
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 163, in __getitem__
    return self.get_by_id(id)
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/coders/coders.py", line 364, in from_runner_api
    [context.coders.get_by_id(c) for c in coder_proto.component_coder_ids],
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/coders/coders.py", in <listcomp>
    [context.coders.get_by_id(c) for c in coder_proto.component_coder_ids],
  File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/coders/coders.py", line 361, in from_runner_api
    parameter_type, constructor = cls._known_urns[coder_proto.spec.urn]
KeyError: 'beam:coders:javasdk:0.1'

    at org.apache.beam.runners.core.construction.External$ExpandableTransform.expand (External.java:222)
    at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:548)
    at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:499)
    at org.apache.beam.sdk.values.PCollection.apply (PCollection.java:376)
    at org.apache.beam.examples.CrossLanguageTransform.expand (CrossLanguageTransform.java:19)
    at org.apache.beam.examples.CrossLanguageTransform.expand (CrossLanguageTransform.java:8)
    at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:548)
    at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:482)
    at org.apache.beam.sdk.values.PCollection.apply (PCollection.java:363)
    at org.apache.beam.examples.CrossLanguagePipeline.main (CrossLanguagePipeline.java:49)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)

I don't understand why this fails when the simpler example worked.
The types seem to match between the output of `KafkaIO.read` and the input of the `PythonWordCount` expand method.
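For what it's worth, the failing frame in the traceback is a plain dictionary lookup: the expansion service resolves each coder URN in the pipeline proto against a registry of URNs the Python SDK understands. The sketch below (registry contents made up for illustration; `known_urns` and `resolve_coder` are hypothetical names, not Beam APIs) reproduces the failure mode: the Java SDK encoded the `PCollection` with a Java-only coder URN that has no entry on the Python side.

```python
# Illustrative registry of coder URNs a Python-side service understands.
# The contents here are stand-ins; the real registry lives in
# apache_beam/coders/coders.py as cls._known_urns.
known_urns = {
    "beam:coder:string_utf8:v1": "StrUtf8Coder",
    "beam:coder:kv:v1": "KvCoder",
}


def resolve_coder(urn):
    # Mirrors the failing line in the trace:
    #   parameter_type, constructor = cls._known_urns[coder_proto.spec.urn]
    # An unrecognized URN raises KeyError, exactly as in the stack trace.
    return known_urns[urn]


try:
    resolve_coder("beam:coders:javasdk:0.1")
except KeyError as e:
    print("unknown coder URN:", e)
```

This suggests the fix lies in making the boundary `PCollection` use a portable (cross-language standard) coder rather than a Java-SDK-internal one, though I have not found where to force that.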
