diff --git a/src/NATS.Server/Monitoring/Connz.cs b/src/NATS.Server/Monitoring/Connz.cs index d2a6f49..2147715 100644 --- a/src/NATS.Server/Monitoring/Connz.cs +++ b/src/NATS.Server/Monitoring/Connz.cs @@ -168,6 +168,7 @@ public enum SortOpt ByLast, ByIdle, ByUptime, + ByRtt, } /// diff --git a/src/NATS.Server/Monitoring/ConnzHandler.cs b/src/NATS.Server/Monitoring/ConnzHandler.cs index 8ff0a3e..f532e7c 100644 --- a/src/NATS.Server/Monitoring/ConnzHandler.cs +++ b/src/NATS.Server/Monitoring/ConnzHandler.cs @@ -30,6 +30,7 @@ public sealed class ConnzHandler(NatsServer server) SortOpt.ByLast => connInfos.OrderByDescending(c => c.LastActivity).ToList(), SortOpt.ByIdle => connInfos.OrderByDescending(c => now - c.LastActivity).ToList(), SortOpt.ByUptime => connInfos.OrderByDescending(c => now - c.Start).ToList(), + SortOpt.ByRtt => connInfos.OrderBy(c => c.Rtt).ToList(), _ => connInfos.OrderBy(c => c.Cid).ToList(), }; @@ -73,6 +74,7 @@ public sealed class ConnzHandler(NatsServer server) Reason = client.CloseReason.ToReasonString(), TlsVersion = client.TlsState?.TlsVersion ?? "", TlsCipherSuite = client.TlsState?.CipherSuite ?? "", + Rtt = client.Rtt == TimeSpan.Zero ? "" : $"{client.Rtt.TotalMilliseconds:F3}ms", }; if (opts.Subscriptions) @@ -116,6 +118,7 @@ public sealed class ConnzHandler(NatsServer server) "last" => SortOpt.ByLast, "idle" => SortOpt.ByIdle, "uptime" => SortOpt.ByUptime, + "rtt" => SortOpt.ByRtt, _ => SortOpt.ByCid, }; } diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index b380560..a2bb19e 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -73,6 +73,11 @@ public sealed class NatsClient : IDisposable private int _pingsOut; private long _lastIn; + // RTT tracking + private long _rttStartTicks; + private long _rtt; + public TimeSpan Rtt => new(Interlocked.Read(ref _rtt)); + public TlsConnectionState? TlsState { get; set; } public bool InfoAlreadySent { get; set; } @@ -321,6 +326,14 @@ public sealed class NatsClient : IDisposable case CommandType.Pong: Interlocked.Exchange(ref _pingsOut, 0); + var rttStart = Interlocked.Read(ref _rttStartTicks); + if (rttStart > 0) + { + var elapsed = DateTime.UtcNow.Ticks - rttStart; + if (elapsed <= 0) elapsed = 1; // min 1 tick for Windows granularity + Interlocked.Exchange(ref _rtt, elapsed); + } + _flags.SetFlag(ClientFlags.FirstPongSent); break; case CommandType.Sub: @@ -611,6 +624,13 @@ public sealed class NatsClient : IDisposable { while (await timer.WaitForNextTickAsync(ct)) { + // Delay first PING until client has responded with PONG or 2 seconds elapsed + if (!_flags.HasFlag(ClientFlags.FirstPongSent) + && (DateTime.UtcNow - StartTime).TotalSeconds < 2) + { + continue; + } + var elapsed = Environment.TickCount64 - Interlocked.Read(ref _lastIn); if (elapsed < (long)_options.PingInterval.TotalMilliseconds) { @@ -629,6 +649,7 @@ public sealed class NatsClient : IDisposable var currentPingsOut = Interlocked.Increment(ref _pingsOut); _logger.LogDebug("Client {ClientId} sending PING ({PingsOut}/{MaxPingsOut})", Id, currentPingsOut, _options.MaxPingsOut); + Interlocked.Exchange(ref _rttStartTicks, DateTime.UtcNow.Ticks); WriteProtocol(NatsProtocol.PingBytes); } } diff --git a/tests/NATS.Server.Tests/RttTests.cs b/tests/NATS.Server.Tests/RttTests.cs new file mode 100644 index 0000000..9632cf5 --- /dev/null +++ b/tests/NATS.Server.Tests/RttTests.cs @@ -0,0 +1,123 @@ +using System.Net; +using System.Net.Http.Json; +using System.Net.Sockets; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Monitoring; + +namespace NATS.Server.Tests; + +public class RttTests : IAsyncLifetime +{ + private readonly NatsServer _server; + private readonly int _natsPort; + private readonly int _monitorPort; + private readonly CancellationTokenSource _cts = new(); + private readonly HttpClient _http = new(); + + public RttTests() + { + _natsPort = GetFreePort(); + _monitorPort = GetFreePort(); + _server = new NatsServer( + new NatsOptions + { + Port = _natsPort, + MonitorPort = _monitorPort, + PingInterval = TimeSpan.FromMilliseconds(200), + MaxPingsOut = 4, + }, + NullLoggerFactory.Instance); + } + + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + for (int i = 0; i < 50; i++) + { + try + { + var resp = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/healthz"); + if (resp.IsSuccessStatusCode) break; + } + catch (HttpRequestException) { } + await Task.Delay(50); + } + } + + public async Task DisposeAsync() + { + _http.Dispose(); + await _cts.CancelAsync(); + _server.Dispose(); + } + + [Fact] + public async Task Rtt_populated_after_ping_pong_cycle() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); // INFO + + // Send CONNECT + PING (triggers firstPongSent) + await stream.WriteAsync("CONNECT {}\r\nPING\r\n"u8.ToArray()); + await stream.FlushAsync(); + _ = await stream.ReadAsync(buf); // PONG + + // Wait for server's PING cycle + await Task.Delay(500); + + // Read server PING and respond with PONG + var received = new byte[4096]; + int totalRead = 0; + bool gotPing = false; + using var readCts = new CancellationTokenSource(2000); + while (!gotPing && !readCts.IsCancellationRequested) + { + var n = await stream.ReadAsync(received.AsMemory(totalRead), readCts.Token); + totalRead += n; + var text = System.Text.Encoding.ASCII.GetString(received, 0, totalRead); + if (text.Contains("PING")) + { + gotPing = true; + await stream.WriteAsync("PONG\r\n"u8.ToArray()); + await stream.FlushAsync(); + } + } + + gotPing.ShouldBeTrue("Server should have sent PING"); + + // Wait for RTT to be computed + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz"); + var connz = await response.Content.ReadFromJsonAsync(); + connz.ShouldNotBeNull(); + var conn = connz.Conns.FirstOrDefault(c => c.Rtt != ""); + conn.ShouldNotBeNull("At least one connection should have RTT populated"); + } + + [Fact] + public async Task Connz_sort_by_rtt() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?sort=rtt"); + response.StatusCode.ShouldBe(System.Net.HttpStatusCode.OK); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } +}