当前位置:首页 > 科技  > 软件

三分钟学会在 RabbitMQ 中实现发布订阅模式

来源: 责编: 时间:2024-03-28 09:28:45 277观看
导读在这个充满挑战和收获的60天学习之旅中,你将迅速提升成为一名全栈工程师。专注于Spring Boot框架,我们将深入研究高级特性,从项目初始化到微服务架构,再到性能优化和持续集成部署。无论你是初学者还是有一定经验的开发者,

在这个充满挑战和收获的60天学习之旅中,你将迅速提升成为一名全栈工程师。专注于Spring Boot框架,我们将深入研究高级特性,从项目初始化到微服务架构,再到性能优化和持续集成部署。无论你是初学者还是有一定经验的开发者,这个专题都将带你穿越从零到全面掌握Spring Boot的学习曲线。UfT28资讯网——每日最新资讯28at.com

UfT28资讯网——每日最新资讯28at.com

Day 32 ~ Springboot3.1.x|3分钟学会在 RabbitMQ 中实现发布订阅模式UfT28资讯网——每日最新资讯28at.com

实现发布与订阅消息模式

发布-订阅模式是一种消息传递方式,其中发送者(发布者)不会将消息直接发送到特定的接收者(订阅者)。发布者类别定义了哪些订阅者因为订阅者匹配了发布者的类别而接收消息。UfT28资讯网——每日最新资讯28at.com

以下是使用RabbitMQ实现发布-订阅模式的一种例子,我们将使用RabbitMQ的Fanout Exchange。UfT28资讯网——每日最新资讯28at.com

Producer

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class EmitLog {  private static final String EXCHANGE_NAME = "logs";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    try (Connection connection = factory.newConnection();         Channel channel = connection.createChannel()) {        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");        String message = "Log message...";        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));        System.out.println("Sent '" + message + "'");    }  }}

在上述代码的channel.exchangeDeclare(EXCHANGE_NAME, "fanout"),我们声明一个名为"log"的exchange,同时我们定义其类型为"fanout",意味着它会将接收到的所有消息广播给所有它所知道的队列。UfT28资讯网——每日最新资讯28at.com

Consumer

每一个订阅者都需要拥有一个queue,因此,我们需要在客户端中创建queue。UfT28资讯网——每日最新资讯28at.com

import com.rabbitmq.client.*;import java.io.IOException;public class ReceiveLogs {  private static final String EXCHANGE_NAME = "logs";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");    String queueName = channel.queueDeclare().getQueue();    channel.queueBind(queueName, EXCHANGE_NAME, "");    DeliverCallback deliverCallback = (consumerTag, delivery) -> {        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);        System.out.println("Received '" + message + "'");    };    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });  }}

在这个例子中,我们声明一个新的queue,并将其与"logs"的exchange绑定。然后我们定义了消息的接收以及处理方式。UfT28资讯网——每日最新资讯28at.com

处理消息发送失败的情况

在使用消息中间件的过程中,消息发送失败是无法避免的情况。因此,我们需要对此进行正确的处理以避免因此而导致的系统问题。UfT28资讯网——每日最新资讯28at.com

对于消息发送失败的处理,有以下几种常用的方案:UfT28资讯网——每日最新资讯28at.com

  • 重试: 对于有些暂时的问题,比如网络波动,可以通过简单的重试来解决。
  • 消息持久化:将消息存储在某处(例如数据库),只有当消息成功发送后,再删除它。
  • 死信队列:把无法处理的消息放入"死信队列",然后由专门的消费者来进行处理。

RabbitMQ中的消息确认(publisher confirms)和消费者应答(Consumer Acknowledgements)就是为了解决此类问题。UfT28资讯网——每日最新资讯28at.com

ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection()) {    Channel channel = connection.createChannel();    String queueName = "test";    String message = "Hello world";    try {        channel.queueDeclare(queueName, false, false, false, null);        channel.confirmSelect();        channel.basicPublish("", queueName, null, message.getBytes());        if (!channel.waitForConfirms()) {            System.out.println("消息发送失败");        }    } catch (Exception e) {        System.out.println("错误: " + e.getMessage());    }}

上述代码中执行channel.confirmSelect();后,当前channel被设置为publisher confirm模式。在此模式下,当消息被RabbitMQ成功接收后,会发送一个确认给生产者。如果RabbitMQ没有发送确认,那么生产者可以认定该消息发送失败。UfT28资讯网——每日最新资讯28at.com

结论:掌握发布-订阅模式和消息发送失败处理策略,对于掌握消息队列的使用至关重要,可为系统的稳定性和扩展性提供保障。UfT28资讯网——每日最新资讯28at.com

UfT28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-79990-0.html三分钟学会在 RabbitMQ 中实现发布订阅模式

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: SpringCloud项目开发中实用技巧总结

下一篇: 2024 年,这些 VS Code 插件可以卸载了!

标签:
  • 热门焦点
  • Find N3入网:最高支持16+1TB

    OPPO将于近期登场的Find N3折叠屏目前已经正式入网,型号为PHN110。本次Find N3在外观方面相比前两代有很大的变化,不再是小号的横向折叠屏,而是跟别的厂商一样采用了较为常见的
  • 摸鱼心法第一章——和配置文件说拜拜

    为了能摸鱼我们团队做了容器化,但是带来的问题是服务配置文件很麻烦,然后大家在群里进行了“亲切友好”的沟通图片图片图片图片对比就对比,简单对比下独立配置中心和k8s作为配
  • CSS单标签实现转转logo

    转转品牌升级后更新了全新的Logo,今天我们用纯CSS来实现转转的新Logo,为了有一定的挑战性,这里我们只使用一个标签实现,将最大化的使用CSS能力完成Logo的绘制与动画效果。新logo
  • SpringBoot中使用Cache提升接口性能详解

    环境:springboot2.3.12.RELEASE + JSR107 + Ehcache + JPASpring 框架从 3.1 开始,对 Spring 应用程序提供了透明式添加缓存的支持。和事务支持一样,抽象缓存允许一致地使用各
  • 谷歌KDD'23工作:如何提升推荐系统Ranking模型训练稳定性

    谷歌在KDD 2023发表了一篇工作,探索了推荐系统ranking模型的训练稳定性问题,分析了造成训练稳定性存在问题的潜在原因,以及现有的一些提升模型稳定性方法的不足,并提出了一种新
  • 梁柱接棒两年,腾讯音乐闯出新路子

    文丨田静 出品丨牛刀财经(niudaocaijing)7月5日,企鹅FM发布官方公告称由于业务调整,将于9月6日正式停止运营,这意味着腾讯音乐长音频业务走向消亡。腾讯在长音频领域还在摸索。为
  • 华为发布HarmonyOS 4:更好玩、更流畅、更安全

    在8月4日的华为开发者大会2023(HDC.Together)大会上,HarmonyOS 4正式发布。自2019年发布以来,HarmonyOS一直以用户为中心,经历四年多的发展HarmonyOS已
  • 苹果公司要求三星和LG Display生产「无边框」OLED iPhone显示屏

    据 The Elec 报道,苹果已要求其供应商为未来的 iPhone 型号开发「无边框」OLED 显示面板。苹果显然已要求三星和 LG Display 开发新的 OLED 显示面
  • 英特尔Xe HPG游戏显卡:拥有512EU,单风扇版本

    据10 月 30 日外媒 TheVerge 消息报道,英特尔 Xe HPG Arc Alchemist 的正面实被曝光,不仅拥有 512 EU 版显卡,还拥有 128EU 的单风扇版本。另外,这款显卡 PCB
Top