⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Spring-Cloud/RabbitMQ/ 「芋道源码」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

本文在提供完整代码示例,可见 https://github.com/YunaiV/SpringBoot-Labslabx-10-spring-cloud-stream-rabbitmq 目录。

原创不易,给点个 Star 嘿,一起冲鸭!

1. 概述

本文我们来学习 Spring Cloud Stream RabbitMQ 组件,基于 Spring Cloud Stream 的编程模型,接入 RabbitMQ 作为消息中间件,实现消息驱动的微服务。

RabbitMQ 是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。

在开始本文之前,胖友需要对 RabbitMQ 进行简单的学习。可以阅读《RabbitMQ 极简入门》文章,将第一二小节看完,在本机搭建一个 RabbitMQ 服务。

2. Spring Cloud Stream 介绍

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架,使用 Spring Integration 与 Broker 进行连接。

友情提示:可能有胖友对 Broker 不太了解,我们来简单解释下。

一般来说,消息队列中间件都有一个 Broker Server(代理服务器),消息中转角色,负责存储消息、转发消息。

例如说在 RocketMQ 中,Broker 负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。另外,Broker 也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Spring Cloud Stream 提供了消息中间件的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。

Spring Cloud Stream 内部有两个概念:BinderBinding

Binder,跟消息中间件集成的组件,用来创建对应的 Binding。各消息中间件都有自己的 Binder 具体实现。

public interface Binder<T, 
C extends ConsumerProperties, // 消费者配置
P extends ProducerProperties> { // 生产者配置

// 创建消费者的 Binding
Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);

// 创建生产者的 Binding
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);

}

Binding,包括 Input Binding 和 Output Binding。Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

最终整体交互如下图所示:Spring Cloud Stream Application

可能看完之后,胖友对 Spring Cloud Stream 还是有点懵逼,并且觉得概念怎么这么多呢?不要慌,我们先来快速入个门,会有更加具象的感受。

3. 快速入门

示例代码对应仓库:

友情提示:这可能是一个信息量有点大的入门内容,请保持耐心~

本小节,我们一起来快速入门下,会创建 2 个项目,分别作为生产者和消费者。最终项目如下图所示:项目结构

友情提示:考虑到胖友能够有更舒适的入门体验,需要对 RabbitMQ 的基本概念有一定的了解,特别是对 Exchange 的四种类型 Direct、Topic、Fanout、Headers 噢。

如果还不知道的,也不要慌~只需要阅读下艿艿写的《芋道 Spring Boot 消息队列 RabbitMQ 入门》「3. 快速入门」即可。

3.1 搭建生产者

创建 labx-10-sc-stream-rabbitmq-producer-demo 项目,作为生产者。

3.1.1 引入依赖

创建 pom.xml 文件中,引入 Spring Cloud Stream RabbitMQ 相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>labx-10</artifactId>
<groupId>cn.iocoder.springboot.labs</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>labx-10-sc-stream-rabbitmq-producer-demo</artifactId>

<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
<spring.cloud.version>Hoxton.SR1</spring.cloud.version>
</properties>

<!--
引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系
-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- 引入 SpringMVC 相关依赖,并实现对其的自动配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- 引入 Spring Cloud Stream RabbitMQ 相关依赖,将 RabbitMQ 作为消息队列,并实现对其的自动配置 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>

</project>

通过引入 spring-cloud-starter-stream-rabbit 依赖,引入并实现 Stream RabbitMQ 的自动配置。在该依赖中,已经帮我们自动引入 RabbitMQ 的大量依赖,非常方便,如下图所示:

3.1.2 配置文件

创建 application.yaml 配置文件,添加 Spring Cloud Stream RabbitMQ 相关配置。

spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
binder: rabbit001 # 设置使用的 Binder 名字

server:
port: 18080

spring.cloud.stream 为 Spring Cloud Stream 配置项,对应 BindingServiceProperties 类。配置的层级有点深,我们一层一层来看看。

spring.cloud.stream.binders 为 Binder 配置项,对应 BinderProperties Map。其中 key 为 Binder 的名字。

这里,我们配置了一个名字为 rabbit001 的 Binder。

  • type:Binder 的类型。这里,我们设置为了 rabbit,表示使用 Spring Cloud Stream RabbitMQ 提供的 Binder 实现。
  • environment:Binder 的环境。因为 Spring Cloud Steam RabbitMQ 底层使用的是 spring-rabbit,所以在使用 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类。

spring.cloud.stream.bindings 为 Binding 配置项,对应 BindingProperties Map。其中,key 为 Binding 的名字。要注意,虽然说 Binding 分成 Input 和 Output 两种类型,但是在配置项中并不会体现出来,而是要在稍后搭配 @Input 还是 @Output 注解,才会有具体的区分。

这里,我们配置了一个名字为 demo01-output 的 Binding。从命名上,我们的意图是想作为 Output Binding,用于生产者发送消息。

  • destination:目的地。在 RabbitMQ 中,使用 Exchange 作为目的地,默认为 Topic 类型。这里我们设置为 DEMO-TOPIC-01

  • content-type:内容格式。这里使用 JSON 格式,因为稍后我们将发送消息的类型为 POJO,使用 JSON 进行序列化。

  • binder:使用的 Binder 名字。这里我们设置为 rabbit001,就是我们上面刚创建的。

    友情提示:如果只有一个 Binder 的情况,可以不进行设置。又或者通过 spring.cloud.stream.default-binder 配置项来设置默认的 Binder 的名字。

3.1.3 MySource

创建 MySource 接口,声明名字为 Output Binding。代码如下:

public interface MySource {

@Output("demo01-output")
MessageChannel demo01Output();

}

这里,我们通过 @Output 注解,声明了一个名字为 demo01-output 的 Output Binding。注意,这个名字要和我们配置文件中的 spring.cloud.stream.bindings 配置项对应上。

同时,@Output 注解的方法的返回结果为 MessageChannel 类型,可以使用它发送消息。MessageChannel 提供的发送消息的方法如下:

@FunctionalInterface
public interface MessageChannel {

long INDEFINITE_TIMEOUT = -1;

default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}

boolean send(Message<?> message, long timeout);

}

那么,我们是否要实现 MySource 接口呢?答案是不需要,全部交给 Spring Cloud Stream 的 BindableProxyFactory 来解决。BindableProxyFactory 会通过动态代理,自动实现 MySource 接口。 而 @Output 注解的方法的返回值,BindableProxyFactory 会扫描带有 @Output 注解的方法,自动进行创建。

例如说,#demo01Output() 方法被自动创建返回结果为 DirectWithAttributesChannel,它是 MessageChannel 的子类。

友情提示:感兴趣的胖友,可以在 BindableProxyFactory 的 #afterPropertiesSet()#invoke(MethodInvocation invocation) 方法上,都打上一个断点,然后进行愉快的调试。

3.1.4 Demo01Message

创建 Demo01Message 类,示例 Message 消息。代码如下:

public class Demo01Message {

/**
* 编号
*/
private Integer id;

// ... 省略 setter/getter/toString 方法

}

3.1.5 Demo01Controller

创建 Demo01Controller 类,提供发送消息的 HTTP 接口。代码如下:

@RestController
@RequestMapping("/demo01")
public class Demo01Controller {

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private MySource mySource; // <X>

@GetMapping("/send")
public boolean send() {
// <1> 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// <2> 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// <3> 发送消息
boolean result = mySource.demo01Output().send(springMessage);
logger.info("[send][发送编号:[{}] 发送成功]", message.getId());
return result;
}

}

  • <X> 处,使用 @Autowired 注解,注入 MySource Bean。
  • <1> 处,创建 Demo01Message 对象。
  • <2> 处,使用 MessageBuilder 创建 Spring Message 对象,并设置消息内容为 Demo01Message 对象。
  • <3> 处,通过 MySource 获得 MessageChannel 对象,然后发送消息。

3.1.6 ProducerApplication

创建 ProducerApplication 类,启动应用。代码如下:

@SpringBootApplication
@EnableBinding(MySource.class)
public class ProducerApplication {

public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}

}

使用 @EnableBinding 注解,声明指定接口开启 Binding 功能,扫描其 @Input@Output 注解。这里,我们设置为 MySource 接口。

3.2 搭建消费者

创建 labx-10-sc-stream-rabbitmq-consumer-demo 项目,作为消费者。

3.2.1 引入依赖

创建 pom.xml 文件中,引入 Spring Cloud Stream RabbitMQ 相关依赖。

友情提示:和「3.1.1 引入依赖」基本一样,点击 链接 查看。

3.2.2 配置文件

创建 application.yaml 配置文件,添加 Spring Cloud Stream RabbitMQ 相关配置。

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
binder: rabbit001 # 设置使用的 Binder 名字

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

总体来说,和「3.1.2 配置文件」是比较接近的,所以我们只说差异点噢。

spring.cloud.stream.bindings 为 Binding 配置项。

这里,我们配置了一个名字为 demo01-input 的 Binding。从命名上,我们的意图是想作为 Input Binding,用于消费者消费消息。

  • group:消费者分组。

    消费者组(Consumer Group):同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。

对于消费队列的消费者,会有两种消费模式:集群消费(Clustering)和广播消费(Broadcasting)。

  • 集群消费(Clustering):集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
  • 广播消费(Broadcasting):广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

RabbitMQ 的消费者两种模式都支持。因为这里我们配置了消费者组,所以采用集群消费。至于如何使用广播消费,我们稍后举例子。

这里一点要注意!!!艿艿加了三个感叹号,一定要理解集群消费和广播消费的差异。我们来举个例子,以有两个消费者分组 A 和 B 的场景举例子:

  • 假设每个消费者分组各启动一个实例,此时我们发送一条消息,该消息会被两个消费者分组 "consumer_group_01""consumer_group_02" 都各自消费一次。
  • 假设每个消费者分组各启动一个实例,此时我们发送一条消息,该消息会被分组 A 的某个实例消费一次,被分组 B 的某个实例也消费一次

通过集群消费的机制,我们可以实现针对相同 Topic ,不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER" 的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:

  • 积分模块:判断如果是手机注册,给用户增加 20 积分。
  • 优惠劵模块:因为是新用户,所以发放新用户专享优惠劵。
  • 站内信模块:因为是新用户,所以发送新用户的欢迎语的站内信。
  • ... 等等

这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。

同时,相同消费者分组的多个实例,可以实现高可用,保证在一个实例意外挂掉的情况下,其它实例能够顶上。并且,多个实例都进行消费,能够提升消费速度

友情提示:如果还不理解的话,没有关系,我们下面会演示下我们上面举的例子。

3.2.3 MySink

创建 MySink 接口,声明名字为 Input Binding。代码如下:

public interface MySink {

String DEMO01_INPUT = "demo01-input";

@Input(DEMO01_INPUT)
SubscribableChannel demo01Input();

}

这里,我们通过 @Input 注解,声明了一个名字为 demo01-input 的 Input Binding。注意,这个名字要和我们配置文件中的 spring.cloud.stream.bindings 配置项对应上。

同时,@Input 注解的方法的返回结果为 SubscribableChannel 类型,可以使用它订阅消息来消费。MessageChannel 提供的订阅消息的方法如下:

public interface SubscribableChannel extends MessageChannel {

boolean subscribe(MessageHandler handler); // 订阅

boolean unsubscribe(MessageHandler handler); // 取消订阅

}

那么,我们是否要实现 MySink 接口呢?答案也是不需要,还是全部交给 Spring Cloud Stream 的 BindableProxyFactory 大兄弟来解决。BindableProxyFactory 会通过动态代理,自动实现 MySink 接口。 而 @Input 注解的方法的返回值,BindableProxyFactory 会扫描带有 @Input 注解的方法,自动进行创建。

