반응형
목차
아래 포스팅에서 이어진 내용입니다.
Api Service, batch job 2개 개발 사이드 프로젝트(배운 건 써먹어야 체득이 되니까..)
architecture를 ppt로 그릴려다가 포기(너무 못 그려서)
batch1: customer 테이블의 데이터를 읽어서 customer2 테이블에 데이터를 저장
batch2: customer2 테이블의 데이터를 읽어서 로그로 출력(이 때 Api Server의 api를 이용해 데이터를 조회)이를 spring batch의 scheduler로 실행시킴
Job1Config 😄
package com.dev.lsy.batchservice.batch.config;
import com.dev.lsy.batchservice.batch.domain.Customer;
import com.dev.lsy.batchservice.batch.listener.StopWatchjobListener;
import com.dev.lsy.batchservice.batch.mapper.CustomRowMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@RequiredArgsConstructor
@Configuration
//customer 테이블의 데이터를 읽어서 customer2 테이블에 insert 하는 job
public class Job1Config {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
@Bean
//이 job에서는 step을 실행하고 리스너에서 소요시간 측정
public Job batch1() throws Exception {
return jobBuilderFactory.get("batch1")
.incrementer(new RunIdIncrementer())
.start(batchStep1())
.listener(new StopWatchjobListener())
.build();
}
@Bean
//스텝에서는 chunk 기반으로 데이터를 읽고 씀
public Step batchStep1() throws Exception {
return stepBuilderFactory.get("batchStep1")
.<Customer, Customer>chunk(100)
.reader(pagingItemReader1())
.writer(customItemWriter())
.build();
}
@Bean
//customer 테이블 reader
public JdbcPagingItemReader<Customer> pagingItemReader1() {
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
reader.setDataSource(dataSource);
reader.setFetchSize(300);
reader.setRowMapper(new CustomRowMapper());
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, first_name, last_name, birthdate");
queryProvider.setFromClause("from customer");
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);
return reader;
}
@Bean
//customer2 테이블에 데이터 insert
public JdbcBatchItemWriter customItemWriter() {
JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setDataSource(dataSource);
itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
itemWriter.afterPropertiesSet();
return itemWriter;
}
}
Job2Config 🤗
package com.dev.lsy.batchservice.batch.config;
import com.dev.lsy.batchservice.batch.domain.Customer;
import com.dev.lsy.batchservice.batch.listener.StopWatchjobListener;
import com.dev.lsy.batchservice.batch.mapper.CustomRowMapper;
import com.dev.lsy.batchservice.batch.tasklet.CustomTasklet;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@RequiredArgsConstructor
@Configuration
//customer2 테이블의 데이터를 읽어서 단순히 로그만 출력하는 job
public class Job2Config {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
@Bean
//job에서 step 호출
public Job batch2() throws Exception {
return jobBuilderFactory.get("batch2")
.incrementer(new RunIdIncrementer())
.start(batchStep2())
.listener(new StopWatchjobListener())
.build();
}
@Bean
//chunk 프로세스가 아닌 tasklet
//step에서는 tasklet 호출
public Step batchStep2() throws Exception {
return stepBuilderFactory.get("batchStep2")
.tasklet(new CustomTasklet())
.build();
}
}
Scheduler 🙂
package com.dev.lsy.batchservice.batch.scheduler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
@EnableScheduling
@Slf4j
@RequiredArgsConstructor
public class Scheduler {
private final JobLauncher launcher;
//아래 멤버변수인 batch1,2는 job1Config의 job1,2Config의 name과 동일해야 함
private final Job batch1;
private final Job batch2;
//job1 실행
@Scheduled(cron = "0 19 11 * * *")
public void job1() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
//launcher는 2개의 parameter를 받음(job 객체, jobParameters(여기선 현재시간을 넘김))
launcher.run(batch1, new JobParametersBuilder()
.addString("date", "param1_" + LocalDateTime.now().toString())
.toJobParameters()
);
}
//job2 실행(1분에 1번 씩 실행)
@Scheduled(cron = "0/10 * * * * *")
public void job2() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
launcher.run(batch2, new JobParametersBuilder()
.addString("date", "param2_" + LocalDateTime.now().toString())
.toJobParameters()
);
}
}
CustomTasklet 🤑
package com.dev.lsy.batchservice.batch.tasklet;
import com.dev.lsy.batchservice.batch.domain.Customer;
import com.dev.lsy.batchservice.batch.domain.Response;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
import java.util.List;
@Slf4j
public class CustomTasklet implements Tasklet {
//원래는 외부에서 가져와야 되는데 그게 자꾸 에러나고 못 찾아서 시간관계상 하드코딩
private String url = "http://localhost:9090/customer";
@Override
//rest 방식으로 데이터 호출
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
RestTemplate restTemplate = new RestTemplate();
//http://localhost:9090/customer 호출해서 데이터를 response타입으로 받되 제네릭타입을 Customer로 설정
ResponseEntity<Response<Customer>> response = restTemplate.exchange(
url,
HttpMethod.GET,
null,
new ParameterizedTypeReference<Response<Customer>>() {}
);
//response의 body안의 list를 가져와서 로그 출력
List<Customer> customerList = response.getBody().getList();
log.info("list ==> [{}]", customerList);
//response가 없으면 계속 진행, 있으면 완료 후 종료
return (response != null ? RepeatStatus.FINISHED : RepeatStatus.CONTINUABLE);
}
}
나머지는 소스코드 전체로 첨부(ddl, dml script도 첨부)
데이터베이스, 사용자 생성 후 import 후 jdk 설정 맞추고 실행하면 됨
개인 스터디 기록을 메모하는 공간이라 틀린점이 있을 수 있습니다.
틀린 점 있을 경우 댓글 부탁드립니다.
반응형
'IT > development' 카테고리의 다른 글
[Java] 추상 클래스, 추상 메소드 간단 예제 (57) | 2023.12.05 |
---|---|
[springBoot] springBoot,java version downgrade (68) | 2023.11.27 |
[springBoot] spring batch AsyncProcess/Writer (61) | 2023.11.23 |
[springBoot] spring batch multiThread process(feat. taskExecutor) (52) | 2023.11.22 |
[springBoot] spring batch job/stepExecutionListener (51) | 2023.11.18 |