IT/development

[springBoot] spring batch AsyncProcess/Writer

알 수 없는 사용자 2023. 11. 23. 16:48
반응형

목차

    아래 포스팅에서 이어진 내용입니다.

     

    [springBoot] spring batch multiThread process(feat. taskExecutor)

    목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch job/stepExecutionListener 목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch 사용자 정의 ExitStatus 목차 아래 포스팅에서 이어

    yaga.tistory.com


    동기 Step 안에서 processor와 writer를 비동기로 실행하는 예제(속도만 비교)

    비동기 프로세서는 실제 작업은 프로세서에게 위임하고 taskExecutor로 스레드 할당

    비동기 라이터는 비동기 프로세서의 실행 결과를 모두 받을 때까지 대기했다가, 결과를 받으면 쓰기 작업을 수행

    비동기 라이터도 실제 작업은 라이터에게 위임 

    JobConfig 🙂

    package com.dev.lsy.batchservice.config;
    
    import com.dev.lsy.batchservice.domain.Customer;
    import com.dev.lsy.batchservice.listener.StopWatchjobListener;
    import com.dev.lsy.batchservice.mapper.CustomRowMapper;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.integration.async.AsyncItemProcessor;
    import org.springframework.batch.integration.async.AsyncItemWriter;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
    import org.springframework.batch.item.database.JdbcBatchItemWriter;
    import org.springframework.batch.item.database.JdbcPagingItemReader;
    import org.springframework.batch.item.database.Order;
    import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    
    import javax.sql.DataSource;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Future;
    
    /**
     * Batch configuration that copies rows from {@code customer} to {@code customer2},
     * upper-casing the names on the way.
     *
     * <p>Two equivalent steps are defined so their run time can be compared:
     * {@link #step1()} processes and writes on the step thread, while
     * {@link #asyncStep1()} wraps the same processor and writer in
     * {@link AsyncItemProcessor} / {@link AsyncItemWriter}.
     */
    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class JobConfig {

        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final DataSource dataSource;

        /**
         * Builds the {@code batchJob} job, which runs the asynchronous step and
         * reports the elapsed time through {@link StopWatchjobListener}.
         *
         * @return the configured job
         * @throws Exception if a step cannot be built
         */
        @Bean
        public Job job() throws Exception {
            return jobBuilderFactory.get("batchJob")
                    .incrementer(new RunIdIncrementer())
                    // Start with step1() instead to time the synchronous variant.
                    .start(asyncStep1())
                    .listener(new StopWatchjobListener())
                    .build();
        }

        /**
         * Synchronous step: read, process and write all happen on the step thread.
         *
         * @return the synchronous step
         * @throws Exception if the step cannot be built
         */
        @Bean
        public Step step1() throws Exception {
            return stepBuilderFactory.get("step1")
                    .<Customer, Customer>chunk(100)
                    .reader(pagingItemReader())
                    .processor(customItemProcessor())
                    .writer(customItemWriter())
                    .build();
        }

        /**
         * Step whose processor and writer are the asynchronous wrappers.
         *
         * <p>The chunk output type is {@code Future<Customer>} because the async
         * processor hands back a future per item and the async writer consumes
         * those futures.
         *
         * @return the asynchronous step
         * @throws Exception if the step cannot be built
         */
        @Bean
        public Step asyncStep1() throws Exception {
            return stepBuilderFactory.get("asyncStep1")
                    .<Customer, Future<Customer>>chunk(100)
                    .reader(pagingItemReader())
                    // asynchronous processor
                    .processor(asyncItemProcessor())
                    // asynchronous writer
                    .writer(asyncItemWriter())
                    .build();
        }

        /**
         * Asynchronous processor: delegates the real work to
         * {@link #customItemProcessor()} and runs it through a task executor.
         *
         * @return the asynchronous processor
         */
        @Bean
        public AsyncItemProcessor<Customer, Customer> asyncItemProcessor() {
            AsyncItemProcessor<Customer, Customer> asyncItemProcessor = new AsyncItemProcessor<>();
            // The delegate performs the actual transformation.
            asyncItemProcessor.setDelegate(customItemProcessor());
            // Executor that supplies the threads for the delegate calls.
            // NOTE(review): SimpleAsyncTaskExecutor is used unbounded here; consider a
            // pooled/bounded executor outside of a demo - confirm against requirements.
            asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
            return asyncItemProcessor;
        }

        /**
         * Asynchronous writer: receives the futures produced by the asynchronous
         * processor and delegates the actual write to {@link #customItemWriter()}.
         *
         * @return the asynchronous writer
         */
        @Bean
        public AsyncItemWriter<Customer> asyncItemWriter() {
            AsyncItemWriter<Customer> asyncItemWriter = new AsyncItemWriter<>();
            // The delegate performs the actual JDBC insert.
            asyncItemWriter.setDelegate(customItemWriter());
            return asyncItemWriter;
        }

        /**
         * Processor that returns a copy of the customer with first and last name
         * upper-cased. The 20 ms sleep simulates per-item work so the timing
         * difference between the two steps is visible.
         *
         * @return the processor
         */
        @Bean
        public ItemProcessor<Customer, Customer> customItemProcessor() {
            return item -> {
                Thread.sleep(20);
                return new Customer(
                        item.getId(),
                        item.getFirstName().toUpperCase(),
                        item.getLastName().toUpperCase(),
                        item.getBirthdate());
            };
        }

        /**
         * JDBC writer that inserts each customer into {@code customer2}, binding
         * the named parameters from the item's bean properties.
         *
         * @return the JDBC batch writer
         */
        @Bean
        public JdbcBatchItemWriter<Customer> customItemWriter() {
            JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();

            itemWriter.setDataSource(dataSource);
            itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
            itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
            // Run the writer's own initialization right away.
            itemWriter.afterPropertiesSet();

            return itemWriter;
        }

        /**
         * Paging reader over the {@code customer} table, ordered by {@code id}
         * ascending and mapped with {@link CustomRowMapper}.
         *
         * @return the paging reader
         */
        @Bean
        public JdbcPagingItemReader<Customer> pagingItemReader() {
            JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

            reader.setDataSource(dataSource);
            // NOTE(review): only the fetch size is set; the page size stays at the
            // reader's default - confirm that is intended for a chunk size of 100.
            reader.setFetchSize(300);
            reader.setRowMapper(new CustomRowMapper());

            MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
            queryProvider.setSelectClause("id, first_name, last_name, birthdate");
            queryProvider.setFromClause("from customer");

            // A sort key is supplied so paging has a defined order.
            Map<String, Order> sortKeys = new HashMap<>(1);
            sortKeys.put("id", Order.ASCENDING);

            queryProvider.setSortKeys(sortKeys);
            reader.setQueryProvider(queryProvider);

            return reader;
        }
    }

    Customer 😏

    package com.dev.lsy.batchservice.domain;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    
    /**
     * Customer record carried through the batch step.
     *
     * <p>Lombok generates the accessors and an all-arguments constructor; the
     * constructor parameters follow the field declaration order below.
     */
    @AllArgsConstructor
    @Data
    public class Customer {

        /** Customer identifier. */
        private Long id;

        /** Given name. */
        private String firstName;

        /** Family name. */
        private String lastName;

        /** Birth date, kept as text. */
        private String birthdate;
    }

     

    StopWatchjobListener 🥰

    package com.dev.lsy.batchservice.listener;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobExecutionListener;
    
    /**
     * Job listener that logs how long the job took, measured as the difference
     * between the execution's end time and start time in milliseconds.
     */
    @Slf4j
    public class StopWatchjobListener implements JobExecutionListener {

        /**
         * No-op; nothing needs to happen before the job starts.
         *
         * @param jobExecution the execution about to start
         */
        @Override
        public void beforeJob(JobExecution jobExecution) {
            // intentionally empty
        }

        /**
         * Logs the elapsed time of the finished job.
         *
         * <p>If either timestamp is missing the duration cannot be computed, so a
         * warning is logged instead of failing with a NullPointerException.
         *
         * @param jobExecution the finished execution
         */
        @Override
        public void afterJob(JobExecution jobExecution) {
            if (jobExecution.getStartTime() == null || jobExecution.getEndTime() == null) {
                log.warn("Elapsed time unavailable: job start or end time is not set");
                return;
            }

            long time = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();
            log.info("==============================================");
            log.info("소요시간 ==> [{}] ms", time);
            log.info("==============================================");
        }
    }

    CustomItemReadListener 😅

    package com.dev.lsy.batchservice.listener;
    
    import com.dev.lsy.batchservice.domain.Customer;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.ItemReadListener;
    
    /**
     * Read listener that logs, for every item read, the current thread name and
     * the item's id.
     */
    @Slf4j
    public class CustomItemReadListener implements ItemReadListener<Customer> {

        /** No-op; nothing is done before a read. */
        @Override
        public void beforeRead() {
            // intentionally empty
        }

        /**
         * Logs the reading thread and the id of the item just read.
         *
         * @param item the item that was read
         */
        @Override
        public void afterRead(Customer item) {
            // Parameterized logging: the message is only assembled when INFO is enabled.
            log.info("Thread : {} read item : {}", Thread.currentThread().getName(), item.getId());
        }

        /**
         * No-op; read errors are not handled here.
         *
         * @param ex the exception raised while reading
         */
        @Override
        public void onReadError(Exception ex) {
            // intentionally empty
        }
    }

    CustomItermProcessorListener 😄

    package com.dev.lsy.batchservice.listener;
    
    import com.dev.lsy.batchservice.domain.Customer;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.ItemProcessListener;
    
    /**
     * Process listener that logs, for every processed item, the current thread
     * name and the id of the input item.
     */
    @Slf4j
    public class CustomItermProcessorListener implements ItemProcessListener<Customer, Customer> {

        /**
         * No-op; nothing is done before processing.
         *
         * @param item the item about to be processed
         */
        @Override
        public void beforeProcess(Customer item) {
            // intentionally empty
        }

        /**
         * Logs the processing thread and the id of the input item.
         *
         * @param item   the item that was processed
         * @param result the processing result (not used here)
         */
        @Override
        public void afterProcess(Customer item, Customer result) {
            // Parameterized logging: the message is only assembled when INFO is enabled.
            log.info("Thread : {} process item : {}", Thread.currentThread().getName(), item.getId());
        }

        /**
         * No-op; processing errors are not handled here.
         *
         * @param item the item being processed when the error occurred
         * @param e    the exception raised while processing
         */
        @Override
        public void onProcessError(Customer item, Exception e) {
            // intentionally empty
        }
    }

    CustomItemWriterListener 🤡

    package com.dev.lsy.batchservice.listener;
    
    import com.dev.lsy.batchservice.domain.Customer;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.ItemWriteListener;
    
    import java.util.List;
    
    /**
     * Write listener that logs, after each write, the current thread name and the
     * number of items in the written list.
     */
    @Slf4j
    public class CustomItemWriterListener implements ItemWriteListener<Customer> {

        /**
         * No-op; nothing is done before a write.
         *
         * @param items the items about to be written
         */
        @Override
        public void beforeWrite(List<? extends Customer> items) {
            // intentionally empty
        }

        /**
         * Logs the writing thread and how many items were written.
         *
         * @param items the items that were written
         */
        @Override
        public void afterWrite(List<? extends Customer> items) {
            // Parameterized logging: the message is only assembled when INFO is enabled.
            log.info("Thread : {} write item : {}", Thread.currentThread().getName(), items.size());
        }

        /**
         * No-op; write errors are not handled here.
         *
         * @param exception the exception raised while writing
         * @param items     the items that were being written
         */
        @Override
        public void onWriteError(Exception exception, List<? extends Customer> items) {
            // intentionally empty
        }
    }

    CustomRowMapper 🦄

    package com.dev.lsy.batchservice.mapper;
    
    
    import com.dev.lsy.batchservice.domain.Customer;
    import org.springframework.jdbc.core.RowMapper;
    
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    /**
     * Maps one result-set row to a {@link Customer} using the columns
     * {@code id}, {@code first_name}, {@code last_name} and {@code birthdate}.
     */
    public class CustomRowMapper implements RowMapper<Customer> {

        /**
         * Builds a {@link Customer} from the current row.
         *
         * @param rs     result set positioned on the row to map
         * @param rowNum index of the current row (not used)
         * @return the mapped customer
         * @throws SQLException if a column cannot be read
         */
        @Override
        public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
            long customerId = rs.getLong("id");
            String givenName = rs.getString("first_name");
            String familyName = rs.getString("last_name");
            String birthText = rs.getString("birthdate");

            return new Customer(customerId, givenName, familyName, birthText);
        }
    }

    개인 스터디 기록을 메모하는 공간이라 틀린 점이 있을 수 있습니다.

    틀린 점 있을 경우 댓글 부탁드립니다.

    reference: https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98

    반응형