flink 数据湖_flink sql cdc

flink 数据湖_flink sql cdc每条 RowData 都有一个元数据 RowKind,包括 4 种类型, 分别是插入 、更新前镜像 、更新后镜像 、删除 ,这四种类型和数据库里

大家好,欢迎来到IT知识分享网。

一 CDC概述

1.1 什么是CDC

CDC是Change Data Capture(变更数据获取)的简称,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为CDC。目前通常描述的CDC技术主要是,检测并捕获数据库的变更(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息队列或者其他服务进行订阅和消费。它是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛:

  • 数据同步:用于备份,容灾;
  • 数据分发:一个数据源分发给多个下游系统;
  • 数据采集:面向数据仓库/数据湖的ETL数据集成,是非常重要的数据源。

1.2 CDC的种类

CDC的技术方案非常多,目前业界主流的实现机制可以分为两种:

1.2.1 基于查询的CDC

  • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;
  • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
  • 不保障实时性,基于离线调度存在天然的延迟。

1.2.2 基于日志的 CDC

  • 实时消费日志,流处理,例如MySQL的binlog日志完整记录了数据库中的变更,可以把binlog文件当作流的数据源;
  • 保障数据一致性,因为binlog文件包含了所有历史变更明细;
  • 保障实时性,因为类似binlog的日志文件是可以流式消费的,提供的是实时数据。

1.2.3 两种CDC技术对比

flink 数据湖_flink sql cdc

经过以上对比,我们可以发现基于日志CDC有以下这几种优势:

  • 能够捕获所有数据的变化,捕获完整的变更记录。在异地容灾,数据备份等场景中得到广泛应用,如果是基于查询的CDC有可能导致两次查询的中间一部分数据丢失
  • 每次DML操作均有记录无需像查询CDC这样发起全表扫描进行过滤,拥有更高的效率和性能,具有低延迟,不增加数据库负载的优势
  • 无需入侵业务,业务解耦,无需更改业务模型
  • 捕获删除事件和捕获旧记录的状态,在查询CDC中,周期的查询无法感知中间数据是否删除。

1.3 FlinkCDC

Flink社区开发了Flink CDC Connectors组件,这是一个可以从不同的数据库(MySQL、PostgreSQL、Oracle、MongoDB)读取全量数据增量变更数据的source组件。Flink CDC Connectors组件内部集成了Debezium作为记录数据变更的引擎。

目前已经开源:https://github.com/ververica/flink-cdc-connectors

flink 数据湖_flink sql cdc

Flink CDC Connectors可以用来替换Debezium+Kafka的数据采集模块,从而实现Flink SQL采集+计算+传输(ETL)一体化,这样做的优点有以下:

  • 开箱即用,简单易上手
  • 减少维护的组件,简化实时链路,减轻部署成本
  • 减小端到端延迟
  • Flink自身支持Exactly Once的读取和计算
  • 数据不落地,减少存储成本
  • 支持全量和增量流式读取

1.4 CDC方案对比

flink 数据湖_flink sql cdc

1.4.1 增量同步能力

  • 基于日志的方式,可以很好地做到增量同步;
  • 而基于查询的方式是很难做到增量同步的。

1.4.2 全量同步能力

除了Canal,基于查询或者日志的CDC方案基本都支持全量同步数据。

1.4.3 全量 + 增量同步能力

只有Flink CDC、Debezium、Oracle Goldengate支持较好。

1.4.4 架构

该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。

1.4.5 数据清洗方面

在数据转换/数据清洗能力上,当数据进入到CDC工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合?

  • 在Flink CDC上操作相当简单,可以通过Flink SQL去操作这些数据;
  • 但是像DataX、Debezium等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。

1.4.6 在生态方面的能力

这里指的是下游的一些数据库或者数据源的支持。Flink CDC下游有丰富的Connector,例如写入到TiDB、MySQL、Pg、HBase、Kafka、ClickHouse等常见的一些系统,也支持各种自定义connector。

二 FlinkCDC写入Iceberg表案例

在OLTP系统中,为了解决单表数据量大的问题,通常采用分库分表的方式将单个大表进行拆分以提高系统的吞吐量。

为了方便数据分析,通常需要将分库分表拆分出的表在同步到数据仓库、数据湖时,再合并成一个大表。下面将分别讲解如何使用Flink CDC构建实时数据湖来应对这种场景,采用FlinkSQL和流式处理两种方式。

flink 数据湖_flink sql cdc

2.0 准备阶段

2.0.1 配置MySQL的Binlog日志

[bigdata@bigdata185 ~]$ sudo vi /etc/my.cnf# 开启binloglog-bin=mysql-bin# 选择ROW模式binlog-format=ROW# 配置MySQL的Replicationserver_id=1# 配置生命周期expire-logs-days=7# 配置每个日志文件的大小max-binlog-size=500M# 开启flinkcdc这个Database的flinkcdc功能binlog-do-db=flinkcdc

IT知识分享网

查看binlog是否开启成功

IT知识分享网[bigdata@bigdata185 ~]$ cd /var/lib/mysql
[bigdata@bigdata185 mysql]$ sudo ls -l
flink 数据湖_flink sql cdc

2.0.2 准备数据

进入MySQL数据库

[bigdata@bigdata185 flink-1.13.5]$ mysql -uroot -proot

创建数据库和表,并填充数据

IT知识分享网mysql> CREATE DATABASE flinkcdc;
mysql> use flinkcdc;

CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

2.1 DataStream方式的应用

2.1.1 导入依赖

代码端Flink CDC使用1.13.5或者1.12.5 版本皆可,但pom配置某些包需降成1.11.4,不然会报缺包等错误。本次操作为使用Flink CDC(flink-connector-mysql-cdc 2.0.0 jar)与Flink1.13.5结合,实时监控mysqlbinlog日志(需提前开启binlog日志功能,此处可自行百度,修改mysql配置文件即可),入库iceberg。此代码很多版本问题,版本不一致会出现各种错误。

flink 数据湖_flink sql cdc

    <!-- 通过参数配置版本信息 -->    <properties>        <flink.version>1.13.5</flink.version>        <hadoop.version>3.2.2</hadoop.version>        <iceberg.version>0.11.1</iceberg.version>    </properties>    <dependencies>        <!-- 引入Flink相关依赖 -->        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-core</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-java</artifactId>            <version>${flink.version}</version>            <exclusions>                <exclusion>                    <groupId>log4j</groupId>                    <artifactId>*</artifactId>                </exclusion>                <exclusion>                    <groupId>org.slf4j</groupId>                    <artifactId>slf4j-log4j12</artifactId>                </exclusion>            </exclusions>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-streaming-java_2.12</artifactId>            <version>${flink.version}</version>            <exclusions>                <exclusion>                    <groupId>log4j</groupId>                    <artifactId>*</artifactId>                </exclusion>                <exclusion>                    <groupId>org.slf4j</groupId>                    <artifactId>slf4j-log4j12</artifactId>                </exclusion>            </exclusions>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-table-common</artifactId>            <version>1.11.4</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-clients_2.12</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-table-api-java-bridge_2.12</artifactId>            <version>1.11.4</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-table-api-java</artifactId>            <version>1.11.4</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-table-planner_2.12</artifactId>            <version>1.11.4</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-table-planner-blink_2.12</artifactId>            <version>1.11.4</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-json</artifactId>            <version>${flink.version}</version>        </dependency>        <!-- 引入Iceberg相关依赖 -->        <dependency>            <groupId>org.apache.iceberg</groupId>            <artifactId>iceberg-core</artifactId>            <version>${iceberg.version}</version>        </dependency>        <dependency>            <groupId>org.apache.iceberg</groupId>            <artifactId>iceberg-flink</artifactId>            <version>${iceberg.version}</version>        </dependency>        <dependency>            <groupId>org.apache.iceberg</groupId>            <artifactId>iceberg-flink-runtime</artifactId>            <version>${iceberg.version}</version>        </dependency>        <dependency>            <groupId>mysql</groupId>            <artifactId>mysql-connector-java</artifactId>            <version>5.1.49</version>        </dependency>        <!-- 引入hadoop客户端相关依赖 -->        <dependency>            <groupId>org.apache.hadoop</groupId>            <artifactId>hadoop-client</artifactId>            <version>${hadoop.version}</version>        </dependency>        <!-- 引入日志相关依赖 -->        <dependency>            <groupId>org.slf4j</groupId>            <artifactId>slf4j-log4j12</artifactId>            <version>1.7.30</version>        </dependency>        <!-- 添加fastjson依赖 -->        <dependency>            <groupId>com.alibaba</groupId>            <artifactId>fastjson</artifactId>            <version>1.2.76</version>        </dependency>        <dependency>            <groupId>com.ververica</groupId>            <artifactId>flink-connector-mysql-cdc</artifactId>            <version>2.0.0</version>        </dependency>    </dependencies>    <build>        <plugins>            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-assembly-plugin</artifactId>                <version>3.0.0</version>                <configuration>                    <descriptorRefs>                        <descriptorRef>jar-with-dependencies</descriptorRef>                    </descriptorRefs>                </configuration>                <executions>                    <execution>                        <id>make-assembly</id>                        <phase>package</phase>                        <goals>                            <goal>single</goal>                        </goals>                    </execution>                </executions>            </plugin>            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-compiler-plugin</artifactId>                <configuration>                    <source>1.8</source>                    <target>1.8</target>                </configuration>            </plugin>        </plugins>    </build>

2.1.2 创建Iceberg表

[bigdata@bigdata185 flink-1.11.6]$ bin/sql-client.sh embedded \    -j /opt/module/jars/iceberg/iceberg-flink-runtime-0.11.1.jar \    shellFlink SQL> use catalog hadoop_catalog;Flink SQL> use iceberg_db;Flink SQL> CREATE TABLE products_sink (    id INT,    name STRING,    description STRING  );

2.1.3 DataStream代码

package com.yunclass.iceberg.flinkcdc;import com.ververica.cdc.connectors.mysql.MySqlSource;import com.ververica.cdc.connectors.mysql.table.StartupOptions;import com.ververica.cdc.debezium.DebeziumDeserializationSchema;import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.TableColumn;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;import org.apache.flink.table.types.DataType;import org.apache.flink.table.types.inference.TypeTransformations;import org.apache.flink.table.types.logical.RowType;import org.apache.flink.table.types.utils.DataTypeUtils;import org.apache.hadoop.conf.Configuration;import org.apache.iceberg.*;import org.apache.iceberg.catalog.Catalog;import org.apache.iceberg.catalog.Namespace;import org.apache.iceberg.catalog.TableIdentifier;import org.apache.iceberg.flink.CatalogLoader;import org.apache.iceberg.flink.TableLoader;import org.apache.iceberg.flink.sink.FlinkSink;import org.apache.iceberg.types.Types;import java.time.ZoneId;import java.util.Arrays;import java.util.HashMap;import java.util.Map;public class RowData2Iceberg {    private static final String HADOOP_CATALOG ="hadoop_catalog";    // 定义iceberg schema    private static final Schema SCHEMA = new Schema(            Types.NestedField.optional(0, "id", Types.IntegerType.get()),            Types.NestedField.optional(1, "name",Types.StringType.get()),            Types.NestedField.optional(2, "description",Types.StringType.get()));    public static void main(String[] args) throws Exception {        // 获取Flink执行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        System.setProperty("HADOOP_USER_NAME", "bigdata");        env.setParallelism(1);        env.enableCheckpointing(3000);        // 定义mysql的监控字段        TableSchema tableSchema = TableSchema.builder()                .add(TableColumn.of("id", DataTypes.INT()))                .add(TableColumn.of("name", DataTypes.STRING()))                .add(TableColumn.of("description", DataTypes.STRING()))                .build();        // 转换为RowType格式        RowType rowType = (RowType)tableSchema.toRowDataType().getLogicalType();        DebeziumDeserializationSchema deserialer =                new RowDataDebeziumDeserializeSchema(                        rowType,                        createTypeInfo(tableSchema.toRowDataType()),                        (rowData, rowKind) -> {}, ZoneId.of("Asia/Shanghai"));        // 定义MySQL数据源        SourceFunction sourceFunction = MySqlSource.<String>builder()                .hostname("bigdata185")                .serverTimeZone("UTC")                .port(3306)                .databaseList("flinkcdc")                .tableList("flinkcdc.products")                .username("root")                .password("root")                .deserializer(deserialer)                .build();        // 加载数据源        DataStreamSource dataStreamSource = env.addSource(sourceFunction);        dataStreamSource.print();        sink2IcebergHadoop(dataStreamSource);        env.setParallelism(1);        env.execute();    }    public static TypeInformation createTypeInfo(DataType producedDataType) {        final DataType internalDataType = DataTypeUtils.transform(producedDataType, TypeTransformations.TO_INTERNAL_CLASS);        return (TypeInformation)TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(internalDataType);    }    private static void sink2IcebergHadoop(DataStream input) {        Map properties = new HashMap<>();        properties.put("type", "iceberg");        properties.put("catalog-type", "hadoop");        properties.put("property-version", "2");        properties.put("warehouse", "hdfs://bigdata185:8020/flink/warehouse");        CatalogLoader catalogLoader = CatalogLoader.hadoop(HADOOP_CATALOG, new Configuration(), properties);        icebergSink(input, catalogLoader);    }    private static void icebergSink(DataStream input, CatalogLoader loader) {        Catalog catalog = loader.loadCatalog();        TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("iceberg_db"), "products_sink");        Table table;        if (catalog.tableExists(tableIdentifier)) {            table = catalog.loadTable(tableIdentifier);        } else {            table = catalog.buildTable(tableIdentifier, SCHEMA)                    .withPartitionSpec(PartitionSpec.unpartitioned())                    .create();        }        /**         * 社区版本现在v1删除功能有问题,需要用v2展开测试,把java API把v1升级到v2         */        TableOperations operations = ((BaseTable) table).operations();        TableMetadata metadata = operations.current();        operations.commit(metadata, metadata.upgradeToFormatVersion(2));        TableLoader tableLoader = TableLoader.fromCatalog(loader, tableIdentifier);        // 写入Iceberg表        FlinkSink.forRowData(input)                .table(table)                .tableLoader(tableLoader)                .equalityFieldColumns(Arrays.asList("id"))                .writeParallelism(1)                .build();    }}

2.1.4 本地模式测试

2.1.4.1 初始化启动

1、在Idea环境中,直接右键执行代码。

flink 数据湖_flink sql cdc

2、在Iceberg中查表验证数据

Flink SQL> select * from products_sink;
flink 数据湖_flink sql cdc

2.1.4.2 流式测试Iceberg

1、新增数据

在MySQL的flinkcdc.products中新增一条数据。

INSERT INTO products
VALUES (default,"tigger","beijing zoo");

控制台自动打印

flink 数据湖_flink sql cdc

在Iceberg中验证数据是否插入成功。

Flink SQL> select * from products_sink;
flink 数据湖_flink sql cdc

2、更新数据

mysql> update products set name = 'tom' where id = 110;

控制台自动打印

flink 数据湖_flink sql cdc

在Iceberg中验证数据是否插入成功。

Flink SQL> select * from products_sink;
flink 数据湖_flink sql cdc

3、删除数据

mysql> delete from products where id = 110;

控制台自动打印

flink 数据湖_flink sql cdc

在Iceberg中验证数据是否插入成功。

Flink SQL> select * from products_sink;
flink 数据湖_flink sql cdc

2.1.5 集群模式测试

2.1.5.1 Maven打包

flink 数据湖_flink sql cdc

flink 数据湖_flink sql cdc

2.1.5.2 上传jar包

[bigdata@bigdata185 sources]$ lsflink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar

2.1.5.3 运行Flink程序

[bigdata@bigdata185 flink-1.13.5]$ bin/flink run -m bigdata185:8081 -c com.yunclass.iceberg.flinkcdc.RowData2Iceberg /opt/sources/flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar 

2.1.5.4 流式测试Iceberg

1、新增数据

在MySQL的flinkcdc.products中新增一条数据。

INSERT INTO products
VALUES (default,"tigger","beijing zoo");

控制台自动打印

flink 数据湖_flink sql cdc

2、更新数据

mysql> update products set name = 'tom' where id = 111;

控制台自动打印

flink 数据湖_flink sql cdc

3、删除数据

mysql> delete from products where id = 111;

控制台自动打印

flink 数据湖_flink sql cdc

2.2 FlinkSQL方式的应用

在使用了Flink CDC之后,除了组件更少,维护更方便外,另一个优势是通过Flink SQL极大地降低了用户使用门槛。

接下来将以数据从MySQL同步到Iceberg为例展示整个流程,架构图如下所示:

flink 数据湖_flink sql cdc

2.2.1 配置FlinkCDC

目前社区版本支持FlinkCDC2.0+Flink1.13.5+Iceberg0.13.0配置FlinkSQL级别的CDC操作。为了简化整个教程,本教程需要的jar包都已经被打包进SQL-Client容器中,在自己的 Flink 环境运行本教程,需要下载下面列出的包并且把它们放在Flink所在目录的lib目录下,即 FLINK_HOME/lib/

flink-sql-connector-mysql-cdc-2.1.0.jar

iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar

[bigdata@bigdata185 software]$ cp iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar /opt/module/flink-1.13.5/lib/
[bigdata@bigdata185 software]$ cp flink-sql-connector-mysql-cdc-2.1.0.jar /opt/module/flink-1.13.5/lib/

从Flink1.11.x开始,不再提供flink-shaded-hadoop-2-uber的支持,如果需要flink支持hadoop的环境变量HADOOP_CLASSPATH

[bigdata@bigdata185 flink-1.13.5]$ vi bin/config.sh
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

2.2.2 在 Flink SQL CLI 中使用 Flink DDL 创建表

2.2.2.1 启动Flink集群

启动Flink集群的standalonesession模式

[bigdata@bigdata185 flink-1.13.5]$ bin/start-cluster.sh 

登录http://bigdata185:8081/,确认Flink集群是否启动成功。

flink 数据湖_flink sql cdc

首先,使用如下的命令进入Flink SQL CLI中:

[bigdata@bigdata185 flink-1.13.5]$ bin/sql-client.sh

我们可以看到如下界面:

flink 数据湖_flink sql cdc

2.2.2.2 开启CheckPoint

Checkpoint默认是不开启的,我们需要开启Checkpoint来让Iceberg可以提交事务。并且,mysql-cdc在binlog读取阶段开始前,需要等待一个完整的checkpoint来避免binlog记录乱序的情况。

-- Flink SQL
-- 每隔 3 秒做一次 checkpoint  
Flink SQL> SET execution.checkpointing.interval = 3s;

2.1.2.3 创建 MySQL 分库分表 source 表

创建source表user_source来捕获MySQL中对应表的数据,在表的配置项 database-name , table-name 使用正则表达式来匹配这些表。

-- Flink SQL
Flink SQL> CREATE TABLE orders_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = 'bigdata185',
   'port' = '3306',
   'username' = 'root',
   'password' = 'root',
   'database-name' = 'flinkcdc',
   'table-name' = 'orders'
 );

2.2.2.4 创建 Iceberg sink 表

创建Iceberg的sink表,用来将数据加载至Iceberg中。

Flink SQL> CREATE TABLE orders_sink (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
    'connector'='iceberg',
    'catalog-name'='hadoop_catalog',
    'catalog-type'='hadoop',  
    'warehouse'='hdfs://bigdata185:8020/flink/warehouse',
    'format-version'='2'
  );

2.1.3 流式写入 Iceberg

2.1.3.1 写数据

在session1中使用下面的Flink SQL语句将数据从MySQL写入Iceberg中:

Flink SQL> INSERT INTO orders_sink SELECT * FROM orders_source;

上述命令将会启动一个流式作业,源源不断将MySQL数据库中的全量和增量数据同步到Iceberg中。
在Flink UI上可以看到这个运行的作业:

flink 数据湖_flink sql cdc

然后我们就可以使用如下的看到Iceberg中的写入的文件:

flink 数据湖_flink sql cdc

在你的运行环境中,实际的文件可能与上面的截图不相同,但是整体的目录结构应该相似。

2.1.3.2 查询Iceberg表数据

使用下面的Flink SQL语句查询表products_sink中的数据:

Flink SQL> select * from orders_sink;
flink 数据湖_flink sql cdc

下面每执行一步,我们就可以在 Flink Client CLI 中使用 SELECT * FROM orders_sink 查询表 orders_sink 来看到数据的变化。

2.1.3.3 新增一条数据

flinkcdc.orders 表中插入新的一行

--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
flink 数据湖_flink sql cdc

2.1.3.4 更新一条数据

--MySQLUPDATE orders SET customer_name='Tom' WHERE order_id = 10004;
flink 数据湖_flink sql cdc

2.1.3.5 删除一条数据

--MySQLDELETE FROM orders WHERE order_id = 10004;
flink 数据湖_flink sql cdc

我们展示了如何使用 Flink CDC 同步 MySQL 分库分表的数据,快速构建 Icberg 实时数据湖。用户也可以同步其他数据库(Postgres/Oracle)的数据到Iceberg数据湖中。

三 FlinkCDC实时读写数据分析

3.1 Flink 流概念(翻译)

原文连接:Streaming Concepts

3.1.1 数据流上的关系查询

传统的数据库SQL在设计时并未考虑流数据。但是结果,传统的数据库SQL处理与流处理之间在概念上几乎没有差别。传统的数据库SQL和实时SQL概念没差别,但是处理的差别还是很大的,下面列出一些区别:

flink 数据湖_flink sql cdc

尽管存在这些差异,使用关系查询和SQL来处理流并不是不可能的。高级关系数据库系统提供称为”物化视图”的功能。物化视图定义为SQL查询,就像常规虚拟视图一样。

与虚拟视图相比,物化视图缓存查询的结果,使得在访问视图时不需要执行查询。缓存的一个常见挑战是避免缓存提供过时的结果。物化视图在修改其定义查询的基表时会过时。Eager View Maintenance是一种在更新基表后立即更新实例化视图的技术。

如果我们考虑以下内容,Eager View Maintenance和流上的SQL查询之间的联系就变得很明显:

  • 数据库表是INSERT,UPDATE和DELETEDML语句流的结果,通常被称为更新日志流。
  • 物化视图定义为SQL查询。为了更新视图,查询需要持续处理视图源表的更改日志流。
  • 物化视图是流式SQL查询的结果。

考虑到这些要点,我们将在下面回顾一下动态表的概念。

3.1.2 动态表和连续查询

动态表是Flink的Table API和SQL对流数据的支持的核心概念。与静态表相比,动态表会随时间而变化,但可以像静态表一样查询动态表,只不过查询动态表需要产生连续查询。连续查询永远不会终止,会生成动态表作为结果表。查询不断更新其(动态)结果表以反映其(动态)输入表的更改。最终,动态表上的连续查询与定义物化视图的查询非常相似。

下图显示了流,动态表和连续查询之间的关系:

flink 数据湖_flink sql cdc

  1. 数据流被转化为动态表
  2. 在产生的动态表上执行连续不断的查询,产生一个动态结果表。
  1. 结果动态表再次被转化为数据流。

值得注意的是,动态表的查询结果始终在语义上等同于在输入表的快照上执行批处理的到的相同查询结果。

接下来我们以点击事件流来解释动态表和连续不断的查询。

CREATE TABLE clicks (
  user  VARCHAR,     -- the name of the user
  url   VARCHAR,     -- the URL that was accessed by the user
  cTime TIMESTAMP(3) -- the time when the URL was accessed
) WITH (...);

3.1.3 在流上定义Table

为了使用传统的关系查询处理流,必须将其转换为Table。从概念上讲,流的每个新增记录都被解释为对结果表的Insert操作。最终,可以理解为是在从一个INSERT-only changelog流上构建一个表。

下图可视化了click事件流(左侧)如何转换为表格(右侧)。随着插入更多点击流记录,结果表将持续增长。

flink 数据湖_flink sql cdc

注意:stream流上转化的表内部并没有被物化。

3.1.3.1 连续查询(Continuous Queries)

在动态表上执行连续查询,并生成一个新的动态表作为结果。与批处理查询相反,连续查询永远不会终止并根据其输入表的更新来更新其结果表。在任何时间点,连续查询的结果在语义上都等同于在输入表的快照上以批处理模式执行同一查询的结果。

在下面的内容中,我们显示了对单击事件流上定义的clicks表的两个示例查询。

聚合查询

第一个查询是一个简单的GROUP-BY COUNT聚合查询。主要是对clicks表按照user分组,然后统计url得到访问次数。下图展示了clicks表在数据增加期间查询是如何执行的。

flink 数据湖_flink sql cdc

启动查询后,点击表(左侧)为空。当第一行插入到clicks表中时,查询开始计算结果表。插入第一行[Mary,./home]后,结果表(右侧,顶部)由单行[Mary,1]组成。当第二行[Bob,./cart]插入clicks表时,查询将更新结果表并插入新行[Bob,1]。第三行[Mary,./prod?id=1]产生已计算结果行的更新,从而将[Mary,1]更新为[Mary,2]。最后,当第四行添加到clicks表时,查询将第三行[Liz,1]插入结果表。

滚动查询

第二个查询与第一个查询类似,但是在对URL数量进行计数之前,除了将clicks表的user分组之外增加了一个1小时的滚动窗口(基于时间的计算(例如,窗口基于特殊的时间属性))。同样,该图显示了在不同时间点的输入和输出,以可视化动态表的变化性质。

flink 数据湖_flink sql cdc

和上面一样,输入表的点击次数显示在左侧。该查询每小时连续计算结果并更新结果表。clicks表包含四行,其时间戳记(cTime)在12:00:00和12:59:59之间。该查询从该输入计算两个结果行(每个用户一个),并将它们附加到结果表中。对于13:00:00和13:59:59之间的下一个窗口,clicks表包含三行,这将导致另外两行附加到结果表中。结果表会更新,因为随着时间的推移会将更多行添加到点击次数中。

3.1.3.2 Update 和 append 查询

尽管两个示例查询看起来非常相似(都计算了分组计数聚合),但是内部逻辑还是区别较大:

  • 第一个查询更新以前发出的结果,即结果表的更改日志流包含INSERT和UPDATE更改。
  • 第二个查询仅append到结果表,即结果表的更改日志流仅包含INSERT更改。

查询是生成仅append表还是update表有一些区别:

  • 产生update变化的查询通常必须维护更多状态。
  • 将仅append表转换为流与将update表的转换为流,方式不同。

3.1.4 查询限制

可以将许多但不是全部的语义有效查询评估为流中的连续查询。某些查询由于需要维护的状态大小或计算更新过于昂贵而无法计算。

3.1.4.1 状态大小

连续查询在无界流上执行,通常应该运行数周或数月,甚至7*24小时。因此,连续查询处理的数据总量可能非常大。为了更新先前生成的结果,可能需要维护所有输出的行。例如,第一个示例查询需要存储每个用户的URL计数,以便能够增加计数,并在输入表收到新行时发出新结果。如果仅统计注册用户,则要维护的计数可能不会太高。但是,如果未注册的用户分配了唯一的用户名,则要维护的计数数将随着时间的推移而增长,最终可能导致查询失败。

SELECT user, COUNT(url)
FROM clicks
GROUP BY user;

3.1.4.2 计算更新

有时即使只添加或更新了单个输入记录,某些查询也需要重新计算和更新大部分发出的结果行。显然,这样的查询不适合作为连续查询执行。以下查询是一个示例,该查询根据最终点击时间为每个用户计算RANK。一旦clicks表收到新行,用户的lastAction就会更新,并且必须计算新的排名。但是,由于两行不能具有相同的排名,因此所有排名较低的行也需要更新。

SELECT user, RANK() OVER (ORDER BY lastAction)
FROM (
  SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);

3.1.5 Table转化为流

可以像常规数据库表一样通过INSERT,UPDATE和DELETE更改来连续修改动态表。它可能是一个具有一行且不断更新的表,一个没有UPDATE和DELETE修改的仅插入表,或介于两者之间的任何表。

将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink的Table API和SQL支持三种方式来编码动态表的更改:

  • Append-only stream:假如动态表的更改操作仅仅是insert ,那么变为stream就仅仅需要将插入的行发送出去即可。
  • Retract stream: 撤回流是具有两种消息类型的流,即添加消息和撤消消息。通过将INSERT更改编码为添加消息,DELETE更改编码为撤消消息,将UPDATE编码为对先前行的回撤消息和对新增行的增加消息,来完成将动态表转换为撤回流。下图可视化了动态表到撤回流的转换:
flink 数据湖_flink sql cdc

  • Upsert流: Upsert流是具有两种消息类型的流,Upsert消息和Delete消息。转换为upsert流的动态表需要一个(可能是复合的)唯一键。通过将INSERT和UPDATE更改编码为upsert消息并将DELETE更改编码为delete消息,将具有唯一键的动态表转换为流。流算符需要知道唯一键属性才能正确处理消息。与撤回流的主要区别在于UPDATE更改使用单个消息进行编码,因此效率更高。下图显示了动态表到upsert流的转换。
flink 数据湖_flink sql cdc

3.2 基于Flink CDC的ETL分析

MySQL数据库的一张表所有的变更都记录在binlog日志中,如果一直对表进行更新,binlog日志流也一直会追加,数据库中的表就相当于binlog日志流在某个时刻点物化的结果;日志流就是将表的变更数据持续捕获的结果。这说明Flink SQL的Dynamic Table是可以非常自然地表示一张不断变化的MySQL数据库表。

而Debezium作为Flink CDC的底层采集工具。Debezium支持全量同步,也支持增量同步,也支持全量 + 增量的同步,非常灵活,同时基于日志的CDC技术使得提供Exactly-Once成为可能。

flink 数据湖_flink sql cdc

将Flink SQL的内部数据结构RowData和Debezium的数据结构进行对比,可以发现两者是非常相似的。

  • 每条 RowData 都有一个元数据 RowKind,包括 4 种类型, 分别是插入 (INSERT)、更新前镜像 (UPDATE_BEFORE)、更新后镜像 (UPDATE_AFTER)、删除 (DELETE),这四种类型和数据库里面的 binlog 概念保持一致。
  • 而 Debezium 的数据结构,也有一个类似的元数据 op 字段, op 字段的取值也有四种,分别是 c、u、d、r,各自对应 create、update、delete、read。对于代表更新操作的 u,其数据部分同时包含了前镜像 (before) 和后镜像 (after)。

在使用了 Flink CDC 之后,除了组件更少,维护更方便外,另一个优势是通过 Flink SQL 极大地降低了用户使用门槛,可以看下面的例子:

flink 数据湖_flink sql cdc

可以通过Flink CDC去同步数据库数据并写入到TiDB、Iceberg、Doris等数据库中,用户直接使用Flink SQL创建对应的MySQL-CDC表,然后对数据流进行JOIN加工,加工后直接写入到下游数据库。通过一个Flink SQL作业就完成了CDC的数据分析,加工和同步。

大家会发现这是一个纯 SQL 作业,这意味着只要会 SQL 的 BI,业务线同学都可以完成此类工作。与此同时,用户也可以利用 Flink SQL 提供的丰富语法进行数据清洗、分析、聚合。

四 总结

  1. CDC概述
  2. FlinkCDC写入Iceberg表案例
  3. FlinkCDC实时写数据分析

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/6291.html

(0)
上一篇 2022-12-14 22:56
下一篇 2022-12-14 23:00

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信