使用Docker部署scrapy-redis分布式爬虫

引言

在上篇使用Scrapy爬取知乎用户信息我们编写了一个单机的爬虫,这篇记录了使用Scrapy-Redis将其重写,使其具备分布式抓取的能力,并使用Docker部署到我们两台云server

为什么要分布式,显然单机的爬虫无论在机器的带宽还是ip等在爬取的时候都会有一定的限制,为了提高我们爬取的效率,我们需要编写一个可以在多台机器上同时运行的爬虫,在其爬取状态同步的同时,对我们想要的信息进行爬取。而Scrapy-Redisgithub上的一个开源项目。

为什么使用Docker,说到Docker可能很多人老早就听说过其在江湖上的传说,在我写这篇文章之前,也仅仅是对其有个模糊的了解,但是我昨天晚上在部署我的Project到我的两台服务器上的时候,我被基础环境的配置给弄疯了,之前我在阿里云服务器上安装了Py3,勉强可以支撑我之前的需求。

但是昨晚在安装Scrapy的时候,为了装一个Package,我几乎要把Py3、OpenSSL等组件重新安装一遍,在痛苦的挣扎之后,我想到了Docker(也许是瞄到了官方的例子中的Dockerfile),果不其然简单的google下就看到了大量的部署Scrapy的案例。

简单的看了下官方文档,没一会就将我的项目部署到了两台服务器上并正常运行,这太惊艳了!

原理

分布式爬虫原理

Scrapy-Redis其仅仅是一双飞翔的翅膀,它自身并没有爬虫能力,它仅仅是为Scrapy这个框架披上了一层铠甲。所以阅读本小节需要对Scrapy的工作原理有一定的了解,如不感兴趣可以跳过

试想如果我们自行编写一个分布式爬虫在多台主机上运行,则我们需要将爬虫的爬取队列进行共享,也就是说每一台主机都需要访问到一个共享的队列,然后我们的爬虫就是从队列中取一个request进行爬取,当然这些Scrapy-Redis都已经帮我们做好了,他可能需要做的是如下操作:

  • 初始化爬虫,创建一个Redis的客户端,连接上Redis
  • 查看请求队列是否为空,如果是空则等待,当请求的队列不为空,则从请求队列中拿出一个request
  • 获得request,经过scheduler调度后,engine会将request取出,送给downloader进行请求
  • 经过请求后,返回给EngineEngine将结果返回给用户写的爬虫,对结果进行处理,可能出现下一个request,也可能是item
  • 如果请求后得到的是一个request,则会通过scheduler再次调度,判断request是否重复,并将request放入请求队列
  • 如果经过得到了item,则Scrapy会将item交给我们的pipeline处理

可见,scrapy_redis就是将request调度的队列、请求的队列和获取的item放在了一个多台主机可以同时访问的redis的数据结构里

Scrapy-Redis源码分析

所谓请求分布式,就是使用redisrequest抓取队列和url去重集合进行管理,从而达到爬虫的状态同步。

以下就包含了Scrapy-Redis的源码结构,也就只有这么几个文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ tree ScrapyRedisZhihu/scrapy_redis/
ScrapyRedisZhihu/scrapy_redis/
├── connection.py
├── defaults.py
├── dupefilter.py
├── __init__.py
├── picklecompat.py
├── pipelines.py
├── queue.py
├── scheduler.py
├── spiders.py
└── utils.py
0 directories, 10 files

初始化爬虫,创建一个Redis的客户端,连接上Redis,查看SPIDER_NAME:start_urls请求队列是否为空,如果是空则等待,当请求的队列不为空,则从请求队列中拿出一个request

首先connection.pydefaults.py仅仅根据我们的setting.py配置文件,为爬虫提供一个reids client的实例。

而对于不同的Spider对上层都暴露出一个创建爬虫的接口,上层这个接口就是crawler_create_spider方法,调用这个方法干了一件事情:调用不同spiderfrom_crawler这个方法,无论我们是Spider还是RedisSpider。在不同的spiderfrom_crawler函数都会重写,原生Scrapy提供的from_crawler会创建出不同类型的爬虫并将其返回给Crawler,而我们一般都不会去重写这个method。而这个from_crawler会创建出不同类型的爬虫如zhihu_spider、lajou_spider等并将其返回给上层。

由于我们的爬虫是继承了RedisSpider这个类,RedisSpider继承了Scrapy原生的Spider类,RedisSpider主要重写了Spiderfrom_crawler类方法,Spiderfrom_crawler类方法起到了一个Constructor的作用,解耦了spider实例化的具体实现和上层调用。

