微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

flink知识点总结未完待续

1、flink编程模型

1)抽象数据集

DataStream(实时):使用StreamExecutionEnvironment创建DataStream

DataSet(离线): 使用ExecutionEnvironment创建DataSet

2)编程模型

Source:从指定的数据源创建最原始的抽象数据集

Transformation:对数据集进行转换操作,返回一个新的数据集

Sink:将数据输出到指定的存储系统中

注意:job中必须有source和sink,Transformation可有可无

2、source

1)单并行的source

无论指定多少个并行度,该source对应的DataStream并行度只有1,即产生数据的subTask实例就一个,例如socketTextStream方法创建的DataStream

2)多并行的source

可以根据提交任务时指定程序的并行度,DataStreamSource可以有多个并行,即有多个读取数据的subTask实例,FlinkKafkaConsumer

3)自定义source

3.1)单并行:实现sourceFunction接口

3.2)多并行:ParallelSourceFunction接口,可以继承RichParalleSourceFunction抽象类(必须要实现run方法和cancel方法,可以选择性实现open方法和close方法)

[方法执行顺序:open(创建连接)->Run(while循环)->cancel->close(关闭连接,释放资源)]

3.3)调用addSource,将自定义的sourceFunction的实现类的实例传入到该方法

4)flinkKafkaConsumer:

4.1)多并行的source,里面使用operator记录偏移量,开启checkpointing可以容错,

flinkKafkaConsumer对应的DataStreamSource的并行度的数量为提交任务时指定的-p,本地模式为当前机器逻辑核数量

4.2)如果DataStreamSource的并行度大于topic分区的数量,会有部分source对应的subtask是空跑,读取不到数据

4.3)如果DataStreamSource的并行度小于topic分区的数量,会有部分source对应的subtask可以读取多个分区的数量

4.4)最好是并行度和topic的分区的数量保持一致

3、 FlinkSink

sink是多并行,它的并行度与执行环境的并行度保持一致

调用完sink之后,可以再调用setparallelism设置sink的并行度

1、自定义sink 调用addSink方法,将实现sinkFunction接口或继承RichSinkFunction抽象类的实现传入到该方法中,必须重写一个invoke方法,在该方法中实现将数据写入到外部的存储系统中

自定义MyPrintSink:打印在控制台前面的编号是subtask的Index+1

可以通过getRuntimeContext().getIndexOfThisSubtask获取当前subtask的index

2、 RedisSink:结合Kafka,checkpointing可以实现数据的一致性(AtLeastOnce),因为RedisSink可以覆盖原来的数据

3 、FlinkKafkaProducer:结合Kafka,checkpointing可以实现数据的一致性(ExactlyOnce)

3.1)FlinkKafkaProducer支持事务:消费者在读取数据时,要指定事务隔离级别,只读取成功提交事务的数据(isolation.level=read_committed)

3.2)FlinkKafkaProducer继承了TwoParhaseCommitSinkFunction,实现了CheckpointingFunction和checkpointListener,可以将数据保存到operatorState中,checkpoint成功后再提交事务

4、FlinkTransformation

1)map算子

对DataStream中的数据依次取出来进行处理(做映射)

底层调用是transform方法,传入operator名称(Map),返回数据类型和StreamMap实例并将自定义的计算逻辑传入到StreamMap

StreamMap类实现了OnceinputStreamOperator接口,必须重写processElement方法,数据是封装到StreamRecord,使用Output将处理完的数据输出

StreamMap还实现了AbstractUdfStreamOperator接口,用来约束该类传入的计算逻辑的类型,(接口的类型),只能传入mapfunction类型

2) fliter算子

对数据过滤,保留计算逻辑返回为true的数据

底层调用是transform方法,传入operator名称(fliter),返回数据类型和StreamFliter实例并将自定义的过滤逻辑传入到StreamFliter

StreamMap类实现了OnceinputStreamOperator接口,必须重写processElement方法,应用过滤逻辑,返回true就使用output输出

StreamMap还实现了AbstractUdfStreamOperator接口,用来约束该类传入的计算逻辑的类型,(接口的类型)只能传入fliterFunction类型

3)FlatMap算子

对数据进行扁平化映射,可以输出0到多条数据,输出数据用collect方法

底层调用是transform方法,传入operator名称(FlatMap),返回数据类型和StreamFlatMap实例并将自定义的过滤逻辑传入到StreamFlatMap中

StreamMap类实现了OnceinputStreamOperator接口,必须重写processElement方法,应用计算逻辑,如果一条返回多条应用collect结合for循环将数据输出

StreamMap还实现了AbstractUdfStreamOperator接口,用来约束该类传入的计算逻辑的类型,(接口的类型)只能传入FlatMapFunction类型

4)KeyBy

按照key的hash对数据进行分区,可以保证key相同的一定进入到一个分区内,但是一个分区内可以有多个分区的数据

是对数据进行实时的分区,不是上有发送给下游,而是将数据写入到对应的charnnrl的缓存中,下游到上游实时的拉取

KeyBy底层是newKeyedStream ,然后将父DataStream包起来,并且传入KyeBy的条件

最终会调用keyGroupStreamPartitioner的selectChannel的方法,将keyBy的条件传入到该方法

步骤:

1.先计算key的HashCode值

2.将key的HashCode值进行特殊的hash处理:MathUtils.murmurHash(keyHash),避免hashcode返回的数字为负

3.将返回特殊hash值模 / 最大并行度(128认)得到keyGroupId

keyGroupId*parallelism(此程序的并行度) / maxParallelism(认最大并行度),返回分区编号

优点:可以将数据尽量均匀分配到多个分区并且key的hashcode为负

注:

1.如果将自定义POJO当成key必须重写hashcode方法

2.不能将数组当成keyBy的key

5)reduce

将keyedStream数据进行聚合

传入reduceFunction,输入和输出的类型保持一致

如果这个key的数据是第一次出现,不会调用自定义的reduce方法

底层调用的是StreamGroupReduceOperator的processElement方法,将初始值或累加的中间结果以valueState方式保存起来,通过多态的方式调用自定义的reduce方法,将reduce方法的返回值再更新状态到valueState中,最后使用ouput将数据输出

6)sum算子

对KeyedStream的数据进行聚合

底层调用的是aggregate方法,传入SumAggregator然后再调用reduce方法,在reduce方法中会根据数据的类型,调用具体的相加方法 eg : intSum,LongSum…

7)min max

只会返回keyBy的字段和最小值、最大值,如果还有其他字段返回的是第一次出现的值

底层也是调用reduce方法

8)minBy maxBy

不仅返回KeyBy的字段还会返回最小值、最大值,如果有多个字段还会返回最小值、最大值所在数据的全部字段

底层也是调用reduce方法

9)union算子

将多个类型一样的DateStream合并到一起,使用统一的方式进行处理,可以union一到多个DataStream,如果自己union自己是将数据double

10)connect算子

可以将两个不同类型的数据包装到一起,分别调用两个方法对两个数据流中的数据进行操作,可以让两个数据流共享状态

11)iterate

用来做迭代计算,类似一个分布式for循环

可以指定一个更新模型,对输入的数据进行运算;可以指定两个判断条件:继续迭代的条件和退出迭代的条件(输出数据的条件)

12)startNewChaining

从这个算子开始开启一个新链

13)disableChaining

将该算子的前面和后面的链都断开

14)slotSharingGroup

设置槽的名称,有就近跟随原则,如果前面的task被打上共享资源槽名称,后面的也跟随

可以将一些task调度到指定的槽内,比如计算密集型的

15)物理分区

Hash (keyBy)

随机(shuffle)

轮循 (rebalance)

一个TaskManager中轮循rescaling

广播 (broadcast)

自定义 (partitionCustom)

16)、project (投影)

功能类似于map ,选择出需求需要输出的数据

只能针对于tuple类型的数据

5、window

1)根据并行度分类

1.1)keyedWindow 可以1到多个并行

1.2) NokeyedWindow 并行度只有1

2)GlobalWindow

countwindow :只有countTrigger

3)TimeWindow

按照时间类型划分:processingTimeWindow和eventTimeWindow

按照划分的方式:滚动窗口、滑动窗口、会话窗口

对于自己最近学习的一个阶段性总结,如有错误欢迎纠正,后续应该会再补充!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