Introduction

The purpose of this notebook is to examine how user ratings may be taken into consideration when making content-based recommendations. This notebook should be run on a Databricks ML 7.3+ cluster.

Up to this point, we've been using content-based filters to identify similar items based on similarities in their product features. By incorporating user feedback, whether explicit or implicit, we can begin to construct a profile of the kinds of products we believe a customer might like, and position a wider, sometimes eclectic assortment of products in the process.

# imports assumed to have been run earlier in the notebook, repeated here so the cells below are self-contained
import shutil

import mlflow

from pyspark.ml.feature import Normalizer
from pyspark.ml.stat import Summarizer
from pyspark.sql.functions import col, expr, sum

# retrieve featurized product data
product_features = (
spark
.table('DELTA.`/mnt/reviews/tmp/description_sim`').selectExpr('id','bucket','w2v','features')
.join( # join to product metadata to allow easier review of recommendations
spark.table('reviews.metadata').select('id','asin','title','category','description'),
on='id',
how='inner'
)
.select('id', 'asin', 'w2v', 'features', 'title', 'description', 'category', 'bucket')
)
display(
product_features
)
# retrieve user reviews, numbered from most recent to oldest, for reviewers with at least five reviews
sequenced_reviews = (
spark
.sql('''
WITH validReviews AS (
SELECT
reviewerID,
product_id,
overall as rating,
unixReviewTime
FROM reviews.reviews
WHERE product_id IS NOT NULL
)
SELECT
a.reviewerID,
a.product_id,
a.rating,
row_number() OVER (PARTITION BY a.reviewerID ORDER BY a.unixReviewTime DESC) as seq_id
FROM validReviews a
LEFT SEMI JOIN (SELECT reviewerID FROM validReviews GROUP BY reviewerID HAVING COUNT(*)>=5) b
ON a.reviewerID=b.reviewerID
''')
)
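The SQL above numbers each reviewer's ratings from most recent to oldest and keeps only reviewers with at least five reviews. For readers who prefer the DataFrame API, here is a minimal sketch of the equivalent logic, using the same table and column names as the query above:
from pyspark.sql import functions as fn
from pyspark.sql.window import Window

valid = spark.table('reviews.reviews').filter('product_id IS NOT NULL')

# reviewers with at least 5 reviews
keepers = (
  valid
    .groupBy('reviewerID')
    .count()
    .filter('count >= 5')
    .select('reviewerID')
  )

# number each reviewer's ratings from most recent to oldest
seq_window = Window.partitionBy('reviewerID').orderBy(fn.col('unixReviewTime').desc())

sequenced_reviews_alt = (
  valid
    .join(keepers, on='reviewerID', how='left_semi')
    .withColumn('seq_id', fn.row_number().over(seq_window))
    .select('reviewerID', 'product_id', fn.col('overall').alias('rating'), 'seq_id')
  )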
# get each reviewer's two most recent ratings as holdout
reviews_hold = (
sequenced_reviews
.filter('seq_id <= 2')
.select('reviewerID', 'product_id', 'rating')
)
# get all remaining (older) ratings as calibration
reviews_cali = (
sequenced_reviews
.filter('seq_id > 2')
.select('reviewerID', 'product_id', 'rating')
)
display(
reviews_cali
)
# calculate rating-weighted averages of product feature vectors for each user
user_profiles_cali = (
product_features
.join(
reviews_cali.filter('rating >= 4'), # limit ratings to 4s and 5s as discussed above
on=[product_features.id==reviews_cali.product_id],
how='inner'
)
.groupBy('reviewerID')
.agg(
Summarizer.mean(col('w2v'), weightCol=col('rating')).alias('w2v')
)
)
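To make the weighted aggregation concrete, here is a toy illustration of Summarizer.mean with a weight column; the vectors and ratings below are made up for demonstration only:
from pyspark.ml.linalg import Vectors

toy = spark.createDataFrame(
  [('A', Vectors.dense([1.0, 0.0]), 4.0),  # the rating serves as the weight
   ('A', Vectors.dense([0.0, 1.0]), 5.0)],
  ['reviewerID', 'w2v', 'rating']
  )

# weighted mean = (4*[1,0] + 5*[0,1]) / (4+5) = [0.444..., 0.556...]
display(
  toy
    .groupBy('reviewerID')
    .agg(Summarizer.mean(col('w2v'), weightCol=col('rating')).alias('w2v'))
  )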
# l2-normalize each user profile to match the normalized product vectors
user_profiles_cali_norm = (
Normalizer(inputCol='w2v', outputCol='features', p=2.0)
.transform(user_profiles_cali)
).cache()
display(
user_profiles_cali_norm
)
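Because both the user profiles and the product vectors are L2-normalized, the Euclidean distance computed in the recommendation step below is a simple function of the angle between the vectors:

$$\lVert a - b \rVert^2 = \lVert a \rVert^2 + \lVert b \rVert^2 - 2\,a^\top b = 2\,(1 - \cos\theta)$$

If orthogonal vectors (a cosine of zero) are treated as the practical worst case, the maximum distance is $\sqrt{2}$, which appears to be the rationale for the $1/(1+\sqrt{2})$ minimum score used to rescale similarities later in this notebook.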
# retrieve clustering model from the mlflow registry (latest version without a stage assignment)
cluster_model = mlflow.spark.load_model(
model_uri='models:/description_clust/None'
)
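A model URI of this form points at the description_clust entry in the MLflow model registry, which is presumably populated in an earlier notebook. A hypothetical sketch of such a registration, where cluster_pipeline_model stands in for the fitted clustering pipeline (it is not defined in this notebook):
with mlflow.start_run(run_name='description_clust'):
  mlflow.spark.log_model(
    cluster_pipeline_model,  # assumed fitted Spark ML model; not defined here
    artifact_path='model',
    registered_model_name='description_clust'  # name referenced by the models:/ URI above
    )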
# assign profiles to clusters/buckets (the model appends the bucket column used for partitioning below)
user_profiles_cali_clustered = (
cluster_model.transform(user_profiles_cali_norm)
)
# drop any old files that might have been created
shutil.rmtree('/dbfs/mnt/reviews/gold/user_profiles_cali', ignore_errors=True)
# persist dataset as parquet, partitioned on bucket
(
user_profiles_cali_clustered
.write
.format('parquet')
.mode('overwrite')
.partitionBy('bucket')
.save('/mnt/reviews/gold/user_profiles_cali')
)
display(
spark.table('PARQUET.`/mnt/reviews/gold/user_profiles_cali`')
)
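The next cell calls a euclidean_distance() SQL function that is not defined in this excerpt; it is presumably registered earlier in the notebook. A minimal sketch of one possible registration follows; this implementation is an assumption, not the notebook's exact code:
import math

from pyspark.sql.types import DoubleType

# hypothetical implementation of the euclidean_distance() function referenced below
def euclidean_distance(a, b):
  # a and b are Spark ML vectors; squared_distance is provided by pyspark.ml.linalg
  return float(math.sqrt(a.squared_distance(b)))

spark.udf.register('euclidean_distance', euclidean_distance, DoubleType())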
# retrieve the clustered user profiles persisted in the prior step
cali_profiles = spark.table('PARQUET.`/mnt/reviews/gold/user_profiles_cali`')
# make recommendations for sampled reviewers
recommendations = (
product_features
.hint('skew','bucket') # hint to ensure join is balanced
.withColumnRenamed('features', 'features_b')
.join( cali_profiles.withColumnRenamed('features', 'features_a'), on='bucket', how='inner') # join products to profiles on buckets
.withColumn('distance', expr('euclidean_distance(features_a, features_b)')) # calculate distance between profile and product vectors
.withColumn('raw_sim', expr('1/(1+distance)')) # convert distance to a raw similarity score
.withColumn('min_score', expr('1/(1+sqrt(2))')) # lowest raw score, treating sqrt(2) as the max distance between normalized vectors
.withColumn('similarity', expr('(raw_sim - min_score)/(1-min_score)')) # rescale similarity to a 0-to-1 range
.select('reviewerID', 'id', 'similarity')
.withColumn('rank_ui', expr('percent_rank() OVER(PARTITION BY reviewerID ORDER BY similarity DESC)')) # calculate percent rank for recommendations
)
# drop any old delta lake files that might have been created
shutil.rmtree('/dbfs/mnt/reviews/gold/user_profile_recommendations', ignore_errors=True)
# persist dataset as delta table
(
recommendations
.write
.format('delta')
.mode('overwrite')
.save('/mnt/reviews/gold/user_profile_recommendations')
)
# retrieve a sampled subset of recommendations for a single user so that the range of rank_ui values is easier to see
display(
spark
.table('DELTA.`/mnt/reviews/gold/user_profile_recommendations`')
.join( spark.table('DELTA.`/mnt/reviews/gold/user_profile_recommendations`').limit(1), on='reviewerID', how='left_semi' )
.sample(False,0.01)
.orderBy('rank_ui', ascending=True)
)
# retrieve recommendations generated in the prior step
recommendations = spark.table('DELTA.`/mnt/reviews/gold/user_profile_recommendations`')
# calculate evaluation metric (rating-weighted mean percent rank)
display(
reviews_hold
.join(
recommendations,
on=[reviews_hold.product_id==recommendations.id, reviews_hold.reviewerID==recommendations.reviewerID],
how='inner'
)
.withColumn('weighted_r', recommendations.rank_ui * reviews_hold.rating)
.groupBy()
.agg( sum('weighted_r').alias('numerator'),
sum('rating').alias('denominator')
)
.withColumn('mean_percent_rank', expr('numerator / denominator'))
.select('mean_percent_rank')
)
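The query above computes the rating-weighted mean percent rank over the holdout set:

$$\overline{rank} = \frac{\sum_{u,i} r_{u,i} \cdot rank_{u,i}}{\sum_{u,i} r_{u,i}}$$

where $r_{u,i}$ is the held-out rating and $rank_{u,i}$ is the percent rank of product $i$ within user $u$'s recommendations. Lower values indicate that held-out products land nearer the top of users' recommendation lists; a value around 0.5 is what a random ordering would produce.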