From 79c8d7c6271a938dfe3e5e4740cab7190005c97a Mon Sep 17 00:00:00 2001 From: Chris Ryan Date: Thu, 26 May 2022 12:27:04 +1000 Subject: [PATCH] Fix for issue: https://github.com/awslabs/amazon-dynamodb-lock-client/issues/44 --- .gitignore | 3 +- pom.xml | 24 +- .../dynamodbv2/AmazonDynamoDBLockClient.java | 293 +++++++++--------- .../AmazonDynamoDBLockClientTest.java | 12 + 4 files changed, 183 insertions(+), 149 deletions(-) diff --git a/.gitignore b/.gitignore index eadcbb3..887c776 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ .settings/* target/* .project -.classpath \ No newline at end of file +.classpath +.idea diff --git a/pom.xml b/pom.xml index 52859b3..f82eff6 100644 --- a/pom.xml +++ b/pom.xml @@ -21,8 +21,8 @@ 1.8 - 3.1.1 - 3.8.0 + 3.3.0 + 3.9.0 3.1.1 3.0.0-M2 1.6 @@ -31,23 +31,23 @@ 3.0.1 - 2.3.5 - [4.1.59.Final,) + 2.17.198 + [4.1.77.Final,) 1.2 - 1.11.119 + 1.15.0 2.23.0 2.0.0 1.3 1.9.3 - 4.13.1 - 2.17.1 - 3.0.0-M3 - 3.0.0-M3 - 0.28.0 + 4.13.2 + 2.17.2 + 3.0.0-M5 + 3.0.0-M5 + 0.38.1 dev */BasicLockClientTests.java,*/GetAllLocksTests.java,*/GetLocksByPartitionKeyTest.java,*/ConsistentLockDataStressTest.java @@ -60,6 +60,10 @@ ${project.build.directory}/coverage-reports/aggregate.exec + + Chris Ryan + chrisryan@squareup.com + Justin Lin lnjus@amazon.com diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java b/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java index da30ee4..4e431fe 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java @@ -36,14 +36,13 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; -import com.amazonaws.services.dynamodbv2.model.LockCurrentlyUnavailableException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import com.amazonaws.services.dynamodbv2.GetLockOptions.GetLockOptionsBuilder; +import com.amazonaws.services.dynamodbv2.model.LockCurrentlyUnavailableException; import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException; import com.amazonaws.services.dynamodbv2.model.LockTableDoesNotExistException; import com.amazonaws.services.dynamodbv2.util.LockClientUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import software.amazon.awssdk.annotations.ThreadSafe; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.SdkBytes; @@ -146,6 +145,8 @@ public class AmazonDynamoDBLockClient implements Runnable, Closeable { protected static final String NEW_RVN_VALUE_EXPRESSION_VARIABLE = ":newRvn"; protected static final String LEASE_DURATION_PATH_VALUE_EXPRESSION_VARIABLE = "#ld"; protected static final String LEASE_DURATION_VALUE_EXPRESSION_VARIABLE = ":ld"; + protected static final String LOOKUP_TIME_PATH_VALUE_EXPRESSION_VARIABLE = "#lu"; + protected static final String LOOKUP_TIME_VALUE_EXPRESSION_VARIABLE = ":lu"; protected static final String RVN_PATH_EXPRESSION_VARIABLE = "#rvn"; protected static final String RVN_VALUE_EXPRESSION_VARIABLE = ":rvn"; protected static final String OWNER_NAME_PATH_EXPRESSION_VARIABLE = "#on"; @@ -162,23 +163,23 @@ public class AmazonDynamoDBLockClient implements Runnable, Closeable { //attribute_not_exists(#pk) AND attribute_not_exists(#sk) protected static final String ACQUIRE_LOCK_THAT_DOESNT_EXIST_PK_SK_CONDITION = String.format( - "attribute_not_exists(%s) AND attribute_not_exists(%s)", - PK_PATH_EXPRESSION_VARIABLE, SK_PATH_EXPRESSION_VARIABLE); + "attribute_not_exists(%s) AND attribute_not_exists(%s)", + PK_PATH_EXPRESSION_VARIABLE, SK_PATH_EXPRESSION_VARIABLE); //attribute_exists(#pk) AND #ir = :ir protected static final String PK_EXISTS_AND_IS_RELEASED_CONDITION = String.format("attribute_exists(%s) AND %s = %s", - PK_PATH_EXPRESSION_VARIABLE, IS_RELEASED_PATH_EXPRESSION_VARIABLE, IS_RELEASED_VALUE_EXPRESSION_VARIABLE); + PK_PATH_EXPRESSION_VARIABLE, IS_RELEASED_PATH_EXPRESSION_VARIABLE, IS_RELEASED_VALUE_EXPRESSION_VARIABLE); //attribute_exists(#pk) AND attribute_exists(#sk) AND #ir = :ir protected static final String PK_EXISTS_AND_SK_EXISTS_AND_IS_RELEASED_CONDITION = String.format( - "attribute_exists(%s) AND attribute_exists(%s) AND %s = %s", - PK_PATH_EXPRESSION_VARIABLE, SK_PATH_EXPRESSION_VARIABLE, IS_RELEASED_PATH_EXPRESSION_VARIABLE, IS_RELEASED_VALUE_EXPRESSION_VARIABLE); + "attribute_exists(%s) AND attribute_exists(%s) AND %s = %s", + PK_PATH_EXPRESSION_VARIABLE, SK_PATH_EXPRESSION_VARIABLE, IS_RELEASED_PATH_EXPRESSION_VARIABLE, IS_RELEASED_VALUE_EXPRESSION_VARIABLE); //attribute_exists(#pk) AND attribute_exists(#sk) AND #rvn = :rvn AND #ir = :ir protected static final String PK_EXISTS_AND_SK_EXISTS_AND_RVN_IS_THE_SAME_AND_IS_RELEASED_CONDITION = String.format( - "attribute_exists(%s) AND attribute_exists(%s) AND %s = %s AND %s = %s", - PK_PATH_EXPRESSION_VARIABLE, SK_PATH_EXPRESSION_VARIABLE, RVN_PATH_EXPRESSION_VARIABLE, RVN_VALUE_EXPRESSION_VARIABLE, - IS_RELEASED_PATH_EXPRESSION_VARIABLE, IS_RELEASED_VALUE_EXPRESSION_VARIABLE); + "attribute_exists(%s) AND attribute_exists(%s) AND %s = %s AND %s = %s", + PK_PATH_EXPRESSION_VARIABLE, SK_PATH_EXPRESSION_VARIABLE, RVN_PATH_EXPRESSION_VARIABLE, RVN_VALUE_EXPRESSION_VARIABLE, + IS_RELEASED_PATH_EXPRESSION_VARIABLE, IS_RELEASED_VALUE_EXPRESSION_VARIABLE); //attribute_exists(#pk) AND attribute_exists(#sk) AND #rvn = :rvn protected static final String PK_EXISTS_AND_SK_EXISTS_AND_RVN_IS_THE_SAME_CONDITION = @@ -188,13 +189,13 @@ public class AmazonDynamoDBLockClient implements Runnable, Closeable { //(attribute_exists(#pk) AND attribute_exists(#sk) AND #rvn = :rvn) AND (attribute_not_exists(#if) OR #if = :if) AND #on = :on protected static final String PK_EXISTS_AND_SK_EXISTS_AND_OWNER_NAME_SAME_AND_RVN_SAME_CONDITION = String.format("%s AND %s = %s ", - PK_EXISTS_AND_SK_EXISTS_AND_RVN_IS_THE_SAME_CONDITION, OWNER_NAME_PATH_EXPRESSION_VARIABLE, OWNER_NAME_VALUE_EXPRESSION_VARIABLE); + PK_EXISTS_AND_SK_EXISTS_AND_RVN_IS_THE_SAME_CONDITION, OWNER_NAME_PATH_EXPRESSION_VARIABLE, OWNER_NAME_VALUE_EXPRESSION_VARIABLE); //(attribute_exists(#pk) AND #rvn = :rvn AND #ir = :ir) AND (attribute_not_exists(#if) OR #if = :if) protected static final String PK_EXISTS_AND_RVN_IS_THE_SAME_AND_IS_RELEASED_CONDITION = - String.format("(attribute_exists(%s) AND %s = %s AND %s = %s)", - PK_PATH_EXPRESSION_VARIABLE, RVN_PATH_EXPRESSION_VARIABLE, RVN_VALUE_EXPRESSION_VARIABLE, - IS_RELEASED_PATH_EXPRESSION_VARIABLE, IS_RELEASED_VALUE_EXPRESSION_VARIABLE); + String.format("(attribute_exists(%s) AND %s = %s AND %s = %s)", + PK_PATH_EXPRESSION_VARIABLE, RVN_PATH_EXPRESSION_VARIABLE, RVN_VALUE_EXPRESSION_VARIABLE, + IS_RELEASED_PATH_EXPRESSION_VARIABLE, IS_RELEASED_VALUE_EXPRESSION_VARIABLE); //attribute_exists(#pk) AND #rvn = :rvn AND (attribute_not_exists(#if) OR #if = :if) protected static final String PK_EXISTS_AND_RVN_IS_THE_SAME_CONDITION = @@ -209,12 +210,14 @@ public class AmazonDynamoDBLockClient implements Runnable, Closeable { protected static final String UPDATE_IS_RELEASED = String.format("SET %s = %s", IS_RELEASED_PATH_EXPRESSION_VARIABLE, IS_RELEASED_VALUE_EXPRESSION_VARIABLE); protected static final String UPDATE_IS_RELEASED_AND_DATA = String.format("%s, %s = %s", UPDATE_IS_RELEASED, DATA_PATH_EXPRESSION_VARIABLE, DATA_VALUE_EXPRESSION_VARIABLE); - protected static final String UPDATE_LEASE_DURATION_AND_RVN = String.format( - "SET %s = %s, %s = %s", - LEASE_DURATION_PATH_VALUE_EXPRESSION_VARIABLE, LEASE_DURATION_VALUE_EXPRESSION_VARIABLE, RVN_PATH_EXPRESSION_VARIABLE, NEW_RVN_VALUE_EXPRESSION_VARIABLE); - protected static final String UPDATE_LEASE_DURATION_AND_RVN_AND_REMOVE_DATA = String.format("%s REMOVE %s", UPDATE_LEASE_DURATION_AND_RVN, DATA_PATH_EXPRESSION_VARIABLE); - protected static final String UPDATE_LEASE_DURATION_AND_RVN_AND_DATA = String.format("%s, %s = %s", - UPDATE_LEASE_DURATION_AND_RVN, DATA_PATH_EXPRESSION_VARIABLE, DATA_VALUE_EXPRESSION_VARIABLE); + protected static final String UPDATE_LEASE_DURATION_AND_LOOKUP_TIME_AND_RVN = String.format( + "SET %s = %s, %s = %s, %s = %s", + LEASE_DURATION_PATH_VALUE_EXPRESSION_VARIABLE, LEASE_DURATION_VALUE_EXPRESSION_VARIABLE, + LOOKUP_TIME_PATH_VALUE_EXPRESSION_VARIABLE, LOOKUP_TIME_VALUE_EXPRESSION_VARIABLE, + RVN_PATH_EXPRESSION_VARIABLE, NEW_RVN_VALUE_EXPRESSION_VARIABLE); + protected static final String UPDATE_LEASE_DURATION_AND_LOOKUP_TIME_AND_RVN_AND_REMOVE_DATA = String.format("%s REMOVE %s", UPDATE_LEASE_DURATION_AND_LOOKUP_TIME_AND_RVN, DATA_PATH_EXPRESSION_VARIABLE); + protected static final String UPDATE_LEASE_DURATION_AND_LOOKUP_TIME_AND_RVN_AND_DATA = String.format("%s, %s = %s", + UPDATE_LEASE_DURATION_AND_LOOKUP_TIME_AND_RVN, DATA_PATH_EXPRESSION_VARIABLE, DATA_VALUE_EXPRESSION_VARIABLE); protected static final String REMOVE_IS_RELEASED_UPDATE_EXPRESSION = String.format(" REMOVE %s ", IS_RELEASED_PATH_EXPRESSION_VARIABLE); protected static final String QUERY_PK_EXPRESSION = String.format("%s = %s", PK_PATH_EXPRESSION_VARIABLE, PK_VALUE_EXPRESSION_VARIABLE); @@ -242,6 +245,7 @@ public class AmazonDynamoDBLockClient implements Runnable, Closeable { protected static final String DATA = "data"; protected static final String OWNER_NAME = "ownerName"; protected static final String LEASE_DURATION = "leaseDuration"; + protected static final String LOOKUP_TIME = "lookupTime"; protected static final String RECORD_VERSION_NUMBER = "recordVersionNumber"; protected static final String IS_RELEASED = "isReleased"; protected static final String IS_RELEASED_VALUE = "1"; @@ -284,8 +288,8 @@ public AmazonDynamoDBLockClient(final AmazonDynamoDBLockClientOptions amazonDyna if (amazonDynamoDBLockClientOptions.getCreateHeartbeatBackgroundThread()) { if (this.leaseDurationInMilliseconds < 2 * this.heartbeatPeriodInMilliseconds) { throw new IllegalArgumentException("Heartbeat period must be no more than half the length of the Lease Duration, " - + "or locks might expire due to the heartbeat thread taking too long to update them (recommendation is to make it much greater, for example " - + "4+ times greater)"); + + "or locks might expire due to the heartbeat thread taking too long to update them (recommendation is to make it much greater, for example " + + "4+ times greater)"); } this.backgroundThread = Optional.of(this.startBackgroundThread()); } else { @@ -301,7 +305,7 @@ public AmazonDynamoDBLockClient(final AmazonDynamoDBLockClientOptions amazonDyna public boolean lockTableExists() { try { final DescribeTableResponse result - = this.dynamoDB.describeTable(DescribeTableRequest.builder().tableName(tableName).build()); + = this.dynamoDB.describeTable(DescribeTableRequest.builder().tableName(tableName).build()); return availableStatuses.contains(result.table().tableStatus()); } catch (final ResourceNotFoundException e) { // This exception indicates the table doesn't exist. @@ -346,36 +350,36 @@ public static void createLockTableInDynamoDB(final CreateDynamoDBTableOptions cr Objects.requireNonNull(createDynamoDBTableOptions.getPartitionKeyName(), "Hash Key Name cannot be null"); Objects.requireNonNull(createDynamoDBTableOptions.getSortKeyName(), "Sort Key Name cannot be null"); final KeySchemaElement partitionKeyElement = KeySchemaElement.builder() - .attributeName(createDynamoDBTableOptions.getPartitionKeyName()).keyType(KeyType.HASH) - .build(); + .attributeName(createDynamoDBTableOptions.getPartitionKeyName()).keyType(KeyType.HASH) + .build(); final List keySchema = new ArrayList<>(); keySchema.add(partitionKeyElement); final Collection attributeDefinitions = new ArrayList<>(); attributeDefinitions.add(AttributeDefinition.builder() - .attributeName(createDynamoDBTableOptions.getPartitionKeyName()) - .attributeType(ScalarAttributeType.S) - .build()); + .attributeName(createDynamoDBTableOptions.getPartitionKeyName()) + .attributeType(ScalarAttributeType.S) + .build()); if (createDynamoDBTableOptions.getSortKeyName().isPresent()) { final KeySchemaElement sortKeyElement = KeySchemaElement.builder() - .attributeName(createDynamoDBTableOptions.getSortKeyName().get()) - .keyType(KeyType.RANGE) - .build(); + .attributeName(createDynamoDBTableOptions.getSortKeyName().get()) + .keyType(KeyType.RANGE) + .build(); keySchema.add(sortKeyElement); attributeDefinitions.add(AttributeDefinition.builder() - .attributeName(createDynamoDBTableOptions.getSortKeyName().get()) - .attributeType(ScalarAttributeType.S) - .build()); + .attributeName(createDynamoDBTableOptions.getSortKeyName().get()) + .attributeType(ScalarAttributeType.S) + .build()); } final CreateTableRequest createTableRequest = CreateTableRequest.builder() - .tableName(createDynamoDBTableOptions.getTableName()) - .keySchema(keySchema) - .provisionedThroughput(createDynamoDBTableOptions.getProvisionedThroughput()) - .attributeDefinitions(attributeDefinitions) - .build(); + .tableName(createDynamoDBTableOptions.getTableName()) + .keySchema(keySchema) + .provisionedThroughput(createDynamoDBTableOptions.getProvisionedThroughput()) + .attributeDefinitions(attributeDefinitions) + .build(); createDynamoDBTableOptions.getDynamoDBClient().createTable(createTableRequest); } @@ -403,7 +407,8 @@ public static void createLockTableInDynamoDB(final CreateDynamoDBTableOptions cr * @return the lock * @throws InterruptedException in case the Thread.sleep call was interrupted while waiting to refresh. */ - @SuppressWarnings("resource") // LockItem.close() does not need to be called until the lock is acquired, so we suppress the warning here. + @SuppressWarnings("resource") + // LockItem.close() does not need to be called until the lock is acquired, so we suppress the warning here. public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGrantedException, InterruptedException { Objects.requireNonNull(options, "Cannot acquire lock when options is null"); Objects.requireNonNull(options.getPartitionKey(), "Cannot acquire lock when key is null"); @@ -418,12 +423,16 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran } } - if (options.getAdditionalAttributes().containsKey(this.partitionKeyName) || options.getAdditionalAttributes().containsKey(OWNER_NAME) || options - .getAdditionalAttributes().containsKey(LEASE_DURATION) || options.getAdditionalAttributes().containsKey(RECORD_VERSION_NUMBER) || options - .getAdditionalAttributes().containsKey(DATA) || this.sortKeyName.isPresent() && options.getAdditionalAttributes().containsKey(this.sortKeyName.get())) { + if (options.getAdditionalAttributes().containsKey(this.partitionKeyName) || + options.getAdditionalAttributes().containsKey(OWNER_NAME) || + options.getAdditionalAttributes().containsKey(LEASE_DURATION) || + options.getAdditionalAttributes().containsKey(LOOKUP_TIME) || + options.getAdditionalAttributes().containsKey(RECORD_VERSION_NUMBER) || + options.getAdditionalAttributes().containsKey(DATA) || + this.sortKeyName.isPresent() && options.getAdditionalAttributes().containsKey(this.sortKeyName.get())) { throw new IllegalArgumentException(String - .format("Additional attribute cannot be one of the following types: " + "%s, %s, %s, %s, %s", this.partitionKeyName, OWNER_NAME, LEASE_DURATION, - RECORD_VERSION_NUMBER, DATA)); + .format("Additional attribute cannot be one of the following types: " + "%s, %s, %s, %s, %s, %s", + this.partitionKeyName, OWNER_NAME, LEASE_DURATION, LOOKUP_TIME, RECORD_VERSION_NUMBER, DATA)); } long millisecondsToWait = DEFAULT_BUFFER_MS; @@ -455,9 +464,9 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran boolean alreadySleptOnceForOneLeasePeriod = false; final GetLockOptions getLockOptions = new GetLockOptions.GetLockOptionsBuilder(key) - .withSortKey(sortKey.orElse(null)) - .withDeleteLockOnRelease(deleteLockOnRelease) - .build(); + .withSortKey(sortKey.orElse(null)) + .withDeleteLockOnRelease(deleteLockOnRelease) + .build(); while (true) { try { @@ -469,14 +478,6 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran throw new LockNotGrantedException("Lock does not exist."); } - if (options.shouldSkipBlockingWait() && existingLock.isPresent() && !existingLock.get().isExpired()) { - /* - * The lock is being held by some one and is still not expired. And the caller explicitly said not to perform a blocking wait; - * We will throw back a lock not grant exception, so that the caller can retry if needed. - */ - throw new LockCurrentlyUnavailableException("The lock being requested is being held by another client."); - } - Optional newLockData = Optional.empty(); if (replaceData) { newLockData = options.getData(); @@ -493,12 +494,13 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran item.put(this.partitionKeyName, AttributeValue.builder().s(key).build()); item.put(OWNER_NAME, AttributeValue.builder().s(this.ownerName).build()); item.put(LEASE_DURATION, AttributeValue.builder().s(String.valueOf(this.leaseDurationInMilliseconds)).build()); + item.put(LOOKUP_TIME, AttributeValue.builder().s(String.valueOf(LockClientUtils.INSTANCE.millisecondTime())).build()); final String recordVersionNumber = this.generateRecordVersionNumber(); item.put(RECORD_VERSION_NUMBER, AttributeValue.builder().s(String.valueOf(recordVersionNumber)).build()); sortKeyName.ifPresent(sortKeyName -> item.put(sortKeyName, AttributeValue.builder().s(sortKey.get()).build())); newLockData.ifPresent(byteBuffer -> item.put(DATA, AttributeValue.builder().b(SdkBytes.fromByteBuffer(byteBuffer)).build())); - //if the existing lock does not exist or exists and is released + //if the existing lock does not exist or exists and is released/expired if (!existingLock.isPresent() && !options.getAcquireOnlyIfLockAlreadyExists()) { return upsertAndMonitorNewLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, item, recordVersionNumber); @@ -513,28 +515,33 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran //lockTryingToBeAcquired only ever gets set to non-null values after this point. //so it is impossible to get in this /* - * Someone else has the lock, and they have the lock for LEASE_DURATION time. At this point, we need - * to wait at least LEASE_DURATION milliseconds before we can try to acquire the lock. + * Someone else has the lock, and they have the lock for LEASE_DURATION time. At this point, + * we may need to wait up to LEASE_DURATION milliseconds before we can try to acquire the lock. */ lockTryingToBeAcquired = existingLock.get(); + if (options.shouldSkipBlockingWait() && !lockTryingToBeAcquired.isExpired()) { + throw new LockCurrentlyUnavailableException("The lock being requested has not expired its current lease duration."); + } if (!alreadySleptOnceForOneLeasePeriod) { alreadySleptOnceForOneLeasePeriod = true; - millisecondsToWait += existingLock.get().getLeaseDuration(); + millisecondsToWait += Math.max( + lockTryingToBeAcquired.getLookupTime() + lockTryingToBeAcquired.getLeaseDuration() - + LockClientUtils.INSTANCE.millisecondTime(), + 1); } - } else { - if (lockTryingToBeAcquired.getRecordVersionNumber().equals(existingLock.get().getRecordVersionNumber())) { - /* If the version numbers match, then we can acquire the lock, assuming it has already expired */ - if (lockTryingToBeAcquired.isExpired()) { - return upsertAndMonitorExpiredLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, existingLock, newLockData, item, - recordVersionNumber); - } - } else { - /* - * If the version number changed since we last queried the lock, then we need to update - * lockTryingToBeAcquired as the lock has been refreshed since we last checked - */ - lockTryingToBeAcquired = existingLock.get(); + } + if (lockTryingToBeAcquired.getRecordVersionNumber().equals(existingLock.get().getRecordVersionNumber())) { + /* If the version numbers match, then we can acquire the lock, assuming it has already expired */ + if (lockTryingToBeAcquired.isExpired()) { + return upsertAndMonitorExpiredLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, existingLock, newLockData, item, + recordVersionNumber); } + } else { + /* + * If the version number changed since we last queried the lock, then we need to update + * lockTryingToBeAcquired as the lock has been refreshed since we last checked + */ + lockTryingToBeAcquired = existingLock.get(); } } catch (final ConditionalCheckFailedException conditionalCheckFailedException) { /* Someone else acquired the lock while we tried to do so, so we throw an exception */ @@ -556,7 +563,7 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran } catch (final LockNotGrantedException x) { if (LockClientUtils.INSTANCE.millisecondTime() - currentTimeMillis > millisecondsToWait) { logger.debug("This client waited more than millisecondsToWait=" + millisecondsToWait - + " ms since the beginning of this acquire call.", x); + + " ms since the beginning of this acquire call.", x); throw x; } } @@ -569,7 +576,7 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran } /** - * Returns true if the client currently owns the lock with @param key and @param sortKey. It returns false otherwise. + * Returns true if the client currently owns the lock with {@param key} and {@param sortKey}. It returns false otherwise. * * @param key The partition key representing the lock. * @param sortKey The sort key if present. @@ -582,7 +589,7 @@ public boolean hasLock(final String key, final Optional sortKey) { } private LockItem upsertAndMonitorExpiredLock(AcquireLockOptions options, String key, Optional sortKey, boolean deleteLockOnRelease, - Optional sessionMonitor, Optional existingLock, Optional newLockData, Map item, String recordVersionNumber) { + Optional sessionMonitor, Optional existingLock, Optional newLockData, Map item, String recordVersionNumber) { final String conditionalExpression; final Map expressionAttributeValues = new HashMap<>(); final boolean updateExistingLockRecord = options.getUpdateExistingLockRecord(); @@ -606,13 +613,13 @@ private LockItem upsertAndMonitorExpiredLock(AcquireLockOptions options, String final String updateExpression = getUpdateExpressionAndUpdateNameValueMaps(item, expressionAttributeNames, expressionAttributeValues); final UpdateItemRequest updateItemRequest = UpdateItemRequest.builder().tableName(tableName).key(getItemKeys(existingLock.get())) - .updateExpression(updateExpression).expressionAttributeNames(expressionAttributeNames) - .expressionAttributeValues(expressionAttributeValues).conditionExpression(conditionalExpression).build(); + .updateExpression(updateExpression).expressionAttributeNames(expressionAttributeNames) + .expressionAttributeValues(expressionAttributeValues).conditionExpression(conditionalExpression).build(); logger.trace("Acquiring an existing lock whose revisionVersionNumber did not change for " + partitionKeyName + " partitionKeyName=" + key + ", " + this.sortKeyName + "=" + sortKey); return updateItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, updateItemRequest); } else { final PutItemRequest putItemRequest = PutItemRequest.builder().item(item).tableName(tableName).conditionExpression(conditionalExpression) - .expressionAttributeNames(expressionAttributeNames).expressionAttributeValues(expressionAttributeValues).build(); + .expressionAttributeNames(expressionAttributeNames).expressionAttributeValues(expressionAttributeValues).build(); logger.trace("Acquiring an existing lock whose revisionVersionNumber did not change for " + partitionKeyName + " partitionKeyName=" + key + ", " + this.sortKeyName + "=" + sortKey); return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, putItemRequest); @@ -620,8 +627,8 @@ private LockItem upsertAndMonitorExpiredLock(AcquireLockOptions options, String } private LockItem upsertAndMonitorReleasedLock(AcquireLockOptions options, String key, Optional sortKey, boolean - deleteLockOnRelease, Optional sessionMonitor, Optional existingLock, Optional - newLockData, Map item, String recordVersionNumber) { + deleteLockOnRelease, Optional sessionMonitor, Optional existingLock, Optional + newLockData, Map item, String recordVersionNumber) { final String conditionalExpression; final boolean updateExistingLockRecord = options.getUpdateExistingLockRecord(); @@ -660,20 +667,20 @@ private LockItem upsertAndMonitorReleasedLock(AcquireLockOptions options, String item.remove(sortKeyName.get()); } final String updateExpression = getUpdateExpressionAndUpdateNameValueMaps(item, expressionAttributeNames, expressionAttributeValues) - + REMOVE_IS_RELEASED_UPDATE_EXPRESSION; + + REMOVE_IS_RELEASED_UPDATE_EXPRESSION; final UpdateItemRequest updateItemRequest = UpdateItemRequest.builder().tableName(tableName).key(getItemKeys(existingLock.get())) - .updateExpression(updateExpression).expressionAttributeNames(expressionAttributeNames) - .expressionAttributeValues(expressionAttributeValues).conditionExpression(conditionalExpression).build(); + .updateExpression(updateExpression).expressionAttributeNames(expressionAttributeNames) + .expressionAttributeValues(expressionAttributeValues).conditionExpression(conditionalExpression).build(); logger.trace("Acquiring an existing released whose revisionVersionNumber did not change for " + partitionKeyName + " " + - "partitionKeyName=" + key + ", " + this.sortKeyName + "=" + sortKey); + "partitionKeyName=" + key + ", " + this.sortKeyName + "=" + sortKey); return updateItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, updateItemRequest); } else { final PutItemRequest putItemRequest = PutItemRequest.builder().item(item).tableName(tableName).conditionExpression(conditionalExpression) - .expressionAttributeNames(expressionAttributeNames).expressionAttributeValues(expressionAttributeValues).build(); + .expressionAttributeNames(expressionAttributeNames).expressionAttributeValues(expressionAttributeValues).build(); logger.trace("Acquiring an existing released lock whose revisionVersionNumber did not change for " + partitionKeyName + " " + - "partitionKeyName=" + key + ", " + this.sortKeyName + "=" + sortKey); + "partitionKeyName=" + key + ", " + this.sortKeyName + "=" + sortKey); return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, putItemRequest); } @@ -685,8 +692,8 @@ private LockItem updateItemAndStartSessionMonitor(AcquireLockOptions options, St final long lastUpdatedTime = LockClientUtils.INSTANCE.millisecondTime(); this.dynamoDB.updateItem(updateItemRequest); final LockItem lockItem = - new LockItem(this, key, sortKey, newLockData, deleteLockOnRelease, this.ownerName, this.leaseDurationInMilliseconds, lastUpdatedTime, - recordVersionNumber, !IS_RELEASED_INDICATOR, sessionMonitor, options.getAdditionalAttributes()); + new LockItem(this, key, sortKey, newLockData, deleteLockOnRelease, this.ownerName, this.leaseDurationInMilliseconds, lastUpdatedTime, + recordVersionNumber, !IS_RELEASED_INDICATOR, sessionMonitor, options.getAdditionalAttributes()); this.locks.put(lockItem.getUniqueIdentifier(), lockItem); this.tryAddSessionMonitor(lockItem.getUniqueIdentifier(), lockItem); return lockItem; @@ -694,19 +701,20 @@ private LockItem updateItemAndStartSessionMonitor(AcquireLockOptions options, St /** * This method puts a new lock item in the lock table and returns an optionally monitored LockItem object - * @param options a wrapper of RequestMetricCollector and an "additional attributes" map - * @param key the partition key of the lock to write - * @param sortKey the optional sort key of the lock to write + * + * @param options a wrapper of RequestMetricCollector and an "additional attributes" map + * @param key the partition key of the lock to write + * @param sortKey the optional sort key of the lock to write * @param deleteLockOnRelease whether or not to delete the lock when releasing it - * @param sessionMonitor the optional session monitor to start for this lock - * @param newLockData the new lock data - * @param item the lock item to write to the lock table + * @param sessionMonitor the optional session monitor to start for this lock + * @param newLockData the new lock data + * @param item the lock item to write to the lock table * @param recordVersionNumber the rvn to condition the PutItem call on. * @return a new monitored LockItem */ private LockItem upsertAndMonitorNewLock(AcquireLockOptions options, String key, Optional sortKey, - boolean deleteLockOnRelease, Optional sessionMonitor, - Optional newLockData, Map item, String recordVersionNumber) { + boolean deleteLockOnRelease, Optional sessionMonitor, + Optional newLockData, Map item, String recordVersionNumber) { final Map expressionAttributeNames = new HashMap<>(); expressionAttributeNames.put(PK_PATH_EXPRESSION_VARIABLE, this.partitionKeyName); @@ -730,25 +738,25 @@ private LockItem upsertAndMonitorNewLock(AcquireLockOptions options, String key, final Map expressionAttributeValues = new HashMap<>(); final String updateExpression = getUpdateExpressionAndUpdateNameValueMaps(item, expressionAttributeNames, expressionAttributeValues); final UpdateItemRequest updateItemRequest = UpdateItemRequest.builder().tableName(tableName).key(getKeys(key, sortKey)) - .updateExpression(updateExpression).expressionAttributeNames(expressionAttributeNames) - .expressionAttributeValues(expressionAttributeValues).conditionExpression(conditionalExpression).build(); + .updateExpression(updateExpression).expressionAttributeNames(expressionAttributeNames) + .expressionAttributeValues(expressionAttributeValues).conditionExpression(conditionalExpression).build(); logger.trace("Acquiring a new lock on " + partitionKeyName + "=" + key + ", " + this.sortKeyName + "=" + sortKey); return updateItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, updateItemRequest); } else { final PutItemRequest putItemRequest = PutItemRequest.builder().item(item).tableName(tableName) - .conditionExpression(conditionalExpression) - .expressionAttributeNames(expressionAttributeNames).build(); - /* No one has the lock, go ahead and acquire it. - * The person storing the lock into DynamoDB should err on the side of thinking the lock will expire - * sooner than it actually will, so they start counting towards its expiration before the Put succeeds - */ + .conditionExpression(conditionalExpression) + .expressionAttributeNames(expressionAttributeNames).build(); + /* No one has the lock, go ahead and acquire it. + * The person storing the lock into DynamoDB should err on the side of thinking the lock will expire + * sooner than it actually will, so they start counting towards its expiration before the Put succeeds + */ logger.trace("Acquiring a new lock on " + partitionKeyName + "=" + key + ", " + this.sortKeyName + "=" + sortKey); return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, putItemRequest); } } private LockItem putLockItemAndStartSessionMonitor(AcquireLockOptions options, String key, Optional sortKey, boolean deleteLockOnRelease, - Optional sessionMonitor, Optional newLockData, String recordVersionNumber, PutItemRequest putItemRequest) { + Optional sessionMonitor, Optional newLockData, String recordVersionNumber, PutItemRequest putItemRequest) { final long lastUpdatedTime = LockClientUtils.INSTANCE.millisecondTime(); this.dynamoDB.putItem(putItemRequest); @@ -763,13 +771,14 @@ private LockItem putLockItemAndStartSessionMonitor(AcquireLockOptions options, S /** * Builds an updateExpression for all fields in item map and updates the correspoding expression attribute name and * value maps. - * @param item Map of Name and AttributeValue to update or create + * + * @param item Map of Name and AttributeValue to update or create * @param expressionAttributeNames * @param expressionAttributeValues * @return */ private String getUpdateExpressionAndUpdateNameValueMaps(Map item, - Map expressionAttributeNames, Map expressionAttributeValues) { + Map expressionAttributeNames, Map expressionAttributeValues) { final String additionalUpdateExpression = "SET "; StringBuilder updateExpressionBuilder = new StringBuilder(additionalUpdateExpression); int i = 0; @@ -864,12 +873,12 @@ public boolean releaseLock(final ReleaseLockOptions options) { final Map key = getItemKeys(lockItem); if (deleteLock) { final DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder() - .tableName(tableName) - .key(key) - .conditionExpression(conditionalExpression) - .expressionAttributeNames(expressionAttributeNames) - .expressionAttributeValues(expressionAttributeValues) - .build(); + .tableName(tableName) + .key(key) + .conditionExpression(conditionalExpression) + .expressionAttributeNames(expressionAttributeNames) + .expressionAttributeValues(expressionAttributeValues) + .build(); this.dynamoDB.deleteItem(deleteItemRequest); } else { @@ -884,12 +893,12 @@ public boolean releaseLock(final ReleaseLockOptions options) { updateExpression = UPDATE_IS_RELEASED; } final UpdateItemRequest updateItemRequest = UpdateItemRequest.builder() - .tableName(this.tableName) - .key(key) - .updateExpression(updateExpression) - .conditionExpression(conditionalExpression) - .expressionAttributeNames(expressionAttributeNames) - .expressionAttributeValues(expressionAttributeValues).build(); + .tableName(this.tableName) + .key(key) + .updateExpression(updateExpression) + .conditionExpression(conditionalExpression) + .expressionAttributeNames(expressionAttributeNames) + .expressionAttributeValues(expressionAttributeValues).build(); this.dynamoDB.updateItem(updateItemRequest); } @@ -1017,14 +1026,20 @@ private LockItem createLockItem(final GetLockOptions options, final Map getAllLocksFromDynamoDB(final boolean deleteOnRelease) { * Not that this may return a lock item even if it was released. *

