IT/Live Coding

[springBoot] spring batch AsyncProcess/Writer(ํ…Œ์ŠคํŠธ ์˜์ƒ & ์†Œ์Šค์ฝ”๋“œ ํฌํ•จ)

์•Œ ์ˆ˜ ์—†๋Š” ์‚ฌ์šฉ์ž 2023. 11. 23.

์•„๋ž˜ ํฌ์ŠคํŒ…์—์„œ ์ด์–ด์ง„ ๋‚ด์šฉ์ž…๋‹ˆ๋‹ค.

 

[springBoot] spring batch multiThread process(feat. taskExecutor)

๋ชฉ์ฐจ ์•„๋ž˜ ํฌ์ŠคํŒ…์—์„œ ์ด์–ด์ง„ ๋‚ด์šฉ์ž…๋‹ˆ๋‹ค. [springBoot] spring batch job/stepExecutionListener ๋ชฉ์ฐจ ์•„๋ž˜ ํฌ์ŠคํŒ…์—์„œ ์ด์–ด์ง„ ๋‚ด์šฉ์ž…๋‹ˆ๋‹ค. [springBoot] spring batch ์‚ฌ์šฉ์ž ์ •์˜ ExitStatus ๋ชฉ์ฐจ ์•„๋ž˜ ํฌ์ŠคํŒ…์—์„œ ์ด์–ด

yaga.tistory.com


๋™๊ธฐ Step์•ˆ์—์„œ process, writer๋ฅผ ๋น„๋™๊ธฐ๋กœ ์ž‘์—…ํ•˜๋Š” ์˜ˆ์ œ(์†๋„๋งŒ ๋น„๊ต)

๋น„๋™๊ธฐ ํ”„๋กœ์„ธ์„œ๋Š” ์‹ค์ œ ์ž‘์—…์€ ํ”„๋กœ์„ธ์„œ์—๊ฒŒ ์œ„์ž„ํ•˜๊ณ  taskExecutor๋กœ ์Šค๋ ˆ๋“œ ํ• ๋‹น

๋น„๋™๊ธฐ ๋ผ์ดํ„ฐ๋Š” ๋น„๋™๊ธฐ ํ”„๋กœ์„ธ์Šค ์‹คํ–‰๊ฒฐ๊ณผ ๋ชจ๋‘ ๋ฐ›๊ธฐ์ „๊นŒ์ง€ ๋Œ€๊ธฐ ํ–ˆ๋‹ค๊ฐ€ ๋ฐ›์œผ๋ฉด ์“ฐ๊ธฐ ์ž‘์—…

๋น„๋™๊ธฐ ๋ผ์ดํ„ฐ๋„ ์‹ค์ œ ์ž‘์—…์€ ๋ผ์ดํ„ฐ์—๊ฒŒ ์œ„์ž„ 

JobConfig ๐Ÿ™‚

package com.dev.lsy.batchservice.config;

import com.dev.lsy.batchservice.domain.Customer;
import com.dev.lsy.batchservice.listener.StopWatchjobListener;
import com.dev.lsy.batchservice.mapper.CustomRowMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@RequiredArgsConstructor
@Configuration
public class JobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    /**
     * job
     * @return
     * @throws Exception
     */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
