diff --git a/core/src/main/python/synapse/ml/recommendation/SAR.py b/core/src/main/python/synapse/ml/recommendation/SAR.py
new file mode 100644
index 0000000000..f8b3c42ebe
--- /dev/null
+++ b/core/src/main/python/synapse/ml/recommendation/SAR.py
@@ -0,0 +1,30 @@
+# Copyright (C) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See LICENSE in the project root for information.
+
+import sys
+
+if sys.version >= "3":
+    basestring = str
+
+from pyspark.sql.types import StringType
+from synapse.ml.core.schema.Utils import *
+from synapse.ml.recommendation._SAR import _SAR
+
+@inherit_doc
+class SAR(_SAR):
+    def __init__(self, **kwargs):
+        _SAR.__init__(self, **kwargs)
+
+    def calculateUserItemAffinities(self, dataset):
+        if dataset.schema[self.getUserCol()].dataType == StringType():
+            dataset = dataset.withColumn(self.getUserCol(), dataset[self.getUserCol()].cast("int"))
+        if dataset.schema[self.getItemCol()].dataType == StringType():
+            dataset = dataset.withColumn(self.getItemCol(), dataset[self.getItemCol()].cast("int"))
+        return self._call_java("calculateUserItemAffinities", dataset)
+
+    def calculateItemItemSimilarity(self, dataset):
+        if dataset.schema[self.getUserCol()].dataType == StringType():
+            dataset = dataset.withColumn(self.getUserCol(), dataset[self.getUserCol()].cast("int"))
+        if dataset.schema[self.getItemCol()].dataType == StringType():
+            dataset = dataset.withColumn(self.getItemCol(), dataset[self.getItemCol()].cast("int"))
+        return self._call_java("calculateItemItemSimilarity", dataset)
diff --git a/core/src/main/python/synapse/ml/recommendation/SARModel.py b/core/src/main/python/synapse/ml/recommendation/SARModel.py
index 03162cbcc2..d2e777bbc5 100644
--- a/core/src/main/python/synapse/ml/recommendation/SARModel.py
+++ b/core/src/main/python/synapse/ml/recommendation/SARModel.py
@@ -15,3 +15,8 @@ class SARModel(_SARModel):
 
     def recommendForAllUsers(self, numItems):
         return self._call_java("recommendForAllUsers", numItems)
+
+    def recommendForUserSubset(self, dataset, numItems):
+        if dataset.schema[self.getUserCol()].dataType == StringType():
+            dataset = dataset.withColumn(self.getUserCol(), dataset[self.getUserCol()].cast("int"))
+        return self._call_java("recommendForUserSubset", dataset, numItems)
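A note on the cast used above: Spark's `cast("int")` is lossy for non-numeric strings, so these wrappers implicitly assume string user/item columns that hold integer-valued IDs (for example, the output of an indexing stage). A minimal PySpark sketch of that behavior, illustrative only and not part of the patch:

```python
# Illustrative sketch: Spark casts non-numeric strings to null rather than
# failing, so string ID columns are expected to contain integer-valued strings.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([("0", "1"), ("user0", "item1")], ["user", "item"])
df.select(
    df["user"].cast("int").alias("user"),
    df["item"].cast("int").alias("item"),
).show()
# ("0", "1")         -> (0, 1)
# ("user0", "item1") -> (null, null)   silently nulled, not an error
```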
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala
index 6e9d0ace45..de32514492 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala
@@ -1,6 +1,3 @@
-// Copyright (C) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License. See LICENSE in project root for information.
-
 package com.microsoft.azure.synapse.ml.recommendation
 
 import breeze.linalg.{CSCMatrix => BSM}
@@ -13,7 +10,7 @@ import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, I
 import org.apache.spark.mllib.linalg
 import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseMatrix}
 import org.apache.spark.sql.functions.{col, collect_list, sum, udf, _}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{DataFrame, Dataset}
 
 import java.text.SimpleDateFormat
@@ -106,8 +103,22 @@ class SAR(override val uid: String) extends Estimator[SARModel]
       (0 to numItems.value).map(i => map.getOrElse(i, 0.0).toFloat).toArray
     })
 
-    dataset
-      .withColumn(C.AffinityCol, (dataset.columns.contains(getTimeCol), dataset.columns.contains(getRatingCol)) match {
+    val userColType = dataset.schema(getUserCol).dataType
+    val itemColType = dataset.schema(getItemCol).dataType
+
+    val castedDataset = (userColType, itemColType) match {
+      case (StringType, StringType) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+          .withColumn(getItemCol, col(getItemCol).cast("int"))
+      case (StringType, _) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+      case (_, StringType) =>
+        dataset.withColumn(getItemCol, col(getItemCol).cast("int"))
+      case _ => dataset
+    }
+
+    castedDataset
+      .withColumn(C.AffinityCol, (castedDataset.columns.contains(getTimeCol), castedDataset.columns.contains(getRatingCol)) match {
         case (true, true) => blendWeights(timeDecay(col(getTimeCol)), col(getRatingCol))
         case (true, false) => timeDecay(col(getTimeCol))
         case (false, true) => col(getRatingCol)
@@ -197,7 +208,21 @@ class SAR(override val uid: String) extends Estimator[SARModel]
         })
       })
 
-    dataset
+    val userColType = dataset.schema(getUserCol).dataType
+    val itemColType = dataset.schema(getItemCol).dataType
+
+    val castedDataset = (userColType, itemColType) match {
+      case (StringType, StringType) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+          .withColumn(getItemCol, col(getItemCol).cast("int"))
+      case (StringType, _) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+      case (_, StringType) =>
+        dataset.withColumn(getItemCol, col(getItemCol).cast("int"))
+      case _ => dataset
+    }
+
+    castedDataset
       .select(col(getItemCol), col(getUserCol))
       .groupBy(getItemCol).agg(collect_list(getUserCol) as "collect_list")
      .withColumn(C.FeaturesCol, createItemFeaturesVector(col("collect_list")))
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala
index 83a5de4022..818f9971da 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala
@@ -1,6 +1,3 @@
-// Copyright (C) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License. See LICENSE in project root for information.
-
 package com.microsoft.azure.synapse.ml.recommendation
 
 import com.microsoft.azure.synapse.ml.codegen.Wrappable
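The user/item cast match above is duplicated in `calculateUserItemAffinities` and `calculateItemItemSimilarity`, and both now operate on `castedDataset` rather than `dataset`; a follow-up could extract the match into a shared helper, but the patch keeps the two copies inline. With these changes in place, SAR can be fit directly on string-typed ID columns. A hedged end-to-end sketch from the Python side, assuming a Spark session with SynapseML on the classpath and made-up column names:

```python
# Hedged usage sketch, not part of the patch: fit SAR on string-typed columns
# whose values are integer IDs, which is what the new casts expect.
from synapse.ml.recommendation import SAR

ratings = spark.createDataFrame(
    [("0", "1", 4.0), ("0", "2", 1.0), ("1", "1", 5.0), ("1", "2", 3.0)],
    ["customerID", "itemID", "rating"],
)

model = (
    SAR()
    .setUserCol("customerID")
    .setItemCol("itemID")
    .setRatingCol("rating")
    .fit(ratings)
)
model.recommendForAllUsers(2).show()
```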
diff --git a/core/src/test/python/synapsemltest/recommendation/test_ranking.py b/core/src/test/python/synapsemltest/recommendation/test_ranking.py
index d2d439c374..c4b9aebff7 100644
--- a/core/src/test/python/synapsemltest/recommendation/test_ranking.py
+++ b/core/src/test/python/synapsemltest/recommendation/test_ranking.py
@@ -67,10 +67,52 @@
     .cache()
 )
 
+ratings_with_strings = (
+    spark.createDataFrame(
+        [
+            ("user0", "item1", 4, 4),
+            ("user0", "item3", 1, 1),
+            ("user0", "item4", 5, 5),
+            ("user0", "item5", 3, 3),
+            ("user0", "item7", 3, 3),
+            ("user0", "item9", 3, 3),
+            ("user0", "item10", 3, 3),
+            ("user1", "item1", 4, 4),
+            ("user1", "item2", 5, 5),
+            ("user1", "item3", 1, 1),
+            ("user1", "item6", 4, 4),
+            ("user1", "item7", 5, 5),
+            ("user1", "item8", 1, 1),
+            ("user1", "item10", 3, 3),
+            ("user2", "item1", 4, 4),
+            ("user2", "item2", 1, 1),
+            ("user2", "item3", 1, 1),
+            ("user2", "item4", 5, 5),
+            ("user2", "item5", 3, 3),
+            ("user2", "item6", 4, 4),
+            ("user2", "item8", 1, 1),
+            ("user2", "item9", 5, 5),
+            ("user2", "item10", 3, 3),
+            ("user3", "item2", 5, 5),
+            ("user3", "item3", 1, 1),
+            ("user3", "item4", 5, 5),
+            ("user3", "item5", 3, 3),
+            ("user3", "item6", 4, 4),
+            ("user3", "item7", 5, 5),
+            ("user3", "item8", 1, 1),
+            ("user3", "item9", 5, 5),
+            ("user3", "item10", 3, 3),
+        ],
+        ["originalCustomerID", "newCategoryID", "rating", "notTime"],
+    )
+    .coalesce(1)
+    .cache()
+)
+
 
 class RankingSpec(unittest.TestCase):
     @staticmethod
-    def adapter_evaluator(algo):
+    def adapter_evaluator(algo, data):
         recommendation_indexer = RecommendationIndexer(
             userInputCol=USER_ID,
             userOutputCol=USER_ID_INDEX,
@@ -80,7 +122,7 @@ def adapter_evaluator(algo):
         adapter = RankingAdapter(mode="allUsers", k=5, recommender=algo)
 
         pipeline = Pipeline(stages=[recommendation_indexer, adapter])
-        output = pipeline.fit(ratings).transform(ratings)
+        output = pipeline.fit(data).transform(data)
         print(str(output.take(1)) + "\n")
 
         metrics = ["ndcgAt", "fcp", "mrr"]
@@ -91,13 +133,17 @@ def adapter_evaluator(algo):
             + str(RankingEvaluator(k=3, metricName=metric).evaluate(output)),
         )
 
-    # def test_adapter_evaluator_als(self):
-    #     als = ALS(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
-    #     self.adapter_evaluator(als)
-    #
-    # def test_adapter_evaluator_sar(self):
-    #     sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
-    #     self.adapter_evaluator(sar)
+    def test_adapter_evaluator_als(self):
+        als = ALS(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
+        self.adapter_evaluator(als, ratings)
+
+    def test_adapter_evaluator_sar(self):
+        sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
+        self.adapter_evaluator(sar, ratings)
+
+    def test_adapter_evaluator_sar_with_strings(self):
+        sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
+        self.adapter_evaluator(sar, ratings_with_strings)
 
     def test_all_tiny(self):
         customer_index = StringIndexer(inputCol=USER_ID, outputCol=USER_ID_INDEX)
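Note that `adapter_evaluator` still routes every dataset through `RecommendationIndexer`, so even `ratings_with_strings` reaches SAR as numeric indices; raw labels such as `"user0"` are never cast directly. A small illustrative sketch of that indexing step, with `StringIndexer` standing in for the indexer the test configures:

```python
# Illustrative sketch: string labels become numeric indices before SAR sees them.
from pyspark.ml.feature import StringIndexer

indexed = (
    StringIndexer(inputCol="originalCustomerID", outputCol="customerID")
    .fit(ratings_with_strings)
    .transform(ratings_with_strings)
)
indexed.select("originalCustomerID", "customerID").distinct().show()
# "user0".."user3" map to doubles such as 0.0..3.0 (ordering follows label frequency).
```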
diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala
index 501652f68b..9fb1049a1c 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala
@@ -1,247 +1,291 @@
-// Copyright (C) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License. See LICENSE in project root for information.
-
-package com.microsoft.azure.synapse.ml.recommendation
-
-import com.microsoft.azure.synapse.ml.core.test.fuzzing.{EstimatorFuzzing, TestObject, TransformerFuzzing}
-import org.apache.spark.ml.Pipeline
-import org.apache.spark.ml.util.MLReadable
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.functions.{col, udf}
-
-import scala.language.existentials
-
-class SARSpec extends RankingTestBase with EstimatorFuzzing[SAR] {
-  override def testObjects(): List[TestObject[SAR]] = {
-    List(
-      new TestObject(new SAR()
-        .setUserCol(recommendationIndexer.getUserOutputCol)
-        .setItemCol(recommendationIndexer.getItemOutputCol)
-        .setRatingCol(ratingCol), transformedDf)
-    )
-  }
-
-  override def reader: SAR.type = SAR
-
-  override val epsilon = .3
-
-  override def modelReader: SARModel.type = SARModel
-
-  test("SAR") {
-
-    val algo = sar
-      .setSupportThreshold(1)
-      .setSimilarityFunction("jacccard")
-      .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy")
-
-    val adapter: RankingAdapter = new RankingAdapter()
-      .setK(5)
-      .setRecommender(algo)
-
-    val recopipeline = new Pipeline()
-      .setStages(Array(recommendationIndexer, adapter))
-      .fit(ratings)
-
-    val output = recopipeline.transform(ratings)
-
-    val evaluator: RankingEvaluator = new RankingEvaluator()
-      .setK(5)
-      .setNItems(10)
-
-    assert(evaluator.setMetricName("ndcgAt").evaluate(output) === 0.602819875812812)
-    assert(evaluator.setMetricName("fcp").evaluate(output) === 0.05 ||
-      evaluator.setMetricName("fcp").evaluate(output) === 0.1)
-    assert(evaluator.setMetricName("mrr").evaluate(output) === 1.0)
-
-    val users: DataFrame = spark
-      .createDataFrame(Seq(("0","0"),("1","1")))
-      .toDF(userColIndex, itemColIndex)
-
-    val recs = recopipeline.stages(1).asInstanceOf[RankingAdapterModel].getRecommenderModel
-      .asInstanceOf[SARModel].recommendForUserSubset(users, 10)
-    assert(recs.count == 2)
-  }
-
-  lazy val testFile: String = getClass.getResource("/demoUsage.csv.gz").getPath
-  lazy val simCount1: String = getClass.getResource("/sim_count1.csv.gz").getPath
-  lazy val simLift1: String = getClass.getResource("/sim_lift1.csv.gz").getPath
-  lazy val simJac1: String = getClass.getResource("/sim_jac1.csv.gz").getPath
-  lazy val simCount3: String = getClass.getResource("/sim_count3.csv.gz").getPath
-  lazy val simLift3: String = getClass.getResource("/sim_lift3.csv.gz").getPath
-  lazy val simJac3: String = getClass.getResource("/sim_jac3.csv.gz").getPath
-  lazy val userAff: String = getClass.getResource("/user_aff.csv.gz").getPath
-  lazy val userpredCount3: String = getClass.getResource("/userpred_count3_userid_only.csv.gz").getPath
-  lazy val userpredLift3: String = getClass.getResource("/userpred_lift3_userid_only.csv.gz").getPath
-  lazy val userpredJac3: String = getClass.getResource("/userpred_jac3_userid_only.csv.gz").getPath
-
-  private lazy val tlcSampleData: DataFrame = spark.read
-    .option("header", "true") //reading the headers
-    .option("inferSchema", "true")
-    .csv(testFile).na.drop.cache
-
-  test("tlc test sim count1")(
-    SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "cooc", simCount1, userAff))
-
-  test("tlc test sim lift1")(
-    SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "lift", simLift1, userAff))
-
-  test("tlc test sim jac1")(
-    SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "jaccard", simJac1, userAff))
-
-  test("tlc test sim count3")(
-    SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "cooc", simCount3, userAff))
-
-  test("tlc test sim lift3")(
-    SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "lift", simLift3, userAff))
-
-  test("tlc test sim jac3")(
-    SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "jaccard", simJac3, userAff))
-
-  test("tlc test userpred count3 userid only")(
-    SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "cooc", simCount3, userAff, userpredCount3))
-
-  test("tlc test userpred lift3 userid only")(
-    SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "lift", simLift3, userAff, userpredLift3))
-
-  test("tlc test userpred jac3 userid only")(
-    SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "jaccard", simJac3, userAff, userpredJac3))
-
-}
-
-class SARModelSpec extends RankingTestBase with TransformerFuzzing[SARModel] {
-  override def testObjects(): Seq[TestObject[SARModel]] = {
-    List(
-      new TestObject(new SAR()
-        .setUserCol(recommendationIndexer.getUserOutputCol)
-        .setItemCol(recommendationIndexer.getItemOutputCol)
-        .setRatingCol(ratingCol)
-        .fit(transformedDf), transformedDf)
-    )
-  }
-
-  override def reader: MLReadable[_] = SARModel
-
-}
-
-object SarTLCSpec extends RankingTestBase {
-  //scalastyle:off field.name
-  override lazy val userCol = "userId"
-  override lazy val itemCol = "productId"
-  override lazy val ratingCol = "rating"
-  override lazy val userColIndex = "customerID"
-  override lazy val itemColIndex = "itemID"
-  //scalastyle:on field.name
-
-  def testAffinityMatrices(tlcSampleData: DataFrame,
-                           threshold: Int,
-                           similarityFunction: String,
-                           simFile: String,
-                           user_aff: String):
-  (SARModel, RecommendationIndexerModel) = {
-
-    val ratings = tlcSampleData
-
-    val recommendationIndexerModel = recommendationIndexer.fit(ratings)
-    val transformedDf = recommendationIndexerModel.transform(ratings)
-
-    val itemMap = recommendationIndexerModel.getItemIndex
-
-    val model = sar
-      .setSupportThreshold(threshold)
-      .setSimilarityFunction(similarityFunction)
-      .setStartTime("2015/06/09T19:39:37")
-      .setStartTimeFormat("yyyy/MM/dd'T'h:mm:ss")
-      .fit(transformedDf)
-
-    val simMap = model.getItemDataFrame.collect().map(row => {
-      val itemI = itemMap.getOrElse(row.getDouble(0).toInt, "-1")
-      val similarityVectorMap = row.getList(1).toArray.zipWithIndex.map(t => (itemMap.getOrElse(t._2, "-1"), t._1))
-        .toMap
-      itemI -> similarityVectorMap
-    }).toMap
-
-    val itemAff = spark.read.option("header", "true").csv(simFile)
-    itemAff.collect().foreach(row => {
-      val itemI = row.getString(0)
-      itemAff.drop("_c0").schema.fieldNames.foreach(itemJ => {
-        val groundTrueScore = row.getAs[String](itemJ).toFloat
-        val sparkSarScore = simMap.getOrElse(itemI, Map()).getOrElse(itemJ, "-1.0")
-        assert(groundTrueScore == sparkSarScore)
-      })
-    })
-    (model, recommendationIndexerModel)
-  }
-
-  // scalastyle:off method.length
-  def testProductRecommendations(tlcSampleData: DataFrame,
-                                 threshold: Int,
-                                 similarityFunction: String,
-                                 simFile: String,
-                                 user_aff: String,
-                                 userPredFile: String): Unit = {
-
-    val (model, recommendationIndexerModel) = testAffinityMatrices(tlcSampleData, threshold, similarityFunction,
-      simFile,
-      user_aff)
-
-    val recoverUser = recommendationIndexerModel.recoverUser()
-    val recoverItem = recommendationIndexerModel.recoverItem()
-
-    val usersProducts = tlcSampleData
-      .filter(col("userId") === "0003000098E85347")
-      .select("productId")
-      .distinct()
-      .collect()
-      .map(_.getString(0))
-
-    val usersProductsBC = spark.sparkContext.broadcast(usersProducts)
-
-    val itemMapBC = spark.sparkContext.broadcast(recommendationIndexerModel.getItemIndex)
-
-    val filterScore = udf((items: Seq[Int], ratings: Seq[Float]) => {
-      items.zipWithIndex
-        .filter(p => {
-          val itemId = itemMapBC.value.getOrElse[String](p._1, "-1")
-          val bol = usersProductsBC.value.contains(itemId)
-          !bol
-        }).map(p => (p._1, ratings.toList(p._2)))
-    })
-
-    val row = model.recommendForAllUsers(10 + usersProducts.length)
-      .select(col("customerID"), filterScore(col("recommendations.itemID"), col("recommendations.rating")) as
-        "recommendations")
-      .select(col("customerID"), col("recommendations._1") as "itemID", col("recommendations._2") as "rating")
-      .select(
-        recoverUser(col("customerID")) as "customerID",
-        recoverItem(col("itemID")(0)) as "rec1",
-        recoverItem(col("itemID")(1)) as "rec2",
-        recoverItem(col("itemID")(2)) as "rec3",
-        recoverItem(col("itemID")(3)) as "rec4",
-        recoverItem(col("itemID")(4)) as "rec5",
-        recoverItem(col("itemID")(5)) as "rec6",
-        recoverItem(col("itemID")(6)) as "rec7",
-        recoverItem(col("itemID")(7)) as "rec8",
-        recoverItem(col("itemID")(8)) as "rec9",
-        recoverItem(col("itemID")(9)) as "rec10",
-        col("rating")(0) as "score1",
-        col("rating")(1) as "score2",
-        col("rating")(2) as "score3",
-        col("rating")(3) as "score4",
-        col("rating")(4) as "score5",
-        col("rating")(5) as "score6",
-        col("rating")(6) as "score7",
-        col("rating")(7) as "score8",
-        col("rating")(8) as "score9",
-        col("rating")(9) as "score10")
-      .filter(col("customerID") === "0003000098E85347")
-      .take(1)
-
-    val answer = spark.read.option("header", "true").csv(userPredFile).collect()
-
-    assert(row(0).getString(0) == "0003000098E85347", "Assert Customer ID's Match")
-    (0 to 10).foreach(i => assert(row(0).getString(i) == answer(0).getString(i)))
-    (11 to 20).foreach(i => assert("%.3f".format(row(0).getFloat(i)) == "%.3f".format(answer(0).getString(i).toFloat)))
-    ()
-  }
-  // scalastyle:on method.length
-}
+// Copyright (C) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License. See LICENSE in the project root for information.
+
+package com.microsoft.azure.synapse.ml.recommendation
+
+import com.microsoft.azure.synapse.ml.core.test.fuzzing.{EstimatorFuzzing, TestObject, TransformerFuzzing}
+import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.util.MLReadable
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.{col, udf}
+
+import scala.language.existentials
+
+class SARSpec extends RankingTestBase with EstimatorFuzzing[SAR] {
+  override def testObjects(): List[TestObject[SAR]] = {
+    List(
+      new TestObject(new SAR()
+        .setUserCol(recommendationIndexer.getUserOutputCol)
+        .setItemCol(recommendationIndexer.getItemOutputCol)
+        .setRatingCol(ratingCol), transformedDf),
+      new TestObject(new SAR()
+        .setUserCol(recommendationIndexer.getUserOutputCol)
+        .setItemCol(recommendationIndexer.getItemOutputCol)
+        .setRatingCol(ratingCol), transformedDfWithStrings)
+    )
+  }
+
+  override def reader: SAR.type = SAR
+
+  override val epsilon = .3
+
+  override def modelReader: SARModel.type = SARModel
+
+  test("SAR") {
+
+    val algo = sar
+      .setSupportThreshold(1)
+      .setSimilarityFunction("jacccard")
+      .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy")
+
+    val adapter: RankingAdapter = new RankingAdapter()
+      .setK(5)
+      .setRecommender(algo)
+
+    val recopipeline = new Pipeline()
+      .setStages(Array(recommendationIndexer, adapter))
+      .fit(ratings)
+
+    val output = recopipeline.transform(ratings)
+
+    val evaluator: RankingEvaluator = new RankingEvaluator()
+      .setK(5)
+      .setNItems(10)
+
+    assert(evaluator.setMetricName("ndcgAt").evaluate(output) === 0.602819875812812)
+    assert(evaluator.setMetricName("fcp").evaluate(output) === 0.05 ||
+      evaluator.setMetricName("fcp").evaluate(output) === 0.1)
+    assert(evaluator.setMetricName("mrr").evaluate(output) === 1.0)
+
+    val users: DataFrame = spark
+      .createDataFrame(Seq(("0","0"),("1","1")))
+      .toDF(userColIndex, itemColIndex)
+
+    val recs = recopipeline.stages(1).asInstanceOf[RankingAdapterModel].getRecommenderModel
+      .asInstanceOf[SARModel].recommendForUserSubset(users, 10)
+    assert(recs.count == 2)
+  }
+
+  test("SAR with string userCol and itemCol") {
+
+    val algo = sar
+      .setSupportThreshold(1)
+      .setSimilarityFunction("jacccard")
+      .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy")
+
+    val adapter: RankingAdapter = new RankingAdapter()
+      .setK(5)
+      .setRecommender(algo)
+
+    val recopipeline = new Pipeline()
+      .setStages(Array(recommendationIndexer, adapter))
+      .fit(ratingsWithStrings)
+
+    val output = recopipeline.transform(ratingsWithStrings)
+
+    val evaluator: RankingEvaluator = new RankingEvaluator()
+      .setK(5)
+      .setNItems(10)
+
+    assert(evaluator.setMetricName("ndcgAt").evaluate(output) === 0.602819875812812)
+    assert(evaluator.setMetricName("fcp").evaluate(output) === 0.05 ||
+      evaluator.setMetricName("fcp").evaluate(output) === 0.1)
+    assert(evaluator.setMetricName("mrr").evaluate(output) === 1.0)
+
+    val users: DataFrame = spark
+      .createDataFrame(Seq(("user0","item0"),("user1","item1")))
+      .toDF(userColIndex, itemColIndex)
+
+    val recs = recopipeline.stages(1).asInstanceOf[RankingAdapterModel].getRecommenderModel
+      .asInstanceOf[SARModel].recommendForUserSubset(users, 10)
+    assert(recs.count == 2)
+  }
+
+  lazy val testFile: String = getClass.getResource("/demoUsage.csv.gz").getPath
+  lazy val simCount1: String = getClass.getResource("/sim_count1.csv.gz").getPath
+  lazy val simLift1: String = getClass.getResource("/sim_lift1.csv.gz").getPath
+  lazy val simJac1: String = getClass.getResource("/sim_jac1.csv.gz").getPath
+  lazy val simCount3: String = getClass.getResource("/sim_count3.csv.gz").getPath
+  lazy val simLift3: String = getClass.getResource("/sim_lift3.csv.gz").getPath
+  lazy val simJac3: String = getClass.getResource("/sim_jac3.csv.gz").getPath
+  lazy val userAff: String = getClass.getResource("/user_aff.csv.gz").getPath
+  lazy val userpredCount3: String = getClass.getResource("/userpred_count3_userid_only.csv.gz").getPath
+  lazy val userpredLift3: String = getClass.getResource("/userpred_lift3_userid_only.csv.gz").getPath
+  lazy val userpredJac3: String = getClass.getResource("/userpred_jac3_userid_only.csv.gz").getPath
+
+  private lazy val tlcSampleData: DataFrame = spark.read
+    .option("header", "true") //reading the headers
+    .option("inferSchema", "true")
+    .csv(testFile).na.drop.cache
+
+  test("tlc test sim count1")(
+    SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "cooc", simCount1, userAff))
+
+  test("tlc test sim lift1")(
+    SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "lift", simLift1, userAff))
+
+  test("tlc test sim jac1")(
+    SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "jaccard", simJac1, userAff))
+
+  test("tlc test sim count3")(
+    SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "cooc", simCount3, userAff))
+
+  test("tlc test sim lift3")(
+    SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "lift", simLift3, userAff))
+
+  test("tlc test sim jac3")(
+    SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "jaccard", simJac3, userAff))
+
+  test("tlc test userpred count3 userid only")(
+    SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "cooc", simCount3, userAff, userpredCount3))
+
+  test("tlc test userpred lift3 userid only")(
+    SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "lift", simLift3, userAff, userpredLift3))
+
+  test("tlc test userpred jac3 userid only")(
+    SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "jaccard", simJac3, userAff, userpredJac3))
+
+}
+
+class SARModelSpec extends RankingTestBase with TransformerFuzzing[SARModel] {
+  override def testObjects(): Seq[TestObject[SARModel]] = {
+    List(
+      new TestObject(new SAR()
+        .setUserCol(recommendationIndexer.getUserOutputCol)
+        .setItemCol(recommendationIndexer.getItemOutputCol)
+        .setRatingCol(ratingCol)
+        .fit(transformedDf), transformedDf),
+      new TestObject(new SAR()
+        .setUserCol(recommendationIndexer.getUserOutputCol)
+        .setItemCol(recommendationIndexer.getItemOutputCol)
+        .setRatingCol(ratingCol)
+        .fit(transformedDfWithStrings), transformedDfWithStrings)
+    )
+  }
+
+  override def reader: MLReadable[_] = SARModel
+
+}
+
+object SarTLCSpec extends RankingTestBase {
+  //scalastyle:off field.name
+  override lazy val userCol = "userId"
+  override lazy val itemCol = "productId"
+  override lazy val ratingCol = "rating"
+  override lazy val userColIndex = "customerID"
+  override lazy val itemColIndex = "itemID"
+  //scalastyle:on field.name
+
+  def testAffinityMatrices(tlcSampleData: DataFrame,
+                           threshold: Int,
+                           similarityFunction: String,
+                           simFile: String,
+                           user_aff: String):
+  (SARModel, RecommendationIndexerModel) = {
+
+    val ratings = tlcSampleData
+
+    val recommendationIndexerModel = recommendationIndexer.fit(ratings)
+    val transformedDf = recommendationIndexerModel.transform(ratings)
+
+    val itemMap = recommendationIndexerModel.getItemIndex
+
+    val model = sar
+      .setSupportThreshold(threshold)
+      .setSimilarityFunction(similarityFunction)
+      .setStartTime("2015/06/09T19:39:37")
+      .setStartTimeFormat("yyyy/MM/dd'T'h:mm:ss")
+      .fit(transformedDf)
+
+    val simMap = model.getItemDataFrame.collect().map(row => {
+      val itemI = itemMap.getOrElse(row.getDouble(0).toInt, "-1")
+      val similarityVectorMap = row.getList(1).toArray.zipWithIndex.map(t => (itemMap.getOrElse(t._2, "-1"), t._1))
+        .toMap
+      itemI -> similarityVectorMap
+    }).toMap
+
+    val itemAff = spark.read.option("header", "true").csv(simFile)
+    itemAff.collect().foreach(row => {
+      val itemI = row.getString(0)
+      itemAff.drop("_c0").schema.fieldNames.foreach(itemJ => {
+        val groundTrueScore = row.getAs[String](itemJ).toFloat
+        val sparkSarScore = simMap.getOrElse(itemI, Map()).getOrElse(itemJ, "-1.0")
+        assert(groundTrueScore == sparkSarScore)
+      })
+    })
+    (model, recommendationIndexerModel)
+  }
+
+  // scalastyle:off method.length
+  def testProductRecommendations(tlcSampleData: DataFrame,
+                                 threshold: Int,
+                                 similarityFunction: String,
+                                 simFile: String,
+                                 user_aff: String,
+                                 userPredFile: String): Unit = {
+
+    val (model, recommendationIndexerModel) = testAffinityMatrices(tlcSampleData, threshold, similarityFunction,
+      simFile,
+      user_aff)
+
+    val recoverUser = recommendationIndexerModel.recoverUser()
+    val recoverItem = recommendationIndexerModel.recoverItem()
+
+    val usersProducts = tlcSampleData
+      .filter(col("userId") === "0003000098E85347")
+      .select("productId")
+      .distinct()
+      .collect()
+      .map(_.getString(0))
+
+    val usersProductsBC = spark.sparkContext.broadcast(usersProducts)
+
+    val itemMapBC = spark.sparkContext.broadcast(recommendationIndexerModel.getItemIndex)
+
+    val filterScore = udf((items: Seq[Int], ratings: Seq[Float]) => {
+      items.zipWithIndex
+        .filter(p => {
+          val itemId = itemMapBC.value.getOrElse[String](p._1, "-1")
+          val bol = usersProductsBC.value.contains(itemId)
+          !bol
+        }).map(p => (p._1, ratings.toList(p._2)))
+    })
+
+    val row = model.recommendForAllUsers(10 + usersProducts.length)
+      .select(col("customerID"), filterScore(col("recommendations.itemID"), col("recommendations.rating")) as
+        "recommendations")
+      .select(col("customerID"), col("recommendations._1") as "itemID", col("recommendations._2") as "rating")
+      .select(
+        recoverUser(col("customerID")) as "customerID",
+        recoverItem(col("itemID")(0)) as "rec1",
+        recoverItem(col("itemID")(1)) as "rec2",
+        recoverItem(col("itemID")(2)) as "rec3",
+        recoverItem(col("itemID")(3)) as "rec4",
+        recoverItem(col("itemID")(4)) as "rec5",
+        recoverItem(col("itemID")(5)) as "rec6",
+        recoverItem(col("itemID")(6)) as "rec7",
+        recoverItem(col("itemID")(7)) as "rec8",
+        recoverItem(col("itemID")(8)) as "rec9",
+        recoverItem(col("itemID")(9)) as "rec10",
+        col("rating")(0) as "score1",
+        col("rating")(1) as "score2",
+        col("rating")(2) as "score3",
+        col("rating")(3) as "score4",
+        col("rating")(4) as "score5",
+        col("rating")(5) as "score6",
+        col("rating")(6) as "score7",
+        col("rating")(7) as "score8",
+        col("rating")(8) as "score9",
+        col("rating")(9) as "score10")
+      .filter(col("customerID") === "0003000098E85347")
+      .take(1)
+
+    val answer = spark.read.option("header", "true").csv(userPredFile).collect()
+
+    assert(row(0).getString(0) == "0003000098E85347", "Assert Customer ID's Match")
+    (0 to 10).foreach(i => assert(row(0).getString(i) == answer(0).getString(i)))
+    (11 to 20).foreach(i => assert("%.3f".format(row(0).getFloat(i)) == "%.3f".format(answer(0).getString(i).toFloat)))
+    ()
+  }
+  // scalastyle:on method.length
+}
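The new `"SAR with string userCol and itemCol"` test mirrors the original `"SAR"` test but fits on `ratingsWithStrings` and queries `recommendForUserSubset` with raw string IDs. A hedged sketch of the equivalent Python call, reusing the `model` from the earlier sketch (the column name is assumed):

```python
# Hedged sketch: recommendForUserSubset also casts a string-typed user column
# to int before dispatching to the JVM (see SARModel.py above).
users = spark.createDataFrame([("0",), ("1",)], ["customerID"])
model.recommendForUserSubset(users, 10).show()
```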
diff --git a/docs/Quick Examples/estimators/core/_Recommendation.md b/docs/Quick Examples/estimators/core/_Recommendation.md
index 98f9501736..9be63c369e 100644
--- a/docs/Quick Examples/estimators/core/_Recommendation.md
+++ b/docs/Quick Examples/estimators/core/_Recommendation.md
@@ -61,6 +61,43 @@ ratings = (spark.createDataFrame([
     .dropDuplicates()
     .cache())
 
+ratings_with_strings = (spark.createDataFrame([
+    ("user0", "item1", 4, 4),
+    ("user0", "item3", 1, 1),
+    ("user0", "item4", 5, 5),
+    ("user0", "item5", 3, 3),
+    ("user0", "item7", 3, 3),
+    ("user0", "item9", 3, 3),
+    ("user0", "item10", 3, 3),
+    ("user1", "item1", 4, 4),
+    ("user1", "item2", 5, 5),
+    ("user1", "item3", 1, 1),
+    ("user1", "item6", 4, 4),
+    ("user1", "item7", 5, 5),
+    ("user1", "item8", 1, 1),
+    ("user1", "item10", 3, 3),
+    ("user2", "item1", 4, 4),
+    ("user2", "item2", 1, 1),
+    ("user2", "item3", 1, 1),
+    ("user2", "item4", 5, 5),
+    ("user2", "item5", 3, 3),
+    ("user2", "item6", 4, 4),
+    ("user2", "item8", 1, 1),
+    ("user2", "item9", 5, 5),
+    ("user2", "item10", 3, 3),
+    ("user3", "item2", 5, 5),
+    ("user3", "item3", 1, 1),
+    ("user3", "item4", 5, 5),
+    ("user3", "item5", 3, 3),
+    ("user3", "item6", 4, 4),
+    ("user3", "item7", 5, 5),
+    ("user3", "item8", 1, 1),
+    ("user3", "item9", 5, 5),
+    ("user3", "item10", 3, 3)
+    ], ["originalCustomerID", "newCategoryID", "rating", "notTime"])
+    .coalesce(1)
+    .cache())
+
 recommendationIndexer = (RecommendationIndexer()
     .setUserInputCol("customerIDOrg")
     .setUserOutputCol("customerID")
@@ -275,6 +312,43 @@ ratings = (spark.createDataFrame([
     .dropDuplicates()
     .cache())
 
+ratings_with_strings = (spark.createDataFrame([
+    ("user0", "item1", 4, 4),
+    ("user0", "item3", 1, 1),
+    ("user0", "item4", 5, 5),
+    ("user0", "item5", 3, 3),
+    ("user0", "item7", 3, 3),
+    ("user0", "item9", 3, 3),
+    ("user0", "item10", 3, 3),
+    ("user1", "item1", 4, 4),
+    ("user1", "item2", 5, 5),
+    ("user1", "item3", 1, 1),
+    ("user1", "item6", 4, 4),
+    ("user1", "item7", 5, 5),
+    ("user1", "item8", 1, 1),
+    ("user1", "item10", 3, 3),
+    ("user2", "item1", 4, 4),
+    ("user2", "item2", 1, 1),
+    ("user2", "item3", 1, 1),
+    ("user2", "item4", 5, 5),
+    ("user2", "item5", 3, 3),
+    ("user2", "item6", 4, 4),
+    ("user2", "item8", 1, 1),
+    ("user2", "item9", 5, 5),
+    ("user2", "item10", 3, 3),
+    ("user3", "item2", 5, 5),
+    ("user3", "item3", 1, 1),
+    ("user3", "item4", 5, 5),
+    ("user3", "item5", 3, 3),
+    ("user3", "item6", 4, 4),
+    ("user3", "item7", 5, 5),
+    ("user3", "item8", 1, 1),
+    ("user3", "item9", 5, 5),
+    ("user3", "item10", 3, 3)
+    ], ["originalCustomerID", "newCategoryID", "rating", "notTime"])
+    .coalesce(1)
+    .cache())
+
 recommendationIndexer = (RecommendationIndexer()
     .setUserInputCol("customerIDOrg")
     .setUserOutputCol("customerID")
@@ -298,6 +372,10 @@ adapter = (RankingAdapter()
 res1 = recommendationIndexer.fit(ratings).transform(ratings).cache()
 
 adapter.fit(res1).transform(res1).show()
+
+res2 = recommendationIndexer.fit(ratings_with_strings).transform(ratings_with_strings).cache()
+
+adapter.fit(res2).transform(res2).show()
 ```
@@ -344,6 +422,43 @@ val ratings = (Seq(
     .dropDuplicates()
     .cache())
 
+val ratings_with_strings = (Seq(
+  ("user0", "item1", 4, 4),
+  ("user0", "item3", 1, 1),
+  ("user0", "item4", 5, 5),
+  ("user0", "item5", 3, 3),
+  ("user0", "item7", 3, 3),
+  ("user0", "item9", 3, 3),
+  ("user0", "item10", 3, 3),
+  ("user1", "item1", 4, 4),
+  ("user1", "item2", 5, 5),
+  ("user1", "item3", 1, 1),
+  ("user1", "item6", 4, 4),
+  ("user1", "item7", 5, 5),
+  ("user1", "item8", 1, 1),
+  ("user1", "item10", 3, 3),
+  ("user2", "item1", 4, 4),
+  ("user2", "item2", 1, 1),
+  ("user2", "item3", 1, 1),
+  ("user2", "item4", 5, 5),
+  ("user2", "item5", 3, 3),
+  ("user2", "item6", 4, 4),
+  ("user2", "item8", 1, 1),
+  ("user2", "item9", 5, 5),
+  ("user2", "item10", 3, 3),
+  ("user3", "item2", 5, 5),
+  ("user3", "item3", 1, 1),
+  ("user3", "item4", 5, 5),
+  ("user3", "item5", 3, 3),
+  ("user3", "item6", 4, 4),
+  ("user3", "item7", 5, 5),
+  ("user3", "item8", 1, 1),
+  ("user3", "item9", 5, 5),
+  ("user3", "item10", 3, 3))
+  .toDF("originalCustomerID", "newCategoryID", "rating", "notTime")
+  .coalesce(1)
+  .cache())
+
 val recommendationIndexer = (new RecommendationIndexer()
     .setUserInputCol("customerIDOrg")
     .setUserOutputCol("customerID")
@@ -367,6 +482,10 @@ val adapter = (new RankingAdapter()
 val res1 = recommendationIndexer.fit(ratings).transform(ratings).cache()
 
 adapter.fit(res1).transform(res1).show()
+
+val res2 = recommendationIndexer.fit(ratings_with_strings).transform(ratings_with_strings).cache()
+
+adapter.fit(res2).transform(res2).show()
 ```
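To close the loop on the doc examples, the string-keyed pipeline output can be scored the same way the unit tests do. A hedged sketch, assuming `res2` and `adapter` from the snippet above:

```python
# Hedged sketch: evaluate the string-keyed pipeline with the same metrics the
# test suite uses; RankingEvaluator is shown with the k/metricName arguments
# that appear in test_ranking.py.
from synapse.ml.recommendation import RankingEvaluator

output = adapter.fit(res2).transform(res2)
for metric in ["ndcgAt", "fcp", "mrr"]:
    print(metric, RankingEvaluator(k=3, metricName=metric).evaluate(output))
```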