上篇:Flink第一个入门程序
在flink种,一个job必须有Source(Data Source)、Sink(Data sink),但是可以没有Transformations
在实时计算DataStream API中,Source用来做什么?
Source是用来获取外部数据源的操作,按照获取数据的方式,可以分为5种:
1. 基于集合的Source
Flink已经封装好的方法,这些Source只要调用StreamExecutionEnvironment的对应方法就可以创建DataStream了,使用起来比较简单
2. 基于Socket网络端口的Source
Flink已经封装好的方法,这些Source只要调用StreamExecutionEnvironment的对应方法就可以创建DataStream了,使用起来比较简单
3. 基于文件的Source
Flink已经封装好的方法,这些Source只要调用StreamExecutionEnvironment的对应方法就可以创建DataStream了,使用起来比较简单
4. 第三方Connector Source
适用场景:生产环境想要从一些分布式、高可用的消息中间件中读取数据
举例:比如Apache Kafka Source、AWS Kinesis Source、Google Cloud PubSub Source等
5. 自定义Source
思路:
(1)首先,需要实现SourceFunction这个接口
(2)然后将该实现类的实例作为参数传入到StreamExecutionEnvironment的addSource方法就可以了
用途:提高了Flink与外部数据源交互的灵活性
说明:
前三种Source是Flink已经封装好的方法
这些Source只要调用StreamExecutionEnvironment的对应方法就可以创建DataStream了,使用起来比较简单
从并行度的角度,如何理解Source?
其实,Source又可以分为:非并行的Source和并行的Source
非并行的Source:
(1)它的并行度只能为1,即用来读取外部数据源的Source只有一个实例
(2)在读取大量数据时效率比较低,通常是用来做一些实验或测试
例如Flink的Socket网络端口读取数据的Source就是一个非并行的Source
并行的Source:
它的并行度可以是1到多个,即用来读取外部数据源的Source可以有一个到多个实例,在分布式计算中,并行度是影响吞吐量一个非常重要的因素,在计算资源足够的前提下,并行度越大,效率越高
例如Kafka Source就是并行的Source
代码演示
1、查看并行度
package cn._51doit.flink.day01;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* 并行度
* 读端口号的数据只能一个人连接
*/
public class SocketSink {
public static void main(String[] args) throws Exception {
//local模式默认的并行度是当前节点逻辑核的数量
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream("Master", 9999);
//DataStream的并行度
int parallelism = lines.getParallelism();
System.out.println("SocketSink的并行度是:"+parallelism);
//对flatmap操作,进行从1行对多行操作
SingleOutputStreamOperator<String> wordAndOne = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> collector) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
collector.collect(word);
}
}
});
int parallelism1 = wordAndOne.getParallelism();
System.out.println("调用玩flatmap后DataStream的并行度:"+parallelism1);
wordAndOne.print();
env.execute();
}
}
在flink中,在:http://master:8081/#/job/c023d4307839bd1a750bdaae37bc1903/overview查看
Source:Socket Streaming Parallelism:1
由此可见,Source:Socket Streaming Parallelism的并行度永远是1,无论设置多少个参数,它的并行度都是1 ,调用完flatmap后DataStream的并行度:3
强调:Source:Socket Streaming Parallelism的并行度永远是1,无论设置多少个参数,它的并行度都是1
用第二种方式查看并行度
package cn._51doit.flink.day01;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class SocketSink01 {
public static void main(String[] args) throws Exception {
//local模式默认的并行度是当前节点逻辑核的数量
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//DataStream的并行度
int parallelism01 = env.getParallelism();
System.out.println("执行环境默认的并行度是:"+parallelism01);
DataStreamSource<String> lines = env.socketTextStream("Master", 9999);
int parallelism = lines.getParallelism();
System.out.println("SocketSink的并行度:"+parallelism);
//对flatmap操作,进行从1行对多行操作
SingleOutputStreamOperator<String> wordAndOne = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> collector) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
collector.collect(word);
}
}
});
int parallelism1 = wordAndOne.getParallelism();
System.out.println("调用玩flatmap后DataStream的并行度:"+parallelism1);
wordAndOne.print();
env.execute();
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。