主要介绍Flink中支持得Source和常用的Connector。
Flink 自身实现了多种 Source 和 Connector 方法,并且还提供了多种与第三方系统进行对接的 Connector。
预定义和自定义Source
在基础04中Flink常用的 DataSet 和 DataStream API中提到过几种Flink已经实现的新建DataStream方法。
基于文件:
我们在本地环境进行测试时可以方便地从本地文件读取数据:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
// read text file from an HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.types(Integer.class, String.class, Double.class);
// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.includeFields("10010") // take the first and the fourth field
.types(String.class, Double.class);
// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.pojoType(Person.class, "name", "age", "zipcode");
...
可以直接在 ExecutionEnvironment 和 StreamExecutionEnvironment 类中找到 Flink 支持的读取本地文件的方法
**基于Collections**
我们也可以基于内存中的集合、对象等创建自己的 Source。一般用来进行本地调试或者验证。
```powershell
DataSet<String> text = env.fromElements(
"Flink Spark Storm",
"Flink Flink Flink",
"Spark Spark Spark",
"Storm Storm Storm"
);
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
data.add(new Tuple3<>(0,1,0));
data.add(new Tuple3<>(0,1,1));
data.add(new Tuple3<>(0,2,2));
data.add(new Tuple3<>(0,1,3));
data.add(new Tuple3<>(1,2,5));
data.add(new Tuple3<>(1,2,9));
data.add(new Tuple3<>(1,2,11));
data.add(new Tuple3<>(1,2,13));
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
基于socket
这也是我目前常用的本地调测线上代码逻辑常用的手段方式
通过监听 Socket 端口,在本地很方便地模拟一个实时计算环境。
StreamExecutionEnvironment 中提供了 socketTextStream 方法可以通过 host 和 port 从一个 Socket 中以文本的方式读取数据。
DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");
linux
nc - lk 9000
自定义Source
通过实现 Flink 的SourceFunction 或者 ParallelSourceFunction 来实现单个或者多个并行度的 Source。
public class MyStreamingSource implements SourceFunction<Item> {
private boolean isRunning = true;
/**
* 重写run方法产生一个源源不断的数据发送源
* @param ctx
* @throws Exception
*/
public void run(SourceContext<Item> ctx) throws Exception {
while(isRunning){
Item item = generateItem();
ctx.collect(item);
//每秒产生一条数据
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
}
//随机产生一条商品数据
private Item generateItem(){
int i = new Random().nextInt(100);
ArrayList<String> list = new ArrayList();
list.add("HAT");
list.add("TIE");
list.add("SHOE");
Item item = new Item();
item.setName(list.get(new Random().nextInt(3)));
item.setId(i);
return item;
}
}
自带连接器
Flink 中支持了比较丰富的用来连接第三方的连接器,可以在官网中找到 Flink 支持的各种各样的连接器:(目前比较常用的)
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
Google PubSub (source/sink)
我们在使用这些连接器时通常需要引用相对应的 Jar 包依赖。而且一定要注意,对于某些连接器比如 Kafka 是有版本要求的,一定要去官方网站找到对应的依赖版本。
基于 Apache Bahir 发布
Flink 还会基于 Apache Bahir 来发布一些 Connector,比如我们常用的 Redis 等。
Apache Bahir 的代码最初是从 Apache Spark 项目中提取的,后作为一个独立的项目提供。Apache Bahir
通过提供多样化的流连接器(Streaming Connectors)和 sql 数据源扩展分析平台的覆盖面,最初只是为 Apache
Spark 提供拓展。目前也为 Apache Flink 提供,后续还可能为 Apache Beam 和更多平台提供拓展服务。
我们可以在 Bahir 的首页中找到目前支持的 Flink 连接器:
Flink streaming connector for ActiveMQ
Flink streaming connector for Akka
Flink streaming connector for Flume
Flink streaming connector for InfluxDB
Flink streaming connector for Kudu
Flink streaming connector for Redis
Flink streaming connector for Netty
ps:有些项目存在各种各样的问题,会对依赖引用有严格的限制,例如我们目前项目,救不太允许新增依赖去实现redis的连接器,所以会需要我们自己去实现相关的连接器,我会在随后的文章中表述相关实现方式,这里不再赘述。
基于异步 I/O 和可查询状态
异步 I/O 和可查询状态都是 Flink 提供的非常底层的与外部系统交互的方式。
其中异步 I/O 是为了解决 Flink 在实时计算中访问外部存储产生的延迟问题,如果我们按照传统的方式使用 MapFunction,那么所有对外部系统的访问都是同步进行的。在很多情况下,计算性能受制于外部系统的响应速度,长时间进行等待,会导致整体吞吐低下。
我们可以通过继承 RichAsyncFunction 来使用异步 I/O:
/**
* 实现 'AsyncFunction' 用于发送请求和设置回调
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** 能够利用回调函数并发发送请求的数据库客户端 */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// 发送异步请求,接收 future 结果
final Future<String> result = client.query(key);
// 设置客户端完成请求后要执行的回调函数
// 回调函数只是简单地把结果发给 future
CompletableFuture.supplyAsync(new supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// 显示地处理异常
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// 创建初始 DataStream
DataStream<String> stream = ...;
// 应用异步 I/O 转换操作
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
代码简单描述:,ResultFuture 的 complete 方法是异步的,不需要等待返回。
Flink State : Flink 提供了 StateDesciptor 方法专门用来访问不同的 state,StateDesciptor 同时还可以通过 setQueryable 使状态变成可以查询状态。可查询状态目前是一个 Beta 功能,暂时不推荐使用。
rocketmq的连接jar
开源的连接 RocketMQ 的项目:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。