1
2
3
4
5
6
class RedisSpider(RedisMixin, Spider):
@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler) # RedisMixin中的setup_redis
return obj

并注意,RedisSpider它采用了Python的多继承Mixin设计,他不仅继承了Spider还继承了RedisMixin,当我们RedisSpider初始化,它和原生Scrapy不同的是:调用setup_redis来初始化和redis的相关设置

1
2
3
4
5
6
class RedisMixin(object):
def setup_redis(self, crawler=None):
"""Setup redis connection and idle signal."""
self.server = connection.from_settings(crawler.settings)
# The idle signal is called when the spider has no requests left,
crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

setup_redis也只是做了两件事,根据我们的setting.py配置文件,初始化一个redis连接;和为爬虫设置一个信号,当spider空闲,会调用spider_idle这个方法,spider_idle则会将调用schedule_next_requests,而schedule_next_requests也只是调用next_requests如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class RedisMixin(object):
def spider_idle(self):
"""Schedules a request if available, otherwise waits."""
self.schedule_next_requests()
raise DontCloseSpider
def schedule_next_requests(self):
"""Schedules a request if available"""
for req in self.next_requests():
self.crawler.engine.crawl(req, spider=self)
def next_requests(self):
"""Returns a request to be scheduled or none."""
use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
fetch_one = self.server.spop if use_set else self.server.lpop
found = 0
while found < self.redis_batch_size:
data = fetch_one(self.redis_key)
if not data:
# Queue empty.
break
req = self.make_request_from_data(data)
if req:
yield req
found += 1
def make_request_from_data(self, data):
url = bytes_to_str(data, self.redis_encoding)
return self.make_requests_from_url(url)

由上我们可以知道当spider空闲的时候,会尝试在redis_key这个数据结构中尝试获得一个url,这个数据结构可以是默认的list也可以设置为set,设置为set则需要用REDIS_START_URLS_AS_SETsetting中进行设置。

如果获得一个url就进行一些decode处理,并进行请求,这里make_requests_from_url默认就是用的标准Spider类中的make_requests_from_url方法,其仅仅是return Request(url, dont_filter=True)

在如果没有url就会抛出DontCloseSpider这个异常,在上层engine._spider_idle早已将其捕获,并会再次重新调用spider_idle。这样就实现了一直尝试获得一个的request,没有则并进行等待并重新调度。

需要注意的是,在第一次调用spider_idle之前,爬虫会调用start_requests这个方法,这是由上层crawler中的crawl方法所决定的,这个方法也是Scrapy入口

1
2
3
4
5
6
7
8
@defer.inlineCallbacks
def crawl(self, *args, **kwargs):
assert not self.crawling, "Crawling already taking place"
self.spider = self._create_spider(*args, **kwargs)
self.engine = self._create_engine()
start_requests = iter(self.spider.start_requests())
yield self.engine.open_spider(self.spider, start_requests)
yield defer.maybeDeferred(self.engine.start)

在我们创建好spiderengine,就会调用start_requests,在原生的Spider类中这个函数会迭代start_urls这个列表,也就是我们熟悉的爬虫的入口url,而在RedisMixin中将其重写,在redis的数据结构中获得爬虫的起始页面。

1
2
def start_requests(self):
return self.next_requests()

所以,我们在使用RedisSpider类进行分布式爬虫编写的时候,就不需要定义start_urls这个列表,相反需要增加一个redis_key = 'zhihu_redis:start_urls',并且我们在运行爬虫的时候,需要手动在redis的数据结构内push/add一个逻辑上的start_url,使其被next_requests方法获得

start_requests方法被调用,将回到crawl调用engine.open_spider方法,这个方法会初始化scheduler并调用爬虫中间件spidermw.process_start_requests(start_requests, spider),之后会调用scheduler.open这个方法,这个方法会初始化我们的优先级队列和过滤器。并注册engine._next_requestreactor loop里面,当nextcall.schedule()调用,engine._next_request将会被调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def _next_request(self, spider):
while not self._needs_backout(spider):
if not self._next_request_from_scheduler(spider):
break #第一次执行,scheduler中没有request,会直接break
if slot.start_requests and not self._needs_backout(spider):
try:
request = next(slot.start_requests)
except StopIteration:
slot.start_requests = None
except Exception:
slot.start_requests = None
logger.error('Error while obtaining start requests',
exc_info=True, extra={'spider': spider})
else:
self.crawl(request, spider)
if self.spider_is_idle(spider) and slot.close_if_idle:
self._spider_idle(spider)
#对 RedisSpider 中的DontCloseSpider进行捕获,捕获到直接返回,回到`reactor loop`

