如何解决 java.lang.IncompatibleClassChangeError:Primitives 没有实现请求的接口 org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ProtocolMessageEnum
我已经为 WordCount 数据流作业编写了 JUnit 测试用例(即 WordCountTest),但是当我尝试在 IntelliJ 中运行时,遇到了以下错误。
我为数据流作业 IntelliJ 运行 Junit 的方式是,右键单击测试类(即 WordCountTest.java),然后单击“Run WordCountTest with coverage”。如果我做错了什么,请告诉我
完全错误
java.lang.IncompatibleClassChangeError: Class org.apache.beam.model.pipeline.v1.RunnerApi$StandardPTransforms$Primitives 没有实现请求的接口 org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ProtocolMessageEnum
at org.apache.beam.repackaged.direct_java.runners.core.construction.BeamUrns.getUrn(BeamUrns.java:27)
at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.<clinit>(PTransformTranslation.java:129)
at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers.lambda$writeWithRunnerDeterminedSharding$1(PTransformMatchers.java:483)
at org.apache.beam.sdk.Pipeline$2.enterCompositeTransform(Pipeline.java:277)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:575)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:239)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traversetopologically(Pipeline.java:468)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:267)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:217)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:170)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:398)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:334)
at com.somename.WordCountTest.testExtractWordsFn(WordCountTest.java:36)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:322)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
at org.junit.runners.BlockJUnit4ClassRunner.runchild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runchild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
at org.junit.runners.ParentRunner.runchildren(ParentRunner.java:328)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
以下是数据流作业和测试类的信息:
WordCount.java
public class WordCount {
/**
* Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
* statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it to
* a ParDo in the pipeline.
*/
public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
public static class ExtractWordsFn extends DoFn<String,String> {
private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class,"emptyLines");
private final distribution lineLendist =
Metrics.distribution(ExtractWordsFn.class,"lineLendistro");
@ProcessElement
public void processElement(@Element String element,OutputReceiver<String> receiver) {
lineLendist.update(element.length());
if (element.trim().isEmpty()) {
emptyLines.inc();
}
// Split the line into words.
String[] words = element.split(TOKENIZER_PATTERN,-1);
// Output each word encountered into the output PCollection.
for (String word : words) {
if (!word.isEmpty()) {
receiver.output(word);
}
}
}
}
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String,Long>,String> {
@Override
public String apply(KV<String,Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,* modular testing,and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>,PCollection<KV<String,Long>>> {
@Override
public PCollection<KV<String,Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String,Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
/**
* Options supported by {@link WordCount}.
*
* <p>Concept #4: Defining your own configuration options. Here,you can add your own arguments to
* be processed by the command-line parser,and specify default values for them. You can then
* access the options values in your pipeline code.
*
* <p>Inherits standard configuration options.
*/
public interface WordCountOptions extends PipelineOptions {
/**
* By default,this example reads from a public dataset containing the text of King Lear. Set
* this option to choose a different input file or glob.
*/
@Description("Path of the file to read from")
@Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
String getInputFile();
void setInputFile(String value);
/** Set this required option to specify where to write the output. */
@Description("Path of the file to write to")
@required
String getoutput();
void setoutput(String value);
}
static void runWordCount(WordCountOptions options) {
Pipeline p = Pipeline.create(options);
// Concepts #2 and #3: Our pipeline applies the composite CountWords transform,and passes the
// static FormatAsTextFn() to the ParDo transform.
p.apply("ReadLines",TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts",TextIO.write().to(options.getoutput()));
p.run().waitUntilFinish();
}
public static void main(String[] args) {
WordCountOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
runWordCount(options);
}
}
WordCountTest.java
/** Tests of WordCount. */
@RunWith(JUnit4.class)
public class WordCountTest {

  // Test fixtures: input lines and the word counts we expect CountWords to produce.
  static final String[] WORDS_ARRAY =
      new String[] {"hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};

  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

  static final String[] COUNTS_ARRAY = new String[] {"hi: 5", "there: 1", "sue: 2", "bob: 2"};

  // Must be a public @Rule so Beam's TestPipeline can verify the pipeline is actually run.
  @Rule public TestPipeline p = TestPipeline.create();

  /** Example test that tests a specific {@link DoFn}. */
  @Test
  public void testExtractWordsFn() throws Exception {
    List<String> words = Arrays.asList(" some input words "," "," cool "," foo"," bar");
    PCollection<String> output =
        p.apply(Create.of(words).withCoder(StringUtf8Coder.of()))
            .apply(ParDo.of(new ExtractWordsFn()));
    PAssert.that(output).containsInAnyOrder("some","input","words","cool","foo","bar");
    // Block until the pipeline completes so PAssert failures surface in this test
    // (consistent with testCountWords below).
    p.run().waitUntilFinish();
  }

  /** Example test that tests a PTransform by using an in-memory input and inspecting the output. */
  @Test
  @Category(ValidatesRunner.class)
  public void testCountWords() throws Exception {
    PCollection<String> input = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
    PCollection<String> output =
        input.apply(new CountWords()).apply(MapElements.via(new FormatAsTextFn()));
    PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);
    p.run().waitUntilFinish();
  }
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。