Skip to content

Commit

Permalink
update recover state - move to generate accessions
Browse files Browse the repository at this point in the history
  • Loading branch information
nitin-ebi committed Apr 12, 2024
1 parent 3246b01 commit 1111fd5
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,56 +47,21 @@ public class MonotonicAccessionGenerator<MODEL> implements AccessionGenerator<MO
private final String categoryId;
private final String applicationInstanceId;
private final ContiguousIdBlockService blockService;
private MonotonicDatabaseService monotonicDatabaseService;

private boolean SHUTDOWN = false;
private boolean RUN_RECOVERY = true;

public MonotonicAccessionGenerator(String categoryId,
String applicationInstanceId,
ContiguousIdBlockService contiguousIdBlockService,
MonotonicDatabaseService databaseService) {
this(categoryId, applicationInstanceId, contiguousIdBlockService);
// As we are going through the available ranges and at the same time we are also going to manipulate/update them
// Need to make a copy of the original for iteration to avoid ConcurrentModificationException
MonotonicRangePriorityQueue copyOfAvailableRanges = new MonotonicRangePriorityQueue();
for (MonotonicRange range : getAvailableRanges()) {
copyOfAvailableRanges.offer(range);
}
for (MonotonicRange monotonicRange : copyOfAvailableRanges) {
recoverState(databaseService.getAccessionsInRanges(Collections.singletonList(monotonicRange)));
}
}

public MonotonicAccessionGenerator(String categoryId,
String applicationInstanceId,
ContiguousIdBlockService contiguousIdBlockService,
long[] initializedAccessions) {
this(categoryId, applicationInstanceId, contiguousIdBlockService);
if (initializedAccessions != null) {
recoverState(initializedAccessions);
}
}

//Package protected for testing without initialized Accessions
MonotonicAccessionGenerator(String categoryId,
String applicationInstanceId,
ContiguousIdBlockService contiguousIdBlockService) {
MonotonicDatabaseService monotonicDatabaseService) {
this.categoryId = categoryId;
this.applicationInstanceId = applicationInstanceId;
this.blockService = contiguousIdBlockService;
this.blockManager = initializeBlockManager(blockService, categoryId, applicationInstanceId);
}

private static BlockManager initializeBlockManager(ContiguousIdBlockService blockService, String categoryId,
String applicationInstanceId) {
this.monotonicDatabaseService = monotonicDatabaseService;
assertBlockParametersAreInitialized(blockService, categoryId);
BlockManager blockManager = new BlockManager();
List<ContiguousIdBlock> uncompletedBlocks = blockService
.reserveUncompletedBlocksForCategoryIdAndApplicationInstanceId(categoryId, applicationInstanceId);
//Insert as available ranges
for (ContiguousIdBlock block : uncompletedBlocks) {
blockManager.addBlock(block);
}
return blockManager;
this.blockManager = new BlockManager();
}

private static void assertBlockParametersAreInitialized(ContiguousIdBlockService blockService, String categoryId) {
Expand All @@ -105,20 +70,43 @@ private static void assertBlockParametersAreInitialized(ContiguousIdBlockService
}
}

private void recoverState() {
if (RUN_RECOVERY && monotonicDatabaseService != null) {
List<ContiguousIdBlock> uncompletedBlocks = blockService
.reserveUncompletedBlocksForCategoryIdAndApplicationInstanceId(categoryId, applicationInstanceId);
//Insert as available ranges
for (ContiguousIdBlock block : uncompletedBlocks) {
blockManager.addBlock(block);
}
// As we are going through the available ranges and at the same time we are also going to manipulate/update them
// Need to make a copy of the original for iteration to avoid ConcurrentModificationException
MonotonicRangePriorityQueue copyOfAvailableRanges = new MonotonicRangePriorityQueue();
for (MonotonicRange range : getAvailableRanges()) {
copyOfAvailableRanges.offer(range);
}
for (MonotonicRange monotonicRange : copyOfAvailableRanges) {
recoverStateForElements(monotonicDatabaseService.getAccessionsInRanges(Collections.singletonList(monotonicRange)));
}

RUN_RECOVERY = false;
}
}

