本博客仅作学习记录所用,基于尚硅谷和黑马程序员做的笔记…
Hive学习笔记1
Hive学习笔记3
在Hive中,当下版本3.1.2总共支持6种join语法。分别是:
inner join(内连接)、left join(左连接)、right join(右连接)、full outer join(全外连接)、left semi join(左半开连接)、cross join(交叉连接,也叫做笛卡尔乘积)。
这允许FROM子句连接以逗号分隔的表列表,而省略JOIN关键字。
例如:
--隐式联接表示法
SELECT *
FROM table1 t1, table2 t2, table3 t3
WHERE t1.id = t2.id AND t2.id = t3.id AND t1.zipcode = '02535';
hive (default)> select e.eid, e.ename, d.did, d.dname from emp e join dept d on e.did = d.did;
select * from emp e left join dept d on e.did<>d.did;
hive (default)> select * from emp e join dept d on e.did = d.did;
内连接:只有进行连接的两个表中都存在与连接条件相匹配的数据才会被保留下来。
其中inner可以省略。

hive (default)> select e.eid, e.ename, d.deptno from emp e join dept d on e.did = d.did;
--等价于 隐式连接表示法
hive (default)> select e.eid, e.ename, d.deptno from emp e,dept d where e.did = d.did;
left join:左外连接(Left Outer Jion)或者左连接,其中outer可以省略,left outer join是早期的写法。
通俗解释:join时以左表的全部数据为准,右边与之关联;左表数据全部返回,右表关联上的显示返回,关联不上的显示null返回。

hive (default)> select e.eid, e.ename, d.did from emp e left join dept d on e.did = d.did;
right join:右外连接(Right Outer Jion)或者右连接,其中outer可以省略。
通俗解释:join时以右表的全部数据为准,左边与之关联;右表数据全部返回,左表关联上的显示返回,关联不上的显示null返回。

hive (default)> select e.eid, e.ename, d.did from emp e right join dept d on e.did = d.did;
full outer join 等价 full join :全外连接或者外连接。
包含左、右两个表的全部行,不管另外一边的表中是否存在与它们匹配的行 ,如果任一表的指定字段没有符合条件的值的话,那么就使用NULL值替代。
在功能上,它等价于对这两个数据集合分别进行左外连接和右外连接,然后再使用消去重复行的操作将上述两个结果集合并为一个结果集。

hive (default)> select e.eid, e.ename, d.did from emp e full join dept d on e.d=id = d.did;
左半开连接(LEFT SEMI JOIN)会返回左边表的记录,前提是其记录对于右边的表满足ON语句中的判定条件。
从效果上来看有点像inner join之后只返回左表的结果。
select *
from emp e left semi join dept d
on e.deptno =d.deptno;
--相当于 inner join,但是只返回左表全部数据, 只不过效率高一些
select e.*
from emp e inner join dept d
on e.deptno =d.deptno;
交叉连接cross join,将会返回被连接的两个表的笛卡尔积,返回结果的行数等于两个表行数的乘积。对于大表来说,cross join慎用。--cross join
--下列A、B、C 执行结果相同,但是效率不一样:
--A:
select e.*,d.* from emp e,dept d where e.deptno =d.deptno;
--B:
select * from emp e cross join dept d on e.deptno =d.deptno;
select * from emp e cross join dept d where e.deptno =d.deptno;
--C:
select * from emp e inner join dept d on e.deptno =d.deptno;
--一般不建议使用方法A和B,因为如果有WHERE子句的话,往往会先生成两个表行数乘积的行的数据表然后才根据WHERE条件从中选择。
--因此,如果两个需要求交集的表太大,将会非常非常慢,不建议使用。
explain select e.*,d.* from emp e,dept d where e.deptno =d.deptno;
--B:
explain select * from emp e cross join dept d on e.deptno =d.deptno;
explain select * from emp e cross join dept d where e.deptno =d.deptno;
--C:
explain select * from emp e inner join dept d on e.deptno =d.deptno;
注意:连接 n个表,至少需要n-1个连接条件。例如:连接三个表,至少需要两个连接条件。
数据准备
1700 Beijing
1800 London
1900 Tokyo
hive (default)> create table if not exists location(
> loc int,
> loc_name string
> )
> row format delimited fields terminated by ' ';
hive (default)> load data local inpath '/opt/module/datas/location.txt' into table location;
hive (default)> SELECT e.ename, d.dname, l.loc_name FROM emp e JOIN dept d ON d.did = e.did JOIN location l ON d.loc = l.loc;
大多数情况下,Hive会对每对JOIN连接对象启动一个MapReduce任务。
本例中会首先启动一个MapReduce job对表e和表d进行连接操作,然后会再启动一个MapReduce job将第一个MapReduce job的输出和表l进行连接操作。
注意:为什么不是表d和表l先进行连接操作呢?这是因为Hive总是按照从左到右的顺序执行的。
优化:当对3个或者更多表进行join连接时,如果每个on子句都使用相同的连接键的话,那么只会产生一个MapReduce job。
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
--由于联接中仅涉及b的key1列,因此被转换为1个MR作业来执行
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
--会转换为两个MR作业,因为在第一个连接条件中使用了b中的key1列,而在第二个连接条件中使用了b中的key2列。
-- 第一个map / reduce作业将a与b联接在一起,然后将结果与c联接到第二个map / reduce作业中。
hive (default)> select eid, dname from emp, dept;
会对输出的结果进行全局排序,因此底层使用MapReduce引擎执行的时候,只会有一个reducetask执行。也因此,如果输出的行数太大,会导致需要很长的时间才能完成全局排序。hive (default)> select * from emp order by sal;
(2)查询员工信息按工资降序排列
hive (default)> select * from emp order by sal desc;
按照员工薪水的2倍排序
hive (default)> select ename, sal*2 twosal from emp order by twosal;
按照部门和工资升序排序
hive (default)> select ename, did, sal from emp order by did, sal;
Sort By:对于大规模的数据集order by的效率非常低。在很多情况下,并不需要全局排序,此时可以使用sort by。
Sort by为每个reducer产生一个排序文件。每个Reducer内部进行排序,对全局结果集来说不是排序。
1)设置reduce个数
hive (default)> set mapreduce.job.reduces=3;
若是不指定reduce task个数:
日志会显示:Number of reduce tasks not specified. Estimated from input data size: 1
未指定reduce tasks个数。从输入数据大小估计:1
2)查看设置reduce个数
hive (default)> set mapreduce.job.reduces;
3)根据部门编号降序查看员工信息
hive (default)> select * from emp sort by did desc;
4)将查询结果导入到文件中(按照部门编号降序排序)
hive (default)> insert overwrite local directory
> '/opt/module/data/sortby-result'
> select * from emp sort by deptno desc;
Distribute By: 在有些情况下,我们需要控制某个特定行应该到哪个reducer,通常是为了进行后续的聚集操作。distribute by 子句可以做这件事。
distribute by(字段)根据指定字段将数据分到不同的reducer,分发算法是hash散列。
hive (default)> set mapreduce.job.reduces=3;
hive (default)> insert overwrite local directory
> '/opt/module/hive-3.1.2/datas/distribute-result'
> select * from emp
> distribute by did
> sort by eid desc;
注意:
根据分区字段的hash值与reduce的个数取模后,余数相同的分到一个区。当distribute by和sorts by字段相同时,可以使用cluster by方式。 cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。
(1)以下两种写法等价
hive (default)> select * from emp cluster by did;
hive (default)> select * from emp distribute by did sort by did;
分组的规则hash散列。hash_func(col_name) % reduce task nums
分为几组取决于reduce task的个数。
select * from student;
--不指定reduce task个数
--日志显示:Number of reduce tasks not specified. Estimated from input data size: 1
--未指定reduce tasks个数。从输入数据大小估计:1
select * from student cluster by age;
--手动设置reduce task个数
set mapreduce.job.reduces =2;
select * from student cluster by age;
select_statement UNION [ALL | DISTINCT] select_statement UNION [ALL | DISTINCT] select_statement ...
--union
--使用DISTINCT关键字与使用UNION默认值效果一样,都会删除重复行。
select num,name from student_local
UNION
select num,name from student_hdfs;
--和上面一样
select num,name from student_local
UNION DISTINCT
select num,name from student_hdfs;
--使用ALL关键字会保留重复行。
select num,name from student_local
UNION ALL
select num,name from student_hdfs;
--如果要将ORDER BY,SORT BY,CLUSTER BY,DISTRIBUTE BY或LIMIT应用于单个SELECT,请将子句放在括住SELECT的括号内
SELECT sno,sname FROM (select sno,sname from student_local LIMIT 2) subq1
UNION
SELECT sno,sname FROM (select sno,sname from student_hdfs LIMIT 3) subq2
--如果要将ORDER BY,SORT BY,CLUSTER BY,DISTRIBUTE BY或LIMIT子句应用于整个UNION结果
--请将ORDER BY,SORT BY,CLUSTER BY,DISTRIBUTE BY或LIMIT放在最后一个之后。
select sno,sname from student_local
UNION
select sno,sname from student_hdfs
order by sno desc;
FROM子句中的每个表都必须有一个名称。--from子句中子查询(Subqueries)
--子查询
SELECT num
FROM (
select num,name from student_local
) tmp;
--包含UNION ALL的子查询的示例
SELECT t3.name
FROM (
select num,name from student_local
UNION distinct
select num,name from student_hdfs
) t3;
--where子句中子查询(Subqueries)
--不相关子查询,相当于IN、NOT IN,子查询只能选择一个列。
--(1)执行子查询,其结果不被显示,而是传递给外部查询,作为外部查询的条件使用。
--(2)执行外部查询,并显示整个结果。
SELECT *
FROM student_hdfs
WHERE student_hdfs.num IN (select num from student_local limit 2);
--相关子查询,指EXISTS和NOT EXISTS子查询
--子查询的WHERE子句中支持对父查询的引用
SELECT A
FROM T1
WHERE EXISTS (SELECT B FROM T2 WHERE T1.X = T2.Y);
公用表表达式(CTE)是一个临时结果集,该结果集是从WITH子句中指定的简单查询派生而来的,该查询紧接在SELECT或INSERT关键字之前。select * from student;
--选择语句中的CTE
with q1 as (select id,name,age from student where classid=1)
select *
from q1;
-- from风格
with q1 as (select id,name,age from student where classid=1)
from q1
select *;
-- chaining CTEs 链式
with q1 as ( select * from student where classid=1),
q2 as ( select id,name,age from q1)
select * from (select name from q2) a;
-- union案例
with q1 as (select * from student where id = 95002),
q2 as (select * from student where id = 95004)
select * from q1 union all select * from q2;
drop table if exists s1;
--视图,CTAS和插入语句中的CTE
-- insert
create table s1 like student; --根据已经存在的表结构创建表
with q1 as ( select * from student where id = 95002)
from q1
insert overwrite table s1
select *;
select * from s1;
drop table if exists s2;
-- ctas
create table s2 as --根据查询结果创建表
with q1 as ( select * from student where id = 95002)
select * from q1;
select * from s2;
drop view if exists v1;
-- view
create view v1 as
with q1 as ( select * from student where id = 95002)
select * from q1;
select * from v1;
分区是指根据分区列(例如“日期day”)的值将表划分为不同分区。这样可以更快地对指定分区数据进行查询。table表目录下以子文件夹形式存在。分区列=分区值分区之间是一种递进关系,可以理解为在前一个分区的基础上继续分区。从HDFS的角度来看就是文件夹下继续划分子文件夹。把一个大的数据集根据业务需要分割成小的数据集。在查询时通过where子句中的表达式选择查询所需要的指定的分区,这样的查询效率会提高很多(避免了全表扫描查询,因为where语句的背后需要进行全表扫描才能过滤出结果,对于hive来说需要扫描表下面的每一个文件)。
什么是静态分区?静态分区指的是分区的字段值是由用户在加载数据的时候手动指定的。
语法如下:
load data [local] inpath '数据文件路径' into table tablename partition(分区字段='分区值'...);
dept_20200401.log
dept_20200402.log
dept_20200403.log
2)创建分区表
hive (default)> create table dept_partition(
> deptno int,dname string,loc string
> )
> partitioned by(day string)
> row format delimited fields terminated by ' ';
注意:分区字段不能是表中已经存在的字段,因为分区字段最终也会以虚拟字段的形式显示在表结构上。。
分区字段是虚拟字段,其数据并不存储在底层的文件中
3)加载数据到分区表中
(1) 数据准备
[hyj@hadoop102 datas]$ vim dept_20200401.log
10 ACCOUNTING 1700
20 RESEARCH 1800
[hyj@hadoop102 datas]$ vim dept_20200402.log
30 SALES 1900
40 OPERATIONS 1700
[hyj@hadoop102 datas]$ vim dept_20200403.log
50 TEST 2000
60 DEV 1900
(2) 加载数据
注意:分区表加载数据时,必须指定分区
hive (default)> load data local inpath
> '/opt/module/hive-3.1.2/datas/dept_20200401.log'
> into table dept_partition
> partition(day='20200401');
hive (default)> load data local inpath
> '/opt/module/hive-3.1.2/datas/dept_20200402.log' into table dept_partition
> partition(day='20200402');
hive (default)> load data local inpath
> '/opt/module/hive-3.1.2/datas/dept_20200403.log' into table dept_partition
> partition(day='20200403');
4)查询分区表中数据
单分区查询
hive (default)> select * from dept_partition where day='20200401';
多分区联合查询
hive (default)> select * from dept_partition where day='20200401'
> union
> select * from dept_partition where day='20200402'
> union
> select * from dept_partition where day='20200403';
--列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。
hive (default)> select * from dept_partition
> where day='20200401' or day='20200402' or day='20200403';
5)增加分区
ADD PARTITION会更改表元数据,但不会加载数据。如果分区位置中不存在数据,查询将不会返回任何结果。
添加单个分区
hive (default)> alter table dept_partition add partition(day='20200404');
同时添加多个分区
hive (default)> alter table dept_partition add partition(day='20200405') partition(day='20200406');
location指定在hdfs上的存储位置
ALTER TABLE dept_partition ADD PARTITION (day='20200405') location '/user/hive/warehouse/dept_partition/day=20200405' PARTITION (day='20200406') location '/user/hive/warehouse/dept_partition/day=20200406';
6)删除分区
可以使用ALTER TABLE DROP PARTITION删除表的分区。这将删除该分区的数据和元数据。
--删除分区
ALTER TABLE table_name DROP [IF EXISTS] PARTITION (day='20200406');
ALTER TABLE table_name DROP [IF EXISTS] PARTITION (day='20200406') PURGE; --直接删除数据 不进垃圾桶
删除单个分区
hive (default)> alter table dept_partition drop partition(day='20200406');
同时删除多个分区
hive (default)> alter table dept_partition drop partition(day='20200404'), partition(day='20200405');
7)查看分区表有多少分区
hive> show partitions dept_partition;
8)查看分区表结构
hive> desc formatted dept_partition;
9)重命名分区
hive (default)> alter table dept_partition partition(day='20200401') rename to partition(day='20220723');
思考: 如果一天的日志数据量也很大,如何再将数据拆分?
hive (default)> create table dept_partition2(
> deptno int,dname string,loc string
> )
> partitioned by(day string,hour string)
> row format delimited fields terminated by ' ';
hive (default)> load data local inpath
> '/opt/module/hive-3.1.2/datas/dept_20200401.log' into table dept_partition2
> partition(day='20200401',hour='12');
(2)查询分区数据
hive (default)> select * from dept_partition2 where day='20200401' and hour='12';
hive (default)> dfs -mkdir -p /user/hive/warehouse/dept_partition2/day=20200401/hour=13;
hive (default)> dfs -put /opt/module/hive-3.1.2/datas/dept_20200401.log /user/hive/warehouse/dept_partition2/day=20200401/hour=13;
查询数据(查询不到刚上传的数据)
hive (default)> select * from dept_partition2 where day='20200401' and hour='13';
Hive将每个表的分区列表信息存储在其metastore中。但是,如果将新分区直接添加到HDFS(例如通过使用hadoop fs -mkdir和hadoop fs -put命令)或从HDFS中直接删除分区文件夹,则除非用户ALTER TABLE table_name ADD/DROP PARTITION在每个新添加/删除的分区上运行命令,否则metastore(也就是Hive)将不会意识到分区信息的更改。
但是,用户可以使用修复表选项运行metastore check命令。
--修复分区
MSCK [REPAIR] TABLE table_name [ADD/DROP/SYNC PARTITIONS];
MSCK命令的默认选项是“添加分区”。使用此选项,它将把HDFS上存在但元存储中不存在的所有分区添加到元存储中。DROP PARTITIONS选项将从metastore中删除分区(此分区是已经从HDFS中删除的分区 )信息。SYNC PARTITIONS选项等效于调用ADD和DROP PARTITIONS。
如果存在大量未跟踪的分区,则可以批量运行MSCK REPAIR TABLE,以避免OOME(内存不足错误)。
执行修复命令
--add partitions可以不写 因为默认就是增加分区
hive (default)> msck repair table dept_partition2;
再次查询数据
hive (default)> select * from dept_partition2 where day='20200401' and hour='13';
--删除分区表的某一个分区文件夹
hive (default)> dfs -rm -r /user/hive/warehouse/dept_partition2/day=20200401/hour=13;
--查询发现还有分区信息,因为元数据信息没有删除
hive (default)> show partitions dept_partition2;
--使用MSCK命令进行修复
hive (default)> msck repair table dept_partition2 drop partitions;
hive (default)> dfs -mkdir -p /user/hive/warehouse/dept_partition2/day=20200401/hour=15;
hive (default)> dfs -put /opt/module/hive-3.1.2/datas/dept_20200401.log /user/hive/warehouse/dept_partition2/day=20200401/hour=15;
--删除分区表的某一个分区文件夹
hive (default)> dfs -rm -r /user/hive/warehouse/dept_partition2/day=20200401/hour=12;
--使用MSCK命令进行修复
hive (default)> msck repair table dept_partition2 sync partitions;
(2)方式二:上传数据后添加分区
上传数据
hive (default)> dfs -mkdir -p /user/hive/warehouse/dept_partition2/day=20200401/hour=14;
hive (default)> dfs -put /opt/module/hive-3.1.2/datas/dept_20200401.log /user/hive/warehouse/dept_partition2/day=20200401/hour=14;
执行添加分区
hive (default)> alter table dept_partition2 add partition(day='20200401',hour='14');
查询数据
hive (default)> select * from dept_partition2 where day='20200401' and hour='14';
(3)方式三:创建文件夹后load数据到分区
创建目录
hive (default)> dfs -mkdir -p /user/hive/warehouse/dept_partition2/day=20200401/hour=15;
加载数据
hive (default)> load data local inpath
> '/opt/module/hive-3.1.2/datas/dept_20200401.log' into table
> dept_partition2 partition(day='20200401',hour='15');
查询数据
hive (default)> select * from dept_partition2 where day='20200401' and hour='15';
--更改分区文件存储格式
ALTER TABLE table_name PARTITION (dt='2008-08-09') SET FILEFORMAT file_format;
--更改分区位置
ALTER TABLE table_name PARTITION (dt='2008-08-09') SET LOCATION "new location";
所谓动态分区指的是分区的字段值是基于查询结果自动推断出来的。核心语法就是insert+select。
关系型数据库中,对分区表Insert数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,Hive中也提供了类似的机制,即动态分区(Dynamic Partition),只不过,使用Hive的动态分区,需要进行相应的配置。
hive.exec.dynamic.partition=true
(2)设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。)
hive.exec.dynamic.partition.mode=nonstrict
关于严格模式、非严格模式,演示如下:
FROM page_view_stg pvs
INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country)
SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip, pvs.cnt
--在这里,country分区将由SELECT子句(即pvs.cnt)的最后一列动态创建。
--而dt分区是手动指定写死的。
--如果是nonstrict模式下,dt分区也可以动态创建。
(3)在所有执行MR的节点上,最大一共可以创建多少个动态分区。默认1000
hive.exec.max.dynamic.partitions=1000
(4)在每个执行MR的节点上,最大可以创建多少个动态分区。该参数需要根据实际的数据来设定。比如:源数据中包含了一年的数据,即day字段有365个值,那么该参数就需要设置成大于365,如果使用默认值100,则会报错。
hive.exec.max.dynamic.partitions.pernode=100
(5)整个MR Job中,最大可以创建多少个HDFS文件。默认100000
hive.exec.max.created.files=100000
(6)当有空分区生成时,是否抛出异常。一般不需要设置。默认false
hive.error.on.empty.partition=false
hive (default)> create table dept_partition_dy(id int, name string)
> partitioned by (loc int) row format delimited fields terminated by ' ';
(2)设置动态分区 为非严格模式
hive (default)> set hive.exec.dynamic.partition.mode = nonstrict;
(3)执行动态分区插入
hive (default)> insert into table dept_partition_dy partition(loc) select did, dname, loc from dept;
(4)查看目标分区表的分区情况
hive (default)> show partitions dept_partition_dy;
Bucket 分桶表是hive的一种优化手段表。分桶是指根据表中字段(例如“编号ID”)的值,经过hash计算规则将数据文件划分成指定的若干个小文件。
分区提供一个隔离数据和优化查询的便利方式。不过,并非所有的数据集都可形成合理的分区。对于一张表或者分区,Hive 可以进一步组织成桶,也就是更为细粒度的数据范围划分。
分桶是将数据集分解成更容易管理的若干部分的另一个技术。
分区针对的是数据的存储路径;分桶针对的是数据文件。
分区表是将数据划分不同的目录进行存储,而分桶表是将数据划分不同的文件进行存储。
[hyj@hadoop102 datas]$ vim student.txt
1001 zhangsan 76
1002 lisi 88
1003 wangwu 59
1004 liuliu 99
1005 qiqi 95
1006 rongrong 37
1007 dundun 48
[hyj@hadoop102 datas]$ hadoop fs -mkdir /student
[hyj@hadoop102 datas]$ hadoop fs -put ./student.txt /student
(2)开启分桶的功能 从Hive2.0开始不再需要设置
set hive.enforce.bucketing=true;
(3)创建分桶表
hive (default)> create table stu_buck(id int,name string,grade double)
> clustered by(id)
> into 4 buckets
> row format delimited fields terminated by ' ';
clustered by(col_name)表示根据哪个字段进行分桶;
into N buckets表示分为几桶(也就是几个部分)。
分桶的字段必须是表中已经存在的字段。
在创建分桶表时,还可以指定分桶内的数据排序规则:
#根据id分为4桶 每个桶内根据grade成绩倒序排序
hive (default)> create table stu_buck(id int,name string,grade double)
> clustered by(id) sorted by(grade desc) into 4 buckets
> row format delimited fields terminated by ' ';
(4)查看表结构
hive (default)> desc formatted stu_buck;
(5)导入数据到分桶表中,load的方式
hive (default)> set mapreduce.job.reduces=-1;
hive (default)> load data inpath '/student/student.txt' into table stu_buck;
(6)查看创建的分桶表中是否分成4个桶

