当前位置:首页 > 科技  > 软件

RabbitMQ工作模式-Routing路由模式

来源: 责编: 时间:2023-11-20 17:12:25 234观看
导读Routing路由模式1、模式说明路由模式特点:队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)。消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。Exchange不再把消息交给每一个绑

moW28资讯网——每日最新资讯28at.com

Routing路由模式

1、模式说明

路由模式特点:moW28资讯网——每日最新资讯28at.com

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)。
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。

moW28资讯网——每日最新资讯28at.com

图解:moW28资讯网——每日最新资讯28at.com

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

2。案例

在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。moW28资讯网——每日最新资讯28at.com

在写案例之前,我们首先定义一下需求:moW28资讯网——每日最新资讯28at.com

  • 生产者:发送两条消息,一条消息的用于插入数据,另一条消息用于更新数据。
  • 消费者1:接收插入数据的消息,进行数据插入。
  • 消费者2:接收更新数据的消息,进行数据更新。

(1)生产者

moW28资讯网——每日最新资讯28at.com

package com.lijw.producer;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/3 8:16 */public class Producer_Routing {    //交换机名称    static final String DIRECT_EXCHAGE = "direct_exchange";    //队列名称    static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";    //队列名称    static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建连接工厂        ConnectionFactory factory = new ConnectionFactory();        //2. 设置参数        factory.setHost("127.0.0.1"); // ip  默认值 localhost        factory.setPort(5672); //端口  默认值 5672        factory.setVirtualHost("/test"); //虚拟机 默认值 /        factory.setUsername("libai"); // 用户名 默认 guest        factory.setPassword("libai"); //密码 默认值 guest        //3. 创建连接 Connection        Connection connection = factory.newConnection();        //4. 创建Channel        Channel channel = connection.createChannel();        //5. 创建交换机        /*           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)           参数:            1. exchange:交换机名称            2. type:交换机类型                DIRECT("direct"):定向                FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。                TOPIC("topic") 通配符的方式                HEADERS("headers") 参数匹配            3. durable:是否持久化            4. autoDelete:自动删除            5. internal:内部使用。 一般false            6. arguments:参数        */        channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT, true, false, false, null);        // 6.声明(创建)队列        /**         * 参数1:队列名称         * 参数2:是否定义持久化队列         * 参数3:是否独占本次连接         * 参数4:是否在不使用的时候自动删除队列         * 参数5:队列其它参数         */        channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);        channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);        // 7. 绑定队列和交换机        /*            queueBind(String queue, String exchange, String routingKey)            参数:                1. queue:队列名称                2. exchange:交换机名称                3. routingKey:路由键,绑定规则                    如果交换机的类型为fanout ,routingKey设置为""         */        channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");        channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");        //8. 发送消息至交换机,由交换机分发消息        // 发送信息        String message = "新增了商品。路由模式;routing key 为 insert " ;        /**         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage         * 参数2:路由key,简单模式可以传递队列名称         * 参数3:消息其它属性         * 参数4:消息内容         */        channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());        System.out.println("已发送消息:" + message);        // 发送信息        message = "修改了商品。路由模式;routing key 为 update" ;        /**         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage         * 参数2:路由key,简单模式可以传递队列名称         * 参数3:消息其它属性         * 参数4:消息内容         */        channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());        System.out.println("已发送消息:" + message);        //9. 释放资源        channel.close();        connection.close();    }}

执行发送消息:moW28资讯网——每日最新资讯28at.com

moW28资讯网——每日最新资讯28at.com

发送消息之后,我们来看看声明好的交换机:moW28资讯网——每日最新资讯28at.com

moW28资讯网——每日最新资讯28at.com

moW28资讯网——每日最新资讯28at.com

(2)消费者1:专门接收 insert 的消息

moW28资讯网——每日最新资讯28at.com

package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_Routing1 {    //队列名称    static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建连接工厂        ConnectionFactory factory = new ConnectionFactory();        //2. 设置参数        factory.setHost("127.0.0.1"); // ip  默认值 localhost        factory.setPort(5672); //端口  默认值 5672        factory.setVirtualHost("/test"); //虚拟机 默认值 /        factory.setUsername("libai"); // 用户名 默认 guest        factory.setPassword("libai"); //密码 默认值 guest        //3. 创建连接 Connection        Connection connection = factory.newConnection();        //4. 创建Channel        Channel channel = connection.createChannel();        //5. 创建队列Queue        /*        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)        参数:            1. queue:队列名称            2. durable:是否持久化,当mq重启之后,还在            3. exclusive:                * 是否独占。只能有一个消费者监听这队列                * 当Connection关闭时,是否删除队列            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉            5. arguments:参数。         */        channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);        /*        basicConsume(String queue, boolean autoAck, Consumer callback)        参数:            1. queue:队列名称            2. autoAck:是否自动确认            3. callback:回调对象         */        // 接收消息        Consumer consumer = new DefaultConsumer(channel){            /*                回调方法,当收到消息后,会自动执行该方法                1. consumerTag:标识                2. envelope:获取一些信息,交换机,路由key...                3. properties:配置信息                4. body:数据             */            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                System.out.println("接收队列的数据 body: " + new String(body));            }        };        channel.basicConsume(DIRECT_QUEUE_INSERT,true,consumer);        //不需要关闭资源,因为消费者需要持续监听队列信息    }}

(3)消费者2:专门接收 update 的消息

moW28资讯网——每日最新资讯28at.com

package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_Routing2 {    //队列名称    static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建连接工厂        ConnectionFactory factory = new ConnectionFactory();        //2. 设置参数        factory.setHost("127.0.0.1"); // ip  默认值 localhost        factory.setPort(5672); //端口  默认值 5672        factory.setVirtualHost("/test"); //虚拟机 默认值 /        factory.setUsername("libai"); // 用户名 默认 guest        factory.setPassword("libai"); //密码 默认值 guest        //3. 创建连接 Connection        Connection connection = factory.newConnection();        //4. 创建Channel        Channel channel = connection.createChannel();        //5. 创建队列Queue        /*        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)        参数:            1. queue:队列名称            2. durable:是否持久化,当mq重启之后,还在            3. exclusive:                * 是否独占。只能有一个消费者监听这队列                * 当Connection关闭时,是否删除队列            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉            5. arguments:参数。         */        channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);        /*        basicConsume(String queue, boolean autoAck, Consumer callback)        参数:            1. queue:队列名称            2. autoAck:是否自动确认            3. callback:回调对象         */        // 接收消息        Consumer consumer = new DefaultConsumer(channel){            /*                回调方法,当收到消息后,会自动执行该方法                1. consumerTag:标识                2. envelope:获取一些信息,交换机,路由key...                3. properties:配置信息                4. body:数据             */            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                System.out.println("接收队列的数据 body: " + new String(body));            }        };        channel.basicConsume(DIRECT_QUEUE_UPDATE,true,consumer);        //不需要关闭资源,因为消费者需要持续监听队列信息    }}

3、测试

启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。moW28资讯网——每日最新资讯28at.com

  • 消费者1 收到了 insert 的消息

moW28资讯网——每日最新资讯28at.com

  • 消费者2 收到了 update 的消息

moW28资讯网——每日最新资讯28at.com

4、小结

Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。moW28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-31551-0.htmlRabbitMQ工作模式-Routing路由模式

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 快速了解 CSS @starting-style 规则

下一篇: 通过实例理解Web应用跨域问题

标签:
  • 热门焦点
  • K8S | Service服务发现

    K8S | Service服务发现

    一、背景在微服务架构中,这里以开发环境「Dev」为基础来描述,在K8S集群中通常会开放:路由网关、注册中心、配置中心等相关服务,可以被集群外部访问;图片对于测试「Tes」环境或者
  • 如何正确使用:Has和:Nth-Last-Child

    如何正确使用:Has和:Nth-Last-Child

    我们可以用CSS检查,以了解一组元素的数量是否小于或等于一个数字。例如,一个拥有三个或更多子项的grid。你可能会想,为什么需要这样做呢?在某些情况下,一个组件或一个布局可能会
  • 从零到英雄:高并发与性能优化的神奇之旅

    从零到英雄:高并发与性能优化的神奇之旅

    作者 | 波哥审校 | 重楼作为公司的架构师或者程序员,你是否曾经为公司的系统在面对高并发和性能瓶颈时感到手足无措或者焦头烂额呢?笔者在出道那会为此是吃尽了苦头的,不过也得
  • 品牌洞察丨服务本地,美团直播成效几何?

    品牌洞察丨服务本地,美团直播成效几何?

    来源:17PR7月11日,美团App首页推荐位出现&ldquo;美团直播&rdquo;的固定入口。在直播聚合页面,外卖&ldquo;神枪手&rdquo;直播间、美团旅行直播间、美团买菜直播间等均已上线,同时
  • 年轻人的“职场羞耻感”,无处不在

    年轻人的“职场羞耻感”,无处不在

    作者:冯晓亭 陶 淘 李 欣 张 琳 马舒叶来源:燃次元&ldquo;人在职场,应该选择什么样的着装?&rdquo;近日,在网络上,一个与着装相关的帖子引发关注,在该帖子里,一位在高级写字楼亚洲金
  • 造车两年股价跌六成,小米的估值逻辑变了吗?

    造车两年股价跌六成,小米的估值逻辑变了吗?

    如果从小米官宣造车后的首个交易日起持有小米集团的股票,那么截至2023年上半年最后一个交易日,投资者将浮亏59.16%,同区间的恒生科技指数跌幅为52.78%
  • 小米MIX Fold 3配置细节曝光:搭载领先版骁龙8 Gen2+罕见5倍长焦

    小米MIX Fold 3配置细节曝光:搭载领先版骁龙8 Gen2+罕见5倍长焦

    这段时间以来,包括三星、一加、荣耀等等有不少品牌旗下的最新折叠屏旗舰都得到了不少爆料,而小米新一代折叠屏旗舰——小米MIX Fold 3此前也屡屡被传
  • 英特尔Xe-HP项目终止,将专注Xe-HPC/HPG系列显卡

    英特尔Xe-HP项目终止,将专注Xe-HPC/HPG系列显卡

    据10 月 31 日消息报道,英特尔高级副总裁兼加速计算系统和图形事业部总经理 表示,Xe-HP“ Arctic Sound” 系列服务器 GPU 已经应用于 oneAPI devcloud 云服
  • 微软发布Windows 11新版 引入全新任务栏状态

    微软发布Windows 11新版 引入全新任务栏状态

    近日,微软发布了Windows 11新版,而Build 22563更新主要引入了几周前曝光的平板模式任务栏等,系统更流畅了。更新中,Windows 11加入了专门针对平板优化的任务栏
Top