spark学习7-mllib als推荐引擎学习
继续上一篇学习spark本次将介绍下如何利用mllib进行商品或者用户的推荐,具体用到的算法是ALS(交替二乘法)推荐算法介绍推荐算法可以分为:UserCF(基于用户的协同过滤)、ItemCF(基于商品的协同过滤)、ModelCF(基于模型的协同过滤),具体介绍见:http://blog.csdn.net/ygrx/article/details/15501679spark中的协同
·
继续上一篇学习spark
本次将介绍下如何利用mllib进行商品或者用户的推荐,具体用到的算法是ALS(交替二乘法)
推荐算法介绍
推荐算法可以分为:UserCF(基于用户的协同过滤)、ItemCF(基于商品的协同过滤)、ModelCF(基于模型的协同过滤),具体介绍见: http://blog.csdn.net/ygrx/article/details/15501679
spark中的协同过滤算法用的是ALS算法,叫做矩阵分解,其实就是补充二维矩阵中缺失的打分,具体见: http://www.dataguru.cn/article-7049-1.html
Spark ALS推荐实例
我们接下去通过代码来描述下利用spark的mllib如何进行商户或者用户的推荐,假设我们的数据如下:
3563904759267333 473136262280108 1
3563904759484437 836075662280961 1
3563904759746579 102136262280378 1
3563904759484437 836075662280961 1
3563904759746579 102136262280378 1
字段1是用户id,第二个字段是商户id,第三个就是打分
val conf = new SparkConf()
.setAppName("Mid Recommend")
.set("spark.executor.extraClassPath", "/opt/cloudera/parcels/CDH/lib/hbase/lib/*")
val sc = new SparkContext(conf)
这段代码描述了构建SparkContext,同时加入了hbase需要用的jar包,因为我们的数据需要存入hbase
val trans = sc.textFile("/xxx/recommend/card_consume_info_sh/card_consume_info_sh")
var count = 0
val cardMapping = trans.map { line =>
val field = line.split("\t")
field(0)
}.distinct(1).map { line =>
count += 1
(count.toString, line)
}
count = 0
val midMapping = trans.map { line =>
val field = line.split("\t")
field(1)
}.distinct(1).map { line =>
count += 1
(line, count.toString)
}
saveToHbase(midMapping.map(convert), "mid_mapping")
saveToHbase(cardMapping.map(convert), "card_mapping")
cardMapping.map { line =>
line._1 + "\t" + line._2
}.saveAsTextFile("/<span style="font-family: Arial, Helvetica, sans-serif;">xxx</span><span style="font-family: Arial, Helvetica, sans-serif;">/recommend/mapping/card_mapping")</span>
midMapping.map { line =>
line._1 + "\t" + line._2
}.saveAsTextFile("/xxx/recommend/mapping/mid_mapping")
val trans = sc.textFile("/xxx/recommend/card_consume_info_sh/card_consume_info_sh")
val cardMapping = sc.textFile("/xxx/recommend/mapping/card_mapping").map { line =>
val field = line.split("\t")
(field(1), field(0))
}
val midMapping = sc.textFile("/xxx/recommend/mapping/mid_mapping").map { line =>
val field = line.split("\t")
(field(0), field(1))
}
// 进行join
val rs = trans.map { line =>
val field = line.split("\t")
(field(0), field(1) + "_" + field(2))
}.join(cardMapping).map { line =>
val field = line._2._1.split("_")
(field(0), line._1 + "_" + line._2._2 + "_" + field(1))
}.join(midMapping).map { line =>
val field = line._2._1.split("_")
field(1) + "\t" + line._2._2 + "\t" + field(2)
}
rs.saveAsTextFile("/xxx/recommend/card_consume_info_sh/card_consume_info_sh_mapping")
由于mllib的Rating类的user和product只能是int,而我们的user和product是String类型,所以需要做一个转换,上述代码将原始数据进行了int和String的mapping,最后我们的数据格式为:2418938 4676 1,第一个表示用户id,第二个表示商品id,第三个字段表示打分
val trans = sc.textFile("/xxx/recommend/card_consume_info_sh/card_consume_info_sh_mapping")
val ratings = trans.map { line =>
val Array(cardMapping, midMapping, rating) = line.split("\t")
Rating(cardMapping.toInt, midMapping.toInt, rating.toDouble)
}.persist
// 使用ALS训练数据建立推荐模型
val rank = 8
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)
// 从rating中获取user以及product数据集
val usersProducts = ratings.map {
case Rating(user, product, rate) => (user, product)
}
// 使用推荐模型预对用户和商品进行评分,得到预测评分的数据集
val predictions = model.predict(usersProducts).map {
case Rating(user, product, rate) => ((user, product), rate)
}
// 真实数据和预测数据进行合并
val ratesAndPredicts = ratings.map {
case Rating(user, product, rate) => ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPredicts.map { case ((user, product), (r1, r2)) =>
val err = (r1 - r2)
err * err
}.mean()
这段代码我们通过ALS进行模型的训练,主要有3个参数,具体见 http://blog.javachen.com/2015/06/01/how-to-implement-collaborative-filtering-using-spark-als.html
这段代码我们只试了3个参数中的一个情况,然后计算MSE值,这个值越小,说明该model越接近正确值,具体模型的评估同样可以见上个博客 http://blog.javachen.com/2015/06/01/how-to-implement-collaborative-filtering-using-spark-als.html
其实我们这里可以用3个参数多维度的数据进行循环训练模型,计算出MSE值最小的那个model就可以了~
// 为每个商户进行推荐
val products = ratings.map(_.product).distinct(10).collect()
val rawData = new mutable.HashMap[String, String]()
products.foreach(
product => {
// 依次为商品推荐用户
val rs = model.recommendUsers(product, 10)
var value = ""
var key = 0
// 拼接推荐结果
rs.foreach(r => {
key = r.product
value = value + r.user + ":" + r.rating + ","
})
rawData.put(key.toString, value.substring(0, value.length - 1))
}
)
// 存进hbase
val rs = sc.parallelize(rawData.toSeq).map(convert)
saveToHbase(rs, "mid_recommend")
这段代码是为每个商户进行离线推荐,不建议数据量很大,要不然撑爆内存,高版本提供了离线全部推荐的api,只是我的版本是CDH5.4.4,Spark还是1.3.0的版本,所以不支持这个api
def saveModel(sc: SparkContext, model: MatrixFactorizationModel): Unit = {
model.save(sc, "/xxx/recommend/model")
}
def loadModel(sc: SparkContext): MatrixFactorizationModel = {
return MatrixFactorizationModel.load(sc, "/xxx/recommend/model")
}
如果我们需要实时推荐,那么就需要将训练好的模型保存下来,然后需要用的时候再load下,然后用recommendUsers进行推荐。
实时推荐引擎的思考
上段最后提到了实时推荐引擎,有如下的思路,仅供参考:
推荐最重要的就是从初始数据中用算法进行打分,这个根据业务的不同设计的方法不同,然后根据打好分的数据进行模型的训练,训练完了保存下来,需要进行推荐的时候再load过来进行实时推荐。
主要流程:原始数据经过ETL处理生成结构化数据然后用算法进行打分,这块耗时比较长,每天可以凌晨进行ETL抽取;然后根据打分好的数据进行模型的训练,这段时间在ETL之后,需要时间也不短;最后保存模型;加载模型进行实时推荐
参考资料:http://wuchong.me/blog/2015/04/06/spark-on-hbase-new-api/
http://blog.javachen.com/2015/06/01/how-to-implement-collaborative-filtering-using-spark-als.html
更多推荐
所有评论(0)