Hive如何调优
Hive 调优
[TOC]
调优根源
Hive性能优化时,把HiveQL当做MapReduce程序来读,即从MapReduce的运行角度来考虑优化性能,从更底层思考如何优化运算性能,而不仅仅局限于逻辑代码的替换层面,所以Hive的优化即MapReduce的优化。
RAC(Real Application Cluster)真正应用集群就像一辆机动灵活的小货车,响应快;Hadoop就像吞吐量巨大的轮船,启动开销大,如果每次只做小数量的输入输出,利用率将会很低。所以用好Hadoop的首要任务是增大每次任务所搭载的数据量。
Hadoop的核心能力是parition和sort,因而这也是优化的根本。
观察Hadoop处理数据的过程,有几个显著的特征:
数据的大规模并不是负载重点,造成运行压力过大是因为运行数据的倾斜。
jobs数较多的作业运行效率相对比较低,比如即使有几百行的表,如果多次关联对此汇总,产生几十个jobs,将会需要30分钟以上的时间且大部分时间被用于作业分配,初始化和数据输出。M/R作业初始化的时间是比较耗时间资源的一个部分。
在使用SUM,COUNT,MAX,MIN等UDAF函数时,不怕数据倾斜问题,Hadoop在Map端的汇总合并优化过,使数据倾斜不成问题。
COUNT(DISTINCT)在数据量大的情况下,效率较低,如果多COUNT(DISTINCT)效率更低,因为COUNT(DISTINCT)是按GROUP
BY字段分组,按DISTINCT字段排序,一般这种分布式方式是很倾斜的;比如:男UV,女UV,淘宝一天30亿的PV,如果按性别分组,分配2个reduce,每个reduce处理15亿数据。数据倾斜是导致效率大幅降低的主要原因,可以采用多一次 Map/Reduce 的方法, 避免倾斜。
最后得出的结论是:避实就虚,用 job 数的增加,输入量的增加,占用更多存储空间,充分利用空闲 CPU 等各种方法,分解数据倾斜造成的负担。
优化手段
1、列裁剪
Hive 在读数据的时候,可以只读取查询中所需要用到的列,而忽略其它列。 例如,若有以下查询:
1 |
|
在实施此项查询中,Q 表有 5 列(a,b,c,d,e),Hive 只读取查询逻辑中真实需要 的 3 列 a、b、e,而忽略列 c,d;这样做节省了读取开销,中间表存储开销和数据整合开销。
列裁剪对应的参数项为:hive.optimize.cp=true
(默认值为真)
2、分区裁剪
可以在查询的过程中减少不必要的分区。 例如,若有以下查询:
1 |
|
查询语句若将“subq.prtn=100”条件放入子查询中更为高效,可以减少读入的分区 数目。 Hive 自动执行这种裁剪优化。
分区裁剪对应的参数为:hive.optimize.pruner=true
(默认值为真)
3、大表与小表JOIN
在编写带有 join 操作的代码语句时,应该将条目少的表/子查询放在 Join 操作符的左边。 因为在 Reduce 阶段,位于 Join 操作符左边的表的内容会被加载进内存,载入条目较少的表 可以有效减少 OOM(out of memory)即内存溢出。所以对于同一个 key 来说,对应的 value 值小的放前,大的放后,这便是“小表放前”原则。 若一条语句中有多个 Join,依据 Join 的条件相同与否,有不同的处理方法。
场景
大表与小表Join时容易发生数据倾斜,表现为小表的数据量比较少但key却比较集中,导致分发到某一个或几个reduce上的数据比其他reduce多很多,造成数据倾斜。
优化方法
使用Map Join将小表装入内存,在map端完成join操作,这样就避免了reduce操作,前提条件是需要的数据在 Map 的过程中可以访问到。有两种方法可以执行Map Join:
(1) 通过hint指定小表做MapJoin
1 |
|
(2) 通过配置参数自动做MapJoin
核心参数:
参数名称 | 默认值 | 说明 |
---|---|---|
hive.auto.convert.join | false | 是否将 common join (reduce 端 join)转换为map join |
hive.mapjoin.smalltable.filesize | 25000000 | 判断小表的输入文件的大小阈值,默认为25M |
因此,巧用MapJoin可以有效解决小表关联大表场景下的数据倾斜。
相关参数:
| 参数名称 | 默认值 | 说明 |
| ————————– | —— | ———————————————————— |
| hive.join.emit.interval | 1000 | 在发出join结果之前对join最右操作缓存多少行的设定,hive jira里有个对该值设置太小的bugfix |
| hive.mapjoin.size.key | 10000 | MapJoin 每一行键的大小,以字节为单位 |
| hive.mapjoin.cache.numrows | 25000 | Map Join 所缓存的行数。 |
4、大表与大表JOIN
场景
大表与大表Join时,当其中一张表的NULL值(或其他值)比较多时,容易导致这些相同值在reduce阶段集中在某一个或几个reduce上,发生数据倾斜问题。
优化方法
(1) Null值合并
将NULL值提取出来最后合并,这一部分只有map操作;非NULL值的数据分散到不同reduce上,不会出现某个reduce任务数据加工时间过长的情况,整体效率提升明显。这种方法由于有两次Table Scan会导致map增多。
1 |
|
(2) Null给定随机值
在Join时直接把NULL值打散成随机值来作为reduce的key值,不会出现某个reduce任务数据加工时间过长的情况,整体效率提升明显。这种方法解释计划只有一次map,效率一般优于第一种方法。
1 |
|
5、GROUP BY 操作
场景
Hive做group by查询,当遇到group by字段的某些值特别多的时候,会将相同值拉到同一个reduce任务进行聚合,也容易发生数据倾斜。
优化方法
(1) 开启Map端聚合
参数设置:
参数名称 | 默认值 | 说明 |
---|---|---|
hive.map.aggr | true | 是否开启Map端聚合 |
hive.groupby.mapaggr.checkinterval | 100000 | 在Map端进行聚合操作的各条数目 |
(2) 有数据倾斜时进行负载均衡
参数设置:
参数名称 | 默认值 | 说明 |
---|---|---|
hive.groupby.mapaggr.checkinterval | false | 当Group By 有数据倾斜时是否进行负载均衡 |
当设定hive.groupby.skewindata为true时,生成的查询计划会有两个MapReduce任务。在第一个MapReduce 中,map的输出结果集合会随机分布到 reduce 中, 每个 reduce 做部分聚合操作,这样处理之后,相同的 Group By Key 有可能分发到不同的 reduce 中,从而达到负载均衡的目的。在第二个 MapReduce 任务再根据第一步中处理的数据按照Group By Key分布到reduce中,(这一步中相同的key在同一个reduce中),最终生成聚合操作结果。
6、COUNT(DISTINCT) 操作
场景
当在数据量比较大的情况下,由于COUNT DISTINCT操作是用一个reduce任务来完成,这一个reduce需要处理的数据量太大,就会导致整个job很难完成,这也可以归纳为一种数据倾斜。
优化方法
(1)使用GroupBy
将COUNT DISTINCT使用先GROUP BY再COUNT的方式替换。例如:
1 |
|
因此,count distinct的优化本质上也是转成group by操作。
案例
计算 uv 的时候,经常会用到 COUNT(DISTINCT),但在数据比较倾斜的时候 COUNT(DISTINCT) 会比较慢。这时可以尝试用 GROUP BY 改写代码计算 uv。
1 |
|
关于COUNT(DISTINCT)的数据倾斜问题不能一概而论,要依情况而定,下面是我测试的一组数据:
1 |
|
1 |
|
测试结果表名:明显改造后的语句比之前耗时,这是因为改造后的语句有2个SELECT,多了一个job,这样在数据量小的时候,数据不会存在倾斜问题。
7、合并小文件
原因
- 众所周知,小文件在HDFS中存储本身就会占用过多的内存空间,那么对于MR查询过程中过多的小文件又会造成启动过多的Mapper Task, 每个Mapper都是一个后台线程,会占用JVM的空间
- 在Hive中,动态分区会造成在插入数据过程中,生成过多零碎的小文件(请回忆昨天讲的动态分区的逻辑)
- 不合理的Reducer Task数量的设置也会造成小文件的生成,因为最终Reducer是将数据落地到HDFS中的
场景
我们知道文件数目小,容易在文件存储端造成瓶颈,给 HDFS 带来压力,影响处理效率。对此,可以通过合并Map和Reduce的结果文件来消除这样的影响
优化方法
(1)开启map/reduce端文件合并
参数设置:
参数名称 | 默认值 | 说明 |
---|---|---|
hive.merge.mapfiles | true | 是否合并Map输出文件 |
hive.merge.mapredfiles | false | 是否合并Reduce 端输出文件 |
hive.merge.size.per.task | 256000000(256M) | 合并文件的大小 |
(2)存储格式
采用Sequencefile作为表存储格式,不要用textfile,在一定程度上可以减少小文件(常见于在流计算的时候采用Sequencefile格式进行存储)
(3)合并文件
关联几百张小表的情况下,阔以合并多个文件数据到一个文件中,重新构建表
8、谓词下推(已过时)
所谓谓词下推就是通过嵌套的方式,将底层查询语句尽量推到数据底层去过滤,这样在上层应用中就可以使用更少的数据量来查询,这种SQL技巧被称为谓词下推(Predicate pushdown)
eg:采用谓词下推的技术,提早进行过滤有可能减少必须在数据库分区之间传递的数据量
1 |
|
9、设置并行执行任务数
Hive会将一个查询转化为一个或多个阶段,包括:MapReduce阶段、抽样阶段、合并阶段、limit阶段等。默认情况下,一次只执行一个阶段。不过,如果某些阶段不是互相依赖,是可以并行执行的。
核心参数
参数名称 | 默认值 | 说明 |
---|---|---|
hive.exec.parallel | false | 是否打开任务并行执行(默认按顺序执行) |
hive.exec.parallel.thread.number | 8 | 同一个 sql 允许最大并行度 |
10、设置合理的map和reduce数量
原因:
- 过多的启动和初始化 map / reduce 也会消耗时间和资源
- 数据量较大的情况下,默认的设置下,map端会产生很多的切片文件,不断的溢写与溢出等,会十分消耗资源
- 有多少个Reduer就会有多少个文件产生,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题
优化方法
参数设置
参数名称 | 默认值 | 说明 |
---|---|---|
hive.exec.reducers.bytes.per.reducer | 256000000 | 每个Reduce处理的数据默认是256MB |
hive.exec.reducers.max | 1009 | 每个任务最大的reduce数 |
mapreduce.job.reduces | 1 | reduce数量,也可以通过编程的方式,调用Job对象的setNumReduceTasks()方法来设置 |
一个节点Reduce任务数量上限由mapreduce.tasktracker.reduce.tasks.maximum设置(默认2)。
1 |
|
11、JVM重用
JVM重用是Hadoop中调优参数的内容,该方式对Hive的性能也有很大的帮助,特别对于很难避免小文件的场景或者Task特别多的场景,这类场景的大数据执行时间都很短
Hadood的默认配置通常是使用派生JVM来执行map和reduce任务的,会造成JVM的启动过程比较大的开销,尤其是在执行Job包含有成百上千个task任务的情况。
简单来说就是:JVM重用可以使得JVM实例在同一个job中重复使用N次,不需要重复的开启与关系Container来消耗资源
优化方法
N的值可以在hadoop的mapred-site.xml文件中进行设置,N值默认为 1
1 |
|
12、设置本地模式
场景
有时候Hive处理的数据量非常小,那么在这种情况下,为查询出发执行任务的时间消耗可能会比实际job的执行时间要长,对于大多数这种情况,hive可以通过本地模式在单节点上处理所有任务,避免了网络传输、Container开关等带来的消耗,对于小数据量任务可以大大的缩短时间
当一个job满足如下条件才能真正使用本地模式:
1.job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB)
2.job的map数必须小于参数:hive.exec.mode.local.auto.tasks.max(默认4)
3.job的reduce数必须为0或者1
优化方法
参数配置
参数名称 | 默认值 | 说明 |
---|---|---|
hive.exec.mode.local.auto | false | 是否开启Hive的本地模式 |
hive.exec.mode.local.auto.inputbytes.max | 128MB | 本地模式中job的输入数据最大值 |
hive.exec.mode.local.auto.tasks.max | 4 | 本地模式中job的map数量最大值 |
13、使用严格模式
Hive提供了一种严格模式,可以防止用户执行那些可能产生意想不到的不好的影响查询
场景
- 对于分区表,除非WHERE语句中含有分区字段过滤条件来限制数据范围,否则不允许执行,也就是说不允许扫描所有分区
- 使用ORDER BY 语句进行查询是,必须使用LIMIT语句,因为ORDER BY 为了执行排序过程会将所有结果数据分发到同一个reduce中进行处理,强制要求用户添加LIMIT可以防止reducer额外的执行很长时间
优化方法
参数配置
参数名称 | 默认值 | 说明 |
---|---|---|
Hive.mapred.mode | nonstrict | 开启Hive的严格模式 |
14、合理的选择排序
Hive中排序方式包括order by,sort by,distribute by和cluster by四种。在Hive中一定要合理选择排序的方式,下面我对几种排序及使用方法做一个介绍。
场景
order by
order by对查询的结果做一次全局排序,也就是说指定order by的所有数据都会到一个reducer中处理,对数据量特别大的情况,执行时间会特别长。
如果指定了严格模式,此时必须指定limit来限制输出条数,因为所有数据在一个reducer端进行,数据量大时可能不会出结果。
1 |
|
sort by
sort by是局部排序,会在每个reducer端进行排序,实现每个reducer内部是有序的,全部数据不一定有序,除非只有一个reducer。执行sort by可以为order by提高效率(一次归并排序就全局有序了)。
distribute by
distribute by类似mapreduce中的分区,用来控制map端在reducer上如何区分,distribute by会把key相同的数据分发到同一个reducer。可以和sort by结合使用,此时distribute by必须写在sort by之前。
cluster by
cluster by就是distribute by和sort by的结合。ps:cluster by指定的列只能是降序,不能指定asc和desc
案例
相关用法和结合方式
我们有一张描述商户相关表store,uid:商店所属商户,cash:商店的盈利,name:商店名
uid | cash | name |
---|---|---|
1 | 12 | 商店1 |
1 | 23 | 商店2 |
2 | 43 | 商店3 |
3 | 21 | 商店4 |
distribute by和sort by一起使用相当于cluster by
1 |
|
等价于
1 |
|
故,针对排序场景需要明确是否需要全局排序,大部分业务场景是不需要全局排序的,通常情况下,distribute by和sort by结合能够解决大部分业务问题。
15、Fetch抓取
场景
Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。例如:SELECT * FROM emp;
在这种情况下,Hive可以简单地读取emp对应的存储目录下的文件,然后输出查询结果到控制台。
优化方法
参数设置
参数名称 | 默认值 | 说明 |
---|---|---|
hive.fetch.task.conversion | more【老版本 Hive 默认是 minimal】 | task是否走MR流程 |
none | 执行任何查询都走mapreduce | |
minimal | 执行(单个分区表)全局查找(select * )、字段查找、limit不走mapreduce,不加分区字段执行会走MR | |
more | 执行(无论是否有分区表)全局查找(select * )、字段查找、limit都不走mapreduce |
案例
(1)把hive.fetch.task.conversion设置成minimal,然后执行查询语句。
1 |
|
(2)把hive.fetch.task.conversion设置成more,如下查询方式不会执行mapreduce程序。
1 |
|
16、explain执行计划
通过执行计划来调节SQL语句
恕我直言,能看到认真从头看到这个调优方法的,SQL语句肯定会写,基本SQL优化也都会一点,但是按照explain执行计划来调节SQL语句,难度较大,建议PASS。
奉上explain与explain extend执行截图:
优化总结
优化时,把hive sql当做mapreduce程序来读,会有意想不到的惊喜。理解hadoop的核心能力,是hive优化的根本。
长期观察hadoop处理数据的过程,有几个显著的特征:
不怕数据多,就怕数据倾斜。
对jobs数比较多的作业运行效率相对比较低,比如即使有几百行的表,如果 多次关联多次汇总,产生十几个jobs,没半小时是跑不完的。map reduce作业初始化的时间是比较长的。
对sum,count来说,不存在数据倾斜问题。
对count(distinct),效率较低,数据量一多,准出问题,如果是多count(distinct )效率更低。
优化可以从几个方面着手:
- 好的模型设计事半功倍。
- 解决数据倾斜问题。
- 减少job数。
- 设置合理的map reduce的task数,能有效提升性能。(比如,10w+级别的计算,用160个reduce,那是相当的浪费,1个足够)。
- 自己动手写sql解决数据倾斜问题是个不错的选择。set hive.groupby.skewindata=true;这是通用的算法优化,但算法优化总是漠视业务,习惯性提供通用的解决方法。Etl开发人员更了解业务,更了解数据,所以通过业务逻辑解决倾斜的方法往往更精确,更有效
- 对count(distinct)采取漠视的方法,尤其数据大的时候很容易产生倾斜问题,不抱侥幸心理。自己动手,丰衣足食。
- 对小文件进行合并,是行至有效的提高调度效率的方法,假如我们的作业设置合理的文件数,对云梯的整体调度效率也会产生积极的影响。
- 优化时把握整体,单个作业最优不如整体最优。
优化的常用手段
主要由三个属性来决定:
hive.exec.reducers.bytes.per.reducer
这个参数控制一个job会有多少个reducer来处理,依据的是输入文件的总大小。默认1GB。hive.exec.reducers.max
#这个参数控制最大的reducer的数量, 如果 input / bytes per
reduce > max 则会启动这个参数所指定的reduce个数。
这个并不会影响mapre.reduce.tasks参数的设置。默认的max是999。mapred.reduce.tasks
这个参数如果指定了,hive就不会用它的estimation函数来自动计算reduce的个数,而是用这个参数来启动reducer。默认是-1。
参数设置的影响
1、如果reduce太少:如果数据量很大,会导致这个reduce异常的慢,从而导致这个任务不能结束,也有可能会OOM
2、如果reduce太多: 产生的小文件太多,合并起来代价太高,namenode的内存占用也会增大。如果我们不指定mapred.reduce.tasks, hive会自动计算需要多少个reducer。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!