This post looks at Spark's integration with Parquet statistics (min/max) and how that information is used for query optimization.

Problem description

I have been looking into how Spark stores statistics (min/max) in Parquet, as well as how it uses that information for query optimization. I have a few questions. First, the setup: Spark 2.1.0; the following sets up a DataFrame of 1000 rows, with a long-type column and a string-type column. The two datasets are sorted by different columns, though.

scala> spark.sql("select id, cast(id as string) text from range(1000)").sort("id").write.parquet("/secret/spark21-sortById")
scala> spark.sql("select id, cast(id as string) text from range(1000)").sort("Text").write.parquet("/secret/spark21-sortByText")

I added some code to parquet-tools to print out stats and examine the generated parquet files:

hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta /secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet 
file:        file:/secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet 
creator:     parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
id:          REQUIRED INT64 R:0 D:0
text:        REQUIRED BINARY O:UTF8 R:0 D:0

row group 1: RC:5 TS:133 OFFSET:4 
--------------------------------------------------------------------------------
id:           INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5 ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 4, num_nulls: 0]
text:         BINARY SNAPPY DO:0 FPO:75 SZ:53/52/0.98 VC:5 ENC:PLAIN,BIT_PACKED

hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta /secret/spark21-sortByText/part-00000-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet 
file:        file:/secret/spark21-sortByText/part-00000-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet 
creator:     parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
id:          REQUIRED INT64 R:0 D:0
text:        REQUIRED BINARY O:UTF8 R:0 D:0

row group 1: RC:5 TS:140 OFFSET:4 
--------------------------------------------------------------------------------
id:           INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5 ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 101, num_nulls: 0]
text:         BINARY SNAPPY DO:0 FPO:75 SZ:60/59/0.98 VC:5 ENC:PLAIN,BIT_PACKED
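
(As an aside, not from the original post: the same footer statistics can be inspected straight from the spark-shell, without patching parquet-tools, via the parquet-hadoop API that ships with Spark. A rough sketch; the part file name is the one from the listing above and will differ between runs:)

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import scala.collection.JavaConverters._

// Read only the footer (metadata) of one part file and print the per-column
// statistics of each row group.
val meta = ParquetFileReader.readFooter(new Configuration(),
  new Path("/secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet"))
meta.getBlocks.asScala.foreach { block =>
  block.getColumns.asScala.foreach { col =>
    println(s"${col.getPath} -> ${col.getStatistics}")
  }
}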

So the question is: why does Spark (2.1.0 in particular) only generate min/max for numeric columns, but not for string (BINARY) fields, even when the string field is included in the sort? Maybe I missed a configuration?

The second question is: how can I confirm that Spark is actually using the min/max?

scala> sc.setLogLevel("INFO")
scala> spark.sql("select * from parquet.`/secret/spark21-sortById` where id=4").show

I get a lot of lines like these:

17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate: and(noteq(id, null), eq(id, 4))
17/01/17 09:23:35 INFO FileScanRDD: Reading File path: file:///secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet, range: 0-558, partition values: [empty row]
...
17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate: and(noteq(id, null), eq(id, 4))
17/01/17 09:23:35 INFO FileScanRDD: Reading File path: file:///secret/spark21-sortById/part-00193-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet, range: 0-574, partition values: [empty row]
...

The question is that it looks like Spark is scanning every file, even though, based on the min/max, Spark should be able to determine that only part-00000 has the relevant data. Or maybe I am reading it wrong and Spark is actually skipping those files? Maybe Spark can only use partition values for data skipping?
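
One way to check this from the shell (my addition, not in the original post) is to verify that Parquet filter pushdown is enabled and that the predicate actually shows up as a pushed filter on the scan node of the physical plan. A minimal sketch:

// Parquet predicate pushdown is controlled by this flag (true by default in Spark 2.x).
spark.conf.get("spark.sql.parquet.filterPushdown")

// The FileScan node of the physical plan should list the predicate under
// "PushedFilters", e.g. PushedFilters: [IsNotNull(id), EqualTo(id,4)].
spark.sql("select * from parquet.`/secret/spark21-sortById` where id=4").explain()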

Answer

For the first question, I believe this is a matter of definition (what would be the min/max of a string? lexical ordering?), but in any case, as far as I know, Spark's Parquet output currently only indexes numbers.

As for the second question, I believe that if you look deeper you will see that Spark is not loading the files themselves. Instead, it reads the metadata, so it knows whether or not to read a block. So, basically, it pushes the predicate down to the file (block) level.
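
A rough way to observe this (my addition, assuming the Spark 2.1 setup above): run the same query with Parquet filter pushdown disabled and then enabled, and compare the "number of output rows" metric of the scan node in the SQL tab of the Spark UI. With pushdown on, row groups whose min/max range excludes 4 should be skipped after only their footers have been read.

// With pushdown off, the Parquet scan should hand all 1000 rows to the filter.
spark.conf.set("spark.sql.parquet.filterPushdown", "false")
spark.sql("select * from parquet.`/secret/spark21-sortById` where id=4").collect()

// With pushdown on (the default), row groups whose [min, max] does not contain 4
// can be dropped based on the footer statistics alone.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.sql("select * from parquet.`/secret/spark21-sortById` where id=4").collect()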
