#!/usr/bin/env bash
#
# collect_signalkind_candidates.sh
#
# Reads `bootstrap-servers` and `topic` from the selected Spring profile
# (src/main/resources/application-<profile>.yml), then compiles and runs a
# small throw-away Java Kafka consumer that, for a bounded duration / message
# count, accumulates statistics keyed by payload.vesselType and by
# (payload.vesselType, payload.extraInfo).
#
# Results are merged into OUTPUT_DIR across runs:
#   aggregate_store.json          cumulative counters (input of the next run)
#   vesseltype_stats.tsv          per-vesselType counts
#   vesseltype_extra_stats.tsv    per-(vesselType, extraInfo) counts
#   signalkind_mapping_draft.tsv  review sheet; the last three columns
#                                 (proposed code / status / notes) are carried
#                                 over from the previous draft
#   last_run_summary.txt          parameters and counters of the last run
#
# Requires: mvn, javac, java on PATH, and a Maven project at ROOT_DIR whose
# dependency classpath provides kafka-clients and jackson-databind.
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "$0")/.." && pwd)"
PROFILE="local"
DURATION_SEC=120
MAX_MESSAGES=200000
GROUP_ID="signalkind-collector-v1"
OFFSET_RESET="latest"
OUTPUT_DIR="$ROOT_DIR/docs/signalkind"

# Print the command-line help to stdout.
usage() {
  cat <<'USAGE'
Usage:
  collect_signalkind_candidates.sh [options]

Options:
  -p, --profile         Spring profile (default: local)
  -d, --duration-sec    Consume duration seconds (default: 120)
  -m, --max-messages    Max messages per run (default: 200000)
  -g, --group-id        Kafka consumer group id (default: signalkind-collector-v1)
  -r, --offset-reset    auto.offset.reset: earliest|latest (default: latest)
  -o, --output-dir      Output directory (default: docs/signalkind)
  -h, --help            Show this help

Examples:
  scripts/collect_signalkind_candidates.sh -p local -d 600
  scripts/collect_signalkind_candidates.sh -p local -r earliest -m 50000
USAGE
}

# Print a message to stderr and exit with status 1.
die() {
  printf '%s\n' "$*" >&2
  exit 1
}

# Abort with a readable message when an option is missing its value.
# Without this, `set -u` kills the script with a bare "unbound variable".
#   $1 - option name as typed by the user
#   $2 - number of arguments remaining, including the option itself
require_value() {
  if (( $2 < 2 )); then
    die "Option $1 requires a value"
  fi
}

# Abort unless the value is a non-negative integer that fits a Java int.
# The collector parses these with Integer.parseInt; validating here fails
# fast instead of after the (slow) Maven classpath resolution.
#   $1 - option name used in the error message
#   $2 - value to validate
require_int32() {
  local name=$1 value=$2
  if [[ ! "$value" =~ ^[0-9]{1,10}$ ]] || (( 10#$value > 2147483647 )); then
    die "Invalid $name value (expected integer 0..2147483647): $value"
  fi
}

# Strip a trailing CR and one pair of surrounding quotes from a YAML scalar,
# so `key: "value"` and CRLF files yield the bare value.
#   $1 - raw value as extracted by awk
strip_yaml_quotes() {
  local value=$1
  value=${value%$'\r'}
  value=${value#[\"\']}
  value=${value%[\"\']}
  printf '%s' "$value"
}

while [[ $# -gt 0 ]]; do
  case "$1" in
    -p|--profile)
      require_value "$1" "$#"
      PROFILE="$2"
      shift 2
      ;;
    -d|--duration-sec)
      require_value "$1" "$#"
      DURATION_SEC="$2"
      shift 2
      ;;
    -m|--max-messages)
      require_value "$1" "$#"
      MAX_MESSAGES="$2"
      shift 2
      ;;
    -g|--group-id)
      require_value "$1" "$#"
      GROUP_ID="$2"
      shift 2
      ;;
    -r|--offset-reset)
      require_value "$1" "$#"
      OFFSET_RESET="$2"
      shift 2
      ;;
    -o|--output-dir)
      require_value "$1" "$#"
      OUTPUT_DIR="$2"
      shift 2
      ;;
    -h|--help)
      usage
      exit 0
      ;;
    *)
      echo "Unknown option: $1" >&2
      usage >&2
      exit 1
      ;;
  esac
done

if [[ "$OFFSET_RESET" != "earliest" && "$OFFSET_RESET" != "latest" ]]; then
  die "Invalid --offset-reset value: $OFFSET_RESET"
