mahout 0.5 基于 hadoop 的 CF 代码分析

mahout的taste框架是协同过滤算法的实现。它支持DataModel,如文件、数据库、NoSQL存储等,也支持hadoop的MapReduce。这里主要分析mahout0.5中的基于MR的实现。

基于MR的CF实现主要流程就在org.apache.mahout.cf.taste.hadoop.item.RecommenderJob类中(注意mahout有2个RecommendJob,要看清楚是哪一个包)。这 个类的run方法就包含了所有的步骤。从上到下,完整的其实有10步(中间计算item相似度其实拆分成了3个job,我们也当做是一个phase吧)。 也就是说,如果指定了所有的必要参数,运行一次item-based CF算法,会执行12个JOB,当然有的步骤是可以忽略的,下面会讲。以下就是详细的每一步骤的分析:

phase1: itemIDIndex

这步主要是将itemId转成一个int。这里设计上其实有点小问题,如果item的数量非常多,比如超过int的最大值,那就有可能会出现重合了。所以用long其实更为合适。而mahout之所以要做这一步看似无谓的转换,主要是因为它内部的许多变量如Vector之类的都是基于int的...

   input:用户评分文件(这也是我们最原始的输入了),格式一般为:userId \t itemId \t score。注意输入必须是textfile的。可能是为了方便测试吧,mahout的很多包默认输入都是textfile格式的。

   map:(index, itemId)

   reduce: (index, itemId)

index为long类型的itemId对应的int值。

phase2: toUserVector

   input:用户评分文件

   param: --userBooleanData如果这个参数为true,则会忽略评分列,对于没有显式的评分的如购买数据,有时需要指这定这个值。

   map: (userId, itemId, pref)

   reduce: 以用户为key,输出成向量形式==> (userId, VectorWritable<itemId, pref>)。注意这里的ItemId是转成int的。

 phase3: countUser,计算用户数

   map: (userId)

   reduce: 输出总用户数count

 phase4: maybePruneAndTranspose

   input: phase2的输出:userVector,即用户对宝贝的评分向量

   param: --maxCooccurrences

   map: (userId,Vector<itemId, pref>) ==>(itemId,DistributedRowMatrix<userId,pref>),注意如果指定了—maxCooccurrences参数,这里会有裁剪,每个userId最多对maxCooccurrencesitemId打分。 这里的DistributedRowMatrix,分布式行矩阵:行:itemId,列:userId

   reduce: (itemId, VectorWritable<userId,pref>)

这一步的结果,相当于对phase2做了个矩阵的转置,并且做了裁剪(如果需要裁剪的话)。

 phase5: RowSimilarityJob

这一步比较关键,计算item相似度,它拆分成了三个JOB

   param:--numberOfColumns(用户数,phase3有计算), --similarityClassname(相似度算法),--maxSimilaritiesPerRow(计算item相似度时最多的用户数,默认:100)

    job1:weight

      input:phase4的输出

       map: (itemId, VectorWritable <userId, pref>) ==>(userId, WeightedOccurrence<itemId, pref, weight>)

       这里的weight,对于欧氏向量距离,或者Pearson距离等,均为Double.NaN,即无效;所以实际上如果只使用这些距离度量,这一步是可以忽略的(mahout没有忽略 -_-|| )。在LoglikelihoodVectorSimilarity中有用到weight的值。

       reduce:(userId, WeightedOccurrenceArray<itemId, pref, weight>)

看起来又是比较多余的一步,用了phase4的结果,最终输出的却是类似phase2的结果(如果weight不起作用的话)。唯一有用的是,phase4的结果做了裁剪了...

    job2:pairwise similarity *item相似度计算*

      map: 输入为job1的最终结果,对同一用户的所有item-rating,输出两两item之间的关系 ==>(WeightedRowPair<itemA, itemB, weightA, weightB>, coocurrence<userId,valueA, valueB>) 。同上,这里的权重weightA,B对于欧氏距离等可以忽略,如果忽略了权重,那么实际输出就是:(<itemA, itemB>, <userId, prefA, prefB>) 。

       reduce: 在这端,以<itemA,itemB>为key聚合了来自不同map的所有用户的打分,最后输出itemA和B的对称相似度(即以itemA为key或以itemB为key)==>(SimilarityMatrixEntryKey<itemA,similarity>, MatrixEntryWritable<WeightedRowPair<itemA, itemB,weightA, weightB>>) ,(SimilarityMatrixEntryKey<itemB,similarity>, MatrixEntryWritable<WeightedRowPair<itemB, itemA,weightB, weightA>>)

