Hmm, I know it's possible to write to multiple tables using a single job with map-only tasks, but I couldn't really find anyone else doing it this way, so I'm writing it up to try it out.
Mapper1:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

static class Mapper1 extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    // the output key carries the name of the table this mapper writes to
    private ImmutableBytesWritable tbl1 = new ImmutableBytesWritable(Bytes.toBytes("table1"));

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (/* your condition */) {
            byte[] rowKey = Bytes.toBytes(/* row key */);
            Put p = new Put(rowKey);
            p.addColumn(Bytes.toBytes(/* column family */), Bytes.toBytes(/* column */), Bytes.toBytes(/* value */));
            // the rest of the processing is omitted
            context.write(tbl1, p);
        }
    }
}

Then Mapper2, which looks the same:

static class Mapper2 extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    // same idea, but targeting the second table
    private ImmutableBytesWritable tbl2 = new ImmutableBytesWritable(Bytes.toBytes("table2"));

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (/* your condition */) {
            byte[] rowKey = Bytes.toBytes(/* row key */);
            Put p = new Put(rowKey);
            p.addColumn(Bytes.toBytes(/* column family */), Bytes.toBytes(/* column */), Bytes.toBytes(/* value */));
            // the rest of the processing is omitted
            context.write(tbl2, p);
        }
    }
}

The Driver class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;

static class HBaseDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), getClass().getSimpleName());
        job.setJarByClass(getClass());
        Configuration conf = job.getConfiguration();
        conf.set("hbase.zookeeper.quorum", "master");
        // where HBase stores its data; matches the setting in hbase-site.xml
        conf.set("hbase.rootdir", "hdfs://master:9000/hbase");
        // other conf settings omitted

        Path path1 = new Path(args[0]);
        Path path2 = new Path(args[1]);

        // each input path gets its own mapper
        MultipleInputs.addInputPath(job, path1, TextInputFormat.class, Mapper1.class);
        MultipleInputs.addInputPath(job, path2, TextInputFormat.class, Mapper2.class);

        job.setOutputFormatClass(MultiTableOutputFormat.class);
        job.setNumReduceTasks(0);  // map-only job: the Puts go straight to HBase

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(HBaseConfiguration.create(), new HBaseDriver(), args);
        System.exit(exitCode);
    }
}

The line job.setOutputFormatClass(MultiTableOutputFormat.class) is what enables writing to multiple tables: the map output key becomes an ImmutableBytesWritable holding the name of the destination table, and the value is the Put to apply to that table. By contrast, job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "tableName") can only write to a single table.
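For comparison, the single-table wiring would look roughly like this (a sketch; "mytable" is a placeholder name):

import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;

// single-table alternative: every Put lands in one fixed table
job.setOutputFormatClass(TableOutputFormat.class);
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytable");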
Of course, if you also want the table names to be dynamic, that works too.
In the Driver class, add:

conf.set("table1",args[2]);
conf.set("table2",args[3]);

These two lines must come early, before the MultipleInputs.addInputPath calls.
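In context, the relevant part of run() would then look something like this (the args[2] and args[3] indices follow the two input paths already taken from the command line):

Configuration conf = job.getConfiguration();
conf.set("table1", args[2]);  // destination table names passed as extra arguments
conf.set("table2", args[3]);
MultipleInputs.addInputPath(job, path1, TextInputFormat.class, Mapper1.class);
MultipleInputs.addInputPath(job, path2, TextInputFormat.class, Mapper2.class);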
Then add the following to the Mapper class:

@Override
protected void setup(Context context) {
    tbl1 = new ImmutableBytesWritable(Bytes.toBytes(context.getConfiguration().get("table1")));
}
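Note that setup() runs once per map task before any map() calls, so the table name is read from the configuration a single time instead of once per record.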

Similarly, other values such as the column family and column names can be passed in the same way.
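Putting it together, a fully parameterized mapper might look like the sketch below. The configuration keys ("table1", "cf1", "col1") and the use of the input line as both row key and cell value are illustrative assumptions, not fixed names:

static class ConfigurableMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private ImmutableBytesWritable table;
    private byte[] family;
    private byte[] qualifier;

    @Override
    protected void setup(Context context) {
        Configuration conf = context.getConfiguration();
        // "table1", "cf1" and "col1" are hypothetical keys set in the Driver
        table = new ImmutableBytesWritable(Bytes.toBytes(conf.get("table1")));
        family = Bytes.toBytes(conf.get("cf1"));
        qualifier = Bytes.toBytes(conf.get("col1"));
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // illustrative only: use the input line as both row key and cell value
        Put p = new Put(Bytes.toBytes(value.toString()));
        p.addColumn(family, qualifier, Bytes.toBytes(value.toString()));
        context.write(table, p);
    }
}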
