diff --git a/scripts/collect_signalkind_candidates.sh b/scripts/collect_signalkind_candidates.sh
new file mode 100755
index 0000000..8c582b5
--- /dev/null
+++ b/scripts/collect_signalkind_candidates.sh
@@ -0,0 +1,439 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+ROOT_DIR="$(cd "$(dirname "$0")/.." && pwd)"
+
+PROFILE="local"
+DURATION_SEC=120
+MAX_MESSAGES=200000
+GROUP_ID="signalkind-collector-v1"
+OFFSET_RESET="latest"
+OUTPUT_DIR="$ROOT_DIR/docs/signalkind"
+
+usage() {
+  cat <<'USAGE'
+Usage: collect_signalkind_candidates.sh [options]
+
+Options:
+  -p, --profile        Spring profile (default: local)
+  -d, --duration-sec   Consume duration in seconds (default: 120)
+  -m, --max-messages   Max messages per run (default: 200000)
+  -g, --group-id       Kafka consumer group id (default: signalkind-collector-v1)
+  -r, --offset-reset   auto.offset.reset: earliest|latest (default: latest)
+  -o, --output-dir     Output directory (default: docs/signalkind)
+  -h, --help           Show this help
+
+Examples:
+  scripts/collect_signalkind_candidates.sh -p local -d 600
+  scripts/collect_signalkind_candidates.sh -p local -r earliest -m 50000
+USAGE
+}
+
+while [[ $# -gt 0 ]]; do
+  case "$1" in
+    -p|--profile)
+      PROFILE="$2"
+      shift 2
+      ;;
+    -d|--duration-sec)
+      DURATION_SEC="$2"
+      shift 2
+      ;;
+    -m|--max-messages)
+      MAX_MESSAGES="$2"
+      shift 2
+      ;;
+    -g|--group-id)
+      GROUP_ID="$2"
+      shift 2
+      ;;
+    -r|--offset-reset)
+      OFFSET_RESET="$2"
+      shift 2
+      ;;
+    -o|--output-dir)
+      OUTPUT_DIR="$2"
+      shift 2
+      ;;
+    -h|--help)
+      usage
+      exit 0
+      ;;
+    *)
+      echo "Unknown option: $1"
+      usage
+      exit 1
+      ;;
+  esac
+done
+
+if [[ "$OFFSET_RESET" != "earliest" && "$OFFSET_RESET" != "latest" ]]; then
+  echo "Invalid --offset-reset value: $OFFSET_RESET"
+  exit 1
+fi
+
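+# Resolve the profile config and read the broker/topic from it. Note: the awk
+# lookups below are a naive first-match line scan, not a YAML parser; they
+# assume single-line "bootstrap-servers:" and "topic:" scalar values.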
-f "$CONFIG_FILE" ]]; then + echo "Profile config not found: $CONFIG_FILE" + exit 1 +fi + +BOOTSTRAP_SERVERS="$(awk '/^[[:space:]]*bootstrap-servers:/{print $2; exit}' "$CONFIG_FILE")" +TOPIC_NAME="$(awk '/^[[:space:]]*topic:/{print $2; exit}' "$CONFIG_FILE")" + +if [[ -z "${BOOTSTRAP_SERVERS:-}" || -z "${TOPIC_NAME:-}" ]]; then + echo "Failed to read bootstrap/topic from $CONFIG_FILE" + exit 1 +fi + +mkdir -p "$OUTPUT_DIR" + +CP_FILE="/tmp/snp-signalkind-cp.txt" +mvn -q -DskipTests dependency:build-classpath -Dmdep.outputFile="$CP_FILE" +CLASSPATH="$(cat "$CP_FILE")" + +JAVA_FILE="/tmp/SignalkindCollector.java" +cat >"$JAVA_FILE" <<'JAVA' +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.*; +import java.util.stream.Collectors; + +public class SignalkindCollector { + + @JsonIgnoreProperties(ignoreUnknown = true) + public static class Aggregate { + public long totalConsumed = 0; + public long totalParseError = 0; + public long totalMissingVesselType = 0; + public Map vesselTypeStats = new HashMap<>(); + public Map vesselTypeExtraStats = new HashMap<>(); + } + + @JsonIgnoreProperties(ignoreUnknown = true) + public static class KindStat { + public long count = 0; + public long lastTimestampMs = 0; + public LinkedHashSet sampleMmsi = new LinkedHashSet<>(); + } + + public static class DecisionFields { + String proposedCode = ""; + String status = "PENDING"; + String notes = ""; + } + + public static void main(String[] args) throws Exception { + if (args.length < 7) { + throw new IllegalArgumentException( + "Usage: SignalkindCollector "); + } + + String bootstrap = args[0]; + String topic = args[1]; + Path outputDir = Path.of(args[2]); + int durationSec = Integer.parseInt(args[3]); + int maxMessages = Integer.parseInt(args[4]); + String groupId = args[5]; + String offsetReset = args[6]; + + Files.createDirectories(outputDir); + + ObjectMapper mapper = new ObjectMapper(); + Path aggregateJsonPath = outputDir.resolve("aggregate_store.json"); + Path vesselTypeTsvPath = outputDir.resolve("vesseltype_stats.tsv"); + Path vesselTypeExtraTsvPath = outputDir.resolve("vesseltype_extra_stats.tsv"); + Path mappingDraftTsvPath = outputDir.resolve("signalkind_mapping_draft.tsv"); + Path runSummaryPath = outputDir.resolve("last_run_summary.txt"); + + Aggregate aggregate = loadAggregate(mapper, aggregateJsonPath); + Map existingDraft = loadExistingDraft(mappingDraftTsvPath); + + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000");
+        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
+
+        long runStart = System.currentTimeMillis();
+        long runDeadline = runStart + (durationSec * 1000L);
+        long runConsumed = 0;
+        long runParseError = 0;
+        long runMissingVesselType = 0;
+
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
+            consumer.subscribe(Collections.singleton(topic));
+
+            while (System.currentTimeMillis() < runDeadline && runConsumed < maxMessages) {
+                ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(1000));
+                if (records.isEmpty()) {
+                    continue;
+                }
+
+                for (ConsumerRecord<String, String> record : records) {
+                    if (runConsumed >= maxMessages) {
+                        break;
+                    }
+                    runConsumed++;
+
+                    try {
+                        JsonNode root = mapper.readTree(record.value());
+                        JsonNode payload = root.path("payload");
+
+                        String vesselType = normalize(payload.path("vesselType").asText(null), "N/A");
+                        String extraInfo = normalize(payload.path("extraInfo").asText(null), "N/A");
+                        String mmsi = normalize(payload.path("mmsi").asText(null), "");
+
+                        if ("N/A".equals(vesselType)) {
+                            runMissingVesselType++;
+                        }
+
+                        updateStat(aggregate.vesselTypeStats, vesselType, record.timestamp(), mmsi);
+                        updateStat(aggregate.vesselTypeExtraStats, vesselType + "\u001F" + extraInfo, record.timestamp(), mmsi);
+                    } catch (Exception e) {
+                        runParseError++;
+                    }
+                }
+
+                consumer.commitSync();
+            }
+        }
+
+        aggregate.totalConsumed += runConsumed;
+        aggregate.totalParseError += runParseError;
+        aggregate.totalMissingVesselType += runMissingVesselType;
+
+        mapper.writerWithDefaultPrettyPrinter().writeValue(aggregateJsonPath.toFile(), aggregate);
+
+        writeVesselTypeStats(aggregate, vesselTypeTsvPath);
+        writeVesselTypeExtraStats(aggregate, vesselTypeExtraTsvPath);
+        writeMappingDraft(aggregate, existingDraft, mappingDraftTsvPath);
+        writeRunSummary(runSummaryPath, bootstrap, topic, groupId, offsetReset,
+                runStart, System.currentTimeMillis(), runConsumed, runParseError, runMissingVesselType, aggregate.totalConsumed);
+
+        System.out.println("OUTPUT_DIR=" + outputDir);
+        System.out.println("RUN_CONSUMED=" + runConsumed);
+        System.out.println("RUN_PARSE_ERROR=" + runParseError);
+        System.out.println("RUN_MISSING_VESSEL_TYPE=" + runMissingVesselType);
+        System.out.println("TOTAL_CONSUMED=" + aggregate.totalConsumed);
+        System.out.println("TOTAL_VESSEL_TYPES=" + aggregate.vesselTypeStats.size());
+        System.out.println("TOTAL_VESSEL_TYPE_EXTRA=" + aggregate.vesselTypeExtraStats.size());
+    }
+
+    private static Aggregate loadAggregate(ObjectMapper mapper, Path aggregateJsonPath) {
+        try {
+            if (Files.exists(aggregateJsonPath)) {
+                return mapper.readValue(aggregateJsonPath.toFile(), Aggregate.class);
+            }
+        } catch (Exception ignored) {
+        }
+        return new Aggregate();
+    }
+
+    private static Map<String, DecisionFields> loadExistingDraft(Path mappingDraftTsvPath) {
+        Map<String, DecisionFields> map = new HashMap<>();
+        if (!Files.exists(mappingDraftTsvPath)) {
+            return map;
+        }
+
+        try {
+            List<String> lines = Files.readAllLines(mappingDraftTsvPath, StandardCharsets.UTF_8);
+            boolean first = true;
+            for (String line : lines) {
+                if (first) {
+                    first = false;
+                    continue;
+                }
+                String[] arr = line.split("\t", -1);
+                if (arr.length < 8) {
+                    continue;
+                }
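+                // Columns: vesselType, extraInfo, count, lastSeenUtc, sampleMmsi,
+                // proposedSignalKindCode, decisionStatus, notes. Only the last
+                // three are human-edited and need to survive regeneration.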
"PENDING" : arr[6]; + fields.notes = arr[7]; + map.put(key, fields); + } + } catch (Exception ignored) { + } + return map; + } + + private static void updateStat(Map map, String key, long ts, String mmsi) { + KindStat stat = map.computeIfAbsent(key, k -> new KindStat()); + stat.count += 1; + stat.lastTimestampMs = Math.max(stat.lastTimestampMs, ts); + if (mmsi != null && !mmsi.isBlank() && stat.sampleMmsi.size() < 5) { + stat.sampleMmsi.add(mmsi); + } + } + + private static String normalize(String value, String defaultValue) { + if (value == null) { + return defaultValue; + } + String v = value.trim(); + if (v.isEmpty() || "null".equalsIgnoreCase(v)) { + return defaultValue; + } + return v; + } + + private static void writeVesselTypeStats(Aggregate aggregate, Path path) throws Exception { + List> rows = aggregate.vesselTypeStats.entrySet() + .stream() + .sorted((a, b) -> Long.compare(b.getValue().count, a.getValue().count)) + .toList(); + + try (BufferedWriter w = new BufferedWriter(new FileWriter(path.toFile(), false))) { + w.write("vesselType\tcount\tratio\tlastSeenUtc\tsampleMmsi\n"); + long total = Math.max(1L, aggregate.totalConsumed); + for (Map.Entry row : rows) { + KindStat s = row.getValue(); + double ratio = (s.count * 100.0) / total; + w.write(row.getKey() + "\t" + + s.count + "\t" + + String.format(Locale.US, "%.4f", ratio) + "\t" + + formatTs(s.lastTimestampMs) + "\t" + + String.join(",", s.sampleMmsi) + "\n"); + } + } + } + + private static void writeVesselTypeExtraStats(Aggregate aggregate, Path path) throws Exception { + List> rows = aggregate.vesselTypeExtraStats.entrySet() + .stream() + .sorted((a, b) -> Long.compare(b.getValue().count, a.getValue().count)) + .toList(); + + try (BufferedWriter w = new BufferedWriter(new FileWriter(path.toFile(), false))) { + w.write("vesselType\textraInfo\tcount\tratio\tlastSeenUtc\tsampleMmsi\n"); + long total = Math.max(1L, aggregate.totalConsumed); + for (Map.Entry row : rows) { + String[] keys = row.getKey().split("\u001F", 2); + String vesselType = keys.length > 0 ? keys[0] : "N/A"; + String extraInfo = keys.length > 1 ? keys[1] : "N/A"; + KindStat s = row.getValue(); + double ratio = (s.count * 100.0) / total; + w.write(vesselType + "\t" + + extraInfo + "\t" + + s.count + "\t" + + String.format(Locale.US, "%.4f", ratio) + "\t" + + formatTs(s.lastTimestampMs) + "\t" + + String.join(",", s.sampleMmsi) + "\n"); + } + } + } + + private static void writeMappingDraft(Aggregate aggregate, Map existing, Path path) throws Exception { + List> rows = aggregate.vesselTypeExtraStats.entrySet() + .stream() + .sorted((a, b) -> Long.compare(b.getValue().count, a.getValue().count)) + .toList(); + + try (BufferedWriter w = new BufferedWriter(new FileWriter(path.toFile(), false))) { + w.write("vesselType\textraInfo\tcount\tlastSeenUtc\tsampleMmsi\tproposedSignalKindCode\tdecisionStatus\tnotes\n"); + for (Map.Entry row : rows) { + String key = row.getKey(); + String[] keys = key.split("\u001F", 2); + String vesselType = keys.length > 0 ? keys[0] : "N/A"; + String extraInfo = keys.length > 1 ? 
keys[1] : "N/A"; + KindStat s = row.getValue(); + + DecisionFields d = existing.getOrDefault(key, new DecisionFields()); + w.write(vesselType + "\t" + + extraInfo + "\t" + + s.count + "\t" + + formatTs(s.lastTimestampMs) + "\t" + + String.join(",", s.sampleMmsi) + "\t" + + d.proposedCode + "\t" + + d.status + "\t" + + d.notes + "\n"); + } + } + } + + private static void writeRunSummary( + Path path, + String bootstrap, + String topic, + String groupId, + String offsetReset, + long startMs, + long endMs, + long runConsumed, + long runParseError, + long runMissingVesselType, + long totalConsumed + ) throws Exception { + try (BufferedWriter w = new BufferedWriter(new FileWriter(path.toFile(), false))) { + w.write("bootstrap=" + bootstrap + "\n"); + w.write("topic=" + topic + "\n"); + w.write("groupId=" + groupId + "\n"); + w.write("offsetReset=" + offsetReset + "\n"); + w.write("runStartUtc=" + Instant.ofEpochMilli(startMs) + "\n"); + w.write("runEndUtc=" + Instant.ofEpochMilli(endMs) + "\n"); + w.write("runDurationSec=" + ((endMs - startMs) / 1000) + "\n"); + w.write("runConsumed=" + runConsumed + "\n"); + w.write("runParseError=" + runParseError + "\n"); + w.write("runMissingVesselType=" + runMissingVesselType + "\n"); + w.write("totalConsumed=" + totalConsumed + "\n"); + } + } + + private static String formatTs(long ts) { + if (ts <= 0) { + return ""; + } + return OffsetDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC).toString(); + } +} +JAVA + +javac -cp "$CLASSPATH" "$JAVA_FILE" + +java -cp "$CLASSPATH:/tmp" SignalkindCollector \ + "$BOOTSTRAP_SERVERS" \ + "$TOPIC_NAME" \ + "$OUTPUT_DIR" \ + "$DURATION_SEC" \ + "$MAX_MESSAGES" \ + "$GROUP_ID" \ + "$OFFSET_RESET" + +echo "[DONE] Output files:" +echo " - $OUTPUT_DIR/aggregate_store.json" +echo " - $OUTPUT_DIR/vesseltype_stats.tsv" +echo " - $OUTPUT_DIR/vesseltype_extra_stats.tsv" +echo " - $OUTPUT_DIR/signalkind_mapping_draft.tsv" +echo " - $OUTPUT_DIR/last_run_summary.txt" diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 07911f7..df9faad 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -118,7 +118,7 @@ app: cron: "15 * * * * ?" # 매 분 15초 실행 kafka: enabled: true - topic: tp_SNP_AIS_Signal + topic: tp_Global_AIS_Signal send-chunk-size: 5000 fail-on-send-error: false # AIS Target 캐시 설정 diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index 7852efe..7a0a428 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -120,7 +120,7 @@ app: cron: "15 * * * * ?" # 매 분 15초 실행 kafka: enabled: true - topic: tp_SNP_AIS_Signal + topic: tp_Global_AIS_Signal send-chunk-size: 5000 fail-on-send-error: false # AIS Target 캐시 설정 diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 1ad54e6..62d966d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -170,7 +170,7 @@ app: cron: "15 * * * * ?" # 매 분 15초 실행 kafka: enabled: true - topic: tp_SNP_AIS_Signal + topic: tp_Global_AIS_Signal send-chunk-size: 5000 fail-on-send-error: false