大家好,欢迎来到IT知识分享网。
1 rank、dense_rank、row_number
- Row_number返回行的序列号,不重复不并列。组内相同数据的序列号会出现乱序
- Rank组内排序,有并列、并列后序号不连续
- Dense_rank组内排序,有并列、并列后序号连续
2 Hive order by、sort by、distribute by、cluster by
- order by
order by 会对数据进行全局排序,和oracle和mysql等数据库中的order by 效果一样,它只在一个reduce中进行,所以数据量特别大的时候效率非常低。而且当设置 :set hive. mapred. mode =strict的时候不指定limit,执行select会报错,如下:LIMIT must also be specified.
- sort by
sort by 是单独在各自的reduce中进行排序,所以并不能保证全局有序,一般和distribute by 一起执行,而且distribute by 要写在sort by前面。如果mapred.reduce.tasks=1和order by效果一样,如果大于1会分成几个文件输出每个文件会按照指定的字段排序,而不保证全局有序。sort by 不受 hive.mapred.mode 是否为strict ,nostrict 的影响
- distribute by
用distribute by 会对指定的字段按照hashCode值对reduce的个数取模,然后将任务分配到对应的reduce中去执行,就是在mapreduce程序中的patition分区过程,默认根据指定key.hashCode()&Integer.MAX_VALUE%numReduce 确定处理该任务的reduce。
- cluster By
distribute by 和 sort by 合用就相当于cluster by,但是cluster by 不能指定排序为asc或 desc 的规则,只能是desc倒序排列。
3 开窗函数
- 窗口函数指定了函数工作的数据窗口大小(当前行的上下多少行),这个数据窗口大小可能会随着行的变化而变化。
- 窗口函数和聚合函数区别?
窗口函数对于每个组返回多行,组内每一行对应返回一行值。聚合函数对于每个组只返回一行。
- over()开窗函数,其括号内部主要有两种形式,固定搭配,不能更改:
over(distribute by…sort by…) 和 over(partition by…order by…)
- 分析函数
注意(重要,决定了窗口函数的使用场景问题):
1)分析函数的使用一定要注意窗口的范围,因为窗口的范围限定了分析函数的作用范围
2) 分析函数会对窗口中的每一行数据输出一个结果,即会对查询的结果多出一列,这一列可以是聚合结果,也可以是排序结果
sum() avg() max() min()
first_value(col) over() : 某分区排序后的第一个col值
last_value(col) over() : 某分区排序后的最后一个col值
lag(col,n,DEFAULT) : 统计往前n行的col值,n可选,默认为1,DEFAULT当往上第n行为NULL时候,取默认值,如不指定,则为NULL
lead(col,n,DEFAULT) : 统计往后n行的col值,n可选,默认为1,DEFAULT当往下第n行为NULL时候,取默认值,如不指定,则为NULL
ntile(n) : 用于将分组数据按照顺序切分成n片,返回当前切片值。注意:n必须为int类型。
- 排名函数
注意:排名函数不支持window子句,即不支持自定义窗口大小
Row_number、rank、dense_rank
- 窗口大小的设置(也叫window子句)
默认窗口大小是从起始行到当前行
partition by …order by…rows between unbounded preceding and current row
窗口大小为从起始行得到当前行。
partition by …order by… rows between 3 preceding and current row
窗口大小为从当前行到之前三行
partition by …order by… rows between 3 preceding and 1 following
窗口大小为当前行的前三行到之后的一行
partition by …order by… rows between 3 preceding and unbounded following
窗口大小为当前行的前三行到之后的所有行
4 Hive map、reduce数怎么设置
- 影响map个数,即split个数的因素主要有:
1)HDFS块的大小,即HDFS中dfs.block.size的值。如果有一个输入文件为1024m,当块为
256m时,会被划分为4个split;当块为128m时,会被划分为8个split。
2)文件的大小。当块为128m时,如果输入文件为128m,会被划分为1个split;当块为256m,会被划分为2个split。
3)文件的个数。FileInputFormat按照文件分割split,并且只会分割大文件,即那些大小超过HDFS块的大小的文件。如果HDFS中dfs.block.size设置为64m,而输入的目录中文件有100个,则划分后的split个数至少为100个。
4)splitsize的大小。分片是按照splitszie的大小进行分割的,一个split的大小在没有设置的情况下,默认等于hdfs block的大小。但应用程序可以通过两个参数来对splitsize进行调节。
- hive小文件很多,造成map个数很多,需要减少map个数
set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
前面三个参数确定合并文件块的大小,大于文件块大小128m的,按照128m来分隔,小于128m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的)
- 文件小于128M,但是记录多,默认用一个map去算,增加map个数
块大小的计算方式
根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M公式,
调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。
set mapreduce.input.fileinputformat.split.maxsize=50000000(50M);
- 设置reduce个数
1.设置每个reduce处理数据量,默认1G
set hive.exec.reducers.bytes.per.reducer=500000000; (500M)
2.直接调整reduce个数
set mapred.reduce.tasks = 15;
3.只有一个reduce;
a) 没有group by的汇总,比如把select pt,count(1) from tablea group by pt;
b) 用了Order by
c) 有笛卡尔积
5 hive数据倾斜
1)什么是数据倾斜?
由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点
2)主要表现:任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。单一 reduce 处理的记录数和平均记录数相差太大,通常达到好几倍之多,最长时间远大于平均时长。
3)容易数据倾斜情况
4)产生数据倾斜的原因:
A:key 分布不均匀
B:业务数据本身的特性
C:建表考虑不周全
D:某些 HQL 语句本身就存在数据倾斜
5)针对goupby出现数据倾斜
案例场景:某一特殊key值大量出现,语句中仅出现groupby,没有相应的聚合函数一起(聚合函数可以在map阶段提前进行聚合,可以降低数据倾斜风险),会造成对应key的reduce出现数据倾斜
解决策略是对key值进行加盐处理:
核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行sum,count等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。
方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜
方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。
6)针对join出现的数据倾斜
方案一:抽样求出引起数据倾斜的key值,进行过滤处理
情景:某张表中数据分布不均,个别key值出现次数占比很大,引起join数据倾斜,例如数据空值或者爬虫IP
处理思路:首先对数据进行抽样,选出key占比较大列表,采取过滤处理,去掉无效值或者加盐等处理,然后先进行局部处理,在整体处理
优点:可以快速解决数据倾斜问题
缺点:应用场景受限,适用于几个key值偏多的情况
方案二:优先使用mapjoin
由于map阶段不会发生数据倾斜,使用mapjoin可以防止数据倾斜,join操作中的表的数据量比较小(比如几百M或者一两G),比较适用此方案。
在 hive 中,直接提供了能够在 HQL 语句指定该次查询使用 map join,map join 的用法是在查询/子查询的SELECT关键字后面添加/*+ MAPJOIN(tablelist) */提示优化器转化为mapjoin(早期的 Hive 版本的优化器是不能自动优化 map join 的)。其中 tablelist 可以是一个表,或以逗号连接的表的列表。tablelist 中的表将会读入内存,通常应该是将小表写在这里。
MapJoin 具体用法:
select /* +mapjoin(a) */ a.id aid, name, age from a join b on a.id = b.id;select /* +mapjoin(movies) */ a.title, b.rating from movies a join ratings b on a.movieid =b.movieid;
在 hive0.11 版本以后会自动开启 map join 优化,
由两个参数控制:set hive.auto.convert.join=true;//设置 MapJoin 优化自动开启set hive.mapjoin.smalltable.filesize=25000000//设置小表不超过多大时开启 mapjoin 优化
方案优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。
方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行内存加载,此时会比较消耗内存资源,每个节点驻留一份小表的全量数据。如果小表数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。
方案三:处理大表文件,使其变为可加载到内存的小表
场景:select * from log a left outer join users b on a.user_id = b.user_id;
users 表有 600w+的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。
解决思路:
select /*+mapjoin(x)*/* from log aleft outer join
( select /*+mapjoin(c)*/ d.*
from ( select distinct user_id from log ) c join users d
on c.user_id = d.user_id) x
on a.user_id = x.user_id;
优点:可以有效解决大表join大表的数据倾斜
缺点:应用受限,log 里 user_id 有上百万个,这就又回到原来 map join 问题。所幸,每日的会员 uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题
方案四:采样倾斜key并分拆join操作
方案适用场景:两个Hive表进行join的时候,如果数据量都比较大,那么此时可以看一下两个Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一个Hive表中的少数几个key的数据量过大,而另一个Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。
方案实现思路:
对包含少数几个数据量过大的key的那个表,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。
然后将这几个key对应的数据从原来的表中拆分出来,形成一个单独的表,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个表
接着将需要join的另一个表,也过滤出来那几个倾斜key对应的数据并形成一个单独的表,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个表。
再将附加了随机前缀的独立表与另一个膨胀n倍的独立表进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。
而另外两个普通的表就照常join即可。
最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。
方案优点:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。
方案缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。
解决方案五:使用随机前缀和扩容RDD进行join
方案适用场景:如果在进行join操作时,表中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。
方案实现思路:
该方案的实现思路基本和“解决方案四”类似,首先查看Hive表中的数据分布情况,找到那个造成数据倾斜的Hive表,比如有多个key都对应了超过1万条数据。
然后将该表的每条数据都打上一个n以内的随机前缀。
同时对另外一个正常的表进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。
最后将两个处理后的表进行join即可。
方案优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。
方案缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个表进行扩容,对内存资源要求很高。
方案六:增加并行度
场景:两个大表,数据分布均匀,为了提高效率,使用mapjoin,采用切分大表的方法
采用将大表切分为小表,然后进行连接
原始测试表
+———-+————+
| test.id | test.name |
+———-+————+
| 1 | aa |
| 2 | bb |
| 3 | cc |
| 4 | dd |
+———-+————+
将其切分为两个:
select * from test tablesample(bucket 1 out of 2 on id);
解释:tablesample(bucket 3 out of 4 on id),其中tablesample为关键字,bucket 关键字,3为要去的分表,4为拆分表的数目,id拆分依据
6 Hive中的文件格式
1)TEXTFILE
· 文本格式,Hive的默认格式,数据不压缩,磁盘开销大、数据解析开销大。
· 对应的hive API为:org.apache.hadoop.mapred.TextInputFormat和org.apache.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
· 可结合Gzip、Bzip2使用(系统自动检查,执行查询时自动解压),但是使用这种方式,hive不会对数据进行切分,从而无法对数据进行并行操作
2)SequenceFile
· Hadoop提供的二进制文件,Hadoop支持的标准文件;
· 数据直接序列化到文件中,SequenceFile文件不能直接查看,可以通过Hadoop fs -text查看;
· SequenceFile具有使用方便、可分割、可压缩、可进行切片,压缩支持NONE、RECORD、BLOCK(优先);
· 对应hive API:org.apache.hadoop.mapred.SequenceFileInputFormat和org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
3)RCFILE
· rcfile是一种行列存储相结合的存储方式,先将数据按行进行分块再按列式存储,保证同一条记录在一个块上,避免读取多个块,有利于数据压缩和快速进行列存储;
· 对应 hive API为:org.apache.hadoop.hive.ql.io.RCFileInputFormat和org.apache.hadoop.hive.ql.io.RCFileOutputFormat
4)orcfile
· orcfile式对rcfile的优化,可以提高hive的读写、数据处理性能、提供更高的压缩效率;
· 优点:
· 每个task只输出单个文件,减少namenode负载;
· 支持各种复杂的数据类型,比如:datetime,decima以及复杂类型struct、list、map;
· 文件中存储了一些轻量级的索引数据;
· 基于数据类型的块模式压缩:integer类型的列用行程长度编码,string类型的列使用字典编码;
· 用多个相互独立的recordReaders并行读相同的文件
· 无需扫描markers即可分割文件
· 绑定读写所需内存
· metadata存储用protocol buffers,支持添加和删除列
5)parquet
· Parquet也是一种列式存储,同时具有很好的压缩性能;同时可以减少大量的表扫描和反序列化的时间。
总结
· textfile 存储空间消耗比较大,并且压缩的text 无法分割和合并 查询的效率最低,可以直接存储,加载数据的速度最高;
· sequencefile 存储空间消耗最大,压缩的文件可以分割和合并 查询效率高,需要通过text文件转化来加载;
· orcfile, rcfile存储空间最小,查询的效率最高 ,需要通过text文件转化来加载,加载的速度最低;
· parquet格式是列式存储,有很好的压缩性能和表扫描功能;
SEQUENCEFILE、RCFILE、ORCFILE、PARQUET格式的表不能直接从本地文件导入数据,数据要先导入到textfile格式的表中, 然后再从表中用insert导入SequenceFile、RCFil、ORCFile、PARQUET表中;或者用复制表结构及数据的方式(create table as select * from table )。
7 hive文件压缩
提示:如果面试过程问起,我们一般回答压缩方式为Snappy,特点速度快,缺点无法切分(可以回答在链式MR中,Reduce端输出使用bzip2压缩,以便后续的map任务对数据进行split)
8 Hive是如何将hql语法转换成MR任务
总的来说,Hive是通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的Driver,结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回
Hive将SQL转化为MapReduce任务整个编译阶段分为六个阶段:
Antlr定义SQL的语法规则,完成SQL词法,语法解析,将SQL转化为抽象语法树AST Tree
遍历AST Tree,抽象出查询的基本组成单元QueryBlock
遍历QueryBlock,翻译为执行操作树OperatorTree
逻辑层优化器进行OperatorTree变换,合并不必要的ReduceSinkOperator,减少shuffle数据量
遍历OperatorTree,翻译为MapReduce任务
物理层优化器进行MapReduce任务的变换,生成最终的执行计划
9 hive分桶
Hive可以将表或者表的分区进一步组织成桶,以达到:
1、数据取样效率更高
2、数据处理效率更高
桶通过对指定列进行哈希来实现,将一个列名下的数据切分为“一组桶”,每个桶都对应了一个该列名下的一个存储文件。
1)直接分桶
开始操作之前,需要将hive.enforce.bucketing属性设置为true,以标识Hive可以识别桶。
create table music(
id int,
name string,
size float)
row format delimited fields terminated by “\t”
clustered by (id) into 4 buckets;
该代码的意思是将music表按照id将数据分成了4个桶,插入数据时,会对应4个 reduce操作,输出4个文件。
2)在分区中分桶
当数据量过大,需要庞大发分区数量时,可以考虑桶,因为分区数量太大的情况可能会导致文件系统挂掉,而且桶比分区有更高的查询效率。数据最终落在哪一个桶里,取决于clustered by的那个列的值的hash数与桶的个数求余来决定。虽然有一定离散行,但不能保证每个桶中的数据量是一样的。
create table music2(
id int,
name string,
size float)
partitioned by (date string)
clustered by (id) sorted by(size) into 4 bucket
row format delimited
fields terminated by “\t”;
load data local inpath ‘demo/music.txt’ into table music2 partition(date=’2017-08-30′);
10 Hive的udf、udaf和udtf
自定义UDF:继承UDF,重写evaluate方法
自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close
1)行转列
创建表及数据导入:
create table person_info(
name string,
constellation string,
blood_type string)
row format delimited fields terminated by “\t”;
load data local inpath “person_info.txt” into table person_info;
例如:把星座和血型一样的人归类到一起
select t1.base, concat_ws(‘|’, collect_set(t1.name)) name
from
(select name, concat(constellation, “,”, blood_type) base from person_info) t1
group by t1.base;
2)列转行(UDAF与UDTF)
创建表及导入数据:
create table movie_info(
movie string,
category array<string>)
row format delimited fields terminated by “\t”
collection items terminated by “,”;
load data local inpath “movie.txt” into table movie_info;
例如:将电影分类中的数组数据展开
select movie, category_name
from movie_info lateral view explode(category) table_tmp as category_name;
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/6214.html