# coding=utf-8
# 导包
import sys
import os
import time

from pyspark import SparkContext, SparkConf

conf = SparkConf().setMaster("spark://node1.itcast.cn:7077").setAppName("testcluster")
# 构建一个SparkContext类的对象,要求必须传递一个SparkConf类的对象
sc = SparkContext(conf=conf)

# 1、创建一个1-10数组的RDD,将所有元素*2形成新的RDD
rdd = sc.parallelize(list(range(1, 11)))
rdd = rdd.map(lambda x: x * 2)
print(rdd.collect())

# 2、创建一个10-20数组的RDD,使用mapPartitions将所有元素*2形成新的RDD .
rdd = sc.parallelize(list(range(10, 21)))
rdd = rdd.mapPartitions(lambda x: [i * 2 for i in x])
print(rdd.collect())

# 3. 创建一个元素为 1-5 的RDD,运用 flatMap创建一个新的 RDD,新的 RDD 为原 RDD 每个元素的 平方和三次方 来组成 1,1,4,8,9,27…
rdd = sc.parallelize(list(range(1, 6)))
rdd = rdd.flatMap(lambda x: [x ** 2, x ** 3])
print(rdd.collect())

# 4、创建一个 4 个分区的 RDD数据为list[10,20,30,40,50,60],使用glom将每个分区的数据放到一个数组。
rdd = sc.parallelize([10, 20, 30, 40, 50, 60], numSlices=4)
rdd = rdd.glom()
print(rdd.collect())

# 5、创建一个 RDD数据为list[1, 3, 4, 20, 4, 5, 8],按照元素的奇偶性进行分组。
rdd = sc.parallelize([1, 3, 4, 20, 4, 5, 8])
rdd = rdd.groupBy(lambda x: 'odd' if x % 2 == 1 else 'even')
rdd = rdd.mapValues(lambda x: list(x))
print(rdd.collect())

# 6、创建一个 RDD(由字符串组成)list["xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong"],
# 过滤出一个新 RDD(包含“xiao”子串)。
rdd = sc.parallelize(["xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong"])
rdd = rdd.filter(lambda x: 'xiao' in x)
print(rdd.collect())

# 7、创建一个 RDD数据为List[10,10,2,5,3,5,3,6,9,1],对 RDD 中元素执行去重操作。
rdd = sc.parallelize([10, 10, 2, 5, 3, 5, 3, 6, 9, 1])
rdd = rdd.distinct()
print(rdd.collect())

# 8、创建两个RDD,分别为rdd1和rdd2数据分别为1 to 6和4 to 10,计算差集,两个都算。
rdd1 = sc.parallelize(range(1, 7))
rdd2 = sc.parallelize(range(4, 11))
rdd1_res = rdd1.subtract(rdd2)
rdd2_res = rdd2.subtract(rdd1)
print(rdd1_res.collect())
print(rdd2_res.collect())

# 9、创建两个RDD,分别为rdd1和rdd2数据分别为1 to 6和4 to 10,计算交集。
rdd1 = sc.parallelize(range(1, 7))
rdd2 = sc.parallelize(range(4, 11))
rdd_res = rdd1.intersection(rdd2)
print(rdd_res.collect())


# 10、用户表(id,name,age,gender):
# 001,刘向前,18,0
# 002,冯 剑,28,1
# 003,李志杰,38,0
# 004,郭 鹏,48,1
# (1)使用累加器accumulator,计算文件包含学生信息个数。
# (2)要求,输出用户信息,gender必须为男或者女,不能为0,1
def acc_count(x):
global acc
acc += 1


acc = sc.accumulator(0)
rdd = sc.parallelize([
("001", "刘向前", 18, 0),
("002", "冯 剑", 28, 1),
("003", "李志杰", 38, 0),
("004", "郭 鹏", 48, 1)
])

rdd_count = rdd.foreach(acc_count)
print(acc)
rdd = rdd.map(lambda x: (x[0], x[1], x[2], '男' if x[3] == 1 else '女'))
print(rdd.collect())

sc.stop()


rdd = sc.parallelize([
("Tom", "DataBase", 80),
("Tom", "Algorithm", 50),
("Tom", "DataStructure", 60),
("Jim", "DataBase", 90),
("Jim", "Algorithm", 60),
("Jim", "DataStructure", 80)
])

# 1、多少名学生
rdd_1 = rdd.map(lambda x: x[0])
stu_count = rdd_1.distinct().count()
print(stu_count)

# 2、多少门课程
rdd_2 = rdd.map(lambda x: x[1])
rdd_2 = rdd_2.distinct().count()
print(rdd_2)

# 3、Tom总成绩平均分
rdd_3 = rdd.filter(lambda x: x[0] == 'Tom')
sum_score = rdd_3.map(lambda x: x[2]).sum()
avg_score = sum_score / rdd_3.count()
print(sum_score, avg_score)

# 4、每名同学的选修的课程数量
rdd_4 = rdd.map(lambda x: (x[0], 1))
rdd_4 = rdd_4.reduceByKey(lambda temp, value: temp + value)
print(rdd_4.collect())

# 5、该系DataBase课程共有多少人选修
rdd_5 = rdd.filter(lambda x: x[1] == 'DataBase')
rdd_5 = rdd_5.map(lambda x: (x[1], 1))
rdd_5 = rdd_5.reduceByKey(lambda temp, value: temp + value)
print(rdd_5.collect())

# 6、各门课程平均分是多少
rdd_6_score = rdd.map(lambda x: (x[1], x[2])).reduceByKey(lambda temp, item: temp + item)
rdd_6_count = rdd.map(lambda x: (x[1], 1)).reduceByKey(lambda temp, item: temp + item)
rdd_6_result = rdd_6_score.join(rdd_6_count).mapValues(lambda x: x[0] / x[1])
print(rdd_6_result.collect())

# 7、使用累加器计算共有多少人选修DataBase这门课
acc = sc.accumulator(0)


def acc_count(x):
global acc
if x[1] == 'DataBase':
acc += 1


rdd.foreach(acc_count)
print(acc)

07-01 23:08