Files
natsdotnet/tests/NATS.Server.Monitoring.Tests/Events/ServerEventTests.cs
Joseph Doherty 0c086522a4 refactor: extract NATS.Server.Monitoring.Tests project
Move 39 monitoring, events, and system endpoint test files from
NATS.Server.Tests into a dedicated NATS.Server.Monitoring.Tests project.
Update namespaces, replace private GetFreePort/ReadUntilAsync with
TestUtilities shared helpers, add InternalsVisibleTo, and register
in the solution file. All 439 tests pass.
2026-03-12 15:44:12 -04:00

422 lines
15 KiB
C#

using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server;
using NATS.Server.Auth;
using NATS.Server.Events;
using NATS.Server.TestUtilities;
namespace NATS.Server.Monitoring.Tests.Events;
/// <summary>
/// Tests for server lifecycle events, stats tracking, advisory messages, and
/// $SYS subject infrastructure.
/// Go reference: events_test.go (51 tests).
/// </summary>
/// <remarks>
/// The constructor creates a <see cref="NatsServer"/> on a port obtained from
/// <see cref="TestPortAllocator"/>; <see cref="InitializeAsync"/> starts it and
/// <see cref="DisposeAsync"/> cancels and disposes it.
/// </remarks>
public class ServerEventTests : IAsyncLifetime
{
    private readonly NatsServer _server;
    private readonly int _port;
    private readonly CancellationTokenSource _cts = new();

    /// <summary>
    /// Allocates a free port and constructs (but does not start) the server under test.
    /// </summary>
    public ServerEventTests()
    {
        _port = TestPortAllocator.GetFreePort();
        _server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
    }

    /// <summary>
    /// Kicks off the server and waits until it reports ready.
    /// </summary>
    public async Task InitializeAsync()
    {
        // The start task is deliberately discarded; readiness is awaited separately.
        _ = _server.StartAsync(_cts.Token);
        await _server.WaitForReadyAsync();
    }

    /// <summary>
    /// Cancels the server's token, disposes the server, and releases the token source.
    /// </summary>
    public async Task DisposeAsync()
    {
        try
        {
            await _cts.CancelAsync();
            _server.Dispose();
        }
        finally
        {
            // CancellationTokenSource is IDisposable; release it even if teardown above throws.
            _cts.Dispose();
        }
    }

    /// <summary>
    /// Opens a TCP connection to the server, performs the CONNECT/PING handshake and
    /// returns the connected socket. The caller owns (and must dispose) the socket.
    /// </summary>
    /// <returns>A socket on which the server has already answered PONG.</returns>
    /// <exception cref="InvalidOperationException">
    /// The server closed the connection before sending any data.
    /// </exception>
    private async Task<Socket> ConnectAndHandshakeAsync()
    {
        var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        try
        {
            await sock.ConnectAsync(IPAddress.Loopback, _port);

            // Read INFO. A single receive is enough here: any remainder is consumed
            // by the PONG read below.
            var buf = new byte[4096];
            var received = await sock.ReceiveAsync(buf, SocketFlags.None);
            if (received == 0)
            {
                // Zero bytes means the peer closed the connection.
                throw new InvalidOperationException("Server closed the connection before sending INFO.");
            }

            // Send CONNECT + PING
            await sock.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n"));

            // Read PONG (may include -ERR or other lines)
            await SocketTestHelper.ReadUntilAsync(sock, "PONG");
            return sock;
        }
        catch
        {
            // Do not leak the socket when the connect or handshake fails.
            sock.Dispose();
            throw;
        }
    }

    // -----------------------------------------------------------------------
    // Server lifecycle events
    // -----------------------------------------------------------------------

    /// <summary>
    /// Server exposes Stats property at startup with all counters at zero.
    /// Go reference: events_test.go TestServerEventsStatsZ (line ~100).
    /// </summary>
    [Fact]
    public void Server_stats_initialized_to_zero_at_startup()
    {
        var stats = _server.Stats;

        stats.InMsgs.ShouldBe(0L);
        stats.OutMsgs.ShouldBe(0L);
        stats.InBytes.ShouldBe(0L);
        stats.OutBytes.ShouldBe(0L);
        stats.SlowConsumers.ShouldBe(0L);
    }

    /// <summary>
    /// TotalConnections increments each time a new client connects.
    /// Go reference: events_test.go TestServerEventsTotalConnections (line ~150).
    /// </summary>
    [Fact]
    public async Task TotalConnections_increments_on_each_new_connection()
    {
        var before = Interlocked.Read(ref _server.Stats.TotalConnections);

        using var c1 = await ConnectAndHandshakeAsync();
        using var c2 = await ConnectAndHandshakeAsync();

        var after = Interlocked.Read(ref _server.Stats.TotalConnections);
        (after - before).ShouldBeGreaterThanOrEqualTo(2L);
    }

