目录
环境
flink 1.11版本
Mac系统
下载相关jar包
flink sql读取kafka需要下载相关的kafka依赖包,放到本地的lib目录下,选择下面这个:
cd /usr/local/Cellar/apache-flink/1.11.2/libexec/lib
cp ~/Downloads/flink-sql-connector-kafka_2.11-1.11.2.jar .
启动flink集群与flink sql
启动flink集群
/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/start-cluster.sh
启动flink sql
/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/sql-client.sh embedded
使用tableau模式
SET execution.result-mode=tableau;
创建数据库并选择
create database if not exists river_test;
use river_test;
创建flink sql表结构
CREATE TABLE kafkaTable13 (
item_id BIGINT,
source_type BIGINT,
title STRING,
white_image STRING,
coupon_name STRING,
schema STRING
) WITH (
'connector' = 'kafka',
'topic' = 'test',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
查询结果
select * from kafkaTable13;
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。