例如说,#demo01Input() 方法被自动创建返回结果为 DirectWithAttributesChannel,它也是 SubscribableChannel 的子类。

友情提示:感兴趣的胖友,可以在 BindableProxyFactory 的 #afterPropertiesSet()#invoke(MethodInvocation invocation) 方法上,都打上一个断点,然后进行愉快的调试。

3.2.4 Demo01Message

创建 Demo01Message 类,示例 Message 消息。

友情提示:和「3.1.4 Demo01Message」基本一样,点击 链接 查看。

3.2.5 Demo01Consumer

创建 Demo01Consumer 类,消费消息。代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(MySink.DEMO01_INPUT)
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}

}

在方法上,添加 @StreamListener 注解,声明对应的 Input Binding。这里,我们使用 MySink.DEMO01_INPUT

又因为我们消费的消息是 POJO 类型,所以我们需要添加 @Payload 注解,声明需要进行反序列化成 POJO 对象。

3.2.6 ConsumerApplication

创建 ConsumerApplication 类,启动应用。代码如下:

@SpringBootApplication
@EnableBinding(MySink.class)
public class ConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}

}

使用 @EnableBinding 注解,声明指定接口开启 Binding 功能,扫描其 @Input@Output 注解。这里,我们设置为 MySink 接口。

3.3 测试单集群多实例的场景

本小节,我们会在一个消费者集群启动两个实例,测试在集群消费的情况下的表现。

① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 demo01-consumer-group-DEMO-TOPIC-01 下有两个消费者实例。

友情提示:因为 IDEA 默认同一个程序只允许启动 1 次,所以我们需要配置 DemoProviderApplication 为 Allow parallel run。如下图所示:Allow parallel run

此时在 IDEA 控制台看到 RabbitMQ 相关的日志如下:

# 在 RabbitMQ 声明一个 `DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01` 队列,并绑定到名字为 `DEMO-TOPIC-01` 的 Exchange 上
2020-03-03 08:09:28.707 INFO 80728 --- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01, bound to: DEMO-TOPIC-01

# 连接到 RabbitMQ Broker
2020-03-03 08:09:28.708 INFO 80728 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2020-03-03 08:09:28.745 INFO 80728 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#50756c76:0/SimpleConnection@36480b2d [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60215]

# 订阅消费 `DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01` 队列的消息
2020-03-03 08:09:28.867 INFO 80728 --- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started bean 'inbound.DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01'

重点是第一条日志,为什么呢?在我们添加了 spring.cloud.stream.bindings.{bindingName} 配置项时,并且是 Input 类型时,每个 RabbitMQ Binding 都会:

  • 【Queue】创建一个 {destination}.{group} 队列,例如这里创建的队列是 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01
  • 【Exchange】同时创建的还有类型为 Topic 的 Exchange,并进行绑定。

下面,我们打开 RabbitMQ 运维界面,查看下名字为 DEMO-TOPIC-01 的 Exchange,会更加好理解。如下图所示: 的 Exchange

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:

// ConsumerApplication 控制台 01
2020-03-03 08:28:23.343 INFO 82079 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:29 消息内容:Demo01Message{id=2110729413}]
2020-03-03 08:28:29.215 INFO 82079 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:29 消息内容:Demo01Message{id=-1265506892}]

// ConsumerApplication 控制台 02
2020-03-03 08:28:28.877 INFO 82369 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=770439046}]

符合预期。从日志可以看出,每条消息仅被消费一次。对了,有点忘记提下,非常关键!当 RabbitMQ Consumer 订阅相同 Queue 时,每条消息有且仅被一个 Consumer 消费,通过这样的方式实现集群消费,也就是说,Stream RabbitMQ 是通过消费相同 Queue 实现消费者组

友情提示:RabbitMQ 本身没有消费组的概念,而是由 Spring Cloud Stream 定义的统一抽象,而后交给不同消息队列的 Spring Cloud Stream XXX 去具体实现。例如说,Spring Cloud Stream RabbitMQ 就基于 RabbitMQ 的上述特性,实现消费组的功能。

3.4 测试多集群多实例的场景

本小节,我们会在二个消费者集群启动两个实例,测试在集群消费的情况下的表现。

① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 demo01-consumer-group-DEMO-TOPIC-01 下有两个消费者实例。

② 修改 labx-10-sc-stream-rabbitmq-consumer-demo 项目的配置文件,修改 spring.cloud.stream.bindings.demo01-input.group 配置项,将消费者分组改成 X-demo01-consumer-group-DEMO-TOPIC-01

然后,执行 ConsumerApplication 两次,再启动两个消费者的实例,从而实现在消费者分组 X-demo01-consumer-group-DEMO-TOPIC-01 下有两个消费者实例。

此时,我们打开 RabbitMQ 运维界面,查看下名字为 DEMO-TOPIC-01 的 Exchange,可以看到两个消费者的两个队列。如下图所示: 的 Exchange

③ 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:

// 消费者分组 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 01
2020-03-03 08:41:50.491 INFO 82079 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:29 消息内容:Demo01Message{id=-1906251317}]

// 消费者分组 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 02
2020-03-03 08:41:50.056 INFO 82369 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=-1417543599}]
2020-03-03 08:41:50.815 INFO 82369 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=-1176160470}]

// 消费者分组 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 01
2020-03-03 08:41:50.018 INFO 86893 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:29 消息内容:Demo01Message{id=-1417543599}]
2020-03-03 08:41:50.813 INFO 86893 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:29 消息内容:Demo01Message{id=-1176160470}]

// 消费者分组 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 02
2020-03-03 08:41:50.469 INFO 86913 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=-1906251317}]

符合预期。从日志可以看出,每条消息被每个消费者集群都进行了消费,且仅被消费一次。

3.5 小结

至此,我们已经完成了 Stream RocketMQ 的快速入门,是不是还是蛮简答的噢。现在胖友可以在回过头看看 Binder 和 Binding 的概念,是不是就清晰一些了。

4. 定时消息

示例代码对应仓库:

在 RabbitMQ 中,我们可以通过使用 rabbitmq-delayed-message-exchange 插件提供的定时消息功能。

定时消息,是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

相比定时任务来说,我们可以使用定时消息实现更细粒度动态的定时功能。例如说,新创建的订单 2 小时超时关闭的场景:

  • 如果使用定时任务,我们需要每秒扫描订单表,是否有超过支付时间的订单。这样会增加对订单表的查询压力,同时定时任务本身是串行的,需要一个一个处理。
  • 如果使用定时消息,我们需要创建订单的时候,同时发送一条检查支付超时的定时消息。这样就无需每秒查询查询订单表,同时多个定时消息可以并行消费,提升处理速度。

另外,定时消息更有利于不同环境的隔离。再举个例子,我们生产和预发布环境使用的是相同的数据库,还是新创建的订单 2 小时超时关闭的场景,假设我们现在修改了超时支付的逻辑:

  • 如果使用定时任务,在我们把程序发布到预发布的时候,因为使用相同数据库,会导致所有订单都执行了新的逻辑。如果新的逻辑有问题,将会影响到所有订单。

  • 如果使用定时消息,我们只需要把正服和预发布使用不同的 RabbitMQ Exchange,这样预发布发送的延迟消息,只会被预发布的消费者消费,生产发送的延迟消息,只会被生产的消费者消费。如果新的逻辑有问题,只会影响到预发布的订单。

    友情提示:建议不同的环境,使用不同的 RabbitMQ Exchange 噢,例如说 exchange-01 可以带上具体环境的后缀,从而拆分成 exchange-01-devexchange-01-prod 等。

下面,我们来搭建一个 RabbitMQ 定时消息的使用示例。最终项目如下图所示:项目结构

4.1 安装插件

① 进入 rabbitmq-delayed-message-exchange 插件的下载页面,选择合适的版本。如下图所示:下载页面

# 进入 RabbitMQ 的 plugins 目录
# 因为艿艿是 mac 使用 brew 安装,所以在如下目录。胖友自己的,自己找找哈~
$ cd /usr/local/Cellar/rabbitmq/3.8.0/plugins

# 下载 rabbitmq-delayed-message-exchange 插件
$ wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez

② 使用 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令,开启该插件。

$ rabbitmq-plugins enable rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_amqp1_0
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_mqtt
rabbitmq_stomp
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@localhost...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange # 该插件生效了

started 1 plugins.

4.2 搭建生产者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-producer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-producer-delay 项目作为生产者。

4.2.1 配置文件

修改 application.yml 配置文件,开启发送定时消息的功能。完整配置如下:

spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-02 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
binder: rabbit001 # 设置使用的 Binder 名字
# RabbitMQ 自定义 Binding 配置项,对应 RabbitBindingProperties Map
rabbit:
bindings:
demo01-output:
# RabbitMQ Producer 配置项,对应 RabbitProducerProperties 类
producer:
delayed-exchange: true # 是否使用 x-delayed-message 类型的 Exchange,即延迟消息,默认为 false

server:
port: 18080

spring.cloud.stream.rabbit 为 Spring Cloud Stream RabbitMQ 专属配置项。

spring.cloud.stream.rabbit.bindings 为 RabbitMQ 自定义 Binding 配置项,用于对通用的 spring.cloud.stream.bindings 配置项的增强,实现 RabbitMQ Binding 独特的配置。该配置项对应 RabbitBindingProperties Map,其中 key 为 Binding 的名字,需要对应上噢。

这里,我们对名字为 demo01-output 的 Binding 进行增强,进行 Producer 的配置。其中,producer 为 RabbitMQ Producer 配置项,对应 RabbitProducerProperties 类。

  • delayed-exchange 属性,是否使用 x-delayed-message 类型的 Exchange,即延迟消息,默认为 false。这个是由 rabbitmq-delayed-message-exchange 插件提供的一种拓展的 Exchange 类型。

③ 这里我们创建了一个新的 Exchange,修改成了 DEMO-TOPIC-02,因为稍后创建的延迟消息的 Exchange 是 x-delayed-message,而在「3. 快速入门」使用的是 Topic 类型,肯定是不对的。

4.2.2 Demo01Controller

修改 Demo01Controller 类,增发送定时消息的 HTTP 接口。代码如下:

// Demo01Controller.java

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private MySource mySource;

@GetMapping("/send_delay")
public boolean sendDelay() {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader("x-delay", 5000) // 设置延迟时间,单位:毫秒
.build();
// 发送消息
boolean sendResult = mySource.demo01Output().send(springMessage);
logger.info("[sendDelay][发送消息完成, 结果 = {}]", sendResult);
return sendResult;
}

<X> 处,通过添加头 x-delay,设置消息的延迟级别,从而发送定时消息。这是 rabbitmq-delayed-message-exchange 插件的规定,用就完事了。

4.3 搭建消费者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-consumer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-consumer-delay 项目作为消费者。

4.3.1 配置文件

修改 application.yml 配置文件,开启消费定时消息的功能。完整配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-02 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
binder: rabbit001 # 设置使用的 Binder 名字
# RabbitMQ 自定义 Binding 配置项,对应 RabbitBindingProperties Map
rabbit:
bindings:
demo01-input:
# RabbitMQ Consumer 配置项,对应 RabbitConsumerProperties 类
consumer:
delayed-exchange: true # 是否使用 x-delayed-message 类型的 Exchange,即延迟消息,默认为 false

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

「4.2.1 配置文件」类似,我们拷贝拷贝,嘿嘿~

spring.cloud.stream.rabbit 为 Spring Cloud Stream RabbitMQ 专属配置项。

spring.cloud.stream.rabbit.bindings 为 RabbitMQ 自定义 Binding 配置项,用于对通用的 spring.cloud.stream.bindings 配置项的增强,实现 RabbitMQ Binding 独特的配置。该配置项对应 RabbitBindingProperties Map,其中 key 为 Binding 的名字,需要对应上噢。

