diff --git a/driver-core/src/main/java/com/datastax/driver/core/Metadata.java b/driver-core/src/main/java/com/datastax/driver/core/Metadata.java index e84e407536a..c4cb08ba2b9 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Metadata.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Metadata.java @@ -36,13 +36,13 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.Set; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -916,8 +916,8 @@ public int getShardForTabletToken( UUID targetHostUuid = host.getHostId(); long tokenValue = (long) token.getValue(); TabletMap.KeyspaceTableNamePair key = new TabletMap.KeyspaceTableNamePair(keyspace, table); - NavigableSet targetTablets = tabletMap.getMapping().get(key); - if (targetTablets == null) { + TabletMap.TabletSet targetSet = tabletMap.getMapping().get(key); + if (targetSet == null) { logger.trace( "Could not determine shard for token {} on host {} because table {}.{} is not present in tablets " + "metadata. Returning -1.", @@ -927,13 +927,20 @@ public int getShardForTabletToken( table); return -1; } - TabletMap.Tablet row = targetTablets.ceiling(TabletMap.Tablet.malformedTablet(tokenValue)); - if (row != null && row.getFirstToken() < tokenValue) { - for (TabletMap.HostShardPair hostShardPair : row.getReplicas()) { - if (hostShardPair.getHost().equals(targetHostUuid)) { - return hostShardPair.getShard(); + Lock readLock = targetSet.lock.readLock(); + try { + readLock.lock(); + TabletMap.Tablet row = + targetSet.tablets.ceiling(TabletMap.Tablet.malformedTablet(tokenValue)); + if (row != null && row.getFirstToken() < tokenValue) { + for (TabletMap.HostShardPair hostShardPair : row.getReplicas()) { + if (hostShardPair.getHost().equals(targetHostUuid)) { + return hostShardPair.getShard(); + } } } + } finally { + readLock.unlock(); } logger.trace( "Could not find tablet corresponding to token {} on host {} for table {} in keyspace {}. Returning -1.", diff --git a/driver-core/src/main/java/com/datastax/driver/core/TabletMap.java b/driver-core/src/main/java/com/datastax/driver/core/TabletMap.java index 38d424b8c20..0920802c595 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/TabletMap.java +++ b/driver-core/src/main/java/com/datastax/driver/core/TabletMap.java @@ -14,6 +14,8 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,7 +27,9 @@ public class TabletMap { private static final Logger logger = LoggerFactory.getLogger(TabletMap.class); - private final ConcurrentMap> mapping; + // There are no additional locking mechanisms for the mapping field itself, however each TabletSet + // inside has its own ReadWriteLock that should be used when dealing with its internals. + private final ConcurrentMap mapping; private final Cluster.Manager cluster; @@ -34,7 +38,7 @@ public class TabletMap { private TypeCodec tabletPayloadCodec = null; public TabletMap( - Cluster.Manager cluster, ConcurrentMap> mapping) { + Cluster.Manager cluster, ConcurrentMap mapping) { this.cluster = cluster; this.mapping = mapping; } @@ -46,9 +50,9 @@ public static TabletMap emptyMap(Cluster.Manager cluster) { /** * Returns the mapping of tables to their tablets. * - * @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type. + * @return the Map keyed by (keyspace,table) pairs with {@link TabletSet} as value type. */ - public Map> getMapping() { + public Map getMapping() { return mapping; } @@ -68,28 +72,34 @@ public Set getReplicas(String keyspace, String table, long token) { return Collections.emptySet(); } - NavigableSet set = mapping.get(key); - if (set == null) { + TabletSet tabletSet = mapping.get(key); + if (tabletSet == null) { logger.trace( "There is no tablets for {}.{} in this mapping. Returning empty set.", keyspace, table); return Collections.emptySet(); } - Tablet row = mapping.get(key).ceiling(Tablet.malformedTablet(token)); - if (row == null || row.firstToken >= token) { - logger.trace( - "Could not find tablet for {}.{} that owns token {}. Returning empty set.", - keyspace, - table, - token); - return Collections.emptySet(); - } + Lock readLock = tabletSet.lock.readLock(); + try { + readLock.lock(); + Tablet row = mapping.get(key).tablets.ceiling(Tablet.malformedTablet(token)); + if (row == null || row.firstToken >= token) { + logger.trace( + "Could not find tablet for {}.{} that owns token {}. Returning empty set.", + keyspace, + table, + token); + return Collections.emptySet(); + } - HashSet uuidSet = new HashSet<>(); - for (HostShardPair hostShardPair : row.replicas) { - if (cluster.metadata.getHost(hostShardPair.getHost()) != null) - uuidSet.add(hostShardPair.getHost()); + HashSet uuidSet = new HashSet<>(); + for (HostShardPair hostShardPair : row.replicas) { + if (cluster.metadata.getHost(hostShardPair.getHost()) != null) + uuidSet.add(hostShardPair.getHost()); + } + return uuidSet; + } finally { + readLock.unlock(); } - return uuidSet; } /** @@ -121,46 +131,47 @@ void processTabletsRoutingV1Payload(String keyspace, String table, ByteBuffer pa HostShardPair hostShardPair = new HostShardPair(tuple.getUUID(0), tuple.getInt(1)); replicas.add(hostShardPair); } - - // Working on a copy to avoid concurrent modification of the same set - NavigableSet existingTablets = - new TreeSet<>(mapping.computeIfAbsent(ktPair, k -> new TreeSet<>())); - - // Single tablet token range is represented by (firstToken, lastToken] interval - // We need to do two sweeps: remove overlapping tablets by looking at lastToken of existing - // tablets - // and then by looking at firstToken of existing tablets. Currently, the tablets are sorted - // according - // to their lastTokens. - - // First sweep: remove all tablets whose lastToken is inside this interval - Iterator it = - existingTablets.headSet(Tablet.malformedTablet(lastToken), true).descendingIterator(); - while (it.hasNext()) { - Tablet tablet = it.next(); - if (tablet.lastToken <= firstToken) { - break; + Tablet newTablet = new Tablet(keyspace, null, table, firstToken, lastToken, replicas); + + TabletSet tabletSet = mapping.computeIfAbsent(ktPair, k -> new TabletSet()); + Lock writeLock = tabletSet.lock.writeLock(); + try { + writeLock.lock(); + NavigableSet currentTablets = tabletSet.tablets; + // Single tablet token range is represented by (firstToken, lastToken] interval + // We need to do two sweeps: remove overlapping tablets by looking at lastToken of existing + // tablets + // and then by looking at firstToken of existing tablets. Currently, the tablets are sorted + // according + // to their lastTokens. + + // First sweep: remove all tablets whose lastToken is inside this interval + Iterator it = currentTablets.headSet(newTablet, true).descendingIterator(); + while (it.hasNext()) { + Tablet tablet = it.next(); + if (tablet.lastToken <= firstToken) { + break; + } + it.remove(); } - it.remove(); - } - // Second sweep: remove all tablets whose firstToken is inside this tuple's (firstToken, - // lastToken] - // After the first sweep, this theoretically should remove at most 1 tablet - it = existingTablets.tailSet(Tablet.malformedTablet(lastToken), true).iterator(); - while (it.hasNext()) { - Tablet tablet = it.next(); - if (tablet.firstToken >= lastToken) { - break; + // Second sweep: remove all tablets whose firstToken is inside this tuple's (firstToken, + // lastToken] + // After the first sweep, this theoretically should remove at most 1 tablet + it = currentTablets.tailSet(newTablet, true).iterator(); + while (it.hasNext()) { + Tablet tablet = it.next(); + if (tablet.firstToken >= lastToken) { + break; + } + it.remove(); } - it.remove(); - } - // Add new (now) non-overlapping tablet - existingTablets.add(new Tablet(keyspace, null, table, firstToken, lastToken, replicas)); - - // Set the updated result in the main map - mapping.put(ktPair, existingTablets); + // Add new (now) non-overlapping tablet + currentTablets.add(newTablet); + } finally { + writeLock.unlock(); + } } public TupleType getPayloadOuterTuple() { @@ -258,6 +269,18 @@ public int hashCode() { } } + /** + * Set of tablets bundled with ReadWriteLock to allow concurrent modification for different sets. + */ + public static class TabletSet { + final NavigableSet tablets; + final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + public TabletSet() { + this.tablets = new TreeSet<>(); + } + } + /** * Represents a single tablet created from tablets-routing-v1 custom payload. Its {@code * compareTo} implementation intentionally relies solely on {@code lastToken} in order to allow