注:此后更新代码版本均在github上,更快更准,博客不做更新~

2019.3.20


本文代码参考:

参考代码1:https://github.com/AITTSMD/MTCNN-Tensorflow

参考代码2:https://github.com/Seanlinx/mtcnn

参考代码3:https://github.com/CongWeilin/mtcnn-caffe

参考代码4:https://github.com/kpzhang93/MTCNN_face_detection_alignment

在此对其表示衷心的感谢。


基于MTCNN的人脸检测

项目环境及配置:Window10+GTX 1060+Python3.6+Anaconda5.2.0+Spyder+Tensorflow1.9-gpu

本文是对《Joint Face Detection and Alignment Using Multitask Cascaded Convolutional Networks》论文的复现,此时网上已有N篇论文解读以及各种版本的代码复现,优劣参差不齐,大家可择优选读。


1、数据获取

本文训练集采用WIDER_FACElfw_net,选用与论文不同的数据集是因为Celeba数据集标注有很多错误,我在参考代码时看到了另一种数据集也可以很好的使用。

WIDER_FACE的标注格式在其wider_face_split 文件夹下的readme.txt 内。标注格式如下所示:

The format of txt ground truth.
File name
Number of bounding box
x1, y1, w, h, blur, expression, illumination, invalid, occlusion, pose

第一行为图片名称,第二行为人脸框数量,第三行为标注,本文只关注前4个标注。

lfw_net的标注格式可以在其下载网站的Face detector code readme.txt 内。标注格式如下所示:

Each line starts with the image name
followed by the left, right, top, and bottom boundary positions of the face bounding boxes.

每一行第一个字符串为图片名称,接下来分别为左、右、上、下坐标。

注:参考代码中数据集制作时引用的数据标注格式全都是错的,而生成hard_sample样本时格式又是对的?!我不知道这几位大佬是引用的是哪个改了名了标注文本或者用的.mat格式,反正正常下载后打开的txt格式跟他们代码写的完全不一样。遂想直接参考他们代码的童鞋还是先把我的数据集制作这块看完再跑路也不迟。


2、Pnet数据集制作

本项目数据集全部使用Tensorflow的TFRecord格式,比较方便。

TFRecord在输出数据时存在着shuffle不均匀的情况,我在项目1阶段制作数据的时候已经发现,所以必须放一点positive,放一点negative。在参考大佬代码时,大佬也提出全放进一个TFRecord内训练ONet与RNet时比例不均,所以本文三个网络的数据集全部由四个TFRecord构成。

由于本文有回归任务,所以需要记录每张截取人脸图片的bounding_box和landmark,参考大佬的思想,引入txt文档记录。

本文代码地址在文章后尾,可直接参考。

首先还是引入IoU的概念。

(tool.py):

# -*- coding: utf-8 -*-
"""
@author: friedhelm

"""
import numpy as np
import cv2
import tensorflow as tf

def IoU(box, boxes):
    """
    Compute IoU between detect box and face boxes

    Parameters:
    ----------
    box: numpy array , shape (4, ): x1, y1, x2, y2
         random produced box
    boxes: numpy array, shape (n, 4): x1, y1, w, h
         input ground truth face boxes

    Returns:
    -------
    ovr: numpy.array, shape (n, )
        IoU
    """   
    
    box_area = (box[2] - box[0] + 1) * (box[3] - box[1] + 1)
    area = boxes[:, 2]*boxes[:, 3]
    
    x_right=boxes[:, 2]+boxes[:, 0]
    y_bottom=boxes[:, 3]+boxes[:, 1]
    
    xx1 = np.maximum(box[0], boxes[:, 0])
    yy1 = np.maximum(box[1], boxes[:, 1])
    xx2 = np.minimum(box[2], x_right)
    yy2 = np.minimum(box[3], y_bottom)

    # compute the width and height of the bounding box
    w = np.maximum(0, xx2 - xx1 + 1)
    h = np.maximum(0, yy2 - yy1 + 1)

    inter = w * h
    ovr = inter / (box_area + area - inter)
    return ovr


def NMS(box,_overlap):
    
    if len(box) == 0:
        return []
    
    #xmin, ymin, xmax, ymax, score, cropped_img, scale
    box.sort(key=lambda x :x[4])
    box.reverse()

    pick = []
    x_min = np.array([box[i][0] for i in range(len(box))],np.float32)
    y_min = np.array([box[i][1] for i in range(len(box))],np.float32)
    x_max = np.array([box[i][2] for i in range(len(box))],np.float32)
    y_max = np.array([box[i][3] for i in range(len(box))],np.float32)

    area = (x_max-x_min)*(y_max-y_min)
    idxs = np.array(range(len(box)))

    while len(idxs) > 0:
        i = idxs[0]
        pick.append(i)

        xx1 = np.maximum(x_min[i],x_min[idxs[1:]])
        yy1 = np.maximum(y_min[i],y_min[idxs[1:]])
        xx2 = np.minimum(x_max[i],x_max[idxs[1:]])
        yy2 = np.minimum(y_max[i],y_max[idxs[1:]])

        w = np.maximum(xx2-xx1,0)
        h = np.maximum(yy2-yy1,0)

        overlap = (w*h)/(area[idxs[1:]] + area[i] - w*h)

        idxs = np.delete(idxs, np.concatenate(([0],np.where(((overlap >= _overlap) & (overlap <= 1)))[0]+1)))
    
    return [box[i] for i in pick]


def featuremap(sess,graph,img,scale,map_shape,stride,threshold):
    
    left=0
    up=0
    boundingBox=[]
    
    images=graph.get_tensor_by_name("input/image:0")
    label= graph.get_tensor_by_name("output/label:0")
    roi= graph.get_tensor_by_name("output/roi:0")
    landmark= graph.get_tensor_by_name("output/landmark:0")
    img1=np.reshape(img,(-1,img.shape[0],img.shape[1],img.shape[2]))

    a,b,c=sess.run([label,roi,landmark],feed_dict={images:img1})
    a=np.reshape(a,(-1,2)) 
    b=np.reshape(b,(-1,4)) 
    c=np.reshape(c,(-1,10)) 
    for idx,prob in enumerate(a):
            
        if prob[1]>threshold:
            biasBox=[]
            biasBox.extend([float(left*stride)/scale,float(up*stride)/scale, float(left*stride+map_shape)/scale, float(up*stride+map_shape)/scale,prob[1]])
            biasBox.extend(b[idx])
            biasBox.extend(c[idx])
            boundingBox.append(biasBox)
            
        #防止左越界与下越界
        if (left*stride+map_shape<img.shape[1]):
            left+=1
        elif (up*stride+map_shape<img.shape[0]): 
            left=0
            up+=1
        else : break
            
    return boundingBox


def flip(img,facemark):
    img=cv2.flip(img,1)
    facemark[[0,1]]=facemark[[1,0]]
    facemark[[3,4]]=facemark[[4,3]]   
    return (img,facemark)


def read_single_tfrecord(addr,_batch_size,shape):
    
    filename_queue = tf.train.string_input_producer([addr],shuffle=True)

    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue) 

    features = tf.parse_single_example(serialized_example,
                                   features={
                                   'img':tf.FixedLenFeature([],tf.string),
                                   'label':tf.FixedLenFeature([],tf.int64),                                   
                                   'roi':tf.FixedLenFeature([4],tf.float32),
                                   'landmark':tf.FixedLenFeature([10],tf.float32),
                                   })
    img=tf.decode_raw(features['img'],tf.uint8)
    label=tf.cast(features['label'],tf.int32)
    roi=tf.cast(features['roi'],tf.float32)
    landmark=tf.cast(features['landmark'],tf.float32)
    img = tf.reshape(img, [shape,shape,3])     
    min_after_dequeue = 10000
    batch_size = _batch_size
    capacity = min_after_dequeue + 10 * batch_size
    image_batch, label_batch, roi_batch, landmark_batch = tf.train.shuffle_batch([img,label,roi,landmark], 
                                                        batch_size=batch_size, 
                                                        capacity=capacity, 
                                                        min_after_dequeue=min_after_dequeue,
                                                        num_threads=7)  
    
    label_batch = tf.reshape(label_batch, [batch_size])
    roi_batch = tf.reshape(roi_batch,[batch_size,4])
    landmark_batch = tf.reshape(landmark_batch,[batch_size,10])
    
    return image_batch, label_batch, roi_batch, landmark_batch

   
