1.概述

离线数据处理生态系统包含许多关键任务,最大限度的提高数据管道基础设施的稳定性和效率是至关重要的。这边博客将分享Hive和Spark分区的各种策略,以最大限度的提高数据工程生态系统的稳定性和效率。

2.内容

大多数Spark Job可以通过三个阶段来表述,即读取输入数据、使用Spark处理、保存输出数据。这意味着虽然实际数据转换主要发生在内存中,但是Job通常以大量的I/O开始和结束。使用Spark常用堆栈是使用存储在HDFS上的Hive表作为输入和输出数据存储。Hive分区有效地表示为分布式文件系统上的文件目录。理论上,尽可能多的文件写入是有意义的,但是,这个也是有代价的。HDFS不能很好的支持大量小文件,每个文件在NameNode内存中大概有150字节的开销,而HDFS的整体IOPS数量有限。文件写入中的峰值绝对会导致HDFS基础架构的某些部分产生性能瓶颈。

比如从某个历史日期到当前日期重新计算表,通常用于修复错误或者数据质量问题。在处理包含一年数据的大型数据集(比如1TB以上)时,可能会将数据分成几千个Spark分区进行处理。虽然从表面上看,这种处理方法并不是最合适的,使用动态分区并将数据结果写入按照日期分区的Hive表中将产生多大100+万个文件。

假如有一个包含3个分区的Spark任务,并且想将数据写入到包含3个分区的Hive中。在这种情况下,希望发送的是将3个文件写入到HDFS,所有数据都存储在每个分区键的单个文件中。实际发生的是将生成9个文件,并且每个文件都有1个记录。使用动态分区写入Hive时,每个Spark分区都由执行程序并行处理。处理Spark分区数据时,每次执行程序在给定Spark分区中遇到新的分区键时,它都会打开一个新文件。默认情况下,Spark对数据会使用Hash或者Round Robin分区器。当应用于任意数据时,可以假设这2中方法在整个Spark分区中相对均匀但是随机分布数据行。如下图所示:

Hive和Spark分区策略-LMLPHP

理想情况下,目标文件大小应该大约是HDFS Block大小的倍数,默认情况下为128MB。在Hive管道中,提供了一些配置来自动将结果收集到合理大小的文件中,从开发人员的角度来看几乎是透明的,比如hive.merge.smallfiles.avgsize和hive.merge.size.per.task。但是,Spark中不存在此类功能,因此,我们需要自己开发实现,来给定一个数据集,应该写入多少文件。

2.1 基于Size的计算

理论上,这是最直接的方法,设置目标大小,估计数据的大小,然后进行划分。但是,在很多情况下,文件被写入磁盘时会进行压缩,并且其格式与存储在Java堆中的记录格式有所不同。这意味着估算写入磁盘时内存的记录大小不是一件容易的事情。

虽然可以使用Spark SizeEstimator实用程序通过内存中数据的大小进行估计,然后应用某种估计的压缩文件格式因此,但是SizeEstimator会考虑数据帧、数据集的内部消耗,以及数据的大小。总体来说,这种方式不太容易准确实现。

2.2 基于行数的计算

这种方法是设置目标行数,计算数据集的大小,然后执行除法以估计目标。我们的目标行数可以通过多种方式确定,或者通过为所有数据集选择一个静态数字,或者通过确定磁盘上单个记录的大小并执行必要的计算。哪种方式是最好取决于你的数据集数量及其复杂性。计数相对来说成本较低,但是需要在计数前缓存以避免重新计算数据集。

2.3 静态文件计数

最简单的解决方案是只要求开发人员在每个插入的基础上告诉Spark总共应该写入多少个文件,这种方式需要给开发人员一些其他方法来获得具体的数字,可以通过这种方式来替换昂贵的计算。

3.如何让Spark以合理的方式分发处理数据?

即使我们知道希望如何将文件写入磁盘,我们仍然必须让Spark以符合实际的方式生成这些文件来构建我们的分区。Spark提供了许多工具来确定数据在整个分区中的分布方式。但是,各种功能中隐藏着很多复杂性,在某些情况下,它们的含义并不明显。下面将介绍Spark提供的一些选项来控制Spark输出文件的数量。

3.1 合并

