1. Disclaimer
This post records my own study and use of the fanout exchange type, mainly to understand what it is for.
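2. The official description of the fanout exchange
- A fanout exchange routes messages to all of the queues bound to it, and the routing key is ignored. If N queues are bound to a fanout exchange, a new message published to that exchange is copied and delivered to all N queues. Fanout exchanges are ideal for broadcast routing of messages.
- Because a fanout exchange delivers a copy of a message to every queue bound to it, its use cases are quite similar:
2.1 Massively multiplayer online (MMO) games can use it for leaderboard updates or other global events
2.2 Sports news sites can use fanout exchanges to distribute score updates to mobile clients in near real time
2.3 Distributed systems can broadcast various state and configuration updates
2.4 Group chats can distribute messages between participants using a fanout exchange (although AMQP has no built-in concept of presence, so XMPP may be a better choice)
My own understanding: when several queues are bound to an exchange of type fanout, a message published to that exchange is copied to every one of those queues, which gives you a broadcast.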
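To make the routing rule concrete, here is a minimal in-memory sketch in Java. It is a toy model, not the RabbitMQ client API: the class `FanoutSketch` and its methods `bind`, `publish`, and `messagesIn` are hypothetical names invented for this illustration. It only shows the two properties described above: every bound queue gets a copy, and the routing key plays no part in the decision.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of fanout routing (hypothetical, not the RabbitMQ client API).
class FanoutSketch {
    // Bound queues keyed by name; each queue is modelled as a list of message bodies.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Bind a queue to this exchange. A fanout binding needs no binding key.
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Publish a message. The routing key is accepted but never consulted.
    // Returns the number of queues that received a copy.
    int publish(String routingKey, String body) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(body);
        }
        return boundQueues.size();
    }

    // Messages currently held by the named queue (empty if the queue is not bound).
    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("world1");
        exchange.bind("world2");
        exchange.bind("world3");

        int copies = exchange.publish("any-routing-key", "hello");
        System.out.println("copies delivered: " + copies);
        System.out.println("world3 holds: " + exchange.messagesIn("world3"));
    }
}
```

Changing the routing key passed to `publish` makes no difference to which queues receive the message, which is the behaviour that distinguishes fanout from the direct and topic exchange types.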
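3. Simulating and implementing a fanout broadcast
1. First, create an exchange of type fanout (named fanoutTest in this walkthrough)
Durability is left at its default value, Durable
2. Create three queues: world1, world2, and world3
3. Bind the three queues to the fanoutTest exchange
No routing key or arguments are added to the bindings
4. Create three consumers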
public class World1MessageReceiver {
    private final static String QUEUE_NAME = "world1";

    public static void main(String[] args) throws Exception {
        RabbitMqUtils mqUtils = new RabbitMqUtils();
        // Consume from the durable queue "world1" and print each message body
        mqUtils.reciver(QUEUE_NAME, true, (consumerTag, delivery) -> {
            System.out.println("【world1】==>" + new String(delivery.getBody(), "utf-8"));
        });
    }
}
public class World2MessageReceiver {
    private final static String QUEUE_NAME = "world2";

    public static void main(String[] args) throws Exception {
        RabbitMqUtils mqUtils = new RabbitMqUtils();
        // Consume from the durable queue "world2" and print each message body
        mqUtils.reciver(QUEUE_NAME, true, (consumerTag, delivery) -> {
            System.out.println("【world2】==>" + new String(delivery.getBody(), "utf-8"));
        });
    }
}
public class World3MessageReceiver {
    private final static String QUEUE_NAME = "world3";

    public static void main(String[] args) throws Exception {
        RabbitMqUtils mqUtils = new RabbitMqUtils();
        // Consume from the durable queue "world3" and print each message body
        mqUtils.reciver(QUEUE_NAME, true, (consumerTag, delivery) -> {
            System.out.println("【world3】==>" + new String(delivery.getBody(), "utf-8"));
        });
    }
}
5. Create the message producer
public class WorldMessageSender {
    // Passed as the routing key, which a fanout exchange ignores, so any value works for routing.
    // Note that RabbitMqUtils.send also declares a queue with this name.
    private final static String QUEUE_NAME = "world1";

    @Test
    public void testSender() {
        RabbitMqUtils mqUtils = new RabbitMqUtils();
        // Publish ten messages to the fanoutTest exchange
        for (int i = 0; i < 10; i++) {
            mqUtils.send("fanoutTest", QUEUE_NAME, true, ("Hello, world!" + i));
        }
    }
}
Note that part of the original RabbitMqUtils has to be modified here:
public boolean send(final String exchange, final String queue, final String msg) {
    return send(exchange, queue, false, msg);
}

public boolean send(final String exchange, final String queue, boolean durable, final String msg) {
    try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
        // The durable flag must match the setting the queue was created with
        channel.queueDeclare(queue, durable, false, false, null);
        // The queue name doubles as the routing key; a fanout exchange ignores it.
        // Encode as UTF-8 explicitly, since the consumers decode the body as UTF-8.
        channel.basicPublish(exchange, queue, null, msg.getBytes("UTF-8"));
        return true;
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
        return false;
    }
}

public void reciver(final String queue, DeliverCallback deliverCallback) throws Exception {
    reciver(queue, false, deliverCallback);
}

// Fix: accept a durable flag so the declaration can match the existing queue
public void reciver(final String queue, boolean durable, DeliverCallback deliverCallback) throws Exception {
    // The connection and channel are left open so the consumer keeps receiving
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(queue, durable, false, false, null);
    // autoAck is true; the cancel callback does nothing
    channel.basicConsume(queue, true, deliverCallback, consumerTag -> {
    });
}
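This lets both the sender and the receivers declare the queue with durable set to true. The value has to match the one the queue was created with; if it does not, the broker rejects the declaration with a PRECONDITION_FAILED channel error.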
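The durable mismatch described above can be pictured with a small in-memory sketch. This is only a toy model of the check, not how the broker is implemented and not the RabbitMQ client API: `QueueRegistrySketch` and its `declare` method are hypothetical names, and the exception type and message are stand-ins for the channel error a real broker would report.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the redeclaration check (hypothetical, not the RabbitMQ client API).
class QueueRegistrySketch {
    // Queue name mapped to the durable flag recorded at first declaration.
    private final Map<String, Boolean> declared = new HashMap<>();

    // Mimics a queue declaration: creates the queue if it is absent,
    // otherwise requires the durable flag to equal the recorded one.
    void declare(String name, boolean durable) {
        Boolean existing = declared.putIfAbsent(name, durable);
        if (existing != null && existing.booleanValue() != durable) {
            throw new IllegalStateException(
                "durable mismatch for queue '" + name + "': received "
                    + durable + " but current is " + existing);
        }
    }

    public static void main(String[] args) {
        QueueRegistrySketch registry = new QueueRegistrySketch();
        registry.declare("world1", true);  // first declaration creates the queue
        registry.declare("world1", true);  // same flag again: accepted, nothing changes
        try {
            registry.declare("world1", false);  // different flag: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

This is why the two-argument `send` and `reciver` overloads, which default durable to false, fail against queues that were created as Durable, and why the walkthrough passes true on both sides.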
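6. Start the three consumers
7. Start the producer and observe the result
Each message published to the exchange is delivered to every queue bound to it, and every consumer receives all of the messages
The test succeeds!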
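4. Summary
1. The test shows that a message published to a fanout exchange is copied to every queue bound to it, whatever routing key is supplied, so every consumer receives the data
2. A fanout exchange can be used to implement broadcasting
The above is purely my own understanding; please contact me if you spot any problems!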