专栏名称: ImportNew
伯乐在线旗下账号,专注Java技术分享,包括Java基础技术、进阶技能、架构设计和Java技术领域动态等。
相关文章推荐
芋道源码  ·  用了Stream后,代码反而越写越丑? ·  22 小时前  
芋道源码  ·  新来个阿里 P7,仅花 2 ... ·  2 天前  
芋道源码  ·  如何应对消息堆积? ·  3 天前  
芋道源码  ·  DeepSeek 秒变智能客服,咨询效率翻倍!!! ·  3 天前  
芋道源码  ·  Cloudflare ... ·  3 天前  
51好读  ›  专栏  ›  ImportNew

RabbitMQ 指南( 上 )

ImportNew  · 公众号  · Java  · 2017-05-01 13:02

正文

(点击 上方公众号 ,可快速关注)


来源:Listen,

listenzhangbin.com/post/2016/10/rabbitmq-tutorials-one/

如有好文章投稿,请点击 → 这里了解详情


RabbitMQ是一个消息中间件,在一些需要异步处理、发布/订阅等场景的时候,使用RabbitMQ可以完成我们的需求。 下面是我在学习RabbitMQ的过程中的一些记录,内容主要翻译自RabbitMQ官网的Tutorials, 再加上我的一些个人理解。我将会用三篇文章来从RabbitMQ的Hello World介绍起,到最后的通过RabbitMQ实现RPC调用, 相信看完这三篇文章大家应该会对RabbitMQ的基本概念和使用有一定的了解。


说明:


由于RabbitMQ支持许多种语言的client,在这里我使用的是Java语言的client。

所有的图片均来自RabbitMQ官网。


Hello World


首先需要安装RabbitMQ,关于RabbitMQ的安装这里就不赘述了,可以到RabbitMQ的官网去看相应的OS的安装方法。 安装完成后使用rabbitmq-server即可启动RabbitMQ,RabbitMQ还提供了一个UI管理界面,本地默认的地址为localhost:15672, 用户名和密码均为guest。


安装完成之后,按照惯例,先来完成一个简单的Hello World的例子。 最简单的一种消息发送的模型为一个消息发送者(Producer)将消息发送到Queue中,另一端的消息接受者(Consumer)从Queue中接受消息, 大致模型如下图所示:



先来看发送的代码,新建一个类命名为Send.java,代码的第一步为连接server


ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();


connection抽象了socket的连接,并且为我们处理了协议版本的协商、权限认证等等。这里我们连接的是本地的中间件, 也就是localhost,接下来我们创建一个channel,这是大多数API完成任务的所在,也就是说我们的API操作基本都是通过channel来完成的。


channel.queueDeclare(QUEUE_NAME, false, false, false, null);

String message = "Hello World!";

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");


首先是通过channel来声明一个queue,并且声明queue的操作是幂等的,也即是说只有在这个queue不存在的情况下才会新创建一个queue。 这里发送一个Hello World!的消息,实际传递的消息内容为字节数组。


channel.close();

connection.close();


最后关闭channel和connection的连接,注意关闭的顺序,是先关闭channel的连接,再关闭connection的连接。


完整的Send.java代码


public class Send {

private static final String QUEUE_NAME = "hello";

public static void main(String[] args) {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

String message = "Hello World!";

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");

channel.close();

connection.close();

}

}


完成发送的代码之后是接受消息的代码,新建一个类为Recv.java


public class Recv {

private final static String QUEUE_NAME = "hello";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)

throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

}

};

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}


可以发现一开始的连接部分的代码是相同的,在接收的时候我们也要声明一个queue,注意这里queue的名称和之前发送消息声明的queue的名称必须是相同的, 否则就收不到消息了。


DefaultConsumer类实现了Consumer接口,由于发送消息是异步的,因此在这里提供了一个callback来缓冲消息, 直到我们准备使用这些消息,最后分别运行Send.java和Recv.java,就能看到Hello World!消息了。


Work Queues


在第一部分的Hello World中通过一个命名的queue来传递消息,在这一部分,我们会创建Work Queue来将耗时的任务分发至多个worker。 假设一个消息就是一个耗时的任务,比如文件I/O等等,那么可以通过几个worker来共同完成这些工作。



在Web应用中这是非常有用的,因为在一次非常短的HTTP请求窗口中完成一个非常复杂的任务是很困难的。


准备


这一部分是建立在上一部分Hello World的基础之上的,我们将发送字符串来表示一些复杂的任务, 由于并没有一些真实的复杂的工作,因此使用Thread.sleep()来模拟这是一个很耗时的任务, 并且在发送的字符串当中含有一个点号就表示这个任务需要耗时1秒,比如发送Hello...表示将要耗时3秒。


在前一部分的Send.java的基础上做一些修改,得到一个新的类称为NewTask.java。


String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");


getMessage方法为从命令行中获取参数


private static String getMessage(String[] strings){

if (strings.length

return "Hello World!";

return joinStrings(strings, " ");

}

private static String joinStrings(String[] strings, String delimiter) {

int length = strings.length;

if (length == 0) return "";

StringBuilder words = new StringBuilder(strings[0]);

for (int i = 1; i

words.append(delimiter).append(strings[i]);

}

return words.toString();

}


我们之前的Recv.java也需要做一些变化,它需要模拟一些耗时的任务,消息内容中一个.表示1秒,并且它会处理消息, 我们称它为Worker.java


final Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

try {

doWork(message);

} finally {

System.out.println(" [x] Done");

}

}

};

boolean autoAck = true; // acknowledgment is covered below

channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);


这里有一个autoAck变量的作用在后面会提到。doWork方法就是模拟的耗时任务


private static void doWork(String task) throws InterruptedException {

for (char ch: task.toCharArray()) {







请到「今天看啥」查看全文