diff --git a/pom.xml b/pom.xml index ca2538e..c7b1736 100644 --- a/pom.xml +++ b/pom.xml @@ -111,6 +111,20 @@ 2.3.0 + + + com.github.ben-manes.caffeine + caffeine + 3.1.8 + + + + + org.locationtech.jts + jts-core + 1.19.0 + + org.springframework.boot diff --git a/src/main/java/com/snp/batch/api/logging/ApiAccessLoggingFilter.java b/src/main/java/com/snp/batch/api/logging/ApiAccessLoggingFilter.java new file mode 100644 index 0000000..2322337 --- /dev/null +++ b/src/main/java/com/snp/batch/api/logging/ApiAccessLoggingFilter.java @@ -0,0 +1,149 @@ +package com.snp.batch.api.logging; + +import jakarta.servlet.FilterChain; +import jakarta.servlet.ServletException; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; +import org.springframework.web.filter.OncePerRequestFilter; +import org.springframework.web.util.ContentCachingRequestWrapper; +import org.springframework.web.util.ContentCachingResponseWrapper; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +/** + * API 요청/응답 로깅 필터 + * + * 로그 파일: logs/api-access.log + * 기록 내용: 요청 IP, HTTP Method, URI, 파라미터, 응답 상태, 처리 시간 + */ +@Slf4j +@Component +@Order(Ordered.HIGHEST_PRECEDENCE) +public class ApiAccessLoggingFilter extends OncePerRequestFilter { + + private static final int MAX_PAYLOAD_LENGTH = 1000; + + @Override + protected void doFilterInternal(HttpServletRequest request, + HttpServletResponse response, + FilterChain filterChain) throws ServletException, IOException { + + // 정적 리소스 및 actuator 제외 + String uri = request.getRequestURI(); + if (shouldSkip(uri)) { + filterChain.doFilter(request, response); + return; + } + + // 요청 래핑 (body 읽기용) + ContentCachingRequestWrapper requestWrapper = new ContentCachingRequestWrapper(request); + ContentCachingResponseWrapper responseWrapper = new ContentCachingResponseWrapper(response); + + String requestId = UUID.randomUUID().toString().substring(0, 8); + long startTime = System.currentTimeMillis(); + + try { + filterChain.doFilter(requestWrapper, responseWrapper); + } finally { + long duration = System.currentTimeMillis() - startTime; + logRequest(requestId, requestWrapper, responseWrapper, duration); + responseWrapper.copyBodyToResponse(); + } + } + + private boolean shouldSkip(String uri) { + return uri.startsWith("/actuator") + || uri.startsWith("/css") + || uri.startsWith("/js") + || uri.startsWith("/images") + || uri.startsWith("/favicon") + || uri.endsWith(".html") + || uri.endsWith(".css") + || uri.endsWith(".js") + || uri.endsWith(".ico"); + } + + private void logRequest(String requestId, + ContentCachingRequestWrapper request, + ContentCachingResponseWrapper response, + long duration) { + + String clientIp = getClientIp(request); + String method = request.getMethod(); + String uri = request.getRequestURI(); + String queryString = request.getQueryString(); + int status = response.getStatus(); + + StringBuilder logMessage = new StringBuilder(); + logMessage.append(String.format("[%s] %s %s %s", + requestId, clientIp, method, uri)); + + // Query String + if (queryString != null && !queryString.isEmpty()) { + logMessage.append("?").append(truncate(queryString, 200)); + } + + // Request Body (POST/PUT/PATCH) + if (isBodyRequest(method)) { + String body = getRequestBody(request); + if (!body.isEmpty()) { + logMessage.append(" | body=").append(truncate(body, MAX_PAYLOAD_LENGTH)); + } + } + + // Response + logMessage.append(String.format(" | status=%d | %dms", status, duration)); + + // 상태에 따른 로그 레벨 + if (status >= 500) { + log.error(logMessage.toString()); + } else if (status >= 400) { + log.warn(logMessage.toString()); + } else { + log.info(logMessage.toString()); + } + } + + private String getClientIp(HttpServletRequest request) { + String ip = request.getHeader("X-Forwarded-For"); + if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) { + ip = request.getHeader("X-Real-IP"); + } + if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) { + ip = request.getRemoteAddr(); + } + // 여러 IP가 있는 경우 첫 번째만 + if (ip != null && ip.contains(",")) { + ip = ip.split(",")[0].trim(); + } + return ip; + } + + private boolean isBodyRequest(String method) { + return "POST".equalsIgnoreCase(method) + || "PUT".equalsIgnoreCase(method) + || "PATCH".equalsIgnoreCase(method); + } + + private String getRequestBody(ContentCachingRequestWrapper request) { + byte[] content = request.getContentAsByteArray(); + if (content.length == 0) { + return ""; + } + return new String(content, StandardCharsets.UTF_8) + .replaceAll("\\s+", " ") + .trim(); + } + + private String truncate(String str, int maxLength) { + if (str == null) return ""; + if (str.length() <= maxLength) return str; + return str.substring(0, maxLength) + "..."; + } +} diff --git a/src/main/java/com/snp/batch/common/batch/processor/BaseProcessor.java b/src/main/java/com/snp/batch/common/batch/processor/BaseProcessor.java index 0add9cc..a9ad40c 100644 --- a/src/main/java/com/snp/batch/common/batch/processor/BaseProcessor.java +++ b/src/main/java/com/snp/batch/common/batch/processor/BaseProcessor.java @@ -55,7 +55,7 @@ public abstract class BaseProcessor implements ItemProcessor { return null; } - log.debug("데이터 처리 중: {}", item); +// log.debug("데이터 처리 중: {}", item); return processItem(item); } } diff --git a/src/main/java/com/snp/batch/common/batch/reader/BaseApiReader.java b/src/main/java/com/snp/batch/common/batch/reader/BaseApiReader.java index 6923f99..664c8de 100644 --- a/src/main/java/com/snp/batch/common/batch/reader/BaseApiReader.java +++ b/src/main/java/com/snp/batch/common/batch/reader/BaseApiReader.java @@ -98,6 +98,9 @@ public abstract class BaseApiReader implements ItemReader { public void saveApiInfoToContext(StepExecution stepExecution) { this.stepExecution = stepExecution; + // Reader 상태 초기화 (Job 재실행 시 필수) + resetReaderState(); + // API 정보를 StepExecutionContext에 저장 ExecutionContext context = stepExecution.getExecutionContext(); @@ -140,6 +143,48 @@ public abstract class BaseApiReader implements ItemReader { return ""; } + /** + * Reader 상태 초기화 + * Job 재실행 시 이전 실행의 상태를 클리어하여 새로 데이터를 읽을 수 있도록 함 + */ + private void resetReaderState() { + // Chunk 모드 상태 초기화 + this.currentBatch = null; + this.initialized = false; + + // Legacy 모드 상태 초기화 + this.legacyDataList = null; + this.legacyNextIndex = 0; + + // 통계 초기화 + this.totalApiCalls = 0; + this.completedApiCalls = 0; + + // 하위 클래스 상태 초기화 훅 호출 + resetCustomState(); + + log.debug("[{}] Reader 상태 초기화 완료", getReaderName()); + } + + /** + * 하위 클래스 커스텀 상태 초기화 훅 + * Chunk 모드에서 사용하는 currentBatchIndex, allImoNumbers 등의 필드를 초기화할 때 오버라이드 + * + * 예시: + *
+     * @Override
+     * protected void resetCustomState() {
+     *     this.currentBatchIndex = 0;
+     *     this.allImoNumbers = null;
+     *     this.dbMasterHashes = null;
+     * }
+     * 
+ */ + protected void resetCustomState() { + // 기본 구현: 아무것도 하지 않음 + // 하위 클래스에서 필요 시 오버라이드 + } + /** * API 호출 통계 업데이트 */ diff --git a/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java b/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java index 4f0d7f9..e954a9c 100644 --- a/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java +++ b/src/main/java/com/snp/batch/global/config/MaritimeApiWebClientConfig.java @@ -64,7 +64,7 @@ public class MaritimeApiWebClientConfig { .defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword)) .codecs(configurer -> configurer .defaultCodecs() - .maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼 + .maxInMemorySize(30 * 1024 * 1024)) // 30MB 버퍼 .build(); } @@ -80,7 +80,7 @@ public class MaritimeApiWebClientConfig { .defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword)) .codecs(configurer -> configurer .defaultCodecs() - .maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼 + .maxInMemorySize(50 * 1024 * 1024)) // 50MB 버퍼 (AIS GetTargets 응답 ~20MB+) .build(); } @@ -96,7 +96,7 @@ public class MaritimeApiWebClientConfig { .defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword)) .codecs(configurer -> configurer .defaultCodecs() - .maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼 + .maxInMemorySize(30 * 1024 * 1024)) // 30MB 버퍼 .build(); } } diff --git a/src/main/java/com/snp/batch/global/partition/PartitionConfig.java b/src/main/java/com/snp/batch/global/partition/PartitionConfig.java new file mode 100644 index 0000000..60bc58f --- /dev/null +++ b/src/main/java/com/snp/batch/global/partition/PartitionConfig.java @@ -0,0 +1,50 @@ +package com.snp.batch.global.partition; + +import lombok.Getter; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 파티션 관리 대상 테이블 설정 + * + * Daily 파티션: 매일 실행 + * Monthly 파티션: 매월 말일에만 실행 + */ +@Getter +@Component +public class PartitionConfig { + + /** + * Daily 파티션 대상 테이블 (파티션 네이밍: {table}_YYYY_MM_DD) + */ + private final List dailyPartitionTables = List.of( + // 추후 daily 파티션 테이블 추가 + ); + + /** + * Monthly 파티션 대상 테이블 (파티션 네이밍: {table}_YYYY_MM) + */ + private final List monthlyPartitionTables = List.of( + new PartitionTableInfo( + "snp_data", + "ais_target", + "message_timestamp", + 2 // 미리 생성할 개월 수 + ) + ); + + /** + * 파티션 테이블 정보 + */ + public record PartitionTableInfo( + String schema, + String tableName, + String partitionColumn, + int periodsAhead // 미리 생성할 기간 수 (daily: 일, monthly: 월) + ) { + public String getFullTableName() { + return schema + "." + tableName; + } + } +} diff --git a/src/main/java/com/snp/batch/global/partition/PartitionManagerJobConfig.java b/src/main/java/com/snp/batch/global/partition/PartitionManagerJobConfig.java new file mode 100644 index 0000000..ca132c8 --- /dev/null +++ b/src/main/java/com/snp/batch/global/partition/PartitionManagerJobConfig.java @@ -0,0 +1,68 @@ +package com.snp.batch.global.partition; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionListener; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * 파티션 관리 Job Config + * + * 스케줄: 매일 00:10 (0 10 0 * * ?) + * + * 동작: + * - Daily 파티션: 매일 실행 + * - Monthly 파티션: 매월 말일에만 실행 (Job 내부에서 말일 감지) + */ +@Slf4j +@Configuration +public class PartitionManagerJobConfig { + + private final JobRepository jobRepository; + private final PlatformTransactionManager transactionManager; + private final PartitionManagerTasklet partitionManagerTasklet; + + public PartitionManagerJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + PartitionManagerTasklet partitionManagerTasklet) { + this.jobRepository = jobRepository; + this.transactionManager = transactionManager; + this.partitionManagerTasklet = partitionManagerTasklet; + } + + @Bean(name = "partitionManagerStep") + public Step partitionManagerStep() { + return new StepBuilder("partitionManagerStep", jobRepository) + .tasklet(partitionManagerTasklet, transactionManager) + .build(); + } + + @Bean(name = "partitionManagerJob") + public Job partitionManagerJob() { + log.info("Job 생성: partitionManagerJob"); + + return new JobBuilder("partitionManagerJob", jobRepository) + .listener(new JobExecutionListener() { + @Override + public void beforeJob(JobExecution jobExecution) { + log.info("[partitionManagerJob] 파티션 관리 Job 시작"); + } + + @Override + public void afterJob(JobExecution jobExecution) { + log.info("[partitionManagerJob] 파티션 관리 Job 완료 - 상태: {}", + jobExecution.getStatus()); + } + }) + .start(partitionManagerStep()) + .build(); + } +} diff --git a/src/main/java/com/snp/batch/global/partition/PartitionManagerTasklet.java b/src/main/java/com/snp/batch/global/partition/PartitionManagerTasklet.java new file mode 100644 index 0000000..e904116 --- /dev/null +++ b/src/main/java/com/snp/batch/global/partition/PartitionManagerTasklet.java @@ -0,0 +1,220 @@ +package com.snp.batch.global.partition; + +import com.snp.batch.global.partition.PartitionConfig.PartitionTableInfo; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; + +import java.time.LocalDate; +import java.time.YearMonth; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; + +/** + * 파티션 관리 Tasklet + * + * 스케줄: 매일 실행 + * - Daily 파티션: 매일 생성 + * - Monthly 파티션: 매월 말일에만 생성 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class PartitionManagerTasklet implements Tasklet { + + private final JdbcTemplate jdbcTemplate; + private final PartitionConfig partitionConfig; + + private static final String PARTITION_EXISTS_SQL = """ + SELECT EXISTS ( + SELECT 1 FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = ? + AND c.relname = ? + AND c.relkind = 'r' + ) + """; + + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { + LocalDate today = LocalDate.now(); + boolean isLastDayOfMonth = isLastDayOfMonth(today); + + log.info("========================================"); + log.info("파티션 관리 Job 시작"); + log.info("실행 일자: {}", today); + log.info("월 말일 여부: {}", isLastDayOfMonth); + log.info("========================================"); + + // Daily 파티션 처리 (매일) + processDailyPartitions(today); + + // Monthly 파티션 처리 (매월 말일만) + if (isLastDayOfMonth) { + processMonthlyPartitions(today); + } else { + log.info("Monthly 파티션: 말일이 아니므로 스킵"); + } + + log.info("========================================"); + log.info("파티션 관리 Job 완료"); + log.info("========================================"); + + return RepeatStatus.FINISHED; + } + + /** + * 매월 말일 여부 확인 + */ + private boolean isLastDayOfMonth(LocalDate date) { + return date.getDayOfMonth() == YearMonth.from(date).lengthOfMonth(); + } + + /** + * Daily 파티션 처리 + */ + private void processDailyPartitions(LocalDate today) { + List tables = partitionConfig.getDailyPartitionTables(); + + if (tables.isEmpty()) { + log.info("Daily 파티션: 대상 테이블 없음"); + return; + } + + log.info("Daily 파티션 처리 시작: {} 개 테이블", tables.size()); + + for (PartitionTableInfo table : tables) { + processDailyPartition(table, today); + } + } + + /** + * 개별 Daily 파티션 생성 + */ + private void processDailyPartition(PartitionTableInfo table, LocalDate today) { + List created = new ArrayList<>(); + List skipped = new ArrayList<>(); + + for (int i = 0; i <= table.periodsAhead(); i++) { + LocalDate targetDate = today.plusDays(i); + String partitionName = getDailyPartitionName(table.tableName(), targetDate); + + if (partitionExists(table.schema(), partitionName)) { + skipped.add(partitionName); + } else { + createDailyPartition(table, targetDate, partitionName); + created.add(partitionName); + } + } + + log.info("[{}] Daily 파티션 - 생성: {}, 스킵: {}", + table.tableName(), created.size(), skipped.size()); + } + + /** + * Monthly 파티션 처리 + */ + private void processMonthlyPartitions(LocalDate today) { + List tables = partitionConfig.getMonthlyPartitionTables(); + + if (tables.isEmpty()) { + log.info("Monthly 파티션: 대상 테이블 없음"); + return; + } + + log.info("Monthly 파티션 처리 시작: {} 개 테이블", tables.size()); + + for (PartitionTableInfo table : tables) { + processMonthlyPartition(table, today); + } + } + + /** + * 개별 Monthly 파티션 생성 + */ + private void processMonthlyPartition(PartitionTableInfo table, LocalDate today) { + List created = new ArrayList<>(); + List skipped = new ArrayList<>(); + + for (int i = 0; i <= table.periodsAhead(); i++) { + LocalDate targetDate = today.plusMonths(i).withDayOfMonth(1); + String partitionName = getMonthlyPartitionName(table.tableName(), targetDate); + + if (partitionExists(table.schema(), partitionName)) { + skipped.add(partitionName); + } else { + createMonthlyPartition(table, targetDate, partitionName); + created.add(partitionName); + } + } + + log.info("[{}] Monthly 파티션 - 생성: {}, 스킵: {}", + table.tableName(), created.size(), skipped.size()); + if (!created.isEmpty()) { + log.info("[{}] 생성된 파티션: {}", table.tableName(), created); + } + } + + /** + * Daily 파티션 이름 생성 (table_YYYY_MM_DD) + */ + private String getDailyPartitionName(String tableName, LocalDate date) { + return tableName + "_" + date.format(DateTimeFormatter.ofPattern("yyyy_MM_dd")); + } + + /** + * Monthly 파티션 이름 생성 (table_YYYY_MM) + */ + private String getMonthlyPartitionName(String tableName, LocalDate date) { + return tableName + "_" + date.format(DateTimeFormatter.ofPattern("yyyy_MM")); + } + + /** + * 파티션 존재 여부 확인 + */ + private boolean partitionExists(String schema, String partitionName) { + Boolean exists = jdbcTemplate.queryForObject(PARTITION_EXISTS_SQL, Boolean.class, schema, partitionName); + return Boolean.TRUE.equals(exists); + } + + /** + * Daily 파티션 생성 + */ + private void createDailyPartition(PartitionTableInfo table, LocalDate targetDate, String partitionName) { + LocalDate endDate = targetDate.plusDays(1); + + String sql = String.format(""" + CREATE TABLE %s.%s PARTITION OF %s + FOR VALUES FROM ('%s 00:00:00+00') TO ('%s 00:00:00+00') + """, + table.schema(), partitionName, table.getFullTableName(), + targetDate, endDate); + + jdbcTemplate.execute(sql); + log.debug("Daily 파티션 생성: {}", partitionName); + } + + /** + * Monthly 파티션 생성 + */ + private void createMonthlyPartition(PartitionTableInfo table, LocalDate targetDate, String partitionName) { + LocalDate startDate = targetDate.withDayOfMonth(1); + LocalDate endDate = startDate.plusMonths(1); + + String sql = String.format(""" + CREATE TABLE %s.%s PARTITION OF %s + FOR VALUES FROM ('%s 00:00:00+00') TO ('%s 00:00:00+00') + """, + table.schema(), partitionName, table.getFullTableName(), + startDate, endDate); + + jdbcTemplate.execute(sql); + log.debug("Monthly 파티션 생성: {}", partitionName); + } +} diff --git a/src/main/java/com/snp/batch/global/repository/TimelineRepository.java b/src/main/java/com/snp/batch/global/repository/TimelineRepository.java index 2c28809..7b7b7f5 100644 --- a/src/main/java/com/snp/batch/global/repository/TimelineRepository.java +++ b/src/main/java/com/snp/batch/global/repository/TimelineRepository.java @@ -1,6 +1,6 @@ package com.snp.batch.global.repository; -import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; @@ -13,10 +13,25 @@ import java.util.Map; * Step Context 등 불필요한 데이터를 조회하지 않고 필요한 정보만 가져옴 */ @Repository -@RequiredArgsConstructor public class TimelineRepository { private final JdbcTemplate jdbcTemplate; + private final String tablePrefix; + + public TimelineRepository( + JdbcTemplate jdbcTemplate, + @Value("${spring.batch.jdbc.table-prefix:BATCH_}") String tablePrefix) { + this.jdbcTemplate = jdbcTemplate; + this.tablePrefix = tablePrefix; + } + + private String getJobExecutionTable() { + return tablePrefix + "JOB_EXECUTION"; + } + + private String getJobInstanceTable() { + return tablePrefix + "JOB_INSTANCE"; + } /** * 특정 Job의 특정 범위 내 실행 이력 조회 (경량) @@ -27,19 +42,19 @@ public class TimelineRepository { LocalDateTime startTime, LocalDateTime endTime) { - String sql = """ + String sql = String.format(""" SELECT je.JOB_EXECUTION_ID as executionId, je.STATUS as status, je.START_TIME as startTime, je.END_TIME as endTime - FROM BATCH_JOB_EXECUTION je - INNER JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID + FROM %s je + INNER JOIN %s ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID WHERE ji.JOB_NAME = ? AND je.START_TIME >= ? AND je.START_TIME < ? ORDER BY je.START_TIME DESC - """; + """, getJobExecutionTable(), getJobInstanceTable()); return jdbcTemplate.queryForList(sql, jobName, startTime, endTime); } @@ -51,19 +66,19 @@ public class TimelineRepository { LocalDateTime startTime, LocalDateTime endTime) { - String sql = """ + String sql = String.format(""" SELECT ji.JOB_NAME as jobName, je.JOB_EXECUTION_ID as executionId, je.STATUS as status, je.START_TIME as startTime, je.END_TIME as endTime - FROM BATCH_JOB_EXECUTION je - INNER JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID + FROM %s je + INNER JOIN %s ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID WHERE je.START_TIME >= ? AND je.START_TIME < ? ORDER BY ji.JOB_NAME, je.START_TIME DESC - """; + """, getJobExecutionTable(), getJobInstanceTable()); return jdbcTemplate.queryForList(sql, startTime, endTime); } @@ -72,17 +87,17 @@ public class TimelineRepository { * 현재 실행 중인 Job 조회 (STARTED, STARTING 상태) */ public List> findRunningExecutions() { - String sql = """ + String sql = String.format(""" SELECT ji.JOB_NAME as jobName, je.JOB_EXECUTION_ID as executionId, je.STATUS as status, je.START_TIME as startTime - FROM BATCH_JOB_EXECUTION je - INNER JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID + FROM %s je + INNER JOIN %s ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID WHERE je.STATUS IN ('STARTED', 'STARTING') ORDER BY je.START_TIME DESC - """; + """, getJobExecutionTable(), getJobInstanceTable()); return jdbcTemplate.queryForList(sql); } @@ -91,18 +106,18 @@ public class TimelineRepository { * 최근 실행 이력 조회 (상위 N개) */ public List> findRecentExecutions(int limit) { - String sql = """ + String sql = String.format(""" SELECT ji.JOB_NAME as jobName, je.JOB_EXECUTION_ID as executionId, je.STATUS as status, je.START_TIME as startTime, je.END_TIME as endTime - FROM BATCH_JOB_EXECUTION je - INNER JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID + FROM %s je + INNER JOIN %s ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID ORDER BY je.START_TIME DESC LIMIT ? - """; + """, getJobExecutionTable(), getJobInstanceTable()); return jdbcTemplate.queryForList(sql, limit); } diff --git a/src/main/java/com/snp/batch/jobs/aistarget/batch/config/AisTargetImportJobConfig.java b/src/main/java/com/snp/batch/jobs/aistarget/batch/config/AisTargetImportJobConfig.java new file mode 100644 index 0000000..c26f40a --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/batch/config/AisTargetImportJobConfig.java @@ -0,0 +1,122 @@ +package com.snp.batch.jobs.aistarget.batch.config; + +import com.snp.batch.common.batch.config.BaseJobConfig; +import com.snp.batch.jobs.aistarget.batch.dto.AisTargetDto; +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import com.snp.batch.jobs.aistarget.batch.processor.AisTargetDataProcessor; +import com.snp.batch.jobs.aistarget.batch.reader.AisTargetDataReader; +import com.snp.batch.jobs.aistarget.batch.writer.AisTargetDataWriter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionListener; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.WebClient; + +import java.time.OffsetDateTime; + +/** + * AIS Target Import Job Config + * + * 스케줄: 매 분 15초 (0 15 * * * * ?) + * API: POST /AisSvc.svc/AIS/GetTargets + * 파라미터: {"sinceSeconds": "60"} + * + * 동작: + * - 최근 60초 동안의 전체 선박 위치 정보 수집 + * - 약 33,000건/분 처리 + * - UPSERT 방식으로 DB 저장 + */ +@Slf4j +@Configuration +public class AisTargetImportJobConfig extends BaseJobConfig { + + private final AisTargetDataProcessor aisTargetDataProcessor; + private final AisTargetDataWriter aisTargetDataWriter; + private final WebClient maritimeAisApiWebClient; + + @Value("${app.batch.ais-target.since-seconds:60}") + private int sinceSeconds; + + @Value("${app.batch.ais-target.chunk-size:5000}") + private int chunkSize; + + public AisTargetImportJobConfig( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + AisTargetDataProcessor aisTargetDataProcessor, + AisTargetDataWriter aisTargetDataWriter, + @Qualifier("maritimeAisApiWebClient") WebClient maritimeAisApiWebClient) { + super(jobRepository, transactionManager); + this.aisTargetDataProcessor = aisTargetDataProcessor; + this.aisTargetDataWriter = aisTargetDataWriter; + this.maritimeAisApiWebClient = maritimeAisApiWebClient; + } + + @Override + protected String getJobName() { + return "aisTargetImportJob"; + } + + @Override + protected String getStepName() { + return "aisTargetImportStep"; + } + + @Override + protected ItemReader createReader() { + return new AisTargetDataReader(maritimeAisApiWebClient, sinceSeconds); + } + + @Override + protected ItemProcessor createProcessor() { + return aisTargetDataProcessor; + } + + @Override + protected ItemWriter createWriter() { + return aisTargetDataWriter; + } + + @Override + protected int getChunkSize() { + return chunkSize; + } + + @Override + protected void configureJob(JobBuilder jobBuilder) { + jobBuilder.listener(new JobExecutionListener() { + @Override + public void beforeJob(JobExecution jobExecution) { + // 배치 수집 시점 설정 + OffsetDateTime collectedAt = OffsetDateTime.now(); + aisTargetDataProcessor.setCollectedAt(collectedAt); + log.info("[{}] Job 시작 - 수집 시점: {}", getJobName(), collectedAt); + } + + @Override + public void afterJob(JobExecution jobExecution) { + log.info("[{}] Job 완료 - 상태: {}, 처리 건수: {}", + getJobName(), + jobExecution.getStatus(), + jobExecution.getStepExecutions().stream() + .mapToLong(se -> se.getWriteCount()) + .sum()); + } + }); + } + + @Bean(name = "aisTargetImportJob") + public Job aisTargetImportJob() { + return job(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/batch/dto/AisTargetApiResponse.java b/src/main/java/com/snp/batch/jobs/aistarget/batch/dto/AisTargetApiResponse.java new file mode 100644 index 0000000..5d300c6 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/batch/dto/AisTargetApiResponse.java @@ -0,0 +1,27 @@ +package com.snp.batch.jobs.aistarget.batch.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * AIS GetTargets API 응답 래퍼 + * + * API 응답 구조: + * { + * "targetArr": [...] + * } + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class AisTargetApiResponse { + + @JsonProperty("targetArr") + private List targetArr; +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/batch/dto/AisTargetDto.java b/src/main/java/com/snp/batch/jobs/aistarget/batch/dto/AisTargetDto.java new file mode 100644 index 0000000..017e50e --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/batch/dto/AisTargetDto.java @@ -0,0 +1,135 @@ +package com.snp.batch.jobs.aistarget.batch.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * AIS Target API 응답 DTO + * + * API: POST /AisSvc.svc/AIS/GetTargets + * Request: {"sinceSeconds": "60"} + * Response: {"targetArr": [...]} + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class AisTargetDto { + + @JsonProperty("MMSI") + private Long mmsi; + + @JsonProperty("IMO") + private Long imo; + + @JsonProperty("AgeMinutes") + private Double ageMinutes; + + @JsonProperty("Lat") + private Double lat; + + @JsonProperty("Lon") + private Double lon; + + @JsonProperty("Heading") + private Double heading; + + @JsonProperty("SoG") + private Double sog; // Speed over Ground + + @JsonProperty("CoG") + private Double cog; // Course over Ground (if available) + + @JsonProperty("Width") + private Integer width; + + @JsonProperty("Length") + private Integer length; + + @JsonProperty("Draught") + private Double draught; + + @JsonProperty("Name") + private String name; + + @JsonProperty("Callsign") + private String callsign; + + @JsonProperty("Destination") + private String destination; + + @JsonProperty("ETA") + private String eta; + + @JsonProperty("Status") + private String status; + + @JsonProperty("VesselType") + private String vesselType; + + @JsonProperty("ExtraInfo") + private String extraInfo; + + @JsonProperty("PositionAccuracy") + private Integer positionAccuracy; + + @JsonProperty("RoT") + private Integer rot; // Rate of Turn + + @JsonProperty("TimestampUTC") + private Integer timestampUtc; + + @JsonProperty("RepeatIndicator") + private Integer repeatIndicator; + + @JsonProperty("RAIMFlag") + private Integer raimFlag; + + @JsonProperty("RadioStatus") + private Integer radioStatus; + + @JsonProperty("Regional") + private Integer regional; + + @JsonProperty("Regional2") + private Integer regional2; + + @JsonProperty("Spare") + private Integer spare; + + @JsonProperty("Spare2") + private Integer spare2; + + @JsonProperty("AISVersion") + private Integer aisVersion; + + @JsonProperty("PositionFixType") + private Integer positionFixType; + + @JsonProperty("DTE") + private Integer dte; + + @JsonProperty("BandFlag") + private Integer bandFlag; + + @JsonProperty("ReceivedDate") + private String receivedDate; + + @JsonProperty("MessageTimestamp") + private String messageTimestamp; + + @JsonProperty("LengthBow") + private Integer lengthBow; + + @JsonProperty("LengthStern") + private Integer lengthStern; + + @JsonProperty("WidthPort") + private Integer widthPort; + + @JsonProperty("WidthStarboard") + private Integer widthStarboard; +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/batch/entity/AisTargetEntity.java b/src/main/java/com/snp/batch/jobs/aistarget/batch/entity/AisTargetEntity.java new file mode 100644 index 0000000..57bb322 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/batch/entity/AisTargetEntity.java @@ -0,0 +1,85 @@ +package com.snp.batch.jobs.aistarget.batch.entity; + +import com.snp.batch.common.batch.entity.BaseEntity; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.time.OffsetDateTime; + +/** + * AIS Target Entity + * + * 테이블: snp_data.ais_target + * PK: mmsi + message_timestamp (복합키) + * + * 용도: + * - 선박 위치 이력 저장 (항적 분석용) + * - 특정 시점/구역 선박 조회 + * - LineString 항적 생성 기반 데이터 + */ +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode(callSuper = true) +public class AisTargetEntity extends BaseEntity { + + // ========== PK (복합키) ========== + private Long mmsi; + private OffsetDateTime messageTimestamp; + + // ========== 선박 식별 정보 ========== + private Long imo; + private String name; + private String callsign; + private String vesselType; + private String extraInfo; + + // ========== 위치 정보 ========== + private Double lat; + private Double lon; + // geom은 DB에서 ST_SetSRID(ST_MakePoint(lon, lat), 4326)로 생성 + + // ========== 항해 정보 ========== + private Double heading; + private Double sog; // Speed over Ground + private Double cog; // Course over Ground + private Integer rot; // Rate of Turn + + // ========== 선박 제원 ========== + private Integer length; + private Integer width; + private Double draught; + private Integer lengthBow; + private Integer lengthStern; + private Integer widthPort; + private Integer widthStarboard; + + // ========== 목적지 정보 ========== + private String destination; + private OffsetDateTime eta; + private String status; + + // ========== AIS 메시지 정보 ========== + private Double ageMinutes; + private Integer positionAccuracy; + private Integer timestampUtc; + private Integer repeatIndicator; + private Integer raimFlag; + private Integer radioStatus; + private Integer regional; + private Integer regional2; + private Integer spare; + private Integer spare2; + private Integer aisVersion; + private Integer positionFixType; + private Integer dte; + private Integer bandFlag; + + // ========== 타임스탬프 ========== + private OffsetDateTime receivedDate; + private OffsetDateTime collectedAt; // 배치 수집 시점 +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/batch/processor/AisTargetDataProcessor.java b/src/main/java/com/snp/batch/jobs/aistarget/batch/processor/AisTargetDataProcessor.java new file mode 100644 index 0000000..da146f7 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/batch/processor/AisTargetDataProcessor.java @@ -0,0 +1,121 @@ +package com.snp.batch.jobs.aistarget.batch.processor; + +import com.snp.batch.common.batch.processor.BaseProcessor; +import com.snp.batch.jobs.aistarget.batch.dto.AisTargetDto; +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; + +/** + * AIS Target 데이터 Processor + * + * DTO → Entity 변환 + * - 타임스탬프 파싱 + * - 필터링 (유효한 위치 정보만) + */ +@Slf4j +@Component +public class AisTargetDataProcessor extends BaseProcessor { + + private static final DateTimeFormatter ISO_FORMATTER = DateTimeFormatter.ISO_DATE_TIME; + + // 배치 수집 시점 (모든 레코드에 동일하게 적용) + private OffsetDateTime collectedAt; + + public void setCollectedAt(OffsetDateTime collectedAt) { + this.collectedAt = collectedAt; + } + + @Override + protected AisTargetEntity processItem(AisTargetDto dto) throws Exception { + // 유효성 검사: MMSI와 위치 정보는 필수 + if (dto.getMmsi() == null || dto.getLat() == null || dto.getLon() == null) { + log.debug("유효하지 않은 데이터 스킵 - MMSI: {}, Lat: {}, Lon: {}", + dto.getMmsi(), dto.getLat(), dto.getLon()); + return null; + } + + // MessageTimestamp 파싱 (PK의 일부) + OffsetDateTime messageTimestamp = parseTimestamp(dto.getMessageTimestamp()); + if (messageTimestamp == null) { + log.debug("MessageTimestamp 파싱 실패 - MMSI: {}, Timestamp: {}", + dto.getMmsi(), dto.getMessageTimestamp()); + return null; + } + + return AisTargetEntity.builder() + // PK + .mmsi(dto.getMmsi()) + .messageTimestamp(messageTimestamp) + // 선박 식별 정보 + .imo(dto.getImo()) + .name(dto.getName()) + .callsign(dto.getCallsign()) + .vesselType(dto.getVesselType()) + .extraInfo(dto.getExtraInfo()) + // 위치 정보 + .lat(dto.getLat()) + .lon(dto.getLon()) + // 항해 정보 + .heading(dto.getHeading()) + .sog(dto.getSog()) + .cog(dto.getCog()) + .rot(dto.getRot()) + // 선박 제원 + .length(dto.getLength()) + .width(dto.getWidth()) + .draught(dto.getDraught()) + .lengthBow(dto.getLengthBow()) + .lengthStern(dto.getLengthStern()) + .widthPort(dto.getWidthPort()) + .widthStarboard(dto.getWidthStarboard()) + // 목적지 정보 + .destination(dto.getDestination()) + .eta(parseEta(dto.getEta())) + .status(dto.getStatus()) + // AIS 메시지 정보 + .ageMinutes(dto.getAgeMinutes()) + .positionAccuracy(dto.getPositionAccuracy()) + .timestampUtc(dto.getTimestampUtc()) + .repeatIndicator(dto.getRepeatIndicator()) + .raimFlag(dto.getRaimFlag()) + .radioStatus(dto.getRadioStatus()) + .regional(dto.getRegional()) + .regional2(dto.getRegional2()) + .spare(dto.getSpare()) + .spare2(dto.getSpare2()) + .aisVersion(dto.getAisVersion()) + .positionFixType(dto.getPositionFixType()) + .dte(dto.getDte()) + .bandFlag(dto.getBandFlag()) + // 타임스탬프 + .receivedDate(parseTimestamp(dto.getReceivedDate())) + .collectedAt(collectedAt != null ? collectedAt : OffsetDateTime.now()) + .build(); + } + + private OffsetDateTime parseTimestamp(String timestamp) { + if (timestamp == null || timestamp.isEmpty()) { + return null; + } + + try { + // ISO 8601 형식 파싱 (예: "2025-12-01T23:55:01.073Z") + return OffsetDateTime.parse(timestamp, ISO_FORMATTER); + } catch (DateTimeParseException e) { + log.trace("타임스탬프 파싱 실패: {}", timestamp); + return null; + } + } + + private OffsetDateTime parseEta(String eta) { + if (eta == null || eta.isEmpty() || "9999-12-31T23:59:59Z".equals(eta)) { + return null; // 유효하지 않은 ETA는 null 처리 + } + return parseTimestamp(eta); + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/batch/reader/AisTargetDataReader.java b/src/main/java/com/snp/batch/jobs/aistarget/batch/reader/AisTargetDataReader.java new file mode 100644 index 0000000..7d5aa58 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/batch/reader/AisTargetDataReader.java @@ -0,0 +1,98 @@ +package com.snp.batch.jobs.aistarget.batch.reader; + +import com.snp.batch.common.batch.reader.BaseApiReader; +import com.snp.batch.jobs.aistarget.batch.dto.AisTargetApiResponse; +import com.snp.batch.jobs.aistarget.batch.dto.AisTargetDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * AIS Target 데이터 Reader + * + * API: POST /AisSvc.svc/AIS/GetTargets + * Request: {"sinceSeconds": "60"} + * + * 동작: + * - 매 분 15초에 실행 (Quartz 스케줄) + * - 최근 60초 동안의 전체 선박 위치 정보 조회 + * - 약 33,000건/분 처리 + */ +@Slf4j +public class AisTargetDataReader extends BaseApiReader { + + private final int sinceSeconds; + + public AisTargetDataReader(WebClient webClient, int sinceSeconds) { + super(webClient); + this.sinceSeconds = sinceSeconds; + } + + @Override + protected String getReaderName() { + return "AisTargetDataReader"; + } + + @Override + protected String getApiPath() { + return "/AisSvc.svc/AIS/GetTargets"; + } + + @Override + protected String getHttpMethod() { + return "POST"; + } + + @Override + protected Object getRequestBody() { + return Map.of("sinceSeconds", String.valueOf(sinceSeconds)); + } + + @Override + protected Class getResponseType() { + return AisTargetApiResponse.class; + } + + @Override + protected void beforeFetch() { + log.info("[{}] AIS GetTargets API 호출 준비 - sinceSeconds: {}", getReaderName(), sinceSeconds); + } + + @Override + protected List fetchDataFromApi() { + try { + log.info("[{}] API 호출 시작: {} {}", getReaderName(), getHttpMethod(), getApiPath()); + + AisTargetApiResponse response = webClient.post() + .uri(getApiPath()) + .bodyValue(getRequestBody()) + .retrieve() + .bodyToMono(AisTargetApiResponse.class) + .block(); + + if (response != null && response.getTargetArr() != null) { + List targets = response.getTargetArr(); + log.info("[{}] API 호출 완료: {} 건 조회", getReaderName(), targets.size()); + updateApiCallStats(1, 1); + return targets; + } else { + log.warn("[{}] API 응답이 비어있습니다", getReaderName()); + return Collections.emptyList(); + } + + } catch (Exception e) { + log.error("[{}] API 호출 실패: {}", getReaderName(), e.getMessage(), e); + return handleApiError(e); + } + } + + @Override + protected void afterFetch(List data) { + if (data != null && !data.isEmpty()) { + log.info("[{}] 데이터 조회 완료 - 총 {} 건", getReaderName(), data.size()); + } + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/batch/repository/AisTargetRepository.java b/src/main/java/com/snp/batch/jobs/aistarget/batch/repository/AisTargetRepository.java new file mode 100644 index 0000000..afc1505 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/batch/repository/AisTargetRepository.java @@ -0,0 +1,59 @@ +package com.snp.batch.jobs.aistarget.batch.repository; + +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; + +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Optional; + +/** + * AIS Target Repository 인터페이스 + */ +public interface AisTargetRepository { + + /** + * 복합키로 조회 (MMSI + MessageTimestamp) + */ + Optional findByMmsiAndMessageTimestamp(Long mmsi, OffsetDateTime messageTimestamp); + + /** + * MMSI로 최신 위치 조회 + */ + Optional findLatestByMmsi(Long mmsi); + + /** + * 여러 MMSI의 최신 위치 조회 + */ + List findLatestByMmsiIn(List mmsiList); + + /** + * 시간 범위 내 특정 MMSI의 항적 조회 + */ + List findByMmsiAndTimeRange(Long mmsi, OffsetDateTime start, OffsetDateTime end); + + /** + * 시간 범위 + 공간 범위 내 선박 조회 + */ + List findByTimeRangeAndArea( + OffsetDateTime start, + OffsetDateTime end, + Double centerLon, + Double centerLat, + Double radiusMeters + ); + + /** + * 배치 INSERT (UPSERT) + */ + void batchUpsert(List entities); + + /** + * 전체 건수 조회 + */ + long count(); + + /** + * 오래된 데이터 삭제 (보존 기간 이전 데이터) + */ + int deleteOlderThan(OffsetDateTime threshold); +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/batch/repository/AisTargetRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/aistarget/batch/repository/AisTargetRepositoryImpl.java new file mode 100644 index 0000000..d70e02b --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/batch/repository/AisTargetRepositoryImpl.java @@ -0,0 +1,315 @@ +package com.snp.batch.jobs.aistarget.batch.repository; + +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.List; +import java.util.Optional; + +/** + * AIS Target Repository 구현체 + * + * 테이블: snp_data.ais_target + * PK: mmsi + message_timestamp (복합키) + */ +@Slf4j +@Repository +@RequiredArgsConstructor +public class AisTargetRepositoryImpl implements AisTargetRepository { + + private final JdbcTemplate jdbcTemplate; + + private static final String TABLE_NAME = "snp_data.ais_target"; + + // ==================== UPSERT SQL ==================== + + private static final String UPSERT_SQL = """ + INSERT INTO snp_data.ais_target ( + mmsi, message_timestamp, imo, name, callsign, vessel_type, extra_info, + lat, lon, geom, + heading, sog, cog, rot, + length, width, draught, length_bow, length_stern, width_port, width_starboard, + destination, eta, status, + age_minutes, position_accuracy, timestamp_utc, repeat_indicator, raim_flag, + radio_status, regional, regional2, spare, spare2, + ais_version, position_fix_type, dte, band_flag, + received_date, collected_at, created_at, updated_at + ) VALUES ( + ?, ?, ?, ?, ?, ?, ?, + ?, ?, ST_SetSRID(ST_MakePoint(?, ?), 4326), + ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, + ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, + ?, ?, ?, ?, + ?, ?, NOW(), NOW() + ) + ON CONFLICT (mmsi, message_timestamp) DO UPDATE SET + imo = EXCLUDED.imo, + name = EXCLUDED.name, + callsign = EXCLUDED.callsign, + vessel_type = EXCLUDED.vessel_type, + extra_info = EXCLUDED.extra_info, + lat = EXCLUDED.lat, + lon = EXCLUDED.lon, + geom = EXCLUDED.geom, + heading = EXCLUDED.heading, + sog = EXCLUDED.sog, + cog = EXCLUDED.cog, + rot = EXCLUDED.rot, + length = EXCLUDED.length, + width = EXCLUDED.width, + draught = EXCLUDED.draught, + length_bow = EXCLUDED.length_bow, + length_stern = EXCLUDED.length_stern, + width_port = EXCLUDED.width_port, + width_starboard = EXCLUDED.width_starboard, + destination = EXCLUDED.destination, + eta = EXCLUDED.eta, + status = EXCLUDED.status, + age_minutes = EXCLUDED.age_minutes, + position_accuracy = EXCLUDED.position_accuracy, + timestamp_utc = EXCLUDED.timestamp_utc, + repeat_indicator = EXCLUDED.repeat_indicator, + raim_flag = EXCLUDED.raim_flag, + radio_status = EXCLUDED.radio_status, + regional = EXCLUDED.regional, + regional2 = EXCLUDED.regional2, + spare = EXCLUDED.spare, + spare2 = EXCLUDED.spare2, + ais_version = EXCLUDED.ais_version, + position_fix_type = EXCLUDED.position_fix_type, + dte = EXCLUDED.dte, + band_flag = EXCLUDED.band_flag, + received_date = EXCLUDED.received_date, + collected_at = EXCLUDED.collected_at, + updated_at = NOW() + """; + + // ==================== RowMapper ==================== + + private final RowMapper rowMapper = (rs, rowNum) -> AisTargetEntity.builder() + .mmsi(rs.getLong("mmsi")) + .messageTimestamp(toOffsetDateTime(rs.getTimestamp("message_timestamp"))) + .imo(rs.getObject("imo", Long.class)) + .name(rs.getString("name")) + .callsign(rs.getString("callsign")) + .vesselType(rs.getString("vessel_type")) + .extraInfo(rs.getString("extra_info")) + .lat(rs.getObject("lat", Double.class)) + .lon(rs.getObject("lon", Double.class)) + .heading(rs.getObject("heading", Double.class)) + .sog(rs.getObject("sog", Double.class)) + .cog(rs.getObject("cog", Double.class)) + .rot(rs.getObject("rot", Integer.class)) + .length(rs.getObject("length", Integer.class)) + .width(rs.getObject("width", Integer.class)) + .draught(rs.getObject("draught", Double.class)) + .lengthBow(rs.getObject("length_bow", Integer.class)) + .lengthStern(rs.getObject("length_stern", Integer.class)) + .widthPort(rs.getObject("width_port", Integer.class)) + .widthStarboard(rs.getObject("width_starboard", Integer.class)) + .destination(rs.getString("destination")) + .eta(toOffsetDateTime(rs.getTimestamp("eta"))) + .status(rs.getString("status")) + .ageMinutes(rs.getObject("age_minutes", Double.class)) + .positionAccuracy(rs.getObject("position_accuracy", Integer.class)) + .timestampUtc(rs.getObject("timestamp_utc", Integer.class)) + .repeatIndicator(rs.getObject("repeat_indicator", Integer.class)) + .raimFlag(rs.getObject("raim_flag", Integer.class)) + .radioStatus(rs.getObject("radio_status", Integer.class)) + .regional(rs.getObject("regional", Integer.class)) + .regional2(rs.getObject("regional2", Integer.class)) + .spare(rs.getObject("spare", Integer.class)) + .spare2(rs.getObject("spare2", Integer.class)) + .aisVersion(rs.getObject("ais_version", Integer.class)) + .positionFixType(rs.getObject("position_fix_type", Integer.class)) + .dte(rs.getObject("dte", Integer.class)) + .bandFlag(rs.getObject("band_flag", Integer.class)) + .receivedDate(toOffsetDateTime(rs.getTimestamp("received_date"))) + .collectedAt(toOffsetDateTime(rs.getTimestamp("collected_at"))) + .build(); + + // ==================== Repository Methods ==================== + + @Override + public Optional findByMmsiAndMessageTimestamp(Long mmsi, OffsetDateTime messageTimestamp) { + String sql = "SELECT * FROM " + TABLE_NAME + " WHERE mmsi = ? AND message_timestamp = ?"; + List results = jdbcTemplate.query(sql, rowMapper, mmsi, toTimestamp(messageTimestamp)); + return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); + } + + @Override + public Optional findLatestByMmsi(Long mmsi) { + String sql = """ + SELECT * FROM %s + WHERE mmsi = ? + ORDER BY message_timestamp DESC + LIMIT 1 + """.formatted(TABLE_NAME); + List results = jdbcTemplate.query(sql, rowMapper, mmsi); + return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); + } + + @Override + public List findLatestByMmsiIn(List mmsiList) { + if (mmsiList == null || mmsiList.isEmpty()) { + return List.of(); + } + + // DISTINCT ON을 사용하여 각 MMSI별 최신 레코드 조회 + String sql = """ + SELECT DISTINCT ON (mmsi) * + FROM %s + WHERE mmsi = ANY(?) + ORDER BY mmsi, message_timestamp DESC + """.formatted(TABLE_NAME); + + Long[] mmsiArray = mmsiList.toArray(new Long[0]); + return jdbcTemplate.query(sql, rowMapper, (Object) mmsiArray); + } + + @Override + public List findByMmsiAndTimeRange(Long mmsi, OffsetDateTime start, OffsetDateTime end) { + String sql = """ + SELECT * FROM %s + WHERE mmsi = ? + AND message_timestamp BETWEEN ? AND ? + ORDER BY message_timestamp ASC + """.formatted(TABLE_NAME); + return jdbcTemplate.query(sql, rowMapper, mmsi, toTimestamp(start), toTimestamp(end)); + } + + @Override + public List findByTimeRangeAndArea( + OffsetDateTime start, + OffsetDateTime end, + Double centerLon, + Double centerLat, + Double radiusMeters + ) { + String sql = """ + SELECT DISTINCT ON (mmsi) * + FROM %s + WHERE message_timestamp BETWEEN ? AND ? + AND ST_DWithin( + geom::geography, + ST_SetSRID(ST_MakePoint(?, ?), 4326)::geography, + ? + ) + ORDER BY mmsi, message_timestamp DESC + """.formatted(TABLE_NAME); + + return jdbcTemplate.query(sql, rowMapper, + toTimestamp(start), toTimestamp(end), + centerLon, centerLat, radiusMeters); + } + + @Override + @Transactional + public void batchUpsert(List entities) { + if (entities == null || entities.isEmpty()) { + return; + } + + log.info("AIS Target 배치 UPSERT 시작: {} 건", entities.size()); + + jdbcTemplate.batchUpdate(UPSERT_SQL, entities, 1000, (ps, entity) -> { + int idx = 1; + // PK + ps.setLong(idx++, entity.getMmsi()); + ps.setTimestamp(idx++, toTimestamp(entity.getMessageTimestamp())); + // 선박 식별 정보 + ps.setObject(idx++, entity.getImo()); + ps.setString(idx++, truncate(entity.getName(), 100)); + ps.setString(idx++, truncate(entity.getCallsign(), 20)); + ps.setString(idx++, truncate(entity.getVesselType(), 50)); + ps.setString(idx++, truncate(entity.getExtraInfo(), 100)); + // 위치 정보 + ps.setObject(idx++, entity.getLat()); + ps.setObject(idx++, entity.getLon()); + // geom용 lon, lat + ps.setObject(idx++, entity.getLon()); + ps.setObject(idx++, entity.getLat()); + // 항해 정보 + ps.setObject(idx++, entity.getHeading()); + ps.setObject(idx++, entity.getSog()); + ps.setObject(idx++, entity.getCog()); + ps.setObject(idx++, entity.getRot()); + // 선박 제원 + ps.setObject(idx++, entity.getLength()); + ps.setObject(idx++, entity.getWidth()); + ps.setObject(idx++, entity.getDraught()); + ps.setObject(idx++, entity.getLengthBow()); + ps.setObject(idx++, entity.getLengthStern()); + ps.setObject(idx++, entity.getWidthPort()); + ps.setObject(idx++, entity.getWidthStarboard()); + // 목적지 정보 + ps.setString(idx++, truncate(entity.getDestination(), 200)); + ps.setTimestamp(idx++, toTimestamp(entity.getEta())); + ps.setString(idx++, truncate(entity.getStatus(), 50)); + // AIS 메시지 정보 + ps.setObject(idx++, entity.getAgeMinutes()); + ps.setObject(idx++, entity.getPositionAccuracy()); + ps.setObject(idx++, entity.getTimestampUtc()); + ps.setObject(idx++, entity.getRepeatIndicator()); + ps.setObject(idx++, entity.getRaimFlag()); + ps.setObject(idx++, entity.getRadioStatus()); + ps.setObject(idx++, entity.getRegional()); + ps.setObject(idx++, entity.getRegional2()); + ps.setObject(idx++, entity.getSpare()); + ps.setObject(idx++, entity.getSpare2()); + ps.setObject(idx++, entity.getAisVersion()); + ps.setObject(idx++, entity.getPositionFixType()); + ps.setObject(idx++, entity.getDte()); + ps.setObject(idx++, entity.getBandFlag()); + // 타임스탬프 + ps.setTimestamp(idx++, toTimestamp(entity.getReceivedDate())); + ps.setTimestamp(idx++, toTimestamp(entity.getCollectedAt())); + }); + + log.info("AIS Target 배치 UPSERT 완료: {} 건", entities.size()); + } + + @Override + public long count() { + String sql = "SELECT COUNT(*) FROM " + TABLE_NAME; + Long count = jdbcTemplate.queryForObject(sql, Long.class); + return count != null ? count : 0L; + } + + @Override + @Transactional + public int deleteOlderThan(OffsetDateTime threshold) { + String sql = "DELETE FROM " + TABLE_NAME + " WHERE message_timestamp < ?"; + int deleted = jdbcTemplate.update(sql, toTimestamp(threshold)); + log.info("AIS Target 오래된 데이터 삭제 완료: {} 건 (기준: {})", deleted, threshold); + return deleted; + } + + // ==================== Helper Methods ==================== + + private Timestamp toTimestamp(OffsetDateTime odt) { + return odt != null ? Timestamp.from(odt.toInstant()) : null; + } + + private OffsetDateTime toOffsetDateTime(Timestamp ts) { + return ts != null ? ts.toInstant().atOffset(ZoneOffset.UTC) : null; + } + + private String truncate(String value, int maxLength) { + if (value == null) return null; + return value.length() > maxLength ? value.substring(0, maxLength) : value; + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/batch/writer/AisTargetDataWriter.java b/src/main/java/com/snp/batch/jobs/aistarget/batch/writer/AisTargetDataWriter.java new file mode 100644 index 0000000..05e302f --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/batch/writer/AisTargetDataWriter.java @@ -0,0 +1,47 @@ +package com.snp.batch.jobs.aistarget.batch.writer; + +import com.snp.batch.common.batch.writer.BaseWriter; +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import com.snp.batch.jobs.aistarget.batch.repository.AisTargetRepository; +import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * AIS Target 데이터 Writer + * + * 동작: + * - UPSERT 방식으로 DB 저장 (PK: mmsi + message_timestamp) + * - 동시에 캐시에도 최신 위치 정보 업데이트 + */ +@Slf4j +@Component +public class AisTargetDataWriter extends BaseWriter { + + private final AisTargetRepository aisTargetRepository; + private final AisTargetCacheManager cacheManager; + + public AisTargetDataWriter( + AisTargetRepository aisTargetRepository, + AisTargetCacheManager cacheManager) { + super("AisTarget"); + this.aisTargetRepository = aisTargetRepository; + this.cacheManager = cacheManager; + } + + @Override + protected void writeItems(List items) throws Exception { + log.debug("AIS Target 데이터 저장 시작: {} 건", items.size()); + + // 1. DB 저장 + aisTargetRepository.batchUpsert(items); + + // 2. 캐시 업데이트 (최신 위치 정보) + cacheManager.putAll(items); + + log.debug("AIS Target 데이터 저장 완료: {} 건 (캐시 크기: {})", + items.size(), cacheManager.size()); + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/cache/AisTargetCacheManager.java b/src/main/java/com/snp/batch/jobs/aistarget/cache/AisTargetCacheManager.java new file mode 100644 index 0000000..fb6e22b --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/cache/AisTargetCacheManager.java @@ -0,0 +1,272 @@ +package com.snp.batch.jobs.aistarget.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import jakarta.annotation.PostConstruct; +import java.time.OffsetDateTime; +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * AIS Target 캐시 매니저 (Caffeine 기반) + * + * Caffeine 캐시의 장점: + * - 고성능: ConcurrentHashMap 대비 우수한 성능 + * - 자동 만료: expireAfterWrite/expireAfterAccess 내장 + * - 최대 크기 제한: maximumSize + LRU/LFU 자동 정리 + * - 통계: 히트율, 미스율, 로드 시간 등 상세 통계 + * - 비동기 지원: AsyncCache로 비동기 로딩 가능 + * + * 동작: + * - 배치 Writer에서 DB 저장과 동시에 캐시 업데이트 + * - API 조회 시 캐시 우선 조회 + * - 캐시 미스 시 DB 조회 후 캐시 갱신 + * - TTL: 마지막 쓰기 이후 N분 뒤 자동 만료 + */ +@Slf4j +@Component +public class AisTargetCacheManager { + + private Cache cache; + + @Value("${app.batch.ais-target-cache.ttl-minutes:5}") + private long ttlMinutes; + + @Value("${app.batch.ais-target-cache.max-size:100000}") + private int maxSize; + + @PostConstruct + public void init() { + this.cache = Caffeine.newBuilder() + // 최대 캐시 크기 (초과 시 LRU 방식으로 정리) + .maximumSize(maxSize) + // 마지막 쓰기 이후 TTL (데이터 업데이트 시 자동 갱신) + .expireAfterWrite(ttlMinutes, TimeUnit.MINUTES) + // 통계 수집 활성화 + .recordStats() + // 제거 리스너 (디버깅/모니터링용) + .removalListener((Long key, AisTargetEntity value, RemovalCause cause) -> { + if (cause != RemovalCause.REPLACED) { + log.trace("캐시 제거 - MMSI: {}, 원인: {}", key, cause); + } + }) + .build(); + + log.info("AIS Target Caffeine 캐시 초기화 - TTL: {}분, 최대 크기: {}", ttlMinutes, maxSize); + } + + // ==================== 단건 조회/업데이트 ==================== + + /** + * 캐시에서 최신 위치 조회 + * + * @param mmsi MMSI 번호 + * @return 캐시된 데이터 (없으면 Optional.empty) + */ + public Optional get(Long mmsi) { + AisTargetEntity entity = cache.getIfPresent(mmsi); + return Optional.ofNullable(entity); + } + + /** + * 캐시에 데이터 저장/업데이트 + * - 기존 데이터보다 최신인 경우에만 업데이트 + * - 업데이트 시 TTL 자동 갱신 (expireAfterWrite) + * + * @param entity AIS Target 엔티티 + */ + public void put(AisTargetEntity entity) { + if (entity == null || entity.getMmsi() == null) { + return; + } + + Long mmsi = entity.getMmsi(); + AisTargetEntity existing = cache.getIfPresent(mmsi); + + // 기존 데이터보다 최신인 경우에만 업데이트 + if (existing == null || isNewer(entity, existing)) { + cache.put(mmsi, entity); + log.trace("캐시 저장 - MMSI: {}", mmsi); + } + } + + // ==================== 배치 조회/업데이트 ==================== + + /** + * 여러 MMSI의 최신 위치 조회 + * + * @param mmsiList MMSI 목록 + * @return 캐시에서 찾은 데이터 맵 (MMSI -> Entity) + */ + public Map getAll(List mmsiList) { + if (mmsiList == null || mmsiList.isEmpty()) { + return Collections.emptyMap(); + } + + // Caffeine의 getAllPresent는 존재하는 키만 반환 + Map result = cache.getAllPresent(mmsiList); + + log.debug("캐시 배치 조회 - 요청: {}, 히트: {}", + mmsiList.size(), result.size()); + + return result; + } + + /** + * 여러 데이터 일괄 저장/업데이트 (배치 Writer에서 호출) + * + * @param entities AIS Target 엔티티 목록 + */ + public void putAll(List entities) { + if (entities == null || entities.isEmpty()) { + return; + } + + int updated = 0; + int skipped = 0; + + for (AisTargetEntity entity : entities) { + if (entity == null || entity.getMmsi() == null) { + continue; + } + + Long mmsi = entity.getMmsi(); + AisTargetEntity existing = cache.getIfPresent(mmsi); + + // 기존 데이터보다 최신인 경우에만 업데이트 + if (existing == null || isNewer(entity, existing)) { + cache.put(mmsi, entity); + updated++; + } else { + skipped++; + } + } + + log.debug("캐시 배치 업데이트 - 입력: {}, 업데이트: {}, 스킵: {}, 현재 크기: {}", + entities.size(), updated, skipped, cache.estimatedSize()); + } + + // ==================== 캐시 관리 ==================== + + /** + * 특정 MMSI 캐시 삭제 + */ + public void evict(Long mmsi) { + cache.invalidate(mmsi); + } + + /** + * 여러 MMSI 캐시 삭제 + */ + public void evictAll(List mmsiList) { + cache.invalidateAll(mmsiList); + } + + /** + * 전체 캐시 삭제 + */ + public void clear() { + long size = cache.estimatedSize(); + cache.invalidateAll(); + log.info("캐시 전체 삭제 - {} 건", size); + } + + /** + * 현재 캐시 크기 (추정값) + */ + public long size() { + return cache.estimatedSize(); + } + + /** + * 캐시 정리 (만료된 엔트리 즉시 제거) + */ + public void cleanup() { + cache.cleanUp(); + } + + // ==================== 통계 ==================== + + /** + * 캐시 통계 조회 + */ + public Map getStats() { + CacheStats stats = cache.stats(); + + Map result = new LinkedHashMap<>(); + result.put("estimatedSize", cache.estimatedSize()); + result.put("maxSize", maxSize); + result.put("ttlMinutes", ttlMinutes); + result.put("hitCount", stats.hitCount()); + result.put("missCount", stats.missCount()); + result.put("hitRate", String.format("%.2f%%", stats.hitRate() * 100)); + result.put("missRate", String.format("%.2f%%", stats.missRate() * 100)); + result.put("evictionCount", stats.evictionCount()); + result.put("loadCount", stats.loadCount()); + result.put("averageLoadPenalty", String.format("%.2fms", stats.averageLoadPenalty() / 1_000_000.0)); + result.put("utilizationPercent", String.format("%.2f%%", (cache.estimatedSize() * 100.0 / maxSize))); + + return result; + } + + /** + * 상세 통계 조회 (Caffeine CacheStats 원본) + */ + public CacheStats getCacheStats() { + return cache.stats(); + } + + // ==================== 전체 데이터 조회 (공간 필터링용) ==================== + + /** + * 캐시의 모든 데이터 조회 (공간 필터링용) + * 주의: 대용량 데이터이므로 신중하게 사용 + * + * @return 캐시된 모든 엔티티 + */ + public Collection getAllValues() { + return cache.asMap().values(); + } + + /** + * 시간 범위 내 데이터 필터링 + * + * @param minutes 최근 N분 + * @return 시간 범위 내 엔티티 목록 + */ + public List getByTimeRange(int minutes) { + java.time.OffsetDateTime threshold = java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC) + .minusMinutes(minutes); + + return cache.asMap().values().stream() + .filter(entity -> entity.getMessageTimestamp() != null) + .filter(entity -> entity.getMessageTimestamp().isAfter(threshold)) + .collect(java.util.stream.Collectors.toList()); + } + + // ==================== Private Methods ==================== + + /** + * 새 데이터가 기존 데이터보다 최신인지 확인 + */ + private boolean isNewer(AisTargetEntity newEntity, AisTargetEntity existing) { + OffsetDateTime newTimestamp = newEntity.getMessageTimestamp(); + OffsetDateTime existingTimestamp = existing.getMessageTimestamp(); + + if (newTimestamp == null) { + return false; + } + if (existingTimestamp == null) { + return true; + } + + return newTimestamp.isAfter(existingTimestamp); + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/cache/AisTargetFilterUtil.java b/src/main/java/com/snp/batch/jobs/aistarget/cache/AisTargetFilterUtil.java new file mode 100644 index 0000000..ecffa32 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/cache/AisTargetFilterUtil.java @@ -0,0 +1,153 @@ +package com.snp.batch.jobs.aistarget.cache; + +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import com.snp.batch.jobs.aistarget.web.dto.AisTargetFilterRequest; +import com.snp.batch.jobs.aistarget.web.dto.NumericCondition; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * AIS Target 필터링 유틸리티 + * + * 캐시 데이터에 대한 조건 필터링 수행 + * - SOG, COG, Heading: 숫자 범위 조건 + * - Destination: 문자열 부분 일치 + * - Status: 다중 선택 일치 + */ +@Slf4j +@Component +public class AisTargetFilterUtil { + + /** + * 필터 조건에 따라 엔티티 목록 필터링 + * + * @param entities 원본 엔티티 목록 + * @param request 필터 조건 + * @return 필터링된 엔티티 목록 + */ + public List filter(List entities, AisTargetFilterRequest request) { + if (entities == null || entities.isEmpty()) { + return List.of(); + } + + if (!request.hasAnyFilter()) { + return entities; + } + + long startTime = System.currentTimeMillis(); + + List result = entities.parallelStream() + .filter(entity -> matchesSog(entity, request)) + .filter(entity -> matchesCog(entity, request)) + .filter(entity -> matchesHeading(entity, request)) + .filter(entity -> matchesDestination(entity, request)) + .filter(entity -> matchesStatus(entity, request)) + .collect(Collectors.toList()); + + long elapsed = System.currentTimeMillis() - startTime; + log.debug("필터링 완료 - 입력: {}, 결과: {}, 소요: {}ms", + entities.size(), result.size(), elapsed); + + return result; + } + + /** + * SOG (속도) 조건 매칭 + */ + private boolean matchesSog(AisTargetEntity entity, AisTargetFilterRequest request) { + if (!request.hasSogFilter()) { + return true; // 필터 없으면 통과 + } + + NumericCondition condition = NumericCondition.fromString(request.getSogCondition()); + if (condition == null) { + return true; + } + + return condition.matches( + entity.getSog(), + request.getSogValue(), + request.getSogMin(), + request.getSogMax() + ); + } + + /** + * COG (침로) 조건 매칭 + */ + private boolean matchesCog(AisTargetEntity entity, AisTargetFilterRequest request) { + if (!request.hasCogFilter()) { + return true; + } + + NumericCondition condition = NumericCondition.fromString(request.getCogCondition()); + if (condition == null) { + return true; + } + + return condition.matches( + entity.getCog(), + request.getCogValue(), + request.getCogMin(), + request.getCogMax() + ); + } + + /** + * Heading (선수방위) 조건 매칭 + */ + private boolean matchesHeading(AisTargetEntity entity, AisTargetFilterRequest request) { + if (!request.hasHeadingFilter()) { + return true; + } + + NumericCondition condition = NumericCondition.fromString(request.getHeadingCondition()); + if (condition == null) { + return true; + } + + return condition.matches( + entity.getHeading(), + request.getHeadingValue(), + request.getHeadingMin(), + request.getHeadingMax() + ); + } + + /** + * Destination (목적지) 조건 매칭 - 부분 일치, 대소문자 무시 + */ + private boolean matchesDestination(AisTargetEntity entity, AisTargetFilterRequest request) { + if (!request.hasDestinationFilter()) { + return true; + } + + String entityDestination = entity.getDestination(); + if (entityDestination == null || entityDestination.isEmpty()) { + return false; + } + + return entityDestination.toUpperCase().contains(request.getDestination().toUpperCase().trim()); + } + + /** + * Status (항행상태) 조건 매칭 - 다중 선택 중 하나라도 일치 + */ + private boolean matchesStatus(AisTargetEntity entity, AisTargetFilterRequest request) { + if (!request.hasStatusFilter()) { + return true; + } + + String entityStatus = entity.getStatus(); + if (entityStatus == null || entityStatus.isEmpty()) { + return false; + } + + // statusList에 포함되어 있으면 통과 + return request.getStatusList().stream() + .anyMatch(status -> entityStatus.equalsIgnoreCase(status.trim())); + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/cache/SpatialFilterUtil.java b/src/main/java/com/snp/batch/jobs/aistarget/cache/SpatialFilterUtil.java new file mode 100644 index 0000000..b2c2fce --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/cache/SpatialFilterUtil.java @@ -0,0 +1,317 @@ +package com.snp.batch.jobs.aistarget.cache; + +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import lombok.extern.slf4j.Slf4j; +import org.locationtech.jts.geom.*; +import org.locationtech.jts.geom.impl.CoordinateArraySequence; +import org.locationtech.jts.operation.distance.DistanceOp; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * 공간 필터링 유틸리티 (JTS 기반) + * + * 지원 기능: + * - 원형 범위 내 선박 필터링 (Point + Radius) + * - 폴리곤 범위 내 선박 필터링 (Polygon) + * - 거리 계산 (Haversine 공식 - 지구 곡률 고려) + * + * 성능: + * - 25만 건 필터링: 약 50-100ms (병렬 처리 시) + * - 단순 거리 계산은 JTS 없이 Haversine으로 처리 (더 빠름) + * - 복잡한 폴리곤은 JTS 사용 + */ +@Slf4j +@Component +public class SpatialFilterUtil { + + private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory(new PrecisionModel(), 4326); + + // 지구 반경 (미터) + private static final double EARTH_RADIUS_METERS = 6_371_000; + + // ==================== 원형 범위 필터링 ==================== + + /** + * 원형 범위 내 선박 필터링 (Haversine 거리 계산 - 빠름) + * + * @param entities 전체 엔티티 목록 + * @param centerLon 중심 경도 + * @param centerLat 중심 위도 + * @param radiusMeters 반경 (미터) + * @return 범위 내 엔티티 목록 + */ + public List filterByCircle( + Collection entities, + double centerLon, + double centerLat, + double radiusMeters) { + + if (entities == null || entities.isEmpty()) { + return new ArrayList<>(); + } + + long startTime = System.currentTimeMillis(); + + // 병렬 스트림으로 필터링 (대용량 데이터 최적화) + List result = entities.parallelStream() + .filter(entity -> entity.getLat() != null && entity.getLon() != null) + .filter(entity -> { + double distance = haversineDistance( + centerLat, centerLon, + entity.getLat(), entity.getLon() + ); + return distance <= radiusMeters; + }) + .collect(Collectors.toList()); + + long elapsed = System.currentTimeMillis() - startTime; + log.debug("원형 필터링 완료 - 입력: {}, 결과: {}, 소요: {}ms", + entities.size(), result.size(), elapsed); + + return result; + } + + /** + * 원형 범위 내 선박 필터링 + 거리 정보 포함 + */ + public List filterByCircleWithDistance( + Collection entities, + double centerLon, + double centerLat, + double radiusMeters) { + + if (entities == null || entities.isEmpty()) { + return new ArrayList<>(); + } + + return entities.parallelStream() + .filter(entity -> entity.getLat() != null && entity.getLon() != null) + .map(entity -> { + double distance = haversineDistance( + centerLat, centerLon, + entity.getLat(), entity.getLon() + ); + return new EntityWithDistance(entity, distance); + }) + .filter(ewd -> ewd.getDistanceMeters() <= radiusMeters) + .sorted((a, b) -> Double.compare(a.getDistanceMeters(), b.getDistanceMeters())) + .collect(Collectors.toList()); + } + + // ==================== 폴리곤 범위 필터링 ==================== + + /** + * 폴리곤 범위 내 선박 필터링 (JTS 사용) + * + * @param entities 전체 엔티티 목록 + * @param polygonCoordinates 폴리곤 좌표 [[lon, lat], [lon, lat], ...] (닫힌 형태) + * @return 범위 내 엔티티 목록 + */ + public List filterByPolygon( + Collection entities, + double[][] polygonCoordinates) { + + if (entities == null || entities.isEmpty()) { + return new ArrayList<>(); + } + + if (polygonCoordinates == null || polygonCoordinates.length < 4) { + log.warn("유효하지 않은 폴리곤 좌표 (최소 4개 점 필요)"); + return new ArrayList<>(); + } + + long startTime = System.currentTimeMillis(); + + // JTS Polygon 생성 + Polygon polygon = createPolygon(polygonCoordinates); + + if (polygon == null || !polygon.isValid()) { + log.warn("유효하지 않은 폴리곤"); + return new ArrayList<>(); + } + + // 병렬 스트림으로 필터링 + List result = entities.parallelStream() + .filter(entity -> entity.getLat() != null && entity.getLon() != null) + .filter(entity -> { + Point point = createPoint(entity.getLon(), entity.getLat()); + return polygon.contains(point); + }) + .collect(Collectors.toList()); + + long elapsed = System.currentTimeMillis() - startTime; + log.debug("폴리곤 필터링 완료 - 입력: {}, 결과: {}, 소요: {}ms", + entities.size(), result.size(), elapsed); + + return result; + } + + /** + * WKT(Well-Known Text) 형식 폴리곤으로 필터링 + * + * @param entities 전체 엔티티 목록 + * @param wkt WKT 문자열 (예: "POLYGON((129 35, 130 35, 130 36, 129 36, 129 35))") + * @return 범위 내 엔티티 목록 + */ + public List filterByWkt( + Collection entities, + String wkt) { + + if (entities == null || entities.isEmpty()) { + return new ArrayList<>(); + } + + try { + Geometry geometry = new org.locationtech.jts.io.WKTReader(GEOMETRY_FACTORY).read(wkt); + + return entities.parallelStream() + .filter(entity -> entity.getLat() != null && entity.getLon() != null) + .filter(entity -> { + Point point = createPoint(entity.getLon(), entity.getLat()); + return geometry.contains(point); + }) + .collect(Collectors.toList()); + + } catch (Exception e) { + log.error("WKT 파싱 실패: {}", wkt, e); + return new ArrayList<>(); + } + } + + // ==================== GeoJSON 지원 ==================== + + /** + * GeoJSON 형식 폴리곤으로 필터링 + * + * @param entities 전체 엔티티 목록 + * @param geoJsonCoordinates GeoJSON coordinates 배열 [[[lon, lat], ...]] + * @return 범위 내 엔티티 목록 + */ + public List filterByGeoJson( + Collection entities, + double[][][] geoJsonCoordinates) { + + if (geoJsonCoordinates == null || geoJsonCoordinates.length == 0) { + return new ArrayList<>(); + } + + // GeoJSON의 첫 번째 링 (외부 경계) + return filterByPolygon(entities, geoJsonCoordinates[0]); + } + + // ==================== 거리 계산 ==================== + + /** + * Haversine 공식을 사용한 두 지점 간 거리 계산 (미터) + * 지구 곡률을 고려한 정확한 거리 계산 + */ + public double haversineDistance(double lat1, double lon1, double lat2, double lon2) { + double dLat = Math.toRadians(lat2 - lat1); + double dLon = Math.toRadians(lon2 - lon1); + + double a = Math.sin(dLat / 2) * Math.sin(dLat / 2) + + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) + * Math.sin(dLon / 2) * Math.sin(dLon / 2); + + double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); + + return EARTH_RADIUS_METERS * c; + } + + /** + * 두 엔티티 간 거리 계산 (미터) + */ + public double calculateDistance(AisTargetEntity entity1, AisTargetEntity entity2) { + if (entity1.getLat() == null || entity1.getLon() == null || + entity2.getLat() == null || entity2.getLon() == null) { + return Double.MAX_VALUE; + } + + return haversineDistance( + entity1.getLat(), entity1.getLon(), + entity2.getLat(), entity2.getLon() + ); + } + + // ==================== JTS 헬퍼 메서드 ==================== + + /** + * JTS Point 생성 + */ + public Point createPoint(double lon, double lat) { + return GEOMETRY_FACTORY.createPoint(new Coordinate(lon, lat)); + } + + /** + * JTS Polygon 생성 + */ + public Polygon createPolygon(double[][] coordinates) { + try { + Coordinate[] coords = new Coordinate[coordinates.length]; + for (int i = 0; i < coordinates.length; i++) { + coords[i] = new Coordinate(coordinates[i][0], coordinates[i][1]); + } + + // 폴리곤이 닫혀있지 않으면 닫기 + if (!coords[0].equals(coords[coords.length - 1])) { + Coordinate[] closedCoords = new Coordinate[coords.length + 1]; + System.arraycopy(coords, 0, closedCoords, 0, coords.length); + closedCoords[coords.length] = coords[0]; + coords = closedCoords; + } + + LinearRing ring = GEOMETRY_FACTORY.createLinearRing(coords); + return GEOMETRY_FACTORY.createPolygon(ring); + + } catch (Exception e) { + log.error("폴리곤 생성 실패", e); + return null; + } + } + + /** + * 원형 폴리곤 생성 (근사치) + * + * @param centerLon 중심 경도 + * @param centerLat 중심 위도 + * @param radiusMeters 반경 (미터) + * @param numPoints 폴리곤 점 개수 (기본: 64) + */ + public Polygon createCirclePolygon(double centerLon, double centerLat, double radiusMeters, int numPoints) { + Coordinate[] coords = new Coordinate[numPoints + 1]; + + for (int i = 0; i < numPoints; i++) { + double angle = (2 * Math.PI * i) / numPoints; + + // 위도/경도 변환 (근사치) + double dLat = (radiusMeters / EARTH_RADIUS_METERS) * (180 / Math.PI); + double dLon = dLat / Math.cos(Math.toRadians(centerLat)); + + double lat = centerLat + dLat * Math.sin(angle); + double lon = centerLon + dLon * Math.cos(angle); + + coords[i] = new Coordinate(lon, lat); + } + coords[numPoints] = coords[0]; // 닫기 + + LinearRing ring = GEOMETRY_FACTORY.createLinearRing(coords); + return GEOMETRY_FACTORY.createPolygon(ring); + } + + // ==================== 내부 클래스 ==================== + + /** + * 엔티티 + 거리 정보 + */ + @lombok.Data + @lombok.AllArgsConstructor + public static class EntityWithDistance { + private AisTargetEntity entity; + private double distanceMeters; + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/web/controller/AisTargetController.java b/src/main/java/com/snp/batch/jobs/aistarget/web/controller/AisTargetController.java new file mode 100644 index 0000000..fea1864 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/web/controller/AisTargetController.java @@ -0,0 +1,393 @@ +package com.snp.batch.jobs.aistarget.web.controller; + +import com.snp.batch.common.web.ApiResponse; +import com.snp.batch.jobs.aistarget.web.dto.AisTargetFilterRequest; +import com.snp.batch.jobs.aistarget.web.dto.AisTargetResponseDto; +import com.snp.batch.jobs.aistarget.web.dto.AisTargetSearchRequest; +import com.snp.batch.jobs.aistarget.web.service.AisTargetService; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import jakarta.validation.Valid; +import java.util.List; +import java.util.Map; + +/** + * AIS Target REST API Controller + * + * 캐시 우선 조회 전략: + * - 캐시에서 먼저 조회 + * - 캐시 미스 시 DB 조회 후 캐시 업데이트 + */ +@Slf4j +@RestController +@RequestMapping("/api/ais-target") +@RequiredArgsConstructor +@Tag(name = "AIS Target", description = "AIS 선박 위치 정보 API") +public class AisTargetController { + + private final AisTargetService aisTargetService; + + // ==================== 단건 조회 ==================== + + @Operation( + summary = "MMSI로 최신 위치 조회", + description = "특정 MMSI의 최신 위치 정보를 조회합니다 (캐시 우선)" + ) + @GetMapping("/{mmsi}") + public ResponseEntity> getLatestByMmsi( + @Parameter(description = "MMSI 번호", required = true, example = "440123456") + @PathVariable Long mmsi) { + log.info("최신 위치 조회 요청 - MMSI: {}", mmsi); + + return aisTargetService.findLatestByMmsi(mmsi) + .map(dto -> ResponseEntity.ok(ApiResponse.success(dto))) + .orElse(ResponseEntity.notFound().build()); + } + + // ==================== 다건 조회 ==================== + + @Operation( + summary = "여러 MMSI의 최신 위치 조회", + description = "여러 MMSI의 최신 위치 정보를 일괄 조회합니다 (캐시 우선)" + ) + @PostMapping("/batch") + public ResponseEntity>> getLatestByMmsiList( + @Parameter(description = "MMSI 번호 목록", required = true) + @RequestBody List mmsiList) { + log.info("다건 최신 위치 조회 요청 - 요청 수: {}", mmsiList.size()); + + List result = aisTargetService.findLatestByMmsiList(mmsiList); + return ResponseEntity.ok(ApiResponse.success( + "조회 완료: " + result.size() + "/" + mmsiList.size() + " 건", + result + )); + } + + // ==================== 검색 조회 ==================== + + @Operation( + summary = "시간/공간 범위로 선박 검색", + description = """ + 시간 범위 (필수) + 공간 범위 (옵션)로 선박을 검색합니다. + + - minutes: 조회 범위 (분, 필수) + - centerLon, centerLat: 중심 좌표 (옵션) + - radiusMeters: 반경 (미터, 옵션) + + 공간 범위가 지정되지 않으면 전체 선박의 최신 위치를 반환합니다. + """ + ) + @GetMapping("/search") + public ResponseEntity>> search( + @Parameter(description = "조회 범위 (분)", required = true, example = "5") + @RequestParam Integer minutes, + @Parameter(description = "중심 경도", example = "129.0") + @RequestParam(required = false) Double centerLon, + @Parameter(description = "중심 위도", example = "35.0") + @RequestParam(required = false) Double centerLat, + @Parameter(description = "반경 (미터)", example = "50000") + @RequestParam(required = false) Double radiusMeters) { + + log.info("선박 검색 요청 - minutes: {}, center: ({}, {}), radius: {}", + minutes, centerLon, centerLat, radiusMeters); + + AisTargetSearchRequest request = AisTargetSearchRequest.builder() + .minutes(minutes) + .centerLon(centerLon) + .centerLat(centerLat) + .radiusMeters(radiusMeters) + .build(); + + List result = aisTargetService.search(request); + return ResponseEntity.ok(ApiResponse.success( + "검색 완료: " + result.size() + " 건", + result + )); + } + + @Operation( + summary = "시간/공간 범위로 선박 검색 (POST)", + description = "POST 방식으로 검색 조건을 전달합니다" + ) + @PostMapping("/search") + public ResponseEntity>> searchPost( + @Valid @RequestBody AisTargetSearchRequest request) { + log.info("선박 검색 요청 (POST) - minutes: {}, hasArea: {}", + request.getMinutes(), request.hasAreaFilter()); + + List result = aisTargetService.search(request); + return ResponseEntity.ok(ApiResponse.success( + "검색 완료: " + result.size() + " 건", + result + )); + } + + // ==================== 조건 필터 검색 ==================== + + @Operation( + summary = "항해 조건 필터 검색", + description = """ + 속도(SOG), 침로(COG), 선수방위(Heading), 목적지, 항행상태로 선박을 필터링합니다. + + --- + ## 조건 타입 및 파라미터 사용법 + + | 조건 | 의미 | 사용 파라미터 | + |------|------|--------------| + | GTE | 이상 (>=) | *Value (예: sogValue) | + | GT | 초과 (>) | *Value | + | LTE | 이하 (<=) | *Value | + | LT | 미만 (<) | *Value | + | BETWEEN | 범위 | *Min, *Max (예: sogMin, sogMax) | + + --- + ## 요청 예시 + + **예시 1: 단일 값 조건 (속도 10knots 이상)** + ```json + { + "minutes": 5, + "sogCondition": "GTE", + "sogValue": 10.0 + } + ``` + + **예시 2: 범위 조건 (속도 5~15knots, 침로 90~180도)** + ```json + { + "minutes": 5, + "sogCondition": "BETWEEN", + "sogMin": 5.0, + "sogMax": 15.0, + "cogCondition": "BETWEEN", + "cogMin": 90.0, + "cogMax": 180.0 + } + ``` + + **예시 3: 복합 조건** + ```json + { + "minutes": 5, + "sogCondition": "GTE", + "sogValue": 10.0, + "cogCondition": "BETWEEN", + "cogMin": 90.0, + "cogMax": 180.0, + "headingCondition": "LT", + "headingValue": 180.0, + "destination": "BUSAN", + "statusList": ["0", "1", "5"] + } + ``` + + --- + ## 항행상태 코드 (statusList) + + | 코드 | 상태 | + |------|------| + | 0 | Under way using engine (기관 사용 항해 중) | + | 1 | At anchor (정박 중) | + | 2 | Not under command (조종불능) | + | 3 | Restricted manoeuverability (조종제한) | + | 4 | Constrained by her draught (흘수제약) | + | 5 | Moored (계류 중) | + | 6 | Aground (좌초) | + | 7 | Engaged in Fishing (어로 중) | + | 8 | Under way sailing (돛 항해 중) | + | 9-10 | Reserved for future use | + | 11 | Power-driven vessel towing astern | + | 12 | Power-driven vessel pushing ahead | + | 13 | Reserved for future use | + | 14 | AIS-SART, MOB-AIS, EPIRB-AIS | + | 15 | Undefined (default) | + + --- + **참고:** 모든 필터는 선택사항이며, 미지정 시 해당 필드는 조건에서 제외됩니다 (전체 값 포함). + """ + ) + @PostMapping("/search/filter") + public ResponseEntity>> searchByFilter( + @Valid @RequestBody AisTargetFilterRequest request) { + log.info("필터 검색 요청 - minutes: {}, sog: {}/{}, cog: {}/{}, heading: {}/{}, dest: {}, status: {}", + request.getMinutes(), + request.getSogCondition(), request.getSogValue(), + request.getCogCondition(), request.getCogValue(), + request.getHeadingCondition(), request.getHeadingValue(), + request.getDestination(), + request.getStatusList()); + + List result = aisTargetService.searchByFilter(request); + return ResponseEntity.ok(ApiResponse.success( + "필터 검색 완료: " + result.size() + " 건", + result + )); + } + + // ==================== 폴리곤 검색 ==================== + + @Operation( + summary = "폴리곤 범위 내 선박 검색", + description = """ + 폴리곤 범위 내 선박을 검색합니다. + + 요청 예시: + { + "minutes": 5, + "coordinates": [[129.0, 35.0], [130.0, 35.0], [130.0, 36.0], [129.0, 36.0], [129.0, 35.0]] + } + + 좌표는 [경도, 위도] 순서이며, 폴리곤은 닫힌 형태여야 합니다 (첫점 = 끝점). + """ + ) + @PostMapping("/search/polygon") + public ResponseEntity>> searchByPolygon( + @RequestBody PolygonSearchRequest request) { + log.info("폴리곤 검색 요청 - minutes: {}, points: {}", + request.getMinutes(), request.getCoordinates().length); + + List result = aisTargetService.searchByPolygon( + request.getMinutes(), + request.getCoordinates() + ); + return ResponseEntity.ok(ApiResponse.success( + "폴리곤 검색 완료: " + result.size() + " 건", + result + )); + } + + @Operation( + summary = "WKT 범위 내 선박 검색", + description = """ + WKT(Well-Known Text) 형식으로 정의된 범위 내 선박을 검색합니다. + + 요청 예시: + { + "minutes": 5, + "wkt": "POLYGON((129 35, 130 35, 130 36, 129 36, 129 35))" + } + + 지원 형식: POLYGON, MULTIPOLYGON + """ + ) + @PostMapping("/search/wkt") + public ResponseEntity>> searchByWkt( + @RequestBody WktSearchRequest request) { + log.info("WKT 검색 요청 - minutes: {}, wkt: {}", request.getMinutes(), request.getWkt()); + + List result = aisTargetService.searchByWkt( + request.getMinutes(), + request.getWkt() + ); + return ResponseEntity.ok(ApiResponse.success( + "WKT 검색 완료: " + result.size() + " 건", + result + )); + } + + @Operation( + summary = "거리 포함 원형 범위 검색", + description = """ + 원형 범위 내 선박을 검색하고, 중심점으로부터의 거리 정보를 함께 반환합니다. + 결과는 거리순으로 정렬됩니다. + """ + ) + @GetMapping("/search/with-distance") + public ResponseEntity>> searchWithDistance( + @Parameter(description = "조회 범위 (분)", required = true, example = "5") + @RequestParam Integer minutes, + @Parameter(description = "중심 경도", required = true, example = "129.0") + @RequestParam Double centerLon, + @Parameter(description = "중심 위도", required = true, example = "35.0") + @RequestParam Double centerLat, + @Parameter(description = "반경 (미터)", required = true, example = "50000") + @RequestParam Double radiusMeters) { + + log.info("거리 포함 검색 요청 - minutes: {}, center: ({}, {}), radius: {}", + minutes, centerLon, centerLat, radiusMeters); + + List result = + aisTargetService.searchWithDistance(minutes, centerLon, centerLat, radiusMeters); + return ResponseEntity.ok(ApiResponse.success( + "거리 포함 검색 완료: " + result.size() + " 건", + result + )); + } + + // ==================== 항적 조회 ==================== + + @Operation( + summary = "항적 조회", + description = "특정 MMSI의 시간 범위 내 항적 (위치 이력)을 조회합니다" + ) + @GetMapping("/{mmsi}/track") + public ResponseEntity>> getTrack( + @Parameter(description = "MMSI 번호", required = true, example = "440123456") + @PathVariable Long mmsi, + @Parameter(description = "조회 범위 (분)", required = true, example = "60") + @RequestParam Integer minutes) { + log.info("항적 조회 요청 - MMSI: {}, 범위: {}분", mmsi, minutes); + + List track = aisTargetService.getTrack(mmsi, minutes); + return ResponseEntity.ok(ApiResponse.success( + "항적 조회 완료: " + track.size() + " 포인트", + track + )); + } + + // ==================== 캐시 관리 ==================== + + @Operation( + summary = "캐시 통계 조회", + description = "AIS Target 캐시의 현재 상태를 조회합니다" + ) + @GetMapping("/cache/stats") + public ResponseEntity>> getCacheStats() { + Map stats = aisTargetService.getCacheStats(); + return ResponseEntity.ok(ApiResponse.success(stats)); + } + + @Operation( + summary = "캐시 초기화", + description = "AIS Target 캐시를 초기화합니다" + ) + @DeleteMapping("/cache") + public ResponseEntity> clearCache() { + log.warn("캐시 초기화 요청"); + aisTargetService.clearCache(); + return ResponseEntity.ok(ApiResponse.success("캐시가 초기화되었습니다", null)); + } + + // ==================== 요청 DTO (내부 클래스) ==================== + + /** + * 폴리곤 검색 요청 DTO + */ + @lombok.Data + public static class PolygonSearchRequest { + @Parameter(description = "조회 범위 (분)", required = true, example = "5") + private int minutes; + + @Parameter(description = "폴리곤 좌표 [[lon, lat], ...]", required = true) + private double[][] coordinates; + } + + /** + * WKT 검색 요청 DTO + */ + @lombok.Data + public static class WktSearchRequest { + @Parameter(description = "조회 범위 (분)", required = true, example = "5") + private int minutes; + + @Parameter(description = "WKT 문자열", required = true, + example = "POLYGON((129 35, 130 35, 130 36, 129 36, 129 35))") + private String wkt; + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/web/dto/AisTargetFilterRequest.java b/src/main/java/com/snp/batch/jobs/aistarget/web/dto/AisTargetFilterRequest.java new file mode 100644 index 0000000..cc34c45 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/web/dto/AisTargetFilterRequest.java @@ -0,0 +1,153 @@ +package com.snp.batch.jobs.aistarget.web.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; +import java.util.List; + +/** + * AIS Target 필터 검색 요청 DTO + * + * 조건 타입 (condition): + * - GTE: 이상 (>=) + * - GT: 초과 (>) + * - LTE: 이하 (<=) + * - LT: 미만 (<) + * - BETWEEN: 범위 (min <= value <= max) + * + * 모든 필터는 선택사항이며, 미지정 시 해당 필드 전체 포함 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Schema(description = "AIS Target 필터 검색 요청") +public class AisTargetFilterRequest { + + @NotNull(message = "minutes는 필수입니다") + @Min(value = 1, message = "minutes는 최소 1분 이상이어야 합니다") + @Schema(description = "조회 범위 (분)", example = "5", requiredMode = Schema.RequiredMode.REQUIRED) + private Integer minutes; + + // ==================== 속도 (SOG) 필터 ==================== + @Schema(description = """ + 속도(SOG) 조건 타입 + - GTE: 이상 (>=) - sogValue 사용 + - GT: 초과 (>) - sogValue 사용 + - LTE: 이하 (<=) - sogValue 사용 + - LT: 미만 (<) - sogValue 사용 + - BETWEEN: 범위 - sogMin, sogMax 사용 + """, + example = "GTE", allowableValues = {"GTE", "GT", "LTE", "LT", "BETWEEN"}) + private String sogCondition; + + @Schema(description = "속도 값 (knots) - GTE/GT/LTE/LT 조건에서 사용", example = "10.0") + private Double sogValue; + + @Schema(description = "속도 최소값 (knots) - BETWEEN 조건에서 사용 (sogMin <= 속도 <= sogMax)", example = "5.0") + private Double sogMin; + + @Schema(description = "속도 최대값 (knots) - BETWEEN 조건에서 사용 (sogMin <= 속도 <= sogMax)", example = "15.0") + private Double sogMax; + + // ==================== 침로 (COG) 필터 ==================== + @Schema(description = """ + 침로(COG) 조건 타입 + - GTE: 이상 (>=) - cogValue 사용 + - GT: 초과 (>) - cogValue 사용 + - LTE: 이하 (<=) - cogValue 사용 + - LT: 미만 (<) - cogValue 사용 + - BETWEEN: 범위 - cogMin, cogMax 사용 + """, + example = "BETWEEN", allowableValues = {"GTE", "GT", "LTE", "LT", "BETWEEN"}) + private String cogCondition; + + @Schema(description = "침로 값 (degrees, 0-360) - GTE/GT/LTE/LT 조건에서 사용", example = "180.0") + private Double cogValue; + + @Schema(description = "침로 최소값 (degrees) - BETWEEN 조건에서 사용 (cogMin <= 침로 <= cogMax)", example = "90.0") + private Double cogMin; + + @Schema(description = "침로 최대값 (degrees) - BETWEEN 조건에서 사용 (cogMin <= 침로 <= cogMax)", example = "270.0") + private Double cogMax; + + // ==================== 선수방위 (Heading) 필터 ==================== + @Schema(description = """ + 선수방위(Heading) 조건 타입 + - GTE: 이상 (>=) - headingValue 사용 + - GT: 초과 (>) - headingValue 사용 + - LTE: 이하 (<=) - headingValue 사용 + - LT: 미만 (<) - headingValue 사용 + - BETWEEN: 범위 - headingMin, headingMax 사용 + """, + example = "LTE", allowableValues = {"GTE", "GT", "LTE", "LT", "BETWEEN"}) + private String headingCondition; + + @Schema(description = "선수방위 값 (degrees, 0-360) - GTE/GT/LTE/LT 조건에서 사용", example = "90.0") + private Double headingValue; + + @Schema(description = "선수방위 최소값 (degrees) - BETWEEN 조건에서 사용 (headingMin <= 선수방위 <= headingMax)", example = "0.0") + private Double headingMin; + + @Schema(description = "선수방위 최대값 (degrees) - BETWEEN 조건에서 사용 (headingMin <= 선수방위 <= headingMax)", example = "180.0") + private Double headingMax; + + // ==================== 목적지 (Destination) 필터 ==================== + @Schema(description = "목적지 (부분 일치, 대소문자 무시)", example = "BUSAN") + private String destination; + + // ==================== 항행상태 (Status) 필터 ==================== + @Schema(description = """ + 항행상태 목록 (다중 선택 가능, 미선택 시 전체) + - 0: Under way using engine + - 1: At anchor + - 2: Not under command + - 3: Restricted manoeuverability + - 4: Constrained by her draught + - 5: Moored + - 6: Aground + - 7: Engaged in Fishing + - 8: Under way sailing + - 9: Reserved for future (HSC) + - 10: Reserved for future (WIG) + - 11: Power-driven vessel towing astern + - 12: Power-driven vessel pushing ahead or towing alongside + - 13: Reserved for future use + - 14: AIS-SART, MOB-AIS, EPIRB-AIS + - 15: Undefined (default) + """, + example = "[\"0\", \"1\", \"5\"]") + private List statusList; + + // ==================== 필터 존재 여부 확인 ==================== + + public boolean hasSogFilter() { + return sogCondition != null && !sogCondition.isEmpty(); + } + + public boolean hasCogFilter() { + return cogCondition != null && !cogCondition.isEmpty(); + } + + public boolean hasHeadingFilter() { + return headingCondition != null && !headingCondition.isEmpty(); + } + + public boolean hasDestinationFilter() { + return destination != null && !destination.trim().isEmpty(); + } + + public boolean hasStatusFilter() { + return statusList != null && !statusList.isEmpty(); + } + + public boolean hasAnyFilter() { + return hasSogFilter() || hasCogFilter() || hasHeadingFilter() + || hasDestinationFilter() || hasStatusFilter(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/web/dto/AisTargetResponseDto.java b/src/main/java/com/snp/batch/jobs/aistarget/web/dto/AisTargetResponseDto.java new file mode 100644 index 0000000..b7b0c74 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/web/dto/AisTargetResponseDto.java @@ -0,0 +1,87 @@ +package com.snp.batch.jobs.aistarget.web.dto; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.OffsetDateTime; + +/** + * AIS Target API 응답 DTO + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AisTargetResponseDto { + + // 선박 식별 정보 + private Long mmsi; + private Long imo; + private String name; + private String callsign; + private String vesselType; + + // 위치 정보 + private Double lat; + private Double lon; + + // 항해 정보 + private Double heading; + private Double sog; // Speed over Ground + private Double cog; // Course over Ground + private Integer rot; // Rate of Turn + + // 선박 제원 + private Integer length; + private Integer width; + private Double draught; + + // 목적지 정보 + private String destination; + private OffsetDateTime eta; + private String status; + + // 타임스탬프 + private OffsetDateTime messageTimestamp; + private OffsetDateTime receivedDate; + + // 데이터 소스 (캐시/DB) + private String source; + + /** + * Entity -> DTO 변환 + */ + public static AisTargetResponseDto from(AisTargetEntity entity, String source) { + if (entity == null) { + return null; + } + + return AisTargetResponseDto.builder() + .mmsi(entity.getMmsi()) + .imo(entity.getImo()) + .name(entity.getName()) + .callsign(entity.getCallsign()) + .vesselType(entity.getVesselType()) + .lat(entity.getLat()) + .lon(entity.getLon()) + .heading(entity.getHeading()) + .sog(entity.getSog()) + .cog(entity.getCog()) + .rot(entity.getRot()) + .length(entity.getLength()) + .width(entity.getWidth()) + .draught(entity.getDraught()) + .destination(entity.getDestination()) + .eta(entity.getEta()) + .status(entity.getStatus()) + .messageTimestamp(entity.getMessageTimestamp()) + .receivedDate(entity.getReceivedDate()) + .source(source) + .build(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/web/dto/AisTargetSearchRequest.java b/src/main/java/com/snp/batch/jobs/aistarget/web/dto/AisTargetSearchRequest.java new file mode 100644 index 0000000..9e7a7a8 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/web/dto/AisTargetSearchRequest.java @@ -0,0 +1,48 @@ +package com.snp.batch.jobs.aistarget.web.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; + +/** + * AIS Target 검색 요청 DTO + * + * 필수 파라미터: + * - minutes: 분 단위 조회 범위 (1~60) + * + * 옵션 파라미터: + * - centerLon, centerLat, radiusMeters: 공간 범위 필터 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Schema(description = "AIS Target 검색 요청") +public class AisTargetSearchRequest { + + @NotNull(message = "minutes는 필수입니다") + @Min(value = 1, message = "minutes는 최소 1분 이상이어야 합니다") + @Schema(description = "조회 범위 (분)", example = "5", required = true) + private Integer minutes; + + @Schema(description = "중심 경도 (옵션)", example = "129.0") + private Double centerLon; + + @Schema(description = "중심 위도 (옵션)", example = "35.0") + private Double centerLat; + + @Schema(description = "반경 (미터, 옵션)", example = "50000") + private Double radiusMeters; + + /** + * 공간 필터 사용 여부 + */ + public boolean hasAreaFilter() { + return centerLon != null && centerLat != null && radiusMeters != null; + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/web/dto/NumericCondition.java b/src/main/java/com/snp/batch/jobs/aistarget/web/dto/NumericCondition.java new file mode 100644 index 0000000..cca3f54 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/web/dto/NumericCondition.java @@ -0,0 +1,90 @@ +package com.snp.batch.jobs.aistarget.web.dto; + +/** + * 숫자 비교 조건 열거형 + * + * 사용: SOG, COG, Heading 필터링 + */ +public enum NumericCondition { + /** + * 이상 (>=) + */ + GTE, + + /** + * 초과 (>) + */ + GT, + + /** + * 이하 (<=) + */ + LTE, + + /** + * 미만 (<) + */ + LT, + + /** + * 범위 (min <= value <= max) + */ + BETWEEN; + + /** + * 문자열을 NumericCondition으로 변환 + * + * @param value 조건 문자열 + * @return NumericCondition (null이면 null 반환) + */ + public static NumericCondition fromString(String value) { + if (value == null || value.trim().isEmpty()) { + return null; + } + try { + return NumericCondition.valueOf(value.toUpperCase().trim()); + } catch (IllegalArgumentException e) { + return null; + } + } + + /** + * 주어진 값이 조건을 만족하는지 확인 + * + * @param fieldValue 필드 값 + * @param compareValue 비교 값 (GTE, GT, LTE, LT용) + * @param minValue 최소값 (BETWEEN용) + * @param maxValue 최대값 (BETWEEN용) + * @return 조건 만족 여부 + */ + public boolean matches(Double fieldValue, Double compareValue, Double minValue, Double maxValue) { + if (fieldValue == null) { + return false; + } + + return switch (this) { + case GTE -> compareValue != null && fieldValue >= compareValue; + case GT -> compareValue != null && fieldValue > compareValue; + case LTE -> compareValue != null && fieldValue <= compareValue; + case LT -> compareValue != null && fieldValue < compareValue; + case BETWEEN -> minValue != null && maxValue != null + && fieldValue >= minValue && fieldValue <= maxValue; + }; + } + + /** + * SQL 조건절 생성 (DB 쿼리용) + * + * @param columnName 컬럼명 + * @return SQL 조건절 문자열 + */ + public String toSqlCondition(String columnName) { + return switch (this) { + case GTE -> columnName + " >= ?"; + case GT -> columnName + " > ?"; + case LTE -> columnName + " <= ?"; + case LT -> columnName + " < ?"; + case BETWEEN -> columnName + " BETWEEN ? AND ?"; + }; + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/web/service/AisTargetService.java b/src/main/java/com/snp/batch/jobs/aistarget/web/service/AisTargetService.java new file mode 100644 index 0000000..6b1b79f --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/web/service/AisTargetService.java @@ -0,0 +1,384 @@ +package com.snp.batch.jobs.aistarget.web.service; + +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import com.snp.batch.jobs.aistarget.batch.repository.AisTargetRepository; +import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager; +import com.snp.batch.jobs.aistarget.cache.AisTargetFilterUtil; +import com.snp.batch.jobs.aistarget.cache.SpatialFilterUtil; +import com.snp.batch.jobs.aistarget.web.dto.AisTargetFilterRequest; +import com.snp.batch.jobs.aistarget.web.dto.AisTargetResponseDto; +import com.snp.batch.jobs.aistarget.web.dto.AisTargetSearchRequest; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.*; +import java.util.stream.Collectors; + +/** + * AIS Target 서비스 + * + * 조회 전략: + * 1. 캐시 우선 조회 (Caffeine 캐시) + * 2. 캐시 미스 시 DB Fallback + * 3. 공간 필터링은 캐시에서 수행 (JTS 기반) + * + * 성능: + * - 캐시 조회: O(1) + * - 공간 필터링: O(n) with 병렬 처리 (25만건 ~50-100ms) + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class AisTargetService { + + private final AisTargetRepository aisTargetRepository; + private final AisTargetCacheManager cacheManager; + private final SpatialFilterUtil spatialFilterUtil; + private final AisTargetFilterUtil filterUtil; + + private static final String SOURCE_CACHE = "cache"; + private static final String SOURCE_DB = "db"; + + // ==================== 단건 조회 ==================== + + /** + * MMSI로 최신 위치 조회 (캐시 우선) + */ + public Optional findLatestByMmsi(Long mmsi) { + log.debug("최신 위치 조회 - MMSI: {}", mmsi); + + // 1. 캐시 조회 + Optional cached = cacheManager.get(mmsi); + if (cached.isPresent()) { + log.debug("캐시 히트 - MMSI: {}", mmsi); + return Optional.of(AisTargetResponseDto.from(cached.get(), SOURCE_CACHE)); + } + + // 2. DB 조회 (캐시 미스) + log.debug("캐시 미스, DB 조회 - MMSI: {}", mmsi); + Optional fromDb = aisTargetRepository.findLatestByMmsi(mmsi); + + if (fromDb.isPresent()) { + // 3. 캐시 업데이트 + cacheManager.put(fromDb.get()); + log.debug("DB 조회 성공, 캐시 업데이트 - MMSI: {}", mmsi); + return Optional.of(AisTargetResponseDto.from(fromDb.get(), SOURCE_DB)); + } + + return Optional.empty(); + } + + // ==================== 다건 조회 ==================== + + /** + * 여러 MMSI의 최신 위치 조회 (캐시 우선) + */ + public List findLatestByMmsiList(List mmsiList) { + if (mmsiList == null || mmsiList.isEmpty()) { + return Collections.emptyList(); + } + + log.debug("다건 최신 위치 조회 - 요청: {} 건", mmsiList.size()); + + List result = new ArrayList<>(); + + // 1. 캐시에서 조회 + Map cachedData = cacheManager.getAll(mmsiList); + for (AisTargetEntity entity : cachedData.values()) { + result.add(AisTargetResponseDto.from(entity, SOURCE_CACHE)); + } + + // 2. 캐시 미스 목록 + List missedMmsiList = mmsiList.stream() + .filter(mmsi -> !cachedData.containsKey(mmsi)) + .collect(Collectors.toList()); + + // 3. DB에서 캐시 미스 데이터 조회 + if (!missedMmsiList.isEmpty()) { + log.debug("캐시 미스 DB 조회 - {} 건", missedMmsiList.size()); + List fromDb = aisTargetRepository.findLatestByMmsiIn(missedMmsiList); + + for (AisTargetEntity entity : fromDb) { + // 캐시 업데이트 + cacheManager.put(entity); + result.add(AisTargetResponseDto.from(entity, SOURCE_DB)); + } + } + + log.debug("조회 완료 - 캐시: {}, DB: {}, 총: {}", + cachedData.size(), result.size() - cachedData.size(), result.size()); + + return result; + } + + // ==================== 검색 조회 (캐시 기반) ==================== + + /** + * 시간 범위 + 옵션 공간 범위로 선박 검색 (캐시 우선) + * + * 전략: + * 1. 캐시에서 시간 범위 내 데이터 조회 + * 2. 공간 필터 있으면 JTS로 필터링 + * 3. 캐시 데이터가 없으면 DB Fallback + */ + public List search(AisTargetSearchRequest request) { + log.debug("선박 검색 - minutes: {}, hasArea: {}", + request.getMinutes(), request.hasAreaFilter()); + + long startTime = System.currentTimeMillis(); + + // 1. 캐시에서 시간 범위 내 데이터 조회 + List entities = cacheManager.getByTimeRange(request.getMinutes()); + String source = SOURCE_CACHE; + + // 캐시가 비어있으면 DB Fallback + if (entities.isEmpty()) { + log.debug("캐시 비어있음, DB Fallback"); + entities = searchFromDb(request); + source = SOURCE_DB; + + // DB 결과를 캐시에 저장 + for (AisTargetEntity entity : entities) { + cacheManager.put(entity); + } + } else if (request.hasAreaFilter()) { + // 2. 공간 필터링 (JTS 기반, 병렬 처리) + entities = spatialFilterUtil.filterByCircle( + entities, + request.getCenterLon(), + request.getCenterLat(), + request.getRadiusMeters() + ); + } + + long elapsed = System.currentTimeMillis() - startTime; + log.info("선박 검색 완료 - 소스: {}, 결과: {} 건, 소요: {}ms", + source, entities.size(), elapsed); + + final String finalSource = source; + return entities.stream() + .map(e -> AisTargetResponseDto.from(e, finalSource)) + .collect(Collectors.toList()); + } + + /** + * DB에서 검색 (Fallback) + */ + private List searchFromDb(AisTargetSearchRequest request) { + OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC); + OffsetDateTime start = now.minusMinutes(request.getMinutes()); + + if (request.hasAreaFilter()) { + return aisTargetRepository.findByTimeRangeAndArea( + start, now, + request.getCenterLon(), + request.getCenterLat(), + request.getRadiusMeters() + ); + } else { + // 공간 필터 없으면 전체 조회 (주의: 대량 데이터) + return aisTargetRepository.findByTimeRangeAndArea( + start, now, + 0.0, 0.0, Double.MAX_VALUE + ); + } + } + + // ==================== 조건 필터 검색 ==================== + + /** + * 항해 조건 필터 검색 (캐시 우선) + * + * 필터 조건: + * - SOG (속도): 이상/초과/이하/미만/범위 + * - COG (침로): 이상/초과/이하/미만/범위 + * - Heading (선수방위): 이상/초과/이하/미만/범위 + * - Destination (목적지): 부분 일치 + * - Status (항행상태): 다중 선택 + * + * @param request 필터 조건 + * @return 조건에 맞는 선박 목록 + */ + public List searchByFilter(AisTargetFilterRequest request) { + log.debug("필터 검색 - minutes: {}, hasFilter: {}", + request.getMinutes(), request.hasAnyFilter()); + + long startTime = System.currentTimeMillis(); + + // 1. 캐시에서 시간 범위 내 데이터 조회 + List entities = cacheManager.getByTimeRange(request.getMinutes()); + String source = SOURCE_CACHE; + + // 캐시가 비어있으면 DB Fallback + if (entities.isEmpty()) { + log.debug("캐시 비어있음, DB Fallback"); + entities = searchByFilterFromDb(request); + source = SOURCE_DB; + + // DB 결과를 캐시에 저장 + for (AisTargetEntity entity : entities) { + cacheManager.put(entity); + } + + // DB에서 가져온 후에도 필터 적용 (DB 쿼리는 시간 범위만 적용) + entities = filterUtil.filter(entities, request); + } else { + // 2. 캐시 데이터에 필터 적용 + entities = filterUtil.filter(entities, request); + } + + long elapsed = System.currentTimeMillis() - startTime; + log.info("필터 검색 완료 - 소스: {}, 결과: {} 건, 소요: {}ms", + source, entities.size(), elapsed); + + final String finalSource = source; + return entities.stream() + .map(e -> AisTargetResponseDto.from(e, finalSource)) + .collect(Collectors.toList()); + } + + /** + * DB에서 필터 검색 (Fallback) - 시간 범위만 적용 + */ + private List searchByFilterFromDb(AisTargetFilterRequest request) { + OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC); + OffsetDateTime start = now.minusMinutes(request.getMinutes()); + + // DB에서는 시간 범위만 조회하고, 나머지 필터는 메모리에서 적용 + return aisTargetRepository.findByTimeRangeAndArea( + start, now, + 0.0, 0.0, Double.MAX_VALUE + ); + } + + // ==================== 폴리곤 검색 ==================== + + /** + * 폴리곤 범위 내 선박 검색 (캐시 기반) + * + * @param minutes 시간 범위 (분) + * @param polygonCoordinates 폴리곤 좌표 [[lon, lat], ...] + * @return 범위 내 선박 목록 + */ + public List searchByPolygon(int minutes, double[][] polygonCoordinates) { + log.debug("폴리곤 검색 - minutes: {}, points: {}", minutes, polygonCoordinates.length); + + long startTime = System.currentTimeMillis(); + + // 1. 캐시에서 시간 범위 내 데이터 조회 + List entities = cacheManager.getByTimeRange(minutes); + + // 2. 폴리곤 필터링 (JTS 기반) + entities = spatialFilterUtil.filterByPolygon(entities, polygonCoordinates); + + long elapsed = System.currentTimeMillis() - startTime; + log.info("폴리곤 검색 완료 - 결과: {} 건, 소요: {}ms", entities.size(), elapsed); + + return entities.stream() + .map(e -> AisTargetResponseDto.from(e, SOURCE_CACHE)) + .collect(Collectors.toList()); + } + + /** + * WKT 형식 폴리곤으로 검색 + * + * @param minutes 시간 범위 (분) + * @param wkt WKT 문자열 (예: "POLYGON((129 35, 130 35, 130 36, 129 36, 129 35))") + * @return 범위 내 선박 목록 + */ + public List searchByWkt(int minutes, String wkt) { + log.debug("WKT 검색 - minutes: {}, wkt: {}", minutes, wkt); + + long startTime = System.currentTimeMillis(); + + // 1. 캐시에서 시간 범위 내 데이터 조회 + List entities = cacheManager.getByTimeRange(minutes); + + // 2. WKT 필터링 (JTS 기반) + entities = spatialFilterUtil.filterByWkt(entities, wkt); + + long elapsed = System.currentTimeMillis() - startTime; + log.info("WKT 검색 완료 - 결과: {} 건, 소요: {}ms", entities.size(), elapsed); + + return entities.stream() + .map(e -> AisTargetResponseDto.from(e, SOURCE_CACHE)) + .collect(Collectors.toList()); + } + + // ==================== 거리 포함 검색 ==================== + + /** + * 원형 범위 검색 + 거리 정보 포함 + */ + public List searchWithDistance( + int minutes, double centerLon, double centerLat, double radiusMeters) { + + log.debug("거리 포함 검색 - minutes: {}, center: ({}, {}), radius: {}", + minutes, centerLon, centerLat, radiusMeters); + + // 1. 캐시에서 시간 범위 내 데이터 조회 + List entities = cacheManager.getByTimeRange(minutes); + + // 2. 거리 포함 필터링 + List filtered = + spatialFilterUtil.filterByCircleWithDistance(entities, centerLon, centerLat, radiusMeters); + + return filtered.stream() + .map(ewd -> new AisTargetWithDistanceDto( + AisTargetResponseDto.from(ewd.getEntity(), SOURCE_CACHE), + ewd.getDistanceMeters() + )) + .collect(Collectors.toList()); + } + + // ==================== 항적 조회 ==================== + + /** + * 특정 MMSI의 시간 범위 내 항적 조회 + */ + public List getTrack(Long mmsi, Integer minutes) { + log.debug("항적 조회 - MMSI: {}, 범위: {}분", mmsi, minutes); + + OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC); + OffsetDateTime start = now.minusMinutes(minutes); + + List track = aisTargetRepository.findByMmsiAndTimeRange(mmsi, start, now); + + log.debug("항적 조회 완료 - MMSI: {}, 포인트: {} 개", mmsi, track.size()); + + return track.stream() + .map(e -> AisTargetResponseDto.from(e, SOURCE_DB)) + .collect(Collectors.toList()); + } + + // ==================== 캐시 관리 ==================== + + /** + * 캐시 통계 조회 + */ + public Map getCacheStats() { + return cacheManager.getStats(); + } + + /** + * 캐시 초기화 + */ + public void clearCache() { + cacheManager.clear(); + } + + // ==================== 내부 DTO ==================== + + /** + * 거리 정보 포함 응답 DTO + */ + @lombok.Data + @lombok.AllArgsConstructor + public static class AisTargetWithDistanceDto { + private AisTargetResponseDto target; + private double distanceMeters; + } +} diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataReader.java b/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataReader.java index be6108f..efe4e56 100644 --- a/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataReader.java +++ b/src/main/java/com/snp/batch/jobs/risk/batch/reader/RiskDataReader.java @@ -35,6 +35,12 @@ public class RiskDataReader extends BaseApiReader { return "riskDataReader"; } + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allImoNumbers = null; + } + @Override protected String getApiPath() { return "/RiskAndCompliance/RisksByImos"; diff --git a/src/main/java/com/snp/batch/jobs/sanction/batch/reader/ComplianceDataReader.java b/src/main/java/com/snp/batch/jobs/sanction/batch/reader/ComplianceDataReader.java index ac9efb5..1ba57a9 100644 --- a/src/main/java/com/snp/batch/jobs/sanction/batch/reader/ComplianceDataReader.java +++ b/src/main/java/com/snp/batch/jobs/sanction/batch/reader/ComplianceDataReader.java @@ -33,7 +33,13 @@ public class ComplianceDataReader extends BaseApiReader { @Override protected String getReaderName() { - return "ShipLastPositionDataReader"; + return "ComplianceDataReader"; + } + + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allImoNumbers = null; } @Override diff --git a/src/main/java/com/snp/batch/jobs/shipMovement/batch/reader/ShipMovementReader.java b/src/main/java/com/snp/batch/jobs/shipMovement/batch/reader/ShipMovementReader.java index c029d5d..f251206 100644 --- a/src/main/java/com/snp/batch/jobs/shipMovement/batch/reader/ShipMovementReader.java +++ b/src/main/java/com/snp/batch/jobs/shipMovement/batch/reader/ShipMovementReader.java @@ -73,6 +73,13 @@ public class ShipMovementReader extends BaseApiReader { return "ShipMovementReader"; } + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allImoNumbers = null; + this.dbMasterHashes = null; + } + @Override protected String getApiPath() { return "/Movements"; diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailDataReader.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailDataReader.java index 1176131..ee78b5d 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailDataReader.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailDataReader.java @@ -59,6 +59,13 @@ public class ShipDetailDataReader extends BaseApiReader return "ShipLastPositionDataReader"; } + @Override + protected void resetCustomState() { + this.currentBatchIndex = 0; + this.allImoNumbers = null; + } + @Override protected String getApiPath() { return "/AisSvc.svc/AIS/GetTargetsByIMOsEnhanced"; diff --git a/src/main/java/com/snp/batch/jobs/shipimport/batch/processor/ShipDataProcessor.java b/src/main/java/com/snp/batch/jobs/shipimport/batch/processor/ShipDataProcessor.java index c5af094..ff5d98a 100644 --- a/src/main/java/com/snp/batch/jobs/shipimport/batch/processor/ShipDataProcessor.java +++ b/src/main/java/com/snp/batch/jobs/shipimport/batch/processor/ShipDataProcessor.java @@ -29,7 +29,7 @@ public class ShipDataProcessor extends BaseProcessor { return null; // 스킵 } - log.debug("선박 데이터 처리 중: IMO {}", item.getImoNumber()); +// log.debug("선박 데이터 처리 중: IMO {}", item.getImoNumber()); // 중복 체크 및 업데이트 return shipRepository.findByImoNumber(item.getImoNumber()) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 05a292d..e57e57e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -17,11 +17,11 @@ spring: jpa: hibernate: ddl-auto: update - show-sql: true + show-sql: false properties: hibernate: dialect: org.hibernate.dialect.PostgreSQLDialect - format_sql: true + format_sql: false default_schema: public # Batch Configuration @@ -55,9 +55,9 @@ spring: # Server Configuration server: - port: 8041 + port: 8081 servlet: - context-path: / + context-path: /snp-api # Actuator Configuration management: @@ -69,18 +69,9 @@ management: health: show-details: always -# Logging Configuration +# Logging Configuration (logback-spring.xml에서 상세 설정) logging: - level: - root: INFO - com.snp.batch: DEBUG - org.springframework.batch: DEBUG - org.springframework.jdbc: DEBUG - pattern: - console: "%d{yyyy-MM-dd HH:mm:ss} - %msg%n" - file: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n" - file: - name: logs/snp-batch.log + config: classpath:logback-spring.xml # Custom Application Properties app: @@ -100,3 +91,15 @@ app: schedule: enabled: true cron: "0 0 * * * ?" # Every hour + # AIS Target 배치 설정 + ais-target: + since-seconds: 60 # API 조회 범위 (초) + chunk-size: 5000 # 배치 청크 크기 + schedule: + cron: "15 * * * * ?" # 매 분 15초 실행 + partition: + months-ahead: 2 # 미리 생성할 파티션 개월 수 + # AIS Target 캐시 설정 + ais-target-cache: + ttl-minutes: 120 # 캐시 TTL (분) - 2시간 + max-size: 300000 # 최대 캐시 크기 - 30만 건 diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml new file mode 100644 index 0000000..e3e9083 --- /dev/null +++ b/src/main/resources/logback-spring.xml @@ -0,0 +1,234 @@ + + + + + + + + + + + + + + + + + ${LOG_PATTERN_SIMPLE} + UTF-8 + + + + + + ${LOG_PATH}/application.log + + ${LOG_PATTERN} + UTF-8 + + + ${LOG_PATH}/archive/application.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + ${TOTAL_SIZE_CAP} + + + + + + ${LOG_PATH}/batch.log + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%X{jobName}] %logger{30} - %msg%n + UTF-8 + + + ${LOG_PATH}/archive/batch.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + ${TOTAL_SIZE_CAP} + + + + + + ${LOG_PATH}/api-access.log + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %msg%n + UTF-8 + + + ${LOG_PATH}/archive/api-access.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + ${TOTAL_SIZE_CAP} + + + + + + ${LOG_PATH}/metrics.log + + %d{yyyy-MM-dd HH:mm:ss.SSS} %msg%n + UTF-8 + + + ${LOG_PATH}/archive/metrics.%d{yyyy-MM-dd}.%i.log.gz + 50MB + 7 + 1GB + + + + + + ${LOG_PATH}/error.log + + WARN + + + ${LOG_PATTERN} + UTF-8 + + + ${LOG_PATH}/archive/error.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + 60 + 5GB + + + + + + 512 + 0 + + + + + 256 + 0 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/main/resources/sql/ais_target_ddl.sql b/src/main/resources/sql/ais_target_ddl.sql new file mode 100644 index 0000000..8e34732 --- /dev/null +++ b/src/main/resources/sql/ais_target_ddl.sql @@ -0,0 +1,449 @@ +-- ============================================ +-- AIS Target 파티션 테이블 DDL +-- ============================================ +-- 용도: 선박 AIS 위치 정보 저장 (항적 분석용) +-- 수집 주기: 매 분 15초 +-- 예상 데이터량: 약 33,000건/분 +-- 파티셔닝: 월별 파티션 (ais_target_YYYY_MM) +-- ============================================ + +-- PostGIS 확장 활성화 (이미 설치되어 있다면 생략) +CREATE EXTENSION IF NOT EXISTS postgis; + +-- ============================================ +-- 1. 부모 테이블 생성 (파티션 테이블) +-- ============================================ +CREATE TABLE IF NOT EXISTS snp_data.ais_target ( + -- ========== PK (복합키) ========== + mmsi BIGINT NOT NULL, + message_timestamp TIMESTAMP WITH TIME ZONE NOT NULL, + + -- ========== 선박 식별 정보 ========== + imo BIGINT, + name VARCHAR(100), + callsign VARCHAR(20), + vessel_type VARCHAR(50), + extra_info VARCHAR(100), + + -- ========== 위치 정보 ========== + lat DOUBLE PRECISION NOT NULL, + lon DOUBLE PRECISION NOT NULL, + geom GEOMETRY(Point, 4326), + + -- ========== 항해 정보 ========== + heading DOUBLE PRECISION, + sog DOUBLE PRECISION, -- Speed over Ground (knots) + cog DOUBLE PRECISION, -- Course over Ground (degrees) + rot INTEGER, -- Rate of Turn (degrees/min) + + -- ========== 선박 제원 ========== + length INTEGER, + width INTEGER, + draught DOUBLE PRECISION, + length_bow INTEGER, + length_stern INTEGER, + width_port INTEGER, + width_starboard INTEGER, + + -- ========== 목적지 정보 ========== + destination VARCHAR(200), + eta TIMESTAMP WITH TIME ZONE, + status VARCHAR(50), + + -- ========== AIS 메시지 정보 ========== + age_minutes DOUBLE PRECISION, + position_accuracy INTEGER, + timestamp_utc INTEGER, + repeat_indicator INTEGER, + raim_flag INTEGER, + radio_status INTEGER, + regional INTEGER, + regional2 INTEGER, + spare INTEGER, + spare2 INTEGER, + ais_version INTEGER, + position_fix_type INTEGER, + dte INTEGER, + band_flag INTEGER, + + -- ========== 타임스탬프 ========== + received_date TIMESTAMP WITH TIME ZONE, + collected_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + + -- ========== 제약조건 ========== + CONSTRAINT pk_ais_target PRIMARY KEY (mmsi, message_timestamp) +) PARTITION BY RANGE (message_timestamp); + +-- ============================================ +-- 2. 초기 파티션 생성 (현재 월 + 다음 월) +-- ============================================ +-- 예: 2025년 12월과 2026년 1월 파티션 +-- 실제 운영 시 create_ais_target_partition 함수로 자동 생성 + +-- 2025년 12월 파티션 +CREATE TABLE IF NOT EXISTS snp_data.ais_target_2025_12 PARTITION OF snp_data.ais_target + FOR VALUES FROM ('2025-12-01 00:00:00+00') TO ('2026-01-01 00:00:00+00'); + +-- 2026년 1월 파티션 +CREATE TABLE IF NOT EXISTS snp_data.ais_target_2026_01 PARTITION OF snp_data.ais_target + FOR VALUES FROM ('2026-01-01 00:00:00+00') TO ('2026-02-01 00:00:00+00'); + +-- ============================================ +-- 3. 인덱스 생성 (각 파티션에 자동 상속) +-- ============================================ + +-- 1. MMSI 인덱스 (특정 선박 조회) +CREATE INDEX IF NOT EXISTS idx_ais_target_mmsi + ON snp_data.ais_target (mmsi); + +-- 2. IMO 인덱스 (IMO가 있는 선박만) +CREATE INDEX IF NOT EXISTS idx_ais_target_imo + ON snp_data.ais_target (imo) + WHERE imo IS NOT NULL AND imo > 0; + +-- 3. 메시지 타임스탬프 인덱스 (시간 범위 조회) +CREATE INDEX IF NOT EXISTS idx_ais_target_message_timestamp + ON snp_data.ais_target (message_timestamp DESC); + +-- 4. MMSI + 타임스탬프 복합 인덱스 (항적 조회 최적화) +CREATE INDEX IF NOT EXISTS idx_ais_target_mmsi_timestamp + ON snp_data.ais_target (mmsi, message_timestamp DESC); + +-- 5. 공간 인덱스 (GIST) - 공간 쿼리 최적화 +CREATE INDEX IF NOT EXISTS idx_ais_target_geom + ON snp_data.ais_target USING GIST (geom); + +-- 6. 수집 시점 인덱스 (배치 모니터링용) +CREATE INDEX IF NOT EXISTS idx_ais_target_collected_at + ON snp_data.ais_target (collected_at DESC); + +-- ============================================ +-- 4. 파티션 자동 생성 함수 +-- ============================================ + +-- 파티션 존재 여부 확인 함수 +CREATE OR REPLACE FUNCTION snp_data.partition_exists(partition_name TEXT) +RETURNS BOOLEAN AS $$ +BEGIN + RETURN EXISTS ( + SELECT 1 FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'snp_data' + AND c.relname = partition_name + AND c.relkind = 'r' + ); +END; +$$ LANGUAGE plpgsql; + +-- 특정 월의 파티션 생성 함수 +CREATE OR REPLACE FUNCTION snp_data.create_ais_target_partition(target_date DATE) +RETURNS TEXT AS $$ +DECLARE + partition_name TEXT; + start_date DATE; + end_date DATE; + create_sql TEXT; +BEGIN + -- 파티션 이름 생성: ais_target_YYYY_MM + partition_name := 'ais_target_' || TO_CHAR(target_date, 'YYYY_MM'); + + -- 시작/종료 날짜 계산 + start_date := DATE_TRUNC('month', target_date)::DATE; + end_date := (DATE_TRUNC('month', target_date) + INTERVAL '1 month')::DATE; + + -- 이미 존재하면 스킵 + IF snp_data.partition_exists(partition_name) THEN + RAISE NOTICE 'Partition % already exists, skipping', partition_name; + RETURN partition_name || ' (already exists)'; + END IF; + + -- 파티션 생성 SQL + create_sql := format( + 'CREATE TABLE snp_data.%I PARTITION OF snp_data.ais_target FOR VALUES FROM (%L) TO (%L)', + partition_name, + start_date::TIMESTAMP WITH TIME ZONE, + end_date::TIMESTAMP WITH TIME ZONE + ); + + EXECUTE create_sql; + + RAISE NOTICE 'Created partition: % (% to %)', partition_name, start_date, end_date; + + RETURN partition_name; +END; +$$ LANGUAGE plpgsql; + +-- 다음 N개월 파티션 사전 생성 함수 +CREATE OR REPLACE FUNCTION snp_data.create_future_ais_target_partitions(months_ahead INTEGER DEFAULT 2) +RETURNS TABLE (partition_name TEXT, status TEXT) AS $$ +DECLARE + i INTEGER; + target_date DATE; + result TEXT; +BEGIN + FOR i IN 0..months_ahead LOOP + target_date := DATE_TRUNC('month', CURRENT_DATE + (i || ' months')::INTERVAL)::DATE; + result := snp_data.create_ais_target_partition(target_date); + partition_name := 'ais_target_' || TO_CHAR(target_date, 'YYYY_MM'); + status := result; + RETURN NEXT; + END LOOP; +END; +$$ LANGUAGE plpgsql; + +-- ============================================ +-- 5. 오래된 파티션 삭제 함수 +-- ============================================ + +-- 특정 월의 파티션 삭제 함수 +CREATE OR REPLACE FUNCTION snp_data.drop_ais_target_partition(target_date DATE) +RETURNS TEXT AS $$ +DECLARE + partition_name TEXT; + drop_sql TEXT; +BEGIN + partition_name := 'ais_target_' || TO_CHAR(target_date, 'YYYY_MM'); + + -- 존재하지 않으면 스킵 + IF NOT snp_data.partition_exists(partition_name) THEN + RAISE NOTICE 'Partition % does not exist, skipping', partition_name; + RETURN partition_name || ' (not found)'; + END IF; + + drop_sql := format('DROP TABLE snp_data.%I', partition_name); + EXECUTE drop_sql; + + RAISE NOTICE 'Dropped partition: %', partition_name; + + RETURN partition_name || ' (dropped)'; +END; +$$ LANGUAGE plpgsql; + +-- N개월 이전 파티션 정리 함수 +CREATE OR REPLACE FUNCTION snp_data.cleanup_old_ais_target_partitions(retention_months INTEGER DEFAULT 3) +RETURNS TABLE (partition_name TEXT, status TEXT) AS $$ +DECLARE + rec RECORD; + partition_date DATE; + cutoff_date DATE; +BEGIN + cutoff_date := DATE_TRUNC('month', CURRENT_DATE - (retention_months || ' months')::INTERVAL)::DATE; + + -- ais_target_YYYY_MM 패턴의 파티션 조회 + FOR rec IN + SELECT c.relname + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + JOIN pg_inherits i ON i.inhrelid = c.oid + WHERE n.nspname = 'snp_data' + AND c.relname LIKE 'ais_target_%' + AND c.relkind = 'r' + ORDER BY c.relname + LOOP + -- 파티션 이름에서 날짜 추출 (ais_target_YYYY_MM) + BEGIN + partition_date := TO_DATE(SUBSTRING(rec.relname FROM 'ais_target_(\d{4}_\d{2})'), 'YYYY_MM'); + + IF partition_date < cutoff_date THEN + EXECUTE format('DROP TABLE snp_data.%I', rec.relname); + partition_name := rec.relname; + status := 'dropped'; + RETURN NEXT; + RAISE NOTICE 'Dropped old partition: %', rec.relname; + END IF; + EXCEPTION WHEN OTHERS THEN + -- 날짜 파싱 실패 시 스킵 + CONTINUE; + END; + END LOOP; +END; +$$ LANGUAGE plpgsql; + +-- 파티션별 통계 조회 함수 +CREATE OR REPLACE FUNCTION snp_data.ais_target_partition_stats() +RETURNS TABLE ( + partition_name TEXT, + row_count BIGINT, + size_bytes BIGINT, + size_pretty TEXT +) AS $$ +BEGIN + RETURN QUERY + SELECT + c.relname::TEXT as partition_name, + (SELECT COUNT(*)::BIGINT FROM snp_data.ais_target WHERE message_timestamp >= + TO_DATE(SUBSTRING(c.relname FROM 'ais_target_(\d{4}_\d{2})'), 'YYYY_MM') + AND message_timestamp < + TO_DATE(SUBSTRING(c.relname FROM 'ais_target_(\d{4}_\d{2})'), 'YYYY_MM') + INTERVAL '1 month' + ) as row_count, + pg_relation_size(c.oid) as size_bytes, + pg_size_pretty(pg_relation_size(c.oid)) as size_pretty + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + JOIN pg_inherits i ON i.inhrelid = c.oid + WHERE n.nspname = 'snp_data' + AND c.relname LIKE 'ais_target_%' + AND c.relkind = 'r' + ORDER BY c.relname; +END; +$$ LANGUAGE plpgsql; + +-- ============================================ +-- 6. 코멘트 +-- ============================================ + +COMMENT ON TABLE snp_data.ais_target IS 'AIS 선박 위치 정보 (매 분 15초 수집, 월별 파티션)'; + +COMMENT ON COLUMN snp_data.ais_target.mmsi IS 'Maritime Mobile Service Identity (복합 PK)'; +COMMENT ON COLUMN snp_data.ais_target.message_timestamp IS 'AIS 메시지 발생 시간 (복합 PK, 파티션 키)'; +COMMENT ON COLUMN snp_data.ais_target.imo IS 'IMO 선박 번호'; +COMMENT ON COLUMN snp_data.ais_target.geom IS 'PostGIS Point geometry (SRID 4326 - WGS84)'; +COMMENT ON COLUMN snp_data.ais_target.sog IS 'Speed over Ground (대지속력, knots)'; +COMMENT ON COLUMN snp_data.ais_target.cog IS 'Course over Ground (대지침로, degrees)'; +COMMENT ON COLUMN snp_data.ais_target.rot IS 'Rate of Turn (선회율, degrees/min)'; +COMMENT ON COLUMN snp_data.ais_target.heading IS '선수 방향 (degrees)'; +COMMENT ON COLUMN snp_data.ais_target.draught IS '흘수 (meters)'; +COMMENT ON COLUMN snp_data.ais_target.collected_at IS '배치 수집 시점'; +COMMENT ON COLUMN snp_data.ais_target.received_date IS 'API 수신 시간'; + +-- ============================================ +-- 유지보수용 함수: 오래된 데이터 정리 +-- ============================================ + +-- 오래된 데이터 삭제 함수 (기본: 7일 이전) +CREATE OR REPLACE FUNCTION snp_data.cleanup_ais_target(retention_days INTEGER DEFAULT 7) +RETURNS INTEGER AS $$ +DECLARE + deleted_count INTEGER; +BEGIN + DELETE FROM snp_data.ais_target + WHERE message_timestamp < NOW() - (retention_days || ' days')::INTERVAL; + + GET DIAGNOSTICS deleted_count = ROW_COUNT; + + RAISE NOTICE 'Deleted % rows older than % days', deleted_count, retention_days; + + RETURN deleted_count; +END; +$$ LANGUAGE plpgsql; + +COMMENT ON FUNCTION snp_data.create_ais_target_partition IS '특정 월의 AIS Target 파티션 생성'; +COMMENT ON FUNCTION snp_data.create_future_ais_target_partitions IS '향후 N개월 파티션 사전 생성'; +COMMENT ON FUNCTION snp_data.drop_ais_target_partition IS '특정 월의 파티션 삭제'; +COMMENT ON FUNCTION snp_data.cleanup_old_ais_target_partitions IS 'N개월 이전 파티션 정리'; +COMMENT ON FUNCTION snp_data.ais_target_partition_stats IS '파티션별 통계 조회'; + +-- ============================================ +-- 7. 유지보수용 함수: 통계 조회 +-- ============================================ + +CREATE OR REPLACE FUNCTION snp_data.ais_target_stats() +RETURNS TABLE ( + total_count BIGINT, + unique_mmsi_count BIGINT, + unique_imo_count BIGINT, + oldest_record TIMESTAMP WITH TIME ZONE, + newest_record TIMESTAMP WITH TIME ZONE, + last_hour_count BIGINT +) AS $$ +BEGIN + RETURN QUERY + SELECT + COUNT(*)::BIGINT as total_count, + COUNT(DISTINCT mmsi)::BIGINT as unique_mmsi_count, + COUNT(DISTINCT imo) FILTER (WHERE imo IS NOT NULL AND imo > 0)::BIGINT as unique_imo_count, + MIN(message_timestamp) as oldest_record, + MAX(message_timestamp) as newest_record, + COUNT(*) FILTER (WHERE message_timestamp > NOW() - INTERVAL '1 hour')::BIGINT as last_hour_count + FROM snp_data.ais_target; +END; +$$ LANGUAGE plpgsql; + +COMMENT ON FUNCTION snp_data.ais_target_stats IS 'AIS Target 테이블 통계 조회'; + +-- ============================================ +-- 예시 쿼리 +-- ============================================ + +-- 1. 특정 MMSI의 최신 위치 조회 +-- SELECT * FROM snp_data.ais_target WHERE mmsi = 123456789 ORDER BY message_timestamp DESC LIMIT 1; + +-- 2. 특정 시간 범위의 항적 조회 +-- SELECT * FROM snp_data.ais_target +-- WHERE mmsi = 123456789 +-- AND message_timestamp BETWEEN '2025-12-01 00:00:00+00' AND '2025-12-01 01:00:00+00' +-- ORDER BY message_timestamp; + +-- 3. 특정 구역(원형) 내 선박 조회 +-- SELECT DISTINCT ON (mmsi) * +-- FROM snp_data.ais_target +-- WHERE message_timestamp > NOW() - INTERVAL '1 hour' +-- AND ST_DWithin( +-- geom::geography, +-- ST_SetSRID(ST_MakePoint(129.0, 35.0), 4326)::geography, +-- 50000 -- 50km 반경 +-- ) +-- ORDER BY mmsi, message_timestamp DESC; + +-- 4. LineString 항적 생성 +-- SELECT mmsi, ST_MakeLine(geom ORDER BY message_timestamp) as track +-- FROM snp_data.ais_target +-- WHERE mmsi = 123456789 +-- AND message_timestamp BETWEEN '2025-12-01 00:00:00+00' AND '2025-12-01 01:00:00+00' +-- GROUP BY mmsi; + +-- 5. 다음 3개월 파티션 미리 생성 +-- SELECT * FROM snp_data.create_future_ais_target_partitions(3); + +-- 6. 특정 월 파티션 생성 +-- SELECT snp_data.create_ais_target_partition('2026-03-01'); + +-- 7. 3개월 이전 파티션 정리 +-- SELECT * FROM snp_data.cleanup_old_ais_target_partitions(3); + +-- 8. 파티션별 통계 조회 +-- SELECT * FROM snp_data.ais_target_partition_stats(); + +-- 9. 전체 통계 조회 +-- SELECT * FROM snp_data.ais_target_stats(); + +-- ============================================ +-- Job Schedule 등록 +-- ============================================ + +-- 1. aisTargetImportJob: 매 분 15초에 실행 +INSERT INTO public.job_schedule (job_name, cron_expression, description, active, created_at, updated_at, created_by, updated_by) +VALUES ( + 'aisTargetImportJob', + '15 * * * * ?', + 'AIS Target 위치 정보 수집 (S&P Global API) - 매 분 15초 실행', + true, + NOW(), + NOW(), + 'SYSTEM', + 'SYSTEM' +) ON CONFLICT (job_name) DO UPDATE SET + cron_expression = EXCLUDED.cron_expression, + description = EXCLUDED.description, + active = EXCLUDED.active, + updated_at = NOW(); + +-- 2. partitionManagerJob: 매일 00:10에 실행 +-- Daily 파티션: 매일 생성, Monthly 파티션: 말일에만 생성 (Job 내부에서 분기) +INSERT INTO public.job_schedule (job_name, cron_expression, description, active, created_at, updated_at, created_by, updated_by) +VALUES ( + 'partitionManagerJob', + '0 10 0 * * ?', + '파티션 관리 - 매일 00:10 실행 (Daily: 매일, Monthly: 말일만)', + true, + NOW(), + NOW(), + 'SYSTEM', + 'SYSTEM' +) ON CONFLICT (job_name) DO UPDATE SET + cron_expression = EXCLUDED.cron_expression, + description = EXCLUDED.description, + active = EXCLUDED.active, + updated_at = NOW();