Spark Coalesce是一个特殊版本的重新分区,它只允许减少总的分区,但是不需要完全的Shuffle,因此比重新分区要快得多。它通过有效的合并分区来实现这一点。如下图所示:

Hive和Spark分区策略-LMLPHP

Coalesce在某些情况下看起来不错,但是也有一些问题。首先,Coalesce有一个让我们难以使用的行为。以一个非常基本的Spark应用程序为例,代码如下:

load().map(…).filter(…).save()

比如设置的并行度为1000,但是最终只想写入10个文件,可以设置如下:

load().map(…).filter(…).coalesce(10).save()

但是,Spark会尽可能早的有效的将合并操作下推,因此这将执行为:

load().coalesce(10).map(…).filter(…).save()

有效的解决这种问题的方法是在转换和合并之间强制执行,代码如下所示:

val df = load().map(…).filter(…).cache()
df.count()
df.coalesce(10)

缓存是必须的,否则,你将不得不重新计算数据,这可能会重新消耗资源。然后,缓存是需要消费一定资源的,如果你的数据集无法放入内存中,或者无法腾出内存将数据有效的存储在内存中两次,那么必须使用磁盘缓存,这有其自身的局限性和显著的性能损失。

此外,正如我们看到的,通常需要执行Shuffle来获得我们想要的更复杂的数据集结果。因此,Coalesce仅适用于特定的情况:

  • 保证只写入1个Hive分区;
  • 目标文件数少于你用于处理数据的Spark分区数;
  • 有充足的缓存资源。

3.2 简单重新分区

一个简单的重新分区,它的唯一参数是目标Spark分区计数,即df.repartition(100)。在这种情况下,使用循环分区器,这意味着唯一的保证是输出数据具有大致相同大小的Spark分区。

这种分区仅适用于以下情况的文件计数问题:

  • 保证只需要写入1个Hive分区;
  • 正在写入的文件数大于你的Spark分区数或者由于某些其他原因你无法使用合并。

3.3 按列重新分区

按列重新分区接收目标Spark分区计数,以及要重新分区的列序列,例如,df.repartition(100,$"date")。这对于强制Spark将具有相同键的记录分发到同一个分区很有用。一般来说,这对许多Spark操作(如JOIN)很有用,但是理论上,它也可以解决我们的问题。

按列重新分区使用HashPartitioner,它将具有相同值的记录分配给同一个分区,实际上,它将执行以下操作:

Hive和Spark分区策略-LMLPHP

但是,这种方法只有在每个分区键都可以安全的写入到一个文件时才有效。这是因为无论有多少值具有特定的Hash值,它们最终都会在同一个分区中。按列重新分区仅在你写入一个或者多个小的Hive分区时才有效。在任何其他情况下,它都没有用,因为每个Hive分区总是会得到一个文件,这仅适用于最小的数据集。

3.4 按具有随机因子的列重新分区

我们可以通过添加约束的随机因子来按列修改重新分区,代码如下:

df
.withColumn("rand", rand() % filesPerPartitionKey)
.repartition(100, $"key", $"rand")

理论上,只要满足以下条件,这种方法应该会产生排序良好的记录和大小相当均匀的文件:

  • Hive分区的大小大致相同;
  • 知道每个Hive分区的目标文件数并且可以在运行时对其进行编码。

但是,即使我们满足上述这些条件,还有另外一个问题:散列冲突。假设,现在正在处理一年的数据,日期作为分区的唯一键。如果每个分区需要5个文件,可以执行如下操作:

df.withColumn("rand", rand() % 5).repartition(5*365, $"date", $"rand")

在后台,Scala将构造一个包含日期和随机因素的键,例如(<date>,<0-4>)。然后,如果我们查看HashPartitioner代码,可以发现它将执行以下操作:

class HashPartitioner(partitions: Int) extends Partitioner {
def getPartition(key: Any): Int = key match {
   case null => 0
   case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
}

实际上,所做的就是获取关键元组的散列,然后使用目标数量的Spark分区获取它的mod。我们可以分析一下在这种情况下我们的记录将如何实现分布,分析代码如下:

import java.time.LocalDate

def hashCodeTuple(one: String, two: Int, mod: Int): Int = {
 val rawMod = (one, two).hashCode % mod
 rawMod + (if (rawMod < 0) mod else 0)
}
def hashCodeSeq(one: String, two: Int, mod: Int): Int = {
 val rawMod = Seq(one, two).hashCode % mod
 rawMod + (if (rawMod < 0) mod else 0)
}

def iteration(numberDS: Int, filesPerPartition: Int): (Double, Double, Double) = {
  val hashedRandKeys = (0 to numberDS - 1).map(x => LocalDate.of(2019, 1, 1).plusDays(x)).flatMap(
    x => (0 to filesPerPartition - 1).map(y => hashCodeTuple(x.toString, y, filesPerPartition*numberDS))
  )

  hashedRandKeys.size // Number of unique keys, with the random factor

  val groupedHashedKeys = hashedRandKeys.groupBy(identity).view.mapValues(_.size).toSeq

  groupedHashedKeys.size // number of actual sPartitions used

  val sortedKeyCollisions = groupedHashedKeys.filter(_._2 != 1).sortBy(_._2).reverse

  val sortedSevereKeyCollisions = groupedHashedKeys.filter(_._2 > 2).sortBy(_._2).reverse

  sortedKeyCollisions.size // number of sPartitions with a hashing collision

  // (collisions, occurences)
  val collisionCounts = sortedKeyCollisions.map(_._2).groupBy(identity).view.mapValues(_.size).toSeq.sortBy(_._2).reverse

  (
    groupedHashedKeys.size.toDouble / hashedRandKeys.size.toDouble,
    sortedKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble,
  sortedSevereKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble
  )
}

val results = Seq(
  iteration(365, 1),
  iteration(365, 5),
  iteration(365, 10),
  iteration(365, 100),
  iteration(365 * 2, 100),
  iteration(365 * 5, 100),
  iteration(365 * 10, 100)
)

val avgEfficiency = results.map(_._1).sum / results.length
val avgCollisionRate = results.map(_._2).sum / results.length
val avgSevereCollisionRate = results.map(_._3).sum / results.length

(avgEfficiency, avgCollisionRate, avgSevereCollisionRate) // 63.2%, 42%, 12.6%

上面的脚本计算了3个数量:

