반응형
목차
아래 포스팅에서 이어진 내용입니다.
spring batch에서 싱글 스레드 방식을 멀티 스레드 방식으로 구현한 예제
이 방식은 스레드마다 새로운 chunk가 할당되고 스레드끼리 chunk를 공유하지 않아서 데이터 동기화가 보장된다고 한다.
FileJobConfig 🤗
package com.dev.lsy.infrenspringbatchstudy.batch.job.file;
import com.dev.lsy.infrenspringbatchstudy.batch.domain.Product;
import com.dev.lsy.infrenspringbatchstudy.batch.listener.CustomItemReadListener;
import com.dev.lsy.infrenspringbatchstudy.batch.listener.CustomItemWriteListener;
import com.dev.lsy.infrenspringbatchstudy.batch.listener.JobListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.sql.DataSource;
@Slf4j
@RequiredArgsConstructor
@Configuration
public class FileJobConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
private final int chunkSize = 100;
@Bean
public Job fileJob() {
return jobBuilderFactory.get("fileJob")
.incrementer(new RunIdIncrementer())
.start(fileStep1())
.listener(new JobListener())
.build();
}
@Bean
public Step fileStep1() {
return stepBuilderFactory.get("fileStep1")
.<Product, Product>chunk(chunkSize)
.reader(customFlatFileItemReader(null))
//리스너 등록
.listener(new CustomItemReadListener())
.writer(customFlatItemWriter())
.listener(new CustomItemWriteListener())
//이 옵션을 이용해서 멀티스레드 방식으로 실행
.taskExecutor(taskExecutor())
.build();
}
@Bean
//여기선 스레드 설정
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//기본 스레드 4개
taskExecutor.setCorePoolSize(4);
//최대 8개
taskExecutor.setMaxPoolSize(8);
taskExecutor.setThreadNamePrefix("sub-thread");
return taskExecutor;
}
@Bean
@StepScope
//csv파일을 읽는 리더
public FlatFileItemReader<Product> customFlatFileItemReader(@Value("#{jobParameters['requestDate']}") String requestDate) {
return new FlatFileItemReaderBuilder<Product>()
.name("flatFileItemReader")
.resource(new ClassPathResource("product_" + requestDate + ".csv"))
.fieldSetMapper(new BeanWrapperFieldSetMapper<>())
.targetType(Product.class)
.linesToSkip(1)
.delimited().delimiter(",")
.names("id", "name", "price", "type")
.build();
}
@Bean
public JdbcBatchItemWriter<Product> customFlatItemWriter() {
JdbcBatchItemWriter<Product> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setDataSource(dataSource);
itemWriter.setSql("insert into product values (:id, :name, :price, :type)");
itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
itemWriter.afterPropertiesSet();
return itemWriter;
}
}
JobListener 😶
package com.dev.lsy.infrenspringbatchstudy.batch.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
@Slf4j
public class JobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
}
@Override
public void afterJob(JobExecution jobExecution) {
long time = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();
log.info("총 소요시간 ==> {} ms", time);
}
}
CustomItemReaderListener 🙂
package com.dev.lsy.infrenspringbatchstudy.batch.listener;
import com.dev.lsy.infrenspringbatchstudy.batch.domain.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
@Slf4j
public class CustomItemReadListener implements ItemReadListener<Product> {
@Override
public void beforeRead() {
}
@Override
//읽은 다음에 호출되는 메소드
public void afterRead(Product item) {
//스레드명과 아이템 아이디 출력
log.info("Thread : " + Thread.currentThread().getName() + ", read item : " + item.getId());
}
@Override
public void onReadError(Exception ex) {
}
}
CustomItemWriteListener 😄
package com.dev.lsy.infrenspringbatchstudy.batch.listener;
import com.dev.lsy.infrenspringbatchstudy.batch.domain.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;
import java.util.List;
@Slf4j
public class CustomItemWriteListener implements ItemWriteListener<Product> {
@Override
public void beforeWrite(List<? extends Product> items) {
}
@Override
//아이템 쓰고 나서 호출
public void afterWrite(List<? extends Product> items) {
log.info("Thread : " + Thread.currentThread().getName() + ", write item : " + items.size());
}
@Override
public void onWriteError(Exception exception, List<? extends Product> items) {
}
}
Product 😍
package com.dev.lsy.infrenspringbatchstudy.batch.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Product {
private Long id;
private String name;
private int price;
private String type;
}
ProductEntity 🤑
package com.dev.lsy.infrenspringbatchstudy.batch.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Entity;
import javax.persistence.Id;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity(name = "product")
public class ProductEntity {
@Id
private Long id;
private String name;
private int price;
private String type;
}
ProductRepository 😆
package com.dev.lsy.infrenspringbatchstudy.batch.repository;
import com.dev.lsy.infrenspringbatchstudy.batch.domain.ProductEntity;
import org.springframework.data.jpa.repository.JpaRepository;
public interface ProductRepository extends JpaRepository<ProductEntity, Long> {
}
ProductRepositoryTest 😚
package com.dev.lsy.infrenspringbatchstudy.batch.repository;
import com.dev.lsy.infrenspringbatchstudy.batch.domain.ProductEntity;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.Commit;
@SpringBootTest
class ProductRepositoryTest {
@Autowired
private ProductRepository productRepository;
@Test
@Commit
public void save() {
for (int i = 1; i < 100001; i++) {
productRepository.save(new ProductEntity((long) i, i + "name", i + 1000, i + "type"));
}
}
}
ddl script 😏
CREATE TABLE `product` (
`id` bigint(20) NOT NULL,
`name` varchar(255) DEFAULT NULL,
`price` int(11) NOT NULL,
`type` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
application.yml 🙊
spring:
jpa:
hibernate:
ddl-auto: update
use-new-id-generator-mappings: true
show-sql: true
properties:
hibernate:
dialect: org.hibernate.dialect.MySQL57Dialect
datasource:
#본인에게 맞게 수정
url: jdbc:mysql://localhost:포트명/db명?serverTimezone=Asia/Seoul
username: 유저명
password: 비밀번호
batch:
jdbc:
initialize-schema: always
job:
names: ${job.name:NONE}
build.gradle 🐰
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
compileOnly 'org.projectlombok:lombok'
runtimeOnly 'com.mysql:mysql-connector-j'
runtimeOnly 'org.mariadb.jdbc:mariadb-java-client'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.batch:spring-batch-test'
}
product_20231119.csv
mainClass 🐑
package com.dev.lsy.infrenspringbatchstudy;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableBatchProcessing
@SpringBootApplication
public class InfrenSpringBatchStudyApplication {
public static void main(String[] args) {
SpringApplication.run(InfrenSpringBatchStudyApplication.class, args);
}
}
개인 스터디 기록을 메모하는 공간이라 틀린점이 있을 수 있습니다.
틀린 점 있을 경우 댓글 부탁드립니다.
reference: https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98
다음 내용
반응형
'IT > development' 카테고리의 다른 글
[springBoot] spring batch service 개발 1 (62) | 2023.11.27 |
---|---|
[springBoot] spring batch AsyncProcess/Writer (61) | 2023.11.23 |
[springBoot] spring batch job/stepExecutionListener (51) | 2023.11.18 |
[springBoot] spring batch 사용자 정의 ExitStatus (53) | 2023.11.18 |
[springBoot] spring batch flowJob (feat.simpleFlow) (55) | 2023.11.18 |