专栏名称: Python中文社区
致力于成为国内最好的Python开发者学习交流平台,这里有关于Python的国内外最新消息,每日推送有趣有料的技术干货和社区动态。 官方网站:www.python-cn.com
目录
相关文章推荐
Python爱好者社区  ·  王炸!DeepSeek彻底爆了! ·  16 小时前  
Python爱好者社区  ·  在银行做开发半年,已经丧失跳槽能力。。。 ·  16 小时前  
Python开发者  ·  北京大学出的第二份 DeepSeek ... ·  3 天前  
Python爱好者社区  ·  DeepSeek 被放弃了,阿里牛逼! ·  3 天前  
Python爱好者社区  ·  刚刚,DeepSeek开源MoE训练、推理E ... ·  3 天前  
51好读  ›  专栏  ›  Python中文社区

Python云计算框架:OpenStack源码分析之RabbitMQ(二)

Python中文社区  · 公众号  · Python  · 2017-01-19 21:53

正文

專 欄


ZZR ,Python中文社区专栏作者,OpenStack工程师,曾经的NLP研究者。主要兴趣方向:OpenStack、Python爬虫、Python数据分析。

Blog: http://skydream.me/

CSDN: http://blog.csdn.net/titan0427/article/details/50365480

fanout exchange

在上一篇中,task_queue所解决的问题是,一个message只能被一种consumer所接收。现在我们有了新的需求,我们需要一个日志系统,我们希望有两种consumer,一种consumer将日志输出到屏幕,另一种consumer写到disk。为了实现这个目的,我们希望message被投到两个queue中,交给不同的consumer进行处理。如下所示:


对于producer,不再指定哪一个queue来接收消息,而是哪一个exchange来接收消息。exchange不保存信息,如果没有queue绑定在这个exchange上的话,消息就丢失了。代码如下:

  1. # filename: emit_log.py

  2. # !/usr/bin/env python

  3. import pika

  4. import sys


  5. connection = pika.BlockingConnection(pika.ConnectionParameters(

  6.    host='localhost'))

  7. channel = connection.channel()

  8. channel.exchange_declare(exchange='logs',

  9.                         type='fanout')

  10. message = ' '.join(sys.argv[1:]) or "info: Hello World!"

  11. channel.basic_publish(exchange='logs',

  12.                      routing_key='',

  13.                      body=message)

  14. print(" [x] Sent %r" % message)

  15. connection.close()

对于consumer,自己创建临时queue( exclusive=True ,当这个consumer终止,这个queue就销毁),将这个queue接到exchange上,然后通过这个queue接收exchange发出的消息:

  1. # filename: receive_logs.py

  2. # !/usr/bin/env python

  3. import pika


  4. def callback(ch, method, properties, body):

  5.    print(" [x] %r" % body)


  6. connection = pika.BlockingConnection(pika.ConnectionParameters(

  7.    host='localhost'))

  8. channel = connection.channel()

  9. channel.exchange_declare(exchange='logs',

  10.                         type= 'fanout')

  11. result = channel.queue_declare(exclusive=True)

  12. queue_name = result.method.queue

  13. channel.queue_bind(exchange='logs',

  14.                   queue=queue_name)


  15. print(' [*] Waiting for logs. To exit press CTRL+C')

  16. channel.basic_consume(callback,

  17.                      queue=queue_name,

  18.                      no_ack=True)

  19. channel.start_consuming()

如下图所示,每一个consumer都建了自己的临时queue,并与exchange进行了绑定:


direct exchange

上面的例子中,queue_bind()的时候我们没有指定routing_key(为了避免混淆,后续将其称为binding_key)。binding_key的功能与exchange的类型有关。对于fanout exchange而言,binding_key没有意义。对于direct exchange而言,如下图所示,只有消息的routing_type与queue的binding_key相同时才会发送到该queue:



可以指定相同的binding_key,如下图所示:


借此,我们可以将日志系统改造为以下模式,不同的consumer只接收特定类型的日志信息:


完整代码如下:

  1. # filename: emit_log.py

  2. #!/usr/bin/env python

  3. import pika

  4. import sys


  5. connection = pika.BlockingConnection(pika.ConnectionParameters(

  6.        host='localhost'))

  7. channel = connection.channel()


  8. channel.exchange_declare(exchange='direct_logs',

  9.                         type='direct')


  10. severity = sys.argv[1] if len(sys.argv) > 1 else 'info'

  11. message = ' '.join(sys.argv[2:]) or 'Hello World!'

  12. channel.basic_publish(exchange='direct_logs',

  13.                      routing_key=severity,

  14.                      body=message)

  15. print(" [x] Sent %r:%r" % (severity, message))

  16. connection.close()

  1. # filename: receiver_logs.py

  2. #!/usr/bin/env python

  3. import pika

  4. import sys


  5. def callback(ch, method, properties, body):

  6.    print(" [x] %r:%r" % (method.routing_key, body))


  7. connection = pika.BlockingConnection(pika.ConnectionParameters(

  8.    host='localhost'))

  9. channel = connection.channel()

  10. channel.exchange_declare(exchange='direct_logs',

  11.                         type='direct')

  12. result = channel.queue_declare(exclusive=True)

  13. queue_name = result.method.queue


  14. severities = sys.argv[1:]

  15. if not severities:

  16.    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])

  17.    sys.exit(1)

  18. for severity in severities:

  19.    channel.queue_bind(exchange='direct_logs',

  20.                       queue=queue_name,

  21.                       routing_key=severity)


  22. print(' [*] Waiting for logs. To exit press CTRL+C')

  23. channel.basic_consume(callback,

  24.                      queue=queue_name,

  25.                      no_ack=True)

  26. channel.start_consuming()

topic exchange

topic exchange与direct exchange类似,只是它允许binding_key使用特殊字符(注意,特殊字符代表的是单词,不是字母):
-
* :代表一个单词
-
# :代表零个或多个单词

比如下面这个例子。routing_key定义为 " . . " 。举几个routing_key匹配的例子:
- quick.orange.rabbit:一,二
- lazy.orange.elephant:一,二
- quick.orange.fox:一
- lazy.brown.fox:三
- lazy.pink.rabbit:二,三(但只发一次,因为二和三是同一个队列)
- quick.brown.fox:无,舍弃
- orange:无,舍弃
- quick.orange.male.rabbit:无,舍弃
- lazy.orange.male.rabbit:三



当binding_key不使用这些特殊字符时,topic exchange其实就是direct exchange;当binding_key使用 # 时,topic exchange其实就是fanout exchange,也就是所有消息都接收。

一些极端的例子:
binding_key:
* ,routing_key:空串。不匹配。
binding_key:
#.* ,routing_key: .. 。匹配。
binding_key:
#.* ,routing_key:apple。匹配。

完整代码:

  1. # filename: emit_log_topic.py

  2. #!/usr/bin/env python

  3. import pika

  4. import sys


  5. connection = pika. BlockingConnection(pika.ConnectionParameters(

  6.        host='localhost'))

  7. channel = connection.channel()


  8. channel.exchange_declare(exchange='topic_logs',

  9.                         type='topic')


  10. routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'

  11. message = ' '.join(sys.argv[2:]) or 'Hello World!'

  12. channel.basic_publish(exchange='topic_logs',

  13.                      routing_key=routing_key,

  14.                      body=message)

  15. print (" [x] Sent %r:%r" % (routing_key, message))

  16. connection.close()

  1. # filename: receive_logs_topic.py

  2. #!/usr/bin/env python

  3. import pika

  4. import sys


  5. connection = pika.BlockingConnection(pika.ConnectionParameters(

  6.        host='localhost'))

  7. channel = connection.channel()


  8. channel.exchange_declare(exchange='topic_logs',

  9.                         type= 'topic')


  10. result = channel.queue_declare(exclusive=True)

  11. queue_name = result.method.queue


  12. binding_keys = sys.argv[1:]

  13. if not binding_keys:

  14.    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])

  15.    sys.exit(1)


  16. for binding_key in binding_keys:

  17.    channel.queue_bind(exchange='topic_logs',

  18.                       queue=queue_name,

  19.                       routing_key=binding_key)


  20. print(' [*] Waiting for logs. To exit press CTRL+C')


  21. def callback(ch, method, properties, body):

  22.    







请到「今天看啥」查看全文