def read_multi_tfrecords(addr,_batch_size,shape):
    
    pos_dir,part_dir,neg_dir,landmark_dir = addr
    pos_batch_size,part_batch_size,neg_batch_size,landmark_batch_size = _batch_size   
    
    pos_image,pos_label,pos_roi,pos_landmark = read_single_tfrecord(pos_dir, pos_batch_size, shape)
    part_image,part_label,part_roi,part_landmark = read_single_tfrecord(part_dir, part_batch_size, shape)
    neg_image,neg_label,neg_roi,neg_landmark = read_single_tfrecord(neg_dir, neg_batch_size, shape)
    landmark_image,landmark_label,landmark_roi,landmark_landmark = read_single_tfrecord(landmark_dir, landmark_batch_size, shape)

    images = tf.concat([pos_image,part_image,neg_image,landmark_image], 0, name="concat/image")
    labels = tf.concat([pos_label,part_label,neg_label,landmark_label],0,name="concat/label")
    rois = tf.concat([pos_roi,part_roi,neg_roi,landmark_roi],0,name="concat/roi")
    landmarks = tf.concat([pos_landmark,part_landmark,neg_landmark,landmark_landmark],0,name="concat/landmark")
    
    return images,labels,rois,landmarks    
    

def image_color_distort(inputs):
    inputs = tf.image.random_contrast(inputs, lower=0.5, upper=1.5)
    inputs = tf.image.random_brightness(inputs, max_delta=0.2)
    inputs = tf.image.random_hue(inputs,max_delta= 0.2)
    inputs = tf.image.random_saturation(inputs,lower = 0.5, upper= 1.5)

    return inputs


首先进行人脸样本制作,需要生成pos、part以及neg样本。

x,y,w,h为图片参数,x1,y1,w1,h1为人脸框参数

1、neg样本制作时可以随机选择size大小,其范围在[12,min(w,h)/2]之间即可;左顶点的坐标范围选在[0,w-size]之间即可,因为负样本框不可超出原图像,右底点最大坐标为min(w,h)/2+w|h-min(w,h)/2。

2、另外需要制作一些hard样本,即在每个人脸框周围选择IoU小于0.3的负样本框。此时引入偏移量,size范围还是[12,min(w,h)/2],左顶点偏移量delta范围设为[max(-size,-x1|y1),w1|h1],则左顶点坐标为max(0,x1|y1+delta),这样计算是因为可以产生较多的hard样本,此时左顶点的坐标范围为[x1|y1+max(-size,-x1|y1),x1|y1+w1|h1]。这一段的意思是让hard样本始终有一部分与人脸框相交。

3、pos样本制作,即在每个人脸框周围选择IoU大于0.65的正样本框。还是需要引入偏移量,已知人脸框中心点为x1|y1+(w1|h1)/2,我们令其size可在[(x1|y1)*0.8,(w1|h1)*1.2]范围内,即为人脸框长宽的0.8~1.2倍。引入偏移量delta范围设为[(x1|y1)*-0.2,(w1|h1)*0.2],即偏移量为人脸框长宽的-0.2~0.2倍,此时人脸框中心点坐标加上偏移量坐标再减去size/2即为正样本框的左顶点坐标。

4、part样本与pos样本制作方法一致,在每个人脸框周围选择IoU小于0.65大于0.3的样本框即可。

5、在保存回归框坐标的时候保存的是偏移量坐标,即现在的框与真实的人脸框的偏移量,所以令nx1为样本框的x坐标,须计算offset_x1 = (x1 - nx1) / float(size),保存。

(gen_classify_regression_data.py):

# -*- coding: utf-8 -*-
"""
@author: friedhelm

"""
from core.tool import IoU
import numpy as np
from numpy.random import randint
import cv2
import os
import time

def main():

    f1 = open(os.path.join(save_dir, 'pos_%d.txt'%(img_size)), 'w')
    f2 = open(os.path.join(save_dir, 'neg_%d.txt'%(img_size)), 'w')
    f3 = open(os.path.join(save_dir, 'par_%d.txt'%(img_size)), 'w')    
    
    with open(WIDER_spilt_dir) as filenames:
        p=0
        neg_idx=0
        pos_idx=0
        par_idx=0
        for line in filenames.readlines():
            line=line.strip().split(' ')
            if(p==0):
                pic_dir=line[0]
                p=1
                boxes=[]
            elif(p==1):
                k=int(line[0])
                p=2
            elif(p==2):
                b=[]            
                k=k-1
                if(k==0):
                    p=0                
                for i in range(4):
                    b.append(int(line[i]))
                boxes.append(b)
                # format of boxes is [x,y,w,h]
                if(p==0):
                    img=cv2.imread(os.path.join(WIDER_dir,pic_dir).replace('/','\\'))
                    h,w,c=img.shape
                    
                    #save num negative pics whose IoU less than 0.3
                    num=50
                    while(num):
                        size=randint(12,min(w,h)/2)
                        x=randint(0,w-size)
                        y=randint(0,h-size)
                        if(np.max(IoU(np.array([x,y,x+size,y+size]),np.array(boxes)))<0.3):
                            resized_img = cv2.resize(img[y:y+size,x:x+size,:], (img_size, img_size))
                            cv2.imwrite(os.path.join(negative_dir,'neg_%d.jpg'%(neg_idx)),resized_img)
                            f2.write(os.path.join(negative_dir,'neg_%d.jpg'%(neg_idx)) + ' 0\n')
                            neg_idx=neg_idx+1
                            num=num-1       

                    for box in boxes:
                        if((box[0]<0)|(box[1]<0)|(max(box[2],box[3])<20)|(min(box[2],box[3])<=5)): 
                            continue  
                        x1, y1, w1, h1 = box
                        
                        # crop images near the bounding box if IoU less than 0.3, save as negative samples
                        for i in range(10):
                            size = randint(12, min(w, h) / 2)
                            delta_x = randint(max(-size, -x1), w1)
                            delta_y = randint(max(-size, -y1), h1)
                            nx1 = int(max(0, x1 + delta_x))
                            ny1 = int(max(0, y1 + delta_y))
                            if((nx1 + size > w1)|(ny1 + size > h1)):
                                continue
                            if(np.max(IoU(np.array([nx1,ny1,nx1+size,ny1+size]),np.array(boxes)))<0.3):
                                resized_img = cv2.resize(img[y:y+size,x:x+size,:], (img_size, img_size))
                                cv2.imwrite(os.path.join(negative_dir,'neg_%d.jpg'%(neg_idx)),resized_img)
                                f2.write(os.path.join(negative_dir,'neg_%d.jpg'%(neg_idx)) + ' 0\n')
                                neg_idx=neg_idx+1
                                
                        #save num positive&part face whose IoU more than 0.65|0.4         
                        box_ = np.array(box).reshape(1, -1)
                        for i in range(10):
                            size=randint(np.floor(0.8*min(w1,h1)),np.ceil(1.25*max(w1,h1))+1)
                        
                            delta_w = randint(-w1 * 0.2, w1 * 0.2 + 1)
                            delta_h = randint(-h1 * 0.2, h1 * 0.2 + 1)
                            # random face box
                            nx1 = int(max(x1 + w1 / 2 + delta_w - size / 2, 0))
                            ny1 = int(max(y1 + h1 / 2 + delta_h - size / 2, 0))
                            nx2 = nx1 + size
                            ny2 = ny1 + size
                            
                            if( nx2 > w | ny2 > h):
                                continue 
                                
                            offset_x1 = (x1 - nx1) / float(size)
                            offset_y1 = (y1 - ny1) / float(size)
                            offset_x2 = (x1+w1 - nx2) / float(size)
                            offset_y2 = (y1+h1 - ny2) / float(size)                                
    
                            if(IoU(np.array([nx1,ny1,nx2,ny2]),box_)>0.65):                     
                                resized_img = cv2.resize(img[ny1:ny2,nx1:nx2,:], (img_size, img_size))
                                cv2.imwrite(os.path.join(positive_dir,'pos_%d.jpg'%(pos_idx)),resized_img)
                                f1.write(os.path.join(positive_dir,'pos_%d.jpg'%(pos_idx)) + ' 1 %.2f %.2f %.2f %.2f\n'%(offset_x1,offset_y1,offset_x2,offset_y2))
                                pos_idx=pos_idx+1   
                                
                            elif(IoU(np.array([nx1,ny1,nx2,ny2]),box_)>0.4):
                                resized_img = cv2.resize(img[ny1:ny2,nx1:nx2,:], (img_size, img_size))
                                cv2.imwrite(os.path.join(par_dir,'par_%d.jpg'%(par_idx)),resized_img)
                                f3.write(os.path.join(par_dir,'par_%d.jpg'%(par_idx)) + ' -1 %.2f %.2f %.2f %.2f\n'%(offset_x1,offset_y1,offset_x2,offset_y2))                           
                                par_idx=par_idx+1 
        print("pics all done,neg_pics %d in total,pos_pics %d in total,par_pics %d in total"%(neg_idx,pos_idx,par_idx))
        
    f1.close()
    f2.close()
    f3.close()  


