feat: Support for userCol and itemCol as string types in SAR model #2283

Open · wants to merge 12 commits into master
@@ -13,7 +13,7 @@ import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseMatrix}
import org.apache.spark.sql.functions.{col, collect_list, sum, udf, _}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType, IntegerType, LongType}
import org.apache.spark.sql.{DataFrame, Dataset}

import java.text.SimpleDateFormat
@@ -106,8 +106,28 @@ class SAR(override val uid: String) extends Estimator[SARModel]
      (0 to numItems.value).map(i => map.getOrElse(i, 0.0).toFloat).toArray
    })

-    dataset
-      .withColumn(C.AffinityCol, (dataset.columns.contains(getTimeCol), dataset.columns.contains(getRatingCol)) match {
+    val userColType = dataset.schema(getUserCol).dataType
+    val itemColType = dataset.schema(getItemCol).dataType
+
+    val castedDataset = (userColType, itemColType) match {
+      case (StringType, StringType) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+          .withColumn(getItemCol, col(getItemCol).cast("int"))
+      case (StringType, _) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+      case (_, StringType) =>
+        dataset.withColumn(getItemCol, col(getItemCol).cast("int"))
+      case (IntegerType, IntegerType) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+          .withColumn(getItemCol, col(getItemCol).cast("int"))
+      case (LongType, LongType) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("long"))
+          .withColumn(getItemCol, col(getItemCol).cast("long"))
+      case _ => dataset
+    }
+
+    castedDataset
+      .withColumn(C.AffinityCol, (castedDataset.columns.contains(getTimeCol), castedDataset.columns.contains(getRatingCol)) match {
        case (true, true) => blendWeights(timeDecay(col(getTimeCol)), col(getRatingCol))
        case (true, false) => timeDecay(col(getTimeCol))
        case (false, true) => col(getRatingCol)
@@ -197,7 +217,27 @@ class SAR(override val uid: String) extends Estimator[SARModel]
})
})

-    dataset
+    val userColType = dataset.schema(getUserCol).dataType
+    val itemColType = dataset.schema(getItemCol).dataType
+
+    val castedDataset = (userColType, itemColType) match {
+      case (StringType, StringType) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+          .withColumn(getItemCol, col(getItemCol).cast("int"))
+      case (StringType, _) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+      case (_, StringType) =>
+        dataset.withColumn(getItemCol, col(getItemCol).cast("int"))
+      case (IntegerType, IntegerType) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("int"))
+          .withColumn(getItemCol, col(getItemCol).cast("int"))
+      case (LongType, LongType) =>
+        dataset.withColumn(getUserCol, col(getUserCol).cast("long"))
+          .withColumn(getItemCol, col(getItemCol).cast("long"))
+      case _ => dataset
+    }
+
+    castedDataset
      .select(col(getItemCol), col(getUserCol))
      .groupBy(getItemCol).agg(collect_list(getUserCol) as "collect_list")
      .withColumn(C.FeaturesCol, createItemFeaturesVector(col("collect_list")))
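Note on the change above: the same type-normalization match now appears in both of SAR's fit paths. A possible consolidation inside SAR, shown only as a sketch and not as part of this PR (castIdColumns is a hypothetical helper name), also makes the cast semantics explicit:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.{DataFrame, Dataset}

    // Hypothetical helper: normalize the user and item id columns once,
    // instead of repeating the match in each caller. Integer and long
    // columns are already numeric, so the int->int and long->long casts
    // in the PR are no-ops and those columns can simply pass through.
    private def castIdColumns(dataset: Dataset[_], userCol: String, itemCol: String): DataFrame = {
      Seq(userCol, itemCol).foldLeft(dataset.toDF()) { (df, c) =>
        df.schema(c).dataType match {
          // Caution: a non-numeric string such as "user4" casts to null.
          case StringType => df.withColumn(c, col(c).cast("int"))
          case _ => df
        }
      }
    }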
110 changes: 101 additions & 9 deletions core/src/test/python/synapsemltest/recommendation/test_ranking.py
@@ -67,10 +67,94 @@
    .cache()
)

+ratings_with_strings = (
+    spark.createDataFrame(
+        [
+            ("user0", "item1", 4, 4),
+            ("user0", "item3", 1, 1),
+            ("user0", "item4", 5, 5),
+            ("user0", "item5", 3, 3),
+            ("user0", "item7", 3, 3),
+            ("user0", "item9", 3, 3),
+            ("user0", "item10", 3, 3),
+            ("user1", "item1", 4, 4),
+            ("user1", "item2", 5, 5),
+            ("user1", "item3", 1, 1),
+            ("user1", "item6", 4, 4),
+            ("user1", "item7", 5, 5),
+            ("user1", "item8", 1, 1),
+            ("user1", "item10", 3, 3),
+            ("user2", "item1", 4, 4),
+            ("user2", "item2", 1, 1),
+            ("user2", "item3", 1, 1),
+            ("user2", "item4", 5, 5),
+            ("user2", "item5", 3, 3),
+            ("user2", "item6", 4, 4),
+            ("user2", "item8", 1, 1),
+            ("user2", "item9", 5, 5),
+            ("user2", "item10", 3, 3),
+            ("user3", "item2", 5, 5),
+            ("user3", "item3", 1, 1),
+            ("user3", "item4", 5, 5),
+            ("user3", "item5", 3, 3),
+            ("user3", "item6", 4, 4),
+            ("user3", "item7", 5, 5),
+            ("user3", "item8", 1, 1),
+            ("user3", "item9", 5, 5),
+            ("user3", "item10", 3, 3),
+        ],
+        ["originalCustomerID", "newCategoryID", "rating", "notTime"],
+    )
+    .coalesce(1)
+    .cache()
+)
+
+ratings_with_integers = (
+    spark.createDataFrame(
+        [
+            (0, 1, 4, 4),
+            (0, 3, 1, 1),
+            (0, 4, 5, 5),
+            (0, 5, 3, 3),
+            (0, 7, 3, 3),
+            (0, 9, 3, 3),
+            (0, 10, 3, 3),
+            (1, 1, 4, 4),
+            (1, 2, 5, 5),
+            (1, 3, 1, 1),
+            (1, 6, 4, 4),
+            (1, 7, 5, 5),
+            (1, 8, 1, 1),
+            (1, 10, 3, 3),
+            (2, 1, 4, 4),
+            (2, 2, 1, 1),
+            (2, 3, 1, 1),
+            (2, 4, 5, 5),
+            (2, 5, 3, 3),
+            (2, 6, 4, 4),
+            (2, 8, 1, 1),
+            (2, 9, 5, 5),
+            (2, 10, 3, 3),
+            (3, 2, 5, 5),
+            (3, 3, 1, 1),
+            (3, 4, 5, 5),
+            (3, 5, 3, 3),
+            (3, 6, 4, 4),
+            (3, 7, 5, 5),
+            (3, 8, 1, 1),
+            (3, 9, 5, 5),
+            (3, 10, 3, 3),
+        ],
+        ["originalCustomerID", "newCategoryID", "rating", "notTime"],
+    )
+    .coalesce(1)
+    .cache()
+)
+

class RankingSpec(unittest.TestCase):
    @staticmethod
-    def adapter_evaluator(algo):
+    def adapter_evaluator(algo, data):
        recommendation_indexer = RecommendationIndexer(
            userInputCol=USER_ID,
            userOutputCol=USER_ID_INDEX,
@@ -80,7 +164,7 @@ def adapter_evaluator(algo):

        adapter = RankingAdapter(mode="allUsers", k=5, recommender=algo)
        pipeline = Pipeline(stages=[recommendation_indexer, adapter])
-        output = pipeline.fit(ratings).transform(ratings)
+        output = pipeline.fit(data).transform(data)
        print(str(output.take(1)) + "\n")

        metrics = ["ndcgAt", "fcp", "mrr"]
@@ -91,13 +175,21 @@ def adapter_evaluator(algo):
                + str(RankingEvaluator(k=3, metricName=metric).evaluate(output)),
            )

-    # def test_adapter_evaluator_als(self):
-    #     als = ALS(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
-    #     self.adapter_evaluator(als)
-    #
-    # def test_adapter_evaluator_sar(self):
-    #     sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
-    #     self.adapter_evaluator(sar)
+    def test_adapter_evaluator_als(self):
+        als = ALS(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
+        self.adapter_evaluator(als, ratings)
+
+    def test_adapter_evaluator_sar(self):
+        sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
+        self.adapter_evaluator(sar, ratings)
+
+    def test_adapter_evaluator_sar_with_strings(self):
+        sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
+        self.adapter_evaluator(sar, ratings_with_strings)
+
+    def test_adapter_evaluator_sar_with_integers(self):
+        sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID)
+        self.adapter_evaluator(sar, ratings_with_integers)

    def test_all_tiny(self):
        customer_index = StringIndexer(inputCol=USER_ID, outputCol=USER_ID_INDEX)
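For illustration, the code path these new tests exercise can also be driven without the RecommendationIndexer. A minimal sketch, assuming an existing SparkSession named spark and this PR's SAR on the classpath; the ids are numeric strings so that the internal cast to int succeeds, and the column names are illustrative:

    import com.microsoft.azure.synapse.ml.recommendation.SAR

    // String-typed id columns holding numeric values, as produced by
    // systems that export integer ids as text.
    val stringIdRatings = spark.createDataFrame(Seq(
      ("0", "1", 4),
      ("0", "3", 1),
      ("1", "1", 4),
      ("1", "2", 5),
      ("2", "2", 1),
      ("2", "3", 5)
    )).toDF("customerID", "itemID", "rating")

    val model = new SAR()
      .setUserCol("customerID")
      .setItemCol("itemID")
      .setRatingCol("rating")
      .setSupportThreshold(1)
      .setSimilarityFunction("jaccard")
      .fit(stringIdRatings)

    // The fitted model scores user subsets as before.
    model.recommendForUserSubset(stringIdRatings.select("customerID"), 3).show()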
@@ -1,6 +1,3 @@
-// Copyright (C) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License. See LICENSE in project root for information.
-
package com.microsoft.azure.synapse.ml.recommendation

import com.microsoft.azure.synapse.ml.core.test.fuzzing.{EstimatorFuzzing, TestObject, TransformerFuzzing}
@@ -106,6 +103,128 @@ class SARSpec extends RankingTestBase with EstimatorFuzzing[SAR]
test("tlc test userpred jac3 userid only")(
SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "jaccard", simJac3, userAff, userpredJac3))

test("SAR with String User Column") {
val stringUserCol = "stringUserId"
val stringItemCol = "stringItemId"

val stringRatings: DataFrame = spark
.createDataFrame(Seq(
("user1", "item1", 2),
("user1", "item3", 1),
("user1", "item4", 5),
("user2", "item1", 4),
("user2", "item2", 5),
("user2", "item3", 1),
("user3", "item1", 4),
("user3", "item3", 1),
("user3", "item4", 5)
))
.toDF(stringUserCol, stringItemCol, ratingCol)
.dropDuplicates()
.cache()

val stringRecommendationIndexer: RecommendationIndexer = new RecommendationIndexer()
.setUserInputCol(stringUserCol)
.setUserOutputCol(userColIndex)
.setItemInputCol(stringItemCol)
.setItemOutputCol(itemColIndex)
.setRatingCol(ratingCol)

val transformedStringDf: DataFrame = stringRecommendationIndexer.fit(stringRatings)
.transform(stringRatings).cache()

val algo = new SAR()
.setUserCol(stringRecommendationIndexer.getUserOutputCol)
.setItemCol(stringRecommendationIndexer.getItemOutputCol)
.setRatingCol(ratingCol)
.setSupportThreshold(1)
.setSimilarityFunction("jaccard")
.setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy")

val adapter: RankingAdapter = new RankingAdapter()
.setK(5)
.setRecommender(algo)

val recopipeline = new Pipeline()
.setStages(Array(stringRecommendationIndexer, adapter))
.fit(stringRatings)

val output = recopipeline.transform(stringRatings)

val evaluator: RankingEvaluator = new RankingEvaluator()
.setK(5)
.setNItems(10)

assert(evaluator.setMetricName("ndcgAt").evaluate(output) > 0.0)
assert(evaluator.setMetricName("fcp").evaluate(output) > 0.0)
assert(evaluator.setMetricName("mrr").evaluate(output) > 0.0)

val users: DataFrame = spark
.createDataFrame(Seq(("user1", "item1"), ("user2", "item2")))
.toDF(stringUserCol, stringItemCol)

val recs = recopipeline.stages(1).asInstanceOf[RankingAdapterModel].getRecommenderModel
.asInstanceOf[SARModel].recommendForUserSubset(users, 10)
assert(recs.count == 2)
}

test("SAR with Different DataTypes in User Column") {
val mixedUserCol = "mixedUserId"
val mixedItemCol = "mixedItemId"

val mixedRatings: DataFrame = spark
.createDataFrame(Seq(
(1, "item1", 2),
(1, "item3", 1),
(1, "item4", 5),
(2, "item1", 4),
(2, "item2", 5),
(2, "item3", 1),
(3, "item1", 4),
(3, "item3", 1),
(3, "item4", 5),
("user4", "item1", 3),
("user4", "item2", 2),
("user4", "item3", 4)
))
.toDF(mixedUserCol, mixedItemCol, ratingCol)
.dropDuplicates()
.cache()

val algo = new SAR()
.setUserCol(mixedUserCol)
.setItemCol(mixedItemCol)
.setRatingCol(ratingCol)
.setSupportThreshold(1)
.setSimilarityFunction("jaccard")
.setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy")

val adapter: RankingAdapter = new RankingAdapter()
.setK(5)
.setRecommender(algo)

val recopipeline = new Pipeline()
.setStages(Array(adapter))
.fit(mixedRatings)

val output = recopipeline.transform(mixedRatings)

val evaluator: RankingEvaluator = new RankingEvaluator()
.setK(5)
.setNItems(10)

assert(evaluator.setMetricName("ndcgAt").evaluate(output) > 0.0)
assert(evaluator.setMetricName("fcp").evaluate(output) > 0.0)
assert(evaluator.setMetricName("mrr").evaluate(output) > 0.0)

val users: DataFrame = spark
.createDataFrame(Seq((1, "item1"), (2, "item2"), ("user4", "item3")))
.toDF(mixedUserCol, mixedItemCol)

val recs = recopipeline.stages(0).asInstanceOf[RankingAdapterModel].getRecommenderModel
.asInstanceOf[SARModel].recommendForUserSubset(users, 10)
assert(recs.count == 3)
}
}

class SARModelSpec extends RankingTestBase with TransformerFuzzing[SARModel] {
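A caveat worth keeping in mind when reading the mixed-type test above: casting a StringType id column to int nulls any value that does not parse as an integer, rather than failing. A short verification sketch, assuming a local SparkSession and standalone from the test suite:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().master("local[1]").appName("cast-check").getOrCreate()
    import spark.implicits._

    // "user4" cannot be parsed as an integer, so the cast yields null;
    // SAR would then aggregate that user's interactions under a null id.
    val ids = Seq("1", "2", "3", "user4").toDF("mixedUserId")
    ids.withColumn("casted", col("mixedUserId").cast("int")).show()
    // 1 -> 1, 2 -> 2, 3 -> 3, user4 -> null

    spark.stop()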