Merge branch 'ais/ship_position' into dev_ship_detail

# Conflicts:
#	src/main/java/com/snp/batch/jobs/sanction/batch/reader/ComplianceDataReader.java
#	src/main/resources/application.yml
This commit is contained in:
hyojin kim 2025-12-02 19:10:15 +09:00
커밋 44ae82e2fa
36개의 변경된 파일4219개의 추가작업 그리고 37개의 파일을 삭제

14
pom.xml
파일 보기

@ -111,6 +111,20 @@
<version>2.3.0</version> <version>2.3.0</version>
</dependency> </dependency>
<!-- Caffeine Cache -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.1.8</version>
</dependency>
<!-- JTS (Java Topology Suite) - 공간 연산 라이브러리 -->
<dependency>
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
<version>1.19.0</version>
</dependency>
<!-- Test Dependencies --> <!-- Test Dependencies -->
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

파일 보기

@ -0,0 +1,149 @@
package com.snp.batch.api.logging;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;
import org.springframework.web.util.ContentCachingRequestWrapper;
import org.springframework.web.util.ContentCachingResponseWrapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
/**
* API 요청/응답 로깅 필터
*
* 로그 파일: logs/api-access.log
* 기록 내용: 요청 IP, HTTP Method, URI, 파라미터, 응답 상태, 처리 시간
*/
@Slf4j
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class ApiAccessLoggingFilter extends OncePerRequestFilter {
private static final int MAX_PAYLOAD_LENGTH = 1000;
@Override
protected void doFilterInternal(HttpServletRequest request,
HttpServletResponse response,
FilterChain filterChain) throws ServletException, IOException {
// 정적 리소스 actuator 제외
String uri = request.getRequestURI();
if (shouldSkip(uri)) {
filterChain.doFilter(request, response);
return;
}
// 요청 래핑 (body 읽기용)
ContentCachingRequestWrapper requestWrapper = new ContentCachingRequestWrapper(request);
ContentCachingResponseWrapper responseWrapper = new ContentCachingResponseWrapper(response);
String requestId = UUID.randomUUID().toString().substring(0, 8);
long startTime = System.currentTimeMillis();
try {
filterChain.doFilter(requestWrapper, responseWrapper);
} finally {
long duration = System.currentTimeMillis() - startTime;
logRequest(requestId, requestWrapper, responseWrapper, duration);
responseWrapper.copyBodyToResponse();
}
}
private boolean shouldSkip(String uri) {
return uri.startsWith("/actuator")
|| uri.startsWith("/css")
|| uri.startsWith("/js")
|| uri.startsWith("/images")
|| uri.startsWith("/favicon")
|| uri.endsWith(".html")
|| uri.endsWith(".css")
|| uri.endsWith(".js")
|| uri.endsWith(".ico");
}
private void logRequest(String requestId,
ContentCachingRequestWrapper request,
ContentCachingResponseWrapper response,
long duration) {
String clientIp = getClientIp(request);
String method = request.getMethod();
String uri = request.getRequestURI();
String queryString = request.getQueryString();
int status = response.getStatus();
StringBuilder logMessage = new StringBuilder();
logMessage.append(String.format("[%s] %s %s %s",
requestId, clientIp, method, uri));
// Query String
if (queryString != null && !queryString.isEmpty()) {
logMessage.append("?").append(truncate(queryString, 200));
}
// Request Body (POST/PUT/PATCH)
if (isBodyRequest(method)) {
String body = getRequestBody(request);
if (!body.isEmpty()) {
logMessage.append(" | body=").append(truncate(body, MAX_PAYLOAD_LENGTH));
}
}
// Response
logMessage.append(String.format(" | status=%d | %dms", status, duration));
// 상태에 따른 로그 레벨
if (status >= 500) {
log.error(logMessage.toString());
} else if (status >= 400) {
log.warn(logMessage.toString());
} else {
log.info(logMessage.toString());
}
}
private String getClientIp(HttpServletRequest request) {
String ip = request.getHeader("X-Forwarded-For");
if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
ip = request.getHeader("X-Real-IP");
}
if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
ip = request.getRemoteAddr();
}
// 여러 IP가 있는 경우 번째만
if (ip != null && ip.contains(",")) {
ip = ip.split(",")[0].trim();
}
return ip;
}
private boolean isBodyRequest(String method) {
return "POST".equalsIgnoreCase(method)
|| "PUT".equalsIgnoreCase(method)
|| "PATCH".equalsIgnoreCase(method);
}
private String getRequestBody(ContentCachingRequestWrapper request) {
byte[] content = request.getContentAsByteArray();
if (content.length == 0) {
return "";
}
return new String(content, StandardCharsets.UTF_8)
.replaceAll("\\s+", " ")
.trim();
}
private String truncate(String str, int maxLength) {
if (str == null) return "";
if (str.length() <= maxLength) return str;
return str.substring(0, maxLength) + "...";
}
}

파일 보기

@ -55,7 +55,7 @@ public abstract class BaseProcessor<I, O> implements ItemProcessor<I, O> {
return null; return null;
} }
log.debug("데이터 처리 중: {}", item); // log.debug("데이터 처리 중: {}", item);
return processItem(item); return processItem(item);
} }
} }

파일 보기

@ -98,6 +98,9 @@ public abstract class BaseApiReader<T> implements ItemReader<T> {
public void saveApiInfoToContext(StepExecution stepExecution) { public void saveApiInfoToContext(StepExecution stepExecution) {
this.stepExecution = stepExecution; this.stepExecution = stepExecution;
// Reader 상태 초기화 (Job 재실행 필수)
resetReaderState();
// API 정보를 StepExecutionContext에 저장 // API 정보를 StepExecutionContext에 저장
ExecutionContext context = stepExecution.getExecutionContext(); ExecutionContext context = stepExecution.getExecutionContext();
@ -140,6 +143,48 @@ public abstract class BaseApiReader<T> implements ItemReader<T> {
return ""; return "";
} }
/**
* Reader 상태 초기화
* Job 재실행 이전 실행의 상태를 클리어하여 새로 데이터를 읽을 있도록
*/
private void resetReaderState() {
// Chunk 모드 상태 초기화
this.currentBatch = null;
this.initialized = false;
// Legacy 모드 상태 초기화
this.legacyDataList = null;
this.legacyNextIndex = 0;
// 통계 초기화
this.totalApiCalls = 0;
this.completedApiCalls = 0;
// 하위 클래스 상태 초기화 호출
resetCustomState();
log.debug("[{}] Reader 상태 초기화 완료", getReaderName());
}
/**
* 하위 클래스 커스텀 상태 초기화
* Chunk 모드에서 사용하는 currentBatchIndex, allImoNumbers 등의 필드를 초기화할 오버라이드
*
* 예시:
* <pre>
* @Override
* protected void resetCustomState() {
* this.currentBatchIndex = 0;
* this.allImoNumbers = null;
* this.dbMasterHashes = null;
* }
* </pre>
*/
protected void resetCustomState() {
// 기본 구현: 아무것도 하지 않음
// 하위 클래스에서 필요 오버라이드
}
/** /**
* API 호출 통계 업데이트 * API 호출 통계 업데이트
*/ */

파일 보기

@ -64,7 +64,7 @@ public class MaritimeApiWebClientConfig {
.defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword)) .defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword))
.codecs(configurer -> configurer .codecs(configurer -> configurer
.defaultCodecs() .defaultCodecs()
.maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼 .maxInMemorySize(30 * 1024 * 1024)) // 30MB 버퍼
.build(); .build();
} }
@ -80,7 +80,7 @@ public class MaritimeApiWebClientConfig {
.defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword)) .defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword))
.codecs(configurer -> configurer .codecs(configurer -> configurer
.defaultCodecs() .defaultCodecs()
.maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼 .maxInMemorySize(50 * 1024 * 1024)) // 50MB 버퍼 (AIS GetTargets 응답 ~20MB+)
.build(); .build();
} }
@ -96,7 +96,7 @@ public class MaritimeApiWebClientConfig {
.defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword)) .defaultHeaders(headers -> headers.setBasicAuth(maritimeApiUsername, maritimeApiPassword))
.codecs(configurer -> configurer .codecs(configurer -> configurer
.defaultCodecs() .defaultCodecs()
.maxInMemorySize(20 * 1024 * 1024)) // 20MB 버퍼 .maxInMemorySize(30 * 1024 * 1024)) // 30MB 버퍼
.build(); .build();
} }
} }

파일 보기

@ -0,0 +1,50 @@
package com.snp.batch.global.partition;
import lombok.Getter;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 파티션 관리 대상 테이블 설정
*
* Daily 파티션: 매일 실행
* Monthly 파티션: 매월 말일에만 실행
*/
@Getter
@Component
public class PartitionConfig {
/**
* Daily 파티션 대상 테이블 (파티션 네이밍: {table}_YYYY_MM_DD)
*/
private final List<PartitionTableInfo> dailyPartitionTables = List.of(
// 추후 daily 파티션 테이블 추가
);
/**
* Monthly 파티션 대상 테이블 (파티션 네이밍: {table}_YYYY_MM)
*/
private final List<PartitionTableInfo> monthlyPartitionTables = List.of(
new PartitionTableInfo(
"snp_data",
"ais_target",
"message_timestamp",
2 // 미리 생성할 개월
)
);
/**
* 파티션 테이블 정보
*/
public record PartitionTableInfo(
String schema,
String tableName,
String partitionColumn,
int periodsAhead // 미리 생성할 기간 (daily: , monthly: )
) {
public String getFullTableName() {
return schema + "." + tableName;
}
}
}

파일 보기

@ -0,0 +1,68 @@
package com.snp.batch.global.partition;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
/**
* 파티션 관리 Job Config
*
* 스케줄: 매일 00:10 (0 10 0 * * ?)
*
* 동작:
* - Daily 파티션: 매일 실행
* - Monthly 파티션: 매월 말일에만 실행 (Job 내부에서 말일 감지)
*/
@Slf4j
@Configuration
public class PartitionManagerJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final PartitionManagerTasklet partitionManagerTasklet;
public PartitionManagerJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
PartitionManagerTasklet partitionManagerTasklet) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
this.partitionManagerTasklet = partitionManagerTasklet;
}
@Bean(name = "partitionManagerStep")
public Step partitionManagerStep() {
return new StepBuilder("partitionManagerStep", jobRepository)
.tasklet(partitionManagerTasklet, transactionManager)
.build();
}
@Bean(name = "partitionManagerJob")
public Job partitionManagerJob() {
log.info("Job 생성: partitionManagerJob");
return new JobBuilder("partitionManagerJob", jobRepository)
.listener(new JobExecutionListener() {
@Override
public void beforeJob(JobExecution jobExecution) {
log.info("[partitionManagerJob] 파티션 관리 Job 시작");
}
@Override
public void afterJob(JobExecution jobExecution) {
log.info("[partitionManagerJob] 파티션 관리 Job 완료 - 상태: {}",
jobExecution.getStatus());
}
})
.start(partitionManagerStep())
.build();
}
}

파일 보기

@ -0,0 +1,220 @@
package com.snp.batch.global.partition;
import com.snp.batch.global.partition.PartitionConfig.PartitionTableInfo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
/**
* 파티션 관리 Tasklet
*
* 스케줄: 매일 실행
* - Daily 파티션: 매일 생성
* - Monthly 파티션: 매월 말일에만 생성
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class PartitionManagerTasklet implements Tasklet {
private final JdbcTemplate jdbcTemplate;
private final PartitionConfig partitionConfig;
private static final String PARTITION_EXISTS_SQL = """
SELECT EXISTS (
SELECT 1 FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = ?
AND c.relname = ?
AND c.relkind = 'r'
)
""";
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
LocalDate today = LocalDate.now();
boolean isLastDayOfMonth = isLastDayOfMonth(today);
log.info("========================================");
log.info("파티션 관리 Job 시작");
log.info("실행 일자: {}", today);
log.info("월 말일 여부: {}", isLastDayOfMonth);
log.info("========================================");
// Daily 파티션 처리 (매일)
processDailyPartitions(today);
// Monthly 파티션 처리 (매월 말일만)
if (isLastDayOfMonth) {
processMonthlyPartitions(today);
} else {
log.info("Monthly 파티션: 말일이 아니므로 스킵");
}
log.info("========================================");
log.info("파티션 관리 Job 완료");
log.info("========================================");
return RepeatStatus.FINISHED;
}
/**
* 매월 말일 여부 확인
*/
private boolean isLastDayOfMonth(LocalDate date) {
return date.getDayOfMonth() == YearMonth.from(date).lengthOfMonth();
}
/**
* Daily 파티션 처리
*/
private void processDailyPartitions(LocalDate today) {
List<PartitionTableInfo> tables = partitionConfig.getDailyPartitionTables();
if (tables.isEmpty()) {
log.info("Daily 파티션: 대상 테이블 없음");
return;
}
log.info("Daily 파티션 처리 시작: {} 개 테이블", tables.size());
for (PartitionTableInfo table : tables) {
processDailyPartition(table, today);
}
}
/**
* 개별 Daily 파티션 생성
*/
private void processDailyPartition(PartitionTableInfo table, LocalDate today) {
List<String> created = new ArrayList<>();
List<String> skipped = new ArrayList<>();
for (int i = 0; i <= table.periodsAhead(); i++) {
LocalDate targetDate = today.plusDays(i);
String partitionName = getDailyPartitionName(table.tableName(), targetDate);
if (partitionExists(table.schema(), partitionName)) {
skipped.add(partitionName);
} else {
createDailyPartition(table, targetDate, partitionName);
created.add(partitionName);
}
}
log.info("[{}] Daily 파티션 - 생성: {}, 스킵: {}",
table.tableName(), created.size(), skipped.size());
}
/**
* Monthly 파티션 처리
*/
private void processMonthlyPartitions(LocalDate today) {
List<PartitionTableInfo> tables = partitionConfig.getMonthlyPartitionTables();
if (tables.isEmpty()) {
log.info("Monthly 파티션: 대상 테이블 없음");
return;
}
log.info("Monthly 파티션 처리 시작: {} 개 테이블", tables.size());
for (PartitionTableInfo table : tables) {
processMonthlyPartition(table, today);
}
}
/**
* 개별 Monthly 파티션 생성
*/
private void processMonthlyPartition(PartitionTableInfo table, LocalDate today) {
List<String> created = new ArrayList<>();
List<String> skipped = new ArrayList<>();
for (int i = 0; i <= table.periodsAhead(); i++) {
LocalDate targetDate = today.plusMonths(i).withDayOfMonth(1);
String partitionName = getMonthlyPartitionName(table.tableName(), targetDate);
if (partitionExists(table.schema(), partitionName)) {
skipped.add(partitionName);
} else {
createMonthlyPartition(table, targetDate, partitionName);
created.add(partitionName);
}
}
log.info("[{}] Monthly 파티션 - 생성: {}, 스킵: {}",
table.tableName(), created.size(), skipped.size());
if (!created.isEmpty()) {
log.info("[{}] 생성된 파티션: {}", table.tableName(), created);
}
}
/**
* Daily 파티션 이름 생성 (table_YYYY_MM_DD)
*/
private String getDailyPartitionName(String tableName, LocalDate date) {
return tableName + "_" + date.format(DateTimeFormatter.ofPattern("yyyy_MM_dd"));
}
/**
* Monthly 파티션 이름 생성 (table_YYYY_MM)
*/
private String getMonthlyPartitionName(String tableName, LocalDate date) {
return tableName + "_" + date.format(DateTimeFormatter.ofPattern("yyyy_MM"));
}
/**
* 파티션 존재 여부 확인
*/
private boolean partitionExists(String schema, String partitionName) {
Boolean exists = jdbcTemplate.queryForObject(PARTITION_EXISTS_SQL, Boolean.class, schema, partitionName);
return Boolean.TRUE.equals(exists);
}
/**
* Daily 파티션 생성
*/
private void createDailyPartition(PartitionTableInfo table, LocalDate targetDate, String partitionName) {
LocalDate endDate = targetDate.plusDays(1);
String sql = String.format("""
CREATE TABLE %s.%s PARTITION OF %s
FOR VALUES FROM ('%s 00:00:00+00') TO ('%s 00:00:00+00')
""",
table.schema(), partitionName, table.getFullTableName(),
targetDate, endDate);
jdbcTemplate.execute(sql);
log.debug("Daily 파티션 생성: {}", partitionName);
}
/**
* Monthly 파티션 생성
*/
private void createMonthlyPartition(PartitionTableInfo table, LocalDate targetDate, String partitionName) {
LocalDate startDate = targetDate.withDayOfMonth(1);
LocalDate endDate = startDate.plusMonths(1);
String sql = String.format("""
CREATE TABLE %s.%s PARTITION OF %s
FOR VALUES FROM ('%s 00:00:00+00') TO ('%s 00:00:00+00')
""",
table.schema(), partitionName, table.getFullTableName(),
startDate, endDate);
jdbcTemplate.execute(sql);
log.debug("Monthly 파티션 생성: {}", partitionName);
}
}

파일 보기

