2023年6月21日发(作者:)

SpringBatch读取txt⽂件并写⼊数据库的⽅法教程项⽬需求近⽇需要实现⽤户推荐相关的功能,也就是说向⽤户推荐他可能喜欢的东西。我们的数据分析⼯程师会将⽤户以及⽤户可能喜欢的东西整理成⽂档给我,我只需要将数据从⽂档中读取出来,然后对数据进⾏进⼀步的清洗(例如去掉特殊符号,长度如果太长则截取)。然后将处理后的数据存⼊数据库(Mysql)。所以分为三步:读取⽂档获得数据对获得的数据进⾏处理更新数据库(新增或更新)考虑到这个数据量以后会越来越⼤,这⾥没有使⽤ poi 来读取数据,⽽直接使⽤了 SpringBatch。实现步骤本⽂假设读者已经能够使⽤ SpringBoot 连接处理 Mysql,所以这部分⽂中会省略。1、创建 Maven 项⽬,并在 中添加依赖 spring-boot-starter-parent E 1.8 spring-boot-starter-batch spring-boot-starter-data-jpa spring-boot-starter-test test mybatis-spring-boot-starter 1.2.0 tlombok lombok 1.12.6 s commons-lang3 3.4 mysql mysql-connector-java runtime a druid 1.0.26 spring-boot-starter-web 这⾥是这个⼩项⽬中⽤到的所有依赖,包括连接数据库的依赖以及⼯具类等。2、编写 Model 类我们要从⽂档中读取的有效列就是 uid,tag,type,就是⽤户 ID,⽤户可能包含的标签(⽤于推送),⽤户类别(⽤户⽤户之间互相推荐)。 中的 @Entity,@Column 注解,是为了利⽤ JPA ⽣成数据表⽽写的,可要可不要。@Data@EqualsAndHashCode@NoArgsConstructor@AllArgsConstructor//@Entity(name = "user_map")public class UserMap extends BaseModel { @Column(name = "uid", unique = true, nullable = false) private Long uid; @Column(name = "tag") private String tag; @Column(name = "type") private Integer type;}3、实现批处理配置类@Configuration@EnableBatchProcessingpublic class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("prodDataSource") DataSource prodDataSource; @Bean public FlatFileItemReader reader() { FlatFileItemReader reader = new FlatFileItemReader<>(); ource(new ClassPathResource("")); eMapper(new DefaultLineMapper() {{ setLineTokenizer(new DelimitedLineTokenizer("|") {{ setNames(new String[]{"uid", "tag", "type"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper() {{ setTargetType(); }}); }}); return reader; } @Bean public JdbcBatchItemWriter importWriter() { JdbcBatchItemWriter writer = new JdbcBatchItemWriter<>(); mSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); ("INSERT INTO user_map (uid,tag,type) VALUES (:uid, :tag,:type)"); aSource(prodDataSource); return writer; } @Bean public JdbcBatchItemWriter updateWriter() { JdbcBatchItemWriter writer = new JdbcBatchItemWriter<>(); mSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); ("UPDATE user_map SET type = (:type),tag = (:tag) WHERE uid = (:uid)"); aSource(prodDataSource); return writer; } @Bean public UserMapItemProcessor processor(sStatus processStatus) { return new UserMapItemProcessor(processStatus); } @Bean public Job importUserJob(JobCompletionNotificationListener listener) { return ("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(importStep()) .end() .build(); } @Bean public Step importStep() { return ("importStep") .chunk(100) .reader(reader()) .processor(processor(IMPORT)) .writer(importWriter()) .build(); } @Bean public Job updateUserJob(JobCompletionNotificationListener listener) { return ("updateUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(updateStep()) .end() .build(); } @Bean public Step updateStep() { return ("updateStep") .chunk(100) .reader(reader()) .processor(processor(UPDATE)) .writer(updateWriter()) .build(); }}prodDataSource 是假设⽤户已经设置好的,如果不知道怎么配置,也可以参考之前的⽂章进⾏配置:。reader(),这⽅法从⽂件中读取数据,并且设置了⼀些必要的参数。紧接着是写操作

importWriter() 和

updateWriter()

,读者看其中⼀个就好,因为我这⾥是需要更新或者修改的,所以分为两个。processor(ProcessStatus status) ,该⽅法是对我们处理数据的类进⾏实例化,这⾥我根据 status 是 IMPORT 还是 UPDATE 来获取不同的处理结果。其他的看代码就可以看懂了,哈哈,不详细说了。4、将获得的数据进⾏清洗blic class UserMapItemProcessor implements ItemProcessor { private static final int MAX_TAG_LENGTH = 200;

private ProcessStatus processStatus; public UserMapItemProcessor(ProcessStatus processStatus) { sStatus = processStatus; } @Autowired IUserMapService userMapService; private static final String TAG_PATTERN_STR = "^[a-zA-Z0-9u4E00-u9FA5_-]+$"; public static final Pattern TAG_PATTERN = e(TAG_PATTERN_STR); private static final Logger LOG = ger(); @Override public UserMap process(UserMap userMap) throws Exception { Long uid = (); String tag = cleanTag(()); Integer label = e() == null ? f(0) : e(); if (lank(tag)) { Map params = new HashMap<>(); ("uid", uid); UserMap userMapFromDB = One(params); if (userMapFromDB == null) { if (sStatus == ) { return new UserMap(uid, tag, label); } } else { if (sStatus == ) { if (!(()) && !(e())) { e(label); (tag); return userMapFromDB; } } } } return null; } /** * 清洗标签 * * @param tag * @return */ private static String cleanTag(String tag) { if (lank(tag)) { try { tag = ing(f("{") + 1, dexOf("}")); String[] tagArray = (","); Optional reduce = (tagArray).parallel() .map(str -> (":")[0]) .map(str -> eAll("'", "")) .map(str -> eAll(" ", "")) .filter(str -> TAG_r(str).matches()) .reduce((x, y) -> x + "," + y); Function str = (s -> () > MAX_TAG_LENGTH ? ing(0, MAX_TAG_LENGTH) : s); return (()); } catch (Exception e) { (sage(), e); } } return null; } protected enum ProcessStatus { IMPORT, UPDATE; } public static void main(String[] args) { String distinctTag = cleanTag("Counter({'《重新定义》': 3, '轻想上的轻⼩说': 3, '⼩说': 2, 'Fate': 2, '同⼈⼩说': 2, '雪狼⼋组': 1, " + "'社会': 1, '⼈⽂': 1, '短篇': 1, '重新定义': 1, 'AMV': 1, '《FBD》': 1, '《雪狼六组》': 1, '战争': 1, '《灰⽻联盟》': 1, " + "'谁说轻想没⼈写⼩说': 1})"); n(distinctTag); }}读取到的数据格式如

main()

⽅法所⽰,清理之后的结果如:轻想上的轻⼩说,⼩说,Fate,同⼈⼩说,雪狼⼋组,社会,⼈⽂,短篇,重新定义,AMV,战争,谁说轻想没⼈写⼩说 。去掉了特殊符号以及数字等。使⽤了 Java8 的 Lambda 表达式。并且这⾥在处理的时候,判断如果该数据⽤户已经存在,则进⾏更新,如果不存在,则新增。5、Job 执⾏结束回调类@Componentpublic class JobCompletionNotificationListener extends JobExecutionListenerSupport { private static final Logger log = ger(); private final JdbcTemplate jdbcTemplate; @Autowired public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) { mplate = jdbcTemplate; } @Override public void afterJob(JobExecution jobExecution) { n("end ....."); }}具体的逻辑可⾃⾏实现。完成以上⼏个步骤,运⾏项⽬,就可以读取并写⼊数据到数据库了。总结以上就是这篇⽂章的全部内容了,希望本⽂的内容对⼤家学习或者⼯作能带来⼀定的帮助,如果有疑问⼤家可以留⾔交流,谢谢⼤家对的⽀持。