(7)查询分桶的数据
hive (default)> select * from stu_buck;
(8)分桶规则:
根据结果可知:Hive的分桶采用对分桶字段的值进行哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中,余数相同的分到同一个文件。
分桶表操作需要注意的事项:
(1)reduce的个数设置为-1,让Job自行决定需要用多少个reduce或者将reduce的个数设置为大于等于分桶表的桶数
(2)从hdfs中load数据到分桶表中,避免本地文件找不到问题
(3)不要使用本地模式
insert方式将数据导入分桶表
hive(default)> insert into table stu_buck select * from student_insert;
基于分桶字段查询时,不再需要进行全表扫描过滤,根据分桶的规则hash_function(id) mod 4计算出分桶编号,再查询指定分桶里面的数据就可以找出结果,此时是分桶扫描而不是全表扫描。JOIN时可以提高MR程序效率,减少笛卡尔积数量分桶表数据进行抽样当数据量过大时,我们可能需要查找数据子集以加快数据处理速度分析。 这就是抽样、采样,一种用于识别和分析数据中的子集的技术,以发现整个数据集中的模式和趋势。
在HQL中,可以通过三种方式采样数据:随机采样,存储桶表采样和块采样。
随机抽样使用rand()函数和LIMIT关键字来获取数据。 使用了DISTRIBUTE和SORT关键字,可以确保数据也随机分布在mapper和reducer之间,使得底层执行有效率。
ORDER BY 和rand()语句也可以达到相同的目的,但是表现不好。因为ORDER BY是全局排序,只会启动运行一个Reducer。
--数据表
select * from student;
--需求:随机抽取2个学生的情况进行查看
SELECT * FROM student
DISTRIBUTE BY rand() SORT BY rand() LIMIT 2;
--使用order by+rand也可以实现同样的效果 但是效率不高
SELECT * FROM student
ORDER BY rand() LIMIT 2;
Block块采样允许select随机获取n行数据,即数据大小或n个字节的数据。
采样粒度是HDFS块大小。
---block抽样
--根据行数抽样
SELECT * FROM student TABLESAMPLE(1 ROWS);
--根据数据大小百分比抽样
SELECT * FROM student TABLESAMPLE(50 PERCENT);
--根据数据大小抽样
--支持数据单位 b/B, k/K, m/M, g/G
SELECT * FROM student TABLESAMPLE(100b);
---bucket table抽样
select * from stu_buck;
--根据整行数据进行抽样
SELECT * FROM stu_buck TABLESAMPLE(BUCKET 1 OUT OF 4 ON rand());
--根据分桶字段进行抽样 效率更高
describe formatted stu_buck;
SELECT * FROM stu_buck TABLESAMPLE(BUCKET 1 OUT OF 4 ON id);
--TABLESAMPLE (BUCKET x OUT OF y [ON colname])
--1、y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。
--例如,table总共分了4份(4个bucket),当y=2时,抽取(4/2=)2个bucket的数据,当y=8时,抽取(4/8=)1/2个bucket的数据。
--2、x表示从哪个bucket开始抽取。
--例如,table总bucket数为4,tablesample(bucket 4 out of 4),表示总共抽取(4/4=)1个bucket的数据,抽取第4个bucket的数据。
--注意:x的值必须小于等于y的值,否则FAILED:Numerator should not be bigger than denominator in sample clause for table stu_buck
--3、ON colname表示基于什么抽
--ON rand()表示随机抽
--ON 分桶字段 表示基于分桶字段抽样 效率更高 推荐
Hive本身从设计之初时,就是不支持事务的,因为Hive的核心目标是将已经存在的结构化数据文件映射成为表,然后提供基于表的SQL分析处理,是一款面向分析的工具。且映射的数据通常存储于HDFS上,而HDFS是不支持随机修改文件数据的。
这个定位就意味着在早期的Hive的SQL语法中是没有update,delete操作的,也就没有所谓的事务支持了,因为都是select查询分析操作。
从Hive0.14版本开始,具有ACID语义的事务已添加到Hive中,以解决以下场景下遇到的问题:
流式传输数据。使用如Apache Flume或Apache Kafka之类的工具将数据流式传输到Hadoop集群中。虽然这些工具可以每秒数百行或更多行的速度写入数据,但是Hive只能每隔15分钟到一个小时添加一次分区。频繁添加分区会很快导致表中大量的分区。因此通常使用这些工具将数据流式传输到现有分区中,但是这会使读者感到脏读(也就是说,他们将在开始查询后看到写入的数据),并将许多小文件留在目录中,这将给NameNode带来压力。通过事务功能,同时允许读者获得一致的数据视图并避免过多的文件。尺寸变化缓慢。在典型的星型模式数据仓库中,维度表随时间缓慢变化。例如,零售商将开设新商店,需要将其添加到商店表中,或者现有商店可能会更改其平方英尺或某些其他跟踪的特征。这些更改导致插入单个记录或更新 记录(取决于所选策略)。数据重述。有时发现收集的数据不正确,需要更正。从Hive 0.14开始,可以通过INSERT,UPDATE和 DELETE支持这些用例 。虽然Hive支持了具有ACID语义的事务,但是在使用起来,并没有像在MySQL中使用那样方便,有很多局限性。原因很简单,毕竟Hive的设计目标不是为了支持事务操作,而是支持分析操作,且最终基于HDFS的底层存储机制使得文件的增加删除修改操作需要动一些小心思。具体限制如下:
ORC文件格式(STORED AS ORC)。配置参数开启使用。分桶表(Bucketed)才可以使用事务功能。transactional必须为true;如果不做任何配置修改,直接针对Hive中已有的表进行Update、Delete、Insert操作,可以发现,只有insert语句可以执行,Update和Delete操作会报错。
Insert插入操作能够成功的原因在于,底层是直接把数据写在一个新的文件中的。
--Hive中事务表的创建使用
--1、开启事务配置(可以使用set设置当前session生效 也可以配置在hive-site.xml中)
set hive.support.concurrency = true; --Hive是否支持并发
set hive.enforce.bucketing = true; --从Hive2.0开始不再需要 是否开启分桶功能
set hive.exec.dynamic.partition.mode = nonstrict; --动态分区模式 非严格
set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on = true; --是否在Metastore实例上运行启动线程和清理线程
set hive.compactor.worker.threads = 1; --在此metastore实例上运行多少个压缩程序工作线程。
--2、创建Hive事务表
create table trans_student(
id int,
name String,
age int
)clustered by (id) into 2 buckets stored as orc TBLPROPERTIES('transactional'='true');
--3、针对事务表进行insert update delete操作
insert into trans_student values (1,"allen",18);
update trans_student set age = 20 where id = 1;
delete from trans_student where id =1;
select * from trans_student;
Hive中的视图(view)是一种虚拟表,只保存定义,不实际存储数据。通常从真实的物理表查询中创建生成视图,也可以从已经存在的视图上创建新视图。[hyj@hadoop102 datas]$ vim student.txt
95001,李勇,男,20,1
95002,刘晨,女,19,1
95003,王敏,女,22,4
95004,张立,男,19,3
95005,刘刚,男,18,1
95006,孙庆,男,23,2
95007,易思玲,女,19,3
95008,李娜,女,18,4
95009,梦圆圆,女,18,3
95010,孔小涛,男,19,2
95011,包小柏,男,18,1
95012,孙花,女,20,4
95013,冯伟,男,21,2
95014,王小丽,女,19,2
95015,王君,男,18,3
95016,钱国,男,21,4
95017,王风娟,女,18,2
95018,王一,女,19,1
95019,邢小丽,女,19,3
95020,赵钱,男,21,1
95021,周二,男,17,2
95022,郑明,男,20,4
[hyj@hadoop102 datas]$ cat class.txt
1 Java班
2 C语言班
3 Python班
4 R语言班
hive (default)> drop table if exists student;
hive (default)> create table student(
> id int,name string,sex string,age int,classid int
> )
> row format delimited fields terminated by ',';
hive (default)> create table class(
> cid int,cname string
> )
> row format delimited fields terminated by ' ';
hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/student.txt' into table student;
hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/class.txt' into table class;
--1、创建视图
hive (default)> create view v_test as select * from student limit 5;
--从已有的视图中创建视图
hive (default)> create view v_from_view as select id,name,classid from v_test limit 3;
--2、显示当前已有的视图
show tables;
show views;--hive v2.2.0之后支持
--3、视图的查询使用
hive (default)> select * from v_from_view;
--4、查看视图定义
hive (default)> show create table v_from_view;
--5、删除视图
hive (default)> drop view v_from_view;
--6.更改/设置视图属性
hive (default)> alter view v_test set tblproperties('comment'='This is a view');
--7.更改视图定义
hive (default)> alter view v_test as select id,name,classid from student limit 10;
降低查询的复杂度,优化查询语句
--使用视图优化嵌套查询
hive (default)> from(
> select * from student join class on(student.classid=class.cid) where cname="Java班"
> ) a select a.name where a.age=19;
--把嵌套子查询变成一个视图
hive (default)> create view shorter_join as
> select * from student join class on(student.classid=class.cid) where classid=1; --在这里我将cname="Java班"改成classid=1,否则↓基于视图查询将会查不到数据(可能是因为字符编码的问题吧)
--基于视图查询
hive (default)> select name from shorter_join where age=19;
通过预计算,提高查询性能,当然需要占用一定的存储空间。查询自动重写(基于Apache Calcite实现)。没有存储数据。存储着预计算的数据。能够缓存数据,在创建物化视图的时候就把数据缓存起来了,hive把物化视图当成一张“表”,将数据缓存。而视图只是创建一个虚表,只有表结构,没有数据,实际查询的时候再去改写SQL去访问实际的数据表。简化降低查询的复杂度,而物化视图的目的是提高查询性能。--物化视图的创建语法
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db_name.]materialized_view_name
[DISABLE REWRITE]
[COMMENT materialized_view_comment]
[PARTITIONED ON (col_name, ...)]
[CLUSTERED ON (col_name, ...) | DISTRIBUTED ON (col_name, ...) SORTED ON (col_name, ...)]
[
[ROW FORMAT row_format]
[STORED AS file_format]
| STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]
]
[LOCATION hdfs_path]
[TBLPROPERTIES (property_name=property_value, ...)]
AS SELECT ...;
语法说明:
查询优化器optimizer查询重写(在物化视图创建期间可以通过DISABLE REWRITE参数设置禁止使用)ALTER MATERIALIZED VIEW [db_name.]materialized_view_name ENABLE|DISABLE REWRITE;
CREATE MATERIALIZED VIEW druid_wiki_mv
STORED AS 'org.apache.hadoop.hive.druid.DruidStorageHandler'
AS
SELECT __time, page, user, c_added, c_removed
FROM src;
目前支持物化视图的drop和show操作,后续会增加其他操作
---Hive 物化视图------------------------------
-- Drops a materialized view
DROP MATERIALIZED VIEW [db_name.]materialized_view_name;
-- Shows materialized views (with optional filters)
SHOW MATERIALIZED VIEWS [IN database_name];
-- Shows information about a specific materialized view
DESCRIBE [EXTENDED | FORMATTED] [db_name.]materialized_view_name;
数据源变更(新数据插入insert、数据修改modify),物化视图也需要更新以保持数据一致性,目前需要用户主动触发rebuild
ALTER MATERIALIZED VIEW [db_name.]materialized_view_name REBUILD;
物化视图创建后即可用于相关查询的加速,用户提交查询query,若该query经过重写后可命中已建视图,则被重写命中相关已建视图实现查询加速。
是否重写查询使用物化视图可以通过全局参数控制,默认为true:
set hive.materializedview.rewriting=true;
用户可选择性的对某个物化视图禁用重写:
ALTER MATERIALIZED VIEW [db_name.]materialized_view_name ENABLE|DISABLE REWRITE;
--1、新建一张事务表 student_trans
hive (default)> set hive.support.concurrency = true; --Hive是否支持并发
hive (default)> set hive.enforce.bucketing = true; --从Hive2.0开始不再需要 是否开启分桶功能
hive (default)> set hive.exec.dynamic.partition.mode = nonstrict; --动态分区模式 非严格
hive (default)> set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
hive (default)> set hive.compactor.initiator.on = true; --是否在Metastore实例上运行启动线程和清理线程
hive (default)> set hive.compactor.worker.threads = 1; --在此metastore实例上运行多少个压缩程序工作线程。
hive (default)> drop table if exists student;
hive (default)> create table student (
> id int,
> name string,
> sex string,
> dept string
> )
> row format delimited fields terminated by ',';
hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/student.txt' into table student;
hive (default)> drop table if exists student_trans;
hive (default)> create table student_trans (
> id int,
> name string,
> sex string,
> dept string)
> clustered by (id) into 2 buckets stored as orc tblproperties('transactional'='true');
--2、导入数据到事物表student_trans中
hive (default)> insert overwrite table student_trans select * from student;
--3、对student_trans建立聚合物化视图
--注意 这里当执行CREATE MATERIALIZED VIEW,会启动一个MR对物化视图进行构建
--可以发现当下的数据库中有了一个物化视图
hive (default)> create materialized view student_trans_agg
> as select dept,count(*) as dept_count from student_trans group by dept;
hive (default)> show tables;
hive (default)> show materialized views;
hive (default)> select * from student_trans_agg;
--查看物化视图详情
hive (default)> desc formatted student_trans_agg;
--4、对原始表student_trans查询
--由于会命中物化视图,重写query 查询物化视图,查询速度会加快(没有启动MR,只是普通的table scan)
hive (default)> select dept, count(*) as dept_count from student_trans group by dept;
--5、查询执行计划可以发现 查询被自动重写为TableScan alias: default.student_trans_agg
--转换成了对物化视图的查询 提高了查询效率
hive (default)> explain select dept, count(*) as dept_count from student_trans group by dept;
--禁用物化视图自动重写
hive (default)> alter materialized view student_trans_agg disable rewrite;
--对原始表student_trans查询,发现启动了MR
hive (default)> select dept, count(*) as dept_count from student_trans group by dept;
--开启物化视图自动重写
hive (default)> alter materialized view student_trans_agg enable rewrite;
--删除物化视图
hive (default)> drop materialized view student_trans_agg;
数值类型函数、日期类型函数、字符串类型函数、集合函数、条件函数等;输入输出行数进行分类,比如:UDF、UDAF、UDTF。一进一出(输入一行输出一行),例如upper函数多进一出(输入多行输出一行),例如sum函数一进多出(输入一行输出多行),比如explode函数。1)查看系统自带的函数
hive (default)> show functions;
2)显示自带的函数的用法
hive (default)> desc function size;
3)详细显示自带的函数的用法
hive (default)> desc function extended size;
NVL( value,default_value)。它的功能是如果value为NULL,则NVL函数返回default_value的值,否则返回value的值,如果两个参数都为NULL ,则返回NULL。hive (default)> select comm,nvl(comm, -1) from emp;
hive (default)> select comm, nvl(comm,mgr) from emp;
1)数据准备
[hyj@hadoop102 datas]$ vim emp_sex.txt
悟空 A 男
八戒 A 男
唐僧 B 男
王五 A 女
李四 B 女
张三 B 女
2)需求:求出不同部门男女各多少人。结果如下:
dept_Id 男 女
A 2 1
B 1 2
3)创建hive表并导入数据
hive (default)> create table emp_sex(
> name string,
> dept_id string,
> sex string)
> row format delimited fields terminated by ' ';
hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/emp_sex.txt' into table emp_sex;
4)按需求查询数据
hive (default)> select
> dept_id,
> sum(case sex when '男' then 1 else 0 end) male_count,
> sum(case sex when '女' then 1 else 0 end) female_count
> from emp_sex
> group by dept_id;
或
select
dept_id,
sum(if(sex='男',1,0)) male_count,
sum(if(sex='女',1,0)) female_count
from
emp_sex
group by dept_id;
1)相关函数说明
CONCAT(string A/col, string B/col…):返回输入字符串连接后的结果,支持任意个输入字符串;
CONCAT_WS(separator, str1, str2,…):它是一个特殊形式的 CONCAT()。第一个参数是剩余参数间的分隔符。分隔符可以是与剩余参数一样的字符串。如果分隔符是 NULL,返回值也将为 NULL。这个函数会跳过分隔符参数后的任何 NULL 。分隔符将被加到被连接的字符串之间;
注意: CONCAT_WS must be "string or array
COLLECT_SET(col):函数只接受基本数据类型,它的主要作用是将某字段的值进行去重汇总,产生Array类型字段。 而COLLECT_LIST(col) 不进行去重.
hive (default)> select concat('a','-','b','-','c');
OK
_c0
a-b-c
Time taken: 0.087 seconds, Fetched: 1 row(s)
hive (default)> select concat_ws('-','a','b','c');
OK
_c0
a-b-c
Time taken: 0.067 seconds, Fetched: 1 row(s)
hive (default)> select concat_ws(null,'a','b','c');
OK
_c0
NULL
hive (default)> select concat_ws('-','a','b',null,'c','','d');
OK
_c0
a-b-c--d
hive (default)> select * from test;
OK
test.name test.friends test.children test.address
songsong ["bingbing","lili"] {"xiao song":18,"xiaoxiao song":19} {"street":"hui long guan","city":"beijing "}
yangyang ["caicai","susu"] {"xiao yang":18,"xiaoxiao yang":19} {"street":"chao yang","city":"beijing "}
Time taken: 0.064 seconds, Fetched: 2 row(s)
hive (default)> desc test;
OK
col_name data_type comment
name string
friends array<string>
children map<string,int>
address struct<street:string,city:string>
hive (default)> select concat_ws('-',friends) from test;
_c0
bingbing-lili
caicai-susu
hive (default)> select collect_set(id) from student;
_c0
[1,2,3,4,5,6,7]
hive (default)> select collect_list(id) from student;
[1,2,3,4,5,6,7,1,2,3,4,5,6,7]
2)创建本地constellation.txt,导入数据
[hyj@hadoop102 datas]$ vim person_info.txt
孙悟空 白羊座 A
诸葛亮 射手座 A
刘备 白羊座 B
猪八戒 白羊座 A
玉皇大帝 射手座 A
唐僧 白羊座 B
3)创建hive表并导入数据
hive (default)> create table person_info(
> name string,
> constellation string,
> blood_type string)
> row format delimited fields terminated by ' ';
OK
Time taken: 0.089 seconds
hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/person_info.txt' into table person_info;
4)查询数据:把星座和血型一样的人归类到一起
hive (default)> select
> t1.c_b,
> concat_ws("|",collect_set(t1.name))
> from(
> select
> name,
> concat_ws(',',constellation,blood_type) c_b
> from person_info
> ) t1
> group by t1.c_b;
射手座,A 诸葛亮|玉皇大帝
白羊座,A 孙悟空|猪八戒
白羊座,B 刘备|唐僧
1)函数说明
爆炸函数”,可以炸开数据。Array或者Map结构拆分成多行。LATERAL VIEW udtf函数名(expression) tableAlias AS columnAlias 
一般只要使用UDTF,就会固定搭配lateral view使用。
hive (default)> select * from test;
OK
test.name test.friends test.children test.address
songsong ["bingbing","lili"] {"xiao song":18,"xiaoxiao song":19} {"street":"hui long guan","city":"beijing "}
yangyang ["caicai","susu"] {"xiao yang":18,"xiaoxiao yang":19} {"street":"chao yang","city":"beijing "}
Time taken: 0.071 seconds, Fetched: 2 row(s)
hive (default)> select explode(friends) from test;
OK
col
bingbing
lili
caicai
susu
hive (default)> select explode(children) from test;
OK
key value
xiao song 18
xiaoxiao song 19
xiao yang 18
xiaoxiao yang 19
2)数据准备

