大家好,欢迎来到IT知识分享网。
一、基本介绍
1.springbatch 是什么?
Spring Batch 是一个用于批处理应用程序开发的开源框架。它提供了一套强大的工具和组件,用于处理大规模数据处理、ETL(抽取、转换和加载)操作、定时任务等。
Spring Batch 构建在 Spring Framework 的基础之上,利用了 Spring 的依赖注入、事务管理、异常处理等特性,使得批处理应用程序的开发更加简单和灵活。
2.springbatch 有哪些核心组件
①.JobLauncher(作业启动器)
用于启动和执行批处理作业的组件。通过 JobLauncher 可以调度和触发 Job 的执行,可以根据需要配置触发 Job 的方式,如定时触发、手动触发等。
②.Job(作业)
一个完整的批处理任务,包含多个 Step。Job 定义了整个批处理任务的执行顺序和依赖关系。可以通过 JobLauncher 启动和执行 Job。
③.Step(步骤)
批处理任务的一部分,包含了读取、处理和写入数据的逻辑。Step 由 ItemReader、ItemProcessor 和 ItemWriter 组成。Step 定义了读取器、处理器和写入器,并可以设置其他属性如事务管理、错误处理策略等。
④.ItemReader(数据读取器)
从数据源中读取数据的组件。常见的内置实现包括 JdbcCursorItemReader(从数据库读取数据)、FlatFileItemReader(从文件读取数据)、JmsItemReader(从消息队列读取数据)等。也可以自定义 ItemReader 来满足特定需求。
⑤.ItemProcessor(数据处理器)
对读取到的数据进行处理、转换或过滤的组件。可以进行任意的业务逻辑操作。可以根据需要添加一个或多个 ItemProcessor,并将它们链接在一起形成处理链。
⑤.ItemWriter(数据写入器):
将处理后的数据写入目标系统(比如数据库、文件等)的组件。常见的内置实现包括 JdbcBatchItemWriter(写入数据库)、FlatFileItemWriter(写入文件)、JmsItemWriter(写入消息队列)等。也可以自定义 ItemWriter 来实现特定的写入逻辑。
⑥.JobRepository(作业仓库)
用于管理批处理任务的元数据和状态信息。JobRepository 负责将作业执行的元数据存储到持久化存储介质(如数据库)中,并提供对这些元数据的操作接口。它还支持事务管理,保证批处理任务的一致性和可靠性。通过使用 JobRepository,可以实现断点续传、重试机制和错误处理等功能。
二、步骤概览
说明:这边实现的需求是,使用批处理组件,从文件中读取数据,经过批处理组件处理,最后保存至数据库中,步骤概览是基于该需求进行实现的。
三、步骤说明
1.引入依赖包
- 批处理依赖包:spring-boot-starter-batch
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> <exclusions> <!-- 排除内置的数据库 --> <exclusion> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> </exclusion> </exclusions> </dependency>
- mybatisplus 依赖包:mybatis-plus-boot-starter,引入它是由于我们数据处理后需要保存到业务库。
<dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.5.5</version> </dependency>
- mysql 驱动包:mysql-connector-java
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency>
2.配置集成参数
在 application.yml 配置文件中,定义集成spring batch 组件的配置参数。
spring: batch: job: #设置为 false -需要jobLaucher.run执行 enabled: true jdbc: initialize-schema: always
3.启用批处理
在 springboot 启用类上添加注解 @EnableBatchProcessing 启用批处理。
4.配置作业步骤
①.数据读取
创建一个数据读取器,用于从文件中读取数据。
- Student:学生信息实体
@Data @TableName(value = "sys_student") public class Student implements Serializable { @TableField(exist = false) private static final long serialVersionUID = 1L; @TableId(type = IdType.AUTO) private Long id; private String name; private Integer age; private Integer sex; @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date birthday; }
- /resources/static/student.txt:学生信息文件
"id","name","age","sex","birthday" "1","Jin Xiaoming","10","1","2015-03-08 10:59:21" "2","Yau Ka Fai","11","1","2005-08-21 01:13:52" "3","Xu Zhiyuan","12","1","2006-04-12 04:01:10" "4","Wu Hok Yau","20","1","2018-04-29 00:16:52" "5","Shi Ziyi","20","1","2023-02-20 05:20:14" "6","Koo Wing Fat","20","0","2012-09-13 06:47:16" "7","Takeda Tsubasa","17","0","2002-05-03 15:31:37" "8","Keith Jones","21","0","2009-02-24 03:56:21" "9","Lillian Vargas","18","0","2001-03-02 00:19:56" "10","Lai Ka Keung","29","0","2009-10-04 03:03:30" "1149","Sakai Momoka","14","1","2013-10-09 00:00:00" "1150","Han Jiehong","14","0","2021-07-17 00:00:00" "1151","Yuan Xiaoming","15","0","2015-02-05 00:00:00" "1152","Don Gonzalez","18","0","2008-08-31 00:00:00" "1153","Chin Chieh Lun","15","0","2011-02-04 00:00:00"
- StudentFileItemReaderBuilder:学生信息文件读取器构造者,这边使用了组件内置的 FlatFileItemReader 文件读取器
@Component public class StudentFileItemReaderBuilder { public ItemReader<Student> build() { FlatFileItemReader<Student> reader = new FlatFileItemReader<>(); // 设置文件资源地址 reader.setResource(new ClassPathResource("static/student.txt")); // 忽略第一行 reader.setLinesToSkip(1); // AbstractLineTokenizer的三个实现类之一,以固定分隔符处理行数据读取, // 使用默认构造器的时候,使用逗号作为分隔符,也可以通过有参构造器来指定分隔符 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); // 设置属性名,类似于表头 tokenizer.setNames("id", "name", "age", "sex", "birthday"); // 将每行数据转换为TestData对象 DefaultLineMapper<Student> mapper = new DefaultLineMapper<>(); // 设置LineTokenizer mapper.setLineTokenizer(tokenizer); // 设置映射方式,即读取到的文本怎么转换为对应实体 mapper.setFieldSetMapper(fieldSet -> { Student student = new Student(); student.setId(fieldSet.readLong("id")); student.setName(fieldSet.readString("name")); student.setAge(fieldSet.readInt("age")); student.setSex(fieldSet.readInt("sex")); student.setBirthday(fieldSet.readDate("birthday")); return student; }); reader.setLineMapper(mapper); return reader; } }
②.数据处理
实现数据处理器,需要实现 ItemProcessor接口。
- StudentProcessor:学生信息处理器,用于对读取到的数据进行处理,案例中我们进行了简单的数据过滤处理。
@Component public class StudentProcessor implements ItemProcessor<Student, Student> { @Override public Student process(Student student) throws Exception { // 将性别为1的都设置为0 if (student.getSex() == 1) { student.setSex(0); } return student; } }
③.数据写入
实现数据写入器,需要实现ItemWriter接口,案例中使用集成的mybatisplus 批量到mysql中。
@Component public class StudentItemWriter implements ItemWriter<Student> { @Resource private StudentMapper mapper; @Override public void write(List<? extends Student> list) throws Exception { mapper.batchInsert(list); } }
5.配置作业
配置作业即定义Job 和对应的 Step,定义批处理任务的执行流程。
①.作业监听器
定义作业监听器,我们可以通过它监听作业执行开始和执行结束,并打印作业执行耗时。
- StudentReaderJobListener:作业监听器
@Slf4j @Component public class StudentReaderJobListener implements JobExecutionListener { private LocalDateTime start; private LocalDateTime end; @Override public void beforeJob(JobExecution jobExecution) { start = LocalDateTime.now(); log.info("student reader job start..."); } @Override public void afterJob(JobExecution jobExecution) { end = LocalDateTime.now(); log.info("student reader job end,cost {} ms", ChronoUnit.MILLIS.between(start, end)); } }
②.定义作业
定义作业任务,设置其作业名称、监听器、作业步骤(由数据读取器、数据处理器、数据写入器组成)。
- StudentReaderJobBatchConfig:作业任务配置
@Slf4j @Configuration public class StudentReaderJobBatchConfig { // 任务构建工厂 @Resource private JobBuilderFactory jobFactory; // 步骤构建工厂 @Resource private StepBuilderFactory stepFactory; // 读取构造器 @Resource private StudentFileItemReaderBuilder readerBuilder; // 信息处理器 @Resource private StudentProcessor studentProcessor; // 信息写入器 @Resource private StudentItemWriter studentWriter; // 任务监听器 @Resource private StudentReaderJobListener jobListener; @Bean public Job studentReaderJob() { return jobFactory.get("studentReaderJob") .incrementer(new RunIdIncrementer()) .listener(jobListener) .start(studentReaderJobStep()) .build(); } //任务步骤 private Step studentReaderJobStep() { return stepFactory.get("studentReaderJobStep") .<Student, Student>chunk(10) .reader(readerBuilder.build()) .processor(studentProcessor) .writer(studentWriter) .build(); } }
四、代码测试
1. 测试代码
@SpringBootTest public class ApiTest { // 作业启动器 @Resource public JobLauncher jobLauncher; // 作业任务 @Resource private Job studentReaderJob; @Test public void test_studentJob_execute() throws Exception { // 启动参数,可按需设置 JobParameters jobParameters = new JobParametersBuilder() //.addString("time", String.valueOf(System.currentTimeMillis())) .toJobParameters(); jobLauncher.run(studentReaderJob, jobParameters); } }
2.测试结果
- 后台日志
- 数据入库结果
- 批处理执行记录表,集成组件后会在业务库中生成并记录作业信息。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/108698.html