IT/Live Coding

[springBoot] spring batch service 개발 1(테스트 영상 & 소스코드 포함)

알 수 없는 사용자 2023. 11. 27.

아래 포스팅에서 이어진 내용입니다.

 

[springBoot] spring batch AsyncProcess/Writer

목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch multiThread process(feat. taskExecutor) 목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch job/stepExecutionListener 목차 아래 포스팅

yaga.tistory.com


Api Service, batch job 2개 개발 사이드 프로젝트(배운 건 써먹어야 체득이 되니까..)

architecture를 ppt로 그리려다가 포기(너무 못 그려서)

batch1: customer 테이블의 데이터를 읽어서 customer2 테이블에 데이터를 저장

batch2: customer2 테이블의 데이터를 읽어서 로그로 출력(이 때 Api Server의 api를 이용해 데이터를 조회). 이를 spring batch의 scheduler로 실행시킴

Job1Config 😄

package com.dev.lsy.batchservice.batch.config;


import com.dev.lsy.batchservice.batch.domain.Customer;
import com.dev.lsy.batchservice.batch.listener.StopWatchjobListener;
import com.dev.lsy.batchservice.batch.mapper.CustomRowMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * Batch configuration for the {@code batch1} job, which reads rows from the
 * {@code customer} table and inserts them into the {@code customer2} table.
 */
@Slf4j
@RequiredArgsConstructor
@Configuration
public class Job1Config {

    /**
     * Items per chunk (one transaction per chunk). The reader's page size is set to the
     * same value so that one paging query feeds one chunk.
     */
    private static final int CHUNK_SIZE = 100;

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    /**
     * Defines the {@code batch1} job: runs {@link #batchStep1()} and attaches a
     * {@link StopWatchjobListener} (used, per the original note, to measure elapsed time).
     *
     * @return the configured job
     * @throws Exception declared for compatibility with existing callers
     */
    @Bean
    public Job batch1() throws Exception {
        return jobBuilderFactory.get("batch1")
                .incrementer(new RunIdIncrementer())
                .start(batchStep1())
                .listener(new StopWatchjobListener())
                .build();
    }

    /**
     * Defines the chunk-oriented step: reads {@link Customer} items with the paging reader
     * and writes them with the JDBC batch writer, {@value #CHUNK_SIZE} items per chunk.
     *
     * @return the configured step
     * @throws Exception declared for compatibility with existing callers
     */
    @Bean
    public Step batchStep1() throws Exception {
        return stepBuilderFactory.get("batchStep1")
                .<Customer, Customer>chunk(CHUNK_SIZE)
                .reader(pagingItemReader1())
                .writer(customItemWriter())
                .build();
    }

    /**
     * Paging reader over the {@code customer} table, ordered by {@code id} ascending.
     *
     * @return the configured reader
     */
    @Bean
    public JdbcPagingItemReader<Customer> pagingItemReader1() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(dataSource);
        reader.setFetchSize(300);
        // Without an explicit page size the reader falls back to its default, which is
        // smaller than the chunk size and costs several paging queries per chunk.
        reader.setPageSize(CHUNK_SIZE);
        reader.setRowMapper(new CustomRowMapper());

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, first_name, last_name, birthdate");
        queryProvider.setFromClause("from customer");

        // A sort key is what the paging query uses to locate the next page.
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);

        return reader;
    }

    /**
     * JDBC batch writer that inserts each {@link Customer} into {@code customer2}, binding
     * the named SQL parameters from the bean's properties.
     *
     * @return the configured writer
     */
    @Bean
    public JdbcBatchItemWriter<Customer> customItemWriter() {
        JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();

        itemWriter.setDataSource(dataSource);
        // NOTE(review): the insert has no column list, so it relies on customer2's column
        // order being (id, first_name, last_name, birthdate) - confirm against the DDL.
        itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }

}

Job2Config 🤗

package com.dev.lsy.batchservice.batch.config;


import com.dev.lsy.batchservice.batch.domain.Customer;
import com.dev.lsy.batchservice.batch.listener.StopWatchjobListener;
import com.dev.lsy.batchservice.batch.mapper.CustomRowMapper;
import com.dev.lsy.batchservice.batch.tasklet.CustomTasklet;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * Batch configuration for the {@code batch2} job. The job consists of a single
 * tasklet step that delegates to {@link CustomTasklet}.
 */
