💥 API 조회 기간 세팅 방식 변경 및 통일

This commit is contained in:
hyojin kim 2026-01-08 15:12:06 +09:00
부모 87a9217853
커밋 e1fa48768e
29개의 변경된 파일827개의 추가작업 그리고 936개의 파일을 삭제

파일 보기

@ -9,6 +9,8 @@ import org.springframework.data.jpa.domain.support.AuditingEntityListener;
import jakarta.persistence.*; import jakarta.persistence.*;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.OffsetDateTime;
@Entity @Entity
@Getter @Getter
@Setter @Setter
@ -21,7 +23,13 @@ public class BatchLastExecution {
private String apiKey; private String apiKey;
@Column(name = "LAST_SUCCESS_DATE", nullable = false) @Column(name = "LAST_SUCCESS_DATE", nullable = false)
private LocalDate lastSuccessDate; private LocalDateTime lastSuccessDate;
@Column(name = "RANGE_FROM_DATE", nullable = true)
private LocalDateTime rangeFromDate;
@Column(name = "RANGE_TO_DATE", nullable = true)
private LocalDateTime rangeToDate;
@CreatedDate @CreatedDate
@Column(name = "CREATED_AT", updatable = false, nullable = false) @Column(name = "CREATED_AT", updatable = false, nullable = false)
@ -31,7 +39,7 @@ public class BatchLastExecution {
@Column(name = "UPDATED_AT", nullable = false) @Column(name = "UPDATED_AT", nullable = false)
private LocalDateTime updatedAt; private LocalDateTime updatedAt;
public BatchLastExecution(String apiKey, LocalDate lastSuccessDate) { public BatchLastExecution(String apiKey, LocalDateTime lastSuccessDate) {
this.apiKey = apiKey; this.apiKey = apiKey;
this.lastSuccessDate = lastSuccessDate; this.lastSuccessDate = lastSuccessDate;
} }

파일 보기

@ -0,0 +1,9 @@
package com.snp.batch.global.projection;
import java.time.LocalDateTime;
public interface DateRangeProjection {
LocalDateTime getLastSuccessDate();
LocalDateTime getRangeFromDate();
LocalDateTime getRangeToDate();
}

파일 보기

@ -1,6 +1,7 @@
package com.snp.batch.global.repository; package com.snp.batch.global.repository;
import com.snp.batch.global.model.BatchLastExecution; import com.snp.batch.global.model.BatchLastExecution;
import com.snp.batch.global.projection.DateRangeProjection;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query; import org.springframework.data.jpa.repository.Query;
@ -8,6 +9,8 @@ import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.util.Optional; import java.util.Optional;
@Repository @Repository
@ -21,8 +24,16 @@ public interface BatchLastExecutionRepository extends JpaRepository<BatchLastExe
@Query("SELECT b.lastSuccessDate FROM BatchLastExecution b WHERE b.apiKey = :apiKey") @Query("SELECT b.lastSuccessDate FROM BatchLastExecution b WHERE b.apiKey = :apiKey")
Optional<LocalDate> findLastSuccessDate(@Param("apiKey") String apiKey); Optional<LocalDate> findLastSuccessDate(@Param("apiKey") String apiKey);
// 2. findDateRangeByApiKey 함수 구현
/**
* API 키를 기준으로 범위 설정 날짜를 조회합니다.
* @param apiKey 조회할 API (: "PSC_IMPORT_API")
* @return 마지막 성공 일자 (LocalDate) 포함하는 Optional
*/
@Query("SELECT b.lastSuccessDate AS lastSuccessDate, b.rangeFromDate AS rangeFromDate, b.rangeToDate AS rangeToDate FROM BatchLastExecution b WHERE b.apiKey = :apiKey")
Optional<DateRangeProjection> findDateRangeByApiKey(@Param("apiKey") String apiKey);
// 2. updateLastSuccessDate 함수 구현 (직접 UPDATE 쿼리 사용) // 3. updateLastSuccessDate 함수 구현 (직접 UPDATE 쿼리 사용)
/** /**
* 특정 API 키의 마지막 성공 일자를 업데이트합니다. * 특정 API 키의 마지막 성공 일자를 업데이트합니다.
* *
@ -32,5 +43,5 @@ public interface BatchLastExecutionRepository extends JpaRepository<BatchLastExe
*/ */
@Modifying @Modifying
@Query("UPDATE BatchLastExecution b SET b.lastSuccessDate = :successDate WHERE b.apiKey = :apiKey") @Query("UPDATE BatchLastExecution b SET b.lastSuccessDate = :successDate WHERE b.apiKey = :apiKey")
int updateLastSuccessDate(@Param("apiKey") String apiKey, @Param("successDate") LocalDate successDate); int updateLastSuccessDate(@Param("apiKey") String apiKey, @Param("successDate") LocalDateTime successDate);
} }

파일 보기

