diff --git a/ambry-api/src/main/java/com.github.ambry/config/ServerConfig.java b/ambry-api/src/main/java/com.github.ambry/config/ServerConfig.java index 3712187440..9f8c4bfc7a 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/ServerConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/ServerConfig.java @@ -93,6 +93,13 @@ public class ServerConfig { @Default("false") public final boolean serverValidateRequestBasedOnStoreState; + /** + * True to enable ambry server handling undelete requests. + */ + @Config("server.handle.undelete.request.enabled") + @Default("false") + public final boolean serverHandleUndeleteRequestEnabled; + public ServerConfig(VerifiableProperties verifiableProperties) { serverRequestHandlerNumOfThreads = verifiableProperties.getInt("server.request.handler.num.of.threads", 7); serverSchedulerNumOfthreads = verifiableProperties.getInt("server.scheduler.num.of.threads", 10); @@ -110,5 +117,7 @@ public ServerConfig(VerifiableProperties verifiableProperties) { Utils.splitString(verifiableProperties.getString("server.stats.reports.to.publish", ""), ","); serverValidateRequestBasedOnStoreState = verifiableProperties.getBoolean("server.validate.request.based.on.store.state", false); + serverHandleUndeleteRequestEnabled = + verifiableProperties.getBoolean("server.handle.undelete.request.enabled", false); } } diff --git a/ambry-api/src/main/java/com.github.ambry/server/ServerErrorCode.java b/ambry-api/src/main/java/com.github.ambry/server/ServerErrorCode.java index fe076cfb79..c3e5f3409b 100644 --- a/ambry-api/src/main/java/com.github.ambry/server/ServerErrorCode.java +++ b/ambry-api/src/main/java/com.github.ambry/server/ServerErrorCode.java @@ -37,5 +37,9 @@ public enum ServerErrorCode { Blob_Already_Updated, Blob_Update_Not_Allowed, Replica_Unavailable, - Blob_Authorization_Failure + Blob_Authorization_Failure, + Blob_Life_Version_Conflict, + Blob_Not_Deleted, + Blob_Already_Undeleted, + Blob_Deleted_Permanently } diff --git a/ambry-commons/src/main/java/com.github.ambry.commons/ErrorMapping.java b/ambry-commons/src/main/java/com.github.ambry.commons/ErrorMapping.java index 5c32f4cfdc..5b10e91cf3 100644 --- a/ambry-commons/src/main/java/com.github.ambry.commons/ErrorMapping.java +++ b/ambry-commons/src/main/java/com.github.ambry.commons/ErrorMapping.java @@ -38,6 +38,10 @@ public class ErrorMapping { tempMap.put(StoreErrorCodes.Authorization_Failure, ServerErrorCode.Blob_Authorization_Failure); tempMap.put(StoreErrorCodes.Already_Updated, ServerErrorCode.Blob_Already_Updated); tempMap.put(StoreErrorCodes.Update_Not_Allowed, ServerErrorCode.Blob_Update_Not_Allowed); + tempMap.put(StoreErrorCodes.Life_Version_Conflict, ServerErrorCode.Blob_Life_Version_Conflict); + tempMap.put(StoreErrorCodes.ID_Not_Deleted, ServerErrorCode.Blob_Not_Deleted); + tempMap.put(StoreErrorCodes.ID_Undeleted, ServerErrorCode.Blob_Already_Undeleted); + tempMap.put(StoreErrorCodes.ID_Deleted_Permanently, ServerErrorCode.Blob_Deleted_Permanently); storeErrorMapping = Collections.unmodifiableMap(tempMap); } diff --git a/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java b/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java index d61b29e0ae..f450bf6495 100644 --- a/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java +++ b/ambry-commons/src/main/java/com.github.ambry.commons/ServerMetrics.java @@ -100,6 +100,12 @@ public class ServerMetrics { public final Histogram deleteBlobSendTimeInMs; public final Histogram 
deleteBlobTotalTimeInMs; + public final Histogram undeleteBlobRequestQueueTimeInMs; + public final Histogram undeleteBlobProcessingTimeInMs; + public final Histogram undeleteBlobResponseQueueTimeInMs; + public final Histogram undeleteBlobSendTimeInMs; + public final Histogram undeleteBlobTotalTimeInMs; + public final Histogram updateBlobTtlRequestQueueTimeInMs; public final Histogram updateBlobTtlProcessingTimeInMs; public final Histogram updateBlobTtlResponseQueueTimeInMs; @@ -157,6 +163,7 @@ public class ServerMetrics { public final Meter getBlobAllByReplicaRequestRate; public final Meter getBlobInfoRequestRate; public final Meter deleteBlobRequestRate; + public final Meter undeleteBlobRequestRate; public final Meter updateBlobTtlRequestRate; public final Meter replicaMetadataRequestRate; public final Meter triggerCompactionRequestRate; @@ -183,6 +190,7 @@ public class ServerMetrics { public final Counter unExpectedStoreGetError; public final Counter unExpectedStoreTtlUpdateError; public final Counter unExpectedStoreDeleteError; + public final Counter unExpectedStoreUndeleteError; public final Counter unExpectedAdminOperationError; public final Counter unExpectedStoreFindEntriesError; public final Counter idAlreadyExistError; @@ -190,11 +198,15 @@ public class ServerMetrics { public final Counter unknownFormatError; public final Counter idNotFoundError; public final Counter idDeletedError; + public final Counter idUndeletedError; + public final Counter idNotDeletedError; + public final Counter lifeVersionConflictError; public final Counter ttlExpiredError; public final Counter badRequestError; public final Counter temporarilyDisabledError; public final Counter getAuthorizationFailure; public final Counter deleteAuthorizationFailure; + public final Counter undeleteAuthorizationFailure; public final Counter ttlUpdateAuthorizationFailure; public final Counter ttlAlreadyUpdatedError; public final Counter ttlUpdateRejectedError; @@ -303,6 +315,15 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se deleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobSendTime")); deleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobTotalTime")); + undeleteBlobRequestQueueTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobRequestQueueTime")); + undeleteBlobProcessingTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobProcessingTime")); + undeleteBlobResponseQueueTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobResponseQueueTime")); + undeleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobSendTime")); + undeleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobTotalTime")); + updateBlobTtlRequestQueueTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UpdateBlobTtlRequestQueueTime")); updateBlobTtlProcessingTimeInMs = @@ -399,6 +420,7 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se registry.meter(MetricRegistry.name(requestClass, "GetBlobAllByReplicaRequestRate")); getBlobInfoRequestRate = registry.meter(MetricRegistry.name(requestClass, "GetBlobInfoRequestRate")); deleteBlobRequestRate = registry.meter(MetricRegistry.name(requestClass, "DeleteBlobRequestRate")); + undeleteBlobRequestRate = registry.meter(MetricRegistry.name(requestClass, "UndeleteBlobRequestRate")); updateBlobTtlRequestRate = 
registry.meter(MetricRegistry.name(requestClass, "UpdateBlobTtlRequestRate")); replicaMetadataRequestRate = registry.meter(MetricRegistry.name(requestClass, "ReplicaMetadataRequestRate")); triggerCompactionRequestRate = registry.meter(MetricRegistry.name(requestClass, "TriggerCompactionRequestRate")); @@ -426,12 +448,16 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se unknownFormatError = registry.counter(MetricRegistry.name(requestClass, "UnknownFormatError")); idNotFoundError = registry.counter(MetricRegistry.name(requestClass, "IDNotFoundError")); idDeletedError = registry.counter(MetricRegistry.name(requestClass, "IDDeletedError")); + idUndeletedError = registry.counter(MetricRegistry.name(requestClass, "IDUndeletedError")); + idNotDeletedError = registry.counter(MetricRegistry.name(requestClass, "IDNotDeletedError")); + lifeVersionConflictError = registry.counter(MetricRegistry.name(requestClass, "lifeVersionConflictError")); ttlExpiredError = registry.counter(MetricRegistry.name(requestClass, "TTLExpiredError")); temporarilyDisabledError = registry.counter(MetricRegistry.name(requestClass, "TemporarilyDisabledError")); badRequestError = registry.counter(MetricRegistry.name(requestClass, "BadRequestError")); unExpectedStorePutError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStorePutError")); unExpectedStoreGetError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreGetError")); unExpectedStoreDeleteError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreDeleteError")); + unExpectedStoreUndeleteError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreUndeleteError")); unExpectedAdminOperationError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedAdminOperationError")); unExpectedStoreTtlUpdateError = @@ -440,6 +466,7 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreFindEntriesError")); getAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "GetAuthorizationFailure")); deleteAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "DeleteAuthorizationFailure")); + undeleteAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "UndeleteAuthorizationFailure")); ttlUpdateAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "TtlUpdateAuthorizationFailure")); ttlAlreadyUpdatedError = registry.counter(MetricRegistry.name(requestClass, "TtlAlreadyUpdatedError")); diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java index 3ddc142c82..0c4d8e8fa4 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java @@ -505,7 +505,7 @@ public void messageFormatUndeleteUpdateRecordTest() throws Exception { * @param updateTimeMs the expected time of update * @throws Exception any error. 
*/ - private static void checkUndeleteMessage(InputStream stream, Long expectedRecordSize, StoreKey key, short accountId, + public static void checkUndeleteMessage(InputStream stream, Long expectedRecordSize, StoreKey key, short accountId, short containerId, long updateTimeMs, short lifeVersion) throws Exception { checkHeaderAndStoreKeyForUpdate(stream, expectedRecordSize, key, lifeVersion); checkUndeleteSubRecord(stream, accountId, containerId, updateTimeMs); diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java index a1d8483cbc..dd49afce3e 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/AmbryRequests.java @@ -135,6 +135,9 @@ public void handleRequests(NetworkRequest request) throws InterruptedException { case AdminRequest: handleAdminRequest(request); break; + case UndeleteRequest: + handleUndeleteRequest(request); + break; default: throw new UnsupportedOperationException("Request type not supported"); } @@ -641,9 +644,84 @@ public void handleReplicaMetadataRequest(NetworkRequest request) throws IOExcept metrics.replicaMetadataSendTimeInMs, metrics.replicaMetadataTotalTimeInMs, null, null, totalTimeSpent)); } - private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutResponse response, NetworkRequest request, - Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, long totalTimeSpent, - long blobSize, ServerMetrics metrics) throws InterruptedException { + @Override + public void handleUndeleteRequest(NetworkRequest request) throws IOException, InterruptedException { + UndeleteRequest undeleteRequest = + UndeleteRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); + long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs(); + long totalTimeSpent = requestQueueTime; + metrics.undeleteBlobRequestQueueTimeInMs.update(requestQueueTime); + metrics.undeleteBlobRequestRate.mark(); + long startTime = SystemTime.getInstance().milliseconds(); + UndeleteResponse response = null; + try { + StoreKey convertedStoreKey = getConvertedStoreKeys(Collections.singletonList(undeleteRequest.getBlobId())).get(0); + ServerErrorCode error = + validateRequest(undeleteRequest.getBlobId().getPartition(), RequestOrResponseType.UndeleteRequest, false); + if (error != ServerErrorCode.No_Error) { + logger.error("Validating undelete request failed with error {} for request {}", error, undeleteRequest); + response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), error); + } else { + BlobId convertedBlobId = (BlobId) convertedStoreKey; + MessageInfo info = + new MessageInfo(convertedBlobId, 0, convertedBlobId.getAccountId(), convertedBlobId.getContainerId(), + undeleteRequest.getOperationTimeMs()); + Store storeToUndelete = storeManager.getStore(undeleteRequest.getBlobId().getPartition()); + short lifeVersion = storeToUndelete.undelete(info); + response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), lifeVersion); + if (notification != null) { + notification.onBlobReplicaUndeleted(currentNode.getHostname(), currentNode.getPort(), + convertedStoreKey.getID(), BlobReplicaSourceType.PRIMARY); + } + } + } catch (StoreException e) { + boolean logInErrorLevel = false; + if (e.getErrorCode() == StoreErrorCodes.ID_Not_Found) { + 
metrics.idNotFoundError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.TTL_Expired) { + metrics.ttlExpiredError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.ID_Deleted_Permanently) { + metrics.idDeletedError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.Life_Version_Conflict) { + metrics.lifeVersionConflictError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.ID_Not_Deleted) { + metrics.idNotDeletedError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.ID_Undeleted) { + metrics.idUndeletedError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.Authorization_Failure) { + metrics.undeleteAuthorizationFailure.inc(); + } else { + logInErrorLevel = true; + metrics.unExpectedStoreUndeleteError.inc(); + } + if (logInErrorLevel) { + logger.error("Store exception on an undelete with error code {} for request {}", e.getErrorCode(), + undeleteRequest, e); + } else { + logger.trace("Store exception on an undelete with error code {} for request {}", e.getErrorCode(), + undeleteRequest, e); + } + response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), + ErrorMapping.getStoreErrorMapping(e.getErrorCode())); + } catch (Exception e) { + logger.error("Unknown exception for undelete request " + undeleteRequest, e); + response = new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), + ServerErrorCode.Unknown_Error); + metrics.unExpectedStoreUndeleteError.inc(); + } finally { + long processingTime = SystemTime.getInstance().milliseconds() - startTime; + totalTimeSpent += processingTime; + publicAccessLogger.info("{} {} processingTime {}", undeleteRequest, response, processingTime); + metrics.undeleteBlobProcessingTimeInMs.update(processingTime); + } + requestResponseChannel.sendResponse(response, request, + new ServerNetworkResponseMetrics(metrics.undeleteBlobResponseQueueTimeInMs, metrics.undeleteBlobSendTimeInMs, + metrics.undeleteBlobTotalTimeInMs, null, null, totalTimeSpent)); + } + + private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutResponse response, + NetworkRequest request, Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, + long totalTimeSpent, long blobSize, ServerMetrics metrics) throws InterruptedException { if (response.getError() == ServerErrorCode.No_Error) { metrics.markPutBlobRequestRateBySize(blobSize); if (blobSize <= ServerMetrics.smallBlob) { @@ -666,9 +744,9 @@ private void sendPutResponse(RequestResponseChannel requestResponseChannel, PutR } } - private void sendGetResponse(RequestResponseChannel requestResponseChannel, GetResponse response, NetworkRequest request, - Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, long totalTimeSpent, - long blobSize, MessageFormatFlags flags, ServerMetrics metrics) throws InterruptedException { + private void sendGetResponse(RequestResponseChannel requestResponseChannel, GetResponse response, + NetworkRequest request, Histogram responseQueueTime, Histogram responseSendTime, Histogram requestTotalTime, + long totalTimeSpent, long blobSize, MessageFormatFlags flags, ServerMetrics metrics) throws InterruptedException { if (blobSize <= ServerMetrics.smallBlob) { if (flags == MessageFormatFlags.Blob || flags == MessageFormatFlags.All) { diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/UndeleteResponse.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/UndeleteResponse.java index 7d448d9753..3522ede36e 
100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/UndeleteResponse.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/UndeleteResponse.java @@ -13,6 +13,8 @@ */ package com.github.ambry.protocol; +import com.github.ambry.router.AsyncWritableChannel; +import com.github.ambry.router.Callback; import com.github.ambry.server.ServerErrorCode; import com.github.ambry.utils.Utils; import java.io.DataInputStream; @@ -80,9 +82,7 @@ public static UndeleteResponse readFrom(DataInputStream stream) throws IOExcepti } } - @Override - public long writeTo(WritableByteChannel channel) throws IOException { - long written = 0; + private void prepareBuffer() { if (bufferToSend == null) { bufferToSend = ByteBuffer.allocate((int) sizeInBytes()); writeHeader(); @@ -91,12 +91,24 @@ public long writeTo(WritableByteChannel channel) throws IOException { } bufferToSend.flip(); } + } + + @Override + public long writeTo(WritableByteChannel channel) throws IOException { + long written = 0; + prepareBuffer(); if (bufferToSend.remaining() > 0) { written = channel.write(bufferToSend); } return written; } + @Override + public void writeTo(AsyncWritableChannel channel, Callback callback) { + prepareBuffer(); + channel.write(bufferToSend, callback); + } + @Override public long sizeInBytes() { return super.sizeInBytes() + (long) (getError() == ServerErrorCode.No_Error ? Life_Version_InBytes : 0); diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java index 716347958d..3b94bce0a8 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java @@ -17,6 +17,9 @@ import com.github.ambry.clustermap.ReplicaState; import com.github.ambry.router.AsyncWritableChannel; import com.github.ambry.router.Callback; +import com.github.ambry.messageformat.MessageFormatInputStream; +import com.github.ambry.messageformat.MessageFormatWriteSet; +import com.github.ambry.messageformat.UndeleteMessageFormatInputStream; import com.github.ambry.store.FindInfo; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MessageReadSet; @@ -205,9 +208,9 @@ public void put(MessageWriteSet messageSetToWrite) throws StoreException { List infos = new ArrayList<>(); for (MessageInfo info : newInfos) { if (info.isTtlUpdated()) { - info = - new MessageInfo(info.getStoreKey(), info.getSize(), info.isDeleted(), false, info.getExpirationTimeInMs(), - info.getCrc(), info.getAccountId(), info.getContainerId(), info.getOperationTimeMs()); + info = new MessageInfo(info.getStoreKey(), info.getSize(), info.isDeleted(), false, info.isUndeleted(), + info.getExpirationTimeInMs(), info.getCrc(), info.getAccountId(), info.getContainerId(), + info.getOperationTimeMs(), (short) 0); } infos.add(info); } @@ -225,36 +228,65 @@ public void delete(MessageWriteSet messageSetToDelete) throws StoreException { } catch (StoreException e) { throw new IllegalStateException(e); } - MessageInfo ttlUpdateInfo = getMessageInfo(info.getStoreKey(), messageInfos, false, true); - messageInfos.add( - new MessageInfo(info.getStoreKey(), info.getSize(), true, ttlUpdateInfo != null, info.getExpirationTimeInMs(), - info.getAccountId(), info.getContainerId(), info.getOperationTimeMs())); + messageInfos.add(new MessageInfo(info.getStoreKey(), info.getSize(), true, info.isTtlUpdated(), false, + info.getExpirationTimeInMs(), 
null, info.getAccountId(), info.getContainerId(), info.getOperationTimeMs(), + info.getLifeVersion())); } } @Override public void updateTtl(MessageWriteSet messageSetToUpdate) throws StoreException { for (MessageInfo info : messageSetToUpdate.getMessageSetInfo()) { - if (getMessageInfo(info.getStoreKey(), messageInfos, true, false) != null) { + if (getMessageInfo(info.getStoreKey(), messageInfos, true, false, false) != null) { throw new StoreException("Deleted", StoreErrorCodes.ID_Deleted); - } else if (getMessageInfo(info.getStoreKey(), messageInfos, false, true) != null) { + } else if (getMessageInfo(info.getStoreKey(), messageInfos, false, false, true) != null) { throw new StoreException("Updated already", StoreErrorCodes.Already_Updated); - } else if (getMessageInfo(info.getStoreKey(), messageInfos, false, false) == null) { + } else if (getMessageInfo(info.getStoreKey(), messageInfos, false, false, false) == null) { throw new StoreException("Not Found", StoreErrorCodes.ID_Not_Found); } + short lifeVersion = info.getLifeVersion(); + if (info.getLifeVersion() == 0) { + lifeVersion = getMessageInfo(info.getStoreKey(), messageInfos, false, false, false).getLifeVersion(); + } try { messageSetToUpdate.writeTo(log); } catch (StoreException e) { throw new IllegalStateException(e); } - messageInfos.add(new MessageInfo(info.getStoreKey(), info.getSize(), false, true, info.getExpirationTimeInMs(), - info.getAccountId(), info.getContainerId(), info.getOperationTimeMs())); + messageInfos.add( + new MessageInfo(info.getStoreKey(), info.getSize(), false, true, false, info.getExpirationTimeInMs(), null, + info.getAccountId(), info.getContainerId(), info.getOperationTimeMs(), lifeVersion)); } } @Override public short undelete(MessageInfo info) throws StoreException { - throw new UnsupportedOperationException("Undelete unsupported for now"); + StoreKey key = info.getStoreKey(); + MessageInfo deleteInfo = getMessageInfo(key, messageInfos, true, false, false); + if (info.getLifeVersion() == -1 && deleteInfo == null) { + throw new StoreException("Key " + key + " not deleted yet", StoreErrorCodes.ID_Not_Deleted); + } + short lifeVersion = info.getLifeVersion(); + if (info.getLifeVersion() == -1) { + lifeVersion = (short) (deleteInfo.getLifeVersion() + 1); + } + try { + MessageFormatInputStream stream = + new UndeleteMessageFormatInputStream(key, info.getAccountId(), info.getContainerId(), + info.getOperationTimeMs(), lifeVersion); + // Update info to add stream size; + info = new MessageInfo(key, stream.getSize(), false, deleteInfo.isTtlUpdated(), true, + deleteInfo.getExpirationTimeInMs(), null, info.getAccountId(), info.getContainerId(), + info.getOperationTimeMs(), lifeVersion); + ArrayList infoList = new ArrayList<>(); + infoList.add(info); + MessageFormatWriteSet writeSet = new MessageFormatWriteSet(stream, infoList, false); + writeSet.writeTo(log); + return lifeVersion; + } catch (Exception e) { + throw new StoreException("Unknown error while trying to undelete blobs from store", e, + StoreErrorCodes.Unknown_Error); + } } @Override @@ -310,7 +342,7 @@ public StoreStats getStoreStats() { @Override public boolean isKeyDeleted(StoreKey key) throws StoreException { - return getMessageInfo(key, messageInfos, true, false) != null; + return getMessageInfo(key, messageInfos, true, false, false) != null; } @Override diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/MockConnectionPool.java b/ambry-replication/src/test/java/com.github.ambry.replication/MockConnectionPool.java index 
c9d80368ac..bb944959b9 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/MockConnectionPool.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/MockConnectionPool.java @@ -200,7 +200,7 @@ public void send(Send request) { // If MsgInfo says it is deleted, get the original Put Message's MessageInfo as that is what Get Request // looks for. Just set the deleted flag to true for the constructed MessageInfo from Put. if (infoFound.isDeleted()) { - MessageInfo putMsgInfo = getMessageInfo(infoFound.getStoreKey(), messageInfoList, false, false); + MessageInfo putMsgInfo = getMessageInfo(infoFound.getStoreKey(), messageInfoList, false, false, false); infoFound = new MessageInfo(putMsgInfo.getStoreKey(), putMsgInfo.getSize(), true, false, putMsgInfo.getExpirationTimeInMs(), putMsgInfo.getAccountId(), putMsgInfo.getContainerId(), putMsgInfo.getOperationTimeMs()); diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java index e697ffcc56..9a59506080 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java @@ -1237,7 +1237,7 @@ public void ttlUpdateReplicationTest() throws Exception { StoreKey remoteId = remoteInfo.getStoreKey(); if (seen.add(remoteId)) { StoreKey localId = storeKeyConverter.convert(Collections.singleton(remoteId)).get(remoteId); - MessageInfo localInfo = getMessageInfo(localId, localInfos, false, false); + MessageInfo localInfo = getMessageInfo(localId, localInfos, false, false, false); if (localId == null) { // this is a deprecated ID. There should be no messages locally assertNull(remoteId + " is deprecated and should have no entries", localInfo); @@ -1593,7 +1593,7 @@ public void replicaThreadTest() throws Exception { // ensure that the first key is not deleted in the local host assertNull(toDeleteId + " should not be deleted in the local host", - getMessageInfo(toDeleteId, localHost.infosByPartition.get(partitionId), true, false)); + getMessageInfo(toDeleteId, localHost.infosByPartition.get(partitionId), true, false, false)); } StoreKeyFactory storeKeyFactory = new BlobIdFactory(clusterMap); @@ -2119,7 +2119,7 @@ private void verifyNoMoreMissingKeysAndExpectedMissingBufferCount(MockHost remot // test that the first key has been marked deleted List messageInfos = localHost.infosByPartition.get(entry.getKey()); StoreKey deletedId = messageInfos.get(0).getStoreKey(); - assertNotNull(deletedId + " should have been deleted", getMessageInfo(deletedId, messageInfos, true, false)); + assertNotNull(deletedId + " should have been deleted", getMessageInfo(deletedId, messageInfos, true, false, false)); Map ignoreState = new HashMap<>(); for (StoreKey toBeIgnored : idsToBeIgnoredByPartition.get(entry.getKey())) { ignoreState.put(toBeIgnored, false); @@ -2236,7 +2236,7 @@ private void addPutMessagesToReplicasOfPartition(List ids, List hosts) throws MessageFormatException, IOException { - MessageInfo putMsg = getMessageInfo(id, hosts.get(0).infosByPartition.get(partitionId), false, false); + MessageInfo putMsg = getMessageInfo(id, hosts.get(0).infosByPartition.get(partitionId), false, false, false); short aid; short cid; if (putMsg == null) { @@ -2292,7 +2292,7 @@ public static void addPutMessagesToReplicasOfPartition(StoreKey id, short accoun */ public static void 
addTtlUpdateMessagesToReplicasOfPartition(PartitionId partitionId, StoreKey id, List hosts, long expirationTime) throws MessageFormatException, IOException { - MessageInfo putMsg = getMessageInfo(id, hosts.get(0).infosByPartition.get(partitionId), false, false); + MessageInfo putMsg = getMessageInfo(id, hosts.get(0).infosByPartition.get(partitionId), false, false, false); short aid; short cid; if (putMsg == null) { @@ -2382,10 +2382,11 @@ private static ByteBuffer getTtlUpdateMessage(StoreKey id, short accountId, shor * @param id the {@link StoreKey} to look for. * @param messageInfos the {@link MessageInfo} list. * @param deleteMsg {@code true} if delete msg is requested. {@code false} otherwise + * @param undeleteMsg {@code true} if undelete msg is requested. {@code false} otherwise * @param ttlUpdateMsg {@code true} if ttl update msg is requested. {@code false} otherwise * @return the delete {@link MessageInfo} if it exists in {@code messageInfos}. {@code null otherwise.} */ - static MessageInfo getMessageInfo(StoreKey id, List messageInfos, boolean deleteMsg, + static MessageInfo getMessageInfo(StoreKey id, List messageInfos, boolean deleteMsg, boolean undeleteMsg, boolean ttlUpdateMsg) { MessageInfo toRet = null; for (MessageInfo messageInfo : messageInfos) { @@ -2393,10 +2394,14 @@ static MessageInfo getMessageInfo(StoreKey id, List messageInfos, b if (deleteMsg && messageInfo.isDeleted()) { toRet = messageInfo; break; - } else if (ttlUpdateMsg && messageInfo.isTtlUpdated()) { + } else if (undeleteMsg && messageInfo.isUndeleted()) { toRet = messageInfo; break; - } else if (!deleteMsg && !ttlUpdateMsg) { + } else if (ttlUpdateMsg && !messageInfo.isUndeleted() && !messageInfo.isDeleted() + && messageInfo.isTtlUpdated()) { + toRet = messageInfo; + break; + } else if (!deleteMsg && !ttlUpdateMsg && !undeleteMsg) { toRet = messageInfo; break; } @@ -2412,15 +2417,15 @@ static MessageInfo getMessageInfo(StoreKey id, List messageInfos, b * @return a merged {@link MessageInfo} for {@code key} */ static MessageInfo getMergedMessageInfo(StoreKey key, List partitionInfos) { - MessageInfo info = getMessageInfo(key, partitionInfos, true, false); + MessageInfo info = getMessageInfo(key, partitionInfos, true, true, false); if (info == null) { - info = getMessageInfo(key, partitionInfos, false, false); + info = getMessageInfo(key, partitionInfos, false, false, false); } - MessageInfo ttlUpdateInfo = getMessageInfo(key, partitionInfos, false, true); + MessageInfo ttlUpdateInfo = getMessageInfo(key, partitionInfos, false, false, true); if (ttlUpdateInfo != null) { - info = new MessageInfo(info.getStoreKey(), info.getSize(), info.isDeleted(), true, + info = new MessageInfo(info.getStoreKey(), info.getSize(), info.isDeleted(), true, info.isUndeleted(), ttlUpdateInfo.getExpirationTimeInMs(), info.getCrc(), info.getAccountId(), info.getContainerId(), - info.getOperationTimeMs()); + info.getOperationTimeMs(), info.getLifeVersion()); } return info; } diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java index 74cdac1b6f..9c1eac488c 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java @@ -247,6 +247,7 @@ private VerifiableProperties createInitProperties(DataNodeId dataNodeId, boolean props.setProperty("store.validate.authorization", "true"); 
props.setProperty("kms.default.container.key", TestUtils.getRandomKey(32)); props.setProperty("server.enable.store.data.prefetch", Boolean.toString(enableDataPrefetch)); + props.setProperty("server.handle.undelete.request.enabled", "true"); props.setProperty("replication.intra.replica.thread.throttle.sleep.duration.ms", "100"); props.setProperty("replication.inter.replica.thread.throttle.sleep.duration.ms", "100"); props.putAll(sslProperties); diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java index 6596899509..6bd8385594 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java @@ -76,6 +76,8 @@ import com.github.ambry.protocol.PutResponse; import com.github.ambry.protocol.TtlUpdateRequest; import com.github.ambry.protocol.TtlUpdateResponse; +import com.github.ambry.protocol.UndeleteRequest; +import com.github.ambry.protocol.UndeleteResponse; import com.github.ambry.replication.FindTokenFactory; import com.github.ambry.router.Callback; import com.github.ambry.router.GetBlobOptionsBuilder; @@ -494,6 +496,13 @@ static void endToEndTest(Port targetPort, String routerDatacenter, MockCluster c actualBlobData = getBlobDataAndRelease(blobAll.getBlobData()); Assert.assertArrayEquals("Content mismatch", data, actualBlobData); + // undelete a not-deleted blob should return fail + UndeleteRequest undeleteRequest = new UndeleteRequest(1, "undeleteClient", blobId1, System.currentTimeMillis()); + channel.send(undeleteRequest); + stream = channel.receive().getInputStream(); + UndeleteResponse undeleteResponse = UndeleteResponse.readFrom(new DataInputStream(stream)); + assertEquals("Undelete blob should succeed", ServerErrorCode.Blob_Not_Deleted, undeleteResponse.getError()); + // delete a blob on a restarted store , which should succeed deleteRequest = new DeleteRequest(1, "deleteClient", blobId1, System.currentTimeMillis()); channel.send(deleteRequest); @@ -502,6 +511,31 @@ static void endToEndTest(Port targetPort, String routerDatacenter, MockCluster c assertEquals("Delete blob on restarted store should succeed", ServerErrorCode.No_Error, deleteResponse.getError()); + // undelete a deleted blob, which should succeed + undeleteRequest = new UndeleteRequest(2, "undeleteClient", blobId1, System.currentTimeMillis()); + channel.send(undeleteRequest); + stream = channel.receive().getInputStream(); + undeleteResponse = UndeleteResponse.readFrom(new DataInputStream(stream)); + assertEquals("Undelete blob should succeed", ServerErrorCode.No_Error, undeleteResponse.getError()); + assertEquals("Undelete life version mismatch", undeleteResponse.getLifeVersion(), (short) 1); + + // undelete an already undeleted blob, which should fail + undeleteRequest = new UndeleteRequest(3, "undeleteClient", blobId1, System.currentTimeMillis()); + channel.send(undeleteRequest); + stream = channel.receive().getInputStream(); + undeleteResponse = UndeleteResponse.readFrom(new DataInputStream(stream)); + assertEquals("Undelete blob should fail", ServerErrorCode.Blob_Already_Undeleted, undeleteResponse.getError()); + + // get an undeleted blob, which should succeed + getRequest1 = new GetRequest(1, "clientid1", MessageFormatFlags.All, partitionRequestInfoList, GetOption.None); + channel.send(getRequest1); + stream = channel.receive().getInputStream(); + resp1 = 
GetResponse.readFrom(new DataInputStream(stream), clusterMap); + responseStream = resp1.getInputStream(); + blobAll = MessageFormatRecord.deserializeBlobAll(responseStream, blobIdFactory); + actualBlobData = getBlobDataAndRelease(blobAll.getBlobData()); + Assert.assertArrayEquals("Content mismatch", data, actualBlobData); + // Bounce servers to make them read the persisted token file. cluster.stopServers(); cluster.startServers(); diff --git a/ambry-server/src/main/java/com.github.ambry.server/AmbryServerRequests.java b/ambry-server/src/main/java/com.github.ambry.server/AmbryServerRequests.java index 9b878d4a75..487502d6be 100644 --- a/ambry-server/src/main/java/com.github.ambry.server/AmbryServerRequests.java +++ b/ambry-server/src/main/java/com.github.ambry.server/AmbryServerRequests.java @@ -74,11 +74,12 @@ public class AmbryServerRequests extends AmbryRequests { private final ConcurrentHashMap localPartitionToReplicaMap; // POST requests are allowed on stores states: { LEADER, STANDBY } static final Set PUT_ALLOWED_STORE_STATES = EnumSet.of(ReplicaState.LEADER, ReplicaState.STANDBY); - // UPDATE requests (including DELETE, TTLUpdate) are allowed on stores states: { LEADER, STANDBY, INACTIVE, BOOTSTRAP } + // UPDATE requests (including DELETE, TTLUpdate, UNDELETE) are allowed on stores states: { LEADER, STANDBY, INACTIVE, BOOTSTRAP } static final Set UPDATE_ALLOWED_STORE_STATES = EnumSet.of(ReplicaState.LEADER, ReplicaState.STANDBY, ReplicaState.INACTIVE, ReplicaState.BOOTSTRAP); static final Set UPDATE_REQUEST_TYPES = - EnumSet.of(RequestOrResponseType.DeleteRequest, RequestOrResponseType.TtlUpdateRequest); + EnumSet.of(RequestOrResponseType.DeleteRequest, RequestOrResponseType.TtlUpdateRequest, + RequestOrResponseType.UndeleteRequest); private static final Logger logger = LoggerFactory.getLogger(AmbryServerRequests.class); @@ -93,7 +94,7 @@ public class AmbryServerRequests extends AmbryRequests { this.statsManager = statsManager; for (RequestOrResponseType requestType : EnumSet.of(RequestOrResponseType.PutRequest, - RequestOrResponseType.GetRequest, RequestOrResponseType.DeleteRequest, + RequestOrResponseType.GetRequest, RequestOrResponseType.DeleteRequest, RequestOrResponseType.UndeleteRequest, RequestOrResponseType.ReplicaMetadataRequest, RequestOrResponseType.TtlUpdateRequest)) { requestsDisableInfo.put(requestType, Collections.newSetFromMap(new ConcurrentHashMap<>())); } @@ -121,6 +122,9 @@ protected ConcurrentHashMap createLocalPartitionToReplic * @return {@code true} if the request is enabled. {@code false} otherwise. */ private boolean isRequestEnabled(RequestOrResponseType requestType, PartitionId id) { + if (requestType.equals(RequestOrResponseType.UndeleteRequest) && !serverConfig.serverHandleUndeleteRequestEnabled) { + return false; + } Set requestDisableInfo = requestsDisableInfo.get(requestType); // 1. 
check if request is disabled by admin request if (requestDisableInfo != null && requestDisableInfo.contains(id)) { diff --git a/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java b/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java index bd4ecbb39c..2f55dbaf44 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java +++ b/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java @@ -67,6 +67,7 @@ import com.github.ambry.protocol.RequestOrResponseType; import com.github.ambry.protocol.Response; import com.github.ambry.protocol.TtlUpdateRequest; +import com.github.ambry.protocol.UndeleteRequest; import com.github.ambry.replication.FindTokenHelper; import com.github.ambry.replication.MockFindTokenHelper; import com.github.ambry.replication.MockReplicationManager; @@ -127,7 +128,6 @@ public class AmbryServerRequestsTest { private final MockStorageManager storageManager; private final MockReplicationManager replicationManager; private final MockStatsManager statsManager; - private final AmbryServerRequests ambryRequests; private final MockRequestResponseChannel requestResponseChannel = new MockRequestResponseChannel(); private final Set validKeysInStore = new HashSet<>(); private final Map conversionMap = new HashMap<>(); @@ -137,6 +137,7 @@ public class AmbryServerRequestsTest { private final ReplicaStatusDelegate mockDelegate = Mockito.mock(ReplicaStatusDelegate.class); private final boolean putRequestShareMemory; private final boolean validateRequestOnStoreState; + private AmbryServerRequests ambryRequests; @Parameterized.Parameters public static List data() { @@ -148,15 +149,7 @@ public AmbryServerRequestsTest(boolean putRequestShareMemory, boolean validateRe this.putRequestShareMemory = putRequestShareMemory; this.validateRequestOnStoreState = validateRequestOnStoreState; clusterMap = new MockClusterMap(); - Properties properties = new Properties(); - properties.setProperty("clustermap.cluster.name", "test"); - properties.setProperty("clustermap.datacenter.name", "DC1"); - properties.setProperty("clustermap.host.name", "localhost"); - properties.setProperty("replication.token.factory", "com.github.ambry.store.StoreFindTokenFactory"); - properties.setProperty("replication.no.of.intra.dc.replica.threads", "1"); - properties.setProperty("replication.no.of.inter.dc.replica.threads", "1"); - properties.setProperty("server.validate.request.based.on.store.state", - Boolean.toString(validateRequestOnStoreState)); + Properties properties = createProperties(validateRequestOnStoreState, true); VerifiableProperties verifiableProperties = new VerifiableProperties(properties); replicationConfig = new ReplicationConfig(verifiableProperties); serverConfig = new ServerConfig(verifiableProperties); @@ -183,6 +176,21 @@ public AmbryServerRequestsTest(boolean putRequestShareMemory, boolean validateRe Mockito.when(mockDelegate.unmarkStopped(anyList())).thenReturn(true); } + private static Properties createProperties(boolean validateRequestOnStoreState, + boolean handleUndeleteRequestEnabled) { + Properties properties = new Properties(); + properties.setProperty("clustermap.cluster.name", "test"); + properties.setProperty("clustermap.datacenter.name", "DC1"); + properties.setProperty("clustermap.host.name", "localhost"); + properties.setProperty("replication.token.factory", "com.github.ambry.store.StoreFindTokenFactory"); + properties.setProperty("replication.no.of.intra.dc.replica.threads", "1"); 
+ properties.setProperty("replication.no.of.inter.dc.replica.threads", "1"); + properties.setProperty("server.validate.request.based.on.store.state", + Boolean.toString(validateRequestOnStoreState)); + properties.setProperty("server.handle.undelete.request.enabled", Boolean.toString(handleUndeleteRequestEnabled)); + return properties; + } + /** * Close the storageManager created. */ @@ -210,7 +218,8 @@ public void validateRequestsTest() { } for (RequestOrResponseType request : EnumSet.of(RequestOrResponseType.PutRequest, RequestOrResponseType.GetRequest, - RequestOrResponseType.DeleteRequest, RequestOrResponseType.TtlUpdateRequest)) { + RequestOrResponseType.DeleteRequest, RequestOrResponseType.TtlUpdateRequest, + RequestOrResponseType.UndeleteRequest)) { for (Map.Entry entry : stateToReplica.entrySet()) { if (request == RequestOrResponseType.PutRequest) { // for PUT request, it is not allowed on OFFLINE,BOOTSTRAP and INACTIVE when validateRequestOnStoreState = true @@ -223,7 +232,7 @@ public void validateRequestsTest() { ambryRequests.validateRequest(entry.getValue().getPartitionId(), request, false)); } } else if (AmbryServerRequests.UPDATE_REQUEST_TYPES.contains(request)) { - // for DELETE/TTL Update request, they are not allowed on OFFLINE,BOOTSTRAP and INACTIVE when validateRequestOnStoreState = true + // for UNDELETE/DELETE/TTL Update request, they are not allowed on OFFLINE,BOOTSTRAP and INACTIVE when validateRequestOnStoreState = true if (AmbryServerRequests.UPDATE_ALLOWED_STORE_STATES.contains(entry.getKey())) { assertEquals("Error code is not expected for DELETE/TTL Update", ServerErrorCode.No_Error, ambryRequests.validateRequest(entry.getValue().getPartitionId(), request, false)); @@ -313,7 +322,8 @@ public void scheduleCompactionFailureTest() throws InterruptedException, IOExcep public void controlRequestSuccessTest() throws InterruptedException, IOException { RequestOrResponseType[] requestOrResponseTypes = {RequestOrResponseType.PutRequest, RequestOrResponseType.DeleteRequest, RequestOrResponseType.GetRequest, - RequestOrResponseType.ReplicaMetadataRequest, RequestOrResponseType.TtlUpdateRequest}; + RequestOrResponseType.ReplicaMetadataRequest, RequestOrResponseType.TtlUpdateRequest, + RequestOrResponseType.UndeleteRequest}; for (RequestOrResponseType requestType : requestOrResponseTypes) { List partitionIds = clusterMap.getWritablePartitionIds(DEFAULT_PARTITION_CLASS); for (PartitionId id : partitionIds) { @@ -776,6 +786,64 @@ public void ttlUpdateTest() throws InterruptedException, IOException, MessageFor miscTtlUpdateFailuresTest(); } + /** + * Tests for server config to enable and disable UNDELETE + */ + @Test + public void undeleteEnableDisableTest() throws Exception { + Properties properties = createProperties(validateRequestOnStoreState, false); + VerifiableProperties verifiableProperties = new VerifiableProperties(properties); + ServerConfig serverConfig = new ServerConfig(verifiableProperties); + ServerMetrics serverMetrics = + new ServerMetrics(clusterMap.getMetricRegistry(), AmbryRequests.class, AmbryServer.class); + AmbryServerRequests other = new AmbryServerRequests(storageManager, requestResponseChannel, clusterMap, dataNodeId, + clusterMap.getMetricRegistry(), serverMetrics, findTokenHelper, null, replicationManager, null, serverConfig, + storeKeyConverterFactory, statsManager); + + AmbryServerRequests temp = ambryRequests; + ambryRequests = other; + try { + MockPartitionId id = (MockPartitionId) 
clusterMap.getWritablePartitionIds(DEFAULT_PARTITION_CLASS).get(0); + int correlationId = TestUtils.RANDOM.nextInt(); + String clientId = UtilsTest.getRandomString(10); + BlobId blobId = new BlobId(CommonTestUtils.getCurrentBlobIdVersion(), BlobId.BlobIdType.NATIVE, + ClusterMapUtils.UNKNOWN_DATACENTER_ID, Utils.getRandomShort(TestUtils.RANDOM), + Utils.getRandomShort(TestUtils.RANDOM), id, false, BlobId.BlobDataType.DATACHUNK); + long opTimeMs = SystemTime.getInstance().milliseconds(); + doUndelete(correlationId++, clientId, blobId, opTimeMs, ServerErrorCode.Temporarily_Disabled); + } finally { + ambryRequests = temp; + } + } + + /** + * Tests for success and failure scenarios for UNDELETE + */ + @Test + public void undeleteTest() throws Exception { + MockPartitionId id = (MockPartitionId) clusterMap.getWritablePartitionIds(DEFAULT_PARTITION_CLASS).get(0); + int correlationId = TestUtils.RANDOM.nextInt(); + String clientId = UtilsTest.getRandomString(10); + BlobId blobId = new BlobId(CommonTestUtils.getCurrentBlobIdVersion(), BlobId.BlobIdType.NATIVE, + ClusterMapUtils.UNKNOWN_DATACENTER_ID, Utils.getRandomShort(TestUtils.RANDOM), + Utils.getRandomShort(TestUtils.RANDOM), id, false, BlobId.BlobDataType.DATACHUNK); + long opTimeMs = SystemTime.getInstance().milliseconds(); + + // since we already test store key conversion in ttlupdate, we don't test it again in this method. + // storekey not valid for store + doUndelete(correlationId++, clientId, blobId, opTimeMs, ServerErrorCode.Blob_Not_Found); + // valid now + validKeysInStore.add(blobId); + doUndelete(correlationId++, clientId, blobId, opTimeMs, ServerErrorCode.No_Error); + + // READ_ONLY is fine too + changePartitionState(id, true); + doUndelete(correlationId++, clientId, blobId, opTimeMs, ServerErrorCode.No_Error); + changePartitionState(id, false); + + miscUndeleteFailuresTest(); + } + // helpers // general @@ -824,7 +892,8 @@ private void sendAndVerifyOperationRequest(RequestOrResponse request, ServerErro Response response = sendRequestGetResponse(request, EnumSet.of(RequestOrResponseType.GetRequest, RequestOrResponseType.ReplicaMetadataRequest).contains(requestType) ? ServerErrorCode.No_Error : expectedErrorCode); - if (expectedErrorCode.equals(ServerErrorCode.No_Error) || forceCheckOpReceived) { + if (expectedErrorCode.equals(ServerErrorCode.No_Error) || ( + forceCheckOpReceived && !expectedErrorCode.equals(ServerErrorCode.Temporarily_Disabled))) { assertEquals("Operation received at the store not as expected", requestType, MockStorageManager.operationReceived); } @@ -1037,6 +1106,10 @@ private void sendAndVerifyOperationRequest(RequestOrResponseType requestType, Li case DeleteRequest: request = new DeleteRequest(correlationId, clientId, originalBlobId, SystemTime.getInstance().milliseconds()); break; + case UndeleteRequest: + request = + new UndeleteRequest(correlationId, clientId, originalBlobId, SystemTime.getInstance().milliseconds()); + break; case GetRequest: PartitionRequestInfo pRequestInfo = new PartitionRequestInfo(id, Collections.singletonList(originalBlobId)); request = @@ -1235,6 +1308,25 @@ private void doTtlUpdate(int correlationId, String clientId, BlobId blobId, long } } + /** + * Does a UNDELETE and checks for success if {@code expectedErrorCode} is {@link ServerErrorCode#No_Error}. Else, + * checks for failure with the code {@code expectedErrorCode}. 
+ * @param correlationId the correlation ID to use in the request + * @param clientId the client ID to use in the request + * @param blobId the blob ID to use in the request + * @param opTimeMs the operation time (ms) to use in the request + * @param expectedErrorCode the expected {@link ServerErrorCode} + * @throws Exception + */ + private void doUndelete(int correlationId, String clientId, BlobId blobId, long opTimeMs, + ServerErrorCode expectedErrorCode) throws Exception { + UndeleteRequest request = new UndeleteRequest(correlationId, clientId, blobId, opTimeMs); + sendAndVerifyOperationRequest(request, expectedErrorCode, true); + if (expectedErrorCode == ServerErrorCode.No_Error) { + verifyUndelete(request.getBlobId(), opTimeMs, MockStorageManager.messageWriteSetReceived); + } + } + /** * Exercises various failure paths for TTL updates * @throws InterruptedException @@ -1271,6 +1363,41 @@ private void miscTtlUpdateFailuresTest() throws InterruptedException, IOExceptio // request disabled is checked in request control tests } + /** + * Exercises various failure paths for UNDELETEs + * @throws Exception + */ + private void miscUndeleteFailuresTest() throws Exception { + PartitionId id = clusterMap.getWritablePartitionIds(DEFAULT_PARTITION_CLASS).get(0); + // store exceptions + for (StoreErrorCodes code : StoreErrorCodes.values()) { + MockStorageManager.storeException = new StoreException("expected", code); + ServerErrorCode expectedErrorCode = ErrorMapping.getStoreErrorMapping(code); + sendAndVerifyOperationRequest(RequestOrResponseType.UndeleteRequest, Collections.singletonList(id), + expectedErrorCode, true); + MockStorageManager.storeException = null; + } + // runtime exception + MockStorageManager.runtimeException = new RuntimeException("expected"); + sendAndVerifyOperationRequest(RequestOrResponseType.UndeleteRequest, Collections.singletonList(id), + ServerErrorCode.Unknown_Error, true); + MockStorageManager.runtimeException = null; + // store is not started/is stopped/otherwise unavailable - Replica_Unavailable + storageManager.returnNullStore = true; + sendAndVerifyOperationRequest(RequestOrResponseType.UndeleteRequest, Collections.singletonList(id), + ServerErrorCode.Replica_Unavailable, false); + storageManager.returnNullStore = false; + // PartitionUnknown is hard to simulate without betraying knowledge of the internals of MockClusterMap. + + // disk down + ReplicaId replicaId = findReplica(id); + clusterMap.onReplicaEvent(replicaId, ReplicaEventType.Disk_Error); + sendAndVerifyOperationRequest(RequestOrResponseType.UndeleteRequest, Collections.singletonList(id), + ServerErrorCode.Disk_Unavailable, false); + clusterMap.onReplicaEvent(replicaId, ReplicaEventType.Disk_Ok); + // request disabled is checked in request control tests + } + /** * Verifies that the TTL update request was delivered to the {@link Store} correctly. * @param blobId the {@link BlobId} that was updated @@ -1302,6 +1429,35 @@ private void verifyTtlUpdate(BlobId blobId, long expiresAtMs, long opTimeMs, Mes key.getContainerId(), opTimeMs, (short) 0); } + /** + * Verifies that the UNDELETE request was delivered to the {@link Store} correctly. 
+ * @param blobId the {@link BlobId} that was updated + * @param opTimeMs the op time (in ms) + * @param messageWriteSet the {@link MessageWriteSet} received at the {@link Store} + * @throws IOException + * @throws MessageFormatException + * @throws StoreException + */ + private void verifyUndelete(BlobId blobId, long opTimeMs, MessageWriteSet messageWriteSet) throws Exception { + BlobId key = (BlobId) conversionMap.getOrDefault(blobId, blobId); + assertEquals("There should be one message in the write set", 1, messageWriteSet.getMessageSetInfo().size()); + MessageInfo info = messageWriteSet.getMessageSetInfo().get(0); + + // verify stream + ByteBuffer record = getDataInWriteSet(messageWriteSet, (int) info.getSize()); + int expectedSize = record.remaining(); + InputStream stream = new ByteBufferInputStream(record); + + // verify stream + MessageFormatInputStreamTest.checkUndeleteMessage(stream, null, key, key.getAccountId(), key.getContainerId(), + opTimeMs, storageManager.returnValueOfUndelete); + + // verify MessageInfo + // since the record has been verified, the buffer size before verification is a good indicator of size + MessageInfoTest.checkGetters(info, key, expectedSize, false, false, true, Utils.Infinite_Time, null, + key.getAccountId(), key.getContainerId(), opTimeMs, storageManager.returnValueOfUndelete); + } + /** * Implementation of {@link NetworkRequest} to help with tests. */ diff --git a/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java b/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java index dc6200f72d..b91fa10a31 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java +++ b/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java @@ -152,8 +152,8 @@ public short undelete(MessageInfo info) throws StoreException { new UndeleteMessageFormatInputStream(info.getStoreKey(), info.getAccountId(), info.getContainerId(), info.getOperationTimeMs(), (short) returnValueOfUndelete); // Update info to add stream size; - info = new MessageInfo(info.getStoreKey(), stream.getSize(), info.getAccountId(), info.getContainerId(), - info.getOperationTimeMs()); + info = new MessageInfo(info.getStoreKey(), stream.getSize(), false, false, true, Utils.Infinite_Time, null, + info.getAccountId(), info.getContainerId(), info.getOperationTimeMs(), returnValueOfUndelete); ArrayList infoList = new ArrayList<>(); infoList.add(info); messageWriteSetReceived = new MessageFormatWriteSet(stream, infoList, false);
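Reviewer note: below is a minimal, hypothetical usage sketch (not part of this patch) showing how the new config flag and the undelete request/response path introduced above fit together. It assumes only the property key, the UndeleteRequest constructor, the single-argument UndeleteResponse.readFrom, and the error codes shown in this diff; channel, blobId, and correlationId are placeholders for an established server connection, an existing blob ID, and an arbitrary correlation ID.

// Server side: the undelete handler is gated by this config (defaults to false per ServerConfig above).
Properties props = new Properties();
props.setProperty("server.handle.undelete.request.enabled", "true");
ServerConfig serverConfig = new ServerConfig(new VerifiableProperties(props));

// Client side: issue an undelete for a previously deleted blob and read back the new life version.
UndeleteRequest undeleteRequest =
    new UndeleteRequest(correlationId, "undeleteClient", blobId, System.currentTimeMillis());
channel.send(undeleteRequest);
UndeleteResponse undeleteResponse =
    UndeleteResponse.readFrom(new DataInputStream(channel.receive().getInputStream()));
if (undeleteResponse.getError() == ServerErrorCode.No_Error) {
  short lifeVersion = undeleteResponse.getLifeVersion(); // life version bumped past the latest delete
} else if (undeleteResponse.getError() == ServerErrorCode.Blob_Not_Deleted) {
  // the blob is not in a deleted state, so there is nothing to undelete
}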