我正在尝试编写一个Hadoop map / reduce类,该类读入一个文本文件,该文件包含 Actor 列表以及他们所播放的电影(每行一部电影),并返回每个 Actor 参加的电影数量。

最后,我希望结果按电影的数量排序(升序或降序都可以)。但是,我的代码似乎按电影标题中的字符数对结果进行排序。我已经尝试了所有可能想到的方法,包括反转输出(Text,IntWritable到IntWritable,Text)并使用其他比较器,但是我无法通过影片计数对结果进行排序。

我敢肯定这是非常简单的事情,但是我无法终生解决。任何建议将不胜感激。

数据文件摘录:

Chan, Jackie (I)    The Forbidden Kingdom   2008
Chan, Jackie (I)    Kung Fu Panda 2 2011
Chan, Jackie (I)    Shanghai Noon   2000
Chan, Jackie (I)    Pik lik for 1995
Chan, Jackie (I)    The Karate Kid  2010
Chan, Jackie (I)    Shanghai Knights    2003
Chan, Jackie (I)    Around the World in 80 Days 2004
Chan, Jackie (I)    Rush Hour   1998
Chan, Jackie (I)    The Tuxedo  2002
Chan, Jackie (I)    Kung Fu Panda   2008
Chan, Jackie (I)    Rush Hour 2 2001
Chan, Jackie (I)    Rush Hour 3 2007
Davi, Robert    Licence to Kill 1989
Davi, Robert    Die Hard    1988
Davi, Robert    The Hot Chick   2002
Davi, Robert    The Goonies 1985

我的代码如下:
// MovieCountByActor.java

package ucsc.hadoop.homework2;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
// import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import ucsc.hadoop.util.ConfigurationUtil;

public class MovieCountByActor extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(MovieCountByActor.class);

    public int run(String[] args) throws Exception {
        // Configuration conf = getConf();
        JobConf conf = new JobConf(getConf(), MovieCountByActor.class);
        conf.setOutputKeyComparatorClass(CountSort.class);
        conf.setOutputValueGroupingComparator(CountSort.class);

        if (args.length != 2) {
            System.err.println("Usage: moviecountbyactor <in> <out>");
            System.exit(2);
        }

        ConfigurationUtil.dumpConfigurations(conf, System.out);

        LOG.info("input: " + args[0] + " output: " + args[1]);

        Job job = new Job(conf, "movie count");
        job.setJarByClass(MovieCountByActor.class);
        job.setMapperClass(MovieTokenizerMapper.class);
        job.setReducerClass(MovieCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setSortComparatorClass(CountSort.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        return (result) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MovieCountByActor(), args);
        System.exit(exitCode);
    }

    public static class MovieTokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final static Text ACTOR = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\\t");

            String actor = "";
            if (tokens.length == 3) {
                actor = tokens[0];
                ACTOR.set(actor);
                context.write(ACTOR, ONE);
            }
        }
    }

    public static class MovieCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text actor, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int movieCountPerActor = 0;
            for (IntWritable count : values) {
                movieCountPerActor += count.get();
            }
            result.set(movieCountPerActor);
            context.write(actor, result);
        }
    }

    public static class CountSort extends WritableComparator {
        protected CountSort() {
            super (IntWritable.class);
        }

        @Override
        public int compare(byte[] b1, int j1, int k1, byte[] b2, int j2, int k2) {
            Integer a = ByteBuffer.wrap(b1, j1, k1).getInt();
            Integer b = ByteBuffer.wrap(b2, j2, k2).getInt();
            return a.compareTo(b) * -1;
        }
    }

}

最佳答案

我认为您对job.setSortComparatorClass(CountSort.class);的操作感到困惑-这是在减小键值之前对它们进行比较的比较器。我认为您只是在检查序列化Text对象的Int部分( Actor 名称),这说明了为什么看到的是 Actor 名称长度的输出(如果您有两个 Actor ,我想您会看到意外的输出)散列到同一reduce实例的名称长度相同。

要按电影数量对输出进行排序,您将需要执行另一个M / R作业以获取第一个作业的输出( Actor 的电影计数),然后使用映射器切换键/值(因此输出键是计数,值是 Actor 名称)。使用单个reducer,您将以电影计数的升序获得actor。

关于java - 按频率对Hadoop结果(类似于字数)进行排序,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/15465012/

10-16 16:16