Skip to content

Commit

Permalink
[Internal] Direct: Adds Direct stack 3.37.6 upgrade (#4961)
Browse files Browse the repository at this point in the history
# Pull Request Template

## Description

This PR is for upgrading Direct stack version 3.37.6 the V3 codebase
with 3.37.6.

It contains two changes,
 

https://msdata.visualstudio.com/CosmosDB/_git/CosmosDB/pullrequest/1505615

 : QuorumReader: Fixes bug with BoundedStaleness


https://msdata.visualstudio.com/CosmosDB/_git/CosmosDB/pullrequest/1529101

ConnectionStateListener multiplexing: support multiple CosmosClients
(Sharing StoreClient)

It also brings in a dependent change done in OSS codebase 


https://msdata.visualstudio.com/CosmosDB/_git/CosmosSdkOSSClone/pullrequest/1532400?_a=files&path=/Microsoft.Azure.Cosmos/src/DocumentClient.cs


## Type of change

Please delete options that are not relevant.

- [x] New feature (non-breaking change which adds functionality)
  • Loading branch information
ananth7592 authored Jan 16, 2025
1 parent dc8b9f9 commit b4a4ac0
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 43 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<ClientOfficialVersion>3.46.1</ClientOfficialVersion>
<ClientPreviewVersion>3.47.0</ClientPreviewVersion>
<ClientPreviewSuffixVersion>preview.1</ClientPreviewSuffixVersion>
<DirectVersion>3.37.5</DirectVersion>
<DirectVersion>3.37.6</DirectVersion>
<FaultInjectionVersion>1.0.0</FaultInjectionVersion>
<FaultInjectionSuffixVersion>beta.0</FaultInjectionSuffixVersion>
<EncryptionOfficialVersion>2.0.4</EncryptionOfficialVersion>
Expand Down
24 changes: 12 additions & 12 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6652,17 +6652,6 @@ private bool IsValidConsistency(

private void InitializeDirectConnectivity(IStoreClientFactory storeClientFactory)
{
this.AddressResolver = new GlobalAddressResolver(
this.GlobalEndpointManager,
this.PartitionKeyRangeLocation,
this.ConnectionPolicy.ConnectionProtocol,
this,
this.collectionCache,
this.partitionKeyRangeCache,
this.accountServiceConfiguration,
this.ConnectionPolicy,
this.httpClient);

// Check if we have a store client factory in input and if we do, do not initialize another store client
// The purpose is to reuse store client factory across all document clients inside compute gateway
if (storeClientFactory != null)
Expand Down Expand Up @@ -6704,7 +6693,6 @@ private void InitializeDirectConnectivity(IStoreClientFactory storeClientFactory
sendHangDetectionTimeSeconds: this.rntbdSendHangDetectionTimeSeconds,
retryWithConfiguration: this.ConnectionPolicy.RetryOptions?.GetRetryWithConfiguration(),
enableTcpConnectionEndpointRediscovery: this.ConnectionPolicy.EnableTcpConnectionEndpointRediscovery,
addressResolver: this.AddressResolver,
rntbdMaxConcurrentOpeningConnectionCount: this.rntbdMaxConcurrentOpeningConnectionCount,
remoteCertificateValidationCallback: this.remoteCertificateValidationCallback,
distributedTracingOptions: distributedTracingOptions,
Expand All @@ -6719,6 +6707,18 @@ private void InitializeDirectConnectivity(IStoreClientFactory storeClientFactory
this.isStoreClientFactoryCreatedInternally = true;
}

this.AddressResolver = new GlobalAddressResolver(
this.GlobalEndpointManager,
this.PartitionKeyRangeLocation,
this.ConnectionPolicy.ConnectionProtocol,
this,
this.collectionCache,
this.partitionKeyRangeCache,
this.accountServiceConfiguration,
this.ConnectionPolicy,
this.httpClient,
this.storeClientFactory.GetConnectionStateListener());

this.CreateStoreModel(subscribeRntbdStatus: true);
}

Expand Down
40 changes: 36 additions & 4 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ internal class GatewayAddressCache : IAddressCache, IDisposable
private readonly SemaphoreSlim semaphore;
private readonly CosmosHttpClient httpClient;
private readonly bool isReplicaAddressValidationEnabled;
private readonly IConnectionStateListener connectionStateListener;

private Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> masterPartitionAddressCache;
private DateTime suboptimalMasterPartitionTimestamp;
Expand All @@ -67,6 +68,7 @@ public GatewayAddressCache(
IServiceConfigurationReader serviceConfigReader,
CosmosHttpClient httpClient,
IOpenConnectionsHandler openConnectionsHandler,
IConnectionStateListener connectionStateListener,
long suboptimalPartitionForceRefreshIntervalInSeconds = 600,
bool enableTcpConnectionEndpointRediscovery = false,
bool replicaAddressValidationEnabled = false)
Expand All @@ -81,6 +83,7 @@ public GatewayAddressCache(
this.serverPartitionAddressToPkRangeIdMap = new ConcurrentDictionary<ServerKey, HashSet<PartitionKeyRangeIdentity>>();
this.suboptimalMasterPartitionTimestamp = DateTime.MaxValue;
this.enableTcpConnectionEndpointRediscovery = enableTcpConnectionEndpointRediscovery;
this.connectionStateListener = connectionStateListener;

this.suboptimalPartitionForceRefreshIntervalInSeconds = suboptimalPartitionForceRefreshIntervalInSeconds;

Expand Down Expand Up @@ -496,6 +499,13 @@ private static void LogPartitionCacheRefresh(
public async Task MarkAddressesToUnhealthyAsync(
ServerKey serverKey)
{
if (this.disposedValue)
{
// Will enable Listener to un-register in-case of un-graceful dispose
// <see cref="ConnectionStateMuxListener.NotifyAsync(ServerKey, ConcurrentDictionary{Func{ServerKey, Task}, object})"/>
throw new ObjectDisposedException(nameof(GatewayAddressCache));
}

if (serverKey == null)
{
throw new ArgumentNullException(nameof(serverKey));
Expand Down Expand Up @@ -538,6 +548,9 @@ where serverKey.Equals(transportAddress.ReplicaServerKey)

address.SetUnhealthy();
}

// Update the health status
this.CaptureTransportAddressUriHealthStates(addressInfo, transportAddresses);
}
}
}
Expand Down Expand Up @@ -828,9 +841,21 @@ internal Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> ToPartiti
partitionKeyRangeIdentity.PartitionKeyRangeId,
addressInfo.PhysicalUri);

HashSet<PartitionKeyRangeIdentity> createdValue = null;
ServerKey serverKey = new ServerKey(new Uri(addressInfo.PhysicalUri));
HashSet<PartitionKeyRangeIdentity> pkRangeIdSet = this.serverPartitionAddressToPkRangeIdMap.GetOrAdd(
new ServerKey(new Uri(addressInfo.PhysicalUri)),
(_) => new HashSet<PartitionKeyRangeIdentity>());
serverKey,
(_) =>
{
createdValue = new HashSet<PartitionKeyRangeIdentity>();
return createdValue;
});

if (object.ReferenceEquals(pkRangeIdSet, createdValue))
{
this.connectionStateListener.Register(serverKey, this.MarkAddressesToUnhealthyAsync);
}

lock (pkRangeIdSet)
{
pkRangeIdSet.Add(partitionKeyRangeIdentity);
Expand Down Expand Up @@ -893,7 +918,7 @@ private static void LogAddressResolutionEnd(DocumentServiceRequest request, stri

private static Protocol ProtocolFromString(string protocol)
{
return (protocol.ToLowerInvariant()) switch
return protocol.ToLowerInvariant() switch
{
RuntimeConstants.Protocols.HTTPS => Protocol.Https,
RuntimeConstants.Protocols.RNTBD => Protocol.Tcp,
Expand All @@ -903,7 +928,7 @@ private static Protocol ProtocolFromString(string protocol)

private static string ProtocolString(Protocol protocol)
{
return ((int)protocol) switch
return (int)protocol switch
{
(int)Protocol.Https => RuntimeConstants.Protocols.HTTPS,
(int)Protocol.Tcp => RuntimeConstants.Protocols.RNTBD,
Expand Down Expand Up @@ -1071,11 +1096,18 @@ protected virtual void Dispose(bool disposing)
{
if (this.disposedValue)
{
DefaultTrace.TraceInformation("GatewayAddressCache is already disposed {0}", this.GetHashCode());
return;
}

if (disposing)
{
// Unregister the server-key
foreach (ServerKey serverKey in this.serverPartitionAddressToPkRangeIdMap.Keys)
{
this.connectionStateListener.UnRegister(serverKey, this.MarkAddressesToUnhealthyAsync);
}

this.serverPartitionAddressCache?.Dispose();
}

Expand Down
19 changes: 5 additions & 14 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos
private readonly ConcurrentDictionary<Uri, EndpointCache> addressCacheByEndpoint;
private readonly bool enableTcpConnectionEndpointRediscovery;
private readonly bool isReplicaAddressValidationEnabled;
private readonly IConnectionStateListener connectionStateListener;
private IOpenConnectionsHandler openConnectionsHandler;

public GlobalAddressResolver(
Expand All @@ -50,7 +51,8 @@ public GlobalAddressResolver(
PartitionKeyRangeCache routingMapProvider,
IServiceConfigurationReader serviceConfigReader,
ConnectionPolicy connectionPolicy,
CosmosHttpClient httpClient)
CosmosHttpClient httpClient,
IConnectionStateListener connectionStateListener)
{
this.endpointManager = endpointManager;
this.partitionKeyRangeLocationCache = partitionKeyRangeLocationCache;
Expand All @@ -60,6 +62,7 @@ public GlobalAddressResolver(
this.routingMapProvider = routingMapProvider;
this.serviceConfigReader = serviceConfigReader;
this.httpClient = httpClient;
this.connectionStateListener = connectionStateListener;

int maxBackupReadEndpoints =
!connectionPolicy.EnableReadRequestsFallback.HasValue || connectionPolicy.EnableReadRequestsFallback.Value
Expand Down Expand Up @@ -229,19 +232,6 @@ public async Task<PartitionAddressInformation> ResolveAsync(
return await resolver.ResolveAsync(request, forceRefresh, cancellationToken);
}

public async Task UpdateAsync(
ServerKey serverKey,
CancellationToken cancellationToken)
{
foreach (KeyValuePair<Uri, EndpointCache> addressCache in this.addressCacheByEndpoint)
{
// since we don't know which address cache contains the pkRanges mapped to this node,
// we mark all transport uris that has the same server key to unhealthy status in the
// AddressCaches of all regions.
await addressCache.Value.AddressCache.MarkAddressesToUnhealthyAsync(serverKey);
}
}

/// <summary>
/// ReplicatedResourceClient will use this API to get the direct connectivity AddressCache for given request.
/// </summary>
Expand Down Expand Up @@ -283,6 +273,7 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint)
this.serviceConfigReader,
this.httpClient,
this.openConnectionsHandler,
this.connectionStateListener,
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery,
replicaAddressValidationEnabled: this.isReplicaAddressValidationEnabled);

Expand Down
Loading

0 comments on commit b4a4ac0

Please sign in to comment.