IT/development

[springBoot] spring batch multiThread process(feat. taskExecutor)

알 수 없는 사용자 2023. 11. 22. 17:06
반응형

목차

    아래 포스팅에서 이어진 내용입니다.

     

    [springBoot] spring batch job/stepExecutionListener

    목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch 사용자 정의 ExitStatus 목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch flowJob (feat.simpleFlow) 목차 아래 포스팅에서 이어

    yaga.tistory.com


    spring batch에서 싱글 스레드 방식을 멀티 스레드 방식으로 구현한 예제

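    이 방식은 스레드마다 새로운 chunk가 할당되고 스레드끼리 chunk를 공유하지 않는다. 다만 ItemReader 인스턴스 자체는 모든 스레드가 공유하므로, FlatFileItemReader처럼 thread-safe하지 않은 reader는 SynchronizedItemStreamReader 등으로 감싸 read()를 동기화해야 데이터 정합성이 보장된다.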

    FileJobConfig 🤗

    package com.dev.lsy.infrenspringbatchstudy.batch.job.file;
    
    import com.dev.lsy.infrenspringbatchstudy.batch.domain.Product;
    import com.dev.lsy.infrenspringbatchstudy.batch.listener.CustomItemReadListener;
    import com.dev.lsy.infrenspringbatchstudy.batch.listener.CustomItemWriteListener;
    import com.dev.lsy.infrenspringbatchstudy.batch.listener.JobListener;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
    import org.springframework.batch.item.database.JdbcBatchItemWriter;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
    import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
    import org.springframework.batch.item.support.SynchronizedItemStreamReader;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import javax.sql.DataSource;
    
    /**
     * Batch configuration for {@code fileJob}: reads {@link Product} rows from a CSV file on the
     * classpath and inserts them into the {@code product} table with a multi-threaded chunk step.
     */
    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class FileJobConfig {

        /** Number of items per chunk. */
        private static final int CHUNK_SIZE = 100;

        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final DataSource dataSource;

        /**
         * Defines {@code fileJob}, which runs {@link #fileStep1()} and reports through {@link JobListener}.
         *
         * @return the configured job
         */
        @Bean
        public Job fileJob() {
            return jobBuilderFactory.get("fileJob")
                    .incrementer(new RunIdIncrementer())
                    .start(fileStep1())
                    .listener(new JobListener())
                    .build();
        }

        /**
         * Defines the chunk-oriented step that copies CSV rows into the database.
         * Chunks are processed concurrently on the threads of {@link #taskExecutor()}.
         *
         * @return the configured step
         */
        @Bean
        public Step fileStep1() {
            return stepBuilderFactory.get("fileStep1")
                    .<Product, Product>chunk(CHUNK_SIZE)
                    // The single reader instance is shared by every worker thread and
                    // FlatFileItemReader is not thread-safe, so reads go through a synchronized wrapper.
                    .reader(synchronizedReader(customFlatFileItemReader(null)))
                    // Register the read listener
                    .listener(new CustomItemReadListener())
                    .writer(customFlatItemWriter())
                    .listener(new CustomItemWriteListener())
                    // This option makes the step run its chunks on multiple threads
                    .taskExecutor(taskExecutor())
                    .build();
        }

        /**
         * Thread pool used by {@link #fileStep1()}.
         *
         * @return an executor with 4 core threads, at most 8 threads, and the name prefix {@code sub-thread}
         */
        @Bean
        public TaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            // 4 core threads
            taskExecutor.setCorePoolSize(4);
            // At most 8 threads
            // NOTE(review): no queue capacity is set here; with the executor's default queue the pool
            // presumably never grows past the core size - confirm if 8 threads are really intended.
            taskExecutor.setMaxPoolSize(8);
            taskExecutor.setThreadNamePrefix("sub-thread");
            return taskExecutor;
        }

        /**
         * Step-scoped reader for the CSV file {@code product_<requestDate>.csv} on the classpath.
         * The first line is skipped and each remaining line is mapped to a {@link Product} using the
         * columns {@code id, name, price, type}.
         *
         * @param requestDate value of the {@code requestDate} job parameter, used in the file name
         * @return the configured reader
         */
        @Bean
        @StepScope
        public FlatFileItemReader<Product> customFlatFileItemReader(@Value("#{jobParameters['requestDate']}") String requestDate) {
            return new FlatFileItemReaderBuilder<Product>()
                    .name("flatFileItemReader")
                    .resource(new ClassPathResource("product_" + requestDate + ".csv"))
                    .fieldSetMapper(new BeanWrapperFieldSetMapper<>())
                    .targetType(Product.class)
                    .linesToSkip(1)
                    .delimited().delimiter(",")
                    .names("id", "name", "price", "type")
                    // The read position recorded for restart is meaningless when several threads
                    // consume the reader concurrently, so restart state is not saved.
                    .saveState(false)
                    .build();
        }

        /**
         * JDBC batch writer that inserts each {@link Product} into the {@code product} table,
         * binding the named SQL parameters from the bean properties of the item.
         *
         * @return the configured writer
         */
        @Bean
        public JdbcBatchItemWriter<Product> customFlatItemWriter() {

            JdbcBatchItemWriter<Product> itemWriter = new JdbcBatchItemWriter<>();

            itemWriter.setDataSource(dataSource);
            itemWriter.setSql("insert into product values (:id, :name, :price, :type)");
            itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
            itemWriter.afterPropertiesSet();

            return itemWriter;
        }

        /** Wraps the given reader so that its calls are serialized across the step's worker threads. */
        private SynchronizedItemStreamReader<Product> synchronizedReader(FlatFileItemReader<Product> delegate) {
            SynchronizedItemStreamReader<Product> synchronizedReader = new SynchronizedItemStreamReader<>();
            synchronizedReader.setDelegate(delegate);
            return synchronizedReader;
        }
    }

    JobListener 😶

    package com.dev.lsy.infrenspringbatchstudy.batch.listener;
    
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobExecutionListener;
    
    /**
     * Job listener that logs the total elapsed time of a job once it has finished.
     */
    @Slf4j
    public class JobListener implements JobExecutionListener {

        /** No work is needed before the job starts. */
        @Override
        public void beforeJob(JobExecution jobExecution) {
            // intentionally empty
        }

        /**
         * Logs the difference between the job's end time and start time in milliseconds.
         *
         * @param jobExecution the finished job execution
         */
        @Override
        public void afterJob(JobExecution jobExecution) {
            // Either timestamp may be unset on the execution (presumably when the job never
            // actually started); guard so the listener itself cannot fail with a NullPointerException.
            if (jobExecution.getStartTime() == null || jobExecution.getEndTime() == null) {
                log.warn("총 소요시간을 계산할 수 없습니다 (startTime 또는 endTime 없음)");
                return;
            }
            long time = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();
            log.info("총 소요시간 ==> {} ms", time);
        }
    }

    CustomItemReadListener 🙂

    package com.dev.lsy.infrenspringbatchstudy.batch.listener;
    
    import com.dev.lsy.infrenspringbatchstudy.batch.domain.Product;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.ItemReadListener;
    
    /**
     * Read listener that logs which thread read which {@link Product}, making the
     * distribution of reads across worker threads visible in the log.
     */
    @Slf4j
    public class CustomItemReadListener implements ItemReadListener<Product> {

        /** Nothing to do before a read. */
        @Override
        public void beforeRead() {
            // intentionally empty
        }

        /**
         * Called after an item has been read; logs the current thread name and the item id.
         *
         * @param item the item that was just read
         */
        @Override
        public void afterRead(Product item) {
            // Parameterized logging: the message is only assembled when INFO is enabled
            log.info("Thread : {}, read item : {}", Thread.currentThread().getName(), item.getId());
        }

        /** Read errors are not handled by this listener. */
        @Override
        public void onReadError(Exception ex) {
            // intentionally empty
        }
    }

    CustomItemWriteListener 😄

    package com.dev.lsy.infrenspringbatchstudy.batch.listener;
    
    import com.dev.lsy.infrenspringbatchstudy.batch.domain.Product;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.ItemWriteListener;
    
    import java.util.List;
    
    /**
     * Write listener that logs which thread wrote how many {@link Product} items.
     */
    @Slf4j
    public class CustomItemWriteListener implements ItemWriteListener<Product> {

        /** Nothing to do before a write. */
        @Override
        public void beforeWrite(List<? extends Product> items) {
            // intentionally empty
        }

        /**
         * Called after the items have been written; logs the current thread name and the item count.
         *
         * @param items the items that were just written
         */
        @Override
        public void afterWrite(List<? extends Product> items) {
            // Parameterized logging: the message is only assembled when INFO is enabled
            log.info("Thread : {}, write item : {}", Thread.currentThread().getName(), items.size());
        }

        /** Write errors are not handled by this listener. */
        @Override
        public void onWriteError(Exception exception, List<? extends Product> items) {
            // intentionally empty
        }
    }

    Product 😍

    package com.dev.lsy.infrenspringbatchstudy.batch.domain;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    /**
     * Item type handled by the file batch step: one product row with the fields
     * {@code id}, {@code name}, {@code price} and {@code type}.
     * Accessors and the no-args / all-args constructors come from the Lombok annotations.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class Product {
    
        private Long id;
        private String name;
        private int price;
        private String type;
    }

    ProductEntity 🤑

    package com.dev.lsy.infrenspringbatchstudy.batch.domain;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import javax.persistence.Entity;
    import javax.persistence.Id;
    
    /**
     * JPA entity registered under the entity name {@code product}, with {@code id} as its identifier.
     * Carries the same four fields as {@code Product}.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Entity(name = "product")
    public class ProductEntity {
    
        // Identifier; no generation strategy is declared, so callers assign the value themselves
        @Id
        private Long id;
        private String name;
        private int price;
        private String type;
    }

    ProductRepository 😆

    package com.dev.lsy.infrenspringbatchstudy.batch.repository;
    
    import com.dev.lsy.infrenspringbatchstudy.batch.domain.ProductEntity;
    import org.springframework.data.jpa.repository.JpaRepository;
    
    /** Spring Data JPA repository for {@link ProductEntity}, keyed by its {@code Long} id. */
    public interface ProductRepository extends JpaRepository<ProductEntity, Long> {
    }

    ProductRepositoryTest 😚

    package com.dev.lsy.infrenspringbatchstudy.batch.repository;
    
    import com.dev.lsy.infrenspringbatchstudy.batch.domain.ProductEntity;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.annotation.Commit;
    
    /**
     * Seeds the product table through {@link ProductRepository}.
     */
    @SpringBootTest
    class ProductRepositoryTest {

        @Autowired
        private ProductRepository productRepository;

        /**
         * Saves products with ids 1 through 100000, one repository call per row.
         * Each row gets the name {@code <id>name}, the price {@code id + 1000} and the type {@code <id>type}.
         */
        @Test
        @Commit
        public void save() {
            int seq = 1;
            while (seq <= 100000) {
                ProductEntity row = new ProductEntity(Long.valueOf(seq), seq + "name", seq + 1000, seq + "type");
                productRepository.save(row);
                seq++;
            }
        }

    }

    ddl script 😏

    -- Table for product rows: `id` is the primary key, `price` is NOT NULL, `name` and `type` are nullable.
    CREATE TABLE `product` (
      `id` bigint(20) NOT NULL,
      `name` varchar(255) DEFAULT NULL,
      `price` int(11) NOT NULL,
      `type` varchar(255) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

    application.yml 🙊

    spring:
      jpa:
        hibernate:
          ddl-auto: update
          use-new-id-generator-mappings: true
        show-sql: true
        properties:
          hibernate:
            dialect: org.hibernate.dialect.MySQL57Dialect
      datasource:
        # Adjust the placeholder values below to your own environment
        url: jdbc:mysql://localhost:포트명/db명?serverTimezone=Asia/Seoul
        username: 유저명
        password: 비밀번호
    
      batch:
        jdbc:
          initialize-schema: always
        job:
          # Taken from the job.name property; falls back to NONE when it is not set
          names: ${job.name:NONE}

    build.gradle 🐰

    // Dependencies of the batch study project: Spring Batch + Spring Data JPA, Lombok at compile time,
    // JDBC drivers at runtime, and the Spring Boot / Spring Batch test support libraries.
    dependencies {
        implementation 'org.springframework.boot:spring-boot-starter-batch'
        implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
        compileOnly 'org.projectlombok:lombok'
        // NOTE(review): both the MySQL and MariaDB drivers are on the runtime classpath, while the
        // datasource URL shown in application.yml uses jdbc:mysql - confirm whether both are needed.
        runtimeOnly 'com.mysql:mysql-connector-j'
        runtimeOnly 'org.mariadb.jdbc:mariadb-java-client'
        annotationProcessor 'org.projectlombok:lombok'
        testImplementation 'org.springframework.boot:spring-boot-starter-test'
        testImplementation 'org.springframework.batch:spring-batch-test'
    }

    product_20231119.csv

    product_20231119.csv
    0.00MB

    mainClass 🐑

    package com.dev.lsy.infrenspringbatchstudy;
    
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * Application entry point: a Spring Boot application with Spring Batch processing enabled.
     */
    @EnableBatchProcessing
    @SpringBootApplication
    public class InfrenSpringBatchStudyApplication {
    
        /**
         * Starts the Spring application.
         *
         * @param args command-line arguments passed through to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication.run(InfrenSpringBatchStudyApplication.class, args);
        }
    
    }

    개인 스터디 기록을 메모하는 공간이라 틀린점이 있을 수 있습니다.

    틀린 점 있을 경우 댓글 부탁드립니다.

    reference: https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98


    다음 내용

     

    [springBoot] spring batch AsyncProcess/Writer

    목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch multiThread process(feat. taskExecutor) 목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch job/stepExecutionListener 목차 아래 포스팅

    yaga.tistory.com

    반응형