这里,我们对名字为 demo01-output 的 Binding 进行增强,进行 Consumer 的配置。其中,consumer 为 RabbitMQ Consumer 配置项,对应 RabbitConsumerProperties 类。

  • delayed-exchange 属性,是否使用 x-delayed-message 类型的 Exchange,即延迟消息,默认为 false。这个是由 rabbitmq-delayed-message-exchange 插件提供的一种拓展的 Exchange 类型。

③ 这里我们创建了一个新的 Exchange,修改成了 DEMO-TOPIC-02,因为稍后创建的延迟消息的 Exchange 是 x-delayed-message,而在「3. 快速入门」使用的是 Topic 类型,肯定是不对的。

4.4 简单测试

① 执行 ConsumerApplication,启动一个消费者的实例。

我们打开 RabbitMQ 运维界面,查看下名字为 DEMO-TOPIC-02 的 Exchange 的类型为 x-delayed-message。如下图所示: 的 Exchange

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_delay 接口,发送延迟 5 秒的定时消息。IDEA 控制台输出日志如下:

// Producer 的控制台
2020-03-03 20:26:33.368 INFO 5382 --- [io-18080-exec-3] c.i.s.l.r.p.controller.Demo01Controller : [sendDelay][发送消息完成, 结果 = true]

// Consumer 的控制台
2020-03-03 20:26:38.424 INFO 98265 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:30 消息内容:Demo01Message{id=1237871741}]

符合预期。在 Producer 发送的消息之后,Consumer 确实 5 秒后才消费消息。

4.5 另外一种方案

除了使用 rabbitmq-delayed-message-exchange 插件,我们还可以通过 RabbitMQ 的死信队列实现定时消息。具体的,可以看看艿艿写的《芋道 Spring Boot 消息队列 RabbitMQ 入门》文章的「8. 定时消息」小节。

这两种实现定时消息的方案,各有优缺点,目前采用 rabbitmq-delayed-message-exchange 插件较多,不然 Spring Cloud Stream RabbitMQ 也不会选择将其集成进来。至于两者的对比,胖友可以阅读《RabbitMQ 延迟队列的两种实现方式》文章。

5. 消费重试

示例代码对应仓库:

在开始本小节之前,胖友首先要对 RabbitMQ 的死信队列的机制,有一定的了解。不了解的胖友,可以看看《RabbitMQ 之死信队列》文章。

在消息消费失败的时候,Spring-AMQP 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。

友情提示:Spring Cloud Stream RabbitMQ 是基于 Spring-AMQP 操作 RabbitMQ,它仅仅是上层的封装哟。

当然,Spring-AMQP 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 N 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。后续,我们可以通过对死信队列中的消息进行重发,来使得消费者实例再次进行消费。

那么消费失败到达最大次数的消息,是怎么进入到死信队列的呢?Spring-AMQP 在消息到达最大消费次数的时候,会将该消息进行否定(basic.nack),并且 requeue=false ,这样后续就可以利用 RabbitMQ 的死信队列的机制,将该消息转发到死信队列。

另外,每条消息的失败重试,是可以配置一定的间隔时间。具体,我们在示例的代码中,来进行具体的解释。

下面,我们来实现一个 Consumer 消费重试的示例。最终项目如下图所示:项目结构

5.1 搭建生产者

直接使用「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-producer-demo 项目即可。

5.2 搭建消费者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-consumer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-consumer-retry 项目作为消费者。

5.2.1 配置文件

修改 application.yml 配置文件,增加消费重试相关的配置项。最终配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
binder: rabbit001 # 设置使用的 Binder 名字
# Consumer 配置项,对应 ConsumerProperties 类
consumer:
max-attempts: 3 # 重试次数,默认为 3 次。
back-off-initial-interval: 3000 # 重试间隔的初始值,单位毫秒,默认为 1000
back-off-multiplier: 2.0 # 重试间隔的递乘系数,默认为 2.0
back-off-max-interval: 10000 # 重试间隔的最大值,单位毫秒,默认为 10000
# RabbitMQ 自定义 Binding 配置项,对应 RabbitBindingProperties Map
rabbit:
bindings:
demo01-input:
# RabbitMQ Consumer 配置项,对应 RabbitConsumerProperties 类
consumer:
auto-bind-dlq: true # 是否创建对应的死信队列,并进行绑定,默认为 false。
republish-to-dlq: true # 消费失败的消息发布到对应的死信队列时,是否添加异常异常的信息到消息头

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

spring.cloud.stream.bindings.<bindingName>.consumer 为 Spring Cloud Stream Consumer 通用配置项,对应 ConsumerProperties 类。

  • max-attempts:最大重试次数,默认为 3 次。如果想要禁用掉重试,可以设置为 1。

    max-attempts 配置项要注意,是一条消息一共尝试消费总共 max-attempts 次,包括首次的正常消费。

  • back-off-initial-interval:重试间隔的初始值,单位毫秒,默认为 1000。

  • back-off-multiplier:重试间隔的递乘系数,默认为 2.0。

  • back-off-max-interval:重试间隔的最大值,单位毫秒,默认为 10000。

将四个参数组合在一起,我们来看一个消费重试的过程:

  • 第一次 00:00:00:首次消费,失败。
  • 第二次 00:00:03:3 秒后重试,因为重试间隔的初始值为 back-off-initial-interval,等于 3000 毫秒。
  • 第三次 00:00:09:6 秒后重试,因为有重试间隔的递乘系数 back-off-multiplier,所以是 2.0 * 3000 等于 6000 毫秒。
  • 第四次,没有,因为到达最大重试次数,等于 3。

spring.cloud.stream.rabbit.bindings.<bindingName>.consumer 为 Spring Cloud Stream RabbitMQ Consumer 专属配置项,我们新增了两个配置项:

  • auto-bind-dlq:是否创建对应的死信队列,并进行绑定,默认为 false
    • Spring Cloud Stream RabbitMQ 默认会将消息发送到死信队列,如果这里我们不设置为 true,那么我们就需要手工去创建 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01 对应的死信队列,否则会因为死信队列不存在而报错。
    • 默认情况下,创建的死信队列为原队列添加 .ldq 后缀,可以通过 deadLetterQueueName 配置项来自定义。例如说 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01 对应的死信队列为 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01.ldq
  • republish-to-dlq:消费失败的消息发布到对应的死信队列时,是否添加异常异常的信息到消息头,默认为 true。通过这样的方式,我们可以知道一条消息消费失败的原因~

5.2.2 Demo01Consumer

修改 Demo01Consumer 类,直接抛出异常,模拟消费失败,从而演示消费重试的功能。代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(MySink.DEMO01_INPUT)
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
// <X> 注意,此处抛出一个 RuntimeException 异常,模拟消费失败
throw new RuntimeException("我就是故意抛出一个异常");
}

}

5.3 简单测试

① 执行 ConsumerApplication,启动一个消费者的实例。

我们打开 RabbitMQ 运维界面,查看下名字为 DLX 的 Exchange,用于死信队列。如下图所示: 的 Exchange

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口,发送消息。IDEA 控制台输出日志如下:

// 第一次消费
2020-03-04 07:59:23.894 INFO 63423 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:29 消息内容:Demo01Message{id=1074078968}]
// 第二次消费,3 秒后
2020-03-04 07:59:26.899 INFO 63423 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:29 消息内容:Demo01Message{id=1074078968}]
// 第三次消费,6 秒后
2020-03-04 07:59:32.902 INFO 63423 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:29 消息内容:Demo01Message{id=1074078968}]

// 内置的 LoggingHandler 打印异常日志
2020-03-04 07:59:32.905 ERROR 63423 --- [DEMO-TOPIC-01-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking Demo01Consumer#onMessage[1 args]; nested exception is java.lang.RuntimeException: 我就是故意抛出一个异常 // ... 省略异常堆栈
Caused by: java.lang.RuntimeException: 我就是故意抛出一个异常 // ... 省略异常堆栈

我们打开 RabbitMQ 运维界面,查看下名字为 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01.dlq 的死信队列,并获取一条死信消息,可以从消息头看到具体消费失败的异常堆栈。如下图所示: 死信队列

5.4 另一种重试方案

目前我们看到的重试方案,是通过 RetryTemplate 来实现客户端级别的消费冲水。而 RetryTemplate 又是通过 sleep 来实现消费间隔的时候,这样将影响 Consumer 的整体消费速度,毕竟 sleep 会占用掉线程。

实际上,我们可以结合 RabbitMQ 的定时消息,手动将消费失败的消息发送到定时消息的队列,而延迟时间为下一次重试消费的间隔。通过这样的方式,避免使用 RetryTemplate 使用 sleep 所带来的影响。

友情提示:RocketMQ 的消息重试就是类似这样的方案,可以参考《芋道 Spring Cloud Alibaba 消息队列 RocketMQ 入门》「5. 消费重试」小节。

6. 消费异常处理机制

示例代码对应仓库:

在 Spring Cloud Stream 中,提供了通用的消费异常处理机制,可以拦截到消费者消费消息时发生的异常,进行自定义的处理逻辑。

下面,我们来搭建一个 Spring Cloud Stream 消费异常处理机制的示例。最终项目如下图所示:项目结构

6.1 搭建生产者

直接使用「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-producer-demo 项目即可。

6.2 搭建消费者

「5. 消费重试」小节的 labx-10-sc-stream-rabbitmq-consumer-retry 项目,复制出 labx-10-sc-stream-rabbitmq-consumer-error-handler 项目作为消费者。

6.2.1 Demo01Consumer

修改 Demo01Consumer 类,增加消费异常处理方法。完整代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(MySink.DEMO01_INPUT) // 对应 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
// <X> 注意,此处抛出一个 RuntimeException 异常,模拟消费失败
throw new RuntimeException("我就是故意抛出一个异常");
}

@ServiceActivator(inputChannel = "DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01.errors")
public void handleError(ErrorMessage errorMessage) {
logger.error("[handleError][payload:{}]", errorMessage.getPayload().getMessage());
logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
logger.error("[handleError][headers:{}]", errorMessage.getHeaders());
}

@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
public void globalHandleError(ErrorMessage errorMessage) {
logger.error("[globalHandleError][payload:{}]", errorMessage.getPayload().getMessage());
logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());
logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());
}

}

① 在 Spring Integration 的设定中,若 #onMessage(@Payload Demo01Message message) 方法消费消息发生异常时,会发送错误消息(ErrorMessage)到对应的错误 Channel(<destination>.<group>.errors中。同时,所有错误 Channel 都桥接到了 Spring Integration 定义的全局错误 Channel(errorChannel)

友情提示:先暂时记住 Spring Integration 这样的设定,艿艿也没去深究 T T,也是一脸懵逼。

因此,我们有两种方式来实现异常处理:

  • 局部的异常处理:通过订阅指定错误 Channel
  • 全局的异常处理:通过订阅全局错误 Channel

② 在 #handleError(ErrorMessage errorMessage) 方法上,我们声明了 @ServiceActivator 注解,订阅指定错误 Channel的错误消息,实现 #onMessage(@Payload Demo01Message message) 方法的局部异常处理。如下图所示:对应关系

③ 在 #globalHandleError(ErrorMessage errorMessage) 方法上,我们声明了 @StreamListener 注解,订阅全局错误 Channel的错误消息,实现全局异常处理。

④ 在全局局部异常处理都定义的情况下,错误消息仅会被符合条件局部错误异常处理。如果没有符合条件的,错误消息才会被全局异常处理。

6.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口,发送一条消息。IDEA 控制台输出日志如下:

