Hive如何调优

Hive 调优

[TOC]

调优根源

​ Hive性能优化时,把HiveQL当做MapReduce程序来读,即从MapReduce的运行角度来考虑优化性能,从更底层思考如何优化运算性能,而不仅仅局限于逻辑代码的替换层面,所以Hive的优化即MapReduce的优化。

​ RAC(Real Application Cluster)真正应用集群就像一辆机动灵活的小货车,响应快;Hadoop就像吞吐量巨大的轮船,启动开销大,如果每次只做小数量的输入输出,利用率将会很低。所以用好Hadoop的首要任务是增大每次任务所搭载的数据量。

​ Hadoop的核心能力paritionsort,因而这也是优化的根本。
  观察Hadoop处理数据的过程,有几个显著的特征:

  • 数据的大规模并不是负载重点,造成运行压力过大是因为运行数据的倾斜

  • jobs数较多的作业运行效率相对比较低,比如即使有几百行的表,如果多次关联对此汇总,产生几十个jobs,将会需要30分钟以上的时间且大部分时间被用于作业分配,初始化和数据输出。M/R作业初始化的时间是比较耗时间资源的一个部分。

  • 在使用SUM,COUNT,MAX,MIN等UDAF函数时,不怕数据倾斜问题,Hadoop在Map端的汇总合并优化过,使数据倾斜不成问题。

  • COUNT(DISTINCT)在数据量大的情况下,效率较低,如果多COUNT(DISTINCT)效率更低,因为COUNT(DISTINCT)是按GROUP
    BY字段分组,按DISTINCT字段排序,一般这种分布式方式是很倾斜的;比如:男UV,女UV,淘宝一天30亿的PV,如果按性别分组,分配2个reduce,每个reduce处理15亿数据。

  • 数据倾斜是导致效率大幅降低的主要原因,可以采用多一次 Map/Reduce 的方法, 避免倾斜。

最后得出的结论是:避实就虚,用 job 数的增加,输入量的增加,占用更多存储空间,充分利用空闲 CPU 等各种方法,分解数据倾斜造成的负担。

优化手段

1、列裁剪

​ Hive 在读数据的时候,可以只读取查询中所需要用到的列,而忽略其它列。 例如,若有以下查询:

1
2
SELECT * FROM Demo WHERE e<10;		-- 优化前
SELECT a,b FROM Demo WHERE e<10; -- 优化后

​ 在实施此项查询中,Q 表有 5 列(a,b,c,d,e),Hive 只读取查询逻辑中真实需要 的 3 列 a、b、e,而忽略列 c,d;这样做节省了读取开销,中间表存储开销和数据整合开销。

列裁剪对应的参数项为:hive.optimize.cp=true(默认值为真)

2、分区裁剪

  可以在查询的过程中减少不必要的分区。 例如,若有以下查询:

1
2
SELECT * FROM (SELECTT a1,COUNT(1) FROM T GROUP BY a1) subq WHERE subq.prtn=100; 		--(多余分区) 
SELECT * FROM T1 JOIN (SELECT * FROM T2) subq ON (T1.a1=subq.a2) WHERE subq.prtn=100; --(分区减少)

​ 查询语句若将“subq.prtn=100”条件放入子查询中更为高效,可以减少读入的分区 数目。 Hive 自动执行这种裁剪优化。

分区裁剪对应的参数为:hive.optimize.pruner=true(默认值为真)

3、大表与小表JOIN

​ 在编写带有 join 操作的代码语句时,应该将条目少的表/子查询放在 Join 操作符的左边。 因为在 Reduce 阶段,位于 Join 操作符左边的表的内容会被加载进内存,载入条目较少的表 可以有效减少 OOM(out of memory)即内存溢出。所以对于同一个 key 来说,对应的 value 值小的放前,大的放后,这便是“小表放前”原则。 若一条语句中有多个 Join,依据 Join 的条件相同与否,有不同的处理方法。

场景

​ 大表与小表Join时容易发生数据倾斜,表现为小表的数据量比较少但key却比较集中,导致分发到某一个或几个reduce上的数据比其他reduce多很多,造成数据倾斜。

优化方法

使用Map Join将小表装入内存,在map端完成join操作,这样就避免了reduce操作,前提条件是需要的数据在 Map 的过程中可以访问到。有两种方法可以执行Map Join:

(1) 通过hint指定小表做MapJoin
1
select /*+ MAPJOIN(time_dim) */ count(*) from store_sales join time_dim on ss_sold_time_sk = t_time_sk;
(2) 通过配置参数自动做MapJoin

核心参数:

参数名称 默认值 说明
hive.auto.convert.join false 是否将 common join (reduce 端 join)转换为map join
hive.mapjoin.smalltable.filesize 25000000 判断小表的输入文件的大小阈值,默认为25M

​ 因此,巧用MapJoin可以有效解决小表关联大表场景下的数据倾斜。

相关参数:
| 参数名称 | 默认值 | 说明 |
| ————————– | —— | ———————————————————— |
| hive.join.emit.interval | 1000 | 在发出join结果之前对join最右操作缓存多少行的设定,hive jira里有个对该值设置太小的bugfix |
| hive.mapjoin.size.key | 10000 | MapJoin 每一行键的大小,以字节为单位 |
| hive.mapjoin.cache.numrows | 25000 | Map Join 所缓存的行数。 |

4、大表与大表JOIN

场景

大表与大表Join时,当其中一张表的NULL值(或其他值)比较多时,容易导致这些相同值在reduce阶段集中在某一个或几个reduce上,发生数据倾斜问题。

优化方法
(1) Null值合并

​ 将NULL值提取出来最后合并,这一部分只有map操作;非NULL值的数据分散到不同reduce上,不会出现某个reduce任务数据加工时间过长的情况,整体效率提升明显。这种方法由于有两次Table Scan会导致map增多。

1
2
3
4
5
6
7
8
9
10
SELECT a.user_Id,a.username,b.customer_id
FROM user_info a
LEFT JOIN customer_info b
ON a.user_id = b.user_id
where a.user_id IS NOT NULL

UNION ALL
SELECT a.user_Id,a.username,NULL
FROM user_info a
WHERE a.user_id IS NULL
(2) Null给定随机值

在Join时直接把NULL值打散成随机值来作为reduce的key值,不会出现某个reduce任务数据加工时间过长的情况,整体效率提升明显。这种方法解释计划只有一次map,效率一般优于第一种方法。

1
2
3
4
5
6
7
8
9
10
11
SELECT a.user_id,a.username,b.customer_id
FROM user_info a
LEFT JOIN customer_info b
ON
CASE WHEN
a.user_id IS NULL
THEN
CONCAT ('dp_hive', RAND())
ELSE
a.user_id
END = b.user_id;

5、GROUP BY 操作

场景

Hive做group by查询,当遇到group by字段的某些值特别多的时候,会将相同值拉到同一个reduce任务进行聚合,也容易发生数据倾斜。

优化方法
(1) 开启Map端聚合

参数设置:

参数名称 默认值 说明
hive.map.aggr true 是否开启Map端聚合
hive.groupby.mapaggr.checkinterval 100000 在Map端进行聚合操作的各条数目
(2) 有数据倾斜时进行负载均衡

参数设置:

参数名称 默认值 说明
hive.groupby.mapaggr.checkinterval false 当Group By 有数据倾斜时是否进行负载均衡

​ 当设定hive.groupby.skewindata为true时,生成的查询计划会有两个MapReduce任务。在第一个MapReduce 中,map的输出结果集合会随机分布到 reduce 中, 每个 reduce 做部分聚合操作,这样处理之后,相同的 Group By Key 有可能分发到不同的 reduce 中,从而达到负载均衡的目的。在第二个 MapReduce 任务再根据第一步中处理的数据按照Group By Key分布到reduce中,(这一步中相同的key在同一个reduce中),最终生成聚合操作结果。

6、COUNT(DISTINCT) 操作

场景

​ 当在数据量比较大的情况下,由于COUNT DISTINCT操作是用一个reduce任务来完成,这一个reduce需要处理的数据量太大,就会导致整个job很难完成,这也可以归纳为一种数据倾斜。

优化方法
(1)使用GroupBy

将COUNT DISTINCT使用先GROUP BY再COUNT的方式替换。例如:

1
select count(id) from (select id from bigtable group by id) a

因此,count distinct的优化本质上也是转成group by操作。

案例

  计算 uv 的时候,经常会用到 COUNT(DISTINCT),但在数据比较倾斜的时候 COUNT(DISTINCT) 会比较慢。这时可以尝试用 GROUP BY 改写代码计算 uv。

1
2
3
4
-- 原有代码
INSERT OVERWRITE TABLE s_dw_tanx_adzone_uv PARTITION (ds=20120329)
SELECT 20120329 AS thedate,adzoneid,COUNT(DISTINCT acookie) AS uv FROM s_ods_log_tanx_pv t WHERE t.ds=20120329 GROUP BY adzoneid
-- 测试数据:169857条