@ -26,9 +26,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDateTime; import java.time.*;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Map; import java.util.Map;
@ -146,28 +144,41 @@ public class ComplianceImportRangeJobConfig extends BaseMultiStepJobConfig<Compl
return (contribution, chunkContext) -> { return (contribution, chunkContext) -> {
log.info(">>>>> Compliance History Value Change Manage 프로시저 호출 시작"); log.info(">>>>> Compliance History Value Change Manage 프로시저 호출 시작");
// 1. 입력 포맷 타겟 포맷 정의 // 1. 입력 포맷(UTC 'Z' 포함) 프로시저용 타겟 포맷 정의
DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SS'Z'"); DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSX");
DateTimeFormatter targetFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); DateTimeFormatter targetFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
Map<String, String> params = batchDateService.getRiskComplianceApiDateParams(getApiKey()); Map<String, String> params = batchDateService.getDateRangeWithTimezoneParams(getApiKey());
// 2. String -> LocalDateTime (또는 OffsetDateTime) -> String 변환
String rawFromDate = params.get("fromDate"); String rawFromDate = params.get("fromDate");
String rawToDate = params.get("toDate"); String rawToDate = params.get("toDate");
String startDt = LocalDateTime.parse(rawFromDate, inputFormatter).format(targetFormatter); // 2. UTC 문자열 -> OffsetDateTime -> Asia/Seoul 변환 -> LocalDateTime 추출
String endDt = LocalDateTime.parse(rawToDate, inputFormatter).format(targetFormatter); String startDt = convertToKstString(rawFromDate, inputFormatter, targetFormatter);
String endDt = convertToKstString(rawToDate, inputFormatter, targetFormatter);
log.info("Compliance History Value Change Manage 프로시저 변수 : 시작일: {}, 종료일: {}", startDt, endDt); log.info("Compliance History Value Change Manage 프로시저 변수 (KST 변환): 시작일: {}, 종료일: {}", startDt, endDt);
// 3. 프로시저 호출 (SQL 인젝션 방지를 위해 Prepared Statement 방식 권장) // 3. 프로시저 호출 (안전한 파라미터 바인딩 권장)
jdbcTemplate.execute(String.format("CALL new_snp.compliance_history_value_change_manage('%s', '%s')", startDt, endDt)); jdbcTemplate.update("CALL new_snp.compliance_history_value_change_manage(CAST(? AS TIMESTAMP), CAST(? AS TIMESTAMP))", startDt, endDt);
log.info(">>>>> Compliance History Value Change Manage 프로시저 호출 완료"); log.info(">>>>> Compliance History Value Change Manage 프로시저 호출 완료");
return RepeatStatus.FINISHED; return RepeatStatus.FINISHED;
}; };
} }
/**
* UTC 문자열을 한국 시간(KST) 문자열로 변환하는 헬퍼 메소드
*/
private String convertToKstString(String rawDate, DateTimeFormatter input, DateTimeFormatter target) {
if (rawDate == null) return null;
// 1. 문자열을 OffsetDateTime으로 파싱 (Z를 인식하여 UTC 시간으로 인지함)
return OffsetDateTime.parse(rawDate, input)
// 2. 시간대를 서울(+09:00) 변경 (값이 9시간 더해짐)
.atZoneSameInstant(ZoneId.of("Asia/Seoul"))
// 3. 프로시저 형식에 맞게 포맷팅
.format(target);
}
@Bean(name = "ComplianceHistoryValueChangeManageStep") @Bean(name = "ComplianceHistoryValueChangeManageStep")
public Step complianceHistoryValueChangeManageStep() { public Step complianceHistoryValueChangeManageStep() {
return new StepBuilder("ComplianceHistoryValueChangeManageStep", jobRepository) return new StepBuilder("ComplianceHistoryValueChangeManageStep", jobRepository)

파일 보기

@ -103,7 +103,7 @@ public class ComplianceDataRangeReader extends BaseApiReader<ComplianceDto> {
} }
private List<ComplianceDto> callApiWithBatch() { private List<ComplianceDto> callApiWithBatch() {
Map<String, String> params = batchDateService.getRiskComplianceApiDateParams(getApiKey()); Map<String, String> params = batchDateService.getDateRangeWithTimezoneParams(getApiKey());
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate"));
String url = getApiPath(); String url = getApiPath();

파일 보기

@ -62,12 +62,12 @@ public class EventImportJobConfig extends BaseMultiStepJobConfig<EventDetailDto,
@Override @Override
protected String getJobName() { protected String getJobName() {
return "eventImportJob"; return "EventImportJob";
} }
@Override @Override
protected String getStepName() { protected String getStepName() {
return "eventImportStep"; return "EventImportStep";
} }
@Override @Override
@ -91,12 +91,12 @@ public class EventImportJobConfig extends BaseMultiStepJobConfig<EventDetailDto,
@Override @Override
protected ItemWriter<EventDetailEntity> createWriter() { return eventDataWriter; } protected ItemWriter<EventDetailEntity> createWriter() { return eventDataWriter; }
@Bean(name = "eventImportJob") @Bean(name = "EventImportJob")
public Job eventImportJob() { public Job eventImportJob() {
return job(); return job();
} }
@Bean(name = "eventImportStep") @Bean(name = "EventImportStep")
public Step eventImportStep() { public Step eventImportStep() {
return step(); return step();
} }

파일 보기

@ -2,15 +2,11 @@ package com.snp.batch.jobs.event.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.event.batch.dto.*; import com.snp.batch.jobs.event.batch.dto.*;
import com.snp.batch.jobs.event.batch.dto.EventDetailDto;
import com.snp.batch.jobs.event.batch.entity.EventDetailEntity;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailComparisonData;
import com.snp.batch.service.BatchDateService; import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -193,10 +189,15 @@ public class EventDataReader extends BaseApiReader<EventDetailDto> {
} }
private EventResponse callEventApiWithBatch() { private EventResponse callEventApiWithBatch() {
Map<String, String> params = batchDateService.getShipUpdateApiDateParams(getApiKey()); Map<String, String> params = batchDateService.getDateRangeWithoutTimeParams(getApiKey());
String url = getApiPath(); String url = getApiPath();
log.info("[{}] API 호출: {}", getReaderName(), url);
log.info("[{}] Events API Date Range: {} → {}",
getReaderName(),
String.format("%s-%s-%s",params.get("fromYear"),params.get("fromMonth"),params.get("fromDay")),
String.format("%s-%s-%s",params.get("toYear"),params.get("toMonth"),params.get("toDay"))
);
return webClient.get() return webClient.get()
.uri(url, uriBuilder -> uriBuilder .uri(url, uriBuilder -> uriBuilder
@ -215,8 +216,6 @@ public class EventDataReader extends BaseApiReader<EventDetailDto> {
private EventDetailResponse callEventDetailApiWithBatch(Long eventId) { private EventDetailResponse callEventDetailApiWithBatch(Long eventId) {
String url = getEventDetailApiPath(); String url = getEventDetailApiPath();
log.info("[{}] API 호출: {}", getReaderName(), url);
return webClient.get() return webClient.get()
.uri(url, uriBuilder -> uriBuilder .uri(url, uriBuilder -> uriBuilder
// 맵에서 파라미터 값을 동적으로 가져와 세팅 // 맵에서 파라미터 값을 동적으로 가져와 세팅

파일 보기

@ -1,64 +1,63 @@
package com.snp.batch.jobs.movement.batch.config; package com.snp.batch.jobs.movement.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
import com.snp.batch.jobs.movement.batch.dto.AnchorageCallsDto; import com.snp.batch.jobs.movement.batch.dto.AnchorageCallsDto;
import com.snp.batch.jobs.movement.batch.entity.AnchorageCallsEntity; import com.snp.batch.jobs.movement.batch.entity.AnchorageCallsEntity;
import com.snp.batch.jobs.movement.batch.processor.AnchorageCallsProcessor; import com.snp.batch.jobs.movement.batch.processor.AnchorageCallsProcessor;
import com.snp.batch.jobs.movement.batch.writer.AnchorageCallsWriter;
import com.snp.batch.jobs.movement.batch.reader.AnchorageCallsRangeReader; import com.snp.batch.jobs.movement.batch.reader.AnchorageCallsRangeReader;
import com.snp.batch.jobs.movement.batch.writer.AnchorageCallsWriter;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* AnchorageCallsReader (ship_data Maritime API)
* (AnchorageCallsDto)
* AnchorageCallsProcessor
* (AnchorageCallsEntity)
* AnchorageCallsWriter
* (t_anchoragecall 테이블)
*/
@Slf4j @Slf4j
@Configuration @Configuration
public class AnchorageCallsRangeJobConfig extends BaseJobConfig<AnchorageCallsDto, AnchorageCallsEntity> { public class AnchorageCallsRangeJobConfig extends BaseMultiStepJobConfig<AnchorageCallsDto, AnchorageCallsEntity> {
private final AnchorageCallsProcessor anchorageCallsProcessor; private final AnchorageCallsProcessor anchorageCallsProcessor;
private final AnchorageCallsWriter anchorageCallsWriter; private final AnchorageCallsWriter anchorageCallsWriter;
private final AnchorageCallsRangeReader anchorageCallsRangeReader; private final AnchorageCallsRangeReader anchorageCallsRangeReader;
private final WebClient maritimeServiceApiWebClient;
private final JdbcTemplate jdbcTemplate;
private final BatchDateService batchDateService;
protected String getApiKey() {return "ANCHORAGE_CALLS_IMPORT_API";}
protected String getBatchUpdateSql() {
return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());}
public AnchorageCallsRangeJobConfig( public AnchorageCallsRangeJobConfig(
JobRepository jobRepository, JobRepository jobRepository,
PlatformTransactionManager transactionManager, PlatformTransactionManager transactionManager,
AnchorageCallsProcessor anchorageCallsProcessor, AnchorageCallsProcessor anchorageCallsProcessor,
AnchorageCallsWriter anchorageCallsWriter, AnchorageCallsWriter anchorageCallsWriter,
AnchorageCallsRangeReader anchorageCallsRangeReader AnchorageCallsRangeReader anchorageCallsRangeReader,
) { // ObjectMapper 주입 추가 @Qualifier("maritimeServiceApiWebClient")WebClient maritimeServiceApiWebClient,
JdbcTemplate jdbcTemplate,
BatchDateService batchDateService
) {
super(jobRepository, transactionManager); super(jobRepository, transactionManager);
this.anchorageCallsProcessor = anchorageCallsProcessor; this.anchorageCallsProcessor = anchorageCallsProcessor;
this.anchorageCallsWriter = anchorageCallsWriter; this.anchorageCallsWriter = anchorageCallsWriter;
this.anchorageCallsRangeReader = anchorageCallsRangeReader; this.anchorageCallsRangeReader = anchorageCallsRangeReader;
this.maritimeServiceApiWebClient = maritimeServiceApiWebClient;
this.jdbcTemplate = jdbcTemplate;
this.batchDateService = batchDateService;
} }
@Override @Override
@ -71,6 +70,14 @@ public class AnchorageCallsRangeJobConfig extends BaseJobConfig<AnchorageCallsDt
return "AnchorageCallsRangeImportStep"; return "AnchorageCallsRangeImportStep";
} }
@Override
protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder
.start(anchorageCallsRangeImportStep()) // 1단계: API 데이터 적재
.next(anchorageCallsLastExecutionUpdateStep()) // 2단계: 모두 완료 , BATCH_LAST_EXECUTION 마지막 성공일자 업데이트
.build();
}
@Override @Override
protected ItemReader<AnchorageCallsDto> createReader() { // 타입 변경 protected ItemReader<AnchorageCallsDto> createReader() { // 타입 변경
return anchorageCallsRangeReader; return anchorageCallsRangeReader;
@ -78,12 +85,8 @@ public class AnchorageCallsRangeJobConfig extends BaseJobConfig<AnchorageCallsDt
@Bean @Bean
@StepScope @StepScope
public AnchorageCallsRangeReader anchorageCallsReader( public AnchorageCallsRangeReader anchorageCallsReader() {
@Qualifier("maritimeServiceApiWebClient") WebClient webClient, return new AnchorageCallsRangeReader(maritimeServiceApiWebClient, batchDateService);
@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['stopDate']}") String stopDate
) {
return new AnchorageCallsRangeReader(webClient, startDate, stopDate);
} }
@Override @Override
@ -98,7 +101,7 @@ public class AnchorageCallsRangeJobConfig extends BaseJobConfig<AnchorageCallsDt
@Override @Override
protected int getChunkSize() { protected int getChunkSize() {
return 5000; // API에서 100개씩 가져오므로 chunk도 100으로 설정 return 5000;
} }
@Bean(name = "AnchorageCallsRangeImportJob") @Bean(name = "AnchorageCallsRangeImportJob")
@ -110,4 +113,25 @@ public class AnchorageCallsRangeJobConfig extends BaseJobConfig<AnchorageCallsDt
public Step anchorageCallsRangeImportStep() { public Step anchorageCallsRangeImportStep() {
return step(); return step();
} }
/**
* 2단계: 모든 스텝 성공 배치 실행 로그(날짜) 업데이트
*/
@Bean
public Tasklet anchorageCallsLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작");
jdbcTemplate.execute(getBatchUpdateSql());
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "AnchorageCallsLastExecutionUpdateStep")
public Step anchorageCallsLastExecutionUpdateStep() {
return new StepBuilder("AnchorageCallsLastExecutionUpdateStep", jobRepository)
.tasklet(anchorageCallsLastExecutionUpdateTasklet(), transactionManager)
.build();
}
} }

파일 보기

@ -1,72 +1,62 @@
package com.snp.batch.jobs.movement.batch.config; package com.snp.batch.jobs.movement.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper; import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.movement.batch.dto.BerthCallsDto; import com.snp.batch.jobs.movement.batch.dto.BerthCallsDto;
import com.snp.batch.jobs.movement.batch.entiity.BerthCallsEntity; import com.snp.batch.jobs.movement.batch.entiity.BerthCallsEntity;
import com.snp.batch.jobs.movement.batch.processor.BerthCallsProcessor; import com.snp.batch.jobs.movement.batch.processor.BerthCallsProcessor;
import com.snp.batch.jobs.movement.batch.reader.BerthCallsRangeReader; import com.snp.batch.jobs.movement.batch.reader.BerthCallsRangeReader;
import com.snp.batch.jobs.movement.batch.writer.BerthCallsWriter; import com.snp.batch.jobs.movement.batch.writer.BerthCallsWriter;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* ShipMovementReader (ship_data Maritime API)
* (PortCallDto)
* ShipMovementProcessor
* (ShipMovementEntity)
* ShipDetailDataWriter
* (ship_movement 테이블)
*/
@Slf4j @Slf4j
@Configuration @Configuration
public class BerthCallsRangJobConfig extends BaseJobConfig<BerthCallsDto, BerthCallsEntity> { public class BerthCallsRangJobConfig extends BaseMultiStepJobConfig<BerthCallsDto, BerthCallsEntity> {
private final BerthCallsProcessor berthCallsProcessor; private final BerthCallsProcessor berthCallsProcessor;
private final BerthCallsWriter berthCallsWriter; private final BerthCallsWriter berthCallsWriter;
private final BerthCallsRangeReader berthCallsRangeReader; private final BerthCallsRangeReader berthCallsRangeReader;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient; private final WebClient maritimeApiWebClient;
private final ObjectMapper objectMapper; // ObjectMapper 주입 추가 private final JdbcTemplate jdbcTemplate;
private final BatchDateService batchDateService;
protected String getApiKey() {return "BERTH_CALLS_IMPORT_API";}
protected String getBatchUpdateSql() {
return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());}
public BerthCallsRangJobConfig( public BerthCallsRangJobConfig(
JobRepository jobRepository, JobRepository jobRepository,
PlatformTransactionManager transactionManager, PlatformTransactionManager transactionManager,
BerthCallsProcessor berthCallsProcessor, BerthCallsProcessor berthCallsProcessor,
BerthCallsWriter berthCallsWriter, BerthCallsRangeReader berthCallsRangeReader, JdbcTemplate jdbcTemplate, BerthCallsWriter berthCallsWriter,
BerthCallsRangeReader berthCallsRangeReader,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient, @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient,
ObjectMapper objectMapper) { // ObjectMapper 주입 추가 JdbcTemplate jdbcTemplate,
BatchDateService batchDateService
) {
super(jobRepository, transactionManager); super(jobRepository, transactionManager);
this.berthCallsProcessor = berthCallsProcessor; this.berthCallsProcessor = berthCallsProcessor;
this.berthCallsWriter = berthCallsWriter; this.berthCallsWriter = berthCallsWriter;
this.berthCallsRangeReader = berthCallsRangeReader; this.berthCallsRangeReader = berthCallsRangeReader;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient; this.maritimeApiWebClient = maritimeApiWebClient;
this.objectMapper = objectMapper; // ObjectMapper 초기화 this.jdbcTemplate = jdbcTemplate;
this.batchDateService = batchDateService;
} }
@Override @Override
@ -79,17 +69,22 @@ public class BerthCallsRangJobConfig extends BaseJobConfig<BerthCallsDto, BerthC
return "BerthCallsRangeImportStep"; return "BerthCallsRangeImportStep";
} }
@Override
protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder
.start(berthCallsRangeImportStep()) // 1단계: API 데이터 적재
.next(berthCallsLastExecutionUpdateStep()) // 2단계: 모두 완료 , BATCH_LAST_EXECUTION 마지막 성공일자 업데이트
.build();
}
@Override @Override
protected ItemReader<BerthCallsDto> createReader() { // 타입 변경 protected ItemReader<BerthCallsDto> createReader() { // 타입 변경
return berthCallsRangeReader; return berthCallsRangeReader;
} }
@Bean @Bean
@StepScope @StepScope
public BerthCallsRangeReader berthCallsRangeReader( public BerthCallsRangeReader berthCallsRangeReader() {
@Value("#{jobParameters['startDate']}") String startDate, return new BerthCallsRangeReader(maritimeApiWebClient, batchDateService);
@Value("#{jobParameters['stopDate']}") String stopDate
) {
return new BerthCallsRangeReader(maritimeApiWebClient, startDate, stopDate);
} }
@Override @Override
protected ItemProcessor<BerthCallsDto, BerthCallsEntity> createProcessor() { protected ItemProcessor<BerthCallsDto, BerthCallsEntity> createProcessor() {
@ -103,7 +98,7 @@ public class BerthCallsRangJobConfig extends BaseJobConfig<BerthCallsDto, BerthC
@Override @Override
protected int getChunkSize() { protected int getChunkSize() {
return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정 return 5000;
} }
@Bean(name = "BerthCallsRangeImportJob") @Bean(name = "BerthCallsRangeImportJob")
@ -115,4 +110,24 @@ public class BerthCallsRangJobConfig extends BaseJobConfig<BerthCallsDto, BerthC
public Step berthCallsRangeImportStep() { public Step berthCallsRangeImportStep() {
return step(); return step();
} }
/**
* 2단계: 모든 스텝 성공 배치 실행 로그(날짜) 업데이트
*/
@Bean
public Tasklet berthCallsLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작");
jdbcTemplate.execute(getBatchUpdateSql());
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "BerthCallsLastExecutionUpdateStep")
public Step berthCallsLastExecutionUpdateStep() {
return new StepBuilder("BerthCallsLastExecutionUpdateStep", jobRepository)
.tasklet(berthCallsLastExecutionUpdateTasklet(), transactionManager)
.build();
}
} }

파일 보기

@ -1,68 +1,62 @@
package com.snp.batch.jobs.movement.batch.config; package com.snp.batch.jobs.movement.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
import com.snp.batch.jobs.movement.batch.reader.CurrentlyAtRangeReader;
import com.snp.batch.jobs.movement.batch.dto.CurrentlyAtDto; import com.snp.batch.jobs.movement.batch.dto.CurrentlyAtDto;
import com.snp.batch.jobs.movement.batch.entity.CurrentlyAtEntity; import com.snp.batch.jobs.movement.batch.entity.CurrentlyAtEntity;
import com.snp.batch.jobs.movement.batch.processor.CurrentlyAtProcessor; import com.snp.batch.jobs.movement.batch.processor.CurrentlyAtProcessor;
import com.snp.batch.jobs.movement.batch.reader.CurrentlyAtRangeReader;
import com.snp.batch.jobs.movement.batch.writer.CurrentlyAtWriter; import com.snp.batch.jobs.movement.batch.writer.CurrentlyAtWriter;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* CurrentlyAtReader (ship_data Maritime API)
* (CurrentlyAtDto)
* CurrentlyAtProcessor
* (CurrentlyAtEntity)
* CurrentlyAtWriter
* (currentlyat 테이블)
*/
@Slf4j @Slf4j
@Configuration @Configuration
public class CurrentlyAtRangeJobConfig extends BaseJobConfig<CurrentlyAtDto, CurrentlyAtEntity> { public class CurrentlyAtRangeJobConfig extends BaseMultiStepJobConfig<CurrentlyAtDto, CurrentlyAtEntity> {
private final CurrentlyAtProcessor currentlyAtProcessor; private final CurrentlyAtProcessor currentlyAtProcessor;
private final CurrentlyAtWriter currentlyAtWriter; private final CurrentlyAtWriter currentlyAtWriter;
private final CurrentlyAtRangeReader currentlyAtRangeReader; private final CurrentlyAtRangeReader currentlyAtRangeReader;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient; private final WebClient maritimeApiWebClient;
private final JdbcTemplate jdbcTemplate;
private final BatchDateService batchDateService;
protected String getApiKey() {return "CURRENTLY_AT_IMPORT_API";}
protected String getBatchUpdateSql() {
return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());}
public CurrentlyAtRangeJobConfig( public CurrentlyAtRangeJobConfig(
JobRepository jobRepository, JobRepository jobRepository,
PlatformTransactionManager transactionManager, PlatformTransactionManager transactionManager,
CurrentlyAtProcessor currentlyAtProcessor, CurrentlyAtProcessor currentlyAtProcessor,
CurrentlyAtWriter currentlyAtWriter, CurrentlyAtRangeReader currentlyAtRangeReader, JdbcTemplate jdbcTemplate, CurrentlyAtWriter currentlyAtWriter,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 CurrentlyAtRangeReader currentlyAtRangeReader,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient,
JdbcTemplate jdbcTemplate,
BatchDateService batchDateService
) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager); super(jobRepository, transactionManager);
this.currentlyAtProcessor = currentlyAtProcessor; this.currentlyAtProcessor = currentlyAtProcessor;
this.currentlyAtWriter = currentlyAtWriter; this.currentlyAtWriter = currentlyAtWriter;
this.currentlyAtRangeReader = currentlyAtRangeReader; this.currentlyAtRangeReader = currentlyAtRangeReader;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient; this.maritimeApiWebClient = maritimeApiWebClient;
this.jdbcTemplate = jdbcTemplate;
this.batchDateService = batchDateService;
} }
@Override @Override
@ -72,7 +66,15 @@ public class CurrentlyAtRangeJobConfig extends BaseJobConfig<CurrentlyAtDto, Cur
@Override @Override
protected String getStepName() { protected String getStepName() {
return "currentlyAtRangeImportStep"; return "CurrentlyAtRangeImportStep";
}
@Override
protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder
.start(currentlyAtRangeImportStep()) // 1단계: API 데이터 적재
.next(currentlyAtLastExecutionUpdateStep()) // 2단계: 모두 완료 , BATCH_LAST_EXECUTION 마지막 성공일자 업데이트
.build();
} }
@Override @Override
@ -81,12 +83,8 @@ public class CurrentlyAtRangeJobConfig extends BaseJobConfig<CurrentlyAtDto, Cur
} }
@Bean @Bean
@StepScope @StepScope
public CurrentlyAtRangeReader currentlyAtReader( public CurrentlyAtRangeReader currentlyAtReader() {
@Value("#{jobParameters['startDate']}") String startDate, return new CurrentlyAtRangeReader(maritimeApiWebClient, batchDateService);
@Value("#{jobParameters['stopDate']}") String stopDate
) {
// jobParameters 없으면 null 넘어오고 Reader에서 default 처리
return new CurrentlyAtRangeReader(maritimeApiWebClient, startDate, stopDate);
} }
@Override @Override
protected ItemProcessor<CurrentlyAtDto, CurrentlyAtEntity> createProcessor() { protected ItemProcessor<CurrentlyAtDto, CurrentlyAtEntity> createProcessor() {
@ -112,4 +110,24 @@ public class CurrentlyAtRangeJobConfig extends BaseJobConfig<CurrentlyAtDto, Cur
public Step currentlyAtRangeImportStep() { public Step currentlyAtRangeImportStep() {
return step(); return step();
} }
/**
* 2단계: 모든 스텝 성공 배치 실행 로그(날짜) 업데이트
*/
@Bean
public Tasklet currentlyAtLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작");
jdbcTemplate.execute(getBatchUpdateSql());
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "CurrentlyAtLastExecutionUpdateStep")
public Step currentlyAtLastExecutionUpdateStep() {
return new StepBuilder("CurrentlyAtLastExecutionUpdateStep", jobRepository)
.tasklet(currentlyAtLastExecutionUpdateTasklet(), transactionManager)
.build();
}
} }

