From 6c98ebc24f5beea4ef1486c4f92ed687a57b38dc Mon Sep 17 00:00:00 2001 From: Kim JiMyeung Date: Mon, 8 Dec 2025 17:47:30 +0900 Subject: [PATCH] =?UTF-8?q?Destination,=20Transits,=20CurrentlyAt=20?= =?UTF-8?q?=EC=A6=9D=EB=B6=84Job?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../batch/config/CurrentlyAtJobConfig.java | 103 +++++++++ .../batch/dto/CurrentlyAtDto.java | 37 +++ .../batch/dto/CurrentlyAtPositionDto.java | 17 ++ .../batch/entity/CurrentlyAtEntity.java | 41 ++++ .../batch/processor/CurrentlyAtProcessor.java | 71 ++++++ .../batch/reader/CurrentlyAtReader.java | 211 ++++++++++++++++++ .../repository/CurrentlyAtRepository.java | 13 ++ .../repository/CurrentlyAtRepositoryImpl.java | 211 ++++++++++++++++++ .../batch/writer/CurrentlyAtWriter.java | 36 +++ .../batch/config/ShipMovementJobConfig.java | 8 +- .../{PortCallDto.java => PortCallsDto.java} | 4 +- ...tionDto.java => PortCallsPositionDto.java} | 2 +- .../batch/dto/ShipMovementApiResponse.java | 2 +- .../processor/ShipMovementProcessor.java | 6 +- .../batch/reader/ShipMovementReader.java | 16 +- .../batch/config/AnchorageCallsJobConfig.java | 23 +- .../batch/dto/BerthCallsDto.java | 1 - .../batch/config/DarkActivityJobConfig.java | 12 +- .../batch/config/DestinationsJobConfig.java | 103 +++++++++ .../batch/dto/DestinationDto.java | 24 ++ .../batch/dto/DestinationPositionDto.java | 17 ++ .../batch/entity/DestinationEntity.java | 32 +++ .../batch/processor/DestinationProcessor.java | 61 +++++ .../batch/reader/DestinationReader.java | 211 ++++++++++++++++++ .../repository/DestinationRepository.java | 14 ++ .../repository/DestinationRepositoryImpl.java | 131 +++++++++++ .../batch/writer/DestinationWriter.java | 36 +++ .../batch/config/StsOperationJobConfig.java | 18 +- .../batch/dto/StsOperationDto.java | 1 - .../batch/config/TerminalCallsJobConfig.java | 13 +- .../batch/config/TransitsJobConfig.java | 103 +++++++++ .../batch/dto/TransitsDto.java | 13 ++ .../batch/entity/TransitsEntity.java | 21 ++ .../batch/processor/TransitsProcessor.java | 47 ++++ .../batch/reader/TransitsReader.java | 211 ++++++++++++++++++ .../repository/TransitlsRepositoryImpl.java | 108 +++++++++ .../batch/repository/TransitsRepository.java | 13 ++ .../batch/writer/TransitsWriter.java | 35 +++ 38 files changed, 1960 insertions(+), 66 deletions(-) create mode 100644 src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/config/CurrentlyAtJobConfig.java create mode 100644 src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/dto/CurrentlyAtDto.java create mode 100644 src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/dto/CurrentlyAtPositionDto.java create mode 100644 src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/entity/CurrentlyAtEntity.java create mode 100644 src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/processor/CurrentlyAtProcessor.java create mode 100644 src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/reader/CurrentlyAtReader.java create mode 100644 src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/repository/CurrentlyAtRepository.java create mode 100644 src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/repository/CurrentlyAtRepositoryImpl.java create mode 100644 src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/writer/CurrentlyAtWriter.java rename src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/{PortCallDto.java => PortCallsDto.java} (91%) rename src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/{PositionDto.java => PortCallsPositionDto.java} (90%) create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/config/DestinationsJobConfig.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/dto/DestinationDto.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/dto/DestinationPositionDto.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/entity/DestinationEntity.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/processor/DestinationProcessor.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/reader/DestinationReader.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/repository/DestinationRepository.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/repository/DestinationRepositoryImpl.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/writer/DestinationWriter.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/config/TransitsJobConfig.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/dto/TransitsDto.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/entity/TransitsEntity.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/processor/TransitsProcessor.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/reader/TransitsReader.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/repository/TransitlsRepositoryImpl.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/repository/TransitsRepository.java create mode 100644 src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/writer/TransitsWriter.java diff --git a/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/config/CurrentlyAtJobConfig.java b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/config/CurrentlyAtJobConfig.java new file mode 100644 index 0000000..b5a8909 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/config/CurrentlyAtJobConfig.java @@ -0,0 +1,103 @@ +package com.snp.batch.jobs.shipCurrentlyAt.batch.config; + +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.jobs.shipCurrentlyAt.batch.dto.CurrentlyAtDto; +import com.snp.batch.jobs.shipCurrentlyAt.batch.entity.CurrentlyAtEntity; +import com.snp.batch.jobs.shipCurrentlyAt.batch.processor.CurrentlyAtProcessor; +import com.snp.batch.jobs.shipCurrentlyAt.batch.reader.CurrentlyAtReader; +import com.snp.batch.jobs.shipCurrentlyAt.batch.writer.CurrentlyAtWriter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * 선박 상세 정보 Import Job Config + * + * 특징: + * - ship_data 테이블에서 IMO 번호 조회 + * - IMO 번호를 100개씩 배치로 분할 + * - Maritime API GetShipsByIHSLRorIMONumbers 호출 + * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 + * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) + * + * 데이터 흐름: + * CurrentlyAtReader (ship_data → Maritime API) + * ↓ (CurrentlyAtDto) + * CurrentlyAtProcessor + * ↓ (CurrentlyAtEntity) + * CurrentlyAtWriter + * ↓ (currentlyat 테이블) + */ + +@Slf4j +@Configuration +public class CurrentlyAtJobConfig extends BaseJobConfig { + + private final CurrentlyAtProcessor currentlyAtProcessor; + private final CurrentlyAtWriter currentlyAtWriter; + private final JdbcTemplate jdbcTemplate; + private final WebClient maritimeApiWebClient; + + public CurrentlyAtJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + CurrentlyAtProcessor currentlyAtProcessor, + CurrentlyAtWriter currentlyAtWriter, JdbcTemplate jdbcTemplate, + @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 + super(jobRepository, transactionManager); + this.currentlyAtProcessor = currentlyAtProcessor; + this.currentlyAtWriter = currentlyAtWriter; + this.jdbcTemplate = jdbcTemplate; + this.maritimeApiWebClient = maritimeApiWebClient; + } + + @Override + protected String getJobName() { + return "CurrentlyAtImportJob"; + } + + @Override + protected String getStepName() { + return "CurrentlyAtImportStep"; + } + + @Override + protected ItemReader createReader() { // 타입 변경 + return new CurrentlyAtReader(maritimeApiWebClient, jdbcTemplate); + } + + @Override + protected ItemProcessor createProcessor() { + return currentlyAtProcessor; + } + + @Override + protected ItemWriter createWriter() { // 타입 변경 + return currentlyAtWriter; + } + + @Override + protected int getChunkSize() { + return 50; // API에서 100개씩 가져오므로 chunk도 100으로 설정 + } + + @Bean(name = "CurrentlyAtImportJob") + public Job currentlyAtImportJob() { + return job(); + } + + @Bean(name = "CurrentlyAtImportStep") + public Step currentlyAtImportStep() { + return step(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/dto/CurrentlyAtDto.java b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/dto/CurrentlyAtDto.java new file mode 100644 index 0000000..2f1201e --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/dto/CurrentlyAtDto.java @@ -0,0 +1,37 @@ +package com.snp.batch.jobs.shipCurrentlyAt.batch.dto; + +import com.snp.batch.jobs.shipMovement.batch.dto.PortCallsPositionDto; +import lombok.Data; + +@Data +public class CurrentlyAtDto { + private String movementType; + private String imolRorIHSNumber; + private String movementDate; + + private Integer portCallId; + + private Integer facilityId; + private String facilityName; + private String facilityType; + + private Integer subFacilityId; + private String subFacilityName; + private String subFacilityType; + + private Integer parentFacilityId; + private String parentFacilityName; + private String parentFacilityType; + + private String countryCode; + private String countryName; + + private Double draught; + private Double latitude; + private Double longitude; + + private PortCallsPositionDto position; + + private String destination; + private String iso2; +} diff --git a/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/dto/CurrentlyAtPositionDto.java b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/dto/CurrentlyAtPositionDto.java new file mode 100644 index 0000000..3f0df2a --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/dto/CurrentlyAtPositionDto.java @@ -0,0 +1,17 @@ +package com.snp.batch.jobs.shipCurrentlyAt.batch.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +@Data +public class CurrentlyAtPositionDto { + private boolean isNull; + private int stSrid; + private double lat; + @JsonProperty("long") + private double lon; + private double z; + private double m; + private boolean hasZ; + private boolean hasM; +} diff --git a/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/entity/CurrentlyAtEntity.java b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/entity/CurrentlyAtEntity.java new file mode 100644 index 0000000..c935894 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/entity/CurrentlyAtEntity.java @@ -0,0 +1,41 @@ +package com.snp.batch.jobs.shipCurrentlyAt.batch.entity; + +import com.fasterxml.jackson.databind.JsonNode; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.SequenceGenerator; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.time.LocalDateTime; + +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +public class CurrentlyAtEntity { + private String movementType; + private String imolRorIHSNumber; + private LocalDateTime movementDate; + private Integer portCallId; + private Integer facilityId; + private String facilityName; + private String facilityType; + private Integer subFacilityId; + private String subFacilityName; + private String subFacilityType; + private Integer parentFacilityId; + private String parentFacilityName; + private String parentFacilityType; + private String countryCode; + private String countryName; + private Double draught; + private Double latitude; + private Double longitude; + private String destination; + private String iso2; + private JsonNode position; +} diff --git a/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/processor/CurrentlyAtProcessor.java b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/processor/CurrentlyAtProcessor.java new file mode 100644 index 0000000..64256e7 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/processor/CurrentlyAtProcessor.java @@ -0,0 +1,71 @@ +package com.snp.batch.jobs.shipCurrentlyAt.batch.processor; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.processor.BaseProcessor; +import com.snp.batch.jobs.shipCurrentlyAt.batch.dto.CurrentlyAtDto; +import com.snp.batch.jobs.shipCurrentlyAt.batch.entity.CurrentlyAtEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * 선박 상세 정보 Processor + * ShipDetailDto → ShipDetailEntity 변환 + */ + +/** + * 선박 상세 정보 Processor (해시 비교 및 증분 데이터 추출) + * I: ShipDetailComparisonData (DB 해시 + API Map Data) + * O: ShipDetailUpdate (변경분) + */ +@Slf4j +@Component +public class CurrentlyAtProcessor extends BaseProcessor { + + private final ObjectMapper objectMapper; + + public CurrentlyAtProcessor(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + protected CurrentlyAtEntity processItem(CurrentlyAtDto dto) throws Exception { + log.debug("Currently 정보 처리 시작: imoNumber={}, facilityName={}", + dto.getImolRorIHSNumber(), dto.getFacilityName()); + + JsonNode positionNode = null; + if (dto.getPosition() != null) { + // Position 객체를 JsonNode로 변환 + positionNode = objectMapper.valueToTree(dto.getPosition()); + } + + CurrentlyAtEntity entity = CurrentlyAtEntity.builder() + .movementType(dto.getMovementType()) + .imolRorIHSNumber(dto.getImolRorIHSNumber()) + .movementDate(LocalDateTime.parse(dto.getMovementDate())) + .portCallId(dto.getPortCallId()) + .facilityId(dto.getFacilityId()) + .facilityName(dto.getFacilityName()) + .facilityType(dto.getFacilityType()) + .subFacilityId(dto.getSubFacilityId()) + .subFacilityName(dto.getSubFacilityName()) + .subFacilityType(dto.getSubFacilityType()) + .parentFacilityId(dto.getParentFacilityId()) + .parentFacilityName(dto.getParentFacilityName()) + .parentFacilityType(dto.getParentFacilityType()) + .countryCode(dto.getCountryCode()) + .countryName(dto.getCountryName()) + .draught(dto.getDraught()) + .latitude(dto.getLatitude()) + .longitude(dto.getLongitude()) + .destination(dto.getDestination()) + .iso2(dto.getIso2()) + .position(positionNode) // JsonNode로 매핑 + .build(); + + return entity; + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/reader/CurrentlyAtReader.java b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/reader/CurrentlyAtReader.java new file mode 100644 index 0000000..e25bf41 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/reader/CurrentlyAtReader.java @@ -0,0 +1,211 @@ +package com.snp.batch.jobs.shipCurrentlyAt.batch.reader; + +import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.shipCurrentlyAt.batch.dto.CurrentlyAtDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.Collections; +import java.util.List; + +/** + * 선박 상세 정보 Reader (v2.0 - Chunk 기반) + *

+ * 기능: + * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) + * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 + * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 + * 4. Spring Batch가 100건씩 Process → Write 수행 + *

