From a93942d4d651cc903b75f6dbc2639cceff51cbf8 Mon Sep 17 00:00:00 2001 From: hyojin-kim4 Date: Thu, 12 Feb 2026 10:27:22 +0900 Subject: [PATCH] =?UTF-8?q?:twisted=5Frightwards=5Farrows:=20=EC=9A=A9?= =?UTF-8?q?=EC=96=B4=20=ED=91=9C=EC=A4=80=ED=99=94=20=EB=B0=98=EC=98=81=20?= =?UTF-8?q?(AIS=20=EC=A0=9C=EC=99=B8)=20(#6)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * :wrench: Schema/Table 하드코딩 제거 * :fire: BatchSchemaProperties.java 제거 및 @Value Schema 설정 방식 통일 * :card_file_box: 용어 표준화 - Facility Port - Common Code - Risk&Compliance - Movement - Event - PSC - 선박제원정보 --- .../batch/repository/BaseJdbcRepository.java | 18 +- .../snp/batch/global/model/BatchApiLog.java | 2 +- .../repository/AisTargetRepositoryImpl.java | 201 +++---- .../repository/FlagCodeRepositoryImpl.java | 20 +- .../repository/Stat5CodeRepositoryImpl.java | 21 +- ...CompanyComplianceImportRangeJobConfig.java | 8 +- .../config/ComplianceImportJobConfig.java | 6 +- .../ComplianceImportRangeJobConfig.java | 8 +- .../batch/reader/ComplianceDataReader.java | 15 +- .../CompanyComplianceRepositoryImpl.java | 27 +- .../repository/ComplianceRepositoryImpl.java | 40 +- .../batch/config/EventImportJobConfig.java | 5 +- .../batch/repository/EventRepositoryImpl.java | 18 +- .../jobs/event/batch/repository/EventSql.java | 100 +++- .../repository/FacilityRepositoryImpl.java | 30 +- .../config/AnchorageCallsRangeJobConfig.java | 6 +- .../batch/config/BerthCallsRangJobConfig.java | 6 +- .../config/CurrentlyAtRangeJobConfig.java | 6 +- .../batch/config/DarkActivityJobConfig.java | 106 ---- .../config/DarkActivityRangeJobConfig.java | 119 ---- .../config/DestinationsRangeJobConfig.java | 6 +- .../config/ShipPortCallsRangeJobConfig.java | 6 +- .../config/StsOperationRangeJobConfig.java | 6 +- .../config/TerminalCallsRangeJobConfig.java | 6 +- .../batch/config/TransitsRangeJobConfig.java | 6 +- .../movement/batch/dto/DarkActivityDto.java | 29 - .../batch/dto/DarkActivityPositionDto.java | 17 - .../batch/entity/DarkActivityEntity.java | 44 -- .../processor/DarkActivityProcessor.java | 66 --- .../batch/reader/DarkActivityRangeReader.java | 182 ------ .../batch/reader/DarkActivityReader.java | 212 ------- .../AnchorageCallsRepositoryImpl.java | 45 +- .../repository/BerthCallsRepositoryImpl.java | 45 +- .../repository/CurrentlyAtRepositoryImpl.java | 51 +- .../repository/DarkActivityRepository.java | 13 - .../DarkActivityRepositoryImpl.java | 187 ------ .../repository/DestinationRepositoryImpl.java | 37 +- .../repository/PortCallsRepositoryImpl.java | 51 +- .../StsOperationRepositoryImpl.java | 45 +- .../TerminalCallsRepositoryImpl.java | 53 +- .../repository/TransitsRepositoryImpl.java | 25 +- .../batch/writer/DarkActivityWriter.java | 35 -- .../batch/config/PscInspectionJobConfig.java | 6 +- .../batch/dto/PscCertificateDto.java | 67 --- .../batch/dto/PscInspectionDto.java | 3 - .../batch/entity/PscCertificateEntity.java | 45 -- .../batch/entity/PscInspectionEntity.java | 1 - .../processor/PscInspectionProcessor.java | 31 - .../PscAllCertificateRepositoryImpl.java | 62 +- .../repository/PscCertificateRepository.java | 10 - .../PscCertificateRepositoryImpl.java | 139 ----- .../repository/PscDefectRepositoryImpl.java | 67 ++- .../PscInspectionRepositoryImpl.java | 77 ++- .../batch/writer/PscInspectionWriter.java | 10 +- .../batch/config/RiskImportJobConfig.java | 6 +- .../config/RiskImportRangeJobConfig.java | 5 +- .../risk/batch/reader/RiskDataReader.java | 18 +- .../batch/repository/RiskRepositoryImpl.java | 52 +- .../config/ShipDetailImportJobConfig.java | 14 +- .../batch/config/ShipDetailSyncJobConfig.java | 15 +- .../config/ShipDetailUpdateJobConfig.java | 6 +- .../ShipLastPositionUpdateJobConfig.java | 7 +- .../batch/reader/ShipDetailDataReader.java | 14 +- .../reader/ShipLastPositionDataReader.java | 14 +- .../repository/ShipDetailRepositoryImpl.java | 64 +- .../batch/repository/ShipDetailSql.java | 545 ++++++++++++------ .../repository/ShipHashRepositoryImpl.java | 24 +- .../ShipLastPositionRepositoryImpl.java | 21 +- .../batch/repository/ShipRepositoryImpl.java | 20 +- src/main/resources/application-dev.yml | 12 +- src/main/resources/application.yml | 59 +- 71 files changed, 1340 insertions(+), 2003 deletions(-) delete mode 100644 src/main/java/com/snp/batch/jobs/movement/batch/config/DarkActivityJobConfig.java delete mode 100644 src/main/java/com/snp/batch/jobs/movement/batch/config/DarkActivityRangeJobConfig.java delete mode 100644 src/main/java/com/snp/batch/jobs/movement/batch/dto/DarkActivityDto.java delete mode 100644 src/main/java/com/snp/batch/jobs/movement/batch/dto/DarkActivityPositionDto.java delete mode 100644 src/main/java/com/snp/batch/jobs/movement/batch/entity/DarkActivityEntity.java delete mode 100644 src/main/java/com/snp/batch/jobs/movement/batch/processor/DarkActivityProcessor.java delete mode 100644 src/main/java/com/snp/batch/jobs/movement/batch/reader/DarkActivityRangeReader.java delete mode 100644 src/main/java/com/snp/batch/jobs/movement/batch/reader/DarkActivityReader.java delete mode 100644 src/main/java/com/snp/batch/jobs/movement/batch/repository/DarkActivityRepository.java delete mode 100644 src/main/java/com/snp/batch/jobs/movement/batch/repository/DarkActivityRepositoryImpl.java delete mode 100644 src/main/java/com/snp/batch/jobs/movement/batch/writer/DarkActivityWriter.java delete mode 100644 src/main/java/com/snp/batch/jobs/pscInspection/batch/dto/PscCertificateDto.java delete mode 100644 src/main/java/com/snp/batch/jobs/pscInspection/batch/entity/PscCertificateEntity.java delete mode 100644 src/main/java/com/snp/batch/jobs/pscInspection/batch/repository/PscCertificateRepository.java delete mode 100644 src/main/java/com/snp/batch/jobs/pscInspection/batch/repository/PscCertificateRepositoryImpl.java diff --git a/src/main/java/com/snp/batch/common/batch/repository/BaseJdbcRepository.java b/src/main/java/com/snp/batch/common/batch/repository/BaseJdbcRepository.java index 3bf752a..c7634e6 100644 --- a/src/main/java/com/snp/batch/common/batch/repository/BaseJdbcRepository.java +++ b/src/main/java/com/snp/batch/common/batch/repository/BaseJdbcRepository.java @@ -29,9 +29,23 @@ public abstract class BaseJdbcRepository { protected final JdbcTemplate jdbcTemplate; /** - * 테이블명 반환 (하위 클래스에서 구현) + * 대상 스키마 이름 반환 (하위 클래스에서 구현) + * application.yml의 app.batch.target-schema.name 값을 @Value로 주입받아 반환 */ - protected abstract String getTableName(); + protected abstract String getTargetSchema(); + + /** + * 테이블명만 반환 (스키마 제외, 하위 클래스에서 구현) + */ + protected abstract String getSimpleTableName(); + + /** + * 전체 테이블명 반환 (스키마.테이블) + * 하위 클래스에서는 getSimpleTableName()만 구현하면 됨 + */ + protected String getTableName() { + return getTargetSchema() + "." + getSimpleTableName(); + } /** * ID 컬럼명 반환 (기본값: "id") diff --git a/src/main/java/com/snp/batch/global/model/BatchApiLog.java b/src/main/java/com/snp/batch/global/model/BatchApiLog.java index 71161c0..cdd03a0 100644 --- a/src/main/java/com/snp/batch/global/model/BatchApiLog.java +++ b/src/main/java/com/snp/batch/global/model/BatchApiLog.java @@ -7,7 +7,7 @@ import org.hibernate.annotations.CreationTimestamp; import java.time.LocalDateTime; @Entity -@Table(name = "batch_api_log", schema = "t_snp_data") +@Table(name = "batch_api_log", schema = "t_std_snp_data") @Getter @NoArgsConstructor(access = AccessLevel.PROTECTED) @AllArgsConstructor diff --git a/src/main/java/com/snp/batch/jobs/aistarget/batch/repository/AisTargetRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/aistarget/batch/repository/AisTargetRepositoryImpl.java index 77803fb..a32d2dc 100644 --- a/src/main/java/com/snp/batch/jobs/aistarget/batch/repository/AisTargetRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/aistarget/batch/repository/AisTargetRepositoryImpl.java @@ -1,8 +1,8 @@ package com.snp.batch.jobs.aistarget.batch.repository; import com.snp.batch.jobs.aistarget.batch.entity.AisTargetEntity; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Repository; @@ -19,104 +19,111 @@ import java.util.Optional; /** * AIS Target Repository 구현체 * - * 테이블: snp_data.ais_target + * 테이블: {targetSchema}.ais_target * PK: mmsi + message_timestamp (복합키) */ @Slf4j @Repository -@RequiredArgsConstructor public class AisTargetRepositoryImpl implements AisTargetRepository { private final JdbcTemplate jdbcTemplate; + private final String tableName; + private final String upsertSql; - private static final String TABLE_NAME = "snp_data.ais_target"; + public AisTargetRepositoryImpl(JdbcTemplate jdbcTemplate, + @Value("${app.batch.target-schema.name}") String targetSchema) { + this.jdbcTemplate = jdbcTemplate; + this.tableName = targetSchema + ".ais_target"; + this.upsertSql = buildUpsertSql(targetSchema); + } - // ==================== UPSERT SQL ==================== + private String buildUpsertSql(String schema) { + return """ + INSERT INTO %s.ais_target ( + mmsi, message_timestamp, imo, name, callsign, vessel_type, extra_info, + lat, lon, geom, + heading, sog, cog, rot, + length, width, draught, length_bow, length_stern, width_port, width_starboard, + destination, eta, status, + age_minutes, position_accuracy, timestamp_utc, repeat_indicator, raim_flag, + radio_status, regional, regional2, spare, spare2, + ais_version, position_fix_type, dte, band_flag, + received_date, collected_at, created_at, updated_at, + tonnes_cargo, in_sts, on_berth, dwt, anomalous, + destination_port_id, destination_tidied, destination_unlocode, imo_verified, last_static_update_received, + lpc_code, message_type, "source", station_id, zone_id + ) VALUES ( + ?, ?, ?, ?, ?, ?, ?, + ?, ?, ST_SetSRID(ST_MakePoint(?, ?), 4326), + ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, + ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, + ?, ?, ?, ?, + ?, ?, NOW(), NOW(), + ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, + ?, ?, ?, ?, ? + ) + ON CONFLICT (mmsi, message_timestamp) DO UPDATE SET + imo = EXCLUDED.imo, + name = EXCLUDED.name, + callsign = EXCLUDED.callsign, + vessel_type = EXCLUDED.vessel_type, + extra_info = EXCLUDED.extra_info, + lat = EXCLUDED.lat, + lon = EXCLUDED.lon, + geom = EXCLUDED.geom, + heading = EXCLUDED.heading, + sog = EXCLUDED.sog, + cog = EXCLUDED.cog, + rot = EXCLUDED.rot, + length = EXCLUDED.length, + width = EXCLUDED.width, + draught = EXCLUDED.draught, + length_bow = EXCLUDED.length_bow, + length_stern = EXCLUDED.length_stern, + width_port = EXCLUDED.width_port, + width_starboard = EXCLUDED.width_starboard, + destination = EXCLUDED.destination, + eta = EXCLUDED.eta, + status = EXCLUDED.status, + age_minutes = EXCLUDED.age_minutes, + position_accuracy = EXCLUDED.position_accuracy, + timestamp_utc = EXCLUDED.timestamp_utc, + repeat_indicator = EXCLUDED.repeat_indicator, + raim_flag = EXCLUDED.raim_flag, + radio_status = EXCLUDED.radio_status, + regional = EXCLUDED.regional, + regional2 = EXCLUDED.regional2, + spare = EXCLUDED.spare, + spare2 = EXCLUDED.spare2, + ais_version = EXCLUDED.ais_version, + position_fix_type = EXCLUDED.position_fix_type, + dte = EXCLUDED.dte, + band_flag = EXCLUDED.band_flag, + received_date = EXCLUDED.received_date, + collected_at = EXCLUDED.collected_at, + updated_at = NOW(), + tonnes_cargo = EXCLUDED.tonnes_cargo, + in_sts = EXCLUDED.in_sts, + on_berth = EXCLUDED.on_berth, + dwt = EXCLUDED.dwt, + anomalous = EXCLUDED.anomalous, + destination_port_id = EXCLUDED.destination_port_id, + destination_tidied = EXCLUDED.destination_tidied, + destination_unlocode = EXCLUDED.destination_unlocode, + imo_verified = EXCLUDED.imo_verified, + last_static_update_received = EXCLUDED.last_static_update_received, + lpc_code = EXCLUDED.lpc_code, + message_type = EXCLUDED.message_type, + "source" = EXCLUDED."source", + station_id = EXCLUDED.station_id, + zone_id = EXCLUDED.zone_id + """.formatted(schema); + } - private static final String UPSERT_SQL = """ - INSERT INTO snp_data.ais_target ( - mmsi, message_timestamp, imo, name, callsign, vessel_type, extra_info, - lat, lon, geom, - heading, sog, cog, rot, - length, width, draught, length_bow, length_stern, width_port, width_starboard, - destination, eta, status, - age_minutes, position_accuracy, timestamp_utc, repeat_indicator, raim_flag, - radio_status, regional, regional2, spare, spare2, - ais_version, position_fix_type, dte, band_flag, - received_date, collected_at, created_at, updated_at, - tonnes_cargo, in_sts, on_berth, dwt, anomalous, - destination_port_id, destination_tidied, destination_unlocode, imo_verified, last_static_update_received, - lpc_code, message_type, "source", station_id, zone_id - ) VALUES ( - ?, ?, ?, ?, ?, ?, ?, - ?, ?, ST_SetSRID(ST_MakePoint(?, ?), 4326), - ?, ?, ?, ?, - ?, ?, ?, ?, ?, ?, ?, - ?, ?, ?, - ?, ?, ?, ?, ?, - ?, ?, ?, ?, ?, - ?, ?, ?, ?, - ?, ?, NOW(), NOW(), - ?, ?, ?, ?, ?, - ?, ?, ?, ?, ?, - ?, ?, ?, ?, ? - ) - ON CONFLICT (mmsi, message_timestamp) DO UPDATE SET - imo = EXCLUDED.imo, - name = EXCLUDED.name, - callsign = EXCLUDED.callsign, - vessel_type = EXCLUDED.vessel_type, - extra_info = EXCLUDED.extra_info, - lat = EXCLUDED.lat, - lon = EXCLUDED.lon, - geom = EXCLUDED.geom, - heading = EXCLUDED.heading, - sog = EXCLUDED.sog, - cog = EXCLUDED.cog, - rot = EXCLUDED.rot, - length = EXCLUDED.length, - width = EXCLUDED.width, - draught = EXCLUDED.draught, - length_bow = EXCLUDED.length_bow, - length_stern = EXCLUDED.length_stern, - width_port = EXCLUDED.width_port, - width_starboard = EXCLUDED.width_starboard, - destination = EXCLUDED.destination, - eta = EXCLUDED.eta, - status = EXCLUDED.status, - age_minutes = EXCLUDED.age_minutes, - position_accuracy = EXCLUDED.position_accuracy, - timestamp_utc = EXCLUDED.timestamp_utc, - repeat_indicator = EXCLUDED.repeat_indicator, - raim_flag = EXCLUDED.raim_flag, - radio_status = EXCLUDED.radio_status, - regional = EXCLUDED.regional, - regional2 = EXCLUDED.regional2, - spare = EXCLUDED.spare, - spare2 = EXCLUDED.spare2, - ais_version = EXCLUDED.ais_version, - position_fix_type = EXCLUDED.position_fix_type, - dte = EXCLUDED.dte, - band_flag = EXCLUDED.band_flag, - received_date = EXCLUDED.received_date, - collected_at = EXCLUDED.collected_at, - updated_at = NOW(), - tonnes_cargo = EXCLUDED.tonnes_cargo, - in_sts = EXCLUDED.in_sts, - on_berth = EXCLUDED.on_berth, - dwt = EXCLUDED.dwt, - anomalous = EXCLUDED.anomalous, - destination_port_id = EXCLUDED.destination_port_id, - destination_tidied = EXCLUDED.destination_tidied, - destination_unlocode = EXCLUDED.destination_unlocode, - imo_verified = EXCLUDED.imo_verified, - last_static_update_received = EXCLUDED.last_static_update_received, - lpc_code = EXCLUDED.lpc_code, - message_type = EXCLUDED.message_type, - "source" = EXCLUDED."source", - station_id = EXCLUDED.station_id, - zone_id = EXCLUDED.zone_id - """; // ==================== RowMapper ==================== @@ -181,7 +188,7 @@ public class AisTargetRepositoryImpl implements AisTargetRepository { @Override public Optional findByMmsiAndMessageTimestamp(Long mmsi, OffsetDateTime messageTimestamp) { - String sql = "SELECT * FROM " + TABLE_NAME + " WHERE mmsi = ? AND message_timestamp = ?"; + String sql = "SELECT * FROM " + tableName + " WHERE mmsi = ? AND message_timestamp = ?"; List results = jdbcTemplate.query(sql, rowMapper, mmsi, toTimestamp(messageTimestamp)); return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); } @@ -193,7 +200,7 @@ public class AisTargetRepositoryImpl implements AisTargetRepository { WHERE mmsi = ? ORDER BY message_timestamp DESC LIMIT 1 - """.formatted(TABLE_NAME); + """.formatted(tableName); List results = jdbcTemplate.query(sql, rowMapper, mmsi); return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); } @@ -210,7 +217,7 @@ public class AisTargetRepositoryImpl implements AisTargetRepository { FROM %s WHERE mmsi = ANY(?) ORDER BY mmsi, message_timestamp DESC - """.formatted(TABLE_NAME); + """.formatted(tableName); Long[] mmsiArray = mmsiList.toArray(new Long[0]); return jdbcTemplate.query(sql, rowMapper, (Object) mmsiArray); @@ -223,7 +230,7 @@ public class AisTargetRepositoryImpl implements AisTargetRepository { WHERE mmsi = ? AND message_timestamp BETWEEN ? AND ? ORDER BY message_timestamp ASC - """.formatted(TABLE_NAME); + """.formatted(tableName); return jdbcTemplate.query(sql, rowMapper, mmsi, toTimestamp(start), toTimestamp(end)); } @@ -245,7 +252,7 @@ public class AisTargetRepositoryImpl implements AisTargetRepository { ? ) ORDER BY mmsi, message_timestamp DESC - """.formatted(TABLE_NAME); + """.formatted(tableName); return jdbcTemplate.query(sql, rowMapper, toTimestamp(start), toTimestamp(end), @@ -261,7 +268,7 @@ public class AisTargetRepositoryImpl implements AisTargetRepository { log.info("AIS Target 배치 UPSERT 시작: {} 건", entities.size()); - jdbcTemplate.batchUpdate(UPSERT_SQL, entities, 1000, (ps, entity) -> { + jdbcTemplate.batchUpdate(upsertSql, entities, 1000, (ps, entity) -> { int idx = 1; // PK ps.setLong(idx++, entity.getMmsi()); @@ -336,7 +343,7 @@ public class AisTargetRepositoryImpl implements AisTargetRepository { @Override public long count() { - String sql = "SELECT COUNT(*) FROM " + TABLE_NAME; + String sql = "SELECT COUNT(*) FROM " + tableName; Long count = jdbcTemplate.queryForObject(sql, Long.class); return count != null ? count : 0L; } @@ -344,7 +351,7 @@ public class AisTargetRepositoryImpl implements AisTargetRepository { @Override @Transactional public int deleteOlderThan(OffsetDateTime threshold) { - String sql = "DELETE FROM " + TABLE_NAME + " WHERE message_timestamp < ?"; + String sql = "DELETE FROM " + tableName + " WHERE message_timestamp < ?"; int deleted = jdbcTemplate.update(sql, toTimestamp(threshold)); log.info("AIS Target 오래된 데이터 삭제 완료: {} 건 (기준: {})", deleted, threshold); return deleted; diff --git a/src/main/java/com/snp/batch/jobs/common/batch/repository/FlagCodeRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/common/batch/repository/FlagCodeRepositoryImpl.java index d87fb20..3a78ad8 100644 --- a/src/main/java/com/snp/batch/jobs/common/batch/repository/FlagCodeRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/common/batch/repository/FlagCodeRepositoryImpl.java @@ -3,6 +3,7 @@ package com.snp.batch.jobs.common.batch.repository; import com.snp.batch.common.batch.repository.BaseJdbcRepository; import com.snp.batch.jobs.common.batch.entity.FlagCodeEntity; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Repository; @@ -15,18 +16,29 @@ import java.util.List; @Repository("FlagCodeRepository") public class FlagCodeRepositoryImpl extends BaseJdbcRepository implements FlagCodeRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.code-002}") + private String tableName; + public FlagCodeRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } + @Override + protected String getTargetSchema() { + return targetSchema; + } + @Override protected String getEntityName() { return "FlagCodeEntity"; } @Override - protected String getTableName() { - return "t_snp_data.flagcode"; + protected String getSimpleTableName() { + return tableName; } @@ -39,8 +51,8 @@ public class FlagCodeRepositoryImpl extends BaseJdbcRepository implements Stat5CodeRepository{ + + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.code-001}") + private String tableName; + public Stat5CodeRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } + @Override + protected String getTargetSchema() { + return targetSchema; + } + @Override protected String getEntityName() { return "Stat5CodeEntity"; } @Override - protected String getTableName() { - return "t_snp_data.stat5code"; + protected String getSimpleTableName() { + return tableName; } @Override @@ -47,8 +60,8 @@ public class Stat5CodeRepositoryImpl extends BaseJdbcRepository>>>> Company Compliance History Value Change Manage 프로시저 호출 완료"); return RepeatStatus.FINISHED; diff --git a/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportJobConfig.java b/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportJobConfig.java index 5dfff81..1511042 100644 --- a/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportJobConfig.java @@ -14,6 +14,7 @@ import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -30,6 +31,9 @@ public class ComplianceImportJobConfig extends BaseJobConfig createReader() { - return new ComplianceDataReader(maritimeServiceApiWebClient, jdbcTemplate); + return new ComplianceDataReader(maritimeServiceApiWebClient, jdbcTemplate, targetSchema); } @Override diff --git a/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportRangeJobConfig.java index 9c4d624..6e09eb6 100644 --- a/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/compliance/batch/config/ComplianceImportRangeJobConfig.java @@ -46,9 +46,12 @@ public class ComplianceImportRangeJobConfig extends BaseMultiStepJobConfig>>>> Compliance History Value Change Manage 프로시저 호출 완료"); return RepeatStatus.FINISHED; diff --git a/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataReader.java b/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataReader.java index aff76d7..97ae451 100644 --- a/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataReader.java +++ b/src/main/java/com/snp/batch/jobs/compliance/batch/reader/ComplianceDataReader.java @@ -19,14 +19,16 @@ public class ComplianceDataReader extends BaseApiReader { // 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복) private final JdbcTemplate jdbcTemplate; + private final String targetSchema; private List allImoNumbers; private int currentBatchIndex = 0; private final int batchSize = 100; - public ComplianceDataReader(WebClient webClient, JdbcTemplate jdbcTemplate) { + public ComplianceDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, String targetSchema) { super(webClient); this.jdbcTemplate = jdbcTemplate; + this.targetSchema = targetSchema; enableChunkMode(); // ✨ Chunk 모드 활성화 } @@ -47,16 +49,17 @@ public class ComplianceDataReader extends BaseApiReader { } private String getTargetTable(){ - return "snp_data.core20"; + return targetSchema + ".ship_data"; + } + + private String getImoQuery() { + return "select imo_number as ihslrorimoshipno from " + getTargetTable() + " order by imo_number"; } - private String GET_CORE_IMO_LIST = -// "SELECT ihslrorimoshipno FROM " + getTargetTable() + " ORDER BY ihslrorimoshipno"; - "select imo_number as ihslrorimoshipno from snp_data.ship_data order by imo_number"; @Override protected void beforeFetch(){ log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName()); - allImoNumbers = jdbcTemplate.queryForList(GET_CORE_IMO_LIST, String.class); + allImoNumbers = jdbcTemplate.queryForList(getImoQuery(), String.class); int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); diff --git a/src/main/java/com/snp/batch/jobs/compliance/batch/repository/CompanyComplianceRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/compliance/batch/repository/CompanyComplianceRepositoryImpl.java index 7b26406..08ded44 100644 --- a/src/main/java/com/snp/batch/jobs/compliance/batch/repository/CompanyComplianceRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/compliance/batch/repository/CompanyComplianceRepositoryImpl.java @@ -3,6 +3,7 @@ package com.snp.batch.jobs.compliance.batch.repository; import com.snp.batch.common.batch.repository.BaseJdbcRepository; import com.snp.batch.jobs.compliance.batch.entity.CompanyComplianceEntity; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Repository; @@ -14,13 +15,25 @@ import java.util.List; @Slf4j @Repository("CompanyComplianceRepository") public class CompanyComplianceRepositoryImpl extends BaseJdbcRepository implements CompanyComplianceRepository{ + + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.risk-compliance-003}") + private String tableName; + public CompanyComplianceRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } @Override - protected String getTableName() { - return "t_snp_data.tb_company_compliance_info"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -42,11 +55,11 @@ public class CompanyComplianceRepositoryImpl extends BaseJdbcRepository implements ComplianceRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.risk-compliance-002}") + private String tableName; + public ComplianceRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } @Override - protected String getTableName() { - return "t_snp_data.compliance"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -43,18 +55,18 @@ public class ComplianceRepositoryImpl extends BaseJdbcRepository implements EventRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.event-001}") + private String tableName; + public EventRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } @Override - protected String getTableName() { - return null; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override diff --git a/src/main/java/com/snp/batch/jobs/event/batch/repository/EventSql.java b/src/main/java/com/snp/batch/jobs/event/batch/repository/EventSql.java index 27a7010..f6c7a50 100644 --- a/src/main/java/com/snp/batch/jobs/event/batch/repository/EventSql.java +++ b/src/main/java/com/snp/batch/jobs/event/batch/repository/EventSql.java @@ -1,21 +1,65 @@ package com.snp.batch.jobs.event.batch.repository; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * Event 관련 SQL 생성 클래스 + * application.yml의 app.batch.target-schema.name 값을 사용 + */ +@Component public class EventSql { + + private static String targetSchema; + private static String eventTable; + private static String eventCargoTable; + private static String eventRelationshipTable; + private static String eventHumanCasualtyTable; + + @Value("${app.batch.target-schema.name}") + public void setTargetSchema(String schema) { + EventSql.targetSchema = schema; + } + + @Value("${app.batch.target-schema.tables.event-001}") + public void setEventTable(String table) { + EventSql.eventTable = table; + } + + @Value("${app.batch.target-schema.tables.event-002}") + public void setEventCargoTable(String table) { + EventSql.eventCargoTable = table; + } + + @Value("${app.batch.target-schema.tables.event-004}") + public void setEventRelationshipTable(String table) { + EventSql.eventRelationshipTable = table; + } + + @Value("${app.batch.target-schema.tables.event-003}") + public void setEventHumanCasualtyTable(String table) { + EventSql.eventHumanCasualtyTable = table; + } + + public static String getTargetSchema() { + return targetSchema; + } + public static String getEventDetailUpdateSql(){ return """ - INSERT INTO t_snp_data.event ( - event_id, incident_id, ihslrorimoshipno, published_date, event_start_date, event_end_date, - attempted_boarding, cargo_loading_status_code, casualty_action, - casualty_zone, casualty_zone_code, component2, country_code, - date_of_build, description, environment_location, location_name, - marsden_grid_reference, town_name, event_type, event_type_detail, - event_type_detail_id, event_type_id, fired_upon, headline, - ldt_at_time, significance, weather, pollutant, pollutant_quantity, - pollutant_unit, registered_owner_code_at_time, registered_owner_at_time, - registered_owner_country_code_at_time, registered_owner_country_at_time, - vessel_dwt, vessel_flag_code, vessel_flag_decode, vessel_gt, - vessel_name, vessel_type, vessel_type_decode, - job_execution_id, created_by + INSERT INTO %s.%s ( + event_id, acdnt_id, imo_no, pstg_ymd, event_start_day, event_end_day, + embrk_try_yn, cargo_capacity_status_cd, acdnt_actn, + acdnt_zone, acdnt_zone_cd, cfg_cmpnt_two, country_cd, + build_ymd, event_expln, env_position, position_nm, + masd_grid_ref, cty_nm, event_type, event_type_dtl, + event_type_dtl_id, event_type_id, firedupon_yn, sj, + ldt_timpt, signfct, wethr, pltn_matral, pltn_matral_cnt, + pltn_matral_unit, reg_shponr_cd_hr, reg_shponr_hr, + reg_shponr_country_cd_hr, reg_shponr_country_hr, + ship_dwt, ship_flg_cd, ship_flg_decd, ship_gt, + ship_nm, ship_type, ship_type_nm, + job_execution_id, creatr_id ) VALUES ( ?, ?, ?, ?::timestamptz,?::timestamptz,?::timestamptz, ?, ?, ?, ?, ?, ?, @@ -24,49 +68,49 @@ public class EventSql { ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ); - """; + """.formatted(targetSchema, eventTable); } public static String getEventCargoSql(){ return """ - INSERT INTO t_snp_data.event_cargo ( - event_id, "sequence", ihslrorimoshipno, "type", quantity, - unit_short, unit, cargo_damage, dangerous, "text", - job_execution_id, created_by + INSERT INTO %s.%s ( + event_id, event_seq, imo_no, "type", cnt, + unit_abbr, unit, cargo_damg, risk_yn, "text", + job_execution_id, creatr_id ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ); - """; + """.formatted(targetSchema, eventCargoTable); } public static String getEventRelationshipSql(){ return """ - INSERT INTO t_snp_data.event_relationship ( - incident_id, event_id, relationship_type, relationship_type_code, - event_id_2, event_type, event_type_code, - job_execution_id, created_by + INSERT INTO %s.%s ( + acdnt_id, event_id, rel_type, rel_type_cd, + event_id_two, event_type, event_type_cd, + job_execution_id, creatr_id ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ? ); - """; + """.formatted(targetSchema, eventRelationshipTable); } public static String getEventHumanCasualtySql(){ return """ - INSERT INTO t_snp_data.event_humancasualty ( - event_id, "scope", "type", qualifier, "count", - job_execution_id, created_by + INSERT INTO %s.%s ( + event_id, "scope", "type", qualfr, cnt, + job_execution_id, creatr_id ) VALUES ( ?, ?, ?, ?, ?, ?, ? ); - """; + """.formatted(targetSchema, eventHumanCasualtyTable); } } diff --git a/src/main/java/com/snp/batch/jobs/facility/batch/repository/FacilityRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/facility/batch/repository/FacilityRepositoryImpl.java index 4eaf325..b06eeab 100644 --- a/src/main/java/com/snp/batch/jobs/facility/batch/repository/FacilityRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/facility/batch/repository/FacilityRepositoryImpl.java @@ -3,6 +3,7 @@ package com.snp.batch.jobs.facility.batch.repository; import com.snp.batch.common.batch.repository.BaseJdbcRepository; import com.snp.batch.jobs.facility.batch.entity.PortEntity; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Repository; @@ -15,13 +16,24 @@ import java.util.List; @Repository("FacilityRepository") public class FacilityRepositoryImpl extends BaseJdbcRepository implements FacilityRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.facility-001}") + private String tableName; + public FacilityRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } @Override - protected String getTableName() { - return "t_snp_data.facility_port"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -43,13 +55,13 @@ public class FacilityRepositoryImpl extends BaseJdbcRepository protected String getUpdateSql() { return """ INSERT INTO %s( - port_ID, old_ID, status, port_Name, unlocode, countryCode, country_Name, region_Name, continent_Name, master_POID, - dec_Lat, dec_Long, position_lat, position_long, position_z, position_m, position_hasZ, position_hasM, position_isNull, position_stSrid, time_Zone, dayLight_Saving_Time, - maximum_Draft, max_LOA, max_Beam, max_DWT, max_Offshore_Draught, max_Offshore_LOA, max_Offshore_BCM, max_Offshore_DWT, - breakbulk_Facilities, container_Facilities, dry_Bulk_Facilities, liquid_Facilities, roRo_Facilities, passenger_Facilities, dry_Dock_Facilities, - lpG_Facilities, lnG_Facilities, lnG_Bunker, dO_Bunker, fO_Bunker, ispS_Compliant, csI_Compliant, free_Trade_Zone, ecO_Port, emission_Control_Area, wS_Port, - last_Update, entry_Date, - job_execution_id, created_by + port_id, bfr_id, status, port_nm, un_port_cd, country_cd, country_nm, areanm, cntntnm, mst_port_id, + lat_decml, lon_decml, position_lat, position_lon, position_z_val, position_mval_val, z_val_has_yn, mval_val_has_yn, position_nul_yn, position_sts_id, hr_zone, daylgt_save_hr, + max_draft, max_whlnth, max_beam, max_dwt, max_sea_draft, max_sea_whlnth, max_sea_bcm, max_sea_dwt, + bale_cargo_facility, cntnr_facility, case_cargo_facility, liquid_cargo_facility, roro_facility, paxfclty, drydkfclty, + lpg_facility, lng_facility, lng_bnkr, do_bnkr, fo_bnkr, isps_compliance_yn, csi_compliance_yn, free_trd_zone, ecfrd_port, emsn_ctrl_area, ws_port, + last_mdfcn_dt, reg_ymd, + job_execution_id, creatr_id ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/AnchorageCallsRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/AnchorageCallsRangeJobConfig.java index e2fc384..9ef17cd 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/AnchorageCallsRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/AnchorageCallsRangeJobConfig.java @@ -44,9 +44,13 @@ public class AnchorageCallsRangeJobConfig extends BaseMultiStepJobConfig { - - private final DarkActivityProcessor darkActivityProcessor; - private final DarkActivityWriter darkActivityWriter; - private final JdbcTemplate jdbcTemplate; - private final WebClient maritimeApiWebClient; - - public DarkActivityJobConfig( - JobRepository jobRepository, - PlatformTransactionManager transactionManager, - DarkActivityProcessor darkActivityProcessor, - DarkActivityWriter darkActivityWriter, JdbcTemplate jdbcTemplate, - @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient, - ObjectMapper objectMapper) { // ObjectMapper 주입 추가 - super(jobRepository, transactionManager); - this.darkActivityProcessor = darkActivityProcessor; - this.darkActivityWriter = darkActivityWriter; - this.jdbcTemplate = jdbcTemplate; - this.maritimeApiWebClient = maritimeApiWebClient; - } - - @Override - protected String getJobName() { - return "DarkActivityImportJob"; - } - - @Override - protected String getStepName() { - return "DarkActivityImportStep"; - } - - @Override - protected ItemReader createReader() { // 타입 변경 - // Reader 생성자 수정: ObjectMapper를 전달합니다. - return new DarkActivityReader(maritimeApiWebClient, jdbcTemplate); - } - - @Override - protected ItemProcessor createProcessor() { - return darkActivityProcessor; - } - - @Override - protected ItemWriter createWriter() { // 타입 변경 - return darkActivityWriter; - } - - @Override - protected int getChunkSize() { - return 5; // API에서 100개씩 가져오므로 chunk도 100으로 설정 - } - - @Bean(name = "DarkActivityImportJob") - public Job darkActivityImportJob() { - return job(); - } - - @Bean(name = "DarkActivityImportStep") - public Step darkActivityImportStep() { - return step(); - } -} diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/DarkActivityRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/DarkActivityRangeJobConfig.java deleted file mode 100644 index bddd338..0000000 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/DarkActivityRangeJobConfig.java +++ /dev/null @@ -1,119 +0,0 @@ -package com.snp.batch.jobs.movement.batch.config; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.snp.batch.common.batch.config.BaseJobConfig; -import com.snp.batch.jobs.movement.batch.dto.DarkActivityDto; -import com.snp.batch.jobs.movement.batch.entity.DarkActivityEntity; -import com.snp.batch.jobs.movement.batch.processor.DarkActivityProcessor; -import com.snp.batch.jobs.movement.batch.writer.DarkActivityWriter; -import com.snp.batch.jobs.movement.batch.reader.DarkActivityRangeReader; -import lombok.extern.slf4j.Slf4j; -import org.springframework.batch.core.Job; -import org.springframework.batch.core.Step; -import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.batch.core.repository.JobRepository; -import org.springframework.batch.item.ItemProcessor; -import org.springframework.batch.item.ItemReader; -import org.springframework.batch.item.ItemWriter; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.web.reactive.function.client.WebClient; - -/** - * 선박 상세 정보 Import Job Config - * - * 특징: - * - ship_data 테이블에서 IMO 번호 조회 - * - IMO 번호를 100개씩 배치로 분할 - * - Maritime API GetShipsByIHSLRorIMONumbers 호출 - * TODO : GetShipsByIHSLRorIMONumbersAll 호출로 변경 - * - 선박 상세 정보를 ship_detail 테이블에 저장 (UPSERT) - * - * 데이터 흐름: - * DarkActivityReader (ship_data → Maritime API) - * ↓ (DarkActivityDto) - * DarkActivityProcessor - * ↓ (DarkActivityEntity) - * DarkActivityWriter - * ↓ (t_darkactivity 테이블) - */ - -@Slf4j -@Configuration -public class DarkActivityRangeJobConfig extends BaseJobConfig { - - private final DarkActivityProcessor darkActivityProcessor; - private final DarkActivityWriter darkActivityWriter; - private final DarkActivityRangeReader darkActivityRangeReader; - private final JdbcTemplate jdbcTemplate; - private final WebClient maritimeApiWebClient; - - public DarkActivityRangeJobConfig( - JobRepository jobRepository, - PlatformTransactionManager transactionManager, - DarkActivityProcessor darkActivityProcessor, - DarkActivityWriter darkActivityWriter, JdbcTemplate jdbcTemplate, - @Qualifier("maritimeServiceApiWebClient") WebClient maritimeApiWebClient, - ObjectMapper objectMapper, DarkActivityRangeReader darkActivityRangeReader) { // ObjectMapper 주입 추가 - super(jobRepository, transactionManager); - this.darkActivityProcessor = darkActivityProcessor; - this.darkActivityWriter = darkActivityWriter; - this.jdbcTemplate = jdbcTemplate; - this.maritimeApiWebClient = maritimeApiWebClient; - this.darkActivityRangeReader = darkActivityRangeReader; - } - - @Override - protected String getJobName() { - return "DarkActivityRangeImportJob"; - } - - @Override - protected String getStepName() { - return "DarkActivityRangeImportStep"; - } - - @Override - protected ItemReader createReader() { // 타입 변경 - // Reader 생성자 수정: ObjectMapper를 전달합니다. - return darkActivityRangeReader; - } - @Bean - @StepScope - public DarkActivityRangeReader darkActivityReader( - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate - ) { - // jobParameters 없으면 null 넘어오고 Reader에서 default 처리 - return new DarkActivityRangeReader(maritimeApiWebClient, startDate, stopDate); - } - - @Override - protected ItemProcessor createProcessor() { - return darkActivityProcessor; - } - - @Override - protected ItemWriter createWriter() { // 타입 변경 - return darkActivityWriter; - } - - @Override - protected int getChunkSize() { - return 5000; // API에서 100개씩 가져오므로 chunk도 100으로 설정 - } - - @Bean(name = "DarkActivityRangeImportJob") - public Job darkActivityRangeImportJob() { - return job(); - } - - @Bean(name = "DarkActivityRangeImportStep") - public Step darkActivityRangeImportStep() { - return step(); - } -} diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/config/DestinationsRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/movement/batch/config/DestinationsRangeJobConfig.java index 9612c60..0049772 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/config/DestinationsRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/config/DestinationsRangeJobConfig.java @@ -43,9 +43,13 @@ public class DestinationsRangeJobConfig extends BaseMultiStepJobConfig { - - private final ObjectMapper objectMapper; - - public DarkActivityProcessor(ObjectMapper objectMapper) { - this.objectMapper = objectMapper; - } - - @Override - protected DarkActivityEntity processItem(DarkActivityDto dto) throws Exception { - log.debug("선박 상세 정보 처리 시작: imoNumber={}, facilityName={}", - dto.getImolRorIHSNumber(), dto.getFacilityName()); - - JsonNode positionNode = null; - if (dto.getPosition() != null) { - // Position 객체를 JsonNode로 변환 - positionNode = objectMapper.valueToTree(dto.getPosition()); - } - - DarkActivityEntity entity = DarkActivityEntity.builder() - .movementType(dto.getMovementType()) - .imolRorIHSNumber(dto.getImolRorIHSNumber()) - .movementDate(LocalDateTime.parse(dto.getMovementDate())) - .facilityId(dto.getFacilityId()) - .facilityName(dto.getFacilityName()) - .facilityType(dto.getFacilityType()) - .subFacilityId(dto.getSubFacilityId()) - .subFacilityName(dto.getSubFacilityName()) - .subFacilityType(dto.getSubFacilityType()) - .countryCode(dto.getCountryCode()) - .countryName(dto.getCountryName()) - .draught(dto.getDraught()) - .latitude(dto.getLatitude()) - .longitude(dto.getLongitude()) - .position(positionNode) // JsonNode로 매핑 - .eventStartDate(LocalDateTime.parse(dto.getEventStartDate())) - .build(); - - return entity; - } - -} diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/DarkActivityRangeReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/DarkActivityRangeReader.java deleted file mode 100644 index 020f381..0000000 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/DarkActivityRangeReader.java +++ /dev/null @@ -1,182 +0,0 @@ -package com.snp.batch.jobs.movement.batch.reader; - -import com.snp.batch.common.batch.reader.BaseApiReader; -import com.snp.batch.jobs.movement.batch.dto.DarkActivityDto; -import lombok.extern.slf4j.Slf4j; -import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.web.reactive.function.client.WebClient; - -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; -import java.util.List; - -/** - * 선박 상세 정보 Reader (v2.0 - Chunk 기반) - * - * 기능: - * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) - * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 - * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 - * 4. Spring Batch가 100건씩 Process → Write 수행 - * - * Chunk 처리 흐름: - * - beforeFetch() → IMO 전체 조회 (1회) - * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) - * - read() → 1건씩 반환 (100번) - * - Processor/Writer → 100건 처리 - * - 반복... (1,718번의 Chunk) - * - * 기존 방식과의 차이: - * - 기존: 17만건 전체 메모리 로드 → Process → Write - * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) - */ -@Slf4j -@StepScope -public class DarkActivityRangeReader extends BaseApiReader { - - - private List allData; - // DB 해시값을 저장할 맵 - private int currentBatchIndex = 0; - private final int batchSize = 5000; - - // @Value("#{jobParameters['startDate']}") - private String startDate; -// private String startDate = "2025-01-01"; - - // @Value("#{jobParameters['stopDate']}") - private String stopDate; -// private String stopDate = "2025-12-31"; - - /*public DarkActivityRangeReader(WebClient webClient) { - super(webClient); - enableChunkMode(); // ✨ Chunk 모드 활성화 - }*/ - public DarkActivityRangeReader(WebClient webClient, - @Value("#{jobParameters['startDate']}") String startDate, - @Value("#{jobParameters['stopDate']}") String stopDate) { - super(webClient); - // 날짜가 없으면 전날 하루 기준 - if (startDate == null || startDate.isBlank() || stopDate == null || stopDate.isBlank()) { - LocalDate yesterday = LocalDate.now().minusDays(1); - this.startDate = yesterday.atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - this.stopDate = yesterday.plusDays(1).atStartOfDay().format(DateTimeFormatter.ISO_DATE_TIME) + "Z"; - } else { - this.startDate = startDate; - this.stopDate = stopDate; - } - - enableChunkMode(); // ✨ Chunk 모드 활성화 - } - - @Override - protected String getReaderName() { - return "DarkActivityReader"; - } - - @Override - protected void resetCustomState() { - this.currentBatchIndex = 0; - this.allData = null; - } - - @Override - protected String getApiPath() { - return "/Movements/DarkActivity"; - } - - @Override - protected String getApiBaseUrl() { - return "https://webservices.maritime.spglobal.com"; - } - - private static final String GET_ALL_IMO_QUERY = - "SELECT imo_number FROM ship_data ORDER BY id"; -// "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_darkactivity) ORDER BY imo_number"; - - /** - * 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회 - */ - @Override - protected void beforeFetch() { - log.info("[{}] 요청 날짜 범위: {} → {}", getReaderName(), startDate, stopDate); - } - - /** - * ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환 - * - * Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출 - * - * @return 다음 배치 100건 (더 이상 없으면 null) - */ - @Override - protected List fetchNextBatch() throws Exception { - - // 모든 배치 처리 완료 확인 - if (allData == null ) { - log.info("[{}] 최초 API 조회 실행: {} ~ {}", getReaderName(), startDate, stopDate); - allData = callApiWithBatch(startDate, stopDate); - - if (allData == null || allData.isEmpty()) { - log.warn("[{}] 조회된 데이터 없음 → 종료", getReaderName()); - return null; - } - - log.info("[{}] 총 {}건 데이터 조회됨. batchSize = {}", getReaderName(), allData.size(), batchSize); - } - - // 2) 이미 끝까지 읽었으면 종료 - if (currentBatchIndex >= allData.size()) { - log.info("[{}] 모든 배치 처리 완료", getReaderName()); - return null; - } - - // 3) 이번 배치의 end 계산 - int endIndex = Math.min(currentBatchIndex + batchSize, allData.size()); - - // 현재 배치의 IMO 번호 추출 (100개) - List batch = allData.subList(currentBatchIndex, endIndex); - - int currentBatchNumber = (currentBatchIndex / batchSize) + 1; - int totalBatches = (int) Math.ceil((double) allData.size() / batchSize); - - log.info("[{}] 배치 {}/{} 처리 중: {}건", getReaderName(), currentBatchNumber, totalBatches, batch.size()); - - currentBatchIndex = endIndex; - updateApiCallStats(totalBatches, currentBatchNumber); - return batch; - - } - - /** - * Query Parameter를 사용한 API 호출 - * - * @param startDate,stopDate - * @return API 응답 - */ - private List callApiWithBatch(String startDate, String stopDate){ - String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate; -// +"&lrno=" + lrno; - - log.debug("[{}] API 호출: {}", getReaderName(), url); - - return webClient.get() - .uri(url) - .retrieve() - .bodyToFlux(DarkActivityDto.class) - .collectList() - .block(); - } - - @Override - protected void afterFetch(List data) { - if (data == null) { - int totalBatches = (int) Math.ceil((double) allData.size() / batchSize); - log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); - /* log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", - getReaderName(), allData.size());*/ - } - } - -} diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/reader/DarkActivityReader.java b/src/main/java/com/snp/batch/jobs/movement/batch/reader/DarkActivityReader.java deleted file mode 100644 index af2cab9..0000000 --- a/src/main/java/com/snp/batch/jobs/movement/batch/reader/DarkActivityReader.java +++ /dev/null @@ -1,212 +0,0 @@ -package com.snp.batch.jobs.movement.batch.reader; - -import com.snp.batch.common.batch.reader.BaseApiReader; -import com.snp.batch.jobs.movement.batch.dto.DarkActivityDto; -import lombok.extern.slf4j.Slf4j; -import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.web.reactive.function.client.WebClient; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -/** - * 선박 상세 정보 Reader (v2.0 - Chunk 기반) - * - * 기능: - * 1. ship_data 테이블에서 IMO 번호 전체 조회 (최초 1회) - * 2. IMO 번호를 100개씩 분할하여 배치 단위로 처리 - * 3. fetchNextBatch() 호출 시마다 100개씩 API 호출 - * 4. Spring Batch가 100건씩 Process → Write 수행 - * - * Chunk 처리 흐름: - * - beforeFetch() → IMO 전체 조회 (1회) - * - fetchNextBatch() → 100개 IMO로 API 호출 (1,718회) - * - read() → 1건씩 반환 (100번) - * - Processor/Writer → 100건 처리 - * - 반복... (1,718번의 Chunk) - * - * 기존 방식과의 차이: - * - 기존: 17만건 전체 메모리 로드 → Process → Write - * - 신규: 100건씩 로드 → Process → Write (Chunk 1,718회) - */ -@Slf4j -@StepScope -public class DarkActivityReader extends BaseApiReader { - - private final JdbcTemplate jdbcTemplate; - - // 배치 처리 상태 - private List allImoNumbers; - // DB 해시값을 저장할 맵 - private Map dbMasterHashes; - private int currentBatchIndex = 0; - private final int batchSize = 5; - - // @Value("#{jobParameters['startDate']}") -// private String startDate; - private String startDate = "2025-01-01"; - - // @Value("#{jobParameters['stopDate']}") -// private String stopDate; - private String stopDate = "2025-12-31"; - - public DarkActivityReader(WebClient webClient, JdbcTemplate jdbcTemplate ) { - super(webClient); - this.jdbcTemplate = jdbcTemplate; - enableChunkMode(); // ✨ Chunk 모드 활성화 - } - - @Override - protected String getReaderName() { - return "DarkActivityReader"; - } - - @Override - protected void resetCustomState() { - this.currentBatchIndex = 0; - this.allImoNumbers = null; - this.dbMasterHashes = null; - } - - @Override - protected String getApiPath() { - return "/Movements/DarkActivity"; - } - - @Override - protected String getApiBaseUrl() { - return "https://webservices.maritime.spglobal.com"; - } - - private static final String GET_ALL_IMO_QUERY = - "SELECT imo_number FROM ship_data ORDER BY id"; -// "SELECT imo_number FROM snp_data.ship_data where imo_number > (select max(imo) from snp_data.t_darkactivity) ORDER BY imo_number"; - - /** - * 최초 1회만 실행: ship_data 테이블에서 IMO 번호 전체 조회 - */ - @Override - protected void beforeFetch() { - // 전처리 과정 - // Step 1. IMO 전체 번호 조회 - log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName()); - - allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class); - int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); - - log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); - log.info("[{}] {}개씩 배치로 분할하여 API 호출 예정", getReaderName(), batchSize); - log.info("[{}] 예상 배치 수: {} 개", getReaderName(), totalBatches); - - // API 통계 초기화 - updateApiCallStats(totalBatches, 0); - } - - /** - * ✨ Chunk 기반 핵심 메서드: 다음 100개 배치를 조회하여 반환 - * - * Spring Batch가 100건씩 read() 호출 완료 후 이 메서드 재호출 - * - * @return 다음 배치 100건 (더 이상 없으면 null) - */ - @Override - protected List fetchNextBatch() throws Exception { - - // 모든 배치 처리 완료 확인 - if (allImoNumbers == null || currentBatchIndex >= allImoNumbers.size()) { - return null; // Job 종료 - } - - // 현재 배치의 시작/끝 인덱스 계산 - int startIndex = currentBatchIndex; - int endIndex = Math.min(currentBatchIndex + batchSize, allImoNumbers.size()); - - // 현재 배치의 IMO 번호 추출 (100개) - List currentBatch = allImoNumbers.subList(startIndex, endIndex); - - int currentBatchNumber = (currentBatchIndex / batchSize) + 1; - int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); - - log.info("[{}] 배치 {}/{} 처리 중 (IMO {} 개)...", - getReaderName(), currentBatchNumber, totalBatches, currentBatch.size()); - - try { - // IMO 번호를 쉼표로 연결 (예: "1000019,1000021,1000033,...") - String imoParam = String.join(",", currentBatch); - // API 호출 - List response = callApiWithBatch(imoParam); - - // 다음 배치로 인덱스 이동 - currentBatchIndex = endIndex; - - - // 응답 처리 - if (response != null ) { - List darkActivityList = response; - log.info("[{}] 배치 {}/{} 완료: {} 건 조회", - getReaderName(), currentBatchNumber, totalBatches, darkActivityList.size()); - - // API 호출 통계 업데이트 - updateApiCallStats(totalBatches, currentBatchNumber); - - // API 과부하 방지 (다음 배치 전 0.5초 대기) - if (currentBatchIndex < allImoNumbers.size()) { - Thread.sleep(500); - } - - return darkActivityList; - - } else { - log.warn("[{}] 배치 {}/{} 응답 없음", - getReaderName(), currentBatchNumber, totalBatches); - - // API 호출 통계 업데이트 (실패도 카운트) - updateApiCallStats(totalBatches, currentBatchNumber); - - return Collections.emptyList(); - } - - } catch (Exception e) { - log.error("[{}] 배치 {}/{} 처리 중 오류: {}", - getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e); - - // 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용) - currentBatchIndex = endIndex; - - // 빈 리스트 반환 (Job 계속 진행) - return Collections.emptyList(); - } - } - - /** - * Query Parameter를 사용한 API 호출 - * - * @param lrno 쉼표로 연결된 IMO 번호 (예: "1000019,1000021,...") - * @return API 응답 - */ - private List callApiWithBatch(String lrno) { - String url = getApiPath() + "?startDate=" + startDate +"&stopDate="+stopDate+"&lrno=" + lrno; - - log.debug("[{}] API 호출: {}", getReaderName(), url); - - return webClient.get() - .uri(url) - .retrieve() - .bodyToFlux(DarkActivityDto.class) - .collectList() - .block(); - } - - @Override - protected void afterFetch(List data) { - if (data == null) { - int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); - log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches); - log.info("[{}] 총 {} 개의 IMO 번호에 대한 API 호출 종료", - getReaderName(), allImoNumbers.size()); - } - } - -} diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/repository/AnchorageCallsRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/movement/batch/repository/AnchorageCallsRepositoryImpl.java index a0e74ce..631fe6a 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/repository/AnchorageCallsRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/repository/AnchorageCallsRepositoryImpl.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.snp.batch.common.batch.repository.BaseJdbcRepository; import com.snp.batch.jobs.movement.batch.entity.AnchorageCallsEntity; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Repository; @@ -18,13 +19,25 @@ import java.util.List; public class AnchorageCallsRepositoryImpl extends BaseJdbcRepository implements AnchorageCallsRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.movements-001}") + private String tableName; + public AnchorageCallsRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override - protected String getTableName() { - return "t_snp_data.t_anchoragecall"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -41,25 +54,25 @@ public class AnchorageCallsRepositoryImpl extends BaseJdbcRepository implements BerthCallsRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.movements-002}") + private String tableName; + public BerthCallsRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override - protected String getTableName() { - return "t_snp_data.t_berthcall"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -41,25 +54,25 @@ public class BerthCallsRepositoryImpl extends BaseJdbcRepository implements CurrentlyAtRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.movements-003}") + private String tableName; + public CurrentlyAtRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override - protected String getTableName() { - return "t_snp_data.t_currentlyat"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -41,28 +54,28 @@ public class CurrentlyAtRepositoryImpl extends BaseJdbcRepository entities); -} diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/repository/DarkActivityRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/movement/batch/repository/DarkActivityRepositoryImpl.java deleted file mode 100644 index f255d60..0000000 --- a/src/main/java/com/snp/batch/jobs/movement/batch/repository/DarkActivityRepositoryImpl.java +++ /dev/null @@ -1,187 +0,0 @@ -package com.snp.batch.jobs.movement.batch.repository; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.snp.batch.common.batch.repository.BaseJdbcRepository; -import com.snp.batch.jobs.movement.batch.entity.DarkActivityEntity; -import lombok.extern.slf4j.Slf4j; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.core.RowMapper; -import org.springframework.stereotype.Repository; - -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.util.List; - -/** - * 선박 상세 정보 Repository 구현체 - * BaseJdbcRepository를 상속하여 JDBC 기반 CRUD 구현 - */ -@Slf4j -@Repository("") -public class DarkActivityRepositoryImpl extends BaseJdbcRepository - implements DarkActivityRepository { - - public DarkActivityRepositoryImpl(JdbcTemplate jdbcTemplate) { - super(jdbcTemplate); - } - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - @Override - protected String getTableName() { - return "new_snp.t_darkactivity"; -// return "snp_data.t_darkactivity"; - } - - @Override - protected String getEntityName() { - return "DarkActivity"; - } - - @Override - protected String extractId(DarkActivityEntity entity) { - return entity.getImolRorIHSNumber(); - } - - @Override - public String getInsertSql() { -// return """ -// INSERT INTO snp_data.t_darkactivity( - return """ - INSERT INTO new_snp.t_darkactivity( - imo, - mvmn_type, - mvmn_dt, - fclty_id, - fclty_nm, - fclty_type, - lwrnk_fclty_id, - lwrnk_fclty_nm, - lwrnk_fclty_type, - ntn_cd, - ntn_nm, - draft, - lat, - lon, - evt_start_dt, - lcinfo - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT (imo, mvmn_type, mvmn_dt, fclty_id) - DO UPDATE SET - mvmn_type = EXCLUDED.mvmn_type, - mvmn_dt = EXCLUDED.mvmn_dt, - fclty_id = EXCLUDED.fclty_id, - fclty_nm = EXCLUDED.fclty_nm, - fclty_type = EXCLUDED.fclty_type, - lwrnk_fclty_id = EXCLUDED.lwrnk_fclty_id, - lwrnk_fclty_nm = EXCLUDED.lwrnk_fclty_nm, - lwrnk_fclty_type = EXCLUDED.lwrnk_fclty_type, - ntn_cd = EXCLUDED.ntn_cd, - ntn_nm = EXCLUDED.ntn_nm, - draft = EXCLUDED.draft, - lat = EXCLUDED.lat, - lon = EXCLUDED.lon, - evt_start_dt = EXCLUDED.evt_start_dt, - lcinfo = EXCLUDED.lcinfo - """; - } - - @Override - protected String getUpdateSql() { - return null; - } - - @Override - protected void setInsertParameters(PreparedStatement ps, DarkActivityEntity e) throws Exception { - int i = 1; - ps.setString(i++, e.getImolRorIHSNumber()); // imo - ps.setString(i++, e.getMovementType()); // mvmn_type - ps.setTimestamp(i++, e.getMovementDate() != null ? Timestamp.valueOf(e.getMovementDate()) : null); // mvmn_dt - ps.setObject(i++, e.getFacilityId()); // fclty_id - ps.setString(i++, e.getFacilityName()); // fclty_nm - ps.setString(i++, e.getFacilityType()); // fclty_type - ps.setObject(i++, e.getSubFacilityId()); //lwrnk_fclty_id - ps.setString(i++, e.getSubFacilityName()); // lwrnk_fclty_nm - ps.setString(i++, e.getSubFacilityType()); //lwrnk_fclty_type - ps.setString(i++, e.getCountryCode()); // ntn_cd - ps.setString(i++, e.getCountryName()); // ntn_nm - setDoubleOrNull(ps, i++, e.getDraught()); // draft - setDoubleOrNull(ps, i++, e.getLatitude()); // lat - setDoubleOrNull(ps, i++, e.getLongitude());// lon - ps.setTimestamp(i++, e.getEventStartDate() != null ? Timestamp.valueOf(e.getEventStartDate()) : null); // evt_start_dt - - if (e.getPosition() != null) { - ps.setObject(i++, OBJECT_MAPPER.writeValueAsString(e.getPosition()), java.sql.Types.OTHER); // lcinfo (jsonb) - } else { - ps.setNull(i++, java.sql.Types.OTHER); - } - } - - private void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception { - if (value != null) { - ps.setDouble(index, value); - } else { - // java.sql.Types.DOUBLE을 사용하여 명시적으로 SQL NULL을 설정 - ps.setNull(index, java.sql.Types.DOUBLE); - } - } - - @Override - protected void setUpdateParameters(PreparedStatement ps, DarkActivityEntity entity) throws Exception { - - } - - @Override - protected RowMapper getRowMapper() { - return null; - } - - @Override - public void saveAll(List entities) { - if (entities == null || entities.isEmpty()) return; - - log.info("DarkActivity 저장 시작 = {}건", entities.size()); - batchInsert(entities); - - } - - - /** - * ShipDetailEntity RowMapper - */ - private static class DarkActivityRowMapper implements RowMapper { - @Override - public DarkActivityEntity mapRow(ResultSet rs, int rowNum) throws SQLException { - DarkActivityEntity entity = DarkActivityEntity.builder() - .id(rs.getLong("id")) - .imolRorIHSNumber(rs.getString("imolRorIHSNumber")) - .facilityId(rs.getObject("facilityId", Integer.class)) - .facilityName(rs.getString("facilityName")) - .facilityType(rs.getString("facilityType")) - .countryCode(rs.getString("countryCode")) - .countryName(rs.getString("countryName")) - .draught(rs.getObject("draught", Double.class)) - .latitude(rs.getObject("latitude", Double.class)) - .longitude(rs.getObject("longitude", Double.class)) - .position(parseJson(rs.getString("position"))) - .build(); - - Timestamp movementDate = rs.getTimestamp("movementDate"); - if (movementDate != null) { - entity.setMovementDate(movementDate.toLocalDateTime()); - } - - return entity; - } - - private JsonNode parseJson(String json) { - try { - if (json == null) return null; - return new ObjectMapper().readTree(json); - } catch (Exception e) { - throw new RuntimeException("JSON 파싱 오류: " + json); - } - } - } -} diff --git a/src/main/java/com/snp/batch/jobs/movement/batch/repository/DestinationRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/movement/batch/repository/DestinationRepositoryImpl.java index 8aae798..e766a67 100644 --- a/src/main/java/com/snp/batch/jobs/movement/batch/repository/DestinationRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/movement/batch/repository/DestinationRepositoryImpl.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.snp.batch.common.batch.repository.BaseJdbcRepository; import com.snp.batch.jobs.movement.batch.entity.DestinationEntity; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Repository; @@ -18,13 +19,25 @@ import java.util.List; public class DestinationRepositoryImpl extends BaseJdbcRepository implements DestinationRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.movements-004}") + private String tableName; + public DestinationRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override - protected String getTableName() { - return "t_snp_data.t_destination"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -39,23 +52,21 @@ public class DestinationRepositoryImpl extends BaseJdbcRepository implements PortCallsRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.movements-005}") + private String tableName; + public PortCallsRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override - protected String getTableName() { - return "t_snp_data.t_ship_stpov_info"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -41,28 +54,28 @@ public class PortCallsRepositoryImpl extends BaseJdbcRepository implements StsOperationRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.movements-006}") + private String tableName; + public StsOperationRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override - protected String getTableName() { - return "t_snp_data.t_stsoperation"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -41,26 +54,26 @@ public class StsOperationRepositoryImpl extends BaseJdbcRepository implements TerminalCallsRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.movements-007}") + private String tableName; + public TerminalCallsRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override - protected String getTableName() { - return "t_snp_data.t_terminalcall"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -39,32 +52,30 @@ public class TerminalCallsRepositoryImpl extends BaseJdbcRepository implements TransitsRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.movements-008}") + private String tableName; + public TransitsRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override - protected String getTableName() { - return "t_snp_data.t_transit"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -41,13 +54,13 @@ public class TransitsRepositoryImpl extends BaseJdbcRepository { - - private final DarkActivityRepository darkActivityRepository; - - - public DarkActivityWriter(DarkActivityRepository darkActivityRepository) { - super("DarkActivity"); - this.darkActivityRepository = darkActivityRepository; - } - - @Override - protected void writeItems(List items) throws Exception { - - if (items.isEmpty()) { return; } - - darkActivityRepository.saveAll(items); - log.info("DarkActivity 데이터 저장: {} 건", items.size()); - } - -} diff --git a/src/main/java/com/snp/batch/jobs/pscInspection/batch/config/PscInspectionJobConfig.java b/src/main/java/com/snp/batch/jobs/pscInspection/batch/config/PscInspectionJobConfig.java index 02a118a..f8afa4e 100644 --- a/src/main/java/com/snp/batch/jobs/pscInspection/batch/config/PscInspectionJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/pscInspection/batch/config/PscInspectionJobConfig.java @@ -43,9 +43,13 @@ public class PscInspectionJobConfig extends BaseMultiStepJobConfig pscDefects; - @JsonProperty("PSCCertificates") - private List pscCertificates; - @JsonProperty("PSCAllCertificates") private List pscAllCertificates; } diff --git a/src/main/java/com/snp/batch/jobs/pscInspection/batch/entity/PscCertificateEntity.java b/src/main/java/com/snp/batch/jobs/pscInspection/batch/entity/PscCertificateEntity.java deleted file mode 100644 index d360916..0000000 --- a/src/main/java/com/snp/batch/jobs/pscInspection/batch/entity/PscCertificateEntity.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.snp.batch.jobs.pscInspection.batch.entity; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import lombok.experimental.SuperBuilder; - -import java.time.LocalDateTime; - -@Data -@SuperBuilder -@NoArgsConstructor -@AllArgsConstructor -public class PscCertificateEntity { - - private String certificateId; - - private String typeId; - private String dataSetVersion; - - private String certificateTitle; - private String certificateTitleCode; - - private String classSocOfIssuer; - - private LocalDateTime expiryDate; - private String inspectionId; - private LocalDateTime issueDate; - - private String issuingAuthority; - private String issuingAuthorityCode; - - private LocalDateTime lastSurveyDate; - private String latestSurveyPlace; - private String latestSurveyPlaceCode; - - private String lrno; - - private String otherIssuingAuthority; - private String otherSurveyAuthority; - - private String surveyAuthority; - private String surveyAuthorityCode; - private String surveyAuthorityType; -} diff --git a/src/main/java/com/snp/batch/jobs/pscInspection/batch/entity/PscInspectionEntity.java b/src/main/java/com/snp/batch/jobs/pscInspection/batch/entity/PscInspectionEntity.java index d0ac126..bf54ad8 100644 --- a/src/main/java/com/snp/batch/jobs/pscInspection/batch/entity/PscInspectionEntity.java +++ b/src/main/java/com/snp/batch/jobs/pscInspection/batch/entity/PscInspectionEntity.java @@ -59,6 +59,5 @@ public class PscInspectionEntity extends BaseEntity { private String yearOfBuild; private List defects; - private List certificates; private List allCertificates; } diff --git a/src/main/java/com/snp/batch/jobs/pscInspection/batch/processor/PscInspectionProcessor.java b/src/main/java/com/snp/batch/jobs/pscInspection/batch/processor/PscInspectionProcessor.java index f451818..f49f7c1 100644 --- a/src/main/java/com/snp/batch/jobs/pscInspection/batch/processor/PscInspectionProcessor.java +++ b/src/main/java/com/snp/batch/jobs/pscInspection/batch/processor/PscInspectionProcessor.java @@ -3,11 +3,9 @@ package com.snp.batch.jobs.pscInspection.batch.processor; import com.fasterxml.jackson.databind.ObjectMapper; import com.snp.batch.common.batch.processor.BaseProcessor; import com.snp.batch.jobs.pscInspection.batch.dto.PscAllCertificateDto; -import com.snp.batch.jobs.pscInspection.batch.dto.PscCertificateDto; import com.snp.batch.jobs.pscInspection.batch.dto.PscDefectDto; import com.snp.batch.jobs.pscInspection.batch.dto.PscInspectionDto; import com.snp.batch.jobs.pscInspection.batch.entity.PscAllCertificateEntity; -import com.snp.batch.jobs.pscInspection.batch.entity.PscCertificateEntity; import com.snp.batch.jobs.pscInspection.batch.entity.PscDefectEntity; import com.snp.batch.jobs.pscInspection.batch.entity.PscInspectionEntity; import lombok.extern.slf4j.Slf4j; @@ -85,7 +83,6 @@ public class PscInspectionProcessor extends BaseProcessor convertCertificateDtos(List dtos) { - if (dtos == null || dtos.isEmpty()) return List.of(); - - return dtos.stream() - .map(dto -> PscCertificateEntity.builder() - .certificateId(dto.getCertificateId()) - .typeId(dto.getTypeId()) - .dataSetVersion(dto.getDataSetVersion() != null ? dto.getDataSetVersion().getDataSetVersion() : null) - .certificateTitle(dto.getCertificateTitle()) - .certificateTitleCode(dto.getCertificateTitleCode()) - .classSocOfIssuer(dto.getClassSocOfIssuer()) - .issueDate(dto.getIssueDate() != null ? parseFlexible(dto.getIssueDate()) : null) - .expiryDate(dto.getExpiryDate() != null ? parseFlexible(dto.getExpiryDate()) : null) - .inspectionId(dto.getInspectionId()) - .issuingAuthority(dto.getIssuingAuthority()) - .issuingAuthorityCode(dto.getIssuingAuthorityCode()) - .lastSurveyDate(dto.getLastSurveyDate() != null ? parseFlexible(dto.getLastSurveyDate()) : null) - .latestSurveyPlace(dto.getLatestSurveyPlace()) - .latestSurveyPlaceCode(dto.getLatestSurveyPlaceCode()) - .lrno(dto.getLrno()) - .otherIssuingAuthority(dto.getOtherIssuingAuthority()) - .otherSurveyAuthority(dto.getOtherSurveyAuthority()) - .surveyAuthority(dto.getSurveyAuthority()) - .surveyAuthorityCode(dto.getSurveyAuthorityCode()) - .surveyAuthorityType(dto.getSurveyAuthorityType()) - .build()) - .collect(Collectors.toList()); - } public static List convertAllCertificateDtos(List dtos) { if (dtos == null || dtos.isEmpty()) return List.of(); diff --git a/src/main/java/com/snp/batch/jobs/pscInspection/batch/repository/PscAllCertificateRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/pscInspection/batch/repository/PscAllCertificateRepositoryImpl.java index d715d4d..9fb897f 100644 --- a/src/main/java/com/snp/batch/jobs/pscInspection/batch/repository/PscAllCertificateRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/pscInspection/batch/repository/PscAllCertificateRepositoryImpl.java @@ -2,8 +2,8 @@ package com.snp.batch.jobs.pscInspection.batch.repository; import com.snp.batch.common.batch.repository.BaseJdbcRepository; import com.snp.batch.jobs.pscInspection.batch.entity.PscAllCertificateEntity; -import com.snp.batch.jobs.pscInspection.batch.entity.PscCertificateEntity; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Repository; @@ -17,13 +17,25 @@ import java.util.List; @Repository public class PscAllCertificateRepositoryImpl extends BaseJdbcRepository implements PscAllCertificateRepository { + + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.psc-003}") + private String tableName; + public PscAllCertificateRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } @Override - protected String getTableName() { - return "new_snp.psc_all_certificate"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -44,28 +56,28 @@ public class PscAllCertificateRepositoryImpl extends BaseJdbcRepository certificates); -} diff --git a/src/main/java/com/snp/batch/jobs/pscInspection/batch/repository/PscCertificateRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/pscInspection/batch/repository/PscCertificateRepositoryImpl.java deleted file mode 100644 index 3edf86c..0000000 --- a/src/main/java/com/snp/batch/jobs/pscInspection/batch/repository/PscCertificateRepositoryImpl.java +++ /dev/null @@ -1,139 +0,0 @@ -package com.snp.batch.jobs.pscInspection.batch.repository; - -import com.snp.batch.common.batch.repository.BaseJdbcRepository; -import com.snp.batch.jobs.pscInspection.batch.entity.PscCertificateEntity; -import com.snp.batch.jobs.pscInspection.batch.entity.PscDefectEntity; -import lombok.extern.slf4j.Slf4j; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.core.RowMapper; -import org.springframework.stereotype.Repository; - -import java.sql.PreparedStatement; -import java.sql.Timestamp; -import java.util.List; - -@Slf4j -@Repository -public class PscCertificateRepositoryImpl extends BaseJdbcRepository - implements PscCertificateRepository { - public PscCertificateRepositoryImpl(JdbcTemplate jdbcTemplate) { - super(jdbcTemplate); - } - - @Override - protected String getTableName() { - return "new_snp.psc_certificate"; - } - - @Override - protected RowMapper getRowMapper() { - return null; - } - - @Override - protected String getEntityName() { - return "PscCertificate"; - } - - @Override - protected String extractId(PscCertificateEntity entity) { - return entity.getCertificateId(); - } - - @Override - public String getInsertSql() { - return """ - INSERT INTO new_snp.psc_certificate( - certificate_id, - type_id, - data_set_version, - certificate_title, - certificate_title_code, - class_soc_of_issuer, - expiry_date, - inspection_id, - issue_date, - issuing_authority, - issuing_authority_code, - last_survey_date, - latest_survey_place, - latest_survey_place_code, - lrno, - other_issuing_authority, - other_survey_authority, - survey_authority, - survey_authority_code, - survey_authority_type - ) VALUES ( - ?,?,?,?,?,?,?,?,?,?, - ?,?,?,?,?,?,?,?,?,? - ) - ON CONFLICT (certificate_id) - DO UPDATE SET - type_id = EXCLUDED.type_id, - data_set_version = EXCLUDED.data_set_version, - certificate_title = EXCLUDED.certificate_title, - certificate_title_code = EXCLUDED.certificate_title_code, - class_soc_of_issuer = EXCLUDED.class_soc_of_issuer, - expiry_date = EXCLUDED.expiry_date, - inspection_id = EXCLUDED.inspection_id, - issue_date = EXCLUDED.issue_date, - issuing_authority = EXCLUDED.issuing_authority, - issuing_authority_code = EXCLUDED.issuing_authority_code, - last_survey_date = EXCLUDED.last_survey_date, - latest_survey_place = EXCLUDED.latest_survey_place, - latest_survey_place_code = EXCLUDED.latest_survey_place_code, - lrno = EXCLUDED.lrno, - other_issuing_authority = EXCLUDED.other_issuing_authority, - other_survey_authority = EXCLUDED.other_survey_authority, - survey_authority = EXCLUDED.survey_authority, - survey_authority_code = EXCLUDED.survey_authority_code, - survey_authority_type = EXCLUDED.survey_authority_type - """; - - } - - @Override - protected String getUpdateSql() { - return null; - } - - @Override - protected void setInsertParameters(PreparedStatement ps, PscCertificateEntity e) throws Exception { - int i = 1; - - ps.setString(i++, e.getCertificateId()); - ps.setString(i++, e.getTypeId()); - ps.setString(i++, e.getDataSetVersion()); - ps.setString(i++, e.getCertificateTitle()); - ps.setString(i++, e.getCertificateTitleCode()); - ps.setString(i++, e.getClassSocOfIssuer()); - ps.setTimestamp(i++, e.getExpiryDate() != null ? Timestamp.valueOf(e.getExpiryDate()) : null); - ps.setString(i++, e.getInspectionId()); - ps.setTimestamp(i++, e.getIssueDate() != null ? Timestamp.valueOf(e.getIssueDate()) : null); - ps.setString(i++, e.getIssuingAuthority()); - ps.setString(i++, e.getIssuingAuthorityCode()); - ps.setTimestamp(i++, e.getLastSurveyDate() != null ? Timestamp.valueOf(e.getLastSurveyDate()) : null); - ps.setString(i++, e.getLatestSurveyPlace()); - ps.setString(i++, e.getLatestSurveyPlaceCode()); - ps.setString(i++, e.getLrno()); - ps.setString(i++, e.getOtherIssuingAuthority()); - ps.setString(i++, e.getOtherSurveyAuthority()); - ps.setString(i++, e.getSurveyAuthority()); - ps.setString(i++, e.getSurveyAuthorityCode()); - ps.setString(i++, e.getSurveyAuthorityType()); - } - - @Override - protected void setUpdateParameters(PreparedStatement ps, PscCertificateEntity entity) throws Exception { - - } - - @Override - public void saveCertificates(List entities) { - if (entities == null || entities.isEmpty()) return; -// log.info("PSC Certificate 저장 시작 = {}건", entities.size()); - batchInsert(entities); - } - -} diff --git a/src/main/java/com/snp/batch/jobs/pscInspection/batch/repository/PscDefectRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/pscInspection/batch/repository/PscDefectRepositoryImpl.java index c4f557a..59daa26 100644 --- a/src/main/java/com/snp/batch/jobs/pscInspection/batch/repository/PscDefectRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/pscInspection/batch/repository/PscDefectRepositoryImpl.java @@ -4,6 +4,7 @@ import com.snp.batch.common.batch.repository.BaseJdbcRepository; import com.snp.batch.jobs.pscInspection.batch.entity.PscDefectEntity; import com.snp.batch.jobs.pscInspection.batch.entity.PscInspectionEntity; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Repository; @@ -17,13 +18,25 @@ import java.util.List; @Repository public class PscDefectRepositoryImpl extends BaseJdbcRepository implements PscDefectRepository { + + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.psc-002}") + private String tableName; + public PscDefectRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } @Override - protected String getTableName() { - return "new_snp.psc_detail"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -33,7 +46,7 @@ public class PscDefectRepositoryImpl extends BaseJdbcRepository implements PscInspectionRepository{ + + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.psc-001}") + private String tableName; + public PscInspectionRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } @Override - protected String getTableName() { - return "new_snp.psc_detail"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -38,38 +51,38 @@ public class PscInspectionRepositoryImpl extends BaseJdbcRepository { private final PscInspectionRepository pscInspectionRepository; private final PscDefectRepository pscDefectRepository; - private final PscCertificateRepository pscCertificateRepository; private final PscAllCertificateRepository pscAllCertificateRepository; public PscInspectionWriter(PscInspectionRepository pscInspectionRepository, PscDefectRepository pscDefectRepository, - PscCertificateRepository pscCertificateRepository, PscAllCertificateRepository pscAllCertificateRepository) { super("PscInspection"); this.pscInspectionRepository = pscInspectionRepository; this.pscDefectRepository = pscDefectRepository; - this.pscCertificateRepository = pscCertificateRepository; this.pscAllCertificateRepository = pscAllCertificateRepository; } @@ -40,16 +36,14 @@ public class PscInspectionWriter extends BaseWriter { for (PscInspectionEntity entity : items) { pscInspectionRepository.saveAll(List.of(entity)); pscDefectRepository.saveDefects(entity.getDefects()); - pscCertificateRepository.saveCertificates(entity.getCertificates()); pscAllCertificateRepository.saveAllCertificates(entity.getAllCertificates()); // 효율적으로 로그 int defectCount = entity.getDefects() != null ? entity.getDefects().size() : 0; - int certificateCount = entity.getCertificates() != null ? entity.getCertificates().size() : 0; int allCertificateCount = entity.getAllCertificates() != null ? entity.getAllCertificates().size() : 0; - log.info("Inspection ID: {}, Defects: {}, Certificates: {}, AllCertificates: {}", - entity.getInspectionId(), defectCount, certificateCount, allCertificateCount); + log.info("Inspection ID: {}, Defects: {}, AllCertificates: {}", + entity.getInspectionId(), defectCount, allCertificateCount); } } } diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportJobConfig.java b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportJobConfig.java index 4e04f9d..be3665f 100644 --- a/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportJobConfig.java @@ -14,6 +14,7 @@ import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -30,6 +31,9 @@ public class RiskImportJobConfig extends BaseJobConfig { private final RiskDataWriter riskDataWriter; + @Value("${app.batch.target-schema.name}") + private String targetSchema; + @Override protected int getChunkSize() { return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정 @@ -60,7 +64,7 @@ public class RiskImportJobConfig extends BaseJobConfig { @Override protected ItemReader createReader() { - return new RiskDataReader(maritimeServiceApiWebClient, jdbcTemplate); + return new RiskDataReader(maritimeServiceApiWebClient, jdbcTemplate, targetSchema); } @Override diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java index 50d074e..5189e96 100644 --- a/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/risk/batch/config/RiskImportRangeJobConfig.java @@ -42,9 +42,12 @@ public class RiskImportRangeJobConfig extends BaseMultiStepJobConfig { // 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복) private final JdbcTemplate jdbcTemplate; + private final String targetSchema; private List allImoNumbers; private int currentBatchIndex = 0; private final int batchSize = 100; - public RiskDataReader(WebClient webClient, JdbcTemplate jdbcTemplate) { + public RiskDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, String targetSchema) { super(webClient); this.jdbcTemplate = jdbcTemplate; + this.targetSchema = targetSchema; enableChunkMode(); // ✨ Chunk 모드 활성화 } @@ -46,20 +48,18 @@ public class RiskDataReader extends BaseApiReader { return "/RiskAndCompliance/RisksByImos"; } -// private String getTargetTable(){ -// return "snp_data.core20"; -// } private String getTargetTable(){ - return "snp_data.ship_data"; + return targetSchema + ".ship_data"; + } + + private String getImoQuery() { + return "select imo_number as ihslrorimoshipno from " + getTargetTable() + " order by imo_number"; } - private String GET_CORE_IMO_LIST = -// "SELECT ihslrorimoshipno FROM " + getTargetTable() + " ORDER BY ihslrorimoshipno"; - "select imo_number as ihslrorimoshipno from snp_data.ship_data order by imo_number"; @Override protected void beforeFetch(){ log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName()); - allImoNumbers = jdbcTemplate.queryForList(GET_CORE_IMO_LIST, String.class); + allImoNumbers = jdbcTemplate.queryForList(getImoQuery(), String.class); int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); diff --git a/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java index 58f3637..42d431d 100644 --- a/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/risk/batch/repository/RiskRepositoryImpl.java @@ -3,6 +3,7 @@ package com.snp.batch.jobs.risk.batch.repository; import com.snp.batch.common.batch.repository.BaseJdbcRepository; import com.snp.batch.jobs.risk.batch.entity.RiskEntity; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Repository; @@ -15,13 +16,24 @@ import java.util.List; @Repository("riskRepository") public class RiskRepositoryImpl extends BaseJdbcRepository implements RiskRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.risk-compliance-001}") + private String tableName; + public RiskRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } @Override - protected String getTableName() { - return "t_snp_data.risk"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -43,26 +55,26 @@ public class RiskRepositoryImpl extends BaseJdbcRepository imp protected String getUpdateSql() { return """ INSERT INTO %s( - lrno, lastupdated, - riskdatamaintained, dayssincelastseenonais, daysunderais, imocorrectonais, sailingundername, - anomalousmessagesfrommmsi, mostrecentdarkactivity, portcalls, portrisk, stsoperations, - driftinghighseas, riskevents, flagchanges, flagparismouperformance, flagtokyomoupeformance, - flaguscgmouperformance, uscgqualship21, timesincepscinspection, pscinspections, pscdefects, - pscdetentions, currentsmccertificate, docchanges, currentclass, classstatuschanges, - pandicoverage, namechanges, gbochanges, ageofship, iuufishingviolation, - draughtchanges, mostrecentsanctionedportcall, singleshipoperation, fleetsafety, fleetpsc, - specialsurveyoverdue, ownerunknown, russianportcall, russianownerregistration, russiansts, - job_execution_id, created_by + imo_no, last_mdfcn_dt, + risk_data_maint, ais_notrcv_elps_days, ais_lwrnk_days, ais_up_imo_desc, othr_ship_nm_voy_yn, + mmsi_anom_message, recent_dark_actv, port_prtcll, port_risk, sts_job, + drift_chg, risk_event, ntnlty_chg, ntnlty_prs_mou_perf, ntnlty_tky_mou_perf, + ntnlty_uscg_mou_perf, uscg_excl_ship_cert, psc_inspection_elps_hr, psc_inspection, psc_defect, + psc_detained, now_smgrc_evdc, docc_chg, now_clfic, clfic_status_chg, + pni_insrnc, ship_nm_chg, gbo_chg, vslage, ilgl_fshr_viol, + draft_chg, recent_sanction_prtcll, sngl_ship_voy, fltsfty, flt_psc, + spc_inspection_ovdue, ownr_unk, rss_port_call, rss_ownr_reg, rss_sts, + job_execution_id, creatr_id ) VALUES ( - ?, ?::timestamptz, - ?, ?, ?, ?, ?, - ?, ?, ?, ?, ?, - ?, ?, ?, ?, ?, - ?, ?, ?, ?, ?, - ?, ?, ?, ?, ?, - ?, ?, ?, ?, ?, - ?, ?, ?, ?, ?, + ?, ?::timestamptz, + ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ); diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailImportJobConfig.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailImportJobConfig.java index 42db89b..3811507 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailImportJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailImportJobConfig.java @@ -15,6 +15,7 @@ import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; @@ -51,7 +52,10 @@ public class ShipDetailImportJobConfig extends BaseJobConfig createReader() { // 타입 변경 - return new ShipDetailDataReader(maritimeApiWebClient, jdbcTemplate, objectMapper); + protected ItemReader createReader() { + return new ShipDetailDataReader(maritimeApiWebClient, jdbcTemplate, objectMapper, targetSchema); } @Override diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailSyncJobConfig.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailSyncJobConfig.java index b671162..e4aae3b 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailSyncJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailSyncJobConfig.java @@ -13,6 +13,8 @@ import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.beans.factory.annotation.Value; + import java.util.Arrays; import java.util.List; @@ -23,6 +25,9 @@ public class ShipDetailSyncJobConfig { private final PlatformTransactionManager transactionManager; private final JdbcTemplate jdbcTemplate; + @Value("${app.batch.target-schema.name}") + private String targetSchema; + // API 키 정의 (배치 로그 관리용) protected String getApiKey() { return "SHIP_DETAIL_SYNC_API"; @@ -31,8 +36,8 @@ public class ShipDetailSyncJobConfig { // 마지막 실행 일자 업데이트 SQL protected String getBatchUpdateSql() { return String.format( - "UPDATE SNP_DATA.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", - getApiKey() + "UPDATE %s.BATCH_LAST_EXECUTION SET LAST_SUCCESS_DATE = NOW(), UPDATED_AT = NOW() WHERE API_KEY = '%s'", + targetSchema, getApiKey() ); } @@ -80,7 +85,8 @@ public class ShipDetailSyncJobConfig { log.info(">>>>> SHIP MASTER & CORE20 동기화 프로시저 호출 시작"); // PostgreSQL 기준 프로시저 호출 (CALL) - jdbcTemplate.execute("CALL snp_data.proc_sync_ship_master_and_core()"); + String procedureCall = String.format("CALL %s.proc_sync_ship_master_and_core()", targetSchema); + jdbcTemplate.execute(procedureCall); log.info(">>>>> SHIP MASTER & CORE20 동기화 프로시저 호출 완료"); return RepeatStatus.FINISHED; @@ -106,7 +112,8 @@ public class ShipDetailSyncJobConfig { try { log.info("테이블 동기화 중: {}", tableName); // 이전에 생성한 동적 프로시저 호출 - jdbcTemplate.execute("CALL snp_data.proc_sync_ship_detail('" + tableName + "')"); + String procedureCall = String.format("CALL %s.proc_sync_ship_detail('%s')", targetSchema, tableName); + jdbcTemplate.execute(procedureCall); } catch (Exception e) { log.error("테이블 동기화 실패: {}. 에러: {}", tableName, e.getMessage()); // 특정 테이블 실패 시 중단할지, 계속 진행할지에 따라 throw 여부 결정 diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java index 4156866..4862631 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/config/ShipDetailUpdateJobConfig.java @@ -44,9 +44,13 @@ public class ShipDetailUpdateJobConfig extends BaseMultiStepJobConfig { @@ -29,6 +31,9 @@ public class ShipLastPositionUpdateJobConfig extends BaseJobConfig createReader() { - return new ShipLastPositionDataReader(maritimeAisApiWebClient, jdbcTemplate); + return new ShipLastPositionDataReader(maritimeAisApiWebClient, jdbcTemplate, targetSchema); } @Override diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailDataReader.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailDataReader.java index 1f22405..11530c4 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailDataReader.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipDetailDataReader.java @@ -39,16 +39,18 @@ public class ShipDetailDataReader extends BaseApiReader { private final JdbcTemplate jdbcTemplate; private final ObjectMapper objectMapper; + private final String targetSchema; // 배치 처리 상태 private List allImoNumbers; private int currentBatchIndex = 0; private final int batchSize = 30; - public ShipDetailDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) { + public ShipDetailDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, ObjectMapper objectMapper, String targetSchema) { super(webClient); this.jdbcTemplate = jdbcTemplate; this.objectMapper = objectMapper; + this.targetSchema = targetSchema; enableChunkMode(); // ✨ Chunk 모드 활성화 } @@ -68,8 +70,12 @@ public class ShipDetailDataReader extends BaseApiReader { return "/MaritimeWCF/APSShipService.svc/RESTFul/GetShipsByIHSLRorIMONumbersAll"; } - private static final String GET_ALL_IMO_QUERY = - "select imo_number from t_snp_data.ship_data order by imo_number"; + /** + * IMO 번호 조회 쿼리 생성 (스키마 동적 적용) + */ + private String getImoQuery() { + return "select imo_no from " + targetSchema + ".tb_ship_default_info order by imo_no"; + } /** @@ -80,7 +86,7 @@ public class ShipDetailDataReader extends BaseApiReader { // Step 1. IMO 전체 번호 조회 log.info("[{}] ship_data 테이블에서 IMO 번호 조회 시작...", getReaderName()); - allImoNumbers = jdbcTemplate.queryForList(GET_ALL_IMO_QUERY, String.class); + allImoNumbers = jdbcTemplate.queryForList(getImoQuery(), String.class); int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); log.info("[{}] 총 {} 개의 IMO 번호 조회 완료", getReaderName(), allImoNumbers.size()); diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipLastPositionDataReader.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipLastPositionDataReader.java index d63cbbb..e7ef0ca 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipLastPositionDataReader.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/reader/ShipLastPositionDataReader.java @@ -17,14 +17,16 @@ public class ShipLastPositionDataReader extends BaseApiReader // 3. Response Data -> Core20에 업데이트 (Chunk 단위로 반복) private final JdbcTemplate jdbcTemplate; + private final String targetSchema; private List allImoNumbers; private int currentBatchIndex = 0; private final int batchSize = 5000; - public ShipLastPositionDataReader(WebClient webClient, JdbcTemplate jdbcTemplate) { + public ShipLastPositionDataReader(WebClient webClient, JdbcTemplate jdbcTemplate, String targetSchema) { super(webClient); this.jdbcTemplate = jdbcTemplate; + this.targetSchema = targetSchema; enableChunkMode(); // ✨ Chunk 모드 활성화 } @@ -45,16 +47,18 @@ public class ShipLastPositionDataReader extends BaseApiReader } private String getTargetTable(){ - return "new_snp.core20"; + return targetSchema + ".core20"; + } + + private String getImoQuery() { + return "SELECT lrno FROM " + getTargetTable() + " ORDER BY lrno"; } - private String GET_CORE_IMO_LIST = - "SELECT lrno FROM " + getTargetTable() + " ORDER BY lrno"; @Override protected void beforeFetch(){ log.info("[{}] Core20 테이블에서 IMO 번호 조회 시작...", getReaderName()); - allImoNumbers = jdbcTemplate.queryForList(GET_CORE_IMO_LIST, String.class); + allImoNumbers = jdbcTemplate.queryForList(getImoQuery(), String.class); int totalBatches = (int) Math.ceil((double) allImoNumbers.size() / batchSize); diff --git a/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipDetailRepositoryImpl.java b/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipDetailRepositoryImpl.java index 193588c..e70f8fb 100644 --- a/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipDetailRepositoryImpl.java +++ b/src/main/java/com/snp/batch/jobs/shipdetail/batch/repository/ShipDetailRepositoryImpl.java @@ -3,6 +3,7 @@ package com.snp.batch.jobs.shipdetail.batch.repository; import com.snp.batch.common.batch.repository.BaseJdbcRepository; import com.snp.batch.jobs.shipdetail.batch.entity.*; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Repository; @@ -21,13 +22,24 @@ import java.util.*; public class ShipDetailRepositoryImpl extends BaseJdbcRepository implements ShipDetailRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.ship-002}") + private String tableName; + public ShipDetailRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } @Override - protected String getTableName() { - return "t_snp_data.ship_detail_data"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -44,28 +56,28 @@ public class ShipDetailRepositoryImpl extends BaseJdbcRepository 0; } @@ -932,7 +944,7 @@ public class ShipDetailRepositoryImpl extends BaseJdbcRepository implements ShipHashRepository{ + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.ship-028}") + private String tableName; + public ShipHashRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } @Override - protected String getTableName() { - return "snp_data.ship_detail_hash_json"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -37,7 +49,7 @@ public class ShipHashRepositoryImpl extends BaseJdbcRepository implements ShipLastPositionRepository { + + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.ship-027}") + private String tableName; + public ShipLastPositionRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } @Override - protected String getTableName() { - return null; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -42,7 +55,7 @@ public class ShipLastPositionRepositoryImpl extends BaseJdbcRepository implements ShipRepository { + @Value("${app.batch.target-schema.name}") + private String targetSchema; + + @Value("${app.batch.target-schema.tables.ship-001}") + private String tableName; + public ShipRepositoryImpl(JdbcTemplate jdbcTemplate) { super(jdbcTemplate); } @Override - protected String getTableName() { - return "t_snp_data.ship_data"; + protected String getTargetSchema() { + return targetSchema; + } + + @Override + protected String getSimpleTableName() { + return tableName; } @Override @@ -51,8 +63,8 @@ public class ShipRepositoryImpl extends BaseJdbcRepository imp protected String getUpdateSql() { return """ INSERT INTO %s( - imo_number, core_ship_ind, dataset_version, - job_execution_id, created_by + imo_no, core_ship_ind, dataset_ver, + job_execution_id, creatr_id ) VALUES (?, ?, ?, ?, ?) """.formatted(getTableName()); } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index b0edeb6..2cc93b6 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -22,13 +22,13 @@ spring: hibernate: dialect: org.hibernate.dialect.PostgreSQLDialect format_sql: true - default_schema: t_snp_data + default_schema: t_std_snp_data # Batch Configuration batch: jdbc: - table-prefix: "t_snp_data.batch_" - initialize-schema: never # Changed to 'never' as tables already exist + table-prefix: "t_std_snp_data.batch_" + initialize-schema: always # Changed to 'never' as tables already exist job: enabled: false # Prevent auto-run on startup @@ -49,7 +49,7 @@ spring: org.quartz.threadPool.threadCount: 10 org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate - org.quartz.jobStore.tablePrefix: t_snp_data.QRTZ_ + org.quartz.jobStore.tablePrefix: t_std_snp_data.QRTZ_ org.quartz.jobStore.isClustered: false org.quartz.jobStore.misfireThreshold: 60000 @@ -110,7 +110,7 @@ app: # Core20 캐시 테이블 설정 (환경별로 테이블/컬럼명이 다를 수 있음) core20: - schema: t_snp_data # 스키마명 + schema: t_std_snp_data # 스키마명 table: ship_detail_data # 테이블명 imo-column: ihslrorimoshipno # IMO/LRNO 컬럼명 (PK, NOT NULL) mmsi-column: maritimemobileserviceidentitymmsinumber # MMSI 컬럼명 (NULLABLE) @@ -119,7 +119,7 @@ app: partition: # 일별 파티션 테이블 목록 (네이밍: {table}_YYMMDD) daily-tables: - - schema: t_snp_data + - schema: t_std_snp_data table-name: ais_target partition-column: message_timestamp periods-ahead: 3 # 미리 생성할 일수 diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index ca1c4b6..45e4cab 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -77,6 +77,59 @@ logging: app: batch: chunk-size: 1000 + target-schema: + name: t_std_snp_data + tables: + ship-001: tb_ship_default_info + ship-002: tb_ship_info_mst + ship-003: tb_ship_add_info + ship-004: tb_ship_bbctr_hstry + ship-005: tb_ship_idntf_info_hstry + ship-006: tb_ship_clfic_hstry + ship-007: tb_ship_company_rel + ship-008: tb_ship_crew_list + ship-009: tb_ship_dark_actv_idnty + ship-010: tb_ship_country_hstry + ship-011: tb_ship_group_revn_ownr_hstry + ship-012: tb_ship_ice_grd + ship-013: tb_ship_nm_chg_hstry + ship-014: tb_ship_operator_hstry + ship-015: tb_ship_ownr_hstry + ship-016: tb_ship_prtc_rpn_hstry + ship-017: tb_ship_sfty_mng_evdc_hstry + ship-018: tb_ship_mng_company_hstry + ship-019: tb_ship_sstrvsl_rel + ship-020: tb_ship_spc_fetr + ship-021: tb_ship_status_hstry + ship-022: tb_ship_cargo_capacity + ship-023: tb_ship_inspection_ymd + ship-024: tb_ship_inspection_ymd_hstry + ship-025: tb_ship_tech_mng_company_hstry + ship-026: tb_ship_thrstr_info + company-001: tb_company_dtl_info + event-001: tb_event_mst + event-002: tb_event_cargo + event-003: tb_event_humn_acdnt + event-004: tb_event_rel + facility-001: tb_port_facility_info + psc-001: tb_psc_mst + psc-002: tb_psc_defect + psc-003: tb_psc_oa_certf + movements-001: tb_ship_anchrgcall_hstry + movements-002: tb_ship_berthcall_hstry + movements-003: tb_ship_now_status_hstry + movements-004: tb_ship_dest_hstry + movements-005: tb_ship_prtcll_hstry + movements-006: tb_ship_sts_opert_hstry + movements-007: tb_ship_teminalcall_hstry + movements-008: tb_ship_trnst_hstry + code-001: tb_ship_type_cd + code-002: tb_ship_country_cd + risk-compliance-001: tb_ship_risk_info + risk-compliance-002: tb_ship_compliance_info + risk-compliance-003: tb_company_compliance_info + ship-027: core20 + ship-028: ship_detail_hash_json api: url: https://api.example.com/data timeout: 30000 @@ -117,9 +170,9 @@ app: # Core20 캐시 테이블 설정 (환경별로 테이블/컬럼명이 다를 수 있음) core20: schema: t_snp_data # 스키마명 - table: ship_detail_data # 테이블명 - imo-column: ihslrorimoshipno # IMO/LRNO 컬럼명 (PK, NOT NULL) - mmsi-column: maritimemobileserviceidentitymmsinumber # MMSI 컬럼명 (NULLABLE) + table: tb_ship_info_mst # 테이블명 + imo-column: imo_no # IMO/LRNO 컬럼명 (PK, NOT NULL) + mmsi-column: mmsi_no # MMSI 컬럼명 (NULLABLE) # 파티션 관리 설정 partition: