Skip to content

Commit

Permalink
MODDATAIMP-1065 The imported file is displayed in the 'Running' secti…
Browse files Browse the repository at this point in the history
…on even it has been imported and displayed in the logs (#770)

* send di error when token expired

* small refactor

* add logging

* add // TODO: possible wrong placement of logs for entity when cache error
  • Loading branch information
JavokhirAbdullayev authored Oct 8, 2024
1 parent 399c2e6 commit 2026301
Showing 1 changed file with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,21 @@ public DataImportKafkaHandler(Vertx vertx, Storage storage, HttpClient client,
registerDataImportProcessingHandlers(storage, client);
}

// TODO: possible wrong placement of entity logs when cache error
private void sendPayloadWithDiError(DataImportEventPayload eventPayload) {
eventPayload.setEventType(DI_ERROR.value());
try (var eventPublisher = new KafkaEventPublisher(kafkaConfig, vertx, 100)) {
eventPublisher.publish(eventPayload);
var eventType = eventPayload.getEventType();
LOGGER.warn("publish:: {} send error for event: '{}' by jobExecutionId: '{}' ",
eventType + "_Producer",
eventType,
eventPayload.getJobExecutionId());
} catch (Exception e) {
LOGGER.error("Error closing kafka publisher: {}", e.getMessage());
}
}

@Override
public Future<String> handle(KafkaConsumerRecord<String, String> record) {
try {
Expand All @@ -134,6 +149,7 @@ public Future<String> handle(KafkaConsumerRecord<String, String> record) {
Context context = EventHandlingUtil.constructContext(eventPayload.getTenant(), eventPayload.getToken(), eventPayload.getOkapiUrl());
String jobProfileSnapshotId = eventPayload.getContext().get(PROFILE_SNAPSHOT_ID_KEY);
profileSnapshotCache.get(jobProfileSnapshotId, context)
.onFailure(e -> sendPayloadWithDiError(eventPayload))
.toCompletionStage()
.thenCompose(snapshotOptional -> snapshotOptional
.map(profileSnapshot -> EventManager.handleEvent(eventPayload, profileSnapshot))
Expand Down

0 comments on commit 2026301

Please sign in to comment.