This article discusses how to join two JDBC tables in Spark while avoiding an exchange, which may be a useful reference for anyone hitting the same problem.

Problem description

I've got an ETL-like scenario in which I read data from multiple JDBC tables and files and perform some aggregations and joins between the sources.

In one step I must join two JDBC tables. I've tried to do something like:

val df1 = spark.read.format("jdbc")
            .option("url", Database.DB_URL)
            .option("user", Database.DB_USER)
            .option("password", Database.DB_PASSWORD)
            .option("dbtable", tableName)
            .option("driver", Database.DB_DRIVER)
            .option("upperBound", data.upperBound)
            .option("lowerBound", data.lowerBound)
            .option("numPartitions", data.numPartitions)
            .option("partitionColumn", data.partitionColumn)
            .load();

val df2 = spark.read.format("jdbc")
            .option("url", Database.DB_URL)
            .option("user", Database.DB_USER)
            .option("password", Database.DB_PASSWORD)
            .option("dbtable", tableName)
            .option("driver", Database.DB_DRIVER)
            .option("upperBound", data2.upperBound)
            .option("lowerBound", data2.lowerBound)
            .option("numPartitions", data2.numPartitions)
            .option("partitionColumn", data2.partitionColumn)
            .load();

df1.join(df2, Seq("partition_key", "id")).show();
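As background for the read options above, Spark's range-based JDBC partitioning turns lowerBound, upperBound, and numPartitions into one WHERE predicate per partition, each sent as a separate query. The plain-Scala sketch below models that translation; it is a simplified approximation of what Spark's JDBCRelation does internally, not its exact code, and the column name and bounds are illustrative:

```scala
// Simplified model of how Spark's JDBC source derives one WHERE clause
// per partition from lowerBound, upperBound and numPartitions.
// (A sketch approximating the real logic, not Spark's exact implementation.)
def partitionPredicates(col: String, lower: Long, upper: Long, n: Int): Seq[String] = {
  val stride = upper / n - lower / n  // width of each partition's id range
  (0 until n).map { i =>
    val lo = lower + i * stride
    val hi = lo + stride
    if (i == 0) s"$col < $hi OR $col IS NULL"   // first partition also collects NULLs
    else if (i == n - 1) s"$col >= $lo"          // last partition is open-ended
    else s"$col >= $lo AND $col < $hi"
  }
}

// e.g. 23 distinct mod-23 keys split across 4 partitions
partitionPredicates("partition_key", 0, 23, 4).foreach(println)
```

Note that these predicates only decide which rows land in which partition of the scan; nothing about the resulting ordering or hashing is reported back to the optimizer, which is why the join below still plans an Exchange.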

Note that partitionColumn is the same in both cases: "partition_key".

However, when I run such a query, I can see an unnecessary exchange (plan trimmed for readability):

df1.join(df2, Seq("partition_key", "id")).explain(extended = true);
Project [many many fields]
+- Project [partition_key#10090L, iv_id#10091L, last_update_timestamp#10114, ... more fields]
   +- SortMergeJoin [partition_key#10090L, id#10091L], [partition_key#10172L, id#10179L], Inner
      :- *Sort [partition_key#10090L ASC NULLS FIRST, iv_id#10091L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(partition_key#10090L, iv_id#10091L, 4)
      :     +- *Scan JDBCRelation((select mod(s.id, 23) as partition_key, s.* from tab2 s)) [numPartitions=23] [partition_key#10090L,id#10091L,last_update_timestamp#10114] PushedFilters: [*IsNotNull(PARTITION_KEY)], ReadSchema: struct<partition_key:bigint,id:bigint,last_update_timestamp:timestamp>
      +- *Sort [partition_key#10172L ASC NULLS FIRST, id#10179L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(partition_key#10172L, iv_id#10179L, 4)
            +- *Project [partition_key#10172L, id#10179L ... 75 more fields]
               +- *Scan JDBCRelation((select mod(s.id, 23) as partition_key, s.* from tab1 s)) [numPartitions=23] [fields] PushedFilters: [*IsNotNull(ID), *IsNotNull(PARTITION_KEY)], ReadSchema: struct<partition_key:bigint,id:bigint...

If we have already partitioned the reads with numPartitions and related options, and the partition counts are the same, why is another Exchange needed? Can we somehow avoid this unnecessary shuffle? On the test data I see Spark sending more than 150M of data during this Exchange, and the production Datasets are much bigger, so it can be a serious bottleneck.

Recommended answer

With the current implementation of the Data Source API, no partitioning information is passed upstream, so even if the data could be joined without a shuffle, Spark cannot make use of this. Therefore your assumption that:

JdbcRelation uses RangePartitioning on read

is just incorrect. Furthermore, it looks like Spark uses the same internal code to handle range-based JDBC partitions and predicate-based JDBC partitions. While the former could be translated into a SortOrder, the latter might be incompatible with Spark SQL in general.
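For contrast, predicate-based partitioning refers to the `spark.read.jdbc(url, table, predicates, connectionProperties)` overload, where you supply one arbitrary WHERE fragment per partition; since the fragments are opaque strings, they carry no ordering or hashing Spark could reuse. A hedged sketch follows; the mod-23 split mirrors the subqueries visible in the plan, `Database.DB_URL` and `tableName` come from the question, and the rest is illustrative:

```scala
import java.util.Properties

// One WHERE-clause fragment per partition; Spark issues one query per element.
// The mod-23 split mirrors the "mod(s.id, 23) as partition_key" subqueries above.
val predicates: Array[String] = (0 until 23).map(k => s"mod(id, 23) = $k").toArray

// Usage against the question's database (requires an active SparkSession):
// val connProps = new Properties()
// connProps.setProperty("user", Database.DB_USER)
// connProps.setProperty("password", Database.DB_PASSWORD)
// val df = spark.read.jdbc(Database.DB_URL, tableName, predicates, connProps)

predicates.take(2).foreach(println)
```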

When in doubt, it is possible to retrieve the Partitioner information using QueryExecution and the internal RDD:

df.queryExecution.toRdd.partitioner

This might change in the future (SPIP: Data Source API V2, SPARK-15689 - Data source API v2, and Spark Data Frame PreSorted partitions).

