아래 포스팅에서 이어진 내용입니다.
[springBoot] spring batch multiThread process(feat. taskExecutor)
목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch job/stepExecutionListener 목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch 사용자 정의 ExitStatus 목차 아래 포스팅에서 이어
yaga.tistory.com
๋๊ธฐ Step์์์ process, writer๋ฅผ ๋น๋๊ธฐ๋ก ์์ ํ๋ ์์ (์๋๋ง ๋น๊ต)
๋น๋๊ธฐ ํ๋ก์ธ์๋ ์ค์ ์์ ์ ํ๋ก์ธ์์๊ฒ ์์ํ๊ณ taskExecutor๋ก ์ค๋ ๋ ํ ๋น
๋น๋๊ธฐ ๋ผ์ดํฐ๋ ๋น๋๊ธฐ ํ๋ก์ธ์ค ์คํ๊ฒฐ๊ณผ ๋ชจ๋ ๋ฐ๊ธฐ์ ๊น์ง ๋๊ธฐ ํ๋ค๊ฐ ๋ฐ์ผ๋ฉด ์ฐ๊ธฐ ์์
๋น๋๊ธฐ ๋ผ์ดํฐ๋ ์ค์ ์์ ์ ๋ผ์ดํฐ์๊ฒ ์์
JobConfig ๐
package com.dev.lsy.batchservice.config;
import com.dev.lsy.batchservice.domain.Customer;
import com.dev.lsy.batchservice.listener.StopWatchjobListener;
import com.dev.lsy.batchservice.mapper.CustomRowMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JobConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
/**
* job
* @return
* @throws Exception
*/
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("batchJob")
.incrementer(new RunIdIncrementer())
// .start(step1())
.start(asyncStep1())
.listener(new StopWatchjobListener())
.build();
}
/**
* ๋๊ธฐ step
* @return
* @throws Exception
*/
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(100)
.reader(pagingItemReader())
.processor(customItemProcessor())
.writer(customItemWriter())
.build();
}
@Bean
//๋น๋๊ธฐ ์คํ
public Step asyncStep1() throws Exception{
return stepBuilderFactory.get("asyncStep1")
.<Customer, Customer>chunk(100)
.reader(pagingItemReader())
//๋น๋๊ธฐ ํ๋ก์ธ์
.processor(asyncItemProcessor())
//๋น๋๊ธฐ ๋ผ์ดํฐ
.writer(asyncItemWriter())
.build();
}
@Bean
//๋น๋๊ธฐ ํ๋ก์ธ์
public AsyncItemProcessor asyncItemProcessor() {
//๋น๋๊ธฐ ํ๋ก์ธ์ ๊ฐ์ฒด ์์ฑํ ์ธํ
AsyncItemProcessor<Customer, Customer> asyncItemProcessor = new AsyncItemProcessor<>();
//ํ๋ก์ธ์์๊ฒ ์ค์ ์์
์์
asyncItemProcessor.setDelegate(customItemProcessor());
//์ค๋ ๋ ํ ๋น
asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
return asyncItemProcessor;
}
@Bean
//๋น๋๊ธฐ ์ฐ๊ธฐ
public AsyncItemWriter asyncItemWriter() {
AsyncItemWriter<Customer> asyncItemWriter = new AsyncItemWriter<>();
//์ฐ๊ธฐ์๊ฒ ์์
asyncItemWriter.setDelegate(customItemWriter());
return asyncItemWriter;
}
@Bean
//๋จ์ํ ๋๋ฌธ์๋ก๋ง ๋ณ๊ฒฝํ๋ ์ฒ๋ฆฌ
public ItemProcessor<Customer, Customer> customItemProcessor() {
return new ItemProcessor<Customer, Customer>() {
@Override
public Customer process(Customer item) throws Exception {
Thread.sleep(20);
return new Customer(item.getId(), item.getFirstName().toUpperCase(), item.getLastName().toUpperCase(), item.getBirthdate());
}
};
}
/**
* ์ปค์คํ
writer
* @return
*/
@Bean
public JdbcBatchItemWriter customItemWriter() {
JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setDataSource(dataSource);
itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
itemWriter.afterPropertiesSet();
return itemWriter;
}
/**
* ๋๊ธฐ reader
* @return
*/
@Bean
public JdbcPagingItemReader<Customer> pagingItemReader() {
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
reader.setDataSource(dataSource);
reader.setFetchSize(300);
reader.setRowMapper(new CustomRowMapper());
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, first_name, last_name, birthdate");
queryProvider.setFromClause("from customer");
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);
return reader;
}
}
Customer ๐
package com.dev.lsy.batchservice.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
 * Customer row as it is moved through the batch step.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and the all-arguments
 * constructor are generated by Lombok. The constructor takes the fields in declaration
 * order: id, firstName, lastName, birthdate.
 */
