feat: add stale connection stats tracking and varz exposure

This commit is contained in:
Joseph Doherty
2026-02-23 00:38:43 -05:00
parent eb25d52ed5
commit cd4ae3cce6
5 changed files with 64 additions and 0 deletions

View File

@@ -157,6 +157,12 @@ public sealed class Varz
[JsonPropertyName("slow_consumer_stats")]
public SlowConsumersStats SlowConsumerStats { get; set; } = new();
[JsonPropertyName("stale_connections")]
public long StaleConnections { get; set; }
[JsonPropertyName("stale_connection_stats")]
public StaleConnectionStats StaleConnectionStatsDetail { get; set; } = new();
[JsonPropertyName("subscriptions")]
public uint Subscriptions { get; set; }
@@ -219,6 +225,25 @@ public sealed class SlowConsumersStats
public ulong Leafs { get; set; }
}
/// <summary>
/// Statistics about stale connections by connection type.
/// Corresponds to Go server/monitor.go StaleConnectionStats struct.
/// </summary>
public sealed class StaleConnectionStats
{
[JsonPropertyName("clients")]
public ulong Clients { get; set; }
[JsonPropertyName("routes")]
public ulong Routes { get; set; }
[JsonPropertyName("gateways")]
public ulong Gateways { get; set; }
[JsonPropertyName("leafs")]
public ulong Leafs { get; set; }
}
/// <summary>
/// Cluster configuration monitoring information.
/// Corresponds to Go server/monitor.go ClusterOptsVarz struct.

View File

@@ -91,6 +91,14 @@ public sealed class VarzHandler : IDisposable
Gateways = (ulong)Interlocked.Read(ref stats.SlowConsumerGateways),
Leafs = (ulong)Interlocked.Read(ref stats.SlowConsumerLeafs),
},
StaleConnections = Interlocked.Read(ref stats.StaleConnections),
StaleConnectionStatsDetail = new StaleConnectionStats
{
Clients = (ulong)Interlocked.Read(ref stats.StaleConnectionClients),
Routes = (ulong)Interlocked.Read(ref stats.StaleConnectionRoutes),
Gateways = (ulong)Interlocked.Read(ref stats.StaleConnectionGateways),
Leafs = (ulong)Interlocked.Read(ref stats.StaleConnectionLeafs),
},
Subscriptions = _server.SubList.Count,
ConfigLoadTime = _server.StartTime,
HttpReqStats = stats.HttpReqStats.ToDictionary(kv => kv.Key, kv => (ulong)kv.Value),

View File

@@ -682,6 +682,12 @@ public sealed class NatsClient : IDisposable
break;
}
if (reason == ClientClosedReason.StaleConnection)
{
Interlocked.Increment(ref _serverStats.StaleConnections);
Interlocked.Increment(ref _serverStats.StaleConnectionClients);
}
_logger.LogDebug("Client {ClientId} connection closed: {CloseReason}", Id, reason);
}

View File

@@ -16,5 +16,9 @@ public sealed class ServerStats
public long SlowConsumerRoutes;
public long SlowConsumerLeafs;
public long SlowConsumerGateways;
public long StaleConnectionClients;
public long StaleConnectionRoutes;
public long StaleConnectionLeafs;
public long StaleConnectionGateways;
public readonly ConcurrentDictionary<string, long> HttpReqStats = new();
}

View File

@@ -75,6 +75,27 @@ public class ServerStatsTests : IAsyncLifetime
client.StartTime.ShouldNotBe(default);
}
[Fact]
public void StaleConnection_stats_incremented_on_mark_closed()
{
var stats = new ServerStats();
stats.StaleConnectionClients.ShouldBe(0);
Interlocked.Increment(ref stats.StaleConnectionClients);
stats.StaleConnectionClients.ShouldBe(1);
}
[Fact]
public void StaleConnection_stats_all_fields_default_to_zero()
{
var stats = new ServerStats();
stats.StaleConnections.ShouldBe(0);
stats.StaleConnectionClients.ShouldBe(0);
stats.StaleConnectionRoutes.ShouldBe(0);
stats.StaleConnectionLeafs.ShouldBe(0);
stats.StaleConnectionGateways.ShouldBe(0);
}
private static int GetFreePort()
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);