Commit 429e989
MODSOURCE-752: draft design
PBobylev committed Jul 26, 2024 (1 parent: 2aa801b)
Showing 13 changed files with 227 additions and 105 deletions.
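The thread running through the rendered files: methods that previously took a bare tenantId now take the full Okapi header map, and RecordDaoImpl gains a RecordDomainEventPublisher invoked on every successful save. The consumers build the map with EventHandlingUtil.toOkapiHeaders(kafkaHeaders, tenantId), whose body is not among the rendered hunks; the sketch below is only a guess at its shape (the header filtering and the tenant override are assumptions, not code from this commit):

import static org.folio.okapi.common.XOkapiHeaders.TENANT;

import io.vertx.kafka.client.producer.KafkaHeader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ToOkapiHeadersSketch {

  private ToOkapiHeadersSketch() { }

  // Sketch only: copy x-okapi-* Kafka headers into a map; a non-null tenantId
  // overrides whatever tenant value the headers carried.
  static Map<String, String> toOkapiHeaders(List<KafkaHeader> kafkaHeaders, String tenantId) {
    Map<String, String> okapiHeaders = new HashMap<>();
    kafkaHeaders.stream()
      .filter(header -> header.key().toLowerCase().startsWith("x-okapi-"))
      .forEach(header -> okapiHeaders.put(header.key(), header.value().toString()));
    if (tenantId != null) {
      okapiHeaders.put(TENANT, tenantId);
    }
    return okapiHeaders;
  }
}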
==== File 1 of 13 ====
@@ -10,6 +10,7 @@
import static org.folio.consumers.RecordMappingUtils.readParsedContentToObjectRepresentation;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.FAIL;
import static org.folio.services.util.EventHandlingUtil.createProducer;
+ import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;
import static org.folio.services.util.KafkaUtil.extractHeaderValue;

import io.vertx.core.Future;
@@ -92,7 +93,8 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
.compose(this::createSnapshot)
.compose(event -> retrieveRecords(event, event.getTenant())
.compose(recordCollection -> mapRecordFieldsChanges(event, recordCollection, userId))
- .compose(recordCollection -> recordService.saveRecords(recordCollection, event.getTenant()))
+ .compose(recordCollection -> recordService.saveRecords(recordCollection,
+   toOkapiHeaders(consumerRecord.headers(), event.getTenant())))
.map(recordsBatchResponse -> sendReports(recordsBatchResponse, event, consumerRecord.headers()))
.map(recordsBatchResponse -> mapRecordsToBibUpdateEvents(recordsBatchResponse, event))
.compose(marcBibUpdates -> sendEvents(marcBibUpdates, event, consumerRecord))
==== File 2 of 13 ====
@@ -35,6 +35,7 @@

import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_PARSED_RECORDS_CHUNK_SAVED;
import static org.folio.services.util.EventHandlingUtil.constructModuleName;
+ import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;
import static org.folio.services.util.KafkaUtil.extractHeaderValue;

@Component
@@ -85,7 +86,7 @@ public Future<String> handle(KafkaConsumerRecord<String, byte[]> targetRecord) {
LOGGER.debug("handle:: RecordCollection has been received with event: '{}', jobExecutionId '{}', chunkId: '{}', starting processing... chunkNumber '{}'-'{}'",
event.getEventType(), jobExecutionId, chunkId, chunkNumber, key);
setUserMetadata(recordCollection, userId);
- return recordService.saveRecords(recordCollection, tenantId)
+ return recordService.saveRecords(recordCollection, toOkapiHeaders(kafkaHeaders, null))
.compose(recordsBatchResponse -> sendBackRecordsBatchResponse(recordsBatchResponse, kafkaHeaders, tenantId, chunkNumber, event.getEventType(), targetRecord));
} catch (Exception e) {
LOGGER.warn("handle:: RecordCollection processing has failed with errors jobExecutionId '{}', chunkId: '{}', chunkNumber '{}'-'{}'",
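Worth noticing before moving on: the two consumers resolve the tenant differently. The link-update handler passes the tenant from the event payload, while the chunk handler passes null and leaves tenant resolution to the headers themselves (assuming toOkapiHeaders behaves roughly as sketched above):

// Link-update consumer: the tenant from the event payload wins.
toOkapiHeaders(consumerRecord.headers(), event.getTenant())

// Chunk consumer: no override; the x-okapi-tenant Kafka header is the source of truth.
toOkapiHeaders(kafkaHeaders, null)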
==== File 3 of 13 ====
@@ -2,6 +2,7 @@

import java.util.Collection;
import java.util.List;
+ import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

@@ -189,10 +190,10 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
* Saves {@link Record} to the db
*
* @param record Record to save
- * @param tenantId tenant id
+ * @param okapiHeaders okapi headers
* @return future with saved Record
*/
- Future<Record> saveRecord(Record record, String tenantId);
+ Future<Record> saveRecord(Record record, Map<String, String> okapiHeaders);

/**
* Saves {@link Record} to the db using {@link ReactiveClassicGenericQueryExecutor}
@@ -201,25 +202,25 @@ Future<RecordsIdentifiersCollection> getMatchedRecordsIdentifiers(MatchField mat
* @param record Record to save
* @return future with saved Record
*/
- Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record);
+ Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record, Map<String, String> okapiHeaders);

/**
* Saves {@link RecordCollection} to the db
*
* @param recordCollection Record collection to save
- * @param tenantId tenant id
+ * @param okapiHeaders okapi headers
* @return future with saved {@link RecordsBatchResponse}
*/
- Future<RecordsBatchResponse> saveRecords(RecordCollection recordCollection, String tenantId);
+ Future<RecordsBatchResponse> saveRecords(RecordCollection recordCollection, Map<String, String> okapiHeaders);

/**
* Updates {{@link Record} in the db
*
* @param record Record to update
- * @param tenantId tenant id
+ * @param okapiHeaders okapi headers
* @return future with updated Record
*/
- Future<Record> updateRecord(Record record, String tenantId);
+ Future<Record> updateRecord(Record record, Map<String, String> okapiHeaders);

/**
* Increments generation in case a record with the same matchedId exists
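A hypothetical caller of the revised interface supplies the header map instead of a tenant id; the tenant now travels inside the map. The values below are illustrative only (the tenant "diku", the userId variable, and the logger are not from this commit):

import java.util.Map;
import org.folio.okapi.common.XOkapiHeaders;

Map<String, String> okapiHeaders = Map.of(
  XOkapiHeaders.TENANT, "diku",      // example tenant
  XOkapiHeaders.USER_ID, userId);    // example user id
recordService.saveRecord(record, okapiHeaders)
  .onSuccess(saved -> LOGGER.info("saveRecord:: saved record {}", saved.getId()))
  .onFailure(e -> LOGGER.warn("saveRecord:: failed to save record", e));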
==== File 4 of 13 ====
@@ -1,5 +1,42 @@
package org.folio.dao;

+ import static java.lang.String.format;
+ import static java.util.Collections.emptyList;
+ import static org.folio.dao.util.AdvisoryLockUtil.acquireLock;
+ import static org.folio.dao.util.ErrorRecordDaoUtil.ERROR_RECORD_CONTENT;
+ import static org.folio.dao.util.ParsedRecordDaoUtil.PARSED_RECORD_CONTENT;
+ import static org.folio.dao.util.RawRecordDaoUtil.RAW_RECORD_CONTENT;
+ import static org.folio.dao.util.RecordDaoUtil.RECORD_NOT_FOUND_TEMPLATE;
+ import static org.folio.dao.util.RecordDaoUtil.ensureRecordForeignKeys;
+ import static org.folio.dao.util.RecordDaoUtil.filterRecordByExternalIdNonNull;
+ import static org.folio.dao.util.RecordDaoUtil.filterRecordByState;
+ import static org.folio.dao.util.RecordDaoUtil.filterRecordByType;
+ import static org.folio.dao.util.RecordDaoUtil.getExternalHrid;
+ import static org.folio.dao.util.RecordDaoUtil.getExternalId;
+ import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_FOUND_TEMPLATE;
+ import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE;
+ import static org.folio.okapi.common.XOkapiHeaders.TENANT;
+ import static org.folio.rest.jooq.Tables.ERROR_RECORDS_LB;
+ import static org.folio.rest.jooq.Tables.MARC_RECORDS_LB;
+ import static org.folio.rest.jooq.Tables.MARC_RECORDS_TRACKING;
+ import static org.folio.rest.jooq.Tables.RAW_RECORDS_LB;
+ import static org.folio.rest.jooq.Tables.RECORDS_LB;
+ import static org.folio.rest.jooq.Tables.SNAPSHOTS_LB;
+ import static org.folio.rest.jooq.enums.RecordType.MARC_BIB;
+ import static org.folio.rest.util.QueryParamUtil.toRecordType;
+ import static org.jooq.impl.DSL.condition;
+ import static org.jooq.impl.DSL.countDistinct;
+ import static org.jooq.impl.DSL.exists;
+ import static org.jooq.impl.DSL.field;
+ import static org.jooq.impl.DSL.inline;
+ import static org.jooq.impl.DSL.max;
+ import static org.jooq.impl.DSL.name;
+ import static org.jooq.impl.DSL.primaryKey;
+ import static org.jooq.impl.DSL.select;
+ import static org.jooq.impl.DSL.selectDistinct;
+ import static org.jooq.impl.DSL.table;
+ import static org.jooq.impl.DSL.trueCondition;
+
import com.google.common.collect.Lists;
import io.github.jklingsporn.vertx.jooq.classic.reactivepg.ReactiveClassicGenericQueryExecutor;
import io.github.jklingsporn.vertx.jooq.shared.internal.QueryResult;
@@ -11,6 +48,24 @@
import io.vertx.reactivex.pgclient.PgPool;
import io.vertx.reactivex.sqlclient.SqlConnection;
import io.vertx.sqlclient.Row;
+ import java.sql.Connection;
+ import java.sql.SQLException;
+ import java.time.OffsetDateTime;
+ import java.time.ZoneOffset;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;
+ import java.util.Optional;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.function.Function;
+ import java.util.stream.Collectors;
+ import javax.ws.rs.BadRequestException;
+ import javax.ws.rs.NotFoundException;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.expression.BinaryExpression;
import net.sf.jsqlparser.expression.Expression;
@@ -60,6 +115,7 @@
import org.folio.rest.jooq.tables.records.RecordsLbRecord;
import org.folio.rest.jooq.tables.records.SnapshotsLbRecord;
import org.folio.services.RecordSearchParameters;
+ import org.folio.services.domainevent.RecordDomainEventPublisher;
import org.folio.services.util.TypeConnection;
import org.folio.services.util.parser.ParseFieldsResult;
import org.folio.services.util.parser.ParseLeaderResult;
@@ -90,50 +146,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

- import javax.ws.rs.BadRequestException;
- import javax.ws.rs.NotFoundException;
- import java.sql.Connection;
- import java.sql.SQLException;
- import java.time.OffsetDateTime;
- import java.time.ZoneOffset;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import java.util.Optional;
- import java.util.Set;
- import java.util.UUID;
- import java.util.function.Function;
- import java.util.stream.Collectors;
-
- import static java.lang.String.format;
- import static java.util.Collections.emptyList;
- import static org.folio.dao.util.AdvisoryLockUtil.acquireLock;
- import static org.folio.dao.util.ErrorRecordDaoUtil.ERROR_RECORD_CONTENT;
- import static org.folio.dao.util.ParsedRecordDaoUtil.PARSED_RECORD_CONTENT;
- import static org.folio.dao.util.RawRecordDaoUtil.RAW_RECORD_CONTENT;
- import static org.folio.dao.util.RecordDaoUtil.RECORD_NOT_FOUND_TEMPLATE;
- import static org.folio.dao.util.RecordDaoUtil.ensureRecordForeignKeys;
- import static org.folio.dao.util.RecordDaoUtil.filterRecordByExternalIdNonNull;
- import static org.folio.dao.util.RecordDaoUtil.filterRecordByState;
- import static org.folio.dao.util.RecordDaoUtil.filterRecordByType;
- import static org.folio.dao.util.RecordDaoUtil.getExternalHrid;
- import static org.folio.dao.util.RecordDaoUtil.getExternalId;
- import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_FOUND_TEMPLATE;
- import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE;
- import static org.folio.rest.jooq.Tables.ERROR_RECORDS_LB;
- import static org.folio.rest.jooq.Tables.MARC_RECORDS_LB;
- import static org.folio.rest.jooq.Tables.MARC_RECORDS_TRACKING;
- import static org.folio.rest.jooq.Tables.RAW_RECORDS_LB;
- import static org.folio.rest.jooq.Tables.RECORDS_LB;
- import static org.folio.rest.jooq.Tables.SNAPSHOTS_LB;
- import static org.folio.rest.jooq.enums.RecordType.MARC_BIB;
- import static org.folio.rest.util.QueryParamUtil.toRecordType;
- import static org.jooq.impl.DSL.*;
-
@Component
public class RecordDaoImpl implements RecordDao {

@@ -216,13 +228,16 @@ public class RecordDaoImpl implements RecordDao {
public static final Field<UUID> MARC_INDEXERS_MARC_ID = field(TABLE_FIELD_TEMPLATE, UUID.class, field(MARC_INDEXERS), field(MARC_ID));

private final PostgresClientFactory postgresClientFactory;
+ private final RecordDomainEventPublisher recordDomainEventPublisher;

@org.springframework.beans.factory.annotation.Value("${srs.record.matching.fallback-query.enable:false}")
private boolean enableFallbackQuery;

@Autowired
- public RecordDaoImpl(final PostgresClientFactory postgresClientFactory) {
+ public RecordDaoImpl(final PostgresClientFactory postgresClientFactory,
+   final RecordDomainEventPublisher recordDomainEventPublisher) {
this.postgresClientFactory = postgresClientFactory;
+ this.recordDomainEventPublisher = recordDomainEventPublisher;
}

@Override
@@ -695,19 +710,24 @@ public Future<Optional<Record>> getRecordByCondition(ReactiveClassicGenericQuery
}

@Override
- public Future<Record> saveRecord(Record record, String tenantId) {
+ public Future<Record> saveRecord(Record record, Map<String, String> okapiHeaders) {
+   var tenantId = okapiHeaders.get(TENANT);
LOG.trace("saveRecord:: Saving {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId);
- return getQueryExecutor(tenantId).transaction(txQE -> saveRecord(txQE, record));
+ return getQueryExecutor(tenantId).transaction(txQE -> saveRecord(txQE, record, okapiHeaders))
+   .onSuccess(created -> recordDomainEventPublisher.publishRecordCreated(created, okapiHeaders));
}

@Override
- public Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record) {
+ public Future<Record> saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record,
+   Map<String, String> okapiHeaders) {
LOG.trace("saveRecord:: Saving {} record {}", record.getRecordType(), record.getId());
- return insertOrUpdateRecord(txQE, record);
+ return insertOrUpdateRecord(txQE, record)
+   .onSuccess(created -> recordDomainEventPublisher.publishRecordCreated(created, okapiHeaders));
}

@Override
- public Future<RecordsBatchResponse> saveRecords(RecordCollection recordCollection, String tenantId) {
+ public Future<RecordsBatchResponse> saveRecords(RecordCollection recordCollection, Map<String, String> okapiHeaders) {
+   var tenantId = okapiHeaders.get(TENANT);
logRecordCollection("saveRecords:: Saving", recordCollection, tenantId);
Promise<RecordsBatchResponse> finalPromise = Promise.promise();
Context context = Vertx.currentContext();
@@ -917,15 +937,19 @@ public Future<RecordsBatchResponse> saveRecords(RecordCollection recordCollectio
}
});

- return finalPromise.future();
+ return finalPromise.future()
+   .onSuccess(response -> response.getRecords()
+     .forEach(r -> recordDomainEventPublisher.publishRecordCreated(r, okapiHeaders))
+   );
}

@Override
- public Future<Record> updateRecord(Record record, String tenantId) {
+ public Future<Record> updateRecord(Record record, Map<String, String> okapiHeaders) {
+   var tenantId = okapiHeaders.get(TENANT);
LOG.trace("updateRecord:: Updating {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId);
return getQueryExecutor(tenantId).transaction(txQE -> getRecordById(txQE, record.getId())
.compose(optionalRecord -> optionalRecord
- .map(r -> saveRecord(txQE, record))
+ .map(r -> saveRecord(txQE, record, okapiHeaders))
.orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_TEMPLATE, record.getId()))))));
}

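RecordDomainEventPublisher appears in this file only through its call sites: publishRecordCreated fires on each successful save, and for batch saves it fans out once per record after the whole batch future succeeds. The class itself must live in one of the six diffs this page did not render, so the following is merely an assumed shape consistent with those calls:

import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.okapi.common.XOkapiHeaders;
import org.folio.rest.jaxrs.model.Record;
import org.springframework.stereotype.Component;

// Assumed shape only; not code from this commit.
@Component
public class RecordDomainEventPublisher {

  private static final Logger LOG = LogManager.getLogger();

  public void publishRecordCreated(Record created, Map<String, String> okapiHeaders) {
    // Presumably serializes the record into a domain event and produces it to Kafka,
    // carrying the Okapi headers so downstream consumers keep tenant/user context.
    LOG.debug("publishRecordCreated:: record {} for tenant {}",
      created.getId(), okapiHeaders.get(XOkapiHeaders.TENANT));
  }
}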
==== File 5 of 13 ====
@@ -59,7 +59,7 @@ public void postSourceStorageBatchRecords(RecordCollection entity, Map<String, S
vertxContext.runOnContext(v -> {
try {
MetadataUtil.populateMetadata(entity.getRecords(), okapiHeaders);
- recordService.saveRecords(entity, tenantId)
+ recordService.saveRecords(entity, okapiHeaders)
.map(recordsBatchResponse -> {
if (!recordsBatchResponse.getRecords().isEmpty()) {
return PostSourceStorageBatchRecordsResponse.respond201WithApplicationJson(recordsBatchResponse);
==== File 6 of 13 ====
@@ -67,7 +67,7 @@ public void postSourceStoragePopulateTestMarcRecords(TestMarcRecordsCollection e
}
return record;
})
- .forEach(marcRecord -> futures.add(recordService.saveRecord(marcRecord, tenantId)));
+ .forEach(marcRecord -> futures.add(recordService.saveRecord(marcRecord, okapiHeaders)));

GenericCompositeFuture.all(futures).onComplete(result -> {
if (result.succeeded()) {
==== File 7 of 13 ====
@@ -53,7 +53,7 @@ public void postSourceStorageRecords(Record entity, Map<String, String> okapiHea
Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
vertxContext.runOnContext(v -> {
try {
- recordService.saveRecord(entity, tenantId)
+ recordService.saveRecord(entity, okapiHeaders)
.map((Response) PostSourceStorageRecordsResponse.respond201WithApplicationJson(entity, PostSourceStorageRecordsResponse.headersFor201()))
.otherwise(ExceptionHelper::mapExceptionToResponse).onComplete(asyncResultHandler);
} catch (Exception e) {
@@ -88,7 +88,7 @@ public void putSourceStorageRecordsById(String id, Record entity, Map<String, St
vertxContext.runOnContext(v -> {
try {
entity.setId(id);
- recordService.updateRecord(entity, tenantId)
+ recordService.updateRecord(entity, okapiHeaders)
.map(updated -> PutSourceStorageRecordsByIdResponse.respond200WithApplicationJson(entity))
.map(Response.class::cast).otherwise(ExceptionHelper::mapExceptionToResponse)
.onComplete(asyncResultHandler);
@@ -103,7 +103,7 @@ public void putSourceStorageRecordsGenerationById(String matchedId, Record entit
Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
vertxContext.runOnContext(v -> {
try {
- recordService.updateRecordGeneration(matchedId, entity, tenantId)
+ recordService.updateRecordGeneration(matchedId, entity, okapiHeaders)
.map(updated -> PutSourceStorageRecordsGenerationByIdResponse.respond200WithApplicationJson(entity))
.map(Response.class::cast).otherwise(ExceptionHelper::mapExceptionToResponse)
.onComplete(asyncResultHandler);
@@ -119,7 +119,7 @@ public void deleteSourceStorageRecordsById(String id, String idType, Map<String,
Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
vertxContext.runOnContext(v -> {
try {
- recordService.deleteRecordById(id, toExternalIdType(idType), tenantId).map(r -> true)
+ recordService.deleteRecordById(id, toExternalIdType(idType), okapiHeaders).map(r -> true)
.map(updated -> DeleteSourceStorageRecordsByIdResponse.respond204()).map(Response.class::cast)
.otherwise(ExceptionHelper::mapExceptionToResponse).onComplete(asyncResultHandler);
} catch (Exception e) {
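Compiled from the hunks above, the signature migration visible so far; the six unrendered files may widen this set, and the last two rows are inferred from call sites only (their interface diffs are not shown):

// before                                                   // after
saveRecord(Record, String tenantId)                      -> saveRecord(Record, Map<String, String> okapiHeaders)
saveRecord(txQE, Record)                                 -> saveRecord(txQE, Record, Map<String, String> okapiHeaders)
saveRecords(RecordCollection, String tenantId)           -> saveRecords(RecordCollection, Map<String, String> okapiHeaders)
updateRecord(Record, String tenantId)                    -> updateRecord(Record, Map<String, String> okapiHeaders)
updateRecordGeneration(String matchedId, Record, tenantId) -> updateRecordGeneration(String matchedId, Record, okapiHeaders)
deleteRecordById(String id, idType, tenantId)            -> deleteRecordById(String id, idType, okapiHeaders)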
(Diffs for the remaining six of the 13 changed files were not loaded.)
