Movement Method Range형식으로 변경

This commit is contained in:
Kim JiMyeung 2025-12-19 13:37:35 +09:00
커밋 63e9253d7f
51개의 변경된 파일, 2761줄 추가, 281줄 삭제

파일 보기

@ -1,6 +1,5 @@
package com.snp.batch.common.batch.repository; package com.snp.batch.common.batch.repository;
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;

파일 보기

@ -39,7 +39,6 @@ public class SwaggerConfig {
.info(apiInfo()) .info(apiInfo())
.servers(List.of( .servers(List.of(
new Server() new Server()
.url("http://localhost:" + serverPort + contextPath) .url("http://localhost:" + serverPort + contextPath)
.description("로컬 개발 서버"), .description("로컬 개발 서버"),
new Server() new Server()
@ -89,4 +88,4 @@ public class SwaggerConfig {
.name("Apache 2.0") .name("Apache 2.0")
.url("https://www.apache.org/licenses/LICENSE-2.0")); .url("https://www.apache.org/licenses/LICENSE-2.0"));
} }
} }

파일 보기

@ -80,8 +80,8 @@ public class PscInspectionJobConfig extends BaseJobConfig<PscInspectionDto, PscI
@StepScope @StepScope
public PscApiReader pscApiReader( public PscApiReader pscApiReader(
@Qualifier("maritimeApiWebClient") WebClient webClient, @Qualifier("maritimeApiWebClient") WebClient webClient,
@Value("#{jobParameters['fromDate']}") String fromDate, @Value("#{jobParameters['startDate']}") String fromDate,
@Value("#{jobParameters['toDate']}") String toDate @Value("#{jobParameters['stopDate']}") String toDate
) { ) {
return new PscApiReader(webClient, fromDate, toDate); return new PscApiReader(webClient, fromDate, toDate);
} }
@ -103,7 +103,7 @@ public class PscInspectionJobConfig extends BaseJobConfig<PscInspectionDto, PscI
@Override @Override
protected int getChunkSize() { protected int getChunkSize() {
return 10; // API에서 100개씩 가져오므로 chunk도 100으로 설정 return 1000; // API에서 100개씩 가져오므로 chunk도 100으로 설정
} }
@Bean(name = "PSCDetailImportJob") @Bean(name = "PSCDetailImportJob")

파일 보기

@ -8,9 +8,12 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -18,22 +21,29 @@ import java.util.List;
@StepScope @StepScope
public class PscApiReader extends BaseApiReader<PscInspectionDto> { public class PscApiReader extends BaseApiReader<PscInspectionDto> {
//private final JdbcTemplate jdbcTemplate; private final String startDate;
private final String stopDate;
private final String fromDate;
private final String toDate;
// private List<String> allImoNumbers;
private List<PscInspectionDto> allData; private List<PscInspectionDto> allData;
private int currentBatchIndex = 0; private int currentBatchIndex = 0;
private final int batchSize = 10; private final int batchSize = 1000;
public PscApiReader(@Qualifier("maritimeApiWebClient") WebClient webClient, public PscApiReader(@Qualifier("maritimeApiWebClient") WebClient webClient,
@Value("#{jobParameters['fromDate']}") String fromDate, @Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['toDate']}") String toDate) { @Value("#{jobParameters['stopDate']}") String stopDate) {
super(webClient); super(webClient);
//this.jdbcTemplate = jdbcTemplate;
this.fromDate = fromDate; // 날짜가 없으면 전날 하루 기준
this.toDate = toDate; if (startDate == null || startDate.isBlank() ||
stopDate == null || stopDate.isBlank()) {
LocalDate yesterday = LocalDate.now().minusDays(1);
this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
} else {
this.startDate = startDate;
this.stopDate = stopDate;
}
enableChunkMode(); enableChunkMode();
} }
@ -45,7 +55,7 @@ public class PscApiReader extends BaseApiReader<PscInspectionDto> {
@Override @Override
protected void resetCustomState() { protected void resetCustomState() {
this.currentBatchIndex = 0; this.currentBatchIndex = 0;
// this.allImoNumbers = null; this.allData = null;
} }
@Override @Override
@ -53,37 +63,18 @@ public class PscApiReader extends BaseApiReader<PscInspectionDto> {
return "/MaritimeWCF/PSCService.svc/RESTFul/GetPSCDataByLastUpdateDateRange"; return "/MaritimeWCF/PSCService.svc/RESTFul/GetPSCDataByLastUpdateDateRange";
} }
private static final String GET_ALL_IMO_QUERY =
"SELECT imo_number FROM ship_data ORDER BY id";
// "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_berthcalls) ORDER BY imo_number";
@Override @Override
protected void beforeFetch() { protected void beforeFetch() {
// 전처리 과정 log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
// Step 1. IMO 전체 번호 조회
/*log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
// API 통계 초기화
updateApiCallStats(totalBatches, 0);*/
log.info("[PSC] 요청 날짜 범위: {} → {}", fromDate, toDate);
} }
@Override @Override
protected List<PscInspectionDto> fetchNextBatch() { protected List<PscInspectionDto> fetchNextBatch() {
// 1) 처음 호출이면 API 호출해서 전체 데이터를 가져온다 // 1) 처음 호출이면 API 호출해서 전체 데이터를 가져온다
if (allData == null) { if (allData == null) {
log.info("[PSC] 최초 API 조회 실행: {} ~ {}", fromDate, toDate); log.info("[PSC] 최초 API 조회 실행: {} ~ {}", startDate, stopDate);
allData = callApiWithBatch(fromDate, toDate); allData = callApiWithBatch(startDate, stopDate);
if (allData == null || allData.isEmpty()) { if (allData == null || allData.isEmpty()) {
log.warn("[PSC] 조회된 데이터 없음 → 종료"); log.warn("[PSC] 조회된 데이터 없음 → 종료");
@ -116,20 +107,19 @@ public class PscApiReader extends BaseApiReader<PscInspectionDto> {
return batch; return batch;
} }
// private List<PscInspectionDto> callApiWithBatch(String lrno) { private List<PscInspectionDto> callApiWithBatch(String startDate, String stopDate) {
private List<PscInspectionDto> callApiWithBatch(String from, String to) {
String[] f = from.split("-"); LocalDateTime fromDay = parseToDateTime(startDate, true);
String[] t = to.split("-"); LocalDateTime toDay = parseToDateTime(stopDate, false);
String url = getApiPath() String url = getApiPath()
+ "?shipsCategory=0" + "?shipsCategory=0"
+ "&fromYear=" + f[0] + "&fromYear=" + fromDay.getYear()
+ "&fromMonth=" + f[1] + "&fromMonth=" + fromDay.getMonthValue()
+ "&fromDay=" + f[2] + "&fromDay=" + fromDay.getDayOfMonth()
+ "&toYear=" + t[0] + "&toYear=" + toDay.getYear()
+ "&toMonth=" + t[1] + "&toMonth=" + toDay.getMonthValue()
+ "&toDay=" + t[2]; + "&toDay=" + toDay.getDayOfMonth();
log.info("[PSC] API 호출 URL = {}", url); log.info("[PSC] API 호출 URL = {}", url);
@ -170,4 +160,18 @@ public class PscApiReader extends BaseApiReader<PscInspectionDto> {
getReaderName(), allData.size()); getReaderName(), allData.size());
} }
} }
private LocalDateTime parseToDateTime(String value, boolean isStart) {
// yyyy-MM-dd 경우
if (value.length() == 10) {
LocalDate date = LocalDate.parse(value);
return isStart
? date.atStartOfDay()
: date.plusDays(1).atStartOfDay();
}
// yyyy-MM-ddTHH:mm:ssZ 경우
return OffsetDateTime.parse(value).toLocalDateTime();
}
} }

파일 보기

@ -1,18 +0,0 @@
package com.snp.batch.jobs.shipMovement.batch.repository;
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
import org.springframework.stereotype.Repository;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Repository interface for ship movement (port-call) records.
 * (Original Korean doc said "ship detail Repository interface" — likely a
 * copy-paste leftover; the methods operate on ShipMovementEntity.)
 */
public interface ShipMovementRepository {

    /** Persists the given movement entities in bulk. */
    void saveAll(List<ShipMovementEntity> entities);

    /** Returns true when a movement row with the given port-call id already exists. */
    boolean existsByPortCallId(Integer portCallId);
}

파일 보기

@ -1,40 +0,0 @@
package com.snp.batch.jobs.shipMovement.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
import com.snp.batch.jobs.shipMovement.batch.repository.ShipMovementRepository;
import com.snp.batch.jobs.shipdetail.batch.repository.ShipDetailRepository;
import com.snp.batch.jobs.shipdetail.batch.repository.ShipHashRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Writer that persists ship movement entities in chunk-sized batches via
 * {@link ShipMovementRepository}.
 */
@Slf4j
@Component
public class ShipMovementWriter extends BaseWriter<ShipMovementEntity> {

    private final ShipMovementRepository shipMovementRepository;

    /**
     * @param shipDetailRepository   unused — kept only for DI/constructor compatibility;
     *                               TODO(review): remove once no caller depends on it
     * @param shipHashRepository     unused — kept only for DI/constructor compatibility;
     *                               TODO(review): remove once no caller depends on it
     * @param shipMovementRepository repository used to persist movement entities
     */
    public ShipMovementWriter(ShipDetailRepository shipDetailRepository, ShipHashRepository shipHashRepository, ShipMovementRepository shipMovementRepository) {
        super("ShipMovement");
        // Fixed typo: parameter was previously named "shipMovementRepositoryy".
        this.shipMovementRepository = shipMovementRepository;
    }

    /**
     * Saves one chunk of movement entities; no-op on an empty chunk.
     */
    @Override
    protected void writeItems(List<ShipMovementEntity> items) throws Exception {
        if (items.isEmpty()) {
            return;
        }
        log.info("선박 상세 정보 데이터 저장: {} 건", items.size());
        shipMovementRepository.saveAll(items);
        // NOTE(review): message mentions hash data, but only movement entities are
        // saved here — confirm whether hash persistence was intended.
        log.info("선박 상세 정보 및 해시 데이터 저장 완료: {} 건", items.size());
    }
}

파일 보기

@ -0,0 +1,114 @@
package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.dto.AnchorageCallsDto;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.processor.AnchorageCallsProcessor;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.reader.AnchorageCallsRangeReader;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.writer.AnchorageCallsWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
 * Job configuration for importing anchorage calls over a date range.
 *
 * Data flow:
 *   AnchorageCallsRangeReader  (Maritime API /Movements/AnchorageCalls, startDate~stopDate)
 *     -> (AnchorageCallsDto)
 *   AnchorageCallsProcessor
 *     -> (AnchorageCallsEntity)
 *   AnchorageCallsWriter
 *     -> t_anchoragecall table
 *
 * Job parameters: startDate / stopDate — the reader defaults to "yesterday"
 * when either is absent.
 */
@Slf4j
@Configuration
public class AnchorageCallsRangeJobConfig extends BaseJobConfig<AnchorageCallsDto, AnchorageCallsEntity> {

    private final AnchorageCallsProcessor anchorageCallsProcessor;
    private final AnchorageCallsWriter anchorageCallsWriter;
    private final AnchorageCallsRangeReader anchorageCallsRangeReader;

    public AnchorageCallsRangeJobConfig(
            JobRepository jobRepository,
            PlatformTransactionManager transactionManager,
            AnchorageCallsProcessor anchorageCallsProcessor,
            AnchorageCallsWriter anchorageCallsWriter,
            AnchorageCallsRangeReader anchorageCallsRangeReader
    ) {
        super(jobRepository, transactionManager);
        this.anchorageCallsProcessor = anchorageCallsProcessor;
        this.anchorageCallsWriter = anchorageCallsWriter;
        this.anchorageCallsRangeReader = anchorageCallsRangeReader;
    }

    @Override
    protected String getJobName() {
        return "AnchorageCallsRangeImportJob";
    }

    @Override
    protected String getStepName() {
        return "AnchorageCallsRangeImportStep";
    }

    @Override
    protected ItemReader<AnchorageCallsDto> createReader() {
        // NOTE(review): returns the injected singleton reader; the @StepScope bean
        // declared below is a *different* instance — confirm which one the step
        // is meant to use (job parameters only bind to the step-scoped bean).
        return anchorageCallsRangeReader;
    }

    /**
     * Step-scoped reader bean bound to the startDate/stopDate job parameters.
     */
    @Bean
    @StepScope
    public AnchorageCallsRangeReader anchorageCallsReader(
            @Qualifier("maritimeServiceApiWebClient") WebClient webClient,
            @Value("#{jobParameters['startDate']}") String startDate,
            @Value("#{jobParameters['stopDate']}") String stopDate
    ) {
        return new AnchorageCallsRangeReader(webClient, startDate, stopDate);
    }

    @Override
    protected ItemProcessor<AnchorageCallsDto, AnchorageCallsEntity> createProcessor() {
        return anchorageCallsProcessor;
    }

    @Override
    protected ItemWriter<AnchorageCallsEntity> createWriter() {
        return anchorageCallsWriter;
    }

    @Override
    protected int getChunkSize() {
        return 5000; // matches the reader's batchSize (5000), not 100 as a stale comment claimed
    }

    @Bean(name = "AnchorageCallsRangeImportJob")
    public Job anchorageCallsRangeImportJob() {
        return job();
    }

    @Bean(name = "AnchorageCallsRangeImportStep")
    public Step anchorageCallsRangeImportStep() {
        return step();
    }
}

파일 보기

@ -0,0 +1,153 @@
package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.dto.AnchorageCallsDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
/**
 * Chunk-based reader for anchorage calls over a date range.
 *
 * Flow:
 *  1. On the first fetchNextBatch(), calls /Movements/AnchorageCalls once with
 *     startDate/stopDate and caches the entire result list in memory.
 *  2. Subsequent calls slice the cached list into batches of {@code batchSize}
 *     so Spring Batch can process/write in chunks.
 *
 * When the startDate/stopDate job parameters are missing or blank, the range
 * defaults to yesterday (start of yesterday .. start of today), formatted as
 * ISO local date-time with a trailing "Z".
 */
@Slf4j
@StepScope
public class AnchorageCallsRangeReader extends BaseApiReader<AnchorageCallsDto> {

    private List<AnchorageCallsDto> allData;   // full API result; loaded lazily on first fetch
    private int currentBatchIndex = 0;         // start offset of the next slice into allData
    private final int batchSize = 5000;

    private final String startDate;            // made final: assigned exactly once in the constructor
    private final String stopDate;

    public AnchorageCallsRangeReader(WebClient webClient,
                                     @Value("#{jobParameters['startDate']}") String startDate,
                                     @Value("#{jobParameters['stopDate']}") String stopDate) {
        super(webClient);
        // Default to "yesterday" when either bound is missing.
        if (startDate == null || startDate.isBlank() ||
                stopDate == null || stopDate.isBlank()) {
            LocalDate yesterday = LocalDate.now().minusDays(1);
            this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
            this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
        } else {
            this.startDate = startDate;
            this.stopDate = stopDate;
        }
        enableChunkMode();
    }

    @Override
    protected String getReaderName() {
        return "AnchorageCallsReader";
    }

    @Override
    protected void resetCustomState() {
        this.currentBatchIndex = 0;
        this.allData = null;
    }

    @Override
    protected String getApiPath() {
        return "/Movements/AnchorageCalls";
    }

