使用Spark DataFrame实现基于物品的协同过滤算法(ItemCF)

使用Spark DataFrame实现基于物品的协同过滤算法(ItemCF)

简介

当前spark支持的协同过滤算法只有ALS(基于模型的协同过滤算法),但ALS算法对于某些特定的问题,效果并不理想,不像mahout提供了各种推荐算法。为了享受到spark在速度上带来的提升同时为满足一些业务需求,遂使用spark构建ItemCF算法。同时spark提供了新的DataFrame数据类型,使算法开发更加清晰和易于实现,

前提

常用相似度计算公式

协同过滤算法中最重要的部分是要计算物品间的相似度,对于不同的场景,可以应用不同的相似度计算公式来计算相似度,常用的相似度计算公式如下所示:

同现相似度(Co Occurrence)

同现相似度公式

$$ w(x,y)=\frac{|N(x)\cap{N(y)}|}{|N(x)|} $$

公式中分母是喜欢物品x的用户数,而分子则是同时对物品x和物品y感兴趣的用户数。因此,上述公式可用理解为对物品x感兴趣的用户有多大概率也对y感兴趣 (和关联规则类似)

但上述的公式存在一个问题,如果物品y是热门物品,有很多人都喜欢,则会导致W(x, y)很大,接近于1。因此会造成任何物品都和热门物品交有很大的相似度。为此我们用如下公式进行修正:

改进的同现相似度公式

$$ w(x,y)=\frac{|N(x)\cap{N(y)}|}{\sqrt{|N(x)||N(y)|}} $$

这个格式惩罚了物品y的权重,因此减轻了热门物品和很多物品相似的可能性。(也归一化了)

欧几里得相似度(Eucledian Similarity)

欧几里得相似度根据欧几里得距离计算而来,距离越近相似度越高,反之相反。

欧几里得距离定义

在数学中,欧几里得距离或欧几里得度量是欧几里得空间中两点间“普通”(即直线)距离。使用这个距离,欧氏空间成为度量空间。相关联的范数称为欧几里得范数。较早的文献称之为毕达哥拉斯度量。

欧几里得距离公式

$$ \ d_{X,Y}=\sqrt{ \sum_{i=1}^n(x_i-y_i)^2} $$

皮尔逊相似度

皮尔逊相关系数,即概率论中的相关系数,取值范围[-1,+1]。当大于零时,两个变量正相关,当小于零时表示两个向量负相关。

皮尔逊积矩相关系数定义

两个变量之间的皮尔逊相关系数定义为两个变量之间的协方差和标准差的商:

皮尔逊积矩相关系数公式

$$ \rho_{X,Y}=\frac{cov(X,Y)}{\sigma_{x}\sigma_{y}}=\frac{E((X-\mu_x)(Y-\mu_y))}{\sigma_{x}\sigma_{y}}=\frac{E(XY)-E(X)E(Y)}{\sqrt{E(X^2)-E^2(X)}\sqrt{E(Y^2)-E^2(Y)}} $$

余弦相似度(Cosine Similarity)

利用多维空间两点与所设定的点形成夹角的余弦值范围为[-1,1],值越大,说明夹角越大,两点相距就越远,相似度就越小。

向量间余弦定义

多维空间两点与所设定的点形成夹角的余弦值

余弦计算公式

$$ sim_{X,Y}=\frac{XY}{||X||||Y||}=\frac{ \sum_{i=1}^n(x_iy_i)}{\sqrt{\sum_{i=1}^n(x_i)^2}*\sqrt{\sum_{i=1}^n(y_i)^2}} $$

公式中$ x_i $表示第i个用户对物品x的评分,$ y_i $同理。
该公式只考虑到了用户的评分,很可能评分较高的物品会排在前面而不管物品的其它信息,改进版的余弦相似度计算公式如下:

改进的余弦相似度计算公式

$$ sim_{X,Y}=\frac{XYnum_{X\cap{Y}}}{||X||||Y||num_{X}log10(10+num_{Y})} $$

改进公式考虑到了两个向量相同个体个数、X向量大小、Y向量大小,注意:
$$ \ sim_{X,Y}\neq sim_{Y,X} $$

Tanimoto 相似度(Jaccard 系数)

Tanimoto相似度也称为Jaccard系数,是Cosine相似度扩展,多用于文档相似度就算。此相似度不考虑评价值,只考虑两个集合共同个体数量。

Jaccard 系数公式

$$ sim(x,y)=\frac{X\cap{Y}}{||X||+||Y||-||X\cap{Y}||} $$

预测用户评分公式

$$ pred_{u,p}=\frac{\sum_{i\in{ratedItems(u)}}{sim(i,p)r_{u,i}}}{\sum_{i\in{ratedItems(u)}}{sim(i,p)}} $$

