movement 배치
This commit is contained in:
부모
18fa95e903
커밋
e44637e1f3
@ -1,5 +1,6 @@
|
||||
package com.snp.batch.common.batch.repository;
|
||||
|
||||
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
@ -32,6 +32,9 @@ public class MaritimeApiWebClientConfig {
|
||||
@Value("https://aisapi.maritime.spglobal.com")
|
||||
private String maritimeAisApiUrl;
|
||||
|
||||
@Value("https://webservices.maritime.spglobal.com")
|
||||
private String maritimeWebSerivcesUrl;
|
||||
|
||||
@Value("${app.batch.ship-api.username}")
|
||||
private String maritimeApiUsername;
|
||||
|
||||
@ -79,6 +82,22 @@ public class MaritimeApiWebClientConfig {
|
||||
.maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean(name = "maritimeWebServicesWebClient")
|
||||
public WebClient maritimeWebServicesWebClient(){
|
||||
log.info("========================================");
|
||||
log.info("Maritime WebSerivces WebClient 생성");
|
||||
log.info("Base URL: {}", maritimeWebSerivcesUrl);
|
||||
log.info("========================================");
|
||||
|
||||
return WebClient.builder()
|
||||
.baseUrl(maritimeWebSerivcesUrl)
|
||||
.defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword))
|
||||
.codecs(configurer -> configurer
|
||||
.defaultCodecs()
|
||||
.maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -0,0 +1,134 @@
|
||||
package com.snp.batch.jobs.shipMovement.batch.config;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.snp.batch.common.batch.config.BaseJobConfig;
|
||||
import com.snp.batch.jobs.shipMovement.batch.dto.PortCallDto;
|
||||
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
|
||||
import com.snp.batch.jobs.shipMovement.batch.processor.ShipMovementProcessor;
|
||||
import com.snp.batch.jobs.shipMovement.batch.reader.ShipMovementReader;
|
||||
import com.snp.batch.jobs.shipMovement.batch.writer.ShipMovementWriter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||
import org.springframework.batch.core.repository.JobRepository;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.batch.item.ItemReader;
|
||||
import org.springframework.batch.item.ItemWriter;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
/**
|
||||
* 선박 상세 정보 Import Job Config
|
||||
*
|
||||
* 특징:
|
||||
* - ship_data 테이블에서 IMO 번호 조회
|
||||
* - IMO 번호를 100개씩 배치로 분할
|
||||
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
|
||||
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
|
||||
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
|
||||
*
|
||||
* 데이터 흐름:
|
||||
* ShipMovementReader (ship_data → Maritime API)
|
||||
* ↓ (PortCallDto)
|
||||
* ShipMovementProcessor
|
||||
* ↓ (ShipMovementEntity)
|
||||
* ShipDetailDataWriter
|
||||
* ↓ (ship_movement 테이블)
|
||||
*/
|
||||
|
||||
@Slf4j
|
||||
@Configuration
|
||||
public class ShipMovementJobConfig extends BaseJobConfig<PortCallDto, ShipMovementEntity> {
|
||||
|
||||
private final ShipMovementProcessor shipMovementProcessor;
|
||||
private final ShipMovementWriter shipMovementWriter;
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
private final WebClient maritimeApiWebClient;
|
||||
private final ObjectMapper objectMapper; // ObjectMapper 주입 추가
|
||||
|
||||
public ShipMovementJobConfig(
|
||||
JobRepository jobRepository,
|
||||
PlatformTransactionManager transactionManager,
|
||||
ShipMovementProcessor shipMovementProcessor,
|
||||
ShipMovementWriter shipMovementWriter, JdbcTemplate jdbcTemplate,
|
||||
@Qualifier("maritimeWebServicesWebClient") WebClient maritimeApiWebClient,
|
||||
ObjectMapper objectMapper) { // ObjectMapper 주입 추가
|
||||
super(jobRepository, transactionManager);
|
||||
this.shipMovementProcessor = shipMovementProcessor;
|
||||
this.shipMovementWriter = shipMovementWriter;
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
this.maritimeApiWebClient = maritimeApiWebClient;
|
||||
this.objectMapper = objectMapper; // ObjectMapper 초기화
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getJobName() {
|
||||
return "shipMovementJob";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getStepName() {
|
||||
return "shipMovementStep";
|
||||
}
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public ShipMovementReader shipMovementReader(
|
||||
@Value("#{jobParameters['startDate']}") String startDate,
|
||||
@Value("#{jobParameters['stopDate']}") String stopDate) {
|
||||
LocalDate today = LocalDate.now();
|
||||
|
||||
if(startDate == null || startDate.isBlank()) {
|
||||
startDate = today.minusYears(1).plusDays(1).format(DateTimeFormatter.ISO_LOCAL_DATE);
|
||||
}
|
||||
|
||||
if(stopDate == null || stopDate.isBlank()) {
|
||||
stopDate = today.format(DateTimeFormatter.ISO_LOCAL_DATE);
|
||||
}
|
||||
|
||||
ShipMovementReader reader = new ShipMovementReader(maritimeApiWebClient, jdbcTemplate, objectMapper);
|
||||
reader.setStartDate(startDate);
|
||||
reader.setStopDate(stopDate);
|
||||
return reader;
|
||||
}
|
||||
@Override
|
||||
protected ItemReader<PortCallDto> createReader() { // 타입 변경
|
||||
// Reader 생성자 수정: ObjectMapper를 전달합니다.
|
||||
return shipMovementReader(null, null);
|
||||
//return new ShipMovementReader(maritimeApiWebClient, jdbcTemplate, objectMapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ItemProcessor<PortCallDto, ShipMovementEntity> createProcessor() {
|
||||
return shipMovementProcessor;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ItemWriter<ShipMovementEntity> createWriter() { // 타입 변경
|
||||
return shipMovementWriter;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getChunkSize() {
|
||||
return 50; // API에서 100개씩 가져오므로 chunk도 100으로 설정
|
||||
}
|
||||
|
||||
@Bean(name = "shipMovementJob")
|
||||
public Job shipMovementJob() {
|
||||
return job();
|
||||
}
|
||||
|
||||
@Bean(name = "shipMovementStep")
|
||||
public Step shipMovementStep() {
|
||||
return step();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,36 @@
|
||||
package com.snp.batch.jobs.shipMovement.batch.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class PortCallDto {
|
||||
private String movementType;
|
||||
private String imolRorIHSNumber;
|
||||
private String movementDate;
|
||||
|
||||
private Integer portCallId;
|
||||
|
||||
private Integer facilityId;
|
||||
private String facilityName;
|
||||
private String facilityType;
|
||||
|
||||
private Integer subFacilityId;
|
||||
private String subFacilityName;
|
||||
private String subFacilityType;
|
||||
|
||||
private Integer parentFacilityId;
|
||||
private String parentFacilityName;
|
||||
private String parentFacilityType;
|
||||
|
||||
private String countryCode;
|
||||
private String countryName;
|
||||
|
||||
private Double draught;
|
||||
private Double latitude;
|
||||
private Double longitude;
|
||||
|
||||
private PositionDto position;
|
||||
|
||||
private String destination;
|
||||
private String iso2;
|
||||
}
|
||||
@ -0,0 +1,17 @@
|
||||
package com.snp.batch.jobs.shipMovement.batch.dto;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class PositionDto {
|
||||
private boolean isNull;
|
||||
private int stSrid;
|
||||
private double lat;
|
||||
@JsonProperty("long")
|
||||
private double lon;
|
||||
private double z;
|
||||
private double m;
|
||||
private boolean hasZ;
|
||||
private boolean hasM;
|
||||
}
|
||||
@ -0,0 +1,12 @@
|
||||
package com.snp.batch.jobs.shipMovement.batch.dto;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
public class ShipMovementApiResponse {
|
||||
@JsonProperty("portCalls")
|
||||
List<PortCallDto> portCallList;
|
||||
}
|
||||
@ -0,0 +1,48 @@
|
||||
package com.snp.batch.jobs.shipMovement.batch.entity;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import jakarta.persistence.GeneratedValue;
|
||||
import jakarta.persistence.GenerationType;
|
||||
import jakarta.persistence.Id;
|
||||
import jakarta.persistence.SequenceGenerator;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Data
|
||||
@SuperBuilder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class ShipMovementEntity {
|
||||
@Id
|
||||
@GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "ship_movement_id_seq")
|
||||
@SequenceGenerator(name = "ship_movement_id_seq", sequenceName = "ship_movement_id_seq", allocationSize = 1)
|
||||
private Long id;
|
||||
|
||||
private String movementType;
|
||||
private String imolRorIHSNumber;
|
||||
private LocalDateTime movementDate;
|
||||
private Integer portCallId;
|
||||
private Integer facilityId;
|
||||
private String facilityName;
|
||||
private String facilityType;
|
||||
private Integer subFacilityId;
|
||||
private String subFacilityName;
|
||||
private String subFacilityType;
|
||||
private Integer parentFacilityId;
|
||||
private String parentFacilityName;
|
||||
private String parentFacilityType;
|
||||
private String countryCode;
|
||||
private String countryName;
|
||||
private Double draught;
|
||||
private Double latitude;
|
||||
private Double longitude;
|
||||
private String destination;
|
||||
private String iso2;
|
||||
private JsonNode position;
|
||||
private String schemaType;
|
||||
}
|
||||
@ -0,0 +1,72 @@
|
||||
package com.snp.batch.jobs.shipMovement.batch.processor;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.snp.batch.common.batch.processor.BaseProcessor;
|
||||
import com.snp.batch.jobs.shipMovement.batch.dto.PortCallDto;
|
||||
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* 선박 상세 정보 Processor
|
||||
* ShipDetailDto → ShipDetailEntity 변환
|
||||
*/
|
||||
|
||||
/**
|
||||
* 선박 상세 정보 Processor (해시 비교 및 증분 데이터 추출)
|
||||
* I: ShipDetailComparisonData (DB 해시 + API Map Data)
|
||||
* O: ShipDetailUpdate (변경분)
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ShipMovementProcessor extends BaseProcessor<PortCallDto, ShipMovementEntity> {
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public ShipMovementProcessor(ObjectMapper objectMapper) {
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ShipMovementEntity processItem(PortCallDto dto) throws Exception {
|
||||
log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}",
|
||||
dto.getImolRorIHSNumber(), dto.getFacilityName());
|
||||
|
||||
JsonNode positionNode = null;
|
||||
if (dto.getPosition() != null) {
|
||||
// Position 객체를 JsonNode로 변환
|
||||
positionNode = objectMapper.valueToTree(dto.getPosition());
|
||||
}
|
||||
|
||||
ShipMovementEntity entity = ShipMovementEntity.builder()
|
||||
.movementType(dto.getMovementType())
|
||||
.imolRorIHSNumber(dto.getImolRorIHSNumber())
|
||||
.movementDate(LocalDateTime.parse(dto.getMovementDate()))
|
||||
.portCallId(dto.getPortCallId())
|
||||
.facilityId(dto.getFacilityId())
|
||||
.facilityName(dto.getFacilityName())
|
||||
.facilityType(dto.getFacilityType())
|
||||
.subFacilityId(dto.getSubFacilityId())
|
||||
.subFacilityName(dto.getSubFacilityName())
|
||||
.subFacilityType(dto.getSubFacilityType())
|
||||
.parentFacilityId(dto.getParentFacilityId())
|
||||
.parentFacilityName(dto.getParentFacilityName())
|
||||
.parentFacilityType(dto.getParentFacilityType())
|
||||
.countryCode(dto.getCountryCode())
|
||||
.countryName(dto.getCountryName())
|
||||
.draught(dto.getDraught())
|
||||
.latitude(dto.getLatitude())
|
||||
.longitude(dto.getLongitude())
|
||||
.destination(dto.getDestination())
|
||||
.iso2(dto.getIso2())
|
||||
.position(positionNode) // JsonNode로 매핑
|
||||
.schemaType("PORTCALL") // API 타입 구분
|
||||
.build();
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,231 @@
|
||||
package com.snp.batch.jobs.shipMovement.batch.reader;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.snp.batch.common.batch.reader.BaseApiReader;
|
||||
import com.snp.batch.common.util.JsonChangeDetector;
|
||||
import com.snp.batch.jobs.shipMovement.batch.dto.PortCallDto;
|
||||
import com.snp.batch.jobs.shipMovement.batch.dto.ShipMovementApiResponse;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailApiResponse;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailComparisonData;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto;
|
||||
import com.snp.batch.jobs.shipdetail.batch.dto.ShipResultDto;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
|
||||
*
|
||||
* 기능:
|
||||
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
|
||||
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
|
||||
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
|
||||
* 4. Spring Batch가 100건씩 Process → Write 수행
|
||||
*
|
||||
* Chunk 처리 흐름:
|
||||
* - beforeFetch() → IMO 전체 조회 (1회)
|
||||
* - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회)
|
||||
* - read() → 1건씩 반환 (100번)
|
||||
* - Processor/Writer → 100건 처리
|
||||
* - 반복... (1,718번의 Chunk)
|
||||
*
|
||||
* 기존 방식과의 차이:
|
||||
* - 기존: 17만건 전체 메모리 로드 → Process → Write
|
||||
* - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회)
|
||||
*/
|
||||
@Slf4j
|
||||
@StepScope
|
||||
public class ShipMovementReader extends BaseApiReader<PortCallDto> {
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
// 배치 처리 상태
|
||||
private List<String> allImoNumbers;
|
||||
// DB 해시값을 저장할 맵
|
||||
private Map<String, String> dbMasterHashes;
|
||||
private int currentBatchIndex = 0;
|
||||
private final int batchSize = 50;
|
||||
|
||||
@Value("#{jobParameters['startDate']}")
|
||||
private String startDate;
|
||||
// private String startDate = "2024-01-01";
|
||||
|
||||
@Value("#{jobParameters['stopDate']}")
|
||||
private String stopDate;
|
||||
// private String stopDate = "2024-12-31";
|
||||
public void setStartDate(String startDate) {this.startDate = startDate;}
|
||||
public void setStopDate(String stopDate){this.stopDate=stopDate;}
|
||||
public ShipMovementReader(WebClient webClient, JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
|
||||
super(webClient);
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
this.objectMapper = objectMapper;
|
||||
enableChunkMode(); // ✨ Chunk 모드 활성화
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getReaderName() {
|
||||
return "ShipMovementReader";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getApiPath() {
|
||||
return "/Movements";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getApiBaseUrl() {
|
||||
return "https://webservices.maritime.spglobal.com";
|
||||
}
|
||||
|
||||
private static final String GET_ALL_IMO_QUERY =
|
||||
"SELECT imo_number FROM ship_data ORDER BY id";
|
||||
|
||||
private static final String FETCH_ALL_HASHES_QUERY =
|
||||
"SELECT imo_number, ship_detail_hash FROM ship_detail_hash_json ORDER BY imo_number";
|
||||
|
||||
/**
|
||||
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
|
||||
*/
|
||||
@Override
|
||||
protected void beforeFetch() {
|
||||
// 전처리 과정
|
||||
// Step 1. IMO 전체 번호 조회
|
||||
log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName());
|
||||
|
||||
allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class);
|
||||
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
|
||||
|
||||
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
|
||||
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
|
||||
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
|
||||
|
||||
/* // Step 2. 전 배치 결과 imo_number, ship_detail_json, ship_detail_hash 데이터 전체 조회
|
||||
log.info("[{}] DB Master Hash 전체 조회 시작...", getReaderName());
|
||||
|
||||
// 1-1. DB에서 모든 IMO와 Hash 조회
|
||||
dbMasterHashes = jdbcTemplate.query(FETCH_ALL_HASHES_QUERY, rs -> {
|
||||
Map<String, String> map = new HashMap<>();
|
||||
while (rs.next()) {
|
||||
map.put(rs.getString("imo_number"), rs.getString("ship_detail_hash"));
|
||||
}
|
||||
return map;
|
||||
});
|
||||
|
||||
log.info("[{}] DB Master Hash 조회 완료. 총 {}건.", getReaderName(), dbMasterHashes.size());*/
|
||||
|
||||
// API 통계 초기화
|
||||
updateApiCallStats(totalBatches, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
|
||||
*
|
||||
* Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출
|
||||
*
|
||||
* @return 다음 배치 100건 (더 이상 없으면 null)
|
||||
*/
|
||||
@Override
|
||||
protected List<PortCallDto> fetchNextBatch() throws Exception {
|
||||
|
||||
// 모든 배치 처리 완료 확인
|
||||
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
|
||||
return null; // Job 종료
|
||||
}
|
||||
|
||||
// 현재 배치의 시작/끝 인덱스 계산
|
||||
int startIndex = currentBatchIndex;
|
||||
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
|
||||
|
||||
// 현재 배치의 IMO 번호 추출 (100개)
|
||||
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
|
||||
|
||||
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
|
||||
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
|
||||
|
||||
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
|
||||
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
|
||||
|
||||
try {
|
||||
// IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...")
|
||||
String imoParam = String.join(",", currentBatch);
|
||||
|
||||
// API 호출
|
||||
ShipMovementApiResponse response = callApiWithBatch(imoParam);
|
||||
|
||||
// 다음 배치로 인덱스 이동
|
||||
currentBatchIndex = endIndex;
|
||||
|
||||
|
||||
// 응답 처리
|
||||
if (response != null && response.getPortCallList() != null) {
|
||||
List<PortCallDto> portCalls = response.getPortCallList();
|
||||
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
|
||||
getReaderName(), currentBatchNumber, totalBatches, portCalls.size());
|
||||
|
||||
// API 호출 통계 업데이트
|
||||
updateApiCallStats(totalBatches, currentBatchNumber);
|
||||
|
||||
// API 과부하 방지 (다음 배치 전 0.5초 대기)
|
||||
if (currentBatchIndex < allImoNumbers.size()) {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
return portCalls;
|
||||
|
||||
} else {
|
||||
log.warn("[{}] 배치 {}/{} 응답 없음",
|
||||
getReaderName(), currentBatchNumber, totalBatches);
|
||||
|
||||
// API 호출 통계 업데이트 (실패도 카운트)
|
||||
updateApiCallStats(totalBatches, currentBatchNumber);
|
||||
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
|
||||
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
|
||||
|
||||
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
|
||||
currentBatchIndex = endIndex;
|
||||
|
||||
// 빈 리스트 반환 (Job 계속 진행)
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Query Parameter를 사용한 API 호출
|
||||
*
|
||||
* @param lrno 쉼표로 연결된 IMO 번호 (예: "1000019,1000021,...")
|
||||
* @return API 응답
|
||||
*/
|
||||
private ShipMovementApiResponse callApiWithBatch(String lrno) {
|
||||
String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno;
|
||||
|
||||
log.debug("[{}] API 호출: {}", getReaderName(), url);
|
||||
|
||||
return webClient.get()
|
||||
.uri(url)
|
||||
.retrieve()
|
||||
.bodyToMono(ShipMovementApiResponse.class)
|
||||
.block();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void afterFetch(List<PortCallDto> data) {
|
||||
if (data == null) {
|
||||
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
|
||||
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
|
||||
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
|
||||
getReaderName(), allImoNumbers.size());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,18 @@
|
||||
package com.snp.batch.jobs.shipMovement.batch.repository;
|
||||
|
||||
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 선박 상세 정보 Repository 인터페이스
|
||||
*/
|
||||
|
||||
public interface ShipMovementRepository {
|
||||
|
||||
void saveAll(List<ShipMovementEntity> entities);
|
||||
|
||||
boolean existsByPortCallId(Integer portCallId);
|
||||
}
|
||||
@ -0,0 +1,252 @@
|
||||
package com.snp.batch.jobs.shipMovement.batch.repository;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.snp.batch.common.batch.repository.BaseJdbcRepository;
|
||||
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.jdbc.core.RowMapper;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 선박 상세 정보 Repository 구현체
|
||||
* BaseJdbcRepository를 상속하여 JDBC 기반 CRUD 구현
|
||||
*/
|
||||
@Slf4j
|
||||
@Repository("ShipMovementRepository")
|
||||
public class ShipMovementRepositoryImpl extends BaseJdbcRepository<ShipMovementEntity, String>
|
||||
implements ShipMovementRepository {
|
||||
|
||||
public ShipMovementRepositoryImpl(JdbcTemplate jdbcTemplate) {
|
||||
super(jdbcTemplate);
|
||||
}
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
@Override
|
||||
protected String getTableName() {
|
||||
return "snp_data.t_ship_stpov_info";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getEntityName() {
|
||||
return "ShipMovement";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractId(ShipMovementEntity entity) {
|
||||
return entity.getImolRorIHSNumber();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getInsertSql() {
|
||||
return """
|
||||
INSERT INTO snp_data.t_ship_stpov_info(
|
||||
imo,
|
||||
mvmn_type,
|
||||
mvmn_dt,
|
||||
stpov_id,
|
||||
fclty_id,
|
||||
fclty_nm,
|
||||
fclty_type,
|
||||
lwrnk_fclty_id,
|
||||
lwrnk_fclty_nm,
|
||||
lwrnk_fclty_type,
|
||||
up_fclty_id,
|
||||
up_fclty_nm,
|
||||
up_fclty_type,
|
||||
ntn_cd,
|
||||
ntn_nm,
|
||||
draft,
|
||||
lat,
|
||||
lon,
|
||||
dstn,
|
||||
iso2_ntn_cd,
|
||||
lcinfo
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT (imo, mvmn_dt)
|
||||
DO UPDATE SET
|
||||
mvmn_type = EXCLUDED.mvmn_type,
|
||||
mvmn_dt = EXCLUDED.mvmn_dt,
|
||||
stpov_id = EXCLUDED.stpov_id,
|
||||
fclty_id = EXCLUDED.fclty_id,
|
||||
fclty_nm = EXCLUDED.fclty_nm,
|
||||
fclty_type = EXCLUDED.fclty_type,
|
||||
lwrnk_fclty_id = EXCLUDED.lwrnk_fclty_id,
|
||||
lwrnk_fclty_nm = EXCLUDED.lwrnk_fclty_nm,
|
||||
lwrnk_fclty_type = EXCLUDED.lwrnk_fclty_type,
|
||||
up_fclty_id = EXCLUDED.up_fclty_id,
|
||||
up_fclty_nm = EXCLUDED.up_fclty_nm,
|
||||
up_fclty_type = EXCLUDED.up_fclty_type,
|
||||
ntn_cd = EXCLUDED.ntn_cd,
|
||||
draft = EXCLUDED.draft,
|
||||
lat = EXCLUDED.lat,
|
||||
lon = EXCLUDED.lon,
|
||||
dstn = EXCLUDED.dstn,
|
||||
iso2_ntn_cd = EXCLUDED.iso2_ntn_cd,
|
||||
lcinfo = EXCLUDED.lcinfo
|
||||
""";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getUpdateSql() {
|
||||
return """
|
||||
UPDATE snp_data.t_ship_stpov_info
|
||||
SET vesselid = ?,
|
||||
maritimemobileserviceidentitymmsinumber = ?,
|
||||
shipname = ?,
|
||||
callsign = ?,
|
||||
flagname = ?,
|
||||
portofregistry = ?,
|
||||
classificationsociety = ?,
|
||||
shiptypelevel5 = ?,
|
||||
shiptypelevel5subtype = ?,
|
||||
yearofbuild = ?,
|
||||
shipbuilder = ?,
|
||||
lengthoverallloa = ?,
|
||||
breadthmoulded = ?,
|
||||
"depth" = ?,
|
||||
draught = ?,
|
||||
grosstonnage = ?,
|
||||
deadweight = ?,
|
||||
teu = ?,
|
||||
speedservice = ?,
|
||||
mainenginetype = ?,
|
||||
batch_flag = 'N'::character varying,
|
||||
status = ?,
|
||||
operator = ?,
|
||||
flagcode = ?,
|
||||
shiptypelevel2 = ?
|
||||
WHERE ihslrorimoshipno = ?
|
||||
""";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setInsertParameters(PreparedStatement ps, ShipMovementEntity e) throws Exception {
|
||||
int i = 1;
|
||||
ps.setString(i++, e.getImolRorIHSNumber()); // imo
|
||||
ps.setString(i++, e.getMovementType()); // mvmn_type
|
||||
ps.setTimestamp(i++, e.getMovementDate() != null ? Timestamp.valueOf(e.getMovementDate()) : null); // mvmn_dt
|
||||
ps.setObject(i++, e.getPortCallId()); // stpov_id
|
||||
// stpov_type는 'PORTCALL'로 하드코딩되었으므로 세팅 안함
|
||||
ps.setObject(i++, e.getFacilityId()); // fclty_id
|
||||
ps.setString(i++, e.getFacilityName()); // fclty_nm
|
||||
ps.setString(i++, e.getFacilityType()); // fclty_type
|
||||
ps.setObject(i++, e.getSubFacilityId()); // lwrnk_fclty_id
|
||||
ps.setString(i++, e.getSubFacilityName()); // lwrnk_fclty_nm
|
||||
ps.setString(i++, e.getSubFacilityType()); // lwrnk_fclty_type
|
||||
ps.setObject(i++, e.getParentFacilityId()); // up_fclty_id
|
||||
ps.setString(i++, e.getParentFacilityName()); // up_fclty_nm
|
||||
ps.setString(i++, e.getParentFacilityType()); // up_fclty_type
|
||||
ps.setString(i++, e.getCountryCode()); // ntn_cd
|
||||
ps.setString(i++, e.getCountryName()); // ntn_nm
|
||||
setDoubleOrNull(ps, i++, e.getDraught()); // draft
|
||||
setDoubleOrNull(ps, i++, e.getLatitude()); // lat
|
||||
setDoubleOrNull(ps, i++, e.getLongitude());// lon
|
||||
ps.setString(i++, e.getDestination()); // dstn
|
||||
ps.setString(i++, e.getIso2()); // iso2_ntn_cd
|
||||
|
||||
if (e.getPosition() != null) {
|
||||
ps.setObject(i++, OBJECT_MAPPER.writeValueAsString(e.getPosition()), java.sql.Types.OTHER); // lcinfo (jsonb)
|
||||
} else {
|
||||
ps.setNull(i++, java.sql.Types.OTHER);
|
||||
}
|
||||
|
||||
// ps.setString(i++, e.getSchemaType());
|
||||
|
||||
}
|
||||
|
||||
private void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception {
|
||||
if (value != null) {
|
||||
ps.setDouble(index, value);
|
||||
} else {
|
||||
// java.sql.Types.DOUBLE을 사용하여 명시적으로 SQL NULL을 설정
|
||||
ps.setNull(index, java.sql.Types.DOUBLE);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setUpdateParameters(PreparedStatement ps, ShipMovementEntity entity) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RowMapper<ShipMovementEntity> getRowMapper() {
|
||||
return new ShipMovementRowMapper();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveAll(List<ShipMovementEntity> entities) {
|
||||
if (entities == null || entities.isEmpty()) return;
|
||||
|
||||
log.info("ShipMovement 저장 시작 = {}건", entities.size());
|
||||
batchInsert(entities);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean existsByPortCallId(Integer portCallId) {
|
||||
String sql = """
|
||||
SELECT COUNT(1)
|
||||
FROM ship_movement
|
||||
WHERE portCallId = ?
|
||||
""";
|
||||
|
||||
Integer count = jdbcTemplate.queryForObject(sql, Integer.class, portCallId);
|
||||
return count != null && count > 0;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* ShipDetailEntity RowMapper
|
||||
*/
|
||||
private static class ShipMovementRowMapper implements RowMapper<ShipMovementEntity> {
|
||||
@Override
|
||||
public ShipMovementEntity mapRow(ResultSet rs, int rowNum) throws SQLException {
|
||||
ShipMovementEntity entity = ShipMovementEntity.builder()
|
||||
.id(rs.getLong("id"))
|
||||
.imolRorIHSNumber(rs.getString("imolRorIHSNumber"))
|
||||
.portCallId(rs.getObject("portCallId", Integer.class))
|
||||
.facilityId(rs.getObject("facilityId", Integer.class))
|
||||
.facilityName(rs.getString("facilityName"))
|
||||
.facilityType(rs.getString("facilityType"))
|
||||
.subFacilityId(rs.getObject("subFacilityId", Integer.class))
|
||||
.subFacilityName(rs.getString("subFacilityName"))
|
||||
.subFacilityType(rs.getString("subFacilityType"))
|
||||
.parentFacilityId(rs.getObject("parentFacilityId", Integer.class))
|
||||
.parentFacilityName(rs.getString("parentFacilityName"))
|
||||
.parentFacilityType(rs.getString("parentFacilityType"))
|
||||
.countryCode(rs.getString("countryCode"))
|
||||
.countryName(rs.getString("countryName"))
|
||||
.draught(rs.getObject("draught", Double.class))
|
||||
.latitude(rs.getObject("latitude", Double.class))
|
||||
.longitude(rs.getObject("longitude", Double.class))
|
||||
.destination(rs.getString("destination"))
|
||||
.iso2(rs.getString("iso2"))
|
||||
.position(parseJson(rs.getString("position")))
|
||||
.schemaType(rs.getString("schemaType"))
|
||||
.build();
|
||||
|
||||
Timestamp movementDate = rs.getTimestamp("movementDate");
|
||||
if (movementDate != null) {
|
||||
entity.setMovementDate(movementDate.toLocalDateTime());
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
private JsonNode parseJson(String json) {
|
||||
try {
|
||||
if (json == null) return null;
|
||||
return new com.fasterxml.jackson.databind.ObjectMapper().readTree(json);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("JSON 파싱 오류: " + json);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,40 @@
|
||||
package com.snp.batch.jobs.shipMovement.batch.writer;
|
||||
|
||||
import com.snp.batch.common.batch.writer.BaseWriter;
|
||||
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
|
||||
import com.snp.batch.jobs.shipMovement.batch.repository.ShipMovementRepository;
|
||||
import com.snp.batch.jobs.shipdetail.batch.repository.ShipDetailRepository;
|
||||
import com.snp.batch.jobs.shipdetail.batch.repository.ShipHashRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 선박 상세 정보 Writer
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ShipMovementWriter extends BaseWriter<ShipMovementEntity> {
|
||||
|
||||
private final ShipMovementRepository shipMovementRepository;
|
||||
|
||||
|
||||
public ShipMovementWriter(ShipDetailRepository shipDetailRepository, ShipHashRepository shipHashRepository, ShipMovementRepository shipMovementRepositoryy) {
|
||||
super("ShipMovement");
|
||||
this.shipMovementRepository = shipMovementRepositoryy;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeItems(List<ShipMovementEntity> items) throws Exception {
|
||||
|
||||
if (items.isEmpty()) { return; }
|
||||
|
||||
log.info("선박 상세 정보 데이터 저장: {} 건", items.size());
|
||||
|
||||
shipMovementRepository.saveAll(items);
|
||||
log.info("선박 상세 정보 및 해시 데이터 저장 완료: {} 건", items.size());
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
불러오는 중...
Reference in New Issue
Block a user