微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

FlinkCDC解读

FlinkCDC

因为一直没有一个详尽的解读我就自己做一篇

·https://github.com/apache/flink·

官网

·https://github.com/ververica/flink-cdc-connectors·

官方论坛

·Flink sql CDC 实践以及一致性分析·2021-03-10 ·https://mp.weixin.qq.com/s/tE70jJO6pZTe6oB0fKcZkQ·

·Flink 如何实时分析 Iceberg 数据湖的 CDC 数据·2021-02-23 ·https://mp.weixin.qq.com/s/18ZA_DuAyvafl3k9lhJnVA·

·基于 Flink sql CDC 的实时数据同步方案·2020-11-02 ·https://mp.weixin.qq.com/s/QNJlacBUlkMT7ksKKSNa5Q·

·Flink sql CDC 上线!我们总结了 13 条生产实践经验·2020-09-04 ·https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ·

·Flink 源码 | 自定义 Format 消费 Maxwell CDC 数据·2020-08-28 ·https://mp.weixin.qq.com/s/HaSi4E1Ez4jV06RWAQ2wAQ·

WIKI

·https://github.com/ververica/flink-cdc-connectors/wiki/Downloads·

·https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B·

简介
Flink CDC Connector 是ApacheFlink的一组数据源连接器,使用变化数据捕获change data capture (CDC))从不同的数据库提取变更数据。Flink CDC连接器将Debezium集成为引擎来捕获数据变更。因此,它可以充分利用Debezium的功能

特点
1 支持读取数据库快照,并且能够持续读取数据库的变更日志,即使发生故障,也支持exactly-once 的处理语义

2 对于DataStream API的CDC connector,用户无需部署Debezium和Kafka,即可在单个作业中使用多个数据库和表上的变更数据。

3 对于Table/sql API 的CDC connector,用户可以使用sql DDL创建CDC数据源,来监视单个表上的数据变更。

 

MysqL CDC

Postgres CDC

DatabaseVersion
MysqLDatabase: 5.7, 8.0.x
JDBC Driver: 8.0.16
PostgresqlDatabase: 9.6, 10, 11, 12
JDBC Driver: 42.2.12

Kafka的Connector

          canal-json

    debezium-json

    changelog-json(这个本身带墓碑)的format......

具体见·https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/overview/·

MysqL CDC

·https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector·

表/ sql API的用法

  1. 设置安装了1.11+版本和Java 8+版本的Flink集群。
  2. 下载页面下载连接器sql jar (或自行构建)
  3. 将下载的jar放在下方FLINK_HOME/lib/
  4. 重新启动Flink群集。
-- 创建一个MysqL cdc表源
CREATE TABLE MysqL_binlog (
 id INT NOT NULL,
 name STRING,
 description STRING,
 weight DECIMAL(10,3)
) WITH (
 'connector' = 'MysqL-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'flinkuser',
 'password' = 'flinkpw',
 'database-name' = 'inventory',
 'table-name' = 'products'
);

-- 从MysqL读取快照和binlog数据,并进行一些转换,并在客户端上显示
SELECT id, UPPER(name), description, weight FROM MysqL_binlog;

DataStream API的用法

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <!-- add the dependency matching your database -->
  <artifactId>flink-connector-MysqL-cdc</artifactId>
  <version>1.1.0</version>
</dependency>

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.sourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.connectors.MysqL.MysqLSource;

public class MysqLbinlogSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = MysqLSource.<String>builder()
      .hostname("localhost")
      .port(3306)
      .databaseList("inventory") // 监视下库存数据库中的所有表
      .username("flinkuser")
      .password("flinkpw")
      .deserializer(new StringDebeziumDeserializationSchema()) // 将SourceRecord转换为String
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
      .addSource(sourceFunction)
      .print().setParallelism(1); //对接收器使用并行度1以保持消息顺序

    env.execute();
  }
}

具体配置见·https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#startup-reading-position·

Postgres CDC

具体配置见·https://github.com/ververica/flink-cdc-connectors/wiki/Postgres-CDC-Connector·

KAFKA  Connectors

sql解析使用Changelog JSON格式

·https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/json/·

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.13.0</version>
</dependency>

