5 Offline Recommendation Data Caching
5.1 Offline Data Caching: Offline Recall Sets
Here we mainly use the ALS model trained earlier to perform collaborative-filtering recall. Note, however, that the ALS model recalls the categories a user is most interested in, whereas what we need is a set of ads the user may be interested in, so we still have to map the recalled categories to their corresponding ads.
Therefore, besides the trained ALS model, we also need the mapping between ads and categories.
# Load the ad basic-information data from HDFS; returns a Spark DataFrame object
df = spark.read.csv("hdfs://localhost:9000/data/ad_feature.csv", header=True)
# Note: this dataset contains the literal string "NULL", so a schema cannot be applied directly; we first clean up those NULL values and then cast the column types
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
# Replace the "NULL" strings
df = df.replace("NULL", "-1")
# Adjust the DataFrame structure: change column types and rename columns
ad_feature_df = df.\
withColumn("adgroup_id", df.adgroup_id.cast(IntegerType())).withColumnRenamed("adgroup_id", "adgroupId").\
withColumn("cate_id", df.cate_id.cast(IntegerType())).withColumnRenamed("cate_id", "cateId").\
withColumn("campaign_id", df.campaign_id.cast(IntegerType())).withColumnRenamed("campaign_id", "campaignId").\
withColumn("customer", df.customer.cast(IntegerType())).withColumnRenamed("customer", "customerId").\
withColumn("brand", df.brand.cast(IntegerType())).withColumnRenamed("brand", "brandId").\
withColumn("price", df.price.cast(FloatType()))
# Here we only need adgroupId and cateId
_ = ad_feature_df.select("adgroupId", "cateId")
# Since this dataset is actually quite small, we convert it directly to a Pandas DataFrame and keep the data in memory
pdf = _.toPandas()
# Manually free some memory
del df
del ad_feature_df
del _
import gc
gc.collect()
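As a quick sanity check on the in-memory mapping (a minimal sketch assuming the `pdf` built above), the number of distinct categories is the 6769 that appears as a hard-coded length further below:
# Number of distinct categories in the ad/category mapping (6769 for this dataset)
print(pdf.cateId.nunique())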
- Find the ads corresponding to a given category
import numpy as np
pdf.where(pdf.cateId==11156).dropna().adgroupId
np.random.choice(pdf.where(pdf.cateId==11156).dropna().adgroupId.astype(np.int64), 200)
Output:
313 138953.0
314 467512.0
1661 140008.0
1666 238772.0
1669 237471.0
1670 238761.0
...
843456 352273.0
846728 818681.0
846729 838953.0
846810 845337.0
Name: adgroupId, Length: 731, dtype: float64
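The same category-to-ad sampling is reused in the recall loop further below; the helper here is only a sketch of that step (the name `sample_ads_for_cate` is ours, not from the original code), assuming `pdf` holds the adgroupId/cateId mapping:
import numpy as np

def sample_ads_for_cate(pdf, cate_id, n):
    # All ads belonging to the given category (dropna removes the rows filtered out by where)
    ads = pdf.where(pdf.cateId == cate_id).dropna().adgroupId.astype(np.int64)
    if len(ads) == 0:
        return np.array([], dtype=np.int64)
    # Sample without replacement when the category has enough ads;
    # otherwise np.random.choice requires replace=True
    return np.random.choice(ads, n, replace=len(ads) < n)

# e.g. sample_ads_for_cate(pdf, 11156, 200) mirrors the call shown above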
- Use the ALS model to recall categories
# Load the ALS model. Note that a Spark context (sparkContext) must already exist; here it was created automatically when the SparkSession was created
from pyspark.ml.recommendation import ALSModel
# Load the previously saved model from HDFS
als_model = ALSModel.load("hdfs://localhost:9000/models/userCateRatingALSModel.obj")
# Returns all user-related attributes in the model as a DataFrame: id, features
als_model.userFactors
Output:
DataFrame[id: int, features: array<float>]
import pandas as pd
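# All distinct category ids; they will be paired with one userId and fed to the ALS model for prediction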
cateId_df = pd.DataFrame(pdf.cateId.unique(),columns=["cateId"])
cateId_df
Output:
cateId
0 1
1 2
2 3
3 4
4 5
5 6
6 7
... ...
6766 12948
6767 12955
6768 12960
6769 rows × 1 columns
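# Pair every category with a fixed example user (userId 8) to predict that user's interest in each category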
cateId_df.insert(0, "userId", np.array([8 for i in range(6769)]))
cateId_df
Output:
userId cateId
0 8 1
1 8 2
2 8 3
3 8 4
4 8 5
... ... ...
6766 8 12948
6767 8 12955
6768 8 12960
6769 rows × 2 columns
- Pass in a DataFrame of userId and cateId pairs and sort by the predicted values
als_model.transform(spark.createDataFrame(cateId_df)).sort("prediction", ascending=False).na.drop().show()
Output:
+------+------+----------+
|userId|cateId|prediction|
+------+------+----------+
| 8| 7214| 9.917084|
| 8| 877| 7.479664|
| 8| 7266| 7.4762917|
| 8| 10856| 7.3395424|
| 8| 4766| 7.149538|
| 8| 7282| 6.6835284|
| 8| 7270| 6.2145095|
| 8| 201| 6.0623236|
| 8| 4267| 5.9155636|
| 8| 7267| 5.838009|
| 8| 5392| 5.6882005|
| 8| 6261| 5.6804466|
| 8| 6306| 5.2992325|
| 8| 11050| 5.245261|
| 8| 8655| 5.1701374|
| 8| 4610| 5.139578|
| 8| 932| 5.12694|
| 8| 12276| 5.0776596|
| 8| 8071| 4.979195|
| 8| 6580| 4.8523283|
+------+------+----------+
only showing top 20 rows
import numpy as np
import pandas as pd
import redis
# Store the per-user recall results in Redis database 9, using the set type
client = redis.StrictRedis(host="192.168.19.137", port=6379, db=9)
# Iterate over all user ids in als_model
for r in als_model.userFactors.select("id").collect():
    userId = r.id
    # Build a DataFrame pairing the current user with every category
    cateId_df = pd.DataFrame(pdf.cateId.unique(), columns=["cateId"])
    cateId_df.insert(0, "userId", np.array([userId for i in range(6769)]))
    ret = set()
    # Feed the (userId, cateId) dataset to the model; userId is held fixed here,
    # so this amounts to scoring the user's interest in every category
    cateId_list = als_model.transform(spark.createDataFrame(cateId_df)).sort("prediction", ascending=False).na.drop()
    # Take the 20 categories the user is most interested in, then recall up to 500 ads from them
    for i in cateId_list.head(20):
        need = 500 - len(ret)  # if we have fewer than 500 ads so far, randomly pick `need` more
        ret = ret.union(np.random.choice(pdf.where(pdf.cateId==i.cateId).adgroupId.dropna().astype(np.int64), need))
        if len(ret) >= 500:  # stop once 500 ads have been collected
            break
    client.sadd(userId, *ret)
# If the machine hosting Redis runs out of memory, an exception will be raised here
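To verify the cached recall sets, a user's ads can be read back from Redis database 9; a minimal sketch, assuming the same Redis instance as above and using user 8 from the earlier example:
import redis
client = redis.StrictRedis(host="192.168.19.137", port=6379, db=9)
# smembers returns a set of bytes; convert each member back to an int adgroupId
recall_ads = [int(x) for x in client.smembers(8)]
print(len(recall_ads), recall_ads[:10])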
5.2 Offline Data Caching: Offline Features
# "pid", 广告资源位,属于场景特征,也就是说,每一种广告通常是可以防止在多种资源外下的
# 因此这里对于pid,应该是由广告系统发起推荐请求时,向推荐系统明确要推荐的用户是谁,以及对应的资源位,或者说有哪些
# 这样如果有多个资源位,那么每个资源位都会对应相应的一个推荐列表
# Feature values that need to be cached
feature_cols_from_ad = [
"price" # 来自广告基本信息中
]
# User features
feature_cols_from_user = [
"cms_group_id",
"final_gender_code",
"age_level",
"shopping_level",
"occupation",
"pvalue_level",
"new_user_class_level"
]
- Load the ad basic-information data from HDFS
_ad_feature_df = spark.read.csv("hdfs://localhost:9000/data/ad_feature.csv", header=True)
# Adjust the table structure and cast to the appropriate data types
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
# Replace the "NULL" strings
_ad_feature_df = _ad_feature_df.replace("NULL", "-1")
# Adjust the DataFrame structure: change column types and rename columns
ad_feature_df = _ad_feature_df.\
withColumn("adgroup_id", _ad_feature_df.adgroup_id.cast(IntegerType())).withColumnRenamed("adgroup_id", "adgroupId").\
withColumn("cate_id", _ad_feature_df.cate_id.cast(IntegerType())).withColumnRenamed("cate_id", "cateId").\
withColumn("campaign_id", _ad_feature_df.campaign_id.cast(IntegerType())).withColumnRenamed("campaign_id", "campaignId").\
withColumn("customer", _ad_feature_df.customer.cast(IntegerType())).withColumnRenamed("customer", "customerId").\
withColumn("brand", _ad_feature_df.brand.cast(IntegerType())).withColumnRenamed("brand", "brandId").\
withColumn("price", _ad_feature_df.price.cast(FloatType()))
def foreachPartition(partition):
    import redis
    import json
    client = redis.StrictRedis(host="192.168.19.137", port=6379, db=10)
    for r in partition:
        data = {
            "price": r.price
        }
        # Save as a JSON string so the data can be reliably converted back to Python types when read later
        client.hset("ad_features", r.adgroupId, json.dumps(data))
ad_feature_df.foreachPartition(foreachPartition)
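The cached ad features can be read back from the `ad_features` hash in Redis database 10 and deserialized with `json.loads`; a minimal sketch, using adgroupId 138953 seen in the sample output above:
import redis
import json
client = redis.StrictRedis(host="192.168.19.137", port=6379, db=10)
# hget returns the JSON bytes stored above (or None if this ad was not cached)
raw = client.hget("ad_features", 138953)
print(json.loads(raw) if raw else None)  # e.g. {"price": ...}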
- Load the user profile data from HDFS
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType
# Build the schema object for the table structure
schema = StructType([
StructField("userId", IntegerType()),
StructField("cms_segid", IntegerType()),
StructField("cms_group_id", IntegerType()),
StructField("final_gender_code", IntegerType()),
StructField("age_level", IntegerType()),
StructField("pvalue_level", IntegerType()),
StructField("shopping_level", IntegerType()),
StructField("occupation", IntegerType()),
StructField("new_user_class_level", IntegerType())
])
# Load from HDFS using the schema
user_profile_df = spark.read.csv("hdfs://localhost:8020/csv/user_profile.csv", header=True, schema=schema)
user_profile_df
Output:
DataFrame[userId: int, cms_segid: int, cms_group_id: int, final_gender_code: int, age_level: int, pvalue_level: int, shopping_level: int, occupation: int, new_user_class_level: int]
def foreachPartition2(partition):
    import redis
    import json
    client = redis.StrictRedis(host="192.168.199.188", port=6379, db=10)
    for r in partition:
        data = {
            "cms_group_id": r.cms_group_id,
            "final_gender_code": r.final_gender_code,
            "age_level": r.age_level,
            "shopping_level": r.shopping_level,
            "occupation": r.occupation,
            "pvalue_level": r.pvalue_level,
            "new_user_class_level": r.new_user_class_level
        }
        # Save as a JSON string so the data can be reliably converted back to Python types when read later
        client.hset("user_features1", r.userId, json.dumps(data))
user_profile_df.foreachPartition(foreachPartition2)
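At serving time the cached user and ad features are pulled back from Redis and combined with the request-time `pid`; the sketch below only illustrates that assembly, reusing the hosts/databases shown above (userId 8 and adgroupId 138953 are just the values seen earlier, and the pid string is a hypothetical placeholder):
import redis
import json

user_client = redis.StrictRedis(host="192.168.199.188", port=6379, db=10)
ad_client = redis.StrictRedis(host="192.168.19.137", port=6379, db=10)

def build_sample(user_id, adgroup_id, pid):
    # Load the cached user and ad features; both were stored as JSON strings
    user_raw = user_client.hget("user_features1", user_id)
    ad_raw = ad_client.hget("ad_features", adgroup_id)
    sample = {}
    if user_raw:
        sample.update(json.loads(user_raw))
    if ad_raw:
        sample.update(json.loads(ad_raw))
    # pid is a scene feature that comes with the recommendation request itself, not from the cache
    sample["pid"] = pid
    return sample

build_sample(8, 138953, "430548_1007")  # "430548_1007" is only a placeholder pid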