Chaining functions causes a conflict in Flink Stateful Functions

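I am trying to modify the DataStream API example provided by Flink Stateful Functions by appending another function, so that the two functions form a chain. However, I get the following error: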

Exception in thread "main" java.lang.IllegalArgumentException: Hash collision on user-specified ID "feedback_union_uid1". Most likely cause is a non-unique ID. Please check that all IDs specified via `uid(String)` are unique.
    at org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:178)
    at org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:109)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:165)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:113)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
    at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
    at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
    at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
    at org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:98)
    at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:79)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1818)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1714)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1682)
    at org.apache.flink.statefun.examples.datastream.Example.main(Example.java:161)
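The exception is thrown while Flink translates the stream graph into a job graph: according to the stack trace, `StreamGraphHasherV2.generateNodeHash` hashes each operator's user-specified uid and rejects a hash it has already seen. Since the `.uid(...)` calls in the example are commented out, `feedback_union_uid1` appears to be a fixed uid assigned inside the StateFun builder, and the example calls `StatefulFunctionDataStreamBuilder.build(env)` twice. The following is a simplified, stdlib-only model of that check, not Flink's code: the class `UidHashCheck` is hypothetical, and MD5 stands in for whatever hash function Flink actually uses.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Simplified model (hypothetical class) of the uid uniqueness check that fails above.
 * Assumption: MD5 stands in for the hash function Flink uses internally.
 */
final class UidHashCheck {

  /** Hashes each user-specified uid and fails if two uids produce the same hash. */
  public static List<byte[]> hashAll(List<String> uids) {
    List<byte[]> seen = new ArrayList<>();
    for (String uid : uids) {
      byte[] hash = md5(uid);
      for (byte[] previous : seen) {
        if (Arrays.equals(previous, hash)) {
          throw new IllegalArgumentException(
              "Hash collision on user-specified ID \"" + uid + "\". "
                  + "Most likely cause is a non-unique ID. Please check that all IDs "
                  + "specified via `uid(String)` are unique.");
        }
      }
      seen.add(hash);
    }
    return seen;
  }

  private static byte[] md5(String s) {
    try {
      return MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
    } catch (NoSuchAlgorithmException e) {
      // Every Java platform is required to provide MD5, so this should not happen.
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // Hypothetical job with one StateFun pipeline: the internal uid appears once.
    System.out.println(hashAll(List.of("source step", "feedback_union_uid1", "source step 2")).size());
    // Hypothetical job with two build(env) calls: the same internal uid appears twice.
    try {
      hashAll(List.of("feedback_union_uid1", "source step 2", "feedback_union_uid1"));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Running `main` prints `3` and then the collision message, which is why distinct user-level uids such as `"source step"` and `"source step 2"` cannot help when the duplicate comes from a uid the builder sets itself.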

Here is the modified example:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.statefun.examples.datastream;

import static org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder.requestReplyFunctionBuilder;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.statefun.flink.core.message.RoutableMessage;
import org.apache.flink.statefun.flink.core.message.RoutableMessageBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionDataStreamBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionEgressStreams;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.state.PersistedValue;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class Example {

  private static final FunctionType GREET = new FunctionType("example", "greet");
  private static final FunctionType GREET2 = new FunctionType("example2", "greet2");
  private static final FunctionType REMOTE_GREET = new FunctionType("example", "remote-greet");
  private static final FunctionType REMOTE_GREET2 = new FunctionType("example2", "remote-greet2");
  private static final EgressIdentifier<String> GREETINGS =
          new EgressIdentifier<>("example", "out", String.class);
  private static final EgressIdentifier<String> GREETINGS2 =
          new EgressIdentifier<>("example2", "out2", String.class);

  public static void main(String... args) throws Exception {

    // -----------------------------------------------------------------------------------------
    // obtain the stream execution env and create some data streams
    // -----------------------------------------------------------------------------------------

//    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Configuration conf = new Configuration();
    conf.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, "/tmp");
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp");
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    //conf.setInteger(RestOptions.PORT, 8050);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

    env.getConfig().enableSysoutLogging();
    env.setParallelism(1);
    StatefulFunctionsConfig statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
    statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);

//    System.out.println("Plan 1 " + env.getExecutionPlan());
    DataStream<RoutableMessage> names =
        env.addSource(new NameSource())
            .map(
                name ->
                    RoutableMessageBuilder.builder()
                        .withTargetAddress(GREET, name)
                        .withMessageBody(name)
                        .build());// .uid("source step");

//    System.out.println("Plan 2 " + env.getExecutionPlan());

    // -----------------------------------------------------------------------------------------
    // wire up stateful functions
    // -----------------------------------------------------------------------------------------

    StatefulFunctionEgressStreams out =
        StatefulFunctionDataStreamBuilder.builder("example")
            .withDataStreamAsIngress(names)
            .withFunctionProvider(GREET, unused -> new MyFunction())
            .withRequestReplyRemoteFunction(
                requestReplyFunctionBuilder(
                        REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
                    .withMaxRequestDuration(Duration.ofSeconds(15))
                    .withMaxNumBatchRequests(500))
            .withEgressId(GREETINGS)
            .withConfiguration(statefunConfig)
            .build(env);


    // -----------------------------------------------------------------------------------------
    // obtain the outputs
    // -----------------------------------------------------------------------------------------

    DataStream<RoutableMessage> output = out.getDataStreamForEgressId(GREETINGS).map(
            name ->
                    RoutableMessageBuilder.builder()
                            .withTargetAddress(GREET2, name)
                            .withMessageBody(name)
                            .build());//.uid("source step 2");

    StatefulFunctionEgressStreams out2 =
            StatefulFunctionDataStreamBuilder.builder("example2")
                    .withDataStreamAsIngress(output)
                    .withFunctionProvider(GREET2, unused -> new MyFunction2())
                    .withRequestReplyRemoteFunction(
                            requestReplyFunctionBuilder(
                                    REMOTE_GREET2, URI.create("http://localhost:5001/statefun"))
                                    .withMaxRequestDuration(Duration.ofSeconds(15))
                                    .withMaxNumBatchRequests(500))
                    .withEgressId(GREETINGS2)
                    .withConfiguration(statefunConfig)
                    .build(env);

//    System.out.println("Plan 3 " + env.getExecutionPlan());
    DataStream<String> output2 = out2.getDataStreamForEgressId(GREETINGS2);

    // -----------------------------------------------------------------------------------------
    // the rest of the pipeline
    // -----------------------------------------------------------------------------------------

    output2
        .map(
            new RichMapFunction<String, String>() {
              @Override
              public String map(String value) {
                System.out.println(value);
                return "'" + value + "'";
              }
            })
        .addSink(new PrintSinkFunction<>());

    System.out.println("Plan 4 " + env.getExecutionPlan());
    //System.out.print(env.getStreamGraph("Flink Streaming Job", false));
    env.execute();
  }

  private static final class MyFunction implements StatefulFunction {

    @Persisted
    private final PersistedValue<Integer> seenCount = PersistedValue.of("seen", Integer.class);

    @Override
    public void invoke(Context context, Object input) {
      int seen = seenCount.updateAndGet(MyFunction::increment);
      System.out.print("MyFunction: " + input.toString());
      context.send(GREETINGS, String.format("seen1: Hello %s at the %d-th time", input, seen));
    }

    private static int increment(@Nullable Integer n) {
      return n == null ? 1 : n + 1;
    }
  }

  private static final class MyFunction2 implements StatefulFunction {

    @Persisted
    private final PersistedValue<Integer> seenCount2 = PersistedValue.of("seen", Integer.class);

    @Override
    public void invoke(Context context, Object input) {
      int seen = seenCount2.updateAndGet(MyFunction2::increment);
      System.out.print("MyFunction2: " + input.toString());
      context.send(GREETINGS2, String.format("seen2: Hello %s at the %d-th time", input, seen));
    }

    private static int increment(@Nullable Integer n) {
      return n == null ? 1 : n + 1;
    }
  }

  private static final class NameSource implements SourceFunction<String> {

    private static final long serialVersionUID = 1;

    private volatile boolean canceled;

    @Override
    public void run(SourceContext<String> ctx) throws InterruptedException {
      String[] names = {"Stephan", "Igal", "Gordon", "Seth", "Marta"};
      ThreadLocalRandom random = ThreadLocalRandom.current();
      while (true) {
        int index = random.nextInt(names.length);
        final String name = names[index];
        synchronized (ctx.getCheckpointLock()) {
          if (canceled) {
            return;
          }
          ctx.collect(name);
        }
        Thread.sleep(1000);
      }
    }

    @Override
    public void cancel() {
      canceled = true;
    }
  }
}

I realize that I can only change the uid of a stage through the DataStream API, not through the StateFun API. What is the best practice for avoiding this kind of error?
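Assuming the goal is only to feed the first function's output into the second, one option worth trying is to avoid a second `StatefulFunctionDataStreamBuilder` altogether: register both functions with one builder (the example already registers two functions, `GREET` and `REMOTE_GREET`, in a single builder) and let `MyFunction` message `GREET2` directly through the SDK's `Context#send(FunctionType, String, Object)` instead of writing to the `GREETINGS` egress. `build(env)` would then run once, so its internal uids would be created only once. The sketch below is a toy, stdlib-only model of that routing idea under those assumptions; `SingleRegistryChain`, `Message`, `Ctx` and `Fn` are hypothetical names and are not StateFun classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (hypothetical names, NOT the StateFun API) of a single function
 * registry hosting both functions, so greet -> greet2 needs only one pipeline.
 */
final class SingleRegistryChain {

  /** A message addressed to a function type and an instance id. */
  static final class Message {
    final String type;
    final String id;
    final Object body;

    Message(String type, String id, Object body) {
      this.type = type;
      this.id = id;
      this.body = body;
    }
  }

  /** What a function may do: message another function, or emit to the egress. */
  interface Ctx {
    void send(String type, String id, Object body);

    void egress(String value);
  }

  /** A function invoked with its own instance id and the incoming message body. */
  interface Fn {
    void invoke(Ctx ctx, String selfId, Object input);
  }

  private final Map<String, Fn> functions = new HashMap<>();
  // Per (type, id) counter, playing the role of the PersistedValue in the question.
  private final Map<String, Integer> seenCounts = new HashMap<>();
  private final List<String> egressOutput = new ArrayList<>();

  SingleRegistryChain register(String type, Fn fn) {
    functions.put(type, fn);
    return this;
  }

  int increment(String type, String id) {
    return seenCounts.merge(type + "/" + id, 1, Integer::sum);
  }

  /** Delivers queued messages until none remain, then returns what reached the egress. */
  List<String> run(List<Message> ingress) {
    Deque<Message> queue = new ArrayDeque<>(ingress);
    Ctx ctx = new Ctx() {
      @Override
      public void send(String type, String id, Object body) {
        queue.add(new Message(type, id, body));
      }

      @Override
      public void egress(String value) {
        egressOutput.add(value);
      }
    };
    while (!queue.isEmpty()) {
      Message m = queue.poll();
      Fn fn = functions.get(m.type);
      if (fn == null) {
        throw new IllegalArgumentException("no function registered for " + m.type);
      }
      fn.invoke(ctx, m.id, m.body);
    }
    return new ArrayList<>(egressOutput);
  }

  public static void main(String[] args) {
    SingleRegistryChain chain = new SingleRegistryChain();
    chain
        .register("example/greet", (ctx, id, input) -> {
          int seen = chain.increment("example/greet", id);
          // Forward straight to the second function instead of going through an egress.
          ctx.send("example2/greet2", id,
              String.format("seen1: Hello %s at the %d-th time", input, seen));
        })
        .register("example2/greet2", (ctx, id, input) -> {
          int seen = chain.increment("example2/greet2", id);
          ctx.egress(String.format("seen2: Hello %s at the %d-th time", input, seen));
        });
    chain.run(List.of(
            new Message("example/greet", "Igal", "Igal"),
            new Message("example/greet", "Igal", "Igal")))
        .forEach(System.out::println);
  }
}
```

Run as written, `main` prints two `seen2: ...` lines whose inner `seen1` counter and outer `seen2` counter both go from 1 to 2, because both inputs target the same `Igal` instance of each function. The point of the model is only that the hand-off between the two functions stays inside one registry rather than crossing into a second, separately built pipeline.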