if __name__=="__main__":
    
    img_size=12
    
    WIDER_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\prepare_data\\WIDER_train\\images"
    WIDER_spilt_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\prepare_data\\wider_face_split\\wider_face_train_bbx_gt.txt"
    negative_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d\\negative"%(img_size)
    positive_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d\\positive"%(img_size)
    par_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d\\part"%(img_size)
    save_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d"%(img_size)
    
    if not os.path.exists(positive_dir):
        os.makedirs(positive_dir)
    if not os.path.exists(par_dir):
        os.makedirs(par_dir)
    if not os.path.exists(negative_dir):
        os.makedirs(negative_dir) 
        
    begin=time.time()
    
    main()

    print(time.time()-begin)
#6841.530851840973

其次进行人脸关键点样本制作,需要生成landmark样本。

landmark人脸框生成与pos样本框框生成方法一致,landmark点也需要记录为相对于样本框的相对值,所以需要计算当前值与左顶点的值相减再除以样本框的size:(landmark[i][0]-box[0])/(box[1]-box[0])。因为landmark数据集数据不够,所以我仅仅用了镜像图像的数据增强。

(gen_landmark_data.py)

# -*- coding: utf-8 -*-
"""
@author: friedhelm

"""
from core.tool import IoU,flip
import numpy as np
import random
from numpy.random import randint
import cv2
import os
import time

def main():
    
    f4 = open(os.path.join(save_dir, 'land_%d.txt'%(img_size)), 'w')

    with open(pic_spilt_dir) as filenames:
        land_idx=0
        for line in filenames:
            img_list=[]
            mark_list=[]
            line=line.strip().split(' ')
            img=cv2.imread(os.path.join(lfw_dir,line[0]))
            box=(line[1],line[2],line[3],line[4])
            box=[int(_) for _ in box]
            #format of box is [x,x+w,y,y+h]
            height,weight,channel=img.shape
            landmark=np.zeros((5,2))
            for i in range(5):
                mark=(float(line[5+2*i]),float(line[5+2*i+1]))
                landmark[i]=mark

            facemark=np.zeros((5,2))
            for i in range(5):
                mark=((landmark[i][0]-box[0])/(box[1]-box[0]),(landmark[i][1]-box[2])/(box[3]-box[2]))
                facemark[i]=mark
            img_list.append(cv2.resize(img[box[2]:box[3],box[0]:box[1]], (img_size, img_size)))  
            mark_list.append(facemark.reshape(10))

            box_=[box[0],box[2],box[1],box[3]]
            #format of box is [x,y,x+w,y+h]      
            x1,y1,x2,y2=box_
            w=x2-x1+1
            h=y2-y1+1

            if((x1<0)|(y1<0)|(max(w,h)<40)|(min(w,h)<=5)): 
                continue          
            num=40
            while(num):

                size=randint(np.floor(0.8*min(w,h)),np.ceil(1.25*max(w,h))+1)

                delta_w = randint(-w * 0.2, w * 0.2 + 1)
                delta_h = randint(-h * 0.2, h * 0.2 + 1)
                # random face box
                nx1 = int(max(x1 + w / 2 + delta_w - size / 2, 0))
                ny1 = int(max(y1 + h / 2 + delta_h - size / 2, 0))
                nx2 = nx1 + size
                ny2 = ny1 + size 

                if( nx2 > weight | ny2 > height):
                    continue               

                _box=[x1,y1,w,h]
                _box=np.array(_box).reshape(1,-1)     
                if(IoU(np.array([nx1,ny1,nx2,ny2]),_box)>0.65): 
                    facemark=np.zeros((5,2))
                    for i in range(5):
                        mark=((landmark[i][0]-nx1)/size,(landmark[i][1]-ny1)/size)
                        facemark[i]=mark  
                    img_list.append(cv2.resize(img[ny1:ny2,nx1:nx2,:], (img_size, img_size)))  
                    mark_list.append(facemark.reshape(10))

                    #mirro
                    mirro_mark=facemark.copy()
                    if(random.choice([0,1])):
                        img1,mirro_mark=flip(img[ny1:ny2,nx1:nx2,:],mirro_mark)
                        img_list.append(cv2.resize(img1, (img_size, img_size)))  
                        mark_list.append(mirro_mark.reshape(10))  

                    num=num-1
            for i in range(len(img_list)):

                if np.sum(np.where(mark_list[i] <= 0, 1, 0)) > 0:
                    continue

                if np.sum(np.where(mark_list[i] >= 1, 1, 0)) > 0:
                    continue

                cv2.imwrite(os.path.join(landmark_dir,'land_%d.jpg'%(land_idx)),img_list[i])
                mark=[str(_)for _ in mark_list[i]]
                f4.write(os.path.join(landmark_dir,'land_%d.jpg'%(land_idx)) +' -2 '+' '.join(mark)+'\n')
                land_idx=land_idx+1

    f4.close()    

if __name__=="__main__":

    img_size=12
    
    #change img_size to P=12 R=24 O=48 net
    
    begin=time.time()

    lfw_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\prepare_data"
    pic_spilt_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\prepare_data\\trainImageList.txt"
    landmark_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d\\landmark"%(img_size)
    save_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d"%(img_size)

    if not os.path.exists(landmark_dir):
        os.makedirs(landmark_dir)    

    main()
    
    print(time.time()-begin)

制作完成后生成了:

neg 645017
par 507206
pos 285560
land 584332

接下来制作TFRecord

制作过程中发现了一个问题,兴许是anaconda的bug由jupyter notebook制作出的文件总会出现DATALOSS的错误,详情请看我的另一篇问题解答,使用Spyder IDE即可解决:

(gen_tfrecord.py)

# -*- coding: utf-8 -*-
"""
@author: friedhelm

"""
import tensorflow as tf 
import cv2
import random
import time

def main():
    
    t_time=time.time()   
    for index,term in enumerate(terms):
        num=0
        print("%s start"%(term))
        with tf.python_io.TFRecordWriter("E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d\\%s_train.tfrecords"%(img_size,term)) as writer:
            with open(r'E:\friedhelm\object\face_detection_MTCNN\DATA\%d\%s.txt'%(img_size,term)) as readlines:
                readlines=[line.strip().split(' ') for line in readlines]
                random.shuffle(readlines)
                for i,line in enumerate(readlines):
                    if(i%10000==0):
                        print(i,time.time()-t_time)
                        t_time=time.time()
                    img=cv2.imread(line[0].replace('/','\\'))
                    if(img is None):
                        continue
                    img_raw = img.tobytes()
                    label=int(line[1])
                    roi=[0.0]*4               
                    landmark=[0.0]*10
                    if(len(line)==6):    
                        roi=[float(_) for _ in line[2:6]]   
                    if(len(line)==12):
                        landmark=[float(_) for _ in line[2:12]]                  
                    example = tf.train.Example(features=tf.train.Features(feature={
                        'img': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])),
                        "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
                        "roi": tf.train.Feature(float_list=tf.train.FloatList(value=roi)),
                        "landmark": tf.train.Feature(float_list=tf.train.FloatList(value=landmark)),
                    }))
                    writer.write(example.SerializeToString())  #序列化为字符串  
                    num+=1
                    if(num==base*scale[index]):
                        print("%s finish"%(term))
                        break

if __name__=="__main__":
    
    img_size=12
    #change img_size to P=12 R=24 O=48 net
    terms=['neg_%d'%(img_size),'pos_%d'%(img_size),'par_%d'%(img_size),'land_%d'%(img_size)]
    scale=[3,1,1,2]
    
    #set base number of pos_pic    
    base=200000

    begin=time.time()

    main()
    
    print(time.time()-begin)

照例,挨个测试一下TFRecord文件,以免训练的时候出幺蛾子

(test_TFRecord.py)

# -*- coding: utf-8 -*-
"""
@author: friedhelm

"""
import tensorflow as tf

filename_queue = tf.train.string_input_producer(["E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\12\\neg_12_train.tfrecords"],shuffle=True,num_epochs=1)

reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue) #返回文件名和文件

features = tf.parse_single_example(serialized_example,
                               features={
                               'img':tf.FixedLenFeature([],tf.string),
                               'label':tf.FixedLenFeature([],tf.int64),                                   
                               'roi':tf.FixedLenFeature([4],tf.float32),
                               'landmark':tf.FixedLenFeature([10],tf.float32),
                               })
