实验要求

使用多线程爬虫技术,爬取目标网站中的图片并保存到本地。

目标网站:https://www.doutula.com/

代码实现

使用生产者消费者的设计模式 + 多线程技术。

首先要明确地概念:

  • 线程与线程之间是轮流执行的,每个线程都有一个时间片;
  • 主线程就是整个函数的执行顺序;
  • join 函数的意思是让调用者优先执行,也可以理解为线程阻塞,即阻塞其他线程,直到该线程执行完毕或者终止。
""" @author: shoo Wang @contact: wangsuoo@foxmail.com @file: demo02.py @time: 2020/5/19 0019 """

# 多线程爬虫
import requests as req
from lxml import etree
import urllib
import threading
import time

lock = threading.Lock()
IMG_LIST = []
URL_LIST = []


# 生成爬取目标网站的链接
def getUrlList(num):
    base_url = 'https://www.doutula.com/photo/list/?page='
    for i in range(1, num + 1):
        x = base_url + str(i)
        URL_LIST.append(x)


# 获取网站的文本数据 url 为单个页面
def product():
    while URL_LIST:
        # 拿到这一页的地址
        url = URL_LIST.pop()
        html = etree.HTML(urlToText(url))
        img_url = html.xpath('//div[@class="page-content text-center"]//img/@data-original')
        # 获取线程锁
        lock.acquire()
        print('生产者将一页中的图片地址放进资源池')
        for img in img_url:
            # 对于每一页中的每一个图片都把他放进去
            IMG_LIST.append(img)
        lock.release()


# 将页的地址转为 text 格式
def urlToText(url):
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.116 Safari/537.36'}
    resp = req.get(url, headers=header)
    resp.encoding = 'utf-8'
    return resp.text


def customer():
    global flag
    while True:
        if len(IMG_LIST) is 0:
            pass
        else:
            # 获取线程锁
            lock.acquire()
            img = IMG_LIST.pop()
            # 改完之后释放锁
            lock.release()
            fileName = img.split('/')[-1]
            path = './image/' + fileName
            urllib.request.urlretrieve(img, path)


