大家好,欢迎来到IT知识分享网。
SeaTunnel教程
第1章 Seatunnel概述
1.1 SeaTunnel是什么
SeaTunnel是一个简单易用的数据集成框架,在企业中,由于开发时间或开发部门不通用,往往有多个异构的、运行在不同的软硬件平台上的信息系统同时运行。数据集成是把不同来源、格式、特点性质的数据在逻辑上或物理上有机地集中,从而为企业提供全面的数据共享。SeaTunnel支持海量数据的实时同步。它每天可以稳定高效地同步数百亿数据。并已用于近100家公司的生产。
SeaTunnel的前身是Waterdrop(中文名:水滴)自2021年10月12日更名为SeaTunnel。2021年12月9日,SeaTunnel正式通过Apache软件基金会的投票决议,以全票通过的优秀表现正式成为Apache孵化器项目。2022年3月18日社区正式发布了首个Apache版本v2.1.0。
1.2 SeaTunnel在做什么
本质上,SeaTunnel不是对Saprk和Flink的内部修改,而是在Spark和Flink的基础上做了一层包装。它主要运用了控制反转的设计模式,这也是SeaTunnel实现的基本思想。
SeaTunnel的日常使用,就是编辑配置文件。编辑好的配置文件由SeaTunnel转换为具体的Spark或Flink任务。如图所示。
1.3 SeaTunnel的应用场景
SeaTunnel适用于以下场景 |
SeaTunnel的特点 |
|
|
目前SeaTunnel的长板是他有丰富的连接器,又因为它以Spark和Flink为引擎。所以可以很好地进行分布式的海量数据同步。通常SeaTunnel会被用来做出仓入仓工具,或者被用来进行数据集成。比如,唯品会就选择用SeaTunnel来解决数据孤岛问题,让ClcikHouse集成到了企业中先前的数据系统之中。如图所示:
下图是SeaTunnel的工作流程:
1.5 SeaTunnel目前的插件支持
1.5.1 Spark连接器插件(Source)
1.5.2 Flink 连接器插件(Source)
1.5.3 Spark & Flink 转换插件
转换插件 |
Spark |
Flink |
Add |
||
CheckSum |
||
Convert |
||
Date |
||
Drop |
||
Grok |
||
Json |
√ |
|
Kv |
||
Lowercase |
||
Remove |
||
Rename |
||
Repartition |
||
Replace |
||
Sample |
||
Split |
√ |
√ |
Sql |
√ |
√ |
Table |
||
Truncate |
||
Uppercase |
||
Uuid |
这部分内容来官网,可以看出社区目前规划了大量的插件,但截至V2.1.0 可用的transform插件的数量还是很少的。同学们有兴趣也可以在业余时间尝试参与开源贡献。
官方网址:https://seatunnel.apache.org/zh-CN/
第2章 Seatunnel安装和使用
注意v2.1.0中有少量bug,要想一次性跑通所有示例程序,需使用我们自己编译的包,可以在资料包里获取。具体如何修改源码,可以参考文档第5章。
2.1 SeaTunnel的环境依赖
截至SeaTunnel V2.1.0。
SeaTunnel支持Spark 2.x(尚不支持Spark 3.x)。支持Flink 1.9.0及其以上的版本。
Java版本需要>=1.8
我们演示时使用的是flink版本是1.13.0
2.2 SeaTunnel的下载和安装
1)使用wget下载SeaTunnel,使用-O参数将文件命名为seatunnel-2.1.0.tar.gz
wget https://downloads.apache.org/incubator/seatunnel/2.1.0/apache-seatunnel-incubating-2.1.0-bin.tar.gz -O seatunnel-2.1.0.tar.gz2
2)解压下载好的tar.gz包
tar -zxvf seatunnel-2.1.0.tar.gz -C /opt/module/
3)查看解压的目标路径,apache-seatunnel-incubating-2.1.0的目录就是我们已经安装好的seatunnel。Incubating的意思是孵化中。
2.3 SeaTunnel的依赖环境配置
在config/目录中有一个seatunnel-env.sh脚本。我们可以看一下里面的内容。
这个脚本中声明了SPARK_HOME和FLINK_HOME两个路径。默认情况下seatunnel-env.sh中的SPARK_HOME和FLINK_HOME就是系统环境变量中的SPARK_HOME和FLINK_HOME。
在shell脚本中:-的意思是如果:-前的内容为空,则替换为后面的。
例如,环境变量中没有FLINK_HOME。那么SeaTunnel运行时会将FLINK_HOME设为/opt/flink。
如果你机器上的环境变量SPARK_HOME指向了3.x的一个版本。但是想用2.x的Spark来试一下SeaTunnel。这种情况下,如果你不想改环境变量,那就直接在seatunnel-env.sh中将2.x的路径赋值给SPARK_HOME即可。
比如:
2.4 示例1: SeaTunnel 快速开始
我们先跑一个官方的flink案例。来了解它的基本使用。
1)选择任意路径,创建一个文件。这里我们选择在SeaTunnel的config路径下创建一个example01.conf
[daydayup@hadoop102 config]$ vim example01.conf
2)在文件中编辑如下内容
# 配置Spark或Flink的参数
env {
# You can set flink configuration here
execution.parallelism = 1
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = “hdfs://hadoop102:9092/checkpoint”
}
# 在source所属的块中配置数据源
source {
SocketStream{
host = hadoop102
result_table_name = “fake”
field_name = “info”
}
}
# 在transform的块中声明转换插件
transform {
Split{
separator = “#”
fields = [“name”,”age”]
}
sql {
sql = “select info, split(info) as info_row from fake”
}
}
# 在sink块中声明要输出到哪
sink {
ConsoleSink {}
}
3)开启flink集群
[daydayup@hadoop102 flink-1.11.6]$ bin/start-cluster.sh
4)开启一个netcat服务来发送数据
[daydayup@hadoop102 ~]$ nc -lk 9999
5)使用SeaTunnel来提交任务。
在bin目录下有以下内容
start-seatunnel-flink.sh是用来提交flink任务的。start-seatunnel-spark.sh是用来提交Spark任务的。这里我们用flink演示。所以使用start-seatunnel-flink.sh。
用–config 参数指定我们的应用配置文件。
bin/start-seatunnel-flink.sh –config config/example01.sh
等待弹出Job已经提交的提示
6)在netcat上发送数据
7)在Flink webUI上查看输出结果。
8)小结
至此,我们已经跑完了一个官方案例。它以Socket为数据源。经过SQL的处理,最终输出到控制台。在这个过程中,我们并没有编写具体的flink代码,也没有手动去打jar包。我们只是将数据的处理流程声明在了一个配置文件中。
在背后,是SeaTunnel帮我们把配置文件翻译为具体的flink任务。配置化,低代码,易维护是SeaTunnel最显著的特点。
第3章 SeaTunnel基本原理
3.1 SeaTunnel的启动脚本
3.1.1 启动脚本的参数
截至目前,SeaTunnel有两个启动脚本。
提交spark任务用start-seatunnel-spark.sh。
提交flink任务则用start-seatunnel-flink.sh。
本文档主要是结合flink来使用seatunnel的,所以用start-seatunnel-flink.sh来讲解。
start-seatunnle-flink.sh可以指定3个参数
分别是:
–config 应用配置的路径
–variable 应用配置里的变量赋值
–check 检查config语法是否合法
3.1.2 –check参数
截至本文档撰写时的SeaTunnel版本v2.1.0。check功能还尚在开发中,因此–check参数是一个虚设。目前start-seatunnel-flink.sh并不能对应用配置文件的语法合法性进行检查。而且start-seatunnel-flink.sh中目前没有对–check参数的处理逻辑。
需要注意!使用过程中,如果没有使用–check参数,命令行一闪而过。那就是你的配置文件语法有问题。
3.1.3 –config参数和–variable参数
–config参数用来指定应用配置文件的路径。
–variable参数可以向配置文件传值。配置文件内是支持声明变量的。然后我们可以通过命令行给配置中的变量赋值。
变量声明语法如下。
sql {
sql = “select * from (select info,split(info) from fake) where age > ‘”${age}”‘”
}
在配置文件的任何位置都可以声明变量。并用命令行参数–variable key=value的方式将变量值传进去,你也可以用它的短命令形式 -i key=value。传递参数时,key需要和配置文件中声明的变量名保持一致。
如果需要传递多个参数,那就在命令行里面传递多个-i或–variable key=value。
比如:
bin/start-seatunnel-flink.sh –config/xxx.sh -i age=18 -i sex=man
3.1.4 示例2:配置中使用变量
1)我们在example01.conf的基础上创建example02.conf。
[daydayup@hadoop102 config]$ cp example01.sh example02.sh
2)修改文件
[daydayup@hadoop102 config]$ vim example02.sh
3)给sql插件声明一个变量,红色的是我们修改的地方。最终的配置文件如下。
env {
execution.parallelism = 1
}
source {
SocketStream{
result_table_name = “fake”
field_name = “info”
}
}
transform {
Split{
separator = “#”
fields = [“name”,”age”]
}
sql {
sql = “select * from (select info, split(info) from fake) where age > ‘”${age}”‘”
# 需要套一层子查询,因为where先于select,split出的字段无法用where过滤
}
}
sink {
ConsoleSink {}
}
4)开启netcat服务
[daydayup@hadoop102 ~]nc -l 9999
5)使用SeaTunnel来提交任务。-i age=18往命令行中
bin/start-seatunnel-flink.sh –config config/example01.sh -i age=18
6)接着,我们用nc发送几条数据看看效果。
7)在flink的webUI上我们看一下控制台的输出。最终发现未满18岁的李四被过滤掉了。
8)小结
通过传递变量,我们可以实现配置文件的复用。让同一份配置文件就能满足不同的业务需求。
3.1.5 给flink传递参数
在启动脚本的尾部,我们可以看到,start-seatunnel-flink.sh会执行(exec)一条命令,这个命令会使用flink的提交脚本去向集群提交一个任务。而且在调用bin/flink run的时候,还传递了PARAMS作为flink run的参数。
如下图所示,我们可知,凡是–config 和 –variable之外的命令行参数都被放到PARAMS变量中,最后相当于给flink run传递了参数。注意!命令行参数解析过程中没有涉及–check参数处理。这也是为什么说它目前不支持–check操作。
比如,我们可以在seatunnel启动脚本中,指定flink job并行度。
bin/start-seatunnel-flink.sh –config config/ -p 2\
3.2 SeaTunnel的配置文件
3.2.1 应用配置的4个基本组件
我们从SeaTunnel的app配置文件开始讲起。
一个完整的SeaTunnel配置文件应包含四个配置组件。分别是:
env{} source{} –> transform{} –> sink{}
在Source和Sink数据同构时,如果业务上也不需要对数据进行转换,那么transform中的内容可以为空。具体需根据业务情况来定。
3.2.2 env块
env块中可以直接写spark或flink支持的配置项。比如并行度,检查点间隔时间。检查点hdfs路径等。在SeaTunnel源码的ConfigKeyName类中,声明了env块中所有可用的key。如图所示:
3.2.3 SeaTunnel中的核心数据结构Row
Row是SeaTunnel中数据传递的核心数据结构。对flink来说,source插件需要给下游的转换插件返回一个DataStream<Row>,转换插件接到上游的DataStream<Row>进行处理后需要再给下游返回一个DataStream<Row>。最后Sink插件将转换插件处理好的DataStream<Row>输出到外部的数据系统。
如图所示:
因为DataStream<Row>可以很方便地和Table进行互转,所以将Row当作核心数据结构可以让转换插件同时具有使用代码(命令式)和sql(声明式)处理数据的能力。
3.2.4 source块
source块是用来声明数据源的。source块中可以声明多个连接器。比如:
# 伪代码
env {
…
}
source {
hdfs { … }
elasticsearch { … }
jdbc {…}
}
transform {
sql {
sql = “””
select …. from hdfs_table
join es_table
on hdfs_table.uid = es_table.uid where …”””
}
}
sink {
elasticsearch { … }
}
需要注意的是,所有的source插件中都可以声明result_table_name。如果你声明了result_table_name。SeaTunnel会将source插件输出的DataStream<Row>转换为Table并注册在Table环境中。当你指定了result_table_name,那么你还可以指定field_name,在注册时,给Table重设字段名(后面的案例中会讲解)。
因为不同source所需的配置并不一样,所以对source 连接器的配置最好参考官方的文档。
3.2.5 transform块
目前社区对插件做了很多规划,但是截至v2.1.0版本,可用的插件总共有两个,一个是Split,另一个是sql。
transform{}块中可以声明多个转换插件。所有的转换插件都可以使用source_table_name,和result_table_name。同样,如果我们声明了result_table_name,那么我们就能声明field_name。
我们需要着重了解一下Split插件和sql插件的实现。但在此
在SeaTunnel中,一个转换插件的实现类最重要的逻辑在下述四个方法中。
1)处理批数据,DataSet<Row>进,DataSet<Row>出
DataSet<Row> processBatch(FlinkEnvironment env, DataSet<Row> data)
2)处理流数据,DataStram<Row>进,DataStream<Row>出
DataStream<Row> processStream(FlinkEnvironment env, DataStream<Row> dataStream)
3)函数名叫注册函数。实际上,这是一个约定,它只不过是每个transform插件作用于流之后调用的一个函数。
void registerFunction(FlinkEnvironment env, DataStream<Row> datastream)
4)处理一些预备工作,通常是用来解析配置。
void prepare(FlinkEnvironment prepareEnv)
Split插件的实现
现在我们需要着重看一下Split插件的实现。
先回顾一下我们之前example01.conf中关于transform的配置。
接着我们再来看一下Split的源码实现。
我们发现Split插件并没有对数据流进行任何的处理,而是将它直接return了。反之,它向表环境中注册了一个名为split的UDF(用户自定义函数)。而且,函数名是写死的。这意味着,如果你声明了多个Split,后面的UDF还会把前面的覆盖。
这是开发时需要注意的一个点。
但是,需要注意,tranform接口其实是留给了我们直接操作数据的能力的。也就是processStream方法。那么,一个transform插件其实同时履行了process和udf的职责,这是违反单一职责原则的。那要判断一个转换插件在做什么就只能从源码和文档的方面来加以区分了。
最后需要叮嘱的是,指定soure_table_name对于sql插件的意义不大。因为sql插件可以通过from子句来决定从哪个表里抽取数据。
3.2.6 sink块
Sink块里可以声明多个sink插件,每个sink插件都可以指定source_table_name。不过因为不同Sink插件的配置差异较大,所以在实现时建议参考官方文档。
3.3 SeaTunnel的基本原理
SeaTunnel的工作原理简单明了。
1)程序会解析你的应用配置,并创建环境
2)配置里source{},transform{},sink{}三个块中的插件最终在程序中以List集合的方式存在。
3)由Excution对象来拼接各个插件,这涉及到选择source_table,注册result_table等流程,注册udf等流程。并最终触发执行
可以参考下图:
3.4 小结
最后我们用一张图将SeaTunnel中的重要概念串起来。
如果你愿意,依托sql插件和udf。单个配置文件也可以定义出比较复杂的工作流。但SeaTunnel的定位是一个数据集成平台。核心的功能是依托丰富的连接器进行数据同步,数据处理并不是SeaTunnel的长处。所以在SeaTunnel中定义复杂的工作流或许是一种不值得提倡的做法。
需要提醒的是,如果你不指定source_table_name,插件会使用它在配置文件上最近的上一个插件的输出作为输入。
所以,我们可以通过使用依托表名表环境来实现复杂的工作流。
也可以减少表名的使用实现简单的数据同步通道。
第4章 应用案例
注意!下述示例请使用我们修改编译好的包。
4.1 Kafka进Kafka出的简单ETL
4.1.1 需求
对test_csv主题中的数据进行过滤,仅保留年龄在18岁以上的记录。
4.1.2 需求实现
1)首先,创建为kafka创建test_csv主题。
kafka-topics.sh –bootstrap-server hadoop102:9092 –create –topic test_csv –partitions 1 –replication-factor 1
2)为kafka创建test_sink主题
kafka-topics.sh –bootstrap-server hadoop102:9092 –create –topic test_sink –partitions 1 –replication-factor 1
3)编辑应用配置
[daydayup@hadoop102 config]$ vim example03.conf
4)应用配置内容
env {
# You can set flink configuration here
execution.parallelism = 1
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = “hdfs://hadoop102:9092/checkpoint”
}
# 在source所属的块中配置数据源
source {
KafkaTableStream {
consumer.bootstrap.servers = “hadoop102:9092”
consumer.group.id = “seatunnel-learn”
topics = test_csv
result_table_name = test
format.type = csv
schema = “[{\”field\”:\”name\”,\”type\”:\”string\”},{\”field\”:\”age\”, \”type\”: \”int\”}]”
format.field-delimiter = “;”
format.allow-comments = “true”
format.ignore-parse-errors = “true”
}
}
# 在transform的块中声明转换插件
transform {
sql {
sql = “select name,age from test where age > ‘”${age}”‘”
}
}
# 在sink块中声明要输出到哪
sink {
kafkaTable {
topics = “test_sink”
producer.bootstrap.servers = “hadoop102:9092”
}
}
5)提交任务
bin/start-seatunnel-flink.sh –config config/example03.conf -i age=18
6)起一个kafka console producer 发送csv数据(分号分隔)
7)起一个kafka console consumer 消费数据
我们成功地实现了数据从kafka输入经过简单的ETL再向kafka输出。
4.2 Kafka 输出到Doris进行指标统计
4.2.1 需求
使用回话日志统计用户的总观看视频数,用户最常会话市场,用户最小会话时长,用户最后一次会话时间。
4.2.2 需求实现
1)在资料中有一个伪数据的生成脚本,将它拷贝到服务器的任意位置
2)执行以下命令安装python脚本需要的两个依赖库
pip3 install Faker
pip3 install kafka-python
3)使用mysql客户端连接doris
[daydayup@hadoop102 fake_data]$ mysql -h hadoop102 -P 9030 -udaydayup -p123321
4)手动创建test_db数据库。
create database test_db;
5)使用下述sql语句建表
CREATE TABLE `example_user_video` (
`user_id` largeint(40) NOT NULL COMMENT “用户id”,
`city` varchar(20) NOT NULL COMMENT “用户所在城市”,
`age` smallint(6) NULL COMMENT “用户年龄”,
`video_sum` bigint(20) SUM NULL DEFAULT “0” COMMENT “总观看视频数”,
`max_duration_time` int(11) MAX NULL DEFAULT “0” COMMENT “用户最长会话时长”,
`min_duration_time` int(11) MIN NULL DEFAULT “999999999” COMMENT “用户最小会话时长”,
`last_session_date` datetime REPLACE NULL DEFAULT “1970-01-01 00:00:00” COMMENT “用户最后一次会话时间”
) ENGINE=OLAP
AGGREGATE KEY(`user_id`, `city`, `age`)
COMMENT “OLAP”
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
;
6)在config目录下, 编写如下的配置文件。
env {
execution.parallelism = 1
}
source {
KafkaTableStream {
consumer.bootstrap.servers = “hadoop102:9092”
consumer.group.id = “seatunnel5”
topics = test
result_table_name = test
format.type = json
schema = “{\”session_id\”:\”string\”,\”video_count\”:\”int\”,\”duration_time\”:\”long\”,\”user_id\”:\”string\”,\”user_age\”:\”int\”,\”city\”:\”string\”,\”session_start_time\”:\”datetime\”,\”session_end_time\”:\”datetime\”}”
format.ignore-parse-errors = “true”
}
}
transform{
sql {
sql = “select user_id,city,user_age as age,video_count as video_sum,duration_time as max_duration_time,duration_time as min_duration_time,session_end_time as last_session_date from test”
result_table_name = test2
}
}
sink{
DorisSink {
source_table_name = test2
fenodes = “hadoop102:8030”
database = test_db
table = example_user_video
user = daydayup
password = 123321
batch_size = 50
doris.column_separator=”\t”
doris.columns=”user_id,city,age,video_sum,max_duration_time,min_duration_time,last_session_date”
}
}
7)使用python脚本向kafka中生成伪数据
[daydayup@hadoop102 fake_data]$ python3 fake_video.py –bootstrap-server hadoop102:9092 –topic test_video
8)查看doris中的结果。
Select * from `example_user_video`;
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/31163.html