반응형
목차
아래 포스팅에서 이어진 내용입니다.
동기 Step안에서 process, writer를 비동기로 작업하는 예제(속도만 비교)
비동기 프로세서는 실제 작업은 프로세서에게 위임하고 taskExecutor로 스레드 할당
비동기 라이터는 비동기 프로세스 실행결과 모두 받기전까지 대기 했다가 받으면 쓰기 작업
비동기 라이터도 실제 작업은 라이터에게 위임
JobConfig 🙂
package com.dev.lsy.batchservice.config;
import com.dev.lsy.batchservice.domain.Customer;
import com.dev.lsy.batchservice.listener.StopWatchjobListener;
import com.dev.lsy.batchservice.mapper.CustomRowMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JobConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
/**
* job
* @return
* @throws Exception
*/
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("batchJob")
.incrementer(new RunIdIncrementer())
// .start(step1())
.start(asyncStep1())
.listener(new StopWatchjobListener())
.build();
}
/**
* 동기 step
* @return
* @throws Exception
*/
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(100)
.reader(pagingItemReader())
.processor(customItemProcessor())
.writer(customItemWriter())
.build();
}
@Bean
//비동기 스텝
public Step asyncStep1() throws Exception{
return stepBuilderFactory.get("asyncStep1")
.<Customer, Customer>chunk(100)
.reader(pagingItemReader())
//비동기 프로세서
.processor(asyncItemProcessor())
//비동기 라이터
.writer(asyncItemWriter())
.build();
}
@Bean
//비동기 프로세서
public AsyncItemProcessor asyncItemProcessor() {
//비동기 프로세서 객체 생성후 세팅
AsyncItemProcessor<Customer, Customer> asyncItemProcessor = new AsyncItemProcessor<>();
//프로세서에게 실제 작업 위임
asyncItemProcessor.setDelegate(customItemProcessor());
//스레드 할당
asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
return asyncItemProcessor;
}
@Bean
//비동기 쓰기
public AsyncItemWriter asyncItemWriter() {
AsyncItemWriter<Customer> asyncItemWriter = new AsyncItemWriter<>();
//쓰기에게 위임
asyncItemWriter.setDelegate(customItemWriter());
return asyncItemWriter;
}
@Bean
//단순히 대문자로만 변경하는 처리
public ItemProcessor<Customer, Customer> customItemProcessor() {
return new ItemProcessor<Customer, Customer>() {
@Override
public Customer process(Customer item) throws Exception {
Thread.sleep(20);
return new Customer(item.getId(), item.getFirstName().toUpperCase(), item.getLastName().toUpperCase(), item.getBirthdate());
}
};
}
/**
* 커스텀 writer
* @return
*/
@Bean
public JdbcBatchItemWriter customItemWriter() {
JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setDataSource(dataSource);
itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
itemWriter.afterPropertiesSet();
return itemWriter;
}
/**
* 동기 reader
* @return
*/
@Bean
public JdbcPagingItemReader<Customer> pagingItemReader() {
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
reader.setDataSource(dataSource);
reader.setFetchSize(300);
reader.setRowMapper(new CustomRowMapper());
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, first_name, last_name, birthdate");
queryProvider.setFromClause("from customer");
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);
return reader;
}
}
Customer 😏
package com.dev.lsy.batchservice.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class Customer {
private Long id;
private String firstName;
private String lastName;
private String birthdate;
}
StopWatchjobListener 🥰
package com.dev.lsy.batchservice.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
@Slf4j
public class StopWatchjobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
}
@Override
public void afterJob(JobExecution jobExecution) {
long time = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();
log.info("==============================================");
log.info("소요시간 ==> [{}] ms", time);
log.info("==============================================");
}
}
CustomItemReadListener 😅
package com.dev.lsy.batchservice.listener;
import com.dev.lsy.batchservice.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
@Slf4j
public class CustomItemReadListener implements ItemReadListener<Customer> {
@Override
public void beforeRead() {
}
@Override
public void afterRead(Customer item) {
log.info("Thread : " + Thread.currentThread().getName() + " read item : " + item.getId());
}
@Override
public void onReadError(Exception ex) {
}
}
CustomItermProcessorListener 😄
package com.dev.lsy.batchservice.listener;
import com.dev.lsy.batchservice.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemProcessListener;
@Slf4j
public class CustomItermProcessorListener implements ItemProcessListener<Customer, Customer> {
@Override
public void beforeProcess(Customer item) {
}
@Override
public void afterProcess(Customer item, Customer result) {
log.info("Thread : " + Thread.currentThread().getName() + " process item : " + item.getId());
}
@Override
public void onProcessError(Customer item, Exception e) {
}
}
CustomItemWriterListener 🤡
package com.dev.lsy.batchservice.listener;
import com.dev.lsy.batchservice.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;
import java.util.List;
@Slf4j
public class CustomItemWriterListener implements ItemWriteListener<Customer> {
@Override
public void beforeWrite(List<? extends Customer> items) {
}
@Override
public void afterWrite(List<? extends Customer> items) {
log.info("Thread : " + Thread.currentThread().getName() + " write item : " + items.size());
}
@Override
public void onWriteError(Exception exception, List<? extends Customer> items) {
}
}
CustomRowMapper 🦄
package com.dev.lsy.batchservice.mapper;
import com.dev.lsy.batchservice.domain.Customer;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
public class CustomRowMapper implements RowMapper<Customer> {
@Override
public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
return new Customer(rs.getLong("id"),
rs.getString("first_name"),
rs.getString("last_name"),
rs.getString("birthdate"));
}
}
개인 스터디 기록을 메모하는 공간이라 틀린점이 있을 수 있습니다.
틀린 점 있을 경우 댓글 부탁드립니다.
reference: https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98
반응형
'IT > development' 카테고리의 다른 글
[springBoot] springBoot,java version downgrade (68) | 2023.11.27 |
---|---|
[springBoot] spring batch service 개발 1 (62) | 2023.11.27 |
[springBoot] spring batch multiThread process(feat. taskExecutor) (52) | 2023.11.22 |
[springBoot] spring batch job/stepExecutionListener (51) | 2023.11.18 |
[springBoot] spring batch 사용자 정의 ExitStatus (53) | 2023.11.18 |