由于第一次执行程序,redis数据结构中(SPIDER_NAME:start_urls)没有对应的url,在这个注册到reactor loop内的方法,调用了_spider_idleRedisSpider中的DontCloseSpider进行捕获。并直接返回到reactor loop,直到从redis中获得url,调用crawl

1
2
3
4
5
6
7
8
9
10
11
12
def crawl(self, request, spider):
assert spider in self.open_spiders, \
"Spider %r not opened when crawling: %s" % (spider.name, request)
self.schedule(request, spider)
self.slot.nextcall.schedule()
def schedule(self, request, spider):
self.signals.send_catch_log(signal=signals.request_scheduled,
request=request, spider=spider)
if not self.slot.scheduler.enqueue_request(request):
self.signals.send_catch_log(signal=signals.request_dropped,
request=request, spider=spider)

可以看到,在engine.cawl中调用了engine.schedule,这里终于调用了scheduler.enqueue_request,也就是scrapy_redis中的scheduler.py模块中的enqueue_request

经过scheduler调用dupfilter的判重,会将request送到优先级队列当中,送给downloader进行请求

scheduler在将request放入SPIDERNAME:requests这个队列时,关于requests去重,Scrapy-Redis还是使用原来的做法,提取指纹(hashlib.sha1),放入一个集合,但是Scrapy-Redis使用了Redis的集合,这样多台主机可以共享一个用于去重的集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
return False
self.queue.push(request)
return True
# dupfilter.py
def request_seen(self, request):
fp = self.request_fingerprint(request)
added = self.server.sadd(self.key, fp)
return added == 0
def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)
if request and self.stats:
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request

优先级调度队列scrapy_redis:requestsQueue.py中被定义的模块,对原生的队列进行了重写,使用Redislistzset分别实现了三种队列,LifoQueueFifoQueuePriorityQueue。默认使用的是PriorityQueue

