feat: add AIS target Kafka producer pipeline

This commit is contained in:
LHT 2026-02-13 03:10:38 +09:00
부모 07368f18cb
커밋 178ac506bf
8개의 변경된 파일393개의 추가작업 그리고 4개의 파일을 삭제

파일 보기

@ -111,6 +111,12 @@
<version>2.3.0</version> <version>2.3.0</version>
</dependency> </dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Caffeine Cache --> <!-- Caffeine Cache -->
<dependency> <dependency>
<groupId>com.github.ben-manes.caffeine</groupId> <groupId>com.github.ben-manes.caffeine</groupId>

파일 보기

@ -4,6 +4,7 @@ import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager; import com.snp.batch.jobs.aistarget.cache.AisTargetCacheManager;
import com.snp.batch.jobs.aistarget.classifier.AisClassTypeClassifier; import com.snp.batch.jobs.aistarget.classifier.AisClassTypeClassifier;
import com.snp.batch.jobs.aistarget.kafka.AisTargetKafkaProducer;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -15,10 +16,11 @@ import java.util.List;
* 동작: * 동작:
* 1. ClassType 분류 (Core20 캐시 기반 A/B 분류) * 1. ClassType 분류 (Core20 캐시 기반 A/B 분류)
* 2. 캐시에 최신 위치 정보 업데이트 (classType, core20Mmsi 포함) * 2. 캐시에 최신 위치 정보 업데이트 (classType, core20Mmsi 포함)
* 3. Kafka 토픽으로 AIS Target 정보 전송 (서브청크 분할)
* *
* 참고: * 참고:
* - DB 저장은 별도 Job(aisTargetDbSyncJob)에서 15분 주기로 수행 * - DB 저장은 별도 Job(aisTargetDbSyncJob)에서 15분 주기로 수행
* - Writer는 캐시 업데이트만 담당 * - Kafka 전송 실패는 기본적으로 로그만 남기고 다음 처리 계속
*/ */
@Slf4j @Slf4j
@Component @Component
@ -26,13 +28,16 @@ public class AisTargetDataWriter extends BaseWriter<AisTargetEntity> {
private final AisTargetCacheManager cacheManager; private final AisTargetCacheManager cacheManager;
private final AisClassTypeClassifier classTypeClassifier; private final AisClassTypeClassifier classTypeClassifier;
private final AisTargetKafkaProducer kafkaProducer;
public AisTargetDataWriter( public AisTargetDataWriter(
AisTargetCacheManager cacheManager, AisTargetCacheManager cacheManager,
AisClassTypeClassifier classTypeClassifier) { AisClassTypeClassifier classTypeClassifier,
AisTargetKafkaProducer kafkaProducer) {
super("AisTarget"); super("AisTarget");
this.cacheManager = cacheManager; this.cacheManager = cacheManager;
this.classTypeClassifier = classTypeClassifier; this.classTypeClassifier = classTypeClassifier;
this.kafkaProducer = kafkaProducer;
} }
@Override @Override
@ -48,5 +53,19 @@ public class AisTargetDataWriter extends BaseWriter<AisTargetEntity> {
log.debug("AIS Target 캐시 업데이트 완료: {} 건 (캐시 크기: {})", log.debug("AIS Target 캐시 업데이트 완료: {} 건 (캐시 크기: {})",
items.size(), cacheManager.size()); items.size(), cacheManager.size());
// 3. Kafka 전송 (설정 enabled=true 경우)
if (!kafkaProducer.isEnabled()) {
log.debug("AIS Kafka 전송 비활성화 - topic 전송 스킵");
return;
}
AisTargetKafkaProducer.PublishSummary summary = kafkaProducer.publish(items);
log.info("AIS Kafka 전송 완료 - topic: {}, 요청: {}, 성공: {}, 실패: {}, 스킵: {}",
kafkaProducer.getTopic(),
summary.getRequestedCount(),
summary.getSuccessCount(),
summary.getFailedCount(),
summary.getSkippedCount());
} }
} }

파일 보기

