大家好,欢迎来到IT知识分享网。
一 Iceberg概述
Iceberg的官方定义:Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink and Hive using a high-performance table format that works just like a SQL table.
用通俗的语言解释一下就是,Iceberg是介于计算层(Spark、Trino、Presto、Flink和Hive)和存储层(ORC、Parquet)的中间“表格式”层。它与存储层最大的区别是,它并不定义数据的存储方式;仅仅是定义数据和元数据的组织方式,向计算层提供统一的“表”定义。它构建在数据存储格式之上,底层的数据存储仍然是ORC、Parquet等格式。通过Spark、Flink写入Iceberg,然后在通过Spark、Trino、Presto、Flink等来读取数据。
从Iceberg起到的作用可以看出,Iceberg并不绑定某一种特定的执行引擎,它实现了通用的数据组织格式。这种格式可以方便地与不同的计算引擎(Flink、Spark、Hive等)对接。
1.1 Iceberg的优势
1.1.1 优化数据入库流程
Iceberg本身提供ACID的事务支持,保证数据写入即可见;Iceberg提供了Upsert、Merge Into的能力,极大地缩小了数据入库延迟性。能够提供分钟级别的计算延迟。
1.1.2 支持多种数据分析引擎
支持Hive、Spark、Presto、Flink等多种数据分析引擎。
1.1.3 统一灵活的表格式
Iceberg提供了基于流式的增量计算模型和基于批处理的全量表计算模型。流处理和批处理任务可以使用相同的存储模型,使数据不再孤立存在。Iceberg支持隐藏分区和分区进化,方便业务进行数据分区策略更新。支持Parquet、Avro以及ORC等存储格式。
1.2 Iceberg的应用场景
1.2.1 构建实时的Data Pipeline
可以使用Flink+Iceberg的方式构建实时Data Pipeline。通过Flink消费Kafka的消息日志,直接将原始日志写入Iceberg,然后再通过Flink的ETL计算将中间结果写入下游的Iceberg表。Iceberg能够支持Flink每30秒或者1分钟的增量数据写入,这样可以提供端到端的数据及时性。
借助Flink实现数据的ExactlyOnce写入数据湖,由于Kafka的成本比较高,一般只保留7天左右的数据,Iceberg的存储成本相对比较低,可以保存全量的历史数据。针对特殊需求,可以采用Iceberg全量数据和Kafka的增量数据来驱动新的Flink作业。
1.2.2 实现CDC功能
Flink本身已经原生的支持CDC数据解析,可以用Flink CDC2.0技术编写几行代码,将解析Binlog日志写入Iceberg。CDC数据在写入Iceberg之后,可以使用Presto、Spark、Hive等执行引擎实时的读取并分析Iceberg中的CDC数据。
1.2.3 实现准实时数据仓库建设
Iceberg支持并发读、增量读、合并小文件,而且还能做到秒级/分钟级的数据延迟。基于Iceberg这些优势,采用Flink+Iceberg的方式构建了流批一体化的准实时数据仓库。
二 Flink+Iceberg案例
Apache Iceberg在0.12版本增加了Flink对行级数据删除的功能,除此之外,还支持使用Flink Streaming读取数据,通过Flink CDC/Upsert操作。同时还能通过write.distribution-mode=hash和Batch rewrite files action手段合并小文件。
2.1 环境准备
我们使用的环境是Flink1.13+Hadoop3.2+Hive3.1。
2.1.1 Flink操作Maven环境准备
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>0.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink</artifactId>
<version>0.12.0</version>
</dependency>
<!-- 在Idea环境操作Hadoop集群文件 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime</artifactId>
<version>0.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>1.13.0</version>
</dependency>
IT知识分享网
2.1.2 FlinkSQL客户端操作
如果通过FlinkSQL交互窗口操作,需要下载以下两个jar包。
2.1.2.1 iceberg-flink-runtime-0.12.0.jar
点击这个网址可以下载iceberg-flink-runtime-0.12.0.jar。
https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/0.12.0/iceberg-flink-runtime-0.12.0.jar
加载iceberg-flink-runtime-0.12.0.jar启动Flink SQL client,可以创建Hadoop catalog:
IT知识分享网[bigdata@bigdata185 flink-yarn]$ bin/sql-client.sh embedded \
-j /opt/software/iceberg/iceberg-flink-runtime-0.12.0.jar \
shell
① 创建Hadoop cataloglog
-- 创建Hadoop catlog
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://bigdata185:8020/warehouse/hadoop/iceberg',
'property-version'='1'
);
② 创建database
IT知识分享网-- 创建database
CREATE DATABASE if not exists iceberg_hadoop_db;
③ 创建表
-- 建表
CREATE TABLE hadoop_catalog.iceberg_hadoop_db.iceberg_dept(
deptno BIGINT COMMENT 'unique id',
dname STRING COMMENT '部门名称',
loc STRING COMMENT '部门所在位置'
);
2.1.2.2 flink-sql-connector-hive-3.1.2_2.12-1.13.2.jar
点击这个网址可以下载flink-sql-connector-hive-3.1.2_2.12-1.13.2.jar。
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.13.2/flink-sql-connector-hive-3.1.2_2.12-1.13.2.jar
加载iceberg-flink-runtime-0.12.0.jar和flink-sql-connector-hive-3.1.2_2.12-1.13.2.jar启动Flink SQL client,可以创建Hadoop catalog和Hive catalog。
[bigdata@bigdata185 flink-yarn]$ bin/sql-client.sh embedded \
-j /opt/software/iceberg/iceberg-flink-runtime-0.12.0.jar \
-j /opt/software/iceberg/flink-sql-connector-hive-3.1.2_2.12-1.13.2.jar \
shell
① 创建Hive catalog
-- 创建Hive catalog
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://bigdata185:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://bigdata185:8020/warehouse/iceberg/dept',
'hive-conf-dir'='/opt/module/hive/conf/hive-site.xml'
);
- type:必须使用“iceberg”。(必须)
- catalog-type:支持hive和hadoop两种方式。(必须)
- uri:Hive metastore的thrift URI。(必须)
- clients:客户池大小。(可选)
- hive-conf-dir:该配置文件将用于提供自定义的 Hive 配置值。(可选)
② 创建database
默认情况下,Flink会使用Iceberg的default_database数据库,如果我们不想在default数据库,可以使用下面的语句创建database。
-- 创建database
CREATE DATABASE iceberg_db;
USE iceberg_db;
③ 创建表
CREATE TABLE hive_catalog.iceberg_db.iceberg_dept (
deptno BIGINT COMMENT 'unique id',
dname STRING COMMENT '部门名称',
loc STRING COMMENT '部门所在位置'
);
2.2 DataStream读写数据(FlinkStream API)
2.2.1 DataStream写数据
Iceberg支持多种DataStream写数据的方式
2.2.1.1 追加写数据
import com.yunclass.util.SensorIcebergSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.*;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
public class FlinkWriteIcebergHiveTableStream {
public static void main(String[] args) throws Exception {
// 1 设置Flink执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2 加载数据源
DataStream<RowData> inputStream = env.addSource(new SensorIcebergSource());
// 3 设置Hive表的Schema信息
Schema schema = new Schema(
Types.NestedField.optional(0, "sensorid", Types.StringType.get()),
Types.NestedField.optional(1, "ts", Types.LongType.get()),
Types.NestedField.optional(2, "temperature", Types.DoubleType.get())
);
PartitionSpec spec = PartitionSpec.unpartitioned();
// 4 Hadoop集群
String basePath = "hdfs://bigdata185:8020/";
String tablePath = basePath.concat("warehouse/iceberg/sensordata");
// 5 设置数据的存储格式:orc、parquet或者avro
Map<String, String> props =
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
Table table = new HadoopTables().create(schema, spec, props, tablePath);
TableLoader tableLoader = TableLoader.fromHadoopTable(tablePath);
// 6 向HDFS目录写入数据
FlinkSink.forRowData(inputStream)
.table(table)
.tableLoader(tableLoader)
.writeParallelism(1)
.build();
env.execute("通过Append方式向Hive中写入数据");
}
}
我们通过Hive表验证一下写入结果
-- 创建hive的外部表
CREATE EXTERNAL TABLE sensordata(
sensorid string,
ts bigint,
temperature double
)
STORED AS PARQUET
LOCATION '/warehouse/iceberg/sensordata/data';
-- 查询数据结果
hive> select * from sensordata;
OK
sensor_0 1630994084 22.8
sensor_1 1630994084 34.0
sensor_2 1630994084 20.7
sensor_3 1630994084 20.2
sensor_4 1630994084 9.7
sensor_5 1630994084 9.2
sensor_6 1630994084 29.4
sensor_7 1630994084 1.6
sensor_8 1630994084 26.4
sensor_9 1630994084 24.8
Time taken: 0.15 seconds, Fetched: 10 row(s)
hive>
还可以换一种写法
import com.google.common.collect.ImmutableMap;
import com.yunclass.util.SensorIcebergSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import java.util.Map;
public class FlinkWriteIcebergHadoopStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Configuration conf = new Configuration();
DataStreamSource<RowData> inputStream = env.addSource(new SensorIcebergSource());
String basePath = "hdfs://bigdata185:8020/warehouse/iceberg/";
HadoopCatalog hpCatalog = new HadoopCatalog(conf, basePath);
TableIdentifier name = TableIdentifier.of("hadoop", "sensordata");
PartitionSpec spec = PartitionSpec.unpartitioned();
Schema schema = new Schema(
Types.NestedField.optional(0, "sensorid", Types.StringType.get()),
Types.NestedField.optional(1, "ts", Types.LongType.get()),
Types.NestedField.optional(2, "temperature", Types.DoubleType.get())
);
String tablePath = basePath.concat("hadoop/sensordata");
Map<String, String> props =
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
Table table = hpCatalog.createTable(name, schema, spec, tablePath, props);
TableLoader tableLoader = TableLoader.fromHadoopTable(tablePath);
FlinkSink.forRowData(inputStream)
.table(table)
.tableLoader(tableLoader)
.writeParallelism(1)
.build();
env.execute();
}
}
2.2.1.2 覆盖写数据
覆盖写只需要增加overwrite(true)即可,其他代码可以复用。
FlinkSink.forRowData(inputStream)
.table(table)
.tableLoader(tableLoader)
.writeParallelism(1)
.overwrite(true)
.build();
2.2.2 DataStream读数据
下面这个例子是用Flink API读取Iceberg表数据,然后将查询结果打印到控制台。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;
/**
* 通过Flink读取Iceberg数据
*/
public class FlinkReadIcebergHiveTableStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
String basePath = "hdfs://bigdata185:8020/";
String tablePath = basePath.concat("warehouse/iceberg/sensordata");
TableLoader tableLoader = TableLoader.fromHadoopTable(tablePath);
DataStream<RowData> batchData = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.streaming(true)
.build();
batchData.print();
env.execute();
}
}
三 总结
这篇文章是数据湖-iceberg的开篇,后面会陆续增加iceberg的文章。没有SQL的开发是没有灵魂的,不能提高效率,后面将陆续给大家介绍Iceberg的知识(包括FlinkSQL和SparkSQL对Iceberg表的操作),欢迎大家转发、评论。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/6288.html