IT/development

[springBoot] spring batch service 개발 1

알 수 없는 사용자 2023. 11. 27. 14:01
반응형

목차

    아래 포스팅에서 이어진 내용입니다.

     

    [springBoot] spring batch AsyncProcess/Writer

    목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch multiThread process(feat. taskExecutor) 목차 아래 포스팅에서 이어진 내용입니다. [springBoot] spring batch job/stepExecutionListener 목차 아래 포스팅

    yaga.tistory.com


    Api Service, batch job 2개 개발 사이드 프로젝트(배운 건 써먹어야 체득이 되니까..)

    architecture를 ppt로 그릴려다가 포기(너무 못 그려서)

    batch1: customer 테이블의 데이터를 읽어서 customer2 테이블에 데이터를 저장

    batch2: customer2 테이블의 데이터를 읽어서 로그로 출력(이 때 Api Server의 api를 이용해 데이터를 조회)이를 spring batch의 scheduler로 실행시킴

    Job1Config 😄

    package com.dev.lsy.batchservice.batch.config;
    
    
    import com.dev.lsy.batchservice.batch.domain.Customer;
    import com.dev.lsy.batchservice.batch.listener.StopWatchjobListener;
    import com.dev.lsy.batchservice.batch.mapper.CustomRowMapper;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
    import org.springframework.batch.item.database.JdbcBatchItemWriter;
    import org.springframework.batch.item.database.JdbcPagingItemReader;
    import org.springframework.batch.item.database.Order;
    import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.sql.DataSource;
    import java.util.HashMap;
    import java.util.Map;
    
    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    //customer 테이블의 데이터를 읽어서 customer2 테이블에 insert 하는 job
    public class Job1Config {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final DataSource dataSource;
    
        @Bean
        //이 job에서는 step을 실행하고 리스너에서 소요시간 측정
        public Job batch1() throws Exception {
            return jobBuilderFactory.get("batch1")
                    .incrementer(new RunIdIncrementer())
                    .start(batchStep1())
                    .listener(new StopWatchjobListener())
                    .build();
        }
    
        @Bean
        //스텝에서는 chunk 기반으로 데이터를 읽고 씀
        public Step batchStep1() throws Exception {
            return stepBuilderFactory.get("batchStep1")
                    .<Customer, Customer>chunk(100)
                    .reader(pagingItemReader1())
                    .writer(customItemWriter())
                    .build();
        }
    
    
        @Bean
        //customer 테이블 reader
        public JdbcPagingItemReader<Customer> pagingItemReader1() {
            JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
    
            reader.setDataSource(dataSource);
            reader.setFetchSize(300);
            reader.setRowMapper(new CustomRowMapper());
    
            MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
            queryProvider.setSelectClause("id, first_name, last_name, birthdate");
            queryProvider.setFromClause("from customer");
    
            Map<String, Order> sortKeys = new HashMap<>(1);
    
            sortKeys.put("id", Order.ASCENDING);
    
            queryProvider.setSortKeys(sortKeys);
            reader.setQueryProvider(queryProvider);
    
            return reader;
        }
    
        @Bean
        //customer2 테이블에 데이터 insert
        public JdbcBatchItemWriter customItemWriter() {
            JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
    
            itemWriter.setDataSource(dataSource);
            itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
            itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
            itemWriter.afterPropertiesSet();
    
            return itemWriter;
        }
    
    }

    Job2Config 🤗

    package com.dev.lsy.batchservice.batch.config;
    
    
    import com.dev.lsy.batchservice.batch.domain.Customer;
    import com.dev.lsy.batchservice.batch.listener.StopWatchjobListener;
    import com.dev.lsy.batchservice.batch.mapper.CustomRowMapper;
    import com.dev.lsy.batchservice.batch.tasklet.CustomTasklet;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
    import org.springframework.batch.item.database.JdbcBatchItemWriter;
    import org.springframework.batch.item.database.JdbcPagingItemReader;
    import org.springframework.batch.item.database.Order;
    import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.sql.DataSource;
    import java.util.HashMap;
    import java.util.Map;
    
    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    //customer2 테이블의 데이터를 읽어서 단순히 로그만 출력하는 job
    public class Job2Config {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final DataSource dataSource;
    
        @Bean
        //job에서 step 호출
        public Job batch2() throws Exception {
            return jobBuilderFactory.get("batch2")
                    .incrementer(new RunIdIncrementer())
                    .start(batchStep2())
                    .listener(new StopWatchjobListener())
                    .build();
        }
    
        @Bean
        //chunk 프로세스가 아닌 tasklet
        //step에서는 tasklet 호출
        public Step batchStep2() throws Exception {
            return stepBuilderFactory.get("batchStep2")
                    .tasklet(new CustomTasklet())
                    .build();
        }
    }

    Scheduler 🙂

    package com.dev.lsy.batchservice.batch.scheduler;
    
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.JobParametersInvalidException;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
    import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
    import org.springframework.batch.core.repository.JobRestartException;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import java.time.LocalDateTime;
    
    @Component
    @EnableScheduling
    @Slf4j
    @RequiredArgsConstructor
    public class Scheduler {
    
        private final JobLauncher launcher;
        //아래 멤버변수인 batch1,2는 job1Config의 job1,2Config의 name과 동일해야 함
        private final Job batch1;   
        private final Job batch2;
    
        //job1 실행
        @Scheduled(cron = "0 19 11 * * *")
        public void job1() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
            //launcher는 2개의 parameter를 받음(job 객체, jobParameters(여기선 현재시간을 넘김))
            launcher.run(batch1, new JobParametersBuilder()
                    .addString("date", "param1_" + LocalDateTime.now().toString())
                    .toJobParameters()
            );
        }
    
        //job2 실행(1분에 1번 씩 실행)
        @Scheduled(cron = "0/10 * * * * *")
        public void job2() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
            launcher.run(batch2, new JobParametersBuilder()
                    .addString("date", "param2_" + LocalDateTime.now().toString())
                    .toJobParameters()
            );
        }
    }

    CustomTasklet 🤑

    package com.dev.lsy.batchservice.batch.tasklet;
    
    import com.dev.lsy.batchservice.batch.domain.Customer;
    import com.dev.lsy.batchservice.batch.domain.Response;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.core.ParameterizedTypeReference;
    import org.springframework.http.HttpMethod;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.client.RestTemplate;
    
    import java.util.List;
    
    @Slf4j
    public class CustomTasklet implements Tasklet {
    
        //원래는 외부에서 가져와야 되는데 그게 자꾸 에러나고 못 찾아서 시간관계상 하드코딩
        private String url = "http://localhost:9090/customer";
            
        @Override
        //rest 방식으로 데이터 호출
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            RestTemplate restTemplate = new RestTemplate();
            //http://localhost:9090/customer 호출해서 데이터를 response타입으로 받되 제네릭타입을 Customer로 설정 
            ResponseEntity<Response<Customer>> response = restTemplate.exchange(
                           url,
                           HttpMethod.GET,
                           null,
                           new ParameterizedTypeReference<Response<Customer>>() {}
                   );
            //response의 body안의 list를 가져와서 로그 출력
            List<Customer> customerList = response.getBody().getList();
    
            log.info("list ==> [{}]", customerList);
            
            //response가 없으면 계속 진행, 있으면 완료 후 종료
            return (response != null ? RepeatStatus.FINISHED : RepeatStatus.CONTINUABLE);
        }
    }

    나머지는 소스코드 전체로 첨부(ddl, dml script도 첨부)

    데이터베이스, 사용자 생성 후 import 후 jdk 설정 맞추고 실행하면 됨

    sample-project3-master.zip
    0.22MB


    개인 스터디 기록을 메모하는 공간이라 틀린점이 있을 수 있습니다.

    틀린 점 있을 경우 댓글 부탁드립니다.

    반응형