Skip to content

Commit

Permalink
Add ReadWriteLock for concurrent tablet modifications
Browse files Browse the repository at this point in the history
Each table now has a dedicated lock for its set of tablets, so multiple
sets can be modified concurrently and, most importantly, without copying
the whole set.
This should improve performance, mostly when there is a large number of
tablets, and reduce the number of misses during tablet shuffling.
  • Loading branch information
Bouncheck committed Nov 9, 2024
1 parent 07d0f66 commit 31a7a72
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 64 deletions.
23 changes: 15 additions & 8 deletions driver-core/src/main/java/com/datastax/driver/core/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand Down Expand Up @@ -916,8 +916,8 @@ public int getShardForTabletToken(
UUID targetHostUuid = host.getHostId();
long tokenValue = (long) token.getValue();
TabletMap.KeyspaceTableNamePair key = new TabletMap.KeyspaceTableNamePair(keyspace, table);
NavigableSet<TabletMap.Tablet> targetTablets = tabletMap.getMapping().get(key);
if (targetTablets == null) {
TabletMap.TabletSet targetSet = tabletMap.getMapping().get(key);
if (targetSet == null) {
logger.trace(
"Could not determine shard for token {} on host {} because table {}.{} is not present in tablets "
+ "metadata. Returning -1.",
Expand All @@ -927,13 +927,20 @@ public int getShardForTabletToken(
table);
return -1;
}
TabletMap.Tablet row = targetTablets.ceiling(TabletMap.Tablet.malformedTablet(tokenValue));
if (row != null && row.getFirstToken() < tokenValue) {
for (TabletMap.HostShardPair hostShardPair : row.getReplicas()) {
if (hostShardPair.getHost().equals(targetHostUuid)) {
return hostShardPair.getShard();
// Look up the tablet under the set's read lock so that a concurrent writer holding the
// write lock cannot modify the underlying NavigableSet while it is being searched.
Lock readLock = targetSet.lock.readLock();
// Acquire the lock BEFORE entering the try block: if lock() itself failed, the finally
// clause would otherwise call unlock() on a lock this thread does not hold and mask the
// original failure with an IllegalMonitorStateException.
readLock.lock();
try {
  // ceiling() returns the least element that compares >= the probe; a non-null result is
  // only accepted when the token lies strictly above the tablet's firstToken.
  TabletMap.Tablet row =
      targetSet.tablets.ceiling(TabletMap.Tablet.malformedTablet(tokenValue));
  if (row != null && row.getFirstToken() < tokenValue) {
    for (TabletMap.HostShardPair hostShardPair : row.getReplicas()) {
      if (hostShardPair.getHost().equals(targetHostUuid)) {
        return hostShardPair.getShard();
      }
    }
  }
} finally {
  readLock.unlock();
}
logger.trace(
"Could not find tablet corresponding to token {} on host {} for table {} in keyspace {}. Returning -1.",
Expand Down
135 changes: 79 additions & 56 deletions driver-core/src/main/java/com/datastax/driver/core/TabletMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -25,7 +27,9 @@
public class TabletMap {
private static final Logger logger = LoggerFactory.getLogger(TabletMap.class);

private final ConcurrentMap<KeyspaceTableNamePair, NavigableSet<Tablet>> mapping;
// There is no additional locking mechanism for the mapping field itself; however, each
// TabletSet inside has its own ReadWriteLock that must be used when accessing its internals.
private final ConcurrentMap<KeyspaceTableNamePair, TabletSet> mapping;

private final Cluster.Manager cluster;

Expand All @@ -34,7 +38,7 @@ public class TabletMap {
private TypeCodec<TupleValue> tabletPayloadCodec = null;

public TabletMap(
Cluster.Manager cluster, ConcurrentMap<KeyspaceTableNamePair, NavigableSet<Tablet>> mapping) {
Cluster.Manager cluster, ConcurrentMap<KeyspaceTableNamePair, TabletSet> mapping) {
this.cluster = cluster;
this.mapping = mapping;
}
Expand All @@ -46,9 +50,9 @@ public static TabletMap emptyMap(Cluster.Manager cluster) {
/**
* Returns the mapping of tables to their tablets.
*
* @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type.
* @return the Map keyed by (keyspace,table) pairs with {@link TabletSet} as value type.
*/
public Map<KeyspaceTableNamePair, NavigableSet<Tablet>> getMapping() {
public Map<KeyspaceTableNamePair, TabletSet> getMapping() {
return mapping;
}

Expand All @@ -68,28 +72,34 @@ public Set<UUID> getReplicas(String keyspace, String table, long token) {
return Collections.emptySet();
}

NavigableSet<Tablet> set = mapping.get(key);
if (set == null) {
TabletSet tabletSet = mapping.get(key);
if (tabletSet == null) {
logger.trace(
"There is no tablets for {}.{} in this mapping. Returning empty set.", keyspace, table);
return Collections.emptySet();
}
Tablet row = mapping.get(key).ceiling(Tablet.malformedTablet(token));
if (row == null || row.firstToken >= token) {
logger.trace(
"Could not find tablet for {}.{} that owns token {}. Returning empty set.",
keyspace,
table,
token);
return Collections.emptySet();
}
// Search the tablets under the set's read lock so a concurrent writer holding the write
// lock cannot modify the underlying NavigableSet during the lookup.
Lock readLock = tabletSet.lock.readLock();
// Acquire the lock BEFORE entering the try block: if lock() itself failed, the finally
// clause would otherwise call unlock() on a lock this thread does not hold and mask the
// original failure with an IllegalMonitorStateException.
readLock.lock();
try {
  // Read from the TabletSet whose lock is held. Re-reading mapping.get(key) here could
  // return a different instance (or null) if the mapping changed after the null check.
  Tablet row = tabletSet.tablets.ceiling(Tablet.malformedTablet(token));
  if (row == null || row.firstToken >= token) {
    logger.trace(
        "Could not find tablet for {}.{} that owns token {}. Returning empty set.",
        keyspace,
        table,
        token);
    return Collections.emptySet();
  }

  // Keep only the replicas whose host is currently present in the cluster metadata.
  HashSet<UUID> uuidSet = new HashSet<>();
  for (HostShardPair hostShardPair : row.replicas) {
    if (cluster.metadata.getHost(hostShardPair.getHost()) != null) {
      uuidSet.add(hostShardPair.getHost());
    }
  }
  return uuidSet;
} finally {
  readLock.unlock();
}
return uuidSet;
}

/**
Expand Down Expand Up @@ -121,46 +131,47 @@ void processTabletsRoutingV1Payload(String keyspace, String table, ByteBuffer pa
HostShardPair hostShardPair = new HostShardPair(tuple.getUUID(0), tuple.getInt(1));
replicas.add(hostShardPair);
}

// Working on a copy to avoid concurrent modification of the same set
NavigableSet<Tablet> existingTablets =
new TreeSet<>(mapping.computeIfAbsent(ktPair, k -> new TreeSet<>()));

// Single tablet token range is represented by (firstToken, lastToken] interval
// We need to do two sweeps: remove overlapping tablets by looking at lastToken of existing
// tablets
// and then by looking at firstToken of existing tablets. Currently, the tablets are sorted
// according
// to their lastTokens.

// First sweep: remove all tablets whose lastToken is inside this interval
Iterator<Tablet> it =
existingTablets.headSet(Tablet.malformedTablet(lastToken), true).descendingIterator();
while (it.hasNext()) {
Tablet tablet = it.next();
if (tablet.lastToken <= firstToken) {
break;
Tablet newTablet = new Tablet(keyspace, null, table, firstToken, lastToken, replicas);

TabletSet tabletSet = mapping.computeIfAbsent(ktPair, k -> new TabletSet());
Lock writeLock = tabletSet.lock.writeLock();
try {
writeLock.lock();
NavigableSet<Tablet> currentTablets = tabletSet.tablets;
// Single tablet token range is represented by (firstToken, lastToken] interval
// We need to do two sweeps: remove overlapping tablets by looking at lastToken of existing
// tablets
// and then by looking at firstToken of existing tablets. Currently, the tablets are sorted
// according
// to their lastTokens.

// First sweep: remove all tablets whose lastToken is inside this interval
Iterator<Tablet> it = currentTablets.headSet(newTablet, true).descendingIterator();
while (it.hasNext()) {
Tablet tablet = it.next();
if (tablet.lastToken <= firstToken) {
break;
}
it.remove();
}
it.remove();
}

// Second sweep: remove all tablets whose firstToken is inside this tuple's (firstToken,
// lastToken]
// After the first sweep, this theoretically should remove at most 1 tablet
it = existingTablets.tailSet(Tablet.malformedTablet(lastToken), true).iterator();
while (it.hasNext()) {
Tablet tablet = it.next();
if (tablet.firstToken >= lastToken) {
break;
// Second sweep: remove all tablets whose firstToken is inside this tuple's (firstToken,
// lastToken]
// After the first sweep, this theoretically should remove at most 1 tablet
it = currentTablets.tailSet(newTablet, true).iterator();
while (it.hasNext()) {
Tablet tablet = it.next();
if (tablet.firstToken >= lastToken) {
break;
}
it.remove();
}
it.remove();
}

// Add new (now) non-overlapping tablet
existingTablets.add(new Tablet(keyspace, null, table, firstToken, lastToken, replicas));

// Set the updated result in the main map
mapping.put(ktPair, existingTablets);
// Add new (now) non-overlapping tablet
currentTablets.add(newTablet);
} finally {
writeLock.unlock();
}
}

public TupleType getPayloadOuterTuple() {
Expand Down Expand Up @@ -258,6 +269,18 @@ public int hashCode() {
}
}

/**
 * Holder pairing one set of tablets with the {@link ReentrantReadWriteLock} that guards it.
 *
 * <p>Because every instance carries its own lock, different sets can be modified concurrently.
 * Code that reads or modifies {@link #tablets} is expected to hold the matching read or write
 * lock obtained from {@link #lock}.
 */
public static class TabletSet {
  /** The tablets of this set, kept in a {@link TreeSet}. */
  final NavigableSet<Tablet> tablets = new TreeSet<>();

  /** Lock guarding access to {@link #tablets}. */
  final ReentrantReadWriteLock lock;

  /** Creates an empty set of tablets with a fresh lock. */
  public TabletSet() {
    this.lock = new ReentrantReadWriteLock();
  }
}

/**
* Represents a single tablet created from tablets-routing-v1 custom payload. Its {@code
* compareTo} implementation intentionally relies solely on {@code lastToken} in order to allow
Expand Down

0 comments on commit 31a7a72

Please sign in to comment.