专栏名称: Python中文社区
致力于成为国内最好的Python开发者学习交流平台,这里有关于Python的国内外最新消息,每日推送有趣有料的技术干货和社区动态。 官方网站:www.python-cn.com
目录
相关文章推荐
Python爱好者社区  ·  刚刚,DeepSeek开源MoE训练、推理E ... ·  2 天前  
Python开发者  ·  实测满血版 DeepSeek!学习 ... ·  2 天前  
Python开发者  ·  清北 DeepSeek ... ·  2 天前  
Python开发者  ·  OpenAI ... ·  3 天前  
Python开发者  ·  清华大学:DeepSeek + ... ·  3 天前  
51好读  ›  专栏  ›  Python中文社区

Python云计算框架:Openstack源码分析之RabbitMQ(一)

Python中文社区  · 公众号  · Python  · 2016-12-22 22:11

正文

專 欄

ZZR,Python中文社区专栏作者,OpenStack工程师,曾经的NLP研究者。主要兴趣方向:OpenStack、Python爬虫、Python数据分析。

Blog: http://skydream.me/

CSDN: http://blog.csdn.net/titan0427/article/details/50365480

在Openstack中,各个组件内部使用消息队列进行通信,其中,RabbitMQ是常用的一种开源消息代理软件。这里作一个简要介绍。

RabbitMQ介绍


RabbitMQ实现了高级消息队列协议(AMQP)。

AMQP

AMQP是一个定义了应用之间消息传送协议的开放标准. AMQP旨在解决在两个应用之间传送消息存在的以下问题:

  • 网络是不可靠的 -> 消息需要保存后再转发并有出错处理机制

  • 与本地调用相比,网络速度慢 -> 得支持异步调用

  • 应用之间是不同的(比如实现语言不同, 操作系统不同),且应用会经常变化 -> 得与应用无关

AMQP 使用异步的、应用对应用的、二进制数据通信来解决这些问题。


基本组件

RabbitMQ 是 AMQP 的一种实现, 其基本组件包括:
- Producer:Message的生产者, 负责产生消息并把消息发到Exchange。
- Message:RabbitMQ 转发的二进制对象,包括Headers、Properties和 Data。其中Data不是必要的。
- Exchange:负责接收Producer的Message, 并把它转发到合适的Queue.
- Binding:标识Queue和Exchange之间的关系。Exchange根据Message的Properties和Binding的Properties来确定将消息转发到哪些Queue。一个重要的Properties是binding_key。
- Queue:缓存Exchange发来的消息,并将消息主动发给Consumer或者由Consumer主动来获取消息。
- Consumer:使用Queue从Exchange中获取Message。

Message和Exchange

Message

消息结构包括:Headers、Properties和data。

其中,Properties包括几个重要的属性:

  • routing_key:Direct和Topic类型的exchange会根据本属性来转发消息。

  • delivery_mode:将其值设置为2将使用消息持久化。持久化的消息会被保存到磁盘。

  • reply_to:客户端回调队列的名字。

  • correlation_id

  • content_type

Exchange

Exchange有几个重要的属性:

  • name:exchange名字。空字符串名字的exchange为默认的exchange。

  • type:决定了exchange的消息转发方式, 包括direct、fanout、topic和headers。

  • durable:值为True的exchange会在rabbitmq重启会自动创建。Openstack使用的exchange该值都为False。

  • auto_delete:值为True的exchange当消费者的连接都关闭后会被自动删除。 Openstack使用的exchange该值都为False。

  • exclusive:设置为True的话,该exchange只允许被创建的connection使用,且在该connection关闭后它会自动删除。

RabbitMQ消息路由机制

决定Exchange消息路由的属性有:

  • Exchange的type

  • Message的routing_key

  • Binding的binding_key

具体规则如下:

RabbitMQ有多种版本的客户端,本文使用Pika,安装如下。
  1. $ pip install pika

RabbitMQ扩展插件

Management Plugin

提供GUI来管理RabbitMQ。官方地址: https://www.rabbitmq.com/management.html

RabbitMQ用户密码可以在 /etc/rabbitmq/abbitmq.config 查看:

打开图形界面:

  1. # rabbitmq-plugins enable rabbitmq_management

然后通过端口15672就可以访问web管理界面。

Hello World!

首先,我们尝试从Publisher发送一条消息“Hello World”到Consumer。


Publisher


发送消息 主要包括以下几个操作:
1. 与RabbitMQ建立连接。
2. 声明要使用的queue。
3. RabbitMQ中,消息不会直接发到queue,而是发到exchange,由exchange转发到相应的queue。下面的例子中使用了默认的exchange,它会进行定向转发,也就是将message发到routing_key所指定的queue中。
4. 最后,为了保证网络缓存flushed(也就是消息被发出去了),手动关闭连接。

  1. # filename: send.py

  2. #!/usr/bin/env python

  3. import pika



  4. connection = pika.BlockingConnection(pika.ConnectionParameters(

  5.    host= 'localhost'))

  6. channel = connection.channel()

  7. channel.queue_declare(queue='hello')

  8. channel.basic_publish(exchange='',

  9.                      routing_key='hello',

  10.                      body='Hello World!')

  11. print(" [x] Sent 'Hello World!'")

  12. connection.close()

