
Commit

MODSOURCE-733 Reduce memory allocations of Strings (#591)
* MODSOURCE-733 Reduce memory allocations of Strings
  Instead of byte[] -> String -> POJO, it is now byte[] -> POJO.
okolawole-ebsco authored Jan 11, 2024
1 parent cb5f83c commit 308292b
Showing 12 changed files with 71 additions and 41 deletions.
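The commit message sums the change up as byte[] -> POJO instead of byte[] -> String -> POJO: the Kafka consumers now receive each record value as raw bytes and hand them straight to Jackson. A minimal sketch of the two paths, assuming the Vert.x 4 / Jackson setup used elsewhere in this module (the helper class and method names here are illustrative, not part of the commit):

import java.io.IOException;

import io.vertx.core.json.Json;
import io.vertx.core.json.jackson.DatabindCodec;

// Illustrative only: why the new path avoids one full String copy per Kafka record.
public final class DeserializationSketch {

  // Old path: consumers were typed <String, String>, so Kafka's StringDeserializer first
  // decoded the payload bytes into a String, and Jackson then parsed that String.
  static <T> T viaString(String kafkaValue, Class<T> type) {
    return Json.decodeValue(kafkaValue, type);
  }

  // New path: consumers are typed <String, byte[]> (ByteArrayDeserializer), and Jackson
  // reads the POJO directly from the bytes, skipping the intermediate String allocation.
  static <T> T viaBytes(byte[] kafkaValue, Class<T> type) throws IOException {
    return DatabindCodec.mapper().readValue(kafkaValue, type);
  }

  private DeserializationSketch() {
  }
}

Each handler below swaps the first form for the second, and the consumer verticles opt in to the byte[] value type through the new getDeserializerClass() hook.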
1 change: 1 addition & 0 deletions NEWS.md
@@ -1,4 +1,5 @@
## 2024-xx-xx 5.8.0-SNAPSHOT
+* [MODSOURCE-733](https://issues.folio.org/browse/MODSOURCE-733) Reduce Memory Allocation of Strings
* [MODSOURCE-506](https://issues.folio.org/browse/MODSOURCE-506) Remove rawRecord field from source record
* [MODSOURCE-709](https://issues.folio.org/browse/MODSOURCE-709) MARC authority record is not created when use Job profile with match profile by absent subfield/field
* [MODSOURCE-677](https://issues.folio.org/browse/MODSOURCE-677) Import is completed with errors when control field that differs from 001 is used for marc-to-marc matching
2 changes: 1 addition & 1 deletion mod-source-record-storage-server/pom.xml
@@ -188,7 +188,7 @@
<dependency>
<groupId>org.folio</groupId>
<artifactId>folio-kafka-wrapper</artifactId>
-<version>3.0.0</version>
+<version>3.1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>net.mguenther.kafka</groupId>
DataImportKafkaHandler.java
@@ -4,13 +4,13 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
+import io.vertx.core.json.jackson.DatabindCodec;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.DataImportEventPayload;
import org.folio.dataimport.util.OkapiConnectionParams;
-import org.folio.dbschema.ObjectMapperTool;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.processing.events.EventManager;
import org.folio.processing.exceptions.EventProcessingException;
@@ -29,7 +29,7 @@

@Component
@Qualifier("DataImportKafkaHandler")
-public class DataImportKafkaHandler implements AsyncRecordHandler<String, String> {
+public class DataImportKafkaHandler implements AsyncRecordHandler<String, byte[]> {

private static final Logger LOGGER = LogManager.getLogger();

@@ -48,14 +48,14 @@ public DataImportKafkaHandler(Vertx vertx, JobProfileSnapshotCache profileSnapsh
}

@Override
-public Future<String> handle(KafkaConsumerRecord<String, String> targetRecord) {
+public Future<String> handle(KafkaConsumerRecord<String, byte[]> targetRecord) {
LOGGER.trace("handle:: Handling kafka record: {}", targetRecord);
String recordId = extractHeaderValue(RECORD_ID_HEADER, targetRecord.headers());
String chunkId = extractHeaderValue(CHUNK_ID_HEADER, targetRecord.headers());
String userId = extractHeaderValue(USER_ID_HEADER, targetRecord.headers());
try {
Promise<String> promise = Promise.promise();
-Event event = ObjectMapperTool.getMapper().readValue(targetRecord.value(), Event.class);
+Event event = DatabindCodec.mapper().readValue(targetRecord.value(), Event.class);
DataImportEventPayload eventPayload = Json.decodeValue(event.getEventPayload(), DataImportEventPayload.class);
LOGGER.debug("handle:: Data import event payload has been received with event type: '{}' by jobExecutionId: '{}' and recordId: '{}' and chunkId: '{}' and userId: '{}'",
eventPayload.getEventType(), eventPayload.getJobExecutionId(), recordId, chunkId, userId);
ParsedRecordChunksKafkaHandler.java
@@ -4,6 +4,7 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
+import io.vertx.core.json.jackson.DatabindCodec;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducer;
@@ -37,7 +38,7 @@
import static org.folio.services.util.KafkaUtil.extractHeaderValue;

@Component
-public class ParsedRecordChunksKafkaHandler implements AsyncRecordHandler<String, String> {
+public class ParsedRecordChunksKafkaHandler implements AsyncRecordHandler<String, byte[]> {
private static final Logger LOGGER = LogManager.getLogger();

public static final String JOB_EXECUTION_ID_HEADER = "jobExecutionId";
@@ -65,7 +66,7 @@ public ParsedRecordChunksKafkaHandler(@Autowired RecordService recordService,
}

@Override
-public Future<String> handle(KafkaConsumerRecord<String, String> targetRecord) {
+public Future<String> handle(KafkaConsumerRecord<String, byte[]> targetRecord) {
LOGGER.trace("handle:: Handling kafka record: {}", targetRecord);
String jobExecutionId = extractHeaderValue(JOB_EXECUTION_ID_HEADER, targetRecord.headers());
String chunkId = extractHeaderValue(CHUNK_ID_HEADER, targetRecord.headers());
@@ -74,7 +75,7 @@ public Future<String> handle(KafkaConsumerRecord<String, String> targetRecord) {
String key = targetRecord.key();

try {
-Event event = Json.decodeValue(targetRecord.value(), Event.class);
+Event event = DatabindCodec.mapper().readValue(targetRecord.value(), Event.class);
RecordCollection recordCollection = Json.decodeValue(event.getEventPayload(), RecordCollection.class);

List<KafkaHeader> kafkaHeaders = targetRecord.headers();
@@ -93,7 +94,7 @@ public Future<String> handle(KafkaConsumerRecord<String, String> targetRecord) {
}
}

-private Future<String> sendBackRecordsBatchResponse(RecordsBatchResponse recordsBatchResponse, List<KafkaHeader> kafkaHeaders, String tenantId, int chunkNumber, String eventType, KafkaConsumerRecord<String, String> commonRecord) {
+private Future<String> sendBackRecordsBatchResponse(RecordsBatchResponse recordsBatchResponse, List<KafkaHeader> kafkaHeaders, String tenantId, int chunkNumber, String eventType, KafkaConsumerRecord<String, byte[]> commonRecord) {
Event event;
event = new Event()
.withId(UUID.randomUUID().toString())
ParsedRecordChunksErrorHandler.java
@@ -3,6 +3,7 @@
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
+import io.vertx.core.json.jackson.DatabindCodec;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.impl.KafkaHeaderImpl;
@@ -23,6 +24,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -38,7 +40,7 @@
* with status 'Completed with errors' with showing error messge instead of hanging progress bar.
*/
@Component
-public class ParsedRecordChunksErrorHandler implements ProcessRecordErrorHandler<String, String> {
+public class ParsedRecordChunksErrorHandler implements ProcessRecordErrorHandler<String, byte[]> {

private static final Logger LOGGER = LogManager.getLogger();

@@ -53,12 +55,18 @@ public class ParsedRecordChunksErrorHandler implements ProcessRecordErrorHandler
private Vertx vertx;

@Override
-public void handle(Throwable throwable, KafkaConsumerRecord<String, String> record) {
-LOGGER.trace("handle:: Handling record {}", record);
-Event event = Json.decodeValue(record.value(), Event.class);
-RecordCollection recordCollection = Json.decodeValue(event.getEventPayload(), RecordCollection.class);
+public void handle(Throwable throwable, KafkaConsumerRecord<String, byte[]> consumerRecord) {
+LOGGER.trace("handle:: Handling record {}", consumerRecord);
+Event event;
+try {
+event = DatabindCodec.mapper().readValue(consumerRecord.value(), Event.class);
+} catch (IOException e) {
+LOGGER.error("Something happened when deserializing record", e);
+return;
+}
+RecordCollection recordCollection = Json.decodeValue(event.getEventPayload(), RecordCollection.class);

-List<KafkaHeader> kafkaHeaders = record.headers();
+List<KafkaHeader> kafkaHeaders = consumerRecord.headers();
OkapiConnectionParams okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);

String jobExecutionId = okapiConnectionParams.getHeaders().get(JOB_EXECUTION_ID_HEADER);
AbstractConsumerVerticle.java
@@ -19,9 +19,9 @@
import org.folio.kafka.SubscriptionDefinition;
import org.folio.okapi.common.GenericCompositeFuture;

-public abstract class AbstractConsumerVerticle extends AbstractVerticle {
+public abstract class AbstractConsumerVerticle<K,V> extends AbstractVerticle {

-private final List<KafkaConsumerWrapper<String, String>> consumers = new ArrayList<>();
+private final List<KafkaConsumerWrapper<K, V>> consumers = new ArrayList<>();

private final KafkaConfig kafkaConfig;

@@ -31,12 +31,20 @@ protected AbstractConsumerVerticle(KafkaConfig kafkaConfig) {

@Override
public void start(Promise<Void> startPromise) {
-eventTypes().forEach(eventType -> {
+KafkaConfig config;
+if (getDeserializerClass() != null) {
+config = kafkaConfig.toBuilder()
+.consumerValueDeserializerClass(getDeserializerClass())
+.build();
+} else {
+config = kafkaConfig;
+}
+eventTypes().forEach(eventType -> {
SubscriptionDefinition subscriptionDefinition = getSubscriptionDefinition(eventType);
-consumers.add(KafkaConsumerWrapper.<String, String>builder()
+consumers.add(KafkaConsumerWrapper.<K, V>builder()
.context(context)
.vertx(vertx)
-.kafkaConfig(kafkaConfig)
+.kafkaConfig(config)
.loadLimit(loadLimit())
.globalLoadSensor(new GlobalLoadSensor())
.subscriptionDefinition(subscriptionDefinition)
@@ -64,9 +72,9 @@ protected Optional<String> namespace() {
return Optional.of(getDefaultNameSpace());
}

-protected abstract AsyncRecordHandler<String, String> recordHandler();
+protected abstract AsyncRecordHandler<K, V> recordHandler();

-protected ProcessRecordErrorHandler<String, String> processRecordErrorHandler() {
+protected ProcessRecordErrorHandler<K, V> processRecordErrorHandler() {
return null;
}

@@ -89,4 +97,11 @@ private SubscriptionDefinition getSubscriptionDefinition(String eventType) {
private String getConsumerName() {
return constructModuleName() + "_" + getClass().getSimpleName();
}

+/**
+ * Set a custom deserializer class for this kafka consumer
+ */
+public String getDeserializerClass() {
+return null;
+}
}
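The getDeserializerClass() hook above lets each verticle pick the Kafka value deserializer; the two data-import verticles below return org.apache.kafka.common.serialization.ByteArrayDeserializer, while the others keep the String default. In plain Kafka consumer properties, the byte[]-valued setup amounts to roughly the following (a sketch under the assumption that consumerValueDeserializerClass is passed through to the standard value.deserializer setting; the folio-kafka-wrapper internals are not shown in this diff):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Rough equivalent of the consumer configuration the byte[]-valued verticles ask for.
public final class ConsumerPropsSketch {

  static Properties byteArrayValueProps(String bootstrapServers, String groupId) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    // Keys stay String; only the value side changes to raw bytes.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    return props;
  }

  private ConsumerPropsSketch() {
  }
}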
AuthorityDomainConsumersVerticle.java
@@ -15,7 +15,7 @@

@Component
@Scope(SCOPE_PROTOTYPE)
-public class AuthorityDomainConsumersVerticle extends AbstractConsumerVerticle {
+public class AuthorityDomainConsumersVerticle extends AbstractConsumerVerticle<String, String> {

private final AuthorityDomainKafkaHandler authorityDomainKafkaHandler;

AuthorityLinkChunkConsumersVerticle.java
@@ -15,7 +15,7 @@

@Component
@Scope(SCOPE_PROTOTYPE)
-public class AuthorityLinkChunkConsumersVerticle extends AbstractConsumerVerticle {
+public class AuthorityLinkChunkConsumersVerticle extends AbstractConsumerVerticle<String, String> {

private final AuthorityLinkChunkKafkaHandler kafkaHandler;

DataImportConsumersVerticle.java
@@ -37,7 +37,7 @@

@Component
@Scope(SCOPE_PROTOTYPE)
-public class DataImportConsumersVerticle extends AbstractConsumerVerticle {
+public class DataImportConsumersVerticle extends AbstractConsumerVerticle<String, byte[]> {

private static final List<String> EVENTS = Arrays.asList(
DI_INVENTORY_AUTHORITY_CREATED_READY_FOR_POST_PROCESSING.value(),
@@ -65,15 +65,15 @@ public class DataImportConsumersVerticle extends AbstractConsumerVerticle {
DI_SRS_MARC_HOLDINGS_RECORD_MATCHED.value()
);

-private final AsyncRecordHandler<String, String> dataImportKafkaHandler;
+private final AsyncRecordHandler<String, byte[]> dataImportKafkaHandler;

@Value("${srs.kafka.DataImportConsumer.loadLimit:5}")
private int loadLimit;

@Autowired
public DataImportConsumersVerticle(KafkaConfig kafkaConfig,
@Qualifier("DataImportKafkaHandler")
-AsyncRecordHandler<String, String> dataImportKafkaHandler) {
+AsyncRecordHandler<String, byte[]> dataImportKafkaHandler) {
super(kafkaConfig);
this.dataImportKafkaHandler = dataImportKafkaHandler;
}
@@ -84,7 +84,7 @@ protected int loadLimit() {
}

@Override
-protected AsyncRecordHandler<String, String> recordHandler() {
+protected AsyncRecordHandler<String, byte[]> recordHandler() {
return dataImportKafkaHandler;
}

@@ -93,4 +93,9 @@ protected List<String> eventTypes() {
return EVENTS;
}

+@Override
+public String getDeserializerClass() {
+return "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+}

}
ParsedRecordChunkConsumersVerticle.java
@@ -15,28 +15,28 @@

@Component
@Scope(SCOPE_PROTOTYPE)
-public class ParsedRecordChunkConsumersVerticle extends AbstractConsumerVerticle {
+public class ParsedRecordChunkConsumersVerticle extends AbstractConsumerVerticle<String, byte[]> {

-private final AsyncRecordHandler<String, String> parsedRecordChunksKafkaHandler;
+private final AsyncRecordHandler<String, byte[]> parsedRecordChunksKafkaHandler;

-private final ProcessRecordErrorHandler<String, String> parsedRecordChunksErrorHandler;
+private final ProcessRecordErrorHandler<String, byte[]> parsedRecordChunksErrorHandler;

@Value("${srs.kafka.ParsedMarcChunkConsumer.loadLimit:5}")
private int loadLimit;

@Autowired
protected ParsedRecordChunkConsumersVerticle(KafkaConfig kafkaConfig,
@Qualifier("parsedRecordChunksKafkaHandler")
-AsyncRecordHandler<String, String> parsedRecordChunksKafkaHandler,
+AsyncRecordHandler<String, byte[]> parsedRecordChunksKafkaHandler,
@Qualifier("parsedRecordChunksErrorHandler")
-ProcessRecordErrorHandler<String, String> parsedRecordChunksErrorHandler) {
+ProcessRecordErrorHandler<String, byte[]> parsedRecordChunksErrorHandler) {
super(kafkaConfig);
this.parsedRecordChunksKafkaHandler = parsedRecordChunksKafkaHandler;
this.parsedRecordChunksErrorHandler = parsedRecordChunksErrorHandler;
}

@Override
-protected ProcessRecordErrorHandler<String, String> processRecordErrorHandler() {
+protected ProcessRecordErrorHandler<String, byte[]> processRecordErrorHandler() {
return parsedRecordChunksErrorHandler;
}

@@ -46,7 +46,7 @@ protected int loadLimit() {
}

@Override
protected AsyncRecordHandler<String, String> recordHandler() {
protected AsyncRecordHandler<String, byte[]> recordHandler() {
return parsedRecordChunksKafkaHandler;
}

@@ -55,4 +55,9 @@ protected List<String> eventTypes() {
return List.of(DI_RAW_RECORDS_CHUNK_PARSED.value());
}

+@Override
+public String getDeserializerClass() {
+return "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+}

}
QuickMarcConsumersVerticle.java
@@ -14,7 +14,7 @@

@Component
@Scope(SCOPE_PROTOTYPE)
-public class QuickMarcConsumersVerticle extends AbstractConsumerVerticle {
+public class QuickMarcConsumersVerticle extends AbstractConsumerVerticle<String, String> {

private final QuickMarcKafkaHandler kafkaHandler;


This file was deleted.
