问题描述
我有一个 MapReduce 应用程序,它处理来自 HDFS 的数据,并将输出数据存储在 HDFS 中。但是,现在我需要将输出数据存储到 MongoDB 中,而不是存储到 HDFS 中。
有谁能告诉我该怎么做吗?
谢谢
MAPPER CLASS
package com.mapReduce;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FMapper扩展了Mapper< LongWritable,Text,Text,Text> {
private String pART;
private String actual;
private String fdate;
public void map(LongWritable ikey,Text ivalue,Context context)throws IOException,InterruptedException {
String tempString = ivalue.toString();
String [] data = tempString.split(,);
pART = data [1];
尝试{
fdate = convertyymmdd(data [0]);
/ **如果实际是最后的抬头
* actual = data [2];
* * /
actual = data [data.length-1];
context.write(new Text(pART),new Text(fdate +,+ actual +,+ dynamicVariables(data)));
} catch(ArrayIndexOutOfBoundsException ae){
System.err.println(ae.getMessage());
$ b public static String convertyymmdd(String date){
String dateInString = null;
String data [] = date.split(/);
String month = data [0];
String day = data [1];
字符串年=数据[2];
dateInString = year +/+ month +/+ day;
System.out.println(dateInString);
return dateInString;
public static String dynamicVariables(String [] data){
StringBuilder str = new StringBuilder();
boolean isfirst = true; (int i = 2; i< data)时,
/ **如果实际为最后的头部
*(int i = 3; i< data.length; i ++){* /
。如果(isfirst){
str.append(data [i]);
isfirst = false;
}
else
str.append(,+ data [i]);
}
return str.toString();
}
}
REDUCER CLASS
package com.mapReduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import javax.faces.bean.ApplicationScoped;
import javax.faces.bean.ManagedBean;
import javax.faces.bean.ManagedProperty;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.ihub.bo.ForcastBO;
import com.ihub.service.ForcastService;
公共类FReducer扩展了Reducer<文本,文本,文本,文本> {
private String pART;
私人列表< ForcastBO> list = null;
私人清单<清单< String>> listOfList = null;
私人列表< String> vals = null;
private static List< ForcastBO> forcastBos = new ArrayList< ForcastBO>();
$ b @Override
public void reduce(Text _key,Iterable< Text> values,Context context)throws IOException,InterruptedException {
// TODO自动生成的方法存根
pART = _key.toString();
//过程值
(Text val:values){
String tempString = val.toString();
String [] data = tempString.split(,);
ForcastBO fb = new ForcastBO();
fb.setPart(pART);
fb.setDate(data [0]);
fb.setActual(data [1]);
fb.setW0(data [2]);
fb.setW1(data [3]);
fb.setW2(data [4]);
fb.setW3(data [5]);
fb.setW4(data [6]);
fb.setW5(data [7]);
fb.setW6(data [8]);
fb.setW7(data [9]);
尝试{
list.add(fb);
} catch(Exception ae){
System.out.println(ae.getStackTrace()+****+ ae.getMessage()+*****+ ae。 getLocalizedMessage());
@Override
public void run(Context context)抛出IOException,InterruptedException {
setup(context);
while(context.nextKey()){
listOfList = new ArrayList< List< String>>();
list = new ArrayList< ForcastBO>();
reduce(context.getCurrentKey(),context.getValues(),context);
files_WE(listOfList,list,context);
}
}最后{
cleanup(context);
$ b public void files_WE(List< List< String>> listOfList,List< ForcastBO> list,Context context){
Collections.sort(list);
尝试{
setData(listOfList,list);
$ b Collection.sort(listOfList,new Comparator< List< String>>(){
@Override
public int compare(List< String> o1,List< String> o2){
return o1.get(0).compareTo(o2.get(0));
}
});
for(int i = listOfList.size() - 1; i> -1; i--){
List< String> list1 = listOfList.get(i);
int k = 1;
for(int j = 3; j< list1.size(); j ++){
try {
list1.set(j,listOfList.get(i - k).get j)条);
} catch(Exception ex){
list1.set(j,null);
}
k ++;
}
}
} catch(Exception e){
//e.getLocalizedMessage();
(List< String> ls:listOfList){
System.out.println(ls.get(0));
ForcastBO forcastBO = new ForcastBO();
尝试{
forcastBO.setPart(ls.get(0));
forcastBO.setDate(ls.get(1));
forcastBO.setActual(ls.get(2));
forcastBO.setW0(ls.get(3));
forcastBO.setW1(ls.get(4));
forcastBO.setW2(ls.get(5));
forcastBO.setW3(ls.get(6));
forcastBO.setW4(ls.get(7));
forcastBO.setW5(ls.get(8));
forcastBO.setW6(ls.get(9));
forcastBO.setW7(ls.get(10));
forcastBos.add(forcastBO);
} catch(Exception e){
forcastBos.add(forcastBO);
}
尝试{
System.out.println(forcastBO);
//service.setForcastBOs(forBoostBos);
} catch(Exception e){
System.out.println(FB ::::+ e.getStackTrace());
$ b public void setData(List< List< String>> ; listOfList,List< ForcastBO>列表){
List< List< String>> temListOfList = new ArrayList< List< String>>(); (ForcastBO str:list)
{
vals = new ArrayList< String>();
vals.add(str.getPart());
vals.add(str.getDate());
vals.add(str.getActual());
vals.add(str.getW0());
vals.add(str.getW1());
vals.add(str.getW2());
vals.add(str.getW3());
vals.add(str.getW4());
vals.add(str.getW5());
vals.add(str.getW6());
vals.add(str.getW7());
temListOfList.add(vals);
Collections.sort(temListOfList,new Comparator< List< String>>(){
@Override
public int compare(List< ; String> o1,List< String> o2){
return o1.get(1).compareTo(o2.get(1));
}
}); (List< String> ls:temListOfList){
System.out.println(ls);
listOfList.add(ls);
}
}
public static List< ForcastBO> getForcastBos(){
return forcastBos;
}
}
DRIVER CLASS
package com.mapReduce;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MRDriver {
public static void main(String [] args)throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,JobName);
job.setJarByClass(MRDriver.class);
// TODO:指定映射器
job.setMapperClass(FMapper.class);
// TODO:指定一个reducer
job.setReducerClass(FReducer.class);
// TODO:指定输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
$ b // TODO:删除临时文件
FileSystem hdfs = FileSystem.get(new URI(hdfs:// localhost:9000),
conf);
Path workingDir = hdfs.getWorkingDirectory();
路径newFolderPath = new Path(/ sd1);
newFolderPath = Path.mergePaths(workingDir,newFolderPath);
if(hdfs.exists(newFolderPath))
{
hdfs.delete(newFolderPath); //删除现有目录
$ b // TODO:指定输入和输出DIRECTORIES(不是文件)
FileInputFormat.setInputPaths(job,new Path(hdfs ://本地主机:9000 /推算未来/的sampleData));
FileOutputFormat.setOutputPath(job,newFolderPath);
if(!job.waitForCompletion(true))
return;
}
}
基本上,您需要做的是更改"输出格式类"(output format class),有以下几种方式:
- 使用 MongoDB Connector for Hadoop。
- 实现您自己的 OutputFormat(而不是使用 FileOutputFormat)。
- 在 reducer 中直接执行 MongoDB 查询,而不是写入 MapReduce 上下文(不太好:根据驱动程序中指定的 OutputFormat,最终可能会在 HDFS 中留下空的输出文件)。
在我看来,选项 1 是最好的选择,但我没有使用过 MongoDB Connector,无法判断它是否足够稳定、功能是否完善。选项 2 要求您真正理解 Hadoop 的底层工作原理,以避免出现大量未关闭的连接,以及事务和 Hadoop 任务重试方面的问题。
I have a MapReduce application that processes data from HDFS and stores its output data in HDFS,
but now I need to store the output data in MongoDB instead of storing it in HDFS.
Can anyone let me know how to do it?
Thank you
MAPPER CLASS
package com.mapReduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper that re-keys each comma-separated input record by its part identifier.
 *
 * <p>For a record {@code date,part,v2,...,vN-1,actual} (date written as
 * {@code month/day/year}) it emits key {@code part} and value
 * {@code year/month/day,actual,v2,...,vN-1}.
 *
 * <p>Records with too few columns, or with a date that does not have three
 * {@code /}-separated components, are reported on stderr and skipped.
 */
public class FMapper extends Mapper<LongWritable, Text, Text, Text> {

    /**
     * Parses one input line and writes the re-keyed record to the context.
     *
     * @param ikey    input key supplied by the input format (not used)
     * @param ivalue  one comma-separated input line
     * @param context Hadoop context that receives the (part, value) pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void map(LongWritable ikey, Text ivalue, Context context)
            throws IOException, InterruptedException {
        String[] data = ivalue.toString().split(",");
        try {
            // Every column access sits inside the try so that a short record is
            // skipped instead of failing the whole task.
            String part = data[1];
            String fdate = convertyymmdd(data[0]);
            // ACTUAL is the last column. If it were the third column instead,
            // this would be data[2] and dynamicVariables would start at index 3.
            String actual = data[data.length - 1];
            context.write(new Text(part),
                    new Text(fdate + "," + actual + "," + dynamicVariables(data)));
        } catch (ArrayIndexOutOfBoundsException ae) {
            System.err.println("Skipping malformed record '" + ivalue + "': " + ae.getMessage());
        }
    }

    /**
     * Reorders a {@code month/day/year} date into {@code year/month/day}.
     *
     * @param date date string with three {@code /}-separated components
     * @return the components reordered as {@code year/month/day}
     * @throws ArrayIndexOutOfBoundsException if {@code date} has fewer than three components
     */
    public static String convertyymmdd(String date) {
        String[] parts = date.split("/");
        String month = parts[0];
        String day = parts[1];
        String year = parts[2];
        return year + "/" + month + "/" + day;
    }

    /**
     * Joins the columns between the part column and the last column with commas.
     *
     * @param data all columns of one record
     * @return columns {@code data[2]} .. {@code data[data.length - 2]} joined by
     *         {@code ","}; the empty string when there are no such columns
     */
    public static String dynamicVariables(String[] data) {
        StringBuilder joined = new StringBuilder();
        for (int i = 2; i < data.length - 1; i++) {
            if (i > 2) {
                joined.append(',');
            }
            joined.append(data[i]);
        }
        return joined.toString();
    }
}
REDUCER CLASS
package com.mapReduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import javax.faces.bean.ApplicationScoped;
import javax.faces.bean.ManagedBean;
import javax.faces.bean.ManagedProperty;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.ihub.bo.ForcastBO;
import com.ihub.service.ForcastService;

/**
 * Reducer that gathers all values of one part key into {@link ForcastBO} rows,
 * orders them by date, shifts the forecast columns diagonally across the rows,
 * and appends the resulting objects to a static list.
 *
 * <p>NOTE(review): this class never calls {@code context.write}, so the job's
 * output files will contain no records; the processed rows are only printed to
 * stdout and kept in {@link #getForcastBos()}, which grows for the lifetime of
 * the reducer JVM. To persist the rows, write them to the context.
 */
public class FReducer extends Reducer<Text, Text, Text, Text> {

    /** Fields expected in each value: date, actual, W0..W7. */
    private static final int EXPECTED_FIELDS = 10;

    /** Index of the first forecast column (W0) in a row built by {@link #setData}. */
    private static final int FIRST_FORECAST_COLUMN = 3;

    /** Rows collected by {@link #reduce} for the key currently being processed. */
    private List<ForcastBO> list = null;

    /** String form of {@link #list}, filled by {@link #setData} for the current key. */
    private List<List<String>> listOfList = null;

    /** All processed rows of every key handled by this JVM. */
    private static final List<ForcastBO> forcastBos = new ArrayList<ForcastBO>();

    /**
     * Converts every value of the key into a {@link ForcastBO} and stores it in
     * the per-key list. Values with fewer than {@value #EXPECTED_FIELDS} fields
     * are reported on stderr and skipped, matching how the mapper treats
     * malformed records.
     *
     * @param _key    the part identifier
     * @param values  values of the form {@code date,actual,W0,...,W7}
     * @param context Hadoop context (not written to)
     */
    @Override
    public void reduce(Text _key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String part = _key.toString();
        if (list == null) {
            // reduce() was invoked without going through run(); start a fresh batch.
            list = new ArrayList<ForcastBO>();
        }
        for (Text val : values) {
            String[] data = val.toString().split(",");
            if (data.length < EXPECTED_FIELDS) {
                System.err.println("Skipping record for part " + part + ": expected at least "
                        + EXPECTED_FIELDS + " fields but got " + data.length);
                continue;
            }
            ForcastBO fb = new ForcastBO();
            fb.setPart(part);
            fb.setDate(data[0]);
            fb.setActual(data[1]);
            fb.setW0(data[2]);
            fb.setW1(data[3]);
            fb.setW2(data[4]);
            fb.setW3(data[5]);
            fb.setW4(data[6]);
            fb.setW5(data[7]);
            fb.setW6(data[8]);
            fb.setW7(data[9]);
            list.add(fb);
        }
    }

    /**
     * Drives the reducer: for every key, resets the per-key buffers, calls
     * {@link #reduce} and then post-processes the collected rows with
     * {@link #files_WE}. {@code cleanup} always runs, even on failure.
     */
    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            while (context.nextKey()) {
                listOfList = new ArrayList<List<String>>();
                list = new ArrayList<ForcastBO>();
                reduce(context.getCurrentKey(), context.getValues(), context);
                files_WE(listOfList, list, context);
            }
        } finally {
            cleanup(context);
        }
    }

    /**
     * Post-processes the rows of one key: sorts them, shifts the forecast
     * columns, then converts every row back into a {@link ForcastBO}, prints it
     * and appends it to the static result list.
     *
     * @param listOfList receives the string rows (appended to, then modified in place)
     * @param list       the rows collected for the key; sorted in place
     * @param context    Hadoop context (not used)
     */
    public void files_WE(List<List<String>> listOfList, List<ForcastBO> list, Context context) {
        Collections.sort(list);
        try {
            setData(listOfList, list);
            Collections.sort(listOfList, byColumn(0));
            shiftForecastColumns(listOfList);
        } catch (RuntimeException e) {
            // Best effort: keep going with whatever rows exist, but do not hide the failure.
            System.err.println("Failed to prepare rows: " + e);
        }

        for (List<String> row : listOfList) {
            System.out.println(row.get(0));
            ForcastBO forcastBO = new ForcastBO();
            try {
                forcastBO.setPart(row.get(0));
                forcastBO.setDate(row.get(1));
                forcastBO.setActual(row.get(2));
                forcastBO.setW0(row.get(3));
                forcastBO.setW1(row.get(4));
                forcastBO.setW2(row.get(5));
                forcastBO.setW3(row.get(6));
                forcastBO.setW4(row.get(7));
                forcastBO.setW5(row.get(8));
                forcastBO.setW6(row.get(9));
                forcastBO.setW7(row.get(10));
            } catch (RuntimeException e) {
                // A partially populated object is still recorded, as before.
                System.err.println("Row only partially converted: " + e);
            }
            forcastBos.add(forcastBO);
            System.out.println(forcastBO);
        }
    }

    /**
     * Replaces, for each row {@code i} (last to first) and each forecast column
     * {@code j}, the cell with the value of the same column in row
     * {@code i - (j - 2)}, or with {@code null} when that row does not exist.
     * Iterating from the last row means every source row is still unmodified
     * when it is read.
     */
    private static void shiftForecastColumns(List<List<String>> rows) {
        for (int i = rows.size() - 1; i >= 0; i--) {
            List<String> row = rows.get(i);
            int k = 1;
            for (int j = FIRST_FORECAST_COLUMN; j < row.size(); j++, k++) {
                String shifted = null;
                int sourceIndex = i - k;
                if (sourceIndex >= 0) {
                    List<String> source = rows.get(sourceIndex);
                    if (j < source.size()) {
                        shifted = source.get(j);
                    }
                }
                row.set(j, shifted);
            }
        }
    }

    /** Returns a comparator that orders rows by the string in the given column. */
    private static Comparator<List<String>> byColumn(final int column) {
        return new Comparator<List<String>>() {
            @Override
            public int compare(List<String> o1, List<String> o2) {
                return o1.get(column).compareTo(o2.get(column));
            }
        };
    }

    /**
     * Converts each {@link ForcastBO} into a row of strings (part, date,
     * actual, W0..W7), sorts the rows by the date column, prints them and
     * appends them to {@code listOfList}.
     *
     * @param listOfList destination list the sorted rows are appended to
     * @param list       objects to convert
     */
    public void setData(List<List<String>> listOfList, List<ForcastBO> list) {
        List<List<String>> rows = new ArrayList<List<String>>(list.size());
        for (ForcastBO bo : list) {
            List<String> row = new ArrayList<String>(11);
            row.add(bo.getPart());
            row.add(bo.getDate());
            row.add(bo.getActual());
            row.add(bo.getW0());
            row.add(bo.getW1());
            row.add(bo.getW2());
            row.add(bo.getW3());
            row.add(bo.getW4());
            row.add(bo.getW5());
            row.add(bo.getW6());
            row.add(bo.getW7());
            rows.add(row);
        }
        Collections.sort(rows, byColumn(1));
        for (List<String> row : rows) {
            System.out.println(row);
            listOfList.add(row);
        }
    }

    /**
     * @return the live static list of every row processed so far in this JVM
     */
    public static List<ForcastBO> getForcastBos() {
        return forcastBos;
    }
}
DRIVER CLASS
package com.mapReduce;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver that configures and runs the forecast MapReduce job
 * ({@link FMapper} and {@link FReducer}) against a local HDFS instance.
 */
public class MRDriver {

    /** URI of the HDFS name node used to manage the output directory. */
    private static final String HDFS_URI = "hdfs://localhost:9000";

    /** Input directory read by the job. */
    private static final String INPUT_PATH = "hdfs://localhost:9000/Forcast/SampleData";

    /** Output folder, resolved relative to the HDFS working directory. */
    private static final String OUTPUT_FOLDER = "/sd1";

    /**
     * Configures the job, deletes a pre-existing output directory, runs the job
     * and waits for it to finish. Exits the JVM with status 1 when the job
     * fails, so that calling scripts can detect the failure.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job setup, HDFS access or job execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(MRDriver.class);
        job.setMapperClass(FMapper.class);
        job.setReducerClass(FReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // The output directory must not exist when the job starts, so remove
        // the one left behind by a previous run.
        FileSystem hdfs = FileSystem.get(new URI(HDFS_URI), conf);
        Path outputDir = Path.mergePaths(hdfs.getWorkingDirectory(), new Path(OUTPUT_FOLDER));
        if (hdfs.exists(outputDir)) {
            hdfs.delete(outputDir, true); // recursive delete of the existing directory
        }

        // Input and output are DIRECTORIES, not files.
        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, outputDir);

        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}
解决方案
Basically, what you need is to change the "output format class", and there are several ways to do that:
- Use MongoDB Connector for Hadoop: http://docs.mongodb.org/ecosystem/tools/hadoop/?_ga=1.111209414.370990604.1441913822
- Implement your own OutputFormat: https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/OutputFormat.html (instead of using FileOutputFormat).
- Execute MongoDB queries inside the reducer instead of writing to the MapReduce context (not nice: you could end up with empty output files in HDFS, depending on the OutputFormat specified in the driver).
In my opinion, option 1 is the best option, but I haven't used the MongoDB connector, so I can't say whether it is stable and functional enough. Option 2 requires that you really understand how Hadoop works under the hood, to avoid ending up with a lot of open connections and problems with transactions and Hadoop task retries.
这篇关于如何使用mongoDB中的mapReduce作为输出存储hdfs中的处理数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!