@Slf4j
@RequiredArgsConstructor
@Configuration
public class Job2Config {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    /**
     * Builds the {@code batch2} job: a run-id incrementer, the tasklet step, and a
     * {@link StopWatchjobListener}.
     *
     * @return the configured job
     * @throws Exception propagated from step construction
     */
    @Bean
    public Job batch2() throws Exception {
        Step taskletStep = batchStep2();
        StopWatchjobListener stopWatch = new StopWatchjobListener();

        return jobBuilderFactory.get("batch2")
                .incrementer(new RunIdIncrementer())
                .start(taskletStep)
                .listener(stopWatch)
                .build();
    }

    /**
     * Builds the {@code batchStep2} step. This is a tasklet step rather than a
     * chunk-oriented one; all work happens inside {@link CustomTasklet}.
     *
     * @return the configured step
     * @throws Exception declared for compatibility with existing callers
     */
    @Bean
    public Step batchStep2() throws Exception {
        CustomTasklet tasklet = new CustomTasklet();
        return stepBuilderFactory.get("batchStep2").tasklet(tasklet).build();
    }
}

Scheduler 🙂

package com.dev.lsy.batchservice.batch.scheduler;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

/**
 * Launches the two batch jobs on cron schedules through the {@link JobLauncher}.
 */
@Component
@EnableScheduling
@Slf4j
@RequiredArgsConstructor
public class Scheduler {

    private final JobLauncher launcher;
    // Per the original note, the field names batch1/batch2 must match the names of the
    // Job beans defined in Job1Config and Job2Config.
    private final Job batch1;
    private final Job batch2;

    /**
     * Runs {@code batch1}. The cron expression fires at 11:19:00 every day.
     * The current timestamp is passed as the {@code date} job parameter.
     */
    @Scheduled(cron = "0 19 11 * * *")
    public void job1() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        // The launcher takes the job plus its parameters; here only the timestamp is supplied.
        JobParametersBuilder params = new JobParametersBuilder();
        params.addString("date", "param1_" + LocalDateTime.now().toString());

        launcher.run(batch1, params.toJobParameters());
    }

    /**
     * Runs {@code batch2}. The cron expression fires every 10 seconds
     * (NOTE(review): the original comment said once per minute - confirm which is intended).
     * The current timestamp is passed as the {@code date} job parameter.
     */
    @Scheduled(cron = "0/10 * * * * *")
    public void job2() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        JobParametersBuilder params = new JobParametersBuilder();
        params.addString("date", "param2_" + LocalDateTime.now().toString());

        launcher.run(batch2, params.toJobParameters());
    }
}

CustomTasklet 🤑

package com.dev.lsy.batchservice.batch.tasklet;

import com.dev.lsy.batchservice.batch.domain.Customer;
import com.dev.lsy.batchservice.batch.domain.Response;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

import java.util.List;

/**
 * Tasklet that calls the customer API over HTTP and logs the returned customer list.
 */
@Slf4j
public class CustomTasklet implements Tasklet {

    // Hard-coded for now; the original author's note says this should come from
    // external configuration but could not be wired up in time.
    private final String url = "http://localhost:9090/customer";

    // Reused across executions instead of allocating a new client on every run.
    private final RestTemplate restTemplate = new RestTemplate();

    /**
     * Issues a GET request to the customer API, logs the list contained in the response
     * body, and finishes the step.
     *
     * @param contribution step contribution supplied by Spring Batch (not used here)
     * @param chunkContext chunk context supplied by Spring Batch (not used here)
     * @return always {@link RepeatStatus#FINISHED}; the tasklet runs once per step execution
     * @throws Exception if the HTTP call fails
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Deserialize the payload as Response<Customer>; the anonymous type reference
        // preserves the generic type argument for the message converter.
        ResponseEntity<Response<Customer>> response = restTemplate.exchange(
                url,
                HttpMethod.GET,
                null,
                new ParameterizedTypeReference<Response<Customer>>() {}
        );

        // The body may be absent (e.g. an empty response), so guard before dereferencing.
        Response<Customer> body = response.getBody();
        if (body == null) {
            log.warn("no response body received from [{}]", url);
            // Returning CONTINUABLE here would re-invoke the tasklet immediately in a
            // tight loop; finish instead and let the next scheduled run try again.
            return RepeatStatus.FINISHED;
        }

        List<Customer> customerList = body.getList();
        log.info("list ==> [{}]", customerList);

        return RepeatStatus.FINISHED;
    }
}

나머지는 소스코드 전체로 첨부(ddl, dml script도 첨부)

데이터베이스, 사용자 생성 후 import 후 jdk 설정 맞추고 실행하면 됨

sample-project3-master.zip
0.22MB


개인 스터디 기록을 메모하는 공간이라 틀린 점이 있을 수 있습니다.

틀린 점 있을 경우 댓글 부탁드립니다.

댓글