fi
require_int32 "--duration-sec" "$DURATION_SEC"
require_int32 "--max-messages" "$MAX_MESSAGES"

for tool in mvn javac java; do
  command -v "$tool" >/dev/null || die "Required tool not found on PATH: $tool"
done

CONFIG_FILE="$ROOT_DIR/src/main/resources/application-${PROFILE}.yml"
if [[ ! -f "$CONFIG_FILE" ]]; then
  die "Profile config not found: $CONFIG_FILE"
fi

# NOTE: this is a line-oriented scrape, not a YAML parse. It takes the first
# `bootstrap-servers:` / `topic:` line found anywhere in the file and only the
# first whitespace-delimited token after the key.
BOOTSTRAP_SERVERS="$(awk '/^[[:space:]]*bootstrap-servers:/{print $2; exit}' "$CONFIG_FILE")"
TOPIC_NAME="$(awk '/^[[:space:]]*topic:/{print $2; exit}' "$CONFIG_FILE")"
BOOTSTRAP_SERVERS="$(strip_yaml_quotes "$BOOTSTRAP_SERVERS")"
TOPIC_NAME="$(strip_yaml_quotes "$TOPIC_NAME")"

if [[ -z "${BOOTSTRAP_SERVERS:-}" || -z "${TOPIC_NAME:-}" ]]; then
  die "Failed to read bootstrap/topic from $CONFIG_FILE"
fi

mkdir -p "$OUTPUT_DIR"

# Private scratch directory: avoids predictable /tmp file names and keeps the
# generated source and class files from leaking into the shared /tmp.
WORK_DIR="$(mktemp -d "${TMPDIR:-/tmp}/signalkind-collector.XXXXXX")"
cleanup() {
  rm -rf -- "${WORK_DIR:?}"
}
trap cleanup EXIT

# Resolve the project's dependency classpath. `-f` pins the POM to ROOT_DIR so
# the script works regardless of the caller's working directory (assumes the
# POM lives at the repository root, next to src/).
CP_FILE="$WORK_DIR/classpath.txt"
mvn -q -f "$ROOT_DIR/pom.xml" -DskipTests dependency:build-classpath \
  -Dmdep.outputFile="$CP_FILE"
# Not named CLASSPATH on purpose: that name is an environment variable the
# JVM tools read themselves.
DEP_CLASSPATH="$(cat "$CP_FILE")"

# File name must match the public class name.
JAVA_FILE="$WORK_DIR/SignalkindCollector.java"
cat >"$JAVA_FILE" <<'JAVA'
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.stream.Collectors;

/**
 * One-shot Kafka consumer that tallies payload.vesselType and
 * (payload.vesselType, payload.extraInfo) occurrences and merges them into
 * the files kept in the output directory.
 */
public class SignalkindCollector {

    /** Separator between vesselType and extraInfo in composite map keys. */
    private static final String KEY_SEP = "\u001F";

    /** Cumulative state persisted as aggregate_store.json between runs. */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Aggregate {
        public long totalConsumed = 0;
        public long totalParseError = 0;
        public long totalMissingVesselType = 0;
        /** Keyed by vesselType. */
        public Map<String, KindStat> vesselTypeStats = new HashMap<>();
        /** Keyed by vesselType + KEY_SEP + extraInfo. */
        public Map<String, KindStat> vesselTypeExtraStats = new HashMap<>();
    }

    /** Counters for one key; sampleMmsi holds at most 5 distinct values. */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class KindStat {
        public long count = 0;
        public long lastTimestampMs = 0;
        public LinkedHashSet<String> sampleMmsi = new LinkedHashSet<>();
    }

    /** Hand-edited columns of the mapping draft that must survive a rewrite. */
    public static class DecisionFields {
        String proposedCode = "";
        String status = "PENDING";
        String notes = "";
    }

