如何解决在 KafkaIO 与 Python 外部转换(external transform)配合使用时出现的 KeyError: 'beam:coders:javasdk:0.1'
我正在尝试搭建一个 Apache Beam Java 管道:从 Kafka 读取数据,通过跨语言(cross-language)框架调用一个 Python 转换,再把结果写回 Kafka。
在此之前,我尝试过一个不使用 Kafka 的简单管道:例如在 Java 中用 `Create` 生成一些测试值,然后把它们传给一个简单的(dummy)Python 转换。这种方式到目前为止是可以正常工作的。
这是管道代码的摘录:
/**
 * Entry point: reads key/value records from Kafka, passes them through the Python word-count
 * transform wrapped by {@link CrossLanguageTransform}, and writes the results back to Kafka.
 *
 * <p>NOTE(review): the reported "KeyError: 'beam:coders:javasdk:0.1'" is raised inside the
 * Python expansion service while it rebuilds the producer of this transform's input PCollection
 * (the Kafka read chain). One of that producer's coders uses a Java-only coder URN that the Python
 * SDK does not recognize. Cross-language boundaries generally need standard portable coders.
 * Consider pinning the coder explicitly (e.g. a KvCoder of StringUtf8Coder) or checking whether
 * the Beam version in use handles this; verify before relying on either.
 */
public static void main(String[] args) {
  // ...
  pipeline
      .apply(KafkaIO.<String,String>read()
          .withBootstrapServers(servers)
          .withTopic(readTopic)
          .withKeyDeserializer(StringDeserializer.class)
          .withValueDeserializer(StringDeserializer.class)
          // Strip Kafka record metadata so downstream receives plain KV<String, String> pairs.
          .withoutMetadata()
      )
      // Applying this triggers expansion against the external expansion service at
      // pipeline-construction time (see External$ExpandableTransform.expand in the stack trace).
      .apply(new CrossLanguageTransform(options.getExpansionServiceURL()))
      .apply(KafkaIO.<String,String>write()
          .withBootstrapServers(servers)
          .withTopic(writetopic)
          .withKeySerializer(StringSerializer.class)
          .withValueSerializer(StringSerializer.class)
      );
  pipeline.run().waitUntilFinish();
}
这是外部转换包装器:
/**
 * Java-side wrapper that delegates to the Python transform registered under {@link #URN}.
 *
 * <p>Expansion is performed by an external expansion service reachable at the address passed
 * to the constructor. Input and output are both declared as {@code PCollection<KV<String,
 * String>>}. NOTE(review): the Python side must actually emit elements whose coder the Java SDK
 * can decode as KV&lt;String, String&gt;; confirm against the Python transform's output type.
 */
public class CrossLanguageTransform
    extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {

  /** URN under which the Python transform is registered in the expansion service. */
  private static final String URN = "beam:transforms:xlang:pythonwordcount";

  // host:port of the expansion service (e.g. "localhost:9097").
  // Instance field, not static: a static field written from the constructor would be shared
  // by, and overwritten across, every CrossLanguageTransform instance.
  private final String expansionAddress;

  /**
   * Creates a transform that expands {@link #URN} via the given expansion service.
   *
   * @param expansionAddress host:port of an expansion service that knows {@link #URN}
   */
  public CrossLanguageTransform(String expansionAddress) {
    this.expansionAddress = expansionAddress;
  }

  @Override
  public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> pcoll) {
    // Empty payload: the Python transform's to_runner_api_parameter returns None,
    // so there is no configuration to send.
    return pcoll.apply(
        "PythonWordCount", External.of(URN, new byte[] {}, expansionAddress));
  }
}
这是我的 Python 转换的摘录:
# URN shared with the Java wrapper; the expansion service looks the transform up by it.
URN = 'beam:transforms:xlang:pythonwordcount'