这里补充一下相似度的计算,以item-based的Pearson相似度为例,公式如下:

计算两个不同的item i和j的相似度,需要知道item i和j的平均评分,以及所有用户分别对i和j的评分。由于在reduce端已经聚合了所有用户对i和j的评分,只需要应用公式,求出平均评分,就可以计算出这两个item的相似度。

    job3:entries2vectors *汇总item的相似items*

      param: --maxSimilaritiesPerRow

      map: 输入为job2的item间相似度结果:(itemA, itemB, similarity) & (itemB,itemA, similarity)。 这里在group的时候按相似度降序做了排序,如果有--maxSimilaritiesPerRow参数,则会做裁剪。

      reduce: (itemA, VectorWritable <item,similarity>)

结果输出了与itemA相似的items。

至此,item相似度计算完毕,接下来的步骤是开始预测评分了。

 phase6: prePartialMultiply1 

   input: phase5的最后输出(即item相似度)

   map: 直接输出item对应的相似items,这里用VectorOrPrefWritable做了封装,表明有可能是相似度向量,也有可能是对item的打分,并且对item为自己的,将相似度设为Double.NaN,以过滤自身。这一步输出结果为(itemId,VectorOrPrefWritable<item, similarity>)

    reduce: IdentityReducer

 phase7: prePartialMultiply2

   input: phase2的输出userVectors

   map: 输出:(itemId, VectorOrPrefWritable<userId, pref>)

   这里默认考虑用户对10个item的评分,可以通过maxPrefsPerUserConsidered参数调整。

   如果指定了usersFile,则在setup时会把所有的userId读入内存,用于过滤。如果map输入数据的userID不在usersFile中,则会被忽略。注意,这是mahout的设计bug,对于比较大的数据集,很有可能造成OOM(事实上在我的测试程序中已经出现OOM了…),这种bug下面还会出现。输出的是用户的评分,同phase6的VectorOrPrefWritable的封装。

   reduce: IdentityReducer

 phase8: partialMultiply

     input: phase6和7的输出:prePartialMultiply1, prePartialMultiply2

     map: Identity。由于6和7的输出的key均为itemId,因而在reduce端同一item的相似item以及对应的用户评分会聚合到一起。

     reduce:(itemId, VectorAndPrefsWritable<similarityMatrix, List<userId>,List<pref>>) 没做特殊处理,直接合在一起,输出相似度矩阵、所有的userId及对item的打分。

 phase9: itemFiltering 

    将过滤文件输出成<userId, itemId>。如果指定了--filterFile参数,则在最后的聚合推荐中会过滤userId对应的items。这一步在实际中多数是可以忽略的,只要不指定这个参数即可。

 phase10: aggregateAndRecommend

回忆一下item-based的推荐,本质上就是计算item相似度,然后将与用户已评分的items最相似的未评分item推荐给用户(有点绕...)。通俗地来说就是,用户对item A已经有过评分,那么就会将与A最相似的item:B,C,D中,用户未曾评分的推荐给他。

    map: 对每个用户,输出对当前item的评分,以及与当前item的所有相似items==>(userId, PrefAndSimilarityColumnWritable<pref,vector<item, similarity>>)

   reduce: 聚合了这个用户所有的评分历史,以及相似items,计算对该用户的推荐结果 ==> (userId, List<itemId>)。

注意在reduce的setup中,会将phase1产生的所有itemId到index的映射读入内存,这里只要Item数据集稍大,就会OOM。这是比较严重的设计bug

事实上,如果item是正规的整数,而不是guid之类的,phase1和这一步的读入内存是完全可以略掉的。这样的话就完全可以在企业级的数据集上使用(我的测试数据集是15亿+的user-item-rating,1.5亿+的用户,在最后这一步挂掉了,前面所有phase都能跑成功)。

至此,已经形成了推荐结果,CF完成。

 以上的所有步骤中,phase5的计算item相似度是最慢的,会产生大量的中间结果(这个其实也很直觉,O(N^2)的复杂度...)。

总体上,mahout有几处比较严重的设计问题,这么设计的主要原因,是很多内部的Writable,如VectorWritable,它们的Index都是int型的,所以如果要改,就会牵一发而动全身。

对于超大规模数据集,建议还是裁剪一下吧,distinct item过亿一般是会挂的,最好能够控制在千万的级别内。

Para obter mais informações sobre otimizações de compiladores, consulte Aviso sobre otimizações.