在分布式系统中,消息队列(Message Queue,简称 MQ) 用于交换系统之间的信息,是一个非常重要的中间组件。早在上世纪 80 年代,就已经有消息队列的概念了,不过当时叫做 TIB(The Information Bus),当时的消息队列大多是商业产品,直到 2001 年 Java 标准化组织(JCP)提出 JSR 914: Java Message Service (JMS) API,这是一个与平台无关的 API,为 Java 应用提供了统一的消息操作。 JMS 提供了两种消息模型:点对点(peer-2-peer)和发布订阅(publish-subscribe)模型,当前的大多数消息队列产品都可以支持 JMS,譬如:Apache ActiveMQ、RabbitMQ、Kafka 等。
不过,JMS 毕竟是一套 Java 规范,是和编程语言绑定在一起的,只能在 Java 类语言(比如 Scala、Groovy)中具有互用性,也就是说消息的生产者(Producer)和消费者(Consumer)都得用 Java 来编写。如何让不同的编程语言或平台相互通信呢?对于这个问题,摩根大通的 John O'Hara 在 2003 年提出了 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的概念,可以解决不同平台之间的消息传递交互问题,2004 到 2006 年之间,摩根大通和 iMatrix 公司一起着手 AMQP 标准的开发,并于 2006 年发布 AMQP 规范。AMQP 和 JMS 最大的区别在于它是一种通用的消息协议,更准确的说是一种 Wire Protocol(链接协议),AMQP 并不去限定 API 层的实现,而是只定义网络交换的数据格式,这和 HTTP 协议是类似的,使得 AMQP 天然就是跨平台的。
在之后的 2007 年,Rabbit 技术公司基于 AMQP 标准发布了 RabbitMQ 第一个版本。RabbitMQ 采用了 Erlang 语言开发,这是一种通用的面向并发的编程语言,使得 RabbitMQ 具有高性能、高并发的特点,不仅如此,RabbitMQ 还提供了集群扩展的能力,易于使用以及强大的开源社区支持,这让 RabbitMQ 在开源消息队列的产品中占有重要的一席之地。
![rabbitmq.png rabbitmq.png](https://www.aneasystone.com/usr/uploads/2018/08/1272044742.png)
一、RabbitMQ 安装
RabbitMQ 是用 Erlang 语言开发的,所以安装 RabbitMQ 之前,首先要安装 Erlang,在 Windows 上安装 Erlang 非常简单,直接去官网下载 Erlang OTP 的安装包文件并按提示点击安装即可。安装完成之后,我们就可以从 RabbitMQ 的官网下载和安装 RabbitMQ。其他操作系统的安装参考 Downloading and Installing RabbitMQ。
一切就绪后,我们运行 RabbitMQ Command Prompt
,如果你采用的是 RabbitMQ 的默认安装路径,命令提示符会显示:
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.7\sbin>
我们使用命令 rabbitmqctl status
查看 RabbitMQ 的服务状态:
$ rabbitmqctl status
Status of node rabbit@LAPTOP-MBA74KRU ...
[{pid,4248},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","3.7.7"},
{rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.7.7"},
{cowboy,"Small, fast, modern HTTP server.","2.2.2"},
{amqp_client,"RabbitMQ AMQP Client","3.7.7"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","3.7.7"},
{rabbit,"RabbitMQ","3.7.7"},
{rabbit_common,"Modules shared by rabbitmq-server and rabbitmq-erlang-client","3.7.7"},
{recon,"Diagnostic tools for production use","2.3.2"},
{ranch_proxy_protocol,"Ranch Proxy Protocol Transport","1.5.0"},
{ranch,"Socket acceptor pool for TCP protocols.","1.5.0"},
{ssl,"Erlang/OTP SSL application","9.0"},
{mnesia,"MNESIA CXC 138 12","4.15.4"},
{public_key,"Public key infrastructure","1.6"},
{asn1,"The Erlang ASN1 compiler version 5.0.6","5.0.6"},
{os_mon,"CPO CXC 138 46","2.4.5"},
{cowlib,"Support library for manipulating Web protocols.","2.1.0"},
{jsx,"a streaming, evented json parsing toolkit","2.8.2"},
{xmerl,"XML parser","1.3.17"},
{inets,"INETS CXC 138 49","7.0"},
{crypto,"CRYPTO","4.3"},
{lager,"Erlang logging framework","3.6.3"},
{goldrush,"Erlang event stream processor","0.1.9"},
{compiler,"ERTS CXC 138 10","7.2.1"},
{syntax_tools,"Syntax tools","2.1.5"},
{syslog,"An RFC 3164 and RFC 5424 compliant logging framework.","3.4.2"},
{sasl,"SASL CXC 138 11","3.2"},
{stdlib,"ERTS CXC 138 10","3.5"},
{kernel,"ERTS CXC 138 10","6.0"}]},
{listeners,
[{clustering,25672,"::"},
{amqp,5672,"::"},
{amqp,5672,"0.0.0.0"},
{http,15672,"::"},
{http,15672,"0.0.0.0"}]},
{vm_memory_calculation_strategy,rss},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,3380019200},
{disk_free_limit,50000000},
{disk_free,358400446464},
{run_queue,1},
{uptime,6855},
{kernel,{net_ticktime,60}}]
一般情况下,我们还会安装 RabbitMQ Management Plugin,先用 rabbitmq-plugins list
列出所有支持的插件:
$ rabbitmq-plugins list
Listing plugins with pattern ".*" ...
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@LAPTOP-MBA74KRU
|/
[ ] rabbitmq_amqp1_0 3.7.7
[ ] rabbitmq_auth_backend_cache 3.7.7
[ ] rabbitmq_auth_backend_http 3.7.7
[ ] rabbitmq_auth_backend_ldap 3.7.7
[ ] rabbitmq_auth_mechanism_ssl 3.7.7
[ ] rabbitmq_consistent_hash_exchange 3.7.7
[ ] rabbitmq_event_exchange 3.7.7
[ ] rabbitmq_federation 3.7.7
[ ] rabbitmq_federation_management 3.7.7
[ ] rabbitmq_jms_topic_exchange 3.7.7
[E*] rabbitmq_management 3.7.7
[e*] rabbitmq_management_agent 3.7.7
[ ] rabbitmq_mqtt 3.7.7
[ ] rabbitmq_peer_discovery_aws 3.7.7
[ ] rabbitmq_peer_discovery_common 3.7.7
[ ] rabbitmq_peer_discovery_consul 3.7.7
[ ] rabbitmq_peer_discovery_etcd 3.7.7
[ ] rabbitmq_peer_discovery_k8s 3.7.7
[ ] rabbitmq_random_exchange 3.7.7
[ ] rabbitmq_recent_history_exchange 3.7.7
[ ] rabbitmq_sharding 3.7.7
[ ] rabbitmq_shovel 3.7.7
[ ] rabbitmq_shovel_management 3.7.7
[ ] rabbitmq_stomp 3.7.7
[ ] rabbitmq_top 3.7.7
[ ] rabbitmq_tracing 3.7.7
[ ] rabbitmq_trust_store 3.7.7
[e*] rabbitmq_web_dispatch 3.7.7
[ ] rabbitmq_web_mqtt 3.7.7
[ ] rabbitmq_web_mqtt_examples 3.7.7
[ ] rabbitmq_web_stomp 3.7.7
[ ] rabbitmq_web_stomp_examples 3.7.7
使用下面的命令启用 Management Plugin:
$ rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@LAPTOP-MBA74KRU:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@LAPTOP-MBA74KRU...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
started 3 plugins.
然后访问 http://localhost:15672/ 就可以通过 Web UI 对 RabbitMQ 进行管理了(默认的用户名和密码是:guest/guest):
![rabbitmq-webui-management.jpg rabbitmq-webui-management.jpg](https://www.aneasystone.com/usr/uploads/2018/08/2885921510.jpg)
在生产环境安装 RabbitMQ 时,为了安全起见,我们最好在 Admin 标签下的 Users 里添加新的用户,并将 guest 用户移除。或者通过 rabbitmqctl 命令行:
$ rabbitmqctl add_vhost [vhost]
$ rabbitmqctl add_user [username] [password]
$ rabbitmqctl set_user_tags [username] administrator
$ rabbitmqctl set_permissions -p [vhost] [username] ".*" ".*" ".*"
关于 RabbitMQ 的安装,我们常常采用集群的形式,并且要保证消息队列服务的高可用性。这里有一篇文章可以参考《RabbitMQ集群安装配置+HAproxy+Keepalived高可用》。
二、RabbitMQ 核心概念
RabbitMQ 中有一些概念需要我们在使用前先搞清楚,主要包括以下几个:Broker、Virtual Host、Exchange、Queue、Binding、Routing Key、Producer、Consumer、Connection、Channel。这些概念之间的关系如下图所示(图片来源):
![rabbitmq-model.jpg rabbitmq-model.jpg](https://www.aneasystone.com/usr/uploads/2018/08/817174355.jpg)
- Broker
简单来说就是消息队列服务器的实体,类似于 JMS 规范中的 JMS provider。它用于接收和分发消息,有时候也称为 Message Broker 或者更直白的称为 RabbitMQ Server。 - Virtual Host
和 Web 服务器中的虚拟主机(Virtual Host)是类似的概念,出于多租户和安全因素设计的,可以将 RabbitMQ Server 划分成多个独立的空间,彼此之间互相独立,这样就可以将一个 RabbitMQ Server 同时提供给多个用户使用,每个用户在自己的空间内创建 Exchange 和 Queue。 - Exchange
交换机用于接收消息,这是消息到达 Broker 的第一站,然后根据交换机的类型和路由规则(Routing Key),将消息分发到特定的队列中去。常用的交换机类型有:direct (point-to-point)、topic (publish-subscribe) 和 fanout (multicast)。 - Queue
生产者发送的消息就是存储在这里,在 JMS 规范里,没有 Exchange 的概念,消息是直接发送到 Queue,而在 AMQP 中,消息会经过 Exchange,由 Exchange 来将消息分发到各个队列中。消费者可以直接从这里取走消息。 - Binding
绑定的作用就是把 Exchange 和 Queue 按照路由规则绑定起来,路由规则可由下面的 Routing Key 指定。 - Routing Key
路由关键字,Exchange 根据这个关键字进行消息投递。 - Producer/Publisher
消息生产者或发布者,产生消息的程序。 - Consumer/Subscriber
消息消费者或订阅者,接收消息的程序。 - Connection
生产者和消费者和 Broker 之间的连接,一个 Connection 实际上就对应着一条 TCP 连接。 - Channel
由于 TCP 连接的创建和关闭开销非常大,如果每次访问 Broker 都建立一个 Connection,在消息量大的时候效率会非常低。Channel 是在 Connection 内部建立的逻辑连接,相当于一次会话,如果应用程序支持多线程,通常每个线程都会创建一个单独的 Channel 进行通讯,各个 Channel 之间完全隔离,但这些 Channel 可以公用一个 Connection。
关于 RabbitMQ 中的这些核心概念,实际上也是 AMQP 协议中的核心概念,可以参考官网上对 AMQP 协议的介绍:AMQP 0-9-1 Model Explained 和 AMQP 0-9-1 Quick Reference。
三、RabbitMQ 实战
这一节通过一些简单的 RabbitMQ 实例学习上面介绍的各个概念,这样可以对 RabbitMQ 的理念有个更深入的了解。
想要完整的学习 RabbitMQ,建议把 官网的 6 个例子 挨个实践一把,这 6 个例子非常经典,网上很多 RabbitMQ 的教程都是围绕这 6 个例子展开的。我们知道 AMQP 是跨平台的,支持绝大多数的编程语言,所以官网提供的这些例子也几乎囊括了绝大多数的编程语言,如:Python、Java、Ruby、PHP、C# 等,而且针对 Java 甚至还提供了 Spring AMQP 的版本,实在是非常贴心了。你可以根据需要选择相应编程语言的例子,这里以 Java 为例,分别是:
如果觉得阅读英文比较费劲,网上也有大量的中文教程,譬如:RabbitMQ 中文文档、轻松搞定RabbitMQ、专栏:RabbitMQ从入门到精通、RabbitMQ指南,内容都是围绕这 6 个例子展开的。
![rabbitmq-examples.png rabbitmq-examples.png](https://www.aneasystone.com/usr/uploads/2018/08/1218333376.png)
上面是这几个例子的示意图。
第一个例子实现了一个最简单的生产消费模型,介绍了生产者(Producer)、消费者(Consumer)、队列(Queue)和消息(Message)的基本概念和关系,通过这个例子,我们可以学习如何发送消息,如何接受消息,这是最基础的消息队列的功能,只有一个生产者,也只有一个消费者,虽然简单,但是在日常工作中,有时也会使用这样的模型来做系统模块之间的解耦。
当发送的消息是一个复杂的任务,消费者在接受到这个任务后需要进行大量的计算时,这个队列叫做工作队列(Work Queue)或者任务队列(Task Queue),消费者被称之为 Worker,一个工作队列一般需要多个 Worker 对任务进行分发处理,这种设计具有良好的扩展性,如果要处理的任务太多,出现积压,只要简单的增加 Worker 数目即可。在第二个例子中实现了一个简单的工作队列模型,并介绍了两种任务调度的方法:循环调度 和 公平调度,另外还学习了 消息确认 和 消息持久化 的概念。
在第三个例子中介绍了发布/订阅模型(Publish/Subscribe)并构建了一个简单的日志系统,和前两个例子不一样的是,在这个例子中,所有的消费者都可以接受到生产者发送的消息,换句话说也就是,生产者发送的消息被广播给所有的消费者。在这个例子中我们学习了 交换机(Exchange) 的概念,在 RabbitMQ 的核心理念里,生产者不会直接发送消息给队列,而是发送给交换机,再由交换机将消息推送给特定的队列。消息从交换机推送到队列时会遵循一定的规则,这些规则就是 交换机类型(Exchange Type),常用的交换机类型有四种:直连交换机(direct)、主题交换机(topic)、头交换机(headers)和 扇型交换机(fanout)。值得注意的是,在前面的例子中没有指定交换机,实际上使用的是匿名交换机,这是一种特殊的直连交换机。而这个例子要实现的发布/订阅模型,实际上是扇型交换机。
在第四个例子中介绍了 路由(Routing) 和 绑定(Bindings) 的概念。使用扇形交换机只能用来广播消息,没有足够的灵活性,可以使用直连交换机和路由来实现非常灵活的消息转发,在这个日志系统的例子中,我们根据日志的严重程度将消息投递到两个队列中,一个队列只接受 error 级别的日志,将日志保存到文件中,另一个队列接受所有级别的日志,并将日志输出到控制台。路由指的是生产者如何通过交换机将消息投递到特定队列,生产者一般首先通过 exchangeDeclare
声明好交换机,然后通过 basicPublish
将消息发送给该交换机,发送的时候可以指定一个 Routing Key 参数,交换机会根据交换机的类型和 Routing Key 参数将消息路由到某个队列。绑定是用来表示交换机和队列的关系,一般在消费者的代码中先通过 exchangeDeclare
和 queueDeclare
声明好交换机和队列,然后通过 queueBind
来将两者关联起来。在关联时,也可以指定一个 Routing Key 参数,为了和生产者的 Routing Key 区分开来,有时也叫做 Binding Key。只有生产者发送消息时指定的 Routing Key 和消费者绑定队列时指定的 Binding Key 完全一致时,消息才会被投递给该消费者声明的队列中。
从扇形交换机到直连交换机,再到主题交换机,实际上并没有太大的区别,只是路由的规则越来越细致和灵活。在第五个例子中,我们继续学习和改进这个简单的日志系统,消费者在订阅日志时,不仅要根据日志的严重程度,同时还希望根据日志的来源,像这种同时基于多个标准执行路由操作的情况,我们就要用到主题交换机。和直连交换机一样,在发送消息也需要指定一个 Routing Key,只不过这个 Routing Key 必须是以点号分割的特殊字符串,譬如 cron.info,kern.warn 等,消费者在绑定交换机和队列时也需要指定一个 Routing Key(Binding Key),这个 Binding Key 具有同样的格式,而且还可以使用一些特殊的匹配符来匹配路由(星号 *
匹配一个单词,井号 #
匹配任意数量单词),譬如 *.warn 可以用来匹配所有来源的警告日志。
在最后一个例子中,我们将学习更高级的主题,使用 RabbitMQ 实现一个远程过程调用(RPC)系统。这个例子和第二个例子介绍的工作队列是一样的,只不过在生产者将任务发送给消费者之后,还希望能从消费者那里得到任务的执行结果。这里生产者充当 RPC 系统中的客户端的角色,而消费者充当 RPC 系统中的服务器的角色。要实现 RPC 系统,必须声明两个队列,一个用来发送消息,一个用来接受回调。生产者在发送消息时,可以设置消息的属性,AMQP 协议中给消息预定义了 14 个属性,其中有一个属性叫做 reply_to,就是这里的回调队列。另外还有一个属性 correlation_id,可以将 RPC 的响应和请求关联起来。
所有例子的源码可以参考 这里,我就不一一列出了。下面仅对第二个例子(工作队列模型)的源码进行分析,因为这个例子很常用,我们在日常工作中会经常遇到。
首先我们来看生产者,我们省略掉创建和关闭 Connection、Channel 的部分,无论是生产者还是消费者,这个都是类似的。(完整代码)
channel.queueDeclare("hello-queue", false, false, false, null);
for (int i = 1; i <= 10; i++) {
String message = "Hello World" + StringUtils.repeat(".", i);
channel.basicPublish("", "hello-queue", null, message.getBytes());
System.out.println("Message Sent: " + message);
}
可以看出生产者的核心代码实际上只有这两个函数:queueDeclare()
和 basicPublish()
,首先通过 queueDeclare()
函数声明一个队列 hello-queue,然后使用 basicPublish()
函数向这个队列发送消息。看到这里的代码你可能会有疑问,我们之前不是说在 RabbitMQ 里,生产者不会直接向队列发送消息,而是发送给交换机,再由交换机转发到各个队列吗?实际上,这里用到了 RabbitMQ 的 匿名转发(Nameless Exchange) 特性,在 RabbitMQ 里已经预置了几个交换机,比如:amq.direct、amq.fanout、amq.headers、amq.topic,它们的类型和它们的名字是一样的,amq.direct 就是 direct 类型的交换机,另外,还有一个空交换机,它也是 direct 类型,这个是 RabbitMQ 默认的交换机类型。一般情况下,我们在用 queueDeclare() 声明一个队列之后,还要用 queueBind() 绑定队列到某个交换机上,如下所示:
channel.exchangeDeclare("hello-exchange", BuiltinExchangeType.DIRECT);
channel.queueDeclare("hello-queue", false, false, false, null);
channel.queueBind("hello-queue", "hello-exchange", "hello-key");
如果一个队列没有任何绑定,那么这个队列默认是绑定在空交换机上的。所以这里的生产者是将消息发送到空交换机,再由空交换机转发到 hello-queue 队列的。我们再来看消费者,下面的代码实现了任务的循环调度:(完整代码)
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.queueDeclare("hello-queue", false, false, false, null);
channel.basicConsume("hello-queue", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
String message = new String(body, "UTF-8");
System.out.println("Message Recv: " + message);
int c = message.lastIndexOf(".") - message.indexOf(".");
if (c % 2 == 0) {
Thread.sleep(1000 * 5);
} else {
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
在消费者的代码里,我们也用 queueDeclare()
声明了 hello-queue 队列,和生产者的代码是一样的。这里为什么既要在生产者里声明队列,又要在消费者里声明队列呢?而且我们在看其他例子的代码时也会发现,如果要用 exchangeDeclare()
声明交换机 也会同时出现在生产者和消费者中。为了搞清楚它的作用,我们可以把生产者或消费者的这行代码去掉,看看会发生什么:如果在消费者里不声明队列,下面的 basicConsume()
函数会直接抛出 NOT_FOUND 异常;如果在生产者里不声明队列,basicPublish()
发送的消息会全部丢失。所以,无论是生产者发送消息,还是消费者消费消息,都需要先创建队列才行。那么这个队列到底是谁创建的呢?答案是:谁先执行谁创建。创建队列的操作是 幂等 的,也就是说调用多次只会创建一次队列。要注意的是,如果两次创建的时候参数不一样,后创建的会报错:PRECONDITION_FAILED - inequivalent arg。
使用 basicConsume()
函数对某个队列的消息进行消费非常简单,它会一直阻塞,等待消息的到来,这个函数接受一个 DefaultConsumer
对象参数,可以重写该对象的 handleDelivery()
函数,一旦消息到来,就会使用这个回调函数对消息进行处理。我们启动多个消费者实例,由于这些消费者同时消费 hello-queue 队列,RabbitMQ 会将消息挨个分配给消费者,而且是提前一次性分配好,这样每个消费者得到的消息数量是均衡的,所以叫做 循环调度。
这里要特别说明的是 basicConsume()
函数的第二个参数 autoAck
,这个参数表示是否开启 消息自动确认,这是 RabbitMQ 的 消息确认(Message Acknowledgment) 特性。消息确认机制可以保证消息不会丢失,默认情况下,一旦 RabbitMQ 将消息发送给了消费者,就会从内存中删除,如果这时消费者挂掉,所有发给这个消费者的正在处理或尚未处理的消息都会丢失掉。如果我们让消费者在处理完成之后,发送一个消息确认(也就是 ACK),通知 RabbitMQ 这个消息已经接收并且处理完毕了,那么 RabbitMQ 才可以安全的删除该消息。很显然我们这里把 autoAck 参数设置为 true,是没有消息确认机制的,可能会出现消息丢失的情况。
循环调度有一个明显的缺陷,因为每个任务的处理时间是不一样的,所以按任务的先后顺序依次分配很可能会导致消费者消费的任务是不平衡的。我这里简单的模拟了这种不平衡的场景,首先生产者发送了 10 个任务,消费者处理奇数任务的执行时间设置为 5s,偶数任务执行时间设置为 1s,然后启动两个消费者实例,按循环调度算法,每个消费者都会领到 5 个任务,从任务数量上看是平衡的。但是从执行结果看,第一个消费者跑了 25s 才执行完所有任务,而第二个消费者 5s 就跑完了所有任务。对于这种情况,我们引入了公平调度方式。
如何实现公平调度呢?如果能让 RabbitMQ 不提前分配任务,而是在消费者处理完一个任务时才给它分配,不就可以了么?其实这里就要用到上面提到的消息确认机制了,RabbitMQ 提供了 basicQos()
函数用于设置消费者支持同时处理多少个任务,basicQos(1)
表示消费者最多只能同时处理一个任务,所以 RabbitMQ 每次都只分配一个任务给它,而且在这个任务没有处理完成之前,RabbitMQ 也不会给它推送新的任务。
公平调度的实现代码如下:(完整代码)
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
channel.queueDeclare("hello-queue", false, false, false, null);
channel.basicConsume("hello-queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
String message = new String(body, "UTF-8");
System.out.println("Message Recv: " + message);
int c = message.lastIndexOf(".") - message.indexOf(".");
if (c % 2 == 0) {
Thread.sleep(1000 * 5);
} else {
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
});
在这里 basicConsume()
函数的第二个参数设置成了 false,表示开启消息确认机制,而且在 handleDelivery()
函数中处理完消息后,通过 basicAck()
手工确认消息完成。确认的方法除了 basicAck,还有 basicNack 和 basicReject,它们的区别在于 basicNack 一次可以拒绝多条消息,而 basicReject 一次只能拒绝一条消息。
四、RabbitMQ 高级特性
通过上一节的学习,我们已经可以在我们的系统中使用 RabbitMQ 了,合理的采用消息队列,可以在程序中实现异步处理、应用解耦、流量削峰、消息通讯等等功能。除了这些消息队列的常规功能,RabbitMQ 还具有很多高级特性,这些特性大多是 RabbitMQ 对 AMQP 协议的扩展实现,更多的特性可以参考 官网文档:Protocol Extensions。这一节我们将学习延迟队列、优先级队列和持久化。
4.1 延迟队列
有时候我们不希望我们的消息立即被消费者消费,比如在网上购物时,如果用户下单完成后超过三十分钟未付款,订单需要自动取消,这个是延迟队列的一种典型应用场景,要实现这个功能,我们可以使用定时任务来实现,每隔一分钟扫描一次订单状态,但是这种做法显然效率太低了。当然,我们也可以用 DelayQueue、Timer、ScheduledExecutorService、Quartz 等带有调度功能的工具来实现,可以参考这篇博客中的相应实现:你真的了解延时队列吗。不过今天我们的重点是用 RabbitMQ 实现延迟队列。
延迟队列一般分为两种:基于消息的延迟和基于队列的延迟。基于消息的延迟是指为每条消息设置不同的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,显然这样做效率不是很高。实际应用中大多采用基于队列的延迟,每个队列中消息的延迟时间都是相同的,这样可以省去排序消息的工作,只需要检测超时时间按顺序投递即可。
事实上,RabbitMQ 并没有直接支持延迟队列,但是可以通过它的两个特性模拟出延迟队列来,这两个特性是:Time-To-Live Extensions 和 Dead Letter Exchanges。
Time-To-Live Extensions 让我们可以在 RabbitMQ 里为消息或者队列设置过期时间(TTL,time to live),单位为毫秒,当一条消息被设置了 TTL 或者进入设置了 TTL 的队列时,这条消息会在经过 TTL 毫秒后成为 死信(Dead Letter)。我们可以像下面这样通过 x-message-ttl
参数定义一个延迟队列:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60 * 1000);
channel.queueDeclare(queueName, false, false, false, args);
上面这个延迟队列的 TTL 为 60 秒,也就是说,在这个队列中的消息,超过 60 秒就会变成死信。在 RabbitMQ 中,除了过期的消息,还有两种情况消息可能会变成死信,第一种情况是消息被拒绝,并且没有设置 requeue,第二种情况是消息队列如果已满,再往该队列投递消息也会变成死信。那么 RabbitMQ 是如何处理这些死信的呢?
在上面的例子中,我们为队列设置了一个 x-message-ttl
参数,我们还可以给队列添加另一个参数 x-dead-letter-exchange
,这个就是 Dead Letter Exchange(DLX),这个参数决定了当某个队列中出现死信时会被转移到哪?DLX 是一个普通的交换机,和其他的交换机没有任何区别,死信被投递到 DLX 后,通过 DLX 再路由到其他队列,这取决于你给 DLX 绑定了哪些队列。另外,死信被投递到 DLX 时还可以通过参数 x-dead-letter-routing-key
指定 Routing Key。下面这个图很好的阐述了这个过程:(图片来源)
![rabbitmq-ttl-dlx.png rabbitmq-ttl-dlx.png](https://www.aneasystone.com/usr/uploads/2018/08/1097907605.png)
把 TTL 和 DLX 综合起来实现一个延迟队列如下:
// 创建 DLX
channel.exchangeDeclare("this-is-my-dlx", "direct");
// 设置队列的 TTL 和 DLX
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60 * 1000);
args.put("x-dead-letter-exchange", "this-is-my-dlx");
args.put("x-dead-letter-routing-key", "");
channel.queueDeclare(queueName, false, false, false, args);
这里省略了消费者的代码,消费者可以创建一个队列,并绑定到 this-is-my-dlx 这个交换机上,当这个队列中有消息到达时,说明有消息超时了,譬如订单创建超过 30 分钟了,这时去判断订单是否已经付款,如果未付款,则取消订单。
如前文所述,不仅可以设置队列的超时时间,我们也可以设置消息的超时时间:
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration("60000");
channel.basicPublish("exchangeName", "routeKey", properties.build(), "Hello".getBytes());
4.2 优先级队列
在 RabbitMQ 中我们可以使用 x-max-priority
参数将队列标记为优先级队列,优先级的值是一个整数,优先级的值越大,越被提前消费。x-max-priority
参数的值限制了优先级的最大值,一般不宜设置的太大。
Map<String, Object> args= new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("priority-queue", false, false, false, args);
优先级队列在 RabbitMQ 管理页面的 Features 里可以看到 Pri
标志:
![rabbitmq-priority-queue.jpg rabbitmq-priority-queue.jpg](https://www.aneasystone.com/usr/uploads/2018/09/3891452476.jpg)
我们按优先级 1 ~ 5 依次发送 5 条消息到这个队列:
for (int i = 1; i <= 5; i++) {
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().priority(i);
channel.basicPublish("", "priority-queue", properties.build(), ("Hello World" + i).getBytes());
}
然后启动消费者,可以看到 5 条消息并不是按顺序接受的,而是按优先级从大到小排序的:
[*] Waiting for messages. To exit press CTRL+C
Message Recv: Hello World5
Message Recv: Hello World4
Message Recv: Hello World3
Message Recv: Hello World2
Message Recv: Hello World1
发送消息时,优先级不要超过 x-max-priority
的值,超过 x-max-priority 时按 x-max-priority 处理。另外有一点要注意:在这个例子里,我们不能先启动消费者,否则我们还是会看到消息是按顺序接受的,这是因为消息的优先级是在有消息堆积的时候才会有意义,如果消费者的消费速度比生产者的生产速度快,那么生产者刚发送完一条消息就被消费者消费了,队列中最多只有一条消息,还谈什么优先级呢。
4.3 持久化
在前面的例子里,我们学习了 RabbitMQ 的消息确认机制,这个机制可以保证消息不会由于消费者的崩溃而丢失。但是如果是 RabbitMQ 服务崩溃退出了呢?我们该如何保证交换机、队列以及队列中的消息在 RabbitMQ 服务崩溃之后不丢失呢?这就是持久化要解决的问题。在声明交换机和队列时,可以把 durable 设置为 true,在发送消息时,可以设置消息的 deliveryMode 属性为 2,如下:
持久化的交换机:
channel.exchangeDeclare("durable-exchange", BuiltinExchangeType.DIRECT, /*durable*/true);
持久化的队列:
channel.queueDeclare("durable-queue", /*durable*/true, false, false, null);
持久化的消息:
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().deliveryMode(2);
channel.basicPublish("", "durable-queue", properties.build(), "Hello World".getBytes());
为方便起见,也可以直接使用内置的 MessageProperties.PERSISTENT_TEXT_PLAIN
静态变量,可以看一下它的实现,实际上就是 deliveryMode = 2 的一个简单封装:
channel.basicPublish("", "durable-queue", MessageProperties.PERSISTENT_TEXT_PLAIN, "Hello World".getBytes());
关于持久化的话题,我们可以再深入研究一下。为了防止消费者丢消息,我们采取了消息确认机制;为了防止服务器丢消息,我们将交换机、队列和消息都设置成持久化的。但是这样就能万无一失了吗?答案是否定的。问题就在于持久化是需要将消息保存到磁盘的,如果在保存到磁盘的过程中 RabbitMQ 崩溃,消息一样会丢失。要解决这个问题,一个可选的方案是使用 RabbitMQ 的事务机制,不过事务机制会带来大量的开销,性能不高,所以又引入了 Publisher Confirm 机制。推荐王磊的这篇博客 《RabbitMQ事务和Confirm发送方消息确认——深入解读》。
总结
通过这篇博客,我们学习了 AMQP 协议 和 RabbitMQ 的基本概念,并学习了 RabbitMQ 的安装和管理,通过官网的 6 个例子,掌握了交换机的几种常见类型:direct、fanout 和 topics,最后通过延迟队列、优先级队列和消息的持久化,我们还学习了 RabbitMQ 的一些高级特性。可以看出消息队列的功能非常丰富,我们常常在消息队列选型时,要综合考虑各种因素,功能是最重要的一条,InfoQ 上的这篇文章 《消息中间件选型分析:从Kafka与RabbitMQ的对比看全局》 介绍了更多要考虑的点。另外,限于篇幅,很多 RabbitMQ 的知识点没有展开,比如 RabbitMQ 的管理和监控,集群安装,事务和 Publisher Confirm 机制等。本文中所有代码使用的都是 amqp-client,如果你在用 Spring Boot,推荐使用 spring-boot-starter-amqp,这里 是官网的教程。
参考
- RabbitMQ Tutorials
- RabbitMQ中文 文档站
- Messaging with RabbitMQ
- 消息队列之JMS和AMQP对比
- RabbitMQ入门指南
- RabbitMQ与AMQP协议详解
- RabbitMQ从入门到精通
- 消息队列之 RabbitMQ
- 高可用RabbitMQ集群安装配置
- 基于 RabbitMQ 的实时消息推送
- 消息中间件选型分析:从Kafka与RabbitMQ的对比看全局
- 详细介绍Spring Boot + RabbitMQ实现延迟队列
- 你真的了解延时队列吗(一)
- RabbitMQ入门教程(十):队列声明queueDeclare
- Introducing Publisher Confirms