Destination, Transits, CurrentlyAt 증분Job

This commit is contained in:
Kim JiMyeung 2025-12-08 17:47:30 +09:00
부모 34ce85f33f
커밋 6c98ebc24f
38개의 변경된 파일1960개의 추가작업 그리고 66개의 파일을 삭제

파일 보기

@ -0,0 +1,103 @@
package com.snp.batch.jobs.shipCurrentlyAt.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipCurrentlyAt.batch.dto.CurrentlyAtDto;
import com.snp.batch.jobs.shipCurrentlyAt.batch.entity.CurrentlyAtEntity;
import com.snp.batch.jobs.shipCurrentlyAt.batch.processor.CurrentlyAtProcessor;
import com.snp.batch.jobs.shipCurrentlyAt.batch.reader.CurrentlyAtReader;
import com.snp.batch.jobs.shipCurrentlyAt.batch.writer.CurrentlyAtWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* CurrentlyAtReader (ship_data Maritime API)
* (CurrentlyAtDto)
* CurrentlyAtProcessor
* (CurrentlyAtEntity)
* CurrentlyAtWriter
* (currentlyat 테이블)
*/
@Slf4j
@Configuration
public class CurrentlyAtJobConfig extends BaseJobConfig<CurrentlyAtDto, CurrentlyAtEntity> {
private final CurrentlyAtProcessor currentlyAtProcessor;
private final CurrentlyAtWriter currentlyAtWriter;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient;
public CurrentlyAtJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
CurrentlyAtProcessor currentlyAtProcessor,
CurrentlyAtWriter currentlyAtWriter, JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager);
this.currentlyAtProcessor = currentlyAtProcessor;
this.currentlyAtWriter = currentlyAtWriter;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient;
}
@Override
protected String getJobName() {
return "CurrentlyAtImportJob";
}
@Override
protected String getStepName() {
return "CurrentlyAtImportStep";
}
@Override
protected ItemReader<CurrentlyAtDto> createReader() { // 타입 변경
return new CurrentlyAtReader(maritimeApiWebClient, jdbcTemplate);
}
@Override
protected ItemProcessor<CurrentlyAtDto, CurrentlyAtEntity> createProcessor() {
return currentlyAtProcessor;
}
@Override
protected ItemWriter<CurrentlyAtEntity> createWriter() { // 타입 변경
return currentlyAtWriter;
}
@Override
protected int getChunkSize() {
return 50; // API에서 100개씩 가져오므로 chunk도 100으로 설정
}
@Bean(name = "CurrentlyAtImportJob")
public Job currentlyAtImportJob() {
return job();
}
@Bean(name = "CurrentlyAtImportStep")
public Step currentlyAtImportStep() {
return step();
}
}

파일 보기

@ -0,0 +1,37 @@
package com.snp.batch.jobs.shipCurrentlyAt.batch.dto;
import com.snp.batch.jobs.shipMovement.batch.dto.PortCallsPositionDto;
import lombok.Data;
@Data
public class CurrentlyAtDto {
private String movementType;
private String imolRorIHSNumber;
private String movementDate;
private Integer portCallId;
private Integer facilityId;
private String facilityName;
private String facilityType;
private Integer subFacilityId;
private String subFacilityName;
private String subFacilityType;
private Integer parentFacilityId;
private String parentFacilityName;
private String parentFacilityType;
private String countryCode;
private String countryName;
private Double draught;
private Double latitude;
private Double longitude;
private PortCallsPositionDto position;
private String destination;
private String iso2;
}

파일 보기

@ -0,0 +1,17 @@
package com.snp.batch.jobs.shipCurrentlyAt.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class CurrentlyAtPositionDto {
private boolean isNull;
private int stSrid;
private double lat;
@JsonProperty("long")
private double lon;
private double z;
private double m;
private boolean hasZ;
private boolean hasM;
}

파일 보기

@ -0,0 +1,41 @@
package com.snp.batch.jobs.shipCurrentlyAt.batch.entity;
import com.fasterxml.jackson.databind.JsonNode;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.SequenceGenerator;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.time.LocalDateTime;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
public class CurrentlyAtEntity {
private String movementType;
private String imolRorIHSNumber;
private LocalDateTime movementDate;
private Integer portCallId;
private Integer facilityId;
private String facilityName;
private String facilityType;
private Integer subFacilityId;
private String subFacilityName;
private String subFacilityType;
private Integer parentFacilityId;
private String parentFacilityName;
private String parentFacilityType;
private String countryCode;
private String countryName;
private Double draught;
private Double latitude;
private Double longitude;
private String destination;
private String iso2;
private JsonNode position;
}

파일 보기

@ -0,0 +1,71 @@
package com.snp.batch.jobs.shipCurrentlyAt.batch.processor;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.processor.BaseProcessor;
import com.snp.batch.jobs.shipCurrentlyAt.batch.dto.CurrentlyAtDto;
import com.snp.batch.jobs.shipCurrentlyAt.batch.entity.CurrentlyAtEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 선박 상세 정보 Processor
* ShipDetailDto ShipDetailEntity 변환
*/
/**
* 선박 상세 정보 Processor (해시 비교 증분 데이터 추출)
* I: ShipDetailComparisonData (DB 해시 + API Map Data)
* O: ShipDetailUpdate (변경분)
*/
@Slf4j
@Component
public class CurrentlyAtProcessor extends BaseProcessor<CurrentlyAtDto, CurrentlyAtEntity> {
private final ObjectMapper objectMapper;
public CurrentlyAtProcessor(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
protected CurrentlyAtEntity processItem(CurrentlyAtDto dto) throws Exception {
log.debug("Currently 정보 처리 시작: imoNumber={}, facilityName={}",
dto.getImolRorIHSNumber(), dto.getFacilityName());
JsonNode positionNode = null;
if (dto.getPosition() != null) {
// Position 객체를 JsonNode로 변환
positionNode = objectMapper.valueToTree(dto.getPosition());
}
CurrentlyAtEntity entity = CurrentlyAtEntity.builder()
.movementType(dto.getMovementType())
.imolRorIHSNumber(dto.getImolRorIHSNumber())
.movementDate(LocalDateTime.parse(dto.getMovementDate()))
.portCallId(dto.getPortCallId())
.facilityId(dto.getFacilityId())
.facilityName(dto.getFacilityName())
.facilityType(dto.getFacilityType())
.subFacilityId(dto.getSubFacilityId())
.subFacilityName(dto.getSubFacilityName())
.subFacilityType(dto.getSubFacilityType())
.parentFacilityId(dto.getParentFacilityId())
.parentFacilityName(dto.getParentFacilityName())
.parentFacilityType(dto.getParentFacilityType())
.countryCode(dto.getCountryCode())
.countryName(dto.getCountryName())
.draught(dto.getDraught())
.latitude(dto.getLatitude())
.longitude(dto.getLongitude())
.destination(dto.getDestination())
.iso2(dto.getIso2())
.position(positionNode) // JsonNode로 매핑
.build();
return entity;
}
}

파일 보기

@ -0,0 +1,211 @@
package com.snp.batch.jobs.shipCurrentlyAt.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipCurrentlyAt.batch.dto.CurrentlyAtDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
* <p>
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
* <p>
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
* <p>
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j
@StepScope
public class CurrentlyAtReader extends BaseApiReader<CurrentlyAtDto> {
private final JdbcTemplate jdbcTemplate;
// 배치 처리 상태
private List<String> allImoNumbers;
// DB 해시값을 저장할
private int currentBatchIndex = 0;
private final int batchSize = 10;
// @Value("#{jobParameters['startDate']}")
// private String startDate;
// private String startDate = "2025-01-01";
// @Value("#{jobParameters['stopDate']}")
// private String stopDate;
// private String stopDate = "2024-12-31";
public CurrentlyAtReader(WebClient webClient, JdbcTemplate jdbcTemplate) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "CurrentlyAtReader";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
}
@Override
protected String getApiPath() {
return "/Movements/CurrentlyAt";
}
@Override
protected String getApiBaseUrl() {
return "https://webservices.maritime.spglobal.com";
}
private static final String GET_ALL_IMO_QUERY =
"SELECT imo_number FROM ship_data ORDER BY id";
// "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_currentlyat) ORDER BY imo_number";
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override
protected void beforeFetch() {
// 전처리 과정
// Step 1. IMO 전체 번호 조회
log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
// API 통계 초기화
updateApiCallStats(totalBatches, 0);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
* <p>
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override
protected List<CurrentlyAtDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
List<CurrentlyAtDto> response = callApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null) {
List<CurrentlyAtDto> portCalls = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, portCalls.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return portCalls;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
/**
* Query Parameter를 사용한 API 호출
*
* @param lrno 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...")
* @return API 응답
*/
private List<CurrentlyAtDto> callApiWithBatch(String lrno) {
String url = getApiPath() + "?lrno=" + lrno;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToFlux(CurrentlyAtDto.class)
.collectList()
.block();
}
@Override
protected void afterFetch(List<CurrentlyAtDto> data) {
if (data == null) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size());
}
}
}

파일 보기

@ -0,0 +1,13 @@
package com.snp.batch.jobs.shipCurrentlyAt.batch.repository;
import com.snp.batch.jobs.shipCurrentlyAt.batch.entity.CurrentlyAtEntity;
import java.util.List;
/**
* 선박 상세 정보 Repository 인터페이스
*/
public interface CurrentlyAtRepository {
void saveAll(List<CurrentlyAtEntity> entities);
}

파일 보기

