3.订阅者会轮流收到信息
receiver01 message = direct
receiver02 message = direct
receiver01 message = direct
receiver02 message = direct
receiver01 message = direct
receiver02 message = direct
复制代码
2.2 topic - 主题交换器
2.2.1 消息发送者
声明topic交换器
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BlogPublisherConfig {
@Bean
public Exchange blogTopicExchange() {
return ExchangeBuilder.topicExchange(“exchange.topic.springboot.blog”).build();
}
}
复制代码
声明controller
@RequestMapping(“/topic”)
public Object topic(String routingKey, String message) {
rabbitTemplate.convertAndSend(“exchange.topic.springboot.blog”, routingKey, message);
return routingKey + " : " + message;
}
复制代码
2.2.2 消息接收者
声明交换器、三个队列、队列的绑定
-
*:匹配一个串
-
#:匹配一个或者多个串
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
public class BlogSubscriberConfig {
/**
- 主题交换器
*/
@Bean
public TopicExchange blogTopicExchange() {
return ExchangeBuilder.topicExchange(“exchange.topic.springboot.blog”).build();
}
@Bean
public Queue blogJavaQueue() {
return QueueBuilder.durable(“queue.topic.springboot.blog.java”).build();
}
@Bean
public Queue blogMqQueue() {
return QueueBuilder.durable(“queue.topic.springboot.blog.mq”).build();
}
@Bean
public Queue blogAllQueue() {
return QueueBuilder.durable(“queue.topic.springboot.blog.all”).build();
}
@Bean
@Resource
public Binding blogJavaBinding(TopicExchange blogTopicExchange, Queue blogJavaQueue) {
return BindingBuilder.bind(blogJavaQueue).to(blogTopicExchange).with(“springboot.blog.java.routing.key”);
}
@Bean
@Resource
public Binding blogMqBinding(TopicExchange blogTopicExchange, Queue blogMqQueue) {
return BindingBuilder.bind(blogMqQueue).to(blogTopicExchange).with(“springboot.blog.mq.routing.key”);
}
@Bean
@Resource
public Binding blogAllBinding(TopicExchange blogTopicExchange, Queue blogAllQueue) {
// #: 匹配一个或者多个 *:匹配一个
return BindingBuilder.bind(blogAllQueue).to(blogTopicExchange).with(“springboot.blog.#.routing.key”);
}
}
复制代码
监听队列
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class BlogService {
/**
- topic监听
*/
@RabbitListener(queues = “queue.topic.springboot.blog.java”)
public void blogJavaListener(String message) {
System.out.println("blogJavaListener message = " + message);
}
@RabbitListener(queues = “queue.topic.springboot.blog.mq”)
public void blogMqListener(String message) {
System.out.println("blogMqListener message = " + message);
}
@RabbitListener(queues = “queue.topic.springboot.blog.all”)
public void blogAllaListener(String message) {
System.out.println("blogAllListener message = " + message);
}
}
复制代码
2.2.3 消息发布订阅
- 发布者发送消息
-
http://localhost:8071/topic?routingKey=springboot.blog.java.routing.key&message=hello
-
http://localhost:8071/topic?routingKey=springboot.blog.mq.routing.key&message=hello
- 订阅者收到消息
-
全匹配和模糊匹配
-
全匹配无论是哪个都会被匹配上
blogJavaListener message = hello
blogAllListener message = hello
blogAllListener message = hello
blogMqListener message = hello
复制代码
2.3 fanout - 广播交换器
2.3.1 消息发送者
声明fanout交换器
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NoticePublisherConfig {
@Bean
public Exchange radioFanoutExchange() {
return ExchangeBuilder.fanoutExchange(“exchange.fanout.springboot.radio”).build();
}
}
复制代码
声明controller
@RequestMapping(“/fanout”)
public Object fanout(String message) {
rabbitTemplate.convertAndSend(“exchange.fanout.springboot.radio”, null, message);
return message;
}
复制代码
2.32 消息接收者
创建交换器、路由键、绑定
- 不需要使用路由键
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
public class NoticeSubscriberConfig {
@Bean
public FanoutExchange radioFanoutExchange() {
return ExchangeBuilder.fanoutExchange(“exchange.fanout.springboot.radio”).build();
}
@Bean
public Queue radioQueue() {
return QueueBuilder.durable(“queue.fanout.springboot.radio”).build();
}
@Bean
@Resource
public Binding radioBinding(FanoutExchange radioFanoutExchange, Queue radioQueue) {
// 广播交换器绑定没有路由键,只要绑定即可收到
return BindingBuilder.bind(radioQueue).to(radioFanoutExchange);
}
}
复制代码
监听队列
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class NoticeService {
@RabbitListener(queues = “queue.fanout.springboot.radio”)
public void radioListener(String message) {
System.out.println("radioListener message = " + message);
}
}
复制代码
2.3.3 消息发布订阅
发布者发送消息
- http://localhost:8071/fanout?message=fanout
订阅者收到消息
radioListener message = fanout
复制代码
2.4 headers - 头交换器
2.4.1 消息发送者
-
headers模式通过头匹配,会忽略路由键
-
发送者需要创建队列
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class HeadersPublisherConfig {
@Bean
public Exchange radioHeadersExchange() {
return ExchangeBuilder.headersExchange(“exchange.headers.springboot.headers”).build();
}
}
复制代码
创建controller发送消息
-
MessageProperties和Message包是:org.springframework.amqp.core
-
需要创建MessageProperties对象用于设置头信息
-
Message用于存储消息和消息属性信息
@RequestMapping(“/headers”)
public Object headers(@RequestParam Map
param) { MessageProperties properties = new MessageProperties();
properties.setHeader(“name”, param.get(“name”));
properties.setHeader(“token”, param.get(“token”));
Message mqMessage = new Message(param.get(“message”).getBytes(), properties);
rabbitTemplate.convertAndSend(“exchange.headers.springboot.headers”, null, mqMessage);
return properties;
}
复制代码
2.4.2 消息接收者
接收者和上面三种一样,同样需要声明交换器、队列、绑定
-
在队列绑定时需要使用不同规则
-
BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match()
-
所有字段属性和值全部匹配
-
BindingBuilder.bind(headersQueue02).to(headersExchange).whereAny(key).match()
-
任意字段属性和值全部匹配
-
BindingBuilder.bind(headersQueue03).to(headersExchange).whereAll(“name”, “token”).exist()
-
指定所有属性字段存在
-
BindingBuilder.bind(headersQueue03).to(headersExchange).whereAny(“name”, “token”).exist()
-
指定任意属性存在
-
headerMap中存放的属性就是发送者中封装的属性,属性完全匹配则正确路由到此处
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class HeadersSubscriberConfig {
@Bean
public HeadersExchange headersExchange() {
return ExchangeBuilder.headersExchange(“exchange.headers.springboot.headers”).build();
}
@Bean
public Queue headersQueue01() {
return QueueBuilder.durable(“queue.headers.springboot.01”).build();
}
@Bean
public Queue headersQueue02() {
return QueueBuilder.durable(“queue.headers.springboot.02”).build();
}
@Bean
public Queue headersQueue03() {
return QueueBuilder.durable(“queue.headers.springboot.03”).build();
}
@Bean
@Resource
public Binding headers01Binding(HeadersExchange headersExchange,Queue headersQueue01) {
Map
key = new HashMap<>(4); key.put(“name”, “java”);
key.put(“token”, “001”);
return BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match();
}
@Bean
@Resource
public Binding headers02Binding(HeadersExchange headersExchange,Queue headersQueue02) {
Map
key = new HashMap<>(4); key.put(“name”, “java”);
key.put(“token”, “002”);
return BindingBuilder.bind(headersQueue02).to(headersExchange).whereAny(key).match();
}
@Bean
@Resource
public Binding headers03Binding(HeadersExchange headersExchange,Queue headersQueue03) {
// name和token都需要存在
return BindingBuilder.bind(headersQueue03).to(headersExchange).whereAll(“name”, “token”).exist();
// 任意name或者token存在
// return BindingBuilder.bind(headersQueue03).to(headersExchange).whereAny(“name”, “token”).exist();
}
}
复制代码
队列监听
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class HeadersService {
@RabbitListener(queues = “queue.headers.springboot.01”)
public void headers01Listener(String message) {
System.out.println("headers01Listener message = " + message);
}
@RabbitListener(queues = “queue.headers.springboot.02”)
public void headers02Listener(String message) {
System.out.println("headers02Listener message = " + message);
}
@RabbitListener(queues = “queue.headers.springboot.03”)
public void headers03Listener(String message) {
System.out.println("headers03Listener message = " + message);
}
}
复制代码
2.4.3 消息发布订阅
- 发送消息
-
http://localhost:8071/headers?name=java&token=001&message=headers
-
http://localhost:8071/headers?name=java&token=002&message=headers
-
http://localhost:8071/headers?name=mq&token=003&message=headers
- 接收消息
headers01Listener message = headers
headers02Listener message = headers
headers03Listener message = headers
headers02Listener message = headers
headers03Listener message = headers
headers03Listener message = headers
三、发送者异常监控
=========
3.1 发送者异常种类
基本处理流程
- 补偿(兜底)方案
模拟broker宕机:修改发送者端口如5673,然后启动,发送消息,端口不对无法连接主机
-
错误信息:java.net.ConnectException: Connection timed out: connect
-
补偿方案:加入异常处理,如果不可达则返回错误
-
这种错误在发送的时候就已经可以发现,直接将错误返回给调用方即可
@RequestMapping(“/direct”)
public Object sendEmail(String msg) {
try {
rabbitTemplate.convertAndSend(“exchange.direct.springboot.email”, “queue.email.routing.key”, msg);
return msg;
} catch (AmqpException e) {
System.out.println(“发送出现异常:” + e.getMessage());
return “网络中断,请稍后再试”;
}
}
复制代码
模拟无交换器异常
-
错误信息
-
ERROR 4880 — [.200.57.39:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method
(reply-code=404, reply-text=NOT_FOUND - no exchange ‘noExchange’ in vhost ‘/’, class-id=60, method-id=40) -
错误说明:如果没有交换器并不会报错,只会输出一条日志
-
补偿方案:需要采用发送回调来确认是否成功发送消息
模拟无路由异常
-
错误信息:无任何提示,消息直接被丢弃
-
补偿方案:需要采用发送回调来确认是否成功发送消息
3.2 消息发送回调
因为消息是异步发送,所以需要确保消息能正确发送
所以可配置RabbitTemplate然后指定回调信息
步骤01:修改配置文件,配置回调参数
-
publisher-confirm-type
-
org.springframework.boot.autoconfigure.amqp.RabbitProperties#publisherConfirmType
-
org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: tianxin
password: tianxin
开启消息发broker回调
publisher-confirm-type: correlated
开启路由消息路由回调
publisher-returns: true
强制确认,也可以在代码中开启
template:
mandatory: true
复制代码
/**
- The type of publisher confirms to use.
*/
public enum ConfirmType {
/**
-
Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
-
within scoped operations.
*/
SIMPLE,
/**
-
Use with {@code CorrelationData} to correlate confirmations with sent
-
messsages.
*/
CORRELATED,
/**
- Publisher confirms are disabled (default).
*/
NONE
}
复制代码
步骤02:配置RabbitTemplate,设置交换器确认回调和路由回调
-
setConfirmCallback:无论成功与否都会调用
-
setReturnCallback:错误时才调用
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Objects;
@Configuration
public class CustomRabbitTemplate {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
// 开启mandatory为true才能触发回调方法,无论消息推送结果如何强制调用回调方法
rabbitTemplate.setMandatory(true);
// 设置连接工厂信息
rabbitTemplate.setConnectionFactory(connectionFactory);
// 消息发broker回调:发送者到broker的exchange是否正确找到
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println(“setConfirmCallback 消息数据:” + correlationData);
if (Objects.nonNull(correlationData)) {
System.out.println(“setConfirmCallback 消息数据:” + correlationData.getReturnedMessage());
}
System.out.println(“setConfirmCallback 消息确认:” + ack);
System.out.println(“setConfirmCallback 原因:” + cause);
System.out.println(“-----------------------------------”);
});
// 消息路由回调:从交换器路由到队列是否正确发送
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println(“setReturnCallback 消息:” + message);
System.out.println(“setReturnCallback 回应码:” + replyCode);
System.out.println(“setReturnCallback 回应信息:” + replyText);
System.out.println(“setReturnCallback 交换器:” + exchange);
System.out.println(“setReturnCallback 路由键:” + routingKey);
System.out.println(“-----------------------------------”);
});
return rabbitTemplate;
}
}
复制代码
- 路由回调和消息回调
/**
-
A callback for publisher confirmations.
*/
@FunctionalInterface
public interface ConfirmCallback {
/**
-
Confirmation callback.
-
@param correlationData correlation data for the callback.
-
@param ack true for ack, false for nack
-
@param cause An optional cause, for nack, when available, otherwise null.
*/
void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
}
/**
-
A callback for returned messages.
*/
@FunctionalInterface
public interface ReturnCallback {
/**
-
Returned message callback.
-
@param message the returned message.
-
@param replyCode the reply code.
-
@param replyText the reply text.
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加V:vip1024b 备注Java获取(资料价值较高,非无偿)
总结
以上是字节二面的一些问题,面完之后其实挺后悔的,没有提前把各个知识点都复习到位。现在重新好好复习手上的面试大全资料(含JAVA、MySQL、算法、Redis、JVM、架构、中间件、RabbitMQ、设计模式、Spring等),现在起闭关修炼半个月,争取早日上岸!!!
下面给大家分享下我的面试大全资料
- 第一份是我的后端JAVA面试大全
后端JAVA面试大全
- 第二份是MySQL+Redis学习笔记+算法+JVM+JAVA核心知识整理
MySQL+Redis学习笔记算法+JVM+JAVA核心知识整理
- 第三份是Spring全家桶资料
MySQL+Redis学习笔记算法+JVM+JAVA核心知识整理
e reply text.
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
[外链图片转存中…(img-WED4P5Ty-1711600909781)]
[外链图片转存中…(img-O29ZT15t-1711600909781)]
[外链图片转存中…(img-xGK1n5C5-1711600909782)]
[外链图片转存中…(img-cNdUo5Y5-1711600909782)]
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!
[外链图片转存中…(img-I9s1jq4I-1711600909783)]
[外链图片转存中…(img-tEYVOTho-1711600909783)]
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加V:vip1024b 备注Java获取(资料价值较高,非无偿)
[外链图片转存中…(img-8rfekxS2-1711600909784)]
总结
以上是字节二面的一些问题,面完之后其实挺后悔的,没有提前把各个知识点都复习到位。现在重新好好复习手上的面试大全资料(含JAVA、MySQL、算法、Redis、JVM、架构、中间件、RabbitMQ、设计模式、Spring等),现在起闭关修炼半个月,争取早日上岸!!!
下面给大家分享下我的面试大全资料
- 第一份是我的后端JAVA面试大全
[外链图片转存中…(img-IqyOA3w6-1711600909784)]
后端JAVA面试大全
- 第二份是MySQL+Redis学习笔记+算法+JVM+JAVA核心知识整理
[外链图片转存中…(img-X2NWQdFn-1711600909785)]
MySQL+Redis学习笔记算法+JVM+JAVA核心知识整理
- 第三份是Spring全家桶资料
[外链图片转存中…(img-Z5hotNmA-1711600909785)]
MySQL+Redis学习笔记算法+JVM+JAVA核心知识整理
- 第三份是Spring全家桶资料
- 第二份是MySQL+Redis学习笔记+算法+JVM+JAVA核心知识整理
- 第一份是我的后端JAVA面试大全
- 第三份是Spring全家桶资料
- 第二份是MySQL+Redis学习笔记+算法+JVM+JAVA核心知识整理
- 第一份是我的后端JAVA面试大全
-
-
-
-
- 路由回调和消息回调
-
- Publisher confirms are disabled (default).
-
-
- The type of publisher confirms to use.
-
-
-
-
-
-
- http://localhost:8071/fanout?message=fanout
- 不需要使用路由键
- topic监听
- 主题交换器