From 64a3a55e7801b293b1ec67063ca8197d08c4a704 Mon Sep 17 00:00:00 2001 From: hyojin kim Date: Thu, 15 Jan 2026 15:58:20 +0900 Subject: [PATCH] =?UTF-8?q?:sparkles:=20batch=5Fapi=5Flog=20=EA=B4=80?= =?UTF-8?q?=EB=A6=AC=20=ED=94=84=EB=A1=9C=EC=84=B8=EC=8A=A4=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/batch/reader/BaseApiReader.java | 177 +++++++++++++++++- .../snp/batch/global/config/AsyncConfig.java | 24 +++ .../config/MaritimeApiWebClientConfig.java | 10 - .../snp/batch/global/model/BatchApiLog.java | 46 +++++ .../repository/BatchApiLogRepository.java | 10 + .../ComplianceImportRangeJobConfig.java | 19 +- .../reader/ComplianceDataRangeReader.java | 63 +++---- .../batch/config/EventImportJobConfig.java | 35 +++- .../event/batch/reader/EventDataReader.java | 56 ++---- .../config/AnchorageCallsRangeJobConfig.java | 19 +- .../batch/config/BerthCallsRangJobConfig.java | 19 +- .../config/CurrentlyAtRangeJobConfig.java | 19 +- .../config/DestinationsRangeJobConfig.java | 19 +- .../config/ShipPortCallsRangeJobConfig.java | 19 +- .../config/StsOperationRangeJobConfig.java | 21 ++- .../config/TerminalCallsRangeJobConfig.java | 20 +- .../batch/config/TransitsRangeJobConfig.java | 19 +- .../reader/AnchorageCallsRangeReader.java | 72 +++---- .../batch/reader/BerthCallsRangeReader.java | 68 +++---- .../batch/reader/CurrentlyAtRangeReader.java | 72 +++---- .../batch/reader/DestinationRangeReader.java | 72 +++---- .../batch/reader/PortCallsRangeReader.java | 66 +++---- .../batch/reader/StsOperationRangeReader.java | 65 +++---- .../reader/TerminalCallsRangeReader.java | 70 +++---- .../batch/reader/TransitsRangeReader.java | 64 +++---- .../batch/config/PscInspectionJobConfig.java | 27 ++- .../batch/reader/PscApiReader.java | 98 +++------- .../config/RiskImportRangeJobConfig.java | 19 +- .../batch/reader/RiskDataRangeReader.java | 46 ++--- .../config/ShipDetailUpdateJobConfig.java | 30 ++- .../reader/ShipDetailUpdateDataReader.java | 106 ++++------- .../snp/batch/service/BatchApiLogService.java | 33 ++++ .../snp/batch/service/BatchDateService.java | 24 +++ 33 files changed, 854 insertions(+), 673 deletions(-) create mode 100644 src/main/java/com/snp/batch/global/config/AsyncConfig.java create mode 100644 src/main/java/com/snp/batch/global/model/BatchApiLog.java create mode 100644 src/main/java/com/snp/batch/global/repository/BatchApiLogRepository.java create mode 100644 src/main/java/com/snp/batch/service/BatchApiLogService.java diff --git a/src/main/java/com/snp/batch/common/batch/reader/BaseApiReader.java b/src/main/java/com/snp/batch/common/batch/reader/BaseApiReader.java index 6edbc66..a06d804 100644 --- a/src/main/java/com/snp/batch/common/batch/reader/BaseApiReader.java +++ b/src/main/java/com/snp/batch/common/batch/reader/BaseApiReader.java @@ -1,5 +1,7 @@ package com.snp.batch.common.batch.reader; +import com.snp.batch.global.model.BatchApiLog; +import com.snp.batch.service.BatchApiLogService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.annotation.BeforeStep; @@ -7,8 +9,13 @@ import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemReader; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; import org.springframework.web.util.UriBuilder; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.web.util.UriComponentsBuilder; import java.net.URI; import java.time.LocalDateTime; @@ -72,12 +79,180 @@ public abstract class BaseApiReader implements ItemReader { private int totalApiCalls = 0; private int completedApiCalls = 0; + // Batch Execution Id + private Long jobExecutionId; // 현재 Job 실행 ID + private Long stepExecutionId; // 현재 Step 실행 ID + /** + * 스프링 배치가 Step을 시작할 때 실행 ID를 주입해줍니다. + */ + public void setExecutionIds(Long jobExecutionId, Long stepExecutionId) { + this.jobExecutionId = jobExecutionId; + this.stepExecutionId = stepExecutionId; + } /** * 기본 생성자 (WebClient 없이 사용 - Mock 데이터용) */ protected BaseApiReader() { this.webClient = null; } + /** + * API 호출 및 로그 적재 통합 메서드 + * Response Json 구조 : [...] + */ + protected List executeListApiCall( + String baseUrl, + String path, + Map params, + ParameterizedTypeReference> typeReference, + BatchApiLogService logService) { + + // 1. 전체 URI 생성 (로그용) + MultiValueMap multiValueParams = new LinkedMultiValueMap<>(); + if (params != null) { + params.forEach((key, value) -> + multiValueParams.put(key, Collections.singletonList(value)) + ); + } + + String fullUri = UriComponentsBuilder.fromHttpUrl(baseUrl) + .path(path) + .queryParams(multiValueParams) + .build() + .toUriString(); + + long startTime = System.currentTimeMillis(); + int statusCode = 200; + String errorMessage = null; + Long responseSize = 0L; + + try { + log.info("[{}] API 요청 시작: {}", getReaderName(), fullUri); + + List result = webClient.get() + .uri(uriBuilder -> { + uriBuilder.path(path); + if (params != null) params.forEach(uriBuilder::queryParam); + return uriBuilder.build(); + }) + .retrieve() + .bodyToMono(typeReference) + .block(); + + responseSize = (result != null) ? (long) result.size() : 0L; + return result; + + } catch (WebClientResponseException e) { + // API 서버에서 응답은 왔으나 에러인 경우 (4xx, 5xx) + statusCode = e.getStatusCode().value(); + errorMessage = String.format("API Error: %s", e.getResponseBodyAsString()); + throw e; + } catch (Exception e) { + // 네트워크 오류, 타임아웃 등 기타 예외 + statusCode = 500; + errorMessage = String.format("System Error: %s", e.getMessage()); + throw e; + } finally { + // 성공/실패 여부와 관계없이 무조건 로그 저장 + long duration = System.currentTimeMillis() - startTime; + + logService.saveLog(BatchApiLog.builder() + .apiRequestLocation(getReaderName()) + .requestUri(fullUri) + .httpMethod("GET") + .statusCode(statusCode) + .responseTimeMs(duration) + .responseCount(responseSize) + .errorMessage(errorMessage) + .createdAt(LocalDateTime.now()) + .jobExecutionId(this.jobExecutionId) // 추가 + .stepExecutionId(this.stepExecutionId) // 추가 + .build()); + } + } + + /** + * API 호출 및 로그 적재 통합 메서드 + * Response Json 구조 : { "data": [...] } + */ + protected R executeSingleApiCall( + String baseUrl, + String path, + Map params, + ParameterizedTypeReference typeReference, + BatchApiLogService logService, + Function sizeExtractor) { // 사이즈 추출 함수 추가 + + // 1. 전체 URI 생성 (로그용) + MultiValueMap multiValueParams = new LinkedMultiValueMap<>(); + if (params != null) { + params.forEach((key, value) -> + multiValueParams.put(key, Collections.singletonList(value)) + ); + } + + String fullUri = UriComponentsBuilder.fromHttpUrl(baseUrl) + .path(path) + .queryParams(multiValueParams) + .build() + .toUriString(); + + long startTime = System.currentTimeMillis(); + int statusCode = 200; + String errorMessage = null; + R result = null; + + try { + log.info("[{}] Single API 요청 시작: {}", getReaderName(), fullUri); + + result = webClient.get() + .uri(uriBuilder -> { + uriBuilder.path(path); + if (params != null) params.forEach(uriBuilder::queryParam); + return uriBuilder.build(); + }) + .retrieve() + .bodyToMono(typeReference) + .block(); + + return result; + + } catch (WebClientResponseException e) { + statusCode = e.getStatusCode().value(); + errorMessage = String.format("API Error: %s", e.getResponseBodyAsString()); + throw e; + } catch (Exception e) { + statusCode = 500; + errorMessage = String.format("System Error: %s", e.getMessage()); + throw e; + } finally { + long duration = System.currentTimeMillis() - startTime; + + // 2. 주입받은 함수를 통해 데이터 건수(size) 계산 + long size = 0L; + if (result != null && sizeExtractor != null) { + try { + size = sizeExtractor.apply(result); + } catch (Exception e) { + log.warn("[{}] 사이즈 추출 중 오류 발생: {}", getReaderName(), e.getMessage()); + } + } + + // 3. 로그 저장 (api_request_location, response_size 반영) + logService.saveLog(BatchApiLog.builder() + .apiRequestLocation(getReaderName()) + .jobExecutionId(this.jobExecutionId) + .stepExecutionId(this.stepExecutionId) + .requestUri(fullUri) + .httpMethod("GET") + .statusCode(statusCode) + .responseTimeMs(duration) + .responseCount(size) + .errorMessage(errorMessage) + .createdAt(LocalDateTime.now()) + .build()); + } + } + /** * WebClient를 주입받는 생성자 (실제 API 연동용) @@ -87,7 +262,7 @@ public abstract class BaseApiReader implements ItemReader { protected BaseApiReader(WebClient webClient) { this.webClient = webClient; } - + /** /** * Step 실행 전 초기화 및 API 정보 저장 * Spring Batch가 자동으로 StepExecution을 주입하고 이 메서드를 호출함 diff --git a/src/main/java/com/snp/batch/global/config/AsyncConfig.java b/src/main/java/com/snp/batch/global/config/AsyncConfig.java new file mode 100644 index 0000000..62ba92b --- /dev/null +++ b/src/main/java/com/snp/batch/global/config/AsyncConfig.java @@ -0,0 +1,24 @@ +package com.snp.batch.global.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; + +@Configuration +@EnableAsync // 비동기 기능 활성화 +public class AsyncConfig { + + @Bean(name = "apiLogExecutor") + public Executor apiLogExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(2); // 기본 스레드 수 + executor.setMaxPoolSize(5); // 최대 스레드 수 + executor.setQueueCapacity(500); // 대기 큐 크기 + executor.setThreadNamePrefix("ApiLogThread-"); + executor.initialize(); + return executor; + } +} \ No newline at end of file diff --git a/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java b/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java index d51cccb..d05a2f9 100644 --- a/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java +++ b/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java @@ -61,11 +61,6 @@ public class MaritimeApiWebClientConfig { return WebClient.builder() .baseUrl(maritimeApiUrl) - .filter((request, next) -> { - // [핵심] 여기서 최종 완성된 URI를 로그로 출력합니다. - log.info(">>>> API Request: [{} {}]", request.method(), request.url()); - return next.exchange(request); - }) .defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword)) .codecs(configurer -> configurer .defaultCodecs() @@ -98,11 +93,6 @@ public class MaritimeApiWebClientConfig { return WebClient.builder() .baseUrl(maritimeServiceApiUrl) - .filter((request, next) -> { - // [핵심] 여기서 최종 완성된 URI를 로그로 출력합니다. - log.info(">>>> API Request: [{} {}]", request.method(), request.url()); - return next.exchange(request); - }) .defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword)) .codecs(configurer -> configurer .defaultCodecs() diff --git a/src/main/java/com/snp/batch/global/model/BatchApiLog.java b/src/main/java/com/snp/batch/global/model/BatchApiLog.java new file mode 100644 index 0000000..41ae691 --- /dev/null +++ b/src/main/java/com/snp/batch/global/model/BatchApiLog.java @@ -0,0 +1,46 @@ +package com.snp.batch.global.model; + +import jakarta.persistence.*; +import lombok.*; +import org.hibernate.annotations.CreationTimestamp; + +import java.time.LocalDateTime; + +@Entity +@Table(name = "batch_api_log", schema = "snp_data") +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor +@Builder +public class BatchApiLog { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) // PostgreSQL BIGSERIAL과 매핑 + private Long logId; + + @Column(name = "api_request_location") // job_name에서 변경 + private String apiRequestLocation; + + @Column(columnDefinition = "TEXT", nullable = false) + private String requestUri; + + @Column(nullable = false, length = 10) + private String httpMethod; + + private Integer statusCode; + + private Long responseTimeMs; + + @Column(name = "response_count") + private Long responseCount; + + @Column(columnDefinition = "TEXT") + private String errorMessage; + + @CreationTimestamp // 엔티티가 생성될 때 자동으로 시간 설정 + @Column(updatable = false) + private LocalDateTime createdAt; + + private Long jobExecutionId; // 추가 + private Long stepExecutionId; // 추가 +} \ No newline at end of file diff --git a/src/main/java/com/snp/batch/global/repository/BatchApiLogRepository.java b/src/main/java/com/snp/batch/global/repository/BatchApiLogRepository.java new file mode 100644 index 0000000..6ec53ff --- /dev/null +++ b/src/main/java/com/snp/batch/global/repository/BatchApiLogRepository.java @@ -0,0 +1,10 @@ +package com.snp.batch.global.repository; + +import com.snp.batch.global.model.BatchApiLog; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface BatchApiLogRepository extends JpaRepository { + +} \ No newline at end of file diff --git a/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportRangeJobConfig.java index 96ee323..8fc7f4d 100644 --- a/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportRangeJobConfig.java @@ -6,6 +6,7 @@ import com.snp.batch.jobs.compliance.batch.entity.ComplianceEntity; import com.snp.batch.jobs.compliance.batch.processor.ComplianceDataProcessor; import com.snp.batch.jobs.compliance.batch.reader.ComplianceDataRangeReader; import com.snp.batch.jobs.compliance.batch.writer.ComplianceDataWriter; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; @@ -20,6 +21,7 @@ import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -39,6 +41,10 @@ public class ComplianceImportRangeJobConfig extends BaseMultiStepJobConfig createProcessor() { diff --git a/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataRangeReader.java b/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataRangeReader.java index 04e4f29..6c5c185 100644 --- a/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataRangeReader.java @@ -2,13 +2,12 @@ package com.snp.batch.jobs.compliance.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.compliance.batch.dto.ComplianceDto; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.HttpStatusCode; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; import java.util.List; import java.util.Map; @@ -18,15 +17,18 @@ public class ComplianceDataRangeReader extends BaseApiReader { private final JdbcTemplate jdbcTemplate; private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + private final BatchApiLogService batchApiLogService; + String maritimeServiceApiUrl; private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; - private String fromDate; - private String toDate; - public ComplianceDataRangeReader(WebClient webClient, JdbcTemplate jdbcTemplate, BatchDateService batchDateService) { + + public ComplianceDataRangeReader(WebClient webClient, JdbcTemplate jdbcTemplate, BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeServiceApiUrl) { super(webClient); this.jdbcTemplate = jdbcTemplate; this.batchDateService = batchDateService; + this.batchApiLogService = batchApiLogService; + this.maritimeServiceApiUrl = maritimeServiceApiUrl; enableChunkMode(); } @@ -34,13 +36,6 @@ public class ComplianceDataRangeReader extends BaseApiReader { protected String getReaderName() { return "ComplianceDataRangeReader"; } - - @Override - protected void resetCustomState() { - this.currentBatchIndex = 0; - this.allData = null; - } - @Override protected String getApiPath() { return "/RiskAndCompliance/UpdatedComplianceList"; @@ -50,6 +45,12 @@ public class ComplianceDataRangeReader extends BaseApiReader { return "COMPLIANCE_IMPORT_API"; } + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allData = null; + } + @Override protected List fetchNextBatch() throws Exception { // 모든 배치 처리 완료 확인 @@ -102,36 +103,14 @@ public class ComplianceDataRangeReader extends BaseApiReader { private List callApiWithBatch() { Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); -// log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); - - String url = getApiPath(); - log.debug("[{}] API 호출: {}", getReaderName(), url); - return webClient.get() - .uri(url, uriBuilder -> uriBuilder - // 맵에서 파라미터 값을 동적으로 가져와 세팅 - .queryParam("fromDate", params.get("fromDate")) - .queryParam("toDate", params.get("toDate")) - .build()) - .retrieve() - // 1. 에러 상태 코드 감지 (4xx, 5xx) - .onStatus(HttpStatusCode::isError, clientResponse -> - clientResponse.bodyToMono(String.class) // 에러 바디를 문자열로 읽음 - .flatMap(errorBody -> { - // 2. 로그에 상태 코드와 에러 메세지 출력 - log.error("[{}] API 호출 오류 발생!", getReaderName()); - log.error("[{}] ERROR CODE: {}, REASON: {}", - getReaderName(), - clientResponse.statusCode(), - errorBody); - - // 3. 상위로 예외 던지기 (배치 중단을 원할 경우) - return Mono.error(new RuntimeException( - String.format("API 호출 실패 (%s): %s", clientResponse.statusCode(), errorBody) - )); - }) - ) - .bodyToMono(new ParameterizedTypeReference>() {}) - .block(); + // 부모 클래스의 공통 모듈 호출 (단 한 줄로 처리 가능) + return executeListApiCall( + maritimeServiceApiUrl, + getApiPath(), + params, + new ParameterizedTypeReference>() {}, + batchApiLogService + ); } } diff --git a/src/main/java/com/snp/batch/jobs/event/batch/config/EventImportJobConfig.java b/src/main/java/com/snp/batch/jobs/event/batch/config/EventImportJobConfig.java index 1253064..5abb174 100644 --- a/src/main/java/com/snp/batch/jobs/event/batch/config/EventImportJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/event/batch/config/EventImportJobConfig.java @@ -6,10 +6,12 @@ import com.snp.batch.jobs.event.batch.entity.EventDetailEntity; import com.snp.batch.jobs.event.batch.processor.EventDataProcessor; import com.snp.batch.jobs.event.batch.reader.EventDataReader; import com.snp.batch.jobs.event.batch.writer.EventDataWriter; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; @@ -19,6 +21,7 @@ import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -28,13 +31,16 @@ import org.springframework.web.reactive.function.client.WebClient; @Slf4j @Configuration public class EventImportJobConfig extends BaseMultiStepJobConfig { + private final EventDataProcessor eventDataProcessor; + private final EventDataWriter eventDataWriter; + private final EventDataReader eventDataReader; private final JdbcTemplate jdbcTemplate; private final WebClient maritimeApiWebClient; - - private final EventDataProcessor eventDataProcessor; - - private final EventDataWriter eventDataWriter; private final BatchDateService batchDateService; + private final BatchApiLogService batchApiLogService; + + @Value("${app.batch.ship-api.url}") + private String maritimeApiUrl; protected String getApiKey() {return "EVENT_IMPORT_API";} protected String getBatchUpdateSql() { @@ -42,22 +48,26 @@ public class EventImportJobConfig extends BaseMultiStepJobConfig createReader() { - return new EventDataReader(maritimeApiWebClient, jdbcTemplate, batchDateService); + return eventDataReader; } @Override diff --git a/src/main/java/com/snp/batch/jobs/event/batch/reader/EventDataReader.java b/src/main/java/com/snp/batch/jobs/event/batch/reader/EventDataReader.java index 7c704b6..602c2b5 100644 --- a/src/main/java/com/snp/batch/jobs/event/batch/reader/EventDataReader.java +++ b/src/main/java/com/snp/batch/jobs/event/batch/reader/EventDataReader.java @@ -2,8 +2,10 @@ package com.snp.batch.jobs.event.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.event.batch.dto.*; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpStatusCode; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.reactive.function.client.WebClient; @@ -16,14 +18,18 @@ import java.util.stream.Collectors; @Slf4j public class EventDataReader extends BaseApiReader { + private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + private final BatchApiLogService batchApiLogService; + private final String maritimeApiUrl; private Map eventPeriodMap; private final JdbcTemplate jdbcTemplate; - private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 - public EventDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, BatchDateService batchDateService) { + public EventDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeApiUrl) { super(webClient); this.jdbcTemplate = jdbcTemplate; this.batchDateService = batchDateService; + this.batchApiLogService = batchApiLogService; + this.maritimeApiUrl = maritimeApiUrl; enableChunkMode(); // ✨ Chunk 모드 활성화 } @@ -31,16 +37,13 @@ public class EventDataReader extends BaseApiReader { protected String getReaderName() { return "EventDataReader"; } - @Override protected String getApiPath() { return "/MaritimeWCF/MaritimeAndTradeEventsService.svc/RESTFul/GetEventListByEventChangeDateRange"; } - protected String getEventDetailApiPath() { return "/MaritimeWCF/MaritimeAndTradeEventsService.svc/RESTFul/GetEventDataByEventID"; } - protected String getApiKey() { return "EVENT_IMPORT_API"; } @@ -126,9 +129,9 @@ public class EventDataReader extends BaseApiReader { // API 호출 통계 업데이트 updateApiCallStats(totalBatches, currentBatchNumber); - // API 과부하 방지 (다음 배치 전 0.5초 대기) + // API 과부하 방지 (다음 배치 전 1.0초 대기) if (currentBatchIndex < eventIds.size()) { - Thread.sleep(500); + Thread.sleep(1000); } return eventDetailList; @@ -188,37 +191,14 @@ public class EventDataReader extends BaseApiReader { private EventResponse callEventApiWithBatch() { Map params = batchDateService.getDateRangeWithoutTimeParams(getApiKey()); - String url = getApiPath(); - - return webClient.get() - .uri(url, uriBuilder -> uriBuilder - // 맵에서 파라미터 값을 동적으로 가져와 세팅 - .queryParam("fromYear", params.get("fromYear")) - .queryParam("fromMonth", params.get("fromMonth")) - .queryParam("fromDay", params.get("fromDay")) - .queryParam("toYear", params.get("toYear")) - .queryParam("toMonth", params.get("toMonth")) - .queryParam("toDay", params.get("toDay")) - .build()) - .retrieve() - .onStatus(HttpStatusCode::isError, clientResponse -> - clientResponse.bodyToMono(String.class) // 에러 바디를 문자열로 읽음 - .flatMap(errorBody -> { - // 2. 로그에 상태 코드와 에러 메세지 출력 - log.error("[{}] API 호출 오류 발생!", getReaderName()); - log.error("[{}] ERROR CODE: {}, REASON: {}", - getReaderName(), - clientResponse.statusCode(), - errorBody); - - // 3. 상위로 예외 던지기 (배치 중단을 원할 경우) - return Mono.error(new RuntimeException( - String.format("API 호출 실패 (%s): %s", clientResponse.statusCode(), errorBody) - )); - }) - ) - .bodyToMono(EventResponse.class) - .block(); + return executeSingleApiCall( + maritimeApiUrl, + getApiPath(), + params, + new ParameterizedTypeReference() {}, + batchApiLogService, + res -> res.getMaritimeEvents() != null ? (long) res.getMaritimeEvents().size() : 0L // 람다 적용 + ); } private EventDetailResponse callEventDetailApiWithBatch(Long eventId) { diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/AnchorageCallsRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/AnchorageCallsRangeJobConfig.java index 017aaaf..603c617 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/AnchorageCallsRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/AnchorageCallsRangeJobConfig.java @@ -6,6 +6,7 @@ import com.snp.batch.jobs.movement.batch.entity.AnchorageCallsEntity; import com.snp.batch.jobs.movement.batch.processor.AnchorageCallsProcessor; import com.snp.batch.jobs.movement.batch.reader.AnchorageCallsRangeReader; import com.snp.batch.jobs.movement.batch.writer.AnchorageCallsWriter; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; @@ -20,6 +21,7 @@ import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -36,6 +38,10 @@ public class AnchorageCallsRangeJobConfig extends BaseMultiStepJobConfig createProcessor() { diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/CurrentlyAtRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/CurrentlyAtRangeJobConfig.java index d897615..82c2549 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/CurrentlyAtRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/CurrentlyAtRangeJobConfig.java @@ -6,6 +6,7 @@ import com.snp.batch.jobs.movement.batch.entity.CurrentlyAtEntity; import com.snp.batch.jobs.movement.batch.processor.CurrentlyAtProcessor; import com.snp.batch.jobs.movement.batch.reader.CurrentlyAtRangeReader; import com.snp.batch.jobs.movement.batch.writer.CurrentlyAtWriter; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; @@ -20,6 +21,7 @@ import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -36,6 +38,10 @@ public class CurrentlyAtRangeJobConfig extends BaseMultiStepJobConfig createProcessor() { diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/DestinationsRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/DestinationsRangeJobConfig.java index fca2c61..928d502 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/DestinationsRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/DestinationsRangeJobConfig.java @@ -6,6 +6,7 @@ import com.snp.batch.jobs.movement.batch.entity.DestinationEntity; import com.snp.batch.jobs.movement.batch.processor.DestinationProcessor; import com.snp.batch.jobs.movement.batch.reader.DestinationRangeReader; import com.snp.batch.jobs.movement.batch.writer.DestinationWriter; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; @@ -20,6 +21,7 @@ import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -36,6 +38,10 @@ public class DestinationsRangeJobConfig extends BaseMultiStepJobConfig createProcessor() { diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/ShipPortCallsRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/ShipPortCallsRangeJobConfig.java index b8a4078..bc95359 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/ShipPortCallsRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/ShipPortCallsRangeJobConfig.java @@ -6,6 +6,7 @@ import com.snp.batch.jobs.movement.batch.entity.PortCallsEntity; import com.snp.batch.jobs.movement.batch.processor.PortCallsProcessor; import com.snp.batch.jobs.movement.batch.reader.PortCallsRangeReader; import com.snp.batch.jobs.movement.batch.writer.PortCallsWriter; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; @@ -20,6 +21,7 @@ import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -36,6 +38,10 @@ public class ShipPortCallsRangeJobConfig extends BaseMultiStepJobConfig createReader() { // 타입 변경 - // Reader 생성자 수정: ObjectMapper를 전달합니다. return stsOperationRangeReader; } @Bean @StepScope - public StsOperationRangeReader stsOperationRangeReader() { - // jobParameters 없으면 null 넘어오고 Reader에서 default 처리 - return new StsOperationRangeReader(maritimeApiWebClient, batchDateService); + public StsOperationRangeReader stsOperationRangeReader( + @Value("#{stepExecution.jobExecution.id}") Long jobExecutionId, // SpEL로 ID 추출 + @Value("#{stepExecution.id}") Long stepExecutionId + ) { + StsOperationRangeReader reader = new StsOperationRangeReader(maritimeApiWebClient, batchDateService, batchApiLogService, maritimeServiceApiUrl); + reader.setExecutionIds(jobExecutionId, stepExecutionId); // ID 세팅 + return reader; } @Override protected ItemProcessor createProcessor() { diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/TerminalCallsRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/TerminalCallsRangeJobConfig.java index 2870ce4..0d33208 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/TerminalCallsRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/TerminalCallsRangeJobConfig.java @@ -6,6 +6,7 @@ import com.snp.batch.jobs.movement.batch.entity.TerminalCallsEntity; import com.snp.batch.jobs.movement.batch.processor.TerminalCallsProcessor; import com.snp.batch.jobs.movement.batch.reader.TerminalCallsRangeReader; import com.snp.batch.jobs.movement.batch.writer.TerminalCallsWriter; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; @@ -20,6 +21,7 @@ import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -36,6 +38,10 @@ public class TerminalCallsRangeJobConfig extends BaseMultiStepJobConfig createProcessor() { diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/TransitsRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/TransitsRangeJobConfig.java index 54c57fa..11d2bcd 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/TransitsRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/TransitsRangeJobConfig.java @@ -6,6 +6,7 @@ import com.snp.batch.jobs.movement.batch.entity.TransitsEntity; import com.snp.batch.jobs.movement.batch.processor.TransitsProcessor; import com.snp.batch.jobs.movement.batch.reader.TransitsRangeReader; import com.snp.batch.jobs.movement.batch.writer.TransitsWriter; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; @@ -20,6 +21,7 @@ import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -36,6 +38,10 @@ public class TransitsRangeJobConfig extends BaseMultiStepJobConfig createProcessor() { diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/AnchorageCallsRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/AnchorageCallsRangeReader.java index a15d123..bf0fd8d 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/AnchorageCallsRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/AnchorageCallsRangeReader.java @@ -2,12 +2,12 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.AnchorageCallsDto; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.http.HttpStatusCode; -import reactor.core.publisher.Mono; import java.util.List; import java.util.Map; @@ -16,23 +16,31 @@ import java.util.Map; @StepScope public class AnchorageCallsRangeReader extends BaseApiReader { private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + private final BatchApiLogService batchApiLogService; + private final String maritimeServiceApiUrl; private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; - protected String getApiKey() { - return "ANCHORAGE_CALLS_IMPORT_API"; - } - - public AnchorageCallsRangeReader(WebClient webClient, BatchDateService batchDateService) { + public AnchorageCallsRangeReader(WebClient webClient, BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeServiceApiUrl) { super(webClient); this.batchDateService = batchDateService; + this.batchApiLogService = batchApiLogService; + this.maritimeServiceApiUrl = maritimeServiceApiUrl; enableChunkMode(); } @Override protected String getReaderName() { - return "AnchorageCallsReader"; + return "AnchorageCallsRangeReader"; + } + @Override + protected String getApiPath() { + return "/Movements/AnchorageCalls"; + } + + protected String getApiKey() { + return "ANCHORAGE_CALLS_IMPORT_API"; } @Override @@ -41,16 +49,6 @@ public class AnchorageCallsRangeReader extends BaseApiReader this.allData = null; } - @Override - protected String getApiPath() { - return "/Movements/AnchorageCalls"; - } - - @Override - protected String getApiBaseUrl() { - return "https://webservices.maritime.spglobal.com"; - } - @Override protected List fetchNextBatch() throws Exception { // 1) 처음 호출이면 API 한 번 호출해서 전체 데이터를 가져온다 @@ -94,36 +92,14 @@ public class AnchorageCallsRangeReader extends BaseApiReader * @return API 응답 */ private List callApiWithBatch() { - Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); -// log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); - - return webClient.get() - .uri(getApiPath(), uriBuilder -> uriBuilder - // 맵에서 파라미터 값을 동적으로 가져와 세팅 - .queryParam("startDate", params.get("fromDate")) - .queryParam("stopDate", params.get("toDate")) - .build()) - .retrieve() - // 1. 에러 상태 코드 감지 (4xx, 5xx) - .onStatus(HttpStatusCode::isError, clientResponse -> - clientResponse.bodyToMono(String.class) // 에러 바디를 문자열로 읽음 - .flatMap(errorBody -> { - // 2. 로그에 상태 코드와 에러 메세지 출력 - log.error("[{}] API 호출 오류 발생!", getReaderName()); - log.error("[{}] ERROR CODE: {}, REASON: {}", - getReaderName(), - clientResponse.statusCode(), - errorBody); - - // 3. 상위로 예외 던지기 (배치 중단을 원할 경우) - return Mono.error(new RuntimeException( - String.format("API 호출 실패 (%s): %s", clientResponse.statusCode(), errorBody) - )); - }) - ) - .bodyToFlux(AnchorageCallsDto.class) - .collectList() - .block(); + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey(), "startDate", "stopDate"); + return executeListApiCall( + maritimeServiceApiUrl, + getApiPath(), + params, + new ParameterizedTypeReference>() {}, + batchApiLogService + ); } @Override diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/BerthCallsRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/BerthCallsRangeReader.java index 8fc6127..a742932 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/BerthCallsRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/BerthCallsRangeReader.java @@ -2,12 +2,12 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.BerthCallsDto; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.http.HttpStatusCode; -import reactor.core.publisher.Mono; import java.util.List; import java.util.Map; @@ -16,16 +16,16 @@ import java.util.Map; @StepScope public class BerthCallsRangeReader extends BaseApiReader { private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + private final BatchApiLogService batchApiLogService; + private final String maritimeServiceApiUrl; private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; - protected String getApiKey() { - return "BERTH_CALLS_IMPORT_API"; - } - - public BerthCallsRangeReader(WebClient webClient, BatchDateService batchDateService) { + public BerthCallsRangeReader(WebClient webClient, BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeServiceApiUrl) { super(webClient); this.batchDateService = batchDateService; + this.batchApiLogService = batchApiLogService; + this.maritimeServiceApiUrl = maritimeServiceApiUrl; enableChunkMode(); } @@ -33,21 +33,19 @@ public class BerthCallsRangeReader extends BaseApiReader { protected String getReaderName() { return "BerthCallsRangeReader"; } - - @Override - protected void resetCustomState() { - this.currentBatchIndex = 0; - this.allData = null; - } - @Override protected String getApiPath() { return "/Movements/BerthCalls"; } + protected String getApiKey() { + return "BERTH_CALLS_IMPORT_API"; + } + @Override - protected String getApiBaseUrl() { - return "https://webservices.maritime.spglobal.com"; + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allData = null; } @Override @@ -93,36 +91,14 @@ public class BerthCallsRangeReader extends BaseApiReader { * @return API 응답 */ private List callApiWithBatch() { - Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); -// log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); - - return webClient.get() - .uri(getApiPath(), uriBuilder -> uriBuilder - // 맵에서 파라미터 값을 동적으로 가져와 세팅 - .queryParam("startDate", params.get("fromDate")) - .queryParam("stopDate", params.get("toDate")) - .build()) - .retrieve() - // 1. 에러 상태 코드 감지 (4xx, 5xx) - .onStatus(HttpStatusCode::isError, clientResponse -> - clientResponse.bodyToMono(String.class) // 에러 바디를 문자열로 읽음 - .flatMap(errorBody -> { - // 2. 로그에 상태 코드와 에러 메세지 출력 - log.error("[{}] API 호출 오류 발생!", getReaderName()); - log.error("[{}] ERROR CODE: {}, REASON: {}", - getReaderName(), - clientResponse.statusCode(), - errorBody); - - // 3. 상위로 예외 던지기 (배치 중단을 원할 경우) - return Mono.error(new RuntimeException( - String.format("API 호출 실패 (%s): %s", clientResponse.statusCode(), errorBody) - )); - }) - ) - .bodyToFlux(BerthCallsDto.class) - .collectList() - .block(); + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey(), "startDate", "stopDate"); + return executeListApiCall( + maritimeServiceApiUrl, + getApiPath(), + params, + new ParameterizedTypeReference>() {}, + batchApiLogService + ); } @Override diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/CurrentlyAtRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/CurrentlyAtRangeReader.java index 00e8704..0827855 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/CurrentlyAtRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/CurrentlyAtRangeReader.java @@ -2,12 +2,12 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.CurrentlyAtDto; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.http.HttpStatusCode; -import reactor.core.publisher.Mono; import java.util.List; import java.util.Map; @@ -16,21 +16,29 @@ import java.util.Map; @StepScope public class CurrentlyAtRangeReader extends BaseApiReader { private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + private final BatchApiLogService batchApiLogService; + private final String maritimeServiceApiUrl; private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; - protected String getApiKey() { - return "CURRENTLY_AT_IMPORT_API"; - } - public CurrentlyAtRangeReader(WebClient webClient, BatchDateService batchDateService) { + public CurrentlyAtRangeReader(WebClient webClient, BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeServiceApiUrl) { super(webClient); this.batchDateService = batchDateService; + this.batchApiLogService = batchApiLogService; + this.maritimeServiceApiUrl = maritimeServiceApiUrl; enableChunkMode(); } - @Override protected String getReaderName() { - return "CurrentlyAtReader"; + return "CurrentlyAtRangeReader"; + } + @Override + protected String getApiPath() { + return "/Movements/CurrentlyAt"; + } + + protected String getApiKey() { + return "CURRENTLY_AT_IMPORT_API"; } @Override @@ -39,16 +47,6 @@ public class CurrentlyAtRangeReader extends BaseApiReader { this.allData = null; } - @Override - protected String getApiPath() { - return "/Movements/CurrentlyAt"; - } - - @Override - protected String getApiBaseUrl() { - return "https://webservices.maritime.spglobal.com"; - } - @Override protected List fetchNextBatch() throws Exception { @@ -92,36 +90,14 @@ public class CurrentlyAtRangeReader extends BaseApiReader { * @return API 응답 */ private List callApiWithBatch() { - Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); -// log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); - - return webClient.get() - .uri(getApiPath(), uriBuilder -> uriBuilder - // 맵에서 파라미터 값을 동적으로 가져와 세팅 - .queryParam("startDate", params.get("fromDate")) - .queryParam("stopDate", params.get("toDate")) - .build()) - .retrieve() - // 1. 에러 상태 코드 감지 (4xx, 5xx) - .onStatus(HttpStatusCode::isError, clientResponse -> - clientResponse.bodyToMono(String.class) // 에러 바디를 문자열로 읽음 - .flatMap(errorBody -> { - // 2. 로그에 상태 코드와 에러 메세지 출력 - log.error("[{}] API 호출 오류 발생!", getReaderName()); - log.error("[{}] ERROR CODE: {}, REASON: {}", - getReaderName(), - clientResponse.statusCode(), - errorBody); - - // 3. 상위로 예외 던지기 (배치 중단을 원할 경우) - return Mono.error(new RuntimeException( - String.format("API 호출 실패 (%s): %s", clientResponse.statusCode(), errorBody) - )); - }) - ) - .bodyToFlux(CurrentlyAtDto.class) - .collectList() - .block(); + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey(), "startDate", "stopDate"); + return executeListApiCall( + maritimeServiceApiUrl, + getApiPath(), + params, + new ParameterizedTypeReference>() {}, + batchApiLogService + ); } @Override diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/DestinationRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/DestinationRangeReader.java index e685c3b..2c43ec0 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/DestinationRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/DestinationRangeReader.java @@ -2,12 +2,12 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.DestinationDto; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.http.HttpStatusCode; -import reactor.core.publisher.Mono; import java.util.List; import java.util.Map; @@ -16,21 +16,29 @@ import java.util.Map; @StepScope public class DestinationRangeReader extends BaseApiReader { private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + private final BatchApiLogService batchApiLogService; + private final String maritimeServiceApiUrl; private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; - protected String getApiKey() { - return "DESTINATIONS_IMPORT_API"; - } - public DestinationRangeReader(WebClient webClient, BatchDateService batchDateService) { + public DestinationRangeReader(WebClient webClient, BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeServiceApiUrl) { super(webClient); this.batchDateService = batchDateService; + this.batchApiLogService = batchApiLogService; + this.maritimeServiceApiUrl = maritimeServiceApiUrl; enableChunkMode(); } - @Override protected String getReaderName() { - return "DestinationsRangeReader"; + return "DestinationRangeReader"; + } + @Override + protected String getApiPath() { + return "/Movements/Destinations"; + } + + protected String getApiKey() { + return "DESTINATIONS_IMPORT_API"; } @Override @@ -39,16 +47,6 @@ public class DestinationRangeReader extends BaseApiReader { this.allData = null; } - @Override - protected String getApiPath() { - return "/Movements/Destinations"; - } - - @Override - protected String getApiBaseUrl() { - return "https://webservices.maritime.spglobal.com"; - } - @Override protected List fetchNextBatch() throws Exception { @@ -90,36 +88,14 @@ public class DestinationRangeReader extends BaseApiReader { * @return API 응답 */ private List callApiWithBatch() { - Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); -// log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); - - return webClient.get() - .uri(getApiPath(), uriBuilder -> uriBuilder - // 맵에서 파라미터 값을 동적으로 가져와 세팅 - .queryParam("startDate", params.get("fromDate")) - .queryParam("stopDate", params.get("toDate")) - .build()) - .retrieve() - // 1. 에러 상태 코드 감지 (4xx, 5xx) - .onStatus(HttpStatusCode::isError, clientResponse -> - clientResponse.bodyToMono(String.class) // 에러 바디를 문자열로 읽음 - .flatMap(errorBody -> { - // 2. 로그에 상태 코드와 에러 메세지 출력 - log.error("[{}] API 호출 오류 발생!", getReaderName()); - log.error("[{}] ERROR CODE: {}, REASON: {}", - getReaderName(), - clientResponse.statusCode(), - errorBody); - - // 3. 상위로 예외 던지기 (배치 중단을 원할 경우) - return Mono.error(new RuntimeException( - String.format("API 호출 실패 (%s): %s", clientResponse.statusCode(), errorBody) - )); - }) - ) - .bodyToFlux(DestinationDto.class) - .collectList() - .block(); + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey(), "startDate", "stopDate"); + return executeListApiCall( + maritimeServiceApiUrl, + getApiPath(), + params, + new ParameterizedTypeReference>() {}, + batchApiLogService + ); } @Override diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/PortCallsRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/PortCallsRangeReader.java index fff45ec..aad0ebf 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/PortCallsRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/PortCallsRangeReader.java @@ -2,12 +2,12 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.PortCallsDto; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.http.HttpStatusCode; -import reactor.core.publisher.Mono; import java.util.List; import java.util.Map; @@ -16,15 +16,16 @@ import java.util.Map; @StepScope public class PortCallsRangeReader extends BaseApiReader { private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + private final BatchApiLogService batchApiLogService; + private final String maritimeServiceApiUrl; private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; - protected String getApiKey() { - return "PORT_CALLS_IMPORT_API"; - } - public PortCallsRangeReader(WebClient webClient, BatchDateService batchDateService) { + public PortCallsRangeReader(WebClient webClient, BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeServiceApiUrl) { super(webClient); this.batchDateService = batchDateService; + this.batchApiLogService = batchApiLogService; + this.maritimeServiceApiUrl = maritimeServiceApiUrl; enableChunkMode(); } @@ -32,21 +33,19 @@ public class PortCallsRangeReader extends BaseApiReader { protected String getReaderName() { return "PortCallsRangeReader"; } - - @Override - protected void resetCustomState() { - this.currentBatchIndex = 0; - this.allData = null; - } - @Override protected String getApiPath() { return "/Movements/PortCalls"; } + protected String getApiKey() { + return "PORT_CALLS_IMPORT_API"; + } + @Override - protected String getApiBaseUrl() { - return "https://webservices.maritime.spglobal.com"; + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allData = null; } @Override @@ -93,35 +92,14 @@ public class PortCallsRangeReader extends BaseApiReader { * @return API 응답 */ private List callApiWithBatch() { - Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); -// log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); - return webClient.get() - .uri(getApiPath(), uriBuilder -> uriBuilder - // 맵에서 파라미터 값을 동적으로 가져와 세팅 - .queryParam("startDate", params.get("fromDate")) - .queryParam("stopDate", params.get("toDate")) - .build()) - .retrieve() - // 1. 에러 상태 코드 감지 (4xx, 5xx) - .onStatus(HttpStatusCode::isError, clientResponse -> - clientResponse.bodyToMono(String.class) // 에러 바디를 문자열로 읽음 - .flatMap(errorBody -> { - // 2. 로그에 상태 코드와 에러 메세지 출력 - log.error("[{}] API 호출 오류 발생!", getReaderName()); - log.error("[{}] ERROR CODE: {}, REASON: {}", - getReaderName(), - clientResponse.statusCode(), - errorBody); - - // 3. 상위로 예외 던지기 (배치 중단을 원할 경우) - return Mono.error(new RuntimeException( - String.format("API 호출 실패 (%s): %s", clientResponse.statusCode(), errorBody) - )); - }) - ) - .bodyToFlux(PortCallsDto.class) - .collectList() - .block(); + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey(), "startDate", "stopDate"); + return executeListApiCall( + maritimeServiceApiUrl, + getApiPath(), + params, + new ParameterizedTypeReference>() {}, + batchApiLogService + ); } @Override diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/StsOperationRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/StsOperationRangeReader.java index 20f1bcc..b3640bf 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/StsOperationRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/StsOperationRangeReader.java @@ -2,12 +2,12 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.StsOperationDto; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.http.HttpStatusCode; -import reactor.core.publisher.Mono; import java.util.List; import java.util.Map; @@ -16,16 +16,16 @@ import java.util.Map; @StepScope public class StsOperationRangeReader extends BaseApiReader { private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + private final BatchApiLogService batchApiLogService; + private final String maritimeServiceApiUrl; private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; - protected String getApiKey() { - return "STS_OPERATION_IMPORT_API"; - } - - public StsOperationRangeReader(WebClient webClient, BatchDateService batchDateService) { + public StsOperationRangeReader(WebClient webClient, BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeServiceApiUrl) { super(webClient); this.batchDateService = batchDateService; + this.batchApiLogService = batchApiLogService; + this.maritimeServiceApiUrl = maritimeServiceApiUrl; enableChunkMode(); } @@ -33,6 +33,14 @@ public class StsOperationRangeReader extends BaseApiReader { protected String getReaderName() { return "StsOperationRangeReader"; } + @Override + protected String getApiPath() { + return "/Movements/StsOperations"; + } + + protected String getApiKey() { + return "STS_OPERATION_IMPORT_API"; + } @Override protected void resetCustomState() { @@ -40,11 +48,6 @@ public class StsOperationRangeReader extends BaseApiReader { this.allData = null; } - @Override - protected String getApiPath() { - return "/Movements/StsOperations"; - } - @Override protected String getApiBaseUrl() { return "https://webservices.maritime.spglobal.com"; @@ -92,36 +95,14 @@ public class StsOperationRangeReader extends BaseApiReader { * @return API 응답 */ private List callApiWithBatch() { - Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); -// log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); - - return webClient.get() - .uri(getApiPath(), uriBuilder -> uriBuilder - // 맵에서 파라미터 값을 동적으로 가져와 세팅 - .queryParam("startDate", params.get("fromDate")) - .queryParam("stopDate", params.get("toDate")) - .build()) - .retrieve() - // 1. 에러 상태 코드 감지 (4xx, 5xx) - .onStatus(HttpStatusCode::isError, clientResponse -> - clientResponse.bodyToMono(String.class) // 에러 바디를 문자열로 읽음 - .flatMap(errorBody -> { - // 2. 로그에 상태 코드와 에러 메세지 출력 - log.error("[{}] API 호출 오류 발생!", getReaderName()); - log.error("[{}] ERROR CODE: {}, REASON: {}", - getReaderName(), - clientResponse.statusCode(), - errorBody); - - // 3. 상위로 예외 던지기 (배치 중단을 원할 경우) - return Mono.error(new RuntimeException( - String.format("API 호출 실패 (%s): %s", clientResponse.statusCode(), errorBody) - )); - }) - ) - .bodyToFlux(StsOperationDto.class) - .collectList() - .block(); + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey(), "startDate", "stopDate"); + return executeListApiCall( + maritimeServiceApiUrl, + getApiPath(), + params, + new ParameterizedTypeReference>() {}, + batchApiLogService + ); } @Override diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/TerminalCallsRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/TerminalCallsRangeReader.java index 05bcdbc..49e7704 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/TerminalCallsRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/TerminalCallsRangeReader.java @@ -2,12 +2,12 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.movement.batch.dto.TerminalCallsDto; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.http.HttpStatusCode; -import reactor.core.publisher.Mono; import java.util.List; import java.util.Map; @@ -16,22 +16,30 @@ import java.util.Map; @StepScope public class TerminalCallsRangeReader extends BaseApiReader { private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + private final BatchApiLogService batchApiLogService; + private final String maritimeServiceApiUrl; private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; - protected String getApiKey() { - return "TERMINAL_CALLS_IMPORT_API"; - } - public TerminalCallsRangeReader(WebClient webClient, BatchDateService batchDateService) { + public TerminalCallsRangeReader(WebClient webClient, BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeServiceApiUrl) { super(webClient); this.batchDateService = batchDateService; + this.batchApiLogService = batchApiLogService; + this.maritimeServiceApiUrl = maritimeServiceApiUrl; enableChunkMode(); } - @Override protected String getReaderName() { return "TerminalCallsRangeReader"; } + @Override + protected String getApiPath() { + return "/Movements/TerminalCalls"; + } + + protected String getApiKey() { + return "TERMINAL_CALLS_IMPORT_API"; + } @Override protected void resetCustomState() { @@ -39,16 +47,6 @@ public class TerminalCallsRangeReader extends BaseApiReader { this.allData = null; } - @Override - protected String getApiPath() { - return "/Movements/TerminalCalls"; - } - - @Override - protected String getApiBaseUrl() { - return "https://webservices.maritime.spglobal.com"; - } - @Override protected List fetchNextBatch() throws Exception { @@ -91,36 +89,14 @@ public class TerminalCallsRangeReader extends BaseApiReader { * @return API 응답 */ private List callApiWithBatch() { - Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); -// log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); - - return webClient.get() - .uri(getApiPath(), uriBuilder -> uriBuilder - // 맵에서 파라미터 값을 동적으로 가져와 세팅 - .queryParam("startDate", params.get("fromDate")) - .queryParam("stopDate", params.get("toDate")) - .build()) - .retrieve() - // 1. 에러 상태 코드 감지 (4xx, 5xx) - .onStatus(HttpStatusCode::isError, clientResponse -> - clientResponse.bodyToMono(String.class) // 에러 바디를 문자열로 읽음 - .flatMap(errorBody -> { - // 2. 로그에 상태 코드와 에러 메세지 출력 - log.error("[{}] API 호출 오류 발생!", getReaderName()); - log.error("[{}] ERROR CODE: {}, REASON: {}", - getReaderName(), - clientResponse.statusCode(), - errorBody); - - // 3. 상위로 예외 던지기 (배치 중단을 원할 경우) - return Mono.error(new RuntimeException( - String.format("API 호출 실패 (%s): %s", clientResponse.statusCode(), errorBody) - )); - }) - ) - .bodyToFlux(TerminalCallsDto.class) - .collectList() - .block(); + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey(), "startDate", "stopDate"); + return executeListApiCall( + maritimeServiceApiUrl, + getApiPath(), + params, + new ParameterizedTypeReference>() {}, + batchApiLogService + ); } @Override diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/TransitsRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/TransitsRangeReader.java index fd51d8b..b2475be 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/TransitsRangeReader.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/reader/TransitsRangeReader.java @@ -1,10 +1,13 @@ package com.snp.batch.jobs.movement.batch.reader; import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.movement.batch.dto.AnchorageCallsDto; import com.snp.batch.jobs.movement.batch.dto.TransitsDto; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.http.HttpStatusCode; import reactor.core.publisher.Mono; @@ -16,22 +19,30 @@ import java.util.Map; @StepScope public class TransitsRangeReader extends BaseApiReader { private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + private final BatchApiLogService batchApiLogService; + private final String maritimeServiceApiUrl; private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; - protected String getApiKey() { - return "TRANSITS_IMPORT_API"; - } - public TransitsRangeReader(WebClient webClient, BatchDateService batchDateService) { + public TransitsRangeReader(WebClient webClient, BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeServiceApiUrl) { super(webClient); this.batchDateService = batchDateService; + this.batchApiLogService = batchApiLogService; + this.maritimeServiceApiUrl = maritimeServiceApiUrl; enableChunkMode(); } - @Override protected String getReaderName() { return "TransitsRangeReader"; } + @Override + protected String getApiPath() { + return "/Movements/Transits"; + } + + protected String getApiKey() { + return "TRANSITS_IMPORT_API"; + } @Override protected void resetCustomState() { @@ -39,11 +50,6 @@ public class TransitsRangeReader extends BaseApiReader { this.allData = null; } - @Override - protected String getApiPath() { - return "/Movements/Transits"; - } - @Override protected String getApiBaseUrl() { return "https://webservices.maritime.spglobal.com"; @@ -91,36 +97,14 @@ public class TransitsRangeReader extends BaseApiReader { * @return API 응답 */ private List callApiWithBatch() { - Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); -// log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); - - return webClient.get() - .uri(getApiPath(), uriBuilder -> uriBuilder - // 맵에서 파라미터 값을 동적으로 가져와 세팅 - .queryParam("startDate", params.get("fromDate")) - .queryParam("stopDate", params.get("toDate")) - .build()) - .retrieve() - // 1. 에러 상태 코드 감지 (4xx, 5xx) - .onStatus(HttpStatusCode::isError, clientResponse -> - clientResponse.bodyToMono(String.class) // 에러 바디를 문자열로 읽음 - .flatMap(errorBody -> { - // 2. 로그에 상태 코드와 에러 메세지 출력 - log.error("[{}] API 호출 오류 발생!", getReaderName()); - log.error("[{}] ERROR CODE: {}, REASON: {}", - getReaderName(), - clientResponse.statusCode(), - errorBody); - - // 3. 상위로 예외 던지기 (배치 중단을 원할 경우) - return Mono.error(new RuntimeException( - String.format("API 호출 실패 (%s): %s", clientResponse.statusCode(), errorBody) - )); - }) - ) - .bodyToFlux(TransitsDto.class) - .collectList() - .block(); + Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey(), "startDate", "stopDate"); + return executeListApiCall( + maritimeServiceApiUrl, + getApiPath(), + params, + new ParameterizedTypeReference>() {}, + batchApiLogService + ); } @Override diff --git a/src/main/java/com/snp/batch/jobs/pscInspection/batch/config/PscInspectionJobConfig.java b/src/main/java/com/snp/batch/jobs/pscInspection/batch/config/PscInspectionJobConfig.java index a23d07a..95af555 100644 --- a/src/main/java/com/snp/batch/jobs/pscInspection/batch/config/PscInspectionJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/pscInspection/batch/config/PscInspectionJobConfig.java @@ -6,10 +6,12 @@ import com.snp.batch.jobs.pscInspection.batch.entity.PscInspectionEntity; import com.snp.batch.jobs.pscInspection.batch.processor.PscInspectionProcessor; import com.snp.batch.jobs.pscInspection.batch.reader.PscApiReader; import com.snp.batch.jobs.pscInspection.batch.writer.PscInspectionWriter; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; @@ -19,6 +21,7 @@ import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -31,9 +34,14 @@ public class PscInspectionJobConfig extends BaseMultiStepJobConfig createReader() { - return new PscApiReader(maritimeApiWebClient, jdbcTemplate, batchDateService); + return pscApiReader; } @Override diff --git a/src/main/java/com/snp/batch/jobs/pscInspection/batch/reader/PscApiReader.java b/src/main/java/com/snp/batch/jobs/pscInspection/batch/reader/PscApiReader.java index d2c2375..6d00c24 100644 --- a/src/main/java/com/snp/batch/jobs/pscInspection/batch/reader/PscApiReader.java +++ b/src/main/java/com/snp/batch/jobs/pscInspection/batch/reader/PscApiReader.java @@ -1,16 +1,15 @@ package com.snp.batch.jobs.pscInspection.batch.reader; -import com.fasterxml.jackson.databind.ObjectMapper; import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.jobs.pscInspection.batch.dto.PscApiResponseDto; import com.snp.batch.jobs.pscInspection.batch.dto.PscInspectionDto; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.http.HttpStatusCode; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; import java.time.LocalDate; import java.time.LocalDateTime; @@ -22,20 +21,20 @@ import java.util.Map; @Slf4j @StepScope public class PscApiReader extends BaseApiReader { - + private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + private final BatchApiLogService batchApiLogService; + private final String maritimeApiUrl; private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; private final JdbcTemplate jdbcTemplate; - private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 - protected String getApiKey() { - return "PSC_IMPORT_API"; - } - public PscApiReader(WebClient webClient, JdbcTemplate jdbcTemplate, BatchDateService batchDateService) { + public PscApiReader(WebClient webClient, JdbcTemplate jdbcTemplate, BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeApiUrl) { super(webClient); this.jdbcTemplate = jdbcTemplate; this.batchDateService = batchDateService; + this.batchApiLogService = batchApiLogService; + this.maritimeApiUrl = maritimeApiUrl; enableChunkMode(); // ✨ Chunk 모드 활성화 } @@ -43,6 +42,14 @@ public class PscApiReader extends BaseApiReader { protected String getReaderName() { return "PscApiReader"; } + @Override + protected String getApiPath() { + return "/MaritimeWCF/PSCService.svc/RESTFul/GetPSCDataByLastUpdateDateRange"; + } + + protected String getApiKey() { + return "PSC_IMPORT_API"; + } @Override protected void resetCustomState() { @@ -50,11 +57,6 @@ public class PscApiReader extends BaseApiReader { this.allData = null; } - @Override - protected String getApiPath() { - return "/MaritimeWCF/PSCService.svc/RESTFul/GetPSCDataByLastUpdateDateRange"; - } - @Override protected List fetchNextBatch() { @@ -94,61 +96,23 @@ public class PscApiReader extends BaseApiReader { } private List callApiWithBatch() { - Map params = batchDateService.getDateRangeWithoutTimeParams(getApiKey()); - String url = getApiPath(); - - String json = webClient.get() - .uri(url, uriBuilder -> uriBuilder - .queryParam("shipsCategory", params.get("shipsCategory")) - .queryParam("fromYear", params.get("fromYear")) - .queryParam("fromMonth", params.get("fromMonth")) - .queryParam("fromDay", params.get("fromDay")) - .queryParam("toYear", params.get("toYear")) - .queryParam("toMonth", params.get("toMonth")) - .queryParam("toDay", params.get("toDay")) - .build()) - .retrieve() - // 1. 에러 상태 코드 감지 (4xx, 5xx) - .onStatus(HttpStatusCode::isError, clientResponse -> - clientResponse.bodyToMono(String.class) // 에러 바디를 문자열로 읽음 - .flatMap(errorBody -> { - // 2. 로그에 상태 코드와 에러 메세지 출력 - log.error("[{}] API 호출 오류 발생!", getReaderName()); - log.error("[{}] ERROR CODE: {}, REASON: {}", - getReaderName(), - clientResponse.statusCode(), - errorBody); - - // 3. 상위로 예외 던지기 (배치 중단을 원할 경우) - return Mono.error(new RuntimeException( - String.format("API 호출 실패 (%s): %s", clientResponse.statusCode(), errorBody) - )); - }) - ) - .bodyToMono(String.class) - .block(); - - if (json == null || json.isBlank()) { - log.warn("[PSC] API 응답 없음"); - return Collections.emptyList(); + // 1. 단일 객체 응답 API 호출 + PscApiResponseDto response = executeSingleApiCall( + maritimeApiUrl, + getApiPath(), + params, + new ParameterizedTypeReference() {}, + batchApiLogService, + res -> res.getInspections() != null ? (long) res.getInspections().size() : 0L // 람다 적용 + ); + // 2. Inspections Array 데이터 추출 + if (response != null && response.getInspections() != null) { + log.info("[{}] PSC 데이터 추출 성공 - 건수: {}", getReaderName(), response.getInspections().size()); + return response.getInspections(); } - try { - ObjectMapper mapper = new ObjectMapper(); - PscApiResponseDto resp = mapper.readValue(json, PscApiResponseDto.class); - - if (resp.getInspections() == null) { - log.warn("[PSC] inspections 필드 없음"); - return Collections.emptyList(); - } - - return resp.getInspections(); - - } catch (Exception e) { - log.error("[PSC] JSON 파싱 실패: {}", e.getMessage()); - return Collections.emptyList(); - } + return Collections.emptyList(); } @Override @@ -156,8 +120,6 @@ public class PscApiReader extends BaseApiReader { if (data == null) { int totalBatches = (int) Math.ceil((double) allData.size() / batchSize); log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); - log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", - getReaderName(), allData.size()); } } diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java index 59b8f62..19a372c 100644 --- a/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java @@ -6,6 +6,7 @@ import com.snp.batch.jobs.risk.batch.entity.RiskEntity; import com.snp.batch.jobs.risk.batch.processor.RiskDataProcessor; import com.snp.batch.jobs.risk.batch.reader.RiskDataRangeReader; import com.snp.batch.jobs.risk.batch.writer.RiskDataWriter; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; @@ -20,6 +21,7 @@ import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -35,6 +37,10 @@ public class RiskImportRangeJobConfig extends BaseMultiStepJobConfig { private final JdbcTemplate jdbcTemplate; private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + private final BatchApiLogService batchApiLogService; private List allData; private int currentBatchIndex = 0; private final int batchSize = 5000; private String fromDate; private String toDate; - public RiskDataRangeReader(WebClient webClient, JdbcTemplate jdbcTemplate, BatchDateService batchDateService) { + String maritimeServiceApiUrl; + public RiskDataRangeReader(WebClient webClient, JdbcTemplate jdbcTemplate, BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeServiceApiUrl) { super(webClient); this.jdbcTemplate = jdbcTemplate; this.batchDateService = batchDateService; + this.batchApiLogService = batchApiLogService; + this.maritimeServiceApiUrl = maritimeServiceApiUrl; enableChunkMode(); } @@ -100,35 +103,14 @@ public class RiskDataRangeReader extends BaseApiReader { private List callApiWithBatch() { Map params = batchDateService.getDateRangeWithTimezoneParams(getApiKey()); -// log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); - - String url = getApiPath(); - return webClient.get() - .uri(url, uriBuilder -> uriBuilder - // 맵에서 파라미터 값을 동적으로 가져와 세팅 - .queryParam("fromDate", params.get("fromDate")) - .queryParam("toDate", params.get("toDate")) - .build()) - .retrieve() - // 1. 에러 상태 코드 감지 (4xx, 5xx) - .onStatus(HttpStatusCode::isError, clientResponse -> - clientResponse.bodyToMono(String.class) // 에러 바디를 문자열로 읽음 - .flatMap(errorBody -> { - // 2. 로그에 상태 코드와 에러 메세지 출력 - log.error("[{}] API 호출 오류 발생!", getReaderName()); - log.error("[{}] ERROR CODE: {}, REASON: {}", - getReaderName(), - clientResponse.statusCode(), - errorBody); - - // 3. 상위로 예외 던지기 (배치 중단을 원할 경우) - return Mono.error(new RuntimeException( - String.format("API 호출 실패 (%s): %s", clientResponse.statusCode(), errorBody) - )); - }) - ) - .bodyToMono(new ParameterizedTypeReference>() {}) - .block(); + // 부모 클래스의 공통 모듈 호출 (단 한 줄로 처리 가능) + return executeListApiCall( + maritimeServiceApiUrl, + getApiPath(), + params, + new ParameterizedTypeReference>() {}, + batchApiLogService + ); } diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java index d28006d..a4a15d7 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java @@ -7,10 +7,12 @@ import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailUpdate; import com.snp.batch.jobs.shipdetail.batch.processor.ShipDetailDataProcessor; import com.snp.batch.jobs.shipdetail.batch.reader.ShipDetailUpdateDataReader; import com.snp.batch.jobs.shipdetail.batch.writer.ShipDetailDataWriter; +import com.snp.batch.service.BatchApiLogService; import com.snp.batch.service.BatchDateService; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; @@ -20,6 +22,7 @@ import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -32,10 +35,15 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig createReader() { // 타입 변경 - // Reader 생성자 수정: ObjectMapper를 전달합니다. - return new ShipDetailUpdateDataReader(maritimeApiWebClient, jdbcTemplate, objectMapper, batchDateService); + return shipDetailUpdateDataReader; } @Override @@ -95,7 +117,7 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig { + private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 + private final BatchApiLogService batchApiLogService; + private final String maritimeApiUrl; private final JdbcTemplate jdbcTemplate; private final ObjectMapper objectMapper; - private final BatchDateService batchDateService; // ✨ BatchDateService 필드 추가 - protected String getApiKey() {return "SHIP_DETAIL_UPDATE_API";} - // 배치 처리 상태 + private List allImoNumbers; // DB 해시값을 저장할 맵 private Map dbMasterHashes; private int currentBatchIndex = 0; - private final int batchSize = 20; - - public ShipDetailUpdateDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, ObjectMapper objectMapper,BatchDateService batchDateService) { + private final int batchSize = 50; + public ShipDetailUpdateDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, ObjectMapper objectMapper,BatchDateService batchDateService, BatchApiLogService batchApiLogService, String maritimeApiUrl) { super(webClient); this.jdbcTemplate = jdbcTemplate; this.objectMapper = objectMapper; this.batchDateService = batchDateService; + this.batchApiLogService = batchApiLogService; + this.maritimeApiUrl = maritimeApiUrl; enableChunkMode(); // ✨ Chunk 모드 활성화 } @@ -42,6 +44,12 @@ public class ShipDetailUpdateDataReader extends BaseApiReader - clientResponse.bodyToMono(String.class) // 에러 바디를 문자열로 읽음 - .flatMap(errorBody -> { - // 2. 로그에 상태 코드와 에러 메세지 출력 - log.error("[{}] API 호출 오류 발생!", getReaderName()); - log.error("[{}] ERROR CODE: {}, REASON: {}", - getReaderName(), - clientResponse.statusCode(), - errorBody); - - // 3. 상위로 예외 던지기 (배치 중단을 원할 경우) - return Mono.error(new RuntimeException( - String.format("API 호출 실패 (%s): %s", clientResponse.statusCode(), errorBody) - )); - }) - ) - .bodyToMono(ShipDetailApiResponse.class) - .block(); + return executeSingleApiCall( + maritimeApiUrl, + getApiPath(), + params, + new ParameterizedTypeReference() {}, + batchApiLogService, + res -> res.getShipResult() != null ? (long) res.getShipResult().size() : 0L // 람다 적용 + ); } private ShipUpdateApiResponse callShipUpdateApi(){ // 1. BatchDateService를 통해 동적 날짜 파라미터 맵 조회 Map params = batchDateService.getDateRangeWithoutTimeParams(getApiKey()); - - String url = getShipUpdateApiPath(); - - return webClient.get() - .uri(url, uriBuilder -> uriBuilder - // 맵에서 파라미터 값을 동적으로 가져와 세팅 - .queryParam("shipsCategory", params.get("shipsCategory")) - .queryParam("fromYear", params.get("fromYear")) - .queryParam("fromMonth", params.get("fromMonth")) - .queryParam("fromDay", params.get("fromDay")) - .queryParam("toYear", params.get("toYear")) - .queryParam("toMonth", params.get("toMonth")) - .queryParam("toDay", params.get("toDay")) - .build()) - .retrieve() - .onStatus(HttpStatusCode::isError, clientResponse -> - clientResponse.bodyToMono(String.class) // 에러 바디를 문자열로 읽음 - .flatMap(errorBody -> { - // 2. 로그에 상태 코드와 에러 메세지 출력 - log.error("[{}] API 호출 오류 발생!", getReaderName()); - log.error("[{}] ERROR CODE: {}, REASON: {}", - getReaderName(), - clientResponse.statusCode(), - errorBody); - - // 3. 상위로 예외 던지기 (배치 중단을 원할 경우) - return Mono.error(new RuntimeException( - String.format("API 호출 실패 (%s): %s", clientResponse.statusCode(), errorBody) - )); - }) - ) - .bodyToMono(ShipUpdateApiResponse.class) - .block(); + return executeSingleApiCall( + maritimeApiUrl, + getShipUpdateApiPath(), + params, + new ParameterizedTypeReference() {}, + batchApiLogService, + res -> res.getShips() != null ? (long) res.getShips().size() : 0L // 람다 적용 + ); } private List extractUpdateImoNumbers(ShipUpdateApiResponse response) { diff --git a/src/main/java/com/snp/batch/service/BatchApiLogService.java b/src/main/java/com/snp/batch/service/BatchApiLogService.java new file mode 100644 index 0000000..a5b1665 --- /dev/null +++ b/src/main/java/com/snp/batch/service/BatchApiLogService.java @@ -0,0 +1,33 @@ +package com.snp.batch.service; + +import com.snp.batch.global.model.BatchApiLog; +import com.snp.batch.global.repository.BatchApiLogRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +@Service +@RequiredArgsConstructor +@Slf4j +public class BatchApiLogService { + private final BatchApiLogRepository batchApiLogRepository; + + /** + * 비동기로 API 로그를 저장합니다. + * propagation = Propagation.REQUIRES_NEW 를 사용하여 + * 메인 배치가 실패(Rollback)하더라도 로그는 저장되도록 설정합니다. + */ + @Async("apiLogExecutor") // 설정한 스레드 풀 사용 + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void saveLog(BatchApiLog logEntry) { + try { + batchApiLogRepository.save(logEntry); + } catch (Exception e) { + // 로그 저장 실패가 배치를 중단시키지 않도록 여기서 예외 처리 + log.error("API 로그 저장 실패: {}", e.getMessage()); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/snp/batch/service/BatchDateService.java b/src/main/java/com/snp/batch/service/BatchDateService.java index fb93526..b4c9eff 100644 --- a/src/main/java/com/snp/batch/service/BatchDateService.java +++ b/src/main/java/com/snp/batch/service/BatchDateService.java @@ -82,6 +82,30 @@ public class BatchDateService { }); } + public Map getDateRangeWithTimezoneParams(String apiKey, String dateParam1, String dateParam2) { + return repository.findDateRangeByApiKey(apiKey) + .map(projection -> { + Map params = new HashMap<>(); + // 'Z'를 문자열 리터럴이 아닌 실제 타임존 기호(X)로 처리 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSX"); + + // 한국 시간을 UTC로 변환하는 헬퍼 메소드 (아래 정의) + params.put(dateParam1, formatToUtc(projection.getRangeFromDate() != null ? + projection.getRangeFromDate() : projection.getLastSuccessDate(), formatter)); + + LocalDateTime toDateTime = projection.getRangeToDate() != null ? + projection.getRangeToDate() : LocalDateTime.now(); + + params.put(dateParam2, formatToUtc(toDateTime, formatter)); + + return params; + }) + .orElseGet(() -> { + log.warn("해당 apiKey에 대한 데이터를 찾을 수 없습니다: {}", apiKey); + return new HashMap<>(); + }); + } + // 한국 시간(LocalDateTime)을 UTC 문자열로 변환하는 로직 private String formatToUtc(LocalDateTime localDateTime, DateTimeFormatter formatter) { if (localDateTime == null) return null;