// onMessage 方法,一共 3 次,包括重试
2020-03-04 09:24:12.510 INFO 80630 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:29 消息内容:Demo01Message{id=640460080}]
2020-03-04 09:24:15.515 INFO 80630 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:29 消息内容:Demo01Message{id=640460080}]
2020-03-04 09:24:21.519 INFO 80630 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:29 消息内容:Demo01Message{id=640460080}]

// handleError 方法
2020-03-04 09:24:21.520 INFO 80630 --- [DEMO-TOPIC-01-1] o.s.i.h.s.MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-03-04 09:24:21.521 ERROR 80630 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][payload:Exception thrown while invoking Demo01Consumer#onMessage[1 args]; nested exception is java.lang.RuntimeException: 我就是故意抛出一个异常]
2020-03-04 09:24:21.521 ERROR 80630 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][originalMessage:GenericMessage [payload=byte[16], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=DEMO-TOPIC-01, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01, amqp_redelivered=false, amqp_receivedRoutingKey=DEMO-TOPIC-01, amqp_timestamp=Wed Mar 04 09:24:12 CST 2020, amqp_messageId=34f29888-da05-323b-a049-8568ec6d932e, id=6439861e-bc95-4b26-7a3e-a72b1d1aac95, amqp_consumerTag=amq.ctag-ZvbNIvZe62ppHkg--KwBPA, sourceData=(Body:'{"id":640460080}' MessageProperties [headers={}, timestamp=Wed Mar 04 09:24:12 CST 2020, messageId=34f29888-da05-323b-a049-8568ec6d932e, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DEMO-TOPIC-01, receivedRoutingKey=DEMO-TOPIC-01, deliveryTag=1, consumerTag=amq.ctag-ZvbNIvZe62ppHkg--KwBPA, consumerQueue=DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01]), contentType=application/json, timestamp=1583285052462}]]
2020-03-04 09:24:21.521 ERROR 80630 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][headers:{amqp_raw_message=(Body:'{"id":640460080}' MessageProperties [headers={}, timestamp=Wed Mar 04 09:24:12 CST 2020, messageId=34f29888-da05-323b-a049-8568ec6d932e, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DEMO-TOPIC-01, receivedRoutingKey=DEMO-TOPIC-01, deliveryTag=1, consumerTag=amq.ctag-ZvbNIvZe62ppHkg--KwBPA, consumerQueue=DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01]), id=c1f146bc-aeeb-ec3a-2cfc-d21b2b6bd9b2, sourceData=(Body:'{"id":640460080}' MessageProperties [headers={}, timestamp=Wed Mar 04 09:24:12 CST 2020, messageId=34f29888-da05-323b-a049-8568ec6d932e, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DEMO-TOPIC-01, receivedRoutingKey=DEMO-TOPIC-01, deliveryTag=1, consumerTag=amq.ctag-ZvbNIvZe62ppHkg--KwBPA, consumerQueue=DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01]), timestamp=1583285061519}]

😆 不过要注意,如果异常处理方法成功,没有重新抛出异常,会认定为该消息被消费成功,所以就不会发到死信队列了噢。

7. 广播消费

示例代码对应仓库:

在上述的示例中,我们看到的都是使用集群消费。而在一些场景下,我们需要使用广播消费

广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

使用场景?

例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 RabbitMQ 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。

又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 RabbitMQ 广播消费,每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。

如何实现?

通过“在 RabbitMQ 中,如果多个 Consumer 订阅相同的 Queue ,那么每一条消息有且仅会被一个 Consumer 所消费”的特性,可以很方便的实现集群消费。但是,在实现广播消费时,这个特性恰恰成为了一种阻碍。

不过机智的我们,我们可以通过给每个 Consumer 创建一个其独有 Queue ,从而保证都能接收到全量的消息。同时,RabbitMQ 支持队列的自动删除,所以我们可以在 Consumer 关闭的时候,通过该功能删除其独有的 Queue 。

恰好,Spring Cloud Stream RabbitMQ 在设置 Consumer 的消费者分组为空时,会为该 Consumer 生成一个独有自动删除的 Queue,从而实现广播消费的功能。

下面,我们来实现一个 Consumer 广播消费的示例。最终项目如下图所示:项目结构

7.1 搭建生产者

直接使用「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-producer-demo 项目即可。

7.2 搭建消费者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-consumer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-consumer-broadcasting 项目作为消费者。

7.2.1 配置文件

修改 application.yml 配置文件,删除 Consumer 的消费者分组配置项 group 即可。完整配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
# group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
binder: rabbit001 # 设置使用的 Binder 名字

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

7.3 简单测试

① 执行 ConsumerApplication 两次,启动两个消费者的实例。

我们打开 RabbitMQ 运维界面,查看下名字为 DEMO-TOPIC-01 的 Exchange,可以每个消费者实例都有一个名字带有 anonymous 的队列。如下图所示: 的 Exchange

为什么每个消费者实例会有一个名字带有 anonymous 的队列呢?在我们没有给 Spring Cloud Stream RabbitMQ Consumer 对应的 Binding 设置消费者分组时,会自动给它生成一个以 anonymous 为前缀的消费者分组,因此后续在创建一个名字为 {destination}.{group} 的队列时,就会在名字上带有 anonymous

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。IDEA 控制台输出日志如下:

// ConsumerApplication 控制台 01
2020-03-07 15:43:35.883 INFO 46486 --- [Z-_87KO2Pl-WQ-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=2084635466}]
2020-03-07 15:43:37.278 INFO 46486 --- [Z-_87KO2Pl-WQ-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=-2118253111}]
2020-03-07 15:43:37.652 INFO 46486 --- [Z-_87KO2Pl-WQ-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=1956010289}]

// ConsumerApplication 控制台 02
2020-03-07 15:43:35.884 INFO 46527 --- [2e8iPDhSVKdcg-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=2084635466}]
2020-03-07 15:43:37.278 INFO 46527 --- [2e8iPDhSVKdcg-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=-2118253111}]
2020-03-07 15:43:37.652 INFO 46527 --- [2e8iPDhSVKdcg-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=1956010289}]

符合预期。从日志可以看出,每条消息仅被每个消费者消费了一次。

8. 并发消费

示例代码对应仓库:

在上述的示例中,我们配置的每一个 Binding 的 Consumer,都是串行消费的。显然,这在监听的 Queue 每秒消息量比较大的时候,会导致消费不及时,导致消息积压的问题。

虽然说,我们可以通过启动多个 JVM 进程,实现多进程的并发消费,从而加速消费的速度。但是问题是,否能够实现多线程的并发消费呢?答案是

通过在配置文件中的 spring.cloud.stream.bindings.<bindingName>.consumer.concurrency 配置项,可以指定该 Binder 并发消费的线程数。例如说,如果设置 concurrency=4 时,Spring Cloud Stream RabbitMQ 就会为 Binder 创建 4 个线程,进行并发消费。

下面,我们来实现一个 Consumer 并发消费的示例。最终项目如下图所示:项目结构

8.1 搭建生产者

直接使用「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-producer-demo 项目即可。

8.2 搭建消费者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-consumer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-consumer-concurrency 项目作为消费者。

8.2.1 配置文件

修改 application.yml 配置文件,增加并发消费的配置项。完整配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
binder: rabbit001 # 设置使用的 Binder 名字
# Consumer 配置项,对应 ConsumerProperties 类
consumer:
concurrency: 2 # 每个 Consumer 消费线程数的初始大小,默认为 1
# RabbitMQ 自定义 Binding 配置项,对应 RabbitBindingProperties Map
rabbit:
bindings:
demo01-input:
# RabbitMQ Consumer 配置项,对应 RabbitConsumerProperties 类
consumer:
max-concurrency: 10 # 每个 Consumer 消费线程数的最大大小,默认为 1

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

spring.cloud.stream.bindings.<bindingName>.consumer.concurrency 配置项,指定该 Binder 的 Consumer 消费线程数的初始大小,默认为 1。这里我们设置为 2,表示该 Consumer 初始使用 2 个线程并发消费。

spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.max-concurrency 配置项,指定该 Binder 的 Consumer 消费线程数的最大大小,默认为 1。这里我们设置为 10,表示该 Consumer 最大使用 10 个线程并发消费。

可能胖友对 concurrencymax-concurrency 两个配置项目有点懵逼,我们可以从一个线程池的角度去理解它们俩,前者决定了初始大小,后者决定了最大大小。通过这样的方式,实现 Consumer 的消费能力的动态调整:

  • 在消息过多时,导致 Consumer 消费不过来,可以动态扩容线程数到 max-concurrency 大小,提升消费速度。
  • 在消息较少时,导致 Consumer 消费无压力,可以动态缩容线程数到 concurrency 大小,减少系统资源的占用。

8.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

我们打开 RabbitMQ 运维界面,查看到消费的 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01 队列有 2 个消费者。如下图所示: 队列

为什么该队列会有 2 个消费者呢?因为我们设置了 concurrency 配置项为 2,所以 Spring Cloud Stream 会创建 2 个 RabbitMQ Consumer 和 2 个线程,每个 RabbitMQ Consumer 会被单独分配到一个线程中,进行拉取消息,消费消息。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口四次,发送四条消息。IDEA 控制台输出日志如下:

// 线程编号为 29
2020-03-04 20:40:35.763 INFO 4909 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:29 消息内容:Demo01Message{id=1211699615}]
2020-03-04 20:40:36.695 INFO 4909 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:29 消息内容:Demo01Message{id=-1384209259}]

// 线程编号为 70
2020-03-04 20:40:36.277 INFO 4909 --- [DEMO-TOPIC-01-3] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:70 消息内容:Demo01Message{id=1980415787}]
2020-03-04 20:40:37.135 INFO 4909 --- [DEMO-TOPIC-01-3] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:70 消息内容:Demo01Message{id=1355756196}]

我们可以看到,两个线程在消费 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01 队列下的消息。

③ Consumer 的动态扩容和缩容就暂时不演示了,胖友可以在 Demo01Consumer 的消费逻辑里增加 sleep 1 秒,然后多发点消息,从而模拟 Consumer 消费不过来。然后,观察 RabbitMQ 运维界面的 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01 队列的 Consumer 数量,是不是逐步变多了。

再之后,随着消息消费的差不多,Consumer 消费无压力了。然后,观察 RabbitMQ 运维界面的 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01 队列的 Consumer 数量,是不是逐步减少了。

9. 顺序消息

示例代码对应仓库:

我们先来一起了解下顺序消息的顺序消息的定义:

  • 普通顺序消息 :Producer 将相关联的消息发送到相同的消息队列。
  • 完全严格顺序 :在【普通顺序消息】的基础上,Consumer 严格顺序消费。

消息有序,指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
  • 分区顺序:对于指定的一个 Topic,所有消息根据 Sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。适用场景:性能要求高,以 Sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

注意,分区顺序就是普通顺序消息,全局顺序就是完全严格顺序。

📚 如何实现? 📚

那么,让我们来思考下,如果我们希望在 RabbitMQ 上,实现顺序消息需要做两个事情。

事情一,我们需要保证 RabbitMQ Producer 发送相关联的消息发送到相同的 Queue 中。例如说,我们要发送用户信息发生变更的 Message ,那么如果我们希望使用顺序消息的情况下,可以将用户编号相同的消息发送到相同的 Queue 中。

事情二,我们在有且仅启动一个 Consumer 消费该队列,保证 Consumer 严格顺序消费。

📚 存在的问题? 📚

不过如果这样做,会存在两个问题,我们逐个来看看。

问题一,正如我们在「8. 并发消费」中提到,如果我们将消息仅仅投递到一个 Queue 中,并且采用单个 Consumer 串行消费,在监听的 Queue 每秒消息量比较大的时候,会导致消费不及时,导致消息积压的问题。