/**
* This function will recover the internal state of committed elements and will remove them from the available
* ranges.
*
* @param committedElements
* @throws AccessionIsNotPendingException
*/
private void recoverState(long[] committedElements) throws AccessionIsNotPendingException {
private void recoverStateForElements(long[] committedElements) throws AccessionIsNotPendingException {
blockService.save(blockManager.recoverState(committedElements));
}

public synchronized long[] generateAccessions(int numAccessionsToGenerate)
throws AccessionCouldNotBeGeneratedException {
checkAccessionGeneratorNotShutDown();
recoverState();
long[] accessions = new long[numAccessionsToGenerate];
reserveNewBlocksUntilSizeIs(numAccessionsToGenerate);

Expand Down Expand Up @@ -186,7 +174,7 @@ public synchronized void postSave(SaveResponse<Long> response) {
release(response.getSaveFailedAccessions().stream().mapToLong(l -> l).toArray());
}

public void shutDownAccessionGenerator(){
public void shutDownAccessionGenerator() {
List<ContiguousIdBlock> blockList = blockManager.getAssignedBlocks();
blockList.stream().forEach(block -> block.releaseReserved());
blockService.save(blockList);
Expand All @@ -198,8 +186,8 @@ public void shutDownAccessionGenerator(){
* Before doing any operation on Accession Generator, we need to make sure it has not been shut down.
* We should make the check by calling this method as the first thing in all public methods of this class
*/
private void checkAccessionGeneratorNotShutDown(){
if(SHUTDOWN){
private void checkAccessionGeneratorNotShutDown() {
if (SHUTDOWN) {
throw new AccessionGeneratorShutDownException("Accession Generator has been shut down and is no longer available");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testUnknownCategory() throws AccessionCouldNotBeGeneratedException {
}

@Test
public void testRecoverState() {
public void testRecoverState() throws AccessionCouldNotBeGeneratedException {
String categoryId = "eva_2";
String instanceId2 = "test-instance_2";

Expand Down Expand Up @@ -102,6 +102,7 @@ public void testRecoverState() {

// run recover state
MonotonicAccessionGenerator generator = getGenerator(categoryId, instanceId2);
generator.generateAccessions(0);

// As we have already saved accessions in db from 100 to 124, the status should be
// block-1 (100 to 109) : fully complete
Expand Down Expand Up @@ -174,7 +175,7 @@ public void testAlternateRangesWithDifferentGenerators() throws AccessionCouldNo
}

@Test
public void testInitializeBlockManagerInMonotonicAccessionGenerator() {
public void testInitializeBlockManagerInMonotonicAccessionGenerator() throws AccessionCouldNotBeGeneratedException {
String categoryId = "eva_2";
String instanceId2 = "test-instance_2";

Expand All @@ -193,7 +194,8 @@ public void testInitializeBlockManagerInMonotonicAccessionGenerator() {
assertEquals(false, unreservedAndNotFullBlocks.get(0).isReserved());

// this will run the recover state
BasicAccessioningService accService = getAccessioningService(categoryId, instanceId2);
MonotonicAccessionGenerator monotonicAccessionGenerator = getGenerator(categoryId, instanceId2);
monotonicAccessionGenerator.generateAccessions(0);

// assert block gets reserved after recover state
blockInDBList = getAllBlocksForCategoryId(contiguousIdBlockRepository, categoryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import uk.ac.ebi.ampt2d.commons.accession.core.exceptions.AccessionCouldNotBeGeneratedException;
import uk.ac.ebi.ampt2d.commons.accession.core.exceptions.AccessionGeneratorShutDownException;
import uk.ac.ebi.ampt2d.commons.accession.core.exceptions.AccessionIsNotPendingException;
import uk.ac.ebi.ampt2d.commons.accession.core.models.AccessionWrapper;
import uk.ac.ebi.ampt2d.commons.accession.core.models.SaveResponse;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.repositories.ContiguousIdBlockRepository;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.ContiguousIdBlockService;
import uk.ac.ebi.ampt2d.commons.accession.service.BasicSpringDataRepositoryMonotonicDatabaseService;
import uk.ac.ebi.ampt2d.commons.accession.utils.exceptions.ExponentialBackOffMaxRetriesRuntimeException;
import uk.ac.ebi.ampt2d.test.configuration.MonotonicAccessionGeneratorTestConfiguration;
import uk.ac.ebi.ampt2d.test.configuration.TestMonotonicDatabaseServiceTestConfiguration;

import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -44,10 +47,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
Expand All @@ -57,7 +58,7 @@

@RunWith(SpringRunner.class)
@DataJpaTest
@ContextConfiguration(classes = {MonotonicAccessionGeneratorTestConfiguration.class})
@ContextConfiguration(classes = {MonotonicAccessionGeneratorTestConfiguration.class, TestMonotonicDatabaseServiceTestConfiguration.class})
public class MonotonicAccessionGeneratorTest {

private static final int BLOCK_SIZE = 1000;
Expand All @@ -74,6 +75,9 @@ public class MonotonicAccessionGeneratorTest {
@Autowired
private ContiguousIdBlockService service;

@Autowired
private BasicSpringDataRepositoryMonotonicDatabaseService monotonicDBService;

@Test
public void assertNoBlockGeneratedAtLoadIfNoneExists() throws Exception {
MonotonicAccessionGenerator generator = getMonotonicAccessionGenerator();
Expand All @@ -97,7 +101,7 @@ public void assertBlockNotGeneratedIfPreviousExists() throws Exception {
repository.save(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, BLOCK_SIZE));
assertEquals(1, repository.count());
MonotonicAccessionGenerator generator = new MonotonicAccessionGenerator(
CATEGORY_ID, INSTANCE_ID, service);
CATEGORY_ID, INSTANCE_ID, service, monotonicDBService);

assertEquals(1, repository.count());
}
Expand All @@ -108,9 +112,9 @@ public void assertNewBlockGeneratedInSecondInstance() throws Exception {
assertEquals(0, repository.count());

MonotonicAccessionGenerator generator1 = new MonotonicAccessionGenerator(
CATEGORY_ID, INSTANCE_ID, service);
CATEGORY_ID, INSTANCE_ID, service, monotonicDBService);
MonotonicAccessionGenerator generator2 = new MonotonicAccessionGenerator(
CATEGORY_ID, INSTANCE_2_ID, service);
CATEGORY_ID, INSTANCE_2_ID, service, monotonicDBService);

generator1.generateAccessions(TENTH_BLOCK_SIZE);
assertEquals(1, repository.count());
Expand Down Expand Up @@ -306,12 +310,11 @@ public void assertRecoverNoPendingCommit() throws Exception {
// Now assume that the db layer has stored some elements and that the application has died and restarted.

MonotonicAccessionGenerator generatorRecovering =
new MonotonicAccessionGenerator(CATEGORY_ID, INSTANCE_ID, service, new long[]{2, 3, 5});
new MonotonicAccessionGenerator(CATEGORY_ID, INSTANCE_ID, service, monotonicDBService);
ContiguousIdBlock block = findFirstByCategoryIdAndApplicationInstanceIdOrderByLastValueDesc(CATEGORY_ID);
assertEquals(-1, block.getLastCommitted());
generatorRecovering.generateAccessions(0);
assertFalse(generatorRecovering.getAvailableRanges().isEmpty());
assertThat(generatorRecovering.getAvailableRanges(),
contains(new MonotonicRange(0, 1), new MonotonicRange(4, 4), new MonotonicRange(6, BLOCK_SIZE - 1)));
}

@Test
Expand All @@ -323,11 +326,14 @@ public void assertRecoverPendingCommit() throws Exception {
// Now assume that the db layer has stored some elements and that the application has died and restarted.

MonotonicAccessionGenerator generatorRecovering = new MonotonicAccessionGenerator(
CATEGORY_ID, INSTANCE_ID, service, new long[]{2, 3, 5});
CATEGORY_ID, INSTANCE_ID, service, monotonicDBService);
ContiguousIdBlock block = findFirstByCategoryIdAndApplicationInstanceIdOrderByLastValueDesc(CATEGORY_ID);
assertEquals(3, block.getLastCommitted());
assertThat(generatorRecovering.getAvailableRanges(),
contains(new MonotonicRange(4, 4), new MonotonicRange(6, BLOCK_SIZE - 1)));
assertEquals(1, block.getLastCommitted());
generatorRecovering.generateAccessions(0);
assertEquals(1, generatorRecovering.getAvailableRanges().size());
MonotonicRange monotonicRange = generatorRecovering.getAvailableRanges().peek();
assertEquals(2, monotonicRange.getStart());
assertEquals(BLOCK_SIZE - 1, monotonicRange.getEnd());
}

@Test(expected = AccessionIsNotPendingException.class)
Expand Down Expand Up @@ -368,7 +374,7 @@ public void assertGenerateWithObjects() throws Exception {
assertEquals(0, repository.count());

MonotonicAccessionGenerator<String> generator =
new MonotonicAccessionGenerator(CATEGORY_ID, INSTANCE_ID, service);
new MonotonicAccessionGenerator(CATEGORY_ID, INSTANCE_ID, service, monotonicDBService);

HashMap<String, String> objects = new HashMap<>();
objects.put("hash1", "object2");
Expand Down Expand Up @@ -419,42 +425,44 @@ public void assertReleaseInAlternateRanges() throws Exception {
@Test
public void assertRecoverInAlternateRanges() throws Exception {
MonotonicAccessionGenerator generator = getMonotonicAccessionGeneratorForCategoryHavingBlockInterval();
long[] accessions1 = generator.generateAccessions(NUM_OF_ACCESSIONS);
long[] accessions1 = generator.generateAccessions(6);
generator.shutDownAccessionGenerator();
// Now assume that the db layer has stored some elements and that the application has died and restarted.
MonotonicAccessionGenerator generatorRecovering =
new MonotonicAccessionGenerator(CATEGORY_ID_2, INSTANCE_ID, service, new long[]{2, 3});
long[] accessions2 = generatorRecovering.generateAccessions(NUM_OF_ACCESSIONS);
new MonotonicAccessionGenerator(CATEGORY_ID_2, INSTANCE_ID, service, monotonicDBService);
long[] accessions2 = generatorRecovering.generateAccessions(6);
assertEquals(1, accessions2[0]);
assertEquals(4, accessions2[1]);
assertEquals(5, accessions2[2]);
assertEquals(11, accessions2[3]);
assertEquals(2, accessions2[1]);
assertEquals(3, accessions2[2]);
assertEquals(4, accessions2[3]);
assertEquals(5, accessions2[4]);
assertEquals(11, accessions2[5]);
}

private MonotonicAccessionGenerator getMonotonicAccessionGenerator() throws Exception {
assertEquals(0, repository.count());

MonotonicAccessionGenerator generator = new MonotonicAccessionGenerator(
CATEGORY_ID, INSTANCE_ID, service);
CATEGORY_ID, INSTANCE_ID, service, monotonicDBService);
return generator;
}

private MonotonicAccessionGenerator getMonotonicAccessionGeneratorForCategoryHavingBlockInterval() {
assertEquals(0, repository.count());
return new MonotonicAccessionGenerator(CATEGORY_ID_2, INSTANCE_ID, service);
return new MonotonicAccessionGenerator(CATEGORY_ID_2, INSTANCE_ID, service, monotonicDBService);
}

@Test
public void assertAbortExecutionWhenDBConstraintExceptionThrown() {
ContiguousIdBlockService mockService = Mockito.mock(ContiguousIdBlockService.class, Answers.RETURNS_DEEP_STUBS);
MonotonicAccessionGenerator mockGenerator = new MonotonicAccessionGenerator(CATEGORY_ID, INSTANCE_ID, mockService);
MonotonicAccessionGenerator mockGenerator = new MonotonicAccessionGenerator(CATEGORY_ID, INSTANCE_ID, mockService, monotonicDBService);
when(mockService.reserveNewBlock(anyString(), anyString())).thenThrow(ConstraintViolationException.class);
assertThrows(ExponentialBackOffMaxRetriesRuntimeException.class, () -> mockGenerator.generateAccessions(1));
assertEquals(0, repository.count());
}

@Test
public void testInitializeBlockManager() {
public void testInitializeBlockManager() throws AccessionCouldNotBeGeneratedException {
ContiguousIdBlock block = getUnreservedContiguousIdBlock(CATEGORY_ID_2, INSTANCE_ID, 0, 10);
repository.save(block);

Expand All @@ -470,8 +478,11 @@ public void testInitializeBlockManager() {
assertEquals(-1, unreservedBlocks.get(0).getLastCommitted());
assertEquals(false, unreservedBlocks.get(0).isReserved());

// Generator 1 starts and its recover state reserves the UnCompleted block
MonotonicAccessionGenerator generator1 = new MonotonicAccessionGenerator(CATEGORY_ID_2, INSTANCE_ID, service, new long[]{});
// Generator 1 starts
MonotonicAccessionGenerator generator1 = new MonotonicAccessionGenerator(CATEGORY_ID_2, INSTANCE_ID, service, monotonicDBService);
assertEquals(0, generator1.getAvailableRanges().size());
// its recover state reserves the UnCompleted block
generator1.generateAccessions(0);
assertEquals(1, generator1.getAvailableRanges().size());
assertEquals(new MonotonicRange(0, 9), generator1.getAvailableRanges().peek());

Expand All @@ -488,12 +499,14 @@ public void testInitializeBlockManager() {
assertEquals(true, reservedBlocks.get(0).isReserved());

// Generator-2 will not be able to reserve the un-completed block as it is currently reserved by Generator-1
MonotonicAccessionGenerator generator2 = new MonotonicAccessionGenerator(CATEGORY_ID_2, INSTANCE_ID, service, new long[]{});
MonotonicAccessionGenerator generator2 = new MonotonicAccessionGenerator(CATEGORY_ID_2, INSTANCE_ID, service, monotonicDBService);
generator2.generateAccessions(0);
assertEquals(0, generator2.getAvailableRanges().size());

// Generator-3 can reserve the same Uncompleted block, once Generator-1 releases it
generator1.shutDownAccessionGenerator();
MonotonicAccessionGenerator generator3 = new MonotonicAccessionGenerator(CATEGORY_ID_2, INSTANCE_ID, service, new long[]{});
MonotonicAccessionGenerator generator3 = new MonotonicAccessionGenerator(CATEGORY_ID_2, INSTANCE_ID, service, monotonicDBService);
generator3.generateAccessions(0);
assertEquals(1, generator3.getAvailableRanges().size());
assertEquals(new MonotonicRange(0, 9), generator3.getAvailableRanges().peek());
}
Expand Down

0 comments on commit 1111fd5

Please sign in to comment.