Add Port Import Job, Event Import Job

This commit is contained in:
hyojin kim 2025-12-02 18:26:54 +09:00
부모 b3cb4f6f19
커밋 d6cf58d737
18개의 변경된 파일1225개의 추가작업 그리고 0개의 파일을 삭제

파일 보기

@ -0,0 +1,84 @@
package com.snp.batch.jobs.event.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.event.batch.dto.EventDto;
import com.snp.batch.jobs.event.batch.entity.EventEntity;
import com.snp.batch.jobs.event.batch.processor.EventDataProcessor;
import com.snp.batch.jobs.event.batch.reader.EventDataReader;
import com.snp.batch.jobs.event.batch.writer.EventDataWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
@Slf4j
@Configuration
public class EventImportJobConfig extends BaseJobConfig<EventDto, EventEntity> {
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient;
private final EventDataProcessor eventDataProcessor;
private final EventDataWriter eventDataWriter;
@Override
protected int getChunkSize() {
return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
}
public EventImportJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
EventDataProcessor eventDataProcessor,
EventDataWriter eventDataWriter,
JdbcTemplate jdbcTemplate,
@Qualifier("maritimeApiWebClient")WebClient maritimeApiWebClient) {
super(jobRepository, transactionManager);
this.jdbcTemplate = jdbcTemplate;
this.maritimeApiWebClient = maritimeApiWebClient;
this.eventDataProcessor = eventDataProcessor;
this.eventDataWriter = eventDataWriter;
}
@Override
protected String getJobName() {
return "eventImportJob";
}
@Override
protected String getStepName() {
return "eventImportStep";
}
@Override
protected ItemReader<EventDto> createReader() {
return new EventDataReader(maritimeApiWebClient, jdbcTemplate);
}
@Override
protected ItemProcessor<EventDto, EventEntity> createProcessor() {
return eventDataProcessor;
}
@Override
protected ItemWriter<EventEntity> createWriter() { return eventDataWriter; }
@Bean(name = "eventImportJob")
public Job eventImportJob() {
return job();
}
@Bean(name = "eventImportStep")
public Step eventImportStep() {
return step();
}
}

파일 보기

@ -0,0 +1,51 @@
package com.snp.batch.jobs.event.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EventDto {
@JsonProperty("IncidentID")
private Long incidentId;
@JsonProperty("EventID")
private Long eventId;
@JsonProperty("StartDate")
private String startDate;
@JsonProperty("EventType")
private String eventType;
@JsonProperty("Significance")
private String significance;
@JsonProperty("Headline")
private String headline;
@JsonProperty("EndDate")
private String endDate;
@JsonProperty("IHSLRorIMOShipNo")
private String ihslRorImoShipNo;
@JsonProperty("VesselName")
private String vesselName;
@JsonProperty("VesselType")
private String vesselType;
@JsonProperty("LocationName")
private String locationName;
@JsonProperty("PublishedDate")
private String publishedDate;
}

파일 보기

@ -0,0 +1,21 @@
package com.snp.batch.jobs.event.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EventResponse {
@JsonProperty("EventCount")
private Integer eventCount;
@JsonProperty("MaritimeEvents")
private List<EventDto> MaritimeEvents;
}

파일 보기

@ -0,0 +1,31 @@
package com.snp.batch.jobs.event.batch.entity;
import com.snp.batch.common.batch.entity.BaseEntity;
import jakarta.persistence.Embedded;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class EventEntity extends BaseEntity {
private Long incidentId;
private Long eventId;
private String startDate;
private String eventType;
private String significance;
private String headline;
private String endDate;
private String ihslRorImoShipNo;
private String vesselName;
private String vesselType;
private String locationName;
private String publishedDate;
}

파일 보기

@ -0,0 +1,35 @@
package com.snp.batch.jobs.event.batch.processor;
import com.snp.batch.common.batch.processor.BaseProcessor;
import com.snp.batch.jobs.event.batch.dto.EventDto;
import com.snp.batch.jobs.event.batch.entity.EventEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class EventDataProcessor extends BaseProcessor<EventDto, EventEntity> {
@Override
protected EventEntity processItem(EventDto dto) throws Exception {
log.debug("Event 데이터 처리 시작: Event ID = {}", dto.getEventId());
EventEntity entity = EventEntity.builder()
.incidentId(dto.getIncidentId())
.eventId(dto.getEventId())
.startDate(dto.getStartDate())
.eventType(dto.getEventType())
.significance(dto.getSignificance())
.headline(dto.getHeadline())
.endDate(dto.getEndDate())
.ihslRorImoShipNo(dto.getIhslRorImoShipNo())
.vesselName(dto.getVesselName())
.vesselType(dto.getVesselType())
.locationName(dto.getLocationName())
.publishedDate(dto.getPublishedDate())
.build();
log.debug("Event 데이터 처리 완료: Event ID = {}", dto.getEventId());
return entity;
}
}

파일 보기

@ -0,0 +1,64 @@
package com.snp.batch.jobs.event.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.event.batch.dto.EventDto;
import com.snp.batch.jobs.event.batch.dto.EventResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class EventDataReader extends BaseApiReader<EventDto> {
private final JdbcTemplate jdbcTemplate;
public EventDataReader(WebClient webClient, JdbcTemplate jdbcTemplate) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
}
@Override
protected String getReaderName() {
return "EventDataReader";
}
@Override
protected String getApiPath() {
return "MaritimeWCF/MaritimeAndTradeEventsService.svc/RESTFul/GetEventListByEventChangeDateRange?fromYear=2025&fromMonth=01&fromDay=01&fromHour=00&fromMinute=00&toYear=2025&toMonth=12&toDay=31&toHour=00&toMinute=00";
}
@Override
protected List<EventDto> fetchDataFromApi() {
try {
log.info("Event API 호출 시작");
EventResponse response = callEventApiWithBatch();
if (response != null && response.getMaritimeEvents() != null) {
log.info("API 응답 성공: 총 {} 개의 Event 데이터 수신", response.getEventCount());
return response.getMaritimeEvents();
} else {
log.warn("API 응답이 null이거나 Event 데이터가 없습니다");
return new ArrayList<>();
}
} catch (Exception e) {
log.error("Event API 호출 실패", e);
log.error("에러 메시지: {}", e.getMessage());
return new ArrayList<>();
}
}
private EventResponse callEventApiWithBatch() {
String url = getApiPath();
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(EventResponse.class)
.block();
}
}

파일 보기

@ -0,0 +1,9 @@
package com.snp.batch.jobs.event.batch.repository;
import com.snp.batch.jobs.event.batch.entity.EventEntity;
import java.util.List;
public interface EventRepository {
void saveEventAll(List<EventEntity> items);
}

파일 보기

@ -0,0 +1,133 @@
package com.snp.batch.jobs.event.batch.repository;
import com.snp.batch.common.batch.repository.BaseJdbcRepository;
import com.snp.batch.jobs.event.batch.entity.EventEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import java.sql.PreparedStatement;
import java.sql.Types;
import java.util.List;
@Slf4j
@Repository("EventRepository")
public class EventRepositoryImpl extends BaseJdbcRepository<EventEntity, Long> implements EventRepository {
public EventRepositoryImpl(JdbcTemplate jdbcTemplate) {
super(jdbcTemplate);
}
@Override
protected String getTableName() {
return null;
}
@Override
protected RowMapper<EventEntity> getRowMapper() {
return null;
}
@Override
protected Long extractId(EventEntity entity) {
return null;
}
@Override
protected String getInsertSql() {
return null;
}
@Override
protected String getUpdateSql() {
return """
INSERT INTO snp_data.event (
Event_ID, Incident_ID, IHSLRorIMOShipNo, Vessel_Name, Vessel_Type,
Event_Type, Significance, Headline, Location_Name,
Published_Date, Event_Start_Date, Event_End_Date, batch_flag
) VALUES (
?, ?, ?, ?, ?,
?, ?, ?, ?,
?::timestamptz, ?::timestamptz, ?::timestamptz, 'N'
) ON CONFLICT (Event_ID) DO UPDATE
SET
Incident_ID = EXCLUDED.Incident_ID,
IHSLRorIMOShipNo = EXCLUDED.IHSLRorIMOShipNo,
Vessel_Name = EXCLUDED.Vessel_Name,
Vessel_Type = EXCLUDED.Vessel_Type,
Event_Type = EXCLUDED.Event_Type,
Significance = EXCLUDED.Significance,
Headline = EXCLUDED.Headline,
Location_Name = EXCLUDED.Location_Name,
Published_Date = EXCLUDED.Published_Date,
Event_Start_Date = EXCLUDED.Event_Start_Date,
Event_End_Date = EXCLUDED.Event_End_Date,
batch_flag = 'N'
;
""";
}
@Override
protected void setInsertParameters(PreparedStatement ps, EventEntity entity) throws Exception {
}
@Override
protected void setUpdateParameters(PreparedStatement ps, EventEntity entity) throws Exception {
int idx = 1;
ps.setLong(idx++, entity.getEventId());
ps.setLong(idx++, entity.getIncidentId());
ps.setString(idx++, entity.getIhslRorImoShipNo());
ps.setString(idx++, entity.getVesselName());
ps.setString(idx++, entity.getVesselType());
ps.setString(idx++, entity.getEventType());
ps.setString(idx++, entity.getSignificance());
ps.setString(idx++, entity.getHeadline());
ps.setString(idx++, entity.getLocationName());
ps.setString(idx++, entity.getPublishedDate());
ps.setString(idx++, entity.getStartDate());
ps.setString(idx++, entity.getEndDate());
}
@Override
protected String getEntityName() {
return "EventEntity";
}
@Override
public void saveEventAll(List<EventEntity> items) {
if (items == null || items.isEmpty()) {
return;
}
jdbcTemplate.batchUpdate(getUpdateSql(), items, items.size(),
(ps, entity) -> {
try {
setUpdateParameters(ps, entity);
} catch (Exception e) {
log.error("배치 수정 파라미터 설정 실패", e);
throw new RuntimeException(e);
}
});
log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size());
}
private static void setStringOrNull(PreparedStatement ps, int index, String value) throws Exception {
if (value == null) {
ps.setNull(index, Types.VARCHAR);
} else {
ps.setString(index, value);
}
}
/**
* Double 값을 PreparedStatement에 설정 (null 처리 포함)
*/
private static void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception {
if (value == null) {
ps.setNull(index, Types.DOUBLE);
} else {
ps.setDouble(index, value);
}
}
}

