From db6879a5ddef7a70cbf76c5a39f1c4f23c4e2a77 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Fri, 14 Feb 2020 13:19:47 -0800 Subject: [PATCH] Add handleUndeleteRequest method to AmbryRequests (#1379) Adding handleUndeleteRequest method to AmbryRequests class to handle undelete requests from ambry-frontend. Add unit test and integration test for handleUndeleteRequest Change some of the Store implementation in replication test, but since we are not change replication logic for undelete now, we don't change those implementation to support undelete for now. Co-authored-by: David Harju --- .../com.github.ambry/config/ServerConfig.java | 9 + .../server/ServerErrorCode.java | 6 +- .../ErrorMapping.java | 4 + .../ServerMetrics.java | 27 +++ .../MessageFormatInputStreamTest.java | 2 +- .../AmbryRequests.java | 90 ++++++++- .../UndeleteResponse.java | 18 +- .../InMemoryStore.java | 60 ++++-- .../MockConnectionPool.java | 2 +- .../ReplicationTest.java | 31 +-- .../com.github.ambry.server/MockCluster.java | 1 + .../ServerTestUtil.java | 34 ++++ .../AmbryServerRequests.java | 10 +- .../AmbryServerRequestsTest.java | 184 ++++++++++++++++-- .../MockStorageManager.java | 4 +- 15 files changed, 424 insertions(+), 58 deletions(-) diff --git a/ambry-api/src/main/java/com.github.ambry/config/ServerConfig.java b/ambry-api/src/main/java/com.github.ambry/config/ServerConfig.java index 3712187440..9f8c4bfc7a 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/ServerConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/ServerConfig.java @@ -93,6 +93,13 @@ public class ServerConfig { @Default("false") public final boolean serverValidateRequestBasedOnStoreState; + /** + * True to enable ambry server handling undelete requests. + */ + @Config("server.handle.undelete.request.enabled") + @Default("false") + public final boolean serverHandleUndeleteRequestEnabled; + public ServerConfig(VerifiableProperties verifiableProperties) { serverRequestHandlerNumOfThreads = verifiableProperties.getInt("server.request.handler.num.of.threads", 7); serverSchedulerNumOfthreads = verifiableProperties.getInt("server.scheduler.num.of.threads", 10); @@ -110,5 +117,7 @@ public ServerConfig(VerifiableProperties verifiableProperties) { Utils.splitString(verifiableProperties.getString("server.stats.reports.to.publish", ""), ","); serverValidateRequestBasedOnStoreState = verifiableProperties.getBoolean("server.validate.request.based.on.store.state", false); + serverHandleUndeleteRequestEnabled = + verifiableProperties.getBoolean("server.handle.undelete.request.enabled", false); } } diff --git a/ambry-api/src/main/java/com.github.ambry/server/ServerErrorCode.java b/ambry-api/src/main/java/com.github.ambry/server/ServerErrorCode.java index fe076cfb79..c3e5f3409b 100644 --- a/ambry-api/src/main/java/com.github.ambry/server/ServerErrorCode.java +++ b/ambry-api/src/main/java/com.github.ambry/server/ServerErrorCode.java @@ -37,5 +37,9 @@ public enum ServerErrorCode { Blob_Already_Updated, Blob_Update_Not_Allowed, Replica_Unavailable, - Blob_Authorization_Failure + Blob_Authorization_Failure, + Blob_Life_Version_Conflict, + Blob_Not_Deleted, + Blob_Already_Undeleted, + Blob_Deleted_Permanently } diff --git a/ambry-commons/src/main/java/com.github.ambry.commons/ErrorMapping.java b/ambry-commons/src/main/java/com.github.ambry.commons/ErrorMapping.java index 5c32f4cfdc..5b10e91cf3 100644 --- a/ambry-commons/src/main/java/com.github.ambry.commons/ErrorMapping.java +++ b/ambry-commons/src/main/java/com.github.ambry.commons/ErrorMapping.java @@ -38,6 +38,10 @@ public class ErrorMapping { tempMap.put(StoreErrorCodes.Authorization_Failure, ServerErrorCode.Blob_Authorization_Failure); tempMap.put(StoreErrorCodes.Already_Updated, ServerErrorCode.Blob_Already_Updated); tempMap.put(StoreErrorCodes.Update_Not_Allowed, ServerErrorCode.Blob_Update_Not_Allowed); + tempMap.put(StoreErrorCodes.Life_Version_Conflict, ServerErrorCode.Blob_Life_Version_Conflict); + tempMap.put(StoreErrorCodes.ID_Not_Deleted, ServerErrorCode.Blob_Not_Deleted); + tempMap.put(StoreErrorCodes.ID_Undeleted, ServerErrorCode.Blob_Already_Undeleted); + tempMap.put(StoreErrorCodes.ID_Deleted_Permanently, ServerErrorCode.Blob_Deleted_Permanently); storeErrorMapping = Collections.unmodifiableMap(tempMap); } diff --git a/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java b/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java index d61b29e0ae..f450bf6495 100644 --- a/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java +++ b/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java @@ -100,6 +100,12 @@ public class ServerMetrics { public final Histogram deleteBlobSendTimeInMs; public final Histogram deleteBlobTotalTimeInMs; + public final Histogram undeleteBlobRequestQueueTimeInMs; + public final Histogram undeleteBlobProcessingTimeInMs; + public final Histogram undeleteBlobResponseQueueTimeInMs; + public final Histogram undeleteBlobSendTimeInMs; + public final Histogram undeleteBlobTotalTimeInMs; + public final Histogram updateBlobTtlRequestQueueTimeInMs; public final Histogram updateBlobTtlProcessingTimeInMs; public final Histogram updateBlobTtlResponseQueueTimeInMs; @@ -157,6 +163,7 @@ public class ServerMetrics { public final Meter getBlobAllByReplicaRequestRate; public final Meter getBlobInfoRequestRate; public final Meter deleteBlobRequestRate; + public final Meter undeleteBlobRequestRate; public final Meter updateBlobTtlRequestRate; public final Meter replicaMetadataRequestRate; public final Meter triggerCompactionRequestRate; @@ -183,6 +190,7 @@ public class ServerMetrics { public final Counter unExpectedStoreGetError; public final Counter unExpectedStoreTtlUpdateError; public final Counter unExpectedStoreDeleteError; + public final Counter unExpectedStoreUndeleteError; public final Counter unExpectedAdminOperationError; public final Counter unExpectedStoreFindEntriesError; public final Counter idAlreadyExistError; @@ -190,11 +198,15 @@ public class ServerMetrics { public final Counter unknownFormatError; public final Counter idNotFoundError; public final Counter idDeletedError; + public final Counter idUndeletedError; + public final Counter idNotDeletedError; + public final Counter lifeVersionConflictError; public final Counter ttlExpiredError; public final Counter badRequestError; public final Counter temporarilyDisabledError; public final Counter getAuthorizationFailure; public final Counter deleteAuthorizationFailure; + public final Counter undeleteAuthorizationFailure; public final Counter ttlUpdateAuthorizationFailure; public final Counter ttlAlreadyUpdatedError; public final Counter ttlUpdateRejectedError; @@ -303,6 +315,15 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se deleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobSendTime")); deleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobTotalTime")); + undeleteBlobRequestQueueTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobRequestQueueTime")); + undeleteBlobProcessingTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobProcessingTime")); + undeleteBlobResponseQueueTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobResponseQueueTime")); + undeleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobSendTime")); + undeleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobTotalTime")); + updateBlobTtlRequestQueueTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UpdateBlobTtlRequestQueueTime")); updateBlobTtlProcessingTimeInMs = @@ -399,6 +420,7 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se registry.meter(MetricRegistry.name(requestClass, "GetBlobAllByReplicaRequestRate")); getBlobInfoRequestRate = registry.meter(MetricRegistry.name(requestClass, "GetBlobInfoRequestRate")); deleteBlobRequestRate = registry.meter(MetricRegistry.name(requestClass, "DeleteBlobRequestRate")); + undeleteBlobRequestRate = registry.meter(MetricRegistry.name(requestClass, "UndeleteBlobRequestRate")); updateBlobTtlRequestRate = registry.meter(MetricRegistry.name(requestClass, "UpdateBlobTtlRequestRate")); replicaMetadataRequestRate = registry.meter(MetricRegistry.name(requestClass, "ReplicaMetadataRequestRate")); triggerCompactionRequestRate = registry.meter(MetricRegistry.name(requestClass, "TriggerCompactionRequestRate")); @@ -426,12 +448,16 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se unknownFormatError = registry.counter(MetricRegistry.name(requestClass, "UnknownFormatError")); idNotFoundError = registry.counter(MetricRegistry.name(requestClass, "IDNotFoundError")); idDeletedError = registry.counter(MetricRegistry.name(requestClass, "IDDeletedError")); + idUndeletedError = registry.counter(MetricRegistry.name(requestClass, "IDUndeletedError")); + idNotDeletedError = registry.counter(MetricRegistry.name(requestClass, "IDNotDeletedError")); + lifeVersionConflictError = registry.counter(MetricRegistry.name(requestClass, "lifeVersionConflictError")); ttlExpiredError = registry.counter(MetricRegistry.name(requestClass, "TTLExpiredError")); temporarilyDisabledError = registry.counter(MetricRegistry.name(requestClass, "TemporarilyDisabledError")); badRequestError = registry.counter(MetricRegistry.name(requestClass, "BadRequestError")); unExpectedStorePutError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStorePutError")); unExpectedStoreGetError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreGetError")); unExpectedStoreDeleteError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreDeleteError")); + unExpectedStoreUndeleteError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreUndeleteError")); unExpectedAdminOperationError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedAdminOperationError")); unExpectedStoreTtlUpdateError = @@ -440,6 +466,7 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreFindEntriesError")); getAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "GetAuthorizationFailure")); deleteAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "DeleteAuthorizationFailure")); + undeleteAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "UndeleteAuthorizationFailure")); ttlUpdateAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "TtlUpdateAuthorizationFailure")); ttlAlreadyUpdatedError = registry.counter(MetricRegistry.name(requestClass, "TtlAlreadyUpdatedError")); diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java index 3ddc142c82..0c4d8e8fa4 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java @@ -505,7 +505,7 @@ public void messageFormatUndeleteUpdateRecordTest() throws Exception { * @param updateTimeMs the expected time of update * @throws Exception any error. */ - private static void checkUndeleteMessage(InputStream stream, Long expectedRecordSize, StoreKey key, short accountId, + public static void checkUndeleteMessage(InputStream stream, Long expectedRecordSize, StoreKey key, short accountId, short containerId, long updateTimeMs, short lifeVersion) throws Exception { checkHeaderAndStoreKeyForUpdate(stream, expectedRecordSize, key, lifeVersion); checkUndeleteSubRecord(stream, accountId, containerId, updateTimeMs); diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java index a1d8483cbc..dd49afce3e 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java @@ -135,6 +135,9 @@ public void handleRequests(NetworkRequest request) throws InterruptedException { case AdminRequest: handleAdminRequest(request); break; + case UndeleteRequest: + handleUndeleteRequest(request); + break; default: throw new UnsupportedOperationException("Request type not supported"); } @@ -641,9 +644,84 @@ public void handleReplicaMetadataRequest(NetworkRequest request) throws IOExcept metrics.replicaMetadataSendTimeInMs, metrics.replicaMetadataTotalTimeInMs, null, null, totalTimeSpent)); } - private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutResponse response, NetworkRequest request, - Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, long totalTimeSpent, - long blobSize, ServerMetrics metrics) throws InterruptedException { + @Override + public void handleUndeleteRequest(NetworkRequest request) throws IOException, InterruptedException { + UndeleteRequest undeleteRequest = + UndeleteRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); + long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs(); + long totalTimeSpent = requestQueueTime; + metrics.undeleteBlobRequestQueueTimeInMs.update(requestQueueTime); + metrics.undeleteBlobRequestRate.mark(); + long startTime = SystemTime.getInstance().milliseconds(); + UndeleteResponse response = null; + try { + StoreKey convertedStoreKey = getConvertedStoreKeys(Collections.singletonList(undeleteRequest.getBlobId())).get(0); + ServerErrorCode error = + validateRequest(undeleteRequest.getBlobId().getPartition(), RequestOrResponseType.UndeleteRequest, false); + if (error != ServerErrorCode.No_Error) { + logger.error("Validating undelete request failed with error {} for request {}", error, undeleteRequest); + response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), error); + } else { + BlobId convertedBlobId = (BlobId) convertedStoreKey; + MessageInfo info = + new MessageInfo(convertedBlobId, 0, convertedBlobId.getAccountId(), convertedBlobId.getContainerId(), + undeleteRequest.getOperationTimeMs()); + Store storeToUndelete = storeManager.getStore(undeleteRequest.getBlobId().getPartition()); + short lifeVersion = storeToUndelete.undelete(info); + response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), lifeVersion); + if (notification != null) { + notification.onBlobReplicaUndeleted(currentNode.getHostname(), currentNode.getPort(), + convertedStoreKey.getID(), BlobReplicaSourceType.PRIMARY); + } + } + } catch (StoreException e) { + boolean logInErrorLevel = false; + if (e.getErrorCode() == StoreErrorCodes.ID_Not_Found) { + metrics.idNotFoundError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.TTL_Expired) { + metrics.ttlExpiredError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.ID_Deleted_Permanently) { + metrics.idDeletedError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.Life_Version_Conflict) { + metrics.lifeVersionConflictError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.ID_Not_Deleted) { + metrics.idNotDeletedError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.ID_Undeleted) { + metrics.idUndeletedError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.Authorization_Failure) { + metrics.undeleteAuthorizationFailure.inc(); + } else { + logInErrorLevel = true; + metrics.unExpectedStoreUndeleteError.inc(); + } + if (logInErrorLevel) { + logger.error("Store exception on a undelete with error code {} for request {}", e.getErrorCode(), + undeleteRequest, e); + } else { + logger.trace("Store exception on a undelete with error code {} for request {}", e.getErrorCode(), + undeleteRequest, e); + } + response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), + ErrorMapping.getStoreErrorMapping(e.getErrorCode())); + } catch (Exception e) { + logger.error("Unknown exception for undelete request " + undeleteRequest, e); + response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), + ServerErrorCode.Unknown_Error); + metrics.unExpectedStoreUndeleteError.inc(); + } finally { + long processingTime = SystemTime.getInstance().milliseconds() - startTime; + totalTimeSpent += processingTime; + publicAccessLogger.info("{} {} processingTime {}", undeleteRequest, response, processingTime); + metrics.undeleteBlobProcessingTimeInMs.update(processingTime); + } + requestResponseChannel.sendResponse(response, request, + new ServerNetworkResponseMetrics(metrics.undeleteBlobResponseQueueTimeInMs, metrics.undeleteBlobSendTimeInMs, + metrics.undeleteBlobTotalTimeInMs, null, null, totalTimeSpent)); + } + + private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutResponse response, + NetworkRequest request, Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, + long totalTimeSpent, long blobSize, ServerMetrics metrics) throws InterruptedException { if (response.getError() == ServerErrorCode.No_Error) { metrics.markPutBlobRequestRateBySize(blobSize); if (blobSize <= ServerMetrics.smallBlob) { @@ -666,9 +744,9 @@ private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutR } } - private void sendGetResponse(RequestResponseChannel requestResponseChannel, GetResponse response, NetworkRequest request, - Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, long totalTimeSpent, - long blobSize, MessageFormatFlags flags, ServerMetrics metrics) throws InterruptedException { + private void sendGetResponse(RequestResponseChannel requestResponseChannel, GetResponse response, + NetworkRequest request, Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, + long totalTimeSpent, long blobSize, MessageFormatFlags flags, ServerMetrics metrics) throws InterruptedException { if (blobSize <= ServerMetrics.smallBlob) { if (flags == MessageFormatFlags.Blob || flags == MessageFormatFlags.All) { diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/UndeleteResponse.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/UndeleteResponse.java index 7d448d9753..3522ede36e 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/UndeleteResponse.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/UndeleteResponse.java @@ -13,6 +13,8 @@ */ package com.github.ambry.protocol; +import com.github.ambry.router.AsyncWritableChannel; +import com.github.ambry.router.Callback; import com.github.ambry.server.ServerErrorCode; import com.github.ambry.utils.Utils; import java.io.DataInputStream; @@ -80,9 +82,7 @@ public static UndeleteResponse readFrom(DataInputStream stream) throws IOExcepti } } - @Override - public long writeTo(WritableByteChannel channel) throws IOException { - long written = 0; + private void prepareBuffer() { if (bufferToSend == null) { bufferToSend = ByteBuffer.allocate((int) sizeInBytes()); writeHeader(); @@ -91,12 +91,24 @@ public long writeTo(WritableByteChannel channel) throws IOException { } bufferToSend.flip(); } + } + + @Override + public long writeTo(WritableByteChannel channel) throws IOException { + long written = 0; + prepareBuffer(); if (bufferToSend.remaining() > 0) { written = channel.write(bufferToSend); } return written; } + @Override + public void writeTo(AsyncWritableChannel channel, Callback callback) { + prepareBuffer(); + channel.write(bufferToSend, callback); + } + @Override public long sizeInBytes() { return super.sizeInBytes() + (long) (getError() == ServerErrorCode.No_Error ? Life_Version_InBytes : 0); diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java index 716347958d..3b94bce0a8 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java @@ -17,6 +17,9 @@ import com.github.ambry.clustermap.ReplicaState; import com.github.ambry.router.AsyncWritableChannel; import com.github.ambry.router.Callback; +import com.github.ambry.messageformat.MessageFormatInputStream; +import com.github.ambry.messageformat.MessageFormatWriteSet; +import com.github.ambry.messageformat.UndeleteMessageFormatInputStream; import com.github.ambry.store.FindInfo; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MessageReadSet; @@ -205,9 +208,9 @@ public void put(MessageWriteSet messageSetToWrite) throws StoreException { List infos = new ArrayList<>(); for (MessageInfo info : newInfos) { if (info.isTtlUpdated()) { - info = - new MessageInfo(info.getStoreKey(), info.getSize(), info.isDeleted(), false, info.getExpirationTimeInMs(), - info.getCrc(), info.getAccountId(), info.getContainerId(), info.getOperationTimeMs()); + info = new MessageInfo(info.getStoreKey(), info.getSize(), info.isDeleted(), false, info.isUndeleted(), + info.getExpirationTimeInMs(), info.getCrc(), info.getAccountId(), info.getContainerId(), + info.getOperationTimeMs(), (short) 0); } infos.add(info); } @@ -225,36 +228,65 @@ public void delete(MessageWriteSet messageSetToDelete) throws StoreException { } catch (StoreException e) { throw new IllegalStateException(e); } - MessageInfo ttlUpdateInfo = getMessageInfo(info.getStoreKey(), messageInfos, false, true); - messageInfos.add( - new MessageInfo(info.getStoreKey(), info.getSize(), true, ttlUpdateInfo != null, info.getExpirationTimeInMs(), - info.getAccountId(), info.getContainerId(), info.getOperationTimeMs())); + messageInfos.add(new MessageInfo(info.getStoreKey(), info.getSize(), true, info.isTtlUpdated(), false, + info.getExpirationTimeInMs(), null, info.getAccountId(), info.getContainerId(), info.getOperationTimeMs(), + info.getLifeVersion())); } } @Override public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException { for (MessageInfo info : messageSetToUpdate.getMessageSetInfo()) { - if (getMessageInfo(info.getStoreKey(), messageInfos, true, false) != null) { + if (getMessageInfo(info.getStoreKey(), messageInfos, true, false, false) != null) { throw new StoreException("Deleted", StoreErrorCodes.ID_Deleted); - } else if (getMessageInfo(info.getStoreKey(), messageInfos, false, true) != null) { + } else if (getMessageInfo(info.getStoreKey(), messageInfos, false, false, true) != null) { throw new StoreException("Updated already", StoreErrorCodes.Already_Updated); - } else if (getMessageInfo(info.getStoreKey(), messageInfos, false, false) == null) { + } else if (getMessageInfo(info.getStoreKey(), messageInfos, false, false, false) == null) { throw new StoreException("Not Found", StoreErrorCodes.ID_Not_Found); } + short lifeVersion = info.getLifeVersion(); + if (info.getLifeVersion() == 0) { + lifeVersion = getMessageInfo(info.getStoreKey(), messageInfos, false, false, false).getLifeVersion(); + } try { messageSetToUpdate.writeTo(log); } catch (StoreException e) { throw new IllegalStateException(e); } - messageInfos.add(new MessageInfo(info.getStoreKey(), info.getSize(), false, true, info.getExpirationTimeInMs(), - info.getAccountId(), info.getContainerId(), info.getOperationTimeMs())); + messageInfos.add( + new MessageInfo(info.getStoreKey(), info.getSize(), false, true, false, info.getExpirationTimeInMs(), null, + info.getAccountId(), info.getContainerId(), info.getOperationTimeMs(), lifeVersion)); } } @Override public short undelete(MessageInfo info) throws StoreException { - throw new UnsupportedOperationException("Undelete unsupported for now"); + StoreKey key = info.getStoreKey(); + MessageInfo deleteInfo = getMessageInfo(key, messageInfos, true, false, false); + if (info.getLifeVersion() == -1 && deleteInfo == null) { + throw new StoreException("Key " + key + " not delete yet", StoreErrorCodes.ID_Not_Deleted); + } + short lifeVersion = info.getLifeVersion(); + if (info.getLifeVersion() == -1) { + lifeVersion = (short) (deleteInfo.getLifeVersion() + 1); + } + try { + MessageFormatInputStream stream = + new UndeleteMessageFormatInputStream(key, info.getAccountId(), info.getContainerId(), + info.getOperationTimeMs(), lifeVersion); + // Update info to add stream size; + info = new MessageInfo(key, stream.getSize(), false, deleteInfo.isTtlUpdated(), true, + deleteInfo.getExpirationTimeInMs(), null, info.getAccountId(), info.getContainerId(), + info.getOperationTimeMs(), lifeVersion); + ArrayList infoList = new ArrayList<>(); + infoList.add(info); + MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false); + writeSet.writeTo(log); + return lifeVersion; + } catch (Exception e) { + throw new StoreException("Unknown error while trying to undelete blobs from store", e, + StoreErrorCodes.Unknown_Error); + } } @Override @@ -310,7 +342,7 @@ public StoreStats getStoreStats() { @Override public boolean isKeyDeleted(StoreKey key) throws StoreException { - return getMessageInfo(key, messageInfos, true, false) != null; + return getMessageInfo(key, messageInfos, true, false, false) != null; } @Override diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/MockConnectionPool.java b/ambry-replication/src/test/java/com.github.ambry.replication/MockConnectionPool.java index c9d80368ac..bb944959b9 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/MockConnectionPool.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/MockConnectionPool.java @@ -200,7 +200,7 @@ public void send(Send request) { // If MsgInfo says it is deleted, get the original Put Message's MessageInfo as that is what Get Request // looks for. Just set the deleted flag to true for the constructed MessageInfo from Put. if (infoFound.isDeleted()) { - MessageInfo putMsgInfo = getMessageInfo(infoFound.getStoreKey(), messageInfoList, false, false); + MessageInfo putMsgInfo = getMessageInfo(infoFound.getStoreKey(), messageInfoList, false, false, false); infoFound = new MessageInfo(putMsgInfo.getStoreKey(), putMsgInfo.getSize(), true, false, putMsgInfo.getExpirationTimeInMs(), putMsgInfo.getAccountId(), putMsgInfo.getContainerId(), putMsgInfo.getOperationTimeMs()); diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java index e697ffcc56..9a59506080 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java @@ -1237,7 +1237,7 @@ public void ttlUpdateReplicationTest() throws Exception { StoreKey remoteId = remoteInfo.getStoreKey(); if (seen.add(remoteId)) { StoreKey localId = storeKeyConverter.convert(Collections.singleton(remoteId)).get(remoteId); - MessageInfo localInfo = getMessageInfo(localId, localInfos, false, false); + MessageInfo localInfo = getMessageInfo(localId, localInfos, false, false, false); if (localId == null) { // this is a deprecated ID. There should be no messages locally assertNull(remoteId + " is deprecated and should have no entries", localInfo); @@ -1593,7 +1593,7 @@ public void replicaThreadTest() throws Exception { // ensure that the first key is not deleted in the local host assertNull(toDeleteId + " should not be deleted in the local host", - getMessageInfo(toDeleteId, localHost.infosByPartition.get(partitionId), true, false)); + getMessageInfo(toDeleteId, localHost.infosByPartition.get(partitionId), true, false, false)); } StoreKeyFactory storeKeyFactory = new BlobIdFactory(clusterMap); @@ -2119,7 +2119,7 @@ private void verifyNoMoreMissingKeysAndExpectedMissingBufferCount(MockHost remot // test that the first key has been marked deleted List messageInfos = localHost.infosByPartition.get(entry.getKey()); StoreKey deletedId = messageInfos.get(0).getStoreKey(); - assertNotNull(deletedId + " should have been deleted", getMessageInfo(deletedId, messageInfos, true, false)); + assertNotNull(deletedId + " should have been deleted", getMessageInfo(deletedId, messageInfos, true, false, false)); Map ignoreState = new HashMap<>(); for (StoreKey toBeIgnored : idsToBeIgnoredByPartition.get(entry.getKey())) { ignoreState.put(toBeIgnored, false); @@ -2236,7 +2236,7 @@ private void addPutMessagesToReplicasOfPartition(List ids, List hosts) throws MessageFormatException, IOException { - MessageInfo putMsg = getMessageInfo(id, hosts.get(0).infosByPartition.get(partitionId), false, false); + MessageInfo putMsg = getMessageInfo(id, hosts.get(0).infosByPartition.get(partitionId), false, false, false); short aid; short cid; if (putMsg == null) { @@ -2292,7 +2292,7 @@ public static void addPutMessagesToReplicasOfPartition(StoreKey id, short accoun */ public static void addTtlUpdateMessagesToReplicasOfPartition(PartitionId partitionId, StoreKey id, List hosts, long expirationTime) throws MessageFormatException, IOException { - MessageInfo putMsg = getMessageInfo(id, hosts.get(0).infosByPartition.get(partitionId), false, false); + MessageInfo putMsg = getMessageInfo(id, hosts.get(0).infosByPartition.get(partitionId), false, false, false); short aid; short cid; if (putMsg == null) { @@ -2382,10 +2382,11 @@ private static ByteBuffer getTtlUpdateMessage(StoreKey id, short accountId, shor * @param id the {@link StoreKey} to look for. * @param messageInfos the {@link MessageInfo} list. * @param deleteMsg {@code true} if delete msg is requested. {@code false} otherwise + * @param undeleteMsg {@code true} if undelete msg is requested. {@code false} otherwise * @param ttlUpdateMsg {@code true} if ttl update msg is requested. {@code false} otherwise * @return the delete {@link MessageInfo} if it exists in {@code messageInfos}. {@code null otherwise.} */ - static MessageInfo getMessageInfo(StoreKey id, List messageInfos, boolean deleteMsg, + static MessageInfo getMessageInfo(StoreKey id, List messageInfos, boolean deleteMsg, boolean undeleteMsg, boolean ttlUpdateMsg) { MessageInfo toRet = null; for (MessageInfo messageInfo : messageInfos) { @@ -2393,10 +2394,14 @@ static MessageInfo getMessageInfo(StoreKey id, List messageInfos, b if (deleteMsg && messageInfo.isDeleted()) { toRet = messageInfo; break; - } else if (ttlUpdateMsg && messageInfo.isTtlUpdated()) { + } else if (undeleteMsg && messageInfo.isUndeleted()) { toRet = messageInfo; break; - } else if (!deleteMsg && !ttlUpdateMsg) { + } else if (ttlUpdateMsg && !messageInfo.isUndeleted() && !messageInfo.isDeleted() + && messageInfo.isTtlUpdated()) { + toRet = messageInfo; + break; + } else if (!deleteMsg && !ttlUpdateMsg && !undeleteMsg) { toRet = messageInfo; break; } @@ -2412,15 +2417,15 @@ static MessageInfo getMessageInfo(StoreKey id, List messageInfos, b * @return a merged {@link MessageInfo} for {@code key} */ static MessageInfo getMergedMessageInfo(StoreKey key, List partitionInfos) { - MessageInfo info = getMessageInfo(key, partitionInfos, true, false); + MessageInfo info = getMessageInfo(key, partitionInfos, true, true, false); if (info == null) { - info = getMessageInfo(key, partitionInfos, false, false); + info = getMessageInfo(key, partitionInfos, false, false, false); } - MessageInfo ttlUpdateInfo = getMessageInfo(key, partitionInfos, false, true); + MessageInfo ttlUpdateInfo = getMessageInfo(key, partitionInfos, false, false, true); if (ttlUpdateInfo != null) { - info = new MessageInfo(info.getStoreKey(), info.getSize(), info.isDeleted(), true, + info = new MessageInfo(info.getStoreKey(), info.getSize(), info.isDeleted(), true, info.isUndeleted(), ttlUpdateInfo.getExpirationTimeInMs(), info.getCrc(), info.getAccountId(), info.getContainerId(), - info.getOperationTimeMs()); + info.getOperationTimeMs(), info.getLifeVersion()); } return info; } diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java index 74cdac1b6f..9c1eac488c 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java @@ -247,6 +247,7 @@ private VerifiableProperties createInitProperties(DataNodeId dataNodeId, boolean props.setProperty("store.validate.authorization", "true"); props.setProperty("kms.default.container.key", TestUtils.getRandomKey(32)); props.setProperty("server.enable.store.data.prefetch", Boolean.toString(enableDataPrefetch)); + props.setProperty("server.handle.undelete.request.enabled", "true"); props.setProperty("replication.intra.replica.thread.throttle.sleep.duration.ms", "100"); props.setProperty("replication.inter.replica.thread.throttle.sleep.duration.ms", "100"); props.putAll(sslProperties); diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java index 6596899509..6bd8385594 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java @@ -76,6 +76,8 @@ import com.github.ambry.protocol.PutResponse; import com.github.ambry.protocol.TtlUpdateRequest; import com.github.ambry.protocol.TtlUpdateResponse; +import com.github.ambry.protocol.UndeleteRequest; +import com.github.ambry.protocol.UndeleteResponse; import com.github.ambry.replication.FindTokenFactory; import com.github.ambry.router.Callback; import com.github.ambry.router.GetBlobOptionsBuilder; @@ -494,6 +496,13 @@ static void endToEndTest(Port targetPort, String routerDatacenter, MockCluster c actualBlobData = getBlobDataAndRelease(blobAll.getBlobData()); Assert.assertArrayEquals("Content mismatch", data, actualBlobData); + // undelete a not-deleted blob should return fail + UndeleteRequest undeleteRequest = new UndeleteRequest(1, "undeleteClient", blobId1, System.currentTimeMillis()); + channel.send(undeleteRequest); + stream = channel.receive().getInputStream(); + UndeleteResponse undeleteResponse = UndeleteResponse.readFrom(new DataInputStream(stream)); + assertEquals("Undelete blob should succeed", ServerErrorCode.Blob_Not_Deleted, undeleteResponse.getError()); + // delete a blob on a restarted store , which should succeed deleteRequest = new DeleteRequest(1, "deleteClient", blobId1, System.currentTimeMillis()); channel.send(deleteRequest); @@ -502,6 +511,31 @@ static void endToEndTest(Port targetPort, String routerDatacenter, MockCluster c assertEquals("Delete blob on restarted store should succeed", ServerErrorCode.No_Error, deleteResponse.getError()); + // undelete a deleted blob, which should succeed + undeleteRequest = new UndeleteRequest(2, "undeleteClient", blobId1, System.currentTimeMillis()); + channel.send(undeleteRequest); + stream = channel.receive().getInputStream(); + undeleteResponse = UndeleteResponse.readFrom(new DataInputStream(stream)); + assertEquals("Undelete blob should succeed", ServerErrorCode.No_Error, undeleteResponse.getError()); + assertEquals("Undelete life version mismatch", undeleteResponse.getLifeVersion(), (short) 1); + + // undelete an already undeleted blob, which should fail + undeleteRequest = new UndeleteRequest(3, "undeleteClient", blobId1, System.currentTimeMillis()); + channel.send(undeleteRequest); + stream = channel.receive().getInputStream(); + undeleteResponse = UndeleteResponse.readFrom(new DataInputStream(stream)); + assertEquals("Undelete blob should fail", ServerErrorCode.Blob_Already_Undeleted, undeleteResponse.getError()); + + // get an undeleted blob, which should succeed + getRequest1 = new GetRequest(1, "clientid1", MessageFormatFlags.All, partitionRequestInfoList, GetOption.None); + channel.send(getRequest1); + stream = channel.receive().getInputStream(); + resp1 = GetResponse.readFrom(new DataInputStream(stream), clusterMap); + responseStream = resp1.getInputStream(); + blobAll = MessageFormatRecord.deserializeBlobAll(responseStream, blobIdFactory); + actualBlobData = getBlobDataAndRelease(blobAll.getBlobData()); + Assert.assertArrayEquals("Content mismatch", data, actualBlobData); + // Bounce servers to make them read the persisted token file. cluster.stopServers(); cluster.startServers(); diff --git a/ambry-server/src/main/java/com.github.ambry.server/AmbryServerRequests.java b/ambry-server/src/main/java/com.github.ambry.server/AmbryServerRequests.java index 9b878d4a75..487502d6be 100644 --- a/ambry-server/src/main/java/com.github.ambry.server/AmbryServerRequests.java +++ b/ambry-server/src/main/java/com.github.ambry.server/AmbryServerRequests.java @@ -74,11 +74,12 @@ public class AmbryServerRequests extends AmbryRequests { private final ConcurrentHashMap localPartitionToReplicaMap; // POST requests are allowed on stores states: { LEADER, STANDBY } static final Set PUT_ALLOWED_STORE_STATES = EnumSet.of(ReplicaState.LEADER, ReplicaState.STANDBY); - // UPDATE requests (including DELETE, TTLUpdate) are allowed on stores states: { LEADER, STANDBY, INACTIVE, BOOTSTRAP } + // UPDATE requests (including DELETE, TTLUpdate, UNDELETE) are allowed on stores states: { LEADER, STANDBY, INACTIVE, BOOTSTRAP } static final Set UPDATE_ALLOWED_STORE_STATES = EnumSet.of(ReplicaState.LEADER, ReplicaState.STANDBY, ReplicaState.INACTIVE, ReplicaState.BOOTSTRAP); static final Set UPDATE_REQUEST_TYPES = - EnumSet.of(RequestOrResponseType.DeleteRequest, RequestOrResponseType.TtlUpdateRequest); + EnumSet.of(RequestOrResponseType.DeleteRequest, RequestOrResponseType.TtlUpdateRequest, + RequestOrResponseType.UndeleteResponse); private static final Logger logger = LoggerFactory.getLogger(AmbryServerRequests.class); @@ -93,7 +94,7 @@ public class AmbryServerRequests extends AmbryRequests { this.statsManager = statsManager; for (RequestOrResponseType requestType : EnumSet.of(RequestOrResponseType.PutRequest, - RequestOrResponseType.GetRequest, RequestOrResponseType.DeleteRequest, + RequestOrResponseType.GetRequest, RequestOrResponseType.DeleteRequest, RequestOrResponseType.UndeleteRequest, RequestOrResponseType.ReplicaMetadataRequest, RequestOrResponseType.TtlUpdateRequest)) { requestsDisableInfo.put(requestType, Collections.newSetFromMap(new ConcurrentHashMap<>())); } @@ -121,6 +122,9 @@ protected ConcurrentHashMap createLocalPartitionToReplic * @return {@code true} if the request is enabled. {@code false} otherwise. */ private boolean isRequestEnabled(RequestOrResponseType requestType, PartitionId id) { + if (requestType.equals(RequestOrResponseType.UndeleteRequest) && !serverConfig.serverHandleUndeleteRequestEnabled) { + return false; + } Set requestDisableInfo = requestsDisableInfo.get(requestType); // 1. check if request is disabled by admin request if (requestDisableInfo != null && requestDisableInfo.contains(id)) { diff --git a/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java b/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java index bd4ecbb39c..2f55dbaf44 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java +++ b/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java @@ -67,6 +67,7 @@ import com.github.ambry.protocol.RequestOrResponseType; import com.github.ambry.protocol.Response; import com.github.ambry.protocol.TtlUpdateRequest; +import com.github.ambry.protocol.UndeleteRequest; import com.github.ambry.replication.FindTokenHelper; import com.github.ambry.replication.MockFindTokenHelper; import com.github.ambry.replication.MockReplicationManager; @@ -127,7 +128,6 @@ public class AmbryServerRequestsTest { private final MockStorageManager storageManager; private final MockReplicationManager replicationManager; private final MockStatsManager statsManager; - private final AmbryServerRequests ambryRequests; private final MockRequestResponseChannel requestResponseChannel = new MockRequestResponseChannel(); private final Set validKeysInStore = new HashSet<>(); private final Map conversionMap = new HashMap<>(); @@ -137,6 +137,7 @@ public class AmbryServerRequestsTest { private final ReplicaStatusDelegate mockDelegate = Mockito.mock(ReplicaStatusDelegate.class); private final boolean putRequestShareMemory; private final boolean validateRequestOnStoreState; + private AmbryServerRequests ambryRequests; @Parameterized.Parameters public static List data() { @@ -148,15 +149,7 @@ public AmbryServerRequestsTest(boolean putRequestShareMemory, boolean validateRe this.putRequestShareMemory = putRequestShareMemory; this.validateRequestOnStoreState = validateRequestOnStoreState; clusterMap = new MockClusterMap(); - Properties properties = new Properties(); - properties.setProperty("clustermap.cluster.name", "test"); - properties.setProperty("clustermap.datacenter.name", "DC1"); - properties.setProperty("clustermap.host.name", "localhost"); - properties.setProperty("replication.token.factory", "com.github.ambry.store.StoreFindTokenFactory"); - properties.setProperty("replication.no.of.intra.dc.replica.threads", "1"); - properties.setProperty("replication.no.of.inter.dc.replica.threads", "1"); - properties.setProperty("server.validate.request.based.on.store.state", - Boolean.toString(validateRequestOnStoreState)); + Properties properties = createProperties(validateRequestOnStoreState, true); VerifiableProperties verifiableProperties = new VerifiableProperties(properties); replicationConfig = new ReplicationConfig(verifiableProperties); serverConfig = new ServerConfig(verifiableProperties); @@ -183,6 +176,21 @@ public AmbryServerRequestsTest(boolean putRequestShareMemory, boolean validateRe Mockito.when(mockDelegate.unmarkStopped(anyList())).thenReturn(true); } + private static Properties createProperties(boolean validateRequestOnStoreState, + boolean handleUndeleteRequestEnabled) { + Properties properties = new Properties(); + properties.setProperty("clustermap.cluster.name", "test"); + properties.setProperty("clustermap.datacenter.name", "DC1"); + properties.setProperty("clustermap.host.name", "localhost"); + properties.setProperty("replication.token.factory", "com.github.ambry.store.StoreFindTokenFactory"); + properties.setProperty("replication.no.of.intra.dc.replica.threads", "1"); + properties.setProperty("replication.no.of.inter.dc.replica.threads", "1"); + properties.setProperty("server.validate.request.based.on.store.state", + Boolean.toString(validateRequestOnStoreState)); + properties.setProperty("server.handle.undelete.request.enabled", Boolean.toString(handleUndeleteRequestEnabled)); + return properties; + } + /** * Close the storageManager created. */ @@ -210,7 +218,8 @@ public void validateRequestsTest() { } for (RequestOrResponseType request : EnumSet.of(RequestOrResponseType.PutRequest, RequestOrResponseType.GetRequest, - RequestOrResponseType.DeleteRequest, RequestOrResponseType.TtlUpdateRequest)) { + RequestOrResponseType.DeleteRequest, RequestOrResponseType.TtlUpdateRequest, + RequestOrResponseType.UndeleteRequest)) { for (Map.Entry entry : stateToReplica.entrySet()) { if (request == RequestOrResponseType.PutRequest) { // for PUT request, it is not allowed on OFFLINE,BOOTSTRAP and INACTIVE when validateRequestOnStoreState = true @@ -223,7 +232,7 @@ public void validateRequestsTest() { ambryRequests.validateRequest(entry.getValue().getPartitionId(), request, false)); } } else if (AmbryServerRequests.UPDATE_REQUEST_TYPES.contains(request)) { - // for DELETE/TTL Update request, they are not allowed on OFFLINE,BOOTSTRAP and INACTIVE when validateRequestOnStoreState = true + // for UNDELETE/DELETE/TTL Update request, they are not allowed on OFFLINE,BOOTSTRAP and INACTIVE when validateRequestOnStoreState = true if (AmbryServerRequests.UPDATE_ALLOWED_STORE_STATES.contains(entry.getKey())) { assertEquals("Error code is not expected for DELETE/TTL Update", ServerErrorCode.No_Error, ambryRequests.validateRequest(entry.getValue().getPartitionId(), request, false)); @@ -313,7 +322,8 @@ public void scheduleCompactionFailureTest() throws InterruptedException, IOExcep public void controlRequestSuccessTest() throws InterruptedException, IOException { RequestOrResponseType[] requestOrResponseTypes = {RequestOrResponseType.PutRequest, RequestOrResponseType.DeleteRequest, RequestOrResponseType.GetRequest, - RequestOrResponseType.ReplicaMetadataRequest, RequestOrResponseType.TtlUpdateRequest}; + RequestOrResponseType.ReplicaMetadataRequest, RequestOrResponseType.TtlUpdateRequest, + RequestOrResponseType.UndeleteRequest}; for (RequestOrResponseType requestType : requestOrResponseTypes) { List partitionIds = clusterMap.getWritablePartitionIds(DEFAULT_PARTITION_CLASS); for (PartitionId id : partitionIds) { @@ -776,6 +786,64 @@ public void ttlUpdateTest() throws InterruptedException, IOException, MessageFor miscTtlUpdateFailuresTest(); } + /** + * Tests for server config to enable and disable UNDELETE + */ + @Test + public void undeleteEnableDisableTest() throws Exception { + Properties properties = createProperties(validateRequestOnStoreState, false); + VerifiableProperties verifiableProperties = new VerifiableProperties(properties); + ServerConfig serverConfig = new ServerConfig(verifiableProperties); + ServerMetrics serverMetrics = + new ServerMetrics(clusterMap.getMetricRegistry(), AmbryRequests.class, AmbryServer.class); + AmbryServerRequests other = new AmbryServerRequests(storageManager, requestResponseChannel, clusterMap, dataNodeId, + clusterMap.getMetricRegistry(), serverMetrics, findTokenHelper, null, replicationManager, null, serverConfig, + storeKeyConverterFactory, statsManager); + + AmbryServerRequests temp = ambryRequests; + ambryRequests = other; + try { + MockPartitionId id = (MockPartitionId) clusterMap.getWritablePartitionIds(DEFAULT_PARTITION_CLASS).get(0); + int correlationId = TestUtils.RANDOM.nextInt(); + String clientId = UtilsTest.getRandomString(10); + BlobId blobId = new BlobId(CommonTestUtils.getCurrentBlobIdVersion(), BlobId.BlobIdType.NATIVE, + ClusterMapUtils.UNKNOWN_DATACENTER_ID, Utils.getRandomShort(TestUtils.RANDOM), + Utils.getRandomShort(TestUtils.RANDOM), id, false, BlobId.BlobDataType.DATACHUNK); + long opTimeMs = SystemTime.getInstance().milliseconds(); + doUndelete(correlationId++, clientId, blobId, opTimeMs, ServerErrorCode.Temporarily_Disabled); + } finally { + ambryRequests = temp; + } + } + + /** + * Tests for success and failure scenarios for UNDELETE + */ + @Test + public void undeleteTest() throws Exception { + MockPartitionId id = (MockPartitionId) clusterMap.getWritablePartitionIds(DEFAULT_PARTITION_CLASS).get(0); + int correlationId = TestUtils.RANDOM.nextInt(); + String clientId = UtilsTest.getRandomString(10); + BlobId blobId = new BlobId(CommonTestUtils.getCurrentBlobIdVersion(), BlobId.BlobIdType.NATIVE, + ClusterMapUtils.UNKNOWN_DATACENTER_ID, Utils.getRandomShort(TestUtils.RANDOM), + Utils.getRandomShort(TestUtils.RANDOM), id, false, BlobId.BlobDataType.DATACHUNK); + long opTimeMs = SystemTime.getInstance().milliseconds(); + + // since we already test store key conversion in ttlupdate, we don't test it again in this method. + // storekey not valid for store + doUndelete(correlationId++, clientId, blobId, opTimeMs, ServerErrorCode.Blob_Not_Found); + // valid now + validKeysInStore.add(blobId); + doUndelete(correlationId++, clientId, blobId, opTimeMs, ServerErrorCode.No_Error); + + // READ_ONLY is fine too + changePartitionState(id, true); + doUndelete(correlationId++, clientId, blobId, opTimeMs, ServerErrorCode.No_Error); + changePartitionState(id, false); + + miscUndeleteFailuresTest(); + } + // helpers // general @@ -824,7 +892,8 @@ private void sendAndVerifyOperationRequest(RequestOrResponse request, ServerErro Response response = sendRequestGetResponse(request, EnumSet.of(RequestOrResponseType.GetRequest, RequestOrResponseType.ReplicaMetadataRequest).contains(requestType) ? ServerErrorCode.No_Error : expectedErrorCode); - if (expectedErrorCode.equals(ServerErrorCode.No_Error) || forceCheckOpReceived) { + if (expectedErrorCode.equals(ServerErrorCode.No_Error) || ( + forceCheckOpReceived && !expectedErrorCode.equals(ServerErrorCode.Temporarily_Disabled))) { assertEquals("Operation received at the store not as expected", requestType, MockStorageManager.operationReceived); } @@ -1037,6 +1106,10 @@ private void sendAndVerifyOperationRequest(RequestOrResponseType requestType, Li case DeleteRequest: request = new DeleteRequest(correlationId, clientId, originalBlobId, SystemTime.getInstance().milliseconds()); break; + case UndeleteRequest: + request = + new UndeleteRequest(correlationId, clientId, originalBlobId, SystemTime.getInstance().milliseconds()); + break; case GetRequest: PartitionRequestInfo pRequestInfo = new PartitionRequestInfo(id, Collections.singletonList(originalBlobId)); request = @@ -1235,6 +1308,25 @@ private void doTtlUpdate(int correlationId, String clientId, BlobId blobId, long } } + /** + * Does a UNDELETE and checks for success if {@code expectedErrorCode} is {@link ServerErrorCode#No_Error}. Else, + * checks for failure with the code {@code expectedErrorCode}. + * @param correlationId the correlation ID to use in the request + * @param clientId the client ID to use in the request + * @param blobId the blob ID to use in the request + * @param opTimeMs the operation time (ms) to use in the request + * @param expectedErrorCode the expected {@link ServerErrorCode} + * @throws Exception + */ + private void doUndelete(int correlationId, String clientId, BlobId blobId, long opTimeMs, + ServerErrorCode expectedErrorCode) throws Exception { + UndeleteRequest request = new UndeleteRequest(correlationId, clientId, blobId, opTimeMs); + sendAndVerifyOperationRequest(request, expectedErrorCode, true); + if (expectedErrorCode == ServerErrorCode.No_Error) { + verifyUndelete(request.getBlobId(), opTimeMs, MockStorageManager.messageWriteSetReceived); + } + } + /** * Exercises various failure paths for TTL updates * @throws InterruptedException @@ -1271,6 +1363,41 @@ private void miscTtlUpdateFailuresTest() throws InterruptedException, IOExceptio // request disabled is checked in request control tests } + /** + * Exercises various failure paths for UNDELETEs + * @throws Exception + */ + private void miscUndeleteFailuresTest() throws Exception { + PartitionId id = clusterMap.getWritablePartitionIds(DEFAULT_PARTITION_CLASS).get(0); + // store exceptions + for (StoreErrorCodes code : StoreErrorCodes.values()) { + MockStorageManager.storeException = new StoreException("expected", code); + ServerErrorCode expectedErrorCode = ErrorMapping.getStoreErrorMapping(code); + sendAndVerifyOperationRequest(RequestOrResponseType.UndeleteRequest, Collections.singletonList(id), + expectedErrorCode, true); + MockStorageManager.storeException = null; + } + // runtime exception + MockStorageManager.runtimeException = new RuntimeException("expected"); + sendAndVerifyOperationRequest(RequestOrResponseType.UndeleteRequest, Collections.singletonList(id), + ServerErrorCode.Unknown_Error, true); + MockStorageManager.runtimeException = null; + // store is not started/is stopped/otherwise unavailable - Replica_Unavailable + storageManager.returnNullStore = true; + sendAndVerifyOperationRequest(RequestOrResponseType.UndeleteRequest, Collections.singletonList(id), + ServerErrorCode.Replica_Unavailable, false); + storageManager.returnNullStore = false; + // PartitionUnknown is hard to simulate without betraying knowledge of the internals of MockClusterMap. + + // disk down + ReplicaId replicaId = findReplica(id); + clusterMap.onReplicaEvent(replicaId, ReplicaEventType.Disk_Error); + sendAndVerifyOperationRequest(RequestOrResponseType.UndeleteRequest, Collections.singletonList(id), + ServerErrorCode.Disk_Unavailable, false); + clusterMap.onReplicaEvent(replicaId, ReplicaEventType.Disk_Ok); + // request disabled is checked in request control tests + } + /** * Verifies that the TTL update request was delivered to the {@link Store} correctly. * @param blobId the {@link BlobId} that was updated @@ -1302,6 +1429,35 @@ private void verifyTtlUpdate(BlobId blobId, long expiresAtMs, long opTimeMs, Mes key.getContainerId(), opTimeMs, (short) 0); } + /** + * Verifies that the UNDELETE request was delivered to the {@link Store} correctly. + * @param blobId the {@link BlobId} that was updated + * @param opTimeMs the op time (in ms) + * @param messageWriteSet the {@link MessageWriteSet} received at the {@link Store} + * @throws IOException + * @throws MessageFormatException + * @throws StoreException + */ + private void verifyUndelete(BlobId blobId, long opTimeMs, MessageWriteSet messageWriteSet) throws Exception { + BlobId key = (BlobId) conversionMap.getOrDefault(blobId, blobId); + assertEquals("There should be one message in the write set", 1, messageWriteSet.getMessageSetInfo().size()); + MessageInfo info = messageWriteSet.getMessageSetInfo().get(0); + + // verify stream + ByteBuffer record = getDataInWriteSet(messageWriteSet, (int) info.getSize()); + int expectedSize = record.remaining(); + InputStream stream = new ByteBufferInputStream(record); + + // verify stream + MessageFormatInputStreamTest.checkUndeleteMessage(stream, null, key, key.getAccountId(), key.getContainerId(), + opTimeMs, storageManager.returnValueOfUndelete); + + // verify MessageInfo + // since the record has been verified, the buffer size before verification is a good indicator of size + MessageInfoTest.checkGetters(info, key, expectedSize, false, false, true, Utils.Infinite_Time, null, + key.getAccountId(), key.getContainerId(), opTimeMs, storageManager.returnValueOfUndelete); + } + /** * Implementation of {@link NetworkRequest} to help with tests. */ diff --git a/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java b/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java index dc6200f72d..b91fa10a31 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java +++ b/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java @@ -152,8 +152,8 @@ public short undelete(MessageInfo info) throws StoreException { new UndeleteMessageFormatInputStream(info.getStoreKey(), info.getAccountId(), info.getContainerId(), info.getOperationTimeMs(), (short) returnValueOfUndelete); // Update info to add stream size; - info = new MessageInfo(info.getStoreKey(), stream.getSize(), info.getAccountId(), info.getContainerId(), - info.getOperationTimeMs()); + info = new MessageInfo(info.getStoreKey(), stream.getSize(), false, false, true, Utils.Infinite_Time, null, + info.getAccountId(), info.getContainerId(), info.getOperationTimeMs(), returnValueOfUndelete); ArrayList infoList = new ArrayList<>(); infoList.add(info); messageWriteSetReceived = new MessageFormatWriteSet(stream, infoList, false);