diff --git a/src/main/java/com/snp/batch/common/batch/config/BaseMultiStepJobConfig.java b/src/main/java/com/snp/batch/common/batch/config/BaseMultiStepJobConfig.java new file mode 100644 index 0000000..2d2053c --- /dev/null +++ b/src/main/java/com/snp/batch/common/batch/config/BaseMultiStepJobConfig.java @@ -0,0 +1,44 @@ +package com.snp.batch.common.batch.config; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * 기존 단일 스텝 기능을 유지하면서 멀티 스텝 구성을 지원하는 확장 클래스 + */ +public abstract class BaseMultiStepJobConfig extends BaseJobConfig { + + public BaseMultiStepJobConfig(JobRepository jobRepository, PlatformTransactionManager transactionManager) { + super(jobRepository, transactionManager); + } + + /** + * 하위 클래스에서 멀티 스텝 흐름을 정의합니다. + */ + protected abstract Job createJobFlow(JobBuilder jobBuilder); + + /** + * 부모의 job() 메서드를 오버라이드하여 멀티 스텝 흐름을 태웁니다. + */ + @Override + public Job job() { + JobBuilder jobBuilder = new JobBuilder(getJobName(), jobRepository); + configureJob(jobBuilder); // 기존 리스너 등 설정 유지 + + return createJobFlow(jobBuilder); + } + + // 단일 스텝용 Reader/Processor/Writer는 사용하지 않을 경우 + // 기본적으로 null이나 예외를 던지도록 구현하여 구현 부담을 줄일 수 있습니다. + @Override + protected ItemReader createReader() { return null; } + @Override + protected ItemProcessor createProcessor() { return null; } + @Override + protected ItemWriter createWriter() { return null; } +} \ No newline at end of file diff --git a/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportRangeJobConfig.java index dd0d4bf..8bc27a2 100644 --- a/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportRangeJobConfig.java @@ -1,9 +1,9 @@ package com.snp.batch.jobs.compliance.batch.config; -import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; import com.snp.batch.jobs.compliance.batch.dto.ComplianceDto; -import com.snp.batch.jobs.compliance.batch.processor.ComplianceDataProcessor; import com.snp.batch.jobs.compliance.batch.entity.ComplianceEntity; +import com.snp.batch.jobs.compliance.batch.processor.ComplianceDataProcessor; import com.snp.batch.jobs.compliance.batch.reader.ComplianceDataRangeReader; import com.snp.batch.jobs.compliance.batch.writer.ComplianceDataWriter; import com.snp.batch.service.BatchDateService; @@ -11,12 +11,15 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -25,13 +28,16 @@ import org.springframework.web.reactive.function.client.WebClient; @Slf4j @Configuration -public class ComplianceImportRangeJobConfig extends BaseJobConfig { +public class ComplianceImportRangeJobConfig extends BaseMultiStepJobConfig { private final JdbcTemplate jdbcTemplate; private final WebClient maritimeServiceApiWebClient; private final ComplianceDataProcessor complianceDataProcessor; private final ComplianceDataWriter complianceDataWriter; private final ComplianceDataRangeReader complianceDataRangeReader; private final BatchDateService batchDateService; + protected String getApiKey() {return "COMPLIANCE_IMPORT_API";} + protected String getBatchUpdateSql() { + return "UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW() WHERE API_KEY = '" + getApiKey() + "'";} @Override protected int getChunkSize() { @@ -65,6 +71,15 @@ public class ComplianceImportRangeJobConfig extends BaseJobConfig createReader() { return complianceDataRangeReader; @@ -94,5 +109,46 @@ public class ComplianceImportRangeJobConfig extends BaseJobConfig { + log.info(">>>>> Compliance Upsert 프로시저 호출 시작"); + // PostgreSQL 기준 프로시저 호출 (CALL) + jdbcTemplate.execute("CALL new_snp.upsert_current_compliance()"); + + log.info(">>>>> Compliance Upsert 프로시저 호출 완료"); + return RepeatStatus.FINISHED; + }; + } + @Bean(name = "CurrentComplianceUpdateStep") + public Step currentComplianceUpdateStep() { + return new StepBuilder("CurrentComplianceUpdateStep", jobRepository) + .tasklet(currentComplianceUpsertTasklet(), transactionManager) + .build(); + } + + /** + * 3단계: 모든 스텝 성공 시 배치 실행 로그(날짜) 업데이트 + */ + @Bean + public Tasklet complianceLastExecutionUpdateTasklet() { + return (contribution, chunkContext) -> { + log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작"); + + jdbcTemplate.execute(getBatchUpdateSql()); + + log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료"); + return RepeatStatus.FINISHED; + }; + } + @Bean(name = "ComplianceLastExecutionUpdateStep") + public Step complianceLastExecutionUpdateStep() { + return new StepBuilder("ComplianceLastExecutionUpdateStep", jobRepository) + .tasklet(complianceLastExecutionUpdateTasklet(), transactionManager) + .build(); + } } diff --git a/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataRangeReader.java b/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataRangeReader.java index 6cdbdf3..6a989cb 100644 --- a/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataRangeReader.java @@ -34,7 +34,7 @@ public class ComplianceDataRangeReader extends BaseApiReader { @Override protected String getReaderName() { - return "ComplianceDataReader"; + return "ComplianceDataRangeReader"; } @Override @@ -56,7 +56,6 @@ public class ComplianceDataRangeReader extends BaseApiReader { protected List fetchNextBatch() throws Exception { // 모든 배치 처리 완료 확인 if (allData == null) { - log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), fromDate, toDate); allData = callApiWithBatch(); if (allData == null || allData.isEmpty()) { diff --git a/src/main/java/com/snp/batch/jobs/compliance/batch/writer/ComplianceDataWriter.java b/src/main/java/com/snp/batch/jobs/compliance/batch/writer/ComplianceDataWriter.java index b9ab2e4..fbba771 100644 --- a/src/main/java/com/snp/batch/jobs/compliance/batch/writer/ComplianceDataWriter.java +++ b/src/main/java/com/snp/batch/jobs/compliance/batch/writer/ComplianceDataWriter.java @@ -1,33 +1,23 @@ package com.snp.batch.jobs.compliance.batch.writer; import com.snp.batch.common.batch.writer.BaseWriter; -import com.snp.batch.jobs.compliance.batch.repository.ComplianceRepository; import com.snp.batch.jobs.compliance.batch.entity.ComplianceEntity; -import com.snp.batch.service.BatchDateService; +import com.snp.batch.jobs.compliance.batch.repository.ComplianceRepository; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import java.time.LocalDate; import java.util.List; @Slf4j @Component public class ComplianceDataWriter extends BaseWriter { - private final ComplianceRepository complianceRepository; - private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 - protected String getApiKey() {return "COMPLIANCE_IMPORT_API";} - public ComplianceDataWriter(ComplianceRepository complianceRepository, BatchDateService batchDateService) { + public ComplianceDataWriter(ComplianceRepository complianceRepository) { super("ComplianceRepository"); this.complianceRepository = complianceRepository; - this.batchDateService = batchDateService; } - @Override protected void writeItems(List items) throws Exception { complianceRepository.saveComplianceAll(items); - LocalDate successDate = LocalDate.now(); // 현재 배치 실행 시점의 날짜 (Reader의 toDay와 동일한 값) - batchDateService.updateLastSuccessDate(getApiKey(), successDate); - log.info("batch_last_execution update 완료 : {}", getApiKey()); } } diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java index 00e8bb8..7b1d2f0 100644 --- a/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java @@ -1,6 +1,6 @@ package com.snp.batch.jobs.risk.batch.config; -import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; import com.snp.batch.jobs.risk.batch.dto.RiskDto; import com.snp.batch.jobs.risk.batch.entity.RiskEntity; import com.snp.batch.jobs.risk.batch.processor.RiskDataProcessor; @@ -11,12 +11,15 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -25,13 +28,18 @@ import org.springframework.web.reactive.function.client.WebClient; @Slf4j @Configuration -public class RiskImportRangeJobConfig extends BaseJobConfig { +public class RiskImportRangeJobConfig extends BaseMultiStepJobConfig { private final WebClient maritimeServiceApiWebClient; private final RiskDataProcessor riskDataProcessor; private final RiskDataWriter riskDataWriter; private final RiskDataRangeReader riskDataRangeReader; private final JdbcTemplate jdbcTemplate; private final BatchDateService batchDateService; + + protected String getApiKey() {return "RISK_IMPORT_API";} + protected String getBatchUpdateSql() { + return "UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW() WHERE API_KEY = '" + getApiKey() + "'";} + @Override protected int getChunkSize() { return 10000; @@ -64,6 +72,15 @@ public class RiskImportRangeJobConfig extends BaseJobConfig return "RiskRangeImportStep"; } + @Override + protected Job createJobFlow(JobBuilder jobBuilder) { + return jobBuilder + .start(riskRangeImportStep()) // 1단계: API 데이터 적재 + .next(currentRiskUpdateStep()) // 2단계: 프로시저 실행으로 데이터 동기화 + .next(riskLastExecutionUpdateStep()) // 3단계: 모두 완료 시, BATCH_LAST_EXECUTION 마지막 성공일자 업데이트 + .build(); + } + @Override protected ItemReader createReader() { return riskDataRangeReader; @@ -91,5 +108,46 @@ public class RiskImportRangeJobConfig extends BaseJobConfig public Step riskRangeImportStep() { return step(); } + /** + * 2단계: Current Risk 업데이트 + */ + @Bean + public Tasklet currentRiskUpsertTasklet() { + return (contribution, chunkContext) -> { + log.info(">>>>> Risk Upsert 프로시저 호출 시작"); + // PostgreSQL 기준 프로시저 호출 (CALL) + jdbcTemplate.execute("CALL new_snp.upsert_current_risk()"); + + log.info(">>>>> Risk Upsert 프로시저 호출 완료"); + return RepeatStatus.FINISHED; + }; + } + @Bean(name = "CurrentRiskUpdateStep") + public Step currentRiskUpdateStep() { + return new StepBuilder("CurrentRiskUpdateStep", jobRepository) + .tasklet(currentRiskUpsertTasklet(), transactionManager) + .build(); + } + + /** + * 3단계: 모든 스텝 성공 시 배치 실행 로그(날짜) 업데이트 + */ + @Bean + public Tasklet riskLastExecutionUpdateTasklet() { + return (contribution, chunkContext) -> { + log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작"); + + jdbcTemplate.execute(getBatchUpdateSql()); + + log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료"); + return RepeatStatus.FINISHED; + }; + } + @Bean(name = "RiskLastExecutionUpdateStep") + public Step riskLastExecutionUpdateStep() { + return new StepBuilder("RiskLastExecutionUpdateStep", jobRepository) + .tasklet(riskLastExecutionUpdateTasklet(), transactionManager) + .build(); + } } diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataRangeReader.java b/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataRangeReader.java index 37b5550..cedf890 100644 --- a/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataRangeReader.java @@ -55,7 +55,6 @@ public class RiskDataRangeReader extends BaseApiReader { protected List fetchNextBatch() throws Exception { // 모든 배치 처리 완료 확인 if (allData == null) { - log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), fromDate, toDate); allData = callApiWithBatch(); if (allData == null || allData.isEmpty()) { diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/writer/RiskDataWriter.java b/src/main/java/com/snp/batch/jobs/risk/batch/writer/RiskDataWriter.java index eb642fa..a1360be 100644 --- a/src/main/java/com/snp/batch/jobs/risk/batch/writer/RiskDataWriter.java +++ b/src/main/java/com/snp/batch/jobs/risk/batch/writer/RiskDataWriter.java @@ -3,31 +3,21 @@ package com.snp.batch.jobs.risk.batch.writer; import com.snp.batch.common.batch.writer.BaseWriter; import com.snp.batch.jobs.risk.batch.entity.RiskEntity; import com.snp.batch.jobs.risk.batch.repository.RiskRepository; -import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import java.time.LocalDate; import java.util.List; @Slf4j @Component public class RiskDataWriter extends BaseWriter { - private final RiskRepository riskRepository; - private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 - protected String getApiKey() {return "RISK_IMPORT_API";} - public RiskDataWriter(RiskRepository riskRepository, BatchDateService batchDateService) { + public RiskDataWriter(RiskRepository riskRepository) { super("riskRepository"); this.riskRepository = riskRepository; - this.batchDateService = batchDateService; } - @Override protected void writeItems(List items) throws Exception { riskRepository.saveRiskAll(items); - LocalDate successDate = LocalDate.now(); // 현재 배치 실행 시점의 날짜 (Reader의 toDay와 동일한 값) - batchDateService.updateLastSuccessDate(getApiKey(), successDate); - log.info("batch_last_execution update 완료 : {}", getApiKey()); } }