🔨 Multi Step Job Config 추가

This commit is contained in:
hyojin kim 2025-12-29 18:02:18 +09:00
부모 32af369f23
커밋 94f7d4b5c0
7개의 변경된 파일, 169줄 추가 그리고 33줄 삭제

파일 보기

@ -0,0 +1,44 @@
package com.snp.batch.common.batch.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.transaction.PlatformTransactionManager;
/**
* 기존 단일 스텝 기능을 유지하면서 멀티 스텝 구성을 지원하는 확장 클래스
*/
public abstract class BaseMultiStepJobConfig<I, O> extends BaseJobConfig<I, O> {
public BaseMultiStepJobConfig(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
super(jobRepository, transactionManager);
}
/**
* 하위 클래스에서 멀티 스텝 흐름을 정의합니다.
*/
protected abstract Job createJobFlow(JobBuilder jobBuilder);
/**
* 부모의 job() 메서드를 오버라이드하여 멀티 스텝 흐름을 태웁니다.
*/
@Override
public Job job() {
JobBuilder jobBuilder = new JobBuilder(getJobName(), jobRepository);
configureJob(jobBuilder); // 기존 리스너 설정 유지
return createJobFlow(jobBuilder);
}
// 단일 스텝용 Reader/Processor/Writer는 사용하지 않을 경우
// 기본적으로 null이나 예외를 던지도록 구현하여 구현 부담을 줄일 있습니다.
@Override
protected ItemReader<I> createReader() { return null; }
@Override
protected ItemProcessor<I, O> createProcessor() { return null; }
@Override
protected ItemWriter<O> createWriter() { return null; }
}

파일 보기

