Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Direct: Adds Direct stack 3.37.6 upgrade #4961

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<ClientOfficialVersion>3.46.1</ClientOfficialVersion>
<ClientPreviewVersion>3.47.0</ClientPreviewVersion>
<ClientPreviewSuffixVersion>preview.1</ClientPreviewSuffixVersion>
<DirectVersion>3.37.5</DirectVersion>
<DirectVersion>3.37.6</DirectVersion>
<FaultInjectionVersion>1.0.0</FaultInjectionVersion>
<FaultInjectionSuffixVersion>beta.0</FaultInjectionSuffixVersion>
<EncryptionOfficialVersion>2.0.4</EncryptionOfficialVersion>
Expand Down
24 changes: 12 additions & 12 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6652,17 +6652,6 @@ private bool IsValidConsistency(

private void InitializeDirectConnectivity(IStoreClientFactory storeClientFactory)
{
this.AddressResolver = new GlobalAddressResolver(
this.GlobalEndpointManager,
this.PartitionKeyRangeLocation,
this.ConnectionPolicy.ConnectionProtocol,
this,
this.collectionCache,
this.partitionKeyRangeCache,
this.accountServiceConfiguration,
this.ConnectionPolicy,
this.httpClient);

// Check if we have a store client factory in input and if we do, do not initialize another store client
// The purpose is to reuse store client factory across all document clients inside compute gateway
if (storeClientFactory != null)
Expand Down Expand Up @@ -6704,7 +6693,6 @@ private void InitializeDirectConnectivity(IStoreClientFactory storeClientFactory
sendHangDetectionTimeSeconds: this.rntbdSendHangDetectionTimeSeconds,
retryWithConfiguration: this.ConnectionPolicy.RetryOptions?.GetRetryWithConfiguration(),
enableTcpConnectionEndpointRediscovery: this.ConnectionPolicy.EnableTcpConnectionEndpointRediscovery,
addressResolver: this.AddressResolver,
rntbdMaxConcurrentOpeningConnectionCount: this.rntbdMaxConcurrentOpeningConnectionCount,
remoteCertificateValidationCallback: this.remoteCertificateValidationCallback,
distributedTracingOptions: distributedTracingOptions,
Expand All @@ -6719,6 +6707,18 @@ private void InitializeDirectConnectivity(IStoreClientFactory storeClientFactory
this.isStoreClientFactoryCreatedInternally = true;
}

this.AddressResolver = new GlobalAddressResolver(
this.GlobalEndpointManager,
this.PartitionKeyRangeLocation,
this.ConnectionPolicy.ConnectionProtocol,
this,
this.collectionCache,
this.partitionKeyRangeCache,
this.accountServiceConfiguration,
this.ConnectionPolicy,
this.httpClient,
this.storeClientFactory.GetConnectionStateListener());

this.CreateStoreModel(subscribeRntbdStatus: true);
}

Expand Down
40 changes: 36 additions & 4 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ internal class GatewayAddressCache : IAddressCache, IDisposable
private readonly SemaphoreSlim semaphore;
private readonly CosmosHttpClient httpClient;
private readonly bool isReplicaAddressValidationEnabled;
private readonly IConnectionStateListener connectionStateListener;

private Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> masterPartitionAddressCache;
private DateTime suboptimalMasterPartitionTimestamp;
Expand All @@ -67,6 +68,7 @@ public GatewayAddressCache(
IServiceConfigurationReader serviceConfigReader,
CosmosHttpClient httpClient,
IOpenConnectionsHandler openConnectionsHandler,
IConnectionStateListener connectionStateListener,
long suboptimalPartitionForceRefreshIntervalInSeconds = 600,
bool enableTcpConnectionEndpointRediscovery = false,
bool replicaAddressValidationEnabled = false)
Expand All @@ -81,6 +83,7 @@ public GatewayAddressCache(
this.serverPartitionAddressToPkRangeIdMap = new ConcurrentDictionary<ServerKey, HashSet<PartitionKeyRangeIdentity>>();
this.suboptimalMasterPartitionTimestamp = DateTime.MaxValue;
this.enableTcpConnectionEndpointRediscovery = enableTcpConnectionEndpointRediscovery;
this.connectionStateListener = connectionStateListener;

this.suboptimalPartitionForceRefreshIntervalInSeconds = suboptimalPartitionForceRefreshIntervalInSeconds;

Expand Down Expand Up @@ -496,6 +499,13 @@ private static void LogPartitionCacheRefresh(
public async Task MarkAddressesToUnhealthyAsync(
ServerKey serverKey)
{
if (this.disposedValue)
{
// Will enable Listener to un-register in-case of un-graceful dispose
// <see cref="ConnectionStateMuxListener.NotifyAsync(ServerKey, ConcurrentDictionary{Func{ServerKey, Task}, object})"/>
throw new ObjectDisposedException(nameof(GatewayAddressCache));
}

if (serverKey == null)
{
throw new ArgumentNullException(nameof(serverKey));
Expand Down Expand Up @@ -538,6 +548,9 @@ where serverKey.Equals(transportAddress.ReplicaServerKey)

address.SetUnhealthy();
}

// Update the health status
this.CaptureTransportAddressUriHealthStates(addressInfo, transportAddresses);
}
}
}
Expand Down Expand Up @@ -828,9 +841,21 @@ internal Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> ToPartiti
partitionKeyRangeIdentity.PartitionKeyRangeId,
addressInfo.PhysicalUri);

HashSet<PartitionKeyRangeIdentity> createdValue = null;
ServerKey serverKey = new ServerKey(new Uri(addressInfo.PhysicalUri));
HashSet<PartitionKeyRangeIdentity> pkRangeIdSet = this.serverPartitionAddressToPkRangeIdMap.GetOrAdd(
new ServerKey(new Uri(addressInfo.PhysicalUri)),
(_) => new HashSet<PartitionKeyRangeIdentity>());
serverKey,
(_) =>
{
createdValue = new HashSet<PartitionKeyRangeIdentity>();
return createdValue;
});

if (object.ReferenceEquals(pkRangeIdSet, createdValue))
{
this.connectionStateListener.Register(serverKey, this.MarkAddressesToUnhealthyAsync);
}

lock (pkRangeIdSet)
{
pkRangeIdSet.Add(partitionKeyRangeIdentity);
Expand Down Expand Up @@ -893,7 +918,7 @@ private static void LogAddressResolutionEnd(DocumentServiceRequest request, stri

private static Protocol ProtocolFromString(string protocol)
{
return (protocol.ToLowerInvariant()) switch
return protocol.ToLowerInvariant() switch
{
RuntimeConstants.Protocols.HTTPS => Protocol.Https,
RuntimeConstants.Protocols.RNTBD => Protocol.Tcp,
Expand All @@ -903,7 +928,7 @@ private static Protocol ProtocolFromString(string protocol)

private static string ProtocolString(Protocol protocol)
{
return ((int)protocol) switch
return (int)protocol switch
{
(int)Protocol.Https => RuntimeConstants.Protocols.HTTPS,
(int)Protocol.Tcp => RuntimeConstants.Protocols.RNTBD,
Expand Down Expand Up @@ -1071,11 +1096,18 @@ protected virtual void Dispose(bool disposing)
{
if (this.disposedValue)
{
DefaultTrace.TraceInformation("GatewayAddressCache is already disposed {0}", this.GetHashCode());
return;
}

if (disposing)
{
// Unregister the server-key
foreach (ServerKey serverKey in this.serverPartitionAddressToPkRangeIdMap.Keys)
{
this.connectionStateListener.UnRegister(serverKey, this.MarkAddressesToUnhealthyAsync);
}

this.serverPartitionAddressCache?.Dispose();
}

Expand Down
19 changes: 5 additions & 14 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos
private readonly ConcurrentDictionary<Uri, EndpointCache> addressCacheByEndpoint;
private readonly bool enableTcpConnectionEndpointRediscovery;
private readonly bool isReplicaAddressValidationEnabled;
private readonly IConnectionStateListener connectionStateListener;
private IOpenConnectionsHandler openConnectionsHandler;

public GlobalAddressResolver(
Expand All @@ -50,7 +51,8 @@ public GlobalAddressResolver(
PartitionKeyRangeCache routingMapProvider,
IServiceConfigurationReader serviceConfigReader,
ConnectionPolicy connectionPolicy,
CosmosHttpClient httpClient)
CosmosHttpClient httpClient,
IConnectionStateListener connectionStateListener)
{
this.endpointManager = endpointManager;
this.partitionKeyRangeLocationCache = partitionKeyRangeLocationCache;
Expand All @@ -60,6 +62,7 @@ public GlobalAddressResolver(
this.routingMapProvider = routingMapProvider;
this.serviceConfigReader = serviceConfigReader;
this.httpClient = httpClient;
this.connectionStateListener = connectionStateListener;

int maxBackupReadEndpoints =
!connectionPolicy.EnableReadRequestsFallback.HasValue || connectionPolicy.EnableReadRequestsFallback.Value
Expand Down Expand Up @@ -229,19 +232,6 @@ public async Task<PartitionAddressInformation> ResolveAsync(
return await resolver.ResolveAsync(request, forceRefresh, cancellationToken);
}

public async Task UpdateAsync(
ananth7592 marked this conversation as resolved.
Show resolved Hide resolved
ServerKey serverKey,
CancellationToken cancellationToken)
{
foreach (KeyValuePair<Uri, EndpointCache> addressCache in this.addressCacheByEndpoint)
{
// since we don't know which address cache contains the pkRanges mapped to this node,
// we mark all transport uris that has the same server key to unhealthy status in the
// AddressCaches of all regions.
await addressCache.Value.AddressCache.MarkAddressesToUnhealthyAsync(serverKey);
}
}

/// <summary>
/// ReplicatedResourceClient will use this API to get the direct connectivity AddressCache for given request.
/// </summary>
Expand Down Expand Up @@ -283,6 +273,7 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint)
this.serviceConfigReader,
this.httpClient,
this.openConnectionsHandler,
this.connectionStateListener,
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery,
replicaAddressValidationEnabled: this.isReplicaAddressValidationEnabled);

Expand Down
Loading
Loading