Pig

看《Hadoop权威指南第三版》并记录在 Mac OSX 系统上练习Pig使用。

安装

在Mac OSX 上安装命令如下:

$ brew install pig

命令行使用

Pig Latin语言

Pig Latin的关系操作:

类型 操作 描述
加载与存储 LOAD 从文件系统或其他存储加载数据,存入关系
加载与存储 STORE 将一个关系存放在文件系统或者其他存储中
加载与存储 DUMP 将关系打印到控制台
过滤 FILTER 将关系中删除不需要的行
过滤 DISTINCT 将关系中删除重复的行
过滤 FOREACH … GENERATE 将关系中添加或删除字段
过滤 MAPREDUCE 以一个关系作为输入运行某个MapReduce 作业
过滤 STREAM 使用外部程序对一个关系进行变换
过滤 SAMPLE 对一个关系进行随机取样
分组与连接 JOIN 连接两个或多个关系
分组和连接 COGROUP 对两个或更多关系中的数据进行分组
分组和连接 GROUP 在一个关系中对数据进行分组
分组和连接 CROSS 创建两个或更多关系的乘积 (叉乘)
排序 ORDER 根据一个或多个字段对某个关系进行排序
排序 LIMIT 将一个关系的元组个人限定在一定数量内
组合和切分 UNION 合并两个或多个关系为一个关系
组合和切分 SPLIT 把某个关系切分为两个或多个关系

Pig Latin的诊断操作

操作 描述
DESCRIBE 打印关系的模式
EXPLAIN 打印逻辑和物理计划
ILLUSTRATE 使用生成的输入子集显示逻辑计划的试运行结果

注:

  • 这些命令不会被加入逻辑计划中
  • DUMP也是一种诊断操作

Pig Latin的宏和UDF语句

语句 描述
REGISTER 在Pig运行时环境中注册一个JAR文件
DEFINE 为宏、UDF、流式脚本或命令规范新建别名
IMPORT 把另一个文件中定义的宏导入脚本

注:

  • 这些命令不处理关系,所以它们不会被加入逻辑计划
  • 这些命令会被立即执行

Pig Latin 命令

类别 命令 描述
Hadoop文件系统 cat 打印一个或多个文件的内容
Hadoop文件系统 cd 改变当前目录
Hadoop文件系统 copyFromLocal 复制本地文件或目录
Hadoop文件系统 copyToLocal 将一个文件或目录从Hadoop 文件系统复制到本地文件系统
Hadoop文件系统 cp 把一个文件或目录复制到另一个目录
Hadoop文件系统 fs 访问Hadoop文件系统外壳程序
Hadoop文件系统 ls 打印文件列表信息
Hadoop文件系统 mkdir 创建新目录
Hadoop文件系统 mv 将一个文件或目录移动到另一个目录
Hadoop文件系统 pwd 打印当前工作目录
Hadoop文件系统 rm 删除一个文件或目录
Hadoop文件系统 rmf 强制删除文件或目录 (文件或目录不存在也不会失败)
Hadoop MapReduce 工具 kill 终止某个MapReduce 作业
Hadoop MapReduce 工具 exec 在新的 Grunt 外壳程序中以批处理模式运行脚本
Hadoop MapReduce 工具 help 显示可用的命令和选项
Hadoop MapReduce 工具 quit 退出解释器
Hadoop MapReduce 工具 run 在当前 Grunt 外壳程序中运行脚本
Hadoop MapReduce 工具 set 设置 Pig 选项 和 MapReduce 作业属性
Hadoop MapReduce 工具 sh 在 Grunt 中运行外壳命令

Pig Latin 表达式

类别 表达式 描述 示例
常量 文字 常量值 1.0, ‘a’
字段 (位置指定) $n 第n个字段(从0开始) $0, $1
字段 (名字指定) f 字段名f year
字段 (消除歧义) r::f 分组或连接后,关系r中的名为f的字段 A::year
投影 c.$n, c.f 在容器c (关系、包或元组) 中的字段按位置 或 名称指定 records.$0, records.year
Map 查找 m#k 在映射m中键k所对应的值 items#‘Coat’
类型转换 (t) f 将字段f转换为类型t (int) year
算术 +, -, *, /, % 加、减、乘、除、取余 $1 + $2, $1 - $2, $1 * $2, $1 / $2, $1 % $2
条件 x ? y : z 三元运算符,如果 x 为真,则y,否则为 z quality == 0 ? 0 : 1
比较 ==, !=, >, <, >=, <=, x matches y, x is null, x is not null 相等、不等、大于、小于、大于等于、小于等于、正则匹配、是空值、不是空值 quality matches '[01459]'
布尔型 or, and, not 逻辑或,逻辑与,逻辑非 q == 0 or q == 1, not q matches '[01459]'
平面化 FLATTEN(f) 从包或元组中去除嵌套 FLATTEN(group)