@ -0,0 +1,211 @@
package com.snp.batch.jobs.shipCurrentlyAt.batch.repository;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.repository.BaseJdbcRepository;
import com.snp.batch.jobs.shipCurrentlyAt.batch.entity.CurrentlyAtEntity;
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
import com.snp.batch.jobs.shipMovement.batch.repository.ShipMovementRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.List;
/**
* 선박 상세 정보 Repository 구현체
* BaseJdbcRepository를 상속하여 JDBC 기반 CRUD 구현
*/
@Slf4j
@Repository("CurrentlyAtRepository")
public class CurrentlyAtRepositoryImpl extends BaseJdbcRepository<CurrentlyAtEntity, String>
implements CurrentlyAtRepository {
public CurrentlyAtRepositoryImpl(JdbcTemplate jdbcTemplate) {
super(jdbcTemplate);
}
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override
protected String getTableName() {
return "snp_data.t_currentlyat";
}
@Override
protected String getEntityName() {
return "CurrentlyAt";
}
@Override
protected String extractId(CurrentlyAtEntity entity) {
return entity.getImolRorIHSNumber();
}
@Override
public String getInsertSql() {
return """
INSERT INTO snp_data.t_currentlyat(
imo,
mvmn_type,
mvmn_dt,
stpov_id,
fclty_id,
fclty_nm,
fclty_type,
lwrnk_fclty_id,
lwrnk_fclty_nm,
lwrnk_fclty_type,
up_fclty_id,
up_fclty_nm,
up_fclty_type,
ntn_cd,
ntn_nm,
draft,
lat,
lon,
dstn,
iso2_ntn_cd,
lcinfo
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (imo, mvmn_dt)
DO UPDATE SET
mvmn_type = EXCLUDED.mvmn_type,
mvmn_dt = EXCLUDED.mvmn_dt,
stpov_id = EXCLUDED.stpov_id,
fclty_id = EXCLUDED.fclty_id,
fclty_nm = EXCLUDED.fclty_nm,
fclty_type = EXCLUDED.fclty_type,
lwrnk_fclty_id = EXCLUDED.lwrnk_fclty_id,
lwrnk_fclty_nm = EXCLUDED.lwrnk_fclty_nm,
lwrnk_fclty_type = EXCLUDED.lwrnk_fclty_type,
up_fclty_id = EXCLUDED.up_fclty_id,
up_fclty_nm = EXCLUDED.up_fclty_nm,
up_fclty_type = EXCLUDED.up_fclty_type,
ntn_cd = EXCLUDED.ntn_cd,
ntn_nm = EXCLUDED.ntn_nm,
draft = EXCLUDED.draft,
lat = EXCLUDED.lat,
lon = EXCLUDED.lon,
dstn = EXCLUDED.dstn,
iso2_ntn_cd = EXCLUDED.iso2_ntn_cd,
lcinfo = EXCLUDED.lcinfo
""";
}
@Override
protected String getUpdateSql() {
return null;
}
@Override
protected void setInsertParameters(PreparedStatement ps, CurrentlyAtEntity e) throws Exception {
int i = 1;
ps.setString(i++, e.getImolRorIHSNumber()); // imo
ps.setString(i++, e.getMovementType()); // mvmn_type
ps.setTimestamp(i++, e.getMovementDate() != null ? Timestamp.valueOf(e.getMovementDate()) : null); // mvmn_dt
ps.setObject(i++, e.getPortCallId()); // stpov_id
ps.setObject(i++, e.getFacilityId()); // fclty_id
ps.setString(i++, e.getFacilityName()); // fclty_nm
ps.setString(i++, e.getFacilityType()); // fclty_type
ps.setObject(i++, e.getSubFacilityId()); // lwrnk_fclty_id
ps.setString(i++, e.getSubFacilityName()); // lwrnk_fclty_nm
ps.setString(i++, e.getSubFacilityType()); // lwrnk_fclty_type
ps.setObject(i++, e.getParentFacilityId()); // up_fclty_id
ps.setString(i++, e.getParentFacilityName()); // up_fclty_nm
ps.setString(i++, e.getParentFacilityType()); // up_fclty_type
ps.setString(i++, e.getCountryCode()); // ntn_cd
ps.setString(i++, e.getCountryName()); // ntn_nm
setDoubleOrNull(ps, i++, e.getDraught()); // draft
setDoubleOrNull(ps, i++, e.getLatitude()); // lat
setDoubleOrNull(ps, i++, e.getLongitude());// lon
ps.setString(i++, e.getDestination()); // dstn
ps.setString(i++, e.getIso2()); // iso2_ntn_cd
if (e.getPosition() != null) {
ps.setObject(i++, OBJECT_MAPPER.writeValueAsString(e.getPosition()), java.sql.Types.OTHER); // lcinfo (jsonb)
} else {
ps.setNull(i++, java.sql.Types.OTHER);
}
// ps.setString(i++, e.getSchemaType());
}
private void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception {
if (value != null) {
ps.setDouble(index, value);
} else {
// java.sql.Types.DOUBLE을 사용하여 명시적으로 SQL NULL을 설정
ps.setNull(index, java.sql.Types.DOUBLE);
}
}
@Override
protected void setUpdateParameters(PreparedStatement ps, CurrentlyAtEntity entity) throws Exception {
}
@Override
protected RowMapper<CurrentlyAtEntity> getRowMapper() {
return null;
}
@Override
public void saveAll(List<CurrentlyAtEntity> entities) {
if (entities == null || entities.isEmpty()) return;
log.info("CurrentltAt 저장 시작 = {}건", entities.size());
batchInsert(entities);
}
/*private static class ShipMovementRowMapper implements RowMapper<ShipMovementEntity> {
@Override
public ShipMovementEntity mapRow(ResultSet rs, int rowNum) throws SQLException {
ShipMovementEntity entity = ShipMovementEntity.builder()
.id(rs.getLong("id"))
.imolRorIHSNumber(rs.getString("imolRorIHSNumber"))
.portCallId(rs.getObject("portCallId", Integer.class))
.facilityId(rs.getObject("facilityId", Integer.class))
.facilityName(rs.getString("facilityName"))
.facilityType(rs.getString("facilityType"))
.subFacilityId(rs.getObject("subFacilityId", Integer.class))
.subFacilityName(rs.getString("subFacilityName"))
.subFacilityType(rs.getString("subFacilityType"))
.parentFacilityId(rs.getObject("parentFacilityId", Integer.class))
.parentFacilityName(rs.getString("parentFacilityName"))
.parentFacilityType(rs.getString("parentFacilityType"))
.countryCode(rs.getString("countryCode"))
.countryName(rs.getString("countryName"))
.draught(rs.getObject("draught", Double.class))
.latitude(rs.getObject("latitude", Double.class))
.longitude(rs.getObject("longitude", Double.class))
.destination(rs.getString("destination"))
.iso2(rs.getString("iso2"))
.position(parseJson(rs.getString("position")))
.schemaType(rs.getString("schemaType"))
.build();
Timestamp movementDate = rs.getTimestamp("movementDate");
if (movementDate != null) {
entity.setMovementDate(movementDate.toLocalDateTime());
}
return entity;
}
private JsonNode parseJson(String json) {
try {
if (json == null) return null;
return new ObjectMapper().readTree(json);
} catch (Exception e) {
throw new RuntimeException("JSON 파싱 오류: " + json);
}
}
}*/
}

파일 보기

@ -0,0 +1,36 @@
package com.snp.batch.jobs.shipCurrentlyAt.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.shipCurrentlyAt.batch.entity.CurrentlyAtEntity;
import com.snp.batch.jobs.shipCurrentlyAt.batch.repository.CurrentlyAtRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 선박 상세 정보 Writer
*/
@Slf4j
@Component
public class CurrentlyAtWriter extends BaseWriter<CurrentlyAtEntity> {
private final CurrentlyAtRepository currentlyAtRepository;
public CurrentlyAtWriter(CurrentlyAtRepository currentlyAtRepository) {
super("CurrentlyAt");
this.currentlyAtRepository = currentlyAtRepository;
}
@Override
protected void writeItems(List<CurrentlyAtEntity> items) throws Exception {
if (items.isEmpty()) { return; }
currentlyAtRepository.saveAll(items);
log.info("CurrentlyAt 데이터 저장: {} 건", items.size());
}
}

파일 보기

@ -2,7 +2,7 @@ package com.snp.batch.jobs.shipMovement.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovement.batch.dto.PortCallDto; import com.snp.batch.jobs.shipMovement.batch.dto.PortCallsDto;
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity; import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
import com.snp.batch.jobs.shipMovement.batch.processor.ShipMovementProcessor; import com.snp.batch.jobs.shipMovement.batch.processor.ShipMovementProcessor;
import com.snp.batch.jobs.shipMovement.batch.reader.ShipMovementReader; import com.snp.batch.jobs.shipMovement.batch.reader.ShipMovementReader;
@ -47,7 +47,7 @@ import java.time.format.DateTimeFormatter;
@Slf4j @Slf4j
@Configuration @Configuration
public class ShipMovementJobConfig extends BaseJobConfig<PortCallDto, ShipMovementEntity> { public class ShipMovementJobConfig extends BaseJobConfig<PortCallsDto, ShipMovementEntity> {
private final ShipMovementProcessor shipMovementProcessor; private final ShipMovementProcessor shipMovementProcessor;
private final ShipMovementWriter shipMovementWriter; private final ShipMovementWriter shipMovementWriter;
@ -101,14 +101,14 @@ public class ShipMovementJobConfig extends BaseJobConfig<PortCallDto, ShipMoveme
return reader; return reader;
} }
@Override @Override
protected ItemReader<PortCallDto> createReader() { // 타입 변경 protected ItemReader<PortCallsDto> createReader() { // 타입 변경
// Reader 생성자 수정: ObjectMapper를 전달합니다. // Reader 생성자 수정: ObjectMapper를 전달합니다.
return shipMovementReader(null, null); return shipMovementReader(null, null);
//return new ShipMovementReader(maritimeApiWebClient, jdbcTemplate, objectMapper); //return new ShipMovementReader(maritimeApiWebClient, jdbcTemplate, objectMapper);
} }
@Override @Override
protected ItemProcessor<PortCallDto, ShipMovementEntity> createProcessor() { protected ItemProcessor<PortCallsDto, ShipMovementEntity> createProcessor() {
return shipMovementProcessor; return shipMovementProcessor;
} }

파일 보기

@ -3,7 +3,7 @@ package com.snp.batch.jobs.shipMovement.batch.dto;
import lombok.Data; import lombok.Data;
@Data @Data
public class PortCallDto { public class PortCallsDto {
private String movementType; private String movementType;
private String imolRorIHSNumber; private String imolRorIHSNumber;
private String movementDate; private String movementDate;
@ -29,7 +29,7 @@ public class PortCallDto {
private Double latitude; private Double latitude;
private Double longitude; private Double longitude;
private PositionDto position; private PortCallsPositionDto position;
private String destination; private String destination;
private String iso2; private String iso2;

파일 보기

@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data; import lombok.Data;
@Data @Data
public class PositionDto { public class PortCallsPositionDto {
private boolean isNull; private boolean isNull;
private int stSrid; private int stSrid;
private double lat; private double lat;

파일 보기

@ -8,5 +8,5 @@ import java.util.List;
@Data @Data
public class ShipMovementApiResponse { public class ShipMovementApiResponse {
@JsonProperty("portCalls") @JsonProperty("portCalls")
List<PortCallDto> portCallList; List<PortCallsDto> portCallList;
} }

파일 보기

@ -3,7 +3,7 @@ package com.snp.batch.jobs.shipMovement.batch.processor;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.processor.BaseProcessor; import com.snp.batch.common.batch.processor.BaseProcessor;
import com.snp.batch.jobs.shipMovement.batch.dto.PortCallDto; import com.snp.batch.jobs.shipMovement.batch.dto.PortCallsDto;
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity; import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -22,7 +22,7 @@ import java.time.LocalDateTime;
*/ */
@Slf4j @Slf4j
@Component @Component
public class ShipMovementProcessor extends BaseProcessor<PortCallDto, ShipMovementEntity> { public class ShipMovementProcessor extends BaseProcessor<PortCallsDto, ShipMovementEntity> {
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
@ -31,7 +31,7 @@ public class ShipMovementProcessor extends BaseProcessor<PortCallDto, ShipMoveme
} }
@Override @Override
protected ShipMovementEntity processItem(PortCallDto dto) throws Exception { protected ShipMovementEntity processItem(PortCallsDto dto) throws Exception {
log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}", log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}",
dto.getImolRorIHSNumber(), dto.getFacilityName()); dto.getImolRorIHSNumber(), dto.getFacilityName());

파일 보기

@ -1,15 +1,9 @@
package com.snp.batch.jobs.shipMovement.batch.reader; package com.snp.batch.jobs.shipMovement.batch.reader;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.common.util.JsonChangeDetector; import com.snp.batch.jobs.shipMovement.batch.dto.PortCallsDto;
import com.snp.batch.jobs.shipMovement.batch.dto.PortCallDto;
import com.snp.batch.jobs.shipMovement.batch.dto.ShipMovementApiResponse; import com.snp.batch.jobs.shipMovement.batch.dto.ShipMovementApiResponse;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailApiResponse;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailComparisonData;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailDto;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipResultDto;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -40,7 +34,7 @@ import java.util.*;
*/ */
@Slf4j @Slf4j
@StepScope @StepScope
public class ShipMovementReader extends BaseApiReader<PortCallDto> { public class ShipMovementReader extends BaseApiReader<PortCallsDto> {
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
@ -125,7 +119,7 @@ public class ShipMovementReader extends BaseApiReader<PortCallDto> {
* @return 다음 배치 100건 ( 이상 없으면 null) * @return 다음 배치 100건 ( 이상 없으면 null)
*/ */
@Override @Override
protected List<PortCallDto> fetchNextBatch() throws Exception { protected List<PortCallsDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인 // 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
@ -158,7 +152,7 @@ public class ShipMovementReader extends BaseApiReader<PortCallDto> {
// 응답 처리 // 응답 처리
if (response != null && response.getPortCallList() != null) { if (response != null && response.getPortCallList() != null) {
List<PortCallDto> portCalls = response.getPortCallList(); List<PortCallsDto> portCalls = response.getPortCallList();
log.info("[{}] 배치 {}/{} 완료: {} 건 조회", log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, portCalls.size()); getReaderName(), currentBatchNumber, totalBatches, portCalls.size());
@ -213,7 +207,7 @@ public class ShipMovementReader extends BaseApiReader<PortCallDto> {
} }
@Override @Override
protected void afterFetch(List<PortCallDto> data) { protected void afterFetch(List<PortCallsDto> data) {
if (data == null) { if (data == null) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);

파일 보기

@ -1,12 +1,6 @@
package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.config; package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovement.batch.dto.PortCallDto;
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
import com.snp.batch.jobs.shipMovement.batch.processor.ShipMovementProcessor;
import com.snp.batch.jobs.shipMovement.batch.reader.ShipMovementReader;
import com.snp.batch.jobs.shipMovement.batch.writer.ShipMovementWriter;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.dto.AnchorageCallsDto; import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.dto.AnchorageCallsDto;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity; import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.processor.AnchorageCallsProcessor; import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.processor.AnchorageCallsProcessor;
@ -15,22 +9,17 @@ import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.writer.AnchorageCalls
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
/** /**
* 선박 상세 정보 Import Job Config * 선박 상세 정보 Import Job Config
* *
@ -42,12 +31,12 @@ import java.time.format.DateTimeFormatter;
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
* *
* 데이터 흐름: * 데이터 흐름:
* ShipMovementReader (ship_data Maritime API) * AnchorageCallsReader (ship_data Maritime API)
* (PortCallDto) * (AnchorageCallsDto)
* ShipMovementProcessor * AnchorageCallsProcessor
* (ShipMovementEntity) * (AnchorageCallsEntity)
* ShipDetailDataWriter * AnchorageCallsWriter
* (ship_movement 테이블) * (t_anchoragecall 테이블)
*/ */
@Slf4j @Slf4j

파일 보기

@ -1,6 +1,5 @@
package com.snp.batch.jobs.shipMovementBerthCalls.batch.dto; package com.snp.batch.jobs.shipMovementBerthCalls.batch.dto;
import com.snp.batch.jobs.shipMovement.batch.dto.PositionDto;
import lombok.Data; import lombok.Data;
@Data @Data

파일 보기

@ -32,12 +32,12 @@ import org.springframework.web.reactive.function.client.WebClient;
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
* *
* 데이터 흐름: * 데이터 흐름:
* ShipMovementReader (ship_data Maritime API) * DarkActivityReader (ship_data Maritime API)
* (PortCallDto) * (DarkActivityDto)
* ShipMovementProcessor * DarkActivityProcessor
* (ShipMovementEntity) * (DarkActivityEntity)
* ShipDetailDataWriter * DarkActivityWriter
* (ship_movement 테이블) * (t_darkactivity 테이블)
*/ */
@Slf4j @Slf4j

파일 보기

@ -0,0 +1,103 @@
package com.snp.batch.jobs.shipMovementDestination.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovementDestination.batch.dto.DestinationDto;
import com.snp.batch.jobs.shipMovementDestination.batch.entity.DestinationEntity;
import com.snp.batch.jobs.shipMovementDestination.batch.processor.DestinationProcessor;
import com.snp.batch.jobs.shipMovementDestination.batch.reader.DestinationReader;
import com.snp.batch.jobs.shipMovementDestination.batch.writer.DestinationWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* DestinationReader (ship_data Maritime API)
* (DestinationDto)
* DestinationProcessor
* (DestinationEntity)
* DestinationProcessor
* (t_destination 테이블)
*/
@Slf4j
@Configuration
public class DestinationsJobConfig extends BaseJobConfig<DestinationDto, DestinationEntity> {
private final DestinationProcessor destinationProcessor;
private final DestinationWriter destinationWriter;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient;
public DestinationsJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
DestinationProcessor destinationProcessor,
DestinationWriter destinationWriter, JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager);
this.destinationProcessor = destinationProcessor;
this.destinationWriter = destinationWriter;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient;
}
@Override
protected String getJobName() {
return "DestinationsImportJob";
}
@Override
protected String getStepName() {
return "DestinationsImportStep";
}
@Override
protected ItemReader<DestinationDto> createReader() { // 타입 변경
return new DestinationReader(maritimeApiWebClient, jdbcTemplate);
}
@Override
protected ItemProcessor<DestinationDto, DestinationEntity> createProcessor() {
return destinationProcessor;
}
@Override
protected ItemWriter<DestinationEntity> createWriter() { // 타입 변경
return destinationWriter;
}
@Override
protected int getChunkSize() {
return 1000; // API에서 100개씩 가져오므로 chunk도 100으로 설정
}
@Bean(name = "DestinationsImportJob")
public Job destinationsImportJob() {
return job();
}
@Bean(name = "DestinationsImportStep")
public Step destinationsImportStep() {
return step();
}
}

파일 보기

@ -0,0 +1,24 @@
package com.snp.batch.jobs.shipMovementDestination.batch.dto;
import lombok.Data;
@Data
public class DestinationDto {
private String movementType;
private String imolRorIHSNumber;
private String movementDate;
private Integer facilityId;
private String facilityName;
private String facilityType;
private String countryCode;
private String countryName;
private Double latitude;
private Double longitude;
private DestinationPositionDto position;
private String iso2;
}

파일 보기

@ -0,0 +1,17 @@
package com.snp.batch.jobs.shipMovementDestination.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class DestinationPositionDto {
private boolean isNull;
private int stSrid;
private double lat;
@JsonProperty("long")
private double lon;
private double z;
private double m;
private boolean hasZ;
private boolean hasM;
}

파일 보기

@ -0,0 +1,32 @@
package com.snp.batch.jobs.shipMovementDestination.batch.entity;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.time.LocalDateTime;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
public class DestinationEntity {
private String movementType;
private String imolRorIHSNumber;
private LocalDateTime movementDate;
private Integer facilityId;
private String facilityName;
private String facilityType;
private String countryCode;
private String countryName;
private Double latitude;
private Double longitude;
private JsonNode position;
private String iso2;
}

파일 보기

@ -0,0 +1,61 @@
package com.snp.batch.jobs.shipMovementDestination.batch.processor;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.processor.BaseProcessor;
import com.snp.batch.jobs.shipMovementDestination.batch.dto.DestinationDto;
import com.snp.batch.jobs.shipMovementDestination.batch.entity.DestinationEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 선박 상세 정보 Processor
* ShipDetailDto ShipDetailEntity 변환
*/
/**
* 선박 상세 정보 Processor (해시 비교 증분 데이터 추출)
* I: ShipDetailComparisonData (DB 해시 + API Map Data)
* O: ShipDetailUpdate (변경분)
*/
@Slf4j
@Component
public class DestinationProcessor extends BaseProcessor<DestinationDto, DestinationEntity> {
private final ObjectMapper objectMapper;
public DestinationProcessor(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
protected DestinationEntity processItem(DestinationDto dto) throws Exception {
log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}",
dto.getImolRorIHSNumber(), dto.getFacilityName());
JsonNode positionNode = null;
if (dto.getPosition() != null) {
// Position 객체를 JsonNode로 변환
positionNode = objectMapper.valueToTree(dto.getPosition());
}
DestinationEntity entity = DestinationEntity.builder()
.movementType(dto.getMovementType())
.imolRorIHSNumber(dto.getImolRorIHSNumber())
.movementDate(LocalDateTime.parse(dto.getMovementDate()))
.facilityId(dto.getFacilityId())
.facilityName(dto.getFacilityName())
.facilityType(dto.getFacilityType())
.countryCode(dto.getCountryCode())
.countryName(dto.getCountryName())
.latitude(dto.getLatitude())
.longitude(dto.getLongitude())
.position(positionNode) // JsonNode로 매핑
.iso2(dto.getIso2())
.build();
return entity;
}
}

파일 보기

@ -0,0 +1,211 @@
package com.snp.batch.jobs.shipMovementDestination.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipMovementDestination.batch.dto.DestinationDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j
@StepScope
public class DestinationReader extends BaseApiReader<DestinationDto> {
private final JdbcTemplate jdbcTemplate;
// 배치 처리 상태
private List<String> allImoNumbers;
// DB 해시값을 저장할
private int currentBatchIndex = 0;
private final int batchSize = 5;
// @Value("#{jobParameters['startDate']}")
// private String startDate;
private String startDate = "2025-01-01";
// @Value("#{jobParameters['stopDate']}")
// private String stopDate;
private String stopDate = "2025-12-31";
public DestinationReader(WebClient webClient, JdbcTemplate jdbcTemplate ) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "Destinations";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
}
@Override
protected String getApiPath() {
return "/Movements/Destinations";
}
@Override
protected String getApiBaseUrl() {
return "https://webservices.maritime.spglobal.com";
}
private static final String GET_ALL_IMO_QUERY =
// "SELECT imo_number FROM ship_data ORDER BY id";
"SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_destination) ORDER BY imo_number";
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override
protected void beforeFetch() {
// 전처리 과정
// Step 1. IMO 전체 번호 조회
log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
// API 통계 초기화
updateApiCallStats(totalBatches, 0);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
*
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override
protected List<DestinationDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
List<DestinationDto> response = callApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null ) {
List<DestinationDto> destinations = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, destinations.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return destinations;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
/**
* Query Parameter를 사용한 API 호출
*
* @param lrno 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...")
* @return API 응답
*/
private List<DestinationDto> callApiWithBatch(String lrno) {
String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToFlux(DestinationDto.class)
.collectList()
.block();
}
@Override
protected void afterFetch(List<DestinationDto> data) {
if (data == null) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size());
}
}
}

파일 보기

@ -0,0 +1,14 @@
package com.snp.batch.jobs.shipMovementDestination.batch.repository;
import com.snp.batch.jobs.shipMovementDestination.batch.entity.DestinationEntity;
import com.snp.batch.jobs.shipMovementTerminalCalls.batch.entity.TerminalCallsEntity;
import java.util.List;
/**
* 선박 상세 정보 Repository 인터페이스
*/
public interface DestinationRepository {
void saveAll(List<DestinationEntity> entities);
}

파일 보기

@ -0,0 +1,131 @@
package com.snp.batch.jobs.shipMovementDestination.batch.repository;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.repository.BaseJdbcRepository;
import com.snp.batch.jobs.shipMovementDestination.batch.entity.DestinationEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.List;
/**
* 선박 상세 정보 Repository 구현체
* BaseJdbcRepository를 상속하여 JDBC 기반 CRUD 구현
*/
@Slf4j
@Repository("DestinationRepository")
public class DestinationRepositoryImpl extends BaseJdbcRepository<DestinationEntity, String>
implements DestinationRepository {
public DestinationRepositoryImpl(JdbcTemplate jdbcTemplate) {
super(jdbcTemplate);
}
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override
protected String getTableName() {
return "snp_data.t_destination";
}
@Override
protected String getEntityName() {
return "Destinations";
}
@Override
protected String extractId(DestinationEntity entity) {
return entity.getImolRorIHSNumber();
}
@Override
public String getInsertSql() {
return """
INSERT INTO snp_data.t_destination(
imo,
mvmn_type,
mvmn_dt,
fclty_id,
fclty_nm,
fclty_type,
ntn_cd,
ntn_nm,
lat,
lon,
iso2_ntn_cd,
lcinfo
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (imo, mvmn_dt)
DO UPDATE SET
mvmn_type = EXCLUDED.mvmn_type,
mvmn_dt = EXCLUDED.mvmn_dt,
fclty_id = EXCLUDED.fclty_id,
fclty_nm = EXCLUDED.fclty_nm,
fclty_type = EXCLUDED.fclty_type,
ntn_cd = EXCLUDED.ntn_cd,
ntn_nm = EXCLUDED.ntn_nm,
lat = EXCLUDED.lat,
lon = EXCLUDED.lon,
iso2_ntn_cd = EXCLUDED.iso2_ntn_cd,
lcinfo = EXCLUDED.lcinfo
""";
}
@Override
protected String getUpdateSql() {
return null;
}
@Override
protected void setInsertParameters(PreparedStatement ps, DestinationEntity e) throws Exception {
int i = 1;
ps.setString(i++, e.getImolRorIHSNumber()); // imo
ps.setString(i++, e.getMovementType()); // mvmn_type
ps.setTimestamp(i++, e.getMovementDate() != null ? Timestamp.valueOf(e.getMovementDate()) : null); // mvmn_dt
ps.setObject(i++, e.getFacilityId()); // fclty_id
ps.setString(i++, e.getFacilityName()); // fclty_nm
ps.setString(i++, e.getFacilityType()); // fclty_type
ps.setString(i++, e.getCountryCode()); // ntn_cd
ps.setString(i++, e.getCountryName()); // ntn_nm
setDoubleOrNull(ps, i++, e.getLatitude()); // lat
setDoubleOrNull(ps, i++, e.getLongitude());// lon
ps.setString(i++, e.getIso2()); // iso2_ntn_cd
if (e.getPosition() != null) {
ps.setObject(i++, OBJECT_MAPPER.writeValueAsString(e.getPosition()), java.sql.Types.OTHER); // lcinfo (jsonb)
} else {
ps.setNull(i++, java.sql.Types.OTHER);
}
}
private void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception {
if (value != null) {
ps.setDouble(index, value);
} else {
// java.sql.Types.DOUBLE을 사용하여 명시적으로 SQL NULL을 설정
ps.setNull(index, java.sql.Types.DOUBLE);
}
}
@Override
protected void setUpdateParameters(PreparedStatement ps, DestinationEntity entity) throws Exception {
}
@Override
protected RowMapper<DestinationEntity> getRowMapper() {
return null;
}
@Override
public void saveAll(List<DestinationEntity> entities) {
if (entities == null || entities.isEmpty()) return;
log.info("Destinations 저장 시작 = {}건", entities.size());
batchInsert(entities);
}
}

파일 보기

@ -0,0 +1,36 @@
package com.snp.batch.jobs.shipMovementDestination.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.shipMovementDestination.batch.entity.DestinationEntity;
import com.snp.batch.jobs.shipMovementDestination.batch.repository.DestinationRepository;
import com.snp.batch.jobs.shipMovementTerminalCalls.batch.entity.TerminalCallsEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 선박 상세 정보 Writer
*/
@Slf4j
@Component
public class DestinationWriter extends BaseWriter<DestinationEntity> {
private final DestinationRepository destinationRepository;
public DestinationWriter(DestinationRepository destinationRepository) {
super("Destinations");
this.destinationRepository = destinationRepository;
}
@Override
protected void writeItems(List<DestinationEntity> items) throws Exception {
if (items.isEmpty()) { return; }
destinationRepository.saveAll(items);
log.info("Destinations 데이터 저장: {} 건", items.size());
}
}

파일 보기

@ -1,12 +1,6 @@
package com.snp.batch.jobs.shipMovementStsOperations.batch.config; package com.snp.batch.jobs.shipMovementStsOperations.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovementBerthCalls.batch.dto.BerthCallsDto;
import com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity.BerthCallsEntity;
import com.snp.batch.jobs.shipMovementBerthCalls.batch.processor.BerthCallsProcessor;
import com.snp.batch.jobs.shipMovementBerthCalls.batch.reader.BerthCallsReader;
import com.snp.batch.jobs.shipMovementBerthCalls.batch.writer.BerthCallsWriter;
import com.snp.batch.jobs.shipMovementStsOperations.batch.dto.StsOperationDto; import com.snp.batch.jobs.shipMovementStsOperations.batch.dto.StsOperationDto;
import com.snp.batch.jobs.shipMovementStsOperations.batch.entity.StsOperationEntity; import com.snp.batch.jobs.shipMovementStsOperations.batch.entity.StsOperationEntity;
import com.snp.batch.jobs.shipMovementStsOperations.batch.processor.StsOperationProcessor; import com.snp.batch.jobs.shipMovementStsOperations.batch.processor.StsOperationProcessor;
@ -37,12 +31,12 @@ import org.springframework.web.reactive.function.client.WebClient;
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
* *
* 데이터 흐름: * 데이터 흐름:
* ShipMovementReader (ship_data Maritime API) * StsOperationReader (ship_data Maritime API)
* (PortCallDto) * (StsOperationDto)
* ShipMovementProcessor * StsOperationProcessor
* (ShipMovementEntity) * (StsOperationEntity)
* ShipDetailDataWriter * StsOperationWriter
* (ship_movement 테이블) * (t_stsoperation 테이블)
*/ */
@Slf4j @Slf4j

파일 보기

@ -1,6 +1,5 @@
package com.snp.batch.jobs.shipMovementStsOperations.batch.dto; package com.snp.batch.jobs.shipMovementStsOperations.batch.dto;
import com.snp.batch.jobs.shipMovementBerthCalls.batch.dto.BerthCallsPositionDto;
import lombok.Data; import lombok.Data;
@Data @Data

파일 보기

@ -1,6 +1,5 @@
package com.snp.batch.jobs.shipMovementTerminalCalls.batch.config; package com.snp.batch.jobs.shipMovementTerminalCalls.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovementTerminalCalls.batch.dto.TerminalCallsDto; import com.snp.batch.jobs.shipMovementTerminalCalls.batch.dto.TerminalCallsDto;
import com.snp.batch.jobs.shipMovementTerminalCalls.batch.entity.TerminalCallsEntity; import com.snp.batch.jobs.shipMovementTerminalCalls.batch.entity.TerminalCallsEntity;
@ -32,12 +31,12 @@ import org.springframework.web.reactive.function.client.WebClient;
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
* *
* 데이터 흐름: * 데이터 흐름:
* ShipMovementReader (ship_data Maritime API) * TerminalCallsReader (ship_data Maritime API)
* (PortCallDto) * (TerminalCallsDto)
* ShipMovementProcessor * TerminalCallsProcessor
* (ShipMovementEntity) * (TerminalCallsEntity)
* ShipDetailDataWriter * TerminalCallsWriter
* (ship_movement 테이블) * (t_terminalcall 테이블)
*/ */
@Slf4j @Slf4j

파일 보기

@ -0,0 +1,103 @@
package com.snp.batch.jobs.shipMovementTransits.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovementTransits.batch.dto.TransitsDto;
import com.snp.batch.jobs.shipMovementTransits.batch.entity.TransitsEntity;
import com.snp.batch.jobs.shipMovementTransits.batch.processor.TransitsProcessor;
import com.snp.batch.jobs.shipMovementTransits.batch.reader.TransitsReader;
import com.snp.batch.jobs.shipMovementTransits.batch.writer.TransitsWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* TransitsReader (ship_data Maritime API)
* (TransitsDto)
* TransitsProcessor
* (TransitsEntity)
* TransitsWriter
* (t_transit 테이블)
*/
@Slf4j
@Configuration
public class TransitsJobConfig extends BaseJobConfig<TransitsDto, TransitsEntity> {
private final TransitsProcessor transitsProcessor;
private final TransitsWriter transitsWriter;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient;
public TransitsJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
TransitsProcessor TransitsProcessor,
TransitsWriter transitsWriter, JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager);
this.transitsProcessor = TransitsProcessor;
this.transitsWriter = transitsWriter;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient;
}
@Override
protected String getJobName() {
return "TransitsImportJob";
}
@Override
protected String getStepName() {
return "TransitsImportStep";
}
@Override
protected ItemReader<TransitsDto> createReader() { // 타입 변경
return new TransitsReader(maritimeApiWebClient, jdbcTemplate);
}
@Override
protected ItemProcessor<TransitsDto, TransitsEntity> createProcessor() {
return transitsProcessor;
}
@Override
protected ItemWriter<TransitsEntity> createWriter() { // 타입 변경
return transitsWriter;
}
@Override
protected int getChunkSize() {
return 1000; // API에서 100개씩 가져오므로 chunk도 100으로 설정
}
@Bean(name = "TransitsImportJob")
public Job transitsImportJob() {
return job();
}
@Bean(name = "TransitsImportStep")
public Step transitsImportStep() {
return step();
}
}

파일 보기

@ -0,0 +1,13 @@
package com.snp.batch.jobs.shipMovementTransits.batch.dto;
import lombok.Data;
@Data
public class TransitsDto {
private String movementType;
private String imolRorIHSNumber;
private String movementDate;
private String facilityName;
private String facilityType;
private Double draught;
}

파일 보기

@ -0,0 +1,21 @@
package com.snp.batch.jobs.shipMovementTransits.batch.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.time.LocalDateTime;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
public class TransitsEntity {
private String movementType;
private String imolRorIHSNumber;
private LocalDateTime movementDate;
private String facilityName;
private String facilityType;
private Double draught;
}

파일 보기

@ -0,0 +1,47 @@
package com.snp.batch.jobs.shipMovementTransits.batch.processor;
import com.snp.batch.common.batch.processor.BaseProcessor;
import com.snp.batch.jobs.shipMovementTransits.batch.dto.TransitsDto;
import com.snp.batch.jobs.shipMovementTransits.batch.entity.TransitsEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 선박 상세 정보 Processor
* ShipDetailDto ShipDetailEntity 변환
*/
/**
* 선박 상세 정보 Processor (해시 비교 증분 데이터 추출)
* I: ShipDetailComparisonData (DB 해시 + API Map Data)
* O: ShipDetailUpdate (변경분)
*/
@Slf4j
@Component
public class TransitsProcessor extends BaseProcessor<TransitsDto, TransitsEntity> {
// private final ObjectMapper objectMapper;
// public TransitsProcessor(ObjectMapper objectMapper) {
// this.objectMapper = objectMapper;
// }
@Override
protected TransitsEntity processItem(TransitsDto dto) throws Exception {
log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}",
dto.getImolRorIHSNumber(), dto.getFacilityName());
TransitsEntity entity = TransitsEntity.builder()
.movementType(dto.getMovementType())
.imolRorIHSNumber(dto.getImolRorIHSNumber())
.movementDate(LocalDateTime.parse(dto.getMovementDate()))
.facilityName(dto.getFacilityName())
.facilityType(dto.getFacilityType())
.draught(dto.getDraught())
.build();
return entity;
}
}

파일 보기

@ -0,0 +1,211 @@
package com.snp.batch.jobs.shipMovementTransits.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipMovementTransits.batch.dto.TransitsDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j
@StepScope
public class TransitsReader extends BaseApiReader<TransitsDto> {
private final JdbcTemplate jdbcTemplate;
// 배치 처리 상태
private List<String> allImoNumbers;
// DB 해시값을 저장할
private int currentBatchIndex = 0;
private final int batchSize = 5;
// @Value("#{jobParameters['startDate']}")
// private String startDate;
private String startDate = "2025-01-01";
// @Value("#{jobParameters['stopDate']}")
// private String stopDate;
private String stopDate = "2025-12-31";
public TransitsReader(WebClient webClient, JdbcTemplate jdbcTemplate ) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "Transits";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
}
@Override
protected String getApiPath() {
return "/Movements/Transits";
}
@Override
protected String getApiBaseUrl() {
return "https://webservices.maritime.spglobal.com";
}
private static final String GET_ALL_IMO_QUERY =
// "SELECT imo_number FROM ship_data ORDER BY id";
"SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_transit) ORDER BY imo_number";
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override
protected void beforeFetch() {
// 전처리 과정
// Step 1. IMO 전체 번호 조회
log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
// API 통계 초기화
updateApiCallStats(totalBatches, 0);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
*
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override
protected List<TransitsDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
List<TransitsDto> response = callApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null ) {
List<TransitsDto> transits = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, transits.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return transits;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
/**
* Query Parameter를 사용한 API 호출
*
* @param lrno 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...")
* @return API 응답
*/
private List<TransitsDto> callApiWithBatch(String lrno) {
String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToFlux(TransitsDto.class)
.collectList()
.block();
}
@Override
protected void afterFetch(List<TransitsDto> data) {
if (data == null) {
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size());
}
}
}

파일 보기

@ -0,0 +1,108 @@
package com.snp.batch.jobs.shipMovementTransits.batch.repository;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.repository.BaseJdbcRepository;
import com.snp.batch.jobs.shipMovementTransits.batch.entity.TransitsEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.List;
/**
* 선박 상세 정보 Repository 구현체
* BaseJdbcRepository를 상속하여 JDBC 기반 CRUD 구현
*/
@Slf4j
@Repository("TransitsRepository")
public class TransitlsRepositoryImpl extends BaseJdbcRepository<TransitsEntity, String>
implements TransitsRepository {
public TransitlsRepositoryImpl(JdbcTemplate jdbcTemplate) {
super(jdbcTemplate);
}
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override
protected String getTableName() {
return "snp_data.t_transit";
}
@Override
protected String getEntityName() {
return "Transit";
}
@Override
protected String extractId(TransitsEntity entity) {
return entity.getImolRorIHSNumber();
}
@Override
public String getInsertSql() {
return """
INSERT INTO snp_data.t_transit(
imo,
mvmn_type,
mvmn_dt,
fclty_nm,
fclty_type,
draft
) VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (imo, mvmn_dt)
DO UPDATE SET
mvmn_type = EXCLUDED.mvmn_type,
mvmn_dt = EXCLUDED.mvmn_dt,
fclty_nm = EXCLUDED.fclty_nm,
fclty_type = EXCLUDED.fclty_type,
draft = EXCLUDED.draft
""";
}
@Override
protected String getUpdateSql() {
return null;
}
@Override
protected void setInsertParameters(PreparedStatement ps, TransitsEntity e) throws Exception {
int i = 1;
ps.setString(i++, e.getImolRorIHSNumber()); // imo
ps.setString(i++, e.getMovementType()); // mvmn_type
ps.setTimestamp(i++, e.getMovementDate() != null ? Timestamp.valueOf(e.getMovementDate()) : null); // mvmn_dt
ps.setString(i++, e.getFacilityName()); // fclty_nm
ps.setString(i++, e.getFacilityType()); // fclty_type
setDoubleOrNull(ps, i++, e.getDraught()); // draft
}
private void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception {
if (value != null) {
ps.setDouble(index, value);
} else {
// java.sql.Types.DOUBLE을 사용하여 명시적으로 SQL NULL을 설정
ps.setNull(index, java.sql.Types.DOUBLE);
}
}
@Override
protected void setUpdateParameters(PreparedStatement ps, TransitsEntity entity) throws Exception {
}
@Override
protected RowMapper<TransitsEntity> getRowMapper() {
return null;
}
@Override
public void saveAll(List<TransitsEntity> entities) {
if (entities == null || entities.isEmpty()) return;
log.info("Transits 저장 시작 = {}건", entities.size());
batchInsert(entities);
}
}

파일 보기

@ -0,0 +1,13 @@
package com.snp.batch.jobs.shipMovementTransits.batch.repository;
import com.snp.batch.jobs.shipMovementTransits.batch.entity.TransitsEntity;
import java.util.List;
/**
* 선박 상세 정보 Repository 인터페이스
*/
public interface TransitsRepository {
void saveAll(List<TransitsEntity> entities);
}

파일 보기

@ -0,0 +1,35 @@
package com.snp.batch.jobs.shipMovementTransits.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.shipMovementTransits.batch.entity.TransitsEntity;
import com.snp.batch.jobs.shipMovementTransits.batch.repository.TransitsRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 선박 상세 정보 Writer
*/
@Slf4j
@Component
public class TransitsWriter extends BaseWriter<TransitsEntity> {
private final TransitsRepository transitsRepository;
public TransitsWriter(TransitsRepository transitsRepository) {
super("Transits");
this.transitsRepository = transitsRepository;
}
@Override
protected void writeItems(List<TransitsEntity> items) throws Exception {
if (items.isEmpty()) { return; }
transitsRepository.saveAll(items);
log.info("Transits 데이터 저장: {} 건", items.size());
}
}