Skip to content

Commit

Permalink
MODSOURMAN-1246: Added handling of the DI_MARC_FOR_UPDATE_RECEIVED event and additional debug logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Aliaksandr-Fedasiuk committed Nov 14, 2024
1 parent cb785df commit ccbc8f7
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,36 +54,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_COMPLETED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_ERROR;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_AUTHORITY_NOT_MATCHED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_AUTHORITY_UPDATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_HOLDING_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_HOLDING_MATCHED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_HOLDING_NOT_MATCHED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_HOLDING_UPDATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_INSTANCE_MATCHED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_INSTANCE_NOT_MATCHED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_ITEM_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_ITEM_MATCHED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_ITEM_NOT_MATCHED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_ITEM_UPDATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVOICE_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_LOG_SRS_MARC_AUTHORITY_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_LOG_SRS_MARC_BIB_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_ORDER_CREATED_READY_FOR_POST_PROCESSING;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_NOT_MATCHED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_MODIFIED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_NOT_MATCHED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_UPDATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_MATCHED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_UPDATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_SRS_MARC_HOLDING_RECORD_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.*;
import static org.folio.services.RecordsPublishingServiceImpl.RECORD_ID_HEADER;
import static org.folio.services.util.EventHandlingUtil.constructModuleName;
import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_PROTOTYPE;
Expand Down Expand Up @@ -158,6 +129,7 @@ public List<String> getEvents() {
DI_SRS_MARC_AUTHORITY_RECORD_NOT_MATCHED.value(),
DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED.value(),
DI_SRS_MARC_HOLDINGS_RECORD_MATCHED.value(),
DI_MARC_FOR_UPDATE_RECEIVED.value(),
DI_INVENTORY_INSTANCE_CREATED.value(),
DI_INVENTORY_INSTANCE_UPDATED.value(),
DI_INVENTORY_INSTANCE_NOT_MATCHED.value(),
Expand Down Expand Up @@ -245,11 +217,11 @@ private Flowable<Pair<Optional<Bundle>, Collection<BatchableJournalRecord>>> lis
String recordId = okapiConnectionParams.getHeaders().get(RECORD_ID_HEADER);
JournalEvent event = DatabindCodec.mapper().readValue(consumerRecord.value(), JournalEvent.class);

LOGGER.debug("handle:: Event was received with recordId: {} event type: {}", recordId, event.getEventType());
LOGGER.debug("listenKafkaEvents:: Event was received with recordId: {} event type: {}", recordId, event.getEventType());
// Successfully create and return a Bundle object containing the record and event details
return Optional.of(new Bundle(consumerRecord, event, okapiConnectionParams));
} catch (Exception e) {
LOGGER.error("Error processing Kafka event with exception: {}", e.getMessage());
LOGGER.error("listenKafkaEvents:: Error processing Kafka event with exception: {}", e.getMessage());
// Return empty Optional to skip this record and continue processing
return Optional.<Bundle>empty();
}
Expand Down Expand Up @@ -283,6 +255,7 @@ private Flowable<Pair<Optional<Bundle>, Collection<BatchableJournalRecord>>> lis
}

private Completable saveJournalRecords(ConnectableFlowable<Pair<Optional<Bundle>, Collection<BatchableJournalRecord>>> flowable) {
LOGGER.debug("saveJournalRecords:: starting to save records.");
Completable completable = flowable
// Filter out pairs with empty records
.filter(pair -> !pair.getRight().isEmpty())
Expand Down Expand Up @@ -333,9 +306,11 @@ private Completable saveJournalRecords(ConnectableFlowable<Pair<Optional<Bundle>
}

private Single<Collection<BatchableJournalRecord>> createJournalRecords(Bundle bundle) throws JsonProcessingException, JournalRecordMapperException {
LOGGER.debug("createJournalRecords :: start to handle bundle.");
DataImportEventPayloadWithoutCurrentNode eventPayload = bundle.event().getEventPayload();
String tenantId = bundle.okapiConnectionParams.getTenantId();
return AsyncResultSingle.toSingle(eventTypeHandlerSelector.getHandler(eventPayload).transform(batchJournalService.getJournalService(), eventPayload, tenantId),
return AsyncResultSingle.toSingle(eventTypeHandlerSelector.getHandler(eventPayload)
.transform(batchJournalService.getJournalService(), eventPayload, tenantId),
col -> col.stream().map(res -> new BatchableJournalRecord(res, tenantId)).toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public Future<String> handle(KafkaConsumerRecord<String, byte[]> record) {
}

private void saveCreatedRecordsInfoToDataImportLog(List<Record> storedRecords, String tenantId) {
LOGGER.debug("saveCreatedRecordsInfoToDataImportLog :: count: {}", storedRecords.size());
MappingRuleCacheKey cacheKey = new MappingRuleCacheKey(tenantId, storedRecords.get(0).getRecordType());

mappingRuleCache.get(cacheKey).onComplete(rulesAr -> {
Expand Down

0 comments on commit ccbc8f7

Please sign in to comment.