IT/Live Coding

[springBoot] spring batch AsyncProcess/Writer(테스트 영상 & 소스코드 포함)

알 수 없는 사용자 2023. 11. 23.

아래 포스팅에서 이어진 내용입니다.

 

[springBoot] spring batch multiThread process(feat. taskExecutor)

목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch job/stepExecutionListener 목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch 사용자 정의 ExitStatus 목차 아래 포스팅에서 이어...

yaga.tistory.com


[springBoot] spring batch AsyncProcess/Writer(테스트 영상 & 소스코드 포함)

동기 Step 안에서 processor와 writer만 비동기로 실행하는 예제입니다(동기 방식과 처리 속도만 비교).

비동기 프로세서는 실제 작업을 위임 대상(delegate) 프로세서에게 맡기고, taskExecutor로 스레드를 할당합니다.

비동기 라이터는 비동기 프로세서의 실행 결과를 모두 받을 때까지 대기했다가, 결과를 받으면 쓰기 작업을 수행합니다.

비동기 라이터도 실제 작업은 라이터에게 위임 

JobConfig 🙂

package com.dev.lsy.batchservice.config;

import com.dev.lsy.batchservice.domain.Customer;
import com.dev.lsy.batchservice.listener.StopWatchjobListener;
import com.dev.lsy.batchservice.mapper.CustomRowMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;

/**
 * Batch configuration for a job that reads rows from {@code customer}, upper-cases the
 * first and last names, and inserts the result into {@code customer2}.
 *
 * <p>Two interchangeable steps are defined so their run times can be compared:
 * {@link #step1()} processes and writes items directly, while {@link #asyncStep1()} wraps the
 * same processor and writer in {@link AsyncItemProcessor} / {@link AsyncItemWriter}.
 */
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JobConfig {

    /** Items per chunk for both steps; also used as the paging reader's page size. */
    private static final int CHUNK_SIZE = 100;

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    /**
     * Defines the {@code batchJob} job, which runs the asynchronous step and reports the
     * elapsed time through {@link StopWatchjobListener}.
     *
     * @return the configured job
     * @throws Exception if a step bean cannot be created
     */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                // Start with step1() instead to time the synchronous variant.
                .start(asyncStep1())
                .listener(new StopWatchjobListener())
                .build();
    }

    /**
     * Synchronous step: read, process and write all happen through the plain delegates.
     *
     * @return the configured step
     * @throws Exception if a collaborating bean cannot be created
     */
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(CHUNK_SIZE)
                .reader(pagingItemReader())
                .processor(customItemProcessor())
                .writer(customItemWriter())
                .build();
    }

    /**
     * Step whose processor and writer are the asynchronous wrappers.
     *
     * <p>The chunk's output type is {@code Future<Customer>}: the async processor hands back a
     * future per item and the async writer consumes those futures.
     *
     * @return the configured step
     * @throws Exception if a collaborating bean cannot be created
     */
    @Bean
    public Step asyncStep1() throws Exception {
        return stepBuilderFactory.get("asyncStep1")
                .<Customer, Future<Customer>>chunk(CHUNK_SIZE)
                .reader(pagingItemReader())
                // asynchronous processor
                .processor(asyncItemProcessor())
                // asynchronous writer
                .writer(asyncItemWriter())
                .build();
    }

    /**
     * Asynchronous processor that delegates the actual work to {@link #customItemProcessor()}
     * and runs it on a {@link SimpleAsyncTaskExecutor}.
     *
     * @return the asynchronous processor
     */
    @Bean
    public AsyncItemProcessor<Customer, Customer> asyncItemProcessor() {
        AsyncItemProcessor<Customer, Customer> asyncItemProcessor = new AsyncItemProcessor<>();
        // The real processing logic lives in the delegate.
        asyncItemProcessor.setDelegate(customItemProcessor());
        // Executor that supplies the threads the delegate runs on.
        asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return asyncItemProcessor;
    }

    /**
     * Asynchronous writer that delegates the actual write to {@link #customItemWriter()}.
     *
     * @return the asynchronous writer
     */
    @Bean
    public AsyncItemWriter<Customer> asyncItemWriter() {
        AsyncItemWriter<Customer> asyncItemWriter = new AsyncItemWriter<>();
        // The real write is performed by the delegate.
        asyncItemWriter.setDelegate(customItemWriter());
        return asyncItemWriter;
    }

    /**
     * Processor that returns a copy of the customer with first and last name upper-cased.
     *
     * @return the item processor
     */
    @Bean
    public ItemProcessor<Customer, Customer> customItemProcessor() {
        return item -> {
            // Artificial per-item delay so the sync/async timing difference is visible.
            Thread.sleep(20);
            return new Customer(
                    item.getId(),
                    item.getFirstName().toUpperCase(),
                    item.getLastName().toUpperCase(),
                    item.getBirthdate());
        };
    }

    /**
     * JDBC batch writer that inserts each customer into {@code customer2}, binding the named
     * SQL parameters from the item's bean properties.
     *
     * @return the item writer
     */
    @Bean
    public JdbcBatchItemWriter<Customer> customItemWriter() {
        JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();

        itemWriter.setDataSource(dataSource);
        itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        // Validate the configuration right away rather than on first write.
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }

    /**
     * Paging JDBC reader over the {@code customer} table, ordered by {@code id} ascending.
     *
     * @return the item reader
     */
    @Bean
    public JdbcPagingItemReader<Customer> pagingItemReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(dataSource);
        reader.setFetchSize(300);
        // Fetch one page per chunk instead of relying on the reader's default page size.
        reader.setPageSize(CHUNK_SIZE);
        reader.setRowMapper(new CustomRowMapper());

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, first_name, last_name, birthdate");
        queryProvider.setFromClause("from customer");

        // Paging needs a sort key to step from one page to the next.
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);

        return reader;
    }
}

Customer 😏

package com.dev.lsy.batchservice.domain;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * Mutable data holder for one customer row.
 *
 * <p>Lombok supplies the accessors and an all-arguments constructor whose parameter order
 * follows the field declaration order below.
 */
@AllArgsConstructor
@Data
public class Customer {

    /** Value of the {@code id} column. */
    private Long id;

    /** Value of the {@code first_name} column. */
    private String firstName;

    /** Value of the {@code last_name} column. */
    private String lastName;

    /** Value of the {@code birthdate} column, kept as text. */
    private String birthdate;
}

 

StopWatchjobListener 🥰

package com.dev.lsy.batchservice.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

/**
 * Job listener that logs the wall-clock duration of a job once it has finished.
 */
@Slf4j
public class StopWatchjobListener implements JobExecutionListener {

    /**
     * No-op; timing is derived from the timestamps recorded on the execution itself.
     *
     * @param jobExecution the execution about to start
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        // nothing to prepare
    }

    /**
     * Logs the elapsed time between the execution's start and end timestamps.
     *
     * @param jobExecution the finished execution
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        // NOTE(review): either timestamp may be unset, e.g. if the job was stopped or failed
        // before it actually started. Guard so the listener never throws and masks the outcome.
        if (jobExecution.getStartTime() == null || jobExecution.getEndTime() == null) {
            log.warn("Elapsed time unavailable: job start or end time was not recorded");
            return;
        }

        long time = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();
        log.info("==============================================");
        log.info("소요시간 ==> [{}] ms", time);
        log.info("==============================================");
    }
}

CustomItemReadListener 😅

package com.dev.lsy.batchservice.listener;

import com.dev.lsy.batchservice.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;

/**
 * Read listener that logs which thread read each {@link Customer}.
 */
@Slf4j
public class CustomItemReadListener implements ItemReadListener<Customer> {

