微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink对接kafka

启动kafka和flink

1、进入zookeeper的bin目录下启动zookeeper

./zkServer.sh start

2、进入kafka的bin目录下启动kafka

/kafka-server-start.sh -daemon /opt/module/kafka-0.11/config/server.properties

3、进入flink的bin目录下启动flink

./start-cluster.sh 

kafka启动生产者

kafka主题为sensor

./bin/kafka-console-producer.sh --broker-list 192.168.158.202:90992 --topic sensor

添加pom依赖

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>

执行

Java代码如下

package com.test.apitest.souceTest;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;

public class SourceTest02_kafka {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // kafka配置项
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","192.168.153.202:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");



        // 从kafka中读取数据
        DataStreamSource<String> sensor = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));

        sensor.print();
        //执行任务
        env.execute();
    }
}

kafka生产数据

 

 flink消费数据

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