Fixed handling of response deserialization error (#836)
* Fixed response deserialization error handling

* Added handling of possible deserialization error
RuslanLavrov authored Jan 9, 2024
1 parent 94df65b commit 5ae52c7
Showing 6 changed files with 126 additions and 380 deletions.

Large diffs are not rendered by default.
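
The changes below all follow one pattern: payload deserialization in the data-import Kafka handlers is moved inside a try/catch, and any error is returned as a failed Future instead of escaping the handler as an uncaught exception. The following minimal sketch illustrates that pattern outside the module; it is not part of this commit, the Payload class and handle method are placeholders, and it assumes jackson-databind is on the classpath (Vert.x needs it to map JSON to POJOs).

import io.vertx.core.Future;
import io.vertx.core.json.Json;

public class SafeDecodeSketch {

  // Hypothetical payload type, used only for this illustration.
  public static class Payload {
    public String jobExecutionId;
  }

  // Same shape as the handlers below: decode inside try/catch, fail the Future on error.
  public static Future<String> handle(String rawValue) {
    try {
      // Json.decodeValue throws io.vertx.core.json.DecodeException (a RuntimeException) on bad input.
      Payload payload = Json.decodeValue(rawValue, Payload.class);
      return Future.succeededFuture(payload.jobExecutionId);
    } catch (Exception e) {
      // The consumer sees a failed Future rather than an uncaught exception.
      return Future.failedFuture(e);
    }
  }

  public static void main(String[] args) {
    handle("{\"jobExecutionId\":\"42\"}").onSuccess(id -> System.out.println("decoded: " + id));
    handle("not json at all").onFailure(err -> System.out.println("failed: " + err.getMessage()));
  }
}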

@@ -2,6 +2,7 @@
 
 import io.vertx.core.Future;
 import io.vertx.core.Vertx;
+import io.vertx.core.json.DecodeException;
 import io.vertx.core.json.Json;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
 import io.vertx.kafka.client.producer.KafkaHeader;
@@ -39,14 +40,19 @@ public DataImportInitKafkaHandler(@Autowired Vertx vertx,
 
   @Override
   public Future<String> handle(KafkaConsumerRecord<String, String> record) {
-    List<KafkaHeader> kafkaHeaders = record.headers();
-    OkapiConnectionParams okapiParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
-    Event event = Json.decodeValue(record.value(), Event.class);
-    DataImportInitConfig initConfig = Json.decodeValue(event.getEventPayload(), DataImportInitConfig.class);
+    try {
+      List<KafkaHeader> kafkaHeaders = record.headers();
+      OkapiConnectionParams okapiParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
+      Event event = Json.decodeValue(record.value(), Event.class);
+      DataImportInitConfig initConfig = Json.decodeValue(event.getEventPayload(), DataImportInitConfig.class);
 
-    return jobExecutionProgressService.initializeJobExecutionProgress(initConfig.getJobExecutionId(), initConfig.getTotalRecords(), okapiParams.getTenantId())
-      .compose(p -> checkAndUpdateToInProgressState(initConfig.getJobExecutionId(), okapiParams))
-      .compose(p -> Future.succeededFuture(record.key()));
+      return jobExecutionProgressService.initializeJobExecutionProgress(initConfig.getJobExecutionId(), initConfig.getTotalRecords(), okapiParams.getTenantId())
+        .compose(p -> checkAndUpdateToInProgressState(initConfig.getJobExecutionId(), okapiParams))
+        .compose(p -> Future.succeededFuture(record.key()));
+    } catch (Exception e) {
+      LOGGER.warn("handle:: Error during processing event for import job progress initialization", e);
+      return Future.failedFuture(e);
+    }
   }
 
   private Future<JobExecution> checkAndUpdateToInProgressState(String jobExecutionId, OkapiConnectionParams params) {
@@ -4,6 +4,7 @@
 import io.vertx.core.Future;
 import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
+import io.vertx.core.json.Json;
 import io.vertx.core.json.JsonObject;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
 import io.vertx.kafka.client.producer.KafkaHeader;
@@ -49,18 +50,23 @@ public DataImportJournalKafkaHandler(@Autowired Vertx vertx,
 
   @Override
   public Future<String> handle(KafkaConsumerRecord<String, String> record) {
-    Promise<String> result = Promise.promise();
-    List<KafkaHeader> kafkaHeaders = record.headers();
-    OkapiConnectionParams okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
-    String recordId = okapiConnectionParams.getHeaders().get(RECORD_ID_HEADER);
-    Event event = new JsonObject(record.value()).mapTo(Event.class);
-    LOGGER.debug("handle:: Event was received with recordId: {} event type: {}", recordId, event.getEventType());
+    try {
+      Promise<String> result = Promise.promise();
+      List<KafkaHeader> kafkaHeaders = record.headers();
+      OkapiConnectionParams okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
+      String recordId = okapiConnectionParams.getHeaders().get(RECORD_ID_HEADER);
+      Event event = Json.decodeValue(record.value(), Event.class);
+      LOGGER.debug("handle:: Event was received with recordId: {} event type: {}", recordId, event.getEventType());
 
-    eventProcessedService.collectData(DATA_IMPORT_JOURNAL_KAFKA_HANDLER_UUID, event.getId(), okapiConnectionParams.getTenantId())
-      .onSuccess(res -> processJournalEvent(result, record, event, okapiConnectionParams.getTenantId()))
-      .onFailure(e -> processDeduplicationFailure(result, record, event, e));
+      eventProcessedService.collectData(DATA_IMPORT_JOURNAL_KAFKA_HANDLER_UUID, event.getId(), okapiConnectionParams.getTenantId())
+        .onSuccess(res -> processJournalEvent(result, record, event, okapiConnectionParams.getTenantId()))
+        .onFailure(e -> processDeduplicationFailure(result, record, event, e));
 
-    return result.future();
+      return result.future();
+    } catch (Exception e) {
+      LOGGER.warn("handle:: Error during processing event for data-import journal", e);
+      return Future.failedFuture(e);
+    }
   }
 
   private void processJournalEvent(Promise<String> result, KafkaConsumerRecord<String, String> record, Event event, String tenantId) {
@@ -69,7 +69,7 @@ public Future<String> handle(KafkaConsumerRecord<String, String> record) {
         Event event = Json.decodeValue(record.value(), Event.class);
         LOGGER.debug("handle:: Starting to handle of raw mark chunks from Kafka for event type: {}", event.getEventType());
         try {
-          RawRecordsDto rawRecordsDto = new JsonObject(event.getEventPayload()).mapTo(RawRecordsDto.class);
+          RawRecordsDto rawRecordsDto = Json.decodeValue(event.getEventPayload(), RawRecordsDto.class);
           if (!rawRecordsDto.getRecordsMetadata().getLast()) {
             flowControlService.trackChunkReceivedEvent(okapiParams.getTenantId(), rawRecordsDto.getInitialRecords().size());
           }
@@ -99,7 +99,7 @@ public Future<String> handle(KafkaConsumerRecord<String, String> record) {
             }
           });
         } catch (Exception e) {
-          LOGGER.warn("handle:: Can't process kafka record: ", e);
+          LOGGER.warn("handle:: Can't process kafka record, jobExecutionId: {}", jobExecutionId, e);
           return new FailedFuture<String>(e);
         }
       })
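
The file above swaps new JsonObject(event.getEventPayload()).mapTo(RawRecordsDto.class) for Json.decodeValue(event.getEventPayload(), RawRecordsDto.class). Both calls throw unchecked exceptions on a bad payload, so either form is covered by the surrounding catch (Exception e); the decodeValue form parses and maps in one step and reports both kinds of failure as DecodeException. A small standalone comparison, not part of this commit, using a hypothetical Dto class in place of RawRecordsDto (like the earlier sketch, it assumes jackson-databind on the classpath):

import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;

public class DecodeComparison {

  // Hypothetical DTO standing in for RawRecordsDto.
  public static class Dto {
    public Integer total;
  }

  public static void main(String[] args) {
    String malformed = "{oops";                          // not valid JSON at all
    String unmappable = "{\"total\":\"not-a-number\"}";  // valid JSON, wrong shape for Dto

    // Old style: parse, then map; parse errors and mapping errors surface as different exception types.
    try {
      new JsonObject(unmappable).mapTo(Dto.class);
    } catch (RuntimeException e) {
      System.out.println("mapTo: " + e.getClass().getSimpleName());
    }

    // New style: one call; both kinds of failure surface as io.vertx.core.json.DecodeException.
    for (String payload : new String[]{malformed, unmappable}) {
      try {
        Json.decodeValue(payload, Dto.class);
      } catch (RuntimeException e) {
        System.out.println("decodeValue: " + e.getClass().getSimpleName());
      }
    }
  }
}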
@@ -144,7 +144,7 @@ public Future<String> handle(KafkaConsumerRecord<String, String> record) {
             });
           });
         } catch (Exception e) {
-          LOGGER.warn("handle:: Can't process kafka record: ", e);
+          LOGGER.warn("handle:: Can't process kafka record, jobExecutionId: {}", jobExecutionId, e);
           return new FailedFuture<String>(e);
         }
       })
@@ -250,6 +250,20 @@ public void shouldReturnEmptyLinkingRulesOnEmptyApiResponseGetItemFromCache(Test
     });
   }
 
+  @Test
+  public void shouldReturnNullWhenMappingParamsDataResponseIsUnrecognized(TestContext context) {
+    Async async = context.async();
+    JsonObject invalidNoteTypes = new JsonObject()
+      .put("instanceNoteTypes", JsonArray.of(new JsonObject().put("invalidField", "value")));
+    WireMock.stubFor(get(INSTANCE_NOTE_TYPES_URL)
+      .willReturn(okJson(invalidNoteTypes.encode())));
+
+    mappingParametersProvider.get("1", okapiConnectionParams).onComplete(ar -> {
+      context.assertNull(ar.result());
+      async.complete();
+    });
+  }
+
   /**
    * Test that multiple requests to get the same item concurrently do not attempt to load mapping parameters concurrently.
    * only one load should occur. All requests for the same cache key should return the same cache value.
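The test added above stubs the instance note types endpoint with a body the provider cannot map and only asserts that ar.result() is null, so callers of the provider cannot rely on always receiving populated mapping parameters. A hedged, self-contained sketch of defensive caller-side handling follows; the provider call is replaced by a stand-in that yields null, which is the one outcome the test pins down, and the class and method names are illustrative only:

import io.vertx.core.Future;

public class NullSafeCallerSketch {

  // Stand-in for a provider lookup; yields null, mirroring what the new test asserts
  // when the upstream response cannot be deserialized.
  static Future<Object> getMappingParameters(String jobExecutionId) {
    return Future.succeededFuture(null);
  }

  public static void main(String[] args) {
    String jobExecutionId = "1";
    getMappingParameters(jobExecutionId).onComplete(ar -> {
      if (ar.failed() || ar.result() == null) {
        // Mapping parameters are unavailable, e.g. a response failed to deserialize upstream.
        System.out.println("Mapping parameters unavailable for jobExecutionId: " + jobExecutionId);
        return;
      }
      System.out.println("Mapping parameters loaded: " + ar.result());
    });
  }
}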
