五 离线推荐数据缓存

5.1离线数据缓存之离线召回集

  • 这里主要是利用我们前面训练的ALS模型进行协同过滤召回,但是注意,我们ALS模型召回的是用户最感兴趣的类别,而我们需要的是用户可能感兴趣的广告的集合,因此我们还需要根据召回的类别匹配出对应的广告。

    所以这里我们除了需要我们训练的ALS模型以外,还需要有一个广告和类别的对应关系。

# 从HDFS中加载广告基本信息数据,返回spark dafaframe对象
df = spark.read.csv("hdfs://localhost:9000/data/ad_feature.csv", header=True)

# 注意:由于本数据集中存在NULL字样的数据,无法直接设置schema,只能先将NULL类型的数据处理掉,然后进行类型转换

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

# 替换掉NULL字符串,替换掉
df = df.replace("NULL", "-1")

# 更改df表结构:更改列类型和列名称
ad_feature_df = df.\
    withColumn("adgroup_id", df.adgroup_id.cast(IntegerType())).withColumnRenamed("adgroup_id", "adgroupId").\
    withColumn("cate_id", df.cate_id.cast(IntegerType())).withColumnRenamed("cate_id", "cateId").\
    withColumn("campaign_id", df.campaign_id.cast(IntegerType())).withColumnRenamed("campaign_id", "campaignId").\
    withColumn("customer", df.customer.cast(IntegerType())).withColumnRenamed("customer", "customerId").\
    withColumn("brand", df.brand.cast(IntegerType())).withColumnRenamed("brand", "brandId").\
    withColumn("price", df.price.cast(FloatType()))

# 这里我们只需要adgroupId、和cateId
_ = ad_feature_df.select("adgroupId", "cateId")
# 由于这里数据集其实很少,所以我们再直接转成Pandas dataframe来处理,把数据载入内存
pdf = _.toPandas()


# 手动释放一些内存
del df
del ad_feature_df
del _
import gc
gc.collect()
  • 根据指定的类别找到对应的广告
import numpy as np
pdf.where(pdf.cateId==11156).dropna().adgroupId

np.random.choice(pdf.where(pdf.cateId==11156).dropna().adgroupId.astype(np.int64), 200)

显示结果:

313       138953.0
314       467512.0
1661      140008.0
1666      238772.0
1669      237471.0
1670      238761.0
			...   
843456    352273.0
846728    818681.0
846729    838953.0
846810    845337.0
Name: adgroupId, Length: 731, dtype: float64

  • 利用ALS模型进行类别的召回
# 加载als模型,注意必须先有spark上下文管理器,即sparkContext,但这里sparkSession创建后,自动创建了sparkContext

from pyspark.ml.recommendation import ALSModel
# 从hdfs加载之前存储的模型
als_model = ALSModel.load("hdfs://localhost:9000/models/userCateRatingALSModel.obj")
# 返回模型中关于用户的所有属性 df: id features
als_model.userFactors

显示结果:

DataFrame[id: int, features: array<float>]
import pandas as pd
cateId_df = pd.DataFrame(pdf.cateId.unique(),columns=["cateId"])
cateId_df

显示结果:

	cateId
0	1
1	2
2	3
3	4
4	5
5	6
6	7
...	...
6766	12948
6767	12955
6768	12960
6769 rows × 1 columns

cateId_df.insert(0, "userId", np.array([8 for i in range(6769)]))
cateId_df

显示结果:

 userId cateId
0	8	1
1	8	2
2	8	3
3	8	4
4	8	5
...	...	...
6766	8	12948
6767	8	12955
6768	8	12960
6769 rows × 2 columns

  • 传入 userid、cataId的df,对应预测值进行排序
als_model.transform(spark.createDataFrame(cateId_df)).sort("prediction", ascending=False).na.drop().show()

显示结果:

+------+------+----------+
|userId|cateId|prediction|
+------+------+----------+
|     8|  7214|  9.917084|
|     8|   877|  7.479664|
|     8|  7266| 7.4762917|
|     8| 10856| 7.3395424|
|     8|  4766|  7.149538|
|     8|  7282| 6.6835284|
|     8|  7270| 6.2145095|
|     8|   201| 6.0623236|
|     8|  4267| 5.9155636|
|     8|  7267|  5.838009|
|     8|  5392| 5.6882005|
|     8|  6261| 5.6804466|
|     8|  6306| 5.2992325|
|     8| 11050|  5.245261|
|     8|  8655| 5.1701374|
|     8|  4610|  5.139578|
|     8|   932|   5.12694|
|     8| 12276| 5.0776596|
|     8|  8071|  4.979195|
|     8|  6580| 4.8523283|
+------+------+----------+
only showing top 20 rows

import numpy as np
import pandas as pd

import redis

