Risk&Compliance Data Import Job 개발

This commit is contained in:
hyojin kim 2025-11-27 21:55:46 +09:00
부모 6be90723b4
커밋 906611c9b8
29개의 변경된 파일2200개의 추가작업 그리고 21개의 파일을 삭제

파일 보기

@ -29,9 +29,13 @@ public class MaritimeApiWebClientConfig {
@Value("${app.batch.ship-api.url}")
private String maritimeApiUrl;
@Value("https://aisapi.maritime.spglobal.com")
@Value("${app.batch.ais-api.url}")
private String maritimeAisApiUrl;
@Value("${app.batch.webservice-api.url}")
private String maritimeServiceApiUrl;
@Value("${app.batch.ship-api.username}")
private String maritimeApiUsername;
@ -79,6 +83,22 @@ public class MaritimeApiWebClientConfig {
.maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼
.build();
}
@Bean(name = "maritimeServiceApiWebClient")
public WebClient maritimeServiceApiWebClient(){
log.info("========================================");
log.info("Maritime AIS API WebClient 생성");
log.info("Base URL: {}", maritimeServiceApiUrl);
log.info("========================================");
return WebClient.builder()
.baseUrl(maritimeServiceApiUrl)
.defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword))
.codecs(configurer -> configurer
.defaultCodecs()
.maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼
.build();
}
}

파일 보기

@ -0,0 +1,84 @@
package com.snp.batch.jobs.risk.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.risk.batch.dto.RiskDto;
import com.snp.batch.jobs.risk.batch.entity.RiskEntity;
import com.snp.batch.jobs.risk.batch.processor.RiskDataProcessor;
import com.snp.batch.jobs.risk.batch.reader.RiskDataReader;
import com.snp.batch.jobs.risk.batch.writer.RiskDataWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
@Slf4j
@Configuration
public class RiskImportJobConfig extends BaseJobConfig<RiskDto, RiskEntity> {
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeServiceApiWebClient;
private final RiskDataProcessor riskDataProcessor;
private final RiskDataWriter riskDataWriter;
@Override
protected int getChunkSize() {
return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
}
public RiskImportJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
RiskDataProcessor riskDataProcessor,
RiskDataWriter riskDataWriter,
JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient")WebClient maritimeServiceApiWebClient) {
super(jobRepository, transactionManager);
this.jdbcTemplate = jdbcTemplate;
this.maritimeServiceApiWebClient = maritimeServiceApiWebClient;
this.riskDataProcessor = riskDataProcessor;
this.riskDataWriter = riskDataWriter;
}
@Override
protected String getJobName() {
return "riskImportJob";
}
@Override
protected String getStepName() {
return "riskImportStep";
}
@Override
protected ItemReader<RiskDto> createReader() {
return new RiskDataReader(maritimeServiceApiWebClient, jdbcTemplate);
}
@Override
protected ItemProcessor<RiskDto, RiskEntity> createProcessor() {
return riskDataProcessor;
}
@Override
protected ItemWriter<RiskEntity> createWriter() { return riskDataWriter; }
@Bean(name = "riskImportJob")
public Job riskImportJob() {
return job();
}
@Bean(name = "riskImportStep")
public Step riskImportStep() {
return step();
}
}

파일 보기

@ -0,0 +1,277 @@
package com.snp.batch.jobs.risk.batch.dto;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RiskDto {
// 1. Vessel and General Information
@JsonProperty("lrno")
private String lrno;
@JsonProperty("lastUpdated")
private String lastUpdated;
@JsonProperty("riskDataMaintained")
private Integer riskDataMaintained;
// 2. AIS/Tracking Risk
@JsonProperty("daysSinceLastSeenOnAIS")
private Integer daysSinceLastSeenOnAIS;
@JsonProperty("daysSinceLastSeenOnAISNarrative")
private String daysSinceLastSeenOnAISNarrative;
@JsonProperty("daysUnderAIS")
private Integer daysUnderAIS;
@JsonProperty("daysUnderAISNarrative")
private String daysUnderAISNarrative;
@JsonProperty("imoCorrectOnAIS")
private Integer imoCorrectOnAIS;
@JsonProperty("imoCorrectOnAISNarrative")
private String imoCorrectOnAISNarrative;
@JsonProperty("sailingUnderName")
private Integer sailingUnderName;
@JsonProperty("sailingUnderNameNarrative")
private String sailingUnderNameNarrative;
@JsonProperty("anomalousMessagesFromMMSI")
private Integer anomalousMessagesFromMMSI;
@JsonProperty("anomalousMessagesFromMMSINarrative")
private String anomalousMessagesFromMMSINarrative;
@JsonProperty("mostRecentDarkActivity")
private Integer mostRecentDarkActivity;
@JsonProperty("mostRecentDarkActivityNarrative")
private String mostRecentDarkActivityNarrative;
// 3. Operational & History Risk
@JsonProperty("portCalls")
private Integer portCalls;
@JsonProperty("portCallsNarrative")
private String portCallsNarrative;
@JsonProperty("portRisk")
private Integer portRisk;
@JsonProperty("portRiskNarrative")
private String portRiskNarrative;
@JsonProperty("stsOperations")
private Integer stsOperations;
@JsonProperty("stsOperationsNarrative")
private String stsOperationsNarrative;
@JsonProperty("driftingHighSeas")
private Integer driftingHighSeas;
@JsonProperty("driftingHighSeasNarrative")
private String driftingHighSeasNarrative;
@JsonProperty("riskEvents")
private Integer riskEvents;
@JsonProperty("riskEventNarrative")
private String riskEventNarrative;
@JsonProperty("riskEventNarrativeExtended")
private String riskEventNarrativeExtended;
@JsonProperty("flagChanges")
private Integer flagChanges;
@JsonProperty("flagChangeNarrative")
private String flagChangeNarrative;
// 4. PSC (Port State Control) & Flag Risk
@JsonProperty("flagParisMOUPerformance")
private Integer flagParisMOUPerformance;
@JsonProperty("flagParisMOUPerformanceNarrative")
private String flagParisMOUPerformanceNarrative;
@JsonProperty("flagTokyoMOUPeformance")
private Integer flagTokyoMOUPeformance;
@JsonProperty("flagTokyoMOUPeformanceNarrative")
private String flagTokyoMOUPeformanceNarrative;
@JsonProperty("flagUSCGMOUPerformance")
private Integer flagUSCGMOUPerformance;
@JsonProperty("flagUSCGMOUPerformanceNarrative")
private String flagUSCGMOUPerformanceNarrative;
@JsonProperty("uscgQualship21")
private Integer uscgQualship21;
@JsonProperty("uscgQualship21Narrative")
private String uscgQualship21Narrative;
@JsonProperty("timeSincePSCInspection")
private Integer timeSincePSCInspection;
@JsonProperty("timeSincePSCInspectionNarrative")
private String timeSincePSCInspectionNarrative;
@JsonProperty("pscInspections")
private Integer pscInspections;
@JsonProperty("pscInspectionNarrative")
private String pscInspectionNarrative;
@JsonProperty("pscDefects")
private Integer pscDefects;
@JsonProperty("pscDefectsNarrative")
private String pscDefectsNarrative;
@JsonProperty("pscDetentions")
private Integer pscDetentions;
@JsonProperty("pscDetentionsNarrative")
private String pscDetentionsNarrative;
// 5. Certification & Class Risk
@JsonProperty("currentSMCCertificate")
private Integer currentSMCCertificate;
@JsonProperty("currentSMCCertificateNarrative")
private String currentSMCCertificateNarrative;
@JsonProperty("docChanges")
private Integer docChanges;
@JsonProperty("docChangesNarrative")
private String docChangesNarrative;
@JsonProperty("currentClass")
private Integer currentClass;
@JsonProperty("currentClassNarrative")
private String currentClassNarrative;
@JsonProperty("currentClassNarrativeExtended")
private String currentClassNarrativeExtended;
@JsonProperty("classStatusChanges")
private Integer classStatusChanges;
@JsonProperty("classStatusChangesNarrative")
private String classStatusChangesNarrative;
// 6. Ownership & Financial Risk
@JsonProperty("pandICoverage")
private Integer pandICoverage;
@JsonProperty("pandICoverageNarrative")
private String pandICoverageNarrative;
@JsonProperty("pandICoverageNarrativeExtended")
private String pandICoverageNarrativeExtended;
@JsonProperty("nameChanges")
private Integer nameChanges;
@JsonProperty("nameChangesNarrative")
private String nameChangesNarrative;
@JsonProperty("gboChanges")
private Integer gboChanges;
@JsonProperty("gboChangesNarrative")
private String gboChangesNarrative;
@JsonProperty("ageOfShip")
private Integer ageOfShip;
@JsonProperty("ageofShipNarrative")
private String ageofShipNarrative;
// 7. Sanctions & Specialized Risk
@JsonProperty("iuuFishingViolation")
private Integer iuuFishingViolation;
@JsonProperty("iuuFishingNarrative")
private String iuuFishingNarrative; // null 포함
@JsonProperty("draughtChanges")
private Integer draughtChanges;
@JsonProperty("draughtChangesNarrative")
private String draughtChangesNarrative;
@JsonProperty("mostRecentSanctionedPortCall")
private Integer mostRecentSanctionedPortCall;
@JsonProperty("mostRecentSanctionedPortCallNarrative")
private String mostRecentSanctionedPortCallNarrative; // null 포함
@JsonProperty("singleShipOperation")
private Integer singleShipOperation;
@JsonProperty("singleShipOperationNarrative")
private String singleShipOperationNarrative;
@JsonProperty("fleetSafety")
private Integer fleetSafety;
@JsonProperty("fleetSafetyNarrative")
private String fleetSafetyNarrative;
@JsonProperty("fleetPSC")
private Integer fleetPSC;
@JsonProperty("fleetPSCNarrative")
private String fleetPSCNarrative;
// 8. Survey & Other Risk
@JsonProperty("specialSurveyOverdue")
private Integer specialSurveyOverdue;
@JsonProperty("specialSurveyOverdueNarrative")
private String specialSurveyOverdueNarrative;
@JsonProperty("ownerUnknown")
private Integer ownerUnknown;
@JsonProperty("ownerUnknownNarrative")
private String ownerUnknownNarrative;
@JsonProperty("russianPortCall")
private Integer russianPortCall;
@JsonProperty("russianPortCallNarrative")
private String russianPortCallNarrative;
@JsonProperty("russianOwnerRegistration")
private Integer russianOwnerRegistration;
@JsonProperty("russianOwnerRegistrationNarrative")
private String russianOwnerRegistrationNarrative;
@JsonProperty("russianSTS")
private Integer russianSTS;
@JsonProperty("russianSTSNarrative")
private String russianSTSNarrative;
}

파일 보기

@ -0,0 +1,16 @@
package com.snp.batch.jobs.risk.batch.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RiskResponse {
private List<RiskDto> riskDtoList;
}

파일 보기

@ -0,0 +1,189 @@
package com.snp.batch.jobs.risk.batch.entity;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.snp.batch.common.batch.entity.BaseEntity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.time.LocalDateTime;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class RiskEntity extends BaseEntity {
private String lrno;
private String lastUpdated;
private Integer riskDataMaintained;
private Integer daysSinceLastSeenOnAIS;
private String daysSinceLastSeenOnAISNarrative;
private Integer daysUnderAIS;
private String daysUnderAISNarrative;
private Integer imoCorrectOnAIS;
private String imoCorrectOnAISNarrative;
private Integer sailingUnderName;
private String sailingUnderNameNarrative;
private Integer anomalousMessagesFromMMSI;
private String anomalousMessagesFromMMSINarrative;
private Integer mostRecentDarkActivity;
private String mostRecentDarkActivityNarrative;
private Integer portCalls;
private String portCallsNarrative;
private Integer portRisk;
private String portRiskNarrative;
private Integer stsOperations;
private String stsOperationsNarrative;
private Integer driftingHighSeas;
private String driftingHighSeasNarrative;
private Integer riskEvents;
private String riskEventNarrative;
private String riskEventNarrativeExtended;
private Integer flagChanges;
private String flagChangeNarrative;
private Integer flagParisMOUPerformance;
private String flagParisMOUPerformanceNarrative;
private Integer flagTokyoMOUPeformance;
private String flagTokyoMOUPeformanceNarrative;
private Integer flagUSCGMOUPerformance;
private String flagUSCGMOUPerformanceNarrative;
private Integer uscgQualship21;
private String uscgQualship21Narrative;
private Integer timeSincePSCInspection;
private String timeSincePSCInspectionNarrative;
private Integer pscInspections;
private String pscInspectionNarrative;
private Integer pscDefects;
private String pscDefectsNarrative;
private Integer pscDetentions;
private String pscDetentionsNarrative;
private Integer currentSMCCertificate;
private String currentSMCCertificateNarrative;
private Integer docChanges;
private String docChangesNarrative;
private Integer currentClass;
private String currentClassNarrative;
private String currentClassNarrativeExtended;
private Integer classStatusChanges;
private String classStatusChangesNarrative;
private Integer pandICoverage;
private String pandICoverageNarrative;
private String pandICoverageNarrativeExtended;
private Integer nameChanges;
private String nameChangesNarrative;
private Integer gboChanges;
private String gboChangesNarrative;
private Integer ageOfShip;
private String ageofShipNarrative;
private Integer iuuFishingViolation;
private String iuuFishingNarrative; // null 포함
private Integer draughtChanges;
private String draughtChangesNarrative;
private Integer mostRecentSanctionedPortCall;
private String mostRecentSanctionedPortCallNarrative; // null 포함
private Integer singleShipOperation;
private String singleShipOperationNarrative;
private Integer fleetSafety;
private String fleetSafetyNarrative;
private Integer fleetPSC;
private String fleetPSCNarrative;
private Integer specialSurveyOverdue;
private String specialSurveyOverdueNarrative;
private Integer ownerUnknown;
private String ownerUnknownNarrative;
private Integer russianPortCall;
private String russianPortCallNarrative;
private Integer russianOwnerRegistration;
private String russianOwnerRegistrationNarrative;
private Integer russianSTS;
private String russianSTSNarrative;
}

파일 보기

@ -0,0 +1,122 @@
package com.snp.batch.jobs.risk.batch.processor;
import com.snp.batch.common.batch.processor.BaseProcessor;
import com.snp.batch.jobs.risk.batch.dto.RiskDto;
import com.snp.batch.jobs.risk.batch.entity.RiskEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RiskDataProcessor extends BaseProcessor<RiskDto, RiskEntity> {
@Override
protected RiskEntity processItem(RiskDto dto) throws Exception {
log.debug("Risk 데이터 처리 시작: imoNumber={}", dto.getLrno());
RiskEntity entity = RiskEntity.builder()
// 1. Vessel and General Information
.lrno(dto.getLrno())
.lastUpdated(dto.getLastUpdated())
.riskDataMaintained(dto.getRiskDataMaintained())
// 2. AIS/Tracking Risk
.daysSinceLastSeenOnAIS(dto.getDaysSinceLastSeenOnAIS())
.daysSinceLastSeenOnAISNarrative(dto.getDaysSinceLastSeenOnAISNarrative())
.daysUnderAIS(dto.getDaysUnderAIS())
.daysUnderAISNarrative(dto.getDaysUnderAISNarrative())
.imoCorrectOnAIS(dto.getImoCorrectOnAIS())
.imoCorrectOnAISNarrative(dto.getImoCorrectOnAISNarrative())
.sailingUnderName(dto.getSailingUnderName())
.sailingUnderNameNarrative(dto.getSailingUnderNameNarrative())
.anomalousMessagesFromMMSI(dto.getAnomalousMessagesFromMMSI())
.anomalousMessagesFromMMSINarrative(dto.getAnomalousMessagesFromMMSINarrative())
.mostRecentDarkActivity(dto.getMostRecentDarkActivity())
.mostRecentDarkActivityNarrative(dto.getMostRecentDarkActivityNarrative())
// 3. Operational & History Risk
.portCalls(dto.getPortCalls())
.portCallsNarrative(dto.getPortCallsNarrative())
.portRisk(dto.getPortRisk())
.portRiskNarrative(dto.getPortRiskNarrative())
.stsOperations(dto.getStsOperations())
.stsOperationsNarrative(dto.getStsOperationsNarrative())
.driftingHighSeas(dto.getDriftingHighSeas())
.driftingHighSeasNarrative(dto.getDriftingHighSeasNarrative())
.riskEvents(dto.getRiskEvents())
.riskEventNarrative(dto.getRiskEventNarrative())
.riskEventNarrativeExtended(dto.getRiskEventNarrativeExtended())
.flagChanges(dto.getFlagChanges())
.flagChangeNarrative(dto.getFlagChangeNarrative())
// 4. PSC (Port State Control) & Flag Risk
.flagParisMOUPerformance(dto.getFlagParisMOUPerformance())
.flagParisMOUPerformanceNarrative(dto.getFlagParisMOUPerformanceNarrative())
.flagTokyoMOUPeformance(dto.getFlagTokyoMOUPeformance())
.flagTokyoMOUPeformanceNarrative(dto.getFlagTokyoMOUPeformanceNarrative())
.flagUSCGMOUPerformance(dto.getFlagUSCGMOUPerformance())
.flagUSCGMOUPerformanceNarrative(dto.getFlagUSCGMOUPerformanceNarrative())
.uscgQualship21(dto.getUscgQualship21())
.uscgQualship21Narrative(dto.getUscgQualship21Narrative())
.timeSincePSCInspection(dto.getTimeSincePSCInspection())
.timeSincePSCInspectionNarrative(dto.getTimeSincePSCInspectionNarrative())
.pscInspections(dto.getPscInspections())
.pscInspectionNarrative(dto.getPscInspectionNarrative())
.pscDefects(dto.getPscDefects())
.pscDefectsNarrative(dto.getPscDefectsNarrative())
.pscDetentions(dto.getPscDetentions())
.pscDetentionsNarrative(dto.getPscDetentionsNarrative())
// 5. Certification & Class Risk
.currentSMCCertificate(dto.getCurrentSMCCertificate())
.currentSMCCertificateNarrative(dto.getCurrentSMCCertificateNarrative())
.docChanges(dto.getDocChanges())
.docChangesNarrative(dto.getDocChangesNarrative())
.currentClass(dto.getCurrentClass())
.currentClassNarrative(dto.getCurrentClassNarrative())
.currentClassNarrativeExtended(dto.getCurrentClassNarrativeExtended())
.classStatusChanges(dto.getClassStatusChanges())
.classStatusChangesNarrative(dto.getClassStatusChangesNarrative())
// 6. Ownership & Financial Risk
.pandICoverage(dto.getPandICoverage())
.pandICoverageNarrative(dto.getPandICoverageNarrative())
.pandICoverageNarrativeExtended(dto.getPandICoverageNarrativeExtended())
.nameChanges(dto.getNameChanges())
.nameChangesNarrative(dto.getNameChangesNarrative())
.gboChanges(dto.getGboChanges())
.gboChangesNarrative(dto.getGboChangesNarrative())
.ageOfShip(dto.getAgeOfShip())
.ageofShipNarrative(dto.getAgeofShipNarrative())
// 7. Sanctions & Specialized Risk
.iuuFishingViolation(dto.getIuuFishingViolation())
.iuuFishingNarrative(dto.getIuuFishingNarrative())
.draughtChanges(dto.getDraughtChanges())
.draughtChangesNarrative(dto.getDraughtChangesNarrative())
.mostRecentSanctionedPortCall(dto.getMostRecentSanctionedPortCall())
.mostRecentSanctionedPortCallNarrative(dto.getMostRecentSanctionedPortCallNarrative())
.singleShipOperation(dto.getSingleShipOperation())
.singleShipOperationNarrative(dto.getSingleShipOperationNarrative())
.fleetSafety(dto.getFleetSafety())
.fleetSafetyNarrative(dto.getFleetSafetyNarrative())
.fleetPSC(dto.getFleetPSC())
.fleetPSCNarrative(dto.getFleetPSCNarrative())
// 8. Survey & Other Risk
.specialSurveyOverdue(dto.getSpecialSurveyOverdue())
.specialSurveyOverdueNarrative(dto.getSpecialSurveyOverdueNarrative())
.ownerUnknown(dto.getOwnerUnknown())
.ownerUnknownNarrative(dto.getOwnerUnknownNarrative())
.russianPortCall(dto.getRussianPortCall())
.russianPortCallNarrative(dto.getRussianPortCallNarrative())
.russianOwnerRegistration(dto.getRussianOwnerRegistration())
.russianOwnerRegistrationNarrative(dto.getRussianOwnerRegistrationNarrative())
.russianSTS(dto.getRussianSTS())
.russianSTSNarrative(dto.getRussianSTSNarrative())
.build();
log.debug("Risk 데이터 처리 완료: imoNumber={}", dto.getLrno());
return entity;
}
}

파일 보기

@ -0,0 +1,144 @@
package com.snp.batch.jobs.risk.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.risk.batch.dto.RiskDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
@Slf4j
public class RiskDataReader extends BaseApiReader<RiskDto> {
//TODO :
// 1. Core20 IMO_NUMBER 전체 조회
// 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복)
// 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복)
private final JdbcTemplate jdbcTemplate;
private List<String> allImoNumbers;
private int currentBatchIndex = 0;
private final int batchSize = 100;
public RiskDataReader(WebClient webClient, JdbcTemplate jdbcTemplate) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "riskDataReader";
}
@Override
protected String getApiPath() {
return "/RiskAndCompliance/RisksByImos";
}
// private String getTargetTable(){
// return "snp_data.core20";
// }
private String getTargetTable(){
return "snp_data.ship_data";
}
private String GET_CORE_IMO_LIST =
// "SELECT ihslrorimoshipno FROM " + getTargetTable() + " ORDER BY ihslrorimoshipno";
"select imo_number as ihslrorimoshipno from snp_data.ship_data order by imo_number";
@Override
protected void beforeFetch(){
log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(GET_CORE_IMO_LIST, String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
updateApiCallStats(totalBatches, 0);
}
@Override
protected List<RiskDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
List<RiskDto> response = callAisApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null) {
// List<ComplianceDto> targets = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, response.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return response;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
private List<RiskDto> callAisApiWithBatch(String imoNumbers) {
String url = getApiPath() + "?imos=" + imoNumbers;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<List<RiskDto>>() {})
.block();
}
}

파일 보기

@ -0,0 +1,9 @@
package com.snp.batch.jobs.risk.batch.repository;
import com.snp.batch.jobs.risk.batch.entity.RiskEntity;
import java.util.List;
public interface RiskRepository {
void saveRiskAll(List<RiskEntity> items);
}

파일 보기

@ -0,0 +1,271 @@
package com.snp.batch.jobs.risk.batch.repository;
import com.snp.batch.common.batch.repository.BaseJdbcRepository;
import com.snp.batch.jobs.risk.batch.entity.RiskEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import java.sql.PreparedStatement;
import java.util.List;
@Slf4j
@Repository("riskRepository")
public class RiskRepositoryImpl extends BaseJdbcRepository<RiskEntity, Long> implements RiskRepository {
public RiskRepositoryImpl(JdbcTemplate jdbcTemplate) {
super(jdbcTemplate);
}
@Override
protected String getTableName() {
return null;
}
@Override
protected RowMapper<RiskEntity> getRowMapper() {
return null;
}
@Override
protected Long extractId(RiskEntity entity) {
return null;
}
@Override
protected String getInsertSql() {
return null;
}
@Override
protected String getUpdateSql() {
return """
INSERT INTO snp_data.risk (
lrno, lastupdated, riskdatamaintained, dayssincelastseenonais, dayssincelastseenonaisnarrative,
daysunderais, daysunderaisnarrative, imocorrectonais, imocorrectonaisnarrative, sailingundername,
sailingundernamenarrative, anomalousmessagesfrommmsi, anomalousmessagesfrommmsinarrative,
mostrecentdarkactivity, mostrecentdarkactivitynarrative, portcalls, portcallsnarrative, portrisk,
portrisknarrative, stsoperations, stsoperationsnarrative, driftinghighseas, driftinghighseasnarrative,
riskevents, riskeventnarrative, riskeventnarrativeextended, flagchanges, flagchangenarrative,
flagparismouperformance, flagparismouperformancenarrative, flagtokyomoupeformance, flagtokyomoupeformancenarrative,
flaguscgmouperformance, flaguscgmouperformancenarrative, uscgqualship21, uscgqualship21narrative,
timesincepscinspection, timesincepscinspectionnarrative, pscinspections, pscinspectionnarrative,
pscdefects, pscdefectsnarrative, pscdetentions, pscdetentionsnarrative, currentsmccertificate,
currentsmccertificatenarrative, docchanges, docchangesnarrative, currentclass, currentclassnarrative,
currentclassnarrativeextended, classstatuschanges, classstatuschangesnarrative, pandicoverage,
pandicoveragenarrative, pandicoveragenarrativeextended, namechanges, namechangesnarrative, gbochanges,
gbochangesnarrative, ageofship, ageofshipnarrative, iuufishingviolation, iuufishingnarrative,
draughtchanges, draughtchangesnarrative, mostrecentsanctionedportcall, mostrecentsanctionedportcallnarrative,
singleshipoperation, singleshipoperationnarrative, fleetsafety, fleetsafetynarrative, fleetpsc,
fleetpscnarrative, specialsurveyoverdue, specialsurveyoverduenarrative, ownerunknown, ownerunknownnarrative,
russianportcall, russianportcallnarrative, russianownerregistration, russianownerregistrationnarrative,
russiansts, russianstsnarrative
)
VALUES (
?, ?::timestamptz, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
)
ON CONFLICT (lrno, lastupdated)
DO UPDATE SET
riskdatamaintained = EXCLUDED.riskdatamaintained,
dayssincelastseenonais = EXCLUDED.dayssincelastseenonais,
dayssincelastseenonaisnarrative = EXCLUDED.dayssincelastseenonaisnarrative,
daysunderais = EXCLUDED.daysunderais,
daysunderaisnarrative = EXCLUDED.daysunderaisnarrative,
imocorrectonais = EXCLUDED.imocorrectonais,
imocorrectonaisnarrative = EXCLUDED.imocorrectonaisnarrative,
sailingundername = EXCLUDED.sailingundername,
sailingundernamenarrative = EXCLUDED.sailingundernamenarrative,
anomalousmessagesfrommmsi = EXCLUDED.anomalousmessagesfrommmsi,
anomalousmessagesfrommmsinarrative = EXCLUDED.anomalousmessagesfrommmsinarrative,
mostrecentdarkactivity = EXCLUDED.mostrecentdarkactivity,
mostrecentdarkactivitynarrative = EXCLUDED.mostrecentdarkactivitynarrative,
portcalls = EXCLUDED.portcalls,
portcallsnarrative = EXCLUDED.portcallsnarrative,
portrisk = EXCLUDED.portrisk,
portrisknarrative = EXCLUDED.portrisknarrative,
stsoperations = EXCLUDED.stsoperations,
stsoperationsnarrative = EXCLUDED.stsoperationsnarrative,
driftinghighseas = EXCLUDED.driftinghighseas,
driftinghighseasnarrative = EXCLUDED.driftinghighseasnarrative,
riskevents = EXCLUDED.riskevents,
riskeventnarrative = EXCLUDED.riskeventnarrative,
riskeventnarrativeextended = EXCLUDED.riskeventnarrativeextended,
flagchanges = EXCLUDED.flagchanges,
flagchangenarrative = EXCLUDED.flagchangenarrative,
flagparismouperformance = EXCLUDED.flagparismouperformance,
flagparismouperformancenarrative = EXCLUDED.flagparismouperformancenarrative,
flagtokyomoupeformance = EXCLUDED.flagtokyomoupeformance,
flagtokyomoupeformancenarrative = EXCLUDED.flagtokyomoupeformancenarrative,
flaguscgmouperformance = EXCLUDED.flaguscgmouperformance,
flaguscgmouperformancenarrative = EXCLUDED.flaguscgmouperformancenarrative,
uscgqualship21 = EXCLUDED.uscgqualship21,
uscgqualship21narrative = EXCLUDED.uscgqualship21narrative,
timesincepscinspection = EXCLUDED.timesincepscinspection,
timesincepscinspectionnarrative = EXCLUDED.timesincepscinspectionnarrative,
pscinspections = EXCLUDED.pscinspections,
pscinspectionnarrative = EXCLUDED.pscinspectionnarrative,
pscdefects = EXCLUDED.pscdefects,
pscdefectsnarrative = EXCLUDED.pscdefectsnarrative,
pscdetentions = EXCLUDED.pscdetentions,
pscdetentionsnarrative = EXCLUDED.pscdetentionsnarrative,
currentsmccertificate = EXCLUDED.currentsmccertificate,
currentsmccertificatenarrative = EXCLUDED.currentsmccertificatenarrative,
docchanges = EXCLUDED.docchanges,
docchangesnarrative = EXCLUDED.docchangesnarrative,
currentclass = EXCLUDED.currentclass,
currentclassnarrative = EXCLUDED.currentclassnarrative,
currentclassnarrativeextended = EXCLUDED.currentclassnarrativeextended,
classstatuschanges = EXCLUDED.classstatuschanges,
classstatuschangesnarrative = EXCLUDED.classstatuschangesnarrative,
pandicoverage = EXCLUDED.pandicoverage,
pandicoveragenarrative = EXCLUDED.pandicoveragenarrative,
pandicoveragenarrativeextended = EXCLUDED.pandicoveragenarrativeextended,
namechanges = EXCLUDED.namechanges,
namechangesnarrative = EXCLUDED.namechangesnarrative,
gbochanges = EXCLUDED.gbochanges,
gbochangesnarrative = EXCLUDED.gbochangesnarrative,
ageofship = EXCLUDED.ageofship,
ageofshipnarrative = EXCLUDED.ageofshipnarrative,
iuufishingviolation = EXCLUDED.iuufishingviolation,
iuufishingnarrative = EXCLUDED.iuufishingnarrative,
draughtchanges = EXCLUDED.draughtchanges,
draughtchangesnarrative = EXCLUDED.draughtchangesnarrative,
mostrecentsanctionedportcall = EXCLUDED.mostrecentsanctionedportcall,
mostrecentsanctionedportcallnarrative = EXCLUDED.mostrecentsanctionedportcallnarrative,
singleshipoperation = EXCLUDED.singleshipoperation,
singleshipoperationnarrative = EXCLUDED.singleshipoperationnarrative,
fleetsafety = EXCLUDED.fleetsafety,
fleetsafetynarrative = EXCLUDED.fleetsafetynarrative,
fleetpsc = EXCLUDED.fleetpsc,
fleetpscnarrative = EXCLUDED.fleetpscnarrative,
specialsurveyoverdue = EXCLUDED.specialsurveyoverdue,
specialsurveyoverduenarrative = EXCLUDED.specialsurveyoverduenarrative,
ownerunknown = EXCLUDED.ownerunknown,
ownerunknownnarrative = EXCLUDED.ownerunknownnarrative,
russianportcall = EXCLUDED.russianportcall,
russianportcallnarrative = EXCLUDED.russianportcallnarrative,
russianownerregistration = EXCLUDED.russianownerregistration,
russianownerregistrationnarrative = EXCLUDED.russianownerregistrationnarrative,
russiansts = EXCLUDED.russiansts,
russianstsnarrative = EXCLUDED.russianstsnarrative;
""";
}
@Override
protected void setInsertParameters(PreparedStatement ps, RiskEntity entity) throws Exception {
}
@Override
protected void setUpdateParameters(PreparedStatement ps, RiskEntity entity) throws Exception {
int idx = 1;
ps.setString(idx++, entity.getLrno());
ps.setString(idx++, entity.getLastUpdated());
ps.setObject(idx++, entity.getRiskDataMaintained(), java.sql.Types.INTEGER);
ps.setObject(idx++, entity.getDaysSinceLastSeenOnAIS(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getDaysSinceLastSeenOnAISNarrative());
ps.setObject(idx++, entity.getDaysUnderAIS(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getDaysUnderAISNarrative());
ps.setObject(idx++, entity.getImoCorrectOnAIS(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getImoCorrectOnAISNarrative());
ps.setObject(idx++, entity.getSailingUnderName(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getSailingUnderNameNarrative());
ps.setObject(idx++, entity.getAnomalousMessagesFromMMSI(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getAnomalousMessagesFromMMSINarrative());
ps.setObject(idx++, entity.getMostRecentDarkActivity(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getMostRecentDarkActivityNarrative());
ps.setObject(idx++, entity.getPortCalls(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getPortCallsNarrative());
ps.setObject(idx++, entity.getPortRisk(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getPortRiskNarrative());
ps.setObject(idx++, entity.getStsOperations(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getStsOperationsNarrative());
ps.setObject(idx++, entity.getDriftingHighSeas(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getDriftingHighSeasNarrative());
ps.setObject(idx++, entity.getRiskEvents(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getRiskEventNarrative());
ps.setString(idx++, entity.getRiskEventNarrativeExtended());
ps.setObject(idx++, entity.getFlagChanges(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getFlagChangeNarrative());
ps.setObject(idx++, entity.getFlagParisMOUPerformance(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getFlagParisMOUPerformanceNarrative());
ps.setObject(idx++, entity.getFlagTokyoMOUPeformance(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getFlagTokyoMOUPeformanceNarrative());
ps.setObject(idx++, entity.getFlagUSCGMOUPerformance(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getFlagUSCGMOUPerformanceNarrative());
ps.setObject(idx++, entity.getUscgQualship21(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getUscgQualship21Narrative());
ps.setObject(idx++, entity.getTimeSincePSCInspection(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getTimeSincePSCInspectionNarrative());
ps.setObject(idx++, entity.getPscInspections(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getPscInspectionNarrative());
ps.setObject(idx++, entity.getPscDefects(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getPscDefectsNarrative());
ps.setObject(idx++, entity.getPscDetentions(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getPscDetentionsNarrative());
ps.setObject(idx++, entity.getCurrentSMCCertificate(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getCurrentSMCCertificateNarrative());
ps.setObject(idx++, entity.getDocChanges(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getDocChangesNarrative());
ps.setObject(idx++, entity.getCurrentClass(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getCurrentClassNarrative());
ps.setString(idx++, entity.getCurrentClassNarrativeExtended());
ps.setObject(idx++, entity.getClassStatusChanges(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getClassStatusChangesNarrative());
ps.setObject(idx++, entity.getPandICoverage(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getPandICoverageNarrative());
ps.setString(idx++, entity.getPandICoverageNarrativeExtended());
ps.setObject(idx++, entity.getNameChanges(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getNameChangesNarrative());
ps.setObject(idx++, entity.getGboChanges(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getGboChangesNarrative());
ps.setObject(idx++, entity.getAgeOfShip(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getAgeofShipNarrative());
ps.setObject(idx++, entity.getIuuFishingViolation(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getIuuFishingNarrative());
ps.setObject(idx++, entity.getDraughtChanges(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getDraughtChangesNarrative());
ps.setObject(idx++, entity.getMostRecentSanctionedPortCall(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getMostRecentSanctionedPortCallNarrative());
ps.setObject(idx++, entity.getSingleShipOperation(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getSingleShipOperationNarrative());
ps.setObject(idx++, entity.getFleetSafety(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getFleetSafetyNarrative());
ps.setObject(idx++, entity.getFleetPSC(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getFleetPSCNarrative());
ps.setObject(idx++, entity.getSpecialSurveyOverdue(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getSpecialSurveyOverdueNarrative());
ps.setObject(idx++, entity.getOwnerUnknown(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getOwnerUnknownNarrative());
ps.setObject(idx++, entity.getRussianPortCall(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getRussianPortCallNarrative());
ps.setObject(idx++, entity.getRussianOwnerRegistration(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getRussianOwnerRegistrationNarrative());
ps.setObject(idx++, entity.getRussianSTS(), java.sql.Types.INTEGER);
ps.setString(idx++, entity.getRussianSTSNarrative());
}
@Override
protected String getEntityName() {
return "RiskEntity";
}
@Override
public void saveRiskAll(List<RiskEntity> items) {
if (items == null || items.isEmpty()) {
return;
}
jdbcTemplate.batchUpdate(getUpdateSql(), items, items.size(),
(ps, entity) -> {
try {
setUpdateParameters(ps, entity);
} catch (Exception e) {
log.error("배치 수정 파라미터 설정 실패", e);
throw new RuntimeException(e);
}
});
log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size());
}
}

파일 보기

@ -0,0 +1,26 @@
package com.snp.batch.jobs.risk.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.risk.batch.entity.RiskEntity;
import com.snp.batch.jobs.risk.batch.repository.RiskRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class RiskDataWriter extends BaseWriter<RiskEntity> {
private final RiskRepository riskRepository;
public RiskDataWriter(RiskRepository riskRepository) {
super("riskRepository");
this.riskRepository = riskRepository;
}
@Override
protected void writeItems(List<RiskEntity> items) throws Exception {
riskRepository.saveRiskAll(items);
log.info("Risk 저장 완료: 수정={} 건", items.size());
}
}

파일 보기

@ -0,0 +1,86 @@
package com.snp.batch.jobs.sanction.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.sanction.batch.dto.ComplianceDto;
import com.snp.batch.jobs.sanction.batch.entity.ComplianceEntity;
import com.snp.batch.jobs.sanction.batch.processor.ComplianceDataProcessor;
import com.snp.batch.jobs.sanction.batch.reader.ComplianceDataReader;
import com.snp.batch.jobs.sanction.batch.writer.ComplianceDataWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
@Slf4j
@Configuration
public class SanctionUpdateJobConfig extends BaseJobConfig<ComplianceDto, ComplianceEntity> {
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeServiceApiWebClient;
private final ComplianceDataProcessor complianceDataProcessor;
private final ComplianceDataWriter complianceDataWriter;
@Override
protected int getChunkSize() {
return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
}
public SanctionUpdateJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ComplianceDataProcessor complianceDataProcessor,
ComplianceDataWriter complianceDataWriter,
JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient")WebClient maritimeServiceApiWebClient) {
super(jobRepository, transactionManager);
this.jdbcTemplate = jdbcTemplate;
this.maritimeServiceApiWebClient = maritimeServiceApiWebClient;
this.complianceDataProcessor = complianceDataProcessor;
this.complianceDataWriter = complianceDataWriter;
}
@Override
protected String getJobName() {
return "sanctionUpdateJob";
}
@Override
protected String getStepName() {
return "sanctionUpdateStep";
}
@Override
protected ItemReader<ComplianceDto> createReader() {
return new ComplianceDataReader(maritimeServiceApiWebClient, jdbcTemplate);
}
@Override
protected ItemProcessor<ComplianceDto, ComplianceEntity> createProcessor() {
return complianceDataProcessor;
}
@Override
protected ItemWriter<ComplianceEntity> createWriter() {
return complianceDataWriter;
}
@Bean(name = "sanctionUpdateJob")
public Job sanctionUpdateJob() {
return job();
}
@Bean(name = "sanctionUpdateStep")
public Step sanctionUpdateStep() {
return step();
}
}

파일 보기

@ -0,0 +1,121 @@
package com.snp.batch.jobs.sanction.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ComplianceDto {
@JsonProperty("shipEUSanctionList")
private Integer shipEUSanctionList;
@JsonProperty("shipUNSanctionList")
private Integer shipUNSanctionList;
@JsonProperty("lrimoShipNo")
private String lrimoShipNo;
@JsonProperty("dateAmended")
private String dateAmended; // 수정일시
// 2. Compliance Status (Integer 타입은 0, 1, 2 등의 코드 null 처리)
@JsonProperty("legalOverall")
private Integer legalOverall; // 종합제재
@JsonProperty("shipBESSanctionList")
private Integer shipBESSanctionList; // 선박BES제재
@JsonProperty("shipDarkActivityIndicator")
private Integer shipDarkActivityIndicator; // 선박다크활동
@JsonProperty("shipDetailsNoLongerMaintained")
private Integer shipDetailsNoLongerMaintained; // 선박세부정보미유지
@JsonProperty("shipFlagDisputed")
private Integer shipFlagDisputed; // 선박국기논쟁
@JsonProperty("shipFlagSanctionedCountry")
private Integer shipFlagSanctionedCountry; // 선박국가제재
@JsonProperty("shipHistoricalFlagSanctionedCountry")
private Integer shipHistoricalFlagSanctionedCountry; // 선박국가제재이력
@JsonProperty("shipOFACNonSDNSanctionList")
private Integer shipOFACNonSDNSanctionList; // 선박OFAC비SDN제재
@JsonProperty("shipOFACSanctionList")
private Integer shipOFACSanctionList; // 선박OFAC제재
@JsonProperty("shipOFACAdvisoryList")
private Integer shipOFACAdvisoryList; // 선박OFAC주의
@JsonProperty("shipOwnerOFACSSIList")
private Integer shipOwnerOFACSSIList; // 선박소유자OFCS제재
@JsonProperty("shipOwnerAustralianSanctionList")
private Integer shipOwnerAustralianSanctionList; // 선박소유자AUS제재
@JsonProperty("shipOwnerBESSanctionList")
private Integer shipOwnerBESSanctionList; // 선박소유자BES제재
@JsonProperty("shipOwnerCanadianSanctionList")
private Integer shipOwnerCanadianSanctionList; // 선박소유자CAN제재
@JsonProperty("shipOwnerEUSanctionList")
private Integer shipOwnerEUSanctionList; // 선박소유자EU제재
@JsonProperty("shipOwnerFATFJurisdiction")
private Integer shipOwnerFATFJurisdiction; // 선박소유자FATF규제구역
@JsonProperty("shipOwnerHistoricalOFACSanctionedCountry")
private Integer shipOwnerHistoricalOFACSanctionedCountry; // 선박소유자OFAC제재이력
@JsonProperty("shipOwnerOFACSanctionList")
private Integer shipOwnerOFACSanctionList; // 선박소유자OFAC제재
@JsonProperty("shipOwnerOFACSanctionedCountry")
private Integer shipOwnerOFACSanctionedCountry; // 선박소유자OFAC제재국가
@JsonProperty("shipOwnerParentCompanyNonCompliance")
private Integer shipOwnerParentCompanyNonCompliance; // 선박소유자모회사비준수
@JsonProperty("shipOwnerParentFATFJurisdiction")
private Integer shipOwnerParentFATFJurisdiction; // 선박소유자모회사FATF규제구역 (JSON에 null 포함)
@JsonProperty("shipOwnerParentOFACSanctionedCountry")
private Integer shipOwnerParentOFACSanctionedCountry; // 선박소유자모회사OFAC제재국가 (JSON에 null 포함)
@JsonProperty("shipOwnerSwissSanctionList")
private Integer shipOwnerSwissSanctionList; // 선박소유자SWI제재
@JsonProperty("shipOwnerUAESanctionList")
private Integer shipOwnerUAESanctionList; // 선박소유자UAE제재
@JsonProperty("shipOwnerUNSanctionList")
private Integer shipOwnerUNSanctionList; // 선박소유자UN제재
@JsonProperty("shipSanctionedCountryPortCallLast12m")
private Integer shipSanctionedCountryPortCallLast12m; // 선박제재국가기항최종12M
@JsonProperty("shipSanctionedCountryPortCallLast3m")
private Integer shipSanctionedCountryPortCallLast3m; // 선박제재국가기항최종3M
@JsonProperty("shipSanctionedCountryPortCallLast6m")
private Integer shipSanctionedCountryPortCallLast6m; // 선박제재국가기항최종6M
@JsonProperty("shipSecurityLegalDisputeEvent")
private Integer shipSecurityLegalDisputeEvent; // 선박보안법적분쟁이벤트
@JsonProperty("shipSTSPartnerNonComplianceLast12m")
private Integer shipSTSPartnerNonComplianceLast12m; // 선박STS파트너비준수12M
@JsonProperty("shipSwissSanctionList")
private Integer shipSwissSanctionList; // 선박SWI제재
}

파일 보기

@ -0,0 +1,17 @@
package com.snp.batch.jobs.sanction.batch.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ComplianceResponse {
private List<ComplianceDto> complianceDtoList;
}

파일 보기

@ -0,0 +1,90 @@
package com.snp.batch.jobs.sanction.batch.entity;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.snp.batch.common.batch.entity.BaseEntity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class ComplianceEntity extends BaseEntity {
private String lrimoShipNo; // LR/IMO번호
private String dateAmended; // 수정일시
// 2. Compliance Status (모든 필드는 DTO와 동일한 Integer 타입)
private Integer legalOverall; // 종합제재
private Integer shipBESSanctionList; // 선박BES제재
private Integer shipDarkActivityIndicator; // 선박다크활동
private Integer shipDetailsNoLongerMaintained; // 선박세부정보미유지
private Integer shipEUSanctionList; // 선박EU제재
private Integer shipFlagDisputed; // 선박국기논쟁
private Integer shipFlagSanctionedCountry; // 선박국가제재
private Integer shipHistoricalFlagSanctionedCountry; // 선박국가제재이력
private Integer shipOFACNonSDNSanctionList; // 선박OFAC비SDN제재
private Integer shipOFACSanctionList; // 선박OFAC제재
private Integer shipOFACAdvisoryList; // 선박OFAC주의
private Integer shipOwnerOFACSSIList; // 선박소유자OFCS제재
private Integer shipOwnerAustralianSanctionList; // 선박소유자AUS제재
private Integer shipOwnerBESSanctionList; // 선박소유자BES제재
private Integer shipOwnerCanadianSanctionList; // 선박소유자CAN제재
private Integer shipOwnerEUSanctionList; // 선박소유자EU제재
private Integer shipOwnerFATFJurisdiction; // 선박소유자FATF규제구역
private Integer shipOwnerHistoricalOFACSanctionedCountry; // 선박소유자OFAC제재이력
private Integer shipOwnerOFACSanctionList; // 선박소유자OFAC제재
private Integer shipOwnerOFACSanctionedCountry; // 선박소유자OFAC제재국가
private Integer shipOwnerParentCompanyNonCompliance; // 선박소유자모회사비준수
private Integer shipOwnerParentFATFJurisdiction; // 선박소유자모회사FATF규제구역
private Integer shipOwnerParentOFACSanctionedCountry; // 선박소유자모회사OFAC제재국가
private Integer shipOwnerSwissSanctionList; // 선박소유자SWI제재
private Integer shipOwnerUAESanctionList; // 선박소유자UAE제재
private Integer shipOwnerUNSanctionList; // 선박소유자UN제재
private Integer shipSanctionedCountryPortCallLast12m; // 선박제재국가기항최종12M
private Integer shipSanctionedCountryPortCallLast3m; // 선박제재국가기항최종3M
private Integer shipSanctionedCountryPortCallLast6m; // 선박제재국가기항최종6M
private Integer shipSecurityLegalDisputeEvent; // 선박보안법적분쟁이벤트
private Integer shipSTSPartnerNonComplianceLast12m; // 선박STS파트너비준수12M
private Integer shipSwissSanctionList; // 선박SWI제재
private Integer shipUNSanctionList; // 선박UN제재
}

파일 보기

@ -0,0 +1,60 @@
package com.snp.batch.jobs.sanction.batch.processor;
import com.snp.batch.common.batch.processor.BaseProcessor;
import com.snp.batch.jobs.sanction.batch.dto.ComplianceDto;
import com.snp.batch.jobs.sanction.batch.entity.ComplianceEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ComplianceDataProcessor extends BaseProcessor<ComplianceDto, ComplianceEntity> {
@Override
protected ComplianceEntity processItem(ComplianceDto dto) throws Exception {
log.debug("AIS 최신 항적 데이터 처리 시작: imoNumber={}", dto.getLrimoShipNo());
ComplianceEntity entity = ComplianceEntity.builder()
// 1. Primary Keys
.lrimoShipNo(dto.getLrimoShipNo())
.dateAmended(dto.getDateAmended())
// 2. Compliance Status
.legalOverall(dto.getLegalOverall())
.shipBESSanctionList(dto.getShipBESSanctionList())
.shipDarkActivityIndicator(dto.getShipDarkActivityIndicator())
.shipDetailsNoLongerMaintained(dto.getShipDetailsNoLongerMaintained())
.shipEUSanctionList(dto.getShipEUSanctionList())
.shipFlagDisputed(dto.getShipFlagDisputed())
.shipFlagSanctionedCountry(dto.getShipFlagSanctionedCountry())
.shipHistoricalFlagSanctionedCountry(dto.getShipHistoricalFlagSanctionedCountry())
.shipOFACNonSDNSanctionList(dto.getShipOFACNonSDNSanctionList())
.shipOFACSanctionList(dto.getShipOFACSanctionList())
.shipOFACAdvisoryList(dto.getShipOFACAdvisoryList())
.shipOwnerOFACSSIList(dto.getShipOwnerOFACSSIList())
.shipOwnerAustralianSanctionList(dto.getShipOwnerAustralianSanctionList())
.shipOwnerBESSanctionList(dto.getShipOwnerBESSanctionList())
.shipOwnerCanadianSanctionList(dto.getShipOwnerCanadianSanctionList())
.shipOwnerEUSanctionList(dto.getShipOwnerEUSanctionList())
.shipOwnerFATFJurisdiction(dto.getShipOwnerFATFJurisdiction())
.shipOwnerHistoricalOFACSanctionedCountry(dto.getShipOwnerHistoricalOFACSanctionedCountry())
.shipOwnerOFACSanctionList(dto.getShipOwnerOFACSanctionList())
.shipOwnerOFACSanctionedCountry(dto.getShipOwnerOFACSanctionedCountry())
.shipOwnerParentCompanyNonCompliance(dto.getShipOwnerParentCompanyNonCompliance())
.shipOwnerParentFATFJurisdiction(dto.getShipOwnerParentFATFJurisdiction())
.shipOwnerParentOFACSanctionedCountry(dto.getShipOwnerParentOFACSanctionedCountry())
.shipOwnerSwissSanctionList(dto.getShipOwnerSwissSanctionList())
.shipOwnerUAESanctionList(dto.getShipOwnerUAESanctionList())
.shipOwnerUNSanctionList(dto.getShipOwnerUNSanctionList())
.shipSanctionedCountryPortCallLast12m(dto.getShipSanctionedCountryPortCallLast12m())
.shipSanctionedCountryPortCallLast3m(dto.getShipSanctionedCountryPortCallLast3m())
.shipSanctionedCountryPortCallLast6m(dto.getShipSanctionedCountryPortCallLast6m())
.shipSecurityLegalDisputeEvent(dto.getShipSecurityLegalDisputeEvent())
.shipSTSPartnerNonComplianceLast12m(dto.getShipSTSPartnerNonComplianceLast12m())
.shipSwissSanctionList(dto.getShipSwissSanctionList())
.shipUNSanctionList(dto.getShipUNSanctionList())
.build();
log.debug("AIS 최신 항적 데이터 처리 완료: imoNumber={}", dto.getLrimoShipNo());
return entity;
}
}

파일 보기

@ -0,0 +1,142 @@
package com.snp.batch.jobs.sanction.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.sanction.batch.dto.ComplianceDto;
import com.snp.batch.jobs.sanction.batch.dto.ComplianceResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
@Slf4j
public class ComplianceDataReader extends BaseApiReader<ComplianceDto> {
//TODO :
// 1. Core20 IMO_NUMBER 전체 조회
// 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복)
// 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복)
private final JdbcTemplate jdbcTemplate;
private List<String> allImoNumbers;
private int currentBatchIndex = 0;
private final int batchSize = 100;
public ComplianceDataReader(WebClient webClient, JdbcTemplate jdbcTemplate) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
enableChunkMode(); // Chunk 모드 활성화
}
@Override
protected String getReaderName() {
return "ShipLastPositionDataReader";
}
@Override
protected String getApiPath() {
return "/RiskAndCompliance/CompliancesByImos";
}
private String getTargetTable(){
return "snp_data.core20";
}
private String GET_CORE_IMO_LIST =
// "SELECT ihslrorimoshipno FROM " + getTargetTable() + " ORDER BY ihslrorimoshipno";
"select imo_number as ihslrorimoshipno from snp_data.ship_data order by imo_number";
@Override
protected void beforeFetch(){
log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName());
allImoNumbers = jdbcTemplate.queryForList(GET_CORE_IMO_LIST, String.class);
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size());
log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize);
log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches);
updateApiCallStats(totalBatches, 0);
}
@Override
protected List<ComplianceDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<String> currentBatch = allImoNumbers.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// IMO 번호를 쉼표로 연결 (: "1000019,1000021,1000033,...")
String imoParam = String.join(",", currentBatch);
// API 호출
List<ComplianceDto> response = callAisApiWithBatch(imoParam);
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
// 응답 처리
if (response != null) {
// List<ComplianceDto> targets = response;
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, response.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < allImoNumbers.size()) {
Thread.sleep(500);
}
return response;
} else {
log.warn("[{}] 배치 {}/{} 응답 없음",
getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
}
}
private List<ComplianceDto> callAisApiWithBatch(String imoNumbers) {
String url = getApiPath() + "?imos=" + imoNumbers;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<List<ComplianceDto>>() {})
.block();
}
}

파일 보기

@ -0,0 +1,9 @@
package com.snp.batch.jobs.sanction.batch.repository;
import com.snp.batch.jobs.sanction.batch.entity.ComplianceEntity;
import java.util.List;
public interface ComplianceRepository {
void saveComplianceAll(List<ComplianceEntity> items);
}

파일 보기

@ -0,0 +1,166 @@
package com.snp.batch.jobs.sanction.batch.repository;
import com.snp.batch.common.batch.repository.BaseJdbcRepository;
import com.snp.batch.jobs.sanction.batch.entity.ComplianceEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import java.sql.PreparedStatement;
import java.sql.Types;
import java.util.List;
@Slf4j
@Repository("complianceRepository")
public class ComplianceRepositoryImpl extends BaseJdbcRepository<ComplianceEntity, Long> implements ComplianceRepository {
public ComplianceRepositoryImpl(JdbcTemplate jdbcTemplate) {
super(jdbcTemplate);
}
@Override
protected String getTableName() {
return null;
}
@Override
protected RowMapper<ComplianceEntity> getRowMapper() {
return null;
}
@Override
protected Long extractId(ComplianceEntity entity) {
return null;
}
@Override
protected String getInsertSql() {
return null;
}
@Override
protected String getUpdateSql() {
return """
INSERT INTO snp_data.compliance (
lrimoshipno, dateamended, legaloverall, shipbessanctionlist, shipdarkactivityindicator,
shipdetailsnolongermaintained, shipeusanctionlist, shipflagdisputed, shipflagsanctionedcountry,
shiphistoricalflagsanctionedcountry, shipofacnonsdnsanctionlist, shipofacsanctionlist,
shipofacadvisorylist, shipownerofacssilist, shipowneraustraliansanctionlist, shipownerbessanctionlist,
shipownercanadiansanctionlist, shipownereusanctionlist, shipownerfatfjurisdiction,
shipownerhistoricalofacsanctionedcountry, shipownerofacsanctionlist, shipownerofacsanctionedcountry,
shipownerparentcompanynoncompliance, shipownerparentfatfjurisdiction, shipownerparentofacsanctionedcountry,
shipownerswisssanctionlist, shipowneruaesanctionlist, shipownerunsanctionlist,
shipsanctionedcountryportcalllast12m, shipsanctionedcountryportcalllast3m, shipsanctionedcountryportcalllast6m,
shipsecuritylegaldisputeevent, shipstspartnernoncompliancelast12m, shipswisssanctionlist,
shipunsanctionlist
)
VALUES (
?, ?::timestamptz, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
)
ON CONFLICT (lrimoshipno, dateamended)
DO UPDATE SET
legaloverall = EXCLUDED.legaloverall,
shipbessanctionlist = EXCLUDED.shipbessanctionlist,
shipdarkactivityindicator = EXCLUDED.shipdarkactivityindicator,
shipdetailsnolongermaintained = EXCLUDED.shipdetailsnolongermaintained,
shipeusanctionlist = EXCLUDED.shipeusanctionlist,
shipflagdisputed = EXCLUDED.shipflagdisputed,
shipflagsanctionedcountry = EXCLUDED.shipflagsanctionedcountry,
shiphistoricalflagsanctionedcountry = EXCLUDED.shiphistoricalflagsanctionedcountry,
shipofacnonsdnsanctionlist = EXCLUDED.shipofacnonsdnsanctionlist,
shipofacsanctionlist = EXCLUDED.shipofacsanctionlist,
shipofacadvisorylist = EXCLUDED.shipofacadvisorylist,
shipownerofacssilist = EXCLUDED.shipownerofacssilist,
shipowneraustraliansanctionlist = EXCLUDED.shipowneraustraliansanctionlist,
shipownerbessanctionlist = EXCLUDED.shipownerbessanctionlist,
shipownercanadiansanctionlist = EXCLUDED.shipownercanadiansanctionlist,
shipownereusanctionlist = EXCLUDED.shipownereusanctionlist,
shipownerfatfjurisdiction = EXCLUDED.shipownerfatfjurisdiction,
shipownerhistoricalofacsanctionedcountry = EXCLUDED.shipownerhistoricalofacsanctionedcountry,
shipownerofacsanctionlist = EXCLUDED.shipownerofacsanctionlist,
shipownerofacsanctionedcountry = EXCLUDED.shipownerofacsanctionedcountry,
shipownerparentcompanynoncompliance = EXCLUDED.shipownerparentcompanynoncompliance,
shipownerparentfatfjurisdiction = EXCLUDED.shipownerparentfatfjurisdiction,
shipownerparentofacsanctionedcountry = EXCLUDED.shipownerparentofacsanctionedcountry,
shipownerswisssanctionlist = EXCLUDED.shipownerswisssanctionlist,
shipowneruaesanctionlist = EXCLUDED.shipowneruaesanctionlist,
shipownerunsanctionlist = EXCLUDED.shipownerunsanctionlist,
shipsanctionedcountryportcalllast12m = EXCLUDED.shipsanctionedcountryportcalllast12m,
shipsanctionedcountryportcalllast3m = EXCLUDED.shipsanctionedcountryportcalllast3m,
shipsanctionedcountryportcalllast6m = EXCLUDED.shipsanctionedcountryportcalllast6m,
shipsecuritylegaldisputeevent = EXCLUDED.shipsecuritylegaldisputeevent,
shipstspartnernoncompliancelast12m = EXCLUDED.shipstspartnernoncompliancelast12m,
shipswisssanctionlist = EXCLUDED.shipswisssanctionlist,
shipunsanctionlist = EXCLUDED.shipunsanctionlist;
""";
}
@Override
protected void setInsertParameters(PreparedStatement ps, ComplianceEntity entity) throws Exception {
}
@Override
protected void setUpdateParameters(PreparedStatement ps, ComplianceEntity entity) throws Exception {
int idx = 1;
ps.setString(idx++, entity.getLrimoShipNo());
ps.setString(idx++, entity.getDateAmended());
ps.setObject(idx++, entity.getLegalOverall(), Types.INTEGER);
ps.setObject(idx++, entity.getShipBESSanctionList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipDarkActivityIndicator(), Types.INTEGER);
ps.setObject(idx++, entity.getShipDetailsNoLongerMaintained(), Types.INTEGER);
ps.setObject(idx++, entity.getShipEUSanctionList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipFlagDisputed(), Types.INTEGER);
ps.setObject(idx++, entity.getShipFlagSanctionedCountry(), Types.INTEGER);
ps.setObject(idx++, entity.getShipHistoricalFlagSanctionedCountry(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOFACNonSDNSanctionList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOFACSanctionList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOFACAdvisoryList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerOFACSSIList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerAustralianSanctionList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerBESSanctionList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerCanadianSanctionList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerEUSanctionList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerFATFJurisdiction(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerHistoricalOFACSanctionedCountry(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerOFACSanctionList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerOFACSanctionedCountry(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerParentCompanyNonCompliance(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerParentFATFJurisdiction(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerParentOFACSanctionedCountry(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerSwissSanctionList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerUAESanctionList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipOwnerUNSanctionList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipSanctionedCountryPortCallLast12m(), Types.INTEGER);
ps.setObject(idx++, entity.getShipSanctionedCountryPortCallLast3m(), Types.INTEGER);
ps.setObject(idx++, entity.getShipSanctionedCountryPortCallLast6m(), Types.INTEGER);
ps.setObject(idx++, entity.getShipSecurityLegalDisputeEvent(), Types.INTEGER);
ps.setObject(idx++, entity.getShipSTSPartnerNonComplianceLast12m(), Types.INTEGER);
ps.setObject(idx++, entity.getShipSwissSanctionList(), Types.INTEGER);
ps.setObject(idx++, entity.getShipUNSanctionList(), Types.INTEGER);
}
@Override
protected String getEntityName() {
return "ComplianceEntity";
}
@Override
public void saveComplianceAll(List<ComplianceEntity> items) {
if (items == null || items.isEmpty()) {
return;
}
jdbcTemplate.batchUpdate(getUpdateSql(), items, items.size(),
(ps, entity) -> {
try {
setUpdateParameters(ps, entity);
} catch (Exception e) {
log.error("배치 수정 파라미터 설정 실패", e);
throw new RuntimeException(e);
}
});
log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size());
}
}

파일 보기

@ -0,0 +1,25 @@
package com.snp.batch.jobs.sanction.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.sanction.batch.entity.ComplianceEntity;
import com.snp.batch.jobs.sanction.batch.repository.ComplianceRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class ComplianceDataWriter extends BaseWriter<ComplianceEntity> {
private final ComplianceRepository complianceRepository;
public ComplianceDataWriter(ComplianceRepository complianceRepository) {
super("complianceRepository");
this.complianceRepository = complianceRepository;
}
@Override
protected void writeItems(List<ComplianceEntity> items) throws Exception {
complianceRepository.saveComplianceAll(items);
}
}

파일 보기

@ -98,7 +98,7 @@ public class ShipDetailImportJobConfig extends BaseJobConfig<ShipDetailCompariso
@Override
protected int getChunkSize() {
return 50; // API에서 100개씩 가져오므로 chunk도 100으로 설정
return 30; // API에서 100개씩 가져오므로 chunk도 100으로 설정
}
@Bean(name = "shipDetailImportJob")

파일 보기

@ -38,13 +38,8 @@ public class ShipLastPositionDataReader extends BaseApiReader<TargetEnhancedDto>
return "/AisSvc.svc/AIS/GetTargetsByIMOsEnhanced";
}
@Override
protected String getApiBaseUrl() {
return "https://aisapi.maritime.spglobal.com";
}
private String getTargetTable(){
return "test_s_p.test_core20";
return "snp_data.core20";
}
private String GET_CORE_IMO_LIST =
"SELECT ihslrorimoshipno FROM " + getTargetTable() + " ORDER BY ihslrorimoshipno";

파일 보기

@ -29,7 +29,7 @@ public class ShipDetailRepositoryImpl extends BaseJdbcRepository<ShipDetailEntit
@Override
protected String getTableName() {
return "test_s_p.test_core20";
return "snp_data.core20";
}
@Override
@ -45,13 +45,13 @@ public class ShipDetailRepositoryImpl extends BaseJdbcRepository<ShipDetailEntit
@Override
protected String getInsertSql() {
return """
INSERT INTO test_s_p.test_core20(
INSERT INTO snp_data.core20(
shipresultindex, vesselid, ihslrorimoshipno, maritimemobileserviceidentitymmsinumber, shipname,
callsign, flagname, portofregistry, classificationsociety, shiptypelevel5,
shiptypelevel5subtype, yearofbuild, shipbuilder, lengthoverallloa, breadthmoulded,
"depth", draught, grosstonnage, deadweight, teu,
mainenginetype, status, operator, flagcode, shiptypelevel2
) VALUES (nextval('test_s_p.core20_index_seq'::regclass), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
) VALUES (nextval('snp_data.core20_index_seq'::regclass), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (ihslrorimoshipno)
DO UPDATE SET
vesselid = EXCLUDED.vesselid,
@ -83,7 +83,7 @@ public class ShipDetailRepositoryImpl extends BaseJdbcRepository<ShipDetailEntit
@Override
protected String getUpdateSql() {
return """
UPDATE test_s_p.test_core20
UPDATE snp_data.core20
SET vesselid = ?,
maritimemobileserviceidentitymmsinumber = ?,
shipname = ?,

파일 보기

@ -4,10 +4,10 @@ public class ShipDetailSql {
public static String getOwnerHistorySql(){
return """
INSERT INTO test_s_p.test_ownerhistory(
INSERT INTO snp_data.ownerhistory(
datasetversion, companystatus, effectivedate, lrno, "owner",
ownercode, "sequence", shipresultindex, vesselid, rowindex, batch_flag
)VALUES(?, ?, ?, ?, ?, ?, ?, nextval('test_s_p.ownerhistory_index_seq'::regclass), ?, nextval('test_s_p.ownerhistory_row_index_seq'::regclass), 'N')
)VALUES(?, ?, ?, ?, ?, ?, ?, nextval('snp_data.ownerhistory_index_seq'::regclass), ?, nextval('snp_data.ownerhistory_row_index_seq'::regclass), 'N')
ON CONFLICT (lrno,ownercode, effectivedate)
DO UPDATE SET
datasetversion = EXCLUDED.datasetversion,

파일 보기

@ -21,7 +21,7 @@ public class ShipHashRepositoryImpl extends BaseJdbcRepository<ShipHashEntity, S
@Override
protected String getTableName() {
return "public.ship_detail_hash_json";
return "snp_data.ship_detail_hash_json";
}
@Override
@ -37,10 +37,10 @@ public class ShipHashRepositoryImpl extends BaseJdbcRepository<ShipHashEntity, S
@Override
protected String getInsertSql() {
return """
INSERT INTO public.ship_detail_hash_json(
id, imo_number, ship_detail_hash, created_at, created_by, updated_at, updated_by
INSERT INTO snp_data.ship_detail_hash_json(
imo_number, ship_detail_hash, created_at, created_by, updated_at, updated_by
)VALUES(
nextval('ship_imo_data_id_seq1'::regclass), ?, ?, ?, ?, ?, ?
?, ?, ?, ?, ?, ?
)
ON CONFLICT (imo_number)
DO UPDATE SET
@ -53,7 +53,7 @@ public class ShipHashRepositoryImpl extends BaseJdbcRepository<ShipHashEntity, S
@Override
protected String getUpdateSql() {
return """
UPDATE public.ship_detail_hash_json
UPDATE snp_data.ship_detail_hash_json
SET ship_detail_hash = ?,
updated_at = ?,
updated_by = ?

파일 보기

@ -41,7 +41,7 @@ public class ShipLastPositionRepositoryImpl extends BaseJdbcRepository<TargetEnh
@Override
protected String getUpdateSql() {
return """
UPDATE test_s_p.test_core20
UPDATE snp_data.core20
SET lastseen = ?::timestamptz,
lastport = ?,
position_latitude = ?,

파일 보기

@ -0,0 +1,102 @@
spring:
application:
name: snp-batch
# PostgreSQL Database Configuration
datasource:
url: jdbc:postgresql://10.26.252.39:5432/mdadb?currentSchema=snp_data
username: mda
password: mda#8932
driver-class-name: org.postgresql.Driver
hikari:
maximum-pool-size: 10
minimum-idle: 5
connection-timeout: 30000
# JPA Configuration
jpa:
hibernate:
ddl-auto: update
show-sql: true
properties:
hibernate:
dialect: org.hibernate.dialect.PostgreSQLDialect
format_sql: true
default_schema: public
# Batch Configuration
batch:
jdbc:
table-prefix: "snp_data.batch_"
initialize-schema: never # Changed to 'never' as tables already exist
job:
enabled: false # Prevent auto-run on startup
# Thymeleaf Configuration
thymeleaf:
cache: false
prefix: classpath:/templates/
suffix: .html
# Quartz Scheduler Configuration - Using JDBC Store for persistence
quartz:
job-store-type: jdbc # JDBC store for schedule persistence
jdbc:
initialize-schema: always # Create Quartz tables if not exist
properties:
org.quartz.scheduler.instanceName: SNPBatchScheduler
org.quartz.scheduler.instanceId: AUTO
org.quartz.threadPool.threadCount: 10
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
org.quartz.jobStore.tablePrefix: QRTZ_
org.quartz.jobStore.isClustered: false
org.quartz.jobStore.misfireThreshold: 60000
# Server Configuration
server:
port: 8041
servlet:
context-path: /
# Actuator Configuration
management:
endpoints:
web:
exposure:
include: health,info,metrics,prometheus,batch
endpoint:
health:
show-details: always
# Logging Configuration
logging:
level:
root: INFO
com.snp.batch: DEBUG
org.springframework.batch: DEBUG
org.springframework.jdbc: DEBUG
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss} - %msg%n"
file: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
file:
name: logs/snp-batch.log
# Custom Application Properties
app:
batch:
chunk-size: 1000
api:
url: https://api.example.com/data
timeout: 30000
ship-api:
url: https://shipsapi.maritime.spglobal.com
username: 7cc0517d-5ed6-452e-a06f-5bbfd6ab6ade
password: 2LLzSJNqtxWVD8zC
ais-api:
url: https://aisapi.maritime.spglobal.com
webservice-api:
url: https://webservices.maritime.spglobal.com
schedule:
enabled: true
cron: "0 0 * * * ?" # Every hour

파일 보기

@ -0,0 +1,102 @@
spring:
application:
name: snp-batch
# PostgreSQL Database Configuration
datasource:
url: jdbc:postgresql://10.187.28.28:5432/mdadb?currentSchema=snp_data
username: mda
password: mda#8932
driver-class-name: org.postgresql.Driver
hikari:
maximum-pool-size: 10
minimum-idle: 5
connection-timeout: 30000
# JPA Configuration
jpa:
hibernate:
ddl-auto: update
show-sql: true
properties:
hibernate:
dialect: org.hibernate.dialect.PostgreSQLDialect
format_sql: true
default_schema: public
# Batch Configuration
batch:
jdbc:
table-prefix: "snp_data.batch_"
initialize-schema: never # Changed to 'never' as tables already exist
job:
enabled: false # Prevent auto-run on startup
# Thymeleaf Configuration
thymeleaf:
cache: false
prefix: classpath:/templates/
suffix: .html
# Quartz Scheduler Configuration - Using JDBC Store for persistence
quartz:
job-store-type: jdbc # JDBC store for schedule persistence
jdbc:
initialize-schema: always # Create Quartz tables if not exist
properties:
org.quartz.scheduler.instanceName: SNPBatchScheduler
org.quartz.scheduler.instanceId: AUTO
org.quartz.threadPool.threadCount: 10
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
org.quartz.jobStore.tablePrefix: QRTZ_
org.quartz.jobStore.isClustered: false
org.quartz.jobStore.misfireThreshold: 60000
# Server Configuration
server:
port: 8041
servlet:
context-path: /
# Actuator Configuration
management:
endpoints:
web:
exposure:
include: health,info,metrics,prometheus,batch
endpoint:
health:
show-details: always
# Logging Configuration
logging:
level:
root: INFO
com.snp.batch: DEBUG
org.springframework.batch: DEBUG
org.springframework.jdbc: DEBUG
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss} - %msg%n"
file: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
file:
name: logs/snp-batch.log
# Custom Application Properties
app:
batch:
chunk-size: 1000
api:
url: https://api.example.com/data
timeout: 30000
ship-api:
url: https://shipsapi.maritime.spglobal.com
username: 7cc0517d-5ed6-452e-a06f-5bbfd6ab6ade
password: 2LLzSJNqtxWVD8zC
ais-api:
url: https://aisapi.maritime.spglobal.com
webservice-api:
url: https://webservices.maritime.spglobal.com
schedule:
enabled: true
cron: "0 0 * * * ?" # Every hour

파일 보기

@ -0,0 +1,101 @@
spring:
application:
name: snp-batch
# PostgreSQL Database Configuration
datasource:
url: jdbc:postgresql://211.208.115.83:5432/snpdb
username: snp
password: snp#8932
driver-class-name: org.postgresql.Driver
hikari:
maximum-pool-size: 10
minimum-idle: 5
connection-timeout: 30000
# JPA Configuration
jpa:
hibernate:
ddl-auto: update
show-sql: true
properties:
hibernate:
dialect: org.hibernate.dialect.PostgreSQLDialect
format_sql: true
default_schema: public
# Batch Configuration
batch:
jdbc:
initialize-schema: never # Changed to 'never' as tables already exist
job:
enabled: false # Prevent auto-run on startup
# Thymeleaf Configuration
thymeleaf:
cache: false
prefix: classpath:/templates/
suffix: .html
# Quartz Scheduler Configuration - Using JDBC Store for persistence
quartz:
job-store-type: jdbc # JDBC store for schedule persistence
jdbc:
initialize-schema: always # Create Quartz tables if not exist
properties:
org.quartz.scheduler.instanceName: SNPBatchScheduler
org.quartz.scheduler.instanceId: AUTO
org.quartz.threadPool.threadCount: 10
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
org.quartz.jobStore.tablePrefix: QRTZ_
org.quartz.jobStore.isClustered: false
org.quartz.jobStore.misfireThreshold: 60000
# Server Configuration
server:
port: 8041
servlet:
context-path: /
# Actuator Configuration
management:
endpoints:
web:
exposure:
include: health,info,metrics,prometheus,batch
endpoint:
health:
show-details: always
# Logging Configuration
logging:
level:
root: INFO
com.snp.batch: DEBUG
org.springframework.batch: DEBUG
org.springframework.jdbc: DEBUG
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss} - %msg%n"
file: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
file:
name: logs/snp-batch.log
# Custom Application Properties
app:
batch:
chunk-size: 1000
api:
url: https://api.example.com/data
timeout: 30000
ship-api:
url: https://shipsapi.maritime.spglobal.com
username: 7cc0517d-5ed6-452e-a06f-5bbfd6ab6ade
password: 2LLzSJNqtxWVD8zC
ais-api:
url: https://aisapi.maritime.spglobal.com
webservice-api:
url: https://webservices.maritime.spglobal.com
schedule:
enabled: true
cron: "0 0 * * * ?" # Every hour

파일 보기

@ -27,6 +27,7 @@ spring:
# Batch Configuration
batch:
jdbc:
table-prefix: "snp_data.batch_"
initialize-schema: never # Changed to 'never' as tables already exist
job:
enabled: false # Prevent auto-run on startup
@ -54,7 +55,7 @@ spring:
# Server Configuration
server:
port: 8081
port: 8041
servlet:
context-path: /
@ -92,6 +93,10 @@ app:
url: https://shipsapi.maritime.spglobal.com
username: 7cc0517d-5ed6-452e-a06f-5bbfd6ab6ade
password: 2LLzSJNqtxWVD8zC
ais-api:
url: https://aisapi.maritime.spglobal.com
webservice-api:
url: https://webservices.maritime.spglobal.com
schedule:
enabled: true
cron: "0 0 * * * ?" # Every hour