公式中u指用户,p值物品,ratedItems(u)指用户u评价过的物品,sim指相似度(item之间的),r指用户对物品评分。

构建ItemCFModel

类定义

1
2
3
4
5
6
7
8
//  物品信息
case class Item(itemId: Int, itemName: String)

// 用户-物品-评分
case class Rating(userId: Int, itemId: Int, rating: Float)

// 用户信息
case class User(userId: Int, userName: String)

相似度度量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* SIMILARITY MEASURES
*/
object SimilarityMeasures {


/**
* The Co-occurrence similarity between two vectors A, B is
* |N(i) ∩ N(j)| / sqrt(|N(i)||N(j)|)
*/
def cooccurrence(numOfRatersForAAndB: Long, numOfRatersForA: Long, numOfRatersForB: Long): Double = {
numOfRatersForAAndB / math.sqrt(numOfRatersForA * numOfRatersForB)
}

/**
* The correlation between two vectors A, B is
* cov(A, B) / (stdDev(A) * stdDev(B))
*
* This is equivalent to
* [n * dotProduct(A, B) - sum(A) * sum(B)] /
* sqrt{ [n * norm(A)^2 - sum(A)^2] [n * norm(B)^2 - sum(B)^2] }
*/
def correlation(size: Double, dotProduct: Double, ratingSum: Double,
rating2Sum: Double, ratingNormSq: Double, rating2NormSq: Double): Double = {

val numerator = size * dotProduct - ratingSum * rating2Sum
val denominator = scala.math.sqrt(size * ratingNormSq - ratingSum * ratingSum) *
scala.math.sqrt(size * rating2NormSq - rating2Sum * rating2Sum)

numerator / denominator
}

/**
* Regularize correlation by adding virtual pseudocounts over a prior:
* RegularizedCorrelation = w * ActualCorrelation + (1 - w) * PriorCorrelation
* where w = # actualPairs / (# actualPairs + # virtualPairs).
*/
def regularizedCorrelation(size: Double, dotProduct: Double, ratingSum: Double,
rating2Sum: Double, ratingNormSq: Double, rating2NormSq: Double,
virtualCount: Double, priorCorrelation: Double): Double = {

val unregularizedCorrelation = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq)
val w = size / (size + virtualCount)

w * unregularizedCorrelation + (1 - w) * priorCorrelation
}

/**
* The cosine similarity between two vectors A, B is
* dotProduct(A, B) / (norm(A) * norm(B))
*/
def cosineSimilarity(dotProduct: Double, ratingNorm: Double, rating2Norm: Double): Double = {
dotProduct / (ratingNorm * rating2Norm)
}

/**
* The improved cosine similarity between two vectors A, B is
* dotProduct(A, B) * num(A ∩ B) / (norm(A) * norm(B) * num(A) * log10(10 + num(B)))
*/
def improvedCosineSimilarity(dotProduct: Double, ratingNorm: Double, rating2Norm: Double
, numAjoinB: Long, numA: Long, numB: Long): Double = {
dotProduct * numAjoinB / (ratingNorm * rating2Norm * numA * math.log10(10 + numB))
}

/**
* The Jaccard Similarity between two sets A, B is
* |Intersection(A, B)| / |Union(A, B)|
*/
def jaccardSimilarity(usersInCommon: Double, totalUsers1: Double, totalUsers2: Double): Double = {
val union = totalUsers1 + totalUsers2 - usersInCommon
usersInCommon / union
}
}

