生产者通过指定 RoutingKey (路由键) 指定消息路由规则。示例为 Spring AMQP 中 RabbitMQ 的使用。
Spring AMQP 文档:https://docs.spring.io/spring-amqp/docs/current/reference/html/
Exchange Types (交换器类型)
-
fanout
最快的路由规则,Exchange 将所有接收到消息发送导与其绑定的所有队列中,不做任何判断操作,即使绑了路由键也会直接无视,常用来广播消息
/** * Fanout */ @Bean public FanoutExchange fanoutExc() { return new FanoutExchange(FANOUT_EXCHANGE); } @Bean public Binding fanoutBinding1(Queue topicQueue1, FanoutExchange fanoutExc) { return BindingBuilder.bind(topicQueue1).to(fanoutExc); } @Bean public Binding fanoutBinding2(Queue topicQueue2, FanoutExchange fanoutExc) { return BindingBuilder.bind(topicQueue2).to(fanoutExc); }
-
direct
将消息路由到 BindingKey 与 RoutingKey 完全匹配的队列中,常用来将有优先级的任务发到对应队列,得到更多处理资源
-
topic
将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:
- RoutingKey 为一个点号
.
分隔的字符串(被点号.
分隔开的每一段独立的字符串称为一个单词),如com.rabbitmq.client
、java.util.concurrent
、com.hidden.client
- BindingKey 和 RoutingKey 一样也是点号
.
分隔的字符串 - BindingKey 中可以存在两种特殊字符串
*
和#
,用于做模糊匹配,其中*
用于匹配一个单词,#
用于匹配多个单词(可以是零个)
所以,如果当一个队列的绑定键为
#
的时候,这个队列将会无视消息的路由键,接收所有的消息。 当*
(星号) 和#
(井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机也就相当于直连交换机。 - RoutingKey 为一个点号
-
headers (不推荐)
不依赖路由键而是 headers 属性进行完全匹配,性能很差因而不实用
@Bean public static HeadersExchange headersExc() { return new HeadersExchange(HEADERS_EXCHANGE); } public Queue headerQueue() { return new Queue(HEADER_QUEUE, true); } @Bean public Binding headerBinding() { Map<String, Object> map = new HashMap<>(); map.put("header1", "value1"); map.put("header2", "value2"); return BindingBuilder.bind(headerQueue()).to(headersExc()).whereAll(map).match(); }
可以如何使用
简单来讲,消息队列可以用来解耦模块、充当工作缓存、流量削峰
-
(简单模式)单生产者对应单消费者
相当于工作缓存,如邮件服务从队列中取件进行发送
-
(工作队列模式)单生产者对应多消费者
资源密集型任务中,多个消费者并行处理
-
(订阅模式)单生产者发到多消费者
如库存更新后需要通知多个缓存和多个数据库,则通过 fanout 交换器分出两个消息队列,分别对应缓存和数据库
-
(路由模式)有选择地发送和接收
指定路由 key
-
(主题模式)将队列需求绑定在特定模式上
-
(远程过程调用)在远程计算机上运行功能并等待结果
通过RPC在远程计算机上运行功能并等待结果
-
(发布者确认)异步确认发送者发布消息
用于实现可靠地分布,如扣款
如何解决消息丢失
消息丢失可能出现在生产者至MQ、消息队列本身、消费者三处。
生产者导致的消息丢失
RabbitMQ 事务机制为了避免消息丢失,可以在发送消息前,先开启执行 txSelect 方法开启一个事务,接着发送消息,如果消息投递失败,执行 txRollback 回滚事务,再执行重试操作重新发送,如果消息投递成功,执行 txCommit 方法提交事务。
但是,RabbitMQ事务同步阻塞,因此一般不会这样做。更常用的方法是 Confirm 模式。RabbitMQ Confirm 模式 通过异步回调的方式,接收成功回调生产者ack
接口,不成功回调nack
接口。
**注意:**Spring AMPQ 默认 publisher-confirm
为 false
,需要手动设 true
,此处可见下文关于 Spring AMQP publisher 参数内容。
因此,生产者与 broker 间消息可靠性保证思路就是
- 当消息发送到broker的时候,执行监听的回调函数。
- 在生产端要维护一个消息发送的表,消息发送的时候记录消息id,在消息成功落地broker磁盘并且进行回调确认(ack)的时候,根据本地消息表和回调确认的消息id进行对比,这样可以确保生产端的消息表中的没有进行回调确认(或者回调确认时网络问题)的消息进行补救式的重发,当然不可避免的就会在消息端可能会造成消息的重复消息。针对消费端重复消息,在消费端进行幂等处理。
RabbitMQ 导致消息丢失
RabbitMQ 没有对消息进行持久化,消息就可能随 RabbitMQ 宕机而丢失。解决方法就是调整 Confirm 模式,让 RabbitMQ 在完成消息持久化,消息到了硬盘上,再返回ack
接口。
消费者导致消息丢失
RabbitMQ 在默认的自动提交ack
配置下,如果消息处理完之前消费者挂了,消息会丢失。将自动提交ack
关闭后,如果消费者处理完消息前挂了,RabbitMQ 会认为没有处理成功,再次推送给消费者处理。
我们可以顺便来看一下 Kafka 和 RocketMQ 的处理
Kafka
生产者导致消息丢失
对于 Kafka 来说,生产者基本不会弄丢消息,因为生产者发送消息会等待 Kafka 响应成功,如果响应失败,生产者会自动不断地重试。
Broker端弄丢了数据
Kafka 通常会一台 leader + 两台 follower,当生产者消息刚写入 leader 成功,但是还没同步到 follower 时,leader 宕机了,此时会重新选举 leader,新的 leader 由于还未同步到这条数据,导致该条消息丢失。
解决办法是做一些配置,当有其他 follower 同步到了消息后才通知生产者消息接收成功了。配置如下:
- 给 topic 设置
replication.factor
参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。- 在 Kafka 服务端设置
min.insync.replicas
参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower。- 在 producer 端设置
acks=all
:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。按上面的配置配置后,就可以保证在 Kafka Broker 端就算 leader 故障了,进行新 leader 选举切换时,也不会丢失数据。
消费者导致消息丢失
Kafka 消费端弄丢数据原因跟 RabbitMQ 类似,Kafka 消费者会在接收到消息的时候,会自动提交一个 offset 给 Kafka,告诉 Kafka 消息已经处理了。处理方法也跟 RabbitMQ 类似,关闭 offset 的自动提交即可。
RocketMQ
RocketMQ 导致数据丢失的原因与前面的 RabbitMQ 和 Kafka 都很类似。生产者就是因为网络抖动等原因消息投递失败,或者 RocketMQ 自身的 Master 节点故障,主备切换故障之类的,消费者则有可能是异步处理导致还未处理成功就给 RocketMQ 提交了 offset 标识消息已处理了。
在 RocketMQ 中,事务消息可以保证消息零丢失。RocketMQ 的事务消息基于这三个业务流程:生产者向RocketMQ发送 half 消息 -> RocketMQ 对 half 消息响应 -> commit/rollback。
- 如果生产者发送 half 失败,则作重试或对消息作持久化,给用户返回失败
- 如果RocketMQ处理失败,则生产者回滚该条消息,请求失败
- 如果 half 发送成功,但 RocketMQ 响应没能够传回,则会通过补偿机制回调接口,决定 commit 还是 rollback
- 包括 commit 或 rollback 时发生的失败都是用补偿机制处理
Spring AMQP
一般 Producer Consumer 设置
Producer
写一个 Config
类来进行 Binding
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder绑定Direct与queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
}
为消息套一个 UUID,方便在ack或nack时回调处理。
private CorrelationData getCorrelationData() {
return new CorrelationData(UUID.randomUUID().toString());
}
ConfirmCallBack
只返回标识值,因此作为 Publisher,可以维护一个 CorrelationID 到 发送信息 的 K-V 关系,从而在发送失败时再做处理,或者删除绑定关系。
Consumer
Consumer 侧的 ConnectionFactory
配置和 Producer 侧是完全一样的。BindingConfig
也完全一致:
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder绑定Direct与queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
}
然后通过 @RabbitListener
定义响应函数:
@Configuration
@RabbitListener(queues="direct.first")
public class RabbitMqListener {
@RabbitHandler
public void handler(String message) {
System.out.println(message);
}
}
Binding 过程也可以在 RabbitMqListener
中完成:
@Configuration
@RabbitListener(bindings=@QueueBinding(
exchange=@Exchange(value="directExchange"),
value=@Queue(value="direct.second"),
key="directKey2"))
public class RabbitListener {
@RabbitHandler
public void handler(String message) {
System.out.println(message);
}
}
回调配置
AMQP 有三个相关参数:publisher-confirms
publisher-returns
和 mandatory
。
其中 mandatory
主要管的是消息投到 Exchange 之后,Exchange 无法将其路由到任何队列之后的操作,true
则把消息还给生产者,false
则丢弃。不过,按照 Spring 源码的逻辑,不开启 publisher-returns
就不会进行消息回调,因此要 mandatory
publisher-returns
必须都开才能起作用。另外,returnCallback
回调时返回的直接是经过封装的 byte[]
,需要进行反序列化来还原原消息。
值得注意的一个小点是,Spring 中的 mandatory
是 template 的参数而不是 rabbitmq 的。
publisher-confirms
是针对消息没有被成功放入Exchange的情况,无论成功失败都会调用回调函数,通过 ConfirmCallback
进行回调。