1 hadoop
1.1 mp
00.介绍
Map折分、混洗Shuffle、合约reduce
HDFS解决存储、MapReduce解决计算、YARN资源管理
01.hdfs
hdfs dfs -ls / --列出HDFS根目录下的所有文件和子目录
hdfs dfs -put /tmp/ /hdfs/ --和 copyFromLocal 区别是,put 原路径可以是文件夹等
hdfs dfs -copyFromLocal /local/data /hdfs/data --将本地文件上传到 hdfs上(原路径只能是一个文件)
02.hadoop
a.查看
hadoop fs -ls / --默认是HDFS系统下的根目录
hadoop fs -ls /package/test/ --HDFS系统下某个目录
hadoop fs -cat /package/test/ --HDFS系统下某个目录
b.上传文件、目录
a.put用法
a.上传新文件
hdfs fs -put file:/root/test.txt hdfs:/ --上传本地test.txt文件到HDFS根目录,HDFS根目录须无同名文件,否则“File exists”
hdfs fs -put test.txt /test2.txt --上传并重命名文件
hdfs fs -put test1.txt test2.txt hdfs:/ --一次上传多个文件到HDFS路径
b.上传文件夹
hdfs fs -put mypkg /newpkg --上传并重命名了文件夹
hdfs fs -put -f /root/test.txt / --如果HDFS目录中有同名文件会被覆盖
b.copyFromLocal用法
a.上传文件并重命名
hadoop fs -copyFromLocal file:/test.txt hdfs:/test2.txt
c.下载文件、目录
a.get用法:
hadoop fs -get hdfs:/test.txt file:/root/ --拷贝文件到本地目录
hadoop fs -get /test.txt /root/test.txt --拷贝文件并重命名
b.copyToLocal用法
hadoop fs -copyToLocal hdfs:/test.txt file:/root/ --拷贝文件到本地目录
hadoop fs -copyToLocal /test.txt /root/test.txt --拷贝文件并重命名
d.拷贝文件、目录
hadoop fs -cp file:/test.txt hdfs:/test2.txt --从本地到HDFS,同put
hadoop fs -cp hdfs:/test.txt hdfs:/test2.txt --从HDFS到HDFS
e.移动文件
hadoop fs -mv hdfs:/test.txt hdfs:/dir/test.txt
hadoop fs -mv /test.txt /dir/test.txt
f.删除文件、目录
hadoop fs -rm /a.txt --删除指定文件
hadoop fs -rm /*.txt --删除全部txt文件
hadoop fs -rm -r /user/TroyeKK --递归删除全部文件和目录
g.读取文件
hadoop fs -cat /test.txt #以字节码的形式读取
hadoop fs -tail /test.txt
h.创建空文件
hadoop fs - touchz /newfile.txt
i.创建文件夹
hadoop fs -mkdir /newdir /newdir2 --可以同时创建多个
hadoop fs -mkdir -p /newpkg/newpkg2/newpkg3 --同时创建父级目录
k.获取逻辑空间文件、目录大小(du)
hadoop fs - du / --显示HDFS根目录中各文件和文件夹大小
hadoop fs -du -h / --以最大单位显示HDFS根目录中各文件和文件夹大小
hadoop fs -du -s / --仅显示HDFS根目录大小。即各文件和文件夹大小之和
1.2 hive1
00.总结
a.常用1
create databases if not exists test;
show databases;
show tables;
use test;
desc stu;
-----------------------------------------------------------------------------------------------------
create table if not exists employee (
Name String comment ‘Employee Name’,
Id int,
MobileNumber String,
Salary Float
);
-----------------------------------------------------------------------------------------------------
DESCRIBE FORMATTED student_hive;
show create table internal_student;
-----------------------------------------------------------------------------------------------------
truncate table student_hive; --外部表不支持使用TRUNCATE语句
drop table student_hive;
b.常用2
load data inpath '/user.txt' into table t_user;
load data local inpath '/home/hadoop/user.txt' into table t_user;
-----------------------------------------------------------------------------------------------------
默认不开启事务(不支持update和delete)
内部表开启事务(支持update和delete)
外部表不支持事务(不支持支持update和delete)
hive操作hbase(因HBase不支持Hive的ACID特性,因此不能通过Hive对HBase表进行事务管理)
-----------------------------------------------------------------------------------------------------
SELECT * FROM OMS_DATACENTER.SG_CON_PWRGRID_F_BALANCE_RESULT_WEEK;
INSERT INTO MW_APP.MWT_UD_ZJCNXM2222 (OBJ_ID,OBJ_DISPIDX,sfysb,sf,bwdydj,zt,rq) VALUES( MW_SYS.NEWGUID(),'0','未上报','1111','10kV','111','2023-10-01');
UPDATE MW_APP.MWT_UD_TXFXYJDGL SET LCZT='已发送' WHERE OBJ_ID = '408DE45D-D5DA-4EDE-9E22-FD35BCF2DA9D-00016';
DELETE FROM MW_APP.MWT_UD_ZYCNXM WHERE OBJ_ID = '467DBE40-B8B8-43D3-8A67-5E6764CF6D3D-00001';
-----------------------------------------------------------------------------------------------------
TRUNCATE TABLE student_hive;
DROP TABLE student_hive;
-----------------------------------------------------------------------------------------------------
HiveJoin 分三种:inner join, outer join, semi join,其中outer join 包括 left join,right join 和 full outer join
c.视图
CREATE VIEW emp_40000 AS SELECT * FROM employee WHERE salary>40000
DROP VIEW view_name
d.UDF函数
UDF 分为两大类:UDAF(用户自定义聚合函数)和 UDTF(用户自定义表生成函数)
e.复杂类型
a.复杂类型:STRUCT
CREATE TABLE users (
id INT,
info STRUCT<name: STRING, age: INT, address: STRING>
)
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
b.复杂类型:ARRAY
CREATE TABLE students (
student_id INT,
courses ARRAY<STRING>
)
CLUSTERED BY (student_id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
c.复杂类型:MAP
CREATE TABLE employees (
emp_id INT,
contacts MAP<STRING, STRING>
)
CLUSTERED BY (emp_id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
d.复杂类型:UINON
CREATE TABLE mixed_data (
id INT,
mixed_value UNIONTYPE<INT, STRING, DOUBLE>
)
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
f.语法:insert overwrite
a.重写表数据
用于清除并替换现有的数据。
b.重写分区数据:可以用 INSERT OVERWRITE 对特定分区进行覆盖。
-- 重写表中 department='Sales' 分区的数据
INSERT OVERWRITE TABLE employee_data PARTITION (department='Sales')
SELECT emp_id, name FROM employee_data WHERE department='Sales';
c.导出到外部文件:将查询结果直接导出到指定的 HDFS 目录中。
-- 导出查询结果到指定的 HDFS 目录
INSERT OVERWRITE DIRECTORY '/user/hive/warehouse/employee_output'
SELECT * FROM employee_data;
g.语法:merge into
a.介绍
MERGE INTO target_table AS target
USING source_table AS source
ON (condition)
WHEN MATCHED THEN
UPDATE SET target.column1 = source.column1, target.column2 = source.column2
WHEN NOT MATCHED THEN
INSERT (column1, column2) VALUES (source.column1, source.column2);
b.示例
MERGE INTO employee_target AS target
USING employee_source AS source
ON target.emp_id = source.emp_id
WHEN MATCHED THEN
UPDATE SET target.name = source.name,target.department = source.department
WHEN NOT MATCHED THEN
INSERT (emp_id, name, department) VALUES (source.emp_id, source.name, source.department);
h.SELECT语法
a.语法
SELECT [ALL | DISTINCT] select_expr, select_expr, ...
FROM table_reference
[WHERE where_condition]
[GROUP BY col_list]
[ORDER BY col_list]
[CLUSTER BY col_list | [DISTRIBUTE BY col_list] [SORT BY col_list] ]
[LIMIT number]
b.Map阶段
1.执行from加载,进行表的查找与加载
2.执行where过滤,进行条件过滤与筛选
3.执行select查询:进行输出项的筛选
4.执行group by分组:描述了分组后需要计算的函数
5.map端文件合并:map端本地溢出写文件的合并操作,每个map最终形成一个临时文件。然后按列映射到对应的Reduce阶段:
c.Reduce阶段
1.group by:对map端发送过来的数据进行分组并进行计算。
2.select:最后过滤列用于输出结果
3.limit排序后进行结果输出到HDFS文件
i.排序
a.ORDER BY 全局排序
对全局数据进行排序,通常只使用一个 reducer。
b.SORT BY 每个Reduce内部排序
对每个 reducer 的数据进行排序,不保证全局排序。
c.DISTRIBUTE BY 分区
根据指定字段将数据分配到不同的 reducers,不进行排序。
d.CLUSTER BY 分区排序
结合了 DISTRIBUTE BY 和 SORT BY,将数据分配到 reducers 中,并在每个 reducer 内部进行排序。
j.窗口函数
a.OVER 子句
SELECT
emp_id,
amount,
SUM(amount) OVER (PARTITION BY department ORDER BY amount) AS running_total
FROM sales;
b.序列函数
SELECT
emp_id,
name,
department,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY amount DESC) AS row_num
FROM sales;
c.窗口函数示例
SELECT
emp_id,
amount,
SUM(amount) OVER (PARTITION BY department ORDER BY amount) AS running_total
FROM sales;
k.文件格式和压缩
a.表的压缩设置
CREATE TABLE my_table (
id INT,
name STRING
)
STORED AS TEXTFILE
TBLPROPERTIES ("compression"="gzip");
-------------------------------------------------------------------------------------------------
修改表的压缩设置:
ALTER TABLE my_table SET TBLPROPERTIES ('compression'='snappy');
b.计算过程中的压缩
计算过程中可以通过以下设置来启用压缩:
MapReduce任务压缩设置: 修改Hive配置文件(如hive-site.xml)或在Hive查询中设置:
-------------------------------------------------------------------------------------------------
SET mapreduce.output.fileoutputformat.compress=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
c.使用Hive的表属性进行压缩
SET hive.exec.compress.output=true;
SET hive.exec.compress.intermediate=true;
SET hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
这些设置将确保在Hive的MapReduce任务中启用压缩,从而减少中间数据和最终结果的数据量。
d.使用Hive的表属性
为了使得数据存储在表中时进行压缩,可以使用表属性:
ALTER TABLE my_table SET FILEFORMAT ORC;
ORC(Optimized Row Columnar)格式支持高效的压缩和读取性能。
l.HQL优化
列裁剪:只选择需要的列以减少数据扫描量。
分区裁剪:在查询中利用分区信息来减少扫描的数据量。
合理设置 Reduce 的数量:调整 Reducer 数量以提高资源利用率。
Job 并行运行设置:允许多个作业并行运行以提高效率。
小文件优化:合并小文件减少 NameNode 负担。
JOIN 操作优化:选择合适的 JOIN 类型和顺序以提高性能。
SMBJoin:使用桶表进行高效的 JOIN 操作。
数据倾斜优化:解决数据不均匀分布问题,提高查询性能。
01.常用1
a.数据库
create database if not exists test; --数据库创建
show databases; --数据库列表
use test; --数据库使用
desc database extended myhive; --显示数据库的详细属性信息
select current_database(); --查看正在使用哪个库
show create database mydb; --查看创建库的详细语句
b.表
a.创建
create table if not exists employee ( --创建新表
Name String comment ‘Employee Name’,
Id int,
MobileNumber String,
Salary Float
);
b.查看
show tables; --查看当前库的所有表
desc stu; --查看表名
show create table internal_student;
DESCRIBE FORMATTED student_hive;
c.语法
ALTER TABLE TableName RENAME TO new_name
ALTER TABLE TableName ADD COLUMNS (col_spec[, col_spec ...])
ALTER TABLE TableName DROP [COLUMN] column_name
ALTER TABLE TableName CHANGE column_name new_name new_type
ALTER TABLE TableName REPLACE COLUMNS (col_spec[, col_spec ...])
d.修改表名,修改列名
ALTER TABLE employee RENAME TO demo1;
e.修改表属性
alter table table_name set TBLPROPERTIES ('comment' = 'my new students table');
f.修改SerDe信息
alter table student set SERDEPROPERTIES ('field.delim' = '-'); --
g.增加/删除/改变/替换列
alter table student add columns (course string);alter table student change column id ids int;alter tsble student replace columns (id int,name string,address string);
h.增加/修改/删除分区
a.添加分区,不指定分区路径,默认路径
ALTER TABLE student ADD partition(part='a') partition(part='b');
b.添加分区,指定分区路径
ALTER TABLE student ADD IF NOT EXISTS partition(part='bb') location '/myhive_bb' partition(part='cc') location '/myhive_cc';
c.修改分区路径示例
ALTER TABLE student partition (part='bb') SET location '/myhive_bbbbb';
d.删除分区示例
ALTER TABLE student DROP if exists partition(part='aa');ALTER TABLE student DROP if exists partition(part='aa') if exists partition(part='bb');
e.注意
最后补充:
1.防止分区被删除:alter table student_p partition (part='aa') enable no_drop;
2.防止分区被查询:alter table student_p partition (part='aa') enable offline; enable 和 disable 是反向操作
c.删除
a.语法1
drop table student_hive;
-------------------------------------------------------------------------------------------------
drop database if exists 库名; --删除库
drop table if exists 表名; --删除表
-------------------------------------------------------------------------------------------------
外部表不支持使用TRUNCATE语句
truncate table 表名; --清空表
truncate table 表名 partition(city='beijing'); --清空表(某个分区)
b.语法2
默认情况下,hive 不允许删除包含表的数据库,有两种解决办法:
1.手动删除库下所有表,然后删除库
2.使用 cascade 关键字,drop database if exists dbname cascade; --默认restrict
-------------------------------------------------------------------------------------------------
drop database if exists myhive ==== drop database if exists myhive restrict --默认restrict
d.其他辅助命令
a.查看数据库列表
show databases;
show databases like 'my*';
b.查看数据表
show tables;
show tables in myhive;
c.查看数据表的建表语句
show create table student;
d.查看hive函数列表
show funcations;
e.查看hive表分区
show partitions student;
show partitions student partition(city='shanghai');
f.查看表的详细信息(元数据信息)
desc student;
desc extended student;
desc formatted student;
g.查看数据库的详细属性
desc database student;
desc database extended student;
h.清空数据表
truncate table student;
02.常用2
a.临时生效,退出hive后失效
set hive.cli.print.current.db=true; --显示数据库名,该参数默认值为false
set hive.cli.print.header=true; --显示列名,该设置会显示表名.列名,默认值为false
set hive.resultset.use.unique.column.names=false; --不显示表名,默认值为true
b.load加载数据
load data inpath '/user.txt' into table t_user; --导入HDFS数据
load data local inpath '/home/hadoop/user.txt' into table t_user; --导入本地数据
c.DQL
Select 查询数据Hive 中的 SELECT 基础语法和标准 SQL 语法基本一致,支持 WHERE、DISTINCT、GROUP BY、ORDER BY、HAVING、LIMIT、子查询等;
select * from db.table1
select count(distinct uid) from db.table1
支持 select、union all、join(left、right、full join)、like、where、having、各种聚合函数、支持 json 解析
UDF(User Defined Function)/ UDAF/UDTF
-----------------------------------------------------------------------------------------------------
hive虽然支持 in/exists(老版本是不支持的),但是 hive 推荐使用 semi join 的方式来代替实现,而且效率更高。
支持 case … when …
-----------------------------------------------------------------------------------------------------
默认不开启事务(不支持update和delete)
内部表开启事务(支持update和delete)
外部表不支持事务(不支持支持update和delete)
hive操作hbase(因HBase不支持Hive的ACID特性,因此不能通过Hive对HBase表进行事务管理)
d.DML
SELECT * FROM OMS_DATACENTER.SG_CON_PWRGRID_F_BALANCE_RESULT_WEEK;
-----------------------------------------------------------------------------------------------------
INSERT INTO MW_APP.MWT_UD_ZJCNXM2222 (OBJ_ID,OBJ_DISPIDX,sfysb,sf,bwdydj,zt,rq) VALUES( MW_SYS.NEWGUID(),'0','未上报','1111','10kV','111','2023-10-01');
INSERT INTO OMS_DATACENTER.SG_CON_PWRGRID_F_BALANCE_RESULT_WEEK (ID, DATASOURCE_ID, DATA_DATE, UPDATE_DATE) VALUES (SUBSTR(SYS_GUID(),0,18), '0021140000', TO_DATE('2022-06-01', 'yyyy-MM-dd'), NOW());
-----------------------------------------------------------------------------------------------------
UPDATE MW_APP.MWT_UD_TXJXP SET STATE='待办';
UPDATE MW_APP.MWT_UD_TXFXYJDGL SET LCZT='已发送' WHERE OBJ_ID = '408DE45D-D5DA-4EDE-9E22-FD35BCF2DA9D-00016';
UPDATE MW_APP.NR_OMS_ZPTH_JHCTB_T2 SET WKV = '1',EKV = '2',YKV = '3',SKV = '4',TKV = '5' WHERE RQ = '2024-05-13' AND DQ = '太原';
-----------------------------------------------------------------------------------------------------
DELETE FROM MW_APP.MWT_UD_ZYCNXM WHERE OBJ_ID = '467DBE40-B8B8-43D3-8A67-5E6764CF6D3D-00001';
TRUNCATE TABLE MW_APP.MWT_UD_TXJXSB;
DROP TABLE OMS_DATACENTER.DSS_JZZT_20240219;
e.Join查询
a.介绍
Hive 支持等值连接(equality join)、外连接(outer join)和(left/right join)。Hive 不支持非等值的连接,
因为非等值连接非常难转化到 map/reduce 任务。另外,Hive 支持多于 2 个表的连接。
b.只支持等值链接,支持 and,不支持 or
例如:SELECT a.* FROM a JOIN b ON (a.id = b.id)
SELECT a.* FROM a JOIN b ON (a.id = b.id AND a.department = b.department)是正确的;
然而:SELECT a.* FROM a JOIN b ON (a.id>b.id)是错误的。
c.可以 join 多于 2 个表
例如:SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
如果 join 中多个表的 join key 是同一个,则 join 会被转化为单个 map/reduce 任务,
例如:SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)被转化为单个 map/reduce 任务,因为 join 中只使用了 b.key1 作为 join key。
例如:SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2),而这一 join 被转化为 2 个 map/reduce 任务。因为 b.key1 用于第一次 join 条件,而b.key2 用于第二次 join。
d.Join 时,每次 map/reduce 任务的逻辑
reducer 会缓存 join 序列中除了最后一个表的所有表的记录,再通过最后一个表将结果序列化到文件系统。
这一实现有助于在 reduce 端减少内存的使用量。实践中,应该把最大的那个表写在最后(否则会因为缓存浪费大量内存)。
例如:SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)所有表都使用同一个 join key(使用 1 次 map/reduce 任务计算)。
-------------------------------------------------------------------------------------------------
Reduce 端会缓存 a 表和 b 表的记录,然后每次取得一个 c 表的记录就计算一次 join 结果,
类似的还有:SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
这里用了 2 次 map/reduce 任务:第一次缓存 a 表,用 b 表序列化;
第二次缓存第一次 map/reduce 任务的结果,然后用 c 表序列化。
e.HiveJoin 分三种:inner join, outer join, semi join
其中:outer join 包括 left join,right join 和 full outer join,主要用来处理 join 中空记录的情况
03.视图
a.语法
CREATE VIEW [IF NOT EXISTS] view_name [(column_name [COMMENT column_comment], …) ]
[COMMENT table_comment]
AS SELECT …
b.示例
CREATE VIEW emp_40000 AS SELECT * FROM employee WHERE salary>40000
c.删除
DROP VIEW view_name
04.UDF函数
a.介绍
UDF 分为两大类:UDAF(用户自定义聚合函数)和 UDTF(用户自定义表生成函数)
b.接口
Hive 有两个不同的接口编写 UDF 程序。一个是基础的 UDF 接口,一个是复杂的 GenericUDF 接口。
-----------------------------------------------------------------------------------------------------
org.apache.hadoop.hive.ql. exec.UDF
基础 UDF 的函数读取和返回基本类型,即 Hadoop 和 Hive 的基本类型。如 Text、IntWritable、LongWritable、DoubleWritable 等。
-----------------------------------------------------------------------------------------------------
org.apache.hadoop.hive.ql.udf.generic.GenericUDF
复杂的 GenericUDF 可以处理 Map、List、Set 类型。
@Describtion 注解是可选的,用于对函数进行说明,其中的 FUNC 字符串表示函数名,当使用 DESCRIBE FUNCTION 命令时,替换成函数名。
@Describtion 包含三个属性:
name:用于指定Hive中的函数名。
value:用于描述函数的参数。
extended:额外的说明
05.复杂类型
a.复杂类型:STRUCT
a.介绍
在 Hive 中,STRUCT 是一种复杂类型,允许将多个字段组合在一起,形成一个类似于 C 语言中的结构体或 JSON 对象
的数据类型。STRUCT 可以包含不同的数据类型,如字符串、整数、浮点数等。使用 STRUCT 类型可以方便地存储和访
问多字段的数据
b.创建包含 STRUCT 的表
我们创建一个包含 STRUCT 类型的表 users,其中 info 字段使用 STRUCT 来存储用户的姓名、年龄和地址。
-------------------------------------------------------------------------------------------------
CREATE TABLE users (
id INT,
info STRUCT<name: STRING, age: INT, address: STRING>
)
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
c.插入数据
INSERT INTO users VALUES (1, named_struct('name', '张三', 'age', 25, 'address', '北京市')), (2, named_struct('name', '李四', 'age', 30, 'address', '上海市'));
d.查询数据
SELECT * FROM users;
SELECT info.name, info.address FROM users;
e.修改数据
UPDATE users SET info = named_struct('name', info.name, 'age', info.age, 'address', '广州市') WHERE id = 1;
f.删除数据
DELETE FROM users WHERE id = 1;
g.清空表
DROP TABLE users;
b.复杂类型:ARRAY
a.介绍
在 Hive 中,ARRAY 是一种复杂类型,用于存储多个同类型的元素。可以使用 ARRAY 来表示列表或集合类型的数据,
这在处理结构化且重复的数据时非常有用。以下是如何在 Hive 中使用 ARRAY 类型的详细示例,包括如何创建带
事务的表,以及插入、删除、更新和查询操作。
b.创建带事务的 ARRAY 类型表
-- 创建一个带事务支持的表,使用 ORC 格式,并分桶
CREATE TABLE students (
student_id INT,
courses ARRAY<STRING>
)
CLUSTERED BY (student_id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
c.插入数据
使用 ARRAY 类型插入数据时,可以通过 array() 函数来构建数组。
-------------------------------------------------------------------------------------------------
-- 插入数据,使用 array() 函数构造课程列表
INSERT INTO students VALUES
(1, array('Math', 'Science', 'English')),
(2, array('History', 'Math', 'Physics')),
(3, array('Biology', 'Chemistry'));
d.查询数据
查询 ARRAY 类型的数据可以通过索引访问元素,索引从 0 开始。
-------------------------------------------------------------------------------------------------
-- 查询所有数据
SELECT * FROM students;
-------------------------------------------------------------------------------------------------
-- 查询 student_id 为 1 的学生的第一门课程
SELECT courses[0] FROM students WHERE student_id = 1;
-------------------------------------------------------------------------------------------------
-- 查询所有学生的课程列表长度
SELECT student_id, size(courses) AS course_count FROM students;
e.更新数据
更新 ARRAY 类型的字段时,可以用 array() 函数重建数组,或者使用 CONCAT、ARRAY_REMOVE 等函数进行修改。
-- 更新 student_id 为 1 的学生的课程列表,增加一门新课程
UPDATE students SET courses = array('Math', 'Science', 'English', 'Art') WHERE student_id = 1;
-------------------------------------------------------------------------------------------------
-- 另一种更新方式,使用 CONCAT 函数添加一门课程
UPDATE students SET courses = CONCAT(courses, array('Art')) WHERE student_id = 2;
f.删除数据
-- 删除 student_id 为 3 的学生
DELETE FROM students WHERE student_id = 3;
-- 查看所有数据,验证删除和更新结果
SELECT * FROM students;
c.复杂类型:MAP
a.介绍
在 Hive 中,MAP 是一种复杂类型,用于存储键值对(key-value pairs)。MAP 可以用来表示字典或哈希表等结构,
适合用于存储一组具有唯一键的值。以下是关于 MAP 类型的详细示例,包括如何创建带事务的表,以及增、删、改、
查的操作示例。
b.创建带事务的 MAP 类型表
-- 创建一个带事务支持的表,使用 ORC 格式,并分桶
CREATE TABLE employees (
emp_id INT,
contacts MAP<STRING, STRING>
)
CLUSTERED BY (emp_id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
c.插入数据
-- 插入数据,使用 map() 函数构建联系人信息
INSERT INTO employees VALUES
(1, map('email', 'john.doe@example.com', 'phone', '123-456-7890')),
(2, map('email', 'jane.smith@example.com', 'phone', '098-765-4321', 'skype', 'janesmith')),
(3, map('phone', '555-555-5555', 'slack', '@john'));
d.查询数据
可以通过键来访问 MAP 中的值,键是大小写敏感的。
-------------------------------------------------------------------------------------------------
-- 查询所有数据
SELECT * FROM employees;
-------------------------------------------------------------------------------------------------
-- 查询 emp_id 为 1 的员工的 email
SELECT contacts['email'] FROM employees WHERE emp_id = 1;
-------------------------------------------------------------------------------------------------
-- 查询所有员工的电话号码
SELECT emp_id, contacts['phone'] AS phone_number FROM employees;
-------------------------------------------------------------------------------------------------
-- 查询所有包含 Skype 联系方式的员工
SELECT emp_id FROM employees WHERE contacts['skype'] IS NOT NULL;
e.更新数据
更新 MAP 类型的字段时,可以使用 map() 函数重新构建,或使用 MAP_KEYS 和 MAP_VALUES 函数进行操作。
-------------------------------------------------------------------------------------------------
-- 更新 emp_id 为 1 的员工,添加 Skype 联系方式
UPDATE employees SET contacts = map('email', contacts['email'], 'phone', contacts['phone'], 'skype', 'johndoe123') WHERE emp_id = 1;
-------------------------------------------------------------------------------------------------
-- 更新 emp_id 为 2 的员工,修改电话号码
UPDATE employees SET contacts = map('email', contacts['email'], 'phone', '111-222-3333', 'skype', contacts['skype']) WHERE emp_id = 2;
f.删除数据
删除记录时,可以根据条件删除某一行。
-------------------------------------------------------------------------------------------------
-- 删除 emp_id 为 3 的员工
DELETE FROM employees WHERE emp_id = 3;
-- 查看所有数据,验证删除和更新结果
SELECT * FROM employees;
d.复杂类型:UINON
a.介绍
在 Hive 中,UNION 复杂类型(也称为 UNIONTYPE)允许一个字段存储不同的数据类型。
这种类型类似于其他编程语言中的联合体(union),但在 Hive 中应用较少,且操作略显复杂。
使用 UNIONTYPE 可以存储多种不同的数据类型,并通过指定的标签(Tag)区分存储的数据类型。
b.创建带事务的 UNIONTYPE 类型表
-- 创建一个带事务支持的表,使用 ORC 格式,并分桶
CREATE TABLE mixed_data (
id INT,
mixed_value UNIONTYPE<INT, STRING, DOUBLE>
)
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
c.插入数据
在插入数据时,可以通过 create_union() 函数指定要插入的类型和对应的值。create_union(tag, value) 函数的 tag 是从 0 开始的索引,用于指示使用哪个类型。
-------------------------------------------------------------------------------------------------
-- 插入数据,使用 create_union() 函数构造 UNIONTYPE
INSERT INTO mixed_data VALUES
(1, create_union(0, 123)), -- 插入整数
(2, create_union(1, 'Hello World')), -- 插入字符串
(3, create_union(2, 45.67)); -- 插入双精度浮点数
d.查询数据
查询 UNIONTYPE 类型的数据时,可以使用 CASE 语句配合 TAG() 和 FIELD() 函数获取特定类型的值。
-------------------------------------------------------------------------------------------------
-- 查询所有数据
SELECT * FROM mixed_data;
-------------------------------------------------------------------------------------------------
-- 查询并显示 mixed_value 的具体值,根据类型判断
SELECT
id,
CASE TAG(mixed_value)
WHEN 0 THEN FIELD(mixed_value, 0) -- 返回 INT 类型
WHEN 1 THEN FIELD(mixed_value, 1) -- 返回 STRING 类型
WHEN 2 THEN FIELD(mixed_value, 2) -- 返回 DOUBLE 类型
END AS actual_value
FROM mixed_data;
e.更新数据
更新 UNIONTYPE 类型的字段时,需要使用 create_union() 重新构建数据。
-------------------------------------------------------------------------------------------------
-- 更新 id 为 1 的记录,将 mixed_value 更新为字符串
UPDATE mixed_data SET mixed_value = create_union(1, 'Updated String') WHERE id = 1;
-------------------------------------------------------------------------------------------------
-- 更新 id 为 2 的记录,将 mixed_value 更新为双精度浮点数
UPDATE mixed_data SET mixed_value = create_union(2, 99.99) WHERE id = 2;
f.删除数据
-- 删除 id 为 3 的记录
DELETE FROM mixed_data WHERE id = 3;
-------------------------------------------------------------------------------------------------
-- 查看所有数据,验证删除和更新结果
SELECT
id,
CASE TAG(mixed_value)
WHEN 0 THEN FIELD(mixed_value, 0)
WHEN 1 THEN FIELD(mixed_value, 1)
WHEN 2 THEN FIELD(mixed_value, 2)
END AS actual_value
FROM mixed_data;
06.语法:insert overwrite
a.介绍
INSERT OVERWRITE 是 Hive 中用于重写目标表或分区数据的一个常用操作。与 INSERT INTO 不同,INSERT OVERWRITE
会删除目标位置现有的数据,并用新数据替代。这个操作特别适用于重新计算或更新整个表或分区的数据。
b.示例
a.创建示例表
-- 创建一个普通的员工信息表,不需要事务支持
CREATE TABLE employee_data (
emp_id INT,
name STRING,
department STRING
)
STORED AS ORC;
b.插入初始数据
-- 插入初始数据
INSERT INTO employee_data VALUES
(1, 'John Doe', 'Sales'),
(2, 'Jane Smith', 'Marketing'),
(3, 'Sam Brown', 'Engineering');
c.使用 INSERT OVERWRITE 操作
使用 INSERT OVERWRITE 语句重写表中的数据。假设我们要更新员工信息,可能由于业务变更,重新调整了部门信息或其他数据。
-------------------------------------------------------------------------------------------------
-- 使用 INSERT OVERWRITE 重写整个表的数据
INSERT OVERWRITE TABLE employee_data
SELECT
emp_id,
name,
CASE
WHEN department = 'Sales' THEN 'Business Development'
WHEN department = 'Marketing' THEN 'Public Relations'
ELSE department
END AS department
FROM employee_data;
-------------------------------------------------------------------------------------------------
这个操作将:将原有数据中的部门信息更新,例如将 Sales 更新为 Business Development。替换掉原表中的数据。
d.查询结果
-- 查询表数据,验证 INSERT OVERWRITE 的结果
SELECT * FROM employee_data;
-------------------------------------------------------------------------------------------------
+--------+-----------+-----------------------+
| emp_id | name | department |
+--------+-----------+-----------------------+
| 1 | John Doe | Business Development |
| 2 | Jane Smith| Public Relations |
| 3 | Sam Brown | Engineering |
+--------+-----------+-----------------------+
c.场景
a.重写表数据
用于清除并替换现有的数据。
b.重写分区数据:可以用 INSERT OVERWRITE 对特定分区进行覆盖。
-- 重写表中 department='Sales' 分区的数据
INSERT OVERWRITE TABLE employee_data PARTITION (department='Sales')
SELECT emp_id, name FROM employee_data WHERE department='Sales';
c.导出到外部文件:将查询结果直接导出到指定的 HDFS 目录中。
-- 导出查询结果到指定的 HDFS 目录
INSERT OVERWRITE DIRECTORY '/user/hive/warehouse/employee_output'
SELECT * FROM employee_data;
07.语法:merge into
a.介绍
Hive在2.2版本之后开始支持Merge操作,并且Merge只能在支持ACID的表上执行
-----------------------------------------------------------------------------------------------------
MERGE INTO target_table AS target
USING source_table AS source
ON (condition)
WHEN MATCHED THEN
UPDATE SET target.column1 = source.column1, target.column2 = source.column2
WHEN NOT MATCHED THEN
INSERT (column1, column2) VALUES (source.column1, source.column2);
b.创建事务表
-- 创建目标表,启用事务
CREATE TABLE employee_target (
emp_id INT,
name STRING,
department STRING
)
CLUSTERED BY (emp_id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
-----------------------------------------------------------------------------------------------------
-- 创建源表,源表通常不需要事务支持
CREATE TABLE employee_source (
emp_id INT,
name STRING,
department STRING
)
STORED AS ORC;
c.插入初始数据
-- 插入数据到目标表
INSERT INTO employee_target VALUES
(1, 'John Doe', 'Sales'),
(2, 'Jane Smith', 'Marketing');
-----------------------------------------------------------------------------------------------------
-- 插入数据到源表
INSERT INTO employee_source VALUES
(1, 'John Doe', 'Business Development'), -- 更新部门信息
(3, 'Sam Brown', 'Engineering'); -- 新员工
d.使用 MERGE INTO 语法
MERGE INTO employee_target AS target
USING employee_source AS source
ON target.emp_id = source.emp_id
WHEN MATCHED THEN
UPDATE SET target.name = source.name,target.department = source.department
WHEN NOT MATCHED THEN
INSERT (emp_id, name, department) VALUES (source.emp_id, source.name, source.department);
e.查询结果
-- 查询目标表
SELECT * FROM employee_target;
-----------------------------------------------------------------------------------------------------
+--------+-----------+------------------------+
| emp_id | name | department |
+--------+-----------+------------------------+
| 1 | John Doe | Business Development |
| 2 | Jane Smith| Marketing |
| 3 | Sam Brown | Engineering |
+--------+-----------+------------------------+
08.SELECT语法
a.语法
SELECT [ALL | DISTINCT] select_expr, select_expr, ...
FROM table_reference
[WHERE where_condition]
[GROUP BY col_list]
[ORDER BY col_list]
[CLUSTER BY col_list | [DISTRIBUTE BY col_list] [SORT BY col_list] ]
[LIMIT number]
b.Map阶段
1.执行from加载,进行表的查找与加载
2.执行where过滤,进行条件过滤与筛选
3.执行select查询:进行输出项的筛选
4.执行group by分组:描述了分组后需要计算的函数
5.map端文件合并:map端本地溢出写文件的合并操作,每个map最终形成一个临时文件。然后按列映射到对应的Reduce阶段:
c.Reduce阶段
1.group by:对map端发送过来的数据进行分组并进行计算。
2.select:最后过滤列用于输出结果
3.limit排序后进行结果输出到HDFS文件
09.排序
a.ORDER BY
a.介绍
ORDER BY 用于对全局数据进行排序。这意味着所有数据在被输出之前都必须在一个 reducer 中进行排序,
因此通常会导致只有一个 reducer 被使用。此操作通常是资源密集型的,因为它需要将所有数据聚集到一
个节点上进行排序。
b.示例
-- 全局排序,所有数据都将被汇总到一个 reducer 中
SELECT * FROM sales
ORDER BY amount;
-------------------------------------------------------------------------------------------------
注意:在 Hive 的 hive.mapred.mode 设置为 strict 时,ORDER BY 语句必须与 LIMIT 一起使用,否则会报错。
-- 在 strict 模式下,必须使用 LIMIT
SELECT * FROM sales
ORDER BY amount
LIMIT 10;
b.SORT BY
a.介绍
SORT BY 用于对每个 reducer 的数据进行排序,而不是对全局数据排序。每个 reducer 将对它自己的数据进行排序,
因此如果使用多个 reducers,排序只会在每个 reducer 的输出中进行,而不是全局排序。
b.示例
-- 每个 reducer 中的数据会被排序,但不会保证全局排序
SELECT * FROM sales
SORT BY amount;
c.DISTRIBUTE BY
a.介绍
DISTRIBUTE BY 用于将数据分配到不同的 reducers 中。
与 CLUSTER BY 类似,它将数据根据指定的列分配到不同的 reducers,
但是 DISTRIBUTE BY 不会对数据进行排序,只会将数据分配到不同的 reducers。
b.示例
-- 根据指定的字段将数据分配到不同的 reducers 中
SELECT * FROM sales
DISTRIBUTE BY department;
在这个示例中,所有拥有相同 department 值的记录将会被分配到同一个 reducer 中。
d.CLUSTER BY
a.介绍
CLUSTER BY 结合了 DISTRIBUTE BY 和 SORT BY。首先,它根据指定的列将数据分配到不同的 reducers 中,
然后在每个 reducer 内对数据进行排序。请注意,CLUSTER BY 不能指定排序顺序(ASC 或 DESC),默认是升序(ASC)。
b.示例
-- 根据指定的字段将数据分配到不同的 reducers,并在每个 reducer 中排序
SELECT * FROM sales
CLUSTER BY department;
在这个示例中,所有具有相同 department 值的记录会被分配到同一个 reducer 中,并且在 reducer 内部数据会按升序排序。
e.总结
ORDER BY 全局排序:对全局数据进行排序,通常只使用一个 reducer。
SORT BY 每个Reduce内部排序:对每个 reducer 的数据进行排序,不保证全局排序。
DISTRIBUTE BY 分区:根据指定字段将数据分配到不同的 reducers,不进行排序。
CLUSTER BY 分区排序:结合了 DISTRIBUTE BY 和 SORT BY,将数据分配到 reducers 中,并在每个 reducer 内部进行排序。
10.窗口函数
a.OVER 子句
a.介绍
OVER 子句是窗口函数的核心部分,它定义了窗口函数的操作范围(即“窗口”)。
你可以在 OVER 子句中指定窗口的分区和排序规则。
b.示例
SELECT
emp_id,
amount,
SUM(amount) OVER (PARTITION BY department ORDER BY amount) AS running_total
FROM sales;
在这个示例中,SUM(amount) 是一个窗口函数,PARTITION BY department 指定了分区,ORDER BY amount 指定了排序。
b.序列函数
a.介绍
序列函数是窗口函数的一类,用于生成序列或排名。最常用的序列函数包括 ROW_NUMBER()、RANK() 和 DENSE_RANK()。
b.ROW_NUMBER()
ROW_NUMBER() 函数为每一行分配一个唯一的序列号,基于窗口函数中的排序规则。每个分区内的行号从 1 开始递增。
-------------------------------------------------------------------------------------------------
SELECT
emp_id,
name,
department,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY amount DESC) AS row_num
FROM sales;
在这个示例中,ROW_NUMBER() 为每个部门的员工按 amount 降序排列,并为每个部门内的员工分配一个唯一的行号。
c.比较 ROW_NUMBER()、RANK() 和 DENSE_RANK()
ROW_NUMBER():为每行分配一个唯一的顺序号。
RANK():为每行分配排名,存在并列时排名相同,但排名之后的排名会跳过。
DENSE_RANK():为每行分配排名,存在并列时排名相同,但排名之后的排名不会跳过。
-------------------------------------------------------------------------------------------------
SELECT
emp_id,
name,
department,
amount,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY amount DESC) AS row_num,
RANK() OVER (PARTITION BY department ORDER BY amount DESC) AS rank,
DENSE_RANK() OVER (PARTITION BY department ORDER BY amount DESC) AS dense_rank
FROM sales;
在这个示例中,ROW_NUMBER()、RANK() 和 DENSE_RANK() 对于每个部门的员工按 amount 降序排序进行排名。
c.窗口函数示例
a.计算运行总和
SELECT
emp_id,
amount,
SUM(amount) OVER (PARTITION BY department ORDER BY amount) AS running_total
FROM sales;
在这个示例中,SUM(amount) OVER (PARTITION BY department ORDER BY amount) 计算了在每个部门内,按 amount 排序后的累计总和。
b.计算移动平均
SELECT
emp_id,
amount,
AVG(amount) OVER (PARTITION BY department ORDER BY amount ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS moving_avg
FROM sales;
在这个示例中,AVG(amount) OVER (PARTITION BY department ORDER BY amount ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) 计算了当前行及前两行的平均值,即移动平均。
c.计算累积百分比
SELECT
emp_id,
amount,
PERCENT_RANK() OVER (PARTITION BY department ORDER BY amount) AS pct_rank
FROM sales;
在这个示例中,PERCENT_RANK() 计算了每个员工在其部门中的相对排名百分比。
11.文件格式和压缩
a.文件格式
a.Text File
文本文件是Hive默认使用的文件格式,文本文件中的一行内容,就对应Hive表中的一行记录。
create table textfile_table
(column_specs)
stored as textfile;
b.ORC
ORC(Optimized Row Columnar)file format是Hive 0.11版里引入的一种列式存储的文件格式。
ORC文件能够提高Hive读写数据和处理数据的性能。
(1)行存储的特点
查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。
(2)列存储的特点
因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。
前文提到的text file和sequence file都是基于行存储的,orc和parquet是基于列式存储的。
c.Parquet
Parquet文件是Hadoop生态中的一个通用的文件格式,它也是一个列式存储的文件格式。
每个Row Group包含多个Column Chunk,每个Column Chunk包含多个Page。以下是Row Group、Column Chunk和Page三个概念的说明:
行组(Row Group):一个行组对应逻辑表中的若干行。
列块(Column Chunk):一个行组中的一列保存在一个列块中。
页(Page):一个列块的数据会划分为若干个页。
Footer(File Meta Data)中存储了每个行组(Row Group)中的每个列快(Column Chunk)的元数据信息,元数据信息包含了该列的数据类型、该列的编码方式、该类的Data Page位置等信息。
b.压缩
a.介绍
压缩格式 算法 文件扩展名 是否可切分
DEFLATE DEFLATE .deflate 否
Gzip DEFLATE .gz 否
bzip2 bzip2 .bz2 是
LZO LZO .lzo 是
Snappy Snappy .snappy 否
b.Hadoop在driver端设置压缩
压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec
c.压缩性能的比较
压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s
c.示例
a.表的压缩设置
CREATE TABLE my_table (
id INT,
name STRING
)
STORED AS TEXTFILE
TBLPROPERTIES ("compression"="gzip");
-------------------------------------------------------------------------------------------------
修改表的压缩设置:
ALTER TABLE my_table SET TBLPROPERTIES ('compression'='snappy');
b.计算过程中的压缩
计算过程中可以通过以下设置来启用压缩:
MapReduce任务压缩设置: 修改Hive配置文件(如hive-site.xml)或在Hive查询中设置:
-------------------------------------------------------------------------------------------------
SET mapreduce.output.fileoutputformat.compress=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
c.使用Hive的表属性进行压缩
SET hive.exec.compress.output=true;
SET hive.exec.compress.intermediate=true;
SET hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
这些设置将确保在Hive的MapReduce任务中启用压缩,从而减少中间数据和最终结果的数据量。
d.使用Hive的表属性
为了使得数据存储在表中时进行压缩,可以使用表属性:
ALTER TABLE my_table SET FILEFORMAT ORC;
ORC(Optimized Row Columnar)格式支持高效的压缩和读取性能。
12.HQL优化
a.列裁剪
a.介绍
列裁剪(Column Pruning)是指只选择需要的列来减少数据扫描量。
Hive 在查询时会自动进行列裁剪,但你在编写查询时也应尽量只选择需要的列,以提高查询效率。
b.示例
-- 只选择需要的列,减少数据扫描量
SELECT emp_id, amount
FROM sales;
b.分区裁剪
a.介绍
分区裁剪(Partition Pruning)是指在查询中利用表的分区信息来减少扫描的分区,从而提高查询效率。
在 WHERE 子句中使用分区列可以使 Hive 只扫描相关的分区。
b.示例
-- 只扫描指定的分区
SELECT *
FROM sales
WHERE year = 2023 AND month = 9;
c.合理设置 Reduce 的数量
a.介绍
合理设置 Reduce 的数量(Number of Reducers)是通过调整 Reducer 的数量来优化作业的性能。
Reducer 的数量通常与集群的资源和数据的规模有关。设置过少的 Reducers 可能导致负载不均衡,
而设置过多的 Reducers 可能导致资源浪费。
b.示例
-- 设置 Reduce 的数量
SET mapreduce.job.reduces=10;
d.Job 并行运行设置
a.介绍
Job 并行运行设置(Parallel Execution)允许 Hive 同时执行多个作业,以提高作业的执行速度。
合理设置可以提高资源利用率。
b.示例
-- 设置最大并行作业数
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=8;
e.小文件的问题优化
a.介绍
小文件的问题优化(Small Files Problem)指的是减少小文件的数量,因为小文件会增加 NameNode 的负担,
影响性能。可以通过合并小文件来优化。
b.使用 INSERT OVERWRITE 合并文件
INSERT OVERWRITE DIRECTORY '/path/to/directory'
SELECT * FROM sales;
c.配置 Hive 合并小文件
SET hive.merge.smallfiles.avgsize=128000000; -- 设置合并小文件的平均大小
SET hive.merge.size.per.table=256000000; -- 设置表文件的合并大小
f.JOIN 操作优化
a.介绍
JOIN 操作优化(JOIN Optimization)通过选择合适的 JOIN 类型和优化 JOIN 的执行策略来提高性能。
常见的优化技术包括:
使用适当的 JOIN 类型(例如 MAPJOIN 或 SMBJOIN):
避免大表与小表的笛卡尔积。
选择合适的 JOIN 顺序(使用 USE 或 JOIN 的优化提示)。
b.示例
-- 使用 MAPJOIN 提高小表 JOIN 性能
SELECT /*+ MAPJOIN(small_table) */ *
FROM large_table
JOIN small_table ON large_table.key = small_table.key;
g.SMBJoin
a.介绍
SMBJoin(Sorted Map Join)是一种优化 JOIN 操作的方法,适用于两个桶表(bucketed tables),
能够高效地处理大数据量的 JOIN 操作。
b.SMBJoin 成立的前提条件
两个表都必须是桶表。
两个表的桶数应相等。
两个表的桶列应相同。
c.两个桶表数量相等的 JOIN
在 JOIN 两个桶表时,桶数相等可以提高 JOIN 的效率,因为数据已经根据桶列进行了预排序和分区。
-- 创建桶表
CREATE TABLE table1 (key INT, value STRING)
CLUSTERED BY (key) INTO 4 BUCKETS;
CREATE TABLE table2 (key INT, value STRING)
CLUSTERED BY (key) INTO 4 BUCKETS;
-- 执行 SMBJoin
SELECT *
FROM table1 t1
JOIN table2 t2 ON t1.key = t2.key;
d.执行 SQL 看对比情况
执行 SQL 语句并查看执行计划可以帮助确认是否使用了 SMBJoin。
-- 查看执行计划
EXPLAIN SELECT * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key;
h.Hive 数据倾斜优化
a.介绍
数据倾斜优化(Data Skew Optimization)是解决数据不均匀分布问题的关键,
数据倾斜会导致部分 reducer 处理过多的数据,从而影响作业的整体性能。
b.数据倾斜的原因
数据分布不均:某些 key 的数据量过大。
大表和小表的 JOIN 导致数据集中。
c.数据倾斜的表现
某些 reducer 处理的数据量远大于其他 reducer。
作业执行时间长,某些任务的执行时间远超其他任务。
d.通用解决方案参数调节
调整 reduce 的个数:SET mapreduce.job.reduces=50;
启用 skew join:SET hive.optimize.skewjoin=true;
e.SQL 语句调节
a.使用随机采样技术
SELECT *
FROM large_table
WHERE key IN (SELECT key FROM small_table);
b.引入虚拟列减少数据倾斜
SELECT *
FROM large_table l
JOIN small_table s
ON l.key = s.key
AND (l.hash_key % 10) = (s.hash_key % 10);
g.总结
列裁剪:只选择需要的列以减少数据扫描量。
分区裁剪:在查询中利用分区信息来减少扫描的数据量。
合理设置 Reduce 的数量:调整 Reducer 数量以提高资源利用率。
Job 并行运行设置:允许多个作业并行运行以提高效率。
小文件优化:合并小文件减少 NameNode 负担。
JOIN 操作优化:选择合适的 JOIN 类型和顺序以提高性能。
SMBJoin:使用桶表进行高效的 JOIN 操作。
数据倾斜优化:解决数据不均匀分布问题,提高查询性能。
1.3 hive2
00.总结
a.hive创建表
a.student_internal表
CREATE TABLE student_internal (
id INT,
name STRING,
age INT
);
-------------------------------------------------------------------------------------------------
load data inpath '/user.txt' into table student_internal; --导入HDFS数据
load data local inpath '/home/hadoop/user.txt' into table student_internal; --导入本地数据
b.student_external表
CREATE EXTERNAL TABLE student_external (
id INT,
name STRING,
age INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/hive/external/student_external';
c.mydb.dept 表
CREATE TABLE `mydb.dept`(
`dept_no` INT,
`addr` STRING,
`tel` STRING
)
PARTITIONED BY (date STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
d.temp_salary_hive 表
CREATE EXTERNAL TABLE temp_salary_hive (
salary_id STRING,
id INT,
salary INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
e.默认分隔符
表名称 表类型 默认分隔符
internal_student 内部表 ASCII 1 (\u0001 或 ^A)
student_external 外部表 逗号(,)
mydb.dept 内部分区表 逗号(,)
temp_salary_hive 外部表 制表符(\t)
f.查看表结构
方式1:DESCRIBE FORMATTED student_hive; --查看col_name下的Table Type
方式2:SELECT TBL_NAME, TBL_TYPE FROM TBLS WHERE TBL_NAME = 'student_hive';
方式3:show create table internal_student;
g.创建表方式
a.使用create命令创建一个新表,带分区
CREATE TABLE `mydb.dept`(
`dept_no` int,
`addr` string,
`tel` string)
partitioned by(date string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
b.把一张表的某些字段抽取出来,创建成一张新表,使用as
create table mytest_tmp1 as select * from FDM_SOR.mytest_deptaddr where statis_date='20180229';
---------------------------------------------------------------------------------------------
1.as只会复制属性以及属性值到新的表中
2.使用as创建的表,并不会带原表的分区(分区丢失),包括一些字段的约束等(可以通过describe formatted查看)
3.新表中会将原表的分区当做字段出现在新表中
c.复制表结构,使用like创建表
create table mytest_tmp like FDM_SOR.mytest_deptaddr;
---------------------------------------------------------------------------------------------
注意:不会复制表中属性值,只会复制表结构(包括表的分区以及存储格式之类的,区别as)
d.迁移数据
INSERT INTO TABLE internal_student SELECT * FROM temp_student;
b.hive创建表到hbase
a.内部表
hive创建逻辑表后,自动会在hbase创建物理表
-------------------------------------------------------------------------------------------------
create table student_hive(id int,name string,age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age")
TBLPROPERTIES ("hbase.table.name" = "student");
b.外部表
hbase创建物理表后,然后在hive创建逻辑表映射过去
-------------------------------------------------------------------------------------------------
create 'salary','info'
create external table salary_hive(salary_id string,id int,salary int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:id,info:salary")
TBLPROPERTIES ("hbase.table.name" = "salary");
c.hive创建表到hdfs
a.内部表
CREATE TABLE managed_table (dummy STRING);
-------------------------------------------------------------------------------------------------
LOAD DATA INPATH '/workspace/hive/managed_table.txt' INTO table managed_table;
执行后,/workspace/hive/managed_table.txt 移动到 /user/hive/warehouse/managed_table/managed_table.txt,源文件删除
-------------------------------------------------------------------------------------------------
DROP TABLE managed_table;
经测试,hive内部表结构全部删除,user/hive/warehouse/managed_table删除
b.外部表
CREATE EXTERNAL TABLE external_table (dummy STRING);
-------------------------------------------------------------------------------------------------
LOAD DATA INPATH '/workspace/hive/external_table2.txt' INTO TABLE external_table;
执行后,/workspace/hive/external_table.txt 移动到 /user/hive/warehouse/external_table/external_table.txt,源文件删除
-------------------------------------------------------------------------------------------------
DROP TABLE external_table;
经测试,hive内部表结构全部删除,user/hive/warehouse/external_table不会删除
-------------------------------------------------------------------------------------------------
hadoop fs -ls /user/hive/warehouse
hadoop fs -rm -r /user/hive/warehouse/external_table
手动删除
d.事务
a.示例
CREATE TABLE internal_student (
id INT,
name STRING,
age INT
)
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
b.内部表
新增数据后,/user/hive/warehouse/internal_student文件有如下变化
从
空文件夹
新增到
delta_0000001_0000001_0000文件夹
c.外部表
新增数据后,/user/hive/warehouse/external_student文件有如下变化
从
external_student1.txt
external_student2.txt
新增到
000000_0 -> 存10,zhangsan,20
external_student1.txt
external_student2.txt
d.说明
默认不开启事务(不支持update和delete)
内部表开启事务(支持update和delete)
外部表不支持事务(不支持支持update和delete)
hive操作hbase(因HBase不支持Hive的ACID特性,因此不能通过Hive对HBase表进行事务管理)
e.将数据写入文件系统
a.将数据写入一个文件
a.使用 INSERT OVERWRITE DIRECTORY
-- 将查询结果写入到 HDFS 目录
INSERT OVERWRITE DIRECTORY '/user/hive/warehouse/sales_export'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT * FROM sales;
b.将结果复制到本地文件系统
# 从 HDFS 复制结果到本地文件系统
hadoop fs -get /user/hive/warehouse/sales_export /local/path/sales_export
b.将数据写入多个文件
a.使用 INSERT OVERWRITE DIRECTORY 导出到多个文件
-- 将查询结果写入到 HDFS 目录中的多个文件
INSERT OVERWRITE DIRECTORY '/user/hive/warehouse/sales_export_multiple'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT * FROM sales;
b.列出目录中的所有文件
hadoop fs -ls /user/hive/warehouse/sales_export_multiple
c.使用 hive -e 执行命令导出
# 执行查询并将结果输出到文件
hive -e 'SELECT * FROM sales;' > /local/path/sales_result.txt
d.使用 hive -f 执行 SQL 文件导出
a.创建 export_sales.sql 文件,内容如下:
SELECT * FROM sales;
b.使用 hive -f 执行 SQL 文件并导出
hive -f export_sales.sql > /local/path/sales_result_from_sql.txt
f.创建分区表
-- 创建一个分区事务表
CREATE TABLE sales (
order_id INT,
customer_name STRING,
amount DECIMAL(10,2)
)
PARTITIONED BY (year INT, month INT) -- 分区列
CLUSTERED BY (order_id) INTO 4 BUCKETS -- 可选的分桶
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
g.创建分桶表
-- 创建一个分桶事务表
CREATE TABLE employee_data (
emp_id INT,
name STRING,
department STRING
)
CLUSTERED BY (emp_id) INTO 4 BUCKETS -- 按 emp_id 分桶,分成 4 个桶
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
01.hive创建表
a.创建
a.语法
create [external] table [if not exists] table_name (
col_name data_type [comment '字段描述信息']
col_name data_type [comment '字段描述信息'])
[comment '表的描述信息']
[partitioned by (col_name data_type,...)]
[clustered by (col_name,col_name,...)]
[sorted by (col_name [asc|desc],...) into num_buckets buckets]
[row format row_format]
[storted as ....]
[location '指定表的路径']
-------------------------------------------------------------------------------------------------
-- EXTERNAL 代表外部表
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
-- 分区表设置 分区的字段和类型
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
-- 桶表设置 按照什么字段进行分桶
[CLUSTERED BY (col_name, col_name, ...)
-- 桶内的文件 是按照 什么字段排序 分多少个桶
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
-- 分隔符 + 序列化反序列化
[ROW FORMAT row_format]
-- 输入输出格式
[STORED AS file_format]
-- 表所对应的hdfs目录
[LOCATION hdfs_path]
b.说明
1、CREATE TABLE
创建一个指定名字的表。如果相同名字的表已经存在,则抛出异常;
用户可以用 IF NOT EXISTS 选项来忽略这个异常。
2、EXTERNAL
可以让用户创建一个外部表,在建表的同时指定一个指向实际数据的路径(LOCATION).Hive 创建内部表时,
会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。
在删除表的时候,内部表的元数据和数据会被一起删除,而外部表只删除元数据,不删除数据.
3、LIKE
允许用户复制现有的表结构,但是不复制数据。
4、ROW FORMAT DELIMITED
是用来设置创建的表在加载数据的时候,支持的列分隔符。
Hive默认的分隔符是\001,属于不可见字符,这个字符在vi里是^A
-- 分隔符设置
-- 字段间分隔符
DELIMITED [FIELDS TERMINATED BY char]
-- 集合间分隔符
[COLLECTION ITEMS TERMINATED BY char]
-- map k v 间分隔符
[MAP KEYS TERMINATED BY char]
-- 行分隔符
[LINES TERMINATED BY char]
--序列化和反序列化设置
SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, property_name=property_value, ...)]
5、STORED AS
SEQUENCEFILE|TEXTFILE|RCFILE
如果文件数据是纯文本,使用 STORED AS TEXTFILE
如果数据需要压缩,使用 STORED AS SEQUENCEFILE、RCFile、ORC
SEQUENCEFILE --包含键值对的二进制的文件存储格式,支持压缩,可以节省存储空间
TEXTFILE --最普通的文件存储格式,内容是可以直接查看(默认的)
AVRO --带有schema文件格式的, 一行的数据是个map,添加字段方便
RCFile --是列式存储文件格式,适合压缩处理。对于有成百上千字段的表而言,RCFile更合适。
ORC --是列式存储文件格式,带有压缩和轻量级索引, 一行数据是个数组,查询快,不适合添加字段
parquet --是列式存储文件格式,带有压缩和轻量级索引, 和orc比较类似
ORC File,它的全名是Optimized Row Columnar (ORC) file。(有索引有压缩的列式存储格式)
ORC File包含一组组的行数据,称为stripes,除此之外,ORC File的file footer还包含一些额外的辅助信息。
在ORC File文件的最后,有一个被称为postscript的区,它主要是用来存储压缩参数及压缩的大小。
在默认情况下,一个stripe的大小为256MB。大尺寸的stripes使得从HDFS读数据更高效。
6、CLUSTERED BY
对于每一个表(table)或者分区, Hive可以进一步组织成桶,也就是说桶是更为细粒度的数据范围划分。
Hive也是 针对某一列进行桶的组织。Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。
7、LOCATION
指定加载数据路径(指定在hdfs上的位置).针对的extetnal外部表,要指定存储路径,不指定也没事,默认路径。
内部表不用指定,默认路径/user/hive/warehouse,CREATE TABLE创建一个具有给定名称的表。
如果已存在具有相同名称的表或视图,则会引发错误。您可以使用IF NOT EXISTS跳过错误。
c.查看内部表还是外部表
方式1:DESCRIBE FORMATTED student_hive; --查看col_name下的Table Type
方式2:SELECT TBL_NAME, TBL_TYPE FROM TBLS WHERE TBL_NAME = 'student_hive';
方式3:show create table internal_student;
d.查看表结构
DESC salary_hive
e.load加载数据
load data inpath '/user.txt' into table t_user; --导入HDFS数据
load data local inpath '/home/hadoop/user.txt' into table t_user; --导入本地数据
b.创建表的三种方法
a.使用create命令创建一个新表,带分区
CREATE TABLE `mydb.dept`(
`dept_no` int,
`addr` string,
`tel` string)
partitioned by(date string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
b.把一张表的某些字段抽取出来,创建成一张新表,使用as
create table mytest_tmp1 as select * from FDM_SOR.mytest_deptaddr where statis_date='20180229';
-------------------------------------------------------------------------------------------------
1.as只会复制属性以及属性值到新的表中
2.使用as创建的表,并不会带原表的分区(分区丢失),包括一些字段的约束等(可以通过describe formatted查看)
3.新表中会将原表的分区当做字段出现在新表中
c.复制表结构,使用like创建表
create table mytest_tmp like FDM_SOR.mytest_deptaddr;
-------------------------------------------------------------------------------------------------
注意:不会复制表中属性值,只会复制表结构(包括表的分区以及存储格式之类的,区别as)
d.迁移数据
INSERT INTO TABLE internal_student SELECT * FROM temp_student;
c.ORC格式和TXT格式
a.ORC格式
CREATE TABLE employee_orc (
emp_id INT,
name STRING,
department STRING
)
STORED AS ORC;
b.TXT格式
CREATE TABLE employee_txt (
emp_id INT,
name STRING,
department STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
c.总结
创建 ORC 表:
使用 STORED AS ORC 来创建表。
可以通过 INSERT INTO 从其他表导入数据,也可以从临时表中导入。
-------------------------------------------------------------------------------------------------
创建 TXT 格式的表:
使用 ROW FORMAT DELIMITED 和 FIELDS TERMINATED BY 指定分隔符来创建表。
使用 LOAD DATA 命令将外部 TXT 文件的数据导入到表中。
d.创建表的各种语法
a.student_internal表
CREATE TABLE student_internal (
id INT,
name STRING,
age INT
);
-------------------------------------------------------------------------------------------------
load data inpath '/user.txt' into table student_internal; --导入HDFS数据
load data local inpath '/home/hadoop/user.txt' into table student_internal; --导入本地数据
-------------------------------------------------------------------------------------------------
默认分隔符:
默认字段分隔符:Hive 默认使用控制字符 ASCII 1 (^A 或 \u0001) 作为字段分隔符。
行分隔符:默认是换行符(\n)。
-------------------------------------------------------------------------------------------------
表名称 表类型 默认分隔符
internal_student 内部表 ASCII 1 (\u0001 或 ^A)
student_external 外部表 逗号(,)
mydb.dept 内部分区表 逗号(,)
temp_salary_hive 外部表 制表符(\t)
-------------------------------------------------------------------------------------------------
默认分隔符 \u0001 是 Hive 设计的初衷,适用于大数据处理场景,避免了常见字符的冲突。
如果未指定 ROW FORMAT DELIMITED,如 internal_student,则默认采用 Hive 内部的标准分隔符。
指定分隔符时,请确保与数据格式一致,否则可能导致数据解析错误
b.student_external表
CREATE EXTERNAL TABLE student_external (
id INT,
name STRING,
age INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/hive/external/student_external';
-------------------------------------------------------------------------------------------------
外部表:通过 CREATE EXTERNAL TABLE 创建,数据存储在指定的外部位置,不受 Hive 控制。
存储格式:STORED AS TEXTFILE,指定数据以文本文件格式存储。
数据位置:使用 LOCATION 指定 HDFS 中的数据存储位置 /user/hive/external/student_external。
数据生命周期:删除表时,数据不会被删除,只有表的元数据会被移除。
数据格式:行格式为分隔符格式,列通过逗号分隔。
c.mydb.dept 表
CREATE TABLE `mydb.dept`(
`dept_no` INT,
`addr` STRING,
`tel` STRING
)
PARTITIONED BY (date STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
-------------------------------------------------------------------------------------------------
内部表:通过 CREATE TABLE 创建,由 Hive 完全管理,数据默认存储在 Hive 的仓库目录中。
分区表:PARTITIONED BY (date STRING) 指定表按日期分区,分区有助于优化查询性能。
数据格式:行格式为分隔符格式,列通过逗号分隔。
数据生命周期:删除表时,数据会被一起删除,除非手动保留数据。
分区管理:分区表可以通过增加或删除分区来管理数据,适用于大规模数据集按时间、地区等维度分区的场景。
d.temp_salary_hive 表
CREATE EXTERNAL TABLE temp_salary_hive (
salary_id STRING,
id INT,
salary INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
-------------------------------------------------------------------------------------------------
外部表:也是一个外部表,数据的位置没有明确指定,这意味着数据的位置需要在加载时确定或者已经预先存在。
数据格式:行数据以制表符(\t)分隔,与前面两个表的逗号分隔不同。
数据生命周期:同样删除表时不会删除数据,Hive 只管理表的元数据。
数据存储:虽然没有指定 LOCATION,但作为外部表,它不会将数据存储在 Hive 的默认数据仓库中。
02.hive创建表到hbase
a.语法说明
a.语法
create table student_hive(id int,name string,age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age")
TBLPROPERTIES ("hbase.table.name" = "student_hbase");
b.说明1
在 Hive 中创建了一张逻辑表 student_hive,让用户可以通过 Hive SQL 操作与 HBase 表 student_hbase 中的数据进行交互。
Hive 表的字段与 HBase 表中的行键和列族进行了一一映射,使数据可以被透明地查询和操作。
c.说明2
create table student_hive(id int, name string, age int):
这行语句定义了一个名为 student_hive 的 Hive 表,其中包含三个字段:id(整数类型)、name(字符串类型)和 age(整数类型)。这些字段的定义与常规的 Hive 表没有区别。
-------------------------------------------------------------------------------------------------
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler':
这部分指定了表的存储方式为 HBase,通过使用 Hive 的 HBase 存储处理器(HBaseStorageHandler),让 Hive 表与 HBase 表关联起来。这样可以使 Hive 直接读写 HBase 中的数据。
-------------------------------------------------------------------------------------------------
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age"):
这行配置了 Hive 与 HBase 之间的字段映射关系,解释如下:
:key:表示 id 字段映射到 HBase 的行键(Row Key),它是 HBase 中用于标识每行数据的唯一标识符。
info:name:表示 name 字段映射到 HBase 中的列族 info 下的列 name。
info:age:表示 age 字段映射到 HBase 中的列族 info 下的列 age。
-------------------------------------------------------------------------------------------------
TBLPROPERTIES ("hbase.table.name" = "student_hbase"):
这行指定了在 HBase 中实际存储数据的表名为 student_hbase。如果这个表在 HBase 中不存在,Hive 不会自动创建,需要事先在 HBase 中创建好这个表。
b.内部表
a.创建
a.操作1
# 创建hbase表
create 'student','info'
b.操作2
# 创建hive的表映射(内部表)
create table student_hive(id int,name string,age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age")
TBLPROPERTIES ("hbase.table.name" = "student");
c.只有1种创建方式:
正确 第1种:第1步(操作2) hive创建逻辑表后,自动会在hbase创建物理表
错误 第2种:第1步(操作1)、第2步(操作2) 报错:Table student already exists within HBase
b.删除
a.操作1
drop table student_hive;
b.操作2
disable 'student'
drop 'student'
c.存在2种删除方式
正确 第1种:第1步(操作1) hive删除逻辑表后,自动会在hbase删除物理表
user/hive/warehouse/student_hive会删除
正确 第2种:第1步(操作2)、第2步(操作1) hbase删除物理表后,然后在hive删除逻辑表映射过去
user/hive/warehouse/student_hive会删除
c.外部表
a.创建
a.操作1
# 创建hbase表
create 'salary','info'
b.操作2
# 创建hive的表映射(外部表)
create external table salary_hive(salary_id string,id int,salary int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:id,info:salary")
TBLPROPERTIES ("hbase.table.name" = "salary");
c.只有1种创建方式:
错误 第1种:第1步(操作2) 报错:HBase table salary doesn't exist while the table is declared as an external table
正确 第2种:第1步(操作1)、第2步(操作2) hbase创建物理表后,然后在hive创建逻辑表映射过去
b.删除
a.操作1
drop table salary_hive;
b.操作2
disable 'salary'
drop 'salary'
c.存在2种删除方式
正确 第1种:第1步(操作1) hive删除逻辑表后,不会在hbase删除物理表
user/hive/warehouse/salary_hive不会删除,hadoop fs -rm -r /user/hive/warehouse/salary_hive
正确 第2种:第1步(操作2)、第2步(操作1) hbase删除物理表后,然后在hive删除逻辑表映射过去
user/hive/warehouse/salary_hive不会删除,hadoop fs -rm -r /user/hive/warehouse/salary_hive
03.hive创建表到hdfs
a.内部表
a.语法
a.第1步:只创建内部表,hbase不创建
CREATE TABLE managed_table (dummy STRING);
b.第2步:挂载数据,可挂载多个文件
LOAD DATA INPATH '/workspace/hive/managed_table.txt' INTO table managed_table;
---------------------------------------------------------------------------------------------
执行后,/workspace/hive/managed_table.txt 移动到 /user/hive/warehouse/managed_table/managed_table.txt,源文件删除
c.第3步:查看数据
SELECT * FROM managed_table;
b.说明
DROP TABLE managed_table;
经测试,hive内部表结构全部删除,user/hive/warehouse/managed_table删除
b.外部表
a.语法
a.第1步:只创建外部表,hbase不创建
CREATE EXTERNAL TABLE external_table (dummy STRING);
b.第2步:挂载数据,可挂载多个文件
LOAD DATA INPATH '/workspace/hive/external_table.txt' INTO TABLE external_table;
LOAD DATA INPATH '/workspace/hive/external_table2.txt' INTO TABLE external_table;
---------------------------------------------------------------------------------------------
执行后,/workspace/hive/external_table.txt 移动到 /user/hive/warehouse/external_table/external_table.txt,源文件删除
c.第3步:查看数据
SELECT * FROM external_table;
b.说明
DROP TABLE external_table;
经测试,hive内部表结构全部删除,user/hive/warehouse/external_table不会删除
-------------------------------------------------------------------------------------------------
hadoop fs -ls /user/hive/warehouse
hadoop fs -rm -r /user/hive/warehouse/external_table
手动删除
04.事务
a.内部表和外部表默认情况下都支持INSERT操作
a.内部表(Managed Table)
默认支持 INSERT 操作:内部表是由 Hive 完全管理的表,数据存储在 Hive 的默认仓库目录(通常是 HDFS 上的 /user/hive/warehouse)。
支持的插入方式:INSERT INTO:向表中追加数据。
INSERT OVERWRITE:覆盖表中的现有数据。
INSERT INTO ... SELECT ...:从其他表中插入数据。
数据生命周期:删除内部表时,表数据也会被一起删除。
b.外部表(External Table)
默认支持 INSERT 操作:外部表的数据存储在 Hive 仓库目录之外,Hive 只管理表的元数据,而不控制数据的存储位置。
支持的插入方式:INSERT INTO:向表中追加数据。
INSERT OVERWRITE:覆盖表中的现有数据。
数据生命周期:删除外部表时,数据不会被删除,只有表的元数据被移除。
c.限制和注意事项
插入语句对外部表和内部表都适用,但一些特定的存储格式(如 CSV、Parquet、ORC)和存储引擎(如 HBase、Kafka)可能对 INSERT 语句有不同的支持程度。
分区表的插入:如果表是分区表,则插入时需要指定分区值或者使用动态分区插入。
文件格式限制:某些表文件格式和存储配置可能会限制 INSERT 的使用,如需要事务支持的表必须使用 ORC 等支持 ACID 的格式。
b.hive开启事务支持行级insert、update、delete、insert overwrite、mergee into
a.启用ACID特性
/usr/local/hive/hive-3.1.3/conf/hive-site.xml
-------------------------------------------------------------------------------------------------
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>1</value>
</property>
b.添加Hive元数据(使用mysql存储)
use hive_metadata;
INSERT INTO NEXT_LOCK_ID VALUES(1);
INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1);
INSERT INTO NEXT_TXN_ID VALUES(1);
COMMIT;
quit;
-------------------------------------------------------------------------------------------------
说明:初始时这三个表没有数据,如果不添加数据,会报以下错误:
org.apache.hadoop.hive.ql.lockmgr.DbTxnManager FAILED: Error in acquiring locks: Error communicating with the metastore
c.创建事务表:创建支持事务的表时,需要使用TBLPROPERTIES设置为事务表
CREATE TABLE student (
id INT,
name STRING,
age INT
)
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
d.支持ORC格式
在 Hive 中,要使表支持 ACID 特性(事务管理),并支持增删改操作,表必须使用支持事务的文件格式。
目前,Hive 主要支持以下文件格式来实现 ACID 特性:
ORC(Optimized Row Columnar)
Parquet
Avro
-------------------------------------------------------------------------------------------------
然而,TEXTFILE 格式不支持 ACID 特性,因此不能直接用于事务性表。
这是因为 TEXTFILE 格式缺乏必要的索引和压缩机制,无法有效支持事务所需的功能如行级别的插入、更新和删除。
-------------------------------------------------------------------------------------------------
必须使用支持 ACID 的文件格式:
ORC 是最常用的格式,特别适合大数据处理场景,支持压缩、快速读写和事务操作。
TEXTFILE 格式是纯文本格式,不具备行级更新和删除的能力,因此不支持事务。
-------------------------------------------------------------------------------------------------
表的创建要求:
要启用 ACID 特性,必须在表创建时使用 STORED AS ORC 或其他 ACID 支持的格式。
同时设置 TBLPROPERTIES ('transactional'='true'),以确保表启用了事务支持
-------------------------------------------------------------------------------------------------
如果想启用事务,需要创建表的正确示例如下:
CREATE TABLE internal_student (
id INT,
name STRING,
age INT
)
STORED AS ORC -- 必须使用支持ACID的格式,如ORC
TBLPROPERTIES ('transactional'='true');
c.hive操作hbase,不能开启事务
a.HBase不支持事务
HBase 是一个分布式列存储数据库,设计时不支持事务性操作如行级更新、删除和事务一致性。
HBase 主要用于大规模数据的存储和读取,事务支持不是其核心功能。
b.Hive ACID 特性要求
Hive 的 ACID 特性要求表使用支持事务的文件格式(如 ORC 或 Parquet)。
ACID 表支持行级插入、更新和删除操作,并要求底层存储格式能处理这些操作。
c.总结
HBase 和 Hive ACID 不兼容:HBase 不支持 Hive 的 ACID 特性,因此不能通过 Hive 对 HBase 表进行事务管理。
数据操作:在 Hive 中可以对 HBase 表进行简单的插入和查询操作,但不支持行级的事务操作(如更新和删除)。这些操作需要通过 HBase 的 API 进行。
c.示例:基本表 -> 迁移到 带有事务的表
a.原有表
CREATE TABLE internal_student_temp (
id INT,
name STRING,
age INT
);
b.创建一个带事务管理的新表
-- 创建一个临时表,支持事务管理
CREATE TABLE internal_student_temp (
id INT,
name STRING,
age INT
)
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
c.将数据从原表插入到新的事务表中
INSERT INTO TABLE internal_student_temp SELECT * FROM internal_student;
d.删除原表并重命名事务表
-- 删除原来的非事务表
DROP TABLE internal_student;
-- 重命名事务表为原来的表名
ALTER TABLE internal_student_temp RENAME TO internal_student;
d.示例:内部表
a.语法
a.第1步(开启事务):只创建内部表,hbase不创建
CREATE TABLE internal_student (
id INT,
name STRING,
age INT
)
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
b.第2步:挂载数据,由于开启事务,必须是ORC格式,不支持TEXTFILE,故使用临时表
a.报错
SemanticException Unable to load data to destination table. Error:
The file that you are trying to load does not match the file format
of the destination table.
b.问题分析
表 internal_student 使用的是 ORC 格式,并且启用了事务性(transactional='true'),
这意味着表的所有数据操作都必须符合 ORC 格式和事务管理的要求。
-----------------------------------------------------------------------------------------
您尝试使用 LOAD DATA INPATH 命令加载一个文本文件(CSV 格式),
但这个文件并不符合表的 ORC 格式要求,因此导致格式不匹配错误 (SemanticException)。
c.方法一:可以通过临时表将数据加载为 ORC 格式,然后再将数据插入到目标事务表中
a.创建一个临时表,使用 TEXTFILE 格式
CREATE TABLE temp_student (
id INT,
name STRING,
age INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
b.将数据加载到临时表中
1,zhangsan,20
2,lisi,21
3,wangwu,22
------------------------------------------------------------------------------------
LOAD DATA INPATH '/workspace/hive/internal_student.txt' INTO TABLE temp_student;
c.将临时表的数据插入到事务表 internal_student 中,Hive 会在插入过程中将数据转换为 ORC 格式。
INSERT INTO TABLE internal_student SELECT * FROM temp_student;
d.完成后,可以删除临时表
DROP TABLE temp_student;
d.如果数据量较小,可以直接使用 INSERT 语句来插入数据:
INSERT INTO internal_student VALUES (1, 'zhangsan', 20);
INSERT INTO internal_student VALUES (2, 'lisi', 21);
INSERT INTO internal_student VALUES (3, 'wangwu', 22);.
c.第3步:查看数据
SELECT * FROM internal_student;
b.增删改
a.INSERT(支持)
INSERT INTO TABLE internal_student VALUES (5, 'zhangsan', 20);
INSERT INTO internal_student (id, name, age) VALUES (6, 'zhangsan', 20);
---------------------------------------------------------------------------------------------
新增数据后,/user/hive/warehouse/internal_student文件有如下变化
从
空文件夹
新增到
delta_0000001_0000001_0000文件夹
b.UPDATE(支持)
UPDATE internal_student SET age = 25 WHERE id = 1;
c.DELETE(支持)
DELETE FROM internal_student WHERE id = 1;
TRUNCATE TABLE internal_student;
c.ALTER
a.添加新列
ALTER TABLE internal_student ADD COLUMNS (new_name STRING);
b.修改列数据类型或名称
ALTER TABLE internal_student CHANGE COLUMN name new_name2 STRING;
c.删除列
# ALTER TABLE internal_student REPLACE COLUMNS (id INT, name STRING);
# 必须大于当前的2个字段,才能操作
# 报错Replacing columns cannot drop columns for table default.internal_student.
d.关闭事务
# ALTER TABLE internal_student SET TBLPROPERTIES ('transactional'='false');
# 报错Cannot change 'transactional' without 'transactional_properties'
e.示例:外部表
a.语法
a.第1步(开启事务):只创建外部表,hbase不创建
CREATE EXTERNAL TABLE internal_student (
id INT,
name STRING,
age INT
)
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
---------------------------------------------------------------------------------------------
default.internal_student 不能声明为事务性,因为它是一个外部表
default.internal_student cannot be declared transactional because it's an external table)
a.第1步(不开启事务):只创建外部表,hbase不创建
CREATE EXTERNAL TABLE external_student (
id INT,
name STRING,
age INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
b.第2步:挂载数据,可挂载多个文件
1,zhangsan,20
2,lisi,21
3,wangwu,22
---------------------------------------------------------------------------------------------
LOAD DATA INPATH '/workspace/hive/external_student1.txt' INTO TABLE external_student;
LOAD DATA INPATH '/workspace/hive/external_student2.txt' INTO TABLE external_student;
---------------------------------------------------------------------------------------------
执行后,/workspace/hive/external_student.txt 移动到 /user/hive/warehouse/external_student/external_student.txt,源文件删除
d.第3步:查看数据
SELECT * FROM external_student;
b.增删改
a.INSERT(支持)
INSERT INTO TABLE external_student VALUES (10, 'zhangsan', 20);
INSERT INTO external_student (id, name, age) VALUES (10, 'zhangsan', 20);
---------------------------------------------------------------------------------------------
新增数据后,/user/hive/warehouse/external_student文件有如下变化
从
external_student1.txt
external_student2.txt
新增到
000000_0 -> 存10,zhangsan,20
external_student1.txt
external_student2.txt
b.UPDATE(外部表不支持事务)
UPDATE external_student SET age = 25 WHERE id = 1;
c.DELETE(外部表不支持事务)
DELETE FROM external_student WHERE id = 1;
TRUNCATE TABLE external_student;
c.ALTER
a.添加新列
ALTER TABLE external_student ADD COLUMNS (new_name STRING);
b.修改列数据类型或名称
ALTER TABLE external_student CHANGE COLUMN name new_name2 STRING;
c.删除列
ALTER TABLE external_student REPLACE COLUMNS (id INT, name STRING);
# 必须大于当前的2个字段,才能操作
# 报错Replacing columns cannot drop columns for table default.external_student.
d.关闭事务
# ALTER TABLE external_student SET TBLPROPERTIES ('transactional'='false');
# 报错Cannot change 'transactional' without 'transactional_properties'
05.将数据写入文件系统
a.将数据写入一个文件
a.使用 INSERT OVERWRITE DIRECTORY
-- 将查询结果写入到 HDFS 目录
INSERT OVERWRITE DIRECTORY '/user/hive/warehouse/sales_export'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT * FROM sales;
b.将结果复制到本地文件系统
# 从 HDFS 复制结果到本地文件系统
hadoop fs -get /user/hive/warehouse/sales_export /local/path/sales_export
b.将数据写入多个文件
a.使用 INSERT OVERWRITE DIRECTORY 导出到多个文件
-- 将查询结果写入到 HDFS 目录中的多个文件
INSERT OVERWRITE DIRECTORY '/user/hive/warehouse/sales_export_multiple'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT * FROM sales;
b.列出目录中的所有文件
hadoop fs -ls /user/hive/warehouse/sales_export_multiple
c.使用 hive -e 执行命令导出
# 执行查询并将结果输出到文件
hive -e 'SELECT * FROM sales;' > /local/path/sales_result.txt
d.使用 hive -f 执行 SQL 文件导出
a.创建 export_sales.sql 文件,内容如下:
SELECT * FROM sales;
b.使用 hive -f 执行 SQL 文件并导出
hive -f export_sales.sql > /local/path/sales_result_from_sql.txt
06.创建分区表
a.介绍
在 Hive 中,分区表用于将数据分成多个分区,以便在查询时可以更高效地扫描和处理数据。
事务表则允许进行 ACID 事务操作,如 INSERT, UPDATE, DELETE 等。在 Hive 中结合分区和事务表,
可以管理大规模的数据,并对数据进行高效的操作。
-----------------------------------------------------------------------------------------------------
创建分区表:使用 PARTITIONED BY 语法定义分区列,设置 TBLPROPERTIES 为 'transactional'='true'。
插入数据:在插入数据时指定分区列的值。
查询数据:可以根据分区列进行查询,以提高效率。
更新数据:通常需要使用 INSERT OVERWRITE 或先删除再插入。
删除数据:可以选择删除整个分区或特定记录。
b.创建带事务和分区的表
创建一个支持事务的分区表。在这个示例中,我们将创建一个 sales 表,其中数据按 year 和 month 进行分区。
-----------------------------------------------------------------------------------------------------
-- 创建一个分区事务表
CREATE TABLE sales (
order_id INT,
customer_name STRING,
amount DECIMAL(10,2)
)
PARTITIONED BY (year INT, month INT) -- 分区列
CLUSTERED BY (order_id) INTO 4 BUCKETS -- 可选的分桶
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
c.插入数据
插入数据到分区表中时,需要指定分区的值。
-----------------------------------------------------------------------------------------------------
-- 插入数据到指定分区
INSERT INTO sales PARTITION (year=2023, month=9) VALUES
(1, 'Alice', 100.00),
(2, 'Bob', 150.00);
INSERT INTO sales PARTITION (year=2023, month=10) VALUES
(3, 'Charlie', 200.00),
(4, 'David', 250.00);
d.查询数据
查询分区表时,可以选择特定的分区来提高查询效率。
-----------------------------------------------------------------------------------------------------
-- 查询特定分区的数据
SELECT * FROM sales WHERE year = 2023 AND month = 9;
-----------------------------------------------------------------------------------------------------
-- 查询所有数据
SELECT * FROM sales;
-----------------------------------------------------------------------------------------------------
-- 聚合查询(例如,计算每月的总销售额)
SELECT year, month, SUM(amount) AS total_sales
FROM sales
GROUP BY year, month;
e.更新数据
更新分区表的数据。请注意,更新操作在 Hive 中通常较为复杂,并且可能涉及到重新写入数据。
-----------------------------------------------------------------------------------------------------
-- 更新特定分区的数据(实际操作可能需要先查询,再删除旧数据,再插入新数据)
-- 由于 Hive 更新操作的复杂性,通常推荐使用 INSERT OVERWRITE 进行更新
-- 例如,使用 INSERT OVERWRITE 更新 2023 年 9 月的所有记录
INSERT OVERWRITE TABLE sales PARTITION (year=2023, month=9)
SELECT order_id, customer_name, amount
FROM sales
WHERE year = 2023 AND month = 9;
-----------------------------------------------------------------------------------------------------
-- 如果需要更新特定记录,可以先删除再插入
-- 假设我们要更新 order_id = 1 的记录的 amount
-- 先删除旧记录
DELETE FROM sales WHERE year = 2023 AND month = 9 AND order_id = 1;
-- 再插入新记录
INSERT INTO sales PARTITION (year=2023, month=9) VALUES (1, 'Alice', 110.00);
f.删除数据
删除分区表中的数据时,可以选择删除整个分区,或删除特定的记录。
-----------------------------------------------------------------------------------------------------
-- 删除特定分区的数据
ALTER TABLE sales DROP PARTITION (year=2023, month=10);
-----------------------------------------------------------------------------------------------------
-- 删除特定记录(可以先查询,删除,再插入新数据)
-- 例如,删除 2023 年 9 月分区中 order_id = 2 的记录
DELETE FROM sales WHERE year = 2023 AND month = 9 AND order_id = 2;
g.增加表分区
a.创建示例表(包含分区)
-- 创建一个分区表
CREATE TABLE sales (
order_id INT,
customer_name STRING,
amount DECIMAL(10,2)
)
PARTITIONED BY (year INT, month INT) -- 分区列
STORED AS ORC;
b.插入数据到新分区
-- 向指定的分区插入数据
INSERT INTO sales PARTITION (year=2023, month=11) VALUES
(101, 'Alice', 300.00),
(102, 'Bob', 450.00);
h.删除分区
ALTER TABLE sales DROP PARTITION (year=2023, month=11);
i.修改表或分区的路径
a.修改表的存储路径
ALTER TABLE sales SET LOCATION '/new/path/to/sales_table';
b.修改分区的存储路径
-- 假设旧路径已经在 HDFS 上迁移,使用新的路径进行操作
-- 重新创建分区并指定新路径
ALTER TABLE sales ADD PARTITION (year=2023, month=12) LOCATION '/new/path/to/sales/2023/12';
-- 删除旧的分区
ALTER TABLE sales DROP PARTITION (year=2023, month=11);
j.分区重命名
a.创建新分区
ALTER TABLE sales ADD PARTITION (year=2023, month=10) LOCATION '/path/to/sales/2023/10';
b.移动数据
手动将数据从旧的路径迁移到新的路径。可以使用 HDFS 命令行工具(如 hadoop fs -mv)完成此操作,或者使用 Hive SQL 查询数据并插入到新分区。
c.删除旧分区
ALTER TABLE sales DROP PARTITION (year=2023, month=11);
k.动态分区
a.默认静态分区
-- 学生表
CREATE TABLE student(
id int,
name string,
age int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
-- 学生表数据
1 name1 12
2 name2 12
3 name3 13
4 name4 13
5 name5 14
6 name6 14
7 name7 15
8 name8 15
-- 学生分区表
CREATE TABLE student_dyna(
id int,
name string
)
partitioned by (age int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
-- 需求:把学生表的数据导入的学生分区表中
-- 静态分区导入 适合增量数据的处理
-- 添加12岁的分区,导入12岁数据
alter table student_dyna add IF NOT EXISTS partition(age=12) location '12';
insert overwrite table student_dyna partition(age=12)
select id,name from student where age=12;
-- 添加13岁的分区,导入13岁数据
alter table student_dyna add IF NOT EXISTS partition(age=13) location '13';
insert overwrite table student_dyna partition(age=13)
select id,name from student where age=13;
-- 添加14岁的分区,导入14岁数据
alter table student_dyna add IF NOT EXISTS partition(age=14) location '14';
insert overwrite table student_dyna partition(age=14)
select id,name from student where age=14;
-- 添加15岁的分区,导入15岁数据
alter table student_dyna add IF NOT EXISTS partition(age=15) location '15';
insert overwrite table student_dyna partition(age=15)
select id,name from student where age=15;
b.开启动态分区
--设置参数动态分区
--开启动态分区
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
-------------------------------------------------------------------------------------------------
--表示每个节点生成动态分区的最大个数,默认是100
set hive.exec.max.dynamic.partitions.pernode=10000;
--表示一个DML操作可以创建的最大动态分区数,默认是1000
set hive.exec.max.dynamic.partitions=100000;
--表示一个DML操作可以创建的最大文件数,默认是100000
set hive.exec.max.created.files=150000
-------------------------------------------------------------------------------------------------
--将select数据,覆盖到表的动态分区
insert overwrite table table1 partition (ds, hr) select key, value, ds, hr FROM table2 WHERE ds is not null;
07.创建分桶表
a.介绍
在 Hive 中,分桶(Bucketing)是一种将表数据分成不同的桶(文件)的技术,以便于更高效地查询和管理数据。
结合事务支持,可以创建一个带有分桶的事务表,并进行增、删、改、查操作。
-----------------------------------------------------------------------------------------------------
创建分桶表:使用 CLUSTERED BY 语法定义分桶列,并设置 TBLPROPERTIES 为 'transactional'='true'。
插入数据:数据会自动根据分桶列分配到桶中。
查询数据:与普通表的查询类似,分桶影响数据存储和处理性能。
更新数据:使用 INSERT OVERWRITE 替换数据。
删除数据:使用 DELETE 或 TRUNCATE TABLE 清除数据。
b.创建带事务和分桶的表
创建一个支持事务的分桶表。分桶表通常使用 CLUSTERED BY 语法定义分桶列,并指定分桶数量。
-----------------------------------------------------------------------------------------------------
-- 创建一个分桶事务表
CREATE TABLE employee_data (
emp_id INT,
name STRING,
department STRING
)
CLUSTERED BY (emp_id) INTO 4 BUCKETS -- 按 emp_id 分桶,分成 4 个桶
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
c.插入数据
向分桶表中插入数据。插入数据时,数据会自动分配到指定的桶中。
-----------------------------------------------------------------------------------------------------
-- 插入数据到表中
INSERT INTO employee_data VALUES
(1, 'John Doe', 'Sales'),
(2, 'Jane Smith', 'Marketing'),
(3, 'Alice Johnson', 'Engineering'),
(4, 'Bob Brown', 'HR');
d.查询数据
查询数据时,可以进行普通的 SQL 查询操作。分桶主要影响数据存储和处理的方式,查询语句与其他表类似。
-----------------------------------------------------------------------------------------------------
-- 查询表中的所有数据
SELECT * FROM employee_data;
-----------------------------------------------------------------------------------------------------
-- 查询特定部门的数据
SELECT * FROM employee_data WHERE department = 'Sales';
-----------------------------------------------------------------------------------------------------
-- 查询并统计每个部门的员工数量
SELECT department, COUNT(*) AS employee_count
FROM employee_data
GROUP BY department;
e.更新数据
在 Hive 中,更新分桶表的数据通常需要使用 INSERT OVERWRITE 操作,因为 Hive 更新操作较复杂。
可以先进行查询,然后用 INSERT OVERWRITE 替换旧数据。
-----------------------------------------------------------------------------------------------------
-- 更新表中的数据(例如,更新 emp_id 为 1 的记录的部门)
INSERT OVERWRITE TABLE employee_data
SELECT
emp_id,
CASE
WHEN emp_id = 1 THEN 'Business Development'
ELSE department
END AS department,
name
FROM employee_data;
f.删除数据
删除数据时可以使用 DELETE 语句进行操作(在 Hive 的较新版本中支持),也可以通过删除整个表或分区来完成。
-----------------------------------------------------------------------------------------------------
-- 删除特定记录(假设我们要删除 emp_id 为 2 的记录)
DELETE FROM employee_data WHERE emp_id = 2;
-----------------------------------------------------------------------------------------------------
-- 删除整个表的数据(清空表)
TRUNCATE TABLE employee_data;
1.4 hbase1
01.habse
a.打开
hbase shell
list
-----------------------------------------------------------------------------------------------------
exit
!quit
-----------------------------------------------------------------------------------------------------
whoami
status
version
-----------------------------------------------------------------------------------------------------
基于列式存储的NoSQL数据库
基于Googlebigtable论文
本质:只有插入操作(修改、删除是通过“时间戳”来模拟操作)
基于HDFS的分布式存储
Hbase只有字符串类型(无其他数据类型,更容易会提高整体效率)
Hbase容易水平扩展(只需简单的通过增加硬件,就可以达到扩展目的)
b.权限
grant <user> <permissions> [<table> [<column family> [<column; qualifier>]] --grant授予权限
grant 'hadoopdoc', 'RWXCA'
-----------------------------------------------------------------------------------------------------
revoke <user> --revoke撤销权限
revoke 'hadoopdoc'
-----------------------------------------------------------------------------------------------------
user_permission 'tablename' --列出特定表的所有权限
user_permission 'emp'
c.命名空间
list_namespace --查看命名空间
list_namespace_tables --查看命名空间中的表有什么
list_namespace_tables '命名空间' --查看命名空间中的表有什么
-----------------------------------------------------------------------------------------------------
create_namespace 'bigdata' --创建命名空间
create 'bigdata:student','info' --在新的命名空间中创建表
-----------------------------------------------------------------------------------------------------
drop_namespace 'bigdata' --只能删除空的命名空间,如果不为空,需要先删除该命名空间下的所有表
d.创建表
create '命名空间:表名称', '列族名称1','列族名称2','列族名称N'
create 'hainiu:student','cf1','cf2','cf3'
-----------------------------------------------------------------------------------------------------
create 表名,列1,列2
create 'scores','grade','course'
e.查看所有表
list
f.查看表结构
desc 'scores'
describe 'scores'
-----------------------------------------------------------------------------------------------------
exists 'scores'
g.插入数据
put 表,行键,列键,值
put '<table name>','<row>','<colfamily:colname>','<value>'
-----------------------------------------------------------------------------------------------------
put 'scores', 'zs', 'grade:', "1"
put 'scores', 'zs', 'course:Chinese', "100"
put 'scores', 'zs', 'course:Math', "95"
put 'scores', 'ls', 'grade:', "1"
put 'scores', 'ls', 'course:Chinese', "80"
put 'scores', 'ls', 'course:Math', "75"
h.更新数据
put 表,行键,列键,值
put '<table name>','<row>','<colfamily:colname>','<value>'
-----------------------------------------------------------------------------------------------------
put 'emp','row1','personal:city','Delhi'
i.查询数据(扫描表)
scan 表名
scan 'scores'
scan 'scores', {COLUMN=>'course:Math', LIMIT=>1}
j.查询数据(某一行)
get '表名', 行键
get 'scores', 'zs' --查询行键为zs的所有数据
get 'scores', 'zs', {COLUMN=>'course:Math'} --查询行键为zs,列键为course:Math的数据
k.删除数据
delete '<table name>', '<row>', '<column name>', '<time stamp>'
delete 'scores', 'zs', 'course:Math' --delete 表名,行键,列键
deleteall 'scores', 'zs' --deleteall 表名,行键
l.启用表
enable 'scores' --启用表
is_enabled 'scores' --是否启用了表
m.删除表
disable 'scores' --删除表以前,必须将表disabled
drop 'scores' --drop 表名
-----------------------------------------------------------------------------------------------------
disable_all 'raj.*' --禁用表
drop_all 'raj.*' --删除表
-----------------------------------------------------------------------------------------------------
disable 'scores'
truncate 'scores'
n.计数
count '<table name>'
o.导入数据:
importtsv -Dimporttsv.columns=HBASE_ROW_KEY,f1,f2 ...
p.导出数据
snapshot 'table_name', 'snapshot_name'
02.java Admin API
a.HBaseAdmin
HBaseAdmin 是代表Admin的类。此类属于org.apache.hadoop.hbase.client软件包。
使用此类,您可以执行管理员的任务。您可以使用Connection.getAdmin() 方法获取Admin实例。
-----------------------------------------------------------------------------------------------------
方法 说明
void createTable(HTableDescriptor desc) 创建一个新表
void createTable(HTableDescriptor desc, byte[][] splitKeys) 用指定的分割键定义的初始空区域集创建一个新表
void deleteColumn(byte[] tableName, String columnName) 从表中删除列
void deleteColumn(String tableName, String columnName) 从表中删除列
void deleteTable(String tableName) 删除表格
b.Descriptor
此类包含有关HBase表的详细信息,例如:
所有列族的描述符,
如果该表是目录表,
如果表是只读的,
记忆库的最大大小,
当应该发生区域分裂时,
与之相关的协处理器,等等。
-----------------------------------------------------------------------------------------------------
构造函数
方法 说明
HTableDescriptor(TableName name) 构造一个表描述符,指定一个TableName对象。
-----------------------------------------------------------------------------------------------------
方法与说明
方法 说明
HTableDescriptor addFamily(HColumnDescriptor family) 将列族添加到给定的描述符
1.5 hbase2
00.总结
a.hive创建表到hbase
a.student_hive(hive)->student表(hbase)
内部表
b.salary_hive(hive)->salary表(hbase)
外部表,先建hbase,再hive外部表。删除hive逻辑表,hbase数据仍存在
c.利用临时表向salary表(外部表)插入数据
INSERT INTO TABLE salary_hive SELECT salary_id, id, salary FROM temp_salary_hive;
d.hive查询
# 现在实现关联查询,每个用户的平均工资是多少,以及人名
select a.name,avg(b.salary) as avg
from student_hive a join salary_hive b
on a.id = b.id
group by a.name
b.hive创建表到hdfs
a.内部表
CREATE TABLE managed_table (dummy STRING);
-------------------------------------------------------------------------------------------------
LOAD DATA INPATH '/workspace/hive/managed_table.txt' INTO table managed_table;
执行后,/workspace/hive/managed_table.txt 移动到 /user/hive/warehouse/managed_table/managed_table.txt,源文件删除
-------------------------------------------------------------------------------------------------
DROP TABLE managed_table;
经测试,hive内部表结构全部删除,user/hive/warehouse/managed_table删除
b.外部表
CREATE EXTERNAL TABLE external_table (dummy STRING);
-------------------------------------------------------------------------------------------------
LOAD DATA INPATH '/workspace/hive/external_table.txt' INTO TABLE external_table;
LOAD DATA INPATH '/workspace/hive/external_table2.txt' INTO TABLE external_table;
执行后,/workspace/hive/external_table.txt 移动到 /user/hive/warehouse/external_table/external_table.txt,源文件删除
-------------------------------------------------------------------------------------------------
DROP TABLE external_table;
经测试,hive内部表结构全部删除,user/hive/warehouse/external_table不会删除
-------------------------------------------------------------------------------------------------
hadoop fs -ls /user/hive/warehouse
hadoop fs -rm -r /user/hive/warehouse/external_table
手动删除
c.hbase的导入导出
a.导入
# 不能会用hive的load方式直接将数据导入到hbase中,但是可以通过中间表的形式导入进行
# 首先在本地创建teacher.txt 输入以下内容
1,yeniu,20
2,xinniu,30
3,qingniu,35
-----------------------------------------------------------------------------------------------------
# 在hive中创建临时表
create table teacher_tmp(id int,name string,age int) row format delimited fields terminated by ',';
-----------------------------------------------------------------------------------------------------
# 将数据加载到临时表中
load data inpath '/workspace/hive/teacher.txt' into table teacher_tmp;
-----------------------------------------------------------------------------------------------------
# 创建和hbase的外部映射表
create table teacher_hive(id int,name string,age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age")
TBLPROPERTIES ("hbase.table.name" = "teacher_hbase");
-----------------------------------------------------------------------------------------------------
#从临时表使用insert将数据导入到hbase中
insert into teacher_hive select * from teacher_tmp;
b.导出
不指定行列格式(ROW FORMAT DELIMITED FIELDS TERMINATED BY ','),默认行格式:文本格式 (TextFile)、默认字段分隔符:CTRL-A (\u0001)
-----------------------------------------------------------------------------------------------------
# 导出到hdfs
INSERT OVERWRITE DIRECTORY '/workspace/hive/teacher_hive' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' SELECT * FROM teacher_hive;
# 导出到linux
INSERT OVERWRITE LOCAL DIRECTORY '/workspace/hive/teacher_hive' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' SELECT * FROM teacher_hive;
-----------------------------------------------------------------------------------------------------
从本地上传到HDFS
hdfs fs -put /workspace/hive/teacher_hive/teacher_hive.txt /workspace/hive/teacher_hive/teacher_hive.txt
d.hbase的bulkload
a.准备
# 首先在本地创建文件 bulkload_test.txt 输入以下内容
1,zhangsan,20
2,lisi,30
3,wangwu,40
5 zhaosi,50
-------------------------------------------------------------------------------------------------
# 然后将数据上传到hdfs中
hdfs dfs -put bulkload_test.txt /workspace/hive
-----------------------------------------------------------------------------------------------------
# 在hbase中创建表
create 'bulkload_test','info'
b.导入
# 删除已经存在文件夹
# Output directory hdfs://master:9000/workspace/hive/bulkload_output already exists
hadoop fs -rm -r /workspace/hive/bulkload_output
-------------------------------------------------------------------------------------------------
# 退出hbase shell环境,去linux执行
# (hdfs文件系统)
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
-Dimporttsv.separator=',' \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age \
-Dimporttsv.skip.bad.lines=false \
-Dimporttsv.bulk.output=/workspace/hive/bulkload_output \
bulkload_test hdfs://master:9000/workspace/hive/bulkload_test.txt
e.MR操作Hbase
a.MR 读取 HBase 数据
设置 HBase Configuration: 初始化 HBase 配置,加载 HBase 的相关配置文件。
定义 HBase Input Format: 使用 TableInputFormat 设置 MR 作业的输入格式。需要指定输入表名和扫描器(Scan)等参数。
配置 Scan: Scan 对象用于定义从 HBase 读取数据的范围和过滤条件,比如要扫描的列族、列等。
设置 Job: 将 TableInputFormat 配置为作业的输入格式,Mapper 处理的输入是 HBase 表的行键和值。
实现 Mapper: 在 Mapper 中处理读取到的数据,如 ImmutableBytesWritable 和 Result,并提取需要的字段。
b.MR 写出数据到 HBase
设置 HBase Configuration: 同样需要初始化 HBase 配置并加载相关配置文件。
定义 HBase Output Format: 使用 TableOutputFormat 设置 MR 作业的输出格式,指定输出表的名称。
设置 Job: 将 TableOutputFormat 配置为作业的输出格式,并实现 Mapper 或 Reducer 输出到 HBase 表的逻辑。
实现 Reducer 或 Mapper: 在 Reducer 或 Mapper 中,将处理结果写出到 HBase 表。需要创建 Put 对象,并指定要写入的列族、列和值。
01.hive创建表到hbase
a.student_hive(hive)->student表(hbase)
# 创建hive的表映射(内部表),自动会在hbase创建物理表
create table student_hive(id int,name string,age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age")
TBLPROPERTIES ("hbase.table.name" = "student");
-----------------------------------------------------------------------------------------------------
# 插入数据(hive)
INSERT INTO TABLE student_hive VALUES(1,'zhangsan',20),(2,'lisi',21),(3,'wangwu',22);
-----------------------------------------------------------------------------------------------------
# 插入数据(hbase)
put 'student','4','info:name','troyekk'
put 'student','4','info:age','20'
put 'student','5','info:name','troyett'
put 'student','5','info:age','30'
-----------------------------------------------------------------------------------------------------
# 查看表(hive)
SELECT * FROM student_hive;
-----------------------------------------------------------------------------------------------------
# 查看表(hbase)
scan 'student'
b.salary_hive(hive)->salary表(hbase) --推荐,先建hbase,再hive外部表。删除hive逻辑表,hbase数据仍存在
# 创建hbase表
create 'salary','info'
-----------------------------------------------------------------------------------------------------
# 创建hive的表映射(外部表),需要先创建hbase物理表
create external table salary_hive(salary_id string,id int,salary int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:id,info:salary")
TBLPROPERTIES ("hbase.table.name" = "salary");
-----------------------------------------------------------------------------------------------------
# 插入数据(hive)
INSERT INTO TABLE salary_hive VALUES('s008','200','20');
-----------------------------------------------------------------------------------------------------
# 插入数据(hbase)
put 'salary','001','info:id','1'
put 'salary','002','info:id','1'
put 'salary','003','info:id','1'
put 'salary','004','info:id','2'
put 'salary','005','info:id','2'
put 'salary','006','info:id','2'
put 'salary','001','info:salary','1000'
put 'salary','002','info:salary','2000'
put 'salary','003','info:salary','3000'
put 'salary','004','info:salary','4000'
put 'salary','005','info:salary','5000'
put 'salary','006','info:salary','6000'
scan 'salary'
-----------------------------------------------------------------------------------------------------
# 查看表(hive)
SELECT * FROM salary_hive;
-----------------------------------------------------------------------------------------------------
# 查看表(hbase)
scan 'salary'
c.利用临时表向salary表(外部表)插入数据
# 准备数据文件
salary_hive.txt
s001 1 5000
s002 2 6000
s003 3 7000
s004 4 8000
s005 5 9000
-----------------------------------------------------------------------------------------------------
# 创建一个临时Hive表来加载数据(外部表)
CREATE EXTERNAL TABLE temp_salary_hive (
salary_id STRING,
id INT,
salary INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
-----------------------------------------------------------------------------------------------------
# 将数据加载到临时表
LOAD DATA INPATH '/workspace/hive/salary_hive.txt' INTO TABLE temp_salary_hive;
-----------------------------------------------------------------------------------------------------
# 将临时表中的数据插入到HBase映射的表中
INSERT INTO TABLE salary_hive SELECT salary_id, id, salary FROM temp_salary_hive;
-----------------------------------------------------------------------------------------------------
# INSERT INTO TABLE temp_salary_hive VALUES('s008','200','20');
# 插入,默认支持
-----------------------------------------------------------------------------------------------------
# UPDATE temp_salary_hive SET salary = '199' WHERE id = '1';
# 更新,需开启ACID特性,Attempt to do update or delete using transaction manager that does not support these operations.
-----------------------------------------------------------------------------------------------------
# DELETE FROM temp_salary_hive WHERE id = '1';
# 删除,需开启ACID特性,Attempt to do update or delete using transaction manager that does not support these operations.
-----------------------------------------------------------------------------------------------------
# TRUNCATE TABLE TEMP_SALARY_HIVE;
# 截断,需开启ACID特性,Cannot truncate non-managed table temp_salary_hive.
-----------------------------------------------------------------------------------------------------
DROP TABLE temp_salary_hive;
# 丢弃,可执行,由于创建外部表,发现 /user/hive/warehouse/temp_salary_hive 文件没有删除,若需删除,请手动
d.hive查询
# 现在实现关联查询,每个用户的平均工资是多少,以及人名
select a.name,avg(b.salary) as avg
from student_hive a join salary_hive b
on a.id = b.id
group by a.name
e.删除表,hive表结构+hbase结构全部删除
# hive,相当于hbase中的disable+drop
drop table student_hive; 内部表,user/hive/warehouse/student_hive会删除
drop table salary_hive; 外部表,user/hive/warehouse/salary_hive不会删除,hive删除逻辑表后,不会在hbase删除物理表
-----------------------------------------------------------------------------------------------------
# hbase
disable 'student' 内部表,drop table student_hive; 执行后,hbase中的student自动删除
drop 'student' 内部表,drop table student_hive; 执行后,hbase中的student自动删除
disable 'salary' 外部表,drop table salary_hive; 执行后,hive删除逻辑表后,不会在hbase删除物理表
drop 'salary' 外部表,drop table salary_hive; 执行后,hive删除逻辑表后,不会在hbase删除物理表
-----------------------------------------------------------------------------------------------------
# hive
show tables;
# hbase
list
02.hive创建表到hdfs
a.内部表
a.语法
a.第1步:只创建内部表,hbase不创建
CREATE TABLE managed_table (dummy STRING);
b.第2步:挂载数据,可挂载多个文件
LOAD DATA INPATH '/workspace/hive/managed_table.txt' INTO table managed_table;
---------------------------------------------------------------------------------------------
执行后,/workspace/hive/managed_table.txt 移动到 /user/hive/warehouse/managed_table/managed_table.txt,源文件删除
c.第3步:查看数据
SELECT * FROM managed_table;
b.说明
DROP TABLE managed_table;
经测试,hive内部表结构全部删除,user/hive/warehouse/managed_table删除
b.外部表
a.语法
a.第1步:只创建外部表,hbase不创建
CREATE EXTERNAL TABLE external_table (dummy STRING);
b.第2步:挂载数据,可挂载多个文件
LOAD DATA INPATH '/workspace/hive/external_table.txt' INTO TABLE external_table;
LOAD DATA INPATH '/workspace/hive/external_table2.txt' INTO TABLE external_table;
---------------------------------------------------------------------------------------------
执行后,/workspace/hive/external_table.txt 移动到 /user/hive/warehouse/external_table/external_table.txt,源文件删除
c.第3步:查看数据
SELECT * FROM external_table;
b.说明
DROP TABLE external_table;
经测试,hive内部表结构全部删除,user/hive/warehouse/external_table不会删除
-------------------------------------------------------------------------------------------------
hadoop fs -ls /user/hive/warehouse
hadoop fs -rm -r /user/hive/warehouse/external_table
手动删除
03.hbase的导入导出
a.导入
# 不能会用hive的load方式直接将数据导入到hbase中,但是可以通过中间表的形式导入进行
# 首先在本地创建teacher.txt 输入以下内容
1,yeniu,20
2,xinniu,30
3,qingniu,35
-----------------------------------------------------------------------------------------------------
# 在hive中创建临时表
create table teacher_tmp(id int,name string,age int) row format delimited fields terminated by ',';
-----------------------------------------------------------------------------------------------------
# 将数据加载到临时表中
load data inpath '/workspace/hive/teacher.txt' into table teacher_tmp;
-----------------------------------------------------------------------------------------------------
# 创建和hbase的外部映射表
create table teacher_hive(id int,name string,age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age")
TBLPROPERTIES ("hbase.table.name" = "teacher_hbase");
-----------------------------------------------------------------------------------------------------
#从临时表使用insert将数据导入到hbase中
insert into teacher_hive select * from teacher_tmp;
b.导出
不指定行列格式(ROW FORMAT DELIMITED FIELDS TERMINATED BY ','),默认行格式:文本格式 (TextFile)、默认字段分隔符:CTRL-A (\u0001)
-----------------------------------------------------------------------------------------------------
# 导出到hdfs
INSERT OVERWRITE DIRECTORY '/workspace/hive/teacher_hive' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' SELECT * FROM teacher_hive;
# 导出到linux
INSERT OVERWRITE LOCAL DIRECTORY '/workspace/hive/teacher_hive' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' SELECT * FROM teacher_hive;
-----------------------------------------------------------------------------------------------------
从本地上传到HDFS
hdfs fs -put /workspace/hive/teacher_hive/teacher_hive.txt /workspace/hive/teacher_hive/teacher_hive.txt
04.hbase的bulkload
a.介绍
在大数据的场景计算中,有时候我们会遇见将大量数据一次性导入到hbase的情况,但是这个时候hbase是不能够容纳的,
因为插入的数据首先会进入到memstore中如果大量插入数据会造成memstore的内存压力急剧增大,这个时候机器的其他
进程是没有办法执行的,并且还会出现非常严重的问题,比如hbase在大量插入数据的时候首先这个region会急剧增加,
后续region会按照拆分策略进行region拆分,当前region下线,插入程序会直接卡死造成hbase宕机等严重问题,
为了解决这个问题,hbase给用户提供了一种新的插入数据的方式bulkload方式,这个方式中会跳过hbase本身的过程,
首先在使用hbase的提供的mapreduce程序按照插入数据的格式和hbase的表格式生成hfile文件,然后我们将hfile文件
一次性插入到hbase对应的hdfs的文件夹中,这种方式是最快捷并且对于hbase的压力最小的方式
b.准备
# 首先在本地创建文件 bulkload_test.txt 输入以下内容
1,zhangsan,20
2,lisi,30
3,wangwu,40
5 zhaosi,50
-----------------------------------------------------------------------------------------------------
# 然后将数据上传到hdfs中
hdfs dfs -put bulkload_test.txt /wrokspace/hive
-----------------------------------------------------------------------------------------------------
# 在hbase中创建表
create 'bulkload_test','info'
-----------------------------------------------------------------------------------------------------
# 测试(插入数据)
put 'bulkload_test', '1', 'info:name', 'zhangsan'
put 'bulkload_test', '1', 'info:age', '20'
put 'bulkload_test', '2', 'info:name', 'lisi'
put 'bulkload_test', '2', 'info:age', '30'
put 'bulkload_test', '3', 'info:name', 'wangwu'
put 'bulkload_test', '3', 'info:age', '40'
put 'bulkload_test', '5', 'info:name', 'zhaosi'
put 'bulkload_test', '5', 'info:age', '50'
-----------------------------------------------------------------------------------------------------
# 查看数据
scan 'bulkload_test'
c.导入
# 删除已经存在文件夹
# Output directory hdfs://master:9000/workspace/hive/bulkload_output already exists
hadoop fs -rm -r /workspace/hive/bulkload_output
-----------------------------------------------------------------------------------------------------
# 退出hbase shell环境,去linux执行
# (hdfs文件系统)
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
-Dimporttsv.separator=',' \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age \
-Dimporttsv.skip.bad.lines=false \
-Dimporttsv.bulk.output=/workspace/hive/bulkload_output \
bulkload_test hdfs://master:9000/workspace/hive/bulkload_test.txt
# (linux文件系统)
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
-Dimporttsv.separator=',' \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age \
-Dimporttsv.skip.bad.lines=false \
-Dimporttsv.bulk.output=/workspace/hive/bulkload_output \
bulkload_test file:///workspace/hive/bulkload_test.txt
-----------------------------------------------------------------------------------------------------
# 调整 MapReduce 任务的内存和 Java 虚拟机(JVM)设置
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
-Dimporttsv.separator=',' \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age \
-Dimporttsv.skip.bad.lines=false \
-Dimporttsv.bulk.output=/workspace/hive/bulkload_output \
-Dmapreduce.map.memory.mb=2048 \
-Dmapreduce.reduce.memory.mb=2048 \
-Dmapreduce.map.java.opts=-Xmx1536m \
-Dmapreduce.reduce.java.opts=-Xmx1536m \
bulkload_test hdfs://master:9000/workspace/hive/bulkload_test.txt
05.MR操作Hbase
a.MR 读取 HBase 数据
a.流程
设置 HBase Configuration: 初始化 HBase 配置,加载 HBase 的相关配置文件。
定义 HBase Input Format: 使用 TableInputFormat 设置 MR 作业的输入格式。需要指定输入表名和扫描器(Scan)等参数。
配置 Scan: Scan 对象用于定义从 HBase 读取数据的范围和过滤条件,比如要扫描的列族、列等。
设置 Job: 将 TableInputFormat 配置为作业的输入格式,Mapper 处理的输入是 HBase 表的行键和值。
实现 Mapper: 在 Mapper 中处理读取到的数据,如 ImmutableBytesWritable 和 Result,并提取需要的字段。
b.代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class HBaseReadMR {
public static class HBaseReadMapper extends Mapper<ImmutableBytesWritable, Result, Text, IntWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
// 从 Result 对象中获取数据,例如读取某个列的数据
String rowKey = new String(key.get());
String columnValue = new String(value.getValue("column_family".getBytes(), "column".getBytes()));
context.write(new Text(rowKey), new IntWritable(Integer.parseInt(columnValue)));
}
}
public static void main(String[] args) throws Exception {
// 创建 HBase 配置
Configuration config = HBaseConfiguration.create();
config.set(TableInputFormat.INPUT_TABLE, "your_table_name");
// 创建 MapReduce Job
Job job = Job.getInstance(config, "HBase Read Job");
job.setJarByClass(HBaseReadMR.class);
// 设置 Mapper
job.setMapperClass(HBaseReadMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 配置 HBase 表输入格式
Scan scan = new Scan();
scan.addColumn("column_family".getBytes(), "column".getBytes());
TableMapReduceUtil.initTableMapperJob("your_table_name", scan, HBaseReadMapper.class, Text.class, IntWritable.class, job);
// 提交作业
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
b.MR 写出数据到 HBase
a.流程
设置 HBase Configuration: 同样需要初始化 HBase 配置并加载相关配置文件。
定义 HBase Output Format: 使用 TableOutputFormat 设置 MR 作业的输出格式,指定输出表的名称。
设置 Job: 将 TableOutputFormat 配置为作业的输出格式,并实现 Mapper 或 Reducer 输出到 HBase 表的逻辑。
实现 Reducer 或 Mapper: 在 Reducer 或 Mapper 中,将处理结果写出到 HBase 表。需要创建 Put 对象,并指定要写入的列族、列和值。
b.代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class HBaseWriteMR {
public static class HBaseWriteMapper extends Mapper<Object, Text, Text, IntWritable> {
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 假设输入数据为 "rowKey,columnValue"
String[] parts = value.toString().split(",");
String rowKey = parts[0];
int columnValue = Integer.parseInt(parts[1]);
context.write(new Text(rowKey), new IntWritable(columnValue));
}
}
public static class HBaseWriteReducer extends Reducer<Text, IntWritable, ImmutableBytesWritable, Put> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
// 创建 Put 对象,指定行键
Put put = new Put(key.toString().getBytes());
// 添加列族、列和值
put.addColumn("column_family".getBytes(), "column".getBytes(), String.valueOf(sum).getBytes());
// 写出 Put 对象
context.write(null, put);
}
}
public static void main(String[] args) throws Exception {
// 创建 HBase 配置
Configuration config = HBaseConfiguration.create();
config.set(TableOutputFormat.OUTPUT_TABLE, "your_table_name");
// 创建 MapReduce Job
Job job = Job.getInstance(config, "HBase Write Job");
job.setJarByClass(HBaseWriteMR.class);
// 设置 Mapper 和 Reducer
job.setMapperClass(HBaseWriteMapper.class);
job.setReducerClass(HBaseWriteReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
// 配置 HBase 表输出格式
job.setOutputFormatClass(TableOutputFormat.class);
// 设置一个临时的输出路径,尽管 HBase 作业本身不会使用它
FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path("/tmp/hbase-output"));
// 提交作业
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
1.6 phoenix
00.总结
a.建议
对于我们公司来说,虽然HBase用得多,但用Phoenix的比较少。
b.踩坑
从自己测试来看,Phoenix确实还存在各种不稳定,如下面描述的几点问题:
最新版本对HBase、Hadoop等有严格版本控制,对于已经用上HBase的业务来说要升级HBase版本适配Phoenix代价太大
与HBase强相关,作为HBase中的一个组件启动,HBase元数据容易遭到破坏
官方提供的创建索引方法,容易导致插入失败,查询失败,程序崩溃等问题
c.phoneix映射hbase中hbase乱码解决方案处理及nifi中部分细节问题
由于hbase 是 nosql ,其底层字段存储只由字节数组实现,不区分字段类型,因此在将数据存在hbase中时,为了防止乱码,所有字段均应为varchar类型
考虑到varchar类型的插入,phoneix中所有插入语句中字段值必须为单引号覆盖
phoneix正常创建表,列族会被序列化,hbase中会乱码,需要加上 COLUMN_ENCODED_BYTES=0;
主键加上了3中操作依然会显示为INFO:_0 (INFO列族下),因此尽量使用单独的rowkey字段作为主键,保持rowkey 与 真实 主键值相同即可
中文部分在hbase中会被存储为/uxxxx的utf8格式,具体使用时改变编码即可,不影响什么
在phoneix中创建的表会被默认采取大写形式存储到phoenix中,使用双引号括起来即可解决大小写问题,
d.示例
"stu"最好使用双引号创建
-----------------------------------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS "stu" (
"id" VARCHAR PRIMARY KEY,
"name" VARCHAR,
"age" VARCHAR
) COLUMN_ENCODED_BYTES=0;
-----------------------------------------------------------------------------------------------------
UPSERT INTO "stu" ("id", "name", "age") VALUES ('1', '张三', '20');
UPSERT INTO "stu" ("id", "name", "age") VALUES ('2', '李四', '22');
-----------------------------------------------------------------------------------------------------
SELECT * FROM "stu";
scan 'stu'
-----------------------------------------------------------------------------------------------------
CREATE INDEX "idx_name" ON "stu" ("name");
CREATE INDEX "idx_age" ON "stu" ("age");
CREATE INDEX "idx_name_age" ON "stu" ("name", "age");
!indexes
-----------------------------------------------------------------------------------------------------
DROP TABLE "stu";
01.常用1
a.打开
sqlline.py master:2181
sqlline.py master,slave1,slave2:2181
b.表操作命令
a.创建表
CREATE TABLE testtable (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR,
age INTEGER
);
-------------------------------------------------------------------------------------------------
可以指定列族,例如:
CREATE TABLE testtable (
id INTEGER NOT NULL PRIMARY KEY,
info.name VARCHAR,
info.age INTEGER
);
b.查看表结构
!desc testtable
!describe testtable
c.查看所有表
!tables
d.修改表
ALTER TABLE testtable ADD address VARCHAR;
e.删除表
DROP TABLE testtable; --创建表是testtable
DROP TABLE 'testtable'; --创建表是’testtable‘
DROP TABLE "testtable"; --如果表名使用了双引号(")在创建时定义,则删除表时也需要使用双引号,而不是单引号(')
c.数据操作命令
a.插入数据
UPSERT INTO testtable (id, name, age) VALUES (1, 'John', 30);
b.批量插入数据
UPSERT INTO testtable (id, name, age) VALUES (2, 'Jane', 25);
UPSERT INTO testtable (id, name, age) VALUES (3, 'Mike', 40);
c.查询数据
SELECT * FROM testtable;
d.查询特定条件的数据
SELECT * FROM testtable WHERE age > 30;
e.更新数据
UPSERT INTO testtable (id, name, age) VALUES (1, 'John', 35);
f.删除数据
DELETE FROM testtable WHERE id = 1;
d.索引操作
a.创建索引
create local index myindex1 on "hainiu"."testindex" (COL1);
b.查看执行计划,发现不全表扫描
explain select * from "hainiu"."testindex" where COL1 = '200';
select * from "hainiu"."testindex" where COL1 = '200';
c.查看索引
!indexes
d.删除索引
drop index myindex on "hainiu"."testindex";
e.视图操作
a.创建视图
CREATE VIEW testview AS SELECT * FROM testtable WHERE age > 20;
b.查看视图
!tables
c.删除视图
DROP VIEW testview;
f.统计和分析
a.查看表的统计信息
UPDATE STATISTICS testtable;
b.可以使用HINTS来优化查询
SELECT /*+ INDEX(testtable idx_name) */ * FROM testtable WHERE name = 'John';
g.系统命令
a.查看当前连接的Phoenix版本和JDBC驱动版本
!dbinfo
b.检查当前的数据库连接状态
!scan
c.获取当前运行的所有属性和设置
!set
d.退出
!quit
h.统计查询
a.示例1
select count(1) from PHTEST1;
select count(distinct col1) from PHTEST1;
select sum(num) from (select col1, count(*) as num from PHTEST1 group by col1) t1;
b.示例2
select col1, count(*) as num from PHTEST1 group by col1 order by num desc;
i.查询导入
a.临时表
create table PHTEST2(
pk varchar not null primary key,
col1 varchar,
col2 varchar,
col3 varchar
);
b.插入数据
upsert into PHTEST2 values ('x0001','newvalue','newvalue','newvalue');
upsert into PHTEST2 values ('x0002','newvalue','newvalue','newvalue');
upsert into PHTEST2 values ('x0003','3','4','5');
upsert into PHTEST2 values ('x0004','4','5','6');
upsert into PHTEST2 values ('x0005','newvalue','newvalue','newvalue');
upsert into PHTEST2 values ('x0006','newvalue','newvalue','newvalue');
c.执行批量更新, 将PHTEST2表的数据覆盖到PHTEST1表
upsert into PHTEST1 select * from PHTEST2;
j.数据导入
a.phoenix数据导入只支持csv文件格式
# 在客户端外
# 执行SQL文件
# 对标hive的-f test.sql ${hiveconf:batch_date}
# 创建sql文件 select * from PHTEST1
sqlline.py master:2181 /root/sql
b.操作
# 创建表
create table user(id varchar primary key,name varchar,age varchar);
-------------------------------------------------------------------------------------------------
# 创建csv文件 /root/user.csv
# 输入文件内容
# 1,zhangsan,20
# 2,lisi,30
-------------------------------------------------------------------------------------------------
psql.py -t USER master:2181 /root/user.csv
# 注意:
# 1)phoenix数据导入只支持后缀为.csv的文件, csv文件名称不需要和表名称一致,文件名可以小写
# 2)指定的表必须是大写,小写就报错
02.常用2
a.在phoenix建表时指定列族
CREATE TABLE my_table (
row_id VARCHAR PRIMARY KEY,
cf1.col1 INTEGER,
cf1.col2 VARCHAR,
cf2.col3 DATE
);
b.在phoenix建表时指定压缩格式
CREATE TABLE my_table (
row_id VARCHAR PRIMARY KEY,
cf1.col1 INTEGER,
cf1.col2 VARCHAR
)
COMPRESSION='SNAPPY';
-----------------------------------------------------------------------------------------------------
针对特定列族指定压缩格式:
CREATE TABLE my_table (
row_id VARCHAR PRIMARY KEY,
cf1.col1 INTEGER,
cf1.col2 VARCHAR
)
COLUMN_ENCODED_BYTES=0, COMPRESSION='SNAPPY';
c.在phoenix建表时预分region
在创建表时,将表预分成多个 Region,分界点分别为 '1000', '2000', '3000'
-----------------------------------------------------------------------------------------------------
CREATE TABLE my_table (
row_id VARCHAR PRIMARY KEY,
cf1.col1 INTEGER
)
SPLIT ON ('1000', '2000', '3000');
d.phoenix建表时指定组合rowkey
create table "hainiu"."combinationkey_table1" (
prefix varchar not null,
id varchar not null,
col1 varchar,
col2 varchar
CONSTRAINT pk primary key ( prefix,id )
)
column_encoded_bytes=0,
compression='snappy'
split on ('1','2','|');
-----------------------------------------------------------------------------------------------------
-- 插入数据
upsert into "hainiu"."combinationkey_table1" (prefix,id,col1,col2) values ('1','001','user1','20');
upsert into "hainiu"."combinationkey_table1" (prefix,id,col1,col2) values ('1','002','user2','21');
-----------------------------------------------------------------------------------------------------
-- 查看表结构
!describe "hainiu"."combinationkey_table"
e.phoenix实现动态列
-- 创建表
create table "hainiu"."dynamic_table1"(
pk varchar not null primary key,
col1 varchar,
col2 varchar
)column_encoded_bytes=0;
-----------------------------------------------------------------------------------------------------
-- 插入数据
upsert into "hainiu"."dynamic_table1" (pk,col1,col2) values ('x0001','user1','20');
upsert into "hainiu"."dynamic_table1" (pk,col1,col2) values ('x0002','user1','21');
upsert into "hainiu"."dynamic_table1" (pk,col1,col2) values ('x0003','user1','22');
upsert into "hainiu"."dynamic_table1" (pk,col1,col2) values ('x0004','user1','23');
-----------------------------------------------------------------------------------------------------
-- 动态插入列
-- 动态插入 col3 和 col4 列
upsert into "hainiu"."dynamic_table1" (pk,col1,col2,col3 varchar,col4 varchar) values ('x0005','user1','23','beijing','hainiu');
-- 动态插入 col4 和 col5 列
upsert into "hainiu"."dynamic_table1" (pk,col1,col2,col4 varchar,col5 varchar) values ('x0006','user2','32','huawei','30K');
-- 动态插入 col3、col4、col5 列
upsert into "hainiu"."dynamic_table1" (pk,col1,col2,col3 varchar,col4 varchar,col5 varchar) values ('x0007','user3','33','shanghai','ali','22K');
-- 动态插入 col3、col4、col5、col6 列
upsert into "hainiu"."dynamic_table1" (pk,col1,col2,col3 varchar,col4 varchar,col5 varchar,col6 varchar) values ('x0008','user4','35','shanghai','baidu','12K','false');
-----------------------------------------------------------------------------------------------------
-- phoenix中查询动态列
select * from "hainiu"."dynamic_table1"(col3 varchar,col4 varchar);
select * from "hainiu"."dynamic_table1"(col3 varchar,col4 varchar,col5 varchar) ;
select * from "hainiu"."dynamic_table1"(col3 varchar,col4 varchar,col5 varchar,col6 varchar) ;
f.phoenix与hbase表关联1
a.创建phoenix视图映射hbase表(Hbase有表数据)
a.介绍
映射hbase中已有的表为phoenix的视图,在这个模式下,通过phoenix可以以SQL的形式只读hbase的表。删除视图后,hbase的表仍存在。视图名称需要和hbase表名称一致。
phoenix的DDL中建议为所有表名和列名加上双引号,否则phoenix会全部转为大写进行识别,同样的在执行phoenix的查询命令时要给字符串用单引号,因为双引号里的会被识别为列或表或列族。。
其中ROW是主键,对应hbase表的rowkey,其他字段使用"列族"."列名"作为字段名。
若hbase表中的列名包含小数点,如如列族为'cf',列名为"root.a.b" 则在Phoenix的DDL中对应为"cf"."root.a.b" varchar,
b.使用
create view "phoenix_test"(
"ROW" varchar primary key,
"cf1"."name" varchar,
"cf1"."age" varchar
);
b.创建phoenix表映射hbase已有的表(Hbase无表数据)
a.介绍
映射hbase中已有的表为phoenix的表,在这个模式下,通过phoenix可以以SQL的形式对hbase表进行DDL和DML的操作,删除phoenix表后,hbase里对应的表也会被删除。
在Phoenix中映射hbase中已有的表,若hbase表未使用namespace命名空间则可以使用create table "hbase表名"()...;创建phoenix表,
若hbase表在namespace中即表名中带有冒号,则需要参考下面的第6点里的配置开启phoenix的映射hbase表空间,
此时可以使用命令create table "hbase表命名空间"."hbase表名"()...创建phoenix的表.
b.使用
create table "phoenix"."phoenix_test"(
"ROW" varchar primary key,
"cf1"."name" varchar,
"cf1"."age" varchar
)
column_encoded_bytes=0;
---------------------------------------------------------------------------------------------
创建的phoenix表看不到hbase中已有的数据,但若hbase表有新增数据或对phoenix表进行增删改查则可以看到hbase里的增量数据,
这个是因为phoenix4.10版本之后对列映射编码规则做了修改,我们可以通过在DDL最后加上一句
column_encoded_bytes=0即可关闭该配置,然后该phoenxi表创建之后就可以看到hbase钟的已有数据
g.phoenix与hbase表关联2
a.Hbase有表数据
a.只需要查找,不需要对数据进行操作
a.示例
create view if not exists "stu"(
"id" varchar primary key,
"info1"."name" varchar ,
"info1"."age" varchar
)column_encoded_bytes=0;
b.说明
这时需要建立视图,因为视图删除时不会影响到原数据
建映射视图和映射表需要遵循以下几点:
Phoneix的表名必须和 hbase的 库名.表名一致
Phoneix的表的主键的列名一般对应rowkey列,名称可以随意,但是类型得匹配!
Phoneix的表的普通的列名,必须和hbase的列族.列名一致!
表映射的结尾,必须添加column_encoded_bytes=0,不然无法从hbase中查询到数据的!
b.需要对数据进行操作
a.示例
create table if not exists "stu"(
"id" varchar primary key,
"info1"."name" varchar ,
"info1"."age" varchar
)column_encoded_bytes=0;
b.说明
此时就只能用映射表来操作
c.映射带命名空间的表
a.示例
create table "myns"."t1"(
"id" varchar(20) primary key,
"info1"."name" varchar (20)
)column_encoded_bytes=0;
b.说明
带命名空间的表在进行映射时,必须参照这个博文里的方法进行配置
https://blog.csdn.net/xdsxhdyy/article/details/96461576
除了hbase的hbase-site.xml 要配置,phoenix 的bin/hbase-site.xml也要配置,否则会报错
Ensure that config phoenix.schema.isNamespaceMappingEnabled is consitent on client and server
然后再创建与hbase同名的schema
create schema "myns";
b.Hbase无表数据
a.只有1种
a.示例
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city)
);
b.说明
此处用的联合主键,当然也支持单个主键
注意:在Phoenix中创建的映射表删除时也会将Hbase中的表删除
在建表时,小写的表名或列名,都会自动在sqlline.py中转为大写!这样在查询时,只能使用大写进行查询!
如果必须使用小写,需要在表名等字段上添加双引号,建议不要使用小写的表名或字段!
03.索引分类
a.全局索引(Global Index)
a.介绍
全局索引是在单独的索引表中维护数据,这种索引适用于全表范围内的查询,能够加速跨 Region 的查询操作。
b.示例
-- 创建一个基础表
CREATE TABLE my_table (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR,
age INTEGER
);
-------------------------------------------------------------------------------------------------
-- 创建全局索引
CREATE INDEX idx_name ON my_table (name);
c.说明
idx_name 是一个全局索引,针对 my_table 表中的 name 列创建。
索引表存储在独立的 Region 中,因此对大数据量的全表查询能提供显著性能提升。
b.本地索引(Local Index)
a.介绍
本地索引与主表数据存储在相同的 Region 上,是一种轻量级索引方式,不会增加过多存储开销。它适用于热点数据或 Region 内的查询优化。
b.示例
-- 创建本地索引
CREATE LOCAL INDEX idx_age ON my_table (age);
c.说明
idx_age 是一个本地索引,针对 my_table 表中的 age 列创建。
本地索引表与主表在同一个 Region 中,可以减少网络传输和延迟。
c.覆盖索引(Covering Index)
a.介绍
覆盖索引是指索引中包含查询需要的所有列,可以完全从索引表中获取数据而无需访问主表,从而极大提高查询性能。
b.示例
-- 创建一个包含多个列的全局索引(覆盖索引)
CREATE INDEX idx_name_age ON my_table (name) INCLUDE (age);
c.说明
idx_name_age 是一个覆盖索引,索引 name 列,并同时包含了 age 列的数据。
查询涉及 name 和 age 的时候,可以直接从索引表获取数据,无需访问主表。
d.函数索引(Function-based Index)
a.介绍
函数索引是基于列的函数结果创建的索引,用于加速对函数结果的查询,例如对转换、计算后的数据进行索引优化。
b.示例
-- 创建一个带函数的索引,例如针对 UPPER 函数
CREATE INDEX idx_upper_name ON my_table (UPPER(name));
c.说明
idx_upper_name 是一个函数索引,针对 name 列的 UPPER 函数结果创建索引。
当查询中使用 UPPER(name) 时,Phoenix 可以利用该索引来加速查询。
e.示例总结
全局索引:适用于全表查询,索引数据存储在独立的 Region,适合大规模数据。
本地索引:与主表共用 Region,适合小规模、频繁查询的场景,降低索引管理开销。
覆盖索引:索引表包含所有查询所需的数据,可以完全避免访问主表,非常高效。
函数索引:针对函数计算结果创建索引,优化函数调用时的查询性能。
1.7 sqoop
00.介绍
对数据库的支持。支持DB2、达梦、GreenPlum、HBase、HBase Phoenix、MySQL、PostgreSQL、MPP、Oracle、SQL Server、TeraData、Generic JDBC、HANA等数据源
Sqoop任务共支持5种任务类型,分别为:数据库到HDFS、数据库到HBase、数据库到Hive、HDFS到数据库、Hive到数据库。每种任务类型都可以进行整表导出,同时支持通过参数配置选择部分字段导出
操作数据库时需要将被操作的数据库驱动手动放入Sqoop安装路径下。若使用DataEngine平台,则放置驱动的目录为/usr/hdp/3.0.1.0-187/sqoop/lib/
从大数据集群(HDFS/Hive/HBase等)导出数据到Oracle数据库时,不支持表名小写的情况
01.Sqoop基本命令
a.查看所有命令
sqoop help
b.查看某条命令的具体使用方法
sqoop help <命令名>
02.Sqoop 与 HDFS
a.查询MySQL
a.查询 MySQL 所有数据库
sqoop list-databases \
--connect jdbc:mysql://master:3306/ \
--username hive \
--password hive
-------------------------------------------------------------------------------------------------
sqoop list-databases \
--connect jdbc:mysql://master:3306/ \
--username root
b.查询指定数据库中所有数据表
sqoop list-tables \
--connect jdbc:mysql://master:3306/mysql \
--username hive \
--password hive
-------------------------------------------------------------------------------------------------
sqoop list-tables \
--connect jdbc:mysql://master:3306/ \
--username root
b.MySQL 数据导入到 HDFS
导入命令 将 MySQL 数据库中的 help_keyword 表导入到 HDFS 的 /sqoop 目录下。
使用 3 个 map tasks 并行导入。如果目标目录存在,则先删除再导入。
-----------------------------------------------------------------------------------------------------
sqoop import \
--connect jdbc:mysql://master:3306/mysql \
--username root \
--password root \
--table help_keyword \
--delete-target-dir \
--target-dir /sqoop \
--fields-terminated-by '\t' \
-m 3
-----------------------------------------------------------------------------------------------------
sqoop import \
--connect jdbc:mysql://master:3306/test \
--username root \
--table demo \
--delete-target-dir \
--target-dir /sqoop \
--fields-terminated-by '\t' \
-m 3
-----------------------------------------------------------------------------------------------------
--delete-target-dir:目标目录存在则先删除。
--target-dir:导入的目标目录。
-m:指定并行执行的 map tasks 数量。
-----------------------------------------------------------------------------------------------------
注意:
默认使用表的主键列作为拆分依据。如果表没有主键,可以使用 --autoreset-to-one-mapper 启动一个 map task,
或使用 --split-by <column-name> 指定拆分数据的参考列。
-----------------------------------------------------------------------------------------------------
查看导入后的目录:
hadoop fs -ls -R /sqoop
-----------------------------------------------------------------------------------------------------
查看导入内容:
hadoop fs -text /sqoop/part-m-00000
c.HDFS 数据导出到 MySQL
sqoop export \
--connect jdbc:mysql://master:3306/mysql \
--username root \
--password root \
--table help_keyword_from_hdfs \
--export-dir /sqoop \
--input-fields-terminated-by '\t' \
-m 3
-----------------------------------------------------------------------------------------------------
--export-dir:HDFS 上的目录。
--input-fields-terminated-by:指定数据的分隔符。
-----------------------------------------------------------------------------------------------------
注意:MySQL 中的表 help_keyword_from_hdfs 需要预先创建,建表语句如下:
CREATE TABLE help_keyword_from_hdfs LIKE help_keyword;
d.MySQL 数据导入到 HBase
sqoop import \
--connect jdbc:mysql://master:3306/mysql \
--username root \
--password root \
--table help_keyword \
--hbase-table help_keyword_hbase \
--column-family keywordInfo \
--hbase-row-key help_keyword_id
-----------------------------------------------------------------------------------------------------
--hbase-table:目标 HBase 表名。
--column-family:列族名。
--hbase-row-key:HBase 的 RowKey 列名。
-----------------------------------------------------------------------------------------------------
注意:HBase 中的表 help_keyword_hbase 需要预先创建:
hbase> create 'help_keyword_hbase', 'keywordInfo'
-----------------------------------------------------------------------------------------------------
导入验证 使用 HBase shell 查看表数据:
hbase> scan 'help_keyword_hbase'
03.Sqoop 与 Hive
a.MySQL 数据导入到 Hive
a.导入命令
sqoop import \
--connect jdbc:mysql://master:3306/mysql \
--username root \
--password root \
--table help_keyword \
--delete-target-dir \
--target-dir /sqoop_hive \
--hive-database sqoop_test \
--hive-import \
--hive-overwrite \
-m 3
-------------------------------------------------------------------------------------------------
--hive-database:导入到 Hive 的数据库,需预先创建。
--hive-import:将数据导入到 Hive。
--hive-overwrite:覆盖 Hive 表中已有的数据。
b.导入验证
查看 Hive 中的数据库和表:
hive> SHOW DATABASES;
hive> SHOW TABLES IN sqoop_test;
-------------------------------------------------------------------------------------------------
查看表中数据:
hive> SELECT * FROM sqoop_test.help_keyword;
-------------------------------------------------------------------------------------------------
问题排查:
如果遇到 java.io.IOException: java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf 错误,
请将 Hive 的 hive-exec-*.jar 文件复制到 Sqoop 的 lib 目录下:
cp /path/to/hive-exec-*.jar ${SQOOP_HOME}/lib
b.Hive 数据导出到 MySQL
a.查看 Hive 表在 HDFS 的存储位置
hive> use sqoop_test;
hive> desc formatted help_keyword;
b.执行导出命令
sqoop export \
--connect jdbc:mysql://master:3306/mysql \
--username root \
--password root \
--table help_keyword_from_hive \
--export-dir /user/hive/warehouse/sqoop_test.db/help_keyword \
--input-fields-terminated-by '\001' \
-m 3
-------------------------------------------------------------------------------------------------
--export-dir:Hive 表在 HDFS 上的存储路径。
--input-fields-terminated-by:Hive 中默认的分隔符为 \001。
-------------------------------------------------------------------------------------------------
注意:MySQL 中的表 help_keyword_from_hive 需要预先创建,建表语句如下:
CREATE TABLE help_keyword_from_hive LIKE help_keyword;
04.全库导出
a.介绍
Sqoop 支持通过 import-all-tables 命令进行全库导出到 HDFS 或 Hive,但需要注意以下限制:
所有表必须有主键;或者使用 --autoreset-to-one-mapper,只启动一个 map task。
不能使用非默认的分割列,也不能通过 WHERE 子句添加限制。
b.全库导出到 HDFS
sqoop import-all-tables \
--connect jdbc:mysql://master:3306/数据库名 \
--username root \
--password root \
--warehouse-dir /sqoop_all \
--fields-terminated-by '\t' \
-m 3
c.全库导出到 Hive
sqoop import-all-tables -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://master:3306/数据库名 \
--username root \
--password root \
--hive-database sqoop_test \
--hive-import \
--hive-overwrite \
-m 3
05.Sqoop 与 Dameng
a.配置 JDBC 驱动
首先,你需要确保 Sqoop 可以通过 JDBC 连接到达梦数据库。这通常涉及到将达梦数据库的 JDBC 驱动添加到 Sqoop 的类路径中。
下载达梦数据库的 JDBC 驱动(通常是 .jar 文件)。
将驱动文件复制到 Sqoop 的 lib 目录。例如:
cp /path/to/dmdb.jar ${SQOOP_HOME}/lib
b.使用 Sqoop 导入数据
示例:将达梦数据库中的表导入到 HDFS
sqoop import \
--connect jdbc:dm://<达梦数据库地址>:<端口>/<数据库名> \
--username <用户名> \
--password <密码> \
--table <表名> \
--target-dir /path/to/hdfs/dir \
--fields-terminated-by '\t' \
-m 1
-----------------------------------------------------------------------------------------------------
--connect: JDBC 连接字符串,替换 <达梦数据库地址>, <端口>, 和 <数据库名> 为实际值。
--username: 达梦数据库的用户名。
--password: 达梦数据库的密码。
--table: 需要导入的达梦数据库中的表名。
--target-dir: 导入数据存储到 HDFS 上的目录。
--fields-terminated-by: 指定字段分隔符。
-m: 指定并行执行的 map tasks 数量。
c.使用 Sqoop 导出数据
示例:将 HDFS 中的数据导出到达梦数据库
sqoop export \
--connect jdbc:dm://<达梦数据库地址>:<端口>/<数据库名> \
--username <用户名> \
--password <密码> \
--table <表名> \
--export-dir /path/to/hdfs/dir \
--input-fields-terminated-by '\t' \
-m 1
-----------------------------------------------------------------------------------------------------
--connect: JDBC 连接字符串,替换 <达梦数据库地址>, <端口>, 和 <数据库名> 为实际值。
--username: 达梦数据库的用户名。
--password: 达梦数据库的密码。
--table: 需要导入的达梦数据库中的表名。
--export-dir: HDFS 上的数据目录。
--input-fields-terminated-by: 指定输入数据的分隔符。
-m: 指定并行执行的 map tasks 数量。
06.Sqoop 与 Dameng
a.查看 SYSDBA 用户下所有表
sqoop list-tables --driver dm.jdbc.driver.DmDriver --connect jdbc:dm://192.168.142.235:5247 --username SYSDBA --password SYSDBA
b.数据导入(DM 导出到 HDFS)
sqoop import --driver dm.jdbc.driver.DmDriver --connect jdbc:dm://192.168.142.235:5247 --username SYSDBA --password SYSDBA --table BAK_DMINI_210714 -m 1
sqoop import --driver dm.jdbc.driver.DmDriver --connect jdbc:dm://192.168.142.235:5247 --username SYSDBA --password SYSDBA --table BAK_DMINI_210715 -m 1
-----------------------------------------------------------------------------------------------------
--driver dm.jdbc.driver.DmDriver:指定达梦数据库的 JDBC 驱动。
--connect jdbc:dm://192.168.142.235:5247:达梦数据库的 JDBC 连接字符串。
--username SYSDBA 和 --password SYSDBA:用于连接达梦数据库的用户名和密码。
--table BAK_DMINI_210714:需要导入的表名。
-m 1:使用 1 个 map task 进行数据导入。
-----------------------------------------------------------------------------------------------------
查看 HDFS 上的文件:
hadoop fs -ls /user/hadoop01/BAK_DMINI_210714
-----------------------------------------------------------------------------------------------------
查看文件内容:
hadoop fs -cat /user/hadoop01/BAK_DMINI_210714/part-m-00000
c.数据导出(HDFS 导出到 DM)
sqoop export --driver dm.jdbc.driver.DmDriver --connect jdbc:dm://192.168.142.235:5247 --username SYSDBA --password SYSDBA --table BAK_DMINI_210714 --export-dir /user/hadoop01/BAK_DMINI_210714/part-m-00000 --columns EMPLOYEE_ID,START_DATE,END_DATE,JOB_ID,DEPARTMENT_ID --input-fields-terminated-by ',' --input-lines-terminated-by '\n'
-----------------------------------------------------------------------------------------------------
--export-dir /user/hadoop01/BAK_DMINI_210714/part-m-00000:HDFS 上的文件目录。
--columns EMPLOYEE_ID,START_DATE,END_DATE,JOB_ID,DEPARTMENT_ID:需要导出的列。
--input-fields-terminated-by ',':指定输入数据的字段分隔符。
--input-lines-terminated-by '\n':指定输入数据的行结束符。
2 flink
3 scala
4 spark
5 doris
01.介绍
Apache Doris 是一个现代化的开源分布式 SQL 数据库管理系统,专为在线分析处理(OLAP)和实时分析而设计。
它提供高性能的查询能力,支持大规模的数据分析和实时数据处理。Doris 的设计目标是提供简单易用的界面,
同时具备高效的数据导入和查询性能。
02.关键特性
高性能:通过列式存储和向量化执行引擎,Doris 能够提供快速的查询响应时间。
实时分析:支持实时数据导入和更新,适合需要实时数据分析的场景。
易用性:提供类似 MySQL 的 SQL 接口,降低了学习和使用的门槛。
弹性扩展:支持水平扩展,可以处理从 GB 到 PB 级别的数据。
数据集成:支持多种数据源的集成,方便进行数据的提取、加载和转换(ELT)
02.关键特性2
a.跨源数据处理
Doris支持多种数据源连接器,可使用SQL对这些数据源进行联邦查询
数据源连接器:无论是 Hive、Iceberg、Hudi 、Paimon,还是支持 JDBC 协议的数据库系统,Doris 均能轻松连接并高效提取数据
支持数据源:Apache Hive、Apache Iceberg、Apache Hudi、Apache Paimon(Incubating)、Elasticsearch、MySQL、Oracle、SQL Server 等主流数据湖
b.外部数据源集成
通过与外部数据源的集成,实现对外部数据的查询和部分数据导入到 Doris 表中。
可以创建 Catalog 读取外部数据源中的数据,使用 INSERT INTO SELECT 将外部数据源中的数据同步写入到 Doris 中,配合 JOB 可以异步写入。
可以使用 X2Doris 将其他 AP 系统的数据迁移到 Doris 中。
6 finebi
7 datax
8 datahub
9 lakehouse
10 starrocks
11 DataEase