@Data
@AllArgsConstructor
public class Customer {
/** Customer identifier. */
private Long id;
/** First name. */
private String firstName;
/** Last name. */
private String lastName;
/** Birth date, kept as text. */
private String birthdate;
}
StopWatchjobListener ๐ฅฐ
package com.dev.lsy.batchservice.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
/**
 * Job listener that logs how long a job execution took, in milliseconds.
 */
@Slf4j
public class StopWatchjobListener implements JobExecutionListener {

    /** Intentionally empty; only the end-of-job callback is used. */
    @Override
    public void beforeJob(JobExecution jobExecution) {
    }

    /**
     * Logs the difference between the execution's end and start time.
     *
     * <p>If either timestamp is missing (for example when the execution failed before
     * it was marked as started) a warning is logged instead of failing with a
     * {@link NullPointerException}.
     *
     * @param jobExecution the finished job execution
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStartTime() == null || jobExecution.getEndTime() == null) {
            log.warn("Elapsed time unavailable: job execution has no start or end time");
            return;
        }
        long time = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();
        log.info("==============================================");
        log.info("์์์๊ฐ ==> [{}] ms", time);
        log.info("==============================================");
    }
}
CustomItemReadListener ๐
package com.dev.lsy.batchservice.listener;
import com.dev.lsy.batchservice.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
/**
 * Read listener that logs, for every item read, the current thread name and the
 * item's id.
 */
@Slf4j
public class CustomItemReadListener implements ItemReadListener<Customer> {

    /** Intentionally empty. */
    @Override
    public void beforeRead() {
    }

    /**
     * Logs the reading thread and the id of the item that was read.
     *
     * @param item the item just read
     */
    @Override
    public void afterRead(Customer item) {
        // Parameterized logging: the message is only formatted when INFO is enabled.
        log.info("Thread : {} read item : {}", Thread.currentThread().getName(), item.getId());
    }

    /** Intentionally empty; read errors are not handled by this listener. */
    @Override
    public void onReadError(Exception ex) {
    }
}
CustomItermProcessorListener ๐
package com.dev.lsy.batchservice.listener;
import com.dev.lsy.batchservice.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemProcessListener;
/**
 * Process listener that logs, for every processed item, the current thread name and
 * the id of the input item.
 */
@Slf4j
public class CustomItermProcessorListener implements ItemProcessListener<Customer, Customer> {

    /** Intentionally empty. */
    @Override
    public void beforeProcess(Customer item) {
    }

    /**
     * Logs the processing thread and the id of the input item.
     *
     * @param item the item that was passed to the processor
     * @param result the processor's output (not used here)
     */
    @Override
    public void afterProcess(Customer item, Customer result) {
        // Parameterized logging: the message is only formatted when INFO is enabled.
        log.info("Thread : {} process item : {}", Thread.currentThread().getName(), item.getId());
    }

    /** Intentionally empty; processing errors are not handled by this listener. */
    @Override
    public void onProcessError(Customer item, Exception e) {
    }
}
CustomItemWriterListener ๐คก
package com.dev.lsy.batchservice.listener;
import com.dev.lsy.batchservice.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;
import java.util.List;
/**
 * Write listener that logs, after every write, the current thread name and the number
 * of items written.
 */
@Slf4j
public class CustomItemWriterListener implements ItemWriteListener<Customer> {

    /** Intentionally empty. */
    @Override
    public void beforeWrite(List<? extends Customer> items) {
    }

    /**
     * Logs the writing thread and how many items were in the written list.
     *
     * @param items the items that were written
     */
    @Override
    public void afterWrite(List<? extends Customer> items) {
        // Parameterized logging: the message is only formatted when INFO is enabled.
        log.info("Thread : {} write item : {}", Thread.currentThread().getName(), items.size());
    }

    /** Intentionally empty; write errors are not handled by this listener. */
    @Override
    public void onWriteError(Exception exception, List<? extends Customer> items) {
    }
}
CustomRowMapper ๐ฆ
package com.dev.lsy.batchservice.mapper;
import com.dev.lsy.batchservice.domain.Customer;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
 * Maps one row of the customer query to a {@link Customer}.
 */
public class CustomRowMapper implements RowMapper<Customer> {

    /**
     * Builds a {@link Customer} from the {@code id}, {@code first_name},
     * {@code last_name} and {@code birthdate} columns of the current row.
     *
     * @param rs the result set positioned on the row to map
     * @param rowNum the index of the current row (unused)
     * @return the mapped customer
     * @throws SQLException if a column cannot be read
     */
    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        final long customerId = rs.getLong("id");
        final String givenName = rs.getString("first_name");
        final String familyName = rs.getString("last_name");
        final String bornOn = rs.getString("birthdate");
        return new Customer(customerId, givenName, familyName, bornOn);
    }
}
개인 스터디 기록을 메모하는 공간이라 틀린 점이 있을 수 있습니다.
틀린 점이 있을 경우 댓글 부탁드립니다.
reference: https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98
댓글