Hive

关于

学习Hive时的笔记

配置

环境要求
- Hive 1.2 需要 java 1.7+, 0.14-1.1 可以工作在 java1.6 版本上。
- Hadoop 2.x
- 可以运行在 Linux 和 Windows 环境, Mac 通常用来做开发的!

添加 $HIVE_HOME 环境变量,并将$bin目录添加到$PATH变量中。

hive.metastore.warehouse.dir 配置指明数据仓库的目录,默认是 /user/hive/warehouse/tmp(临时文件目录)

Hive 基本概念

Hive 的定位

Hive 用来做数据仓库,非实时数据处理。

数据单元

类型系统

隐式类型转换,只能从低精度到高精度。也允许从 STRING 到 DOUBLE。
显示类型转化可以用内置函数实现。

复杂类型

结构体类型的操作!怎么用?!

操作

除了常规的比较操作,还支持正则式比较:

A RLIKE B, A REGEXP B,字符串A是否匹配Java正则式B。注意有个坑,正则式B中的\需要转义字符!!
例子:lat rlike '\d+\.\d+'是错误的,应该是 lat rlike '\\d+\\.\\d+'

内置函数

count 会自动去掉NULL值,这在条件count的时候很有用,例如分别统计在a>1的情况下和a<0情况下的uid,可以用一个查询搞定,不用join

select count(distinct if(a>1, uid, null)) as cnt1,
        count(distinct if(a<0, uid, null)) as cnt0
from some_table

Hive SQL

select uid, row_number() over (partition by uid order by uid)
from

join优化:将大表放右边!

Also it is best to put the largest table on the rightmost side of the join to get the best performance.

FROM pv_users
INSERT OVERWRITE TABLE pv_gender_sum
    SELECT pv_users.gender, count_distinct(pv_users.userid)
    GROUP BY pv_users.gender

INSERT OVERWRITE DIRECTORY '/user/data/tmp/pv_age_sum'
    SELECT pv_users.age, count_distinct(pv_users.userid)
    GROUP BY pv_users.age;
insert overwrite table pv partition (dt='2010-01-01', country)  ----- 动态决定country分区,dt分区值固定
insert overwrite table pv partition (dt, country='US')          ----- 将所有dt分区下 country='US' 子分区都覆盖,一般不要能用这种写法!

如果分区字段为NULL,会写入到默认分区HIVE_DEFAULT_PARTITION中。

影响动态分区的一些配置:

TABLESAMPLE(BUCKET 3 OUT OF 64 ON userid)
CREATE TABLE array_table (int_array_column ARRAY<INT>);

SELECT pv.friends[2]
FROM page_views pv;

相关UDAF函数percentile_approx, histogram_numeric, collect_set, collect_list
- Map 操作:

FROM (
     FROM pv_users
     MAP pv_users.userid, pv_users.date
     USING 'map_script'
     AS dt, uid
     CLUSTER BY dt) map_output

INSERT OVERWRITE TABLE pv_users_reduced
     REDUCE map_output.dt, map_output.uid
     USING 'reduce_script'
     AS date, count;

脚本:

import sys
import datetime

for line in sys.stdin:
  line = line.strip()
  userid, unixtime = line.split('\t')
  weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
  print ','.join([userid, str(weekday)])

优化排序

不要使用order by https://stackoverflow.com/questions/13715044/hive-cluster-by-vs-order-by-vs-sort-by
- ORDER BY 全局排序,但是只能使用一个reducer
- DISTRIBUTE BY 采用Hash算法将map处理后的数据分发给reduce,它保证了相同的key是在同一个reducer
- SORT BY 不是全局排序,而是在数据进入reduce之前完成排序,只能保证每个reducer的输出是有序的,不能保证全局有序。
- CLUSTER BY 相当于先 DISTRIBUTE 然后 sort。也不能保证全局有序。

HIVE 命令

文件格式

textfile格式

文本格式

ROW FORMAT DELIMITED
   FIELDS TERMINATED BY '\001'
   LINES TERMINATED BY '\n'
   COLLECTION ITEMS TERMINATED BY '\002'
   MAP KEYS TERMINATED BY '\003'
 STORED AS TEXTFILE;

Avro 格式

ORC 格式

Optimized Row Columnar 格式,采用这种格式可以提升HIVE读写性能。

文件结构:以Strip为单位(默认250MB)。

cli读取命令 hive --orcfiledump

创建表的时候这样写:STORED AS ORC 即可。

Parquet 格式

Hadoop 生态中的一个!

压缩文件格式

直接从 gzip 等格式中存取为text格式表格。

