✨ AIS Target DB Sync Job 분리 (캐시→DB 15분 주기)
- AisTargetDataWriter: DB 저장 제거, 캐시 업데이트만 수행 - AisTargetDbSyncJob 신규 생성: 15분 주기 캐시→DB 동기화 - AisTargetDbSyncTasklet: 캐시에서 최근 15분 데이터 조회 후 UPSERT - application.yml: ais-target-db-sync 설정 추가 데이터 흐름 변경: - 기존: API(1분) → 캐시 + DB (매분 33K 건 저장) - 변경: API(1분) → 캐시만, DB는 15분마다 MMSI별 최신 1건 저장 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
부모
1ab78e881f
커밋
49d2de1965
@ -2,7 +2,6 @@ package com.snp.batch.jobs.aistarget.batch.writer;
|
|||||||
|
|
||||||
import com.snp.batch.common.batch.writer.BaseWriter;
|
import com.snp.batch.common.batch.writer.BaseWriter;
|
||||||
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||||
import com.snp.batch.jobs.aistarget.batch.repository.AisTargetRepository;
|
|
||||||
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
|
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
|
||||||
import com.snp.batch.jobs.aistarget.classifier.AisClassTypeClassifier;
|
import com.snp.batch.jobs.aistarget.classifier.AisClassTypeClassifier;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -11,46 +10,43 @@ import org.springframework.stereotype.Component;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* AIS Target 데이터 Writer
|
* AIS Target 데이터 Writer (캐시 전용)
|
||||||
*
|
*
|
||||||
* 동작:
|
* 동작:
|
||||||
* 1. UPSERT 방식으로 DB 저장 (PK: mmsi + message_timestamp)
|
* 1. ClassType 분류 (Core20 캐시 기반 A/B 분류)
|
||||||
* 2. ClassType 분류 (Core20 캐시 기반 A/B 분류)
|
* 2. 캐시에 최신 위치 정보 업데이트 (classType, core20Mmsi 포함)
|
||||||
* 3. 캐시에 최신 위치 정보 업데이트 (classType, core20Mmsi 포함)
|
*
|
||||||
|
* 참고:
|
||||||
|
* - DB 저장은 별도 Job(aisTargetDbSyncJob)에서 15분 주기로 수행
|
||||||
|
* - 이 Writer는 캐시 업데이트만 담당
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
public class AisTargetDataWriter extends BaseWriter<AisTargetEntity> {
|
public class AisTargetDataWriter extends BaseWriter<AisTargetEntity> {
|
||||||
|
|
||||||
private final AisTargetRepository aisTargetRepository;
|
|
||||||
private final AisTargetCacheManager cacheManager;
|
private final AisTargetCacheManager cacheManager;
|
||||||
private final AisClassTypeClassifier classTypeClassifier;
|
private final AisClassTypeClassifier classTypeClassifier;
|
||||||
|
|
||||||
public AisTargetDataWriter(
|
public AisTargetDataWriter(
|
||||||
AisTargetRepository aisTargetRepository,
|
|
||||||
AisTargetCacheManager cacheManager,
|
AisTargetCacheManager cacheManager,
|
||||||
AisClassTypeClassifier classTypeClassifier) {
|
AisClassTypeClassifier classTypeClassifier) {
|
||||||
super("AisTarget");
|
super("AisTarget");
|
||||||
this.aisTargetRepository = aisTargetRepository;
|
|
||||||
this.cacheManager = cacheManager;
|
this.cacheManager = cacheManager;
|
||||||
this.classTypeClassifier = classTypeClassifier;
|
this.classTypeClassifier = classTypeClassifier;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void writeItems(List<AisTargetEntity> items) throws Exception {
|
protected void writeItems(List<AisTargetEntity> items) throws Exception {
|
||||||
log.debug("AIS Target 데이터 저장 시작: {} 건", items.size());
|
log.debug("AIS Target 캐시 업데이트 시작: {} 건", items.size());
|
||||||
|
|
||||||
// 1. DB 저장 (classType 없이 원본 데이터만 저장)
|
// 1. ClassType 분류 (캐시 저장 전에 분류)
|
||||||
aisTargetRepository.batchUpsert(items);
|
|
||||||
|
|
||||||
// 2. ClassType 분류 (캐시 저장 전에 분류)
|
|
||||||
// - Core20 캐시의 IMO와 매칭하여 classType(A/B), core20Mmsi 설정
|
// - Core20 캐시의 IMO와 매칭하여 classType(A/B), core20Mmsi 설정
|
||||||
classTypeClassifier.classifyAll(items);
|
classTypeClassifier.classifyAll(items);
|
||||||
|
|
||||||
// 3. 캐시 업데이트 (classType, core20Mmsi 포함)
|
// 2. 캐시 업데이트 (classType, core20Mmsi 포함)
|
||||||
cacheManager.putAll(items);
|
cacheManager.putAll(items);
|
||||||
|
|
||||||
log.debug("AIS Target 데이터 저장 완료: {} 건 (캐시 크기: {})",
|
log.debug("AIS Target 캐시 업데이트 완료: {} 건 (캐시 크기: {})",
|
||||||
items.size(), cacheManager.size());
|
items.size(), cacheManager.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,79 @@
|
|||||||
|
package com.snp.batch.jobs.aistargetdbsync.batch.config;
|
||||||
|
|
||||||
|
import com.snp.batch.jobs.aistargetdbsync.batch.tasklet.AisTargetDbSyncTasklet;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.batch.core.Job;
|
||||||
|
import org.springframework.batch.core.JobExecution;
|
||||||
|
import org.springframework.batch.core.JobExecutionListener;
|
||||||
|
import org.springframework.batch.core.Step;
|
||||||
|
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||||
|
import org.springframework.batch.core.repository.JobRepository;
|
||||||
|
import org.springframework.batch.core.step.builder.StepBuilder;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* AIS Target DB Sync Job Config
|
||||||
|
*
|
||||||
|
* 스케줄: 매 15분 (0 0/15 * * * ?)
|
||||||
|
* API: 없음 (캐시 기반)
|
||||||
|
*
|
||||||
|
* 동작:
|
||||||
|
* - Caffeine 캐시에서 최근 15분 이내 데이터 조회
|
||||||
|
* - MMSI별 최신 위치 1건씩 DB에 UPSERT
|
||||||
|
* - 1분 주기 aisTargetImportJob과 분리하여 DB 볼륨 최적화
|
||||||
|
*
|
||||||
|
* 데이터 흐름:
|
||||||
|
* - aisTargetImportJob (1분): API → 캐시 업데이트
|
||||||
|
* - aisTargetDbSyncJob (15분): 캐시 → DB 저장 (이 Job)
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Configuration
|
||||||
|
public class AisTargetDbSyncJobConfig {
|
||||||
|
|
||||||
|
private final JobRepository jobRepository;
|
||||||
|
private final PlatformTransactionManager transactionManager;
|
||||||
|
private final AisTargetDbSyncTasklet aisTargetDbSyncTasklet;
|
||||||
|
|
||||||
|
public AisTargetDbSyncJobConfig(
|
||||||
|
JobRepository jobRepository,
|
||||||
|
PlatformTransactionManager transactionManager,
|
||||||
|
AisTargetDbSyncTasklet aisTargetDbSyncTasklet) {
|
||||||
|
this.jobRepository = jobRepository;
|
||||||
|
this.transactionManager = transactionManager;
|
||||||
|
this.aisTargetDbSyncTasklet = aisTargetDbSyncTasklet;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(name = "aisTargetDbSyncStep")
|
||||||
|
public Step aisTargetDbSyncStep() {
|
||||||
|
return new StepBuilder("aisTargetDbSyncStep", jobRepository)
|
||||||
|
.tasklet(aisTargetDbSyncTasklet, transactionManager)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(name = "aisTargetDbSyncJob")
|
||||||
|
public Job aisTargetDbSyncJob() {
|
||||||
|
log.info("Job 생성: aisTargetDbSyncJob");
|
||||||
|
|
||||||
|
return new JobBuilder("aisTargetDbSyncJob", jobRepository)
|
||||||
|
.listener(new JobExecutionListener() {
|
||||||
|
@Override
|
||||||
|
public void beforeJob(JobExecution jobExecution) {
|
||||||
|
log.info("[aisTargetDbSyncJob] DB Sync Job 시작");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterJob(JobExecution jobExecution) {
|
||||||
|
long writeCount = jobExecution.getStepExecutions().stream()
|
||||||
|
.mapToLong(se -> se.getWriteCount())
|
||||||
|
.sum();
|
||||||
|
|
||||||
|
log.info("[aisTargetDbSyncJob] DB Sync Job 완료 - 상태: {}, 저장 건수: {}",
|
||||||
|
jobExecution.getStatus(), writeCount);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.start(aisTargetDbSyncStep())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,83 @@
|
|||||||
|
package com.snp.batch.jobs.aistargetdbsync.batch.tasklet;
|
||||||
|
|
||||||
|
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
|
||||||
|
import com.snp.batch.jobs.aistarget.batch.repository.AisTargetRepository;
|
||||||
|
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.batch.core.StepContribution;
|
||||||
|
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||||
|
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||||
|
import org.springframework.batch.repeat.RepeatStatus;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* AIS Target DB Sync Tasklet
|
||||||
|
*
|
||||||
|
* 스케줄: 매 15분 (0 0/15 * * * ?)
|
||||||
|
*
|
||||||
|
* 동작:
|
||||||
|
* - Caffeine 캐시에서 최근 N분 이내 데이터 조회
|
||||||
|
* - MMSI별 최신 위치 1건씩 DB에 UPSERT
|
||||||
|
* - 캐시의 모든 컬럼 정보를 그대로 DB에 저장
|
||||||
|
*
|
||||||
|
* 참고:
|
||||||
|
* - 캐시에는 MMSI별 최신 데이터만 유지됨 (120분 TTL)
|
||||||
|
* - DB 저장은 15분 주기로 수행하여 볼륨 절감
|
||||||
|
* - 기존 aisTargetImportJob은 캐시 업데이트만 수행
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class AisTargetDbSyncTasklet implements Tasklet {
|
||||||
|
|
||||||
|
private final AisTargetCacheManager cacheManager;
|
||||||
|
private final AisTargetRepository aisTargetRepository;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DB 동기화 시 조회할 캐시 데이터 시간 범위 (분)
|
||||||
|
* 기본값: 15분 (스케줄 주기와 동일)
|
||||||
|
*/
|
||||||
|
@Value("${app.batch.ais-target-db-sync.time-range-minutes:15}")
|
||||||
|
private int timeRangeMinutes;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
|
||||||
|
log.info("========================================");
|
||||||
|
log.info("AIS Target DB Sync 시작");
|
||||||
|
log.info("조회 범위: 최근 {}분", timeRangeMinutes);
|
||||||
|
log.info("현재 캐시 크기: {}", cacheManager.size());
|
||||||
|
log.info("========================================");
|
||||||
|
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// 1. 캐시에서 최근 N분 이내 데이터 조회
|
||||||
|
List<AisTargetEntity> entities = cacheManager.getByTimeRange(timeRangeMinutes);
|
||||||
|
|
||||||
|
if (entities.isEmpty()) {
|
||||||
|
log.warn("캐시에서 조회된 데이터가 없습니다 (범위: {}분)", timeRangeMinutes);
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("캐시에서 {} 건 조회 완료", entities.size());
|
||||||
|
|
||||||
|
// 2. DB에 UPSERT
|
||||||
|
aisTargetRepository.batchUpsert(entities);
|
||||||
|
|
||||||
|
long elapsed = System.currentTimeMillis() - startTime;
|
||||||
|
|
||||||
|
log.info("========================================");
|
||||||
|
log.info("AIS Target DB Sync 완료");
|
||||||
|
log.info("저장 건수: {} 건", entities.size());
|
||||||
|
log.info("소요 시간: {}ms", elapsed);
|
||||||
|
log.info("========================================");
|
||||||
|
|
||||||
|
// Step 통계 업데이트
|
||||||
|
contribution.incrementWriteCount(entities.size());
|
||||||
|
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -92,12 +92,19 @@ app:
|
|||||||
enabled: true
|
enabled: true
|
||||||
cron: "0 0 * * * ?" # Every hour
|
cron: "0 0 * * * ?" # Every hour
|
||||||
|
|
||||||
# AIS Target 배치 설정
|
# AIS Target Import 배치 설정 (캐시 업데이트 전용)
|
||||||
ais-target:
|
ais-target:
|
||||||
since-seconds: 60 # API 조회 범위 (초)
|
since-seconds: 60 # API 조회 범위 (초)
|
||||||
chunk-size: 5000 # 배치 청크 크기
|
chunk-size: 5000 # 배치 청크 크기
|
||||||
schedule:
|
schedule:
|
||||||
cron: "15 * * * * ?" # 매 분 15초 실행
|
cron: "15 * * * * ?" # 매 분 15초 실행
|
||||||
|
|
||||||
|
# AIS Target DB Sync 배치 설정 (캐시 → DB 저장)
|
||||||
|
ais-target-db-sync:
|
||||||
|
time-range-minutes: 15 # 캐시에서 조회할 시간 범위 (분)
|
||||||
|
schedule:
|
||||||
|
cron: "0 0/15 * * * ?" # 매 15분 정각 실행 (00, 15, 30, 45분)
|
||||||
|
|
||||||
# AIS Target 캐시 설정
|
# AIS Target 캐시 설정
|
||||||
ais-target-cache:
|
ais-target-cache:
|
||||||
ttl-minutes: 120 # 캐시 TTL (분) - 2시간
|
ttl-minutes: 120 # 캐시 TTL (분) - 2시간
|
||||||
|
|||||||
불러오는 중...
Reference in New Issue
Block a user