本文介绍了使用JDBC导入Postgres时如何对Spark RDD进行分区?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在将Postgres数据库导入Spark.我知道我可以对导入进行分区,但是这需要我有一个数字列(我不想使用value列,因为它到处都是并且不保持顺序):

df = spark.read.format('jdbc').options(url=url, dbtable='tableName', properties=properties).load()
df.printSchema()

root
 |-- id: string (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- key: string (nullable = false)
 |-- value: double (nullable = false)

相反,我将数据帧转换为rdd(枚举元组),并尝试对其进行分区:

rdd = df.rdd.flatMap(lambda x: enumerate(x)).partitionBy(20)

请注意,我使用20是因为我的集群中有5个工作人员,每个工作人员有一个核心,并且5*4=20.

不幸的是,以下命令仍然需要永远执行:

result = rdd.first()

因此,我想知道我上面的逻辑是否有意义?我做错什么了吗?从Web GUI来看,好像没有使用这些工作程序:

解决方案

由于您已经知道可以按数字列进行分区,所以这可能是您应该做的.这是窍门.首先,让我们找到一个最小和最大纪元:

url = ...
properties = ...

min_max_query = """(
    SELECT
        CAST(min(extract(epoch FROM timestamp)) AS bigint), 
        CAST(max(extract(epoch FROM timestamp)) AS bigint)
    FROM tablename
) tmp"""

min_epoch, max_epoch = spark.read.jdbc(
    url=url, table=min_max_query, properties=properties
).first()

并使用它查询表:

numPartitions = ...

query = """(
    SELECT *, CAST(extract(epoch FROM timestamp) AS bigint) AS epoch
    FROM tablename) AS tmp"""

spark.read.jdbc(
    url=url, table=query,
    lowerBound=min_epoch, upperBound=max_epoch + 1, 
    column="epoch", numPartitions=numPartitions, properties=properties
).drop("epoch")

由于这会将数据分成相同大小的范围,因此它对数据偏斜比较敏感,因此应谨慎使用.

您还可以提供不连续谓词列表作为predicates自变量.

predicates= [
    "id BETWEEN 'a' AND 'c'",
    "id BETWEEN 'd' AND 'g'",
    ...   # Continue to get full coverage an desired number of predicates
]

spark.read.jdbc(
    url=url, table="tablename", properties=properties, 
    predicates=predicates
)

后一种方法更加灵活,可以解决数据分布不均匀的某些问题,但需要更多有关数据的知识.

使用partitionBy首先获取数据,然后执行完全混洗以获取所需的分区数量,因此相对而言非常昂贵.

I am importing a Postgres database into Spark. I know that I can partition on import, but that requires that I have a numeric column (I don't want to use the value column because it's all over the place and doesn't maintain order):

df = spark.read.format('jdbc').options(url=url, dbtable='tableName', properties=properties).load()
df.printSchema()

root
 |-- id: string (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- key: string (nullable = false)
 |-- value: double (nullable = false)

Instead, I am converting the dataframe into an rdd (of enumerated tuples) and trying to partition that instead:

rdd = df.rdd.flatMap(lambda x: enumerate(x)).partitionBy(20)

Note that I used 20 because I have 5 workers with one core each in my cluster, and 5*4=20.

Unfortunately, the following command still takes forever to execute:

result = rdd.first()

Therefore I am wondering if my logic above makes sense? Am I doing anything wrong? From the web GUI, it looks like the workers are not being used:

解决方案

Since you already know you can partition by a numeric column this is probably what you should do. Here is the trick. First lets find a minimum and maximum epoch:

url = ...
properties = ...

min_max_query = """(
    SELECT
        CAST(min(extract(epoch FROM timestamp)) AS bigint), 
        CAST(max(extract(epoch FROM timestamp)) AS bigint)
    FROM tablename
) tmp"""

min_epoch, max_epoch = spark.read.jdbc(
    url=url, table=min_max_query, properties=properties
).first()

and use it to query the table:

numPartitions = ...

query = """(
    SELECT *, CAST(extract(epoch FROM timestamp) AS bigint) AS epoch
    FROM tablename) AS tmp"""

spark.read.jdbc(
    url=url, table=query,
    lowerBound=min_epoch, upperBound=max_epoch + 1, 
    column="epoch", numPartitions=numPartitions, properties=properties
).drop("epoch")

Since this splits data into ranges of the same size it is relatively sensitive to data skew so you should use it with caution.

You could also provide a list of disjoint predicates as a predicates argument.

predicates= [
    "id BETWEEN 'a' AND 'c'",
    "id BETWEEN 'd' AND 'g'",
    ...   # Continue to get full coverage an desired number of predicates
]

spark.read.jdbc(
    url=url, table="tablename", properties=properties, 
    predicates=predicates
)

The latter approach is much more flexible and can address certain issues with non-uniform data distribution but requires more knowledge about the data.

Using partitionBy fetches data first and then performs full shuffle to get desired number of partitions so it is relativistically expensive.

这篇关于使用JDBC导入Postgres时如何对Spark RDD进行分区?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-19 17:09