微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

flinksql postgres到mysql

最近在学flink用sql语句写任务,postgres到MysqL总是报错,中间遇到了很多坑,网上查得写的都不详细,自己做完总结一下。

1.flink版本用的是1.12版本,高版本sql语法有变化,我使用的是1.12版本。

2.注意flink版本与驱动包版本相对应,高版本驱动跟flink版本不兼容导致任务启动失败。

3.postgres数据库MysqL数据库都是用docker搭建的,搭建postgres数据库参考:docker部署postgres数据库_今朝花落悲颜色的博客-CSDN博客

搭建好了一定要修改postgres.conf配置文件,在挂载目录/docker/postgresql/data/下面找到postgres.conf,修改wal_level=logical,然后重启postgres。

 MysqL表结构sql

CREATE TABLE `sync_test_1` (
  `total_gmv` bigint(20) DEFAULT NULL,
  `day_time` varchar(255) COLLATE utf8mb4_bin NOT NULL,
  PRIMARY KEY (`day_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;

postgres表结构

CREATE TABLE "public"."ball" (
  "total_gmv" int8,
  "day_time" varchar(32) COLLATE "pg_catalog"."default"
);

ALTER TABLE "public"."ball" ADD CONSTRAINT "ball_pkey" PRIMARY KEY ("day_time");

flinksql语句

--源表

CREATE TABLE pgtest (

  day_time VARCHAR,

    total_gmv bigint,

    PRIMARY KEY (day_time) NOT ENFORCED  -- 如果要同步的数据库表定义了主键, 则这里也需要定义

) WITH (

  'connector' = 'postgres-cdc',  -- 必须为 'postgres-cdc'

  'hostname' = 'localhost',     -- 数据库的 IP

  'port' = '5432',     -- 数据库的访问端口

  'username' = 'postgres',           -- 数据库访问使用的用户名(需要提供 REPLICATION 权限, 日志级别必须大于等于 logical, 且设置后需要重启实例)

  'password' = '123456',    -- 数据库访问使用的密码

  'database-name' = 'postgres',  -- 需要同步的数据库

  'schema-name' = 'public',      -- 需要同步的数据库模式 (Schema)

  'table-name' = 'ball' ,      -- 需要同步的数据表名

  'decoding.plugin.name' = 'pgoutput',  -- pgoutput,必须

   'debezium.slot.name' = 'pgtestflink'  -- 指定slot名称,必须

);

--结果表

create table MysqLtest ( day_time VARCHAR,

    total_gmv bigint,

    PRIMARY KEY (day_time) NOT ENFORCED ) WITH (

    'connector' = 'jdbc',

    'url' = 'jdbc:MysqL://172.18.11.224:3306/flinktest?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',

    'table-name' = 'sync_test_1',

    'username' = 'root',

    'password' = '123456'

);

INSERT INTO MysqLtest  

SELECT day_time,total_gmv

FROM pgtest ;

这里要说明一下cdc和jdbc,cdc是实时捕获源表数据变化的,jdbc是sink表的,所以要读哪张表用cdc连接器,写哪张表用jdbc连接器。 

反过来MysqL到postgres

--源表

create table MysqLtest ( day_time VARCHAR,

    total_gmv bigint,

    PRIMARY KEY (day_time) NOT ENFORCED ) WITH (

       'connector' = 'MysqL-cdc',

       'hostname' = '172.18.11.224',

       'port' = '3306',

       'username' = 'root',

       'password' = '123456',

       'database-name' = 'flinktest',

       'table-name' = 'sync_test_1'

);

//结果表

create table pgtest ( day_time VARCHAR,

    total_gmv bigint,PRIMARY KEY (day_time) NOT ENFORCED) WITH (

    'connector' = 'jdbc',

    'url' = 'jdbc:postgresql://localhost:5432/postgres',

    'table-name' = 'public.ball',

    'username' = 'postgres',

    'password' = '123456'

);

INSERT INTO pgtest  

SELECT day_time,total_gmv

FROM MysqLtest ;

注意:必须指定主键PRIMARY KEY (day_time) NOT ENFORCED,与数据表中的主键要对应。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