专栏名称: 编程派
Python程序员都在看的公众号,跟着编程派一起学习Python,看最新国外教程和资源!
目录
相关文章推荐
Python中文社区  ·  用 Python 精准捕捉市场趋势反转信号 ·  3 天前  
Python开发者  ·  真的建议马上拿下软考证书!(特大红利期) ·  6 天前  
Python爱好者社区  ·  王者归来!LSTM 终于爆发了。。。 ·  6 天前  
Python爱好者社区  ·  当了leader才发现,大厂最想裁掉的,不是 ... ·  4 天前  
Python爱好者社区  ·  突发:美国密歇根大学终止与上海交大合作! ·  5 天前  
51好读  ›  专栏  ›  编程派

如何构建一个分布式爬虫:理论篇

编程派  · 公众号  · Python  · 2017-06-12 11:31

正文

本文已获作者授权。

文 | resolvewang

前言

本系列文章计划分三个章节进行讲述,分别是理论篇、基础篇和实战篇。理论篇主要为构建分布式爬虫而储备的理论知识,基础篇会基于理论篇的知识写一个简易的分布式爬虫,实战篇则会以微博为例,教大家做一个比较完整且足够健壮的分布式微博爬虫。通过这三篇文章,希望大家能掌握如何构建一个分布式爬虫的方法;能举一反三,将 celery 用于除爬虫外的其它场景。目前基本上的博客都是教大家使用 scrapyd或者 scrapy-redis构建分布式爬虫,本系列文章会从另外一个角度讲述如何用requests celery构建一个健壮的、可伸缩并且可扩展的分布式爬虫。

本系列文章属于爬虫进阶文章,期望受众是具有一定 Python 基础知识和编程能力、有爬虫经验并且希望提升自己的同学。小白要是感兴趣,也可以看看,看不懂的话,可以等有了一定基础和经验后回过头来再看。

另外一点说明,本系列文章不是旨在构建一个分布式爬虫框架或者分布式任务调度框架,而是利用现有的分布式任务调度工具来实现分布式爬虫,所以请轻喷。

分布式爬虫概览

  1. 何谓分布式爬虫? 
    通俗的讲,分布式爬虫就是多台机器多个 spider 对多个 url 的同时处理问题,分布式的方式可以极大提高程序的抓取效率。

  2. 构建分布式爬虫通畅需要考虑的问题 
    (1)如何能保证多台机器同时抓取同一个 URL? 
    (2)如果某个节点挂掉,会不会影响其它节点,任务如何继续? 
    (3)既然是分布式,如何保证架构的可伸缩性和可扩展性?不同优先级的抓取任务如何进行资源分配和调度?

基于上述问题,我选择使用 celery作为分布式任务调度工具,是分布式爬虫中任务和资源调度的核心模块。它会把所有任务都通过消息队列发送给各个分布式节点进行执行,所以可以很好的保证 url 不会被重复抓取;它在检测到 worker 挂掉的情况下,会尝试向其他的 worker 重新发送这个任务信息,这样第二个问题也可以得到解决;celery 自带任务路由,我们可以根据实际情况在不同的节点上运行不同的抓取任务(在实战篇我会讲到)。本文主要就是带大家了解一下 celery 的方方面面(有 celery 相关经验的同学和大牛可以直接跳过了)

Celery 知识储备

celery 基础讲解

按 celery 官网的介绍来说

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。

下面几个关于 celery 的核心知识点

  1. broker:翻译过来叫做中间人。它是一个消息传输的中间件,可以理解为一个邮箱。每当应用程序调用 celery 的异步任务的时候,会向 broker 传递消息,而后 celery 的 worker 将会取到消息,执行相应程序。这其实就是消费者和生产者之间的桥梁。

  2. backend: 通常程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery 实现了一个 backend,用于存储这些消息以及 celery 执行的一些消息和结果。

  3. worker: Celery 类的实例,作用就是执行各种任务。注意在 celery3.1.25 后 windows 是不支持 celery worker 的!

  4. producer: 发送任务,将其传递给 broker

  5. beat: celery 实现的定时任务。可以将其理解为一个 producer,因为它也是通过网络调用定时将任务发送给 worker 执行。注意在 windows 上 celery 是不支持定时任务的!

下面是关于 celery 的架构示意图,结合上面文字的话应该会更好理解

