Files
natsdotnet/tests/NATS.Server.Tests/Monitoring/MonitorVarzTests.cs
Joseph Doherty 9554d53bf5 feat: Wave 6 batch 1 — monitoring, config reload, client protocol, MQTT, leaf node tests
Port 405 new test methods across 5 subsystems for Go parity:
- Monitoring: 102 tests (varz, connz, routez, subsz, stacksz)
- Leaf Nodes: 85 tests (connection, forwarding, loop detection, subject filter, JetStream)
- MQTT Bridge: 86 tests (advanced, auth, retained messages, topic mapping, will messages)
- Client Protocol: 73 tests (connection handling, protocol violations, limits)
- Config Reload: 59 tests (hot reload, option changes, permission updates)

Total: 1,678 tests passing, 0 failures, 3 skipped
2026-02-23 21:40:29 -05:00

527 lines
20 KiB
C#

// Go: TestMonitorHandleVarz server/monitor_test.go:275
// Go: TestMyUptime server/monitor_test.go:135
// Go: TestMonitorVarzSubscriptionsResetProperly server/monitor_test.go:257
// Go: TestMonitorNoPort server/monitor_test.go:168
// Go: TestMonitorHTTPBasePath server/monitor_test.go:220
// Go: TestMonitorHandleRoot server/monitor_test.go:1819
// Go: TestMonitorServerIDs server/monitor_test.go:2410
// Go: TestMonitorHttpStatsNoUpdatedWhenUsingServerFuncs server/monitor_test.go:2435
// Go: TestMonitorVarzRaces server/monitor_test.go:2641
using System.Net;
using System.Net.Http.Json;
using System.Net.Sockets;
using System.Text.Json;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Monitoring;
namespace NATS.Server.Tests.Monitoring;
/// <summary>
/// Tests covering /varz endpoint behavior, ported from the Go server's monitor_test.go.
/// </summary>
public class MonitorVarzTests : IAsyncLifetime
{
// Server under test; built in the constructor with the two ports below.
private readonly NatsServer _server;
// Port assigned to NatsOptions.Port; raw client sockets in the tests connect here.
private readonly int _natsPort;
// Port assigned to NatsOptions.MonitorPort; all HTTP monitoring requests target it.
private readonly int _monitorPort;
// Supplies the token passed to StartAsync; cancelled in DisposeAsync.
private readonly CancellationTokenSource _cts = new();
// HTTP client shared by the tests in this class for monitoring requests.
private readonly HttpClient _http = new();
/// <summary>
/// Picks two free loopback ports (client first, then monitoring) and builds the
/// server under test with them. The server is not started here.
/// </summary>
public MonitorVarzTests()
{
    _natsPort = GetFreePort();
    _monitorPort = GetFreePort();

    var options = new NatsOptions { Port = _natsPort, MonitorPort = _monitorPort };
    _server = new NatsServer(options, NullLoggerFactory.Instance);
}
/// <summary>
/// Starts the server, waits for it to report ready, then polls /healthz on the
/// monitoring port until it answers with a success status (at most 50 attempts,
/// 50 ms apart). If the endpoint never answers, the method still returns.
/// </summary>
public async Task InitializeAsync()
{
    // The task returned by StartAsync is discarded; readiness is observed via WaitForReadyAsync.
    _ = _server.StartAsync(_cts.Token);
    await _server.WaitForReadyAsync();

    for (var attempt = 0; attempt < 50; attempt++)
    {
        try
        {
            // Dispose every probe response so repeated polling does not accumulate undisposed responses.
            using var probe = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/healthz");
            if (probe.IsSuccessStatusCode) break;
        }
        catch (HttpRequestException)
        {
            // The request failed (e.g. nothing accepting yet); retry after the delay below.
        }

        await Task.Delay(50);
    }
}
/// <summary>
/// Tears down the fixture: disposes the HTTP client, cancels the token given to
/// StartAsync, disposes the server, and finally releases the token source itself.
/// </summary>
public async Task DisposeAsync()
{
    _http.Dispose();
    await _cts.CancelAsync();
    _server.Dispose();

    // CancellationTokenSource is IDisposable; release it once the server no longer needs the token.
    _cts.Dispose();
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275), mode=0.
/// Verifies /varz answers 200 with JSON carrying the server identity fields:
/// server_id, version, a start time within the last 10 seconds, host, port,
/// and max_payload.
/// </summary>
[Fact]
public async Task Varz_returns_server_identity_and_start_within_10_seconds()
{
    using var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/varz");
    response.StatusCode.ShouldBe(HttpStatusCode.OK);

    var varz = await response.Content.ReadFromJsonAsync<Varz>();
    varz.ShouldNotBeNull();
    varz.Id.ShouldNotBeNullOrEmpty();
    varz.Version.ShouldNotBeNullOrEmpty();

    // Go: if time.Since(v.Start) > 10*time.Second { t.Fatal(...) }
    (DateTime.UtcNow - varz.Start).ShouldBeLessThan(TimeSpan.FromSeconds(10));

    // Identity fields named in the summary: host, port, and the 1 MiB max_payload.
    varz.Host.ShouldNotBeNullOrEmpty();
    varz.Port.ShouldBe(_natsPort);
    varz.MaxPayload.ShouldBe(1024 * 1024);
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275), after connecting client.
/// A raw TCP client connects, subscribes to "foo" and publishes a 5-byte payload;
/// /varz must then report at least one connection, one total connection,
/// one inbound message and five inbound bytes.
/// </summary>
[Fact]
public async Task Varz_tracks_connection_stats_after_client_pubsub()
{
    using var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    var natsEndpoint = new IPEndPoint(IPAddress.Loopback, _natsPort);
    await client.ConnectAsync(natsEndpoint);

    // Read whatever the server sends first; the bytes are not inspected.
    var greeting = new byte[4096];
    _ = await client.ReceiveAsync(greeting, SocketFlags.None);

    // A single write carrying CONNECT, SUB on "foo" and a PUB of "hello" (5 bytes) to "foo".
    var commands = "CONNECT {}\r\nSUB foo 1\r\nPUB foo 5\r\nhello\r\n"u8.ToArray();
    await client.SendAsync(commands, SocketFlags.None);

    // Fixed pause before sampling the counters; no acknowledgement is awaited.
    await Task.Delay(200);

    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";
    var snapshot = await _http.GetFromJsonAsync<Varz>(varzUrl);
    snapshot.ShouldNotBeNull();

    // Go: v.Connections != 1
    snapshot.Connections.ShouldBeGreaterThanOrEqualTo(1);
    // Go: v.TotalConnections < 1
    snapshot.TotalConnections.ShouldBeGreaterThanOrEqualTo(1UL);
    // Go: v.InMsgs != 1
    snapshot.InMsgs.ShouldBeGreaterThanOrEqualTo(1L);
    // Go: v.InBytes != 5
    snapshot.InBytes.ShouldBeGreaterThanOrEqualTo(5L);
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// After one client registers two subscriptions, /varz must report a
/// subscription count of at least two.
/// </summary>
[Fact]
public async Task Varz_reports_subscription_count()
{
    using var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    var natsEndpoint = new IPEndPoint(IPAddress.Loopback, _natsPort);
    await client.ConnectAsync(natsEndpoint);

    // Read whatever the server sends first; the bytes are not inspected.
    var greeting = new byte[4096];
    _ = await client.ReceiveAsync(greeting, SocketFlags.None);

    // CONNECT followed by two subscriptions ("test" and "test2").
    var commands = "CONNECT {}\r\nSUB test 1\r\nSUB test2 2\r\n"u8.ToArray();
    await client.SendAsync(commands, SocketFlags.None);
    await Task.Delay(200);

    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";
    var snapshot = await _http.GetFromJsonAsync<Varz>(varzUrl);
    snapshot.ShouldNotBeNull();
    snapshot.Subscriptions.ShouldBeGreaterThanOrEqualTo(2u);
}
/// <summary>
/// Go: TestMonitorVarzSubscriptionsResetProperly (line 257).
/// Two back-to-back /varz reads must report the same subscription count,
/// i.e. reading the endpoint must not inflate the figure.
/// </summary>
[Fact]
public async Task Varz_subscriptions_do_not_double_across_repeated_calls()
{
    using var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    var natsEndpoint = new IPEndPoint(IPAddress.Loopback, _natsPort);
    await client.ConnectAsync(natsEndpoint);

    // Read whatever the server sends first; the bytes are not inspected.
    var greeting = new byte[4096];
    _ = await client.ReceiveAsync(greeting, SocketFlags.None);

    var commands = "CONNECT {}\r\nSUB test 1\r\n"u8.ToArray();
    await client.SendAsync(commands, SocketFlags.None);
    await Task.Delay(200);

    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";

    var firstRead = await _http.GetFromJsonAsync<Varz>(varzUrl);
    var firstCount = firstRead!.Subscriptions;

    var secondRead = await _http.GetFromJsonAsync<Varz>(varzUrl);
    var secondCount = secondRead!.Subscriptions;

    // Go: check that we get same number back (not doubled)
    secondCount.ShouldBe(firstCount);
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// The /varz payload must carry a JetStream section with both its
/// Config and Stats parts populated (non-null).
/// </summary>
[Fact]
public async Task Varz_includes_jetstream_section()
{
    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";
    var snapshot = await _http.GetFromJsonAsync<Varz>(varzUrl);

    snapshot.ShouldNotBeNull();
    snapshot.JetStream.ShouldNotBeNull();
    snapshot.JetStream.Config.ShouldNotBeNull();
    snapshot.JetStream.Stats.ShouldNotBeNull();
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// The runtime figures reported by /varz must be positive: mem and cores.
/// </summary>
[Fact]
public async Task Varz_includes_runtime_metrics()
{
    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";
    var snapshot = await _http.GetFromJsonAsync<Varz>(varzUrl);

    snapshot.ShouldNotBeNull();
    snapshot.Mem.ShouldBeGreaterThan(0L);
    snapshot.Cores.ShouldBeGreaterThan(0);
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// The uptime reported by /varz must be a non-empty string ending in "s",
/// as in Go-style durations such as "0s" or "1m0s".
/// </summary>
[Fact]
public async Task Varz_uptime_is_formatted_string()
{
    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";
    var snapshot = await _http.GetFromJsonAsync<Varz>(varzUrl);
    snapshot.ShouldNotBeNull();

    snapshot.Uptime.ShouldNotBeNullOrEmpty();
    // Only the trailing seconds unit is checked here, not the full duration shape.
    snapshot.Uptime.ShouldEndWith("s");
}
/// <summary>
/// Go: TestMyUptime (line 135).
/// Feeds a duration given in whole seconds through the local FormatUptime helper
/// and compares with the expected string: 22s, 4m22s, 4h4m22s, 32d4h4m22s.
/// </summary>
[Theory]
[InlineData(22, "22s")]
[InlineData(22 + 4 * 60, "4m22s")]
[InlineData(22 + 4 * 60 + 4 * 3600, "4h4m22s")]
[InlineData(22 + 4 * 60 + 4 * 3600 + 32 * 86400, "32d4h4m22s")]
public void Uptime_format_matches_go_myUptime(int totalSeconds, string expected)
{
    var formatted = FormatUptime(TimeSpan.FromSeconds(totalSeconds));

    formatted.ShouldBe(expected);
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// Inspects the raw /varz body and checks that the snake_case property names
/// used by the Go server appear as quoted JSON keys.
/// </summary>
[Fact]
public async Task Varz_json_uses_go_field_names()
{
    var body = await _http.GetStringAsync($"http://127.0.0.1:{_monitorPort}/varz");

    // Each entry includes the surrounding quotes so only whole key names match.
    var expectedKeys = new[]
    {
        "\"server_id\"",
        "\"server_name\"",
        "\"in_msgs\"",
        "\"out_msgs\"",
        "\"in_bytes\"",
        "\"out_bytes\"",
        "\"max_payload\"",
        "\"total_connections\"",
        "\"slow_consumers\"",
    };

    foreach (var key in expectedKeys)
    {
        body.ShouldContain(key);
    }
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// The nested Cluster, Gateway and Leaf sections must all be present
/// (non-null) in the /varz payload.
/// </summary>
[Fact]
public async Task Varz_includes_cluster_gateway_leaf_sections()
{
    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";
    var snapshot = await _http.GetFromJsonAsync<Varz>(varzUrl);

    snapshot.ShouldNotBeNull();
    snapshot.Cluster.ShouldNotBeNull();
    snapshot.Gateway.ShouldNotBeNull();
    snapshot.Leaf.ShouldNotBeNull();
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// With no explicit setting in the options, /varz must report
/// max_payload as 1024 * 1024 bytes.
/// </summary>
[Fact]
public async Task Varz_max_payload_defaults_to_1MB()
{
    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";
    var snapshot = await _http.GetFromJsonAsync<Varz>(varzUrl);

    snapshot.ShouldNotBeNull();
    snapshot.MaxPayload.ShouldBe(1024 * 1024);
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// The port reported by /varz must equal the configured client port, and the
/// reported host must be non-empty.
/// </summary>
[Fact]
public async Task Varz_host_and_port_match_configuration()
{
    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";
    var snapshot = await _http.GetFromJsonAsync<Varz>(varzUrl);

    snapshot.ShouldNotBeNull();
    snapshot.Port.ShouldBe(_natsPort);
    snapshot.Host.ShouldNotBeNullOrEmpty();
}
/// <summary>
/// Go: TestMonitorServerIDs (line 2410).
/// Fetches /varz and /connz and checks that both carry the same,
/// non-empty server id.
/// </summary>
[Fact]
public async Task Varz_and_connz_report_matching_server_id()
{
    var monitorBase = $"http://127.0.0.1:{_monitorPort}";
    var varzSnapshot = await _http.GetFromJsonAsync<Varz>($"{monitorBase}/varz");
    var connzSnapshot = await _http.GetFromJsonAsync<Connz>($"{monitorBase}/connz");

    varzSnapshot.ShouldNotBeNull();
    connzSnapshot.ShouldNotBeNull();

    varzSnapshot.Id.ShouldNotBeNullOrEmpty();
    connzSnapshot.Id.ShouldBe(varzSnapshot.Id);
}
/// <summary>
/// Go: TestMonitorHttpStatsNoUpdatedWhenUsingServerFuncs (line 2435).
/// Issues two /varz requests and checks that the second response's
/// http_req_stats has a "/varz" entry counting at least two hits.
/// </summary>
[Fact]
public async Task Varz_http_req_stats_increment_on_each_request()
{
    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";

    // First request only bumps the counter; dispose its response rather than leaving it undisposed.
    using (await _http.GetAsync(varzUrl))
    {
    }

    var varz = await _http.GetFromJsonAsync<Varz>(varzUrl);
    varz.ShouldNotBeNull();
    varz.HttpReqStats.ShouldContainKey("/varz");

    var count = varz.HttpReqStats["/varz"];
    count.ShouldBeGreaterThanOrEqualTo(2UL);
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// The slow_consumer_stats section must be present, and its Clients, Routes,
/// Gateways and Leafs counters are each checked against a lower bound of zero.
/// </summary>
[Fact]
public async Task Varz_includes_slow_consumer_stats_breakdown()
{
    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";
    var snapshot = await _http.GetFromJsonAsync<Varz>(varzUrl);
    snapshot.ShouldNotBeNull();

    var breakdown = snapshot.SlowConsumerStats;
    breakdown.ShouldNotBeNull();
    breakdown.Clients.ShouldBeGreaterThanOrEqualTo(0UL);
    breakdown.Routes.ShouldBeGreaterThanOrEqualTo(0UL);
    breakdown.Gateways.ShouldBeGreaterThanOrEqualTo(0UL);
    breakdown.Leafs.ShouldBeGreaterThanOrEqualTo(0UL);
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// The proto field of /varz must deserialize to a non-negative value.
/// </summary>
[Fact]
public async Task Varz_includes_proto_version()
{
    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";
    var snapshot = await _http.GetFromJsonAsync<Varz>(varzUrl);

    snapshot.ShouldNotBeNull();
    snapshot.Proto.ShouldBeGreaterThanOrEqualTo(0);
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// config_load_time must be later than DateTime.MinValue, i.e. it was populated.
/// </summary>
[Fact]
public async Task Varz_config_load_time_is_set()
{
    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";
    var snapshot = await _http.GetFromJsonAsync<Varz>(varzUrl);

    snapshot.ShouldNotBeNull();
    snapshot.ConfigLoadTime.ShouldBeGreaterThan(DateTime.MinValue);
}
/// <summary>
/// Go: TestMonitorVarzRaces (line 2641).
/// Fires ten /varz requests concurrently; each must return 200 and a
/// deserializable payload with a non-empty server id.
/// </summary>
[Fact]
public async Task Varz_handles_concurrent_requests_without_errors()
{
    var tasks = Enumerable.Range(0, 10).Select(async _ =>
    {
        // Dispose each response once its content has been read.
        using var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/varz");
        response.StatusCode.ShouldBe(HttpStatusCode.OK);

        var v = await response.Content.ReadFromJsonAsync<Varz>();
        v.ShouldNotBeNull();
        v.Id.ShouldNotBeNullOrEmpty();
    });

    // WhenAll enumerates the lazy sequence, starting all ten requests, and surfaces any failure.
    await Task.WhenAll(tasks);
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// A client subscribes to "foo" and publishes 5 bytes to it; /varz must then
/// report at least one outbound message and five outbound bytes.
/// </summary>
[Fact]
public async Task Varz_out_msgs_increments_on_delivery()
{
    using var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    var natsEndpoint = new IPEndPoint(IPAddress.Loopback, _natsPort);
    await client.ConnectAsync(natsEndpoint);

    // Read whatever the server sends first; the bytes are not inspected.
    var greeting = new byte[4096];
    _ = await client.ReceiveAsync(greeting, SocketFlags.None);

    // The subscription precedes the publish on the same subject, so the message has a matching subscriber.
    var commands = "CONNECT {}\r\nSUB foo 1\r\nPUB foo 5\r\nhello\r\n"u8.ToArray();
    await client.SendAsync(commands, SocketFlags.None);
    await Task.Delay(200);

    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";
    var snapshot = await _http.GetFromJsonAsync<Varz>(varzUrl);
    snapshot.ShouldNotBeNull();

    // One delivery of the 5-byte payload is the minimum expected on the outbound side.
    snapshot.OutMsgs.ShouldBeGreaterThanOrEqualTo(1L);
    snapshot.OutBytes.ShouldBeGreaterThanOrEqualTo(5L);
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// The MQTT section must be present (non-null) in the /varz payload.
/// </summary>
[Fact]
public async Task Varz_includes_mqtt_section()
{
    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";
    var snapshot = await _http.GetFromJsonAsync<Varz>(varzUrl);

    snapshot.ShouldNotBeNull();
    snapshot.Mqtt.ShouldNotBeNull();
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// The websocket section must be present (non-null) in the /varz payload.
/// </summary>
[Fact]
public async Task Varz_includes_websocket_section()
{
    var varzUrl = $"http://127.0.0.1:{_monitorPort}/varz";
    var snapshot = await _http.GetFromJsonAsync<Varz>(varzUrl);

    snapshot.ShouldNotBeNull();
    snapshot.Websocket.ShouldNotBeNull();
}
/// <summary>
/// Go: TestMonitorHandleRoot (line 1819).
/// GET / on the monitoring port must return 200 with a body that mentions
/// the varz, connz and healthz endpoints.
/// </summary>
[Fact]
public async Task Root_endpoint_returns_endpoint_listing()
{
    // Dispose the response once the body has been read.
    using var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/");
    response.StatusCode.ShouldBe(HttpStatusCode.OK);

    var body = await response.Content.ReadAsStringAsync();
    body.ShouldContain("varz");
    body.ShouldContain("connz");
    body.ShouldContain("healthz");
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// Connects and closes one client, then connects a second that stays open.
/// /varz must report total_connections of at least 2 (cumulative) and at
/// least 1 current connection.
/// </summary>
[Fact]
public async Task Varz_total_connections_tracks_cumulative_count()
{
    // First client: connect, handshake, then close. The using block guarantees the
    // socket is released even if one of the awaited calls throws.
    using (var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
    {
        await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));
        var firstBuf = new byte[4096];
        _ = await sock.ReceiveAsync(firstBuf, SocketFlags.None);
        await sock.SendAsync("CONNECT {}\r\n"u8.ToArray(), SocketFlags.None);
        await Task.Delay(100);
        sock.Shutdown(SocketShutdown.Both);
    }

    // Pause after the close before opening the next connection.
    await Task.Delay(300);

    // Second client stays connected while /varz is sampled.
    using var sock2 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await sock2.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));
    var buf = new byte[4096];
    _ = await sock2.ReceiveAsync(buf, SocketFlags.None);
    await sock2.SendAsync("CONNECT {}\r\n"u8.ToArray(), SocketFlags.None);
    await Task.Delay(200);

    var varz = await _http.GetFromJsonAsync<Varz>($"http://127.0.0.1:{_monitorPort}/varz");
    varz.ShouldNotBeNull();

    // Total counts both connections (>= 2); at least the second one is still active (>= 1).
    varz.TotalConnections.ShouldBeGreaterThanOrEqualTo(2UL);
    varz.Connections.ShouldBeGreaterThanOrEqualTo(1);
}
/// <summary>
/// Go: TestMonitorNoPort (line 168).
/// Starts a separate server with MonitorPort = 0 and checks that an HTTP request
/// to /varz on a fixed port (11245) fails with an exception.
/// Standalone because it needs a different server configuration than the fixture.
/// </summary>
[Fact]
public async Task Monitor_not_accessible_when_port_not_configured()
{
    var natsPort = GetFreePort();
    var server = new NatsServer(
        new NatsOptions { Port = natsPort, MonitorPort = 0 },
        NullLoggerFactory.Instance);

    // The token source is IDisposable; release it when the test ends.
    using var cts = new CancellationTokenSource();
    try
    {
        // Start and readiness wait are inside the try so a failure here still disposes the server.
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        using var http = new HttpClient { Timeout = TimeSpan.FromSeconds(2) };

        // Probe a fixed port; with MonitorPort = 0 no monitoring listener is expected there.
        var act = async () => await http.GetAsync("http://127.0.0.1:11245/varz");
        await act.ShouldThrowAsync<Exception>();
    }
    finally
    {
        await cts.CancelAsync();
        server.Dispose();
    }
}
/// <summary>
/// Go: TestMonitorHandleVarz (line 275).
/// The "now" value reported by /varz must fall between the UTC times sampled just
/// before and just after the request, with one second of slack on each side.
/// </summary>
[Fact]
public async Task Varz_now_is_plausible_utc_timestamp()
{
    var requestStarted = DateTime.UtcNow;
    var snapshot = await _http.GetFromJsonAsync<Varz>($"http://127.0.0.1:{_monitorPort}/varz");
    var requestFinished = DateTime.UtcNow;

    snapshot.ShouldNotBeNull();

    var lowerBound = requestStarted.AddSeconds(-1);
    var upperBound = requestFinished.AddSeconds(1);
    snapshot.Now.ShouldBeGreaterThanOrEqualTo(lowerBound);
    snapshot.Now.ShouldBeLessThanOrEqualTo(upperBound);
}
/// <summary>
/// Helper: formats a duration the way the Go server's myUptime() does.
/// Every unit is derived from whole elapsed seconds by integer division, and only
/// the units up to the largest non-zero one are printed:
/// "22s", "4m22s", "4h4m22s", "32d4h4m22s", "2y32d4h4m22s" (a year is 365 days).
/// </summary>
/// <param name="ts">Elapsed time; fractional seconds are truncated.</param>
/// <returns>The compact uptime string.</returns>
private static string FormatUptime(TimeSpan ts)
{
    // Integer tick arithmetic avoids floating-point rounding on long durations.
    var totalSeconds = ts.Ticks / TimeSpan.TicksPerSecond;
    var totalMinutes = totalSeconds / 60;
    var totalHours = totalMinutes / 60;
    var totalDays = totalHours / 24;
    var totalYears = totalDays / 365;

    // Years bucket: without it, durations of 365 days or more would print as a bare day count.
    if (totalYears > 0)
        return $"{totalYears}y{totalDays % 365}d{totalHours % 24}h{totalMinutes % 60}m{totalSeconds % 60}s";
    if (totalDays > 0)
        return $"{totalDays}d{totalHours % 24}h{totalMinutes % 60}m{totalSeconds % 60}s";
    if (totalHours > 0)
        return $"{totalHours}h{totalMinutes % 60}m{totalSeconds % 60}s";
    if (totalMinutes > 0)
        return $"{totalMinutes}m{totalSeconds % 60}s";
    return $"{totalSeconds}s";
}
/// <summary>
/// Binds a throwaway TCP socket to loopback with port 0 and returns the port
/// number the socket ended up bound to. The socket is disposed before returning.
/// </summary>
private static int GetFreePort()
{
    using var probe = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    var loopbackAnyPort = new IPEndPoint(IPAddress.Loopback, 0);
    probe.Bind(loopbackAnyPort);

    var boundEndpoint = (IPEndPoint)probe.LocalEndPoint!;
    return boundEndpoint.Port;
}
}