Rabbitmq(三)

RabbitMQ模式

    • 订阅与发布模式(Publish/Subscribe)
      • 消费者代码
      • 生产者代码
      • 路由模式(Routing)
        • 生产者代码
        • 消费者代码
        • RPC模式

          订阅与发布模式(Publish/Subscribe)

          Fanout这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认有该交换机的类型

          如下图,我们将fanout类型交换机绑定到两个临时队列中,并通过该交换机将消息发布给两个消费者(消息依旧只能被消费一次)

          消费者代码

           /*
           * 消费者01
           * 消息接收
           * */
           public class ReceiveLogs01 { //交换机名称
               public static final String EXCHANGE_NAME = "logs";
           
               public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel();
                   channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
               //声明一个队列,名称随机,当消费者断开与队列的连接时,队列自动删除
               String queueName = channel.queueDeclare().getQueue();
               //把该临时队列绑定我们的 exchange,其中 routingkey(也称之为 binding key)为空字符串
               channel.queueBind(queueName,EXCHANGE_NAME,"");
               System.out.println("等待接受消息,把接受到的消息打印在屏幕上...");
               DeliverCallback deliverCallback = (consumerTag,message) -> { System.out.println("ReceiveLogs01控制台打印接受到的消息:" + new String(message.getBody()));
               };
               channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});
           }
          }
          
           /*
           * 消费者02
           * 消息接收
           * */
           public class ReceiveLogs02 { //交换机名称
               public static final String EXCHANGE_NAME = "logs";
           
               public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel();
                   channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
               //声明一个临时队列,名称随机,当消费者断开与队列的连接时,队列自动删除(临时队列的特性)
               String queueName = channel.queueDeclare().getQueue();
               //绑定交换机与队列
               channel.queueBind(queueName,EXCHANGE_NAME,"");
               System.out.println("等待接受消息,把接受到的消息打印在屏幕上...");
               DeliverCallback deliverCallback = (consumerTag,message) -> { System.out.println("ReceiveLogs02控制台打印接受到的消息:" + new String(message.getBody()));
               };
               channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});
           }
          }
          

          生产者代码

           /*
           *  生产者
           *  发消息 交换机
           * */
           public class Emitlog { // 交换机的名称
               public  static  final String EXCHANGE_NAME = "logs";
           
               public static void main(String[] args) throws  Exception{ Channel channel = RabbitMqUtils.getChannel();
                   /**
                  * 声明一个 exchange
                  * 1.exchange 的名称
                  * 2.exchange 的类型
                  */
                   channel.exchangeDeclare(EXCHANGE_NAME,"fauout");
               Scanner scanner = new Scanner(System.in);
               while (scanner.hasNext()){ String message = scanner.next();
                   channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
                   System.out.println("生产者发出的消息:"+ message);
               }
           }
          }
          

          路由模式(Routing)

          路由模式是指我们的队列能够接收交换机传来的指定的消息,而不是所有的消息,所以路由模式采用 direct 这种类型的交换机来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。

          在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.。

          在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black / green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

          如果交换机的绑定类型是是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多。

          生产者代码

          public class DirectLogs { // 交换机的名称
           public  static  final String EXCHANGE_NAME = "direct_logs";
           public static void main(String[] args) throws  Exception{ Channel channel = RabbitMqUtils.getChannel();
               //BuiltinExchangeType.DIRECT 交换机的枚举类型
               channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
               //创建多个 bindingKey
               Map bindingKeyMap = new HashMap<>();
               bindingKeyMap.put("info","普通 info 信息");
               bindingKeyMap.put("warning","警告 warning 信息");
               bindingKeyMap.put("error","错误 error 信息");
               
               //debug 没有消费这接收这个消息 所有就丢失了
               bindingKeyMap.put("debug","调试 debug 信息");
               for (Map.Entry bindingKeyEntry: bindingKeyMap.entrySet())
               { String bindingKey = bindingKeyEntry.getKey();
                   String message = bindingKeyEntry.getValue();
                   
                   channel.basicPublish(EXCHANGE_NAME,bindingKey, null,message.getBytes("UTF-8"));
                   System.out.println("生产者发出消息:" + message);
               }
           }
          }
          

          消费者代码

          public class ReceiveLogsDirect01 { public static final String EXCHANGE_NAME = "direct_logs";
              public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel();
                  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                  //声明一个队列
                  channel.queueDeclare("console",false,false,false,null);
                  //绑定交换机与队列
                  channel.queueBind("console",EXCHANGE_NAME,"info");
                  channel.queueBind("console",EXCHANGE_NAME,"warning");
                  
                  DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogsDirect01控制台打印接受到的消息:" + new String(message.getBody()));
                  };
                  channel.basicConsume("console",true,deliverCallback,consumerTag -> {});
              }
          }
          
          public class ReceiveLogsDirect02 { public static final String EXCHANGE_NAME = "direct_logs";
           public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel();
               channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
               //声明一个队列
               channel.queueDeclare("disk",false,false,false,null);
               //绑定交换机与队列
               channel.queueBind("disk",EXCHANGE_NAME,"error");
               DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogsDirect02控制台打印接受到的消息:" + new String(message.getBody()));
               };
               channel.basicConsume("disk",true,deliverCallback,consumerTag -> {});
           }
          }
          

          RPC模式

          RPC即客户端远程调用服务端的方法,使用MQ可以实现RPC的异步调用,基于Direct交换机实现。

          • 在RabbitMQ中,使用RPC通信方式的应用程序被称为RPC客户端,而提供服务的应用程序被称为RPC服务器。

          • 服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回结果。

          • 服务端将RPC方法的结果发送到RPC响应队列

          • 客户端接收Reply队列的消息

            虽然RabbitMQ的RPC通信方式可以实现客户端和服务端之间的解耦,实现远程调用,但也会涉及一些缺点:

            性能开销:RPC通信方式需要在客户端和服务端之间进行多次网络通信,而每次网络通信都会带来一定的性能开销,因此在高并发场景下,RPC通信可能会影响系统的性能。

            实现复杂度:RPC通信方式需要客户端和服务端共同定义一套RPC协议,包括消息格式、参数类型和返回值类型等,这需要一定的实现复杂度,尤其是在跨语言和跨平台的场景中。

            依赖可靠的消息代理:RPC通信需要可靠的消息代理来实现消息传递和消息保证,而RabbitMQ等消息代理的可靠性需要进行专门的配置和管理,增加了运维成本。

            安全性:由于RPC通信涉及到网络传输,可能会面临数据被劫持、篡改和窃听等安全风险,因此需要采取相应的安全措施,如加密、认证和授权等。

            该模式作为了解即可,实际应用并不广泛。具体详见官网:https://www.rabbitmq.com/tutorials