数仓-HIVE实战优化技巧[通俗易懂]

数仓-HIVE实战优化技巧[通俗易懂]1 rank、dense_rank、row_numberRow_number返回行的序列号,不重复不并列。

大家好,欢迎来到IT知识分享网。

1 rank、dense_rank、row_number

  • Row_number返回行的序列号,不重复不并列。组内相同数据的序列号会出现乱序
  • Rank组内排序,有并列、并列后序号不连续
  • Dense_rank组内排序,有并列、并列后序号连续

2 Hive order by、sort by、distribute by、cluster by

  • order by

order by 会对数据进行全局排序,和oracle和mysql等数据库中的order by 效果一样,它只在一个reduce中进行,所以数据量特别大的时候效率非常低。而且当设置 :set hive. mapred. mode =strict的时候不指定limit,执行select会报错,如下:LIMIT must also be specified.

  • sort by

sort by 是单独在各自的reduce中进行排序,所以并不能保证全局有序,一般和distribute by 一起执行,而且distribute by 要写在sort by前面。如果mapred.reduce.tasks=1和order by效果一样,如果大于1会分成几个文件输出每个文件会按照指定的字段排序,而不保证全局有序。sort by 不受 hive.mapred.mode 是否为strict ,nostrict 的影响

  • distribute by

用distribute by 会对指定的字段按照hashCode值对reduce的个数取模,然后将任务分配到对应的reduce中去执行,就是在mapreduce程序中的patition分区过程,默认根据指定key.hashCode()&Integer.MAX_VALUE%numReduce 确定处理该任务的reduce。

  • cluster By

distribute by 和 sort by 合用就相当于cluster by,但是cluster by 不能指定排序为asc或 desc 的规则,只能是desc倒序排列。

3 开窗函数

  • 窗口函数指定了函数工作的数据窗口大小(当前行的上下多少行),这个数据窗口大小可能会随着行的变化而变化。
  • 窗口函数和聚合函数区别?

窗口函数对于每个组返回多行,组内每一行对应返回一行值。聚合函数对于每个组只返回一行。

  • over()开窗函数,其括号内部主要有两种形,固定搭配,不能更改:

over(distribute by…sort by…) 和 over(partition by…order by…)

  • 分析函数

注意(重要,决定了窗口函数的使用场景问题):

1)分析函数的使用一定要注意窗口的范围,因为窗口的范围限定了分析函数的作用范围

2) 分析函数会对窗口中的每一行数据输出一个结果,即会对查询的结果多出一列,这一列可以是聚合结果,也可以是排序结果

sum() avg() max() min()

first_value(col) over() : 某分区排序后的第一个col值

last_value(col) over() : 某分区排序后的最后一个col值

lag(col,n,DEFAULT) : 统计往前n行的col值,n可选,默认为1,DEFAULT当往上第n行为NULL时候,取默认值,如不指定,则为NULL

lead(col,n,DEFAULT) : 统计往后n行的col值,n可选,默认为1,DEFAULT当往下第n行为NULL时候,取默认值,如不指定,则为NULL

ntile(n) : 用于将分组数据按照顺序切分成n片,返回当前切片值。注意:n必须为int类型。

  • 排名函数

注意:排名函数不支持window子句,即不支持自定义窗口大小

Row_number、rank、dense_rank

  • 窗口大小的设置(也叫window子句)

默认窗口大小是从起始行到当前行

partition by …order by…rows between unbounded preceding and current row

窗口大小为从起始行得到当前行。

partition by …order by… rows between 3 preceding and current row

窗口大小为从当前行到之前三行

partition by …order by… rows between 3 preceding and 1 following

窗口大小为当前行的前三行到之后的一行

partition by …order by… rows between 3 preceding and unbounded following

窗口大小为当前行的前三行到之后的所有行

4 Hive map、reduce数怎么设置

  • 影响map个数,即split个数的因素主要有:

1)HDFS块的大小,即HDFS中dfs.block.size的值。如果有一个输入文件为1024m,当块为

256m时,会被划分为4个split;当块为128m时,会被划分为8个split。

2)文件的大小。当块为128m时,如果输入文件为128m,会被划分为1个split;当块为256m,会被划分为2个split。

3)文件的个数。FileInputFormat按照文件分割split,并且只会分割大文件,即那些大小超过HDFS块的大小的文件。如果HDFS中dfs.block.size设置为64m,而输入的目录中文件有100个,则划分后的split个数至少为100个。

4)splitsize的大小。分片是按照splitszie的大小进行分割的,一个split的大小在没有设置的情况下,默认等于hdfs block的大小。但应用程序可以通过两个参数来对splitsize进行调节。

  • hive小文件很多,造成map个数很多,需要减少map个数

set mapred.max.split.size=100000000;

set mapred.min.split.size.per.node=100000000;

set mapred.min.split.size.per.rack=100000000;

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

前面三个参数确定合并文件块的大小,大于文件块大小128m的,按照128m来分隔,小于128m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的)

  • 文件小于128M,但是记录多,默认用一个map去算,增加map个数

块大小的计算方式

根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M公式,

调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。

set mapreduce.input.fileinputformat.split.maxsize=50000000(50M);

  • 设置reduce个数

1.设置每个reduce处理数据量,默认1G

set hive.exec.reducers.bytes.per.reducer=500000000; (500M)

2.直接调整reduce个数

set mapred.reduce.tasks = 15;

3.只有一个reduce;

a) 没有group by的汇总,比如把select pt,count(1) from tablea group by pt;

b) 用了Order by

c) 有笛卡尔积

5 hive数据倾斜

1)什么是数据倾斜?

由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点

2)主要表现:任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。单一 reduce 处理的记录数和平均记录数相差太大,通常达到好几倍之多,最长时间远大于平均时长。

3)容易数据倾斜情况

数仓-HIVE实战优化技巧[通俗易懂]

hive数据倾斜

4)产生数据倾斜的原因:

A:key 分布不均匀

B:业务数据本身的特性

C:建表考虑不周全

D:某些 HQL 语句本身就存在数据倾斜

5)针对goupby出现数据倾斜

案例场景:某一特殊key值大量出现,语句中仅出现groupby,没有相应的聚合函数一起(聚合函数可以在map阶段提前进行聚合,可以降低数据倾斜风险),会造成对应key的reduce出现数据倾斜

解决策略是对key值进行加盐处理:

核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行sum,count等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜

方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。

6)针对join出现的数据倾斜

方案一:抽样求出引起数据倾斜的key值,进行过滤处理

情景:某张表中数据分布不均,个别key值出现次数占比很大,引起join数据倾斜,例如数据空值或者爬虫IP

处理思路:首先对数据进行抽样,选出key占比较大列表,采取过滤处理,去掉无效值或者加盐等处理,然后先进行局部处理,在整体处理

优点:可以快速解决数据倾斜问题

缺点:应用场景受限,适用于几个key值偏多的情况

方案二:优先使用mapjoin

由于map阶段不会发生数据倾斜,使用mapjoin可以防止数据倾斜,join操作中的表的数据量比较小(比如几百M或者一两G),比较适用此方案。

在 hive 中,直接提供了能够在 HQL 语句指定该次查询使用 map join,map join 的用法是在查询/子查询的SELECT关键字后面添加/*+ MAPJOIN(tablelist) */提示优化器转化为mapjoin(早期的 Hive 版本的优化器是不能自动优化 map join 的)。其中 tablelist 可以是一个表,或以逗号连接的表的列表。tablelist 中的表将会读入内存,通常应该是将小表写在这里。

MapJoin 具体用法:

select /* +mapjoin(a) */ a.id aid, name, age from a join b on a.id = b.id;select /* +mapjoin(movies) */ a.title, b.rating from movies a join ratings b on a.movieid =b.movieid;

在 hive0.11 版本以后会自动开启 map join 优化,

由两个参数控制:set hive.auto.convert.join=true;//设置 MapJoin 优化自动开启set hive.mapjoin.smalltable.filesize=25000000//设置小表不超过多大时开启 mapjoin 优化

方案优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。

方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行内存加载,此时会比较消耗内存资源,每个节点驻留一份小表的全量数据。如果小表数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。

方案三:处理大表文件,使其变为可加载到内存的小表

场景:select * from log a left outer join users b on a.user_id = b.user_id;

users 表有 600w+的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。

解决思路:

select /*+mapjoin(x)*/* from log aleft outer join

( select /*+mapjoin(c)*/ d.*

from ( select distinct user_id from log ) c join users d

on c.user_id = d.user_id) x

on a.user_id = x.user_id;

优点:可以有效解决大表join大表的数据倾斜

缺点:应用受限,log 里 user_id 有上百万个,这就又回到原来 map join 问题。所幸,每日的会员 uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题

方案四:采样倾斜key并分拆join操作

方案适用场景:两个Hive表进行join的时候,如果数据量都比较大,那么此时可以看一下两个Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一个Hive表中的少数几个key的数据量过大,而另一个Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。

方案实现思路:

对包含少数几个数据量过大的key的那个表,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。

然后将这几个key对应的数据从原来的表中拆分出来,形成一个单独的表,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个表

接着将需要join的另一个表,也过滤出来那几个倾斜key对应的数据并形成一个单独的表,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个表。

再将附加了随机前缀的独立表与另一个膨胀n倍的独立表进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。

而另外两个普通的表就照常join即可。

最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。

方案优点:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。

方案缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。

解决方案五:使用随机前缀和扩容RDD进行join

方案适用场景:如果在进行join操作时,表中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

方案实现思路:

该方案的实现思路基本和“解决方案四”类似,首先查看Hive表中的数据分布情况,找到那个造成数据倾斜的Hive表,比如有多个key都对应了超过1万条数据。

然后将该表的每条数据都打上一个n以内的随机前缀。

同时对另外一个正常的表进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。

最后将两个处理后的表进行join即可。

方案优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。

方案缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个表进行扩容,对内存资源要求很高。

方案六:增加并行度

场景:两个大表,数据分布均匀,为了提高效率,使用mapjoin,采用切分大表的方法

采用将大表切分为小表,然后进行连接

原始测试表

+———-+————+

| test.id | test.name |

+———-+————+

| 1 | aa |

| 2 | bb |

| 3 | cc |

| 4 | dd |

+———-+————+

将其切分为两个:

select * from test tablesample(bucket 1 out of 2 on id);

解释:tablesample(bucket 3 out of 4 on id),其中tablesample为关键字,bucket 关键字,3为要去的分表,4为拆分表的数目,id拆分依据

6 Hive中的文件格式

1)TEXTFILE

· 文本格式,Hive的默认格式,数据不压缩,磁盘开销大、数据解析开销大。

· 对应的hive API为:org.apache.hadoop.mapred.TextInputFormat和org.apache.hive.ql.io.HiveIgnoreKeyTextOutputFormat;

· 可结合Gzip、Bzip2使用(系统自动检查,执行查询时自动解压),但是使用这种方式,hive不会对数据进行切分,从而无法对数据进行并行操作

2)SequenceFile

· Hadoop提供的二进制文件,Hadoop支持的标准文件;

· 数据直接序列化到文件中,SequenceFile文件不能直接查看,可以通过Hadoop fs -text查看;

· SequenceFile具有使用方便、可分割、可压缩、可进行切片,压缩支持NONE、RECORD、BLOCK(优先);

· 对应hive API:org.apache.hadoop.mapred.SequenceFileInputFormat和org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

3)RCFILE

· rcfile是一种行列存储相结合的存储方式,先将数据按行进行分块再按列式存储,保证同一条记录在一个块上,避免读取多个块,有利于数据压缩和快速进行列存储;

· 对应 hive API为:org.apache.hadoop.hive.ql.io.RCFileInputFormat和org.apache.hadoop.hive.ql.io.RCFileOutputFormat

4)orcfile

· orcfile式对rcfile的优化,可以提高hive的读写、数据处理性能、提供更高的压缩效率;

· 优点:

· 每个task只输出单个文件,减少namenode负载;

· 支持各种复杂的数据类型,比如:datetime,decima以及复杂类型struct、list、map;

· 文件中存储了一些轻量级的索引数据;

· 基于数据类型的块模式压缩:integer类型的列用行程长度编码,string类型的列使用字典编码;

· 用多个相互独立的recordReaders并行读相同的文件

· 无需扫描markers即可分割文件

· 绑定读写所需内存

· metadata存储用protocol buffers,支持添加和删除列

5)parquet

· Parquet也是一种列式存储,同时具有很好的压缩性能;同时可以减少大量的表扫描和反序列化的时间。

