From 3c96551e6c4f46faa48eb068066e2a0d334123f1 Mon Sep 17 00:00:00 2001
From: Jan van Mansum
Date: Sun, 15 Dec 2024 14:23:04 +0100
Subject: [PATCH] Smarter wait for state.

---
 pom.xml                                      |  2 +-
 src/main/assembly/dist/cfg/config.yml        | 18 +++++----
 .../DdDataverseIngestApplication.java        |  5 ++-
 .../config/WaitForReleasedStateConfig.java   |  8 +++-
 .../core/bagprocessor/BagProcessor.java      |  6 +--
 .../bagprocessor/FilesInDatasetCache.java    |  9 +++++
 .../core/bagprocessor/StateUpdater.java      | 10 +++--
 .../core/service/DataverseService.java       |  4 +-
 .../core/service/DataverseServiceImpl.java   | 39 ++++++-------------
 src/test/resources/debug-etc/config.yml      | 14 +++++--
 10 files changed, 63 insertions(+), 52 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3078657..197d50f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,7 +65,7 @@
         <dependency>
             <groupId>nl.knaw.dans</groupId>
             <artifactId>dans-dataverse-client-lib</artifactId>
-            <version>1.1.0</version>
+            <version>1.2.0</version>
         </dependency>
         <dependency>
             <groupId>nl.knaw.dans</groupId>
diff --git a/src/main/assembly/dist/cfg/config.yml b/src/main/assembly/dist/cfg/config.yml
index dc04747..acea49f 100644
--- a/src/main/assembly/dist/cfg/config.yml
+++ b/src/main/assembly/dist/cfg/config.yml
@@ -63,14 +63,18 @@ ingest:
   # maxNumberOfFilesPerUploadBatch: 1000
   maxByteSizePerUploadBatch: 500MiB
+
+  #
+  # The service waits for the dataset to reach the released state before it continues processing the next deposit. These settings control how long the service waits,
+  # before giving up and marking the deposit as failed.
+  #
   waitForReleasedState:
-    # maxWaitTime = 5h
-    #
-    # It is not useful to check directly after sending the publication request. The publication will verify all checksums.
-    # initialWaitTimePerMegaByte = 10s (or initialWaitTimePerFile = 10s)
-    # timeBetweenChecks = 10s
-    # 10s * 720 = 2 hours
-    maxNumberOfRetries: 720
-    timeBetweenChecks: 10s
+    # Start polling for the dataset state after this time x number of files in the dataset. It is expected that releasing a dataset takes at least this amount of time.
+    leadTimePerFile: 200ms
+    # Give up waiting for the dataset to be in the expected state after this time. The lead time is *not* included in this timeout.
+    timeout: 3h
+    # The interval between polling the dataset state.
+    pollingInterval: 5s
 
   #
   # Settings related to the conversion of deposits from the legacy format to the format used by the ingest service. Set to null to disable.
diff --git a/src/main/java/nl/knaw/dans/dvingest/DdDataverseIngestApplication.java b/src/main/java/nl/knaw/dans/dvingest/DdDataverseIngestApplication.java
index 2c9ec76..645f52c 100644
--- a/src/main/java/nl/knaw/dans/dvingest/DdDataverseIngestApplication.java
+++ b/src/main/java/nl/knaw/dans/dvingest/DdDataverseIngestApplication.java
@@ -72,8 +72,9 @@ public void run(final DdDataverseIngestConfiguration configuration, final Enviro
         var dataverseService = DataverseServiceImpl.builder()
             .dataverseClient(dataverseClient)
             .metadataKeys(configuration.getIngest().getMetadataKeys())
-            .millisecondsBetweenChecks(configuration.getIngest().getWaitForReleasedState().getTimeBetweenChecks().toMilliseconds())
-            .maxNumberOfRetries(configuration.getIngest().getWaitForReleasedState().getMaxNumberOfRetries())
+            .timeout(configuration.getIngest().getWaitForReleasedState().getTimeout().toMilliseconds())
+            .leadTimePerFile(configuration.getIngest().getWaitForReleasedState().getLeadTimePerFile().toMilliseconds())
+            .pollingInterval(configuration.getIngest().getWaitForReleasedState().getPollingInterval().toMilliseconds())
             .build();
         var utilityServices = UtilityServicesImpl.builder()
             .tempDir(configuration.getIngest().getTempDir())
diff --git a/src/main/java/nl/knaw/dans/dvingest/config/WaitForReleasedStateConfig.java b/src/main/java/nl/knaw/dans/dvingest/config/WaitForReleasedStateConfig.java
index 446c8d3..a079749 100644
--- a/src/main/java/nl/knaw/dans/dvingest/config/WaitForReleasedStateConfig.java
+++ b/src/main/java/nl/knaw/dans/dvingest/config/WaitForReleasedStateConfig.java
@@ -20,7 +20,11 @@
 
 @Data
 public class WaitForReleasedStateConfig {
 
-    private int maxNumberOfRetries = 10;
-    private Duration timeBetweenChecks = Duration.seconds(10);
+    private Duration timeout = Duration.minutes(30);
+
+    private Duration leadTimePerFile = Duration.seconds(5);
+
+    private Duration pollingInterval = Duration.seconds(10);
+
 }
diff --git a/src/main/java/nl/knaw/dans/dvingest/core/bagprocessor/BagProcessor.java b/src/main/java/nl/knaw/dans/dvingest/core/bagprocessor/BagProcessor.java
index ef6a12d..1415901 100644
--- a/src/main/java/nl/knaw/dans/dvingest/core/bagprocessor/BagProcessor.java
+++ b/src/main/java/nl/knaw/dans/dvingest/core/bagprocessor/BagProcessor.java
@@ -21,15 +21,13 @@
 import nl.knaw.dans.dvingest.core.DataverseIngestBag;
 import nl.knaw.dans.dvingest.core.service.DataverseService;
 import nl.knaw.dans.dvingest.core.service.UtilityServices;
-import nl.knaw.dans.dvingest.core.yaml.UpdateState;
-import nl.knaw.dans.dvingest.core.yaml.UpdateStateRoot;
 import nl.knaw.dans.lib.dataverse.DataverseException;
 
 import java.io.IOException;
 import java.util.UUID;
 
 /**
- * Processes a bag, creating and/or editing a dataset version in Dataverse.
+ * Processes a bag, creating and/or editing a dataset version in Dataverse. A BagProcessor is created for each deposit.
 */
 @Slf4j
 public class BagProcessor {
@@ -53,7 +51,7 @@ public String run(String targetPid) throws IOException, DataverseException {
         filesEditor.editFiles(targetPid);
         metadataEditor.editMetadata(targetPid);
         permissionsEditor.editPermissions(targetPid);
-        stateUpdater.updateState(targetPid);
+        stateUpdater.updateState(targetPid, filesEditor.getFilesInDatasetCache().getNumberOfFilesInDataset());
         return targetPid;
     }
 }
diff --git a/src/main/java/nl/knaw/dans/dvingest/core/bagprocessor/FilesInDatasetCache.java b/src/main/java/nl/knaw/dans/dvingest/core/bagprocessor/FilesInDatasetCache.java
index f3b4cfd..a45ec70 100644
--- a/src/main/java/nl/knaw/dans/dvingest/core/bagprocessor/FilesInDatasetCache.java
+++ b/src/main/java/nl/knaw/dans/dvingest/core/bagprocessor/FilesInDatasetCache.java
@@ -117,6 +117,15 @@ public void downloadFromDataset(@NonNull String pid) throws IOException, Dataver
         initialized = true;
     }
 
+    /**
+     * Returns the number of files in the dataset.
+     *
+     * @return the number of files in the dataset
+     */
+    public int getNumberOfFilesInDataset() {
+        return filesInDataset.size();
+    }
+
     private String getPath(@NonNull FileMeta file) {
         var dataversePath = new DataversePath(file.getDirectoryLabel(), file.getLabel());
         return dataversePath.toString();
diff --git a/src/main/java/nl/knaw/dans/dvingest/core/bagprocessor/StateUpdater.java b/src/main/java/nl/knaw/dans/dvingest/core/bagprocessor/StateUpdater.java
index 2e31465..4cf4eb6 100644
--- a/src/main/java/nl/knaw/dans/dvingest/core/bagprocessor/StateUpdater.java
+++ b/src/main/java/nl/knaw/dans/dvingest/core/bagprocessor/StateUpdater.java
@@ -32,12 +32,16 @@ public class StateUpdater {
 
     private final UUID depositId;
     private final UpdateAction updateAction;
+
     private final DataverseService dataverseService;
 
     private String pid;
+    private int numberOfFilesInDataset;
 
-    public void updateState(String pid) throws DataverseException, IOException {
+    public void updateState(String pid, int numberOfFilesInDataset) throws DataverseException, IOException {
         this.pid = pid;
+        this.numberOfFilesInDataset = numberOfFilesInDataset;
+
         if (updateAction instanceof PublishAction) {
             publishVersion(((PublishAction) updateAction).getUpdateType());
         }
@@ -49,14 +53,14 @@ else if (updateAction instanceof ReleaseMigratedAction) {
     private void publishVersion(UpdateType updateType) throws DataverseException, IOException {
         log.debug("Start publishing version for deposit {}", depositId);
         dataverseService.publishDataset(pid, updateType);
-        dataverseService.waitForState(pid, "RELEASED");
+        dataverseService.waitForReleasedState(pid, numberOfFilesInDataset);
         log.debug("End publishing version for deposit {}", depositId);
     }
 
     public void releaseMigrated(String date) throws DataverseException, IOException {
         log.debug("Start releasing migrated version for deposit {}", depositId);
         dataverseService.releaseMigratedDataset(pid, date);
-        dataverseService.waitForState(pid, "RELEASED");
+        dataverseService.waitForReleasedState(pid, numberOfFilesInDataset);
         log.debug("End releasing migrated version for deposit {}", depositId);
     }
 }
diff --git a/src/main/java/nl/knaw/dans/dvingest/core/service/DataverseService.java b/src/main/java/nl/knaw/dans/dvingest/core/service/DataverseService.java
index 7475819..57d5cbc 100644
--- a/src/main/java/nl/knaw/dans/dvingest/core/service/DataverseService.java
+++ b/src/main/java/nl/knaw/dans/dvingest/core/service/DataverseService.java
@@ -47,8 +47,6 @@ public interface DataverseService {
 
     String getDatasetUrnNbn(String datasetId) throws IOException, DataverseException;
 
-    void waitForState(String persistentId, String state) throws DataverseException;
-
     void updateMetadata(String targetDatasetPid, DatasetVersion datasetMetadata) throws DataverseException, IOException;
 
     void updateFileMetadata(int id, FileMeta newMeta) throws DataverseException, IOException;
@@ -80,4 +78,6 @@ public interface DataverseService {
     void importDataset(String pid, Dataset dataset) throws IOException, DataverseException;
 
     void releaseMigratedDataset(String pid, String date) throws DataverseException, IOException;
+
+    void waitForReleasedState(String persistentId, int numberOfFilesInDataset) throws DataverseException, IOException;
 }
diff --git a/src/main/java/nl/knaw/dans/dvingest/core/service/DataverseServiceImpl.java b/src/main/java/nl/knaw/dans/dvingest/core/service/DataverseServiceImpl.java
index 694adce..cbc5cce 100644
--- a/src/main/java/nl/knaw/dans/dvingest/core/service/DataverseServiceImpl.java
+++ b/src/main/java/nl/knaw/dans/dvingest/core/service/DataverseServiceImpl.java
@@ -54,10 +54,13 @@ public class DataverseServiceImpl implements DataverseService {
     private final DataverseClient dataverseClient;
 
     @Builder.Default
-    private int maxNumberOfRetries = 10;
+    private long pollingInterval = 3000; // 3 seconds
 
     @Builder.Default
-    private long millisecondsBetweenChecks = 3000;
+    private long leadTimePerFile = 200;
+
+    @Builder.Default
+    private long timeout = 1800000; // 30 minutes
 
     @Builder.Default
     private Map<String, String> metadataKeys = new HashMap<>();
@@ -231,35 +234,15 @@ public void releaseMigratedDataset(String pid, String date) throws DataverseExce
         log.debug(result.getEnvelopeAsString());
     }
 
-    // TODO: move this to dans-dataverse-client-lib; it is similar to awaitLockState.
-    public void waitForState(String datasetId, String expectedState) {
-        var numberOfTimesTried = 0;
-        var state = "";
-
+    public void waitForReleasedState(String pid, int numberOfFilesInDataset) throws DataverseException, IOException {
+        long leadTime = numberOfFilesInDataset * leadTimePerFile;
+        log.debug("Waiting {} ms before first check", leadTime);
         try {
-            state = getDatasetState(datasetId);
-            log.debug("Initial state for dataset {} is {}", datasetId, state);
-            while (!expectedState.equals(state) && numberOfTimesTried < maxNumberOfRetries) {
-                log.debug("Sleeping for {} milliseconds before checking again", millisecondsBetweenChecks);
-                Thread.sleep(millisecondsBetweenChecks);
-
-                state = getDatasetState(datasetId);
-                numberOfTimesTried += 1;
-                log.debug("Current state for dataset {} is {}, tried {} of {} times", datasetId, state, numberOfTimesTried, maxNumberOfRetries);
-            }
-
-            if (!expectedState.equals(state)) {
-                throw new IllegalStateException(String.format(
-                    "Dataset did not become %s within the wait period (%d seconds); current state is %s",
-                    expectedState, (maxNumberOfRetries * millisecondsBetweenChecks), state
-                ));
-            }
+            Thread.sleep(leadTime);
         }
         catch (InterruptedException e) {
-            throw new RuntimeException("Dataset state check was interrupted; last know state is " + state);
-        }
-        catch (IOException | DataverseException e) {
-            throw new RuntimeException(e);
+            log.error("Interrupted during lead time. Continuing", e);
         }
+        dataverseClient.dataset(pid).awaitState("RELEASED", timeout, pollingInterval);
     }
 }
diff --git a/src/test/resources/debug-etc/config.yml b/src/test/resources/debug-etc/config.yml
index a23dff2..0089ab0 100644
--- a/src/test/resources/debug-etc/config.yml
+++ b/src/test/resources/debug-etc/config.yml
@@ -64,10 +64,18 @@ ingest:
 
   # See: https://guides.dataverse.org/en/latest/installation/config.html#zipuploadfileslimit
   # maxNumberOfFilesPerUploadBatch: 1000
+
+  #
+  # The service waits for the dataset to reach the released state before it continues processing the next deposit. These settings control how long the service waits,
+  # before giving up and marking the deposit as failed.
+  #
   waitForReleasedState:
-    # 10s * 360 = 1 hour
-    maxNumberOfRetries: 360
-    timeBetweenChecks: 2s
+    # Start polling for the dataset state after this time x number of files in the dataset. It is expected that releasing a dataset takes at least this amount of time.
+    leadTimePerFile: 200ms
+    # Give up waiting for the dataset to be in the expected state after this time. The lead time is *not* included in this timeout.
+    timeout: 5m
+    # The interval between polling the dataset state.
+    pollingInterval: 1s
 
   #
   # Settings related to the conversion of deposits from the legacy format to the format used by the ingest service. Set to null to disable.
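
Note (illustration, not part of the patch): the sketch below restates the new waiting strategy in plain Java, to make the roles of leadTimePerFile, pollingInterval and timeout easy to check. The class name, the stateSupplier parameter and the main method are illustrative assumptions; in the patch itself the polling is delegated to dans-dataverse-client-lib via dataverseClient.dataset(pid).awaitState("RELEASED", timeout, pollingInterval).

    import java.time.Duration;
    import java.time.Instant;
    import java.util.function.Supplier;

    // Illustrative sketch only: not the code added by this patch.
    public class WaitForReleasedStateSketch {

        private final Duration leadTimePerFile;  // waitForReleasedState.leadTimePerFile (e.g. 200ms)
        private final Duration pollingInterval;  // waitForReleasedState.pollingInterval (e.g. 5s)
        private final Duration timeout;          // waitForReleasedState.timeout (e.g. 3h)

        public WaitForReleasedStateSketch(Duration leadTimePerFile, Duration pollingInterval, Duration timeout) {
            this.leadTimePerFile = leadTimePerFile;
            this.pollingInterval = pollingInterval;
            this.timeout = timeout;
        }

        public void waitForReleasedState(int numberOfFilesInDataset, Supplier<String> stateSupplier) throws InterruptedException {
            // Publishing re-verifies all checksums, so checking right after the publish request is pointless;
            // the lead time therefore scales with the number of files in the dataset.
            Duration leadTime = leadTimePerFile.multipliedBy(numberOfFilesInDataset);
            Thread.sleep(leadTime.toMillis());

            // The timeout starts only after the lead time, matching the comment in config.yml.
            Instant deadline = Instant.now().plus(timeout);
            while (!"RELEASED".equals(stateSupplier.get())) {
                if (Instant.now().isAfter(deadline)) {
                    throw new IllegalStateException("Dataset did not reach RELEASED within " + timeout);
                }
                Thread.sleep(pollingInterval.toMillis());
            }
        }

        public static void main(String[] args) throws InterruptedException {
            var sketch = new WaitForReleasedStateSketch(Duration.ofMillis(200), Duration.ofSeconds(5), Duration.ofHours(3));
            sketch.waitForReleasedState(3, () -> "RELEASED"); // toy supplier: the dataset is already RELEASED
        }
    }

With the defaults in the shipped config.yml (leadTimePerFile: 200ms, pollingInterval: 5s, timeout: 3h), a dataset with 1000 files gets a lead time of 1000 x 200 ms = 200 s before the first check, after which the service polls every 5 s for at most 3 h before giving up and marking the deposit as failed.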