MODSOURMAN-1246 Data import completion notifications (#946)
* MODSOURMAN-1246 Data import completion notifications

* add test case

* init new kafka topic, cover with tests

* fix code smells

* MODSOURMAN-1248 Missing interface dependencies in module descriptor

* MODSOURMAN-1248 Missing interface dependencies in module descriptor

* fix comment

* MODSOURMAN-1246: change log level mechanism

* MODSOURMAN-1246: DI_MARC_FOR_UPDATE_RECEIVED was added and a few logs

* MODSOURMAN-1246: DI_MARC_FOR_UPDATE_RECEIVED was added

* MODSOURMAN-1246: removing extra changes

* rename log msg

* MODSOURMAN-1246: Remove extra changes

---------

Co-authored-by: aliaksandr_fedasiuk <[email protected]>
JavokhirAbdullayev and Aliaksandr-Fedasiuk authored Nov 15, 2024
1 parent c597b4f commit ac44ce9
Showing 10 changed files with 92 additions and 22 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -119,13 +119,13 @@ After setup, it is good to check logs in all related modules for errors. Data im
There are other important properties - `number of partitions` for topics `DI_COMPLETED`, `DI_ERROR`, `DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED`,
`DI_SRS_MARC_AUTHORITY_RECORD_CREATED`, `DI_SRS_MARC_HOLDING_RECORD_CREATED`, `DI_MARC_FOR_UPDATE_RECEIVED`,
`DI_MARC_FOR_DELETE_RECEIVED`,
`DI_INCOMING_MARC_BIB_RECORD_PARSED`, `DI_INCOMING_EDIFACT_RECORD_PARSED`
`DI_INCOMING_MARC_BIB_RECORD_PARSED`, `DI_INCOMING_EDIFACT_RECORD_PARSED`, `DI_JOB_COMPLETED`
and `DI_RAW_RECORDS_CHUNK_PARSED`
which are created during tenant initialization, the values of which can be customized with
`DI_COMPLETED_PARTITIONS`, `DI_ERROR_PARTITIONS`, `DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED_PARTITIONS`,
`DI_SRS_MARC_AUTHORITY_RECORD_CREATED_PARTITIONS`, `DI_SRS_MARC_HOLDINGS_RECORD_CREATED_PARTITIONS`,
`DI_MARC_FOR_UPDATE_RECEIVED_PARTITIONS`, `DI_MARC_FOR_DELETE_RECEIVED_PARTITIONS`,
`DI_INCOMING_MARC_BIB_RECORD_PARSED_PARTITIONS`, `DI_INCOMING_EDIFACT_RECORD_PARSED_PARTITIONS`
`DI_INCOMING_MARC_BIB_RECORD_PARSED_PARTITIONS`, `DI_INCOMING_EDIFACT_RECORD_PARSED_PARTITIONS`, `DI_JOB_COMPLETED_PARTITIONS`
and `DI_RAW_RECORDS_CHUNK_PARSED_PARTITIONS` env variables respectively.
Default value - `1`.
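
The placeholder syntax used for these settings (visible in the properties file further down) means "take the environment variable if it is set, otherwise fall back to the default". A minimal, illustrative sketch of that resolution logic, not the module's actual code (Spring performs the equivalent substitution at startup):

```java
// Illustrative only: how a "${DI_JOB_COMPLETED_PARTITIONS:1}" style placeholder resolves.
public class PartitionEnvSketch {
  static int partitionsFor(String envVar, int defaultValue) {
    String raw = System.getenv(envVar);                        // e.g. DI_JOB_COMPLETED_PARTITIONS
    return (raw == null || raw.isBlank()) ? defaultValue : Integer.parseInt(raw);
  }

  public static void main(String[] args) {
    // Falls back to 1 when the variable is unset, matching "Default value - 1" above.
    System.out.println(partitionsFor("DI_JOB_COMPLETED_PARTITIONS", 1));
  }
}
```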

@@ -40,6 +40,9 @@ public class SRMKafkaTopicService {
@Value("${di_edifact_parsed.partitions}")
private Integer diEdifactRecordParsedNumPartitions;

@Value("${di_job_completed.partitions}")
private Integer diJobCompletedNumPartitions;

public KafkaTopic[] createTopicObjects() {
return new SRMKafkaTopic[] {
new SRMKafkaTopic("DI_COMPLETED", diCompletedNumPartitions),
@@ -51,7 +54,8 @@ public KafkaTopic[] createTopicObjects() {
new SRMKafkaTopic("DI_MARC_FOR_DELETE_RECEIVED", diMarcForDeleteReceivedNumPartitions),
new SRMKafkaTopic("DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED", diMarcOrderParsedNumPartitions),
new SRMKafkaTopic("DI_INCOMING_MARC_BIB_RECORD_PARSED", diMarcBibRecordParsedNumPartitions),
new SRMKafkaTopic("DI_INCOMING_EDIFACT_RECORD_PARSED", diEdifactRecordParsedNumPartitions)
new SRMKafkaTopic("DI_INCOMING_EDIFACT_RECORD_PARSED", diEdifactRecordParsedNumPartitions),
new SRMKafkaTopic("DI_JOB_COMPLETED", diJobCompletedNumPartitions),
};
}
}
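
For context, a hedged sketch of what the `KafkaTopic` objects above ultimately amount to on the broker side: one topic per tenant and event type, created with the configured partition count. The `Admin` client wiring, broker address, and tenant values below are illustrative assumptions, not the module's actual tenant-init code; only the `{env}.{namespace}.{tenant}.{eventType}` naming convention is taken from the tests in this commit.

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Properties;

public class TopicCreationSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker
    String prefix = "folio.Default.diku.";   // {env}.{namespace}.{tenant}. - illustrative values
    short replicationFactor = 1;             // assumption: single-broker dev cluster
    List<NewTopic> topics = List.of(
        new NewTopic(prefix + "DI_JOB_COMPLETED", 1, replicationFactor),
        new NewTopic(prefix + "DI_COMPLETED", 1, replicationFactor));
    try (Admin admin = Admin.create(props)) {
      admin.createTopics(topics).all().get(); // waits until the broker acknowledges creation
    }
  }
}
```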
@@ -283,6 +283,7 @@ private Flowable<Pair<Optional<Bundle>, Collection<BatchableJournalRecord>>> lis
}

private Completable saveJournalRecords(ConnectableFlowable<Pair<Optional<Bundle>, Collection<BatchableJournalRecord>>> flowable) {
LOGGER.debug("saveJournalRecords:: starting to save records.");
Completable completable = flowable
// Filter out pairs with empty records
.filter(pair -> !pair.getRight().isEmpty())
@@ -333,9 +334,11 @@ }
}

private Single<Collection<BatchableJournalRecord>> createJournalRecords(Bundle bundle) throws JsonProcessingException, JournalRecordMapperException {
LOGGER.debug("createJournalRecords :: start to handle bundle.");
DataImportEventPayloadWithoutCurrentNode eventPayload = bundle.event().getEventPayload();
String tenantId = bundle.okapiConnectionParams.getTenantId();
return AsyncResultSingle.toSingle(eventTypeHandlerSelector.getHandler(eventPayload).transform(batchJournalService.getJournalService(), eventPayload, tenantId),
return AsyncResultSingle.toSingle(eventTypeHandlerSelector.getHandler(eventPayload)
.transform(batchJournalService.getJournalService(), eventPayload, tenantId),
col -> col.stream().map(res -> new BatchableJournalRecord(res, tenantId)).toList());
}

@@ -3,11 +3,13 @@
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.flowables.GroupedFlowable;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.Json;
import io.vertx.kafka.client.producer.impl.KafkaHeaderImpl;
import io.vertx.rxjava3.core.AbstractVerticle;
import io.vertx.rxjava3.core.RxHelper;
import io.vertx.rxjava3.core.eventbus.EventBus;
@@ -18,6 +20,8 @@
import org.apache.logging.log4j.Logger;
import org.folio.dao.JobExecutionProgressDao;
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaHeaderUtils;
import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JobExecutionDto;
import org.folio.rest.jaxrs.model.JobExecutionDtoCollection;
@@ -28,6 +32,7 @@
import org.folio.services.Status;
import org.folio.services.progress.BatchableJobExecutionProgress;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@@ -36,12 +41,15 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static java.lang.String.format;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_JOB_COMPLETED;
import static org.folio.rest.jaxrs.model.JobExecution.Status.CANCELLED;
import static org.folio.rest.jaxrs.model.JobExecution.Status.COMMITTED;
import static org.folio.services.progress.JobExecutionProgressUtil.BATCH_JOB_PROGRESS_ADDRESS;
import static org.folio.services.util.EventHandlingUtil.sendEventToKafka;
import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_PROTOTYPE;


@@ -53,15 +61,24 @@
public class JobExecutionProgressVerticle extends AbstractVerticle {
private static final Logger LOGGER = LogManager.getLogger();
private static final int MAX_NUM_EVENTS = 100;
private static final int MAX_DISTRIBUTION = 100;
private static final String USER_ID_HEADER = "userId";
private static final String JOB_EXECUTION_ID_HEADER = "jobExecutionId";

private final JobExecutionProgressDao jobExecutionProgressDao;
private final JobExecutionService jobExecutionService;
private static final AtomicInteger indexer = new AtomicInteger();
private Scheduler scheduler;

public JobExecutionProgressVerticle(@Autowired JobExecutionProgressDao jobExecutionProgressDao,
@Autowired JobExecutionService jobExecutionService) {
private final KafkaConfig kafkaConfig;

@Autowired
public JobExecutionProgressVerticle(JobExecutionProgressDao jobExecutionProgressDao,
JobExecutionService jobExecutionService,
@Qualifier("newKafkaConfig") KafkaConfig kafkaConfig) {
this.jobExecutionProgressDao = jobExecutionProgressDao;
this.jobExecutionService = jobExecutionService;
this.kafkaConfig = kafkaConfig;
}


@@ -94,7 +111,7 @@ private void consumeJobExecutionProgress(MessageConsumer<BatchableJobExecutionPr
.flatMapCompletable(flowable ->
groupByTenantIdAndJobExecutionId(flowable)
.map(groupedMessages -> reduceManyJobExecutionProgressObjectsIntoSingleJobExecutionProgress(groupedMessages.toList(), groupedMessages.getKey().jobExecutionId()))
.flatMapCompletable(progressMaybe -> saveJobExecutionProgress(progressMaybe))
.flatMapCompletable(this::saveJobExecutionProgress)
)
.subscribeOn(scheduler)
.observeOn(scheduler)
@@ -239,6 +256,7 @@ private Future<Boolean> updateJobExecutionIfAllRecordsProcessed(String jobExecut
parentExecution.withStatus(JobExecution.Status.COMMITTED)
.withUiStatus(JobExecution.UiStatus.RUNNING_COMPLETE)
.withCompletedDate(new Date());
sendDiJobCompletedEvent(parentExecution, params);
return jobExecutionService.updateJobExecutionWithSnapshotStatus(parentExecution, params);
}
return Future.succeededFuture(parentExecution);
@@ -255,6 +273,16 @@ private Future<Boolean> updateJobExecutionIfAllRecordsProcessed(String jobExecut
return Future.succeededFuture(false);
}

private void sendDiJobCompletedEvent(JobExecution jobExecution, OkapiConnectionParams params) {
var kafkaHeaders = KafkaHeaderUtils.kafkaHeadersFromMultiMap(params.getHeaders());
kafkaHeaders.add(new KafkaHeaderImpl(JOB_EXECUTION_ID_HEADER, jobExecution.getId()));
kafkaHeaders.add(new KafkaHeaderImpl(USER_ID_HEADER, jobExecution.getUserId()));
var key = String.valueOf(indexer.incrementAndGet() % MAX_DISTRIBUTION);
sendEventToKafka(params.getTenantId(), Json.encode(jobExecution), DI_JOB_COMPLETED.value(), kafkaHeaders, kafkaConfig, key)
.onSuccess(event -> LOGGER.info("sendDiJobCompletedEvent:: DI_JOB_COMPLETED event published, jobExecutionId={}", jobExecution.getId()))
.onFailure(event -> LOGGER.warn("sendDiJobCompletedEvent:: Error publishing DI_JOB_COMPLETED event, jobExecutionId = {}", jobExecution.getId(), event));
}

private Future<JobExecution> updateJobStatusToError(String jobExecutionId, OkapiConnectionParams params) {
return jobExecutionService.updateJobExecutionStatus(jobExecutionId, new StatusDto()
.withStatus(StatusDto.Status.ERROR)
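The new `DI_JOB_COMPLETED` event gives external modules a single signal that a data import job finished. Below is a sketch of a possible downstream consumer, assuming the record value is the usual `Event` envelope produced by `sendEventToKafka`, with `jobExecutionId` and `userId` carried as record headers as in `sendDiJobCompletedEvent` above; topic name, group id, and broker address are illustrative:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class DiJobCompletedListenerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // illustrative
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "di-job-completed-listener"); // illustrative
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // Tenant-scoped topic following the {env}.{namespace}.{tenant}.{eventType} pattern.
      consumer.subscribe(List.of("folio.Default.diku.DI_JOB_COMPLETED"));
      for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofSeconds(5))) {
        // Header added by sendDiJobCompletedEvent above.
        String jobExecutionId = new String(rec.headers().lastHeader("jobExecutionId").value(), StandardCharsets.UTF_8);
        System.out.printf("job %s completed, raw event: %s%n", jobExecutionId, rec.value());
      }
    }
  }
}
```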
@@ -9,3 +9,4 @@ di_marc_for_delete_received.partitions = ${DI_MARC_FOR_DELETE_RECEIVED_PARTITION
di_marc_for_order_parsed.partitions = ${DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED_PARTITIONS:1}
di_marc_bib_record_parsed.partitions = ${DI_INCOMING_MARC_BIB_RECORD_PARSED_PARTITIONS:1}
di_edifact_parsed.partitions = ${DI_INCOMING_EDIFACT_RECORD_PARSED_PARTITIONS:1}
di_job_completed.partitions = ${DI_JOB_COMPLETED_PARTITIONS:1}
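
A quick way to confirm that a deployment actually picked up an override such as `DI_JOB_COMPLETED_PARTITIONS=4` is to ask the broker for the topic description. A hedged sketch with the plain Kafka `Admin` client (assumes kafka-clients 3.1+ for `allTopicNames()`); the topic name and broker address are illustrative:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.List;
import java.util.Properties;

public class PartitionCheckSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
    String topic = "folio.Default.diku.DI_JOB_COMPLETED";                    // illustrative tenant topic
    try (Admin admin = Admin.create(props)) {
      TopicDescription description = admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic);
      System.out.printf("%s has %d partition(s)%n", topic, description.partitions().size());
    }
  }
}
```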
@@ -57,6 +57,7 @@
import org.folio.MappingProfile;
import org.folio.MatchProfile;
import org.folio.TestUtil;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.rest.RestVerticle;
import org.folio.rest.client.TenantClient;
@@ -144,10 +145,11 @@ public abstract class AbstractRestTest {
private static final String KAFKA_HOST = "KAFKA_HOST";
private static final String KAFKA_PORT = "KAFKA_PORT";
private static final String KAFKA_ENV = "ENV";
private static final String KAFKA_ENV_VALUE = "test-env";
protected static final String KAFKA_ENV_VALUE = "test-env";
public static final String OKAPI_URL_ENV = "OKAPI_URL";
private static final int PORT = NetworkUtils.nextFreePort();
protected static final String OKAPI_URL = "http://localhost:" + PORT;
protected static final String JOB_EXECUTION_ID_HEADER = "jobExecutionId";

private final JsonObject userResponse = new JsonObject()
.put("users",
@@ -342,6 +344,7 @@ public abstract class AbstractRestTest {
);

public static EmbeddedKafkaCluster kafkaCluster;
protected static KafkaConfig kafkaConfig;

@BeforeClass
public static void setUpClass(final TestContext context) throws Exception {
@@ -356,6 +359,11 @@ public static void setUpClass(final TestContext context) throws Exception {
System.setProperty(KAFKA_ENV, KAFKA_ENV_VALUE);
System.setProperty(OKAPI_URL_ENV, OKAPI_URL);
runDatabase();
kafkaConfig = KafkaConfig.builder()
.kafkaHost(hostAndPort[0])
.kafkaPort(hostAndPort[1])
.envId(KAFKA_ENV_VALUE)
.build();
deployVerticle(context);
}

@@ -60,7 +60,8 @@ public void setUp() {
new SRMKafkaTopic("DI_MARC_FOR_DELETE_RECEIVED", 10),
new SRMKafkaTopic("DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED", 10),
new SRMKafkaTopic("DI_INCOMING_MARC_BIB_RECORD_PARSED", 10),
new SRMKafkaTopic("DI_INCOMING_EDIFACT_RECORD_PARSED", 10)
new SRMKafkaTopic("DI_INCOMING_EDIFACT_RECORD_PARSED", 10),
new SRMKafkaTopic("DI_JOB_COMPLETED", 10)
};

when(srmKafkaTopicService.createTopicObjects()).thenReturn(topicObjects);
@@ -157,6 +158,7 @@ private Future<Void> createKafkaTopicsAsync(KafkaAdminClient client) {
env + ".Default.foo-tenant.DI_MARC_FOR_DELETE_RECEIVED",
env + ".Default.foo-tenant.DI_INCOMING_MARC_BIB_FOR_ORDER_PARSED",
env + ".Default.foo-tenant.DI_INCOMING_MARC_BIB_RECORD_PARSED",
env + ".Default.foo-tenant.DI_INCOMING_EDIFACT_RECORD_PARSED"
env + ".Default.foo-tenant.DI_INCOMING_EDIFACT_RECORD_PARSED",
env + ".Default.foo-tenant.DI_JOB_COMPLETED"
);
}
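
The expected names above encode the convention the whole module relies on: `{env}.{namespace}.{tenant}.{eventType}`. A trivial sketch of that composition with illustrative values:

```java
// Minimal sketch of the topic-name convention the assertions above check.
public class TopicNameSketch {
  static String topicName(String env, String namespace, String tenant, String eventType) {
    return String.join(".", env, namespace, tenant, eventType);
  }

  public static void main(String[] args) {
    // e.g. "test-env.Default.foo-tenant.DI_JOB_COMPLETED"
    System.out.println(topicName("test-env", "Default", "foo-tenant", "DI_JOB_COMPLETED"));
  }
}
```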
@@ -189,7 +189,7 @@ public void setUp() throws IOException {
MockitoAnnotations.openMocks(this);

registerCodecs(vertx);
vertx.deployVerticle(new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService));
vertx.deployVerticle(new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService, kafkaConfig));

MappingRuleCache mappingRuleCache = new MappingRuleCache(mappingRuleDao, vertx);
marcRecordAnalyzer = new MarcRecordAnalyzer();
@@ -3,12 +3,18 @@
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.json.Json;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.RunTestOnContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import net.mguenther.kafka.junit.KeyValue;
import net.mguenther.kafka.junit.ObserveKeyValues;
import net.mguenther.kafka.junit.SendKeyValues;
import org.folio.dao.JobExecutionProgressDao;
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.rest.impl.AbstractRestTest;
import org.folio.rest.jaxrs.model.Event;
import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JobExecutionDto;
import org.folio.rest.jaxrs.model.JobExecutionDtoCollection;
@@ -32,20 +38,19 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.singletonList;
import static org.folio.dataimport.util.RestUtil.OKAPI_URL_HEADER;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_JOB_COMPLETED;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
import static org.folio.services.progress.JobExecutionProgressUtil.getBatchJobProgressProducer;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

@RunWith(VertxUnitRunner.class)
public class JobExecutionProgressVerticleTest {
public class JobExecutionProgressVerticleTest extends AbstractRestTest {

private final int AWAIT_TIME = 3;

@@ -64,13 +69,14 @@ public class JobExecutionProgressVerticleTest {
private String jobExecutionId;
private String tenantId;

@Override
@Before
public void setUp(TestContext context) {
MockitoAnnotations.openMocks(this);
vertx = rule.vertx();
vertx.eventBus().registerCodec(new BatchableJobExecutionProgressCodec());
JobExecutionProgressVerticle jobExecutionProgressVerticle =
new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService);
new JobExecutionProgressVerticle(jobExecutionProgressDao, jobExecutionService, kafkaConfig);
vertx.deployVerticle(jobExecutionProgressVerticle,
context.asyncAssertSuccess());
batchJobProgressProducer = getBatchJobProgressProducer(vertx);
@@ -87,7 +93,7 @@ private OkapiConnectionParams createOkapiConnectionParams(String tenantId) {
}

@Test
public void testSingleProgressUpdate(TestContext context) {
public void testSingleProgressUpdate(TestContext context) throws InterruptedException {
Async async = context.async();

// Arrange
@@ -139,6 +145,10 @@ public void testSingleProgressUpdate(TestContext context) {
)
)
);
var topic = formatToKafkaTopicName(DI_JOB_COMPLETED.value());
var request = prepareWithSpecifiedEventPayload(Json.encode(parentJobExecution), topic);

kafkaCluster.send(request);

// Act
batchJobProgressProducer.write(batchableJobExecutionProgress)
Expand All @@ -150,6 +160,9 @@ public void testSingleProgressUpdate(TestContext context) {
.atMost(AWAIT_TIME, TimeUnit.SECONDS)
.untilAsserted(() -> verify(jobExecutionProgressDao)
.updateCompletionCounts(any(), eq(2), eq(1), eq(tenantId)));
kafkaCluster.observeValues(ObserveKeyValues.on(topic, 1)
.observeFor(30, TimeUnit.SECONDS)
.build());
} catch (Exception e) {
context.fail(e);
}
Expand All @@ -160,6 +173,17 @@ public void testSingleProgressUpdate(TestContext context) {
});
}

private SendKeyValues<String, String> prepareWithSpecifiedEventPayload(String eventPayload, String topic) {
Event event = new Event().withId(UUID.randomUUID().toString()).withEventPayload(eventPayload);
KeyValue<String, String> kafkaRecord = new KeyValue<>("key", Json.encode(event));
kafkaRecord.addHeader(OKAPI_TENANT_HEADER, TENANT_ID, UTF_8);
kafkaRecord.addHeader(OKAPI_URL_HEADER, snapshotMockServer.baseUrl(), UTF_8);
kafkaRecord.addHeader(JOB_EXECUTION_ID_HEADER, jobExecutionId, UTF_8);

return SendKeyValues.to(topic, singletonList(kafkaRecord))
.useDefaults();
}

@Test
public void testMultipleProgressUpdateShouldBatch(TestContext context) {
Async async = context.async();
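The assertion added in `testSingleProgressUpdate` only checks that a `DI_JOB_COMPLETED` record shows up on the topic. A possible follow-up check, sketched under the assumption that the record value is the standard `Event` envelope whose payload is the serialized `JobExecution` (hypothetical test code, not part of this commit):

```java
// Could be appended inside the try block of testSingleProgressUpdate.
var values = kafkaCluster.observeValues(ObserveKeyValues.on(topic, 1)
    .observeFor(30, TimeUnit.SECONDS)
    .build());
Event observed = Json.decodeValue(values.get(0), Event.class);
JobExecution completed = Json.decodeValue(observed.getEventPayload(), JobExecution.class);
context.assertEquals(parentJobExecution.getId(), completed.getId());
```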
2 changes: 1 addition & 1 deletion ramls/raml-storage
