[转帖]RabbitMQ:Exchange的Fanout类型的介绍和使用

rabbitmq,exchange,fanout,类型,介绍,使用 · 浏览次数 : 0

小编点评

**Fanout类型的Exchange测试结果** **发送消息到内部消息队列时,自动广播到其他消息队列中** * 通过测试发现,当发送消息到一个Fanout类型的Exchange时,该消息会自动广播到该Exchange下的所有其他消息队列中。 * 每个消费者都可以获取到发送的消息,并从多个队列中选择接收。 * 这意味着,一个Fanout类型的Exchange可以同时处理多个消费者,并实现广播功能。 **测试步骤:** 1. 启动三个消费者。 2. 启动生产者查看情况。 3. 发送消息到一个Fanout类型的Exchange的任何一个消息队列。 4. 观察消息队列中的消息数量。 **结果:** 发现发送消息到一个队列的时候,自动广播到其他队列中,并且每个消费者都获取了消息。 **结论:** * Fanout类型的Exchange支持广播功能。 * 当发送消息到内部的任何一个消息队列时,都会自动广播到其他队列中。 * 每个消费者都可以获取到发送的消息,并从多个队列中选择接收。

正文

https://blog.csdn.net/weixin_45492007/article/details/106095591

1.声明

当前的内容用于本人学习和使用Fanout类型的Exchange,主要理解其主要作用

2.Fanout Exchange的官方介绍

  1. 扇出交换机将消息路由到与其绑定的所有队列,并且路由键将被忽略。如果将N个队列绑定到扇出交换,则将新消息发布到该交换时,会将消息的副本传递到所有N个队列。扇出交换机非常适合消息的广播路由

  2. 因为扇出交换将消息的副本发送到绑定到它的每个队列,所以它的用例非常相似:

    2.1 大型多人在线(MMO)游戏可以将其用于排行榜更新或其他全球性事件
    2.2 体育新闻网站可以使用扇出交换以近乎实时的方式向移动客户端分发得分更新
    2.3 分布式系统可以广播各种状态和配置更新
    2.4 群组聊天可以使用扇出交换在参与者之间分发消息(尽管AMQP没有内置的在线状态概念,因此XMPP可能是更好的选择)

个人理解就是:如果使用交换类型为Fanout类型的Exchange的时候,如果该Exchange中具有多个queue队列,如果向该Fanout类型的Exchange发送消息,那么所有的队列都可以获得该消息的副本,从而实现广播

在这里插入图片描述

3.模拟和实现该Fanout广播

1.首先创建Fanout类型的Exchange
在这里插入图片描述
当前的Durability使用默认值Durable

2.创建三个队列world1、world2、world3
在这里插入图片描述

3.将这三个队列绑定到fanoutTest这个Exchange上面
在这里插入图片描述
这里不添加任何Routing Key和Arguments

4.创建三个消费者

public class World1MessageReceiver {
	private final static String QUEUE_NAME = "world1";
<span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span>String<span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token keyword">throws</span> Exception <span class="token punctuation">{<!-- --></span>
	RabbitMqUtils mqUtils <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">RabbitMqUtils</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
	mqUtils<span class="token punctuation">.</span><span class="token function">reciver</span><span class="token punctuation">(</span>QUEUE_NAME<span class="token punctuation">,</span> <span class="token boolean">true</span><span class="token punctuation">,</span> <span class="token punctuation">(</span>consumerTag<span class="token punctuation">,</span> delivery<span class="token punctuation">)</span> <span class="token operator">-</span><span class="token operator">&gt;</span> <span class="token punctuation">{<!-- --></span>
		System<span class="token punctuation">.</span>out<span class="token punctuation">.</span><span class="token function">println</span><span class="token punctuation">(</span><span class="token string">"【world1】==&gt;"</span> <span class="token operator">+</span> <span class="token keyword">new</span> <span class="token class-name">String</span><span class="token punctuation">(</span>delivery<span class="token punctuation">.</span><span class="token function">getBody</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token string">"utf-8"</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
	<span class="token punctuation">}</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>

}

    public class World2MessageReceiver {
    	private final static String QUEUE_NAME = "world2";
    
    <span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span>String<span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token keyword">throws</span> Exception <span class="token punctuation">{<!-- --></span>
    	RabbitMqUtils mqUtils <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">RabbitMqUtils</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
    	mqUtils<span class="token punctuation">.</span><span class="token function">reciver</span><span class="token punctuation">(</span>QUEUE_NAME<span class="token punctuation">,</span> <span class="token boolean">true</span><span class="token punctuation">,</span> <span class="token punctuation">(</span>consumerTag<span class="token punctuation">,</span> delivery<span class="token punctuation">)</span> <span class="token operator">-</span><span class="token operator">&gt;</span> <span class="token punctuation">{<!-- --></span>
    		System<span class="token punctuation">.</span>out<span class="token punctuation">.</span><span class="token function">println</span><span class="token punctuation">(</span><span class="token string">"【world2】==&gt;"</span> <span class="token operator">+</span> <span class="token keyword">new</span> <span class="token class-name">String</span><span class="token punctuation">(</span>delivery<span class="token punctuation">.</span><span class="token function">getBody</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token string">"utf-8"</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
    	<span class="token punctuation">}</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
    <span class="token punctuation">}</span>
    

    }

      public class World3MessageReceiver {
      	private final static String QUEUE_NAME = "world3";
      
      <span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span>String<span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token keyword">throws</span> Exception <span class="token punctuation">{<!-- --></span>
      	RabbitMqUtils mqUtils <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">RabbitMqUtils</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
      	mqUtils<span class="token punctuation">.</span><span class="token function">reciver</span><span class="token punctuation">(</span>QUEUE_NAME<span class="token punctuation">,</span> <span class="token boolean">true</span><span class="token punctuation">,</span> <span class="token punctuation">(</span>consumerTag<span class="token punctuation">,</span> delivery<span class="token punctuation">)</span> <span class="token operator">-</span><span class="token operator">&gt;</span> <span class="token punctuation">{<!-- --></span>
      		System<span class="token punctuation">.</span>out<span class="token punctuation">.</span><span class="token function">println</span><span class="token punctuation">(</span><span class="token string">"【world3】==&gt;"</span> <span class="token operator">+</span> <span class="token keyword">new</span> <span class="token class-name">String</span><span class="token punctuation">(</span>delivery<span class="token punctuation">.</span><span class="token function">getBody</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token string">"utf-8"</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
      	<span class="token punctuation">}</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
      <span class="token punctuation">}</span>
      

      }

        5.创建消息生产者

        public class WorldMessageSender {
        	private final static String QUEUE_NAME = "world1";// 这个随便写
        
        <span class="token annotation punctuation">@Test</span>
        <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">testSender</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{<!-- --></span>
        	RabbitMqUtils mqUtils <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">RabbitMqUtils</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        	<span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token keyword">int</span> i <span class="token operator">=</span> <span class="token number">0</span><span class="token punctuation">;</span> i <span class="token operator">&lt;</span> <span class="token number">10</span><span class="token punctuation">;</span> i<span class="token operator">++</span><span class="token punctuation">)</span> <span class="token punctuation">{<!-- --></span>
        		mqUtils<span class="token punctuation">.</span><span class="token function">send</span><span class="token punctuation">(</span><span class="token string">"fanoutTest"</span><span class="token punctuation">,</span> QUEUE_NAME<span class="token punctuation">,</span> <span class="token boolean">true</span><span class="token punctuation">,</span> <span class="token punctuation">(</span><span class="token string">"你好,世界!"</span> <span class="token operator">+</span> i<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        	<span class="token punctuation">}</span>
        
        <span class="token punctuation">}</span>
        

        }

          注意这里需要修改原来的RabbitMqUtils中的部分内容

          
          	public boolean send(final String exchange, final String queue, final String msg) {
          		return send(exchange, queue, false, msg);
          	}
          
          <span class="token keyword">public</span> <span class="token keyword">boolean</span> <span class="token function">send</span><span class="token punctuation">(</span><span class="token keyword">final</span> String exchange<span class="token punctuation">,</span> <span class="token keyword">final</span> String queue<span class="token punctuation">,</span> <span class="token keyword">boolean</span> durable<span class="token punctuation">,</span> <span class="token keyword">final</span> String msg<span class="token punctuation">)</span> <span class="token punctuation">{<!-- --></span>
          	<span class="token keyword">try</span> <span class="token punctuation">(</span>Connection connection <span class="token operator">=</span> factory<span class="token punctuation">.</span><span class="token function">newConnection</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span> Channel channel <span class="token operator">=</span> connection<span class="token punctuation">.</span><span class="token function">createChannel</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{<!-- --></span>
          		channel<span class="token punctuation">.</span><span class="token function">queueDeclare</span><span class="token punctuation">(</span>queue<span class="token punctuation">,</span> durable<span class="token punctuation">,</span> <span class="token boolean">false</span><span class="token punctuation">,</span> <span class="token boolean">false</span><span class="token punctuation">,</span> null<span class="token punctuation">)</span><span class="token punctuation">;</span>
          		channel<span class="token punctuation">.</span><span class="token function">basicPublish</span><span class="token punctuation">(</span>exchange<span class="token punctuation">,</span> queue<span class="token punctuation">,</span> null<span class="token punctuation">,</span> msg<span class="token punctuation">.</span><span class="token function">getBytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
          		<span class="token keyword">return</span> <span class="token boolean">true</span><span class="token punctuation">;</span>
          	<span class="token punctuation">}</span> <span class="token keyword">catch</span> <span class="token punctuation">(</span><span class="token class-name">IOException</span> <span class="token operator">|</span> TimeoutException e<span class="token punctuation">)</span> <span class="token punctuation">{<!-- --></span>
          		e<span class="token punctuation">.</span><span class="token function">printStackTrace</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
          		<span class="token keyword">return</span> <span class="token boolean">false</span><span class="token punctuation">;</span>
          	<span class="token punctuation">}</span>
          <span class="token punctuation">}</span>
          
          <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">reciver</span><span class="token punctuation">(</span><span class="token keyword">final</span> String queue<span class="token punctuation">,</span> DeliverCallback deliverCallback<span class="token punctuation">)</span> <span class="token keyword">throws</span> Exception <span class="token punctuation">{<!-- --></span>
          	<span class="token function">reciver</span><span class="token punctuation">(</span>queue<span class="token punctuation">,</span> <span class="token boolean">false</span><span class="token punctuation">,</span> deliverCallback<span class="token punctuation">)</span><span class="token punctuation">;</span>
          <span class="token punctuation">}</span>
          
          <span class="token comment">// 修复问题</span>
          <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">reciver</span><span class="token punctuation">(</span><span class="token keyword">final</span> String queue<span class="token punctuation">,</span> <span class="token keyword">boolean</span> durable<span class="token punctuation">,</span> DeliverCallback deliverCallback<span class="token punctuation">)</span> <span class="token keyword">throws</span> Exception <span class="token punctuation">{<!-- --></span>
          	Connection connection <span class="token operator">=</span> factory<span class="token punctuation">.</span><span class="token function">newConnection</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
          	Channel channel <span class="token operator">=</span> connection<span class="token punctuation">.</span><span class="token function">createChannel</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
          	channel<span class="token punctuation">.</span><span class="token function">queueDeclare</span><span class="token punctuation">(</span>queue<span class="token punctuation">,</span> durable<span class="token punctuation">,</span> <span class="token boolean">false</span><span class="token punctuation">,</span> <span class="token boolean">false</span><span class="token punctuation">,</span> null<span class="token punctuation">)</span><span class="token punctuation">;</span>
          	channel<span class="token punctuation">.</span><span class="token function">basicConsume</span><span class="token punctuation">(</span>queue<span class="token punctuation">,</span> <span class="token boolean">true</span><span class="token punctuation">,</span> deliverCallback<span class="token punctuation">,</span> consumerTag <span class="token operator">-</span><span class="token operator">&gt;</span> <span class="token punctuation">{<!-- --></span>
          	<span class="token punctuation">}</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
          <span class="token punctuation">}</span>
          

            让这个接收者和消费者的durable都可以设置为true,如果不设定一致,则存在报错的情况

            6.启动三个消费者
            在这里插入图片描述

            7.启动生产者查看情况
            在这里插入图片描述
            在这里插入图片描述
            在这里插入图片描述
            发现发送消息到一个队列的时候自动广播到该exchange下的所有的queue中,并且每个消费者都获取了消息

            测试成功!

            4.总结

            1.通过测试发现Fanout类型的Exchange,如果发送消息到其内部的任何一个消息队列,都会自动广播到其他队列中,每个消费者都可以获取数据

            2.使用Fanout可以实现广播功能

            以上纯属个人见解,如有问题请联系本人!

            与[转帖]RabbitMQ:Exchange的Fanout类型的介绍和使用相似的内容:

            [转帖]RabbitMQ:Exchange的Fanout类型的介绍和使用

            https://blog.csdn.net/weixin_45492007/article/details/106095591 1.声明 当前的内容用于本人学习和使用Fanout类型的Exchange,主要理解其主要作用 2.Fanout Exchange的官方介绍 扇出交换机将消息路由到与其绑定的

            [转帖]RabbitMQ的Vhost,Exchange,Queue原理分析

            RabbitMQ的Vhost,Exchange,Queue原理分析https://www.cnblogs.com/zhengchunyuan/p/9253725.html Vhost分析 RabbitMQ的Vhost主要是用来划分不同业务模块。不同业务模块之间没有信息交互。 Vhost之间相互完全隔

            [转帖]RabbitMQ学习笔记06:Topics

            https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_five.html 前言 在上一篇博文中我们使用direct类型的exchange改善了我们的日志系统,但是它仍然有一定的限制,它没有办法基于多个条件路由消息。 我们可能不仅仅希望基于日志级别

            [转帖]RabbitMQ 死信队列

            https://blog.csdn.net/lamp_yang_3533/article/details/111463928 死信交换机 DLX(Dead-Letter-Exchange)可以称之为死信交换机,也称之为死信邮箱。 当消息在一个正常的业务队列中变成死信(dead message)之后,

            [转帖]如何选择RabbitMQ的消息保存方式?

            https://www.cnblogs.com/zhengchunyuan/p/10179677.html RabbitMQ对于queue中的message的保存方式有两种方式:disc和ram。如果采用disc,则需要对exchange/queue/delivery mode都要设置成durabl

            [转帖]RabbitMQ服务优化,修改最大连接数

            https://www.cnblogs.com/hoyeong/p/16242202.html RabbitMQ的优化RabbitMQ的连接数是压垮消息队列的一个重要的指标。所以在平时使用OpenStack平台的过程中,如果大量的用户同时创建虚拟机,会导致云平台创建报错,其实就是消息队列服务的崩溃。

            [转帖]RabbitMQ性能优化

            https://www.cnblogs.com/zhengchunyuan/p/9253728.html 修改rabbitmq.config文件 rabbitmq.config文件时rabbitmq的配置文件,他遵守Erlang配置文件定义。 rabbitmq.config文件位置: Unix $R

            [转帖]RabbitMQ 的重要概念(术语)

            Message 消息指的是 RabbitMQ 的队列中保存的数据。 Producer 消息的生产者,即 message publisher(sender),是指负责创建和发送消息的程序。 Vhost RabbitMQ 的虚拟主机,一个 broker 里可以开设多个 vhost,用作不同用户的权限分离

            [转帖]RabbitMQ基础概念详细介绍

            https://www.jianshu.com/p/e55e971aebd8 AMQP简介 AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦和通讯。 AMQP的主要

            [转帖]RabbitMQ 如何保证交换机中的消息不丢失

            我们知道,生产者会先将消息发送给交换机,但是如果交换机此时没有匹配到相关的队列时,交换机中的消息就会出现丢失的问题。 那么,如何保证交换机中的消息不丢失呢? mandatory 参数 当 basicPublish 方法的 mandatory 参数设为 true 时,如果交换器无法匹配到绑定的队列,那