@PTransform.register_urn(URN, None)
class PythonWordCount(PTransform):
    """Count words in the values of a keyed PCollection, per fixed window.

    Registered under ``URN`` with a ``None`` parameter type, so an expansion
    service can rebuild the transform from its runner-API form without any
    payload.
    """

    def expand(self, pcoll: PCollection[Tuple[str, str]]) -> PCollection[Tuple[str, str]]:
        # Only the value of each (key, value) element is used; keys are dropped.
        text = pcoll | 'GetValues' >> Values()
        tokens = text | 'Split' >> FlatMap(
            lambda line: re.findall(r'[A-Za-z\']+', line)
        ).with_output_types(str)
        keyed_ones = tokens | 'PairWithOne' >> Map(lambda token: (token, 1))
        # Fixed windows of size 10 (seconds, per Beam's FixedWindows) bound the per-key sum.
        # The label's spelling is kept as-is; renaming it would change the step name.
        windowed = keyed_ones | 'GruoupAndSumWindow' >> WindowInto(FixedWindows(10))
        summed = windowed | 'GroupAndSum' >> CombinePerKey(sum)
        # NOTE(review): format_result is not visible here. The return annotation promises
        # Tuple[str, str], and the Java caller expects KV<String, String> with a coder it can
        # decode. Confirm format_result's output type (an explicit with_output_types may help).
        return summed | 'Format' >> Map(format_result)

    def to_runner_api_parameter(self, unused_context):
        """Serialize as ``(URN, None)``: this transform carries no configuration."""
        return URN, None

    @staticmethod
    def from_runner_api_parameter(unused_ptransform, unused_paramter, unused_context):
        """Rebuild a ``PythonWordCount``; every argument is ignored."""
        return PythonWordCount()
我这样启动 Python 的扩展服务(expansion service):
./expansion_service.py -p 9097
以及 Flink 的作业服务器(job server):
# From Apache Beam's repository
./gradlew :runners:flink:1.13:job-server:runShadow
当我像这样运行 Java 管道时:
mvn exec:java -Dexec.mainClass=org.apache.beam.examples.CrossLanguagePipeline -Pportable-runner -Dexec.args="--runner=PortableRunner --jobEndpoint=localhost:8099 --useExternal=true --expansionServiceURL=localhost:9097 --experiments=beam_fn_api"
我得到以下堆栈跟踪:
java.lang.RuntimeException: expansion service error: Traceback (most recent call last):
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/portability/expansion_service.py",line 58,in Expand
producers = {
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/portability/expansion_service.py",line 59,in <dictcomp>
pcoll_id: (context.transforms.get_by_id(t_id),pcoll_tag)
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py",line 114,in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/pipeline.py",line 1350,in from_runner_api
result.outputs = {
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/pipeline.py",line 1351,in <dictcomp>
None if tag == 'None' else tag: context.pcollections.get_by_id(id)
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py",line 114,in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/pvalue.py",line 207,in from_runner_api
element_type=context.element_type_from_coder_id(proto.coder_id),
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py",line 267,in element_type_from_coder_id
self.coders[coder_id].to_type_hint())
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py",line 163,in __getitem__
return self.get_by_id(id)
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py",line 114,in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/coders/coders.py",line 364,in from_runner_api
[context.coders.get_by_id(c) for c in coder_proto.component_coder_ids],
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/coders/coders.py",line 364,in <listcomp>
[context.coders.get_by_id(c) for c in coder_proto.component_coder_ids],
File "/home/ignacio/.local/share/pyenv/versions/beam-poc/lib/python3.8/site-packages/apache_beam/coders/coders.py",line 361,in from_runner_api
parameter_type,constructor = cls._known_urns[coder_proto.spec.urn]
KeyError: 'beam:coders:javasdk:0.1'
at org.apache.beam.runners.core.construction.External$ExpandableTransform.expand (External.java:222)
at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:548)
at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:499)
at org.apache.beam.sdk.values.PCollection.apply (PCollection.java:376)
at org.apache.beam.examples.CrossLanguageTransform.expand (CrossLanguageTransform.java:19)
at org.apache.beam.examples.CrossLanguageTransform.expand (CrossLanguageTransform.java:8)
at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:548)
at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:482)
at org.apache.beam.sdk.values.PCollection.apply (PCollection.java:363)
at org.apache.beam.examples.CrossLanguagePipeline.main (CrossLanguagePipeline.java:49)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
我不明白为什么这里会失败,而同样的做法在其他示例中却能正常工作。
`KafkaIO.read` 的输出类型与 `PythonWordCount` 的 `expand` 方法的输入类型看起来是匹配的。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。