@ -0,0 +1,55 @@
package com.snp.batch.jobs.aistarget.kafka;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
/**
* AIS Target Kafka 메시지 스키마
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class AisTargetKafkaMessage {
/**
* 이벤트 고유 식별자
* - 형식: {mmsi}_{messageTimestamp}
*/
private String eventId;
/**
* Kafka key와 동일한 선박 식별자
*/
private String key;
/**
* Kafka 발행 시각(UTC)
*/
private OffsetDateTime publishedAt;
/**
* AIS 원본/가공 데이터 전체 필드
*/
private AisTargetEntity payload;
public static AisTargetKafkaMessage from(AisTargetEntity entity) {
String key = entity.getMmsi() != null ? String.valueOf(entity.getMmsi()) : null;
String messageTs = entity.getMessageTimestamp() != null ? entity.getMessageTimestamp().toString() : "null";
return AisTargetKafkaMessage.builder()
.eventId(key + "_" + messageTs)
.key(key)
.publishedAt(OffsetDateTime.now(ZoneOffset.UTC))
.payload(entity)
.build();
}
}

파일 보기

@ -0,0 +1,207 @@
package com.snp.batch.jobs.aistarget.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
/**
* AIS Target Kafka Producer
*
* 정책:
* - key: MMSI
* - value: AisTargetKafkaMessage(JSON)
* - 실패 기본적으로 로그만 남기고 계속 진행 (failOnSendError=false)
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class AisTargetKafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private final AisTargetKafkaProperties kafkaProperties;
public boolean isEnabled() {
return kafkaProperties.isEnabled();
}
public String getTopic() {
return kafkaProperties.getTopic();
}
/**
* 수집 청크 데이터를 Kafka 전송용 서브청크로 분할해 전송
*/
public PublishSummary publish(List<AisTargetEntity> entities) {
if (!isEnabled()) {
return PublishSummary.disabled();
}
if (entities == null || entities.isEmpty()) {
return PublishSummary.empty();
}
int subChunkSize = Math.max(1, kafkaProperties.getSendChunkSize());
PublishSummary totalSummary = PublishSummary.empty();
for (int from = 0; from < entities.size(); from += subChunkSize) {
int to = Math.min(from + subChunkSize, entities.size());
List<AisTargetEntity> subChunk = entities.subList(from, to);
PublishSummary chunkSummary = publishSubChunk(subChunk);
totalSummary.merge(chunkSummary);
log.info("AIS Kafka 서브청크 전송 완료 - topic: {}, 범위: {}~{}, 요청: {}, 성공: {}, 실패: {}, 스킵: {}",
getTopic(), from, to - 1,
chunkSummary.getRequestedCount(),
chunkSummary.getSuccessCount(),
chunkSummary.getFailedCount(),
chunkSummary.getSkippedCount());
}
if (kafkaProperties.isFailOnSendError() && totalSummary.getFailedCount() > 0) {
throw new IllegalStateException("AIS Kafka 전송 실패 건수: " + totalSummary.getFailedCount());
}
return totalSummary;
}
private PublishSummary publishSubChunk(List<AisTargetEntity> subChunk) {
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failedCount = new AtomicInteger(0);
AtomicInteger skippedCount = new AtomicInteger(0);
AtomicInteger sampledErrorLogs = new AtomicInteger(0);
List<CompletableFuture<Void>> futures = new ArrayList<>(subChunk.size());
for (AisTargetEntity entity : subChunk) {
if (!isValid(entity)) {
skippedCount.incrementAndGet();
continue;
}
try {
String key = String.valueOf(entity.getMmsi());
String payload = objectMapper.writeValueAsString(AisTargetKafkaMessage.from(entity));
CompletableFuture<Void> trackedFuture = kafkaTemplate.send(getTopic(), key, payload)
.handle((result, ex) -> {
if (ex != null) {
failedCount.incrementAndGet();
logSendError(sampledErrorLogs,
"AIS Kafka 전송 실패 - topic: " + getTopic()
+ ", key: " + key
+ ", messageTimestamp: " + entity.getMessageTimestamp()
+ ", error: " + ex.getMessage());
} else {
successCount.incrementAndGet();
}
return null;
});
futures.add(trackedFuture);
} catch (JsonProcessingException e) {
failedCount.incrementAndGet();
logSendError(sampledErrorLogs,
"AIS Kafka 메시지 직렬화 실패 - mmsi: " + entity.getMmsi()
+ ", messageTimestamp: " + entity.getMessageTimestamp()
+ ", error: " + e.getMessage());
} catch (Exception e) {
failedCount.incrementAndGet();
logSendError(sampledErrorLogs,
"AIS Kafka 전송 요청 실패 - mmsi: " + entity.getMmsi()
+ ", messageTimestamp: " + entity.getMessageTimestamp()
+ ", error: " + e.getMessage());
}
}
if (!futures.isEmpty()) {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
kafkaTemplate.flush();
}
return PublishSummary.of(
false,
subChunk.size(),
successCount.get(),
failedCount.get(),
skippedCount.get()
);
}
private boolean isValid(AisTargetEntity entity) {
return entity != null
&& entity.getMmsi() != null
&& entity.getMessageTimestamp() != null;
}
private void logSendError(AtomicInteger sampledErrorLogs, String message) {
int current = sampledErrorLogs.incrementAndGet();
if (current <= 5) {
log.error(message);
return;
}
if (current == 6) {
log.error("AIS Kafka 전송 오류 로그가 많아 이후 상세 로그는 생략합니다.");
}
}
@Getter
public static class PublishSummary {
private final boolean disabled;
private int requestedCount;
private int successCount;
private int failedCount;
private int skippedCount;
private PublishSummary(
boolean disabled,
int requestedCount,
int successCount,
int failedCount,
int skippedCount
) {
this.disabled = disabled;
this.requestedCount = requestedCount;
this.successCount = successCount;
this.failedCount = failedCount;
this.skippedCount = skippedCount;
}
public static PublishSummary disabled() {
return of(true, 0, 0, 0, 0);
}
public static PublishSummary empty() {
return of(false, 0, 0, 0, 0);
}
public static PublishSummary of(
boolean disabled,
int requestedCount,
int successCount,
int failedCount,
int skippedCount
) {
return new PublishSummary(disabled, requestedCount, successCount, failedCount, skippedCount);
}
public void merge(PublishSummary other) {
this.requestedCount += other.requestedCount;
this.successCount += other.successCount;
this.failedCount += other.failedCount;
this.skippedCount += other.skippedCount;
}
}
}

