Skip to content

Commit

Permalink
MODSOURCE-707 test
Browse files Browse the repository at this point in the history
  • Loading branch information
psmagin committed Dec 18, 2023
1 parent 66ddcba commit 2036fff
Show file tree
Hide file tree
Showing 17 changed files with 648 additions and 315 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.folio;


import org.folio.kafka.services.KafkaTopic;

/**
 * Kafka topic descriptor for authority domain events.
 * Composes topic names under the {@code authorities} module prefix.
 */
public enum AuthorityDomainKafkaTopic implements KafkaTopic {

  AUTHORITY("authority");

  private static final String AUTHORITIES_PREFIX = "authorities";

  // Raw topic-name segment associated with this enum constant.
  private final String topicName;

  AuthorityDomainKafkaTopic(String topicName) {
    this.topicName = topicName;
  }

  /** Module prefix used when building the fully-qualified topic name. */
  @Override
  public String moduleName() {
    return AUTHORITIES_PREFIX;
  }

  /** Topic-name segment for this constant (e.g. {@code authority}). */
  @Override
  public String topicName() {
    return this.topicName;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package org.folio.consumers;

import static org.folio.dao.util.RecordDaoUtil.filterRecordByExternalId;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.core.type.TypeReference;
import io.vertx.core.Future;
import io.vertx.core.json.Json;
import io.vertx.core.json.jackson.JacksonCodec;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.Collections;
import java.util.Map;
import lombok.extern.log4j.Log4j2;
import org.folio.dao.util.RecordType;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.rest.jooq.enums.RecordState;
import org.folio.services.RecordService;
import org.folio.services.util.KafkaUtil;
import org.springframework.stereotype.Component;

@Log4j2
@Component
public class AuthorityDomainKafkaHandler implements AsyncRecordHandler<String, String> {

  public static final String DELETE_DOMAIN_EVENT_TYPE = "DELETE";
  public static final String DELETE_EVENT_SUB_TYPE_FLD = "deleteEventSubType";
  public static final String TENANT_FIELD = "tenant";
  private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<>() { };
  private static final String DOMAIN_EVENT_TYPE_HEADER = "domain-event-type";

  private final RecordService recordService;

  public AuthorityDomainKafkaHandler(RecordService recordService) {
    this.recordService = recordService;
  }

  /**
   * Handles an authority domain event from Kafka.
   *
   * <p>Only DELETE domain events are expected. Based on the {@code deleteEventSubType} field of the
   * event payload the handler either soft-deletes (marks state DELETED) or hard-deletes the
   * MARC authority records linked to the authority id carried in the record key.
   *
   * @param consumerRecord kafka record; key is the authority id, value is a JSON event payload
   * @return future completed with the authority id when handling finishes
   */
  @Override
  public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
    log.trace("handle:: Handling kafka record: '{}'", consumerRecord);
    String authorityId = consumerRecord.key();
    if (isUnexpectedDomainEvent(consumerRecord)) {
      log.debug("handle:: Expected only DELETE domain type. Skipping authority domain kafka record with ID: '{}'",
        authorityId);
      // Fix: previously this branch only logged "Skipping" and then fell through, so unexpected
      // (non-DELETE) events were still processed. Acknowledge and skip them instead.
      return Future.succeededFuture(authorityId);
    }

    var recordValue = ((JacksonCodec) Json.CODEC).fromString(consumerRecord.value(), MAP_TYPE_REFERENCE);
    var tenantId = recordValue.get(TENANT_FIELD).toString();
    var deleteEventSubType =
      AuthorityDeleteEventSubType.fromValue(recordValue.get(DELETE_EVENT_SUB_TYPE_FLD).toString());

    return switch (deleteEventSubType) {
      case SOFT_DELETE -> performSoftDelete(authorityId, tenantId);
      case HARD_DELETE -> performHardDelete(authorityId, tenantId);
    };
  }

  /**
   * Marks all MARC authority records that share the matchedId of the record linked to the given
   * external (authority) id as DELETED. Succeeds silently when no record is found.
   */
  private Future<String> performSoftDelete(String authorityId, String tenantId) {
    var condition = filterRecordByExternalId(authorityId);
    return recordService.getRecords(condition, RecordType.MARC_AUTHORITY, Collections.emptyList(), 0, 1, tenantId)
      .compose(recordCollection -> {
        if (recordCollection.getRecords().isEmpty()) {
          log.debug("handle:: No records found for externalId '{}', tenantId '{}'", authorityId, tenantId);
          return Future.succeededFuture();
        }
        var matchedId = recordCollection.getRecords().get(0).getMatchedId();
        log.info("handle:: Trying to soft-delete records with: matchedId '{}', externalId '{}', tenantId '{}'",
          matchedId, authorityId, tenantId);
        return recordService.updateRecordsState(matchedId, RecordState.DELETED, RecordType.MARC_AUTHORITY, tenantId);
      })
      .map(authorityId)
      .onFailure(throwable ->
        log.error("handle:: Failed to soft-delete records with externalId '{}', tenantId '{}'", authorityId, tenantId));
  }

  /** Physically removes all records associated with the given external (authority) id. */
  private Future<String> performHardDelete(String authorityId, String tenantId) {
    return recordService.deleteRecordsByExternalId(authorityId, tenantId)
      .map(authorityId);
  }

  /** Returns true when the record's domain-event-type header is not DELETE. */
  private static boolean isUnexpectedDomainEvent(KafkaConsumerRecord<String, String> consumerRecord) {
    return !KafkaUtil.headerExists(DOMAIN_EVENT_TYPE_HEADER, DELETE_DOMAIN_EVENT_TYPE, consumerRecord.headers());
  }

  /** Sub-type of an authority DELETE domain event, as serialized in the event payload. */
  public enum AuthorityDeleteEventSubType {

    SOFT_DELETE("SOFT_DELETE"),
    HARD_DELETE("HARD_DELETE");

    private final String value;

    AuthorityDeleteEventSubType(String type) {
      this.value = type;
    }

    @JsonValue
    public String getValue() {
      return value;
    }

    /**
     * Resolves an enum constant from its serialized value.
     *
     * @throws IllegalArgumentException when the value matches no constant
     */
    @JsonCreator
    public static AuthorityDeleteEventSubType fromValue(String value) {
      for (AuthorityDeleteEventSubType b : AuthorityDeleteEventSubType.values()) {
        if (b.value.equals(value)) {
          return b;
        }
      }
      throw new IllegalArgumentException("Unexpected value '" + value + "'");
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.folio.services.util.parser.ParseFieldsResult;
import org.folio.services.util.parser.ParseLeaderResult;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.OrderField;

import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor;
Expand Down Expand Up @@ -302,6 +303,17 @@ public interface RecordDao {
*/
Future<Boolean> deleteRecordsBySnapshotId(String snapshotId, String tenantId);

/**
* Deletes in transaction all records associated with the given external id.
*
* <p>NOTE(review): the parameter is named {@code matchedId} but the implementation filters records
* by {@code EXTERNAL_ID} — consider renaming the parameter for clarity.
*
* @param matchedId external id the records to delete are associated with
* @param tenantId tenant id
* @return future with true if succeeded
*/
Future<Boolean> deleteRecordsByExternalId(String matchedId, String tenantId);

/**
* Deletes all records matching the given condition.
*
* @param condition jOOQ condition selecting the records to delete
* @param tenantId tenant id
* @return future with true if succeeded
*/
Future<Boolean> deleteRecordsByCondition(Condition condition, String tenantId);

/**
* Performs purge the 'DELETED' records.
* Purges a given limited number of 'DELETED' records updated more the than given number of days back.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,24 @@ public Future<Boolean> deleteRecordsBySnapshotId(String snapshotId, String tenan
return SnapshotDaoUtil.delete(getQueryExecutor(tenantId), snapshotId);
}

@Override
public Future<Boolean> deleteRecordsByExternalId(String externalId, String tenantId) {
  // Fix: log prefix previously said "deleteRecordsBySnapshotId" (copy-paste error).
  LOG.trace("deleteRecordsByExternalId:: Deleting records by externalId {} for tenant {}", externalId, tenantId);
  // Parse once instead of twice; also fails fast on a malformed id before opening a transaction.
  var externalUuid = UUID.fromString(externalId);
  // Delete child marc_records_lb rows first, then the parent records_lb rows, in one transaction.
  return getQueryExecutor(tenantId).transaction(txQE -> txQE
      .execute(dsl -> dsl.deleteFrom(MARC_RECORDS_LB)
        .using(RECORDS_LB)
        .where(MARC_RECORDS_LB.ID.eq(RECORDS_LB.ID))
        .and(RECORDS_LB.EXTERNAL_ID.eq(externalUuid)))
      .compose(deletedMarcRows -> txQE.execute(dsl -> dsl.deleteFrom(RECORDS_LB)
        .where(RECORDS_LB.EXTERNAL_ID.eq(externalUuid)))))
    .map(deletedRows -> true);
}

@Override
public Future<Boolean> deleteRecordsByCondition(Condition condition, String tenantId) {
  // Fix: previously returned null, which causes an NPE at any call site that composes on the
  // returned Future. Fail fast with an explicit error until a real implementation is provided.
  return Future.failedFuture(new UnsupportedOperationException(
    "deleteRecordsByCondition is not implemented yet"));
}

@Override
public Future<Void> deleteRecords(int lastUpdatedDays, int limit, String tenantId) {
LOG.trace("deleteRecords:: Deleting record by last {} days for tenant {}", lastUpdatedDays, tenantId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,11 +489,21 @@ public static Condition filterRecordBySnapshotId(String snapshotId) {
*/
public static Condition filterRecordByType(String type) {
  if (StringUtils.isNotEmpty(type)) {
    // Fix: the method body contained two consecutive return statements (diff residue), making the
    // second unreachable. Keep the delegating form so both overloads share one implementation.
    return filterRecordByType(toRecordType(type));
  }
  // Blank/empty type means "no filtering".
  return DSL.noCondition();
}

/**
* Get {@link Condition} to filter records by type.
*
* @param recordType record type the {@code RECORDS_LB.RECORD_TYPE} column must equal
* @return condition matching records of exactly the given type
*/
public static Condition filterRecordByType(RecordType recordType) {
return RECORDS_LB.RECORD_TYPE.eq(recordType);
}

/**
* Get {@link Condition} to filter by state
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,20 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.spi.VerticleFactory;
import java.util.List;
import java.util.OptionalInt;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.config.ApplicationConfig;
import org.folio.kafka.KafkaConfig;
import org.folio.okapi.common.GenericCompositeFuture;
import org.folio.processing.events.EventManager;
import org.folio.processing.events.services.handler.EventHandler;
import org.folio.rest.resource.interfaces.InitAPI;
import org.folio.services.handlers.AuthorityPostProcessingEventHandler;
import org.folio.services.handlers.HoldingsPostProcessingEventHandler;
import org.folio.services.handlers.InstancePostProcessingEventHandler;
import org.folio.services.handlers.actions.MarcAuthorityDeleteEventHandler;
import org.folio.services.handlers.actions.MarcAuthorityUpdateModifyEventHandler;
import org.folio.services.handlers.actions.MarcBibUpdateModifyEventHandler;
import org.folio.services.handlers.actions.MarcHoldingsUpdateModifyEventHandler;
import org.folio.services.handlers.match.MarcAuthorityMatchEventHandler;
import org.folio.services.handlers.match.MarcBibliographicMatchEventHandler;
import org.folio.services.handlers.match.MarcHoldingsMatchEventHandler;
import org.folio.spring.SpringContextUtil;
import org.folio.verticle.MarcIndexersVersionDeletionVerticle;
import org.folio.verticle.SpringVerticleFactory;
import org.folio.verticle.consumers.AuthorityDomainConsumersVerticle;
import org.folio.verticle.consumers.AuthorityLinkChunkConsumersVerticle;
import org.folio.verticle.consumers.DataImportConsumersVerticle;
import org.folio.verticle.consumers.ParsedRecordChunkConsumersVerticle;
Expand All @@ -35,42 +30,16 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.support.AbstractApplicationContext;

import java.util.List;

public class InitAPIImpl implements InitAPI {

private static final String SPRING_CONTEXT = "springContext";
private static final Logger LOGGER = LogManager.getLogger();

@Autowired
private InstancePostProcessingEventHandler instancePostProcessingEventHandler;

@Autowired
private HoldingsPostProcessingEventHandler holdingsPostProcessingEventHandler;

@Autowired
private AuthorityPostProcessingEventHandler authorityPostProcessingEventHandler;

@Autowired
private MarcBibUpdateModifyEventHandler marcBibUpdateModifyEventHandler;

@Autowired
private MarcAuthorityUpdateModifyEventHandler marcAuthorityUpdateModifyEventHandler;

@Autowired
private MarcBibliographicMatchEventHandler marcBibliographicMatchEventHandler;

@Autowired
private MarcAuthorityMatchEventHandler marcAuthorityMatchEventHandler;
private KafkaConfig kafkaConfig;

@Autowired
private MarcAuthorityDeleteEventHandler marcAuthorityDeleteEventHandler;

@Autowired
private MarcHoldingsMatchEventHandler marcHoldingsMatchEventHandler;

@Autowired
private MarcHoldingsUpdateModifyEventHandler marcHoldingsUpdateModifyEventHandler;
private List<EventHandler> eventHandlers;

@Value("${srs.kafka.ParsedMarcChunkConsumer.instancesNumber:1}")
private int parsedMarcChunkConsumerInstancesNumber;
Expand All @@ -84,6 +53,12 @@ public class InitAPIImpl implements InitAPI {
@Value("${srs.kafka.AuthorityLinkChunkConsumer.instancesNumber:1}")
private int authorityLinkChunkConsumerInstancesNumber;

@Value("${srs.kafka.AuthorityDomainConsumer.instancesNumber:1}")
private int authorityDomainConsumerInstancesNumber;

@Value("${srs.kafka.DataImportConsumerVerticle.maxDistributionNum:100}")
private int maxDistributionNumber;

@Override
public void init(Vertx vertx, Context context, Handler<AsyncResult<Boolean>> handler) {
try {
Expand All @@ -93,9 +68,11 @@ public void init(Vertx vertx, Context context, Handler<AsyncResult<Boolean>> han
VerticleFactory verticleFactory = springContext.getBean(SpringVerticleFactory.class);
vertx.registerVerticleFactory(verticleFactory);

EventManager.registerKafkaEventPublisher(kafkaConfig, vertx, maxDistributionNumber);

registerEventHandlers();
deployMarcIndexersVersionDeletionVerticle(vertx, verticleFactory);
deployConsumerVerticles(vertx).onComplete(ar -> {
deployConsumerVerticles(vertx, verticleFactory).onComplete(ar -> {
if (ar.succeeded()) {
handler.handle(Future.succeededFuture(true));
} else {
Expand All @@ -109,56 +86,49 @@ public void init(Vertx vertx, Context context, Handler<AsyncResult<Boolean>> han
}

/**
 * Registers every Spring-managed {@link EventHandler} bean with the {@link EventManager}.
 *
 * <p>Fix: the body contained stale per-handler registration statements (diff residue) referencing
 * fields that no longer exist; the autowired {@code eventHandlers} list replaces them all.
 */
private void registerEventHandlers() {
  eventHandlers.forEach(EventManager::registerEventHandler);
}

private Future<?> deployConsumerVerticles(Vertx vertx) {
//TODO: get rid of this workaround with global spring context
ParsedRecordChunkConsumersVerticle.setSpringGlobalContext(vertx.getOrCreateContext().get(SPRING_CONTEXT));
DataImportConsumersVerticle.setSpringGlobalContext(vertx.getOrCreateContext().get(SPRING_CONTEXT));
QuickMarcConsumersVerticle.setSpringGlobalContext(vertx.getOrCreateContext().get(SPRING_CONTEXT));
AuthorityLinkChunkConsumersVerticle.setSpringGlobalContext(vertx.getOrCreateContext().get(SPRING_CONTEXT));

private Future<?> deployConsumerVerticles(Vertx vertx, VerticleFactory verticleFactory) {
Promise<String> deployConsumer1 = Promise.promise();
Promise<String> deployConsumer2 = Promise.promise();
Promise<String> deployConsumer3 = Promise.promise();
Promise<String> deployConsumer4 = Promise.promise();

vertx.deployVerticle(ParsedRecordChunkConsumersVerticle.class.getCanonicalName(),
new DeploymentOptions().setWorker(true).setInstances(parsedMarcChunkConsumerInstancesNumber), deployConsumer1);

vertx.deployVerticle(DataImportConsumersVerticle.class.getCanonicalName(),
new DeploymentOptions().setWorker(true).setInstances(dataImportConsumerInstancesNumber), deployConsumer2);

vertx.deployVerticle(QuickMarcConsumersVerticle.class.getCanonicalName(),
new DeploymentOptions().setWorker(true).setInstances(quickMarcConsumerInstancesNumber), deployConsumer3);

vertx.deployVerticle(AuthorityLinkChunkConsumersVerticle.class.getCanonicalName(),
new DeploymentOptions().setWorker(true).setInstances(authorityLinkChunkConsumerInstancesNumber), deployConsumer4);
Promise<String> deployConsumer5 = Promise.promise();

deployVerticle(vertx, verticleFactory, AuthorityLinkChunkConsumersVerticle.class,
OptionalInt.of(authorityLinkChunkConsumerInstancesNumber), deployConsumer1);
deployVerticle(vertx, verticleFactory, AuthorityDomainConsumersVerticle.class,
OptionalInt.of(authorityDomainConsumerInstancesNumber), deployConsumer2);
deployVerticle(vertx, verticleFactory, DataImportConsumersVerticle.class,
OptionalInt.of(dataImportConsumerInstancesNumber), deployConsumer3);
deployVerticle(vertx, verticleFactory, ParsedRecordChunkConsumersVerticle.class,
OptionalInt.of(parsedMarcChunkConsumerInstancesNumber), deployConsumer4);
deployVerticle(vertx, verticleFactory, QuickMarcConsumersVerticle.class,
OptionalInt.of(quickMarcConsumerInstancesNumber), deployConsumer5);

return GenericCompositeFuture.all(List.of(
deployConsumer1.future(),
deployConsumer2.future(),
deployConsumer3.future(),
deployConsumer4.future()));
deployConsumer4.future(),
deployConsumer5.future()
));
}

/**
 * Builds the factory-qualified deployment name for a verticle class,
 * e.g. {@code <prefix>:fully.qualified.ClassName}.
 */
private <T> String getVerticleName(VerticleFactory verticleFactory, Class<T> clazz) {
  return String.join(":", verticleFactory.prefix(), clazz.getName());
}

private void deployMarcIndexersVersionDeletionVerticle(Vertx vertx, VerticleFactory verticleFactory){
vertx.deployVerticle(getVerticleName(verticleFactory, MarcIndexersVersionDeletionVerticle.class),
private void deployMarcIndexersVersionDeletionVerticle(Vertx vertx, VerticleFactory verticleFactory) {
vertx.deployVerticle(getVerticleName(verticleFactory, (Class<?>) MarcIndexersVersionDeletionVerticle.class),
new DeploymentOptions().setWorker(true));
}

/**
 * Deploys the given verticle class as a worker verticle via the Spring verticle factory.
 *
 * @param instancesNumber number of instances to deploy; defaults to 1 when empty
 * @param promise completed with the deployment id once deployment finishes
 */
private void deployVerticle(Vertx vertx, VerticleFactory verticleFactory, Class<?> verticleClass,
                            OptionalInt instancesNumber, Promise<String> promise) {
  var deploymentOptions = new DeploymentOptions()
    .setWorker(true)
    .setInstances(instancesNumber.orElse(1));
  vertx.deployVerticle(getVerticleName(verticleFactory, verticleClass), deploymentOptions, promise);
}

}
Loading

0 comments on commit 2036fff

Please sign in to comment.