This article describes how to store the output of a Hadoop MapReduce job in MongoDB instead of HDFS. It should be a useful reference for anyone facing the same problem, so follow along below.

Problem description

I have a MapReduce application that processes data from HDFS and stores its output in HDFS. However, I now need to store the output data in MongoDB instead of writing it to HDFS.

Can anyone let me know how to do it?

Thank you



MAPPER CLASS

package com.mapReduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FMapper extends Mapper<LongWritable, Text, Text, Text> {
    private String pART;
    private String actual;
    private String fdate;

    public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
        String tempString = ivalue.toString();
        String[] data = tempString.split(",");
        pART = data[1];
        try {
            fdate = convertyymmdd(data[0]);
            /** IF ACTUAL IS LAST HEADER
             * actual = data[2];
             * */
            actual = data[data.length - 1];
            context.write(new Text(pART), new Text(fdate + "," + actual + "," + dynamicVariables(data)));
        } catch (ArrayIndexOutOfBoundsException ae) {
            System.err.println(ae.getMessage());
        }
    }

    public static String convertyymmdd(String date) {
        String dateInString = null;
        String data[] = date.split("/");
        String month = data[0];
        String day = data[1];
        String year = data[2];
        dateInString = year + "/" + month + "/" + day;
        System.out.println(dateInString);
        return dateInString;
    }

    public static String dynamicVariables(String[] data) {
        StringBuilder str = new StringBuilder();
        boolean isfirst = true;
        /** IF ACTUAL IS LAST HEADER
         * for(int i=3;i<data.length;i++){ */
        for (int i = 2; i < data.length - 1; i++) {
            if (isfirst) {
                str.append(data[i]);
                isfirst = false;
            } else {
                str.append("," + data[i]);
            }
        }
        return str.toString();
    }
}

REDUCER CLASS

package com.mapReduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import javax.faces.bean.ApplicationScoped;
import javax.faces.bean.ManagedBean;
import javax.faces.bean.ManagedProperty;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.ihub.bo.ForcastBO;
import com.ihub.service.ForcastService;

public class FReducer extends Reducer<Text, Text, Text, Text> {
    private String pART;
    private List<ForcastBO> list = null;
    private List<List<String>> listOfList = null;
    private List<String> vals = null;
    private static List<ForcastBO> forcastBos = new ArrayList<ForcastBO>();

    @Override
    public void reduce(Text _key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        pART = _key.toString();
        // process values
        for (Text val : values) {
            String tempString = val.toString();
            String[] data = tempString.split(",");
            ForcastBO fb = new ForcastBO();
            fb.setPart(pART);
            fb.setDate(data[0]);
            fb.setActual(data[1]);
            fb.setW0(data[2]);
            fb.setW1(data[3]);
            fb.setW2(data[4]);
            fb.setW3(data[5]);
            fb.setW4(data[6]);
            fb.setW5(data[7]);
            fb.setW6(data[8]);
            fb.setW7(data[9]);
            try {
                list.add(fb);
            } catch (Exception ae) {
                System.out.println(ae.getStackTrace() + "****" + ae.getMessage() + "*****" + ae.getLocalizedMessage());
            }
        }
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            while (context.nextKey()) {
                listOfList = new ArrayList<List<String>>();
                list = new ArrayList<ForcastBO>();
                reduce(context.getCurrentKey(), context.getValues(), context);
                files_WE(listOfList, list, context);
            }
        } finally {
            cleanup(context);
        }
    }

    public void files_WE(List<List<String>> listOfList, List<ForcastBO> list, Context context) {

        Collections.sort(list);

        try {
            setData(listOfList, list);

            Collections.sort(listOfList, new Comparator<List<String>>() {
                @Override
                public int compare(List<String> o1, List<String> o2) {
                    return o1.get(0).compareTo(o2.get(0));
                }
            });

            for (int i = listOfList.size() - 1; i > -1; i--) {
                List<String> list1 = listOfList.get(i);
                int k = 1;
                for (int j = 3; j < list1.size(); j++) {
                    try {
                        list1.set(j, listOfList.get(i - k).get(j));
                    } catch (Exception ex) {
                        list1.set(j, null);
                    }
                    k++;
                }
            }
        } catch (Exception e) {
            //e.getLocalizedMessage();
        }

        for (List<String> ls : listOfList) {
            System.out.println(ls.get(0));
            ForcastBO forcastBO = new ForcastBO();
            try {
                forcastBO.setPart(ls.get(0));
                forcastBO.setDate(ls.get(1));
                forcastBO.setActual(ls.get(2));
                forcastBO.setW0(ls.get(3));
                forcastBO.setW1(ls.get(4));
                forcastBO.setW2(ls.get(5));
                forcastBO.setW3(ls.get(6));
                forcastBO.setW4(ls.get(7));
                forcastBO.setW5(ls.get(8));
                forcastBO.setW6(ls.get(9));
                forcastBO.setW7(ls.get(10));
                forcastBos.add(forcastBO);
            } catch (Exception e) {
                forcastBos.add(forcastBO);
            }
            try {
                System.out.println(forcastBO);
                //service.setForcastBOs(forcastBos);
            } catch (Exception e) {
                System.out.println("FB::::" + e.getStackTrace());
            }
        }
    }

    public void setData(List<List<String>> listOfList, List<ForcastBO> list) {
        List<List<String>> temListOfList = new ArrayList<List<String>>();
        for (ForcastBO str : list) {
            vals = new ArrayList<String>();
            vals.add(str.getPart());
            vals.add(str.getDate());
            vals.add(str.getActual());
            vals.add(str.getW0());
            vals.add(str.getW1());
            vals.add(str.getW2());
            vals.add(str.getW3());
            vals.add(str.getW4());
            vals.add(str.getW5());
            vals.add(str.getW6());
            vals.add(str.getW7());
            temListOfList.add(vals);
        }

        Collections.sort(temListOfList, new Comparator<List<String>>() {
            @Override
            public int compare(List<String> o1, List<String> o2) {
                return o1.get(1).compareTo(o2.get(1));
            }
        });

        for (List<String> ls : temListOfList) {
            System.out.println(ls);
            listOfList.add(ls);
        }
    }

    public static List<ForcastBO> getForcastBos() {
        return forcastBos;
    }
}

DRIVER CLASS

package com.mapReduce;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(MRDriver.class);
        // TODO: specify a mapper
        job.setMapperClass(FMapper.class);
        // TODO: specify a reducer
        job.setReducerClass(FReducer.class);

        // TODO: specify output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // TODO: delete temp file
        FileSystem hdfs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);
        Path workingDir = hdfs.getWorkingDirectory();

        Path newFolderPath = new Path("/sd1");
        newFolderPath = Path.mergePaths(workingDir, newFolderPath);
        if (hdfs.exists(newFolderPath)) {
            hdfs.delete(newFolderPath); // Delete existing Directory
        }

        // TODO: specify input and output DIRECTORIES (not files)
        FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/Forcast/SampleData"));
        FileOutputFormat.setOutputPath(job, newFolderPath);

        if (!job.waitForCompletion(true))
            return;
    }
}


Solution

Basically, what you need to change is the "output format class", and there are several ways to do that:

1. Use the MongoDB Connector for Hadoop (http://docs.mongodb.org/ecosystem/tools/hadoop/) instead of FileOutputFormat. A driver configuration sketch follows below.

2. Implement your own OutputFormat (https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/OutputFormat.html) that writes to MongoDB, again instead of using FileOutputFormat.

3. Execute MongoDB queries inside the reducer instead of writing to the MapReduce context (not nice: depending on the OutputFormat specified in the driver, you can end up with empty output files in HDFS). A sketch of this approach is also shown below.

In my opinion, option 1 is the best choice, but I haven't used the MongoDB connector myself, so I can't say whether it is stable and functional enough. Option 2 requires that you really understand how Hadoop works under the hood, to avoid ending up with a lot of open connections and with problems around transactions and Hadoop task retries.
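
For option 1, only the driver changes: the mapper and reducer stay the same, but the output format is switched from FileOutputFormat to the connector's MongoOutputFormat. The following is a minimal sketch, assuming the mongo-hadoop connector jar is on the job's classpath; the MongoDB URI, the database name (forecastdb) and the collection name (forecasts) are placeholders, and depending on the connector version you may need to emit BSONWritable values from the reducer rather than plain Text.

package com.mapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MRDriverMongo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Point the connector at the target collection (URI is a placeholder).
        MongoConfigUtil.setOutputURI(conf, "mongodb://localhost:27017/forecastdb.forecasts");

        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(MRDriverMongo.class);
        job.setMapperClass(FMapper.class);
        job.setReducerClass(FReducer.class);

        // Key/value types emitted by the reducer stay the same...
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // ...but records now go to MongoDB instead of an HDFS directory,
        // so FileOutputFormat.setOutputPath() is no longer needed.
        job.setOutputFormatClass(MongoOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/Forcast/SampleData"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

With this output format the HDFS output-directory handling (and its delete-before-run cleanup) in the original driver can be dropped entirely.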
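
For option 3, the reducer talks to MongoDB directly through the MongoDB Java driver and writes nothing to the MapReduce context. Below is a minimal sketch under those assumptions; the FMongoReducer class, the host, database and collection names are all hypothetical, and only the part, date and actual fields are stored to keep the example short. Note the drawback mentioned above: every reduce task opens its own connection, and a failed and retried task will insert its documents twice.

package com.mapReduce;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.Document;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;

public class FMongoReducer extends Reducer<Text, Text, NullWritable, NullWritable> {

    private MongoClient client;
    private MongoCollection<Document> collection;

    @Override
    protected void setup(Context context) {
        // One client per reduce task; host, database and collection are placeholders.
        client = new MongoClient("localhost", 27017);
        collection = client.getDatabase("forecastdb").getCollection("forecasts");
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            String[] data = val.toString().split(",");
            // Insert one document per value; nothing is written to the context,
            // so the HDFS output directory configured in the driver stays empty.
            Document doc = new Document("part", key.toString())
                    .append("date", data[0])
                    .append("actual", data[1]);
            collection.insertOne(doc);
        }
    }

    @Override
    protected void cleanup(Context context) {
        client.close();
    }
}

In the driver you would pair this with something like org.apache.hadoop.mapreduce.lib.output.NullOutputFormat so the job does not also create empty part files in HDFS.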



That concludes this article on storing MapReduce output data in MongoDB instead of HDFS. We hope the answer recommended here helps, and thank you for your continued support!
