Java - Flink - Integration testing with multiple sources

I have a Flink job that I am integration testing with the approach described here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing

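The job takes input from two sources, which are combined in a CoFlatMapFunction. In the test environment I am currently using two simple SourceFunctions to emit values, but this gives me no control over the order in which events are emitted, and that control is required to test the job's functionality properly.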

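How can I modify the test to ensure that one source function emits all of its data before the second source function emits anything?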

I have seen the approach suggested in "Integration test for complex topology (multiple inputs) in Flink", which works well for unit tests, but I am looking for a solution that lets me integration-test the whole job.

Best answer
I would suggest adding control code to your two SourceFunctions and using the MiniClusterWithClientResource. It could look like the following:

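import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;

// Flink package names as of the 1.8/1.9 releases; some of these classes moved in later versions.
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class JobITCase {

    private static final int NUM_TMS = 2;
    private static final int NUM_SLOTS = 2;
    private static final int PARALLELISM = NUM_SLOTS * NUM_TMS;

    // One MiniCluster (2 TaskManagers x 2 slots) shared by all tests in this class.
    @ClassRule
    public final static MiniClusterWithClientResource MINI_CLUSTER_WITH_CLIENT_RESOURCE = new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(NUM_SLOTS)
                .setNumberTaskManagers(NUM_TMS)
                .build());

    @Test
    public void testJob() throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);

        final MyControllableSourceFunction source1 = new MyControllableSourceFunction("source1");
        final MyControllableSourceFunction source2 = new MyControllableSourceFunction("source2");

        final DataStreamSource<Integer> input1 = env.addSource(source1);
        final DataStreamSource<Integer> input2 = env.addSource(source2);

        input1.connect(input2).map(new CoMapFunction<Integer, Integer, Integer>() {
            @Override
            public Integer map1(Integer integer) {
                System.out.println("Input 1: " + integer);
                return integer;
            }

            @Override
            public Integer map2(Integer integer) {
                System.out.println("Input 2: " + integer);
                return integer;
            }
        }).print();

        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();

        // Submit without blocking, so the test thread stays free to drive the sources.
        MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster().submitJob(jobGraph).get();

        final CompletableFuture<JobResult> jobResultFuture = MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster().requestJobResult(jobGraph.getJobID());

        final ArrayList<CompletableFuture<Void>> finishedFutures = new ArrayList<>(PARALLELISM);

        // Release every parallel instance of source1 and collect their completion futures.
        for (int i = 0; i < PARALLELISM; i++) {
            MyControllableSourceFunction.startExecution(source1, i);
            finishedFutures.add(MyControllableSourceFunction.getFinishedFuture(source1, i));
        }

        // Block until all source1 instances have emitted all of their records.
        FutureUtils.waitForAll(finishedFutures).join();

        // Only now let source2 start emitting.
        for (int i = 0; i < PARALLELISM; i++) {
            MyControllableSourceFunction.startExecution(source2, i);
        }

        // The JobResult future completes normally even if the job failed;
        // converting it to a JobExecutionResult rethrows any job failure.
        jobResultFuture.join().toJobExecutionResult(JobITCase.class.getClassLoader());
    }

    private static class MyControllableSourceFunction extends RichParallelSourceFunction<Integer> {

        // Static so that the test thread and the deserialized source instances (same JVM in a
        // MiniCluster) see the same latches and futures. Keys have the form "<name>_<subtaskIndex>".
        private static final ConcurrentMap<String, CountDownLatch> startLatches = new ConcurrentHashMap<>();
        private static final ConcurrentMap<String, CompletableFuture<Void>> finishedFutures = new ConcurrentHashMap<>();

        private final String name;

        // Set by cancel(), which runs on a different thread than run(), hence volatile.
        private volatile boolean running = true;

        private MyControllableSourceFunction(String name) {
            this.name = name;
        }

        @Override
        public void run(SourceContext<Integer> sourceContext) throws Exception {
            final int index = getRuntimeContext().getIndexOfThisSubtask();

            // computeIfAbsent on both sides: it does not matter whether the test or the source registers first.
            final CountDownLatch startLatch = startLatches.computeIfAbsent(getId(index), ignored -> new CountDownLatch(1));
            final CompletableFuture<Void> finishedFuture = finishedFutures.computeIfAbsent(getId(index), ignored -> new CompletableFuture<>());

            // Wait until the test releases this subtask.
            startLatch.await();
            int counter = 0;

            while (running && counter < 10) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(counter++);
                }
            }

            // Tell the test that this subtask has emitted all of its records.
            finishedFuture.complete(null);
        }

        @Override
        public void cancel() {
            running = false;
        }

        private String getId(int index) {
            return name + '_' + index;
        }

        static void startExecution(MyControllableSourceFunction source, int index) {
            final CountDownLatch startLatch = startLatches.computeIfAbsent(source.getId(index), ignored -> new CountDownLatch(1));
            startLatch.countDown();
        }

        static CompletableFuture<Void> getFinishedFuture(MyControllableSourceFunction source, int index) {
            return finishedFutures.computeIfAbsent(source.getId(index), ignored -> new CompletableFuture<>());
        }
    }
}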
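The ordering guarantee in the answer comes from a small handshake that does not depend on Flink. Each subtask blocks on a latch looked up by `name_index`, and the test thread releases source1, waits on the completion futures, and only then releases source2. The sketch below reproduces just that handshake with the plain JDK, so you can check the ordering without a cluster. It is an illustration under assumptions, not the answer's code: plain threads stand in for the parallel source subtasks, and the class and method names (`OrderedSourcesSketch`, `runOnce`, `source1BeforeSource2`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical, Flink-free model of the MyControllableSourceFunction handshake:
// plain threads play the role of the parallel source subtasks.
public class OrderedSourcesSketch {

    // Shared registries keyed by "<name>_<index>", mirroring the answer's static maps.
    private static final ConcurrentMap<String, CountDownLatch> START_LATCHES = new ConcurrentHashMap<>();
    private static final ConcurrentMap<String, CompletableFuture<Void>> FINISHED = new ConcurrentHashMap<>();

    // Whoever asks first creates the latch/future; the other side reuses the same object.
    private static CountDownLatch startLatch(String id) {
        return START_LATCHES.computeIfAbsent(id, ignored -> new CountDownLatch(1));
    }

    private static CompletableFuture<Void> finishedFuture(String id) {
        return FINISHED.computeIfAbsent(id, ignored -> new CompletableFuture<>());
    }

    // One simulated subtask: wait for release, emit `count` records into `sink`, then signal completion.
    private static Thread startSubtask(String name, int index, int count, List<String> sink) {
        final String id = name + '_' + index;
        Thread thread = new Thread(() -> {
            try {
                startLatch(id).await();
                for (int i = 0; i < count; i++) {
                    sink.add(name + ':' + index + ':' + i);
                }
                finishedFuture(id).complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                finishedFuture(id).completeExceptionally(e);
            }
        });
        thread.start();
        return thread;
    }

    // Runs both "sources" with the given parallelism and returns the records in arrival order.
    public static List<String> runOnce(int parallelism, int count) throws InterruptedException {
        START_LATCHES.clear();
        FINISHED.clear();
        List<String> sink = new CopyOnWriteArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            threads.add(startSubtask("source1", i, count, sink));
            threads.add(startSubtask("source2", i, count, sink));
        }

        // Release all source1 subtasks and wait until each of them has finished emitting.
        List<CompletableFuture<Void>> source1Done = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            startLatch("source1_" + i).countDown();
            source1Done.add(finishedFuture("source1_" + i));
        }
        CompletableFuture.allOf(source1Done.toArray(new CompletableFuture<?>[0])).join();

        // Only now release the source2 subtasks.
        for (int i = 0; i < parallelism; i++) {
            startLatch("source2_" + i).countDown();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        return new ArrayList<>(sink);
    }

    // True if no source1 record appears after any source2 record.
    public static boolean source1BeforeSource2(List<String> records) {
        boolean seenSource2 = false;
        for (String record : records) {
            if (record.startsWith("source2")) {
                seenSource2 = true;
            } else if (seenSource2) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> records = runOnce(4, 10);
        // Prints "80 records, source1 before source2: true"
        System.out.println(records.size() + " records, source1 before source2: " + source1BeforeSource2(records));
    }
}
```

The check passes on every run because source1's completion happens before the test thread releases source2. Without the `allOf(...).join()` barrier, source2 records could interleave with source1 records. This model only covers the sources themselves. In the real job, records still pass through Flink's network stack before they reach the CoMapFunction.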

Original article: https://www.jb51.cc/java/532864.html
