微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

flink sql读取kafka-入门级

目录

环境

flink 1.11版本
Mac系统

下载相关jar包

flink sql读取kafka需要下载相关的kafka依赖包,放到本地的lib目录下,选择下面这个:

在这里插入图片描述

cd /usr/local/Cellar/apache-flink/1.11.2/libexec/lib
cp ~/Downloads/flink-sql-connector-kafka_2.11-1.11.2.jar .

启动flink集群与flink sql

启动flink集群

/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/start-cluster.sh

启动flink sql

/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/sql-client.sh embedded

使用tableau模式

SET execution.result-mode=tableau;

创建数据库并选择

create database if not exists river_test;
use river_test;

创建flink sql表结构

CREATE TABLE kafkaTable13 (
 item_id BIGINT,
 source_type BIGINT,
 title STRING,
 white_image STRING,
 coupon_name STRING,
 schema STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'test',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
);

查询结果

select * from kafkaTable13;

在这里插入图片描述

通过相关flink UI界面,查看任务:

在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