//                .start(step1())
                .start(asyncStep1())
                .listener(new StopWatchjobListener())
                .build();
    }

    /**
     * ๋™๊ธฐ step
     * @return
     * @throws Exception
     */
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(100)
                .reader(pagingItemReader())
                .processor(customItemProcessor())
                .writer(customItemWriter())
                .build();
    }


    @Bean
    //๋น„๋™๊ธฐ ์Šคํ…
    public Step asyncStep1() throws Exception{
        return stepBuilderFactory.get("asyncStep1")
                .<Customer, Customer>chunk(100)
                .reader(pagingItemReader())
                //๋น„๋™๊ธฐ ํ”„๋กœ์„ธ์„œ
                .processor(asyncItemProcessor())
                //๋น„๋™๊ธฐ ๋ผ์ดํ„ฐ
                .writer(asyncItemWriter())
                .build();
    }

    @Bean
    //๋น„๋™๊ธฐ ํ”„๋กœ์„ธ์„œ
    public AsyncItemProcessor asyncItemProcessor() {
        //๋น„๋™๊ธฐ ํ”„๋กœ์„ธ์„œ ๊ฐ์ฒด ์ƒ์„ฑํ›„ ์„ธํŒ…
        AsyncItemProcessor<Customer, Customer> asyncItemProcessor = new AsyncItemProcessor<>();
        //ํ”„๋กœ์„ธ์„œ์—๊ฒŒ ์‹ค์ œ ์ž‘์—… ์œ„์ž„
        asyncItemProcessor.setDelegate(customItemProcessor());
        //์Šค๋ ˆ๋“œ ํ• ๋‹น
        asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return asyncItemProcessor;
    }

    @Bean
    //๋น„๋™๊ธฐ ์“ฐ๊ธฐ
    public AsyncItemWriter asyncItemWriter() {

        AsyncItemWriter<Customer> asyncItemWriter = new AsyncItemWriter<>();
        //์“ฐ๊ธฐ์—๊ฒŒ ์œ„์ž„
        asyncItemWriter.setDelegate(customItemWriter());
        return asyncItemWriter;
    }

    @Bean
    //๋‹จ์ˆœํžˆ ๋Œ€๋ฌธ์ž๋กœ๋งŒ ๋ณ€๊ฒฝํ•˜๋Š” ์ฒ˜๋ฆฌ
    public ItemProcessor<Customer, Customer> customItemProcessor() {
        return new ItemProcessor<Customer, Customer>() {
            @Override
            public Customer process(Customer item) throws Exception {

                Thread.sleep(20);
                return new Customer(item.getId(), item.getFirstName().toUpperCase(), item.getLastName().toUpperCase(), item.getBirthdate());
            }
        };
    }

    /**
     * ์ปค์Šคํ…€ writer
     * @return
     */
    @Bean
    public JdbcBatchItemWriter customItemWriter() {
        JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();

        itemWriter.setDataSource(dataSource);
        itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }

    /**
     * ๋™๊ธฐ reader
     * @return
     */
    @Bean
    public JdbcPagingItemReader<Customer> pagingItemReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(dataSource);
        reader.setFetchSize(300);
        reader.setRowMapper(new CustomRowMapper());

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, first_name, last_name, birthdate");
        queryProvider.setFromClause("from customer");

        Map<String, Order> sortKeys = new HashMap<>(1);

        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);

        return reader;
    }
}

Customer ๐Ÿ˜

package com.dev.lsy.batchservice.domain;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class Customer {

    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}

 

StopWatchjobListener ๐Ÿฅฐ

package com.dev.lsy.batchservice.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

@Slf4j
public class StopWatchjobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {

    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        long time = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();
        log.info("==============================================");
        log.info("์†Œ์š”์‹œ๊ฐ„ ==> [{}] ms", time);
        log.info("==============================================");
    }
}

CustomItemReadListener ๐Ÿ˜…

package com.dev.lsy.batchservice.listener;

import com.dev.lsy.batchservice.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;

@Slf4j
public class CustomItemReadListener implements ItemReadListener<Customer> {

    @Override
    public void beforeRead() {

    }

    @Override
    public void afterRead(Customer item) {
        log.info("Thread : " + Thread.currentThread().getName() + " read item : " + item.getId());
    }

    @Override
    public void onReadError(Exception ex) {

    }
}

CustomItermProcessorListener ๐Ÿ˜„

package com.dev.lsy.batchservice.listener;

import com.dev.lsy.batchservice.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemProcessListener;

@Slf4j
public class CustomItermProcessorListener implements ItemProcessListener<Customer, Customer> {
    @Override
    public void beforeProcess(Customer item) {

    }

    @Override
    public void afterProcess(Customer item, Customer result) {
        log.info("Thread : " + Thread.currentThread().getName() + " process item : " + item.getId());
    }

    @Override
    public void onProcessError(Customer item, Exception e) {

    }
}

CustomItemWriterListener ๐Ÿคก

package com.dev.lsy.batchservice.listener;

import com.dev.lsy.batchservice.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;

import java.util.List;

@Slf4j
public class CustomItemWriterListener implements ItemWriteListener<Customer> {
    @Override
    public void beforeWrite(List<? extends Customer> items) {

    }

    @Override
    public void afterWrite(List<? extends Customer> items) {
        log.info("Thread : " + Thread.currentThread().getName() + " write item : " + items.size());
    }

    @Override
    public void onWriteError(Exception exception, List<? extends Customer> items) {

    }
}

CustomRowMapper ๐Ÿฆ„

package com.dev.lsy.batchservice.mapper;


import com.dev.lsy.batchservice.domain.Customer;
import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;

public class CustomRowMapper implements RowMapper<Customer> {

    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        return new Customer(rs.getLong("id"),
                rs.getString("first_name"),
                rs.getString("last_name"),
                rs.getString("birthdate"));
    }
}

๊ฐœ์ธ ์Šคํ„ฐ๋”” ๊ธฐ๋ก์„ ๋ฉ”๋ชจํ•˜๋Š” ๊ณต๊ฐ„์ด๋ผ ํ‹€๋ฆฐ์ ์ด ์žˆ์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

ํ‹€๋ฆฐ ์  ์žˆ์„ ๊ฒฝ์šฐ ๋Œ“๊ธ€ ๋ถ€ํƒ๋“œ๋ฆฝ๋‹ˆ๋‹ค.

reference: https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98

๋Œ“๊ธ€