这篇文章主要介绍Flink中Connectors如何连接Kafka,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
通过使用Flink DataStream Connectors 数据流连接器连接到ElasticSearch搜索引擎的文档数据库Index,并提供数据流输入与输出操作;
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
数据流输入
DataStreamSource.java
package com.flink.examples.kafka; import com.flink.examples.TUser; import com.google.gson.Gson; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.Properties; /** * @Description 从Kafka中消费数据 */ public class DataStreamSource { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度(使用几个cpu核心) env.setParallelism(1); //每隔2000ms进行启动一个检查点 env.enableCheckpointing(2000); //设置模式为exactly-once env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确保检查点之间有进行500 ms的进度 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); //1.消费者客户端连接到kafka Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig, "192.168.110.35:9092"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONfig, 5000); props.put(ConsumerConfig.GROUP_ID_CONfig, "consumer-45"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig, "latest"); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props); //setStartFromEarliest()会从最早的数据开始进行消费,忽略存储的offset信息 //consumer.setStartFromEarliest(); //Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略 //consumer.setStartFromTimestamp(1559801580000L); //Flink从topic中最新的数据开始消费 //consumer.setStartFromLatest(); //Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数 //consumer.setStartFromGroupOffsets(); //2.在算子中进行处理 DataStream<TUser> sourceStream = env.addSource(consumer) .filter((FilterFunction<String>) value -> StringUtils.isNotBlank(value)) .map((MapFunction<String, TUser>) value -> { System.out.println("print:" + value); //注意,因已开启enableCheckpointing容错定期检查状态机制,当算子出现错误时, //会导致数据流恢复到最新checkpoint的状态,并从存储在checkpoint中的offset开始重新消费Kafka中的消息。 //因此会有可能导制数据重复消费,重复错误,陷入死循环。加上try|catch,捕获错误后再正确输出。 Gson gson = new Gson(); try { TUser user = gson.fromJson(value, TUser.class); return user; }catch(Exception e){ System.out.println("error:" + e.getMessage()); } return new TUser(); }) .returns(TUser.class); sourceStream.print(); //3.执行 env.execute("flink kafka source"); } }
数据流输出
DataStreamSink.java
package com.flink.examples.kafka; import com.flink.examples.TUser; import com.google.gson.Gson; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import java.util.Properties; /** * @Description 将生产者数据写入到kafka */ public class DataStreamSink { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //必需设置setParallelism并行度,否则不会输出 env.setParallelism(1); //每隔2000ms进行启动一个检查点 env.enableCheckpointing(2000); //设置模式为exactly-once env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确保检查点之间有进行500 ms的进度 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 检查点必须在一分钟内完成,或者被丢弃 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只允许进行一个检查点 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //1.连接kafka Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONfig, "192.168.110.35:9092"); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>("test", new SimpleStringSchema(), props); //2.创建数据,并写入数据到流中 TUser user = new TUser(); user.setId(8); user.setName("liu3"); user.setAge(22); user.setSex(1); user.setAddress("CN"); user.setCreateTimeSeries(1598889600000L); DataStream<String> sourceStream = env.fromElements(user).map((MapFunction<TUser, String>) value -> new Gson().toJson(value)); //3.将数据流输入到kafka sourceStream.addSink(producer); sourceStream.print(); env.execute("flink kafka sink"); } }
数据展示
以上是“Flink中Connectors如何连接Kafka”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注编程之家行业资讯频道!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。