From 125cd925b6294e77fb2c4c9a4d56a0a3d3100929 Mon Sep 17 00:00:00 2001 From: Raymond Huffman Date: Tue, 5 Mar 2024 00:51:29 -0500 Subject: [PATCH] service loading transaction manager and some changes to enable extension --- .../cell/api/TransactionKeyValueService.java | 3 + .../DefaultTransactionKeyValueService.java | 5 + .../factory/AtlasDbServiceDiscovery.java | 9 + .../DefaultTransactionManagerFactory.java | 163 +++++++++++++++ .../factory/TransactionManagerFactory.java | 100 +++++++++ .../atlasdb/factory/TransactionManagers.java | 5 +- .../impl/DefaultTaskExecutors.java | 4 +- .../impl/InitializeCheckingWrapper.java | 189 ++++++++++++++++++ .../impl/SerializableTransactionManager.java | 166 --------------- .../impl/SnapshotTransactionManager.java | 2 +- .../SerializableTransactionManagerTest.java | 8 +- 11 files changed, 478 insertions(+), 176 deletions(-) create mode 100644 atlasdb-config/src/main/java/com/palantir/atlasdb/factory/DefaultTransactionManagerFactory.java create mode 100644 atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagerFactory.java create mode 100644 atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/InitializeCheckingWrapper.java diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/cell/api/TransactionKeyValueService.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/cell/api/TransactionKeyValueService.java index 917fdcd3050..b06ef0aefb1 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/cell/api/TransactionKeyValueService.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/cell/api/TransactionKeyValueService.java @@ -23,6 +23,7 @@ import com.palantir.atlasdb.keyvalue.api.ColumnRangeSelection; import com.palantir.atlasdb.keyvalue.api.ColumnSelection; import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException; +import com.palantir.atlasdb.keyvalue.api.KeyValueService; import com.palantir.atlasdb.keyvalue.api.RangeRequest; import com.palantir.atlasdb.keyvalue.api.RowColumnRangeIterator; import com.palantir.atlasdb.keyvalue.api.RowResult; @@ -69,4 +70,6 @@ RowColumnRangeIterator getRowsColumnRange( void multiPut(Map> valuesByTable, long timestamp) throws KeyAlreadyExistsException; + + KeyValueService getDelegate(); } diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/DefaultTransactionKeyValueService.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/DefaultTransactionKeyValueService.java index 3b91f725429..5054e535619 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/DefaultTransactionKeyValueService.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/DefaultTransactionKeyValueService.java @@ -95,4 +95,9 @@ public void multiPut(Map> valuesByTa throws KeyAlreadyExistsException { delegate.multiPut(valuesByTable, timestamp); } + + @Override + public KeyValueService getDelegate() { + return delegate; + } } diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/AtlasDbServiceDiscovery.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/AtlasDbServiceDiscovery.java index 8552886ba62..8cb69c6e3ec 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/AtlasDbServiceDiscovery.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/AtlasDbServiceDiscovery.java @@ -57,6 +57,15 @@ public static NamespaceDeleterFactory createNamespaceDeleterFactoryOfCorrectType config, NamespaceDeleterFactory::getType, NamespaceDeleterFactory.class); } + public static TransactionManagerFactory createTransactionManagerFactoryOfCorrectType(KeyValueServiceConfig config) { + try { + return createAtlasDbServiceOfCorrectType( + config, TransactionManagerFactory::getType, TransactionManagerFactory.class); + } catch (SafeIllegalStateException _e) { + return new DefaultTransactionManagerFactory(); + } + } + private static T createAtlasDbServiceOfCorrectType( KeyValueServiceConfig config, Function typeExtractor, Class clazz) { return createAtlasDbServiceOfCorrectType(config.type(), typeExtractor, clazz); diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/DefaultTransactionManagerFactory.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/DefaultTransactionManagerFactory.java new file mode 100644 index 00000000000..c98c3210711 --- /dev/null +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/DefaultTransactionManagerFactory.java @@ -0,0 +1,163 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.factory; + +import com.google.auto.service.AutoService; +import com.palantir.async.initializer.Callback; +import com.palantir.atlasdb.cache.TimestampCache; +import com.palantir.atlasdb.cleaner.api.Cleaner; +import com.palantir.atlasdb.debug.ConflictTracer; +import com.palantir.atlasdb.keyvalue.api.watch.LockWatchManagerInternal; +import com.palantir.atlasdb.sweep.queue.MultiTableSweepQueueWriter; +import com.palantir.atlasdb.transaction.TransactionConfig; +import com.palantir.atlasdb.transaction.api.AtlasDbConstraintCheckingMode; +import com.palantir.atlasdb.transaction.api.TransactionKeyValueServiceManager; +import com.palantir.atlasdb.transaction.api.TransactionManager; +import com.palantir.atlasdb.transaction.impl.ConflictDetectionManager; +import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager; +import com.palantir.atlasdb.transaction.impl.SweepStrategyManager; +import com.palantir.atlasdb.transaction.impl.metrics.MetricsFilterEvaluationContext; +import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; +import com.palantir.atlasdb.transaction.service.TransactionService; +import com.palantir.atlasdb.util.MetricsManager; +import com.palantir.lock.LockService; +import com.palantir.lock.v2.TimelockService; +import com.palantir.timestamp.TimestampManagementService; +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; + +@AutoService(TransactionManagerFactory.class) +public class DefaultTransactionManagerFactory implements TransactionManagerFactory { + public static final String TYPE = "default"; + + @Override + public String getType() { + return TYPE; + } + + @Override + public TransactionManager createInstrumented( + MetricsManager metricsManager, + TransactionKeyValueServiceManager transactionKeyValueServiceManager, + TimelockService timelockService, + LockWatchManagerInternal lockWatchManager, + TimestampManagementService timestampManagementService, + LockService lockService, + TransactionService transactionService, + Supplier constraintModeSupplier, + ConflictDetectionManager conflictDetectionManager, + SweepStrategyManager sweepStrategyManager, + Cleaner cleaner, + Supplier initializationPrerequisite, + boolean allowHiddenTableAccess, + int concurrentGetRangesThreadPoolSize, + int defaultGetRangesConcurrency, + boolean initializeAsync, + TimestampCache timestampCache, + MultiTableSweepQueueWriter sweepQueueWriter, + Callback callback, + boolean validateLocksOnReads, + Supplier transactionConfig, + ConflictTracer conflictTracer, + MetricsFilterEvaluationContext metricsFilterEvaluationContext, + Optional sharedGetRangesPoolSize, + TransactionKnowledgeComponents knowledge) { + return SerializableTransactionManager.createInstrumented( + metricsManager, + transactionKeyValueServiceManager, + timelockService, + lockWatchManager, + timestampManagementService, + lockService, + transactionService, + constraintModeSupplier, + conflictDetectionManager, + sweepStrategyManager, + cleaner, + initializationPrerequisite, + allowHiddenTableAccess, + concurrentGetRangesThreadPoolSize, + defaultGetRangesConcurrency, + initializeAsync, + timestampCache, + sweepQueueWriter, + callback, + validateLocksOnReads, + transactionConfig, + conflictTracer, + metricsFilterEvaluationContext, + sharedGetRangesPoolSize, + knowledge); + } + + @Override + public TransactionManager create( + MetricsManager metricsManager, + TransactionKeyValueServiceManager transactionKeyValueServiceManager, + TimelockService timelockService, + LockWatchManagerInternal lockWatchManager, + TimestampManagementService timestampManagementService, + LockService lockService, + TransactionService transactionService, + Supplier constraintModeSupplier, + ConflictDetectionManager conflictDetectionManager, + SweepStrategyManager sweepStrategyManager, + Cleaner cleaner, + Supplier initializationPrerequisite, + boolean allowHiddenTableAccess, + int concurrentGetRangesThreadPoolSize, + int defaultGetRangesConcurrency, + boolean initializeAsync, + TimestampCache timestampCache, + MultiTableSweepQueueWriter sweepQueueWriter, + Callback callback, + ScheduledExecutorService initializer, + boolean validateLocksOnReads, + Supplier transactionConfig, + ConflictTracer conflictTracer, + MetricsFilterEvaluationContext metricsFilterEvaluationContext, + Optional sharedGetRangesPoolSize, + TransactionKnowledgeComponents knowledge) { + return SerializableTransactionManager.create( + metricsManager, + transactionKeyValueServiceManager, + timelockService, + lockWatchManager, + timestampManagementService, + lockService, + transactionService, + constraintModeSupplier, + conflictDetectionManager, + sweepStrategyManager, + cleaner, + initializationPrerequisite, + allowHiddenTableAccess, + concurrentGetRangesThreadPoolSize, + defaultGetRangesConcurrency, + initializeAsync, + timestampCache, + sweepQueueWriter, + callback, + validateLocksOnReads, + transactionConfig, + conflictTracer, + metricsFilterEvaluationContext, + sharedGetRangesPoolSize, + knowledge); + } +} diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagerFactory.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagerFactory.java new file mode 100644 index 00000000000..3128186ada7 --- /dev/null +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagerFactory.java @@ -0,0 +1,100 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.factory; + +import com.palantir.async.initializer.Callback; +import com.palantir.atlasdb.cache.TimestampCache; +import com.palantir.atlasdb.cleaner.api.Cleaner; +import com.palantir.atlasdb.debug.ConflictTracer; +import com.palantir.atlasdb.keyvalue.api.watch.LockWatchManagerInternal; +import com.palantir.atlasdb.sweep.queue.MultiTableSweepQueueWriter; +import com.palantir.atlasdb.transaction.TransactionConfig; +import com.palantir.atlasdb.transaction.api.AtlasDbConstraintCheckingMode; +import com.palantir.atlasdb.transaction.api.TransactionKeyValueServiceManager; +import com.palantir.atlasdb.transaction.api.TransactionManager; +import com.palantir.atlasdb.transaction.impl.ConflictDetectionManager; +import com.palantir.atlasdb.transaction.impl.SweepStrategyManager; +import com.palantir.atlasdb.transaction.impl.metrics.MetricsFilterEvaluationContext; +import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents; +import com.palantir.atlasdb.transaction.service.TransactionService; +import com.palantir.atlasdb.util.MetricsManager; +import com.palantir.lock.LockService; +import com.palantir.lock.v2.TimelockService; +import com.palantir.timestamp.TimestampManagementService; +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; + +@SuppressWarnings("TooManyArguments") +public interface TransactionManagerFactory { + String getType(); + + TransactionManager createInstrumented( + MetricsManager metricsManager, + TransactionKeyValueServiceManager transactionKeyValueServiceManager, + TimelockService timelockService, + LockWatchManagerInternal lockWatchManager, + TimestampManagementService timestampManagementService, + LockService lockService, + TransactionService transactionService, + Supplier constraintModeSupplier, + ConflictDetectionManager conflictDetectionManager, + SweepStrategyManager sweepStrategyManager, + Cleaner cleaner, + Supplier initializationPrerequisite, + boolean allowHiddenTableAccess, + int concurrentGetRangesThreadPoolSize, + int defaultGetRangesConcurrency, + boolean initializeAsync, + TimestampCache timestampCache, + MultiTableSweepQueueWriter sweepQueueWriter, + Callback callback, + boolean validateLocksOnReads, + Supplier transactionConfig, + ConflictTracer conflictTracer, + MetricsFilterEvaluationContext metricsFilterEvaluationContext, + Optional sharedGetRangesPoolSize, + TransactionKnowledgeComponents knowledge); + + TransactionManager create( + MetricsManager metricsManager, + TransactionKeyValueServiceManager transactionKeyValueServiceManager, + TimelockService timelockService, + LockWatchManagerInternal lockWatchManager, + TimestampManagementService timestampManagementService, + LockService lockService, + TransactionService transactionService, + Supplier constraintModeSupplier, + ConflictDetectionManager conflictDetectionManager, + SweepStrategyManager sweepStrategyManager, + Cleaner cleaner, + Supplier initializationPrerequisite, + boolean allowHiddenTableAccess, + int concurrentGetRangesThreadPoolSize, + int defaultGetRangesConcurrency, + boolean initializeAsync, + TimestampCache timestampCache, + MultiTableSweepQueueWriter sweepQueueWriter, + Callback callback, + ScheduledExecutorService initializer, + boolean validateLocksOnReads, + Supplier transactionConfig, + ConflictTracer conflictTracer, + MetricsFilterEvaluationContext metricsFilterEvaluationContext, + Optional sharedGetRangesPoolSize, + TransactionKnowledgeComponents knowledge); +} diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java index abc68d1d987..24b610f6ad8 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java @@ -100,7 +100,6 @@ import com.palantir.atlasdb.transaction.api.TransactionManager; import com.palantir.atlasdb.transaction.impl.ConflictDetectionManager; import com.palantir.atlasdb.transaction.impl.ConflictDetectionManagers; -import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager; import com.palantir.atlasdb.transaction.impl.SweepStrategyManager; import com.palantir.atlasdb.transaction.impl.SweepStrategyManagers; import com.palantir.atlasdb.transaction.impl.TimelockTimestampServiceAdapter; @@ -535,8 +534,10 @@ private TransactionManager serializableInternal(@Output List clos asyncInitializationCallback(), createClearsTable())); + TransactionManagerFactory transactionManagerFactory = + AtlasDbServiceDiscovery.createTransactionManagerFactoryOfCorrectType(installConfig); TransactionManager transactionManager = initializeCloseable( - () -> SerializableTransactionManager.createInstrumented( + () -> transactionManagerFactory.createInstrumented( metricsManager, transactionKeyValueServiceManager, lockAndTimestampServices.timelock(), diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/DefaultTaskExecutors.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/DefaultTaskExecutors.java index de3a96efa48..94d3dff35bd 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/DefaultTaskExecutors.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/DefaultTaskExecutors.java @@ -25,7 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -final class DefaultTaskExecutors { +public final class DefaultTaskExecutors { private static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofSeconds(5); private static final int SINGLE_THREAD = 1; @@ -36,7 +36,7 @@ private DefaultTaskExecutors() { // factory } - static ExecutorService createDefaultDeleteExecutor() { + public static ExecutorService createDefaultDeleteExecutor() { return PTExecutors.newThreadPoolExecutor( SINGLE_THREAD, SINGLE_THREAD, diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/InitializeCheckingWrapper.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/InitializeCheckingWrapper.java new file mode 100644 index 00000000000..04cb4d05761 --- /dev/null +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/InitializeCheckingWrapper.java @@ -0,0 +1,189 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.transaction.impl; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.palantir.async.initializer.Callback; +import com.palantir.atlasdb.transaction.api.AutoDelegate_TransactionManager; +import com.palantir.atlasdb.transaction.api.TransactionManager; +import com.palantir.exception.NotInitializedException; +import com.palantir.lock.LockService; +import com.palantir.logsafe.exceptions.SafeIllegalStateException; +import com.palantir.logsafe.logger.SafeLogger; +import com.palantir.logsafe.logger.SafeLoggerFactory; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +public class InitializeCheckingWrapper implements AutoDelegate_TransactionManager { + private static final SafeLogger log = SafeLoggerFactory.get(InitializeCheckingWrapper.class); + + private final TransactionManager txManager; + private final Supplier initializationPrerequisite; + private final Callback callback; + + private State status = State.INITIALIZING; + private Throwable callbackThrowable = null; + + private final ScheduledExecutorService executorService; + + public InitializeCheckingWrapper( + TransactionManager manager, + Supplier initializationPrerequisite, + Callback callback, + ScheduledExecutorService initializer) { + this.txManager = manager; + this.initializationPrerequisite = initializationPrerequisite; + this.callback = callback; + this.executorService = initializer; + scheduleInitializationCheckAndCallback(); + } + + @Override + public TransactionManager delegate() { + assertOpen(); + if (!isInitialized()) { + throw new NotInitializedException("TransactionManager"); + } + return txManager; + } + + @Override + public boolean isInitialized() { + assertOpen(); + return status == State.READY && isInitializedInternal(); + } + + @Override + public LockService getLockService() { + assertOpen(); + return txManager.getLockService(); + } + + @Override + public void registerClosingCallback(Runnable closingCallback) { + assertOpen(); + txManager.registerClosingCallback(closingCallback); + } + + @Override + public void close() { + closeInternal(State.CLOSED); + } + + @VisibleForTesting + boolean isClosedByClose() { + return status == State.CLOSED; + } + + @VisibleForTesting + boolean isClosedByCallbackFailure() { + return status == State.CLOSED_BY_CALLBACK_FAILURE; + } + + private void assertOpen() { + if (status == State.CLOSED) { + throw new SafeIllegalStateException("Operations cannot be performed on closed TransactionManager."); + } + if (status == State.CLOSED_BY_CALLBACK_FAILURE) { + throw new SafeIllegalStateException( + "Operations cannot be performed on closed TransactionManager." + + " Closed due to a callback failure.", + callbackThrowable); + } + } + + private void runCallbackIfInitializedOrScheduleForLater(Runnable callbackTask) { + if (isInitializedInternal()) { + callbackTask.run(); + } else { + scheduleInitializationCheckAndCallback(); + } + } + + private void scheduleInitializationCheckAndCallback() { + executorService.schedule( + () -> { + if (status != State.INITIALIZING) { + return; + } + runCallbackIfInitializedOrScheduleForLater(this::runCallbackWithRetry); + }, + 1_000, + TimeUnit.MILLISECONDS); + } + + private boolean isInitializedInternal() { + // Note that the PersistentLockService is also initialized asynchronously as part of + // TransactionManagers.create; however, this is not required for the TransactionManager to fulfil + // requests (note that it is not accessible from any TransactionManager implementation), so we omit + // checking here whether it is initialized. + return txManager.getKeyValueService().isInitialized() + && txManager.getTimelockService().isInitialized() + && txManager.getTimestampService().isInitialized() + && txManager.getCleaner().isInitialized() + && initializationPrerequisite.get(); + } + + private void runCallbackWithRetry() { + try { + callback.runWithRetry(txManager); + changeStateToReady(); + } catch (Throwable e) { + changeStateToCallbackFailure(e); + } + } + + private void changeStateToReady() { + if (checkAndSetStatus(ImmutableSet.of(State.INITIALIZING), State.READY)) { + executorService.shutdown(); + } + } + + private void changeStateToCallbackFailure(Throwable th) { + log.error( + "Callback failed and was not able to perform its cleanup task. " + "Closing the TransactionManager.", + th); + callbackThrowable = th; + closeInternal(State.CLOSED_BY_CALLBACK_FAILURE); + } + + private void closeInternal(State newStatus) { + if (checkAndSetStatus(ImmutableSet.of(State.INITIALIZING, State.READY), newStatus)) { + callback.blockUntilSafeToShutdown(); + executorService.shutdown(); + txManager.close(); + } + } + + private synchronized boolean checkAndSetStatus(Set expected, State desired) { + if (expected.contains(status)) { + status = desired; + return true; + } + return false; + } + + private enum State { + INITIALIZING, + READY, + CLOSED, + CLOSED_BY_CALLBACK_FAILURE + } +} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java index d8c39bd34cb..21efecc9ee4 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java @@ -16,7 +16,6 @@ package com.palantir.atlasdb.transaction.impl; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; import com.palantir.async.initializer.Callback; import com.palantir.atlasdb.cache.DefaultTimestampCache; import com.palantir.atlasdb.cache.TimestampCache; @@ -28,7 +27,6 @@ import com.palantir.atlasdb.transaction.ImmutableTransactionConfig; import com.palantir.atlasdb.transaction.TransactionConfig; import com.palantir.atlasdb.transaction.api.AtlasDbConstraintCheckingMode; -import com.palantir.atlasdb.transaction.api.AutoDelegate_TransactionManager; import com.palantir.atlasdb.transaction.api.PreCommitCondition; import com.palantir.atlasdb.transaction.api.TransactionKeyValueServiceManager; import com.palantir.atlasdb.transaction.api.TransactionManager; @@ -41,183 +39,19 @@ import com.palantir.atlasdb.util.MetricsManager; import com.palantir.common.concurrent.NamedThreadFactory; import com.palantir.common.concurrent.PTExecutors; -import com.palantir.exception.NotInitializedException; import com.palantir.lock.LockService; import com.palantir.lock.v2.LockToken; import com.palantir.lock.v2.TimelockService; -import com.palantir.logsafe.exceptions.SafeIllegalStateException; -import com.palantir.logsafe.logger.SafeLogger; -import com.palantir.logsafe.logger.SafeLoggerFactory; import com.palantir.timestamp.TimestampManagementService; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; import java.util.function.Supplier; @SuppressWarnings("TooManyArguments") // Legacy public class SerializableTransactionManager extends SnapshotTransactionManager { - private static final SafeLogger log = SafeLoggerFactory.get(SerializableTransactionManager.class); - private final ConflictTracer conflictTracer; - public static class InitializeCheckingWrapper implements AutoDelegate_TransactionManager { - private final TransactionManager txManager; - private final Supplier initializationPrerequisite; - private final Callback callback; - - private State status = State.INITIALIZING; - private Throwable callbackThrowable = null; - - private final ScheduledExecutorService executorService; - - InitializeCheckingWrapper( - TransactionManager manager, - Supplier initializationPrerequisite, - Callback callback, - ScheduledExecutorService initializer) { - this.txManager = manager; - this.initializationPrerequisite = initializationPrerequisite; - this.callback = callback; - this.executorService = initializer; - scheduleInitializationCheckAndCallback(); - } - - @Override - public TransactionManager delegate() { - assertOpen(); - if (!isInitialized()) { - throw new NotInitializedException("TransactionManager"); - } - return txManager; - } - - @Override - public boolean isInitialized() { - assertOpen(); - return status == State.READY && isInitializedInternal(); - } - - @Override - public LockService getLockService() { - assertOpen(); - return txManager.getLockService(); - } - - @Override - public void registerClosingCallback(Runnable closingCallback) { - assertOpen(); - txManager.registerClosingCallback(closingCallback); - } - - @Override - public void close() { - closeInternal(State.CLOSED); - } - - @VisibleForTesting - boolean isClosedByClose() { - return status == State.CLOSED; - } - - @VisibleForTesting - boolean isClosedByCallbackFailure() { - return status == State.CLOSED_BY_CALLBACK_FAILURE; - } - - private void assertOpen() { - if (status == State.CLOSED) { - throw new SafeIllegalStateException("Operations cannot be performed on closed TransactionManager."); - } - if (status == State.CLOSED_BY_CALLBACK_FAILURE) { - throw new SafeIllegalStateException( - "Operations cannot be performed on closed TransactionManager." - + " Closed due to a callback failure.", - callbackThrowable); - } - } - - private void runCallbackIfInitializedOrScheduleForLater(Runnable callbackTask) { - if (isInitializedInternal()) { - callbackTask.run(); - } else { - scheduleInitializationCheckAndCallback(); - } - } - - private void scheduleInitializationCheckAndCallback() { - executorService.schedule( - () -> { - if (status != State.INITIALIZING) { - return; - } - runCallbackIfInitializedOrScheduleForLater(this::runCallbackWithRetry); - }, - 1_000, - TimeUnit.MILLISECONDS); - } - - private boolean isInitializedInternal() { - // Note that the PersistentLockService is also initialized asynchronously as part of - // TransactionManagers.create; however, this is not required for the TransactionManager to fulfil - // requests (note that it is not accessible from any TransactionManager implementation), so we omit - // checking here whether it is initialized. - return txManager.getKeyValueService().isInitialized() - && txManager.getTimelockService().isInitialized() - && txManager.getTimestampService().isInitialized() - && txManager.getCleaner().isInitialized() - && initializationPrerequisite.get(); - } - - private void runCallbackWithRetry() { - try { - callback.runWithRetry(txManager); - changeStateToReady(); - } catch (Throwable e) { - changeStateToCallbackFailure(e); - } - } - - private void changeStateToReady() { - if (checkAndSetStatus(ImmutableSet.of(State.INITIALIZING), State.READY)) { - executorService.shutdown(); - } - } - - private void changeStateToCallbackFailure(Throwable th) { - log.error( - "Callback failed and was not able to perform its cleanup task. " - + "Closing the TransactionManager.", - th); - callbackThrowable = th; - closeInternal(State.CLOSED_BY_CALLBACK_FAILURE); - } - - private void closeInternal(State newStatus) { - if (checkAndSetStatus(ImmutableSet.of(State.INITIALIZING, State.READY), newStatus)) { - callback.blockUntilSafeToShutdown(); - executorService.shutdown(); - txManager.close(); - } - } - - private synchronized boolean checkAndSetStatus(Set expected, State desired) { - if (expected.contains(status)) { - status = desired; - return true; - } - return false; - } - - private enum State { - INITIALIZING, - READY, - CLOSED, - CLOSED_BY_CALLBACK_FAILURE - } - } - public static TransactionManager createInstrumented( MetricsManager metricsManager, TransactionKeyValueServiceManager transactionKeyValueServiceManager, diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java index 70fba329f4e..5bbc00c78d0 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java @@ -373,7 +373,7 @@ protected final CommitTimestampLoader createCommitTimestampLoader( knowledge); } - protected final KeyValueSnapshotReader createDefaultSnapshotReader( + protected KeyValueSnapshotReader createDefaultSnapshotReader( LongSupplier startTimestampSupplier, TransactionKeyValueService transactionKeyValueService, CommitTimestampLoader commitTimestampLoader, diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java index d6e73a44987..8ae2b2f5a7f 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java @@ -146,8 +146,7 @@ public void closeShutsDownInitializingExecutorAndClosesTransactionManager() { assertThat(executorService.isShutdown()).isTrue(); assertThatThrownBy(() -> manager.runTaskWithRetry(ignore -> null)).isInstanceOf(IllegalStateException.class); - assertThat(((SerializableTransactionManager.InitializeCheckingWrapper) manager).isClosedByClose()) - .isTrue(); + assertThat(((InitializeCheckingWrapper) manager).isClosedByClose()).isTrue(); } @Test @@ -158,8 +157,7 @@ public void closePreventsInitializationAndCallbacksEvenIfExecutorStillTicks() { verify(mockCallback, never()).runWithRetry(any(SerializableTransactionManager.class)); assertThatThrownBy(() -> manager.runTaskWithRetry(ignore -> null)).isInstanceOf(IllegalStateException.class); - assertThat(((SerializableTransactionManager.InitializeCheckingWrapper) manager).isClosedByClose()) - .isTrue(); + assertThat(((InitializeCheckingWrapper) manager).isClosedByClose()).isTrue(); } @Test @@ -201,7 +199,7 @@ public void exceptionInCleanupClosesTransactionManager() { everythingInitialized(); tickInitializingThread(); - assertThat(((SerializableTransactionManager.InitializeCheckingWrapper) manager).isClosedByCallbackFailure()) + assertThat(((InitializeCheckingWrapper) manager).isClosedByCallbackFailure()) .isTrue(); assertThatThrownBy(() -> manager.runTaskWithRetry($ -> null)) .isInstanceOf(IllegalStateException.class)