파일 보기

@ -0,0 +1,26 @@
package com.snp.batch.jobs.event.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.event.batch.entity.EventEntity;
import com.snp.batch.jobs.event.batch.repository.EventRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class EventDataWriter extends BaseWriter<EventEntity> {
private final EventRepository eventRepository;
public EventDataWriter(EventRepository eventRepository) {
super("EventRepository");
this.eventRepository = eventRepository;
}
@Override
protected void writeItems(List<EventEntity> items) throws Exception {
eventRepository.saveEventAll(items);
log.info("Event 저장 완료: 수정={} 건", items.size());
}
}

파일 보기

@ -0,0 +1,84 @@
package com.snp.batch.jobs.facility.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.facility.batch.dto.PortDto;
import com.snp.batch.jobs.facility.batch.entity.PortEntity;
import com.snp.batch.jobs.facility.batch.processor.PortDataProcessor;
import com.snp.batch.jobs.facility.batch.reader.PortDataReader;
import com.snp.batch.jobs.facility.batch.writer.PortDataWriter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.WebClient;
@Slf4j
@Configuration
public class PortImportJobConfig extends BaseJobConfig<PortDto, PortEntity> {
private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeServiceApiWebClient;
private final PortDataProcessor portDataProcessor;
private final PortDataWriter portDataWriter;
@Override
protected int getChunkSize() {
return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
}
public PortImportJobConfig(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
PortDataProcessor portDataProcessor,
PortDataWriter portDataWriter,
JdbcTemplate jdbcTemplate,
@Qualifier("maritimeServiceApiWebClient")WebClient maritimeServiceApiWebClient) {
super(jobRepository, transactionManager);
this.jdbcTemplate = jdbcTemplate;
this.maritimeServiceApiWebClient = maritimeServiceApiWebClient;
this.portDataProcessor = portDataProcessor;
this.portDataWriter = portDataWriter;
}
@Override
protected String getJobName() {
return "portImportJob";
}
@Override
protected String getStepName() {
return "portImportStep";
}
@Override
protected ItemReader<PortDto> createReader() {
return new PortDataReader(maritimeServiceApiWebClient, jdbcTemplate);
}
@Override
protected ItemProcessor<PortDto, PortEntity> createProcessor() {
return portDataProcessor;
}
@Override
protected ItemWriter<PortEntity> createWriter() { return portDataWriter; }
@Bean(name = "portImportJob")
public Job portImportJob() {
return job();
}
@Bean(name = "portImportStep")
public Step portImportStep() {
return step();
}
}

파일 보기

