Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[RC] ABR 18 + 20 (#5925)
Browse files Browse the repository at this point in the history
  • Loading branch information
gsheasby authored Feb 28, 2022
1 parent c526c10 commit d7015cc
Show file tree
Hide file tree
Showing 36 changed files with 701 additions and 122 deletions.
4 changes: 4 additions & 0 deletions atlasdb-backup/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ dependencies {
implementation project(':atlasdb-config')
implementation project(':timelock-api:timelock-api-objects')

implementation 'org.immutables:value'
implementation 'org.immutables:value::annotations'
implementation 'com.palantir.common:streams'
implementation 'com.palantir.safe-logging:safe-logging'
implementation 'com.palantir.conjure.java.runtime:conjure-java-jackson-serialization'

testImplementation 'org.mockito:mockito-core'
testCompile group: 'commons-codec', name: 'commons-codec', version: '1.15'

annotationProcessor group: 'org.immutables', name: 'value'
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.palantir.atlasdb.http.AtlasDbRemotingConstants;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.timelock.api.Namespace;
import com.palantir.atlasdb.transaction.api.TransactionManager;
import com.palantir.conjure.java.api.config.service.ServicesConfigBlock;
import com.palantir.conjure.java.api.config.service.UserAgent;
import com.palantir.dialogue.clients.DialogueClients;
Expand Down Expand Up @@ -75,20 +76,22 @@ public static AtlasBackupService create(
reloadingFactory.get(AtlasBackupClientBlocking.class, serviceName));

BackupPersister backupPersister = new ExternalBackupPersister(backupFolderFactory);
KvsRunner kvsRunner = KvsRunner.create(keyValueServiceFactory);
CoordinationServiceRecorder coordinationServiceRecorder =
new CoordinationServiceRecorder(keyValueServiceFactory, backupPersister);
new CoordinationServiceRecorder(kvsRunner, backupPersister);

return new AtlasBackupService(authHeader, atlasBackupClient, coordinationServiceRecorder, backupPersister);
}

public static AtlasBackupService create(
public static AtlasBackupService createForTests(
AuthHeader authHeader,
AtlasBackupClient atlasBackupClient,
Function<Namespace, Path> backupFolderFactory,
Function<Namespace, KeyValueService> keyValueServiceFactory) {
TransactionManager transactionManager,
Function<Namespace, Path> backupFolderFactory) {
BackupPersister backupPersister = new ExternalBackupPersister(backupFolderFactory);
KvsRunner kvsRunner = KvsRunner.create(transactionManager);
CoordinationServiceRecorder coordinationServiceRecorder =
new CoordinationServiceRecorder(keyValueServiceFactory, backupPersister);
new CoordinationServiceRecorder(kvsRunner, backupPersister);

return new AtlasBackupService(authHeader, atlasBackupClient, coordinationServiceRecorder, backupPersister);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@
import com.palantir.atlasdb.timelock.api.UnsuccessfulDisableNamespacesResponse;
import com.palantir.atlasdb.timelock.api.management.TimeLockManagementService;
import com.palantir.atlasdb.timelock.api.management.TimeLockManagementServiceBlocking;
import com.palantir.common.annotation.NonIdempotent;
import com.palantir.atlasdb.transaction.api.TransactionManager;
import com.palantir.common.streams.KeyedStream;
import com.palantir.conjure.java.api.config.service.ServicesConfigBlock;
import com.palantir.conjure.java.api.config.service.UserAgent;
import com.palantir.dialogue.clients.DialogueClients;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
Expand Down Expand Up @@ -97,7 +98,21 @@ public static AtlasRestoreService create(
TimeLockManagementService timeLockManagementService = new DialogueAdaptingTimeLockManagementService(
reloadingFactory.get(TimeLockManagementServiceBlocking.class, serviceName));
CassandraRepairHelper cassandraRepairHelper =
new CassandraRepairHelper(keyValueServiceConfigFactory, keyValueServiceFactory);
new CassandraRepairHelper(KvsRunner.create(keyValueServiceFactory), keyValueServiceConfigFactory);

return new AtlasRestoreService(
authHeader, atlasRestoreClient, timeLockManagementService, backupPersister, cassandraRepairHelper);
}

public static AtlasRestoreService createForTests(
AuthHeader authHeader,
AtlasRestoreClient atlasRestoreClient,
TimeLockManagementService timeLockManagementService,
BackupPersister backupPersister,
TransactionManager transactionManager,
Function<Namespace, CassandraKeyValueServiceConfig> keyValueServiceConfigFactory) {
CassandraRepairHelper cassandraRepairHelper =
new CassandraRepairHelper(KvsRunner.create(transactionManager), keyValueServiceConfigFactory);

return new AtlasRestoreService(
authHeader, atlasRestoreClient, timeLockManagementService, backupPersister, cassandraRepairHelper);
Expand All @@ -108,14 +123,19 @@ public static AtlasRestoreService create(
* This will fail if any namespace is already disabled, unless it was disabled with the provided backupId.
* Namespaces for which we don't have a recorded backup will be ignored.
*
* @param namespaces the namespaces to disable.
* @param restoreRequests the requests to prepare.
* @param backupId a unique identifier for this request (uniquely identifies the backup to which we're restoring)
*
* @return the namespaces successfully disabled.
*/
public Set<Namespace> prepareRestore(Set<Namespace> namespaces, String backupId) {
Map<Namespace, CompletedBackup> completedBackups = getCompletedBackups(namespaces);
Set<Namespace> namespacesToRestore = completedBackups.keySet();
public Set<Namespace> prepareRestore(Set<RestoreRequest> restoreRequests, String backupId) {
Map<RestoreRequest, CompletedBackup> completedBackups = getCompletedBackups(restoreRequests);
Set<Namespace> namespacesToRestore = getNamespacesToRestore(completedBackups);
Preconditions.checkArgument(
namespacesToRestore.size() == completedBackups.size(),
"Attempting to restore multiple namespaces into the same namespace! "
+ "This will cause severe data corruption.",
SafeArg.of("restoreRequests", restoreRequests));

DisableNamespacesRequest request = DisableNamespacesRequest.of(namespacesToRestore, backupId);
DisableNamespacesResponse response = timeLockManagementService.disableTimelock(authHeader, request);
Expand All @@ -129,7 +149,7 @@ public Set<Namespace> visitSuccessful(SuccessfulDisableNamespacesResponse value)
public Set<Namespace> visitUnsuccessful(UnsuccessfulDisableNamespacesResponse value) {
log.error(
"Failed to disable namespaces prior to restore",
SafeArg.of("namespaces", namespaces),
SafeArg.of("requests", restoreRequests),
SafeArg.of("response", value));
return ImmutableSet.of();
}
Expand All @@ -148,15 +168,15 @@ public Set<Namespace> visitUnknown(String unknownType) {
* Namespaces are repaired serially. If repairTable throws an exception, then this will propagate back to the
* caller. In such cases, some namespaces may not have been repaired.
*
* @param namespaces the namespaces to repair.
* @param restoreRequests the repair requests.
* @param repairTable supplied function which is expected to repair the given ranges.
*
* @return the set of namespaces for which we issued a repair command via the provided Consumer.
*/
public Set<Namespace> repairInternalTables(
Set<Namespace> namespaces, BiConsumer<String, RangesForRepair> repairTable) {
Map<Namespace, CompletedBackup> completedBackups = getCompletedBackups(namespaces);
Set<Namespace> namespacesToRepair = completedBackups.keySet();
Set<RestoreRequest> restoreRequests, BiConsumer<String, RangesForRepair> repairTable) {
Map<RestoreRequest, CompletedBackup> completedBackups = getCompletedBackups(restoreRequests);
Set<Namespace> namespacesToRepair = getNamespacesToRestore(completedBackups);
repairTables(repairTable, completedBackups, namespacesToRepair);
return namespacesToRepair;
}
Expand All @@ -165,37 +185,40 @@ public Set<Namespace> repairInternalTables(
* Completes the restore process for the requested namespaces.
* This includes fast-forwarding the timestamp, and then re-enabling the TimeLock namespaces.
*
* @param namespaces the namespaces to re-enable
* @param restoreRequests the requests to complete.
* @param backupId the backup identifier, which must match the one given to {@link #prepareRestore(Set, String)}
* @return the set of namespaces that were successfully fast-forwarded and re-enabled.
*/
@NonIdempotent
public Set<Namespace> completeRestore(Set<Namespace> namespaces, String backupId) {
Set<CompletedBackup> completedBackups = namespaces.stream()
.map(backupPersister::getCompletedBackup)
.flatMap(Optional::stream)
.collect(Collectors.toSet());
public Set<Namespace> completeRestore(Set<RestoreRequest> restoreRequests, String backupId) {
Map<RestoreRequest, CompletedBackup> completedBackups = getCompletedBackups(restoreRequests);
Set<Namespace> namespacesToRestore = getNamespacesToRestore(completedBackups);

if (completedBackups.isEmpty()) {
log.info(
"Attempted to complete restore, but no completed backups were found",
SafeArg.of("namespaces", namespaces));
SafeArg.of("restoreRequests", restoreRequests));
return ImmutableSet.of();
} else if (completedBackups.size() < namespaces.size()) {
Set<Namespace> namespacesWithBackup =
completedBackups.stream().map(CompletedBackup::getNamespace).collect(Collectors.toSet());
Set<Namespace> namespacesWithoutBackup = Sets.difference(namespaces, namespacesWithBackup);
} else if (completedBackups.size() < restoreRequests.size()) {
Set<Namespace> namespacesWithBackup = completedBackups.values().stream()
.map(CompletedBackup::getNamespace)
.collect(Collectors.toSet());
Set<Namespace> allOldNamespaces =
restoreRequests.stream().map(RestoreRequest::oldNamespace).collect(Collectors.toSet());
Set<Namespace> namespacesWithoutBackup = Sets.difference(allOldNamespaces, namespacesWithBackup);
log.warn(
"Completed backups were not found for some namespaces",
SafeArg.of("namespacesWithBackup", namespacesWithBackup),
SafeArg.of("namespacesWithoutBackup", namespacesWithoutBackup));
}

// Fast forward timestamps
Map<Namespace, CompletedBackup> completeRequest = KeyedStream.stream(completedBackups)
.mapKeys(RestoreRequest::newNamespace)
.collectToMap();
CompleteRestoreResponse response =
atlasRestoreClient.completeRestore(authHeader, CompleteRestoreRequest.of(completedBackups));
atlasRestoreClient.completeRestore(authHeader, CompleteRestoreRequest.of(completeRequest));
Set<Namespace> successfulNamespaces = response.getSuccessfulNamespaces();
Set<Namespace> failedNamespaces = Sets.difference(namespaces, successfulNamespaces);
Set<Namespace> failedNamespaces = Sets.difference(namespacesToRestore, successfulNamespaces);
if (!failedNamespaces.isEmpty()) {
log.error(
"Failed to fast-forward timestamp for some namespaces. These will not be re-enabled.",
Expand All @@ -204,9 +227,11 @@ public Set<Namespace> completeRestore(Set<Namespace> namespaces, String backupId
}

// Re-enable timelock
Set<Namespace> newNamespaces =
restoreRequests.stream().map(RestoreRequest::newNamespace).collect(Collectors.toSet());
timeLockManagementService.reenableTimelock(
authHeader, ReenableNamespacesRequest.of(successfulNamespaces, backupId));
if (successfulNamespaces.containsAll(namespaces)) {
if (successfulNamespaces.containsAll(newNamespaces)) {
log.info(
"Successfully completed restore for all namespaces",
SafeArg.of("namespaces", successfulNamespaces));
Expand All @@ -217,31 +242,34 @@ public Set<Namespace> completeRestore(Set<Namespace> namespaces, String backupId

private void repairTables(
BiConsumer<String, RangesForRepair> repairTable,
Map<Namespace, CompletedBackup> completedBackups,
Map<RestoreRequest, CompletedBackup> completedBackups,
Set<Namespace> namespacesToRepair) {
// ConsistentCasTablesTask
namespacesToRepair.forEach(namespace -> cassandraRepairHelper.repairInternalTables(namespace, repairTable));

// RepairTransactionsTablesTask
KeyedStream.stream(completedBackups)
.forEach((namespace, completedBackup) ->
repairTransactionsTables(namespace, completedBackup, repairTable));
.forEach((restoreRequest, completedBackup) ->
repairTransactionsTables(restoreRequest, completedBackup, repairTable));
}

private void repairTransactionsTables(
Namespace namespace, CompletedBackup completedBackup, BiConsumer<String, RangesForRepair> repairTable) {
Map<FullyBoundedTimestampRange, Integer> coordinationMap = getCoordinationMap(namespace, completedBackup);
RestoreRequest restoreRequest,
CompletedBackup completedBackup,
BiConsumer<String, RangesForRepair> repairTable) {
Map<FullyBoundedTimestampRange, Integer> coordinationMap = getCoordinationMap(completedBackup);
List<TransactionsTableInteraction> transactionsTableInteractions =
TransactionsTableInteraction.getTransactionTableInteractions(
coordinationMap, DefaultRetryPolicy.INSTANCE);
Namespace namespace = restoreRequest.newNamespace();
cassandraRepairHelper.repairTransactionsTables(namespace, transactionsTableInteractions, repairTable);
cassandraRepairHelper.cleanTransactionsTables(
namespace, completedBackup.getBackupStartTimestamp(), transactionsTableInteractions);
}

private Map<FullyBoundedTimestampRange, Integer> getCoordinationMap(
Namespace namespace, CompletedBackup completedBackup) {
Optional<InternalSchemaMetadataState> schemaMetadataState = backupPersister.getSchemaMetadata(namespace);
private Map<FullyBoundedTimestampRange, Integer> getCoordinationMap(CompletedBackup completedBackup) {
Optional<InternalSchemaMetadataState> schemaMetadataState =
backupPersister.getSchemaMetadata(completedBackup.getNamespace());

long fastForwardTs = completedBackup.getBackupEndTimestamp();
long immutableTs = completedBackup.getBackupStartTimestamp();
Expand All @@ -250,10 +278,17 @@ private Map<FullyBoundedTimestampRange, Integer> getCoordinationMap(
schemaMetadataState, fastForwardTs, immutableTs);
}

private Map<Namespace, CompletedBackup> getCompletedBackups(Set<Namespace> namespaces) {
return KeyedStream.of(namespaces)
private Map<RestoreRequest, CompletedBackup> getCompletedBackups(Set<RestoreRequest> restoreRequests) {
return KeyedStream.of(restoreRequests)
.map(RestoreRequest::oldNamespace)
.map(backupPersister::getCompletedBackup)
.flatMap(Optional::stream)
.collectToMap();
}

private Set<Namespace> getNamespacesToRestore(Map<RestoreRequest, CompletedBackup> completedBackups) {
return completedBackups.keySet().stream()
.map(RestoreRequest::newNamespace)
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,15 @@
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.Optional;
import java.util.function.Function;

final class CoordinationServiceRecorder {
private static final SafeLogger log = SafeLoggerFactory.get(CoordinationServiceRecorder.class);

private final Function<Namespace, KeyValueService> keyValueServiceFactory;
private final KvsRunner kvsRunner;
private final BackupPersister backupPersister;

CoordinationServiceRecorder(
Function<Namespace, KeyValueService> keyValueServiceFactory, BackupPersister backupPersister) {
this.keyValueServiceFactory = keyValueServiceFactory;
CoordinationServiceRecorder(KvsRunner kvsRunner, BackupPersister backupPersister) {
this.kvsRunner = kvsRunner;
this.backupPersister = backupPersister;
}

Expand All @@ -60,15 +58,17 @@ private void logEmptyMetadata(Namespace namespace) {
}

private Optional<InternalSchemaMetadataState> fetchSchemaMetadata(Namespace namespace, long timestamp) {
try (KeyValueService kvs = keyValueServiceFactory.apply(namespace)) {
if (!kvs.getAllTableNames().contains(AtlasDbConstants.COORDINATION_TABLE)) {
return Optional.empty();
}
CoordinationService<InternalSchemaMetadata> coordination =
CoordinationServices.createDefault(kvs, () -> timestamp, false);
return kvsRunner.run(namespace, kvs -> getInternalSchemaMetadataState(kvs, timestamp));
}

return Optional.of(InternalSchemaMetadataState.of(getValidMetadata(coordination, timestamp)));
private Optional<InternalSchemaMetadataState> getInternalSchemaMetadataState(KeyValueService kvs, long timestamp) {
if (!kvs.getAllTableNames().contains(AtlasDbConstants.COORDINATION_TABLE)) {
return Optional.empty();
}
CoordinationService<InternalSchemaMetadata> coordination =
CoordinationServices.createDefault(kvs, () -> timestamp, false);

return Optional.of(InternalSchemaMetadataState.of(getValidMetadata(coordination, timestamp)));
}

private ValueAndBound<InternalSchemaMetadata> getValidMetadata(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* (c) Copyright 2022 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.backup;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.palantir.atlasdb.timelock.api.Namespace;
import org.immutables.value.Value;

@JsonDeserialize(as = ImmutableRestoreRequest.class)
@JsonSerialize(as = ImmutableRestoreRequest.class)
@Value.Immutable
public interface RestoreRequest {
Namespace oldNamespace();

Namespace newNamespace();

class Builder extends ImmutableRestoreRequest.Builder {}

static Builder builder() {
return new Builder();
}
}
Loading

0 comments on commit d7015cc

Please sign in to comment.