RabbitMQ 延时消息实现

1. 实现方式

1. 设置队列过期时间:延迟队列消息过期 + 死信队列,所有消息过期时间一致
2. 设置消息的过期时间:此种方式下有缺陷,MQ只会判断队列第一条消息是否过期,会导致消息的阻塞
   需要额外安装 `rabbitmq_delayed_message_exchange` 插件才能解决此问题
  • 导入Spring 集成RabbitMQ MAEVN
     org.springframework.boot spring-boot-starter-amqp 2.2.5.RELEASE

    2. 设置队列过期时间:延迟队列消息过期 + 死信队列

    推送消息至延迟队列 -> 消息过期自动推送到死信队列 -> 消费死信队列

    2.1. MQ配置信息

    2.1.1. 自定义队列配置

    …/bootstrap.yml

    # rabbitmq自定义配置
    rabbitmq:
      ttlExchange: medical_dev_ttl_topic_change
      ttlKey: dev_ttl
      ttlQueue: medical.dev.ttl.topic.queue
      delayExpireTime: 600
      ttlQueueSize: 10000
      deadExchange: medical_dev_dead_topic_change
      deadKey: dev_dead
      deadQueue: medical.dev.dead.topic.queue
    
    2.1.2. 读取自定义MQ配置信息
    /**
     * amqp配置文件
     */
    @Data
    @Component
    @ConfigurationProperties(prefix = "rabbitmq")
    public class MyConfigProperties { /**
         * 延迟队列
         */
        public String ttlExchange;
        public String ttlKey;
        public String ttlQueue;
        private Integer delayExpireTime;
        public Integer ttlQueueSize;
        /**
         * 死信队列
         */
        public String deadExchange;
        public String deadKey;
        public String deadQueue;
    }
    

    2.2. 配置文件自动生成队列

    2.2.1. 延迟队列
    import com.awsa.site.mq.MyConfigProperties;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import javax.annotation.Resource;
    import java.util.HashMap;
    /**
     * 延迟队列配置文件
     * 
     * @author mingAn.xie
     */
    @Configuration
    public class RabbitMQConfigTTL { @Resource
        MyConfigProperties myConfigProperties;
        // 1: 声明交换机
        @Bean
        public TopicExchange ttlTopicExchange(){ return new TopicExchange(myConfigProperties.getTtlExchange());
        }
        // 2: 声明队列
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        @Bean
        public Queue ttlTopicduanxinQueue(){ HashMap args = new HashMap<>();
            // 给队列设置消息过期时间:毫秒值
            args.put("x-message-ttl", mqConfigProperties.getDelayExpireTime() * 1000);
            // 设置队列最大长度
            args.put("x-max-length", myConfigProperties.getTtlQueueSize());
            // 设置死信队列交换机名称
            // 当消息在一个队列中变成死信后,它能就发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列被称之为死信队列
            // 编程死信队列的原因:消息被拒绝,消息过期,队列达到最大长度
            args.put("x-dead-letter-exchange", myConfigProperties.getDeadExchange());
            // 设置死信队列路由key
            args.put("x-dead-letter-routing-key", myConfigProperties.getDeadKey());
            return new Queue(myConfigProperties.getTtlQueue(), true, false, false, args);
        }
        // 3: 绑定对用关系
        @Bean
        public Binding ttlTopicsmsBinding(){ return BindingBuilder.bind(ttlTopicduanxinQueue()).to(ttlTopicExchange()).with(myConfigProperties.getTtlKey());
        }
    }
    
    2.2.2. 死信队列
    import com.awsa.site.mq.MyConfigProperties;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import javax.annotation.Resource;
    /**
     * 死信队列配置文件
     * 
     * @author mingAn.xie
     */
    @Configuration
    public class RabbitMQConfigDead { @Resource
        MyConfigProperties myConfigProperties;
        // 1: 声明交换机
        @Bean
        public TopicExchange deadTopicExchange(){ return new TopicExchange(myConfigProperties.getDeadExchange());
        }
        // 2: 声明队列
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        @Bean
        public Queue deadTopicduanxinQueue(){ return new Queue(myConfigProperties.getDeadQueue(), true);
        }
        // 3: 绑定对用关系
        @Bean
        public Binding deadTopicsmsBinding(){ return BindingBuilder.bind(deadTopicduanxinQueue()).to(deadTopicExchange()).with(myConfigProperties.getDeadKey());
        }
    }
    

    2.3. 生产者推送消息

    import com.awsa.site.mq.MyConfigProperties;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    import javax.annotation.Resource;
    /**
     * RabbitMQ生产者推送消息类
     * 
     * @author xiemingan
     */
    @Component
    @Slf4j
    public class RabbitmqProducer { @Resource
        private RabbitTemplate rabbitTemplate;
        @Resource
        private MyConfigProperties myConfigProperties;
        /**
         * @param pushMessage 推送消息体
         */
        public void pushTtlMessage(String pushMessage) {// 推送消息至交换机,并指定路由key
            rabbitTemplate.convertAndSend(myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);
            log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}", myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);
        }
    }
    

    2.4. 消费者处理消息

    import lombok.extern.log4j.Log4j2;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    /**
     * @author mingAn.xie
     */
    @Log4j2
    @Component
    public class RabbitmqConsumer { /**
         * 消费死信队列
         * @param message 消息体
         */
        @RabbitListener(queues = "${rabbitmq.deadQueue}")
        public void pushMessages(Message message) { String body = new String(message.getBody()).trim();
            if (StringUtils.isEmpty(body)){ return;
            }
            log.info("MQ消息消费, RabbitmqConsumer.pushMessages() : {}", body);
        }
    }
    

    3. 设置消息的过期时间

    设置交换机类型为 x-delayed-type,推送消息至交换机,直连队列消费

    3.1. 安装插件 rabbitmq_delayed_message_exchange

    前言:这里默认使用环境为 Liunx 系统 Docker 安装 RabbitMQ

    具体可以参考这篇文章:Docker 安装 RabbitMQ 挂载配置文件

    安装插件版本需要与RabbitMQ版本一致,否则可能会导致安装失败,可先进入RabbitMQ容器中查看其他插件版本

    插件各版本地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

    • 这里以最新版本 v3.13.0 举例
      # 下载插件
      wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
      # 将插件复制进容器中: rabbitmq_xxxxxx
      docker cp rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq_xxxxxx:/plugins
      # 进入容器: rabbitmq_xxxxxx
      docker exec -it rabbitmq_xxxxxx bash
      cd plugins
      # 查询插件列表, 此处可看到插件的版本
      rabbitmq-plugins list
      # 启用插件
      rabbitmq-plugins enable rabbitmq_delayed_message_exchange
      
      • 交换机类型中出现 x-delayed-type 表示安装成功

        3.2. MQ配置信息

        3.2.1. 自定义队列配置

        …/bootstrap.yml

        #mq队列自定义配置
        rabbitmq:
          saveTaskTtlExchange: ey240001_pro_save_task_ttl_topic_exchange
          saveTaskTtlKey: ey240001_pro_save_task_ttl
          saveTaskTtlQueue: ey240001.pro.save.task.ttl.topic.queue
          saveTaskTtlQueueSize: 10000
        
        3.2.2. 读取自定义MQ配置信息
        /**
         * amqp配置文件
         *
         * @author mingAn.xie
         */
        @Data
        @Component
        @ConfigurationProperties(prefix = "rabbitmq")
        public class MyConfigProperties { /**
             * 任务待办生成延时队列
             */
            public String saveTaskTtlExchange;
            public String saveTaskTtlKey;
            public String saveTaskTtlQueue;
            public Integer saveTaskTtlQueueSize;
        }
        

        3.3. 配置文件生成 x-delayed-type 交换机

        import com.awsa.site.mq.MyConfigProperties;
        import org.springframework.amqp.core.*;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import javax.annotation.Resource;
        import java.util.HashMap;
        import java.util.Map;
        /**
         * x-delayed-type 交换机延迟队列配置
         * 
         * @author mingAn.xie
         */
        @Configuration
        public class RabbitMQConfigSaveTaskTtl { @Resource
            MyConfigProperties myConfigProperties;
            // 1: 声明交换机
            @Bean
            public CustomExchange saveTaskTopicExchange() { Map args = new HashMap<>();
                // 设置延迟队列插件类型:按过期时间消费
                args.put("x-delayed-type", "direct");
                // 参数:name 交换机名称,type 交换机类型,durable 是否持久化,autoDelete 是否自动删除,arguments 参数
                return new CustomExchange(myConfigProperties.getSaveTaskTtlExchange(), "x-delayed-message", true, false, args);
            }
            // 2: 声明队列
            // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
            // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
            // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
            @Bean
            public Queue saveTaskTopicduanxinQueue() { return new Queue(myConfigProperties.getSaveTaskTtlQueue(), true, false, false);
            }
            // 3: 绑定对用关系
            @Bean
            public Binding saveTaskTopicsmsBinding() { return BindingBuilder.bind(saveTaskTopicduanxinQueue()).to(saveTaskTopicExchange()).with(myConfigProperties.getSaveTaskTtlKey()).noargs();
            }
        }
        

        3.4. 生产者推送消息

        import com.awsa.site.mq.MyConfigProperties;
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.amqp.core.MessagePostProcessor;
        import org.springframework.amqp.rabbit.core.RabbitTemplate;
        import org.springframework.stereotype.Component;
        import javax.annotation.Resource;
        /**
         * 生产者推送消息类
         * 
         * @author xiemingan
         */
        @Component
        @Slf4j
        public class RabbitmqProducer { @Resource
            private RabbitTemplate rabbitTemplate;
            @Resource
            private MyConfigProperties myConfigProperties;
            /**
             * @param pushMessage 推送消息体
             * @param ttlTime     延时时间(毫秒值)
             */
            public void pushTtlMessage(String pushMessage, long ttlTime) { ttlTime = ttlTime <= 0 ? 1000 : ttlTime;
                // 3.1.推送MQ延迟消息队列
                long finalTtlTime = ttlTime;
                MessagePostProcessor messagePostProcessor = message -> { // 设置延迟时间
                    message.getMessageProperties().setDelay((int) finalTtlTime);
                    return message;
                };
                rabbitTemplate.convertAndSend(myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, messagePostProcessor);
                log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}, ttlTime: {}", myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, ttlTime);
            }
        }
        

        3.5. 消费者处理消息

        import lombok.extern.log4j.Log4j2;
        import org.springframework.amqp.core.Message;
        import org.springframework.amqp.rabbit.annotation.RabbitListener;
        import org.springframework.stereotype.Component;
        import org.springframework.util.StringUtils;
        /**
         * @author mingAn.xie
         */
        @Log4j2
        @Component
        public class RabbitmqConsumer { /**
             * 消费延时消息
             * @param message 消息体
             */
            @RabbitListener(queues = "${rabbitmq.saveTaskTtlQueue}")
            public void pushMessages(Message message) { String body = new String(message.getBody()).trim();
                if (StringUtils.isEmpty(body)) { return;
                }
                log.info("MQ延迟消息消费, RabbitmqConsumer.pushMessages() : {}", body);
            }
        }