diff --git a/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java b/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java index 3f5260a..4f0d7f9 100644 --- a/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java +++ b/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java @@ -29,9 +29,13 @@ public class MaritimeApiWebClientConfig { @Value("${app.batch.ship-api.url}") private String maritimeApiUrl; - @Value("https://aisapi.maritime.spglobal.com") + @Value("${app.batch.ais-api.url}") private String maritimeAisApiUrl; + @Value("${app.batch.webservice-api.url}") + private String maritimeServiceApiUrl; + + @Value("${app.batch.ship-api.username}") private String maritimeApiUsername; @@ -79,6 +83,22 @@ public class MaritimeApiWebClientConfig { .maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼 .build(); } + + @Bean(name = "maritimeServiceApiWebClient") + public WebClient maritimeServiceApiWebClient(){ + log.info("========================================"); + log.info("Maritime AIS API WebClient 생성"); + log.info("Base URL: {}", maritimeServiceApiUrl); + log.info("========================================"); + + return WebClient.builder() + .baseUrl(maritimeServiceApiUrl) + .defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword)) + .codecs(configurer -> configurer + .defaultCodecs() + .maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼 + .build(); + } } diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportJobConfig.java b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportJobConfig.java new file mode 100644 index 0000000..5aba1ab --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportJobConfig.java @@ -0,0 +1,84 @@ +package com.snp.batch.jobs.risk.batch.config; + +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.jobs.risk.batch.dto.RiskDto; +import com.snp.batch.jobs.risk.batch.entity.RiskEntity; +import com.snp.batch.jobs.risk.batch.processor.RiskDataProcessor; +import com.snp.batch.jobs.risk.batch.reader.RiskDataReader; +import com.snp.batch.jobs.risk.batch.writer.RiskDataWriter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; + +@Slf4j +@Configuration +public class RiskImportJobConfig extends BaseJobConfig { + private final JdbcTemplate jdbcTemplate; + private final WebClient maritimeServiceApiWebClient; + + private final RiskDataProcessor riskDataProcessor; + + private final RiskDataWriter riskDataWriter; + + @Override + protected int getChunkSize() { + return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정 + } + public RiskImportJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + RiskDataProcessor riskDataProcessor, + RiskDataWriter riskDataWriter, + JdbcTemplate jdbcTemplate, + @Qualifier("maritimeServiceApiWebClient")WebClient maritimeServiceApiWebClient) { + super(jobRepository, transactionManager); + this.jdbcTemplate = jdbcTemplate; + this.maritimeServiceApiWebClient = maritimeServiceApiWebClient; + this.riskDataProcessor = riskDataProcessor; + this.riskDataWriter = riskDataWriter; + } + + @Override + protected String getJobName() { + return "riskImportJob"; + } + + @Override + protected String getStepName() { + return "riskImportStep"; + } + + @Override + protected ItemReader createReader() { + return new RiskDataReader(maritimeServiceApiWebClient, jdbcTemplate); + } + + @Override + protected ItemProcessor createProcessor() { + return riskDataProcessor; + } + + @Override + protected ItemWriter createWriter() { return riskDataWriter; } + + @Bean(name = "riskImportJob") + public Job riskImportJob() { + return job(); + } + + @Bean(name = "riskImportStep") + public Step riskImportStep() { + return step(); + } + +} diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/dto/RiskDto.java b/src/main/java/com/snp/batch/jobs/risk/batch/dto/RiskDto.java new file mode 100644 index 0000000..6797e1c --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/dto/RiskDto.java @@ -0,0 +1,277 @@ +package com.snp.batch.jobs.risk.batch.dto; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class RiskDto { + // 1. Vessel and General Information + @JsonProperty("lrno") + private String lrno; + + @JsonProperty("lastUpdated") + private String lastUpdated; + + @JsonProperty("riskDataMaintained") + private Integer riskDataMaintained; + + // 2. AIS/Tracking Risk + @JsonProperty("daysSinceLastSeenOnAIS") + private Integer daysSinceLastSeenOnAIS; + + @JsonProperty("daysSinceLastSeenOnAISNarrative") + private String daysSinceLastSeenOnAISNarrative; + + @JsonProperty("daysUnderAIS") + private Integer daysUnderAIS; + + @JsonProperty("daysUnderAISNarrative") + private String daysUnderAISNarrative; + + @JsonProperty("imoCorrectOnAIS") + private Integer imoCorrectOnAIS; + + @JsonProperty("imoCorrectOnAISNarrative") + private String imoCorrectOnAISNarrative; + + @JsonProperty("sailingUnderName") + private Integer sailingUnderName; + + @JsonProperty("sailingUnderNameNarrative") + private String sailingUnderNameNarrative; + + @JsonProperty("anomalousMessagesFromMMSI") + private Integer anomalousMessagesFromMMSI; + + @JsonProperty("anomalousMessagesFromMMSINarrative") + private String anomalousMessagesFromMMSINarrative; + + @JsonProperty("mostRecentDarkActivity") + private Integer mostRecentDarkActivity; + + @JsonProperty("mostRecentDarkActivityNarrative") + private String mostRecentDarkActivityNarrative; + + // 3. Operational & History Risk + @JsonProperty("portCalls") + private Integer portCalls; + + @JsonProperty("portCallsNarrative") + private String portCallsNarrative; + + @JsonProperty("portRisk") + private Integer portRisk; + + @JsonProperty("portRiskNarrative") + private String portRiskNarrative; + + @JsonProperty("stsOperations") + private Integer stsOperations; + + @JsonProperty("stsOperationsNarrative") + private String stsOperationsNarrative; + + @JsonProperty("driftingHighSeas") + private Integer driftingHighSeas; + + @JsonProperty("driftingHighSeasNarrative") + private String driftingHighSeasNarrative; + + @JsonProperty("riskEvents") + private Integer riskEvents; + + @JsonProperty("riskEventNarrative") + private String riskEventNarrative; + + @JsonProperty("riskEventNarrativeExtended") + private String riskEventNarrativeExtended; + + @JsonProperty("flagChanges") + private Integer flagChanges; + + @JsonProperty("flagChangeNarrative") + private String flagChangeNarrative; + + // 4. PSC (Port State Control) & Flag Risk + @JsonProperty("flagParisMOUPerformance") + private Integer flagParisMOUPerformance; + + @JsonProperty("flagParisMOUPerformanceNarrative") + private String flagParisMOUPerformanceNarrative; + + @JsonProperty("flagTokyoMOUPeformance") + private Integer flagTokyoMOUPeformance; + + @JsonProperty("flagTokyoMOUPeformanceNarrative") + private String flagTokyoMOUPeformanceNarrative; + + @JsonProperty("flagUSCGMOUPerformance") + private Integer flagUSCGMOUPerformance; + + @JsonProperty("flagUSCGMOUPerformanceNarrative") + private String flagUSCGMOUPerformanceNarrative; + + @JsonProperty("uscgQualship21") + private Integer uscgQualship21; + + @JsonProperty("uscgQualship21Narrative") + private String uscgQualship21Narrative; + + @JsonProperty("timeSincePSCInspection") + private Integer timeSincePSCInspection; + + @JsonProperty("timeSincePSCInspectionNarrative") + private String timeSincePSCInspectionNarrative; + + @JsonProperty("pscInspections") + private Integer pscInspections; + + @JsonProperty("pscInspectionNarrative") + private String pscInspectionNarrative; + + @JsonProperty("pscDefects") + private Integer pscDefects; + + @JsonProperty("pscDefectsNarrative") + private String pscDefectsNarrative; + + @JsonProperty("pscDetentions") + private Integer pscDetentions; + + @JsonProperty("pscDetentionsNarrative") + private String pscDetentionsNarrative; + + // 5. Certification & Class Risk + @JsonProperty("currentSMCCertificate") + private Integer currentSMCCertificate; + + @JsonProperty("currentSMCCertificateNarrative") + private String currentSMCCertificateNarrative; + + @JsonProperty("docChanges") + private Integer docChanges; + + @JsonProperty("docChangesNarrative") + private String docChangesNarrative; + + @JsonProperty("currentClass") + private Integer currentClass; + + @JsonProperty("currentClassNarrative") + private String currentClassNarrative; + + @JsonProperty("currentClassNarrativeExtended") + private String currentClassNarrativeExtended; + + @JsonProperty("classStatusChanges") + private Integer classStatusChanges; + + @JsonProperty("classStatusChangesNarrative") + private String classStatusChangesNarrative; + + // 6. Ownership & Financial Risk + @JsonProperty("pandICoverage") + private Integer pandICoverage; + + @JsonProperty("pandICoverageNarrative") + private String pandICoverageNarrative; + + @JsonProperty("pandICoverageNarrativeExtended") + private String pandICoverageNarrativeExtended; + + @JsonProperty("nameChanges") + private Integer nameChanges; + + @JsonProperty("nameChangesNarrative") + private String nameChangesNarrative; + + @JsonProperty("gboChanges") + private Integer gboChanges; + + @JsonProperty("gboChangesNarrative") + private String gboChangesNarrative; + + @JsonProperty("ageOfShip") + private Integer ageOfShip; + + @JsonProperty("ageofShipNarrative") + private String ageofShipNarrative; + + // 7. Sanctions & Specialized Risk + @JsonProperty("iuuFishingViolation") + private Integer iuuFishingViolation; + + @JsonProperty("iuuFishingNarrative") + private String iuuFishingNarrative; // null 값 포함 + + @JsonProperty("draughtChanges") + private Integer draughtChanges; + + @JsonProperty("draughtChangesNarrative") + private String draughtChangesNarrative; + + @JsonProperty("mostRecentSanctionedPortCall") + private Integer mostRecentSanctionedPortCall; + + @JsonProperty("mostRecentSanctionedPortCallNarrative") + private String mostRecentSanctionedPortCallNarrative; // null 값 포함 + + @JsonProperty("singleShipOperation") + private Integer singleShipOperation; + + @JsonProperty("singleShipOperationNarrative") + private String singleShipOperationNarrative; + + @JsonProperty("fleetSafety") + private Integer fleetSafety; + + @JsonProperty("fleetSafetyNarrative") + private String fleetSafetyNarrative; + + @JsonProperty("fleetPSC") + private Integer fleetPSC; + + @JsonProperty("fleetPSCNarrative") + private String fleetPSCNarrative; + + // 8. Survey & Other Risk + @JsonProperty("specialSurveyOverdue") + private Integer specialSurveyOverdue; + + @JsonProperty("specialSurveyOverdueNarrative") + private String specialSurveyOverdueNarrative; + + @JsonProperty("ownerUnknown") + private Integer ownerUnknown; + + @JsonProperty("ownerUnknownNarrative") + private String ownerUnknownNarrative; + + @JsonProperty("russianPortCall") + private Integer russianPortCall; + + @JsonProperty("russianPortCallNarrative") + private String russianPortCallNarrative; + + @JsonProperty("russianOwnerRegistration") + private Integer russianOwnerRegistration; + + @JsonProperty("russianOwnerRegistrationNarrative") + private String russianOwnerRegistrationNarrative; + + @JsonProperty("russianSTS") + private Integer russianSTS; + + @JsonProperty("russianSTSNarrative") + private String russianSTSNarrative; + +} \ No newline at end of file diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/dto/RiskResponse.java b/src/main/java/com/snp/batch/jobs/risk/batch/dto/RiskResponse.java new file mode 100644 index 0000000..1ab23e9 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/dto/RiskResponse.java @@ -0,0 +1,16 @@ +package com.snp.batch.jobs.risk.batch.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class RiskResponse { + private List riskDtoList; +} diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/entity/RiskEntity.java b/src/main/java/com/snp/batch/jobs/risk/batch/entity/RiskEntity.java new file mode 100644 index 0000000..e5ccc1c --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/entity/RiskEntity.java @@ -0,0 +1,189 @@ +package com.snp.batch.jobs.risk.batch.entity; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.snp.batch.common.batch.entity.BaseEntity; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.time.LocalDateTime; + +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode(callSuper = true) +public class RiskEntity extends BaseEntity { + + private String lrno; + + private String lastUpdated; + + private Integer riskDataMaintained; + + private Integer daysSinceLastSeenOnAIS; + + private String daysSinceLastSeenOnAISNarrative; + + private Integer daysUnderAIS; + + private String daysUnderAISNarrative; + + private Integer imoCorrectOnAIS; + + private String imoCorrectOnAISNarrative; + + private Integer sailingUnderName; + + private String sailingUnderNameNarrative; + + private Integer anomalousMessagesFromMMSI; + + private String anomalousMessagesFromMMSINarrative; + + private Integer mostRecentDarkActivity; + + private String mostRecentDarkActivityNarrative; + + private Integer portCalls; + + private String portCallsNarrative; + + private Integer portRisk; + + private String portRiskNarrative; + + private Integer stsOperations; + + private String stsOperationsNarrative; + + private Integer driftingHighSeas; + + private String driftingHighSeasNarrative; + + private Integer riskEvents; + + private String riskEventNarrative; + + private String riskEventNarrativeExtended; + + private Integer flagChanges; + + private String flagChangeNarrative; + + private Integer flagParisMOUPerformance; + + private String flagParisMOUPerformanceNarrative; + + private Integer flagTokyoMOUPeformance; + + private String flagTokyoMOUPeformanceNarrative; + + private Integer flagUSCGMOUPerformance; + + private String flagUSCGMOUPerformanceNarrative; + + private Integer uscgQualship21; + + private String uscgQualship21Narrative; + + private Integer timeSincePSCInspection; + + private String timeSincePSCInspectionNarrative; + + private Integer pscInspections; + + private String pscInspectionNarrative; + + private Integer pscDefects; + + private String pscDefectsNarrative; + + private Integer pscDetentions; + + private String pscDetentionsNarrative; + + private Integer currentSMCCertificate; + + private String currentSMCCertificateNarrative; + + private Integer docChanges; + + private String docChangesNarrative; + + private Integer currentClass; + + private String currentClassNarrative; + + private String currentClassNarrativeExtended; + + private Integer classStatusChanges; + + private String classStatusChangesNarrative; + + private Integer pandICoverage; + + private String pandICoverageNarrative; + + private String pandICoverageNarrativeExtended; + + private Integer nameChanges; + + private String nameChangesNarrative; + + private Integer gboChanges; + + private String gboChangesNarrative; + + private Integer ageOfShip; + + private String ageofShipNarrative; + + private Integer iuuFishingViolation; + + private String iuuFishingNarrative; // null 값 포함 + + private Integer draughtChanges; + + private String draughtChangesNarrative; + + private Integer mostRecentSanctionedPortCall; + + private String mostRecentSanctionedPortCallNarrative; // null 값 포함 + + private Integer singleShipOperation; + + private String singleShipOperationNarrative; + + private Integer fleetSafety; + + private String fleetSafetyNarrative; + + private Integer fleetPSC; + + private String fleetPSCNarrative; + + private Integer specialSurveyOverdue; + + private String specialSurveyOverdueNarrative; + + private Integer ownerUnknown; + + private String ownerUnknownNarrative; + + private Integer russianPortCall; + + private String russianPortCallNarrative; + + private Integer russianOwnerRegistration; + + private String russianOwnerRegistrationNarrative; + + private Integer russianSTS; + + private String russianSTSNarrative; + +} diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/processor/RiskDataProcessor.java b/src/main/java/com/snp/batch/jobs/risk/batch/processor/RiskDataProcessor.java new file mode 100644 index 0000000..3f96d9b --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/processor/RiskDataProcessor.java @@ -0,0 +1,122 @@ +package com.snp.batch.jobs.risk.batch.processor; + +import com.snp.batch.common.batch.processor.BaseProcessor; +import com.snp.batch.jobs.risk.batch.dto.RiskDto; +import com.snp.batch.jobs.risk.batch.entity.RiskEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class RiskDataProcessor extends BaseProcessor { + @Override + protected RiskEntity processItem(RiskDto dto) throws Exception { + log.debug("Risk 데이터 처리 시작: imoNumber={}", dto.getLrno()); + + RiskEntity entity = RiskEntity.builder() + // 1. Vessel and General Information + .lrno(dto.getLrno()) + .lastUpdated(dto.getLastUpdated()) + .riskDataMaintained(dto.getRiskDataMaintained()) + + // 2. AIS/Tracking Risk + .daysSinceLastSeenOnAIS(dto.getDaysSinceLastSeenOnAIS()) + .daysSinceLastSeenOnAISNarrative(dto.getDaysSinceLastSeenOnAISNarrative()) + .daysUnderAIS(dto.getDaysUnderAIS()) + .daysUnderAISNarrative(dto.getDaysUnderAISNarrative()) + .imoCorrectOnAIS(dto.getImoCorrectOnAIS()) + .imoCorrectOnAISNarrative(dto.getImoCorrectOnAISNarrative()) + .sailingUnderName(dto.getSailingUnderName()) + .sailingUnderNameNarrative(dto.getSailingUnderNameNarrative()) + .anomalousMessagesFromMMSI(dto.getAnomalousMessagesFromMMSI()) + .anomalousMessagesFromMMSINarrative(dto.getAnomalousMessagesFromMMSINarrative()) + .mostRecentDarkActivity(dto.getMostRecentDarkActivity()) + .mostRecentDarkActivityNarrative(dto.getMostRecentDarkActivityNarrative()) + + // 3. Operational & History Risk + .portCalls(dto.getPortCalls()) + .portCallsNarrative(dto.getPortCallsNarrative()) + .portRisk(dto.getPortRisk()) + .portRiskNarrative(dto.getPortRiskNarrative()) + .stsOperations(dto.getStsOperations()) + .stsOperationsNarrative(dto.getStsOperationsNarrative()) + .driftingHighSeas(dto.getDriftingHighSeas()) + .driftingHighSeasNarrative(dto.getDriftingHighSeasNarrative()) + .riskEvents(dto.getRiskEvents()) + .riskEventNarrative(dto.getRiskEventNarrative()) + .riskEventNarrativeExtended(dto.getRiskEventNarrativeExtended()) + .flagChanges(dto.getFlagChanges()) + .flagChangeNarrative(dto.getFlagChangeNarrative()) + + // 4. PSC (Port State Control) & Flag Risk + .flagParisMOUPerformance(dto.getFlagParisMOUPerformance()) + .flagParisMOUPerformanceNarrative(dto.getFlagParisMOUPerformanceNarrative()) + .flagTokyoMOUPeformance(dto.getFlagTokyoMOUPeformance()) + .flagTokyoMOUPeformanceNarrative(dto.getFlagTokyoMOUPeformanceNarrative()) + .flagUSCGMOUPerformance(dto.getFlagUSCGMOUPerformance()) + .flagUSCGMOUPerformanceNarrative(dto.getFlagUSCGMOUPerformanceNarrative()) + .uscgQualship21(dto.getUscgQualship21()) + .uscgQualship21Narrative(dto.getUscgQualship21Narrative()) + .timeSincePSCInspection(dto.getTimeSincePSCInspection()) + .timeSincePSCInspectionNarrative(dto.getTimeSincePSCInspectionNarrative()) + .pscInspections(dto.getPscInspections()) + .pscInspectionNarrative(dto.getPscInspectionNarrative()) + .pscDefects(dto.getPscDefects()) + .pscDefectsNarrative(dto.getPscDefectsNarrative()) + .pscDetentions(dto.getPscDetentions()) + .pscDetentionsNarrative(dto.getPscDetentionsNarrative()) + + // 5. Certification & Class Risk + .currentSMCCertificate(dto.getCurrentSMCCertificate()) + .currentSMCCertificateNarrative(dto.getCurrentSMCCertificateNarrative()) + .docChanges(dto.getDocChanges()) + .docChangesNarrative(dto.getDocChangesNarrative()) + .currentClass(dto.getCurrentClass()) + .currentClassNarrative(dto.getCurrentClassNarrative()) + .currentClassNarrativeExtended(dto.getCurrentClassNarrativeExtended()) + .classStatusChanges(dto.getClassStatusChanges()) + .classStatusChangesNarrative(dto.getClassStatusChangesNarrative()) + + // 6. Ownership & Financial Risk + .pandICoverage(dto.getPandICoverage()) + .pandICoverageNarrative(dto.getPandICoverageNarrative()) + .pandICoverageNarrativeExtended(dto.getPandICoverageNarrativeExtended()) + .nameChanges(dto.getNameChanges()) + .nameChangesNarrative(dto.getNameChangesNarrative()) + .gboChanges(dto.getGboChanges()) + .gboChangesNarrative(dto.getGboChangesNarrative()) + .ageOfShip(dto.getAgeOfShip()) + .ageofShipNarrative(dto.getAgeofShipNarrative()) + + // 7. Sanctions & Specialized Risk + .iuuFishingViolation(dto.getIuuFishingViolation()) + .iuuFishingNarrative(dto.getIuuFishingNarrative()) + .draughtChanges(dto.getDraughtChanges()) + .draughtChangesNarrative(dto.getDraughtChangesNarrative()) + .mostRecentSanctionedPortCall(dto.getMostRecentSanctionedPortCall()) + .mostRecentSanctionedPortCallNarrative(dto.getMostRecentSanctionedPortCallNarrative()) + .singleShipOperation(dto.getSingleShipOperation()) + .singleShipOperationNarrative(dto.getSingleShipOperationNarrative()) + .fleetSafety(dto.getFleetSafety()) + .fleetSafetyNarrative(dto.getFleetSafetyNarrative()) + .fleetPSC(dto.getFleetPSC()) + .fleetPSCNarrative(dto.getFleetPSCNarrative()) + + // 8. Survey & Other Risk + .specialSurveyOverdue(dto.getSpecialSurveyOverdue()) + .specialSurveyOverdueNarrative(dto.getSpecialSurveyOverdueNarrative()) + .ownerUnknown(dto.getOwnerUnknown()) + .ownerUnknownNarrative(dto.getOwnerUnknownNarrative()) + .russianPortCall(dto.getRussianPortCall()) + .russianPortCallNarrative(dto.getRussianPortCallNarrative()) + .russianOwnerRegistration(dto.getRussianOwnerRegistration()) + .russianOwnerRegistrationNarrative(dto.getRussianOwnerRegistrationNarrative()) + .russianSTS(dto.getRussianSTS()) + .russianSTSNarrative(dto.getRussianSTSNarrative()) + .build(); + + log.debug("Risk 데이터 처리 완료: imoNumber={}", dto.getLrno()); + + return entity; + } +} diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataReader.java b/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataReader.java new file mode 100644 index 0000000..be6108f --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataReader.java @@ -0,0 +1,144 @@ +package com.snp.batch.jobs.risk.batch.reader; + +import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.risk.batch.dto.RiskDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.Collections; +import java.util.List; + +@Slf4j +public class RiskDataReader extends BaseApiReader { + + //TODO : + // 1. Core20 IMO_NUMBER 전체 조회 + // 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복) + // 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복) + + private final JdbcTemplate jdbcTemplate; + + private List allImoNumbers; + private int currentBatchIndex = 0; + private final int batchSize = 100; + + public RiskDataReader(WebClient webClient, JdbcTemplate jdbcTemplate) { + super(webClient); + this.jdbcTemplate = jdbcTemplate; + enableChunkMode(); // ✨ Chunk 모드 활성화 + } + + @Override + protected String getReaderName() { + return "riskDataReader"; + } + + @Override + protected String getApiPath() { + return "/RiskAndCompliance/RisksByImos"; + } + +// private String getTargetTable(){ +// return "snp_data.core20"; +// } + private String getTargetTable(){ + return "snp_data.ship_data"; + } + private String GET_CORE_IMO_LIST = +// "SELECT ihslrorimoshipno FROM " + getTargetTable() + " ORDER BY ihslrorimoshipno"; + "select imo_number as ihslrorimoshipno from snp_data.ship_data order by imo_number"; + @Override + protected void beforeFetch(){ + log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName()); + + allImoNumbers = jdbcTemplate.queryForList(GET_CORE_IMO_LIST, String.class); + + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); + log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); + log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); + + updateApiCallStats(totalBatches, 0); + } + + @Override + protected List fetchNextBatch() throws Exception { + // 모든 배치 처리 완료 확인 + if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { + return null; // Job 종료 + } + + // 현재 배치의 시작/끝 인덱스 계산 + int startIndex = currentBatchIndex; + int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); + + // 현재 배치의 IMO 번호 추출 (100개) + List currentBatch = allImoNumbers.subList(startIndex, endIndex); + + int currentBatchNumber = (currentBatchIndex / batchSize) + 1; + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", + getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); + + try { + // IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...") + String imoParam = String.join(",", currentBatch); + + // API 호출 + List response = callAisApiWithBatch(imoParam); + + // 다음 배치로 인덱스 이동 + currentBatchIndex = endIndex; + + // 응답 처리 + if (response != null) { +// List targets = response; + log.info("[{}] 배치 {}/{} 완료: {} 건 조회", + getReaderName(), currentBatchNumber, totalBatches, response.size()); + + // API 호출 통계 업데이트 + updateApiCallStats(totalBatches, currentBatchNumber); + + // API 과부하 방지 (다음 배치 전 0.5초 대기) + if (currentBatchIndex < allImoNumbers.size()) { + Thread.sleep(500); + } + + return response; + } else { + log.warn("[{}] 배치 {}/{} 응답 없음", + getReaderName(), currentBatchNumber, totalBatches); + + // API 호출 통계 업데이트 (실패도 카운트) + updateApiCallStats(totalBatches, currentBatchNumber); + + return Collections.emptyList(); + } + + } catch (Exception e) { + log.error("[{}] 배치 {}/{} 처리 중 오류: {}", + getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e); + + // 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용) + currentBatchIndex = endIndex; + + // 빈 리스트 반환 (Job 계속 진행) + return Collections.emptyList(); + } + } + + private List callAisApiWithBatch(String imoNumbers) { + String url = getApiPath() + "?imos=" + imoNumbers; + log.debug("[{}] API 호출: {}", getReaderName(), url); + return webClient.get() + .uri(url) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() {}) + .block(); + } + +} diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepository.java b/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepository.java new file mode 100644 index 0000000..795a052 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepository.java @@ -0,0 +1,9 @@ +package com.snp.batch.jobs.risk.batch.repository; + +import com.snp.batch.jobs.risk.batch.entity.RiskEntity; + +import java.util.List; + +public interface RiskRepository { + void saveRiskAll(List items); +} diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java new file mode 100644 index 0000000..ae6c6c8 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java @@ -0,0 +1,271 @@ +package com.snp.batch.jobs.risk.batch.repository; + +import com.snp.batch.common.batch.repository.BaseJdbcRepository; +import com.snp.batch.jobs.risk.batch.entity.RiskEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; + +import java.sql.PreparedStatement; +import java.util.List; + +@Slf4j +@Repository("riskRepository") +public class RiskRepositoryImpl extends BaseJdbcRepository implements RiskRepository { + + public RiskRepositoryImpl(JdbcTemplate jdbcTemplate) { + super(jdbcTemplate); + } + + @Override + protected String getTableName() { + return null; + } + + @Override + protected RowMapper getRowMapper() { + return null; + } + + @Override + protected Long extractId(RiskEntity entity) { + return null; + } + + @Override + protected String getInsertSql() { + return null; + } + + @Override + protected String getUpdateSql() { + return """ + INSERT INTO snp_data.risk ( + lrno, lastupdated, riskdatamaintained, dayssincelastseenonais, dayssincelastseenonaisnarrative, + daysunderais, daysunderaisnarrative, imocorrectonais, imocorrectonaisnarrative, sailingundername, + sailingundernamenarrative, anomalousmessagesfrommmsi, anomalousmessagesfrommmsinarrative, + mostrecentdarkactivity, mostrecentdarkactivitynarrative, portcalls, portcallsnarrative, portrisk, + portrisknarrative, stsoperations, stsoperationsnarrative, driftinghighseas, driftinghighseasnarrative, + riskevents, riskeventnarrative, riskeventnarrativeextended, flagchanges, flagchangenarrative, + flagparismouperformance, flagparismouperformancenarrative, flagtokyomoupeformance, flagtokyomoupeformancenarrative, + flaguscgmouperformance, flaguscgmouperformancenarrative, uscgqualship21, uscgqualship21narrative, + timesincepscinspection, timesincepscinspectionnarrative, pscinspections, pscinspectionnarrative, + pscdefects, pscdefectsnarrative, pscdetentions, pscdetentionsnarrative, currentsmccertificate, + currentsmccertificatenarrative, docchanges, docchangesnarrative, currentclass, currentclassnarrative, + currentclassnarrativeextended, classstatuschanges, classstatuschangesnarrative, pandicoverage, + pandicoveragenarrative, pandicoveragenarrativeextended, namechanges, namechangesnarrative, gbochanges, + gbochangesnarrative, ageofship, ageofshipnarrative, iuufishingviolation, iuufishingnarrative, + draughtchanges, draughtchangesnarrative, mostrecentsanctionedportcall, mostrecentsanctionedportcallnarrative, + singleshipoperation, singleshipoperationnarrative, fleetsafety, fleetsafetynarrative, fleetpsc, + fleetpscnarrative, specialsurveyoverdue, specialsurveyoverduenarrative, ownerunknown, ownerunknownnarrative, + russianportcall, russianportcallnarrative, russianownerregistration, russianownerregistrationnarrative, + russiansts, russianstsnarrative + ) + VALUES ( + ?, ?::timestamptz, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? + ) + ON CONFLICT (lrno, lastupdated) + DO UPDATE SET + riskdatamaintained = EXCLUDED.riskdatamaintained, + dayssincelastseenonais = EXCLUDED.dayssincelastseenonais, + dayssincelastseenonaisnarrative = EXCLUDED.dayssincelastseenonaisnarrative, + daysunderais = EXCLUDED.daysunderais, + daysunderaisnarrative = EXCLUDED.daysunderaisnarrative, + imocorrectonais = EXCLUDED.imocorrectonais, + imocorrectonaisnarrative = EXCLUDED.imocorrectonaisnarrative, + sailingundername = EXCLUDED.sailingundername, + sailingundernamenarrative = EXCLUDED.sailingundernamenarrative, + anomalousmessagesfrommmsi = EXCLUDED.anomalousmessagesfrommmsi, + anomalousmessagesfrommmsinarrative = EXCLUDED.anomalousmessagesfrommmsinarrative, + mostrecentdarkactivity = EXCLUDED.mostrecentdarkactivity, + mostrecentdarkactivitynarrative = EXCLUDED.mostrecentdarkactivitynarrative, + portcalls = EXCLUDED.portcalls, + portcallsnarrative = EXCLUDED.portcallsnarrative, + portrisk = EXCLUDED.portrisk, + portrisknarrative = EXCLUDED.portrisknarrative, + stsoperations = EXCLUDED.stsoperations, + stsoperationsnarrative = EXCLUDED.stsoperationsnarrative, + driftinghighseas = EXCLUDED.driftinghighseas, + driftinghighseasnarrative = EXCLUDED.driftinghighseasnarrative, + riskevents = EXCLUDED.riskevents, + riskeventnarrative = EXCLUDED.riskeventnarrative, + riskeventnarrativeextended = EXCLUDED.riskeventnarrativeextended, + flagchanges = EXCLUDED.flagchanges, + flagchangenarrative = EXCLUDED.flagchangenarrative, + flagparismouperformance = EXCLUDED.flagparismouperformance, + flagparismouperformancenarrative = EXCLUDED.flagparismouperformancenarrative, + flagtokyomoupeformance = EXCLUDED.flagtokyomoupeformance, + flagtokyomoupeformancenarrative = EXCLUDED.flagtokyomoupeformancenarrative, + flaguscgmouperformance = EXCLUDED.flaguscgmouperformance, + flaguscgmouperformancenarrative = EXCLUDED.flaguscgmouperformancenarrative, + uscgqualship21 = EXCLUDED.uscgqualship21, + uscgqualship21narrative = EXCLUDED.uscgqualship21narrative, + timesincepscinspection = EXCLUDED.timesincepscinspection, + timesincepscinspectionnarrative = EXCLUDED.timesincepscinspectionnarrative, + pscinspections = EXCLUDED.pscinspections, + pscinspectionnarrative = EXCLUDED.pscinspectionnarrative, + pscdefects = EXCLUDED.pscdefects, + pscdefectsnarrative = EXCLUDED.pscdefectsnarrative, + pscdetentions = EXCLUDED.pscdetentions, + pscdetentionsnarrative = EXCLUDED.pscdetentionsnarrative, + currentsmccertificate = EXCLUDED.currentsmccertificate, + currentsmccertificatenarrative = EXCLUDED.currentsmccertificatenarrative, + docchanges = EXCLUDED.docchanges, + docchangesnarrative = EXCLUDED.docchangesnarrative, + currentclass = EXCLUDED.currentclass, + currentclassnarrative = EXCLUDED.currentclassnarrative, + currentclassnarrativeextended = EXCLUDED.currentclassnarrativeextended, + classstatuschanges = EXCLUDED.classstatuschanges, + classstatuschangesnarrative = EXCLUDED.classstatuschangesnarrative, + pandicoverage = EXCLUDED.pandicoverage, + pandicoveragenarrative = EXCLUDED.pandicoveragenarrative, + pandicoveragenarrativeextended = EXCLUDED.pandicoveragenarrativeextended, + namechanges = EXCLUDED.namechanges, + namechangesnarrative = EXCLUDED.namechangesnarrative, + gbochanges = EXCLUDED.gbochanges, + gbochangesnarrative = EXCLUDED.gbochangesnarrative, + ageofship = EXCLUDED.ageofship, + ageofshipnarrative = EXCLUDED.ageofshipnarrative, + iuufishingviolation = EXCLUDED.iuufishingviolation, + iuufishingnarrative = EXCLUDED.iuufishingnarrative, + draughtchanges = EXCLUDED.draughtchanges, + draughtchangesnarrative = EXCLUDED.draughtchangesnarrative, + mostrecentsanctionedportcall = EXCLUDED.mostrecentsanctionedportcall, + mostrecentsanctionedportcallnarrative = EXCLUDED.mostrecentsanctionedportcallnarrative, + singleshipoperation = EXCLUDED.singleshipoperation, + singleshipoperationnarrative = EXCLUDED.singleshipoperationnarrative, + fleetsafety = EXCLUDED.fleetsafety, + fleetsafetynarrative = EXCLUDED.fleetsafetynarrative, + fleetpsc = EXCLUDED.fleetpsc, + fleetpscnarrative = EXCLUDED.fleetpscnarrative, + specialsurveyoverdue = EXCLUDED.specialsurveyoverdue, + specialsurveyoverduenarrative = EXCLUDED.specialsurveyoverduenarrative, + ownerunknown = EXCLUDED.ownerunknown, + ownerunknownnarrative = EXCLUDED.ownerunknownnarrative, + russianportcall = EXCLUDED.russianportcall, + russianportcallnarrative = EXCLUDED.russianportcallnarrative, + russianownerregistration = EXCLUDED.russianownerregistration, + russianownerregistrationnarrative = EXCLUDED.russianownerregistrationnarrative, + russiansts = EXCLUDED.russiansts, + russianstsnarrative = EXCLUDED.russianstsnarrative; + """; + } + + @Override + protected void setInsertParameters(PreparedStatement ps, RiskEntity entity) throws Exception { + + } + + @Override + protected void setUpdateParameters(PreparedStatement ps, RiskEntity entity) throws Exception { + int idx = 1; + ps.setString(idx++, entity.getLrno()); + ps.setString(idx++, entity.getLastUpdated()); + ps.setObject(idx++, entity.getRiskDataMaintained(), java.sql.Types.INTEGER); + ps.setObject(idx++, entity.getDaysSinceLastSeenOnAIS(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getDaysSinceLastSeenOnAISNarrative()); + ps.setObject(idx++, entity.getDaysUnderAIS(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getDaysUnderAISNarrative()); + ps.setObject(idx++, entity.getImoCorrectOnAIS(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getImoCorrectOnAISNarrative()); + ps.setObject(idx++, entity.getSailingUnderName(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getSailingUnderNameNarrative()); + ps.setObject(idx++, entity.getAnomalousMessagesFromMMSI(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getAnomalousMessagesFromMMSINarrative()); + ps.setObject(idx++, entity.getMostRecentDarkActivity(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getMostRecentDarkActivityNarrative()); + ps.setObject(idx++, entity.getPortCalls(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getPortCallsNarrative()); + ps.setObject(idx++, entity.getPortRisk(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getPortRiskNarrative()); + ps.setObject(idx++, entity.getStsOperations(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getStsOperationsNarrative()); + ps.setObject(idx++, entity.getDriftingHighSeas(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getDriftingHighSeasNarrative()); + ps.setObject(idx++, entity.getRiskEvents(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getRiskEventNarrative()); + ps.setString(idx++, entity.getRiskEventNarrativeExtended()); + ps.setObject(idx++, entity.getFlagChanges(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getFlagChangeNarrative()); + ps.setObject(idx++, entity.getFlagParisMOUPerformance(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getFlagParisMOUPerformanceNarrative()); + ps.setObject(idx++, entity.getFlagTokyoMOUPeformance(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getFlagTokyoMOUPeformanceNarrative()); + ps.setObject(idx++, entity.getFlagUSCGMOUPerformance(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getFlagUSCGMOUPerformanceNarrative()); + ps.setObject(idx++, entity.getUscgQualship21(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getUscgQualship21Narrative()); + ps.setObject(idx++, entity.getTimeSincePSCInspection(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getTimeSincePSCInspectionNarrative()); + ps.setObject(idx++, entity.getPscInspections(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getPscInspectionNarrative()); + ps.setObject(idx++, entity.getPscDefects(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getPscDefectsNarrative()); + ps.setObject(idx++, entity.getPscDetentions(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getPscDetentionsNarrative()); + ps.setObject(idx++, entity.getCurrentSMCCertificate(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getCurrentSMCCertificateNarrative()); + ps.setObject(idx++, entity.getDocChanges(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getDocChangesNarrative()); + ps.setObject(idx++, entity.getCurrentClass(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getCurrentClassNarrative()); + ps.setString(idx++, entity.getCurrentClassNarrativeExtended()); + ps.setObject(idx++, entity.getClassStatusChanges(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getClassStatusChangesNarrative()); + ps.setObject(idx++, entity.getPandICoverage(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getPandICoverageNarrative()); + ps.setString(idx++, entity.getPandICoverageNarrativeExtended()); + ps.setObject(idx++, entity.getNameChanges(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getNameChangesNarrative()); + ps.setObject(idx++, entity.getGboChanges(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getGboChangesNarrative()); + ps.setObject(idx++, entity.getAgeOfShip(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getAgeofShipNarrative()); + ps.setObject(idx++, entity.getIuuFishingViolation(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getIuuFishingNarrative()); + ps.setObject(idx++, entity.getDraughtChanges(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getDraughtChangesNarrative()); + ps.setObject(idx++, entity.getMostRecentSanctionedPortCall(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getMostRecentSanctionedPortCallNarrative()); + ps.setObject(idx++, entity.getSingleShipOperation(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getSingleShipOperationNarrative()); + ps.setObject(idx++, entity.getFleetSafety(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getFleetSafetyNarrative()); + ps.setObject(idx++, entity.getFleetPSC(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getFleetPSCNarrative()); + ps.setObject(idx++, entity.getSpecialSurveyOverdue(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getSpecialSurveyOverdueNarrative()); + ps.setObject(idx++, entity.getOwnerUnknown(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getOwnerUnknownNarrative()); + ps.setObject(idx++, entity.getRussianPortCall(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getRussianPortCallNarrative()); + ps.setObject(idx++, entity.getRussianOwnerRegistration(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getRussianOwnerRegistrationNarrative()); + ps.setObject(idx++, entity.getRussianSTS(), java.sql.Types.INTEGER); + ps.setString(idx++, entity.getRussianSTSNarrative()); + } + + @Override + protected String getEntityName() { + return "RiskEntity"; + } + + @Override + public void saveRiskAll(List items) { + if (items == null || items.isEmpty()) { + return; + } + jdbcTemplate.batchUpdate(getUpdateSql(), items, items.size(), + (ps, entity) -> { + try { + setUpdateParameters(ps, entity); + } catch (Exception e) { + log.error("배치 수정 파라미터 설정 실패", e); + throw new RuntimeException(e); + } + }); + + log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size()); + } +} diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/writer/RiskDataWriter.java b/src/main/java/com/snp/batch/jobs/risk/batch/writer/RiskDataWriter.java new file mode 100644 index 0000000..9bb7a6a --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/writer/RiskDataWriter.java @@ -0,0 +1,26 @@ +package com.snp.batch.jobs.risk.batch.writer; + +import com.snp.batch.common.batch.writer.BaseWriter; +import com.snp.batch.jobs.risk.batch.entity.RiskEntity; +import com.snp.batch.jobs.risk.batch.repository.RiskRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Slf4j +@Component +public class RiskDataWriter extends BaseWriter { + + private final RiskRepository riskRepository; + public RiskDataWriter(RiskRepository riskRepository) { + super("riskRepository"); + this.riskRepository = riskRepository; + } + + @Override + protected void writeItems(List items) throws Exception { + riskRepository.saveRiskAll(items); + log.info("Risk 저장 완료: 수정={} 건", items.size()); + } +} diff --git a/src/main/java/com/snp/batch/jobs/sanction/batch/config/SanctionUpdateJobConfig.java b/src/main/java/com/snp/batch/jobs/sanction/batch/config/SanctionUpdateJobConfig.java new file mode 100644 index 0000000..1707cdd --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/sanction/batch/config/SanctionUpdateJobConfig.java @@ -0,0 +1,86 @@ +package com.snp.batch.jobs.sanction.batch.config; + +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.jobs.sanction.batch.dto.ComplianceDto; +import com.snp.batch.jobs.sanction.batch.entity.ComplianceEntity; +import com.snp.batch.jobs.sanction.batch.processor.ComplianceDataProcessor; +import com.snp.batch.jobs.sanction.batch.reader.ComplianceDataReader; +import com.snp.batch.jobs.sanction.batch.writer.ComplianceDataWriter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; + +@Slf4j +@Configuration +public class SanctionUpdateJobConfig extends BaseJobConfig { + private final JdbcTemplate jdbcTemplate; + private final WebClient maritimeServiceApiWebClient; + + private final ComplianceDataProcessor complianceDataProcessor; + + private final ComplianceDataWriter complianceDataWriter; + + @Override + protected int getChunkSize() { + return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정 + } + public SanctionUpdateJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + ComplianceDataProcessor complianceDataProcessor, + ComplianceDataWriter complianceDataWriter, + JdbcTemplate jdbcTemplate, + @Qualifier("maritimeServiceApiWebClient")WebClient maritimeServiceApiWebClient) { + super(jobRepository, transactionManager); + this.jdbcTemplate = jdbcTemplate; + this.maritimeServiceApiWebClient = maritimeServiceApiWebClient; + this.complianceDataProcessor = complianceDataProcessor; + this.complianceDataWriter = complianceDataWriter; + } + + @Override + protected String getJobName() { + return "sanctionUpdateJob"; + } + + @Override + protected String getStepName() { + return "sanctionUpdateStep"; + } + + @Override + protected ItemReader createReader() { + return new ComplianceDataReader(maritimeServiceApiWebClient, jdbcTemplate); + } + + @Override + protected ItemProcessor createProcessor() { + return complianceDataProcessor; + } + + @Override + protected ItemWriter createWriter() { + return complianceDataWriter; + } + + @Bean(name = "sanctionUpdateJob") + public Job sanctionUpdateJob() { + return job(); + } + + @Bean(name = "sanctionUpdateStep") + public Step sanctionUpdateStep() { + return step(); + } + +} diff --git a/src/main/java/com/snp/batch/jobs/sanction/batch/dto/ComplianceDto.java b/src/main/java/com/snp/batch/jobs/sanction/batch/dto/ComplianceDto.java new file mode 100644 index 0000000..d898e0f --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/sanction/batch/dto/ComplianceDto.java @@ -0,0 +1,121 @@ +package com.snp.batch.jobs.sanction.batch.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class ComplianceDto { + + @JsonProperty("shipEUSanctionList") + private Integer shipEUSanctionList; + + @JsonProperty("shipUNSanctionList") + private Integer shipUNSanctionList; + + @JsonProperty("lrimoShipNo") + private String lrimoShipNo; + + @JsonProperty("dateAmended") + private String dateAmended; // 수정일시 + + // 2. Compliance Status (Integer 타입은 0, 1, 2 등의 코드 및 null 값 처리) + @JsonProperty("legalOverall") + private Integer legalOverall; // 종합제재 + + @JsonProperty("shipBESSanctionList") + private Integer shipBESSanctionList; // 선박BES제재 + + @JsonProperty("shipDarkActivityIndicator") + private Integer shipDarkActivityIndicator; // 선박다크활동 + + @JsonProperty("shipDetailsNoLongerMaintained") + private Integer shipDetailsNoLongerMaintained; // 선박세부정보미유지 + + @JsonProperty("shipFlagDisputed") + private Integer shipFlagDisputed; // 선박국기논쟁 + + @JsonProperty("shipFlagSanctionedCountry") + private Integer shipFlagSanctionedCountry; // 선박국가제재 + + @JsonProperty("shipHistoricalFlagSanctionedCountry") + private Integer shipHistoricalFlagSanctionedCountry; // 선박국가제재이력 + + @JsonProperty("shipOFACNonSDNSanctionList") + private Integer shipOFACNonSDNSanctionList; // 선박OFAC비SDN제재 + + @JsonProperty("shipOFACSanctionList") + private Integer shipOFACSanctionList; // 선박OFAC제재 + + @JsonProperty("shipOFACAdvisoryList") + private Integer shipOFACAdvisoryList; // 선박OFAC주의 + + @JsonProperty("shipOwnerOFACSSIList") + private Integer shipOwnerOFACSSIList; // 선박소유자OFCS제재 + + @JsonProperty("shipOwnerAustralianSanctionList") + private Integer shipOwnerAustralianSanctionList; // 선박소유자AUS제재 + + @JsonProperty("shipOwnerBESSanctionList") + private Integer shipOwnerBESSanctionList; // 선박소유자BES제재 + + @JsonProperty("shipOwnerCanadianSanctionList") + private Integer shipOwnerCanadianSanctionList; // 선박소유자CAN제재 + + @JsonProperty("shipOwnerEUSanctionList") + private Integer shipOwnerEUSanctionList; // 선박소유자EU제재 + + @JsonProperty("shipOwnerFATFJurisdiction") + private Integer shipOwnerFATFJurisdiction; // 선박소유자FATF규제구역 + + @JsonProperty("shipOwnerHistoricalOFACSanctionedCountry") + private Integer shipOwnerHistoricalOFACSanctionedCountry; // 선박소유자OFAC제재이력 + + @JsonProperty("shipOwnerOFACSanctionList") + private Integer shipOwnerOFACSanctionList; // 선박소유자OFAC제재 + + @JsonProperty("shipOwnerOFACSanctionedCountry") + private Integer shipOwnerOFACSanctionedCountry; // 선박소유자OFAC제재국가 + + @JsonProperty("shipOwnerParentCompanyNonCompliance") + private Integer shipOwnerParentCompanyNonCompliance; // 선박소유자모회사비준수 + + @JsonProperty("shipOwnerParentFATFJurisdiction") + private Integer shipOwnerParentFATFJurisdiction; // 선박소유자모회사FATF규제구역 (JSON에 null 포함) + + @JsonProperty("shipOwnerParentOFACSanctionedCountry") + private Integer shipOwnerParentOFACSanctionedCountry; // 선박소유자모회사OFAC제재국가 (JSON에 null 포함) + + @JsonProperty("shipOwnerSwissSanctionList") + private Integer shipOwnerSwissSanctionList; // 선박소유자SWI제재 + + @JsonProperty("shipOwnerUAESanctionList") + private Integer shipOwnerUAESanctionList; // 선박소유자UAE제재 + + @JsonProperty("shipOwnerUNSanctionList") + private Integer shipOwnerUNSanctionList; // 선박소유자UN제재 + + @JsonProperty("shipSanctionedCountryPortCallLast12m") + private Integer shipSanctionedCountryPortCallLast12m; // 선박제재국가기항최종12M + + @JsonProperty("shipSanctionedCountryPortCallLast3m") + private Integer shipSanctionedCountryPortCallLast3m; // 선박제재국가기항최종3M + + @JsonProperty("shipSanctionedCountryPortCallLast6m") + private Integer shipSanctionedCountryPortCallLast6m; // 선박제재국가기항최종6M + + @JsonProperty("shipSecurityLegalDisputeEvent") + private Integer shipSecurityLegalDisputeEvent; // 선박보안법적분쟁이벤트 + + @JsonProperty("shipSTSPartnerNonComplianceLast12m") + private Integer shipSTSPartnerNonComplianceLast12m; // 선박STS파트너비준수12M + + @JsonProperty("shipSwissSanctionList") + private Integer shipSwissSanctionList; // 선박SWI제재 + +} diff --git a/src/main/java/com/snp/batch/jobs/sanction/batch/dto/ComplianceResponse.java b/src/main/java/com/snp/batch/jobs/sanction/batch/dto/ComplianceResponse.java new file mode 100644 index 0000000..1eb8e57 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/sanction/batch/dto/ComplianceResponse.java @@ -0,0 +1,17 @@ +package com.snp.batch.jobs.sanction.batch.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class ComplianceResponse { + + private List complianceDtoList; +} diff --git a/src/main/java/com/snp/batch/jobs/sanction/batch/entity/ComplianceEntity.java b/src/main/java/com/snp/batch/jobs/sanction/batch/entity/ComplianceEntity.java new file mode 100644 index 0000000..983876a --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/sanction/batch/entity/ComplianceEntity.java @@ -0,0 +1,90 @@ +package com.snp.batch.jobs.sanction.batch.entity; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.snp.batch.common.batch.entity.BaseEntity; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode(callSuper = true) +public class ComplianceEntity extends BaseEntity { + + private String lrimoShipNo; // LR/IMO번호 + + private String dateAmended; // 수정일시 + + // 2. Compliance Status (모든 필드는 DTO와 동일한 Integer 타입) + private Integer legalOverall; // 종합제재 + + private Integer shipBESSanctionList; // 선박BES제재 + + private Integer shipDarkActivityIndicator; // 선박다크활동 + + private Integer shipDetailsNoLongerMaintained; // 선박세부정보미유지 + + private Integer shipEUSanctionList; // 선박EU제재 + + private Integer shipFlagDisputed; // 선박국기논쟁 + + private Integer shipFlagSanctionedCountry; // 선박국가제재 + + private Integer shipHistoricalFlagSanctionedCountry; // 선박국가제재이력 + + private Integer shipOFACNonSDNSanctionList; // 선박OFAC비SDN제재 + + private Integer shipOFACSanctionList; // 선박OFAC제재 + + private Integer shipOFACAdvisoryList; // 선박OFAC주의 + + private Integer shipOwnerOFACSSIList; // 선박소유자OFCS제재 + + private Integer shipOwnerAustralianSanctionList; // 선박소유자AUS제재 + + private Integer shipOwnerBESSanctionList; // 선박소유자BES제재 + + private Integer shipOwnerCanadianSanctionList; // 선박소유자CAN제재 + + private Integer shipOwnerEUSanctionList; // 선박소유자EU제재 + + private Integer shipOwnerFATFJurisdiction; // 선박소유자FATF규제구역 + + private Integer shipOwnerHistoricalOFACSanctionedCountry; // 선박소유자OFAC제재이력 + + private Integer shipOwnerOFACSanctionList; // 선박소유자OFAC제재 + + private Integer shipOwnerOFACSanctionedCountry; // 선박소유자OFAC제재국가 + + private Integer shipOwnerParentCompanyNonCompliance; // 선박소유자모회사비준수 + + private Integer shipOwnerParentFATFJurisdiction; // 선박소유자모회사FATF규제구역 + + private Integer shipOwnerParentOFACSanctionedCountry; // 선박소유자모회사OFAC제재국가 + + private Integer shipOwnerSwissSanctionList; // 선박소유자SWI제재 + + private Integer shipOwnerUAESanctionList; // 선박소유자UAE제재 + + private Integer shipOwnerUNSanctionList; // 선박소유자UN제재 + + private Integer shipSanctionedCountryPortCallLast12m; // 선박제재국가기항최종12M + + private Integer shipSanctionedCountryPortCallLast3m; // 선박제재국가기항최종3M + + private Integer shipSanctionedCountryPortCallLast6m; // 선박제재국가기항최종6M + + private Integer shipSecurityLegalDisputeEvent; // 선박보안법적분쟁이벤트 + + private Integer shipSTSPartnerNonComplianceLast12m; // 선박STS파트너비준수12M + + private Integer shipSwissSanctionList; // 선박SWI제재 + + private Integer shipUNSanctionList; // 선박UN제재 + + +} diff --git a/src/main/java/com/snp/batch/jobs/sanction/batch/processor/ComplianceDataProcessor.java b/src/main/java/com/snp/batch/jobs/sanction/batch/processor/ComplianceDataProcessor.java new file mode 100644 index 0000000..6c3ea3f --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/sanction/batch/processor/ComplianceDataProcessor.java @@ -0,0 +1,60 @@ +package com.snp.batch.jobs.sanction.batch.processor; + +import com.snp.batch.common.batch.processor.BaseProcessor; +import com.snp.batch.jobs.sanction.batch.dto.ComplianceDto; +import com.snp.batch.jobs.sanction.batch.entity.ComplianceEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class ComplianceDataProcessor extends BaseProcessor { + @Override + protected ComplianceEntity processItem(ComplianceDto dto) throws Exception { + log.debug("AIS 최신 항적 데이터 처리 시작: imoNumber={}", dto.getLrimoShipNo()); + + ComplianceEntity entity = ComplianceEntity.builder() + // 1. Primary Keys + .lrimoShipNo(dto.getLrimoShipNo()) + .dateAmended(dto.getDateAmended()) + // 2. Compliance Status + .legalOverall(dto.getLegalOverall()) + .shipBESSanctionList(dto.getShipBESSanctionList()) + .shipDarkActivityIndicator(dto.getShipDarkActivityIndicator()) + .shipDetailsNoLongerMaintained(dto.getShipDetailsNoLongerMaintained()) + .shipEUSanctionList(dto.getShipEUSanctionList()) + .shipFlagDisputed(dto.getShipFlagDisputed()) + .shipFlagSanctionedCountry(dto.getShipFlagSanctionedCountry()) + .shipHistoricalFlagSanctionedCountry(dto.getShipHistoricalFlagSanctionedCountry()) + .shipOFACNonSDNSanctionList(dto.getShipOFACNonSDNSanctionList()) + .shipOFACSanctionList(dto.getShipOFACSanctionList()) + .shipOFACAdvisoryList(dto.getShipOFACAdvisoryList()) + .shipOwnerOFACSSIList(dto.getShipOwnerOFACSSIList()) + .shipOwnerAustralianSanctionList(dto.getShipOwnerAustralianSanctionList()) + .shipOwnerBESSanctionList(dto.getShipOwnerBESSanctionList()) + .shipOwnerCanadianSanctionList(dto.getShipOwnerCanadianSanctionList()) + .shipOwnerEUSanctionList(dto.getShipOwnerEUSanctionList()) + .shipOwnerFATFJurisdiction(dto.getShipOwnerFATFJurisdiction()) + .shipOwnerHistoricalOFACSanctionedCountry(dto.getShipOwnerHistoricalOFACSanctionedCountry()) + .shipOwnerOFACSanctionList(dto.getShipOwnerOFACSanctionList()) + .shipOwnerOFACSanctionedCountry(dto.getShipOwnerOFACSanctionedCountry()) + .shipOwnerParentCompanyNonCompliance(dto.getShipOwnerParentCompanyNonCompliance()) + .shipOwnerParentFATFJurisdiction(dto.getShipOwnerParentFATFJurisdiction()) + .shipOwnerParentOFACSanctionedCountry(dto.getShipOwnerParentOFACSanctionedCountry()) + .shipOwnerSwissSanctionList(dto.getShipOwnerSwissSanctionList()) + .shipOwnerUAESanctionList(dto.getShipOwnerUAESanctionList()) + .shipOwnerUNSanctionList(dto.getShipOwnerUNSanctionList()) + .shipSanctionedCountryPortCallLast12m(dto.getShipSanctionedCountryPortCallLast12m()) + .shipSanctionedCountryPortCallLast3m(dto.getShipSanctionedCountryPortCallLast3m()) + .shipSanctionedCountryPortCallLast6m(dto.getShipSanctionedCountryPortCallLast6m()) + .shipSecurityLegalDisputeEvent(dto.getShipSecurityLegalDisputeEvent()) + .shipSTSPartnerNonComplianceLast12m(dto.getShipSTSPartnerNonComplianceLast12m()) + .shipSwissSanctionList(dto.getShipSwissSanctionList()) + .shipUNSanctionList(dto.getShipUNSanctionList()) + .build(); + + log.debug("AIS 최신 항적 데이터 처리 완료: imoNumber={}", dto.getLrimoShipNo()); + + return entity; + } +} diff --git a/src/main/java/com/snp/batch/jobs/sanction/batch/reader/ComplianceDataReader.java b/src/main/java/com/snp/batch/jobs/sanction/batch/reader/ComplianceDataReader.java new file mode 100644 index 0000000..ac9efb5 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/sanction/batch/reader/ComplianceDataReader.java @@ -0,0 +1,142 @@ +package com.snp.batch.jobs.sanction.batch.reader; + +import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.sanction.batch.dto.ComplianceDto; +import com.snp.batch.jobs.sanction.batch.dto.ComplianceResponse; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.Collections; +import java.util.List; + +@Slf4j +public class ComplianceDataReader extends BaseApiReader { + + //TODO : + // 1. Core20 IMO_NUMBER 전체 조회 + // 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복) + // 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복) + + private final JdbcTemplate jdbcTemplate; + + private List allImoNumbers; + private int currentBatchIndex = 0; + private final int batchSize = 100; + + public ComplianceDataReader(WebClient webClient, JdbcTemplate jdbcTemplate) { + super(webClient); + this.jdbcTemplate = jdbcTemplate; + enableChunkMode(); // ✨ Chunk 모드 활성화 + } + + @Override + protected String getReaderName() { + return "ShipLastPositionDataReader"; + } + + @Override + protected String getApiPath() { + return "/RiskAndCompliance/CompliancesByImos"; + } + + private String getTargetTable(){ + return "snp_data.core20"; + } + private String GET_CORE_IMO_LIST = +// "SELECT ihslrorimoshipno FROM " + getTargetTable() + " ORDER BY ihslrorimoshipno"; + "select imo_number as ihslrorimoshipno from snp_data.ship_data order by imo_number"; + @Override + protected void beforeFetch(){ + log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName()); + + allImoNumbers = jdbcTemplate.queryForList(GET_CORE_IMO_LIST, String.class); + + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); + log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); + log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); + + updateApiCallStats(totalBatches, 0); + } + + @Override + protected List fetchNextBatch() throws Exception { + // 모든 배치 처리 완료 확인 + if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { + return null; // Job 종료 + } + + // 현재 배치의 시작/끝 인덱스 계산 + int startIndex = currentBatchIndex; + int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); + + // 현재 배치의 IMO 번호 추출 (100개) + List currentBatch = allImoNumbers.subList(startIndex, endIndex); + + int currentBatchNumber = (currentBatchIndex / batchSize) + 1; + int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); + + log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", + getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); + + try { + // IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...") + String imoParam = String.join(",", currentBatch); + + // API 호출 + List response = callAisApiWithBatch(imoParam); + + // 다음 배치로 인덱스 이동 + currentBatchIndex = endIndex; + + // 응답 처리 + if (response != null) { +// List targets = response; + log.info("[{}] 배치 {}/{} 완료: {} 건 조회", + getReaderName(), currentBatchNumber, totalBatches, response.size()); + + // API 호출 통계 업데이트 + updateApiCallStats(totalBatches, currentBatchNumber); + + // API 과부하 방지 (다음 배치 전 0.5초 대기) + if (currentBatchIndex < allImoNumbers.size()) { + Thread.sleep(500); + } + + return response; + } else { + log.warn("[{}] 배치 {}/{} 응답 없음", + getReaderName(), currentBatchNumber, totalBatches); + + // API 호출 통계 업데이트 (실패도 카운트) + updateApiCallStats(totalBatches, currentBatchNumber); + + return Collections.emptyList(); + } + + } catch (Exception e) { + log.error("[{}] 배치 {}/{} 처리 중 오류: {}", + getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e); + + // 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용) + currentBatchIndex = endIndex; + + // 빈 리스트 반환 (Job 계속 진행) + return Collections.emptyList(); + } + } + + private List callAisApiWithBatch(String imoNumbers) { + String url = getApiPath() + "?imos=" + imoNumbers; + log.debug("[{}] API 호출: {}", getReaderName(), url); + return webClient.get() + .uri(url) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() {}) + .block(); + } + +} diff --git a/src/main/java/com/snp/batch/jobs/sanction/batch/repository/ComplianceRepository.java b/src/main/java/com/snp/batch/jobs/sanction/batch/repository/ComplianceRepository.java new file mode 100644 index 0000000..e3270b3 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/sanction/batch/repository/ComplianceRepository.java @@ -0,0 +1,9 @@ +package com.snp.batch.jobs.sanction.batch.repository; + +import com.snp.batch.jobs.sanction.batch.entity.ComplianceEntity; + +import java.util.List; + +public interface ComplianceRepository { + void saveComplianceAll(List items); +} diff --git a/src/main/java/com/snp/batch/jobs/sanction/batch/repository/ComplianceRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/sanction/batch/repository/ComplianceRepositoryImpl.java new file mode 100644 index 0000000..378f872 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/sanction/batch/repository/ComplianceRepositoryImpl.java @@ -0,0 +1,166 @@ +package com.snp.batch.jobs.sanction.batch.repository; + +import com.snp.batch.common.batch.repository.BaseJdbcRepository; +import com.snp.batch.jobs.sanction.batch.entity.ComplianceEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; + +import java.sql.PreparedStatement; +import java.sql.Types; +import java.util.List; + +@Slf4j +@Repository("complianceRepository") +public class ComplianceRepositoryImpl extends BaseJdbcRepository implements ComplianceRepository { + + public ComplianceRepositoryImpl(JdbcTemplate jdbcTemplate) { + super(jdbcTemplate); + } + + @Override + protected String getTableName() { + return null; + } + + @Override + protected RowMapper getRowMapper() { + return null; + } + + @Override + protected Long extractId(ComplianceEntity entity) { + return null; + } + + @Override + protected String getInsertSql() { + return null; + } + + @Override + protected String getUpdateSql() { + return """ + INSERT INTO snp_data.compliance ( + lrimoshipno, dateamended, legaloverall, shipbessanctionlist, shipdarkactivityindicator, + shipdetailsnolongermaintained, shipeusanctionlist, shipflagdisputed, shipflagsanctionedcountry, + shiphistoricalflagsanctionedcountry, shipofacnonsdnsanctionlist, shipofacsanctionlist, + shipofacadvisorylist, shipownerofacssilist, shipowneraustraliansanctionlist, shipownerbessanctionlist, + shipownercanadiansanctionlist, shipownereusanctionlist, shipownerfatfjurisdiction, + shipownerhistoricalofacsanctionedcountry, shipownerofacsanctionlist, shipownerofacsanctionedcountry, + shipownerparentcompanynoncompliance, shipownerparentfatfjurisdiction, shipownerparentofacsanctionedcountry, + shipownerswisssanctionlist, shipowneruaesanctionlist, shipownerunsanctionlist, + shipsanctionedcountryportcalllast12m, shipsanctionedcountryportcalllast3m, shipsanctionedcountryportcalllast6m, + shipsecuritylegaldisputeevent, shipstspartnernoncompliancelast12m, shipswisssanctionlist, + shipunsanctionlist + ) + VALUES ( + ?, ?::timestamptz, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? + ) + ON CONFLICT (lrimoshipno, dateamended) + DO UPDATE SET + legaloverall = EXCLUDED.legaloverall, + shipbessanctionlist = EXCLUDED.shipbessanctionlist, + shipdarkactivityindicator = EXCLUDED.shipdarkactivityindicator, + shipdetailsnolongermaintained = EXCLUDED.shipdetailsnolongermaintained, + shipeusanctionlist = EXCLUDED.shipeusanctionlist, + shipflagdisputed = EXCLUDED.shipflagdisputed, + shipflagsanctionedcountry = EXCLUDED.shipflagsanctionedcountry, + shiphistoricalflagsanctionedcountry = EXCLUDED.shiphistoricalflagsanctionedcountry, + shipofacnonsdnsanctionlist = EXCLUDED.shipofacnonsdnsanctionlist, + shipofacsanctionlist = EXCLUDED.shipofacsanctionlist, + shipofacadvisorylist = EXCLUDED.shipofacadvisorylist, + shipownerofacssilist = EXCLUDED.shipownerofacssilist, + shipowneraustraliansanctionlist = EXCLUDED.shipowneraustraliansanctionlist, + shipownerbessanctionlist = EXCLUDED.shipownerbessanctionlist, + shipownercanadiansanctionlist = EXCLUDED.shipownercanadiansanctionlist, + shipownereusanctionlist = EXCLUDED.shipownereusanctionlist, + shipownerfatfjurisdiction = EXCLUDED.shipownerfatfjurisdiction, + shipownerhistoricalofacsanctionedcountry = EXCLUDED.shipownerhistoricalofacsanctionedcountry, + shipownerofacsanctionlist = EXCLUDED.shipownerofacsanctionlist, + shipownerofacsanctionedcountry = EXCLUDED.shipownerofacsanctionedcountry, + shipownerparentcompanynoncompliance = EXCLUDED.shipownerparentcompanynoncompliance, + shipownerparentfatfjurisdiction = EXCLUDED.shipownerparentfatfjurisdiction, + shipownerparentofacsanctionedcountry = EXCLUDED.shipownerparentofacsanctionedcountry, + shipownerswisssanctionlist = EXCLUDED.shipownerswisssanctionlist, + shipowneruaesanctionlist = EXCLUDED.shipowneruaesanctionlist, + shipownerunsanctionlist = EXCLUDED.shipownerunsanctionlist, + shipsanctionedcountryportcalllast12m = EXCLUDED.shipsanctionedcountryportcalllast12m, + shipsanctionedcountryportcalllast3m = EXCLUDED.shipsanctionedcountryportcalllast3m, + shipsanctionedcountryportcalllast6m = EXCLUDED.shipsanctionedcountryportcalllast6m, + shipsecuritylegaldisputeevent = EXCLUDED.shipsecuritylegaldisputeevent, + shipstspartnernoncompliancelast12m = EXCLUDED.shipstspartnernoncompliancelast12m, + shipswisssanctionlist = EXCLUDED.shipswisssanctionlist, + shipunsanctionlist = EXCLUDED.shipunsanctionlist; + """; + } + + @Override + protected void setInsertParameters(PreparedStatement ps, ComplianceEntity entity) throws Exception { + + } + + @Override + protected void setUpdateParameters(PreparedStatement ps, ComplianceEntity entity) throws Exception { + int idx = 1; + ps.setString(idx++, entity.getLrimoShipNo()); + ps.setString(idx++, entity.getDateAmended()); + ps.setObject(idx++, entity.getLegalOverall(), Types.INTEGER); + ps.setObject(idx++, entity.getShipBESSanctionList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipDarkActivityIndicator(), Types.INTEGER); + ps.setObject(idx++, entity.getShipDetailsNoLongerMaintained(), Types.INTEGER); + ps.setObject(idx++, entity.getShipEUSanctionList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipFlagDisputed(), Types.INTEGER); + ps.setObject(idx++, entity.getShipFlagSanctionedCountry(), Types.INTEGER); + ps.setObject(idx++, entity.getShipHistoricalFlagSanctionedCountry(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOFACNonSDNSanctionList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOFACSanctionList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOFACAdvisoryList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerOFACSSIList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerAustralianSanctionList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerBESSanctionList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerCanadianSanctionList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerEUSanctionList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerFATFJurisdiction(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerHistoricalOFACSanctionedCountry(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerOFACSanctionList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerOFACSanctionedCountry(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerParentCompanyNonCompliance(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerParentFATFJurisdiction(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerParentOFACSanctionedCountry(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerSwissSanctionList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerUAESanctionList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipOwnerUNSanctionList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipSanctionedCountryPortCallLast12m(), Types.INTEGER); + ps.setObject(idx++, entity.getShipSanctionedCountryPortCallLast3m(), Types.INTEGER); + ps.setObject(idx++, entity.getShipSanctionedCountryPortCallLast6m(), Types.INTEGER); + ps.setObject(idx++, entity.getShipSecurityLegalDisputeEvent(), Types.INTEGER); + ps.setObject(idx++, entity.getShipSTSPartnerNonComplianceLast12m(), Types.INTEGER); + ps.setObject(idx++, entity.getShipSwissSanctionList(), Types.INTEGER); + ps.setObject(idx++, entity.getShipUNSanctionList(), Types.INTEGER); + } + + @Override + protected String getEntityName() { + return "ComplianceEntity"; + } + + @Override + public void saveComplianceAll(List items) { + if (items == null || items.isEmpty()) { + return; + } + jdbcTemplate.batchUpdate(getUpdateSql(), items, items.size(), + (ps, entity) -> { + try { + setUpdateParameters(ps, entity); + } catch (Exception e) { + log.error("배치 수정 파라미터 설정 실패", e); + throw new RuntimeException(e); + } + }); + + log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size()); + } +} diff --git a/src/main/java/com/snp/batch/jobs/sanction/batch/writer/ComplianceDataWriter.java b/src/main/java/com/snp/batch/jobs/sanction/batch/writer/ComplianceDataWriter.java new file mode 100644 index 0000000..89b4c3e --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/sanction/batch/writer/ComplianceDataWriter.java @@ -0,0 +1,25 @@ +package com.snp.batch.jobs.sanction.batch.writer; + +import com.snp.batch.common.batch.writer.BaseWriter; +import com.snp.batch.jobs.sanction.batch.entity.ComplianceEntity; +import com.snp.batch.jobs.sanction.batch.repository.ComplianceRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Slf4j +@Component +public class ComplianceDataWriter extends BaseWriter { + + private final ComplianceRepository complianceRepository; + public ComplianceDataWriter(ComplianceRepository complianceRepository) { + super("complianceRepository"); + this.complianceRepository = complianceRepository; + } + + @Override + protected void writeItems(List items) throws Exception { + complianceRepository.saveComplianceAll(items); + } +} diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailImportJobConfig.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailImportJobConfig.java index 651a179..3e625a3 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailImportJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailImportJobConfig.java @@ -98,7 +98,7 @@ public class ShipDetailImportJobConfig extends BaseJobConfig return "/AisSvc.svc/AIS/GetTargetsByIMOsEnhanced"; } - @Override - protected String getApiBaseUrl() { - return "https://aisapi.maritime.spglobal.com"; - } - private String getTargetTable(){ - return "test_s_p.test_core20"; + return "snp_data.core20"; } private String GET_CORE_IMO_LIST = "SELECT ihslrorimoshipno FROM " + getTargetTable() + " ORDER BY ihslrorimoshipno"; diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipDetailRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipDetailRepositoryImpl.java index 96c35d1..247c082 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipDetailRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipDetailRepositoryImpl.java @@ -29,7 +29,7 @@ public class ShipDetailRepositoryImpl extends BaseJdbcRepository