img=tf.decode_raw(features['img'],tf.uint8)
label=tf.cast(features['label'],tf.int32)
roi=tf.cast(features['roi'],tf.float32)
landmark=tf.cast(features['landmark'],tf.float32)
img = tf.reshape(img, [48,48,3])   
#     img=img_preprocess(img)
min_after_dequeue = 10000
batch_size = 64
capacity = min_after_dequeue + 10 * batch_size
image_batch, label_batch, roi_batch, landmark_batch = tf.train.shuffle_batch([img,label,roi,landmark], 
                                                    batch_size=batch_size, 
                                                    capacity=capacity, 
                                                    min_after_dequeue=min_after_dequeue,
                                                    num_threads=7)  


i=0
with tf.Session() as sess:
    sess.run((tf.global_variables_initializer(),
              tf.local_variables_initializer()))
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess,coord=coord)    
    while(1):
        i=i+1
        if(i%9==1):
            print(sess.run(label_batch))

至此我们PNet数据集制作就完成了。


3、Pnet训练

首先介绍网络结构:

细节方面原文中使用了prelu,仅使用了原生API,根据网络搭就行了,唯一需要注意的细节就是pool的padding模式。

MTCNN网络结构如图1所示:

<figcaption> 图1 MTCNN网络结构 </figcaption>


(MTCNN_model.py):

import tensorflow as tf

def prelu(inputs):

    with tf.variable_scope('prelu'):
        alphas = tf.get_variable("alphas", shape=inputs.get_shape()[-1], dtype=tf.float32, initializer=tf.constant_initializer(0.25))
        pos = tf.nn.relu(inputs)
        neg = alphas * (inputs-abs(inputs))*0.5
    
    return pos + neg


def conv2d(_input,name,conv_size,conv_stride,bias_size,pad,activation='prelu'):
    
    regularizer=tf.contrib.layers.l2_regularizer(0.0005)
    with tf.variable_scope(name):
        weight=tf.get_variable('weight',conv_size,initializer=tf.truncated_normal_initializer(stddev=0.1))
        bias=tf.get_variable('bias',bias_size,initializer=tf.constant_initializer(0.0))
        weight_loss=regularizer(weight)
        tf.add_to_collection('loss',weight_loss)
        conv=tf.nn.conv2d(_input,weight,strides=conv_stride,padding=pad)
        he=tf.nn.bias_add(conv,bias)
        relu=tf.cond(tf.equal(activation,'prelu'),lambda:prelu(he),lambda:tf.cond(tf.equal(activation,'softmax'),lambda:tf.nn.softmax(he),lambda:he),name='output')     
    
    return relu


def fc2d(_input,name,fc_size,bias_size,activation='prelu'):
    
    regularizer=tf.contrib.layers.l2_regularizer(0.0005)
    with tf.variable_scope(name):
        weight=tf.get_variable('weight',fc_size,initializer=tf.truncated_normal_initializer(stddev=0.1))
        bias=tf.get_variable('bias',bias_size,initializer=tf.constant_initializer(0.0))
        weight_loss=regularizer(weight)
        tf.add_to_collection('weight_loss',weight_loss)
        he=tf.nn.bias_add(tf.matmul(_input,weight,name='matmul'),bias)
        relu=tf.cond(tf.equal(activation,'prelu'),lambda:prelu(he),lambda:tf.cond(tf.equal(activation,'softmax'),lambda:tf.nn.softmax(he),lambda:he))     
    
    return relu


def pool(_input,name,kernal_size,kernal_stride,pad):
    
    with tf.variable_scope(name):  
        pool=tf.nn.max_pool(_input,ksize=kernal_size,strides=kernal_stride,padding=pad)  
        
    return pool


def Pnet_model(x,batch_size):
    
    conv_1=conv2d(x,'conv_1',[3,3,3,10],[1,1,1,1],[10],'VALID')
    pool_1=pool(conv_1,'pool_1',[1,3,3,1],[1,2,2,1],'SAME')     
    conv_2=conv2d(pool_1,'conv_2',[3,3,10,16],[1,1,1,1],[16],'VALID')
    conv_3=conv2d(conv_2,'conv_3',[3,3,16,32],[1,1,1,1],[32],'VALID')
    
    face_label=conv2d(conv_3,'face_label',[1,1,32,2],[1,1,1,1],[2],'VALID','softmax')
    bounding_box=conv2d(conv_3,'bounding_box',[1,1,32,4],[1,1,1,1],[4],'VALID','None')
    landmark_local=conv2d(conv_3,'landmark_local',[1,1,32,10],[1,1,1,1],[10],'VALID','None')      
        
    return face_label, bounding_box ,landmark_local


def Rnet_model(x,batch_size):
    
    conv_1=conv2d(x,'conv_1',[3,3,3,28],[1,1,1,1],[28],'VALID')
    pool_1=pool(conv_1,'pool_1',[1,3,3,1],[1,2,2,1],'SAME')
    conv_2=conv2d(pool_1,'conv_2',[3,3,28,48],[1,1,1,1],[48],'VALID')
    pool_2=pool(conv_2,'pool_2',[1,3,3,1],[1,2,2,1],'VALID')    
    conv_3=conv2d(pool_2,'conv_3',[2,2,48,64],[1,1,1,1],[64],'VALID')
    
    resh1 = tf.reshape(conv_3, [batch_size,3*3*64], name='resh1')
    
    fc_1=fc2d(resh1,'fc_1',[3*3*64,128],[128])    

    face_label=fc2d(fc_1,'face_label',[128,2],[2],'softmax')
    bounding_box=fc2d(fc_1,'bounding_box',[128,4],[4],'None')
    landmark_local=fc2d(fc_1,'landmark_local',[128,10],[10],'None')

    return face_label, bounding_box ,landmark_local


def Onet_model(x,batch_size):
    
    conv_1=conv2d(x,'conv_1',[3,3,3,32],[1,1,1,1],[32],'VALID')
    pool_1=pool(conv_1,'pool_1',[1,3,3,1],[1,2,2,1],'SAME')
    conv_2=conv2d(pool_1,'conv_2',[3,3,32,64],[1,1,1,1],[64],'VALID')
    pool_2=pool(conv_2,'pool_2',[1,3,3,1],[1,2,2,1],'VALID')    
    conv_3=conv2d(pool_2,'conv_3',[3,3,64,64],[1,1,1,1],[64],'VALID')
    pool_3=pool(conv_3,'pool_3',[1,2,2,1],[1,2,2,1],'SAME')  
    conv_4=conv2d(pool_3,'conv_4',[2,2,64,128],[1,1,1,1],[128],'VALID')   

    resh1 = tf.reshape(conv_4, [batch_size,3*3*128], name='resh1')
    
    fc_1=fc2d(resh1,'fc_1',[3*3*128,256],[256])
           
    face_label=fc2d(fc_1,'face_label',[256,2],[2],'softmax')
    bounding_box=fc2d(fc_1,'bounding_box',[256,4],[4],'None')
    landmark_local=fc2d(fc_1,'landmark_local',[256,10],[10],'None')

    return face_label, bounding_box ,landmark_local


由于MTCNN在训练的时候loss需要分类,所以只能自己创建loss函数。

在制作数据的时候,neg的label设置为0,pos和part的label设置为1和-1,land的label设置为2。

我们会根据这个label值计算相应的loss,label_los函数只计算label为0的loss,其他的loss设为0;roi_los函数只计算label为1和-1的loss,其他的loss设为0;landmark_los函数只计算label为2的loss,其他的loss设为0。具体tf代码语法不细讲了,百度一下都能看懂。

细节方面,在label_los函数中应用了online hard sample mining,即在训练阶段只选用前70%的labl loss。

(train_tool.py):

# -*- coding: utf-8 -*-
"""
@author: friedhelm

"""
import tensorflow as tf

def label_los(pre_label,act_label):
    
    ratio=tf.constant(0.7)
    zeros=tf.zeros_like(act_label,dtype=tf.int32)
    valid_label=tf.where(tf.less(act_label,0),zeros,act_label)

    column_num=tf.shape(pre_label,out_type=tf.int32)[0]
    pre_label=tf.squeeze(tf.reshape(pre_label,(1,-1)))
    column=tf.range(0,column_num)*2 
    column_to_stay=column+valid_label

    pre_label=tf.squeeze(tf.gather(pre_label,column_to_stay))
    loss = -tf.log(pre_label+1e-10)      
    ones=tf.ones_like(act_label,dtype=tf.float32)
    zero=tf.zeros_like(act_label,dtype=tf.float32)
    valid_colunm = tf.where(act_label < zeros,zero,ones)  
    
    num_column=tf.reduce_sum(valid_colunm)
    num=tf.cast(num_column*ratio,dtype=tf.int32)
    loss=tf.multiply(loss,valid_colunm,'label_los')
    loss,_=tf.nn.top_k(loss,num)
    
    return tf.reduce_mean(loss)
    

