当前位置:首页 > 科技  > 软件

SpringBatch高阶应用:大数据批处理框架实战指南

来源: 责编: 时间:2024-05-07 09:12:51 252观看
导读本篇文章主要内容:通过Spring Batch从一个库中读取数据进过处理后写入到另外一个库中。
1. 环境准备1.1 引入依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch

本篇文章主要内容:通过Spring Batch从一个库中读取数据进过处理后写入到另外一个库中。
9U628资讯网——每日最新资讯28at.com

1. 环境准备

1.1 引入依赖

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-batch</artifactId></dependency><dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-data-jpa</artifactId></dependency>

2.2 配置Job

配置Job启动器9U628资讯网——每日最新资讯28at.com

@BeanJobLauncher userJobLauncher(JobRepository userJobRepository) {  SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;  jobLauncher.setJobRepository(userJobRepository) ;  return jobLauncher ;}

配置任务Repository存储元信息9U628资讯网——每日最新资讯28at.com

@BeanJobRepository userJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) {  JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean() ;  factory.setDatabaseType("mysql") ;  factory.setTransactionManager(transactionManager) ;  factory.setDataSource(dataSource) ;  try {    factory.afterPropertiesSet() ;     return factory.getObject() ;  } catch (Exception e) {    throw new RuntimeException(e) ;  }}

配置ItemReader读取器9U628资讯网——每日最新资讯28at.com

@BeanItemReader<User> userReader(JobOperator jobOperator) throws Exception {  JpaPagingItemReaderBuilder<User> builder = new JpaPagingItemReaderBuilder<>() ;  builder.entityManagerFactory(entityManagerFactory) ;  // 每次分页查询多少条数据  builder.pageSize(10) ;  builder.queryString("select u from User u where u.uid <= 50") ;  builder.saveState(true) ;  builder.name("userReader") ;  return builder.build() ;}

配置数据源,该数据源是用来写入操作的9U628资讯网——每日最新资讯28at.com

public DataSource dataSource() {  HikariDataSource dataSource = new HikariDataSource() ;  dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/testjpa?serverTimezone=GMT%2B8&useSSL=false") ;  dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver") ;  dataSource.setUsername("root") ;  dataSource.setPassword("xxxooo") ;  return dataSource ;}

配置ItemWriter用来写入操作(当前库的数据写入到另外一个库,上面的数据源)9U628资讯网——每日最新资讯28at.com

@BeanItemWriter<User> userWriter() {  // 通过JDBC批量处理  JdbcBatchItemWriterBuilder<User> builder = new JdbcBatchItemWriterBuilder<>() ;  DataSource dataSource = dataSource() ;  builder.dataSource(dataSource) ;  builder.sql("insert into st (id, name, sex, mobile, age, birthday) values (?, ?, ?, ?, ?, ?)") ;  builder.itemPreparedStatementSetter(new ItemPreparedStatementSetter<User>() {    @Override    public void setValues(User item, PreparedStatement ps) throws SQLException {      ps.setInt(1, item.getUid()) ;      ps.setString(2, item.getName()) ;      ps.setString(3, item.getSex()) ;      ps.setString(4, item.getMobile()) ;      ps.setInt(5, item.getAge()) ;      ps.setObject(6, item.getBirthday()) ;    }  }) ;  return builder.build() ;}

配置ItemProcessor处理器,数据从当前库读取处理后经过处理后再写入另外的库中9U628资讯网——每日最新资讯28at.com

@BeanItemProcessor<User, User> userProcessor() {  return new ItemProcessor<User, User>() {    @Override    public User process(User item) throws Exception {      System.out.printf("%s - 开始处理数据:%s%n", Thread.currentThread().getName(), item.toString()) ;      // 模拟耗时操作      TimeUnit.SECONDS.sleep(1) ;      // 在这里你可以对数据进行相应的处理。      return item ;    }  } ;}

配置Step将ItemReader、ItemProcessor、ItemWriter串联在一起。9U628资讯网——每日最新资讯28at.com

@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {  return steps.get("userStep1")    .<User, User>chunk(5)    .reader(userReader)    .processor(userProcessor)    .writer(userWriter)    .build() ;}

配置Job,Job是封装整个批处理流程的实体。在 Spring Batch 中,Job只是Step实例的容器。它将逻辑上属于一个流程的多个步骤组合在一起,并允许对所有步骤的全局属性(如可重启性)进行配置。作业配置包含:9U628资讯网——每日最新资讯28at.com

  • 简单的工作名称。
  • Step实例的定义和排序。
  • Job是否可重新启动。
@BeanJob userJob(Step userStep1, Step userStep2) {  return jobs.get("userJob").start(userStep1).build();}

以上是Spring Batch定义配置一个Job所需的核心组件。接下来会以上面的基础配置进行高阶知识点进行介绍。9U628资讯网——每日最新资讯28at.com

2. 高阶配置管理

2.1 通过Controller接口启动Job

@RequestMapping("/userJob")public class UserJobController {  @Resource  private JobLauncher userJobLauncher ;  @GetMapping("/start")  public Object start() throws Exception {    JobParameters jobParameters = new JobParameters() ;    this.userJobLauncher.run(userJob, jobParameters) ;    return "started" ;  }}

通过JobLauncher#run方法启动Job。当你调用该接口时,你会发现接口一直不会返回,一直阻塞,下图是Job的启动序列9U628资讯网——每日最新资讯28at.com

图片图片9U628资讯网——每日最新资讯28at.com

根据上图能知道,当你调用run方法后,会等待整个Job退出状态为FINISHED或者FAILED后才能结束。所以,你需要异步完成,以便 SimpleJobLauncher 立即返回给调用者。而正确的序列应该是如下:9U628资讯网——每日最新资讯28at.com

图片图片9U628资讯网——每日最新资讯28at.com

上图通过异步方式启动Job序列。
9U628资讯网——每日最新资讯28at.com

2.2 异步启动Job

@BeanTaskExecutor taskExecutor() {  ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor() ;  taskExecutor.setThreadNamePrefix("spring_batch_launcher") ;  taskExecutor.setCorePoolSize(10) ;  taskExecutor.setMaxPoolSize(10) ;  taskExecutor.initialize() ;   return taskExecutor ;}@BeanJobLauncher userJobLauncher(JobRepository userJobRepository) {  SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;  jobLauncher.setJobRepository(userJobRepository) ;  jobLauncher.setTaskExecutor(taskExecutor()) ;  return jobLauncher ;}

通过上面配置后,Job启动将是异步的会直接返回JobExecution。9U628资讯网——每日最新资讯28at.com

2.3 重启Job

当一个Job正在执行,由于断电或者强制终止了程序。当程序恢复后你希望能够接着程序终止前的进度继续执行,这时候你需要进行如下的操作(本人没有发现有什么API能够操作的,可能文档没看仔细)。
9U628资讯网——每日最新资讯28at.com

当程序非正常终止是,下面两张表的状态都是STARTED,END_TIME为null
9U628资讯网——每日最新资讯28at.com

batch_job_execution表9U628资讯网——每日最新资讯28at.com

图片图片9U628资讯网——每日最新资讯28at.com

batch_step_execution表9U628资讯网——每日最新资讯28at.com

图片图片9U628资讯网——每日最新资讯28at.com

想要重新启动必须将上面的状态修改为STOPPED,END_TIME字段设置上值(是什么值无所谓)。9U628资讯网——每日最新资讯28at.com

然后我们就可以继续使用上面的Controller接口启动任务继续执行了。9U628资讯网——每日最新资讯28at.com

2.4 多线程执行Step

为了加快程序的执行,我们可以为Step配置线程池
9U628资讯网——每日最新资讯28at.com

@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {  return steps.get("userStep1")    .<User, User>chunk(5)    .reader(userReader)    .processor(userProcessor)    .writer(userWriter)    // 配置线程池    .taskExecutor(taskExecutor())    .build() ;}

注意:Step中使用的任何池化资源(如数据源)都可能对并发性设置限制。请确保这些资源池至少与步骤中所需的并发线程数一样大。9U628资讯网——每日最新资讯28at.com

通过上面配置线程池后,你将在控制台看到如下输出。9U628资讯网——每日最新资讯28at.com

图片图片9U628资讯网——每日最新资讯28at.com

默认将有4个线程同时进行处理。可以通过如下配置进行调整9U628资讯网——每日最新资讯28at.com

@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {  return steps.get("userStep1")      // ...      // 节流限制10,这里配置的大小应该与你的数据库连接池大小及使用的线程池核心线程数一致。      .throttleLimit(10)      .build() ;}

2.5 重复启动Job

要想重复启动Job,我们可以在启动Job时设置不同的JobParameters参数,只要参数不同那么就可以重复的启动Job。如下示例:9U628资讯网——每日最新资讯28at.com

@GetMapping("/start/{page}")public Object start(@PathVariable("page") Long page) throws Exception {  Map<String, JobParameter> parameters = new HashMap<>() ;  // 每次设置的参数值不同即可。  parameters.put("page", new JobParameter(page)) ;  JobParameters jobParameters = new JobParameters(parameters) ;  this.userJobLauncher.run(userJob, jobParameters) ;  return "started" ;}

以上是本篇文章的全部内容,希望对你有帮助。9U628资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-87012-0.htmlSpringBatch高阶应用:大数据批处理框架实战指南

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 架构设计中如何应对接口级故障?

下一篇: Web Components 取代 Vue?我觉得不太行!

标签:
  • 热门焦点
  • 小米官宣:2023年上半年出货量中国第一!

    今日早间,小米电视官方微博带来消息,称2023年小米电视上半年出货量达到了中国第一,同时还表示小米电视的巨屏风暴即将开始。“公布一个好消息2023年#小米电视上半年出货量中国
  • 影音体验是真的强 简单聊聊iQOO Pad

    大公司的好处就是产品线丰富,非常细分化的东西也能给你做出来,例如早先我们看到了新的vivo Pad2,之后我们又在iQOO Neo8 Pro的发布会上看到了iQOO的首款平板产品iQOO Pad。虽
  • 一文看懂为苹果Vision Pro开发应用程序

    译者 | 布加迪审校 | 重楼苹果的Vision Pro是一款混合现实(MR)头戴设备。Vision Pro结合了虚拟现实(VR)和增强现实(AR)的沉浸感。其高分辨率显示屏、先进的传感器和强大的处理能力
  • 2023年,我眼中的字节跳动

    此时此刻(2023年7月),字节跳动从未上市,也从未公布过任何官方的上市计划;但是这并不妨碍它成为中国最受关注的互联网公司之一。从2016-17年的抖音强势崛起,到2018年的&ldquo;头腾
  • 华为开发者大会2023日程公开:开设鸿蒙HarmonyOS 4体验区

    IT之家 7 月 31 日消息,华为今日公布了 HDC.Together 开发者大会 2023 的详细日程。整场大会将于 8 月 4 日-6 日之间举行,届时将发布最新一代鸿蒙 H
  • iQOO 11S屏幕细节公布:首发三星2K E6全感屏 安卓最好的直屏手机

    日前iQOO手机官方宣布,新一代电竞旗舰iQOO 11S将会在7月4日19:00正式与大家见面。随着发布时间的日益临近,官方关于该机的预热也更加密集,截至目前已
  • 回归OPPO两年,一加赢了销量,输了品牌

    成为OPPO旗下主打性能的先锋品牌后,一加屡创佳绩。今年618期间,一加手机全渠道销量同比增长362%,凭借一加 11、一加 Ace 2、一加 Ace 2V三款爆品,一加
  • 苹果140W USB-C充电器:采用氮化镓技术

    据10 月 30 日 9to5 Mac 消息报道,当苹果推出新的 MacBook Pro 2021 时,该公司还推出了新的 140W USB-C 充电器,附赠在 MacBook Pro 16 英寸机型的盒子里,也支
  • 2022爆款:ROG魔霸6 冰川散热系统持续护航

    喜逢开学季,各大商家开始推出自己的新产品,进行打折促销活动。对于忠实的端游爱好者来说,能够拥有一款梦寐以求的笔记本电脑是一件十分开心的事。但是现在的
Top