2023年6月21日发(作者:)

Spring实现批处理任务Spring实现批处理任务前⾔本⽂主要讲述基本的批处理任务的解决⽅案。创建业务数据⼿动创建⼀个⽂件来模拟业务数据src/main/resources/tiesJill,DoeJoe,DoeJustin,DoeJane,DoeJohn,Doe编写SQL脚本接下来写⼀个sql脚本创建⼀个表来存储数据src/main/resources/P TABLE people IF EXISTS;CREATE TABLE people ( person_id BIGINT IDENTITY NOT NULL PRIMARY KEY, first_name VARCHAR(20), last_name VARCHAR(20));Spring Boot运⾏模式-@@platform@@ @。sql在启动时⾃动执⾏。-all是所有平台的默认值。创建实体类接下来写⼀个类来表⽰每⾏的数据package ;public class Person { private String lastName; private String firstName; public Person() { } public Person(String firstName, String lastName) { ame = firstName; me = lastName; } public void setFirstName(String firstName) { ame = firstName; } public String getFirstName() { return firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { me = lastName; } @Override public String toString() { return "firstName: " + firstName + ", lastName: " + lastName; }}创建⼀个中间处理器批处理中的⼀个常见范例是获取数据,转换数据,然后将其输出到其他地⽅。这⾥编写⼀个简单的转换器,将名称转换为⼤写。package ;import ;import ;import cutionListener;import ;import BatchProcessing;import lderFactory;import ilderFactory;import ncrementer;import opertyItemSqlParameterSourceProvider;import tchItemWriter;import tchItemWriterBuilder;import leItemReader;import leItemReaderBuilder;import apperFieldSetMapper;import tLineMapper;import tedLineTokenizer;import red;import ;import uration;import athResource;import athResource;import mplate;import urce;@Configuration@EnableBatchProcessingpublic class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Bean public FlatFileItemReader reader() { return new FlatFileItemReaderBuilder() .name("personItemReader") .resource(new ClassPathResource("")) .delimited() .names(new String[]{"firstName", "lastName"}) .fieldSetMapper(new BeanWrapperFieldSetMapper() {{ setTargetType(); }}) .build(); } @Bean public PersonItemProcessor processor() { return new PersonItemProcessor(); } @Bean public JdbcBatchItemWriter writer(DataSource dataSource) { return new JdbcBatchItemWriterBuilder() .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)") .dataSource(dataSource) .build(); } @Bean public Job importUserJob(JobCompletionNotificationListener listener, Step step1) { return ("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1) .end() .build(); } @Bean public Step step1(JdbcBatchItemWriter writer) { return ("step1") . chunk(10) .reader(reader()) .processor(processor()) .writer(writer) .build(); }}  对于初学者来说,@EnableBatchProcessing注释添加了很多任务⽀持的关键bean,并节省了⼤量的辅助⼯作。这个例⼦使⽤⼀个基于内存的数据库(由@EnableBatchProcessing提供),这意味着当它完成时,数据就消失了。第⼀个代码块定义输⼊、处理器和输出。  reader()创建⼀个ItemReader。它查找⼀个名为的⽂件,并解析每个⾏项⽬,其中包含⾜够的信息将其转换为Person。processor()创建前⾯定义的PersonItemProcessor的⼀个实例,⽤于⼤写数据。write(DataSource)创建⼀个ItemWriter。这⼀个针对JDBC⽬标,并⾃动获取由@EnableBatchProcessing创建的数据源的副本。它包括插⼊由Java bean属性驱动的单个⼈所需的SQL语句。 @Bean public Job importUserJob(JobCompletionNotificationListener listener, Step step1) { return ("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1) .end() .build(); } @Bean public Step step1(JdbcBatchItemWriter writer) { return ("step1") . chunk(10) .reader(reader()) .processor(processor()) .writer(writer) .build(); }第⼀个⽅法定义作业,第⼆个⽅法定义单个步骤。作业是由步骤构建的,其中每个步骤都包含⼀个读取器、⼀个处理器和⼀个写⼊器。在这个作业定义中,你需要⼀个递增器,因为作业使⽤数据库来维护执⾏状态。然后列出每个步骤,其中这个作业只有⼀个步骤。作业结束,Java API⽣成⼀个配置完美的作业。在步骤定义中,定义⼀次要写多少数据。在本例中,它⼀次写⼊多达10条记录。接下来,使⽤前⾯注⼊的位配置读取器、处理器和写⼊器。package ;import ;import ;import Factory;import tatus;import cution;import cutionListenerSupport;import red;import mplate;import ent;@Componentpublic class JobCompletionNotificationListener extends JobExecutionListenerSupport { private static final Logger log = ger(); private final JdbcTemplate jdbcTemplate; @Autowired public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) { mplate = jdbcTemplate; } @Override public void afterJob(JobExecution jobExecution) { if(tus() == TED) { (" JOB FINISHED! Time to verify the results"); ("SELECT first_name, last_name FROM people", (rs, row) -> new Person( ing(1), ing(2)) ).forEach(person -> ("Found <" + person + "> in the database.")); } }}

2023年6月21日发(作者:)

Spring实现批处理任务Spring实现批处理任务前⾔本⽂主要讲述基本的批处理任务的解决⽅案。创建业务数据⼿动创建⼀个⽂件来模拟业务数据src/main/resources/tiesJill,DoeJoe,DoeJustin,DoeJane,DoeJohn,Doe编写SQL脚本接下来写⼀个sql脚本创建⼀个表来存储数据src/main/resources/P TABLE people IF EXISTS;CREATE TABLE people ( person_id BIGINT IDENTITY NOT NULL PRIMARY KEY, first_name VARCHAR(20), last_name VARCHAR(20));Spring Boot运⾏模式-@@platform@@ @。sql在启动时⾃动执⾏。-all是所有平台的默认值。创建实体类接下来写⼀个类来表⽰每⾏的数据package ;public class Person { private String lastName; private String firstName; public Person() { } public Person(String firstName, String lastName) { ame = firstName; me = lastName; } public void setFirstName(String firstName) { ame = firstName; } public String getFirstName() { return firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { me = lastName; } @Override public String toString() { return "firstName: " + firstName + ", lastName: " + lastName; }}创建⼀个中间处理器批处理中的⼀个常见范例是获取数据,转换数据,然后将其输出到其他地⽅。这⾥编写⼀个简单的转换器,将名称转换为⼤写。package ;import ;import ;import cutionListener;import ;import BatchProcessing;import lderFactory;import ilderFactory;import ncrementer;import opertyItemSqlParameterSourceProvider;import tchItemWriter;import tchItemWriterBuilder;import leItemReader;import leItemReaderBuilder;import apperFieldSetMapper;import tLineMapper;import tedLineTokenizer;import red;import ;import uration;import athResource;import athResource;import mplate;import urce;@Configuration@EnableBatchProcessingpublic class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Bean public FlatFileItemReader reader() { return new FlatFileItemReaderBuilder() .name("personItemReader") .resource(new ClassPathResource("")) .delimited() .names(new String[]{"firstName", "lastName"}) .fieldSetMapper(new BeanWrapperFieldSetMapper() {{ setTargetType(); }}) .build(); } @Bean public PersonItemProcessor processor() { return new PersonItemProcessor(); } @Bean public JdbcBatchItemWriter writer(DataSource dataSource) { return new JdbcBatchItemWriterBuilder() .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)") .dataSource(dataSource) .build(); } @Bean public Job importUserJob(JobCompletionNotificationListener listener, Step step1) { return ("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1) .end() .build(); } @Bean public Step step1(JdbcBatchItemWriter writer) { return ("step1") . chunk(10) .reader(reader()) .processor(processor()) .writer(writer) .build(); }}  对于初学者来说,@EnableBatchProcessing注释添加了很多任务⽀持的关键bean,并节省了⼤量的辅助⼯作。这个例⼦使⽤⼀个基于内存的数据库(由@EnableBatchProcessing提供),这意味着当它完成时,数据就消失了。第⼀个代码块定义输⼊、处理器和输出。  reader()创建⼀个ItemReader。它查找⼀个名为的⽂件,并解析每个⾏项⽬,其中包含⾜够的信息将其转换为Person。processor()创建前⾯定义的PersonItemProcessor的⼀个实例,⽤于⼤写数据。write(DataSource)创建⼀个ItemWriter。这⼀个针对JDBC⽬标,并⾃动获取由@EnableBatchProcessing创建的数据源的副本。它包括插⼊由Java bean属性驱动的单个⼈所需的SQL语句。 @Bean public Job importUserJob(JobCompletionNotificationListener listener, Step step1) { return ("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1) .end() .build(); } @Bean public Step step1(JdbcBatchItemWriter writer) { return ("step1") . chunk(10) .reader(reader()) .processor(processor()) .writer(writer) .build(); }第⼀个⽅法定义作业,第⼆个⽅法定义单个步骤。作业是由步骤构建的,其中每个步骤都包含⼀个读取器、⼀个处理器和⼀个写⼊器。在这个作业定义中,你需要⼀个递增器,因为作业使⽤数据库来维护执⾏状态。然后列出每个步骤,其中这个作业只有⼀个步骤。作业结束,Java API⽣成⼀个配置完美的作业。在步骤定义中,定义⼀次要写多少数据。在本例中,它⼀次写⼊多达10条记录。接下来,使⽤前⾯注⼊的位配置读取器、处理器和写⼊器。package ;import ;import ;import Factory;import tatus;import cution;import cutionListenerSupport;import red;import mplate;import ent;@Componentpublic class JobCompletionNotificationListener extends JobExecutionListenerSupport { private static final Logger log = ger(); private final JdbcTemplate jdbcTemplate; @Autowired public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) { mplate = jdbcTemplate; } @Override public void afterJob(JobExecution jobExecution) { if(tus() == TED) { (" JOB FINISHED! Time to verify the results"); ("SELECT first_name, last_name FROM people", (rs, row) -> new Person( ing(1), ing(2)) ).forEach(person -> ("Found <" + person + "> in the database.")); } }}