From d77dbaa7cc6d8d6b856e3beb2146427e69ae5af0 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Tue, 29 Aug 2023 07:05:50 +0200 Subject: [PATCH] Add Redis member store for ACA provider and manage stale members (#2035) * Add Redis store and manage stale members * Fix ListAsync method of Redis store --- .../AzureContainerAppsClusterMonitor.cs | 295 ++++++++++++++++++ ...DefaultAzureCredentialArmClientProvider.cs | 3 +- .../AzureContainerAppsProvider.cs | 179 ----------- .../AzureContainerAppsProvider.cs | 96 ++++++ .../{ => Contracts}/IArmClientProvider.cs | 2 +- .../{ => Contracts}/IClusterMemberStore.cs | 22 +- .../IContainerAppMetadataAccessor.cs | 16 + .../Contracts/ISystemClock.cs | 14 + .../ServiceCollectionExtensions.cs | 34 +- .../Messages/RegisterMember.cs | 8 + .../Messages/UnregisterMember.cs | 6 + .../Messages/UpdateMembers.cs | 6 + .../Models/ContainerAppMetadata.cs | 9 + .../Models/StoredMember.cs | 42 +++ .../AzureContainerAppsProviderOptions.cs | 12 +- ...eContainerAppsProviderOptionsValidator.cs} | 2 +- .../Proto.Cluster.AzureContainerApps.csproj | 1 + .../Services/DefaultSystemClock.cs | 11 + ...EnvironmentContainerAppMetadataAccessor.cs | 28 ++ .../IRedisConnectionMultiplexerProvider.cs | 9 + .../Stores/Redis/RedisClusterMemberStore.cs | 85 +++++ .../RedisConnectionMultiplexerProvider.cs | 21 ++ .../Redis/ServiceCollectionExtensions.cs | 25 ++ .../Stores/ResourceTags/ResourceTagNames.cs | 6 +- .../ResourceTagsClusterMemberStore.cs | 55 +++- .../ServiceCollectionExtensions.cs | 19 +- .../Stores/ResourceTags/TaggedMember.cs | 4 +- .../{ => Utils}/ArmClientProviders.cs | 4 +- .../{IPAddressUtils.cs => Utils/IPUtils.cs} | 24 +- 29 files changed, 807 insertions(+), 231 deletions(-) create mode 100644 src/Proto.Cluster.AzureContainerApps/Actors/AzureContainerAppsClusterMonitor.cs rename src/Proto.Cluster.AzureContainerApps/{ => ArmClientProviders}/DefaultAzureCredentialArmClientProvider.cs (83%) delete mode 100644 src/Proto.Cluster.AzureContainerApps/AzureContainerAppsProvider.cs create mode 100644 src/Proto.Cluster.AzureContainerApps/ClusterProviders/AzureContainerAppsProvider.cs rename src/Proto.Cluster.AzureContainerApps/{ => Contracts}/IArmClientProvider.cs (89%) rename src/Proto.Cluster.AzureContainerApps/{ => Contracts}/IClusterMemberStore.cs (62%) create mode 100644 src/Proto.Cluster.AzureContainerApps/Contracts/IContainerAppMetadataAccessor.cs create mode 100644 src/Proto.Cluster.AzureContainerApps/Contracts/ISystemClock.cs rename src/Proto.Cluster.AzureContainerApps/{ => Extensions}/ServiceCollectionExtensions.cs (53%) create mode 100644 src/Proto.Cluster.AzureContainerApps/Messages/RegisterMember.cs create mode 100644 src/Proto.Cluster.AzureContainerApps/Messages/UnregisterMember.cs create mode 100644 src/Proto.Cluster.AzureContainerApps/Messages/UpdateMembers.cs create mode 100644 src/Proto.Cluster.AzureContainerApps/Models/ContainerAppMetadata.cs create mode 100644 src/Proto.Cluster.AzureContainerApps/Models/StoredMember.cs rename src/Proto.Cluster.AzureContainerApps/{ => Options}/AzureContainerAppsProviderOptions.cs (50%) rename src/Proto.Cluster.AzureContainerApps/{ResourceTagsMemberStoreOptionsValidator.cs => Options/AzureContainerAppsProviderOptionsValidator.cs} (92%) create mode 100644 src/Proto.Cluster.AzureContainerApps/Services/DefaultSystemClock.cs create mode 100644 src/Proto.Cluster.AzureContainerApps/Services/EnvironmentContainerAppMetadataAccessor.cs create mode 100644 src/Proto.Cluster.AzureContainerApps/Stores/Redis/IRedisConnectionMultiplexerProvider.cs create mode 100644 src/Proto.Cluster.AzureContainerApps/Stores/Redis/RedisClusterMemberStore.cs create mode 100644 src/Proto.Cluster.AzureContainerApps/Stores/Redis/RedisConnectionMultiplexerProvider.cs create mode 100644 src/Proto.Cluster.AzureContainerApps/Stores/Redis/ServiceCollectionExtensions.cs rename src/Proto.Cluster.AzureContainerApps/{ => Utils}/ArmClientProviders.cs (73%) rename src/Proto.Cluster.AzureContainerApps/{IPAddressUtils.cs => Utils/IPUtils.cs} (61%) diff --git a/src/Proto.Cluster.AzureContainerApps/Actors/AzureContainerAppsClusterMonitor.cs b/src/Proto.Cluster.AzureContainerApps/Actors/AzureContainerAppsClusterMonitor.cs new file mode 100644 index 0000000000..b1fc29ce34 --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/Actors/AzureContainerAppsClusterMonitor.cs @@ -0,0 +1,295 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Azure.ResourceManager.AppContainers; +using JetBrains.Annotations; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Proto.Cluster.AzureContainerApps.Contracts; +using Proto.Cluster.AzureContainerApps.Messages; +using Proto.Cluster.AzureContainerApps.Models; +using Proto.Cluster.AzureContainerApps.Options; +using Proto.Cluster.AzureContainerApps.Stores.ResourceTags; +using Proto.Timers; +using Proto.Utils; + +namespace Proto.Cluster.AzureContainerApps.Actors; + +/// +/// An actor that periodically updates the member list. +/// +public class AzureContainerAppsClusterMonitor : IActor +{ + private readonly Cluster _cluster; + private readonly IArmClientProvider _armClientProvider; + private readonly IClusterMemberStore _clusterMemberStore; + private readonly IContainerAppMetadataAccessor _containerAppMetadataAccessor; + private readonly ISystemClock _systemClock; + private readonly IOptions _options; + private readonly ILogger _logger; + + private string _memberId = default!; + private string _host = default!; + private int _port; + private string _address = default!; + private ICollection _kinds = default!; + private ContainerAppMetadata _containerAppMetadata = default!; + [CanBeNull] private ContainerAppResource _containerApp; + [CanBeNull] private CancellationTokenSource _scheduledTask; + private bool _stopping; + + /// + /// Initializes a new instance of . + /// + public AzureContainerAppsClusterMonitor( + Cluster cluster, + IArmClientProvider armClientProvider, + IClusterMemberStore clusterMemberStore, + IContainerAppMetadataAccessor containerAppMetadataAccessor, + ISystemClock systemClock, + IOptions options, + ILogger logger + ) + { + _cluster = cluster; + _armClientProvider = armClientProvider; + _clusterMemberStore = clusterMemberStore; + _containerAppMetadataAccessor = containerAppMetadataAccessor; + _systemClock = systemClock; + _options = options; + _logger = logger; + } + + private string ClusterName => _cluster.Config.ClusterName; + + /// + public Task ReceiveAsync(IContext context) + { + var cancellationToken = context.CancellationToken; + + var task = context.Message switch + { + Started => OnStarted(cancellationToken), + RegisterMember command => OnRegisterMember(context, command), + UpdateMembers => OnUpdateMembers(context), + UnregisterMember => OnUnregisterMember(), + _ => Task.CompletedTask + }; + + return task; + } + + private async Task OnStarted(CancellationToken cancellationToken) + { + _containerAppMetadata = await _containerAppMetadataAccessor.GetMetadataAsync(cancellationToken).ConfigureAwait(false); + } + + private Task OnRegisterMember(IContext context, RegisterMember command) + { + // Store the member ID. + _memberId = command.MemberId; + _host = command.Host; + _port = command.Port; + _kinds = command.Kinds; + _address = $"{_host}:{_port}"; + + var registerMemberTask = RegisterMemberInternal(); + + // Reenter after the member has been registered. + context.ReenterAfter(registerMemberTask, _ => + { + // Schedule the first update. + ScheduleUpdate(context); + }); + + return Task.CompletedTask; + } + + private async Task OnUnregisterMember() + { + await Retry.Try(UnregisterMemberInternal, onError: OnError, onFailed: OnFailed).ConfigureAwait(false); + void OnError(int attempt, Exception exception) => _logger.LogWarning(exception, "Failed to unregister member"); + void OnFailed(Exception exception) => _logger.LogError(exception, "Failed to unregister member"); + } + + private Task OnUpdateMembers(IContext context) + { + if (_stopping) + return Task.CompletedTask; + + var updateMembersTask = UpdateMembersAsync(); + context.ReenterAfter(updateMembersTask, _ => + { + // Schedule the next update. + ScheduleUpdate(context); + }); + + return Task.CompletedTask; + } + + private async Task RegisterMemberInternal() + { + // Register the member in the store. + await AddMember().ConfigureAwait(false); + } + + private async Task UpdateMembersAsync() + { + var storeName = _clusterMemberStore.GetType().Name; + _logger.LogInformation("Looking for members in {Store}", storeName); + + var now = _systemClock.UtcNow; + + try + { + var ttl = _options.Value.PollInterval + _options.Value.MemberTimeToLive; + var storedMembers = await GetMembersFromStore(); + var activeMembers = storedMembers.Where(m => m.UpdatedAt + ttl >= now).ToList(); + var expiredMembers = storedMembers.Where(m => m.UpdatedAt + ttl < now).ToList(); + + LogStoredMembers(storedMembers); + await UpdateCurrentMember(activeMembers, now); + await RemoveExpiredMembers(expiredMembers); + + UpdateClusterTopology(activeMembers); + } + catch (Exception x) + { + _logger.LogError(x, "Failed to get members from {Store}", storeName); + } + } + + private async Task> GetMembersFromStore() + { + return (await _clusterMemberStore.ListAsync().ConfigureAwait(false)).ToList(); + } + + private void LogStoredMembers(ICollection storedMembers) + { + if (storedMembers.Any()) + _logger.LogInformation("Got members {Members}", storedMembers.Count); + else + _logger.LogWarning("Did not get any members from {Store}", _clusterMemberStore.GetType().Name); + } + + private async Task UpdateCurrentMember(ICollection activeMembers, DateTimeOffset now) + { + var currentMember = activeMembers.FirstOrDefault(m => m.Id == _memberId); + + if (currentMember is not null) + { + var canReceiveTraffic = await CanReceiveTrafficAsync().ConfigureAwait(false); + var revisionName = _containerAppMetadata.RevisionName; + + if (!canReceiveTraffic) + { + _logger.LogInformation("Revision {RevisionName} is not active", revisionName); + activeMembers.Remove(currentMember); + } + else + { + _logger.LogInformation("Updating current member {MemberId} on {MemberAddress}", currentMember.Id, currentMember.Address); + currentMember = currentMember with { UpdatedAt = now }; + await _clusterMemberStore.UpdateAsync(currentMember).ConfigureAwait(false); + } + } + } + + private async Task RemoveExpiredMembers(IList expiredMembers) + { + if (expiredMembers.Any()) + { + _logger.LogInformation("Removing {Members} expired members", expiredMembers.Count); + + foreach (var expiredMember in expiredMembers) + { + _logger.LogInformation("Expired member {MemberId} on {MemberAddress}", expiredMember.Id, expiredMember.Address); + await _clusterMemberStore.UnregisterAsync(expiredMember.Id).ConfigureAwait(false); + } + } + } + + private void UpdateClusterTopology(IEnumerable activeMembers) + { + var members = activeMembers.Select(x => x.ToMember()).ToList(); + _cluster.MemberList.UpdateClusterTopology(members); + } + + private async Task AddMember() + { + await Retry.Try(AddMemberInternal, retryCount: Retry.Forever, onError: OnError, onFailed: OnFailed).ConfigureAwait(false); + + void OnError(int attempt, Exception exception) => _logger.LogWarning(exception, "Failed to register member"); + void OnFailed(Exception exception) => _logger.LogError(exception, "Failed to register member"); + } + + private async Task AddMemberInternal() + { + var canReceiveTraffic = await CanReceiveTrafficAsync().ConfigureAwait(false); + + if (!canReceiveTraffic) + { + _logger.LogInformation("Revision {RevisionName} is not active", _containerAppMetadata.RevisionName); + return; + } + + var member = new Member + { + Id = _memberId, + Host = _host, + Port = _port, + }; + + _logger.LogInformation( + "[Cluster][AzureContainerAppsProvider] Registering service {ReplicaName} on {IpAddress}", + _containerAppMetadata.ReplicaName, + _address); + + member.Kinds.AddRange(_kinds); + + await _clusterMemberStore.RegisterAsync(ClusterName, member).ConfigureAwait(false); + } + + private void ScheduleUpdate(IContext context) + { + if (_stopping) + return; + + var pollInterval = _options.Value.PollInterval; + _scheduledTask = context.Scheduler().SendOnce(pollInterval, context.Self, new UpdateMembers()); + } + + private async Task CanReceiveTrafficAsync() + { + var containerApp = await GetContainerAppAsync(); + var revisionName = _containerAppMetadata.RevisionName; + var revision = await containerApp.GetContainerAppRevisionAsync(revisionName).ConfigureAwait(false); + return revision.Value.Data.TrafficWeight.GetValueOrDefault() > 0; + } + + private async Task GetContainerAppAsync() + { + if (_containerApp is not null) + return _containerApp; + + var subscriptionId = _options.Value.SubscriptionId; + var resourceGroupName = _options.Value.ResourceGroupName; + var client = await _armClientProvider.CreateClientAsync().ConfigureAwait(false); + var resourceGroup = await client.GetResourceGroupByNameAsync(resourceGroupName, subscriptionId).ConfigureAwait(false); + var containerAppName = _containerAppMetadata.ContainerAppName; + var response = await resourceGroup.GetContainerAppAsync(containerAppName).ConfigureAwait(false); + + _containerApp = response.Value; + return _containerApp; + } + + private async Task UnregisterMemberInternal() + { + _stopping = true; + _scheduledTask?.Cancel(); + _logger.LogInformation("[Cluster][AzureContainerAppsProvider] Unregistering member {ReplicaName} on {IpAddress}", _containerAppMetadata.RevisionName, _address); + await _clusterMemberStore.UnregisterAsync(_memberId).ConfigureAwait(false); + } +} \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/DefaultAzureCredentialArmClientProvider.cs b/src/Proto.Cluster.AzureContainerApps/ArmClientProviders/DefaultAzureCredentialArmClientProvider.cs similarity index 83% rename from src/Proto.Cluster.AzureContainerApps/DefaultAzureCredentialArmClientProvider.cs rename to src/Proto.Cluster.AzureContainerApps/ArmClientProviders/DefaultAzureCredentialArmClientProvider.cs index 97df9f690f..379f35ba59 100644 --- a/src/Proto.Cluster.AzureContainerApps/DefaultAzureCredentialArmClientProvider.cs +++ b/src/Proto.Cluster.AzureContainerApps/ArmClientProviders/DefaultAzureCredentialArmClientProvider.cs @@ -3,8 +3,9 @@ using Azure.Identity; using Azure.ResourceManager; using JetBrains.Annotations; +using Proto.Cluster.AzureContainerApps.Contracts; -namespace Proto.Cluster.AzureContainerApps; +namespace Proto.Cluster.AzureContainerApps.ArmClientProviders; /// /// Provides an instance using diff --git a/src/Proto.Cluster.AzureContainerApps/AzureContainerAppsProvider.cs b/src/Proto.Cluster.AzureContainerApps/AzureContainerAppsProvider.cs deleted file mode 100644 index 13409c70bd..0000000000 --- a/src/Proto.Cluster.AzureContainerApps/AzureContainerAppsProvider.cs +++ /dev/null @@ -1,179 +0,0 @@ -using System; -using System.Linq; -using System.Threading.Tasks; -using Azure.ResourceManager; -using Azure.ResourceManager.AppContainers; -using JetBrains.Annotations; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Proto.Cluster.AzureContainerApps.Stores.ResourceTags; -using Proto.Utils; - -namespace Proto.Cluster.AzureContainerApps; - -/// -/// A cluster provider that uses Azure Container Apps to host the cluster. -/// -[PublicAPI] -public class AzureContainerAppsProvider : IClusterProvider -{ - private readonly IArmClientProvider _armClientProvider; - private readonly IClusterMemberStore _clusterMemberStore; - private readonly IOptions _options; - private readonly ILogger _logger; - [CanBeNull] private readonly string _containerAppName; - private readonly string _revisionName; - private readonly string _replicaName; - private readonly string _advertisedHost; - - private string _memberId = null!; - private string _address = null!; - private Cluster _cluster = null!; - private string _clusterName = null!; - private string[] _kinds = null!; - private int _port; - private ArmClient _client = null!; - - /// - /// Use this constructor to create a new instance. - /// - /// An to create instances. - /// The store to use for storing member information. - /// The options for this provider. - /// The logger to use. - public AzureContainerAppsProvider( - IArmClientProvider armClientProvider, - IClusterMemberStore clusterMemberStore, - IOptions options, - ILogger logger) - { - _armClientProvider = armClientProvider; - _clusterMemberStore = clusterMemberStore; - _options = options; - _logger = logger; - _containerAppName = Environment.GetEnvironmentVariable("CONTAINER_APP_NAME") ?? throw new Exception("No app name provided"); - _revisionName = Environment.GetEnvironmentVariable("CONTAINER_APP_REVISION") ?? throw new Exception("No app revision provided"); - _replicaName = Environment.GetEnvironmentVariable("HOSTNAME") ?? throw new Exception("No replica name provided"); - _advertisedHost = IPAddressUtils.FindSmallestIpAddress().ToString(); - } - - /// - public async Task StartMemberAsync(Cluster cluster) - { - var clusterName = cluster.Config.ClusterName; - var (host, port) = cluster.System.GetAddress(); - var kinds = cluster.GetClusterKinds(); - _cluster = cluster; - _clusterName = clusterName; - _memberId = cluster.System.Id; - _port = port; - _kinds = kinds; - _address = $"{host}:{port}"; - _client = await _armClientProvider.CreateClientAsync(); - - await RegisterMemberAsync().ConfigureAwait(false); - StartClusterMonitor(); - } - - /// - public Task StartClientAsync(Cluster cluster) - { - var clusterName = cluster.Config.ClusterName; - var (_, port) = cluster.System.GetAddress(); - _cluster = cluster; - _clusterName = clusterName; - _memberId = cluster.System.Id; - _port = port; - _kinds = Array.Empty(); - - StartClusterMonitor(); - return Task.CompletedTask; - } - - /// - public async Task ShutdownAsync(bool graceful) => await DeregisterMemberAsync().ConfigureAwait(false); - - private async Task RegisterMemberAsync() - { - await Retry.Try(RegisterMemberInternal, retryCount: Retry.Forever, onError: OnError, onFailed: OnFailed).ConfigureAwait(false); - - void OnError(int attempt, Exception exception) => _logger.LogWarning(exception, "Failed to register service"); - void OnFailed(Exception exception) => _logger.LogError(exception, "Failed to register service"); - } - - private async Task RegisterMemberInternal() - { - var subscriptionId = _options.Value.SubscriptionId; - var resourceGroupName = _options.Value.ResourceGroupName; - var resourceGroup = await _client.GetResourceGroupByNameAsync(resourceGroupName, subscriptionId).ConfigureAwait(false); - var containerApp = await resourceGroup.GetContainerAppAsync(_containerAppName).ConfigureAwait(false); - var revision = await containerApp.Value.GetContainerAppRevisionAsync(_revisionName).ConfigureAwait(false); - - if ((revision.Value.Data.TrafficWeight ?? 0) == 0) - return; - - var member = new Member - { - Id = _memberId, - Host = _advertisedHost, - Port = _port, - }; - - _logger.LogInformation( - "[Cluster][AzureContainerAppsProvider] Registering service {ReplicaName} on {IpAddress}", - _replicaName, - _address); - - member.Kinds.AddRange(_kinds); - await _clusterMemberStore.RegisterAsync(_clusterName, member).ConfigureAwait(false); - } - - private void StartClusterMonitor() - { - var pollInterval = _options.Value.PollInterval; - var storeName = _clusterMemberStore.GetType().Name; - - _ = SafeTask.Run(async () => - { - while (!_cluster.System.Shutdown.IsCancellationRequested) - { - _logger.LogInformation("Looking for members in {Store}", storeName); - - try - { - var members = (await _clusterMemberStore.ListAsync().ConfigureAwait(false)).ToArray(); - - if (members.Any()) - { - _logger.LogInformation("Got members {Members}", members.Length); - _cluster.MemberList.UpdateClusterTopology(members); - } - else - { - _logger.LogWarning("Failed to get members from {Store}", storeName); - } - } - catch (Exception x) - { - _logger.LogError(x, "Failed to get members from {Store}", storeName); - } - - await Task.Delay(pollInterval).ConfigureAwait(false); - } - } - ); - } - - private async Task DeregisterMemberAsync() - { - await Retry.Try(DeregisterMemberInner, onError: OnError, onFailed: OnFailed).ConfigureAwait(false); - void OnError(int attempt, Exception exception) => _logger.LogWarning(exception, "Failed to deregister service"); - void OnFailed(Exception exception) => _logger.LogError(exception, "Failed to deregister service"); - } - - private async Task DeregisterMemberInner() - { - _logger.LogInformation("[Cluster][AzureContainerAppsProvider] Unregistering member {ReplicaName} on {IpAddress}", _replicaName, _address); - await _clusterMemberStore.UnregisterAsync(_memberId).ConfigureAwait(false); - } -} \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/ClusterProviders/AzureContainerAppsProvider.cs b/src/Proto.Cluster.AzureContainerApps/ClusterProviders/AzureContainerAppsProvider.cs new file mode 100644 index 0000000000..152477f3de --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/ClusterProviders/AzureContainerAppsProvider.cs @@ -0,0 +1,96 @@ +using System.Threading.Tasks; +using JetBrains.Annotations; +using Microsoft.Extensions.Logging; +using Proto.Cluster.AzureContainerApps.Actors; +using Proto.Cluster.AzureContainerApps.Messages; +using Proto.Cluster.AzureContainerApps.Utils; +using Proto.DependencyInjection; + +namespace Proto.Cluster.AzureContainerApps.ClusterProviders; + +/// +/// A cluster provider that uses Azure Container Apps to host the cluster. +/// +[PublicAPI] +public class AzureContainerAppsProvider : IClusterProvider +{ + private readonly ILogger _logger; + private readonly string _advertisedHost; + + private string _address = default!; + private Cluster _cluster = default!; + private string _clusterName = default!; + private string[] _kinds = default!; + private int _port; + private PID _clusterMonitor = default!; + + /// + /// Use this constructor to create a new instance. + /// + /// The logger to use. + public AzureContainerAppsProvider(ILogger logger) + { + _logger = logger; + _advertisedHost = IPUtils.FindSmallestIpAddress().ToString(); + } + + /// + public Task StartMemberAsync(Cluster cluster) + { + var clusterName = cluster.Config.ClusterName; + var (_, port) = cluster.System.GetAddress(); + var kinds = cluster.GetClusterKinds(); + _cluster = cluster; + _clusterName = clusterName; + _port = port; + _kinds = kinds; + + // Start the monitor. + StartClusterMonitor(); + + // Register this member. + RegisterMember(); + + return Task.CompletedTask; + } + + /// + public Task StartClientAsync(Cluster cluster) + { + _cluster = cluster; + + // Start the monitor. + StartClusterMonitor(); + + return Task.CompletedTask; + } + + /// + public Task ShutdownAsync(bool graceful) + { + UnregisterMember(); + return Task.CompletedTask; + } + + private void StartClusterMonitor() + { + // Create props for the cluster monitor actor. + var props = _cluster.System.DI() + .PropsFor() + .WithGuardianSupervisorStrategy(Supervision.AlwaysRestartStrategy); + + // Spawn the cluster monitor actor. + _clusterMonitor = _cluster.System.Root.SpawnNamedSystem(props, "$aca-cluster-monitor"); + } + + private void RegisterMember() + { + // Send a message to register this member, which will also start the cluster monitor. + _cluster.System.Root.Send(_clusterMonitor, new RegisterMember(_clusterName, _advertisedHost, _port, _kinds, _cluster.System.Id)); + } + + private void UnregisterMember() + { + _cluster.System.Root.Send(_clusterMonitor, new UnregisterMember()); + } +} \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/IArmClientProvider.cs b/src/Proto.Cluster.AzureContainerApps/Contracts/IArmClientProvider.cs similarity index 89% rename from src/Proto.Cluster.AzureContainerApps/IArmClientProvider.cs rename to src/Proto.Cluster.AzureContainerApps/Contracts/IArmClientProvider.cs index 26cb308c65..d8dbc9bacb 100644 --- a/src/Proto.Cluster.AzureContainerApps/IArmClientProvider.cs +++ b/src/Proto.Cluster.AzureContainerApps/Contracts/IArmClientProvider.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Azure.ResourceManager; -namespace Proto.Cluster.AzureContainerApps; +namespace Proto.Cluster.AzureContainerApps.Contracts; /// /// Provides an instance. diff --git a/src/Proto.Cluster.AzureContainerApps/IClusterMemberStore.cs b/src/Proto.Cluster.AzureContainerApps/Contracts/IClusterMemberStore.cs similarity index 62% rename from src/Proto.Cluster.AzureContainerApps/IClusterMemberStore.cs rename to src/Proto.Cluster.AzureContainerApps/Contracts/IClusterMemberStore.cs index 560a011782..c2ac24c3cc 100644 --- a/src/Proto.Cluster.AzureContainerApps/IClusterMemberStore.cs +++ b/src/Proto.Cluster.AzureContainerApps/Contracts/IClusterMemberStore.cs @@ -1,8 +1,9 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using Proto.Cluster.AzureContainerApps.Models; -namespace Proto.Cluster.AzureContainerApps; +namespace Proto.Cluster.AzureContainerApps.Contracts; /// /// Represents a repository of members in a cluster. @@ -10,19 +11,19 @@ namespace Proto.Cluster.AzureContainerApps; public interface IClusterMemberStore { /// - /// Returns a list of all members in the cluster. + /// Returns a list of all members. /// /// The cancellation token. /// A list of all members in the cluster. - ValueTask> ListAsync(CancellationToken cancellationToken = default); + ValueTask> ListAsync(CancellationToken cancellationToken = default); /// - /// Registers a member in the cluster. + /// Registers a member. /// /// The name of the cluster. /// The member to register. /// The cancellation token. - ValueTask RegisterAsync(string clusterName, Member member, CancellationToken cancellationToken = default); + ValueTask RegisterAsync(string clusterName, Member member, CancellationToken cancellationToken = default); /// /// Unregisters a member from the cluster. @@ -30,9 +31,16 @@ public interface IClusterMemberStore /// The ID of the member to unregister. /// The cancellation token. ValueTask UnregisterAsync(string memberId, CancellationToken cancellationToken = default); - + + /// + /// Updates a member. + /// + /// The member to update. + /// The cancellation token. + ValueTask UpdateAsync(StoredMember storedMember, CancellationToken cancellationToken = default); + /// - /// Clears all members from the cluster. + /// Clears all members. /// /// The name of the cluster. /// The cancellation token. diff --git a/src/Proto.Cluster.AzureContainerApps/Contracts/IContainerAppMetadataAccessor.cs b/src/Proto.Cluster.AzureContainerApps/Contracts/IContainerAppMetadataAccessor.cs new file mode 100644 index 0000000000..24e65d234e --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/Contracts/IContainerAppMetadataAccessor.cs @@ -0,0 +1,16 @@ +using System.Threading; +using System.Threading.Tasks; +using Proto.Cluster.AzureContainerApps.Models; + +namespace Proto.Cluster.AzureContainerApps.Contracts; + +/// +/// Provides access to the container app metadata. +/// +public interface IContainerAppMetadataAccessor +{ + /// + /// Gets the container app metadata. + /// + ValueTask GetMetadataAsync(CancellationToken cancellationToken = default); +} \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/Contracts/ISystemClock.cs b/src/Proto.Cluster.AzureContainerApps/Contracts/ISystemClock.cs new file mode 100644 index 0000000000..e678baef7c --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/Contracts/ISystemClock.cs @@ -0,0 +1,14 @@ +using System; + +namespace Proto.Cluster.AzureContainerApps.Contracts; + +/// +/// Provides the current time in UTC. +/// +public interface ISystemClock +{ + /// + /// The current time in UTC. + /// + DateTimeOffset UtcNow { get; } +} \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/ServiceCollectionExtensions.cs b/src/Proto.Cluster.AzureContainerApps/Extensions/ServiceCollectionExtensions.cs similarity index 53% rename from src/Proto.Cluster.AzureContainerApps/ServiceCollectionExtensions.cs rename to src/Proto.Cluster.AzureContainerApps/Extensions/ServiceCollectionExtensions.cs index 6e798c40f6..8fc79f5a63 100644 --- a/src/Proto.Cluster.AzureContainerApps/ServiceCollectionExtensions.cs +++ b/src/Proto.Cluster.AzureContainerApps/Extensions/ServiceCollectionExtensions.cs @@ -1,13 +1,15 @@ using System; -using System.Diagnostics.CodeAnalysis; using Azure.ResourceManager; using JetBrains.Annotations; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; +using Proto.Cluster.AzureContainerApps.Actors; +using Proto.Cluster.AzureContainerApps.ClusterProviders; +using Proto.Cluster.AzureContainerApps.Contracts; +using Proto.Cluster.AzureContainerApps.Options; +using Proto.Cluster.AzureContainerApps.Services; using Proto.Cluster.AzureContainerApps.Stores.ResourceTags; +// ReSharper disable once CheckNamespace namespace Proto.Cluster.AzureContainerApps; /// @@ -21,26 +23,30 @@ public static class ServiceCollectionExtensions /// /// The service collection to add the provider to. /// An to create instances. + /// An optional configuration for the member store. /// An optional action to configure the provider options. - /// The service collection. - public static IServiceCollection AddAzureContainerAppsProvider(this IServiceCollection services, [CanBeNull] IArmClientProvider armClientProvider = default, [AllowNull] Action configure = null) + public static IServiceCollection AddAzureContainerAppsProvider(this IServiceCollection services, + [CanBeNull] IArmClientProvider armClientProvider = default, + [CanBeNull] Action configureMemberStore = null, + [CanBeNull] Action configure = null) { var configureOptions = configure ?? (_ => { }); services.Configure(configureOptions); services.ConfigureOptions(); services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddTransient(); if (armClientProvider != null) services.AddSingleton(armClientProvider); - // Register the default member store. - services.TryAddSingleton(sp => - { - var clientProvider = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var options = sp.GetRequiredService>().Value; - return new ResourceTagsClusterMemberStore(clientProvider, logger, options.ResourceGroupName, options.SubscriptionId); - }); + if (configureMemberStore != null) + // Add the custom member store + configureMemberStore.Invoke(services); + else + // Add the default member store + services.AddResourceTagsMemberStore(); return services; } diff --git a/src/Proto.Cluster.AzureContainerApps/Messages/RegisterMember.cs b/src/Proto.Cluster.AzureContainerApps/Messages/RegisterMember.cs new file mode 100644 index 0000000000..094bddabde --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/Messages/RegisterMember.cs @@ -0,0 +1,8 @@ +using System.Collections.Generic; + +namespace Proto.Cluster.AzureContainerApps.Messages; + +/// +/// Registers a member in the cluster. +/// +public record RegisterMember(string ClusterName, string Host, int Port, ICollection Kinds, string MemberId); \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/Messages/UnregisterMember.cs b/src/Proto.Cluster.AzureContainerApps/Messages/UnregisterMember.cs new file mode 100644 index 0000000000..9c3afc6861 --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/Messages/UnregisterMember.cs @@ -0,0 +1,6 @@ +namespace Proto.Cluster.AzureContainerApps.Messages; + +/// +/// Unregisters a member from the cluster. +/// +public record UnregisterMember; \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/Messages/UpdateMembers.cs b/src/Proto.Cluster.AzureContainerApps/Messages/UpdateMembers.cs new file mode 100644 index 0000000000..c322c1d8d5 --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/Messages/UpdateMembers.cs @@ -0,0 +1,6 @@ +namespace Proto.Cluster.AzureContainerApps.Messages; + +/// +/// Update the members in the cluster. +/// +public record UpdateMembers; \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/Models/ContainerAppMetadata.cs b/src/Proto.Cluster.AzureContainerApps/Models/ContainerAppMetadata.cs new file mode 100644 index 0000000000..6d3e27e73c --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/Models/ContainerAppMetadata.cs @@ -0,0 +1,9 @@ +namespace Proto.Cluster.AzureContainerApps.Models; + +/// +/// Contains information about the container app name, revision name and replica name. +/// +/// The name of the container app. +/// The name of the revision. +/// The name of the replica. +public record ContainerAppMetadata(string ContainerAppName, string RevisionName, string ReplicaName); \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/Models/StoredMember.cs b/src/Proto.Cluster.AzureContainerApps/Models/StoredMember.cs new file mode 100644 index 0000000000..027551701f --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/Models/StoredMember.cs @@ -0,0 +1,42 @@ +using System; +using System.Collections.Generic; +using System.Text.Json.Serialization; +using Proto.Cluster.AzureContainerApps.Contracts; + +namespace Proto.Cluster.AzureContainerApps.Models; + +/// +/// Represents a member in a cluster stored in a in a storable format. +/// +public record StoredMember(string Id, string Host, int Port, ICollection Kinds, string Cluster, DateTimeOffset CreatedAt, DateTimeOffset UpdatedAt) +{ + /// + /// Gets the address of the member. + /// + [JsonIgnore] + public string Address => Host + ":" + Port; + + /// + /// Creates a new instance of the class from the values in this instance. + /// + /// + public Member ToMember() => new() + { + Id = Id, + Host = Host, + Port = Port, + Kinds = { Kinds } + }; + + /// + /// Creates a new instance of the class from the values in the specified instance. + /// + /// The member to create a new instance from. + /// The name of the cluster. + /// The time the member was created. + /// A new instance of the class. + public static StoredMember FromMember(Member member, string cluster, DateTimeOffset createdAt) + { + return new StoredMember(member.Id, member.Host, member.Port, member.Kinds, cluster, createdAt, createdAt); + } +} \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/AzureContainerAppsProviderOptions.cs b/src/Proto.Cluster.AzureContainerApps/Options/AzureContainerAppsProviderOptions.cs similarity index 50% rename from src/Proto.Cluster.AzureContainerApps/AzureContainerAppsProviderOptions.cs rename to src/Proto.Cluster.AzureContainerApps/Options/AzureContainerAppsProviderOptions.cs index 74aecebcfb..05afd64834 100644 --- a/src/Proto.Cluster.AzureContainerApps/AzureContainerAppsProviderOptions.cs +++ b/src/Proto.Cluster.AzureContainerApps/Options/AzureContainerAppsProviderOptions.cs @@ -1,7 +1,9 @@ using System; using JetBrains.Annotations; +using Proto.Cluster.AzureContainerApps.ClusterProviders; +using Proto.Cluster.AzureContainerApps.Contracts; -namespace Proto.Cluster.AzureContainerApps; +namespace Proto.Cluster.AzureContainerApps.Options; /// /// Options for @@ -21,7 +23,13 @@ public class AzureContainerAppsProviderOptions public string ResourceGroupName { get; set; } = default!; /// - /// The interval to use for polling the for changes. + /// The interval at which to poll the cluster member store for changes. /// public TimeSpan PollInterval { get; set; } = TimeSpan.FromSeconds(5); + + /// + /// The time to live for a member in the store before it is considered stale and removed from the underlying . + /// The actual TTL is determined by the and the by adding them together. + /// + public TimeSpan MemberTimeToLive { get; set; } = TimeSpan.FromMinutes(1); } \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/ResourceTagsMemberStoreOptionsValidator.cs b/src/Proto.Cluster.AzureContainerApps/Options/AzureContainerAppsProviderOptionsValidator.cs similarity index 92% rename from src/Proto.Cluster.AzureContainerApps/ResourceTagsMemberStoreOptionsValidator.cs rename to src/Proto.Cluster.AzureContainerApps/Options/AzureContainerAppsProviderOptionsValidator.cs index 104637e99c..40bffe49f9 100644 --- a/src/Proto.Cluster.AzureContainerApps/ResourceTagsMemberStoreOptionsValidator.cs +++ b/src/Proto.Cluster.AzureContainerApps/Options/AzureContainerAppsProviderOptionsValidator.cs @@ -2,7 +2,7 @@ using Microsoft.Extensions.Options; using Proto.Cluster.AzureContainerApps.Stores.ResourceTags; -namespace Proto.Cluster.AzureContainerApps; +namespace Proto.Cluster.AzureContainerApps.Options; /// /// Validates the to ensure that the required options are provided. diff --git a/src/Proto.Cluster.AzureContainerApps/Proto.Cluster.AzureContainerApps.csproj b/src/Proto.Cluster.AzureContainerApps/Proto.Cluster.AzureContainerApps.csproj index 8a1fc04215..8c8b91ce0d 100644 --- a/src/Proto.Cluster.AzureContainerApps/Proto.Cluster.AzureContainerApps.csproj +++ b/src/Proto.Cluster.AzureContainerApps/Proto.Cluster.AzureContainerApps.csproj @@ -7,6 +7,7 @@ + diff --git a/src/Proto.Cluster.AzureContainerApps/Services/DefaultSystemClock.cs b/src/Proto.Cluster.AzureContainerApps/Services/DefaultSystemClock.cs new file mode 100644 index 0000000000..d68b966168 --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/Services/DefaultSystemClock.cs @@ -0,0 +1,11 @@ +using System; +using Proto.Cluster.AzureContainerApps.Contracts; + +namespace Proto.Cluster.AzureContainerApps.Services; + +/// +public class DefaultSystemClock : ISystemClock +{ + /// + public DateTimeOffset UtcNow => DateTimeOffset.UtcNow; +} \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/Services/EnvironmentContainerAppMetadataAccessor.cs b/src/Proto.Cluster.AzureContainerApps/Services/EnvironmentContainerAppMetadataAccessor.cs new file mode 100644 index 0000000000..fc5f896393 --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/Services/EnvironmentContainerAppMetadataAccessor.cs @@ -0,0 +1,28 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Proto.Cluster.AzureContainerApps.Contracts; +using Proto.Cluster.AzureContainerApps.Models; + +namespace Proto.Cluster.AzureContainerApps.Services; + +/// +/// Provides the container app metadata from the environment variables. +/// +public class EnvironmentContainerAppMetadataAccessor : IContainerAppMetadataAccessor +{ + /// + /// Returns the container app metadata from the environment variables. + /// + /// An optional cancellation token. + /// A instance. + /// Thrown if any of the required environment variables are not set. + public ValueTask GetMetadataAsync(CancellationToken cancellationToken = default) + { + var containerAppName = Environment.GetEnvironmentVariable("CONTAINER_APP_NAME") ?? throw new Exception("No app name provided"); + var revisionName = Environment.GetEnvironmentVariable("CONTAINER_APP_REVISION") ?? throw new Exception("No app revision provided"); + var replicaName = Environment.GetEnvironmentVariable("HOSTNAME") ?? throw new Exception("No replica name provided"); + + return new(new ContainerAppMetadata(containerAppName, revisionName, replicaName)); + } +} \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/Stores/Redis/IRedisConnectionMultiplexerProvider.cs b/src/Proto.Cluster.AzureContainerApps/Stores/Redis/IRedisConnectionMultiplexerProvider.cs new file mode 100644 index 0000000000..0b939261ad --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/Stores/Redis/IRedisConnectionMultiplexerProvider.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; +using StackExchange.Redis; + +namespace Proto.Cluster.AzureContainerApps.Stores.Redis; + +public interface IRedisConnectionMultiplexerProvider +{ + Task GetConnectionMultiplexerAsync(); +} \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/Stores/Redis/RedisClusterMemberStore.cs b/src/Proto.Cluster.AzureContainerApps/Stores/Redis/RedisClusterMemberStore.cs new file mode 100644 index 0000000000..10d5b629d3 --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/Stores/Redis/RedisClusterMemberStore.cs @@ -0,0 +1,85 @@ +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using JetBrains.Annotations; +using Proto.Cluster.AzureContainerApps.Contracts; +using Proto.Cluster.AzureContainerApps.Models; +using StackExchange.Redis; + +namespace Proto.Cluster.AzureContainerApps.Stores.Redis; + +/// +/// Stores cluster member information in Redis database. +/// +[PublicAPI] +public class RedisClusterMemberStore : IClusterMemberStore +{ + private readonly IRedisConnectionMultiplexerProvider _connectionMultiplexerProvider; + private readonly ISystemClock _systemClock; + private const string ClusterKey = "proto:cluster:members"; + [CanBeNull] private IDatabase _database; + + /// + /// Initializes a new instance of the class. + /// + public RedisClusterMemberStore(IRedisConnectionMultiplexerProvider connectionMultiplexerProvider, ISystemClock systemClock) + { + _connectionMultiplexerProvider = connectionMultiplexerProvider; + _systemClock = systemClock; + } + + /// + public async ValueTask> ListAsync(CancellationToken cancellationToken = default) + { + var database = await GetDatabaseAsync(); + var entries = await database.HashGetAllAsync(ClusterKey); + return entries.Select(entry => Deserialize(entry.Value!)).ToList(); + } + + /// + public async ValueTask RegisterAsync(string clusterName, Member member, CancellationToken cancellationToken = default) + { + var storedMember = StoredMember.FromMember(member, clusterName, _systemClock.UtcNow); + var serialized = Serialize(storedMember); + var database = await GetDatabaseAsync(); + + await database.HashSetAsync(ClusterKey, member.Id, serialized); + return storedMember; + } + + /// + public async ValueTask UnregisterAsync(string memberId, CancellationToken cancellationToken = default) + { + var database = await GetDatabaseAsync(); + await database.HashDeleteAsync(ClusterKey, memberId); + } + + /// + public async ValueTask UpdateAsync(StoredMember storedMember, CancellationToken cancellationToken = default) + { + var serialized = Serialize(storedMember); + var database = await GetDatabaseAsync(); + await database.HashSetAsync(ClusterKey, storedMember.Id, serialized); + } + + /// + public async ValueTask ClearAsync(string clusterName, CancellationToken cancellationToken = default) + { + var database = await GetDatabaseAsync(); + await database.KeyDeleteAsync(ClusterKey); + } + + private async Task GetDatabaseAsync() + { + if (_database is not null) + return _database; + + var connectionMultiplexer = await _connectionMultiplexerProvider.GetConnectionMultiplexerAsync(); + return _database = connectionMultiplexer.GetDatabase(); + } + + private static string Serialize(StoredMember member) => JsonSerializer.Serialize(member); + private static StoredMember Deserialize(string data) => JsonSerializer.Deserialize(data)!; +} \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/Stores/Redis/RedisConnectionMultiplexerProvider.cs b/src/Proto.Cluster.AzureContainerApps/Stores/Redis/RedisConnectionMultiplexerProvider.cs new file mode 100644 index 0000000000..ec899035d8 --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/Stores/Redis/RedisConnectionMultiplexerProvider.cs @@ -0,0 +1,21 @@ +using System.Threading.Tasks; +using JetBrains.Annotations; +using StackExchange.Redis; + +namespace Proto.Cluster.AzureContainerApps.Stores.Redis; + +public class RedisConnectionMultiplexerProvider : IRedisConnectionMultiplexerProvider +{ + private readonly string _connectionString; + [CanBeNull] private ConnectionMultiplexer _connectionMultiplexer; + + public RedisConnectionMultiplexerProvider(string connectionString) + { + _connectionString = connectionString; + } + + public async Task GetConnectionMultiplexerAsync() + { + return _connectionMultiplexer ??= await ConnectionMultiplexer.ConnectAsync(_connectionString); + } +} \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/Stores/Redis/ServiceCollectionExtensions.cs b/src/Proto.Cluster.AzureContainerApps/Stores/Redis/ServiceCollectionExtensions.cs new file mode 100644 index 0000000000..c5b5173986 --- /dev/null +++ b/src/Proto.Cluster.AzureContainerApps/Stores/Redis/ServiceCollectionExtensions.cs @@ -0,0 +1,25 @@ +using JetBrains.Annotations; +using Microsoft.Extensions.DependencyInjection; +using Proto.Cluster.AzureContainerApps.Contracts; + +namespace Proto.Cluster.AzureContainerApps.Stores.Redis; + +/// +/// Adds extension methods to for registering the Azure Container Apps provider +/// +[PublicAPI] +public static class ServiceCollectionExtensions +{ + /// + /// Adds the to the service collection. + /// + /// The service collection to add the provider to. + /// Connection string for the Redis client. + /// The service collection. + public static IServiceCollection AddRedisClusterMemberStore(this IServiceCollection services, string connectionString) + { + services.AddSingleton(new RedisConnectionMultiplexerProvider(connectionString)); + services.AddSingleton(); + return services; + } +} \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/ResourceTagNames.cs b/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/ResourceTagNames.cs index bfa965e35d..5139391f0f 100644 --- a/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/ResourceTagNames.cs +++ b/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/ResourceTagNames.cs @@ -9,19 +9,19 @@ public static class ResourceTagNames /// The prefix for the tag name. /// public const string NamePrefix = "proto.cluster:member:"; - + /// /// The prefix for the tag name. /// public const string KindPrefix = "kind:"; - + /// /// Gets the prefixed name for the given member ID. /// /// The member ID. /// The prefixed name. public static string Prefix(string memberId) => $"{NamePrefix}{memberId}"; - + /// /// Gets the prefixed name for the given member ID and name. /// diff --git a/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/ResourceTagsClusterMemberStore.cs b/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/ResourceTagsClusterMemberStore.cs index 1d00e5c19a..5d150eff64 100644 --- a/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/ResourceTagsClusterMemberStore.cs +++ b/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/ResourceTagsClusterMemberStore.cs @@ -10,6 +10,8 @@ using JetBrains.Annotations; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using Proto.Cluster.AzureContainerApps.Contracts; +using Proto.Cluster.AzureContainerApps.Models; namespace Proto.Cluster.AzureContainerApps.Stores.ResourceTags; @@ -19,6 +21,7 @@ namespace Proto.Cluster.AzureContainerApps.Stores.ResourceTags; [PublicAPI] public class ResourceTagsClusterMemberStore : IClusterMemberStore { + private readonly ISystemClock _systemClock; private readonly IArmClientProvider _armClientProvider; private readonly ILogger _logger; private readonly string _containerAppName; @@ -32,11 +35,14 @@ public class ResourceTagsClusterMemberStore : IClusterMemberStore /// /// The to use. /// The options for this store. + /// The to use. /// The logger to use. public ResourceTagsClusterMemberStore( IArmClientProvider armArmClientProvider, IOptions options, - ILogger logger) : this(armArmClientProvider, logger, options.Value.ResourceGroupName, options.Value.SubscriptionId) + ISystemClock systemClock, + ILogger logger) : + this(armArmClientProvider, systemClock, logger, options.Value.ResourceGroupName, options.Value.SubscriptionId) { } @@ -44,16 +50,19 @@ public ResourceTagsClusterMemberStore( /// Initializes a new instance of the class. /// /// The to use. + /// The to use. /// The logger to use. /// The name of the resource group. /// The subscription ID. internal ResourceTagsClusterMemberStore( IArmClientProvider armArmClientProvider, + ISystemClock systemClock, ILogger logger, string resourceGroupName, [CanBeNull] string subscriptionId = default) { _armClientProvider = armArmClientProvider; + _systemClock = systemClock; _logger = logger; _resourceGroupName = resourceGroupName; _subscriptionId = subscriptionId; @@ -61,9 +70,9 @@ internal ResourceTagsClusterMemberStore( } /// - public async ValueTask> ListAsync(CancellationToken cancellationToken = default) + public async ValueTask> ListAsync(CancellationToken cancellationToken = default) { - var members = new List(); + var storedMembers = new List(); var resourceGroupName = _resourceGroupName; var containerAppName = _containerAppName; var containerApp = await GetContainerAppAsync(cancellationToken).ConfigureAwait(false); @@ -71,7 +80,7 @@ public async ValueTask> ListAsync(CancellationToken cancella if (containerApp == null) { _logger.LogError("Resource: {ResourceName} in resource group: {ResourceGroup} is not found", containerAppName, resourceGroupName); - return members.ToArray(); + return storedMembers.ToArray(); } // Get the app container managed environment in order to get the other container apps. @@ -101,16 +110,19 @@ public async ValueTask> ListAsync(CancellationToken cancella .Select(x => x.Value); member.Kinds.AddRange(kinds); - members.Add(member); + var storedMember = StoredMember.FromMember(member, taggedMember.Cluster, DateTimeOffset.UtcNow); + storedMembers.Add(storedMember); } - return members.ToArray(); + return storedMembers.ToArray(); } /// - public async ValueTask RegisterAsync(string clusterName, Member member, CancellationToken cancellationToken = default) + public async ValueTask RegisterAsync(string clusterName, Member member, CancellationToken cancellationToken = default) { - var taggedMember = new TaggedMember(member.Host, member.Port, clusterName); + var now = _systemClock.UtcNow; + var storedMember = StoredMember.FromMember(member, clusterName, now); + var taggedMember = new TaggedMember(member.Host, member.Port, clusterName, now, now); var serializedTaggedMember = Serialize(taggedMember); var tags = new Dictionary @@ -123,12 +135,14 @@ public async ValueTask RegisterAsync(string clusterName, Member member, Cancella try { - await AddMemberTags(tags, cancellationToken).ConfigureAwait(false); + await SetMemberTags(tags, cancellationToken).ConfigureAwait(false); } catch (Exception x) { _logger.LogError(x, "Failed to update metadata"); } + + return storedMember; } /// @@ -149,6 +163,27 @@ public async ValueTask UnregisterAsync(string memberId, CancellationToken cancel await containerApp.SetTagsAsync(existingTags, cancellationToken).ConfigureAwait(false); } + /// + public async ValueTask UpdateAsync(StoredMember storedMember, CancellationToken cancellationToken = default) + { + var taggedMember = new TaggedMember(storedMember.Host, storedMember.Port, storedMember.Cluster, storedMember.CreatedAt, _systemClock.UtcNow); + var serializedTaggedMember = Serialize(taggedMember); + + var tags = new Dictionary + { + [ResourceTagNames.Prefix(storedMember.Id)] = serializedTaggedMember + }; + + try + { + await SetMemberTags(tags, cancellationToken).ConfigureAwait(false); + } + catch (Exception x) + { + _logger.LogError(x, "Failed to update metadata"); + } + } + /// public async ValueTask ClearAsync(string clusterName, CancellationToken cancellationToken = default) { @@ -167,7 +202,7 @@ public async ValueTask ClearAsync(string clusterName, CancellationToken cancella await containerApp.SetTagsAsync(existingTags, cancellationToken).ConfigureAwait(false); } - private async Task AddMemberTags(Dictionary newTags, CancellationToken cancellationToken) + private async Task SetMemberTags(Dictionary newTags, CancellationToken cancellationToken) { var containerApp = await GetContainerAppAsync(cancellationToken).ConfigureAwait(false); diff --git a/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/ServiceCollectionExtensions.cs b/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/ServiceCollectionExtensions.cs index 1af9bd5dcd..64934283e6 100644 --- a/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/ServiceCollectionExtensions.cs +++ b/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/ServiceCollectionExtensions.cs @@ -1,15 +1,16 @@ using System; using System.Diagnostics.CodeAnalysis; -using JetBrains.Annotations; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Proto.Cluster.AzureContainerApps.Contracts; +using Proto.Cluster.AzureContainerApps.Options; namespace Proto.Cluster.AzureContainerApps.Stores.ResourceTags; /// /// Adds extension methods to for registering the Azure Container Apps provider /// -[PublicAPI] public static class ServiceCollectionExtensions { /// @@ -18,12 +19,20 @@ public static class ServiceCollectionExtensions /// The service collection to add the provider to. /// An optional action to configure the provider options. /// The service collection. - public static IServiceCollection AddResourceTagsMemberStore(this IServiceCollection services, [AllowNull]Action configure = null) + public static IServiceCollection AddResourceTagsMemberStore(this IServiceCollection services, [AllowNull] Action configure = null) { var configureOptions = configure ?? (_ => { }); services.Configure(configureOptions); services.ConfigureOptions(); - services.Replace(new ServiceDescriptor(typeof(IClusterMemberStore), typeof(ResourceTagsClusterMemberStore), ServiceLifetime.Singleton)); + + services.AddSingleton(sp => + { + var clientProvider = sp.GetRequiredService(); + var logger = sp.GetRequiredService>(); + var systemClock = sp.GetRequiredService(); + var options = sp.GetRequiredService>().Value; + return new ResourceTagsClusterMemberStore(clientProvider, systemClock, logger, options.ResourceGroupName, options.SubscriptionId); + }); return services; } diff --git a/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/TaggedMember.cs b/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/TaggedMember.cs index f78e7f97e6..f214ce3522 100644 --- a/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/TaggedMember.cs +++ b/src/Proto.Cluster.AzureContainerApps/Stores/ResourceTags/TaggedMember.cs @@ -1,6 +1,8 @@ +using System; + namespace Proto.Cluster.AzureContainerApps.Stores.ResourceTags; /// /// A member with a cluster name. /// -public record TaggedMember(string Host, int Port, string Cluster); \ No newline at end of file +public record TaggedMember(string Host, int Port, string Cluster, DateTimeOffset CreatedAt, DateTimeOffset UpdatedAt); \ No newline at end of file diff --git a/src/Proto.Cluster.AzureContainerApps/ArmClientProviders.cs b/src/Proto.Cluster.AzureContainerApps/Utils/ArmClientProviders.cs similarity index 73% rename from src/Proto.Cluster.AzureContainerApps/ArmClientProviders.cs rename to src/Proto.Cluster.AzureContainerApps/Utils/ArmClientProviders.cs index 4023a992f2..128387e2e3 100644 --- a/src/Proto.Cluster.AzureContainerApps/ArmClientProviders.cs +++ b/src/Proto.Cluster.AzureContainerApps/Utils/ArmClientProviders.cs @@ -1,7 +1,9 @@ using Azure.ResourceManager; using JetBrains.Annotations; +using Proto.Cluster.AzureContainerApps.ArmClientProviders; +using Proto.Cluster.AzureContainerApps.Contracts; -namespace Proto.Cluster.AzureContainerApps; +namespace Proto.Cluster.AzureContainerApps.Utils; /// /// Provides an instance. diff --git a/src/Proto.Cluster.AzureContainerApps/IPAddressUtils.cs b/src/Proto.Cluster.AzureContainerApps/Utils/IPUtils.cs similarity index 61% rename from src/Proto.Cluster.AzureContainerApps/IPAddressUtils.cs rename to src/Proto.Cluster.AzureContainerApps/Utils/IPUtils.cs index 9b708d002a..77a62a38e8 100644 --- a/src/Proto.Cluster.AzureContainerApps/IPAddressUtils.cs +++ b/src/Proto.Cluster.AzureContainerApps/Utils/IPUtils.cs @@ -5,17 +5,29 @@ using System.Net.Sockets; using JetBrains.Annotations; -namespace Proto.Cluster.AzureContainerApps; +namespace Proto.Cluster.AzureContainerApps.Utils; -public static class IPAddressUtils +/// +/// Utility methods for configuration. +/// +public static class IPUtils { - public static IPAddress FindSmallestIpAddress(AddressFamily family = AddressFamily.InterNetwork) + /// + /// Finds the smallest IP address on the machine. + /// + /// The address family to look for. + /// Whether to include loopback addresses. + public static IPAddress FindSmallestIpAddress(AddressFamily family = AddressFamily.InterNetwork, bool includeLoopback = false) { - var addressCandidates = NetworkInterface.GetAllNetworkInterfaces() + var addressCandidatesQuery = NetworkInterface.GetAllNetworkInterfaces() .Where(nif => nif.OperationalStatus == OperationalStatus.Up) .SelectMany(nif => nif.GetIPProperties().UnicastAddresses.Select(a => a.Address)) - .Where(addr => addr.AddressFamily == family && !IPAddress.IsLoopback(addr)) - .ToList(); + .Where(addr => addr.AddressFamily == family); + + if (!includeLoopback) + addressCandidatesQuery = addressCandidatesQuery.Where(addr => !IPAddress.IsLoopback(addr)); + + var addressCandidates = addressCandidatesQuery.ToList(); return PickSmallestIpAddress(addressCandidates); }