第1章 Spark SQL 概述1.1 什么是 Spark SQL1.2 RDD vs DataFrames vs DataSet1.2.1 RDD1.2.2 DataFrame1.2.3 DataSet1.2.4 三者的共性1.2.5 三者的区别第2章 执行 Spark SQL 查询2.1 命令行查询流程2.2 IDEA 创建 Spark SQL 程序第3章 Spark SQL 解析3.1 新的起始点 SparkSession3.2 创建 DataFrames3.3 DataFrame 常用操作3.3.1 DSL 风格语法3.3.2 SQL 风格语法3.4 创建 DataSet3.5 DataFrame 和 RDD 互操作3.5.1 通过反射的方式获取 Scheam3.5.2 通过编程的方式设置 Schema(StructType)3.6 类型之间的转换总结3.7 用户自定义函数3.7.1 用户自定义 UDF 函数3.7.2 用户自定义 UDAF 函数(即聚合函数)第4章 Spark SQL 数据源4.1 通用加载/保存方法4.1.1 手动指定选项4.1.2 文件保存选项4.2 Parquet 文件4.2.1 Parquet 读写4.2.2 解析分区信息4.2.3 Schema 合并4.3 Hive 数据库4.3.1 内嵌 Hive 应用4.3.2 外部 Hive 应用4.4 JSON 数据集4.5 JDBC第5章 JDBC/ODBC 服务器第6章 运行 Spark SQL CLI第7章 Spark SQL 实战7.1 数据说明7.2 加载数据7.3 计算所有订单中每年的销售单数、销售总额7.4 计算所有订单每年最大金额订单的销售额7.5 计算所有订单中每年最畅销货品


第1章 Spark SQL 概述

1.1 什么是 Spark SQL

1.2 RDD vs DataFrames vs DataSet

1.2.1 RDD

1.2.2 DataFrame


1.2.3 DataSet

1.2.4 三者的共性

1、RDD、DataFrame、DataSet 全都是 spark 平台下的分布式弹性数据集,为处理超大型数据提供便利。
2、三者都有惰性机制,在进行创建、转换,如 map 方法时,不会立即执行,只有在遇到 action,如 foreach 时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在 action 中使用对应的结果,在执行时会被直接跳过。

val sparkconf = new SparkConf().setMaster("local").setAppName("test").set("spark.port.maxRetries","1000")
val spark = SparkSession.builder().config(sparkconf).getOrCreate()
val rdd=spark.sparkContext.parallelize(Seq(("a"1), ("b"1), ("a"1)))
// map 不运行
rdd.map { line =>
  println("运行")
  line._1
}

3、三者都会根据 spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出。
4、三者都有 partition 的概念。
5、三者有许多共同的函数,如 filter,排序等。
6、在对 DataFrame 和 DataSet 进行许多操作都需要这个包进行支持

import spark.implicits._

7、DataFrame 和 DataSet 均可使用模式匹配获取各个字段的值和类型
DataFrame:

testDF.map {
      case Row(col1: String, col2: Int) =>
        println(col1)
        println(col2)
        col1
      case _=>
        ""
    }

DataSet:

case class Coltest(col1: String, col2: Int)extends Serializable // 定义字段名和类型
    testDS.map 
{
      case Coltest(col1: String, col2: Int) =>
        println(col1)
        println(col2)
        col1
      case _=>
        ""
    }

1.2.5 三者的区别

RDD
1、RDD 一般和 spark mlib 同时使用
2、RDD 不支持 sparksql 操作

DataFrame
1、与 RDD 和 DataSet 不同,DataFrame 每一行的类型固定为 Row,只有通过解析才能获取各个字段的值,如

testDF.foreach{
  line =>
    val col1=line.getAs[String]("col1")
    val col2=line.getAs[String]("col2")
}

每一列的值没法直接访问
2、DataFrame 与 DataSet 一般与 spark ml 同时使用
3、DataFrame 与 DataSet 均支持 sparksql 的操作,比如 select,groupby 之类,还能注册临时表/视窗,进行 sql 语句操作,如

dataDF.createOrReplaceTempView("tmp")
spark.sql("select ROW, DATE from tmp where DATE is not null order by DATE").show(100false)

4、DataFrame 与 DataSet 支持一些特别方便的保存方式,比如 保存成 csv,可以带上表头,这样每一列的字段名一目了然