此时,我们有两种方案来解决:

  • 方案一,在 Producer 端,将 Queue 拆成多个 Queue 。假设原先 Queue 是 QUEUE_USER ,那么我们就分拆成 QUEUE_USER_00QUEUE_USER_..${N-1} 这样 N 个队列,然后基于消息的用户编号取余,路由到对应的 Queue 中。
  • 方案二,在 Consumer 端,将 Queue 拉取到的消息,将相关联的消息发送到相同的线程中来消费。例如说,还是 Queue 是 QUEUE_USER 的例子,我们创建 N 个线程池大小为 1 的 ExecutorService 数组,然后基于消息的用户编号取余,提交到对应的 ExecutorService 中的单个线程来执行。

两个方案,并不冲突,可以结合使用。

问题二,如果我们启动相同 Consumer 的多个进程,会导致相同 Queue 的消息被分配到多个 Consumer 进行消费,破坏 Consumer 严格顺序消费。

此时,我们有两种方案来解决:

  • 方案一,引入 ZooKeeper 来协调,动态设置多个进程中的相同的 Consumer 的开关,保证有且仅有一个 Consumer 开启对同一个 Queue 的消费。
  • 方案二,仅适用于【问题一】的【方案一】。还是引入 ZooKeeper 来协调,动态设置多个进程中的相同的 Consumer 消费的 Queue 的分配,保证有且仅有一个 Consumer 开启对同一个 Queue 的消费。

📚 本节示例? 📚

① 对于问题一,Spring Cloud Stream 抽象了 Partitioning 分区的概念,可以将一个 Binding 的 destination 目的地拆分成多个分区的目的地,这样我们就可以 Producer 发送消息到对应的每个目的地,而每个目的地单独分配一个 Consumer 进行消费。如此,我们便实现了顺序消息的功能。

我们知道,Spring Cloud Stream RabbitMQ 的 destination 实际对应的是 RabbitMQ Queue,而 Queue 已经是 RabbitMQ 最细粒度,不存在 Queue 的说法,那么 Spring Cloud Stream RabbitMQ 要怎么实现 destination 的分区呢?答案是,通过创建多个名字为 {destination}-{index},其中 index 从 0 开始递增的编号。那么最终如下图所示:

基于 Spring Cloud Stream

聪慧的胖友,是不是已经发现,其实就是方案一,哈哈哈~

② 对于问题二,因为实现起来比较复杂,所以我们暂时通过手动设置启动的 Consumer 消费的 Queue,嘿嘿。

下面,我们来实现一个 Spring Cloud Stream RabbitMQ 下的顺序消息的示例。最终项目如下图所示:项目结构

9.1 搭建生产者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-consumer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-producer-partitioning 项目作为生产者。

9.1.1 配置文件

修改 application.yml 配置文件,添加 partition-key-expression 配置项,设置 Producer 发送顺序消息的 Sharding key。完整配置如下:

spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-03 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
binder: rabbit001 # 设置使用的 Binder 名字
# Producer 配置项,对应 ProducerProperties 类
producer:
partition-key-expression: payload['id'] # 分区 key 表达式。该表达式基于 Spring EL,从消息中获得分区 key。
partition-count: 2 # 分区大小,默认为 1 分区
# required-groups: demo01-consumer-group-DEMO-TOPIC-01 # 发送到的消费者分组,默认为空

server:
port: 18080

spring.cloud.stream.bindings.<bindingName>.producer.partition-count 配置项,设置分区大小,默认为 1 不进行分区。也就是说,设置大于 1 的情况下,才会进行分区。

spring.cloud.stream.bindings.<bindingName>.producer.partition-key-expression 配置项,该表达式基于 Spring EL,从消息中获得 Sharding key。

友情提示:Sharding Key 和 Partition Key 是等价的,有些文章喜欢叫分片键,有些文章喜欢叫分区键。

艿艿自己的习惯,是叫 Sharding Key,奈何 Spring Cloud Stream 是 Partition Key,所以下文胖友看到两个词存在混用的情况,知道是一个意思哈~

这里,我们设置该配置项为 payload['id'],表示从 Spring Message 的 payload 的 id。稍后我们发送的消息的 payload 为 Demo01Message,那么 id 就是 Demo01Message.id

如果我们想从消息的 headers 中获得 Sharding key,可以设置为 headers['partitionKey']

③ Spring Cloud Stream 使用 PartitionHandler 进行 Sharding key 的获得与计算,最终 Sharding key 的结果为 key.hashCode() % partitionCount

感兴趣的胖友,可以阅读 PartitionHandler 的 #determinePartition(Message<?> message) 方法。

我们以发送一条 id 为 1 的 Demo01Message 消息为示例,最终会发送到对应 RabbitMQ Queue 的队列为 DEMO-TOPIC-03-1。计算过程如下:

// 第一步,PartitionHandler 使用 `partition-key-expression` 表达式,从 Message 中获得 Sharding key
key => 1

// 第二步,PartitionHandler 计算最终的 Sharding key
// 默认情况下,每个 RocketMQ Topic 的队列总数是 4。
key => key.hashCode() % partitionCount = 1.hashCode() % 4 = 1 % 4 = 1

// 第三步,RabbitMQ 路由到 `DEMO-TOPIC-03-1` 队列。
// 具体的过程,稍后我们在测试的过程中,结合 RabbitMQ 控制台一起说,会更加清晰明了。

这样,我们就能保证相同 Sharding Key 的消息,发送到相同的对应 RabbitMQ Queue 队列中。当前,前提是该队列的 partition-count 总数不能变噢,不然计算的 Sharding Key 会发生变化。

9.1.2 Demo01Controller

修改 Demo01Controller 类,增加发送 3 条顺序消息的 HTTP 接口。代码如下:

@GetMapping("/send_orderly")
public boolean sendOrderly() {
// 发送 3 条相同 id 的消息
int id = new Random().nextInt();
for (int i = 0; i < 3; i++) {
// 创建 Message
Demo01Message message = new Demo01Message().setId(id);
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// 发送消息
mySource.demo01Output().send(springMessage);
}
return true;
}

每次发送的 3 条消息使用相同的 id,配合上我们使用它作为 Sharding key,就可以发送对应 Topic 的相同队列中。

另外,整列发送的虽然是顺序消息,但是和发送普通消息的代码是一模一样的。

9.2 搭建消费者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-consumer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-consumer-partitioning 项目作为消费者。

9.2.1 配置文件

修改 application.yml 配置文件,添加 partitioned 配置项,设置 Consumer 消费来自分区的消息。完整配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-03 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
binder: rabbit001 # 设置使用的 Binder 名字
# Consumer 配置项,对应 ConsumerProperties 类
consumer:
partitioned: true # 是否消费来自队列分区的消息,默认为 false
instance-index: ${CONSUMER_INSTANCE_INDEX} # 消费来自哪个分区的消息,默认为 -1

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

spring.cloud.stream.bindings.<bindingName>.consumer.partitioned 配置项,是否消费来自队列分区的消息,默认为 false。这里我们设置为 true,进行开启,从而最终实现顺序消费。

spring.cloud.stream.bindings.<bindingName>.consumer.instance-index 配置项,消费来自哪个分区的消息,默认为 -1。这里我们设置为 ${CONSUMER_INSTANCE_INDEX},稍后我们会从环境变量进行设置,这样可以实现启动的 Consumer 消费指定分区(队列)的消息,从而最终实现顺序消费。

9.3 简单测试

① 使用 IDEA,设置环境变量为 CONSUMER_INSTANCE_INDEX=0,并执行 ConsumerApplication,启动消费者的实例,消费来自分区 0 的消息。如下图所示:IDEA 设置

然后再使用 IDEA,设置环境变量为 CONSUMER_INSTANCE_INDEX=1,并执行 ConsumerApplication,启动消费者的实例,消费来自分区 1 的消息。

此时,我们打开 RabbitMQ 运维界面,查看到名字为 DEMO-TOPIC-03 的 Exchange 下有两个名字为 DEMO-TOPIC-03-{index},分别路由到对应的 DEMO-TOPIC-03.demo01-consumer-group-DEMO-TOPIC-01-{index} 队列。如下图所示: Exchange

如此,Spring Cloud Stream RabbitMQ 在获得消息的 Sharding Key 之后,通过发送的消息的 RoutingKey 为对应的 DEMO-TOPIC-03-{index},从而最终被 RabbitMQ 路由到 DEMO-TOPIC-03.demo01-consumer-group-DEMO-TOPIC-01-{index} 队列。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_orderly 接口,发送顺序消息。IDEA 控制台输出日志如下:

// 分区为 0 的 Consumer
2020-03-05 09:06:45.669 INFO 55685 --- [MO-TOPIC-01-0-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=1693382842}]
2020-03-05 09:06:45.672 INFO 55685 --- [MO-TOPIC-01-0-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=1693382842}]
2020-03-05 09:06:45.673 INFO 55685 --- [MO-TOPIC-01-0-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=1693382842}]

// 分区为 1 的 Consumer

随机发送的消息编号为 1693382842,经过计算后,Sharding Key 为 1693382842 % 2 等于 0,所以发送到 DEMO-TOPIC-03.demo01-consumer-group-DEMO-TOPIC-01-0 队列,最终被消费分区为 0 的 Consumer 所消费。

10. 消息过滤

示例代码对应仓库:

Spring Cloud Stream 提供了通用Consumer 级别的效率过滤器机制。我们只需要使用 @StreamListener 注解的 condition 属性,设置消息满足指定 Spring EL 表达式的情况下,才进行消费。

下面,我们来实现一个 Spring Cloud Stream RabbitMQ 下的消息过滤的示例。最终项目如下图所示:项目结构

10.1 搭建生产者

直接使用「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-producer-demo 项目即可。

10.1.1 Demo01Controller

修改 Demo01Controller 类,增加发送 3 条tag 消息头的消息的 HTTP 接口。代码如下:

@GetMapping("/send_tag")
public boolean sendTag() {
for (String tag : new String[]{"yunai", "yutou", "tudou"}) {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader("tag", tag) // <X> 设置 Tag
.build();
// 发送消息
mySource.demo01Output().send(springMessage);
}
return true;
}

<X> 处,设置发送消息的 tag 消息头。

10.2 搭建消费者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-consumer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-consumer-filter 项目作为消费者。

10.2.1 Demo01Consumer

修改 Demo01Consumer 类,使用 @StreamListener 注解的 condition 属性来过滤消息。代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(value = MySink.DEMO01_INPUT, condition = "headers['tag'] == 'yunai'")
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}

}

这里我们设置消息的 Header 带有的 tag 值为 yunai 时,才进行消费。

10.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_tag 接口,发送带有 Tag 的消息。IDEA 控制台输出日志如下:

// 消息头 tag 为 `yunai` 的消息被消费
2020-03-07 09:52:45.654 INFO 67291 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-1461057474}]

// 消息头 tag 为 `yutou` 和 `tudou` 的消息被过滤
2020-03-07 09:52:45.656 WARN 67291 --- [DEMO-TOPIC-01-1] .DispatchingStreamListenerMessageHandler : Cannot find a @StreamListener matching for message with id: d8cd778b-868f-cd85-8048-1fce8bc6381e
2020-03-07 09:52:45.657 WARN 67291 --- [DEMO-TOPIC-01-1] .DispatchingStreamListenerMessageHandler : Cannot find a @StreamListener matching for message with id: cf3a35f8-5d91-317f-7a5b-d82a197b8f04

只消费了一条消息头为 yunai 的消息,而消息头为 yutoutudou 的消息被 Consumer 过滤。要注意,被过滤掉的消息,后续是无法被消费掉了,效果和消费成功是一样的。