request进入优先级队列之后,会调用nextcall.schedule,也就是会调用engine._next_request,当再次调用engine._next_request_from_scheduler,再调用scheduler.next_request就会将优先级队列中的的url取出。并进行下载了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def _next_request_from_scheduler(self, spider):
slot = self.slot
request = slot.scheduler.next_request()
if not request:
return
d = self._download(request, spider)
d.addBoth(self._handle_downloader_output, request, spider) #注册 _downloader 回调
d.addErrback(lambda f: logger.info('Error while handling downloader output',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
d.addBoth(lambda _: slot.remove_request(request))
d.addErrback(lambda f: logger.info('Error while removing request from slot',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
d.addBoth(lambda _: slot.nextcall.schedule())
d.addErrback(lambda f: logger.info('Error while scheduling new request',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
return d

经过请求后,response返回给EngineEngine对结果进行处理,可能出现下一个request,也可能解析出我们想要的数据,也就是item,也有可能是失败,如果请求后得到的是一个request,则会通过scheduler判断request是否重复,并将request放入请求队列

downloader返回的response交给我们的engine,engine会判断response的类型

1
2
3
4
5
6
7
8
9
10
11
12
def _handle_downloader_output(self, response, request, spider):
assert isinstance(response, (Request, Response, Failure)), response
# downloader middleware can return requests (for example, redirects)
if isinstance(response, Request):
self.crawl(response, spider)
return
# response is a Response or Failure
d = self.scraper.enqueue_scrape(response, request, spider)
d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
return d

如果是一个Request,如可能出现重定向等情况,则会再次调用crawl进入scheduler队列。如果是response或是错误则会调用scraper.enqueue_scrape(response, request, spider)。在这个方法内会经过一系列的调用、注册回调,会判断如果不是错误则会调用call_spider这个方法

1
2
3
4
5
def call_spider(self, result, request, spider):
result.request = request
dfd = defer_result(result)
dfd.addCallbacks(request.callback or spider.parse, request.errback)
return dfd.addCallback(iterate_spider_output)

可以看到在这个方法内,addCallbacks注册回调方法,取得request.callback,如果未定义则调用Spiderparse方法,也就是我们自己爬虫zhihu_spider的逻辑,在我们的逻辑处理后会出现两种,一种就是item,一种就是request,并将其yield,此时会调用在_scrape中注册的回调函数handle_spider_outputspider返回的数据进行处理。在这个方法内,有调用了_process_spidermw_output对我们的yield出去的数据进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def _process_spidermw_output(self, output, request, response, spider):
"""Process each Request/Item (given in the output parameter) returned
from the given spider
"""
if isinstance(output, Request):
self.crawler.engine.crawl(request=output, spider=spider)
elif isinstance(output, (BaseItem, dict)):
self.slot.itemproc_size += 1
dfd = self.itemproc.process_item(output, spider)
dfd.addBoth(self._itemproc_finished, output, response, spider)
return dfd
elif output is None:
pass
else:
typename = type(output).__name__
logger.error('Spider must return Request, BaseItem, dict or None, '
'got %(typename)r in %(request)s',
{'request': request, 'typename': typename},
extra={'spider': spider})

此时可以看到,在_process_spidermw_outputoutput进行了类型判断,如果是Request,则调用engine.crawl,进入scheduler的处理检验指纹并加入优先级队列等等,如果是BaseItem则会调用pipelineprocess_item方法

经过spider逻辑处理后,会有两种结果,可能出现下一个request,也可能解析出我们想要的数据,也就是item

Scrapy-redispipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class RedisPipeline(object):
def __init__(self, server,
key=defaults.PIPELINE_KEY,
serialize_func=default_serialize):
self.server = server #Redis client instance.
self.key = key # redis key ,value stored serialized key
self.serialize = serialize_func #Items serializer function
@classmethod
def from_crawler(cls, crawler):
return cls.from_settings(crawler.settings)
#init pipeline from setting
def process_item(self, item, spider):
return deferToThread(self._process_item, item, spider)
# twisted 异步线程的接口,将item的处理变为异步的,防止源源不断的item处理阻塞
def _process_item(self, item, spider):
key = self.item_key(item, spider)
data = self.serialize(item)
self.server.rpush(key, data)
return item
def item_key(self, item, spider):
return self.key % {'spider': spider.name}

遵循Scrapy pipeline的写法,在_process_spidermw_output被调用,只是将Scrapy处理得到的item经过序列化之后放入Redis的列表。

这样造成的结果就是爬取到的数据都放入到了Redis,如果我们想和之前标准的scrapy一样将数据放入其他的数据库或做一些处理,我们需要重新编写一个处理item的一个文件

提醒
虽然scrapy-redis源码组成简单,但是由于scrapy-redisScrapy的源码依赖过多,需要更加细致的解读只能读者自行阅读Scrapy的源码。关于Scrapy-Redis的具体使用可以查阅源码和官方仓库中的setting.py文件中的注释。

实现

修改setting.py文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# Ensure all spiders share same duplicates filter through redis.
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
ITEM_PIPELINES = {
# 'ScrapyRedisZhihu.pipelines.MysqlTwsitedPipeline': 300,
'scrapy_redis.pipelines.RedisPipeline': 301
}
DOWNLOADER_MIDDLEWARES = {
'scrapy.contrib.downloadermiddleware.useragent.UserAgentMiddleware' : None,
'ScrapyRedisZhihu.middlewares.ProxyMiddleware' :400
}
DOWNLOAD_DELAY = 0.4
MYSQL_HOST='x.x.x.x'
MYSQL_NAME='crawler'
MYSQL_USER='root'
MYSQL_PASS='xxxxxx'
REDIS_HOST='x.x.x.x'
REDIS_PORT=9999

注意
我们将SCHEDULERDUPEFILTER_CLASSITEM_PIPELINES都设置为Scrapy-Redis提供的。其中SCHEDULER被重新实现,使用Redislistzset分别实现了三种队列,LifoQueueFifoQueuePriorityQueue

dupefilter使用了redisset

ITEM_PIPELINES也将item序列化后放入了redislist。它不是必须的,但是我想尝试下Redis的威力,所以使用了这个pipeline,如果使用RedisPipeline的话,我们需要重新编写一个文件来处理序列化(默认为json)到Redis里面的Item,通常命名为process_item.py

修改zhihu_spider.py文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import scrapy
import json
from ScrapyRedisZhihu.items import ZhihuUserItem,ZhihuUserItemLoader
from datetime import datetime
from scrapy_redis.spiders import RedisSpider
following_api = "https://www.zhihu.com/api/v4/members/{}/followees?include=data[*].gender%2Cvoteup_count%2Cthanked_Count%2Cfollower_count%2Cfollowing_count%2Canswer_count%2Carticles_count%2Cfavorite_count%2Cfavorited_count%2Cthanked_count%2Cbadge[%3F(type%3Dbest_answerer)].topics&offset=0&limit=20"
class ZhihuSpider(RedisSpider):
name = 'zhihu_redis'
allowed_domains = ["www.zhihu.com"]
redis_key = 'zhihu_redis:start_urls'
def parse(self, response):
json_response = json.loads(response.body_as_unicode())
if not json_response['paging']['is_end']:
yield scrapy.Request(url=json_response['paging']['next'])
if json_response['data']:
for data in json_response['data']:
url_token = data.get('url_token')
if url_token:
yield scrapy.Request(url=following_api.format(url_token))
agreed_count = data['voteup_count']
thxd_count = data['thanked_count']
collected_count = data['favorited_count']
if thxd_count or collected_count:
item_loader = ZhihuUserItemLoader(item=ZhihuUserItem(), response=response)
item_loader.add_value('name',data['name'])
item_loader.add_value('id',data['id'])
item_loader.add_value('url_token',data['url_token'])
item_loader.add_value('headline',data['headline']
if data['headline'] else "无")
item_loader.add_value('answer_count',data['answer_count'])
item_loader.add_value('articles_count',data['articles_count'])
item_loader.add_value('gender',data['gender']
if data['gender'] else 0)
item_loader.add_value('avatar_url',data['avatar_url_template'].format(size='xl'))
item_loader.add_value('user_type',data['user_type'])
item_loader.add_value('badge',','.join([badge.get('description') for badge in data['badge']])
if data.get('badge') else "无")
item_loader.add_value('follower_count',data['follower_count'])
item_loader.add_value('following_count',data['following_count'])
item_loader.add_value('agreed_count',agreed_count)
item_loader.add_value('thxd_count',thxd_count)
item_loader.add_value('collected_count',collected_count)
# item_loader.add_value('craw_time',datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
zhihu_item = item_loader.load_item()
yield zhihu_item

即可

使用Docker部署

关于Docker的安装和基本使用这里就不再赘述,后期应该会针对写一篇关于容器的文章。

搭建私有仓库

为了将我们的容器上传到云服务器上而不是一个服务器一个服务器的打包,我们需要搭建一个私有的docker仓库,首先我们需要运行下面命令获取registry镜像
sudo docker pull registry:2.1.1

然后启动一个容器

1
sudo docker run -d -v /opt/registry:/var/lib/registry -p 5000:5000 --restart=always --name registry registry:2.1.1

Registry服务默认会将上传的镜像保存在容器的/var/lib/registry,我们将主机的/opt/registry目录挂载到该目录,即可实现将镜像保存

运行sudo docker ps保证registry镜像启动的容器正在运行即可

打包部署

在项目根目录生成依赖文件:pip freeze > requirements.txt

编写Dockerfile文件,和main.py在同一个目录下。

1
2
3
4
5
6
7
8
9
10
11
12
13
FROM hub.c.163.com/sportscool/python3
MAINTAINER zhxfei <dylan@zhxfei.com>
ENV PATH /usr/bin:$PATH
ADD . /code
WORKDIR /code
RUN pip install -r requirements.txt -i https://pypi.douban.com/simple
CMD scrapy crawl zhihu_redis

生成镜像: sudo docker build -t IP:PORT/NAME .

这个IP:PORT应为私有仓库的registry服务监听的地址和端口,启动-t就已经制定了镜像对应的仓库

push镜像到私有仓库: sudo docker push IP:PORT/NAME

使用curl来确认是否上传:

1
2
zhxfei@HP-ENVY:~/just4fun$ curl 120.x.x.x:5000/v2/_catalog
{"repositories":["scrapy_redis"]}

有之前生成的NAME即可,如此处应为scrapy_redis

之后在每台云服务器上运行sudo docker pull IP:PORT/NAME 将镜像下载到本地,再启动一个容器即可,由于打包镜像时指定了CMD,运行镜像即会运行python main.py,爬虫就已经启动了

当然,也可以使用一些自动化运维的工具,在本机上远程批量执行命令,如Ansible,使用

1
sudo ansible ALL_HOSTS -m shell -a "docker pull IP:PORT/NAME && docker run -d -P IP:PORT/NAME" --sudo

坚持原创技术分享,您的支持将鼓励我继续创作!