Skip to content

Commit

Permalink
Add missing fields to CLUSTER NODES (microsoft#588)
Browse files Browse the repository at this point in the history
* added connection info object to re-use across instance metrics calls

* remove unused file ref and simplify connection stats for replica

* add support for missing CLUSTER NODES fields
  • Loading branch information
vazois authored Aug 13, 2024
1 parent 38b5805 commit d6b01c0
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 33 deletions.
26 changes: 18 additions & 8 deletions libs/cluster/Server/ClusterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -469,20 +469,25 @@ private static void slotBitmapSetBit(ref byte[] bitmap, int pos)
/// Get formatted (using CLUSTER NODES format) cluster info.
/// </summary>
/// <returns>Formatted string.</returns>
public string GetClusterInfo()
public string GetClusterInfo(ClusterProvider clusterProvider)
{
var nodes = "";
// Worker indices run from 1 to NumWorkers inclusive; one formatted entry is appended per worker.
for (ushort i = 1; i <= NumWorkers; i++)
nodes += GetNodeInfo(i);
{
// Start from a default ConnectionInfo so an entry is still produced when
// clusterProvider or its clusterManager is null and the lookup below is skipped.
var info = default(ConnectionInfo);
// The boolean result is deliberately discarded; only the out value is used.
_ = clusterProvider?.clusterManager?.GetConnectionInfo(workers[i].Nodeid, out info);
nodes += GetNodeInfo(i, info);
}
return nodes;
}

/// <summary>
/// Get formatted (using CLUSTER NODES format) worker info.
/// </summary>
/// <param name="workerId">Offset of worker in the worker list.</param>
/// <param name="info">Connection information for the corresponding worker.</param>
/// <returns>Formatted string.</returns>
public string GetNodeInfo(ushort workerId)
public string GetNodeInfo(ushort workerId, ConnectionInfo info)
{
//<id>
//<ip:port@cport[,hostname[,auxiliary_field=value]*]>
Expand All @@ -498,10 +503,10 @@ public string GetNodeInfo(ushort workerId)
$"{workers[workerId].Address}:{workers[workerId].Port}@{workers[workerId].Port + 10000},{workers[workerId].hostname} " +
$"{(workerId == 1 ? "myself," : "")}{(workers[workerId].Role == NodeRole.PRIMARY ? "master" : "slave")} " +
$"{(workers[workerId].Role == NodeRole.REPLICA ? workers[workerId].ReplicaOfNodeId : "-")} " +
$"0 " +
$"0 " +
$"{info.ping} " +
$"{info.pong} " +
$"{workers[workerId].ConfigEpoch} " +
$"connected" +
$"{(info.connected || workerId == 1 ? "connected" : "disconnected")}" +
$"{GetSlotRange(workerId)}" +
$"{GetSpecialStates(workerId)}\n";
}
Expand Down Expand Up @@ -740,15 +745,20 @@ private List<int> GetSlotList(ushort workerId)
/// Get Replicas for node-id.
/// </summary>
/// <param name="nodeid">Node-id string.</param>
/// <param name="clusterProvider">ClusterProvider instance.</param>
/// <returns></returns>
public List<string> GetReplicas(string nodeid)
public List<string> GetReplicas(string nodeid, ClusterProvider clusterProvider)
{
List<string> replicas = [];
// Scan every worker slot from index 1 and keep those whose ReplicaOfNodeId matches nodeid (case-insensitive).
for (ushort i = 1; i < workers.Length; i++)
{
var replicaOf = workers[i].ReplicaOfNodeId;
if (replicaOf != null && replicaOf.Equals(nodeid, StringComparison.OrdinalIgnoreCase))
replicas.Add(GetNodeInfo(i));
{
// Default value is used as-is when clusterProvider or its clusterManager is null.
var info = default(ConnectionInfo);
// NOTE(review): this looks up connection info for replicaOf (the primary's node-id), but the
// result is used to format worker i (the replica). GetClusterInfo passes workers[i].Nodeid
// for the same purpose - confirm whether workers[i].Nodeid was intended here.
_ = clusterProvider?.clusterManager?.GetConnectionInfo(replicaOf, out info);
replicas.Add(GetNodeInfo(i, info));
}
}
return replicas;
}
Expand Down
3 changes: 1 addition & 2 deletions libs/cluster/Server/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,8 @@ public unsafe ClusterManager(ClusterProvider clusterProvider, ILogger logger = n
}

clusterConnectionStore = new GarnetClusterConnectionStore(logger: logger);

InitLocal(address, opts.Port, recoverConfig);
logger?.LogInformation("{NodeInfoStartup}", CurrentConfig.GetClusterInfo().TrimEnd('\n'));
logger?.LogInformation("{NodeInfoStartup}", CurrentConfig.GetClusterInfo(clusterProvider).TrimEnd('\n'));
gossipDelay = TimeSpan.FromSeconds(opts.GossipDelay);
clusterTimeout = opts.ClusterTimeout <= 0 ? Timeout.InfiniteTimeSpan : TimeSpan.FromSeconds(opts.ClusterTimeout);
numActiveTasks = 0;
Expand Down
7 changes: 4 additions & 3 deletions libs/cluster/Server/ClusterManagerWorkerState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,12 @@ public bool TryAddReplica(string nodeid, bool force, out ReadOnlySpan<byte> erro
/// <summary>
/// List replicas of specified primary with given nodeid
/// </summary>
/// <param name="nodeid"></param>
public List<string> ListReplicas(string nodeid)
/// <param name="nodeid"> Node-id string</param>
/// <param name="clusterProvider">ClusterProvider instance</param>
/// <returns>The list produced by GetReplicas on the current config for the given node-id.</returns>
public List<string> ListReplicas(string nodeid, ClusterProvider clusterProvider)
{
// Read CurrentConfig once into a local and query that instance.
var current = CurrentConfig;
return current.GetReplicas(nodeid);
return current.GetReplicas(nodeid, clusterProvider);
}
}
}
21 changes: 21 additions & 0 deletions libs/cluster/Server/ConnectionInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

namespace Garnet.cluster
{
/// <summary>
/// Plain value holder describing the state of a connection to another cluster node.
/// All members start out zeroed / false.
/// </summary>
internal struct ConnectionInfo
{
    /// <summary>Value reported as the "ping" field for the node (0 when unset).</summary>
    public long ping;

    /// <summary>Value reported as the "pong" field for the node (0 when unset).</summary>
    public long pong;

    /// <summary>True when the connection is reported as connected.</summary>
    public bool connected;

    /// <summary>Last I/O indicator for the connection (0 when unset).</summary>
    public long lastIO;

    /// <summary>
    /// Create an instance with every member reset to its zero value.
    /// </summary>
    public ConnectionInfo()
    {
        ping = pong = lastIO = 0L;
        connected = false;
    }
}
}
15 changes: 5 additions & 10 deletions libs/cluster/Server/GarnetClusterConnectionStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private bool UnsafeGetConnection(string nodeId, out GarnetServerNode conn)
{
conn = null;
if (_disposed) return false;
for (int i = 0; i < numConnection; i++)
for (var i = 0; i < numConnection; i++)
{
var _conn = connections[i];
if (_conn.NodeId.Equals(nodeId, StringComparison.OrdinalIgnoreCase))
Expand Down Expand Up @@ -233,24 +233,19 @@ public bool GetRandomConnection(out GarnetServerNode conn)
/// Populate metrics related to link connection status.
/// </summary>
/// <param name="nodeId">Node-id to search for.</param>
/// <param name="linkStatus">Metrics info to retrieve for connection.</param>
/// <param name="info">Connection info corresponding to specified connection.</param>
/// <returns></returns>
public bool GetConnectionInfo(string nodeId, ref MetricsItem[] linkStatus)
public bool GetConnectionInfo(string nodeId, out ConnectionInfo info)
{
try
{
_lock.ReadLock();
if (UnsafeGetConnection(nodeId, out var conn))
{
var nowTicks = DateTimeOffset.UtcNow.Ticks;
var last_io_seconds = conn.GossipRecv == -1 ? -1 : nowTicks - conn.GossipSend;
last_io_seconds = last_io_seconds < 0 ? 0 : (int)TimeSpan.FromTicks(last_io_seconds).TotalSeconds;
var connection_status = conn.IsConnected ? "up" : "down";
linkStatus[0] = new("master_link_status", connection_status);
linkStatus[1] = new("master_last_io_seconds_ago", last_io_seconds.ToString());
info = conn.GetConnectionInfo();
return true;
}

info = new ConnectionInfo();
}
finally
{
Expand Down
17 changes: 17 additions & 0 deletions libs/cluster/Server/GarnetServerNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ internal sealed class GarnetServerNode
{
readonly ClusterProvider clusterProvider;
readonly GarnetClient gc;

long gossip_send;
long gossip_recv;
private CancellationTokenSource cts = new();
Expand Down Expand Up @@ -67,6 +68,8 @@ public GarnetServerNode(ClusterProvider clusterProvider, string address, int por
logger: logger);
this.initialized = 0;
this.logger = logger;
this.gossip_recv = 0;
this.gossip_send = 0;
ResetCts();
}

Expand Down Expand Up @@ -224,5 +227,19 @@ private Task Gossip(byte[] configByteArray)
}
}, TaskContinuationOptions.OnlyOnRanToCompletion).WaitAsync(clusterProvider.clusterManager.gossipDelay, cts.Token);
}