​ 关于COUNT(DISTINCT)的数据倾斜问题不能一概而论,要依情况而定,下面是我测试的一组数据:

1
2
3
-- 统计每日IP
CREATE TABLE ip_2014_12_29 AS SELECT COUNT(DISTINCT ip) AS IP FROM logdfs WHERE logdate='2014_12_29';
-- 耗时:24.805 seconds
1
2
3
4
5
-- 统计每日IP(改造)

CREATE TABLE ip_2014_12_29 AS SELECT COUNT(1) AS IP FROM (SELECT DISTINCT ip from logdfs WHERE logdate='2014_12_29') tmp;

-- 耗时:46.833 seconds

  测试结果表名:明显改造后的语句比之前耗时,这是因为改造后的语句有2个SELECT,多了一个job,这样在数据量小的时候,数据不会存在倾斜问题。

7、合并小文件

原因
  • 众所周知,小文件在HDFS中存储本身就会占用过多的内存空间,那么对于MR查询过程中过多的小文件又会造成启动过多的Mapper Task, 每个Mapper都是一个后台线程,会占用JVM的空间
  • 在Hive中,动态分区会造成在插入数据过程中,生成过多零碎的小文件(请回忆昨天讲的动态分区的逻辑)
  • 不合理的Reducer Task数量的设置也会造成小文件的生成,因为最终Reducer是将数据落地到HDFS中的
场景

​ 我们知道文件数目小,容易在文件存储端造成瓶颈,给 HDFS 带来压力,影响处理效率。对此,可以通过合并Map和Reduce的结果文件来消除这样的影响

优化方法
(1)开启map/reduce端文件合并

参数设置:

参数名称 默认值 说明
hive.merge.mapfiles true 是否合并Map输出文件
hive.merge.mapredfiles false 是否合并Reduce 端输出文件
hive.merge.size.per.task 256000000(256M) 合并文件的大小
(2)存储格式

​ 采用Sequencefile作为表存储格式,不要用textfile,在一定程度上可以减少小文件(常见于在流计算的时候采用Sequencefile格式进行存储)

(3)合并文件

​ 关联几百张小表的情况下,阔以合并多个文件数据到一个文件中,重新构建表

8、谓词下推(已过时)

​ 所谓谓词下推就是通过嵌套的方式,将底层查询语句尽量推到数据底层去过滤,这样在上层应用中就可以使用更少的数据量来查询,这种SQL技巧被称为谓词下推(Predicate pushdown)

eg:采用谓词下推的技术,提早进行过滤有可能减少必须在数据库分区之间传递的数据量

1
2
3
4
5
6
7
8
9
10
-- 优化前
SELECT * FROM stu as t
LEFT JOIN course as t1
ON t.id=t2.stu_id
WHERE t.age=18;

-- 优化后
SELECT * FROM (SELECT * FROM stu WHERE age=18) as t
LEFT JOIN course AS t1
on t.id=t1.stu_id

9、设置并行执行任务数

​ Hive会将一个查询转化为一个或多个阶段,包括:MapReduce阶段、抽样阶段、合并阶段、limit阶段等。默认情况下,一次只执行一个阶段。不过,如果某些阶段不是互相依赖,是可以并行执行的。

核心参数

参数名称 默认值 说明
hive.exec.parallel false 是否打开任务并行执行(默认按顺序执行)
hive.exec.parallel.thread.number 8 同一个 sql 允许最大并行度

10、设置合理的map和reduce数量

原因:

  • 过多的启动和初始化 map / reduce 也会消耗时间和资源
  • 数据量较大的情况下,默认的设置下,map端会产生很多的切片文件,不断的溢写与溢出等,会十分消耗资源
  • 有多少个Reduer就会有多少个文件产生,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题
优化方法

参数设置

参数名称 默认值 说明
hive.exec.reducers.bytes.per.reducer 256000000 每个Reduce处理的数据默认是256MB
hive.exec.reducers.max 1009 每个任务最大的reduce数
mapreduce.job.reduces 1 reduce数量,也可以通过编程的方式,调用Job对象的setNumReduceTasks()方法来设置

一个节点Reduce任务数量上限由mapreduce.tasktracker.reduce.tasks.maximum设置(默认2)。

1
2
3
4
5
6
7
8
-- 可以采用以下探试法来决定Reduce任务的合理数量:

-- 1、每个reducer都可以在Map任务完成后立即执行
0.95 * (节点数量 * mapreduce.tasktracker.reduce.tasks.maximum)