3)需求 将电影分类中的数组数据展开。结果如下:
《疑犯追踪》 悬疑
《疑犯追踪》 动作
《疑犯追踪》 科幻
《疑犯追踪》 剧情
《Lie to me》 悬疑
《Lie to me》 警匪
《Lie to me》 动作
《Lie to me》 心理
《Lie to me》 剧情
《战狼2》 战争
《战狼2》 动作
《战狼2》 灾难
4)创建本地movie.txt,导入数据
[hyj@hadoop102 datas]$ vim movie.txt
《疑犯追踪》 悬疑,动作,科幻,剧情
《Lie to me》 悬疑,警匪,动作,心理,剧情
《战狼2》 战争,动作,灾难
5)创建hive表并导入数据
hive (default)> create table movie_info(
> movie string,
> category string)
> row format delimited fields terminated by '\t';
hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/movie.txt' into table movie_info;
hive (default)> select split(category,',') from movie_info;
OK
_c0
["悬疑","动作","科幻","剧情 "]
["悬疑","警匪","动作","心理","剧情 "]
["战争","动作","灾难 "]
Time taken: 0.09 seconds, Fetched: 3 row(s)
hive (default)> select explode(split(category,',')) from movie_info;
OK
col
悬疑
动作
科幻
剧情
悬疑
警匪
动作
心理
剧情
战争
动作
灾难
--想法是正确的 sql执行却是错误的
hive (default)>select movie,explode(split(category,',')) from movie_info;
-- lateral view + explode
hive (default)> select
> movie,
> category_name
> from
> movie_info
> lateral view
> explode(split(category,',')) movie_info_tmp as category_name;
《疑犯追踪》 悬疑
《疑犯追踪》 动作
《疑犯追踪》 科幻
《疑犯追踪》 剧情
《Lie to me》 悬疑
《Lie to me》 警匪
《Lie to me》 动作
《Lie to me》 心理
《Lie to me》 剧情
《战狼2》 战争
《战狼2》 动作
《战狼2》 灾难
movie_info_tmp侧写表的别名
category_name explode(split(category,',')) 字段的别名
-------窗口函数语法树
Function(arg1,..., argn) OVER ([PARTITION BY <...>] [ORDER BY <....>] [<window_expression>])
--其中Function(arg1,..., argn) 可以是下面分类中的任意一个
--聚合函数:比如sum max avg等
--排序函数:比如rank row_number等
--分析函数:比如lead lag first_value等
--OVER [PARTITION BY <...>] 类似于group by 用于指定分组 每个分组你可以把它叫做窗口
--如果没有PARTITION BY 那么整张表的所有行就是一组
--[ORDER BY <....>] 用于指定每个分组内的数据排序规则 支持ASC、DESC
--[] 用于指定每个窗口中 操作的数据范围 默认是窗口中所有行

LAG(col,n,default_val):往前第n行数据
LEAD(col,n, default_val):往后第n行数据
#name,orderdate,cost
[hyj@hadoop102 datas]$ vim business.txt
jack,2017-01-01,10
tony,2017-01-02,15
jack,2017-02-03,23
tony,2017-01-04,29
jack,2017-01-05,46
jack,2017-04-06,42
tony,2017-01-07,50
jack,2017-01-08,55
mart,2017-04-08,62
mart,2017-04-09,68
neil,2017-05-10,12
mart,2017-04-11,75
neil,2017-06-12,80
mart,2017-04-13,94
[hyj@hadoop102 datas]$ vim employee.txt
1201,gopal,manager,50000,TP
1202,manisha,cto,50000,TP
1203,khalil,dev,30000,AC
1204,prasanth,dev,30000,AC
1206,kranthi,admin,20000,TP
#字段含义:cookieid 、访问时间、pv数(页面浏览数)
[hyj@hadoop102 datas]$ vim website_pv_info.txt
cookie1,2018-04-10,1
cookie1,2018-04-11,5
cookie1,2018-04-12,7
cookie1,2018-04-13,3
cookie1,2018-04-14,2
cookie1,2018-04-15,4
cookie1,2018-04-16,4
cookie2,2018-04-10,2
cookie2,2018-04-11,3
cookie2,2018-04-12,5
cookie2,2018-04-13,6
cookie2,2018-04-14,3
cookie2,2018-04-15,9
cookie2,2018-04-16,7
#字段含义:cookieid、访问时间、访问页面url
[hyj@hadoop102 datas]$ vim website_url_info.txt
cookie1,2018-04-10 10:00:02,url2
cookie1,2018-04-10 10:00:00,url1
cookie1,2018-04-10 10:03:04,1url3
cookie1,2018-04-10 10:50:05,url6
cookie1,2018-04-10 11:00:00,url7
cookie1,2018-04-10 10:10:00,url4
cookie1,2018-04-10 10:50:01,url5
cookie2,2018-04-10 10:00:02,url22
cookie2,2018-04-10 10:00:00,url11
cookie2,2018-04-10 10:03:04,1url33
cookie2,2018-04-10 10:50:05,url66
cookie2,2018-04-10 11:00:00,url77
cookie2,2018-04-10 10:10:00,url44
cookie2,2018-04-10 10:50:01,url55
这里以sum()函数为例,其他聚合函数使用类似。
--建表加载数据
CREATE TABLE employee(
id int,
name string,
deg string,
salary int,
dept string
) row format delimited
fields terminated by ',';
load data local inpath '/opt/module/hive-3.1.2/datas/employee.txt' into table employee;
select * from employee;
----sum+group by普通常规聚合操作------------
select dept,sum(salary) as total from employee group by dept;
----sum+窗口函数聚合操作------------
select id,name,deg,salary,dept,sum(salary) over(partition by dept) as total from employee;
-------------------
---建表并且加载数据
create table website_pv_info(
cookieid string,
createtime string, --day
pv int
) row format delimited
fields terminated by ',';
create table website_url_info (
cookieid string,
createtime string, --访问时间
url string --访问页面
) row format delimited
fields terminated by ',';
load data local inpath '/opt/module/hive-3.1.2/datas/website_pv_info.txt' into table website_pv_info;
load data local inpath '/opt/module/hive-3.1.2/datas/website_url_info.txt' into table website_url_info;
select * from website_pv_info;
select * from website_url_info;
-----窗口聚合函数的使用-----------
--1、求出每个用户总pv数 sum+group by普通常规聚合操作
select cookieid,sum(pv) as total_pv from website_pv_info group by cookieid;
--2、sum+窗口函数 总共有四种用法 注意是整体聚合 还是累积聚合
--sum(...) over( )对表所有行求和
--sum(...) over( order by ... ) 连续累积求和
--sum(...) over( partition by... ) 同组内所行求和
--sum(...) over( partition by... order by ... ) 在每个分组内,连续累积求和
--需求:求出网站总的pv数 所有用户所有访问加起来
--sum(...) over( )对表所有行求和
select cookieid,createtime,pv,
sum(pv) over() as total_pv --注意这里窗口函数是没有partition by 也就是没有分组 全表所有行
from website_pv_info;
--需求:求出每个用户总pv数
--sum(...) over( partition by... ),同组内所行求和
select cookieid,createtime,pv,
sum(pv) over(partition by cookieid) as total_pv
from website_pv_info;
--需求:求出每个用户截止到当天,累积的总pv数
--sum(...) over( partition by... order by ... ),在每个分组内,连续累积求和
select cookieid,createtime,pv,
sum(pv) over(partition by cookieid order by createtime) as current_total_pv
from website_pv_info;
我们知道,在sum(…) over( partition by… order by … )语法完整的情况下,进行的累积聚合操作,默认累积聚合行为是:从第一行聚合到当前行。
Window expression窗口表达式给我们提供了一种控制行范围的能力,比如向前2行,向后3行。
语法如下:
关键字是rows between,包括下面这几个选项:
CURRENT ROW:当前行n PRECEDING:往前n行数据n FOLLOWING:往后n行数据UNBOUNDED:边界UNBOUNDED PRECEDING 表示从前面的起点UNBOUNDED FOLLOWING表示到后面的终点---窗口表达式
select cookieid,createtime,pv,
sum(pv) over(partition by cookieid order by createtime) as pv1 --默认从第一行到当前行
from website_pv_info;
--第一行到当前行
select cookieid,createtime,pv,
sum(pv) over(partition by cookieid order by createtime rows between unbounded preceding and current row) as pv2
from website_pv_info;
--向前3行至当前行
select cookieid,createtime,pv,
sum(pv) over(partition by cookieid order by createtime rows between 3 preceding and current row) as pv4
from website_pv_info;
--向前3行 向后1行
select cookieid,createtime,pv,
sum(pv) over(partition by cookieid order by createtime rows between 3 preceding and 1 following) as pv5
from website_pv_info;
--当前行至最后一行
select cookieid,createtime,pv,
sum(pv) over(partition by cookieid order by createtime rows between current row and unbounded following) as pv6
from website_pv_info;
--第一行到最后一行 也就是分组内的所有行
select cookieid,createtime,pv,
sum(pv) over(partition by cookieid order by createtime rows between unbounded preceding and unbounded following) as pv6
from website_pv_info;
窗口排序函数用于给每个分组内的数据打上排序的标号。注意窗口排序函数不支持窗口表达式。
row_number:在每个分组中,为每行分配一个从1开始的唯一序列号,递增,不考虑重复;
rank: 在每个分组中,为每行分配一个从1开始的序列号,考虑重复,挤占后续位置;
dense_rank: 在每个分组中,为每行分配一个从1开始的序列号,考虑重复,不挤占后续位置;

