SparkSQL Join选择逻辑 

先看JoinSelection的注释

翻译下就是:

如果是等值join,就先看join hints,顺序如下 

  1. broadcast hint:join类型支持的话选择broadcast hash join,如果join的两边都有broadcast hint,选择小的(基于统计)一方去广播
  2. sort merge hint:如果join的key是可排序的,选择sort merge join
  3. shuffle hash hint:join类型支持的话选择shuffle hash join
  4. shuffle replicate NL hint:如果是inner like类型(inner或cross),则选择cartesian product join

如果没有hint或者hint的类型是不合适的,按如下顺序选择

  1. broadcast hash join:如果join的一方足够小,小到可以广播,同时join类型支持,如果两边都很小,选最小的(基于统计)
  2. shuffle hash join:如果join的一方足够小可以构建hash map,并且比另一端小很多,同时需要spark.sql.join.preferSortMergeJoin置为false
  3. sort merge join:如果join的key是可排序的
  4. cartesian product:如果join类型是inner like类型(inner或cross)
  5. broadcast nested loop join:打底策略,即便可能导致OOM但别无选择

注意:

  • hash join(broadcast hash join或者shuffle hash join)只支持等值Join,不支持full outer join
  • 小到可以广播指的是,小于spark.sql.autoBroadcastJoinThreshold的阈值(默认10MB)
  val AUTO_BROADCASTJOIN_THRESHOLD = buildConf("spark.sql.autoBroadcastJoinThreshold")
    .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
      "nodes when performing a join.  By setting this value to -1 broadcasting can be disabled. " +
      "Note that currently statistics are only supported for Hive Metastore tables where the " +
      "command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been " +
      "run, and file-based data source tables where the statistics are computed directly on " +
      "the files of data.")
    .version("1.1.0")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("10MB")
  • shuffle hash join时要求一边比另一边小很多,小很多指的是3倍小于
    /**
     * Matches a plan whose output should be small enough to be used in broadcast join.
     */
    private def canBroadcast(plan: LogicalPlan): Boolean = {
      plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
    }

    /**
     * Matches a plan whose single partition should be small enough to build a hash table.
     *
     * Note: this assume that the number of partition is fixed, requires additional work if it's
     * dynamic.
     */
    private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
      plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
    }

    /**
     * Returns whether plan a is much smaller (3X) than plan b.
     *
     * The cost to build hash map is higher than sorting, we should only build hash map on a table
     * that is much smaller than other one. Since we does not have the statistic for number of rows,
     * use the size of bytes here as estimation.
     */
    private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
      a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
    }

hash join为什么只支持等值join,同时不支持full outer join?

  • 这是由于hashmap的特性决定的,随机访问效率最高O(1),为了性能是不会通过hashmap进行遍历查找的。
  • 不支持full outer join 是因为小表做的是构建表,由于不是流式表,无法决定是否输出该行,完全是被动的

参考一下SparkSQL Join流程:

cartesian join为什么会限制是inner like?

        def createCartesianProduct() = {
          if (joinType.isInstanceOf[InnerLike]) {
            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
          } else {
            None
          }
        }

可以看下JoinType的类,继承了InnerLike的一个是inner join,一个是cross join

object JoinType {
  def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {
    case "inner" => Inner
    case "outer" | "full" | "fullouter" => FullOuter
    case "leftouter" | "left" => LeftOuter
    case "rightouter" | "right" => RightOuter
    case "leftsemi" | "semi" => LeftSemi
    case "leftanti" | "anti" => LeftAnti
    case "cross" => Cross
    case _ =>
      val supported = Seq(
        "inner",
        "outer", "full", "fullouter", "full_outer",
        "leftouter", "left", "left_outer",
        "rightouter", "right", "right_outer",
        "leftsemi", "left_semi", "semi",
        "leftanti", "left_anti", "anti",
        "cross")

      throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
        "Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
  }
}

sealed abstract class JoinType {
  def sql: String
}

/**
 * The explicitCartesian flag indicates if the inner join was constructed with a CROSS join
 * indicating a cartesian product has been explicitly requested.
 */
sealed abstract class InnerLike extends JoinType {
  def explicitCartesian: Boolean
}

case object Inner extends InnerLike {
  override def explicitCartesian: Boolean = false
  override def sql: String = "INNER"
}

case object Cross extends InnerLike {
  override def explicitCartesian: Boolean = true
  override def sql: String = "CROSS"
}

case object LeftOuter extends JoinType {
  override def sql: String = "LEFT OUTER"
}

case object RightOuter extends JoinType {
  override def sql: String = "RIGHT OUTER"
}

case object FullOuter extends JoinType {
  override def sql: String = "FULL OUTER"
}

case object LeftSemi extends JoinType {
  override def sql: String = "LEFT SEMI"
}

case object LeftAnti extends JoinType {
  override def sql: String = "LEFT ANTI"
}
...
}

为什么Broadcast Nested Loop Join会OOM?

Broadcast Nested Loop Join需要广播数据集和嵌套循环,计算效率极低,对内存的需求也极大,因为不论数据集大小,都会有一个数据集被广播到所有executor上。

Cross Join优化案例

select /*+ mapjoin(b)*/
  a.*, sum(b.work_date) as '工作日'
from a
cross join 
work_date_dim b 
on b.begin_tm >= a.任务开始时间 
and b.end_tm < a.任务结束时间
group by ...

不加mapjoin的hint,执行结果就是特别慢!a表不到 10w, b表只有几千条,执行了30分钟还是不行!

加mapjoin的hint,不到 1分钟就执行完了。但是,注意b表不能太大。

01-04 10:47