计算物品相似度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def fit(ratings: Dataset[Rating]): ItemCFModel = {
this.ratings = Option(ratings)
val numRatersPerItem = ratings.groupBy("itemId").count().alias("nor")
.coalesce(defaultParallelism)

// 在原记录基础上加上item的打分者的数量
val ratingsWithSize = ratings.join(numRatersPerItem, "itemId")
.coalesce(defaultParallelism)

// 执行内联操作
ratingsWithSize.join(ratingsWithSize, "userId")
.toDF("userId", "item1", "rating1", "nor1", "item2", "rating2", "nor2")
.selectExpr("userId"
, "item1", "rating1", "nor1"
, "item2", "rating2", "nor2"
, "rating1 * rating2 as product"
, "pow(rating1, 2) as rating1Pow"
, "pow(rating2, 2) as rating2Pow")
.coalesce(defaultParallelism)
.createOrReplaceTempView("joined")


// 计算必要的中间数据,注意此处有WHERE限定,只计算了一半的数据量
val sparseMatrix = spark.sql(
"""
|SELECT item1
|, item2
|, count(userId) as size
|, sum(product) as dotProduct
|, sum(rating1) as ratingSum1
|, sum(rating2) as ratingSum2
|, sum(rating1Pow) as ratingSumOfSq1
|, sum(rating2Pow) as ratingSumOfSq2
|, first(nor1) as nor1
|, first(nor2) as nor2
|FROM joined
|WHERE item1 < item2
|GROUP BY item1, item2
""".stripMargin)
.coalesce(defaultParallelism)
.cache()

// 计算物品相似度
var sim = sparseMatrix.map(row => {
val size = row.getAs[Long](2)
val dotProduct = row.getAs[Double](3)
val ratingSum1 = row.getAs[Double](4)
val ratingSum2 = row.getAs[Double](5)
val ratingSumOfSq1 = row.getAs[Double](6)
val ratingSumOfSq2 = row.getAs[Double](7)
val numRaters1 = row.getAs[Long](8)
val numRaters2 = row.getAs[Long](9)

val cooc = cooccurrence(size, numRaters1, numRaters2)
val corr = correlation(size, dotProduct, ratingSum1, ratingSum2, ratingSumOfSq1, ratingSumOfSq2)
val regCorr = regularizedCorrelation(size, dotProduct, ratingSum1, ratingSum2,
ratingSumOfSq1, ratingSumOfSq2, PRIOR_COUNT, PRIOR_CORRELATION)
val cosSim = cosineSimilarity(dotProduct, scala.math.sqrt(ratingSumOfSq1), scala.math.sqrt(ratingSumOfSq2))
val impCosSim = improvedCosineSimilarity(dotProduct, math.sqrt(ratingSumOfSq1), math.sqrt(ratingSum2), size, numRaters1, numRaters2)
val jaccard = jaccardSimilarity(size, numRaters1, numRaters2)
(row.getInt(0), row.getInt(1), cooc, corr, regCorr, cosSim, impCosSim, jaccard)
}).toDF("itemId_01", "itemId_02", "cooc", "corr", "regCorr", "cosSim", "impCosSim", "jaccard")

// 最终的物品相似度
sim.withColumnRenamed("itemId_01", "itemId_02")
.withColumnRenamed("itemId_02", "itemId_01")
.union(sim)
.repartition(defaultParallelism) // 重新分区,以便数据均匀分布,方便下游用户使用
.cache()
similarities = Option(sim)
this
}

用户推荐

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* 为指定的用户推荐num个物品
*
* @param users 用户集
* @param num 为每个用户推荐的物品数量
* @return 推荐表
*/
def recommendForUsers(users: Dataset[User], num: Int): DataFrame = {
// similarityMeasure为相似度算法名
var sim = similarities.get.select("itemId_01", "itemId_02", similarityMeasure)
// 获得评分表
val rits = ratings.get

val project: DataFrame = users
.selectExpr("userId as user", "userName")
// 进行子投影,此处左表数量远小于右表,执行左连接
.join(rits, $"user" <=> rits("userId"), "left")
.drop($"user")
// 选择与用户相关的物品以及评分
.select("userId", "itemId", "rating")

// 获得用户感兴趣的物品与其它物品的相似度
project.join(sim, $"itemId" <=> sim("itemId_01"))
.selectExpr("userId"
, "itemId_01 as relatedItem"
, "itemId_02 as otherItem"
, similarityMeasure
, s"$similarityMeasure * rating as simProduct")
.coalesce(defaultParallelism)
.createOrReplaceTempView("tempTable")

spark.sql(
s"""
|SELECT userId
|, otherItem
|, sum(simProduct) / sum($similarityMeasure) as rating
|FROM tempTable
|GROUP BY userId, otherItem
|ORDER BY userId asc, rating desc
""".stripMargin)
// 过滤结果
.rdd
.map(row => (row.getInt(0), (row.getInt(1), row.getDouble(2))))
.groupByKey()
.mapValues(xs => {
var sequence = Seq[(Int, Double)]()
val iter = xs.iterator
var count = 0
while (iter.hasNext && count < num) {
val rat = iter.next()
if (rat._2 != Double.NaN)
sequence :+= (rat._1, rat._2)
count += 1
}
sequence
})
.toDF("userId", "recommended")
}

相似度计算结果展示

数据来源

数据来自MovieLens,MovieLens数据集是一个关于电影评分的数据集,里面包含了从IMDB, The Movie DataBase上面得到的用户对电影的评分信息。

计算出的物品间相似度

以下展示了使用同现相似度,余弦相似度以及改进版进行相似度计算后(其它相似度请自行测试)的电影间的相似度,并以《星球大战(1977)》进行测试的结果(只显示了前20个结果)。

令人惊讶的是余弦相似度的结果似乎不太令人满意,这似乎是因为余弦相似度只和用户评分有关(更适用于推荐5星电影,不关心电影的类型等),也可能是我的算法出现了差错,欢迎指正。