def roi_los(label,pre_box,act_box) :    
    
    zeros=tf.zeros_like(label,dtype=tf.float32)
    ones=tf.ones_like(label,dtype=tf.float32)    
    valid_label=tf.where(tf.equal(abs(label),1),ones,zeros)
    loss=tf.reduce_sum(tf.square(act_box-pre_box),axis=1)
    loss=tf.multiply(loss,valid_label,'roi_los')
    return tf.reduce_mean(loss) 
    

def landmark_los(label,pre_landmark,act_landmark):    
    
    zeros=tf.zeros_like(label,dtype=tf.float32)
    ones = tf.ones_like(label,dtype=tf.float32)
    valid_label=tf.where(tf.equal(label,-2),ones,zeros)
    loss=tf.reduce_sum(tf.square(act_landmark-pre_landmark),axis=1)
    loss=tf.multiply(loss,valid_label,'landmark_los')
    return tf.reduce_mean(loss)     
    

def cal_accuracy(cls_prob,label):
       
    pred = tf.argmax(cls_prob,axis=1)
    label_int = tf.cast(label,tf.int64)
    cond = tf.where(tf.greater_equal(label_int,0))
    picked = tf.squeeze(cond)
    label_picked = tf.gather(label_int,picked)
    pred_picked = tf.gather(pred,picked)
    accuracy_op = tf.reduce_mean(tf.cast(tf.equal(label_picked,pred_picked),tf.float32))
    
    return accuracy_op

在介绍完 loss函数后,开始训练。

(train.py):

# -*- coding: utf-8 -*-
"""
@author: friedhelm

"""
import tensorflow as tf
import time
from core.tool import read_multi_tfrecords,image_color_distort
from core.MTCNN_model import Pnet_model,Rnet_model,Onet_model
from train.train_tool import label_los,roi_los,landmark_los,cal_accuracy
import os

def train(image,label,roi,landmark,model,model_name):
    
    _label, _roi ,_landmark=model(image,batch)
    
    with tf.name_scope('output'):
        _label=tf.squeeze(_label,name='label')
        _roi=tf.squeeze(_roi,name='roi')
        _landmark=tf.squeeze(_landmark,name='landmark')
        
    _label_los=label_los(_label,label)
    _box_los=roi_los(label,_roi,roi)    
    _landmark_los=landmark_los(label,_landmark,landmark)
    
    function_loss=_label_los*ratio[0]+_box_los*ratio[1]+_landmark_los*ratio[2]

    tf.add_to_collection("loss", function_loss)
    loss_all=tf.get_collection('loss')
    
    with tf.name_scope('loss'):
        loss=tf.reduce_sum(loss_all)
        tf.summary.scalar('loss',loss) 
        
    opt=tf.train.AdamOptimizer(learning_rate).minimize(loss)
    
    with tf.name_scope('accuracy'):
        train_accuracy=cal_accuracy(_label,label)
        tf.summary.scalar('accuracy',train_accuracy) 

    saver=tf.train.Saver(max_to_keep=10)
    merged=tf.summary.merge_all() 
    
    images,labels,rois,landmarks=read_multi_tfrecords(addr,batch_size,img_size)   
    images=image_color_distort(images)
    
    with tf.Session() as sess:
        sess.run((tf.global_variables_initializer(),
                  tf.local_variables_initializer()))
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess,coord=coord)
        image_batch,label_batch,roi_batch,landmark_batch=sess.run([images,labels,rois,landmarks])
        
        writer_train=tf.summary.FileWriter('C:\\Users\\312\\Desktop\\',sess.graph)
        try:
            
            for i in range(1,train_step):
                
                image_batch,label_batch,roi_batch,landmark_batch=sess.run([images,labels,rois,landmarks])
                
                sess.run(opt,feed_dict={image:image_batch,label:label_batch,roi:roi_batch,landmark:landmark_batch})
                if(i%100==0):
                    summary=sess.run(merged,feed_dict={image:image_batch,label:label_batch,roi:roi_batch,landmark:landmark_batch})
                    writer_train.add_summary(summary,i) 
                if(i%1000==0):
                    print('次数',i)    
                    print('train_accuracy',sess.run(train_accuracy,feed_dict={image:image_batch,label:label_batch,roi:roi_batch,landmark:landmark_batch}))
                    print('loss',sess.run(loss,{image:image_batch,label:label_batch,roi:roi_batch,landmark:landmark_batch}))               
                    print('time',time.time()-begin)
                    if(i%10000==0):
                        saver.save(sess,"E:\\friedhelm\\object\\face_detection_MTCNN\\%s\\%s.ckpt"%(model_name,model_name),global_step=i)
        except  tf.errors.OutOfRangeError:
            print("finished")
        finally:
            coord.request_stop()
            writer_train.close()
        coord.join(threads)
    
def main(model):
    
    with tf.name_scope('input'):
        image=tf.placeholder(tf.float32,name='image')
        label=tf.placeholder(tf.int32,name='label')
        roi=tf.placeholder(tf.float32,name='roi')
        landmark = tf.placeholder(tf.float32,name='landmark')  

    train(image,label,roi,landmark,model,model_name)

if __name__=='__main__':
    
    img_size=12
    batch=448
    batch_size=[192,64,64,128]
    addr=["E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d\\neg_%d_train.tfrecords"%(img_size,img_size),
          "E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d\\pos_%d_train.tfrecords"%(img_size,img_size),
          "E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d\\par_%d_train.tfrecords"%(img_size,img_size),
          "E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d\\land_%d_train.tfrecords"%(img_size,img_size)]  

    model=Pnet_model
    model_name="Pnet_model"    
    train_step=100001
    learning_rate=0.001
    
    save_model_path="E:\\friedhelm\\object\\face_detection_MTCNN\\%s"%(model_name)
    
    if not os.path.exists(save_model_path):
        os.makedirs(save_model_path) 
        
    if(model_name=="Onet_model"):
        ratio=[1,0.5,1]
    else:
        ratio=[1,0.5,0.5]
    

    begin=time.time()        
    main(model)
    
# tensorboard --logdir=C:\\Users\\312\\Desktop\\

训练曲线如图2所示:

<figcaption> 图2 Pnet训练曲线 </figcaption>


由于网络较浅,所以训练速度很快,在训练的时候对每个输入的图片都进行了饱和度等变换。由图可看出在6W次的时候已经收敛,由于我在训练时加入了L2正则化,所以说loss值还是偏高,不过是正常的。读取TFRecord文件时需要将4个文件整合为一个,引用一个整合函数就行了。

至此Pnet完毕~


4、Rnet

Rnet数据集需要依赖于Pnet的输出结果,所以我们将WIDER_face_train的整张图片挨个输入进去,让Pnet识别,识别出的结果与人脸框相比较,IoU小于0.3的为负样本,以此类推生成neg、pos、part样本。

在输入训练图片的时候,需要将原图片做成高斯金字塔的形式,以识别不同大小的人脸,原论文中选择的金字塔factor为0.79,最小识别人脸min_face_size为10,本文沿用。

在制作Rnet数据集时本文代码尚未完成封装,所以特别简陋,大家凑合看吧。。


(gen_hard_sample_R24.py):

# -*- coding: utf-8 -*-
"""
@author: friedhelm

"""
#159424张人脸
import tensorflow as tf 
import numpy as np
import cv2
import time
from core.tool import IoU,NMS,featuremap
import os

