How to write the TIMESTAMP logical type (INT96) to Parquet with ParquetWriter

Problem description

I have a tool that uses an org.apache.parquet.hadoop.ParquetWriter to convert CSV data files to Parquet data files.

Currently, it only handles int32, double, and string.

I need to support the parquet timestamp logical type (annotated as int96), and I am lost on how to do that because I can't find a precise specification online.

It appears this timestamp encoding (int96) is rare and not well supported. I've found very few specification details online. This GitHub README states that:

Specifically:

  1. Which Parquet Type do I use for the column in the MessageType schema? I assume I should use the primitive type PrimitiveTypeName.INT96, but I'm not sure whether there is a way to specify a logical type.
  2. How do I write the data, i.e. in what format do I write the timestamp to the group? For an INT96 timestamp, I assume I must write some binary type?

Here is a simplified version of my code that demonstrates what I am trying to do. Specifically, take a look at the "TODO" comments; these are the two points in the code that correspond to the questions above.

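import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;

List<Type> fields = new ArrayList<>();
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT32, "int32_col"));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, "double_col"));
// Parquet has no STRING primitive: strings are BINARY annotated as UTF8
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "string_col", OriginalType.UTF8));

// TODO:
//   Specify the TIMESTAMP type.
//   How? INT96 primitive type? Is there a logical timestamp type I can use w/ MessageType schema?
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT96, "timestamp_col"));

MessageType schema = new MessageType("input", fields);

// initialize writer
Configuration configuration = new Configuration();
configuration.setQuietMode(true);
GroupWriteSupport.setSchema(schema, configuration);
ParquetWriter<Group> writer = new ParquetWriter<Group>(
  new Path("output.parquet"),
  new GroupWriteSupport(),
  CompressionCodecName.SNAPPY,
  ParquetWriter.DEFAULT_BLOCK_SIZE,
  ParquetWriter.DEFAULT_PAGE_SIZE,
  1048576,   // dictionary page size
  true,      // enable dictionary
  false,     // validating
  ParquetProperties.WriterVersion.PARQUET_1_0,
  configuration
);

// write CSV data (csv is the path of the tab-delimited input file)
SimpleGroupFactory f = new SimpleGroupFactory(schema);
CSVParser parser = CSVParser.parse(new File(csv), StandardCharsets.UTF_8, CSVFormat.TDF.withQuote(null));
for (CSVRecord csvRecord : parser) {
  Group group = f.newGroup();
  int colIndex = 0;
  for (String record : csvRecord) {
    // empty and "NULL" fields are left unset in the OPTIONAL column
    if (record == null || record.isEmpty() || record.equals("NULL")) {
      colIndex++;
      continue;
    }

    record = record.trim();
    switch (colIndex) {
      case 0: // int32
        group.add(colIndex, Integer.parseInt(record));
        break;
      case 1: // double
        group.add(colIndex, Double.parseDouble(record));
        break;
      case 2: // string
        group.add(colIndex, record);
        break;
      case 3:
        // TODO: convert CSV string value to TIMESTAMP type (how?)
        throw new UnsupportedOperationException("TIMESTAMP not implemented");
    }
    colIndex++;  // advance only after the value has been added
  }
  writer.write(group);
}
parser.close();
writer.close();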

Recommended answer

I figured it out, using this code from Spark SQL as a reference.

The INT96 binary encoding is split into two parts, both written little-endian:

  1. The first 8 bytes are the nanoseconds since midnight.
  2. The last 4 bytes are the Julian day.
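To make the layout concrete, here is a minimal sketch that packs the two fields with java.time and prints the 12 resulting bytes for the timestamp used in the code below. It assumes the wall-clock value is stored as-is (no time-zone conversion); the class name Int96Layout is hypothetical and not part of parquet-mr.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.LocalDateTime;
import java.time.temporal.JulianFields;

final class Int96Layout {
    /** Packs nanoseconds since midnight (8 bytes) then the Julian day (4 bytes), little-endian. */
    static byte[] pack(LocalDateTime ts) {
        long nanosOfDay = ts.toLocalTime().toNanoOfDay();
        int julianDay = (int) JulianFields.JULIAN_DAY.getFrom(ts.toLocalDate());
        return ByteBuffer.allocate(12)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putLong(nanosOfDay)
                .putInt(julianDay)
                .array();
    }

    /** Formats bytes as space-separated lowercase hex, lowest byte first. */
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] int96 = pack(LocalDateTime.of(2019, 2, 13, 13, 35, 5));
        // bytes 0-7: 48905000000000 ns since midnight; bytes 8-11: Julian day 2458528
        System.out.println(hex(int96)); // 00 1a 27 95 7a 2c 00 00 a0 83 25 00
    }
}
```

Because the long comes first and both fields are little-endian, the low-order byte of the nanoseconds sits at offset 0 and the Julian day occupies offsets 8 to 11. java.time is used here only because it sidesteps the default-time-zone handling of SimpleDateFormat; it produces the same two numbers as the Calendar-based arithmetic when both are interpreted in UTC.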

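import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.temporal.JulianFields;
import java.util.Calendar;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import org.apache.parquet.io.api.Binary;

String value = "2019-02-13 13:35:05";

final long NANOS_PER_HOUR = TimeUnit.HOURS.toNanos(1);
final long NANOS_PER_MINUTE = TimeUnit.MINUTES.toNanos(1);
final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);

// Parse date. The parser must use the same zone as the Calendar (UTC);
// otherwise the JVM's default zone shifts the hour (and possibly the day).
TimeZone utc = TimeZone.getTimeZone("UTC");
SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
parser.setTimeZone(utc);
Calendar cal = Calendar.getInstance(utc);
cal.setTime(parser.parse(value));  // throws java.text.ParseException

// Calculate Julian day and nanoseconds since midnight
LocalDate dt = LocalDate.of(cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH));
int julianDays = (int) JulianFields.JULIAN_DAY.getFrom(dt);
long nanos = (cal.get(Calendar.HOUR_OF_DAY) * NANOS_PER_HOUR)
        + (cal.get(Calendar.MINUTE) * NANOS_PER_MINUTE)
        + (cal.get(Calendar.SECOND) * NANOS_PER_SECOND);

// Write INT96 timestamp: 8 bytes of nanoseconds, then 4 bytes of Julian day, little-endian
byte[] timestampBuffer = new byte[12];
ByteBuffer buf = ByteBuffer.wrap(timestampBuffer);
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(nanos).putInt(julianDays);

// This is the properly encoded INT96 timestamp
Binary tsValue = Binary.fromReusedByteArray(timestampBuffer);

// e.g. in the question's CSV loop: case 3: group.add(colIndex, tsValue); break;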
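One way to check an encoded value is to read the 12 bytes back. The following is a minimal sketch that reverses the layout described above: it reads the little-endian nanoseconds and Julian day and rebuilds a LocalDateTime, again with no time-zone conversion. The class Int96Decoder is hypothetical and not part of parquet-mr.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.JulianFields;

/** Sketch: decode a 12-byte INT96 value into a wall-clock LocalDateTime. */
final class Int96Decoder {
    static LocalDateTime decode(byte[] int96) {
        if (int96.length != 12) {
            throw new IllegalArgumentException("INT96 values are 12 bytes, got " + int96.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long nanosOfDay = buf.getLong(); // bytes 0-7
        int julianDay = buf.getInt();    // bytes 8-11
        // Move from any date to the requested Julian day number
        LocalDate date = LocalDate.of(1970, 1, 1).with(JulianFields.JULIAN_DAY, julianDay);
        return LocalDateTime.of(date, LocalTime.ofNanoOfDay(nanosOfDay));
    }

    public static void main(String[] args) {
        // Same field values the answer's code produces for "2019-02-13 13:35:05" in UTC
        byte[] bytes = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
                .putLong(48905000000000L) // 13:35:05 as nanoseconds since midnight
                .putInt(2458528)          // Julian day of 2019-02-13
                .array();
        System.out.println(decode(bytes)); // 2019-02-13T13:35:05
    }
}
```

Decoding timestampBuffer this way should return the original string's date and time. If it comes back shifted by some hours, the SimpleDateFormat and the Calendar were using different time zones.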

10-30 10:55