执行完以上指令,通过命令行你可以看到queue已经被建立且包含了我们发出的信息:

Consumer


接收消息 主要包括以下几个操作:
1. 与RabbitMQ建立连接。
2. 声明监听的queue。
3. 建立consumer。comsumer需要一个回调函数来负责处理接收到的消息。
4. start_consuming(),其本质是一个while循环,不断取出队列中的消息。

  1. # filename: receive.py

  2. #!/usr/bin/env python

  3. import pika



  4. connection = pika.BlockingConnection(pika.ConnectionParameters(

  5.    host='localhost'))

  6. channel = connection.channel()

  7. channel.queue_declare(queue='hello')



  8. def callback(ch, method, properties, body):

  9.    print(" [x] Received %r" % body)

  10. channel.basic_consume(callback,

  11.                      queue ='hello',

  12.                      no_ack=True)

  13. print(' [*] Waiting for messages. To exit press CTRL+C')

  14. channel.start_consuming()

Work Queues

在上一个例子中,consumer只是简单地打印信息,在这个例子中,我们将consumer改为一个worker,它将根据消息完成一些任务。其本质和print("hello world")并没有什么区别,但是为了保证任务能正确完成,需要一些额外的操作,使workder更健壮。

consumer挂了怎么办

改写以上代码。

publisher发送的消息可以从命令参数中读取。参数包括若干个点,点的数量决定了consumer需要花多少秒来完成任务:

  1. # send.py

  2. message = ' '.join(sys.argv[1:]) or "Hello World!"

  3. channel.basic_publish(exchange='',

  4.                      routing_key='hello',

  5.                      body=message)

  6. print(" [x] Sent %r" % message)

consumer处理消息的回调函数,将根据message进行sleep():

  1. # receive.py

  2. def callback(ch, method, properties, body):

  3.    print(" [x] Received %r" % body)

  4.    time.sleep(body.count(b'.'))

  5.    print(" [x] Done")

  6.    ch.basic_ack(delivery_tag=method.delivery_tag)

  7. channel.basic_consume(callback,

  8.                      queue='hello')

当consumer处理完任务,会回复ack。如果没有ack,这个消息将在queue中处于unacknowledged状态。如果这个consumer处理过程中挂了,这个message将被分发给其它consumer。这个机制 保证了所有的消息都可以被处理 。(一种很坏的情况是,consumer处理了message但没有返回ack,但这个consumer又一直不挂,那么这些被它处理的message就会一直以unack的状态保存在queue中。)


rabbitmq挂了怎么办

为了保证rabbitmq挂了都不会使message消失,我们必须保证:
- queue持久化
- message持久化

由于rabbitmq不允许两个队列重名,下面的代码改用task_queue作为队列名。修改代码如下:

  1. # filename: send.py

  2. channel.queue_declare(queue='task_queue', durable=True)

  3. channel.basic_publish(exchange='',

  4.                      routing_key="task_queue",

  5.                      body=message,

  6.                      properties=pika.BasicProperties(

  7.                          delivery_mode =2, )

  8.                      )

  1. # filename: receive.py

  2. channel.queue_declare(queue='task_queue', durable=True)

任务平均分配

目前的情况是,任务将被平均分配给每一个consumer。比如,如果有两个consumer,那么任务将你一个我一个来分配,而不会根据任务的复杂度来分配。一种极端情况是,奇数任务复杂度很高,偶数任务复杂度很低,那么就会导致一个consumer一直很忙,而另一个一直很闲。为此,进一步修改代码:

  1. # filename: receive.py

  2. channel.basic_qos(prefetch_count=1)

这个参数限制了consumer手上的message数量。如果consumer手上已经有一个unack的message,那么后续的message就不会发给它了。


完整代码

  1. # filename: send.py

  2. # !/usr/bin/env python

  3. import pika

  4. import sys



  5. connection = pika.BlockingConnection(pika.ConnectionParameters(

  6.    host='localhost'))

  7. channel = connection.channel()

  8. channel.queue_declare(queue='task_queue', durable=True)

  9. message = ' '.join(sys.argv[1:]) or "Hello World!"

  10. channel.basic_publish(exchange='',

  11.                      routing_key="task_queue",

  12.                      body=message,

  13.                      properties=pika.BasicProperties(

  14.                          delivery_mode=2, )

  15.                      )

  16. print(" [x] Sent %r" % message)

  17. connection.close()

  1. # filename: receive.py

  2. #!/usr/bin/env python

  3. import pika

  4. import time



  5. connection = pika.BlockingConnection(pika.ConnectionParameters(

  6.    host='localhost'))

  7. channel = connection.channel()

  8. channel.queue_declare(queue='task_queue', durable=True)

  9. channel.basic_qos







请到「今天看啥」查看全文