public interface ItemStream {
// step执行之前执行
void open(ExecutionContext var1) throws ItemStreamException;
// 成功处理每一批chunk之后执行
void update(ExecutionContext var1) throws ItemStreamException;
// 整个step执行完之后才会执行
void close() throws ItemStreamException;
}
一:user.csv
1,monday,10,上海市,浦东新区
2,zhangsan,20,北京市,朝阳区
3,xxx,30,深圳市,宝安区
4,wangwu,31,上海市,浦东新区
5,huihui,32,上海市,浦东新区
二:ItemStreamReader
/**
* 异常处理和重启
*/
@Component
public class MyItemReader implements ItemStreamReader<UserInfo> {
private FlatFileItemReader<UserInfo> flatFileItemReader = new FlatFileItemReader();
private int currentLine = 0;
// 是否允许重启
private boolean restart = false;
private ExecutionContext executionContext;
public MyItemReader() {
flatFileItemReader.setResource(new ClassPathResource("static/user.csv"));
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("id", "username", "age", "city", "area");
BeanWrapperFieldSetMapper<UserInfo> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
fieldSetMapper.setTargetType(UserInfo.class);
DefaultLineMapper<UserInfo> defaultLineMapper = new DefaultLineMapper<>();
defaultLineMapper.setLineTokenizer(tokenizer);
defaultLineMapper.setFieldSetMapper(fieldSetMapper);
defaultLineMapper.afterPropertiesSet();
flatFileItemReader.setLineMapper(defaultLineMapper);
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
this.executionContext = executionContext;
if (executionContext.containsKey("currentLine")) {
// 6.作业第二次执行
this.currentLine = executionContext.getInt("currentLine");
this.restart = true;
} else {
// 1.作业第一次先执行
this.currentLine = 0;
executionContext.put("currentLine", this.currentLine);
System.err.println("start reading from line:" + this.currentLine + 1);
}
System.err.println("step执行之前");
}
@Override
public UserInfo read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
UserInfo userInfo = null;
this.currentLine++;
if (restart) {
// 7.作业第二次执行:跳过成功的行
flatFileItemReader.setLinesToSkip(this.currentLine - 1);
restart = false;
System.err.println("restart from line:" + this.currentLine);
}
// 2.作业第一次执行读数据
// 8.作业第二次执行读(从成功的行开始读)
flatFileItemReader.open(this.executionContext);
userInfo = flatFileItemReader.read();
if (userInfo != null && userInfo.getUsername().equals("xxx")) {
// 4.作业第一次 下一轮Chunk遇到xxx报错
throw new RuntimeException("read userinfo id=" + userInfo.getId() + " exception");
}
return userInfo;
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
// 3.读完一轮chunkSize后执行
// 4.读第二轮ChunkSize后执行
executionContext.put("currentLine", this.currentLine);
System.err.println("读完一个ChunkSize执行,读的过程中报错也会执行");
}
@Override
public void close() throws ItemStreamException {
// 5.步骤执行结束执行
System.err.println("Step最后执行");
}
}
三:config
@Configuration
public class RestartJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private MyItemReader myItemReader;
@Autowired
private MyItemWriter myItemWriter;
@Bean
public Job restartJob() {
return jobBuilderFactory.get("myRestartJob2")
.start(restartStep())
.build();
}
@Bean
public Step restartStep() {
return stepBuilderFactory.get("restartStep")
.<UserInfo, UserInfo>chunk(2)
.reader(myItemReader)
.writer(myItemWriter)
.build();
}
}
四:controller
@RestController
@RequestMapping("/job")
public class JobController {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job restartJob;
@RequestMapping("/start")
public ExitStatus start() throws Exception {
JobExecution jobExecution = jobLauncher.run(restartJob, new JobParameters());
return jobExecution.getExitStatus();
}
}
第一次执行完,将第3条数据的用户名xxx随便改成其它值,然后重启服务,尝试第二次继续执行。