大家好,欢迎来到IT知识分享网。
一 Join优化
在编写Hive语句时,Join是最常见的操作方式之一。总的优化原则是:
1.1 使用相同的连接键
如果Join的key相同,不管有多少个表,都会则会合并为1个MapReduce任务,而不是“n”个,在做OUTER JOIN的时候也是一样。
如果Join的条件不相同,那么下面两种写法产生的MapReduce任务数和Job是是一致的,也就是下面两种写法是等价的。
INSERT OVERWRITE TABLE dwd_hbv_pubblog(dt=${date})
SELECT p.pageid, u.username, d.devicename
FROM ods_bhv_pageview p
JOIN ods_user_baseinfo u
ON (p.userid = u.userid)
JOIN ods_device_mapping_info d
ON u.deviceid=d.deviceid;
IT知识分享网
IT知识分享网-- 先关联ods_bhv_pageview和ods_user_baseinfo表,生成一张临时表
CREATE TABLE temp_user_bhv_userinfo_pageview_${date}
AS
SELECT p.pageid, u.username,u.deviceid
FROM ods_bhv_pageview p
JOIN ods_user_baseinfo u
ON (p.userid = u.userid);
-- 再将temp_user_bhv_userinfo_pageview_${date}和ods_device_mapping_info关联
INSERT OVERWRITE TABLE dwd_hbv_pubblog(dt=${date})
SELECT v.pageid, v.username, d.devicename
FROM temp_user_bhv_userinfo_pageview_${date} v
JOIN ods_device_mapping_info d
ON v.deviceid=d.deviceid;
1.2 最好把数据量小的表放在左边,尽量使用mapjoin操作
在使用写有Join操作的查询语句时有一条原则:应该将条目少的表/子查询放在Join操作符的左边。原因是在Join操作的Reduce阶段,位于Join操作符左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生OOM错误的几率。因此通常需要将小表放前面,或者标记哪张表是大表:/*streamtable(table_name) */
1.3 尽量尽早地过滤数据
减少每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段,最大限度的减少参与Join的数据量。
1.4 尽量原子化操作
尽量避免一个SQL包含复杂逻辑,可以使用中间表来完成复杂的逻辑。
在编写 Join 查询语句时,如果确定是由于 join 出现的数据倾斜,那么请做如下设置:
-- 这个是 join 的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
set hive.skewjoin.key=100000;
-- 如果是 join 过程出现倾斜应该设置为 true
set hive.optimize.skewjoin=true;
二 Group By优化
默认情况下,Map阶段结束后,相同Key的数据分发到一个reduce,当同一key数据量过大时就产生数据倾斜了。并不是所有的聚合操作都必要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。
2.1 Map端优化
IT知识分享网-- 是否在 Map 端进行聚合,默认为 True
set hive.map.aggr = true;
-- 在 Map 端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000;
2.2 使用 Group By 有数据倾斜的时候进行负载均衡
set hive.groupby.skewindata = true;
当 sql 语句使用 groupby 时数据出现倾斜时,如果该变量设置为 true,那么 Hive 会自动进行 负载均衡。策略就是把 MR 任务拆分成两个:第一个先做预汇总,第二个再做最终汇总
在 MR 的第一个阶段中,Map 的输出结果集合会缓存到 maptaks 中,每个 Reduce 做部分聚 合操作,并输出结果,这样处理的结果是相同 Group By Key 有可能被分发到不同的 Reduce 中, 从而达到负载均衡的目的;第二个阶段 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成 最终的聚合操作。
三 整体架构优化
现在hive的整体框架如下,计算引擎不仅仅支持Map/Reduce,并且还支持Tez、Spark等。根据不同的计算引擎又可以使用不同的资源调度和存储系统。
3.1 执行引擎对比
MapReduce、Tez和Spark三种执行引擎的对比
3.1.1 MapReduce
是一种离线计算框架,将一个算法抽象成Map和Reduce两个阶段进行处理,非常适合数据密集型计算。
优点
大数据量下优势明显。
缺点
读写hdfs次数多;数据量不大时性能一般。
3.1.2 Tez
源于MapReduce框架,它把Map/Reduce过程拆分成若干个子过程,同时可以把多个Map/Reduce任务组合成一个较大的DAG任务,减少了Map/Reduce之间的文件存储。同时合理组合其子过程,也可以减少任务的运行时间。
该引擎核心思想是将Map和Reduce两个操作进一步拆分,即Map被拆分成Input、Processor、Sort、Merge和Output,Reduce被拆分成Input、Shuffle、Sort、Merge、Processor和Output等,这样,这些分解后的元操作可以任意灵活组合,产生新的操作,这些操作经过一些控制程序组装后,可形成一个大的DAG作业。
优点
适用于DAG应用;小数据量下性能优MapReduce。
3.1.3 Spark
Spark基于Map/Reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS。
优点
能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法;实时数据计算有优势。
缺点
大数据量下吃内存。
3.2 压缩设置
3.2.1 压缩的原因
Hive 最终是转为 MapReduce 程序来执行的,而 MapReduce 的性能瓶颈在于网络 IO 和 磁盘 IO,要解决性能瓶颈,最主要的是减少数据量,对数据进行压缩是个好的方式。压缩 虽然是减少了数据量,但是压缩过程要消耗 CPU 的,但是在 Hadoop 中, 往往性能瓶颈不 在于 CPU,CPU压力并不大,所以压缩充分利用了比较空闲的 CPU,为了减少磁盘存储空间以及I/O次数,对数据进行压缩。
3.2.2 常见的压缩算法对比
3.2.3 压缩算法使用
job输出文件按照BLOCK以Gzip方式进行压缩。
-- 默认值是 false
set mapreduce.output.fileoutputformat.compress=true
-- 默认值是 Record
set mapreduce.output.fileoutputformat.compress.type=BLOCK
-- 默认值是 org.apache.hadoop.io.compress.DefaultCodec
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec
map输出结果也以Gzip进行压缩。
set mapreduce.map.output.compress=true
-- 默认值是 org.apache.hadoop.io.compress.DefaultCodec
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec
对hive输出结果和中间结果进行压缩。
-- 默认值是 false,不压缩
set hive.exec.compress.output=true
-- 默认值是 false,为 true 时 MR 设置的压缩才启用
set hive.exec.compress.intermediate=true
3.3 合理利用文件存储格式
创建表时,尽量使用 orc、parquet 这些列式存储格式,因为列式存储的表,每一列的数据在 物理上是存储在一起的,Hive 查询时会只遍历需要列数据,大大减少处理的数据量。
四 小文件问题
4.1 小文件是如何产生的
- 动态分区插入数据,产生大量的小文件,从而导致map数量剧增。
- reduce数量越多,小文件也越多(reduce的个数和输出文件是对应的)。
- 数据源本身就包含大量的小文件。
4.2 小文件的危害
- HDFS不适合存储大量的小文件,因为NameNode将文件系统的元数据存放在内存中,所以能够存储的文件数目受限于NameNode内存的大小。HDFS中每个文件、目录、数据块占用150Bytes。如果存放的文件数目过多的话会占用很大的内存甚至撑爆内存。
- HDFS适用于高吞吐的批量数据处理,如果同时存入大量的小文件会花费很长的处理时间。如果访问小文件,则必须从一个DataNode跳转到另外一个DataNode,这样会大大降低数据读取速度。
- 对 hive 来说,在进行查询时,每个小文件都会当成一个块,启动一个Map任务来完成,而一个Map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的Map数量是受限的。
4.3 如何解决小文件过多问题
4.3.1 concatenate命令,自动合并小文件
#对于非分区表
Alter Table A Concatenate;
#对于分区表
Alter Table B Partition(dt='20210811') Concatenate;
注意:
- Concatenate命令只支持RCFILE和ORC文件类型的Hive表。
- 使用Concatenate命令合并小文件时不能指定合并后的文件数量,但可以多次执行该命令。
- 当多次使用Concatenate后文件数量不在变化,这个跟参数 mapreduce.input.fileinputformat.split.minsize=256mb 的设置有关,可设定每个文件的最小size。
4.3.2 调整Map、Reduce阶段的参数
-- 此方法是在mapper中将多个文件合成一个split作为输入
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-- 每个Map最大输入大小(这个值决定了合并后文件的数量)
set mapred.max.split.size=256000000;
-- 一个节点上split的至少的大小
set mapred.min.split.size.per.node=100000000;
-- 一个交换机下split的至少的大小
set mapred.min.split.size.per.rack=100000000;
-- 设置map端输出进行合并,默认为true
set hive.merge.mapfiles = true;
-- 设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true;
-- 设置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000;
-- 当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge
set hive.merge.smallfiles.avgsize=16000000;
-- hive的查询结果输出是否进行压缩
set hive.exec.compress.output=true;
-- MapReduce Job的结果输出是否使用压缩
set mapreduce.output.fileoutputformat.compress=true;
4.3.3 减少Reduce的数量
有时候为了加快程序运行,可能会盲目的设置reduce的数量。通常不建议直接设置reduce的数量,可以设置每个reduce的大小,Hive会根据数据总大小猜测确定一个reduce个数。
-- 设置每个reduce的大小,Hive会根据数据总大小猜测确定一个reduce个数
set hive.exec.reducers.bytes.per.reducer=5120000000; -- 默认是1G,设置为5G
-- 执行以下语句,将数据均衡的分配到reduce中
set mapreduce.job.reduces=10;
Insert OverWrite Table A Partition(dt)
Select * From B
Distribute By Rand();
-- 解释:如设置reduce数量为10,则使用 rand(), 随机生成一个数 x % 10 ,这样数据就会随机进入 reduce 中,防止出现有的文件过大或过小
4.3.4 使用hadoop的archive将小文件归档
Hadoop Archive简称HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问
-- 用来控制归档是否可用
set hive.archive.enabled= true;
-- 通知Hive在创建归档时是否可以设置父目录
set hive.archive.har.parentdir.settable= true;
-- 控制需要归档文件的大小
set har.partfile.size=1099511627776;
-- 使用以下命令进行归档
ALTER TABLE srctest ARCHIVE PARTITION(dt='20210811', hr= '12' );
-- 对已归档的分区恢复为原文件
ALTER TABLE srctest UNARCHIVE PARTITION(dt='20210811', hr= '12' );
如果使用的不是分区表,则可创建成外部表,并使用har://协议来指定路径。
五 数据倾斜
如果Hive使用的是MapReduce引擎,产生倾斜的原因就是大量相同的key都被分配到同一个Reduce中了。查看任务进度时,发现任务长时间停留在99%,只有少量的任务没有完成。产生数据倾斜主要由下面的原因导致:
- Key分布不均匀,有热点Key
- Join操作,关联的key比较集中,导致某些个Reduce的值偏高。
- 空值或者无意义的值:如果这些值很多,在Join的时候会拖累进度。
- distinct操作:导致最终只有一个reduce任务。
解决方案
- 可以使用group by代替distinct,如果数据量非常大时,count(distinct)会非常耗时,原因和order by一样,最终只有一个reduce在做操作。
- 可以先在map端聚合,group by时,combiner在map端做部分聚合,减少Shuffle数据量。同时,在map端设置好聚合的行数阈值。
set hive.map.aggr=true;
set hive.groupby.mapaggr.checkinterval=100000;
- 如果能用mapjoin,尽量使用mapjoin替换reduce端的join。
- 单独处理倾斜key:一般来讲,倾斜的key很好,我们可以通过抽样获得产生倾斜的key,然后进行单独处理。
- 尽量使用sort by配合distribute by一起使用替代order by。
六 推测执行
通过加快获取单个task的结果以及进行侦测将执行慢的TaskTracker加入到黑名单的方式来提高整体的任务执行效率。
6.1 修改mapred-site.xml文件
<property>
<name>mapred.map.tasks.speculative.execution </name>
<value>true</value>
</property>
<property>
<name>mapred.reduce.tasks.speculative.execution </name>
<value>true</value>
</property>
6.2 修改hive配置
set hive.mapred.reduce.tasks.speculative.execution=true;
七 设置合理的MapTask数量
7.1 Map 数过大
- Map 阶段输出文件太小,产生大量小文件
- 初始化和创建 Map 的开销很大
7.2 Map 数太小
- 文件处理或查询并发度小,Job 执行时间过长
- 大量作业时,容易堵塞集群
在 MapReduce 的编程案例中,我们得知,一个MR Job的 MapTask 数量是由输入分片 InputSplit 决定的。而输入分片是由 FileInputFormat.getSplit()决定的。一个输入分片对应一个 MapTask, 而输入分片是由三个参数决定的:
输入分片大小的计算是这么计算出来的:
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
默认情况下,输入分片大小和 HDFS 集群默认数据块大小一致,也就是默认一个数据块,启用一个 MapTask 进行处理,这样做的好处是避免了服务器节点之间的数据传输,提高 job 处理效率。
两种经典的控制 MapTask 的个数方案:减少 MapTask 数或者增加 MapTask 数
- 减少 MapTask 数是通过合并小文件来实现,这一点主要是针对数据源
- 增加 MapTask 数可以通过控制上一个 job 的 reduceTask 个数
因为 Hive 语句最终要转换为一系列的 MapReduce Job 的,而每一个 MapReduce Job 是由一 系列的 MapTask 和 ReduceTask 组成的,默认情况下, MapReduce 中一个 MapTask 或者一个 ReduceTask 就会启动一个 JVM 进程,一个 Task 执行完毕后, JVM 进程就退出。这样如果任 务花费时间很短,又要多次启动 JVM 的情况下,JVM 的启动时间会变成一个比较大的消耗, 这个时候,就可以通过重用 JVM 来解决:
set mapred.job.reuse.jvm.num.tasks=5
八 设置合理的分区数
分区表实际上就是对应一个HDFS文件系统上的独立的文件夹,该文件夹下是该分区所有的数据文件。Hive根据某列或者某些列的值(这些列在表中并不真实存在)将数据分区,放在表文件夹下不同子文件夹中存储。Hive中的分区就是分目录,把一个大的数据集根据业务需要分割成小的数据集。
8.1 静态分区和动态分区:
- 静态分区:在建表中指定分区条件,数据导入或者插入时需要指定分区。
- 动态分区:按照某个或某些字段的值不同自动地进行分区,底层实际是利用MapReduce的mutipleOutputs(根据条件判断,将结果写入不同目录不同文件)。
- 静态分区必须在动态分区前。
8.2 分区的注意事项
Hive分区过多,导致每个分区的文件小,会导致HDFS小文件过多的问题。
- 小文件数量过多造成NameNode负担过大。
- Hive运行Mapreduce时,每个block对应一个切片,而小文件则会直接对应一个map任务,使得map任务过多使得运行效率低下(Yarn频繁申请销毁容器)。
九 调整mapper和reducer的个数
hive通过将查询划分为一个或者多个MR任务达到并行化的目的,每个任务都可能具有多个mapper和reducer任务,其中一些是可以并行执行的,确定最佳的mapper个数和reducer个数取决于多个变量,例如输入的数据量的大小以及对这些数据操作的类型等。
设置太多的mapper与reducer个数,就会导致启动阶段,调度与运行job的过程中产生过多的开销;如果设置的数量太少,那么就可能没有充分利用好集群的并行性。
hive会根据输入的数据量来分配reducer的个数,我们可以通过参数hive.exec.reducers.bytes.per.reducer来设置每个reducer的数据量大小,默认是1G,将该值调大,可以减少reducer的数量,调小,可以增加 reducer的数量。
<property>
<name>hive.exec.reducers.bytes.per.reducer</name>
<value>1000000000</value>
<description>size per reducer.The default is 1G, i.e if the input size is 10G, it will use 10 reducers.</description>
</property>
有些查询map阶段之后产生的中间数据量要大于输入的数据量,有时候会小于输入的数据量,因此合理的设置reducer的数量也是一个经验挑战,可以通过参数mapred.reduce.tasks,默认是-1,hive会自动计算其个数:
<property>
<name>mapred.reduce.tasks</name>
<value>-1</value>
<description>The default number of reduce tasks per job. Typically set
to a prime close to the number of available hosts. Ignored when
mapred.job.tracker is "local". Hadoop set this to 1 by default, whereas Hive uses -1 as its default value.
By setting this property to -1, Hive will automatically figure out what should be the number of reducers.
</description>
</property>
但是实际生产中,集群中数据量会很大,为了控制资源的利用情况,防止一个job过大,消耗完集群资源,使得其他job无法运行,这时可以通过参数hive.exec.reducers.max来设置最大值:
<property>
<name>hive.exec.reducers.max</name>
<value>999</value>
<description>max number of reducers will be used. If the one
specified in the configuration parameter mapred.reduce.tasks is
negative, Hive will use this one as the max number of reducers when
automatically determine number of reducers.</description>
</property>
对于这个属性的值,一般情况下可以根据一个公式来计算,1.5倍数可以防止未充分利用集群的资源:
(集群总共Reduce槽位的个数*1.5 /(执行中查询的平均个数))
十 Hive的其他优化方法
10.1 并行执行
Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。不过,如果有更多的阶段可以并行执行,那么job可能就越快完成。
10.2 严格模式
对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。
对于使用了order by语句的查询,要求必须使用limit语句。因为order by为了执行排序过程会将所有的结果数据分发到同一个Reducer中进行处理,强制要求用户增加这个LIMIT语句可以防止Reducer额外执行很长一段时间。
限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行JOIN查询的时候不使用ON语句而是使用WHERE语句,这样关系数据库的执行优化器就可以高效地将WHERE语句转化成那个ON语句。不幸的是,Hive并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。
10.3 JVM重用
JVM重用是Hadoop调优参数的内容,其对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或task特别多的场景,这类场景大多数任务执行时间都很短。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/6206.html