    /**
     * Entry point.
     *
     * @param args bootstrap, topic, outputDir, durationSec, maxMessages,
     *             groupId, offsetReset (in that order)
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 7) {
            throw new IllegalArgumentException(
                    "Usage: SignalkindCollector <bootstrap> <topic> <outputDir> <durationSec> "
                            + "<maxMessages> <groupId> <offsetReset>");
        }

        String bootstrap = args[0];
        String topic = args[1];
        Path outputDir = Path.of(args[2]);
        int durationSec = Integer.parseInt(args[3]);
        int maxMessages = Integer.parseInt(args[4]);
        String groupId = args[5];
        String offsetReset = args[6];

        Files.createDirectories(outputDir);
        ObjectMapper mapper = new ObjectMapper();

        Path aggregateJsonPath = outputDir.resolve("aggregate_store.json");
        Path vesselTypeTsvPath = outputDir.resolve("vesseltype_stats.tsv");
        Path vesselTypeExtraTsvPath = outputDir.resolve("vesseltype_extra_stats.tsv");
        Path mappingDraftTsvPath = outputDir.resolve("signalkind_mapping_draft.tsv");
        Path runSummaryPath = outputDir.resolve("last_run_summary.txt");

        Aggregate aggregate = loadAggregate(mapper, aggregateJsonPath);
        Map<String, DecisionFields> existingDraft = loadExistingDraft(mappingDraftTsvPath);

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000");
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");

        long runStart = System.currentTimeMillis();
        long runDeadline = runStart + (durationSec * 1000L);
        long runConsumed = 0;
        long runParseError = 0;
        long runMissingVesselType = 0;

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton(topic));
            while (System.currentTimeMillis() < runDeadline && runConsumed < maxMessages) {
                ConsumerRecords<String, String> records =
                        consumer.poll(java.time.Duration.ofMillis(1000));
                if (records.isEmpty()) {
                    continue;
                }

                // Next offset to read per partition, for processed records only.
                // A plain commitSync() would commit the whole polled batch, so
                // records skipped by the maxMessages cut-off would never be
                // seen by a later run.
                Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();

                for (ConsumerRecord<String, String> record : records) {
                    if (runConsumed >= maxMessages) {
                        break;
                    }
                    runConsumed++;
                    processed.put(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                    try {
                        JsonNode root = mapper.readTree(record.value());
                        JsonNode payload = root.path("payload");
                        String vesselType = normalize(payload.path("vesselType").asText(null), "N/A");
                        String extraInfo = normalize(payload.path("extraInfo").asText(null), "N/A");
                        String mmsi = normalize(payload.path("mmsi").asText(null), "");

                        if ("N/A".equals(vesselType)) {
                            runMissingVesselType++;
                        }

                        updateStat(aggregate.vesselTypeStats, vesselType, record.timestamp(), mmsi);
                        updateStat(aggregate.vesselTypeExtraStats,
                                vesselType + KEY_SEP + extraInfo, record.timestamp(), mmsi);
                    } catch (Exception e) {
                        // Unparseable records are counted, not fatal.
                        runParseError++;
                    }
                }
                consumer.commitSync(processed);
            }
        }

        aggregate.totalConsumed += runConsumed;
        aggregate.totalParseError += runParseError;
        aggregate.totalMissingVesselType += runMissingVesselType;

        mapper.writerWithDefaultPrettyPrinter().writeValue(aggregateJsonPath.toFile(), aggregate);
        writeVesselTypeStats(aggregate, vesselTypeTsvPath);
        writeVesselTypeExtraStats(aggregate, vesselTypeExtraTsvPath);
        writeMappingDraft(aggregate, existingDraft, mappingDraftTsvPath);
        writeRunSummary(runSummaryPath, bootstrap, topic, groupId, offsetReset, runStart,
                System.currentTimeMillis(), runConsumed, runParseError, runMissingVesselType,
                aggregate.totalConsumed);

        System.out.println("OUTPUT_DIR=" + outputDir);
        System.out.println("RUN_CONSUMED=" + runConsumed);
        System.out.println("RUN_PARSE_ERROR=" + runParseError);
        System.out.println("RUN_MISSING_VESSEL_TYPE=" + runMissingVesselType);
        System.out.println("TOTAL_CONSUMED=" + aggregate.totalConsumed);
        System.out.println("TOTAL_VESSEL_TYPES=" + aggregate.vesselTypeStats.size());
        System.out.println("TOTAL_VESSEL_TYPE_EXTRA=" + aggregate.vesselTypeExtraStats.size());
    }

    /**
     * Loads the persisted aggregate, or returns an empty one when the file is
     * absent or unreadable. An unreadable file is reported on stderr because
     * the run will overwrite it with fresh totals.
     */
    private static Aggregate loadAggregate(ObjectMapper mapper, Path aggregateJsonPath) {
        try {
            if (Files.exists(aggregateJsonPath)) {
                Aggregate loaded = mapper.readValue(aggregateJsonPath.toFile(), Aggregate.class);
                if (loaded != null) {
                    // Explicit JSON nulls would otherwise cause NPEs later on.
                    if (loaded.vesselTypeStats == null) {
                        loaded.vesselTypeStats = new HashMap<>();
                    }
                    if (loaded.vesselTypeExtraStats == null) {
                        loaded.vesselTypeExtraStats = new HashMap<>();
                    }
                    return loaded;
                }
            }
        } catch (Exception e) {
            System.err.println("WARN: cannot read " + aggregateJsonPath + " (" + e
                    + "); starting from an empty aggregate");
        }
        return new Aggregate();
    }

