RabbitMQ使用教程(超详细)
文章摘要
RabbitMQ是基于AMQP协议的消息队列,用于系统间异步通信和解耦。本教程涵盖其五种工作模式:简单队列、Work公平分发、订阅、路由和主题模式,并提供了Spring与SpringBoot集成RabbitMQ的环境搭建及代码示例。
### RabbitMQ实战教程 [](https://blog.csdn.net/wd520521/article/details/110139307) ### [](https://blog.csdn.net/wd520521/article/details/110139307)[](https://blog.csdn.net/wd520521/article/details/110139307)1.什么是MQ * 消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。 其主要用途:不同进程Process/线程Thread之间通信。 为什么会产生消息队列?有几个原因: * 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个; * 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列; * 关于消息队列的详细介绍请参阅: [《Java帝国之消息队列》](https://mp.weixin.qq.com/s?__biz=MzAxOTc0NzExNg==&mid=2665513507&idx=1&sn=d6db79c1ae03ba9260fb0fb77727bb54&chksm=80d67a60b7a1f376e7ad1e2c3276e8b565f045b1c7e21ef90926f69d99f969557737eb5d8128&mpshare=1&scene=1&srcid=1019awkBx8kaLyFohcuW4Ee7) [《一个故事告诉你什么是消息队列》](https://github.com/jasonGeng88/blog/blob/master/201705/MQ.md) [《到底什么时候该使用MQ》](https://mp.weixin.qq.com/s?__biz=MjM5ODYxMDA5OQ==&mid=2651960012&idx=1&sn=c6af5c79ecead98daa4d742e5ad20ce5&chksm=bd2d07108a5a8e0624ae6ad95001c4efe09d7ba695f2ddb672064805d771f3f84bee8123b8a6&mpshare=1&scene=1&srcid=04054h4e90lz5Qc2YKnLNuvY) * MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq。 * **本教程pdf及代码下载地址**: 代码:[https://download.csdn.net/download/zpcandzhj/10585077](https://download.csdn.net/download/zpcandzhj/10585077) 教程:[https://download.csdn.net/download/zpcandzhj/10585092](https://download.csdn.net/download/zpcandzhj/10585092) ### [](https://blog.csdn.net/wd520521/article/details/110139307)[](https://blog.csdn.net/wd520521/article/details/110139307)2.RabbitMQ [](https://blog.csdn.net/wd520521/article/details/110139307)2.1.RabbitMQ的简介 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/39989e4979654e9dae33119dcdedf994.png) 开发语言:Erlang – 面向并发的编程语言。 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/38d973fed84e444281e7d0c0c2c69a75.png) 2.1.1.AMQP AMQP是消息队列的一个协议。 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/ab4611f446994cd7bad4c3fcd1794b46.png) [](https://blog.csdn.net/wd520521/article/details/110139307)2.2.官网 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/91f36d8cf55b43369da3f0b37e9937b6.png) [](https://blog.csdn.net/wd520521/article/details/110139307)2.3.MQ的其他产品 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/9add0f7dcca64b15895e81cf3ad3eb58.png) [](https://blog.csdn.net/wd520521/article/details/110139307)2.4.学习5种队列 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/044ba1d483fa4740aa03528621712d81.png) [](https://blog.csdn.net/wd520521/article/details/110139307)2.5.安装文档 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/46c6ac9852a04c24a36087b9072f4fbe.png) ### [](https://blog.csdn.net/wd520521/article/details/110139307)[](https://blog.csdn.net/wd520521/article/details/110139307)3.搭建RabbitMQ环境 [](https://blog.csdn.net/wd520521/article/details/110139307)3.1.下载 下载地址:http://www.rabbitmq.com/download.html [](https://blog.csdn.net/wd520521/article/details/110139307)3.2.windows下安装 3.2.1.安装Erlang 下载:http://www.erlang.org/download/otp\_win64\_17.3.exe 安装: ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/e6cf133b0c4b4c4583d47c77160589df.png) ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/60def10c7f0d4ecc9a6e563abb0474f8.png) ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/2aebd12375e54711b35b6f54c978d2fd.png) ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/dd7936b8eab64151bb6fa4da59e4f415.png) ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/f5a49577d981436184425e83c8477df3.png) 安装完成。 3.2.2.安装RabbitMQ ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/ee34c09673ec4cea84cf1f0019bb72b0.png) ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/e1d2a0a45f38482c9b3028b1503a88a3.png) ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/3232a0bb36c7447db11fcb2a117b0609.png) 安装完成。 开始菜单里出现如下选项: ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/5da97d546a524320aadac9f728fdd23b.png) 启动、停止、重新安装等。 3.2.3.启用管理工具 1、双击![这里写图片描述](https://oss.120120.top/blog/2024/01/08/a9522d0c910943eab5344ea83edffde3.png) 2、进入C:\\Program Files (x86)\\RabbitMQ Server\\rabbitmq\_server-3.4.1\\sbin输入命令: rabbitmq-plugins enable rabbitmq\_management ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/ed40ec74dd9e4fdc9161edfba3cad123.png) 这样就启动了管理工具,可以试一下命令: 停止:net stop RabbitMQ 启动:net start RabbitMQ 3、在浏览器中输入地址查看:http://127.0.0.1:15672/ ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/ad3a0589049145fabac8d9274759577d.png) 4、使用默认账号登录:guest/ guest [](https://blog.csdn.net/wd520521/article/details/110139307)3.3.Linux下安装 3.3.1.安装Erlang 3.3.2.添加yum支持 cd /usr/local/src/ mkdir rabbitmq cd rabbitmq wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm rpm -Uvh erlang-solutions-1.0-1.noarch.rpm rpm --import http://packages.erlang-solutions.com/rpm/erlang\_solutions.asc 使用yum安装: sudo yum install erlang ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/a9da10792233449e883e13c394532770.png) 3.3.3.安装RabbitMQ 上传rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/ 安装: rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm 3.3.4.启动、停止 service rabbitmq-server start service rabbitmq-server stop service rabbitmq-server restart 3.3.5.设置开机启动 chkconfig rabbitmq-server on 3.3.6.设置配置文件 cd /etc/rabbitmq cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/ mv rabbitmq.config.example rabbitmq.config 3.3.7.开启用户远程访问 vi /etc/rabbitmq/rabbitmq.config ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/9bb5182e46a14aa0bb70a0da0dc94c3d.png) 注意要去掉后面的逗号。 3.3.8.开启web界面管理工具 rabbitmq-plugins enable rabbitmq\_management service rabbitmq-server restart 3.3.9.防火墙开放15672端口 /sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT /etc/rc.d/init.d/iptables save [](https://blog.csdn.net/wd520521/article/details/110139307)3.4.安装的注意事项 1、推荐使用默认的安装路径 2、系统用户名必须是英文 Win10改名字非常麻烦,具体方法百度 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/b2c864dcea6946e1b9d47fcc20798650.png) 3、计算机名必须是英文 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/7cf3e7ecff334127b52fd3c9ec4804d3.png) 4、系统的用户必须是管理员 如果安装失败应该如何解决: 1、重装系统 – 不推荐 2、将RabbitMQ安装到linux虚拟机中 a)推荐 3、使用别人安装好的RabbitMQ服务 a)只要给你开通一个账户即可。 b)使用公用的RabbitMQ服务,在192.168.50.22 c)推荐 常见错误: ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/dc5ffff9de014b81ace1d13a546aab08.png) [](https://blog.csdn.net/wd520521/article/details/110139307)3.5.安装完成后操作 1、系统服务中有RabbitMQ服务,停止、启动、重启 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/413336cd03194ecea6011c6becaab730.png) 2、打开命令行工具 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/2ce9eb871c3a4c4e991c7afcee39cac8.png) 如果找不到命令行工具,直接cd到相应目录: ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/80ad61fc01584ab5aa76711f14e79985.png) 输入命令rabbitmq-plugins enable rabbitmq\_management启用管理插件 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/6b50951a943c42e98c99142f81b03c79.png) 查看管理页面 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/e73fa2f05ae44c4dbbc541f89289600e.png) 通过默认账户 guest/guest 登录 如果能够登录,说明安装成功。 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/a26a8a32fb7b4ee194f1f8f9309972b4.png) ### [](https://blog.csdn.net/wd520521/article/details/110139307)[](https://blog.csdn.net/wd520521/article/details/110139307)4.添加用户 [](https://blog.csdn.net/wd520521/article/details/110139307)4.1.添加admin用户 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/78f70118e2ed48058af770de93ca7dfb.png) [](https://blog.csdn.net/wd520521/article/details/110139307)4.2.用户角色 1、超级管理员(administrator) 可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。 2、监控者(monitoring) 可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) 3、策略制定者(policymaker) 可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。 4、普通管理者(management) 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。 5、其他 无法登陆管理控制台,通常就是普通的生产者和消费者。 [](https://blog.csdn.net/wd520521/article/details/110139307)4.3.创建Virtual Hosts ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/5d0436b7f28c409a9c23c21c49dddb28.png) 选中Admin用户,设置权限: ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/4e0f79d784b44cf58c416c087af819c8.png) 看到权限已加: ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/b2a5b882e2cd4d7f814c12337d2a4af9.png) [](https://blog.csdn.net/wd520521/article/details/110139307)4.4.管理界面中的功能 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/7d7d63a129334cacb8dc13e9f77c9e9b.png) ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/08aa1e5e988142978c71f16b1d2b4aac.png) ### [](https://blog.csdn.net/wd520521/article/details/110139307)[](https://blog.csdn.net/wd520521/article/details/110139307)5.学习五种队列 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/99165128d9da4878bda84f74a00fbd9e.png) [](https://blog.csdn.net/wd520521/article/details/110139307)5.1.导入my-rabbitmq项目 项目下载地址: [https://download.csdn.net/download/zpcandzhj/10585077](https://download.csdn.net/download/zpcandzhj/10585077) ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/0c4dc0353eb446e3b654e20ba9eedf0b.png) [](https://blog.csdn.net/wd520521/article/details/110139307)5.2.简单队列 5.2.1.图示 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/6e880a14fb674ab2942042f3c51c6e99.png) P:消息的生产者 C:消息的消费者 红色:队列 生产者将消息发送到队列,消费者从队列中获取消息。 5.2.2.导入RabbitMQ的客户端依赖 ```java com.rabbitmq amqp-client 3.4.1 ``` 5.2.3.获取MQ的连接 ```java package com.zpc.rabbitmq.util; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; public class ConnectionUtil { public static Connection getConnection() throws Exception { //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost("localhost"); //端口 factory.setPort(5672); //设置账号信息,用户名、密码、vhost factory.setVirtualHost("testhost"); factory.setUsername("admin"); factory.setPassword("admin"); // 通过工程获取连接 Connection connection = factory.newConnection(); return connection; } } ``` 5.2.4.生产者发送消息到队列 ```java package com.zpc.rabbitmq.simple; import com.zpc.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME = "q_test_01"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息内容 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //关闭通道和连接 channel.close(); connection.close(); } } ``` 5.2.5.管理工具中查看消息 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/4bf6452b2bc6418a8de4bca4675fc38a.png) 点击上面的队列名称,查询具体的队列中的信息: ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/9c5887ef1d0547bfb6b6f06f41144dc8.png) 5.2.6.消费者从队列中获取消息 ```java package com.zpc.rabbitmq.simple; import com.zpc.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "q_test_01"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列 channel.basicConsume(QUEUE_NAME, true, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } } ``` [](https://blog.csdn.net/wd520521/article/details/110139307)5.3.Work模式 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/d34c3745dd374e07b0c13e0a4ddbb157.png) 5.3.1.图示 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/da9df76c309c42b2929d810e658a491a.png) 一个生产者、2个消费者。 一个消息只能被一个消费者获取。 5.3.2.消费者1 ```java package com.zpc.rabbitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zpc.rabbitmq.util.ConnectionUtil; public class Recv { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一时刻服务器只会发一条消息给消费者 //channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,false表示手动返回完成状态,true表示自动 channel.basicConsume(QUEUE_NAME, true, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [y] Received '" + message + "'"); //休眠 Thread.sleep(10); // 返回确认状态,注释掉表示使用自动确认模式 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } ``` 5.3.3.消费者2 ```java package com.zpc.rabbitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zpc.rabbitmq.util.ConnectionUtil; public class Recv2 { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一时刻服务器只会发一条消息给消费者 //channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,false表示手动返回完成状态,true表示自动 channel.basicConsume(QUEUE_NAME, true, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); // 休眠1秒 Thread.sleep(1000); //下面这行注释掉表示使用自动确认模式 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } ``` 5.3.4.生产者 向队列中发送100条消息。 ```java package com.zpc.rabbitmq.work; import com.zpc.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 100; i++) { // 消息内容 String message = "" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); Thread.sleep(i * 10); } channel.close(); connection.close(); } } ``` 5.3.5.测试 测试结果: 1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。 2、消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。 * 其实,这样是不合理的,因为消费者1线程停顿的时间短。应该是消费者1要比消费者2获取到的消息多才对。 RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。 * 怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Acknowledge 就可以做到。 basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。 * 2个概念 * 轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。 * 公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。 为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。 还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。 [](https://blog.csdn.net/wd520521/article/details/110139307)5.4.Work模式的“能者多劳” 打开上述代码的注释: ```java // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); ``` ```java //开启这行 表示使用手动确认模式 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); ``` 同时改为手动确认: ```java // 监听队列,false表示手动返回完成状态,true表示自动 channel.basicConsume(QUEUE_NAME, false, consumer); ``` 测试: 消费者1比消费者2获取的消息更多。 [](https://blog.csdn.net/wd520521/article/details/110139307)5.5.消息的确认模式 消费者从队列中获取消息,服务端如何知道消息已经被消费呢? 模式1:自动确认 只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。 模式2:手动确认 消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。 手动模式: ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/fc4f2f1c05e8475e9a6a0f0cf0646673.png) 自动模式: ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/390fba2a441c4da89e5721adcd856ae2.png) [](https://blog.csdn.net/wd520521/article/details/110139307)5.6.订阅模式 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/edeec5dcaf7347b8b5f5821cf5fc2073.png) 5.6.1.图示 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/289dce04b329436783e3c62a4647ce2a.png) 解读: 1、1个生产者,多个消费者 2、每一个消费者都有自己的一个队列 3、生产者没有将消息直接发送到队列,而是发送到了交换机 4、每个队列都要绑定到交换机 5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/435854ef984643c1a4949af55ef58767.png) 5.6.2.消息的生产者(看作是后台系统) 向交换机中发送消息。 ```java package com.zpc.rabbitmq.subscribe; import com.zpc.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息内容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } } ``` 注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。 5.6.3.消费者1(看作是前台系统) ```java package com.zpc.rabbitmq.subscribe; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zpc.rabbitmq.util.ConnectionUtil; public class Recv { private final static String QUEUE_NAME = "test_queue_work1"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [Recv] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } ``` 5.6.4.消费者2(看作是搜索系统) ```java package com.zpc.rabbitmq.subscribe; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zpc.rabbitmq.util.ConnectionUtil; public class Recv2 { private final static String QUEUE_NAME = "test_queue_work2"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [Recv2] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } ``` 5.6.5.测试 测试结果: 同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。 在管理工具中查看队列和交换机的绑定关系: ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/1b2abe3b04dd4eda83707c23c528e563.png) [](https://blog.csdn.net/wd520521/article/details/110139307)5.7.路由模式 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/a463787593ae4d17a3a44ea1ef2761cd.png) 5.7.1.图示 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/cd24c5beb5bb4155b3f543ad94d2cdad.png) 5.7.2.生产者 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/5aeef26738174bfd93c6cded1a30a634.png) 5.7.3.消费者1(假设是前台系统) ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/bae589f438de41db97e5e61461513e4f.png) 5.7.4.消费2(假设是搜索系统) ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/b2afb5c07abc4cea9942dec7e8d68c50.png) [](https://blog.csdn.net/wd520521/article/details/110139307)5.8.主题模式(通配符模式) ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/32078020951f44b898078c3eca5dd3c8.png) ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/1dfeba756da84a518b205ca604b0f3ce.png) 5.8.1.图示 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/3a3bb63ccc6540b59ead6cf7622b6315.png) 同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。 5.8.2.生产者 ```java package com.zpc.rabbitmq.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.zpc.rabbitmq.util.ConnectionUtil; public class Send { private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明exchange channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息内容 String message = "Hello World!!"; channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } } ``` 5.8.3.消费者1(前台系统) ```java package com.zpc.rabbitmq.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zpc.rabbitmq.util.ConnectionUtil; public class Recv { private final static String QUEUE_NAME = "test_queue_topic_work_1"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [Recv_x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } ``` 5.8.4.消费者2(搜索系统) ```java package com.zpc.rabbitmq.topic; import com.zpc.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv2 { private final static String QUEUE_NAME = "test_queue_topic_work_2"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [Recv2_x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } ``` ### [](https://blog.csdn.net/wd520521/article/details/110139307)[](https://blog.csdn.net/wd520521/article/details/110139307)6.Spring-Rabbit [](https://blog.csdn.net/wd520521/article/details/110139307)6.1.Spring项目 http://spring.io/projects ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/049557d6963644059e7a1b5e5a24ec11.png) [](https://blog.csdn.net/wd520521/article/details/110139307)6.2.简介 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/963ba4c8e7454ef4904957127c61f052.png) ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/180d0609ee874d0499746703b8098ea1.png) [](https://blog.csdn.net/wd520521/article/details/110139307)6.3.使用 6.3.1.消费者 ```java package com.zpc.rabbitmq.spring; /** * 消费者 * * @author Evan */ public class Foo { //具体执行业务的方法 public void listen(String foo) { System.out.println("\n消费者: " + foo + "\n"); } } ``` 6.3.2.生产者 ```java package com.zpc.rabbitmq.spring; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class SpringMain { public static void main(final String... args) throws Exception { AbstractApplicationContext ctx = new ClassPathXmlApplicationContext( "classpath:spring/rabbitmq-context.xml"); //RabbitMQ模板 RabbitTemplate template = ctx.getBean(RabbitTemplate.class); //发送消息 template.convertAndSend("Hello, 鸟鹏!"); Thread.sleep(1000);// 休眠1秒 ctx.destroy(); //容器销毁 } } ``` 6.3.3.配置文件 1、定义连接工厂 ```java ``` 2、定义模板(可以指定交换机或队列) ```java ``` 3、定义队列、交换机、以及完成队列和交换机的绑定 ```java ``` 4、定义监听 ```java ``` 5、定义管理,用于管理队列、交换机等: ```java ``` 完整配置文件rabbitmq-context.xml ```java ``` [](https://blog.csdn.net/wd520521/article/details/110139307)6.4.持久化交换机和队列 ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/4fdfb09571424664b88357ea9f9be98b.png) 持久化:将交换机或队列的数据保存到磁盘,服务器宕机或重启之后依然存在。 非持久化:将交换机或队列的数据保存到内存,服务器宕机或重启之后将不存在。 非持久化的性能高于持久化。 如何选择持久化?非持久化? – 看需求。 ### [](https://blog.csdn.net/wd520521/article/details/110139307)[](https://blog.csdn.net/wd520521/article/details/110139307)7.Spring集成RabbitMQ一个完整案例 创建三个系统A,B,C A作为生产者,B、C作为消费者(B,C作为web项目启动) 项目下载地址:[https://download.csdn.net/download/zpcandzhj/10585077](https://download.csdn.net/download/zpcandzhj/10585077) [](https://blog.csdn.net/wd520521/article/details/110139307)7.1.在A系统中发送消息到交换机 7.1.1.导入依赖 ```java 4.0.0 com.zpc myrabbitA 0.0.1-SNAPSHOT jar myrabbit org.springframework.amqp spring-rabbit 1.4.0.RELEASE com.alibaba fastjson 1.2.47 ``` 7.1.2.队列和交换机的绑定关系 实现: 1、在配置文件中将队列和交换机完成绑定 2、可以在管理界面中完成绑定 a)绑定关系如果发生变化,需要修改配置文件,并且服务需要重启 b)管理更加灵活 c)更容易对绑定关系的权限管理,流程管理 本例选择第2种方式 7.1.3.配置 rabbitmq-context.xml ```java ``` 7.1.4.消息内容 方案: 1、消息内容使用对象做json序列化发送 a)数据大 b)有些数据其他人是可能用不到的 2、发送特定的业务字段,如id、操作类型 7.1.5.实现 生产者MsgSender.java: ```java package com.zpc.myrabbit; import com.alibaba.fastjson.JSON; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; /** * 消息生产者 */ public class MsgSender { public static void main(String[] args) throws Exception { AbstractApplicationContext ctx = new ClassPathXmlApplicationContext( "classpath:spring/rabbitmq-context.xml"); //RabbitMQ模板 RabbitTemplate template = ctx.getBean(RabbitTemplate.class); String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制 //发送消息 Map msg = new HashMap(); msg.put("type", "1"); msg.put("date", date); template.convertAndSend("type2", JSON.toJSONString(msg)); Thread.sleep(1000);// 休眠1秒 ctx.destroy(); //容器销毁 } } ``` [](https://blog.csdn.net/wd520521/article/details/110139307)7.2.在B系统接收消息 7.2.1.导入依赖 ```java 4.0.0 com.zpc myrabbitB 0.0.1-SNAPSHOT war myrabbit 4.1.3.RELEASE 1.2.46 com.rabbitmq amqp-client 3.4.1 org.springframework.amqp spring-rabbit 1.4.0.RELEASE org.springframework spring-webmvc ${spring.version} com.alibaba fastjson 1.2.47 ${project.artifactId} org.apache.tomcat.maven tomcat7-maven-plugin /testRabbit UTF-8 8081 ``` 7.2.2.配置 ```java ``` 7.2.3.具体处理逻辑 ```java public class Listener { //具体执行业务的方法 public void listen(String msg) { System.out.println("\n消费者B开始处理消息: " + msg + "\n"); } } ``` 7.2.4.在界面管理工具中完成绑定关系 选中定义好的交换机(exchange) ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/41bbdd59865048699e5d5804fa285b1d.png) 1)direct ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/1fb95af1bbaf44d2ad2ba31c46f6a7de.png) 2)fanout ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/4d3b7cb19a4049dcab3269c90e334a8f.png) 3)topic ![这里写图片描述](https://oss.120120.top/blog/2024/01/08/5b5b4029838c4ca4a4a8776333ae85d9.png) [](https://blog.csdn.net/wd520521/article/details/110139307)7.3.在C系统中接收消息 (和B系统配置差不多,无非是Q名和Q对应的处理逻辑变了) 7.3.1.配置 ```java ``` 7.3.2.处理业务逻辑 ```java public class Listener { //具体执行业务的方法 public void listen(String msg) { System.out.println("\n消费者C开始处理消息: " + msg + "\n"); } } ``` 7.3.3.在管理工具中绑定队列和交换机 见7.2.4 7.3.4.测试 分别启动B,C两个web应用,然后运行A的MsgSender主方法发送消息,分别测试fanout、direct、topic三种类型 ### [](https://blog.csdn.net/wd520521/article/details/110139307)[](https://blog.csdn.net/wd520521/article/details/110139307)8.Springboot集成RabbitMQ * springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp对消息各种支持。 代码下载地址:[https://download.csdn.net/download/zpcandzhj/10585077](https://download.csdn.net/download/zpcandzhj/10585077) [](https://blog.csdn.net/wd520521/article/details/110139307)8.1.简单队列 1、配置pom文件,主要是添加spring-boot-starter-amqp的支持 ```java org.springframework.boot spring-boot-starter-amqp ``` 2、配置application.properties文件 配置rabbitmq的安装地址、端口以及账户信息 ```java spring.application.name=spirng-boot-rabbitmq spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin ``` 3、配置队列 ```java package com.zpc.rabbitmq; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue queue() { return new Queue("q_hello"); } } ``` 4、发送者 ```java package com.zpc.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; @Component public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制 String context = "hello " + date; System.out.println("Sender : " + context); //简单对列的情况下routingKey即为Q名 this.rabbitTemplate.convertAndSend("q_hello", context); } } ``` 5、接收者 ```java package com.zpc.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "q_hello") public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println("Receiver : " + hello); } } ``` 6、测试 ```java package com.zpc.rabbitmq; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitMqHelloTest { @Autowired private HelloSender helloSender; @Test public void hello() throws Exception { helloSender.send(); } } ``` [](https://blog.csdn.net/wd520521/article/details/110139307)8.2.多对多使用(Work模式) 注册两个Receiver: ```java package com.zpc.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "q_hello") public class HelloReceiver2 { @RabbitHandler public void process(String hello) { System.out.println("Receiver2 : " + hello); } } ``` ```java @Test public void oneToMany() throws Exception { for (int i=0;i<100;i++){ helloSender.send(i); Thread.sleep(300); } } ``` ```java public void send(int i) { String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制 String context = "hello " + i + " " + date; System.out.println("Sender : " + context); //简单对列的情况下routingKey即为Q名 this.rabbitTemplate.convertAndSend("q_hello", context); } ``` [](https://blog.csdn.net/wd520521/article/details/110139307)8.3.Topic Exchange(主题模式) * topic 是RabbitMQ中最灵活的一种方式,可以根据routing\_key自由的绑定不同的队列 首先对topic规则配置,这里使用两个队列(消费者)来演示。 1)配置队列,绑定交换机 ```java package com.zpc.rabbitmq.topic; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicRabbitConfig { final static String message = "q_topic_message"; final static String messages = "q_topic_messages"; @Bean public Queue queueMessage() { return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages() { return new Queue(TopicRabbitConfig.messages); } /** * 声明一个Topic类型的交换机 * @return */ @Bean TopicExchange exchange() { return new TopicExchange("mybootexchange"); } /** * 绑定Q到交换机,并且指定routingKey * @param queueMessage * @param exchange * @return */ @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } } ``` 2)创建2个消费者 q\_topic\_message 和q\_topic\_messages ```java package com.zpc.rabbitmq.topic; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "q_topic_message") public class Receiver1 { @RabbitHandler public void process(String hello) { System.out.println("Receiver1 : " + hello); } } ``` ```java package com.zpc.rabbitmq.topic; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "q_topic_messages") public class Receiver2 { @RabbitHandler public void process(String hello) { System.out.println("Receiver2 : " + hello); } } ``` 3)消息发送者(生产者) ```java package com.zpc.rabbitmq.topic; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MsgSender { @Autowired private AmqpTemplate rabbitTemplate; public void send1() { String context = "hi, i am message 1"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("mybootexchange", "topic.message", context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("mybootexchange", "topic.messages", context); } } ``` send1方法会匹配到topic.#和topic.message,两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息。 4)测试 ```java package com.zpc.rabbitmq.topic; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitTopicTest { @Autowired private MsgSender msgSender; @Test public void send1() throws Exception { msgSender.send1(); } @Test public void send2() throws Exception { msgSender.send2(); } } ``` [](https://blog.csdn.net/wd520521/article/details/110139307)8.4.Fanout Exchange(订阅模式) * Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。 1)配置队列,绑定交换机 ```java package com.zpc.rabbitmq.fanout; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutRabbitConfig { @Bean public Queue aMessage() { return new Queue("q_fanout_A"); } @Bean public Queue bMessage() { return new Queue("q_fanout_B"); } @Bean public Queue cMessage() { return new Queue("q_fanout_C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("mybootfanoutExchange"); } @Bean Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(aMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(bMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(cMessage).to(fanoutExchange); } } ``` 2)创建3个消费者 ```java package com.zpc.rabbitmq.fanout; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "q_fanout_A") public class ReceiverA { @RabbitHandler public void process(String hello) { System.out.println("AReceiver : " + hello + "/n"); } } ``` ```java package com.zpc.rabbitmq.fanout; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "q_fanout_B") public class ReceiverB { @RabbitHandler public void process(String hello) { System.out.println("BReceiver : " + hello + "/n"); } } ``` ```java package com.zpc.rabbitmq.fanout; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "q_fanout_C") public class ReceiverC { @RabbitHandler public void process(String hello) { System.out.println("CReceiver : " + hello + "/n"); } } ``` 3)生产者 ```java package com.zpc.rabbitmq.fanout; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MsgSenderFanout { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hi, fanout msg "; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("mybootfanoutExchange","", context); } } ``` 4)测试 ```java package com.zpc.rabbitmq.fanout; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitFanoutTest { @Autowired private MsgSenderFanout msgSender; @Test public void send1() throws Exception { msgSender.send(); } } ``` 结果如下,三个消费者都收到消息: AReceiver : hi, fanout msg CReceiver : hi, fanout msg BReceiver : hi, fanout msg ### [](https://blog.csdn.net/wd520521/article/details/110139307)[](https://blog.csdn.net/wd520521/article/details/110139307)9.总结 * 使用MQ实现商品数据的同步优势: 1、降低系统间耦合度 2、便于管理数据的同步(数据一致性) * 推荐阅读 [《RabbitMQ详解》](http://www.ityouknow.com/springboot/2016/11/30/spring-boot-rabbitMQ.html) [《大型网站技术架构:核心原理与案例分析》](https://download.csdn.net/download/zpcandzhj/10584276) 本文转自 [https://blog.csdn.net/wd520521/article/details/110139307](https://blog.csdn.net/wd520521/article/details/110139307),如有侵权,请联系删除。
作者头像
admin
分享技术与生活
打赏作者

评论

暂无评论,快来抢沙发吧~