/// <summary>
/// Build a snapshot of this connection's gossip timestamps and link state.
/// </summary>
/// <returns>
/// A <see cref="ConnectionInfo"/> whose ping/pong carry GossipSend/GossipRecv, whose connected flag
/// mirrors IsConnected, and whose lastIO is the whole number of seconds elapsed since GossipSend
/// (0 when no gossip response has been recorded yet).
/// </returns>
public ConnectionInfo GetConnectionInfo()
{
    var nowTicks = DateTimeOffset.UtcNow.Ticks;

    // GossipRecv == 0 means nothing has been received yet, so report 0 seconds.
    long lastIoSeconds = 0;
    if (GossipRecv != 0)
    {
        // Clamp at zero so a send timestamp that is ahead of "now" never yields a negative value.
        var elapsedTicks = Math.Max(0L, nowTicks - GossipSend);

        // lastIO is a long, so convert to long rather than int; a double-to-int conversion is
        // out of range for large elapsed values.
        lastIoSeconds = (long)TimeSpan.FromTicks(elapsedTicks).TotalSeconds;
    }

    return new ConnectionInfo()
    {
        ping = GossipSend,
        pong = GossipRecv,
        connected = IsConnected,
        lastIO = lastIoSeconds,
    };
}
}
}
17 changes: 11 additions & 6 deletions libs/cluster/Server/Gossip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ internal sealed partial class ClusterManager : IDisposable
readonly int GossipSamplePercent;

public TimeSpan GetClusterTimeout() => clusterTimeout;
readonly ConcurrentDictionary<string, long> workerBanList = new ConcurrentDictionary<string, long>();
readonly ConcurrentDictionary<string, long> workerBanList = new();
public readonly CancellationTokenSource ctsGossip = new();

public List<string> GetBanList()
Expand All @@ -127,22 +127,27 @@ public List<string> GetBanList()
return banlist;
}

/// <summary>
/// Retrieve connection info for the given node by delegating to the cluster connection store.
/// </summary>
/// <param name="nodeId">Node-id to search for.</param>
/// <param name="info">Connection info written by the connection store.</param>
/// <returns>The result returned by the connection store lookup.</returns>
public bool GetConnectionInfo(string nodeId, out ConnectionInfo info)
{
    return clusterConnectionStore.GetConnectionInfo(nodeId, out info);
}

/// <summary>
/// Get link status info for primary of this node.
/// </summary>
/// <param name="config">Snapshot of config to use for retrieving that information.</param>
/// <returns>MetricsItem array of all the associated info.</returns>
public MetricsItem[] GetPrimaryLinkStatus(ClusterConfig config)
{
// info starts zeroed (connected == false, lastIO == 0), so the metrics below read
// "down" / "0" when there is no primary id to look up.
ConnectionInfo info = new();
var primaryId = config.LocalNodePrimaryId;

// Only query the connection store when this node has a primary id; the boolean result is discarded.
if (primaryId != null)
_ = clusterConnectionStore.GetConnectionInfo(primaryId, out info);

var primaryLinkStatus = new MetricsItem[2]
{
new("master_link_status", "down"),
new("master_last_io_seconds_ago", "0")
new("master_link_status", info.connected ? "up" : "down"),
new("master_last_io_seconds_ago", info.lastIO.ToString())
};

if (primaryId != null)
_ = clusterConnectionStore.GetConnectionInfo(primaryId, ref primaryLinkStatus);
return primaryLinkStatus;
}

Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Session/RespClusterBasicCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ private bool NetworkClusterNodes(int count, out bool invalidParameters)

var ptr = recvBufferPtr + readHead;
readHead = (int)(ptr - recvBufferPtr);
var nodes = clusterProvider.clusterManager.CurrentConfig.GetClusterInfo();
var nodes = clusterProvider.clusterManager.CurrentConfig.GetClusterInfo(clusterProvider);
while (!RespWriteUtils.WriteAsciiBulkString(nodes, ref dcurr, dend))
SendAndReset();

Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Session/RespClusterReplicationCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private bool NetworkClusterReplicas(int count, out bool invalidParameters)
if (!RespReadUtils.ReadStringWithLengthHeader(out var nodeid, ref ptr, recvBufferPtr + bytesRead))
return false;
readHead = (int)(ptr - recvBufferPtr);
var replicas = clusterProvider.clusterManager.ListReplicas(nodeid);
var replicas = clusterProvider.clusterManager.ListReplicas(nodeid, clusterProvider);

while (!RespWriteUtils.WriteArrayLength(replicas.Count, ref dcurr, dend))
SendAndReset();
Expand Down
2 changes: 0 additions & 2 deletions playground/ClusterStress/ClusterStress.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="..\..\libs\cluster\Server\ClusterConfig.cs" Link="ClusterConfig.cs" />
<Compile Include="..\..\libs\cluster\Server\ClusterConfigSerializer.cs" Link="ClusterConfigSerializer.cs" />
<Compile Include="..\..\libs\cluster\Server\Worker.cs" Link="Worker.cs" />
<Compile Include="..\..\libs\cluster\Server\HashSlot.cs" Link="HashSlot.cs" />
<Compile Include="..\..\benchmark\Resp.benchmark\Options.cs" Link="Options.cs" />
Expand Down

0 comments on commit d6b01c0

Please sign in to comment.