def main():
    saver=tf.train.import_meta_graph(graph_path)  
    
    f1 = open(os.path.join(save_dir, 'pos_%d.txt'%(img_size)), 'w')
    f2 = open(os.path.join(save_dir, 'neg_%d.txt'%(img_size)), 'w')
    f3 = open(os.path.join(save_dir, 'par_%d.txt'%(img_size)), 'w')   

    with tf.Session() as sess: 

        saver.restore(sess,model_path)
        graph = tf.get_default_graph()

        with open(WIDER_spilt_dir) as filenames:
            p=0
            idx=0
            neg_idx=0
            pos_idx=0
            par_idx=0
            for line in filenames.readlines():
                line=line.strip().split(' ')
                if(p==0):
                    pic_dir=line[0]
                    p=1
                    boxes=[]
                elif(p==1):
                    k=int(line[0])
                    p=2
                elif(p==2):
                    b=[]            
                    k=k-1
                    if(k==0):
                        p=0                
                    for i in range(4):
                        b.append(int(line[i]))
                    boxes.append(b)
                    # format of boxes is [x,y,w,h]
                    if(p==0):
                        img=cv2.imread(os.path.join(WIDER_dir,pic_dir).replace('/','\\'))
                        h,w,c=img.shape
                        print(pic_dir)
                        if(min(h,w)<20):
                            continue

                        scales=[]
                        total_box=[]
                        pro=map_shape/min_face_size
                        small=min(img.shape[0:2])*pro
                        

                        while small>=12:
                            scales.append(pro)
                            pro*=factor
                            small*=factor  

                        for scale in scales:

                            scale_img=cv2.resize(img,((int(img.shape[1]*scale)),(int(img.shape[0]*scale))))
                            bounding_boxes=featuremap(sess,graph,scale_img,scale,map_shape,stride,threshold)

                            if(bounding_boxes):
                                for box in bounding_boxes:
                                    total_box.append(box)

                        NMS_box=NMS(total_box,0.7)
                        neg_num=0
                        for box_ in NMS_box:

                            box=box_.copy()                        
                            if((box[0]<0)|(box[1]<0)|(box[2]>w)|(box[3]>h)|(box[2]-box[0]<=min_face_size)|(box[3]-box[1]<=min_face_size)): 
                                continue  
                            # format of total_box: [x1,y1,x2,y2,score,offset_x1,offset_y1,offset_x2,offset_y2,10*landmark]  

                            t_box=[0]*4
                            t_w=box[2]-box[0]+1
                            t_h=box[3]-box[1]+1

                            t_box[0]=box[5]*t_w+box[0]
                            t_box[1]=box[6]*t_h+box[1]                     
                            t_box[2]=box[7]*t_w+box[2]    
                            t_box[3]=box[8]*t_h+box[3]                        
                            # calculate ground truth predict-face boxes

                            if((t_box[0]<0)|(t_box[1]<0)|(t_box[2]>w)|(t_box[3]>h)|(t_box[2]-t_box[0]<=min_face_size)|(t_box[3]-t_box[1]<=min_face_size)): 
                                continue 

                            ti_box=t_box.copy()
                            ti_box=[int(_) for _ in ti_box]

                            Iou=IoU(np.array(t_box),np.array(boxes))

                            if((np.max(Iou)<0.3)&(neg_num<60)):
                                resized_img = cv2.resize(img[ti_box[1]:ti_box[3],ti_box[0]:ti_box[2],:], (img_size,img_size))
                                cv2.imwrite(os.path.join(negative_dir,'neg_%d.jpg'%(neg_idx)),resized_img)
                                f2.write(os.path.join(negative_dir,'neg_%d.jpg'%(neg_idx)) + ' 0\n')
                                neg_idx=neg_idx+1
                                neg_num=neg_num+1

                            else:
                                x1,y1,w1,h1=boxes[np.argmax(Iou)]

                                offset_x1 = (x1 - t_box[0]) / float(t_box[2]-t_box[0]+1)
                                offset_y1 = (y1 - t_box[1]) / float(t_box[3]-t_box[1]+1)
                                offset_x2 = (x1+w1 - t_box[2]) / float(t_box[2]-t_box[0]+1)
                                offset_y2 = (y1+h1 - t_box[3]) / float(t_box[3]-t_box[1]+1)                         

                                if(np.max(Iou)>0.65):                    
                                    resized_img = cv2.resize(img[ti_box[1]:ti_box[3],ti_box[0]:ti_box[2],:], (img_size, img_size))
                                    cv2.imwrite(os.path.join(positive_dir,'pos_%d.jpg'%(pos_idx)),resized_img)
                                    f1.write(os.path.join(positive_dir,'pos_%d.jpg'%(pos_idx)) + ' 1 %.2f %.2f %.2f %.2f\n'%(offset_x1,offset_y1,offset_x2,offset_y2))
                                    pos_idx=pos_idx+1   

                                elif(np.max(Iou)>0.4):
                                    resized_img = cv2.resize(img[ti_box[1]:ti_box[3],ti_box[0]:ti_box[2],:], (img_size, img_size))
                                    cv2.imwrite(os.path.join(par_dir,'par_%d.jpg'%(par_idx)),resized_img)
                                    f3.write(os.path.join(par_dir,'par_%d.jpg'%(par_idx)) + ' -1 %.2f %.2f %.2f %.2f\n'%(offset_x1,offset_y1,offset_x2,offset_y2))                           
                                    par_idx=par_idx+1 
                        idx+=1
                        if(idx%100==0):
                            print('idx: ',idx," ;neg_idx: ",neg_idx," ;pos_idx: ",pos_idx," ;par_idx: ",par_idx)
                            print(time.time()-begin)
        print("pics all done,neg_pics %d in total,pos_pics %d in total,par_pics %d in total"%(neg_idx,pos_idx,par_idx))

    f1.close()
    f2.close()
    f3.close()  


if __name__=="__main__":
    
    begin=time.time()
    img_size=24
    map_shape=12
    min_face_size=20
    stride=2
    factor=0.79
    threshold=0.8

    graph_path='E:\\friedhelm\\object\\face_detection_MTCNN\\\Pnet_model\\Pnet_model.ckpt-60000.meta'
    model_path='E:\\friedhelm\\object\\face_detection_MTCNN\\\Pnet_model\\Pnet_model.ckpt-60000'
    
    WIDER_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\prepare_data\\WIDER_train\\images"
    WIDER_spilt_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\prepare_data\\wider_face_split\\wider_face_train_bbx_gt.txt"
    negative_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d\\negative"%(img_size)
    positive_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d\\positive"%(img_size)
    par_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d\\part"%(img_size)
    save_dir="E:\\friedhelm\\object\\face_detection_MTCNN\\DATA\\%d"%(img_size)

    if not os.path.exists(positive_dir):
        os.makedirs(positive_dir)
    if not os.path.exists(par_dir):
        os.makedirs(par_dir)
    if not os.path.exists(negative_dir):
        os.makedirs(negative_dir)    
    
    main()
    
    print(time.time()-begin)    
    
#pics all done,neg_pics 758503 in total,pos_pics 285017 in total,par_pics 572771 in total
# 17590.795156002045

Rnet的landmark样本直接调用(gen_landmark_data.py),把其中的img_size改成24运行就行了。

代码运行完毕后,会生成:

neg_pics 758503

pos_pics 285017

par_pics 572771

随后还是生成TFReord,train,都改一下之前代码的参数就行了,在此不细说了。

Rnet训练曲线如下:

<figcaption> 图3 Rnet训练曲线 </figcaption>


如图可看出Rnet5W次已经收敛。至此Rnet完事~

5、Onet

Onet问题是最多的,Onet的数据集应该由Pnet+Rnet级联生成,可是生成的neg样本太少了,只有1W+,所以我干脆就调用了

(gen_classify_regression_data.py)文件生成了Onet数据集,这就相当于给自己挖了一个大坑了。。

其他的过程都一样的,就不复述了。

Onet训练曲线如下:

<figcaption> 图4 Onet训练曲线 </figcaption>

由图可看出5W次也就收敛了。Onet也就草草收场了~

6、人脸检测

这里给出两个封装好的检测器,等我封装好了,那些样本都生成完毕了,所以只能留着检测用了。。

基础检测器detector,负责最基本的调用模型检测任务,返回包含置信度,预测人脸框,人脸框偏移量,人脸关键点的boundingbox。

(detector.py):

# -*- coding: utf-8 -*-
"""
@author: friedhelm

"""
import tensorflow as tf
import numpy as np
import os

class Detector(object):
    
    def __init__(self,model,model_path,batch_size):
        
        model_name=model_path.split("\\")[-1].split(".")[0]
        if not os.path.exists(model_path+".meta"):
            raise Exception("%s is not exists"%(model_name))
            
        graph=tf.Graph()
        with graph.as_default():
            self.sess=tf.Session()
            self.images=tf.placeholder(tf.float32)
            self.label,self.roi,self.landmark=model(self.images,batch_size) 
            saver=tf.train.Saver()
            saver.restore(self.sess,model_path)

    def predict(self,img,scale,img_size,stride,threshold,boxes):
        """
        used for predict
        
        input : img, scale , img_size , stride , threshold , boxes
        output: boundingbox
        
        format of input  : 
            img       : np.array()
            scale     : float , which img will be resizeed to
            img_size  : int   , size of img
            stride    : int   , stride of the featrue map of model
            threshold : int   , percentage of result to keep
            boxes     : list  , output bounding box of pre-model
        format of output : 
            boundingbox : list of boxes -1*[x1,y1,x2,y2,score,offset_x1,offset_y1,offset_x2,offset_y2,5*(landmark_x,landmark_y)]
        """    
        left=0
        up=0
        boundingBox=[]

        pre_label,pre_box,pre_land=self.sess.run([self.label,self.roi,self.landmark],feed_dict={self.images:img})
        
        pre_label=np.reshape(pre_label,(-1,2)) 
        pre_box=np.reshape(pre_box,(-1,4)) 
        pre_land=np.reshape(pre_land,(-1,10))

        for idx,prob in enumerate(pre_label):

            if prob[1]>threshold:
                biasBox=[]
                if(len(boxes) == 0):
                    biasBox.extend([float(left*stride)/scale,float(up*stride)/scale, float(left*stride+12)/scale, float(up*stride+12)/scale,prob[1]])
                else:
                    biasBox.extend([boxes[idx][0],boxes[idx][1],boxes[idx][2],boxes[idx][3],prob[1]])
                biasBox.extend(pre_box[idx])
                biasBox.extend(pre_land[idx])
                boundingBox.append(biasBox)

            if (len(boxes) == 0):
                #prevent the sliding window to overstep the boundary
                if (left*stride+img_size<img.shape[2]):
                    left+=1
                elif (up*stride+img_size<img.shape[1]): 
                    left=0
                    up+=1
                else : continue

        return boundingBox