11. 事务消息

示例代码对应仓库:

RabbitMQ 内置提供事务消息的支持。对事务消息的概念不了解的胖友,可以看看 《RabbitMQ 之消息确认机制(事务 + Confirm)》 文章的「事务机制」小节。

不过 RabbitMQ 提供的并不是完整的的事务消息的支持,缺少了回查机制。目前,常用的分布式消息队列,只有 RocketMQ 提供了完整的事务消息的支持,具体的可以看看《芋道 Spring Boot 消息队列 RocketMQ 入门》「9. 事务消息」小节,😈 暂时不拓展开来讲。

下面,我们来实现一个 Spring Cloud Stream RabbitMQ 下的事务消息的示例。最终项目如下图所示:项目结构

11.1 搭建生产者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-consumer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-producer-transaction 项目作为生产者。

11.1.1 配置文件

修改 application.yml 配置文件,添加 spring.cloud.stream.rabbit.<bindingName>.producer.transacted 配置项为 true,开启发送事务消息的功能。完整配置如下:

spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
binder: rabbit001 # 设置使用的 Binder 名字
# RabbitMQ 自定义 Binding 配置项,对应 RabbitBindingProperties Map
rabbit:
bindings:
demo01-output:
# RabbitMQ Producer 配置项,对应 RabbitProducerProperties 类
producer:
transacted: true # 是否开启事务功能,默认为 false

server:
port: 18080

11.1.2 TransactionConfig

创建 TransactionConfig 类,创建 RabbitTransactionManager Bean,RabbitMQ 的事务管理器,集成到 Spring 的事务体系中,这样就可以使用 @Transactional 声明式事务。代码如下:

@Configuration
@EnableTransactionManagement
public class TransactionConfig {

@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}

}

11.1.3 Demo01ControllerDemo01Controller

修改 Demo01Controller 类,增加发送事务消息的 HTTP 接口。代码如下:

// Demo01Controller .java

@Transactional
@GetMapping("/send_transaction")
public void sendTransaction() throws InterruptedException {
// 创建 Message
int id = new Random().nextInt();
Demo01Message message = new Demo01Message()
.setId(id);
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// 发送消息
mySource.demo01Output().send(springMessage);
logger.info("[syncSend][发送编号:[{}] 发送成功]", id);

// <X> 等待
Thread.sleep(10 * 1000L);
}

在发送消息方法上,我们添加了 @Transactional 注解,声明事务。因为我们创建了 RabbitTransactionManager 事务管理器,所以这里会创建 RabbitMQ 事务。

<X> 处,我们故意等待 Thread#sleep(long millis) 10 秒,判断 RabbitMQ 事务是否生效。

  • 如果同步发送消息成功后,Consumer 立即消费到该消息,说明未生效。
  • 如果 Consumer 是 10 秒之后,才消费到该消息,说明已生效。

11.2 搭建消费者

直接使用「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-consumer-demo 项目即可。

11.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_transaction 接口,发送事务消息。IDEA 控制台输出日志如下:

// Producer 成功同步发送了 1 条消息。此时,事务并未提交
2020-03-05 14:10:08.428 INFO 13345 --- [io-18080-exec-1] c.i.s.l.r.p.controller.Demo01Controller : [syncSend][发送编号:[1188407341] 发送成功]

// 10 秒后,Producer 提交事务。
// 此时,Consumer 消费到该消息。
2020-03-05 14:10:18.432 INFO 9946 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=1188407341}]

Consumer 在事务消息提交后,消费到该消息。符合预期~

12. 消费者的消息确认

示例代码对应仓库:

在 RabbitMQ 中,Consumer 有两种消息确认的方式:

  • 方式一,自动确认。
  • 方式二,手动确认。

对于自动确认的方式,RabbitMQ Broker 只要将消息写入到 TCP Socket 中成功,就认为该消息投递成功,而无需 Consumer 手动确认

对于手动确认的方式,RabbitMQ Broker 将消息发送给 Consumer 之后,由 Consumer 手动确认之后,才任务消息投递成功。

实际场景下,因为自动确认存在可能丢失消息的情况,所以在对可靠性有要求的场景下,我们基本采用手动确认。当然,如果允许消息有一定的丢失,对性能有更高的产经下,我们可以考虑采用自动确认。

😈 更多关于消费者的消息确认的内容,胖友可以阅读如下的文章:

在 Spring-AMQP 中,在 AcknowledgeMode 中,定义了三种消息确认的方式:

// AcknowledgeMode.java

/**
* No acks - {@code autoAck=true} in {@code Channel.basicConsume()}.
*/
NONE, // 对应 Consumer 的自动确认

/**
* Manual acks - user must ack/nack via a channel aware listener.
*/
MANUAL, // 对应 Consumer 的手动确认,由开发者在消费逻辑中,手动进行确认。

/**
* Auto - the container will issue the ack/nack based on whether
* the listener returns normally, or throws an exception.
* <p><em>Do not confuse with RabbitMQ {@code autoAck} which is
* represented by {@link #NONE} here</em>.
*/
AUTO; // 对应 Consumer 的手动确认,在消费消息完成(包括正常返回、和抛出异常)后,由 Spring-AMQP 框架来“自动”进行确认。

  • 实际上,就是将手动确认进一步细分,提供了由 Spring-AMQP 提供 Consumer 级别的自动确认。
  • 在上述的示例中,我们都采用了 Spring-AMQP 默认的 AUTO 模式

下面,我们来实现一个 Spring Cloud Stream RabbitMQ 下的 Consumer 使用 MANUAL 模式,手动确认的示例。最终项目如下图所示:项目结构

12.1 搭建生产者

直接使用「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-producer-demo 项目即可。

12.2 搭建消费者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-consumer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-consumer-ack 项目作为消费者。

12.2.1 配置文件

修改 application.yml 配置文件,添加 spring.cloud.stream.rabbit.<bindingName>.consumer.acknowledge-mode 配置项为 MANUAL,设置 Consumer 使用 MANUAL 模式,手动确认消息被消费完成。完整配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
binder: rabbit001 # 设置使用的 Binder 名字
# RabbitMQ 自定义 Binding 配置项,对应 RabbitBindingProperties Map
rabbit:
bindings:
demo01-input:
# RabbitMQ Consumer 配置项,对应 RabbitConsumerProperties 类
consumer:
acknowledge-mode: MANUAL # 消费消息的确认模式,默认为 AUTO 自动确认

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

12.2.2 Demo01Consumer

修改 Demo01Consumer 类,增加手动确认消息被消费完成的代码。代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

private AtomicInteger index = new AtomicInteger();

@StreamListener(MySink.DEMO01_INPUT)
public void onMessage(@Payload Demo01Message message,
@Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
// 提交消费进度
if (index.incrementAndGet() == 1) {
// ack 确认消息
// 第二个参数 multiple ,用于批量确认消息,为了减少网络流量,手动确认可以被批处。
// 1. 当 multiple 为 true 时,则可以一次性确认 deliveryTag 小于等于传入值的所有消息
// 2. 当 multiple 为 false 时,则只确认当前 deliveryTag 对应的消息
channel.basicAck(deliveryTag, false);
}
}

}

① 在消费方法上,我们增加类型为 Channel 的方法参数,和 deliveryTag 。通过调用其 Channel#basicAck(deliveryTag, multiple) 方法,可以进行消息的确认。这里,艿艿添加了比较详细的注释说明,胖友可以自己瞅瞅噢。

② 在消费逻辑中,我们故意只提交消费的第一条消息。😈 这样,我们只需要发送两条消息,如果第二条的消费进度没有被提交,就可以说明手动提交消费进度成功。

12.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口,发送两条消息。IDEA 控制台输出日志如下:

// Consumer 消费 2 条消息成功
2020-03-06 08:01:31.843 INFO 90049 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-223429437}]
2020-03-06 08:01:32.205 INFO 90049 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-934118987}]

我们打开 RabbitMQ 运维界面,查看下 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01 队列的消息情况,会看到一条未确认的消息。如下图所示: 队列

13. 生产者的发送确认

示例代码对应仓库:

在开始本小节之前,胖友先看看《芋道 Spring Boot 消息队列 RabbitMQ 入门》文章的「13. 消费者的消息确认」小节,因为 Spring Cloud Stream RabbitMQ 对 Producer 的发送消息的确认,又进行了二次通用封装,解释起来会比较复杂。

下面,我们来实现一个 Spring Cloud Stream RabbitMQ 下的 Producer 的发送消息确认的示例。最终项目如下图所示:项目结构

13.1 搭建生产者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-producer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-producer-confirm 项目作为生产者。

13.1.1 配置文件

修改 application.yml 配置文件,增加 Producer 的发送消息确认相关配置。完整配置如下:

spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
publisher-returns: true # 设置消息是否回退,默认为 false
publisher-confirm-type: simple # 设置开启消息确认模型,默认为 null 不进行确认
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
binder: rabbit001 # 设置使用的 Binder 名字
producer:
error-channel-enabled: true # 是否开启异常 Channel,默认为 false 关闭
# RabbitMQ 自定义 Binding 配置项,对应 RabbitBindingProperties Map
rabbit:
bindings:
demo01-output:
# RabbitMQ Producer 配置项,对应 RabbitProducerProperties 类
producer:
confirm-ack-channel: demo01-producer-confirm # 设置发送确认的 Channel,默认为 null

server:
port: 18080

① 设置 spring.cloud.stream.binders.<binderName>.environment.spring.publisher-returns 配置项为 true,开启消息回退的功能。即,当 Producer 成功发送消息到 RabbitMQ Broker 时,但是在通过 Exchange 进行匹配不到 Queue 时,Broker 会将该消息回退给 Producer 。

设置 spring.cloud.stream.bindings.<dingingName>.producer.error-channel-enabled 配置项为 true,开启 Producer 的 异常 Channel。因为,当消息发送被退回时,Spring Cloud Stream RabbitMQ 是发送一个本地异常消息到相应的异常 Channel。

② 设置 spring.cloud.stream.binders.<binderName>.environment.spring.publisher-confirm-type 配置项为 simple,开启消息确认模式为 simple。即,使用同步的 Confirm 模式。即,当 Producer 发送消息到 RabbitMQ Broker 超过指定时间未成功时,会抛出 AmqpException 异常。

友情提示:目前 simple 模式会有问题,即使发送失败,也不会抛出 AmqpException 异常。原因是,RabbitMessageChannelBinder#createProducerMessageHandler(...) 方法,在创建 AmqpOutboundEndpoint 时,并未判断使用 simple 确认模式,导致并未调用 AmqpOutboundEndpoint 的 #setWaitForConfirm(boolean waitForConfirm) 方法,设置同步等待消息的确认结果。

核心代码,如下图所示:发送消息

设置 spring.cloud.stream.rabbit.bindings.<bindingName>.producer.confirm-ack-channel 配置项为 demo01-producer-confirm,设置发送确认的 Channel,默认为 null。这样,我们可以通过订阅 demo01-producer-confirm 这个 Channel,实现自定义逻辑。

13.1.2 Demo01ProducerConfirmCallback

创建 Demo01ProducerConfirmCallback 类,监听 demo01-producer-confirm 这个 Channel,实现消息确认的监听。代码如下:

@Component
public class Demo01ProducerConfirmCallback {

private Logger logger = LoggerFactory.getLogger(getClass());

@ServiceActivator(inputChannel = "demo01-producer-confirm")
public void onPublisherConfirm(Message message) {
logger.info("[onPublisherConfirm][headers:{}]", message.getHeaders());
logger.info("[onPublisherConfirm][payload:{}]", message.getPayload());
}

}