celery 架构示意图

由于 celery 只是任务队列,而不是真正意义上的消息队列,它自身不具有存储数据的功能,所以 broker 和 backend 需要通过第三方工具来存储信息,celery 官方推荐的是 RabbitMQ和 Redis,另外 mongodb 等也可以作为 broker 或者 backend,可能不会很稳定,我们这里选择 Redis 作为 broker 兼 backend。

关于 redis 的安装和配置可以查看 这里

实际例子

先安装 celery

pip install celery

我们以官网给出的例子来做说明,并对其进行扩展。首先在项目根目录下,这里我新建一个项目叫做 celerystudy,然后切换到该项目目录下,新建文件 tasks.py,然后在其中输入下面代码

  1. from celery import Celery

  2. app = Celery('tasks', broker='redis://:''@223.129.0.190:6379/2', backend='redis://:''@223.129.0.190:6379/3')

  3. @app.task

  4. def add(x, y):

  5.    return x   y

这里我详细讲一下代码:我们先通过 app=Celery() 来实例化一个 celery 对象,在这个过程中,我们指定了它的 broker,是 redis 的 db 2,也指定了它的 backend,是 redis 的 db3, broker 和 backend 的连接形式大概是这样

redis://:password@hostname:port/db_number

然后定义了一个 add 函数,重点是 @app.task,它的作用在我看来就是将 add() 
注册为一个类似服务的东西
,本来只能通过本地调用的函数被它装饰后,就可以通过网络来调用。这个 tasks.py 中的 app 就是一个 worker。它可以有很多任务,比如这里的任务函数 add。我们再通过在命令行切换到项目根目录,执行

celery -A tasks worker -l info

启动成功后就是下图所示的样子

celery 的 worker 启动成功

这里我说一下各个参数的意思, -A 指定的是 app(即 Celery 实例) 所在的文件模块,我们的 app 是放在 tasks.py 中,所以这里是 tasks;worker 表示当前以 worker 的方式运行,难道还有别的方式?对的,比如运行定时任务就不用指定 worker 这个关键字; -l info 表示该 worker 节点的日志等级是 info,更多关于启动 worker 的参数(比如 -c、 -Q 等常用的) 请使用

celery worker --help

进行查看

将 worker 启动起来后,我们就可以通过网络来调用 add 函数了。我们在后面的分布式爬虫构建中也是采用这种方式分发和消费 url 的。在命令行先切换到项目根目录,然后打开 python 交互端

from tasks import add 
rs = add.delay(2, 2) # 这里的 add.delay 就是通过网络调用将任务发送给 add 所在的 worker 执行

这个时候我们可以在 worker 的界面看到接收的任务和计算的结果。

[2017-05-19 14:22:43,038: INFO/MainProcess] Received task: tasks.add[c0dfcd0b-d05f-4285-b944-0a8aba3e7e61] # worker 接收的任务

[2017-05-19 14:22:43,065: INFO/MainProcess] Task tasks.add[c0dfcd0b-d05f-4285-b944-0a8aba3e7e61] succeeded in 0.025274309000451467s: 4 # 执行结果

这里是异步调用,如果我们需要返回的结果,那么要等 rs 的 ready 状态 true 才行。这里 add 看不出效果,不过试想一下,如果我们是调用的比较占时间的 io 任务,那么异步任务就比较有价值了

rs #

rs.ready() # true 表示已经返回结果了

rs.status # 'SUCCESS' 任务执行状态,失败还是成功

rs.successful() # True 表示执行成功

rs.result # 4 返回的结果

rs.get() # 4 返回的结果

#这里我们 backend 结果存储在 redis 里

上面讲的是从 Python 交互终端中调用 add 函数,如果我们要从另外一个 py 文件调用呢?除了通过 import 然后 add.delay() 这种方式,我们还可以通过 send_task() 这种方式,我们在项目根目录另外新建一个 py 文件叫做 excute_tasks.py,在其中写下如下的代码

  1. from tasks import add

  2. if __name__ == '__main__':

  3.    add.delay(5, 10)

这时候可以在 celery 的 worker 界面看到执行的结果

  1. [2017-05-19 14:25:48,039: INFO/MainProcess] Received task: tasks.add[f5ed0d5e-a337-45a2-a6b3-38a58efd9760]

  2. [2017-05-19 14:25:48,074: INFO/MainProcess] Task tasks.add[f5ed0d5e-a337-45a2-a6b3-38a58efd9760] succeeded in 0.03369094600020617s: 15