-----窗口排序函数
SELECT
cookieid,
createtime,
pv,
RANK() OVER(PARTITION BY cookieid ORDER BY pv desc) AS rn1,
DENSE_RANK() OVER(PARTITION BY cookieid ORDER BY pv desc) AS rn2,
ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY pv DESC) AS rn3
FROM website_pv_info
WHERE cookieid = 'cookie1';
--需求:找出每个用户访问pv最多的Top3 重复并列的不考虑
SELECT * from
(SELECT
cookieid,
createtime,
pv,
ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY pv DESC) AS seq
FROM website_pv_info) tmp where tmp.seq <4;
还有一个函数,叫做ntile(n)函数,其功能为:将每个分组内的数据分为指定的若干个桶里(分为若干个部分),并且为每一个桶分配一个桶编号,编号从1开始,对于每一行,NTILE返回此行所属的组的编号。注意:n必须为int类型。
如果不能平均分配,则优先分配较小编号的桶,并且各个桶中能放的行数最多相差1。
--把每个分组内的数据分为3桶
SELECT
cookieid,
createtime,
pv,
NTILE(3) OVER(PARTITION BY cookieid ORDER BY createtime) AS rn2
FROM website_pv_info
ORDER BY cookieid,createtime;

--需求:统计每个用户pv数最多的前3分之1天。
--理解:将数据根据cookieid分 根据pv倒序排序 排序之后分为3个部分 取第一部分
SELECT * from
(SELECT
cookieid,
createtime,
pv,
NTILE(3) OVER(PARTITION BY cookieid ORDER BY pv DESC) AS rn
FROM website_pv_info) tmp where rn =1;
LAG(col,n,DEFAULT) 用于统计窗口内往上第n行值
第一个参数为列名,第二个参数为往上第n行(可选,默认为1),第三个参数为默认值(当往上第n行为NULL时候,取默认值,如不指定,则为NULL);
LEAD(col,n,DEFAULT) 用于统计窗口内往下第n行值
第一个参数为列名,第二个参数为往下第n行(可选,默认为1),第三个参数为默认值(当往下第n行为NULL时候,取默认值,如不指定,则为NULL);
FIRST_VALUE 取分组内排序后,截止到当前行,第一个值;
LAST_VALUE 取分组内排序后,截止到当前行,最后一个值;
-----------窗口分析函数----------
--LAG
SELECT cookieid,
createtime,
url,
ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn,
LAG(createtime,1,'1970-01-01 00:00:00') OVER(PARTITION BY cookieid ORDER BY createtime) AS last_1_time,
LAG(createtime,2) OVER(PARTITION BY cookieid ORDER BY createtime) AS last_2_time
FROM website_url_info;
--LEAD
SELECT cookieid,
createtime,
url,
ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn,
LEAD(createtime,1,'1970-01-01 00:00:00') OVER(PARTITION BY cookieid ORDER BY createtime) AS next_1_time,
LEAD(createtime,2) OVER(PARTITION BY cookieid ORDER BY createtime) AS next_2_time
FROM website_url_info;
--FIRST_VALUE
SELECT cookieid,
createtime,
url,
ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn,
FIRST_VALUE(url) OVER(PARTITION BY cookieid ORDER BY createtime) AS first1
FROM website_url_info;
--LAST_VALUE
SELECT cookieid,
createtime,
url,
ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn,
LAST_VALUE(url) OVER(PARTITION BY cookieid ORDER BY createtime) AS last1
FROM website_url_info;
hive (default)> create table business(
> name string,
> orderdate string,
> cost int
> )
> row format delimited fields terminated by ',';
OK
Time taken: 0.047 seconds
hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/business.txt' into table business;
substring(str, pos[, len])
如:substring(“hellohive”,2,3)从2位置开始截取3长度
注意:下标是从1开始的.
# 写成hive (default)> select substring(orderdate,0,7) from business;结果也一样
hive (default)> select substring(orderdate,1,7) from business;
2017-01
2017-01
2017-02
2017-01
2017-01
2017-04
2017-01
2017-01
2017-04
2017-04
2017-05
2017-04
2017-06
2017-04
//查询在2017年4月份购买过的顾客
hive (default)> select
> distinct(name)
> from business
> where substring(orderdate,0,7)='2017-04';
name
jack
mart
hive (default)> select
> name
> from business
> where substring(orderdate,0,7)='2017-04'
> group by name;
name
jack
mart
//查询在2017年4月份购买过的总人数
hive (default)> select
> count(distinct(name))
> from business
> where substring(orderdate,1,7)='2017-04';
_c0
2
(1) 查询在2017年4月份购买过的顾客及总人数
方法1:用笛卡尔积
方法2:
hive (default)> select
> name,
> count(*) over()
> from business
> where substring(orderdate,1,7)='2017-04'
> group by name;
name count_window_0
mart 2
jack 2
//计算顾客的购买明细及购买总额
hive (default)> select
> name,
> orderdate,
> cost,
> sum(cost) over(partition by name)
> from business;
name orderdate cost sum_window_0
jack 2017-01-05 46 176
jack 2017-01-08 55 176
jack 2017-01-01 10 176
jack 2017-04-06 42 176
jack 2017-02-03 23 176
mart 2017-04-13 94 299
mart 2017-04-11 75 299
mart 2017-04-09 68 299
mart 2017-04-08 62 299
neil 2017-05-10 12 92
neil 2017-06-12 80 92
tony 2017-01-04 29 94
tony 2017-01-02 15 94
tony 2017-01-07 50 94
(2) 查询顾客的购买明细及月购买总额
hive (default)> select
> name,
> orderdate,
> cost,
> sum(cost) over(partition by month(orderdate))
> from business;
name orderdate cost sum_window_0
jack 2017-01-01 10 205
jack 2017-01-08 55 205
tony 2017-01-07 50 205
jack 2017-01-05 46 205
tony 2017-01-04 29 205
tony 2017-01-02 15 205
jack 2017-02-03 23 23
mart 2017-04-13 94 341
jack 2017-04-06 42 341
mart 2017-04-11 75 341
mart 2017-04-09 68 341
mart 2017-04-08 62 341
neil 2017-05-10 12 12
neil 2017-06-12 80 80
(3) 将每个顾客的cost按照日期进行累加
hive (default)> select
> name,
> orderdate,
> cost,
> sum(cost) over(partition by name order by orderdate)
> from business;
name orderdate cost sum_window_0
jack 2017-01-01 10 10
jack 2017-01-05 46 56
jack 2017-01-08 55 111
jack 2017-02-03 23 134
jack 2017-04-06 42 176
mart 2017-04-08 62 62
mart 2017-04-09 68 130
mart 2017-04-11 75 205
mart 2017-04-13 94 299
neil 2017-05-10 12 12
neil 2017-06-12 80 92
tony 2017-01-02 15 15
tony 2017-01-04 29 44
tony 2017-01-07 50 94
select name,orderdate,cost,
sum(cost) over() as sample1,--所有行相加
sum(cost) over(partition by name) as sample2,--按name分组,组内数据相加
sum(cost) over(partition by name order by orderdate) as sample3,--按name分组,组内数据累加
sum(cost) over(partition by name order by orderdate rows between UNBOUNDED PRECEDING and current row ) as sample4 ,--和sample3一样,由起点到当前行的聚合
sum(cost) over(partition by name order by orderdate rows between 1 PRECEDING and current row) as sample5, --当前行和前面一行做聚合
sum(cost) over(partition by name order by orderdate rows between 1 PRECEDING AND 1 FOLLOWING ) as sample6,--当前行和前边一行及后面一行
sum(cost) over(partition by name order by orderdate rows between current row and UNBOUNDED FOLLOWING ) as sample7 --当前行及后面所有行
from business;
rows必须跟在order by 子句之后,对排序的结果进行限制,使用固定的行数来限制分区中的数据行数量
(4)排序值相同时说明
排序值相同,则窗口也一样.(例如下面的两个3都开4个窗)
[hyj@hadoop102 datas]$ vim num.txt
1
2
3
3
4
4
5
6
hive (default)> create table num(id int);
OK
Time taken: 0.053 seconds
hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/num.txt' into table num;
hive (default)> select id,sum(id) over(order by id) from num;
id sum_window_0
1 1
2 3
3 9
3 9
4 17
4 17
5 22
6 28
(5) 查询每个顾客上次的购买时间(将上1行移下来)
hive (default)> select
> name,
> orderdate,
> lag(orderdate,1) over(partition by name order by orderdate)
> from business;
name orderdate lag_window_0
jack 2017-01-01 NULL
jack 2017-01-05 2017-01-01
jack 2017-01-08 2017-01-05
jack 2017-02-03 2017-01-08
jack 2017-04-06 2017-02-03
mart 2017-04-08 NULL
mart 2017-04-09 2017-04-08
mart 2017-04-11 2017-04-09
mart 2017-04-13 2017-04-11
neil 2017-05-10 NULL
neil 2017-06-12 2017-05-10
tony 2017-01-02 NULL
tony 2017-01-04 2017-01-02
tony 2017-01-07 2017-01-04
(5) 查询每个顾客上2次的购买时间(lag第3个参数代表默认值)
hive (default)> select
> name,
> orderdate,
> lag(orderdate,2,orderdate) over(partition by name order by orderdate)
> from business;
name orderdate lag_window_0
jack 2017-01-01 2017-01-01
jack 2017-01-05 2017-01-05
jack 2017-01-08 2017-01-01
jack 2017-02-03 2017-01-05
jack 2017-04-06 2017-01-08
mart 2017-04-08 2017-04-08
mart 2017-04-09 2017-04-09
mart 2017-04-11 2017-04-08
mart 2017-04-13 2017-04-09
neil 2017-05-10 2017-05-10
neil 2017-06-12 2017-06-12
tony 2017-01-02 2017-01-02
tony 2017-01-04 2017-01-04
tony 2017-01-07 2017-01-02
(6)查询顾客下一次的购买时间
hive (default)> select
> name,
> orderdate,
> lead(orderdate,1,'2022-6-2') over(partition by name order by orderdate)
> from business;
name orderdate lead_window_0
jack 2017-01-01 2017-01-05
jack 2017-01-05 2017-01-08
jack 2017-01-08 2017-02-03
jack 2017-02-03 2017-04-06
jack 2017-04-06 2022-6-2
mart 2017-04-08 2017-04-09
mart 2017-04-09 2017-04-11
mart 2017-04-11 2017-04-13
mart 2017-04-13 2022-6-2
neil 2017-05-10 2017-06-12
neil 2017-06-12 2022-6-2
tony 2017-01-02 2017-01-04
tony 2017-01-04 2017-01-07
tony 2017-01-07 2022-6-2
(7) 查询前20%时间的订单信息 (分成5个组取第1组)
hive (default)> select * from(
> select name,orderdate,cost,ntile(5) over(order by orderdate) groupId from business
> ) t
> where t.groupId=1;
t.name t.orderdate t.cost t.groupid
jack 2017-01-01 10 1
tony 2017-01-02 15 1
tony 2017-01-04 29 1
RANK() 排序相同时会重复,总数不会变 DENSE_RANK() 排序相同时会重复,总数会减少 ROW_NUMBER() 会根据顺序计算 [hyj@hadoop102 datas]$ vim score.txt
孙悟空 语文 87
孙悟空 数学 95
孙悟空 英语 68
八戒 语文 94
八戒 数学 56
八戒 英语 84
唐僧 语文 64
唐僧 数学 86
唐僧 英语 84
沙僧 语文 65
沙僧 数学 85
沙僧 英语 78
hive (default)> create table score(
> name string,
> subject string,
> score int)
> row format delimited fields terminated by ' ';
hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/score.txt' into table score;
hive (default)> select *,rank() over(order by score) from score;
score.name score.subject score.score rank_window_0
八戒 数学 56 1
唐僧 语文 64 2
沙僧 语文 65 3
孙悟空 英语 68 4
沙僧 英语 78 5
唐僧 英语 84 6
八戒 英语 84 6
沙僧 数学 85 8
唐僧 数学 86 9
孙悟空 语文 87 10
八戒 语文 94 11
孙悟空 数学 95 12
hive (default)> select *,dense_rank() over(order by score) from score;
score.name score.subject score.score dense_rank_window_0
八戒 数学 56 1
唐僧 语文 64 2
沙僧 语文 65 3
孙悟空 英语 68 4
沙僧 英语 78 5
唐僧 英语 84 6
八戒 英语 84 6
沙僧 数学 85 7
唐僧 数学 86 8
孙悟空 语文 87 9
八戒 语文 94 10
孙悟空 数学 95 11
hive (default)> select *,row_number() over(order by score) from score;
score.name score.subject score.score row_number_window_0
八戒 数学 56 1
唐僧 语文 64 2
沙僧 语文 65 3
孙悟空 英语 68 4
沙僧 英语 78 5
唐僧 英语 84 6
八戒 英语 84 7
沙僧 数学 85 8
唐僧 数学 86 9
孙悟空 语文 87 10
八戒 语文 94 11
孙悟空 数学 95 12
```sql
hive (default)> select *,rank() over(partition by subject order by score) from score;
score.name score.subject score.score rank_window_0
八戒 数学 56 1
沙僧 数学 85 2
唐僧 数学 86 3
孙悟空 数学 95 4
孙悟空 英语 68 1
沙僧 英语 78 2
唐僧 英语 84 3
八戒 英语 84 3
唐僧 语文 64 1
沙僧 语文 65 2
孙悟空 语文 87 3
八戒 语文 94 4
hive (default)> select
> name,
> subject,
> score,
> rank() over(partition by subject order by score desc) rp,
> dense_rank() over(partition by subject order by score desc) drp,
> row_number() over(partition by subject order by score desc) rmp
> from score;
name subject score rp drp rmp
孙悟空 数学 95 1 1 1
唐僧 数学 86 2 2 2
沙僧 数学 85 3 3 3
八戒 数学 56 4 4 4
唐僧 英语 84 1 1 1
八戒 英语 84 1 1 2
沙僧 英语 78 3 2 3
孙悟空 英语 68 4 3 4
八戒 语文 94 1 1 1
孙悟空 语文 87 2 2 2
沙僧 语文 65 3 3 3
唐僧 语文 64 4 4 4
hive (default)> select
> name,
> subject,
> score
> from
> (select
> *,
> rank() over(partition by subject order by score desc) rk
> from score) t
> where rk<=3;
name subject score
孙悟空 数学 95
唐僧 数学 86
沙僧 数学 85
八戒 英语 84
唐僧 英语 84
沙僧 英语 78
八戒 语文 94
孙悟空 语文 87
沙僧 语文 65
unix_timestamp:返回当前或指定时间的时间戳
select unix_timestamp();
select unix_timestamp("2020-10-28",'yyyy-MM-dd');
--获取当前日期: current_date
select current_date();
--获取当前时间戳: current_timestamp
--同一查询中对current_timestamp的所有调用均返回相同的值。
select current_timestamp(); -- 2022-07-27 16:21:44.245
--获取当前UNIX时间戳函数: unix_timestamp
select unix_timestamp(); --1658910144
--UNIX时间戳转日期函数: from_unixtime
select from_unixtime(1658910144); -- 2022-07-27 08:22:24
select from_unixtime(1658910144, 'yyyy/MM/dd HH:mm:ss'); -- 2022/07/27 08:22:24
--日期转UNIX时间戳函数: unix_timestamp
select unix_timestamp("2022-07-27 08:22:24"); --1658910144
--指定格式日期转UNIX时间戳函数: unix_timestamp
select unix_timestamp('20111207 13:01:03','yyyyMMdd HH:mm:ss');
to_date:抽取日期部分
select to_date('2020-10-28 12:12:12');
year:获取年
select year('2020-10-28 12:12:12');
month:获取月
select month('2020-10-28 12:12:12');
day:获取日
select day('2020-10-28 12:12:12');
hour:获取时
select hour('2020-10-28 12:12:12');
minute:获取分
select minute('2020-10-28 12:12:12');
second:获取秒
select second('2020-10-28 12:12:12');
weekofyear:当前时间是一年中的第几周
select weekofyear('2020-10-28 12:12:12');
dayofmonth:当前时间是一个月中的第几天
select dayofmonth('2020-10-28 12:12:12');
months_between: 两个日期间的月份
select months_between('2020-04-01','2020-10-28'); -- -6.87096774
add_months:日期加减月
select add_months('2020-10-28',-3);
datediff:两个日期相差的天数
select datediff('2020-11-04','2020-10-28');
date_add:日期加天数
select date_add('2020-10-28',4);
date_sub:日期减天数
select date_sub('2020-10-28',-4);
last_day:日期的当月的最后一天
select last_day('2020-02-30');
date_format(): 格式化日期
select date_format('2020-10-28 12:12:12','yyyy/MM/dd HH:mm:ss');
--取整函数: round 返回double类型的整数值部分 (遵循四舍五入)
select round(3.1415926);
--指定精度取整函数: round(double a, int d) 返回指定精度d的double类型
select round(3.1415926,4); --3.1416
--向下取整函数: floor
select floor(3.1415926); --3
select floor(-3.1415926);
--向上取整函数: ceil
select ceil(3.1415926);
select ceil(-3.1415926);
--取随机数函数: rand 每次执行都不一样 返回一个0到1范围内的随机数
select rand();
--指定种子取随机数函数: rand(int seed) 得到一个稳定的随机数序列
select rand(2);
--二进制函数: bin(BIGINT a)
select bin(18);
--进制转换函数: conv(BIGINT num, int from_base, int to_base)
select conv(17,10,2);
--绝对值函数: abs
select abs(-3.9);
upper: 转大写
select upper('low');
lower: 转小写
select lower('low');
length: 长度
select length("atguigu");
trim: 前后去空格
select trim(" atguigu ");
lpad: 向左补齐,到指定长度
rpad: 向右补齐,到指定长度
hive (default)> select lpad('spark',7,'h');
OK
_c0
hhspark
Time taken: 0.479 seconds, Fetched: 1 row(s)
hive (default)> select rpad('spark',7,'h');
OK
_c0
sparkhh
regexp_replace:使用正则表达式匹配目标字符串,匹配成功后替换!
SELECT regexp_replace('2020/10/25', '/', '-');
------------String Functions 字符串函数------------
describe function extended find_in_set;
--字符串长度函数:length(str | binary)
select length("angelababy");
--字符串反转函数:reverse
select reverse("angelababy");
--字符串连接函数:concat(str1, str2, ... strN)
select concat("angela","baby");
--带分隔符字符串连接函数:concat_ws(separator, [string | array(string)]+)
select concat_ws('.', 'www', array('it', 'cn'));
--字符串截取函数:substr(str, pos[, len]) 或者 substring(str, pos[, len])
select substr("angelababy",-2); --pos是从1开始的索引,如果为负数则倒着数
select substr("angelababy",2,2);
select substring("hello",-2) -lo
--字符串转大写函数:upper,ucase
select upper("angelababy");
select ucase("angelababy");
--字符串转小写函数:lower,lcase
select lower("ANGELABABY");
select lcase("ANGELABABY");
--去空格函数:trim 去除左右两边的空格
select trim(" angelababy ");
--左边去空格函数:ltrim
select ltrim(" angelababy ");
--右边去空格函数:rtrim
select rtrim(" angelababy ");
--正则表达式替换函数:regexp_replace(str, regexp, rep)
select regexp_replace('100-200', '\\d+', 'num');
--正则表达式解析函数:regexp_extract(str, regexp[, idx]) 提取正则匹配到的指定组内容
select regexp_extract('100-200', '(\\d+)-(\\d+)', 2);
--URL解析函数:parse_url 注意要想一次解析出多个 可以使用parse_url_tuple这个UDTF函数
select parse_url('https://www.bilibili.com/', 'HOST');
--json解析函数:get_json_object
--空格字符串函数:space(n) 返回指定个数空格
select space(4);
--重复字符串函数:repeat(str, n) 重复str字符串n次
select repeat("angela",2);
--首字符ascii函数:ascii
select ascii("angela"); --a对应ASCII 97
--左补足函数:lpad
select lpad('hi', 5, '??'); --???hi
select lpad('hi', 1, '??'); --h
--右补足函数:rpad
select rpad('hi', 5, '??'); --hi???
select rpad('hi', 1, '??'); --h
--分割字符串函数: split(str, regex)
select split('apache hive', '\\s+');
--集合查找函数: find_in_set(str,str_array)
select find_in_set('a','abc,b,ab,c,def');
--size: 集合中元素的个数 size(Map) size(Array)
select size(friends) from test;
-- map_keys: 返回map中的key
select map_keys(children) from test;
-- map_values: 返回map中的value
select map_values(children) from test;
-- array_contains: 判断array中是否包含某个元素 array_contains(Array, value)
select array_contains(friends,'bingbing') from test;
-- sort_array: 将array中的元素排序
select sort_array(friends) from test;
--数组排序函数:sort_array(Array)
select sort_array(array(12,2,32));
select * from student limit 3;
describe function extended isnull;
--if条件判断: if(boolean testCondition, T valueTrue, T valueFalseOrNull)
select if(1=2,100,200);
select if(sex ='男','male','female') from student limit 3;
--空判断函数: isnull( a )
select isnull("allen");
select isnull(null); --true
--非空判断函数: isnotnull ( a )
select isnotnull("allen");
select isnotnull(null);
--空值转换函数: nvl(T value, T default_value)
select nvl("allen","it");
select nvl(null,"it");
--非空查找函数: COALESCE(T v1, T v2, ...) coalesce
--返回参数中的第一个非空值;如果所有值都为NULL,那么返回NULL
select COALESCE(null,11,22,33);
select COALESCE(null,null,null,33);
select COALESCE(null,null,null);
--条件转换函数: CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END
select case 100 when 50 then 'tom' when 100 then 'mary' else 'tim' end; --mary
select case sex when '男' then 'male' else 'female' end from student limit 3;
--nullif( a, b ):
-- 如果a = b,则返回NULL,否则返回第一个数
select nullif(11,11); --NULL
select nullif(13,12); --13
--assert_true(condition)
--如果'condition'不为真,则引发异常,否则返回null
SELECT assert_true(11 >= 0); --NULL
SELECT assert_true(-1 >= 0); --异常
主要完成对数据脱敏转换功能,屏蔽原始数据。
desc function extended mask;
--mask
--将查询回的数据,大写字母转换为X,小写字母转换为x,数字转换为n。
select mask("abc123DEF"); --xxxnnnXXX
--自定义替换的字母 mask(value, upperChar, lowerChar,digitChar)
select mask("abc123DEF",'-','.','^'); -- ...^^^---
--mask_first_n(string str[, int n]
--对前n个进行脱敏替换
select mask_first_n("abc123DEF",4);
--mask_last_n(string str[, int n])
select mask_last_n("abc123DEF",4);
--mask_show_first_n(string str[, int n])
--除了前n个字符,其余进行掩码处理
select mask_show_first_n("abc123DEF",4);
--mask_show_last_n(string str[, int n])
select mask_show_last_n("abc123DEF",4);
--mask_hash(string|char|varchar str)
--返回字符串的hash编码。
select mask_hash("abc123DEF");
--hive调用java方法: java_method(class, method[, arg1[, arg2..]])
select java_method("java.lang.Math","max",11,22);
--反射函数: reflect(class, method[, arg1[, arg2..]])
select reflect("java.lang.Math","max",11,22);
--取哈希值函数:hash
select hash("allen");
select current_user();
select logged_in_user();
select current_database();
--查看hive的版本
select version();
--SHA-1加密: sha1(string/binary)
select sha1("allen");
--SHA-2家族算法加密:sha2(string/binary, int) (SHA-224, SHA-256, SHA-384, SHA-512)
select sha2("allen",224);
select sha2("allen",512);
--crc32加密:
select crc32("allen");
--MD5加密: md5(string/binary)
select md5("allen");
****处理。****处理create [temporary] function [dbname.]function_name AS class_name;drop [temporary] function [if exists] [dbname.]function_name;<dependencies>
<dependency>
<groupId>org.apache.hivegroupId>
<artifactId>hive-execartifactId>
<version>3.1.2version>
dependency>
dependencies>
3)业务代码
package com.hyj.UDF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @description: hive自定义函数UDF 实现对手机号中间4位进行****加密
* @author: hyj
* 自定义 UDF 函数,需要继承 GenericUDF 类
*/
public class EncryptPhoneNumber extends GenericUDF {
/**
*
* @param arguments 输入参数类型的鉴别器对象
* @return 返回值类型的鉴别器对象
*/
//initialize()方法会被输入的每个参数调用,在这个方法中对参数的类型进行校验,参数类型包括个数、数据类型等等.
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
//判断输入参数的个数
if(arguments.length!=1){
throw new UDFArgumentLengthException("Input Args Length Error!!!");
}
//判断输入参数的类型(是否是基本类型)
//Category共定义了5种类型:基本类型(Primitive),集合(List),键值对映射(Map),结构体(Struct),联合体(Union)
if(!arguments[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE)){
throw new UDFArgumentTypeException(0,"Input Args Type Error!!!");
}
//函数本身返回值为string,需要返回string类型的鉴别器对象
return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
String encryptPhonum=null;
//手机号不为空,并且为11位
if(arguments[0].get()==null){
return null;
}else if (arguments[0].get().toString().trim().length()==11){
String phoNum = arguments[0].get().toString().trim();
//判断数据是否满足中国大陆手机号码规范
String regex = "^1[3-9]\\d{9}$";
//创建一个模式对象:将给定的正则表达式编译成模式
Pattern pattern = Pattern.compile(regex);
//创建一个匹配器对象,matcher匹配器按照pattern(模式)到phoNum中去匹配
Matcher matcher = pattern.matcher(phoNum);
//matches()尝试将整个区域与模式匹配
if (matcher.matches()){ //进入这里的都是符合手机号规则的
//使用正则替换 返回加密后的数据
encryptPhonum= phoNum.replaceAll("(\\d{3})\\d{4}(\\d{4})", "$1****$2");
}else {
//不符合手机号规则 数据直接原封不动返回
encryptPhonum=phoNum;
}
}
return encryptPhonum;
}
@Override //使用explain查看SQL执行计划时,会显示null
public String getDisplayString(String[] strings) {
return null;
}
}
4)打成 jar 包上传到服务器/opt/module/datas/myudf.jar

把jar包上传到Hiveserver2服务运行所在机器的linux系统,或者HDFS文件系统。

5)在客户端中使用命令把jar包添加到hive的classpath。
add jar /opt/module/hive-3.1.2/datas/HiveProjectUDF-1.0-SNAPSHOT.jar;
6)创建临时函数与开发好的 java class 关联
create temporary function encrypt_phonum as "com.hyj.UDF.EncryptPhoneNumber";
7)即可在 hql 中使用自定义的函数
select encrypt_phonum(15303615860);
grouping sets是一种将多个group by逻辑写在一个sql语句中的便利写法。
等价于将不同维度的GROUP BY结果集进行UNION ALL。
GROUPING__ID表示结果属于哪一个分组集合。
#字段:月份、天、用户cookieid
[hyj@hadoop102 datas]$ vim cookie_info.txt
2018-03,2018-03-10,cookie1
2018-03,2018-03-10,cookie5
2018-03,2018-03-12,cookie7
2018-04,2018-04-12,cookie3
2018-04,2018-04-13,cookie2
2018-04,2018-04-13,cookie4
2018-04,2018-04-16,cookie4
2018-03,2018-03-10,cookie2
2018-03,2018-03-10,cookie3
2018-04,2018-04-12,cookie5
2018-04,2018-04-13,cookie6
2018-04,2018-04-15,cookie3
2018-04,2018-04-15,cookie2
2018-04,2018-04-16,cookie1
create table cookie_info(
month string,
day string,
cookieid string
) row format delimited
fields terminated by ',';
load data local inpath '/opt/module/hive-3.1.2/datas/cookie_info.txt' into table cookie_info;
select * from cookie_info;
---group sets---------
select
month,
day,
count(distinct cookieid) as nums,
grouping__id
from cookie_info
group by month,day
grouping sets (month,day) --这里是关键
order by grouping__id;
--grouping_id表示这一组结果属于哪个分组集合,
--根据grouping sets中的分组条件month,day,1是代表month,2是代表day
--等价于
SELECT month,NULL as day,COUNT(DISTINCT cookieid) AS nums,1 AS GROUPING__ID FROM cookie_info GROUP BY month
UNION ALL
SELECT NULL as month,day,COUNT(DISTINCT cookieid) AS nums,2 AS GROUPING__ID FROM cookie_info GROUP BY day;

--再比如
SELECT
month,
day,
COUNT(DISTINCT cookieid) AS nums,
GROUPING__ID
FROM cookie_info
GROUP BY month,day
GROUPING SETS (month,day,(month,day)) --1 month 2 day 3 (month,day)
ORDER BY GROUPING__ID;
--等价于
SELECT month,NULL as day,COUNT(DISTINCT cookieid) AS nums,1 AS GROUPING__ID FROM cookie_info GROUP BY month
UNION ALL
SELECT NULL as month,day,COUNT(DISTINCT cookieid) AS nums,2 AS GROUPING__ID FROM cookie_info GROUP BY day
UNION ALL
SELECT month,day,COUNT(DISTINCT cookieid) AS nums,3 AS GROUPING__ID FROM cookie_info GROUP BY month,day;
2^n。------cube---------------
SELECT
month,
day,
COUNT(DISTINCT cookieid) AS nums,
GROUPING__ID
FROM cookie_info
GROUP BY month,day
WITH CUBE
ORDER BY GROUPING__ID;
--等价于
SELECT month,day,COUNT(DISTINCT cookieid) AS nums,0 AS GROUPING__ID FROM cookie_info GROUP BY month,day
UNION ALL
SELECT month,NULL as day,COUNT(DISTINCT cookieid) AS nums,1 AS GROUPING__ID FROM cookie_info GROUP BY month
UNION ALL
SELECT NULL as month,day,COUNT(DISTINCT cookieid) AS nums,2 AS GROUPING__ID FROM cookie_info GROUP BY day
UNION ALL
SELECT NULL as month,NULL as day,COUNT(DISTINCT cookieid) AS nums,3 AS GROUPING__ID FROM cookie_info;
cube的语法功能指的是:根据GROUP BY的维度的所有组合进行聚合。
rollup是Cube的子集,以最左侧的维度为主,从该维度进行层级聚合。
比如ROLLUP有a,b,c 3个维度,则所有组合情况是:
((a,b,c),(a,b),(a),())。
--以month维度进行层级聚合:
SELECT
month,
day,
COUNT(DISTINCT cookieid) AS nums,
GROUPING__ID
FROM cookie_info
GROUP BY month,day
WITH ROLLUP
ORDER BY GROUPING__ID;

2) 以day维度进行层级聚合
--把month和day调换顺序,则以day维度进行层级聚合:
SELECT
day,
month,
COUNT(DISTINCT cookieid) AS uv,
GROUPING__ID
FROM cookie_info
GROUP BY day,month
WITH ROLLUP
ORDER BY GROUPING__ID;
单字节分隔符来加载文本数据,例如逗号、制表符、空格等等,默认的分隔符为\001。根据不同文件的不同分隔符,我们可以通过在创建表时使用 row format delimited fields terminated by ‘单字节分隔符’ 来指定文件中的分割符,确保正确将表中的每一列与文件中的每一列实现一一对应的关系。||”、“--”等
情况二:数据的字段中包含了分隔符

上图中每列的分隔符为空格,但是数据中包含了分割符,时间字段中也有空格
192.168.88.134 [08/Nov/2020:10:44:32 +0800] "GET / HTTP/1.1" 404 951
若是直接建表加载数据,时间字段会被切分成两个字段,后面所有的字段出现了错位
<dependencies>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-commonartifactId>
<version>3.1.2version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-hdfsartifactId>
<version>3.1.2version>
dependency>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-hdfs-clientartifactId>
<version>3.1.2version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-clientartifactId>
<version>3.1.2version>
dependency>
<dependency>
<groupId>org.junit.jupitergroupId>
<artifactId>junit-jupiter-apiartifactId>
<version>RELEASEversion>
dependency>
dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.pluginsgroupId>
<artifactId>maven-compiler-pluginartifactId>
<version>3.1version>
<configuration>
<source>1.8source>
<target>1.8target>
<encoding>UTF-8encoding>
configuration>
plugin>
plugins>
build>
(2)Mapper
package com.hyj.splitpackage;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*若想要自定义的类中的map方法支持分布式并行计算,就必须继承自MapReduce框架的Mapper类
*本类继承Mapper类,负责重写父类Mapper中的map方法
*Mapper 泛型的参数含义为 LongWritable表示文本偏移量,Text表示读取的一行文本,
* 第二个Test表示map方法输出的key类型,LongWritable表示map方法输出的value类型
*/
public class ChangeSplitMapper extends Mapper<LongWritable,Text,Text, NullWritable> {
//定义输出的Key
private Text outputKey=new Text();
//定义输出的value
private NullWritable outputValue=NullWritable.get();
@Override setup()方法只执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作
protected void setup(Context context){
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//将一行数据转换成字符串
String line = value.toString();
//将里面的||转换为|
String newLine = line.replaceAll("\\|\\|", "|");
//替换后的内容作为Key
outputKey.set(newLine);
//输出结果
context.write(outputKey,outputValue);
/*context是MapReduce框架的上下文对象,可以存放公共类型的数据,
比如,map方法处理完的中间结果可以保存到context上下文对象中
*/
}
@Override //释放资源
protected void cleanup(Context context){
}
}
(3)Driver
package com.hyj.splitpackage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class ChangeSplitDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration configuration = new Configuration(); //获取配置信息
Job job = Job.getInstance(configuration); //获取job对象实例
job.setJarByClass(ChangeSplitDriver.class); //设置job运行时的程序入口主类
job.setMapperClass(ChangeSplitMapper.class);
//指定Mapper输出数据的key/value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//reduce:不需要reduce过程
job.setNumReduceTasks(0);
//指定作业job的输入路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定作业job的输出路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//提交作业
boolean result=job.waitForCompletion(true);
//根据job执行返回的结果(正常/异常)退出程序
System.exit( result? 0 : 1);
}
}
(4)将此程序打成jar包上传到hadoop102的/opt/module/hive-3.1.2/datas/目录下
(5)运行程序
[hyj@hadoop102 datas]$ hadoop jar MR-Project-1.0-SNAPSHOT.jar com.hyj.splitpackage.ChangeSplitDriver file:///opt/module/hive-3.1.2/datas/singer.txt file:///opt/module/hive-3.1.2/datas/xin_singer
file://是指本地文件路径,由于这里是在虚拟机中运行,所以虚拟机就成了“本地机”,那么相应的资源文件路径,就应当是虚拟机中的某个路径。
当没有了前缀 file://的时候默认的路径是hdfs的文件路径!
(6)查看转换后的文件
[hyj@hadoop102 datas]$ cd xin_singer/
[hyj@hadoop102 xin_singer]$ ll
总用量 4
-rw-r--r-- 1 hyj hyj 403 7月 31 16:18 part-m-00000
drwxr-xr-x 3 hyj hyj 15 7月 31 16:18 _temporary
[hyj@hadoop102 xin_singer]$ cat part-m-00000
01|周杰伦|中国|台湾|男|七里香
02|刘德华|中国|香港|男|笨小孩
03|汪 峰|中国|北京|男|光明
04|朴 树|中国|北京|男|那些花儿
05|许 巍|中国|陕西|男|故乡
06|张靓颖|中国|四川|女|画心
07|黄家驹|中国|香港|男|光辉岁月
08|周传雄|中国|台湾|男|青花
09|刘若英|中国|台湾|女|很爱很爱你
10|张 杰|中国|四川|男|天下
(7)重新建表加载数据
--如果表已存在就删除表
drop table if exists singer;
--创建表
create table singer(
id string,--歌手id
name string,--歌手名称
country string,--国家
province string,--省份
gender string,--性别
works string--作品
) row format delimited fields terminated by '|';
--加载数据
load data local inpath '/opt/module/hive-3.1.2/datas/xin_singer/part-m-00000' into table singer;
select * from singer;
在ETL阶段可以直接对数据进行分隔符的替换,通过替换分隔符将多字节分隔符更改为单字节分隔符,就可以解决数据加载的问题,但是这种方式有对应的优缺点,并不是所有的场景适用于该方法。
优点:实现方式较为简单,基于字符串替换即可
缺点:无法满足情况2的需求
Serialize就是序列化,用于将Hive中使用的java object转换成能写入hdfs的字节序列,或者其他系统能识别的流文件。Hive中的insert语句用于将数据写入HDFS,所以就会调用序列化实现。Hive中的调用过程如下:
Deserilize就是反序列化,用于将字符串或者二进制数据流转换成Hive能识别的java object对象。所有Hive中的Select语句在查询数据时,需要将HDFS中的数据解析为Hive中对象,就需要进行反序列化。Hive可以方便的将数据加载到表中而不需要对数据进行转换,这样在处理海量数据时可以节省大量的时间。Hive中的调用过程如下:
01||周杰伦||中国||台湾||男||七里香
([0-9]*)\\|\\|(.*)\\|\\|(.*)\\|\\|(.*)\\|\\|(.*)\\|\\|(.*)
正则表达式在线测试:https://c.runoob.com/front-end/854/
(2)基于正则表达式,使用RegexSerde建表
--如果表已存在就删除表
drop table if exists singer;
--创建表
create table singer(
id string,--歌手id
name string,--歌手名称
country string,--国家
province string,--省份
gender string,--性别
works string--作品
)
--指定使用RegexSerde加载数据
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
--指定正则表达式
WITH SERDEPROPERTIES (
"input.regex" = "([0-9]*)\\|\\|([^}]*)\\|\\|([^}]*)\\|\\|([^}]*)\\|\\|([^}]*)\\|\\|([^}]*)"
);
--加载数据
load data local inpath '/opt/module/hive-3.1.2/datas/singer.txt' into table singer;
--查看结果 发现每一列的数据都被正常的加载,没有错位
select * from singer;
使用RegexSerDe类时,所有的字段 数据类型必须为string
input.regex中需要全匹配整行数据,每个括号表示一个字段。
“input.regex” = "([^ ])\t([^ ])\t([^ ])"则表示按制表符\t分割。
192.168.88.100 [08/Nov/2020:10:44:33 +0800] "GET /hpsk_sdk/index.html HTTP/1.1" 200 328
([^ ]*) ([^}]*) ([^ ]*) ([^ ]*) ([^ ]*) ([0-9]*) ([^ ]*)
或 (可以将空格替换成\\s)
([^\\s]*)\\s(.*)\\s([^\\s]*)\\s([^\\s]*)\\s([^\\s]*)\\s([0-9]*)\\s([^\\s]*)
(2)基于正则表达式,使用RegexSerde建表
--如果表存在,就删除表
drop table if exists apachelog;
--创建表
create table apachelog(
ip string, --IP地址
stime string, --时间
mothed string, --请求方式
url string, --请求地址
policy string, --请求协议
stat string, --请求状态
body string --字节大小
)
--指定使用RegexSerde加载数据
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
--指定正则表达式
WITH SERDEPROPERTIES (
"input.regex" = "([^ ]*) ([^}]*) ([^ ]*) ([^ ]*) ([^ ]*) ([0-9]*) ([^ ]*)"
);
--加载数据
load data local inpath '/opt/module/hive-3.1.2/datas/apache_web_access.log' into table apachelog;
--查看结果 发现时间字段不再被分割为两个字段,整体作为一个字段被加载
select * from apachelog;
方案概述
Hive中也允许使用自定义InputFormat来解决以上问题,通过在自定义InputFormat,来自定义解析逻辑实现读取每一行的数据。
自定义InputFormat
(1)自定义InputFormat继承TextInputFormat
ackage com.hyj.inputformat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import java.io.IOException;
public class UserInputFormat extends TextInputFormat {
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException {
reporter.setStatus(genericSplit.toString());
UserRecordReader reader = new UserRecordReader(job,(FileSplit)genericSplit);
return reader;
}
}
(2)自定义RecordReader,实现RecordReader接口,实现next方法.
读取数据时将每条数据中的”||”全部替换成“|”

package com.hyj.inputformat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import java.io.IOException;
import java.io.InputStream;
/**
* @ClassName UserRecordReader
* @Description TODO 用于自定义读取器,在自定义InputFormat中使用,将读取到的每行数据中的||替换为|
*/
public class UserRecordReader implements RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(LineRecordReader.class.getName());
int maxLineLength;
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private Seekable filePosition;
private CompressionCodec codec;
private Decompressor decompressor;
public UserRecordReader(Configuration job, FileSplit split) throws IOException {
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
codec = compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
if (isCompressedInput()) {
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec)
.createInputStream(fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new LineReader(cIn, job);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn; // take pos from compressed stream
} else {
in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
in = new LineReader(fileIn, job);
filePosition = fileIn;
}
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
private boolean isCompressedInput() {
return (codec != null);
}
private int maxBytesToConsume(long pos) {
return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
}
private long getFilePosition() throws IOException {
long retVal;
if (isCompressedInput() && null != filePosition) {
retVal = filePosition.getPos();
} else {
retVal = pos;
}
return retVal;
}
public LongWritable createKey() {
return new LongWritable();
}
public Text createValue() {
return new Text();
}
/**
* Read a line.
*/
public synchronized boolean next(LongWritable key, Text value) throws IOException {
while (getFilePosition() <= end) {
key.set(pos);
int newSize = in.readLine(value, maxLineLength, Math.max(maxBytesToConsume(pos), maxLineLength));
String str = value.toString().replaceAll("\\|\\|", "\\|");
value.set(str);
pos += newSize;
if (newSize == 0) {
return false;
}
if (newSize < maxLineLength) {
return true;
}
LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
}
return false;
}
public float getProgress() throws IOException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start));
}
}
public synchronized long getPos() throws IOException {
return pos;
}
public synchronized void close() throws IOException {
try {
if (in != null) {
in.close();
}
} finally {
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
}
}
}
public static class LineReader extends org.apache.hadoop.util.LineReader {
LineReader(InputStream in) {
super(in);
}
LineReader(InputStream in, int bufferSize) {
super(in, bufferSize);
}
public LineReader(InputStream in, Configuration conf) throws IOException {
super(in, conf);
}
}
}
add jar /opt/module/hive-3.1.2/lib/HiveUserInputFormat.jar;
该方法可以实现临时添加,如果希望永久生效,重启Hive即可
--如果表已存在就删除表
drop table if exists singer;
--创建表
create table singer(
id string,--歌手id
name string,--歌手名称
country string,--国家
province string,--省份
gender string,--性别
works string--作品
)
--指定使用分隔符为|
row format delimited fields terminated by '|'
stored as
--指定使用自定义的类实现解析
inputformat 'com.hyj.inputformat.UserInputFormat'
outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
desc formatted singer;
load data local inpath '/opt/module/hive-3.1.2/datas/singer.txt' into table singer;
--查看结果 发现数据正常匹配,没有出现错位。
select * from singer;
当数据文件中出现多字节分隔符或者数据中包含了分隔符时,会导致数据加载与实际表的字段不匹配的问题,基于这个问题我们提供了三种方案:替换分隔符、正则加载及自定义InputFormat来实现,其中替换分隔符无法解决数据中存在分隔符的问题,自定义InputFormat的开发成本较高,所以整体推荐使用正则加载的方式来实现对于特殊数据的处理。
在对URL进行解析时,我们要先了解URL的基本组成部分,再根据实际的需求从URL中获取对应的部分,例如一条URL由以下几个部分组成:

PROTOCOL:协议类型HOST:域名PATH:访问路径QUERY:参数数据Hive中为了实现对URL的解析,专门提供了解析URL的函数parse_url和parse_url_tuple,在show functions中可以看到对应函数
[hyj@hadoop102 datas]$ vim url.txt
1 http://facebook.com/path/p1.php?query=1
2 http://tongji.baidu.com/news/index.jsp?uuid=frank
3 http://www.jdwz.com/index?source=baidu
4 http://www.itcast.cn/index?source=alibaba
/*创建数据库*/
create database if not exists db_function;
/*切换数据库*/
use db_function;
/*创建数据表*/
create table tb_url(
id int,
url string
) row format delimited fields terminated by '\t';
/*加载数据*/
load data local inpath '/opt/module/hive-3.1.2/datas/url.txt' into table tb_url;
/*查询数据*/
select * from tb_url;

基于当前的数据,实现对URL进行分析,从URL中获取每个ID对应HOST、PATH以及QUERY,最终实现效果如下:

parse_url函数是Hive中提供的最基本的url解析函数,可以根据指定的参数,从URL解析出对应的参数值进行返回,函数为普通的一对一函数类型。parse_url(url, partToExtract[, key]) - extracts a part from a URL
Parts: HOST, PATH, QUERY, REF, PROTOCOL, AUTHORITY, FILE, USERINFO key
parse_url 在使用时需要指定两个参数
第一个参数:url:指定要解析的URL
第二个参数:key:指定要解析的内容
-- 查询tb_url中每个url的HOST
select id,url,parse_url(url,"HOST") as host from tb_url;

-- 查询tb_url中每个url的PATH
select id,url,parse_url(url,"PATH") as path from tb_url;

--查询tb_url中每个url的QUERY
select id,url,parse_url(url,"QUERY") as query from tb_url;

-- 实现需求
select
id,
parse_url(url,"HOST") as host,
parse_url(url,"PATH") as path,
parse_url(url,"QUERY") as query,
parse_url(url,"PROTOCOL") as protocol
from
tb_url;

SELECT parse_url('http://facebook.com/path/p1.php?query=1', 'QUERY') FROM tb_url LIMIT 1; -- query=1
SELECT parse_url('http://facebook.com/path/p1.php?query=1', 'QUERY', 'query') FROM tb_url LIMIT 1; -- 1
SELECT parse_url('http://www.jdwz.com/index?source=baidu', 'QUERY', 'source') FROM tb_url LIMIT 1; -- baidu
使用parse_url函数每次只能解析一个参数,导致需要经过多个函数调用才能构建多列,开发角度较为麻烦,实现过程性能也相对较差,需要对同一列做多次计算处理,我们希望能实现调用一次函数,就可以将多个参数进行解析,得到多列结果。
parse_url_tuple函数是Hive中提供的基于parse_url的url解析函数,可以通过一次指定多个参数,从URL解析出多个参数的值进而返回多列,函数为特殊的一对多函数类型,即通常所说的UDTF函数类型。parse_url_tuple(url, partname1, partname2, ..., partnameN) - extracts N (N>=1) parts from a URL.
It takes a URL and one or multiple partnames, and returns a tuple. All the input parameters and output column types are string.
Partname: HOST, PATH, QUERY, REF, PROTOCOL, AUTHORITY, FILE, USERINFO, QUERY:<KEY_NAME>
parse_url_tuple在使用时可以指定多个参数
第一个参数:url:指定要解析的URL
第二个参数:key1:指定要解析的内容1
……
第N个参数:keyN:指定要解析的内容N
--查询tb_url中每个url的HOST、PATH
select parse_url_tuple(url,"HOST","PATH") as (host,path) from tb_url;

--查询tb_url中每个url的PROTOCOL、HOST、QUERY
select parse_url_tuple(url,"PROTOCOL","HOST","PATH") as (protocol,host,path) from tb_url;

--实现需求
select parse_url_tuple(url,"HOST","PATH","QUERY") as (host,path,query) from tb_url;

当前实现的过程中,通过parse_url_tuple实现了通过调用一个函数,就可以从URL中解析得到多个参数的值,但是当我们将原表的字段放在一起查询时,会出现以下问题:
select
id,
parse_url_tuple(url,"HOST","PATH","QUERY") as (host,path,query)
from tb_url;
SQL 错误 [40000] [42000]: Error while compiling statement: FAILED: SemanticException 3:53 AS clause has an invalid number of aliases. Error encountered near token 'path'
......
Hive中的一对多的UDTF函数可以实现高效的数据转换,但是也存在着一些使用中的问题,UDTF函数对于很多场景下有使用限制,例如:select时不能包含其他字段、不能嵌套调用、不能与group by等放在一起调用等等。
UDTF函数的调用方式,主要有以下两种方式:
方式一:直接在select后单独使用
方式二:与Lateral View放在一起使用
功能
Lateral View是一种特殊的语法,主要用于搭配UDTF类型功能的函数一起使用,用于解决UDTF函数的一些查询限制的问题。
侧视图的原理是将UDTF的结果构建成一个类似于视图的表,然后将原表中的每一行和UDTF函数输出的每一行进行连接,生成一张新的虚拟表。这样就避免了UDTF的使用限制问题。使用lateral view时也可以对UDTF产生的记录设置字段名称,产生的字段可以用于group by、order by 、limit等语句中,不需要再单独嵌套一层子查询。
一般只要使用UDTF,就会固定搭配lateral view使用。
官方链接:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView
语法
lateralView: LATERAL VIEW udtf(expression) tableAlias AS columnAlias (',' columnAlias)*
fromClause: FROM baseTable (lateralView)*
基本语法如下:
select …… from tabelA lateral view UDTF(xxx) 别名 as col1,col2,col3……
select
a.id as id,
b.host as host,
b.path as path,
b.query as query
from tb_url a
lateral view parse_url_tuple(url,"HOST","PATH","QUERY") b as host,path,query;
(2)多lateral view调用
select
a.id as id,
b.host as host,
b.path as path,
c.protocol as protocol,
c.query as query
from tb_url a
lateral view parse_url_tuple(url,"HOST","PATH") b as host,path
lateral view parse_url_tuple(url,"PROTOCOL","QUERY") c as protocol,query;
select
id,
url,
col1
from tb_url
lateral view explode(array()) et as col1;

如果加上outer关键字以后,就会保留原表数据,类似于outer join
select
id,
url,
col1
from tb_url
lateral view outer explode(array()) et as col1;
原始数据表

目标结果表

CASE
WHEN 条件1 THEN VALUE1
WHEN 条件2 THEN VALUE2
……
WHEN 条件N THEN VALUEN
ELSE 默认值
END
CASE 列
WHEN V1 THEN VALUE1
WHEN V2 THEN VALUE2
……
WHEN VN THEN VALUEN
ELSE 默认值
END
select
id,
case
when id < 2 then 'a'
when id = 2 then 'b'
else 'c'
end as caseName
from tb_url;

select
id,
case id
when 1 then 'a'
when 2 then 'b'
else 'c'
end as caseName
from tb_url;
[hyj@hadoop102 datas]$ vim r2c1.txt
a c 1
a d 2
a e 3
b c 4
b d 5
b e 6
--切换数据库
use db_function;
--建表
create table row2col1(
col1 string,
col2 string,
col3 int
) row format delimited fields terminated by '\t';
--加载数据到表中
load data local inpath '/opt/module/hive-3.1.2/datas/r2c1.txt' into table row2col1;
select * from row2col1;
--SQL实现转换
select
col1 as col1,
max(case col2 when 'c' then col3 else 0 end) as c,
max(case col2 when 'd' then col3 else 0 end) as d,
max(case col2 when 'e' then col3 else 0 end) as e
from
row2col1
group by
col1;


concat(element1,element2,element3……)
如果任意一个元素为null,结果就为nullselect concat("hello","spark","and","hive","!!!");
select concat("hello","hadoop","java",null);
concat_ws(SplitChar,element1,element2……)
select concat_ws("-","hello","hive","hadoop");
select concat_ws("-","hello",null,"java","spark"); --hello-java-spark
collect_list(colName)
select collect_list(col1) from row2col1;
collect_set(colName)
select collect_set(col1) from row2col1;
数据准备
[hyj@hadoop102 datas]$ vim r2c2.txt
a b 1
a b 2
a b 3
c d 4
c d 5
c d 6
--切换数据库
use db_function;
--建表
create table row2col2(
col1 string,
col2 string,
col3 int
)row format delimited fields terminated by '\t';
--加载数据到表中
load data local inpath '/opt/module/hive-3.1.2/datas/r2c2.txt' into table row2col2;
select * from row2col2;
--SQL实现转换
select
col1,
col2,
concat_ws(',', collect_list(cast(col3 as string))) as col3
from
row2col2
group by
col1, col2;
原始数据表

目标结果表

结果去重且排序select_statement
UNION [DISTINCT]
select_statement
UNION [DISTINCT]
select_statement ...
select 'b','a','c'
union
select 'a','b','c'
union
select 'a','b','c';
结果不去重不排序select_statement UNION ALL select_statement UNION ALL select_statement ...
select 'b','a','c'
union all
select 'a','b','c'
union all
select 'a','b','c';

[hyj@hadoop102 datas]$ vim c2r1.txt
a 1 2 3
b 4 5 6
--切换数据库
use db_function;
--创建表
create table col2row1(
col1 string,
col2 int,
col3 int,
col4 int
) row format delimited fields terminated by '\t';
--加载数据
load data local inpath '/opt/module/hive-3.1.2/datas/c2r1.txt' into table col2row1;
select * from col2row1;
--SQL实现转换
select col1, 'c' as col2, col2 as col3 from col2row1
UNION ALL
select col1, 'd' as col2, col3 as col3 from col2row1
UNION ALL
select col1, 'e' as col2, col4 as col3 from col2row1;
原始数据表

目标结果表

explode( Map | Array)
select explode(split("a,b,c,d",","));
数据准备
[hyj@hadoop102 datas]$ vim c2r2.txt
a b 1,2,3
c d 4,5,6
--切换数据库
use db_function;
--创建表
create table col2row2(
col1 string,
col2 string,
col3 string
)row format delimited fields terminated by '\t';
--加载数据
load data local inpath '/opt/module/hive-3.1.2/datas/c2r2.txt' into table col2row2;
select * from col2row2;
select
col1,
col2,
lv.col3 as col3
from
col2row2
lateral view
explode(split(col3, ',')) lv as col3;
JSON数据格式是数据存储及数据处理中最常见的结构化数据格式之一,很多场景下公司都会将数据以JSON格式存储在HDFS中,当构建数据仓库时,需要对JSON格式的数据进行处理和分析,那么就需要在Hive中对JSON格式的数据进行解析读取。
例如,当前我们JSON格式的数据如下:

每条数据都以JSON形式存在,每条数据中都包含4个字段,分别为设备名称【device】、设备类型【deviceType】、信号强度【signal】和信号发送时间【time】,现在我们需要将这四个字段解析出来,在Hive表中以每一列的形式存储,最终得到以下Hive表:

返回顶部
Hive中为了实现JSON格式的数据解析,提供了两种解析JSON数据的方式,在实际工作场景下,可以根据不同数据,不同的需求来选择合适的方式对JSON格式数据进行处理。
方式一:使用JSON函数进行处理
Hive中提供了两个专门用于解析JSON字符串的函数:get_json_object、json_tuple,这两个函数都可以实现将JSON数据中的每个字段独立解析出来,构建成表。
方式二:使用Hive内置的JSON Serde加载数据
Hive中除了提供JSON的解析函数以外,还提供了一种专门用于加载JSON文件的Serde来实现对JSON文件中数据的解析,在创建表时指定Serde,加载文件到表中,会自动解析为对应的表格式。
get_json_object(json_txt, path) - Extract a json object from path
第一个参数:指定要解析的JSON字符串
第二个参数:指定要返回的字段,通过$.columnName的方式来指定path
特点:每次只能返回JSON对象中一列的值
[hyj@hadoop102 datas]$ vim device.json
{"device":"device_30","deviceType":"kafka","signal":98.0,"time":1616817201390}
{"device":"device_40","deviceType":"route","signal":99.0,"time":1616817201887}
{"device":"device_21","deviceType":"bigdata","signal":77.0,"time":1616817202142}
{"device":"device_31","deviceType":"kafka","signal":98.0,"time":1616817202405}
{"device":"device_20","deviceType":"bigdata","signal":12.0,"time":1616817202513}
{"device":"device_54","deviceType":"bigdata","signal":14.0,"time":1616817202913}
{"device":"device_10","deviceType":"db","signal":39.0,"time":1616817203356}
{"device":"device_94","deviceType":"bigdata","signal":59.0,"time":1616817203771}
{"device":"device_32","deviceType":"kafka","signal":52.0,"time":1616817204010}
{"device":"device_21","deviceType":"bigdata","signal":85.0,"time":1616817204229}
{"device":"device_74","deviceType":"bigdata","signal":27.0,"time":1616817204720}
{"device":"device_91","deviceType":"bigdata","signal":50.0,"time":1616817205164}
{"device":"device_62","deviceType":"db","signal":89.0,"time":1616817205328}
{"device":"device_21","deviceType":"bigdata","signal":25.0,"time":1616817205457}
{"device":"device_76","deviceType":"bigdata","signal":62.0,"time":1616817205984}
{"device":"device_74","deviceType":"bigdata","signal":44.0,"time":1616817206571}
{"device":"device_42","deviceType":"route","signal":43.0,"time":1616817206681}
{"device":"device_32","deviceType":"kafka","signal":65.0,"time":1616817207131}
{"device":"device_32","deviceType":"kafka","signal":95.0,"time":1616817207714}
{"device":"device_71","deviceType":"bigdata","signal":45.0,"time":1616817207907}
{"device":"device_32","deviceType":"kafka","signal":81.0,"time":1616817208320}
{"device":"device_10","deviceType":"db","signal":81.0,"time":1616817208907}
{"device":"device_20","deviceType":"bigdata","signal":69.0,"time":1616817209287}
{"device":"device_61","deviceType":"db","signal":98.0,"time":1616817209785}
{"device":"device_30","deviceType":"kafka","signal":95.0,"time":1616817210104}
{"device":"device_43","deviceType":"route","signal":57.0,"time":1616817210540}
{"device":"device_10","deviceType":"db","signal":36.0,"time":1616817211134}
{"device":"device_20","deviceType":"bigdata","signal":75.0,"time":1616817211248}
{"device":"device_64","deviceType":"db","signal":68.0,"time":1616817211812}
{"device":"device_53","deviceType":"bigdata","signal":60.0,"time":1616817212237}
{"device":"device_52","deviceType":"bigdata","signal":57.0,"time":1616817212709}
{"device":"device_30","deviceType":"kafka","signal":75.0,"time":1616817213073}
{"device":"device_31","deviceType":"kafka","signal":83.0,"time":1616817213614}
{"device":"device_93","deviceType":"bigdata","signal":54.0,"time":1616817214101}
{"device":"device_20","deviceType":"bigdata","signal":84.0,"time":1616817214639}
--切换数据库
use db_function;
--创建表
create table tb_json_test1 (
json string
);
--加载数据
load data local inpath '/opt/module/hive-3.1.2/datas/device.json' into table tb_json_test1;
select * from tb_json_test1;
select
json,
get_json_object(json,"$.device") as device, --获取设备名称字段
get_json_object(json,"$.deviceType") as deviceType, --获取设备类型
get_json_object(json,"$.signal") as signal, ----获取设备信号强度
get_json_object(json,"$.time") as stime --获取时间
from tb_json_test1;
json_tuple(jsonStr, p1, p2, ..., pn)
like get_json_object, but it takes multiple names and return a tuple
(1)参数
属于UDTF类型函数.字符串类型.select
--返回设备名称及信号强度
json_tuple(json,"device","signal") as (device,signal)
from tb_json_test1;
实现需求,单独使用
select
--解析所有字段
json_tuple(json,"device","deviceType","signal","time") as (device,deviceType,signal,stime)
from tb_json_test1;
实现需求,搭配侧视图
select
json,device,deviceType,signal,stime
from tb_json_test1
lateral view json_tuple(json,"device","deviceType","signal","time") b
as device,deviceType,signal,stime;
--切换数据库
use db_function;
--创建表
create table tb_json_test2 (
device string,
deviceType string,
signal double,
`time` string
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE;
load data local inpath '/opt/module/hive-3.1.2/datas/device.json' into table tb_json_test2;
select * from tb_json_test2;
drop table if exists tb_json_test3;
create table tb_json_test3 (
device string,
deviceType string,
`time` string
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
load data local inpath '/opt/module/hive-3.1.2/datas/device.json' into table tb_json_test3;
select * from tb_json_test3;
不论是Hive中的JSON函数还是自带的JSONSerde,都可以实现对于JSON数据的解析,工作中一般根据数据格式以及对应的需求来实现解析。如果数据中每一行只有个别字段是JSON格式字符串,就可以使用JSON函数来实现处理,但是如果数据加载的文件整体就是JSON文件,每一行数据就是一个JSON数据,那么建议直接使用JSONSerde来实现处理最为方便。
当前有一份用户登录数据如下图所示,数据中有两个字段,分别是userId和loginTime。

userId表示唯一的用户ID,唯一标识一个用户,loginTime表示用户的登录日期,例如第一条数据就表示A在2021年3月22日登录了。
现在需要对用户的登录次数进行统计,得到连续登陆N(N>=2)天的用户。
例如统计连续两天的登录的用户,需要返回A和C,因为A在22/23/24都登录了,所以肯定是连续两天登录,C在22和23号登录了,所以也是连续两天登录的。
例如统计连续三天的登录的用户,只能返回A,因为只有A是连续三天登录的。
基于以上的需求根据数据寻找规律,要想得到连续登陆用户,必须找到两个相同用户ID的行之间登陆日期之间的关系。
例如:统计连续登陆两天的用户,只要用户ID相等,并且登陆日期之间相差1天即可。基于这个规律,我们有两种方案可以实现该需求。
方案一:实现表中的数据自连接,构建笛卡尔积,在结果中找到符合条件的id即可方案二:使用窗口函数来实现[hyj@hadoop102 datas]$ vim login.log
A 2021-03-22
B 2021-03-22
C 2021-03-22
A 2021-03-23
C 2021-03-23
A 2021-03-24
B 2021-03-24
--切换数据库
use db_function;
drop table if exists tb_login;
--建表
create table tb_login(
userid string,
logintime string
) row format delimited fields terminated by '\t';
load data local inpath '/opt/module/hive-3.1.2/datas/login.log' into table tb_login;
select * from tb_login;
--构建笛卡尔积
select
a.userid as a_userid,
a.logintime as a_logintime,
b.userid as b_userid,
b.logintime as b_logintime
from tb_login a,tb_login b;
--保存为表
create table tb_login_tmp as
select
a.userid as a_userid,
a.logintime as a_logintime,
b.userid as b_userid,
b.logintime as b_logintime
from tb_login a,tb_login b;
--过滤数据:用户id相同并且登陆日期相差1
select
a_userid,a_logintime,b_userid,b_logintime
from tb_login_tmp
where a_userid = b_userid
and cast(substr(a_logintime,9,2) as int) - 1 = cast(substr(b_logintime,9,2) as int);

--统计连续登陆两天的用户
select
distinct a_userid
from tb_login_tmp
where a_userid = b_userid
and cast(substr(a_logintime,9,2) as int) - 1 = cast(substr(b_logintime,9,2) as int);
lead(colName,N,defautValue)--统计连续2天登录
select
userid,
logintime,
--本次登陆日期的第二天
date_add(logintime,1) as nextday,
--按照用户id分区,按照登陆日期排序,取下一次登陆时间,取不到就为0
lead(logintime,1,0) over (partition by userid order by logintime) as nextlogin
from tb_login;
with t1 as (
select
userid,
logintime,
--本次登陆日期的第二天
date_add(logintime,1) as nextday,
--按照用户id分区,按照登陆日期排序,取下一次登陆时间,取不到就为0
lead(logintime,1,0) over (partition by userid order by logintime) as nextlogin
from tb_login )
select distinct userid from t1 where nextday = nextlogin;
--统计连续3天登录
select
userid,
logintime,
--本次登陆日期的第三天
date_add(logintime,2) as nextday,
--按照用户id分区,按照登陆日期排序,取下下一次登陆时间,取不到就为0
lead(logintime,2,0) over (partition by userid order by logintime) as nextlogin
from tb_login;
with t1 as (
select
userid,
logintime,
--本次登陆日期的第三天
date_add(logintime,2) as nextday,
--按照用户id分区,按照登陆日期排序,取下下一次登陆时间,取不到就为0
lead(logintime,2,0) over (partition by userid order by logintime) as nextlogin
from tb_login )
select distinct userid from t1 where nextday = nextlogin;
--统计连续N天登录
select
userid,
logintime,
--本次登陆日期的第N天
date_add(logintime,N-1) as nextday,
--按照用户id分区,按照登陆日期排序,取下下一次登陆时间,取不到就为0
lead(logintime,N-1,0) over (partition by userid order by logintime) as nextlogin
from tb_login;
当前有一份消费数据如下,记录了每个用户在每个月的所有消费记录,数据表中一共有三列:

userId:用户唯一id,唯一标识一个用户
mth:用户消费的月份,一个用户可以在一个月多次消费
money:用户每次消费的金额
现在需要基于用户每个月的多次消费的记录进行分析,统计得到每个用户在每个月的消费总金额以及当前累计消费总金额,最后结果如下:

以用户A为例:
A在2021年1月份,共四次消费,分别消费5元、15元、8元、5元,所以本月共消费33元,累计消费33元。
A在2021年2月份,共两次消费,分别消费4元、6元,所以本月共消费10元,累计消费43元。
如果要实现以上需求,首先要统计出每个用户每个月的消费总金额,分组实现集合,但是需要按照用户ID,将该用户这个月之前的所有月份的消费总金额进行累加实现。该需求可以通过两种方案来实现:
方案一:分组统计每个用户每个月的消费金额,然后构建自连接,根据条件分组聚合
方案二:分组统计每个用户每个月的消费金额,然后使用窗口聚合函数实现
[hyj@hadoop102 datas]$ vim money.txt
A 2021-01 5
A 2021-01 15
B 2021-01 5
A 2021-01 8
B 2021-01 25
A 2021-01 5
A 2021-02 4
A 2021-02 6
B 2021-02 10
B 2021-02 5
A 2021-03 7
B 2021-03 9
A 2021-03 11
B 2021-03 6
--切换数据库
use db_function;
--建表
create table tb_money(
userid string,
mth string,
money int
) row format delimited fields terminated by '\t';
load data local inpath '/opt/module/hive-3.1.2/datas/money.txt' into table tb_money;
select * from tb_money;
--统计得到每个用户每个月的消费总金额
create table tb_money_mtn as
select
userid,
mth,
sum(money) as m_money
from tb_money
group by userid,mth;
--基于每个用户每个月的消费总金额进行自连接
select
a.userid as auserid,
a.mth as amth,
a.m_money as am_money,
b.userid as buserid,
b.mth as bmth,
b.m_money as bm_money
from tb_money_mtn a join tb_money_mtn b on a.userid = b.userid;
--将每个月之前月份的数据过滤出来
select
a.userid as auserid,
a.mth as amth,
a.m_money as am_money,
b.userid as buserid,
b.mth as bmth,
b.m_money as bm_money
from tb_money_mtn a join tb_money_mtn b on a.userid = b.userid
where a.mth >= b.mth;
--对每个用户每个月的金额进行分组,聚合之前月份的消费金额
select
a.userid as auserid,
a.mth as amth,
a.m_money as am_money,
sum(b.m_money) as t_money
from tb_money_mtn a join tb_money_mtn b on a.userid = b.userid
where a.mth >= b.mth
group by a.userid,a.mth,a.m_money;
select
userid,
mth,
m_money,
sum(m_money) over (partition by userid order by mth) as t_money
from tb_money_mtn;
工作中经常需要实现TopN的需求,例如热门商品Top10、热门话题Top20、热门搜索Top10、地区用户Top10等等,TopN是大数据业务分析中最常见的需求。
普通的TopN只要基于数据进行排序,然后基于排序后的结果取前N个即可,相对简单,但是在TopN中有一种特殊的TopN计算,叫做分组TopN。
分组TopN指的是基于数据进行分组,从每个组内取TopN,不再基于全局取TopN。如果要实现分组取TopN就相对麻烦。
例如:现在有一份数据如下,记录这所有员工的信息:

如果现在有一个需求:查询每个部门薪资最高的员工的薪水,这个可以直接基于表中数据分组查询得到
select deptno,max(salary) from tb_emp group by deptno;
但是如果现在需求修改为:统计查询每个部门薪资最高的前两名员工的薪水,这时候应该如何实现呢?
根据上述需求,这种情况下是无法根据group by分组聚合实现的,因为分组聚合只能实现返回一条聚合的结果,但是需求中需要每个部门返回薪资最高的前两名,有两条结果,这时候就需要用到窗口函数中的分区来实现了。
数据准备
[hyj@hadoop102 datas]$ vim emp.txt
7369 SMITH CLERK 7902 1980-12-17 800.00 20
7499 ALLEN SALESMAN 7698 1981-2-20 1600.00 300.00 30
7521 WARD SALESMAN 7698 1981-2-22 1250.00 500.00 30
7566 JONES MANAGER 7839 1981-4-2 2975.00 20
7654 MARTIN SALESMAN 7698 1981-9-28 1250.00 1400.00 30
7698 BLAKE MANAGER 7839 1981-5-1 2850.00 30
7782 CLARK MANAGER 7839 1981-6-9 2450.00 10
7788 SCOTT ANALYST 7566 1987-4-19 3000.00 20
7839 KING PRESIDENT 1981-11-17 5000.00 10
7844 TURNER SALESMAN 7698 1981-9-8 1500.00 0.00 30
7876 ADAMS CLERK 7788 1987-5-23 1100.00 20
7900 JAMES CLERK 7698 1981-12-3 950.00 30
7902 FORD ANALYST 7566 1981-12-3 3000.00 20
7934 MILLER CLERK 7782 1982-1-23 1300.00 10
-切换数据库
use db_function;
--建表
create table tb_emp(
empno string,
ename string,
job string,
managerid string,
hiredate string,
salary double,
bonus double,
deptno string
) row format delimited fields terminated by '\t';
load data local inpath '/opt/module/hive-3.1.2/datas/emp.txt' into table tb_emp;
select * from tb_emp;
--基于row_number实现,按照部门分区,每个部门内部按照薪水降序排序
select
empno,
ename,
salary,
deptno,
row_number() over(partition by deptno order by salary desc) as rn
from tb_emp;
--过滤每个部门的薪资最高的前两名
with t1 as (
select
empno,
ename,
salary,
deptno,
row_number() over (partition by deptno order by salary desc) as rn
from tb_emp )
select * from t1 where rn < 3;
Hive在实际工作中主要用于构建离线数据仓库,定期的从各种数据源中同步采集数据到Hive中,经过分层转换提供数据应用。例如,每天需要从MySQL中同步最新的订单信息、用户信息、店铺信息等到数据仓库中,进行订单分析、用户分析。
例如:MySQL中有一张用户表:tb_user,每个用户注册完成以后,就会在用户表中新增该用户的信息,记录该用户的id、手机号码、用户名、性别、地址等信息。

每天都会有用户注册,产生新的用户信息,我们每天都需要将MySQL中的用户数据同步到Hive数据仓库中,在做用户分析时,需要对用户的信息做统计分析,例如统计新增用户的个数、总用户个数、用户性别分布、地区分布、运营商分布等指标。
在实现数据仓库数据同步的过程中,我们必须保证Hive中的数据与MySQL中的数据是一致的,这样才能确保我们最终分析出来的结果是准确的,没有问题的,但是在实现同步的过程中,这里会面临一个问题:如果MySQL中的数据发生了修改,Hive中如何存储被修改的数据?
例如以下情况



方案一:在Hive中用新的addr覆盖008的老的addr,直接更新

优点:实现最简单,使用起来最方便
缺点:没有历史状态,008的地址是1月2号在sh,但是1月2号之前是在gz的,如果要查询008的1月2号之前的addr就无法查询,也不能使用sh代替
方案二:每次数据改变,根据日期构建一份全量的快照表,每天一张表
2021-01-02:Hive中有一张表tb_user_2021-01-02

2021-01-03:Hive中有一张表tb_user_2021-01-03

优点:记录了所有数据在不同时间的状态
缺点:冗余存储了很多没有发生变化的数据,导致存储的数据量过大
方案三:构建拉链表,通过时间标记发生变化的数据的每种状态的时间周期

返回顶部
拉链表专门用于解决在数据仓库中数据发生变化如何实现数据存储的问题,如果直接覆盖历史状态,会导致无法查询历史状态,如果将所有数据单独切片存储,会导致存储大量非更新数据的问题。拉链表的设计是将更新的数据进行状态记录,没有发生更新的数据不进行状态存储,用于存储所有数据在不同时间上的所有状态,通过时间进行标记每个状态的生命周期,查询时,根据需求可以获取指定时间范围状态的数据,默认用9999-12-31等最大值来表示最新状态。
整体实现过程一般分为三步,第一步先增量采集所有新增数据【增加的数据和发生变化的数据】放入一张增量表。第二步创建一张临时表,用于将老的拉链表与增量表进行合并。第三步,最后将临时表的数据覆盖写入拉链表中。例如:
当前MySQL中的数据:

当前Hive数据仓库中拉链表的数据:

step1:增量采集变化数据,放入增量表中

step2:构建临时表,将Hive中的拉链表与临时表的数据进行合并

step3:将临时表的数据覆盖写入拉链表中
#构建模拟数据
[hyj@hadoop102 datas]$ vim zipper.txt
001 186xxxx1234 laoda 0 sh 2021-01-01 9999-12-31
002 186xxxx1235 laoer 1 bj 2021-01-01 9999-12-31
003 186xxxx1236 laosan 0 sz 2021-01-01 9999-12-31
004 186xxxx1237 laosi 1 gz 2021-01-01 9999-12-31
005 186xxxx1238 laowu 0 sh 2021-01-01 9999-12-31
006 186xxxx1239 laoliu 1 bj 2021-01-01 9999-12-31
007 186xxxx1240 laoqi 0 sz 2021-01-01 9999-12-31
008 186xxxx1241 laoba 1 gz 2021-01-01 9999-12-31
009 186xxxx1242 laojiu 0 sh 2021-01-01 9999-12-31
010 186xxxx1243 laoshi 1 bj 2021-01-01 9999-12-31
[hyj@hadoop102 datas]$ vim update.txt
008 186xxxx1241 laoba 1 sh 2021-01-02 9999-12-31
011 186xxxx1244 laoshi 1 jx 2021-01-02 9999-12-31
012 186xxxx1245 laoshi 0 zj 2021-01-02 9999-12-31
--创建数据库
create database db_zipper;
use db_zipper;
--创建拉链表
create table dw_zipper(
userid string,
phone string,
nick string,
gender int,
addr string,
starttime string,
endtime string
) row format delimited fields terminated by '\t';
--加载模拟数据
load data local inpath '/opt/module/hive-3.1.2/datas/zipper.txt' into table dw_zipper;
select userid,nick,addr,starttime,endtime from dw_zipper;
--创建ods层增量表
create table ods_zipper_update(
userid string,
phone string,
nick string,
gender int,
addr string,
starttime string,
endtime string
) row format delimited fields terminated by '\t';
--加载数据
load data local inpath '/opt/module/hive-3.1.2/datas/update.txt' into table ods_zipper_update;
--查询数据
select userid,nick,addr,starttime,endtime from ods_zipper_update;
create table tmp_zipper(
userid string,
phone string,
nick string,
gender int,
addr string,
starttime string,
endtime string
) row format delimited fields terminated by '\t';
insert overwrite table tmp_zipper
select
userid,
phone,
nick,
gender,
addr,
starttime,
endtime
from ods_zipper_update
union all
--查询原来拉链表的所有数据,并将这次需要更新的数据的endTime更改为更新值的startTime
select
a.userid,
a.phone,
a.nick,
a.gender,
a.addr,
a.starttime,
--如果这条数据没有更新或者这条数据不是要更改的数据,就保留原来的值,否则就改为新数据的开始时间-1
if(b.userid is null or a.endtime < '9999-12-31', a.endtime , date_sub(b.starttime,1)) as endtime
from dw_zipper a left join ods_zipper_update b
on a.userid = b.userid ;
insert overwrite table dw_zipper
select * from tmp_zipper;
在传统的关系型数据库例如MySQL、Oracle等数据库中,为了提高数据的查询效率,可以为表中的字段单独构建索引,查询时,可以基于字段的索引快速的实现查询、过滤等操作。
Hive中也同样提供了索引的设计,允许用户为字段构建索引,提高数据的查询效率。但是Hive的索引与关系型数据库中的索引并不相同,比如,Hive不支持主键或者外键。Hive索引可以建立在表中的某些列上,以提升一些操作的效率,例如减少MapReduce任务中需要读取的数据块的数量。
在可以预见到分区数据非常庞大的情况下,分桶和索引常常是优于分区的。而分桶由于SMB Join对关联键要求严格,所以并不是总能生效。
官方文档:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/AlterIndex
注意:官方明确表示,索引功能支持是从Hive0.7版本开始,到Hive3.0不再支持。
Hive中索引的基本原理:当为某张表的某个字段创建索引时,Hive中会自动创建一张索引表,该表记录了该字段的每个值与数据实际物理位置之间的关系,例如数据所在的HDFS文件地址,以及所在文件中偏移量offset等信息。
Hive的索引目的是提高Hive表指定列的查询速度。没有索引时,类似’WHERE tab1.col1 = 10’ 的查询,Hive会加载整张表或分区,然后处理所有的rows,但是如果在字段col1上面存在索引时,那么只会加载和处理文件的一部分。
下面我们来实现索引的构建,例如:
当前有一张分区表tb_login_part,默认查询数据时,是没有索引的,当查询登陆日期时可以通过分区过滤来提高效率,但是如果想按照用户ID进行查询,就无法使用分区进行过滤,只能全表扫描数据。

如果我们需要经常按照用户ID查询,那么性能就会相对较差,我们可以基于用户ID构建索引来加快查询效率。
可以使用Hive3.0以下版本测试
--为表中的userid构建索引
create index idx_user_id_login on table tb_login_part(userid)
--索引类型为Compact,Hive支持Compact和Bitmap类型,存储的索引内容不同
as 'COMPACT'
--延迟构建索引
with deferred rebuild;

刚创建完的索引表是没有数据的,需要生成索引数据)alter index idx_user_id_login ON tb_login_part rebuild;
通过运行一个MapReduce程序来构建索引
desc default__tb_login_part_idx_user_id_login__;
select * from default__tb_login_part_idx_user_id_login__;

索引中记录了每个用户ID对应的文件以及在文件中的位置

DROP INDEX idx_user_id_login ON tb_login_part;
问题:
Hive构建索引的过程是通过一个MapReduce程序来实现的,这就导致了Hive的一个问题,每次Hive中原始数据表的数据发生更新时,索引表不会自动更新,必须手动执行一个Alter index命令来实现通过MapReduce更新索引表,导致整体性能较差,维护相对繁琐。例如:


alter index idx_user_id_login ON tb_login_part rebuild;
由于Hive的索引设计过于繁琐,所以从Hive3.0版本开始,取消了对Hive Index的支持及使用,不过如果使用的是Hive1.x或者Hive2.x在特定的场景下依旧可以使用Hive Index来提高性能。
实际工作场景中,一般不推荐使用Hive Index,推荐使用ORC文件格式中的索引来代替Hive Index提高查询性能。