#onPublisherConfirm(Message message) 方法上,我们添加了 @ServiceActivator 注解,声明监听来自 demo01-producer-confirm Channel 的消息,从而实现对 demo01-output 发送消息的结果的确认。 关联

不过实际上,如果胖友是使用的消息确认模式为 simple 类型,不写这个逻辑也是没问题的,因为发送失败会同步抛出 AmqpException 异常。

13.1.3 Demo01ProducerReturnCallback

创建 Demo01ProducerReturnCallback 类,监听 DEMO-TOPIC-01.errors 这个 Channel,实现消息回退的监听。代码如下:

@Component
public class Demo01ProducerReturnCallback {

private Logger logger = LoggerFactory.getLogger(getClass());

@ServiceActivator(inputChannel = "DEMO-TOPIC-01.errors")
public void handleError(ErrorMessage errorMessage) {
logger.error("[handleError][headers:{}]", errorMessage.getHeaders());
logger.error("[handleError][payload:{}]", errorMessage.getPayload().getMessage());
logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
}

@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
public void globalHandleError(ErrorMessage errorMessage) {
logger.error("[globalHandleError][payload:{}]", errorMessage.getPayload().getMessage());
logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());
logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());
}

}

「6. 消费异常处理机制」小节是类似的,只是它是针对 Consumer 消费消息异常的情况,而这里是针对 Producer 发送消息异常的情况。

因为 Spring Cloud Stream RabbitMQ 将消息回退作为一种异常处理,所以会发送本地异常消息到相应的异常 Channel,所以这里我们可以通过监听它,处理消息回退的情况。

至于为什么 Channel 是 DEMO-TOPIC-01.errors 呢?原因如下图所示: 关联

13.2 搭建消费者

直接使用「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-consumer-demo 项目即可。

13.3 简单测试

🌲 先来测试一波 Producer 的发送消息确认

① 执行 ConsumerApplication,启动消费者的实例,用于自己创建相关的 RabbitMQ Exchange 和 Queue,并进行绑定。之后,我们在 RabbitMQ 运维界面看到它们如下图:RabbitMQ 运维界面

DEBUG 执行 ProducerApplication,启动生产者的实例。之后,在 Demo01ProducerConfirmCallback 的 #onPublisherConfirm(...) 方法打断点,方便稍后查看 demo01-producer-confirm Channel 监听到的消息。

然后,请求 http://127.0.0.1:18080/demo01/send 接口,发送消息。此时 IDEA 进入该打的断点,如下图所示:IDEA 断点

发送消息确认测试成功~

🌲 再来测试一波 Producer 的发送消息退回

① 使用 RabbitMQ 运维界面,取消 RabbitMQ Exchange 和 Queue 的绑定,因为要测试 Producer 的发送消息退回的条件是,发送的消息匹配不到 Queue。如下图所示:RabbitMQ 运维界面

② 在 Demo01ProducerReturnCallback 的 handleError(... ) 方法打断点,方便稍后查看 DEMO-TOPIC-01.errors Channel 监听到的消息。

然后,请求 http://127.0.0.1:18080/demo01/send 接口,发送消息。此时 IDEA 进入该打的断点,如下图所示:IDEA 断点

发送消息退回测试成功~

旁白君:为了编写本小节的示例,断断续续倒腾了大半天,发现无论是官方文档,还是网上的博客或是资料,都几乎没有!自己一阵乱倒腾,也没有解决,最后运气看到了 https://github.com/jeffrey-garcia/demo-spring-cloud-stream.git 项目,感恩~

14. 批量发送消息

示例代码对应仓库:

友情提示:Spring Cloud Stream RabbitMQ 是基于 Spring-AMQP 操作 RabbitMQ 的。为了表述清晰准确,艿艿可能会直接使用 Spring-AMQP 哈~

在一些业务场景下,我们希望使用 Producer 批量发送消息,提高发送性能。

不同于我们在《芋道 Spring Boot 消息队列 RocketMQ 入门》「4. 批量发送消息」 功能,RocketMQ 是提供了一个可以批量发送多条消息的 API 。而 Spring-AMQP 提供的批量发送消息,它提供了一个 MessageBatch 消息收集器,将发送给相同 Exchange + RoutingKey 的消息们,“偷偷”收集在一起,当满足条件时候,一次性批量发送提交给 RabbitMQ Broker 。

Spring-AMQP 通过 BatchingRabbitTemplate 提供批量发送消息的功能。如下是三个条件,满足任一即会批量发送:

  • 【数量】batchSize :超过收集的消息数量的最大条数。
  • 【空间】bufferLimit :超过收集的消息占用的最大内存。
  • 【时间】timeout :超过收集的时间的最大等待时长,单位:毫秒。😈 不过要注意,这里的超时开始计时的时间,是以最后一次发送时间为起点。也就说,每调用一次发送消息,都以当前时刻开始计时,重新到达 timeout 毫秒才算超时。

另外,BatchingRabbitTemplate 提供的批量发送消息的能力比较弱。对于同一个 BatchingRabbitTemplate 对象来说,同一时刻只能有一个批次(保证 Exchange + RoutingKey 相同),否则会报错。

下面,我们来实现一个 Spring Cloud Stream RabbitMQ 下的 Producer 批量发送消息的示例。最终项目如下图所示:项目结构

14.1 搭建生产者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-producer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-producer-batch 项目作为生产者。

14.1.1 配置文件

修改 application.yml 配置文件,增加 Producer 批量发送消息相关配置。完整配置如下:

spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
binder: rabbit001 # 设置使用的 Binder 名字
# RabbitMQ 自定义 Binding 配置项,对应 RabbitBindingProperties Map
rabbit:
bindings:
demo01-output:
# RabbitMQ Producer 配置项,对应 RabbitProducerProperties 类
producer:
batching-enabled: true # 是否开启批量发送功能,默认为 false
batch-size: 100 # 超过收集的消息数量的最大条数,默认为 100
batch-buffer-limit: 10000 # 每次批量发送消息的最大内存,默认为 10000
batch-timeout: 30000 # 超过收集的时间的最大等待时长,单位:毫秒,默认为 5000

server:
port: 18080

spring.cloud.stream.rabbit.bindings.<bindingName>.producer 配置项下,我们设置了 batching-enabledbatch-sizebatch-buffer-limitbatch-timeout 配置项。具体它们的数值配置多少,根据自己的应用来。

这里,我们故意将 timeout 配置成了 30 秒,主要为了演示之用。

14.1.2 Demo01Controller

修改 Demo01Controller 类,增加间隔 10 秒发送 3 条消息的 HTTP 接口,用于测试 Producer 批量发送消息。代码如下:

// Demo01Controller.java

@Autowired
private MySource mySource;

@GetMapping("/send_batch")
public boolean sendBatch() throws InterruptedException {
// 发送 3 条消息,每条中间间隔 10 秒
for (int i = 0; i < 3; i++) {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// 发送消息
mySource.demo01Output().send(springMessage);

// 故意每条消息之间,隔离 10 秒
logger.info("[sendBatch][发送编号:[{}] 发送成功]", message.getId());
Thread.sleep(10 * 1000L);
}
return true;
}

具体发送消息的代码,和我们在「3. 快速入门」小节基本是一模一样的。差别在于底层从使用 RabbitTemplate 变成 BatchingRabbitTemplate 来发送批量消息。😈 对的,这也是为什么艿艿在上文说到,Spring-AMQP 是“偷偷”收集来实现批量发送,对于我们使用发送消息的方法,还是一致的。

实现原理:BatchingRabbitTemplate 通过重写 RabbitTemplate 的 #send(String exchange, String routingKey, Message message, CorrelationData correlationData) 核心方法,实现批量发送的功能。感兴趣的胖友,可以自己去研究下源码,不复杂哈~

14.2 搭建消费者

直接使用「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-consumer-demo 项目即可。

14.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_batch 接口,批量发送消息。IDEA 控制台输出日志如下:

// Producer 成功同步发送了 3 条消息,每条间隔 10 秒。
2020-03-07 09:41:55.543 INFO 69813 --- [io-18080-exec-1] c.i.s.l.r.p.controller.Demo01Controller : [sendBatch][发送编号:[-119797922] 发送成功]
2020-03-07 09:42:05.545 INFO 69813 --- [io-18080-exec-1] c.i.s.l.r.p.controller.Demo01Controller : [sendBatch][发送编号:[2114488618] 发送成功]
2020-03-07 09:42:15.550 INFO 69813 --- [io-18080-exec-1] c.i.s.l.r.p.controller.Demo01Controller : [sendBatch][发送编号:[2015833350] 发送成功]

// Demo05Consumer 在最后一条消息发送成功后果的 30 秒,消费到这 3 条消息。
2020-03-07 09:42:45.569 INFO 60022 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=-119797922}]
2020-03-07 09:42:45.570 INFO 60022 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=2114488618}]
2020-03-07 09:42:45.571 INFO 60022 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=2015833350}]

  • 因为使用 BatchingRabbitTemplate 批量发送消息,所以在 Producer 成功发送完第一条消息后,Consumer 并未消费到这条消息。
  • 又因为 BatchingRabbitTemplate 是按照每次发送后,都重新计时,所以在最后一条消息成功发送后的 30 秒,Consumer 才消费到批量发送的 3 条消息。

15. 批量消费消息

示例代码对应仓库:

「14. 批量发送消息」小节,我们已经实现批量发送消息到 RabbitMQ Broker 中。那么,我们来思考一个问题,这批消息在 RabbitMQ Broker 到底是存储一条消息,还是多条消息?

  • 如果胖友使用过 Kafka、RocketMQ 这两个消息队列,那么判断肯定会是多条消息。
  • 「14.3 简单测试」中,我们可以看到逐条消息的消费的日志,也会认为是多条消息。

😭 实际上,RabbitMQ Broker 存储的是一条消息。又或者说,RabbitMQ 并没有提供批量接收消息的 API 接口

那么,为什么我们在「14. 批量发送消息」能够实现呢?答案是批量发送消息是 Spring-AMQP 的 SimpleBatchingStrategy 所封装提供:

这个操作,是不是略微有点骚气?!艿艿在这里卡了很久!!!莫名其妙的~一直以为,RabbitMQ 提供了批量发送消息的 API 接口啊。

OK ,虽然很悲伤,但是我们还是回到这个小节的主题。

在一些业务场景下,我们希望使用 Consumer 批量消费消息,提高消费速度。在 Spring-AMQP 中,提供了两种批量消费消息的方式。本小节,我们先来看第一种,它需要基于「14. 批量发送消息」之上实现。

下面,我们来实现一个 Spring Cloud Stream RabbitMQ 下的 Consumer 批量消费消息的示例。最终项目如下图所示:项目结构

15.1 搭建生产者

必须必须必须使用「14. 批量发送消息」小节的 labx-10-sc-stream-rabbitmq-producer-batch 项目。

15.2 搭建消费者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-consumer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-consumer-batch 项目作为消费者。

15.2.1 配置文件

修改 application.yml 配置文件,设置 spring.cloud.stream.bindings.<bindingName>.consumer.batch-modetrue,开启 Consumer 批量消费模式。完整配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
binders:
rabbit001:
type: rabbit # 设置 Binder 的类型
environment: # 设置 Binder 的环境配置
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RabbitMQ Exchange
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
binder: rabbit001 # 设置使用的 Binder 名字
# Consumer 配置项,对应 ConsumerProperties 类
consumer:
batch-mode: true # 是否批量消费默认,默认为 false

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

15.2.2 Demo01Consumer

修改 Demo01Consumer 类,将消费消息的方法的参数改为 List<?>,从而批量消费消息。代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(MySink.DEMO01_INPUT)
public void onMessage(@Payload List<Demo01Message> message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}

}

