Skip to content

Commit

Permalink
feat(reindex): Extend range tables with the status column (#717)
Browse files Browse the repository at this point in the history
* feat(reindex): Extend range tables with the status column

- add status, failCause columns to merge/upload range tables
- implement status population logic

Closes: MSEARCH-870
  • Loading branch information
viacheslavkol authored Dec 23, 2024
1 parent 4982d9f commit a60ac4c
Show file tree
Hide file tree
Showing 20 changed files with 222 additions and 36 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
### Features
* Move Instance sub-entities population from database trigger to code ([MSEARCH-887](https://folio-org.atlassian.net/browse/MSEARCH-887))
* Update reindex merge failed status only for failed entity type ([MSEARCH-909](https://folio-org.atlassian.net/browse/MSEARCH-909))
* Extend reindex range tables with status, fail_cause columns ([MSEARCH-870](https://folio-org.atlassian.net/browse/MSEARCH-870))

### Bug fixes
* Remove shelving order calculation for local call-number types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.UUID;
import lombok.Data;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.model.types.ReindexRangeStatus;

@Data
public class MergeRangeEntity {
Expand All @@ -15,6 +16,8 @@ public class MergeRangeEntity {
public static final String RANGE_UPPER_COLUMN = "upper";
public static final String CREATED_AT_COLUMN = "created_at";
public static final String FINISHED_AT_COLUMN = "finished_at";
public static final String STATUS_COLUMN = "status";
public static final String FAIL_CAUSE_COLUMN = "fail_cause";

private final UUID id;
private final ReindexEntityType entityType;
Expand All @@ -23,5 +26,7 @@ public class MergeRangeEntity {
private final String upperId;
private final Timestamp createdAt;
private Timestamp finishedAt;
private final ReindexRangeStatus status;
private final String failCause;

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.UUID;
import lombok.Data;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.model.types.ReindexRangeStatus;

@Data
public class UploadRangeEntity {
Expand All @@ -14,12 +15,16 @@ public class UploadRangeEntity {
public static final String UPPER_BOUND_COLUMN = "upper";
public static final String CREATED_AT_COLUMN = "created_at";
public static final String FINISHED_AT_COLUMN = "finished_at";
public static final String STATUS_COLUMN = "status";
public static final String FAIL_CAUSE_COLUMN = "fail_cause";

private final UUID id;
private final ReindexEntityType entityType;
private final String lower;
private final String upper;
private final Timestamp createdAt;
private Timestamp finishedAt;
private final ReindexRangeStatus status;
private final String failCause;

}
28 changes: 28 additions & 0 deletions src/main/java/org/folio/search/model/types/ReindexRangeStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.folio.search.model.types;

import lombok.Getter;

@Getter
public enum ReindexRangeStatus {
SUCCESS("Success"),
FAIL("Fail");

private final String value;

ReindexRangeStatus(String value) {
this.value = value;
}

public static ReindexRangeStatus valueOfNullable(String value) {
if (value == null) {
return null;
}

for (ReindexRangeStatus b : ReindexRangeStatus.values()) {
if (b.name().equalsIgnoreCase(value)) {
return b;
}
}
throw new IllegalArgumentException("Unexpected value '" + value + "'");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.folio.search.model.reindex.MergeRangeEntity;
import org.folio.search.model.types.InventoryRecordType;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.model.types.ReindexRangeStatus;
import org.folio.search.service.InstanceChildrenResourceService;
import org.folio.search.service.reindex.jdbc.MergeRangeRepository;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -67,9 +68,9 @@ public List<MergeRangeEntity> fetchMergeRanges(ReindexEntityType entityType) {
return repositories.get(entityType).getMergeRanges();
}

public void updateFinishDate(ReindexEntityType entityType, String rangeId) {
public void updateStatus(ReindexEntityType entityType, String rangeId, ReindexRangeStatus status, String failCause) {
var repository = repositories.get(entityType);
repository.setIndexRangeFinishDate(UUID.fromString(rangeId), Timestamp.from(Instant.now()));
repository.updateRangeStatus(UUID.fromString(rangeId), Timestamp.from(Instant.now()), status, failCause);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -110,7 +111,7 @@ private MergeRangeEntity mergeEntity(InventoryRecordType recordType, String tena

private MergeRangeEntity mergeEntity(UUID id, InventoryRecordType recordType, String tenantId, String lowerId,
String upperId, Timestamp createdAt) {
return new MergeRangeEntity(id, asEntityType(recordType), tenantId, lowerId, upperId, createdAt);
return new MergeRangeEntity(id, asEntityType(recordType), tenantId, lowerId, upperId, createdAt, null, null);
}

private ReindexEntityType asEntityType(InventoryRecordType recordType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.folio.search.model.event.ReindexRangeIndexEvent;
import org.folio.search.model.event.ReindexRecordsEvent;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.model.types.ReindexRangeStatus;
import org.folio.search.repository.PrimaryResourceRepository;
import org.folio.search.service.converter.MultiTenantSearchDocumentConverter;
import org.folio.spring.FolioExecutionContext;
Expand All @@ -34,13 +35,14 @@ public boolean process(ReindexRangeIndexEvent event) {
var resourceEvents = uploadRangeService.fetchRecordRange(event);
var documents = documentConverter.convert(resourceEvents).values().stream().flatMap(Collection::stream).toList();
var folioIndexOperationResponse = elasticRepository.indexResources(documents);
uploadRangeService.updateFinishDate(event);
if (folioIndexOperationResponse.getStatus() == FolioIndexOperationResponse.StatusEnum.ERROR) {
log.warn("process:: ReindexRangeIndexEvent indexing error [id: {}, error: {}]",
event.getId(), folioIndexOperationResponse.getErrorMessage());
uploadRangeService.updateStatus(event, ReindexRangeStatus.FAIL, folioIndexOperationResponse.getErrorMessage());
reindexStatusService.updateReindexUploadFailed(event.getEntityType());
throw new ReindexException(folioIndexOperationResponse.getErrorMessage());
}
uploadRangeService.updateStatus(event, ReindexRangeStatus.SUCCESS, null);

log.info("process:: ReindexRangeIndexEvent processed [id: {}]", event.getId());
reindexStatusService.addProcessedUploadRanges(event.getEntityType(), 1);
Expand All @@ -55,7 +57,7 @@ public boolean process(ReindexRecordsEvent event) {
try {
mergeRangeService.saveEntities(event);
reindexStatusService.addProcessedMergeRanges(entityType, 1);
mergeRangeService.updateFinishDate(entityType, event.getRangeId());
mergeRangeService.updateStatus(entityType, event.getRangeId(), ReindexRangeStatus.SUCCESS, null);
log.info("process:: ReindexRecordsEvent processed [rangeId: {}, recordType: {}]",
event.getRangeId(), event.getRecordType());
if (reindexStatusService.isMergeCompleted()) {
Expand All @@ -69,7 +71,7 @@ public boolean process(ReindexRecordsEvent event) {
log.error(new FormattedMessage("process:: ReindexRecordsEvent indexing error [rangeId: {}, error: {}]",
event.getRangeId(), ex.getMessage()), ex);
reindexStatusService.updateReindexMergeFailed(entityType);
mergeRangeService.updateFinishDate(entityType, event.getRangeId());
mergeRangeService.updateStatus(entityType, event.getRangeId(), ReindexRangeStatus.FAIL, ex.getMessage());
}

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.folio.search.model.event.ReindexRangeIndexEvent;
import org.folio.search.model.reindex.UploadRangeEntity;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.model.types.ReindexRangeStatus;
import org.folio.search.service.reindex.jdbc.UploadRangeRepository;
import org.folio.spring.tools.kafka.FolioMessageProducer;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -56,9 +57,9 @@ public Collection<ResourceEvent> fetchRecordRange(ReindexRangeIndexEvent rangeIn
.toList();
}

public void updateFinishDate(ReindexRangeIndexEvent event) {
public void updateStatus(ReindexRangeIndexEvent event, ReindexRangeStatus status, String failCause) {
var repository = repositories.get(event.getEntityType());
repository.setIndexRangeFinishDate(event.getId(), Timestamp.from(Instant.now()));
repository.updateRangeStatus(event.getId(), Timestamp.from(Instant.now()), status, failCause);
}

private List<ReindexRangeIndexEvent> prepareEvents(List<UploadRangeEntity> uploadRanges) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.UUID;
import org.folio.search.model.reindex.MergeRangeEntity;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.model.types.ReindexRangeStatus;
import org.folio.search.utils.JdbcUtils;
import org.folio.search.utils.JsonConverter;
import org.folio.spring.FolioExecutionContext;
Expand Down Expand Up @@ -102,7 +103,9 @@ private RowMapper<MergeRangeEntity> mergeRangeEntityRowMapper() {
rs.getString(MergeRangeEntity.TENANT_ID_COLUMN),
rs.getString(MergeRangeEntity.RANGE_LOWER_COLUMN),
rs.getString(MergeRangeEntity.RANGE_UPPER_COLUMN),
rs.getTimestamp(MergeRangeEntity.CREATED_AT_COLUMN)
rs.getTimestamp(MergeRangeEntity.CREATED_AT_COLUMN),
ReindexRangeStatus.valueOfNullable(rs.getString(MergeRangeEntity.STATUS_COLUMN)),
rs.getString(MergeRangeEntity.FAIL_CAUSE_COLUMN)
);
mergeRange.setFinishedAt(rs.getTimestamp(MergeRangeEntity.FINISHED_AT_COLUMN));
return mergeRange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Optional;
import java.util.UUID;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.model.types.ReindexRangeStatus;
import org.folio.search.utils.JdbcUtils;
import org.folio.search.utils.JsonConverter;
import org.folio.spring.FolioExecutionContext;
Expand All @@ -15,7 +16,11 @@ public abstract class ReindexJdbcRepository {

protected static final int BATCH_OPERATION_SIZE = 100;
private static final String COUNT_SQL = "SELECT COUNT(*) FROM %s;";
private static final String UPDATE_FINISHED_AT_RANGE_SQL = "UPDATE %s SET finished_at = ? WHERE id = ?;";
private static final String UPDATE_STATUS_SQL = """
UPDATE %s
SET finished_at = ?, status = ?, fail_cause = ?
WHERE id = ?;
""";

protected final JsonConverter jsonConverter;
protected final FolioExecutionContext context;
Expand All @@ -40,9 +45,9 @@ public void truncate() {
JdbcUtils.truncateTable(entityTable(), jdbcTemplate, context);
}

public void setIndexRangeFinishDate(UUID id, Timestamp timestamp) {
var sql = UPDATE_FINISHED_AT_RANGE_SQL.formatted(getFullTableName(context, rangeTable()));
jdbcTemplate.update(sql, timestamp, id);
public void updateRangeStatus(UUID id, Timestamp timestamp, ReindexRangeStatus status, String failCause) {
var sql = UPDATE_STATUS_SQL.formatted(getFullTableName(context, rangeTable()));
jdbcTemplate.update(sql, timestamp, status.name(), failCause, id);
}

public abstract ReindexEntityType entityType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import static org.folio.search.model.reindex.UploadRangeEntity.CREATED_AT_COLUMN;
import static org.folio.search.model.reindex.UploadRangeEntity.ENTITY_TYPE_COLUMN;
import static org.folio.search.model.reindex.UploadRangeEntity.FAIL_CAUSE_COLUMN;
import static org.folio.search.model.reindex.UploadRangeEntity.FINISHED_AT_COLUMN;
import static org.folio.search.model.reindex.UploadRangeEntity.ID_COLUMN;
import static org.folio.search.model.reindex.UploadRangeEntity.LOWER_BOUND_COLUMN;
import static org.folio.search.model.reindex.UploadRangeEntity.STATUS_COLUMN;
import static org.folio.search.model.reindex.UploadRangeEntity.UPPER_BOUND_COLUMN;
import static org.folio.search.service.reindex.ReindexConstants.UPLOAD_RANGE_TABLE;
import static org.folio.search.utils.JdbcUtils.getFullTableName;
Expand All @@ -21,6 +23,7 @@
import org.folio.search.model.index.InstanceSubResource;
import org.folio.search.model.reindex.UploadRangeEntity;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.model.types.ReindexRangeStatus;
import org.folio.search.service.reindex.RangeGenerator;
import org.folio.search.utils.JsonConverter;
import org.folio.spring.FolioExecutionContext;
Expand Down Expand Up @@ -92,7 +95,9 @@ private RowMapper<UploadRangeEntity> uploadRangeRowMapper() {
ReindexEntityType.fromValue(rs.getString(ENTITY_TYPE_COLUMN)),
rs.getString(LOWER_BOUND_COLUMN),
rs.getString(UPPER_BOUND_COLUMN),
rs.getTimestamp(CREATED_AT_COLUMN)
rs.getTimestamp(CREATED_AT_COLUMN),
ReindexRangeStatus.valueOfNullable(rs.getString(STATUS_COLUMN)),
rs.getString(FAIL_CAUSE_COLUMN)
);
uploadRange.setFinishedAt(rs.getTimestamp(FINISHED_AT_COLUMN));
return uploadRange;
Expand All @@ -103,7 +108,7 @@ private List<UploadRangeEntity> prepareAndSaveUploadRanges() {
var ranges = createRanges()
.stream()
.map(range -> new UploadRangeEntity(UUID.randomUUID(), entityType(), range.lowerBound(), range.upperBound(),
Timestamp.from(Instant.now())))
Timestamp.from(Instant.now()), null, null))
.toList();

upsertUploadRanges(ranges);
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/changelog/changelog-master.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
<include file="changes/v3.2/create_browse_config_table.xml" relativeToChangelogFile="true"/>
<include file="changes/v4.0/create-reindex-entity-tables.xml" relativeToChangelogFile="true"/>
<include file="changes/v4.0/delete-instance-trigger.xml" relativeToChangelogFile="true"/>
<include file="changes/v4.0/add-reindex-range-statuses.xml" relativeToChangelogFile="true"/>
<include file="changes/v4.1/alter-browse-config-type-ids.xml" relativeToChangelogFile="true"/>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.27.xsd">

<changeSet id="MSEARCH-870@@add-merge_range-status" author="viacheslav_kolesnyk">
<preConditions onFail="MARK_RAN">
<and>
<tableExists tableName="merge_range"/>
<not>
<columnExists tableName="merge_range" columnName="status" />
</not>
</and>
</preConditions>

<addColumn tableName="merge_range">
<column name="status" type="VARCHAR(20)"/>
<column name="fail_cause" type="TEXT"/>
</addColumn>
</changeSet>

<changeSet id="MSEARCH-870@@add-upload_range-status" author="viacheslav_kolesnyk">
<preConditions onFail="MARK_RAN">
<and>
<tableExists tableName="upload_range"/>
<not>
<columnExists tableName="upload_range" columnName="status" />
</not>
</and>
</preConditions>

<addColumn tableName="upload_range">
<column name="status" type="VARCHAR(20)"/>
<column name="fail_cause" type="TEXT"/>
</addColumn>
</changeSet>

</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void publishReindexRecordsRange_ShouldIgnoreInvalidInput() {
void publishReindexRecordsRange_ValidExecutionPath() {
var id = UUID.randomUUID();
var validRange = new MergeRangeEntity(id, INSTANCE, TENANT_ID, "low", "high", Timestamp.from(
Instant.now()));
Instant.now()), null, null);
var request = constructRequest(id.toString(), INSTANCE.getType(), "low", "high");
doNothing().when(reindexRecordsClient).publishReindexRecords(request);

Expand All @@ -92,7 +92,7 @@ void publishReindexRecordsRange_ValidExecutionPath() {
void publishReindexRecordsRange_ShouldRetryOnFailure() {
var id = UUID.randomUUID();
var validRange = new MergeRangeEntity(id, INSTANCE, TENANT_ID, "low", "high", Timestamp.from(
Instant.now()));
Instant.now()), null, null);
var request = constructRequest(id.toString(), INSTANCE.getType(), "low", "high");
doThrow(new RuntimeException("API failure")).when(reindexRecordsClient).publishReindexRecords(request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.folio.search.model.reindex.MergeRangeEntity;
import org.folio.search.model.types.InventoryRecordType;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.model.types.ReindexRangeStatus;
import org.folio.search.service.InstanceChildrenResourceService;
import org.folio.search.service.reindex.jdbc.HoldingRepository;
import org.folio.search.service.reindex.jdbc.ItemRepository;
Expand Down Expand Up @@ -99,14 +100,16 @@ void createMergeRanges_positive() {
}

@Test
void updateFinishDate() {
void updateStatus() {
var testStartTime = Timestamp.from(Instant.now());
var rangeId = UUID.randomUUID();
var captor = ArgumentCaptor.<Timestamp>captor();
var failCause = "fail cause";

service.updateFinishDate(ReindexEntityType.INSTANCE, rangeId.toString());
service.updateStatus(ReindexEntityType.INSTANCE, rangeId.toString(), ReindexRangeStatus.FAIL, failCause);

verify(instanceRepository).setIndexRangeFinishDate(eq(rangeId), captor.capture());
verify(instanceRepository)
.updateRangeStatus(eq(rangeId), captor.capture(), eq(ReindexRangeStatus.FAIL), eq(failCause));

var timestamp = captor.getValue();
assertThat(timestamp).isAfterOrEqualTo(testStartTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.folio.search.model.types.IndexActionType;
import org.folio.search.model.types.IndexingDataFormat;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.model.types.ReindexRangeStatus;
import org.folio.search.repository.PrimaryResourceRepository;
import org.folio.search.service.converter.MultiTenantSearchDocumentConverter;
import org.folio.spring.testing.type.UnitTest;
Expand Down Expand Up @@ -107,7 +108,8 @@ void process_positive_reindexRecordsEvent() {

verify(mergeRangeIndexService).saveEntities(event);
verify(reindexStatusService).addProcessedMergeRanges(ReindexEntityType.INSTANCE, 1);
verify(mergeRangeIndexService).updateFinishDate(ReindexEntityType.INSTANCE, event.getRangeId());
verify(mergeRangeIndexService)
.updateStatus(ReindexEntityType.INSTANCE, event.getRangeId(), ReindexRangeStatus.SUCCESS, null);
}

@Test
Expand All @@ -116,12 +118,14 @@ void process_negative_reindexRecordsEvent_shouldFailMergeOnException() {
event.setRangeId(UUID.randomUUID().toString());
event.setRecordType(ReindexRecordsEvent.ReindexRecordType.INSTANCE);
event.setRecords(emptyList());
doThrow(new RuntimeException()).when(mergeRangeIndexService).saveEntities(event);
var failCause = "exception occurred";
doThrow(new RuntimeException(failCause)).when(mergeRangeIndexService).saveEntities(event);

service.process(event);

verify(reindexStatusService).updateReindexMergeFailed(ReindexEntityType.INSTANCE);
verify(mergeRangeIndexService).updateFinishDate(ReindexEntityType.INSTANCE, event.getRangeId());
verify(mergeRangeIndexService)
.updateStatus(ReindexEntityType.INSTANCE, event.getRangeId(), ReindexRangeStatus.FAIL, failCause);
verifyNoMoreInteractions(reindexStatusService);
}

Expand Down
Loading

0 comments on commit a60ac4c

Please sign in to comment.