实验要求
使用多线程爬虫技术,爬取目标网站中的图片并保存到本地。
代码实现
使用生产者消费者的设计模式 + 多线程技术。
首先要明确地概念:
- 线程与线程之间是轮流执行的,每个线程都有一个时间片;
- 主线程就是整个函数的执行顺序;
join
函数的意思是让调用者优先执行,也可以理解为线程阻塞,即阻塞其他线程,直到该线程执行完毕或者终止。
""" @author: shoo Wang @contact: wangsuoo@foxmail.com @file: demo02.py @time: 2020/5/19 0019 """
# 多线程爬虫
import requests as req
from lxml import etree
import urllib
import threading
import time
lock = threading.Lock()
IMG_LIST = []
URL_LIST = []
# 生成爬取目标网站的链接
def getUrlList(num):
base_url = 'https://www.doutula.com/photo/list/?page='
for i in range(1, num + 1):
x = base_url + str(i)
URL_LIST.append(x)
# 获取网站的文本数据 url 为单个页面
def product():
while URL_LIST:
# 拿到这一页的地址
url = URL_LIST.pop()
html = etree.HTML(urlToText(url))
img_url = html.xpath('//div[@class="page-content text-center"]//img/@data-original')
# 获取线程锁
lock.acquire()
print('生产者将一页中的图片地址放进资源池')
for img in img_url:
# 对于每一页中的每一个图片都把他放进去
IMG_LIST.append(img)
lock.release()
# 将页的地址转为 text 格式
def urlToText(url):
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.116 Safari/537.36'}
resp = req.get(url, headers=header)
resp.encoding = 'utf-8'
return resp.text
def customer():
global flag
while True:
if len(IMG_LIST) is 0:
pass
else:
# 获取线程锁
lock.acquire()
img = IMG_LIST.pop()
# 改完之后释放锁
lock.release()
fileName = img.split('/')[-1]
path = './image/' + fileName
urllib.request.urlretrieve(img, path)
def run_time(func):
def wrapper(*args, **kwargs):
start = time.time()
func(*args, **kwargs)
end = time.time()
print('爬取成功! 请查阅当前目录下的 image 文件夹, 耗时: ', end-start, 's')
return wrapper()
@run_time
def run():
# 爬取的页数
getUrlList(2)
thrPros = []
thrCuss = []
for i in range(2):
thrPro = threading.Thread(target=product)
thrPro.start()
thrPros.append(thrPro)
for i in range(8):
thrCus = threading.Thread(target=customer)
thrCus.start()
thrCuss.append(thrCus)
for thPro in thrPros:
thPro.join()
for thCus in thrCuss:
thCus.join()
程序执行,过一会之后图片会爬取成功,但是这里是有问题的,下面会分析。
问题分析
这一段代码有问题:
def customer():
global flag
while True:
if len(IMG_LIST) is 0:
pass
else:
# 获取线程锁
lock.acquire()
img = IMG_LIST.pop()
# 改完之后释放锁
lock.release()
fileName = img.split('/')[-1]
path = './image/' + fileName
urllib.request.urlretrieve(img, path)
我们写这段代码的初衷是因为刚开始的时候 IMG_LIST
可能是 0 ,因为生产者还没来得及做,所以让消费者等一下生产者,所以 continue
,但是设想一下到最后的时候,消费者已经消耗完所有的 IMG_LIST
资源了。IMG_LIST
确实为空了,他还在这里不停的循环。
问题解决
我们可以添加一个 falg
变量来记录生产者是不是生产完了。刚开始的时候设置为 0
,这样就还是 continue
。
只需要保证在设置 flag
变量的时候,IMG_LIST
不为空即可,因为如果他不为空,那么无论 flag
的值为何值都不会影响程序的执行,因为这个 if
语句进不去:
那我们如何控制呢?
因为在多线程执行的时候,我们无法保证 IMG_LIST
不为空,因为当生产者刚刚生产完了资源放进资源池中,可能瞬间就被消费者消耗掉了,所以我们要做的就是让其它线程先等生产者执行完了再执行,这个时候就要使用 join
方法了,他的意思是让调用它的线程独占 CPU
资源,只有等他执行完了其他线程才能执行。
所以主线程也被阻塞了,当消费者线程的 join
方法结束的时候,我们肯定可以保证 IMG_LIST
不为空,所以可以放心的将 flag
置为1
,这样的话如果下一次消费者消耗完了资源池中的数据,就说明所有的生产者已经生产完了,然后又被消耗完了,所以是真的没有了,那么这个 flag
就起作用了。
最终答案
""" @author: shoo Wang @contact: wangsuoo@foxmail.com @file: demo02.py @time: 2020/5/19 0019 """
# 多线程爬虫
import requests as req
from lxml import etree
import urllib
import threading
import time
lock = threading.Lock()
IMG_LIST = []
URL_LIST = []
flag = 0
# 生成爬取目标网站的链接
def getUrlList(num):
base_url = 'https://www.doutula.com/photo/list/?page='
for i in range(1, num + 1):
x = base_url + str(i)
URL_LIST.append(x)
# 获取网站的文本数据 url 为单个页面
def product():
while URL_LIST:
# 拿到这一页的地址
url = URL_LIST.pop()
html = etree.HTML(urlToText(url))
img_url = html.xpath('//div[@class="page-content text-center"]//img/@data-original')
# 获取线程锁
lock.acquire()
print('生产者将一页中的图片地址放进资源池')
for img in img_url:
# 对于每一页中的每一个图片都把他放进去
IMG_LIST.append(img)
lock.release()
# 将页的地址转为 text 格式
def urlToText(url):
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.116 Safari/537.36'}
resp = req.get(url, headers=header)
resp.encoding = 'utf-8'
return resp.text
def customer():
global flag
while True:
if len(IMG_LIST) is 0:
if flag:
print('资源池中的数据已取完')
break
pass
else:
# 获取线程锁
lock.acquire()
img = IMG_LIST.pop()
# 改完之后释放锁
lock.release()
fileName = img.split('/')[-1]
path = './image/' + fileName
urllib.request.urlretrieve(img, path)
def run_time(func):
def wrapper(*args, **kwargs):
start = time.time()
func(*args, **kwargs)
end = time.time()
print('爬取成功! 请查阅当前目录下的 image 文件夹, 耗时: ', end-start, 's')
return wrapper()
@run_time
def run():
# 爬取的页数
getUrlList(2)
thrPros = []
thrCuss = []
for i in range(2):
thrPro = threading.Thread(target=product)
thrPro.start()
thrPros.append(thrPro)
for i in range(8):
thrCus = threading.Thread(target=customer)
thrCus.start()
thrCuss.append(thrCus)
for thPro in thrPros:
thPro.join()
''' ? 为什么 flag 要放在这里: - 他前面的是生产者线程的join方法,该方法执行完了则代表生产者线程肯定全部执行完了 - 因为 join 相当于线程阻塞,只执行该线程,不执行其他线程 - 肯定执行完了之后那 IMG 中肯定有数据,那么同时在执行的消费者肯定可以直接获取到 - 如果这个时候还是空,那只有可能是他自己消耗完了,因为人家刚刚给你制造了那么多 IMG,不可能没有 - 如果放在生产者的 join 方法前面的话,则可能生产者还没来得及制造资源 '''
global flag
flag = 1
for thCus in thrCuss:
thCus.join()
时间分析
下面是我以爬取10页数据做的实验,使用控制变量法:
- 图一和图二是在保持消费者的线程数不变的情况下,改变生产者的线程数量
从2到5
,通过结果可以发现,两者的时间几乎没有差别,仅仅是少了0.04
秒,但是这说明真正制约时间的不是生产者; - 图二和图三是在保持生产者的线程数不变的情况下,改变消费者的线程数量
从8到4
,降了一半,通过结果可以发现,两者的时间相差很大后者为前者的3倍
时间,这说明真正限制时间的是消费者的执行速度,所以应该相对于生产者分配更多的线程数量。