15.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 <http://127.0.0.1:18080/demo01/send_batch> 接口,批量发送消息。IDEA 控制台输出日志如下:

// Producer 成功同步发送了 3 条消息,每条间隔 10 秒。
2020-03-07 12:31:38.665 INFO 3312 --- [io-18080-exec-2] c.i.s.l.r.p.controller.Demo01Controller : [sendBatch][发送编号:[1485308230] 发送成功]
2020-03-07 12:31:48.672 INFO 3312 --- [io-18080-exec-2] c.i.s.l.r.p.controller.Demo01Controller : [sendBatch][发送编号:[63331063] 发送成功]
2020-03-07 12:31:58.677 INFO 3312 --- [io-18080-exec-2] c.i.s.l.r.p.controller.Demo01Controller : [sendBatch][发送编号:[-71411575] 发送成功]

// Consumer 在最后一条消息发送成功后果的 30 秒,一次性批量消费了这 3 条消息。
2020-03-07 12:32:28.698 INFO 3246 --- [DEMO-TOPIC-01-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:[[B@229b5068, [B@7e3cdd55, [B@24399954]]

符合预期,Consumer 批量消费了 3 条消息。

不过要注意,通过这种方式批量消费消息,一次 Consumer 消费多少条消息,是由 Producer 一次批量发送多少条消息所决定,因为本质上 Consumer 只收到一条组装的批量消息。

15.4 另一种批量消费方式

Spring-AMQP 提供了另一种批量消费方式:通过 Consumer 阻塞等待最多 receiveTimeout 秒,拉取 batchSize 条消息,进行批量消费。不过艿艿翻查和自己尝试,发现在 Spring Cloud Stream RabbitMQ 貌似没有将 Spring-AMQP 的这种方式进行封装并提供。

因此,我们暂时就无法在 Spring Cloud Stream RabbitMQ 搭建这种的示例。感兴趣的胖友,可以暂时去看看《芋道 Spring Boot 消息队列 RabbitMQ 入门》文章的「6. 批量消费消息(第二弹)」小节。

16. 监控端点

示例代码对应仓库:

Spring Cloud Stream 的 endpoint 模块,基于 Spring Boot Actuator,提供了自定义监控端点 bindingschannels,用于获取 Spring Cloud Stream 的 Binding 和 Channel 信息。

同时,Spring Boot 针对 RabbitMQ 拓展了 Spring Boot Actuator 内置的 health 端点,通过自定义的 RabbitHealthIndicator,获取 RabbitMQ 客户端的健康状态。

友情提示:对 Spring Boot Actuator 不了解的胖友,可以后续阅读《芋道 Spring Boot 监控端点 Actuator 入门》文章。

我们来搭建一个 Spring Cloud Stream RocketMQ 监控端点的使用示例。最终项目如下图所示:项目结构

16.1 搭建生产者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-producer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-producer-actuator 项目作为生产者。

16.1.1 引入依赖

pom.xml 文件中,额外引入 Spring Boot Actuator 相关依赖。代码如下:

<!-- 实现对 Actuator 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

16.1.2 配置文件

修改 application.yaml 配置文件,额外增加 Spring Boot Actuator 配置项。配置如下:

management:
endpoints:
web:
exposure:
include: '*' # 需要开放的端点。默认值只打开 health 和 info 两个端点。通过设置 * ,可以开放所有端点。
endpoint:
# Health 端点配置项,对应 HealthProperties 配置类
health:
enabled: true # 是否开启。默认为 true 开启。
show-details: ALWAYS # 何时显示完整的健康信息。默认为 NEVER 都不展示。可选 WHEN_AUTHORIZED 当经过授权的用户;可选 ALWAYS 总是展示。

每个配置项的作用,胖友看下艿艿添加的注释。如果还不理解的话,后续看下《芋道 Spring Boot 监控端点 Actuator 入门》文章。

16.1.3 简单测试

① 使用 ProducerApplication 启动生产者。

② 访问应用的 bindings 监控端点 http://127.0.0.1:18080/actuator/bindings,返回结果如下图: 监控端点

③ 访问应用的 channels 监控端点 http://127.0.0.1:18080/actuator/channels,返回结果如下图: 监控端点

④ 访问应用的 health 监控端点 http://127.0.0.1:18080/actuator/health,返回结果如下图: 监控端点

16.2 搭建消费者

「3. 快速入门」小节的 labx-10-sc-stream-rabbitmq-consumer-demo 项目,复制出 labx-10-sc-stream-rabbitmq-consumer-actuator 项目作为消费者。

16.2.1 引入依赖

pom.xml 文件中,额外引入 Spring Boot Actuator 相关依赖。代码如下:

<!-- 实现对 Actuator 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

16.2.2 配置文件

修改 application.yaml 配置文件,额外增加 Spring Boot Actuator 配置项。配置如下:

management:
endpoints:
web:
exposure:
include: '*' # 需要开放的端点。默认值只打开 health 和 info 两个端点。通过设置 * ,可以开放所有端点。
endpoint:
# Health 端点配置项,对应 HealthProperties 配置类
health:
enabled: true # 是否开启。默认为 true 开启。
show-details: ALWAYS # 何时显示完整的健康信息。默认为 NEVER 都不展示。可选 WHEN_AUTHORIZED 当经过授权的用户;可选 ALWAYS 总是展示。

每个配置项的作用,胖友看下艿艿添加的注释。如果还不理解的话,后续看下《芋道 Spring Boot 监控端点 Actuator 入门》文章。

16.2.3 简单测试

① 使用 ConsumerApplication 启动消费者,随机端口为 13195。

② 访问应用的 bindings 监控端点 http://127.0.0.1:13195/actuator/bindings,返回结果如下图: 监控端点

③ 访问应用的 channels 监控端点 http://127.0.0.1:13195/actuator/channels,返回结果如下图: 监控端点

④ 访问应用的 health 监控端点 http://127.0.0.1:13195/actuator/health,返回结果如下图: 监控端点

666. 彩蛋

至此,我们已经完成 Spring Cloud Stream RabbitMQ 的学习。如下是 RabbitMQ 相关的官方文档:

另外,想要在 Spring Boot 项目中使用 RabbitMQ 作为消息队列的胖友,可以阅读《芋道 Spring Boot 消息队列 RabbitMQ 入门》文章。

文章目录
  1. 1. 1. 概述
  2. 2. 2. Spring Cloud Stream 介绍
  3. 3. 3. 快速入门
    1. 3.1. 3.1 搭建生产者
      1. 3.1.1. 3.1.1 引入依赖
      2. 3.1.2. 3.1.2 配置文件
      3. 3.1.3. 3.1.3 MySource
      4. 3.1.4. 3.1.4 Demo01Message
      5. 3.1.5. 3.1.5 Demo01Controller
      6. 3.1.6. 3.1.6 ProducerApplication
    2. 3.2. 3.2 搭建消费者
      1. 3.2.1. 3.2.1 引入依赖
      2. 3.2.2. 3.2.2 配置文件
      3. 3.2.3. 3.2.3 MySink
      4. 3.2.4. 3.2.4 Demo01Message
      5. 3.2.5. 3.2.5 Demo01Consumer
      6. 3.2.6. 3.2.6 ConsumerApplication
    3. 3.3. 3.3 测试单集群多实例的场景
    4. 3.4. 3.4 测试多集群多实例的场景
    5. 3.5. 3.5 小结
  4. 4. 4. 定时消息
    1. 4.1. 4.1 安装插件
    2. 4.2. 4.2 搭建生产者
      1. 4.2.1. 4.2.1 配置文件
      2. 4.2.2. 4.2.2 Demo01Controller
    3. 4.3. 4.3 搭建消费者
      1. 4.3.1. 4.3.1 配置文件
    4. 4.4. 4.4 简单测试
    5. 4.5. 4.5 另外一种方案
  5. 5. 5. 消费重试
    1. 5.1. 5.1 搭建生产者
    2. 5.2. 5.2 搭建消费者
      1. 5.2.1. 5.2.1 配置文件
      2. 5.2.2. 5.2.2 Demo01Consumer
    3. 5.3. 5.3 简单测试
    4. 5.4. 5.4 另一种重试方案
  6. 6. 6. 消费异常处理机制
    1. 6.1. 6.1 搭建生产者
    2. 6.2. 6.2 搭建消费者
      1. 6.2.1. 6.2.1 Demo01Consumer
    3. 6.3. 6.3 简单测试
  7. 7. 7. 广播消费
    1. 7.1. 7.1 搭建生产者
    2. 7.2. 7.2 搭建消费者
      1. 7.2.1. 7.2.1 配置文件
    3. 7.3. 7.3 简单测试
  8. 8. 8. 并发消费
    1. 8.1. 8.1 搭建生产者
    2. 8.2. 8.2 搭建消费者
      1. 8.2.1. 8.2.1 配置文件
    3. 8.3. 8.3 简单测试
  9. 9. 9. 顺序消息
    1. 9.1. 9.1 搭建生产者
      1. 9.1.1. 9.1.1 配置文件
      2. 9.1.2. 9.1.2 Demo01Controller
    2. 9.2. 9.2 搭建消费者
      1. 9.2.1. 9.2.1 配置文件
    3. 9.3. 9.3 简单测试
  10. 10. 10. 消息过滤
    1. 10.1. 10.1 搭建生产者
      1. 10.1.1. 10.1.1 Demo01Controller
    2. 10.2. 10.2 搭建消费者
      1. 10.2.1. 10.2.1 Demo01Consumer
    3. 10.3. 10.3 简单测试
  11. 11. 11. 事务消息
    1. 11.1. 11.1 搭建生产者
      1. 11.1.1. 11.1.1 配置文件
      2. 11.1.2. 11.1.2 TransactionConfig
      3. 11.1.3. 11.1.3 Demo01ControllerDemo01Controller
    2. 11.2. 11.2 搭建消费者
    3. 11.3. 11.3 简单测试
  12. 12. 12. 消费者的消息确认
    1. 12.1. 12.1 搭建生产者
    2. 12.2. 12.2 搭建消费者
      1. 12.2.1. 12.2.1 配置文件
      2. 12.2.2. 12.2.2 Demo01Consumer
    3. 12.3. 12.3 简单测试
  13. 13. 13. 生产者的发送确认
    1. 13.1. 13.1 搭建生产者
      1. 13.1.1. 13.1.1 配置文件
      2. 13.1.2. 13.1.2 Demo01ProducerConfirmCallback
      3. 13.1.3. 13.1.3 Demo01ProducerReturnCallback
    2. 13.2. 13.2 搭建消费者
    3. 13.3. 13.3 简单测试
  14. 14. 14. 批量发送消息
    1. 14.1. 14.1 搭建生产者
      1. 14.1.1. 14.1.1 配置文件
      2. 14.1.2. 14.1.2 Demo01Controller
    2. 14.2. 14.2 搭建消费者
    3. 14.3. 14.3 简单测试
  15. 15. 15. 批量消费消息
    1. 15.1. 15.1 搭建生产者
    2. 15.2. 15.2 搭建消费者
      1. 15.2.1. 15.2.1 配置文件
      2. 15.2.2. 15.2.2 Demo01Consumer
    3. 15.3. 15.3 简单测试
    4. 15.4. 15.4 另一种批量消费方式
  16. 16. 16. 监控端点
    1. 16.1. 16.1 搭建生产者
      1. 16.1.1. 16.1.1 引入依赖
      2. 16.1.2. 16.1.2 配置文件
      3. 16.1.3. 16.1.3 简单测试
    2. 16.2. 16.2 搭建消费者
      1. 16.2.1. 16.2.1 引入依赖
      2. 16.2.2. 16.2.2 配置文件
      3. 16.2.3. 16.2.3 简单测试
  17. 17. 666. 彩蛋