flink基础教程 豆瓣_大数据与我的文章

flink基础教程 豆瓣_大数据与我的文章一 Iceberg概述Iceberg的官方定义:Apache Iceberg is an open table format for huge

大家好,欢迎来到IT知识分享网。

一 Iceberg概述

Iceberg的官方定义:Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink and Hive using a high-performance table format that works just like a SQL table.

用通俗的语言解释一下就是,Iceberg是介于计算层(Spark、Trino、Presto、Flink和Hive)和存储层(ORC、Parquet)的中间“表格式”层。它与存储层最大的区别是,它并不定义数据的存储方式仅仅是定义数据和元数据的组织方式,向计算层提供统一的“”定义。它构建在数据存储格式之上,底层的数据存储仍然是ORC、Parquet等格式。通过Spark、Flink写入Iceberg,然后在通过Spark、Trino、Presto、Flink等来读取数据。

flink基础教程 豆瓣_大数据与我的文章

从Iceberg起到的作用可以看出,Iceberg并不绑定某一种特定的执行引擎,它实现了通用的数据组织格式。这种格式可以方便地与不同的计算引擎(Flink、Spark、Hive等)对接。

1.1 Iceberg的优势

1.1.1 优化数据入库流程

Iceberg本身提供ACID的事务支持,保证数据写入即可见;Iceberg提供了Upsert、Merge Into的能力,极大地缩小了数据入库延迟性。能够提供分钟级别的计算延迟。

1.1.2 支持多种数据分析引擎

支持Hive、Spark、Presto、Flink等多种数据分析引擎。

1.1.3 统一灵活的表格式

Iceberg提供了基于流式的增量计算模型和基于批处理的全量表计算模型。流处理和批处理任务可以使用相同的存储模型,使数据不再孤立存在。Iceberg支持隐藏分区和分区进化,方便业务进行数据分区策略更新。支持Parquet、Avro以及ORC等存储格式。

1.2 Iceberg的应用场景

1.2.1 构建实时的Data Pipeline

可以使用Flink+Iceberg的方式构建实时Data Pipeline。通过Flink消费Kafka的消息日志,直接将原始日志写入Iceberg,然后再通过Flink的ETL计算将中间结果写入下游的Iceberg表。Iceberg能够支持Flink每30秒或者1分钟的增量数据写入,这样可以提供端到端的数据及时性。

flink基础教程 豆瓣_大数据与我的文章

借助Flink实现数据的ExactlyOnce写入数据湖,由于Kafka的成本比较高,一般只保留7天左右的数据,Iceberg的存储成本相对比较低,可以保存全量的历史数据。针对特殊需求,可以采用Iceberg全量数据和Kafka的增量数据来驱动新的Flink作业。

1.2.2 实现CDC功能

Flink本身已经原生的支持CDC数据解析,可以用Flink CDC2.0技术编写几行代码,将解析Binlog日志写入Iceberg。CDC数据在写入Iceberg之后,可以使用Presto、Spark、Hive等执行引擎实时的读取并分析Iceberg中的CDC数据。

1.2.3 实现准实时数据仓库建设

Iceberg支持并发读、增量读、合并小文件,而且还能做到秒级/分钟级的数据延迟。基于Iceberg这些优势,采用Flink+Iceberg的方式构建了流批一体化的准实时数据仓库。

flink基础教程 豆瓣_大数据与我的文章

二 Flink+Iceberg案例

Apache Iceberg在0.12版本增加了Flink对行级数据删除的功能,除此之外,还支持使用Flink Streaming读取数据,通过Flink CDC/Upsert操作。同时还能通过write.distribution-mode=hash和Batch rewrite files action手段合并小文件。

flink基础教程 豆瓣_大数据与我的文章

2.1 环境准备

我们使用的环境是Flink1.13+Hadoop3.2+Hive3.1。

2.1.1 Flink操作Maven环境准备

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-core</artifactId>
            <version>0.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink</artifactId>
            <version>0.12.0</version>
        </dependency>
				<!-- 在Idea环境操作Hadoop集群文件 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime</artifactId>
            <version>0.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>

IT知识分享网

2.1.2 FlinkSQL客户端操作