// 保存
val saveoptions = Map("header" -> "true""delimiter" -> "\t""path" -> "hdfs://hadoop102:9000/test")
datawDF.write.format("com.atguigu.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()

// 读取
val options = Map("header" -> "true""delimiter" -> "\t""path" -> "hdfs://hadoop102:9000/test")
val datarDF= spark.read.options(options).format("com.atguigu.spark.csv").load()

利用这样的保存方式,可以方便的获得字段名和列的对应,而且分隔符(delimiter)可以自由指定。

DataSet:
DataSet 和 DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。
DataFrame 也可以叫 Dataset[Row],即每一行的类型是 Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的 getAS 方法或者共性中的第七条提到的模式匹配拿出特定字段。
而 DataSet 中,每一行是什么类型是不一定的,在自定义了 case class 之后可以很自由的获得每一行的信息。

case class Coltest(col1: String, col2: Int) extends Serializable // 定义字段名和类型
/**
 rdd
 ("a", 1)
 ("b", 1)
 ("a", 1)
**/

val test: Dataset[Coltest] 
= rdd.map { line =>
      Coltest(line._1, line._2)
    }.toDS
    test.map{
      line =>
        println(line.col1)
        println(line.col2)
    }

可以看出,DataSet 在需要访问列中的某个字段时是非常方便的,然而,如果要写一些适配性很强的函数时,如果使用 DataSet,行的类型又不确定,可能是各种 case class,无法实现适配,这时候用 DataFrame,即 Dataset[Row] 就能比较好的解决问题。

第2章 执行 Spark SQL 查询

2.1 命令行查询流程

打开 spark-shell
例子:查询大于 30 岁的用户
创建如下 JSON 文件,注意 JSON 的格式:

{"name":"Michael""age":30}
{"name":"Andy""age":30}
{"name":"Justin""age":19}

操作步骤如下:

大数据技术之_19_Spark学习_03_Spark SQL 应用解析 + Spark SQL 概述、解析 、数据源、实战  + 执行 Spark SQL 查询 + JDBC/ODBC 服务器-LMLPHP

2.2 IDEA 创建 Spark SQL 程序

Spark SQL 在 IDEA 中程序的打包和运行方式都和 Spark Core 类似,Maven 依赖中需要添加新的依赖项:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <!-- provided 表示编译期可用,运行期不可用 -->
            <!--<scope>provided</scope>-->
        </dependency>

程序如下:

package com.atguigu.sparksql

import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object HelloWorld {

  val logger = LoggerFactory.getLogger(HelloWorld.getClass)

  def main(args: Array[String]) {
    // 创建 SparkSession 并设置 App 名称
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option""some-value")
      .getOrCreate()

    // 通过隐式转换将 RDD 操作添加到 DataFrame 上
    import spark.implicits._

    // 通过 spark.read 操作读取 JSON 数据
    val df = spark.read.json("examples/src/main/resources/people.json")

    // show 操作类似于 Action,将 DataFrame 直接打印到 Console 上
    df.show()

    // DSL 风格的使用方式:属性的获取方法 $
    df.filter($"age" > 21).show()

    //将 DataFrame 注册为表
    df.createOrReplaceTempView("persons")

    // 执行 Spark SQL 查询操作
    spark.sql("select * from perosns where age > 21").show()

    // 关闭资源
    spark.stop()
  }
}

第3章 Spark SQL 解析

3.1 新的起始点 SparkSession

    import org.apache.spark.sql.SparkSession

    // 创建 SparkSession 并设置 App 名称
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option""some-value")
      .getOrCreate()

    // 通过隐式转换将 RDD 操作添加到 DataFrame 上
    import spark.implicits._
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option""some-value")
      .enableHiveSupport()
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

3.2 创建 DataFrames

1、从 Spark 数据源进行创建:

val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()

+---+-------+
|age|   name|
+---+-------+
30|Michael|
30|   Andy|
19| Justin|
+---+-------+

2、从 RDD 进行转换:

/**
Michael, 29
Andy, 30
Justin, 19
**/

scala> val personRdd = sc.textFile("examples/src/main/resources/people.txt")
personRdd: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[18] at textFile at <console>:24

// 把每一行的数据用 "," 隔开,然后通过第二个 map 转换成一个 Array 再通过 toDF 映射给 name 和 age
scala> val personDF3 = personRdd.map(_.split(",")).map(paras => (paras(0).trim(), paras(1).trim().toInt)).toDF("name""age")
personDF3: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> personDF3.collect
res0: Array[org.apache.spark.sql.Row] = Array([Michael,29], [Andy,30], [Justin,19])

scala> personDF.show()

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

3、从 Hive Table 进行查询返回,我们在数据源章节介绍。

3.3 DataFrame 常用操作

3.3.1 DSL 风格语法

// This import is needed to use the $-notation
import spark.implicits._

// Print the schema in a tree format
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|       31|
|   Andy|       31|
| Justin|       20|
+-------+---------+

// Select person older than 21
df.filter($"age" > 21).show()

+---+-------+
|age|   name|
+---+-------+
30|Michael|
30|   Andy|
+---+-------+

// Count person by age
df.groupBy("age").count().show()

+---+-----+
|age|count|
+---+-----+
19|    1|
30|    2|
+---+-----+

3.3.2 SQL 风格语法

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("persons")

val sqlDF = spark.sql("SELECT * FROM persons")
sqlDF.show()

+---+-------+
|age|   name|
+---+-------+
30|Michael|
30|   Andy|
19| Justin|
+---+-------+

// Register the DataFrame as a global temporary view
df.createGlobalTempView("persons")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.persons").show()

+---+-------+
|age|   name|
+---+-------+
30|Michael|
30|   Andy|
19| Justin|
+---+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.persons").show()

+---+-------+
|age|   name|
+---+-------+
30|Michael|
30|   Andy|
19| Justin|
+---+-------+

临时表是 Session 范围内的,Session 退出后,表就失效了。如果想应用范围内有效,可以使用全局表。
注意:使用全局表时需要全路径访问,如:global_temp.persons

3.4 创建 DataSet

DataSet 是具有强类型的数据集合,需要提供对应的类型信息。

scala> case class Person(name: String, age: Long)
defined class Person

scala> val caseClassDS 
= Seq(Person("Andy"32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> caseClassDS.show()

+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+

scala> val primitiveDS = Seq(123).toDS()
primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> primitiveDS.map(_ + 1).collect()
res1: Array[Int] = Array(234

scala> val path = "examples/src/main/resources/people.json"
path: String = examples/src/main/resources/people.json

scala> val peopleDS = spark.read.json(path).as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> peopleDS.show()

+---+-------+
|age|   name|
+---+-------+
30|Michael|
30|   Andy|
19| Justin|
+---+-------+

3.5 DataFrame 和 RDD 互操作

3.5.1 通过反射的方式获取 Scheam

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0).trim(), attributes(1).trim().toInt))
  .toDF()

peopleDF.show

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 25")

// The columns of a row in the result can be accessed by field index 
teenagersDF.map(teenager => "Name: " + teenager(0)).show() // 通过 row 对象的索引进行访问

+------------+
|       value|
+------------+
|Name: Justin|
+------------+

// or by field name 通过 row 对象的 getAs 方法访问
teenagersDF.map(teenager => "Name: " + teenager.getAs[Int](0)).show() //  以索引访问
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // 以列名访问

// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly (没有为数据集 [Map [K,V]] 预定义的编码器明确定义)
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]