@ -1,6 +1,6 @@
package com.snp.batch.global.repository; package com.snp.batch.global.repository;
import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
@ -13,10 +13,25 @@ import java.util.Map;
* Step Context 불필요한 데이터를 조회하지 않고 필요한 정보만 가져옴 * Step Context 불필요한 데이터를 조회하지 않고 필요한 정보만 가져옴
*/ */
@Repository @Repository
@RequiredArgsConstructor
public class TimelineRepository { public class TimelineRepository {
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
private final String tablePrefix;
public TimelineRepository(
JdbcTemplate jdbcTemplate,
@Value("${spring.batch.jdbc.table-prefix:BATCH_}") String tablePrefix) {
this.jdbcTemplate = jdbcTemplate;
this.tablePrefix = tablePrefix;
}
private String getJobExecutionTable() {
return tablePrefix + "JOB_EXECUTION";
}
private String getJobInstanceTable() {
return tablePrefix + "JOB_INSTANCE";
}
/** /**
* 특정 Job의 특정 범위 실행 이력 조회 (경량) * 특정 Job의 특정 범위 실행 이력 조회 (경량)
@ -27,19 +42,19 @@ public class TimelineRepository {
LocalDateTime startTime, LocalDateTime startTime,
LocalDateTime endTime) { LocalDateTime endTime) {
String sql = """ String sql = String.format("""
SELECT SELECT
je.JOB_EXECUTION_ID as executionId, je.JOB_EXECUTION_ID as executionId,
je.STATUS as status, je.STATUS as status,
je.START_TIME as startTime, je.START_TIME as startTime,
je.END_TIME as endTime je.END_TIME as endTime
FROM BATCH_JOB_EXECUTION je FROM %s je
INNER JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID INNER JOIN %s ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
WHERE ji.JOB_NAME = ? WHERE ji.JOB_NAME = ?
AND je.START_TIME >= ? AND je.START_TIME >= ?
AND je.START_TIME < ? AND je.START_TIME < ?
ORDER BY je.START_TIME DESC ORDER BY je.START_TIME DESC
"""; """, getJobExecutionTable(), getJobInstanceTable());
return jdbcTemplate.queryForList(sql, jobName, startTime, endTime); return jdbcTemplate.queryForList(sql, jobName, startTime, endTime);
} }
@ -51,19 +66,19 @@ public class TimelineRepository {
LocalDateTime startTime, LocalDateTime startTime,
LocalDateTime endTime) { LocalDateTime endTime) {
String sql = """ String sql = String.format("""
SELECT SELECT
ji.JOB_NAME as jobName, ji.JOB_NAME as jobName,
je.JOB_EXECUTION_ID as executionId, je.JOB_EXECUTION_ID as executionId,
je.STATUS as status, je.STATUS as status,
je.START_TIME as startTime, je.START_TIME as startTime,
je.END_TIME as endTime je.END_TIME as endTime
FROM BATCH_JOB_EXECUTION je FROM %s je
INNER JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID INNER JOIN %s ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
WHERE je.START_TIME >= ? WHERE je.START_TIME >= ?
AND je.START_TIME < ? AND je.START_TIME < ?
ORDER BY ji.JOB_NAME, je.START_TIME DESC ORDER BY ji.JOB_NAME, je.START_TIME DESC
"""; """, getJobExecutionTable(), getJobInstanceTable());
return jdbcTemplate.queryForList(sql, startTime, endTime); return jdbcTemplate.queryForList(sql, startTime, endTime);
} }
@ -72,17 +87,17 @@ public class TimelineRepository {
* 현재 실행 중인 Job 조회 (STARTED, STARTING 상태) * 현재 실행 중인 Job 조회 (STARTED, STARTING 상태)
*/ */
public List<Map<String, Object>> findRunningExecutions() { public List<Map<String, Object>> findRunningExecutions() {
String sql = """ String sql = String.format("""
SELECT SELECT
ji.JOB_NAME as jobName, ji.JOB_NAME as jobName,
je.JOB_EXECUTION_ID as executionId, je.JOB_EXECUTION_ID as executionId,
je.STATUS as status, je.STATUS as status,
je.START_TIME as startTime je.START_TIME as startTime
FROM BATCH_JOB_EXECUTION je FROM %s je
INNER JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID INNER JOIN %s ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
WHERE je.STATUS IN ('STARTED', 'STARTING') WHERE je.STATUS IN ('STARTED', 'STARTING')
ORDER BY je.START_TIME DESC ORDER BY je.START_TIME DESC
"""; """, getJobExecutionTable(), getJobInstanceTable());
return jdbcTemplate.queryForList(sql); return jdbcTemplate.queryForList(sql);
} }
@ -91,18 +106,18 @@ public class TimelineRepository {
* 최근 실행 이력 조회 (상위 N개) * 최근 실행 이력 조회 (상위 N개)
*/ */
public List<Map<String, Object>> findRecentExecutions(int limit) { public List<Map<String, Object>> findRecentExecutions(int limit) {
String sql = """ String sql = String.format("""
SELECT SELECT
ji.JOB_NAME as jobName, ji.JOB_NAME as jobName,
je.JOB_EXECUTION_ID as executionId, je.JOB_EXECUTION_ID as executionId,
je.STATUS as status, je.STATUS as status,
je.START_TIME as startTime, je.START_TIME as startTime,
je.END_TIME as endTime je.END_TIME as endTime
FROM BATCH_JOB_EXECUTION je FROM %s je
INNER JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID INNER JOIN %s ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
ORDER BY je.START_TIME DESC ORDER BY je.START_TIME DESC
LIMIT ? LIMIT ?
"""; """, getJobExecutionTable(), getJobInstanceTable());
return jdbcTemplate.queryForList(sql, limit); return jdbcTemplate.queryForList(sql, limit);
} }

파일 보기

@ -0,0 +1,122 @@
package com.snp.batch.jobs.aistarget.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.aistarget.batch.dto.AisTargetDto;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import com.snp.batch.jobs.aistarget.batch.processor.AisTargetDataProcessor;
import com.snp.batch.jobs.aistarget.batch.reader.AisTargetDataReader;
import com.snp.batch.jobs.aistarget.batch.writer.AisTargetDataWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.OffsetDateTime;
/**
* AIS Target Import Job Config
*
* 스케줄: 15초 (0 15 * * * * ?)
* API: POST /AisSvc.svc/AIS/GetTargets
* 파라미터: {"sinceSeconds": "60"}
*
* 동작:
* - 최근 60초 동안의 전체 선박 위치 정보 수집
* - 33,000건/ 처리
* - UPSERT 방식으로 DB 저장
*/
@Slf4j
@Configuration
public class AisTargetImportJobConfig extends BaseJobConfig<AisTargetDto, AisTargetEntity> {
private final AisTargetDataProcessor aisTargetDataProcessor;
private final AisTargetDataWriter aisTargetDataWriter;
private final WebClient maritimeAisApiWebClient;
@Value("${app.batch.ais-target.since-seconds:60}")
private int sinceSeconds;
@Value("${app.batch.ais-target.chunk-size:5000}")
private int chunkSize;
public AisTargetImportJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
AisTargetDataProcessor aisTargetDataProcessor,
AisTargetDataWriter aisTargetDataWriter,
@Qualifier("maritimeAisApiWebClient") WebClient maritimeAisApiWebClient) {
super(jobRepository, transactionManager);
this.aisTargetDataProcessor = aisTargetDataProcessor;
this.aisTargetDataWriter = aisTargetDataWriter;
this.maritimeAisApiWebClient = maritimeAisApiWebClient;
}
@Override
protected String getJobName() {
return "aisTargetImportJob";
}
@Override
protected String getStepName() {
return "aisTargetImportStep";
}
@Override
protected ItemReader<AisTargetDto> createReader() {
return new AisTargetDataReader(maritimeAisApiWebClient, sinceSeconds);
}
@Override
protected ItemProcessor<AisTargetDto, AisTargetEntity> createProcessor() {
return aisTargetDataProcessor;
}
@Override
protected ItemWriter<AisTargetEntity> createWriter() {
return aisTargetDataWriter;
}
@Override
protected int getChunkSize() {
return chunkSize;
}
@Override
protected void configureJob(JobBuilder jobBuilder) {
jobBuilder.listener(new JobExecutionListener() {
@Override
public void beforeJob(JobExecution jobExecution) {
// 배치 수집 시점 설정
OffsetDateTime collectedAt = OffsetDateTime.now();
aisTargetDataProcessor.setCollectedAt(collectedAt);
log.info("[{}] Job 시작 - 수집 시점: {}", getJobName(), collectedAt);
}
@Override
public void afterJob(JobExecution jobExecution) {
log.info("[{}] Job 완료 - 상태: {}, 처리 건수: {}",
getJobName(),
jobExecution.getStatus(),
jobExecution.getStepExecutions().stream()
.mapToLong(se -> se.getWriteCount())
.sum());
}
});
}
@Bean(name = "aisTargetImportJob")
public Job aisTargetImportJob() {
return job();
}
}

파일 보기

@ -0,0 +1,27 @@
package com.snp.batch.jobs.aistarget.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* AIS GetTargets API 응답 래퍼
*
* API 응답 구조:
* {
* "targetArr": [...]
* }
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AisTargetApiResponse {
@JsonProperty("targetArr")
private List<AisTargetDto> targetArr;
}

파일 보기

@ -0,0 +1,135 @@
package com.snp.batch.jobs.aistarget.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* AIS Target API 응답 DTO
*
* API: POST /AisSvc.svc/AIS/GetTargets
* Request: {"sinceSeconds": "60"}
* Response: {"targetArr": [...]}
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AisTargetDto {
@JsonProperty("MMSI")
private Long mmsi;
@JsonProperty("IMO")
private Long imo;
@JsonProperty("AgeMinutes")
private Double ageMinutes;
@JsonProperty("Lat")
private Double lat;
@JsonProperty("Lon")
private Double lon;
@JsonProperty("Heading")
private Double heading;
@JsonProperty("SoG")
private Double sog; // Speed over Ground
@JsonProperty("CoG")
private Double cog; // Course over Ground (if available)
@JsonProperty("Width")
private Integer width;
@JsonProperty("Length")
private Integer length;
@JsonProperty("Draught")
private Double draught;
@JsonProperty("Name")
private String name;
@JsonProperty("Callsign")
private String callsign;
@JsonProperty("Destination")
private String destination;
@JsonProperty("ETA")
private String eta;
@JsonProperty("Status")
private String status;
@JsonProperty("VesselType")
private String vesselType;
@JsonProperty("ExtraInfo")
private String extraInfo;
@JsonProperty("PositionAccuracy")
private Integer positionAccuracy;
@JsonProperty("RoT")
private Integer rot; // Rate of Turn
@JsonProperty("TimestampUTC")
private Integer timestampUtc;
@JsonProperty("RepeatIndicator")
private Integer repeatIndicator;
@JsonProperty("RAIMFlag")
private Integer raimFlag;
@JsonProperty("RadioStatus")
private Integer radioStatus;
@JsonProperty("Regional")
private Integer regional;
@JsonProperty("Regional2")
private Integer regional2;
@JsonProperty("Spare")
private Integer spare;
@JsonProperty("Spare2")
private Integer spare2;
@JsonProperty("AISVersion")
private Integer aisVersion;
@JsonProperty("PositionFixType")
private Integer positionFixType;
@JsonProperty("DTE")
private Integer dte;
@JsonProperty("BandFlag")
private Integer bandFlag;
@JsonProperty("ReceivedDate")
private String receivedDate;
@JsonProperty("MessageTimestamp")
private String messageTimestamp;
@JsonProperty("LengthBow")
private Integer lengthBow;
@JsonProperty("LengthStern")
private Integer lengthStern;
@JsonProperty("WidthPort")
private Integer widthPort;
@JsonProperty("WidthStarboard")
private Integer widthStarboard;
}

파일 보기

@ -0,0 +1,85 @@
package com.snp.batch.jobs.aistarget.batch.entity;
import com.snp.batch.common.batch.entity.BaseEntity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.time.OffsetDateTime;
/**
* AIS Target Entity
*
* 테이블: snp_data.ais_target
* PK: mmsi + message_timestamp (복합키)
*
* 용도:
* - 선박 위치 이력 저장 (항적 분석용)
* - 특정 시점/구역 선박 조회
* - LineString 항적 생성 기반 데이터
*/
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class AisTargetEntity extends BaseEntity {
// ========== PK (복합키) ==========
private Long mmsi;
private OffsetDateTime messageTimestamp;
// ========== 선박 식별 정보 ==========
private Long imo;
private String name;
private String callsign;
private String vesselType;
private String extraInfo;
// ========== 위치 정보 ==========
private Double lat;
private Double lon;
// geom은 DB에서 ST_SetSRID(ST_MakePoint(lon, lat), 4326) 생성
// ========== 항해 정보 ==========
private Double heading;
private Double sog; // Speed over Ground
private Double cog; // Course over Ground
private Integer rot; // Rate of Turn
// ========== 선박 제원 ==========
private Integer length;
private Integer width;
private Double draught;
private Integer lengthBow;
private Integer lengthStern;
private Integer widthPort;
private Integer widthStarboard;
// ========== 목적지 정보 ==========
private String destination;
private OffsetDateTime eta;
private String status;
// ========== AIS 메시지 정보 ==========
private Double ageMinutes;
private Integer positionAccuracy;
private Integer timestampUtc;
private Integer repeatIndicator;
private Integer raimFlag;
private Integer radioStatus;
private Integer regional;
private Integer regional2;
private Integer spare;
private Integer spare2;
private Integer aisVersion;
private Integer positionFixType;
private Integer dte;
private Integer bandFlag;
// ========== 타임스탬프 ==========
private OffsetDateTime receivedDate;
private OffsetDateTime collectedAt; // 배치 수집 시점
}

파일 보기

@ -0,0 +1,121 @@
package com.snp.batch.jobs.aistarget.batch.processor;
import com.snp.batch.common.batch.processor.BaseProcessor;
import com.snp.batch.jobs.aistarget.batch.dto.AisTargetDto;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
/**
* AIS Target 데이터 Processor
*
* DTO Entity 변환
* - 타임스탬프 파싱
* - 필터링 (유효한 위치 정보만)
*/
@Slf4j
@Component
public class AisTargetDataProcessor extends BaseProcessor<AisTargetDto, AisTargetEntity> {
private static final DateTimeFormatter ISO_FORMATTER = DateTimeFormatter.ISO_DATE_TIME;
// 배치 수집 시점 (모든 레코드에 동일하게 적용)
private OffsetDateTime collectedAt;
public void setCollectedAt(OffsetDateTime collectedAt) {
this.collectedAt = collectedAt;
}
@Override
protected AisTargetEntity processItem(AisTargetDto dto) throws Exception {
// 유효성 검사: MMSI와 위치 정보는 필수
if (dto.getMmsi() == null || dto.getLat() == null || dto.getLon() == null) {
log.debug("유효하지 않은 데이터 스킵 - MMSI: {}, Lat: {}, Lon: {}",
dto.getMmsi(), dto.getLat(), dto.getLon());
return null;
}
// MessageTimestamp 파싱 (PK의 일부)
OffsetDateTime messageTimestamp = parseTimestamp(dto.getMessageTimestamp());
if (messageTimestamp == null) {
log.debug("MessageTimestamp 파싱 실패 - MMSI: {}, Timestamp: {}",
dto.getMmsi(), dto.getMessageTimestamp());
return null;
}
return AisTargetEntity.builder()
// PK
.mmsi(dto.getMmsi())
.messageTimestamp(messageTimestamp)
// 선박 식별 정보
.imo(dto.getImo())
.name(dto.getName())
.callsign(dto.getCallsign())
.vesselType(dto.getVesselType())
.extraInfo(dto.getExtraInfo())
// 위치 정보
.lat(dto.getLat())
.lon(dto.getLon())
// 항해 정보
.heading(dto.getHeading())
.sog(dto.getSog())
.cog(dto.getCog())
.rot(dto.getRot())
// 선박 제원
.length(dto.getLength())
.width(dto.getWidth())
.draught(dto.getDraught())
.lengthBow(dto.getLengthBow())
.lengthStern(dto.getLengthStern())
.widthPort(dto.getWidthPort())
.widthStarboard(dto.getWidthStarboard())
// 목적지 정보
.destination(dto.getDestination())
.eta(parseEta(dto.getEta()))
.status(dto.getStatus())
// AIS 메시지 정보
.ageMinutes(dto.getAgeMinutes())
.positionAccuracy(dto.getPositionAccuracy())
.timestampUtc(dto.getTimestampUtc())
.repeatIndicator(dto.getRepeatIndicator())
.raimFlag(dto.getRaimFlag())
.radioStatus(dto.getRadioStatus())
.regional(dto.getRegional())
.regional2(dto.getRegional2())
.spare(dto.getSpare())
.spare2(dto.getSpare2())
.aisVersion(dto.getAisVersion())
.positionFixType(dto.getPositionFixType())
.dte(dto.getDte())
.bandFlag(dto.getBandFlag())
// 타임스탬프
.receivedDate(parseTimestamp(dto.getReceivedDate()))
.collectedAt(collectedAt != null ? collectedAt : OffsetDateTime.now())
.build();
}
private OffsetDateTime parseTimestamp(String timestamp) {
if (timestamp == null || timestamp.isEmpty()) {
return null;
}
try {
// ISO 8601 형식 파싱 (: "2025-12-01T23:55:01.073Z")
return OffsetDateTime.parse(timestamp, ISO_FORMATTER);
} catch (DateTimeParseException e) {
log.trace("타임스탬프 파싱 실패: {}", timestamp);
return null;
}
}
private OffsetDateTime parseEta(String eta) {
if (eta == null || eta.isEmpty() || "9999-12-31T23:59:59Z".equals(eta)) {
return null; // 유효하지 않은 ETA는 null 처리
}
return parseTimestamp(eta);
}
}

파일 보기

