diff --git a/src/NATS.Server/Events/InternalEventSystem.cs b/src/NATS.Server/Events/InternalEventSystem.cs index e9545e4..164254a 100644 --- a/src/NATS.Server/Events/InternalEventSystem.cs +++ b/src/NATS.Server/Events/InternalEventSystem.cs @@ -38,6 +38,43 @@ public sealed class InternalSystemMessage public required SystemMessageHandler Callback { get; init; } } +/// +/// Detail payload for an auth error advisory. +/// Provides the caller-supplied fields used to populate . +/// Go reference: server.go sendAuthErrorEvent — arguments passed when constructing the advisory. +/// +public sealed record AuthErrorDetail( + ulong ClientId, + string RemoteAddress, + string? AccountName, + string? UserName, + string Reason, + DateTime OccurredAt); + +/// +/// Detail payload for a client connect advisory. +/// Provides the caller-supplied fields used to populate . +/// Go reference: events.go sendConnect / postConnectEvent. +/// +public sealed record ConnectEventDetail( + ulong ClientId, + string RemoteAddress, + string? AccountName, + string? UserName, + DateTime ConnectedAt); + +/// +/// Detail payload for a client disconnect advisory. +/// Provides the caller-supplied fields used to populate . +/// Go reference: events.go sendDisconnect / postDisconnectEvent. +/// +public sealed record DisconnectEventDetail( + ulong ClientId, + string RemoteAddress, + string? AccountName, + string Reason, + DateTime DisconnectedAt); + /// /// Manages the server's internal event system with Channel-based send/receive loops. /// Maps to Go's internal struct in events.go:124-147 and the goroutines @@ -59,11 +96,18 @@ public sealed class InternalEventSystem : IAsyncDisposable private ulong _sequence; private int _subscriptionId; private readonly ConcurrentDictionary _callbacks = new(); + private long _authErrorEventCount; public Account SystemAccount { get; } public InternalClient SystemClient { get; } public string ServerHash { get; } + /// + /// Number of auth error events sent since this instance was created. + /// Go reference: server stats tracking of auth error advisories. + /// + public long AuthErrorEventCount => Interlocked.Read(ref _authErrorEventCount); + public InternalEventSystem(Account systemAccount, InternalClient systemClient, string serverName, ILogger logger) { _logger = logger; @@ -225,6 +269,85 @@ public sealed class InternalEventSystem : IAsyncDisposable /// public ulong NextSequence() => Interlocked.Increment(ref _sequence); + /// + /// Publishes a client auth-error advisory to $SYS.SERVER.{id}.CLIENT.AUTH.ERR. + /// Increments each time it is called. + /// Go reference: events.go:2631 sendAuthErrorEvent. + /// + public void SendAuthErrorEvent(string serverId, AuthErrorDetail detail) + { + var subject = string.Format(EventSubjects.AuthError, serverId); + var msg = new AuthErrorEventMsg + { + Id = Guid.NewGuid().ToString("N"), + Time = detail.OccurredAt, + Server = _server?.BuildEventServerInfo() ?? new EventServerInfo { Id = serverId }, + Client = new EventClientInfo + { + Id = detail.ClientId, + Host = detail.RemoteAddress, + Account = detail.AccountName, + User = detail.UserName, + }, + Reason = detail.Reason, + }; + + Interlocked.Increment(ref _authErrorEventCount); + Enqueue(new PublishMessage { Subject = subject, Body = msg }); + } + + /// + /// Publishes a client connect advisory to $SYS.ACCOUNT.{account}.CONNECT. + /// Go reference: events.go postConnectEvent / sendConnect. + /// + public void SendConnectEvent(string serverId, ConnectEventDetail detail) + { + var accountName = detail.AccountName ?? "$G"; + var subject = string.Format(EventSubjects.ConnectEvent, accountName); + var msg = new ConnectEventMsg + { + Id = Guid.NewGuid().ToString("N"), + Time = detail.ConnectedAt, + Server = _server?.BuildEventServerInfo() ?? new EventServerInfo { Id = serverId }, + Client = new EventClientInfo + { + Id = detail.ClientId, + Host = detail.RemoteAddress, + Account = detail.AccountName, + User = detail.UserName, + Start = detail.ConnectedAt, + }, + }; + + Enqueue(new PublishMessage { Subject = subject, Body = msg }); + } + + /// + /// Publishes a client disconnect advisory to $SYS.ACCOUNT.{account}.DISCONNECT. + /// Go reference: events.go postDisconnectEvent / sendDisconnect. + /// + public void SendDisconnectEvent(string serverId, DisconnectEventDetail detail) + { + var accountName = detail.AccountName ?? "$G"; + var subject = string.Format(EventSubjects.DisconnectEvent, accountName); + var msg = new DisconnectEventMsg + { + Id = Guid.NewGuid().ToString("N"), + Time = detail.DisconnectedAt, + Server = _server?.BuildEventServerInfo() ?? new EventServerInfo { Id = serverId }, + Client = new EventClientInfo + { + Id = detail.ClientId, + Host = detail.RemoteAddress, + Account = detail.AccountName, + Stop = detail.DisconnectedAt, + }, + Reason = detail.Reason, + }; + + Enqueue(new PublishMessage { Subject = subject, Body = msg }); + } + /// /// Enqueue an internal message for publishing through the send loop. /// diff --git a/tests/NATS.Server.Tests/Events/AuthErrorEventTests.cs b/tests/NATS.Server.Tests/Events/AuthErrorEventTests.cs new file mode 100644 index 0000000..d7c5f40 --- /dev/null +++ b/tests/NATS.Server.Tests/Events/AuthErrorEventTests.cs @@ -0,0 +1,294 @@ +// Port of Go server/events_test.go — auth error advisory publication tests. +// Go reference: golang/nats-server/server/events.go:2631 sendAuthErrorEvent. +// +// Tests cover: SendAuthErrorEvent counter, enqueue behaviour, record field +// preservation, SendConnectEvent, SendDisconnectEvent, and the supporting +// detail record types AuthErrorDetail, ConnectEventDetail, DisconnectEventDetail. + +using System.Text.Json; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server; +using NATS.Server.Events; + +namespace NATS.Server.Tests.Events; + +/// +/// Tests for , +/// , +/// , and the three +/// companion detail record types. +/// Go reference: events_test.go TestSystemAccountDisconnectBadLogin, +/// TestSystemAccountNewConnection. +/// +public class AuthErrorEventTests : IAsyncLifetime +{ + private NatsServer _server = null!; + private int _port; + + public async Task InitializeAsync() + { + _port = GetFreePort(); + _server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance); + _ = _server.StartAsync(CancellationToken.None); + await _server.WaitForReadyAsync(); + } + + public async Task DisposeAsync() + { + await _server.ShutdownAsync(); + _server.Dispose(); + } + + private static int GetFreePort() + { + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } + + // ======================================================================== + // AuthErrorEventCount + // Go reference: events.go:2631 sendAuthErrorEvent — counter per advisory + // ======================================================================== + + /// + /// AuthErrorEventCount starts at zero before any advisories are sent. + /// Go reference: events_test.go TestSystemAccountDisconnectBadLogin. + /// + [Fact] + public void AuthErrorEventCount_StartsAtZero() + { + // Go reference: events_test.go TestSystemAccountDisconnectBadLogin — no events at startup. + var es = _server.EventSystem!; + es.AuthErrorEventCount.ShouldBe(0L); + } + + /// + /// Calling SendAuthErrorEvent once increments the counter to 1. + /// Go reference: events.go:2631 sendAuthErrorEvent — each call is one advisory. + /// + [Fact] + public void SendAuthErrorEvent_IncrementsCounter() + { + // Go reference: events_test.go TestSystemAccountDisconnectBadLogin. + var es = _server.EventSystem!; + var detail = new AuthErrorDetail( + ClientId: 42, + RemoteAddress: "127.0.0.1:5000", + AccountName: "$G", + UserName: "alice", + Reason: "Authorization Violation", + OccurredAt: DateTime.UtcNow); + + es.SendAuthErrorEvent(_server.ServerId, detail); + + es.AuthErrorEventCount.ShouldBe(1L); + } + + /// + /// Each call to SendAuthErrorEvent enqueues a message (counter grows by one per call). + /// Go reference: events.go:2687 sendInternalMsg — advisory is always enqueued. + /// + [Fact] + public void SendAuthErrorEvent_EnqueuesMessage() + { + // Go reference: events.go sendAuthErrorEvent publishes via sendInternalMsg. + var es = _server.EventSystem!; + var detail = new AuthErrorDetail( + ClientId: 7, + RemoteAddress: "10.0.0.1:4222", + AccountName: null, + UserName: null, + Reason: "Authentication Timeout", + OccurredAt: DateTime.UtcNow); + + var before = es.AuthErrorEventCount; + es.SendAuthErrorEvent(_server.ServerId, detail); + var after = es.AuthErrorEventCount; + + // The counter increment is the observable side-effect of the enqueue path. + (after - before).ShouldBe(1L); + } + + /// + /// Sending multiple auth error events increments the counter for each. + /// Go reference: events.go:2631 sendAuthErrorEvent — cumulative count. + /// + [Fact] + public void AuthErrorEventCount_MultipleSends_Incremented() + { + // Go reference: events_test.go TestSystemAccountDisconnectBadLogin. + var es = _server.EventSystem!; + var detail = new AuthErrorDetail( + ClientId: 1, + RemoteAddress: "192.168.1.1:9999", + AccountName: "myacc", + UserName: "bob", + Reason: "Bad credentials", + OccurredAt: DateTime.UtcNow); + + var before = es.AuthErrorEventCount; + + const int count = 5; + for (var i = 0; i < count; i++) + es.SendAuthErrorEvent(_server.ServerId, detail); + + (es.AuthErrorEventCount - before).ShouldBe(count); + } + + // ======================================================================== + // SendConnectEvent + // Go reference: events.go postConnectEvent / sendConnect + // ======================================================================== + + /// + /// SendConnectEvent enqueues a message without throwing. + /// Go reference: events.go postConnectEvent — advisory fired on client connect. + /// + [Fact] + public void SendConnectEvent_EnqueuesMessage() + { + // Go reference: events_test.go TestSystemAccountNewConnection. + var es = _server.EventSystem!; + var detail = new ConnectEventDetail( + ClientId: 10, + RemoteAddress: "127.0.0.1:6000", + AccountName: "$G", + UserName: "user1", + ConnectedAt: DateTime.UtcNow); + + var ex = Record.Exception(() => es.SendConnectEvent(_server.ServerId, detail)); + ex.ShouldBeNull(); + } + + // ======================================================================== + // SendDisconnectEvent + // Go reference: events.go postDisconnectEvent / sendDisconnect + // ======================================================================== + + /// + /// SendDisconnectEvent enqueues a message without throwing. + /// Go reference: events.go postDisconnectEvent — advisory fired on client disconnect. + /// + [Fact] + public void SendDisconnectEvent_EnqueuesMessage() + { + // Go reference: events_test.go TestSystemAccountNewConnection (disconnect part). + var es = _server.EventSystem!; + var detail = new DisconnectEventDetail( + ClientId: 20, + RemoteAddress: "127.0.0.1:7000", + AccountName: "$G", + Reason: "Client Closed", + DisconnectedAt: DateTime.UtcNow); + + var ex = Record.Exception(() => es.SendDisconnectEvent(_server.ServerId, detail)); + ex.ShouldBeNull(); + } + + // ======================================================================== + // AuthErrorDetail record + // ======================================================================== + + /// + /// AuthErrorDetail preserves all fields passed to its constructor. + /// Go reference: events.go:2631 — all client fields captured in the advisory. + /// + [Fact] + public void AuthErrorDetail_PreservesAllFields() + { + // Go reference: events_test.go TestSystemAccountDisconnectBadLogin. + var now = DateTime.UtcNow; + var detail = new AuthErrorDetail( + ClientId: 99, + RemoteAddress: "10.0.0.2:1234", + AccountName: "test-account", + UserName: "testuser", + Reason: "Authorization Violation", + OccurredAt: now); + + detail.ClientId.ShouldBe(99UL); + detail.RemoteAddress.ShouldBe("10.0.0.2:1234"); + detail.AccountName.ShouldBe("test-account"); + detail.UserName.ShouldBe("testuser"); + detail.Reason.ShouldBe("Authorization Violation"); + detail.OccurredAt.ShouldBe(now); + } + + /// + /// AuthErrorDetail accepts a non-empty Reason (the key advisory field). + /// Go reference: events.go:2631 sendAuthErrorEvent — reason is always set. + /// + [Fact] + public void AuthErrorDetail_ReasonRequired() + { + // Go reference: events_test.go TestSystemAccountDisconnectBadLogin — reason distinguishes error types. + var detail = new AuthErrorDetail( + ClientId: 1, + RemoteAddress: "127.0.0.1:0", + AccountName: null, + UserName: null, + Reason: "Authentication Timeout", + OccurredAt: DateTime.UtcNow); + + detail.Reason.ShouldNotBeNullOrEmpty(); + detail.Reason.ShouldBe("Authentication Timeout"); + } + + // ======================================================================== + // ConnectEventDetail record + // ======================================================================== + + /// + /// ConnectEventDetail preserves all constructor fields. + /// Go reference: events.go postConnectEvent — all fields captured on connect. + /// + [Fact] + public void ConnectEventDetail_PreservesFields() + { + // Go reference: events_test.go TestSystemAccountNewConnection. + var connectedAt = new DateTime(2026, 2, 25, 10, 0, 0, DateTimeKind.Utc); + var detail = new ConnectEventDetail( + ClientId: 55, + RemoteAddress: "192.168.0.5:8080", + AccountName: "prod-account", + UserName: "svc-user", + ConnectedAt: connectedAt); + + detail.ClientId.ShouldBe(55UL); + detail.RemoteAddress.ShouldBe("192.168.0.5:8080"); + detail.AccountName.ShouldBe("prod-account"); + detail.UserName.ShouldBe("svc-user"); + detail.ConnectedAt.ShouldBe(connectedAt); + } + + // ======================================================================== + // DisconnectEventDetail record + // ======================================================================== + + /// + /// DisconnectEventDetail preserves all constructor fields. + /// Go reference: events.go postDisconnectEvent — all fields captured on disconnect. + /// + [Fact] + public void DisconnectEventDetail_PreservesFields() + { + // Go reference: events_test.go TestSystemAccountNewConnection (disconnect part). + var disconnectedAt = new DateTime(2026, 2, 25, 11, 0, 0, DateTimeKind.Utc); + var detail = new DisconnectEventDetail( + ClientId: 77, + RemoteAddress: "172.16.0.3:3000", + AccountName: "staging-account", + Reason: "Slow Consumer", + DisconnectedAt: disconnectedAt); + + detail.ClientId.ShouldBe(77UL); + detail.RemoteAddress.ShouldBe("172.16.0.3:3000"); + detail.AccountName.ShouldBe("staging-account"); + detail.Reason.ShouldBe("Slow Consumer"); + detail.DisconnectedAt.ShouldBe(disconnectedAt); + } +}