2023年6月21日发(作者:)
SpringBatch实现批处理本⽂实现解析csv格式⽂件,插⼊数据到数据库,Spring Batch概念请⾃⾏查找⼀⼂Spring Batch配置spring: batch: job: #
⾃动执⾏批处理 enabled: false jdbc: #
是否执⾏sql初始化脚本,always每次启动都执⾏sql脚本,表存在的时候会报错,程序中把异常隐藏了,debug⽇志可见 initialize-schema: always #
数据库连接配置 datasource: url: jdbc:mysql://localhost:3306/batch?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false username: root password: hikari: pool-name: testPool maximum-pool-size: 10 minimumIdle: 5 idle-timeout: 300000 max-lifetime: 600000 connection-timeout: 30000⼆⼂监听器@Slf4jpublic class JobListener implements JobExecutionListener { private long startTime; private long endTime; @Override public void beforeJob(JobExecution jobExecution) { //
批处理任务执⾏前执⾏代码 startTime = tTimeMillis(); ("job process start"); } @Override public void afterJob(JobExecution jobExecution) { //
执⾏后代码 endTime = tTimeMillis(); ("job process end, time:{}", endTime - startTime); }}三⼂批处理执⾏配置@Slf4j@Configurationpublic class DataBatchConfig { /** * StepScope:实现从运⾏时指定的jobParameters获取绑定的参数,加了该注解以后,⽅法返回不能是ItemReader接⼝,必须是具体的实现类,否则 *
会报错 * * @param time
获取绑定的参数 * @return ItemReader具体的实现类,
没有StepScope可以返回ItemReader, FlatFileItemReader是spring提供的解析⽂件的reader */ @Bean @Bean @StepScope public FlatFileItemReader
设置读取的资源⽂件 ource(new ClassPathResource("")); //
资源⽂件的每⼀⾏映射到实体对象的⽅式 DefaultLineMapper
解析资源⽂件的⽅式,⽤空格分隔每⼀⾏ DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer(" "); //
设置分隔后的数据对应的实体类的名字 es(new String[]{"id", "nickname", "phone"}); //
将解析的数据对应实体类的字段映射 BeanWrapperFieldSetMapper
设置实体 getType(); //
将delimitedLineTokenizer和fieldMapper设置到lineMapper eTokenizer(delimitedLineTokenizer); ldSetMapper(fieldMapper); // lineMapper设置到itemReader eMapper(lineMapper); return itemReader; } @Bean public ItemWriter
设置sql参数的提供是bean的属性 mSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); // sql
参数使⽤:的形式绑定对应的bean属性 String sql = "insert into user values(:id,:nickname,:phone)"; (sql); aSource(dataSource); return writer; } @Bean public JobRepository cvsJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { //
配置执⾏过程中产⽣的数据的存储位置,使⽤mysql存储 JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); abaseType("mysql"); nsactionManager(transactionManager); aSource(dataSource); return ect(); } @Bean public SimpleJobLauncher csvJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { // job的启动对象 SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); //
设置jobRepository Repository(cvsJobRepository(dataSource, transactionManager)); return jobLauncher; } @Bean public Job importJob(JobBuilderFactory jobs, Step step) { // job的名字 return ("importCsvJob") //
每次执⾏的id .incrementer(new RunIdIncrementer()) //
执⾏的步骤 .flow(step) .end() .end() //
监听器 .listener(csvJobListener()) .build(); } @Bean public JobListener csvJobListener() { //
⾃定义的监听器 return new JobListener(); } @Bean public Step step(StepBuilderFactory stepBuilderFactory, ItemReader
名字 .get("step") // Chunk的机制(即每次读取⼀条数据,再处理⼀条数据,累积到⼀定数量后再⼀次性交给writer进⾏写⼊操作) .
对应实体@Getter@Setterpublic class User { private Integer id; private String nickname; private String phone;}// 3 aaaa 4 3344 4四⼂测试代码@SpringBootTestclass BatchApplicationTests { //
注⼊执⾏器 @Autowired SimpleJobLauncher jobLauncher; //
注⼊执⾏job @Autowired Job job; @Test void contextLoads() { //
执⾏的参数,通过@StepScope获取 JobParameters parameters = new JobParametersBuilder().addLong("time", tTimeMillis()).toJobParameters(); try { //
执⾏批处理 (job, parameters); } catch (JobExecutionAlreadyRunningException e) { tackTrace(); } catch (JobRestartException e) { tackTrace(); } catch (JobInstanceAlreadyCompleteException e) { tackTrace(); } catch (JobParametersInvalidException e) { tackTrace(); } }初步实现了Spring Batch,这个框架还提供了很多功能,还需要进⼀步学习
2023年6月21日发(作者:)
SpringBatch实现批处理本⽂实现解析csv格式⽂件,插⼊数据到数据库,Spring Batch概念请⾃⾏查找⼀⼂Spring Batch配置spring: batch: job: #
⾃动执⾏批处理 enabled: false jdbc: #
是否执⾏sql初始化脚本,always每次启动都执⾏sql脚本,表存在的时候会报错,程序中把异常隐藏了,debug⽇志可见 initialize-schema: always #
数据库连接配置 datasource: url: jdbc:mysql://localhost:3306/batch?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false username: root password: hikari: pool-name: testPool maximum-pool-size: 10 minimumIdle: 5 idle-timeout: 300000 max-lifetime: 600000 connection-timeout: 30000⼆⼂监听器@Slf4jpublic class JobListener implements JobExecutionListener { private long startTime; private long endTime; @Override public void beforeJob(JobExecution jobExecution) { //
批处理任务执⾏前执⾏代码 startTime = tTimeMillis(); ("job process start"); } @Override public void afterJob(JobExecution jobExecution) { //
执⾏后代码 endTime = tTimeMillis(); ("job process end, time:{}", endTime - startTime); }}三⼂批处理执⾏配置@Slf4j@Configurationpublic class DataBatchConfig { /** * StepScope:实现从运⾏时指定的jobParameters获取绑定的参数,加了该注解以后,⽅法返回不能是ItemReader接⼝,必须是具体的实现类,否则 *
会报错 * * @param time
获取绑定的参数 * @return ItemReader具体的实现类,
没有StepScope可以返回ItemReader, FlatFileItemReader是spring提供的解析⽂件的reader */ @Bean @Bean @StepScope public FlatFileItemReader
设置读取的资源⽂件 ource(new ClassPathResource("")); //
资源⽂件的每⼀⾏映射到实体对象的⽅式 DefaultLineMapper
解析资源⽂件的⽅式,⽤空格分隔每⼀⾏ DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer(" "); //
设置分隔后的数据对应的实体类的名字 es(new String[]{"id", "nickname", "phone"}); //
将解析的数据对应实体类的字段映射 BeanWrapperFieldSetMapper
设置实体 getType(); //
将delimitedLineTokenizer和fieldMapper设置到lineMapper eTokenizer(delimitedLineTokenizer); ldSetMapper(fieldMapper); // lineMapper设置到itemReader eMapper(lineMapper); return itemReader; } @Bean public ItemWriter
设置sql参数的提供是bean的属性 mSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); // sql
参数使⽤:的形式绑定对应的bean属性 String sql = "insert into user values(:id,:nickname,:phone)"; (sql); aSource(dataSource); return writer; } @Bean public JobRepository cvsJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { //
配置执⾏过程中产⽣的数据的存储位置,使⽤mysql存储 JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); abaseType("mysql"); nsactionManager(transactionManager); aSource(dataSource); return ect(); } @Bean public SimpleJobLauncher csvJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { // job的启动对象 SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); //
设置jobRepository Repository(cvsJobRepository(dataSource, transactionManager)); return jobLauncher; } @Bean public Job importJob(JobBuilderFactory jobs, Step step) { // job的名字 return ("importCsvJob") //
每次执⾏的id .incrementer(new RunIdIncrementer()) //
执⾏的步骤 .flow(step) .end() .end() //
监听器 .listener(csvJobListener()) .build(); } @Bean public JobListener csvJobListener() { //
⾃定义的监听器 return new JobListener(); } @Bean public Step step(StepBuilderFactory stepBuilderFactory, ItemReader
名字 .get("step") // Chunk的机制(即每次读取⼀条数据,再处理⼀条数据,累积到⼀定数量后再⼀次性交给writer进⾏写⼊操作) .
对应实体@Getter@Setterpublic class User { private Integer id; private String nickname; private String phone;}// 3 aaaa 4 3344 4四⼂测试代码@SpringBootTestclass BatchApplicationTests { //
注⼊执⾏器 @Autowired SimpleJobLauncher jobLauncher; //
注⼊执⾏job @Autowired Job job; @Test void contextLoads() { //
执⾏的参数,通过@StepScope获取 JobParameters parameters = new JobParametersBuilder().addLong("time", tTimeMillis()).toJobParameters(); try { //
执⾏批处理 (job, parameters); } catch (JobExecutionAlreadyRunningException e) { tackTrace(); } catch (JobRestartException e) { tackTrace(); } catch (JobInstanceAlreadyCompleteException e) { tackTrace(); } catch (JobParametersInvalidException e) { tackTrace(); } }初步实现了Spring Batch,这个框架还提供了很多功能,还需要进⼀步学习
发布评论