@ -0,0 +1,98 @@
package com.snp.batch.jobs.aistarget.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.aistarget.batch.dto.AisTargetApiResponse;
import com.snp.batch.jobs.aistarget.batch.dto.AisTargetDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* AIS Target 데이터 Reader
*
* API: POST /AisSvc.svc/AIS/GetTargets
* Request: {"sinceSeconds": "60"}
*
* 동작:
* - 15초에 실행 (Quartz 스케줄)
* - 최근 60초 동안의 전체 선박 위치 정보 조회
* - 33,000건/ 처리
*/
@Slf4j
public class AisTargetDataReader extends BaseApiReader<AisTargetDto> {
private final int sinceSeconds;
public AisTargetDataReader(WebClient webClient, int sinceSeconds) {
super(webClient);
this.sinceSeconds = sinceSeconds;
}
@Override
protected String getReaderName() {
return "AisTargetDataReader";
}
@Override
protected String getApiPath() {
return "/AisSvc.svc/AIS/GetTargets";
}
@Override
protected String getHttpMethod() {
return "POST";
}
@Override
protected Object getRequestBody() {
return Map.of("sinceSeconds", String.valueOf(sinceSeconds));
}
@Override
protected Class<?> getResponseType() {
return AisTargetApiResponse.class;
}
@Override
protected void beforeFetch() {
log.info("[{}] AIS GetTargets API 호출 준비 - sinceSeconds: {}", getReaderName(), sinceSeconds);
}
@Override
protected List<AisTargetDto> fetchDataFromApi() {
try {
log.info("[{}] API 호출 시작: {} {}", getReaderName(), getHttpMethod(), getApiPath());
AisTargetApiResponse response = webClient.post()
.uri(getApiPath())
.bodyValue(getRequestBody())
.retrieve()
.bodyToMono(AisTargetApiResponse.class)
.block();
if (response != null && response.getTargetArr() != null) {
List<AisTargetDto> targets = response.getTargetArr();
log.info("[{}] API 호출 완료: {} 건 조회", getReaderName(), targets.size());
updateApiCallStats(1, 1);
return targets;
} else {
log.warn("[{}] API 응답이 비어있습니다", getReaderName());
return Collections.emptyList();
}
} catch (Exception e) {
log.error("[{}] API 호출 실패: {}", getReaderName(), e.getMessage(), e);
return handleApiError(e);
}
}
@Override
protected void afterFetch(List<AisTargetDto> data) {
if (data != null && !data.isEmpty()) {
log.info("[{}] 데이터 조회 완료 - 총 {} 건", getReaderName(), data.size());
}
}
}

파일 보기

@ -0,0 +1,59 @@
package com.snp.batch.jobs.aistarget.batch.repository;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Optional;
/**
* AIS Target Repository 인터페이스
*/
public interface AisTargetRepository {
/**
* 복합키로 조회 (MMSI + MessageTimestamp)
*/
Optional<AisTargetEntity> findByMmsiAndMessageTimestamp(Long mmsi, OffsetDateTime messageTimestamp);
/**
* MMSI로 최신 위치 조회
*/
Optional<AisTargetEntity> findLatestByMmsi(Long mmsi);
/**
* 여러 MMSI의 최신 위치 조회
*/
List<AisTargetEntity> findLatestByMmsiIn(List<Long> mmsiList);
/**
* 시간 범위 특정 MMSI의 항적 조회
*/
List<AisTargetEntity> findByMmsiAndTimeRange(Long mmsi, OffsetDateTime start, OffsetDateTime end);
/**
* 시간 범위 + 공간 범위 선박 조회
*/
List<AisTargetEntity> findByTimeRangeAndArea(
OffsetDateTime start,
OffsetDateTime end,
Double centerLon,
Double centerLat,
Double radiusMeters
);
/**
* 배치 INSERT (UPSERT)
*/
void batchUpsert(List<AisTargetEntity> entities);
/**
* 전체 건수 조회
*/
long count();
/**
* 오래된 데이터 삭제 (보존 기간 이전 데이터)
*/
int deleteOlderThan(OffsetDateTime threshold);
}

파일 보기

@ -0,0 +1,315 @@
package com.snp.batch.jobs.aistarget.batch.repository;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Optional;
/**
* AIS Target Repository 구현체
*
* 테이블: snp_data.ais_target
* PK: mmsi + message_timestamp (복합키)
*/
@Slf4j
@Repository
@RequiredArgsConstructor
public class AisTargetRepositoryImpl implements AisTargetRepository {
private final JdbcTemplate jdbcTemplate;
private static final String TABLE_NAME = "snp_data.ais_target";
// ==================== UPSERT SQL ====================
private static final String UPSERT_SQL = """
INSERT INTO snp_data.ais_target (
mmsi, message_timestamp, imo, name, callsign, vessel_type, extra_info,
lat, lon, geom,
heading, sog, cog, rot,
length, width, draught, length_bow, length_stern, width_port, width_starboard,
destination, eta, status,
age_minutes, position_accuracy, timestamp_utc, repeat_indicator, raim_flag,
radio_status, regional, regional2, spare, spare2,
ais_version, position_fix_type, dte, band_flag,
received_date, collected_at, created_at, updated_at
) VALUES (
?, ?, ?, ?, ?, ?, ?,
?, ?, ST_SetSRID(ST_MakePoint(?, ?), 4326),
?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?,
?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?,
?, ?, NOW(), NOW()
)
ON CONFLICT (mmsi, message_timestamp) DO UPDATE SET
imo = EXCLUDED.imo,
name = EXCLUDED.name,
callsign = EXCLUDED.callsign,
vessel_type = EXCLUDED.vessel_type,
extra_info = EXCLUDED.extra_info,
lat = EXCLUDED.lat,
lon = EXCLUDED.lon,
geom = EXCLUDED.geom,
heading = EXCLUDED.heading,
sog = EXCLUDED.sog,
cog = EXCLUDED.cog,
rot = EXCLUDED.rot,
length = EXCLUDED.length,
width = EXCLUDED.width,
draught = EXCLUDED.draught,
length_bow = EXCLUDED.length_bow,
length_stern = EXCLUDED.length_stern,
width_port = EXCLUDED.width_port,
width_starboard = EXCLUDED.width_starboard,
destination = EXCLUDED.destination,
eta = EXCLUDED.eta,
status = EXCLUDED.status,
age_minutes = EXCLUDED.age_minutes,
position_accuracy = EXCLUDED.position_accuracy,
timestamp_utc = EXCLUDED.timestamp_utc,
repeat_indicator = EXCLUDED.repeat_indicator,
raim_flag = EXCLUDED.raim_flag,
radio_status = EXCLUDED.radio_status,
regional = EXCLUDED.regional,
regional2 = EXCLUDED.regional2,
spare = EXCLUDED.spare,
spare2 = EXCLUDED.spare2,
ais_version = EXCLUDED.ais_version,
position_fix_type = EXCLUDED.position_fix_type,
dte = EXCLUDED.dte,
band_flag = EXCLUDED.band_flag,
received_date = EXCLUDED.received_date,
collected_at = EXCLUDED.collected_at,
updated_at = NOW()
""";
// ==================== RowMapper ====================
private final RowMapper<AisTargetEntity> rowMapper = (rs, rowNum) -> AisTargetEntity.builder()
.mmsi(rs.getLong("mmsi"))
.messageTimestamp(toOffsetDateTime(rs.getTimestamp("message_timestamp")))
.imo(rs.getObject("imo", Long.class))
.name(rs.getString("name"))
.callsign(rs.getString("callsign"))
.vesselType(rs.getString("vessel_type"))
.extraInfo(rs.getString("extra_info"))
.lat(rs.getObject("lat", Double.class))
.lon(rs.getObject("lon", Double.class))
.heading(rs.getObject("heading", Double.class))
.sog(rs.getObject("sog", Double.class))
.cog(rs.getObject("cog", Double.class))
.rot(rs.getObject("rot", Integer.class))
.length(rs.getObject("length", Integer.class))
.width(rs.getObject("width", Integer.class))
.draught(rs.getObject("draught", Double.class))
.lengthBow(rs.getObject("length_bow", Integer.class))
.lengthStern(rs.getObject("length_stern", Integer.class))
.widthPort(rs.getObject("width_port", Integer.class))
.widthStarboard(rs.getObject("width_starboard", Integer.class))
.destination(rs.getString("destination"))
.eta(toOffsetDateTime(rs.getTimestamp("eta")))
.status(rs.getString("status"))
.ageMinutes(rs.getObject("age_minutes", Double.class))
.positionAccuracy(rs.getObject("position_accuracy", Integer.class))
.timestampUtc(rs.getObject("timestamp_utc", Integer.class))
.repeatIndicator(rs.getObject("repeat_indicator", Integer.class))
.raimFlag(rs.getObject("raim_flag", Integer.class))
.radioStatus(rs.getObject("radio_status", Integer.class))
.regional(rs.getObject("regional", Integer.class))
.regional2(rs.getObject("regional2", Integer.class))
.spare(rs.getObject("spare", Integer.class))
.spare2(rs.getObject("spare2", Integer.class))
.aisVersion(rs.getObject("ais_version", Integer.class))
.positionFixType(rs.getObject("position_fix_type", Integer.class))
.dte(rs.getObject("dte", Integer.class))
.bandFlag(rs.getObject("band_flag", Integer.class))
.receivedDate(toOffsetDateTime(rs.getTimestamp("received_date")))
.collectedAt(toOffsetDateTime(rs.getTimestamp("collected_at")))
.build();
// ==================== Repository Methods ====================
@Override
public Optional<AisTargetEntity> findByMmsiAndMessageTimestamp(Long mmsi, OffsetDateTime messageTimestamp) {
String sql = "SELECT * FROM " + TABLE_NAME + " WHERE mmsi = ? AND message_timestamp = ?";
List<AisTargetEntity> results = jdbcTemplate.query(sql, rowMapper, mmsi, toTimestamp(messageTimestamp));
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
@Override
public Optional<AisTargetEntity> findLatestByMmsi(Long mmsi) {
String sql = """
SELECT * FROM %s
WHERE mmsi = ?
ORDER BY message_timestamp DESC
LIMIT 1
""".formatted(TABLE_NAME);
List<AisTargetEntity> results = jdbcTemplate.query(sql, rowMapper, mmsi);
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
@Override
public List<AisTargetEntity> findLatestByMmsiIn(List<Long> mmsiList) {
if (mmsiList == null || mmsiList.isEmpty()) {
return List.of();
}
// DISTINCT ON을 사용하여 MMSI별 최신 레코드 조회
String sql = """
SELECT DISTINCT ON (mmsi) *
FROM %s
WHERE mmsi = ANY(?)
ORDER BY mmsi, message_timestamp DESC
""".formatted(TABLE_NAME);
Long[] mmsiArray = mmsiList.toArray(new Long[0]);
return jdbcTemplate.query(sql, rowMapper, (Object) mmsiArray);
}
@Override
public List<AisTargetEntity> findByMmsiAndTimeRange(Long mmsi, OffsetDateTime start, OffsetDateTime end) {
String sql = """
SELECT * FROM %s
WHERE mmsi = ?
AND message_timestamp BETWEEN ? AND ?
ORDER BY message_timestamp ASC
""".formatted(TABLE_NAME);
return jdbcTemplate.query(sql, rowMapper, mmsi, toTimestamp(start), toTimestamp(end));
}
@Override
public List<AisTargetEntity> findByTimeRangeAndArea(
OffsetDateTime start,
OffsetDateTime end,
Double centerLon,
Double centerLat,
Double radiusMeters
) {
String sql = """
SELECT DISTINCT ON (mmsi) *
FROM %s
WHERE message_timestamp BETWEEN ? AND ?
AND ST_DWithin(
geom::geography,
ST_SetSRID(ST_MakePoint(?, ?), 4326)::geography,
?
)
ORDER BY mmsi, message_timestamp DESC
""".formatted(TABLE_NAME);
return jdbcTemplate.query(sql, rowMapper,
toTimestamp(start), toTimestamp(end),
centerLon, centerLat, radiusMeters);
}
@Override
@Transactional
public void batchUpsert(List<AisTargetEntity> entities) {
if (entities == null || entities.isEmpty()) {
return;
}
log.info("AIS Target 배치 UPSERT 시작: {} 건", entities.size());
jdbcTemplate.batchUpdate(UPSERT_SQL, entities, 1000, (ps, entity) -> {
int idx = 1;
// PK
ps.setLong(idx++, entity.getMmsi());
ps.setTimestamp(idx++, toTimestamp(entity.getMessageTimestamp()));
// 선박 식별 정보
ps.setObject(idx++, entity.getImo());
ps.setString(idx++, truncate(entity.getName(), 100));
ps.setString(idx++, truncate(entity.getCallsign(), 20));
ps.setString(idx++, truncate(entity.getVesselType(), 50));
ps.setString(idx++, truncate(entity.getExtraInfo(), 100));
// 위치 정보
ps.setObject(idx++, entity.getLat());
ps.setObject(idx++, entity.getLon());
// geom용 lon, lat
ps.setObject(idx++, entity.getLon());
ps.setObject(idx++, entity.getLat());
// 항해 정보
ps.setObject(idx++, entity.getHeading());
ps.setObject(idx++, entity.getSog());
ps.setObject(idx++, entity.getCog());
ps.setObject(idx++, entity.getRot());
// 선박 제원
ps.setObject(idx++, entity.getLength());
ps.setObject(idx++, entity.getWidth());
ps.setObject(idx++, entity.getDraught());
ps.setObject(idx++, entity.getLengthBow());
ps.setObject(idx++, entity.getLengthStern());
ps.setObject(idx++, entity.getWidthPort());
ps.setObject(idx++, entity.getWidthStarboard());
// 목적지 정보
ps.setString(idx++, truncate(entity.getDestination(), 200));
ps.setTimestamp(idx++, toTimestamp(entity.getEta()));
ps.setString(idx++, truncate(entity.getStatus(), 50));
// AIS 메시지 정보
ps.setObject(idx++, entity.getAgeMinutes());
ps.setObject(idx++, entity.getPositionAccuracy());
ps.setObject(idx++, entity.getTimestampUtc());
ps.setObject(idx++, entity.getRepeatIndicator());
ps.setObject(idx++, entity.getRaimFlag());
ps.setObject(idx++, entity.getRadioStatus());
ps.setObject(idx++, entity.getRegional());
ps.setObject(idx++, entity.getRegional2());
ps.setObject(idx++, entity.getSpare());
ps.setObject(idx++, entity.getSpare2());
ps.setObject(idx++, entity.getAisVersion());
ps.setObject(idx++, entity.getPositionFixType());
ps.setObject(idx++, entity.getDte());
ps.setObject(idx++, entity.getBandFlag());
// 타임스탬프
ps.setTimestamp(idx++, toTimestamp(entity.getReceivedDate()));
ps.setTimestamp(idx++, toTimestamp(entity.getCollectedAt()));
});
log.info("AIS Target 배치 UPSERT 완료: {} 건", entities.size());
}
@Override
public long count() {
String sql = "SELECT COUNT(*) FROM " + TABLE_NAME;
Long count = jdbcTemplate.queryForObject(sql, Long.class);
return count != null ? count : 0L;
}
@Override
@Transactional
public int deleteOlderThan(OffsetDateTime threshold) {
String sql = "DELETE FROM " + TABLE_NAME + " WHERE message_timestamp < ?";
int deleted = jdbcTemplate.update(sql, toTimestamp(threshold));
log.info("AIS Target 오래된 데이터 삭제 완료: {} 건 (기준: {})", deleted, threshold);
return deleted;
}
// ==================== Helper Methods ====================
private Timestamp toTimestamp(OffsetDateTime odt) {
return odt != null ? Timestamp.from(odt.toInstant()) : null;
}
private OffsetDateTime toOffsetDateTime(Timestamp ts) {
return ts != null ? ts.toInstant().atOffset(ZoneOffset.UTC) : null;
}
private String truncate(String value, int maxLength) {
if (value == null) return null;
return value.length() > maxLength ? value.substring(0, maxLength) : value;
}
}

파일 보기

@ -0,0 +1,47 @@
package com.snp.batch.jobs.aistarget.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import com.snp.batch.jobs.aistarget.batch.repository.AisTargetRepository;
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* AIS Target 데이터 Writer
*
* 동작:
* - UPSERT 방식으로 DB 저장 (PK: mmsi + message_timestamp)
* - 동시에 캐시에도 최신 위치 정보 업데이트
*/
@Slf4j
@Component
public class AisTargetDataWriter extends BaseWriter<AisTargetEntity> {
private final AisTargetRepository aisTargetRepository;
private final AisTargetCacheManager cacheManager;
public AisTargetDataWriter(
AisTargetRepository aisTargetRepository,
AisTargetCacheManager cacheManager) {
super("AisTarget");
this.aisTargetRepository = aisTargetRepository;
this.cacheManager = cacheManager;
}
@Override
protected void writeItems(List<AisTargetEntity> items) throws Exception {
log.debug("AIS Target 데이터 저장 시작: {} 건", items.size());
// 1. DB 저장
aisTargetRepository.batchUpsert(items);
// 2. 캐시 업데이트 (최신 위치 정보)
cacheManager.putAll(items);
log.debug("AIS Target 데이터 저장 완료: {} 건 (캐시 크기: {})",
items.size(), cacheManager.size());
}
}

파일 보기

@ -0,0 +1,272 @@
package com.snp.batch.jobs.aistarget.cache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import jakarta.annotation.PostConstruct;
import java.time.OffsetDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* AIS Target 캐시 매니저 (Caffeine 기반)
*
* Caffeine 캐시의 장점:
* - 고성능: ConcurrentHashMap 대비 우수한 성능
* - 자동 만료: expireAfterWrite/expireAfterAccess 내장
* - 최대 크기 제한: maximumSize + LRU/LFU 자동 정리
* - 통계: 히트율, 미스율, 로드 시간 상세 통계
* - 비동기 지원: AsyncCache로 비동기 로딩 가능
*
* 동작:
* - 배치 Writer에서 DB 저장과 동시에 캐시 업데이트
* - API 조회 캐시 우선 조회
* - 캐시 미스 DB 조회 캐시 갱신
* - TTL: 마지막 쓰기 이후 N분 자동 만료
*/
@Slf4j
@Component
public class AisTargetCacheManager {
private Cache<Long, AisTargetEntity> cache;
@Value("${app.batch.ais-target-cache.ttl-minutes:5}")
private long ttlMinutes;
@Value("${app.batch.ais-target-cache.max-size:100000}")
private int maxSize;
@PostConstruct
public void init() {
this.cache = Caffeine.newBuilder()
// 최대 캐시 크기 (초과 LRU 방식으로 정리)
.maximumSize(maxSize)
// 마지막 쓰기 이후 TTL (데이터 업데이트 자동 갱신)
.expireAfterWrite(ttlMinutes, TimeUnit.MINUTES)
// 통계 수집 활성화
.recordStats()
// 제거 리스너 (디버깅/모니터링용)
.removalListener((Long key, AisTargetEntity value, RemovalCause cause) -> {
if (cause != RemovalCause.REPLACED) {
log.trace("캐시 제거 - MMSI: {}, 원인: {}", key, cause);
}
})
.build();
log.info("AIS Target Caffeine 캐시 초기화 - TTL: {}분, 최대 크기: {}", ttlMinutes, maxSize);
}
// ==================== 단건 조회/업데이트 ====================
/**
* 캐시에서 최신 위치 조회
*
* @param mmsi MMSI 번호
* @return 캐시된 데이터 (없으면 Optional.empty)
*/
public Optional<AisTargetEntity> get(Long mmsi) {
AisTargetEntity entity = cache.getIfPresent(mmsi);
return Optional.ofNullable(entity);
}
/**
* 캐시에 데이터 저장/업데이트
* - 기존 데이터보다 최신인 경우에만 업데이트
* - 업데이트 TTL 자동 갱신 (expireAfterWrite)
*
* @param entity AIS Target 엔티티
*/
public void put(AisTargetEntity entity) {
if (entity == null || entity.getMmsi() == null) {
return;
}
Long mmsi = entity.getMmsi();
AisTargetEntity existing = cache.getIfPresent(mmsi);
// 기존 데이터보다 최신인 경우에만 업데이트
if (existing == null || isNewer(entity, existing)) {
cache.put(mmsi, entity);
log.trace("캐시 저장 - MMSI: {}", mmsi);
}
}
// ==================== 배치 조회/업데이트 ====================
/**
* 여러 MMSI의 최신 위치 조회
*
* @param mmsiList MMSI 목록
* @return 캐시에서 찾은 데이터 (MMSI -> Entity)
*/
public Map<Long, AisTargetEntity> getAll(List<Long> mmsiList) {
if (mmsiList == null || mmsiList.isEmpty()) {
return Collections.emptyMap();
}
// Caffeine의 getAllPresent는 존재하는 키만 반환
Map<Long, AisTargetEntity> result = cache.getAllPresent(mmsiList);
log.debug("캐시 배치 조회 - 요청: {}, 히트: {}",
mmsiList.size(), result.size());
return result;
}
/**
* 여러 데이터 일괄 저장/업데이트 (배치 Writer에서 호출)
*
* @param entities AIS Target 엔티티 목록
*/
public void putAll(List<AisTargetEntity> entities) {
if (entities == null || entities.isEmpty()) {
return;
}
int updated = 0;
int skipped = 0;
for (AisTargetEntity entity : entities) {
if (entity == null || entity.getMmsi() == null) {
continue;
}
Long mmsi = entity.getMmsi();
AisTargetEntity existing = cache.getIfPresent(mmsi);
// 기존 데이터보다 최신인 경우에만 업데이트
if (existing == null || isNewer(entity, existing)) {
cache.put(mmsi, entity);
updated++;
} else {
skipped++;
}
}
log.debug("캐시 배치 업데이트 - 입력: {}, 업데이트: {}, 스킵: {}, 현재 크기: {}",
entities.size(), updated, skipped, cache.estimatedSize());
}
// ==================== 캐시 관리 ====================
/**
* 특정 MMSI 캐시 삭제
*/
public void evict(Long mmsi) {
cache.invalidate(mmsi);
}
/**
* 여러 MMSI 캐시 삭제
*/
public void evictAll(List<Long> mmsiList) {
cache.invalidateAll(mmsiList);
}
/**
* 전체 캐시 삭제
*/
public void clear() {
long size = cache.estimatedSize();
cache.invalidateAll();
log.info("캐시 전체 삭제 - {} 건", size);
}
/**
* 현재 캐시 크기 (추정값)
*/
public long size() {
return cache.estimatedSize();
}
/**
* 캐시 정리 (만료된 엔트리 즉시 제거)
*/
public void cleanup() {
cache.cleanUp();
}
// ==================== 통계 ====================
/**
* 캐시 통계 조회
*/
public Map<String, Object> getStats() {
CacheStats stats = cache.stats();
Map<String, Object> result = new LinkedHashMap<>();
result.put("estimatedSize", cache.estimatedSize());
result.put("maxSize", maxSize);
result.put("ttlMinutes", ttlMinutes);
result.put("hitCount", stats.hitCount());
result.put("missCount", stats.missCount());
result.put("hitRate", String.format("%.2f%%", stats.hitRate() * 100));
result.put("missRate", String.format("%.2f%%", stats.missRate() * 100));
result.put("evictionCount", stats.evictionCount());
result.put("loadCount", stats.loadCount());
result.put("averageLoadPenalty", String.format("%.2fms", stats.averageLoadPenalty() / 1_000_000.0));
result.put("utilizationPercent", String.format("%.2f%%", (cache.estimatedSize() * 100.0 / maxSize)));
return result;
}
/**
* 상세 통계 조회 (Caffeine CacheStats 원본)
*/
public CacheStats getCacheStats() {
return cache.stats();
}
// ==================== 전체 데이터 조회 (공간 필터링용) ====================
/**
* 캐시의 모든 데이터 조회 (공간 필터링용)
* 주의: 대용량 데이터이므로 신중하게 사용
*
* @return 캐시된 모든 엔티티
*/
public Collection<AisTargetEntity> getAllValues() {
return cache.asMap().values();
}
/**
* 시간 범위 데이터 필터링
*
* @param minutes 최근 N분
* @return 시간 범위 엔티티 목록
*/
public List<AisTargetEntity> getByTimeRange(int minutes) {
java.time.OffsetDateTime threshold = java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC)
.minusMinutes(minutes);
return cache.asMap().values().stream()
.filter(entity -> entity.getMessageTimestamp() != null)
.filter(entity -> entity.getMessageTimestamp().isAfter(threshold))
.collect(java.util.stream.Collectors.toList());
}
// ==================== Private Methods ====================
/**
* 데이터가 기존 데이터보다 최신인지 확인
*/
private boolean isNewer(AisTargetEntity newEntity, AisTargetEntity existing) {
OffsetDateTime newTimestamp = newEntity.getMessageTimestamp();
OffsetDateTime existingTimestamp = existing.getMessageTimestamp();
if (newTimestamp == null) {
return false;
}
if (existingTimestamp == null) {
return true;
}
return newTimestamp.isAfter(existingTimestamp);
}
}

파일 보기

@ -0,0 +1,153 @@
package com.snp.batch.jobs.aistarget.cache;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import com.snp.batch.jobs.aistarget.web.dto.AisTargetFilterRequest;
import com.snp.batch.jobs.aistarget.web.dto.NumericCondition;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Collectors;
/**
* AIS Target 필터링 유틸리티
*
* 캐시 데이터에 대한 조건 필터링 수행
* - SOG, COG, Heading: 숫자 범위 조건
* - Destination: 문자열 부분 일치
* - Status: 다중 선택 일치
*/
@Slf4j
@Component
public class AisTargetFilterUtil {
/**
* 필터 조건에 따라 엔티티 목록 필터링
*
* @param entities 원본 엔티티 목록
* @param request 필터 조건
* @return 필터링된 엔티티 목록
*/
public List<AisTargetEntity> filter(List<AisTargetEntity> entities, AisTargetFilterRequest request) {
if (entities == null || entities.isEmpty()) {
return List.of();
}
if (!request.hasAnyFilter()) {
return entities;
}
long startTime = System.currentTimeMillis();
List<AisTargetEntity> result = entities.parallelStream()
.filter(entity -> matchesSog(entity, request))
.filter(entity -> matchesCog(entity, request))
.filter(entity -> matchesHeading(entity, request))
.filter(entity -> matchesDestination(entity, request))
.filter(entity -> matchesStatus(entity, request))
.collect(Collectors.toList());
long elapsed = System.currentTimeMillis() - startTime;
log.debug("필터링 완료 - 입력: {}, 결과: {}, 소요: {}ms",
entities.size(), result.size(), elapsed);
return result;
}
/**
* SOG (속도) 조건 매칭
*/
private boolean matchesSog(AisTargetEntity entity, AisTargetFilterRequest request) {
if (!request.hasSogFilter()) {
return true; // 필터 없으면 통과
}
NumericCondition condition = NumericCondition.fromString(request.getSogCondition());
if (condition == null) {
return true;
}
return condition.matches(
entity.getSog(),
request.getSogValue(),
request.getSogMin(),
request.getSogMax()
);
}
/**
* COG (침로) 조건 매칭
*/
private boolean matchesCog(AisTargetEntity entity, AisTargetFilterRequest request) {
if (!request.hasCogFilter()) {
return true;
}
NumericCondition condition = NumericCondition.fromString(request.getCogCondition());
if (condition == null) {
return true;
}
return condition.matches(
entity.getCog(),
request.getCogValue(),
request.getCogMin(),
request.getCogMax()
);
}
/**
* Heading (선수방위) 조건 매칭
*/
private boolean matchesHeading(AisTargetEntity entity, AisTargetFilterRequest request) {
if (!request.hasHeadingFilter()) {
return true;
}
NumericCondition condition = NumericCondition.fromString(request.getHeadingCondition());
if (condition == null) {
return true;
}
return condition.matches(
entity.getHeading(),
request.getHeadingValue(),
request.getHeadingMin(),
request.getHeadingMax()
);
}
/**
* Destination (목적지) 조건 매칭 - 부분 일치, 대소문자 무시
*/
private boolean matchesDestination(AisTargetEntity entity, AisTargetFilterRequest request) {
if (!request.hasDestinationFilter()) {
return true;
}
String entityDestination = entity.getDestination();
if (entityDestination == null || entityDestination.isEmpty()) {
return false;
}
return entityDestination.toUpperCase().contains(request.getDestination().toUpperCase().trim());
}
/**
* Status (항행상태) 조건 매칭 - 다중 선택 하나라도 일치
*/
private boolean matchesStatus(AisTargetEntity entity, AisTargetFilterRequest request) {
if (!request.hasStatusFilter()) {
return true;
}
String entityStatus = entity.getStatus();
if (entityStatus == null || entityStatus.isEmpty()) {
return false;
}
// statusList에 포함되어 있으면 통과
return request.getStatusList().stream()
.anyMatch(status -> entityStatus.equalsIgnoreCase(status.trim()));
}
}

파일 보기

@ -0,0 +1,317 @@
package com.snp.batch.jobs.aistarget.cache;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import lombok.extern.slf4j.Slf4j;
import org.locationtech.jts.geom.*;
import org.locationtech.jts.geom.impl.CoordinateArraySequence;
import org.locationtech.jts.operation.distance.DistanceOp;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
* 공간 필터링 유틸리티 (JTS 기반)
*
* 지원 기능:
* - 원형 범위 선박 필터링 (Point + Radius)
* - 폴리곤 범위 선박 필터링 (Polygon)
* - 거리 계산 (Haversine 공식 - 지구 곡률 고려)
*
* 성능:
* - 25만 필터링: 50-100ms (병렬 처리 )
* - 단순 거리 계산은 JTS 없이 Haversine으로 처리 ( 빠름)
* - 복잡한 폴리곤은 JTS 사용
*/
@Slf4j
@Component
public class SpatialFilterUtil {
private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory(new PrecisionModel(), 4326);
// 지구 반경 (미터)
private static final double EARTH_RADIUS_METERS = 6_371_000;
// ==================== 원형 범위 필터링 ====================
/**
* 원형 범위 선박 필터링 (Haversine 거리 계산 - 빠름)
*
* @param entities 전체 엔티티 목록
* @param centerLon 중심 경도
* @param centerLat 중심 위도
* @param radiusMeters 반경 (미터)
* @return 범위 엔티티 목록
*/
public List<AisTargetEntity> filterByCircle(
Collection<AisTargetEntity> entities,
double centerLon,
double centerLat,
double radiusMeters) {
if (entities == null || entities.isEmpty()) {
return new ArrayList<>();
}
long startTime = System.currentTimeMillis();
// 병렬 스트림으로 필터링 (대용량 데이터 최적화)
List<AisTargetEntity> result = entities.parallelStream()
.filter(entity -> entity.getLat() != null && entity.getLon() != null)
.filter(entity -> {
double distance = haversineDistance(
centerLat, centerLon,
entity.getLat(), entity.getLon()
);
return distance <= radiusMeters;
})
.collect(Collectors.toList());
long elapsed = System.currentTimeMillis() - startTime;
log.debug("원형 필터링 완료 - 입력: {}, 결과: {}, 소요: {}ms",
entities.size(), result.size(), elapsed);
return result;
}
/**
* 원형 범위 선박 필터링 + 거리 정보 포함
*/
public List<EntityWithDistance> filterByCircleWithDistance(
Collection<AisTargetEntity> entities,
double centerLon,
double centerLat,
double radiusMeters) {
if (entities == null || entities.isEmpty()) {
return new ArrayList<>();
}
return entities.parallelStream()
.filter(entity -> entity.getLat() != null && entity.getLon() != null)
.map(entity -> {
double distance = haversineDistance(
centerLat, centerLon,
entity.getLat(), entity.getLon()
);
return new EntityWithDistance(entity, distance);
})
.filter(ewd -> ewd.getDistanceMeters() <= radiusMeters)
.sorted((a, b) -> Double.compare(a.getDistanceMeters(), b.getDistanceMeters()))
.collect(Collectors.toList());
}
// ==================== 폴리곤 범위 필터링 ====================
/**
* 폴리곤 범위 선박 필터링 (JTS 사용)
*
* @param entities 전체 엔티티 목록
* @param polygonCoordinates 폴리곤 좌표 [[lon, lat], [lon, lat], ...] (닫힌 형태)
* @return 범위 엔티티 목록
*/
public List<AisTargetEntity> filterByPolygon(
Collection<AisTargetEntity> entities,
double[][] polygonCoordinates) {
if (entities == null || entities.isEmpty()) {
return new ArrayList<>();
}
if (polygonCoordinates == null || polygonCoordinates.length < 4) {
log.warn("유효하지 않은 폴리곤 좌표 (최소 4개 점 필요)");
return new ArrayList<>();
}
long startTime = System.currentTimeMillis();
// JTS Polygon 생성
Polygon polygon = createPolygon(polygonCoordinates);
if (polygon == null || !polygon.isValid()) {
log.warn("유효하지 않은 폴리곤");
return new ArrayList<>();
}
// 병렬 스트림으로 필터링
List<AisTargetEntity> result = entities.parallelStream()
.filter(entity -> entity.getLat() != null && entity.getLon() != null)
.filter(entity -> {
Point point = createPoint(entity.getLon(), entity.getLat());
return polygon.contains(point);
})
.collect(Collectors.toList());
long elapsed = System.currentTimeMillis() - startTime;
log.debug("폴리곤 필터링 완료 - 입력: {}, 결과: {}, 소요: {}ms",
entities.size(), result.size(), elapsed);
return result;
}
/**
* WKT(Well-Known Text) 형식 폴리곤으로 필터링
*
* @param entities 전체 엔티티 목록
* @param wkt WKT 문자열 (: "POLYGON((129 35, 130 35, 130 36, 129 36, 129 35))")
* @return 범위 엔티티 목록
*/
public List<AisTargetEntity> filterByWkt(
Collection<AisTargetEntity> entities,
String wkt) {
if (entities == null || entities.isEmpty()) {
return new ArrayList<>();
}
try {
Geometry geometry = new org.locationtech.jts.io.WKTReader(GEOMETRY_FACTORY).read(wkt);
return entities.parallelStream()
.filter(entity -> entity.getLat() != null && entity.getLon() != null)
.filter(entity -> {
Point point = createPoint(entity.getLon(), entity.getLat());
return geometry.contains(point);
})
.collect(Collectors.toList());
} catch (Exception e) {
log.error("WKT 파싱 실패: {}", wkt, e);
return new ArrayList<>();
}
}
// ==================== GeoJSON 지원 ====================
/**
* GeoJSON 형식 폴리곤으로 필터링
*
* @param entities 전체 엔티티 목록
* @param geoJsonCoordinates GeoJSON coordinates 배열 [[[lon, lat], ...]]
* @return 범위 엔티티 목록
*/
public List<AisTargetEntity> filterByGeoJson(
Collection<AisTargetEntity> entities,
double[][][] geoJsonCoordinates) {
if (geoJsonCoordinates == null || geoJsonCoordinates.length == 0) {
return new ArrayList<>();
}
// GeoJSON의 번째 (외부 경계)
return filterByPolygon(entities, geoJsonCoordinates[0]);
}
// ==================== 거리 계산 ====================
/**
* Haversine 공식을 사용한 지점 거리 계산 (미터)
* 지구 곡률을 고려한 정확한 거리 계산
*/
public double haversineDistance(double lat1, double lon1, double lat2, double lon2) {
double dLat = Math.toRadians(lat2 - lat1);
double dLon = Math.toRadians(lon2 - lon1);
double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
+ Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
* Math.sin(dLon / 2) * Math.sin(dLon / 2);
double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
return EARTH_RADIUS_METERS * c;
}
/**
* 엔티티 거리 계산 (미터)
*/
public double calculateDistance(AisTargetEntity entity1, AisTargetEntity entity2) {
if (entity1.getLat() == null || entity1.getLon() == null ||
entity2.getLat() == null || entity2.getLon() == null) {
return Double.MAX_VALUE;
}
return haversineDistance(
entity1.getLat(), entity1.getLon(),
entity2.getLat(), entity2.getLon()
);
}
// ==================== JTS 헬퍼 메서드 ====================
/**
* JTS Point 생성
*/
public Point createPoint(double lon, double lat) {
return GEOMETRY_FACTORY.createPoint(new Coordinate(lon, lat));
}
/**
* JTS Polygon 생성
*/
public Polygon createPolygon(double[][] coordinates) {
try {
Coordinate[] coords = new Coordinate[coordinates.length];
for (int i = 0; i < coordinates.length; i++) {
coords[i] = new Coordinate(coordinates[i][0], coordinates[i][1]);
}
// 폴리곤이 닫혀있지 않으면 닫기
if (!coords[0].equals(coords[coords.length - 1])) {
Coordinate[] closedCoords = new Coordinate[coords.length + 1];
System.arraycopy(coords, 0, closedCoords, 0, coords.length);
closedCoords[coords.length] = coords[0];
coords = closedCoords;
}
LinearRing ring = GEOMETRY_FACTORY.createLinearRing(coords);
return GEOMETRY_FACTORY.createPolygon(ring);
} catch (Exception e) {
log.error("폴리곤 생성 실패", e);
return null;
}
}
/**
* 원형 폴리곤 생성 (근사치)
*
* @param centerLon 중심 경도
* @param centerLat 중심 위도
* @param radiusMeters 반경 (미터)
* @param numPoints 폴리곤 개수 (기본: 64)
*/
public Polygon createCirclePolygon(double centerLon, double centerLat, double radiusMeters, int numPoints) {
Coordinate[] coords = new Coordinate[numPoints + 1];
for (int i = 0; i < numPoints; i++) {
double angle = (2 * Math.PI * i) / numPoints;
// 위도/경도 변환 (근사치)
double dLat = (radiusMeters / EARTH_RADIUS_METERS) * (180 / Math.PI);
double dLon = dLat / Math.cos(Math.toRadians(centerLat));
double lat = centerLat + dLat * Math.sin(angle);
double lon = centerLon + dLon * Math.cos(angle);
coords[i] = new Coordinate(lon, lat);
}
coords[numPoints] = coords[0]; // 닫기
LinearRing ring = GEOMETRY_FACTORY.createLinearRing(coords);
return GEOMETRY_FACTORY.createPolygon(ring);
}
// ==================== 내부 클래스 ====================
/**
* 엔티티 + 거리 정보
*/
@lombok.Data
@lombok.AllArgsConstructor
public static class EntityWithDistance {
private AisTargetEntity entity;
private double distanceMeters;
}
}

파일 보기

@ -0,0 +1,393 @@
package com.snp.batch.jobs.aistarget.web.controller;
import com.snp.batch.common.web.ApiResponse;
import com.snp.batch.jobs.aistarget.web.dto.AisTargetFilterRequest;
import com.snp.batch.jobs.aistarget.web.dto.AisTargetResponseDto;
import com.snp.batch.jobs.aistarget.web.dto.AisTargetSearchRequest;
import com.snp.batch.jobs.aistarget.web.service.AisTargetService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import jakarta.validation.Valid;
import java.util.List;
import java.util.Map;
/**
* AIS Target REST API Controller
*
* 캐시 우선 조회 전략:
* - 캐시에서 먼저 조회
* - 캐시 미스 DB 조회 캐시 업데이트
*/
@Slf4j
@RestController
@RequestMapping("/api/ais-target")
@RequiredArgsConstructor
@Tag(name = "AIS Target", description = "AIS 선박 위치 정보 API")
public class AisTargetController {
private final AisTargetService aisTargetService;
// ==================== 단건 조회 ====================
@Operation(
summary = "MMSI로 최신 위치 조회",
description = "특정 MMSI의 최신 위치 정보를 조회합니다 (캐시 우선)"
)
@GetMapping("/{mmsi}")
public ResponseEntity<ApiResponse<AisTargetResponseDto>> getLatestByMmsi(
@Parameter(description = "MMSI 번호", required = true, example = "440123456")
@PathVariable Long mmsi) {
log.info("최신 위치 조회 요청 - MMSI: {}", mmsi);
return aisTargetService.findLatestByMmsi(mmsi)
.map(dto -> ResponseEntity.ok(ApiResponse.success(dto)))
.orElse(ResponseEntity.notFound().build());
}
// ==================== 다건 조회 ====================
@Operation(
summary = "여러 MMSI의 최신 위치 조회",
description = "여러 MMSI의 최신 위치 정보를 일괄 조회합니다 (캐시 우선)"
)
@PostMapping("/batch")
public ResponseEntity<ApiResponse<List<AisTargetResponseDto>>> getLatestByMmsiList(
@Parameter(description = "MMSI 번호 목록", required = true)
@RequestBody List<Long> mmsiList) {
log.info("다건 최신 위치 조회 요청 - 요청 수: {}", mmsiList.size());
List<AisTargetResponseDto> result = aisTargetService.findLatestByMmsiList(mmsiList);
return ResponseEntity.ok(ApiResponse.success(
"조회 완료: " + result.size() + "/" + mmsiList.size() + "",
result
));
}
// ==================== 검색 조회 ====================
@Operation(
summary = "시간/공간 범위로 선박 검색",
description = """
시간 범위 (필수) + 공간 범위 (옵션) 선박을 검색합니다.
- minutes: 조회 범위 (, 필수)
- centerLon, centerLat: 중심 좌표 (옵션)
- radiusMeters: 반경 (미터, 옵션)
공간 범위가 지정되지 않으면 전체 선박의 최신 위치를 반환합니다.
"""
)
@GetMapping("/search")
public ResponseEntity<ApiResponse<List<AisTargetResponseDto>>> search(
@Parameter(description = "조회 범위 (분)", required = true, example = "5")
@RequestParam Integer minutes,
@Parameter(description = "중심 경도", example = "129.0")
@RequestParam(required = false) Double centerLon,
@Parameter(description = "중심 위도", example = "35.0")
@RequestParam(required = false) Double centerLat,
@Parameter(description = "반경 (미터)", example = "50000")
@RequestParam(required = false) Double radiusMeters) {
log.info("선박 검색 요청 - minutes: {}, center: ({}, {}), radius: {}",
minutes, centerLon, centerLat, radiusMeters);
AisTargetSearchRequest request = AisTargetSearchRequest.builder()
.minutes(minutes)
.centerLon(centerLon)
.centerLat(centerLat)
.radiusMeters(radiusMeters)
.build();
List<AisTargetResponseDto> result = aisTargetService.search(request);
return ResponseEntity.ok(ApiResponse.success(
"검색 완료: " + result.size() + "",
result
));
}
@Operation(
summary = "시간/공간 범위로 선박 검색 (POST)",
description = "POST 방식으로 검색 조건을 전달합니다"
)
@PostMapping("/search")
public ResponseEntity<ApiResponse<List<AisTargetResponseDto>>> searchPost(
@Valid @RequestBody AisTargetSearchRequest request) {
log.info("선박 검색 요청 (POST) - minutes: {}, hasArea: {}",
request.getMinutes(), request.hasAreaFilter());
List<AisTargetResponseDto> result = aisTargetService.search(request);
return ResponseEntity.ok(ApiResponse.success(
"검색 완료: " + result.size() + "",
result
));
}
// ==================== 조건 필터 검색 ====================
@Operation(
summary = "항해 조건 필터 검색",
description = """
속도(SOG), 침로(COG), 선수방위(Heading), 목적지, 항행상태로 선박을 필터링합니다.
---
## 조건 타입 파라미터 사용법
| 조건 | 의미 | 사용 파라미터 |
|------|------|--------------|
| GTE | 이상 (>=) | *Value (: sogValue) |
| GT | 초과 (>) | *Value |
| LTE | 이하 (<=) | *Value |
| LT | 미만 (<) | *Value |
| BETWEEN | 범위 | *Min, *Max (: sogMin, sogMax) |
---
## 요청 예시
**예시 1: 단일 조건 (속도 10knots 이상)**
```json
{
"minutes": 5,
"sogCondition": "GTE",
"sogValue": 10.0
}
```
**예시 2: 범위 조건 (속도 5~15knots, 침로 90~180도)**
```json
{
"minutes": 5,
"sogCondition": "BETWEEN",
"sogMin": 5.0,
"sogMax": 15.0,
"cogCondition": "BETWEEN",
"cogMin": 90.0,
"cogMax": 180.0
}
```
**예시 3: 복합 조건**
```json
{
"minutes": 5,
"sogCondition": "GTE",
"sogValue": 10.0,
"cogCondition": "BETWEEN",
"cogMin": 90.0,
"cogMax": 180.0,
"headingCondition": "LT",
"headingValue": 180.0,
"destination": "BUSAN",
"statusList": ["0", "1", "5"]
}
```
---
## 항행상태 코드 (statusList)
| 코드 | 상태 |
|------|------|
| 0 | Under way using engine (기관 사용 항해 ) |
| 1 | At anchor (정박 ) |
| 2 | Not under command (조종불능) |
| 3 | Restricted manoeuverability (조종제한) |
| 4 | Constrained by her draught (흘수제약) |
| 5 | Moored (계류 ) |
| 6 | Aground (좌초) |
| 7 | Engaged in Fishing (어로 ) |
| 8 | Under way sailing ( 항해 ) |
| 9-10 | Reserved for future use |
| 11 | Power-driven vessel towing astern |
| 12 | Power-driven vessel pushing ahead |
| 13 | Reserved for future use |
| 14 | AIS-SART, MOB-AIS, EPIRB-AIS |
| 15 | Undefined (default) |
---
**참고:** 모든 필터는 선택사항이며, 미지정 해당 필드는 조건에서 제외됩니다 (전체 포함).
"""
)
@PostMapping("/search/filter")
public ResponseEntity<ApiResponse<List<AisTargetResponseDto>>> searchByFilter(
@Valid @RequestBody AisTargetFilterRequest request) {
log.info("필터 검색 요청 - minutes: {}, sog: {}/{}, cog: {}/{}, heading: {}/{}, dest: {}, status: {}",
request.getMinutes(),
request.getSogCondition(), request.getSogValue(),
request.getCogCondition(), request.getCogValue(),
request.getHeadingCondition(), request.getHeadingValue(),
request.getDestination(),
request.getStatusList());
List<AisTargetResponseDto> result = aisTargetService.searchByFilter(request);
return ResponseEntity.ok(ApiResponse.success(
"필터 검색 완료: " + result.size() + "",
result
));
}
// ==================== 폴리곤 검색 ====================
@Operation(
summary = "폴리곤 범위 내 선박 검색",
description = """
폴리곤 범위 선박을 검색합니다.
요청 예시:
{
"minutes": 5,
"coordinates": [[129.0, 35.0], [130.0, 35.0], [130.0, 36.0], [129.0, 36.0], [129.0, 35.0]]
}
좌표는 [경도, 위도] 순서이며, 폴리곤은 닫힌 형태여야 합니다 (첫점 = 끝점).
"""
)
@PostMapping("/search/polygon")
public ResponseEntity<ApiResponse<List<AisTargetResponseDto>>> searchByPolygon(
@RequestBody PolygonSearchRequest request) {
log.info("폴리곤 검색 요청 - minutes: {}, points: {}",
request.getMinutes(), request.getCoordinates().length);
List<AisTargetResponseDto> result = aisTargetService.searchByPolygon(
request.getMinutes(),
request.getCoordinates()
);
return ResponseEntity.ok(ApiResponse.success(
"폴리곤 검색 완료: " + result.size() + "",
result
));
}
@Operation(
summary = "WKT 범위 내 선박 검색",
description = """
WKT(Well-Known Text) 형식으로 정의된 범위 선박을 검색합니다.
요청 예시:
{
"minutes": 5,
"wkt": "POLYGON((129 35, 130 35, 130 36, 129 36, 129 35))"
}
지원 형식: POLYGON, MULTIPOLYGON
"""
)
@PostMapping("/search/wkt")
public ResponseEntity<ApiResponse<List<AisTargetResponseDto>>> searchByWkt(
@RequestBody WktSearchRequest request) {
log.info("WKT 검색 요청 - minutes: {}, wkt: {}", request.getMinutes(), request.getWkt());
List<AisTargetResponseDto> result = aisTargetService.searchByWkt(
request.getMinutes(),
request.getWkt()
);
return ResponseEntity.ok(ApiResponse.success(
"WKT 검색 완료: " + result.size() + "",
result
));
}
@Operation(
summary = "거리 포함 원형 범위 검색",
description = """
원형 범위 선박을 검색하고, 중심점으로부터의 거리 정보를 함께 반환합니다.
결과는 거리순으로 정렬됩니다.
"""
)
@GetMapping("/search/with-distance")
public ResponseEntity<ApiResponse<List<AisTargetService.AisTargetWithDistanceDto>>> searchWithDistance(
@Parameter(description = "조회 범위 (분)", required = true, example = "5")
@RequestParam Integer minutes,
@Parameter(description = "중심 경도", required = true, example = "129.0")
@RequestParam Double centerLon,
@Parameter(description = "중심 위도", required = true, example = "35.0")
@RequestParam Double centerLat,
@Parameter(description = "반경 (미터)", required = true, example = "50000")
@RequestParam Double radiusMeters) {
log.info("거리 포함 검색 요청 - minutes: {}, center: ({}, {}), radius: {}",
minutes, centerLon, centerLat, radiusMeters);
List<AisTargetService.AisTargetWithDistanceDto> result =
aisTargetService.searchWithDistance(minutes, centerLon, centerLat, radiusMeters);
return ResponseEntity.ok(ApiResponse.success(
"거리 포함 검색 완료: " + result.size() + "",
result
));
}
// ==================== 항적 조회 ====================
@Operation(
summary = "항적 조회",
description = "특정 MMSI의 시간 범위 내 항적 (위치 이력)을 조회합니다"
)
@GetMapping("/{mmsi}/track")
public ResponseEntity<ApiResponse<List<AisTargetResponseDto>>> getTrack(
@Parameter(description = "MMSI 번호", required = true, example = "440123456")
@PathVariable Long mmsi,
@Parameter(description = "조회 범위 (분)", required = true, example = "60")
@RequestParam Integer minutes) {
log.info("항적 조회 요청 - MMSI: {}, 범위: {}분", mmsi, minutes);
List<AisTargetResponseDto> track = aisTargetService.getTrack(mmsi, minutes);
return ResponseEntity.ok(ApiResponse.success(
"항적 조회 완료: " + track.size() + " 포인트",
track
));
}
// ==================== 캐시 관리 ====================
@Operation(
summary = "캐시 통계 조회",
description = "AIS Target 캐시의 현재 상태를 조회합니다"
)
@GetMapping("/cache/stats")
public ResponseEntity<ApiResponse<Map<String, Object>>> getCacheStats() {
Map<String, Object> stats = aisTargetService.getCacheStats();
return ResponseEntity.ok(ApiResponse.success(stats));
}
@Operation(
summary = "캐시 초기화",
description = "AIS Target 캐시를 초기화합니다"
)
@DeleteMapping("/cache")
public ResponseEntity<ApiResponse<Void>> clearCache() {
log.warn("캐시 초기화 요청");
aisTargetService.clearCache();
return ResponseEntity.ok(ApiResponse.success("캐시가 초기화되었습니다", null));
}
// ==================== 요청 DTO (내부 클래스) ====================
/**
* 폴리곤 검색 요청 DTO
*/
@lombok.Data
public static class PolygonSearchRequest {
@Parameter(description = "조회 범위 (분)", required = true, example = "5")
private int minutes;
@Parameter(description = "폴리곤 좌표 [[lon, lat], ...]", required = true)
private double[][] coordinates;
}
/**
* WKT 검색 요청 DTO
*/
@lombok.Data
public static class WktSearchRequest {
@Parameter(description = "조회 범위 (분)", required = true, example = "5")
private int minutes;
@Parameter(description = "WKT 문자열", required = true,
example = "POLYGON((129 35, 130 35, 130 36, 129 36, 129 35))")
private String wkt;
}
}

파일 보기

@ -0,0 +1,150 @@
package com.snp.batch.jobs.aistarget.web.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import java.util.List;
/**
* AIS Target 필터 검색 요청 DTO
*
* 조건 타입 (condition):
* - GTE: 이상 (>=)
* - GT: 초과 (>)
* - LTE: 이하 (<=)
* - LT: 미만 (<)
* - BETWEEN: 범위 (min <= value <= max)
*
* 모든 필터는 선택사항이며, 미지정 해당 필드 전체 포함
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "AIS Target 필터 검색 요청")
public class AisTargetFilterRequest {
@NotNull(message = "minutes는 필수입니다")
@Min(value = 1, message = "minutes는 최소 1분 이상이어야 합니다")
@Schema(description = "조회 범위 (분)", example = "5", requiredMode = Schema.RequiredMode.REQUIRED)
private Integer minutes;
// ==================== 속도 (SOG) 필터 ====================
@Schema(description = """
속도(SOG) 조건 타입
- GTE: 이상 (>=) - sogValue 사용
- GT: 초과 (>) - sogValue 사용
- LTE: 이하 (<=) - sogValue 사용
- LT: 미만 (<) - sogValue 사용
- BETWEEN: 범위 - sogMin, sogMax 사용
""",
example = "GTE", allowableValues = {"GTE", "GT", "LTE", "LT", "BETWEEN"})
private String sogCondition;
@Schema(description = "속도 값 (knots) - GTE/GT/LTE/LT 조건에서 사용", example = "10.0")
private Double sogValue;
@Schema(description = "속도 최소값 (knots) - BETWEEN 조건에서 사용 (sogMin <= 속도 <= sogMax)", example = "5.0")
private Double sogMin;
@Schema(description = "속도 최대값 (knots) - BETWEEN 조건에서 사용 (sogMin <= 속도 <= sogMax)", example = "15.0")
private Double sogMax;
// ==================== 침로 (COG) 필터 ====================
@Schema(description = """
침로(COG) 조건 타입
- GTE: 이상 (>=) - cogValue 사용
- GT: 초과 (>) - cogValue 사용
- LTE: 이하 (<=) - cogValue 사용
- LT: 미만 (<) - cogValue 사용
- BETWEEN: 범위 - cogMin, cogMax 사용
""",
example = "BETWEEN", allowableValues = {"GTE", "GT", "LTE", "LT", "BETWEEN"})
private String cogCondition;
@Schema(description = "침로 값 (degrees, 0-360) - GTE/GT/LTE/LT 조건에서 사용", example = "180.0")
private Double cogValue;
@Schema(description = "침로 최소값 (degrees) - BETWEEN 조건에서 사용 (cogMin <= 침로 <= cogMax)", example = "90.0")
private Double cogMin;
@Schema(description = "침로 최대값 (degrees) - BETWEEN 조건에서 사용 (cogMin <= 침로 <= cogMax)", example = "270.0")
private Double cogMax;
// ==================== 선수방위 (Heading) 필터 ====================
@Schema(description = """
선수방위(Heading) 조건 타입
- GTE: 이상 (>=) - headingValue 사용
- GT: 초과 (>) - headingValue 사용
- LTE: 이하 (<=) - headingValue 사용
- LT: 미만 (<) - headingValue 사용
- BETWEEN: 범위 - headingMin, headingMax 사용
""",
example = "LTE", allowableValues = {"GTE", "GT", "LTE", "LT", "BETWEEN"})
private String headingCondition;
@Schema(description = "선수방위 값 (degrees, 0-360) - GTE/GT/LTE/LT 조건에서 사용", example = "90.0")
private Double headingValue;
@Schema(description = "선수방위 최소값 (degrees) - BETWEEN 조건에서 사용 (headingMin <= 선수방위 <= headingMax)", example = "0.0")
private Double headingMin;
@Schema(description = "선수방위 최대값 (degrees) - BETWEEN 조건에서 사용 (headingMin <= 선수방위 <= headingMax)", example = "180.0")
private Double headingMax;
// ==================== 목적지 (Destination) 필터 ====================
@Schema(description = "목적지 (부분 일치, 대소문자 무시)", example = "BUSAN")
private String destination;
// ==================== 항행상태 (Status) 필터 ====================
@Schema(description = """
항행상태 목록 (다중 선택 가능, 미선택 전체)
- Under way using engine (기관 사용 항해 )
- Under way sailing ( 항해 )
- Anchored (정박 )
- Moored (계류 )
- Not under command (조종불능)
- Restriced manoeuverability (조종제한)
- Constrained by draught (흘수제약)
- Aground (좌초)
- Engaged in fishing (어로 )
- Power Driven Towing Astern (예인선-후방)
- Power Driven Towing Alongside (예인선-측방)
- AIS Sart (비상위치지시기)
- N/A (정보없음)
""",
example = "[\"Under way using engine\", \"Anchored\", \"Moored\"]")
private List<String> statusList;
// ==================== 필터 존재 여부 확인 ====================
public boolean hasSogFilter() {
return sogCondition != null && !sogCondition.isEmpty();
}
public boolean hasCogFilter() {
return cogCondition != null && !cogCondition.isEmpty();
}
public boolean hasHeadingFilter() {
return headingCondition != null && !headingCondition.isEmpty();
}
public boolean hasDestinationFilter() {
return destination != null && !destination.trim().isEmpty();
}
public boolean hasStatusFilter() {
return statusList != null && !statusList.isEmpty();
}
public boolean hasAnyFilter() {
return hasSogFilter() || hasCogFilter() || hasHeadingFilter()
|| hasDestinationFilter() || hasStatusFilter();
}
}

파일 보기

@ -0,0 +1,87 @@
package com.snp.batch.jobs.aistarget.web.dto;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.OffsetDateTime;
/**
* AIS Target API 응답 DTO
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class AisTargetResponseDto {
// 선박 식별 정보
private Long mmsi;
private Long imo;
private String name;
private String callsign;
private String vesselType;
// 위치 정보
private Double lat;
private Double lon;
// 항해 정보
private Double heading;
private Double sog; // Speed over Ground
private Double cog; // Course over Ground
private Integer rot; // Rate of Turn
// 선박 제원
private Integer length;
private Integer width;
private Double draught;
// 목적지 정보
private String destination;
private OffsetDateTime eta;
private String status;
// 타임스탬프
private OffsetDateTime messageTimestamp;
private OffsetDateTime receivedDate;
// 데이터 소스 (캐시/DB)
private String source;
/**
* Entity -> DTO 변환
*/
public static AisTargetResponseDto from(AisTargetEntity entity, String source) {
if (entity == null) {
return null;
}
return AisTargetResponseDto.builder()
.mmsi(entity.getMmsi())
.imo(entity.getImo())
.name(entity.getName())
.callsign(entity.getCallsign())
.vesselType(entity.getVesselType())
.lat(entity.getLat())
.lon(entity.getLon())
.heading(entity.getHeading())
.sog(entity.getSog())
.cog(entity.getCog())
.rot(entity.getRot())
.length(entity.getLength())
.width(entity.getWidth())
.draught(entity.getDraught())
.destination(entity.getDestination())
.eta(entity.getEta())
.status(entity.getStatus())
.messageTimestamp(entity.getMessageTimestamp())
.receivedDate(entity.getReceivedDate())
.source(source)
.build();
}
}

