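How to resolve a collision caused by chaining functions in Flink Stateful Functions
I am trying to modify the DataStream API example provided by Flink Stateful Functions by attaching another function and creating a chain of functions. However, I get the following error: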
Exception in thread "main" java.lang.IllegalArgumentException: Hash collision on user-specified ID "Feedback_union_uid1". Most likely cause is a non-unique ID. Please check that all IDs specified via uid(String) are unique.
    at org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:178)
    at org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:109)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:165)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:113)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
    at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
    at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
    at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
    at org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:98)
    at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:79)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1818)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1714)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1682)
    at org.apache.flink.statefun.examples.datastream.Example.main(Example.java:161)
Here is the modified example:
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.statefun.examples.datastream;
import static org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder.requestReplyFunctionBuilder;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.statefun.flink.core.message.RoutableMessage;
import org.apache.flink.statefun.flink.core.message.RoutableMessageBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionDataStreamBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionEgressStreams;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.state.PersistedValue;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class Example {

  private static final FunctionType GREET = new FunctionType("example", "greet");
  private static final FunctionType GREET2 = new FunctionType("example2", "greet2");
  private static final FunctionType REMOTE_GREET = new FunctionType("example", "remote-greet");
  private static final FunctionType REMOTE_GREET2 = new FunctionType("example2", "remote-greet2");
  private static final EgressIdentifier<String> GREETINGS =
      new EgressIdentifier<>("example", "out", String.class);
  private static final EgressIdentifier<String> GREETINGS2 =
      new EgressIdentifier<>("example2", "out2", String.class);

  public static void main(String... args) throws Exception {

    // -----------------------------------------------------------------------------------------
    // obtain the stream execution env and create some data streams
    // -----------------------------------------------------------------------------------------

    // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Configuration conf = new Configuration();
    conf.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, "/tmp");
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp");
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    // conf.setInteger(RestOptions.PORT, 8050);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    env.getConfig().enableSysoutLogging();
    env.setParallelism(1);

    StatefulFunctionsConfig statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
    statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
    // System.out.println("Plan 1 " + env.getExecutionPlan());

    DataStream<RoutableMessage> names =
        env.addSource(new NameSource())
            .map(
                name ->
                    RoutableMessageBuilder.builder()
                        .withTargetAddress(GREET, name)
                        .withMessageBody(name)
                        .build()); // .uid("source step");
    // System.out.println("Plan 2 " + env.getExecutionPlan());

    // -----------------------------------------------------------------------------------------
    // wire up stateful functions
    // -----------------------------------------------------------------------------------------

    StatefulFunctionEgressStreams out =
        StatefulFunctionDataStreamBuilder.builder("example")
            .withDataStreamAsIngress(names)
            .withFunctionProvider(GREET, unused -> new MyFunction())
            .withRequestReplyRemoteFunction(
                requestReplyFunctionBuilder(
                        REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
                    .withMaxRequestDuration(Duration.ofSeconds(15))
                    .withMaxNumBatchRequests(500))
            .withEgressId(GREETINGS)
            .withConfiguration(statefunConfig)
            .build(env);

    // -----------------------------------------------------------------------------------------
    // obtain the outputs
    // -----------------------------------------------------------------------------------------

    // Re-wrap the first pipeline's egress as routable messages addressed to GREET2.
    DataStream<RoutableMessage> output =
        out.getDataStreamForEgressId(GREETINGS)
            .map(
                name ->
                    RoutableMessageBuilder.builder()
                        .withTargetAddress(GREET2, name)
                        .withMessageBody(name)
                        .build()); // .uid("source step 2");

    // Second StateFun pipeline, built on the same execution environment as the first.
    StatefulFunctionEgressStreams out2 =
        StatefulFunctionDataStreamBuilder.builder("example2")
            .withDataStreamAsIngress(output)
            .withFunctionProvider(GREET2, unused -> new MyFunction2())
            .withRequestReplyRemoteFunction(
                requestReplyFunctionBuilder(
                        REMOTE_GREET2, URI.create("http://localhost:5001/statefun"))
                    .withMaxRequestDuration(Duration.ofSeconds(15))
                    .withMaxNumBatchRequests(500))
            .withEgressId(GREETINGS2)
            .withConfiguration(statefunConfig)
            .build(env);
    // System.out.println("Plan 3 " + env.getExecutionPlan());

    DataStream<String> output2 = out2.getDataStreamForEgressId(GREETINGS2);

    // -----------------------------------------------------------------------------------------
    // the rest of the pipeline
    // -----------------------------------------------------------------------------------------

    output2
        .map(
            new RichMapFunction<String, String>() {
              @Override
              public String map(String value) {
                System.out.println(value);
                return "'" + value + "'";
              }
            })
        .addSink(new PrintSinkFunction<>());

    System.out.println("Plan 4 " + env.getExecutionPlan());
    // System.out.print(env.getStreamGraph("Flink Streaming Job", false));
    env.execute();
  }

  private static final class MyFunction implements StatefulFunction {

    @Persisted
    private final PersistedValue<Integer> seenCount = PersistedValue.of("seen", Integer.class);

    @Override
    public void invoke(Context context, Object input) {
      int seen = seenCount.updateAndGet(MyFunction::increment);
      System.out.print("MyFunction: " + input.toString());
      context.send(GREETINGS, String.format("seen1: Hello %s at the %d-th time", input, seen));
    }

    private static int increment(@Nullable Integer n) {
      return n == null ? 1 : n + 1;
    }
  }

  private static final class MyFunction2 implements StatefulFunction {

    @Persisted
    private final PersistedValue<Integer> seenCount2 = PersistedValue.of("seen", Integer.class);

    @Override
    public void invoke(Context context, Object input) {
      int seen = seenCount2.updateAndGet(MyFunction2::increment);
      System.out.print("MyFunction2: " + input.toString());
      // The format string has two placeholders, so both input and seen are passed.
      context.send(GREETINGS2, String.format("seen2: Hello %s at the %d-th time", input, seen));
    }

    private static int increment(@Nullable Integer n) {
      return n == null ? 1 : n + 1;
    }
  }

  private static final class NameSource implements SourceFunction<String> {

    private static final long serialVersionUID = 1;

    private volatile boolean canceled;

    @Override
    public void run(SourceContext<String> ctx) throws InterruptedException {
      String[] names = {"Stephan", "Igal", "Gordon", "Seth", "Marta"};
      ThreadLocalRandom random = ThreadLocalRandom.current();
      while (true) {
        int index = random.nextInt(names.length);
        final String name = names[index];
        synchronized (ctx.getCheckpointLock()) {
          if (canceled) {
            return;
          }
          ctx.collect(name);
        }
        Thread.sleep(1000);
      }
    }

    @Override
    public void cancel() {
      canceled = true;
    }
  }
}
I realize that I can only set the uid of a stage through the DataStream API and not through the StateFun API. What is the best practice for avoiding this kind of error?
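
To make my guess about the cause concrete, here is a toy model in plain Java. It is not Flink's hashing code and uses no Flink or StateFun classes. ToyGraph, buildPipeline and secondBuildCollides are hypothetical names invented for this sketch. The assumption, inferred only from the error message and from the fact that I call build(env) twice on the same environment, is that each build registers an internal operator under the same fixed uid, which I never set myself.

```java
import java.util.HashSet;
import java.util.Set;

public class UidCollisionSketch {

  /** Toy stand-in for a job graph that insists on unique operator IDs. */
  static final class ToyGraph {
    private final Set<String> uids = new HashSet<>();

    void addOperator(String uid) {
      // Set.add returns false when the element is already present.
      if (!uids.add(uid)) {
        throw new IllegalArgumentException(
            "Hash collision on user-specified ID \"" + uid + "\"");
      }
    }
  }

  /** Hypothetical builder that always registers its internal operator under one fixed uid. */
  static void buildPipeline(ToyGraph graph) {
    // ID copied from the error message above; assumed to be hardcoded by the builder.
    graph.addOperator("Feedback_union_uid1");
  }

  /** Returns true when a second build on the same graph is rejected. */
  static boolean secondBuildCollides() {
    ToyGraph graph = new ToyGraph();
    buildPipeline(graph); // first build: the uid is new, so this succeeds
    try {
      buildPipeline(graph); // second build: same uid on the same graph
      return false;
    } catch (IllegalArgumentException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("second build collides: " + secondBuildCollides());
  }
}
```

If this model matches what actually happens, setting uid(...) on my own map steps would not help, because the colliding ID would come from inside the builder rather than from the operators I define.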