-
Steaming提供了另一种处理数据的方式。hadoop streaming api会为外部进程开启一个IO管道,然后数据会被传给这个进程,其会从标准输入中读取数据,然后通过标准输出来写结果数据,最好返回给Streaming API job。
-
Hive提供多个语法来使用streaming,包括Map()、REDUCE()和TRANSFORM()。
# Hive中使用java编写的MapReduce JOB
FROM (
FROM src
MAP value, key
USING 'java -cp hive-contrib-0.9.0.jar
org.apache.hadoop.hive.contrib.mr.example.IdentityMapper'
AS k, v
CLUSTER BY k) map_output
REDUCE k, v
USING 'java -cp hive-contrib-0.9.0.jar
org.apache.hadoop.hive.contrib.mr.example.WordCountReduce'
AS k, v;
- Map Task和Reduce Task分别使用GenericMR的map方法和reduce方法
- 如果对多个数据集进行JOIN连接处理,然后使用TRANSFORM进行处理。使用UNION ALL和CLUSTER BY,可以实现CO GROUP BY的常见操作。
- Pig提供原生的COGROUP BY操作
FROM (
FROM (
FROM order_log ol
-- User Id, order Id, and timestamp:
SELECT ol.userid AS uid, ol.orderid AS id, av.ts AS ts
UNION ALL
FROM clicks_log cl
SELECT cl.userid AS uid, cl.id AS id, ac.ts AS ts
) union_msgs
SELECT union_msgs.uid, union_msgs.id, union_msgs.ts
CLUSTER BY union_msgs.uid, union_msgs.ts) map
INSERT OVERWRITE TABLE log_analysis
SELECT TRANSFORM(map.uid, map.id, map.ts) USING 'reduce_script'
AS (uid, id, ...);
- Hive中文件格式间具有明显差异,例如文件中记录的编码方式、记录格式以及记录中字节流的编码的方式。
- Hive文本文件格式选择和记录格式对应的。
- Stored AS SEQUENCEFILE、ROW Format delimited、serde、inputformat、outputformat这些语法。
- 例如 STORED AS SEQUENCEFILE等同于INPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileInputFormat' OUTPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileOutputFormat' .
- 通过Stored AS SequenceFile指定,SequenceFile文件是含有键-值对的二进制文件。将Hive查询转换成MapReduce JOB时,对于指定的记录,其取决使用那些合适的键值对。
- 其实Hadoop的一种给文件格式,因此和其他非Hadoop系统的工具无法共享。
- SequenceFile支持块和记录级别压缩,优化磁盘利用率和IO,同时可以支持有边界的压缩。
- 大部分Hadoop和Hive存储都是行式存储,在大多数常见下是高效的。
- 大多数的表具有的字段个数都不大(1到20字段)
- 对文件按块进行压缩对于需要处理重复数据的情况比较高效,同时很多处理和调试工具都很好适应行式存储的数据。
- 列式存储适合多字段时,并且大多数的查询只需要其中一小部分字段时,可以使用列式存储,这时候必须要扫描全部的行就可以拿到特定列的数据。
- 列式存储的压缩式非常高效的,特别是在这列的数据具有低计数时(没有太多重复数据),一些列式存储不需要存储null的列。
- RCFile是一个Hive列式存储文件格式。
- 使用hive -service rcfilecat工具查看RCFile格式文件
hive --service rcfilecat /user/hive/warehouse/columntable/000000_0
- SerDe是序列化/反序列化的简写。一个SerDe包含将一条记录的非结构化字节转换为Hive可以使用的一条记录的过程。
- 使用RegexSerDe处理Apache Web日志
CREATE TABLE serde_regex(
host STRING,
identity STRING,
user STRING,
time STRING,
request STRING,
status STRING,
size STRING,
referer STRING,
agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\])
([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\")
([^ \"]*|\"[^\"]*\"))?",
"output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
)
STORED AS TEXTFILE;
CREATE EXTERNAL TABLE messages (
msg_id BIGINT,
tstamp STRING,
text STRING,
user_id BIGINT,
user_name STRING
)
ROW FORMAT SERDE "org.apache.hadoop.hive.contrib.serde2.JsonSerde"
WITH SERDEPROPERTIES (
"msg_id"="$.id",
"tstamp"="$.created_at",
"text"="$.text",
"user_id"="$.user.id",
"user_name"="$.user.name"
)
LOCATION '/data/messages';
- Avro是一个序列化系统,其主要特点是它是一个进化的模式驱动的二进制数据存储模式。
CREATE TABLE doctors
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.literal'='{
"namespace": "testing.hive.avro.serde",
"name": "doctors",
"type": "record",
"fields": [
{
"name":"number",
"type":"int",
"doc":"Order of playing the role"
},
{
"name":"first_name",
"type":"string",
"doc":"first name of actor playing role"
},
{
"name":"last_name",
"type":"string",
"doc":"last name of actor playing role"
}
]
}');
- 使用desc tablename可以查看avro定义的模式
CREATE TABLE doctors
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.url'='hdfs:///test.schema'');
- avro如果需要减少一个字段,那么缺省值默认为null,增加字段也可以增加一个默认值
- ObjectIbnspector的检查器来将记录转换成Hive可以访问的对象。
- Hive具有一个可选的组件叫做HiveServer或者hiveThrift,其运行通过指定端口访问Hive。
hive --service hiveserver2 &
# 查看thrift端口情况
lsof -i tcp:10000
- 引入依赖
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>2.3.2</version>
</dependency>
- 测试
public class UseThriftHive {
public static void main(String[] args) throws SQLException {
Connection con = DriverManager.getConnection("jdbc:hive2://hadoop:10000");
Statement stmt = con.createStatement();
ResultSet resultSet = stmt.executeQuery("select * from test2");
while (resultSet.next()) {
System.out.println(resultSet.getString(1) + "---" + resultSet.getString(2));
}
}
}