下载flink-format-changelog-json-1.1.3.jar并将其放在下<FLINK_HOME>/lib/

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',-- 使用kafka连接器
 'topic' = 'user_behavior',-- kafka topic
 'properties.bootstrap.servers' = 'localhost:9092',-- kafka代理地址
 'properties.group.id' = 'testGroup',-- 消费者足
 'format' = 'json',-- 数据格式为json
 'json.fail-on-missing-field' = 'false',-- 当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(认为 false,即抛出错误失败)
 'json.ignore-parse-errors' = 'true'-- 当解析异常时,是跳过当前字段或行,还是抛出错误失败(认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
  ......
)

Format 参数 

参数是否必须认值类型描述

format

必选(none)String声明使用的格式,这里应为'json'

json.fail-on-missing-field

可选falseBoolean当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(认为 false,即抛出错误失败)。

json.ignore-parse-errors

可选falseBoolean当解析异常时,是跳过当前字段或行,还是抛出错误失败(认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null

json.timestamp-format.standard

可选'sql'String声明输入和输出的 TIMESTAMP 和 TIMESTAMP_LTZ 的格式。当前支持的格式为'sql' 以及 'ISO-8601'
  • 可选参数 'sql' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析 TIMESTAMP, 例如 "2020-12-30 12:13:14.123", 以 "yyyy-MM-dd HH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP_LTZ, 例如 "2020-12-30 12:13:14.123Z" 且会以相同的格式输出
  • 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入 TIMESTAMP, 例如 "2020-12-30T12:13:14.123" , 以 "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP_LTZ, 例如 "2020-12-30T12:13:14.123Z" 且会以相同的格式输出

json.map-null-key.mode

选填'FAIL'String指定处理 Map 中 key 值为空的方法. 当前支持的值有 'FAIL''DROP' 和 'LIteraL':
  • Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。
  • Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。
  • Option 'LIteraL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'json.map-null-key.literal' 定义。

json.map-null-key.literal

选填'null'String当 'json.map-null-key.mode' 是 LIteraL 的时候,指定字符串常量替换 Map 中的空 key 值。

json.encode.decimal-as-plain-number

选填falseBoolean将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027

数据类型映射关系 

当前,JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。

在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。

下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。

Flink sql 类型JSON 类型
CHAR / VARCHAR / STRINGstring
BOOLEANboolean
BINARY / VARBINARYstring with encoding: base64
DECIMALnumber
tinyintnumber
SMALLINTnumber
INTnumber
BIGINTnumber
FLOATnumber
DOUBLEnumber
DATEstring with format: date
TIMEstring with format: time
TIMESTAMPstring with format: date-time
TIMESTAMP_WITH_LOCAL_TIME_ZONEstring with format: date-time (with UTC time zone)
INTERVALnumber
ARRAYarray
MAP / MULTISETobject
ROWobject
DEMO

-- 假设我们有一个user_behavior日志
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',  -- 使用kafka连接器
    'topic' = 'user_behavior',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',  -- 从开头
    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka代理地址
    'format' = 'json'  -- 数据格式为json
);

-- 我们要使用changelog-json格式将UV聚合结果存储在kafka中
create table day_uv (
    day_str STRING,
    uv BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'day_uv',
    'scan.startup.mode' = 'earliest-offset',  -- 从开头读取
    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka代理地址
    'format' = 'changelog-json'  -- 数据格式为json
);

-- 使用更改日志-JSON格式写的UV结果转换成kafka
INSERT INTO day_uv
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str, count(distinct user_id) as uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd');

-- 读回changelog
SELECT * FROM day_uv;

接下来博主会结合自己的使用经验分析sqlCDC和DataStreamAPICDC的使用场景 比较耗费精力 因为最近要上FLINK1.13和普罗米修斯及ZEPPLIN 博主也会将这三篇文章搞定

通常来讲博主自身的生产经验

通常 如果单表binlog那么就会使用Flinksql 方便

如果 多表(因为sql字段已经固定但是多表间字段可能不同)甚至整个库及复杂binlog就会使用DataStreamAPI 需要自己json格式解析取需要的字段

CDC DEMO 

博主要统计商品仓储数量  仓储字段有1(入仓)2(出仓)两种mark。

增删都好说。但是涉及到update,当且仅当非仓储字段update也会产生binlog

这时就要判断是否是仓储字段update,最好的办法就是从binlog before after判断是否是仓储字段update,如果是进行仓储数量计算,不是就不计算。

整个流程是 :启动流之前查询DB表统计仓储数量到state=>利用上面的DEMO对state进行实时+-。

 

 

 

 

 

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