diff --git a/pom.xml b/pom.xml index d1d5aa1..c47e084 100644 --- a/pom.xml +++ b/pom.xml @@ -111,6 +111,12 @@ 2.3.0 + + + org.springframework.kafka + spring-kafka + + com.github.ben-manes.caffeine diff --git a/src/main/java/com/snp/batch/jobs/aistarget/batch/writer/AisTargetDataWriter.java b/src/main/java/com/snp/batch/jobs/aistarget/batch/writer/AisTargetDataWriter.java index 60dd073..2c6a0da 100644 --- a/src/main/java/com/snp/batch/jobs/aistarget/batch/writer/AisTargetDataWriter.java +++ b/src/main/java/com/snp/batch/jobs/aistarget/batch/writer/AisTargetDataWriter.java @@ -4,6 +4,7 @@ import com.snp.batch.common.batch.writer.BaseWriter; import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager; import com.snp.batch.jobs.aistarget.classifier.AisClassTypeClassifier; +import com.snp.batch.jobs.aistarget.kafka.AisTargetKafkaProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -15,10 +16,11 @@ import java.util.List; * 동작: * 1. ClassType 분류 (Core20 캐시 기반 A/B 분류) * 2. 캐시에 최신 위치 정보 업데이트 (classType, core20Mmsi 포함) + * 3. Kafka 토픽으로 AIS Target 정보 전송 (서브청크 분할) * * 참고: * - DB 저장은 별도 Job(aisTargetDbSyncJob)에서 15분 주기로 수행 - * - 이 Writer는 캐시 업데이트만 담당 + * - Kafka 전송 실패는 기본적으로 로그만 남기고 다음 처리 계속 */ @Slf4j @Component @@ -26,13 +28,16 @@ public class AisTargetDataWriter extends BaseWriter { private final AisTargetCacheManager cacheManager; private final AisClassTypeClassifier classTypeClassifier; + private final AisTargetKafkaProducer kafkaProducer; public AisTargetDataWriter( AisTargetCacheManager cacheManager, - AisClassTypeClassifier classTypeClassifier) { + AisClassTypeClassifier classTypeClassifier, + AisTargetKafkaProducer kafkaProducer) { super("AisTarget"); this.cacheManager = cacheManager; this.classTypeClassifier = classTypeClassifier; + this.kafkaProducer = kafkaProducer; } @Override @@ -48,5 +53,19 @@ public class AisTargetDataWriter extends BaseWriter { log.debug("AIS Target 캐시 업데이트 완료: {} 건 (캐시 크기: {})", items.size(), cacheManager.size()); + + // 3. Kafka 전송 (설정 enabled=true 인 경우) + if (!kafkaProducer.isEnabled()) { + log.debug("AIS Kafka 전송 비활성화 - topic 전송 스킵"); + return; + } + + AisTargetKafkaProducer.PublishSummary summary = kafkaProducer.publish(items); + log.info("AIS Kafka 전송 완료 - topic: {}, 요청: {}, 성공: {}, 실패: {}, 스킵: {}", + kafkaProducer.getTopic(), + summary.getRequestedCount(), + summary.getSuccessCount(), + summary.getFailedCount(), + summary.getSkippedCount()); } } diff --git a/src/main/java/com/snp/batch/jobs/aistarget/kafka/AisTargetKafkaMessage.java b/src/main/java/com/snp/batch/jobs/aistarget/kafka/AisTargetKafkaMessage.java new file mode 100644 index 0000000..fa064a7 --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/kafka/AisTargetKafkaMessage.java @@ -0,0 +1,55 @@ +package com.snp.batch.jobs.aistarget.kafka; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.OffsetDateTime; +import java.time.ZoneOffset; + +/** + * AIS Target Kafka 메시지 스키마 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class AisTargetKafkaMessage { + + /** + * 이벤트 고유 식별자 + * - 형식: {mmsi}_{messageTimestamp} + */ + private String eventId; + + /** + * Kafka key와 동일한 선박 식별자 + */ + private String key; + + /** + * Kafka 발행 시각(UTC) + */ + private OffsetDateTime publishedAt; + + /** + * AIS 원본/가공 데이터 전체 필드 + */ + private AisTargetEntity payload; + + public static AisTargetKafkaMessage from(AisTargetEntity entity) { + String key = entity.getMmsi() != null ? String.valueOf(entity.getMmsi()) : null; + String messageTs = entity.getMessageTimestamp() != null ? entity.getMessageTimestamp().toString() : "null"; + + return AisTargetKafkaMessage.builder() + .eventId(key + "_" + messageTs) + .key(key) + .publishedAt(OffsetDateTime.now(ZoneOffset.UTC)) + .payload(entity) + .build(); + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/kafka/AisTargetKafkaProducer.java b/src/main/java/com/snp/batch/jobs/aistarget/kafka/AisTargetKafkaProducer.java new file mode 100644 index 0000000..fb58d8e --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/kafka/AisTargetKafkaProducer.java @@ -0,0 +1,207 @@ +package com.snp.batch.jobs.aistarget.kafka; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * AIS Target Kafka Producer + * + * 정책: + * - key: MMSI + * - value: AisTargetKafkaMessage(JSON) + * - 실패 시 기본적으로 로그만 남기고 계속 진행 (failOnSendError=false) + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class AisTargetKafkaProducer { + + private final KafkaTemplate kafkaTemplate; + private final ObjectMapper objectMapper; + private final AisTargetKafkaProperties kafkaProperties; + + public boolean isEnabled() { + return kafkaProperties.isEnabled(); + } + + public String getTopic() { + return kafkaProperties.getTopic(); + } + + /** + * 수집 청크 데이터를 Kafka 전송용 서브청크로 분할해 전송 + */ + public PublishSummary publish(List entities) { + if (!isEnabled()) { + return PublishSummary.disabled(); + } + + if (entities == null || entities.isEmpty()) { + return PublishSummary.empty(); + } + + int subChunkSize = Math.max(1, kafkaProperties.getSendChunkSize()); + PublishSummary totalSummary = PublishSummary.empty(); + + for (int from = 0; from < entities.size(); from += subChunkSize) { + int to = Math.min(from + subChunkSize, entities.size()); + List subChunk = entities.subList(from, to); + + PublishSummary chunkSummary = publishSubChunk(subChunk); + totalSummary.merge(chunkSummary); + + log.info("AIS Kafka 서브청크 전송 완료 - topic: {}, 범위: {}~{}, 요청: {}, 성공: {}, 실패: {}, 스킵: {}", + getTopic(), from, to - 1, + chunkSummary.getRequestedCount(), + chunkSummary.getSuccessCount(), + chunkSummary.getFailedCount(), + chunkSummary.getSkippedCount()); + } + + if (kafkaProperties.isFailOnSendError() && totalSummary.getFailedCount() > 0) { + throw new IllegalStateException("AIS Kafka 전송 실패 건수: " + totalSummary.getFailedCount()); + } + + return totalSummary; + } + + private PublishSummary publishSubChunk(List subChunk) { + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failedCount = new AtomicInteger(0); + AtomicInteger skippedCount = new AtomicInteger(0); + AtomicInteger sampledErrorLogs = new AtomicInteger(0); + List> futures = new ArrayList<>(subChunk.size()); + + for (AisTargetEntity entity : subChunk) { + if (!isValid(entity)) { + skippedCount.incrementAndGet(); + continue; + } + + try { + String key = String.valueOf(entity.getMmsi()); + String payload = objectMapper.writeValueAsString(AisTargetKafkaMessage.from(entity)); + + CompletableFuture trackedFuture = kafkaTemplate.send(getTopic(), key, payload) + .handle((result, ex) -> { + if (ex != null) { + failedCount.incrementAndGet(); + logSendError(sampledErrorLogs, + "AIS Kafka 전송 실패 - topic: " + getTopic() + + ", key: " + key + + ", messageTimestamp: " + entity.getMessageTimestamp() + + ", error: " + ex.getMessage()); + } else { + successCount.incrementAndGet(); + } + return null; + }); + + futures.add(trackedFuture); + + } catch (JsonProcessingException e) { + failedCount.incrementAndGet(); + logSendError(sampledErrorLogs, + "AIS Kafka 메시지 직렬화 실패 - mmsi: " + entity.getMmsi() + + ", messageTimestamp: " + entity.getMessageTimestamp() + + ", error: " + e.getMessage()); + } catch (Exception e) { + failedCount.incrementAndGet(); + logSendError(sampledErrorLogs, + "AIS Kafka 전송 요청 실패 - mmsi: " + entity.getMmsi() + + ", messageTimestamp: " + entity.getMessageTimestamp() + + ", error: " + e.getMessage()); + } + } + + if (!futures.isEmpty()) { + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + kafkaTemplate.flush(); + } + + return PublishSummary.of( + false, + subChunk.size(), + successCount.get(), + failedCount.get(), + skippedCount.get() + ); + } + + private boolean isValid(AisTargetEntity entity) { + return entity != null + && entity.getMmsi() != null + && entity.getMessageTimestamp() != null; + } + + private void logSendError(AtomicInteger sampledErrorLogs, String message) { + int current = sampledErrorLogs.incrementAndGet(); + if (current <= 5) { + log.error(message); + return; + } + + if (current == 6) { + log.error("AIS Kafka 전송 오류 로그가 많아 이후 상세 로그는 생략합니다."); + } + } + + @Getter + public static class PublishSummary { + private final boolean disabled; + private int requestedCount; + private int successCount; + private int failedCount; + private int skippedCount; + + private PublishSummary( + boolean disabled, + int requestedCount, + int successCount, + int failedCount, + int skippedCount + ) { + this.disabled = disabled; + this.requestedCount = requestedCount; + this.successCount = successCount; + this.failedCount = failedCount; + this.skippedCount = skippedCount; + } + + public static PublishSummary disabled() { + return of(true, 0, 0, 0, 0); + } + + public static PublishSummary empty() { + return of(false, 0, 0, 0, 0); + } + + public static PublishSummary of( + boolean disabled, + int requestedCount, + int successCount, + int failedCount, + int skippedCount + ) { + return new PublishSummary(disabled, requestedCount, successCount, failedCount, skippedCount); + } + + public void merge(PublishSummary other) { + this.requestedCount += other.requestedCount; + this.successCount += other.successCount; + this.failedCount += other.failedCount; + this.skippedCount += other.skippedCount; + } + } +} diff --git a/src/main/java/com/snp/batch/jobs/aistarget/kafka/AisTargetKafkaProperties.java b/src/main/java/com/snp/batch/jobs/aistarget/kafka/AisTargetKafkaProperties.java new file mode 100644 index 0000000..a49e0ab --- /dev/null +++ b/src/main/java/com/snp/batch/jobs/aistarget/kafka/AisTargetKafkaProperties.java @@ -0,0 +1,36 @@ +package com.snp.batch.jobs.aistarget.kafka; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * AIS Target Kafka 전송 설정 + */ +@Getter +@Setter +@ConfigurationProperties(prefix = "app.batch.ais-target.kafka") +public class AisTargetKafkaProperties { + + /** + * Kafka 전송 활성화 여부 + */ + private boolean enabled = true; + + /** + * 전송 대상 토픽 + */ + private String topic = "tp_SNP_AIS_Signal"; + + /** + * Kafka 전송 서브청크 크기 + * 수집 청크(예: 5만)와 별도로 전송 배치를 분할한다. + */ + private int sendChunkSize = 5000; + + /** + * 전송 실패 시 Step 실패 여부 + * false면 실패 로그만 남기고 다음 처리를 계속한다. + */ + private boolean failOnSendError = false; +} diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 51fada2..07911f7 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -53,6 +53,23 @@ spring: org.quartz.jobStore.isClustered: false org.quartz.jobStore.misfireThreshold: 60000 + # Kafka Configuration (DEV) + kafka: + bootstrap-servers: localhost:9092 # TODO: DEV Kafka Broker IP/PORT 설정 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + acks: all + retries: 3 + properties: + enable.idempotence: true + compression.type: snappy + linger.ms: 20 + batch.size: 65536 + max.block.ms: 3000 + request.timeout.ms: 5000 + delivery.timeout.ms: 10000 + # Server Configuration server: port: 8041 @@ -99,6 +116,11 @@ app: chunk-size: 50000 # 배치 청크 크기 schedule: cron: "15 * * * * ?" # 매 분 15초 실행 + kafka: + enabled: true + topic: tp_SNP_AIS_Signal + send-chunk-size: 5000 + fail-on-send-error: false # AIS Target 캐시 설정 ais-target-cache: ttl-minutes: 120 # 캐시 TTL (분) - 2시간 @@ -132,4 +154,4 @@ app: # 개별 테이블 보관기간 설정 (옵션) custom: # - table-name: ais_target - # retention-days: 30 # ais_target만 30일 보관 \ No newline at end of file + # retention-days: 30 # ais_target만 30일 보관 diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index 1291ece..7852efe 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -53,6 +53,23 @@ spring: org.quartz.jobStore.isClustered: false org.quartz.jobStore.misfireThreshold: 60000 + # Kafka Configuration (PROD) + kafka: + bootstrap-servers: localhost:9092 # TODO: PROD Kafka Broker IP/PORT 설정 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + acks: all + retries: 3 + properties: + enable.idempotence: true + compression.type: snappy + linger.ms: 20 + batch.size: 65536 + max.block.ms: 3000 + request.timeout.ms: 5000 + delivery.timeout.ms: 10000 + # Server Configuration server: port: 8041 @@ -101,6 +118,11 @@ app: chunk-size: 50000 # 배치 청크 크기 schedule: cron: "15 * * * * ?" # 매 분 15초 실행 + kafka: + enabled: true + topic: tp_SNP_AIS_Signal + send-chunk-size: 5000 + fail-on-send-error: false # AIS Target 캐시 설정 ais-target-cache: ttl-minutes: 120 # 캐시 TTL (분) - 2시간 @@ -134,4 +156,4 @@ app: # 개별 테이블 보관기간 설정 (옵션) custom: # - table-name: ais_target - # retention-days: 30 # ais_target만 30일 보관 \ No newline at end of file + # retention-days: 30 # ais_target만 30일 보관 diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 042b784..1ad54e6 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -53,6 +53,23 @@ spring: org.quartz.jobStore.isClustered: false org.quartz.jobStore.misfireThreshold: 60000 + # Kafka Configuration + kafka: + bootstrap-servers: localhost:9092 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + acks: all + retries: 3 + properties: + enable.idempotence: true + compression.type: snappy + linger.ms: 20 + batch.size: 65536 + max.block.ms: 3000 + request.timeout.ms: 5000 + delivery.timeout.ms: 10000 + # Server Configuration server: port: 8041 @@ -151,6 +168,11 @@ app: chunk-size: 50000 # 배치 청크 크기 schedule: cron: "15 * * * * ?" # 매 분 15초 실행 + kafka: + enabled: true + topic: tp_SNP_AIS_Signal + send-chunk-size: 5000 + fail-on-send-error: false # AIS Target DB Sync 배치 설정 (캐시 → DB 저장) ais-target-db-sync: