大家好,欢迎来到IT知识分享网。
**一、数据倾斜定义 **
MAPREDUCE程序执行时,REDUCE节点大部分执行完成,其中一个或者多个REDUCE节点运行未完成,导致整个程序处理时间很长。这是因为一个或者多个KEY的条数比其他KEY多(百倍或者千倍),这些KEY所在REDUCE节点所处理的数据量比其他节点多很多,从而导致某一个或者几个节点长时间运行。HIVE的执行是分阶段的,MAP处理数据量的差异取决于上一个STAGE的REDUCE输出,所以如何将数据均匀的分配到各个REDUCE中,就能根本解决数据倾斜的问题。
**二、常见数据倾斜原因 **
1、由GROUP BY 造成的数据倾斜解决方法
(1)修改hive.map.aggr=true
set hive.map.aggr=true
做map aggregation,也就是在mapper里面做聚合。这个方法不同于直接写mapreduce的时候可以实现的combiner,但是却实现了类似combiner的效果。事实上各种基于mr的框架如pig,cascading等等用的都是map aggregation(或者叫partial aggregation)而非combiner的策略,也就是在mapper里面直接做聚合操作而不是输出到buffer给combiner做聚合。对于map aggregation,hive还会做检查,如果aggregation的效果不好,那么hive会自动放弃map aggregation。判断效果的依据就是经过一小批数据的处理之后,检查聚合后的数据量是否减小到一定的比例,默认是0.5,由hive.map.aggr.hash.min.reduction这个参数控制。所以如果确认数据里面确实有个别取值倾斜,但是大部分值是比较稀疏的,这个时候可以把比例强制设为1,避免极端情况下map aggr失效。
(2)修改hive.groupby.skewindata=true
set hive.groupby.skewindata=true
注意:只能对单个字段聚合。控制生成两个MR Job,第一个MR Job Map的输出结果随机分配到reduce中减少某些key值条数过多某些key条数过小造成的数据倾斜问题。在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个reduce 做部分聚合操作,并输出结果。这样处理的结果是,相同的 Group By Key 有可能分发到不同的reduce中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。
(3)count distinct 改写
/*修改前*/select a, count(distinct b) as c from liuxiaobin_tb group by a; /*修改后*/select a, count(*) as c from (select a, b from liuxiaobin_tb group by a, b) group by a;
2、JOIN造成的数据倾斜
(1)skew join
join造成的倾斜,常见情况是不能做map join的两个表(能做map join的话基本上可以避免倾斜),其中一个是行为表,另一个应该是属性表。比如我们有三个表,一个用户属性表users,一个商品属性表items,还有一个用户对商品的操作行为表日志表logs。假设现在需要将行为表关联用户表:
select * from logs a join users b on a.user_id = b.user_id;
其中logs表里面会有一个特殊用户user_id = 0,代表未登录用户,假如这种用户占了相当的比例,那么个别reduce会收到比其他reduce多得多的数据,因为它要接收所有user_id = 0的记录进行处理,使得其处理效果会非常差,其他reduce都跑完很久了它还在运行。
hive给出的解决方案叫skew join,其原理把这种user_id = 0的特殊值先不在reduce端计算掉,而是先写入hdfs,然后启动一轮map join专门做这个特殊值的计算,期望能提高计算这部分值的处理速度。
set hive.optimize.skewjoin = true;
(2)特殊值分开处理
针对join倾斜的问题,一般都是通过改写sql解决。对于上面这个问题,我们已经知道user_id = 0是一个特殊key,那么可以把特殊值隔离开来单独做join,这样特殊值肯定会转化成map join,非特殊值就是没有倾斜的普通join了:
* from ( select * from logs where user_id = 0 ) a join ( select * from users where user_id = 0 ) b on a.user_id = b.user_id union all select * from logs a join users b on a.user_id <> 0 and a.user_id = b.user_id;
(3)随机数分配方法
select a.*, b.*from ( select *, cast(rand() * 10 as int) as r_id from logs ) ajoin ( select *, r_id from items lateral view explode(range_list(1, 10)) rl as r_id ) bon a.item_id = b.item_id and a.r_id = b.r_id
3、业务数据本身产生的数据倾斜
(1)空值产生的数据倾斜
select *from log ajoin users bon a.user_id is not null and a.user_id = b.user_id union all select * from log a where a.user_id is null;
(2)不同数据类型关联产生的数据倾斜
select *from users aleft outer join logs bon a.usr_id = cast(b.user_id as string)
(3)map join 解决小表产生的数据倾斜
select * from log a left outer join users b on a.user_id = b.user_id;
select /*+mapjoin(x)*/ *from log aleft outer join ( select /*+mapjoin(c)*/ d.* from ( select distinct user_id from log ) c join users d on c.user_id = d.user_id ) x on a.user_id = b.user_id;
三、总结
使map的输出数据更均匀的分布到reduce中去,是我们的最终目标。由于Hash算法的局限性,按key Hash会或多或少的造成数据倾斜。大量经验表明数据倾斜的原因是人为的建表疏忽或业务逻辑可以规避的。
1、对于join,在判断小表不大于1G的情况下,使用map join
2、对于group by或distinct,设定 hive.groupby.skewindata=true
3、尽量使用上述的SQL语句调节进行优化
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/54436.html