How to reference a broadcast variable in Spark DataFrame/SQL

Question

I have the following SparkSQL:

val resultDf = spark.sql("SELECT name, phone, country FROM users")

I'd like to filter the returned records, keeping only those whose country is present in the following collection:

val countries = Seq("Italy", "France", "United States", "Poland", "Spain")

For example, I can create a broadcast variable from the collection:

val countriesBroadcast = sc.broadcast(countries)

But is it possible (and if so, how?) to use the countriesBroadcast variable inside my SQL query?

Recommended answer

In the Spark DataFrame API, we can broadcast an entire table and join it with the target table to get the desired output. Here is the example code.

Imports

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

Code

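// Local session for the example
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._

// Load the users file (with a header row) and expose it to SQL as "users"
val df = spark.read.option("header", true).csv("data/user.txt")
df.createOrReplaceTempView("users")

// Turn the country list into a one-column table named "countries"
val countries = Seq("Italy", "France", "United States", "Poland", "Spain")
spark.sparkContext.parallelize(countries, 1).toDF("country").createOrReplaceTempView("countries")

// Mark the small table for broadcast and join it with users on "country"
broadcast(spark.table("countries")).join(spark.table("users"), "country").show()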

Contents of the "data/user.txt" file:

 name,phone,country
 a,123,India
 b,234,Italy
 c,526,France
 d,765,India

Code output:

+-------+----+-----+
|country|name|phone|
+-------+----+-----+
|  Italy|   b|  234|
| France|   c|  526|
+-------+----+-----+
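
The same join can also be written as a SQL query, since Spark SQL accepts a BROADCAST hint comment (supported from Spark 2.2 onward). The following is only a sketch under a few assumptions: the object name BroadcastHintSketch and the method filterUsers are hypothetical, and the rows of data/user.txt are inlined so that the example runs without the file.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical wrapper object; the names here are illustrative only.
object BroadcastHintSketch {

  // Registers the two temp views and returns the filtered users.
  def filterUsers(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Inline stand-in for data/user.txt (assumption: same rows as the sample file).
    Seq(
      ("a", "123", "India"),
      ("b", "234", "Italy"),
      ("c", "526", "France"),
      ("d", "765", "India")
    ).toDF("name", "phone", "country").createOrReplaceTempView("users")

    // The small lookup table that should be broadcast.
    Seq("Italy", "France", "United States", "Poland", "Spain")
      .toDF("country")
      .createOrReplaceTempView("countries")

    // The hint asks the planner to broadcast the "countries" table for this join.
    spark.sql(
      """SELECT /*+ BROADCAST(countries) */ users.name, users.phone, users.country
        |FROM users
        |JOIN countries ON users.country = countries.country""".stripMargin)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("broadcast-hint-sketch").getOrCreate()
    filterUsers(spark).show()
    spark.stop()
  }
}
```

A hint is a request to the planner, so calling explain() on the result is a reasonable way to check whether a broadcast join was actually chosen.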

Note: the code was tested with Spark 2.2 and Scala 2.11.
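
If the goal is to use the broadcast variable from the question inside the SQL text itself, one possible approach is to wrap it in a user-defined function and call that function in the WHERE clause. This is a minimal sketch and not part of the original answer: the UDF name in_countries, the object name BroadcastUdfSketch, and the inlined sample rows are all assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical wrapper object; the names here are illustrative only.
object BroadcastUdfSketch {

  def filterWithBroadcastUdf(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Inline stand-in for data/user.txt (assumption: same rows as the sample file).
    Seq(
      ("a", "123", "India"),
      ("b", "234", "Italy"),
      ("c", "526", "France"),
      ("d", "765", "India")
    ).toDF("name", "phone", "country").createOrReplaceTempView("users")

    // Broadcast the collection once; a Set gives constant-time membership checks.
    val countriesBroadcast = spark.sparkContext.broadcast(
      Set("Italy", "France", "United States", "Poland", "Spain"))

    // The UDF closes over the broadcast handle and reads its value on the executors.
    spark.udf.register(
      "in_countries",
      (country: String) => country != null && countriesBroadcast.value.contains(country))

    // The registered function can now be called by name from SQL.
    spark.sql("SELECT name, phone, country FROM users WHERE in_countries(country)")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("broadcast-udf-sketch").getOrCreate()
    filterWithBroadcastUdf(spark).show()
    spark.stop()
  }
}
```

A UDF is opaque to the query optimizer, so for a lookup list like this one the broadcast join may still be the better choice; the UDF form is mainly useful when the membership test has to appear inside an existing SQL string.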

