This article explains how to read data from HBase with Spark. It should serve as a useful reference for anyone trying to solve the same problem.

Problem Description


The code below reads from HBase, converts each row to a JSON structure, and then converts that to a SchemaRDD. The problem is that I use a List to store the JSON strings before passing them to a JavaRDD, so for roughly 100 GB of data everything gets loaded into the driver's memory. What is the right way to load the data from HBase, perform the manipulation, and then convert it to a JavaRDD?

package hbase_reader;


import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;

import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;

import com.google.common.collect.Lists;

public class hbase_reader {

    public static void main(String[] args) throws IOException, ParseException {

        List<String> jars = Lists.newArrayList("");

        SparkConf spconf = new SparkConf();
        spconf.setMaster("local[2]");
        spconf.setAppName("HBase");
        //spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
        spconf.setJars(jars.toArray(new String[jars.size()]));
        JavaSparkContext sc = new JavaSparkContext(spconf);
        //spconf.set("spark.executor.memory", "1g");

        JavaSQLContext jsql = new JavaSQLContext(sc);


        HBaseConfiguration conf = new HBaseConfiguration();
        String tableName = "HBase.CounData1_Raw_Min1";
        HTable table = new HTable(conf,tableName);
        try {

            ResultScanner scanner = table.getScanner(new Scan());
            List<String> jsonList = new ArrayList<String>();

            String json = null;

            for(Result rowResult:scanner) {
                json = "";
                String rowKey  = Bytes.toString(rowResult.getRow());
                for(byte[] s1:rowResult.getMap().keySet()) {
                    String s1_str = Bytes.toString(s1);

                    String jsonSame = "";
                    for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
                        String s2_str = Bytes.toString(s2);
                        for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
                            String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
                            jsonSame += "\""+s2_str+"\":"+s3_str+",";
                        }
                    }
                    jsonSame = jsonSame.substring(0,jsonSame.length()-1);
                    json += "\""+s1_str+"\""+":{"+jsonSame+"}"+",";
                }
                json = json.substring(0,json.length()-1);
                json = "{\"RowKey\":\""+rowKey+"\","+json+"}";
                jsonList.add(json);
            }

            JavaRDD<String> jsonRDD = sc.parallelize(jsonList);

            JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);




            System.out.println(schemaRDD.take(2));

        } finally {
            table.close();
        }

    }

}
Solution

A basic example of reading HBase data using Spark (Scala); you could also write the same thing in Java:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.spark._

object HBaseRead {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val tableName = "table1"

    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      admin.createTable(tableDesc)
    }

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println("Number of Records found : " + hBaseRDD.count())
    sc.stop()
  }
}
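
The question also asks how to get from the HBase rows to a SchemaRDD without first collecting everything on the driver. A minimal Scala sketch of that step is shown below, continuing from the hBaseRDD in the example above (before sc.stop()) and assuming the same Spark 1.x jsonRDD API that the question's code uses. The JSON is built inside a map over the RDD, so the per-row work happens on the executors and the full data set never has to fit in the driver's memory. Unlike the question's code, this sketch quotes every cell value as a JSON string for simplicity.

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.util.Bytes

// Build one JSON string per row on the executors instead of on the driver.
val jsonRDD = hBaseRDD.map { case (_, result) =>
  val rowKey = Bytes.toString(result.getRow)
  val families = result.getNoVersionMap.asScala.map { case (family, qualifiers) =>
    val cols = qualifiers.asScala.map { case (qualifier, value) =>
      // Every cell value is emitted as a quoted string; adapt this if your cells are numeric.
      "\"" + Bytes.toString(qualifier) + "\":\"" + Bytes.toString(value) + "\""
    }.mkString(",")
    "\"" + Bytes.toString(family) + "\":{" + cols + "}"
  }.mkString(",")
  "{\"RowKey\":\"" + rowKey + "\"," + families + "}"
}

// Spark 1.x SQL API, the Scala counterpart of the JavaSQLContext/JavaSchemaRDD used in the question.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val schemaRDD = sqlContext.jsonRDD(jsonRDD)
schemaRDD.printSchema()
println(schemaRDD.take(2).mkString("\n"))

Because the conversion is an ordinary RDD transformation, only the two rows requested by take(2) are ever brought back to the driver.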

That concludes this article on reading HBase data with Spark. We hope the recommended answer is helpful.
