Event Detail 적재 프로세스 개발

- StartDate, EndDate 추출작업 필요
This commit is contained in:
hyojin kim 2025-12-19 10:57:40 +09:00
부모 270b2a0b55
커밋 acd76bd358
17개의 변경된 파일902개의 추가작업 그리고 84개의 파일을 삭제

파일 보기

@ -1,8 +1,8 @@
package com.snp.batch.jobs.event.batch.config; package com.snp.batch.jobs.event.batch.config;
import com.snp.batch.common.batch.config.BaseJobConfig; import com.snp.batch.common.batch.config.BaseJobConfig;
import com.snp.batch.jobs.event.batch.dto.EventDto; import com.snp.batch.jobs.event.batch.dto.EventDetailDto;
import com.snp.batch.jobs.event.batch.entity.EventEntity; import com.snp.batch.jobs.event.batch.entity.EventDetailEntity;
import com.snp.batch.jobs.event.batch.processor.EventDataProcessor; import com.snp.batch.jobs.event.batch.processor.EventDataProcessor;
import com.snp.batch.jobs.event.batch.reader.EventDataReader; import com.snp.batch.jobs.event.batch.reader.EventDataReader;
import com.snp.batch.jobs.event.batch.writer.EventDataWriter; import com.snp.batch.jobs.event.batch.writer.EventDataWriter;
@ -23,7 +23,7 @@ import org.springframework.web.reactive.function.client.WebClient;
@Slf4j @Slf4j
@Configuration @Configuration
public class EventImportJobConfig extends BaseJobConfig<EventDto, EventEntity> { public class EventImportJobConfig extends BaseJobConfig<EventDetailDto, EventDetailEntity> {
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
private final WebClient maritimeApiWebClient; private final WebClient maritimeApiWebClient;
@ -34,7 +34,7 @@ public class EventImportJobConfig extends BaseJobConfig<EventDto, EventEntity> {
@Override @Override
protected int getChunkSize() { protected int getChunkSize() {
return 5000; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정 return 10; // API에서 5000개씩 가져오므로 chunk도 5000으로 설정
} }
public EventImportJobConfig( public EventImportJobConfig(
JobRepository jobRepository, JobRepository jobRepository,
@ -63,17 +63,17 @@ public class EventImportJobConfig extends BaseJobConfig<EventDto, EventEntity> {
} }
@Override @Override
protected ItemReader<EventDto> createReader() { protected ItemReader<EventDetailDto> createReader() {
return new EventDataReader(maritimeApiWebClient, jdbcTemplate, batchDateService); return new EventDataReader(maritimeApiWebClient, jdbcTemplate, batchDateService);
} }
@Override @Override
protected ItemProcessor<EventDto, EventEntity> createProcessor() { protected ItemProcessor<EventDetailDto, EventDetailEntity> createProcessor() {
return eventDataProcessor; return eventDataProcessor;
} }
@Override @Override
protected ItemWriter<EventEntity> createWriter() { return eventDataWriter; } protected ItemWriter<EventDetailEntity> createWriter() { return eventDataWriter; }
@Bean(name = "eventImportJob") @Bean(name = "eventImportJob")
public Job eventImportJob() { public Job eventImportJob() {

파일 보기

@ -0,0 +1,50 @@
package com.snp.batch.jobs.event.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.snp.batch.jobs.event.batch.entity.CargoEntity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CargoDto {
@JsonProperty("EventID")
private Integer eventID;
@JsonProperty("Sequence")
private String sequence;
@JsonProperty("IHSLRorIMOShipNo")
private String ihslrOrImoShipNo;
@JsonProperty("Type")
private String type;
@JsonProperty("Quantity")
private Integer quantity;
@JsonProperty("UnitShort")
private String unitShort;
@JsonProperty("Unit")
private String unit;
@JsonProperty("Text")
private String text;
@JsonProperty("CargoDamage")
private String cargoDamage;
@JsonProperty("Dangerous")
private String dangerous;
public CargoEntity toEntity() {
return CargoEntity.builder()
.eventID(this.eventID)
.sequence(this.sequence)
.ihslrOrImoShipNo(this.ihslrOrImoShipNo)
.type(this.type)
.unit(this.unit)
.quantity(this.quantity)
.unitShort(this.unitShort)
.text(this.text)
.cargoDamage(this.cargoDamage)
.dangerous(this.dangerous)
.build();
}
}

파일 보기

@ -0,0 +1,105 @@
package com.snp.batch.jobs.event.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EventDetailDto {
@JsonProperty("IncidentID")
private Integer incidentID;
@JsonProperty("EventID")
private Integer eventID;
@JsonProperty("EventTypeID")
private Integer eventTypeID;
@JsonProperty("EventType")
private String eventType;
@JsonProperty("Significance")
private String significance;
@JsonProperty("Headline")
private String headline;
@JsonProperty("IHSLRorIMOShipNo")
private String ihslrOrImoShipNo;
@JsonProperty("VesselName")
private String vesselName;
@JsonProperty("VesselType")
private String vesselType;
@JsonProperty("VesselTypeDecode")
private String vesselTypeDecode;
@JsonProperty("VesselFlag")
private String vesselFlagCode;
@JsonProperty("Flag")
private String vesselFlagDecode;
@JsonProperty("CargoLoadingStatusCode")
private String cargoLoadingStatusCode;
@JsonProperty("VesselDWT")
private Integer vesselDWT;
@JsonProperty("VesselGT")
private Integer vesselGT;
@JsonProperty("LDTAtTime")
private Integer ldtAtTime;
@JsonProperty("DateOfBuild")
private Integer dateOfBuild;
@JsonProperty("RegisteredOwnerCodeAtTime")
private String registeredOwnerCodeAtTime;
@JsonProperty("RegisteredOwnerAtTime")
private String registeredOwnerAtTime;
@JsonProperty("RegisteredOwnerCoDAtTime")
private String registeredOwnerCountryCodeAtTime;
@JsonProperty("RegisteredOwnerCountryAtTime")
private String registeredOwnerCountryAtTime;
@JsonProperty("Weather")
private String weather;
@JsonProperty("EventTypeDetail")
private String eventTypeDetail;
@JsonProperty("EventTypeDetailID")
private Integer eventTypeDetailID;
@JsonProperty("CasualtyAction")
private String casualtyAction;
@JsonProperty("LocationName")
private String locationName;
@JsonProperty("TownName")
private String townName;
@JsonProperty("MarsdenGridReference")
private Integer marsdenGridReference;
@JsonProperty("EnvironmentLocation")
private String environmentLocation;
@JsonProperty("CasualtyZone")
private String casualtyZone;
@JsonProperty("CasualtyZoneCode")
private String casualtyZoneCode;
@JsonProperty("CountryCode")
private String countryCode;
@JsonProperty("AttemptedBoarding")
private String attemptedBoarding;
@JsonProperty("Description")
private String description;
@JsonProperty("Pollutant")
private String pollutant;
@JsonProperty("PollutantUnit")
private String pollutantUnit;
@JsonProperty("PollutantQuantity")
private Double pollutantQuantity;
@JsonProperty("PublishedDate")
private String publishedDate;
@JsonProperty("Component2")
private String component2;
@JsonProperty("FiredUpon")
private String firedUpon;
@JsonProperty("Cargoes")
private List<CargoDto> cargoes;
@JsonProperty("HumanCasualties")
private List<HumanCasualtyDto> humanCasualties;
@JsonProperty("Relationships")
private List<RelationshipDto> relationships;
}

파일 보기

@ -0,0 +1,18 @@
package com.snp.batch.jobs.event.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EventDetailResponse {
@JsonProperty("MaritimeEvent")
private EventDetailDto eventDetailDto;
}

파일 보기

@ -0,0 +1,35 @@
package com.snp.batch.jobs.event.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.snp.batch.jobs.event.batch.entity.HumanCasualtyEntity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class HumanCasualtyDto {
@JsonProperty("EventID")
private Integer eventID;
@JsonProperty("Scope")
private String scope;
@JsonProperty("Type")
private String type;
@JsonProperty("Qualifier")
private String qualifier;
@JsonProperty("Count")
private Integer count;
public HumanCasualtyEntity toEntity() {
return HumanCasualtyEntity.builder()
.eventID(this.eventID)
.scope(this.scope)
.type(this.type)
.qualifier(this.qualifier)
.count(this.count)
.build();
}
}

파일 보기

@ -0,0 +1,41 @@
package com.snp.batch.jobs.event.batch.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.snp.batch.jobs.event.batch.entity.RelationshipEntity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RelationshipDto {
@JsonProperty("IncidentID")
private String incidentID;
@JsonProperty("EventID")
private Integer eventID;
@JsonProperty("RelationshipType")
private String relationshipType;
@JsonProperty("RelationshipTypeCode")
private String relationshipTypeCode;
@JsonProperty("EventID2")
private Integer eventID2;
@JsonProperty("EventType")
private String eventType;
@JsonProperty("EventTypeCode")
private String eventTypeCode;
public RelationshipEntity toEntity() {
return RelationshipEntity.builder()
.incidentID(this.incidentID)
.eventID(this.eventID)
.relationshipType(this.relationshipType)
.relationshipTypeCode(this.relationshipTypeCode)
.eventID2(this.eventID2)
.eventType(this.eventType)
.eventTypeCode(this.eventTypeCode)
.build();
}
}

파일 보기

@ -0,0 +1,26 @@
package com.snp.batch.jobs.event.batch.entity;
import com.snp.batch.common.batch.entity.BaseEntity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class CargoEntity extends BaseEntity {
private Integer eventID;
private String sequence;
private String ihslrOrImoShipNo;
private String type;
private Integer quantity;
private String unitShort;
private String unit;
private String text;
private String cargoDamage;
private String dangerous;
}

파일 보기

@ -0,0 +1,62 @@
package com.snp.batch.jobs.event.batch.entity;
import com.snp.batch.common.batch.entity.BaseEntity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class EventDetailEntity extends BaseEntity {
private Integer incidentID;
private Integer eventID;
private Integer eventTypeID;
private String eventType;
private String significance;
private String headline;
private String ihslrOrImoShipNo;
private String vesselName;
private String vesselType;
private String vesselTypeDecode;
private String vesselFlagCode;
private String vesselFlagDecode;
private String cargoLoadingStatusCode;
private Integer vesselDWT;
private Integer vesselGT;
private Integer ldtAtTime;
private Integer dateOfBuild;
private String registeredOwnerCodeAtTime;
private String registeredOwnerAtTime;
private String registeredOwnerCountryCodeAtTime;
private String registeredOwnerCountryAtTime;
private String weather;
private String eventTypeDetail;
private Integer eventTypeDetailID;
private String casualtyAction;
private String locationName;
private String townName;
private Integer marsdenGridReference;
private String environmentLocation;
private String casualtyZone;
private String casualtyZoneCode;
private String countryCode;
private String attemptedBoarding;
private String description;
private String pollutant;
private String pollutantUnit;
private Double pollutantQuantity;
private String publishedDate;
private String component2;
private String firedUpon;
private List<CargoEntity> cargoes;
private List<HumanCasualtyEntity> humanCasualties;
private List<RelationshipEntity> relationships;
}

파일 보기

@ -1,7 +1,6 @@
package com.snp.batch.jobs.event.batch.entity; package com.snp.batch.jobs.event.batch.entity;
import com.snp.batch.common.batch.entity.BaseEntity; import com.snp.batch.common.batch.entity.BaseEntity;
import jakarta.persistence.Embedded;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

파일 보기

@ -0,0 +1,21 @@
package com.snp.batch.jobs.event.batch.entity;
import com.snp.batch.common.batch.entity.BaseEntity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class HumanCasualtyEntity extends BaseEntity {
private Integer eventID;
private String scope;
private String type;
private String qualifier;
private Integer count;
}

파일 보기

@ -0,0 +1,23 @@
package com.snp.batch.jobs.event.batch.entity;
import com.snp.batch.common.batch.entity.BaseEntity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class RelationshipEntity extends BaseEntity {
private String incidentID;
private Integer eventID;
private String relationshipType;
private String relationshipTypeCode;
private Integer eventID2;
private String eventType;
private String eventTypeCode;
}

파일 보기

@ -1,34 +1,73 @@
package com.snp.batch.jobs.event.batch.processor; package com.snp.batch.jobs.event.batch.processor;
import com.snp.batch.common.batch.processor.BaseProcessor; import com.snp.batch.common.batch.processor.BaseProcessor;
import com.snp.batch.jobs.event.batch.dto.EventDto; import com.snp.batch.jobs.event.batch.dto.CargoDto;
import com.snp.batch.jobs.event.batch.entity.EventEntity; import com.snp.batch.jobs.event.batch.dto.EventDetailDto;
import com.snp.batch.jobs.event.batch.dto.HumanCasualtyDto;
import com.snp.batch.jobs.event.batch.dto.RelationshipDto;
import com.snp.batch.jobs.event.batch.entity.EventDetailEntity;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.stream.Collectors;
@Slf4j @Slf4j
@Component @Component
public class EventDataProcessor extends BaseProcessor<EventDto, EventEntity> { public class EventDataProcessor extends BaseProcessor<EventDetailDto, EventDetailEntity> {
@Override @Override
protected EventEntity processItem(EventDto dto) throws Exception { protected EventDetailEntity processItem(EventDetailDto dto) throws Exception {
log.debug("Event 데이터 처리 시작: Event ID = {}", dto.getEventId()); log.debug("Event 데이터 처리 시작: Event ID = {}", dto.getEventID());
EventEntity entity = EventEntity.builder() EventDetailEntity entity = EventDetailEntity.builder()
.incidentId(dto.getIncidentId()) .incidentID(dto.getIncidentID())
.eventId(dto.getEventId()) .eventID(dto.getEventID())
.startDate(dto.getStartDate()) .eventTypeID(dto.getEventTypeID())
.eventType(dto.getEventType()) .eventType(dto.getEventType())
.significance(dto.getSignificance()) .significance(dto.getSignificance())
.headline(dto.getHeadline()) .headline(dto.getHeadline())
.endDate(dto.getEndDate()) .ihslrOrImoShipNo(dto.getIhslrOrImoShipNo())
.ihslRorImoShipNo(dto.getIhslRorImoShipNo())
.vesselName(dto.getVesselName()) .vesselName(dto.getVesselName())
.vesselType(dto.getVesselType()) .vesselType(dto.getVesselType())
.vesselTypeDecode(dto.getVesselTypeDecode())
.vesselFlagCode(dto.getVesselFlagCode())
.vesselFlagDecode(dto.getVesselFlagDecode())
.cargoLoadingStatusCode(dto.getCargoLoadingStatusCode())
.vesselDWT(dto.getVesselDWT())
.vesselGT(dto.getVesselGT())
.ldtAtTime(dto.getLdtAtTime())
.dateOfBuild(dto.getDateOfBuild())
.registeredOwnerCodeAtTime(dto.getRegisteredOwnerCodeAtTime())
.registeredOwnerAtTime(dto.getRegisteredOwnerAtTime())
.registeredOwnerCountryCodeAtTime(dto.getRegisteredOwnerCountryCodeAtTime())
.registeredOwnerCountryAtTime(dto.getRegisteredOwnerCountryAtTime())
.weather(dto.getWeather())
.eventTypeDetail(dto.getEventTypeDetail())
.eventTypeDetailID(dto.getEventTypeDetailID())
.casualtyAction(dto.getCasualtyAction())
.locationName(dto.getLocationName()) .locationName(dto.getLocationName())
.townName(dto.getTownName())
.marsdenGridReference(dto.getMarsdenGridReference())
.environmentLocation(dto.getEnvironmentLocation())
.casualtyZone(dto.getCasualtyZone())
.casualtyZoneCode(dto.getCasualtyZoneCode())
.countryCode(dto.getCountryCode())
.attemptedBoarding(dto.getAttemptedBoarding())
.description(dto.getDescription())
.pollutant(dto.getPollutant())
.pollutantUnit(dto.getPollutantUnit())
.pollutantQuantity(dto.getPollutantQuantity())
.publishedDate(dto.getPublishedDate()) .publishedDate(dto.getPublishedDate())
.component2(dto.getComponent2())
.firedUpon(dto.getFiredUpon())
.cargoes(dto.getCargoes() != null ?
dto.getCargoes().stream().map(CargoDto::toEntity).collect(Collectors.toList()) : null)
.humanCasualties(dto.getHumanCasualties() != null ?
dto.getHumanCasualties().stream().map(HumanCasualtyDto::toEntity).collect(Collectors.toList()) : null)
.relationships(dto.getRelationships() != null ?
dto.getRelationships().stream().map(RelationshipDto::toEntity).collect(Collectors.toList()) : null)
.build(); .build();
log.debug("Event 데이터 처리 완료: Event ID = {}", dto.getEventId()); log.debug("Event 데이터 처리 완료: Event ID = {}", dto.getEventID());
return entity; return entity;
} }

파일 보기

@ -1,20 +1,24 @@
package com.snp.batch.jobs.event.batch.reader; package com.snp.batch.jobs.event.batch.reader;
import com.snp.batch.common.batch.reader.BaseApiReader; import com.snp.batch.common.batch.reader.BaseApiReader;
import com.snp.batch.jobs.event.batch.dto.EventDto; import com.snp.batch.jobs.event.batch.dto.*;
import com.snp.batch.jobs.event.batch.dto.EventResponse; import com.snp.batch.jobs.event.batch.dto.EventDetailDto;
import com.snp.batch.jobs.event.batch.entity.EventDetailEntity;
import com.snp.batch.jobs.shipdetail.batch.dto.ShipDetailComparisonData;
import com.snp.batch.service.BatchDateService; import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.LocalDate;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
@Slf4j @Slf4j
public class EventDataReader extends BaseApiReader<EventDto> { public class EventDataReader extends BaseApiReader<EventDetailDto> {
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
private final BatchDateService batchDateService; // BatchDateService 필드 추가 private final BatchDateService batchDateService; // BatchDateService 필드 추가
@ -22,6 +26,7 @@ public class EventDataReader extends BaseApiReader<EventDto> {
super(webClient); super(webClient);
this.jdbcTemplate = jdbcTemplate; this.jdbcTemplate = jdbcTemplate;
this.batchDateService = batchDateService; this.batchDateService = batchDateService;
enableChunkMode(); // Chunk 모드 활성화
} }
@Override @Override
@ -33,27 +38,138 @@ public class EventDataReader extends BaseApiReader<EventDto> {
protected String getApiPath() { protected String getApiPath() {
return "/MaritimeWCF/MaritimeAndTradeEventsService.svc/RESTFul/GetEventListByEventChangeDateRange"; return "/MaritimeWCF/MaritimeAndTradeEventsService.svc/RESTFul/GetEventListByEventChangeDateRange";
} }
protected String getEventDetailApiPath(){
return "/MaritimeWCF/MaritimeAndTradeEventsService.svc/RESTFul/GetEventDataByEventID";
}
protected String getApiKey() {return "EVENT_IMPORT_JOB";} protected String getApiKey() {return "EVENT_IMPORT_JOB";}
@Override // 배치 처리 상태
protected List<EventDto> fetchDataFromApi() { private List<Long> eventIds;
try { // DB 해시값을 저장할
log.info("Event API 호출 시작"); private int currentBatchIndex = 0;
EventResponse response = callEventApiWithBatch(); private final int batchSize = 1;
@Override
protected void resetCustomState() {
this.currentBatchIndex = 0;
this.eventIds = null;
}
@Override
protected void beforeFetch(){
// 1. 기간내 기록된 Event List 조회 (API 요청)
log.info("Event API 호출");
EventResponse response = callEventApiWithBatch();
// 2-1. Event List 에서 EventID List 추출
// TODO: 2-2. Event List 에서 Map<EventId,Map<StartDate,EndDate>> 추출
eventIds = extractEventIdList(response);
log.info("EvnetId List 추출 완료 : {} 개", eventIds.size());
updateApiCallStats(eventIds.size(), 0);
}
@Override
protected List<EventDetailDto> fetchNextBatch() throws Exception {
// 3. EventID List Event Detail 조회 (API요청) : 청크단위 실행
// 모든 배치 처리 완료 확인
if (eventIds == null || currentBatchIndex >= eventIds.size()) {
return null; // Job 종료
}
// 현재 배치의 시작/ 인덱스 계산
int startIndex = currentBatchIndex;
int endIndex = Math.min(currentBatchIndex + batchSize, eventIds.size());
// 현재 배치의 IMO 번호 추출 (100개)
List<Long> currentBatch = eventIds.subList(startIndex, endIndex);
int currentBatchNumber = (currentBatchIndex / batchSize) + 1;
int totalBatches = (int) Math.ceil((double) eventIds.size() / batchSize);
log.info("[{}] 배치 {}/{} 처리 중 (Event ID : {} 개)...",
getReaderName(), currentBatchNumber, totalBatches, currentBatch.size());
try {
// API 호출
EventDetailResponse response = callEventDetailApiWithBatch(currentBatch.get(0));
// 다음 배치로 인덱스 이동
currentBatchIndex = endIndex;
List<EventDetailDto> eventDetailList = new ArrayList<>();
// TODO: getEventDetailDto에 Map<EventId,Map<StartDate,EndDate>> 데이터 세팅
eventDetailList.add(response.getEventDetailDto());
// 응답 처리
if (response != null && response.getEventDetailDto() != null) {
log.info("[{}] 배치 {}/{} 완료: {} 건 조회",
getReaderName(), currentBatchNumber, totalBatches, eventDetailList.size());
// API 호출 통계 업데이트
updateApiCallStats(totalBatches, currentBatchNumber);
// API 과부하 방지 (다음 배치 0.5초 대기)
if (currentBatchIndex < eventIds.size()) {
Thread.sleep(500);
}
return eventDetailList;
if (response != null && response.getMaritimeEvents() != null) {
log.info("API 응답 성공: 총 {} 개의 Event 데이터 수신", response.getEventCount());
return response.getMaritimeEvents();
} else { } else {
log.warn("API 응답이 null이거나 Event 데이터가 없습니다"); log.warn("[{}] 배치 {}/{} 응답 없음",
return new ArrayList<>(); getReaderName(), currentBatchNumber, totalBatches);
// API 호출 통계 업데이트 (실패도 카운트)
updateApiCallStats(totalBatches, currentBatchNumber);
return Collections.emptyList();
} }
} catch (Exception e) { } catch (Exception e) {
log.error("Event API 호출 실패", e); log.error("[{}] 배치 {}/{} 처리 중 오류: {}",
log.error("에러 메시지: {}", e.getMessage()); getReaderName(), currentBatchNumber, totalBatches, e.getMessage(), e);
return new ArrayList<>();
// 오류 발생 시에도 다음 배치로 이동 (부분 실패 허용)
currentBatchIndex = endIndex;
// 리스트 반환 (Job 계속 진행)
return Collections.emptyList();
} }
}
@Override
protected void afterFetch(List<EventDetailDto> data) {
int totalBatches = (int) Math.ceil((double) eventIds.size() / batchSize);
try{
if (data == null) {
// 3. 배치 성공 상태 업데이트 (트랜잭션 커밋 직전에 실행)
LocalDate successDate = LocalDate.now(); // 현재 배치 실행 시점의 날짜 (Reader의 toDay와 동일한 )
batchDateService.updateLastSuccessDate(getApiKey(), successDate);
log.info("batch_last_execution update 완료 : {}", getApiKey());
log.info("[{}] 전체 {} 개 배치 처리 완료", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 Event ID에 대한 API 호출 종료",
getReaderName(), eventIds.size());
}
}catch (Exception e){
log.info("[{}] 전체 {} 개 배치 처리 실패", getReaderName(), totalBatches);
log.info("[{}] 총 {} 개의 Event ID에 대한 API 호출 종료",
getReaderName(), eventIds.size());
}
}
private List<Long> extractEventIdList(EventResponse response) {
if (response.getMaritimeEvents() == null) {
return Collections.emptyList();
}
return response.getMaritimeEvents() .stream()
// ShipDto 객체에서 imoNumber 필드 (String 타입) 추출
.map(EventDto::getEventId)
// IMO 번호가 null이 아닌 경우만 필터링 (선택 사항이지만 안전성을 위해)
.filter(eventId -> eventId != null)
// 추출된 String imoNumber들을 List<String>으로 수집
.collect(Collectors.toList());
} }
private EventResponse callEventApiWithBatch() { private EventResponse callEventApiWithBatch() {
@ -77,4 +193,18 @@ public class EventDataReader extends BaseApiReader<EventDto> {
.block(); .block();
} }
private EventDetailResponse callEventDetailApiWithBatch(Long eventId) {
String url = getEventDetailApiPath();
log.info("[{}] API 호출: {}", getReaderName(), url);
return webClient.get()
.uri(url, uriBuilder -> uriBuilder
// 맵에서 파라미터 값을 동적으로 가져와 세팅
.queryParam("eventID", eventId)
.build())
.retrieve()
.bodyToMono(EventDetailResponse.class)
.block();
}
} }

파일 보기

@ -1,9 +1,15 @@
package com.snp.batch.jobs.event.batch.repository; package com.snp.batch.jobs.event.batch.repository;
import com.snp.batch.jobs.event.batch.entity.EventEntity; import com.snp.batch.jobs.event.batch.entity.CargoEntity;
import com.snp.batch.jobs.event.batch.entity.EventDetailEntity;
import com.snp.batch.jobs.event.batch.entity.HumanCasualtyEntity;
import com.snp.batch.jobs.event.batch.entity.RelationshipEntity;
import java.util.List; import java.util.List;
public interface EventRepository { public interface EventRepository {
void saveEventAll(List<EventEntity> items); void saveEventAll(List<EventDetailEntity> items);
void saveCargoAll(List<CargoEntity> items);
void saveHumanCasualtyAll(List<HumanCasualtyEntity> items);
void saveRelationshipAll(List<RelationshipEntity> items);
} }

파일 보기

@ -1,7 +1,12 @@
package com.snp.batch.jobs.event.batch.repository; package com.snp.batch.jobs.event.batch.repository;
import com.snp.batch.common.batch.repository.BaseJdbcRepository; import com.snp.batch.common.batch.repository.BaseJdbcRepository;
import com.snp.batch.jobs.event.batch.entity.EventEntity; import com.snp.batch.jobs.event.batch.entity.CargoEntity;
import com.snp.batch.jobs.event.batch.entity.EventDetailEntity;
import com.snp.batch.jobs.event.batch.entity.HumanCasualtyEntity;
import com.snp.batch.jobs.event.batch.entity.RelationshipEntity;
import com.snp.batch.jobs.shipdetail.batch.entity.GroupBeneficialOwnerHistoryEntity;
import com.snp.batch.jobs.shipdetail.batch.repository.ShipDetailSql;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.RowMapper;
@ -13,7 +18,7 @@ import java.util.List;
@Slf4j @Slf4j
@Repository("EventRepository") @Repository("EventRepository")
public class EventRepositoryImpl extends BaseJdbcRepository<EventEntity, Long> implements EventRepository { public class EventRepositoryImpl extends BaseJdbcRepository<EventDetailEntity, Long> implements EventRepository {
public EventRepositoryImpl(JdbcTemplate jdbcTemplate) { public EventRepositoryImpl(JdbcTemplate jdbcTemplate) {
super(jdbcTemplate); super(jdbcTemplate);
@ -25,12 +30,12 @@ public class EventRepositoryImpl extends BaseJdbcRepository<EventEntity, Long> i
} }
@Override @Override
protected RowMapper<EventEntity> getRowMapper() { protected RowMapper<EventDetailEntity> getRowMapper() {
return null; return null;
} }
@Override @Override
protected Long extractId(EventEntity entity) { protected Long extractId(EventDetailEntity entity) {
return null; return null;
} }
@ -42,7 +47,7 @@ public class EventRepositoryImpl extends BaseJdbcRepository<EventEntity, Long> i
@Override @Override
protected String getUpdateSql() { protected String getUpdateSql() {
return """ return """
INSERT INTO snp_data.event ( INSERT INTO snp_data.event_detail (
Event_ID, Incident_ID, IHSLRorIMOShipNo, Vessel_Name, Vessel_Type, Event_ID, Incident_ID, IHSLRorIMOShipNo, Vessel_Name, Vessel_Type,
Event_Type, Significance, Headline, Location_Name, Event_Type, Significance, Headline, Location_Name,
Published_Date, Event_Start_Date, Event_End_Date, batch_flag Published_Date, Event_Start_Date, Event_End_Date, batch_flag
@ -69,48 +74,168 @@ public class EventRepositoryImpl extends BaseJdbcRepository<EventEntity, Long> i
} }
@Override @Override
protected void setInsertParameters(PreparedStatement ps, EventEntity entity) throws Exception { protected void setInsertParameters(PreparedStatement ps, EventDetailEntity entity) throws Exception {
} }
@Override
protected void setUpdateParameters(PreparedStatement ps, EventEntity entity) throws Exception {
int idx = 1;
ps.setLong(idx++, entity.getEventId());
ps.setLong(idx++, entity.getIncidentId());
ps.setString(idx++, entity.getIhslRorImoShipNo());
ps.setString(idx++, entity.getVesselName());
ps.setString(idx++, entity.getVesselType());
ps.setString(idx++, entity.getEventType());
ps.setString(idx++, entity.getSignificance());
ps.setString(idx++, entity.getHeadline());
ps.setString(idx++, entity.getLocationName());
ps.setString(idx++, entity.getPublishedDate());
ps.setString(idx++, entity.getStartDate());
ps.setString(idx++, entity.getEndDate());
}
@Override @Override
protected String getEntityName() { protected String getEntityName() {
return "EventEntity"; return "EventDetailEntity";
} }
@Override @Override
public void saveEventAll(List<EventEntity> items) { public void saveEventAll(List<EventDetailEntity> items) {
if (items == null || items.isEmpty()) { String entityName = "EventDetailEntity";
return; String sql = EventSql.getEventDetailUpdateSql();
}
jdbcTemplate.batchUpdate(getUpdateSql(), items, items.size(), jdbcTemplate.batchUpdate(sql, items, items.size(),
(ps, entity) -> { (ps, entity) -> {
try { try {
setUpdateParameters(ps, entity); setUpdateParameters(ps, (EventDetailEntity) entity);
} catch (Exception e) { } catch (Exception e) {
log.error("배치 수정 파라미터 설정 실패", e); log.error("배치 수정 파라미터 설정 실패", e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
log.info("{} 전체 저장 완료: 수정={} 건", getEntityName(), items.size()); log.info("{} 배치 삽입 완료: {} 건", entityName, items.size());
}
@Override
public void saveCargoAll(List<CargoEntity> items) {
String entityName = "CargoEntity";
String sql = EventSql.getEventCargoSql();
jdbcTemplate.batchUpdate(sql, items, items.size(),
(ps, entity) -> {
try {
setCargoInsertParameters(ps, (CargoEntity) entity);
} catch (Exception e) {
log.error("배치 삽입 파라미터 설정 실패 - " + entityName, e);
throw new RuntimeException(e);
}
});
log.info("{} 배치 삽입 완료: {} 건", entityName, items.size());
}
@Override
public void saveHumanCasualtyAll(List<HumanCasualtyEntity> items) {
String entityName = "HumanCasualtyEntity";
String sql = EventSql.getEventHumanCasualtySql();
jdbcTemplate.batchUpdate(sql, items, items.size(),
(ps, entity) -> {
try {
setHumanCasualtyInsertParameters(ps, (HumanCasualtyEntity) entity);
} catch (Exception e) {
log.error("배치 삽입 파라미터 설정 실패 - " + entityName, e);
throw new RuntimeException(e);
}
});
log.info("{} 배치 삽입 완료: {} 건", entityName, items.size());
}
@Override
public void saveRelationshipAll(List<RelationshipEntity> items) {
String entityName = "RelationshipEntity";
String sql = EventSql.getEventRelationshipSql();
jdbcTemplate.batchUpdate(sql, items, items.size(),
(ps, entity) -> {
try {
setRelationshipInsertParameters(ps, (RelationshipEntity) entity);
} catch (Exception e) {
log.error("배치 삽입 파라미터 설정 실패 - " + entityName, e);
throw new RuntimeException(e);
}
});
log.info("{} 배치 삽입 완료: {} 건", entityName, items.size());
}
@Override
protected void setUpdateParameters(PreparedStatement ps, EventDetailEntity entity) throws Exception {
int idx = 1;
ps.setObject(idx++, entity.getEventID()); // event_id
ps.setObject(idx++, entity.getIncidentID()); // incident_id (누락됨)
ps.setObject(idx++, entity.getIhslrOrImoShipNo()); // ihslrorimoshipno (누락됨)
ps.setObject(idx++, entity.getPublishedDate()); // published_date (누락됨)
ps.setString(idx++, entity.getAttemptedBoarding()); // attempted_boarding
ps.setString(idx++, entity.getCargoLoadingStatusCode());// cargo_loading_status_code
ps.setString(idx++, entity.getCasualtyAction()); // casualty_action
ps.setString(idx++, entity.getCasualtyZone()); // casualty_zone
ps.setString(idx++, entity.getCasualtyZoneCode()); // casualty_zone_code
ps.setString(idx++, entity.getComponent2()); // component2
// 11~20
ps.setString(idx++, entity.getCountryCode()); // country_code
ps.setObject(idx++, entity.getDateOfBuild()); // date_of_build (Integer)
ps.setString(idx++, entity.getDescription()); // description
ps.setString(idx++, entity.getEnvironmentLocation()); // environment_location
ps.setString(idx++, entity.getLocationName()); // location_name (누락됨)
ps.setObject(idx++, entity.getMarsdenGridReference()); // marsden_grid_reference (Integer)
ps.setString(idx++, entity.getTownName()); // town_name
ps.setString(idx++, entity.getEventType()); // event_type (누락됨)
ps.setString(idx++, entity.getEventTypeDetail()); // event_type_detail
ps.setObject(idx++, entity.getEventTypeDetailID()); // event_type_detail_id (Integer)
// 21~30
ps.setObject(idx++, entity.getEventTypeID()); // event_type_id (Integer)
ps.setString(idx++, entity.getFiredUpon()); // fired_upon
ps.setString(idx++, entity.getHeadline()); // headline (누락됨)
ps.setObject(idx++, entity.getLdtAtTime()); // ldt_at_time (Integer)
ps.setString(idx++, entity.getSignificance()); // significance (누락됨)
ps.setString(idx++, entity.getWeather()); // weather
ps.setString(idx++, entity.getPollutant()); // pollutant
ps.setObject(idx++, entity.getPollutantQuantity()); // pollutant_quantity (Double)
ps.setString(idx++, entity.getPollutantUnit()); // pollutant_unit
ps.setString(idx++, entity.getRegisteredOwnerCodeAtTime()); // registered_owner_code_at_time
// 31~40
ps.setString(idx++, entity.getRegisteredOwnerAtTime()); // registered_owner_at_time
ps.setString(idx++, entity.getRegisteredOwnerCountryCodeAtTime()); // registered_owner_country_code_at_time
ps.setString(idx++, entity.getRegisteredOwnerCountryAtTime()); // registered_owner_country_at_time
ps.setObject(idx++, entity.getVesselDWT()); // vessel_dwt (Integer)
ps.setString(idx++, entity.getVesselFlagCode()); // vessel_flag_code
ps.setString(idx++, entity.getVesselFlagDecode()); // vessel_flag_decode (누락됨)
ps.setObject(idx++, entity.getVesselGT()); // vessel_gt (Integer)
ps.setString(idx++, entity.getVesselName()); // vessel_name (누락됨)
ps.setString(idx++, entity.getVesselType()); // vessel_type (누락됨)
ps.setString(idx++, entity.getVesselTypeDecode()); // vessel_type_decode
}
private void setCargoInsertParameters(PreparedStatement ps, CargoEntity entity)throws Exception{
int idx = 1;
// INSERT 필드
ps.setObject(idx++, entity.getEventID());
ps.setString(idx++, entity.getSequence());
ps.setString(idx++, entity.getIhslrOrImoShipNo());
ps.setString(idx++, entity.getType());
ps.setObject(idx++, entity.getQuantity()); // quantity 필드 (Entity에 없을 경우 null 처리)
ps.setString(idx++, entity.getUnitShort()); // unit_short 필드
ps.setString(idx++, entity.getUnit());
ps.setString(idx++, entity.getCargoDamage());
ps.setString(idx++, entity.getDangerous());
ps.setString(idx++, entity.getText());
}
private void setHumanCasualtyInsertParameters(PreparedStatement ps, HumanCasualtyEntity entity)throws Exception{
int idx = 1;
ps.setObject(idx++, entity.getEventID());
ps.setString(idx++, entity.getScope());
ps.setString(idx++, entity.getType());
ps.setString(idx++, entity.getQualifier());
ps.setObject(idx++, entity.getCount());
}
private void setRelationshipInsertParameters(PreparedStatement ps, RelationshipEntity entity)throws Exception{
int idx = 1;
ps.setString(idx++, entity.getIncidentID());
ps.setObject(idx++, entity.getEventID());
ps.setString(idx++, entity.getRelationshipType());
ps.setString(idx++, entity.getRelationshipTypeCode());
ps.setObject(idx++, entity.getEventID2());
ps.setString(idx++, entity.getEventType());
ps.setString(idx++, entity.getEventTypeCode());
} }
private static void setStringOrNull(PreparedStatement ps, int index, String value) throws Exception { private static void setStringOrNull(PreparedStatement ps, int index, String value) throws Exception {

파일 보기

@ -0,0 +1,124 @@
package com.snp.batch.jobs.event.batch.repository;
public class EventSql {
public static String getEventDetailUpdateSql(){
return """
INSERT INTO snp_data.event_detail (
event_id, incident_id, ihslrorimoshipno, published_date,
attempted_boarding, cargo_loading_status_code, casualty_action,
casualty_zone, casualty_zone_code, component2, country_code,
date_of_build, description, environment_location, location_name,
marsden_grid_reference, town_name, event_type, event_type_detail,
event_type_detail_id, event_type_id, fired_upon, headline,
ldt_at_time, significance, weather, pollutant, pollutant_quantity,
pollutant_unit, registered_owner_code_at_time, registered_owner_at_time,
registered_owner_country_code_at_time, registered_owner_country_at_time,
vessel_dwt, vessel_flag_code, vessel_flag_decode, vessel_gt,
vessel_name, vessel_type, vessel_type_decode
)
VALUES (
?, ?, ?, ?::timestamptz, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?
)
ON CONFLICT (event_id)
DO UPDATE SET
incident_id = EXCLUDED.incident_id,
ihslrorimoshipno = EXCLUDED.ihslrorimoshipno,
published_date = EXCLUDED.published_date,
attempted_boarding = EXCLUDED.attempted_boarding,
cargo_loading_status_code = EXCLUDED.cargo_loading_status_code,
casualty_action = EXCLUDED.casualty_action,
casualty_zone = EXCLUDED.casualty_zone,
casualty_zone_code = EXCLUDED.casualty_zone_code,
component2 = EXCLUDED.component2,
country_code = EXCLUDED.country_code,
date_of_build = EXCLUDED.date_of_build,
description = EXCLUDED.description,
environment_location = EXCLUDED.environment_location,
location_name = EXCLUDED.location_name,
marsden_grid_reference = EXCLUDED.marsden_grid_reference,
town_name = EXCLUDED.town_name,
event_type = EXCLUDED.event_type,
event_type_detail = EXCLUDED.event_type_detail,
event_type_detail_id = EXCLUDED.event_type_detail_id,
event_type_id = EXCLUDED.event_type_id,
fired_upon = EXCLUDED.fired_upon,
headline = EXCLUDED.headline,
ldt_at_time = EXCLUDED.ldt_at_time,
significance = EXCLUDED.significance,
weather = EXCLUDED.weather,
pollutant = EXCLUDED.pollutant,
pollutant_quantity = EXCLUDED.pollutant_quantity,
pollutant_unit = EXCLUDED.pollutant_unit,
registered_owner_code_at_time = EXCLUDED.registered_owner_code_at_time,
registered_owner_at_time = EXCLUDED.registered_owner_at_time,
registered_owner_country_code_at_time = EXCLUDED.registered_owner_country_code_at_time,
registered_owner_country_at_time = EXCLUDED.registered_owner_country_at_time,
vessel_dwt = EXCLUDED.vessel_dwt,
vessel_flag_code = EXCLUDED.vessel_flag_code,
vessel_flag_decode = EXCLUDED.vessel_flag_decode,
vessel_gt = EXCLUDED.vessel_gt,
vessel_name = EXCLUDED.vessel_name,
vessel_type = EXCLUDED.vessel_type,
vessel_type_decode = EXCLUDED.vessel_type_decode,
batch_flag = 'N';
""";
}
public static String getEventCargoSql(){
return """
INSERT INTO snp_data.event_cargo (
event_id, "sequence", ihslrorimoshipno, "type", quantity,
unit_short, unit, cargo_damage, dangerous, "text"
)
VALUES (
?, ?, ?, ?, ?,
?, ?, ?, ?, ?
)
ON CONFLICT (event_id, ihslrorimoshipno, "type", "sequence")
DO UPDATE SET
quantity = EXCLUDED.quantity,
unit_short = EXCLUDED.unit_short,
unit = EXCLUDED.unit,
cargo_damage = EXCLUDED.cargo_damage,
dangerous = EXCLUDED.dangerous,
"text" = EXCLUDED."text",
batch_flag = 'N';
""";
}
public static String getEventRelationshipSql(){
return """
INSERT INTO snp_data.event_relationship (
incident_id, event_id, relationship_type, relationship_type_code,
event_id_2, event_type, event_type_code
)
VALUES (
?, ?, ?, ?,
?, ?, ?
)
ON CONFLICT (incident_id, event_id, event_id_2, event_type_code, relationship_type_code)
DO UPDATE SET
relationship_type = EXCLUDED.relationship_type,
event_type = EXCLUDED.event_type,
batch_flag = 'N';
""";
}
public static String getEventHumanCasualtySql(){
return """
INSERT INTO snp_data.event_humancasualty (
event_id, "scope", "type", qualifier, "count"
)
VALUES (
?, ?, ?, ?, ?
)
ON CONFLICT (event_id, "scope", "type", qualifier)
DO UPDATE SET
"count" = EXCLUDED."count",
batch_flag = 'N';
""";
}
}

파일 보기

@ -1,35 +1,49 @@
package com.snp.batch.jobs.event.batch.writer; package com.snp.batch.jobs.event.batch.writer;
import com.snp.batch.common.batch.writer.BaseWriter; import com.snp.batch.common.batch.writer.BaseWriter;
import com.snp.batch.jobs.event.batch.entity.EventEntity; import com.snp.batch.jobs.event.batch.entity.EventDetailEntity;
import com.snp.batch.jobs.event.batch.repository.EventRepository; import com.snp.batch.jobs.event.batch.repository.EventRepository;
import com.snp.batch.service.BatchDateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDate;
import java.util.List; import java.util.List;
@Slf4j @Slf4j
@Component @Component
public class EventDataWriter extends BaseWriter<EventEntity> { public class EventDataWriter extends BaseWriter<EventDetailEntity> {
private final EventRepository eventRepository; private final EventRepository eventRepository;
private final BatchDateService batchDateService; // BatchDateService 필드 추가
protected String getApiKey() {return "EVENT_IMPORT_JOB";} protected String getApiKey() {return "EVENT_IMPORT_JOB";}
public EventDataWriter(EventRepository eventRepository, BatchDateService batchDateService) { public EventDataWriter(EventRepository eventRepository) {
super("EventRepository"); super("EventRepository");
this.eventRepository = eventRepository; this.eventRepository = eventRepository;
this.batchDateService = batchDateService;
} }
@Override @Override
protected void writeItems(List<EventEntity> items) throws Exception { protected void writeItems(List<EventDetailEntity> items) throws Exception {
eventRepository.saveEventAll(items);
log.info("Event 저장 완료: 수정={} 건", items.size());
// 배치 성공 상태 업데이트 (트랜잭션 커밋 직전에 실행) if (CollectionUtils.isEmpty(items)) {
LocalDate successDate = LocalDate.now(); return;
batchDateService.updateLastSuccessDate(getApiKey(), successDate); }
log.info("batch_last_execution update 완료 : {}", getApiKey());
// 1. EventDetail 메인 데이터 저장
eventRepository.saveEventAll(items);
for (EventDetailEntity event : items) {
// 2. CargoEntityList Save
if (!CollectionUtils.isEmpty(event.getCargoes())) {
eventRepository.saveCargoAll(event.getCargoes());
}
// 3. HumanCasualtyEntityList Save
if (!CollectionUtils.isEmpty(event.getHumanCasualties())) {
eventRepository.saveHumanCasualtyAll(event.getHumanCasualties());
}
// 4. RelationshipEntityList Save
if (!CollectionUtils.isEmpty(event.getRelationships())) {
eventRepository.saveRelationshipAll(event.getRelationships());
}
}
log.info("Batch Write 완료: {} 건의 Event 처리됨", items.size());
} }
} }