파일 보기

@ -1,65 +1,63 @@
package com.snp.batch.jobs.movement.batch.config; package com.snp.batch.jobs.movement.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
import com.snp.batch.jobs.movement.batch.dto.DestinationDto; import com.snp.batch.jobs.movement.batch.dto.DestinationDto;
import com.snp.batch.jobs.movement.batch.entity.DestinationEntity; import com.snp.batch.jobs.movement.batch.entity.DestinationEntity;
import com.snp.batch.jobs.movement.batch.processor.DestinationProcessor; import com.snp.batch.jobs.movement.batch.processor.DestinationProcessor;
import com.snp.batch.jobs.movement.batch.writer.DestinationWriter;
import com.snp.batch.jobs.movement.batch.reader.DestinationRangeReader; import com.snp.batch.jobs.movement.batch.reader.DestinationRangeReader;
import com.snp.batch.jobs.movement.batch.writer.DestinationWriter;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* DestinationReader (ship_data Maritime API)
* (DestinationDto)
* DestinationProcessor
* (DestinationEntity)
* DestinationProcessor
* (t_destination 테이블)
*/
@Slf4j @Slf4j
@Configuration @Configuration
public class DestinationsRangeJobConfig extends BaseJobConfig<DestinationDto, DestinationEntity> { public class DestinationsRangeJobConfig extends BaseMultiStepJobConfig<DestinationDto, DestinationEntity> {
private final com.snp.batch.jobs.movement.batch.processor.DestinationProcessor DestinationProcessor; private final DestinationProcessor DestinationProcessor;
private final com.snp.batch.jobs.movement.batch.writer.DestinationWriter DestinationWriter; private final DestinationWriter DestinationWriter;
private final DestinationRangeReader destinationRangeReader; private final DestinationRangeReader destinationRangeReader;
private final WebClient maritimeApiWebClient; private final WebClient maritimeApiWebClient;
private final JdbcTemplate jdbcTemplate;
private final BatchDateService batchDateService;
protected String getApiKey() {return "DESTINATIONS_IMPORT_API";}
protected String getBatchUpdateSql() {
return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());}
public DestinationsRangeJobConfig( public DestinationsRangeJobConfig(
JobRepository jobRepository, JobRepository jobRepository,
PlatformTransactionManager transactionManager, PlatformTransactionManager transactionManager,
DestinationProcessor DestinationProcessor, DestinationProcessor DestinationProcessor,
DestinationWriter DestinationWriter, DestinationRangeReader destinationRangeReader, DestinationWriter DestinationWriter,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 DestinationRangeReader destinationRangeReader,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient,
JdbcTemplate jdbcTemplate,
BatchDateService batchDateService
) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager); super(jobRepository, transactionManager);
this.DestinationProcessor = DestinationProcessor; this.DestinationProcessor = DestinationProcessor;
this.DestinationWriter = DestinationWriter; this.DestinationWriter = DestinationWriter;
this.destinationRangeReader = destinationRangeReader; this.destinationRangeReader = destinationRangeReader;
this.maritimeApiWebClient = maritimeApiWebClient; this.maritimeApiWebClient = maritimeApiWebClient;
this.jdbcTemplate = jdbcTemplate;
this.batchDateService = batchDateService;
} }
@Override @Override
@ -72,18 +70,22 @@ public class DestinationsRangeJobConfig extends BaseJobConfig<DestinationDto, De
return "DestinationsRangeImportStep"; return "DestinationsRangeImportStep";
} }
@Override
protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder
.start(destinationsRangeImportStep()) // 1단계: API 데이터 적재
.next(destinationsLastExecutionUpdateStep()) // 2단계: 모두 완료 , BATCH_LAST_EXECUTION 마지막 성공일자 업데이트
.build();
}
@Override @Override
protected ItemReader<DestinationDto> createReader() { // 타입 변경 protected ItemReader<DestinationDto> createReader() { // 타입 변경
return destinationRangeReader; return destinationRangeReader;
} }
@Bean @Bean
@StepScope @StepScope
public DestinationRangeReader destinationRangeReader( public DestinationRangeReader destinationRangeReader() {
@Value("#{jobParameters['startDate']}") String startDate, return new DestinationRangeReader(maritimeApiWebClient, batchDateService);
@Value("#{jobParameters['stopDate']}") String stopDate
) {
// jobParameters 없으면 null 넘어오고 Reader에서 default 처리
return new DestinationRangeReader(maritimeApiWebClient, startDate, stopDate);
} }
@Override @Override
protected ItemProcessor<DestinationDto, DestinationEntity> createProcessor() { protected ItemProcessor<DestinationDto, DestinationEntity> createProcessor() {
@ -109,4 +111,24 @@ public class DestinationsRangeJobConfig extends BaseJobConfig<DestinationDto, De
public Step destinationsRangeImportStep() { public Step destinationsRangeImportStep() {
return step(); return step();
} }
/**
* 2단계: 모든 스텝 성공 배치 실행 로그(날짜) 업데이트
*/
@Bean
public Tasklet destinationsLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작");
jdbcTemplate.execute(getBatchUpdateSql());
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "DestinationsLastExecutionUpdateStep")
public Step destinationsLastExecutionUpdateStep() {
return new StepBuilder("DestinationsLastExecutionUpdateStep", jobRepository)
.tasklet(destinationsLastExecutionUpdateTasklet(), transactionManager)
.build();
}
} }

파일 보기

@ -1,65 +1,62 @@
package com.snp.batch.jobs.movement.batch.config; package com.snp.batch.jobs.movement.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper; import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.movement.batch.dto.PortCallsDto; import com.snp.batch.jobs.movement.batch.dto.PortCallsDto;
import com.snp.batch.jobs.movement.batch.entity.PortCallsEntity; import com.snp.batch.jobs.movement.batch.entity.PortCallsEntity;
import com.snp.batch.jobs.movement.batch.processor.PortCallsProcessor; import com.snp.batch.jobs.movement.batch.processor.PortCallsProcessor;
import com.snp.batch.jobs.movement.batch.reader.PortCallsRangeReader; import com.snp.batch.jobs.movement.batch.reader.PortCallsRangeReader;
import com.snp.batch.jobs.movement.batch.writer.PortCallsWriter; import com.snp.batch.jobs.movement.batch.writer.PortCallsWriter;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* PortCallsReader (ship_data Maritime API)
* (PortCallDto)
* PortCallsProcessor
* (PortCallsEntity)
* ShipDetailDataWriter
* (ship_movement 테이블)
*/
@Slf4j @Slf4j
@Configuration @Configuration
public class ShipPortCallsRangeJobConfig extends BaseJobConfig<PortCallsDto, PortCallsEntity> { public class ShipPortCallsRangeJobConfig extends BaseMultiStepJobConfig<PortCallsDto, PortCallsEntity> {
private final PortCallsProcessor portCallsProcessor; private final PortCallsProcessor portCallsProcessor;
private final PortCallsWriter portCallsWriter; private final PortCallsWriter portCallsWriter;
private final PortCallsRangeReader portCallsRangeReader; private final PortCallsRangeReader portCallsRangeReader;
private final WebClient maritimeApiWebClient;
private final JdbcTemplate jdbcTemplate;
private final BatchDateService batchDateService;
protected String getApiKey() {return "PORT_CALLS_IMPORT_API";}
protected String getBatchUpdateSql() {
return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());}
public ShipPortCallsRangeJobConfig( public ShipPortCallsRangeJobConfig(
JobRepository jobRepository, JobRepository jobRepository,
PlatformTransactionManager transactionManager, PlatformTransactionManager transactionManager,
PortCallsProcessor portCallsProcessor, PortCallsProcessor portCallsProcessor,
PortCallsWriter portCallsWriter, JdbcTemplate jdbcTemplate, PortCallsWriter portCallsWriter,
PortCallsRangeReader portCallsRangeReader,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient, @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient,
ObjectMapper objectMapper, PortCallsRangeReader portCallsRangeReader) { // ObjectMapper 주입 추가 JdbcTemplate jdbcTemplate,
BatchDateService batchDateService
) {
super(jobRepository, transactionManager); super(jobRepository, transactionManager);
this.portCallsProcessor = portCallsProcessor; this.portCallsProcessor = portCallsProcessor;
this.portCallsWriter = portCallsWriter; this.portCallsWriter = portCallsWriter;
this.portCallsRangeReader = portCallsRangeReader; this.portCallsRangeReader = portCallsRangeReader;
this.maritimeApiWebClient = maritimeApiWebClient;
this.jdbcTemplate = jdbcTemplate;
this.batchDateService = batchDateService;
} }
@Override @Override
@ -74,13 +71,18 @@ public class ShipPortCallsRangeJobConfig extends BaseJobConfig<PortCallsDto, Por
@Bean @Bean
@StepScope @StepScope
public PortCallsRangeReader portCallsRangeReader( public PortCallsRangeReader portCallsRangeReader() {
@Qualifier("maritimeServiceApiWebClient") WebClient webClient, return new PortCallsRangeReader(maritimeApiWebClient, batchDateService);
@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['stopDate']}") String stopDate
) {
return new PortCallsRangeReader(webClient, startDate, stopDate);
} }
@Override
protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder
.start(portCallsRangeImportStep()) // 1단계: API 데이터 적재
.next(portCallsLastExecutionUpdateStep()) // 2단계: 모두 완료 , BATCH_LAST_EXECUTION 마지막 성공일자 업데이트
.build();
}
@Override @Override
protected ItemReader<PortCallsDto> createReader() { // 타입 변경 protected ItemReader<PortCallsDto> createReader() { // 타입 변경
return portCallsRangeReader; return portCallsRangeReader;
@ -110,4 +112,24 @@ public class ShipPortCallsRangeJobConfig extends BaseJobConfig<PortCallsDto, Por
public Step portCallsRangeImportStep() { public Step portCallsRangeImportStep() {
return step(); return step();
} }
/**
* 2단계: 모든 스텝 성공 배치 실행 로그(날짜) 업데이트
*/
@Bean
public Tasklet portCallsLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작");
jdbcTemplate.execute(getBatchUpdateSql());
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "PortCallsLastExecutionUpdateStep")
public Step portCallsLastExecutionUpdateStep() {
return new StepBuilder("PortCallsLastExecutionUpdateStep", jobRepository)
.tasklet(portCallsLastExecutionUpdateTasklet(), transactionManager)
.build();
}
} }

파일 보기

@ -1,19 +1,25 @@
package com.snp.batch.jobs.movement.batch.config; package com.snp.batch.jobs.movement.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
import com.snp.batch.jobs.movement.batch.dto.StsOperationDto; import com.snp.batch.jobs.movement.batch.dto.StsOperationDto;
import com.snp.batch.jobs.movement.batch.entity.StsOperationEntity; import com.snp.batch.jobs.movement.batch.entity.StsOperationEntity;
import com.snp.batch.jobs.movement.batch.processor.StsOperationProcessor; import com.snp.batch.jobs.movement.batch.processor.StsOperationProcessor;
import com.snp.batch.jobs.movement.batch.reader.StsOperationRangeReader; import com.snp.batch.jobs.movement.batch.reader.StsOperationRangeReader;
import com.snp.batch.jobs.movement.batch.writer.StsOperationWriter; import com.snp.batch.jobs.movement.batch.writer.StsOperationWriter;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@ -22,47 +28,38 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* StsOperationReader (ship_data Maritime API)
* (StsOperationDto)
* StsOperationProcessor
* (StsOperationEntity)
* StsOperationWriter
* (t_stsoperation 테이블)
*/
@Slf4j @Slf4j
@Configuration @Configuration
public class StsOperationRangeJobConfig extends BaseJobConfig<StsOperationDto, StsOperationEntity> { public class StsOperationRangeJobConfig extends BaseMultiStepJobConfig<StsOperationDto, StsOperationEntity> {
private final StsOperationProcessor stsOperationProcessor; private final StsOperationProcessor stsOperationProcessor;
private final StsOperationWriter stsOperationWriter; private final StsOperationWriter stsOperationWriter;
private final StsOperationRangeReader stsOperationRangeReader; private final StsOperationRangeReader stsOperationRangeReader;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient; private final WebClient maritimeApiWebClient;
private final JdbcTemplate jdbcTemplate;
private final BatchDateService batchDateService;
protected String getApiKey() {return "STS_OPERATION_IMPORT_API";}
protected String getBatchUpdateSql() {
return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());}
public StsOperationRangeJobConfig( public StsOperationRangeJobConfig(
JobRepository jobRepository, JobRepository jobRepository,
PlatformTransactionManager transactionManager, PlatformTransactionManager transactionManager,
StsOperationProcessor stsOperationProcessor, StsOperationProcessor stsOperationProcessor,
StsOperationWriter stsOperationWriter, StsOperationRangeReader stsOperationRangeReader, JdbcTemplate jdbcTemplate, StsOperationWriter stsOperationWriter,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 StsOperationRangeReader stsOperationRangeReader,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient,
JdbcTemplate jdbcTemplate,
BatchDateService batchDateService
) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager); super(jobRepository, transactionManager);
this.stsOperationProcessor = stsOperationProcessor; this.stsOperationProcessor = stsOperationProcessor;
this.stsOperationWriter = stsOperationWriter; this.stsOperationWriter = stsOperationWriter;
this.stsOperationRangeReader = stsOperationRangeReader; this.stsOperationRangeReader = stsOperationRangeReader;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient; this.maritimeApiWebClient = maritimeApiWebClient;
this.jdbcTemplate = jdbcTemplate;
this.batchDateService = batchDateService;
} }
@Override @Override
@ -75,6 +72,14 @@ public class StsOperationRangeJobConfig extends BaseJobConfig<StsOperationDto, S
return "STSOperationRangeImportStep"; return "STSOperationRangeImportStep";
} }
@Override
protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder
.start(STSOperationRangeImportStep()) // 1단계: API 데이터 적재
.next(stsOperationLastExecutionUpdateStep()) // 2단계: 모두 완료 , BATCH_LAST_EXECUTION 마지막 성공일자 업데이트
.build();
}
@Override @Override
protected ItemReader<StsOperationDto> createReader() { // 타입 변경 protected ItemReader<StsOperationDto> createReader() { // 타입 변경
// Reader 생성자 수정: ObjectMapper를 전달합니다. // Reader 생성자 수정: ObjectMapper를 전달합니다.
@ -82,12 +87,9 @@ public class StsOperationRangeJobConfig extends BaseJobConfig<StsOperationDto, S
} }
@Bean @Bean
@StepScope @StepScope
public StsOperationRangeReader stsOperationRangeReader( public StsOperationRangeReader stsOperationRangeReader() {
@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['stopDate']}") String stopDate
) {
// jobParameters 없으면 null 넘어오고 Reader에서 default 처리 // jobParameters 없으면 null 넘어오고 Reader에서 default 처리
return new StsOperationRangeReader(maritimeApiWebClient, startDate, stopDate); return new StsOperationRangeReader(maritimeApiWebClient, batchDateService);
} }
@Override @Override
protected ItemProcessor<StsOperationDto, StsOperationEntity> createProcessor() { protected ItemProcessor<StsOperationDto, StsOperationEntity> createProcessor() {
@ -113,4 +115,24 @@ public class StsOperationRangeJobConfig extends BaseJobConfig<StsOperationDto, S
public Step STSOperationRangeImportStep() { public Step STSOperationRangeImportStep() {
return step(); return step();
} }
/**
* 2단계: 모든 스텝 성공 배치 실행 로그(날짜) 업데이트
*/
@Bean
public Tasklet stsOperationLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작");
jdbcTemplate.execute(getBatchUpdateSql());
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "StsOperationLastExecutionUpdateStep")
public Step stsOperationLastExecutionUpdateStep() {
return new StepBuilder("StsOperationLastExecutionUpdateStep", jobRepository)
.tasklet(stsOperationLastExecutionUpdateTasklet(), transactionManager)
.build();
}
} }

파일 보기

@ -1,68 +1,63 @@
package com.snp.batch.jobs.movement.batch.config; package com.snp.batch.jobs.movement.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
import com.snp.batch.jobs.movement.batch.dto.TerminalCallsDto;
import com.snp.batch.jobs.movement.batch.entity.TerminalCallsEntity; import com.snp.batch.jobs.movement.batch.entity.TerminalCallsEntity;
import com.snp.batch.jobs.movement.batch.processor.TerminalCallsProcessor; import com.snp.batch.jobs.movement.batch.processor.TerminalCallsProcessor;
import com.snp.batch.jobs.movement.batch.writer.TerminalCallsWriter;
import com.snp.batch.jobs.movement.batch.dto.TerminalCallsDto;
import com.snp.batch.jobs.movement.batch.reader.TerminalCallsRangeReader; import com.snp.batch.jobs.movement.batch.reader.TerminalCallsRangeReader;
import com.snp.batch.jobs.movement.batch.writer.TerminalCallsWriter;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* TerminalCallsReader (ship_data Maritime API)
* (TerminalCallsDto)
* TerminalCallsProcessor
* (TerminalCallsEntity)
* TerminalCallsWriter
* (t_terminalcall 테이블)
*/
@Slf4j @Slf4j
@Configuration @Configuration
public class TerminalCallsRangeJobConfig extends BaseJobConfig<TerminalCallsDto, TerminalCallsEntity> { public class TerminalCallsRangeJobConfig extends BaseMultiStepJobConfig<TerminalCallsDto, TerminalCallsEntity> {
private final TerminalCallsProcessor terminalCallsProcessor; private final TerminalCallsProcessor terminalCallsProcessor;
private final TerminalCallsWriter terminalCallsWriter; private final TerminalCallsWriter terminalCallsWriter;
private final TerminalCallsRangeReader terminalCallsRangeReader; private final TerminalCallsRangeReader terminalCallsRangeReader;
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient; private final WebClient maritimeApiWebClient;
private final JdbcTemplate jdbcTemplate;
private final BatchDateService batchDateService;
protected String getApiKey() {return "TERMINAL_CALLS_IMPORT_API";}
protected String getBatchUpdateSql() {
return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());}
public TerminalCallsRangeJobConfig( public TerminalCallsRangeJobConfig(
JobRepository jobRepository, JobRepository jobRepository,
PlatformTransactionManager transactionManager, PlatformTransactionManager transactionManager,
TerminalCallsProcessor terminalCallsProcessor, TerminalCallsProcessor terminalCallsProcessor,
TerminalCallsWriter terminalCallsWriter, TerminalCallsRangeReader terminalCallsRangeReader, JdbcTemplate jdbcTemplate, TerminalCallsWriter terminalCallsWriter,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 TerminalCallsRangeReader terminalCallsRangeReader,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient,
JdbcTemplate jdbcTemplate,
BatchDateService batchDateService
) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager); super(jobRepository, transactionManager);
this.terminalCallsProcessor = terminalCallsProcessor; this.terminalCallsProcessor = terminalCallsProcessor;
this.terminalCallsWriter = terminalCallsWriter; this.terminalCallsWriter = terminalCallsWriter;
this.terminalCallsRangeReader = terminalCallsRangeReader; this.terminalCallsRangeReader = terminalCallsRangeReader;
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient; this.maritimeApiWebClient = maritimeApiWebClient;
this.jdbcTemplate = jdbcTemplate;
this.batchDateService = batchDateService;
} }
@Override @Override
@ -75,18 +70,23 @@ public class TerminalCallsRangeJobConfig extends BaseJobConfig<TerminalCallsDto,
return "TerminalCallsRangeImportStep"; return "TerminalCallsRangeImportStep";
} }
@Override
protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder
.start(terminalCallsRangeImportStep()) // 1단계: API 데이터 적재
.next(terminalCallsLastExecutionUpdateStep()) // 2단계: 모두 완료 , BATCH_LAST_EXECUTION 마지막 성공일자 업데이트
.build();
}
@Override @Override
protected ItemReader<TerminalCallsDto> createReader() { // 타입 변경 protected ItemReader<TerminalCallsDto> createReader() { // 타입 변경
return terminalCallsRangeReader; return terminalCallsRangeReader;
} }
@Bean @Bean
@StepScope @StepScope
public TerminalCallsRangeReader terminalCallsRangeReader( public TerminalCallsRangeReader terminalCallsRangeReader() {
@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['stopDate']}") String stopDate
) {
// jobParameters 없으면 null 넘어오고 Reader에서 default 처리 // jobParameters 없으면 null 넘어오고 Reader에서 default 처리
return new TerminalCallsRangeReader(maritimeApiWebClient, startDate, stopDate); return new TerminalCallsRangeReader(maritimeApiWebClient, batchDateService);
} }
@Override @Override
protected ItemProcessor<TerminalCallsDto, TerminalCallsEntity> createProcessor() { protected ItemProcessor<TerminalCallsDto, TerminalCallsEntity> createProcessor() {
@ -112,4 +112,24 @@ public class TerminalCallsRangeJobConfig extends BaseJobConfig<TerminalCallsDto,
public Step terminalCallsRangeImportStep() { public Step terminalCallsRangeImportStep() {
return step(); return step();
} }
/**
* 2단계: 모든 스텝 성공 배치 실행 로그(날짜) 업데이트
*/
@Bean
public Tasklet terminalCallsLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작");
jdbcTemplate.execute(getBatchUpdateSql());
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "TerminalCallsLastExecutionUpdateStep")
public Step terminalCallsLastExecutionUpdateStep() {
return new StepBuilder("TerminalCallsLastExecutionUpdateStep", jobRepository)
.tasklet(terminalCallsLastExecutionUpdateTasklet(), transactionManager)
.build();
}
} }

파일 보기

@ -1,65 +1,62 @@
package com.snp.batch.jobs.movement.batch.config; package com.snp.batch.jobs.movement.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
import com.snp.batch.jobs.movement.batch.reader.TransitsRangeReader;
import com.snp.batch.jobs.movement.batch.dto.TransitsDto; import com.snp.batch.jobs.movement.batch.dto.TransitsDto;
import com.snp.batch.jobs.movement.batch.entity.TransitsEntity; import com.snp.batch.jobs.movement.batch.entity.TransitsEntity;
import com.snp.batch.jobs.movement.batch.processor.TransitsProcessor; import com.snp.batch.jobs.movement.batch.processor.TransitsProcessor;
import com.snp.batch.jobs.movement.batch.reader.TransitsRangeReader;
import com.snp.batch.jobs.movement.batch.writer.TransitsWriter; import com.snp.batch.jobs.movement.batch.writer.TransitsWriter;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* TransitsReader (ship_data Maritime API)
* (TransitsDto)
* TransitsProcessor
* (TransitsEntity)
* TransitsWriter
* (t_transit 테이블)
*/
@Slf4j @Slf4j
@Configuration @Configuration
public class TransitsRangeJobConfig extends BaseJobConfig<TransitsDto, TransitsEntity> { public class TransitsRangeJobConfig extends BaseMultiStepJobConfig<TransitsDto, TransitsEntity> {
private final TransitsProcessor transitsProcessor; private final TransitsProcessor transitsProcessor;
private final TransitsWriter transitsWriter; private final TransitsWriter transitsWriter;
private final TransitsRangeReader transitsRangeReader; private final TransitsRangeReader transitsRangeReader;
private final WebClient maritimeApiWebClient; private final WebClient maritimeApiWebClient;
private final JdbcTemplate jdbcTemplate;
private final BatchDateService batchDateService;
protected String getApiKey() {return "TRANSITS_IMPORT_API";}
protected String getBatchUpdateSql() {
return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());}
public TransitsRangeJobConfig( public TransitsRangeJobConfig(
JobRepository jobRepository, JobRepository jobRepository,
PlatformTransactionManager transactionManager, PlatformTransactionManager transactionManager,
TransitsProcessor TransitsProcessor, TransitsProcessor TransitsProcessor,
TransitsWriter transitsWriter, TransitsRangeReader transitsRangeReader, TransitsWriter transitsWriter,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가 TransitsRangeReader transitsRangeReader,
@Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient,
JdbcTemplate jdbcTemplate,
BatchDateService batchDateService
) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager); super(jobRepository, transactionManager);
this.transitsProcessor = TransitsProcessor; this.transitsProcessor = TransitsProcessor;
this.transitsWriter = transitsWriter; this.transitsWriter = transitsWriter;
this.transitsRangeReader = transitsRangeReader; this.transitsRangeReader = transitsRangeReader;
this.maritimeApiWebClient = maritimeApiWebClient; this.maritimeApiWebClient = maritimeApiWebClient;
this.jdbcTemplate = jdbcTemplate;
this.batchDateService = batchDateService;
} }
@Override @Override
@ -72,18 +69,22 @@ public class TransitsRangeJobConfig extends BaseJobConfig<TransitsDto, TransitsE
return "TransitsRangeImportStep"; return "TransitsRangeImportStep";
} }
@Override
protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder
.start(transitsRangeImportStep()) // 1단계: API 데이터 적재
.next(transitsLastExecutionUpdateStep()) // 2단계: 모두 완료 , BATCH_LAST_EXECUTION 마지막 성공일자 업데이트
.build();
}
@Override @Override
protected ItemReader<TransitsDto> createReader() { // 타입 변경 protected ItemReader<TransitsDto> createReader() { // 타입 변경
return transitsRangeReader; return transitsRangeReader;
} }
@Bean @Bean
@StepScope @StepScope
public TransitsRangeReader transitsRangeReader( public TransitsRangeReader transitsRangeReader() {
@Value("#{jobParameters['startDate']}") String startDate, return new TransitsRangeReader(maritimeApiWebClient, batchDateService);
@Value("#{jobParameters['stopDate']}") String stopDate
) {
// jobParameters 없으면 null 넘어오고 Reader에서 default 처리
return new TransitsRangeReader(maritimeApiWebClient, startDate, stopDate);
} }
@Override @Override
protected ItemProcessor<TransitsDto, TransitsEntity> createProcessor() { protected ItemProcessor<TransitsDto, TransitsEntity> createProcessor() {
@ -109,4 +110,24 @@ public class TransitsRangeJobConfig extends BaseJobConfig<TransitsDto, TransitsE
public Step transitsRangeImportStep() { public Step transitsRangeImportStep() {
return step(); return step();
} }
/**
* 2단계: 모든 스텝 성공 배치 실행 로그(날짜) 업데이트
*/
@Bean
public Tasklet transitsLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작");
jdbcTemplate.execute(getBatchUpdateSql());
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "TransitsLastExecutionUpdateStep")
public Step transitsLastExecutionUpdateStep() {
return new StepBuilder("TransitsLastExecutionUpdateStep", jobRepository)
.tasklet(transitsLastExecutionUpdateTasklet(), transactionManager)
.build();
}
} }

파일 보기

@ -2,62 +2,29 @@ package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.AnchorageCallsDto; import com.snp.batch.jobs.movement.batch.dto.AnchorageCallsDto;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List; import java.util.List;
import java.util.Map;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j @Slf4j
@StepScope @StepScope
public class AnchorageCallsRangeReader extends BaseApiReader<AnchorageCallsDto> { public class AnchorageCallsRangeReader extends BaseApiReader<AnchorageCallsDto> {
private final BatchDateService batchDateService; // BatchDateService 필드 추가
private List<AnchorageCallsDto> allData; private List<AnchorageCallsDto> allData;
private int currentBatchIndex = 0; private int currentBatchIndex = 0;
private final int batchSize = 5000; private final int batchSize = 5000;
private String startDate;
private String stopDate;
public AnchorageCallsRangeReader(WebClient webClient, protected String getApiKey() {
@Value("#{jobParameters['startDate']}") String startDate, return "ANCHORAGE_CALLS_IMPORT_API";
@Value("#{jobParameters['stopDate']}") String stopDate) { }
public AnchorageCallsRangeReader(WebClient webClient, BatchDateService batchDateService) {
super(webClient); super(webClient);
this.batchDateService = batchDateService;
// 날짜가 없으면 전날 하루 기준
if (startDate == null || startDate.isBlank() ||
stopDate == null || stopDate.isBlank()) {
LocalDate yesterday = LocalDate.now().minusDays(1);
this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
} else {
this.startDate = startDate;
this.stopDate = stopDate;
}
enableChunkMode(); enableChunkMode();
} }
@ -82,17 +49,11 @@ public class AnchorageCallsRangeReader extends BaseApiReader<AnchorageCallsDto>
return "https://webservices.maritime.spglobal.com"; return "https://webservices.maritime.spglobal.com";
} }
@Override
protected void beforeFetch() {
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
}
@Override @Override
protected List<AnchorageCallsDto> fetchNextBatch() throws Exception { protected List<AnchorageCallsDto> fetchNextBatch() throws Exception {
// 1) 처음 호출이면 API 호출해서 전체 데이터를 가져온다 // 1) 처음 호출이면 API 호출해서 전체 데이터를 가져온다
if (allData == null) { if (allData == null) {
log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); allData = callApiWithBatch();
allData = callApiWithBatch(startDate, stopDate);
if (allData == null || allData.isEmpty()) { if (allData == null || allData.isEmpty()) {
log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
@ -130,12 +91,16 @@ public class AnchorageCallsRangeReader extends BaseApiReader<AnchorageCallsDto>
* Query Parameter를 사용한 API 호출 * Query Parameter를 사용한 API 호출
* @return API 응답 * @return API 응답
*/ */
private List<AnchorageCallsDto> callApiWithBatch(String startDate, String stopDate) { private List<AnchorageCallsDto> callApiWithBatch() {
String url = getApiPath() + "?startDate=" + startDate +"&stopDate=" + stopDate; Map<String, String> params = batchDateService.getDateRangeWithTimezoneParams(getApiKey());
log.info("[{}] API 호출: {}", getReaderName(), url); log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate"));
return webClient.get() return webClient.get()
.uri(url) .uri(getApiPath(), uriBuilder -> uriBuilder
// 맵에서 파라미터 값을 동적으로 가져와 세팅
.queryParam("startDate", params.get("fromDate"))
.queryParam("stopDate", params.get("toDate"))
.build())
.retrieve() .retrieve()
.bodyToFlux(AnchorageCallsDto.class) .bodyToFlux(AnchorageCallsDto.class)
.collectList() .collectList()

파일 보기

@ -2,61 +2,28 @@ package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.BerthCallsDto; import com.snp.batch.jobs.movement.batch.dto.BerthCallsDto;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List; import java.util.List;
import java.util.Map;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j @Slf4j
@StepScope @StepScope
public class BerthCallsRangeReader extends BaseApiReader<BerthCallsDto> { public class BerthCallsRangeReader extends BaseApiReader<BerthCallsDto> {
private final BatchDateService batchDateService; // BatchDateService 필드 추가
private List<BerthCallsDto> allData; private List<BerthCallsDto> allData;
private int currentBatchIndex = 0; private int currentBatchIndex = 0;
private final int batchSize = 5000; private final int batchSize = 5000;
private String startDate; protected String getApiKey() {
private String stopDate; return "BERTH_CALLS_IMPORT_API";
}
public BerthCallsRangeReader(WebClient webClient,
@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['stopDate']}") String stopDate) {
public BerthCallsRangeReader(WebClient webClient, BatchDateService batchDateService) {
super(webClient); super(webClient);
this.batchDateService = batchDateService;
// 날짜가 없으면 전날 하루 기준
if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) {
LocalDate yesterday = LocalDate.now().minusDays(1);
this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
} else {
this.startDate = startDate;
this.stopDate = stopDate;
}
enableChunkMode(); enableChunkMode();
} }
@ -81,17 +48,11 @@ public class BerthCallsRangeReader extends BaseApiReader<BerthCallsDto> {
return "https://webservices.maritime.spglobal.com"; return "https://webservices.maritime.spglobal.com";
} }
@Override
protected void beforeFetch() {
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
}
@Override @Override
protected List<BerthCallsDto> fetchNextBatch() throws Exception { protected List<BerthCallsDto> fetchNextBatch() throws Exception {
// 1) 처음 호출이면 API 호출해서 전체 데이터를 가져온다 // 1) 처음 호출이면 API 호출해서 전체 데이터를 가져온다
if (allData == null) { if (allData == null) {
log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); allData = callApiWithBatch();
allData = callApiWithBatch(startDate, stopDate);
if (allData == null || allData.isEmpty()) { if (allData == null || allData.isEmpty()) {
log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
@ -129,14 +90,16 @@ public class BerthCallsRangeReader extends BaseApiReader<BerthCallsDto> {
* Query Parameter를 사용한 API 호출 * Query Parameter를 사용한 API 호출
* @return API 응답 * @return API 응답
*/ */
private List<BerthCallsDto> callApiWithBatch(String startDate, String stopDate) { private List<BerthCallsDto> callApiWithBatch() {
String url = getApiPath() + "?startDate=" + startDate +"&stopDate=" + stopDate; Map<String, String> params = batchDateService.getDateRangeWithTimezoneParams(getApiKey());
// "&lrno=" + lrno; log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate"));
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get() return webClient.get()
.uri(url) .uri(getApiPath(), uriBuilder -> uriBuilder
// 맵에서 파라미터 값을 동적으로 가져와 세팅
.queryParam("startDate", params.get("fromDate"))
.queryParam("stopDate", params.get("toDate"))
.build())
.retrieve() .retrieve()
.bodyToFlux(BerthCallsDto.class) .bodyToFlux(BerthCallsDto.class)
.collectList() .collectList()

파일 보기

@ -2,62 +2,28 @@ package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.CurrentlyAtDto; import com.snp.batch.jobs.movement.batch.dto.CurrentlyAtDto;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List; import java.util.List;
import java.util.Map;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
* <p>
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
* <p>
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
* <p>
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j @Slf4j
@StepScope @StepScope
public class CurrentlyAtRangeReader extends BaseApiReader<CurrentlyAtDto> { public class CurrentlyAtRangeReader extends BaseApiReader<CurrentlyAtDto> {
private final BatchDateService batchDateService; // BatchDateService 필드 추가
private List<CurrentlyAtDto> allData; private List<CurrentlyAtDto> allData;
private int currentBatchIndex = 0; private int currentBatchIndex = 0;
private final int batchSize = 5000; private final int batchSize = 5000;
private String startDate; protected String getApiKey() {
private String stopDate; return "CURRENTLY_AT_IMPORT_API";
}
public CurrentlyAtRangeReader(WebClient webClient, public CurrentlyAtRangeReader(WebClient webClient, BatchDateService batchDateService) {
@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['stopDate']}") String stopDate) {
super(webClient); super(webClient);
this.batchDateService = batchDateService;
// 날짜가 없으면 전날 하루 기준 enableChunkMode();
if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) {
LocalDate yesterday = LocalDate.now().minusDays(1);
this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
} else {
this.startDate = startDate;
this.stopDate = stopDate;
}
enableChunkMode(); // Chunk 모드 활성화
} }
@Override @Override
@ -81,19 +47,12 @@ public class CurrentlyAtRangeReader extends BaseApiReader<CurrentlyAtDto> {
return "https://webservices.maritime.spglobal.com"; return "https://webservices.maritime.spglobal.com";
} }
@Override
protected void beforeFetch() {
// 전처리 과정
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
}
@Override @Override
protected List<CurrentlyAtDto> fetchNextBatch() throws Exception { protected List<CurrentlyAtDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인 // 모든 배치 처리 완료 확인
if (allData == null ) { if (allData == null ) {
log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); allData = callApiWithBatch();
allData = callApiWithBatch(startDate, stopDate);
if (allData == null || allData.isEmpty()) { if (allData == null || allData.isEmpty()) {
log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
@ -130,13 +89,16 @@ public class CurrentlyAtRangeReader extends BaseApiReader<CurrentlyAtDto> {
* Query Parameter를 사용한 API 호출 * Query Parameter를 사용한 API 호출
* @return API 응답 * @return API 응답
*/ */
private List<CurrentlyAtDto> callApiWithBatch(String startDate, String stopDate) { private List<CurrentlyAtDto> callApiWithBatch() {
String url = getApiPath() + "?dateCreatedUpdatedStart=" + startDate +"&dateCreatedUpdatedStop="+stopDate; Map<String, String> params = batchDateService.getDateRangeWithTimezoneParams(getApiKey());
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate"));
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get() return webClient.get()
.uri(url) .uri(getApiPath(), uriBuilder -> uriBuilder
// 맵에서 파라미터 값을 동적으로 가져와 세팅
.queryParam("startDate", params.get("fromDate"))
.queryParam("stopDate", params.get("toDate"))
.build())
.retrieve() .retrieve()
.bodyToFlux(CurrentlyAtDto.class) .bodyToFlux(CurrentlyAtDto.class)
.collectList() .collectList()

파일 보기

@ -2,71 +2,33 @@ package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.DestinationDto; import com.snp.batch.jobs.movement.batch.dto.DestinationDto;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List; import java.util.List;
import java.util.Map;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
* <p>
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
* <p>
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
* <p>
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j @Slf4j
@StepScope @StepScope
public class DestinationRangeReader extends BaseApiReader<DestinationDto> { public class DestinationRangeReader extends BaseApiReader<DestinationDto> {
private final BatchDateService batchDateService; // BatchDateService 필드 추가
private List<DestinationDto> allData; private List<DestinationDto> allData;
private int currentBatchIndex = 0; private int currentBatchIndex = 0;
private final int batchSize = 1000; private final int batchSize = 1000;
private String startDate; protected String getApiKey() {
private String stopDate; return "DESTINATIONS_IMPORT_API";
}
public DestinationRangeReader(WebClient webClient, public DestinationRangeReader(WebClient webClient, BatchDateService batchDateService) {
@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['stopDate']}") String stopDate) {
super(webClient); super(webClient);
// 날짜가 + 한달 기간 도착예정지 정보 update this.batchDateService = batchDateService;
if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) { enableChunkMode();
LocalDate today = LocalDate.now();
this.startDate = today
.atStartOfDay()
.format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
this.stopDate = today
.plusDays(15)
.atStartOfDay()
.format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
} else {
this.startDate = startDate;
this.stopDate = stopDate;
}
enableChunkMode(); // Chunk 모드 활성화
} }
@Override @Override
protected String getReaderName() { protected String getReaderName() {
return "DestinationsRange"; return "DestinationsRangeReader";
} }
@Override @Override
@ -85,22 +47,11 @@ public class DestinationRangeReader extends BaseApiReader<DestinationDto> {
return "https://webservices.maritime.spglobal.com"; return "https://webservices.maritime.spglobal.com";
} }
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override
protected void beforeFetch() {
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
}
@Override @Override
protected List<DestinationDto> fetchNextBatch() throws Exception { protected List<DestinationDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인
// 모든 배치 처리 완료 확인
if (allData == null) { if (allData == null) {
log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); allData = callApiWithBatch();
allData = callApiWithBatch(startDate, stopDate);
if (allData == null || allData.isEmpty()) { if (allData == null || allData.isEmpty()) {
log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
@ -136,14 +87,16 @@ public class DestinationRangeReader extends BaseApiReader<DestinationDto> {
* Query Parameter를 사용한 API 호출 * Query Parameter를 사용한 API 호출
* @return API 응답 * @return API 응답
*/ */
private List<DestinationDto> callApiWithBatch(String startDate, String stopDate) { private List<DestinationDto> callApiWithBatch() {
String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate; Map<String, String> params = batchDateService.getDateRangeWithTimezoneParams(getApiKey());
// +"&lrno=" + lrno; log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate"));
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get() return webClient.get()
.uri(url) .uri(getApiPath(), uriBuilder -> uriBuilder
// 맵에서 파라미터 값을 동적으로 가져와 세팅
.queryParam("startDate", params.get("fromDate"))
.queryParam("stopDate", params.get("toDate"))
.build())
.retrieve() .retrieve()
.bodyToFlux(DestinationDto.class) .bodyToFlux(DestinationDto.class)
.collectList() .collectList()

파일 보기

@ -2,63 +2,29 @@ package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.PortCallsDto; import com.snp.batch.jobs.movement.batch.dto.PortCallsDto;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List; import java.util.List;
import java.util.Map;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j @Slf4j
@StepScope @StepScope
public class PortCallsRangeReader extends BaseApiReader<PortCallsDto> { public class PortCallsRangeReader extends BaseApiReader<PortCallsDto> {
private final BatchDateService batchDateService; // BatchDateService 필드 추가
private List<PortCallsDto> allData; private List<PortCallsDto> allData;
private int currentBatchIndex = 0; private int currentBatchIndex = 0;
private final int batchSize = 5000; private final int batchSize = 5000;
private String startDate; protected String getApiKey() {
private String stopDate; return "PORT_CALLS_IMPORT_API";
public PortCallsRangeReader(WebClient webClient, }
@Value("#{jobParameters['startDate']}") String startDate, public PortCallsRangeReader(WebClient webClient, BatchDateService batchDateService) {
@Value("#{jobParameters['stopDate']}") String stopDate) { super(webClient);
super(webClient); this.batchDateService = batchDateService;
enableChunkMode();
// 날짜가 없으면 전날 하루 기준 }
if (startDate == null || startDate.isBlank() ||
stopDate == null || stopDate.isBlank()) {
LocalDate yesterday = LocalDate.now().minusDays(1);
this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
} else {
this.startDate = startDate;
this.stopDate = stopDate;
}
enableChunkMode();
}
@Override @Override
protected String getReaderName() { protected String getReaderName() {
@ -81,25 +47,12 @@ public class PortCallsRangeReader extends BaseApiReader<PortCallsDto> {
return "https://webservices.maritime.spglobal.com"; return "https://webservices.maritime.spglobal.com";
} }
@Override
protected void beforeFetch() {
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
}
/**
* Chunk 기반 핵심 메서드: 다음 배치를 조회하여 반환
*
* Spring Batch가 batchsize만큼 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 ( 이상 없으면 null)
*/
@Override @Override
protected List<PortCallsDto> fetchNextBatch() throws Exception { protected List<PortCallsDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인 // 모든 배치 처리 완료 확인
if (allData == null) { if (allData == null) {
log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); allData = callApiWithBatch();
allData = callApiWithBatch(startDate, stopDate);
if (allData == null || allData.isEmpty()) { if (allData == null || allData.isEmpty()) {
log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
@ -137,12 +90,15 @@ public class PortCallsRangeReader extends BaseApiReader<PortCallsDto> {
* Query Parameter를 사용한 API 호출 * Query Parameter를 사용한 API 호출
* @return API 응답 * @return API 응답
*/ */
private List<PortCallsDto> callApiWithBatch(String startDate, String stopDate) { private List<PortCallsDto> callApiWithBatch() {
String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate; Map<String, String> params = batchDateService.getDateRangeWithTimezoneParams(getApiKey());
log.info("[{}] API 호출: {}", getReaderName(), url); log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate"));
return webClient.get() return webClient.get()
.uri(url) .uri(getApiPath(), uriBuilder -> uriBuilder
// 맵에서 파라미터 값을 동적으로 가져와 세팅
.queryParam("startDate", params.get("fromDate"))
.queryParam("stopDate", params.get("toDate"))
.build())
.retrieve() .retrieve()
.bodyToFlux(PortCallsDto.class) .bodyToFlux(PortCallsDto.class)
.collectList() .collectList()

파일 보기

@ -2,6 +2,7 @@ package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.StsOperationDto; import com.snp.batch.jobs.movement.batch.dto.StsOperationDto;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -10,57 +11,28 @@ import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.List; import java.util.List;
import java.util.Map;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j @Slf4j
@StepScope @StepScope
public class StsOperationRangeReader extends BaseApiReader<StsOperationDto> { public class StsOperationRangeReader extends BaseApiReader<StsOperationDto> {
private final BatchDateService batchDateService; // BatchDateService 필드 추가
private List<StsOperationDto> allData; private List<StsOperationDto> allData;
private int currentBatchIndex = 0; private int currentBatchIndex = 0;
private final int batchSize = 5000; private final int batchSize = 5000;
private String startDate; protected String getApiKey() {
private String stopDate; return "STS_OPERATION_IMPORT_API";
}
public StsOperationRangeReader(WebClient webClient, public StsOperationRangeReader(WebClient webClient, BatchDateService batchDateService) {
@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['stopDate']}") String stopDate) {
super(webClient); super(webClient);
// 날짜가 없으면 전날 하루 기준 this.batchDateService = batchDateService;
if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) { enableChunkMode();
LocalDate yesterday = LocalDate.now().minusDays(1);
this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
} else {
this.startDate = startDate;
this.stopDate = stopDate;
}
enableChunkMode(); // Chunk 모드 활성화
} }
@Override @Override
protected String getReaderName() { protected String getReaderName() {
return "StsOperationReader"; return "StsOperationRangeReader";
} }
@Override @Override
@ -79,28 +51,12 @@ public class StsOperationRangeReader extends BaseApiReader<StsOperationDto> {
return "https://webservices.maritime.spglobal.com"; return "https://webservices.maritime.spglobal.com";
} }
/**
* 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회
*/
@Override
protected void beforeFetch() {
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
*
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override @Override
protected List<StsOperationDto> fetchNextBatch() throws Exception { protected List<StsOperationDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인 // 모든 배치 처리 완료 확인
if (allData == null ) { if (allData == null ) {
log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); allData = callApiWithBatch();
allData = callApiWithBatch(startDate, stopDate);
if (allData == null || allData.isEmpty()) { if (allData == null || allData.isEmpty()) {
log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
@ -134,17 +90,18 @@ public class StsOperationRangeReader extends BaseApiReader<StsOperationDto> {
/** /**
* Query Parameter를 사용한 API 호출 * Query Parameter를 사용한 API 호출
*
* @param startDate,stopDate
* @return API 응답 * @return API 응답
*/ */
private List<StsOperationDto> callApiWithBatch(String startDate, String stopDate) { private List<StsOperationDto> callApiWithBatch() {
String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate; Map<String, String> params = batchDateService.getDateRangeWithTimezoneParams(getApiKey());
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate"));
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get() return webClient.get()
.uri(url) .uri(getApiPath(), uriBuilder -> uriBuilder
// 맵에서 파라미터 값을 동적으로 가져와 세팅
.queryParam("startDate", params.get("fromDate"))
.queryParam("stopDate", params.get("toDate"))
.build())
.retrieve() .retrieve()
.bodyToFlux(StsOperationDto.class) .bodyToFlux(StsOperationDto.class)
.collectList() .collectList()

파일 보기

@ -2,65 +2,33 @@ package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.TerminalCallsDto; import com.snp.batch.jobs.movement.batch.dto.TerminalCallsDto;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List; import java.util.List;
import java.util.Map;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
* <p>
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
* <p>
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
* <p>
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j @Slf4j
@StepScope @StepScope
public class TerminalCallsRangeReader extends BaseApiReader<TerminalCallsDto> { public class TerminalCallsRangeReader extends BaseApiReader<TerminalCallsDto> {
private final BatchDateService batchDateService; // BatchDateService 필드 추가
private List<TerminalCallsDto> allData; private List<TerminalCallsDto> allData;
private int currentBatchIndex = 0; private int currentBatchIndex = 0;
private final int batchSize = 1000; private final int batchSize = 1000;
private String startDate; protected String getApiKey() {
private String stopDate; return "TERMINAL_CALLS_IMPORT_API";
}
public TerminalCallsRangeReader(WebClient webClient, public TerminalCallsRangeReader(WebClient webClient, BatchDateService batchDateService) {
@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['stopDate']}") String stopDate) {
super(webClient); super(webClient);
// 날짜가 없으면 전날 하루 기준 this.batchDateService = batchDateService;
if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) { enableChunkMode();
LocalDate yesterday = LocalDate.now().minusDays(1);
this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
} else {
this.startDate = startDate;
this.stopDate = stopDate;
}
enableChunkMode(); // Chunk 모드 활성화
} }
@Override @Override
protected String getReaderName() { protected String getReaderName() {
return "TerminalCalls"; return "TerminalCallsRangeReader";
} }
@Override @Override
@ -79,26 +47,12 @@ public class TerminalCallsRangeReader extends BaseApiReader<TerminalCallsDto> {
return "https://webservices.maritime.spglobal.com"; return "https://webservices.maritime.spglobal.com";
} }
@Override
protected void beforeFetch() {
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
* <p>
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override @Override
protected List<TerminalCallsDto> fetchNextBatch() throws Exception { protected List<TerminalCallsDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인 // 모든 배치 처리 완료 확인
if (allData == null ) { if (allData == null ) {
log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); allData = callApiWithBatch();
allData = callApiWithBatch(startDate, stopDate);
if (allData == null || allData.isEmpty()) { if (allData == null || allData.isEmpty()) {
log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
@ -132,15 +86,18 @@ public class TerminalCallsRangeReader extends BaseApiReader<TerminalCallsDto> {
/** /**
* Query Parameter를 사용한 API 호출 * Query Parameter를 사용한 API 호출
* @param startDate, stopDate
* @return API 응답 * @return API 응답
*/ */
private List<TerminalCallsDto> callApiWithBatch(String startDate, String stopDate) { private List<TerminalCallsDto> callApiWithBatch() {
String url = getApiPath() + "?startDate=" + startDate + "&stopDate=" + stopDate; Map<String, String> params = batchDateService.getDateRangeWithTimezoneParams(getApiKey());
log.debug("[{}] API 호출: {}", getReaderName(), url); log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate"));
return webClient.get() return webClient.get()
.uri(url) .uri(getApiPath(), uriBuilder -> uriBuilder
// 맵에서 파라미터 값을 동적으로 가져와 세팅
.queryParam("startDate", params.get("fromDate"))
.queryParam("stopDate", params.get("toDate"))
.build())
.retrieve() .retrieve()
.bodyToFlux(TerminalCallsDto.class) .bodyToFlux(TerminalCallsDto.class)
.collectList() .collectList()

파일 보기

@ -2,65 +2,33 @@ package com.snp.batch.jobs.movement.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.movement.batch.dto.TransitsDto; import com.snp.batch.jobs.movement.batch.dto.TransitsDto;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List; import java.util.List;
import java.util.Map;
/**
* 선박 상세 정보 Reader (v2.0 - Chunk 기반)
*
* 기능:
* 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회)
* 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리
* 3. fetchNextBatch() 호출 시마다 100개씩 API 호출
* 4. Spring Batch가 100건씩 Process Write 수행
*
* Chunk 처리 흐름:
* - beforeFetch() IMO 전체 조회 (1회)
* - fetchNextBatch() 100개 IMO로 API 호출 (1,718회)
* - read() 1건씩 반환 (100번)
* - Processor/Writer 100건 처리
* - 반복... (1,718번의 Chunk)
*
* 기존 방식과의 차이:
* - 기존: 17만건 전체 메모리 로드 Process Write
* - 신규: 100건씩 로드 Process Write (Chunk 1,718회)
*/
@Slf4j @Slf4j
@StepScope @StepScope
public class TransitsRangeReader extends BaseApiReader<TransitsDto> { public class TransitsRangeReader extends BaseApiReader<TransitsDto> {
private final BatchDateService batchDateService; // BatchDateService 필드 추가
private List<TransitsDto> allData; private List<TransitsDto> allData;
private int currentBatchIndex = 0; private int currentBatchIndex = 0;
private final int batchSize = 1000; private final int batchSize = 1000;
private String startDate; protected String getApiKey() {
private String stopDate; return "TRANSITS_IMPORT_API";
}
public TransitsRangeReader(WebClient webClient, public TransitsRangeReader(WebClient webClient, BatchDateService batchDateService) {
@Value("#{jobParameters['startDate']}") String startDate,
@Value("#{jobParameters['stopDate']}") String stopDate) {
super(webClient); super(webClient);
// 날짜가 없으면 전날 하루 기준 this.batchDateService = batchDateService;
if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) { enableChunkMode();
LocalDate yesterday = LocalDate.now().minusDays(1);
this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
} else {
this.startDate = startDate;
this.stopDate = stopDate;
}
enableChunkMode(); // Chunk 모드 활성화
} }
@Override @Override
protected String getReaderName() { protected String getReaderName() {
return "Transits"; return "TransitsRangeReader";
} }
@Override @Override
@ -79,25 +47,12 @@ public class TransitsRangeReader extends BaseApiReader<TransitsDto> {
return "https://webservices.maritime.spglobal.com"; return "https://webservices.maritime.spglobal.com";
} }
@Override
protected void beforeFetch() {
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
}
/**
* Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환
*
* Spring Batch가 100건씩 read() 호출 완료 메서드 재호출
*
* @return 다음 배치 100건 ( 이상 없으면 null)
*/
@Override @Override
protected List<TransitsDto> fetchNextBatch() throws Exception { protected List<TransitsDto> fetchNextBatch() throws Exception {
// 모든 배치 처리 완료 확인 // 모든 배치 처리 완료 확인
if (allData == null ) { if (allData == null ) {
log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); allData = callApiWithBatch();
allData = callApiWithBatch(startDate, stopDate);
if (allData == null || allData.isEmpty()) { if (allData == null || allData.isEmpty()) {
log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName());
@ -134,14 +89,16 @@ public class TransitsRangeReader extends BaseApiReader<TransitsDto> {
* @param startDate,stopDate * @param startDate,stopDate
* @return API 응답 * @return API 응답
*/ */
private List<TransitsDto> callApiWithBatch(String startDate, String stopDate) { private List<TransitsDto> callApiWithBatch() {
String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate; Map<String, String> params = batchDateService.getDateRangeWithTimezoneParams(getApiKey());
// +"&lrno=" + lrno; log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate"));
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get() return webClient.get()
.uri(url) .uri(getApiPath(), uriBuilder -> uriBuilder
// 맵에서 파라미터 값을 동적으로 가져와 세팅
.queryParam("startDate", params.get("fromDate"))
.queryParam("stopDate", params.get("toDate"))
.build())
.retrieve() .retrieve()
.bodyToFlux(TransitsDto.class) .bodyToFlux(TransitsDto.class)
.collectList() .collectList()

파일 보기

@ -1,54 +1,42 @@
package com.snp.batch.jobs.pscInspection.batch.config; package com.snp.batch.jobs.pscInspection.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
import com.snp.batch.jobs.pscInspection.batch.dto.PscInspectionDto; import com.snp.batch.jobs.pscInspection.batch.dto.PscInspectionDto;
import com.snp.batch.jobs.pscInspection.batch.entity.PscInspectionEntity; import com.snp.batch.jobs.pscInspection.batch.entity.PscInspectionEntity;
import com.snp.batch.jobs.pscInspection.batch.processor.PscInspectionProcessor; import com.snp.batch.jobs.pscInspection.batch.processor.PscInspectionProcessor;
import com.snp.batch.jobs.pscInspection.batch.reader.PscApiReader; import com.snp.batch.jobs.pscInspection.batch.reader.PscApiReader;
import com.snp.batch.jobs.pscInspection.batch.writer.PscInspectionWriter; import com.snp.batch.jobs.pscInspection.batch.writer.PscInspectionWriter;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.beans.factory.annotation.Value;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* ShipMovementReader (ship_data Maritime API)
* (PortCallDto)
* ShipMovementProcessor
* (ShipMovementEntity)
* ShipDetailDataWriter
* (ship_movement 테이블)
*/
@Slf4j @Slf4j
@Configuration @Configuration
public class PscInspectionJobConfig extends BaseJobConfig<PscInspectionDto, PscInspectionEntity> { public class PscInspectionJobConfig extends BaseMultiStepJobConfig<PscInspectionDto, PscInspectionEntity> {
private final PscInspectionProcessor pscInspectionProcessor; private final PscInspectionProcessor pscInspectionProcessor;
private final PscInspectionWriter pscInspectionWriter; private final PscInspectionWriter pscInspectionWriter;
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient; private final WebClient maritimeApiWebClient;
private final BatchDateService batchDateService;
protected String getApiKey() {return "PSC_IMPORT_API";}
protected String getBatchUpdateSql() {
return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());}
public PscInspectionJobConfig( public PscInspectionJobConfig(
JobRepository jobRepository, JobRepository jobRepository,
@ -56,11 +44,13 @@ public class PscInspectionJobConfig extends BaseJobConfig<PscInspectionDto, PscI
PscInspectionProcessor pscInspectionProcessor, PscInspectionProcessor pscInspectionProcessor,
PscInspectionWriter pscInspectionWriter, PscInspectionWriter pscInspectionWriter,
JdbcTemplate jdbcTemplate, JdbcTemplate jdbcTemplate,
@Qualifier("maritimeApiWebClient") WebClient maritimeApiWebClient, PscApiReader pscApiReader) { // ObjectMapper 주입 추가 BatchDateService batchDateService,
@Qualifier("maritimeApiWebClient") WebClient maritimeApiWebClient) { // ObjectMapper 주입 추가
super(jobRepository, transactionManager); super(jobRepository, transactionManager);
this.pscInspectionProcessor = pscInspectionProcessor; this.pscInspectionProcessor = pscInspectionProcessor;
this.pscInspectionWriter = pscInspectionWriter; this.pscInspectionWriter = pscInspectionWriter;
this.jdbcTemplate = jdbcTemplate; this.jdbcTemplate = jdbcTemplate;
this.batchDateService = batchDateService;
this.maritimeApiWebClient = maritimeApiWebClient; this.maritimeApiWebClient = maritimeApiWebClient;
} }
@ -76,19 +66,17 @@ public class PscInspectionJobConfig extends BaseJobConfig<PscInspectionDto, PscI
return "PSCDetailImportStep"; return "PSCDetailImportStep";
} }
@Bean @Override
@StepScope protected Job createJobFlow(JobBuilder jobBuilder) {
public PscApiReader pscApiReader( return jobBuilder
@Qualifier("maritimeApiWebClient") WebClient webClient, .start(PSCDetailImportStep())
@Value("#{jobParameters['startDate']}") String fromDate, .next(pscLastExecutionUpdateStep())
@Value("#{jobParameters['stopDate']}") String toDate .build();
) {
return new PscApiReader(webClient, fromDate, toDate);
} }
@Override @Override
protected ItemReader<PscInspectionDto> createReader() { protected ItemReader<PscInspectionDto> createReader() {
return pscApiReader(null, null, null); return new PscApiReader(maritimeApiWebClient, jdbcTemplate, batchDateService);
} }
@Override @Override
@ -115,4 +103,24 @@ public class PscInspectionJobConfig extends BaseJobConfig<PscInspectionDto, PscI
public Step PSCDetailImportStep() { public Step PSCDetailImportStep() {
return step(); return step();
} }
/**
* 2단계: 모든 스텝 성공 배치 실행 로그(날짜) 업데이트
*/
@Bean
public Tasklet pscLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작");
jdbcTemplate.execute(getBatchUpdateSql());
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "PSCLastExecutionUpdateStep")
public Step pscLastExecutionUpdateStep() {
return new StepBuilder("PSCLastExecutionUpdateStep", jobRepository)
.tasklet(pscLastExecutionUpdateTasklet(), transactionManager)
.build();
}
} }

파일 보기

@ -4,10 +4,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.pscInspection.batch.dto.PscApiResponseDto; import com.snp.batch.jobs.pscInspection.batch.dto.PscApiResponseDto;
import com.snp.batch.jobs.pscInspection.batch.dto.PscInspectionDto; import com.snp.batch.jobs.pscInspection.batch.dto.PscInspectionDto;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate; import java.time.LocalDate;
@ -16,35 +18,26 @@ import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
@Slf4j @Slf4j
@StepScope @StepScope
public class PscApiReader extends BaseApiReader<PscInspectionDto> { public class PscApiReader extends BaseApiReader<PscInspectionDto> {
private final String startDate;
private final String stopDate;
private List<PscInspectionDto> allData; private List<PscInspectionDto> allData;
private int currentBatchIndex = 0; private int currentBatchIndex = 0;
private final int batchSize = 1000; private final int batchSize = 1000;
public PscApiReader(@Qualifier("maritimeApiWebClient") WebClient webClient, private final JdbcTemplate jdbcTemplate;
@Value("#{jobParameters['startDate']}") String startDate, private final BatchDateService batchDateService; // BatchDateService 필드 추가
@Value("#{jobParameters['stopDate']}") String stopDate) { protected String getApiKey() {
return "PSC_IMPORT_API";
}
public PscApiReader(WebClient webClient, JdbcTemplate jdbcTemplate, BatchDateService batchDateService) {
super(webClient); super(webClient);
this.jdbcTemplate = jdbcTemplate;
// 날짜가 없으면 전날 하루 기준 this.batchDateService = batchDateService;
if (startDate == null || startDate.isBlank() || enableChunkMode(); // Chunk 모드 활성화
stopDate == null || stopDate.isBlank()) {
LocalDate yesterday = LocalDate.now().minusDays(1);
this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z";
} else {
this.startDate = startDate;
this.stopDate = stopDate;
}
enableChunkMode();
} }
@Override @Override
@ -63,18 +56,12 @@ public class PscApiReader extends BaseApiReader<PscInspectionDto> {
return "/MaritimeWCF/PSCService.svc/RESTFul/GetPSCDataByLastUpdateDateRange"; return "/MaritimeWCF/PSCService.svc/RESTFul/GetPSCDataByLastUpdateDateRange";
} }
@Override
protected void beforeFetch() {
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate);
}
@Override @Override
protected List<PscInspectionDto> fetchNextBatch() { protected List<PscInspectionDto> fetchNextBatch() {
// 1) 처음 호출이면 API 호출해서 전체 데이터를 가져온다 // 1) 처음 호출이면 API 호출해서 전체 데이터를 가져온다
if (allData == null) { if (allData == null) {
log.info("[PSC] 최초 API 조회 실행: {} ~ {}", startDate, stopDate); allData = callApiWithBatch();
allData = callApiWithBatch(startDate, stopDate);
if (allData == null || allData.isEmpty()) { if (allData == null || allData.isEmpty()) {
log.warn("[PSC] 조회된 데이터 없음 → 종료"); log.warn("[PSC] 조회된 데이터 없음 → 종료");
@ -107,24 +94,28 @@ public class PscApiReader extends BaseApiReader<PscInspectionDto> {
return batch; return batch;
} }
private List<PscInspectionDto> callApiWithBatch(String startDate, String stopDate) { private List<PscInspectionDto> callApiWithBatch() {
LocalDateTime fromDay = parseToDateTime(startDate, true); Map<String, String> params = batchDateService.getDateRangeWithoutTimeParams(getApiKey());
LocalDateTime toDay = parseToDateTime(stopDate, false);
String url = getApiPath() String url = getApiPath();
+ "?shipsCategory=0"
+ "&fromYear=" + fromDay.getYear()
+ "&fromMonth=" + fromDay.getMonthValue()
+ "&fromDay=" + fromDay.getDayOfMonth()
+ "&toYear=" + toDay.getYear()
+ "&toMonth=" + toDay.getMonthValue()
+ "&toDay=" + toDay.getDayOfMonth();
log.info("[PSC] API 호출 URL = {}", url); log.info("[{}] PSC API Date Range: {} → {}",
getReaderName(),
String.format("%s-%s-%s",params.get("fromYear"),params.get("fromMonth"),params.get("fromDay")),
String.format("%s-%s-%s",params.get("toYear"),params.get("toMonth"),params.get("toDay"))
);
String json = webClient.get() String json = webClient.get()
.uri(url) .uri(url, uriBuilder -> uriBuilder
.queryParam("shipsCategory", params.get("shipsCategory"))
.queryParam("fromYear", params.get("fromYear"))
.queryParam("fromMonth", params.get("fromMonth"))
.queryParam("fromDay", params.get("fromDay"))
.queryParam("toYear", params.get("toYear"))
.queryParam("toMonth", params.get("toMonth"))
.queryParam("toDay", params.get("toDay"))
.build())
.retrieve() .retrieve()
.bodyToMono(String.class) .bodyToMono(String.class)
.block(); .block();

파일 보기

@ -101,7 +101,7 @@ public class RiskDataRangeReader extends BaseApiReader<RiskDto> {
} }
private List<RiskDto> callApiWithBatch() { private List<RiskDto> callApiWithBatch() {
Map<String, String> params = batchDateService.getRiskComplianceApiDateParams(getApiKey()); Map<String, String> params = batchDateService.getDateRangeWithTimezoneParams(getApiKey());
log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate")); log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), params.get("fromDate"), params.get("toDate"));
String url = getApiPath(); String url = getApiPath();
@ -116,4 +116,5 @@ public class RiskDataRangeReader extends BaseApiReader<RiskDto> {
.block(); .block();
} }
} }

파일 보기

@ -1,7 +1,7 @@
package com.snp.batch.jobs.shipdetail.batch.config; package com.snp.batch.jobs.shipdetail.batch.config;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.batch.config.BaseMultiStepJobConfig;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailComparisonData; import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailComparisonData;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailUpdate; import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailUpdate;
import com.snp.batch.jobs.shipdetail.batch.processor.ShipDetailDataProcessor; import com.snp.batch.jobs.shipdetail.batch.processor.ShipDetailDataProcessor;
@ -11,10 +11,14 @@ import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -22,33 +26,9 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
/**
* 선박 상세 정보 Import Job Config
*
* 특징:
* - ship_data 테이블에서 IMO 번호 조회
* - IMO 번호를 100개씩 배치로 분할
* - Maritime API GetShipsByIHSLRorIMONumbers 호출
* TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경
* - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT)
*
* 데이터 흐름:
* ShipDetailDataReader (ship_data Maritime API)
* (ShipDetailDto)
* ShipDetailDataProcessor
* (ShipDetailEntity)
* ShipDetailDataWriter
* (ship_detail 테이블)
*/
/**
* 선박 상세 정보 Import Job Config
* I: ShipDetailComparisonData (Reader 출력)
* O: ShipDetailUpdate (Processor 출력)
*/
@Slf4j @Slf4j
@Configuration @Configuration
public class ShipDetailUpdateJobConfig extends BaseJobConfig<ShipDetailComparisonData, ShipDetailUpdate> { public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig<ShipDetailComparisonData, ShipDetailUpdate> {
private final ShipDetailDataProcessor shipDetailDataProcessor; private final ShipDetailDataProcessor shipDetailDataProcessor;
private final ShipDetailDataWriter shipDetailDataWriter; private final ShipDetailDataWriter shipDetailDataWriter;
@ -56,6 +36,10 @@ public class ShipDetailUpdateJobConfig extends BaseJobConfig<ShipDetailCompariso
private final WebClient maritimeApiWebClient; private final WebClient maritimeApiWebClient;
private final ObjectMapper objectMapper; // ObjectMapper 주입 추가 private final ObjectMapper objectMapper; // ObjectMapper 주입 추가
private final BatchDateService batchDateService; private final BatchDateService batchDateService;
protected String getApiKey() {return "SHIP_DETAIL_UPDATE_API";}
protected String getBatchUpdateSql() {
return String.format("UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", getApiKey());}
public ShipDetailUpdateJobConfig( public ShipDetailUpdateJobConfig(
JobRepository jobRepository, JobRepository jobRepository,
@ -85,6 +69,14 @@ public class ShipDetailUpdateJobConfig extends BaseJobConfig<ShipDetailCompariso
return "ShipDetailUpdateStep"; return "ShipDetailUpdateStep";
} }
@Override
protected Job createJobFlow(JobBuilder jobBuilder) {
return jobBuilder
.start(ShipDetailUpdateStep())
.next(shipDetailLastExecutionUpdateStep())
.build();
}
@Override @Override
protected ItemReader<ShipDetailComparisonData> createReader() { // 타입 변경 protected ItemReader<ShipDetailComparisonData> createReader() { // 타입 변경
// Reader 생성자 수정: ObjectMapper를 전달합니다. // Reader 생성자 수정: ObjectMapper를 전달합니다.
@ -115,4 +107,25 @@ public class ShipDetailUpdateJobConfig extends BaseJobConfig<ShipDetailCompariso
public Step ShipDetailUpdateStep() { public Step ShipDetailUpdateStep() {
return step(); return step();
} }
/**
* 2단계: 모든 스텝 성공 배치 실행 로그(날짜) 업데이트
*/
@Bean
public Tasklet shipDetailLastExecutionUpdateTasklet() {
return (contribution, chunkContext) -> {
log.info(">>>>> 모든 스텝 성공: BATCH_LAST_EXECUTION 업데이트 시작");
jdbcTemplate.execute(getBatchUpdateSql());
log.info(">>>>> BATCH_LAST_EXECUTION 업데이트 완료");
return RepeatStatus.FINISHED;
};
}
@Bean(name = "ShipDetailLastExecutionUpdateStep")
public Step shipDetailLastExecutionUpdateStep() {
return new StepBuilder("ShipDetailLastExecutionUpdateStep", jobRepository)
.tasklet(shipDetailLastExecutionUpdateTasklet(), transactionManager)
.build();
}
} }

파일 보기

@ -11,6 +11,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -252,10 +253,15 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailComparis
private ShipUpdateApiResponse callShipUpdateApi(){ private ShipUpdateApiResponse callShipUpdateApi(){
// 1. BatchDateService를 통해 동적 날짜 파라미터 조회 // 1. BatchDateService를 통해 동적 날짜 파라미터 조회
Map<String, String> params = batchDateService.getShipUpdateApiDateParams(getApiKey()); Map<String, String> params = batchDateService.getDateRangeWithoutTimeParams(getApiKey());
String url = getShipUpdateApiPath(); String url = getShipUpdateApiPath();
log.info("[{}] API 호출: {}", getReaderName(), url);
log.info("[{}] Ship Detail Update Date Range: {} → {}",
getReaderName(),
String.format("%s-%s-%s",params.get("fromYear"),params.get("fromMonth"),params.get("fromDay")),
String.format("%s-%s-%s",params.get("toYear"),params.get("toMonth"),params.get("toDay"))
);
return webClient.get() return webClient.get()
.uri(url, uriBuilder -> uriBuilder .uri(url, uriBuilder -> uriBuilder
@ -291,11 +297,6 @@ public class ShipDetailUpdateDataReader extends BaseApiReader<ShipDetailComparis
int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize);
try{ try{
if (data == null) { if (data == null) {
// 3. 배치 성공 상태 업데이트 (트랜잭션 커밋 직전에 실행)
LocalDate successDate = LocalDate.now(); // 현재 배치 실행 시점의 날짜 (Reader의 toDay와 동일한 )
batchDateService.updateLastSuccessDate(getApiKey(), successDate);
log.info("batch_last_execution update 완료 : {}", getApiKey());
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료",
getReaderName(), allImoNumbers.size()); getReaderName(), allImoNumbers.size());

파일 보기

@ -1,15 +1,16 @@
package com.snp.batch.service; package com.snp.batch.service;
import com.snp.batch.global.model.BatchLastExecution; import com.snp.batch.global.model.BatchLastExecution;
import com.snp.batch.global.repository.BatchLastExecutionRepository; import com.snp.batch.global.repository.BatchLastExecutionRepository;
import jakarta.transaction.Transactional; import jakarta.transaction.Transactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.LocalDate; import java.time.*;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.HashMap;
import java.util.Map;
@Slf4j
@Service @Service
public class BatchDateService { public class BatchDateService {
private final BatchLastExecutionRepository repository; private final BatchLastExecutionRepository repository;
@ -18,74 +19,78 @@ public class BatchDateService {
this.repository = repository; this.repository = repository;
} }
/** public Map<String, String> getDateRangeWithoutTimeParams(String apiKey) {
* API 호출에 필요한 from/to 날짜 파라미터를 계산하고 반환합니다. return repository.findDateRangeByApiKey(apiKey)
*/ .map(projection -> {
public Map<String, String> getShipUpdateApiDateParams(String apiKey) { Map<String, String> params = new HashMap<>();
// 1. 마지막 성공 일자 (FROM 날짜) DB에서 조회
// 조회된 값이 없으면 (최초 실행), API 호출 시점의 하루 날짜를 사용합니다.
LocalDate lastDate = repository.findLastSuccessDate(apiKey)
.orElse(LocalDate.now().minusDays(1));
// 2. 현재 실행 시점의 일자 (TO 날짜) 계산 LocalDateTime fromTarget = (projection.getRangeFromDate() != null)
LocalDate currentDate = LocalDate.now(); ? projection.getRangeFromDate()
: projection.getLastSuccessDate();
// 3. 파라미터 Map 구성 LocalDateTime toTarget = (projection.getRangeToDate() != null)
Map<String, String> params = new HashMap<>(); ? projection.getRangeToDate()
: LocalDateTime.now();
// FROM Parameters (DB 조회 ) // 2. 파라미터 맵에 날짜 정보 매핑
params.put("fromYear", String.valueOf(lastDate.getYear())); putDateParams(params, "from", fromTarget);
params.put("fromMonth", String.valueOf(lastDate.getMonthValue())); putDateParams(params, "to", toTarget);
params.put("fromDay", String.valueOf(lastDate.getDayOfMonth()));
// TO Parameters (현재 시점 ) // 3. 고정 설정
params.put("toYear", String.valueOf(currentDate.getYear())); params.put("shipsCategory", "0");
params.put("toMonth", String.valueOf(currentDate.getMonthValue()));
params.put("toDay", String.valueOf(currentDate.getDayOfMonth()));
// 고정 return params;
params.put("shipsCategory", "0"); })
.orElseGet(() -> {
return params; log.warn("해당 apiKey에 대한 데이터를 찾을 수 없습니다: {}", apiKey);
} return new HashMap<>();
});
public Map<String, String> getRiskComplianceApiDateParams(String apiKey) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SS'Z'");
// 1. 마지막 성공 일자 (FROM 날짜) DB에서 조회
// 조회된 값이 없으면 (최초 실행), API 호출 시점의 하루 날짜를 사용합니다.
LocalDate lastDate = repository.findLastSuccessDate(apiKey)
.orElse(LocalDate.now().minusDays(1));
// 2. 현재 실행 시점의 일자 (TO 날짜) 계산
ZonedDateTime nowUtc = ZonedDateTime.now(ZoneOffset.UTC);
// 3. 파라미터 Map 구성
Map<String, String> params = new HashMap<>();
// FROM Parameters (DB 조회 )
String fromDateStr = lastDate.atStartOfDay().format(formatter);
params.put("fromDate", fromDateStr);
// TO Parameters (현재 시점 )
String toDateStr = nowUtc.format(formatter);
params.put("toDate", toDateStr);
return params;
} }
/** /**
* 배치 성공 , 다음 실행을 위해 to 날짜를 DB에 저장 업데이트합니다. * LocalDateTime에서 , , 일을 추출하여 Map에 담는 헬퍼 메소드
* @param successDate API 호출 성공 사용된 to 날짜
*/ */
@Transactional // UPDATE 쿼리를 사용하므로 트랜잭션 필요 private void putDateParams(Map<String, String> params, String prefix, LocalDateTime dateTime) {
public void updateLastSuccessDate(String apiKey, LocalDate successDate) { // apiKey 추가 if (dateTime != null) {
params.put(prefix + "Year", String.valueOf(dateTime.getYear()));
// 1. UPDATE 시도 params.put(prefix + "Month", String.valueOf(dateTime.getMonthValue()));
int updatedRows = repository.updateLastSuccessDate(apiKey, successDate); params.put(prefix + "Day", String.valueOf(dateTime.getDayOfMonth()));
// 2. 업데이트된 레코드가 없다면 (최초 실행), INSERT 수행
if (updatedRows == 0) {
BatchLastExecution entity = new BatchLastExecution(apiKey, successDate);
repository.save(entity);
} }
} }
public Map<String, String> getDateRangeWithTimezoneParams(String apiKey) {
return repository.findDateRangeByApiKey(apiKey)
.map(projection -> {
Map<String, String> params = new HashMap<>();
// 'Z' 문자열 리터럴이 아닌 실제 타임존 기호(X) 처리
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSX");
// 한국 시간을 UTC로 변환하는 헬퍼 메소드 (아래 정의)
params.put("fromDate", formatToUtc(projection.getRangeFromDate() != null ?
projection.getRangeFromDate() : projection.getLastSuccessDate(), formatter));
LocalDateTime toDateTime = projection.getRangeToDate() != null ?
projection.getRangeToDate() : LocalDateTime.now();
params.put("toDate", formatToUtc(toDateTime, formatter));
return params;
})
.orElseGet(() -> {
log.warn("해당 apiKey에 대한 데이터를 찾을 수 없습니다: {}", apiKey);
return new HashMap<>();
});
}
// 한국 시간(LocalDateTime) UTC 문자열로 변환하는 로직
private String formatToUtc(LocalDateTime localDateTime, DateTimeFormatter formatter) {
if (localDateTime == null) return null;
// 1. 한국 시간대(KST)임을 명시
// 2. UTC로 시간대를 변경 (9시간 빠짐)
// 3. 포맷팅 (끝에 Z가 자동으로 붙음)
return localDateTime.atZone(ZoneId.of("Asia/Seoul"))
.withZoneSameInstant(ZoneOffset.UTC)
.format(formatter);
}
} }