Spring Batch异步处理器-AsyncItemProcessor无效,启动时获得作业状态为COMPLETED

我正在尝试实现一个简单的spring批处理作业

目标是使用JpaItemReader在数据库上执行选择查询,并以异步方式处理所有itens

我遵循此示例-> https://github.com/mminella/scaling-demos/blob/master/single-jvm-demos/src/main/java/io/spring/batch/scalingdemos/asyncprocessor/AsyncProcessorJobApplication.java

如果我在单个线程中运行此代码,并且效果很好

但是当我添加异步过程时,看起来像工作 无需等待Process()的执行即可将状态更改为COMPLETED

我可能会错过此脚本中的某些内容吗?

这是控制台的结果:

2019-11-07 10:15:10.672 [main] T: INFO  o.s.batch.core.job.SimpleStepHandler - Step already complete or not restartable,so no action to execute: StepExecution: id=92,version=21,name=step1,status=COMPLETED,exitStatus=COMPLETED,readCount=1872,filterCount=0,writeCount=1872 readSkipCount=0,writeSkipCount=0,processSkipCount=0,commitCount=19,rollbackCount=0,exitDescription= 
2019-11-07 10:15:10.693 [main] T: INFO  c.s.g.G.JobCompletionNotificationListener - !!! JOB FINISHED! Time to verify the results 
2019-11-07 10:15:10.702 [main] T: INFO  o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=reCalculate]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 74ms 
201


>** Here is my Class**


import javax.persistence.EntityManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import com.batch.poc.common.Position;



@Component
public class PositionRecalculationBatchExecutorAsync {

    private static final Logger log = LoggerFactory.getLogger(PositionRecalculationBatchExecutorAsync.class);

    private static final int BATCH_SIZE = 450;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired public EntityManagerFactory emf;


    @Bean
    @StepScope
    public   JpaPagingItemReader<Position> read( ) throws Exception{
        JpaPagingItemReader<Position> positionBatchReader = new  JpaPagingItemReader<>();

        positionBatchReader.setEntityManagerFactory(emf);

        StringBuilder query = new StringBuilder("SELECT ")
                .append(" pos ")
                .append(" FROM ")
                .append(" Position pos");


        positionBatchReader.setQueryString( query.toString() );
        positionBatchReader.setPageSize(400);
        positionBatchReader.setSaveState(false);
        positionBatchReader.afterPropertiesSet();
        return positionBatchReader;

    }


    @Bean
    @StepScope
    public  JpaItemWriter<Position> update() {
        //Not in Use
        JpaItemWriter<Position>  positionBatchWriter = new JpaItemWriter<> ();
        return positionBatchWriter;
    }

    @Bean public Job reCalculate(JobCompletionNotificationListener listener) throws Exception { 
        return this.jobBuilderFactory.get("reCalculate") 
                .start(step1() )
                .listener(listener)
                .build(); 
    }

    @Bean public PositionItemProcessor processor() {
        return new PositionItemProcessor(); 
    }



    @Bean 
    public AsyncItemProcessor<Position,Position>  asyncItemProcessor() throws Exception {
        AsyncItemProcessor<Position,Position> p =  new AsyncItemProcessor<>();
        p.setDelegate(processor());
        p.setTaskExecutor(new SimpleAsyncTaskExecutor());
        p.afterPropertiesSet();
        return p;

    }

    @Bean 
    @StepScope
    public AsyncItemWriter<Position> asyncItemWriter(){
        log.info("!!! JOB asyncItemWriter!!");
        AsyncItemWriter<Position> writer = new AsyncItemWriter<>();
        writer.setDelegate(this.update());
        return writer;
    }

    @Bean
    public Step step1() throws Exception {
        log.info("!!!  step1 !!");

        return this.stepBuilderFactory.get("step1")
                .<Position,Position> chunk(100)
                .reader(read())
                .processor(  (ItemProcessor) asyncItemProcessor() )
                .writer( asyncItemWriter() )
                .build();
    }


}
wwwlll47 回答:Spring Batch异步处理器-AsyncItemProcessor无效,启动时获得作业状态为COMPLETED

  

但是当我添加异步过程时,看起来工作将状态更改为COMPLETED

您似乎第一次使用常规处理器执行了该作业(此处已执行并完成了该步骤),然后在添加了异步处理器之后运行了相同的作业(此处该步骤已完成,因此显示了消息{ {1}}。

即使您想重新运行,也可以在步骤定义上设置Step already complete or not restartable,so no action to execute标志:

allowStartIfComplete
本文链接:https://www.f2er.com/3145634.html

大家都在问