@ -1,9 +1,9 @@
package com.snp.batch.jobs.compliance.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
import com.snp.batch.jobs.compliance.batch.dto.ComplianceDto;
import com.snp.batch.jobs.compliance.batch.processor.ComplianceDataProcessor;
import com.snp.batch.jobs.compliance.batch.entity.ComplianceEntity;
import com.snp.batch.jobs.compliance.batch.processor.ComplianceDataProcessor;
import com.snp.batch.jobs.compliance.batch.reader.ComplianceDataRangeReader;
import com.snp.batch.jobs.compliance.batch.writer.ComplianceDataWriter;
import com.snp.batch.service.BatchDateService;
@ -11,12 +11,15 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
@ -25,13 +28,16 @@ import org.springframework.web.reactive.function.client.WebClient;
@Slf4j
@Configuration
public class ComplianceImportRangeJobConfig extends BaseJobConfig<ComplianceDto, ComplianceEntity> {
public class ComplianceImportRangeJobConfig extends BaseMultiStepJobConfig<ComplianceDto, ComplianceEntity> {
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeServiceApiWebClient;
private final ComplianceDataProcessor complianceDataProcessor;
private final ComplianceDataWriter complianceDataWriter;
private final ComplianceDataRangeReader complianceDataRangeReader;
private final BatchDateService batchDateService;
protected String getApiKey() {return "COMPLIANCE_IMPORT_API";}
protected String getBatchUpdateSql() {
return "UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW() WHERE API_KEY = '" + getApiKey() + "'";}
@Override
protected int getChunkSize() {
@ -65,6 +71,15 @@ public class ComplianceImportRangeJobConfig extends BaseJobConfig<ComplianceDto,
return "ComplianceImportRangeStep";
}
@Override
protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder
.start(complianceImportRangeStep()) // 1단계 실행
.next(currentComplianceUpdateStep()) // 2단계 실행 (1단계 실패 실행 )
.next(complianceLastExecutionUpdateStep()) // 3단계: 모두 완료 , BATCH_LAST_EXECUTION 마지막 성공일자 업데이트
.build();
}
@Override
protected ItemReader<ComplianceDto> createReader() {
return complianceDataRangeReader;
@ -94,5 +109,46 @@ public class ComplianceImportRangeJobConfig extends BaseJobConfig<ComplianceDto,
public Step complianceImportRangeStep() {
return step();
}
/**
* 2단계: Current Compliance 업데이트
*/
@Bean
public Tasklet currentComplianceUpsertTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> Compliance Upsert 프로시저 호출 시작");
// PostgreSQL 기준 프로시저 호출 (CALL)
jdbcTemplate.execute("CALL new_snp.upsert_current_compliance()");
log.info(">>>>> Compliance Upsert 프로시저 호출 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "CurrentComplianceUpdateStep")
public Step currentComplianceUpdateStep() {
return new StepBuilder("CurrentComplianceUpdateStep", jobRepository)
.tasklet(currentComplianceUpsertTasklet(), transactionManager)
.build();
}
/**
* 3단계: 모든 스텝 성공 배치 실행 로그(날짜) 업데이트
*/
@Bean
public Tasklet complianceLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작");
jdbcTemplate.execute(getBatchUpdateSql());
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "ComplianceLastExecutionUpdateStep")
public Step complianceLastExecutionUpdateStep() {
return new StepBuilder("ComplianceLastExecutionUpdateStep", jobRepository)
.tasklet(complianceLastExecutionUpdateTasklet(), transactionManager)
.build();
}
}

파일 보기

@ -34,7 +34,7 @@ public class ComplianceDataRangeReader extends BaseApiReader<ComplianceDto> {
@Override
protected String getReaderName() {
return "ComplianceDataReader";
return "ComplianceDataRangeReader";
}
@Override
@ -56,7 +56,6 @@ public class ComplianceDataRangeReader extends BaseApiReader<ComplianceDto> {
protected List<ComplianceDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allData == null) {
log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), fromDate, toDate);
allData = callApiWithBatch();
if (allData == null || allData.isEmpty()) {

파일 보기

@ -1,33 +1,23 @@
package com.snp.batch.jobs.compliance.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.compliance.batch.repository.ComplianceRepository;
import com.snp.batch.jobs.compliance.batch.entity.ComplianceEntity;
import com.snp.batch.service.BatchDateService;
import com.snp.batch.jobs.compliance.batch.repository.ComplianceRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.util.List;
@Slf4j
@Component
public class ComplianceDataWriter extends BaseWriter<ComplianceEntity> {
private final ComplianceRepository complianceRepository;
private final BatchDateService batchDateService; // BatchDateService 필드 추가
protected String getApiKey() {return "COMPLIANCE_IMPORT_API";}
public ComplianceDataWriter(ComplianceRepository complianceRepository, BatchDateService batchDateService) {
public ComplianceDataWriter(ComplianceRepository complianceRepository) {
super("ComplianceRepository");
this.complianceRepository = complianceRepository;
this.batchDateService = batchDateService;
}
@Override
protected void writeItems(List<ComplianceEntity> items) throws Exception {
complianceRepository.saveComplianceAll(items);
LocalDate successDate = LocalDate.now(); // 현재 배치 실행 시점의 날짜 (Reader의 toDay와 동일한 )
batchDateService.updateLastSuccessDate(getApiKey(), successDate);
log.info("batch_last_execution update 완료 : {}", getApiKey());
}
}

파일 보기

@ -1,6 +1,6 @@
package com.snp.batch.jobs.risk.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
import com.snp.batch.jobs.risk.batch.dto.RiskDto;
import com.snp.batch.jobs.risk.batch.entity.RiskEntity;
import com.snp.batch.jobs.risk.batch.processor.RiskDataProcessor;
@ -11,12 +11,15 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
@ -25,13 +28,18 @@ import org.springframework.web.reactive.function.client.WebClient;
@Slf4j
@Configuration
public class RiskImportRangeJobConfig extends BaseJobConfig<RiskDto, RiskEntity> {
public class RiskImportRangeJobConfig extends BaseMultiStepJobConfig<RiskDto, RiskEntity> {
private final WebClient maritimeServiceApiWebClient;
private final RiskDataProcessor riskDataProcessor;
private final RiskDataWriter riskDataWriter;
private final RiskDataRangeReader riskDataRangeReader;
private final JdbcTemplate jdbcTemplate;
private final BatchDateService batchDateService;
protected String getApiKey() {return "RISK_IMPORT_API";}
protected String getBatchUpdateSql() {
return "UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW() WHERE API_KEY = '" + getApiKey() + "'";}
@Override
protected int getChunkSize() {
return 10000;
@ -64,6 +72,15 @@ public class RiskImportRangeJobConfig extends BaseJobConfig<RiskDto, RiskEntity>
return "RiskRangeImportStep";
}
@Override
protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder
.start(riskRangeImportStep()) // 1단계: API 데이터 적재
.next(currentRiskUpdateStep()) // 2단계: 프로시저 실행으로 데이터 동기화
.next(riskLastExecutionUpdateStep()) // 3단계: 모두 완료 , BATCH_LAST_EXECUTION 마지막 성공일자 업데이트
.build();
}
@Override
protected ItemReader<RiskDto> createReader() {
return riskDataRangeReader;
@ -91,5 +108,46 @@ public class RiskImportRangeJobConfig extends BaseJobConfig<RiskDto, RiskEntity>
public Step riskRangeImportStep() {
return step();
}
/**
* 2단계: Current Risk 업데이트
*/
@Bean
public Tasklet currentRiskUpsertTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> Risk Upsert 프로시저 호출 시작");
// PostgreSQL 기준 프로시저 호출 (CALL)
jdbcTemplate.execute("CALL new_snp.upsert_current_risk()");
log.info(">>>>> Risk Upsert 프로시저 호출 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "CurrentRiskUpdateStep")
public Step currentRiskUpdateStep() {
return new StepBuilder("CurrentRiskUpdateStep", jobRepository)
.tasklet(currentRiskUpsertTasklet(), transactionManager)
.build();
}
/**
* 3단계: 모든 스텝 성공 배치 실행 로그(날짜) 업데이트
*/
@Bean
public Tasklet riskLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작");
jdbcTemplate.execute(getBatchUpdateSql());
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "RiskLastExecutionUpdateStep")
public Step riskLastExecutionUpdateStep() {
return new StepBuilder("RiskLastExecutionUpdateStep", jobRepository)
.tasklet(riskLastExecutionUpdateTasklet(), transactionManager)
.build();
}
}

파일 보기

@ -55,7 +55,6 @@ public class RiskDataRangeReader extends BaseApiReader<RiskDto> {
protected List<RiskDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allData == null) {
log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), fromDate, toDate);
allData = callApiWithBatch();
if (allData == null || allData.isEmpty()) {

파일 보기

@ -3,31 +3,21 @@ package com.snp.batch.jobs.risk.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.risk.batch.entity.RiskEntity;
import com.snp.batch.jobs.risk.batch.repository.RiskRepository;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.util.List;
@Slf4j
@Component
public class RiskDataWriter extends BaseWriter<RiskEntity> {
private final RiskRepository riskRepository;
private final BatchDateService batchDateService; // BatchDateService 필드 추가
protected String getApiKey() {return "RISK_IMPORT_API";}
public RiskDataWriter(RiskRepository riskRepository, BatchDateService batchDateService) {
public RiskDataWriter(RiskRepository riskRepository) {
super("riskRepository");
this.riskRepository = riskRepository;
this.batchDateService = batchDateService;
}
@Override
protected void writeItems(List<RiskEntity> items) throws Exception {
riskRepository.saveRiskAll(items);
LocalDate successDate = LocalDate.now(); // 현재 배치 실행 시점의 날짜 (Reader의 toDay와 동일한 )
batchDateService.updateLastSuccessDate(getApiKey(), successDate);
log.info("batch_last_execution update 완료 : {}", getApiKey());
}
}