From e1fa48768e4d616b6ca32f3a844feaf791a9a90c Mon Sep 17 00:00:00 2001 From: hyojin kim Date: Thu, 8 Jan 2026 15:12:06 +0900 Subject: [PATCH] =?UTF-8?q?:boom:=20API=20=EC=A1=B0=ED=9A=8C=20=EA=B8=B0?= =?UTF-8?q?=EA=B0=84=20=EC=84=B8=ED=8C=85=20=EB=B0=A9=EC=8B=9D=20=EB=B3=80?= =?UTF-8?q?=EA=B2=BD=20=EB=B0=8F=20=ED=86=B5=EC=9D=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../global/model/BatchLastExecution.java | 12 +- .../projection/DateRangeProjection.java | 9 ++ .../BatchLastExecutionRepository.java | 15 +- .../ComplianceImportRangeJobConfig.java | 35 +++-- .../reader/ComplianceDataRangeReader.java | 2 +- .../batch/config/EventImportJobConfig.java | 8 +- .../event/batch/reader/EventDataReader.java | 15 +- .../config/AnchorageCallsRangeJobConfig.java | 88 +++++++----- .../batch/config/BerthCallsRangJobConfig.java | 85 ++++++----- .../config/CurrentlyAtRangeJobConfig.java | 86 ++++++----- .../config/DestinationsRangeJobConfig.java | 88 +++++++----- .../config/ShipPortCallsRangeJobConfig.java | 84 +++++++---- .../config/StsOperationRangeJobConfig.java | 80 +++++++---- .../config/TerminalCallsRangeJobConfig.java | 86 ++++++----- .../batch/config/TransitsRangeJobConfig.java | 83 +++++++---- .../reader/AnchorageCallsRangeReader.java | 71 +++------- .../batch/reader/BerthCallsRangeReader.java | 71 +++------- .../batch/reader/CurrentlyAtRangeReader.java | 74 +++------- .../batch/reader/DestinationRangeReader.java | 85 +++-------- .../batch/reader/PortCallsRangeReader.java | 84 +++-------- .../batch/reader/StsOperationRangeReader.java | 81 +++-------- .../reader/TerminalCallsRangeReader.java | 81 +++-------- .../batch/reader/TransitsRangeReader.java | 81 +++-------- .../batch/config/PscInspectionJobConfig.java | 74 +++++----- .../batch/reader/PscApiReader.java | 69 ++++----- .../batch/reader/RiskDataRangeReader.java | 3 +- .../config/ShipDetailUpdateJobConfig.java | 65 +++++---- .../reader/ShipDetailUpdateDataReader.java | 15 +- .../snp/batch/service/BatchDateService.java | 133 +++++++++--------- 29 files changed, 827 insertions(+), 936 deletions(-) create mode 100644 src/main/java/com/snp/batch/global/projection/DateRangeProjection.java diff --git a/src/main/java/com/snp/batch/global/model/BatchLastExecution.java b/src/main/java/com/snp/batch/global/model/BatchLastExecution.java index 6158776..d75b49b 100644 --- a/src/main/java/com/snp/batch/global/model/BatchLastExecution.java +++ b/src/main/java/com/snp/batch/global/model/BatchLastExecution.java @@ -9,6 +9,8 @@ import org.springframework.data.jpa.domain.support.AuditingEntityListener; import jakarta.persistence.*; import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.OffsetDateTime; + @Entity @Getter @Setter @@ -21,7 +23,13 @@ public class BatchLastExecution { private String apiKey; @Column(name = "LAST_SUCCESS_DATE", nullable = false) - private LocalDate lastSuccessDate; + private LocalDateTime lastSuccessDate; + + @Column(name = "RANGE_FROM_DATE", nullable = true) + private LocalDateTime rangeFromDate; + + @Column(name = "RANGE_TO_DATE", nullable = true) + private LocalDateTime rangeToDate; @CreatedDate @Column(name = "CREATED_AT", updatable = false, nullable = false) @@ -31,7 +39,7 @@ public class BatchLastExecution { @Column(name = "UPDATED_AT", nullable = false) private LocalDateTime updatedAt; - public BatchLastExecution(String apiKey, LocalDate lastSuccessDate) { + public BatchLastExecution(String apiKey, LocalDateTime lastSuccessDate) { this.apiKey = apiKey; this.lastSuccessDate = lastSuccessDate; } diff --git a/src/main/java/com/snp/batch/global/projection/DateRangeProjection.java b/src/main/java/com/snp/batch/global/projection/DateRangeProjection.java new file mode 100644 index 0000000..a4f5412 --- /dev/null +++ b/src/main/java/com/snp/batch/global/projection/DateRangeProjection.java @@ -0,0 +1,9 @@ +package com.snp.batch.global.projection; + +import java.time.LocalDateTime; + +public interface DateRangeProjection { + LocalDateTime getLastSuccessDate(); + LocalDateTime getRangeFromDate(); + LocalDateTime getRangeToDate(); +} \ No newline at end of file diff --git a/src/main/java/com/snp/batch/global/repository/BatchLastExecutionRepository.java b/src/main/java/com/snp/batch/global/repository/BatchLastExecutionRepository.java index d62e7d3..d751f2d 100644 --- a/src/main/java/com/snp/batch/global/repository/BatchLastExecutionRepository.java +++ b/src/main/java/com/snp/batch/global/repository/BatchLastExecutionRepository.java @@ -1,6 +1,7 @@ package com.snp.batch.global.repository; import com.snp.batch.global.model.BatchLastExecution; +import com.snp.batch.global.projection.DateRangeProjection; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; @@ -8,6 +9,8 @@ import org.springframework.data.repository.query.Param; import org.springframework.stereotype.Repository; import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; import java.util.Optional; @Repository @@ -21,8 +24,16 @@ public interface BatchLastExecutionRepository extends JpaRepository findLastSuccessDate(@Param("apiKey") String apiKey); + // 2. findDateRangeByApiKey 함수 구현 + /** + * API 키를 기준으로 범위 설정 날짜를 조회합니다. + * @param apiKey 조회할 API 키 (예: "PSC_IMPORT_API") + * @return 마지막 성공 일자 (LocalDate)를 포함하는 Optional + */ + @Query("SELECT b.lastSuccessDate AS lastSuccessDate, b.rangeFromDate AS rangeFromDate, b.rangeToDate AS rangeToDate FROM BatchLastExecution b WHERE b.apiKey = :apiKey") + Optional findDateRangeByApiKey(@Param("apiKey") String apiKey); - // 2. updateLastSuccessDate 함수 구현 (직접 UPDATE 쿼리 사용) + // 3. updateLastSuccessDate 함수 구현 (직접 UPDATE 쿼리 사용) /** * 특정 API 키의 마지막 성공 일자를 업데이트합니다. * @@ -32,5 +43,5 @@ public interface BatchLastExecutionRepository extends JpaRepository { log.info(">>>>> Compliance History Value Change Manage 프로시저 호출 시작"); - // 1. 입력 포맷 및 타겟 포맷 정의 - DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SS'Z'"); + // 1. 입력 포맷(UTC 'Z' 포함) 및 프로시저용 타겟 포맷 정의 + DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSX"); DateTimeFormatter targetFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); - Map params = batchDateService.getRiskComplianceApiDateParams(getApiKey()); + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); - // 2. String -> LocalDateTime (또는 OffsetDateTime) -> String 변환 String rawFromDate = params.get("fromDate"); String rawToDate = params.get("toDate"); - String startDt = LocalDateTime.parse(rawFromDate, inputFormatter).format(targetFormatter); - String endDt = LocalDateTime.parse(rawToDate, inputFormatter).format(targetFormatter); + // 2. UTC 문자열 -> OffsetDateTime -> Asia/Seoul 변환 -> LocalDateTime 추출 + String startDt = convertToKstString(rawFromDate, inputFormatter, targetFormatter); + String endDt = convertToKstString(rawToDate, inputFormatter, targetFormatter); - log.info("Compliance History Value Change Manage 프로시저 변수 : 시작일: {}, 종료일: {}", startDt, endDt); + log.info("Compliance History Value Change Manage 프로시저 변수 (KST 변환): 시작일: {}, 종료일: {}", startDt, endDt); - // 3. 프로시저 호출 (SQL 인젝션 방지를 위해 Prepared Statement 방식 권장) - jdbcTemplate.execute(String.format("CALL new_snp.compliance_history_value_change_manage('%s', '%s')", startDt, endDt)); + // 3. 프로시저 호출 (안전한 파라미터 바인딩 권장) + jdbcTemplate.update("CALL new_snp.compliance_history_value_change_manage(CAST(? AS TIMESTAMP), CAST(? AS TIMESTAMP))", startDt, endDt); log.info(">>>>> Compliance History Value Change Manage 프로시저 호출 완료"); return RepeatStatus.FINISHED; }; } + /** + * UTC 문자열을 한국 시간(KST) 문자열로 변환하는 헬퍼 메소드 + */ + private String convertToKstString(String rawDate, DateTimeFormatter input, DateTimeFormatter target) { + if (rawDate == null) return null; + + // 1. 문자열을 OffsetDateTime으로 파싱 (Z를 인식하여 UTC 시간으로 인지함) + return OffsetDateTime.parse(rawDate, input) + // 2. 시간대를 서울(+09:00)로 변경 (값이 9시간 더해짐) + .atZoneSameInstant(ZoneId.of("Asia/Seoul")) + // 3. 프로시저 형식에 맞게 포맷팅 + .format(target); + } @Bean(name = "ComplianceHistoryValueChangeManageStep") public Step complianceHistoryValueChangeManageStep() { return new StepBuilder("ComplianceHistoryValueChangeManageStep", jobRepository) diff --git a/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataRangeReader.java b/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataRangeReader.java index 6a989cb..a98e398 100644 --- a/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataRangeReader.java @@ -103,7 +103,7 @@ public class ComplianceDataRangeReader extends BaseApiReader { } private List callApiWithBatch() { - Map params = batchDateService.getRiskComplianceApiDateParams(getApiKey()); + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); String url = getApiPath(); diff --git a/src/main/java/com/snp/batch/jobs/event/batch/config/EventImportJobConfig.java b/src/main/java/com/snp/batch/jobs/event/batch/config/EventImportJobConfig.java index 06febda..1253064 100644 --- a/src/main/java/com/snp/batch/jobs/event/batch/config/EventImportJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/event/batch/config/EventImportJobConfig.java @@ -62,12 +62,12 @@ public class EventImportJobConfig extends BaseMultiStepJobConfig createWriter() { return eventDataWriter; } - @Bean(name = "eventImportJob") + @Bean(name = "EventImportJob") public Job eventImportJob() { return job(); } - @Bean(name = "eventImportStep") + @Bean(name = "EventImportStep") public Step eventImportStep() { return step(); } diff --git a/src/main/java/com/snp/batch/jobs/event/batch/reader/EventDataReader.java b/src/main/java/com/snp/batch/jobs/event/batch/reader/EventDataReader.java index c1e0cfd..78cc602 100644 --- a/src/main/java/com/snp/batch/jobs/event/batch/reader/EventDataReader.java +++ b/src/main/java/com/snp/batch/jobs/event/batch/reader/EventDataReader.java @@ -2,15 +2,11 @@ package com.snp.batch.jobs.event.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.event.batch.dto.*; -import com.snp.batch.jobs.event.batch.dto.EventDetailDto; -import com.snp.batch.jobs.event.batch.entity.EventDetailEntity; -import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailComparisonData; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.reactive.function.client.WebClient; -import java.time.LocalDate; import java.time.LocalDateTime; import java.util.*; import java.util.stream.Collectors; @@ -193,10 +189,15 @@ public class EventDataReader extends BaseApiReader { } private EventResponse callEventApiWithBatch() { - Map params = batchDateService.getShipUpdateApiDateParams(getApiKey()); + Map params = batchDateService.getDateRangeWithoutTimeParams(getApiKey()); String url = getApiPath(); - log.info("[{}] API 호출: {}", getReaderName(), url); + + log.info("[{}] Events API Date Range: {} → {}", + getReaderName(), + String.format("%s-%s-%s",params.get("fromYear"),params.get("fromMonth"),params.get("fromDay")), + String.format("%s-%s-%s",params.get("toYear"),params.get("toMonth"),params.get("toDay")) + ); return webClient.get() .uri(url, uriBuilder -> uriBuilder @@ -215,8 +216,6 @@ public class EventDataReader extends BaseApiReader { private EventDetailResponse callEventDetailApiWithBatch(Long eventId) { String url = getEventDetailApiPath(); - log.info("[{}] API 호출: {}", getReaderName(), url); - return webClient.get() .uri(url, uriBuilder -> uriBuilder // 맵에서 파라미터 값을 동적으로 가져와 세팅 diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/AnchorageCallsRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/AnchorageCallsRangeJobConfig.java index 4872fd9..017aaaf 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/AnchorageCallsRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/AnchorageCallsRangeJobConfig.java @@ -1,64 +1,63 @@ package com.snp.batch.jobs.movement.batch.config; -import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; import com.snp.batch.jobs.movement.batch.dto.AnchorageCallsDto; import com.snp.batch.jobs.movement.batch.entity.AnchorageCallsEntity; import com.snp.batch.jobs.movement.batch.processor.AnchorageCallsProcessor; -import com.snp.batch.jobs.movement.batch.writer.AnchorageCallsWriter; import com.snp.batch.jobs.movement.batch.reader.AnchorageCallsRangeReader; +import com.snp.batch.jobs.movement.batch.writer.AnchorageCallsWriter; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.WebClient; -/** - * 선박 상세 정보 Import Job Config - * - * 특징: - * - ship_data 테이블에서 IMO 번호 조회 - * - IMO 번호를 100개씩 배치로 분할 - * - Maritime API GetShipsByIHSLRorIMONumbers 호출 - * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 - * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) - * - * 데이터 흐름: - * AnchorageCallsReader (ship_data → Maritime API) - * ↓ (AnchorageCallsDto) - * AnchorageCallsProcessor - * ↓ (AnchorageCallsEntity) - * AnchorageCallsWriter - * ↓ (t_anchoragecall 테이블) - */ - @Slf4j @Configuration -public class AnchorageCallsRangeJobConfig extends BaseJobConfig { +public class AnchorageCallsRangeJobConfig extends BaseMultiStepJobConfig { private final AnchorageCallsProcessor anchorageCallsProcessor; private final AnchorageCallsWriter anchorageCallsWriter; private final AnchorageCallsRangeReader anchorageCallsRangeReader; + private final WebClient maritimeServiceApiWebClient; + private final JdbcTemplate jdbcTemplate; + private final BatchDateService batchDateService; + protected String getApiKey() {return "ANCHORAGE_CALLS_IMPORT_API";} + protected String getBatchUpdateSql() { + return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());} + public AnchorageCallsRangeJobConfig( JobRepository jobRepository, PlatformTransactionManager transactionManager, AnchorageCallsProcessor anchorageCallsProcessor, AnchorageCallsWriter anchorageCallsWriter, - AnchorageCallsRangeReader anchorageCallsRangeReader - ) { // ObjectMapper 주입 추가 + AnchorageCallsRangeReader anchorageCallsRangeReader, + @Qualifier("maritimeServiceApiWebClient")WebClient maritimeServiceApiWebClient, + JdbcTemplate jdbcTemplate, + BatchDateService batchDateService + ) { super(jobRepository, transactionManager); this.anchorageCallsProcessor = anchorageCallsProcessor; this.anchorageCallsWriter = anchorageCallsWriter; this.anchorageCallsRangeReader = anchorageCallsRangeReader; + this.maritimeServiceApiWebClient = maritimeServiceApiWebClient; + this.jdbcTemplate = jdbcTemplate; + this.batchDateService = batchDateService; } @Override @@ -71,6 +70,14 @@ public class AnchorageCallsRangeJobConfig extends BaseJobConfig createReader() { // 타입 변경 return anchorageCallsRangeReader; @@ -78,12 +85,8 @@ public class AnchorageCallsRangeJobConfig extends BaseJobConfig { + log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작"); + + jdbcTemplate.execute(getBatchUpdateSql()); + + log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료"); + return RepeatStatus.FINISHED; + }; + } + @Bean(name = "AnchorageCallsLastExecutionUpdateStep") + public Step anchorageCallsLastExecutionUpdateStep() { + return new StepBuilder("AnchorageCallsLastExecutionUpdateStep", jobRepository) + .tasklet(anchorageCallsLastExecutionUpdateTasklet(), transactionManager) + .build(); + } } diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/BerthCallsRangJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/BerthCallsRangJobConfig.java index 2c660db..81a8cb7 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/BerthCallsRangJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/BerthCallsRangJobConfig.java @@ -1,72 +1,62 @@ package com.snp.batch.jobs.movement.batch.config; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; import com.snp.batch.jobs.movement.batch.dto.BerthCallsDto; import com.snp.batch.jobs.movement.batch.entiity.BerthCallsEntity; import com.snp.batch.jobs.movement.batch.processor.BerthCallsProcessor; import com.snp.batch.jobs.movement.batch.reader.BerthCallsRangeReader; import com.snp.batch.jobs.movement.batch.writer.BerthCallsWriter; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.WebClient; -/** - * 선박 상세 정보 Import Job Config - * - * 특징: - * - ship_data 테이블에서 IMO 번호 조회 - * - IMO 번호를 100개씩 배치로 분할 - * - Maritime API GetShipsByIHSLRorIMONumbers 호출 - * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 - * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) - * - * 데이터 흐름: - * ShipMovementReader (ship_data → Maritime API) - * ↓ (PortCallDto) - * ShipMovementProcessor - * ↓ (ShipMovementEntity) - * ShipDetailDataWriter - * ↓ (ship_movement 테이블) - */ - @Slf4j @Configuration -public class BerthCallsRangJobConfig extends BaseJobConfig { +public class BerthCallsRangJobConfig extends BaseMultiStepJobConfig { private final BerthCallsProcessor berthCallsProcessor; private final BerthCallsWriter berthCallsWriter; private final BerthCallsRangeReader berthCallsRangeReader; - private final JdbcTemplate jdbcTemplate; private final WebClient maritimeApiWebClient; - private final ObjectMapper objectMapper; // ObjectMapper 주입 추가 + private final JdbcTemplate jdbcTemplate; + private final BatchDateService batchDateService; + protected String getApiKey() {return "BERTH_CALLS_IMPORT_API";} + protected String getBatchUpdateSql() { + return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());} public BerthCallsRangJobConfig( JobRepository jobRepository, PlatformTransactionManager transactionManager, BerthCallsProcessor berthCallsProcessor, - BerthCallsWriter berthCallsWriter, BerthCallsRangeReader berthCallsRangeReader, JdbcTemplate jdbcTemplate, + BerthCallsWriter berthCallsWriter, + BerthCallsRangeReader berthCallsRangeReader, @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient, - ObjectMapper objectMapper) { // ObjectMapper 주입 추가 + JdbcTemplate jdbcTemplate, + BatchDateService batchDateService + ) { super(jobRepository, transactionManager); this.berthCallsProcessor = berthCallsProcessor; this.berthCallsWriter = berthCallsWriter; this.berthCallsRangeReader = berthCallsRangeReader; - this.jdbcTemplate = jdbcTemplate; this.maritimeApiWebClient = maritimeApiWebClient; - this.objectMapper = objectMapper; // ObjectMapper 초기화 + this.jdbcTemplate = jdbcTemplate; + this.batchDateService = batchDateService; } @Override @@ -79,17 +69,22 @@ public class BerthCallsRangJobConfig extends BaseJobConfig createReader() { // 타입 변경 return berthCallsRangeReader; } @Bean @StepScope - public BerthCallsRangeReader berthCallsRangeReader( - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate - ) { - return new BerthCallsRangeReader(maritimeApiWebClient, startDate, stopDate); + public BerthCallsRangeReader berthCallsRangeReader() { + return new BerthCallsRangeReader(maritimeApiWebClient, batchDateService); } @Override protected ItemProcessor createProcessor() { @@ -103,7 +98,7 @@ public class BerthCallsRangJobConfig extends BaseJobConfig { + log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작"); + + jdbcTemplate.execute(getBatchUpdateSql()); + + log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료"); + return RepeatStatus.FINISHED; + }; + } + @Bean(name = "BerthCallsLastExecutionUpdateStep") + public Step berthCallsLastExecutionUpdateStep() { + return new StepBuilder("BerthCallsLastExecutionUpdateStep", jobRepository) + .tasklet(berthCallsLastExecutionUpdateTasklet(), transactionManager) + .build(); + } } diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/CurrentlyAtRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/CurrentlyAtRangeJobConfig.java index 39c04f2..d897615 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/CurrentlyAtRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/CurrentlyAtRangeJobConfig.java @@ -1,68 +1,62 @@ package com.snp.batch.jobs.movement.batch.config; -import com.snp.batch.common.batch.config.BaseJobConfig; -import com.snp.batch.jobs.movement.batch.reader.CurrentlyAtRangeReader; +import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; import com.snp.batch.jobs.movement.batch.dto.CurrentlyAtDto; import com.snp.batch.jobs.movement.batch.entity.CurrentlyAtEntity; import com.snp.batch.jobs.movement.batch.processor.CurrentlyAtProcessor; +import com.snp.batch.jobs.movement.batch.reader.CurrentlyAtRangeReader; import com.snp.batch.jobs.movement.batch.writer.CurrentlyAtWriter; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.WebClient; -/** - * 선박 상세 정보 Import Job Config - * - * 특징: - * - ship_data 테이블에서 IMO 번호 조회 - * - IMO 번호를 100개씩 배치로 분할 - * - Maritime API GetShipsByIHSLRorIMONumbers 호출 - * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 - * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) - * - * 데이터 흐름: - * CurrentlyAtReader (ship_data → Maritime API) - * ↓ (CurrentlyAtDto) - * CurrentlyAtProcessor - * ↓ (CurrentlyAtEntity) - * CurrentlyAtWriter - * ↓ (currentlyat 테이블) - */ - @Slf4j @Configuration -public class CurrentlyAtRangeJobConfig extends BaseJobConfig { +public class CurrentlyAtRangeJobConfig extends BaseMultiStepJobConfig { private final CurrentlyAtProcessor currentlyAtProcessor; private final CurrentlyAtWriter currentlyAtWriter; private final CurrentlyAtRangeReader currentlyAtRangeReader; - private final JdbcTemplate jdbcTemplate; private final WebClient maritimeApiWebClient; + private final JdbcTemplate jdbcTemplate; + private final BatchDateService batchDateService; + protected String getApiKey() {return "CURRENTLY_AT_IMPORT_API";} + protected String getBatchUpdateSql() { + return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());} public CurrentlyAtRangeJobConfig( JobRepository jobRepository, PlatformTransactionManager transactionManager, CurrentlyAtProcessor currentlyAtProcessor, - CurrentlyAtWriter currentlyAtWriter, CurrentlyAtRangeReader currentlyAtRangeReader, JdbcTemplate jdbcTemplate, - @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 + CurrentlyAtWriter currentlyAtWriter, + CurrentlyAtRangeReader currentlyAtRangeReader, + @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient, + JdbcTemplate jdbcTemplate, + BatchDateService batchDateService + ) { // ObjectMapper 주입 추가 super(jobRepository, transactionManager); this.currentlyAtProcessor = currentlyAtProcessor; this.currentlyAtWriter = currentlyAtWriter; this.currentlyAtRangeReader = currentlyAtRangeReader; - this.jdbcTemplate = jdbcTemplate; this.maritimeApiWebClient = maritimeApiWebClient; + this.jdbcTemplate = jdbcTemplate; + this.batchDateService = batchDateService; } @Override @@ -72,7 +66,15 @@ public class CurrentlyAtRangeJobConfig extends BaseJobConfig createProcessor() { @@ -112,4 +110,24 @@ public class CurrentlyAtRangeJobConfig extends BaseJobConfig { + log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작"); + + jdbcTemplate.execute(getBatchUpdateSql()); + + log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료"); + return RepeatStatus.FINISHED; + }; + } + @Bean(name = "CurrentlyAtLastExecutionUpdateStep") + public Step currentlyAtLastExecutionUpdateStep() { + return new StepBuilder("CurrentlyAtLastExecutionUpdateStep", jobRepository) + .tasklet(currentlyAtLastExecutionUpdateTasklet(), transactionManager) + .build(); + } } diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/DestinationsRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/DestinationsRangeJobConfig.java index 56de7d7..3e21812 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/DestinationsRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/DestinationsRangeJobConfig.java @@ -1,65 +1,63 @@ package com.snp.batch.jobs.movement.batch.config; -import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; import com.snp.batch.jobs.movement.batch.dto.DestinationDto; import com.snp.batch.jobs.movement.batch.entity.DestinationEntity; import com.snp.batch.jobs.movement.batch.processor.DestinationProcessor; -import com.snp.batch.jobs.movement.batch.writer.DestinationWriter; import com.snp.batch.jobs.movement.batch.reader.DestinationRangeReader; +import com.snp.batch.jobs.movement.batch.writer.DestinationWriter; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.WebClient; -/** - * 선박 상세 정보 Import Job Config - * - * 특징: - * - ship_data 테이블에서 IMO 번호 조회 - * - IMO 번호를 100개씩 배치로 분할 - * - Maritime API GetShipsByIHSLRorIMONumbers 호출 - * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 - * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) - * - * 데이터 흐름: - * DestinationReader (ship_data → Maritime API) - * ↓ (DestinationDto) - * DestinationProcessor - * ↓ (DestinationEntity) - * DestinationProcessor - * ↓ (t_destination 테이블) - */ - @Slf4j @Configuration -public class DestinationsRangeJobConfig extends BaseJobConfig { +public class DestinationsRangeJobConfig extends BaseMultiStepJobConfig { - private final com.snp.batch.jobs.movement.batch.processor.DestinationProcessor DestinationProcessor; - private final com.snp.batch.jobs.movement.batch.writer.DestinationWriter DestinationWriter; + private final DestinationProcessor DestinationProcessor; + private final DestinationWriter DestinationWriter; private final DestinationRangeReader destinationRangeReader; private final WebClient maritimeApiWebClient; + private final JdbcTemplate jdbcTemplate; + private final BatchDateService batchDateService; + protected String getApiKey() {return "DESTINATIONS_IMPORT_API";} + protected String getBatchUpdateSql() { + return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());} + public DestinationsRangeJobConfig( JobRepository jobRepository, PlatformTransactionManager transactionManager, DestinationProcessor DestinationProcessor, - DestinationWriter DestinationWriter, DestinationRangeReader destinationRangeReader, - @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 + DestinationWriter DestinationWriter, + DestinationRangeReader destinationRangeReader, + @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient, + JdbcTemplate jdbcTemplate, + BatchDateService batchDateService + ) { // ObjectMapper 주입 추가 super(jobRepository, transactionManager); this.DestinationProcessor = DestinationProcessor; this.DestinationWriter = DestinationWriter; this.destinationRangeReader = destinationRangeReader; this.maritimeApiWebClient = maritimeApiWebClient; + this.jdbcTemplate = jdbcTemplate; + this.batchDateService = batchDateService; } @Override @@ -72,18 +70,22 @@ public class DestinationsRangeJobConfig extends BaseJobConfig createReader() { // 타입 변경 return destinationRangeReader; } @Bean @StepScope - public DestinationRangeReader destinationRangeReader( - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate - ) { - // jobParameters 없으면 null 넘어오고 Reader에서 default 처리 - return new DestinationRangeReader(maritimeApiWebClient, startDate, stopDate); + public DestinationRangeReader destinationRangeReader() { + return new DestinationRangeReader(maritimeApiWebClient, batchDateService); } @Override protected ItemProcessor createProcessor() { @@ -109,4 +111,24 @@ public class DestinationsRangeJobConfig extends BaseJobConfig { + log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작"); + + jdbcTemplate.execute(getBatchUpdateSql()); + + log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료"); + return RepeatStatus.FINISHED; + }; + } + @Bean(name = "DestinationsLastExecutionUpdateStep") + public Step destinationsLastExecutionUpdateStep() { + return new StepBuilder("DestinationsLastExecutionUpdateStep", jobRepository) + .tasklet(destinationsLastExecutionUpdateTasklet(), transactionManager) + .build(); + } } diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/ShipPortCallsRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/ShipPortCallsRangeJobConfig.java index 1895259..b8a4078 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/ShipPortCallsRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/ShipPortCallsRangeJobConfig.java @@ -1,65 +1,62 @@ package com.snp.batch.jobs.movement.batch.config; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; import com.snp.batch.jobs.movement.batch.dto.PortCallsDto; import com.snp.batch.jobs.movement.batch.entity.PortCallsEntity; import com.snp.batch.jobs.movement.batch.processor.PortCallsProcessor; import com.snp.batch.jobs.movement.batch.reader.PortCallsRangeReader; import com.snp.batch.jobs.movement.batch.writer.PortCallsWriter; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.WebClient; -/** - * 선박 상세 정보 Import Job Config - * - * 특징: - * - ship_data 테이블에서 IMO 번호 조회 - * - IMO 번호를 100개씩 배치로 분할 - * - Maritime API GetShipsByIHSLRorIMONumbers 호출 - * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 - * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) - * - * 데이터 흐름: - * PortCallsReader (ship_data → Maritime API) - * ↓ (PortCallDto) - * PortCallsProcessor - * ↓ (PortCallsEntity) - * ShipDetailDataWriter - * ↓ (ship_movement 테이블) - */ - @Slf4j @Configuration -public class ShipPortCallsRangeJobConfig extends BaseJobConfig { +public class ShipPortCallsRangeJobConfig extends BaseMultiStepJobConfig { private final PortCallsProcessor portCallsProcessor; private final PortCallsWriter portCallsWriter; private final PortCallsRangeReader portCallsRangeReader; + private final WebClient maritimeApiWebClient; + private final JdbcTemplate jdbcTemplate; + private final BatchDateService batchDateService; + protected String getApiKey() {return "PORT_CALLS_IMPORT_API";} + protected String getBatchUpdateSql() { + return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());} + public ShipPortCallsRangeJobConfig( JobRepository jobRepository, PlatformTransactionManager transactionManager, PortCallsProcessor portCallsProcessor, - PortCallsWriter portCallsWriter, JdbcTemplate jdbcTemplate, + PortCallsWriter portCallsWriter, + PortCallsRangeReader portCallsRangeReader, @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient, - ObjectMapper objectMapper, PortCallsRangeReader portCallsRangeReader) { // ObjectMapper 주입 추가 + JdbcTemplate jdbcTemplate, + BatchDateService batchDateService + ) { super(jobRepository, transactionManager); this.portCallsProcessor = portCallsProcessor; this.portCallsWriter = portCallsWriter; this.portCallsRangeReader = portCallsRangeReader; + this.maritimeApiWebClient = maritimeApiWebClient; + this.jdbcTemplate = jdbcTemplate; + this.batchDateService = batchDateService; } @Override @@ -74,13 +71,18 @@ public class ShipPortCallsRangeJobConfig extends BaseJobConfig createReader() { // 타입 변경 return portCallsRangeReader; @@ -110,4 +112,24 @@ public class ShipPortCallsRangeJobConfig extends BaseJobConfig { + log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작"); + + jdbcTemplate.execute(getBatchUpdateSql()); + + log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료"); + return RepeatStatus.FINISHED; + }; + } + @Bean(name = "PortCallsLastExecutionUpdateStep") + public Step portCallsLastExecutionUpdateStep() { + return new StepBuilder("PortCallsLastExecutionUpdateStep", jobRepository) + .tasklet(portCallsLastExecutionUpdateTasklet(), transactionManager) + .build(); + } } diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/StsOperationRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/StsOperationRangeJobConfig.java index fb8bb71..aab5ca1 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/StsOperationRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/StsOperationRangeJobConfig.java @@ -1,19 +1,25 @@ package com.snp.batch.jobs.movement.batch.config; import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; import com.snp.batch.jobs.movement.batch.dto.StsOperationDto; import com.snp.batch.jobs.movement.batch.entity.StsOperationEntity; import com.snp.batch.jobs.movement.batch.processor.StsOperationProcessor; import com.snp.batch.jobs.movement.batch.reader.StsOperationRangeReader; import com.snp.batch.jobs.movement.batch.writer.StsOperationWriter; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -22,47 +28,38 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.WebClient; -/** - * 선박 상세 정보 Import Job Config - * - * 특징: - * - ship_data 테이블에서 IMO 번호 조회 - * - IMO 번호를 100개씩 배치로 분할 - * - Maritime API GetShipsByIHSLRorIMONumbers 호출 - * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 - * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) - * - * 데이터 흐름: - * StsOperationReader (ship_data → Maritime API) - * ↓ (StsOperationDto) - * StsOperationProcessor - * ↓ (StsOperationEntity) - * StsOperationWriter - * ↓ (t_stsoperation 테이블) - */ - @Slf4j @Configuration -public class StsOperationRangeJobConfig extends BaseJobConfig { +public class StsOperationRangeJobConfig extends BaseMultiStepJobConfig { private final StsOperationProcessor stsOperationProcessor; private final StsOperationWriter stsOperationWriter; private final StsOperationRangeReader stsOperationRangeReader; - private final JdbcTemplate jdbcTemplate; private final WebClient maritimeApiWebClient; + private final JdbcTemplate jdbcTemplate; + private final BatchDateService batchDateService; + protected String getApiKey() {return "STS_OPERATION_IMPORT_API";} + protected String getBatchUpdateSql() { + return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());} + public StsOperationRangeJobConfig( JobRepository jobRepository, PlatformTransactionManager transactionManager, StsOperationProcessor stsOperationProcessor, - StsOperationWriter stsOperationWriter, StsOperationRangeReader stsOperationRangeReader, JdbcTemplate jdbcTemplate, - @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 + StsOperationWriter stsOperationWriter, + StsOperationRangeReader stsOperationRangeReader, + @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient, + JdbcTemplate jdbcTemplate, + BatchDateService batchDateService + ) { // ObjectMapper 주입 추가 super(jobRepository, transactionManager); this.stsOperationProcessor = stsOperationProcessor; this.stsOperationWriter = stsOperationWriter; this.stsOperationRangeReader = stsOperationRangeReader; - this.jdbcTemplate = jdbcTemplate; this.maritimeApiWebClient = maritimeApiWebClient; + this.jdbcTemplate = jdbcTemplate; + this.batchDateService = batchDateService; } @Override @@ -75,6 +72,14 @@ public class StsOperationRangeJobConfig extends BaseJobConfig createReader() { // 타입 변경 // Reader 생성자 수정: ObjectMapper를 전달합니다. @@ -82,12 +87,9 @@ public class StsOperationRangeJobConfig extends BaseJobConfig createProcessor() { @@ -113,4 +115,24 @@ public class StsOperationRangeJobConfig extends BaseJobConfig { + log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작"); + + jdbcTemplate.execute(getBatchUpdateSql()); + + log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료"); + return RepeatStatus.FINISHED; + }; + } + @Bean(name = "StsOperationLastExecutionUpdateStep") + public Step stsOperationLastExecutionUpdateStep() { + return new StepBuilder("StsOperationLastExecutionUpdateStep", jobRepository) + .tasklet(stsOperationLastExecutionUpdateTasklet(), transactionManager) + .build(); + } } diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/TerminalCallsRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/TerminalCallsRangeJobConfig.java index e47709d..bba42be 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/TerminalCallsRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/TerminalCallsRangeJobConfig.java @@ -1,68 +1,63 @@ package com.snp.batch.jobs.movement.batch.config; -import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; +import com.snp.batch.jobs.movement.batch.dto.TerminalCallsDto; import com.snp.batch.jobs.movement.batch.entity.TerminalCallsEntity; import com.snp.batch.jobs.movement.batch.processor.TerminalCallsProcessor; -import com.snp.batch.jobs.movement.batch.writer.TerminalCallsWriter; -import com.snp.batch.jobs.movement.batch.dto.TerminalCallsDto; import com.snp.batch.jobs.movement.batch.reader.TerminalCallsRangeReader; +import com.snp.batch.jobs.movement.batch.writer.TerminalCallsWriter; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.WebClient; -/** - * 선박 상세 정보 Import Job Config - * - * 특징: - * - ship_data 테이블에서 IMO 번호 조회 - * - IMO 번호를 100개씩 배치로 분할 - * - Maritime API GetShipsByIHSLRorIMONumbers 호출 - * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 - * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) - * - * 데이터 흐름: - * TerminalCallsReader (ship_data → Maritime API) - * ↓ (TerminalCallsDto) - * TerminalCallsProcessor - * ↓ (TerminalCallsEntity) - * TerminalCallsWriter - * ↓ (t_terminalcall 테이블) - */ - @Slf4j @Configuration -public class TerminalCallsRangeJobConfig extends BaseJobConfig { +public class TerminalCallsRangeJobConfig extends BaseMultiStepJobConfig { private final TerminalCallsProcessor terminalCallsProcessor; private final TerminalCallsWriter terminalCallsWriter; private final TerminalCallsRangeReader terminalCallsRangeReader; - private final JdbcTemplate jdbcTemplate; private final WebClient maritimeApiWebClient; + private final JdbcTemplate jdbcTemplate; + private final BatchDateService batchDateService; + protected String getApiKey() {return "TERMINAL_CALLS_IMPORT_API";} + protected String getBatchUpdateSql() { + return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());} + public TerminalCallsRangeJobConfig( JobRepository jobRepository, PlatformTransactionManager transactionManager, TerminalCallsProcessor terminalCallsProcessor, - TerminalCallsWriter terminalCallsWriter, TerminalCallsRangeReader terminalCallsRangeReader, JdbcTemplate jdbcTemplate, - @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 + TerminalCallsWriter terminalCallsWriter, + TerminalCallsRangeReader terminalCallsRangeReader, + @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient, + JdbcTemplate jdbcTemplate, + BatchDateService batchDateService + ) { // ObjectMapper 주입 추가 super(jobRepository, transactionManager); this.terminalCallsProcessor = terminalCallsProcessor; this.terminalCallsWriter = terminalCallsWriter; this.terminalCallsRangeReader = terminalCallsRangeReader; - this.jdbcTemplate = jdbcTemplate; this.maritimeApiWebClient = maritimeApiWebClient; + this.jdbcTemplate = jdbcTemplate; + this.batchDateService = batchDateService; } @Override @@ -75,18 +70,23 @@ public class TerminalCallsRangeJobConfig extends BaseJobConfig createReader() { // 타입 변경 return terminalCallsRangeReader; } @Bean @StepScope - public TerminalCallsRangeReader terminalCallsRangeReader( - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate - ) { + public TerminalCallsRangeReader terminalCallsRangeReader() { // jobParameters 없으면 null 넘어오고 Reader에서 default 처리 - return new TerminalCallsRangeReader(maritimeApiWebClient, startDate, stopDate); + return new TerminalCallsRangeReader(maritimeApiWebClient, batchDateService); } @Override protected ItemProcessor createProcessor() { @@ -112,4 +112,24 @@ public class TerminalCallsRangeJobConfig extends BaseJobConfig { + log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작"); + + jdbcTemplate.execute(getBatchUpdateSql()); + + log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료"); + return RepeatStatus.FINISHED; + }; + } + @Bean(name = "TerminalCallsLastExecutionUpdateStep") + public Step terminalCallsLastExecutionUpdateStep() { + return new StepBuilder("TerminalCallsLastExecutionUpdateStep", jobRepository) + .tasklet(terminalCallsLastExecutionUpdateTasklet(), transactionManager) + .build(); + } } diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/TransitsRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/TransitsRangeJobConfig.java index a5aacb4..8fd22bb 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/TransitsRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/TransitsRangeJobConfig.java @@ -1,65 +1,62 @@ package com.snp.batch.jobs.movement.batch.config; -import com.snp.batch.common.batch.config.BaseJobConfig; -import com.snp.batch.jobs.movement.batch.reader.TransitsRangeReader; +import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; import com.snp.batch.jobs.movement.batch.dto.TransitsDto; import com.snp.batch.jobs.movement.batch.entity.TransitsEntity; import com.snp.batch.jobs.movement.batch.processor.TransitsProcessor; +import com.snp.batch.jobs.movement.batch.reader.TransitsRangeReader; import com.snp.batch.jobs.movement.batch.writer.TransitsWriter; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.WebClient; -/** - * 선박 상세 정보 Import Job Config - * - * 특징: - * - ship_data 테이블에서 IMO 번호 조회 - * - IMO 번호를 100개씩 배치로 분할 - * - Maritime API GetShipsByIHSLRorIMONumbers 호출 - * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 - * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) - * - * 데이터 흐름: - * TransitsReader (ship_data → Maritime API) - * ↓ (TransitsDto) - * TransitsProcessor - * ↓ (TransitsEntity) - * TransitsWriter - * ↓ (t_transit 테이블) - */ - @Slf4j @Configuration -public class TransitsRangeJobConfig extends BaseJobConfig { +public class TransitsRangeJobConfig extends BaseMultiStepJobConfig { private final TransitsProcessor transitsProcessor; private final TransitsWriter transitsWriter; private final TransitsRangeReader transitsRangeReader; private final WebClient maritimeApiWebClient; + private final JdbcTemplate jdbcTemplate; + private final BatchDateService batchDateService; + protected String getApiKey() {return "TRANSITS_IMPORT_API";} + protected String getBatchUpdateSql() { + return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());} public TransitsRangeJobConfig( JobRepository jobRepository, PlatformTransactionManager transactionManager, TransitsProcessor TransitsProcessor, - TransitsWriter transitsWriter, TransitsRangeReader transitsRangeReader, - @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 + TransitsWriter transitsWriter, + TransitsRangeReader transitsRangeReader, + @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient, + JdbcTemplate jdbcTemplate, + BatchDateService batchDateService + ) { // ObjectMapper 주입 추가 super(jobRepository, transactionManager); this.transitsProcessor = TransitsProcessor; this.transitsWriter = transitsWriter; this.transitsRangeReader = transitsRangeReader; this.maritimeApiWebClient = maritimeApiWebClient; + this.jdbcTemplate = jdbcTemplate; + this.batchDateService = batchDateService; } @Override @@ -72,18 +69,22 @@ public class TransitsRangeJobConfig extends BaseJobConfig createReader() { // 타입 변경 return transitsRangeReader; } @Bean @StepScope - public TransitsRangeReader transitsRangeReader( - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate - ) { - // jobParameters 없으면 null 넘어오고 Reader에서 default 처리 - return new TransitsRangeReader(maritimeApiWebClient, startDate, stopDate); + public TransitsRangeReader transitsRangeReader() { + return new TransitsRangeReader(maritimeApiWebClient, batchDateService); } @Override protected ItemProcessor createProcessor() { @@ -109,4 +110,24 @@ public class TransitsRangeJobConfig extends BaseJobConfig { + log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작"); + + jdbcTemplate.execute(getBatchUpdateSql()); + + log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료"); + return RepeatStatus.FINISHED; + }; + } + @Bean(name = "TransitsLastExecutionUpdateStep") + public Step transitsLastExecutionUpdateStep() { + return new StepBuilder("TransitsLastExecutionUpdateStep", jobRepository) + .tasklet(transitsLastExecutionUpdateTasklet(), transactionManager) + .build(); + } } diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/AnchorageCallsRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/AnchorageCallsRangeReader.java index df73716..167e98c 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/AnchorageCallsRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/AnchorageCallsRangeReader.java @@ -2,62 +2,29 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.AnchorageCallsDto; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.beans.factory.annotation.Value; import org.springframework.web.reactive.function.client.WebClient; -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.Map; -/** - * 선박 상세 정보 Reader (v2.0 - Chunk 기반) - * - * 기능: - * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) - * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 - * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 - * 4. Spring Batch가 100건씩 Process → Write 수행 - * - * Chunk 처리 흐름: - * - beforeFetch() → IMO 전체 조회 (1회) - * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) - * - read() → 1건씩 반환 (100번) - * - Processor/Writer → 100건 처리 - * - 반복... (1,718번의 Chunk) - * - * 기존 방식과의 차이: - * - 기존: 17만건 전체 메모리 로드 → Process → Write - * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) - */ @Slf4j @StepScope public class AnchorageCallsRangeReader extends BaseApiReader { - + private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; - private String startDate; - private String stopDate; - public AnchorageCallsRangeReader(WebClient webClient, - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate) { + protected String getApiKey() { + return "ANCHORAGE_CALLS_IMPORT_API"; + } + + public AnchorageCallsRangeReader(WebClient webClient, BatchDateService batchDateService) { super(webClient); - - // 날짜가 없으면 전날 하루 기준 - if (startDate == null || startDate.isBlank() || - stopDate == null || stopDate.isBlank()) { - - LocalDate yesterday = LocalDate.now().minusDays(1); - this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - } else { - this.startDate = startDate; - this.stopDate = stopDate; - } - + this.batchDateService = batchDateService; enableChunkMode(); } @@ -82,17 +49,11 @@ public class AnchorageCallsRangeReader extends BaseApiReader return "https://webservices.maritime.spglobal.com"; } - @Override - protected void beforeFetch() { - log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate); - } - @Override protected List fetchNextBatch() throws Exception { // 1) 처음 호출이면 API 한 번 호출해서 전체 데이터를 가져온다 if (allData == null) { - log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); - allData = callApiWithBatch(startDate, stopDate); + allData = callApiWithBatch(); if (allData == null || allData.isEmpty()) { log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); @@ -130,12 +91,16 @@ public class AnchorageCallsRangeReader extends BaseApiReader * Query Parameter를 사용한 API 호출 * @return API 응답 */ - private List callApiWithBatch(String startDate, String stopDate) { - String url = getApiPath() + "?startDate=" + startDate +"&stopDate=" + stopDate; - log.info("[{}] API 호출: {}", getReaderName(), url); + private List callApiWithBatch() { + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); + log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); return webClient.get() - .uri(url) + .uri(getApiPath(), uriBuilder -> uriBuilder + // 맵에서 파라미터 값을 동적으로 가져와 세팅 + .queryParam("startDate", params.get("fromDate")) + .queryParam("stopDate", params.get("toDate")) + .build()) .retrieve() .bodyToFlux(AnchorageCallsDto.class) .collectList() diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/BerthCallsRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/BerthCallsRangeReader.java index d02c47a..539f79a 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/BerthCallsRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/BerthCallsRangeReader.java @@ -2,61 +2,28 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.BerthCallsDto; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.beans.factory.annotation.Value; import org.springframework.web.reactive.function.client.WebClient; -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.Map; -/** - * 선박 상세 정보 Reader (v2.0 - Chunk 기반) - * - * 기능: - * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) - * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 - * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 - * 4. Spring Batch가 100건씩 Process → Write 수행 - * - * Chunk 처리 흐름: - * - beforeFetch() → IMO 전체 조회 (1회) - * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) - * - read() → 1건씩 반환 (100번) - * - Processor/Writer → 100건 처리 - * - 반복... (1,718번의 Chunk) - * - * 기존 방식과의 차이: - * - 기존: 17만건 전체 메모리 로드 → Process → Write - * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) - */ @Slf4j @StepScope public class BerthCallsRangeReader extends BaseApiReader { - + private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; - private String startDate; - private String stopDate; - - public BerthCallsRangeReader(WebClient webClient, - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate) { + protected String getApiKey() { + return "BERTH_CALLS_IMPORT_API"; + } + public BerthCallsRangeReader(WebClient webClient, BatchDateService batchDateService) { super(webClient); - - // 날짜가 없으면 전날 하루 기준 - if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) { - LocalDate yesterday = LocalDate.now().minusDays(1); - this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - } else { - this.startDate = startDate; - this.stopDate = stopDate; - } - + this.batchDateService = batchDateService; enableChunkMode(); } @@ -81,17 +48,11 @@ public class BerthCallsRangeReader extends BaseApiReader { return "https://webservices.maritime.spglobal.com"; } - @Override - protected void beforeFetch() { - log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate); - } - @Override protected List fetchNextBatch() throws Exception { // 1) 처음 호출이면 API 한 번 호출해서 전체 데이터를 가져온다 if (allData == null) { - log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); - allData = callApiWithBatch(startDate, stopDate); + allData = callApiWithBatch(); if (allData == null || allData.isEmpty()) { log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); @@ -129,14 +90,16 @@ public class BerthCallsRangeReader extends BaseApiReader { * Query Parameter를 사용한 API 호출 * @return API 응답 */ - private List callApiWithBatch(String startDate, String stopDate) { - String url = getApiPath() + "?startDate=" + startDate +"&stopDate=" + stopDate; -// "&lrno=" + lrno; - - log.debug("[{}] API 호출: {}", getReaderName(), url); + private List callApiWithBatch() { + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); + log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); return webClient.get() - .uri(url) + .uri(getApiPath(), uriBuilder -> uriBuilder + // 맵에서 파라미터 값을 동적으로 가져와 세팅 + .queryParam("startDate", params.get("fromDate")) + .queryParam("stopDate", params.get("toDate")) + .build()) .retrieve() .bodyToFlux(BerthCallsDto.class) .collectList() diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/CurrentlyAtRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/CurrentlyAtRangeReader.java index baaf074..4f72777 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/CurrentlyAtRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/CurrentlyAtRangeReader.java @@ -2,62 +2,28 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.CurrentlyAtDto; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.beans.factory.annotation.Value; import org.springframework.web.reactive.function.client.WebClient; -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.Map; -/** - * 선박 상세 정보 Reader (v2.0 - Chunk 기반) - *

- * 기능: - * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) - * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 - * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 - * 4. Spring Batch가 100건씩 Process → Write 수행 - *

- * Chunk 처리 흐름: - * - beforeFetch() → IMO 전체 조회 (1회) - * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) - * - read() → 1건씩 반환 (100번) - * - Processor/Writer → 100건 처리 - * - 반복... (1,718번의 Chunk) - *

- * 기존 방식과의 차이: - * - 기존: 17만건 전체 메모리 로드 → Process → Write - * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) - */ @Slf4j @StepScope public class CurrentlyAtRangeReader extends BaseApiReader { - + private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; - private String startDate; - private String stopDate; - - public CurrentlyAtRangeReader(WebClient webClient, - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate) { - + protected String getApiKey() { + return "CURRENTLY_AT_IMPORT_API"; + } + public CurrentlyAtRangeReader(WebClient webClient, BatchDateService batchDateService) { super(webClient); - - // 날짜가 없으면 전날 하루 기준 - if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) { - LocalDate yesterday = LocalDate.now().minusDays(1); - this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - } else { - this.startDate = startDate; - this.stopDate = stopDate; - } - - enableChunkMode(); // ✨ Chunk 모드 활성화 + this.batchDateService = batchDateService; + enableChunkMode(); } @Override @@ -81,19 +47,12 @@ public class CurrentlyAtRangeReader extends BaseApiReader { return "https://webservices.maritime.spglobal.com"; } - @Override - protected void beforeFetch() { - // 전처리 과정 - log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate); - } - @Override protected List fetchNextBatch() throws Exception { // 모든 배치 처리 완료 확인 if (allData == null ) { - log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); - allData = callApiWithBatch(startDate, stopDate); + allData = callApiWithBatch(); if (allData == null || allData.isEmpty()) { log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); @@ -130,13 +89,16 @@ public class CurrentlyAtRangeReader extends BaseApiReader { * Query Parameter를 사용한 API 호출 * @return API 응답 */ - private List callApiWithBatch(String startDate, String stopDate) { - String url = getApiPath() + "?dateCreatedUpdatedStart=" + startDate +"&dateCreatedUpdatedStop="+stopDate; - - log.debug("[{}] API 호출: {}", getReaderName(), url); + private List callApiWithBatch() { + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); + log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); return webClient.get() - .uri(url) + .uri(getApiPath(), uriBuilder -> uriBuilder + // 맵에서 파라미터 값을 동적으로 가져와 세팅 + .queryParam("startDate", params.get("fromDate")) + .queryParam("stopDate", params.get("toDate")) + .build()) .retrieve() .bodyToFlux(CurrentlyAtDto.class) .collectList() diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/DestinationRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/DestinationRangeReader.java index 0a01368..e70f17f 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/DestinationRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/DestinationRangeReader.java @@ -2,71 +2,33 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.DestinationDto; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.beans.factory.annotation.Value; import org.springframework.web.reactive.function.client.WebClient; -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.Map; -/** - * 선박 상세 정보 Reader (v2.0 - Chunk 기반) - *

- * 기능: - * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) - * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 - * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 - * 4. Spring Batch가 100건씩 Process → Write 수행 - *

- * Chunk 처리 흐름: - * - beforeFetch() → IMO 전체 조회 (1회) - * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) - * - read() → 1건씩 반환 (100번) - * - Processor/Writer → 100건 처리 - * - 반복... (1,718번의 Chunk) - *

- * 기존 방식과의 차이: - * - 기존: 17만건 전체 메모리 로드 → Process → Write - * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) - */ @Slf4j @StepScope public class DestinationRangeReader extends BaseApiReader { - + private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 private List allData; private int currentBatchIndex = 0; private final int batchSize = 1000; - private String startDate; - private String stopDate; - - public DestinationRangeReader(WebClient webClient, - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate) { + protected String getApiKey() { + return "DESTINATIONS_IMPORT_API"; + } + public DestinationRangeReader(WebClient webClient, BatchDateService batchDateService) { super(webClient); - // 날짜가 + 한달 기간 도착예정지 정보 update - if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) { - LocalDate today = LocalDate.now(); - this.startDate = today - .atStartOfDay() - .format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - - this.stopDate = today - .plusDays(15) - .atStartOfDay() - .format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - } else { - this.startDate = startDate; - this.stopDate = stopDate; - } - - enableChunkMode(); // ✨ Chunk 모드 활성화 + this.batchDateService = batchDateService; + enableChunkMode(); } @Override protected String getReaderName() { - return "DestinationsRange"; + return "DestinationsRangeReader"; } @Override @@ -85,22 +47,11 @@ public class DestinationRangeReader extends BaseApiReader { return "https://webservices.maritime.spglobal.com"; } - /** - * 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회 - */ - @Override - protected void beforeFetch() { - log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate); - } - @Override protected List fetchNextBatch() throws Exception { - // 모든 배치 처리 완료 확인 - // 모든 배치 처리 완료 확인 if (allData == null) { - log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); - allData = callApiWithBatch(startDate, stopDate); + allData = callApiWithBatch(); if (allData == null || allData.isEmpty()) { log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); @@ -136,14 +87,16 @@ public class DestinationRangeReader extends BaseApiReader { * Query Parameter를 사용한 API 호출 * @return API 응답 */ - private List callApiWithBatch(String startDate, String stopDate) { - String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate; -// +"&lrno=" + lrno; - - log.debug("[{}] API 호출: {}", getReaderName(), url); + private List callApiWithBatch() { + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); + log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); return webClient.get() - .uri(url) + .uri(getApiPath(), uriBuilder -> uriBuilder + // 맵에서 파라미터 값을 동적으로 가져와 세팅 + .queryParam("startDate", params.get("fromDate")) + .queryParam("stopDate", params.get("toDate")) + .build()) .retrieve() .bodyToFlux(DestinationDto.class) .collectList() diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/PortCallsRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/PortCallsRangeReader.java index a4f7fc1..51ec562 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/PortCallsRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/PortCallsRangeReader.java @@ -2,63 +2,29 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.PortCallsDto; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.beans.factory.annotation.Value; import org.springframework.web.reactive.function.client.WebClient; -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.Map; -/** - * 선박 상세 정보 Reader (v2.0 - Chunk 기반) - * - * 기능: - * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) - * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 - * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 - * 4. Spring Batch가 100건씩 Process → Write 수행 - * - * Chunk 처리 흐름: - * - beforeFetch() → IMO 전체 조회 (1회) - * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) - * - read() → 1건씩 반환 (100번) - * - Processor/Writer → 100건 처리 - * - 반복... (1,718번의 Chunk) - * - * 기존 방식과의 차이: - * - 기존: 17만건 전체 메모리 로드 → Process → Write - * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) - */ @Slf4j @StepScope public class PortCallsRangeReader extends BaseApiReader { - + private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; - private String startDate; - private String stopDate; - public PortCallsRangeReader(WebClient webClient, - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate) { - super(webClient); - - // 날짜가 없으면 전날 하루 기준 - if (startDate == null || startDate.isBlank() || - stopDate == null || stopDate.isBlank()) { - - LocalDate yesterday = LocalDate.now().minusDays(1); - this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - } else { - this.startDate = startDate; - this.stopDate = stopDate; - } - - enableChunkMode(); - } + protected String getApiKey() { + return "PORT_CALLS_IMPORT_API"; + } + public PortCallsRangeReader(WebClient webClient, BatchDateService batchDateService) { + super(webClient); + this.batchDateService = batchDateService; + enableChunkMode(); + } @Override protected String getReaderName() { @@ -81,25 +47,12 @@ public class PortCallsRangeReader extends BaseApiReader { return "https://webservices.maritime.spglobal.com"; } - @Override - protected void beforeFetch() { - log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate); - } - - /** - * ✨ Chunk 기반 핵심 메서드: 다음 배치를 조회하여 반환 - * - * Spring Batch가 batchsize만큼 read() 호출 완료 후 이 메서드 재호출 - * - * @return 다음 배치 (더 이상 없으면 null) - */ @Override protected List fetchNextBatch() throws Exception { // 모든 배치 처리 완료 확인 if (allData == null) { - log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); - allData = callApiWithBatch(startDate, stopDate); + allData = callApiWithBatch(); if (allData == null || allData.isEmpty()) { log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); @@ -137,12 +90,15 @@ public class PortCallsRangeReader extends BaseApiReader { * Query Parameter를 사용한 API 호출 * @return API 응답 */ - private List callApiWithBatch(String startDate, String stopDate) { - String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate; - log.info("[{}] API 호출: {}", getReaderName(), url); - + private List callApiWithBatch() { + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); + log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); return webClient.get() - .uri(url) + .uri(getApiPath(), uriBuilder -> uriBuilder + // 맵에서 파라미터 값을 동적으로 가져와 세팅 + .queryParam("startDate", params.get("fromDate")) + .queryParam("stopDate", params.get("toDate")) + .build()) .retrieve() .bodyToFlux(PortCallsDto.class) .collectList() diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/StsOperationRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/StsOperationRangeReader.java index 6fc067e..9977edc 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/StsOperationRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/StsOperationRangeReader.java @@ -2,6 +2,7 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.StsOperationDto; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.beans.factory.annotation.Value; @@ -10,57 +11,28 @@ import org.springframework.web.reactive.function.client.WebClient; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.Map; -/** - * 선박 상세 정보 Reader (v2.0 - Chunk 기반) - * - * 기능: - * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) - * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 - * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 - * 4. Spring Batch가 100건씩 Process → Write 수행 - * - * Chunk 처리 흐름: - * - beforeFetch() → IMO 전체 조회 (1회) - * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) - * - read() → 1건씩 반환 (100번) - * - Processor/Writer → 100건 처리 - * - 반복... (1,718번의 Chunk) - * - * 기존 방식과의 차이: - * - 기존: 17만건 전체 메모리 로드 → Process → Write - * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) - */ @Slf4j @StepScope public class StsOperationRangeReader extends BaseApiReader { - + private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; - private String startDate; - private String stopDate; + protected String getApiKey() { + return "STS_OPERATION_IMPORT_API"; + } - public StsOperationRangeReader(WebClient webClient, - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate) { + public StsOperationRangeReader(WebClient webClient, BatchDateService batchDateService) { super(webClient); - // 날짜가 없으면 전날 하루 기준 - if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) { - LocalDate yesterday = LocalDate.now().minusDays(1); - this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - } else { - this.startDate = startDate; - this.stopDate = stopDate; - } - - enableChunkMode(); // ✨ Chunk 모드 활성화 + this.batchDateService = batchDateService; + enableChunkMode(); } @Override protected String getReaderName() { - return "StsOperationReader"; + return "StsOperationRangeReader"; } @Override @@ -79,28 +51,12 @@ public class StsOperationRangeReader extends BaseApiReader { return "https://webservices.maritime.spglobal.com"; } - /** - * 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회 - */ - @Override - protected void beforeFetch() { - log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate); - } - - /** - * ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환 - * - * Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출 - * - * @return 다음 배치 100건 (더 이상 없으면 null) - */ @Override protected List fetchNextBatch() throws Exception { // 모든 배치 처리 완료 확인 if (allData == null ) { - log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); - allData = callApiWithBatch(startDate, stopDate); + allData = callApiWithBatch(); if (allData == null || allData.isEmpty()) { log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); @@ -134,17 +90,18 @@ public class StsOperationRangeReader extends BaseApiReader { /** * Query Parameter를 사용한 API 호출 - * - * @param startDate,stopDate * @return API 응답 */ - private List callApiWithBatch(String startDate, String stopDate) { - String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate; - - log.debug("[{}] API 호출: {}", getReaderName(), url); + private List callApiWithBatch() { + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); + log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); return webClient.get() - .uri(url) + .uri(getApiPath(), uriBuilder -> uriBuilder + // 맵에서 파라미터 값을 동적으로 가져와 세팅 + .queryParam("startDate", params.get("fromDate")) + .queryParam("stopDate", params.get("toDate")) + .build()) .retrieve() .bodyToFlux(StsOperationDto.class) .collectList() diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/TerminalCallsRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/TerminalCallsRangeReader.java index 4ead4c9..ad3ad6a 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/TerminalCallsRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/TerminalCallsRangeReader.java @@ -2,65 +2,33 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.TerminalCallsDto; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.beans.factory.annotation.Value; import org.springframework.web.reactive.function.client.WebClient; -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.Map; -/** - * 선박 상세 정보 Reader (v2.0 - Chunk 기반) - *

- * 기능: - * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) - * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 - * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 - * 4. Spring Batch가 100건씩 Process → Write 수행 - *

- * Chunk 처리 흐름: - * - beforeFetch() → IMO 전체 조회 (1회) - * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) - * - read() → 1건씩 반환 (100번) - * - Processor/Writer → 100건 처리 - * - 반복... (1,718번의 Chunk) - *

- * 기존 방식과의 차이: - * - 기존: 17만건 전체 메모리 로드 → Process → Write - * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) - */ @Slf4j @StepScope public class TerminalCallsRangeReader extends BaseApiReader { - + private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 private List allData; private int currentBatchIndex = 0; private final int batchSize = 1000; - private String startDate; - private String stopDate; - - public TerminalCallsRangeReader(WebClient webClient, - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate) { + protected String getApiKey() { + return "TERMINAL_CALLS_IMPORT_API"; + } + public TerminalCallsRangeReader(WebClient webClient, BatchDateService batchDateService) { super(webClient); - // 날짜가 없으면 전날 하루 기준 - if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) { - LocalDate yesterday = LocalDate.now().minusDays(1); - this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - } else { - this.startDate = startDate; - this.stopDate = stopDate; - } - - enableChunkMode(); // ✨ Chunk 모드 활성화 + this.batchDateService = batchDateService; + enableChunkMode(); } @Override protected String getReaderName() { - return "TerminalCalls"; + return "TerminalCallsRangeReader"; } @Override @@ -79,26 +47,12 @@ public class TerminalCallsRangeReader extends BaseApiReader { return "https://webservices.maritime.spglobal.com"; } - - @Override - protected void beforeFetch() { - log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate); - } - - /** - * ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환 - *

- * Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출 - * - * @return 다음 배치 100건 (더 이상 없으면 null) - */ @Override protected List fetchNextBatch() throws Exception { // 모든 배치 처리 완료 확인 if (allData == null ) { - log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); - allData = callApiWithBatch(startDate, stopDate); + allData = callApiWithBatch(); if (allData == null || allData.isEmpty()) { log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); @@ -132,15 +86,18 @@ public class TerminalCallsRangeReader extends BaseApiReader { /** * Query Parameter를 사용한 API 호출 - * @param startDate, stopDate * @return API 응답 */ - private List callApiWithBatch(String startDate, String stopDate) { - String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate; - log.debug("[{}] API 호출: {}", getReaderName(), url); + private List callApiWithBatch() { + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); + log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); return webClient.get() - .uri(url) + .uri(getApiPath(), uriBuilder -> uriBuilder + // 맵에서 파라미터 값을 동적으로 가져와 세팅 + .queryParam("startDate", params.get("fromDate")) + .queryParam("stopDate", params.get("toDate")) + .build()) .retrieve() .bodyToFlux(TerminalCallsDto.class) .collectList() diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/TransitsRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/TransitsRangeReader.java index b4a0a86..a3295a3 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/TransitsRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/TransitsRangeReader.java @@ -2,65 +2,33 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.TransitsDto; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.beans.factory.annotation.Value; import org.springframework.web.reactive.function.client.WebClient; -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.Map; -/** - * 선박 상세 정보 Reader (v2.0 - Chunk 기반) - * - * 기능: - * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) - * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 - * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 - * 4. Spring Batch가 100건씩 Process → Write 수행 - * - * Chunk 처리 흐름: - * - beforeFetch() → IMO 전체 조회 (1회) - * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) - * - read() → 1건씩 반환 (100번) - * - Processor/Writer → 100건 처리 - * - 반복... (1,718번의 Chunk) - * - * 기존 방식과의 차이: - * - 기존: 17만건 전체 메모리 로드 → Process → Write - * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) - */ @Slf4j @StepScope public class TransitsRangeReader extends BaseApiReader { - + private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 private List allData; private int currentBatchIndex = 0; private final int batchSize = 1000; - private String startDate; - private String stopDate; - - public TransitsRangeReader(WebClient webClient, - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate) { + protected String getApiKey() { + return "TRANSITS_IMPORT_API"; + } + public TransitsRangeReader(WebClient webClient, BatchDateService batchDateService) { super(webClient); - // 날짜가 없으면 전날 하루 기준 - if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) { - LocalDate yesterday = LocalDate.now().minusDays(1); - this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - } else { - this.startDate = startDate; - this.stopDate = stopDate; - } - - enableChunkMode(); // ✨ Chunk 모드 활성화 + this.batchDateService = batchDateService; + enableChunkMode(); } @Override protected String getReaderName() { - return "Transits"; + return "TransitsRangeReader"; } @Override @@ -79,25 +47,12 @@ public class TransitsRangeReader extends BaseApiReader { return "https://webservices.maritime.spglobal.com"; } - @Override - protected void beforeFetch() { - log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate); - } - - /** - * ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환 - * - * Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출 - * - * @return 다음 배치 100건 (더 이상 없으면 null) - */ @Override protected List fetchNextBatch() throws Exception { // 모든 배치 처리 완료 확인 if (allData == null ) { - log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); - allData = callApiWithBatch(startDate, stopDate); + allData = callApiWithBatch(); if (allData == null || allData.isEmpty()) { log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); @@ -134,14 +89,16 @@ public class TransitsRangeReader extends BaseApiReader { * @param startDate,stopDate * @return API 응답 */ - private List callApiWithBatch(String startDate, String stopDate) { - String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate; -// +"&lrno=" + lrno; - - log.debug("[{}] API 호출: {}", getReaderName(), url); + private List callApiWithBatch() { + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); + log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); return webClient.get() - .uri(url) + .uri(getApiPath(), uriBuilder -> uriBuilder + // 맵에서 파라미터 값을 동적으로 가져와 세팅 + .queryParam("startDate", params.get("fromDate")) + .queryParam("stopDate", params.get("toDate")) + .build()) .retrieve() .bodyToFlux(TransitsDto.class) .collectList() diff --git a/src/main/java/com/snp/batch/jobs/pscInspection/batch/config/PscInspectionJobConfig.java b/src/main/java/com/snp/batch/jobs/pscInspection/batch/config/PscInspectionJobConfig.java index b753867..1143d00 100644 --- a/src/main/java/com/snp/batch/jobs/pscInspection/batch/config/PscInspectionJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/pscInspection/batch/config/PscInspectionJobConfig.java @@ -1,54 +1,42 @@ package com.snp.batch.jobs.pscInspection.batch.config; -import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; import com.snp.batch.jobs.pscInspection.batch.dto.PscInspectionDto; import com.snp.batch.jobs.pscInspection.batch.entity.PscInspectionEntity; import com.snp.batch.jobs.pscInspection.batch.processor.PscInspectionProcessor; import com.snp.batch.jobs.pscInspection.batch.reader.PscApiReader; import com.snp.batch.jobs.pscInspection.batch.writer.PscInspectionWriter; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; -import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.beans.factory.annotation.Value; - -/** - * 선박 상세 정보 Import Job Config - * - * 특징: - * - ship_data 테이블에서 IMO 번호 조회 - * - IMO 번호를 100개씩 배치로 분할 - * - Maritime API GetShipsByIHSLRorIMONumbers 호출 - * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 - * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) - * - * 데이터 흐름: - * ShipMovementReader (ship_data → Maritime API) - * ↓ (PortCallDto) - * ShipMovementProcessor - * ↓ (ShipMovementEntity) - * ShipDetailDataWriter - * ↓ (ship_movement 테이블) - */ @Slf4j @Configuration -public class PscInspectionJobConfig extends BaseJobConfig { +public class PscInspectionJobConfig extends BaseMultiStepJobConfig { private final PscInspectionProcessor pscInspectionProcessor; private final PscInspectionWriter pscInspectionWriter; private final JdbcTemplate jdbcTemplate; private final WebClient maritimeApiWebClient; + private final BatchDateService batchDateService; + protected String getApiKey() {return "PSC_IMPORT_API";} + protected String getBatchUpdateSql() { + return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());} public PscInspectionJobConfig( JobRepository jobRepository, @@ -56,11 +44,13 @@ public class PscInspectionJobConfig extends BaseJobConfig createReader() { - return pscApiReader(null, null, null); + return new PscApiReader(maritimeApiWebClient, jdbcTemplate, batchDateService); } @Override @@ -115,4 +103,24 @@ public class PscInspectionJobConfig extends BaseJobConfig { + log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작"); + + jdbcTemplate.execute(getBatchUpdateSql()); + + log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료"); + return RepeatStatus.FINISHED; + }; + } + @Bean(name = "PSCLastExecutionUpdateStep") + public Step pscLastExecutionUpdateStep() { + return new StepBuilder("PSCLastExecutionUpdateStep", jobRepository) + .tasklet(pscLastExecutionUpdateTasklet(), transactionManager) + .build(); + } } diff --git a/src/main/java/com/snp/batch/jobs/pscInspection/batch/reader/PscApiReader.java b/src/main/java/com/snp/batch/jobs/pscInspection/batch/reader/PscApiReader.java index d7d3e2d..3ae6a76 100644 --- a/src/main/java/com/snp/batch/jobs/pscInspection/batch/reader/PscApiReader.java +++ b/src/main/java/com/snp/batch/jobs/pscInspection/batch/reader/PscApiReader.java @@ -4,10 +4,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.pscInspection.batch.dto.PscApiResponseDto; import com.snp.batch.jobs.pscInspection.batch.dto.PscInspectionDto; +import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.reactive.function.client.WebClient; import java.time.LocalDate; @@ -16,35 +18,26 @@ import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.util.Collections; import java.util.List; +import java.util.Map; @Slf4j @StepScope public class PscApiReader extends BaseApiReader { - private final String startDate; - private final String stopDate; private List allData; private int currentBatchIndex = 0; private final int batchSize = 1000; - public PscApiReader(@Qualifier("maritimeApiWebClient") WebClient webClient, - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate) { + private final JdbcTemplate jdbcTemplate; + private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + protected String getApiKey() { + return "PSC_IMPORT_API"; + } + public PscApiReader(WebClient webClient, JdbcTemplate jdbcTemplate, BatchDateService batchDateService) { super(webClient); - - // 날짜가 없으면 전날 하루 기준 - if (startDate == null || startDate.isBlank() || - stopDate == null || stopDate.isBlank()) { - - LocalDate yesterday = LocalDate.now().minusDays(1); - this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - } else { - this.startDate = startDate; - this.stopDate = stopDate; - } - - enableChunkMode(); + this.jdbcTemplate = jdbcTemplate; + this.batchDateService = batchDateService; + enableChunkMode(); // ✨ Chunk 모드 활성화 } @Override @@ -63,18 +56,12 @@ public class PscApiReader extends BaseApiReader { return "/MaritimeWCF/PSCService.svc/RESTFul/GetPSCDataByLastUpdateDateRange"; } - @Override - protected void beforeFetch() { - log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate); - } - @Override protected List fetchNextBatch() { // 1) 처음 호출이면 API 한 번 호출해서 전체 데이터를 가져온다 if (allData == null) { - log.info("[PSC] 최초 API 조회 실행: {} ~ {}", startDate, stopDate); - allData = callApiWithBatch(startDate, stopDate); + allData = callApiWithBatch(); if (allData == null || allData.isEmpty()) { log.warn("[PSC] 조회된 데이터 없음 → 종료"); @@ -107,24 +94,28 @@ public class PscApiReader extends BaseApiReader { return batch; } - private List callApiWithBatch(String startDate, String stopDate) { + private List callApiWithBatch() { - LocalDateTime fromDay = parseToDateTime(startDate, true); - LocalDateTime toDay = parseToDateTime(stopDate, false); + Map params = batchDateService.getDateRangeWithoutTimeParams(getApiKey()); - String url = getApiPath() - + "?shipsCategory=0" - + "&fromYear=" + fromDay.getYear() - + "&fromMonth=" + fromDay.getMonthValue() - + "&fromDay=" + fromDay.getDayOfMonth() - + "&toYear=" + toDay.getYear() - + "&toMonth=" + toDay.getMonthValue() - + "&toDay=" + toDay.getDayOfMonth(); + String url = getApiPath(); - log.info("[PSC] API 호출 URL = {}", url); + log.info("[{}] PSC API Date Range: {} → {}", + getReaderName(), + String.format("%s-%s-%s",params.get("fromYear"),params.get("fromMonth"),params.get("fromDay")), + String.format("%s-%s-%s",params.get("toYear"),params.get("toMonth"),params.get("toDay")) + ); String json = webClient.get() - .uri(url) + .uri(url, uriBuilder -> uriBuilder + .queryParam("shipsCategory", params.get("shipsCategory")) + .queryParam("fromYear", params.get("fromYear")) + .queryParam("fromMonth", params.get("fromMonth")) + .queryParam("fromDay", params.get("fromDay")) + .queryParam("toYear", params.get("toYear")) + .queryParam("toMonth", params.get("toMonth")) + .queryParam("toDay", params.get("toDay")) + .build()) .retrieve() .bodyToMono(String.class) .block(); diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataRangeReader.java b/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataRangeReader.java index cedf890..a5250af 100644 --- a/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataRangeReader.java @@ -101,7 +101,7 @@ public class RiskDataRangeReader extends BaseApiReader { } private List callApiWithBatch() { - Map params = batchDateService.getRiskComplianceApiDateParams(getApiKey()); + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); String url = getApiPath(); @@ -116,4 +116,5 @@ public class RiskDataRangeReader extends BaseApiReader { .block(); } + } diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java index 60d2c98..d28006d 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java @@ -1,7 +1,7 @@ package com.snp.batch.jobs.shipdetail.batch.config; import com.fasterxml.jackson.databind.ObjectMapper; -import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.common.batch.config.BaseMultiStepJobConfig; import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailComparisonData; import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailUpdate; import com.snp.batch.jobs.shipdetail.batch.processor.ShipDetailDataProcessor; @@ -11,10 +11,14 @@ import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -22,33 +26,9 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.WebClient; -/** - * 선박 상세 정보 Import Job Config - * - * 특징: - * - ship_data 테이블에서 IMO 번호 조회 - * - IMO 번호를 100개씩 배치로 분할 - * - Maritime API GetShipsByIHSLRorIMONumbers 호출 - * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 - * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) - * - * 데이터 흐름: - * ShipDetailDataReader (ship_data → Maritime API) - * ↓ (ShipDetailDto) - * ShipDetailDataProcessor - * ↓ (ShipDetailEntity) - * ShipDetailDataWriter - * ↓ (ship_detail 테이블) - */ - -/** - * 선박 상세 정보 Import Job Config - * I: ShipDetailComparisonData (Reader 출력) - * O: ShipDetailUpdate (Processor 출력) - */ @Slf4j @Configuration -public class ShipDetailUpdateJobConfig extends BaseJobConfig { +public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig { private final ShipDetailDataProcessor shipDetailDataProcessor; private final ShipDetailDataWriter shipDetailDataWriter; @@ -56,6 +36,10 @@ public class ShipDetailUpdateJobConfig extends BaseJobConfig createReader() { // 타입 변경 // Reader 생성자 수정: ObjectMapper를 전달합니다. @@ -115,4 +107,25 @@ public class ShipDetailUpdateJobConfig extends BaseJobConfig { + log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작"); + + jdbcTemplate.execute(getBatchUpdateSql()); + + log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료"); + return RepeatStatus.FINISHED; + }; + } + @Bean(name = "ShipDetailLastExecutionUpdateStep") + public Step shipDetailLastExecutionUpdateStep() { + return new StepBuilder("ShipDetailLastExecutionUpdateStep", jobRepository) + .tasklet(shipDetailLastExecutionUpdateTasklet(), transactionManager) + .build(); + } } diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailUpdateDataReader.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailUpdateDataReader.java index 5a6f499..53f5458 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailUpdateDataReader.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailUpdateDataReader.java @@ -11,6 +11,7 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.reactive.function.client.WebClient; import java.time.LocalDate; +import java.time.LocalDateTime; import java.util.*; import java.util.stream.Collectors; @@ -252,10 +253,15 @@ public class ShipDetailUpdateDataReader extends BaseApiReader params = batchDateService.getShipUpdateApiDateParams(getApiKey()); + Map params = batchDateService.getDateRangeWithoutTimeParams(getApiKey()); String url = getShipUpdateApiPath(); - log.info("[{}] API 호출: {}", getReaderName(), url); + + log.info("[{}] Ship Detail Update Date Range: {} → {}", + getReaderName(), + String.format("%s-%s-%s",params.get("fromYear"),params.get("fromMonth"),params.get("fromDay")), + String.format("%s-%s-%s",params.get("toYear"),params.get("toMonth"),params.get("toDay")) + ); return webClient.get() .uri(url, uriBuilder -> uriBuilder @@ -291,11 +297,6 @@ public class ShipDetailUpdateDataReader extends BaseApiReader getShipUpdateApiDateParams(String apiKey) { - // 1. 마지막 성공 일자 (FROM 날짜)를 DB에서 조회 - // 조회된 값이 없으면 (최초 실행), API 호출 시점의 하루 전 날짜를 사용합니다. - LocalDate lastDate = repository.findLastSuccessDate(apiKey) - .orElse(LocalDate.now().minusDays(1)); + public Map getDateRangeWithoutTimeParams(String apiKey) { + return repository.findDateRangeByApiKey(apiKey) + .map(projection -> { + Map params = new HashMap<>(); - // 2. 현재 실행 시점의 일자 (TO 날짜) 계산 - LocalDate currentDate = LocalDate.now(); + LocalDateTime fromTarget = (projection.getRangeFromDate() != null) + ? projection.getRangeFromDate() + : projection.getLastSuccessDate(); - // 3. 파라미터 Map 구성 - Map params = new HashMap<>(); + LocalDateTime toTarget = (projection.getRangeToDate() != null) + ? projection.getRangeToDate() + : LocalDateTime.now(); - // FROM Parameters (DB 조회 값) - params.put("fromYear", String.valueOf(lastDate.getYear())); - params.put("fromMonth", String.valueOf(lastDate.getMonthValue())); - params.put("fromDay", String.valueOf(lastDate.getDayOfMonth())); + // 2. 파라미터 맵에 날짜 정보 매핑 + putDateParams(params, "from", fromTarget); + putDateParams(params, "to", toTarget); - // TO Parameters (현재 시점 값) - params.put("toYear", String.valueOf(currentDate.getYear())); - params.put("toMonth", String.valueOf(currentDate.getMonthValue())); - params.put("toDay", String.valueOf(currentDate.getDayOfMonth())); + // 3. 고정 값 설정 + params.put("shipsCategory", "0"); - // 고정 값 - params.put("shipsCategory", "0"); - - return params; - } - - public Map getRiskComplianceApiDateParams(String apiKey) { - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SS'Z'"); - // 1. 마지막 성공 일자 (FROM 날짜)를 DB에서 조회 - // 조회된 값이 없으면 (최초 실행), API 호출 시점의 하루 전 날짜를 사용합니다. - LocalDate lastDate = repository.findLastSuccessDate(apiKey) - .orElse(LocalDate.now().minusDays(1)); - - // 2. 현재 실행 시점의 일자 (TO 날짜) 계산 - ZonedDateTime nowUtc = ZonedDateTime.now(ZoneOffset.UTC); - - // 3. 파라미터 Map 구성 - Map params = new HashMap<>(); - - // FROM Parameters (DB 조회 값) - String fromDateStr = lastDate.atStartOfDay().format(formatter); - params.put("fromDate", fromDateStr); - // TO Parameters (현재 시점 값) - String toDateStr = nowUtc.format(formatter); - params.put("toDate", toDateStr); - - return params; + return params; + }) + .orElseGet(() -> { + log.warn("해당 apiKey에 대한 데이터를 찾을 수 없습니다: {}", apiKey); + return new HashMap<>(); + }); } /** - * 배치 성공 시, 다음 실행을 위해 to 날짜를 DB에 저장 및 업데이트합니다. - * @param successDate API 호출 성공 시 사용된 to 날짜 + * LocalDateTime에서 연, 월, 일을 추출하여 Map에 담는 헬퍼 메소드 */ - @Transactional // UPDATE 쿼리를 사용하므로 트랜잭션 필요 - public void updateLastSuccessDate(String apiKey, LocalDate successDate) { // ✨ apiKey 추가 - - // 1. UPDATE 시도 - int updatedRows = repository.updateLastSuccessDate(apiKey, successDate); - - // 2. 업데이트된 레코드가 없다면 (최초 실행), INSERT 수행 - if (updatedRows == 0) { - BatchLastExecution entity = new BatchLastExecution(apiKey, successDate); - repository.save(entity); + private void putDateParams(Map params, String prefix, LocalDateTime dateTime) { + if (dateTime != null) { + params.put(prefix + "Year", String.valueOf(dateTime.getYear())); + params.put(prefix + "Month", String.valueOf(dateTime.getMonthValue())); + params.put(prefix + "Day", String.valueOf(dateTime.getDayOfMonth())); } } + + public Map getDateRangeWithTimezoneParams(String apiKey) { + return repository.findDateRangeByApiKey(apiKey) + .map(projection -> { + Map params = new HashMap<>(); + // 'Z'를 문자열 리터럴이 아닌 실제 타임존 기호(X)로 처리 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSX"); + + // 한국 시간을 UTC로 변환하는 헬퍼 메소드 (아래 정의) + params.put("fromDate", formatToUtc(projection.getRangeFromDate() != null ? + projection.getRangeFromDate() : projection.getLastSuccessDate(), formatter)); + + LocalDateTime toDateTime = projection.getRangeToDate() != null ? + projection.getRangeToDate() : LocalDateTime.now(); + + params.put("toDate", formatToUtc(toDateTime, formatter)); + + return params; + }) + .orElseGet(() -> { + log.warn("해당 apiKey에 대한 데이터를 찾을 수 없습니다: {}", apiKey); + return new HashMap<>(); + }); + } + + // 한국 시간(LocalDateTime)을 UTC 문자열로 변환하는 로직 + private String formatToUtc(LocalDateTime localDateTime, DateTimeFormatter formatter) { + if (localDateTime == null) return null; + // 1. 한국 시간대(KST)임을 명시 + // 2. UTC로 시간대를 변경 (9시간 빠짐) + // 3. 포맷팅 (끝에 Z가 자동으로 붙음) + return localDateTime.atZone(ZoneId.of("Asia/Seoul")) + .withZoneSameInstant(ZoneOffset.UTC) + .format(formatter); + } + }