大赛

第三届BDC——快手活跃用户预测

全称

2018中国高校计算机大赛——大数据挑战赛(Big Data Challenge)

地址

https://www.kesci.com/home/competition/5cb80fd312c371002b12355f

前言

不知道你是否遇到过这样的一些疑惑,就是涉足一个新领域的时候,比如数据挖掘,先非常努力的花时间学习python,numpy,pandas,scipy,sklearn这些基础知识,但是当后面真的遇到实际问题的时候,却不知道如何下手去分析,再回头想掏之前学会的工具时,发现这些工具依然在,但已记不清它们的使用说明。难不成之前学习过的这些都白学了? 其实不是的,这些工具都在,只不过我们之前没有真正的去用过它, 所以,比赛就是一个让我们大展身手的舞台,通过解决实际问题,我们才能真正掌握之前的工具,也是学习知识的一个融合,毕竟解决实际问题,需要各个领域的知识交融和碰撞。在这个过程中,我们还可以认识一些志同道合的伙伴,一起进步和交流,进行思维的碰撞并得到成长。

所以这个数据竞赛修炼系列我会把我的所思所学都记录下来并分享,一是因为解决问题的思路可以迁移和变换,这样或许会帮助更多的人,二是通过整理和总结,可以使得知识和技能在自己脑海中逗留的时间长一些吧。

首先声明这个系列的每一篇都会很长,并且会有一大波代码来袭,毕竟每一篇都是一个完整的数据竞赛,每一篇都需要很长的时间消化整理,因为我想用最朴素,最详细的语言把每个比赛的思路和代码说给你听。

今天是数据竞赛修炼笔记的第一篇,带来的比赛是2018年科赛网上的一个比赛快手用户活跃度的预测,我们拿到了一份快手平台记录的关于快手用户的30天一个行为数据集(记录用户注册,登录,视频观看与发布,互动的记录),我们的目标就是根据这个行为数据集预测在未来七天的活跃用户。
首先我们会对任务的目标和数据进行分析,然后会概览模型的架构,从模型的角度提取特征和标签构建数据集,然后基于tensorflow建立模型并训练得到结果。最后会再结合一些好的解决方案,对解决这个问题的思路和知识点进行总结。

大纲如下:

  • 任务目标与数据分析
  • 整理模型架构
  • 构建用户特征,生成汇总表
  • 制作标签
  • 建立模型并训练,得到最后的结果
  • 知识点和思路的总结(两个知识点温习:pandas的iterrows()和groupby())

一、任务目标与数据分析

开始之前,需要导入包:

import numpy as np
import pandas as pd
import tensorflow as tf
from deep_tools import f
from deep_tools import DataGenerator

1.数据集介绍

app_launch_log.txt 用户30天内登陆的日子

user_activity_log.txt 用户行为数据集 比如对哪个视频进行了点赞转发

user_register_log.txt 用户当前是在哪天进行注册的

video_create_log.txt 视频的创建

2.数据集说明

为期30天的用户数据
但是如果是第七天注册的,就没有前七天的数据
我们这里就是一个月的用户数据
<mark>难点:所有用户的长度不是固定的!!!</mark>
解决:从哪天注册的就从哪天算起,预测<mark>未来一个周期</mark>的活跃度,比如从第七天的预测第14天的、第21天的活跃度这样。

3.读取数据并命名列名

import pandas as pd

register = pd.read_csv('./data/user_register_log.txt',sep='\t',names=['user_id', 'register_day', 'register_type', 'device_type'])
launch = pd.read_csv('./data/app_launch_log.txt',sep='\t', names = ['user_id', 'launch_day'] )# sep="\t" 表示以tab(制表符)为分隔符
create = pd.read_csv('./data/video_create_log.txt',sep='\t', names = ['user_id', 'create_day'])
activity = pd.read_csv('./data/user_activity_log.txt', sep='\t', names = ['user_id', 'act_day','page', 'video_id', 'author_id', 'act_type'])

4.查看每个数据集的前五行

(1)注册日志

register.head()

注册日志说明

第一列是用户的id,第二列是注册的时间,不是具体的某一天,1就是第一天,第三列是当前注册的类型,类似于微信登陆,手机登录等等,第四列是设备类型,脱敏后数据,直接拿过来用

(2)APP启动日志

launch.head()

APP启动日志说明

第一列是用户id,第二天是启动app的天,是启动的某一天,比如330966用户在4,9,11,12都启动了

(3)拍摄日志

create.head()

拍摄日志说明

只有这个用户的id和用户在哪一天创建了视频

(4)行为日志

activity.head()

行为日志说明

用户id,进行行为的哪一天,行为发生在哪一页(比如自己的主页),操作了什么视频id,看的作者的id,操作的动作是什么(比如点赞,转发)

二、整体模型架构

目的:预测一个二分类
方法还是很多的,比如xgboost,神经网络都可以

我们拿到的是序列数据
比如一个用户第七天到第三十天的数据,预测第37天的数据

我们打算用RNN神经网络来做,如果要预测第37 天的可以根据前的每一天都预测一下下一个七天的数据

活跃:当天有登陆就是活跃的为1,label序列可以构建出来的,11100001110101011001类似的

以上就是<mark>RNN</mark>的基本模型架构

三、构建用户特征序列

这个月第一天注册的用户序列数有30
第二天注册的用户序列数只有29
···
第三十天注册的用户序列数就只有1
那我们要查看一下数据的这个情况

给register表多加一个序列长度seq_length

#给register表多加一个序列长度seq_length
register['seq_length'] = 31-register['register_day']
register.head()


构建字典存储用户在持续时间内,不同日期的数据
序列为1的:【用户id,用户id】
序列为2的:【用户id,用户id】
······················
序列为30的:【用户id,用户id】

# 会产生user_queue = {1:[], 2:[], 3:[], ...,30:[]}
user_queue = {
   i:[] for i in range(1,31)}

当前用户里面的索引和值都提取出来,row[-1]就是注册的长度,row[0]就是用户的id。(取到注册的长度,把用户的id放进去)
实现:

for index,row in register.iterrows():
    user_queue[row[-1]].append(row[0])
user_queue


我们模型中的t1,t2,tn都需要传入数据的

比如用户持续了三十天

构造数据:

构建数据特征:

class user_seq:
    
    def __init__(self,register_day,seq_length,n_features):
        self.register = register_day
        self.seq_length = seq_length
        self.array = np.zeros([self.seq_length,n_features])#构建矩阵:持续天数*特征个数,后续新创建的特征来往里面填充
        self.array[0,0] = 1
        self.page_rank = np.zeros([self.seq_length])
        self.pointer = 1
    def put_feature(self, feature_number, string):
        for i in stringing.split(','):
            pos, value = i.split(':')#注册第几天进行了登陆,1为指示符
            self.array[int(pos) - self.register_day, feature_number] = 1
    def put_PR(self,string):
        for i in string.split(','):
            pos, value = i.split(':')
            self.page_rankge_rank[int(pos) - self.register_day] = value
            
    def get_array(self):
        return self.array
    def get_label(self):
        self.label = np.array([BNone]*self.seq_length)
        active  =self.array[:,:10].sum(axis =1)
        for i in range(self.seq_length-7):
            self.label[i]  =1*(np.sum(active[i+1:i+8])>0)
        return self.lable 
        
n_features = 12 data = {
   row[0]:user_seq(register_day=row[1],seq_length=row[-1], n_features = n_features) for index, row in register.iterrows()}
data


在注册日志中的每一个样本进行操作的
每一个id都对应着特征
data的结果是
每一个id对应着一个特征矩阵
不过到此为止特征矩阵里面的值都是0

四、序列特征提取方法

我们要在特征中进行选择了,一共有十二个特征

1.登陆信息

预处理次数每个用户每天登陆的次数设置为1

launch.head()

launch['launch'] = 1
launch.head()


重复出现的数据进行加和,处理一天登录多次
比如一直出现用户id为16 登录天数为第23天

launch_table = launch.groupby(['user_id','launch_day'], as_index = False).agg({
   'launch':'sum'})
launch_table.head()


我们要把它整理成这种格式

<mark>自己定义的将表格变成用户特征序列表</mark>

def record_to_sequence(table):   #得到用户特征序列表
	table.columns=['user_id','day','value']
    table.sort_values(by=['user_id','day'],inplace=True)
    table['string']=table.day.map(str)+':'+table.value.map(str)
    table=table.groupby(['user_id'],as_index=False).agg({
   'string':lambda x:','.join(x)})
    return table
launch_table = record_to_sequence(launch_table)
launch_table.head()

for index,row in launch_table.iterrows():
    data[row[0]].put_feature(1,row[1])
data

五、生成特征汇总表

2.创作视频信息

create.head()

#加一列创作次数的特征,方便同一天发了很多视频
create['create'] = 1
create.head()

#聚合重复的
create_table = create.groupby(['user_id','create_day'],as_index = False).agg({
   'create':'sum'})
create_table.head()