    /// <summary>
    /// ClientCount reflects only currently connected clients.
    /// Go reference: events_test.go TestServerEventsStatsCID (line ~200).
    /// </summary>
    [Fact]
    public async Task ClientCount_decrements_when_client_disconnects()
    {
        // 'using' guarantees cleanup if an assertion throws; the explicit Close below
        // makes the later implicit Dispose a no-op.
        using var sock = await ConnectAndHandshakeAsync();
        var countWhileConnected = _server.ClientCount;
        countWhileConnected.ShouldBeGreaterThanOrEqualTo(1);

        sock.Shutdown(SocketShutdown.Both);
        sock.Close();

        // Poll (up to ~2s) instead of sleeping a fixed interval, so the test is
        // neither slower than necessary nor dependent on a single timing guess.
        for (var attempt = 0; attempt < 100 && _server.ClientCount >= countWhileConnected; attempt++)
        {
            await Task.Delay(20);
        }

        // Strictly fewer clients than while connected; a "< count + 1" comparison
        // would also pass when the count never changed.
        _server.ClientCount.ShouldBeLessThan(countWhileConnected);
    }

    /// <summary>
    /// Multiple simultaneous connections are tracked independently.
    /// Go reference: events_test.go TestServerEventsConcurrentConns (line ~230).
    /// </summary>
    [Fact]
    public async Task Multiple_connections_tracked_independently()
    {
        var before = Interlocked.Read(ref _server.Stats.TotalConnections);

        using var c1 = await ConnectAndHandshakeAsync();
        using var c2 = await ConnectAndHandshakeAsync();
        using var c3 = await ConnectAndHandshakeAsync();

        var after = Interlocked.Read(ref _server.Stats.TotalConnections);
        (after - before).ShouldBeGreaterThanOrEqualTo(3L);
    }

    /// <summary>
    /// Stats are accurate after rapid connect/disconnect cycles.
    /// Go reference: events_test.go TestServerEventsStatsCounting (line ~260).
    /// </summary>
    [Fact]
    public async Task Stats_accurate_after_rapid_connect_disconnect()
    {
        var before = Interlocked.Read(ref _server.Stats.TotalConnections);

        for (var i = 0; i < 5; i++)
        {
            // Each socket is disposed at the end of its loop iteration.
            using var sock = await ConnectAndHandshakeAsync();
        }

        var after = Interlocked.Read(ref _server.Stats.TotalConnections);
        (after - before).ShouldBeGreaterThanOrEqualTo(5L);
    }

    // -----------------------------------------------------------------------
    // ServerStats counters — message/byte tracking
    // -----------------------------------------------------------------------

    /// <summary>
    /// InMsgs and InBytes increment when clients publish.
    /// Go reference: events_test.go TestServerEventsStatsz (line ~100).
    /// </summary>
    [Fact]
    public async Task InMsgs_and_InBytes_increment_on_publish()
    {
        using var sock = await ConnectAndHandshakeAsync();
        var beforeMsgs = Interlocked.Read(ref _server.Stats.InMsgs);
        var beforeBytes = Interlocked.Read(ref _server.Stats.InBytes);

        var payload = "Hello"u8.ToArray();
        var pub = Encoding.ASCII.GetBytes($"PUB test.subject {payload.Length}\r\nHello\r\n");
        await sock.SendAsync(pub);

        // Flush via PING/PONG
        await sock.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
        await SocketTestHelper.ReadUntilAsync(sock, "PONG");

        var afterMsgs = Interlocked.Read(ref _server.Stats.InMsgs);
        var afterBytes = Interlocked.Read(ref _server.Stats.InBytes);
        (afterMsgs - beforeMsgs).ShouldBeGreaterThanOrEqualTo(1L);
        (afterBytes - beforeBytes).ShouldBeGreaterThanOrEqualTo(payload.Length);
    }

    /// <summary>
    /// OutMsgs and OutBytes increment when messages are delivered to subscribers.
    /// Go reference: events_test.go TestServerEventsStatsz (line ~100).
    /// </summary>
    [Fact]
    public async Task OutMsgs_and_OutBytes_increment_on_delivery()
    {
        using var sub = await ConnectAndHandshakeAsync();
        using var pub = await ConnectAndHandshakeAsync();

        // Subscribe
        await sub.SendAsync(Encoding.ASCII.GetBytes("SUB test.out 1\r\nPING\r\n"));
        await SocketTestHelper.ReadUntilAsync(sub, "PONG");

        var beforeOutMsgs = Interlocked.Read(ref _server.Stats.OutMsgs);
        var beforeOutBytes = Interlocked.Read(ref _server.Stats.OutBytes);

        var payload = "World"u8.ToArray();
        await pub.SendAsync(Encoding.ASCII.GetBytes($"PUB test.out {payload.Length}\r\nWorld\r\n"));
        await pub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
        await SocketTestHelper.ReadUntilAsync(pub, "PONG");

        // Wait until the subscriber has actually received the payload
        await SocketTestHelper.ReadUntilAsync(sub, "World", timeoutMs: 2000);

        var afterOutMsgs = Interlocked.Read(ref _server.Stats.OutMsgs);
        var afterOutBytes = Interlocked.Read(ref _server.Stats.OutBytes);
        (afterOutMsgs - beforeOutMsgs).ShouldBeGreaterThanOrEqualTo(1L);
        // Mirrors the InBytes check: at least the payload size must have been counted.
        (afterOutBytes - beforeOutBytes).ShouldBeGreaterThanOrEqualTo(payload.Length);
    }

    // -----------------------------------------------------------------------
    // Account stats events
    // -----------------------------------------------------------------------

    /// <summary>
    /// Account.InMsgs and InBytes track messages received by clients in that account.
    /// Go reference: events_test.go TestServerEventsStatsz (line ~100),
    /// TestAccountStats (line ~400).
    /// </summary>
    [Fact]
    public void Account_InMsgs_and_InBytes_increment_correctly()
    {
        // Account.IncrementInbound is the mechanism tracked server-side
        var account = new Account("test-account");

        account.IncrementInbound(3, 300);

        account.InMsgs.ShouldBe(3L);
        account.InBytes.ShouldBe(300L);
    }

    /// <summary>
    /// Account.OutMsgs and OutBytes track messages delivered to clients in that account.
    /// Go reference: events_test.go TestAccountStats (line ~400).
    /// </summary>
    [Fact]
    public void Account_OutMsgs_and_OutBytes_increment_correctly()
    {
        var account = new Account("test-account");

        account.IncrementOutbound(2, 200);

        account.OutMsgs.ShouldBe(2L);
        account.OutBytes.ShouldBe(200L);
    }

    /// <summary>
    /// Per-account stats are isolated — changes to one account do not affect another.
    /// Go reference: events_test.go TestAccountStats, TestServerEventsAccountIsolation (line ~420).
    /// </summary>
    [Fact]
    public void Account_stats_are_isolated_between_accounts()
    {
        var a1 = new Account("account-one");
        var a2 = new Account("account-two");

        a1.IncrementInbound(10, 1000);
        a2.IncrementInbound(5, 500);

        a1.InMsgs.ShouldBe(10L);
        a2.InMsgs.ShouldBe(5L);
        a1.InBytes.ShouldBe(1000L);
        a2.InBytes.ShouldBe(500L);
    }

    /// <summary>
    /// All four counters of a newly created account start at zero.
    /// Go reference: events_test.go TestAccountStats (line ~400).
    /// </summary>
    [Fact]
    public void Account_stats_start_at_zero()
    {
        var account = new Account("fresh");

        account.InMsgs.ShouldBe(0L);
        account.OutMsgs.ShouldBe(0L);
        account.InBytes.ShouldBe(0L);
        account.OutBytes.ShouldBe(0L);
    }

    // -----------------------------------------------------------------------
    // Advisory messages — slow consumers, stale connections
    // -----------------------------------------------------------------------

    /// <summary>
    /// ServerStats contains SlowConsumers counter for aggregate slow consumer tracking.
    /// Go reference: events_test.go TestServerEventsSlowConsumer (line ~500).
    /// </summary>
    [Fact]
    public void Stats_has_SlowConsumers_field()
    {
        var stats = _server.Stats;

        // Field exists and starts at zero
        Interlocked.Read(ref stats.SlowConsumers).ShouldBe(0L);
    }

    /// <summary>
    /// ServerStats differentiates slow consumers by connection type.
    /// Go reference: events_test.go TestServerEventsSlowConsumer (line ~500).
    /// </summary>
    [Fact]
    public void Stats_has_per_type_SlowConsumer_fields()
    {
        var stats = _server.Stats;

        // All per-type slow-consumer counters exist and start at zero
        Interlocked.Read(ref stats.SlowConsumerClients).ShouldBe(0L);
        Interlocked.Read(ref stats.SlowConsumerRoutes).ShouldBe(0L);
        Interlocked.Read(ref stats.SlowConsumerLeafs).ShouldBe(0L);
        Interlocked.Read(ref stats.SlowConsumerGateways).ShouldBe(0L);
    }

    /// <summary>
    /// StaleConnections and per-type stale counters are tracked in ServerStats.
    /// Go reference: events_test.go TestServerEventsStaleConnection (line ~550).
    /// </summary>
    [Fact]
    public void Stats_has_StaleConnection_fields()
    {
        var stats = _server.Stats;

        Interlocked.Read(ref stats.StaleConnections).ShouldBe(0L);
        Interlocked.Read(ref stats.StaleConnectionClients).ShouldBe(0L);
        Interlocked.Read(ref stats.StaleConnectionRoutes).ShouldBe(0L);
        Interlocked.Read(ref stats.StaleConnectionLeafs).ShouldBe(0L);
        Interlocked.Read(ref stats.StaleConnectionGateways).ShouldBe(0L);
    }

    // -----------------------------------------------------------------------
    // JetStream API stats
    // -----------------------------------------------------------------------

    /// <summary>
    /// JetStreamApiTotal and JetStreamApiErrors counters exist in ServerStats.
    /// Go reference: events_test.go TestServerEventsStatsZ JetStream fields (line ~100).
    /// </summary>
    [Fact]
    public void Stats_has_JetStream_api_counters()
    {
        var stats = _server.Stats;

        Interlocked.Read(ref stats.JetStreamApiTotal).ShouldBe(0L);
        Interlocked.Read(ref stats.JetStreamApiErrors).ShouldBe(0L);
    }

    // -----------------------------------------------------------------------
    // $SYS subject event infrastructure
    // -----------------------------------------------------------------------

    /// <summary>
    /// EventSubjects constants use $SYS prefix matching Go's event subject patterns.
    /// Go reference: events.go:41-97 subject constants.
    /// </summary>
    [Fact]
    public void EventSubjects_have_correct_SYS_prefixes()
    {
        EventSubjects.ConnectEvent.ShouldStartWith("$SYS.ACCOUNT.");
        EventSubjects.DisconnectEvent.ShouldStartWith("$SYS.ACCOUNT.");
        EventSubjects.ServerStats.ShouldStartWith("$SYS.SERVER.");
        EventSubjects.ServerShutdown.ShouldStartWith("$SYS.SERVER.");
        EventSubjects.AuthError.ShouldStartWith("$SYS.SERVER.");
    }

    /// <summary>
    /// EventSubjects include format placeholders for account and server IDs.
    /// Go reference: events.go:41-97 format string subject constants.
    /// </summary>
    [Fact]
    public void EventSubjects_format_correctly_with_account_and_server_ids()
    {
        var connectSubject = string.Format(EventSubjects.ConnectEvent, "MY_ACCOUNT");
        connectSubject.ShouldBe("$SYS.ACCOUNT.MY_ACCOUNT.CONNECT");

        var statsSubject = string.Format(EventSubjects.ServerStats, "SERVER123");
        statsSubject.ShouldBe("$SYS.SERVER.SERVER123.STATSZ");

        var shutdownSubject = string.Format(EventSubjects.ServerShutdown, "SERVER123");
        shutdownSubject.ShouldBe("$SYS.SERVER.SERVER123.SHUTDOWN");
    }

    /// <summary>
    /// NatsServer exposes a non-null EventSystem after startup.
    /// Go reference: events.go initEventTracking — event system initialised during server start.
    /// </summary>
    [Fact]
    public void Server_has_EventSystem_after_start()
    {
        _server.EventSystem.ShouldNotBeNull();
    }

    /// <summary>
    /// InternalEventSystem.PublishServerStats produces a ServerStatsMsg with server
    /// identity and current stats data without throwing.
    /// Go reference: events.go sendStatsz (line ~495).
    /// </summary>
    [Fact]
    public void EventSystem_PublishServerStats_does_not_throw()
    {
        var eventSystem = _server.EventSystem;
        eventSystem.ShouldNotBeNull();

        // Calling PublishServerStats directly must not throw
        var ex = Record.Exception(() => eventSystem!.PublishServerStats());

        ex.ShouldBeNull();
    }

    /// <summary>
    /// InternalEventSystem generates unique, monotonically increasing sequence numbers.
    /// Go reference: events.go NextSequence / sequence counter (line ~59).
    /// </summary>
    [Fact]
    public void EventSystem_sequence_numbers_are_monotonically_increasing()
    {
        var es = _server.EventSystem;
        es.ShouldNotBeNull();

        var s1 = es!.NextSequence();
        var s2 = es.NextSequence();
        var s3 = es.NextSequence();

        s2.ShouldBeGreaterThan(s1);
        s3.ShouldBeGreaterThan(s2);
    }

    /// <summary>
    /// BuildEventServerInfo embeds the server name and ID in advisory messages.
    /// Go reference: events.go serverInfo() helper (line ~1368 in NatsServer.cs).
    /// </summary>
    [Fact]
    public void BuildEventServerInfo_contains_server_identity()
    {
        var info = _server.BuildEventServerInfo();

        info.ShouldNotBeNull();
        info.Id.ShouldNotBeNullOrWhiteSpace();
        info.Name.ShouldNotBeNullOrWhiteSpace();
    }
}