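This is the first time this course touches machine learning; the topic is Predicting Movie Ratings. It is somewhat easier than the previous assignment, which was genuinely hard. The related ipynb file is on my GitHub.

Here we use the Alternating Least Squares method from Spark MLlib to do something more involved than in earlier labs. The dataset for this lab contains roughly 500,000 movie ratings, and the environment comes preconfigured. The dataset can be downloaded here.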

Part 0 Preliminaries

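This part reads the data, converts it into RDDs, and parses each line. The ratings data has the format:

UserID::MovieID::Rating::Timestamp

The movies data has the format:

MovieID::Title::Genres

where Genres has the format:

Genres1|Genres2|Genres3|...

We parse the ratings data into (UserID, MovieID, Rating) and the movies data into (MovieID, Title). The genres are dropped here because this lab stays fairly shallow and does not need that feature.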

import sys
import os
from test_helper import Test

baseDir = os.path.join('data')
inputPath = os.path.join('cs100', 'lab4', 'small')

ratingsFilename = os.path.join(baseDir, inputPath, 'ratings.dat.gz')
moviesFilename = os.path.join(baseDir, inputPath, 'movies.dat')

numPartitions = 2
rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
rawMovies = sc.textFile(moviesFilename)

def get_ratings_tuple(entry):
    """ Parse a line in the ratings dataset
    Args:
        entry (str): a line in the ratings dataset in the form of UserID::MovieID::Rating::Timestamp
    Returns:
        tuple: (UserID, MovieID, Rating)
    """
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])

def get_movie_tuple(entry):
    """ Parse a line in the movies dataset
    Args:
        entry (str): a line in the movies dataset in the form of MovieID::Title::Genres
    Returns:
        tuple: (MovieID, Title)
    """
    items = entry.split('::')
    return int(items[0]), items[1]

ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
moviesRDD = rawMovies.map(get_movie_tuple).cache()

ratingsCount = ratingsRDD.count()
moviesCount = moviesRDD.count()

print 'There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount)
print 'Ratings: %s' % ratingsRDD.take(3)
print 'Movies: %s' % moviesRDD.take(3)

assert ratingsCount == 487650
assert moviesCount == 3883
assert moviesRDD.filter(lambda (id, title): title == 'Toy Story (1995)').count() == 1
assert (ratingsRDD.takeOrdered(1, key=lambda (user, movie, rating): movie)
        == [(1, 1, 5.0)])

The output is:

There are 487650 ratings and 3883 movies in the datasets
Ratings: [(1, 1193, 5.0), (1, 914, 3.0), (1, 2355, 5.0)]
Movies: [(1, u'Toy Story (1995)'), (2, u'Jumanji (1995)'), (3, u'Grumpier Old Men (1995)')]
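The parsing logic can also be checked locally without a Spark context. The sketch below is plain Python; the names parse_rating and parse_movie and the two sample lines are illustrative assumptions written in the documented formats, not part of the lab. Unlike the lab code, it keeps the genres and splits them on '|' just to show how that field is laid out.

```python
def parse_rating(line):
    """Parse 'UserID::MovieID::Rating::Timestamp' into (UserID, MovieID, Rating)."""
    user_id, movie_id, rating, _timestamp = line.split('::')
    return int(user_id), int(movie_id), float(rating)


def parse_movie(line):
    """Parse 'MovieID::Title::Genres' into (MovieID, Title, [genres])."""
    movie_id, title, genres = line.split('::')
    return int(movie_id), title, genres.split('|')


# Hand-written sample lines in the documented formats (illustrative input only).
sample_rating = parse_rating('1::1193::5::978300760')
sample_movie = parse_movie("1::Toy Story (1995)::Animation|Children's|Comedy")
print(sample_rating)
print(sample_movie)
```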

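Later we will do a lot of sorting, usually with sortByKey(). In practice, though, the result can be nondeterministic: when several elements share the same key, their relative order is not guaranteed.

A better approach is to sort on both the key and the value, for example by implementing the function below and passing it to sortBy(). This makes the ordering deterministic.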

def sortFunction(tuple):
    """ Construct the sort string (does not perform actual sorting)
    Args:
        tuple: (rating, MovieName)
    Returns:
        sortString: the value to sort with, 'rating MovieName'
    """
    key = unicode('%.3f' % tuple[0])
    value = tuple[1]
    return (key + ' ' + value)

print oneRDD.sortBy(sortFunction, True).collect()
print twoRDD.sortBy(sortFunction, True).collect()

When we only need part of the sorted result, we can use takeOrdered().

oneSorted1 = oneRDD.takeOrdered(oneRDD.count(),key=sortFunction)
twoSorted1 = twoRDD.takeOrdered(twoRDD.count(),key=sortFunction)
print 'one is %s' % oneSorted1
print 'two is %s' % twoSorted1
assert oneSorted1 == twoSorted1
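To see why ties matter, here is a small analogy in plain Python rather than Spark. The lists one and two and the helper sort_key are made up for illustration. In Python the tie order depends on the input order (the sort is stable); in Spark it depends on how the data is partitioned, but the fix is the same: include the value in the sort key.

```python
def sort_key(pair):
    """Build the composite sort string 'rating name' for a (rating, name) pair."""
    return '%.3f %s' % (pair[0], pair[1])


# The same (rating, name) pairs, supplied in two different orders.
one = [(1, 'alpha'), (1, 'beta'), (2, 'alpha')]
two = [(1, 'beta'), (1, 'alpha'), (2, 'alpha')]

# Sorting on the rating alone leaves tied elements in their input order.
by_rating_one = sorted(one, key=lambda pair: pair[0])
by_rating_two = sorted(two, key=lambda pair: pair[0])

# Sorting on rating and name together gives one order regardless of input order.
by_both_one = sorted(one, key=sort_key)
by_both_two = sorted(two, key=sort_key)

print(by_rating_one == by_rating_two)  # False
print(by_both_one == by_both_two)      # True
```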

Part 1 Basic Recommendations

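A simple and effective way to recommend movies is to recommend the ones with the highest average rating. That is what we implement in this part.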

Number of Ratings and Average Ratings for a Movie

Here we implement a function that takes (MovieID, (Rating1, Rating2, Rating3, ...)) and returns (MovieID, (number of ratings, averageRating)).

# TODO: Replace <FILL IN> with appropriate code

# First, implement a helper function `getCountsAndAverages` using only Python
def getCountsAndAverages(IDandRatingsTuple):
    """ Calculate average rating
    Args:
        IDandRatingsTuple: a single tuple of (MovieID, (Rating1, Rating2, Rating3, ...))
    Returns:
        tuple: a tuple of (MovieID, (number of ratings, averageRating))
    """
    Mid = IDandRatingsTuple[0]
    ratingTuple = IDandRatingsTuple[1]
    newTuple = (len(ratingTuple), float(sum(ratingTuple))/len(ratingTuple))
    retuple = (Mid, newTuple)
    return retuple
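Since the helper is pure Python, it is easy to try on a few hand-made tuples before plugging it into an RDD. The variant below, get_counts_and_averages, is a sketch with an assumed snake_case name; it first copies the ratings into a list so that it also works when the ratings arrive as a one-shot iterable.

```python
def get_counts_and_averages(id_and_ratings):
    """Return (MovieID, (number of ratings, average rating))."""
    movie_id, ratings = id_and_ratings
    ratings = list(ratings)  # materialize once so len() and sum() both work
    return movie_id, (len(ratings), float(sum(ratings)) / len(ratings))


print(get_counts_and_averages((1, (1, 2, 3, 4))))           # (1, (4, 2.5))
print(get_counts_and_averages((100, (10.0, 20.0, 30.0))))   # (100, (3, 20.0))
print(get_counts_and_averages((7, iter([5, 4]))))           # (7, (2, 4.5))
```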

Movies with Highest Average Ratings

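This takes three steps: extract (MovieID, Rating) pairs from the ratings data; compute the average rating of each movie; join the result with the movies data to obtain (average rating, movie name, number of ratings).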

# TODO: Replace <FILL IN> with appropriate code

# From ratingsRDD with tuples of (UserID, MovieID, Rating) create an RDD with tuples of
# the (MovieID, iterable of Ratings for that MovieID)
movieIDsWithRatingsRDD = ratingsRDD.map(lambda tuple: (tuple[1], tuple[2])).groupByKey()
print 'movieIDsWithRatingsRDD: %s\n' % movieIDsWithRatingsRDD.take(3)

# Using `movieIDsWithRatingsRDD`, compute the number of ratings and average rating for each movie to
# yield tuples of the form (MovieID, (number of ratings, average rating))
movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(getCountsAndAverages)
print 'movieIDsWithAvgRatingsRDD: %s\n' % movieIDsWithAvgRatingsRDD.take(3)

# To `movieIDsWithAvgRatingsRDD`, apply RDD transformations that use `moviesRDD` to get the movie
# names for `movieIDsWithAvgRatingsRDD`, yielding tuples of the form
# (average rating, movie name, number of ratings)
movieNameWithAvgRatingsRDD = (moviesRDD
                              .join(movieIDsWithAvgRatingsRDD)
                              .map(lambda x: (x[1][1][1], x[1][0], x[1][1][0])))
print 'movieNameWithAvgRatingsRDD: %s\n' % movieNameWithAvgRatingsRDD.take(3)

Movies with Highest Average Ratings and more than 500 reviews

Here we pick the 20 highest-rated movies among those with more than 500 ratings.

# TODO: Replace <FILL IN> with appropriate code

# Apply an RDD transformation to `movieNameWithAvgRatingsRDD` to limit the results to movies with
# ratings from more than 500 people. We then use the `sortFunction()` helper function to sort by the
# average rating to get the movies in order of their rating (highest rating first)
movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
                                    .filter(lambda x: x[2] > 500)
                                    .sortBy(sortFunction, False))
print 'Movies with highest ratings: %s' % movieLimitedAndSortedByRatingRDD.take(20)
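The whole Part 1 pipeline (group by movie, average, join with titles, filter by count, sort) can be mirrored on toy data in plain Python. Everything in this sketch is made up for illustration: the ratings, the titles, and the threshold min_ratings, which is lowered from the lab's 500 so the toy data survives the filter.

```python
from collections import defaultdict

# Made-up toy data in the same shapes as ratingsRDD and moviesRDD.
ratings = [(1, 10, 5.0), (2, 10, 4.0), (3, 10, 3.0),
           (1, 20, 2.0), (2, 20, 4.0), (1, 30, 5.0)]
movies = {10: 'Movie A', 20: 'Movie B', 30: 'Movie C'}
min_ratings = 1  # the lab uses 500

# Step 1: group ratings by MovieID (the groupByKey step).
ratings_by_movie = defaultdict(list)
for _user_id, movie_id, rating in ratings:
    ratings_by_movie[movie_id].append(rating)

# Steps 2 and 3: average per movie, then attach the title (the map and join steps).
name_with_avg = [
    (sum(values) / len(values), movies[movie_id], len(values))
    for movie_id, values in ratings_by_movie.items()
]

# Keep movies with more than min_ratings ratings, highest average first,
# breaking ties on the title so the order is deterministic.
top = sorted(
    (row for row in name_with_avg if row[2] > min_ratings),
    key=lambda row: (-row[0], row[1]),
)
print(top)  # [(4.0, 'Movie A', 3), (3.0, 'Movie B', 2)]
```

Movie C has the highest average but only one rating, so it is filtered out; this is exactly why the lab requires a minimum number of ratings.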

Part 2 Collaborative Filtering

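Now we start using Spark MLlib, with a technique called collaborative filtering. The main idea of collaborative filtering is to make recommendations for you by finding other users who are similar to you. For movie recommendation, we start from a user-by-movie matrix: each row is a user, each column is a movie, and each entry is that user's rating of that movie. Because this matrix is very sparse in practice, the usual approach is to factor it into two matrices, one describing latent properties of users and one describing latent properties of movies.

How do we find these two factor matrices? We minimize a squared-error loss, using ALS (Alternating Least Squares). ALS fills both matrices with random values, evaluates the loss, and then updates iteratively. "Alternating" means that each update holds one matrix fixed and solves for the other, and the two roles are then swapped.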
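To make the alternation concrete, here is a minimal sketch of ALS for the special case of rank 1, in plain Python. It is not MLlib's implementation: the names loss, update, als_rank1 and toy_ratings are made up, the regularization is a plain squared penalty chosen for simplicity, and rank 1 is used only because each update then has a one-line closed form. With the movie factors v fixed, the best user factor is u_i = sum_j(r_ij * v_j) / (lam + sum_j(v_j^2)), summed over the movies that user i rated; the movie update is symmetric.

```python
import random


def loss(ratings, u, v, lam):
    """Regularized squared error over the observed (user, movie, rating) entries."""
    squared = sum((r - u[i] * v[j]) ** 2 for i, j, r in ratings)
    penalty = lam * (sum(x * x for x in u.values()) + sum(x * x for x in v.values()))
    return squared + penalty


def update(ratings, fixed, lam, own_index, other_index):
    """Solve for one side's factors in closed form while the other side is held fixed."""
    numerator, denominator = {}, {}
    for entry in ratings:
        own, other, r = entry[own_index], entry[other_index], entry[2]
        numerator[own] = numerator.get(own, 0.0) + r * fixed[other]
        denominator[own] = denominator.get(own, 0.0) + fixed[other] ** 2
    return {key: numerator[key] / (lam + denominator[key]) for key in numerator}


def als_rank1(ratings, lam=0.1, iterations=5, seed=0):
    rng = random.Random(seed)
    # Start from random factors for every user and movie that appears in the data.
    u = {i: rng.random() for i in sorted({i for i, _, _ in ratings})}
    v = {j: rng.random() for j in sorted({j for _, j, _ in ratings})}
    history = [loss(ratings, u, v, lam)]
    for _ in range(iterations):
        u = update(ratings, v, lam, 0, 1)  # movies fixed, solve for users
        v = update(ratings, u, lam, 1, 0)  # users fixed, solve for movies
        history.append(loss(ratings, u, v, lam))
    return u, v, history


# Made-up toy ratings: (user, movie, rating).
toy_ratings = [(1, 10, 5.0), (1, 20, 3.0), (2, 10, 4.0),
               (2, 30, 1.0), (3, 20, 2.0), (3, 30, 1.0)]
u, v, history = als_rank1(toy_ratings)
print(history)  # the loss should not increase from one iteration to the next
```

Because each half-step exactly minimizes the loss over one side, the recorded loss can only go down or stay flat, which is a handy sanity check for any ALS implementation.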

Creating a Training Set

Before starting on machine learning, we split the data into three parts:

  • A training set (RDD), which we will use to train models
  • A validation set (RDD), which we will use to choose the best model
  • A test set (RDD), which we will use for our experiments

We can use randomSplit() to split the data randomly.

trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0L)

print 'Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                    validationRDD.count(),
                                                    testRDD.count())
print trainingRDD.take(3)
print validationRDD.take(3)
print testRDD.take(3)

assert trainingRDD.count() == 292716
assert validationRDD.count() == 96902
assert testRDD.count() == 98032

assert trainingRDD.filter(lambda t: t == (1, 914, 3.0)).count() == 1
assert trainingRDD.filter(lambda t: t == (1, 2355, 5.0)).count() == 1
assert trainingRDD.filter(lambda t: t == (1, 595, 5.0)).count() == 1

assert validationRDD.filter(lambda t: t == (1, 1287, 5.0)).count() == 1
assert validationRDD.filter(lambda t: t == (1, 594, 4.0)).count() == 1
assert validationRDD.filter(lambda t: t == (1, 1270, 5.0)).count() == 1

assert testRDD.filter(lambda t: t == (1, 1193, 5.0)).count() == 1
assert testRDD.filter(lambda t: t == (1, 2398, 4.0)).count() == 1
assert testRDD.filter(lambda t: t == (1, 1035, 5.0)).count() == 1
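Note that the counts asserted above are close to, but not exactly, 60/20/20 of 487650: the weights are normalized into probabilities and each element is assigned at random. The sketch below illustrates that idea in plain Python. The function random_split is a made-up stand-in, not Spark's implementation, and it will not reproduce Spark's exact split for the same seed.

```python
import random


def random_split(items, weights, seed):
    """Assign each item to one split with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative boundaries in [0, 1], e.g. [6, 2, 2] -> [0.6, 0.8, 1.0].
    bounds, running = [], 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for item in items:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            # The last split also catches any rounding slack in the boundaries.
            if draw < bound or index == len(bounds) - 1:
                splits[index].append(item)
                break
    return splits


training, validation, test = random_split(range(1000), [6, 2, 2], seed=0)
print('%d %d %d' % (len(training), len(validation), len(test)))
```

Fixing the seed makes the split repeatable, which is what lets the lab assert on exact counts and on specific rows landing in specific splits.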

Root Mean Square Error (RMSE)

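RMSE is an important measure of how good a model is. Spark 1.4 provides a RegressionMetrics module that computes RMSE, but our environment is Spark 1.3, so we have to write the function ourselves.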

# TODO: Replace <FILL IN> with appropriate code
import math

def computeError(predictedRDD, actualRDD):
    """ Compute the root mean squared error between predicted and actual
    Args:
        predictedRDD: predicted ratings for each movie and each user where each entry is in the form
                      (UserID, MovieID, Rating)
        actualRDD: actual ratings where each entry is in the form (UserID, MovieID, Rating)
    Returns:
        RMSE (float): computed RMSE value
    """
    # Transform predictedRDD into the tuples of the form ((UserID, MovieID), Rating)
    predictedReformattedRDD = predictedRDD.map(lambda x: ((x[0], x[1]), x[2]))

    # Transform actualRDD into the tuples of the form ((UserID, MovieID), Rating)
    actualReformattedRDD = actualRDD.map(lambda x: ((x[0], x[1]), x[2]))

    # Compute the squared error for each matching entry (i.e., the same (User ID, Movie ID) in each
    # RDD) in the reformatted RDDs using RDD transformations - do not use collect()
    squaredErrorsRDD = (predictedReformattedRDD
                        .join(actualReformattedRDD)
                        .map(lambda x: (x[1][0] - x[1][1]) ** 2))

    # Compute the total squared error - do not use collect()
    totalError = squaredErrorsRDD.reduce(lambda a, b: a + b)

    # Count the number of entries for which you computed the total squared error
    numRatings = squaredErrorsRDD.count()

    # Using the total squared error and the number of entries, compute the RMSE
    return math.sqrt(float(totalError) / numRatings)

# sc.parallelize turns a Python list into a Spark RDD.
testPredicted = sc.parallelize([
    (1, 1, 5),
    (1, 2, 3),
    (1, 3, 4),
    (2, 1, 3),
    (2, 2, 2),
    (2, 3, 4)])
testActual = sc.parallelize([
    (1, 2, 3),
    (1, 3, 5),
    (2, 1, 5),
    (2, 2, 1)])
testPredicted2 = sc.parallelize([
    (2, 2, 5),
    (1, 2, 5)])
testError = computeError(testPredicted, testActual)
print 'Error for test dataset (should be 1.22474487139): %s' % testError

testError2 = computeError(testPredicted2, testActual)
print 'Error for test dataset2 (should be 3.16227766017): %s' % testError2

testError3 = computeError(testActual, testActual)
print 'Error for testActual dataset (should be 0.0): %s' % testError3
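The expected values in the prints above can be verified by hand. The plain-Python sketch below uses a made-up helper rmse that mirrors the join-then-average logic with a dictionary, and runs it on the same three toy datasets. Only (user, movie) pairs present in both lists contribute, just as with the RDD join.

```python
import math


def rmse(predicted, actual):
    """RMSE over the (user, movie) pairs that appear in both lists."""
    actual_by_key = {(user, movie): rating for user, movie, rating in actual}
    squared_errors = [
        (rating - actual_by_key[(user, movie)]) ** 2
        for user, movie, rating in predicted
        if (user, movie) in actual_by_key
    ]
    return math.sqrt(sum(squared_errors) / float(len(squared_errors)))


test_predicted = [(1, 1, 5), (1, 2, 3), (1, 3, 4), (2, 1, 3), (2, 2, 2), (2, 3, 4)]
test_actual = [(1, 2, 3), (1, 3, 5), (2, 1, 5), (2, 2, 1)]
test_predicted2 = [(2, 2, 5), (1, 2, 5)]

# Four pairs match, with squared errors 0, 1, 4, 1: sqrt(6 / 4), about 1.2247.
print(rmse(test_predicted, test_actual))
# Two pairs match, with squared errors 16 and 4: sqrt(20 / 2), about 3.1623.
print(rmse(test_predicted2, test_actual))
# Identical inputs give 0.0.
print(rmse(test_actual, test_actual))
```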

Using ALS.train()

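In this part we use MLlib's ALS.train(). We train several models with it and pick the best one. The main steps are:

First we choose the model parameters. The most important one is rank, the number of latent factors, i.e. the dimension shared by the user factor matrix and the movie factor matrix. Generally a rank that is too small underfits and a rank that is too large overfits. Here we try 4, 8 and 12. With the rank fixed, the call is ALS.train(trainingRDD, rank, seed=seed, iterations=iterations, lambda_=regularizationParameter); for the other parameters we use iterations = 5 and regularizationParameter = 0.1.

To predict ratings we use model.predictAll(). Finally, the model that performs best on the validation data becomes the final model. Training three models and scoring each one on the validation set launches several Spark jobs, so this step takes a while to run.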

# TODO: Replace <FILL IN> with appropriate code
from pyspark.mllib.recommendation import ALS

validationForPredictRDD = validationRDD.map(lambda x: (x[0], x[1]))

seed = 5L
iterations = 5
regularizationParameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.03

minError = float('inf')
bestRank = -1
bestIteration = -1
for rank in ranks:
    model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter)
    predictedRatingsRDD = model.predictAll(validationForPredictRDD)
    error = computeError(predictedRatingsRDD, validationRDD)
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < minError:
        minError = error
        bestRank = rank

print 'The best model was trained with rank %s' % bestRank
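The loop above is an instance of a general pattern: train one model per candidate setting, score each on the validation set, keep the best. A minimal Spark-free sketch of that pattern follows. The helper select_best, the constant "model", and the validation ratings are all made up for illustration; with Spark, train would wrap ALS.train() and evaluate would wrap predictAll() plus computeError().

```python
import math


def select_best(candidates, train, evaluate):
    """Train one model per candidate; return the lowest-error candidate and all errors."""
    errors = {}
    for candidate in candidates:
        model = train(candidate)
        errors[candidate] = evaluate(model)
    best = min(candidates, key=lambda candidate: errors[candidate])
    return best, errors


# Made-up validation ratings; the "model" here is just a constant prediction.
validation_ratings = [4.0, 3.0, 5.0, 4.0]


def train_constant(value):
    """Stand-in for training: the model simply is the candidate value."""
    return value


def validation_rmse(model):
    """RMSE of predicting the constant `model` for every validation rating."""
    squared = sum((model - rating) ** 2 for rating in validation_ratings)
    return math.sqrt(squared / len(validation_ratings))


best, errors = select_best([2.0, 3.0, 4.0], train_constant, validation_rmse)
print('best candidate: %s' % best)  # best candidate: 4.0
```

Keeping the errors in a dictionary keyed by candidate avoids the separate err counter and makes it easy to inspect every score afterwards.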

This code trains the models and is the key step.

Testing Your Model

Here we compute the RMSE of the model selected above on the test data.

# TODO: Replace <FILL IN> with appropriate code
# Retrain with the rank that did best on the validation set
myModel = ALS.train(trainingRDD, bestRank, seed=seed, iterations=iterations,
                    lambda_=regularizationParameter)
testForPredictingRDD = testRDD.map(lambda x: (x[0], x[1]))
predictedTestRDD = myModel.predictAll(testForPredictingRDD)

testRMSE = computeError(testRDD, predictedTestRDD)

print 'The model had a RMSE on the test set of %s' % testRMSE

Comparing Your Model

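We can judge a model by its RMSE on the test set. For comparison, we can also compute the average rating over the training set, predict that average for every entry in the test set, and compute the RMSE of that baseline against the test set.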

# TODO: Replace <FILL IN> with appropriate code

trainingAvgRating = trainingRDD.map(lambda x: x[2]).mean()
print 'The average rating for movies in the training set is %s' % trainingAvgRating

testForAvgRDD = testRDD.map(lambda x: (x[0], x[1], trainingAvgRating))
testAvgRMSE = computeError(testRDD, testForAvgRDD)
print 'The RMSE on the average set is %s' % testAvgRMSE
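The baseline is simple enough to work through by hand. In the plain-Python sketch below, the toy splits are made up; the point is only that a trained model is worth keeping if its test RMSE is lower than this number.

```python
import math

# Made-up toy splits in the (UserID, MovieID, Rating) shape used above.
training = [(1, 10, 5.0), (1, 20, 3.0), (2, 10, 4.0), (2, 30, 2.0)]
test = [(3, 10, 4.0), (3, 20, 3.0), (3, 30, 5.0)]

# The baseline ignores user and movie and always predicts the training mean.
training_avg = sum(rating for _, _, rating in training) / float(len(training))

# Squared errors against the test ratings: 0.25, 0.25 and 2.25.
squared_errors = [(rating - training_avg) ** 2 for _, _, rating in test]
baseline_rmse = math.sqrt(sum(squared_errors) / float(len(test)))

print(training_avg)   # 3.5
print(baseline_rmse)  # sqrt(2.75 / 3), about 0.957
```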

Part 3 Predictions for Yourself

Your Movie Ratings

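Our ultimate goal is to recommend movies. Suppose we want recommendations for the user with user ID 0; below we make up ratings for 10 movies for this user.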

# TODO: Replace <FILL IN> with appropriate code
myUserID = 0

# Note that the movie IDs are the *last* number on each line. A common error was to use the number of ratings as the movie ID.
myRatedMovies = [
    (0, 1, 4), (0, 2, 3), (0, 3, 2), (0, 4, 5), (0, 5, 3),
    (0, 6, 2), (0, 7, 2), (0, 8, 3), (0, 9, 3), (0, 10, 3)
    # The format of each line is (myUserID, movie ID, your rating)
    # For example, to give the movie "Star Wars: Episode IV - A New Hope (1977)" a five rating, you would add the following line:
    #   (myUserID, 260, 5),
    ]
myRatingsRDD = sc.parallelize(myRatedMovies)
print 'My movie ratings: %s' % myRatingsRDD.take(10)

Add Your Movies to Training Dataset

Now we merge this user's ratings into the training set.

# TODO: Replace <FILL IN> with appropriate code
trainingWithMyRatingsRDD = myRatingsRDD.union(trainingRDD)

print ('The training dataset now has %s more entries than the original training dataset' %
       (trainingWithMyRatingsRDD.count() - trainingRDD.count()))
assert (trainingWithMyRatingsRDD.count() - trainingRDD.count()) == myRatingsRDD.count()

Train a Model with Your Ratings

# TODO: Replace <FILL IN> with appropriate code
myRatingsModel = ALS.train(trainingWithMyRatingsRDD, bestRank, seed=seed,iterations=iterations,lambda_=regularizationParameter)

Check RMSE for the New Model with Your Ratings

# TODO: Replace <FILL IN> with appropriate code
predictedTestMyRatingsRDD = myRatingsModel.predictAll(testForPredictingRDD)
testRMSEMyRatings = computeError(testRDD,predictedTestMyRatingsRDD)
print 'The model had a RMSE on the test set of %s' % testRMSEMyRatings

Predict Your Ratings

# TODO: Replace <FILL IN> with appropriate code

# Use the Python list myRatedMovies to transform the moviesRDD into an RDD with entries that are pairs of the form (myUserID, Movie ID) and that does not contain any movies that you have rated.
myUnratedMoviesRDD = (moviesRDD
                      .map(lambda (x, y): (myUserID, x))
                      .filter(lambda x: x[1] not in [i[1] for i in myRatedMovies]))

# Use the input RDD, myUnratedMoviesRDD, with myRatingsModel.predictAll() to predict your ratings for the movies
predictedRatingsRDD = myRatingsModel.predictAll(myUnratedMoviesRDD)
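One small design note on the filter above: the list comprehension inside the lambda is rebuilt for every movie. Building a set of rated IDs once gives the same result with constant-time membership checks. The plain-Python sketch below shows the idea on a few movies; the fourth title is a placeholder, and the snake_case names are made up.

```python
my_user_id = 0
my_rated_movies = [(0, 1, 4), (0, 2, 3), (0, 3, 2)]
movies = [
    (1, 'Toy Story (1995)'),
    (2, 'Jumanji (1995)'),
    (3, 'Grumpier Old Men (1995)'),
    (4, 'Placeholder Movie'),  # hypothetical title, for illustration only
]

# Build the set of rated movie IDs once, outside the per-movie check.
rated_ids = set(movie_id for _, movie_id, _ in my_rated_movies)

# Keep (user ID, movie ID) pairs only for movies the user has not rated.
my_unrated = [
    (my_user_id, movie_id)
    for movie_id, _title in movies
    if movie_id not in rated_ids
]
print(my_unrated)  # [(0, 4)]
```

In the Spark version the same set can simply be referenced from the lambda passed to filter().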

Predict Your Ratings

# TODO: Replace <FILL IN> with appropriate code

# Transform movieIDsWithAvgRatingsRDD from part (1b), which has the form (MovieID, (number of ratings, average rating)), into an RDD of the form (MovieID, number of ratings)
movieCountsRDD = movieIDsWithAvgRatingsRDD.map(lambda x: (x[0], x[1][0]))

# Transform predictedRatingsRDD into an RDD with entries that are pairs of the form (Movie ID, Predicted Rating)
predictedRDD = predictedRatingsRDD.map(lambda x: (x[1], x[2]))

# Use RDD transformations with predictedRDD and movieCountsRDD to yield an RDD with tuples of the form (Movie ID, (Predicted Rating, number of ratings))
predictedWithCountsRDD = (predictedRDD
                          .join(movieCountsRDD))

# Use RDD transformations with predictedWithCountsRDD and moviesRDD to yield an RDD with tuples of the form (Predicted Rating, Movie Name, number of ratings), for movies with more than 75 ratings
ratingsWithNamesRDD = (predictedWithCountsRDD
                       .join(moviesRDD)
                       .map(lambda x: (x[1][0][0], x[1][1], x[1][0][1]))
                       .filter(lambda x: x[2] > 75))

predictedHighestRatedMovies = ratingsWithNamesRDD.takeOrdered(20, key=lambda x: -x[0])
print ('My highest rated movies as predicted (for movies with more than 75 reviews):\n%s' %
       '\n'.join(map(str, predictedHighestRatedMovies)))
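The last step, filtering by rating count and taking the top N by predicted rating, has a direct plain-Python analogue in heapq.nlargest, which like takeOrdered() avoids fully sorting the data. The tuples below are made up, and min_count and top_n are illustrative stand-ins for the lab's 75 and 20.

```python
import heapq

# Made-up (predicted rating, movie name, number of ratings) tuples.
ratings_with_names = [
    (4.8, 'Movie A', 40),
    (4.5, 'Movie B', 120),
    (3.9, 'Movie C', 300),
    (4.7, 'Movie D', 90),
    (4.1, 'Movie E', 76),
]
min_count = 75
top_n = 2

# Drop movies with too few ratings, then take the top_n by predicted rating.
popular = [row for row in ratings_with_names if row[2] > min_count]
highest = heapq.nlargest(top_n, popular, key=lambda row: row[0])

print('\n'.join(map(str, highest)))
```

Movie A has the highest predicted rating but only 40 ratings, so it never reaches the ranking step.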
05-06 23:55