这两天看Hadoop的排序方面的问题,看到了下面这篇文章,觉得挺好,结合自己已经了解的知识,将排序、对组合键的使用等方面的知识有个更清楚的认识,将文章摘下来放在这儿,有空再看看。
摘自http://www.fuzhijie.me/?p=34
我想涉及到文件的Join操作应该都要使用到二次排序吧,之前我用字符串拼接的方法显得太不专业了,本来在reduce过程中是不需要保存这些数据的,遍历一次便可以将记录全部collect好。Hadoop 0.20包里面有一个SecondarySort的例子程序,结合公司牛人写的一个ppt,终于搞明白了。呵呵,刚好也用上了,所以总结一下。
Hadoop提供了几种默认类型如果Text,LongWritable等,它们都实现了WritableComparable接口,因此具有比较和序列化的能力。要实现二次排序,我想大概有两种办法。第一种是使用自定义类型,该类型实现WritableComparable接口,给原始数据添加一个权值,通过权值来改变排序的结果;第二种方法是给记录的key做一些不同的标记,比如有些在最前面加上一个’+'前缀,另一些前面加上’-'前缀,通过这些前缀来决定排序的规则。这两种办法都要实现自己的分区函数和分组函数,因为key已经被改变了,显然第一种方法感觉要专业一点,但是第二种方法感觉要高效一些,因为不需要类来封装。
我使用了第一种方法来实现二次排序,需求是做一个一对多的文件连接。来一个形象的比喻,比如一个人去商场买东西,他推着购物车,每个商品都有自己唯一的编号。因此数据有两部分组成:
1、用户对商品编号,这是一对多的。数据保存在base.dat文件中。
2、商品编号对商品的信息,其中包括商品的价格,这是一对一的。数据保存在spu.dat文件中。
最后要生成用户对应商品价格记录,这样就可以统计出用户购买商品的总价格。这两个文件通过商品的编号连接。
程序很简单,自己定义了一个UserKey类,在这个类封装了原始数据,另外添加一个权重属性,排序时需要将商品对商品价格排到最前面去。注意这个compareTo方法返回值的涵义,返回-1表示自己要排在比较的记录之前,返回1表示自己要排在比较的记录之前,之前我一直以为返回1表示大于的意思,结果程序就出现了奇怪的现象。Hadoop没有使用Java默认的序列化方式,用户必须负责自定义类型的序列化接口的实现,我感觉下面的程序写得不够专业,这是我比较惯用的序列化手段,如果使用SequenceFileOutputFormat保存输出结果,可以看到对象序列化后的数据的保存方式,不过Java虚拟机统一了数据格式,因此不能使用C/C++的思维来观察这些数据,但是也差不多了。
全部源代码如下:
package taobao;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.InverseMapper;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.util.*;
public class CK {
public static class Key implements WritableComparable<Key> {
public String id = "";
public short weight;
public Key() {
}
public Key(String i, short j) {
id = i;
}
@Override
public void readFields(DataInput in) throws IOException {
// 先写字符串的长度信息
int length = in.readInt();
byte[] buf = new byte[length];
in.readFully(buf, 0, length);
// 得到id号
id = new String(buf);
// 得到权值
weight = in.readShort();
}
@Override
public void write(DataOutput out) throws IOException {
System.out.println("in write");
String str = id.toString();
int length = str.length();
byte[] buf = str.getBytes();
// 先写字符串长度
// WritableUtils.writeVInt(out, length);
out.writeInt(length);
// 再写字符串数据
out.write(buf, 0, length);
// 接着是权值
out.writeShort(weight);
}
@Override
public int hashCode() {
return id.hashCode();
}
@Override
public String toString() {
return id;
}
@Override
public boolean equals(Object right) {
System.out.println("in equals");
// 只要id相等就认为两个key相等
if (right instanceof Key) {
Key r = (Key) right;
return r.id.equals(id);
} else {
return false;
}
}
@Override
public int compareTo(Key k) {
System.out.println("in compareTo, key=" + k.toString());
// 先比较value id
int cmp = id.compareTo(k.id);
if (cmp != 0) {
return cmp;
}
// 如果value id相等,再比较权值
if (weight > k.weight)
return -1;
else if (weight < k.weight)
return 1;
else
return 0;
}
}
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Key, Text> {
public void map(LongWritable l, Text value,
OutputCollector<Key, Text> output, Reporter reporter)
throws IOException {
String line = value.toString();
String[] pair = line.split("\t");
if (pair.length == 3) {
//key->商品编号,value->购买者
Key k = new Key(pair[1], (short) 0);
output.collect(k, new Text(pair[0]));
} else {
//key->商品编号,value->商品价格
Key k = new Key(pair[0], (short) 1);
output.collect(k, new Text(pair[1]));
}
}
}
public static class Reduce extends MapReduceBase implements
Reducer<Key, Text, Text, Text> {
public void reduce(Key key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
// 此处一定要new出一个新对象来,否则结果不会正确
// 这都是Java引用导致的问题
Text second = new Text(values.next());
while (values.hasNext()) {
output.collect(values.next(), second);
}
}
}
public static class FirstGroupingComparator implements RawComparator<Key> {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
b2, s2, Integer.SIZE / 8);
}
// 完全根据id来分区
@Override
public int compare(Key o1, Key o2) {
System.out.println("in group compare");
String l = o1.id.toString();
String r = o2.id.toString();
int res = l.compareTo(r);
System.out.println("res=" + res);
return res;
}
}
public static class FirstPartitioner implements Partitioner<Key, Text> {
@Override
public void configure(JobConf job) {
// TODO Auto-generated method stub
}
@Override
public int getPartition(Key key, Text value, int numPartitions) {
System.out.println("in FirstPartitioner");
return Math.abs(key.id.hashCode()) % numPartitions;
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(CK.class);
conf.setJobName("Composite key");
// 设置Map输出的key和value的类型
conf.setMapOutputKeyClass(Key.class);
conf.setMapOutputValueClass(Text.class);
// 设置Reduce输出的key和value的类型
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
// 设置Mapper和Reducer
conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);
// 设置group函数和分区函数
conf.setOutputValueGroupingComparator(FirstGroupingComparator.class);
conf.setPartitionerClass(FirstPartitioner.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
// conf.setOutputFormat(SequenceFileOutputFormat.class);
// 如果输出目录已经存在,那么先将其删除
FileSystem fstm = FileSystem.get(conf);
Path outDir = new Path(args[1]);
fstm.delete(outDir, true);
// 设置输入输出目录
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, outDir);
JobClient.runJob(conf);
}
}