【架构系列】RabbitMQ应用场景及在实际项目中如何搭建可靠的RabbitMQ架构体系

作者:后端小肥肠

创作不易,未经允许禁止转载。

1. 前言

RabbitMQ,作为一款高性能、可靠的消息队列软件,已经成为许多企业和开发团队的首选之一。它的灵活性和可扩展性使得它适用于各种应用场景,从简单的任务队列到复杂的分布式系统。本文将深入探讨RabbitMQ的应用场景以及如何在实际项目中构建可靠的RabbitMQ架构体系。

2. RabbitMQ应用场景

2.1 异步处理

在现代应用中,异步消息处理是提升用户体验和系统效率的关键。RabbitMQ可以有效地用于多种异步处理任务,例如:

  • 用户注册后的邮件发送:用户注册后,通过RabbitMQ发送一个消息到队列中,由后台服务监听并处理发送邮件的任务,从而不会延迟用户的注册过程。
  • 订单处理:在电商平台中,订单处理包括库存管理、支付确认等多个步骤,RabbitMQ可以用来在这些服务间异步传递订单信息,确保处理流程的连续性和效率。

    2.2 应用解耦

    RabbitMQ支持多种通信模式,如点对点、发布/订阅等,这些模式帮助系统各部分保持低耦合度,便于独立扩展和维护。例如:

    • 微服务架构中的服务通信:在微服务架构中,RabbitMQ允许各个微服务之间通过消息进行交互,而不是直接调用对方的API,这种方式减少了服务间的直接依赖。

      2.3 流量削峰

      在流量高峰期,如促销或大型活动期间,系统可能会遭遇巨大的访问压力。RabbitMQ可以用来缓冲入站消息,如订单或请求,从而保护后端服务不被过载:

      • 秒杀活动中的订单处理:在秒杀活动中,大量的购买请求可以先进入RabbitMQ队列,系统根据处理能力逐步从队列中取出并处理这些请求,有效避免了系统崩溃。

        2.4 通信与集成

        RabbitMQ提供了一个灵活的消息传递系统,可以集成复杂的企业系统。它支持多种协议和广泛的开发语言库,适用于:

        • 跨平台通信:在不同操作系统和不同编程语言编写的应用之间,RabbitMQ可以作为消息传递中间件,实现这些系统的有效通信。

          2.5 日志处理和应用监控

          RabbitMQ也常用于系统日志处理和监控。它可以聚合各服务产生的日志信息,并传输到日志分析系统:

          • 集中式日志管理:通过RabbitMQ,各个系统和应用的日志可以被统一收集至一个中央处理位置,便于进行日志分析、监控和报警。

            2.6 数据同步

            RabbitMQ 在数据同步中扮演着重要的角色,特别是在分布式系统中,它能够确保数据在多个系统或组件之间保持一致性和最新状态。这对于维护数据的完整性和及时性至关重要。例如:

            • 数据库同步:在多地数据中心运营的情况下,RabbitMQ 可以用来同步不同地点的数据库。通过消息队列,当一个数据中心的数据库更新时,相应的变更可以通过 RabbitMQ 发送到其他数据中心,从而保证所有地点的数据一致。

            • 实时数据复制:在金融服务或电子商务平台,实时数据复制是保证高可用性和灾难恢复的关键。使用 RabbitMQ,可以实现高效的数据复制策略,如将交易数据从主系统复制到备份系统或分析数据库。

            • 缓存刷新:在使用缓存提高应用性能的情况下,RabbitMQ 可以用来在数据更新时自动通知系统刷新缓存。这样,用户总是能够获取到最新的数据,而不是过时的缓存数据。

              通过这些应用场景,可以看出RabbitMQ在现代软件架构中扮演的多样化角色,不仅增强了系统的可靠性和伸缩性,还提高了开发和运维的效率。

              3. 在项目中如何搭建稳定RabbitMQ架构体系

              3.1. RabbitMQ安装

              网上RabbitMQ安装教程很多,本文只简述基于docker安装的核心步骤:

              1. 环境准备,准备Cenos虚拟机,我的是7.x版本:

              2. 拉取或解压RabbitMQ镜像:

              3. 运行docker容器:

              docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v /home/docker/rabbitmq/rabbitmq:/var/lib/rabbitmq -v /home/docker/rabbitmq/rabbitmq_conf:/etc/rabbitmq   -e RABBITMQ_DEFAULT_VHOST=km_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:latest

              4. 进入容器 :

               docker exec -it 容器id /bin/bash

              5. 运行rabbitmq-plugins enable rabbitmq_management(解决无法访问网页端15672端口问题),即可完成RabbitMQ安装。

              3.2. 总体技术流程

              本文以异步处理应用场景为例,展示如何构建稳定可靠的RabbitMQ架构体系:

              上述流程为异步消息通信的技术流程,在异步消息通信中当消息投递后就立刻返回了结果,我们无法获取消息消费的具体过程,这就导致了虽然我们可以即刻获取程序返回状态,但是程序执行细节或是否失败无法通过程序响应返回的方式获取。

              基于以上RabbitMQ异步通信的优缺点,我们要搭建一个可靠的RabbitMQ架构需要从以下几个方面入手:

              生产者稳定架构:

              1. 消息投递回调监听。创建消息投递回调监听函数,监听生产者投递的消息是否投递成功。

              2. 消息确认表创建。创建消息确认表(message_confirmation),记录消息投递状态,其中字段status反应了是否投递成功(0为为投递成功,1为投递成功)。

              CREATE TABLE "public"."message_confirmation" (
                "id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
                "status" int4,
                "create_time" timestamp(6),
                "update_time" timestamp(6),
                "message" varchar(255) COLLATE "pg_catalog"."default",
                CONSTRAINT "message_confirmation_pkey" PRIMARY KEY ("id")
              )
              ;
              ALTER TABLE "public"."message_confirmation" 
                OWNER TO "postgres";

              3. 创建定时任务监听消息投递确认表。每隔一段时间遍历消息确认表,筛选出status为0的消息数据,进行重复投递动作。

              消费者稳定架构

              1. 死信队列运用。由于网络或外部因素导致消息消费失败,可将消息投递至死信队列进行二次消费。

              2. 日志表记录。如死信队列也消费失败,可将消息写入日志表(message_error)后进行手动消费,由技术人员获取日志表中消费失败记录,排查消费失败原因。

              CREATE TABLE "public"."message_error" (
                "id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
                "message_id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
                "error_log" text COLLATE "pg_catalog"."default",
                "create_time" timestamp(6),
                "update_time" timestamp(6),
                CONSTRAINT "message_error_pkey" PRIMARY KEY ("id")
              )
              ;
              ALTER TABLE "public"."message_error" 
                OWNER TO "postgres";

              3.3. 实战讲解

              3.3.1. 环境配置
              3.3.1.1. 所需版本工具
              3.3.1.2. pom依赖
                org.springframework.boot spring-boot-starter-web   com.baomidou mybatis-plus-boot-starter   org.projectlombok lombok   org.springframework.boot spring-boot-starter-validation   org.springframework.boot spring-boot-starter-amqp   org.postgresql postgresql 
              3.3.2. 生产者核心代码讲解
              3.3.2.1. yml配置
              server:
                port: 8873
              spring:
                datasource:
                  url: jdbc:postgresql://127.0.0.1:5432/xfc_mq_producer
                  username: postgres
                  password: postgres
                  driver-class-name: org.postgresql.Driver
                rabbitmq:
                  port: 5672
                  host: 192.168.10.11
                  username: admin
                  password: admin
                  virtual-host: my_vhost
                  publisher-confirm-type: correlated
                  listener:
                    simple:
                      acknowledge-mode: manual
              3.3.2.2. 编写回调函数
               @PostConstruct
                  public void regCallback() {
                      // 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性
                      rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                          @Override
                          public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                              log.info("cause:"+cause);
                              // 如果ack为true代表消息已经收到
                              String messageId = correlationData.getId();
                              if (!ack) {
                                  // 这里可能要进行其他的方式进行存储
                                  log.error("MQ队列应答失败,messageId是:" + messageId);
                                  return;
                              }
                              try {
                                  MessageConfirmation messageConfirmation = messageConfirmationMapper.selectById(messageId);
                                  messageConfirmation.setStatus(1);
                                  int count=messageConfirmationMapper.updateById(messageConfirmation);
                                  if (count == 1) {
                                      log.info("本地消息状态修改成功,消息成功投递到消息队列中...");
                                  }
                              } catch (Exception ex) {
                                  log.error("本地消息状态修改失败,出现异常:" + ex.getMessage());
                              }
                          }
                      });
                  }

              上述回调函数主要用于监听生产者发送的消息是否发送成功,并将消息发送状态更新至消息确认表中。

              3.3.2.3. 编写定时任务监听消息确认表
              @Configuration
              @EnableScheduling
              @Slf4j
              public class confirmMessageTaskService {
                  @Autowired
                  private RabbitTemplate rabbitTemplate;
                  @Autowired
                  MessageConfirmationMapper messageConfirmationMapper;
                  @Scheduled(cron = "0 */1 * * * ?")
                  public void sendMessage(){
                      // 把消息为0的状态消息重新查询出来,投递到MQ中。
                      LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
                      queryWrapper.eq(MessageConfirmation::getStatus, 0);
                      List noConfirmMessages = messageConfirmationMapper.selectList(queryWrapper)
                              .stream()
                              .collect(Collectors.toList());
                      noConfirmMessages.forEach((noConfirmMessage)->{
                          rabbitTemplate.convertAndSend("xz_push_exchange","", JsonUtil.obj2String(noConfirmMessage),
                                  new CorrelationData(noConfirmMessage.getId()));
                      });
                  }
              }

               上述定时任务为每分钟遍历消息确认表,将status=0的消息筛选出来进行消息投递。

              3.3.2.4. 消息投递
               public void sendMessage(MessageConfirmation messageConfirmation) {
                      messageConfirmationMapper.insert(messageConfirmation);
                      rabbitTemplate.convertAndSend("xfc_fanout_exchange","", JsonUtil.obj2String(messageConfirmation),
                              new CorrelationData(messageConfirmation.getId()));
                  }

              3.4. 消费者核心代码讲解

              3.4.1. yml配置
              server:
                port: 8872
              spring:
                datasource:
                  url: jdbc:postgresql://127.0.0.1:5432/xfc_mq_consumer
                  username: postgres
                  password: postgres
                  driver-class-name: org.postgresql.Driver
                rabbitmq:
                  port: 5672
                  host: 192.168.10.11
                  username: admin
                  password: admin
                  virtual-host: my_vhost
                  listener:
                    simple:
                      acknowledge-mode: manual
              mybatis-plus:
                typeAliasesPackage: com.xfc.consumer.entities
                mapper-locations: classpath:mapper/*.xml
              3.4.2. RabbitMQ配置类
              @Configuration
              public class RabbitMQConfig {
                  /**
                   * 死信队列
                   * @return
                   */
                  @Bean
                  public FanoutExchange deadExchange() {
                      return new FanoutExchange("dead_xfc_fanout_exchange", true, false);
                  }
                  @Bean
                  public Queue deadXfcQueue() {
                      return new Queue("dead.xfc.queue", true);
                  }
                  @Bean
                  public Binding bindDeadXfc() {
                      return BindingBuilder.bind(deadXfcQueue()).to(deadExchange());
                  }
                  /**
                   * 队列
                   * @return
                   */
                  @Bean
                  public FanoutExchange fanoutExchange() {
                      return new FanoutExchange("xfc_fanout_exchange", true, false);
                  }
                  @Bean
                  public Queue xfcQueue() {
                      Map args = new HashMap<>();
                      args.put("x-dead-letter-exchange", "dead_xfc_fanout_exchange");
                      return new Queue("xfc.queue", true, false, false, args);
                  }
                  @Bean
                  public Binding bindXfc() {
                      return BindingBuilder.bind(xfcQueue()).to(fanoutExchange());
                  }
              }
              

              上述代码为RabbitMQ配置类,用于在项目初始化时生成相应的交换机和队列。 

              3.4.3. 队列消费
              @Service
              @Slf4j
              public class XfcMqConsumer {
                  @RabbitListener(queues = {"xfc.queue"})
                  public void messageconsumer(String message, Channel channel,
                                              CorrelationData correlationData,
                                              @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
                      MessageConfirmation messageConfirmation=null;
                      try {
                          log.info("收到MQ的消息是: " + message );
                          messageConfirmation= JsonUtil.string2Obj(message, MessageConfirmation.class);
                          /**
                           * 编写业务逻辑
                           */
                          
                      } catch (Exception e) {
                          e.printStackTrace();
                          log.error("消息投放到死信队列"+e.getMessage(),e);
                          channel.basicNack(tag,false,false);// 死信队列
                      }
                  }
              }
              3.4.4. 死信队列消费
              @Service
              @Slf4j
              public class DeadMqConsumer {
                  @Autowired
                  MessageErrorMapper messageErrorMapper;
                  @RabbitListener(queues = {"dead.xfc.queue"})
                  public void messageconsumer(String message, Channel channel,
                                              CorrelationData correlationData,
                                              @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
                      MessageConfirmation messageConfirmation=null;
                      try {
                          log.info("收到MQ的消息是: " + message );
                          messageConfirmation= JsonUtil.string2Obj(message, MessageConfirmation.class);
                          /**
                           * 编写业务逻辑
                           */
                      } catch (Exception e) {
                          e.printStackTrace();
                          /**
                           * 写入message_error
                           */
                          messageErrorMapper.insert(new MessageError(messageConfirmation.getId(),e.getMessage(),new Date()));
                          channel.basicNack(tag,false,false);// 死信队列
                      }
                  }
              }
              

              3.5 效果测试

              以上代码编写完成后需要进行架构效果测试,其步骤如下:

              1. 消息投递测试

              上图调用了消息投递接口。

              在消息确认表中,新增了一条消息且status=1,代表该条消息已投递成功。

              2. 消费者正常消费测试

              3. 消费异常测试

              上图可看出消息消费异常投入到了死信队列。

              在死信队列中依然消费失败。

              消费失败后成功写入了日志表。

              4. 结语

              本文讲解了RabbitMQ应用场景以及在异步处理场景中如何搭建稳定的RabbitMQ架构体系,逐步详细的给出了生产者及消费者端代码并在文章最后对架构效果进行了测试,感兴趣的同学可根据代码进行实操,有疑问和其他见解也可在评论区留言,我看到都会回复。