// Lifecycle callbacks for a stateful step component. Each callback may throw
// ItemStreamException; open and update receive an ExecutionContext.
public interface ItemStream {
    // Executed before the step runs.
    void open(ExecutionContext var1) throws ItemStreamException;
    // Executed after each chunk has been processed successfully.
    void update(ExecutionContext var1) throws ItemStreamException;
    // Executed only once the whole step has finished.
    void close() throws ItemStreamException;
}

SpringBatch从入门到实战(九):ItemStreamReader-LMLPHP

一:user.csv

1,monday,10,上海市,浦东新区
2,zhangsan,20,北京市,朝阳区
3,xxx,30,深圳市,宝安区
4,wangwu,31,上海市,浦东新区
5,huihui,32,上海市,浦东新区

二:ItemStreamReader


/**
 * Restartable reader used to demonstrate failure handling and job restart.
 *
 * <p>Wraps a {@link FlatFileItemReader} over {@code static/user.csv} and stores, under the
 * {@code "currentLine"} key of the {@link ExecutionContext}, how many read attempts have been
 * made so far. When that key is already present at {@link #open(ExecutionContext)} time, that
 * many lines are skipped so reading resumes after them.
 *
 * <p>For demonstration purposes {@link #read()} throws when it meets a record whose username
 * is {@code "xxx"}.
 */
@Component
public class MyItemReader implements ItemStreamReader<UserInfo> {

    /** Execution-context key under which the line counter is stored. */
    private static final String CURRENT_LINE_KEY = "currentLine";

    private final FlatFileItemReader<UserInfo> flatFileItemReader = new FlatFileItemReader<>();
    private int currentLine = 0;
    // Set when open() resumed from a stored counter; cleared by the first read(),
    // which prints the "restart from line" message exactly once.
    private boolean restart = false;

    /**
     * Configures the delegate to parse {@code static/user.csv} into {@link UserInfo} beans
     * using the columns id, username, age, city and area.
     */
    public MyItemReader() {
        flatFileItemReader.setResource(new ClassPathResource("static/user.csv"));
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("id", "username", "age", "city", "area");

        BeanWrapperFieldSetMapper<UserInfo> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(UserInfo.class);

        DefaultLineMapper<UserInfo> defaultLineMapper = new DefaultLineMapper<>();
        defaultLineMapper.setLineTokenizer(tokenizer);
        defaultLineMapper.setFieldSetMapper(fieldSetMapper);
        defaultLineMapper.afterPropertiesSet();

        flatFileItemReader.setLineMapper(defaultLineMapper);
    }

    /**
     * Restores the line counter from the execution context (or initialises it to 0 on a first
     * run) and opens the delegate once, positioned after the lines already counted.
     *
     * @param executionContext context that holds the {@code "currentLine"} counter
     * @throws ItemStreamException if the delegate cannot be opened
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_LINE_KEY)) {
            // 6. Second execution of the job: resume from the stored counter.
            this.currentLine = executionContext.getInt(CURRENT_LINE_KEY);
            this.restart = true;
        } else {
            // 1. First execution of the job: start from the top of the file.
            this.currentLine = 0;
            this.restart = false;
            executionContext.put(CURRENT_LINE_KEY, this.currentLine);
            // Parentheses matter: without them "+ 1" is string concatenation and prints "01".
            System.err.println("start reading from line:" + (this.currentLine + 1));
        }

        // 7. Skip the lines that were already counted (0 on a first run). Setting it on every
        // open also clears a value left over from an earlier run on this singleton.
        flatFileItemReader.setLinesToSkip(this.currentLine);
        // Open the delegate once per step instead of once per read(), so the underlying file
        // reader is not re-created and the file is not re-scanned for every item.
        flatFileItemReader.open(executionContext);
        System.err.println("step执行之前");
    }

    /**
     * Reads the next record from the delegate and advances the line counter.
     *
     * @return the next {@link UserInfo}, or {@code null} when the delegate returns {@code null}
     * @throws RuntimeException when the record's username is {@code "xxx"} (simulated failure)
     * @throws Exception if the delegate fails to read or parse a line
     */
    @Override
    public UserInfo read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        // The counter is advanced before reading, so a failed attempt is counted as well.
        this.currentLine++;

        if (restart) {
            restart = false;
            System.err.println("restart from line:" + this.currentLine);
        }

        // 2. First execution: read data.
        // 8. Second execution: read starting after the skipped lines.
        UserInfo userInfo = flatFileItemReader.read();

        // Constant-first equals avoids a NullPointerException when the username is null.
        if (userInfo != null && "xxx".equals(userInfo.getUsername())) {
            // 4. First execution: a later chunk hits "xxx" and fails.
            throw new RuntimeException("read userinfo id=" + userInfo.getId() + " exception");
        }
        return userInfo;
    }

    /**
     * Stores the current line counter in the execution context.
     *
     * @param executionContext context that receives the {@code "currentLine"} counter
     */
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // 3. Executed after the first chunk has been read.
        // 4. Executed after the second chunk has been read.
        executionContext.put(CURRENT_LINE_KEY, this.currentLine);
        System.err.println("读完一个ChunkSize执行,读的过程中报错也会执行");
    }

    /**
     * Closes the delegate so the underlying file reader is released.
     *
     * @throws ItemStreamException if the delegate fails to close
     */
    @Override
    public void close() throws ItemStreamException {
        // 5. Executed when the step ends.
        flatFileItemReader.close();
        System.err.println("Step最后执行");
    }
}

三:config

/**
 * Declares the restart demo: a job named {@code myRestartJob2} made of the single
 * chunk-oriented step {@code restartStep}, which reads with {@link MyItemReader} and writes
 * with {@link MyItemWriter}.
 */
@Configuration
public class RestartJobConfig {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private MyItemWriter myItemWriter;

    @Autowired
    private MyItemReader myItemReader;

    /**
     * Builds the job, which starts with (and only contains) {@link #restartStep()}.
     *
     * @return the job registered under the name {@code myRestartJob2}
     */
    @Bean
    public Job restartJob() {
        Step onlyStep = restartStep();
        return jobBuilderFactory.get("myRestartJob2").start(onlyStep).build();
    }

    /**
     * Builds the step that moves {@link UserInfo} items from the reader to the writer
     * in chunks of two.
     *
     * @return the step registered under the name {@code restartStep}
     */
    @Bean
    public Step restartStep() {
        final int chunkSize = 2;
        return stepBuilderFactory
                .get("restartStep")
                .<UserInfo, UserInfo>chunk(chunkSize)
                .reader(myItemReader)
                .writer(myItemWriter)
                .build();
    }
}

四:controller

/**
 * HTTP entry point under {@code /job} that launches the injected {@code restartJob}.
 */
@RestController
@RequestMapping("/job")
public class JobController {

    @Autowired
    private Job restartJob;

    @Autowired
    private JobLauncher jobLauncher;

    /**
     * Runs the job with an empty parameter set and reports how the execution ended.
     *
     * @return the exit status of the job execution
     * @throws Exception if the launcher fails to run the job
     */
    @RequestMapping("/start")
    public ExitStatus start() throws Exception {
        // The parameters are deliberately empty and therefore the same on every call.
        JobParameters emptyParameters = new JobParameters();
        return jobLauncher.run(restartJob, emptyParameters).getExitStatus();
    }
}

第一次执行会在读到第3条数据(用户名为xxx)时抛出异常,作业以失败结束。此时将第3条数据的用户名xxx改成任意其它值,然后重启服务并再次访问 /job/start,尝试第二次继续执行:作业会跳过已经成功处理的行,从失败的位置接着读取。
SpringBatch从入门到实战(九):ItemStreamReader-LMLPHP

06-15 17:06