Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[WIP] KeyValueSnapshotReaderFactory #7004

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.palantir.atlasdb.keyvalue.api.ColumnRangeSelection;
import com.palantir.atlasdb.keyvalue.api.ColumnSelection;
import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.api.RangeRequest;
import com.palantir.atlasdb.keyvalue.api.RowColumnRangeIterator;
import com.palantir.atlasdb.keyvalue.api.RowResult;
Expand Down Expand Up @@ -69,4 +70,6 @@ RowColumnRangeIterator getRowsColumnRange(

void multiPut(Map<TableReference, ? extends Map<Cell, byte[]>> valuesByTable, long timestamp)
throws KeyAlreadyExistsException;

KeyValueService getDelegate();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not allowed in here.

}
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,9 @@ public void multiPut(Map<TableReference, ? extends Map<Cell, byte[]>> valuesByTa
throws KeyAlreadyExistsException {
delegate.multiPut(valuesByTable, timestamp);
}

@Override
public KeyValueService getDelegate() {
return delegate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ public static NamespaceDeleterFactory createNamespaceDeleterFactoryOfCorrectType
config, NamespaceDeleterFactory::getType, NamespaceDeleterFactory.class);
}

public static TransactionManagerFactory createTransactionManagerFactoryOfCorrectType(KeyValueServiceConfig config) {
try {
return createAtlasDbServiceOfCorrectType(
config, TransactionManagerFactory::getType, TransactionManagerFactory.class);
} catch (SafeIllegalStateException _e) {
return new DefaultTransactionManagerFactory();
}
}

private static <T> T createAtlasDbServiceOfCorrectType(
KeyValueServiceConfig config, Function<T, String> typeExtractor, Class<T> clazz) {
return createAtlasDbServiceOfCorrectType(config.type(), typeExtractor, clazz);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.factory;

import com.google.auto.service.AutoService;
import com.palantir.async.initializer.Callback;
import com.palantir.atlasdb.cache.TimestampCache;
import com.palantir.atlasdb.cleaner.api.Cleaner;
import com.palantir.atlasdb.debug.ConflictTracer;
import com.palantir.atlasdb.keyvalue.api.watch.LockWatchManagerInternal;
import com.palantir.atlasdb.sweep.queue.MultiTableSweepQueueWriter;
import com.palantir.atlasdb.transaction.TransactionConfig;
import com.palantir.atlasdb.transaction.api.AtlasDbConstraintCheckingMode;
import com.palantir.atlasdb.transaction.api.TransactionKeyValueServiceManager;
import com.palantir.atlasdb.transaction.api.TransactionManager;
import com.palantir.atlasdb.transaction.impl.ConflictDetectionManager;
import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager;
import com.palantir.atlasdb.transaction.impl.SweepStrategyManager;
import com.palantir.atlasdb.transaction.impl.metrics.MetricsFilterEvaluationContext;
import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents;
import com.palantir.atlasdb.transaction.service.TransactionService;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.lock.LockService;
import com.palantir.lock.v2.TimelockService;
import com.palantir.timestamp.TimestampManagementService;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

@AutoService(TransactionManagerFactory.class)
public class DefaultTransactionManagerFactory implements TransactionManagerFactory {
public static final String TYPE = "default";

@Override
public String getType() {
return TYPE;
}

@Override
public TransactionManager createInstrumented(
MetricsManager metricsManager,
TransactionKeyValueServiceManager transactionKeyValueServiceManager,
TimelockService timelockService,
LockWatchManagerInternal lockWatchManager,
TimestampManagementService timestampManagementService,
LockService lockService,
TransactionService transactionService,
Supplier<AtlasDbConstraintCheckingMode> constraintModeSupplier,
ConflictDetectionManager conflictDetectionManager,
SweepStrategyManager sweepStrategyManager,
Cleaner cleaner,
Supplier<Boolean> initializationPrerequisite,
boolean allowHiddenTableAccess,
int concurrentGetRangesThreadPoolSize,
int defaultGetRangesConcurrency,
boolean initializeAsync,
TimestampCache timestampCache,
MultiTableSweepQueueWriter sweepQueueWriter,
Callback<TransactionManager> callback,
boolean validateLocksOnReads,
Supplier<TransactionConfig> transactionConfig,
ConflictTracer conflictTracer,
MetricsFilterEvaluationContext metricsFilterEvaluationContext,
Optional<Integer> sharedGetRangesPoolSize,
TransactionKnowledgeComponents knowledge) {
return SerializableTransactionManager.createInstrumented(
metricsManager,
transactionKeyValueServiceManager,
timelockService,
lockWatchManager,
timestampManagementService,
lockService,
transactionService,
constraintModeSupplier,
conflictDetectionManager,
sweepStrategyManager,
cleaner,
initializationPrerequisite,
allowHiddenTableAccess,
concurrentGetRangesThreadPoolSize,
defaultGetRangesConcurrency,
initializeAsync,
timestampCache,
sweepQueueWriter,
callback,
validateLocksOnReads,
transactionConfig,
conflictTracer,
metricsFilterEvaluationContext,
sharedGetRangesPoolSize,
knowledge);
}

@Override
public TransactionManager create(
MetricsManager metricsManager,
TransactionKeyValueServiceManager transactionKeyValueServiceManager,
TimelockService timelockService,
LockWatchManagerInternal lockWatchManager,
TimestampManagementService timestampManagementService,
LockService lockService,
TransactionService transactionService,
Supplier<AtlasDbConstraintCheckingMode> constraintModeSupplier,
ConflictDetectionManager conflictDetectionManager,
SweepStrategyManager sweepStrategyManager,
Cleaner cleaner,
Supplier<Boolean> initializationPrerequisite,
boolean allowHiddenTableAccess,
int concurrentGetRangesThreadPoolSize,
int defaultGetRangesConcurrency,
boolean initializeAsync,
TimestampCache timestampCache,
MultiTableSweepQueueWriter sweepQueueWriter,
Callback<TransactionManager> callback,
ScheduledExecutorService initializer,
boolean validateLocksOnReads,
Supplier<TransactionConfig> transactionConfig,
ConflictTracer conflictTracer,
MetricsFilterEvaluationContext metricsFilterEvaluationContext,
Optional<Integer> sharedGetRangesPoolSize,
TransactionKnowledgeComponents knowledge) {
return SerializableTransactionManager.create(
metricsManager,
transactionKeyValueServiceManager,
timelockService,
lockWatchManager,
timestampManagementService,
lockService,
transactionService,
constraintModeSupplier,
conflictDetectionManager,
sweepStrategyManager,
cleaner,
initializationPrerequisite,
allowHiddenTableAccess,
concurrentGetRangesThreadPoolSize,
defaultGetRangesConcurrency,
initializeAsync,
timestampCache,
sweepQueueWriter,
callback,
validateLocksOnReads,
transactionConfig,
conflictTracer,
metricsFilterEvaluationContext,
sharedGetRangesPoolSize,
knowledge);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.factory;

import com.palantir.async.initializer.Callback;
import com.palantir.atlasdb.cache.TimestampCache;
import com.palantir.atlasdb.cleaner.api.Cleaner;
import com.palantir.atlasdb.debug.ConflictTracer;
import com.palantir.atlasdb.keyvalue.api.watch.LockWatchManagerInternal;
import com.palantir.atlasdb.sweep.queue.MultiTableSweepQueueWriter;
import com.palantir.atlasdb.transaction.TransactionConfig;
import com.palantir.atlasdb.transaction.api.AtlasDbConstraintCheckingMode;
import com.palantir.atlasdb.transaction.api.TransactionKeyValueServiceManager;
import com.palantir.atlasdb.transaction.api.TransactionManager;
import com.palantir.atlasdb.transaction.impl.ConflictDetectionManager;
import com.palantir.atlasdb.transaction.impl.SweepStrategyManager;
import com.palantir.atlasdb.transaction.impl.metrics.MetricsFilterEvaluationContext;
import com.palantir.atlasdb.transaction.knowledge.TransactionKnowledgeComponents;
import com.palantir.atlasdb.transaction.service.TransactionService;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.lock.LockService;
import com.palantir.lock.v2.TimelockService;
import com.palantir.timestamp.TimestampManagementService;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

@SuppressWarnings("TooManyArguments")
public interface TransactionManagerFactory {
String getType();

TransactionManager createInstrumented(
MetricsManager metricsManager,
TransactionKeyValueServiceManager transactionKeyValueServiceManager,
TimelockService timelockService,
LockWatchManagerInternal lockWatchManager,
TimestampManagementService timestampManagementService,
LockService lockService,
TransactionService transactionService,
Supplier<AtlasDbConstraintCheckingMode> constraintModeSupplier,
ConflictDetectionManager conflictDetectionManager,
SweepStrategyManager sweepStrategyManager,
Cleaner cleaner,
Supplier<Boolean> initializationPrerequisite,
boolean allowHiddenTableAccess,
int concurrentGetRangesThreadPoolSize,
int defaultGetRangesConcurrency,
boolean initializeAsync,
TimestampCache timestampCache,
MultiTableSweepQueueWriter sweepQueueWriter,
Callback<TransactionManager> callback,
boolean validateLocksOnReads,
Supplier<TransactionConfig> transactionConfig,
ConflictTracer conflictTracer,
MetricsFilterEvaluationContext metricsFilterEvaluationContext,
Optional<Integer> sharedGetRangesPoolSize,
TransactionKnowledgeComponents knowledge);

TransactionManager create(
MetricsManager metricsManager,
TransactionKeyValueServiceManager transactionKeyValueServiceManager,
TimelockService timelockService,
LockWatchManagerInternal lockWatchManager,
TimestampManagementService timestampManagementService,
LockService lockService,
TransactionService transactionService,
Supplier<AtlasDbConstraintCheckingMode> constraintModeSupplier,
ConflictDetectionManager conflictDetectionManager,
SweepStrategyManager sweepStrategyManager,
Cleaner cleaner,
Supplier<Boolean> initializationPrerequisite,
boolean allowHiddenTableAccess,
int concurrentGetRangesThreadPoolSize,
int defaultGetRangesConcurrency,
boolean initializeAsync,
TimestampCache timestampCache,
MultiTableSweepQueueWriter sweepQueueWriter,
Callback<TransactionManager> callback,
ScheduledExecutorService initializer,
boolean validateLocksOnReads,
Supplier<TransactionConfig> transactionConfig,
ConflictTracer conflictTracer,
MetricsFilterEvaluationContext metricsFilterEvaluationContext,
Optional<Integer> sharedGetRangesPoolSize,
TransactionKnowledgeComponents knowledge);
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@
import com.palantir.atlasdb.transaction.api.TransactionManager;
import com.palantir.atlasdb.transaction.impl.ConflictDetectionManager;
import com.palantir.atlasdb.transaction.impl.ConflictDetectionManagers;
import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager;
import com.palantir.atlasdb.transaction.impl.SweepStrategyManager;
import com.palantir.atlasdb.transaction.impl.SweepStrategyManagers;
import com.palantir.atlasdb.transaction.impl.TimelockTimestampServiceAdapter;
Expand Down Expand Up @@ -535,8 +534,10 @@ private TransactionManager serializableInternal(@Output List<AutoCloseable> clos
asyncInitializationCallback(),
createClearsTable()));

TransactionManagerFactory transactionManagerFactory =
AtlasDbServiceDiscovery.createTransactionManagerFactoryOfCorrectType(installConfig);
TransactionManager transactionManager = initializeCloseable(
() -> SerializableTransactionManager.createInstrumented(
() -> transactionManagerFactory.createInstrumented(
metricsManager,
transactionKeyValueServiceManager,
lockAndTimestampServices.timelock(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class DefaultTaskExecutors {
public final class DefaultTaskExecutors {
private static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofSeconds(5);
private static final int SINGLE_THREAD = 1;

Expand All @@ -36,7 +36,7 @@ private DefaultTaskExecutors() {
// factory
}

static ExecutorService createDefaultDeleteExecutor() {
public static ExecutorService createDefaultDeleteExecutor() {
return PTExecutors.newThreadPoolExecutor(
SINGLE_THREAD,
SINGLE_THREAD,
Expand Down
Loading