CREATE TABLE raw (line STRING)
   ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n';

LOAD DATA LOCAL INPATH '/tmp/weblogs/20090603-access.log.gz' INTO TABLE raw;

LZO

UDF

UDF,UDAF,UDTF

HIVE数据类型与 java 数据类型对应关系:

hive   java
map    HashMap
array  ArrayList<?>

JOIN

SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)

只有一个 Map/Reduce 任务。而下面这个会有2个 Map/Reduce 任务(JOIN a,b; JOIN * ,c)。

SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)

In every map/reduce stage of the join, the last table in the sequence is streamed through the reducers where as the others are buffered. Therefore, it helps to reduce the memory needed in the reducer for buffering the rows for a particular value of the join key by organizing the tables such that the largest tables appear last in the sequence.

将大表放后面,大表会以streaming的方式进入reducer,而其他的一buffer的方式存在(内存?),可以减少内存的需求。
默认让最后的表以streaming方式进入reducer,也可以手动指定。

SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)

当存在这个hint的时候,会将表b,c缓存,而让a以流的方式进入reducer。不存在的时候,则会将最后的表以流的方式进入reducer。

但是在实现的时候,会先用WHERE里面的条件过滤吧?!否则性能不是很差!?

LEFT SEMI JOIN implements the uncorrelated IN/EXISTS subquery semantics in an efficient way.

SELECT /*+ MAPJOIN(b) */ a.key, a.value
FROM a JOIN b ON a.key = b.key

上述代码不需要 reducer!

The restriction is that a FULL/RIGHT OUTER JOIN b cannot be performed.

set hive.optimize.bucketmapjoin = true
SELECT /*+ MAPJOIN(b) */ a.key, a.value
FROM A a JOIN B b ON a.key = b.key

同时需要设置参数:

set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;

可以设置 hive.auto.convert.join=true 让hive自动帮你转为 MAPJOIN。从 Hive 0.11.0 开始,默认值就是true。
MAPJOIN 将小表放到内存,保存为一个 HASH MAP。工作流程是:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+JoinOptimization

Local work:
read records via standard table scan (including filters and projections) from source on local machine
build hashtable in memory
write hashtable to local disk
upload hashtable to dfs
add hashtable to distributed cache

Map task
read hashtable from local disk (distributed cache) into memory
match records' keys against hashtable
combine matches and write to output

MAPJOIN 的限制:

select /*+ MAPJOIN(time_dim, date_dim) */ count(*) from
store_sales
join time_dim on (ss_sold_time_sk = t_time_sk)
join date_dim on (ss_sold_date_sk = d_date_sk)
where t_hour = 8 and d_year = 2002

通过两个值设置 MAPJOIN

set hive.auto.convert.join.noconditionaltask = true;
set hive.auto.convert.join.noconditionaltask.size = 10000000;

SMB Map Join: Sort-Merge-Bucket (SMB) joins

表已经是分桶并且排序好的, JOIN 过程通过顺序 merge 已经排序好的表即可。(效率比普通 JOIN 高)

However, if the tables are partitioned, there could be a slow down as each mapper would need to get a very small chunk of a partition which has a single key.

set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;

------ 大表自动转化设置
set hive.auto.convert.sortmerge.join.bigtable.selection.policy
    = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;

大表选择策略会自动决定哪个表被 streaming,而不是 hash 并且 streaming。可选策略有

org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ (default)
org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ
org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ

如果表有不同数量的keys(SORT 列),会发生异常!

SMB 存在的目的主要是为了解决大表与大表间的 Join 问题,分桶其实就是把大表化成了“小表”,然后 Map-Side Join 解决之,这是典型的分而治之的思想。

这个Blog写的不错,https://my.oschina.net/leejun2005/blog/178631

编写自己的UDF

TRANSFORM 貌似不能实现 UDAF。可以用java写UDF或UDAF,UDTF等。需要jdk1.7版本。

ADD JAR hdfs:///user/hadoop-data/user_upload/hive-kv-udaf_2.10-0.0.1.jar;
CREATE TEMPORARY FUNCTION kv as 'KV';

UNION

Lateral View

用在 UDTF 中。对于输入的一行,输出是多行。0.12.0 版本开始,列名可以不用写,会自动采用UDTF输出的StructObjectInspector对象自动得到列名。参考https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView

一个例子,adid_list是一个Array,explode()函数会将这个list输出为多行。

SELECT pageid, adid
FROM pageAds LATERAL VIEW explode(adid_list) adTable AS adid;