+ * Chunk 처리 흐름: + * - beforeFetch() → IMO 전체 조회 (1회) + * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) + * - read() → 1건씩 반환 (100번) + * - Processor/Writer → 100건 처리 + * - 반복... (1,718번의 Chunk) + *

+ * 기존 방식과의 차이: + * - 기존: 17만건 전체 메모리 로드 → Process → Write + * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) + */ +@Slf4j +@StepScope +public class CurrentlyAtReader extends BaseApiReader { + + private final JdbcTemplate jdbcTemplate; + + // 배치 처리 상태 + private List allImoNumbers; + // DB 해시값을 저장할 맵 + private int currentBatchIndex = 0; + private final int batchSize = 10; + +// @Value("#{jobParameters['startDate']}") +// private String startDate; +// private String startDate = "2025-01-01"; + +// @Value("#{jobParameters['stopDate']}") +// private String stopDate; +// private String stopDate = "2024-12-31"; + + public CurrentlyAtReader(WebClient webClient, JdbcTemplate jdbcTemplate) { + super(webClient); + this.jdbcTemplate = jdbcTemplate; + enableChunkMode(); // ✨ Chunk 모드 활성화 + } + + @Override + protected String getReaderName() { + return "CurrentlyAtReader"; + } + + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allImoNumbers = null; + } + + @Override + protected String getApiPath() { + return "/Movements/CurrentlyAt"; + } + + @Override + protected String getApiBaseUrl() { + return "https://webservices.maritime.spglobal.com"; + } + + private static final String GET_ALL_IMO_QUERY = + "SELECT imo_number FROM ship_data ORDER BY id"; +// "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_currentlyat) ORDER BY imo_number"; + + + /** + * 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회 + */ + @Override + protected void beforeFetch() { + // 전처리 과정 + // Step 1. IMO 전체 번호 조회 + log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName()); + + allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class); + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); + log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); + log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); + + // API 통계 초기화 + updateApiCallStats(totalBatches, 0); + } + + /** + * ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환 + *

+ * Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출 + * + * @return 다음 배치 100건 (더 이상 없으면 null) + */ + @Override + protected List fetchNextBatch() throws Exception { + + // 모든 배치 처리 완료 확인 + if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { + return null; // Job 종료 + } + + // 현재 배치의 시작/끝 인덱스 계산 + int startIndex = currentBatchIndex; + int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); + + // 현재 배치의 IMO 번호 추출 (100개) + List currentBatch = allImoNumbers.subList(startIndex, endIndex); + + int currentBatchNumber = (currentBatchIndex / batchSize) + 1; + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", + getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); + + try { + // IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...") + String imoParam = String.join(",", currentBatch); + + // API 호출 + List response = callApiWithBatch(imoParam); + + // 다음 배치로 인덱스 이동 + currentBatchIndex = endIndex; + + + // 응답 처리 + if (response != null) { + List portCalls = response; + log.info("[{}] 배치 {}/{} 완료: {} 건 조회", + getReaderName(), currentBatchNumber, totalBatches, portCalls.size()); + + // API 호출 통계 업데이트 + updateApiCallStats(totalBatches, currentBatchNumber); + + // API 과부하 방지 (다음 배치 전 0.5초 대기) + if (currentBatchIndex < allImoNumbers.size()) { + Thread.sleep(500); + } + + return portCalls; + + } else { + log.warn("[{}] 배치 {}/{} 응답 없음", + getReaderName(), currentBatchNumber, totalBatches); + + // API 호출 통계 업데이트 (실패도 카운트) + updateApiCallStats(totalBatches, currentBatchNumber); + + return Collections.emptyList(); + } + + } catch (Exception e) { + log.error("[{}] 배치 {}/{} 처리 중 오류: {}", + getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e); + + // 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용) + currentBatchIndex = endIndex; + + // 빈 리스트 반환 (Job 계속 진행) + return Collections.emptyList(); + } + } + + /** + * Query Parameter를 사용한 API 호출 + * + * @param lrno 쉼표로 연결된 IMO 번호 (예: "1000019,1000021,...") + * @return API 응답 + */ + private List callApiWithBatch(String lrno) { + String url = getApiPath() + "?lrno=" + lrno; + + log.debug("[{}] API 호출: {}", getReaderName(), url); + + return webClient.get() + .uri(url) + .retrieve() + .bodyToFlux(CurrentlyAtDto.class) + .collectList() + .block(); + } + + @Override + protected void afterFetch(List data) { + if (data == null) { + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); + log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", + getReaderName(), allImoNumbers.size()); + } + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/repository/CurrentlyAtRepository.java b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/repository/CurrentlyAtRepository.java new file mode 100644 index 0000000..30f12da --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/repository/CurrentlyAtRepository.java @@ -0,0 +1,13 @@ +package com.snp.batch.jobs.shipCurrentlyAt.batch.repository; + +import com.snp.batch.jobs.shipCurrentlyAt.batch.entity.CurrentlyAtEntity; + +import java.util.List; + +/** + * 선박 상세 정보 Repository 인터페이스 + */ + +public interface CurrentlyAtRepository { + void saveAll(List entities); +} diff --git a/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/repository/CurrentlyAtRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/repository/CurrentlyAtRepositoryImpl.java new file mode 100644 index 0000000..fc25a22 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/repository/CurrentlyAtRepositoryImpl.java @@ -0,0 +1,211 @@ +package com.snp.batch.jobs.shipCurrentlyAt.batch.repository; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.repository.BaseJdbcRepository; +import com.snp.batch.jobs.shipCurrentlyAt.batch.entity.CurrentlyAtEntity; +import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity; +import com.snp.batch.jobs.shipMovement.batch.repository.ShipMovementRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.List; + +/** + * 선박 상세 정보 Repository 구현체 + * BaseJdbcRepository를 상속하여 JDBC 기반 CRUD 구현 + */ +@Slf4j +@Repository("CurrentlyAtRepository") +public class CurrentlyAtRepositoryImpl extends BaseJdbcRepository + implements CurrentlyAtRepository { + + public CurrentlyAtRepositoryImpl(JdbcTemplate jdbcTemplate) { + super(jdbcTemplate); + } + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override + protected String getTableName() { + return "snp_data.t_currentlyat"; + } + + @Override + protected String getEntityName() { + return "CurrentlyAt"; + } + + @Override + protected String extractId(CurrentlyAtEntity entity) { + return entity.getImolRorIHSNumber(); + } + + @Override + public String getInsertSql() { + return """ + INSERT INTO snp_data.t_currentlyat( + imo, + mvmn_type, + mvmn_dt, + stpov_id, + fclty_id, + fclty_nm, + fclty_type, + lwrnk_fclty_id, + lwrnk_fclty_nm, + lwrnk_fclty_type, + up_fclty_id, + up_fclty_nm, + up_fclty_type, + ntn_cd, + ntn_nm, + draft, + lat, + lon, + dstn, + iso2_ntn_cd, + lcinfo + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (imo, mvmn_dt) + DO UPDATE SET + mvmn_type = EXCLUDED.mvmn_type, + mvmn_dt = EXCLUDED.mvmn_dt, + stpov_id = EXCLUDED.stpov_id, + fclty_id = EXCLUDED.fclty_id, + fclty_nm = EXCLUDED.fclty_nm, + fclty_type = EXCLUDED.fclty_type, + lwrnk_fclty_id = EXCLUDED.lwrnk_fclty_id, + lwrnk_fclty_nm = EXCLUDED.lwrnk_fclty_nm, + lwrnk_fclty_type = EXCLUDED.lwrnk_fclty_type, + up_fclty_id = EXCLUDED.up_fclty_id, + up_fclty_nm = EXCLUDED.up_fclty_nm, + up_fclty_type = EXCLUDED.up_fclty_type, + ntn_cd = EXCLUDED.ntn_cd, + ntn_nm = EXCLUDED.ntn_nm, + draft = EXCLUDED.draft, + lat = EXCLUDED.lat, + lon = EXCLUDED.lon, + dstn = EXCLUDED.dstn, + iso2_ntn_cd = EXCLUDED.iso2_ntn_cd, + lcinfo = EXCLUDED.lcinfo + """; + } + + @Override + protected String getUpdateSql() { + return null; + } + + @Override + protected void setInsertParameters(PreparedStatement ps, CurrentlyAtEntity e) throws Exception { + int i = 1; + ps.setString(i++, e.getImolRorIHSNumber()); // imo + ps.setString(i++, e.getMovementType()); // mvmn_type + ps.setTimestamp(i++, e.getMovementDate() != null ? Timestamp.valueOf(e.getMovementDate()) : null); // mvmn_dt + ps.setObject(i++, e.getPortCallId()); // stpov_id + ps.setObject(i++, e.getFacilityId()); // fclty_id + ps.setString(i++, e.getFacilityName()); // fclty_nm + ps.setString(i++, e.getFacilityType()); // fclty_type + ps.setObject(i++, e.getSubFacilityId()); // lwrnk_fclty_id + ps.setString(i++, e.getSubFacilityName()); // lwrnk_fclty_nm + ps.setString(i++, e.getSubFacilityType()); // lwrnk_fclty_type + ps.setObject(i++, e.getParentFacilityId()); // up_fclty_id + ps.setString(i++, e.getParentFacilityName()); // up_fclty_nm + ps.setString(i++, e.getParentFacilityType()); // up_fclty_type + ps.setString(i++, e.getCountryCode()); // ntn_cd + ps.setString(i++, e.getCountryName()); // ntn_nm + setDoubleOrNull(ps, i++, e.getDraught()); // draft + setDoubleOrNull(ps, i++, e.getLatitude()); // lat + setDoubleOrNull(ps, i++, e.getLongitude());// lon + ps.setString(i++, e.getDestination()); // dstn + ps.setString(i++, e.getIso2()); // iso2_ntn_cd + + if (e.getPosition() != null) { + ps.setObject(i++, OBJECT_MAPPER.writeValueAsString(e.getPosition()), java.sql.Types.OTHER); // lcinfo (jsonb) + } else { + ps.setNull(i++, java.sql.Types.OTHER); + } + +// ps.setString(i++, e.getSchemaType()); + + } + + private void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception { + if (value != null) { + ps.setDouble(index, value); + } else { + // java.sql.Types.DOUBLE을 사용하여 명시적으로 SQL NULL을 설정 + ps.setNull(index, java.sql.Types.DOUBLE); + } + } + + @Override + protected void setUpdateParameters(PreparedStatement ps, CurrentlyAtEntity entity) throws Exception { + + } + + @Override + protected RowMapper getRowMapper() { + return null; + } + + @Override + public void saveAll(List entities) { + if (entities == null || entities.isEmpty()) return; + + log.info("CurrentltAt 저장 시작 = {}건", entities.size()); + batchInsert(entities); + + } + + + /*private static class ShipMovementRowMapper implements RowMapper { + @Override + public ShipMovementEntity mapRow(ResultSet rs, int rowNum) throws SQLException { + ShipMovementEntity entity = ShipMovementEntity.builder() + .id(rs.getLong("id")) + .imolRorIHSNumber(rs.getString("imolRorIHSNumber")) + .portCallId(rs.getObject("portCallId", Integer.class)) + .facilityId(rs.getObject("facilityId", Integer.class)) + .facilityName(rs.getString("facilityName")) + .facilityType(rs.getString("facilityType")) + .subFacilityId(rs.getObject("subFacilityId", Integer.class)) + .subFacilityName(rs.getString("subFacilityName")) + .subFacilityType(rs.getString("subFacilityType")) + .parentFacilityId(rs.getObject("parentFacilityId", Integer.class)) + .parentFacilityName(rs.getString("parentFacilityName")) + .parentFacilityType(rs.getString("parentFacilityType")) + .countryCode(rs.getString("countryCode")) + .countryName(rs.getString("countryName")) + .draught(rs.getObject("draught", Double.class)) + .latitude(rs.getObject("latitude", Double.class)) + .longitude(rs.getObject("longitude", Double.class)) + .destination(rs.getString("destination")) + .iso2(rs.getString("iso2")) + .position(parseJson(rs.getString("position"))) + .schemaType(rs.getString("schemaType")) + .build(); + + Timestamp movementDate = rs.getTimestamp("movementDate"); + if (movementDate != null) { + entity.setMovementDate(movementDate.toLocalDateTime()); + } + + return entity; + } + + private JsonNode parseJson(String json) { + try { + if (json == null) return null; + return new ObjectMapper().readTree(json); + } catch (Exception e) { + throw new RuntimeException("JSON 파싱 오류: " + json); + } + } + }*/ +} diff --git a/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/writer/CurrentlyAtWriter.java b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/writer/CurrentlyAtWriter.java new file mode 100644 index 0000000..c84e7f0 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipCurrentlyAt/batch/writer/CurrentlyAtWriter.java @@ -0,0 +1,36 @@ +package com.snp.batch.jobs.shipCurrentlyAt.batch.writer; + +import com.snp.batch.common.batch.writer.BaseWriter; +import com.snp.batch.jobs.shipCurrentlyAt.batch.entity.CurrentlyAtEntity; +import com.snp.batch.jobs.shipCurrentlyAt.batch.repository.CurrentlyAtRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 선박 상세 정보 Writer + */ +@Slf4j +@Component +public class CurrentlyAtWriter extends BaseWriter { + + private final CurrentlyAtRepository currentlyAtRepository; + + + public CurrentlyAtWriter(CurrentlyAtRepository currentlyAtRepository) { + super("CurrentlyAt"); + this.currentlyAtRepository = currentlyAtRepository; + } + + @Override + protected void writeItems(List items) throws Exception { + + if (items.isEmpty()) { return; } + + currentlyAtRepository.saveAll(items); + log.info("CurrentlyAt 데이터 저장: {} 건", items.size()); + + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovement/batch/config/ShipMovementJobConfig.java b/src/main/java/com/snp/batch/jobs/shipMovement/batch/config/ShipMovementJobConfig.java index 674a579..c840630 100644 --- a/src/main/java/com/snp/batch/jobs/shipMovement/batch/config/ShipMovementJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/shipMovement/batch/config/ShipMovementJobConfig.java @@ -2,7 +2,7 @@ package com.snp.batch.jobs.shipMovement.batch.config; import com.fasterxml.jackson.databind.ObjectMapper; import com.snp.batch.common.batch.config.BaseJobConfig; -import com.snp.batch.jobs.shipMovement.batch.dto.PortCallDto; +import com.snp.batch.jobs.shipMovement.batch.dto.PortCallsDto; import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity; import com.snp.batch.jobs.shipMovement.batch.processor.ShipMovementProcessor; import com.snp.batch.jobs.shipMovement.batch.reader.ShipMovementReader; @@ -47,7 +47,7 @@ import java.time.format.DateTimeFormatter; @Slf4j @Configuration -public class ShipMovementJobConfig extends BaseJobConfig { +public class ShipMovementJobConfig extends BaseJobConfig { private final ShipMovementProcessor shipMovementProcessor; private final ShipMovementWriter shipMovementWriter; @@ -101,14 +101,14 @@ public class ShipMovementJobConfig extends BaseJobConfig createReader() { // 타입 변경 + protected ItemReader createReader() { // 타입 변경 // Reader 생성자 수정: ObjectMapper를 전달합니다. return shipMovementReader(null, null); //return new ShipMovementReader(maritimeApiWebClient, jdbcTemplate, objectMapper); } @Override - protected ItemProcessor createProcessor() { + protected ItemProcessor createProcessor() { return shipMovementProcessor; } diff --git a/src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/PortCallDto.java b/src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/PortCallsDto.java similarity index 91% rename from src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/PortCallDto.java rename to src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/PortCallsDto.java index 6d02359..c97db50 100644 --- a/src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/PortCallDto.java +++ b/src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/PortCallsDto.java @@ -3,7 +3,7 @@ package com.snp.batch.jobs.shipMovement.batch.dto; import lombok.Data; @Data -public class PortCallDto { +public class PortCallsDto { private String movementType; private String imolRorIHSNumber; private String movementDate; @@ -29,7 +29,7 @@ public class PortCallDto { private Double latitude; private Double longitude; - private PositionDto position; + private PortCallsPositionDto position; private String destination; private String iso2; diff --git a/src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/PositionDto.java b/src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/PortCallsPositionDto.java similarity index 90% rename from src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/PositionDto.java rename to src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/PortCallsPositionDto.java index 9a367ba..8906ba0 100644 --- a/src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/PositionDto.java +++ b/src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/PortCallsPositionDto.java @@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; @Data -public class PositionDto { +public class PortCallsPositionDto { private boolean isNull; private int stSrid; private double lat; diff --git a/src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/ShipMovementApiResponse.java b/src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/ShipMovementApiResponse.java index effef52..eb8fae8 100644 --- a/src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/ShipMovementApiResponse.java +++ b/src/main/java/com/snp/batch/jobs/shipMovement/batch/dto/ShipMovementApiResponse.java @@ -8,5 +8,5 @@ import java.util.List; @Data public class ShipMovementApiResponse { @JsonProperty("portCalls") - List portCallList; + List portCallList; } diff --git a/src/main/java/com/snp/batch/jobs/shipMovement/batch/processor/ShipMovementProcessor.java b/src/main/java/com/snp/batch/jobs/shipMovement/batch/processor/ShipMovementProcessor.java index a270089..102e404 100644 --- a/src/main/java/com/snp/batch/jobs/shipMovement/batch/processor/ShipMovementProcessor.java +++ b/src/main/java/com/snp/batch/jobs/shipMovement/batch/processor/ShipMovementProcessor.java @@ -3,7 +3,7 @@ package com.snp.batch.jobs.shipMovement.batch.processor; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.snp.batch.common.batch.processor.BaseProcessor; -import com.snp.batch.jobs.shipMovement.batch.dto.PortCallDto; +import com.snp.batch.jobs.shipMovement.batch.dto.PortCallsDto; import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -22,7 +22,7 @@ import java.time.LocalDateTime; */ @Slf4j @Component -public class ShipMovementProcessor extends BaseProcessor { +public class ShipMovementProcessor extends BaseProcessor { private final ObjectMapper objectMapper; @@ -31,7 +31,7 @@ public class ShipMovementProcessor extends BaseProcessor { +public class ShipMovementReader extends BaseApiReader { private final JdbcTemplate jdbcTemplate; private final ObjectMapper objectMapper; @@ -125,7 +119,7 @@ public class ShipMovementReader extends BaseApiReader { * @return 다음 배치 100건 (더 이상 없으면 null) */ @Override - protected List fetchNextBatch() throws Exception { + protected List fetchNextBatch() throws Exception { // 모든 배치 처리 완료 확인 if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { @@ -158,7 +152,7 @@ public class ShipMovementReader extends BaseApiReader { // 응답 처리 if (response != null && response.getPortCallList() != null) { - List portCalls = response.getPortCallList(); + List portCalls = response.getPortCallList(); log.info("[{}] 배치 {}/{} 완료: {} 건 조회", getReaderName(), currentBatchNumber, totalBatches, portCalls.size()); @@ -213,7 +207,7 @@ public class ShipMovementReader extends BaseApiReader { } @Override - protected void afterFetch(List data) { + protected void afterFetch(List data) { if (data == null) { int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); diff --git a/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/config/AnchorageCallsJobConfig.java b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/config/AnchorageCallsJobConfig.java index 77a35d7..dcfd05b 100644 --- a/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/config/AnchorageCallsJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/config/AnchorageCallsJobConfig.java @@ -1,12 +1,6 @@ package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.config; -import com.fasterxml.jackson.databind.ObjectMapper; import com.snp.batch.common.batch.config.BaseJobConfig; -import com.snp.batch.jobs.shipMovement.batch.dto.PortCallDto; -import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity; -import com.snp.batch.jobs.shipMovement.batch.processor.ShipMovementProcessor; -import com.snp.batch.jobs.shipMovement.batch.reader.ShipMovementReader; -import com.snp.batch.jobs.shipMovement.batch.writer.ShipMovementWriter; import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.dto.AnchorageCallsDto; import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity; import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.processor.AnchorageCallsProcessor; @@ -15,22 +9,17 @@ import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.writer.AnchorageCalls import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; -import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.WebClient; -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; - /** * 선박 상세 정보 Import Job Config * @@ -42,12 +31,12 @@ import java.time.format.DateTimeFormatter; * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) * * 데이터 흐름: - * ShipMovementReader (ship_data → Maritime API) - * ↓ (PortCallDto) - * ShipMovementProcessor - * ↓ (ShipMovementEntity) - * ShipDetailDataWriter - * ↓ (ship_movement 테이블) + * AnchorageCallsReader (ship_data → Maritime API) + * ↓ (AnchorageCallsDto) + * AnchorageCallsProcessor + * ↓ (AnchorageCallsEntity) + * AnchorageCallsWriter + * ↓ (t_anchoragecall 테이블) */ @Slf4j diff --git a/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/dto/BerthCallsDto.java b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/dto/BerthCallsDto.java index 03b813e..9483216 100644 --- a/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/dto/BerthCallsDto.java +++ b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/dto/BerthCallsDto.java @@ -1,6 +1,5 @@ package com.snp.batch.jobs.shipMovementBerthCalls.batch.dto; -import com.snp.batch.jobs.shipMovement.batch.dto.PositionDto; import lombok.Data; @Data diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/config/DarkActivityJobConfig.java b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/config/DarkActivityJobConfig.java index 0076a18..a370b0b 100644 --- a/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/config/DarkActivityJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/config/DarkActivityJobConfig.java @@ -32,12 +32,12 @@ import org.springframework.web.reactive.function.client.WebClient; * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) * * 데이터 흐름: - * ShipMovementReader (ship_data → Maritime API) - * ↓ (PortCallDto) - * ShipMovementProcessor - * ↓ (ShipMovementEntity) - * ShipDetailDataWriter - * ↓ (ship_movement 테이블) + * DarkActivityReader (ship_data → Maritime API) + * ↓ (DarkActivityDto) + * DarkActivityProcessor + * ↓ (DarkActivityEntity) + * DarkActivityWriter + * ↓ (t_darkactivity 테이블) */ @Slf4j diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/config/DestinationsJobConfig.java b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/config/DestinationsJobConfig.java new file mode 100644 index 0000000..807741e --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/config/DestinationsJobConfig.java @@ -0,0 +1,103 @@ +package com.snp.batch.jobs.shipMovementDestination.batch.config; + +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.jobs.shipMovementDestination.batch.dto.DestinationDto; +import com.snp.batch.jobs.shipMovementDestination.batch.entity.DestinationEntity; +import com.snp.batch.jobs.shipMovementDestination.batch.processor.DestinationProcessor; +import com.snp.batch.jobs.shipMovementDestination.batch.reader.DestinationReader; +import com.snp.batch.jobs.shipMovementDestination.batch.writer.DestinationWriter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * 선박 상세 정보 Import Job Config + * + * 특징: + * - ship_data 테이블에서 IMO 번호 조회 + * - IMO 번호를 100개씩 배치로 분할 + * - Maritime API GetShipsByIHSLRorIMONumbers 호출 + * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 + * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) + * + * 데이터 흐름: + * DestinationReader (ship_data → Maritime API) + * ↓ (DestinationDto) + * DestinationProcessor + * ↓ (DestinationEntity) + * DestinationProcessor + * ↓ (t_destination 테이블) + */ + +@Slf4j +@Configuration +public class DestinationsJobConfig extends BaseJobConfig { + + private final DestinationProcessor destinationProcessor; + private final DestinationWriter destinationWriter; + private final JdbcTemplate jdbcTemplate; + private final WebClient maritimeApiWebClient; + + public DestinationsJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + DestinationProcessor destinationProcessor, + DestinationWriter destinationWriter, JdbcTemplate jdbcTemplate, + @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 + super(jobRepository, transactionManager); + this.destinationProcessor = destinationProcessor; + this.destinationWriter = destinationWriter; + this.jdbcTemplate = jdbcTemplate; + this.maritimeApiWebClient = maritimeApiWebClient; + } + + @Override + protected String getJobName() { + return "DestinationsImportJob"; + } + + @Override + protected String getStepName() { + return "DestinationsImportStep"; + } + + @Override + protected ItemReader createReader() { // 타입 변경 + return new DestinationReader(maritimeApiWebClient, jdbcTemplate); + } + + @Override + protected ItemProcessor createProcessor() { + return destinationProcessor; + } + + @Override + protected ItemWriter createWriter() { // 타입 변경 + return destinationWriter; + } + + @Override + protected int getChunkSize() { + return 1000; // API에서 100개씩 가져오므로 chunk도 100으로 설정 + } + + @Bean(name = "DestinationsImportJob") + public Job destinationsImportJob() { + return job(); + } + + @Bean(name = "DestinationsImportStep") + public Step destinationsImportStep() { + return step(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/dto/DestinationDto.java b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/dto/DestinationDto.java new file mode 100644 index 0000000..c150ee4 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/dto/DestinationDto.java @@ -0,0 +1,24 @@ +package com.snp.batch.jobs.shipMovementDestination.batch.dto; + +import lombok.Data; + +@Data +public class DestinationDto { + private String movementType; + private String imolRorIHSNumber; + private String movementDate; + + private Integer facilityId; + private String facilityName; + private String facilityType; + + private String countryCode; + private String countryName; + + private Double latitude; + private Double longitude; + + private DestinationPositionDto position; + + private String iso2; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/dto/DestinationPositionDto.java b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/dto/DestinationPositionDto.java new file mode 100644 index 0000000..f600d28 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/dto/DestinationPositionDto.java @@ -0,0 +1,17 @@ +package com.snp.batch.jobs.shipMovementDestination.batch.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +@Data +public class DestinationPositionDto { + private boolean isNull; + private int stSrid; + private double lat; + @JsonProperty("long") + private double lon; + private double z; + private double m; + private boolean hasZ; + private boolean hasM; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/entity/DestinationEntity.java b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/entity/DestinationEntity.java new file mode 100644 index 0000000..fa2a23a --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/entity/DestinationEntity.java @@ -0,0 +1,32 @@ +package com.snp.batch.jobs.shipMovementDestination.batch.entity; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.time.LocalDateTime; + +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +public class DestinationEntity { + private String movementType; + private String imolRorIHSNumber; + private LocalDateTime movementDate; + + private Integer facilityId; + private String facilityName; + private String facilityType; + + private String countryCode; + private String countryName; + + private Double latitude; + private Double longitude; + + private JsonNode position; + private String iso2; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/processor/DestinationProcessor.java b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/processor/DestinationProcessor.java new file mode 100644 index 0000000..8379fe5 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/processor/DestinationProcessor.java @@ -0,0 +1,61 @@ +package com.snp.batch.jobs.shipMovementDestination.batch.processor; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.processor.BaseProcessor; +import com.snp.batch.jobs.shipMovementDestination.batch.dto.DestinationDto; +import com.snp.batch.jobs.shipMovementDestination.batch.entity.DestinationEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * 선박 상세 정보 Processor + * ShipDetailDto → ShipDetailEntity 변환 + */ + +/** + * 선박 상세 정보 Processor (해시 비교 및 증분 데이터 추출) + * I: ShipDetailComparisonData (DB 해시 + API Map Data) + * O: ShipDetailUpdate (변경분) + */ +@Slf4j +@Component +public class DestinationProcessor extends BaseProcessor { + + private final ObjectMapper objectMapper; + + public DestinationProcessor(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + protected DestinationEntity processItem(DestinationDto dto) throws Exception { + log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}", + dto.getImolRorIHSNumber(), dto.getFacilityName()); + + JsonNode positionNode = null; + if (dto.getPosition() != null) { + // Position 객체를 JsonNode로 변환 + positionNode = objectMapper.valueToTree(dto.getPosition()); + } + + DestinationEntity entity = DestinationEntity.builder() + .movementType(dto.getMovementType()) + .imolRorIHSNumber(dto.getImolRorIHSNumber()) + .movementDate(LocalDateTime.parse(dto.getMovementDate())) + .facilityId(dto.getFacilityId()) + .facilityName(dto.getFacilityName()) + .facilityType(dto.getFacilityType()) + .countryCode(dto.getCountryCode()) + .countryName(dto.getCountryName()) + .latitude(dto.getLatitude()) + .longitude(dto.getLongitude()) + .position(positionNode) // JsonNode로 매핑 + .iso2(dto.getIso2()) + .build(); + return entity; + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/reader/DestinationReader.java b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/reader/DestinationReader.java new file mode 100644 index 0000000..3220826 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/reader/DestinationReader.java @@ -0,0 +1,211 @@ +package com.snp.batch.jobs.shipMovementDestination.batch.reader; + +import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.shipMovementDestination.batch.dto.DestinationDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * 선박 상세 정보 Reader (v2.0 - Chunk 기반) + * + * 기능: + * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) + * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 + * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 + * 4. Spring Batch가 100건씩 Process → Write 수행 + * + * Chunk 처리 흐름: + * - beforeFetch() → IMO 전체 조회 (1회) + * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) + * - read() → 1건씩 반환 (100번) + * - Processor/Writer → 100건 처리 + * - 반복... (1,718번의 Chunk) + * + * 기존 방식과의 차이: + * - 기존: 17만건 전체 메모리 로드 → Process → Write + * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) + */ +@Slf4j +@StepScope +public class DestinationReader extends BaseApiReader { + + private final JdbcTemplate jdbcTemplate; + + // 배치 처리 상태 + private List allImoNumbers; + // DB 해시값을 저장할 맵 + private int currentBatchIndex = 0; + private final int batchSize = 5; + + // @Value("#{jobParameters['startDate']}") +// private String startDate; + private String startDate = "2025-01-01"; + + // @Value("#{jobParameters['stopDate']}") +// private String stopDate; + private String stopDate = "2025-12-31"; + + public DestinationReader(WebClient webClient, JdbcTemplate jdbcTemplate ) { + super(webClient); + this.jdbcTemplate = jdbcTemplate; + enableChunkMode(); // ✨ Chunk 모드 활성화 + } + + @Override + protected String getReaderName() { + return "Destinations"; + } + + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allImoNumbers = null; + } + + @Override + protected String getApiPath() { + return "/Movements/Destinations"; + } + + @Override + protected String getApiBaseUrl() { + return "https://webservices.maritime.spglobal.com"; + } + + private static final String GET_ALL_IMO_QUERY = +// "SELECT imo_number FROM ship_data ORDER BY id"; + "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_destination) ORDER BY imo_number"; + + /** + * 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회 + */ + @Override + protected void beforeFetch() { + // 전처리 과정 + // Step 1. IMO 전체 번호 조회 + log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName()); + + allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class); + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); + log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); + log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); + + // API 통계 초기화 + updateApiCallStats(totalBatches, 0); + } + + /** + * ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환 + * + * Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출 + * + * @return 다음 배치 100건 (더 이상 없으면 null) + */ + @Override + protected List fetchNextBatch() throws Exception { + + // 모든 배치 처리 완료 확인 + if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { + return null; // Job 종료 + } + + // 현재 배치의 시작/끝 인덱스 계산 + int startIndex = currentBatchIndex; + int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); + + // 현재 배치의 IMO 번호 추출 (100개) + List currentBatch = allImoNumbers.subList(startIndex, endIndex); + + int currentBatchNumber = (currentBatchIndex / batchSize) + 1; + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", + getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); + + try { + // IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...") + String imoParam = String.join(",", currentBatch); + + // API 호출 + List response = callApiWithBatch(imoParam); + + // 다음 배치로 인덱스 이동 + currentBatchIndex = endIndex; + + + // 응답 처리 + if (response != null ) { + List destinations = response; + log.info("[{}] 배치 {}/{} 완료: {} 건 조회", + getReaderName(), currentBatchNumber, totalBatches, destinations.size()); + + // API 호출 통계 업데이트 + updateApiCallStats(totalBatches, currentBatchNumber); + + // API 과부하 방지 (다음 배치 전 0.5초 대기) + if (currentBatchIndex < allImoNumbers.size()) { + Thread.sleep(500); + } + + return destinations; + + } else { + log.warn("[{}] 배치 {}/{} 응답 없음", + getReaderName(), currentBatchNumber, totalBatches); + + // API 호출 통계 업데이트 (실패도 카운트) + updateApiCallStats(totalBatches, currentBatchNumber); + + return Collections.emptyList(); + } + + } catch (Exception e) { + log.error("[{}] 배치 {}/{} 처리 중 오류: {}", + getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e); + + // 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용) + currentBatchIndex = endIndex; + + // 빈 리스트 반환 (Job 계속 진행) + return Collections.emptyList(); + } + } + + /** + * Query Parameter를 사용한 API 호출 + * + * @param lrno 쉼표로 연결된 IMO 번호 (예: "1000019,1000021,...") + * @return API 응답 + */ + private List callApiWithBatch(String lrno) { + String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno; + + log.debug("[{}] API 호출: {}", getReaderName(), url); + + return webClient.get() + .uri(url) + .retrieve() + .bodyToFlux(DestinationDto.class) + .collectList() + .block(); + } + + @Override + protected void afterFetch(List data) { + if (data == null) { + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); + log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", + getReaderName(), allImoNumbers.size()); + } + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/repository/DestinationRepository.java b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/repository/DestinationRepository.java new file mode 100644 index 0000000..4613e37 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/repository/DestinationRepository.java @@ -0,0 +1,14 @@ +package com.snp.batch.jobs.shipMovementDestination.batch.repository; + +import com.snp.batch.jobs.shipMovementDestination.batch.entity.DestinationEntity; +import com.snp.batch.jobs.shipMovementTerminalCalls.batch.entity.TerminalCallsEntity; + +import java.util.List; + +/** + * 선박 상세 정보 Repository 인터페이스 + */ + +public interface DestinationRepository { + void saveAll(List entities); +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/repository/DestinationRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/repository/DestinationRepositoryImpl.java new file mode 100644 index 0000000..d864265 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/repository/DestinationRepositoryImpl.java @@ -0,0 +1,131 @@ +package com.snp.batch.jobs.shipMovementDestination.batch.repository; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.repository.BaseJdbcRepository; +import com.snp.batch.jobs.shipMovementDestination.batch.entity.DestinationEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; + +import java.sql.PreparedStatement; +import java.sql.Timestamp; +import java.util.List; + +/** + * 선박 상세 정보 Repository 구현체 + * BaseJdbcRepository를 상속하여 JDBC 기반 CRUD 구현 + */ +@Slf4j +@Repository("DestinationRepository") +public class DestinationRepositoryImpl extends BaseJdbcRepository + implements DestinationRepository { + + public DestinationRepositoryImpl(JdbcTemplate jdbcTemplate) { + super(jdbcTemplate); + } + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override + protected String getTableName() { + return "snp_data.t_destination"; + } + + @Override + protected String getEntityName() { + return "Destinations"; + } + + @Override + protected String extractId(DestinationEntity entity) { + return entity.getImolRorIHSNumber(); + } + + @Override + public String getInsertSql() { + return """ + INSERT INTO snp_data.t_destination( + imo, + mvmn_type, + mvmn_dt, + fclty_id, + fclty_nm, + fclty_type, + ntn_cd, + ntn_nm, + lat, + lon, + iso2_ntn_cd, + lcinfo + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (imo, mvmn_dt) + DO UPDATE SET + mvmn_type = EXCLUDED.mvmn_type, + mvmn_dt = EXCLUDED.mvmn_dt, + fclty_id = EXCLUDED.fclty_id, + fclty_nm = EXCLUDED.fclty_nm, + fclty_type = EXCLUDED.fclty_type, + ntn_cd = EXCLUDED.ntn_cd, + ntn_nm = EXCLUDED.ntn_nm, + lat = EXCLUDED.lat, + lon = EXCLUDED.lon, + iso2_ntn_cd = EXCLUDED.iso2_ntn_cd, + lcinfo = EXCLUDED.lcinfo + """; + } + + @Override + protected String getUpdateSql() { + return null; + } + + @Override + protected void setInsertParameters(PreparedStatement ps, DestinationEntity e) throws Exception { + int i = 1; + ps.setString(i++, e.getImolRorIHSNumber()); // imo + ps.setString(i++, e.getMovementType()); // mvmn_type + ps.setTimestamp(i++, e.getMovementDate() != null ? Timestamp.valueOf(e.getMovementDate()) : null); // mvmn_dt + ps.setObject(i++, e.getFacilityId()); // fclty_id + ps.setString(i++, e.getFacilityName()); // fclty_nm + ps.setString(i++, e.getFacilityType()); // fclty_type + ps.setString(i++, e.getCountryCode()); // ntn_cd + ps.setString(i++, e.getCountryName()); // ntn_nm + setDoubleOrNull(ps, i++, e.getLatitude()); // lat + setDoubleOrNull(ps, i++, e.getLongitude());// lon + ps.setString(i++, e.getIso2()); // iso2_ntn_cd + + if (e.getPosition() != null) { + ps.setObject(i++, OBJECT_MAPPER.writeValueAsString(e.getPosition()), java.sql.Types.OTHER); // lcinfo (jsonb) + } else { + ps.setNull(i++, java.sql.Types.OTHER); + } + } + + private void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception { + if (value != null) { + ps.setDouble(index, value); + } else { + // java.sql.Types.DOUBLE을 사용하여 명시적으로 SQL NULL을 설정 + ps.setNull(index, java.sql.Types.DOUBLE); + } + } + + @Override + protected void setUpdateParameters(PreparedStatement ps, DestinationEntity entity) throws Exception { + + } + + @Override + protected RowMapper getRowMapper() { + return null; + } + + @Override + public void saveAll(List entities) { + if (entities == null || entities.isEmpty()) return; + + log.info("Destinations 저장 시작 = {}건", entities.size()); + batchInsert(entities); + + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/writer/DestinationWriter.java b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/writer/DestinationWriter.java new file mode 100644 index 0000000..be05993 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDestination/batch/writer/DestinationWriter.java @@ -0,0 +1,36 @@ +package com.snp.batch.jobs.shipMovementDestination.batch.writer; + +import com.snp.batch.common.batch.writer.BaseWriter; +import com.snp.batch.jobs.shipMovementDestination.batch.entity.DestinationEntity; +import com.snp.batch.jobs.shipMovementDestination.batch.repository.DestinationRepository; +import com.snp.batch.jobs.shipMovementTerminalCalls.batch.entity.TerminalCallsEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 선박 상세 정보 Writer + */ +@Slf4j +@Component +public class DestinationWriter extends BaseWriter { + + private final DestinationRepository destinationRepository; + + + public DestinationWriter(DestinationRepository destinationRepository) { + super("Destinations"); + this.destinationRepository = destinationRepository; + } + + @Override + protected void writeItems(List items) throws Exception { + + if (items.isEmpty()) { return; } + + destinationRepository.saveAll(items); + log.info("Destinations 데이터 저장: {} 건", items.size()); + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/config/StsOperationJobConfig.java b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/config/StsOperationJobConfig.java index 1d57569..d2a5cce 100644 --- a/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/config/StsOperationJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/config/StsOperationJobConfig.java @@ -1,12 +1,6 @@ package com.snp.batch.jobs.shipMovementStsOperations.batch.config; -import com.fasterxml.jackson.databind.ObjectMapper; import com.snp.batch.common.batch.config.BaseJobConfig; -import com.snp.batch.jobs.shipMovementBerthCalls.batch.dto.BerthCallsDto; -import com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity.BerthCallsEntity; -import com.snp.batch.jobs.shipMovementBerthCalls.batch.processor.BerthCallsProcessor; -import com.snp.batch.jobs.shipMovementBerthCalls.batch.reader.BerthCallsReader; -import com.snp.batch.jobs.shipMovementBerthCalls.batch.writer.BerthCallsWriter; import com.snp.batch.jobs.shipMovementStsOperations.batch.dto.StsOperationDto; import com.snp.batch.jobs.shipMovementStsOperations.batch.entity.StsOperationEntity; import com.snp.batch.jobs.shipMovementStsOperations.batch.processor.StsOperationProcessor; @@ -37,12 +31,12 @@ import org.springframework.web.reactive.function.client.WebClient; * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) * * 데이터 흐름: - * ShipMovementReader (ship_data → Maritime API) - * ↓ (PortCallDto) - * ShipMovementProcessor - * ↓ (ShipMovementEntity) - * ShipDetailDataWriter - * ↓ (ship_movement 테이블) + * StsOperationReader (ship_data → Maritime API) + * ↓ (StsOperationDto) + * StsOperationProcessor + * ↓ (StsOperationEntity) + * StsOperationWriter + * ↓ (t_stsoperation 테이블) */ @Slf4j diff --git a/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/dto/StsOperationDto.java b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/dto/StsOperationDto.java index 16c8db8..0a7fca7 100644 --- a/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/dto/StsOperationDto.java +++ b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/dto/StsOperationDto.java @@ -1,6 +1,5 @@ package com.snp.batch.jobs.shipMovementStsOperations.batch.dto; -import com.snp.batch.jobs.shipMovementBerthCalls.batch.dto.BerthCallsPositionDto; import lombok.Data; @Data diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/config/TerminalCallsJobConfig.java b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/config/TerminalCallsJobConfig.java index a298f9e..a221b25 100644 --- a/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/config/TerminalCallsJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/config/TerminalCallsJobConfig.java @@ -1,6 +1,5 @@ package com.snp.batch.jobs.shipMovementTerminalCalls.batch.config; -import com.fasterxml.jackson.databind.ObjectMapper; import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.jobs.shipMovementTerminalCalls.batch.dto.TerminalCallsDto; import com.snp.batch.jobs.shipMovementTerminalCalls.batch.entity.TerminalCallsEntity; @@ -32,12 +31,12 @@ import org.springframework.web.reactive.function.client.WebClient; * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) * * 데이터 흐름: - * ShipMovementReader (ship_data → Maritime API) - * ↓ (PortCallDto) - * ShipMovementProcessor - * ↓ (ShipMovementEntity) - * ShipDetailDataWriter - * ↓ (ship_movement 테이블) + * TerminalCallsReader (ship_data → Maritime API) + * ↓ (TerminalCallsDto) + * TerminalCallsProcessor + * ↓ (TerminalCallsEntity) + * TerminalCallsWriter + * ↓ (t_terminalcall 테이블) */ @Slf4j diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/config/TransitsJobConfig.java b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/config/TransitsJobConfig.java new file mode 100644 index 0000000..f6d65ba --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/config/TransitsJobConfig.java @@ -0,0 +1,103 @@ +package com.snp.batch.jobs.shipMovementTransits.batch.config; + +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.jobs.shipMovementTransits.batch.dto.TransitsDto; +import com.snp.batch.jobs.shipMovementTransits.batch.entity.TransitsEntity; +import com.snp.batch.jobs.shipMovementTransits.batch.processor.TransitsProcessor; +import com.snp.batch.jobs.shipMovementTransits.batch.reader.TransitsReader; +import com.snp.batch.jobs.shipMovementTransits.batch.writer.TransitsWriter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * 선박 상세 정보 Import Job Config + * + * 특징: + * - ship_data 테이블에서 IMO 번호 조회 + * - IMO 번호를 100개씩 배치로 분할 + * - Maritime API GetShipsByIHSLRorIMONumbers 호출 + * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 + * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) + * + * 데이터 흐름: + * TransitsReader (ship_data → Maritime API) + * ↓ (TransitsDto) + * TransitsProcessor + * ↓ (TransitsEntity) + * TransitsWriter + * ↓ (t_transit 테이블) + */ + +@Slf4j +@Configuration +public class TransitsJobConfig extends BaseJobConfig { + + private final TransitsProcessor transitsProcessor; + private final TransitsWriter transitsWriter; + private final JdbcTemplate jdbcTemplate; + private final WebClient maritimeApiWebClient; + + public TransitsJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + TransitsProcessor TransitsProcessor, + TransitsWriter transitsWriter, JdbcTemplate jdbcTemplate, + @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 + super(jobRepository, transactionManager); + this.transitsProcessor = TransitsProcessor; + this.transitsWriter = transitsWriter; + this.jdbcTemplate = jdbcTemplate; + this.maritimeApiWebClient = maritimeApiWebClient; + } + + @Override + protected String getJobName() { + return "TransitsImportJob"; + } + + @Override + protected String getStepName() { + return "TransitsImportStep"; + } + + @Override + protected ItemReader createReader() { // 타입 변경 + return new TransitsReader(maritimeApiWebClient, jdbcTemplate); + } + + @Override + protected ItemProcessor createProcessor() { + return transitsProcessor; + } + + @Override + protected ItemWriter createWriter() { // 타입 변경 + return transitsWriter; + } + + @Override + protected int getChunkSize() { + return 1000; // API에서 100개씩 가져오므로 chunk도 100으로 설정 + } + + @Bean(name = "TransitsImportJob") + public Job transitsImportJob() { + return job(); + } + + @Bean(name = "TransitsImportStep") + public Step transitsImportStep() { + return step(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/dto/TransitsDto.java b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/dto/TransitsDto.java new file mode 100644 index 0000000..7dd2958 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/dto/TransitsDto.java @@ -0,0 +1,13 @@ +package com.snp.batch.jobs.shipMovementTransits.batch.dto; + +import lombok.Data; + +@Data +public class TransitsDto { + private String movementType; + private String imolRorIHSNumber; + private String movementDate; + private String facilityName; + private String facilityType; + private Double draught; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/entity/TransitsEntity.java b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/entity/TransitsEntity.java new file mode 100644 index 0000000..ddfe811 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/entity/TransitsEntity.java @@ -0,0 +1,21 @@ +package com.snp.batch.jobs.shipMovementTransits.batch.entity; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.time.LocalDateTime; + +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +public class TransitsEntity { + private String movementType; + private String imolRorIHSNumber; + private LocalDateTime movementDate; + private String facilityName; + private String facilityType; + private Double draught; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/processor/TransitsProcessor.java b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/processor/TransitsProcessor.java new file mode 100644 index 0000000..8c7df92 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/processor/TransitsProcessor.java @@ -0,0 +1,47 @@ +package com.snp.batch.jobs.shipMovementTransits.batch.processor; + +import com.snp.batch.common.batch.processor.BaseProcessor; +import com.snp.batch.jobs.shipMovementTransits.batch.dto.TransitsDto; +import com.snp.batch.jobs.shipMovementTransits.batch.entity.TransitsEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * 선박 상세 정보 Processor + * ShipDetailDto → ShipDetailEntity 변환 + */ + +/** + * 선박 상세 정보 Processor (해시 비교 및 증분 데이터 추출) + * I: ShipDetailComparisonData (DB 해시 + API Map Data) + * O: ShipDetailUpdate (변경분) + */ +@Slf4j +@Component +public class TransitsProcessor extends BaseProcessor { + +// private final ObjectMapper objectMapper; + +// public TransitsProcessor(ObjectMapper objectMapper) { +// this.objectMapper = objectMapper; +// } + + @Override + protected TransitsEntity processItem(TransitsDto dto) throws Exception { + log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}", + dto.getImolRorIHSNumber(), dto.getFacilityName()); + + TransitsEntity entity = TransitsEntity.builder() + .movementType(dto.getMovementType()) + .imolRorIHSNumber(dto.getImolRorIHSNumber()) + .movementDate(LocalDateTime.parse(dto.getMovementDate())) + .facilityName(dto.getFacilityName()) + .facilityType(dto.getFacilityType()) + .draught(dto.getDraught()) + .build(); + return entity; + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/reader/TransitsReader.java b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/reader/TransitsReader.java new file mode 100644 index 0000000..375b14f --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/reader/TransitsReader.java @@ -0,0 +1,211 @@ +package com.snp.batch.jobs.shipMovementTransits.batch.reader; + +import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.shipMovementTransits.batch.dto.TransitsDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * 선박 상세 정보 Reader (v2.0 - Chunk 기반) + * + * 기능: + * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) + * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 + * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 + * 4. Spring Batch가 100건씩 Process → Write 수행 + * + * Chunk 처리 흐름: + * - beforeFetch() → IMO 전체 조회 (1회) + * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) + * - read() → 1건씩 반환 (100번) + * - Processor/Writer → 100건 처리 + * - 반복... (1,718번의 Chunk) + * + * 기존 방식과의 차이: + * - 기존: 17만건 전체 메모리 로드 → Process → Write + * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) + */ +@Slf4j +@StepScope +public class TransitsReader extends BaseApiReader { + + private final JdbcTemplate jdbcTemplate; + + // 배치 처리 상태 + private List allImoNumbers; + // DB 해시값을 저장할 맵 + private int currentBatchIndex = 0; + private final int batchSize = 5; + + // @Value("#{jobParameters['startDate']}") +// private String startDate; + private String startDate = "2025-01-01"; + + // @Value("#{jobParameters['stopDate']}") +// private String stopDate; + private String stopDate = "2025-12-31"; + + public TransitsReader(WebClient webClient, JdbcTemplate jdbcTemplate ) { + super(webClient); + this.jdbcTemplate = jdbcTemplate; + enableChunkMode(); // ✨ Chunk 모드 활성화 + } + + @Override + protected String getReaderName() { + return "Transits"; + } + + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allImoNumbers = null; + } + + @Override + protected String getApiPath() { + return "/Movements/Transits"; + } + + @Override + protected String getApiBaseUrl() { + return "https://webservices.maritime.spglobal.com"; + } + + private static final String GET_ALL_IMO_QUERY = +// "SELECT imo_number FROM ship_data ORDER BY id"; + "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_transit) ORDER BY imo_number"; + + /** + * 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회 + */ + @Override + protected void beforeFetch() { + // 전처리 과정 + // Step 1. IMO 전체 번호 조회 + log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName()); + + allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class); + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); + log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); + log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); + + // API 통계 초기화 + updateApiCallStats(totalBatches, 0); + } + + /** + * ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환 + * + * Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출 + * + * @return 다음 배치 100건 (더 이상 없으면 null) + */ + @Override + protected List fetchNextBatch() throws Exception { + + // 모든 배치 처리 완료 확인 + if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { + return null; // Job 종료 + } + + // 현재 배치의 시작/끝 인덱스 계산 + int startIndex = currentBatchIndex; + int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); + + // 현재 배치의 IMO 번호 추출 (100개) + List currentBatch = allImoNumbers.subList(startIndex, endIndex); + + int currentBatchNumber = (currentBatchIndex / batchSize) + 1; + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", + getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); + + try { + // IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...") + String imoParam = String.join(",", currentBatch); + + // API 호출 + List response = callApiWithBatch(imoParam); + + // 다음 배치로 인덱스 이동 + currentBatchIndex = endIndex; + + + // 응답 처리 + if (response != null ) { + List transits = response; + log.info("[{}] 배치 {}/{} 완료: {} 건 조회", + getReaderName(), currentBatchNumber, totalBatches, transits.size()); + + // API 호출 통계 업데이트 + updateApiCallStats(totalBatches, currentBatchNumber); + + // API 과부하 방지 (다음 배치 전 0.5초 대기) + if (currentBatchIndex < allImoNumbers.size()) { + Thread.sleep(500); + } + + return transits; + + } else { + log.warn("[{}] 배치 {}/{} 응답 없음", + getReaderName(), currentBatchNumber, totalBatches); + + // API 호출 통계 업데이트 (실패도 카운트) + updateApiCallStats(totalBatches, currentBatchNumber); + + return Collections.emptyList(); + } + + } catch (Exception e) { + log.error("[{}] 배치 {}/{} 처리 중 오류: {}", + getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e); + + // 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용) + currentBatchIndex = endIndex; + + // 빈 리스트 반환 (Job 계속 진행) + return Collections.emptyList(); + } + } + + /** + * Query Parameter를 사용한 API 호출 + * + * @param lrno 쉼표로 연결된 IMO 번호 (예: "1000019,1000021,...") + * @return API 응답 + */ + private List callApiWithBatch(String lrno) { + String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno; + + log.debug("[{}] API 호출: {}", getReaderName(), url); + + return webClient.get() + .uri(url) + .retrieve() + .bodyToFlux(TransitsDto.class) + .collectList() + .block(); + } + + @Override + protected void afterFetch(List data) { + if (data == null) { + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); + log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", + getReaderName(), allImoNumbers.size()); + } + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/repository/TransitlsRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/repository/TransitlsRepositoryImpl.java new file mode 100644 index 0000000..f44e00a --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/repository/TransitlsRepositoryImpl.java @@ -0,0 +1,108 @@ +package com.snp.batch.jobs.shipMovementTransits.batch.repository; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.repository.BaseJdbcRepository; +import com.snp.batch.jobs.shipMovementTransits.batch.entity.TransitsEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; + +import java.sql.PreparedStatement; +import java.sql.Timestamp; +import java.util.List; + +/** + * 선박 상세 정보 Repository 구현체 + * BaseJdbcRepository를 상속하여 JDBC 기반 CRUD 구현 + */ +@Slf4j +@Repository("TransitsRepository") +public class TransitlsRepositoryImpl extends BaseJdbcRepository + implements TransitsRepository { + + public TransitlsRepositoryImpl(JdbcTemplate jdbcTemplate) { + super(jdbcTemplate); + } + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override + protected String getTableName() { + return "snp_data.t_transit"; + } + + @Override + protected String getEntityName() { + return "Transit"; + } + + @Override + protected String extractId(TransitsEntity entity) { + return entity.getImolRorIHSNumber(); + } + + @Override + public String getInsertSql() { + return """ + INSERT INTO snp_data.t_transit( + imo, + mvmn_type, + mvmn_dt, + fclty_nm, + fclty_type, + draft + ) VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT (imo, mvmn_dt) + DO UPDATE SET + mvmn_type = EXCLUDED.mvmn_type, + mvmn_dt = EXCLUDED.mvmn_dt, + fclty_nm = EXCLUDED.fclty_nm, + fclty_type = EXCLUDED.fclty_type, + draft = EXCLUDED.draft + """; + } + + @Override + protected String getUpdateSql() { + return null; + } + + @Override + protected void setInsertParameters(PreparedStatement ps, TransitsEntity e) throws Exception { + int i = 1; + ps.setString(i++, e.getImolRorIHSNumber()); // imo + ps.setString(i++, e.getMovementType()); // mvmn_type + ps.setTimestamp(i++, e.getMovementDate() != null ? Timestamp.valueOf(e.getMovementDate()) : null); // mvmn_dt + ps.setString(i++, e.getFacilityName()); // fclty_nm + ps.setString(i++, e.getFacilityType()); // fclty_type + setDoubleOrNull(ps, i++, e.getDraught()); // draft + } + + private void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception { + if (value != null) { + ps.setDouble(index, value); + } else { + // java.sql.Types.DOUBLE을 사용하여 명시적으로 SQL NULL을 설정 + ps.setNull(index, java.sql.Types.DOUBLE); + } + } + + @Override + protected void setUpdateParameters(PreparedStatement ps, TransitsEntity entity) throws Exception { + + } + + @Override + protected RowMapper getRowMapper() { + return null; + } + + @Override + public void saveAll(List entities) { + if (entities == null || entities.isEmpty()) return; + + log.info("Transits 저장 시작 = {}건", entities.size()); + batchInsert(entities); + + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/repository/TransitsRepository.java b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/repository/TransitsRepository.java new file mode 100644 index 0000000..e134548 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/repository/TransitsRepository.java @@ -0,0 +1,13 @@ +package com.snp.batch.jobs.shipMovementTransits.batch.repository; + +import com.snp.batch.jobs.shipMovementTransits.batch.entity.TransitsEntity; + +import java.util.List; + +/** + * 선박 상세 정보 Repository 인터페이스 + */ + +public interface TransitsRepository { + void saveAll(List entities); +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/writer/TransitsWriter.java b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/writer/TransitsWriter.java new file mode 100644 index 0000000..2e72d53 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTransits/batch/writer/TransitsWriter.java @@ -0,0 +1,35 @@ +package com.snp.batch.jobs.shipMovementTransits.batch.writer; + +import com.snp.batch.common.batch.writer.BaseWriter; +import com.snp.batch.jobs.shipMovementTransits.batch.entity.TransitsEntity; +import com.snp.batch.jobs.shipMovementTransits.batch.repository.TransitsRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 선박 상세 정보 Writer + */ +@Slf4j +@Component +public class TransitsWriter extends BaseWriter { + + private final TransitsRepository transitsRepository; + + + public TransitsWriter(TransitsRepository transitsRepository) { + super("Transits"); + this.transitsRepository = transitsRepository; + } + + @Override + protected void writeItems(List items) throws Exception { + + if (items.isEmpty()) { return; } + + transitsRepository.saveAll(items); + log.info("Transits 데이터 저장: {} 건", items.size()); + } + +}