大数据技术是数据的集合以及对数据集合的操作技术的统称,具体来说:
1、数据集合:会涉及数据的搜集、存储等,搜集会有很多技术,存储技术现在比较经典方案是使用Hadoop,不过也很多方案采用Kafka。
2、对数据集合的操作技术,目前全球最火爆的是Spark。
Spark的框架实现是语言是Scala,首选应用程序开发语言也是Scala,所以Scala对集合以及集合操作的支持就非常重要且非常强大,因此Spark中对很多数据操作算子和Scala中对集合的操作是一样的。
Scala中数据的创建与操作:
1、最原始的创建数据的方式是形如:val array = new ArrayInt,指定数组的类型是Int且其固定长度是5个元素;
2、数据的索引下标是从0开始;对数组元素访问的时候下标的范围在0到length-1的长度,超过length-1会出现java.lang.ArrayIndexOutOfBoundsException异常;
3、最常用和经典的创建数组的方式是形如:val array = ArrayInt,直接通过Array类名并传入参数的方式来创建数组实例;在背后的实现是调用Array的
工厂方法模式apply来构建出数组及数组的内容;
4、可以省略[Int]这个泛型类型,是因为Scala有类型推导能力,已经传入了当前数组的值,所以可以根据值来推导出类型;
5、形如“val names = Array(“Scala”, “Kafka”, “Spark”)”,关于Array本身在底层的实现是借助了JVM平台上的Java语言的数组实现,是不可变的;
6、如果想使用可变数组,首先需导入“import
scala.collection.mutable.ArrayBuffer”,然后使用ArrayBuffer这个可变数组;
7、关于ArrayBuffer增加元素默认情况下都是在ArrayBuffer末尾增加元素的,效率非常高;
8、当需要多线程并发操作的时候,把ArrayBuffer转换成为Array就非常重要,其实,即使是Array,其本身虽然不可变动(元素不可删减),但是可以修改Array中每个元素的内容,所以多线程操作的时候,还是必须考虑;
9、如果想在已经有的数组的基础上通过作用于每个元素来生成新的元素构成新的数组,则可以通过“yield”语法来完成,这在大数据中意义重大:
第一点:它是在不修改已经有的Array的内容的基础上完成的,非常适合于大数据的处理;
第二点:在大数据处理中,例如Spark中业务操作的核心思想就类似于“yield”,来通过使用function对每个元素操作获得新的元素构成的新集合,其实就是新的RDD,例如MapPartitionsRDD
10、集合的操作往往可以通过丰富的操作算子,例如filter来过滤需要条件的元素,例如map来进行每一个元素的加工;
以下是示例代码
object HelloArrayOps { def main(args: Array[String]): Unit = { //最原始创建数组array1的方式,这个数组包含5个Int类型元素,5个元素初始化为0 val array1 = new Array[Int](5) //创建一个数组array2,用(1 2 3 4 5)5个Int型数据进行初始化 val array2 = Array[Int](1,2,3,4,5) //可以省略[Int]这个泛型类型,是因为Scala有类型推导能力,已经传进了当前数组的值,所以可以根据值来推导出类型; val array3 = Array(1,2,3,4,5) //以下创建数组方法与val array2 = Array(1,2,3,4,5)等价 val array4 = Array.apply(1,2,3,4,5) //虽然array4是用val定义的,但数组元素的值是可以被再次赋值,第一个元素的值被修改为10 array4(0) = 10 //通过for循环打印array4所有元素 for(item <- array4) println(item) //定义一个有3个字符串类型Array数组names val names = Array("Scala", "Kafka", "Spark") for(item <- names) println(item) //导入包scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer //定义一个Int类型的ArrayBuffer val arrayBuffer = ArrayBuffer[Int]() //向数组arrayBuffer中追加一个元素 arrayBuffer += 1 arrayBuffer += 2 arrayBuffer += 3 //将序列(4 5 6 7 8 9 10)追加到数组arrayBuffer中 arrayBuffer += (4,5,6,7,8,9,10) //将一个Array数组追加到arrayBuffer中 arrayBuffer ++= Array(1,2,3) //通过insert方法在arrayBuffer数组中length-1的位置插入指定元素,以下是插入两个元素100,1000 //此时arrayBuffer的值为:1 2 3 4 5 6 7 8 9 10 1 2 100 1000 3 arrayBuffer.insert(arrayBuffer.length - 1, 100,1000) //通过remove方法删除arrayBuffer中指定位置的元素 //此时arrayBuffer的值为:1 2 3 4 5 6 7 8 9 10 1 2 100 3 arrayBuffer.remove(arrayBuffer.length - 2) //当需要多线程并发操作的时候,把ArrayBuffer转换成为Array就非常重要; arrayBuffer.toArray for(item <- arrayBuffer) println(item) val array5 = Array(1,2,3,4,5) print("array5-1: ") for(i <- 0 until array5.length) print(array5(i) + " ") //打印array5-1: 1 2 3 4 5 println print("array5-2: ") //以step等于2的方式遍历array5数组元素 for(i <- 0 until (array5.length, 2)) print(array5(i) + " ") //打印array5-2: 1 3 5 println print("array5-3: ") //以反序方式从数组末尾到数组头遍历元素 for(i <- (0 until array5.length).reverse) print(array5(i) + " ") //打印array5-3: 5 4 3 2 1 println //数组求各 println("array5 Sum = " + array5.sum) //打印array5 Sum = 15 //数组求最大数 println("array5 Max = " + array5.max) //打印array5 Max = 5 val array6 = arrayBuffer.toArray //array6的值:1 2 3 4 5 6 7 8 9 10 1 2 100 3 //对数组进行升序排序 scala.util.Sorting.quickSort(array6) print("array6-1: ") for(i <- 0 until array6.length) print(array6(i) + " ") //排序后打印array6-1: 1 1 2 2 3 3 4 5 6 7 8 9 10 100 println print("array6-2: ") println(array6.mkString(",")) //打印array6-2: 1,1,2,2,3,3,4,5,6,7,8,9,10,100 print("array6-3: ") println(array6.mkString("****", ",", "****")) //打印array6-3: ****1,1,2,2,3,3,4,5,6,7,8,9,10,100**** val array7 = arrayBuffer.toArray print("array7: ") for(i <- 0 until array7.length) print(array7(i) + " ") //打印array7: 1 2 3 4 5 6 7 8 9 10 1 2 100 3 println //"yield"语法在已经有的数组的基础上通过作用于每个元素来生成新的元素构成新的数组,这里对每一个元素都加1 val arrayAddedOne = for(item <- array7) yield item + 1 println("arrayAddedOne: " + arrayAddedOne.mkString(" ")) //打印arrayAddedOne: 2 3 4 5 6 7 8 9 10 11 2 3 101 4 //提取数组array7中的偶数然后用"yield"语法对这些偶数进行加1操作,然后得到新的数组赋给array8 val array8 = for(item <- array7 if item % 2 == 0) yield item + 1 println("array8: " + array8.mkString(" ")) //打印array8: 3 5 7 9 11 3 101 val array9 = array7 print("array9-1: ") //通过filter提取偶数 println(array9.filter{x => x % 2 == 0}.mkString(" ")) //打印array9-1: 2 4 6 8 10 2 100 print("array9-2: ") //同样是通过filter提取偶数,但采用的占位符方式 println(array9.filter(_ % 2 == 0).mkString(" ")) //打印array9-2: 2 4 6 8 10 2 100 print("array9-3: ") //采用占位符方式提取数组偶数并进行乘10操作 println(array9.filter{_ % 2 == 0}.map { x => x * 10}.mkString(" ")) //打印array9-3: 20 40 60 80 100 20 1000 print("array9-4: ") //等价于array9-3 println(array9.filter{_ % 2 == 0}.map { _ * 10}.mkString(" ")) //打印array9-4: 20 40 60 80 100 20 1000 } } package com.dt.scala.moguyun /** * 1,在Scala的集合体系中Iterable是共同的Trait,Iterable要求继承者实现一些共同的方法,例如对元素的遍历等; * 2,Array是一个非常基础的数据结构,不从属于Scala集合的体系; * 3,Scala的集合体系中集合分为可变集合与不可变集合之分;不可变的集合在scala.collection.immutable包中, * 可变的集合在scala.collection.mutable; * 4, List是元素的列表集合,是不可变的: * 第一点:List中head是指第一个元素,tail是指剩下的元素构成的List集合; * 第二点:使用::这个操作符来把List和其它的元素进行组拼来构建新的List * 第三点:如果集合中没有元素的话,此时为Nil,例如说List中只有一个元素,那么head就是这个元素本身, * tail操作就会返回Nil * 5,LinkedList是元素可变的列表 * 6, Set是元素不可重复的集合,且元素是无序的;HashSet中的元素不可变且不可重复且不能够保证顺序; * 7,LinkedHashSet会维护元素的插入顺序; * 8, SortedSet会自动的把插入的元素进行排序; */ object Functional_Iterable { def main(args: Array[String]): Unit = { println("============list============") /* List,Tuple长度固定,元素不可变 */ val range = 1 to 10 val list = List(1, 2, 3, 4, 5) println(list.head) println(list.tail) println(0 :: list) //元素与集合相连是用::,而集合与集合相与是用::: println(list(2)) //list(1) = 10 //报错:List是不可变的,里面元素不可修改 println("============ListBuffer============") /* ListBuffer长度可变,元素可变 */ val listBuffer = scala.collection.mutable.ListBuffer(1,2,3) listBuffer.append(5)//添加一个元素 listBuffer += 8//添加一个元素 listBuffer -= 2//移除第二个元素 println("listBuffer: " + listBuffer) println("============LinkedList============") var linkedList = scala.collection.mutable.LinkedList(1, 2, 3, 4, 5) println(linkedList.elem) //这是取出第一个元素,会改变集合内容 println(linkedList.head) //第一个元素 println(linkedList.tail) while (linkedList != Nil) { println(linkedList.elem) linkedList = linkedList.tail //重新赋值了 } println("linkedList: " + linkedList) //+: 在列表的头部添加一个元素 //:+ 在列表的尾部添加一个元素 //:: 在列表的头部添加一个元素 val copied = linkedList.+:(9)//.+:等价于+: println(copied) println("============Set============") val set = Set(1, 2, 3, 4, 5) println(set) val setMore = set + 10 println("Set: " + setMore) println("============HashSet============") val hashSet = scala.collection.mutable.HashSet(1, 2, 3) hashSet += 5 hashSet += 50 println("hashSet: " + hashSet) println("============LinkedHashSet============") val linkedHashSet = scala.collection.mutable.LinkedHashSet(1, 2, 3) linkedHashSet += 5 linkedHashSet += 50 linkedHashSet += 4 println("linkedHashSet: " + linkedHashSet) println("============SortedSet============") val sortedSet = scala.collection.mutable.SortedSet(1, 2, 3, 5, 50, 4) println("SortedSet" + sortedSet) println(List[String]("I am into Spark so much", "Scala is powerful").flatMap { x => x.split(" ") }.map { x => (x, 1) }.map(x => x._2).reduce(_ + _)) println(List[String]("I am into Spark so much", "Scala is powerful").flatMap { x => x.split(" ") }.map { (_, 1) }.map(_._2).reduce(_ + _)) println(List[String]("I am into Spark so much", "Scala is powerful").flatMap { x => x.split(" ") }.map { (_, 1) }.map(_._2).reduce((x, y) => x + y)) List(0, 1, 2, 3, 4, 5).foreach { x => println(x) } List(0, 1, 2, 3, 4, 5).foreach { println(_) } List(0, 1, 2, 3, 4, 5).foreach { println _ } List(0, 1, 2, 3, 4, 5).foreach { println } List(0, 1, 2, 3, 4, 5).foreach(println) } }