-- 2、较快的节点在完成第一个Reduce任务后,马上执行第二个
1.75 * (节点数量 * mapreduce.tasktracker.reduce.tasks.maximum)

11、JVM重用

​ JVM重用是Hadoop中调优参数的内容,该方式对Hive的性能也有很大的帮助,特别对于很难避免小文件的场景或者Task特别多的场景,这类场景的大数据执行时间都很短

​ Hadood的默认配置通常是使用派生JVM来执行map和reduce任务的,会造成JVM的启动过程比较大的开销,尤其是在执行Job包含有成百上千个task任务的情况。

​ 简单来说就是:JVM重用可以使得JVM实例在同一个job中重复使用N次,不需要重复的开启与关系Container来消耗资源

优化方法

​ N的值可以在hadoop的mapred-site.xml文件中进行设置,N值默认为 1

1
2
3
4
<property>
<name>mapred.job.reuse.jvm.num.tasks</name>
<value>1</value>
</property>

12、设置本地模式

场景

​ 有时候Hive处理的数据量非常小,那么在这种情况下,为查询出发执行任务的时间消耗可能会比实际job的执行时间要长,对于大多数这种情况,hive可以通过本地模式在单节点上处理所有任务,避免了网络传输、Container开关等带来的消耗,对于小数据量任务可以大大的缩短时间

当一个job满足如下条件才能真正使用本地模式:
1.job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB)
2.job的map数必须小于参数:hive.exec.mode.local.auto.tasks.max(默认4)
3.job的reduce数必须为0或者1

优化方法

参数配置

参数名称 默认值 说明
hive.exec.mode.local.auto false 是否开启Hive的本地模式
hive.exec.mode.local.auto.inputbytes.max 128MB 本地模式中job的输入数据最大值
hive.exec.mode.local.auto.tasks.max 4 本地模式中job的map数量最大值

13、使用严格模式

​ Hive提供了一种严格模式,可以防止用户执行那些可能产生意想不到的不好的影响查询

场景
  • 对于分区表,除非WHERE语句中含有分区字段过滤条件来限制数据范围,否则不允许执行,也就是说不允许扫描所有分区
  • 使用ORDER BY 语句进行查询是,必须使用LIMIT语句,因为ORDER BY 为了执行排序过程会将所有结果数据分发到同一个reduce中进行处理,强制要求用户添加LIMIT可以防止reducer额外的执行很长时间
优化方法

参数配置

参数名称 默认值 说明
Hive.mapred.mode nonstrict 开启Hive的严格模式

14、合理的选择排序

​ Hive中排序方式包括order by,sort by,distribute by和cluster by四种。在Hive中一定要合理选择排序的方式,下面我对几种排序及使用方法做一个介绍。

场景

order by

​ order by对查询的结果做一次全局排序,也就是说指定order by的所有数据都会到一个reducer中处理,对数据量特别大的情况,执行时间会特别长。

​ 如果指定了严格模式,此时必须指定limit来限制输出条数,因为所有数据在一个reducer端进行,数据量大时可能不会出结果。

1
set hive.mapred.mode=strict

sort by

​ sort by是局部排序,会在每个reducer端进行排序,实现每个reducer内部是有序的,全部数据不一定有序,除非只有一个reducer。执行sort by可以为order by提高效率(一次归并排序就全局有序了)。

distribute by

​ distribute by类似mapreduce中的分区,用来控制map端在reducer上如何区分,distribute by会把key相同的数据分发到同一个reducer。可以和sort by结合使用,此时distribute by必须写在sort by之前。

cluster by

​ cluster by就是distribute by和sort by的结合。ps:cluster by指定的列只能是降序,不能指定asc和desc

案例

相关用法和结合方式

我们有一张描述商户相关表store,uid:商店所属商户,cash:商店的盈利,name:商店名

uid cash name
1 12 商店1
1 23 商店2
2 43 商店3
3 21 商店4

​ distribute by和sort by一起使用相当于cluster by

1
select uid, cash, name from store distribute by uid sort by uid 

​ 等价于

1
select uid, cash, name from store cluster by uid 

​ 故,针对排序场景需要明确是否需要全局排序,大部分业务场景是不需要全局排序的,通常情况下,distribute by和sort by结合能够解决大部分业务问题。

15、Fetch抓取

场景

​ Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。例如:SELECT * FROM emp;在这种情况下,Hive可以简单地读取emp对应的存储目录下的文件,然后输出查询结果到控制台。

