diff --git a/src/NATS.Server/Events/InternalEventSystem.cs b/src/NATS.Server/Events/InternalEventSystem.cs index b5707f8..d0c0964 100644 --- a/src/NATS.Server/Events/InternalEventSystem.cs +++ b/src/NATS.Server/Events/InternalEventSystem.cs @@ -127,6 +127,11 @@ public sealed class InternalEventSystem : IAsyncDisposable return sub; } + /// + /// Returns the next monotonically increasing sequence number for event ordering. + /// + public ulong NextSequence() => Interlocked.Increment(ref _sequence); + /// /// Enqueue an internal message for publishing through the send loop. /// diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 03029e5..4c19aaf 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -19,6 +19,8 @@ public interface IMessageRouter void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload, NatsClient sender); void RemoveClient(NatsClient client); + void PublishConnectEvent(NatsClient client); + void PublishDisconnectEvent(NatsClient client); } public interface ISubListAccess @@ -445,6 +447,9 @@ public sealed class NatsClient : INatsClient, IDisposable _flags.SetFlag(ClientFlags.ConnectProcessFinished); _logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name); + // Publish connect advisory to the system event bus + Router?.PublishConnectEvent(this); + // Start auth expiry timer if needed if (_authService.IsAuthRequired && authResult?.Expiry is { } expiry) { diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 4c44194..9239f36 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -93,9 +93,20 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _logger.LogInformation("Initiating Shutdown..."); - // Dispose event system before tearing down clients + // Publish shutdown advisory before tearing down the event system if (_eventSystem != null) + { + var shutdownSubject = string.Format(EventSubjects.ServerShutdown, _serverInfo.ServerId); + _eventSystem.Enqueue(new PublishMessage + { + Subject = shutdownSubject, + Body = new ShutdownEventMsg { Server = BuildEventServerInfo(), Reason = "Server Shutdown" }, + IsLast = true, + }); + // Give the send loop time to process the shutdown event + await Task.Delay(100); await _eventSystem.DisposeAsync(); + } // Signal all internal loops to stop await _quitCts.CancelAsync(); @@ -713,8 +724,92 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg }); } + /// + /// Builds an EventServerInfo block for embedding in system event messages. + /// Maps to Go's serverInfo() helper used in events.go advisory publishing. + /// + public EventServerInfo BuildEventServerInfo() + { + var seq = _eventSystem?.NextSequence() ?? 0; + return new EventServerInfo + { + Name = _serverInfo.ServerName, + Host = _options.Host, + Id = _serverInfo.ServerId, + Version = NatsProtocol.Version, + Seq = seq, + }; + } + + private static EventClientInfo BuildEventClientInfo(NatsClient client) + { + return new EventClientInfo + { + Id = client.Id, + Host = client.RemoteIp, + Account = client.Account?.Name, + Name = client.ClientOpts?.Name, + Lang = client.ClientOpts?.Lang, + Version = client.ClientOpts?.Version, + Start = client.StartTime, + }; + } + + /// + /// Publishes a $SYS.ACCOUNT.{account}.CONNECT advisory when a client + /// completes authentication. Maps to Go's sendConnectEvent in events.go. + /// + public void PublishConnectEvent(NatsClient client) + { + if (_eventSystem == null) return; + var accountName = client.Account?.Name ?? Account.GlobalAccountName; + var subject = string.Format(EventSubjects.ConnectEvent, accountName); + var evt = new ConnectEventMsg + { + Id = Guid.NewGuid().ToString("N"), + Time = DateTime.UtcNow, + Server = BuildEventServerInfo(), + Client = BuildEventClientInfo(client), + }; + SendInternalMsg(subject, null, evt); + } + + /// + /// Publishes a $SYS.ACCOUNT.{account}.DISCONNECT advisory when a client + /// disconnects. Maps to Go's sendDisconnectEvent in events.go. + /// + public void PublishDisconnectEvent(NatsClient client) + { + if (_eventSystem == null) return; + var accountName = client.Account?.Name ?? Account.GlobalAccountName; + var subject = string.Format(EventSubjects.DisconnectEvent, accountName); + var evt = new DisconnectEventMsg + { + Id = Guid.NewGuid().ToString("N"), + Time = DateTime.UtcNow, + Server = BuildEventServerInfo(), + Client = BuildEventClientInfo(client), + Sent = new DataStats + { + Msgs = Interlocked.Read(ref client.OutMsgs), + Bytes = Interlocked.Read(ref client.OutBytes), + }, + Received = new DataStats + { + Msgs = Interlocked.Read(ref client.InMsgs), + Bytes = Interlocked.Read(ref client.InBytes), + }, + Reason = client.CloseReason.ToReasonString(), + }; + SendInternalMsg(subject, null, evt); + } + public void RemoveClient(NatsClient client) { + // Publish disconnect advisory before removing client state + if (client.ConnectReceived) + PublishDisconnectEvent(client); + _clients.TryRemove(client.Id, out _); _logger.LogDebug("Removed client {ClientId}", client.Id); diff --git a/tests/NATS.Server.Tests/SystemEventsTests.cs b/tests/NATS.Server.Tests/SystemEventsTests.cs new file mode 100644 index 0000000..7a11bdd --- /dev/null +++ b/tests/NATS.Server.Tests/SystemEventsTests.cs @@ -0,0 +1,111 @@ +using System.Text.Json; +using NATS.Server; +using NATS.Server.Events; +using Microsoft.Extensions.Logging.Abstractions; + +namespace NATS.Server.Tests; + +public class SystemEventsTests +{ + [Fact] + public async Task Server_publishes_connect_event_on_client_auth() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("$SYS.ACCOUNT.*.CONNECT", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + // Connect a real client + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port); + + // Read INFO + var buf = new byte[4096]; + await sock.ReceiveAsync(buf); + + // Send CONNECT + var connect = System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n"); + await sock.SendAsync(connect); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldStartWith("$SYS.ACCOUNT."); + result.ShouldEndWith(".CONNECT"); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Server_publishes_disconnect_event_on_client_close() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("$SYS.ACCOUNT.*.DISCONNECT", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + // Connect and then disconnect + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port); + var buf = new byte[4096]; + await sock.ReceiveAsync(buf); + await sock.SendAsync(System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n")); + await Task.Delay(100); + sock.Shutdown(System.Net.Sockets.SocketShutdown.Both); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldStartWith("$SYS.ACCOUNT."); + result.ShouldEndWith(".DISCONNECT"); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task Server_publishes_shutdown_event() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("$SYS.SERVER.*.SHUTDOWN", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + await server.ShutdownAsync(); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldContain(".SHUTDOWN"); + } + + private static NatsServer CreateTestServer() + { + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + } + + private static int GetFreePort() + { + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } +}