MODSOURMAN-1020: Save incoming records for DI logs (#819)
* MODSOURMAN-1020: add IncomingRecord entity DAO and service layers;
save IncomingRecords with JournalRecords after incoming records are parsed
yaroslav-epam authored Nov 3, 2023
1 parent f6f6410 commit bb0745e
Showing 21 changed files with 425 additions and 26 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
@@ -1,3 +1,6 @@
## 2023-xx-xx v3.8.0-SNAPSHOT
* [MODSOURMAN-1020](https://issues.folio.org/browse/MODSOURMAN-1020) Add table to save incoming records for DI logs

## 2023-10-13 v3.7.0
* [MODSOURMAN-1045](https://issues.folio.org/browse/MODSOURMAN-1045) Allow create action with non-matches for instance without match profile
* [MODSOURMAN-1003](https://issues.folio.org/browse/MODSOURMAN-1003) Allow create action with non-matches for instance
org/folio/dao/IncomingRecordDao.java
@@ -0,0 +1,23 @@
package org.folio.dao;

import io.vertx.core.Future;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import org.folio.rest.jaxrs.model.IncomingRecord;

import java.util.List;

/**
 * DAO interface for the {@link IncomingRecord} entity
 */
public interface IncomingRecordDao {

  /**
   * Saves {@link IncomingRecord} entities into DB
   *
   * @param incomingRecords {@link IncomingRecord} entities to save
   * @param tenantId        tenant id
   * @return future with created incomingRecords entities represented as row set
   */
  Future<List<RowSet<Row>>> saveBatch(List<IncomingRecord> incomingRecords, String tenantId);
}
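
For orientation, a minimal usage sketch of the new DAO; the caller class, method, and variable names below are illustrative and not part of the commit, only the saveBatch signature above and the generated IncomingRecord builders are assumed:

// Illustrative caller of IncomingRecordDao.saveBatch (not part of the commit).
import io.vertx.core.Future;
import org.folio.dao.IncomingRecordDao;
import org.folio.rest.jaxrs.model.IncomingRecord;
import java.util.List;
import java.util.UUID;

class IncomingRecordSaveExample {
  private final IncomingRecordDao incomingRecordDao;

  IncomingRecordSaveExample(IncomingRecordDao incomingRecordDao) {
    this.incomingRecordDao = incomingRecordDao;
  }

  Future<Void> saveOne(String jobExecutionId, String rawContent, String tenantId) {
    IncomingRecord incomingRecord = new IncomingRecord()
      .withId(UUID.randomUUID().toString())  // primary key of the incoming_records row
      .withJobExecutionId(jobExecutionId)    // ties the record to its data-import job
      .withRawRecordContent(rawContent);     // payload as received, before parsing
    return incomingRecordDao.saveBatch(List.of(incomingRecord), tenantId).mapEmpty();
  }
}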
org/folio/dao/IncomingRecordDaoImpl.java
@@ -0,0 +1,52 @@
package org.folio.dao;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dao.util.PostgresClientFactory;
import org.folio.rest.jaxrs.model.IncomingRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

import java.util.List;
import java.util.UUID;

import static java.lang.String.format;
import static org.folio.rest.persist.PostgresClient.convertToPsqlStandard;

@Repository
public class IncomingRecordDaoImpl implements IncomingRecordDao {

  private static final Logger LOGGER = LogManager.getLogger();
  public static final String INCOMING_RECORDS_TABLE = "incoming_records";
  private static final String INSERT_SQL = "INSERT INTO %s.%s (id, job_execution_id, incoming_record) VALUES ($1, $2, $3)";

  @Autowired
  private PostgresClientFactory pgClientFactory;

  @Override
  public Future<List<RowSet<Row>>> saveBatch(List<IncomingRecord> incomingRecords, String tenantId) {
    LOGGER.info("saveBatch:: Save IncomingRecord entity to the {} table", INCOMING_RECORDS_TABLE);
    Promise<List<RowSet<Row>>> promise = Promise.promise();
    try {
      String query = format(INSERT_SQL, convertToPsqlStandard(tenantId), INCOMING_RECORDS_TABLE);
      List<Tuple> tuples = incomingRecords.stream().map(this::prepareInsertQueryParameters).toList();
      LOGGER.debug("IncomingRecordDaoImpl:: Save query = {}; tuples = {}", query, tuples);
      pgClientFactory.createInstance(tenantId).execute(query, tuples, promise);
    } catch (Exception e) {
      LOGGER.warn("saveBatch:: Error saving IncomingRecord entity", e);
      promise.fail(e);
    }
    return promise.future().onFailure(e -> LOGGER.warn("saveBatch:: Error saving IncomingRecord entity", e));
  }

  private Tuple prepareInsertQueryParameters(IncomingRecord incomingRecord) {
    return Tuple.of(UUID.fromString(incomingRecord.getId()), UUID.fromString(incomingRecord.getJobExecutionId()),
      JsonObject.mapFrom(incomingRecord));
  }
}
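
To make the execution concrete: for a tenant such as diku the formatted statement would resolve as below and be executed once with the whole list of tuples, one tuple per IncomingRecord. The schema name is an assumption here, following RMB's usual tenant_module convention produced by convertToPsqlStandard:

-- Illustrative resolved query (schema name assumed):
INSERT INTO diku_mod_source_record_manager.incoming_records (id, job_execution_id, incoming_record)
VALUES ($1, $2, $3);
-- $1 = IncomingRecord.id, $2 = jobExecutionId, $3 = JsonObject.mapFrom(incomingRecord) as jsonb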
org/folio/dao/JobExecutionDaoImpl.java
@@ -56,6 +56,7 @@
import static java.lang.String.format;
import static java.util.Objects.nonNull;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.folio.dao.IncomingRecordDaoImpl.INCOMING_RECORDS_TABLE;
import static org.folio.dao.util.JobExecutionDBConstants.COMPLETED_DATE_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.CURRENTLY_PROCESSED_FIELD;
import static org.folio.dao.util.JobExecutionDBConstants.ERROR_STATUS_FIELD;
@@ -597,12 +598,13 @@ public Future<Boolean> hardDeleteJobExecutions(long diffNumberOfDays, String ten
return Future.succeededFuture();
}

- UUID[] uuids = jobExecutionIds.stream().map(UUID::fromString).collect(Collectors.toList()).toArray(UUID[]::new);
+ UUID[] uuids = jobExecutionIds.stream().map(UUID::fromString).toList().toArray(UUID[]::new);

Future<RowSet<Row>> jobExecutionProgressFuture = Future.future(rowSetPromise -> deleteFromRelatedTable(PROGRESS_TABLE_NAME, uuids, sqlConnection, tenantId, rowSetPromise, postgresClient));
Future<RowSet<Row>> jobExecutionSourceChunksFuture = Future.future(rowSetPromise -> deleteFromRelatedTableWithDeprecatedNaming(JOB_EXECUTION_SOURCE_CHUNKS_TABLE_NAME, uuids, sqlConnection, tenantId, rowSetPromise, postgresClient));
Future<RowSet<Row>> journalRecordsFuture = Future.future(rowSetPromise -> deleteFromRelatedTable(JOURNAL_RECORDS_TABLE_NAME, uuids, sqlConnection, tenantId, rowSetPromise, postgresClient));
- return CompositeFuture.all(jobExecutionProgressFuture, jobExecutionSourceChunksFuture, journalRecordsFuture)
+ Future<RowSet<Row>> incomingRecordsFuture = Future.future(rowSetPromise -> deleteFromRelatedTable(INCOMING_RECORDS_TABLE, uuids, sqlConnection, tenantId, rowSetPromise, postgresClient));
+ return CompositeFuture.all(jobExecutionProgressFuture, jobExecutionSourceChunksFuture, journalRecordsFuture, incomingRecordsFuture)
.compose(ar -> Future.<RowSet<Row>>future(rowSetPromise -> deleteFromJobExecutionTable(uuids, sqlConnection, tenantId, rowSetPromise, postgresClient)))
.map(true);
}));
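
For the new table this cleanup should reduce to something like the statement below; this is an assumed shape for illustration only, since the actual SQL is built inside the deleteFromRelatedTable helper:

-- Assumed shape of the related-table cleanup (illustrative only):
DELETE FROM incoming_records WHERE job_execution_id = ANY ($1);
-- $1 = uuids, the array of job execution ids being hard-deleted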
org/folio/dao/JournalRecordDaoImpl.java
@@ -192,7 +192,7 @@ private Tuple prepareInsertQueryParameters(JournalRecord journalRecord) {
UUID.fromString(journalRecord.getJobExecutionId()),
UUID.fromString(journalRecord.getSourceId()),
journalRecord.getSourceRecordOrder(),
- journalRecord.getEntityType().toString(),
+ journalRecord.getEntityType() != null ? journalRecord.getEntityType().toString() : EMPTY,
journalRecord.getEntityId(),
journalRecord.getEntityHrId() != null ? journalRecord.getEntityHrId() : EMPTY,
journalRecord.getActionType().toString(),
org/folio/services/ChangeEngineServiceImpl.java
@@ -54,6 +54,7 @@
import org.apache.logging.log4j.Logger;
import org.folio.MappingProfile;
import org.folio.services.afterprocessing.FieldModificationService;
import org.folio.services.journal.JournalUtil;
import org.folio.services.validation.JobProfileSnapshotValidationService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -123,6 +124,8 @@ public class ChangeEngineServiceImpl implements ChangeEngineService {
private final JobProfileSnapshotValidationService jobProfileSnapshotValidationService;
private final KafkaConfig kafkaConfig;
private final FieldModificationService fieldModificationService;
private final IncomingRecordService incomingRecordService;
private final JournalRecordService journalRecordService;

@Value("${srm.kafka.RawChunksKafkaHandler.maxDistributionNum:100}")
private int maxDistributionNum;
@@ -138,7 +141,9 @@ public ChangeEngineServiceImpl(@Autowired JobExecutionSourceChunkDao jobExecutio
@Autowired MappingMetadataService mappingMetadataService,
@Autowired JobProfileSnapshotValidationService jobProfileSnapshotValidationService,
@Autowired KafkaConfig kafkaConfig,
- @Autowired FieldModificationService fieldModificationService) {
+ @Autowired FieldModificationService fieldModificationService,
+ @Autowired IncomingRecordService incomingRecordService,
+ @Autowired JournalRecordService journalRecordService) {
this.jobExecutionSourceChunkDao = jobExecutionSourceChunkDao;
this.jobExecutionService = jobExecutionService;
this.marcRecordAnalyzer = marcRecordAnalyzer;
@@ -148,6 +153,8 @@ public ChangeEngineServiceImpl(@Autowired JobExecutionSourceChunkDao jobExecutio
this.jobProfileSnapshotValidationService = jobProfileSnapshotValidationService;
this.kafkaConfig = kafkaConfig;
this.fieldModificationService = fieldModificationService;
this.incomingRecordService = incomingRecordService;
this.journalRecordService = journalRecordService;
}

@Override
@@ -159,9 +166,13 @@ public Future<List<Record>> parseRawRecordsChunkForJobExecution(RawRecordsDto ch
params.getTenantId(), acceptInstanceId, params);

futureParsedRecords
- .compose(parsedRecords -> isJobProfileCompatibleWithRecordsType(jobExecution.getJobProfileSnapshotWrapper(), parsedRecords)
-   ? Future.succeededFuture(parsedRecords)
-   : Future.failedFuture(prepareWrongJobProfileErrorMessage(jobExecution, parsedRecords)))
+ .compose(parsedRecords -> {
+   saveIncomingAndJournalRecords(parsedRecords, params.getTenantId());
+
+   return isJobProfileCompatibleWithRecordsType(jobExecution.getJobProfileSnapshotWrapper(), parsedRecords)
+     ? Future.succeededFuture(parsedRecords)
+     : Future.failedFuture(prepareWrongJobProfileErrorMessage(jobExecution, parsedRecords));
+ })
.compose(parsedRecords -> ensureMappingMetaDataSnapshot(jobExecution.getId(), parsedRecords, params)
.map(parsedRecords))
.onSuccess(parsedRecords -> {
@@ -209,6 +220,13 @@ public Future<List<Record>> parseRawRecordsChunkForJobExecution(RawRecordsDto ch
return promise.future();
}

  private void saveIncomingAndJournalRecords(List<Record> parsedRecords, String tenantId) {
    if (!parsedRecords.isEmpty()) {
      incomingRecordService.saveBatch(JournalUtil.buildIncomingRecordsByRecords(parsedRecords), tenantId);
      journalRecordService.saveBatch(JournalUtil.buildJournalRecordsByRecords(parsedRecords), tenantId);
    }
  }

/**
* Checks whether job profile snapshot is compatible with record type of the specified {@code records}.
* Returns {@code true} for the specified records that have not been parsed successfully and therefore
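
For reference, each IncomingRecord built in this flow is serialized by JsonObject.mapFrom into the incoming_record jsonb column; the stored payload should look roughly like the sketch below. Values are illustrative, and field names are assumed to follow the camelCase builders of the IncomingRecord model used in this commit:

{
  "id": "3c6f3ac1-9f54-4e9a-8b2e-7d4f1b2a9c10",
  "jobExecutionId": "d0c54b2e-1a2b-4c3d-9e8f-5a6b7c8d9e0f",
  "order": 0,
  "recordType": "MARC_BIB",
  "rawRecordContent": "00714cam a2200205 a 4500...",
  "parsedRecordContent": "{\"leader\":\"00714cam a2200205 a 4500\",\"fields\":[]}"
}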
org/folio/services/IncomingRecordService.java
@@ -0,0 +1,19 @@
package org.folio.services;

import org.folio.rest.jaxrs.model.IncomingRecord;

import java.util.List;

/**
 * {@link IncomingRecord} Service interface
 */
public interface IncomingRecordService {

  /**
   * Saves {@link IncomingRecord}s into DB
   *
   * @param incomingRecords incoming records to be saved
   * @param tenantId        tenant id
   */
  void saveBatch(List<IncomingRecord> incomingRecords, String tenantId);
}
org/folio/services/IncomingRecordServiceImpl.java
@@ -0,0 +1,20 @@
package org.folio.services;

import org.folio.dao.IncomingRecordDao;
import org.folio.rest.jaxrs.model.IncomingRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class IncomingRecordServiceImpl implements IncomingRecordService {

  @Autowired
  private IncomingRecordDao incomingRecordDao;

  @Override
  public void saveBatch(List<IncomingRecord> incomingRecords, String tenantId) {
    incomingRecordDao.saveBatch(incomingRecords, tenantId);
  }
}
org/folio/services/JournalRecordService.java
@@ -2,10 +2,12 @@

import io.vertx.core.Future;
import org.folio.rest.jaxrs.model.JobExecutionSummaryDto;
import org.folio.rest.jaxrs.model.JournalRecord;
import org.folio.rest.jaxrs.model.JobLogEntryDtoCollection;
import org.folio.rest.jaxrs.model.JournalRecordCollection;
import org.folio.rest.jaxrs.model.RecordProcessingLogDto;

import java.util.List;
import java.util.Optional;

/**
@@ -75,4 +77,12 @@ public interface JournalRecordService {
* @return Future with JournalRecords updated number
*/
Future<Integer> updateErrorJournalRecordsByOrderIdAndJobExecution(String jobExecutionId, String orderId, String error, String tenantId);

  /**
   * Saves a set of {@link JournalRecord} entities
   *
   * @param journalRecords journal records to save
   * @param tenantId       tenant id
   */
  void saveBatch(List<JournalRecord> journalRecords, String tenantId);
}
org/folio/services/JournalRecordServiceImpl.java
@@ -1,15 +1,16 @@
package org.folio.services;

import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import org.folio.dao.JournalRecordDao;
import org.folio.rest.jaxrs.model.JobExecutionSummaryDto;
import org.folio.rest.jaxrs.model.JournalRecord;
import org.folio.rest.jaxrs.model.JobLogEntryDtoCollection;
import org.folio.rest.jaxrs.model.JournalRecordCollection;
import org.folio.rest.jaxrs.model.RecordProcessingLogDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Optional;

/**
@@ -55,4 +56,9 @@ public Future<Optional<JobExecutionSummaryDto>> getJobExecutionSummaryDto(String
public Future<Integer> updateErrorJournalRecordsByOrderIdAndJobExecution(String jobExecutionId, String orderId, String error, String tenantId) {
return journalRecordDao.updateErrorJournalRecordsByOrderIdAndJobExecution(jobExecutionId, orderId, error, tenantId);
}

  @Override
  public void saveBatch(List<JournalRecord> journalRecords, String tenantId) {
    journalRecordDao.saveBatch(journalRecords, tenantId);
  }
}
org/folio/services/journal/JournalUtil.java
@@ -9,9 +9,11 @@
import org.apache.logging.log4j.Logger;
import org.folio.DataImportEventPayload;
import org.folio.rest.jaxrs.model.DataImportEventTypes;
import org.folio.rest.jaxrs.model.IncomingRecord;
import org.folio.rest.jaxrs.model.JournalRecord;
import org.folio.rest.jaxrs.model.Record;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -62,6 +64,45 @@ private static String extractRecord(HashMap<String, String> context) {
.orElse(EMPTY);
}

  public static List<JournalRecord> buildJournalRecordsByRecords(List<Record> records) {
    return records.stream().map(record -> {
      JournalRecord journalRecord = new JournalRecord()
        .withId(UUID.randomUUID().toString())
        .withJobExecutionId(record.getSnapshotId())
        .withSourceId(record.getId())
        .withSourceRecordOrder(record.getOrder())
        .withActionType(JournalRecord.ActionType.PARSE)
        .withActionDate(new Date())
        .withActionStatus(record.getErrorRecord() == null ? JournalRecord.ActionStatus.COMPLETED : JournalRecord.ActionStatus.ERROR);
      if (record.getRecordType() != null) {
        Arrays.stream(JournalRecord.EntityType.values())
          .filter(v -> v.value().startsWith(record.getRecordType().value()))
          .findFirst().ifPresent(journalRecord::setEntityType);
      }
      if (record.getErrorRecord() != null) {
        journalRecord.setError(record.getErrorRecord().getDescription());
      }
      return journalRecord;
    }).toList();
  }

  public static List<IncomingRecord> buildIncomingRecordsByRecords(List<Record> records) {
    return records.stream().map(record -> {
      IncomingRecord incomingRecord = new IncomingRecord()
        .withId(record.getId())
        .withJobExecutionId(record.getSnapshotId())
        .withOrder(record.getOrder())
        .withRawRecordContent(record.getRawRecord().getContent());
      if (record.getRecordType() != null) {
        incomingRecord.setRecordType(IncomingRecord.RecordType.fromValue(record.getRecordType().value()));
      }
      if (record.getParsedRecord() != null) {
        incomingRecord.setParsedRecordContent(String.valueOf(record.getParsedRecord().getContent()));
      }
      return incomingRecord;
    }).toList();
  }

  public static List<JournalRecord> buildJournalRecordsByEvent(DataImportEventPayload eventPayload, JournalRecord.ActionType actionType, JournalRecord.EntityType entityType,
                                                               JournalRecord.ActionStatus actionStatus) throws JournalRecordMapperException {
    try {
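
A quick usage sketch for the two new builders; the caller code is illustrative, and Record, RawRecord, and the MARC_BIB record type are assumed here from the existing data-import models:

// Illustrative: build journal and incoming entries for one successfully parsed record.
String jobExecutionId = UUID.randomUUID().toString();  // id of the running data-import job
String rawMarc = "00714cam a2200205 a 4500...";        // truncated sample MARC payload

Record record = new Record()
  .withId(UUID.randomUUID().toString())
  .withSnapshotId(jobExecutionId)                      // becomes jobExecutionId on both entities
  .withOrder(0)
  .withRecordType(Record.RecordType.MARC_BIB)
  .withRawRecord(new RawRecord().withContent(rawMarc));

List<JournalRecord> journalRecords = JournalUtil.buildJournalRecordsByRecords(List.of(record));
// actionType PARSE, actionStatus COMPLETED (no errorRecord set),
// entityType MARC_BIBLIOGRAPHIC via the prefix match on "MARC_BIB"

List<IncomingRecord> incomingRecords = JournalUtil.buildIncomingRecordsByRecords(List.of(record));
// recordType MARC_BIB, rawRecordContent populated, parsedRecordContent left unset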
create_incoming_records_table.sql
@@ -0,0 +1,10 @@
CREATE TABLE IF NOT EXISTS incoming_records (
  id uuid NOT NULL,
  job_execution_id uuid NOT NULL,
  incoming_record jsonb NOT NULL,
  CONSTRAINT incoming_records_pkey PRIMARY KEY (id),
  CONSTRAINT incoming_records_jobexecutionid_fkey FOREIGN KEY (job_execution_id)
    REFERENCES job_execution (id)
);

CREATE INDEX IF NOT EXISTS incoming_records_jobexecutionid_index ON incoming_records USING BTREE (job_execution_id);
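
To show how the DI-log side can read this back, an illustrative query, not part of the commit; note that the record order lives inside the jsonb payload rather than in a dedicated column:

-- Illustrative: saved incoming records of one job execution, in original order.
SELECT incoming_record
  FROM incoming_records
 WHERE job_execution_id = 'd0c54b2e-1a2b-4c3d-9e8f-5a6b7c8d9e0f'
 ORDER BY (incoming_record ->> 'order')::int;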
schema.json
@@ -276,6 +276,11 @@
"run": "after",
"snippet": "ALTER TABLE journal_records ADD COLUMN IF NOT EXISTS tenant_id text;",
"fromModuleVersion": "mod-source-record-manager-3.7.0"
},
{
"run": "after",
"snippetPath": "create_incoming_records_table.sql",
"fromModuleVersion": "mod-source-record-manager-3.8.0"
}
]
}
(Diff truncated: the remaining changed files were not loaded.)