
Implementing Batch Processing with Spring Batch

This article parses a CSV-style file and inserts the data into a database. For background on Spring Batch concepts, please look them up yourself.

1. Spring Batch configuration

spring:
  batch:
    job:
      # whether batch jobs run automatically on startup
      enabled: false
    jdbc:
      # whether to run the SQL initialization scripts; `always` runs them on every
      # startup and fails once the tables already exist -- the error is swallowed
      # by the framework and only visible at debug log level
      initialize-schema: always
  # database connection settings
  datasource:
    url: jdbc:mysql://localhost:3306/batch?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false
    username: root
    password:
    hikari:
      pool-name: testPool
      maximum-pool-size: 10
      minimum-idle: 5
      idle-timeout: 300000
      max-lifetime: 600000
      connection-timeout: 30000
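Since `initialize-schema: always` re-runs the scripts on every startup, one way to avoid the hidden table-exists errors is to let the first startup create the metadata tables and then switch the property off. A minimal sketch (my suggestion, not part of the original setup):

spring:
  batch:
    jdbc:
      # once the metadata tables exist, stop re-running the init scripts
      initialize-schema: never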

2. The listener

@Slf4j
public class JobListener implements JobExecutionListener {

    private long startTime;
    private long endTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        // runs before the batch job executes
        startTime = System.currentTimeMillis();
        log.info("job process start");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        // runs after the batch job completes
        endTime = System.currentTimeMillis();
        log.info("job process end, time:{}", endTime - startTime);
    }
}
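Because afterJob receives the JobExecution, the listener could also branch on the final status instead of always logging success. A minimal variation (not in the original):

    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("job process end, time:{}", endTime - startTime);
        } else {
            log.warn("job finished with status {}", jobExecution.getStatus());
        }
    }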

3. Batch execution configuration

@Slf4j
@Configuration
public class DataBatchConfig {

    /**
     * @StepScope binds parameters from the jobParameters supplied at runtime.
     * Once the annotation is present, the method must return the concrete
     * implementation class rather than the ItemReader interface, otherwise it
     * fails at runtime.
     *
     * @param time the bound job parameter
     * @return the concrete ItemReader implementation (without @StepScope the
     *         method could return ItemReader); FlatFileItemReader is the reader
     *         Spring provides for parsing flat files
     */
    @Bean
    @StepScope
    public FlatFileItemReader<User> reader(@Value("#{jobParameters[time]}") Long time) {
        log.info("time:{}", time);
        FlatFileItemReader<User> itemReader = new FlatFileItemReader<>();
        // the resource file to read (the file name was left blank in the original)
        itemReader.setResource(new ClassPathResource(""));
        // maps each line of the resource file to an entity object
        DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
        // how each line is parsed: split on a space delimiter
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer(" ");
        // names of the entity fields the split tokens map to
        delimitedLineTokenizer.setNames(new String[]{"id", "nickname", "phone"});
        // maps the parsed tokens onto the entity's fields
        BeanWrapperFieldSetMapper<User> fieldMapper = new BeanWrapperFieldSetMapper<>();
        // the target entity type
        fieldMapper.setTargetType(User.class);
        // wire the tokenizer and the field mapper into the line mapper
        lineMapper.setLineTokenizer(delimitedLineTokenizer);
        lineMapper.setFieldSetMapper(fieldMapper);
        // wire the line mapper into the reader
        itemReader.setLineMapper(lineMapper);
        return itemReader;
    }
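For reference, the reader expects one record per line with the three fields separated by single spaces. A made-up sample (the actual file name and contents are not given in the original):

1 alice 13800000001
2 bob 13800000002
3 carol 13800000003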

    @Bean
    public ItemWriter<User> writer(DataSource dataSource) {
        // the JDBC-based writer
        JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
        // SQL parameters are supplied from the item bean's properties
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        // bean properties are bound with the :property placeholder syntax
        String sql = "insert into user values(:id,:nickname,:phone)";
        writer.setSql(sql);
        writer.setDataSource(dataSource);
        return writer;
    }
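The insert statement assumes a user table whose three columns line up, in order, with the entity fields. A possible definition, with column types assumed from the entity (the table DDL is not shown in the original):

CREATE TABLE user (
    id       INT PRIMARY KEY,
    nickname VARCHAR(64),
    phone    VARCHAR(32)
);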

    @Bean
    public JobRepository csvJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        // store the metadata produced during execution in MySQL
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDatabaseType("mysql");
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDataSource(dataSource);
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public SimpleJobLauncher csvJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        // the object that launches jobs
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        // hand it the job repository
        jobLauncher.setJobRepository(csvJobRepository(dataSource, transactionManager));
        return jobLauncher;
    }
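The repository writes run metadata into Spring Batch's standard tables (BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, BATCH_STEP_EXECUTION, and so on), so past runs can be inspected with plain SQL; a sketch:

-- most recent job runs, from Spring Batch's default metadata table
SELECT JOB_EXECUTION_ID, STATUS, START_TIME, END_TIME
FROM BATCH_JOB_EXECUTION
ORDER BY JOB_EXECUTION_ID DESC;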

    @Bean
    public Job importJob(JobBuilderFactory jobs, Step step) {
        // the job name
        return jobs.get("importCsvJob")
                // a fresh run id for every execution
                .incrementer(new RunIdIncrementer())
                // the step to execute
                .flow(step)
                .end()
                // the listener
                .listener(csvJobListener())
                .build();
    }

    @Bean
    public JobListener csvJobListener() {
        // the custom listener
        return new JobListener();
    }

    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory, ItemReader<User> reader, ItemWriter<User> writer) {
        return stepBuilderFactory
                // the step name
                .get("step")
                // the chunk mechanism: items are read and processed one at a time,
                // then handed to the writer in a single batch once the chunk size is reached
                .<User, User>chunk(65000)
                .reader(reader)
                .writer(writer)
                .build();
    }
}
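The step above wires only a reader and a writer, but the chunk model also allows an ItemProcessor between them. A hedged sketch of one that could be added to DataBatchConfig (the filtering rule is made up; returning null drops the item from the chunk):

    @Bean
    public ItemProcessor<User, User> processor() {
        // hypothetical rule: skip records that have no phone number
        return user -> (user.getPhone() == null || user.getPhone().isEmpty()) ? null : user;
    }

It would be registered by inserting .processor(processor()) between .reader(reader) and .writer(writer) in the step definition.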

The corresponding entity:

@Getter
@Setter
public class User {
    private Integer id;
    private String nickname;
    private String phone;
}

// sample data: 3 aaaa 4 3344 4

4. Test code

@SpringBootTest
class BatchApplicationTests {

    // inject the launcher
    @Autowired
    SimpleJobLauncher jobLauncher;

    // inject the job to run
    @Autowired
    Job job;

    @Test
    void contextLoads() {
        // the run parameters, picked up via @StepScope
        JobParameters parameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        try {
            // launch the batch job
            jobLauncher.run(job, parameters);
        } catch (JobExecutionAlreadyRunningException | JobRestartException
                | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
            e.printStackTrace();
        }
    }
}

This is a first pass at Spring Batch; the framework offers many more features and deserves further study.
