diff --git a/README.md b/README.md
index 9b29d53cc..b76acd5ba 100644
--- a/README.md
+++ b/README.md
@@ -119,13 +119,13 @@ After setup, it is good to check logs in all related modules for errors. Data im
 
 There are another important properties - `number of partitions` for topics `DI_COMPLETED`, `DI_ERROR`, `DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED`,
 `DI_SRS_MARC_AUTHORITY_RECORD_CREATED`, `DI_SRS_MARC_HOLDING_RECORD_CREATED`, `DI_MARC_FOR_UPDATE_RECEIVED`,
 `DI_MARC_FOR_DELETE_RECEIVED`,
-`DI_INCOMING_MARC_BIB_RECORD_PARSED`, `DI_INCOMING_EDIFACT_RECORD_PARSED`
+`DI_INCOMING_MARC_BIB_RECORD_PARSED`, `DI_INCOMING_EDIFACT_RECORD_PARSED`, `DI_JOB_COMPLETED`
 and `DI_RAW_RECORDS_CHUNK_PARSED`
 which are created during tenant initialization, the values of which can be customized with
 `DI_COMPLETED_PARTITIONS`, `DI_ERROR_PARTITIONS`, `DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED_PARTITIONS`, `DI_SRS_MARC_AUTHORITY_RECORD_CREATED_PARTITIONS`,
 `DI_SRS_MARC_HOLDINGS_RECORD_CREATED_PARTITIONS`, `DI_MARC_FOR_UPDATE_RECEIVED_PARTITIONS`,
 `DI_MARC_FOR_DELETE_RECEIVED_PARTITIONS`,
-`DI_INCOMING_MARC_BIB_RECORD_PARSED_PARTITIONS`, `DI_INCOMING_EDIFACT_RECORD_PARSED_PARTITIONS`
+`DI_INCOMING_MARC_BIB_RECORD_PARSED_PARTITIONS`, `DI_INCOMING_EDIFACT_RECORD_PARSED_PARTITIONS`, `DI_JOB_COMPLETED_PARTITIONS`
 and `DI_RAW_RECORDS_CHUNK_PARSED_PARTITIONS` env variables respectively.
 Default value - `1`.
diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/kafka/SRMKafkaTopicService.java b/mod-source-record-manager-server/src/main/java/org/folio/services/kafka/SRMKafkaTopicService.java
index f875e7b22..f943948af 100644
--- a/mod-source-record-manager-server/src/main/java/org/folio/services/kafka/SRMKafkaTopicService.java
+++ b/mod-source-record-manager-server/src/main/java/org/folio/services/kafka/SRMKafkaTopicService.java
@@ -40,6 +40,9 @@ public class SRMKafkaTopicService {
   @Value("${di_edifact_parsed.partitions}")
   private Integer diEdifactRecordParsedNumPartitions;
 
+  @Value("${di_job_completed.partitions}")
+  private Integer diJobCompletedNumPartitions;
+
   public KafkaTopic[] createTopicObjects() {
     return new SRMKafkaTopic[] {
       new SRMKafkaTopic("DI_COMPLETED", diCompletedNumPartitions),
@@ -51,7 +54,8 @@ public KafkaTopic[] createTopicObjects() {
       new SRMKafkaTopic("DI_MARC_FOR_DELETE_RECEIVED", diMarcForDeleteReceivedNumPartitions),
       new SRMKafkaTopic("DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED", diMarcOrderParsedNumPartitions),
       new SRMKafkaTopic("DI_INCOMING_MARC_BIB_RECORD_PARSED", diMarcBibRecordParsedNumPartitions),
-      new SRMKafkaTopic("DI_INCOMING_EDIFACT_RECORD_PARSED", diEdifactRecordParsedNumPartitions)
+      new SRMKafkaTopic("DI_INCOMING_EDIFACT_RECORD_PARSED", diEdifactRecordParsedNumPartitions),
+      new SRMKafkaTopic("DI_JOB_COMPLETED", diJobCompletedNumPartitions),
     };
   }
 }
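A minimal sketch, not part of this change set, of how the resulting per-tenant topic could be inspected with the plain Kafka `Admin` client: the partition count comes from `di_job_completed.partitions` (see `kafka.properties` below), i.e. the `DI_JOB_COMPLETED_PARTITIONS` env variable documented in the README hunk above. The broker address and the `folio`/`diku` env/tenant values are assumptions; the `{env}.Default.{tenant}.DI_JOB_COMPLETED` naming follows `KafkaAdminClientServiceTest` below.

// Hypothetical helper, not part of this change set: reports how many partitions
// the per-tenant DI_JOB_COMPLETED topic was created with.
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class VerifyDiJobCompletedTopic {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
    String topic = "folio.Default.diku.DI_JOB_COMPLETED"; // assumed env "folio" and tenant "diku"
    try (Admin admin = Admin.create(props)) {
      // Describe the topic and print its partition count.
      TopicDescription description = admin.describeTopics(List.of(topic)).all().get().get(topic);
      System.out.printf("%s has %d partition(s)%n", topic, description.partitions().size());
    }
  }
}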
diff --git a/mod-source-record-manager-server/src/main/java/org/folio/verticle/DataImportJournalBatchConsumerVerticle.java b/mod-source-record-manager-server/src/main/java/org/folio/verticle/DataImportJournalBatchConsumerVerticle.java
index 91ef84682..0f7cfffff 100644
--- a/mod-source-record-manager-server/src/main/java/org/folio/verticle/DataImportJournalBatchConsumerVerticle.java
+++ b/mod-source-record-manager-server/src/main/java/org/folio/verticle/DataImportJournalBatchConsumerVerticle.java
@@ -283,6 +283,7 @@ private Flowable, Collection>> lis
   }
 
   private Completable saveJournalRecords(ConnectableFlowable, Collection>> flowable) {
+    LOGGER.debug("saveJournalRecords:: starting to save records.");
     Completable completable = flowable
       // Filter out pairs with empty records
       .filter(pair -> !pair.getRight().isEmpty())
@@ -333,9 +334,11 @@ private Completable saveJournalRecords(ConnectableFlowable
   }
 
   private Single> createJournalRecords(Bundle bundle) throws JsonProcessingException, JournalRecordMapperException {
+    LOGGER.debug("createJournalRecords :: start to handle bundle.");
     DataImportEventPayloadWithoutCurrentNode eventPayload = bundle.event().getEventPayload();
     String tenantId = bundle.okapiConnectionParams.getTenantId();
-    return AsyncResultSingle.toSingle(eventTypeHandlerSelector.getHandler(eventPayload).transform(batchJournalService.getJournalService(), eventPayload, tenantId),
+    return AsyncResultSingle.toSingle(eventTypeHandlerSelector.getHandler(eventPayload)
+      .transform(batchJournalService.getJournalService(), eventPayload, tenantId),
       col -> col.stream().map(res -> new BatchableJournalRecord(res, tenantId)).toList());
   }
 
diff --git a/mod-source-record-manager-server/src/main/java/org/folio/verticle/JobExecutionProgressVerticle.java b/mod-source-record-manager-server/src/main/java/org/folio/verticle/JobExecutionProgressVerticle.java
index 170b25a63..fed0925d0 100644
--- a/mod-source-record-manager-server/src/main/java/org/folio/verticle/JobExecutionProgressVerticle.java
+++ b/mod-source-record-manager-server/src/main/java/org/folio/verticle/JobExecutionProgressVerticle.java
@@ -3,11 +3,13 @@
 import io.reactivex.rxjava3.core.Completable;
 import io.reactivex.rxjava3.core.Flowable;
 import io.reactivex.rxjava3.core.Maybe;
-import io.reactivex.rxjava3.core.Scheduler;
 import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.core.Scheduler;
 import io.reactivex.rxjava3.flowables.GroupedFlowable;
 import io.vertx.core.Future;
 import io.vertx.core.Promise;
+import io.vertx.core.json.Json;
+import io.vertx.kafka.client.producer.impl.KafkaHeaderImpl;
 import io.vertx.rxjava3.core.AbstractVerticle;
 import io.vertx.rxjava3.core.RxHelper;
 import io.vertx.rxjava3.core.eventbus.EventBus;
@@ -18,6 +20,8 @@
 import org.apache.logging.log4j.Logger;
 import org.folio.dao.JobExecutionProgressDao;
 import org.folio.dataimport.util.OkapiConnectionParams;
+import org.folio.kafka.KafkaConfig;
+import org.folio.kafka.KafkaHeaderUtils;
 import org.folio.rest.jaxrs.model.JobExecution;
 import org.folio.rest.jaxrs.model.JobExecutionDto;
 import org.folio.rest.jaxrs.model.JobExecutionDtoCollection;
@@ -28,6 +32,7 @@
 import org.folio.services.Status;
 import org.folio.services.progress.BatchableJobExecutionProgress;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
@@ -36,12 +41,15 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 import static java.lang.String.format;
+import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_JOB_COMPLETED;
 import static org.folio.rest.jaxrs.model.JobExecution.Status.CANCELLED;
 import static org.folio.rest.jaxrs.model.JobExecution.Status.COMMITTED;
 import static org.folio.services.progress.JobExecutionProgressUtil.BATCH_JOB_PROGRESS_ADDRESS;
+import static org.folio.services.util.EventHandlingUtil.sendEventToKafka;
 import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_PROTOTYPE;
 
 
@@ -53,15 +61,24 @@ public class JobExecutionProgressVerticle extends AbstractVerticle {
   private static final Logger LOGGER = LogManager.getLogger();
 
   private static final int MAX_NUM_EVENTS = 100;
+  private static final int MAX_DISTRIBUTION = 100;
+  private static final String USER_ID_HEADER = "userId";
+  private static final String JOB_EXECUTION_ID_HEADER = "jobExecutionId";
 
   private final JobExecutionProgressDao jobExecutionProgressDao;
   private final JobExecutionService jobExecutionService;
+  private static final AtomicInteger indexer = new AtomicInteger();
   private Scheduler scheduler;
 
-  public JobExecutionProgressVerticle(@Autowired JobExecutionProgressDao jobExecutionProgressDao,
-                                      @Autowired JobExecutionService jobExecutionService) {
+  private final KafkaConfig kafkaConfig;
+
+  @Autowired
+  public JobExecutionProgressVerticle(JobExecutionProgressDao jobExecutionProgressDao,
+                                      JobExecutionService jobExecutionService,
+                                      @Qualifier("newKafkaConfig") KafkaConfig kafkaConfig) {
     this.jobExecutionProgressDao = jobExecutionProgressDao;
     this.jobExecutionService = jobExecutionService;
+    this.kafkaConfig = kafkaConfig;
   }
 
@@ -94,7 +111,7 @@ private void consumeJobExecutionProgress(MessageConsumer
       groupByTenantIdAndJobExecutionId(flowable)
         .map(groupedMessages ->
           reduceManyJobExecutionProgressObjectsIntoSingleJobExecutionProgress(groupedMessages.toList(), groupedMessages.getKey().jobExecutionId()))
-        .flatMapCompletable(progressMaybe -> saveJobExecutionProgress(progressMaybe))
+        .flatMapCompletable(this::saveJobExecutionProgress)
       )
       .subscribeOn(scheduler)
       .observeOn(scheduler)
@@ -239,6 +256,7 @@ private Future updateJobExecutionIfAllRecordsProcessed(String jobExecut
           parentExecution.withStatus(JobExecution.Status.COMMITTED)
             .withUiStatus(JobExecution.UiStatus.RUNNING_COMPLETE)
             .withCompletedDate(new Date());
+          sendDiJobCompletedEvent(parentExecution, params);
           return jobExecutionService.updateJobExecutionWithSnapshotStatus(parentExecution, params);
         }
         return Future.succeededFuture(parentExecution);
@@ -255,6 +273,16 @@ private Future updateJobExecutionIfAllRecordsProcessed(String jobExecut
     return Future.succeededFuture(false);
   }
 
+  private void sendDiJobCompletedEvent(JobExecution jobExecution, OkapiConnectionParams params) {
+    var kafkaHeaders = KafkaHeaderUtils.kafkaHeadersFromMultiMap(params.getHeaders());
+    kafkaHeaders.add(new KafkaHeaderImpl(JOB_EXECUTION_ID_HEADER, jobExecution.getId()));
+    kafkaHeaders.add(new KafkaHeaderImpl(USER_ID_HEADER, jobExecution.getUserId()));
+    var key = String.valueOf(indexer.incrementAndGet() % MAX_DISTRIBUTION);
+    sendEventToKafka(params.getTenantId(), Json.encode(jobExecution), DI_JOB_COMPLETED.value(), kafkaHeaders, kafkaConfig, key)
+      .onSuccess(event -> LOGGER.info("sendDiJobCompletedEvent:: DI_JOB_COMPLETED event published, jobExecutionId={}", jobExecution.getId()))
+      .onFailure(event -> LOGGER.warn("sendDiJobCompletedEvent:: Error publishing DI_JOB_COMPLETED event, jobExecutionId = {}", jobExecution.getId(), event));
+  }
+
   private Future updateJobStatusToError(String jobExecutionId, OkapiConnectionParams params) {
     return jobExecutionService.updateJobExecutionStatus(jobExecutionId, new StatusDto()
       .withStatus(StatusDto.Status.ERROR)
diff --git a/mod-source-record-manager-server/src/main/resources/kafka.properties b/mod-source-record-manager-server/src/main/resources/kafka.properties
index d80ebae8e..6a86da63b 100644
--- a/mod-source-record-manager-server/src/main/resources/kafka.properties
+++ b/mod-source-record-manager-server/src/main/resources/kafka.properties
@@ -9,3 +9,4 @@ di_marc_for_delete_received.partitions = ${DI_MARC_FOR_DELETE_RECEIVED_PARTITION
 di_marc_for_order_parsed.partitions = ${DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED_PARTITIONS:1}
 di_marc_bib_record_parsed.partitions = ${DI_INCOMING_MARC_BIB_RECORD_PARSED_PARTITIONS:1}
 di_edifact_parsed.partitions = ${DI_INCOMING_EDIFACT_RECORD_PARSED_PARTITIONS:1}
+di_job_completed.partitions = ${DI_JOB_COMPLETED_PARTITIONS:1}
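`sendDiJobCompletedEvent` above publishes the serialized `JobExecution` as the payload of the usual `Event` envelope, with `jobExecutionId` and `userId` added as Kafka headers. A minimal sketch of a possible downstream listener follows, assuming the plain Apache Kafka client plus Jackson, a placeholder broker and group id, and placeholder `folio`/`diku` env/tenant values in the topic name; the topic-name pattern and header names come from this change set, everything else is assumed.

// Illustration only, not part of this change set: a possible DI_JOB_COMPLETED listener.
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DiJobCompletedListener {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "di-job-completed-listener"); // assumed group id
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(List.of("folio.Default.diku.DI_JOB_COMPLETED")); // assumed env/tenant
      while (true) {
        for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofSeconds(1))) {
          // The record value is an Event envelope; eventPayload holds the JobExecution JSON.
          JsonNode event = MAPPER.readTree(rec.value());
          JsonNode jobExecution = MAPPER.readTree(event.path("eventPayload").asText());
          // The jobExecutionId is also carried directly as a Kafka header.
          Header idHeader = rec.headers().lastHeader("jobExecutionId");
          String jobExecutionId = idHeader == null
            ? jobExecution.path("id").asText()
            : new String(idHeader.value(), StandardCharsets.UTF_8);
          System.out.printf("Job %s completed with status %s%n",
            jobExecutionId, jobExecution.path("status").asText());
        }
      }
    }
  }
}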
diff --git a/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/AbstractRestTest.java b/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/AbstractRestTest.java
index bd02e0e0b..e17729238 100644
--- a/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/AbstractRestTest.java
+++ b/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/AbstractRestTest.java
@@ -57,6 +57,7 @@
 import org.folio.MappingProfile;
 import org.folio.MatchProfile;
 import org.folio.TestUtil;
+import org.folio.kafka.KafkaConfig;
 import org.folio.kafka.KafkaTopicNameHelper;
 import org.folio.rest.RestVerticle;
 import org.folio.rest.client.TenantClient;
@@ -144,10 +145,11 @@ public abstract class AbstractRestTest {
   private static final String KAFKA_HOST = "KAFKA_HOST";
   private static final String KAFKA_PORT = "KAFKA_PORT";
   private static final String KAFKA_ENV = "ENV";
-  private static final String KAFKA_ENV_VALUE = "test-env";
+  protected static final String KAFKA_ENV_VALUE = "test-env";
   public static final String OKAPI_URL_ENV = "OKAPI_URL";
   private static final int PORT = NetworkUtils.nextFreePort();
   protected static final String OKAPI_URL = "http://localhost:" + PORT;
+  protected static final String JOB_EXECUTION_ID_HEADER = "jobExecutionId";
 
   private final JsonObject userResponse = new JsonObject()
     .put("users",
@@ -342,6 +344,7 @@ public abstract class AbstractRestTest {
   );
 
   public static EmbeddedKafkaCluster kafkaCluster;
+  protected static KafkaConfig kafkaConfig;
 
   @BeforeClass
   public static void setUpClass(final TestContext context) throws Exception {
@@ -356,6 +359,11 @@ public static void setUpClass(final TestContext context) throws Exception {
     System.setProperty(KAFKA_ENV, KAFKA_ENV_VALUE);
     System.setProperty(OKAPI_URL_ENV, OKAPI_URL);
     runDatabase();
+    kafkaConfig = KafkaConfig.builder()
+      .kafkaHost(hostAndPort[0])
+      .kafkaPort(hostAndPort[1])
+      .envId(KAFKA_ENV_VALUE)
+      .build();
     deployVerticle(context);
   }
 
diff --git a/mod-source-record-manager-server/src/test/java/org/folio/services/KafkaAdminClientServiceTest.java b/mod-source-record-manager-server/src/test/java/org/folio/services/KafkaAdminClientServiceTest.java
index 496ee068c..87e27b5ed 100644
--- a/mod-source-record-manager-server/src/test/java/org/folio/services/KafkaAdminClientServiceTest.java
+++ b/mod-source-record-manager-server/src/test/java/org/folio/services/KafkaAdminClientServiceTest.java
@@ -60,7 +60,8 @@ public void setUp() {
       new SRMKafkaTopic("DI_MARC_FOR_DELETE_RECEIVED", 10),
       new SRMKafkaTopic("DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED", 10),
       new SRMKafkaTopic("DI_INCOMING_MARC_BIB_RECORD_PARSED", 10),
-      new SRMKafkaTopic("DI_INCOMING_EDIFACT_RECORD_PARSED", 10)
+      new SRMKafkaTopic("DI_INCOMING_EDIFACT_RECORD_PARSED", 10),
+      new SRMKafkaTopic("DI_JOB_COMPLETED", 10)
     };
 
     when(srmKafkaTopicService.createTopicObjects()).thenReturn(topicObjects);
@@ -157,6 +158,7 @@ private Future createKafkaTopicsAsync(KafkaAdminClient client) {
       env + ".Default.foo-tenant.DI_MARC_FOR_DELETE_RECEIVED",
       env + ".Default.foo-tenant.DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED",
       env + ".Default.foo-tenant.DI_INCOMING_MARC_BIB_RECORD_PARSED",
-      env + ".Default.foo-tenant.DI_INCOMING_EDIFACT_RECORD_PARSED"
+      env + ".Default.foo-tenant.DI_INCOMING_EDIFACT_RECORD_PARSED",
+      env + ".Default.foo-tenant.DI_JOB_COMPLETED"
     );
   }
diff --git a/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java b/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java
index 3703d37bf..90c64345c 100644
--- a/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java
+++ b/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java
@@ -189,7 +189,7 @@ public void setUp() throws IOException {
     MockitoAnnotations.openMocks(this);
     registerCodecs(vertx);
 
-    vertx.deployVerticle(new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService));
+    vertx.deployVerticle(new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService, kafkaConfig));
 
     MappingRuleCache mappingRuleCache = new MappingRuleCache(mappingRuleDao, vertx);
     marcRecordAnalyzer = new MarcRecordAnalyzer();
diff --git a/mod-source-record-manager-server/src/test/java/org/folio/verticle/JobExecutionProgressVerticleTest.java b/mod-source-record-manager-server/src/test/java/org/folio/verticle/JobExecutionProgressVerticleTest.java
index 6ee715153..35601d37c 100644
--- a/mod-source-record-manager-server/src/test/java/org/folio/verticle/JobExecutionProgressVerticleTest.java
+++ b/mod-source-record-manager-server/src/test/java/org/folio/verticle/JobExecutionProgressVerticleTest.java
@@ -3,12 +3,18 @@
 import io.vertx.core.Future;
 import io.vertx.core.Vertx;
 import io.vertx.core.eventbus.MessageProducer;
+import io.vertx.core.json.Json;
 import io.vertx.ext.unit.Async;
 import io.vertx.ext.unit.TestContext;
 import io.vertx.ext.unit.junit.RunTestOnContext;
 import io.vertx.ext.unit.junit.VertxUnitRunner;
+import net.mguenther.kafka.junit.KeyValue;
+import net.mguenther.kafka.junit.ObserveKeyValues;
+import net.mguenther.kafka.junit.SendKeyValues;
 import org.folio.dao.JobExecutionProgressDao;
 import org.folio.dataimport.util.OkapiConnectionParams;
+import org.folio.rest.impl.AbstractRestTest;
+import org.folio.rest.jaxrs.model.Event;
 import org.folio.rest.jaxrs.model.JobExecution;
 import org.folio.rest.jaxrs.model.JobExecutionDto;
 import org.folio.rest.jaxrs.model.JobExecutionDtoCollection;
@@ -32,20 +38,19 @@
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.singletonList;
 import static org.folio.dataimport.util.RestUtil.OKAPI_URL_HEADER;
+import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_JOB_COMPLETED;
 import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
 import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
 import static org.folio.services.progress.JobExecutionProgressUtil.getBatchJobProgressProducer;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
 import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
 
 @RunWith(VertxUnitRunner.class)
-public class JobExecutionProgressVerticleTest {
+public class JobExecutionProgressVerticleTest extends AbstractRestTest {
 
   private final int AWAIT_TIME = 3;
 
@@ -64,13 +69,14 @@ public class JobExecutionProgressVerticleTest {
   private String jobExecutionId;
   private String tenantId;
 
+  @Override
   @Before
   public void setUp(TestContext context) {
     MockitoAnnotations.openMocks(this);
     vertx = rule.vertx();
     vertx.eventBus().registerCodec(new BatchableJobExecutionProgressCodec());
     JobExecutionProgressVerticle jobExecutionProgressVerticle =
-      new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService);
+      new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService, kafkaConfig);
     vertx.deployVerticle(jobExecutionProgressVerticle, context.asyncAssertSuccess());
 
     batchJobProgressProducer = getBatchJobProgressProducer(vertx);
@@ -87,7 +93,7 @@ private OkapiConnectionParams createOkapiConnectionParams(String tenantId) {
   }
 
   @Test
-  public void testSingleProgressUpdate(TestContext context) {
+  public void testSingleProgressUpdate(TestContext context) throws InterruptedException {
     Async async = context.async();
 
     // Arrange
@@ -139,6 +145,10 @@ public void testSingleProgressUpdate(TestContext context) {
         )
       )
     );
+    var topic = formatToKafkaTopicName(DI_JOB_COMPLETED.value());
+    var request = prepareWithSpecifiedEventPayload(Json.encode(parentJobExecution), topic);
+
+    kafkaCluster.send(request);
 
     // Act
     batchJobProgressProducer.write(batchableJobExecutionProgress)
@@ -150,6 +160,9 @@ public void testSingleProgressUpdate(TestContext context) {
           .atMost(AWAIT_TIME, TimeUnit.SECONDS)
           .untilAsserted(() -> verify(jobExecutionProgressDao)
             .updateCompletionCounts(any(), eq(2), eq(1), eq(tenantId)));
+        kafkaCluster.observeValues(ObserveKeyValues.on(topic, 1)
+          .observeFor(30, TimeUnit.SECONDS)
+          .build());
       } catch (Exception e) {
         context.fail(e);
       }
@@ -160,6 +173,17 @@ public void testSingleProgressUpdate(TestContext context) {
     });
   }
 
+  private SendKeyValues prepareWithSpecifiedEventPayload(String eventPayload, String topic) {
+    Event event = new Event().withId(UUID.randomUUID().toString()).withEventPayload(eventPayload);
+    KeyValue kafkaRecord = new KeyValue<>("key", Json.encode(event));
+    kafkaRecord.addHeader(OKAPI_TENANT_HEADER, TENANT_ID, UTF_8);
+    kafkaRecord.addHeader(OKAPI_URL_HEADER, snapshotMockServer.baseUrl(), UTF_8);
+    kafkaRecord.addHeader(JOB_EXECUTION_ID_HEADER, jobExecutionId, UTF_8);
+
+    return SendKeyValues.to(topic, singletonList(kafkaRecord))
+      .useDefaults();
+  }
+
   @Test
   public void testMultipleProgressUpdateShouldBatch(TestContext context) {
     Async async = context.async();
diff --git a/ramls/raml-storage b/ramls/raml-storage
index 8f35e1fa5..ac1fd05b9 160000
--- a/ramls/raml-storage
+++ b/ramls/raml-storage
@@ -1 +1 @@
-Subproject commit 8f35e1fa5902bab4792a2b4a5511e9eb2a860aef
+Subproject commit ac1fd05b9ff5fd1b653c8e12a7d01254d2b4de1c