检测器mtcnn_detector,提供对外接口,初始化时需要提供模型、模型地址、图像batch_size、金字塔factor,最小识别人脸min_face_size,置信度阈值threshold。

检测时需要对接口输入图像,输出人脸框与人脸关键点。

(mtcnn_detector.py)

# -*- coding: utf-8 -*-
"""
@author: friedhelm

"""
import numpy as np
import cv2
from core.tool import NMS
from detection.detector import Detector
import time

class MTCNN_Detector(object):
    
    def __init__(self,model,model_path,batch_size,factor,min_face_size,threshold):
        
        self.pnet_model=model[0]
        self.rnet_model=model[1]       
        self.onet_model=model[2]        
        self.model_path=model_path        
        self.batch_size=batch_size        
        self.factor=factor
        self.min_face_si***_face_size 
        self.threshold=threshold
    
        
    def calibrate_box(self,img,NMS_box):
        """
        used for calibrating NMS_box
        
        input : boundingbox after nms, img
        output: score , boundingbox after calibrate , landmark_box
        
        format of input  : 
            NMS_box : -1*[x1,y1,x2,y2,score,offset_x1,offset_y1,offset_x2,offset_y2,5*(landmark_x,landmark_y)] 
            img          : np.array()
            
        format of output : 
            score_box    : list of score -1*[score]
            net_box      : list of box   -1*[face_x1,face_x2,face_y1,face_y2]
            landmark_box : list of box   -1*[5*(true_landmark_x,true_landmark_y)]
        """
        net_box=[]
        score_box=[] 
        landmark_box=[]
        h,w,c=img.shape

        
        for box_ in NMS_box:

            box=box_.copy()                        

            if((box[0]<0)|(box[1]<0)|(box[2]>w)|(box[3]>h)|(box[2]-box[0]<self.min_face_size)|(box[3]-box[1]<self.min_face_size)): 
                continue  

            # calibrate the boundingbox

            t_box=[0]*4
            t_w=box[2]-box[0]+1
            t_h=box[3]-box[1]+1

            t_box[0]=box[5]*t_w+box[0]
            t_box[1]=box[6]*t_h+box[1]                     
            t_box[2]=box[7]*t_w+box[2]    
            t_box[3]=box[8]*t_h+box[3]                        
            
            if((t_box[0]<0)|(t_box[1]<0)|(t_box[2]>w)|(t_box[3]>h)|(t_box[2]-t_box[0]<self.min_face_size)|(t_box[3]-t_box[1]<self.min_face_size)): 
                continue 
            
            landmark=np.zeros((5,2))
            for i in range(5):
                landmark[i]=(box_[9+i*2]*(t_box[2]-t_box[0]+1)+t_box[0],box_[9+i*2+1]*(t_box[3]-t_box[1]+1)+t_box[1])
                
            landmark_box.append(landmark)            
            score_box.append(box_[4])
            net_box.append(t_box)     
            
        return score_box,net_box,landmark_box
   

    def detect_Pnet(self,pnet_detector,img):
        """
        input : detector , img
        output: score_box , pnet_box , None
        
        format of input  :
            detector: class detector 
            img     : np.array()
            
        format of output : 
            score_box : list of score                  -1*[score]
            pnet_box  : list of box after calibration  -1*[p_face_x1,p_face_x2,p_face_y1,p_face_y2]
        """        
        factor=self.factor
        pro=12/self.min_face_size
        scales=[]
        total_box=[]
        score_box=[]
        small=min(img.shape[0:2])*pro

        while small>=12:
            scales.append(pro)
            pro*=factor
            small*=factor 
            
        for scale in scales:
            
            crop_img=img
            scale_img=cv2.resize(crop_img,((int(crop_img.shape[1]*scale)),(int(crop_img.shape[0]*scale))))
            scale_img1=np.reshape(scale_img,(-1,scale_img.shape[0],scale_img.shape[1],scale_img.shape[2])) 
            
            bounding_boxes=pnet_detector.predict(scale_img1,scale,img_size=12,stride=2,threshold=self.threshold[0],boxes=[])
            
            if(bounding_boxes):
                for box in bounding_boxes:
                    total_box.append(box)        

        NMS_box=NMS(total_box,0.7)                    
        if(len(NMS_box)==0):
            return None,None,None

        score_box,pnet_box,_=self.calibrate_box(img,NMS_box)
            
        return score_box,pnet_box,None
        

    def detect_Rnet(self,rnet_detector,img,bounding_box):
        """
        input : detector , img , bounding_box
        output: score_box , rnet_box , None
        
        format of input  :
            detector     : class detector 
            img          : np.array()
            bounding_box : list of box output from function(detect_Pnet)  -1*[p_face_x1,p_face_x2,p_face_y1,p_face_y2]
            
        format of output : 
            score_box : list of score                  -1*[score]
            rnet_box  : list of box after calibration  -1*[r_face_x1,r_face_x2,r_face_y1,r_face_y2]
        """        
        score_box=[]        
        scale_img=np.zeros((len(bounding_box),24,24,3))
        
        for idx,box in enumerate(bounding_box):
            scale_img[idx,:,:,:] = cv2.resize(img[int(box[1]):int(box[3]),int(box[0]):int(box[2]),:], (24, 24))
            
        bounding_boxes=rnet_detector.predict(scale_img,scale=1,img_size=24,stride=4,threshold=self.threshold[1],boxes=bounding_box)
                    
        NMS_box=NMS(bounding_boxes,0.6)                    
                    
        if(len(NMS_box)==0):
            return None,None,None

        score_box,rnet_box,_=self.calibrate_box(img,NMS_box)
        
        return score_box,rnet_box,None        

    
    def detect_Onet(self,onet_detector,img,bounding_box):
        """
        input : detector , img , bounding_box
        output: score_box , onet_box , landmark_box
        
        format of input  :
            detector     : class detector 
            img          : np.array()
            bounding_box : list of box output from function(detect_Rnet)  -1*[r_face_x1,r_face_x2,r_face_y1,r_face_y2]
            
        format of output : 
            score_box    : list of score                  -1*[score]
            onet_box     : list of box after calibration  -1*[o_face_x1,o_face_x2,o_face_y1,o_face_y2]
            landmark_box : list of landmark               -1*[5*(o_landmark_x,o_landmark_y)]
        """              
        score_box=[] 
        landmark_box=[]
        
        scale_img=np.zeros((len(bounding_box),48,48,3))
        
        for idx,box in enumerate(bounding_box):
            scale_img[idx,:,:,:] = cv2.resize(img[int(box[1]):int(box[3]),int(box[0]):int(box[2]),:], (48, 48))
            
        bounding_boxes=onet_detector.predict(scale_img,scale=1,img_size=48,stride=8,threshold=self.threshold[2],boxes=bounding_box)
                    
        NMS_box=NMS(bounding_boxes,0.6)                    
                    
        if(len(NMS_box)==0):
            return None,None,None

        score_box,onet_box,landmark_box=self.calibrate_box(img,NMS_box)     

        return score_box,onet_box,landmark_box   
    
    
    def detect_face(self,images):    
        """
        used for detecting face in both batch images and single image
        
        input : images 
        output: face_boxes , landmark_boxes
        
        format of input  :
            img          : np.array() batch_size*single_img
            
        format of output : 
            face_boxes     : list of face_box      batch_size*[face_x1,face_x2,face_y1,face_y2]
            landmark_boxes : list of landmark_box  batch_size*[5*(landmark_x,landmark_y)]
        """
        sign=False 
        bounding_box=[]
        landmark_box=[]
        face_boxes=[]
        landmark_boxes=[]
        detect_begin=time.time()
        
        if(np.size(images.shape)==3):
            sign=True
            img=np.zeros((1,images.shape[0],images.shape[1],images.shape[2]))
            img[0,:,:,:]=images
            images=img 
            
        for img in images:

            if(img is None):
                face_boxes.append([])
                landmark_boxes.append([])     
                continue
            
            if self.pnet_model:
                pt=time.time()
                pnet_detector=Detector(self.pnet_model,self.model_path[0],self.batch_size)
                score_box,bounding_box,landmark_box=self.detect_Pnet(pnet_detector,img)
                
                print("pnet-time: ",time.time()-pt)
                if((bounding_box is None) or (len(bounding_box)==0)):
                    face_boxes.append([])
                    landmark_boxes.append([])                    
                    continue

            if self.rnet_model:
                rt=time.time()
                batch_size=len(bounding_box)
                rnet_detector=Detector(self.rnet_model,self.model_path[1],batch_size)                 
                score_box,bounding_box,landmark_box=self.detect_Rnet(rnet_detector,img,bounding_box)
                
                print("rnet-time: ",time.time()-rt)
                if((bounding_box is None) or (len(bounding_box)==0)):
                    face_boxes.append([])
                    landmark_boxes.append([])                    
                    continue
                   
            if self.onet_model:
                ot=time.time()
                batch_size=len(bounding_box)
                onet_detector=Detector(self.onet_model,self.model_path[2],batch_size)                
                score_box,bounding_box,landmark_box=self.detect_Onet(onet_detector,img,bounding_box)

                print("onet-time: ",time.time()-ot)                
                if((bounding_box is None) or (len(bounding_box)==0)):
                    face_boxes.append([])
                    landmark_boxes.append([])                    
                    continue

            face_boxes.append(bounding_box)
            landmark_boxes.append(landmark_box)
        
        print("detect-time: ",time.time()-detect_begin)
        if(sign):
            return face_boxes[0],landmark_boxes[0]
        else:
            return face_boxes,landmark_boxes
    
    
    def detect_single_face(self,img):    
        """
        used for detecting single face or vidio
        
        input : images 
        output: bounding_box , landmark_box
        
        format of input  :
            img          : np.array() 
            
        format of output : 
            bounding_box : list of box  [face_x1,face_x2,face_y1,face_y2]
            landmark_box : list of box  [5*(landmark_x,landmark_y)]
        """    
        bounding_box=[]
        landmark_box=[]     
        detect_begin=time.time()
        
        if(img is None):
            return [],[]            
        
        if self.pnet_model:
            pt=time.time()
            pnet_detector=Detector(self.pnet_model,self.model_path[0],self.batch_size)
            score_box,bounding_box,landmark_box=self.detect_Pnet(pnet_detector,img)
            
            print("pnet-time: ",time.time()-pt)
            if((bounding_box is None) or (len(bounding_box)==0)):
                return [],[]

        if self.rnet_model:
            rt=time.time()
            batch_size=len(bounding_box)
            rnet_detector=Detector(self.rnet_model,self.model_path[1],batch_size)                 
            score_box,bounding_box,landmark_box=self.detect_Rnet(rnet_detector,img,bounding_box)
            
            print("rnet-time: ",time.time()-rt)
            if((bounding_box is None) or (len(bounding_box)==0)):
                return [],[]
            
        if self.onet_model:
            ot=time.time()
            batch_size=len(bounding_box)
            onet_detector=Detector(self.onet_model,self.model_path[2],batch_size)                
            score_box,bounding_box,landmark_box=self.detect_Onet(onet_detector,img,bounding_box)

            print("onet-time: ",time.time()-ot)            
            if((bounding_box is None) or (len(bounding_box)==0)):
                return [],[]
        
        print("detect-time: ",time.time()-detect_begin)
            
        return bounding_box,landmark_box

