// Sort.java — MapReduce implementation of integer sorting with a custom
// Partitioner to guarantee global (cross-reducer) ordering.
import java.io.IOException; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Sort extends Configured implements Tool {
    /*
     * Sorts integers — one value per input line — and emits "rank value" pairs:
     *   1 21
     *   2 32
     *   3 62
     * Design: relies on the MapReduce shuffle, which sorts IntWritable keys
     * numerically before they reach the reducer. A custom Partitioner is
     * required because each reducer only sorts its own partition; routing
     * consecutive key ranges to consecutive partitions makes the concatenated
     * partition outputs globally sorted.
     */

    /** Parses each input line into an int and emits (value, 1). */
    public static class Map extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final IntWritable line = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Skip blank lines instead of crashing with NumberFormatException.
            String text = value.toString().trim();
            if (text.isEmpty()) {
                return;
            }
            line.set(Integer.parseInt(text));
            context.write(line, ONE);
        }
    }

    /** Emits "rank value" rows; duplicate values each occupy their own rank. */
    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        // Rank counter. NOTE: it is per-reducer, so ranks restart at 1 in each
        // output partition — the classic limitation of this textbook design.
        private final IntWritable num = new IntWritable(1);

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable ignored : values) {
                context.write(num, key);
                // Reuse the Writable instead of allocating a new one per row.
                num.set(num.get() + 1);
            }
        }
    }

    /**
     * Routes keys in [bound*i, bound*(i+1)) to partition i so that partition
     * outputs, concatenated in order, form a globally sorted sequence.
     */
    public static class Partition extends Partitioner<IntWritable, IntWritable> {
        // Assumed upper bound of the input values — TODO confirm/parameterize.
        private static final int MAX_NUM = 65223;

        @Override
        public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
            // +1 so that bound * numPartitions > MAX_NUM and every in-range
            // key lands in some partition.
            int bound = MAX_NUM / numPartitions + 1;
            // BUG FIX: the original tested bound*(i-1) <= key <= bound*i, which
            // shifted nearly every key one partition too high (partition 0 only
            // got keys <= 0) and let keys above bound*(numPartitions-1) fall
            // through to partition 0 — sending the largest values to the first
            // partition and breaking the global ordering. Use half-open ranges.
            for (int i = 0; i < numPartitions; i++) {
                if (key.get() >= bound * i && key.get() < bound * (i + 1)) {
                    return i;
                }
            }
            // Out-of-range keys: negatives to the first partition, values
            // above MAX_NUM to the last, preserving overall order.
            return key.get() < 0 ? 0 : numPartitions - 1;
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args args[0] = input path, args[1] = output path
     * @return 0 on success, 1 on failure
     */
    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration injected by ToolRunner so generic options
        // (-D, -files, ...) are honored; fall back to a fresh one if absent.
        Configuration conf = getConf() != null ? getConf() : new Configuration();
        Job job = new Job(conf, "Sort"); // Job.getInstance(conf, "Sort") on Hadoop 2+
        job.setJarByClass(Sort.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setPartitionerClass(Partition.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new Sort(), args);
        System.exit(ret);
    }
}