    @Override
    protected String getApiBaseUrl() {
        return "https://webservices.maritime.spglobal.com";
    }

    @Override
    protected void beforeFetch() {
        log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
    }

    /**
     * Returns the next slice of cached data, calling the API once on first use.
     * Returns null when there is nothing (more) to read.
     */
    @Override
    protected List<AnchorageCallsDto> fetchNextBatch() throws Exception {
        // 1) First call: fetch the whole date range from the API.
        if (allData == null) {
            log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate);
            allData = callApiWithBatch(startDate, stopDate);
            if (allData == null || allData.isEmpty()) {
                log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
                return null;
            }
            log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), batchSize);
        }
        // 2) Exhausted: signal end-of-input.
        if (currentBatchIndex >= allData.size()) {
            log.info("[{}] 모든 배치 처리 완료", getReaderName());
            return null;
        }
        // 3) Slice [currentBatchIndex, end) out of the cached list.
        int end = Math.min(currentBatchIndex + batchSize, allData.size());
        List<AnchorageCallsDto> batch = allData.subList(currentBatchIndex, end);
        int batchNum = (currentBatchIndex / batchSize) + 1;
        int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
        log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), batchNum, totalBatches, batch.size());
        // 4) Advance to the next slice.
        currentBatchIndex = end;
        updateApiCallStats(totalBatches, batchNum);
        return batch;
    }

    /**
     * Calls the anchorage-calls endpoint with the date range as query parameters.
     *
     * @return full list of DTOs for the range (may be null/empty on no data)
     */
    private List<AnchorageCallsDto> callApiWithBatch(String startDate, String stopDate) {
        String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate;
        log.info("[{}] API 호출: {}", getReaderName(), url);
        return webClient.get()
                .uri(url)
                .retrieve()
                .bodyToFlux(AnchorageCallsDto.class)
                .collectList()
                .block();
    }

    @Override
    protected void afterFetch(List<AnchorageCallsDto> data) {
        // data == null signals end-of-input. Also guard allData: if the very first
        // API call returned null, allData was never assigned and the previous code
        // threw a NullPointerException on allData.size() here.
        if (data == null && allData != null) {
            int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
            log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
        }
    }
}

파일 보기

@ -1,6 +1,5 @@
package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.repository; package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.repository;
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity; import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity;
import java.util.List; import java.util.List;

파일 보기

@ -3,8 +3,6 @@ package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.repository;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.repository.BaseJdbcRepository; import com.snp.batch.common.batch.repository.BaseJdbcRepository;
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity;
import com.snp.batch.jobs.shipMovement.batch.repository.ShipMovementRepository;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity; import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
@ -32,7 +30,8 @@ public class AnchorageCallsRepositoryImpl extends BaseJdbcRepository<AnchorageCa
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override @Override
protected String getTableName() { protected String getTableName() {
return "snp_data.t_anchoragecall"; // return "snp_data.t_anchoragecall";
return "new_snp.t_anchoragecall";
} }
@Override @Override
@ -47,8 +46,10 @@ public class AnchorageCallsRepositoryImpl extends BaseJdbcRepository<AnchorageCa
@Override @Override
public String getInsertSql() { public String getInsertSql() {
/*return """
INSERT INTO snp_data.t_anchoragecall(*/
return """ return """
INSERT INTO snp_data.t_anchoragecall( INSERT INTO new_snp.t_anchoragecall(
imo, imo,
mvmn_type, mvmn_type,
mvmn_dt, mvmn_dt,

파일 보기

@ -1,11 +1,8 @@
package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.writer; package com.snp.batch.jobs.shipMovementAnchorageCalls.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter; import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.shipMovement.batch.repository.ShipMovementRepository;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity; import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.repository.AnchorageCallsRepository; import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.repository.AnchorageCallsRepository;
import com.snp.batch.jobs.shipdetail.batch.repository.ShipDetailRepository;
import com.snp.batch.jobs.shipdetail.batch.repository.ShipHashRepository;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;

파일 보기

@ -0,0 +1,118 @@
package com.snp.batch.jobs.shipMovementBerthCalls.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovementBerthCalls.batch.dto.BerthCallsDto;
import com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity.BerthCallsEntity;
import com.snp.batch.jobs.shipMovementBerthCalls.batch.processor.BerthCallsProcessor;
import com.snp.batch.jobs.shipMovementBerthCalls.batch.reader.BerthCallsRangeReader;
import com.snp.batch.jobs.shipMovementBerthCalls.batch.writer.BerthCallsWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
 * Job configuration for importing berth calls over a date range.
 *
 * Data flow:
 *   BerthCallsRangeReader  (Maritime API /Movements/BerthCalls, startDate~stopDate)
 *     -> (BerthCallsDto)
 *   BerthCallsProcessor
 *     -> (BerthCallsEntity)
 *   BerthCallsWriter
 *     -> t_berthcall table
 *
 * Job parameters: startDate / stopDate — the reader defaults to "yesterday"
 * when either is absent.
 *
 * NOTE(review): class name is missing an 'e' ("RangJobConfig"); renaming would
 * change the bean/file name, so it is only flagged here.
 */
@Slf4j
@Configuration
public class BerthCallsRangJobConfig extends BaseJobConfig<BerthCallsDto, BerthCallsEntity> {

    private final BerthCallsProcessor berthCallsProcessor;
    private final BerthCallsWriter berthCallsWriter;
    private final BerthCallsRangeReader berthCallsRangeReader;
    private final JdbcTemplate jdbcTemplate;            // NOTE(review): unused in this config — candidate for removal
    private final WebClient maritimeApiWebClient;       // used by the step-scoped reader bean below
    private final ObjectMapper objectMapper;            // NOTE(review): unused in this config — candidate for removal

    public BerthCallsRangJobConfig(
            JobRepository jobRepository,
            PlatformTransactionManager transactionManager,
            BerthCallsProcessor berthCallsProcessor,
            BerthCallsWriter berthCallsWriter, BerthCallsRangeReader berthCallsRangeReader, JdbcTemplate jdbcTemplate,
            @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient,
            ObjectMapper objectMapper) {
        super(jobRepository, transactionManager);
        this.berthCallsProcessor = berthCallsProcessor;
        this.berthCallsWriter = berthCallsWriter;
        this.berthCallsRangeReader = berthCallsRangeReader;
        this.jdbcTemplate = jdbcTemplate;
        this.maritimeApiWebClient = maritimeApiWebClient;
        this.objectMapper = objectMapper;
    }

    @Override
    protected String getJobName() {
        return "BerthCallsRangeImportJob";
    }

    @Override
    protected String getStepName() {
        return "BerthCallsRangeImportStep";
    }

    @Override
    protected ItemReader<BerthCallsDto> createReader() {
        // NOTE(review): returns the injected singleton reader; the @StepScope bean
        // declared below is a *different* instance — confirm which one the step
        // is meant to use (job parameters only bind to the step-scoped bean).
        return berthCallsRangeReader;
    }

    /**
     * Step-scoped reader bean bound to the startDate/stopDate job parameters.
     */
    @Bean
    @StepScope
    public BerthCallsRangeReader berthCallsRangeReader(
            @Value("#{jobParameters['startDate']}") String startDate,
            @Value("#{jobParameters['stopDate']}") String stopDate
    ) {
        return new BerthCallsRangeReader(maritimeApiWebClient, startDate, stopDate);
    }

    @Override
    protected ItemProcessor<BerthCallsDto, BerthCallsEntity> createProcessor() {
        return berthCallsProcessor;
    }

    @Override
    protected ItemWriter<BerthCallsEntity> createWriter() {
        return berthCallsWriter;
    }

    @Override
    protected int getChunkSize() {
        return 5000; // matches the reader's batchSize of 5000
    }

    @Bean(name = "BerthCallsRangeImportJob")
    public Job berthCallsRangeImportJob() {
        return job();
    }

    @Bean(name = "BerthCallsRangeImportStep")
    public Step berthCallsRangeImportStep() {
        return step();
    }
}

파일 보기

@ -0,0 +1,154 @@
package com.snp.batch.jobs.shipMovementBerthCalls.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipMovementBerthCalls.batch.dto.BerthCallsDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
/**
 * Chunk-based reader for berth calls over a date range.
 *
 * Flow:
 *  1. On the first fetchNextBatch(), calls /Movements/BerthCalls once with
 *     startDate/stopDate and caches the entire result list in memory.
 *  2. Subsequent calls slice the cached list into batches of {@code batchSize}
 *     so Spring Batch can process/write in chunks.
 *
 * When the startDate/stopDate job parameters are missing or blank, the range
 * defaults to yesterday (start of yesterday .. start of today), formatted as
 * ISO local date-time with a trailing "Z".
 */
@Slf4j
@StepScope
public class BerthCallsRangeReader extends BaseApiReader<BerthCallsDto> {

    private List<BerthCallsDto> allData;   // full API result; loaded lazily on first fetch
    private int currentBatchIndex = 0;     // start offset of the next slice into allData
    private final int batchSize = 5000;

    private final String startDate;        // made final: assigned exactly once in the constructor
    private final String stopDate;

    public BerthCallsRangeReader(WebClient webClient,
                                 @Value("#{jobParameters['startDate']}") String startDate,
                                 @Value("#{jobParameters['stopDate']}") String stopDate) {
        super(webClient);
        // Default to "yesterday" when either bound is missing.
        if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) {
            LocalDate yesterday = LocalDate.now().minusDays(1);
            this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
            this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
        } else {
            this.startDate = startDate;
            this.stopDate = stopDate;
        }
        enableChunkMode();
    }

    @Override
    protected String getReaderName() {
        return "BerthCallsRangeReader";
    }

    @Override
    protected void resetCustomState() {
        this.currentBatchIndex = 0;
        this.allData = null;
    }

    @Override
    protected String getApiPath() {
        return "/Movements/BerthCalls";
    }

    @Override
    protected String getApiBaseUrl() {
        return "https://webservices.maritime.spglobal.com";
    }

    @Override
    protected void beforeFetch() {
        log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
    }

    /**
     * Returns the next slice of cached data, calling the API once on first use.
     * Returns null when there is nothing (more) to read.
     */
    @Override
    protected List<BerthCallsDto> fetchNextBatch() throws Exception {
        // 1) First call: fetch the whole date range from the API.
        if (allData == null) {
            log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate);
            allData = callApiWithBatch(startDate, stopDate);
            if (allData == null || allData.isEmpty()) {
                log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
                return null;
            }
            log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), batchSize);
        }
        // 2) Exhausted: signal end-of-input.
        if (currentBatchIndex >= allData.size()) {
            log.info("[{}] 모든 배치 처리 완료", getReaderName());
            return null;
        }
        // 3) Slice [currentBatchIndex, end) out of the cached list.
        int end = Math.min(currentBatchIndex + batchSize, allData.size());
        List<BerthCallsDto> batch = allData.subList(currentBatchIndex, end);
        int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
        int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
        log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), currentBatchNumber, totalBatches, batch.size());
        // 4) Advance to the next slice.
        currentBatchIndex = end;
        updateApiCallStats(totalBatches, currentBatchNumber);
        return batch;
    }

    /**
     * Calls the berth-calls endpoint with the date range as query parameters.
     *
     * @return full list of DTOs for the range (may be null/empty on no data)
     */
    private List<BerthCallsDto> callApiWithBatch(String startDate, String stopDate) {
        String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate;
        log.debug("[{}] API 호출: {}", getReaderName(), url);
        return webClient.get()
                .uri(url)
                .retrieve()
                .bodyToFlux(BerthCallsDto.class)
                .collectList()
                .block();
    }

    @Override
    protected void afterFetch(List<BerthCallsDto> data) {
        // data == null signals end-of-input. Also guard allData: if the very first
        // API call returned null, allData was never assigned and the previous code
        // threw a NullPointerException on allData.size() here.
        if (data == null && allData != null) {
            int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
            log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
        }
    }
}

파일 보기

@ -32,7 +32,8 @@ public class BerthCallsRepositoryImpl extends BaseJdbcRepository<BerthCallsEntit
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override @Override
protected String getTableName() { protected String getTableName() {
return "snp_data.t_berthcall"; // return "snp_data.t_berthcall";
return "new_snp.t_berthcall";
} }
@Override @Override
@ -47,8 +48,10 @@ public class BerthCallsRepositoryImpl extends BaseJdbcRepository<BerthCallsEntit
@Override @Override
public String getInsertSql() { public String getInsertSql() {
/*return """
INSERT INTO snp_data.t_berthcall(*/
return """ return """
INSERT INTO snp_data.t_berthcall( INSERT INTO new_snp.t_berthcall(
imo, imo,
mvmn_type, mvmn_type,
mvmn_dt, mvmn_dt,

파일 보기

@ -1,8 +1,6 @@
package com.snp.batch.jobs.shipMovementBerthCalls.batch.writer; package com.snp.batch.jobs.shipMovementBerthCalls.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter; import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.entity.AnchorageCallsEntity;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.repository.AnchorageCallsRepository;
import com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity.BerthCallsEntity; import com.snp.batch.jobs.shipMovementBerthCalls.batch.entiity.BerthCallsEntity;
import com.snp.batch.jobs.shipMovementBerthCalls.batch.repository.BerthCallsRepository; import com.snp.batch.jobs.shipMovementBerthCalls.batch.repository.BerthCallsRepository;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

파일 보기

@ -0,0 +1,116 @@
package com.snp.batch.jobs.shipMovementCurrentlyAt.batch.config;

import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovementCurrentlyAt.batch.reader.CurrentlyAtRangeReader;
import com.snp.batch.jobs.shipMovementCurrentlyAt.batch.reader.CurrentlyAtReader;
import com.snp.batch.jobs.shipMovementCurrentlyAt.batch.dto.CurrentlyAtDto;
import com.snp.batch.jobs.shipMovementCurrentlyAt.batch.entity.CurrentlyAtEntity;
import com.snp.batch.jobs.shipMovementCurrentlyAt.batch.processor.CurrentlyAtProcessor;
import com.snp.batch.jobs.shipMovementCurrentlyAt.batch.writer.CurrentlyAtWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;

/**
 * Job configuration for the "CurrentlyAt" movement import over a date range.
 *
 * The reader calls the Maritime "Movements/CurrentlyAt" endpoint for the
 * window given by the {@code startDate}/{@code stopDate} job parameters
 * (the reader defaults to the previous full day when either is missing).
 *
 * Data flow:
 *   CurrentlyAtRangeReader  (Movements/CurrentlyAt API)
 *     → CurrentlyAtDto
 *   CurrentlyAtProcessor
 *     → CurrentlyAtEntity
 *   CurrentlyAtWriter
 *     → t_currentlyat table (UPSERT)
 *
 * NOTE(review): the @Bean factory below is named "currentlyAtReader", which
 * looks like it could collide with a reader bean of the same name defined by
 * the non-range CurrentlyAt job config — confirm bean names are unique.
 */
@Slf4j
@Configuration
public class CurrentlyAtRangeJobConfig extends BaseJobConfig<CurrentlyAtDto, CurrentlyAtEntity> {

    private final CurrentlyAtProcessor currentlyAtProcessor;
    private final CurrentlyAtWriter currentlyAtWriter;
    private final CurrentlyAtRangeReader currentlyAtRangeReader;
    // NOTE(review): jdbcTemplate is injected but not used in this config — confirm it is still needed.
    private final JdbcTemplate jdbcTemplate;
    private final WebClient maritimeApiWebClient;

    public CurrentlyAtRangeJobConfig(
            JobRepository jobRepository,
            PlatformTransactionManager transactionManager,
            CurrentlyAtProcessor currentlyAtProcessor,
            CurrentlyAtWriter currentlyAtWriter, CurrentlyAtRangeReader currentlyAtRangeReader, JdbcTemplate jdbcTemplate,
            @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) {
        super(jobRepository, transactionManager);
        this.currentlyAtProcessor = currentlyAtProcessor;
        this.currentlyAtWriter = currentlyAtWriter;
        this.currentlyAtRangeReader = currentlyAtRangeReader;
        this.jdbcTemplate = jdbcTemplate;
        this.maritimeApiWebClient = maritimeApiWebClient;
    }

    @Override
    protected String getJobName() {
        return "CurrentlyAtRangeImportJob";
    }

    @Override
    protected String getStepName() {
        return "currentlyAtRangeImportStep";
    }

    @Override
    protected ItemReader<CurrentlyAtDto> createReader() {
        return currentlyAtRangeReader;
    }

    /**
     * Step-scoped reader so the startDate/stopDate job parameters can be
     * injected. When a parameter is absent it arrives as null and the reader
     * falls back to its default date range.
     */
    @Bean
    @StepScope
    public CurrentlyAtRangeReader currentlyAtReader(
            @Value("#{jobParameters['startDate']}") String startDate,
            @Value("#{jobParameters['stopDate']}") String stopDate
    ) {
        return new CurrentlyAtRangeReader(maritimeApiWebClient, startDate, stopDate);
    }

    @Override
    protected ItemProcessor<CurrentlyAtDto, CurrentlyAtEntity> createProcessor() {
        return currentlyAtProcessor;
    }

    @Override
    protected ItemWriter<CurrentlyAtEntity> createWriter() {
        return currentlyAtWriter;
    }

    @Override
    protected int getChunkSize() {
        return 5000; // matches the reader's internal batch size of 5,000
    }

    @Bean(name = "CurrentlyAtRangeImportJob")
    public Job currentlyAtRangeImportJob() {
        return job();
    }

    @Bean(name = "CurrentlyAtRangeImportStep")
    public Step currentlyAtRangeImportStep() {
        return step();
    }
}

파일 보기

@ -1,6 +1,6 @@
package com.snp.batch.jobs.shipMovementCurrentlyAt.batch.dto; package com.snp.batch.jobs.shipMovementCurrentlyAt.batch.dto;
import com.snp.batch.jobs.shipMovement.batch.dto.PortCallsPositionDto; import com.snp.batch.jobs.shipMovementPortCalls.batch.dto.PortCallsPositionDto;
import lombok.Data; import lombok.Data;
@Data @Data

파일 보기

@ -0,0 +1,154 @@
package com.snp.batch.jobs.shipMovementCurrentlyAt.batch.reader;

import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipMovementCurrentlyAt.batch.dto.CurrentlyAtDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;

/**
 * Chunk-oriented reader for the Maritime "Movements/CurrentlyAt" endpoint.
 *
 * Behaviour:
 *  1. On the first {@link #fetchNextBatch()} call, fetches ALL rows whose
 *     created/updated date lies in [startDate, stopDate) with a single API call.
 *  2. Hands the cached result to Spring Batch in slices of {@code batchSize}
 *     (5,000 items) so processing/writing happens in bounded chunks.
 *  3. Returns {@code null} once the cache is exhausted, ending the step.
 *
 * The range comes from the {@code startDate}/{@code stopDate} job parameters;
 * when either is missing, the previous full day is used.
 */
@Slf4j
@StepScope
public class CurrentlyAtRangeReader extends BaseApiReader<CurrentlyAtDto> {

    /** Entire API result, fetched once and then sliced per chunk. */
    private List<CurrentlyAtDto> allData;

    /** Index into {@link #allData} of the first item of the next slice. */
    private int currentBatchIndex = 0;

    /** Number of items handed to the step per fetchNextBatch() call. */
    private final int batchSize = 5000;

    private String startDate;
    private String stopDate;

    public CurrentlyAtRangeReader(WebClient webClient,
                                  @Value("#{jobParameters['startDate']}") String startDate,
                                  @Value("#{jobParameters['stopDate']}") String stopDate) {
        super(webClient);
        // Default to the previous full day when either bound is absent/blank.
        if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) {
            LocalDate yesterday = LocalDate.now().minusDays(1);
            this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
            this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
        } else {
            this.startDate = startDate;
            this.stopDate = stopDate;
        }
        enableChunkMode(); // deliver data to the step batch-by-batch
    }

    @Override
    protected String getReaderName() {
        return "CurrentlyAtReader";
    }

    @Override
    protected void resetCustomState() {
        this.currentBatchIndex = 0;
        this.allData = null;
    }

    @Override
    protected String getApiPath() {
        return "/Movements/CurrentlyAt";
    }

    @Override
    protected String getApiBaseUrl() {
        return "https://webservices.maritime.spglobal.com";
    }

    @Override
    protected void beforeFetch() {
        log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
    }

    /**
     * Returns the next slice of up to {@code batchSize} items, or {@code null}
     * when all cached data has been consumed (or the API returned nothing).
     */
    @Override
    protected List<CurrentlyAtDto> fetchNextBatch() throws Exception {
        // 1) Lazily perform the single ranged query on first use.
        if (allData == null) {
            log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate);
            allData = callApiWithBatch(startDate, stopDate);
            if (allData == null || allData.isEmpty()) {
                log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
                return null;
            }
            log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), batchSize);
        }

        // 2) All slices consumed -> signal end of input.
        if (currentBatchIndex >= allData.size()) {
            log.info("[{}] 모든 배치 처리 완료", getReaderName());
            return null;
        }

        // 3) Slice out the next batch (subList is a view; it is consumed immediately).
        int endIndex = Math.min(currentBatchIndex + batchSize, allData.size());
        List<CurrentlyAtDto> batch = allData.subList(currentBatchIndex, endIndex);
        int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
        int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
        log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), currentBatchNumber, totalBatches, batch.size());
        currentBatchIndex = endIndex;
        updateApiCallStats(totalBatches, currentBatchNumber);
        return batch;
    }

    /**
     * Single ranged GET against the CurrentlyAt endpoint.
     *
     * @return the full list of movements for the range (may be null/empty)
     */
    private List<CurrentlyAtDto> callApiWithBatch(String startDate, String stopDate) {
        String url = getApiPath() + "?dateCreatedUpdatedStart=" + startDate + "&dateCreatedUpdatedStop=" + stopDate;
        log.debug("[{}] API 호출: {}", getReaderName(), url);
        return webClient.get()
                .uri(url)
                .retrieve()
                .bodyToFlux(CurrentlyAtDto.class)
                .collectList()
                .block();
    }

    @Override
    protected void afterFetch(List<CurrentlyAtDto> data) {
        // data == null marks the end of reading. Guard allData: it is still null
        // when the initial query produced no data, which previously caused an
        // NPE on allData.size() here.
        if (data == null && allData != null) {
            int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
            log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
        }
    }
}

파일 보기

@ -27,7 +27,8 @@ public class CurrentlyAtRepositoryImpl extends BaseJdbcRepository<CurrentlyAtEnt
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override @Override
protected String getTableName() { protected String getTableName() {
return "snp_data.t_currentlyat"; // return "snp_data.t_currentlyat";
return "new_snp.t_currentlyat";
} }
@Override @Override
@ -42,8 +43,10 @@ public class CurrentlyAtRepositoryImpl extends BaseJdbcRepository<CurrentlyAtEnt
@Override @Override
public String getInsertSql() { public String getInsertSql() {
/*return """
INSERT INTO snp_data.t_currentlyat(*/
return """ return """
INSERT INTO snp_data.t_currentlyat( INSERT INTO new_snp.t_currentlyat(
imo, imo,
mvmn_type, mvmn_type,
mvmn_dt, mvmn_dt,
@ -158,49 +161,4 @@ public class CurrentlyAtRepositoryImpl extends BaseJdbcRepository<CurrentlyAtEnt
} }
/*private static class ShipMovementRowMapper implements RowMapper<ShipMovementEntity> {
@Override
public ShipMovementEntity mapRow(ResultSet rs, int rowNum) throws SQLException {
ShipMovementEntity entity = ShipMovementEntity.builder()
.id(rs.getLong("id"))
.imolRorIHSNumber(rs.getString("imolRorIHSNumber"))
.portCallId(rs.getObject("portCallId", Integer.class))
.facilityId(rs.getObject("facilityId", Integer.class))
.facilityName(rs.getString("facilityName"))
.facilityType(rs.getString("facilityType"))
.subFacilityId(rs.getObject("subFacilityId", Integer.class))
.subFacilityName(rs.getString("subFacilityName"))
.subFacilityType(rs.getString("subFacilityType"))
.parentFacilityId(rs.getObject("parentFacilityId", Integer.class))
.parentFacilityName(rs.getString("parentFacilityName"))
.parentFacilityType(rs.getString("parentFacilityType"))
.countryCode(rs.getString("countryCode"))
.countryName(rs.getString("countryName"))
.draught(rs.getObject("draught", Double.class))
.latitude(rs.getObject("latitude", Double.class))
.longitude(rs.getObject("longitude", Double.class))
.destination(rs.getString("destination"))
.iso2(rs.getString("iso2"))
.position(parseJson(rs.getString("position")))
.schemaType(rs.getString("schemaType"))
.build();
Timestamp movementDate = rs.getTimestamp("movementDate");
if (movementDate != null) {
entity.setMovementDate(movementDate.toLocalDateTime());
}
return entity;
}
private JsonNode parseJson(String json) {
try {
if (json == null) return null;
return new ObjectMapper().readTree(json);
} catch (Exception e) {
throw new RuntimeException("JSON 파싱 오류: " + json);
}
}
}*/
} }

파일 보기

@ -0,0 +1,119 @@
package com.snp.batch.jobs.shipMovementDarkActivity.batch.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovementDarkActivity.batch.dto.DarkActivityDto;
import com.snp.batch.jobs.shipMovementDarkActivity.batch.entity.DarkActivityEntity;
import com.snp.batch.jobs.shipMovementDarkActivity.batch.processor.DarkActivityProcessor;
import com.snp.batch.jobs.shipMovementDarkActivity.batch.reader.DarkActivityRangeReader;
import com.snp.batch.jobs.shipMovementDarkActivity.batch.writer.DarkActivityWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;

/**
 * Job configuration for the dark-activity movement import over a date range.
 *
 * The reader calls the Maritime "Movements/DarkActivity" endpoint for the
 * window given by the {@code startDate}/{@code stopDate} job parameters
 * (the reader defaults to the previous full day when either is missing).
 *
 * Data flow:
 *   DarkActivityRangeReader  (Movements/DarkActivity API)
 *     → DarkActivityDto
 *   DarkActivityProcessor
 *     → DarkActivityEntity
 *   DarkActivityWriter
 *     → t_darkactivity table (UPSERT)
 */
@Slf4j
@Configuration
public class DarkActivityRangeJobConfig extends BaseJobConfig<DarkActivityDto, DarkActivityEntity> {

    private final DarkActivityProcessor darkActivityProcessor;
    private final DarkActivityWriter darkActivityWriter;
    private final DarkActivityRangeReader darkActivityRangeReader;
    // NOTE(review): jdbcTemplate is injected but not used in this config — confirm it is still needed.
    private final JdbcTemplate jdbcTemplate;
    private final WebClient maritimeApiWebClient;

    // NOTE(review): the ObjectMapper parameter is accepted but never stored or
    // used — consider removing it from the constructor.
    public DarkActivityRangeJobConfig(
            JobRepository jobRepository,
            PlatformTransactionManager transactionManager,
            DarkActivityProcessor darkActivityProcessor,
            DarkActivityWriter darkActivityWriter, JdbcTemplate jdbcTemplate,
            @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient,
            ObjectMapper objectMapper, DarkActivityRangeReader darkActivityRangeReader) {
        super(jobRepository, transactionManager);
        this.darkActivityProcessor = darkActivityProcessor;
        this.darkActivityWriter = darkActivityWriter;
        this.jdbcTemplate = jdbcTemplate;
        this.maritimeApiWebClient = maritimeApiWebClient;
        this.darkActivityRangeReader = darkActivityRangeReader;
    }

    @Override
    protected String getJobName() {
        return "DarkActivityRangeImportJob";
    }

    @Override
    protected String getStepName() {
        return "DarkActivityRangeImportStep";
    }

    @Override
    protected ItemReader<DarkActivityDto> createReader() {
        return darkActivityRangeReader;
    }

    /**
     * Step-scoped reader so the startDate/stopDate job parameters can be
     * injected. When a parameter is absent it arrives as null and the reader
     * falls back to its default date range.
     */
    @Bean
    @StepScope
    public DarkActivityRangeReader darkActivityReader(
            @Value("#{jobParameters['startDate']}") String startDate,
            @Value("#{jobParameters['stopDate']}") String stopDate
    ) {
        return new DarkActivityRangeReader(maritimeApiWebClient, startDate, stopDate);
    }

    @Override
    protected ItemProcessor<DarkActivityDto, DarkActivityEntity> createProcessor() {
        return darkActivityProcessor;
    }

    @Override
    protected ItemWriter<DarkActivityEntity> createWriter() {
        return darkActivityWriter;
    }

    @Override
    protected int getChunkSize() {
        return 5000; // matches the reader's internal batch size of 5,000
    }

    @Bean(name = "DarkActivityRangeImportJob")
    public Job darkActivityRangeImportJob() {
        return job();
    }

    @Bean(name = "DarkActivityRangeImportStep")
    public Step darkActivityRangeImportStep() {
        return step();
    }
}

파일 보기

@ -24,7 +24,7 @@ public class DarkActivityDto {
private Double latitude; private Double latitude;
private Double longitude; private Double longitude;
private AnchorageCallsPositionDto position; private DarkActivityPositionDto position;
private String eventStartDate; private String eventStartDate;
} }

파일 보기

@ -0,0 +1,17 @@
package com.snp.batch.jobs.shipMovementDarkActivity.batch.dto;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

/**
 * Geographic position payload attached to a dark-activity movement record.
 *
 * Field names (stSrid, hasZ/hasM) suggest they mirror a SQL Server geography
 * value serialized to JSON by the Maritime API — NOTE(review): confirm against
 * the actual API response schema.
 */
@Data
public class DarkActivityPositionDto {
    // True when the source position value is null.
    private boolean isNull;
    // Spatial reference identifier (SRID) of the coordinate system.
    private int stSrid;
    // Latitude in decimal degrees.
    private double lat;
    // Longitude in decimal degrees; "long" is a Java keyword, so the JSON
    // property must be mapped explicitly.
    @JsonProperty("long")
    private double lon;
    // Z (elevation) coordinate — units not shown by the API; do not assume meters.
    private double z;
    // M (measure) coordinate of the geography point.
    private double m;
    // Whether the source point carries a Z coordinate.
    private boolean hasZ;
    // Whether the source point carries an M coordinate.
    private boolean hasM;
}

파일 보기

@ -0,0 +1,182 @@
package com.snp.batch.jobs.shipMovementDarkActivity.batch.reader;

import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipMovementDarkActivity.batch.dto.DarkActivityDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;

/**
 * Chunk-oriented reader for the Maritime "Movements/DarkActivity" endpoint.
 *
 * Behaviour:
 *  1. On the first {@link #fetchNextBatch()} call, fetches ALL dark-activity
 *     events in [startDate, stopDate) with a single API call.
 *  2. Hands the cached result to Spring Batch in slices of {@code batchSize}
 *     (5,000 items) so processing/writing happens in bounded chunks.
 *  3. Returns {@code null} once the cache is exhausted, ending the step.
 *
 * The range comes from the {@code startDate}/{@code stopDate} job parameters;
 * when either is missing, the previous full day is used.
 */
@Slf4j
@StepScope
public class DarkActivityRangeReader extends BaseApiReader<DarkActivityDto> {

    /** Entire API result, fetched once and then sliced per chunk. */
    private List<DarkActivityDto> allData;

    /** Index into {@link #allData} of the first item of the next slice. */
    private int currentBatchIndex = 0;

    /** Number of items handed to the step per fetchNextBatch() call. */
    private final int batchSize = 5000;

    private String startDate;
    private String stopDate;

    public DarkActivityRangeReader(WebClient webClient,
                                   @Value("#{jobParameters['startDate']}") String startDate,
                                   @Value("#{jobParameters['stopDate']}") String stopDate) {
        super(webClient);
        // Default to the previous full day when either bound is absent/blank.
        if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) {
            LocalDate yesterday = LocalDate.now().minusDays(1);
            this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
            this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
        } else {
            this.startDate = startDate;
            this.stopDate = stopDate;
        }
        enableChunkMode(); // deliver data to the step batch-by-batch
    }

    @Override
    protected String getReaderName() {
        return "DarkActivityReader";
    }

    @Override
    protected void resetCustomState() {
        this.currentBatchIndex = 0;
        this.allData = null;
    }

    @Override
    protected String getApiPath() {
        return "/Movements/DarkActivity";
    }

    @Override
    protected String getApiBaseUrl() {
        return "https://webservices.maritime.spglobal.com";
    }

    @Override
    protected void beforeFetch() {
        log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
    }

    /**
     * Returns the next slice of up to {@code batchSize} items, or {@code null}
     * when all cached data has been consumed (or the API returned nothing).
     */
    @Override
    protected List<DarkActivityDto> fetchNextBatch() throws Exception {
        // 1) Lazily perform the single ranged query on first use.
        if (allData == null) {
            log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate);
            allData = callApiWithBatch(startDate, stopDate);
            if (allData == null || allData.isEmpty()) {
                log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
                return null;
            }
            log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), batchSize);
        }

        // 2) All slices consumed -> signal end of input.
        if (currentBatchIndex >= allData.size()) {
            log.info("[{}] 모든 배치 처리 완료", getReaderName());
            return null;
        }

        // 3) Slice out the next batch (subList is a view; it is consumed immediately).
        int endIndex = Math.min(currentBatchIndex + batchSize, allData.size());
        List<DarkActivityDto> batch = allData.subList(currentBatchIndex, endIndex);
        int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
        int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
        log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), currentBatchNumber, totalBatches, batch.size());
        currentBatchIndex = endIndex;
        updateApiCallStats(totalBatches, currentBatchNumber);
        return batch;
    }

    /**
     * Single ranged GET against the DarkActivity endpoint.
     *
     * @return the full list of dark-activity events for the range (may be null/empty)
     */
    private List<DarkActivityDto> callApiWithBatch(String startDate, String stopDate) {
        String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate;
        log.debug("[{}] API 호출: {}", getReaderName(), url);
        return webClient.get()
                .uri(url)
                .retrieve()
                .bodyToFlux(DarkActivityDto.class)
                .collectList()
                .block();
    }

    @Override
    protected void afterFetch(List<DarkActivityDto> data) {
        // data == null marks the end of reading. Guard allData: it is still null
        // when the initial query produced no data, which previously caused an
        // NPE on allData.size() here.
        if (data == null && allData != null) {
            int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
            log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
        }
    }
}

파일 보기

@ -32,7 +32,8 @@ public class DarkActivityRepositoryImpl extends BaseJdbcRepository<DarkActivityE
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override @Override
protected String getTableName() { protected String getTableName() {
return "snp_data.t_darkactivity"; return "new_snp.t_darkactivity";
// return "snp_data.t_darkactivity";
} }
@Override @Override
@ -47,8 +48,10 @@ public class DarkActivityRepositoryImpl extends BaseJdbcRepository<DarkActivityE
@Override @Override
public String getInsertSql() { public String getInsertSql() {
// return """
// INSERT INTO snp_data.t_darkactivity(
return """ return """
INSERT INTO snp_data.t_darkactivity( INSERT INTO new_snp.t_darkactivity(
imo, imo,
mvmn_type, mvmn_type,
mvmn_dt, mvmn_dt,
@ -66,7 +69,7 @@ public class DarkActivityRepositoryImpl extends BaseJdbcRepository<DarkActivityE
evt_start_dt, evt_start_dt,
lcinfo lcinfo
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (imo, mvmn_type, mvmn_dt) ON CONFLICT (imo, mvmn_type, mvmn_dt, fclty_id)
DO UPDATE SET DO UPDATE SET
mvmn_type = EXCLUDED.mvmn_type, mvmn_type = EXCLUDED.mvmn_type,
mvmn_dt = EXCLUDED.mvmn_dt, mvmn_dt = EXCLUDED.mvmn_dt,

파일 보기

@ -0,0 +1,114 @@
package com.snp.batch.jobs.shipMovementDestination.batch.config;

import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovementDestination.batch.dto.DestinationDto;
import com.snp.batch.jobs.shipMovementDestination.batch.entity.DestinationEntity;
import com.snp.batch.jobs.shipMovementDestination.batch.processor.DestinationProcessor;
import com.snp.batch.jobs.shipMovementDestination.batch.reader.DestinationRangeReader;
import com.snp.batch.jobs.shipMovementDestination.batch.writer.DestinationWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;

/**
 * Job configuration for the destinations (ETA) movement import over a date range.
 *
 * The reader calls the Maritime "Movements/Destinations" endpoint for the
 * window given by the {@code startDate}/{@code stopDate} job parameters
 * (the reader defaults to [today, today + 15 days) when either is missing).
 *
 * Data flow:
 *   DestinationRangeReader  (Movements/Destinations API)
 *     → DestinationDto
 *   DestinationProcessor
 *     → DestinationEntity
 *   DestinationWriter
 *     → t_destination table (UPSERT)
 */
@Slf4j
@Configuration
public class DestinationsRangeJobConfig extends BaseJobConfig<DestinationDto, DestinationEntity> {

    // Fields renamed to lowerCamelCase (previously shadowed their type names).
    private final DestinationProcessor destinationProcessor;
    private final DestinationWriter destinationWriter;
    private final DestinationRangeReader destinationRangeReader;
    private final WebClient maritimeApiWebClient;

    public DestinationsRangeJobConfig(
            JobRepository jobRepository,
            PlatformTransactionManager transactionManager,
            DestinationProcessor destinationProcessor,
            DestinationWriter destinationWriter, DestinationRangeReader destinationRangeReader,
            @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) {
        super(jobRepository, transactionManager);
        this.destinationProcessor = destinationProcessor;
        this.destinationWriter = destinationWriter;
        this.destinationRangeReader = destinationRangeReader;
        this.maritimeApiWebClient = maritimeApiWebClient;
    }

    @Override
    protected String getJobName() {
        return "DestinationsRangeImportJob";
    }

    @Override
    protected String getStepName() {
        return "DestinationsRangeImportStep";
    }

    @Override
    protected ItemReader<DestinationDto> createReader() {
        return destinationRangeReader;
    }

    /**
     * Step-scoped reader so the startDate/stopDate job parameters can be
     * injected. When a parameter is absent it arrives as null and the reader
     * falls back to its default date range.
     */
    @Bean
    @StepScope
    public DestinationRangeReader destinationRangeReader(
            @Value("#{jobParameters['startDate']}") String startDate,
            @Value("#{jobParameters['stopDate']}") String stopDate
    ) {
        return new DestinationRangeReader(maritimeApiWebClient, startDate, stopDate);
    }

    @Override
    protected ItemProcessor<DestinationDto, DestinationEntity> createProcessor() {
        return destinationProcessor;
    }

    @Override
    protected ItemWriter<DestinationEntity> createWriter() {
        return destinationWriter;
    }

    @Override
    protected int getChunkSize() {
        return 1000; // matches the reader's internal batch size of 1,000
    }

    @Bean(name = "DestinationsRangeImportJob")
    public Job destinationsRangeImportJob() {
        return job();
    }

    @Bean(name = "DestinationsRangeImportStep")
    public Step destinationsRangeImportStep() {
        return step();
    }
}

파일 보기

@ -0,0 +1,161 @@
package com.snp.batch.jobs.shipMovementDestination.batch.reader;

import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipMovementDestination.batch.dto.DestinationDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;

/**
 * Chunk-oriented reader for the Maritime "Movements/Destinations" endpoint.
 *
 * Behaviour:
 *  1. On the first {@link #fetchNextBatch()} call, fetches ALL destination
 *     (ETA) records in [startDate, stopDate) with a single API call.
 *  2. Hands the cached result to Spring Batch in slices of {@code batchSize}
 *     (1,000 items) so processing/writing happens in bounded chunks.
 *  3. Returns {@code null} once the cache is exhausted, ending the step.
 *
 * The range comes from the {@code startDate}/{@code stopDate} job parameters;
 * when either is missing, the window defaults to [today, today + 15 days) so
 * upcoming arrival information gets refreshed.
 */
@Slf4j
@StepScope
public class DestinationRangeReader extends BaseApiReader<DestinationDto> {

    /** Entire API result, fetched once and then sliced per chunk. */
    private List<DestinationDto> allData;

    /** Index into {@link #allData} of the first item of the next slice. */
    private int currentBatchIndex = 0;

    /** Number of items handed to the step per fetchNextBatch() call. */
    private final int batchSize = 1000;

    private String startDate;
    private String stopDate;

    public DestinationRangeReader(WebClient webClient,
                                  @Value("#{jobParameters['startDate']}") String startDate,
                                  @Value("#{jobParameters['stopDate']}") String stopDate) {
        super(webClient);
        // Default window: today through 15 days ahead, to refresh upcoming ETAs.
        if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) {
            LocalDate today = LocalDate.now();
            this.startDate = today
                    .atStartOfDay()
                    .format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
            this.stopDate = today
                    .plusDays(15)
                    .atStartOfDay()
                    .format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
        } else {
            this.startDate = startDate;
            this.stopDate = stopDate;
        }
        enableChunkMode(); // deliver data to the step batch-by-batch
    }

    @Override
    protected String getReaderName() {
        return "DestinationsRange";
    }

    @Override
    protected void resetCustomState() {
        this.currentBatchIndex = 0;
        this.allData = null;
    }

    @Override
    protected String getApiPath() {
        return "/Movements/Destinations";
    }

    @Override
    protected String getApiBaseUrl() {
        return "https://webservices.maritime.spglobal.com";
    }

    @Override
    protected void beforeFetch() {
        log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
    }

    /**
     * Returns the next slice of up to {@code batchSize} items, or {@code null}
     * when all cached data has been consumed (or the API returned nothing).
     */
    @Override
    protected List<DestinationDto> fetchNextBatch() throws Exception {
        // 1) Lazily perform the single ranged query on first use.
        if (allData == null) {
            log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate);
            allData = callApiWithBatch(startDate, stopDate);
            if (allData == null || allData.isEmpty()) {
                log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
                return null;
            }
            log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), batchSize);
        }

        // 2) All slices consumed -> signal end of input.
        if (currentBatchIndex >= allData.size()) {
            log.info("[{}] 모든 배치 처리 완료", getReaderName());
            return null;
        }

        // 3) Slice out the next batch (subList is a view; it is consumed immediately).
        int endIndex = Math.min(currentBatchIndex + batchSize, allData.size());
        List<DestinationDto> batch = allData.subList(currentBatchIndex, endIndex);
        int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
        int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
        log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), currentBatchNumber, totalBatches, batch.size());
        currentBatchIndex = endIndex;
        updateApiCallStats(totalBatches, currentBatchNumber);
        return batch;
    }

    /**
     * Single ranged GET against the Destinations endpoint.
     *
     * @return the full list of destination records for the range (may be null/empty)
     */
    private List<DestinationDto> callApiWithBatch(String startDate, String stopDate) {
        String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate;
        log.debug("[{}] API 호출: {}", getReaderName(), url);
        return webClient.get()
                .uri(url)
                .retrieve()
                .bodyToFlux(DestinationDto.class)
                .collectList()
                .block();
    }

    @Override
    protected void afterFetch(List<DestinationDto> data) {
        // data == null marks the end of reading. Guard allData: it is still null
        // when the initial query produced no data, which previously caused an
        // NPE on allData.size() here.
        if (data == null && allData != null) {
            int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
            log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
        }
    }
}

파일 보기

@ -27,12 +27,13 @@ public class DestinationRepositoryImpl extends BaseJdbcRepository<DestinationEnt
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override @Override
protected String getTableName() { protected String getTableName() {
return "snp_data.t_destination"; // return "snp_data.t_destination";
return "new_snp.t_destination";
} }
@Override @Override
protected String getEntityName() { protected String getEntityName() {
return "Destinations"; return "DestinationsRange";
} }
@Override @Override
@ -42,8 +43,10 @@ public class DestinationRepositoryImpl extends BaseJdbcRepository<DestinationEnt
@Override @Override
public String getInsertSql() { public String getInsertSql() {
/*return """
INSERT INTO snp_data.t_destination(*/
return """ return """
INSERT INTO snp_data.t_destination( INSERT INTO new_snp.t_destination(
imo, imo,
mvmn_type, mvmn_type,
mvmn_dt, mvmn_dt,
@ -57,7 +60,7 @@ public class DestinationRepositoryImpl extends BaseJdbcRepository<DestinationEnt
iso2_ntn_cd, iso2_ntn_cd,
lcinfo lcinfo
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (imo, mvmn_type, mvmn_dt) ON CONFLICT (imo)
DO UPDATE SET DO UPDATE SET
mvmn_type = EXCLUDED.mvmn_type, mvmn_type = EXCLUDED.mvmn_type,
mvmn_dt = EXCLUDED.mvmn_dt, mvmn_dt = EXCLUDED.mvmn_dt,
@ -122,7 +125,6 @@ public class DestinationRepositoryImpl extends BaseJdbcRepository<DestinationEnt
@Override @Override
public void saveAll(List<DestinationEntity> entities) { public void saveAll(List<DestinationEntity> entities) {
if (entities == null || entities.isEmpty()) return; if (entities == null || entities.isEmpty()) return;
log.info("Destinations 저장 시작 = {}건", entities.size()); log.info("Destinations 저장 시작 = {}건", entities.size());
batchInsert(entities); batchInsert(entities);

파일 보기

@ -1,12 +1,12 @@
package com.snp.batch.jobs.shipMovement.batch.config; package com.snp.batch.jobs.shipMovementPortCalls.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovement.batch.dto.PortCallsDto; import com.snp.batch.jobs.shipMovementPortCalls.batch.dto.PortCallsDto;
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity; import com.snp.batch.jobs.shipMovementPortCalls.batch.entity.PortCallsEntity;
import com.snp.batch.jobs.shipMovement.batch.processor.ShipMovementProcessor; import com.snp.batch.jobs.shipMovementPortCalls.batch.processor.PortCallsProcessor;
import com.snp.batch.jobs.shipMovement.batch.reader.ShipMovementReader; import com.snp.batch.jobs.shipMovementPortCalls.batch.reader.PortCallsReader;
import com.snp.batch.jobs.shipMovement.batch.writer.ShipMovementWriter; import com.snp.batch.jobs.shipMovementPortCalls.batch.writer.PortCallsWriter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
@ -37,34 +37,34 @@ import java.time.format.DateTimeFormatter;
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
* *
* 데이터 흐름: * 데이터 흐름:
* ShipMovementReader (ship_data Maritime API) * PortCallsReader (ship_data Maritime API)
* (PortCallDto) * (PortCallDto)
* ShipMovementProcessor * PortCallsProcessor
* (ShipMovementEntity) * (PortCallsEntity)
* ShipDetailDataWriter * ShipDetailDataWriter
* (ship_movement 테이블) * (ship_movement 테이블)
*/ */
@Slf4j @Slf4j
@Configuration @Configuration
public class ShipMovementJobConfig extends BaseJobConfig<PortCallsDto, ShipMovementEntity> { public class ShipPortCallsJobConfig extends BaseJobConfig<PortCallsDto, PortCallsEntity> {
private final ShipMovementProcessor shipMovementProcessor; private final PortCallsProcessor portCallsProcessor;
private final ShipMovementWriter shipMovementWriter; private final PortCallsWriter portCallsWriter;
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient; private final WebClient maritimeApiWebClient;
private final ObjectMapper objectMapper; // ObjectMapper 주입 추가 private final ObjectMapper objectMapper; // ObjectMapper 주입 추가
public ShipMovementJobConfig( public ShipPortCallsJobConfig(
JobRepository jobRepository, JobRepository jobRepository,
PlatformTransactionManager transactionManager, PlatformTransactionManager transactionManager,
ShipMovementProcessor shipMovementProcessor, PortCallsProcessor portCallsProcessor,
ShipMovementWriter shipMovementWriter, JdbcTemplate jdbcTemplate, PortCallsWriter portCallsWriter, JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient, @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient,
ObjectMapper objectMapper) { // ObjectMapper 주입 추가 ObjectMapper objectMapper) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager); super(jobRepository, transactionManager);
this.shipMovementProcessor = shipMovementProcessor; this.portCallsProcessor = portCallsProcessor;
this.shipMovementWriter = shipMovementWriter; this.portCallsWriter = portCallsWriter;
this.jdbcTemplate = jdbcTemplate; this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient; this.maritimeApiWebClient = maritimeApiWebClient;
this.objectMapper = objectMapper; // ObjectMapper 초기화 this.objectMapper = objectMapper; // ObjectMapper 초기화
@ -72,30 +72,28 @@ public class ShipMovementJobConfig extends BaseJobConfig<PortCallsDto, ShipMovem
@Override @Override
protected String getJobName() { protected String getJobName() {
return "shipMovementJob"; return "PortCallsImportJob";
} }
@Override @Override
protected String getStepName() { protected String getStepName() {
return "shipMovementStep"; return "PortCallsImportStep";
} }
@Bean @Bean
@StepScope @StepScope
public ShipMovementReader shipMovementReader( public PortCallsReader portCallsReader(
@Value("#{jobParameters['startDate']}") String startDate, @Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['stopDate']}") String stopDate) { @Value("#{jobParameters['stopDate']}") String stopDate) {
LocalDate today = LocalDate.now(); if (startDate == null || startDate.isBlank() ||
stopDate == null || stopDate.isBlank()) {
if(startDate == null || startDate.isBlank()) { LocalDate yesterday = LocalDate.now().minusDays(1);
startDate = today.minusYears(1).plusDays(1).format(DateTimeFormatter.ISO_LOCAL_DATE); startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
} }
if(stopDate == null || stopDate.isBlank()) { PortCallsReader reader = new PortCallsReader(maritimeApiWebClient, jdbcTemplate, objectMapper);
stopDate = today.format(DateTimeFormatter.ISO_LOCAL_DATE);
}
ShipMovementReader reader = new ShipMovementReader(maritimeApiWebClient, jdbcTemplate, objectMapper);
reader.setStartDate(startDate); reader.setStartDate(startDate);
reader.setStopDate(stopDate); reader.setStopDate(stopDate);
return reader; return reader;
@ -103,32 +101,32 @@ public class ShipMovementJobConfig extends BaseJobConfig<PortCallsDto, ShipMovem
@Override @Override
protected ItemReader<PortCallsDto> createReader() { // 타입 변경 protected ItemReader<PortCallsDto> createReader() { // 타입 변경
// Reader 생성자 수정: ObjectMapper를 전달합니다. // Reader 생성자 수정: ObjectMapper를 전달합니다.
return shipMovementReader(null, null); return portCallsReader( null, null);
//return new ShipMovementReader(maritimeApiWebClient, jdbcTemplate, objectMapper); //return new PortCallsReader(maritimeApiWebClient, jdbcTemplate, objectMapper);
} }
@Override @Override
protected ItemProcessor<PortCallsDto, ShipMovementEntity> createProcessor() { protected ItemProcessor<PortCallsDto, PortCallsEntity> createProcessor() {
return shipMovementProcessor; return portCallsProcessor;
} }
@Override @Override
protected ItemWriter<ShipMovementEntity> createWriter() { // 타입 변경 protected ItemWriter<PortCallsEntity> createWriter() { // 타입 변경
return shipMovementWriter; return portCallsWriter;
} }
@Override @Override
protected int getChunkSize() { protected int getChunkSize() {
return 50; // API에서 100개씩 가져오므로 chunk도 100으로 설정 return 1000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
} }
@Bean(name = "shipMovementJob") @Bean(name = "PortCallsImportJob")
public Job shipMovementJob() { public Job portCallsImportJob() {
return job(); return job();
} }
@Bean(name = "shipMovementStep") @Bean(name = "PortCallsImportStep")
public Step shipMovementStep() { public Step portCallsImportStep() {
return step(); return step();
} }
} }

파일 보기

@ -0,0 +1,114 @@
package com.snp.batch.jobs.shipMovementPortCalls.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovementPortCalls.batch.dto.PortCallsDto;
import com.snp.batch.jobs.shipMovementPortCalls.batch.entity.PortCallsEntity;
import com.snp.batch.jobs.shipMovementPortCalls.batch.processor.PortCallsProcessor;
import com.snp.batch.jobs.shipMovementPortCalls.batch.reader.PortCallsRangeReader;
import com.snp.batch.jobs.shipMovementPortCalls.batch.reader.PortCallsReader;
import com.snp.batch.jobs.shipMovementPortCalls.batch.writer.PortCallsWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
 * Port Calls (date-range) Import Job Config.
 *
 * Data flow:
 *   PortCallsRangeReader  (Maritime API /Movements/PortCalls, one call per startDate~stopDate range)
 *     -> (PortCallsDto)
 *   PortCallsProcessor
 *     -> (PortCallsEntity)
 *   PortCallsWriter
 *     -> (port-call table)
 *
 * Unlike the IMO-batch based PortCalls job, this job queries the API once for a
 * whole date range and the reader then pages through the cached result.
 */
@Slf4j
@Configuration
public class ShipPortCallsRangeJobConfig extends BaseJobConfig<PortCallsDto, PortCallsEntity> {

    private final PortCallsProcessor portCallsProcessor;
    private final PortCallsWriter portCallsWriter;
    private final PortCallsRangeReader portCallsRangeReader;

    // NOTE(review): jdbcTemplate, maritimeApiWebClient and objectMapper are injected
    // but never stored or used by this config — candidates for removal.
    public ShipPortCallsRangeJobConfig(
            JobRepository jobRepository,
            PlatformTransactionManager transactionManager,
            PortCallsProcessor portCallsProcessor,
            PortCallsWriter portCallsWriter, JdbcTemplate jdbcTemplate,
            @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient,
            ObjectMapper objectMapper, PortCallsRangeReader portCallsRangeReader) {
        super(jobRepository, transactionManager);
        this.portCallsProcessor = portCallsProcessor;
        this.portCallsWriter = portCallsWriter;
        this.portCallsRangeReader = portCallsRangeReader;
    }

    @Override
    protected String getJobName() {
        return "PortCallsRangeImportJob";
    }

    @Override
    protected String getStepName() {
        return "PortCallsRangeImportStep";
    }

    /**
     * Step-scoped reader bean. startDate/stopDate come from job parameters and may be
     * null/blank — PortCallsRangeReader falls back to "yesterday" in that case.
     */
    @Bean
    @StepScope
    public PortCallsRangeReader portCallsRangeReader(
            @Qualifier("maritimeServiceApiWebClient") WebClient webClient,
            @Value("#{jobParameters['startDate']}") String startDate,
            @Value("#{jobParameters['stopDate']}") String stopDate
    ) {
        return new PortCallsRangeReader(webClient, startDate, stopDate);
    }

    @Override
    protected ItemReader<PortCallsDto> createReader() {
        // Uses the step-scoped proxy injected into this config.
        return portCallsRangeReader;
    }

    @Override
    protected ItemProcessor<PortCallsDto, PortCallsEntity> createProcessor() {
        return portCallsProcessor;
    }

    @Override
    protected ItemWriter<PortCallsEntity> createWriter() {
        return portCallsWriter;
    }

    @Override
    protected int getChunkSize() {
        return 5000; // reader hands over 5000 items per batch; keep chunk size aligned
    }

    @Bean(name = "PortCallsRangeImportJob")
    public Job portCallsRangeImportJob() {
        return job();
    }

    @Bean(name = "PortCallsRangeImportStep")
    public Step portCallsRangeImportStep() {
        return step();
    }
}

파일 보기

@ -1,4 +1,4 @@
package com.snp.batch.jobs.shipMovement.batch.dto; package com.snp.batch.jobs.shipMovementPortCalls.batch.dto;
import lombok.Data; import lombok.Data;

파일 보기

@ -1,4 +1,4 @@
package com.snp.batch.jobs.shipMovement.batch.dto; package com.snp.batch.jobs.shipMovementPortCalls.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data; import lombok.Data;

파일 보기

@ -1,4 +1,4 @@
package com.snp.batch.jobs.shipMovement.batch.dto; package com.snp.batch.jobs.shipMovementPortCalls.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data; import lombok.Data;

파일 보기

@ -1,4 +1,4 @@
package com.snp.batch.jobs.shipMovement.batch.entity; package com.snp.batch.jobs.shipMovementPortCalls.batch.entity;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import jakarta.persistence.GeneratedValue; import jakarta.persistence.GeneratedValue;
@ -7,7 +7,6 @@ import jakarta.persistence.Id;
import jakarta.persistence.SequenceGenerator; import jakarta.persistence.SequenceGenerator;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
@ -17,7 +16,7 @@ import java.time.LocalDateTime;
@SuperBuilder @SuperBuilder
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
public class ShipMovementEntity { public class PortCallsEntity {
@Id @Id
@GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "ship_movement_id_seq") @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "ship_movement_id_seq")
@SequenceGenerator(name = "ship_movement_id_seq", sequenceName = "ship_movement_id_seq", allocationSize = 1) @SequenceGenerator(name = "ship_movement_id_seq", sequenceName = "ship_movement_id_seq", allocationSize = 1)

파일 보기

@ -1,10 +1,10 @@
package com.snp.batch.jobs.shipMovement.batch.processor; package com.snp.batch.jobs.shipMovementPortCalls.batch.processor;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.processor.BaseProcessor; import com.snp.batch.common.batch.processor.BaseProcessor;
import com.snp.batch.jobs.shipMovement.batch.dto.PortCallsDto; import com.snp.batch.jobs.shipMovementPortCalls.batch.dto.PortCallsDto;
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity; import com.snp.batch.jobs.shipMovementPortCalls.batch.entity.PortCallsEntity;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -22,16 +22,16 @@ import java.time.LocalDateTime;
*/ */
@Slf4j @Slf4j
@Component @Component
public class ShipMovementProcessor extends BaseProcessor<PortCallsDto, ShipMovementEntity> { public class PortCallsProcessor extends BaseProcessor<PortCallsDto, PortCallsEntity> {
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
public ShipMovementProcessor(ObjectMapper objectMapper) { public PortCallsProcessor(ObjectMapper objectMapper) {
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
} }
@Override @Override
protected ShipMovementEntity processItem(PortCallsDto dto) throws Exception { protected PortCallsEntity processItem(PortCallsDto dto) throws Exception {
log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}", log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}",
dto.getImolRorIHSNumber(), dto.getFacilityName()); dto.getImolRorIHSNumber(), dto.getFacilityName());
@ -41,7 +41,7 @@ public class ShipMovementProcessor extends BaseProcessor<PortCallsDto, ShipMovem
positionNode = objectMapper.valueToTree(dto.getPosition()); positionNode = objectMapper.valueToTree(dto.getPosition());
} }
ShipMovementEntity entity = ShipMovementEntity.builder() PortCallsEntity entity = PortCallsEntity.builder()
.movementType(dto.getMovementType()) .movementType(dto.getMovementType())
.imolRorIHSNumber(dto.getImolRorIHSNumber()) .imolRorIHSNumber(dto.getImolRorIHSNumber())
.movementDate(LocalDateTime.parse(dto.getMovementDate())) .movementDate(LocalDateTime.parse(dto.getMovementDate()))

파일 보기

@ -0,0 +1,160 @@
package com.snp.batch.jobs.shipMovementPortCalls.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipMovementPortCalls.batch.dto.PortCallsDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
/**
 * Port Calls reader (date-range based, chunk-oriented).
 *
 * Behaviour:
 *  1. On the first {@link #fetchNextBatch()} call, queries /Movements/PortCalls once
 *     for the whole [startDate, stopDate) range and caches the full result.
 *  2. Subsequent calls slice the cached list into batches of {@code batchSize} (5000)
 *     so Spring Batch can process/write them chunk by chunk.
 *  3. Returns null once the cached list is exhausted, which ends the step.
 *
 * When no job parameters are supplied, the range defaults to the previous whole day
 * (yesterday 00:00 inclusive to today 00:00 exclusive).
 */
@Slf4j
@StepScope
public class PortCallsRangeReader extends BaseApiReader<PortCallsDto> {

    // Full API result, loaded lazily on the first fetch; null until then.
    private List<PortCallsDto> allData;
    // Index of the next unread element in allData.
    private int currentBatchIndex = 0;
    // Number of items handed to Spring Batch per fetchNextBatch() call.
    private final int batchSize = 5000;

    private String startDate;
    private String stopDate;

    public PortCallsRangeReader(WebClient webClient,
                                @Value("#{jobParameters['startDate']}") String startDate,
                                @Value("#{jobParameters['stopDate']}") String stopDate) {
        super(webClient);
        // No explicit range supplied -> default to the previous whole day.
        if (startDate == null || startDate.isBlank() ||
                stopDate == null || stopDate.isBlank()) {
            LocalDate yesterday = LocalDate.now().minusDays(1);
            // NOTE(review): ISO_DATE_TIME omits seconds at midnight, producing e.g.
            // "2025-01-01T00:00" + "Z" — confirm the API accepts this form.
            this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
            this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
        } else {
            this.startDate = startDate;
            this.stopDate = stopDate;
        }
        enableChunkMode();
    }

    @Override
    protected String getReaderName() {
        return "PortCallsRangeReader";
    }

    @Override
    protected void resetCustomState() {
        // Allow the reader to be re-run: drop the cache and rewind the cursor.
        this.currentBatchIndex = 0;
        this.allData = null;
    }

    @Override
    protected String getApiPath() {
        return "/Movements/PortCalls";
    }

    @Override
    protected String getApiBaseUrl() {
        return "https://webservices.maritime.spglobal.com";
    }

    @Override
    protected void beforeFetch() {
        log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
    }

    /**
     * Core chunk method: returns the next batch, or null when no data remains.
     * Spring Batch calls read() batchSize times, then invokes this method again.
     */
    @Override
    protected List<PortCallsDto> fetchNextBatch() throws Exception {
        // 1) Lazily load the entire date range on the first invocation.
        if (allData == null) {
            log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate);
            allData = callApiWithBatch(startDate, stopDate);

            if (allData == null || allData.isEmpty()) {
                log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
                return null;
            }
            log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), batchSize);
        }

        // 2) All cached data consumed -> signal end-of-input.
        if (currentBatchIndex >= allData.size()) {
            log.info("[{}] 모든 배치 처리 완료", getReaderName());
            return null;
        }

        // 3) Compute the exclusive end index of this batch.
        int end = Math.min(currentBatchIndex + batchSize, allData.size());

        // 4) Slice the current batch out of the cache (view, not a copy).
        List<PortCallsDto> batch = allData.subList(currentBatchIndex, end);

        int batchNum = (currentBatchIndex / batchSize) + 1;
        int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
        log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), batchNum, totalBatches, batch.size());

        // Advance the cursor for the next invocation.
        currentBatchIndex = end;
        updateApiCallStats(totalBatches, batchNum);
        return batch;
    }

    /**
     * Calls the PortCalls API with startDate/stopDate query parameters.
     *
     * @return the full list of port calls for the range (may be null/empty)
     */
    private List<PortCallsDto> callApiWithBatch(String startDate, String stopDate) {
        String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate;
        log.info("[{}] API 호출: {}", getReaderName(), url);

        return webClient.get()
                .uri(url)
                .retrieve()
                .bodyToFlux(PortCallsDto.class)
                .collectList()
                .block();
    }

    @Override
    protected void afterFetch(List<PortCallsDto> data) {
        // data == null signals end-of-input. Also guard allData: if the very first
        // API call returned null, allData was never populated and the original code
        // threw a NullPointerException here.
        if (data == null && allData != null) {
            int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
            log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
        }
    }
}

파일 보기

@ -1,9 +1,10 @@
package com.snp.batch.jobs.shipMovement.batch.reader; package com.snp.batch.jobs.shipMovementPortCalls.batch.reader;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipMovement.batch.dto.PortCallsDto; import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.dto.AnchorageCallsDto;
import com.snp.batch.jobs.shipMovement.batch.dto.ShipMovementApiResponse; import com.snp.batch.jobs.shipMovementPortCalls.batch.dto.PortCallsDto;
import com.snp.batch.jobs.shipMovementPortCalls.batch.dto.ShipMovementApiResponse;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -34,7 +35,7 @@ import java.util.*;
*/ */
@Slf4j @Slf4j
@StepScope @StepScope
public class ShipMovementReader extends BaseApiReader<PortCallsDto> { public class PortCallsReader extends BaseApiReader<PortCallsDto> {
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
@ -46,16 +47,16 @@ public class ShipMovementReader extends BaseApiReader<PortCallsDto> {
private int currentBatchIndex = 0; private int currentBatchIndex = 0;
private final int batchSize = 10; private final int batchSize = 10;
@Value("#{jobParameters['startDate']}") // @Value("#{jobParameters['startDate']}")
private String startDate; // private String startDate;
// private String startDate = "2024-01-01"; private String startDate = "2025-01-01";
@Value("#{jobParameters['stopDate']}") // @Value("#{jobParameters['stopDate']}")
private String stopDate; // private String stopDate;
// private String stopDate = "2024-12-31"; private String stopDate = "2025-12-31";
public void setStartDate(String startDate) {this.startDate = startDate;} public void setStartDate(String startDate) {this.startDate = startDate;}
public void setStopDate(String stopDate){this.stopDate=stopDate;} public void setStopDate(String stopDate){this.stopDate=stopDate;}
public ShipMovementReader(WebClient webClient, JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) { public PortCallsReader(WebClient webClient, JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
super(webClient); super(webClient);
this.jdbcTemplate = jdbcTemplate; this.jdbcTemplate = jdbcTemplate;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
@ -76,7 +77,7 @@ public class ShipMovementReader extends BaseApiReader<PortCallsDto> {
@Override @Override
protected String getApiPath() { protected String getApiPath() {
return "/Movements"; return "/Movements/PortCalls";
} }
@Override @Override
@ -88,9 +89,6 @@ public class ShipMovementReader extends BaseApiReader<PortCallsDto> {
"SELECT imo_number FROM ship_data ORDER BY id"; "SELECT imo_number FROM ship_data ORDER BY id";
// "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_ship_stpov_info) ORDER BY imo_number"; // "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_ship_stpov_info) ORDER BY imo_number";
private static final String FETCH_ALL_HASHES_QUERY =
"SELECT imo_number, ship_detail_hash FROM ship_detail_hash_json ORDER BY imo_number";
/** /**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회 * 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/ */
@ -144,15 +142,16 @@ public class ShipMovementReader extends BaseApiReader<PortCallsDto> {
String imoParam = String.join(",", currentBatch); String imoParam = String.join(",", currentBatch);
// API 호출 // API 호출
ShipMovementApiResponse response = callApiWithBatch(imoParam); // ShipMovementApiResponse response = callApiWithBatch(imoParam);
List<PortCallsDto> response= callApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동 // 다음 배치로 인덱스 이동
currentBatchIndex = endIndex; currentBatchIndex = endIndex;
// 응답 처리 // 응답 처리
if (response != null && response.getPortCallList() != null) { if (response != null) {
List<PortCallsDto> portCalls = response.getPortCallList(); List<PortCallsDto> portCalls = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회", log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, portCalls.size()); getReaderName(), currentBatchNumber, totalBatches, portCalls.size());
@ -194,7 +193,7 @@ public class ShipMovementReader extends BaseApiReader<PortCallsDto> {
* @param lrno 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...") * @param lrno 쉼표로 연결된 IMO 번호 (: "1000019,1000021,...")
* @return API 응답 * @return API 응답
*/ */
private ShipMovementApiResponse callApiWithBatch(String lrno) { private List<PortCallsDto> callApiWithBatch(String lrno) {
String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno; String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno;
log.debug("[{}] API 호출: {}", getReaderName(), url); log.debug("[{}] API 호출: {}", getReaderName(), url);
@ -202,7 +201,8 @@ public class ShipMovementReader extends BaseApiReader<PortCallsDto> {
return webClient.get() return webClient.get()
.uri(url) .uri(url)
.retrieve() .retrieve()
.bodyToMono(ShipMovementApiResponse.class) .bodyToFlux(PortCallsDto.class)
.collectList()
.block(); .block();
} }

파일 보기

@ -0,0 +1,16 @@
package com.snp.batch.jobs.shipMovementPortCalls.batch.repository;
import com.snp.batch.jobs.shipMovementPortCalls.batch.entity.PortCallsEntity;
import java.util.List;
/**
 * Repository for persisting port-call records ({@code PortCallsEntity}).
 */
public interface PortCallsRepository {

    // Batch-saves the given port-call entities (the JDBC implementation
    // short-circuits on a null or empty list).
    void saveAll(List<PortCallsEntity> entities);

    // Returns true when a record already exists for the given port-call id.
    boolean existsByPortCallId(Integer portCallId);
}

파일 보기

@ -1,9 +1,9 @@
package com.snp.batch.jobs.shipMovement.batch.repository; package com.snp.batch.jobs.shipMovementPortCalls.batch.repository;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.repository.BaseJdbcRepository; import com.snp.batch.common.batch.repository.BaseJdbcRepository;
import com.snp.batch.jobs.shipMovement.batch.entity.ShipMovementEntity; import com.snp.batch.jobs.shipMovementPortCalls.batch.entity.PortCallsEntity;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.RowMapper;
@ -21,16 +21,17 @@ import java.util.List;
*/ */
@Slf4j @Slf4j
@Repository("ShipMovementRepository") @Repository("ShipMovementRepository")
public class ShipMovementRepositoryImpl extends BaseJdbcRepository<ShipMovementEntity, String> public class PortCallsRepositoryImpl extends BaseJdbcRepository<PortCallsEntity, String>
implements ShipMovementRepository { implements PortCallsRepository {
public ShipMovementRepositoryImpl(JdbcTemplate jdbcTemplate) { public PortCallsRepositoryImpl(JdbcTemplate jdbcTemplate) {
super(jdbcTemplate); super(jdbcTemplate);
} }
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override @Override
protected String getTableName() { protected String getTableName() {
return "snp_data.t_ship_stpov_info"; // return "snp_data.t_ship_stpov_info";
return "new_snp.t_ship_stpov_info";
} }
@Override @Override
@ -39,14 +40,16 @@ public class ShipMovementRepositoryImpl extends BaseJdbcRepository<ShipMovementE
} }
@Override @Override
protected String extractId(ShipMovementEntity entity) { protected String extractId(PortCallsEntity entity) {
return entity.getImolRorIHSNumber(); return entity.getImolRorIHSNumber();
} }
@Override @Override
public String getInsertSql() { public String getInsertSql() {
// return """
// INSERT INTO snp_data.t_ship_stpov_info(
return """ return """
INSERT INTO snp_data.t_ship_stpov_info( INSERT INTO new_snp.t_ship_stpov_info(
imo, imo,
mvmn_type, mvmn_type,
mvmn_dt, mvmn_dt,
@ -127,7 +130,7 @@ public class ShipMovementRepositoryImpl extends BaseJdbcRepository<ShipMovementE
} }
@Override @Override
protected void setInsertParameters(PreparedStatement ps, ShipMovementEntity e) throws Exception { protected void setInsertParameters(PreparedStatement ps, PortCallsEntity e) throws Exception {
int i = 1; int i = 1;
ps.setString(i++, e.getImolRorIHSNumber()); // imo ps.setString(i++, e.getImolRorIHSNumber()); // imo
ps.setString(i++, e.getMovementType()); // mvmn_type ps.setString(i++, e.getMovementType()); // mvmn_type
@ -171,17 +174,17 @@ public class ShipMovementRepositoryImpl extends BaseJdbcRepository<ShipMovementE
} }
@Override @Override
protected void setUpdateParameters(PreparedStatement ps, ShipMovementEntity entity) throws Exception { protected void setUpdateParameters(PreparedStatement ps, PortCallsEntity entity) throws Exception {
} }
@Override @Override
protected RowMapper<ShipMovementEntity> getRowMapper() { protected RowMapper<PortCallsEntity> getRowMapper() {
return new ShipMovementRowMapper(); return new ShipMovementRowMapper();
} }
@Override @Override
public void saveAll(List<ShipMovementEntity> entities) { public void saveAll(List<PortCallsEntity> entities) {
if (entities == null || entities.isEmpty()) return; if (entities == null || entities.isEmpty()) return;
log.info("ShipMovement 저장 시작 = {}건", entities.size()); log.info("ShipMovement 저장 시작 = {}건", entities.size());
@ -205,10 +208,10 @@ public class ShipMovementRepositoryImpl extends BaseJdbcRepository<ShipMovementE
/** /**
* ShipDetailEntity RowMapper * ShipDetailEntity RowMapper
*/ */
private static class ShipMovementRowMapper implements RowMapper<ShipMovementEntity> { private static class ShipMovementRowMapper implements RowMapper<PortCallsEntity> {
@Override @Override
public ShipMovementEntity mapRow(ResultSet rs, int rowNum) throws SQLException { public PortCallsEntity mapRow(ResultSet rs, int rowNum) throws SQLException {
ShipMovementEntity entity = ShipMovementEntity.builder() PortCallsEntity entity = PortCallsEntity.builder()
.id(rs.getLong("id")) .id(rs.getLong("id"))
.imolRorIHSNumber(rs.getString("imolRorIHSNumber")) .imolRorIHSNumber(rs.getString("imolRorIHSNumber"))
.portCallId(rs.getObject("portCallId", Integer.class)) .portCallId(rs.getObject("portCallId", Integer.class))

파일 보기

@ -0,0 +1,38 @@
package com.snp.batch.jobs.shipMovementPortCalls.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.shipMovementPortCalls.batch.entity.PortCallsEntity;
import com.snp.batch.jobs.shipMovementPortCalls.batch.repository.PortCallsRepository;
import com.snp.batch.jobs.shipdetail.batch.repository.ShipDetailRepository;
import com.snp.batch.jobs.shipdetail.batch.repository.ShipHashRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Writer that persists port-call entities via {@link PortCallsRepository}.
 */
@Slf4j
@Component
public class PortCallsWriter extends BaseWriter<PortCallsEntity> {

    // Renamed from "shipMovementRepository": the field's type is PortCallsRepository,
    // and the old name was a copy-paste leftover from the ShipMovement writer.
    private final PortCallsRepository portCallsRepository;

    public PortCallsWriter(PortCallsRepository portCallsRepository) {
        super("ShipPortCalls");
        this.portCallsRepository = portCallsRepository;
    }

    /**
     * Persists one chunk of port-call entities.
     *
     * @param items chunk produced by the processor; empty chunks are skipped
     */
    @Override
    protected void writeItems(List<PortCallsEntity> items) throws Exception {
        if (items.isEmpty()) {
            return;
        }
        portCallsRepository.saveAll(items);
        log.info("PortCalls 데이터 저장 완료: {} 건", items.size());
    }
}

파일 보기

@ -0,0 +1,117 @@
package com.snp.batch.jobs.shipMovementStsOperations.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovementDarkActivity.batch.reader.DarkActivityRangeReader;
import com.snp.batch.jobs.shipMovementStsOperations.batch.dto.StsOperationDto;
import com.snp.batch.jobs.shipMovementStsOperations.batch.entity.StsOperationEntity;
import com.snp.batch.jobs.shipMovementStsOperations.batch.processor.StsOperationProcessor;
import com.snp.batch.jobs.shipMovementStsOperations.batch.reader.StsOperationRangeReader;
import com.snp.batch.jobs.shipMovementStsOperations.batch.writer.StsOperationWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
 * Job configuration for importing ship-to-ship (STS) operation movements.
 *
 * Characteristics:
 * - Queries the Maritime API "/Movements/StsOperations" endpoint for a date range
 *   (startDate/stopDate job parameters; the reader falls back to "yesterday"
 *   when they are absent — see StsOperationRangeReader).
 * - The reader fetches the whole range once, then feeds it to Spring Batch in
 *   slices so that Processor/Writer run chunk by chunk.
 * - Rows are UPSERTed into the t_stsoperation table by the writer.
 *
 * Data flow:
 * StsOperationRangeReader (Maritime API, date-range query)
 *   → (StsOperationDto)
 * StsOperationProcessor
 *   → (StsOperationEntity)
 * StsOperationWriter
 *   → (t_stsoperation table)
 */
@Slf4j
@Configuration
public class StsOperationRangeJobConfig extends BaseJobConfig<StsOperationDto, StsOperationEntity> {

    private final StsOperationProcessor stsOperationProcessor;
    private final StsOperationWriter stsOperationWriter;
    private final StsOperationRangeReader stsOperationRangeReader;
    // NOTE(review): jdbcTemplate is injected but never used in this config — confirm whether it can be removed.
    private final JdbcTemplate jdbcTemplate;
    private final WebClient maritimeApiWebClient;

    public StsOperationRangeJobConfig(
            JobRepository jobRepository,
            PlatformTransactionManager transactionManager,
            StsOperationProcessor stsOperationProcessor,
            StsOperationWriter stsOperationWriter, StsOperationRangeReader stsOperationRangeReader, JdbcTemplate jdbcTemplate,
            @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // dedicated WebClient for the Maritime API
        super(jobRepository, transactionManager);
        this.stsOperationProcessor = stsOperationProcessor;
        this.stsOperationWriter = stsOperationWriter;
        this.stsOperationRangeReader = stsOperationRangeReader;
        this.jdbcTemplate = jdbcTemplate;
        this.maritimeApiWebClient = maritimeApiWebClient;
    }

    @Override
    protected String getJobName() {
        return "STSOperationRangeImportJob";
    }

    @Override
    protected String getStepName() {
        return "STSOperationRangeImportStep";
    }

    /** Uses the step-scoped reader bean defined below. */
    @Override
    protected ItemReader<StsOperationDto> createReader() {
        return stsOperationRangeReader;
    }

    /**
     * Step-scoped reader bean; startDate/stopDate come from job parameters.
     * When they are null/blank the reader defaults to the previous day.
     */
    @Bean
    @StepScope
    public StsOperationRangeReader stsOperationRangeReader(
            @Value("#{jobParameters['startDate']}") String startDate,
            @Value("#{jobParameters['stopDate']}") String stopDate
    ) {
        // If jobParameters are missing, nulls are passed and the reader applies its default range.
        return new StsOperationRangeReader(maritimeApiWebClient, startDate, stopDate);
    }

    @Override
    protected ItemProcessor<StsOperationDto, StsOperationEntity> createProcessor() {
        return stsOperationProcessor;
    }

    @Override
    protected ItemWriter<StsOperationEntity> createWriter() {
        return stsOperationWriter;
    }

    @Override
    protected int getChunkSize() {
        return 5000; // matches the reader's internal batchSize of 5000
    }

    @Bean(name = "STSOperationRangeImportJob")
    public Job STSOperationRangeImportJob() {
        return job();
    }

    @Bean(name = "STSOperationRangeImportStep")
    public Step STSOperationRangeImportStep() {
        return step();
    }
}

파일 보기

@ -0,0 +1,164 @@
package com.snp.batch.jobs.shipMovementStsOperations.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipMovementStsOperations.batch.dto.StsOperationDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
/**
 * Chunk-based reader for STS (ship-to-ship) operation movements.
 *
 * Behavior:
 * 1. On the first fetchNextBatch() call, fetches ALL rows for the requested
 *    date range from the Maritime API (single call) into memory.
 * 2. Subsequent calls return slices of up to {@code batchSize} (5000) items so
 *    Spring Batch can process/write chunk by chunk.
 * 3. Returns null when the range is exhausted (or the API returned nothing),
 *    which ends the step.
 *
 * Date defaults: when startDate/stopDate job parameters are absent or blank,
 * the range covers the previous day (00:00 inclusive to next 00:00).
 */
@Slf4j
@StepScope
public class StsOperationRangeReader extends BaseApiReader<StsOperationDto> {

    /** Full result set for the date range; loaded lazily on the first fetch. */
    private List<StsOperationDto> allData;
    /** Cursor into allData marking the start of the next slice. */
    private int currentBatchIndex = 0;
    /** Slice size handed to Spring Batch per fetch (matches the job's chunk size). */
    private final int batchSize = 5000;
    private String startDate;
    private String stopDate;

    // NOTE(review): this reader is instantiated via `new` in the job config, so the
    // @Value annotations here are inert; kept for documentation of the intended source.
    public StsOperationRangeReader(WebClient webClient,
                                   @Value("#{jobParameters['startDate']}") String startDate,
                                   @Value("#{jobParameters['stopDate']}") String stopDate) {
        super(webClient);
        // Default to the previous full day when either bound is missing.
        if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) {
            LocalDate yesterday = LocalDate.now().minusDays(1);
            this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
            this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
        } else {
            this.startDate = startDate;
            this.stopDate = stopDate;
        }
        enableChunkMode(); // hand data to Spring Batch in slices rather than all at once
    }

    @Override
    protected String getReaderName() {
        return "StsOperationReader";
    }

    /** Clears cached data and the cursor so the reader can be reused. */
    @Override
    protected void resetCustomState() {
        this.currentBatchIndex = 0;
        this.allData = null;
    }

    @Override
    protected String getApiPath() {
        return "/Movements/StsOperations";
    }

    @Override
    protected String getApiBaseUrl() {
        return "https://webservices.maritime.spglobal.com";
    }

    /** Logs the effective date range before the first fetch. */
    @Override
    protected void beforeFetch() {
        log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
    }

    /**
     * Returns the next slice of up to {@code batchSize} items, loading the full
     * date range from the API on the first call.
     *
     * @return next slice, or null when there is no more data (ends the step)
     */
    @Override
    protected List<StsOperationDto> fetchNextBatch() throws Exception {
        // 1) Lazily load the entire range on the first call.
        if (allData == null) {
            log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate);
            allData = callApiWithBatch(startDate, stopDate);
            if (allData == null || allData.isEmpty()) {
                log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
                // Normalize "API returned null" to null-result so afterFetch can rely on allData's state.
                allData = null;
                return null;
            }
            log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), batchSize);
        }
        // 2) All slices consumed → signal end-of-stream.
        if (currentBatchIndex >= allData.size()) {
            log.info("[{}] 모든 배치 처리 완료", getReaderName());
            return null;
        }
        // 3) Compute this slice's bounds and advance the cursor.
        int endIndex = Math.min(currentBatchIndex + batchSize, allData.size());
        List<StsOperationDto> batch = allData.subList(currentBatchIndex, endIndex);
        int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
        int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
        log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), currentBatchNumber, totalBatches, batch.size());
        currentBatchIndex = endIndex;
        updateApiCallStats(totalBatches, currentBatchNumber);
        return batch;
    }

    /**
     * Calls the Movements/StsOperations endpoint with the given date range.
     *
     * @param startDate range start (inclusive), ISO date-time string
     * @param stopDate  range end (exclusive), ISO date-time string
     * @return all rows in the range (may be null/empty)
     */
    private List<StsOperationDto> callApiWithBatch(String startDate, String stopDate) {
        String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate;
        log.debug("[{}] API 호출: {}", getReaderName(), url);
        return webClient.get()
                .uri(url)
                .retrieve()
                .bodyToFlux(StsOperationDto.class)
                .collectList()
                .block();
    }

    @Override
    protected void afterFetch(List<StsOperationDto> data) {
        // data == null signals end-of-stream; guard allData — it is still null when
        // the very first API call produced no data (previously caused an NPE here).
        if (data == null && allData != null) {
            int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
            log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
        }
    }
}

파일 보기

@ -30,7 +30,8 @@ public class StsOperationRepositoryImpl extends BaseJdbcRepository<StsOperationE
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override @Override
protected String getTableName() { protected String getTableName() {
return "snp_data.t_stsoperation"; // return "snp_data.t_stsoperation";
return "new_snp.t_stsoperation";
} }
@Override @Override
@ -45,8 +46,10 @@ public class StsOperationRepositoryImpl extends BaseJdbcRepository<StsOperationE
@Override @Override
public String getInsertSql() { public String getInsertSql() {
// return """
// INSERT INTO snp_data.t_stsoperation(
return """ return """
INSERT INTO snp_data.t_stsoperation( INSERT INTO new_snp.t_stsoperation(
imo, imo,
mvmn_type, mvmn_type,
mvmn_dt, mvmn_dt,
@ -67,7 +70,7 @@ public class StsOperationRepositoryImpl extends BaseJdbcRepository<StsOperationE
evt_start_dt, evt_start_dt,
lcinfo lcinfo
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (imo, mvmn_type, mvmn_dt) ON CONFLICT (imo, mvmn_type, mvmn_dt, fclty_id)
DO UPDATE SET DO UPDATE SET
mvmn_type = EXCLUDED.mvmn_type, mvmn_type = EXCLUDED.mvmn_type,
mvmn_dt = EXCLUDED.mvmn_dt, mvmn_dt = EXCLUDED.mvmn_dt,

파일 보기

@ -0,0 +1,117 @@
package com.snp.batch.jobs.shipMovementTerminalCalls.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovementDarkActivity.batch.reader.DarkActivityRangeReader;
import com.snp.batch.jobs.shipMovementTerminalCalls.batch.dto.TerminalCallsDto;
import com.snp.batch.jobs.shipMovementTerminalCalls.batch.entity.TerminalCallsEntity;
import com.snp.batch.jobs.shipMovementTerminalCalls.batch.processor.TerminalCallsProcessor;
import com.snp.batch.jobs.shipMovementTerminalCalls.batch.reader.TerminalCallsRangeReader;
import com.snp.batch.jobs.shipMovementTerminalCalls.batch.reader.TerminalCallsReader;
import com.snp.batch.jobs.shipMovementTerminalCalls.batch.writer.TerminalCallsWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
 * Job configuration for importing terminal-call movements.
 *
 * Characteristics:
 * - Queries the Maritime API "/Movements/TerminalCalls" endpoint for a date
 *   range (startDate/stopDate job parameters; the reader falls back to
 *   "yesterday" when they are absent — see TerminalCallsRangeReader).
 * - The reader fetches the whole range once, then feeds it to Spring Batch in
 *   slices so that Processor/Writer run chunk by chunk.
 * - Rows are UPSERTed into the t_terminalcall table by the writer.
 *
 * Data flow:
 * TerminalCallsRangeReader (Maritime API, date-range query)
 *   → (TerminalCallsDto)
 * TerminalCallsProcessor
 *   → (TerminalCallsEntity)
 * TerminalCallsWriter
 *   → (t_terminalcall table)
 */
@Slf4j
@Configuration
public class TerminalCallsRangeJobConfig extends BaseJobConfig<TerminalCallsDto, TerminalCallsEntity> {

    private final TerminalCallsProcessor terminalCallsProcessor;
    private final TerminalCallsWriter terminalCallsWriter;
    private final TerminalCallsRangeReader terminalCallsRangeReader;
    // NOTE(review): jdbcTemplate is injected but never used in this config — confirm whether it can be removed.
    private final JdbcTemplate jdbcTemplate;
    private final WebClient maritimeApiWebClient;

    public TerminalCallsRangeJobConfig(
            JobRepository jobRepository,
            PlatformTransactionManager transactionManager,
            TerminalCallsProcessor terminalCallsProcessor,
            TerminalCallsWriter terminalCallsWriter, TerminalCallsRangeReader terminalCallsRangeReader, JdbcTemplate jdbcTemplate,
            @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // dedicated WebClient for the Maritime API
        super(jobRepository, transactionManager);
        this.terminalCallsProcessor = terminalCallsProcessor;
        this.terminalCallsWriter = terminalCallsWriter;
        this.terminalCallsRangeReader = terminalCallsRangeReader;
        this.jdbcTemplate = jdbcTemplate;
        this.maritimeApiWebClient = maritimeApiWebClient;
    }

    @Override
    protected String getJobName() {
        return "TerminalCallsRangeImportJob";
    }

    @Override
    protected String getStepName() {
        return "TerminalCallsRangeImportStep";
    }

    /** Uses the step-scoped reader bean defined below. */
    @Override
    protected ItemReader<TerminalCallsDto> createReader() {
        return terminalCallsRangeReader;
    }

    /**
     * Step-scoped reader bean; startDate/stopDate come from job parameters.
     * When they are null/blank the reader defaults to the previous day.
     */
    @Bean
    @StepScope
    public TerminalCallsRangeReader terminalCallsRangeReader(
            @Value("#{jobParameters['startDate']}") String startDate,
            @Value("#{jobParameters['stopDate']}") String stopDate
    ) {
        // If jobParameters are missing, nulls are passed and the reader applies its default range.
        return new TerminalCallsRangeReader(maritimeApiWebClient, startDate, stopDate);
    }

    @Override
    protected ItemProcessor<TerminalCallsDto, TerminalCallsEntity> createProcessor() {
        return terminalCallsProcessor;
    }

    @Override
    protected ItemWriter<TerminalCallsEntity> createWriter() {
        return terminalCallsWriter;
    }

    @Override
    protected int getChunkSize() {
        return 1000; // matches the reader's internal batchSize of 1000
    }

    @Bean(name = "TerminalCallsRangeImportJob")
    public Job terminalCallsRangeImportJob() {
        return job();
    }

    @Bean(name = "TerminalCallsRangeImportStep")
    public Step terminalCallsRangeImportStep() {
        return step();
    }
}

파일 보기

@ -0,0 +1,162 @@
package com.snp.batch.jobs.shipMovementTerminalCalls.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipMovementDarkActivity.batch.dto.DarkActivityDto;
import com.snp.batch.jobs.shipMovementTerminalCalls.batch.dto.TerminalCallsDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
 * Chunk-based reader for terminal-call movements.
 *
 * Behavior:
 * 1. On the first fetchNextBatch() call, fetches ALL rows for the requested
 *    date range from the Maritime API (single call) into memory.
 * 2. Subsequent calls return slices of up to {@code batchSize} (1000) items so
 *    Spring Batch can process/write chunk by chunk.
 * 3. Returns null when the range is exhausted (or the API returned nothing),
 *    which ends the step.
 *
 * Date defaults: when startDate/stopDate job parameters are absent or blank,
 * the range covers the previous day (00:00 inclusive to next 00:00).
 */
@Slf4j
@StepScope
public class TerminalCallsRangeReader extends BaseApiReader<TerminalCallsDto> {

    /** Full result set for the date range; loaded lazily on the first fetch. */
    private List<TerminalCallsDto> allData;
    /** Cursor into allData marking the start of the next slice. */
    private int currentBatchIndex = 0;
    /** Slice size handed to Spring Batch per fetch (matches the job's chunk size). */
    private final int batchSize = 1000;
    private String startDate;
    private String stopDate;

    // NOTE(review): this reader is instantiated via `new` in the job config, so the
    // @Value annotations here are inert; kept for documentation of the intended source.
    public TerminalCallsRangeReader(WebClient webClient,
                                    @Value("#{jobParameters['startDate']}") String startDate,
                                    @Value("#{jobParameters['stopDate']}") String stopDate) {
        super(webClient);
        // Default to the previous full day when either bound is missing.
        if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) {
            LocalDate yesterday = LocalDate.now().minusDays(1);
            this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
            this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
        } else {
            this.startDate = startDate;
            this.stopDate = stopDate;
        }
        enableChunkMode(); // hand data to Spring Batch in slices rather than all at once
    }

    @Override
    protected String getReaderName() {
        return "TerminalCalls";
    }

    /** Clears cached data and the cursor so the reader can be reused. */
    @Override
    protected void resetCustomState() {
        this.currentBatchIndex = 0;
        this.allData = null;
    }

    @Override
    protected String getApiPath() {
        return "/Movements/TerminalCalls";
    }

    @Override
    protected String getApiBaseUrl() {
        return "https://webservices.maritime.spglobal.com";
    }

    /** Logs the effective date range before the first fetch. */
    @Override
    protected void beforeFetch() {
        log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
    }

    /**
     * Returns the next slice of up to {@code batchSize} items, loading the full
     * date range from the API on the first call.
     *
     * @return next slice, or null when there is no more data (ends the step)
     */
    @Override
    protected List<TerminalCallsDto> fetchNextBatch() throws Exception {
        // 1) Lazily load the entire range on the first call.
        if (allData == null) {
            log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate);
            allData = callApiWithBatch(startDate, stopDate);
            if (allData == null || allData.isEmpty()) {
                log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
                // Normalize "API returned null" to null-result so afterFetch can rely on allData's state.
                allData = null;
                return null;
            }
            log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), batchSize);
        }
        // 2) All slices consumed → signal end-of-stream.
        if (currentBatchIndex >= allData.size()) {
            log.info("[{}] 모든 배치 처리 완료", getReaderName());
            return null;
        }
        // 3) Compute this slice's bounds and advance the cursor.
        int endIndex = Math.min(currentBatchIndex + batchSize, allData.size());
        List<TerminalCallsDto> batch = allData.subList(currentBatchIndex, endIndex);
        int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
        int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
        log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), currentBatchNumber, totalBatches, batch.size());
        currentBatchIndex = endIndex;
        updateApiCallStats(totalBatches, currentBatchNumber);
        return batch;
    }

    /**
     * Calls the Movements/TerminalCalls endpoint with the given date range.
     *
     * @param startDate range start (inclusive), ISO date-time string
     * @param stopDate  range end (exclusive), ISO date-time string
     * @return all rows in the range (may be null/empty)
     */
    private List<TerminalCallsDto> callApiWithBatch(String startDate, String stopDate) {
        String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate;
        log.debug("[{}] API 호출: {}", getReaderName(), url);
        return webClient.get()
                .uri(url)
                .retrieve()
                .bodyToFlux(TerminalCallsDto.class)
                .collectList()
                .block();
    }

    @Override
    protected void afterFetch(List<TerminalCallsDto> data) {
        // data == null signals end-of-stream; guard allData — it is still null when
        // the very first API call produced no data (previously caused an NPE here).
        if (data == null && allData != null) {
            int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
            log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
        }
    }
}

파일 보기

@ -30,7 +30,8 @@ public class TerminalCallsRepositoryImpl extends BaseJdbcRepository<TerminalCall
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override @Override
protected String getTableName() { protected String getTableName() {
return "snp_data.t_terminalcall"; // return "snp_data.t_terminalcall";
return "new_snp.t_terminalcall";
} }
@Override @Override
@ -45,8 +46,10 @@ public class TerminalCallsRepositoryImpl extends BaseJdbcRepository<TerminalCall
@Override @Override
public String getInsertSql() { public String getInsertSql() {
// return """
// INSERT INTO snp_data.t_terminalcall(
return """ return """
INSERT INTO snp_data.t_terminalcall( INSERT INTO new_snp.t_terminalcall(
imo, imo,
mvmn_type, mvmn_type,
mvmn_dt, mvmn_dt,

파일 보기

@ -0,0 +1,115 @@
package com.snp.batch.jobs.shipMovementTransits.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.shipMovementDarkActivity.batch.reader.DarkActivityRangeReader;
import com.snp.batch.jobs.shipMovementTransits.batch.dto.TransitsDto;
import com.snp.batch.jobs.shipMovementTransits.batch.entity.TransitsEntity;
import com.snp.batch.jobs.shipMovementTransits.batch.processor.TransitsProcessor;
import com.snp.batch.jobs.shipMovementTransits.batch.reader.TransitsRangeReader;
import com.snp.batch.jobs.shipMovementTransits.batch.reader.TransitsReader;
import com.snp.batch.jobs.shipMovementTransits.batch.writer.TransitsWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
/**
 * Job configuration for importing transit movements.
 *
 * Characteristics:
 * - Queries the Maritime API "/Movements/Transits" endpoint for a date range
 *   (startDate/stopDate job parameters; the reader falls back to "yesterday"
 *   when they are absent — see TransitsRangeReader).
 * - The reader fetches the whole range once, then feeds it to Spring Batch in
 *   slices so that Processor/Writer run chunk by chunk.
 * - Rows are UPSERTed into the t_transit table by the writer.
 *
 * Data flow:
 * TransitsRangeReader (Maritime API, date-range query)
 *   → (TransitsDto)
 * TransitsProcessor
 *   → (TransitsEntity)
 * TransitsWriter
 *   → (t_transit table)
 */
@Slf4j
@Configuration
public class TransitsRangeJobConfig extends BaseJobConfig<TransitsDto, TransitsEntity> {

    private final TransitsProcessor transitsProcessor;
    private final TransitsWriter transitsWriter;
    private final TransitsRangeReader transitsRangeReader;
    private final WebClient maritimeApiWebClient;

    public TransitsRangeJobConfig(
            JobRepository jobRepository,
            PlatformTransactionManager transactionManager,
            TransitsProcessor transitsProcessor, // renamed from "TransitsProcessor": params are lowerCamelCase
            TransitsWriter transitsWriter, TransitsRangeReader transitsRangeReader,
            @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // dedicated WebClient for the Maritime API
        super(jobRepository, transactionManager);
        this.transitsProcessor = transitsProcessor;
        this.transitsWriter = transitsWriter;
        this.transitsRangeReader = transitsRangeReader;
        this.maritimeApiWebClient = maritimeApiWebClient;
    }

    @Override
    protected String getJobName() {
        return "TransitsRangeImportJob";
    }

    @Override
    protected String getStepName() {
        return "TransitsRangeImportStep";
    }

    /** Uses the step-scoped reader bean defined below. */
    @Override
    protected ItemReader<TransitsDto> createReader() {
        return transitsRangeReader;
    }

    /**
     * Step-scoped reader bean; startDate/stopDate come from job parameters.
     * When they are null/blank the reader defaults to the previous day.
     */
    @Bean
    @StepScope
    public TransitsRangeReader transitsRangeReader(
            @Value("#{jobParameters['startDate']}") String startDate,
            @Value("#{jobParameters['stopDate']}") String stopDate
    ) {
        // If jobParameters are missing, nulls are passed and the reader applies its default range.
        return new TransitsRangeReader(maritimeApiWebClient, startDate, stopDate);
    }

    @Override
    protected ItemProcessor<TransitsDto, TransitsEntity> createProcessor() {
        return transitsProcessor;
    }

    @Override
    protected ItemWriter<TransitsEntity> createWriter() {
        return transitsWriter;
    }

    @Override
    protected int getChunkSize() {
        return 1000; // matches the reader's internal batchSize of 1000
    }

    @Bean(name = "TransitsRangeImportJob")
    public Job transitsRangeImportJob() {
        return job();
    }

    @Bean(name = "TransitsRangeImportStep")
    public Step transitsRangeImportStep() {
        return step();
    }
}

파일 보기

@ -0,0 +1,159 @@
package com.snp.batch.jobs.shipMovementTransits.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.shipMovementTransits.batch.dto.TransitsDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
/**
 * Chunk-based reader for transit movements.
 *
 * Behavior:
 * 1. On the first fetchNextBatch() call, fetches ALL rows for the requested
 *    date range from the Maritime API (single call) into memory.
 * 2. Subsequent calls return slices of up to {@code batchSize} (1000) items so
 *    Spring Batch can process/write chunk by chunk.
 * 3. Returns null when the range is exhausted (or the API returned nothing),
 *    which ends the step.
 *
 * Date defaults: when startDate/stopDate job parameters are absent or blank,
 * the range covers the previous day (00:00 inclusive to next 00:00).
 */
@Slf4j
@StepScope
public class TransitsRangeReader extends BaseApiReader<TransitsDto> {

    /** Full result set for the date range; loaded lazily on the first fetch. */
    private List<TransitsDto> allData;
    /** Cursor into allData marking the start of the next slice. */
    private int currentBatchIndex = 0;
    /** Slice size handed to Spring Batch per fetch (matches the job's chunk size). */
    private final int batchSize = 1000;
    private String startDate;
    private String stopDate;

    // NOTE(review): this reader is instantiated via `new` in the job config, so the
    // @Value annotations here are inert; kept for documentation of the intended source.
    public TransitsRangeReader(WebClient webClient,
                               @Value("#{jobParameters['startDate']}") String startDate,
                               @Value("#{jobParameters['stopDate']}") String stopDate) {
        super(webClient);
        // Default to the previous full day when either bound is missing.
        if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) {
            LocalDate yesterday = LocalDate.now().minusDays(1);
            this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
            this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
        } else {
            this.startDate = startDate;
            this.stopDate = stopDate;
        }
        enableChunkMode(); // hand data to Spring Batch in slices rather than all at once
    }

    @Override
    protected String getReaderName() {
        return "Transits";
    }

    /** Clears cached data and the cursor so the reader can be reused. */
    @Override
    protected void resetCustomState() {
        this.currentBatchIndex = 0;
        this.allData = null;
    }

    @Override
    protected String getApiPath() {
        return "/Movements/Transits";
    }

    @Override
    protected String getApiBaseUrl() {
        return "https://webservices.maritime.spglobal.com";
    }

    /** Logs the effective date range before the first fetch. */
    @Override
    protected void beforeFetch() {
        log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
    }

    /**
     * Returns the next slice of up to {@code batchSize} items, loading the full
     * date range from the API on the first call.
     *
     * @return next slice, or null when there is no more data (ends the step)
     */
    @Override
    protected List<TransitsDto> fetchNextBatch() throws Exception {
        // 1) Lazily load the entire range on the first call.
        if (allData == null) {
            log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate);
            allData = callApiWithBatch(startDate, stopDate);
            if (allData == null || allData.isEmpty()) {
                log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
                // Normalize "API returned null" to null-result so afterFetch can rely on allData's state.
                allData = null;
                return null;
            }
            log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), batchSize);
        }
        // 2) All slices consumed → signal end-of-stream.
        if (currentBatchIndex >= allData.size()) {
            log.info("[{}] 모든 배치 처리 완료", getReaderName());
            return null;
        }
        // 3) Compute this slice's bounds and advance the cursor.
        int endIndex = Math.min(currentBatchIndex + batchSize, allData.size());
        List<TransitsDto> batch = allData.subList(currentBatchIndex, endIndex);
        int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
        int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
        log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), currentBatchNumber, totalBatches, batch.size());
        currentBatchIndex = endIndex;
        updateApiCallStats(totalBatches, currentBatchNumber);
        return batch;
    }

    /**
     * Calls the Movements/Transits endpoint with the given date range.
     *
     * @param startDate range start (inclusive), ISO date-time string
     * @param stopDate  range end (exclusive), ISO date-time string
     * @return all rows in the range (may be null/empty)
     */
    private List<TransitsDto> callApiWithBatch(String startDate, String stopDate) {
        String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate;
        log.debug("[{}] API 호출: {}", getReaderName(), url);
        return webClient.get()
                .uri(url)
                .retrieve()
                .bodyToFlux(TransitsDto.class)
                .collectList()
                .block();
    }

    @Override
    protected void afterFetch(List<TransitsDto> data) {
        // data == null signals end-of-stream; guard allData — it is still null when
        // the very first API call produced no data (previously caused an NPE here).
        if (data == null && allData != null) {
            int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
            log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
        }
    }
}

파일 보기

@ -18,16 +18,17 @@ import java.util.List;
*/ */
@Slf4j @Slf4j
@Repository("TransitsRepository") @Repository("TransitsRepository")
public class TransitlsRepositoryImpl extends BaseJdbcRepository<TransitsEntity, String> public class TransitsRepositoryImpl extends BaseJdbcRepository<TransitsEntity, String>
implements TransitsRepository { implements TransitsRepository {
public TransitlsRepositoryImpl(JdbcTemplate jdbcTemplate) { public TransitsRepositoryImpl(JdbcTemplate jdbcTemplate) {
super(jdbcTemplate); super(jdbcTemplate);
} }
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override @Override
protected String getTableName() { protected String getTableName() {
return "snp_data.t_transit"; // return "snp_data.t_transit";
return "new_snp.t_transit";
} }
@Override @Override
@ -42,8 +43,10 @@ public class TransitlsRepositoryImpl extends BaseJdbcRepository<TransitsEntity,
@Override @Override
public String getInsertSql() { public String getInsertSql() {
// return """
// INSERT INTO snp_data.t_transit(
return """ return """
INSERT INTO snp_data.t_transit( INSERT INTO new_snp.t_transit(
imo, imo,
mvmn_type, mvmn_type,
mvmn_dt, mvmn_dt,

파일 보기

@ -4,7 +4,7 @@ spring:
# PostgreSQL Database Configuration # PostgreSQL Database Configuration
datasource: datasource:
url: jdbc:postgresql://211.208.115.83:5432/snpdb?currentSchema=snp_data,public url: jdbc:postgresql://211.208.115.83:5432/snpdb
username: snp username: snp
password: snp#8932 password: snp#8932
driver-class-name: org.postgresql.Driver driver-class-name: org.postgresql.Driver
@ -49,13 +49,14 @@ spring:
org.quartz.threadPool.threadCount: 10 org.quartz.threadPool.threadCount: 10
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
org.quartz.jobStore.tablePrefix: QRTZ_ org.quartz.jobStore.tablePrefix: snp_data.QRTZ_
org.quartz.jobStore.isClustered: false org.quartz.jobStore.isClustered: false
org.quartz.jobStore.misfireThreshold: 60000 org.quartz.jobStore.misfireThreshold: 60000
# Server Configuration # Server Configuration
server: server:
port: 8041 port: 8081
# port: 8041
servlet: servlet:
context-path: /snp-api context-path: /snp-api

파일 보기

@ -4,7 +4,7 @@ spring:
# PostgreSQL Database Configuration # PostgreSQL Database Configuration
datasource: datasource:
url: jdbc:postgresql://10.187.58.58:5432/mdadb?currentSchema=snp_data,public url: jdbc:postgresql://10.187.58.58:5432/mdadb
username: mda username: mda
password: mda#8932 password: mda#8932
driver-class-name: org.postgresql.Driver driver-class-name: org.postgresql.Driver
@ -55,7 +55,8 @@ spring:
# Server Configuration # Server Configuration
server: server:
port: 8041 port: 9000
# port: 8041
servlet: servlet:
context-path: /snp-api context-path: /snp-api

파일 보기

@ -4,7 +4,7 @@ spring:
# PostgreSQL Database Configuration # PostgreSQL Database Configuration
datasource: datasource:
url: jdbc:postgresql://211.208.115.83:5432/snpdb?currentSchema=snp_data,public url: jdbc:postgresql://211.208.115.83:5432/snpdb
username: snp username: snp
password: snp#8932 password: snp#8932
driver-class-name: org.postgresql.Driver driver-class-name: org.postgresql.Driver
@ -49,7 +49,7 @@ spring:
org.quartz.threadPool.threadCount: 10 org.quartz.threadPool.threadCount: 10
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
org.quartz.jobStore.tablePrefix: QRTZ_ org.quartz.jobStore.tablePrefix: snp_data.QRTZ_
org.quartz.jobStore.isClustered: false org.quartz.jobStore.isClustered: false
org.quartz.jobStore.misfireThreshold: 60000 org.quartz.jobStore.misfireThreshold: 60000