    /** No-op. */
    @Override
    public void beforeRead() {
        // nothing to do before a read
    }

    /**
     * Logs the current thread name and the id of the item just read.
     *
     * @param item the item returned by the reader
     */
    @Override
    public void afterRead(Customer item) {
        // Parameterized logging: the message is only built when INFO is enabled.
        log.info("Thread : {} read item : {}", Thread.currentThread().getName(), item.getId());
    }

    /**
     * No-op; read errors are not handled by this listener.
     *
     * @param ex the exception raised by the reader
     */
    @Override
    public void onReadError(Exception ex) {
        // intentionally left empty
    }
}

CustomItermProcessorListener 😄

package com.dev.lsy.batchservice.listener;

import com.dev.lsy.batchservice.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemProcessListener;

/**
 * Process listener that logs which thread processed each {@link Customer}.
 */
@Slf4j
public class CustomItermProcessorListener implements ItemProcessListener<Customer, Customer> {

    /**
     * No-op.
     *
     * @param item the item about to be processed
     */
    @Override
    public void beforeProcess(Customer item) {
        // nothing to do before processing
    }

    /**
     * Logs the current thread name and the id of the input item.
     *
     * @param item   the item that was passed to the processor
     * @param result the processor's output (not logged)
     */
    @Override
    public void afterProcess(Customer item, Customer result) {
        // Parameterized logging: the message is only built when INFO is enabled.
        log.info("Thread : {} process item : {}", Thread.currentThread().getName(), item.getId());
    }

    /**
     * No-op; processing errors are not handled by this listener.
     *
     * @param item the item being processed when the error occurred
     * @param e    the exception raised by the processor
     */
    @Override
    public void onProcessError(Customer item, Exception e) {
        // intentionally left empty
    }
}

CustomItemWriterListener 🤡

package com.dev.lsy.batchservice.listener;

import com.dev.lsy.batchservice.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;

import java.util.List;

/**
 * Write listener that logs which thread wrote a chunk and how many items it contained.
 */
@Slf4j
public class CustomItemWriterListener implements ItemWriteListener<Customer> {

    /**
     * No-op.
     *
     * @param items the items about to be written
     */
    @Override
    public void beforeWrite(List<? extends Customer> items) {
        // nothing to do before a write
    }

    /**
     * Logs the current thread name and the number of items just written.
     *
     * @param items the items that were written
     */
    @Override
    public void afterWrite(List<? extends Customer> items) {
        // Parameterized logging: the message is only built when INFO is enabled.
        log.info("Thread : {} write item : {}", Thread.currentThread().getName(), items.size());
    }

    /**
     * No-op; write errors are not handled by this listener.
     *
     * @param exception the exception raised by the writer
     * @param items     the items being written when the error occurred
     */
    @Override
    public void onWriteError(Exception exception, List<? extends Customer> items) {
        // intentionally left empty
    }
}

CustomRowMapper 🦄

package com.dev.lsy.batchservice.mapper;


import com.dev.lsy.batchservice.domain.Customer;
import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Maps one result-set row to a {@link Customer} using the {@code id}, {@code first_name},
 * {@code last_name} and {@code birthdate} columns.
 */
public class CustomRowMapper implements RowMapper<Customer> {

    /**
     * Builds a {@link Customer} from the current row.
     *
     * @param rs     result set positioned on the row to map
     * @param rowNum index of the current row (unused)
     * @return the mapped customer
     * @throws SQLException if a column cannot be read
     */
    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        long customerId = rs.getLong("id");
        String givenName = rs.getString("first_name");
        String familyName = rs.getString("last_name");
        String birthdateText = rs.getString("birthdate");

        return new Customer(customerId, givenName, familyName, birthdateText);
    }
}

개인 스터디 기록을 메모하는 공간이라 틀린 점이 있을 수 있습니다.

틀린 점 있을 경우 댓글 부탁드립니다.

reference: https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98