Skip to content

Commit

Permalink
ACA Cluster Provider Improvements (#2036)
Browse files Browse the repository at this point in the history
* Add Redis store and manage stale members

* Fix ListAsync method of Redis store

* Expire current member immediately if replica inactive

* Use configured cluster host instead of always finding smallest IP

This ensures that whatever the RemoteConfig is bound to, is used.
  • Loading branch information
sfmskywalker authored Sep 1, 2023
1 parent f3e874f commit 32fc17b
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private async Task UpdateMembersAsync()
var expiredMembers = storedMembers.Where(m => m.UpdatedAt + ttl < now).ToList();

LogStoredMembers(storedMembers);
await UpdateCurrentMember(activeMembers, now);
await UpdateCurrentMember(activeMembers, expiredMembers, now);
await RemoveExpiredMembers(expiredMembers);

UpdateClusterTopology(activeMembers);
Expand All @@ -174,7 +174,7 @@ private void LogStoredMembers(ICollection<StoredMember> storedMembers)
_logger.LogWarning("Did not get any members from {Store}", _clusterMemberStore.GetType().Name);
}

private async Task UpdateCurrentMember(ICollection<StoredMember> activeMembers, DateTimeOffset now)
private async Task UpdateCurrentMember(ICollection<StoredMember> activeMembers, ICollection<StoredMember> expiredMembers, DateTimeOffset now)
{
var currentMember = activeMembers.FirstOrDefault(m => m.Id == _memberId);

Expand All @@ -187,6 +187,7 @@ private async Task UpdateCurrentMember(ICollection<StoredMember> activeMembers,
{
_logger.LogInformation("Revision {RevisionName} is not active", revisionName);
activeMembers.Remove(currentMember);
expiredMembers.Add(currentMember);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using Microsoft.Extensions.Logging;
using Proto.Cluster.AzureContainerApps.Actors;
using Proto.Cluster.AzureContainerApps.Messages;
using Proto.Cluster.AzureContainerApps.Utils;
using Proto.DependencyInjection;

namespace Proto.Cluster.AzureContainerApps.ClusterProviders;
Expand All @@ -15,12 +14,12 @@ namespace Proto.Cluster.AzureContainerApps.ClusterProviders;
public class AzureContainerAppsProvider : IClusterProvider
{
private readonly ILogger _logger;
private readonly string _advertisedHost;

private string _address = default!;
private Cluster _cluster = default!;
private string _clusterName = default!;
private string[] _kinds = default!;
private string _host = default!;
private int _port;
private PID _clusterMonitor = default!;

Expand All @@ -31,17 +30,17 @@ public class AzureContainerAppsProvider : IClusterProvider
public AzureContainerAppsProvider(ILogger<AzureContainerAppsProvider> logger)
{
_logger = logger;
_advertisedHost = IPUtils.FindSmallestIpAddress().ToString();
}

/// <inheritdoc />
public Task StartMemberAsync(Cluster cluster)
{
var clusterName = cluster.Config.ClusterName;
var (_, port) = cluster.System.GetAddress();
var (host, port) = cluster.System.GetAddress();
var kinds = cluster.GetClusterKinds();
_cluster = cluster;
_clusterName = clusterName;
_host = host;
_port = port;
_kinds = kinds;

Expand Down Expand Up @@ -86,7 +85,7 @@ private void StartClusterMonitor()
private void RegisterMember()
{
// Send a message to register this member, which will also start the cluster monitor.
_cluster.System.Root.Send(_clusterMonitor, new RegisterMember(_clusterName, _advertisedHost, _port, _kinds, _cluster.System.Id));
_cluster.System.Root.Send(_clusterMonitor, new RegisterMember(_clusterName, _host, _port, _kinds, _cluster.System.Id));
}

private void UnregisterMember()
Expand Down

0 comments on commit 32fc17b

Please sign in to comment.