#变成序列
create_table = record_to_sequence(create_table)
create_table.head()


<mark>把十二个特征中的第二个位置,用视频的string填充完</mark>

for index,row in create_table.iterrows():
    data[row[0]].put_feature(2,row[1])#把十二个特征中的第二个位置,用视频的string填充完

3.用户使用时的行为特征

如点赞、转发等
有6种不同的行为,比如点赞是第三个特征,举报是第六个特征
都要构建出

for i in range(6):
    act=activity[activity.act_type==i].copy()
    act=act.groupby(['user_id','act_day'],as_index=False).agg({
   'video_id':'count'})
    act=record_to_sequence(act)
    for index,row in act.iterrows():
        data[row[0]].put_feature(i+3,row[1])

4.产生行为的界面信息

for i in range(1): #暂不作为特征
    act=activity[activity.page==i].copy()
    act=act.groupby(['user_id','act_day'],as_index=False).agg({
   'video_id':'count'})
    act=record_to_sequence(act)
    for index,row in act.iterrows():
        data[row[0]].put_feature(i+9,row[1]) 

5.观看其他用户作品

watched=register.loc[:,['user_id']].copy()
watched.columns=['author_id']
watched=pd.merge(watched,activity[activity.author_id!=activity.user_id],how='inner') #只得到交集,相当于看别人视频的
watched=watched.groupby(['author_id','act_day'],as_index=False).agg({
   'video_id':'count'})
watched=record_to_sequence(watched)
for index,row in watched.iterrows():
    data[row[0]].put_feature(10,row[1])

6.观看自己用户作品

user_id = author_id
比如考虑一下喜欢看自己的作品是不是对这个app比较满意呢

watched=activity[activity.author_id==activity.user_id].copy()
watched=watched.groupby(['user_id','act_day'],as_index=False).agg({
   'video_id':'count'})
watched=record_to_sequence(watched)
for index,row in watched.iterrows():
    data[row[0]].put_feature(11,row[1]) 

至此加上注册,我们的12个特征就ok了
<mark>构建这些特征是为了rnn,所以我们提取的是序列</mark>

六、标签制作

1.活跃用户的定义

未来七天内只要使用过APP的用户,我们定义为活跃用户

2.设置标签

用户从注册开始,对于它每一天的数据进行展开,如果它七天后仍然有行为产生,则标签为1。

用户特征数据即为之前提取的data的各项特征,转换为ndarray即可

data = {
   user_id:user.get_array() for user_id, user in data.items()}

七、合并上述提取方法

#合并上面的方法
register = pd.read_csv('./data/user_register_log.txt',sep = '\t',names=['user_id','register_day','register_type','device_type'])
launch = pd.read_csv('./data/app_launch_log.txt',sep = '\t',names=['user_id','launch_day'])
create = pd.read_csv('./data/video_create_log.txt',sep='\t',names=['user_id','create_day'])
activity = pd.read_csv('./data/user_activity_log.txt',sep='\t',names=['user_id','act_day','page','video_id','author_id','act_type'])

# coding: utf-8

import pandas as pd
import numpy as np
from random import shuffle

def f(table,name='prob'):
    table=table.copy()
    score=[]
    for i in [0.40,0.41,0.42,0.43,0.44,0.45]:
        table['pred']=1*(table[name]>i)
        c=((table.pred==1)&(table.label==1)).sum()
        p=c/table.pred.sum()
        r=c/table.label.sum()
        score.append(2*p*r/(p+r))
    return score

def record_to_sequence(table):
    table.columns=['user_id','day','value']
    table.sort_values(by=['user_id','day'],inplace=True)
    table['string']=table.day.map(str)+':'+table.value.map(str)
    table=table.groupby(['user_id'],as_index=False).agg({
   'string':lambda x:','.join(x)})
    return table

class user_seq:
    
    def __init__(self,register_day,seq_length,n_features):
        self.register_day=register_day
        self.seq_length=seq_length
        self.array=np.zeros([self.seq_length,n_features])
        self.array[0,0]=1
        self.page_rank=np.zeros([self.seq_length])
        self.pointer=1
        
    def put_feature(self,feature_number,string):
        for i in string.split(','):
            pos,value=i.split(':')
            self.array[int(pos)-self.register_day,feature_number]=1

    def put_PR(self,string):
        for i in string.split(','):
            pos,value=i.split(':')
            self.page_rank[int(pos)-self.register_day]=value

    def get_array(self):
        return self.array
    
    def get_label(self):
        self.label=np.array([None]*self.seq_length)
        active=self.array[:,:10].sum(axis=1)
        for i in range(self.seq_length-7):
            self.label[i]=1*(np.sum(active[i+1:i+8])>0)
        return self.label
    