    /**
     * Reads the decision columns (indexes 5..7) of an existing mapping draft,
     * keyed by column 0 + KEY_SEP + column 1. The header row and rows with
     * fewer than 8 columns are skipped. Best effort: on a read failure the
     * entries collected so far are returned and a warning goes to stderr.
     */
    private static Map<String, DecisionFields> loadExistingDraft(Path mappingDraftTsvPath) {
        Map<String, DecisionFields> map = new HashMap<>();
        if (!Files.exists(mappingDraftTsvPath)) {
            return map;
        }
        try {
            List<String> lines = Files.readAllLines(mappingDraftTsvPath, StandardCharsets.UTF_8);
            boolean first = true;
            for (String line : lines) {
                if (first) {
                    first = false;
                    continue;
                }
                // limit -1 keeps trailing empty columns
                String[] arr = line.split("\t", -1);
                if (arr.length < 8) {
                    continue;
                }
                DecisionFields fields = new DecisionFields();
                fields.proposedCode = arr[5];
                fields.status = arr[6].isBlank() ? "PENDING" : arr[6];
                fields.notes = arr[7];
                map.put(arr[0] + KEY_SEP + arr[1], fields);
            }
        } catch (Exception e) {
            System.err.println("WARN: cannot read " + mappingDraftTsvPath + " (" + e
                    + "); previous decisions may be missing from the new draft");
        }
        return map;
    }

    /** Bumps the counter for key, tracks the newest timestamp, samples up to 5 MMSIs. */
    private static void updateStat(Map<String, KindStat> map, String key, long ts, String mmsi) {
        KindStat stat = map.computeIfAbsent(key, k -> new KindStat());
        stat.count += 1;
        stat.lastTimestampMs = Math.max(stat.lastTimestampMs, ts);
        if (mmsi != null && !mmsi.isBlank() && stat.sampleMmsi.size() < 5) {
            stat.sampleMmsi.add(mmsi);
        }
    }

    /** Trims value; null, empty and the literal "null" map to defaultValue. */
    private static String normalize(String value, String defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        String v = value.trim();
        if (v.isEmpty() || "null".equalsIgnoreCase(v)) {
            return defaultValue;
        }
        return v;
    }

    /** Entries of stats ordered by descending count. */
    private static List<Map.Entry<String, KindStat>> sortedByCountDesc(Map<String, KindStat> stats) {
        return stats.entrySet()
                .stream()
                .sorted((a, b) -> Long.compare(b.getValue().count, a.getValue().count))
                .toList();
    }

    /** Splits a composite key into {vesselType, extraInfo}; a missing part becomes "N/A". */
    private static String[] splitKey(String key) {
        String[] keys = key.split(KEY_SEP, 2);
        String vesselType = keys.length > 0 ? keys[0] : "N/A";
        String extraInfo = keys.length > 1 ? keys[1] : "N/A";
        return new String[] {vesselType, extraInfo};
    }

    /** Opens path for (over)writing as UTF-8, matching how the draft is read back. */
    private static BufferedWriter openUtf8(Path path) throws Exception {
        return Files.newBufferedWriter(path, StandardCharsets.UTF_8);
    }

    /** Writes the per-vesselType table; ratio is a percentage of totalConsumed. */
    private static void writeVesselTypeStats(Aggregate aggregate, Path path) throws Exception {
        List<Map.Entry<String, KindStat>> rows = sortedByCountDesc(aggregate.vesselTypeStats);
        try (BufferedWriter w = openUtf8(path)) {
            w.write("vesselType\tcount\tratio\tlastSeenUtc\tsampleMmsi\n");
            // max(1, ...) avoids division by zero on an empty store
            long total = Math.max(1L, aggregate.totalConsumed);
            for (Map.Entry<String, KindStat> row : rows) {
                KindStat s = row.getValue();
                double ratio = (s.count * 100.0) / total;
                w.write(row.getKey() + "\t"
                        + s.count + "\t"
                        + String.format(Locale.US, "%.4f", ratio) + "\t"
                        + formatTs(s.lastTimestampMs) + "\t"
                        + String.join(",", s.sampleMmsi) + "\n");
            }
        }
    }