@ -0,0 +1,172 @@
package com.snp.batch.jobs.facility.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PortDto {
@JsonProperty("port_ID")
private Long portId;
@JsonProperty("old_ID")
private String oldId;
@JsonProperty("status")
private String status;
@JsonProperty("port_Name")
private String portName;
@JsonProperty("unlocode")
private String unlocode;
@JsonProperty("countryCode")
private String countryCode;
@JsonProperty("country_Name")
private String countryName;
@JsonProperty("dec_Lat")
private Double decLat;
@JsonProperty("dec_Long")
private Double decLong;
private PositionDto position;
@JsonProperty("time_Zone")
private String timeZone;
@JsonProperty("dayLight_Saving_Time")
private Boolean dayLightSavingTime;
@JsonProperty("maximum_Draft")
private Double maximumDraft;
@JsonProperty("breakbulk_Facilities")
private Boolean breakbulkFacilities;
@JsonProperty("container_Facilities")
private Boolean containerFacilities;
@JsonProperty("dry_Bulk_Facilities")
private Boolean dryBulkFacilities;
@JsonProperty("liquid_Facilities")
private Boolean liquidFacilities;
@JsonProperty("roRo_Facilities")
private Boolean roRoFacilities;
@JsonProperty("passenger_Facilities")
private Boolean passengerFacilities;
@JsonProperty("dry_Dock_Facilities")
private Boolean dryDockFacilities;
@JsonProperty("lpG_Facilities")
private Integer lpgFacilities;
@JsonProperty("lnG_Facilities")
private Integer lngFacilities;
@JsonProperty("ispS_Compliant")
private Boolean ispsCompliant;
@JsonProperty("csI_Compliant")
private Boolean csiCompliant;
@JsonProperty("last_Update")
private String lastUpdate;
@JsonProperty("entry_Date")
private String entryDate;
@JsonProperty("region_Name")
private String regionName;
@JsonProperty("continent_Name")
private String continentName;
@JsonProperty("master_POID")
private String masterPoid;
@JsonProperty("wS_Port")
private Integer wsPort;
@JsonProperty("max_LOA")
private Double maxLoa;
@JsonProperty("max_Beam")
private Double maxBeam;
@JsonProperty("max_DWT")
private Double maxDwt;
@JsonProperty("max_Offshore_Draught")
private Double maxOffshoreDraught;
@JsonProperty("max_Offshore_LOA")
private Double maxOffshoreLoa;
@JsonProperty("max_Offshore_BCM")
private Double maxOffshoreBcm;
@JsonProperty("max_Offshore_DWT")
private Double maxOffshoreDwt;
@JsonProperty("lnG_Bunker")
private Boolean lngBunker;
@JsonProperty("dO_Bunker")
private Boolean doBunker;
@JsonProperty("fO_Bunker")
private Boolean foBunker;
@JsonProperty("free_Trade_Zone")
private Boolean freeTradeZone;
@JsonProperty("ecO_Port")
private Boolean ecoPort;
@JsonProperty("emission_Control_Area")
private Boolean emissionControlArea;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
public static class PositionDto {
@JsonProperty("isNull")
private Boolean isNull;
@JsonProperty("stSrid")
private Integer stSrid;
@JsonProperty("lat")
private Double lat;
@JsonProperty("long") // JSON 키가 Java 예약어 'long'이므로 @JsonProperty를 사용
private Double longitude;
@JsonProperty("z")
private Object z;
@JsonProperty("m")
private Object m;
@JsonProperty("hasZ")
private Boolean hasZ;
@JsonProperty("hasM")
private Boolean hasM;
}
}

파일 보기

@ -0,0 +1,16 @@
package com.snp.batch.jobs.facility.batch.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PortResponse {
private List<PortDto> portDtoList;
}

파일 보기

@ -0,0 +1,78 @@
package com.snp.batch.jobs.facility.batch.entity;
import com.snp.batch.common.batch.entity.BaseEntity;
import jakarta.persistence.Embedded;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class PortEntity extends BaseEntity {
private Long portID;
private String oldID;
private String status;
private String portName;
private String unlocode;
private String countryCode;
private String countryName;
private Double decLat;
private Double decLong;
@Embedded
private PositionEntity position;
private String timeZone;
private Boolean dayLightSavingTime;
private Double maximumDraft;
private Boolean breakbulkFacilities;
private Boolean containerFacilities;
private Boolean dryBulkFacilities;
private Boolean liquidFacilities;
private Boolean roRoFacilities;
private Boolean passengerFacilities;
private Boolean dryDockFacilities;
private Integer lpGFacilities;
private Integer lnGFacilities;
private Boolean ispsCompliant;
private Boolean csiCompliant;
private String lastUpdate;
private String entryDate;
private String regionName;
private String continentName;
private String masterPoid;
private Integer wsPort;
private Double maxLoa;
private Double maxBeam;
private Double maxDwt;
private Double maxOffshoreDraught;
private Double maxOffshoreLoa;
private Double maxOffshoreBcm;
private Double maxOffshoreDwt;
private Boolean lngBunker;
private Boolean doBunker;
private Boolean foBunker;
private Boolean freeTradeZone;
private Boolean ecoPort;
private Boolean emissionControlArea;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
public static class PositionEntity {
private Boolean isNull;
private Integer stSrid;
private Double lat;
private Double longitude;
private Object z;
private Object m;
private Boolean hasZ;
private Boolean hasM;
}
}