파일 보기

@ -0,0 +1,48 @@
package com.snp.batch.jobs.aistarget.web.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
/**
* AIS Target 검색 요청 DTO
*
* 필수 파라미터:
* - minutes: 단위 조회 범위 (1~60)
*
* 옵션 파라미터:
* - centerLon, centerLat, radiusMeters: 공간 범위 필터
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "AIS Target 검색 요청")
public class AisTargetSearchRequest {
@NotNull(message = "minutes는 필수입니다")
@Min(value = 1, message = "minutes는 최소 1분 이상이어야 합니다")
@Schema(description = "조회 범위 (분)", example = "5", required = true)
private Integer minutes;
@Schema(description = "중심 경도 (옵션)", example = "129.0")
private Double centerLon;
@Schema(description = "중심 위도 (옵션)", example = "35.0")
private Double centerLat;
@Schema(description = "반경 (미터, 옵션)", example = "50000")
private Double radiusMeters;
/**
* 공간 필터 사용 여부
*/
public boolean hasAreaFilter() {
return centerLon != null && centerLat != null && radiusMeters != null;
}
}

파일 보기

@ -0,0 +1,90 @@
package com.snp.batch.jobs.aistarget.web.dto;
/**
* 숫자 비교 조건 열거형
*
* 사용: SOG, COG, Heading 필터링
*/
public enum NumericCondition {
/**
* 이상 (>=)
*/
GTE,
/**
* 초과 (>)
*/
GT,
/**
* 이하 (<=)
*/
LTE,
/**
* 미만 (<)
*/
LT,
/**
* 범위 (min <= value <= max)
*/
BETWEEN;
/**
* 문자열을 NumericCondition으로 변환
*
* @param value 조건 문자열
* @return NumericCondition (null이면 null 반환)
*/
public static NumericCondition fromString(String value) {
if (value == null || value.trim().isEmpty()) {
return null;
}
try {
return NumericCondition.valueOf(value.toUpperCase().trim());
} catch (IllegalArgumentException e) {
return null;
}
}
/**
* 주어진 값이 조건을 만족하는지 확인
*
* @param fieldValue 필드
* @param compareValue 비교 (GTE, GT, LTE, LT용)
* @param minValue 최소값 (BETWEEN용)
* @param maxValue 최대값 (BETWEEN용)
* @return 조건 만족 여부
*/
public boolean matches(Double fieldValue, Double compareValue, Double minValue, Double maxValue) {
if (fieldValue == null) {
return false;
}
return switch (this) {
case GTE -> compareValue != null && fieldValue >= compareValue;
case GT -> compareValue != null && fieldValue > compareValue;
case LTE -> compareValue != null && fieldValue <= compareValue;
case LT -> compareValue != null && fieldValue < compareValue;
case BETWEEN -> minValue != null && maxValue != null
&& fieldValue >= minValue && fieldValue <= maxValue;
};
}
/**
* SQL 조건절 생성 (DB 쿼리용)
*
* @param columnName 컬럼명
* @return SQL 조건절 문자열
*/
public String toSqlCondition(String columnName) {
return switch (this) {
case GTE -> columnName + " >= ?";
case GT -> columnName + " > ?";
case LTE -> columnName + " <= ?";
case LT -> columnName + " < ?";
case BETWEEN -> columnName + " BETWEEN ? AND ?";
};
}
}

