diff --git a/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/config/AnchorageCallsJobConfig.java b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/config/AnchorageCallsJobConfig.java new file mode 100644 index 0000000..77a35d7 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/config/AnchorageCallsJobConfig.java @@ -0,0 +1,115 @@ +package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.jobs.shipMovement.batch.dto.PortCallDto; +import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity; +import com.snp.batch.jobs.shipMovement.batch.processor.ShipMovementProcessor; +import com.snp.batch.jobs.shipMovement.batch.reader.ShipMovementReader; +import com.snp.batch.jobs.shipMovement.batch.writer.ShipMovementWriter; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.dto.AnchorageCallsDto; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.processor.AnchorageCallsProcessor; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.reader.AnchorageCallsReader; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.writer.AnchorageCallsWriter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; + +/** + * 선박 상세 정보 Import Job Config + * + * 특징: + * - ship_data 테이블에서 IMO 번호 조회 + * - IMO 번호를 100개씩 배치로 분할 + * - Maritime API GetShipsByIHSLRorIMONumbers 호출 + * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 + * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) + * + * 데이터 흐름: + * ShipMovementReader (ship_data → Maritime API) + * ↓ (PortCallDto) + * ShipMovementProcessor + * ↓ (ShipMovementEntity) + * ShipDetailDataWriter + * ↓ (ship_movement 테이블) + */ + +@Slf4j +@Configuration +public class AnchorageCallsJobConfig extends BaseJobConfig { + + private final AnchorageCallsProcessor anchorageCallsProcessor; + private final AnchorageCallsWriter anchorageCallsWriter; + private final JdbcTemplate jdbcTemplate; + private final WebClient maritimeApiWebClient; + + public AnchorageCallsJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + AnchorageCallsProcessor anchorageCallsProcessor, + AnchorageCallsWriter anchorageCallsWriter, JdbcTemplate jdbcTemplate, + @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient + ) { // ObjectMapper 주입 추가 + super(jobRepository, transactionManager); + this.anchorageCallsProcessor = anchorageCallsProcessor; + this.anchorageCallsWriter = anchorageCallsWriter; + this.jdbcTemplate = jdbcTemplate; + this.maritimeApiWebClient = maritimeApiWebClient; + } + + @Override + protected String getJobName() { + return "AnchorageCallsImportJob"; + } + + @Override + protected String getStepName() { + return "AnchorageCallsImportStep"; + } + + @Override + protected ItemReader createReader() { // 타입 변경 + return new AnchorageCallsReader(maritimeApiWebClient, jdbcTemplate); + } + + @Override + protected ItemProcessor createProcessor() { + return anchorageCallsProcessor; + } + + @Override + protected ItemWriter createWriter() { // 타입 변경 + return anchorageCallsWriter; + } + + @Override + protected int getChunkSize() { + return 50; // API에서 100개씩 가져오므로 chunk도 100으로 설정 + } + + @Bean(name = "AnchorageCallsImportJob") + public Job anchorageCallsImportJob() { + return job(); + } + + @Bean(name = "AnchorageCallsImportStep") + public Step anchorageCallsImportStep() { + return step(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/dto/AnchorageCallsDto.java b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/dto/AnchorageCallsDto.java new file mode 100644 index 0000000..cd26678 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/dto/AnchorageCallsDto.java @@ -0,0 +1,33 @@ +package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.dto; + +import lombok.Data; + +import java.time.LocalDateTime; + +@Data +public class AnchorageCallsDto { + private String movementType; + private String imolRorIHSNumber; + private String movementDate; + + private Integer portCallId; + private Integer facilityId; + private String facilityName; + private String facilityType; + + private Integer subFacilityId; + private String subFacilityName; + private String subFacilityType; + + private String countryCode; + private String countryName; + + private Double draught; + private Double latitude; + private Double longitude; + + private AnchorageCallsPositionDto position; + + private String destination; + private String iso2; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/dto/AnchorageCallsPositionDto.java b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/dto/AnchorageCallsPositionDto.java new file mode 100644 index 0000000..23d3613 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/dto/AnchorageCallsPositionDto.java @@ -0,0 +1,19 @@ +package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class AnchorageCallsPositionDto { + private Boolean isNull; + private Integer stSrid; + private Double lat; + @JsonProperty("long") + private Double lon; + private Double z; + private Double m; + private Boolean hasZ; + private Boolean hasM; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/entity/AnchorageCallsEntity.java b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/entity/AnchorageCallsEntity.java new file mode 100644 index 0000000..70aaad8 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/entity/AnchorageCallsEntity.java @@ -0,0 +1,47 @@ +package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity; + +import com.fasterxml.jackson.databind.JsonNode; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.SequenceGenerator; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.time.LocalDateTime; + +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +public class AnchorageCallsEntity { + + private Long id; + + private String movementType; + private String imolRorIHSNumber; + private LocalDateTime movementDate; + + private Integer portCallId; + private Integer facilityId; + private String facilityName; + private String facilityType; + + private Integer subFacilityId; + private String subFacilityName; + private String subFacilityType; + + private String countryCode; + private String countryName; + + private Double draught; + private Double latitude; + private Double longitude; + + private JsonNode position; + + private String destination; + private String iso2; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/processor/AnchorageCallsProcessor.java b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/processor/AnchorageCallsProcessor.java new file mode 100644 index 0000000..ee03a7e --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/processor/AnchorageCallsProcessor.java @@ -0,0 +1,68 @@ +package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.processor; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.processor.BaseProcessor; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.dto.AnchorageCallsDto; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * 선박 상세 정보 Processor + * ShipDetailDto → ShipDetailEntity 변환 + */ + +/** + * 선박 상세 정보 Processor (해시 비교 및 증분 데이터 추출) + * I: ShipDetailComparisonData (DB 해시 + API Map Data) + * O: ShipDetailUpdate (변경분) + */ +@Slf4j +@Component +public class AnchorageCallsProcessor extends BaseProcessor { + + private final ObjectMapper objectMapper; + + public AnchorageCallsProcessor(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + protected AnchorageCallsEntity processItem(AnchorageCallsDto dto) throws Exception { + log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}", + dto.getImolRorIHSNumber(), dto.getFacilityName()); + + JsonNode positionNode = null; + if (dto.getPosition() != null) { + // Position 객체를 JsonNode로 변환 + positionNode = objectMapper.valueToTree(dto.getPosition()); + } + + AnchorageCallsEntity entity = AnchorageCallsEntity.builder() + .movementType(dto.getMovementType()) + .imolRorIHSNumber(dto.getImolRorIHSNumber()) + .movementDate(LocalDateTime.parse(dto.getMovementDate())) + .portCallId(dto.getPortCallId()) + .facilityId(dto.getFacilityId()) + .facilityName(dto.getFacilityName()) + .facilityType(dto.getFacilityType()) + .subFacilityId(dto.getSubFacilityId()) + .subFacilityName(dto.getSubFacilityName()) + .subFacilityType(dto.getSubFacilityType()) + .countryCode(dto.getCountryCode()) + .countryName(dto.getCountryName()) + .draught(dto.getDraught()) + .latitude(dto.getLatitude()) + .longitude(dto.getLongitude()) + .destination(dto.getDestination()) + .iso2(dto.getIso2()) + .position(positionNode) // JsonNode로 매핑 + .build(); + + return entity; + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/reader/AnchorageCallsReader.java b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/reader/AnchorageCallsReader.java new file mode 100644 index 0000000..9c3f782 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/reader/AnchorageCallsReader.java @@ -0,0 +1,216 @@ +package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.reader; + +import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.dto.AnchorageCallsDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * 선박 상세 정보 Reader (v2.0 - Chunk 기반) + * + * 기능: + * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) + * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 + * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 + * 4. Spring Batch가 100건씩 Process → Write 수행 + * + * Chunk 처리 흐름: + * - beforeFetch() → IMO 전체 조회 (1회) + * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) + * - read() → 1건씩 반환 (100번) + * - Processor/Writer → 100건 처리 + * - 반복... (1,718번의 Chunk) + * + * 기존 방식과의 차이: + * - 기존: 17만건 전체 메모리 로드 → Process → Write + * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) + */ +@Slf4j +@StepScope +public class AnchorageCallsReader extends BaseApiReader { + + private final JdbcTemplate jdbcTemplate; + + // 배치 처리 상태 + private List allImoNumbers; + // DB 해시값을 저장할 맵 + private Map dbMasterHashes; + private int currentBatchIndex = 0; + private final int batchSize = 5; + + // @Value("#{jobParameters['startDate']}") +// private String startDate; + private String startDate = "2025-01-01"; + + // @Value("#{jobParameters['stopDate']}") +// private String stopDate; + private String stopDate = "2025-12-31"; + + public AnchorageCallsReader(WebClient webClient, JdbcTemplate jdbcTemplate ) { + super(webClient); + this.jdbcTemplate = jdbcTemplate; + enableChunkMode(); // ✨ Chunk 모드 활성화 + } + + @Override + protected String getReaderName() { + return "AnchorageCallsReader"; + } + + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allImoNumbers = null; + this.dbMasterHashes = null; + } + + @Override + protected String getApiPath() { + return "/Movements/AnchorageCalls"; + } + + @Override + protected String getApiBaseUrl() { + return "https://webservices.maritime.spglobal.com"; + } + + private static final String GET_ALL_IMO_QUERY = +// "SELECT imo_number FROM ship_data ORDER BY id"; + "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_anchoragecall) ORDER BY imo_number"; + + private static final String FETCH_ALL_HASHES_QUERY = + "SELECT imo_number, ship_detail_hash FROM ship_detail_hash_json ORDER BY imo_number"; + + /** + * 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회 + */ + @Override + protected void beforeFetch() { + // 전처리 과정 + // Step 1. IMO 전체 번호 조회 + log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName()); + + allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class); + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); + log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); + log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); + + // API 통계 초기화 + updateApiCallStats(totalBatches, 0); + } + + /** + * ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환 + * + * Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출 + * + * @return 다음 배치 100건 (더 이상 없으면 null) + */ + @Override + protected List fetchNextBatch() throws Exception { + + // 모든 배치 처리 완료 확인 + if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { + return null; // Job 종료 + } + + // 현재 배치의 시작/끝 인덱스 계산 + int startIndex = currentBatchIndex; + int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); + + // 현재 배치의 IMO 번호 추출 (100개) + List currentBatch = allImoNumbers.subList(startIndex, endIndex); + + int currentBatchNumber = (currentBatchIndex / batchSize) + 1; + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", + getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); + + try { + // IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...") + String imoParam = String.join(",", currentBatch); + + // API 호출 + List response = callApiWithBatch(imoParam); + + // 다음 배치로 인덱스 이동 + currentBatchIndex = endIndex; + + + // 응답 처리 + if (response != null ) { + List anchorageCalls = response; + log.info("[{}] 배치 {}/{} 완료: {} 건 조회", + getReaderName(), currentBatchNumber, totalBatches, anchorageCalls.size()); + + // API 호출 통계 업데이트 + updateApiCallStats(totalBatches, currentBatchNumber); + + // API 과부하 방지 (다음 배치 전 0.5초 대기) + if (currentBatchIndex < allImoNumbers.size()) { + Thread.sleep(500); + } + + return anchorageCalls; + + } else { + log.warn("[{}] 배치 {}/{} 응답 없음", + getReaderName(), currentBatchNumber, totalBatches); + + // API 호출 통계 업데이트 (실패도 카운트) + updateApiCallStats(totalBatches, currentBatchNumber); + + return Collections.emptyList(); + } + + } catch (Exception e) { + log.error("[{}] 배치 {}/{} 처리 중 오류: {}", + getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e); + + // 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용) + currentBatchIndex = endIndex; + + // 빈 리스트 반환 (Job 계속 진행) + return Collections.emptyList(); + } + } + + /** + * Query Parameter를 사용한 API 호출 + * + * @param lrno 쉼표로 연결된 IMO 번호 (예: "1000019,1000021,...") + * @return API 응답 + */ + private List callApiWithBatch(String lrno) { + String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno; + + log.debug("[{}] API 호출: {}", getReaderName(), url); + + return webClient.get() + .uri(url) + .retrieve() + .bodyToFlux(AnchorageCallsDto.class) + .collectList() + .block(); + } + + @Override + protected void afterFetch(List data) { + if (data == null) { + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); + log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", + getReaderName(), allImoNumbers.size()); + } + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/repository/AnchorageCallsRepository.java b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/repository/AnchorageCallsRepository.java new file mode 100644 index 0000000..5bcfa85 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/repository/AnchorageCallsRepository.java @@ -0,0 +1,14 @@ +package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.repository; + +import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity; + +import java.util.List; + +/** + * 선박 상세 정보 Repository 인터페이스 + */ + +public interface AnchorageCallsRepository { + void saveAll(List entities); +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/repository/AnchorageCallsRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/repository/AnchorageCallsRepositoryImpl.java new file mode 100644 index 0000000..0e805d0 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/repository/AnchorageCallsRepositoryImpl.java @@ -0,0 +1,201 @@ +package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.repository; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.repository.BaseJdbcRepository; +import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity; +import com.snp.batch.jobs.shipMovement.batch.repository.ShipMovementRepository; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.List; + +/** + * 선박 상세 정보 Repository 구현체 + * BaseJdbcRepository를 상속하여 JDBC 기반 CRUD 구현 + */ +@Slf4j +@Repository("anchorageCallsRepository") +public class AnchorageCallsRepositoryImpl extends BaseJdbcRepository + implements AnchorageCallsRepository { + + public AnchorageCallsRepositoryImpl(JdbcTemplate jdbcTemplate) { + super(jdbcTemplate); + } + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override + protected String getTableName() { + return "snp_data.t_anchoragecall"; + } + + @Override + protected String getEntityName() { + return "AnchorageCalls"; + } + + @Override + protected String extractId(AnchorageCallsEntity entity) { + return entity.getImolRorIHSNumber(); + } + + @Override + public String getInsertSql() { + return """ + INSERT INTO snp_data.t_anchoragecall( + imo, + mvmn_type, + mvmn_dt, + stpov_id, + fclty_id, + fclty_nm, + fclty_type, + lwrnk_fclty_id, + lwrnk_fclty_nm, + lwrnk_fclty_type, + ntn_cd, + ntn_nm, + draft, + lat, + lon, + dstn, + iso2_ntn_cd, + lcinfo + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (imo, mvmn_dt) + DO UPDATE SET + mvmn_type = EXCLUDED.mvmn_type, + mvmn_dt = EXCLUDED.mvmn_dt, + stpov_id = EXCLUDED.stpov_id, + fclty_id = EXCLUDED.fclty_id, + fclty_nm = EXCLUDED.fclty_nm, + fclty_type = EXCLUDED.fclty_type, + lwrnk_fclty_id = EXCLUDED.lwrnk_fclty_id, + lwrnk_fclty_nm = EXCLUDED.lwrnk_fclty_nm, + lwrnk_fclty_type = EXCLUDED.lwrnk_fclty_type, + ntn_cd = EXCLUDED.ntn_cd, + ntn_nm = EXCLUDED.ntn_nm, + draft = EXCLUDED.draft, + lat = EXCLUDED.lat, + lon = EXCLUDED.lon, + dstn = EXCLUDED.dstn, + iso2_ntn_cd = EXCLUDED.iso2_ntn_cd, + lcinfo = EXCLUDED.lcinfo + """; + } + + @Override + protected String getUpdateSql() { + return null; + } + + @Override + protected void setInsertParameters(PreparedStatement ps, AnchorageCallsEntity e) throws Exception { + int i = 1; + ps.setString(i++, e.getImolRorIHSNumber()); // imo + ps.setString(i++, e.getMovementType()); // mvmn_type + ps.setTimestamp(i++, e.getMovementDate() != null ? Timestamp.valueOf(e.getMovementDate()) : null); // mvmn_dt + ps.setObject(i++, e.getPortCallId()); // stpov_id + ps.setObject(i++, e.getFacilityId()); // fclty_id + ps.setString(i++, e.getFacilityName()); // fclty_nm + ps.setString(i++, e.getFacilityType()); // fclty_type + ps.setObject(i++, e.getSubFacilityId()); // lwrnk_fclty_id + ps.setString(i++, e.getSubFacilityName()); // lwrnk_fclty_nm + ps.setString(i++, e.getSubFacilityType()); // lwrnk_fclty_type + ps.setString(i++, e.getCountryCode()); // ntn_cd + ps.setString(i++, e.getCountryName()); // ntn_nm + setDoubleOrNull(ps, i++, e.getDraught()); // draft + setDoubleOrNull(ps, i++, e.getLatitude()); // lat + setDoubleOrNull(ps, i++, e.getLongitude());// lon + ps.setString(i++, e.getDestination()); // dstn + ps.setString(i++, e.getIso2()); // iso2_ntn_cd + + if (e.getPosition() != null) { + ps.setObject(i++, OBJECT_MAPPER.writeValueAsString(e.getPosition()), java.sql.Types.OTHER); // lcinfo (jsonb) + } else { + ps.setNull(i++, java.sql.Types.OTHER); + } + +// ps.setString(i++, e.getSchemaType()); + + } + + private void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception { + if (value != null) { + ps.setDouble(index, value); + } else { + // java.sql.Types.DOUBLE을 사용하여 명시적으로 SQL NULL을 설정 + ps.setNull(index, java.sql.Types.DOUBLE); + } + } + + @Override + protected void setUpdateParameters(PreparedStatement ps, AnchorageCallsEntity entity) throws Exception { + + } + + @Override + protected RowMapper getRowMapper() { + return null; + } + + @Override + public void saveAll(List entities) { + if (entities == null || entities.isEmpty()) return; + + log.info("ShipMovement 저장 시작 = {}건", entities.size()); + batchInsert(entities); + + } + + + /** + * ShipDetailEntity RowMapper + */ + private static class AnchorageCallsRowMapper implements RowMapper { + @Override + public AnchorageCallsEntity mapRow(ResultSet rs, int rowNum) throws SQLException { + AnchorageCallsEntity entity = AnchorageCallsEntity.builder() + .id(rs.getLong("id")) + .imolRorIHSNumber(rs.getString("imolRorIHSNumber")) + .portCallId(rs.getObject("portCallId", Integer.class)) + .facilityId(rs.getObject("facilityId", Integer.class)) + .facilityName(rs.getString("facilityName")) + .facilityType(rs.getString("facilityType")) + .subFacilityId(rs.getObject("subFacilityId", Integer.class)) + .subFacilityName(rs.getString("subFacilityName")) + .subFacilityType(rs.getString("subFacilityType")) + .countryCode(rs.getString("countryCode")) + .countryName(rs.getString("countryName")) + .draught(rs.getObject("draught", Double.class)) + .latitude(rs.getObject("latitude", Double.class)) + .longitude(rs.getObject("longitude", Double.class)) + .destination(rs.getString("destination")) + .iso2(rs.getString("iso2")) + .position(parseJson(rs.getString("position"))) + .build(); + + Timestamp movementDate = rs.getTimestamp("movementDate"); + if (movementDate != null) { + entity.setMovementDate(movementDate.toLocalDateTime()); + } + + return entity; + } + + private JsonNode parseJson(String json) { + try { + if (json == null) return null; + return new ObjectMapper().readTree(json); + } catch (Exception e) { + throw new RuntimeException("JSON 파싱 오류: " + json); + } + } + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/writer/AnchorageCallsWriter.java b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/writer/AnchorageCallsWriter.java new file mode 100644 index 0000000..198d223 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementAnchorageCalls/batch/writer/AnchorageCallsWriter.java @@ -0,0 +1,38 @@ +package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.writer; + +import com.snp.batch.common.batch.writer.BaseWriter; +import com.snp.batch.jobs.shipMovement.batch.repository.ShipMovementRepository; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.repository.AnchorageCallsRepository; +import com.snp.batch.jobs.shipdetail.batch.repository.ShipDetailRepository; +import com.snp.batch.jobs.shipdetail.batch.repository.ShipHashRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 선박 상세 정보 Writer + */ +@Slf4j +@Component +public class AnchorageCallsWriter extends BaseWriter { + + private final AnchorageCallsRepository anchorageCallsRepository; + + + public AnchorageCallsWriter(AnchorageCallsRepository anchorageCallsRepository) { + super("AnchorageCalls"); + this.anchorageCallsRepository = anchorageCallsRepository; + } + + @Override + protected void writeItems(List items) throws Exception { + + if (items.isEmpty()) { return; } + + anchorageCallsRepository.saveAll(items); + log.info("AnchorageCalls 데이터 저장: {} 건", items.size()); + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/config/BerthCallsJobConfig.java b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/config/BerthCallsJobConfig.java new file mode 100644 index 0000000..2b43ed5 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/config/BerthCallsJobConfig.java @@ -0,0 +1,107 @@ +package com.snp.batch.jobs.shipMovementBerthCalls.batch.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.dto.BerthCallsDto; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity.BerthCallsEntity; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.processor.BerthCallsProcessor; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.reader.BerthCallsReader; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.writer.BerthCallsWriter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * 선박 상세 정보 Import Job Config + * + * 특징: + * - ship_data 테이블에서 IMO 번호 조회 + * - IMO 번호를 100개씩 배치로 분할 + * - Maritime API GetShipsByIHSLRorIMONumbers 호출 + * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 + * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) + * + * 데이터 흐름: + * ShipMovementReader (ship_data → Maritime API) + * ↓ (PortCallDto) + * ShipMovementProcessor + * ↓ (ShipMovementEntity) + * ShipDetailDataWriter + * ↓ (ship_movement 테이블) + */ + +@Slf4j +@Configuration +public class BerthCallsJobConfig extends BaseJobConfig { + + private final BerthCallsProcessor berthCallsProcessor; + private final BerthCallsWriter berthCallsWriter; + private final JdbcTemplate jdbcTemplate; + private final WebClient maritimeApiWebClient; + private final ObjectMapper objectMapper; // ObjectMapper 주입 추가 + + public BerthCallsJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + BerthCallsProcessor berthCallsProcessor, + BerthCallsWriter berthCallsWriter, JdbcTemplate jdbcTemplate, + @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient, + ObjectMapper objectMapper) { // ObjectMapper 주입 추가 + super(jobRepository, transactionManager); + this.berthCallsProcessor = berthCallsProcessor; + this.berthCallsWriter = berthCallsWriter; + this.jdbcTemplate = jdbcTemplate; + this.maritimeApiWebClient = maritimeApiWebClient; + this.objectMapper = objectMapper; // ObjectMapper 초기화 + } + + @Override + protected String getJobName() { + return "BerthCallsImportJob"; + } + + @Override + protected String getStepName() { + return "BerthCallsImportStep"; + } + + @Override + protected ItemReader createReader() { // 타입 변경 + return new BerthCallsReader(maritimeApiWebClient, jdbcTemplate); + } + + @Override + protected ItemProcessor createProcessor() { + return berthCallsProcessor; + } + + @Override + protected ItemWriter createWriter() { // 타입 변경 + return berthCallsWriter; + } + + @Override + protected int getChunkSize() { + return 200; // API에서 100개씩 가져오므로 chunk도 100으로 설정 + } + + @Bean(name = "BerthCallsImportJob") + public Job berthCallsImportJob() { + return job(); + } + + @Bean(name = "BerthCallsImportStep") + public Step berthCallsImportStep() { + return step(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/dto/BerthCallsDto.java b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/dto/BerthCallsDto.java new file mode 100644 index 0000000..03b813e --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/dto/BerthCallsDto.java @@ -0,0 +1,33 @@ +package com.snp.batch.jobs.shipMovementBerthCalls.batch.dto; + +import com.snp.batch.jobs.shipMovement.batch.dto.PositionDto; +import lombok.Data; + +@Data +public class BerthCallsDto { + private String movementType; + private String imolRorIHSNumber; + private String movementDate; + + + private Integer facilityId; + private String facilityName; + private String facilityType; + + private Integer parentFacilityId; + private String parentFacilityName; + private String parentFacilityType; + + private String countryCode; + private String countryName; + + private Double draught; + private Double latitude; + private Double longitude; + + private BerthCallsPositionDto position; + + private Integer parentCallId; + private String iso2; + private String eventStartDate; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/dto/BerthCallsPositionDto.java b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/dto/BerthCallsPositionDto.java new file mode 100644 index 0000000..ffc652c --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/dto/BerthCallsPositionDto.java @@ -0,0 +1,17 @@ +package com.snp.batch.jobs.shipMovementBerthCalls.batch.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +@Data +public class BerthCallsPositionDto { + private boolean isNull; + private int stSrid; + private double lat; + @JsonProperty("long") + private double lon; + private double z; + private double m; + private boolean hasZ; + private boolean hasM; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/entiity/BerthCallsEntity.java b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/entiity/BerthCallsEntity.java new file mode 100644 index 0000000..4cc1b8f --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/entiity/BerthCallsEntity.java @@ -0,0 +1,47 @@ +package com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity; + +import com.fasterxml.jackson.databind.JsonNode; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.SequenceGenerator; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.time.LocalDateTime; + +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +public class BerthCallsEntity { + + private Long id; + + private String movementType; + private String imolRorIHSNumber; + private LocalDateTime movementDate; + + private Integer facilityId; + private String facilityName; + private String facilityType; + + private Integer parentFacilityId; + private String parentFacilityName; + private String parentFacilityType; + + private String countryCode; + private String countryName; + + private Double draught; + private Double latitude; + private Double longitude; + + private JsonNode position; + + private Integer parentCallId; + private String iso2; + private LocalDateTime eventStartDate; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/processor/BerthCallsProcessor.java b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/processor/BerthCallsProcessor.java new file mode 100644 index 0000000..d196256 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/processor/BerthCallsProcessor.java @@ -0,0 +1,68 @@ +package com.snp.batch.jobs.shipMovementBerthCalls.batch.processor; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.processor.BaseProcessor; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.dto.BerthCallsDto; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity.BerthCallsEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * 선박 상세 정보 Processor + * ShipDetailDto → ShipDetailEntity 변환 + */ + +/** + * 선박 상세 정보 Processor (해시 비교 및 증분 데이터 추출) + * I: ShipDetailComparisonData (DB 해시 + API Map Data) + * O: ShipDetailUpdate (변경분) + */ +@Slf4j +@Component +public class BerthCallsProcessor extends BaseProcessor { + + private final ObjectMapper objectMapper; + + public BerthCallsProcessor(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + protected BerthCallsEntity processItem(BerthCallsDto dto) throws Exception { + log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}", + dto.getImolRorIHSNumber(), dto.getFacilityName()); + + JsonNode positionNode = null; + if (dto.getPosition() != null) { + // Position 객체를 JsonNode로 변환 + positionNode = objectMapper.valueToTree(dto.getPosition()); + } + + BerthCallsEntity entity = BerthCallsEntity.builder() + .movementType(dto.getMovementType()) + .imolRorIHSNumber(dto.getImolRorIHSNumber()) + .movementDate(LocalDateTime.parse(dto.getMovementDate())) + .facilityId(dto.getFacilityId()) + .facilityName(dto.getFacilityName()) + .facilityType(dto.getFacilityType()) + .parentFacilityId(dto.getParentFacilityId()) + .parentFacilityName(dto.getParentFacilityName()) + .parentFacilityType(dto.getParentFacilityType()) + .countryCode(dto.getCountryCode()) + .countryName(dto.getCountryName()) + .draught(dto.getDraught()) + .latitude(dto.getLatitude()) + .longitude(dto.getLongitude()) + .position(positionNode) // JsonNode로 매핑 + .parentCallId(dto.getParentCallId()) + .iso2(dto.getIso2()) + .eventStartDate(LocalDateTime.parse(dto.getEventStartDate())) + .build(); + + return entity; + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/reader/BerthCallsReader.java b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/reader/BerthCallsReader.java new file mode 100644 index 0000000..9f6771d --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/reader/BerthCallsReader.java @@ -0,0 +1,213 @@ +package com.snp.batch.jobs.shipMovementBerthCalls.batch.reader; + +import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.dto.BerthCallsDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * 선박 상세 정보 Reader (v2.0 - Chunk 기반) + * + * 기능: + * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) + * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 + * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 + * 4. Spring Batch가 100건씩 Process → Write 수행 + * + * Chunk 처리 흐름: + * - beforeFetch() → IMO 전체 조회 (1회) + * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) + * - read() → 1건씩 반환 (100번) + * - Processor/Writer → 100건 처리 + * - 반복... (1,718번의 Chunk) + * + * 기존 방식과의 차이: + * - 기존: 17만건 전체 메모리 로드 → Process → Write + * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) + */ +@Slf4j +@StepScope +public class BerthCallsReader extends BaseApiReader { + + private final JdbcTemplate jdbcTemplate; + + // 배치 처리 상태 + private List allImoNumbers; + // DB 해시값을 저장할 맵 + private Map dbMasterHashes; + private int currentBatchIndex = 0; + private final int batchSize = 5; + + // @Value("#{jobParameters['startDate']}") +// private String startDate; + private String startDate = "2025-01-01"; + + // @Value("#{jobParameters['stopDate']}") +// private String stopDate; + private String stopDate = "2025-12-31"; + + public BerthCallsReader(WebClient webClient, JdbcTemplate jdbcTemplate ) { + super(webClient); + this.jdbcTemplate = jdbcTemplate; + enableChunkMode(); // ✨ Chunk 모드 활성화 + } + + @Override + protected String getReaderName() { + return "BerthCallsReader"; + } + + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allImoNumbers = null; + this.dbMasterHashes = null; + } + + @Override + protected String getApiPath() { + return "/Movements/BerthCalls"; + } + + @Override + protected String getApiBaseUrl() { + return "https://webservices.maritime.spglobal.com"; + } + + private static final String GET_ALL_IMO_QUERY = +// "SELECT imo_number FROM ship_data ORDER BY id"; + "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_berthcalls) ORDER BY imo_number"; + + /** + * 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회 + */ + @Override + protected void beforeFetch() { + // 전처리 과정 + // Step 1. IMO 전체 번호 조회 + log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName()); + + allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class); + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); + log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); + log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); + + // API 통계 초기화 + updateApiCallStats(totalBatches, 0); + } + + /** + * ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환 + * + * Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출 + * + * @return 다음 배치 100건 (더 이상 없으면 null) + */ + @Override + protected List fetchNextBatch() throws Exception { + + // 모든 배치 처리 완료 확인 + if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { + return null; // Job 종료 + } + + // 현재 배치의 시작/끝 인덱스 계산 + int startIndex = currentBatchIndex; + int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); + + // 현재 배치의 IMO 번호 추출 (100개) + List currentBatch = allImoNumbers.subList(startIndex, endIndex); + + int currentBatchNumber = (currentBatchIndex / batchSize) + 1; + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", + getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); + + try { + // IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...") + String imoParam = String.join(",", currentBatch); + + // API 호출 + List response = callApiWithBatch(imoParam); + + // 다음 배치로 인덱스 이동 + currentBatchIndex = endIndex; + + + // 응답 처리 + if (response != null ) { + List berthCalls = response; + log.info("[{}] 배치 {}/{} 완료: {} 건 조회", + getReaderName(), currentBatchNumber, totalBatches, berthCalls.size()); + + // API 호출 통계 업데이트 + updateApiCallStats(totalBatches, currentBatchNumber); + + // API 과부하 방지 (다음 배치 전 0.5초 대기) + if (currentBatchIndex < allImoNumbers.size()) { + Thread.sleep(500); + } + + return berthCalls; + + } else { + log.warn("[{}] 배치 {}/{} 응답 없음", + getReaderName(), currentBatchNumber, totalBatches); + + // API 호출 통계 업데이트 (실패도 카운트) + updateApiCallStats(totalBatches, currentBatchNumber); + + return Collections.emptyList(); + } + + } catch (Exception e) { + log.error("[{}] 배치 {}/{} 처리 중 오류: {}", + getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e); + + // 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용) + currentBatchIndex = endIndex; + + // 빈 리스트 반환 (Job 계속 진행) + return Collections.emptyList(); + } + } + + /** + * Query Parameter를 사용한 API 호출 + * + * @param lrno 쉼표로 연결된 IMO 번호 (예: "1000019,1000021,...") + * @return API 응답 + */ + private List callApiWithBatch(String lrno) { + String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno; + + log.debug("[{}] API 호출: {}", getReaderName(), url); + + return webClient.get() + .uri(url) + .retrieve() + .bodyToFlux(BerthCallsDto.class) + .collectList() + .block(); + } + + @Override + protected void afterFetch(List data) { + if (data == null) { + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); + log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", + getReaderName(), allImoNumbers.size()); + } + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/repository/BerthCallsRepository.java b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/repository/BerthCallsRepository.java new file mode 100644 index 0000000..df2d707 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/repository/BerthCallsRepository.java @@ -0,0 +1,14 @@ +package com.snp.batch.jobs.shipMovementBerthCalls.batch.repository; + +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity.BerthCallsEntity; + +import java.util.List; + +/** + * 선박 상세 정보 Repository 인터페이스 + */ + +public interface BerthCallsRepository { + void saveAll(List entities); +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/repository/BerthCallsRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/repository/BerthCallsRepositoryImpl.java new file mode 100644 index 0000000..757d8a4 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/repository/BerthCallsRepositoryImpl.java @@ -0,0 +1,192 @@ +package com.snp.batch.jobs.shipMovementBerthCalls.batch.repository; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.repository.BaseJdbcRepository; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.repository.AnchorageCallsRepository; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity.BerthCallsEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.List; + +/** + * 선박 상세 정보 Repository 구현체 + * BaseJdbcRepository를 상속하여 JDBC 기반 CRUD 구현 + */ +@Slf4j +@Repository("BerthCallsRepository") +public class BerthCallsRepositoryImpl extends BaseJdbcRepository + implements BerthCallsRepository { + + public BerthCallsRepositoryImpl(JdbcTemplate jdbcTemplate) { + super(jdbcTemplate); + } + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override + protected String getTableName() { + return "snp_data.t_berthcall"; + } + + @Override + protected String getEntityName() { + return "BerthCalls"; + } + + @Override + protected String extractId(BerthCallsEntity entity) { + return entity.getImolRorIHSNumber(); + } + + @Override + public String getInsertSql() { + return """ + INSERT INTO snp_data.t_berthcall( + imo, + mvmn_type, + mvmn_dt, + fclty_id, + fclty_nm, + fclty_type, + up_fclty_id, + up_fclty_nm, + up_fclty_type, + ntn_cd, + ntn_nm, + draft, + lat, + lon, + prnt_call_id, + iso2_ntn_cd, + evt_start_dt, + lcinfo + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (imo, mvmn_dt) + DO UPDATE SET + mvmn_type = EXCLUDED.mvmn_type, + mvmn_dt = EXCLUDED.mvmn_dt, + fclty_id = EXCLUDED.fclty_id, + fclty_nm = EXCLUDED.fclty_nm, + fclty_type = EXCLUDED.fclty_type, + up_fclty_id = EXCLUDED.up_fclty_id, + up_fclty_nm = EXCLUDED.up_fclty_nm, + up_fclty_type = EXCLUDED.up_fclty_type, + ntn_cd = EXCLUDED.ntn_cd, + ntn_nm = EXCLUDED.ntn_nm, + draft = EXCLUDED.draft, + lat = EXCLUDED.lat, + lon = EXCLUDED.lon, + prnt_call_id = EXCLUDED.prnt_call_id, + iso2_ntn_cd = EXCLUDED.iso2_ntn_cd, + evt_start_dt = EXCLUDED.evt_start_dt, + lcinfo = EXCLUDED.lcinfo + """; + } + + @Override + protected String getUpdateSql() { + return null; + } + + @Override + protected void setInsertParameters(PreparedStatement ps, BerthCallsEntity e) throws Exception { + int i = 1; + ps.setString(i++, e.getImolRorIHSNumber()); // imo + ps.setString(i++, e.getMovementType()); // mvmn_type + ps.setTimestamp(i++, e.getMovementDate() != null ? Timestamp.valueOf(e.getMovementDate()) : null); // mvmn_dt + ps.setObject(i++, e.getFacilityId()); // fclty_id + ps.setString(i++, e.getFacilityName()); // fclty_nm + ps.setString(i++, e.getFacilityType()); // fclty_type + ps.setObject(i++, e.getParentFacilityId()); //up_fclty_id + ps.setString(i++, e.getParentFacilityName()); // up_fclty_nm + ps.setString(i++, e.getParentFacilityType()); //up_fclty_type + ps.setString(i++, e.getCountryCode()); // ntn_cd + ps.setString(i++, e.getCountryName()); // ntn_nm + setDoubleOrNull(ps, i++, e.getDraught()); // draft + setDoubleOrNull(ps, i++, e.getLatitude()); // lat + setDoubleOrNull(ps, i++, e.getLongitude());// lon + ps.setObject(i++, e.getParentCallId()); //prnt_call_id + ps.setString(i++, e.getIso2()); // iso2_ntn_cd + ps.setTimestamp(i++, e.getEventStartDate() != null ? Timestamp.valueOf(e.getEventStartDate()) : null); // evt_start_dt + + if (e.getPosition() != null) { + ps.setObject(i++, OBJECT_MAPPER.writeValueAsString(e.getPosition()), java.sql.Types.OTHER); // lcinfo (jsonb) + } else { + ps.setNull(i++, java.sql.Types.OTHER); + } + } + + private void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception { + if (value != null) { + ps.setDouble(index, value); + } else { + // java.sql.Types.DOUBLE을 사용하여 명시적으로 SQL NULL을 설정 + ps.setNull(index, java.sql.Types.DOUBLE); + } + } + + @Override + protected void setUpdateParameters(PreparedStatement ps, BerthCallsEntity entity) throws Exception { + + } + + @Override + protected RowMapper getRowMapper() { + return null; + } + + @Override + public void saveAll(List entities) { + if (entities == null || entities.isEmpty()) return; + + log.info("BerthCalls 저장 시작 = {}건", entities.size()); + batchInsert(entities); + + } + + + /** + * ShipDetailEntity RowMapper + */ + private static class BerthCallsRowMapper implements RowMapper { + @Override + public BerthCallsEntity mapRow(ResultSet rs, int rowNum) throws SQLException { + BerthCallsEntity entity = BerthCallsEntity.builder() + .id(rs.getLong("id")) + .imolRorIHSNumber(rs.getString("imolRorIHSNumber")) + .facilityId(rs.getObject("facilityId", Integer.class)) + .facilityName(rs.getString("facilityName")) + .facilityType(rs.getString("facilityType")) + .countryCode(rs.getString("countryCode")) + .countryName(rs.getString("countryName")) + .draught(rs.getObject("draught", Double.class)) + .latitude(rs.getObject("latitude", Double.class)) + .longitude(rs.getObject("longitude", Double.class)) + .position(parseJson(rs.getString("position"))) + .build(); + + Timestamp movementDate = rs.getTimestamp("movementDate"); + if (movementDate != null) { + entity.setMovementDate(movementDate.toLocalDateTime()); + } + + return entity; + } + + private JsonNode parseJson(String json) { + try { + if (json == null) return null; + return new ObjectMapper().readTree(json); + } catch (Exception e) { + throw new RuntimeException("JSON 파싱 오류: " + json); + } + } + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/writer/BerthCallsWriter.java b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/writer/BerthCallsWriter.java new file mode 100644 index 0000000..03c1db0 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementBerthCalls/batch/writer/BerthCallsWriter.java @@ -0,0 +1,37 @@ +package com.snp.batch.jobs.shipMovementBerthCalls.batch.writer; + +import com.snp.batch.common.batch.writer.BaseWriter; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.repository.AnchorageCallsRepository; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity.BerthCallsEntity; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.repository.BerthCallsRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 선박 상세 정보 Writer + */ +@Slf4j +@Component +public class BerthCallsWriter extends BaseWriter { + + private final BerthCallsRepository berthCallsRepository; + + + public BerthCallsWriter(BerthCallsRepository berthCallsRepository) { + super("BerthCalls"); + this.berthCallsRepository = berthCallsRepository; + } + + @Override + protected void writeItems(List items) throws Exception { + + if (items.isEmpty()) { return; } + + berthCallsRepository.saveAll(items); + log.info("BerthCalls 데이터 저장: {} 건", items.size()); + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/config/DarkActivityJobConfig.java b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/config/DarkActivityJobConfig.java new file mode 100644 index 0000000..0076a18 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/config/DarkActivityJobConfig.java @@ -0,0 +1,106 @@ +package com.snp.batch.jobs.shipMovementDarkActivity.batch.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.jobs.shipMovementDarkActivity.batch.dto.DarkActivityDto; +import com.snp.batch.jobs.shipMovementDarkActivity.batch.entity.DarkActivityEntity; +import com.snp.batch.jobs.shipMovementDarkActivity.batch.processor.DarkActivityProcessor; +import com.snp.batch.jobs.shipMovementDarkActivity.batch.reader.DarkActivityReader; +import com.snp.batch.jobs.shipMovementDarkActivity.batch.writer.DarkActivityWriter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * 선박 상세 정보 Import Job Config + * + * 특징: + * - ship_data 테이블에서 IMO 번호 조회 + * - IMO 번호를 100개씩 배치로 분할 + * - Maritime API GetShipsByIHSLRorIMONumbers 호출 + * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 + * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) + * + * 데이터 흐름: + * ShipMovementReader (ship_data → Maritime API) + * ↓ (PortCallDto) + * ShipMovementProcessor + * ↓ (ShipMovementEntity) + * ShipDetailDataWriter + * ↓ (ship_movement 테이블) + */ + +@Slf4j +@Configuration +public class DarkActivityJobConfig extends BaseJobConfig { + + private final DarkActivityProcessor darkActivityProcessor; + private final DarkActivityWriter darkActivityWriter; + private final JdbcTemplate jdbcTemplate; + private final WebClient maritimeApiWebClient; + + public DarkActivityJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + DarkActivityProcessor darkActivityProcessor, + DarkActivityWriter darkActivityWriter, JdbcTemplate jdbcTemplate, + @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient, + ObjectMapper objectMapper) { // ObjectMapper 주입 추가 + super(jobRepository, transactionManager); + this.darkActivityProcessor = darkActivityProcessor; + this.darkActivityWriter = darkActivityWriter; + this.jdbcTemplate = jdbcTemplate; + this.maritimeApiWebClient = maritimeApiWebClient; + } + + @Override + protected String getJobName() { + return "DarkActivityImportJob"; + } + + @Override + protected String getStepName() { + return "DarkActivityImportStep"; + } + + @Override + protected ItemReader createReader() { // 타입 변경 + // Reader 생성자 수정: ObjectMapper를 전달합니다. + return new DarkActivityReader(maritimeApiWebClient, jdbcTemplate); + } + + @Override + protected ItemProcessor createProcessor() { + return darkActivityProcessor; + } + + @Override + protected ItemWriter createWriter() { // 타입 변경 + return darkActivityWriter; + } + + @Override + protected int getChunkSize() { + return 5; // API에서 100개씩 가져오므로 chunk도 100으로 설정 + } + + @Bean(name = "DarkActivityImportJob") + public Job darkActivityImportJob() { + return job(); + } + + @Bean(name = "DarkActivityImportStep") + public Step darkActivityImportStep() { + return step(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/dto/DarkActivityDto.java b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/dto/DarkActivityDto.java new file mode 100644 index 0000000..9cb7b81 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/dto/DarkActivityDto.java @@ -0,0 +1,30 @@ +package com.snp.batch.jobs.shipMovementDarkActivity.batch.dto; + +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.dto.AnchorageCallsPositionDto; +import lombok.Data; + +@Data +public class DarkActivityDto { + private String movementType; + private String imolRorIHSNumber; + private String movementDate; + + private Integer facilityId; + private String facilityName; + private String facilityType; + + private Integer subFacilityId; + private String subFacilityName; + private String subFacilityType; + + private String countryCode; + private String countryName; + + private Double draught; + private Double latitude; + private Double longitude; + + private AnchorageCallsPositionDto position; + + private String eventStartDate; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/entity/DarkActivityEntity.java b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/entity/DarkActivityEntity.java new file mode 100644 index 0000000..f05aea5 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/entity/DarkActivityEntity.java @@ -0,0 +1,41 @@ +package com.snp.batch.jobs.shipMovementDarkActivity.batch.entity; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.time.LocalDateTime; + +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +public class DarkActivityEntity { + + private Long id; + + private String movementType; + private String imolRorIHSNumber; + private LocalDateTime movementDate; + + private Integer facilityId; + private String facilityName; + private String facilityType; + + private Integer subFacilityId; + private String subFacilityName; + private String subFacilityType; + + private String countryCode; + private String countryName; + + private Double draught; + private Double latitude; + private Double longitude; + + private JsonNode position; + + private LocalDateTime eventStartDate; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/processor/DarkActivityProcessor.java b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/processor/DarkActivityProcessor.java new file mode 100644 index 0000000..e465f8a --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/processor/DarkActivityProcessor.java @@ -0,0 +1,66 @@ +package com.snp.batch.jobs.shipMovementDarkActivity.batch.processor; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.processor.BaseProcessor; +import com.snp.batch.jobs.shipMovementDarkActivity.batch.dto.DarkActivityDto; +import com.snp.batch.jobs.shipMovementDarkActivity.batch.entity.DarkActivityEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * 선박 상세 정보 Processor + * ShipDetailDto → ShipDetailEntity 변환 + */ + +/** + * 선박 상세 정보 Processor (해시 비교 및 증분 데이터 추출) + * I: ShipDetailComparisonData (DB 해시 + API Map Data) + * O: ShipDetailUpdate (변경분) + */ +@Slf4j +@Component +public class DarkActivityProcessor extends BaseProcessor { + + private final ObjectMapper objectMapper; + + public DarkActivityProcessor(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + protected DarkActivityEntity processItem(DarkActivityDto dto) throws Exception { + log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}", + dto.getImolRorIHSNumber(), dto.getFacilityName()); + + JsonNode positionNode = null; + if (dto.getPosition() != null) { + // Position 객체를 JsonNode로 변환 + positionNode = objectMapper.valueToTree(dto.getPosition()); + } + + DarkActivityEntity entity = DarkActivityEntity.builder() + .movementType(dto.getMovementType()) + .imolRorIHSNumber(dto.getImolRorIHSNumber()) + .movementDate(LocalDateTime.parse(dto.getMovementDate())) + .facilityId(dto.getFacilityId()) + .facilityName(dto.getFacilityName()) + .facilityType(dto.getFacilityType()) + .subFacilityId(dto.getSubFacilityId()) + .subFacilityName(dto.getSubFacilityName()) + .subFacilityType(dto.getSubFacilityType()) + .countryCode(dto.getCountryCode()) + .countryName(dto.getCountryName()) + .draught(dto.getDraught()) + .latitude(dto.getLatitude()) + .longitude(dto.getLongitude()) + .position(positionNode) // JsonNode로 매핑 + .eventStartDate(LocalDateTime.parse(dto.getEventStartDate())) + .build(); + + return entity; + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/reader/DarkActivityReader.java b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/reader/DarkActivityReader.java new file mode 100644 index 0000000..bb74cd0 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/reader/DarkActivityReader.java @@ -0,0 +1,212 @@ +package com.snp.batch.jobs.shipMovementDarkActivity.batch.reader; + +import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.shipMovementDarkActivity.batch.dto.DarkActivityDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * 선박 상세 정보 Reader (v2.0 - Chunk 기반) + * + * 기능: + * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) + * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 + * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 + * 4. Spring Batch가 100건씩 Process → Write 수행 + * + * Chunk 처리 흐름: + * - beforeFetch() → IMO 전체 조회 (1회) + * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) + * - read() → 1건씩 반환 (100번) + * - Processor/Writer → 100건 처리 + * - 반복... (1,718번의 Chunk) + * + * 기존 방식과의 차이: + * - 기존: 17만건 전체 메모리 로드 → Process → Write + * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) + */ +@Slf4j +@StepScope +public class DarkActivityReader extends BaseApiReader { + + private final JdbcTemplate jdbcTemplate; + + // 배치 처리 상태 + private List allImoNumbers; + // DB 해시값을 저장할 맵 + private Map dbMasterHashes; + private int currentBatchIndex = 0; + private final int batchSize = 5; + + // @Value("#{jobParameters['startDate']}") +// private String startDate; + private String startDate = "2025-01-01"; + + // @Value("#{jobParameters['stopDate']}") +// private String stopDate; + private String stopDate = "2025-12-31"; + + public DarkActivityReader(WebClient webClient, JdbcTemplate jdbcTemplate ) { + super(webClient); + this.jdbcTemplate = jdbcTemplate; + enableChunkMode(); // ✨ Chunk 모드 활성화 + } + + @Override + protected String getReaderName() { + return "DarkActivityReader"; + } + + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allImoNumbers = null; + this.dbMasterHashes = null; + } + + @Override + protected String getApiPath() { + return "/Movements/DarkActivity"; + } + + @Override + protected String getApiBaseUrl() { + return "https://webservices.maritime.spglobal.com"; + } + + private static final String GET_ALL_IMO_QUERY = +// "SELECT imo_number FROM ship_data ORDER BY id"; + "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_darkactivity) ORDER BY imo_number"; + + /** + * 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회 + */ + @Override + protected void beforeFetch() { + // 전처리 과정 + // Step 1. IMO 전체 번호 조회 + log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName()); + + allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class); + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); + log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); + log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); + + // API 통계 초기화 + updateApiCallStats(totalBatches, 0); + } + + /** + * ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환 + * + * Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출 + * + * @return 다음 배치 100건 (더 이상 없으면 null) + */ + @Override + protected List fetchNextBatch() throws Exception { + + // 모든 배치 처리 완료 확인 + if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { + return null; // Job 종료 + } + + // 현재 배치의 시작/끝 인덱스 계산 + int startIndex = currentBatchIndex; + int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); + + // 현재 배치의 IMO 번호 추출 (100개) + List currentBatch = allImoNumbers.subList(startIndex, endIndex); + + int currentBatchNumber = (currentBatchIndex / batchSize) + 1; + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", + getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); + + try { + // IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...") + String imoParam = String.join(",", currentBatch); + // API 호출 + List response = callApiWithBatch(imoParam); + + // 다음 배치로 인덱스 이동 + currentBatchIndex = endIndex; + + + // 응답 처리 + if (response != null ) { + List darkActivityList = response; + log.info("[{}] 배치 {}/{} 완료: {} 건 조회", + getReaderName(), currentBatchNumber, totalBatches, darkActivityList.size()); + + // API 호출 통계 업데이트 + updateApiCallStats(totalBatches, currentBatchNumber); + + // API 과부하 방지 (다음 배치 전 0.5초 대기) + if (currentBatchIndex < allImoNumbers.size()) { + Thread.sleep(500); + } + + return darkActivityList; + + } else { + log.warn("[{}] 배치 {}/{} 응답 없음", + getReaderName(), currentBatchNumber, totalBatches); + + // API 호출 통계 업데이트 (실패도 카운트) + updateApiCallStats(totalBatches, currentBatchNumber); + + return Collections.emptyList(); + } + + } catch (Exception e) { + log.error("[{}] 배치 {}/{} 처리 중 오류: {}", + getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e); + + // 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용) + currentBatchIndex = endIndex; + + // 빈 리스트 반환 (Job 계속 진행) + return Collections.emptyList(); + } + } + + /** + * Query Parameter를 사용한 API 호출 + * + * @param lrno 쉼표로 연결된 IMO 번호 (예: "1000019,1000021,...") + * @return API 응답 + */ + private List callApiWithBatch(String lrno) { + String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno; + + log.debug("[{}] API 호출: {}", getReaderName(), url); + + return webClient.get() + .uri(url) + .retrieve() + .bodyToFlux(DarkActivityDto.class) + .collectList() + .block(); + } + + @Override + protected void afterFetch(List data) { + if (data == null) { + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); + log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", + getReaderName(), allImoNumbers.size()); + } + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/repository/DarkActivityRepository.java b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/repository/DarkActivityRepository.java new file mode 100644 index 0000000..f18da07 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/repository/DarkActivityRepository.java @@ -0,0 +1,14 @@ +package com.snp.batch.jobs.shipMovementDarkActivity.batch.repository; + +import com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity.BerthCallsEntity; +import com.snp.batch.jobs.shipMovementDarkActivity.batch.entity.DarkActivityEntity; + +import java.util.List; + +/** + * 선박 상세 정보 Repository 인터페이스 + */ + +public interface DarkActivityRepository { + void saveAll(List entities); +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/repository/DarkActivityRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/repository/DarkActivityRepositoryImpl.java new file mode 100644 index 0000000..89eb8fd --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/repository/DarkActivityRepositoryImpl.java @@ -0,0 +1,186 @@ +package com.snp.batch.jobs.shipMovementDarkActivity.batch.repository; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.repository.BaseJdbcRepository; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity.BerthCallsEntity; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.repository.BerthCallsRepository; +import com.snp.batch.jobs.shipMovementDarkActivity.batch.entity.DarkActivityEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.List; + +/** + * 선박 상세 정보 Repository 구현체 + * BaseJdbcRepository를 상속하여 JDBC 기반 CRUD 구현 + */ +@Slf4j +@Repository("") +public class DarkActivityRepositoryImpl extends BaseJdbcRepository + implements DarkActivityRepository { + + public DarkActivityRepositoryImpl(JdbcTemplate jdbcTemplate) { + super(jdbcTemplate); + } + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override + protected String getTableName() { + return "snp_data.t_darkactivity"; + } + + @Override + protected String getEntityName() { + return "DarkActivity"; + } + + @Override + protected String extractId(DarkActivityEntity entity) { + return entity.getImolRorIHSNumber(); + } + + @Override + public String getInsertSql() { + return """ + INSERT INTO snp_data.t_darkactivity( + imo, + mvmn_type, + mvmn_dt, + fclty_id, + fclty_nm, + fclty_type, + lwrnk_fclty_id, + lwrnk_fclty_nm, + lwrnk_fclty_type, + ntn_cd, + ntn_nm, + draft, + lat, + lon, + evt_start_dt, + lcinfo + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (imo, mvmn_dt) + DO UPDATE SET + mvmn_type = EXCLUDED.mvmn_type, + mvmn_dt = EXCLUDED.mvmn_dt, + fclty_id = EXCLUDED.fclty_id, + fclty_nm = EXCLUDED.fclty_nm, + fclty_type = EXCLUDED.fclty_type, + lwrnk_fclty_id = EXCLUDED.lwrnk_fclty_id, + lwrnk_fclty_nm = EXCLUDED.lwrnk_fclty_nm, + lwrnk_fclty_type = EXCLUDED.lwrnk_fclty_type, + ntn_cd = EXCLUDED.ntn_cd, + ntn_nm = EXCLUDED.ntn_nm, + draft = EXCLUDED.draft, + lat = EXCLUDED.lat, + lon = EXCLUDED.lon, + evt_start_dt = EXCLUDED.evt_start_dt, + lcinfo = EXCLUDED.lcinfo + """; + } + + @Override + protected String getUpdateSql() { + return null; + } + + @Override + protected void setInsertParameters(PreparedStatement ps, DarkActivityEntity e) throws Exception { + int i = 1; + ps.setString(i++, e.getImolRorIHSNumber()); // imo + ps.setString(i++, e.getMovementType()); // mvmn_type + ps.setTimestamp(i++, e.getMovementDate() != null ? Timestamp.valueOf(e.getMovementDate()) : null); // mvmn_dt + ps.setObject(i++, e.getFacilityId()); // fclty_id + ps.setString(i++, e.getFacilityName()); // fclty_nm + ps.setString(i++, e.getFacilityType()); // fclty_type + ps.setObject(i++, e.getSubFacilityId()); //lwrnk_fclty_id + ps.setString(i++, e.getSubFacilityName()); // lwrnk_fclty_nm + ps.setString(i++, e.getSubFacilityType()); //lwrnk_fclty_type + ps.setString(i++, e.getCountryCode()); // ntn_cd + ps.setString(i++, e.getCountryName()); // ntn_nm + setDoubleOrNull(ps, i++, e.getDraught()); // draft + setDoubleOrNull(ps, i++, e.getLatitude()); // lat + setDoubleOrNull(ps, i++, e.getLongitude());// lon + ps.setTimestamp(i++, e.getEventStartDate() != null ? Timestamp.valueOf(e.getEventStartDate()) : null); // evt_start_dt + + if (e.getPosition() != null) { + ps.setObject(i++, OBJECT_MAPPER.writeValueAsString(e.getPosition()), java.sql.Types.OTHER); // lcinfo (jsonb) + } else { + ps.setNull(i++, java.sql.Types.OTHER); + } + } + + private void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception { + if (value != null) { + ps.setDouble(index, value); + } else { + // java.sql.Types.DOUBLE을 사용하여 명시적으로 SQL NULL을 설정 + ps.setNull(index, java.sql.Types.DOUBLE); + } + } + + @Override + protected void setUpdateParameters(PreparedStatement ps, DarkActivityEntity entity) throws Exception { + + } + + @Override + protected RowMapper getRowMapper() { + return null; + } + + @Override + public void saveAll(List entities) { + if (entities == null || entities.isEmpty()) return; + + log.info("DarkActivity 저장 시작 = {}건", entities.size()); + batchInsert(entities); + + } + + + /** + * ShipDetailEntity RowMapper + */ + private static class DarkActivityRowMapper implements RowMapper { + @Override + public DarkActivityEntity mapRow(ResultSet rs, int rowNum) throws SQLException { + DarkActivityEntity entity = DarkActivityEntity.builder() + .id(rs.getLong("id")) + .imolRorIHSNumber(rs.getString("imolRorIHSNumber")) + .facilityId(rs.getObject("facilityId", Integer.class)) + .facilityName(rs.getString("facilityName")) + .facilityType(rs.getString("facilityType")) + .countryCode(rs.getString("countryCode")) + .countryName(rs.getString("countryName")) + .draught(rs.getObject("draught", Double.class)) + .latitude(rs.getObject("latitude", Double.class)) + .longitude(rs.getObject("longitude", Double.class)) + .position(parseJson(rs.getString("position"))) + .build(); + + Timestamp movementDate = rs.getTimestamp("movementDate"); + if (movementDate != null) { + entity.setMovementDate(movementDate.toLocalDateTime()); + } + + return entity; + } + + private JsonNode parseJson(String json) { + try { + if (json == null) return null; + return new ObjectMapper().readTree(json); + } catch (Exception e) { + throw new RuntimeException("JSON 파싱 오류: " + json); + } + } + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/writer/DarkActivityWriter.java b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/writer/DarkActivityWriter.java new file mode 100644 index 0000000..901876c --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementDarkActivity/batch/writer/DarkActivityWriter.java @@ -0,0 +1,37 @@ +package com.snp.batch.jobs.shipMovementDarkActivity.batch.writer; + +import com.snp.batch.common.batch.writer.BaseWriter; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity.BerthCallsEntity; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.repository.BerthCallsRepository; +import com.snp.batch.jobs.shipMovementDarkActivity.batch.entity.DarkActivityEntity; +import com.snp.batch.jobs.shipMovementDarkActivity.batch.repository.DarkActivityRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 선박 상세 정보 Writer + */ +@Slf4j +@Component +public class DarkActivityWriter extends BaseWriter { + + private final DarkActivityRepository darkActivityRepository; + + + public DarkActivityWriter(DarkActivityRepository darkActivityRepository) { + super("DarkActivity"); + this.darkActivityRepository = darkActivityRepository; + } + + @Override + protected void writeItems(List items) throws Exception { + + if (items.isEmpty()) { return; } + + darkActivityRepository.saveAll(items); + log.info("DarkActivity 데이터 저장: {} 건", items.size()); + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/config/StsOperationJobConfig.java b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/config/StsOperationJobConfig.java new file mode 100644 index 0000000..1d57569 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/config/StsOperationJobConfig.java @@ -0,0 +1,110 @@ +package com.snp.batch.jobs.shipMovementStsOperations.batch.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.dto.BerthCallsDto; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity.BerthCallsEntity; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.processor.BerthCallsProcessor; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.reader.BerthCallsReader; +import com.snp.batch.jobs.shipMovementBerthCalls.batch.writer.BerthCallsWriter; +import com.snp.batch.jobs.shipMovementStsOperations.batch.dto.StsOperationDto; +import com.snp.batch.jobs.shipMovementStsOperations.batch.entity.StsOperationEntity; +import com.snp.batch.jobs.shipMovementStsOperations.batch.processor.StsOperationProcessor; +import com.snp.batch.jobs.shipMovementStsOperations.batch.reader.StsOperationReader; +import com.snp.batch.jobs.shipMovementStsOperations.batch.writer.StsOperationWriter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * 선박 상세 정보 Import Job Config + * + * 특징: + * - ship_data 테이블에서 IMO 번호 조회 + * - IMO 번호를 100개씩 배치로 분할 + * - Maritime API GetShipsByIHSLRorIMONumbers 호출 + * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 + * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) + * + * 데이터 흐름: + * ShipMovementReader (ship_data → Maritime API) + * ↓ (PortCallDto) + * ShipMovementProcessor + * ↓ (ShipMovementEntity) + * ShipDetailDataWriter + * ↓ (ship_movement 테이블) + */ + +@Slf4j +@Configuration +public class StsOperationJobConfig extends BaseJobConfig { + + private final StsOperationProcessor stsOperationProcessor; + private final StsOperationWriter stsOperationWriter; + private final JdbcTemplate jdbcTemplate; + private final WebClient maritimeApiWebClient; + + public StsOperationJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + StsOperationProcessor stsOperationProcessor, + StsOperationWriter stsOperationWriter, JdbcTemplate jdbcTemplate, + @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 + super(jobRepository, transactionManager); + this.stsOperationProcessor = stsOperationProcessor; + this.stsOperationWriter = stsOperationWriter; + this.jdbcTemplate = jdbcTemplate; + this.maritimeApiWebClient = maritimeApiWebClient; + } + + @Override + protected String getJobName() { + return "STSOperationImportJob"; + } + + @Override + protected String getStepName() { + return "STSOperationImportStep"; + } + + @Override + protected ItemReader createReader() { // 타입 변경 + // Reader 생성자 수정: ObjectMapper를 전달합니다. + return new StsOperationReader(maritimeApiWebClient, jdbcTemplate); + } + + @Override + protected ItemProcessor createProcessor() { + return stsOperationProcessor; + } + + @Override + protected ItemWriter createWriter() { // 타입 변경 + return stsOperationWriter; + } + + @Override + protected int getChunkSize() { + return 200; // API에서 100개씩 가져오므로 chunk도 100으로 설정 + } + + @Bean(name = "STSOperationImportJob") + public Job stsOperationImportJob() { + return job(); + } + + @Bean(name = "STSOperationImportStep") + public Step stsOperationImportStep() { + return step(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/dto/StsOperationDto.java b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/dto/StsOperationDto.java new file mode 100644 index 0000000..16c8db8 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/dto/StsOperationDto.java @@ -0,0 +1,35 @@ +package com.snp.batch.jobs.shipMovementStsOperations.batch.dto; + +import com.snp.batch.jobs.shipMovementBerthCalls.batch.dto.BerthCallsPositionDto; +import lombok.Data; + +@Data +public class StsOperationDto { + private String movementType; + private String imolRorIHSNumber; + private String movementDate; + + private Integer facilityId; + private String facilityName; + private String facilityType; + + private Integer parentFacilityId; + private String parentFacilityName; + private String parentFacilityType; + + private Double draught; + private Double latitude; + private Double longitude; + + private StsOperationPositionDto position; + + private Long parentCallId; + + private String countryCode; + private String countryName; + + private String stsLocation; + private String stsType; + + private String eventStartDate; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/dto/StsOperationPositionDto.java b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/dto/StsOperationPositionDto.java new file mode 100644 index 0000000..85496f0 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/dto/StsOperationPositionDto.java @@ -0,0 +1,17 @@ +package com.snp.batch.jobs.shipMovementStsOperations.batch.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +@Data +public class StsOperationPositionDto { + private boolean isNull; + private int stSrid; + private double lat; + @JsonProperty("long") + private double lon; + private double z; + private double m; + private boolean hasZ; + private boolean hasM; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/entity/StsOperationEntity.java b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/entity/StsOperationEntity.java new file mode 100644 index 0000000..e47acf0 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/entity/StsOperationEntity.java @@ -0,0 +1,45 @@ +package com.snp.batch.jobs.shipMovementStsOperations.batch.entity; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.time.LocalDateTime; + +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +public class StsOperationEntity { + + private Long id; + + private String movementType; + private String imolRorIHSNumber; + private java.time.LocalDateTime movementDate; + + private Integer facilityId; + private String facilityName; + private String facilityType; + + private Integer parentFacilityId; + private String parentFacilityName; + private String parentFacilityType; + + private Double draught; + private Double latitude; + private Double longitude; + + private JsonNode position; + + private Long parentCallId; + + private String countryCode; + private String countryName; + + private String stsLocation; + private String stsType; + private LocalDateTime eventStartDate; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/processor/StsOperationProcessor.java b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/processor/StsOperationProcessor.java new file mode 100644 index 0000000..fdb73bd --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/processor/StsOperationProcessor.java @@ -0,0 +1,69 @@ +package com.snp.batch.jobs.shipMovementStsOperations.batch.processor; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.processor.BaseProcessor; +import com.snp.batch.jobs.shipMovementStsOperations.batch.dto.StsOperationDto; +import com.snp.batch.jobs.shipMovementStsOperations.batch.entity.StsOperationEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * 선박 상세 정보 Processor + * ShipDetailDto → ShipDetailEntity 변환 + */ + +/** + * 선박 상세 정보 Processor (해시 비교 및 증분 데이터 추출) + * I: ShipDetailComparisonData (DB 해시 + API Map Data) + * O: ShipDetailUpdate (변경분) + */ +@Slf4j +@Component +public class StsOperationProcessor extends BaseProcessor { + + private final ObjectMapper objectMapper; + + public StsOperationProcessor(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + protected StsOperationEntity processItem(StsOperationDto dto) throws Exception { + log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}", + dto.getImolRorIHSNumber(), dto.getFacilityName()); + + JsonNode positionNode = null; + if (dto.getPosition() != null) { + // Position 객체를 JsonNode로 변환 + positionNode = objectMapper.valueToTree(dto.getPosition()); + } + + StsOperationEntity entity = StsOperationEntity.builder() + .movementType(dto.getMovementType()) + .imolRorIHSNumber(dto.getImolRorIHSNumber()) + .movementDate(LocalDateTime.parse(dto.getMovementDate())) + .facilityId(dto.getFacilityId()) + .facilityName(dto.getFacilityName()) + .facilityType(dto.getFacilityType()) + .parentFacilityId(dto.getParentFacilityId()) + .parentFacilityName(dto.getParentFacilityName()) + .parentFacilityType(dto.getParentFacilityType()) + .draught(dto.getDraught()) + .latitude(dto.getLatitude()) + .longitude(dto.getLongitude()) + .position(positionNode) // JsonNode로 매핑 + .parentCallId(dto.getParentCallId()) + .countryCode(dto.getCountryCode()) + .countryName(dto.getCountryName()) + .stsLocation(dto.getStsLocation()) + .stsType(dto.getStsType()) + .eventStartDate(LocalDateTime.parse(dto.getEventStartDate())) + .build(); + + return entity; + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/reader/StsOperationReader.java b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/reader/StsOperationReader.java new file mode 100644 index 0000000..a6c286c --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/reader/StsOperationReader.java @@ -0,0 +1,213 @@ +package com.snp.batch.jobs.shipMovementStsOperations.batch.reader; + +import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.shipMovementStsOperations.batch.dto.StsOperationDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * 선박 상세 정보 Reader (v2.0 - Chunk 기반) + * + * 기능: + * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) + * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 + * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 + * 4. Spring Batch가 100건씩 Process → Write 수행 + * + * Chunk 처리 흐름: + * - beforeFetch() → IMO 전체 조회 (1회) + * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) + * - read() → 1건씩 반환 (100번) + * - Processor/Writer → 100건 처리 + * - 반복... (1,718번의 Chunk) + * + * 기존 방식과의 차이: + * - 기존: 17만건 전체 메모리 로드 → Process → Write + * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) + */ +@Slf4j +@StepScope +public class StsOperationReader extends BaseApiReader { + + private final JdbcTemplate jdbcTemplate; + + // 배치 처리 상태 + private List allImoNumbers; + // DB 해시값을 저장할 맵 + private Map dbMasterHashes; + private int currentBatchIndex = 0; + private final int batchSize = 5; + + // @Value("#{jobParameters['startDate']}") +// private String startDate; + private String startDate = "2025-01-01"; + + // @Value("#{jobParameters['stopDate']}") +// private String stopDate; + private String stopDate = "2025-12-31"; + + public StsOperationReader(WebClient webClient, JdbcTemplate jdbcTemplate ) { + super(webClient); + this.jdbcTemplate = jdbcTemplate; + enableChunkMode(); // ✨ Chunk 모드 활성화 + } + + @Override + protected String getReaderName() { + return "StsOperationReader"; + } + + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allImoNumbers = null; + this.dbMasterHashes = null; + } + + @Override + protected String getApiPath() { + return "/Movements/StsOperations"; + } + + @Override + protected String getApiBaseUrl() { + return "https://webservices.maritime.spglobal.com"; + } + + private static final String GET_ALL_IMO_QUERY = +// "SELECT imo_number FROM ship_data ORDER BY id"; + "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_stsoperation) ORDER BY imo_number"; + + /** + * 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회 + */ + @Override + protected void beforeFetch() { + // 전처리 과정 + // Step 1. IMO 전체 번호 조회 + log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName()); + + allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class); + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); + log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); + log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); + + // API 통계 초기화 + updateApiCallStats(totalBatches, 0); + } + + /** + * ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환 + * + * Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출 + * + * @return 다음 배치 100건 (더 이상 없으면 null) + */ + @Override + protected List fetchNextBatch() throws Exception { + + // 모든 배치 처리 완료 확인 + if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { + return null; // Job 종료 + } + + // 현재 배치의 시작/끝 인덱스 계산 + int startIndex = currentBatchIndex; + int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); + + // 현재 배치의 IMO 번호 추출 (100개) + List currentBatch = allImoNumbers.subList(startIndex, endIndex); + + int currentBatchNumber = (currentBatchIndex / batchSize) + 1; + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", + getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); + + try { + // IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...") + String imoParam = String.join(",", currentBatch); + + // API 호출 + List response = callApiWithBatch(imoParam); + + // 다음 배치로 인덱스 이동 + currentBatchIndex = endIndex; + + + // 응답 처리 + if (response != null ) { + List responseList = response; + log.info("[{}] 배치 {}/{} 완료: {} 건 조회", + getReaderName(), currentBatchNumber, totalBatches, responseList.size()); + + // API 호출 통계 업데이트 + updateApiCallStats(totalBatches, currentBatchNumber); + + // API 과부하 방지 (다음 배치 전 0.5초 대기) + if (currentBatchIndex < allImoNumbers.size()) { + Thread.sleep(500); + } + + return responseList; + + } else { + log.warn("[{}] 배치 {}/{} 응답 없음", + getReaderName(), currentBatchNumber, totalBatches); + + // API 호출 통계 업데이트 (실패도 카운트) + updateApiCallStats(totalBatches, currentBatchNumber); + + return Collections.emptyList(); + } + + } catch (Exception e) { + log.error("[{}] 배치 {}/{} 처리 중 오류: {}", + getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e); + + // 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용) + currentBatchIndex = endIndex; + + // 빈 리스트 반환 (Job 계속 진행) + return Collections.emptyList(); + } + } + + /** + * Query Parameter를 사용한 API 호출 + * + * @param lrno 쉼표로 연결된 IMO 번호 (예: "1000019,1000021,...") + * @return API 응답 + */ + private List callApiWithBatch(String lrno) { + String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno; + + log.debug("[{}] API 호출: {}", getReaderName(), url); + + return webClient.get() + .uri(url) + .retrieve() + .bodyToFlux(StsOperationDto.class) + .collectList() + .block(); + } + + @Override + protected void afterFetch(List data) { + if (data == null) { + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); + log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", + getReaderName(), allImoNumbers.size()); + } + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/repository/StsOperationRepository.java b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/repository/StsOperationRepository.java new file mode 100644 index 0000000..a081c51 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/repository/StsOperationRepository.java @@ -0,0 +1,12 @@ +package com.snp.batch.jobs.shipMovementStsOperations.batch.repository; + +import com.snp.batch.jobs.shipMovementStsOperations.batch.entity.StsOperationEntity; +import java.util.List; + +/** + * 선박 상세 정보 Repository 인터페이스 + */ + +public interface StsOperationRepository { + void saveAll(List entities); +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/repository/StsOperationRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/repository/StsOperationRepositoryImpl.java new file mode 100644 index 0000000..06cb967 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/repository/StsOperationRepositoryImpl.java @@ -0,0 +1,162 @@ +package com.snp.batch.jobs.shipMovementStsOperations.batch.repository; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.repository.BaseJdbcRepository; +import com.snp.batch.jobs.shipMovementStsOperations.batch.entity.StsOperationEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.List; + +/** + * 선박 상세 정보 Repository 구현체 + * BaseJdbcRepository를 상속하여 JDBC 기반 CRUD 구현 + */ +@Slf4j +@Repository("StsOperationRepository") +public class StsOperationRepositoryImpl extends BaseJdbcRepository + implements StsOperationRepository { + + public StsOperationRepositoryImpl(JdbcTemplate jdbcTemplate) { + super(jdbcTemplate); + } + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override + protected String getTableName() { + return "snp_data.t_stsoperation"; + } + + @Override + protected String getEntityName() { + return "StsOperation"; + } + + @Override + protected String extractId(StsOperationEntity entity) { + return entity.getImolRorIHSNumber(); + } + + @Override + public String getInsertSql() { + return """ + INSERT INTO snp_data.t_stsoperation( + imo, + mvmn_type, + mvmn_dt, + fclty_id, + fclty_nm, + fclty_type, + up_fclty_id, + up_fclty_nm, + up_fclty_type, + draft, + lat, + lon, + prnt_call_id, + ntn_cd, + ntn_nm, + sts_location, + sts_type, + evt_start_dt, + lcinfo + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (imo, mvmn_dt) + DO UPDATE SET + mvmn_type = EXCLUDED.mvmn_type, + mvmn_dt = EXCLUDED.mvmn_dt, + fclty_id = EXCLUDED.fclty_id, + fclty_nm = EXCLUDED.fclty_nm, + fclty_type = EXCLUDED.fclty_type, + up_fclty_id = EXCLUDED.up_fclty_id, + up_fclty_nm = EXCLUDED.up_fclty_nm, + up_fclty_type = EXCLUDED.up_fclty_type, + draft = EXCLUDED.draft, + lat = EXCLUDED.lat, + lon = EXCLUDED.lon, + prnt_call_id = EXCLUDED.prnt_call_id, + ntn_cd = EXCLUDED.ntn_cd, + ntn_nm = EXCLUDED.ntn_nm, + sts_location = EXCLUDED.sts_location, + sts_type = EXCLUDED.sts_type, + evt_start_dt = EXCLUDED.evt_start_dt, + lcinfo = EXCLUDED.lcinfo + """; + } + + @Override + protected String getUpdateSql() { + return null; + } + + @Override + protected void setInsertParameters(PreparedStatement ps, StsOperationEntity e) throws Exception { + int i = 1; + ps.setString(i++, safeString(e.getImolRorIHSNumber())); // imo + ps.setString(i++, safeString(e.getMovementType())); // mvmn_type + ps.setTimestamp(i++, e.getMovementDate() != null ? Timestamp.valueOf(e.getMovementDate()) : null); // mvmn_dt + ps.setObject(i++, e.getFacilityId()); // fclty_id + ps.setString(i++, safeString(e.getFacilityName())); // fclty_nm + ps.setString(i++, safeString(e.getFacilityType())); // fclty_type + ps.setObject(i++, e.getParentFacilityId()); //up_fclty_id + ps.setString(i++, safeString(e.getParentFacilityName())); // up_fclty_nm + ps.setString(i++, safeString(e.getParentFacilityType())); //up_fclty_type + setDoubleOrNull(ps, i++, e.getDraught()); // draft + setDoubleOrNull(ps, i++, e.getLatitude()); // lat + setDoubleOrNull(ps, i++, e.getLongitude());// lon + ps.setObject(i++, e.getParentCallId()); //prnt_call_id + ps.setString(i++, safeString(e.getCountryCode())); // ntn_cd + ps.setString(i++, safeString(e.getCountryName())); // ntn_nm + ps.setString(i++, safeString(e.getStsLocation())); // iso2_ntn_cd + ps.setString(i++, safeString(e.getStsType())); + ps.setTimestamp(i++, e.getEventStartDate() != null ? Timestamp.valueOf(e.getEventStartDate()) : null); // evt_start_dt + + if (e.getPosition() != null) { + ps.setObject(i++, OBJECT_MAPPER.writeValueAsString(e.getPosition()), java.sql.Types.OTHER); // lcinfo (jsonb) + } else { + ps.setNull(i++, java.sql.Types.OTHER); + } + } + + private void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception { + if (value != null) { + ps.setDouble(index, value); + } else { + // java.sql.Types.DOUBLE을 사용하여 명시적으로 SQL NULL을 설정 + ps.setNull(index, java.sql.Types.DOUBLE); + } + } + + @Override + protected void setUpdateParameters(PreparedStatement ps, StsOperationEntity entity) throws Exception { + + } + + @Override + protected RowMapper getRowMapper() { + return null; + } + + @Override + public void saveAll(List entities) { + if (entities == null || entities.isEmpty()) return; + + log.info("StsOperation 저장 시작 = {}건", entities.size()); + batchInsert(entities); + + } + private String safeString(String v) { + if (v == null) return null; + + v = v.trim(); + + return v.isEmpty() ? null : v; + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/writer/StsOperationWriter.java b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/writer/StsOperationWriter.java new file mode 100644 index 0000000..44c5536 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementStsOperations/batch/writer/StsOperationWriter.java @@ -0,0 +1,36 @@ +package com.snp.batch.jobs.shipMovementStsOperations.batch.writer; + +import com.snp.batch.common.batch.writer.BaseWriter; +import com.snp.batch.jobs.shipMovementStsOperations.batch.entity.StsOperationEntity; +import com.snp.batch.jobs.shipMovementStsOperations.batch.repository.StsOperationRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 선박 상세 정보 Writer + */ +@Slf4j +@Component +public class StsOperationWriter extends BaseWriter { + + private final StsOperationRepository stsOperationRepository; + + + public StsOperationWriter(StsOperationRepository stsOperationRepository) { + super("StsOperation"); + this.stsOperationRepository = stsOperationRepository; + } + + @Override + protected void writeItems(List items) throws Exception { + + if (items.isEmpty()) { return; } + + stsOperationRepository.saveAll(items); + log.info("STS OPERATION 데이터 저장: {} 건", items.size()); + + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/config/TerminalCallsJobConfig.java b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/config/TerminalCallsJobConfig.java new file mode 100644 index 0000000..a298f9e --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/config/TerminalCallsJobConfig.java @@ -0,0 +1,104 @@ +package com.snp.batch.jobs.shipMovementTerminalCalls.batch.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.jobs.shipMovementTerminalCalls.batch.dto.TerminalCallsDto; +import com.snp.batch.jobs.shipMovementTerminalCalls.batch.entity.TerminalCallsEntity; +import com.snp.batch.jobs.shipMovementTerminalCalls.batch.processor.TerminalCallsProcessor; +import com.snp.batch.jobs.shipMovementTerminalCalls.batch.reader.TerminalCallsReader; +import com.snp.batch.jobs.shipMovementTerminalCalls.batch.writer.TerminalCallsWriter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * 선박 상세 정보 Import Job Config + * + * 특징: + * - ship_data 테이블에서 IMO 번호 조회 + * - IMO 번호를 100개씩 배치로 분할 + * - Maritime API GetShipsByIHSLRorIMONumbers 호출 + * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 + * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) + * + * 데이터 흐름: + * ShipMovementReader (ship_data → Maritime API) + * ↓ (PortCallDto) + * ShipMovementProcessor + * ↓ (ShipMovementEntity) + * ShipDetailDataWriter + * ↓ (ship_movement 테이블) + */ + +@Slf4j +@Configuration +public class TerminalCallsJobConfig extends BaseJobConfig { + + private final TerminalCallsProcessor terminalCallsProcessor; + private final TerminalCallsWriter terminalCallsWriter; + private final JdbcTemplate jdbcTemplate; + private final WebClient maritimeApiWebClient; + + public TerminalCallsJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + TerminalCallsProcessor terminalCallsProcessor, + TerminalCallsWriter terminalCallsWriter, JdbcTemplate jdbcTemplate, + @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 + super(jobRepository, transactionManager); + this.terminalCallsProcessor = terminalCallsProcessor; + this.terminalCallsWriter = terminalCallsWriter; + this.jdbcTemplate = jdbcTemplate; + this.maritimeApiWebClient = maritimeApiWebClient; + } + + @Override + protected String getJobName() { + return "TerminalCallsImportJob"; + } + + @Override + protected String getStepName() { + return "TerminalCallImportStep"; + } + + @Override + protected ItemReader createReader() { // 타입 변경 + return new TerminalCallsReader(maritimeApiWebClient, jdbcTemplate); + } + + @Override + protected ItemProcessor createProcessor() { + return terminalCallsProcessor; + } + + @Override + protected ItemWriter createWriter() { // 타입 변경 + return terminalCallsWriter; + } + + @Override + protected int getChunkSize() { + return 1000; // API에서 100개씩 가져오므로 chunk도 100으로 설정 + } + + @Bean(name = "TerminalCallsImportJob") + public Job terminalCallsImportJob() { + return job(); + } + + @Bean(name = "TerminalCallImportStep") + public Step terminalCallImportStep() { + return step(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/dto/TerminalCallsDto.java b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/dto/TerminalCallsDto.java new file mode 100644 index 0000000..d35a7d7 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/dto/TerminalCallsDto.java @@ -0,0 +1,32 @@ +package com.snp.batch.jobs.shipMovementTerminalCalls.batch.dto; + +import lombok.Data; + +@Data +public class TerminalCallsDto { + private String movementType; + private String imolRorIHSNumber; + private String movementDate; + + + private Integer facilityId; + private String facilityName; + private String facilityType; + + private Integer parentFacilityId; + private String parentFacilityName; + private String parentFacilityType; + + private String countryCode; + private String countryName; + + private Double draught; + private Double latitude; + private Double longitude; + + private TerminalCallsPositionDto position; + + private Integer parentCallId; + private String iso2; + private String eventStartDate; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/dto/TerminalCallsPositionDto.java b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/dto/TerminalCallsPositionDto.java new file mode 100644 index 0000000..844f8bb --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/dto/TerminalCallsPositionDto.java @@ -0,0 +1,17 @@ +package com.snp.batch.jobs.shipMovementTerminalCalls.batch.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +@Data +public class TerminalCallsPositionDto { + private boolean isNull; + private int stSrid; + private double lat; + @JsonProperty("long") + private double lon; + private double z; + private double m; + private boolean hasZ; + private boolean hasM; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/entity/TerminalCallsEntity.java b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/entity/TerminalCallsEntity.java new file mode 100644 index 0000000..a003375 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/entity/TerminalCallsEntity.java @@ -0,0 +1,43 @@ +package com.snp.batch.jobs.shipMovementTerminalCalls.batch.entity; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.time.LocalDateTime; + +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +public class TerminalCallsEntity { + + private Long id; + + private String movementType; + private String imolRorIHSNumber; + private LocalDateTime movementDate; + + private Integer facilityId; + private String facilityName; + private String facilityType; + + private Integer parentFacilityId; + private String parentFacilityName; + private String parentFacilityType; + + private String countryCode; + private String countryName; + + private Double draught; + private Double latitude; + private Double longitude; + + private JsonNode position; + + private Integer parentCallId; + private String iso2; + private LocalDateTime eventStartDate; +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/processor/TerminalCallsProcessor.java b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/processor/TerminalCallsProcessor.java new file mode 100644 index 0000000..8438cc4 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/processor/TerminalCallsProcessor.java @@ -0,0 +1,68 @@ +package com.snp.batch.jobs.shipMovementTerminalCalls.batch.processor; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.processor.BaseProcessor; +import com.snp.batch.jobs.shipMovementTerminalCalls.batch.dto.TerminalCallsDto; +import com.snp.batch.jobs.shipMovementTerminalCalls.batch.entity.TerminalCallsEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * 선박 상세 정보 Processor + * ShipDetailDto → ShipDetailEntity 변환 + */ + +/** + * 선박 상세 정보 Processor (해시 비교 및 증분 데이터 추출) + * I: ShipDetailComparisonData (DB 해시 + API Map Data) + * O: ShipDetailUpdate (변경분) + */ +@Slf4j +@Component +public class TerminalCallsProcessor extends BaseProcessor { + + private final ObjectMapper objectMapper; + + public TerminalCallsProcessor(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + protected TerminalCallsEntity processItem(TerminalCallsDto dto) throws Exception { + log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}", + dto.getImolRorIHSNumber(), dto.getFacilityName()); + + JsonNode positionNode = null; + if (dto.getPosition() != null) { + // Position 객체를 JsonNode로 변환 + positionNode = objectMapper.valueToTree(dto.getPosition()); + } + + TerminalCallsEntity entity = TerminalCallsEntity.builder() + .movementType(dto.getMovementType()) + .imolRorIHSNumber(dto.getImolRorIHSNumber()) + .movementDate(LocalDateTime.parse(dto.getMovementDate())) + .facilityId(dto.getFacilityId()) + .facilityName(dto.getFacilityName()) + .facilityType(dto.getFacilityType()) + .parentFacilityId(dto.getParentFacilityId()) + .parentFacilityName(dto.getParentFacilityName()) + .parentFacilityType(dto.getParentFacilityType()) + .countryCode(dto.getCountryCode()) + .countryName(dto.getCountryName()) + .draught(dto.getDraught()) + .latitude(dto.getLatitude()) + .longitude(dto.getLongitude()) + .position(positionNode) // JsonNode로 매핑 + .parentCallId(dto.getParentCallId()) + .iso2(dto.getIso2()) + .eventStartDate(LocalDateTime.parse(dto.getEventStartDate())) + .build(); + + return entity; + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/reader/TerminalCallsReader.java b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/reader/TerminalCallsReader.java new file mode 100644 index 0000000..9018360 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/reader/TerminalCallsReader.java @@ -0,0 +1,213 @@ +package com.snp.batch.jobs.shipMovementTerminalCalls.batch.reader; + +import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.shipMovementTerminalCalls.batch.dto.TerminalCallsDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * 선박 상세 정보 Reader (v2.0 - Chunk 기반) + * + * 기능: + * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) + * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 + * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 + * 4. Spring Batch가 100건씩 Process → Write 수행 + * + * Chunk 처리 흐름: + * - beforeFetch() → IMO 전체 조회 (1회) + * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) + * - read() → 1건씩 반환 (100번) + * - Processor/Writer → 100건 처리 + * - 반복... (1,718번의 Chunk) + * + * 기존 방식과의 차이: + * - 기존: 17만건 전체 메모리 로드 → Process → Write + * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) + */ +@Slf4j +@StepScope +public class TerminalCallsReader extends BaseApiReader { + + private final JdbcTemplate jdbcTemplate; + + // 배치 처리 상태 + private List allImoNumbers; + // DB 해시값을 저장할 맵 + private Map dbMasterHashes; + private int currentBatchIndex = 0; + private final int batchSize = 5; + + // @Value("#{jobParameters['startDate']}") +// private String startDate; + private String startDate = "2025-01-01"; + + // @Value("#{jobParameters['stopDate']}") +// private String stopDate; + private String stopDate = "2025-12-31"; + + public TerminalCallsReader(WebClient webClient, JdbcTemplate jdbcTemplate ) { + super(webClient); + this.jdbcTemplate = jdbcTemplate; + enableChunkMode(); // ✨ Chunk 모드 활성화 + } + + @Override + protected String getReaderName() { + return "TerminalCalls"; + } + + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allImoNumbers = null; + this.dbMasterHashes = null; + } + + @Override + protected String getApiPath() { + return "/Movements/TerminalCalls"; + } + + @Override + protected String getApiBaseUrl() { + return "https://webservices.maritime.spglobal.com"; + } + + private static final String GET_ALL_IMO_QUERY = +// "SELECT imo_number FROM ship_data ORDER BY id"; + "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_terminalcall) ORDER BY imo_number"; + + /** + * 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회 + */ + @Override + protected void beforeFetch() { + // 전처리 과정 + // Step 1. IMO 전체 번호 조회 + log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName()); + + allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class); + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); + log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); + log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); + + // API 통계 초기화 + updateApiCallStats(totalBatches, 0); + } + + /** + * ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환 + * + * Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출 + * + * @return 다음 배치 100건 (더 이상 없으면 null) + */ + @Override + protected List fetchNextBatch() throws Exception { + + // 모든 배치 처리 완료 확인 + if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { + return null; // Job 종료 + } + + // 현재 배치의 시작/끝 인덱스 계산 + int startIndex = currentBatchIndex; + int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); + + // 현재 배치의 IMO 번호 추출 (100개) + List currentBatch = allImoNumbers.subList(startIndex, endIndex); + + int currentBatchNumber = (currentBatchIndex / batchSize) + 1; + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", + getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); + + try { + // IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...") + String imoParam = String.join(",", currentBatch); + + // API 호출 + List response = callApiWithBatch(imoParam); + + // 다음 배치로 인덱스 이동 + currentBatchIndex = endIndex; + + + // 응답 처리 + if (response != null ) { + List terminalCalls = response; + log.info("[{}] 배치 {}/{} 완료: {} 건 조회", + getReaderName(), currentBatchNumber, totalBatches, terminalCalls.size()); + + // API 호출 통계 업데이트 + updateApiCallStats(totalBatches, currentBatchNumber); + + // API 과부하 방지 (다음 배치 전 0.5초 대기) + if (currentBatchIndex < allImoNumbers.size()) { + Thread.sleep(500); + } + + return terminalCalls; + + } else { + log.warn("[{}] 배치 {}/{} 응답 없음", + getReaderName(), currentBatchNumber, totalBatches); + + // API 호출 통계 업데이트 (실패도 카운트) + updateApiCallStats(totalBatches, currentBatchNumber); + + return Collections.emptyList(); + } + + } catch (Exception e) { + log.error("[{}] 배치 {}/{} 처리 중 오류: {}", + getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e); + + // 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용) + currentBatchIndex = endIndex; + + // 빈 리스트 반환 (Job 계속 진행) + return Collections.emptyList(); + } + } + + /** + * Query Parameter를 사용한 API 호출 + * + * @param lrno 쉼표로 연결된 IMO 번호 (예: "1000019,1000021,...") + * @return API 응답 + */ + private List callApiWithBatch(String lrno) { + String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno; + + log.debug("[{}] API 호출: {}", getReaderName(), url); + + return webClient.get() + .uri(url) + .retrieve() + .bodyToFlux(TerminalCallsDto.class) + .collectList() + .block(); + } + + @Override + protected void afterFetch(List data) { + if (data == null) { + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); + log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", + getReaderName(), allImoNumbers.size()); + } + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/repository/TerminalCallsRepository.java b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/repository/TerminalCallsRepository.java new file mode 100644 index 0000000..6b22b39 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/repository/TerminalCallsRepository.java @@ -0,0 +1,13 @@ +package com.snp.batch.jobs.shipMovementTerminalCalls.batch.repository; + +import com.snp.batch.jobs.shipMovementTerminalCalls.batch.entity.TerminalCallsEntity; + +import java.util.List; + +/** + * 선박 상세 정보 Repository 인터페이스 + */ + +public interface TerminalCallsRepository { + void saveAll(List entities); +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/repository/TerminalCallsRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/repository/TerminalCallsRepositoryImpl.java new file mode 100644 index 0000000..53b60ee --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/repository/TerminalCallsRepositoryImpl.java @@ -0,0 +1,152 @@ +package com.snp.batch.jobs.shipMovementTerminalCalls.batch.repository; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.common.batch.repository.BaseJdbcRepository; +import com.snp.batch.jobs.shipMovementTerminalCalls.batch.entity.TerminalCallsEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.List; + +/** + * 선박 상세 정보 Repository 구현체 + * BaseJdbcRepository를 상속하여 JDBC 기반 CRUD 구현 + */ +@Slf4j +@Repository("TerminalCallsRepository") +public class TerminalCallsRepositoryImpl extends BaseJdbcRepository + implements TerminalCallsRepository { + + public TerminalCallsRepositoryImpl(JdbcTemplate jdbcTemplate) { + super(jdbcTemplate); + } + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override + protected String getTableName() { + return "snp_data.t_terminalcall"; + } + + @Override + protected String getEntityName() { + return "TerminallCalls"; + } + + @Override + protected String extractId(TerminalCallsEntity entity) { + return entity.getImolRorIHSNumber(); + } + + @Override + public String getInsertSql() { + return """ + INSERT INTO snp_data.t_terminalcall( + imo, + mvmn_type, + mvmn_dt, + fclty_id, + fclty_nm, + fclty_type, + up_fclty_id, + up_fclty_nm, + up_fclty_type, + ntn_cd, + ntn_nm, + draft, + lat, + lon, + prnt_call_id, + iso2_ntn_cd, + evt_start_dt, + lcinfo + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (imo, mvmn_dt) + DO UPDATE SET + mvmn_type = EXCLUDED.mvmn_type, + mvmn_dt = EXCLUDED.mvmn_dt, + fclty_id = EXCLUDED.fclty_id, + fclty_nm = EXCLUDED.fclty_nm, + fclty_type = EXCLUDED.fclty_type, + up_fclty_id = EXCLUDED.up_fclty_id, + up_fclty_nm = EXCLUDED.up_fclty_nm, + up_fclty_type = EXCLUDED.up_fclty_type, + ntn_cd = EXCLUDED.ntn_cd, + ntn_nm = EXCLUDED.ntn_nm, + draft = EXCLUDED.draft, + lat = EXCLUDED.lat, + lon = EXCLUDED.lon, + prnt_call_id = EXCLUDED.prnt_call_id, + iso2_ntn_cd = EXCLUDED.iso2_ntn_cd, + evt_start_dt = EXCLUDED.evt_start_dt, + lcinfo = EXCLUDED.lcinfo + """; + } + + @Override + protected String getUpdateSql() { + return null; + } + + @Override + protected void setInsertParameters(PreparedStatement ps, TerminalCallsEntity e) throws Exception { + int i = 1; + ps.setString(i++, e.getImolRorIHSNumber()); // imo + ps.setString(i++, e.getMovementType()); // mvmn_type + ps.setTimestamp(i++, e.getMovementDate() != null ? Timestamp.valueOf(e.getMovementDate()) : null); // mvmn_dt + ps.setObject(i++, e.getFacilityId()); // fclty_id + ps.setString(i++, e.getFacilityName()); // fclty_nm + ps.setString(i++, e.getFacilityType()); // fclty_type + ps.setObject(i++, e.getParentFacilityId()); //up_fclty_id + ps.setString(i++, e.getParentFacilityName()); // up_fclty_nm + ps.setString(i++, e.getParentFacilityType()); //up_fclty_type + ps.setString(i++, e.getCountryCode()); // ntn_cd + ps.setString(i++, e.getCountryName()); // ntn_nm + setDoubleOrNull(ps, i++, e.getDraught()); // draft + setDoubleOrNull(ps, i++, e.getLatitude()); // lat + setDoubleOrNull(ps, i++, e.getLongitude());// lon + ps.setObject(i++, e.getParentCallId()); //prnt_call_id + ps.setString(i++, e.getIso2()); // iso2_ntn_cd + ps.setTimestamp(i++, e.getEventStartDate() != null ? Timestamp.valueOf(e.getEventStartDate()) : null); // evt_start_dt + + if (e.getPosition() != null) { + ps.setObject(i++, OBJECT_MAPPER.writeValueAsString(e.getPosition()), java.sql.Types.OTHER); // lcinfo (jsonb) + } else { + ps.setNull(i++, java.sql.Types.OTHER); + } + } + + private void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception { + if (value != null) { + ps.setDouble(index, value); + } else { + // java.sql.Types.DOUBLE을 사용하여 명시적으로 SQL NULL을 설정 + ps.setNull(index, java.sql.Types.DOUBLE); + } + } + + @Override + protected void setUpdateParameters(PreparedStatement ps, TerminalCallsEntity entity) throws Exception { + + } + + @Override + protected RowMapper getRowMapper() { + return null; + } + + @Override + public void saveAll(List entities) { + if (entities == null || entities.isEmpty()) return; + + log.info("TerminallCalls 저장 시작 = {}건", entities.size()); + batchInsert(entities); + + } + +} diff --git a/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/writer/TerminalCallsWriter.java b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/writer/TerminalCallsWriter.java new file mode 100644 index 0000000..c5d1a2a --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/shipMovementTerminalCalls/batch/writer/TerminalCallsWriter.java @@ -0,0 +1,35 @@ +package com.snp.batch.jobs.shipMovementTerminalCalls.batch.writer; + +import com.snp.batch.common.batch.writer.BaseWriter; +import com.snp.batch.jobs.shipMovementTerminalCalls.batch.entity.TerminalCallsEntity; +import com.snp.batch.jobs.shipMovementTerminalCalls.batch.repository.TerminalCallsRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 선박 상세 정보 Writer + */ +@Slf4j +@Component +public class TerminalCallsWriter extends BaseWriter { + + private final TerminalCallsRepository terminalCallsRepository; + + + public TerminalCallsWriter(TerminalCallsRepository terminalCallsRepository) { + super("TerminalCalls"); + this.terminalCallsRepository = terminalCallsRepository; + } + + @Override + protected void writeItems(List items) throws Exception { + + if (items.isEmpty()) { return; } + + terminalCallsRepository.saveAll(items); + log.info("TerminalCalls 데이터 저장: {} 건", items.size()); + } + +}