risk, compliance잡 range형태로 수정

This commit is contained in:
Kim JiMyeung 2025-12-23 09:42:50 +09:00
부모 75531ab5e5
커밋 1124c2e84a
6개의 변경된 파일442개의 추가작업 그리고 2개의 파일을 삭제

파일 보기

@ -0,0 +1,94 @@
package com.snp.batch.jobs.risk.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.risk.batch.dto.RiskDto;
import com.snp.batch.jobs.risk.batch.entity.RiskEntity;
import com.snp.batch.jobs.risk.batch.processor.RiskDataProcessor;
import com.snp.batch.jobs.risk.batch.reader.RiskDataRangeReader;
import com.snp.batch.jobs.risk.batch.reader.RiskDataReader;
import com.snp.batch.jobs.risk.batch.writer.RiskDataWriter;
import com.snp.batch.jobs.sanction.batch.reader.ComplianceDataRangeReader;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
@Slf4j
@Configuration
public class RiskImportRangeJobConfig extends BaseJobConfig<RiskDto, RiskEntity> {
private final WebClient maritimeServiceApiWebClient;
private final RiskDataProcessor riskDataProcessor;
private final RiskDataWriter riskDataWriter;
private final RiskDataRangeReader riskDataRangeReader;
@Override
protected int getChunkSize() {
return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
}
public RiskImportRangeJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
RiskDataProcessor riskDataProcessor,
RiskDataWriter riskDataWriter,
JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient")WebClient maritimeServiceApiWebClient, RiskDataRangeReader riskDataRangeReader) {
super(jobRepository, transactionManager);
this.maritimeServiceApiWebClient = maritimeServiceApiWebClient;
this.riskDataProcessor = riskDataProcessor;
this.riskDataWriter = riskDataWriter;
this.riskDataRangeReader = riskDataRangeReader;
}
@Override
protected String getJobName() {
return "RiskRangeImportJob";
}
@Override
protected String getStepName() {
return "RiskRangeImportStep";
}
@Override
protected ItemReader<RiskDto> createReader() {
return riskDataRangeReader;
}
@Bean
@StepScope
public RiskDataRangeReader riskDataRangeReader(
@Value("#{jobParameters['fromDate']}") String startDate,
@Value("#{jobParameters['toDate']}") String stopDate
) {
return new RiskDataRangeReader(maritimeServiceApiWebClient, startDate, stopDate);
}
@Override
protected ItemProcessor<RiskDto, RiskEntity> createProcessor() {
return riskDataProcessor;
}
@Override
protected ItemWriter<RiskEntity> createWriter() { return riskDataWriter; }
@Bean(name = "RiskRangeImportJob")
public Job riskRangeImportJob() {
return job();
}
@Bean(name = "RiskRangeImportStep")
public Step riskRangeImportStep() {
return step();
}
}

파일 보기

@ -0,0 +1,120 @@
package com.snp.batch.jobs.risk.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.risk.batch.dto.RiskDto;
import com.snp.batch.jobs.sanction.batch.dto.ComplianceDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
@Slf4j
public class RiskDataRangeReader extends BaseApiReader<RiskDto> {
//TODO :
// 1. Core20 IMO_NUMBER 전체 조회
// 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복)
// 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복)
private List<RiskDto> allData;
private int currentBatchIndex = 0;
private final int batchSize = 100;
private String fromDate;
private String toDate;
public RiskDataRangeReader(WebClient webClient,
@Value("#{jobParameters['fromDate']}") String fromDate,
@Value("#{jobParameters['toDate']}") String toDate) {
super(webClient);
// 날짜가 없으면 전날 하루 기준
if (fromDate == null || fromDate.isBlank() ||
toDate == null || toDate.isBlank()) {
LocalDate yesterday = LocalDate.now().minusDays(1);
this.fromDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
this.toDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
} else {
this.fromDate = fromDate;
this.toDate = toDate;
}
enableChunkMode();
}
@Override
protected String getReaderName() {
return "riskDataRangeReader";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allData = null;
}
@Override
protected String getApiPath() {
return "/RiskAndCompliance/UpdatedRiskList";
}
@Override
protected void beforeFetch(){
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), fromDate, toDate);
}
@Override
protected List<RiskDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allData == null) {
log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), fromDate, toDate);
allData = callApiWithBatch(fromDate, toDate);
if (allData == null || allData.isEmpty()) {
log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
return null;
}
log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), batchSize);
}
// 2) 이미 끝까지 읽었으면 종료
if (currentBatchIndex >= allData.size()) {
log.info("[{}] 모든 배치 처리 완료", getReaderName());
return null;
}
// 3) 이번 배치의 end 계산
int end = Math.min(currentBatchIndex + batchSize, allData.size());
// 4) 현재 batch 리스트 잘라서 반환
List<RiskDto> batch = allData.subList(currentBatchIndex, end);
int batchNum = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), batchNum, totalBatches, batch.size());
// 다음 batch 인덱스 이동
currentBatchIndex = end;
updateApiCallStats(totalBatches, batchNum);
return batch;
}
private List<RiskDto> callApiWithBatch(String fromDate, String stopDate) {
String url = getApiPath() + "?fromDate=" + fromDate +"&stopDate=" + stopDate;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<List<RiskDto>>() {})
.block();
}
}

파일 보기

@ -41,7 +41,7 @@ public class RiskRepositoryImpl extends BaseJdbcRepository<RiskEntity, Long> imp
@Override @Override
protected String getUpdateSql() { protected String getUpdateSql() {
return """ return """
INSERT INTO snp_data.risk ( INSERT INTO new_snp.risk (
lrno, lastupdated, riskdatamaintained, dayssincelastseenonais, dayssincelastseenonaisnarrative, lrno, lastupdated, riskdatamaintained, dayssincelastseenonais, dayssincelastseenonaisnarrative,
daysunderais, daysunderaisnarrative, imocorrectonais, imocorrectonaisnarrative, sailingundername, daysunderais, daysunderaisnarrative, imocorrectonais, imocorrectonaisnarrative, sailingundername,
sailingundernamenarrative, anomalousmessagesfrommmsi, anomalousmessagesfrommmsinarrative, sailingundernamenarrative, anomalousmessagesfrommmsi, anomalousmessagesfrommmsinarrative,

파일 보기

@ -0,0 +1,98 @@
package com.snp.batch.jobs.sanction.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.sanction.batch.dto.ComplianceDto;
import com.snp.batch.jobs.sanction.batch.entity.ComplianceEntity;
import com.snp.batch.jobs.sanction.batch.processor.ComplianceDataProcessor;
import com.snp.batch.jobs.sanction.batch.reader.ComplianceDataRangeReader;
import com.snp.batch.jobs.sanction.batch.reader.ComplianceDataReader;
import com.snp.batch.jobs.sanction.batch.writer.ComplianceDataWriter;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.reader.AnchorageCallsRangeReader;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
@Slf4j
@Configuration
public class SanctionUpdateRangeJobConfig extends BaseJobConfig<ComplianceDto, ComplianceEntity> {
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeServiceApiWebClient;
private final ComplianceDataProcessor complianceDataProcessor;
private final ComplianceDataWriter complianceDataWriter;
private final ComplianceDataRangeReader complianceDataRangeReader;
@Override
protected int getChunkSize() {
return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
}
public SanctionUpdateRangeJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ComplianceDataProcessor complianceDataProcessor,
ComplianceDataWriter complianceDataWriter,
JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient")WebClient maritimeServiceApiWebClient, ComplianceDataRangeReader complianceDataRangeReader) {
super(jobRepository, transactionManager);
this.jdbcTemplate = jdbcTemplate;
this.maritimeServiceApiWebClient = maritimeServiceApiWebClient;
this.complianceDataProcessor = complianceDataProcessor;
this.complianceDataWriter = complianceDataWriter;
this.complianceDataRangeReader = complianceDataRangeReader;
}
@Override
protected String getJobName() {
return "SanctionRangeUpdateJob";
}
@Override
protected String getStepName() {
return "SanctionRangeUpdateStep";
}
@Override
protected ItemReader<ComplianceDto> createReader() {
return complianceDataRangeReader;
}
@Bean
@StepScope
public ComplianceDataRangeReader complianceDataRangeReader(
@Value("#{jobParameters['fromDate']}") String startDate,
@Value("#{jobParameters['toDate']}") String stopDate
) {
return new ComplianceDataRangeReader(maritimeServiceApiWebClient, startDate, stopDate);
}
@Override
protected ItemProcessor<ComplianceDto, ComplianceEntity> createProcessor() {
return complianceDataProcessor;
}
@Override
protected ItemWriter<ComplianceEntity> createWriter() {
return complianceDataWriter;
}
@Bean(name = "SanctionRangeUpdateJob")
public Job sanctionRangeUpdateJob() {
return job();
}
@Bean(name = "SanctionRangeUpdateStep")
public Step sanctionRangeUpdateStep() {
return step();
}
}

파일 보기

@ -0,0 +1,128 @@
package com.snp.batch.jobs.sanction.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.sanction.batch.dto.ComplianceDto;
import com.snp.batch.jobs.shipMovementAnchorageCalls.batch.dto.AnchorageCallsDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
@Slf4j
public class ComplianceDataRangeReader extends BaseApiReader<ComplianceDto> {
//TODO :
// 1. Core20 IMO_NUMBER 전체 조회
// 2. IMO번호에 대한 마지막 AIS 신호 요청 (1회 최대 5000개 : Chunk 단위로 반복)
// 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복)
//private final JdbcTemplate jdbcTemplate;
private List<ComplianceDto> allData;
private int currentBatchIndex = 0;
private final int batchSize = 100;
private String fromDate;
private String toDate;
public ComplianceDataRangeReader(WebClient webClient,
@Value("#{jobParameters['fromDate']}") String fromDate,
@Value("#{jobParameters['toDate']}") String toDate) {
super(webClient);
// 날짜가 없으면 전날 하루 기준
if (fromDate == null || fromDate.isBlank() ||
toDate == null || toDate.isBlank()) {
LocalDate yesterday = LocalDate.now().minusDays(1);
this.fromDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
this.toDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
} else {
this.fromDate = fromDate;
this.toDate = toDate;
}
enableChunkMode();
}
@Override
protected String getReaderName() {
return "ComplianceDataReader";
}
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allData = null;
}
@Override
protected String getApiPath() {
return "/RiskAndCompliance/UpdatedComplianceList";
}
private String getTargetTable(){
return "snp_data.core20";
}
private String GET_CORE_IMO_LIST =
// "SELECT ihslrorimoshipno FROM " + getTargetTable() + " ORDER BY ihslrorimoshipno";
"select imo_number as ihslrorimoshipno from snp_data.ship_data order by imo_number";
@Override
protected void beforeFetch(){
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), fromDate, toDate);
}
@Override
protected List<ComplianceDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
if (allData == null) {
log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), fromDate, toDate);
allData = callApiWithBatch(fromDate, toDate);
if (allData == null || allData.isEmpty()) {
log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
return null;
}
log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), batchSize);
}
// 2) 이미 끝까지 읽었으면 종료
if (currentBatchIndex >= allData.size()) {
log.info("[{}] 모든 배치 처리 완료", getReaderName());
return null;
}
// 3) 이번 배치의 end 계산
int end = Math.min(currentBatchIndex + batchSize, allData.size());
// 4) 현재 batch 리스트 잘라서 반환
List<ComplianceDto> batch = allData.subList(currentBatchIndex, end);
int batchNum = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) allData.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), batchNum, totalBatches, batch.size());
// 다음 batch 인덱스 이동
currentBatchIndex = end;
updateApiCallStats(totalBatches, batchNum);
return batch;
}
private List<ComplianceDto> callApiWithBatch(String fromDate, String stopDate) {
String url = getApiPath() + "?fromDate=" + fromDate +"&stopDate=" + stopDate;
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<List<ComplianceDto>>() {})
.block();
}
}

파일 보기

@ -42,7 +42,7 @@ public class ComplianceRepositoryImpl extends BaseJdbcRepository<ComplianceEntit
@Override @Override
protected String getUpdateSql() { protected String getUpdateSql() {
return """ return """
INSERT INTO snp_data.compliance ( INSERT INTO new_snp.compliance (
lrimoshipno, dateamended, legaloverall, shipbessanctionlist, shipdarkactivityindicator, lrimoshipno, dateamended, legaloverall, shipbessanctionlist, shipdarkactivityindicator,
shipdetailsnolongermaintained, shipeusanctionlist, shipflagdisputed, shipflagsanctionedcountry, shipdetailsnolongermaintained, shipeusanctionlist, shipflagdisputed, shipflagsanctionedcountry,
shiphistoricalflagsanctionedcountry, shipofacnonsdnsanctionlist, shipofacsanctionlist, shiphistoricalflagsanctionedcountry, shipofacnonsdnsanctionlist, shipofacsanctionlist,