파일 보기

@ -0,0 +1,36 @@
package com.snp.batch.jobs.aistarget.kafka;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* AIS Target Kafka 전송 설정
*/
@Getter
@Setter
@ConfigurationProperties(prefix = "app.batch.ais-target.kafka")
public class AisTargetKafkaProperties {
/**
* Kafka 전송 활성화 여부
*/
private boolean enabled = true;
/**
* 전송 대상 토픽
*/
private String topic = "tp_SNP_AIS_Signal";
/**
* Kafka 전송 서브청크 크기
* 수집 청크(: 5만) 별도로 전송 배치를 분할한다.
*/
private int sendChunkSize = 5000;
/**
* 전송 실패 Step 실패 여부
* false면 실패 로그만 남기고 다음 처리를 계속한다.
*/
private boolean failOnSendError = false;
}

파일 보기

@ -53,6 +53,23 @@ spring:
org.quartz.jobStore.isClustered: false org.quartz.jobStore.isClustered: false
org.quartz.jobStore.misfireThreshold: 60000 org.quartz.jobStore.misfireThreshold: 60000
# Kafka Configuration (DEV)
kafka:
bootstrap-servers: localhost:9092 # TODO: DEV Kafka Broker IP/PORT 설정
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
properties:
enable.idempotence: true
compression.type: snappy
linger.ms: 20
batch.size: 65536
max.block.ms: 3000
request.timeout.ms: 5000
delivery.timeout.ms: 10000
# Server Configuration # Server Configuration
server: server:
port: 8041 port: 8041
@ -99,6 +116,11 @@ app:
chunk-size: 50000 # 배치 청크 크기 chunk-size: 50000 # 배치 청크 크기
schedule: schedule:
cron: "15 * * * * ?" # 매 분 15초 실행 cron: "15 * * * * ?" # 매 분 15초 실행
kafka:
enabled: true
topic: tp_SNP_AIS_Signal
send-chunk-size: 5000
fail-on-send-error: false
# AIS Target 캐시 설정 # AIS Target 캐시 설정
ais-target-cache: ais-target-cache:
ttl-minutes: 120 # 캐시 TTL (분) - 2시간 ttl-minutes: 120 # 캐시 TTL (분) - 2시간
@ -132,4 +154,4 @@ app:
# 개별 테이블 보관기간 설정 (옵션) # 개별 테이블 보관기간 설정 (옵션)
custom: custom:
# - table-name: ais_target # - table-name: ais_target
# retention-days: 30 # ais_target만 30일 보관 # retention-days: 30 # ais_target만 30일 보관

파일 보기

@ -53,6 +53,23 @@ spring:
org.quartz.jobStore.isClustered: false org.quartz.jobStore.isClustered: false
org.quartz.jobStore.misfireThreshold: 60000 org.quartz.jobStore.misfireThreshold: 60000
# Kafka Configuration (PROD)
kafka:
bootstrap-servers: localhost:9092 # TODO: PROD Kafka Broker IP/PORT 설정
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
properties:
enable.idempotence: true
compression.type: snappy
linger.ms: 20
batch.size: 65536
max.block.ms: 3000
request.timeout.ms: 5000
delivery.timeout.ms: 10000
# Server Configuration # Server Configuration
server: server:
port: 8041 port: 8041
@ -101,6 +118,11 @@ app:
chunk-size: 50000 # 배치 청크 크기 chunk-size: 50000 # 배치 청크 크기
schedule: schedule:
cron: "15 * * * * ?" # 매 분 15초 실행 cron: "15 * * * * ?" # 매 분 15초 실행
kafka:
enabled: true
topic: tp_SNP_AIS_Signal
send-chunk-size: 5000
fail-on-send-error: false
# AIS Target 캐시 설정 # AIS Target 캐시 설정
ais-target-cache: ais-target-cache:
ttl-minutes: 120 # 캐시 TTL (분) - 2시간 ttl-minutes: 120 # 캐시 TTL (분) - 2시간
@ -134,4 +156,4 @@ app:
# 개별 테이블 보관기간 설정 (옵션) # 개별 테이블 보관기간 설정 (옵션)
custom: custom:
# - table-name: ais_target # - table-name: ais_target
# retention-days: 30 # ais_target만 30일 보관 # retention-days: 30 # ais_target만 30일 보관

파일 보기

@ -53,6 +53,23 @@ spring:
org.quartz.jobStore.isClustered: false org.quartz.jobStore.isClustered: false
org.quartz.jobStore.misfireThreshold: 60000 org.quartz.jobStore.misfireThreshold: 60000
# Kafka Configuration
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
properties:
enable.idempotence: true
compression.type: snappy
linger.ms: 20
batch.size: 65536
max.block.ms: 3000
request.timeout.ms: 5000
delivery.timeout.ms: 10000
# Server Configuration # Server Configuration
server: server:
port: 8041 port: 8041
@ -151,6 +168,11 @@ app:
chunk-size: 50000 # 배치 청크 크기 chunk-size: 50000 # 배치 청크 크기
schedule: schedule:
cron: "15 * * * * ?" # 매 분 15초 실행 cron: "15 * * * * ?" # 매 분 15초 실행
kafka:
enabled: true
topic: tp_SNP_AIS_Signal
send-chunk-size: 5000
fail-on-send-error: false
# AIS Target DB Sync 배치 설정 (캐시 → DB 저장) # AIS Target DB Sync 배치 설정 (캐시 → DB 저장)
ais-target-db-sync: ais-target-db-sync: