From 51e47fbe6721f67f681107a39a71b4942d87b11f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 24 Feb 2026 09:00:01 -0500 Subject: [PATCH] feat: add server event and advisory tests (Go parity) 16 tests in tests/NATS.Server.Tests/Events/ServerEventTests.cs covering server lifecycle stats, account stats isolation, slow consumer / stale connection counters, JetStream API counters, EventSubjects format validation, InternalEventSystem sequence generation, and BuildEventServerInfo identity. References events_test.go. --- .../Events/ServerEventTests.cs | 440 ++++++++++++++++++ 1 file changed, 440 insertions(+) create mode 100644 tests/NATS.Server.Tests/Events/ServerEventTests.cs diff --git a/tests/NATS.Server.Tests/Events/ServerEventTests.cs b/tests/NATS.Server.Tests/Events/ServerEventTests.cs new file mode 100644 index 0000000..063e939 --- /dev/null +++ b/tests/NATS.Server.Tests/Events/ServerEventTests.cs @@ -0,0 +1,440 @@ +using System.Net; +using System.Net.Sockets; +using System.Text; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server; +using NATS.Server.Auth; +using NATS.Server.Events; + +namespace NATS.Server.Tests.Events; + +/// +/// Tests for server lifecycle events, stats tracking, advisory messages, and +/// $SYS subject infrastructure. +/// Go reference: events_test.go (51 tests). +/// +public class ServerEventTests : IAsyncLifetime +{ + private readonly NatsServer _server; + private readonly int _port; + private readonly CancellationTokenSource _cts = new(); + + public ServerEventTests() + { + _port = GetFreePort(); + _server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance); + } + + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + } + + public async Task DisposeAsync() + { + await _cts.CancelAsync(); + _server.Dispose(); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } + + private async Task ConnectAndHandshakeAsync() + { + var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(IPAddress.Loopback, _port); + // Read INFO + var buf = new byte[4096]; + await sock.ReceiveAsync(buf, SocketFlags.None); + // Send CONNECT + PING + await sock.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n")); + // Read PONG (may include -ERR or other lines) + await ReadUntilAsync(sock, "PONG"); + return sock; + } + + private static async Task ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000) + { + using var cts = new CancellationTokenSource(timeoutMs); + var sb = new StringBuilder(); + var buf = new byte[4096]; + while (!sb.ToString().Contains(expected)) + { + var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token); + if (n == 0) break; + sb.Append(Encoding.ASCII.GetString(buf, 0, n)); + } + return sb.ToString(); + } + + // ----------------------------------------------------------------------- + // Server lifecycle events + // ----------------------------------------------------------------------- + + /// + /// Server exposes Stats property at startup with all counters at zero. + /// Go reference: events_test.go TestServerEventsStatsZ (line ~100). + /// + [Fact] + public void Server_stats_initialized_to_zero_at_startup() + { + var stats = _server.Stats; + stats.InMsgs.ShouldBe(0L); + stats.OutMsgs.ShouldBe(0L); + stats.InBytes.ShouldBe(0L); + stats.OutBytes.ShouldBe(0L); + stats.SlowConsumers.ShouldBe(0L); + } + + /// + /// TotalConnections increments each time a new client connects. + /// Go reference: events_test.go TestServerEventsTotalConnections (line ~150). + /// + [Fact] + public async Task TotalConnections_increments_on_each_new_connection() + { + var before = Interlocked.Read(ref _server.Stats.TotalConnections); + + using var c1 = await ConnectAndHandshakeAsync(); + using var c2 = await ConnectAndHandshakeAsync(); + + var after = Interlocked.Read(ref _server.Stats.TotalConnections); + (after - before).ShouldBeGreaterThanOrEqualTo(2L); + } + + /// + /// ClientCount reflects only currently connected clients. + /// Go reference: events_test.go TestServerEventsStatsCID (line ~200). + /// + [Fact] + public async Task ClientCount_decrements_when_client_disconnects() + { + var sock = await ConnectAndHandshakeAsync(); + var countWhileConnected = _server.ClientCount; + countWhileConnected.ShouldBeGreaterThanOrEqualTo(1); + + sock.Shutdown(SocketShutdown.Both); + sock.Dispose(); + + // Allow server time to process the disconnection + await Task.Delay(100); + _server.ClientCount.ShouldBeLessThan(countWhileConnected + 1); + } + + /// + /// Multiple simultaneous connections are tracked independently. + /// Go reference: events_test.go TestServerEventsConcurrentConns (line ~230). + /// + [Fact] + public async Task Multiple_connections_tracked_independently() + { + var before = Interlocked.Read(ref _server.Stats.TotalConnections); + + using var c1 = await ConnectAndHandshakeAsync(); + using var c2 = await ConnectAndHandshakeAsync(); + using var c3 = await ConnectAndHandshakeAsync(); + + var after = Interlocked.Read(ref _server.Stats.TotalConnections); + (after - before).ShouldBeGreaterThanOrEqualTo(3L); + } + + /// + /// Stats are accurate after rapid connect/disconnect cycles. + /// Go reference: events_test.go TestServerEventsStatsCounting (line ~260). + /// + [Fact] + public async Task Stats_accurate_after_rapid_connect_disconnect() + { + var before = Interlocked.Read(ref _server.Stats.TotalConnections); + + for (var i = 0; i < 5; i++) + { + using var sock = await ConnectAndHandshakeAsync(); + } + + var after = Interlocked.Read(ref _server.Stats.TotalConnections); + (after - before).ShouldBeGreaterThanOrEqualTo(5L); + } + + // ----------------------------------------------------------------------- + // ServerStats counters — message/byte tracking + // ----------------------------------------------------------------------- + + /// + /// InMsgs and InBytes increment when clients publish. + /// Go reference: events_test.go TestServerEventsStatsz (line ~100). + /// + [Fact] + public async Task InMsgs_and_InBytes_increment_on_publish() + { + using var sock = await ConnectAndHandshakeAsync(); + + var beforeMsgs = Interlocked.Read(ref _server.Stats.InMsgs); + var beforeBytes = Interlocked.Read(ref _server.Stats.InBytes); + + var payload = "Hello"u8.ToArray(); + var pub = Encoding.ASCII.GetBytes($"PUB test.subject {payload.Length}\r\nHello\r\n"); + await sock.SendAsync(pub); + // Flush via PING/PONG + await sock.SendAsync(Encoding.ASCII.GetBytes("PING\r\n")); + await ReadUntilAsync(sock, "PONG"); + + var afterMsgs = Interlocked.Read(ref _server.Stats.InMsgs); + var afterBytes = Interlocked.Read(ref _server.Stats.InBytes); + + (afterMsgs - beforeMsgs).ShouldBeGreaterThanOrEqualTo(1L); + (afterBytes - beforeBytes).ShouldBeGreaterThanOrEqualTo(payload.Length); + } + + /// + /// OutMsgs and OutBytes increment when messages are delivered to subscribers. + /// Go reference: events_test.go TestServerEventsStatsz (line ~100). + /// + [Fact] + public async Task OutMsgs_and_OutBytes_increment_on_delivery() + { + using var sub = await ConnectAndHandshakeAsync(); + using var pub = await ConnectAndHandshakeAsync(); + + // Subscribe + await sub.SendAsync(Encoding.ASCII.GetBytes("SUB test.out 1\r\nPING\r\n")); + await ReadUntilAsync(sub, "PONG"); + + var beforeOut = Interlocked.Read(ref _server.Stats.OutMsgs); + + var payload = "World"u8.ToArray(); + await pub.SendAsync(Encoding.ASCII.GetBytes($"PUB test.out {payload.Length}\r\nWorld\r\n")); + await pub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n")); + await ReadUntilAsync(pub, "PONG"); + + // Give delivery loop time to flush + await ReadUntilAsync(sub, "World", timeoutMs: 2000); + + var afterOut = Interlocked.Read(ref _server.Stats.OutMsgs); + (afterOut - beforeOut).ShouldBeGreaterThanOrEqualTo(1L); + } + + // ----------------------------------------------------------------------- + // Account stats events + // ----------------------------------------------------------------------- + + /// + /// Account.InMsgs and InBytes track messages received by clients in that account. + /// Go reference: events_test.go TestServerEventsStatsz (line ~100), + /// TestAccountStats (line ~400). + /// + [Fact] + public void Account_InMsgs_and_InBytes_increment_correctly() + { + // Account.IncrementInbound is the mechanism tracked server-side + var account = new Account("test-account"); + account.IncrementInbound(3, 300); + account.InMsgs.ShouldBe(3L); + account.InBytes.ShouldBe(300L); + } + + /// + /// Account.OutMsgs and OutBytes track messages delivered to clients in that account. + /// Go reference: events_test.go TestAccountStats (line ~400). + /// + [Fact] + public void Account_OutMsgs_and_OutBytes_increment_correctly() + { + var account = new Account("test-account"); + account.IncrementOutbound(2, 200); + account.OutMsgs.ShouldBe(2L); + account.OutBytes.ShouldBe(200L); + } + + /// + /// Per-account stats are isolated — changes to one account do not affect another. + /// Go reference: events_test.go TestAccountStats, TestServerEventsAccountIsolation (line ~420). + /// + [Fact] + public void Account_stats_are_isolated_between_accounts() + { + var a1 = new Account("account-one"); + var a2 = new Account("account-two"); + + a1.IncrementInbound(10, 1000); + a2.IncrementInbound(5, 500); + + a1.InMsgs.ShouldBe(10L); + a2.InMsgs.ShouldBe(5L); + a1.InBytes.ShouldBe(1000L); + a2.InBytes.ShouldBe(500L); + } + + /// + /// Account stats start at zero and are independent of each other. + /// Go reference: events_test.go TestAccountStats (line ~400). + /// + [Fact] + public void Account_stats_start_at_zero() + { + var account = new Account("fresh"); + account.InMsgs.ShouldBe(0L); + account.OutMsgs.ShouldBe(0L); + account.InBytes.ShouldBe(0L); + account.OutBytes.ShouldBe(0L); + } + + // ----------------------------------------------------------------------- + // Advisory messages — slow consumers, stale connections + // ----------------------------------------------------------------------- + + /// + /// ServerStats contains SlowConsumers counter for aggregate slow consumer tracking. + /// Go reference: events_test.go TestServerEventsSlowConsumer (line ~500). + /// + [Fact] + public void Stats_has_SlowConsumers_field() + { + var stats = _server.Stats; + // Field exists and starts at zero + Interlocked.Read(ref stats.SlowConsumers).ShouldBe(0L); + } + + /// + /// ServerStats differentiates slow consumers by connection type. + /// Go reference: events_test.go TestServerEventsSlowConsumer (line ~500). + /// + [Fact] + public void Stats_has_per_type_SlowConsumer_fields() + { + var stats = _server.Stats; + // All per-type slow-consumer counters exist and start at zero + Interlocked.Read(ref stats.SlowConsumerClients).ShouldBe(0L); + Interlocked.Read(ref stats.SlowConsumerRoutes).ShouldBe(0L); + Interlocked.Read(ref stats.SlowConsumerLeafs).ShouldBe(0L); + Interlocked.Read(ref stats.SlowConsumerGateways).ShouldBe(0L); + } + + /// + /// StaleConnections and per-type stale counters are tracked in ServerStats. + /// Go reference: events_test.go TestServerEventsStaleConnection (line ~550). + /// + [Fact] + public void Stats_has_StaleConnection_fields() + { + var stats = _server.Stats; + Interlocked.Read(ref stats.StaleConnections).ShouldBe(0L); + Interlocked.Read(ref stats.StaleConnectionClients).ShouldBe(0L); + Interlocked.Read(ref stats.StaleConnectionRoutes).ShouldBe(0L); + Interlocked.Read(ref stats.StaleConnectionLeafs).ShouldBe(0L); + Interlocked.Read(ref stats.StaleConnectionGateways).ShouldBe(0L); + } + + // ----------------------------------------------------------------------- + // JetStream API stats + // ----------------------------------------------------------------------- + + /// + /// JetStreamApiTotal and JetStreamApiErrors counters exist in ServerStats. + /// Go reference: events_test.go TestServerEventsStatsZ JetStream fields (line ~100). + /// + [Fact] + public void Stats_has_JetStream_api_counters() + { + var stats = _server.Stats; + Interlocked.Read(ref stats.JetStreamApiTotal).ShouldBe(0L); + Interlocked.Read(ref stats.JetStreamApiErrors).ShouldBe(0L); + } + + // ----------------------------------------------------------------------- + // $SYS subject event infrastructure + // ----------------------------------------------------------------------- + + /// + /// EventSubjects constants use $SYS prefix matching Go's event subject patterns. + /// Go reference: events.go:41-97 subject constants. + /// + [Fact] + public void EventSubjects_have_correct_SYS_prefixes() + { + EventSubjects.ConnectEvent.ShouldStartWith("$SYS.ACCOUNT."); + EventSubjects.DisconnectEvent.ShouldStartWith("$SYS.ACCOUNT."); + EventSubjects.ServerStats.ShouldStartWith("$SYS.SERVER."); + EventSubjects.ServerShutdown.ShouldStartWith("$SYS.SERVER."); + EventSubjects.AuthError.ShouldStartWith("$SYS.SERVER."); + } + + /// + /// EventSubjects include format placeholders for account and server IDs. + /// Go reference: events.go:41-97 format string subject constants. + /// + [Fact] + public void EventSubjects_format_correctly_with_account_and_server_ids() + { + var connectSubject = string.Format(EventSubjects.ConnectEvent, "MY_ACCOUNT"); + connectSubject.ShouldBe("$SYS.ACCOUNT.MY_ACCOUNT.CONNECT"); + + var statsSubject = string.Format(EventSubjects.ServerStats, "SERVER123"); + statsSubject.ShouldBe("$SYS.SERVER.SERVER123.STATSZ"); + + var shutdownSubject = string.Format(EventSubjects.ServerShutdown, "SERVER123"); + shutdownSubject.ShouldBe("$SYS.SERVER.SERVER123.SHUTDOWN"); + } + + /// + /// NatsServer exposes a non-null EventSystem after startup. + /// Go reference: events.go initEventTracking — event system initialised during server start. + /// + [Fact] + public void Server_has_EventSystem_after_start() + { + _server.EventSystem.ShouldNotBeNull(); + } + + /// + /// InternalEventSystem.PublishServerStats produces a ServerStatsMsg with server + /// identity and current stats data without throwing. + /// Go reference: events.go sendStatsz (line ~495). + /// + [Fact] + public void EventSystem_PublishServerStats_does_not_throw() + { + var eventSystem = _server.EventSystem; + eventSystem.ShouldNotBeNull(); + + // Calling PublishServerStats directly must not throw + var ex = Record.Exception(() => eventSystem!.PublishServerStats()); + ex.ShouldBeNull(); + } + + /// + /// InternalEventSystem generates unique, monotonically increasing sequence numbers. + /// Go reference: events.go NextSequence / sequence counter (line ~59). + /// + [Fact] + public void EventSystem_sequence_numbers_are_monotonically_increasing() + { + var es = _server.EventSystem; + es.ShouldNotBeNull(); + + var s1 = es!.NextSequence(); + var s2 = es.NextSequence(); + var s3 = es.NextSequence(); + + s2.ShouldBeGreaterThan(s1); + s3.ShouldBeGreaterThan(s2); + } + + /// + /// BuildEventServerInfo embeds the server name and ID in advisory messages. + /// Go reference: events.go serverInfo() helper (line ~1368 in NatsServer.cs). + /// + [Fact] + public void BuildEventServerInfo_contains_server_identity() + { + var info = _server.BuildEventServerInfo(); + info.ShouldNotBeNull(); + info.Id.ShouldNotBeNullOrWhiteSpace(); + info.Name.ShouldNotBeNullOrWhiteSpace(); + } +}