# 存储用户召回,使用redis第9号数据库,类型:sets类型
client = redis.StrictRedis(host="192.168.19.137", port=6379, db=9)
# 遍历als_model 中 所有用户的id
for r in als_model.userFactors.select("id").collect():
    
    userId = r.id
    
    #准备 当前用户 和 所有类别 一一对应的dataframe
    cateId_df = pd.DataFrame(pdf.cateId.unique(),columns=["cateId"])
    cateId_df.insert(0, "userId", np.array([userId for i in range(6769)]))
    ret = set()
    
    # 利用模型,传入datasets(userId, cateId),这里控制了userId一样,所以相当于是在求某用户对所有分类的兴趣程度
    cateId_list = als_model.transform(spark.createDataFrame(cateId_df)).sort("prediction", ascending=False).na.drop()
    # 找到前 20个 最感兴趣的类别 从前20个分类中选出500个进行召回
    for i in cateId_list.head(20):
        need = 500 - len(ret)    # 如果不足500个,那么随机选出need个广告
        ret = ret.union(np.random.choice(pdf.where(pdf.cateId==i.cateId).adgroupId.dropna().astype(np.int64), need))
        if len(ret) >= 500:    # 如果达到500个则退出
            break
    client.sadd(userId, *ret)
    
# 如果redis所在机器,内存不足,会抛出异常

5.2 离线数据缓存之离线特征

# "pid", 广告资源位,属于场景特征,也就是说,每一种广告通常是可以防止在多种资源外下的
# 因此这里对于pid,应该是由广告系统发起推荐请求时,向推荐系统明确要推荐的用户是谁,以及对应的资源位,或者说有哪些
# 这样如果有多个资源位,那么每个资源位都会对应相应的一个推荐列表

# 需要进行缓存的特征值
    
feature_cols_from_ad = [
    "price"    # 来自广告基本信息中
]

# 用户特征
feature_cols_from_user = [
    "cms_group_id",
    "final_gender_code",
    "age_level",
    "shopping_level",
    "occupation",
    "pvalue_level",
    "new_user_class_level"
]
  • 从HDFS中加载广告基本信息数据
_ad_feature_df = spark.read.csv("hdfs://localhost:9000/data/ad_feature.csv", header=True)

# 更改表结构,转换为对应的数据类型
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

# 替换掉NULL字符串
_ad_feature_df = _ad_feature_df.replace("NULL", "-1")
 
# 更改df表结构:更改列类型和列名称
ad_feature_df = _ad_feature_df.\
    withColumn("adgroup_id", _ad_feature_df.adgroup_id.cast(IntegerType())).withColumnRenamed("adgroup_id", "adgroupId").\
    withColumn("cate_id", _ad_feature_df.cate_id.cast(IntegerType())).withColumnRenamed("cate_id", "cateId").\
    withColumn("campaign_id", _ad_feature_df.campaign_id.cast(IntegerType())).withColumnRenamed("campaign_id", "campaignId").\
    withColumn("customer", _ad_feature_df.customer.cast(IntegerType())).withColumnRenamed("customer", "customerId").\
    withColumn("brand", _ad_feature_df.brand.cast(IntegerType())).withColumnRenamed("brand", "brandId").\
    withColumn("price", _ad_feature_df.price.cast(FloatType()))
    
def foreachPartition(partition):
    
    import redis
    import json
    client = redis.StrictRedis(host="192.168.19.137", port=6379, db=10)
    
    for r in partition:
        data = {
   
            "price": r.price
        }
        # 转成json字符串再保存,能保证数据再次倒出来时,能有效的转换成python类型
        client.hset("ad_features", r.adgroupId, json.dumps(data))
        
ad_feature_df.foreachPartition(foreachPartition)
  • 从HDFS加载用户基本信息数据
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType

# 构建表结构schema对象
schema = StructType([
    StructField("userId", IntegerType()),
    StructField("cms_segid", IntegerType()),
    StructField("cms_group_id", IntegerType()),
    StructField("final_gender_code", IntegerType()),
    StructField("age_level", IntegerType()),
    StructField("pvalue_level", IntegerType()),
    StructField("shopping_level", IntegerType()),
    StructField("occupation", IntegerType()),
    StructField("new_user_class_level", IntegerType())
])
# 利用schema从hdfs加载
user_profile_df = spark.read.csv("hdfs://localhost:8020/csv/user_profile.csv", header=True, schema=schema)
user_profile_df

显示结果:

DataFrame[userId: int, cms_segid: int, cms_group_id: int, final_gender_code: int, age_level: int, pvalue_level: int, shopping_level: int, occupation: int, new_user_class_level: int]

def foreachPartition2(partition):
    
    import redis
    import json
    client = redis.StrictRedis(host="192.168.199.188", port=6379, db=10)
    
    for r in partition:
        data = {
   
            "cms_group_id": r.cms_group_id,
            "final_gender_code": r.final_gender_code,
            "age_level": r.age_level,
            "shopping_level": r.shopping_level,
            "occupation": r.occupation,
            "pvalue_level": r.pvalue_level,
            "new_user_class_level": r.new_user_class_level
        }
        # 转成json字符串再保存,能保证数据再次倒出来时,能有效的转换成python类型
        client.hset("user_features1", r.userId, json.dumps(data))
        
user_profile_df.foreachPartition(foreachPartition2)