如果通过FlinkSQL交互窗口操作,需要下载以下两个jar包。

2.1.2.1 iceberg-flink-runtime-0.12.0.jar

点击这个网址可以下载iceberg-flink-runtime-0.12.0.jar。

https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/0.12.0/iceberg-flink-runtime-0.12.0.jar

加载iceberg-flink-runtime-0.12.0.jar启动Flink SQL client,可以创建Hadoop catalog:

IT知识分享网[bigdata@bigdata185 flink-yarn]$ bin/sql-client.sh embedded \
-j /opt/software/iceberg/iceberg-flink-runtime-0.12.0.jar \
shell

① 创建Hadoop cataloglog

-- 创建Hadoop catlog
CREATE CATALOG hadoop_catalog WITH (   
    'type'='iceberg',   
    'catalog-type'='hadoop',   
    'warehouse'='hdfs://bigdata185:8020/warehouse/hadoop/iceberg',   
    'property-version'='1'
);

② 创建database

IT知识分享网-- 创建database
CREATE DATABASE if not exists iceberg_hadoop_db;

③ 创建表

-- 建表
CREATE TABLE hadoop_catalog.iceberg_hadoop_db.iceberg_dept(
    deptno BIGINT COMMENT 'unique id',
    dname STRING COMMENT '部门名称',
    loc STRING COMMENT '部门所在位置'
);
flink基础教程 豆瓣_大数据与我的文章

2.1.2.2 flink-sql-connector-hive-3.1.2_2.12-1.13.2.jar

点击这个网址可以下载flink-sql-connector-hive-3.1.2_2.12-1.13.2.jar。

https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.13.2/flink-sql-connector-hive-3.1.2_2.12-1.13.2.jar

加载iceberg-flink-runtime-0.12.0.jar和flink-sql-connector-hive-3.1.2_2.12-1.13.2.jar启动Flink SQL client,可以创建Hadoop catalog和Hive catalog。

[bigdata@bigdata185 flink-yarn]$ bin/sql-client.sh embedded \
-j /opt/software/iceberg/iceberg-flink-runtime-0.12.0.jar \
-j /opt/software/iceberg/flink-sql-connector-hive-3.1.2_2.12-1.13.2.jar \
shell

① 创建Hive catalog

-- 创建Hive catalog
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://bigdata185:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://bigdata185:8020/warehouse/iceberg/dept',
  'hive-conf-dir'='/opt/module/hive/conf/hive-site.xml'
);
  • type:必须使用“iceberg”。(必须)
  • catalog-type:支持hive和hadoop两种方式。(必须)
  • uri:Hive metastore的thrift URI。(必须)
  • clients:客户池大小。(可选)
  • hive-conf-dir:该配置文件将用于提供自定义的 Hive 配置值。(可选)

② 创建database

默认情况下,Flink会使用Iceberg的default_database数据库,如果我们不想在default数据库,可以使用下面的语句创建database。

-- 创建database
CREATE DATABASE iceberg_db;
USE iceberg_db;

③ 创建表

CREATE TABLE hive_catalog.iceberg_db.iceberg_dept (
    deptno BIGINT COMMENT 'unique id',
    dname STRING COMMENT '部门名称',
    loc STRING COMMENT '部门所在位置'
);

2.2 DataStream读写数据(FlinkStream API)

2.2.1 DataStream写数据

Iceberg支持多种DataStream写数据的方式

2.2.1.1 追加写数据

import com.yunclass.util.SensorIcebergSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.*;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

import java.util.Map;
import com.google.common.collect.ImmutableMap;
public class FlinkWriteIcebergHiveTableStream {
    public static void main(String[] args) throws Exception {
        // 1 设置Flink执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2 加载数据源
        DataStream<RowData> inputStream = env.addSource(new SensorIcebergSource());

        // 3 设置Hive表的Schema信息
        Schema schema = new Schema(
                Types.NestedField.optional(0, "sensorid", Types.StringType.get()),
                Types.NestedField.optional(1, "ts", Types.LongType.get()),
                Types.NestedField.optional(2, "temperature", Types.DoubleType.get())
        );


        PartitionSpec spec = PartitionSpec.unpartitioned();

        // 4 Hadoop集群
        String basePath = "hdfs://bigdata185:8020/";
        String tablePath = basePath.concat("warehouse/iceberg/sensordata");

        // 5 设置数据的存储格式:orc、parquet或者avro
        Map<String, String> props =
                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());

