IT/Live Coding

[springBoot] spring batch multiThread process(테스트 영상 & 소스코드 포함)

알 수 없는 사용자 2023. 11. 22.

아래 포스팅에서 이어진 내용입니다.

 

[springBoot] spring batch job/stepExecutionListener

목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch 사용자 정의 ExitStatus 목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch flowJob (feat.simpleFlow) 목차 아래 포스팅에서 이어...

yaga.tistory.com


[springBoot] spring batch multiThread process(테스트 영상 & 소스코드 포함)

spring batch에서 싱글 스레드 방식을 멀티 스레드 방식으로 구현한 예제

이 방식은 스레드마다 새로운 chunk가 할당되고 스레드끼리 chunk를 공유하지 않아서 데이터 동기화가 보장된다고 한다.

FileJobConfig 🤗

package com.dev.lsy.infrenspringbatchstudy.batch.job.file;

import com.dev.lsy.infrenspringbatchstudy.batch.domain.Product;
import com.dev.lsy.infrenspringbatchstudy.batch.listener.CustomItemReadListener;
import com.dev.lsy.infrenspringbatchstudy.batch.listener.CustomItemWriteListener;
import com.dev.lsy.infrenspringbatchstudy.batch.listener.JobListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.sql.DataSource;

@Slf4j
@RequiredArgsConstructor
@Configuration
public class FileJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;
    private final int chunkSize = 100;

    @Bean
    public Job fileJob() {
        return jobBuilderFactory.get("fileJob")
                .incrementer(new RunIdIncrementer())
                .start(fileStep1())
                .listener(new JobListener())
                .build();
    }

    @Bean
    public Step fileStep1() {
        return stepBuilderFactory.get("fileStep1")
                .<Product, Product>chunk(chunkSize)
                .reader(customFlatFileItemReader(null))
                //리스너 등록
                .listener(new CustomItemReadListener())
                .writer(customFlatItemWriter())
                .listener(new CustomItemWriteListener())
                //이 옵션을 이용해서 멀티스레드 방식으로 실행
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    //여기선 스레드 설정
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //기본 스레드 4개
        taskExecutor.setCorePoolSize(4);
        //최대 8개
        taskExecutor.setMaxPoolSize(8);
        taskExecutor.setThreadNamePrefix("sub-thread");
        return taskExecutor;
    }


    @Bean
    @StepScope
    //csv파일을 읽는 리더
    public FlatFileItemReader<Product> customFlatFileItemReader(@Value("#{jobParameters['requestDate']}") String requestDate) {
        return new FlatFileItemReaderBuilder<Product>()
                .name("flatFileItemReader")
                .resource(new ClassPathResource("product_" + requestDate + ".csv"))
                .fieldSetMapper(new BeanWrapperFieldSetMapper<>())
                .targetType(Product.class)
                .linesToSkip(1)
                .delimited().delimiter(",")
                .names("id", "name", "price", "type")
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<Product> customFlatItemWriter() {

        JdbcBatchItemWriter<Product> itemWriter = new JdbcBatchItemWriter<>();

        itemWriter.setDataSource(dataSource);
        itemWriter.setSql("insert into product values (:id, :name, :price, :type)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }
}

JobListener 😶

package com.dev.lsy.infrenspringbatchstudy.batch.listener;


import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

@Slf4j
public class JobListener implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {

    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        long time = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();
        log.info("총 소요시간 ==> {} ms", time);
    }
}

CustomItemReaderListener 🙂

package com.dev.lsy.infrenspringbatchstudy.batch.listener;

import com.dev.lsy.infrenspringbatchstudy.batch.domain.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;

@Slf4j
public class CustomItemReadListener implements ItemReadListener<Product> {

    @Override
    public void beforeRead() {

    }

    @Override
    //읽은 다음에 호출되는 메소드
    public void afterRead(Product item) {
        //스레드명과 아이템 아이디 출력
        log.info("Thread : " + Thread.currentThread().getName() + ", read item : " + item.getId());
    }

    @Override
    public void onReadError(Exception ex) {

    }
}

CustomItemWriteListener 😄

package com.dev.lsy.infrenspringbatchstudy.batch.listener;

import com.dev.lsy.infrenspringbatchstudy.batch.domain.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;

import java.util.List;

@Slf4j
public class CustomItemWriteListener implements ItemWriteListener<Product> {

    @Override
    public void beforeWrite(List<? extends Product> items) {

    }

    @Override
    //아이템 쓰고 나서 호출
    public void afterWrite(List<? extends Product> items) {
        log.info("Thread : " + Thread.currentThread().getName() + ", write item : " + items.size());
    }

    @Override
    public void onWriteError(Exception exception, List<? extends Product> items) {

    }
}

Product 😍

package com.dev.lsy.infrenspringbatchstudy.batch.domain;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Product {

    private Long id;
    private String name;
    private int price;
    private String type;
}

ProductEntity 🤑

package com.dev.lsy.infrenspringbatchstudy.batch.domain;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.Entity;
import javax.persistence.Id;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity(name = "product")
public class ProductEntity {

    @Id
    private Long id;
    private String name;
    private int price;
    private String type;
}

ProductRepository 😆

package com.dev.lsy.infrenspringbatchstudy.batch.repository;

import com.dev.lsy.infrenspringbatchstudy.batch.domain.ProductEntity;
import org.springframework.data.jpa.repository.JpaRepository;

public interface ProductRepository extends JpaRepository<ProductEntity, Long> {
}

ProductRepositoryTest 😚

package com.dev.lsy.infrenspringbatchstudy.batch.repository;

import com.dev.lsy.infrenspringbatchstudy.batch.domain.ProductEntity;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.Commit;

@SpringBootTest
class ProductRepositoryTest {

    @Autowired
    private ProductRepository productRepository;

    @Test
    @Commit
    public void save() {
        for (int i = 1; i < 100001; i++) {
            productRepository.save(new ProductEntity((long) i, i + "name", i + 1000, i + "type"));
        }
    }

}

ddl script 😏

CREATE TABLE `product` (
  `id` bigint(20) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `price` int(11) NOT NULL,
  `type` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

application.yml 🙊

spring:
  jpa:
    hibernate:
      ddl-auto: update
      use-new-id-generator-mappings: true
    show-sql: true
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQL57Dialect
  datasource:
  	#본인에게 맞게 수정
    url: jdbc:mysql://localhost:포트명/db명?serverTimezone=Asia/Seoul
    username: 유저명
    password: 비밀번호

  batch:
    jdbc:    	
      initialize-schema: always
    job:
      names: ${job.name:NONE}

build.gradle 🐰

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'com.mysql:mysql-connector-j'
    runtimeOnly 'org.mariadb.jdbc:mariadb-java-client'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.batch:spring-batch-test'
}

product_20231119.csv

product_20231119.csv
0.00MB

mainClass 🐑

package com.dev.lsy.infrenspringbatchstudy;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableBatchProcessing
@SpringBootApplication
public class InfrenSpringBatchStudyApplication {

    public static void main(String[] args) {
        SpringApplication.run(InfrenSpringBatchStudyApplication.class, args);
    }

}

개인 스터디 기록을 메모하는 공간이라 틀린점이 있을 수 있습니다.

틀린 점 있을 경우 댓글 부탁드립니다.

reference: https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98


다음 내용

 

[springBoot] spring batch AsyncProcess/Writer

목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch multiThread process(feat. taskExecutor) 목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch job/stepExecutionListener 목차 아래 포스팅...

yaga.tistory.com

댓글