MapReduce job with mixed data sources: HBase table and HDFS files

Problem description

I need to implement an MR job that reads data from both an HBase table and HDFS files. For example, the mappers read from the HBase table and from the HDFS files; the two sources share the same primary key but have different schemas. A reducer then joins all the columns (from the HBase table and the HDFS files) together.

I tried searching online and could not find a way to run an MR job with such mixed data sources. MultipleInputs seems to work only with multiple HDFS data sources. Please let me know if you have any ideas. Sample code would be great.

Solution

After a few days of investigation (and with help from the HBase user mailing list), I finally figured out how to do it. Here is the source code:

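import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MixMR {

    // Mapper for the HDFS text input: each line is expected to be "key,value".
    public static class Map extends Mapper<Object, Text, Text, Text> {

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] sa = value.toString().split(",");
            // Lines that do not have exactly two fields are skipped.
            if (sa.length == 2) {
                context.write(new Text(sa[0]), new Text(sa[1]));
            }
        }
    }

    // Mapper for the HBase input: emits (row key, value of column cf:c1).
    public static class TableMap extends TableMapper<Text, Text> {
        public static final byte[] CF = Bytes.toBytes("cf");
        public static final byte[] ATTR1 = Bytes.toBytes("c1");

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            // Honor offset and length: get() returns the whole backing array.
            String key = Bytes.toString(row.get(), row.getOffset(), row.getLength());
            byte[] cell = value.getValue(CF, ATTR1);
            // Skip rows without cf:c1 instead of failing with a NullPointerException.
            if (cell != null) {
                context.write(new Text(key), new Text(Bytes.toString(cell)));
            }
        }
    }

    // Receives the values from both mappers grouped by key and writes each one out.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    // Arguments: <HDFS input path> <placeholder path for the HBase input> <output path>
    public static void main(String[] args) throws Exception {
        Path inputPath1 = new Path(args[0]);
        Path inputPath2 = new Path(args[1]);
        Path outputPath = new Path(args[2]);

        String tableName = "test";

        Configuration config = HBaseConfiguration.create();
        Job job = Job.getInstance(config, "ExampleRead"); // new Job(...) is deprecated
        job.setJarByClass(MixMR.class); // class that contains the mappers

        Scan scan = new Scan();
        scan.setCaching(500);       // a caching value of 1 would be bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        scan.addFamily(Bytes.toBytes("cf"));

        // Stores the table name and Scan in the job configuration for TableInputFormat.
        TableMapReduceUtil.initTableMapperJob(
                tableName,      // input HBase table name
                scan,           // Scan instance to control CF and attribute selection
                TableMap.class, // mapper
                Text.class,     // mapper output key
                Text.class,     // mapper output value
                job);

        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // inputPath1 is the real HDFS input. inputPath2 is only a placeholder for the
        // HBase input: TableInputFormat reads the table configured above, not the path.
        MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, Map.class);
        MultipleInputs.addInputPath(job, inputPath2, TableInputFormat.class, TableMap.class);

        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}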
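
The Reduce class above writes every value it receives under its key, so the HBase value and the file value for one key end up on separate output lines rather than in one joined record. A minimal sketch of how a joining reducer could work is below, written in plain Java with no Hadoop classes so it runs on its own. It assumes each mapper prefixes its output value with a source tag; the tags F: and H:, the class ReduceSideJoinSketch, and the methods tag, joinValues, and run are all hypothetical names chosen for illustration, not part of the original answer or of any Hadoop or HBase API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of a reduce-side join; no Hadoop classes are used. */
final class ReduceSideJoinSketch {

    // Hypothetical source tags; each mapper would prepend one to its output value.
    static final String FILE_TAG = "F:";
    static final String HBASE_TAG = "H:";

    /** What a tagging mapper would emit as the value for one record. */
    static String tag(String sourceTag, String value) {
        return sourceTag + value;
    }

    /**
     * Body of a joining reduce() call: takes every tagged value for one key and
     * returns a single joined line, or null when either side is missing
     * (inner-join semantics, an assumption of this sketch).
     */
    static String joinValues(String key, Iterable<String> taggedValues) {
        List<String> fileCols = new ArrayList<>();
        List<String> hbaseCols = new ArrayList<>();
        for (String v : taggedValues) {
            if (v.startsWith(FILE_TAG)) {
                fileCols.add(v.substring(FILE_TAG.length()));
            } else if (v.startsWith(HBASE_TAG)) {
                hbaseCols.add(v.substring(HBASE_TAG.length()));
            }
        }
        if (fileCols.isEmpty() || hbaseCols.isEmpty()) {
            return null;
        }
        return key + "\t" + String.join(",", hbaseCols) + "\t" + String.join(",", fileCols);
    }

    /** Simulates the shuffle: groups (key, taggedValue) pairs by key, then joins each group. */
    static Map<String, String> run(List<String[]> mapOutput) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] kv : mapOutput) {
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        Map<String, String> joined = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            String line = joinValues(e.getKey(), e.getValue());
            if (line != null) {
                joined.put(e.getKey(), line);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String[]> mapOutput = new ArrayList<>();
        mapOutput.add(new String[] {"row1", tag(FILE_TAG, "fileValue1")});
        mapOutput.add(new String[] {"row1", tag(HBASE_TAG, "hbaseValue1")});
        mapOutput.add(new String[] {"row2", tag(FILE_TAG, "fileValue2")}); // no HBase match
        for (String line : run(mapOutput).values()) {
            System.out.println(line); // only row1 has both sides, so one line is printed
        }
    }
}
```

In the real job, the logic of joinValues would go inside Reduce.reduce, with Map and TableMap writing tagged values instead of raw ones. Whether keys that appear on only one side should be dropped, as here, or kept depends on the data.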