同现相似度结果展示

movie1 movie2 coocurrence
星球大战(1977) 绝地归来(1983) 0.8828826458931883
星球大战(1977) 迷失方舟攻略(1981) 0.7679353753201742
星球大战(1977) 帝国反击,(1980) 0.7458505006229118
星球大战(1977) 教父,The(1972) 0.7275434127191666
星球大战(1977) 法戈(1996) 0.7239858668831711
星球大战(1977) 独立日(ID4)(1996) 0.723845113716724
星球大战(1977) 沉默的羔羊,The(1991) 0.7025515983155468
星球大战(1977) 印第安纳琼斯和最后的十字军东征(1989) 0.6920306174608959
星球大战(1977) 低俗小说(1994) 0.6885437675802282
星球大战(1977) 星际迷航:第一次接触(1996) 0.6850249237265413
星球大战(1977) 回到未来(1985) 0.6840536741086217
星球大战(1977) 逃亡者,The(1993) 0.6710463728397225
星球大战(1977) 摇滚,The(1996) 0.6646215466055597
星球大战(1977) 终结者,The(1984) 0.6636319257721421
星球大战(1977) 阿甘正传(1994) 0.6564951869930893
星球大战(1977) 终结者2:审判日(1991) 0.653467518885383
星球大战(1977) Princess Bride,The(1987) 0.6534487891771482
星球大战(1977) 异形(1979) 0.648232034779792
星球大战(1977) E.T。外星(1982) 0.6479990753086882
星球大战(1977) 巨蟒和圣杯(1974) 0.6476896799641126

余弦相似度结果展示

余弦相似度

movie1 movie2 cosSim
星球大战(1977) Infinity(1996) 1.0
星球大战(1977) Mostro,Il(1994) 1.0
星球大战(1977) Boys,Les(1997) 1.0
星球大战(1977) 陌生人,(1994) 1.0
星球大战(1977) 爱是一切(1996) 1.0
星球大战(1977) 巴黎是女人(1995) 1.0
星球大战(1977) 遇难者,A(1937) 1.0
星球大战(1977) 馅饼在天空(1995) 1.0
星球大战(1977) 世纪(1993) 1.0
星球大战(1977) 天使在我的肩膀(1946) 1.0
星球大战(1977) 这里来曲奇(1935) 1.0
星球大战(1977) 力量98(1995) 1.0
星球大战(1977) 滑稽女郎(1943) 1.0
星球大战(1977) 火山(1996) 1.0
星球大战(1977) 难忘的夏天(1994) 1.0
星球大战(1977) Innocents,The(1961) 1.0
星球大战(1977) Sleepover(1995) 1.0
星球大战(1977) 木星的妻子(1994) 1.0
星球大战(1977) 我的生活与时代与安东宁·阿托(En compagnie d’Antonin Artaud)(1993) 1.0
星球大战(1977) Bent(1997) 1.0

改进余弦相似度结果展示

改进余弦相似度

movie1 movie2 impCosSim
星球大战(1977) 绝地归来(1983) 0.6151374130038775
星球大战(1977) 失落方舟攻略(1981) 0.5139215764696529
星球大战(1977) 法戈(1996) 0.4978221397190352
星球大战(1977) 帝国反击,The(1980) 0.47719131109655355
星球大战(1977) 教父,The(1972) 0.4769568086870377
星球大战(1977) 沉默的羔羊,The(1991) 0.449096021012343
星球大战(1977) 独立日(ID4)(1996) 0.4334888029282058
星球大战(1977) 低俗小说(1994) 0.43054394420596026
星球大战(1977) 联系(1997) 0.4093441266211224
星球大战(1977) 印第安纳琼斯和最后的十字军东征(1989) 0.4080635382244593
星球大战(1977) 回到未来(1985) 0.4045977014813726
星球大战(1977) 星际迷航:第一次接触(1996) 0.40036290288050874
星球大战(1977) 逃亡者,The(1993) 0.3987919640908379
星球大战(1977) Princess Bride,The(1987) 0.39490206690864144
星球大战(1977) 摇滚,The(1996) 0.39100622194841916
星球大战(1977) 巨蟒与圣杯(1974) 0.3799595474408077
星球大战(1977) 终结者,The(1984) 0.37881311350029406
星球大战(1977) 阿甘正传(1994) 0.3755685058241706
星球大战(1977) 终结者2:审判日(1991) 0.37184317281514295
星球大战(1977) 杰瑞马奎尔(1996) 0.370478212770262
文章作者: manlier
文章链接: https://glassywing.github.io/2018/04/10/spark-itemcf/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 manlier的个人博客