From 49d2de19651d0290e2f063af7cb60a7c6ae1b45a Mon Sep 17 00:00:00 2001 From: HeungTak Lee Date: Fri, 9 Jan 2026 14:25:27 +0900 Subject: [PATCH] =?UTF-8?q?:sparkles:=20AIS=20Target=20DB=20Sync=20Job=20?= =?UTF-8?q?=EB=B6=84=EB=A6=AC=20(=EC=BA=90=EC=8B=9C=E2=86=92DB=2015?= =?UTF-8?q?=EB=B6=84=20=EC=A3=BC=EA=B8=B0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - AisTargetDataWriter: DB 저장 제거, 캐시 업데이트만 수행 - AisTargetDbSyncJob 신규 생성: 15분 주기 캐시→DB 동기화 - AisTargetDbSyncTasklet: 캐시에서 최근 15분 데이터 조회 후 UPSERT - application.yml: ais-target-db-sync 설정 추가 데이터 흐름 변경: - 기존: API(1분) → 캐시 + DB (매분 33K 건 저장) - 변경: API(1분) → 캐시만, DB는 15분마다 MMSI별 최신 1건 저장 Co-Authored-By: Claude Opus 4.5 --- .../batch/writer/AisTargetDataWriter.java | 26 +++--- .../config/AisTargetDbSyncJobConfig.java | 79 ++++++++++++++++++ .../batch/tasklet/AisTargetDbSyncTasklet.java | 83 +++++++++++++++++++ src/main/resources/application.yml | 9 +- 4 files changed, 181 insertions(+), 16 deletions(-) create mode 100644 src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/config/AisTargetDbSyncJobConfig.java create mode 100644 src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/tasklet/AisTargetDbSyncTasklet.java diff --git a/src/main/java/com/snp/batch/jobs/aistarget/batch/writer/AisTargetDataWriter.java b/src/main/java/com/snp/batch/jobs/aistarget/batch/writer/AisTargetDataWriter.java index 76f81b0..60dd073 100644 --- a/src/main/java/com/snp/batch/jobs/aistarget/batch/writer/AisTargetDataWriter.java +++ b/src/main/java/com/snp/batch/jobs/aistarget/batch/writer/AisTargetDataWriter.java @@ -2,7 +2,6 @@ package com.snp.batch.jobs.aistarget.batch.writer; import com.snp.batch.common.batch.writer.BaseWriter; import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; -import com.snp.batch.jobs.aistarget.batch.repository.AisTargetRepository; import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager; import com.snp.batch.jobs.aistarget.classifier.AisClassTypeClassifier; import lombok.extern.slf4j.Slf4j; @@ -11,46 +10,43 @@ import org.springframework.stereotype.Component; import java.util.List; /** - * AIS Target 데이터 Writer + * AIS Target 데이터 Writer (캐시 전용) * * 동작: - * 1. UPSERT 방식으로 DB 저장 (PK: mmsi + message_timestamp) - * 2. ClassType 분류 (Core20 캐시 기반 A/B 분류) - * 3. 캐시에 최신 위치 정보 업데이트 (classType, core20Mmsi 포함) + * 1. ClassType 분류 (Core20 캐시 기반 A/B 분류) + * 2. 캐시에 최신 위치 정보 업데이트 (classType, core20Mmsi 포함) + * + * 참고: + * - DB 저장은 별도 Job(aisTargetDbSyncJob)에서 15분 주기로 수행 + * - 이 Writer는 캐시 업데이트만 담당 */ @Slf4j @Component public class AisTargetDataWriter extends BaseWriter { - private final AisTargetRepository aisTargetRepository; private final AisTargetCacheManager cacheManager; private final AisClassTypeClassifier classTypeClassifier; public AisTargetDataWriter( - AisTargetRepository aisTargetRepository, AisTargetCacheManager cacheManager, AisClassTypeClassifier classTypeClassifier) { super("AisTarget"); - this.aisTargetRepository = aisTargetRepository; this.cacheManager = cacheManager; this.classTypeClassifier = classTypeClassifier; } @Override protected void writeItems(List items) throws Exception { - log.debug("AIS Target 데이터 저장 시작: {} 건", items.size()); + log.debug("AIS Target 캐시 업데이트 시작: {} 건", items.size()); - // 1. DB 저장 (classType 없이 원본 데이터만 저장) - aisTargetRepository.batchUpsert(items); - - // 2. ClassType 분류 (캐시 저장 전에 분류) + // 1. ClassType 분류 (캐시 저장 전에 분류) // - Core20 캐시의 IMO와 매칭하여 classType(A/B), core20Mmsi 설정 classTypeClassifier.classifyAll(items); - // 3. 캐시 업데이트 (classType, core20Mmsi 포함) + // 2. 캐시 업데이트 (classType, core20Mmsi 포함) cacheManager.putAll(items); - log.debug("AIS Target 데이터 저장 완료: {} 건 (캐시 크기: {})", + log.debug("AIS Target 캐시 업데이트 완료: {} 건 (캐시 크기: {})", items.size(), cacheManager.size()); } } diff --git a/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/config/AisTargetDbSyncJobConfig.java b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/config/AisTargetDbSyncJobConfig.java new file mode 100644 index 0000000..48e723a --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/config/AisTargetDbSyncJobConfig.java @@ -0,0 +1,79 @@ +package com.snp.batch.jobs.aistargetdbsync.batch.config; + +import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.AisTargetDbSyncTasklet; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionListener; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * AIS Target DB Sync Job Config + * + * 스케줄: 매 15분 (0 0/15 * * * ?) + * API: 없음 (캐시 기반) + * + * 동작: + * - Caffeine 캐시에서 최근 15분 이내 데이터 조회 + * - MMSI별 최신 위치 1건씩 DB에 UPSERT + * - 1분 주기 aisTargetImportJob과 분리하여 DB 볼륨 최적화 + * + * 데이터 흐름: + * - aisTargetImportJob (1분): API → 캐시 업데이트 + * - aisTargetDbSyncJob (15분): 캐시 → DB 저장 (이 Job) + */ +@Slf4j +@Configuration +public class AisTargetDbSyncJobConfig { + + private final JobRepository jobRepository; + private final PlatformTransactionManager transactionManager; + private final AisTargetDbSyncTasklet aisTargetDbSyncTasklet; + + public AisTargetDbSyncJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + AisTargetDbSyncTasklet aisTargetDbSyncTasklet) { + this.jobRepository = jobRepository; + this.transactionManager = transactionManager; + this.aisTargetDbSyncTasklet = aisTargetDbSyncTasklet; + } + + @Bean(name = "aisTargetDbSyncStep") + public Step aisTargetDbSyncStep() { + return new StepBuilder("aisTargetDbSyncStep", jobRepository) + .tasklet(aisTargetDbSyncTasklet, transactionManager) + .build(); + } + + @Bean(name = "aisTargetDbSyncJob") + public Job aisTargetDbSyncJob() { + log.info("Job 생성: aisTargetDbSyncJob"); + + return new JobBuilder("aisTargetDbSyncJob", jobRepository) + .listener(new JobExecutionListener() { + @Override + public void beforeJob(JobExecution jobExecution) { + log.info("[aisTargetDbSyncJob] DB Sync Job 시작"); + } + + @Override + public void afterJob(JobExecution jobExecution) { + long writeCount = jobExecution.getStepExecutions().stream() + .mapToLong(se -> se.getWriteCount()) + .sum(); + + log.info("[aisTargetDbSyncJob] DB Sync Job 완료 - 상태: {}, 저장 건수: {}", + jobExecution.getStatus(), writeCount); + } + }) + .start(aisTargetDbSyncStep()) + .build(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/tasklet/AisTargetDbSyncTasklet.java b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/tasklet/AisTargetDbSyncTasklet.java new file mode 100644 index 0000000..ffe5d9f --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistargetdbsync/batch/tasklet/AisTargetDbSyncTasklet.java @@ -0,0 +1,83 @@ +package com.snp.batch.jobs.aistargetdbsync.batch.tasklet; + +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import com.snp.batch.jobs.aistarget.batch.repository.AisTargetRepository; +import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * AIS Target DB Sync Tasklet + * + * 스케줄: 매 15분 (0 0/15 * * * ?) + * + * 동작: + * - Caffeine 캐시에서 최근 N분 이내 데이터 조회 + * - MMSI별 최신 위치 1건씩 DB에 UPSERT + * - 캐시의 모든 컬럼 정보를 그대로 DB에 저장 + * + * 참고: + * - 캐시에는 MMSI별 최신 데이터만 유지됨 (120분 TTL) + * - DB 저장은 15분 주기로 수행하여 볼륨 절감 + * - 기존 aisTargetImportJob은 캐시 업데이트만 수행 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class AisTargetDbSyncTasklet implements Tasklet { + + private final AisTargetCacheManager cacheManager; + private final AisTargetRepository aisTargetRepository; + + /** + * DB 동기화 시 조회할 캐시 데이터 시간 범위 (분) + * 기본값: 15분 (스케줄 주기와 동일) + */ + @Value("${app.batch.ais-target-db-sync.time-range-minutes:15}") + private int timeRangeMinutes; + + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { + log.info("========================================"); + log.info("AIS Target DB Sync 시작"); + log.info("조회 범위: 최근 {}분", timeRangeMinutes); + log.info("현재 캐시 크기: {}", cacheManager.size()); + log.info("========================================"); + + long startTime = System.currentTimeMillis(); + + // 1. 캐시에서 최근 N분 이내 데이터 조회 + List entities = cacheManager.getByTimeRange(timeRangeMinutes); + + if (entities.isEmpty()) { + log.warn("캐시에서 조회된 데이터가 없습니다 (범위: {}분)", timeRangeMinutes); + return RepeatStatus.FINISHED; + } + + log.info("캐시에서 {} 건 조회 완료", entities.size()); + + // 2. DB에 UPSERT + aisTargetRepository.batchUpsert(entities); + + long elapsed = System.currentTimeMillis() - startTime; + + log.info("========================================"); + log.info("AIS Target DB Sync 완료"); + log.info("저장 건수: {} 건", entities.size()); + log.info("소요 시간: {}ms", elapsed); + log.info("========================================"); + + // Step 통계 업데이트 + contribution.incrementWriteCount(entities.size()); + + return RepeatStatus.FINISHED; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 39aaf01..bb78857 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -92,12 +92,19 @@ app: enabled: true cron: "0 0 * * * ?" # Every hour - # AIS Target 배치 설정 + # AIS Target Import 배치 설정 (캐시 업데이트 전용) ais-target: since-seconds: 60 # API 조회 범위 (초) chunk-size: 5000 # 배치 청크 크기 schedule: cron: "15 * * * * ?" # 매 분 15초 실행 + + # AIS Target DB Sync 배치 설정 (캐시 → DB 저장) + ais-target-db-sync: + time-range-minutes: 15 # 캐시에서 조회할 시간 범위 (분) + schedule: + cron: "0 0/15 * * * ?" # 매 15분 정각 실행 (00, 15, 30, 45분) + # AIS Target 캐시 설정 ais-target-cache: ttl-minutes: 120 # 캐시 TTL (분) - 2시간