优化方法

参数设置

参数名称 默认值 说明
hive.fetch.task.conversion more【老版本 Hive 默认是 minimal】 task是否走MR流程
none 执行任何查询都走mapreduce
minimal 执行(单个分区表)全局查找(select * )、字段查找、limit不走mapreduce,不加分区字段执行会走MR
more 执行(无论是否有分区表)全局查找(select * )、字段查找、limit都不走mapreduce
案例

(1)把hive.fetch.task.conversion设置成minimal,然后执行查询语句。

1
2
3
4
5
6
7
8
9
hive (default)> set hive.fetch.task.conversion=minimal;
-- 会执行MR
hive (default)> select * from logs;
hive (default)> select logid from logs;
hive (default)> select logid from logs limit 3;
-- 不会执行MR
hive (default)> select * from logs where day='20200701';
hive (default)> select logid from logs where day='20200701';
hive (default)> select logid from logs where day='20200701' limit 3;

(2)把hive.fetch.task.conversion设置成more,如下查询方式不会执行mapreduce程序。

1
2
3
4
5
6
7
8
9
10
hive (default)> set hive.fetch.task.conversion=more;
-- 都不会执行MR
-- 不会执行MR
hive (default)> select * from logs;
hive (default)> select logid from logs;
hive (default)> select logid from logs limit 3;
-- 不会执行MR
hive (default)> select * from logs where day='20200701';
hive (default)> select logid from logs where day='20200701';
hive (default)> select logid from logs where day='20200701' limit 3;

16、explain执行计划

​ 通过执行计划来调节SQL语句

​ 恕我直言,能看到认真从头看到这个调优方法的,SQL语句肯定会写,基本SQL优化也都会一点,但是按照explain执行计划来调节SQL语句,难度较大,建议PASS。

奉上explain与explain extend执行截图:

explain

explain extend

优化总结

  优化时,把hive sql当做mapreduce程序来读,会有意想不到的惊喜。理解hadoop的核心能力,是hive优化的根本。

长期观察hadoop处理数据的过程,有几个显著的特征:

  • 不怕数据多,就怕数据倾斜。

  • 对jobs数比较多的作业运行效率相对比较低,比如即使有几百行的表,如果 多次关联多次汇总,产生十几个jobs,没半小时是跑不完的。map reduce作业初始化的时间是比较长的。

  • 对sum,count来说,不存在数据倾斜问题。

  • 对count(distinct),效率较低,数据量一多,准出问题,如果是多count(distinct )效率更低。

优化可以从几个方面着手:

  1. 好的模型设计事半功倍。
  2. 解决数据倾斜问题。
  3. 减少job数。
  4. 设置合理的map reduce的task数,能有效提升性能。(比如,10w+级别的计算,用160个reduce,那是相当的浪费,1个足够)。
  5. 自己动手写sql解决数据倾斜问题是个不错的选择。set hive.groupby.skewindata=true;这是通用的算法优化,但算法优化总是漠视业务,习惯性提供通用的解决方法。Etl开发人员更了解业务,更了解数据,所以通过业务逻辑解决倾斜的方法往往更精确,更有效
  6. 对count(distinct)采取漠视的方法,尤其数据大的时候很容易产生倾斜问题,不抱侥幸心理。自己动手,丰衣足食。
  7. 对小文件进行合并,是行至有效的提高调度效率的方法,假如我们的作业设置合理的文件数,对云梯的整体调度效率也会产生积极的影响。
  8. 优化时把握整体,单个作业最优不如整体最优。

优化的常用手段

  主要由三个属性来决定:

  • hive.exec.reducers.bytes.per.reducer
    这个参数控制一个job会有多少个reducer来处理,依据的是输入文件的总大小。默认1GB。

  • hive.exec.reducers.max

    #这个参数控制最大的reducer的数量, 如果 input / bytes per
    reduce > max 则会启动这个参数所指定的reduce个数。
    这个并不会影响mapre.reduce.tasks参数的设置。默认的max是999。

  • mapred.reduce.tasks
    这个参数如果指定了,hive就不会用它的estimation函数来自动计算reduce的个数,而是用这个参数来启动reducer。默认是-1。

参数设置的影响

1、如果reduce太少:如果数据量很大,会导致这个reduce异常的慢,从而导致这个任务不能结束,也有可能会OOM

2、如果reduce太多: 产生的小文件太多,合并起来代价太高,namenode的内存占用也会增大。如果我们不指定mapred.reduce.tasks, hive会自动计算需要多少个reducer。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!