返回
Featured image of post RabbitMQ - 基本概念和 SpringBoot 下的使用

RabbitMQ - 基本概念和 SpringBoot 下的使用

整理了一下RabbitMQ学习和在SpringBoot使用过程中作的笔记.

生产者通过指定 RoutingKey (路由键) 指定消息路由规则。示例为 Spring AMQP 中 RabbitMQ 的使用。

Spring AMQP 文档:https://docs.spring.io/spring-amqp/docs/current/reference/html/

Exchange Types (交换器类型)

  • fanout

    最快的路由规则,Exchange 将所有接收到消息发送导与其绑定的所有队列中,不做任何判断操作,即使绑了路由键也会直接无视,常用来广播消息

    /**
     * Fanout
     */
    @Bean
    public FanoutExchange fanoutExc() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }
    @Bean
    public Binding fanoutBinding1(Queue topicQueue1, FanoutExchange fanoutExc) {
        return BindingBuilder.bind(topicQueue1).to(fanoutExc);
    }
    
    @Bean
    public Binding fanoutBinding2(Queue topicQueue2, FanoutExchange fanoutExc) {
        return BindingBuilder.bind(topicQueue2).to(fanoutExc);
    }
    
  • direct

    将消息路由到 BindingKey 与 RoutingKey 完全匹配的队列中,常用来将有优先级的任务发到对应队列,得到更多处理资源

  • topic

    将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:

    • RoutingKey 为一个点号 . 分隔的字符串(被点号 . 分隔开的每一段独立的字符串称为一个单词),如 com.rabbitmq.clientjava.util.concurrentcom.hidden.client
    • BindingKey 和 RoutingKey 一样也是点号 分隔的字符串
    • BindingKey 中可以存在两种特殊字符串 *#,用于做模糊匹配,其中 * 用于匹配一个单词,# 用于匹配多个单词(可以是零个)

    所以,如果当一个队列的绑定键为 # 的时候,这个队列将会无视消息的路由键,接收所有的消息。 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机也就相当于直连交换机。

  • headers (不推荐)

    不依赖路由键而是 headers 属性进行完全匹配,性能很差因而不实用

    @Bean
    public static HeadersExchange headersExc() {
        return new HeadersExchange(HEADERS_EXCHANGE);
    }
    
    public Queue headerQueue() {
        return new Queue(HEADER_QUEUE, true);
    }
    @Bean
    public Binding headerBinding() {
        Map<String, Object> map = new HashMap<>();
        map.put("header1", "value1");
        map.put("header2", "value2");
        return BindingBuilder.bind(headerQueue()).to(headersExc()).whereAll(map).match();
    }
    

可以如何使用

简单来讲,消息队列可以用来解耦模块、充当工作缓存、流量削峰

  • (简单模式)单生产者对应单消费者

    相当于工作缓存,如邮件服务从队列中取件进行发送

  • (工作队列模式)单生产者对应多消费者

    资源密集型任务中,多个消费者并行处理

  • (订阅模式)单生产者发到多消费者

    如库存更新后需要通知多个缓存和多个数据库,则通过 fanout 交换器分出两个消息队列,分别对应缓存和数据库

  • (路由模式)有选择地发送和接收

    指定路由 key

  • (主题模式)将队列需求绑定在特定模式上

  • (远程过程调用)在远程计算机上运行功能并等待结果

    通过RPC在远程计算机上运行功能并等待结果

  • (发布者确认)异步确认发送者发布消息

    用于实现可靠地分布,如扣款


如何解决消息丢失

消息丢失可能出现在生产者至MQ、消息队列本身、消费者三处。

生产者导致的消息丢失

RabbitMQ 事务机制为了避免消息丢失,可以在发送消息前,先开启执行 txSelect 方法开启一个事务,接着发送消息,如果消息投递失败,执行 txRollback 回滚事务,再执行重试操作重新发送,如果消息投递成功,执行 txCommit 方法提交事务。

但是,RabbitMQ事务同步阻塞,因此一般不会这样做。更常用的方法是 Confirm 模式。RabbitMQ Confirm 模式 通过异步回调的方式,接收成功回调生产者ack接口,不成功回调nack接口。

**注意:**Spring AMPQ 默认 publisher-confirmfalse,需要手动设 true,此处可见下文关于 Spring AMQP publisher 参数内容。

因此,生产者与 broker 间消息可靠性保证思路就是

  • 当消息发送到broker的时候,执行监听的回调函数。
  • 在生产端要维护一个消息发送的表,消息发送的时候记录消息id,在消息成功落地broker磁盘并且进行回调确认(ack)的时候,根据本地消息表和回调确认的消息id进行对比,这样可以确保生产端的消息表中的没有进行回调确认(或者回调确认时网络问题)的消息进行补救式的重发,当然不可避免的就会在消息端可能会造成消息的重复消息。针对消费端重复消息,在消费端进行幂等处理。

RabbitMQ 导致消息丢失

