6.6. scrapy-redis Official Source Code Analysis Reference: Scheduler


scheduler.py
This extension replaces the scheduler built into Scrapy (it is selected via the SCHEDULER variable in settings), and it is this extension that implements the distributed scheduling of the crawler. The data structures it relies on are the ones implemented in the queue module.

The two kinds of distribution that scrapy-redis provides, distributed crawling and distributed item processing, are implemented by the scheduler module and the pipelines module respectively; the other modules described earlier act as supporting components for these two.
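As a minimal sketch (not part of scheduler.py), this is how the scheduler is typically wired up in a project's settings.py. The setting names correspond to the ones read in from_settings later in this file; the concrete values and the Redis connection line are assumptions for illustration only.

# settings.py (illustrative sketch, not part of scheduler.py)

# Swap Scrapy's default scheduler for the redis-based one.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"

# Use the matching redis-based duplicate filter (same default as dupefilter_cls below).
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

# Keep the request queue and dupefilter in redis when the spider closes.
SCHEDULER_PERSIST = True

# Optional overrides, mirroring the keyword arguments of Scheduler.__init__:
# SCHEDULER_FLUSH_ON_START = False
# SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'
# SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
# SCHEDULER_IDLE_BEFORE_CLOSE = 0

# Redis connection, read by connection.from_settings() (value assumed).
# REDIS_URL = 'redis://localhost:6379'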

import importlib
import six

from scrapy.utils.misc import load_object

from . import connection

# TODO: add SCRAPY_JOB support.

class Scheduler(object):
    """Redis-based scheduler"""

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key='%(spider)s:requests',
                 queue_cls='scrapy_redis.queue.SpiderPriorityQueue',
                 dupefilter_key='%(spider)s:dupefilter',
                 dupefilter_cls='scrapy_redis.dupefilter.RFPDupeFilter',
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.
        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.
        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': spider.name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                             self.dupefilter_cls, e)

        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request

    def has_pending_requests(self):
        return len(self) > 0
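To make the key handling concrete, here is a small worked sketch of how open() expands the default key templates from __init__ with the spider's name; the spider name 'myspider' is an assumption used only for illustration.

# Illustrative only: how the default key templates are expanded in open().
queue_key = '%(spider)s:requests' % {'spider': 'myspider'}
dupefilter_key = '%(spider)s:dupefilter' % {'spider': 'myspider'}
print(queue_key)       # myspider:requests
print(dupefilter_key)  # myspider:dupefilter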

This file rewrites the Scheduler class to replace the original scheduler in scrapy.core.scheduler. The scheduling logic itself does not change much; the main difference is that Redis is used as the storage medium, so that all crawler instances can be scheduled in a unified way.

The scheduler is responsible for scheduling the requests of each spider. When it is initialized, it reads the queue and dupefilter classes from the settings file (normally the defaults shown above are used) and builds the keys used by the queue and the dupefilter (normally the spider name followed by ':requests' or ':dupefilter', so that different instances of the same spider share the same data in Redis).

Whenever a request is to be scheduled, enqueue_request is called and the scheduler uses the dupefilter to check whether the request has been seen before; if it has not, the request is pushed into the queue container (FIFO, LIFO, and priority queues are all available and can be chosen in settings, as sketched below). When the engine asks for the next request, next_request is called and the scheduler pops a request from the queue through its interface and hands it to the corresponding spider so that crawling can proceed.
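As a hedged sketch of that queue choice, SCHEDULER_QUEUE_CLASS selects which container from scrapy_redis.queue is loaded in open(). Only the priority queue path appears as a default in the code above; the FIFO and LIFO class names below are assumed to match this version of scrapy_redis.queue and may differ in other releases.

# settings.py (sketch): pick the request container loaded by the scheduler.

# Priority queue (the default used in __init__):
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'

# First-in first-out queue (assumed class name):
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderQueue'

# Last-in first-out stack (assumed class name):
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderStack'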