class DataGenerator:
    
    def __init__(self,register,launch,create,activity):
        
        register=register.copy()
        launch=launch.copy()
        create=create.copy()
        activity=activity.copy()
        
        #user_queue
        register['seq_length']=31-register['register_day']
        self.user_queue={
   i:[] for i in range(1,31)}
        for index,row in register.iterrows():
            self.user_queue[row[-1]].append(row[0]) #row[-1]是seq_length,row[0]是user_id
        
        #初始化self.data
        n_features=12 #row[0]是user_id,row[1]是register_day,row[-1]是seq_length
        self.data={
   row[0]:user_seq(register_day=row[1],seq_length=row[-1],n_features=n_features) for index,row in register.iterrows()}
        

        #提取launch_seq
        launch['launch']=1
        launch_table=launch.groupby(['user_id','launch_day'],as_index=False).agg({
   'launch':'sum'})
        launch_table=record_to_sequence(launch_table)
        for index,row in launch_table.iterrows():
            self.data[row[0]].put_feature(1,row[1]) #row[0]是user_id,row[1]是string
            
        #提取create_seq
        create['create']=1
        create_table=create.groupby(['user_id','create_day'],as_index=False).agg({
   'create':'sum'})
        create_table=record_to_sequence(create_table)
        for index,row in create_table.iterrows():
            self.data[row[0]].put_feature(2,row[1]) #row[0]是user_id,row[1]是string

        #提取act_seq
        for i in range(6):
            act=activity[activity.act_type==i].copy()
            act=act.groupby(['user_id','act_day'],as_index=False).agg({
   'video_id':'count'})
            act=record_to_sequence(act)
            for index,row in act.iterrows():
                self.data[row[0]].put_feature(i+3,row[1]) #row[0]是user_id,row[1]是string

        #提取page_seq
        for i in range(1):
            act=activity[activity.page==i].copy()
            act=act.groupby(['user_id','act_day'],as_index=False).agg({
   'video_id':'count'})
            act=record_to_sequence(act)
            for index,row in act.iterrows():
                self.data[row[0]].put_feature(i+9,row[1]) #row[0]是user_id,row[1]是string

        #提取watched
        watched=register.loc[:,['user_id']].copy()
        watched.columns=['author_id']
        watched=pd.merge(watched,activity[activity.author_id!=activity.user_id],how='inner')
        watched=watched.groupby(['author_id','act_day'],as_index=False).agg({
   'video_id':'count'})
        watched=record_to_sequence(watched)
        for index,row in watched.iterrows():
            self.data[row[0]].put_feature(10,row[1]) #row[0]是user_id,row[1]是string

        #提取watched by self
        watched=activity[activity.author_id==activity.user_id].copy()
        watched=watched.groupby(['user_id','act_day'],as_index=False).agg({
   'video_id':'count'})
        watched=record_to_sequence(watched)
        for index,row in watched.iterrows():
            self.data[row[0]].put_feature(11,row[1]) #row[0]是user_id,row[1]是string

        #提取label
        self.label={
   user_id:user.get_label() for user_id,user in self.data.items()}
        
        #提取data
        self.data={
   user_id:user.get_array() for user_id,user in self.data.items()}


        #set sample strategy
        self.local_random_list=[]
        for i in range(15,31):
            self.local_random_list+=[i]*(i-14)
            
        self.online_random_list=[]
        for i in range(8,31):
            self.online_random_list+=[i]*(i-7)

        self.local_train_list=list(range(15,31))
        self.local_test_list=list(range(8,31))
        self.online_train_list=list(range(8,31))
        self.online_test_list=list(range(1,31))

        self.pointer={
   i:0 for i in range(1,31)}
        
    
    def reset_pointer(self):
        self.pointer={
   i:0 for i in range(1,31)}
        
        
    def next_batch(self,batch_size=1000):

        seq_length=self.local_random_list[np.random.randint(len(self.local_random_list))]
        batch_size=batch_size//(seq_length-14)+1

        if self.pointer[seq_length]+batch_size>len(self.user_queue[seq_length]):
            self.pointer[seq_length]=0
            shuffle(self.user_queue[seq_length])
            #print('---------------------',seq_length,'shuffled ------------------------------')
        start=self.pointer[seq_length]
        user_list=self.user_queue[seq_length][start:start+batch_size]
        self.pointer[seq_length]+=batch_size

        user_matrix=np.array(user_list)
        data_matrix=np.array([self.data[i] for i in user_list])
        label_matrix=np.array([self.label[i] for i in user_list])
        
        return seq_length,user_matrix,data_matrix,label_matrix
    
    
    def get_set(self,usage='train'):
        
        if usage=='train':
            test_list=self.local_train_list
        else:
            test_list=self.local_test_list
        
        user_list=[np.array(self.user_queue[seq_length]) for seq_length in test_list]
        data_list=[np.array([self.data[user_id] for user_id in self.user_queue[seq_length]]) for seq_length in test_list]
        label_list=[np.array([self.label[user_id] for user_id in self.user_queue[seq_length]]) for seq_length in test_list]
        return test_list,user_list,data_list,label_list






