2023年6月21日发(作者:)
详解SpringBoot和SpringBatch使⽤什么是Spring BatchSpring Batch 是⼀个轻量级的、完善的批处理框架,旨在帮助企业建⽴健壮、⾼效的批处理应⽤。Spring Batch是Spring的⼀个⼦项⽬,使⽤Java语⾔并基于Spring框架为基础开发,使的已经使⽤ Spring 框架的开发者或者企业更容易访问和利⽤企业服务。Spring Batch 提供了⼤量可重⽤的组件,包括了⽇志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。对于⼤数据量和⾼性能的批处理任务,Spring Batch 同样提供了⾼级功能和特性来⽀持,⽐如分区功能、远程功能。总之,通过 SpringBatch 能够⽀持简单的、复杂的和⼤数据量的批处理作业。Spring Batch 使⽤我们⾸先配置Spring Batch 在Spring Boot 中的使⽤,数据库⽤的是mysql,pom⽂件如下,因为Spring Boot 中的Spring Batch包含 hsqsldb 所以我们将其去除 spring-boot-starter-batch hsqldb spring-boot-starter-jdbc spring-boot-starter-web ate hibernate-validator mysql mysql-connector-java 5.1.21 spring-boot-starter-test test 配置好我们需要的实体类。页⾯就不展⽰了。如果有数据校验添加的话那么我们需要配置⾃定义的检验器。若果没有课略过该步骤public class CsvBeanValidator implements Validator,InitializingBean { private tor validator; @Override public void validate(T value) throws ValidationException { Set> constraintViolations=te(value); if(()>0){ StringBuilder message=new StringBuilder(); for(ConstraintViolation constraintViolation:constraintViolations){ (sage() +"n"); } throw new ValidationException(ng()); } } //在这⾥我们使⽤的是JSR-303校验数据,在此进⾏初始化 @Override public void afterPropertiesSet() throws Exception {
ValidatorFactory validatorFactory= efaultValidatorFactory(); validator=ontext().getValidator(); }}public class CsvItemProcessor extends ValidatingItemProcessor { @Override public Person process(Person item) throws ValidationException { s(item); // 在这⾥启动 然后才会调⽤我们⾃定义的校验器,否则不能通过 。 if (ion().equals("汉族")){ e("01"); }else{ ion("02"); } return item; }}进⾏job任务监听 ⾃定义类实现JobExecutionListener 即可long startTime; long endTime; @Override public void beforeJob(JobExecution jobExecution) { startTime = tTimeMillis(); n("任务处理开始"); } @Override public void afterJob(JobExecution jobExecution) { endTime = tTimeMillis(); n("耗时多长时间:" + (endTime - startTime) + "ms"); n("任务处理结束"); }进⾏Spring Batch 的注⼊ ⽅法有xml⽂件注⼊bean ,在这⾥选择java注⼊@Configuration@EnableBatchProcessing //开启批处理public class CsvBatchConfig { /**1 ⾸先我们通过 FlatFileItemReader 读取我们需要的⽂件 通过setResource来实现 * 2 设置map 在这⾥通过先设置解析器 setLineTokenizer 来解析我们csv⽂件中的数 据 * 3 setFieldSetMapper 将我们需要的数据转化为我们的实体对象 存储 * 4 如果想 跳过前⾯的⼏⾏ 需要使⽤setLinesToSkip就可以实现
*/
@Bean public ItemReader reader() throws Exception { FlatFileItemReader reader = new FlatFileItemReader(); //1 ource(new ClassPathResource("")); //2 eMapper(new DefaultLineMapper() {{ //3 setLineTokenizer(new DelimitedLineTokenizer() {{ setNames(new String[] { "name","age", "nation" ,"address"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper() {{ setTargetType(); }}); }}); esToSkip(3);
return reader; } @Bean public ItemProcessor processor() { CsvItemProcessor processor = new CsvItemProcessor(); //1 idator(csvBeanValidator()); //2 return processor; } /**
*写⼊数据到数据库中 * 1执⾏的sql 语句 2 设置数据源
*/ @Bean public ItemWriter writer(DataSource dataSource) {//1 JdbcBatchItemWriter writer = new JdbcBatchItemWriter(); //2 mSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider()); String sql = "insert into person " + "(id,name,age,nation,address) " + "values(hibernate_l, :name, :age, :nation,:address)"; (sql); //3 aSource(dataSource); return writer; } // 作业的仓库 就是设置数据源 @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); aSource(dataSource); nsactionManager(transactionManager); abaseType("mysql"); return ect(); } //调度器 使⽤它来执⾏ 我们的批处理 @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); Repository(jobRepository(dataSource, transactionManager)); return jobLauncher; } //将监听器加⼊到job中 @Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return ("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) //1 .end() .listener(csvJobListener()) //2 .build(); } //步骤绑定 reader 与writer ⼀次性处理65000条记录 @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader reader, ItemWriter writer, ItemProcessor processor) { return stepBuilderFactory .get("step1") .chunk(65000) //1 .reader(reader) //2 .processor(processor) //3 .writer(writer) //4 .build(); } @Bean public CsvJobListener csvJobListener() { return new CsvJobListener(); } @Bean public Validator csvBeanValidator() { return new CsvBeanValidator(); }}在配置⽂件中 启动⾃动执⾏批处理 = job1,job2 #启动时要执⾏的Job,默认执⾏全部d=true #是否⾃动执⾏定义的Job,默认是d=true #是否初始化Spring Batch的数据库,默认为是=-prefix= #设置SpringBatch的数据库表的前缀项⽬汇总从 项⽬中我们可以看到 总的步骤就是 ⾸先读取我们需要实现的⽂件进⾏解析,然后转换成需要的实体类并且绑定到reader中,⼆ 实现我们需要的writer 并且帮到到数据库上,三实现job监听器将其绑定到步骤中 。最后开启批处理 ⾃动执⾏⼊库即可。这个简单步骤主要是配置中⽤到的 理解流程 ⾃⼰也可以⽅便实现 批处理的流程。总结以上所述是⼩编给⼤家介绍的SpringBoot和SpringBatch 使⽤,希望对⼤家有所帮助,如果⼤家有任何疑问请给我留⾔,⼩编会及时回复⼤家的。在此也⾮常感谢⼤家对⽹站的⽀持!
2023年6月21日发(作者:)
详解SpringBoot和SpringBatch使⽤什么是Spring BatchSpring Batch 是⼀个轻量级的、完善的批处理框架,旨在帮助企业建⽴健壮、⾼效的批处理应⽤。Spring Batch是Spring的⼀个⼦项⽬,使⽤Java语⾔并基于Spring框架为基础开发,使的已经使⽤ Spring 框架的开发者或者企业更容易访问和利⽤企业服务。Spring Batch 提供了⼤量可重⽤的组件,包括了⽇志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。对于⼤数据量和⾼性能的批处理任务,Spring Batch 同样提供了⾼级功能和特性来⽀持,⽐如分区功能、远程功能。总之,通过 SpringBatch 能够⽀持简单的、复杂的和⼤数据量的批处理作业。Spring Batch 使⽤我们⾸先配置Spring Batch 在Spring Boot 中的使⽤,数据库⽤的是mysql,pom⽂件如下,因为Spring Boot 中的Spring Batch包含 hsqsldb 所以我们将其去除 spring-boot-starter-batch hsqldb spring-boot-starter-jdbc spring-boot-starter-web ate hibernate-validator mysql mysql-connector-java 5.1.21 spring-boot-starter-test test 配置好我们需要的实体类。页⾯就不展⽰了。如果有数据校验添加的话那么我们需要配置⾃定义的检验器。若果没有课略过该步骤public class CsvBeanValidator implements Validator,InitializingBean { private tor validator; @Override public void validate(T value) throws ValidationException { Set> constraintViolations=te(value); if(()>0){ StringBuilder message=new StringBuilder(); for(ConstraintViolation constraintViolation:constraintViolations){ (sage() +"n"); } throw new ValidationException(ng()); } } //在这⾥我们使⽤的是JSR-303校验数据,在此进⾏初始化 @Override public void afterPropertiesSet() throws Exception {
ValidatorFactory validatorFactory= efaultValidatorFactory(); validator=ontext().getValidator(); }}public class CsvItemProcessor extends ValidatingItemProcessor { @Override public Person process(Person item) throws ValidationException { s(item); // 在这⾥启动 然后才会调⽤我们⾃定义的校验器,否则不能通过 。 if (ion().equals("汉族")){ e("01"); }else{ ion("02"); } return item; }}进⾏job任务监听 ⾃定义类实现JobExecutionListener 即可long startTime; long endTime; @Override public void beforeJob(JobExecution jobExecution) { startTime = tTimeMillis(); n("任务处理开始"); } @Override public void afterJob(JobExecution jobExecution) { endTime = tTimeMillis(); n("耗时多长时间:" + (endTime - startTime) + "ms"); n("任务处理结束"); }进⾏Spring Batch 的注⼊ ⽅法有xml⽂件注⼊bean ,在这⾥选择java注⼊@Configuration@EnableBatchProcessing //开启批处理public class CsvBatchConfig { /**1 ⾸先我们通过 FlatFileItemReader 读取我们需要的⽂件 通过setResource来实现 * 2 设置map 在这⾥通过先设置解析器 setLineTokenizer 来解析我们csv⽂件中的数 据 * 3 setFieldSetMapper 将我们需要的数据转化为我们的实体对象 存储 * 4 如果想 跳过前⾯的⼏⾏ 需要使⽤setLinesToSkip就可以实现
*/
@Bean public ItemReader reader() throws Exception { FlatFileItemReader reader = new FlatFileItemReader(); //1 ource(new ClassPathResource("")); //2 eMapper(new DefaultLineMapper() {{ //3 setLineTokenizer(new DelimitedLineTokenizer() {{ setNames(new String[] { "name","age", "nation" ,"address"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper() {{ setTargetType(); }}); }}); esToSkip(3);
return reader; } @Bean public ItemProcessor processor() { CsvItemProcessor processor = new CsvItemProcessor(); //1 idator(csvBeanValidator()); //2 return processor; } /**
*写⼊数据到数据库中 * 1执⾏的sql 语句 2 设置数据源
*/ @Bean public ItemWriter writer(DataSource dataSource) {//1 JdbcBatchItemWriter writer = new JdbcBatchItemWriter(); //2 mSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider()); String sql = "insert into person " + "(id,name,age,nation,address) " + "values(hibernate_l, :name, :age, :nation,:address)"; (sql); //3 aSource(dataSource); return writer; } // 作业的仓库 就是设置数据源 @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); aSource(dataSource); nsactionManager(transactionManager); abaseType("mysql"); return ect(); } //调度器 使⽤它来执⾏ 我们的批处理 @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); Repository(jobRepository(dataSource, transactionManager)); return jobLauncher; } //将监听器加⼊到job中 @Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return ("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) //1 .end() .listener(csvJobListener()) //2 .build(); } //步骤绑定 reader 与writer ⼀次性处理65000条记录 @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader reader, ItemWriter writer, ItemProcessor processor) { return stepBuilderFactory .get("step1") .chunk(65000) //1 .reader(reader) //2 .processor(processor) //3 .writer(writer) //4 .build(); } @Bean public CsvJobListener csvJobListener() { return new CsvJobListener(); } @Bean public Validator csvBeanValidator() { return new CsvBeanValidator(); }}在配置⽂件中 启动⾃动执⾏批处理 = job1,job2 #启动时要执⾏的Job,默认执⾏全部d=true #是否⾃动执⾏定义的Job,默认是d=true #是否初始化Spring Batch的数据库,默认为是=-prefix= #设置SpringBatch的数据库表的前缀项⽬汇总从 项⽬中我们可以看到 总的步骤就是 ⾸先读取我们需要实现的⽂件进⾏解析,然后转换成需要的实体类并且绑定到reader中,⼆ 实现我们需要的writer 并且帮到到数据库上,三实现job监听器将其绑定到步骤中 。最后开启批处理 ⾃动执⾏⼊库即可。这个简单步骤主要是配置中⽤到的 理解流程 ⾃⼰也可以⽅便实现 批处理的流程。总结以上所述是⼩编给⼤家介绍的SpringBoot和SpringBatch 使⽤,希望对⼤家有所帮助,如果⼤家有任何疑问请给我留⾔,⼩编会及时回复⼤家的。在此也⾮常感谢⼤家对⽹站的⽀持!
发布评论