  • 效率:非空的Spark分区与输出文件数量的比率;
  • 碰撞率:(date,rand)的Hash值发送冲突的Spark分区的百分比;
  • 严重冲突率:同上,但是此键上的冲突次数为3或者更多。

冲突很重要,因为它们意味着我们的Spark分区包含多个唯一的分区键,而我们预计每个Spark分区只有1个。分析的结果可知,我们使用了63%的执行器,并且可能会出现严重的偏差,我们将近一半的执行者正在处理比预期多2到3倍或者在某些情况下高达8倍的数据。

现在,有一个解决方法,即分区缩放。在之前示例中,输出的Spark分区数量等于预期的总文件数。如果将N个对象随机分配给N个插槽,可以预期会有多个插槽包含多个对象,并且有几个空插槽。因此,需要解决此问题,必须要降低对象与插槽的比率。

我们通过缩放输出分区计数来实现这一点,通过将我们的输出Spar分区计数乘以一个大因子,类似于:

df
.withColumn(“rand”, rand() % 5)
.repartition(5*365*SCALING_FACTOR, $”date”, $”rand”)

分析代码如下:

import java.time.LocalDate

def hashCodeTuple(one: String, two: Int, mod: Int): Int = {
 val rawMod = (one, two).hashCode % mod
 rawMod + (if (rawMod < 0) mod else 0)
}

def hashCodeSeq(one: String, two: Int, mod: Int): Int = {
 val rawMod = Seq(one, two).hashCode % mod
 rawMod + (if (rawMod < 0) mod else 0)
}

def iteration(numberDS: Int, filesPerPartition: Int, partitionFactor: Int = 1): (Double, Double, Double, Double) = {
  val partitionCount = filesPerPartition*numberDS * partitionFactor
  val hashedRandKeys = (0 to numberDS - 1).map(x => LocalDate.of(2019, 1, 1).plusDays(x)).flatMap(
    x => (0 to filesPerPartition - 1).map(y => hashCodeTuple(x.toString, y, partitionCount))
  )

  hashedRandKeys.size // Number of unique keys, with the random factor

  val groupedHashedKeys = hashedRandKeys.groupBy(identity).view.mapValues(_.size).toSeq

  groupedHashedKeys.size // number of unique hashes - and thus, sPartitions with > 0 records

  val sortedKeyCollisions = groupedHashedKeys.filter(_._2 != 1).sortBy(_._2).reverse

  val sortedSevereKeyCollisions = groupedHashedKeys.filter(_._2 > 2).sortBy(_._2).reverse

  sortedKeyCollisions.size // number of sPartitions with a hashing collision

  // (collisions, occurences)
  val collisionCounts = sortedKeyCollisions.map(_._2).groupBy(identity).view.mapValues(_.size).toSeq.sortBy(_._2).reverse

  (
    groupedHashedKeys.size.toDouble / partitionCount,
    groupedHashedKeys.size.toDouble / hashedRandKeys.size.toDouble,
    sortedKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble,
    sortedSevereKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble
  )
}

// With a scale factor of 1
val results = Seq(
  iteration(365, 1),
  iteration(365, 5),
  iteration(365, 10),
  iteration(365, 100),
  iteration(365 * 2, 100),
  iteration(365 * 5, 100),
  iteration(365 * 10, 100)
)

val avgEfficiency = results.map(_._2).sum / results.length // What is the ratio of executors / output files
val avgCollisionRate = results.map(_._3).sum / results.length // What is the average collision rate
val avgSevereCollisionRate = results.map(_._4).sum / results.length // What is the average collision rate where 3 or more hashes collide

(avgEfficiency, avgCollisionRate, avgSevereCollisionRate) // 63.2% Efficiency, 42% collision rate, 12.6% severe collision rate

iteration(365, 5, 2) // 37.7% partitions in-use, 77.4% Efficiency, 24.4% collision rate, 4.2% severe collision rate
iteration(365, 5, 5)
iteration(365, 5, 10)
iteration(365, 5, 100)

随着我们的比例因子接近无穷大,碰撞很快接近于0,效率接近100%。但是,这会产生另外一个问题,即大量的输出Spark分区将为空。同时这些空的Spark分区也会带来一些资源开销,增加驱动程序的内存要求,并使我们更容易受到由于错误或者意外复杂性而导致分区键空间意外大的问题。

这里的一个常见方法是在使用这种方法时不显示设置分区技术(默认并行度和缩放),如果不提供分区计数,则依赖Spark默认的spark.default.parallelism值。虽然,通常并行度自然高于总输出文件数(因此,隐式提供大于1 的缩放因子)。如果满足以下条件,这种方式依然是一种有效的方法:

  • Hive分区的文件数大致相等;
  • 可以确定平均分区文件数应该是多少;
  • 大致知道唯一分区键的总数。

在示例中,我们假设其中的许多事情都很容易知道,主要是输出Hive分区的总数和每个Hive分区所需要的文件数。无论如何,这种方法都是可行的,并且可能适用于需要用例。

3.5 按范围重新分区

按范围重新分区是一个特列,它不使用RoundRobin和Hash Partitioner,而是使用一种特殊的方法,叫做Range Partitioner。

范围分区器根据某些给定键的顺序在Spark分区之间进行拆分行,但是,它不只是全局排序,它做出的保证是:

  • 具有相同散列的所有记录将在同一个分区中结束;
  • 所有Spark分区都将有一个最小值和最大值与之关联;
  • 最小值和最大值将通过使用采样来检测关键频率和范围来确定,分区边界将根据这些估计值进行初始设置;
  • 分区的大小不能保证完全相等,它们的相等性基于样本的准确性,因此,预测的每个Spark分区的最小值和最大值,分区将根据需要增长或缩小以保证前2个条件。

总而言之,范围分区将导致Spark创建与请求的Spark分区数量相等的Bucket数量,然后它将这些Bucket映射到指定分区键的范围。例如,如果你的分区键是日期,则范围可能是(最小值2021-01-01,最大值2022-01-01)。然后,对于每条记录,将记录的分区键与存储Bucket的最小值和最大值进行比较,并相应的进行分配。

Hive和Spark分区策略-LMLPHP

4.结束语

这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。

06-28 00:15