本文介绍了如何使用纯Java(包括日期和十进制类型)生成Parquet文件并上传到S3 [Windows](无HDFS)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我最近有一个要求,我需要生成Parquet文件,Apache Spark只能使用Java读取(不使用其他软件安装,如:Apache Drill,Hive,Spark等)。这些文件需要保存到S3,因此我将分享有关如何执行这两项操作的详细信息。



没有简单的指导如何执行此操作。我也不是Java程序员,因此使用Maven,Hadoop等的概念对我来说都是陌生的。所以我花了将近两周的时间来完成这项工作。我想在下面分享我的个人指南,了解我如何实现这一目标

解决方案

免责声明:以下代码示例没有办法代表最佳做法,只是作为一个粗略的操作方法。



依赖关系:




  • parquet-avro(1.9.0):


  • 修改系统路径变量(不是用户变量)并在末尾添加以下内容:%HADOOP_HOME%\ bin


  • 重新启动机器的更改生效。

  • 如果此配置未正确完成,您将在运行时收到以下错误: java.lang.UnsatisfiedLinkError:org.apache.hadoop.io.nativeio.NativeIO $ Windows.access0(Ljava / lang / String; I)Z



    编码入门:




    • First创建一个新的空Maven项目并添加parquet-avro 1.9.0和hadoop-aws 2.8.2作为依赖项:

    • 创建您的主要课程可写一些代码

    • 首先,您需要生成架构。现在据我所知,你无法在运行时以编程方式生成模式。 Schema.Parser 类' parse()方法仅将文件或字符串文字作为参数,并且在创建模式后不允许您修改模式。
      为了避免这种情况,我在运行时生成我的Schema JSON并解析它。下面是一个示例Schema:

        String schema ={\namespace \:\org.myorganization。 mynamespace \,//不在Parquet中使用,可以放任何东西
      +\type \:\record \,//必须设置为记录
      +\name \:\myrecordname \,//不在Parquet中使用,可以放任何
      +\fields \:[
      + {\name \:\myInteger \,\type \:\int \},//必填字段
      +{\ name \:\myString \,\type \:[\string \,\null \]},
      +{\ name \:\myDecimal \,\type \:[{\type \:\fixed\,\size \:16, \logicalType \:\decimal \,\name \:\mydecimaltype1 \,\precision \:32,\scale \: 4},\null \]},
      +{\name \: \myDate \,\type \:[{\type \:\int \,\logicalType \:\date \} ,\null \]}
      +]};
      解析器解析器= new Schema.Parser()。setValidate(true);
      Schema avroSchema = parser.parse(schema);

      有关Avro架构的详细信息,请访问:




    为了您的方便,整个源代码:

      package com.mycompany.stackoverflow; 

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.math.RoundingMode;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.Days;
    import org.joda.time.MutableDateTime;

    公共类Main {
    public static void main(String [] args){
    System.out.println(Start);

    字符串模式={\namespace \:\org.myorganization.mynamespace \,//未在Parquet中使用,可以放任何
    + \type \:\record \,//必须设置为记录
    +\name \:\myrecordname \,// Not在Parquet中使用,可以放任何
    +\fields \:[
    +{\name \:\myInteger \,\类型\:\int \},//必填字段
    +{\name \:\myString \,\type \:[ \string \,\null \]},
    +{\name \:\myDecimal \,\type \: [{\type \:\fixed\,\size \:16,\logicalType \:\decimal \,\name \ :\mydecimaltype1 \,\precision \:32,\scale \:4},\null \]},
    +{\name \:\myDate \,\type \:[{\type \:\int \, \logicalType \:\date \},\null \]}
    +]};

    Schema.Parser解析器= new Schema.Parser()。setValidate(true);
    Schema avroSchema = parser.parse(schema);

    GenericData.Record record = new GenericData.Record(avroSchema);
    record.put(myInteger,1);
    record.put(myString,string value 1);

    BigDecimal myDecimalValue = new BigDecimal(99.9999);

    //首先我们需要确保巨大的十进制符合我们的模式比例:
    myDecimalValue = myDecimalValue.setScale(4,RoundingMode.HALF_UP);

    //接下来我们得到十进制值作为一个BigInteger(就像没有小数点)
    BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();

    //最后我们序列化整数
    byte [] decimalBytes = myUnscaledDecimalValue.toByteArray();

    //我们需要创建一个Avro'Fixed'类型并再次传递十进制模式:
    GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser()。解析({\type \:\fixed\,\size \:16,\precision \:32,\scale \:4, \ name\:\ mydecimaltype1\}));

    byte [] myDecimalBuffer = new byte [16];
    if(myDecimalBuffer.length> = decimalBytes.length){
    //因为我们将固定字节数组大小设置为16字节,我们需要
    // pad-left我们的原始值零的字节
    int myDecimalBufferIndex = myDecimalBuffer.length - 1;
    for(int i = decimalBytes.length - 1; i> = 0; i - ){
    myDecimalBuffer [myDecimalBufferIndex] = decimalBytes [i];
    myDecimalBufferIndex--;
    }

    //保存结果
    fixed.bytes(myDecimalBuffer);
    } else {
    抛出新的IllegalArgumentException(String.format(十进制大小:%d大于允许的最大值:%d,decimalBytes.length,myDecimalBuffer.length));
    }

    //我们最终可以将小数写入记录
    record.put(myDecimal,fixed);

    //获取纪元价值
    MutableDateTime epoch = new MutableDateTime(0l,DateTimeZone.UTC);

    DateTime currentDate = new DateTime(); //可以在构造函数中使用Java Date
    Days days = Days.daysBetween(epoch,currentDate);

    //我们可以将纪元以来的天数写入记录
    record.put(myDate,days.getDays());

    try {
    配置conf = new Configuration();
    conf.set(fs.s3a.access.key,ACCESSKEY);
    conf.set(fs.s3a.secret.key,SECRETKEY);
    //以下是其他一些有用的设置
    //conf.set(\"fs.s3a.endpoint,s3.amazonaws.com);
    //conf.set(\"fs.s3a.aws.credentials.provider,org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider);
    //conf.set(\"fs.hdfs.impl,org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); //除非您引用hadoop-hdfs库,否则不需要。
    //conf.set(\"fs.file.impl,org.apache.hadoop.fs.LocalFileSystem.class.getName()); //如果出现No FileSystem for scheme:file错误,请取消注释。

    Path path = new Path(s3a://your-bucket-name/examplefolder/data.parquet);

    //使用下面的路径保存到本地文件系统而不是
    // Path path = new Path(data.parquet);

    try(ParquetWriter< GenericData.Record> writer = AvroParquetWriter。< GenericData.Record> builder(path)
    .withSchema(avroSchema)
    .withCompressionCodec(CompressionCodecName.GZIP)
    .withConf(conf)
    .withPageSize(4 * 1024 * 1024)//用于压缩
    .withRowGroupSize(16 * 1024 * 1024)//用于写入缓冲(页面大小)
    .build()){

    //在我们的示例
    writer.write(record)中只写一条记录;
    }
    } catch(Exception ex){
    ex.printStackTrace(System.out);
    }
    }
    }


    I recently had a requirement where I needed to generate Parquet files that could be read by Apache Spark using only Java (Using no additional software installations such as: Apache Drill, Hive, Spark, etc.). The files needed to be saved to S3 so I will be sharing details on how to do both.

    There were no simple to follow guides on how to do this. I'm also not a Java programmer so the concepts of using Maven, Hadoop, etc. were all foreign to me. So it took me nearly two weeks to get this working. I'd like to share my personal guide below on how I achieved this

    解决方案

    Disclaimer: The code samples below in no way represent best practices and are only presented as a rough how-to.

    Dependencies:

    I'll be using NetBeans as my IDE.

    Some info regarding parquet in Java (For noobs such as me):

    • In order to serialize your data into parquet, you must choose one of the popular Java data serialization frameworks: Avro, Protocol Buffers or Thrift (I'll be using Avro (1.8.0), as can be seen from our parquet-avro dependency)
    • You will need to use an IDE that supports Maven. This is because the dependencies above have a lot of dependencies of their own. Maven will automatically download those for you (like NuGet for VisualStudio)

    Pre-requisite:

    You must have hadoop on the windows machine that will be running the Java code. The good news is you don't need to install the entire hadoop software, rather you need only two files:

    • hadoop.dll
    • winutils.exe

    These can be downloaded here. You will need version 2.8.1 for this example (due to parquet-avro 1.9.0).

    1. Copy these files to C:\hadoop-2.8.1\bin on the target machine.
    2. Add a new System Variable (not user variable) called: HADOOP_HOME with the value C:\hadoop-2.8.1

    3. Modify the System Path variable (not user variable) and add the following to the end: %HADOOP_HOME%\bin

    4. Restart the machine for changes to take affect.

    If this config was not done properly you will get the following error at run-time: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

    Getting Started with Coding:

    • First create a new empty Maven Project and add parquet-avro 1.9.0 and hadoop-aws 2.8.2 as dependencies:
    • Create your main class where you can write some code
    • First thing is you need to generate a Schema. Now as far as I can tell there is no way you can generate a schema programmatically at run-time. the Schema.Parser class' parse() method only takes a file or a string literal as a parameter and doesn't let you modify the schema once it is created.To circumvent this I am generating my Schema JSON at run time and parsing that. Below is an example Schema:

      String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything
          + "\"type\": \"record\"," //Must be set as record
          + "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything
          + "\"fields\": ["
          + " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field
          + " {\"name\": \"myString\",  \"type\": [\"string\", \"null\"]},"
          + " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]},"
          + " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}"
          + " ]}";
      Parser parser = new Schema.Parser().setValidate(true);
      Schema avroSchema = parser.parse(schema);
      

      Details on Avro schema can be found here: https://avro.apache.org/docs/1.8.0/spec.html

    • Next we can start generating records (Avro primitive types are simple):

      GenericData.Record record = new GenericData.Record(avroSchema);
      record.put("myInteger", 1);
      record.put("myString", "string value 1");
      

      • In order to generate a decimal logical type a fixed or bytes primitive type must be used as the actual data type for storage. The current Parquet format only supports Fixed length byte arrays (aka: fixed_len_byte_array). So we have to use fixed in our case as well (as can be seen in the schema). In Java we must use BigDecimal in order to truly handle decimals. And I've identified that a Decimal(32,4) will not take more than 16 bytes no matter the value. So we will use a standard byte array size of 16 in our serialization below (and in the schema above):

      BigDecimal myDecimalValue = new BigDecimal("99.9999");
      
      //First we need to make sure the BigDecimal matches our schema scale:
      myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP);
      
      //Next we get the decimal value as one BigInteger (like there was no decimal point)
      BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();
      
      //Finally we serialize the integer
      byte[] decimalBytes = myUnscaledDecimalValue.toByteArray();
      
      //We need to create an Avro 'Fixed' type and pass the decimal schema once more here:
      GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}"));
      
      byte[] myDecimalBuffer = new byte[16];
      if (myDecimalBuffer.length >= decimalBytes.length) {            
          //Because we set our fixed byte array size as 16 bytes, we need to
          //pad-left our original value's bytes with zeros
          int myDecimalBufferIndex = myDecimalBuffer.length - 1;
          for(int i = decimalBytes.length - 1; i >= 0; i--){
              myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i];
              myDecimalBufferIndex--;
          }
          //Save result
          fixed.bytes(myDecimalBuffer);
      } else {
          throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length));
      }
      
      //We can finally write our decimal to our record
      record.put("myDecimal", fixed);
      

    • For Date values, Avro specifies that we need to save the number of days since EPOCH as an integer. (If you need the time component as well, such as an actual DateTime type, you need to use the Timestamp Avro type, which I will not cover). The easiest way I found to get the number of days since epoch is using the joda-time library. If you added the hadoop-aws dependency to your project you should already have this library. If not you will need to add it yourself:

      //Get epoch value
      MutableDateTime epoch = new MutableDateTime(0l, DateTimeZone.UTC);
      
      DateTime currentDate = new DateTime(); //Can take Java Date in constructor
      Days days = Days.daysBetween(epoch, currentDate);
      
      //We can write number of days since epoch into the record
      record.put("myDate", days.getDays());
      

    • We finally can start writing our parquet file as such

      try {
         Configuration conf = new Configuration();
         conf.set("fs.s3a.access.key", "ACCESSKEY");
         conf.set("fs.s3a.secret.key", "SECRETKEY");
         //Below are some other helpful settings
         //conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
         //conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
         //conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library.
         //conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors
      
         Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet");
      
         //Use path below to save to local file system instead
         //Path path = new Path("data.parquet");
      
         try (ParquetWriter writer = AvroParquetWriter.builder(path)
                 .withSchema(avroSchema)
                 .withCompressionCodec(CompressionCodecName.GZIP)
                 .withConf(conf)
                 .withPageSize(4 * 1024 * 1024) //For compression
                 .withRowGroupSize(16 * 1024 * 1024) //For write buffering (Page size)
                 .build()) {
             //We only have one record to write in our example
             writer.write(record);
         }
      } catch (Exception ex) { ex.printStackTrace(System.out); }

    • Here is the data loaded into Apache Spark (2.2.0):

    And for your convenience, the entire source code:

    package com.mycompany.stackoverflow;
    
    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.math.RoundingMode;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.Days;
    import org.joda.time.MutableDateTime;
    
    public class Main {
        public static void main(String[] args) {
            System.out.println("Start");
    
            String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything
                    + "\"type\": \"record\"," //Must be set as record
                    + "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything
                    + "\"fields\": ["
                    + " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field
                    + " {\"name\": \"myString\",  \"type\": [\"string\", \"null\"]},"
                    + " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]},"
                    + " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}"
                    + " ]}";
    
            Schema.Parser parser = new Schema.Parser().setValidate(true);
            Schema avroSchema = parser.parse(schema);
    
            GenericData.Record record = new GenericData.Record(avroSchema);
            record.put("myInteger", 1);
            record.put("myString", "string value 1");
    
            BigDecimal myDecimalValue = new BigDecimal("99.9999");
    
            //First we need to make sure the huge decimal matches our schema scale:
            myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP);
    
            //Next we get the decimal value as one BigInteger (like there was no decimal point)
            BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();
    
            //Finally we serialize the integer
            byte[] decimalBytes = myUnscaledDecimalValue.toByteArray();
    
            //We need to create an Avro 'Fixed' type and pass the decimal schema once more here:
            GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}"));
    
            byte[] myDecimalBuffer = new byte[16];
            if (myDecimalBuffer.length >= decimalBytes.length) {            
                //Because we set our fixed byte array size as 16 bytes, we need to
                //pad-left our original value's bytes with zeros
                int myDecimalBufferIndex = myDecimalBuffer.length - 1;
                for(int i = decimalBytes.length - 1; i >= 0; i--){
                    myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i];
                    myDecimalBufferIndex--;
                }
    
                //Save result
                fixed.bytes(myDecimalBuffer);
            } else {
                throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length));
            }
    
            //We can finally write our decimal to our record
            record.put("myDecimal", fixed);
    
            //Get epoch value
            MutableDateTime epoch = new MutableDateTime(0l, DateTimeZone.UTC);
    
            DateTime currentDate = new DateTime(); //Can take Java Date in constructor
            Days days = Days.daysBetween(epoch, currentDate);
    
            //We can write number of days since epoch into the record
            record.put("myDate", days.getDays());
    
            try {
               Configuration conf = new Configuration();
               conf.set("fs.s3a.access.key", "ACCESSKEY");
               conf.set("fs.s3a.secret.key", "SECRETKEY");
               //Below are some other helpful settings
               //conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
               //conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
               //conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library.
               //conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors.
    
               Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet");
    
               //Use path below to save to local file system instead
               //Path path = new Path("data.parquet");
    
               try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(path)
                       .withSchema(avroSchema)
                       .withCompressionCodec(CompressionCodecName.GZIP)
                       .withConf(conf)
                       .withPageSize(4 * 1024 * 1024) //For compression
                       .withRowGroupSize(16 * 1024 * 1024) //For write buffering (Page size)
                       .build()) {
    
                   //We only have one record to write in our example
                   writer.write(record);
               }
            } catch (Exception ex) { 
                ex.printStackTrace(System.out);
            }
        }
    }
    

    这篇关于如何使用纯Java(包括日期和十进制类型)生成Parquet文件并上传到S3 [Windows](无HDFS)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-24 23:39