// Primitive types and case classes can be also defined as (原始类型和样例类也可以定义为)
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] (row.getValuesMap [T] 一次检索多个列到 Map [String,T])
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name""age"))).collect()
// Array[Map[String,Any]] = Array(Map(name -> Justin, age -> 19))

3.5.2 通过编程的方式设置 Schema(StructType)

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string (schema 以字符串形式编码)
val schemaString = "name age" // 实际开发中 schemaString 是动态生成的

// Generate the schema based on the string of schema (根据 schema 字符串生成 schema)
// 把 name 和 age 都设置成 StringType 类型
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) // Array[org.apache.spark.sql.types.StructField]
val schema = StructType(fields)

// 把 name 设置成 StringType 类型,把 age 设置成 IntegerType 类型
// val fields = schemaString.split(" ").map(fieldName => fieldName match { 
//  case "name" => StructField(fieldName, StringType, nullable = true);
//  case "age" => StructField(fieldName, IntegerType, nullable = true)
//  }) // Array[org.apache.spark.sql.types.StructField]

// Convert records of the RDD (people) to Rows (将 RDD (people) 的记录转换为很多行)
import org.apache.spark.sql._
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0).trim, attributes(1).trim)) // org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: string]  注意:此时的 name 和 age 都是 StringType 类型

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name (可以通过字段索引或字段名称访问结果中行的列)
results.map(attributes => "Name: " + attributes(0)).show()

+-------------+
|        value|
+-------------+
|Name: Michael|
|   Name: Andy|
| Name: Justin|
+-------------+

results.show

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

3.6 类型之间的转换总结

小结
DataFrame/Dataset 转 RDD:

val rdd1 = testDF.rdd
val rdd2 = testDS.rdd

RDD 转 DataFrame:

import spark.implicits._
val testDF = rdd.map { line =>
      (line._1, line._2)
    }.toDF("col1""col2")

一般用元组把一行的数据写在一起,然后在 toDF 中指定字段名。

RDD 转 DataSet:

import spark.implicits._
case class Coltest(col1:String, col2:Int) extends Serializable // 定义字段名和类型
val testDS 
= rdd.map { line =>
      Coltest(line._1, line._2)
    }.toDS

可以注意到,定义每一行的类型 case class 时,已经给出了字段名和类型,后面只要往 case class 里面添加值即可。

Dataset 转 DataFrame:
这个也很简单,因为只是把 case class 封装成 Row。

import spark.implicits._
val testDF = testDS.toDF

DataFrame 转 DataSet:

import spark.implicits._
case class Coltest(col1:String, col2:Int) extends Serializable // 定义字段名和类型
val testDS 
= testDF.as[Coltest]

这种方法就是在给出每一列的类型后,使用 as 方法,转成 DataSet,这在数据类型是 DataFrame 又需要针对各个字段处理时极为方便。
在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然 toDF、toDS 无法使用。

3.7 用户自定义函数

3.7.1 用户自定义 UDF 函数

scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+---+-------+
|age|   name|
+---+-------+
30|Michael|
30|   Andy|
19| Justin|
+---+-------+

