当前位置:首页 > 科技  > 软件

使用 Spring Boot 和 Kafka Streams 进行实时数据处理

来源: 责编: 时间:2023-10-13 14:37:17 355观看
导读Spring Boot 和 Apache Kafka Streams 是两个强大的工具,它们使开发人员能够创建可靠且可扩展的实时数据处理应用程序。在这篇文章中,我们将了解 Spring Boot 和 Kafka Streams 如何协同工作,如何利用流处理来发挥应用程

Spring Boot 和 Apache Kafka Streams 是两个强大的工具,它们使开发人员能够创建可靠且可扩展的实时数据处理应用程序。在这篇文章中,我们将了解 Spring Boot 和 Kafka Streams 如何协同工作,如何利用流处理来发挥应用程序的优势。还将探索交互式查询,这是一个相对较新且有趣的功能,为实时数据分析提供了新的机会。QK128资讯网——每日最新资讯28at.com

QK128资讯网——每日最新资讯28at.com

安装Kafka

Kafka可以从官方网站https://kafka.apache.org/downloads下载。一旦 Kafka 启动并运行,就创建一个主题。QK128资讯网——每日最新资讯28at.com

创建Spring Boot项目

创建一个新的 Spring Boot 项目,并且引入“Spring Web”和“Spring for Apache Kafka”两个依赖项。QK128资讯网——每日最新资讯28at.com

@SpringBootApplicationpublic class KafkaStreamsDemoApplication {    public static void main(String[] args) {        SpringApplication.run(KafkaStreamsDemoApplication.class, args);    }}

配置Kafka

接下来,在应用程序的 application.properties 文件中配置 Kafka 创建的主题和代理地址。QK128资讯网——每日最新资讯28at.com

spring.kafka.bootstrap-servers=localhost:9092spring.kafka.consumer.group-id=my-groupspring.kafka.consumer.auto-offset-reset=earliest

创建 Kafka 流处理器

下一步是构建一个 Kafka Streams 处理器,从“my-topic”读取消息并处理,然后将结果输出到另一个主题。使用 KStream API 来处理逻辑,如下:QK128资讯网——每日最新资讯28at.com

@Beanpublic Function<KStream<String, String>, KStream<String, String>> process() {    return input -> input            .mapValues(value -> value.toUpperCase())            .to("output-topic");}

交互式查询

交互式查询是 Kafka Streams 的创新新功能之一。借助此功能,可以立即查询 Kafka Streams 应用程序的状态存储。让我们看看如何使用交互式查询检索存储在状态存储中的大写消息的数量。QK128资讯网——每日最新资讯28at.com

@Autowiredprivate InteractiveQueryService interactiveQueryService;@GetMapping("/messageCount")public long getMessageCount() {  ReadOnlyKeyValueStore<String, Long> store = interactiveQueryService.getQueryableStore("message-count-store", QueryableStoreTypes.keyValueStore());     return store.get("uppercase-message-count");}

在此代码中,我们使用 InteractiveQueryService 来获取“message-count-store”的状态存储的句柄,然以查询该存储来获取大写消息的计数。QK128资讯网——每日最新资讯28at.com

发送数据到Kafka

在实际应用程序中,数据将从多个源发送到 Kafka。在本示例中,我们将使用一个简单的 Kafka 生产者来与“my-topic”进行通信。QK128资讯网——每日最新资讯28at.com

@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void produceMessage(String message) {    kafkaTemplate.send("my-topic", message);}

使用处理后的数据

使用 Kafka 消费者最终从“output-topic”接收编辑后的数据,如下:QK128资讯网——每日最新资讯28at.com

@KafkaListener(topics = "output-topic", groupId = "my-group")public void consume(String message) {    System.out.println("Received: " + message);}

总结

在本文中,我们了解了如何使用 Spring Boot 和 Kafka Streams 创建用于实时数据处理的应用程序,并且引入了交互式查询这一有趣的新功能。借助交互式查询,可以通过处理实时数据以及实时查询 Kafka Streams 应用程序的状态来创建交互式动态应用程序。QK128资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-13310-0.html使用 Spring Boot 和 Kafka Streams 进行实时数据处理

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 推荐 13 个 IntelliJ IDEA 高手代码编辑技巧!

下一篇: C#.Net析构知识引申(CLR级的剖析)

标签:
  • 热门焦点
  • 俄罗斯:将审查iPhone等外国公司设备 保数据安全

    iPhone和特斯拉都属于在各自领域领头羊的品牌,推出的产品也也都是数一数二的,但对于一些国家而言,它们的产品可靠性和安全性还是在限制范围内。近日,俄罗斯联邦通信、信息技术
  • Redmi Buds 4开箱简评:才199还有降噪 可以无脑入

    在上个月举办的Redmi Note11T Pro系列新机发布会上,除了两款手机新品之外,Redmi还带来了两款TWS真无线蓝牙耳机产品,Redmi Buds 4和Redmi Buds 4 Pro,此前我们在Redmi Note11T
  • SpringBoot中使用Cache提升接口性能详解

    环境:springboot2.3.12.RELEASE + JSR107 + Ehcache + JPASpring 框架从 3.1 开始,对 Spring 应用程序提供了透明式添加缓存的支持。和事务支持一样,抽象缓存允许一致地使用各
  • 一篇聊聊Go错误封装机制

    %w 是用于错误包装(Error Wrapping)的格式化动词。它是用于 fmt.Errorf 和 fmt.Sprintf 函数中的一个特殊格式化动词,用于将一个错误(或其他可打印的值)包装在一个新的错误中。使
  • 学习JavaScript的10个理由...

    作者 | Simplilearn编译 | 王瑞平当你决心学习一门语言的时候,很难选择到底应该学习哪一门,常用的语言有Python、Java、JavaScript、C/CPP、PHP、Swift、C#、Ruby、Objective-
  • 之家push系统迭代之路

    前言在这个信息爆炸的互联网时代,能够及时准确获取信息是当今社会要解决的关键问题之一。随着之家用户体量和内容规模的不断增大,传统的靠"主动拉"获取信息的方式已不能满足用
  • 微信语音大揭秘:为什么禁止转发?

    大家好,我是你们的小米。今天,我要和大家聊一个有趣的话题:为什么微信语音不可以转发?这是一个我们经常在日常使用中遇到的问题,也是一个让很多人好奇的问题。让我们一起来揭开这
  • 破圈是B站头上的紧箍咒

    来源 | 光子星球撰文 | 吴坤谚编辑 | 吴先之每年的暑期档都少不了瞄准追剧女孩们的古偶剧集,2021年有优酷的《山河令》,2022年有爱奇艺的《苍兰诀》,今年却轮到小破站抓住了追
  • 微博大门常打开,迎接海外画师漂洋东渡

    作者:互联网那些事&ldquo;起猛了,我能看得懂日语了&rdquo;。&ldquo;为什么日本人说话我能听懂?&rdquo;&ldquo;中文不像中文,日语不像日语,但是我竟然看懂了&rdquo;&hellip;&hell
Top