微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

并行的Source定义

上篇:Source-ocal模式引入webui

一、Source-单并行的Source

代码

package cn._51doit.flink.day01;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Source-单并行的Source
 */
public class FromCollectionDemo_01 {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8181);  //设置web ui的端口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        //有限的数据流(用fromCollection创建的并行度)
        DataStreamSource<Integer> nums = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

        int parallelism = nums.getParallelism();

        System.out.println("fromCollection创建的并行度的DataStreamSource为:"+parallelism);

        nums.print();

        env.execute();

    }
}


控制台打印输出

验证结果1

  • fromCollection创建的并行度的DataStream为1,是一个有限的数据流,执行程序就完全退出,通常用来做实验,程序执行完毕就退出

基于集合的Source

  • 基于集合的Source是将一个普通的Java集合、迭代器或者可变参数转换成一个分布式数据集DataStreamSource,它是DataStream的子类
  • 所以也可以使用DataStream类型来引用。得到DataStream后就可以调用Transformation或Sink度数据进行处理了

fromCollection方法

  • 一个非并行的Source,可以将一个Collection类型的数据作为参数传入到该方法中,返回一个DataStreamSource

验证结果2

  • fromElements创建的并行度的DataStream为1,也是一个有限的数据流,执行程序就完全退出,通常用来做实验,程序执行完毕就退出

fromElements方法

源码读解

fromElements方法进去看,底层调的是addSource方法,它的并行度是1

fromElementsFunction方法进去看,是实现了SourceFunction,它是用来读数据的

二、多并行的Source

(1)fromParallelCollection创建的Source是多并行的Source,并且是一个有限的数据流

代码演示:

package cn._51doit.flink.day01;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;

/**
 * Source-多并行的Source
 */
public class FromParCollection {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Long> nums = env.fromParallelCollection(new NumberSequenceIterator(1L, 20L),Long.class);

        int parallelism = nums.getParallelism();

        System.out.println("fromParallelCollection创建的DataStream的并行度为:"+parallelism);

        nums.print();

        env.execute();

    }
}

它是一个多个的并行度,而且是一个有限的数据流,运行得出:它的并行度为4,有限的数据流,如下控制台打印的列表

fromParallelCollection(SplittableIterator, Class) 方法

  • 一个并行的Source(并行度可以使用env的setParallelism来设置),该方法需要传入两个参数
  • 一个是继承SplittableIterator的实现类的迭代器,第二个是迭代器中数据的类型

(2)generateSequence创建的Source也是多并行的Source,并且是一个有限的数据流

代码演示

package cn._51doit.flink.day01;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;

/**
 * Source-多并行的Source
 */
public class GenSeqDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Long> nums = env.generateSequence(1, 100);

        int parallelism = nums.getParallelism();


        System.out.println("generateSequence创建的DataStream的并行度为:"+parallelism);

        nums.print();

        env.execute();

    }
}

运行得出:它的并行度为4,有限的数据流,如下控制台打印的列表:

3> 3
3> 7
3> 11
3> 15
3> 19
3> 23
3> 27
3> 31
3> 35
3> 39
3> 43
3> 47
3> 51
3> 55
3> 59
2> 2
2> 6
2> 10
2> 14
2> 18
2> 22
2> 26
2> 30
2> 34
2> 38
2> 42
2> 46
2> 50
2> 54
2> 58
2> 62
2> 66
2> 70
2> 74
2> 78
2> 82
2> 86
2> 90
2> 94
2> 98
4> 4
1> 1
1> 5
1> 9
1> 13
1> 17
1> 21
1> 25
1> 29
1> 33
1> 37
1> 41
1> 45
1> 49
1> 53
1> 57
1> 61
1> 65
1> 69
1> 73
1> 77
1> 81
1> 85
1> 89
1> 93
1> 97
3> 63
3> 67
3> 71
3> 75
3> 79
3> 83
3> 87
3> 91
3> 95
3> 99
4> 8
4> 12
4> 16
4> 20
4> 24
4> 28
4> 32
4> 36
4> 40
4> 44
4> 48
4> 52
4> 56
4> 60
4> 64
4> 68
4> 72
4> 76
4> 80
4> 84
4> 88
4> 92
4> 96
4> 100

它是一个多个的并行度,而且是一个有限的数据流

generateSequence(long from, long to) 方法

  • 一个并行的Source(并行度也可以通过调用方法后,再调用setParallelism来设置)该方法需要传入两个long类型的参数
  • 一个是起始值,第二个是结束值,返回一个DataStreamSource

源码读解

fromParallelCollection方法点进去查看,返回fromParallelCollection方法点进去查看,底层调用也是addSource方法,还new FromSplittableIteratorFunction,它继承了RichParallelSourceFunction(抽象类),实现了ParallelSourceFunction方法

总结

单并行的Source:直接实现了SourceFunction接口

多并行的Source:可以洗出RichParallelSourceFunction或实现ParallelSourceFunction接口

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