data_generator = DataGenerator(register,launch,create,activity)#封装好了工具

八、网络训练模块

构建RNN网络模型

with tf.variable_scope('train'):
    
    #变量与输入
    lr=tf.placeholder(tf.float32,[],name='learning_rate')

    W_out=tf.get_variable('W_out',[n_hu,1])
    b_out=tf.get_variable('b_out',[1])

    
    x=tf.placeholder(tf.float32,[None,None,n_features])
    y=tf.placeholder(tf.float32,[None,None])
    
    batch_size=tf.shape(x)[0]
    seq_length=tf.shape(x)[1]
    
    #RNN层
    cell=tf.nn.rnn_cell.GRUCell(n_hu)
    initial_state = cell.zero_state(batch_size, dtype=tf.float32)
    outputs, state = tf.nn.dynamic_rnn(cell, x,
                                       initial_state=initial_state)
    
    #输出层
    outputs=tf.reshape(outputs,[-1,n_hu])
    logits=tf.matmul(outputs,W_out)+b_out
    logits=tf.reshape(logits,tf.stack([batch_size,seq_length]))
#选择部分预测结果与标签当做训练损失计算
logits_local_train=logits[:,:-14]
label_local_train=y[:,:-14]
#设置损失函数
regularizer=tf.contrib.layers.l2_regularizer(0.00001)
penalty=tf.contrib.layers.apply_regularization(regularizer,tf.trainable_variables())

obj_local=tf.losses.sigmoid_cross_entropy(label_local_train,logits_local_train)+penalty
optimizer=tf.train.AdamOptimizer(lr)
step_local=optimizer.minimize(obj_local)

#l选择部分预测结果与标签当做测试损失计算
logits_local_test=logits[:,-8]
label_local_test=y[:,-8]
def train(n_obs=1000,step=1000,lr_feed=0.01):

    date_seq=[31]+list(range(2,16))+[16]*15
    variables=[step_local,obj_local,label_local_train,logits_local_train]
    
    for i in range(step):
        length,id_list,data_x,data_y=data_generator.next_batch(n_obs)
        _,los,lab,log=sess.run(variables,
                               feed_dict={
   x:data_x,
                                          y:data_y,
                                          lr:lr_feed})
sess=tf.Session()
sess.run(tf.global_variables_initializer())
train(n_obs=1000,step=2000,lr_feed=0.01)
def test():
    
    n_NA=14
    date_seq=[31]+list(range(2,16))+[16]*15
    variables_1=[obj_local,logits_local_train,label_local_train]
    variables_2=[logits_local_test,label_local_test]
        
    obs_count,cum_loss,correct=0,0,0
    user,prob,real=[],[],[]

    #训练损失
    for length,id_list,data_x,data_y in zip(*data_generator.get_set('train')):
        _obj,_logits_train,_label_train=sess.run(variables_1,
                           feed_dict={
   x:data_x,
                                      y:data_y,
                                      lr:0.001})
        obs_count+=(length-n_NA)*len(id_list)
        cum_loss+=_obj*(length-n_NA)*len(id_list)
        correct+=np.sum((1*(_logits_train>0)==_label_train))

    #测试损失
    for length,id_list,data_x,data_y in zip(*data_generator.get_set('test')):
        _=sess.run(variables_2,
                           feed_dict={
   x:data_x,
                                      y:data_y,
                                      lr:0.001})
        
        _logits_test,_label_test=_
        real+=list(_label_test)

        user+=list(id_list)
        prob+=list(1/(1+np.exp(-_logits_test.reshape([-1]))))
    
    #训练损失
    print('train_loss',cum_loss/obs_count)
    
    #测试损失
    result=pd.DataFrame({
   'user_id':user,'prob':prob,'label':real})
    print('test_score:',f(result))
    
    return result

九、得出最终模型结果

test()