diff --git a/NEWS.md b/NEWS.md index 4d32578ff..f2bed6a2e 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,5 @@ ## 2024-xx-xx 5.8.0-SNAPSHOT +* [MODSOURCE-733](https://issues.folio.org/browse/MODSOURCE-733) Reduce Memory Allocation of Strings * [MODSOURCE-506](https://issues.folio.org/browse/MODSOURCE-506) Remove rawRecord field from source record * [MODSOURCE-709](https://issues.folio.org/browse/MODSOURCE-709) MARC authority record is not created when use Job profile with match profile by absent subfield/field * [MODSOURCE-677](https://issues.folio.org/browse/MODSOURCE-677) Import is completed with errors when control field that differs from 001 is used for marc-to-marc matching diff --git a/mod-source-record-storage-server/pom.xml b/mod-source-record-storage-server/pom.xml index 2bb4333c3..676a1871e 100644 --- a/mod-source-record-storage-server/pom.xml +++ b/mod-source-record-storage-server/pom.xml @@ -188,7 +188,7 @@ org.folio folio-kafka-wrapper - 3.0.0 + 3.1.0-SNAPSHOT net.mguenther.kafka diff --git a/mod-source-record-storage-server/src/main/java/org/folio/consumers/DataImportKafkaHandler.java b/mod-source-record-storage-server/src/main/java/org/folio/consumers/DataImportKafkaHandler.java index 57f2dd2f0..2336f5148 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/consumers/DataImportKafkaHandler.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/consumers/DataImportKafkaHandler.java @@ -4,13 +4,13 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.json.Json; +import io.vertx.core.json.jackson.DatabindCodec; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.folio.DataImportEventPayload; import org.folio.dataimport.util.OkapiConnectionParams; -import org.folio.dbschema.ObjectMapperTool; import org.folio.kafka.AsyncRecordHandler; import org.folio.processing.events.EventManager; import org.folio.processing.exceptions.EventProcessingException; @@ -29,7 +29,7 @@ @Component @Qualifier("DataImportKafkaHandler") -public class DataImportKafkaHandler implements AsyncRecordHandler { +public class DataImportKafkaHandler implements AsyncRecordHandler { private static final Logger LOGGER = LogManager.getLogger(); @@ -48,14 +48,14 @@ public DataImportKafkaHandler(Vertx vertx, JobProfileSnapshotCache profileSnapsh } @Override - public Future handle(KafkaConsumerRecord targetRecord) { + public Future handle(KafkaConsumerRecord targetRecord) { LOGGER.trace("handle:: Handling kafka record: {}", targetRecord); String recordId = extractHeaderValue(RECORD_ID_HEADER, targetRecord.headers()); String chunkId = extractHeaderValue(CHUNK_ID_HEADER, targetRecord.headers()); String userId = extractHeaderValue(USER_ID_HEADER, targetRecord.headers()); try { Promise promise = Promise.promise(); - Event event = ObjectMapperTool.getMapper().readValue(targetRecord.value(), Event.class); + Event event = DatabindCodec.mapper().readValue(targetRecord.value(), Event.class); DataImportEventPayload eventPayload = Json.decodeValue(event.getEventPayload(), DataImportEventPayload.class); LOGGER.debug("handle:: Data import event payload has been received with event type: '{}' by jobExecutionId: '{}' and recordId: '{}' and chunkId: '{}' and userId: '{}'", eventPayload.getEventType(), eventPayload.getJobExecutionId(), recordId, chunkId, userId); diff --git a/mod-source-record-storage-server/src/main/java/org/folio/consumers/ParsedRecordChunksKafkaHandler.java b/mod-source-record-storage-server/src/main/java/org/folio/consumers/ParsedRecordChunksKafkaHandler.java index 46908a125..e9cdbe4e7 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/consumers/ParsedRecordChunksKafkaHandler.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/consumers/ParsedRecordChunksKafkaHandler.java @@ -4,6 +4,7 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.json.Json; +import io.vertx.core.json.jackson.DatabindCodec; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.producer.KafkaHeader; import io.vertx.kafka.client.producer.KafkaProducer; @@ -37,7 +38,7 @@ import static org.folio.services.util.KafkaUtil.extractHeaderValue; @Component -public class ParsedRecordChunksKafkaHandler implements AsyncRecordHandler { +public class ParsedRecordChunksKafkaHandler implements AsyncRecordHandler { private static final Logger LOGGER = LogManager.getLogger(); public static final String JOB_EXECUTION_ID_HEADER = "jobExecutionId"; @@ -65,7 +66,7 @@ public ParsedRecordChunksKafkaHandler(@Autowired RecordService recordService, } @Override - public Future handle(KafkaConsumerRecord targetRecord) { + public Future handle(KafkaConsumerRecord targetRecord) { LOGGER.trace("handle:: Handling kafka record: {}", targetRecord); String jobExecutionId = extractHeaderValue(JOB_EXECUTION_ID_HEADER, targetRecord.headers()); String chunkId = extractHeaderValue(CHUNK_ID_HEADER, targetRecord.headers()); @@ -74,7 +75,7 @@ public Future handle(KafkaConsumerRecord targetRecord) { String key = targetRecord.key(); try { - Event event = Json.decodeValue(targetRecord.value(), Event.class); + Event event = DatabindCodec.mapper().readValue(targetRecord.value(), Event.class); RecordCollection recordCollection = Json.decodeValue(event.getEventPayload(), RecordCollection.class); List kafkaHeaders = targetRecord.headers(); @@ -93,7 +94,7 @@ public Future handle(KafkaConsumerRecord targetRecord) { } } - private Future sendBackRecordsBatchResponse(RecordsBatchResponse recordsBatchResponse, List kafkaHeaders, String tenantId, int chunkNumber, String eventType, KafkaConsumerRecord commonRecord) { + private Future sendBackRecordsBatchResponse(RecordsBatchResponse recordsBatchResponse, List kafkaHeaders, String tenantId, int chunkNumber, String eventType, KafkaConsumerRecord commonRecord) { Event event; event = new Event() .withId(UUID.randomUUID().toString()) diff --git a/mod-source-record-storage-server/src/main/java/org/folio/errorhandlers/ParsedRecordChunksErrorHandler.java b/mod-source-record-storage-server/src/main/java/org/folio/errorhandlers/ParsedRecordChunksErrorHandler.java index 93a03e0ad..adf1c9d47 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/errorhandlers/ParsedRecordChunksErrorHandler.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/errorhandlers/ParsedRecordChunksErrorHandler.java @@ -3,6 +3,7 @@ import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.json.Json; +import io.vertx.core.json.jackson.DatabindCodec; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.producer.KafkaHeader; import io.vertx.kafka.client.producer.impl.KafkaHeaderImpl; @@ -23,6 +24,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -38,7 +40,7 @@ * with status 'Completed with errors' with showing error messge instead of hanging progress bar. */ @Component -public class ParsedRecordChunksErrorHandler implements ProcessRecordErrorHandler { +public class ParsedRecordChunksErrorHandler implements ProcessRecordErrorHandler { private static final Logger LOGGER = LogManager.getLogger(); @@ -53,12 +55,18 @@ public class ParsedRecordChunksErrorHandler implements ProcessRecordErrorHandler private Vertx vertx; @Override - public void handle(Throwable throwable, KafkaConsumerRecord record) { - LOGGER.trace("handle:: Handling record {}", record); - Event event = Json.decodeValue(record.value(), Event.class); - RecordCollection recordCollection = Json.decodeValue(event.getEventPayload(), RecordCollection.class); + public void handle(Throwable throwable, KafkaConsumerRecord consumerRecord) { + LOGGER.trace("handle:: Handling record {}", consumerRecord); + Event event; + try { + event = DatabindCodec.mapper().readValue(consumerRecord.value(), Event.class); + } catch (IOException e) { + LOGGER.error("Something happened when deserializing record", e); + return; + } + RecordCollection recordCollection = Json.decodeValue(event.getEventPayload(), RecordCollection.class); - List kafkaHeaders = record.headers(); + List kafkaHeaders = consumerRecord.headers(); OkapiConnectionParams okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx); String jobExecutionId = okapiConnectionParams.getHeaders().get(JOB_EXECUTION_ID_HEADER); diff --git a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AbstractConsumerVerticle.java b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AbstractConsumerVerticle.java index 062110016..eb2d8658e 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AbstractConsumerVerticle.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AbstractConsumerVerticle.java @@ -19,9 +19,9 @@ import org.folio.kafka.SubscriptionDefinition; import org.folio.okapi.common.GenericCompositeFuture; -public abstract class AbstractConsumerVerticle extends AbstractVerticle { +public abstract class AbstractConsumerVerticle extends AbstractVerticle { - private final List> consumers = new ArrayList<>(); + private final List> consumers = new ArrayList<>(); private final KafkaConfig kafkaConfig; @@ -31,12 +31,20 @@ protected AbstractConsumerVerticle(KafkaConfig kafkaConfig) { @Override public void start(Promise startPromise) { - eventTypes().forEach(eventType -> { + KafkaConfig config; + if (getDeserializerClass() != null) { + config = kafkaConfig.toBuilder() + .consumerValueDeserializerClass(getDeserializerClass()) + .build(); + } else { + config = kafkaConfig; + } + eventTypes().forEach(eventType -> { SubscriptionDefinition subscriptionDefinition = getSubscriptionDefinition(eventType); - consumers.add(KafkaConsumerWrapper.builder() + consumers.add(KafkaConsumerWrapper.builder() .context(context) .vertx(vertx) - .kafkaConfig(kafkaConfig) + .kafkaConfig(config) .loadLimit(loadLimit()) .globalLoadSensor(new GlobalLoadSensor()) .subscriptionDefinition(subscriptionDefinition) @@ -64,9 +72,9 @@ protected Optional namespace() { return Optional.of(getDefaultNameSpace()); } - protected abstract AsyncRecordHandler recordHandler(); + protected abstract AsyncRecordHandler recordHandler(); - protected ProcessRecordErrorHandler processRecordErrorHandler() { + protected ProcessRecordErrorHandler processRecordErrorHandler() { return null; } @@ -89,4 +97,11 @@ private SubscriptionDefinition getSubscriptionDefinition(String eventType) { private String getConsumerName() { return constructModuleName() + "_" + getClass().getSimpleName(); } + + /** + * Set a custom deserializer class for this kafka consumer + */ + public String getDeserializerClass() { + return null; + } } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityDomainConsumersVerticle.java b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityDomainConsumersVerticle.java index f4d7fb02f..d45f3f708 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityDomainConsumersVerticle.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityDomainConsumersVerticle.java @@ -15,7 +15,7 @@ @Component @Scope(SCOPE_PROTOTYPE) -public class AuthorityDomainConsumersVerticle extends AbstractConsumerVerticle { +public class AuthorityDomainConsumersVerticle extends AbstractConsumerVerticle { private final AuthorityDomainKafkaHandler authorityDomainKafkaHandler; diff --git a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityLinkChunkConsumersVerticle.java b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityLinkChunkConsumersVerticle.java index 17b995301..8094b3841 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityLinkChunkConsumersVerticle.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/AuthorityLinkChunkConsumersVerticle.java @@ -15,7 +15,7 @@ @Component @Scope(SCOPE_PROTOTYPE) -public class AuthorityLinkChunkConsumersVerticle extends AbstractConsumerVerticle { +public class AuthorityLinkChunkConsumersVerticle extends AbstractConsumerVerticle { private final AuthorityLinkChunkKafkaHandler kafkaHandler; diff --git a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/DataImportConsumersVerticle.java b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/DataImportConsumersVerticle.java index f508bb4b8..a4f020945 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/DataImportConsumersVerticle.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/DataImportConsumersVerticle.java @@ -37,7 +37,7 @@ @Component @Scope(SCOPE_PROTOTYPE) -public class DataImportConsumersVerticle extends AbstractConsumerVerticle { +public class DataImportConsumersVerticle extends AbstractConsumerVerticle { private static final List EVENTS = Arrays.asList( DI_INVENTORY_AUTHORITY_CREATED_READY_FOR_POST_PROCESSING.value(), @@ -65,7 +65,7 @@ public class DataImportConsumersVerticle extends AbstractConsumerVerticle { DI_SRS_MARC_HOLDINGS_RECORD_MATCHED.value() ); - private final AsyncRecordHandler dataImportKafkaHandler; + private final AsyncRecordHandler dataImportKafkaHandler; @Value("${srs.kafka.DataImportConsumer.loadLimit:5}") private int loadLimit; @@ -73,7 +73,7 @@ public class DataImportConsumersVerticle extends AbstractConsumerVerticle { @Autowired public DataImportConsumersVerticle(KafkaConfig kafkaConfig, @Qualifier("DataImportKafkaHandler") - AsyncRecordHandler dataImportKafkaHandler) { + AsyncRecordHandler dataImportKafkaHandler) { super(kafkaConfig); this.dataImportKafkaHandler = dataImportKafkaHandler; } @@ -84,7 +84,7 @@ protected int loadLimit() { } @Override - protected AsyncRecordHandler recordHandler() { + protected AsyncRecordHandler recordHandler() { return dataImportKafkaHandler; } @@ -93,4 +93,9 @@ protected List eventTypes() { return EVENTS; } + @Override + public String getDeserializerClass() { + return "org.apache.kafka.common.serialization.ByteArrayDeserializer"; + } + } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticle.java b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticle.java index fe3f32e48..05c1dccaa 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticle.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticle.java @@ -15,11 +15,11 @@ @Component @Scope(SCOPE_PROTOTYPE) -public class ParsedRecordChunkConsumersVerticle extends AbstractConsumerVerticle { +public class ParsedRecordChunkConsumersVerticle extends AbstractConsumerVerticle { - private final AsyncRecordHandler parsedRecordChunksKafkaHandler; + private final AsyncRecordHandler parsedRecordChunksKafkaHandler; - private final ProcessRecordErrorHandler parsedRecordChunksErrorHandler; + private final ProcessRecordErrorHandler parsedRecordChunksErrorHandler; @Value("${srs.kafka.ParsedMarcChunkConsumer.loadLimit:5}") private int loadLimit; @@ -27,16 +27,16 @@ public class ParsedRecordChunkConsumersVerticle extends AbstractConsumerVerticle @Autowired protected ParsedRecordChunkConsumersVerticle(KafkaConfig kafkaConfig, @Qualifier("parsedRecordChunksKafkaHandler") - AsyncRecordHandler parsedRecordChunksKafkaHandler, + AsyncRecordHandler parsedRecordChunksKafkaHandler, @Qualifier("parsedRecordChunksErrorHandler") - ProcessRecordErrorHandler parsedRecordChunksErrorHandler) { + ProcessRecordErrorHandler parsedRecordChunksErrorHandler) { super(kafkaConfig); this.parsedRecordChunksKafkaHandler = parsedRecordChunksKafkaHandler; this.parsedRecordChunksErrorHandler = parsedRecordChunksErrorHandler; } @Override - protected ProcessRecordErrorHandler processRecordErrorHandler() { + protected ProcessRecordErrorHandler processRecordErrorHandler() { return parsedRecordChunksErrorHandler; } @@ -46,7 +46,7 @@ protected int loadLimit() { } @Override - protected AsyncRecordHandler recordHandler() { + protected AsyncRecordHandler recordHandler() { return parsedRecordChunksKafkaHandler; } @@ -55,4 +55,9 @@ protected List eventTypes() { return List.of(DI_RAW_RECORDS_CHUNK_PARSED.value()); } + @Override + public String getDeserializerClass() { + return "org.apache.kafka.common.serialization.ByteArrayDeserializer"; + } + } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/QuickMarcConsumersVerticle.java b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/QuickMarcConsumersVerticle.java index a4551c1c1..7e3e61764 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/QuickMarcConsumersVerticle.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/verticle/consumers/QuickMarcConsumersVerticle.java @@ -14,7 +14,7 @@ @Component @Scope(SCOPE_PROTOTYPE) -public class QuickMarcConsumersVerticle extends AbstractConsumerVerticle { +public class QuickMarcConsumersVerticle extends AbstractConsumerVerticle { private final QuickMarcKafkaHandler kafkaHandler; diff --git a/mod-source-record-storage-server/src/main/resources/vertx-default-jul-logging.properties b/mod-source-record-storage-server/src/main/resources/vertx-default-jul-logging.properties deleted file mode 100644 index b2a56574d..000000000 --- a/mod-source-record-storage-server/src/main/resources/vertx-default-jul-logging.properties +++ /dev/null @@ -1,5 +0,0 @@ -handlers = java.util.logging.ConsoleHandler -.level = ALL - -logger.cql2pgjson.level = ERROR -logger.cql2pgjson.name = org.folio.cql2pgjson.CQL2PgJSON