一文带你彻底搞懂SpringBoot-RabbitMQ(1),java多线程锁面试题

3.订阅者会轮流收到信息

receiver01 message = direct

receiver02 message = direct

receiver01 message = direct

receiver02 message = direct

receiver01 message = direct

receiver02 message = direct

复制代码

2.2 topic - 主题交换器


2.2.1 消息发送者

声明topic交换器

import org.springframework.amqp.core.TopicExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class BlogPublisherConfig {

@Bean

public Exchange blogTopicExchange() {

return ExchangeBuilder.topicExchange(“exchange.topic.springboot.blog”).build();

}

}

复制代码

声明controller

@RequestMapping(“/topic”)

public Object topic(String routingKey, String message) {

rabbitTemplate.convertAndSend(“exchange.topic.springboot.blog”, routingKey, message);

return routingKey + " : " + message;

}

复制代码

2.2.2 消息接收者

声明交换器、三个队列、队列的绑定

  • *:匹配一个串

  • #:匹配一个或者多个串

    import org.springframework.amqp.core.Binding;

    import org.springframework.amqp.core.BindingBuilder;

    import org.springframework.amqp.core.Queue;

    import org.springframework.amqp.core.TopicExchange;

    import org.springframework.context.annotation.Bean;

    import org.springframework.context.annotation.Configuration;

    import javax.annotation.Resource;

    @Configuration

    public class BlogSubscriberConfig {

    /**

    • 主题交换器

      */

      @Bean

      public TopicExchange blogTopicExchange() {

      return ExchangeBuilder.topicExchange(“exchange.topic.springboot.blog”).build();

      }

      @Bean

      public Queue blogJavaQueue() {

      return QueueBuilder.durable(“queue.topic.springboot.blog.java”).build();

      }

      @Bean

      public Queue blogMqQueue() {

      return QueueBuilder.durable(“queue.topic.springboot.blog.mq”).build();

      }

      @Bean

      public Queue blogAllQueue() {

      return QueueBuilder.durable(“queue.topic.springboot.blog.all”).build();

      }

      @Bean

      @Resource

      public Binding blogJavaBinding(TopicExchange blogTopicExchange, Queue blogJavaQueue) {

      return BindingBuilder.bind(blogJavaQueue).to(blogTopicExchange).with(“springboot.blog.java.routing.key”);

      }

      @Bean

      @Resource

      public Binding blogMqBinding(TopicExchange blogTopicExchange, Queue blogMqQueue) {

      return BindingBuilder.bind(blogMqQueue).to(blogTopicExchange).with(“springboot.blog.mq.routing.key”);

      }

      @Bean

      @Resource

      public Binding blogAllBinding(TopicExchange blogTopicExchange, Queue blogAllQueue) {

      // #: 匹配一个或者多个 *:匹配一个

      return BindingBuilder.bind(blogAllQueue).to(blogTopicExchange).with(“springboot.blog.#.routing.key”);

      }

      }

      复制代码

      监听队列

      import org.springframework.amqp.rabbit.annotation.RabbitListener;

      import org.springframework.stereotype.Service;

      @Service

      public class BlogService {

      /**

      • topic监听

        */

        @RabbitListener(queues = “queue.topic.springboot.blog.java”)

        public void blogJavaListener(String message) {

        System.out.println("blogJavaListener message = " + message);

        }

        @RabbitListener(queues = “queue.topic.springboot.blog.mq”)

        public void blogMqListener(String message) {

        System.out.println("blogMqListener message = " + message);

        }

        @RabbitListener(queues = “queue.topic.springboot.blog.all”)

        public void blogAllaListener(String message) {

        System.out.println("blogAllListener message = " + message);

        }

        }

        复制代码

        2.2.3 消息发布订阅

        1. 发布者发送消息
        • http://localhost:8071/topic?routingKey=springboot.blog.java.routing.key&message=hello

        • http://localhost:8071/topic?routingKey=springboot.blog.mq.routing.key&message=hello

          1. 订阅者收到消息
          • 全匹配和模糊匹配

          • 全匹配无论是哪个都会被匹配上

            blogJavaListener message = hello

            blogAllListener message = hello

            blogAllListener message = hello

            blogMqListener message = hello

            复制代码

            2.3 fanout - 广播交换器


            2.3.1 消息发送者

            声明fanout交换器

            import org.springframework.amqp.core.FanoutExchange;

            import org.springframework.context.annotation.Bean;

            import org.springframework.context.annotation.Configuration;

            @Configuration

            public class NoticePublisherConfig {

            @Bean

            public Exchange radioFanoutExchange() {

            return ExchangeBuilder.fanoutExchange(“exchange.fanout.springboot.radio”).build();

            }

            }

            复制代码

            声明controller

            @RequestMapping(“/fanout”)

            public Object fanout(String message) {

            rabbitTemplate.convertAndSend(“exchange.fanout.springboot.radio”, null, message);

            return message;

            }

            复制代码

            2.32 消息接收者

            创建交换器、路由键、绑定

            • 不需要使用路由键

              import org.springframework.amqp.core.*;

              import org.springframework.context.annotation.Bean;

              import org.springframework.context.annotation.Configuration;

              import javax.annotation.Resource;

              @Configuration

              public class NoticeSubscriberConfig {

              @Bean

              public FanoutExchange radioFanoutExchange() {

              return ExchangeBuilder.fanoutExchange(“exchange.fanout.springboot.radio”).build();

              }

              @Bean

              public Queue radioQueue() {

              return QueueBuilder.durable(“queue.fanout.springboot.radio”).build();

              }

              @Bean

              @Resource

              public Binding radioBinding(FanoutExchange radioFanoutExchange, Queue radioQueue) {

              // 广播交换器绑定没有路由键,只要绑定即可收到

              return BindingBuilder.bind(radioQueue).to(radioFanoutExchange);

              }

              }

              复制代码

              监听队列

              import org.springframework.amqp.rabbit.annotation.RabbitListener;

              import org.springframework.stereotype.Service;

              @Service

              public class NoticeService {

              @RabbitListener(queues = “queue.fanout.springboot.radio”)

              public void radioListener(String message) {

              System.out.println("radioListener message = " + message);

              }

              }

              复制代码

              2.3.3 消息发布订阅

              发布者发送消息

              • http://localhost:8071/fanout?message=fanout

                订阅者收到消息

                radioListener message = fanout

                复制代码

                2.4 headers - 头交换器


                2.4.1 消息发送者

                1. headers模式通过头匹配,会忽略路由键

                2. 发送者需要创建队列

                import org.springframework.amqp.core.HeadersExchange;

                import org.springframework.context.annotation.Bean;

                import org.springframework.context.annotation.Configuration;

                @Configuration

                public class HeadersPublisherConfig {

                @Bean

                public Exchange radioHeadersExchange() {

                return ExchangeBuilder.headersExchange(“exchange.headers.springboot.headers”).build();

                }

                }

                复制代码

                创建controller发送消息

                • MessageProperties和Message包是:org.springframework.amqp.core

                • 需要创建MessageProperties对象用于设置头信息

                • Message用于存储消息和消息属性信息

                  @RequestMapping(“/headers”)

                  public Object headers(@RequestParam Map param) {

                  MessageProperties properties = new MessageProperties();

                  properties.setHeader(“name”, param.get(“name”));

                  properties.setHeader(“token”, param.get(“token”));

                  Message mqMessage = new Message(param.get(“message”).getBytes(), properties);

                  rabbitTemplate.convertAndSend(“exchange.headers.springboot.headers”, null, mqMessage);

                  return properties;

                  }

                  复制代码

                  2.4.2 消息接收者

                  接收者和上面三种一样,同样需要声明交换器、队列、绑定

                  • 在队列绑定时需要使用不同规则

                  • BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match()

                  • 所有字段属性和值全部匹配

                  • BindingBuilder.bind(headersQueue02).to(headersExchange).whereAny(key).match()

                  • 任意字段属性和值全部匹配

                  • BindingBuilder.bind(headersQueue03).to(headersExchange).whereAll(“name”, “token”).exist()

                  • 指定所有属性字段存在

                  • BindingBuilder.bind(headersQueue03).to(headersExchange).whereAny(“name”, “token”).exist()

                  • 指定任意属性存在

                  • headerMap中存放的属性就是发送者中封装的属性,属性完全匹配则正确路由到此处

                    import org.springframework.amqp.core.Binding;

                    import org.springframework.amqp.core.BindingBuilder;

                    import org.springframework.amqp.core.HeadersExchange;

                    import org.springframework.amqp.core.Queue;

                    import org.springframework.context.annotation.Bean;

                    import org.springframework.context.annotation.Configuration;

                    import javax.annotation.Resource;

                    import java.util.HashMap;

                    import java.util.Map;

                    @Configuration

                    public class HeadersSubscriberConfig {

                    @Bean

                    public HeadersExchange headersExchange() {

                    return ExchangeBuilder.headersExchange(“exchange.headers.springboot.headers”).build();

                    }

                    @Bean

                    public Queue headersQueue01() {

                    return QueueBuilder.durable(“queue.headers.springboot.01”).build();

                    }

                    @Bean

                    public Queue headersQueue02() {

                    return QueueBuilder.durable(“queue.headers.springboot.02”).build();

                    }

                    @Bean

                    public Queue headersQueue03() {

                    return QueueBuilder.durable(“queue.headers.springboot.03”).build();

                    }

                    @Bean

                    @Resource

                    public Binding headers01Binding(HeadersExchange headersExchange,Queue headersQueue01) {

                    Map key = new HashMap<>(4);

                    key.put(“name”, “java”);

                    key.put(“token”, “001”);

                    return BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match();

                    }

                    @Bean

                    @Resource

                    public Binding headers02Binding(HeadersExchange headersExchange,Queue headersQueue02) {

                    Map key = new HashMap<>(4);

                    key.put(“name”, “java”);

                    key.put(“token”, “002”);

                    return BindingBuilder.bind(headersQueue02).to(headersExchange).whereAny(key).match();

                    }

                    @Bean

                    @Resource

                    public Binding headers03Binding(HeadersExchange headersExchange,Queue headersQueue03) {

                    // name和token都需要存在

                    return BindingBuilder.bind(headersQueue03).to(headersExchange).whereAll(“name”, “token”).exist();

                    // 任意name或者token存在

                    // return BindingBuilder.bind(headersQueue03).to(headersExchange).whereAny(“name”, “token”).exist();

                    }

                    }

                    复制代码

                    队列监听

                    import org.springframework.amqp.rabbit.annotation.RabbitListener;

                    import org.springframework.stereotype.Service;

                    @Service

                    public class HeadersService {

                    @RabbitListener(queues = “queue.headers.springboot.01”)

                    public void headers01Listener(String message) {

                    System.out.println("headers01Listener message = " + message);

                    }

                    @RabbitListener(queues = “queue.headers.springboot.02”)

                    public void headers02Listener(String message) {

                    System.out.println("headers02Listener message = " + message);

                    }

                    @RabbitListener(queues = “queue.headers.springboot.03”)

                    public void headers03Listener(String message) {

                    System.out.println("headers03Listener message = " + message);

                    }

                    }

                    复制代码

                    2.4.3 消息发布订阅

                    1. 发送消息
                    • http://localhost:8071/headers?name=java&token=001&message=headers

                    • http://localhost:8071/headers?name=java&token=002&message=headers

                    • http://localhost:8071/headers?name=mq&token=003&message=headers

                      1. 接收消息

                      headers01Listener message = headers

                      headers02Listener message = headers

                      headers03Listener message = headers

                      headers02Listener message = headers

                      headers03Listener message = headers

                      headers03Listener message = headers

                      三、发送者异常监控

                      =========

                      3.1 发送者异常种类


                      基本处理流程

                      • 补偿(兜底)方案

                        模拟broker宕机:修改发送者端口如5673,然后启动,发送消息,端口不对无法连接主机

                        • 错误信息:java.net.ConnectException: Connection timed out: connect

                        • 补偿方案:加入异常处理,如果不可达则返回错误

                        • 这种错误在发送的时候就已经可以发现,直接将错误返回给调用方即可

                          @RequestMapping(“/direct”)

                          public Object sendEmail(String msg) {

                          try {

                          rabbitTemplate.convertAndSend(“exchange.direct.springboot.email”, “queue.email.routing.key”, msg);

                          return msg;

                          } catch (AmqpException e) {

                          System.out.println(“发送出现异常:” + e.getMessage());

                          return “网络中断,请稍后再试”;

                          }

                          }

                          复制代码

                          模拟无交换器异常

                          • 错误信息

                          • ERROR 4880 — [.200.57.39:5672] o.s.a.r.c.CachingConnectionFactory  : Channel shutdown: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange ‘noExchange’ in vhost ‘/’, class-id=60, method-id=40)

                          • 错误说明:如果没有交换器并不会报错,只会输出一条日志

                          • 补偿方案:需要采用发送回调来确认是否成功发送消息

                            模拟无路由异常

                            • 错误信息:无任何提示,消息直接被丢弃

                            • 补偿方案:需要采用发送回调来确认是否成功发送消息

                              3.2 消息发送回调


                              因为消息是异步发送,所以需要确保消息能正确发送

                              所以可配置RabbitTemplate然后指定回调信息

                              步骤01:修改配置文件,配置回调参数

                              • publisher-confirm-type

                              • org.springframework.boot.autoconfigure.amqp.RabbitProperties#publisherConfirmType

                              • org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType

                                spring:

                                rabbitmq:

                                host: 127.0.0.1

                                port: 5672

                                username: tianxin

                                password: tianxin

                                开启消息发broker回调

                                publisher-confirm-type: correlated

                                开启路由消息路由回调

                                publisher-returns: true

                                强制确认,也可以在代码中开启

                                template:

                                mandatory: true

                                复制代码

                                /**

                                • The type of publisher confirms to use.

                                  */

                                  public enum ConfirmType {

                                  /**

                                  • Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}

                                  • within scoped operations.

                                    */

                                    SIMPLE,

                                    /**

                                    • Use with {@code CorrelationData} to correlate confirmations with sent

                                    • messsages.

                                      */

                                      CORRELATED,

                                      /**

                                      • Publisher confirms are disabled (default).

                                        */

                                        NONE

                                        }

                                        复制代码

                                        步骤02:配置RabbitTemplate,设置交换器确认回调和路由回调

                                        • setConfirmCallback:无论成功与否都会调用

                                        • setReturnCallback:错误时才调用

                                          import org.springframework.amqp.rabbit.connection.ConnectionFactory;

                                          import org.springframework.amqp.rabbit.core.RabbitTemplate;

                                          import org.springframework.context.annotation.Bean;

                                          import org.springframework.context.annotation.Configuration;

                                          import java.util.Objects;

                                          @Configuration

                                          public class CustomRabbitTemplate {

                                          @Bean

                                          public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {

                                          RabbitTemplate rabbitTemplate = new RabbitTemplate();

                                          // 开启mandatory为true才能触发回调方法,无论消息推送结果如何强制调用回调方法

                                          rabbitTemplate.setMandatory(true);

                                          // 设置连接工厂信息

                                          rabbitTemplate.setConnectionFactory(connectionFactory);

                                          // 消息发broker回调:发送者到broker的exchange是否正确找到

                                          rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

                                          System.out.println(“setConfirmCallback 消息数据:” + correlationData);

                                          if (Objects.nonNull(correlationData)) {

                                          System.out.println(“setConfirmCallback 消息数据:” + correlationData.getReturnedMessage());

                                          }

                                          System.out.println(“setConfirmCallback 消息确认:” + ack);

                                          System.out.println(“setConfirmCallback 原因:” + cause);

                                          System.out.println(“-----------------------------------”);

                                          });

                                          // 消息路由回调:从交换器路由到队列是否正确发送

                                          rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

                                          System.out.println(“setReturnCallback 消息:” + message);

                                          System.out.println(“setReturnCallback 回应码:” + replyCode);

                                          System.out.println(“setReturnCallback 回应信息:” + replyText);

                                          System.out.println(“setReturnCallback 交换器:” + exchange);

                                          System.out.println(“setReturnCallback 路由键:” + routingKey);

                                          System.out.println(“-----------------------------------”);

                                          });

                                          return rabbitTemplate;

                                          }

                                          }

                                          复制代码

                                          • 路由回调和消息回调

                                            /**

                                            • A callback for publisher confirmations.

                                            • */

                                              @FunctionalInterface

                                              public interface ConfirmCallback {

                                              /**

                                              • Confirmation callback.

                                              • @param correlationData correlation data for the callback.

                                              • @param ack true for ack, false for nack

                                              • @param cause An optional cause, for nack, when available, otherwise null.

                                                */

                                                void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);

                                                }

                                                /**

                                                • A callback for returned messages.

                                                • */

                                                  @FunctionalInterface

                                                  public interface ReturnCallback {

                                                  /**

                                                  • Returned message callback.

                                                  • @param message the returned message.

                                                  • @param replyCode the reply code.

                                                  • @param replyText the reply text.

                                                    自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

                                                    深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

                                                    因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。

                                                    既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

                                                    由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

                                                    如果你觉得这些内容对你有帮助,可以添加V:vip1024b 备注Java获取(资料价值较高,非无偿)

                                                    总结

                                                    以上是字节二面的一些问题,面完之后其实挺后悔的,没有提前把各个知识点都复习到位。现在重新好好复习手上的面试大全资料(含JAVA、MySQL、算法、Redis、JVM、架构、中间件、RabbitMQ、设计模式、Spring等),现在起闭关修炼半个月,争取早日上岸!!!

                                                    下面给大家分享下我的面试大全资料

                                                    • 第一份是我的后端JAVA面试大全

                                                      后端JAVA面试大全

                                                      • 第二份是MySQL+Redis学习笔记+算法+JVM+JAVA核心知识整理

                                                        MySQL+Redis学习笔记算法+JVM+JAVA核心知识整理

                                                        • 第三份是Spring全家桶资料

                                                          MySQL+Redis学习笔记算法+JVM+JAVA核心知识整理

                                                          e reply text.

                                                          自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

                                                          深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

                                                          因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。

                                                          [外链图片转存中…(img-WED4P5Ty-1711600909781)]

                                                          [外链图片转存中…(img-O29ZT15t-1711600909781)]

                                                          [外链图片转存中…(img-xGK1n5C5-1711600909782)]

                                                          [外链图片转存中…(img-cNdUo5Y5-1711600909782)]

                                                          既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

                                                          [外链图片转存中…(img-I9s1jq4I-1711600909783)]

                                                          [外链图片转存中…(img-tEYVOTho-1711600909783)]

                                                          由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

                                                          如果你觉得这些内容对你有帮助,可以添加V:vip1024b 备注Java获取(资料价值较高,非无偿)

                                                          [外链图片转存中…(img-8rfekxS2-1711600909784)]

                                                          总结

                                                          以上是字节二面的一些问题,面完之后其实挺后悔的,没有提前把各个知识点都复习到位。现在重新好好复习手上的面试大全资料(含JAVA、MySQL、算法、Redis、JVM、架构、中间件、RabbitMQ、设计模式、Spring等),现在起闭关修炼半个月,争取早日上岸!!!

                                                          下面给大家分享下我的面试大全资料

                                                          • 第一份是我的后端JAVA面试大全

                                                            [外链图片转存中…(img-IqyOA3w6-1711600909784)]

                                                            后端JAVA面试大全

                                                            • 第二份是MySQL+Redis学习笔记+算法+JVM+JAVA核心知识整理

                                                              [外链图片转存中…(img-X2NWQdFn-1711600909785)]

                                                              MySQL+Redis学习笔记算法+JVM+JAVA核心知识整理

                                                              • 第三份是Spring全家桶资料

                                                                [外链图片转存中…(img-Z5hotNmA-1711600909785)]

                                                                MySQL+Redis学习笔记算法+JVM+JAVA核心知识整理