RabbitMQ模式
- 订阅与发布模式(Publish/Subscribe)
- 消费者代码
- 生产者代码
- 路由模式(Routing)
- 生产者代码
- 消费者代码
- RPC模式
订阅与发布模式(Publish/Subscribe)
Fanout这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认有该交换机的类型
如下图,我们将fanout类型交换机绑定到两个临时队列中,并通过该交换机将消息发布给两个消费者(消息依旧只能被消费一次)
消费者代码
/* * 消费者01 * 消息接收 * */ public class ReceiveLogs01 { //交换机名称 public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //声明一个队列,名称随机,当消费者断开与队列的连接时,队列自动删除 String queueName = channel.queueDeclare().getQueue(); //把该临时队列绑定我们的 exchange,其中 routingkey(也称之为 binding key)为空字符串 channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println("等待接受消息,把接受到的消息打印在屏幕上..."); DeliverCallback deliverCallback = (consumerTag,message) -> { System.out.println("ReceiveLogs01控制台打印接受到的消息:" + new String(message.getBody())); }; channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {}); } }
/* * 消费者02 * 消息接收 * */ public class ReceiveLogs02 { //交换机名称 public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //声明一个临时队列,名称随机,当消费者断开与队列的连接时,队列自动删除(临时队列的特性) String queueName = channel.queueDeclare().getQueue(); //绑定交换机与队列 channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println("等待接受消息,把接受到的消息打印在屏幕上..."); DeliverCallback deliverCallback = (consumerTag,message) -> { System.out.println("ReceiveLogs02控制台打印接受到的消息:" + new String(message.getBody())); }; channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {}); } }
生产者代码
/* * 生产者 * 发消息 交换机 * */ public class Emitlog { // 交换机的名称 public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); /** * 声明一个 exchange * 1.exchange 的名称 * 2.exchange 的类型 */ channel.exchangeDeclare(EXCHANGE_NAME,"fauout"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发出的消息:"+ message); } } }
路由模式(Routing)
路由模式是指我们的队列能够接收交换机传来的指定的消息,而不是所有的消息,所以路由模式采用 direct 这种类型的交换机来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。
在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.。
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black / green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
如果交换机的绑定类型是是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多。
生产者代码
public class DirectLogs { // 交换机的名称 public static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //BuiltinExchangeType.DIRECT 交换机的枚举类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //创建多个 bindingKey Map
bindingKeyMap = new HashMap<>(); bindingKeyMap.put("info","普通 info 信息"); bindingKeyMap.put("warning","警告 warning 信息"); bindingKeyMap.put("error","错误 error 信息"); //debug 没有消费这接收这个消息 所有就丢失了 bindingKeyMap.put("debug","调试 debug 信息"); for (Map.Entry bindingKeyEntry: bindingKeyMap.entrySet()) { String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey, null,message.getBytes("UTF-8")); System.out.println("生产者发出消息:" + message); } } } 消费者代码
public class ReceiveLogsDirect01 { public static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明一个队列 channel.queueDeclare("console",false,false,false,null); //绑定交换机与队列 channel.queueBind("console",EXCHANGE_NAME,"info"); channel.queueBind("console",EXCHANGE_NAME,"warning"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogsDirect01控制台打印接受到的消息:" + new String(message.getBody())); }; channel.basicConsume("console",true,deliverCallback,consumerTag -> {}); } }
public class ReceiveLogsDirect02 { public static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明一个队列 channel.queueDeclare("disk",false,false,false,null); //绑定交换机与队列 channel.queueBind("disk",EXCHANGE_NAME,"error"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogsDirect02控制台打印接受到的消息:" + new String(message.getBody())); }; channel.basicConsume("disk",true,deliverCallback,consumerTag -> {}); } }
RPC模式
RPC即客户端远程调用服务端的方法,使用MQ可以实现RPC的异步调用,基于Direct交换机实现。
-
在RabbitMQ中,使用RPC通信方式的应用程序被称为RPC客户端,而提供服务的应用程序被称为RPC服务器。
-
服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回结果。
-
服务端将RPC方法的结果发送到RPC响应队列
-
客户端接收Reply队列的消息
虽然RabbitMQ的RPC通信方式可以实现客户端和服务端之间的解耦,实现远程调用,但也会涉及一些缺点:
性能开销:RPC通信方式需要在客户端和服务端之间进行多次网络通信,而每次网络通信都会带来一定的性能开销,因此在高并发场景下,RPC通信可能会影响系统的性能。
实现复杂度:RPC通信方式需要客户端和服务端共同定义一套RPC协议,包括消息格式、参数类型和返回值类型等,这需要一定的实现复杂度,尤其是在跨语言和跨平台的场景中。
依赖可靠的消息代理:RPC通信需要可靠的消息代理来实现消息传递和消息保证,而RabbitMQ等消息代理的可靠性需要进行专门的配置和管理,增加了运维成本。
安全性:由于RPC通信涉及到网络传输,可能会面临数据被劫持、篡改和窃听等安全风险,因此需要采取相应的安全措施,如加密、认证和授权等。
该模式作为了解即可,实际应用并不广泛。具体详见官网:https://www.rabbitmq.com/tutorials
-