def run_time(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        func(*args, **kwargs)
        end = time.time()
        print('爬取成功! 请查阅当前目录下的 image 文件夹, 耗时: ', end-start, 's')
    return wrapper()


@run_time
def run():
    # 爬取的页数
    getUrlList(2)
    thrPros = []
    thrCuss = []
    for i in range(2):
        thrPro = threading.Thread(target=product)
        thrPro.start()
        thrPros.append(thrPro)
    for i in range(8):
        thrCus = threading.Thread(target=customer)
        thrCus.start()
        thrCuss.append(thrCus)
    for thPro in thrPros:
        thPro.join()
    for thCus in thrCuss:
        thCus.join()

程序执行,过一会之后图片会爬取成功,但是这里是有问题的,下面会分析。

问题分析

这一段代码有问题:

def customer():
    global flag
    while True:
        if len(IMG_LIST) is 0:
            pass
        else:
            # 获取线程锁
            lock.acquire()
            img = IMG_LIST.pop()
            # 改完之后释放锁
            lock.release()
            fileName = img.split('/')[-1]
            path = './image/' + fileName
            urllib.request.urlretrieve(img, path)

我们写这段代码的初衷是因为刚开始的时候 IMG_LIST 可能是 0 ,因为生产者还没来得及做,所以让消费者等一下生产者,所以 continue,但是设想一下到最后的时候,消费者已经消耗完所有的 IMG_LIST 资源了。IMG_LIST 确实为空了,他还在这里不停的循环。

问题解决

我们可以添加一个 falg 变量来记录生产者是不是生产完了。刚开始的时候设置为 0 ,这样就还是 continue

只需要保证在设置 flag 变量的时候,IMG_LIST 不为空即可,因为如果他不为空,那么无论 flag 的值为何值都不会影响程序的执行,因为这个 if 语句进不去:

那我们如何控制呢?

因为在多线程执行的时候,我们无法保证 IMG_LIST 不为空,因为当生产者刚刚生产完了资源放进资源池中,可能瞬间就被消费者消耗掉了,所以我们要做的就是让其它线程先等生产者执行完了再执行,这个时候就要使用 join 方法了,他的意思是让调用它的线程独占 CPU 资源,只有等他执行完了其他线程才能执行。

所以主线程也被阻塞了,当消费者线程的 join 方法结束的时候,我们肯定可以保证 IMG_LIST 不为空,所以可以放心的将 flag 置为1,这样的话如果下一次消费者消耗完了资源池中的数据,就说明所有的生产者已经生产完了,然后又被消耗完了,所以是真的没有了,那么这个 flag 就起作用了。

最终答案

""" @author: shoo Wang @contact: wangsuoo@foxmail.com @file: demo02.py @time: 2020/5/19 0019 """

# 多线程爬虫
import requests as req
from lxml import etree
import urllib
import threading
import time

lock = threading.Lock()
IMG_LIST = []
URL_LIST = []
flag = 0


# 生成爬取目标网站的链接
def getUrlList(num):
    base_url = 'https://www.doutula.com/photo/list/?page='
    for i in range(1, num + 1):
        x = base_url + str(i)
        URL_LIST.append(x)


# 获取网站的文本数据 url 为单个页面
def product():
    while URL_LIST:
        # 拿到这一页的地址
        url = URL_LIST.pop()
        html = etree.HTML(urlToText(url))
        img_url = html.xpath('//div[@class="page-content text-center"]//img/@data-original')
        # 获取线程锁
        lock.acquire()
        print('生产者将一页中的图片地址放进资源池')
        for img in img_url:
            # 对于每一页中的每一个图片都把他放进去
            IMG_LIST.append(img)
        lock.release()


# 将页的地址转为 text 格式
def urlToText(url):
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.116 Safari/537.36'}
    resp = req.get(url, headers=header)
    resp.encoding = 'utf-8'
    return resp.text


def customer():
    global flag
    while True:
        if len(IMG_LIST) is 0:
            if flag:
                print('资源池中的数据已取完')
                break
            pass
        else:
            # 获取线程锁
            lock.acquire()
            img = IMG_LIST.pop()
            # 改完之后释放锁
            lock.release()
            fileName = img.split('/')[-1]
            path = './image/' + fileName
            urllib.request.urlretrieve(img, path)


def run_time(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        func(*args, **kwargs)
        end = time.time()
        print('爬取成功! 请查阅当前目录下的 image 文件夹, 耗时: ', end-start, 's')
    return wrapper()


@run_time
def run():
    # 爬取的页数
    getUrlList(2)
    thrPros = []
    thrCuss = []
    for i in range(2):
        thrPro = threading.Thread(target=product)
        thrPro.start()
        thrPros.append(thrPro)
    for i in range(8):
        thrCus = threading.Thread(target=customer)
        thrCus.start()
        thrCuss.append(thrCus)
    for thPro in thrPros:
        thPro.join()
    ''' ? 为什么 flag 要放在这里: - 他前面的是生产者线程的join方法,该方法执行完了则代表生产者线程肯定全部执行完了 - 因为 join 相当于线程阻塞,只执行该线程,不执行其他线程 - 肯定执行完了之后那 IMG 中肯定有数据,那么同时在执行的消费者肯定可以直接获取到 - 如果这个时候还是空,那只有可能是他自己消耗完了,因为人家刚刚给你制造了那么多 IMG,不可能没有 - 如果放在生产者的 join 方法前面的话,则可能生产者还没来得及制造资源 '''
    global flag
    flag = 1
    for thCus in thrCuss:
        thCus.join()

时间分析

下面是我以爬取10页数据做的实验,使用控制变量法:

  1. 图一和图二是在保持消费者的线程数不变的情况下,改变生产者的线程数量从2到5,通过结果可以发现,两者的时间几乎没有差别,仅仅是少了0.04秒,但是这说明真正制约时间的不是生产者;
  2. 图二和图三是在保持生产者的线程数不变的情况下,改变消费者的线程数量从8到4,降了一半,通过结果可以发现,两者的时间相差很大后者为前者的3倍时间,这说明真正限制时间的是消费者的执行速度,所以应该相对于生产者分配更多的线程数量。