最近在学flink用sql语句写任务,postgres到MysqL总是报错,中间遇到了很多坑,网上查得写的都不详细,自己做完总结一下。
1.flink版本用的是1.12版本,高版本sql语法有变化,我使用的是1.12版本。
2.注意flink版本与驱动包版本相对应,高版本驱动跟flink版本不兼容导致任务启动失败。
3.postgres数据库和MysqL数据库都是用docker搭建的,搭建postgres数据库参考:docker部署postgres数据库_今朝花落悲颜色的博客-CSDN博客
搭建好了一定要修改postgres.conf配置文件,在挂载目录/docker/postgresql/data/下面找到postgres.conf,修改wal_level=logical,然后重启postgres。
CREATE TABLE `sync_test_1` (
`total_gmv` bigint(20) DEFAULT NULL,
`day_time` varchar(255) COLLATE utf8mb4_bin NOT NULL,
PRIMARY KEY (`day_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
postgres表结构
CREATE TABLE "public"."ball" (
"total_gmv" int8,
"day_time" varchar(32) COLLATE "pg_catalog"."default"
);ALTER TABLE "public"."ball" ADD CONSTRAINT "ball_pkey" PRIMARY KEY ("day_time");
flinksql语句
--源表
CREATE TABLE pgtest (
day_time VARCHAR,
total_gmv bigint,
PRIMARY KEY (day_time) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
'connector' = 'postgres-cdc', -- 必须为 'postgres-cdc'
'hostname' = 'localhost', -- 数据库的 IP
'port' = '5432', -- 数据库的访问端口
'username' = 'postgres', -- 数据库访问使用的用户名(需要提供 REPLICATION 权限, 日志级别必须大于等于 logical, 且设置后需要重启实例)
'password' = '123456', -- 数据库访问使用的密码
'database-name' = 'postgres', -- 需要同步的数据库名
'schema-name' = 'public', -- 需要同步的数据库模式 (Schema)
'table-name' = 'ball' , -- 需要同步的数据表名
'decoding.plugin.name' = 'pgoutput', -- pgoutput,必须
'debezium.slot.name' = 'pgtestflink' -- 指定slot名称,必须
);
--结果表
create table MysqLtest ( day_time VARCHAR,
total_gmv bigint,
PRIMARY KEY (day_time) NOT ENFORCED ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:MysqL://172.18.11.224:3306/flinktest?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',
'table-name' = 'sync_test_1',
'username' = 'root',
'password' = '123456'
);
INSERT INTO MysqLtest
SELECT day_time,total_gmv
FROM pgtest ;
这里要说明一下cdc和jdbc,cdc是实时捕获源表数据变化的,jdbc是sink表的,所以要读哪张表用cdc连接器,写哪张表用jdbc连接器。
反过来MysqL到postgres
--源表
create table MysqLtest ( day_time VARCHAR,
total_gmv bigint,
PRIMARY KEY (day_time) NOT ENFORCED ) WITH (
'connector' = 'MysqL-cdc',
'hostname' = '172.18.11.224',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'flinktest',
'table-name' = 'sync_test_1'
);
//结果表
create table pgtest ( day_time VARCHAR,
total_gmv bigint,PRIMARY KEY (day_time) NOT ENFORCED) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/postgres',
'table-name' = 'public.ball',
'username' = 'postgres',
'password' = '123456'
);
INSERT INTO pgtest
SELECT day_time,total_gmv
FROM MysqLtest ;
注意:必须指定主键PRIMARY KEY (day_time) NOT ENFORCED,与数据表中的主键要对应。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。