专栏名称: SegmentFault思否
SegmentFault (www.sf.gg)开发者社区,是中国年轻开发者喜爱的极客社区,我们为开发者提供最纯粹的技术交流和分享平台。
目录
相关文章推荐
程序员的那些事  ·  OpenAI ... ·  昨天  
程序员小灰  ·  清华大学《DeepSeek学习手册》(全5册) ·  2 天前  
OSC开源社区  ·  宇树王兴兴早年创业分享引围观 ·  3 天前  
程序猿  ·  “未来 3 年内,Python 在 AI ... ·  4 天前  
51好读  ›  专栏  ›  SegmentFault思否

RabbitMQ + PHP 教程一:Hello World

SegmentFault思否  · 公众号  · 程序员  · 2018-11-02 08:00

正文

介绍

RabbitMQ是一个消息代理器:它接受和转发消息。你可以把它当作一个邮局:当你把邮件放在信箱里时,你可以肯定邮差先生最终会把邮件送到你的收件人那里。在这个比喻中,RabbitMQ就是这里的邮箱,邮局和邮差。

RabbitMQ和邮局之间的主要区别是,它不处理纸张,而是接受、存储和转发二进制数据——消息。

RabbitMQ,和一般的消息传递,使用专业术语。

生产者的工作就是发送消息。发送消息的程序是生产者:

队列类比一个邮箱,存在于RabbitMQ, 然而信息流通过RabbitMQ和您的应用程序,他们只能存储在一个队列。队列只受主机内存和磁盘限制的约束,它本质上是一个很大的消息缓冲区。会有许多生产者可以发送到一个队列的消息,许多消费者可以尝试从一个队列接收数据。这就是我们如何表示队列的方式:

消费者和生产者有着相似的意义. 消费者无非就是等待消息然后处理的程序:

请注意,生产者、消费者和代理不必同一主机上;事实上,在大多数应用程序中它们没有这样做。

"Hello World"

(使用PHP amqplib客户端)

在本教程的这一部分中,我们将用PHP编写两个程序;一个生产者发送一条消息,一个用户接收消息并将它们打印出来。我们会PHP amqplib API的忽略一些细节,集中在这个非常简单的事情刚刚开始。这是一个“Hello World”的消息传递。

在下图中,“p”是我们的生产商,“C”是我们的消费者。在中间的框是一个队列的消息缓冲区,RabbitMQ保持代表的消费。

PHP amqplib客户端库

RabbitMQ有很多协议。本教程介绍AMQP 0-9-1,这是一个开放的、通用的协议消息。有许多不同的语言RabbitMQ一批客户。我们将在本教程中使用PHP amqplib,composer解决依赖管理。

添加composer.json:

  1. {

  2.    "require": {

  3.        "php-amqplib/php-amqplib": ">=2.6.1"

  4.    }

  5. }

  1. composer install

  2. # 或者 直接运行包引入

  3. composer require php-amqplib/php-amqplib

现在我们可以开始我们的hello world

生产者(消息发送方):

我们命令我们的消息发布者(发送者)send.php和消息接收receive.php。发送者将连接到RabbitMQ,发送一条消息,然后退出。

  1. require_once __DIR__ . '/vendor/autoload.php';

  2. use PhpAmqpLib\Connection\AMQPStreamConnection;

  3. use PhpAmqpLib\Message\AMQPMessage;

现在我们能创建一个连接服务器的Connection:

  1. $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

  2. $channel = $connection->channel();

该连接抽象套接字(socket)连接,并为我们负责协议版本协商和认证等。这里,我们连接到一个rabbitmq代理器在本地机器上-使用localhost。如果我们想在不同的机器上连接到一个代理,我们只需在这里指定它的名称或IP地址。

接下来,我们创建一个通道,这是处理事情的大部分API的地方。

发送消息前,我们必须声明一个队列为我们发送做准备;然后我们可以向队列发布消息:

  1. $channel->queue_declare('hello', false, false, false, false);

  2. $msg = new AMQPMessage('Hello World!');

  3. $channel->basic_publish($msg, '', 'hello');

  4. echo " [x] Sent 'Hello World!'\n";

声明队列是幂等的(原句:Declaring a queue is idempotent,这里的idempotent不知道是什么意思) - 只有在它不存在时才会创建队列。消息内容是一个字节数组,因此您可以在那里编码用你喜欢的方式。

最后,我们关闭通道和连接;

  1. $channel->close();

  2. $connection->close();

上面我们完成了send.php。

接下来我们完成消费方的代码。

消费者(接收方,任务处理方)

消费者从RabbitMQ接收推来的消息,我们会保持运行监听消息并打印出来。

引入lib:

  1. require_once __DIR__ . '/vendor/autoload.php';

  2. use PhpAmqpLib\Connection\AMQPStreamConnection;

设置与发布程序相同;我们打开一个连接和一个通道,并声明将要消耗的队列。注意,这与发送发布的队列匹配。

  1. $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

  2. $channel = $connection->channel();

  3. $channel->queue_declare('hello', false, false, false, false);

  4. echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

注意,我们也在这里声明队列。因为我们可能在发布之前启动消费者,我们希望在我们尝试从它那里消费消息之前确定队列的存在。

我们将告诉服务器从队列中发送消息。我们将定义一个PHP可调用,它将接收服务器发送的消息。请记住,消息是从服务器异步发送到客户机的。

  1. $callback = function($msg) {

  2.  echo " [x] Received ", $msg->body, "\n";

  3. };

  4. $channel->basic_consume('hello', '', false, true, false, false, $callback);

  5. while(count($channel->callbacks)) {

  6.    $channel->wait();

  7. }

当调用basic_consume,我们的代码会阻塞。当我们收到消息时,我们的回调函数将通过接收到返回的消息传递。

以上是我们receive.php的代码。

运行测试

运行消费者:

  1. php receive.php

运行消息发送方:

  1. php send.php

列出队列

  1. rabbitmqctl list_queues

完整源码(调整过)

config.php

  1. php

  2. return [

  3.    'vendor' => [

  4.        'path' => dirname(dirname(__DIR__)) . '/vendor'

  5.    ],

  6.    'rabbitmq' => [

  7.        'host' => '127.0.0.1',

  8.        'port' => '5672',

  9.        'login' => 'qkl',

  10.        'password' => '123456',

  11.        'vhost' => '/'

  12.    ]

  13. ];

  14. ?>

receive.php

  1. php

  2. $config = require "../config.php";

  3. require_once $config['vendor']['path'] . '/autoload.php';

  4. use PhpAmqpLib\Connection\AMQPStreamConnection;

  5. use PhpAmqpLib\Message\AMQPMessage;

  6. $connection = new AMQPStreamConnection($config['rabbitmq']['host'], $config['rabbitmq']['port'],

  7.    $config['rabbitmq']['login'], $config['rabbitmq']['password'], $config['rabbitmq']['vhost']);

  8. $channel = $connection->channel();

  9. $channel->queue_declare('hello', false, false, false, false);

  10. echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

  11. $callback = function($msg) {

  12.    echo " [x] Received ", $msg->body, "\n";

  13. };

  14. $channel->basic_consume('hello', '', false, true, false,







请到「今天看啥」查看全文