此外,我们还可以通过 send_task() 来调用,将 excute_tasks.py 改成这样

  1. from tasks import app

  2. if __name__ == '__main__':

  3.    app.send_task('tasks.add', args=(10, 15),)

这种方式也是可以的。 send_task() 还可能接收到为注册(即通过 @app.task 装饰)的任务,这个时候 worker 会忽略这个消息

[2017-05-19 14:34:15,352: ERROR/MainProcess] Received unregistered task of type 'tasks.adds'. 
The message has been ignored and discarded.

定时任务

上面部分讲了怎么启动 worker 和调用 worker 的相关函数,这里再讲一下 celery 的定时任务。

爬虫由于其特殊性,可能需要定时做增量抓取,也可能需要定时做模拟登陆,以防止 cookie 过期,而 celery 恰恰就实现了定时任务的功能。在上述基础上,我们将 tasks.py 文件改成如下内容

  1. from celery import Celery

  2. app = Celery('add_tasks', broker='redis:''//223.129.0.190:6379/2', backend='redis:''//223.129.0.190:6379/3')

  3. app.conf.update(

  4.   #  配置所在时区

  5.    CELERY_TIMEZONE='Asia/Shanghai',

  6.    CELERY_ENABLE_UTC=True,

  7.    #  官网推荐消息序列化方式为 json

  8.    CELERY_ACCEPT_CONTENT=['json'],

  9.    CELERY_TASK_SERIALIZER='json',

  10.    CELERY_RESULT_SERIALIZER='json',

  11.   # 配置定时任务

  12.    CELERYBEAT_SCHEDULE={

  13.        'my_task': {

  14.            'task': 'tasks.add',  # tasks.py 模块下的 add 方法

  15.            'schedule': 60,      # 每隔 60 运行一次

  16.            'args': (23, 12),

  17.        }

  18.    }

  19. )

  20. @app.task

  21. def add(x, y):

  22.    return x   y

然后先通过 ctrl c 停掉前一个 worker,因为我们代码改了,需要重启 worker 才会生效。我们再次以 celery -A tasks worker -l info 这个命令开启 worker。

这个时候我们只是开启了 worker,如果要让 worker 执行任务,那么还需要通过 beat 给它定时发送,我们再开一个命令行,切换到项目根目录,通过

celery beat -A tasks -l info

  1. celery beat v3.1.25 (Cipater) is starting.

  2. __    -    ... __   -        _

  3. Configuration ->

  4.    . broker -> redis://223.129.0.190:6379/2

  5.    . loader -> celery.loaders.app.AppLoader

  6.    . scheduler -> celery.beat.PersistentScheduler

  7.    . db -> celerybeat-schedule

  8.    . logfile -> [stderr]@%INFO

  9.    . maxinterval -> now (0s)

  10. [2017-05-19 15:56:57,125: INFO/MainProcess] beat: Starting...

这样就表示定时任务已经开始运行了。

眼尖的同学可能看到我这里 celery 的版本是 3.1.25,这是因为 celery 支持的 windows 最高版本是 3.1.25。由于我的分布式微博爬虫的 worker 也同时部署在了 windows 上,所以我选择了使用 3.1.25。如果全是 linux 系统,建议使用 celery4。

此外,还有一点需要注意,在 celery4 后,定时任务(通过 schedule 调度的会这样,通过 crontab 调度的会马上执行)会在当前时间再过定时间隔执行第一次任务,比如我这里设置的是 60 秒的间隔,那么第一次执行 add 会在我们通过 celery beat -A tasks -l info 启动定时任务后 60 秒才执行;celery3.1.25 则会马上执行该任务。

关于定时任务更详细的请看官方文档 celery 定时任务


至此,我们把构建一个分布式爬虫的理论知识都讲了一遍,主要就是对于 celery 的了解和使用,这里并未涉及到 celery 的一些高级特性,实战篇可能会讲解一些我自己使用的特性。

下一篇我将介绍如何使用 celery 写一个简单的分布式爬虫,希望大家能有所收获。


题图:pexels,CC0 授权。

点击阅读原文,查看更多 Python 教程和资源。