scala> spark.udf.register("addName", (x: String) => "Name:" + x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> df.createOrReplaceTempView("people")

scala> spark.sql("select addName(name), age from people").show()
+-----------------+---+
|UDF:addName(name)|age|
+-----------------+---+
|     Name:Michael| 30|
|        Name:Andy| 30|
|      Name:Justin| 19|
+-----------------+---+

scala> spark.sql("select addName(name) as newName, age from people").show()
+------------+---+
|     newName|age|
+------------+---+
|Name:Michael| 30|
|   Name:Andy| 30|
| Name:Justin| 19|
+------------+---+

3.7.2 用户自定义 UDAF 函数(即聚合函数)

弱类型用户自定义聚合函数
通过继承 UserDefinedAggregateFunction 来实现用户自定义聚合函数。下面展示一个求平均工资的自定义聚合函数:

package com.atguigu.spark

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object MyAverage extends UserDefinedAggregateFunction {

  // 聚合函数输入参数的数据类型
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // :: 用于的是向队列的头部追加数据,产生新的列表

  // 聚合缓冲区中值的数据类型
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) // Nil 是一个空的 List,定义为 List[Nothing]
  }

  // 返回值的数据类型
  def dataType: DataType = DoubleType

  // 对于相同的输入是否一直返回相同的输出
  def deterministic: Boolean = true

  // 初始化
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    // 存工资的总额
    buffer(0) = 0L
    // 存工资的个数
    buffer(1) = 0L
  }

  // 相同 Execute 间的数据合并(同一分区)
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }

  // 不同 Execute 间的数据合并(不同分区)
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // 计算最终结果
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)

  def main(args: Array[String]) {
    // 创建 SparkSession 并设置 App 名称
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      // .config("spark.some.config.option", "some-value")
        .master("local[*]"// 本地测试
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    spark.udf.register("myAverage", MyAverage)

    // val df = spark.read.json("examples/src/main/resources/employees.json")
    val df = spark.read.json("D:\\learn\\JetBrains\\workspace_idea\\spark\\doc\\employees.json")
    df.createOrReplaceTempView("employees")
    df.show()
    // +-------+------+
    // |   name|salary|
    // +-------+------+
    // |Michael|  3000|
    // |   Andy|  4500|
    // | Justin|  3500|
    // |  Berta|  4000|
    // +-------+------+

    val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
    result.show()
    // +--------------+
    // |average_salary|
    // +--------------+
    // |        3750.0|
    // +--------------+

    spark.stop()
  }
}

强类型用户自定义聚合函数
通过继承 Aggregator 来实现强类型自定义聚合函数,同样是求平均工资:

package com.atguigu.spark

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// 既然是强类型,可能有 case 类
case class Employee(name: String, salary: Long)

case class Average(var sum: Long, var count: Long)

// 其中 Employee 是在应用聚合函数的时候传入的对象,Average 是聚合函数在运行的时候内部需要的数据结构,Double 是聚合函数最终需要输出的类型
object MyAverage extends Aggregator[Employee, Average, Double] 
{

  // 定义一个数据结构,保存工资总数和工资总个数,初始都为0
  def zero: Average = Average(0L0L)

  // 相同 Execute 间的数据合并(同一分区)
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }

  // 聚合不同 Execute 的结果
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }

  // 计算最终结果
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count

  // 设定之间值类型的编码器,要转换成 case 类
  // Encoders.product 是进行 scala 元组和 case 类转换的编码器
  def bufferEncoder: Encoder[Average] = Encoders.product

  // 设定最终输出值的编码器
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble

  def main(args: Array[String]) 
{
    // 创建 SparkSession 并设置 App 名称
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      // .config("spark.some.config.option", "some-value")
      .master("local[*]"// 本地测试
      .getOrCreate()

    import spark.implicits._
    // For implicit conversions like converting RDDs to DataFrames
    // val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
    val ds = spark.read.json("D:\\learn\\JetBrains\\workspace_idea\\spark\\doc\\employees.json").as[Employee]
    ds.show()
    // +-------+------+
    // |   name|salary|
    // +-------+------+
    // |Michael|  3000|
    // |   Andy|  4500|
    // | Justin|  3500|
    // |  Berta|  4000|
    // +-------+------+

    val averageSalary = MyAverage.toColumn.name("average_salary")
    val result = ds.select(averageSalary)
    result.show()
    // +--------------+
    // |average_salary|
    // +--------------+
    // |        3750.0|
    // +--------------+

    spark.stop()
  }
}

第4章 Spark SQL 数据源

4.1 通用加载/保存方法

4.1.1 手动指定选项

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name""favorite_color").write.save("namesAndFavColors.parquet")
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")
val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop102:9000/namesAndAges.parquet`")
sqlDF.show()

spark-shell 下的演示代码:

scala> val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json"// Spark SQL 的通用输入模式
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")  // Spark SQL 的通用输出模式

scala> peopleDF.show()
+---+-------+
|age|   name|
+---+-------+
30|Michael|
30|   Andy|
19| Justin|
+---+-------+

scala> val peopleDF = spark.read.json("examples/src/main/resources/people.json"// Spark SQL 的专业输入模式
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string] 

scala> peopleDF.write.parquet("hdfs://hadoop102:9000/namesAndAges.parquet"// Spark SQL 的专业输出模式

scala> peopleDF.show()
+---+-------+
|age|   name|
+---+-------+
30|Michael|
30|   Andy|
19| Justin|
+---+-------+

scala> val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop102:9000/namesAndAges.parquet`")
19/04/27 21:32:55 WARN ObjectStore: Failed to get database parquet, returning NoSuchObjectException
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> sqlDF.show()
+---+-------+
|age|   name|
+---+-------+
30|Michael|
30|   Andy|
19| Justin|
+---+-------+

4.1.2 文件保存选项

4.2 Parquet 文件

4.2.1 Parquet 读写

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("hdfs://hadoop102:9000/people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("hdfs://hadoop102:9000/people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 25")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

4.2.2 解析分区信息

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

4.2.3 Schema 合并

示例代码如下:

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single""double")
df1.write.parquet("hdfs://hadoop102:9000/data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single""triple")
df2.write.parquet("hdfs://hadoop102:9000/data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema""true").parquet("hdfs://hadoop102:9000/data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

4.3 Hive 数据库

import java.io.File

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation 
new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ......

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF 
= sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ......

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ......

4.3.1 内嵌 Hive 应用

先做两个准备工作:
(1)为了方便以后的操作,我们先将 /opt/module/hive/conf 目录下的 hive-site.xml 和 /opt/module/hadoop-2.7.2/etc/hadoop 目录下的 core-site.xml、hdfs-site.xml 拷贝至 /opt/module/spark-2.1.1-bin-hadoop2.7 目录下,然后分发至其他机器节点。以后我们就操作 /opt/module/spark-2.1.1-bin-hadoop2.7 目录下的文件就好了!
(2)由于我们使用 /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-sql 和 /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-shell 时打出的日志很多,影响观看,所以我们修改下日志的输出级别 INFO 为 WARN,然后分发至其他机器节点。

[atguigu@hadoop102 conf]$ pwd
/opt/module/spark-2.1.1-bin-hadoop2.7/conf
[atguigu@hadoop102 conf]$ cp log4j.properties.template log4j.properties
[atguigu@hadoop102 conf]$ vim log4j.properties
[atguigu@hadoop102 conf]$ xsync log4j.properties

将 log4j.rootCategory=INFO, console 修改为 log4j.rootCategory=WARN, console

4.3.2 外部 Hive 应用

$ bin/spark-shell --master spark://hadoop102:7077 --jars mysql-connector-java-5.1.27-bin.jar

4.4 JSON 数据集

示例 JSON 文件如下:

{"name":"Michael""age":30}
{"name":"Andy""age":30}
{"name":"Justin""age":19}

示例代码如下:

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

4.5 JDBC

$ bin/spark-shell --master spark://hadoop102:7077 --jars mysql-connector-java-5.1.27-bin.jar

示例代码:

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read.format("jdbc").option("url""jdbc:mysql://hadoop102:3306/mysql").option("dbtable""db").option("user""root").option("password""123456").load()

val connectionProperties = new Properties()
connectionProperties.put("user""root")
connectionProperties.put("password""hive")
val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop102:3306/mysql""db", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url""jdbc:mysql://hadoop102:3306/mysql")
  .option("dbtable""db")
  .option("user""root")
  .option("password""123456")
  .save()

jdbcDF2.write.jdbc("jdbc:mysql://hadoop102:3306/mysql""db", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes""name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:mysql://hadoop102:3306/mysql""db", connectionProperties)

第5章 JDBC/ODBC 服务器

./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
......
./bin/beeline
beeline> !connect jdbc:hive2://hadoop102:10000

Hive 的 JDBC 访问:

[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ pwd
/opt/module/spark-2.1.1-bin-hadoop2.7
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ ./sbin/start-thriftserver.sh
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /opt/module/spark-2.1.1-bin-hadoop2.7/logs/spark-atguigu-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-hadoop102.out
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ ./bin/beeline 
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://hadoop102:10000
Connecting to jdbc:hive2://hadoop102:10000(回车)
Enter username for jdbc:hive2://hadoop102:10000: atguigu(回车)
Enter password for jdbc:hive2://hadoop102:10000: (直接回车)
Connected to: Spark SQL (version 2.1.1)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://hadoop102:10000> show tables;
+-----------+---------------------+--------------+--+
| database  |      tableName      | isTemporary  |
+-----------+---------------------+--------------+--+
default   | event_logs20151220  | false        |
default   | student22           | false        |
+-----------+---------------------+--------------+--+
2 rows selected (0.519 seconds)
0: jdbc:hive2://hadoop102:10000> 

第6章 运行 Spark SQL CLI

[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ pwd
/opt/module/spark-2.1.1-bin-hadoop2.7
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ ./bin/spark-sql 

如下图所示:

大数据技术之_19_Spark学习_03_Spark SQL 应用解析 + Spark SQL 概述、解析 、数据源、实战  + 执行 Spark SQL 查询 + JDBC/ODBC 服务器-LMLPHP
配置外部 Hive 需要替换 conf/ 下的 hive-site.xml 。

第7章 Spark SQL 实战

7.1 数据说明

数据集是货品交易数据集。

大数据技术之_19_Spark学习_03_Spark SQL 应用解析 + Spark SQL 概述、解析 、数据源、实战  + 执行 Spark SQL 查询 + JDBC/ODBC 服务器-LMLPHP
每个订单可能包含多个货品,每个订单可以产生多次交易,不同的货品有不同的单价。

7.2 加载数据

tbStock:

scala> case class tbStock(ordernumber: String, locationid: String, dateid: String) extends Serializable
defined class tbStock

scala> val tbStockRdd 
= spark.sparkContext.textFile("tbStock.txt")
tbStockRdd: org.apache.spark.rdd.RDD[String] = tbStock.txt MapPartitionsRDD[1] at textFile at <console>:23

scala> val tbStockDS = tbStockRdd.map(_.split(",")).map(attr => tbStock(attr(0), attr(1), attr(2))).toDS
tbStockDS: org.apache.spark.sql.Dataset[tbStock] = [ordernumber: string, locationid: string ... 1 more field]

scala> tbStockDS.show()
+------------+----------+---------+
| ordernumber|locationid|   dataid|
+------------+----------+---------+
|BYSL00000893|      ZHAO|2007-8-23|
|BYSL00000897|      ZHAO|2007-8-24|
|BYSL00000898|      ZHAO|2007-8-25|
|BYSL00000899|      ZHAO|2007-8-26|
|BYSL00000900|      ZHAO|2007-8-26|
|BYSL00000901|      ZHAO|2007-8-27|
|BYSL00000902|      ZHAO|2007-8-27|
|BYSL00000904|      ZHAO|2007-8-28|
|BYSL00000905|      ZHAO|2007-8-28|
|BYSL00000906|      ZHAO|2007-8-28|
|BYSL00000907|      ZHAO|2007-8-29|
|BYSL00000908|      ZHAO|2007-8-30|
|BYSL00000909|      ZHAO| 2007-9-1|
|BYSL00000910|      ZHAO| 2007-9-1|
|BYSL00000911|      ZHAO|2007-8-31|
|BYSL00000912|      ZHAO| 2007-9-2|
|BYSL00000913|      ZHAO| 2007-9-3|
|BYSL00000914|      ZHAO| 2007-9-3|
|BYSL00000915|      ZHAO| 2007-9-4|
|BYSL00000916|      ZHAO| 2007-9-4|
+------------+----------+---------+
only showing top 20 rows

tbStockDetail:

scala> case class tbStockDetail(ordernumber: String, rownum: Int, itemid: String, number: Int, price: Double, amount: Double) extends Serializable
defined class tbStockDetail

scala> val tbStockDetailRdd 
= spark.sparkContext.textFile("tbStockDetail.txt")
tbStockDetailRdd: org.apache.spark.rdd.RDD[String] = tbStockDetail.txt MapPartitionsRDD[13] at textFile at <console>:23

scala> val tbStockDetailDS = tbStockDetailRdd.map(_.split(",")).map(attr => tbStockDetail(attr(0), attr(1).trim().toInt, attr(2), attr(3).trim().toInt, attr(4).trim().toDouble, attr(5).trim().toDouble)).toDS
tbStockDetailDS: org.apache.spark.sql.Dataset[tbStockDetail] = [ordernumber: string, rownum: int ... 4 more fields]

scala> tbStockDetailDS.show()
+------------+------+--------------+------+-----+------+
| ordernumber|rownum|        itemid|number|price|amount|
+------------+------+--------------+------+-----+------+
|BYSL00000893|     0|FS527258160501|    -1|268.0|-268.0|
|BYSL00000893|     1|FS527258169701|     1|268.0268.0|
|BYSL00000893|     2|FS527230163001|     1|198.0198.0|
|BYSL00000893|     3|24627209125406|     1|298.0298.0|
|BYSL00000893|     4|K9527220210202|     1|120.0120.0|
|BYSL00000893|     5|01527291670102|     1|268.0268.0|
|BYSL00000893|     6|QY527271800242|     1|158.0158.0|
|BYSL00000893|     7|ST040000010000|     8|  0.0|   0.0|
|BYSL00000897|     0|04527200711305|     1|198.0198.0|
|BYSL00000897|     1|MY627234650201|     1|120.0120.0|
|BYSL00000897|     2|01227111791001|     1|249.0249.0|
|BYSL00000897|     3|MY627234610402|     1|120.0120.0|
|BYSL00000897|     4|01527282681202|     1|268.0268.0|
|BYSL00000897|     5|84126182820102|     1|158.0158.0|
|BYSL00000897|     6|K9127105010402|     1|239.0239.0|
|BYSL00000897|     7|QY127175210405|     1|199.0199.0|
|BYSL00000897|     8|24127151630206|     1|299.0299.0|
|BYSL00000897|     9|G1126101350002|     1|158.0158.0|
|BYSL00000897|    10|FS527258160501|     1|198.0198.0|
|BYSL00000897|    11|ST040000010000|    13|  0.0|   0.0|
+------------+------+--------------+------+-----+------+
only showing top 20 rows

tbDate:

scala> case class tbDate(dateid: String, years: Int, theyear: Int, month: Int, day: Int, weekday: Int, week: Int, quarter: Int, period: Int, halfmonth: Int) extends Serializable
defined class tbDate

scala> val tbDateRdd 
= spark.sparkContext.textFile("tbDate.txt")
tbDateRdd: org.apache.spark.rdd.RDD[String] = tbDate.txt MapPartitionsRDD[20] at textFile at <console>:23

scala> val tbDateDS = tbDateRdd.map(_.split(",")).map(attr => tbDate(attr(0), attr(1).trim().toInt, attr(2).trim().toInt, attr(3).trim().toInt, attr(4).trim().toInt, attr(5).trim().toInt, attr(6).trim().toInt, attr(7).trim().toInt, attr(8).trim().toInt, attr(9).trim().toInt)).toDS
tbDateDS: org.apache.spark.sql.Dataset[tbDate] = [dateid: string, years: int ... 8 more fields]

scala> tbDateDS.show()
+---------+------+-------+-----+---+-------+----+-------+------+---------+
|   dateid| years|theyear|month|day|weekday|week|quarter|period|halfmonth|
+---------+------+-------+-----+---+-------+----+-------+------+---------+
2003-1-1|200301|   2003|    1|  1|      3|   1|      1|     1|        1|
2003-1-2|200301|   2003|    1|  2|      4|   1|      1|     1|        1|
2003-1-3|200301|   2003|    1|  3|      5|   1|      1|     1|        1|
2003-1-4|200301|   2003|    1|  4|      6|   1|      1|     1|        1|
2003-1-5|200301|   2003|    1|  5|      7|   1|      1|     1|        1|
2003-1-6|200301|   2003|    1|  6|      1|   2|      1|     1|        1|
2003-1-7|200301|   2003|    1|  7|      2|   2|      1|     1|        1|
2003-1-8|200301|   2003|    1|  8|      3|   2|      1|     1|        1|
2003-1-9|200301|   2003|    1|  9|      4|   2|      1|     1|        1|
|2003-1-10|200301|   2003|    110|      5|   2|      1|     1|        1|
|2003-1-11|200301|   2003|    111|      6|   2|      1|     2|        1|
|2003-1-12|200301|   2003|    112|      7|   2|      1|     2|        1|
|2003-1-13|200301|   2003|    113|      1|   3|      1|     2|        1|
|2003-1-14|200301|   2003|    114|      2|   3|      1|     2|        1|
|2003-1-15|200301|   2003|    115|      3|   3|      1|     2|        1|
|2003-1-16|200301|   2003|    116|      4|   3|      1|     2|        2|
|2003-1-17|200301|   2003|    117|      5|   3|      1|     2|        2|
|2003-1-18|200301|   2003|    118|      6|   3|      1|     2|        2|
|2003-1-19|200301|   2003|    119|      7|   3|      1|     2|        2|
|2003-1-20|200301|   2003|    120|      1|   4|      1|     2|        2|
+---------+------+-------+-----+---+-------+----+-------+------+---------+
only showing top 20 rows

注册为临时表:

scala> tbStockDS.createOrReplaceTempView("tbStock")

scala> tbDateDS.createOrReplaceTempView("tbDate")

scala> tbStockDetailDS.createOrReplaceTempView("tbStockDetail")

7.3 计算所有订单中每年的销售单数、销售总额

统计所有订单中每年的销售单数、销售总额
三个表连接后以 count(distinct a.ordernumber) 计销售单数,以 sum(b.amount) 计销售总额

示例代码:

SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount)
FROM tbStock a
    JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
    JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear
ORDER BY c.theyear

spark.sql("SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount) FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear ORDER BY c.theyear").show

输出结果如下:

+-------+---------------------------+--------------------+                      
|theyear|count(DISTINCT ordernumber)|         sum(amount)|
+-------+---------------------------+--------------------+
|   2004|                       1094|   3268115.499199999|
|   2005|                       3828|1.3257564149999991E7|
|   2006|                       3772|1.3680982900000006E7|
|   2007|                       4885|1.6719354559999993E7|
|   2008|                       48611.467429530000001E7|
|   2009|                       2619|   6323697.189999999|
|   2010|                         94|  210949.65999999997|
+-------+---------------------------+--------------------+

7.4 计算所有订单每年最大金额订单的销售额

目标:统计每年最大金额订单的销售额。
第一步、统计每年,每个订单一共有多少销售额
示例代码:

SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount
FROM tbStock a
    JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
GROUP BY a.dateid, a.ordernumber

spark.sql("SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber").show

输出结果如下:

+----------+------------+------------------+
|    dateid| ordernumber|       SumOfAmount|
+----------+------------+------------------+
|  2008-4-9|BYSL00001175|             350.0|
2008-5-12|BYSL00001214|             592.0|
2008-7-29|BYSL00011545|            2064.0|
|  2008-9-5|DGSL00012056|            1782.0|
2008-12-1|DGSL00013189|             318.0|
|2008-12-18|DGSL00013374|             963.0|
|  2009-8-9|DGSL00015223|            4655.0|
2009-10-5|DGSL00015585|            3445.0|
2010-1-14|DGSL00016374|            2934.0|
2006-9-24|GCSL00000673|3556.1000000000004|
2007-1-26|GCSL00000826| 9375.199999999999|
2007-5-24|GCSL00001020| 6171.300000000002|
|  2008-1-8|GCSL00001217|            7601.6|
2008-9-16|GCSL00012204|            2018.0|
2006-7-27|GHSL00000603|            2835.6|
|2006-11-15|GHSL00000741|           3951.94|
|  2007-6-6|GHSL00001149|               0.0|
2008-4-18|GHSL00001631|              12.0|
2008-7-15|GHSL00011367|             578.0|
|  2009-5-8|GHSL00014637|            1797.6|
+----------+------------+------------------+

第二步、以上一步查询结果为基础表,和表 tbDate 使用 dateid join,求出每年最大金额订单的销售额
示例代码:

SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount
FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount
    FROM tbStock a
        JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
    GROUP BY a.dateid, a.ordernumber
    ) c
    JOIN tbDate d ON c.dateid = d.dateid
GROUP BY theyear
ORDER BY theyear DESC

spark.sql("SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber ) c JOIN tbDate d ON c.dateid = d.dateid GROUP BY theyear ORDER BY theyear DESC").show

输出结果如下:

+-------+------------------+                                                    
|theyear|       SumOfAmount|
+-------+------------------+
|   2010|13065.280000000002|
|   2009|25813.200000000008|
|   2008|           55828.0|
|   2007|          159126.0|
|   2006|           36124.0|
|   2005|38186.399999999994|
|   200423656.79999999997|
+-------+------------------+

7.5 计算所有订单中每年最畅销货品

目标:统计每年最畅销货品(哪个货品销售额 amount 在当年最高,哪个就是最畅销货品)。
第一步、求出每年每个货品的销售额
示例代码:

SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
FROM tbStock a
    JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
    JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear, b.itemid

spark.sql("SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid").show

输出结果如下:

+-------+--------------+------------------+                                     
|theyear|        itemid|       SumOfAmount|
+-------+--------------+------------------+
|   2004|43824480810202|           4474.72|
|   2006|YA214325360101|             556.0|
|   2006|BT624202120102|             360.0|
|   2007|AK215371910101|24603.639999999992|
|   2008|AK216169120201|29144.199999999997|
|   2008|YL526228310106|16073.099999999999|
|   2009|KM529221590106| 5124.800000000001|
|   2004|HT224181030201|2898.6000000000004|
|   2004|SG224308320206|           7307.06|
|   2007|04426485470201|14468.800000000001|
|   2007|84326389100102|           9134.11|
|   2007|B4426438020201|           19884.2|
|   2008|YL427437320101|12331.799999999997|
|   2008|MH215303070101|            8827.0|
|   2009|YL629228280106|           12698.4|
|   2009|BL529298020602|            2415.8|
|   2009|F5127363019006|             614.0|
|   2005|24425428180101|          34890.74|
|   2007|YA214127270101|             240.0|
|   2007|MY127134830105|          11099.92|
+-------+--------------+------------------+

第二步、在第一步的基础上,统计每年单个货品中的最大金额
示例代码:

SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount
FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
    FROM tbStock a
        JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
        JOIN tbDate c ON a.dateid = c.dateid
    GROUP BY c.theyear, b.itemid
    ) d
GROUP BY d.theyear

spark.sql("SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear").show

输出结果如下:

+-------+------------------+                                                    
|theyear|       MaxOfAmount|
+-------+------------------+
|   2007|           70225.1|
|   2006|          113720.6|
|   2004|53401.759999999995|
|   2009|           30029.2|
|   2005|56627.329999999994|
|   2010|            4494.0|
|   200898003.60000000003|
+-------+------------------+

第三步、用最大销售额和统计好的每个货品的销售额 join,以及用年 join,得到最畅销货品那一行信息
示例代码:

SELECT DISTINCT e.theyear, e.itemid, f.MaxOfAmount
FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
    FROM tbStock a
        JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
        JOIN tbDate c ON a.dateid = c.dateid
    GROUP BY c.theyear, b.itemid
    ) e
    JOIN (SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount
        FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
            FROM tbStock a
                JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
                JOIN tbDate c ON a.dateid = c.dateid
            GROUP BY c.theyear, b.itemid
            ) d
        GROUP BY d.theyear
        ) f ON e.theyear = f.theyear
        AND e.SumOfAmount = f.MaxOfAmount
ORDER BY e.theyear

spark.sql("SELECT DISTINCT e.theyear, e.itemid, f.maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) e JOIN (SELECT d.theyear, MAX(d.sumofamount) AS maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear ) f ON e.theyear = f.theyear AND e.sumofamount = f.maxofamount ORDER BY e.theyear").show

输出结果如下:

+-------+--------------+------------------+                                     
|theyear|        itemid|       maxofamount|
+-------+--------------+------------------+
|   2004|JY424420810101|53401.759999999995|
|   2005|24124118880102|56627.329999999994|
|   2006|JY425468460101|          113720.6|
|   2007|JY425468460101|           70225.1|
|   2008|E2628204040101| 98003.60000000003|
|   2009|YL327439080102|           30029.2|
|   2010|SQ429425090101|            4494.0|
+-------+--------------+------------------+
04-28 18:23