        Table table = new HadoopTables().create(schema, spec, props, tablePath);

        TableLoader tableLoader = TableLoader.fromHadoopTable(tablePath);

        // 6 向HDFS目录写入数据
        FlinkSink.forRowData(inputStream)
                .table(table)
                .tableLoader(tableLoader)
                .writeParallelism(1)
                .build();

        env.execute("通过Append方式向Hive中写入数据");
    }
}

我们通过Hive表验证一下写入结果

-- 创建hive的外部表
CREATE EXTERNAL TABLE sensordata(
sensorid string,
ts bigint,
temperature double
)
STORED AS PARQUET 
LOCATION '/warehouse/iceberg/sensordata/data';

-- 查询数据结果
hive> select * from sensordata;
OK
sensor_0        1630994084      22.8
sensor_1        1630994084      34.0
sensor_2        1630994084      20.7
sensor_3        1630994084      20.2
sensor_4        1630994084      9.7
sensor_5        1630994084      9.2
sensor_6        1630994084      29.4
sensor_7        1630994084      1.6
sensor_8        1630994084      26.4
sensor_9        1630994084      24.8
Time taken: 0.15 seconds, Fetched: 10 row(s)
hive> 

还可以换一种写法

import com.google.common.collect.ImmutableMap;
import com.yunclass.util.SensorIcebergSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

import java.util.Map;

public class FlinkWriteIcebergHadoopStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Configuration conf = new Configuration();
        DataStreamSource<RowData> inputStream = env.addSource(new SensorIcebergSource());
        String basePath = "hdfs://bigdata185:8020/warehouse/iceberg/";
        HadoopCatalog hpCatalog = new HadoopCatalog(conf, basePath);

        TableIdentifier name = TableIdentifier.of("hadoop", "sensordata");
        PartitionSpec spec = PartitionSpec.unpartitioned();
        Schema schema = new Schema(
                Types.NestedField.optional(0, "sensorid", Types.StringType.get()),
                Types.NestedField.optional(1, "ts", Types.LongType.get()),
                Types.NestedField.optional(2, "temperature", Types.DoubleType.get())
        );

        String tablePath = basePath.concat("hadoop/sensordata");
        Map<String, String> props =
                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
        Table table = hpCatalog.createTable(name, schema, spec, tablePath, props);
        TableLoader tableLoader = TableLoader.fromHadoopTable(tablePath);

        FlinkSink.forRowData(inputStream)
                .table(table)
                .tableLoader(tableLoader)
                .writeParallelism(1)
                .build();
        env.execute();
    }
}

2.2.1.2 覆盖写数据

覆盖写只需要增加overwrite(true)即可,其他代码可以复用。

			FlinkSink.forRowData(inputStream)
                .table(table)
                .tableLoader(tableLoader)
                .writeParallelism(1)
                .overwrite(true)
                .build();

2.2.2 DataStream读数据

下面这个例子是用Flink API读取Iceberg表数据,然后将查询结果打印到控制台。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

/**
 * 通过Flink读取Iceberg数据
 */
public class FlinkReadIcebergHiveTableStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        String basePath = "hdfs://bigdata185:8020/";
        String tablePath = basePath.concat("warehouse/iceberg/sensordata");
        TableLoader tableLoader = TableLoader.fromHadoopTable(tablePath);

        DataStream<RowData> batchData = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(true)

                .build();

        batchData.print();
        env.execute();
    }
}

三 总结

这篇文章是数据湖-iceberg的开篇,后面会陆续增加iceberg的文章。没有SQL的开发是没有灵魂的,不能提高效率,后面将陆续给大家介绍Iceberg的知识(包括FlinkSQL和SparkSQL对Iceberg表的操作),欢迎大家转发、评论。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/6288.html

(0)
上一篇 2022-12-14 22:56
下一篇 2022-12-14 23:00

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信