总结

· textfile 存储空间消耗比较大,并且压缩的text 无法分割和合并 查询的效率最低,可以直接存储,加载数据的速度最高;

· sequencefile 存储空间消耗最大,压缩的文件可以分割和合并 查询效率高,需要通过text文件转化来加载;

· orcfile, rcfile存储空间最小,查询的效率最高 ,需要通过text文件转化来加载,加载的速度最低;

· parquet格式是列式存储,有很好的压缩性能和表扫描功能;

SEQUENCEFILE、RCFILE、ORCFILE、PARQUET格式的表不能直接从本地文件导入数据,数据要先导入到textfile格式的表中, 然后再从表中用insert导入SequenceFile、RCFil、ORCFile、PARQUET表中;或者用复制表结构及数据的方式(create table as select * from table )。

7 hive文件压缩

数仓-HIVE实战优化技巧[通俗易懂]

hadoop文件压缩格式

提示:如果面试过程问起,我们一般回答压缩方式为Snappy,特点速度快,缺点无法切分(可以回答在链式MR中,Reduce端输出使用bzip2压缩,以便后续的map任务对数据进行split)

8 Hive是如何将hql语法转换成MR任务

数仓-HIVE实战优化技巧[通俗易懂]

hive处理流程

总的来说,Hive是通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的Driver,结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回

Hive将SQL转化为MapReduce任务整个编译阶段分为六个阶段:

Antlr定义SQL的语法规则,完成SQL词法,语法解析,将SQL转化为抽象语法树AST Tree

遍历AST Tree,抽象出查询的基本组成单元QueryBlock

遍历QueryBlock,翻译为执行操作树OperatorTree

逻辑层优化器进行OperatorTree变换,合并不必要的ReduceSinkOperator,减少shuffle数据量

遍历OperatorTree,翻译为MapReduce任务

物理层优化器进行MapReduce任务的变换,生成最终的执行计划

9 hive分桶

Hive可以将表或者表的分区进一步组织成桶,以达到:

1、数据取样效率更高

2、数据处理效率更高

桶通过对指定列进行哈希来实现,将一个列名下的数据切分为“一组桶”,每个桶都对应了一个该列名下的一个存储文件。

1)直接分桶

开始操作之前,需要将hive.enforce.bucketing属性设置为true,以标识Hive可以识别桶。

create table music(

id int,

name string,

size float)

row format delimited fields terminated by “\t”

clustered by (id) into 4 buckets;

该代码的意思是将music表按照id将数据分成了4个桶,插入数据时,会对应4个 reduce操作,输出4个文件。

2)在分区中分桶

当数据量过大,需要庞大发分区数量时,可以考虑桶,因为分区数量太大的情况可能会导致文件系统挂掉,而且桶比分区有更高的查询效率。数据最终落在哪一个桶里,取决于clustered by的那个列的值的hash数与桶的个数求余来决定。虽然有一定离散行,但不能保证每个桶中的数据量是一样的。

create table music2(

id int,

name string,

size float)

partitioned by (date string)

clustered by (id) sorted by(size) into 4 bucket

row format delimited

fields terminated by “\t”;

load data local inpath ‘demo/music.txt’ into table music2 partition(date=’2017-08-30′);

10 Hive的udf、udaf和udtf

自定义UDF:继承UDF,重写evaluate方法

自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close

1)行转列

创建表及数据导入:

create table person_info(

name string,

constellation string,

blood_type string)

row format delimited fields terminated by “\t”;

load data local inpath “person_info.txt” into table person_info;

例如:把星座和血型一样的人归类到一起

select t1.base, concat_ws(‘|’, collect_set(t1.name)) name

from

(select name, concat(constellation, “,”, blood_type) base from person_info) t1

group by t1.base;

2)列转行(UDAF与UDTF)

创建表及导入数据:

create table movie_info(

movie string,

category array<string>)

row format delimited fields terminated by “\t”

collection items terminated by “,”;

load data local inpath “movie.txt” into table movie_info;

例如:将电影分类中的数组数据展开

select movie, category_name

from movie_info lateral view explode(category) table_tmp as category_name;

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/6214.html

(0)
上一篇 2022-12-17 13:40
下一篇 2022-12-17 14:00

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信