1. 实现方式
1. 设置队列过期时间:延迟队列消息过期 + 死信队列,所有消息过期时间一致 2. 设置消息的过期时间:此种方式下有缺陷,MQ只会判断队列第一条消息是否过期,会导致消息的阻塞 需要额外安装 `rabbitmq_delayed_message_exchange` 插件才能解决此问题
- 导入Spring 集成RabbitMQ MAEVN
org.springframework.boot spring-boot-starter-amqp 2.2.5.RELEASE 2. 设置队列过期时间:延迟队列消息过期 + 死信队列
推送消息至延迟队列 -> 消息过期自动推送到死信队列 -> 消费死信队列
2.1. MQ配置信息
2.1.1. 自定义队列配置
…/bootstrap.yml
# rabbitmq自定义配置 rabbitmq: ttlExchange: medical_dev_ttl_topic_change ttlKey: dev_ttl ttlQueue: medical.dev.ttl.topic.queue delayExpireTime: 600 ttlQueueSize: 10000 deadExchange: medical_dev_dead_topic_change deadKey: dev_dead deadQueue: medical.dev.dead.topic.queue
2.1.2. 读取自定义MQ配置信息
/** * amqp配置文件 */ @Data @Component @ConfigurationProperties(prefix = "rabbitmq") public class MyConfigProperties { /** * 延迟队列 */ public String ttlExchange; public String ttlKey; public String ttlQueue; private Integer delayExpireTime; public Integer ttlQueueSize; /** * 死信队列 */ public String deadExchange; public String deadKey; public String deadQueue; }
2.2. 配置文件自动生成队列
2.2.1. 延迟队列
import com.awsa.site.mq.MyConfigProperties; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; import java.util.HashMap; /** * 延迟队列配置文件 * * @author mingAn.xie */ @Configuration public class RabbitMQConfigTTL { @Resource MyConfigProperties myConfigProperties; // 1: 声明交换机 @Bean public TopicExchange ttlTopicExchange(){ return new TopicExchange(myConfigProperties.getTtlExchange()); } // 2: 声明队列 // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 @Bean public Queue ttlTopicduanxinQueue(){ HashMap
args = new HashMap<>(); // 给队列设置消息过期时间:毫秒值 args.put("x-message-ttl", mqConfigProperties.getDelayExpireTime() * 1000); // 设置队列最大长度 args.put("x-max-length", myConfigProperties.getTtlQueueSize()); // 设置死信队列交换机名称 // 当消息在一个队列中变成死信后,它能就发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列被称之为死信队列 // 编程死信队列的原因:消息被拒绝,消息过期,队列达到最大长度 args.put("x-dead-letter-exchange", myConfigProperties.getDeadExchange()); // 设置死信队列路由key args.put("x-dead-letter-routing-key", myConfigProperties.getDeadKey()); return new Queue(myConfigProperties.getTtlQueue(), true, false, false, args); } // 3: 绑定对用关系 @Bean public Binding ttlTopicsmsBinding(){ return BindingBuilder.bind(ttlTopicduanxinQueue()).to(ttlTopicExchange()).with(myConfigProperties.getTtlKey()); } } 2.2.2. 死信队列
import com.awsa.site.mq.MyConfigProperties; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; /** * 死信队列配置文件 * * @author mingAn.xie */ @Configuration public class RabbitMQConfigDead { @Resource MyConfigProperties myConfigProperties; // 1: 声明交换机 @Bean public TopicExchange deadTopicExchange(){ return new TopicExchange(myConfigProperties.getDeadExchange()); } // 2: 声明队列 // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 @Bean public Queue deadTopicduanxinQueue(){ return new Queue(myConfigProperties.getDeadQueue(), true); } // 3: 绑定对用关系 @Bean public Binding deadTopicsmsBinding(){ return BindingBuilder.bind(deadTopicduanxinQueue()).to(deadTopicExchange()).with(myConfigProperties.getDeadKey()); } }
2.3. 生产者推送消息
import com.awsa.site.mq.MyConfigProperties; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * RabbitMQ生产者推送消息类 * * @author xiemingan */ @Component @Slf4j public class RabbitmqProducer { @Resource private RabbitTemplate rabbitTemplate; @Resource private MyConfigProperties myConfigProperties; /** * @param pushMessage 推送消息体 */ public void pushTtlMessage(String pushMessage) {// 推送消息至交换机,并指定路由key rabbitTemplate.convertAndSend(myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage); log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}", myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage); } }
2.4. 消费者处理消息
import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; /** * @author mingAn.xie */ @Log4j2 @Component public class RabbitmqConsumer { /** * 消费死信队列 * @param message 消息体 */ @RabbitListener(queues = "${rabbitmq.deadQueue}") public void pushMessages(Message message) { String body = new String(message.getBody()).trim(); if (StringUtils.isEmpty(body)){ return; } log.info("MQ消息消费, RabbitmqConsumer.pushMessages() : {}", body); } }
3. 设置消息的过期时间
设置交换机类型为 x-delayed-type,推送消息至交换机,直连队列消费
3.1. 安装插件 rabbitmq_delayed_message_exchange
前言:这里默认使用环境为 Liunx 系统 Docker 安装 RabbitMQ
具体可以参考这篇文章:Docker 安装 RabbitMQ 挂载配置文件
安装插件版本需要与RabbitMQ版本一致,否则可能会导致安装失败,可先进入RabbitMQ容器中查看其他插件版本
插件各版本地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
- 这里以最新版本 v3.13.0 举例
# 下载插件 wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez # 将插件复制进容器中: rabbitmq_xxxxxx docker cp rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq_xxxxxx:/plugins # 进入容器: rabbitmq_xxxxxx docker exec -it rabbitmq_xxxxxx bash cd plugins # 查询插件列表, 此处可看到插件的版本 rabbitmq-plugins list # 启用插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 交换机类型中出现 x-delayed-type 表示安装成功
3.2. MQ配置信息
3.2.1. 自定义队列配置
…/bootstrap.yml
#mq队列自定义配置 rabbitmq: saveTaskTtlExchange: ey240001_pro_save_task_ttl_topic_exchange saveTaskTtlKey: ey240001_pro_save_task_ttl saveTaskTtlQueue: ey240001.pro.save.task.ttl.topic.queue saveTaskTtlQueueSize: 10000
3.2.2. 读取自定义MQ配置信息
/** * amqp配置文件 * * @author mingAn.xie */ @Data @Component @ConfigurationProperties(prefix = "rabbitmq") public class MyConfigProperties { /** * 任务待办生成延时队列 */ public String saveTaskTtlExchange; public String saveTaskTtlKey; public String saveTaskTtlQueue; public Integer saveTaskTtlQueueSize; }
3.3. 配置文件生成 x-delayed-type 交换机
import com.awsa.site.mq.MyConfigProperties; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; /** * x-delayed-type 交换机延迟队列配置 * * @author mingAn.xie */ @Configuration public class RabbitMQConfigSaveTaskTtl { @Resource MyConfigProperties myConfigProperties; // 1: 声明交换机 @Bean public CustomExchange saveTaskTopicExchange() { Map
args = new HashMap<>(); // 设置延迟队列插件类型:按过期时间消费 args.put("x-delayed-type", "direct"); // 参数:name 交换机名称,type 交换机类型,durable 是否持久化,autoDelete 是否自动删除,arguments 参数 return new CustomExchange(myConfigProperties.getSaveTaskTtlExchange(), "x-delayed-message", true, false, args); } // 2: 声明队列 // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 @Bean public Queue saveTaskTopicduanxinQueue() { return new Queue(myConfigProperties.getSaveTaskTtlQueue(), true, false, false); } // 3: 绑定对用关系 @Bean public Binding saveTaskTopicsmsBinding() { return BindingBuilder.bind(saveTaskTopicduanxinQueue()).to(saveTaskTopicExchange()).with(myConfigProperties.getSaveTaskTtlKey()).noargs(); } } 3.4. 生产者推送消息
import com.awsa.site.mq.MyConfigProperties; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 生产者推送消息类 * * @author xiemingan */ @Component @Slf4j public class RabbitmqProducer { @Resource private RabbitTemplate rabbitTemplate; @Resource private MyConfigProperties myConfigProperties; /** * @param pushMessage 推送消息体 * @param ttlTime 延时时间(毫秒值) */ public void pushTtlMessage(String pushMessage, long ttlTime) { ttlTime = ttlTime <= 0 ? 1000 : ttlTime; // 3.1.推送MQ延迟消息队列 long finalTtlTime = ttlTime; MessagePostProcessor messagePostProcessor = message -> { // 设置延迟时间 message.getMessageProperties().setDelay((int) finalTtlTime); return message; }; rabbitTemplate.convertAndSend(myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, messagePostProcessor); log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}, ttlTime: {}", myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, ttlTime); } }
3.5. 消费者处理消息
import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; /** * @author mingAn.xie */ @Log4j2 @Component public class RabbitmqConsumer { /** * 消费延时消息 * @param message 消息体 */ @RabbitListener(queues = "${rabbitmq.saveTaskTtlQueue}") public void pushMessages(Message message) { String body = new String(message.getBody()).trim(); if (StringUtils.isEmpty(body)) { return; } log.info("MQ延迟消息消费, RabbitmqConsumer.pushMessages() : {}", body); } }
- 交换机类型中出现 x-delayed-type 表示安装成功
- 这里以最新版本 v3.13.0 举例