I. Installing and Deploying Flink 1.13
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is designed to run in all common cluster environments and to perform computations at in-memory speed and at any scale.
1. Prepare the tar package
flink-1.13.1-bin-scala_2.12.tgz
2. Extract it
tar -zxvf flink-1.13.1-bin-scala_2.12.tgz
3. Add the Hadoop dependency jar to Flink's lib directory
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar (https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber/2.7.5-10.0)
4. Start the HDFS cluster
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
5. Start the Flink local cluster
/flink/bin/start-cluster.sh
Two processes should now be running: TaskManagerRunner and StandaloneSessionClusterEntrypoint.
Stop command: /flink/bin/stop-cluster.sh
6. Flink Web UI
http://localhost:8081/#/overview
7. Run the official example
Read data from a text file, run the WordCount word-frequency example, and print the result to the console:
/flink/bin/flink run /flink/examples/batch/WordCount.jar
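Run without arguments, the example uses its built-in sample text and prints the counts to stdout. A minimal variation, assuming the standard --input/--output arguments of the bundled batch WordCount example and hypothetical file paths:
/flink/bin/flink run /flink/examples/batch/WordCount.jar --input /tmp/words.txt --output /tmp/wordcount-result.txt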
II. Integrating Flink with Hudi
Integration boils down to putting the bundle jar, hudi-flink-bundle_2.12-0.10.1.jar, on the Flink application CLASSPATH.
When the Flink SQL connector uses Hudi as a source or sink, there are two ways to get the jar onto the CLASSPATH:
Method 1: pass the jar with the -j parameter when launching the Flink SQL Client:
flink/bin/sql-client.sh embedded -j …./hudi-flink-bundle_2.12-0.10.1.jar
Method 2: put the jar directly into the lib directory of the Flink installation ($FLINK_HOME/lib).
In conf/flink-conf.yaml set taskmanager.numberOfTaskSlots: 4, and list localhost four times in the workers file.
Because Flink needs to connect to HDFS, set the HADOOP_CLASSPATH environment variable before starting the cluster, as sketched below.
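A minimal sketch of Method 2 plus the environment setup (it assumes $FLINK_HOME and $HADOOP_HOME are set and that the bundle jar sits in the current directory; adjust paths to your installation):
# copy the Hudi bundle onto Flink's classpath
cp hudi-flink-bundle_2.12-0.10.1.jar $FLINK_HOME/lib/
# conf/flink-conf.yaml: taskmanager.numberOfTaskSlots: 4
# conf/workers:         localhost   (repeated four times)
# let Flink pick up the Hadoop/HDFS client classes
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# restart the standalone cluster so the changes take effect
$FLINK_HOME/bin/stop-cluster.sh
$FLINK_HOME/bin/start-cluster.sh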
III. Starting the Flink SQL CLI
sql-client.sh embedded shell
Set the result display mode to tableau:
set execution.result-mode=tableau;
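If that key is rejected, note that Flink 1.13 reorganized the SQL client options; the equivalent setting under the newer key name (an assumption based on the 1.13 SQL client, verify against your version) is:
set sql-client.execution.result-mode = tableau;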
IV. Usage
1. Create the table test_flink_hudi_mor; the data is stored in a Hudi table backed by HDFS, with table type MOR.
CREATE TABLE test_flink_hudi_mor (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'table.type' = 'MERGE_ON_READ'
);
connector: the table connector
path: the path where the data is stored
write.tasks: number of tasks Flink uses when writing to Hudi
compaction.tasks: number of tasks used for compaction when writing to Hudi
table.type: the Hudi table type
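No key options are set above, so the connector falls back to its defaults: record key column uuid and precombine field ts. That default record key is why repeated inserts with the same uuid behave as updates later on. A sketch of the same DDL with these options spelled out explicitly (the table name here is hypothetical and the option names are those documented for the Hudi 0.10 Flink connector; verify them against your version):
CREATE TABLE test_flink_hudi_mor_explicit (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor_explicit',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'uuid',  -- record key (assumed option name)
  'write.precombine.field' = 'ts'                      -- precombine field (assumed option name)
);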
Flink SQL> desc test_flink_hudi_mor;
+-----------+--------------+------+-----+--------+-----------+
|      name |         type | null | key | extras | watermark |
+-----------+--------------+------+-----+--------+-----------+
|      uuid |  VARCHAR(20) | true |     |        |           |
|      name |  VARCHAR(10) | true |     |        |           |
|       age |          INT | true |     |        |           |
|        ts | TIMESTAMP(3) | true |     |        |           |
| partition |  VARCHAR(20) | true |     |        |           |
+-----------+--------------+------+-----+--------+-----------+
5 rows in set
2. Insert data
INSERT INTO test_flink_hudi_mor VALUES
  ('id1','FZ',29,TIMESTAMP '1993-04-09 00:00:01','par1');

INSERT INTO test_flink_hudi_mor VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

Repeating an INSERT with the same record key performs an update: the id1 row changes from ('id1','FZ',29,TIMESTAMP '1993-04-09 00:00:01','par1') to ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'). Because this is a MOR table, the change is first written to log files and has not yet been compacted into a parquet file, as shown in the figure below.
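To confirm the update, query the table from the SQL client (this is just the check implied above; no extra options are assumed):
select * from test_flink_hudi_mor;
-- the row with uuid = 'id1' should now show name = 'Danny' and age = 23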
V. Streaming Query
1. Create the table test_flink_hudi_mor_2, which reads in streaming mode and maps onto the table test_flink_hudi_mor created earlier.
CREATE TABLE test_flink_hudi_mor_2 (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
  'table.type' = 'MERGE_ON_READ',
  'read.tasks' = '1',
  'read.streaming.enabled' = 'true',
  'read.streaming.start-commit' = '20220307211200',
  'read.streaming.check-interval' = '4'
);
read.streaming.enabled = 'true' means the table is read in streaming mode.
read.streaming.check-interval sets the interval at which the source checks for new commits to 4 seconds.
table.type sets the table type to MERGE_ON_READ.
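Starting the streaming read is then just an ordinary query; it keeps running and emits rows for every new commit after the configured start commit (nothing beyond the DDL above is assumed):
select * from test_flink_hudi_mor_2;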
2. Open a new terminal, start another Flink SQL CLI session, recreate the table test_flink_hudi_mor, and insert data in batch mode.
CREATE TABLE test_flink_hudi_mor (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO test_flink_hudi_mor VALUES ('id9','FZ',29,TIMESTAMP '1993-04-09 00:00:01','par5');
INSERT INTO test_flink_hudi_mor VALUES ('id10','DX',28,TIMESTAMP '1994-06-02 00:00:01','par5');
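Within the configured 4-second check interval, the streaming query in the first session should emit the new rows id9 and id10. You can also look at the files Hudi wrote under the table path on HDFS (a plain recursive listing; nothing Hudi-specific is assumed):
hdfs dfs -ls -R /hudi-warehouse/test_flink_hudi_mor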