파일 보기

@ -0,0 +1,80 @@
package com.snp.batch.jobs.facility.batch.processor;
import com.snp.batch.common.batch.processor.BaseProcessor;
import com.snp.batch.jobs.facility.batch.dto.PortDto;
import com.snp.batch.jobs.facility.batch.entity.PortEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class PortDataProcessor extends BaseProcessor<PortDto, PortEntity> {
@Override
protected PortEntity processItem(PortDto dto) throws Exception {
log.debug("Port 데이터 처리 시작: Port ID = {}", dto.getPortId());
PortEntity.PositionEntity positionEntity = null;
if (dto.getPosition() != null) {
positionEntity = PortEntity.PositionEntity.builder()
.isNull(dto.getPosition().getIsNull())
.stSrid(dto.getPosition().getStSrid())
.lat(dto.getPosition().getLat())
.longitude(dto.getPosition().getLongitude())
.z(dto.getPosition().getZ())
.m(dto.getPosition().getM())
.hasZ(dto.getPosition().getHasZ())
.hasM(dto.getPosition().getHasM())
.build();
}
PortEntity entity = PortEntity.builder()
.portID(dto.getPortId())
.oldID(dto.getOldId())
.status(dto.getStatus())
.portName(dto.getPortName())
.unlocode(dto.getUnlocode())
.countryCode(dto.getCountryCode())
.countryName(dto.getCountryName())
.decLat(dto.getDecLat())
.decLong(dto.getDecLong())
.position(positionEntity) // 변환된 PositionEntity 객체
.timeZone(dto.getTimeZone())
.dayLightSavingTime(dto.getDayLightSavingTime())
.maximumDraft(dto.getMaximumDraft())
.breakbulkFacilities(dto.getBreakbulkFacilities())
.containerFacilities(dto.getContainerFacilities())
.dryBulkFacilities(dto.getDryBulkFacilities())
.liquidFacilities(dto.getLiquidFacilities())
.roRoFacilities(dto.getRoRoFacilities())
.passengerFacilities(dto.getPassengerFacilities())
.dryDockFacilities(dto.getDryDockFacilities())
.lpGFacilities(dto.getLpgFacilities())
.lnGFacilities(dto.getLngFacilities())
.ispsCompliant(dto.getIspsCompliant())
.csiCompliant(dto.getCsiCompliant())
.lastUpdate(dto.getLastUpdate())
.entryDate(dto.getEntryDate())
.regionName(dto.getRegionName())
.continentName(dto.getContinentName())
.masterPoid(dto.getMasterPoid())
.wsPort(dto.getWsPort())
.maxLoa(dto.getMaxLoa())
.maxBeam(dto.getMaxBeam())
.maxDwt(dto.getMaxDwt())
.maxOffshoreDraught(dto.getMaxOffshoreDraught())
.maxOffshoreLoa(dto.getMaxOffshoreLoa())
.maxOffshoreBcm(dto.getMaxOffshoreBcm())
.maxOffshoreDwt(dto.getMaxOffshoreDwt())
.lngBunker(dto.getLngBunker())
.doBunker(dto.getDoBunker())
.foBunker(dto.getFoBunker())
.freeTradeZone(dto.getFreeTradeZone())
.ecoPort(dto.getEcoPort())
.emissionControlArea(dto.getEmissionControlArea())
.build();
log.debug("Port 데이터 처리 완료: Port ID = {}", dto.getPortId());
return entity;
}
}

파일 보기