2023年6月21日发(作者:)

SpringBatch读取txt⽂件并写⼊数据库的⽅法教程项⽬需求近⽇需要实现⽤户推荐相关的功能,也就是说向⽤户推荐他可能喜欢的东西。我们的数据分析⼯程师会将⽤户以及⽤户可能喜欢的东西整理成⽂档给我,我只需要将数据从⽂档中读取出来,然后对数据进⾏进⼀步的清洗(例如去掉特殊符号,长度如果太长则截取)。然后将处理后的数据存⼊数据库(Mysql)。所以分为三步:读取⽂档获得数据对获得的数据进⾏处理更新数据库(新增或更新)考虑到这个数据量以后会越来越⼤,这⾥没有使⽤ poi 来读取数据,⽽直接使⽤了 SpringBatch。实现步骤本⽂假设读者已经能够使⽤ SpringBoot 连接处理 Mysql,所以这部分⽂中会省略。1、创建 Maven 项⽬,并在 中添加依赖 spring-boot-starter-parent E 1.8 spring-boot-starter-batch spring-boot-starter-data-jpa spring-boot-starter-test test mybatis-spring-boot-starter 1.2.0 tlombok lombok 1.12.6 s commons-lang3 3.4 mysql mysql-connector-java runtime a druid 1.0.26 spring-boot-starter-web 这⾥是这个⼩项⽬中⽤到的所有依赖,包括连接数据库的依赖以及⼯具类等。2、编写 Model 类我们要从⽂档中读取的有效列就是 uid,tag,type,就是⽤户 ID,⽤户可能包含的标签(⽤于推送),⽤户类别(⽤户⽤户之间互相推荐)。 中的 @Entity,@Column 注解,是为了利⽤ JPA ⽣成数据表⽽写的,可要可不要。@Data@EqualsAndHashCode@NoArgsConstructor@AllArgsConstructor//@Entity(name = "user_map")public class UserMap extends BaseModel { @Column(name = "uid", unique = true, nullable = false) private Long uid; @Column(name = "tag") private String tag; @Column(name = "type") private Integer type;}3、实现批处理配置类@Configuration@EnableBatchProcessingpublic class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("prodDataSource") DataSource prodDataSource; @Bean public FlatFileItemReader reader() { FlatFileItemReader reader = new FlatFileItemReader<>(); ource(new ClassPathResource("")); eMapper(new DefaultLineMapper() {{ setLineTokenizer(new DelimitedLineTokenizer("|") {{ setNames(new String[]{"uid", "tag", "type"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper() {{ setTargetType(); }}); }}); return reader; } @Bean public JdbcBatchItemWriter importWriter() { JdbcBatchItemWriter writer = new JdbcBatchItemWriter<>(); mSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); ("INSERT INTO user_map (uid,tag,type) VALUES (:uid, :tag,:type)"); aSource(prodDataSource); return writer; } @Bean public JdbcBatchItemWriter updateWriter() { JdbcBatchItemWriter writer = new JdbcBatchItemWriter<>(); mSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); ("UPDATE user_map SET type = (:type),tag = (:tag) WHERE uid = (:uid)"); aSource(prodDataSource); return writer; } @Bean public UserMapItemProcessor processor(sStatus processStatus) { return new UserMapItemProcessor(processStatus); } @Bean public Job importUserJob(JobCompletionNotificationListener listener) { return ("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(importStep()) .end() .build(); } @Bean public Step importStep() { return ("importStep") .chunk(100) .reader(reader()) .processor(processor(IMPORT)) .writer(importWriter()) .build(); } @Bean public Job updateUserJob(JobCompletionNotificationListener listener) { return ("updateUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(updateStep()) .end() .build(); } @Bean public Step updateStep() { return ("updateStep") .chunk(100) .reader(reader()) .processor(processor(UPDATE)) .writer(updateWriter()) .build(); }}prodDataSource 是假设⽤户已经设置好的,如果不知道怎么配置,也可以参考之前的⽂章进⾏配置:。reader(),这⽅法从⽂件中读取数据,并且设置了⼀些必要的参数。紧接着是写操作

importWriter() 和

updateWriter()

,读者看其中⼀个就好,因为我这⾥是需要更新或者修改的,所以分为两个。processor(ProcessStatus status) ,该⽅法是对我们处理数据的类进⾏实例化,这⾥我根据 status 是 IMPORT 还是 UPDATE 来获取不同的处理结果。其他的看代码就可以看懂了,哈哈,不详细说了。4、将获得的数据进⾏清洗blic class UserMapItemProcessor implements ItemProcessor { private static final int MAX_TAG_LENGTH = 200;

private ProcessStatus processStatus; public UserMapItemProcessor(ProcessStatus processStatus) { sStatus = processStatus; } @Autowired IUserMapService userMapService; private static final String TAG_PATTERN_STR = "^[a-zA-Z0-9u4E00-u9FA5_-]+$"; public static final Pattern TAG_PATTERN = e(TAG_PATTERN_STR); private static final Logger LOG = ger(); @Override public UserMap process(UserMap userMap) throws Exception { Long uid = (); String tag = cleanTag(()); Integer label = e() == null ? f(0) : e(); if (lank(tag)) { Map params = new HashMap<>(); ("uid", uid); UserMap userMapFromDB = One(params); if (userMapFromDB == null) { if (sStatus == ) { return new UserMap(uid, tag, label); } } else { if (sStatus == ) { if (!(()) && !(e())) { e(label); (tag); return userMapFromDB; } } } } return null; } /** * 清洗标签 * * @param tag * @return */ private static String cleanTag(String tag) { if (lank(tag)) { try { tag = ing(f("{") + 1, dexOf("}")); String[] tagArray = (","); Optional reduce = (tagArray).parallel() .map(str -> (":")[0]) .map(str -> eAll("'", "")) .map(str -> eAll(" ", "")) .filter(str -> TAG_r(str).matches()) .reduce((x, y) -> x + "," + y); Function str = (s -> () > MAX_TAG_LENGTH ? ing(0, MAX_TAG_LENGTH) : s); return (()); } catch (Exception e) { (sage(), e); } } return null; } protected enum ProcessStatus { IMPORT, UPDATE; } public static void main(String[] args) { String distinctTag = cleanTag("Counter({'《重新定义》': 3, '轻想上的轻⼩说': 3, '⼩说': 2, 'Fate': 2, '同⼈⼩说': 2, '雪狼⼋组': 1, " + "'社会': 1, '⼈⽂': 1, '短篇': 1, '重新定义': 1, 'AMV': 1, '《FBD》': 1, '《雪狼六组》': 1, '战争': 1, '《灰⽻联盟》': 1, " + "'谁说轻想没⼈写⼩说': 1})"); n(distinctTag); }}读取到的数据格式如

main()

⽅法所⽰,清理之后的结果如:轻想上的轻⼩说,⼩说,Fate,同⼈⼩说,雪狼⼋组,社会,⼈⽂,短篇,重新定义,AMV,战争,谁说轻想没⼈写⼩说 。去掉了特殊符号以及数字等。使⽤了 Java8 的 Lambda 表达式。并且这⾥在处理的时候,判断如果该数据⽤户已经存在,则进⾏更新,如果不存在,则新增。5、Job 执⾏结束回调类@Componentpublic class JobCompletionNotificationListener extends JobExecutionListenerSupport { private static final Logger log = ger(); private final JdbcTemplate jdbcTemplate; @Autowired public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) { mplate = jdbcTemplate; } @Override public void afterJob(JobExecution jobExecution) { n("end ....."); }}具体的逻辑可⾃⾏实现。完成以上⼏个步骤,运⾏项⽬,就可以读取并写⼊数据到数据库了。总结以上就是这篇⽂章的全部内容了,希望本⽂的内容对⼤家学习或者⼯作能带来⼀定的帮助,如果有疑问⼤家可以留⾔交流,谢谢⼤家对的⽀持。