微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink——Source

source 是flink的数据源,简单介绍四种读取数据的方式:

  1.从集合中读取

  2.从文件中读取

  3.从kafka中读取

  4.自定义Source

 1 package com.jy.bjz.source;
 2 
 3 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 4 import org.apache.flink.streaming.api.datastream.DataStream;
 5 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 6 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 7 import org.apache.flink.streaming.api.functions.source.sourceFunction;
 8 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 9 
10 import java.util.Arrays;
11 import java.util.Properties;
12 
13 /**
14  * Todo
15  *
16  * @author baojiazhong
17  * @since 2021/9/9 10:48
18  */
19 public class SourceTest {
20     public static void main(String[] args) {
21         // 创建流式执行环境
22         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
23         // 读取 集合数据
24         DataStream<Integer> collectionDataStream = env.fromCollection(Arrays.asList(1, 2, 3, 4));
25         // 读取 文本数据 从路径中读取文件
26         String path = "C:\\Users\\15868\\Desktop\\TL\\flink\\learn\\src\\main\\resources\\hello.txt";
27         DataStream<String> textDataStream = env.readTextFile(path);
28         // 读取 kafka数据
29         // 配置kafka
30         Properties properties = new Properties();
31         properties.setProperty("bootstrap.servers", "localhost:9092");
32         properties.setProperty("group.id", "consumer-group");
33         properties.setProperty("key.deserializer",
34                 "org.apacje.kafka.common.serialization.StringDeserializer");
35         properties.setProperty("value.deserializer",
36                 "org.apache.kafka.common.serialization.StringDeserializer");
37         properties.setProperty("auto.offset.reset", "latest");
38         DataStream<String> test = env.addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));
39         // 自定义Source , 实现 SourceFunction<T> 方法
40         DataStreamSource<String> mySoucrceDataStream = env.addSource(new MySource());
41     }
42 
43     public static class MySource implements SourceFunction<String> {
44 
45         public void run(SourceContext<String> ctx) throws Exception {
46             // 读取数据
47             ctx.collect("");
48         }
49 
50         public void cancel() {
51             //取消接收的条件
52         }
53     }
54 }

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