@ -0,0 +1,69 @@
package com.snp.batch.jobs.facility.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.facility.batch.dto.PortDto;
import com.snp.batch.jobs.shipimport.batch.dto.ShipApiResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@Slf4j
public class PortDataReader extends BaseApiReader<PortDto> {
private final JdbcTemplate jdbcTemplate;
private List<String> allImoNumbers;
private int currentBatchIndex = 0;
private final int batchSize = 100;
public PortDataReader(WebClient webClient, JdbcTemplate jdbcTemplate) {
super(webClient);
this.jdbcTemplate = jdbcTemplate;
}
@Override
protected String getReaderName() {
return "PortDataReader";
}
@Override
protected String getApiPath() {
return "/Facilities/Ports";
}
@Override
protected List<PortDto> fetchDataFromApi() {
try {
log.info("Facility Port API 호출 시작");
List<PortDto> response = callFacilityPortApiWithBatch();
if (response != null) {
log.info("API 응답 성공: 총 {} 개의 Port 데이터 수신", response.size());
return response;
} else {
log.warn("API 응답이 null이거나 Port 데이터가 없습니다");
return new ArrayList<>();
}
} catch (Exception e) {
log.error("Facility Port API 호출 실패", e);
log.error("에러 메시지: {}", e.getMessage());
return new ArrayList<>();
}
}
private List<PortDto> callFacilityPortApiWithBatch() {
String url = getApiPath();
log.debug("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<List<PortDto>>() {})
.block();
}
}

파일 보기

@ -0,0 +1,9 @@
package com.snp.batch.jobs.facility.batch.repository;
import com.snp.batch.jobs.facility.batch.entity.PortEntity;
import java.util.List;
public interface FacilityRepository {
void savePortAll(List<PortEntity> items);
}

파일 보기

@ -0,0 +1,237 @@
package com.snp.batch.jobs.facility.batch.repository;
import com.snp.batch.common.batch.repository.BaseJdbcRepository;
import com.snp.batch.jobs.facility.batch.entity.PortEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import java.sql.PreparedStatement;
import java.sql.Types;
import java.util.List;
@Slf4j
@Repository("FacilityRepository")
public class FacilityRepositoryImpl extends BaseJdbcRepository<PortEntity, Long> implements FacilityRepository {
public FacilityRepositoryImpl(JdbcTemplate jdbcTemplate) {
super(jdbcTemplate);
}
@Override
protected String getTableName() {
return null;
}
@Override
protected RowMapper<PortEntity> getRowMapper() {
return null;
}
@Override
protected Long extractId(PortEntity entity) {
return null;
}
@Override
protected String getInsertSql() {
return null;
}
@Override
protected String getUpdateSql() {
return """
INSERT INTO snp_data.facility_port (
port_ID, old_ID, status, port_Name, unlocode, countryCode, country_Name, region_Name, continent_Name, master_POID,
dec_Lat, dec_Long, position_lat, position_long, position_z, position_m, position_hasZ, position_hasM, position_isNull, position_stSrid, time_Zone, dayLight_Saving_Time,
maximum_Draft, max_LOA, max_Beam, max_DWT, max_Offshore_Draught, max_Offshore_LOA, max_Offshore_BCM, max_Offshore_DWT,
breakbulk_Facilities, container_Facilities, dry_Bulk_Facilities, liquid_Facilities, roRo_Facilities, passenger_Facilities, dry_Dock_Facilities,
lpG_Facilities, lnG_Facilities, lnG_Bunker, dO_Bunker, fO_Bunker, ispS_Compliant, csI_Compliant, free_Trade_Zone, ecO_Port, emission_Control_Area, wS_Port,
last_Update, entry_Date, batch_flag
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?::timestamptz, ?::timestamptz, 'N'
) ON CONFLICT (port_ID) DO UPDATE
SET
old_ID = EXCLUDED.old_ID,
status = EXCLUDED.status,
port_Name = EXCLUDED.port_Name,
unlocode = EXCLUDED.unlocode,
countryCode = EXCLUDED.countryCode,
country_Name = EXCLUDED.country_Name,
region_Name = EXCLUDED.region_Name,
continent_Name = EXCLUDED.continent_Name,
master_POID = EXCLUDED.master_POID,
dec_Lat = EXCLUDED.dec_Lat,
dec_Long = EXCLUDED.dec_Long,
position_lat = EXCLUDED.position_lat,
position_long = EXCLUDED.position_long,
position_z = EXCLUDED.position_z,
position_m = EXCLUDED.position_m,
position_hasZ = EXCLUDED.position_hasZ,
position_hasM = EXCLUDED.position_hasM,
position_isNull = EXCLUDED.position_isNull,
position_stSrid = EXCLUDED.position_stSrid,
time_Zone = EXCLUDED.time_Zone,
dayLight_Saving_Time = EXCLUDED.dayLight_Saving_Time,
maximum_Draft = EXCLUDED.maximum_Draft,
max_LOA = EXCLUDED.max_LOA,
max_Beam = EXCLUDED.max_Beam,
max_DWT = EXCLUDED.max_DWT,
max_Offshore_Draught = EXCLUDED.max_Offshore_Draught,
max_Offshore_LOA = EXCLUDED.max_Offshore_LOA,
max_Offshore_BCM = EXCLUDED.max_Offshore_BCM,
max_Offshore_DWT = EXCLUDED.max_Offshore_DWT,
breakbulk_Facilities = EXCLUDED.breakbulk_Facilities,
container_Facilities = EXCLUDED.container_Facilities,
dry_Bulk_Facilities = EXCLUDED.dry_Bulk_Facilities,
liquid_Facilities = EXCLUDED.liquid_Facilities,
roRo_Facilities = EXCLUDED.roRo_Facilities,
passenger_Facilities = EXCLUDED.passenger_Facilities,
dry_Dock_Facilities = EXCLUDED.dry_Dock_Facilities,
lpG_Facilities = EXCLUDED.lpG_Facilities,
lnG_Facilities = EXCLUDED.lnG_Facilities,
lnG_Bunker = EXCLUDED.lnG_Bunker,
dO_Bunker = EXCLUDED.dO_Bunker,
fO_Bunker = EXCLUDED.fO_Bunker,
ispS_Compliant = EXCLUDED.ispS_Compliant,
csI_Compliant = EXCLUDED.csI_Compliant,
free_Trade_Zone = EXCLUDED.free_Trade_Zone,
ecO_Port = EXCLUDED.ecO_Port,
emission_Control_Area = EXCLUDED.emission_Control_Area,
wS_Port = EXCLUDED.wS_Port,
last_Update = EXCLUDED.last_Update,
entry_Date = EXCLUDED.entry_Date,
batch_flag = 'N'
""";
}
@Override
protected void setInsertParameters(PreparedStatement ps, PortEntity entity) throws Exception {
}
@Override
protected void setUpdateParameters(PreparedStatement ps, PortEntity entity) throws Exception {
int idx = 1;
ps.setLong(idx++, entity.getPortID());
ps.setString(idx++, entity.getOldID());
ps.setString(idx++, entity.getStatus());
ps.setString(idx++, entity.getPortName());
ps.setString(idx++, entity.getUnlocode());
ps.setString(idx++, entity.getCountryCode());
ps.setString(idx++, entity.getCountryName());
ps.setString(idx++, entity.getRegionName());
ps.setString(idx++, entity.getContinentName());
ps.setString(idx++, entity.getMasterPoid());
setDoubleOrNull(ps, idx++, entity.getDecLat());
setDoubleOrNull(ps, idx++, entity.getDecLong());
PortEntity.PositionEntity pos = entity.getPosition();
if (pos != null) {
setDoubleOrNull(ps, idx++, pos.getLat());
setDoubleOrNull(ps, idx++, pos.getLongitude());
ps.setObject(idx++, pos.getZ(), Types.OTHER);
ps.setObject(idx++, pos.getM(), Types.OTHER);
setBooleanOrNull(ps, idx++, pos.getHasZ());
setBooleanOrNull(ps, idx++, pos.getHasM());
setBooleanOrNull(ps, idx++, pos.getIsNull());
setIntegerOrNull(ps, idx++, pos.getStSrid());
} else {
for (int i = 0; i < 8; i++) {
ps.setNull(idx++, Types.NULL);
}
}
ps.setString(idx++, entity.getTimeZone());
setBooleanOrNull(ps, idx++, entity.getDayLightSavingTime());
setDoubleOrNull(ps, idx++, entity.getMaximumDraft()); // 원본: setIntegerOrNull(getMaximumDraft())였으나 FLOAT에 맞게 수정
setDoubleOrNull(ps, idx++, entity.getMaxLoa()); // 원본: setIntegerOrNull(getMaxLoa())였으나 FLOAT에 맞게 수정
setDoubleOrNull(ps, idx++, entity.getMaxBeam()); // 원본: setIntegerOrNull(getMaxBeam())였으나 FLOAT에 맞게 수정
setDoubleOrNull(ps, idx++, entity.getMaxDwt()); // 원본: setIntegerOrNull(getMaxDwt())였으나 FLOAT에 맞게 수정
setDoubleOrNull(ps, idx++, entity.getMaxOffshoreDraught()); // 원본: setIntegerOrNull(getMaxOffshoreDraught())였으나 FLOAT에 맞게 수정
setDoubleOrNull(ps, idx++, entity.getMaxOffshoreLoa()); // 원본: setIntegerOrNull(getMaxOffshoreLoa())였으나 FLOAT에 맞게 수정
setDoubleOrNull(ps, idx++, entity.getMaxOffshoreBcm()); // 원본: setIntegerOrNull(getMaxOffshoreBcm())였으나 FLOAT에 맞게 수정
setDoubleOrNull(ps, idx++, entity.getMaxOffshoreDwt()); // 원본: setIntegerOrNull(getMaxOffshoreDwt())였으나 FLOAT에 맞게 수정
setBooleanOrNull(ps, idx++, entity.getBreakbulkFacilities());
setBooleanOrNull(ps, idx++, entity.getContainerFacilities());
setBooleanOrNull(ps, idx++, entity.getDryBulkFacilities());
setBooleanOrNull(ps, idx++, entity.getLiquidFacilities());
setBooleanOrNull(ps, idx++, entity.getRoRoFacilities());
setBooleanOrNull(ps, idx++, entity.getPassengerFacilities());
setBooleanOrNull(ps, idx++, entity.getDryDockFacilities());
setIntegerOrNull(ps, idx++, entity.getLpGFacilities()); // INT8(BIGINT) 맞게 setLongOrNull 사용 가정
setIntegerOrNull(ps, idx++, entity.getLnGFacilities()); // INT8(BIGINT) 맞게 setLongOrNull 사용 가정
setBooleanOrNull(ps, idx++, entity.getLngBunker()); // 원본 위치: 마지막 부분
setBooleanOrNull(ps, idx++, entity.getDoBunker()); // 원본 위치: 마지막 부분
setBooleanOrNull(ps, idx++, entity.getFoBunker()); // 원본 위치: 마지막 부분
setBooleanOrNull(ps, idx++, entity.getIspsCompliant());
setBooleanOrNull(ps, idx++, entity.getCsiCompliant());
setBooleanOrNull(ps, idx++, entity.getFreeTradeZone()); // 원본 위치: 마지막 부분
setBooleanOrNull(ps, idx++, entity.getEcoPort()); // 원본 위치: 마지막 부분
setBooleanOrNull(ps, idx++, entity.getEmissionControlArea()); // 원본 위치: 마지막 부분
setIntegerOrNull(ps, idx++, entity.getWsPort()); // 원본 위치: 마지막 부분 (INT8에 맞게 setLongOrNull 사용 가정)
ps.setString(idx++, entity.getLastUpdate()); // String 대신 Timestamp 타입이 JDBC 표준에 적합합니다.
ps.setString(idx++, entity.getEntryDate()); // String 대신 Timestamp 타입이 JDBC 표준에 적합합니다.
}
@Override
protected String getEntityName() {
return "RiskEntity";
}
@Override
public void savePortAll(List<PortEntity> items) {
if (items == null || items.isEmpty()) {
return;
}
jdbcTemplate.batchUpdate(getUpdateSql(), items, items.size(),
(ps, entity) -> {
try {
setUpdateParameters(ps, entity);
} catch (Exception e) {
log.error("배치 수정 파라미터 설정 실패", e);
throw new RuntimeException(e);
}
});
log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size());
}
/**
* Integer 값을 PreparedStatement에 설정 (null 처리 포함)
*/
private void setIntegerOrNull(PreparedStatement ps, int index, Integer value) throws Exception {
if (value == null) {
ps.setNull(index, Types.INTEGER);
} else {
ps.setInt(index, value);
}
}
/**
* Double 값을 PreparedStatement에 설정 (null 처리 포함)
*/
private void setDoubleOrNull(PreparedStatement ps, int index, Double value) throws Exception {
if (value == null) {
ps.setNull(index, Types.DOUBLE);
} else {
ps.setDouble(index, value);
}
}
/**
* Boolean 값을 PreparedStatement에 설정 (null 처리 포함)
*/
private void setBooleanOrNull(PreparedStatement ps, int index, Boolean value) throws Exception {
if (value == null) {
// DB 타입에 따라 BOOLEAN 또는 TINYINT(1) 사용
ps.setNull(index, Types.BOOLEAN);
} else {
ps.setBoolean(index, value);
}
}
}

파일 보기

@ -0,0 +1,26 @@
package com.snp.batch.jobs.facility.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.facility.batch.entity.PortEntity;
import com.snp.batch.jobs.facility.batch.repository.FacilityRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class PortDataWriter extends BaseWriter<PortEntity> {
private final FacilityRepository facilityRepository;
public PortDataWriter(FacilityRepository facilityRepository) {
super("FacilityRepository");
this.facilityRepository = facilityRepository;
}
@Override
protected void writeItems(List<PortEntity> items) throws Exception {
facilityRepository.savePortAll(items);
log.info("Port 저장 완료: 수정={} 건", items.size());
}
}