Pig Latin 类型

类别 数据类型 描述 示例
数值 int 32位有符号整数 99
数值 long 64位有符号整数 199L
数值 float 32位浮点数 0.8F
数值 double 64位浮点数 0.88
文本 chararray UTF-16格式的字符数组 ‘hello world’
二进制 bytearray 字节数组
复杂类型 tuple 任何类型的字段序列 (1, ‘programmer’)
复杂类型 bag 元组的无序多重集合 (允许重复的元组) {(1, ‘hello’), (2)}
复杂类型 map 一个键值对的集合。键必须是字符数组,值可以是任意类型的数据 [‘a’#‘hello’, ‘b’#‘world’]

注:

  • 内置函数:TOTUPLE, TOBAG, TOMAP 将表达式转换为元组、包以及映射。
  • 包必须在某个关系中。

Schema

Pig 中的一个关系可以关联一个模式。模式为关系的字段指定名称和类型。

Load语句的AS字句可以在关系上附以模式:

grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt' AS (year:int, temperature:int, quality:int);
grunt> DESCRIBE records;
records: {year:int,temperature:int,quality:int}

注:不指定类型的话,默认是bytearray
    DESCRIBE 用来查看模式 (Schema)

函数

Pig的函数有四种类型:

  • 计算函数 (Eval function)
  • 过滤函数 (Filter function)
  • 加载函数 (Load function)
  • 存储函数 (Store function)

宏提供了在Pig Latin内对可重用的Pig Latin代码进行打包的功能。

示例:

$ cat max_temp.macro
DEFINE max_by_group(X, group_key, max_field) RETURNS Y {
A = GROUP $X by $group_key;
$Y = FOREACH A GENERATE group, MAX($X.$max_field);
};

导入宏:

grunt> IMPORT './max_temp.macro';

使用宏:

grunt> records = LOAD '/input/ncdc/micro-tab/sample.txt' AS (year:chararray, temperature:int, quality:int);
grunt> max_temp = max_by_group(records, year, temperature);
grunt> DUMP max_temp;

用户自定义函数 (UDF)

以插件形式提供使用用户定制代码的能力。可以使用Java、Python、JavaScript写UDF。

下面是删除不符合质量要求的气温记录 (Java代码):

$ cat com/hadoopbook/pig/IsGoodQuality.java
package com.hadoopbook.pig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.pig.FilterFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;

public class IsGoodQuality extends FilterFunc {

  @Override
  public Boolean exec(Tuple tuple) throws IOException {
    if (tuple == null || tuple.size() == 0) {
      return false;
    }
    try {
      Object object = tuple.get(0);
      if (object == null) {
        return false;
      }
      int i = (Integer) object;
      return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;
    } catch (ExecException e) {
      throw new IOException(e);
    }
  }
}

编译、打包:

$ javac -cp $(brew --prefix pig)/libexec/pig-0.16.0-core-h2.jar:$(hadoop classpath) com/hadoopbook/pig/IsGoodQuality.java
$ jar -cf com/hadoopbook/pig/IsGoodQuality.jar com/hadoopbook/pig/IsGoodQuality.class

注:我通过brew安装的pig所以使用了$(brew --prefix pig);可以手动指定pig.jar的具体路径。

在Pig中使用该UDF:

$ pig
grunt> REGISTER ./com/hadoopbook/pig/IsGoodQuality.jar;
grunt> DEFINE isGood com.hadoopbook.pig.IsGoodQuality();
grunt> records = LOAD '/input/ncdc/micro-tab/sample.txt' AS (year:chararray, temperature:int, quality:int);
grunt> filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);
grunt> DUMP filtered_records;

写成pig程序:

$ cat max_temp_filter_udf.pig
--max_temp_filter_udf.pig
REGISTER com/hadoopbook/pig/IsGoodQuality.jar;
DEFINE isGood com.hadoopbook.pig.IsGoodQuality();
records = LOAD '/input/ncdc/micro-tab/sample.txt' AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group, MAX(filtered_records.temperature);
DUMP max_temp;
STORE max_temp INTO 'max_temp_output' USING PigStorage(':');

数据加载和存储

前面已经有用Load加载数据了。下面看看怎么存储数据:

grunt> STORE max_temp INTO 'max_temp_output' USING PigStorage(':');
grunt> cat max_temp_output

数据过滤

FILTERLIMIT 用来过滤行;FOREACH ... GENERATE用来删除、添加列(字段)。

grunt> max_temp = FOREACH grouped_records GENERATE group, MAX(filtered_records.temperature);

STREAM操作

STREAM操作可以让外部程序或脚本对关系中的数据进行变换。这一操作的命名对应于Hadoop的Streaming,后者为MapReduce提供类似能力。

grunt> records = LOAD '/input/ncdc/micro-tab/sample.txt' AS (year:chararray, temperature:int, quality:int);
grunt> second_field = STREAM records THROUGH `cut -f 2`;
grunt> DUMP second_field;

Python写stream脚本

is_good_quality.py程序:

$ cat is_good_quality.py
#!/usr/bin/env python
# encoding: utf-8
import re
import sys

for line in sys.stdin:
    (year, temp, q) = line.strip().split()
    if temp != '9999' and re.match('[01459]', q):
        print('{}\t{}'.format(year, temp))

Pig程序:

$ cat max_temp_filter_stream.pig
--max_temp_filter_stream.pig

DEFINE is_good_quality `is_good_quality.py` SHIP ('is_good_quality.py');
records = LOAD '/input/ncdc/micro-tab/sample.txt' AS (year:chararray, temperature:int, quality:int);
filtered_records = STREAM records THROUGH is_good_quality AS (year:chararray, temperature:int);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group, MAX(filtered_records.temperature);
DUMP max_temp;
STORE max_temp INTO 'max_temp_stream';        

运行:

$ hdfs dfs -put max_temp_filter_stream.pig
$ pig -f max_temp_filter_stream.pig
$ hdfs dfs -cat max_temp_stream/*
1949    111
1950    22

数据分组与连接

连接/JOIN

连接:

grunt> C = JOIN A BY $0, B BY $1;

注:等式连接,即:连接A.$0 == B.$1 的行,结果中的字段由所有输入关系的所有字段组成。

如果要连接的关系太大,不能全部放入内存,则应该使用通用的连接操作。如果有一个关系小到能够全部放入内存,则可以使用分段复制连接(fragment replicate join),它把小的输入关系发生到所有mapper,并在map端使用内存查找表对(分段的)较大的关系进行连接。

分段复制连接:

grunt> C = JOIN A BY $0, B BY $1 USING "replicated";

注:第一个关系必须是大的关系,后面则是一个或多个相对较小的关系。(能够全部放入内存)

实战技巧

并行处理

Pig根据输入数据的大小设置reducer个数:每1GB 输入使用一个reducer,且 reducer 的个数不超过 999。可以设置 pig.exec.reducers.bytes.per.ducer 和 pig.exec.reducers.max 来修改默认设置。

为了告诉 Pig 每个作业要用多少个 reducer ,可以在 reducer 阶段的操作中使用 PARALLEL 子句。在 reduce 阶段使用的操作包括所有的 分组(grouping)、连接 (joining)操作(GROUP, COGROUP, JOIN, CROSS)以及DISTINCT 和 ORDER。

grunt> grouped_records = GROUP records BY year PARALLEL 30;

也可以通过设置 default_paralle 选项达到相同效果:

grunt> set default_parallel 30;

注:map 任务的歌声由输入的大小决定(每个HDFS块一个map),不受PARALLEL 子句影响。

参数替换

以下脚本中,$input, $output 用来指定输入和输出路径:

$ cat max_temp_param.pig
records = LOAD '$input' AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group, MAX(filtered_records.temperature);
STORE max_temp INTO '$output';

运行:

$ pig -param input=/input/ncdc/micro-tab/sample.txt -param output=/tmp/out max_temp_param.pig

也可以将参数放在文件中:

$ cat max_temp_param.param
# Input file
input=/input/ncdc/micro-tab/sample.txt
# Output file
output=/tmp/out

运行:

$ pig -param_file max_temp_param.param max_temp_param.pig