Spark SQL

将SQL语句转为底层的Spark作业执行,支持大量数据分析算法。

数据抽象DataFrame

加载数据源生成结构化数据
DataFrame的创建

import org.apache.spark.sql.SparkSession
import spark.implicits._   //将底层数据源隐式转为DataFrame
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.json("file:///usr/local/spark.examples/src/main/resources/people.json")

DataFrame常用操作

df.show()        //显示数据表
df.printSchema()		//打印模式
df.select(df("name"),df("age")).show()	//查询
df.filter(df("age")>20).show() //过滤
df.groupBy(df("age"))	//分组聚合
df.sort(df("age"))  //升序
df.rdd.saveAsTextFile("....") //保存为文件

RDD转换为DataFrame

反射机制推断RDD模式
txt文件
Spark SQL入门-LMLPHP

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.SparkSession
import spark.implicits._   //将底层数据源隐式转为DataFrame
case Person(name:String,age:Long)    //定义要转换成的DF类
val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark.examples/src/main/resources/people.json").map(line => line.split(",")).map(attributes => Person(attributes(0),attributes(1).trim.toInt)).toDF
//spark是上段代码的SparkSession对象,加载文件成为RDD,分割",",RDD是一个个Array,把每个Array生成对象Person,转为DF
peopleDF.createOrReplaceTempView("people")  //注册为临时表
val resultDF = spark.sql(“select name,age from people where age >20”) //使用SQL语句进行查询
resultDF.show()

结果如下:
Spark SQL入门-LMLPHP
使用编程方式定义RDD模式
无法提前 cass class的情况
三个步骤

  1. 制作“表头”
  2. 制作“表的记录”
  3. 拼装表头和记录
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
//制作字段 StructField 第一个参数数字段名,第二个是类型,第三个是是否可null
val fields = Array(StructField("name",StringType,true), StructField("age",IntegerType,true))
val schema = StructType(fields) //制作成模式
//加载数据源成为RDD
...
//制作表中记录 line =>Array => Row
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim.toInt))
//拼装
val peopleDF = spark.createDataFrame(rowRDD, schema)

通过JDBC连接数据库

 //带参数启动spark-shell
./bin/spark-shell  --jars /usr/local/spark/jars/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar  --driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.46-bin.jar
 //读
 val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "hadoop").load()

//写
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

//下面我们设置两条数据表示两个学生信息
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
//下面要设置模式信息
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
//下面创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD, schema)
//下面创建一个prop变量用来保存JDBC连接参数
val prop = new Properties()
prop.put("user", "root") //表示用户名是root
prop.put("password", "hadoop") //表示密码是hadoop
prop.put("driver","com.mysql.jdbc.Driver") //表示驱动程序是com.mysql.jdbc.Driver
//下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中
studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "spark.student", prop)

10-07 20:00