From 1124c2e84a61bcb559edada151ec1bc0fc643001 Mon Sep 17 00:00:00 2001 From: Kim JiMyeung Date: Tue, 23 Dec 2025 09:42:50 +0900 Subject: [PATCH] =?UTF-8?q?risk,=20compliance=EC=9E=A1=20range=ED=98=95?= =?UTF-8?q?=ED=83=9C=EB=A1=9C=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/RiskImportRangeJobConfig.java | 94 +++++++++++++ .../batch/reader/RiskDataRangeReader.java | 120 ++++++++++++++++ .../batch/repository/RiskRepositoryImpl.java | 2 +- .../config/SanctionUpdateRangeJobConfig.java | 98 ++++++++++++++ .../reader/ComplianceDataRangeReader.java | 128 ++++++++++++++++++ .../repository/ComplianceRepositoryImpl.java | 2 +- 6 files changed, 442 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java create mode 100644 src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataRangeReader.java create mode 100644 src/main/java/com/snp/batch/jobs/sanction/batch/config/SanctionUpdateRangeJobConfig.java create mode 100644 src/main/java/com/snp/batch/jobs/sanction/batch/reader/ComplianceDataRangeReader.java diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java new file mode 100644 index 0000000..1ff38d5 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java @@ -0,0 +1,94 @@ +package com.snp.batch.jobs.risk.batch.config; + +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.jobs.risk.batch.dto.RiskDto; +import com.snp.batch.jobs.risk.batch.entity.RiskEntity; +import com.snp.batch.jobs.risk.batch.processor.RiskDataProcessor; +import com.snp.batch.jobs.risk.batch.reader.RiskDataRangeReader; +import com.snp.batch.jobs.risk.batch.reader.RiskDataReader; +import com.snp.batch.jobs.risk.batch.writer.RiskDataWriter; +import com.snp.batch.jobs.sanction.batch.reader.ComplianceDataRangeReader; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; + +@Slf4j +@Configuration +public class RiskImportRangeJobConfig extends BaseJobConfig { + private final WebClient maritimeServiceApiWebClient; + private final RiskDataProcessor riskDataProcessor; + private final RiskDataWriter riskDataWriter; + private final RiskDataRangeReader riskDataRangeReader; + + @Override + protected int getChunkSize() { + return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정 + } + public RiskImportRangeJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + RiskDataProcessor riskDataProcessor, + RiskDataWriter riskDataWriter, + JdbcTemplate jdbcTemplate, + @Qualifier("maritimeServiceApiWebClient")WebClient maritimeServiceApiWebClient, RiskDataRangeReader riskDataRangeReader) { + super(jobRepository, transactionManager); + this.maritimeServiceApiWebClient = maritimeServiceApiWebClient; + this.riskDataProcessor = riskDataProcessor; + this.riskDataWriter = riskDataWriter; + this.riskDataRangeReader = riskDataRangeReader; + } + + @Override + protected String getJobName() { + return "RiskRangeImportJob"; + } + + @Override + protected String getStepName() { + return "RiskRangeImportStep"; + } + + @Override + protected ItemReader createReader() { + return riskDataRangeReader; + } + @Bean + @StepScope + public RiskDataRangeReader riskDataRangeReader( + @Value("#{jobParameters['fromDate']}") String startDate, + @Value("#{jobParameters['toDate']}") String stopDate + ) { + return new RiskDataRangeReader(maritimeServiceApiWebClient, startDate, stopDate); + } + + @Override + protected ItemProcessor createProcessor() { + return riskDataProcessor; + } + + @Override + protected ItemWriter createWriter() { return riskDataWriter; } + + @Bean(name = "RiskRangeImportJob") + public Job riskRangeImportJob() { + return job(); + } + + @Bean(name = "RiskRangeImportStep") + public Step riskRangeImportStep() { + return step(); + } + +} diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataRangeReader.java b/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataRangeReader.java new file mode 100644 index 0000000..a29dd09 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataRangeReader.java @@ -0,0 +1,120 @@ +package com.snp.batch.jobs.risk.batch.reader; + +import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.risk.batch.dto.RiskDto; +import com.snp.batch.jobs.sanction.batch.dto.ComplianceDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.reactive.function.client.WebClient; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.List; + +@Slf4j +public class RiskDataRangeReader extends BaseApiReader { + + //TODO : + // 1. Core20 IMO_NUMBER 전체 조회 + // 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복) + // 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복) + + private List allData; + private int currentBatchIndex = 0; + private final int batchSize = 100; + private String fromDate; + private String toDate; + public RiskDataRangeReader(WebClient webClient, + @Value("#{jobParameters['fromDate']}") String fromDate, + @Value("#{jobParameters['toDate']}") String toDate) { + super(webClient); + + // 날짜가 없으면 전날 하루 기준 + if (fromDate == null || fromDate.isBlank() || + toDate == null || toDate.isBlank()) { + + LocalDate yesterday = LocalDate.now().minusDays(1); + this.fromDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; + this.toDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; + } else { + this.fromDate = fromDate; + this.toDate = toDate; + } + + enableChunkMode(); + } + + @Override + protected String getReaderName() { + return "riskDataRangeReader"; + } + + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allData = null; + } + + @Override + protected String getApiPath() { + return "/RiskAndCompliance/UpdatedRiskList"; + } + + @Override + protected void beforeFetch(){ + log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), fromDate, toDate); + } + + @Override + protected List fetchNextBatch() throws Exception { + // 모든 배치 처리 완료 확인 + if (allData == null) { + log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), fromDate, toDate); + allData = callApiWithBatch(fromDate, toDate); + + if (allData == null || allData.isEmpty()) { + log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); + return null; + } + + log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), batchSize); + } + + // 2) 이미 끝까지 읽었으면 종료 + if (currentBatchIndex >= allData.size()) { + log.info("[{}] 모든 배치 처리 완료", getReaderName()); + return null; + } + + // 3) 이번 배치의 end 계산 + int end = Math.min(currentBatchIndex + batchSize, allData.size()); + + // 4) 현재 batch 리스트 잘라서 반환 + List batch = allData.subList(currentBatchIndex, end); + + int batchNum = (currentBatchIndex / batchSize) + 1; + int totalBatches = (int) Math.ceil((double) allData.size() / batchSize); + + log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), batchNum, totalBatches, batch.size()); + + // 다음 batch 인덱스 이동 + currentBatchIndex = end; + updateApiCallStats(totalBatches, batchNum); + + return batch; + } + + private List callApiWithBatch(String fromDate, String stopDate) { + String url = getApiPath() + "?fromDate=" + fromDate +"&stopDate=" + stopDate; + log.debug("[{}] API 호출: {}", getReaderName(), url); + return webClient.get() + .uri(url) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() {}) + .block(); + } + +} diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java index a5c8695..a6a07ff 100644 --- a/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java @@ -41,7 +41,7 @@ public class RiskRepositoryImpl extends BaseJdbcRepository imp @Override protected String getUpdateSql() { return """ - INSERT INTO snp_data.risk ( + INSERT INTO new_snp.risk ( lrno, lastupdated, riskdatamaintained, dayssincelastseenonais, dayssincelastseenonaisnarrative, daysunderais, daysunderaisnarrative, imocorrectonais, imocorrectonaisnarrative, sailingundername, sailingundernamenarrative, anomalousmessagesfrommmsi, anomalousmessagesfrommmsinarrative, diff --git a/src/main/java/com/snp/batch/jobs/sanction/batch/config/SanctionUpdateRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/sanction/batch/config/SanctionUpdateRangeJobConfig.java new file mode 100644 index 0000000..4da073f --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/sanction/batch/config/SanctionUpdateRangeJobConfig.java @@ -0,0 +1,98 @@ +package com.snp.batch.jobs.sanction.batch.config; + +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.jobs.sanction.batch.dto.ComplianceDto; +import com.snp.batch.jobs.sanction.batch.entity.ComplianceEntity; +import com.snp.batch.jobs.sanction.batch.processor.ComplianceDataProcessor; +import com.snp.batch.jobs.sanction.batch.reader.ComplianceDataRangeReader; +import com.snp.batch.jobs.sanction.batch.reader.ComplianceDataReader; +import com.snp.batch.jobs.sanction.batch.writer.ComplianceDataWriter; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.reader.AnchorageCallsRangeReader; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; + +@Slf4j +@Configuration +public class SanctionUpdateRangeJobConfig extends BaseJobConfig { + private final JdbcTemplate jdbcTemplate; + private final WebClient maritimeServiceApiWebClient; + private final ComplianceDataProcessor complianceDataProcessor; + private final ComplianceDataWriter complianceDataWriter; + private final ComplianceDataRangeReader complianceDataRangeReader; + + @Override + protected int getChunkSize() { + return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정 + } + public SanctionUpdateRangeJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + ComplianceDataProcessor complianceDataProcessor, + ComplianceDataWriter complianceDataWriter, + JdbcTemplate jdbcTemplate, + @Qualifier("maritimeServiceApiWebClient")WebClient maritimeServiceApiWebClient, ComplianceDataRangeReader complianceDataRangeReader) { + super(jobRepository, transactionManager); + this.jdbcTemplate = jdbcTemplate; + this.maritimeServiceApiWebClient = maritimeServiceApiWebClient; + this.complianceDataProcessor = complianceDataProcessor; + this.complianceDataWriter = complianceDataWriter; + this.complianceDataRangeReader = complianceDataRangeReader; + } + + @Override + protected String getJobName() { + return "SanctionRangeUpdateJob"; + } + + @Override + protected String getStepName() { + return "SanctionRangeUpdateStep"; + } + + @Override + protected ItemReader createReader() { + return complianceDataRangeReader; + } + + @Bean + @StepScope + public ComplianceDataRangeReader complianceDataRangeReader( + @Value("#{jobParameters['fromDate']}") String startDate, + @Value("#{jobParameters['toDate']}") String stopDate + ) { + return new ComplianceDataRangeReader(maritimeServiceApiWebClient, startDate, stopDate); + } + @Override + protected ItemProcessor createProcessor() { + return complianceDataProcessor; + } + + @Override + protected ItemWriter createWriter() { + return complianceDataWriter; + } + + @Bean(name = "SanctionRangeUpdateJob") + public Job sanctionRangeUpdateJob() { + return job(); + } + + @Bean(name = "SanctionRangeUpdateStep") + public Step sanctionRangeUpdateStep() { + return step(); + } + +} diff --git a/src/main/java/com/snp/batch/jobs/sanction/batch/reader/ComplianceDataRangeReader.java b/src/main/java/com/snp/batch/jobs/sanction/batch/reader/ComplianceDataRangeReader.java new file mode 100644 index 0000000..60f3677 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/sanction/batch/reader/ComplianceDataRangeReader.java @@ -0,0 +1,128 @@ +package com.snp.batch.jobs.sanction.batch.reader; + +import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.sanction.batch.dto.ComplianceDto; +import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.dto.AnchorageCallsDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.reactive.function.client.WebClient; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.List; + +@Slf4j +public class ComplianceDataRangeReader extends BaseApiReader { + + //TODO : + // 1. Core20 IMO_NUMBER 전체 조회 + // 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복) + // 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복) + + //private final JdbcTemplate jdbcTemplate; + + private List allData; + private int currentBatchIndex = 0; + private final int batchSize = 100; + private String fromDate; + private String toDate; + public ComplianceDataRangeReader(WebClient webClient, + @Value("#{jobParameters['fromDate']}") String fromDate, + @Value("#{jobParameters['toDate']}") String toDate) { + super(webClient); + + // 날짜가 없으면 전날 하루 기준 + if (fromDate == null || fromDate.isBlank() || + toDate == null || toDate.isBlank()) { + + LocalDate yesterday = LocalDate.now().minusDays(1); + this.fromDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; + this.toDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; + } else { + this.fromDate = fromDate; + this.toDate = toDate; + } + + enableChunkMode(); + } + + @Override + protected String getReaderName() { + return "ComplianceDataReader"; + } + + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allData = null; + } + + @Override + protected String getApiPath() { + return "/RiskAndCompliance/UpdatedComplianceList"; + } + + private String getTargetTable(){ + return "snp_data.core20"; + } + private String GET_CORE_IMO_LIST = +// "SELECT ihslrorimoshipno FROM " + getTargetTable() + " ORDER BY ihslrorimoshipno"; + "select imo_number as ihslrorimoshipno from snp_data.ship_data order by imo_number"; + @Override + protected void beforeFetch(){ + log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), fromDate, toDate); + } + + @Override + protected List fetchNextBatch() throws Exception { + // 모든 배치 처리 완료 확인 + if (allData == null) { + log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), fromDate, toDate); + allData = callApiWithBatch(fromDate, toDate); + + if (allData == null || allData.isEmpty()) { + log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); + return null; + } + + log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), batchSize); + } + + // 2) 이미 끝까지 읽었으면 종료 + if (currentBatchIndex >= allData.size()) { + log.info("[{}] 모든 배치 처리 완료", getReaderName()); + return null; + } + + // 3) 이번 배치의 end 계산 + int end = Math.min(currentBatchIndex + batchSize, allData.size()); + + // 4) 현재 batch 리스트 잘라서 반환 + List batch = allData.subList(currentBatchIndex, end); + + int batchNum = (currentBatchIndex / batchSize) + 1; + int totalBatches = (int) Math.ceil((double) allData.size() / batchSize); + + log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), batchNum, totalBatches, batch.size()); + + // 다음 batch 인덱스 이동 + currentBatchIndex = end; + updateApiCallStats(totalBatches, batchNum); + + return batch; + } + + private List callApiWithBatch(String fromDate, String stopDate) { + String url = getApiPath() + "?fromDate=" + fromDate +"&stopDate=" + stopDate; + log.debug("[{}] API 호출: {}", getReaderName(), url); + return webClient.get() + .uri(url) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() {}) + .block(); + } + +} diff --git a/src/main/java/com/snp/batch/jobs/sanction/batch/repository/ComplianceRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/sanction/batch/repository/ComplianceRepositoryImpl.java index db90923..e4ace50 100644 --- a/src/main/java/com/snp/batch/jobs/sanction/batch/repository/ComplianceRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/sanction/batch/repository/ComplianceRepositoryImpl.java @@ -42,7 +42,7 @@ public class ComplianceRepositoryImpl extends BaseJdbcRepository