* - * @param key the partition key + * @param key the partition key * @param deleteOnRelease Whether or not the {@link LockItem} should delete the item * when {@link LockItem#close()} is called on it. * @return A non parallel {@link Stream} of {@link LockItem}s that has the partition key in @@ -1155,6 +1170,7 @@ public void sendHeartbeat(final SendHeartbeatOptions options) { final Map expressionAttributeNames = new HashMap<>(); expressionAttributeNames.put(PK_PATH_EXPRESSION_VARIABLE, partitionKeyName); expressionAttributeNames.put(LEASE_DURATION_PATH_VALUE_EXPRESSION_VARIABLE, LEASE_DURATION); + expressionAttributeNames.put(LOOKUP_TIME_PATH_VALUE_EXPRESSION_VARIABLE, LOOKUP_TIME); expressionAttributeNames.put(RVN_PATH_EXPRESSION_VARIABLE, RECORD_VERSION_NUMBER); expressionAttributeNames.put(OWNER_NAME_PATH_EXPRESSION_VARIABLE, OWNER_NAME); if (this.sortKeyName.isPresent()) { @@ -1165,32 +1181,33 @@ public void sendHeartbeat(final SendHeartbeatOptions options) { } final String recordVersionNumber = this.generateRecordVersionNumber(); + final long lastUpdateOfLock = LockClientUtils.INSTANCE.millisecondTime(); //Set up update expression for UpdateItem. final String updateExpression; expressionAttributeValues.put(NEW_RVN_VALUE_EXPRESSION_VARIABLE, AttributeValue.builder().s(recordVersionNumber).build()); expressionAttributeValues.put(LEASE_DURATION_VALUE_EXPRESSION_VARIABLE, AttributeValue.builder().s(String.valueOf(leaseDurationToEnsureInMilliseconds)).build()); + expressionAttributeValues.put(LOOKUP_TIME_VALUE_EXPRESSION_VARIABLE, AttributeValue.builder().s(String.valueOf(lastUpdateOfLock)).build()); if (deleteData) { expressionAttributeNames.put(DATA_PATH_EXPRESSION_VARIABLE, DATA); - updateExpression = UPDATE_LEASE_DURATION_AND_RVN_AND_REMOVE_DATA; + updateExpression = UPDATE_LEASE_DURATION_AND_LOOKUP_TIME_AND_RVN_AND_REMOVE_DATA; } else if (options.getData().isPresent()) { expressionAttributeNames.put(DATA_PATH_EXPRESSION_VARIABLE, DATA); expressionAttributeValues.put(DATA_VALUE_EXPRESSION_VARIABLE, AttributeValue.builder().b(SdkBytes.fromByteBuffer(options.getData().get())).build()); - updateExpression = UPDATE_LEASE_DURATION_AND_RVN_AND_DATA; + updateExpression = UPDATE_LEASE_DURATION_AND_LOOKUP_TIME_AND_RVN_AND_DATA; } else { - updateExpression = UPDATE_LEASE_DURATION_AND_RVN; + updateExpression = UPDATE_LEASE_DURATION_AND_LOOKUP_TIME_AND_RVN; } final UpdateItemRequest updateItemRequest = UpdateItemRequest.builder() - .tableName(tableName) - .key(getItemKeys(lockItem)) - .conditionExpression(conditionalExpression) - .updateExpression(updateExpression) - .expressionAttributeNames(expressionAttributeNames) - .expressionAttributeValues(expressionAttributeValues).build(); + .tableName(tableName) + .key(getItemKeys(lockItem)) + .conditionExpression(conditionalExpression) + .updateExpression(updateExpression) + .expressionAttributeNames(expressionAttributeNames) + .expressionAttributeValues(expressionAttributeValues).build(); try { - final long lastUpdateOfLock = LockClientUtils.INSTANCE.millisecondTime(); this.dynamoDB.updateItem(updateItemRequest); lockItem.updateRecordVersionNumber(recordVersionNumber, lastUpdateOfLock, leaseDurationToEnsureInMilliseconds); if (deleteData) { @@ -1282,8 +1299,8 @@ private GetItemResponse readFromDynamoDB(final String key, final Optional item) { + Map item) { GetLockOptionsBuilder options = GetLockOptions.builder(key).withDeleteLockOnRelease(deleteOnRelease); options = this.sortKeyName.map(item::get).map(AttributeValue::s).map(options::withSortKey).orElse(options); diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientTest.java b/src/test/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientTest.java index 0480d56..c4a65b6 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientTest.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientTest.java @@ -110,6 +110,7 @@ public void releaseLock_whenRemoveKillSessionMonitorJoinInterrupted_swallowsInte item.put("ownerName", AttributeValue.builder().s("foobar").build()); item.put("recordVersionNumber", AttributeValue.builder().s("oolala").build()); item.put("leaseDuration", AttributeValue.builder().s("1").build()); + item.put("lookupTime", AttributeValue.builder().s("" + LockClientUtils.INSTANCE.millisecondTime()).build()); when(dynamodb.getItem(ArgumentMatchers.any())).thenReturn(GetItemResponse.builder().item(item).build()); LockItem lockItem = lockClient.acquireLock(AcquireLockOptions.builder(PARTITION_KEY) .withSessionMonitor(3001, @@ -162,6 +163,7 @@ public void acquireLock_whenLockAlreadyExists_throwLockNotGrantedException() thr Map lockItem = new HashMap<>(3); lockItem.put("ownerName", AttributeValue.builder().s("owner").build()); lockItem.put("leaseDuration", AttributeValue.builder().s("1").build()); + lockItem.put("lookupTime", AttributeValue.builder().s("" + LockClientUtils.INSTANCE.millisecondTime()).build()); lockItem.put("recordVersionNumber", AttributeValue.builder().s("uuid").build()); when(dynamodb.getItem(ArgumentMatchers.any())).thenReturn(GetItemResponse.builder().item(lockItem).build()); when(dynamodb.putItem(ArgumentMatchers.any())).thenThrow(ConditionalCheckFailedException.builder().message("item existed").build()); @@ -178,6 +180,7 @@ public void acquireLock_whenProvisionedThroughputExceeds_throwLockNotGrantedExce item.put("ownerName", AttributeValue.builder().s("foobar").build()); item.put("recordVersionNumber", AttributeValue.builder().s("oolala").build()); item.put("leaseDuration", AttributeValue.builder().s("1").build()); + item.put("lookupTime", AttributeValue.builder().s("" + LockClientUtils.INSTANCE.millisecondTime()).build()); when(dynamodb.getItem(Mockito.any())).thenReturn(GetItemResponse.builder().item(item).build()); when(dynamodb.putItem(Mockito.any())).thenThrow(ProvisionedThroughputExceededException.builder() .message("Provisioned Throughput for the table exceeded").build()); @@ -212,6 +215,7 @@ public void acquireLock_withAcquireOnlyIfLockAlreadyExistsTrue_releasedLockCondi item.put("ownerName", AttributeValue.builder().s("foobar").build()); item.put("recordVersionNumber", AttributeValue.builder().s(uuid.toString()).build()); item.put("leaseDuration", AttributeValue.builder().s("1").build()); + item.put("lookupTime", AttributeValue.builder().s("" + LockClientUtils.INSTANCE.millisecondTime()).build()); item.put("isReleased", AttributeValue.builder().bool(true).build()); doAnswer((InvocationOnMock invocation) -> GetItemResponse.builder().item(item).build()) @@ -229,6 +233,7 @@ public void acquireLock_withAcquireOnlyIfLockAlreadyExists_releasedLockGetsCreat item.put("ownerName", AttributeValue.builder().s("foobar").build()); item.put("recordVersionNumber", AttributeValue.builder().s(uuid.toString()).build()); item.put("leaseDuration", AttributeValue.builder().s("1").build()); + item.put("lookupTime", AttributeValue.builder().s("" + LockClientUtils.INSTANCE.millisecondTime()).build()); item.put("isReleased", AttributeValue.builder().bool(true).build()); when(dynamodb.getItem(Mockito.any())).thenReturn(GetItemResponse.builder().item(item).build()); LockItem lockItem = client.acquireLock(AcquireLockOptions.builder("asdf").withAcquireOnlyIfLockAlreadyExists(true).build()); @@ -245,6 +250,7 @@ public void acquireLock_withReentrant_doesNotFailIfHoldingLock() throws Interrup item.put("ownerName", AttributeValue.builder().s("foobar").build()); item.put("recordVersionNumber", AttributeValue.builder().s(uuid.toString()).build()); item.put("leaseDuration", AttributeValue.builder().s("1").build()); + item.put("lookupTime", AttributeValue.builder().s("" + LockClientUtils.INSTANCE.millisecondTime()).build()); Map differentRvn1 = new HashMap<>(item); differentRvn1.put("recordVersionNumber", AttributeValue.builder().s("uuid1").build()); @@ -311,6 +317,7 @@ public void acquireLock_whenLockAlreadyExistsAndIsNotReleased_andWhenHaveSleptFo item.put("ownerName", AttributeValue.builder().s("foobar").build()); item.put("recordVersionNumber", AttributeValue.builder().s(uuid.toString()).build()); item.put("leaseDuration", AttributeValue.builder().s("1").build()); + item.put("lookupTime", AttributeValue.builder().s("" + LockClientUtils.INSTANCE.millisecondTime()).build()); Map differentItem = new HashMap<>(item); differentItem.put("recordVersionNumber", AttributeValue.builder().s("a different uuid").build()); @@ -336,6 +343,7 @@ public void acquireLock_withConsistentLockDataTrue_releasedLockConditionalCheckF item.put("ownerName", AttributeValue.builder().s("foobar").build()); item.put("recordVersionNumber", AttributeValue.builder().s(uuid.toString()).build()); item.put("leaseDuration", AttributeValue.builder().s("1").build()); + item.put("lookupTime", AttributeValue.builder().s("" + LockClientUtils.INSTANCE.millisecondTime()).build()); item.put("isReleased", AttributeValue.builder().bool(true).build()); doAnswer((InvocationOnMock invocation) -> GetItemResponse.builder().item(item).build()) .when(dynamodb).getItem(Mockito.any()); @@ -351,6 +359,7 @@ public void acquireLock_withNotUpdateRecordAndConsistentLockDataTrue_releasedLoc item.put("ownerName", AttributeValue.builder().s("foobar").build()); item.put("recordVersionNumber", AttributeValue.builder().s("a specific rvn").build()); item.put("leaseDuration", AttributeValue.builder().s("1").build()); + item.put("lookupTime", AttributeValue.builder().s("" + LockClientUtils.INSTANCE.millisecondTime()).build()); item.put("isReleased", AttributeValue.builder().bool(true).build()); when(dynamodb.getItem(Mockito.any())).thenReturn(GetItemResponse.builder().item(item).build()); LockItem lockItem = client.acquireLock(AcquireLockOptions.builder("asdf").withAcquireReleasedLocksConsistently(true).withUpdateExistingLockRecord @@ -372,6 +381,7 @@ public void acquireLock_withUpdateRecordAndConsistentLockDataTrue_releasedLockGe item.put("ownerName", AttributeValue.builder().s("foobar").build()); item.put("recordVersionNumber", AttributeValue.builder().s("a specific rvn").build()); item.put("leaseDuration", AttributeValue.builder().s("1").build()); + item.put("lookupTime", AttributeValue.builder().s("" + LockClientUtils.INSTANCE.millisecondTime()).build()); item.put("isReleased", AttributeValue.builder().bool(true).build()); when(dynamodb.getItem(Mockito.any())).thenReturn(GetItemResponse.builder().item(item).build()); LockItem lockItem = client.acquireLock(AcquireLockOptions.builder("asdf").withAcquireReleasedLocksConsistently(true).withUpdateExistingLockRecord @@ -416,6 +426,7 @@ public void acquireLock_whenLockExistsAndIsExpired_andSkipBlockingWaitIsTurnedOn item.put("ownerName", AttributeValue.builder().s("foobar").build()); item.put("recordVersionNumber", AttributeValue.builder().s(uuid.toString()).build()); item.put("leaseDuration", AttributeValue.builder().s("1").build()); + item.put("lookupTime", AttributeValue.builder().s("" + LockClientUtils.INSTANCE.millisecondTime()).build()); item.put("isReleased", AttributeValue.builder().bool(true).build()); when(dynamodb.getItem(Mockito.any())) .thenReturn(GetItemResponse.builder().item(item).build()) @@ -439,6 +450,7 @@ public void acquireLock_whenLockAlreadyExistsAndIsNotReleased_andSkipBlockingWai item.put("ownerName", AttributeValue.builder().s("foobar").build()); item.put("recordVersionNumber", AttributeValue.builder().s(uuid.toString()).build()); item.put("leaseDuration", AttributeValue.builder().s("100000").build()); + item.put("lookupTime", AttributeValue.builder().s("" + LockClientUtils.INSTANCE.millisecondTime()).build()); when(dynamodb.getItem(Mockito.any())) .thenReturn(GetItemResponse.builder().item(item).build()) .thenReturn(GetItemResponse.builder().build());