    /** Writes the per-(vesselType, extraInfo) table; ratio is a percentage of totalConsumed. */
    private static void writeVesselTypeExtraStats(Aggregate aggregate, Path path) throws Exception {
        List<Map.Entry<String, KindStat>> rows = sortedByCountDesc(aggregate.vesselTypeExtraStats);
        try (BufferedWriter w = openUtf8(path)) {
            w.write("vesselType\textraInfo\tcount\tratio\tlastSeenUtc\tsampleMmsi\n");
            long total = Math.max(1L, aggregate.totalConsumed);
            for (Map.Entry<String, KindStat> row : rows) {
                String[] parts = splitKey(row.getKey());
                KindStat s = row.getValue();
                double ratio = (s.count * 100.0) / total;
                w.write(parts[0] + "\t"
                        + parts[1] + "\t"
                        + s.count + "\t"
                        + String.format(Locale.US, "%.4f", ratio) + "\t"
                        + formatTs(s.lastTimestampMs) + "\t"
                        + String.join(",", s.sampleMmsi) + "\n");
            }
        }
    }

    /**
     * Rewrites the mapping draft with current counts, re-attaching the
     * decision columns previously recorded for the same key.
     */
    private static void writeMappingDraft(Aggregate aggregate,
                                          Map<String, DecisionFields> existing,
                                          Path path) throws Exception {
        List<Map.Entry<String, KindStat>> rows = sortedByCountDesc(aggregate.vesselTypeExtraStats);
        try (BufferedWriter w = openUtf8(path)) {
            w.write("vesselType\textraInfo\tcount\tlastSeenUtc\tsampleMmsi\t"
                    + "proposedSignalKindCode\tdecisionStatus\tnotes\n");
            for (Map.Entry<String, KindStat> row : rows) {
                String key = row.getKey();
                String[] parts = splitKey(key);
                KindStat s = row.getValue();
                DecisionFields d = existing.getOrDefault(key, new DecisionFields());
                w.write(parts[0] + "\t"
                        + parts[1] + "\t"
                        + s.count + "\t"
                        + formatTs(s.lastTimestampMs) + "\t"
                        + String.join(",", s.sampleMmsi) + "\t"
                        + d.proposedCode + "\t"
                        + d.status + "\t"
                        + d.notes + "\n");
            }
        }
    }

    /** Writes key=value lines describing the run that just finished. */
    private static void writeRunSummary(
            Path path,
            String bootstrap,
            String topic,
            String groupId,
            String offsetReset,
            long startMs,
            long endMs,
            long runConsumed,
            long runParseError,
            long runMissingVesselType,
            long totalConsumed
    ) throws Exception {
        try (BufferedWriter w = openUtf8(path)) {
            w.write("bootstrap=" + bootstrap + "\n");
            w.write("topic=" + topic + "\n");
            w.write("groupId=" + groupId + "\n");
            w.write("offsetReset=" + offsetReset + "\n");
            w.write("runStartUtc=" + Instant.ofEpochMilli(startMs) + "\n");
            w.write("runEndUtc=" + Instant.ofEpochMilli(endMs) + "\n");
            w.write("runDurationSec=" + ((endMs - startMs) / 1000) + "\n");
            w.write("runConsumed=" + runConsumed + "\n");
            w.write("runParseError=" + runParseError + "\n");
            w.write("runMissingVesselType=" + runMissingVesselType + "\n");
            w.write("totalConsumed=" + totalConsumed + "\n");
        }
    }

    /** Epoch millis as an ISO-8601 UTC timestamp; empty string for ts <= 0. */
    private static String formatTs(long ts) {
        if (ts <= 0) {
            return "";
        }
        return OffsetDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC).toString();
    }
}
JAVA

# Compile into the scratch directory and run from there.
javac -cp "$DEP_CLASSPATH" -d "$WORK_DIR" "$JAVA_FILE"
java -cp "$DEP_CLASSPATH:$WORK_DIR" SignalkindCollector \
  "$BOOTSTRAP_SERVERS" \
  "$TOPIC_NAME" \
  "$OUTPUT_DIR" \
  "$DURATION_SEC" \
  "$MAX_MESSAGES" \
  "$GROUP_ID" \
  "$OFFSET_RESET"

echo "[DONE] Output files:"
echo " - $OUTPUT_DIR/aggregate_store.json"
echo " - $OUTPUT_DIR/vesseltype_stats.tsv"
echo " - $OUTPUT_DIR/vesseltype_extra_stats.tsv"
echo " - $OUTPUT_DIR/signalkind_mapping_draft.tsv"
echo " - $OUTPUT_DIR/last_run_summary.txt"