MODSOURCE-727 - Provide central tenant id in DI payload to ensure instance update on central tenant after instance hrid has been set to record (#585)
RuslanLavrov authored Dec 3, 2023
1 parent 9ae1632 commit 66ddcba
Showing 3 changed files with 10 additions and 13 deletions.
File 1 of 3
@@ -42,7 +42,6 @@ public class ParsedRecordChunksKafkaHandler implements AsyncRecordHandler<String
 private static final Logger LOGGER = LogManager.getLogger();

 public static final String JOB_EXECUTION_ID_HEADER = "jobExecutionId";
-private static final String RECORD_ID_HEADER = "recordId";
 private static final String CHUNK_ID_HEADER = "chunkId";
 private static final String USER_ID_HEADER = "userId";
 private static final AtomicInteger chunkCounter = new AtomicInteger();
@@ -77,7 +76,6 @@ public Future<String> handle(KafkaConsumerRecord<String, String> targetRecord) {
 OkapiConnectionParams okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
 String tenantId = okapiConnectionParams.getTenantId();
 String jobExecutionId = extractHeaderValue(JOB_EXECUTION_ID_HEADER, targetRecord.headers());
-String recordId = extractHeaderValue(RECORD_ID_HEADER, targetRecord.headers());
 String chunkId = extractHeaderValue(CHUNK_ID_HEADER, targetRecord.headers());
 String userId = extractHeaderValue(USER_ID_HEADER, targetRecord.headers());
 String key = targetRecord.key();
@@ -86,14 +84,14 @@ public Future<String> handle(KafkaConsumerRecord<String, String> targetRecord) {
 DataImportEventPayload eventPayload = Json.decodeValue(event.getEventPayload(), DataImportEventPayload.class);

 try {
-LOGGER.debug("handle:: RecordCollection has been received with event: '{}', jobExecutionId '{}', chunkId: '{}', starting processing... chunkNumber '{}'-'{}' with recordId: '{}'' ",
-eventPayload.getEventType(), jobExecutionId, chunkId, chunkNumber, key, recordId);
+LOGGER.debug("handle:: RecordCollection has been received with event: '{}', jobExecutionId '{}', chunkId: '{}', starting processing... chunkNumber '{}'-'{}'",
+eventPayload.getEventType(), jobExecutionId, chunkId, chunkNumber, key);
 setUserMetadata(recordCollection, userId);
 return recordService.saveRecords(recordCollection, tenantId)
 .compose(recordsBatchResponse -> sendBackRecordsBatchResponse(recordsBatchResponse, kafkaHeaders, tenantId, chunkNumber, eventPayload.getEventType(), targetRecord));
 } catch (Exception e) {
-LOGGER.warn("handle:: RecordCollection processing has failed with errors with event: '{}', jobExecutionId '{}', chunkId: '{}', chunkNumber '{}'-'{}' with recordId: '{}' ",
-eventPayload.getEventType(), jobExecutionId, chunkId, chunkNumber, key, recordId);
+LOGGER.warn("handle:: RecordCollection processing has failed with errors with event: '{}', jobExecutionId '{}', chunkId: '{}', chunkNumber '{}'-'{}'",
+eventPayload.getEventType(), jobExecutionId, chunkId, chunkNumber, key);
 return Future.failedFuture(e);
 }
 }
@@ -132,10 +130,9 @@ private Future<String> sendBackRecordsBatchResponse(RecordsBatchResponse records
 .<Void>mapEmpty()
 .eventually(x -> producer.close())
 .onSuccess(res -> {
-String recordId = extractHeaderValue(RECORD_ID_HEADER, commonRecord.headers());
 String chunkId = extractHeaderValue(CHUNK_ID_HEADER, commonRecord.headers());
-LOGGER.debug("sendBackRecordsBatchResponse:: RecordCollection processing has been completed with response sent... event: '{}', chunkId: '{}', chunkNumber '{}'-'{}' with recordId: '{}'",
-eventType, chunkId, chunkNumber, targetRecord.key(), recordId);
+LOGGER.debug("sendBackRecordsBatchResponse:: RecordCollection processing has been completed with response sent... event: '{}', chunkId: '{}', chunkNumber '{}'-'{}'",
+eventType, chunkId, chunkNumber, targetRecord.key());
 writePromise.complete(targetRecord.key());
 })
 .onFailure(err -> {
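For orientation: the chunkId, userId, and jobExecutionId values above are resolved from Kafka record headers through extractHeaderValue, which is not part of this diff. A minimal sketch of such a helper, assuming Vert.x KafkaHeader (io.vertx.kafka.client.producer.KafkaHeader) and that a missing header yields null; the real utility in the repository may differ:

  // Hypothetical helper, not from this commit: returns the value of the first header
  // with the given key, or null when the header is absent.
  // Assumes java.util.List and io.vertx.kafka.client.producer.KafkaHeader.
  private static String extractHeaderValue(String headerName, List<KafkaHeader> headers) {
    return headers.stream()
      .filter(header -> headerName.equals(header.key()))
      .findFirst()
      .map(header -> header.value().toString())
      .orElse(null);
  }

With the RECORD_ID_HEADER constant removed, the handler no longer reads a per-chunk recordId at all; the log messages identify a chunk by jobExecutionId, chunkId, chunkNumber, and the record key instead.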
File 2 of 3
@@ -338,7 +338,6 @@ private Future<Record> saveRecordForCentralTenant(DataImportEventPayload dataImp
 record, String jobExecutionId) {
 String centralTenantId = dataImportEventPayload.getContext().get(CENTRAL_TENANT_ID);
 dataImportEventPayload.getContext().remove(CENTRAL_TENANT_INSTANCE_UPDATED_FLAG);
-dataImportEventPayload.getContext().remove(CENTRAL_TENANT_ID);
 LOG.info("handle:: Processing AbstractPostProcessingEventHandler - saving record by jobExecutionId: {} for the central tenantId: {}", jobExecutionId, centralTenantId);
 if (centralTenantId != null) {
 return snapshotService.copySnapshotToOtherTenant(record.getSnapshotId(), dataImportEventPayload.getTenant(), centralTenantId)
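The single deleted line above is the core of the fix: CENTRAL_TENANT_ID is no longer stripped from the payload context, so event handlers that run after the instance hrid has been set to the record can still address the central tenant. A minimal sketch of how a downstream step might use the retained key, under the assumption that DataImportEventPayload.getContext() returns the string map shown above (method and names are illustrative, not taken from this commit):

  // Illustrative only: pick the central tenant when the payload still carries one,
  // otherwise fall back to the local tenant of the payload.
  private String resolveTargetTenant(DataImportEventPayload payload) {
    String centralTenantId = payload.getContext().get(CENTRAL_TENANT_ID);
    return centralTenantId != null ? centralTenantId : payload.getTenant();
  }

Note that CENTRAL_TENANT_INSTANCE_UPDATED_FLAG is still removed, so only the tenant id, not the one-shot flag, survives past this handler.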
File 3 of 3
@@ -190,7 +190,7 @@ public void shouldSetInstanceIdToRecord(TestContext context) {
 }

 @Test
-public void shouldProceedIfConosrtiumTrackExists(TestContext context) {
+public void shouldProceedIfConsortiumTrackExists(TestContext context) {
 MockitoAnnotations.openMocks(this);

 Async async = context.async();
@@ -213,6 +213,7 @@ public void shouldProceedIfConosrtiumTrackExists(TestContext context) {

 String expectedInstanceId = UUID.randomUUID().toString();
 String expectedHrId = UUID.randomUUID().toString();
+String expectedCentralTenantId = "centralTenantId";

 JsonObject instance = createExternalEntity(expectedInstanceId, expectedHrId);

@@ -221,7 +222,7 @@
 payloadContext.put(MARC_BIBLIOGRAPHIC.value(), Json.encode(record));
 payloadContext.put("recordId", record.getId());
 payloadContext.put(CENTRAL_TENANT_INSTANCE_UPDATED_FLAG, "true");
-payloadContext.put(CENTRAL_TENANT_ID, "centralTenantId");
+payloadContext.put(CENTRAL_TENANT_ID, expectedCentralTenantId);

 DataImportEventPayload dataImportEventPayload =
 createDataImportEventPayload(payloadContext, DI_INVENTORY_INSTANCE_CREATED_READY_FOR_POST_PROCESSING);
@@ -238,7 +239,7 @@ public void shouldProceedIfConosrtiumTrackExists(TestContext context) {
 }
 verify(mockedRecordService, times(1)).updateParsedRecord(any(), anyString());
 context.assertNull(payload.getContext().get(CENTRAL_TENANT_INSTANCE_UPDATED_FLAG));
-context.assertNull(payload.getContext().get(CENTRAL_TENANT_ID));
+context.assertEquals(expectedCentralTenantId, payload.getContext().get(CENTRAL_TENANT_ID));
 async.complete();
 });
 }
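The updated assertion mirrors the handler change: the test now expects CENTRAL_TENANT_ID to survive in the payload context while the updated flag is still cleared. The instance JSON it feeds in comes from a local helper, createExternalEntity, which is outside this diff; a plausible minimal shape for it, assuming the handler only needs the id and hrid fields (hypothetical reconstruction, the real test utility may set more fields):

  // Hypothetical sketch of the test helper used above, not taken from this commit.
  // Assumes io.vertx.core.json.JsonObject.
  private JsonObject createExternalEntity(String id, String hrid) {
    return new JsonObject()
      .put("id", id)
      .put("hrid", hrid);
  }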