RabbitMQ 没有对消息进行持久化,消息就可能随 RabbitMQ 宕机而丢失。解决方法就是调整 Confirm 模式,让 RabbitMQ 在完成消息持久化,消息到了硬盘上,再返回ack接口。

消费者导致消息丢失

RabbitMQ 在默认的自动提交ack配置下,如果消息处理完之前消费者挂了,消息会丢失。将自动提交ack关闭后,如果消费者处理完消息前挂了,RabbitMQ 会认为没有处理成功,再次推送给消费者处理。

我们可以顺便来看一下 Kafka 和 RocketMQ 的处理

Kafka

生产者导致消息丢失

对于 Kafka 来说,生产者基本不会弄丢消息,因为生产者发送消息会等待 Kafka 响应成功,如果响应失败,生产者会自动不断地重试。

Broker端弄丢了数据

Kafka 通常会一台 leader + 两台 follower,当生产者消息刚写入 leader 成功,但是还没同步到 follower 时,leader 宕机了,此时会重新选举 leader,新的 leader 由于还未同步到这条数据,导致该条消息丢失。

解决办法是做一些配置,当有其他 follower 同步到了消息后才通知生产者消息接收成功了。配置如下:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower。
  • 在 producer 端设置 acks=all :这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了

按上面的配置配置后,就可以保证在 Kafka Broker 端就算 leader 故障了,进行新 leader 选举切换时,也不会丢失数据。

消费者导致消息丢失

Kafka 消费端弄丢数据原因跟 RabbitMQ 类似,Kafka 消费者会在接收到消息的时候,会自动提交一个 offset 给 Kafka,告诉 Kafka 消息已经处理了。处理方法也跟 RabbitMQ 类似,关闭 offset 的自动提交即可。


RocketMQ

RocketMQ 导致数据丢失的原因与前面的 RabbitMQ 和 Kafka 都很类似。生产者就是因为网络抖动等原因消息投递失败,或者 RocketMQ 自身的 Master 节点故障,主备切换故障之类的,消费者则有可能是异步处理导致还未处理成功就给 RocketMQ 提交了 offset 标识消息已处理了。

在 RocketMQ 中,事务消息可以保证消息零丢失。RocketMQ 的事务消息基于这三个业务流程:生产者向RocketMQ发送 half 消息 -> RocketMQ 对 half 消息响应 -> commit/rollback。

  • 如果生产者发送 half 失败,则作重试或对消息作持久化,给用户返回失败
  • 如果RocketMQ处理失败,则生产者回滚该条消息,请求失败
  • 如果 half 发送成功,但 RocketMQ 响应没能够传回,则会通过补偿机制回调接口,决定 commit 还是 rollback
  • 包括 commit 或 rollback 时发生的失败都是用补偿机制处理

Spring AMQP

一般 Producer Consumer 设置

Producer

写一个 Config 类来进行 Binding

@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    
   @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME);
    }
    
    //利用BindingBuilder绑定Direct与queueFirst
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    }  
}

为消息套一个 UUID,方便在ack或nack时回调处理。

private CorrelationData getCorrelationData() {
  return new CorrelationData(UUID.randomUUID().toString());
}

ConfirmCallBack 只返回标识值,因此作为 Publisher,可以维护一个 CorrelationID 到 发送信息 的 K-V 关系,从而在发送失败时再做处理,或者删除绑定关系。

Consumer

Consumer 侧的 ConnectionFactory 配置和 Producer 侧是完全一样的。BindingConfig 也完全一致:

@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    
   @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME);
    }
    
    //利用BindingBuilder绑定Direct与queueFirst
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    }  
}

然后通过 @RabbitListener 定义响应函数:

@Configuration
@RabbitListener(queues="direct.first")
public class RabbitMqListener {
  
  @RabbitHandler
  public void handler(String message) {
    System.out.println(message);
  }
}

Binding 过程也可以在 RabbitMqListener 中完成:

@Configuration
@RabbitListener(bindings=@QueueBinding(
exchange=@Exchange(value="directExchange"),
value=@Queue(value="direct.second"),
key="directKey2"))
public class RabbitListener {
  
  @RabbitHandler
  public void handler(String message) {
    System.out.println(message);
  }
}

回调配置

AMQP 有三个相关参数:publisher-confirms publisher-returnsmandatory

其中 mandatory 主要管的是消息投到 Exchange 之后,Exchange 无法将其路由到任何队列之后的操作,true 则把消息还给生产者,false 则丢弃。不过,按照 Spring 源码的逻辑,不开启 publisher-returns 就不会进行消息回调,因此要 mandatory publisher-returns 必须都开才能起作用。另外,returnCallback 回调时返回的直接是经过封装的 byte[],需要进行反序列化来还原原消息。

值得注意的一个小点是,Spring 中的 mandatory 是 template 的参数而不是 rabbitmq 的。

publisher-confirms 是针对消息没有被成功放入Exchange的情况,无论成功失败都会调用回调函数,通过 ConfirmCallback 进行回调。

comments powered by Disqus