파일 보기

@ -0,0 +1,384 @@
package com.snp.batch.jobs.aistarget.web.service;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import com.snp.batch.jobs.aistarget.batch.repository.AisTargetRepository;
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
import com.snp.batch.jobs.aistarget.cache.AisTargetFilterUtil;
import com.snp.batch.jobs.aistarget.cache.SpatialFilterUtil;
import com.snp.batch.jobs.aistarget.web.dto.AisTargetFilterRequest;
import com.snp.batch.jobs.aistarget.web.dto.AisTargetResponseDto;
import com.snp.batch.jobs.aistarget.web.dto.AisTargetSearchRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.stream.Collectors;
/**
* AIS Target 서비스
*
* 조회 전략:
* 1. 캐시 우선 조회 (Caffeine 캐시)
* 2. 캐시 미스 DB Fallback
* 3. 공간 필터링은 캐시에서 수행 (JTS 기반)
*
* 성능:
* - 캐시 조회: O(1)
* - 공간 필터링: O(n) with 병렬 처리 (25만건 ~50-100ms)
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class AisTargetService {
private final AisTargetRepository aisTargetRepository;
private final AisTargetCacheManager cacheManager;
private final SpatialFilterUtil spatialFilterUtil;
private final AisTargetFilterUtil filterUtil;
private static final String SOURCE_CACHE = "cache";
private static final String SOURCE_DB = "db";
// ==================== 단건 조회 ====================
/**
* MMSI로 최신 위치 조회 (캐시 우선)
*/
public Optional<AisTargetResponseDto> findLatestByMmsi(Long mmsi) {
log.debug("최신 위치 조회 - MMSI: {}", mmsi);
// 1. 캐시 조회
Optional<AisTargetEntity> cached = cacheManager.get(mmsi);
if (cached.isPresent()) {
log.debug("캐시 히트 - MMSI: {}", mmsi);
return Optional.of(AisTargetResponseDto.from(cached.get(), SOURCE_CACHE));
}
// 2. DB 조회 (캐시 미스)
log.debug("캐시 미스, DB 조회 - MMSI: {}", mmsi);
Optional<AisTargetEntity> fromDb = aisTargetRepository.findLatestByMmsi(mmsi);
if (fromDb.isPresent()) {
// 3. 캐시 업데이트
cacheManager.put(fromDb.get());
log.debug("DB 조회 성공, 캐시 업데이트 - MMSI: {}", mmsi);
return Optional.of(AisTargetResponseDto.from(fromDb.get(), SOURCE_DB));
}
return Optional.empty();
}
// ==================== 다건 조회 ====================
/**
* 여러 MMSI의 최신 위치 조회 (캐시 우선)
*/
public List<AisTargetResponseDto> findLatestByMmsiList(List<Long> mmsiList) {
if (mmsiList == null || mmsiList.isEmpty()) {
return Collections.emptyList();
}
log.debug("다건 최신 위치 조회 - 요청: {} 건", mmsiList.size());
List<AisTargetResponseDto> result = new ArrayList<>();
// 1. 캐시에서 조회
Map<Long, AisTargetEntity> cachedData = cacheManager.getAll(mmsiList);
for (AisTargetEntity entity : cachedData.values()) {
result.add(AisTargetResponseDto.from(entity, SOURCE_CACHE));
}
// 2. 캐시 미스 목록
List<Long> missedMmsiList = mmsiList.stream()
.filter(mmsi -> !cachedData.containsKey(mmsi))
.collect(Collectors.toList());
// 3. DB에서 캐시 미스 데이터 조회
if (!missedMmsiList.isEmpty()) {
log.debug("캐시 미스 DB 조회 - {} 건", missedMmsiList.size());
List<AisTargetEntity> fromDb = aisTargetRepository.findLatestByMmsiIn(missedMmsiList);
for (AisTargetEntity entity : fromDb) {
// 캐시 업데이트
cacheManager.put(entity);
result.add(AisTargetResponseDto.from(entity, SOURCE_DB));
}
}
log.debug("조회 완료 - 캐시: {}, DB: {}, 총: {}",
cachedData.size(), result.size() - cachedData.size(), result.size());
return result;
}
// ==================== 검색 조회 (캐시 기반) ====================
/**
* 시간 범위 + 옵션 공간 범위로 선박 검색 (캐시 우선)
*
* 전략:
* 1. 캐시에서 시간 범위 데이터 조회
* 2. 공간 필터 있으면 JTS로 필터링
* 3. 캐시 데이터가 없으면 DB Fallback
*/
public List<AisTargetResponseDto> search(AisTargetSearchRequest request) {
log.debug("선박 검색 - minutes: {}, hasArea: {}",
request.getMinutes(), request.hasAreaFilter());
long startTime = System.currentTimeMillis();
// 1. 캐시에서 시간 범위 데이터 조회
List<AisTargetEntity> entities = cacheManager.getByTimeRange(request.getMinutes());
String source = SOURCE_CACHE;
// 캐시가 비어있으면 DB Fallback
if (entities.isEmpty()) {
log.debug("캐시 비어있음, DB Fallback");
entities = searchFromDb(request);
source = SOURCE_DB;
// DB 결과를 캐시에 저장
for (AisTargetEntity entity : entities) {
cacheManager.put(entity);
}
} else if (request.hasAreaFilter()) {
// 2. 공간 필터링 (JTS 기반, 병렬 처리)
entities = spatialFilterUtil.filterByCircle(
entities,
request.getCenterLon(),
request.getCenterLat(),
request.getRadiusMeters()
);
}
long elapsed = System.currentTimeMillis() - startTime;
log.info("선박 검색 완료 - 소스: {}, 결과: {} 건, 소요: {}ms",
source, entities.size(), elapsed);
final String finalSource = source;
return entities.stream()
.map(e -> AisTargetResponseDto.from(e, finalSource))
.collect(Collectors.toList());
}
/**
* DB에서 검색 (Fallback)
*/
private List<AisTargetEntity> searchFromDb(AisTargetSearchRequest request) {
OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC);
OffsetDateTime start = now.minusMinutes(request.getMinutes());
if (request.hasAreaFilter()) {
return aisTargetRepository.findByTimeRangeAndArea(
start, now,
request.getCenterLon(),
request.getCenterLat(),
request.getRadiusMeters()
);
} else {
// 공간 필터 없으면 전체 조회 (주의: 대량 데이터)
return aisTargetRepository.findByTimeRangeAndArea(
start, now,
0.0, 0.0, Double.MAX_VALUE
);
}
}
// ==================== 조건 필터 검색 ====================
/**
* 항해 조건 필터 검색 (캐시 우선)
*
* 필터 조건:
* - SOG (속도): 이상/초과/이하/미만/범위
* - COG (침로): 이상/초과/이하/미만/범위
* - Heading (선수방위): 이상/초과/이하/미만/범위
* - Destination (목적지): 부분 일치
* - Status (항행상태): 다중 선택
*
* @param request 필터 조건
* @return 조건에 맞는 선박 목록
*/
public List<AisTargetResponseDto> searchByFilter(AisTargetFilterRequest request) {
log.debug("필터 검색 - minutes: {}, hasFilter: {}",
request.getMinutes(), request.hasAnyFilter());
long startTime = System.currentTimeMillis();
// 1. 캐시에서 시간 범위 데이터 조회
List<AisTargetEntity> entities = cacheManager.getByTimeRange(request.getMinutes());
String source = SOURCE_CACHE;
// 캐시가 비어있으면 DB Fallback
if (entities.isEmpty()) {
log.debug("캐시 비어있음, DB Fallback");
entities = searchByFilterFromDb(request);
source = SOURCE_DB;
// DB 결과를 캐시에 저장
for (AisTargetEntity entity : entities) {
cacheManager.put(entity);
}
// DB에서 가져온 후에도 필터 적용 (DB 쿼리는 시간 범위만 적용)
entities = filterUtil.filter(entities, request);
} else {
// 2. 캐시 데이터에 필터 적용
entities = filterUtil.filter(entities, request);
}
long elapsed = System.currentTimeMillis() - startTime;
log.info("필터 검색 완료 - 소스: {}, 결과: {} 건, 소요: {}ms",
source, entities.size(), elapsed);
final String finalSource = source;
return entities.stream()
.map(e -> AisTargetResponseDto.from(e, finalSource))
.collect(Collectors.toList());
}
/**
* DB에서 필터 검색 (Fallback) - 시간 범위만 적용
*/
private List<AisTargetEntity> searchByFilterFromDb(AisTargetFilterRequest request) {
OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC);
OffsetDateTime start = now.minusMinutes(request.getMinutes());
// DB에서는 시간 범위만 조회하고, 나머지 필터는 메모리에서 적용
return aisTargetRepository.findByTimeRangeAndArea(
start, now,
0.0, 0.0, Double.MAX_VALUE
);
}
// ==================== 폴리곤 검색 ====================
/**
* 폴리곤 범위 선박 검색 (캐시 기반)
*
* @param minutes 시간 범위 ()
* @param polygonCoordinates 폴리곤 좌표 [[lon, lat], ...]
* @return 범위 선박 목록
*/
public List<AisTargetResponseDto> searchByPolygon(int minutes, double[][] polygonCoordinates) {
log.debug("폴리곤 검색 - minutes: {}, points: {}", minutes, polygonCoordinates.length);
long startTime = System.currentTimeMillis();
// 1. 캐시에서 시간 범위 데이터 조회
List<AisTargetEntity> entities = cacheManager.getByTimeRange(minutes);
// 2. 폴리곤 필터링 (JTS 기반)
entities = spatialFilterUtil.filterByPolygon(entities, polygonCoordinates);
long elapsed = System.currentTimeMillis() - startTime;
log.info("폴리곤 검색 완료 - 결과: {} 건, 소요: {}ms", entities.size(), elapsed);
return entities.stream()
.map(e -> AisTargetResponseDto.from(e, SOURCE_CACHE))
.collect(Collectors.toList());
}
/**
* WKT 형식 폴리곤으로 검색
*
* @param minutes 시간 범위 ()
* @param wkt WKT 문자열 (: "POLYGON((129 35, 130 35, 130 36, 129 36, 129 35))")
* @return 범위 선박 목록
*/
public List<AisTargetResponseDto> searchByWkt(int minutes, String wkt) {
log.debug("WKT 검색 - minutes: {}, wkt: {}", minutes, wkt);
long startTime = System.currentTimeMillis();
// 1. 캐시에서 시간 범위 데이터 조회
List<AisTargetEntity> entities = cacheManager.getByTimeRange(minutes);
// 2. WKT 필터링 (JTS 기반)
entities = spatialFilterUtil.filterByWkt(entities, wkt);
long elapsed = System.currentTimeMillis() - startTime;
log.info("WKT 검색 완료 - 결과: {} 건, 소요: {}ms", entities.size(), elapsed);
return entities.stream()
.map(e -> AisTargetResponseDto.from(e, SOURCE_CACHE))
.collect(Collectors.toList());
}
// ==================== 거리 포함 검색 ====================
/**
* 원형 범위 검색 + 거리 정보 포함
*/
public List<AisTargetWithDistanceDto> searchWithDistance(
int minutes, double centerLon, double centerLat, double radiusMeters) {
log.debug("거리 포함 검색 - minutes: {}, center: ({}, {}), radius: {}",
minutes, centerLon, centerLat, radiusMeters);
// 1. 캐시에서 시간 범위 데이터 조회
List<AisTargetEntity> entities = cacheManager.getByTimeRange(minutes);
// 2. 거리 포함 필터링
List<SpatialFilterUtil.EntityWithDistance> filtered =
spatialFilterUtil.filterByCircleWithDistance(entities, centerLon, centerLat, radiusMeters);
return filtered.stream()
.map(ewd -> new AisTargetWithDistanceDto(
AisTargetResponseDto.from(ewd.getEntity(), SOURCE_CACHE),
ewd.getDistanceMeters()
))
.collect(Collectors.toList());
}
// ==================== 항적 조회 ====================
/**
* 특정 MMSI의 시간 범위 항적 조회
*/
public List<AisTargetResponseDto> getTrack(Long mmsi, Integer minutes) {
log.debug("항적 조회 - MMSI: {}, 범위: {}분", mmsi, minutes);
OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC);
OffsetDateTime start = now.minusMinutes(minutes);
List<AisTargetEntity> track = aisTargetRepository.findByMmsiAndTimeRange(mmsi, start, now);
log.debug("항적 조회 완료 - MMSI: {}, 포인트: {} 개", mmsi, track.size());
return track.stream()
.map(e -> AisTargetResponseDto.from(e, SOURCE_DB))
.collect(Collectors.toList());
}
// ==================== 캐시 관리 ====================
/**
* 캐시 통계 조회
*/
public Map<String, Object> getCacheStats() {
return cacheManager.getStats();
}
/**
* 캐시 초기화
*/
public void clearCache() {
cacheManager.clear();
}
// ==================== 내부 DTO ====================
/**
* 거리 정보 포함 응답 DTO
*/
@lombok.Data
@lombok.AllArgsConstructor
public static class AisTargetWithDistanceDto {
private AisTargetResponseDto target;
private double distanceMeters;
}
}

파일 보기

@ -35,6 +35,12 @@ public class RiskDataReader extends BaseApiReader<RiskDto> {
return "riskDataReader"; return "riskDataReader";
} }
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
}
@Override @Override
protected String getApiPath() { protected String getApiPath() {
return "/RiskAndCompliance/RisksByImos"; return "/RiskAndCompliance/RisksByImos";

파일 보기

@ -36,6 +36,12 @@ public class ComplianceDataReader extends BaseApiReader<ComplianceDto> {
return "ComplianceDataReader"; return "ComplianceDataReader";
} }
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
}
@Override @Override
protected String getApiPath() { protected String getApiPath() {
return "/RiskAndCompliance/CompliancesByImos"; return "/RiskAndCompliance/CompliancesByImos";

파일 보기

@ -73,6 +73,13 @@ public class ShipMovementReader extends BaseApiReader<PortCallDto> {
return "ShipMovementReader"; return "ShipMovementReader";
} }
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
this.dbMasterHashes = null;
}
@Override @Override
protected String getApiPath() { protected String getApiPath() {
return "/Movements"; return "/Movements";

파일 보기

@ -59,6 +59,13 @@ public class ShipDetailDataReader extends BaseApiReader<ShipDetailComparisonData
return "ShipDetailDataReader"; return "ShipDetailDataReader";
} }
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
this.dbMasterHashes = null;
}
@Override @Override
protected String getApiPath() { protected String getApiPath() {
return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipsByIHSLRorIMONumbersAll"; return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipsByIHSLRorIMONumbersAll";

파일 보기

@ -33,6 +33,12 @@ public class ShipLastPositionDataReader extends BaseApiReader<TargetEnhancedDto>
return "ShipLastPositionDataReader"; return "ShipLastPositionDataReader";
} }
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.allImoNumbers = null;
}
@Override @Override
protected String getApiPath() { protected String getApiPath() {
return "/AisSvc.svc/AIS/GetTargetsByIMOsEnhanced"; return "/AisSvc.svc/AIS/GetTargetsByIMOsEnhanced";

파일 보기

@ -29,7 +29,7 @@ public class ShipDataProcessor extends BaseProcessor<ShipDto, ShipEntity> {
return null; // 스킵 return null; // 스킵
} }
log.debug("선박 데이터 처리 중: IMO {}", item.getImoNumber()); // log.debug("선박 데이터 처리 중: IMO {}", item.getImoNumber());
// 중복 체크 업데이트 // 중복 체크 업데이트
return shipRepository.findByImoNumber(item.getImoNumber()) return shipRepository.findByImoNumber(item.getImoNumber())

파일 보기

@ -17,7 +17,7 @@ spring:
jpa: jpa:
hibernate: hibernate:
ddl-auto: update ddl-auto: update
show-sql: true show-sql: false
properties: properties:
hibernate: hibernate:
dialect: org.hibernate.dialect.PostgreSQLDialect dialect: org.hibernate.dialect.PostgreSQLDialect
@ -55,9 +55,9 @@ spring:
# Server Configuration # Server Configuration
server: server:
port: 8041 port: 8081
servlet: servlet:
context-path: / context-path: /snp-api
# Actuator Configuration # Actuator Configuration
management: management:
@ -69,18 +69,9 @@ management:
health: health:
show-details: always show-details: always
# Logging Configuration # Logging Configuration (logback-spring.xml에서 상세 설정)
logging: logging:
level: config: classpath:logback-spring.xml
root: INFO
com.snp.batch: DEBUG
org.springframework.batch: DEBUG
org.springframework.jdbc: DEBUG
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss} - %msg%n"
file: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
file:
name: logs/snp-batch.log
# Custom Application Properties # Custom Application Properties
app: app:
@ -100,3 +91,15 @@ app:
schedule: schedule:
enabled: true enabled: true
cron: "0 0 * * * ?" # Every hour cron: "0 0 * * * ?" # Every hour
# AIS Target 배치 설정
ais-target:
since-seconds: 60 # API 조회 범위 (초)
chunk-size: 5000 # 배치 청크 크기
schedule:
cron: "15 * * * * ?" # 매 분 15초 실행
partition:
months-ahead: 2 # 미리 생성할 파티션 개월 수
# AIS Target 캐시 설정
ais-target-cache:
ttl-minutes: 120 # 캐시 TTL (분) - 2시간
max-size: 300000 # 최대 캐시 크기 - 30만 건

파일 보기

@ -0,0 +1,234 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="30 seconds">
<!-- ========================================
프로퍼티 정의
======================================== -->
<property name="LOG_PATH" value="logs"/>
<property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{40} - %msg%n"/>
<property name="LOG_PATTERN_SIMPLE" value="%d{HH:mm:ss.SSS} %-5level %logger{20} - %msg%n"/>
<!-- 파일 크기 및 보관 설정 -->
<property name="MAX_FILE_SIZE" value="100MB"/>
<property name="MAX_HISTORY" value="30"/>
<property name="TOTAL_SIZE_CAP" value="10GB"/>
<!-- ========================================
콘솔 Appender (개발용)
======================================== -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${LOG_PATTERN_SIMPLE}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- ========================================
메인 로그 (application.log)
- 일반 애플리케이션 로그
- INFO 레벨 이상
======================================== -->
<appender name="APP_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/application.log</file>
<encoder>
<pattern>${LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/archive/application.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
<maxHistory>${MAX_HISTORY}</maxHistory>
<totalSizeCap>${TOTAL_SIZE_CAP}</totalSizeCap>
</rollingPolicy>
</appender>
<!-- ========================================
배치 로그 (batch.log)
- Spring Batch Job/Step 실행 이력
- Job 시작/종료, 처리 건수 등
======================================== -->
<appender name="BATCH_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/batch.log</file>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%X{jobName}] %logger{30} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/archive/batch.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
<maxHistory>${MAX_HISTORY}</maxHistory>
<totalSizeCap>${TOTAL_SIZE_CAP}</totalSizeCap>
</rollingPolicy>
</appender>
<!-- ========================================
API 요청 로그 (api-access.log)
- REST API 요청/응답 이력
- 요청 IP, 파라미터, 응답시간 등
======================================== -->
<appender name="API_ACCESS_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/api-access.log</file>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/archive/api-access.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
<maxHistory>${MAX_HISTORY}</maxHistory>
<totalSizeCap>${TOTAL_SIZE_CAP}</totalSizeCap>
</rollingPolicy>
</appender>
<!-- ========================================
메트릭 로그 (metrics.log)
- 성능 지표, 통계 정보
- 주기적 상태 체크 로그
======================================== -->
<appender name="METRICS_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/metrics.log</file>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/archive/metrics.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>50MB</maxFileSize>
<maxHistory>7</maxHistory>
<totalSizeCap>1GB</totalSizeCap>
</rollingPolicy>
</appender>
<!-- ========================================
에러 로그 (error.log)
- ERROR, WARN 레벨만 기록
- 문제 추적용
======================================== -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/error.log</file>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>WARN</level>
</filter>
<encoder>
<pattern>${LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/archive/error.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
<maxHistory>60</maxHistory>
<totalSizeCap>5GB</totalSizeCap>
</rollingPolicy>
</appender>
<!-- ========================================
비동기 Appender (성능 최적화)
======================================== -->
<appender name="ASYNC_APP" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>512</queueSize>
<discardingThreshold>0</discardingThreshold>
<appender-ref ref="APP_FILE"/>
</appender>
<appender name="ASYNC_BATCH" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>256</queueSize>
<discardingThreshold>0</discardingThreshold>
<appender-ref ref="BATCH_FILE"/>
</appender>
<!-- ========================================
Logger 설정 - 패키지별 레벨
======================================== -->
<!-- 애플리케이션 로거 -->
<logger name="com.snp.batch" level="INFO" additivity="false">
<appender-ref ref="ASYNC_APP"/>
<appender-ref ref="ERROR_FILE"/>
<appender-ref ref="CONSOLE"/>
</logger>
<!-- 배치 Job/Config 로거 -->
<logger name="com.snp.batch.jobs" level="INFO" additivity="false">
<appender-ref ref="ASYNC_BATCH"/>
<appender-ref ref="ERROR_FILE"/>
<appender-ref ref="CONSOLE"/>
</logger>
<!-- 배치 Reader/Processor/Writer - 개별 row 로그 최소화 -->
<logger name="com.snp.batch.common.batch.reader" level="INFO" additivity="false">
<appender-ref ref="ASYNC_BATCH"/>
<appender-ref ref="CONSOLE"/>
</logger>
<logger name="com.snp.batch.jobs.aistarget.batch.reader" level="INFO" additivity="false">
<appender-ref ref="ASYNC_BATCH"/>
<appender-ref ref="CONSOLE"/>
</logger>
<logger name="com.snp.batch.jobs.aistarget.batch.processor" level="WARN" additivity="false">
<appender-ref ref="ASYNC_BATCH"/>
<appender-ref ref="CONSOLE"/>
</logger>
<logger name="com.snp.batch.jobs.aistarget.batch.writer" level="INFO" additivity="false">
<appender-ref ref="ASYNC_BATCH"/>
<appender-ref ref="CONSOLE"/>
</logger>
<!-- API 요청 로거 -->
<logger name="com.snp.batch.api.logging" level="INFO" additivity="false">
<appender-ref ref="API_ACCESS_FILE"/>
</logger>
<!-- 메트릭 로거 -->
<logger name="com.snp.batch.metrics" level="INFO" additivity="false">
<appender-ref ref="METRICS_FILE"/>
</logger>
<!-- 파티션 관리 로거 -->
<logger name="com.snp.batch.global.partition" level="INFO" additivity="false">
<appender-ref ref="ASYNC_BATCH"/>
<appender-ref ref="CONSOLE"/>
</logger>
<!-- Spring Batch 프레임워크 - 핵심 로그만 -->
<logger name="org.springframework.batch" level="WARN" additivity="false">
<appender-ref ref="ASYNC_BATCH"/>
<appender-ref ref="CONSOLE"/>
</logger>
<logger name="org.springframework.batch.core.job" level="INFO" additivity="false">
<appender-ref ref="ASYNC_BATCH"/>
<appender-ref ref="CONSOLE"/>
</logger>
<logger name="org.springframework.batch.core.step" level="INFO" additivity="false">
<appender-ref ref="ASYNC_BATCH"/>
<appender-ref ref="CONSOLE"/>
</logger>
<!-- Spring 프레임워크 -->
<logger name="org.springframework" level="WARN"/>
<logger name="org.springframework.web" level="WARN"/>
<!-- Hibernate/JPA - SQL 로그 최소화 -->
<logger name="org.hibernate" level="WARN"/>
<logger name="org.hibernate.SQL" level="WARN"/>
<logger name="org.hibernate.type.descriptor.sql" level="WARN"/>
<!-- JDBC -->
<logger name="org.springframework.jdbc" level="WARN"/>
<logger name="com.zaxxer.hikari" level="WARN"/>
<!-- Quartz 스케줄러 -->
<logger name="org.quartz" level="WARN"/>
<!-- WebClient/Reactor -->
<logger name="reactor.netty" level="WARN"/>
<logger name="io.netty" level="WARN"/>
<!-- ========================================
Root Logger
======================================== -->
<root level="INFO">
<appender-ref ref="ASYNC_APP"/>
<appender-ref ref="ERROR_FILE"/>
<appender-ref ref="CONSOLE"/>
</root>
</configuration>

파일 보기

@ -0,0 +1,449 @@
-- ============================================
-- AIS Target 파티션 테이블 DDL
-- ============================================
-- 용도: 선박 AIS 위치 정보 저장 (항적 분석용)
-- 수집 주기: 매 분 15초
-- 예상 데이터량: 약 33,000건/분
-- 파티셔닝: 월별 파티션 (ais_target_YYYY_MM)
-- ============================================
-- PostGIS 확장 활성화 (이미 설치되어 있다면 생략)
CREATE EXTENSION IF NOT EXISTS postgis;
-- ============================================
-- 1. 부모 테이블 생성 (파티션 테이블)
-- ============================================
CREATE TABLE IF NOT EXISTS snp_data.ais_target (
-- ========== PK (복합키) ==========
mmsi BIGINT NOT NULL,
message_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
-- ========== 선박 식별 정보 ==========
imo BIGINT,
name VARCHAR(100),
callsign VARCHAR(20),
vessel_type VARCHAR(50),
extra_info VARCHAR(100),
-- ========== 위치 정보 ==========
lat DOUBLE PRECISION NOT NULL,
lon DOUBLE PRECISION NOT NULL,
geom GEOMETRY(Point, 4326),
-- ========== 항해 정보 ==========
heading DOUBLE PRECISION,
sog DOUBLE PRECISION, -- Speed over Ground (knots)
cog DOUBLE PRECISION, -- Course over Ground (degrees)
rot INTEGER, -- Rate of Turn (degrees/min)
-- ========== 선박 제원 ==========
length INTEGER,
width INTEGER,
draught DOUBLE PRECISION,
length_bow INTEGER,
length_stern INTEGER,
width_port INTEGER,
width_starboard INTEGER,
-- ========== 목적지 정보 ==========
destination VARCHAR(200),
eta TIMESTAMP WITH TIME ZONE,
status VARCHAR(50),
-- ========== AIS 메시지 정보 ==========
age_minutes DOUBLE PRECISION,
position_accuracy INTEGER,
timestamp_utc INTEGER,
repeat_indicator INTEGER,
raim_flag INTEGER,
radio_status INTEGER,
regional INTEGER,
regional2 INTEGER,
spare INTEGER,
spare2 INTEGER,
ais_version INTEGER,
position_fix_type INTEGER,
dte INTEGER,
band_flag INTEGER,
-- ========== 타임스탬프 ==========
received_date TIMESTAMP WITH TIME ZONE,
collected_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
-- ========== 제약조건 ==========
CONSTRAINT pk_ais_target PRIMARY KEY (mmsi, message_timestamp)
) PARTITION BY RANGE (message_timestamp);
-- ============================================
-- 2. 초기 파티션 생성 (현재 월 + 다음 월)
-- ============================================
-- 예: 2025년 12월과 2026년 1월 파티션
-- 실제 운영 시 create_ais_target_partition 함수로 자동 생성
-- 2025년 12월 파티션
CREATE TABLE IF NOT EXISTS snp_data.ais_target_2025_12 PARTITION OF snp_data.ais_target
FOR VALUES FROM ('2025-12-01 00:00:00+00') TO ('2026-01-01 00:00:00+00');
-- 2026년 1월 파티션
CREATE TABLE IF NOT EXISTS snp_data.ais_target_2026_01 PARTITION OF snp_data.ais_target
FOR VALUES FROM ('2026-01-01 00:00:00+00') TO ('2026-02-01 00:00:00+00');
-- ============================================
-- 3. 인덱스 생성 (각 파티션에 자동 상속)
-- ============================================
-- 1. MMSI 인덱스 (특정 선박 조회)
CREATE INDEX IF NOT EXISTS idx_ais_target_mmsi
ON snp_data.ais_target (mmsi);
-- 2. IMO 인덱스 (IMO가 있는 선박만)
CREATE INDEX IF NOT EXISTS idx_ais_target_imo
ON snp_data.ais_target (imo)
WHERE imo IS NOT NULL AND imo > 0;
-- 3. 메시지 타임스탬프 인덱스 (시간 범위 조회)
CREATE INDEX IF NOT EXISTS idx_ais_target_message_timestamp
ON snp_data.ais_target (message_timestamp DESC);
-- 4. MMSI + 타임스탬프 복합 인덱스 (항적 조회 최적화)
CREATE INDEX IF NOT EXISTS idx_ais_target_mmsi_timestamp
ON snp_data.ais_target (mmsi, message_timestamp DESC);
-- 5. 공간 인덱스 (GIST) - 공간 쿼리 최적화
CREATE INDEX IF NOT EXISTS idx_ais_target_geom
ON snp_data.ais_target USING GIST (geom);
-- 6. 수집 시점 인덱스 (배치 모니터링용)
CREATE INDEX IF NOT EXISTS idx_ais_target_collected_at
ON snp_data.ais_target (collected_at DESC);
-- ============================================
-- 4. 파티션 자동 생성 함수
-- ============================================
-- 파티션 존재 여부 확인 함수
CREATE OR REPLACE FUNCTION snp_data.partition_exists(partition_name TEXT)
RETURNS BOOLEAN AS $$
BEGIN
RETURN EXISTS (
SELECT 1 FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'snp_data'
AND c.relname = partition_name
AND c.relkind = 'r'
);
END;
$$ LANGUAGE plpgsql;
-- 특정 월의 파티션 생성 함수
CREATE OR REPLACE FUNCTION snp_data.create_ais_target_partition(target_date DATE)
RETURNS TEXT AS $$
DECLARE
partition_name TEXT;
start_date DATE;
end_date DATE;
create_sql TEXT;
BEGIN
-- 파티션 이름 생성: ais_target_YYYY_MM
partition_name := 'ais_target_' || TO_CHAR(target_date, 'YYYY_MM');
-- 시작/종료 날짜 계산
start_date := DATE_TRUNC('month', target_date)::DATE;
end_date := (DATE_TRUNC('month', target_date) + INTERVAL '1 month')::DATE;
-- 이미 존재하면 스킵
IF snp_data.partition_exists(partition_name) THEN
RAISE NOTICE 'Partition % already exists, skipping', partition_name;
RETURN partition_name || ' (already exists)';
END IF;
-- 파티션 생성 SQL
create_sql := format(
'CREATE TABLE snp_data.%I PARTITION OF snp_data.ais_target FOR VALUES FROM (%L) TO (%L)',
partition_name,
start_date::TIMESTAMP WITH TIME ZONE,
end_date::TIMESTAMP WITH TIME ZONE
);
EXECUTE create_sql;
RAISE NOTICE 'Created partition: % (% to %)', partition_name, start_date, end_date;
RETURN partition_name;
END;
$$ LANGUAGE plpgsql;
-- 다음 N개월 파티션 사전 생성 함수
CREATE OR REPLACE FUNCTION snp_data.create_future_ais_target_partitions(months_ahead INTEGER DEFAULT 2)
RETURNS TABLE (partition_name TEXT, status TEXT) AS $$
DECLARE
i INTEGER;
target_date DATE;
result TEXT;
BEGIN
FOR i IN 0..months_ahead LOOP
target_date := DATE_TRUNC('month', CURRENT_DATE + (i || ' months')::INTERVAL)::DATE;
result := snp_data.create_ais_target_partition(target_date);
partition_name := 'ais_target_' || TO_CHAR(target_date, 'YYYY_MM');
status := result;
RETURN NEXT;
END LOOP;
END;
$$ LANGUAGE plpgsql;
-- ============================================
-- 5. 오래된 파티션 삭제 함수
-- ============================================
-- 특정 월의 파티션 삭제 함수
CREATE OR REPLACE FUNCTION snp_data.drop_ais_target_partition(target_date DATE)
RETURNS TEXT AS $$
DECLARE
partition_name TEXT;
drop_sql TEXT;
BEGIN
partition_name := 'ais_target_' || TO_CHAR(target_date, 'YYYY_MM');
-- 존재하지 않으면 스킵
IF NOT snp_data.partition_exists(partition_name) THEN
RAISE NOTICE 'Partition % does not exist, skipping', partition_name;
RETURN partition_name || ' (not found)';
END IF;
drop_sql := format('DROP TABLE snp_data.%I', partition_name);
EXECUTE drop_sql;
RAISE NOTICE 'Dropped partition: %', partition_name;
RETURN partition_name || ' (dropped)';
END;
$$ LANGUAGE plpgsql;
-- N개월 이전 파티션 정리 함수
CREATE OR REPLACE FUNCTION snp_data.cleanup_old_ais_target_partitions(retention_months INTEGER DEFAULT 3)
RETURNS TABLE (partition_name TEXT, status TEXT) AS $$
DECLARE
rec RECORD;
partition_date DATE;
cutoff_date DATE;
BEGIN
cutoff_date := DATE_TRUNC('month', CURRENT_DATE - (retention_months || ' months')::INTERVAL)::DATE;
-- ais_target_YYYY_MM 패턴의 파티션 조회
FOR rec IN
SELECT c.relname
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
JOIN pg_inherits i ON i.inhrelid = c.oid
WHERE n.nspname = 'snp_data'
AND c.relname LIKE 'ais_target_%'
AND c.relkind = 'r'
ORDER BY c.relname
LOOP
-- 파티션 이름에서 날짜 추출 (ais_target_YYYY_MM)
BEGIN
partition_date := TO_DATE(SUBSTRING(rec.relname FROM 'ais_target_(\d{4}_\d{2})'), 'YYYY_MM');
IF partition_date < cutoff_date THEN
EXECUTE format('DROP TABLE snp_data.%I', rec.relname);
partition_name := rec.relname;
status := 'dropped';
RETURN NEXT;
RAISE NOTICE 'Dropped old partition: %', rec.relname;
END IF;
EXCEPTION WHEN OTHERS THEN
-- 날짜 파싱 실패 시 스킵
CONTINUE;
END;
END LOOP;
END;
$$ LANGUAGE plpgsql;
-- 파티션별 통계 조회 함수
CREATE OR REPLACE FUNCTION snp_data.ais_target_partition_stats()
RETURNS TABLE (
partition_name TEXT,
row_count BIGINT,
size_bytes BIGINT,
size_pretty TEXT
) AS $$
BEGIN
RETURN QUERY
SELECT
c.relname::TEXT as partition_name,
(SELECT COUNT(*)::BIGINT FROM snp_data.ais_target WHERE message_timestamp >=
TO_DATE(SUBSTRING(c.relname FROM 'ais_target_(\d{4}_\d{2})'), 'YYYY_MM')
AND message_timestamp <
TO_DATE(SUBSTRING(c.relname FROM 'ais_target_(\d{4}_\d{2})'), 'YYYY_MM') + INTERVAL '1 month'
) as row_count,
pg_relation_size(c.oid) as size_bytes,
pg_size_pretty(pg_relation_size(c.oid)) as size_pretty
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
JOIN pg_inherits i ON i.inhrelid = c.oid
WHERE n.nspname = 'snp_data'
AND c.relname LIKE 'ais_target_%'
AND c.relkind = 'r'
ORDER BY c.relname;
END;
$$ LANGUAGE plpgsql;
-- ============================================
-- 6. 코멘트
-- ============================================
COMMENT ON TABLE snp_data.ais_target IS 'AIS 선박 위치 정보 (매 분 15초 수집, 월별 파티션)';
COMMENT ON COLUMN snp_data.ais_target.mmsi IS 'Maritime Mobile Service Identity (복합 PK)';
COMMENT ON COLUMN snp_data.ais_target.message_timestamp IS 'AIS 메시지 발생 시간 (복합 PK, 파티션 키)';
COMMENT ON COLUMN snp_data.ais_target.imo IS 'IMO 선박 번호';
COMMENT ON COLUMN snp_data.ais_target.geom IS 'PostGIS Point geometry (SRID 4326 - WGS84)';
COMMENT ON COLUMN snp_data.ais_target.sog IS 'Speed over Ground (대지속력, knots)';
COMMENT ON COLUMN snp_data.ais_target.cog IS 'Course over Ground (대지침로, degrees)';
COMMENT ON COLUMN snp_data.ais_target.rot IS 'Rate of Turn (선회율, degrees/min)';
COMMENT ON COLUMN snp_data.ais_target.heading IS '선수 방향 (degrees)';
COMMENT ON COLUMN snp_data.ais_target.draught IS '흘수 (meters)';
COMMENT ON COLUMN snp_data.ais_target.collected_at IS '배치 수집 시점';
COMMENT ON COLUMN snp_data.ais_target.received_date IS 'API 수신 시간';
-- ============================================
-- 유지보수용 함수: 오래된 데이터 정리
-- ============================================
-- 오래된 데이터 삭제 함수 (기본: 7일 이전)
CREATE OR REPLACE FUNCTION snp_data.cleanup_ais_target(retention_days INTEGER DEFAULT 7)
RETURNS INTEGER AS $$
DECLARE
deleted_count INTEGER;
BEGIN
DELETE FROM snp_data.ais_target
WHERE message_timestamp < NOW() - (retention_days || ' days')::INTERVAL;
GET DIAGNOSTICS deleted_count = ROW_COUNT;
RAISE NOTICE 'Deleted % rows older than % days', deleted_count, retention_days;
RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION snp_data.create_ais_target_partition IS '특정 월의 AIS Target 파티션 생성';
COMMENT ON FUNCTION snp_data.create_future_ais_target_partitions IS '향후 N개월 파티션 사전 생성';
COMMENT ON FUNCTION snp_data.drop_ais_target_partition IS '특정 월의 파티션 삭제';
COMMENT ON FUNCTION snp_data.cleanup_old_ais_target_partitions IS 'N개월 이전 파티션 정리';
COMMENT ON FUNCTION snp_data.ais_target_partition_stats IS '파티션별 통계 조회';
-- ============================================
-- 7. 유지보수용 함수: 통계 조회
-- ============================================
CREATE OR REPLACE FUNCTION snp_data.ais_target_stats()
RETURNS TABLE (
total_count BIGINT,
unique_mmsi_count BIGINT,
unique_imo_count BIGINT,
oldest_record TIMESTAMP WITH TIME ZONE,
newest_record TIMESTAMP WITH TIME ZONE,
last_hour_count BIGINT
) AS $$
BEGIN
RETURN QUERY
SELECT
COUNT(*)::BIGINT as total_count,
COUNT(DISTINCT mmsi)::BIGINT as unique_mmsi_count,
COUNT(DISTINCT imo) FILTER (WHERE imo IS NOT NULL AND imo > 0)::BIGINT as unique_imo_count,
MIN(message_timestamp) as oldest_record,
MAX(message_timestamp) as newest_record,
COUNT(*) FILTER (WHERE message_timestamp > NOW() - INTERVAL '1 hour')::BIGINT as last_hour_count
FROM snp_data.ais_target;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION snp_data.ais_target_stats IS 'AIS Target 테이블 통계 조회';
-- ============================================
-- 예시 쿼리
-- ============================================
-- 1. 특정 MMSI의 최신 위치 조회
-- SELECT * FROM snp_data.ais_target WHERE mmsi = 123456789 ORDER BY message_timestamp DESC LIMIT 1;
-- 2. 특정 시간 범위의 항적 조회
-- SELECT * FROM snp_data.ais_target
-- WHERE mmsi = 123456789
-- AND message_timestamp BETWEEN '2025-12-01 00:00:00+00' AND '2025-12-01 01:00:00+00'
-- ORDER BY message_timestamp;
-- 3. 특정 구역(원형) 내 선박 조회
-- SELECT DISTINCT ON (mmsi) *
-- FROM snp_data.ais_target
-- WHERE message_timestamp > NOW() - INTERVAL '1 hour'
-- AND ST_DWithin(
-- geom::geography,
-- ST_SetSRID(ST_MakePoint(129.0, 35.0), 4326)::geography,
-- 50000 -- 50km 반경
-- )
-- ORDER BY mmsi, message_timestamp DESC;
-- 4. LineString 항적 생성
-- SELECT mmsi, ST_MakeLine(geom ORDER BY message_timestamp) as track
-- FROM snp_data.ais_target
-- WHERE mmsi = 123456789
-- AND message_timestamp BETWEEN '2025-12-01 00:00:00+00' AND '2025-12-01 01:00:00+00'
-- GROUP BY mmsi;
-- 5. 다음 3개월 파티션 미리 생성
-- SELECT * FROM snp_data.create_future_ais_target_partitions(3);
-- 6. 특정 월 파티션 생성
-- SELECT snp_data.create_ais_target_partition('2026-03-01');
-- 7. 3개월 이전 파티션 정리
-- SELECT * FROM snp_data.cleanup_old_ais_target_partitions(3);
-- 8. 파티션별 통계 조회
-- SELECT * FROM snp_data.ais_target_partition_stats();
-- 9. 전체 통계 조회
-- SELECT * FROM snp_data.ais_target_stats();
-- ============================================
-- Job Schedule 등록
-- ============================================
-- 1. aisTargetImportJob: 매 분 15초에 실행
INSERT INTO public.job_schedule (job_name, cron_expression, description, active, created_at, updated_at, created_by, updated_by)
VALUES (
'aisTargetImportJob',
'15 * * * * ?',
'AIS Target 위치 정보 수집 (S&P Global API) - 매 분 15초 실행',
true,
NOW(),
NOW(),
'SYSTEM',
'SYSTEM'
) ON CONFLICT (job_name) DO UPDATE SET
cron_expression = EXCLUDED.cron_expression,
description = EXCLUDED.description,
active = EXCLUDED.active,
updated_at = NOW();
-- 2. partitionManagerJob: 매일 00:10에 실행
-- Daily 파티션: 매일 생성, Monthly 파티션: 말일에만 생성 (Job 내부에서 분기)
INSERT INTO public.job_schedule (job_name, cron_expression, description, active, created_at, updated_at, created_by, updated_by)
VALUES (
'partitionManagerJob',
'0 10 0 * * ?',
'파티션 관리 - 매일 00:10 실행 (Daily: 매일, Monthly: 말일만)',
true,
NOW(),
NOW(),
'SYSTEM',
'SYSTEM'
) ON CONFLICT (job_name) DO UPDATE SET
cron_expression = EXCLUDED.cron_expression,
description = EXCLUDED.description,
active = EXCLUDED.active,
updated_at = NOW();