采用如下代码测试:

from detection.mtcnn_detector import MTCNN_Detector
from core.MTCNN_model import Pnet_model,Rnet_model,Onet_model
import cv2
import matplotlib.pyplot as plt
import numpy as np
import os

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

def main():
    
    if(model_name in ["Pnet","Rnet","Onet"]):
        model[0]=Pnet_model
    if(model_name in ["Rnet","Onet"]):
        model[1]=Rnet_model
    if(model_name=="Onet"):
        model[2]=Onet_model
        
    detector=MTCNN_Detector(model,model_path,batch_size,factor,min_face_size,threshold)
    kaka=[]
    img=cv2.imread(r"C:\Users\312\Desktop\13.jpg") 
    kaka=np.zeros((1,img.shape[0],img.shape[1],img.shape[2]))
    kaka[0,:,:,:]=img

    face_box,_=detector.detect_single_face(img)

    blue = (255, 0, 0) 
    for a in face_box: 
        cv2.rectangle(img,(int(a[0]),int(a[1])), (int(a[2]), int(a[3])),blue,3,8,0)
        
#     cv2.imwrite("C:\\Users\\312\\Desktop\\MTCNN_test_6.jpg",img)
    cv2.imshow("MTCNN_test_1",img)
    cv2.waitKey(0)
#     cv2.destroyAllWindows()
#     plt.imshow(img)    
#     plt.show()    
    
    
if __name__=="__main__":
    
    factor=0.79
    
    model=[None,None,None]
    #
    threshold=[0.8,0.8,0.6]
    min_face_size=10
    #
    batch_size=1
    model_name="Onet"    
    
    model_path=["E:\\friedhelm\\object\\face_detection_MTCNN\\Pnet_model\\Pnet_model.ckpt-60000",
                "E:\\friedhelm\\object\\face_detection_MTCNN\\Rnet_model\\Rnet_model.ckpt-40000",
                "E:\\friedhelm\\object\\face_detection_MTCNN\\Onet_model\\Onet_model_60000.ckpt",] 
    

    main()

结果如下:

你们是不是以为效果还不错呢?!

我来打自己脸了~

7、讨论

弄东西不是弄出结果糊弄完就拉倒了,还是得讨论一波的~

1、在Pnet生成Rnet训练样本时,为了保证高召回率选择了低阈值,足以产生足够多的正负样本去训练Rnet,而且训练时也无问题。然而当按照原文使用Pnet+Rnet生成Onet训练样本时,输入给Pnet的数据依旧是WIDER_FACE_train数据集,已知Rnet的训练样本是Pnet的输出样本,这里就产生问题了,现在是我在拿Rnet的训练数据去输入给Rnet,相当于变相测试Rnet,这已经产生了严重的数据泄露问题。然而Rnet的作用它不要求泛化能力多么高,只是要求能对Pnet的结果进行力所能及的修正,其实不信的同学可以试一试单独跑一下Pnet和Pnet+Rnet,确实修正了很多误分类;

2、 本人使用的是联想拯救者R720笔记本,搭配GTX 1060 。测试阶段一张图片消耗时间主要取决于Pnet,Rnet与Onet平均0.5s即可计算完毕。一张图片需要3秒多的时间。Pnet时间多都是花在图像金字塔与反复调用模型上,原文中作者使用的GPU和CPU我这两年算是没有福气了,过一阵我会去隔壁屋蹭一下1080ti,看看时间是否会大幅度缩短;

3、召回率主要取决于threshold的取值,由于需要很高的召回率所以threshold取值很低,这就出现了很多的误检,Pnet+Rnet可以屏蔽很多的误检,然而自己训练的Onet作用远没有原文的大,主要原因是在生成Onet数据集时,Pnet+Rnet生成的负样本数太少,导致我需要使用WIDER_train数据集重新生成一些样本来保证样本数,需要着重训练的hard_neg样本仅仅1W+,而我拼凑的neg样本有50多W,这跟没有那些hard_neg样本没啥区别了。。对于漏检,我只能说尽力了,我不可能把阈值和最小人脸调的太低,最小人脸直接增加Pnet的负担,阈值增加NMS和Rnet的负担,虽然Rnet表示no problem~

4、本文没有显示landmark,主要原因是因为忘了。。后来补充识别发现实测landmark结果并不好,在Onet利用landmark样本充分训练人脸关键点的情况下还是这么差,个人觉得这与样本关系不太大,虽然原文中采用3个loss函数同时预测以达到正则的效果,但Onet已经调大了landmark的权重,影响不应该这么大。所以我考虑是否代码中有隐形bug,是否计算错误,接下来我会继续检查代码。

8、总结

在完成MTCNN复现后,随即开始人脸识别的工作,争取在暑假前搞完人脸识别系统,让实验室用上新鲜出炉的bug满天飞的,还给自己留后门的打卡软件~


2019.3.3


github地址:https://github.com/friedhelm739/MTCNN-tensorflow