From 0743fd43220446daa38a9199fac9e4049e7916a0 Mon Sep 17 00:00:00 2001 From: htlee Date: Sat, 14 Feb 2026 21:54:29 +0900 Subject: [PATCH] =?UTF-8?q?chore:=20=EB=B6=88=ED=95=84=EC=9A=94=20?= =?UTF-8?q?=EC=8A=A4=ED=81=AC=EB=A6=BD=ED=8A=B8=20=EC=82=AD=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - scripts/collect_signalkind_candidates.sh 제거 Co-Authored-By: Claude Opus 4.6 --- scripts/collect_signalkind_candidates.sh | 431 ----------------------- 1 file changed, 431 deletions(-) delete mode 100755 scripts/collect_signalkind_candidates.sh diff --git a/scripts/collect_signalkind_candidates.sh b/scripts/collect_signalkind_candidates.sh deleted file mode 100755 index 8c582b5..0000000 --- a/scripts/collect_signalkind_candidates.sh +++ /dev/null @@ -1,431 +0,0 @@ -#!/usr/bin/env bash - -set -euo pipefail - -ROOT_DIR="$(cd "$(dirname "$0")/.." && pwd)" - -PROFILE="local" -DURATION_SEC=120 -MAX_MESSAGES=200000 -GROUP_ID="signalkind-collector-v1" -OFFSET_RESET="latest" -OUTPUT_DIR="$ROOT_DIR/docs/signalkind" - -usage() { - cat <<'USAGE' -Usage: collect_signalkind_candidates.sh [options] - -Options: - -p, --profile Spring profile (default: local) - -d, --duration-sec Consume duration seconds (default: 120) - -m, --max-messages Max messages per run (default: 200000) - -g, --group-id Kafka consumer group id (default: signalkind-collector-v1) - -r, --offset-reset auto.offset.reset: earliest|latest (default: latest) - -o, --output-dir Output directory (default: docs/signalkind) - -h, --help Show this help - -Examples: - scripts/collect_signalkind_candidates.sh -p local -d 600 - scripts/collect_signalkind_candidates.sh -p local -r earliest -m 50000 -USAGE -} - -while [[ $# -gt 0 ]]; do - case "$1" in - -p|--profile) - PROFILE="$2" - shift 2 - ;; - -d|--duration-sec) - DURATION_SEC="$2" - shift 2 - ;; - -m|--max-messages) - MAX_MESSAGES="$2" - shift 2 - ;; - -g|--group-id) - GROUP_ID="$2" - shift 2 - ;; - -r|--offset-reset) - OFFSET_RESET="$2" - shift 2 - ;; - -o|--output-dir) - OUTPUT_DIR="$2" - shift 2 - ;; - -h|--help) - usage - exit 0 - ;; - *) - echo "Unknown option: $1" - usage - exit 1 - ;; - esac -done - -if [[ "$OFFSET_RESET" != "earliest" && "$OFFSET_RESET" != "latest" ]]; then - echo "Invalid --offset-reset value: $OFFSET_RESET" - exit 1 -fi - -CONFIG_FILE="$ROOT_DIR/src/main/resources/application-${PROFILE}.yml" -if [[ ! -f "$CONFIG_FILE" ]]; then - echo "Profile config not found: $CONFIG_FILE" - exit 1 -fi - -BOOTSTRAP_SERVERS="$(awk '/^[[:space:]]*bootstrap-servers:/{print $2; exit}' "$CONFIG_FILE")" -TOPIC_NAME="$(awk '/^[[:space:]]*topic:/{print $2; exit}' "$CONFIG_FILE")" - -if [[ -z "${BOOTSTRAP_SERVERS:-}" || -z "${TOPIC_NAME:-}" ]]; then - echo "Failed to read bootstrap/topic from $CONFIG_FILE" - exit 1 -fi - -mkdir -p "$OUTPUT_DIR" - -CP_FILE="/tmp/snp-signalkind-cp.txt" -mvn -q -DskipTests dependency:build-classpath -Dmdep.outputFile="$CP_FILE" -CLASSPATH="$(cat "$CP_FILE")" - -JAVA_FILE="/tmp/SignalkindCollector.java" -cat >"$JAVA_FILE" <<'JAVA' -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.util.*; -import java.util.stream.Collectors; - -public class SignalkindCollector { - - @JsonIgnoreProperties(ignoreUnknown = true) - public static class Aggregate { - public long totalConsumed = 0; - public long totalParseError = 0; - public long totalMissingVesselType = 0; - public Map vesselTypeStats = new HashMap<>(); - public Map vesselTypeExtraStats = new HashMap<>(); - } - - @JsonIgnoreProperties(ignoreUnknown = true) - public static class KindStat { - public long count = 0; - public long lastTimestampMs = 0; - public LinkedHashSet sampleMmsi = new LinkedHashSet<>(); - } - - public static class DecisionFields { - String proposedCode = ""; - String status = "PENDING"; - String notes = ""; - } - - public static void main(String[] args) throws Exception { - if (args.length < 7) { - throw new IllegalArgumentException( - "Usage: SignalkindCollector "); - } - - String bootstrap = args[0]; - String topic = args[1]; - Path outputDir = Path.of(args[2]); - int durationSec = Integer.parseInt(args[3]); - int maxMessages = Integer.parseInt(args[4]); - String groupId = args[5]; - String offsetReset = args[6]; - - Files.createDirectories(outputDir); - - ObjectMapper mapper = new ObjectMapper(); - Path aggregateJsonPath = outputDir.resolve("aggregate_store.json"); - Path vesselTypeTsvPath = outputDir.resolve("vesseltype_stats.tsv"); - Path vesselTypeExtraTsvPath = outputDir.resolve("vesseltype_extra_stats.tsv"); - Path mappingDraftTsvPath = outputDir.resolve("signalkind_mapping_draft.tsv"); - Path runSummaryPath = outputDir.resolve("last_run_summary.txt"); - - Aggregate aggregate = loadAggregate(mapper, aggregateJsonPath); - Map existingDraft = loadExistingDraft(mappingDraftTsvPath); - - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset); - props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000"); - props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); - - long runStart = System.currentTimeMillis(); - long runDeadline = runStart + (durationSec * 1000L); - long runConsumed = 0; - long runParseError = 0; - long runMissingVesselType = 0; - - try (KafkaConsumer consumer = new KafkaConsumer<>(props)) { - consumer.subscribe(Collections.singleton(topic)); - - while (System.currentTimeMillis() < runDeadline && runConsumed < maxMessages) { - ConsumerRecords records = consumer.poll(java.time.Duration.ofMillis(1000)); - if (records.isEmpty()) { - continue; - } - - for (ConsumerRecord record : records) { - if (runConsumed >= maxMessages) { - break; - } - runConsumed++; - - try { - JsonNode root = mapper.readTree(record.value()); - JsonNode payload = root.path("payload"); - - String vesselType = normalize(payload.path("vesselType").asText(null), "N/A"); - String extraInfo = normalize(payload.path("extraInfo").asText(null), "N/A"); - String mmsi = normalize(payload.path("mmsi").asText(null), ""); - - if ("N/A".equals(vesselType)) { - runMissingVesselType++; - } - - updateStat(aggregate.vesselTypeStats, vesselType, record.timestamp(), mmsi); - updateStat(aggregate.vesselTypeExtraStats, vesselType + "\u001F" + extraInfo, record.timestamp(), mmsi); - } catch (Exception e) { - runParseError++; - } - } - - consumer.commitSync(); - } - } - - aggregate.totalConsumed += runConsumed; - aggregate.totalParseError += runParseError; - aggregate.totalMissingVesselType += runMissingVesselType; - - mapper.writerWithDefaultPrettyPrinter().writeValue(aggregateJsonPath.toFile(), aggregate); - - writeVesselTypeStats(aggregate, vesselTypeTsvPath); - writeVesselTypeExtraStats(aggregate, vesselTypeExtraTsvPath); - writeMappingDraft(aggregate, existingDraft, mappingDraftTsvPath); - writeRunSummary(runSummaryPath, bootstrap, topic, groupId, offsetReset, - runStart, System.currentTimeMillis(), runConsumed, runParseError, runMissingVesselType, aggregate.totalConsumed); - - System.out.println("OUTPUT_DIR=" + outputDir); - System.out.println("RUN_CONSUMED=" + runConsumed); - System.out.println("RUN_PARSE_ERROR=" + runParseError); - System.out.println("RUN_MISSING_VESSEL_TYPE=" + runMissingVesselType); - System.out.println("TOTAL_CONSUMED=" + aggregate.totalConsumed); - System.out.println("TOTAL_VESSEL_TYPES=" + aggregate.vesselTypeStats.size()); - System.out.println("TOTAL_VESSEL_TYPE_EXTRA=" + aggregate.vesselTypeExtraStats.size()); - } - - private static Aggregate loadAggregate(ObjectMapper mapper, Path aggregateJsonPath) { - try { - if (Files.exists(aggregateJsonPath)) { - return mapper.readValue(aggregateJsonPath.toFile(), Aggregate.class); - } - } catch (Exception ignored) { - } - return new Aggregate(); - } - - private static Map loadExistingDraft(Path mappingDraftTsvPath) { - Map map = new HashMap<>(); - if (!Files.exists(mappingDraftTsvPath)) { - return map; - } - - try { - List lines = Files.readAllLines(mappingDraftTsvPath, StandardCharsets.UTF_8); - boolean first = true; - for (String line : lines) { - if (first) { - first = false; - continue; - } - String[] arr = line.split("\t", -1); - if (arr.length < 8) { - continue; - } - String key = arr[0] + "\u001F" + arr[1]; - DecisionFields fields = new DecisionFields(); - fields.proposedCode = arr[5]; - fields.status = arr[6].isBlank() ? "PENDING" : arr[6]; - fields.notes = arr[7]; - map.put(key, fields); - } - } catch (Exception ignored) { - } - return map; - } - - private static void updateStat(Map map, String key, long ts, String mmsi) { - KindStat stat = map.computeIfAbsent(key, k -> new KindStat()); - stat.count += 1; - stat.lastTimestampMs = Math.max(stat.lastTimestampMs, ts); - if (mmsi != null && !mmsi.isBlank() && stat.sampleMmsi.size() < 5) { - stat.sampleMmsi.add(mmsi); - } - } - - private static String normalize(String value, String defaultValue) { - if (value == null) { - return defaultValue; - } - String v = value.trim(); - if (v.isEmpty() || "null".equalsIgnoreCase(v)) { - return defaultValue; - } - return v; - } - - private static void writeVesselTypeStats(Aggregate aggregate, Path path) throws Exception { - List> rows = aggregate.vesselTypeStats.entrySet() - .stream() - .sorted((a, b) -> Long.compare(b.getValue().count, a.getValue().count)) - .toList(); - - try (BufferedWriter w = new BufferedWriter(new FileWriter(path.toFile(), false))) { - w.write("vesselType\tcount\tratio\tlastSeenUtc\tsampleMmsi\n"); - long total = Math.max(1L, aggregate.totalConsumed); - for (Map.Entry row : rows) { - KindStat s = row.getValue(); - double ratio = (s.count * 100.0) / total; - w.write(row.getKey() + "\t" - + s.count + "\t" - + String.format(Locale.US, "%.4f", ratio) + "\t" - + formatTs(s.lastTimestampMs) + "\t" - + String.join(",", s.sampleMmsi) + "\n"); - } - } - } - - private static void writeVesselTypeExtraStats(Aggregate aggregate, Path path) throws Exception { - List> rows = aggregate.vesselTypeExtraStats.entrySet() - .stream() - .sorted((a, b) -> Long.compare(b.getValue().count, a.getValue().count)) - .toList(); - - try (BufferedWriter w = new BufferedWriter(new FileWriter(path.toFile(), false))) { - w.write("vesselType\textraInfo\tcount\tratio\tlastSeenUtc\tsampleMmsi\n"); - long total = Math.max(1L, aggregate.totalConsumed); - for (Map.Entry row : rows) { - String[] keys = row.getKey().split("\u001F", 2); - String vesselType = keys.length > 0 ? keys[0] : "N/A"; - String extraInfo = keys.length > 1 ? keys[1] : "N/A"; - KindStat s = row.getValue(); - double ratio = (s.count * 100.0) / total; - w.write(vesselType + "\t" - + extraInfo + "\t" - + s.count + "\t" - + String.format(Locale.US, "%.4f", ratio) + "\t" - + formatTs(s.lastTimestampMs) + "\t" - + String.join(",", s.sampleMmsi) + "\n"); - } - } - } - - private static void writeMappingDraft(Aggregate aggregate, Map existing, Path path) throws Exception { - List> rows = aggregate.vesselTypeExtraStats.entrySet() - .stream() - .sorted((a, b) -> Long.compare(b.getValue().count, a.getValue().count)) - .toList(); - - try (BufferedWriter w = new BufferedWriter(new FileWriter(path.toFile(), false))) { - w.write("vesselType\textraInfo\tcount\tlastSeenUtc\tsampleMmsi\tproposedSignalKindCode\tdecisionStatus\tnotes\n"); - for (Map.Entry row : rows) { - String key = row.getKey(); - String[] keys = key.split("\u001F", 2); - String vesselType = keys.length > 0 ? keys[0] : "N/A"; - String extraInfo = keys.length > 1 ? keys[1] : "N/A"; - KindStat s = row.getValue(); - - DecisionFields d = existing.getOrDefault(key, new DecisionFields()); - w.write(vesselType + "\t" - + extraInfo + "\t" - + s.count + "\t" - + formatTs(s.lastTimestampMs) + "\t" - + String.join(",", s.sampleMmsi) + "\t" - + d.proposedCode + "\t" - + d.status + "\t" - + d.notes + "\n"); - } - } - } - - private static void writeRunSummary( - Path path, - String bootstrap, - String topic, - String groupId, - String offsetReset, - long startMs, - long endMs, - long runConsumed, - long runParseError, - long runMissingVesselType, - long totalConsumed - ) throws Exception { - try (BufferedWriter w = new BufferedWriter(new FileWriter(path.toFile(), false))) { - w.write("bootstrap=" + bootstrap + "\n"); - w.write("topic=" + topic + "\n"); - w.write("groupId=" + groupId + "\n"); - w.write("offsetReset=" + offsetReset + "\n"); - w.write("runStartUtc=" + Instant.ofEpochMilli(startMs) + "\n"); - w.write("runEndUtc=" + Instant.ofEpochMilli(endMs) + "\n"); - w.write("runDurationSec=" + ((endMs - startMs) / 1000) + "\n"); - w.write("runConsumed=" + runConsumed + "\n"); - w.write("runParseError=" + runParseError + "\n"); - w.write("runMissingVesselType=" + runMissingVesselType + "\n"); - w.write("totalConsumed=" + totalConsumed + "\n"); - } - } - - private static String formatTs(long ts) { - if (ts <= 0) { - return ""; - } - return OffsetDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC).toString(); - } -} -JAVA - -javac -cp "$CLASSPATH" "$JAVA_FILE" - -java -cp "$CLASSPATH:/tmp" SignalkindCollector \ - "$BOOTSTRAP_SERVERS" \ - "$TOPIC_NAME" \ - "$OUTPUT_DIR" \ - "$DURATION_SEC" \ - "$MAX_MESSAGES" \ - "$GROUP_ID" \ - "$OFFSET_RESET" - -echo "[DONE] Output files:" -echo " - $OUTPUT_DIR/aggregate_store.json" -echo " - $OUTPUT_DIR/vesseltype_stats.tsv" -echo " - $OUTPUT_DIR/vesseltype_extra_stats.tsv" -echo " - $OUTPUT_DIR/signalkind_mapping_draft.tsv" -echo " - $OUTPUT_DIR/last_run_summary.txt"