From 094bd13e36033cea79c8a1e4ff92840007ac9b57 Mon Sep 17 00:00:00 2001 From: hyojin kim Date: Fri, 21 Nov 2025 16:25:17 +0900 Subject: [PATCH] =?UTF-8?q?:sparkles:=20Core20=20:=20AIS=20=EC=8B=A0?= =?UTF-8?q?=ED=98=B8=20=EB=8D=B0=EC=9D=B4=ED=84=B0=20=EC=97=85=EB=8D=B0?= =?UTF-8?q?=EC=9D=B4=ED=8A=B8=20Job?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../batch/common/util/SafeGetDataUtil.java | 33 ++++ .../config/MaritimeApiWebClientConfig.java | 19 ++ .../ShipLastPositionUpdateJobConfig.java | 85 ++++++++ .../shipdetail/batch/dto/AisApiResponse.java | 42 ++++ .../batch/dto/TargetEnhancedDto.java | 185 ++++++++++++++++++ .../batch/entity/TargetEnhancedEntity.java | 128 ++++++++++++ .../ShipLastPositionDataProcessor.java | 83 ++++++++ .../reader/ShipLastPositionDataReader.java | 150 ++++++++++++++ .../ShipLastPositionRepository.java | 9 + .../ShipLastPositionRepositoryImpl.java | 98 ++++++++++ .../writer/ShipLastPositionDataWriter.java | 24 +++ 11 files changed, 856 insertions(+) create mode 100644 src/main/java/com/snp/batch/common/util/SafeGetDataUtil.java create mode 100644 src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipLastPositionUpdateJobConfig.java create mode 100644 src/main/java/com/snp/batch/jobs/shipdetail/batch/dto/AisApiResponse.java create mode 100644 src/main/java/com/snp/batch/jobs/shipdetail/batch/dto/TargetEnhancedDto.java create mode 100644 src/main/java/com/snp/batch/jobs/shipdetail/batch/entity/TargetEnhancedEntity.java create mode 100644 src/main/java/com/snp/batch/jobs/shipdetail/batch/processor/ShipLastPositionDataProcessor.java create mode 100644 src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipLastPositionDataReader.java create mode 100644 src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipLastPositionRepository.java create mode 100644 src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipLastPositionRepositoryImpl.java create mode 100644 src/main/java/com/snp/batch/jobs/shipdetail/batch/writer/ShipLastPositionDataWriter.java diff --git a/src/main/java/com/snp/batch/common/util/SafeGetDataUtil.java b/src/main/java/com/snp/batch/common/util/SafeGetDataUtil.java new file mode 100644 index 0000000..ce80e53 --- /dev/null +++ b/src/main/java/com/snp/batch/common/util/SafeGetDataUtil.java @@ -0,0 +1,33 @@ +package com.snp.batch.common.util; + +public class SafeGetDataUtil { + private String safeGetString(String value) { + if (value == null || value.trim().isEmpty()) { + return null; + } + return value.trim(); + } + + private Double safeGetDouble(String value) { + if (value == null || value.trim().isEmpty()) { + return null; + } + try { + return Double.parseDouble(value); + } catch (NumberFormatException e) { + return null; + } + } + + private Long safeGetLong(String value) { + if (value == null || value.trim().isEmpty()) { + return null; + } + try { + return Long.parseLong(value.trim()); + } catch (NumberFormatException e) { + return null; + } + } + +} diff --git a/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java b/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java index 2a3a6c3..3f5260a 100644 --- a/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java +++ b/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java @@ -29,6 +29,9 @@ public class MaritimeApiWebClientConfig { @Value("${app.batch.ship-api.url}") private String maritimeApiUrl; + @Value("https://aisapi.maritime.spglobal.com") + private String maritimeAisApiUrl; + @Value("${app.batch.ship-api.username}") private String maritimeApiUsername; @@ -60,6 +63,22 @@ public class MaritimeApiWebClientConfig { .maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼 .build(); } + + @Bean(name = "maritimeAisApiWebClient") + public WebClient maritimeAisApiWebClient(){ + log.info("========================================"); + log.info("Maritime AIS API WebClient 생성"); + log.info("Base URL: {}", maritimeAisApiUrl); + log.info("========================================"); + + return WebClient.builder() + .baseUrl(maritimeAisApiUrl) + .defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword)) + .codecs(configurer -> configurer + .defaultCodecs() + .maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼 + .build(); + } } diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipLastPositionUpdateJobConfig.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipLastPositionUpdateJobConfig.java new file mode 100644 index 0000000..7807c7f --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipLastPositionUpdateJobConfig.java @@ -0,0 +1,85 @@ +package com.snp.batch.jobs.shipdetail.batch.config; + +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto; +import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity; +import com.snp.batch.jobs.shipdetail.batch.processor.ShipLastPositionDataProcessor; +import com.snp.batch.jobs.shipdetail.batch.reader.ShipLastPositionDataReader; +import com.snp.batch.jobs.shipdetail.batch.writer.ShipLastPositionDataWriter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; +@Slf4j +@Configuration +public class ShipLastPositionUpdateJobConfig extends BaseJobConfig { + private final JdbcTemplate jdbcTemplate; + private final WebClient maritimeAisApiWebClient; + + private final ShipLastPositionDataProcessor shipLastPositionDataProcessor; + + private final ShipLastPositionDataWriter shipLastPositionDataWriter; + + @Override + protected int getChunkSize() { + return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정 + } + public ShipLastPositionUpdateJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + ShipLastPositionDataProcessor shipLastPositionDataProcessor, + ShipLastPositionDataWriter shipLastPositionDataWriter, + JdbcTemplate jdbcTemplate, + @Qualifier("maritimeAisApiWebClient")WebClient maritimeAisApiWebClient) { + super(jobRepository, transactionManager); + this.jdbcTemplate = jdbcTemplate; + this.maritimeAisApiWebClient = maritimeAisApiWebClient; + this.shipLastPositionDataProcessor = shipLastPositionDataProcessor; + this.shipLastPositionDataWriter = shipLastPositionDataWriter; + } + + @Override + protected String getJobName() { + return "shipLastPositionUpdateJob"; + } + + @Override + protected String getStepName() { + return "shipLastPositionUpdateStep"; + } + + @Override + protected ItemReader createReader() { + return new ShipLastPositionDataReader(maritimeAisApiWebClient, jdbcTemplate); + } + + @Override + protected ItemProcessor createProcessor() { + return shipLastPositionDataProcessor; + } + + @Override + protected ItemWriter createWriter() { + return shipLastPositionDataWriter; + } + + @Bean(name = "shipLastPositionUpdateJob") + public Job shipLastPositionUpdateJob() { + return job(); + } + + @Bean(name = "shipLastPositionUpdateStep") + public Step shipLastPositionUpdateStep() { + return step(); + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/dto/AisApiResponse.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/dto/AisApiResponse.java new file mode 100644 index 0000000..6af9b7d --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/dto/AisApiResponse.java @@ -0,0 +1,42 @@ +package com.snp.batch.jobs.shipdetail.batch.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * Maritime API GetShipsByIHSLRorIMONumbers 응답 래퍼 + * + * API 응답 구조: + * { + * "shipCount": 5, + * "Ships": [...] + * } + * + * Maritime API GetShipsByIHSLRorIMONumbersAll 응답 래퍼 + * + * API 응답 구조: + * { + * "shipCount": 5, + * "ShipResult": [...], + * "APSStatus": {...} + * } + * + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class AisApiResponse { + + @JsonProperty("ShipResult") + private List shipResult; + + @JsonProperty("targetEnhancedArr") + private List targetEnhancedArr; + +} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/dto/TargetEnhancedDto.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/dto/TargetEnhancedDto.java new file mode 100644 index 0000000..a739d74 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/dto/TargetEnhancedDto.java @@ -0,0 +1,185 @@ +package com.snp.batch.jobs.shipdetail.batch.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TargetEnhancedDto { + + @JsonProperty("DWT") + private Integer dwt; + + @JsonProperty("STAT5CODE") + private String stat5Code; + + @JsonProperty("Source") + private String source; + + @JsonProperty("CoG") + private Double cog; + + @JsonProperty("LastStaticUpdateReceived") + private String lastStaticUpdateReceived; + + @JsonProperty("ZoneId") + private Integer zoneId; + + @JsonProperty("LPCCode") + private String lpcCode; + + @JsonProperty("TonnesCargo") + private Integer tonnesCargo; + + @JsonProperty("StationId") + private String stationId; + + @JsonProperty("InSTS") + private Object inSts; // JSON에서 null이므로 Object로 처리 + + @JsonProperty("ImoVerified") + private String imoVerified; + + @JsonProperty("OnBerth") + private Boolean onBerth; + + @JsonProperty("DestinationTidied") + private String destinationTidied; + + @JsonProperty("Anomalous") + private Object anomalous; // JSON에서 null이므로 Object로 처리 + + @JsonProperty("MessageType") + private Object messageType; // JSON에서 null이므로 Object로 처리 + + @JsonProperty("DestinationPortID") + private Integer destinationPortID; + + @JsonProperty("DestinationUNLOCODE") + private String destinationUnlocode; + + @JsonProperty("MMSI") + private Integer mmsi; + + @JsonProperty("IMO") + private Integer imo; + + @JsonProperty("AgeMinutes") + private Double ageMinutes; + + @JsonProperty("Lat") + private Double lat; + + @JsonProperty("Lon") + private Double lon; + + @JsonProperty("Heading") + private Double heading; + + @JsonProperty("SoG") + private Double sog; + + @JsonProperty("Width") + private Integer width; + + @JsonProperty("Length") + private Integer length; + + @JsonProperty("Draught") + private Double draught; + + @JsonProperty("Name") + private String name; + + @JsonProperty("Callsign") + private String callsign; + + @JsonProperty("Destination") + private String destination; + + @JsonProperty("ETA") + private String eta; + + @JsonProperty("Status") + private String status; + + @JsonProperty("VesselType") + private String vesselType; + + @JsonProperty("ExtraInfo") + private String extraInfo; + + @JsonProperty("PositionAccuracy") + private Integer positionAccuracy; + + @JsonProperty("RoT") + private Integer rot; + + @JsonProperty("TimestampUTC") + private Integer timestampUtc; + + @JsonProperty("RepeatIndicator") + private Integer repeatIndicator; + + @JsonProperty("RAIMFlag") + private Integer raimFlag; + + @JsonProperty("RadioStatus") + private Integer radioStatus; + + @JsonProperty("Regional") + private Integer regional; + + @JsonProperty("Regional2") + private Integer regional2; + + @JsonProperty("Spare") + private Integer spare; + + @JsonProperty("Spare2") + private Integer spare2; + + @JsonProperty("AISVersion") + private Integer aisVersion; + + @JsonProperty("PositionFixType") + private Integer positionFixType; + + @JsonProperty("DTE") + private Integer dte; + + @JsonProperty("BandFlag") + private Integer bandFlag; + + @JsonProperty("ReceivedDate") + private String receivedDate; + + @JsonProperty("MessageTimestamp") + private String messageTimestamp; + + @JsonProperty("LengthBow") + private Integer lengthBow; + + @JsonProperty("LengthStern") + private Integer lengthStern; + + @JsonProperty("WidthPort") + private Integer widthPort; + + @JsonProperty("WidthStarboard") + private Integer widthStarboard; + + // Getters and Setters (생략 - 필요에 따라 추가) + // 예시: + public Integer getDwt() { + return dwt; + } + public void setDwt(Integer dwt) { + this.dwt = dwt; + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/entity/TargetEnhancedEntity.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/entity/TargetEnhancedEntity.java new file mode 100644 index 0000000..2689a12 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/entity/TargetEnhancedEntity.java @@ -0,0 +1,128 @@ +package com.snp.batch.jobs.shipdetail.batch.entity; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.snp.batch.common.batch.entity.BaseEntity; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.time.LocalDateTime; + +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode(callSuper = true) +public class TargetEnhancedEntity extends BaseEntity { + + private Integer dwt; + + private String stat5Code; + + private String source; + + private Double cog; + + private String lastStaticUpdateReceived; + + private Integer zoneId; + + private String lpcCode; + + private Integer tonnesCargo; + + private String stationId; + + private Object inSts; // JSON에서 null이므로 Object로 처리 + + private String imoVerified; + + private Boolean onBerth; + + private String destinationTidied; + + private Object anomalous; // JSON에서 null이므로 Object로 처리 + + private Object messageType; // JSON에서 null이므로 Object로 처리 + + private Integer destinationPortID; + + private String destinationUnlocode; + + private Integer mmsi; + + private Integer imo; + + private Double ageMinutes; + + private Double lat; + + private Double lon; + + private Double heading; + + private Double sog; + + private Integer width; + + private Integer length; + + private Double draught; + + private String name; + + private String callsign; + + private String destination; + + private String eta; + + private String status; + + private String vesselType; + + private String extraInfo; + + private Integer positionAccuracy; + + private Integer rot; + + private Integer timestampUtc; + + private Integer repeatIndicator; + + private Integer raimFlag; + + private Integer radioStatus; + + private Integer regional; + + private Integer regional2; + + private Integer spare; + + private Integer spare2; + + private Integer aisVersion; + + private Integer positionFixType; + + private Integer dte; + + private Integer bandFlag; + + private String receivedDate; + + private String messageTimestamp; + + private Integer lengthBow; + + private Integer lengthStern; + + private Integer widthPort; + + private Integer widthStarboard; + +} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/processor/ShipLastPositionDataProcessor.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/processor/ShipLastPositionDataProcessor.java new file mode 100644 index 0000000..f3dc3ef --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/processor/ShipLastPositionDataProcessor.java @@ -0,0 +1,83 @@ +package com.snp.batch.jobs.shipdetail.batch.processor; + +import com.snp.batch.common.batch.processor.BaseProcessor; +import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto; +import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class ShipLastPositionDataProcessor extends BaseProcessor { + @Override + protected TargetEnhancedEntity processItem(TargetEnhancedDto dto) throws Exception { + log.debug("AIS 최신 항적 데이터 처리 시작: imoNumber={}, shipName={}", + dto.getImoVerified(), dto.getName()); + + TargetEnhancedEntity entity = TargetEnhancedEntity.builder() + .dwt(dto.getDwt()) + .stat5Code(dto.getStat5Code()) + .source(dto.getSource()) + .lpcCode(dto.getLpcCode()) + .tonnesCargo(dto.getTonnesCargo()) + .imoVerified(dto.getImoVerified()) + .lastStaticUpdateReceived(dto.getLastStaticUpdateReceived()) // Entity 필드 타입이 String인 경우 + // Position and Movement Data + .cog(dto.getCog()) + .zoneId(dto.getZoneId()) + .stationId(dto.getStationId()) + .onBerth(dto.getOnBerth()) + .mmsi(dto.getMmsi()) + .imo(dto.getImo()) + .ageMinutes(dto.getAgeMinutes()) + .lat(dto.getLat()) + .lon(dto.getLon()) + .heading(dto.getHeading()) + .sog(dto.getSog()) + .positionAccuracy(dto.getPositionAccuracy()) + .rot(dto.getRot()) + .timestampUtc(dto.getTimestampUtc()) + .status(dto.getStatus()) + // Dimensions and Draught + .width(dto.getWidth()) + .length(dto.getLength()) + .draught(dto.getDraught()) + .lengthBow(dto.getLengthBow()) + .lengthStern(dto.getLengthStern()) + .widthPort(dto.getWidthPort()) + .widthStarboard(dto.getWidthStarboard()) + // Voyage Data + .name(dto.getName()) + .callsign(dto.getCallsign()) + .destination(dto.getDestination()) + .destinationTidied(dto.getDestinationTidied()) + .destinationPortID(dto.getDestinationPortID()) + .destinationUnlocode(dto.getDestinationUnlocode()) + .eta(dto.getEta()) // Entity 필드 타입이 String인 경우 + .vesselType(dto.getVesselType()) + .extraInfo(dto.getExtraInfo()) + // AIS Metadata Fields + .repeatIndicator(dto.getRepeatIndicator()) + .raimFlag(dto.getRaimFlag()) + .radioStatus(dto.getRadioStatus()) + .regional(dto.getRegional()) + .regional2(dto.getRegional2()) + .spare(dto.getSpare()) + .spare2(dto.getSpare2()) + .aisVersion(dto.getAisVersion()) + .positionFixType(dto.getPositionFixType()) + .dte(dto.getDte()) + .bandFlag(dto.getBandFlag()) + .receivedDate(dto.getReceivedDate()) // Entity 필드 타입이 String인 경우 + .messageTimestamp(dto.getMessageTimestamp()) // Entity 필드 타입이 String인 경우 + // Null/Object Fields + .inSts(dto.getInSts()) + .anomalous(dto.getAnomalous()) + .messageType(dto.getMessageType()) + .build(); + + log.debug("AIS 최신 항적 데이터 처리 완료: imoNumber={}", dto.getImoVerified()); + + return entity; + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipLastPositionDataReader.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipLastPositionDataReader.java new file mode 100644 index 0000000..44c3247 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipLastPositionDataReader.java @@ -0,0 +1,150 @@ +package com.snp.batch.jobs.shipdetail.batch.reader; + +import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.shipdetail.batch.dto.AisApiResponse; +import com.snp.batch.jobs.shipdetail.batch.dto.TargetEnhancedDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.*; +@Slf4j +public class ShipLastPositionDataReader extends BaseApiReader { + + //TODO : + // 1. Core20 IMO_NUMBER 전체 조회 + // 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복) + // 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복) + + private final JdbcTemplate jdbcTemplate; + + private List allImoNumbers; + private int currentBatchIndex = 0; + private final int batchSize = 5000; + + public ShipLastPositionDataReader(WebClient webClient, JdbcTemplate jdbcTemplate) { + super(webClient); + this.jdbcTemplate = jdbcTemplate; + enableChunkMode(); // ✨ Chunk 모드 활성화 + } + + @Override + protected String getReaderName() { + return "ShipLastPositionDataReader"; + } + + @Override + protected String getApiPath() { + return "/AisSvc.svc/AIS/GetTargetsByIMOsEnhanced"; + } + + @Override + protected String getApiBaseUrl() { + return "https://aisapi.maritime.spglobal.com"; + } + + private String getTargetTable(){ + return "test_s_p.test_core20"; + } + private String GET_CORE_IMO_LIST = + "SELECT ihslrorimoshipno FROM " + getTargetTable() + " ORDER BY ihslrorimoshipno"; + + @Override + protected void beforeFetch(){ + log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName()); + + allImoNumbers = jdbcTemplate.queryForList(GET_CORE_IMO_LIST, String.class); + + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); + log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); + log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); + + updateApiCallStats(totalBatches, 0); + } + + @Override + protected List fetchNextBatch() throws Exception { + // 모든 배치 처리 완료 확인 + if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { + return null; // Job 종료 + } + + // 현재 배치의 시작/끝 인덱스 계산 + int startIndex = currentBatchIndex; + int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); + + // 현재 배치의 IMO 번호 추출 (100개) + List currentBatch = allImoNumbers.subList(startIndex, endIndex); + + int currentBatchNumber = (currentBatchIndex / batchSize) + 1; + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", + getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); + + try { + // IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...") + String imoParam = String.join(",", currentBatch); + + // API 호출 + AisApiResponse response = callAisApiWithBatch(imoParam); + + // 다음 배치로 인덱스 이동 + currentBatchIndex = endIndex; + + // 응답 처리 + if (response != null && response.getTargetEnhancedArr() != null) { + List targets = response.getTargetEnhancedArr(); + log.info("[{}] 배치 {}/{} 완료: {} 건 조회", + getReaderName(), currentBatchNumber, totalBatches, targets.size()); + + // API 호출 통계 업데이트 + updateApiCallStats(totalBatches, currentBatchNumber); + + // API 과부하 방지 (다음 배치 전 0.5초 대기) + if (currentBatchIndex < allImoNumbers.size()) { + Thread.sleep(500); + } + + return targets; + } else { + log.warn("[{}] 배치 {}/{} 응답 없음", + getReaderName(), currentBatchNumber, totalBatches); + + // API 호출 통계 업데이트 (실패도 카운트) + updateApiCallStats(totalBatches, currentBatchNumber); + + return Collections.emptyList(); + } + + } catch (Exception e) { + log.error("[{}] 배치 {}/{} 처리 중 오류: {}", + getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e); + + // 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용) + currentBatchIndex = endIndex; + + // 빈 리스트 반환 (Job 계속 진행) + return Collections.emptyList(); + } + } + + private AisApiResponse callAisApiWithBatch(String imoNumbers) { + String url = getApiPath(); + + Map requestBody = Map.of("imo", imoNumbers); + + log.debug("[{}] API 호출: {}", getReaderName(), url); + log.debug("[{}] : {}", "requestBody", requestBody.toString()); + + return webClient.post() + .uri(url) + .bodyValue(requestBody) + .retrieve() + .bodyToMono(AisApiResponse.class) + .block(); + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipLastPositionRepository.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipLastPositionRepository.java new file mode 100644 index 0000000..7630460 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipLastPositionRepository.java @@ -0,0 +1,9 @@ +package com.snp.batch.jobs.shipdetail.batch.repository; + +import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity; + +import java.util.List; + +public interface ShipLastPositionRepository { + void saveLastPositionAll(List items); +} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipLastPositionRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipLastPositionRepositoryImpl.java new file mode 100644 index 0000000..1380d54 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipLastPositionRepositoryImpl.java @@ -0,0 +1,98 @@ +package com.snp.batch.jobs.shipdetail.batch.repository; + +import com.snp.batch.common.batch.repository.BaseJdbcRepository; +import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; + +import java.sql.PreparedStatement; +import java.util.List; + +@Slf4j +@Repository("shipLastPositionRepository") +public class ShipLastPositionRepositoryImpl extends BaseJdbcRepository implements ShipLastPositionRepository { + public ShipLastPositionRepositoryImpl(JdbcTemplate jdbcTemplate) { + super(jdbcTemplate); + } + + @Override + protected String getTableName() { + return null; + } + + @Override + protected RowMapper getRowMapper() { + return null; + } + + @Override + protected Long extractId(TargetEnhancedEntity entity) { + return null; + } + + @Override + protected String getInsertSql() { + return null; + } + + @Override + protected String getUpdateSql() { + return """ + UPDATE test_s_p.test_core20 + SET lastseen = ?::timestamptz, + lastport = ?, + position_latitude = ?, + position_longitude = ?, + destination = ?, + eta = ?::timestamptz, + heading = ?, + batch_flag = 'N' + WHERE ihslrorimoshipno = ?; + """; + } + + @Override + protected void setInsertParameters(PreparedStatement ps, TargetEnhancedEntity entity) throws Exception { + + } + + @Override + protected void setUpdateParameters(PreparedStatement ps, TargetEnhancedEntity entity) throws Exception { + int idx = 1; + ps.setString(idx++, entity.getReceivedDate()); // receivedDate : lastseen (Datetime) + ps.setString(idx++, entity.getLpcCode()); // lpcCode : lastport (String) + ps.setDouble(idx++, entity.getLat()); // lat : position_latitude (Double) + ps.setDouble(idx++, entity.getLon()); // lon : position_longitude (Double) + ps.setString(idx++, entity.getDestination()); // destination : destination (String) + ps.setString(idx++, entity.getEta()); // eta : eta (Datetime) + ps.setDouble(idx++, entity.getHeading()); // heading : heading (Double) + ps.setString(idx++, entity.getImoVerified()); // imoVerified : imo (String) + } + + @Override + protected String getEntityName() { + return "TargetEnhancedEntity"; + } + + @Override + @Transactional + public void saveLastPositionAll(List items) { + if (items == null || items.isEmpty()) { + return; + } + jdbcTemplate.batchUpdate(getUpdateSql(), items, items.size(), + (ps, entity) -> { + try { + setUpdateParameters(ps, entity); + } catch (Exception e) { + log.error("배치 수정 파라미터 설정 실패", e); + throw new RuntimeException(e); + } + }); + + log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size()); + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/writer/ShipLastPositionDataWriter.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/writer/ShipLastPositionDataWriter.java new file mode 100644 index 0000000..5c5ef33 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/writer/ShipLastPositionDataWriter.java @@ -0,0 +1,24 @@ +package com.snp.batch.jobs.shipdetail.batch.writer; + +import com.snp.batch.common.batch.writer.BaseWriter; +import com.snp.batch.jobs.shipdetail.batch.entity.TargetEnhancedEntity; +import com.snp.batch.jobs.shipdetail.batch.repository.ShipLastPositionRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; +@Slf4j +@Component +public class ShipLastPositionDataWriter extends BaseWriter { + + private final ShipLastPositionRepository shipLastPositionRepository; + public ShipLastPositionDataWriter(ShipLastPositionRepository shipLastPositionRepository) { + super("ShipLastPosition"); + this.shipLastPositionRepository = shipLastPositionRepository; + } + + @Override + protected void writeItems(List items) throws Exception { + shipLastPositionRepository.saveLastPositionAll(items); + } +}