SELECT adid, count(1)
FROM pageAds LATERAL VIEW explode(adid_list) adTable AS adid
GROUP BY adid;

SELECT * FROM src LATERAL VIEW OUTER explode(array()) C AS a limit 10;

select uid, key, val
from table LATERAL VIEW explode(kv) adTable AS key, val

FROM 语句里面可以包含多个 Lateral View。通过 OUTER 关键字可以让 explode 输出为NULL的时候,
记录至少存在一行!(没有这个关键字,结果中将不会出现记录)

子查询

子查询放在 FROM 里面,在 0.13 版本后,可以放在 IN 和 EXISTS 之中,但是存在一些限制。

采样

TABLESAMPLE (BUCKET x OUT OF y [ON colname])

colname 可以是非分区字段以外的字段 或者 RAND()
表采样是很慢的,如果建表的时候采用CLUSTERED BY创建,
那么可以加快采样速度,因为只要简单地取出对应的BUCKET就可以了,而不用全表扫描。

更多信息参考:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Sampling

block sampling @since(0.8)

TABLESAMPLE (n PERCENT)
TABLESAMPLE (ByteLengthLiteral)   ---- 例如 100M
TABLESAMPLE (n ROWS)

----- 例子
SELECT *
FROM source TABLESAMPLE(100M) s;

这个是在 HDFS block level 上进行的采样,
所以一些压缩格式表数据不支持这个特性。复现可以通过设置种子来实现set hive.sample.seednumber=<INTEGER>;

虚拟列

INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE 在 Mapper 里面分别指输入文件名 和 全局文件位置

简单例子

select INPUT__FILE__NAME, key, BLOCK__OFFSET__INSIDE__FILE from src;
select key, count(INPUT__FILE__NAME) from src group by key order by key;
select * from src where BLOCK__OFFSET__INSIDE__FILE > 12000 order by key;

窗函数和分析函数

@since(0.11)

窗函数(没搞懂)

OVER

可以将聚合函数的返回值应用到每一列(窗函数的功能),就像分析函数那样!!

SELECT a, SUM(b) OVER (PARTITION BY c, d ORDER BY e, f)
FROM T;

SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY d ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
FROM T;

分析函数

其他细节

COUNT(DISTINCT a) OVER (PARTITION BY c)

SELECT rank() OVER (ORDER BY sum(b))
FROM T
GROUP BY a;

Enhanced Aggregation, Cube, Grouping and Rollup

SELECT a, b, SUM(c) FROM tab1 GROUP BY a, b GROUPING SETS ( (a,b) , a)

---- 等价于
SELECT a, b, SUM( c ) FROM tab1 GROUP BY a, b
UNION
SELECT a, null, SUM( c ) FROM tab1 GROUP BY a

设置 hive.new.job.grouping.set.cardinality 的值,当候选分组数目(上面分别是8和4)超过这个值时,将开启额外的 Mapper Reducer 任务来处理。

EXPLAIN 命令

用来显示 query 的执行计划的,例如对于这个query有几个stage等。

EXPLAIN [EXTENDED|DEPENDENCY|AUTHORIZATION] query

HIVE 权限管理

MORE

HIVE on spark!

UDF 开发

支持 JAVA(或Scala) 写UDF UDAF UDTF!依赖:

UDF

实现普通函数 (v1, ...) -> (w1, ...)。
需要继承org.apache.hadoop.hive.ql.exec.UDF这个类,并实现 evaluate 方法。

UDFTrim 模板

MAC 切换不同的JDK

参考:https://stackoverflow.com/questions/20974607/can-java-7-and-java-8-co-exist-on-osx

use-java () {
    export JAVA_HOME=`/usr/libexec/java_home -v 1.$1`
}

然后使用 use-java 7 就可以切换到 jdk 1.7 了。

ERROR 汇总

org.apache.hadoop.yarn.exceptions.YarnRuntimeException: java.io.IOException: Split metadata size exceeded 10000000. Aborting job job_1469532484579_647683

解决方法:set mapreduce.jobtracker.split.metainfo.maxsize=-1;

问题

TIPS

# YARN: Hadoop 2
set mapreduce.job.maps=<num>;
set mapreduce.job.reduces=<num>;

另一种去重的方法

select sum(if(r=1, 1, 0)) as distinct_num,
    sum(1) as num
from (
    select
        id,
        row_number() over (partition by id) as r
    from tableA
)
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;   --- 不融合
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;  --- 融合
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
set hive.exec.max.dynamic.partitions=<最大总的分区数目>;
set hive.exec.max.dynamic.partitions.pernode=<每一个MR节点创建的最大分区数目>;