feat: wire system event publishing for connect, disconnect, and shutdown
This commit is contained in:
@@ -127,6 +127,11 @@ public sealed class InternalEventSystem : IAsyncDisposable
|
|||||||
return sub;
|
return sub;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns the next monotonically increasing sequence number for event ordering.
|
||||||
|
/// </summary>
|
||||||
|
public ulong NextSequence() => Interlocked.Increment(ref _sequence);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Enqueue an internal message for publishing through the send loop.
|
/// Enqueue an internal message for publishing through the send loop.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@@ -19,6 +19,8 @@ public interface IMessageRouter
|
|||||||
void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
|
void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
|
||||||
ReadOnlyMemory<byte> payload, NatsClient sender);
|
ReadOnlyMemory<byte> payload, NatsClient sender);
|
||||||
void RemoveClient(NatsClient client);
|
void RemoveClient(NatsClient client);
|
||||||
|
void PublishConnectEvent(NatsClient client);
|
||||||
|
void PublishDisconnectEvent(NatsClient client);
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface ISubListAccess
|
public interface ISubListAccess
|
||||||
@@ -445,6 +447,9 @@ public sealed class NatsClient : INatsClient, IDisposable
|
|||||||
_flags.SetFlag(ClientFlags.ConnectProcessFinished);
|
_flags.SetFlag(ClientFlags.ConnectProcessFinished);
|
||||||
_logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name);
|
_logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name);
|
||||||
|
|
||||||
|
// Publish connect advisory to the system event bus
|
||||||
|
Router?.PublishConnectEvent(this);
|
||||||
|
|
||||||
// Start auth expiry timer if needed
|
// Start auth expiry timer if needed
|
||||||
if (_authService.IsAuthRequired && authResult?.Expiry is { } expiry)
|
if (_authService.IsAuthRequired && authResult?.Expiry is { } expiry)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -93,9 +93,20 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
|
|
||||||
_logger.LogInformation("Initiating Shutdown...");
|
_logger.LogInformation("Initiating Shutdown...");
|
||||||
|
|
||||||
// Dispose event system before tearing down clients
|
// Publish shutdown advisory before tearing down the event system
|
||||||
if (_eventSystem != null)
|
if (_eventSystem != null)
|
||||||
|
{
|
||||||
|
var shutdownSubject = string.Format(EventSubjects.ServerShutdown, _serverInfo.ServerId);
|
||||||
|
_eventSystem.Enqueue(new PublishMessage
|
||||||
|
{
|
||||||
|
Subject = shutdownSubject,
|
||||||
|
Body = new ShutdownEventMsg { Server = BuildEventServerInfo(), Reason = "Server Shutdown" },
|
||||||
|
IsLast = true,
|
||||||
|
});
|
||||||
|
// Give the send loop time to process the shutdown event
|
||||||
|
await Task.Delay(100);
|
||||||
await _eventSystem.DisposeAsync();
|
await _eventSystem.DisposeAsync();
|
||||||
|
}
|
||||||
|
|
||||||
// Signal all internal loops to stop
|
// Signal all internal loops to stop
|
||||||
await _quitCts.CancelAsync();
|
await _quitCts.CancelAsync();
|
||||||
@@ -713,8 +724,92 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
_eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg });
|
_eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Builds an EventServerInfo block for embedding in system event messages.
|
||||||
|
/// Maps to Go's serverInfo() helper used in events.go advisory publishing.
|
||||||
|
/// </summary>
|
||||||
|
public EventServerInfo BuildEventServerInfo()
|
||||||
|
{
|
||||||
|
var seq = _eventSystem?.NextSequence() ?? 0;
|
||||||
|
return new EventServerInfo
|
||||||
|
{
|
||||||
|
Name = _serverInfo.ServerName,
|
||||||
|
Host = _options.Host,
|
||||||
|
Id = _serverInfo.ServerId,
|
||||||
|
Version = NatsProtocol.Version,
|
||||||
|
Seq = seq,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static EventClientInfo BuildEventClientInfo(NatsClient client)
|
||||||
|
{
|
||||||
|
return new EventClientInfo
|
||||||
|
{
|
||||||
|
Id = client.Id,
|
||||||
|
Host = client.RemoteIp,
|
||||||
|
Account = client.Account?.Name,
|
||||||
|
Name = client.ClientOpts?.Name,
|
||||||
|
Lang = client.ClientOpts?.Lang,
|
||||||
|
Version = client.ClientOpts?.Version,
|
||||||
|
Start = client.StartTime,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Publishes a $SYS.ACCOUNT.{account}.CONNECT advisory when a client
|
||||||
|
/// completes authentication. Maps to Go's sendConnectEvent in events.go.
|
||||||
|
/// </summary>
|
||||||
|
public void PublishConnectEvent(NatsClient client)
|
||||||
|
{
|
||||||
|
if (_eventSystem == null) return;
|
||||||
|
var accountName = client.Account?.Name ?? Account.GlobalAccountName;
|
||||||
|
var subject = string.Format(EventSubjects.ConnectEvent, accountName);
|
||||||
|
var evt = new ConnectEventMsg
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid().ToString("N"),
|
||||||
|
Time = DateTime.UtcNow,
|
||||||
|
Server = BuildEventServerInfo(),
|
||||||
|
Client = BuildEventClientInfo(client),
|
||||||
|
};
|
||||||
|
SendInternalMsg(subject, null, evt);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Publishes a $SYS.ACCOUNT.{account}.DISCONNECT advisory when a client
|
||||||
|
/// disconnects. Maps to Go's sendDisconnectEvent in events.go.
|
||||||
|
/// </summary>
|
||||||
|
public void PublishDisconnectEvent(NatsClient client)
|
||||||
|
{
|
||||||
|
if (_eventSystem == null) return;
|
||||||
|
var accountName = client.Account?.Name ?? Account.GlobalAccountName;
|
||||||
|
var subject = string.Format(EventSubjects.DisconnectEvent, accountName);
|
||||||
|
var evt = new DisconnectEventMsg
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid().ToString("N"),
|
||||||
|
Time = DateTime.UtcNow,
|
||||||
|
Server = BuildEventServerInfo(),
|
||||||
|
Client = BuildEventClientInfo(client),
|
||||||
|
Sent = new DataStats
|
||||||
|
{
|
||||||
|
Msgs = Interlocked.Read(ref client.OutMsgs),
|
||||||
|
Bytes = Interlocked.Read(ref client.OutBytes),
|
||||||
|
},
|
||||||
|
Received = new DataStats
|
||||||
|
{
|
||||||
|
Msgs = Interlocked.Read(ref client.InMsgs),
|
||||||
|
Bytes = Interlocked.Read(ref client.InBytes),
|
||||||
|
},
|
||||||
|
Reason = client.CloseReason.ToReasonString(),
|
||||||
|
};
|
||||||
|
SendInternalMsg(subject, null, evt);
|
||||||
|
}
|
||||||
|
|
||||||
public void RemoveClient(NatsClient client)
|
public void RemoveClient(NatsClient client)
|
||||||
{
|
{
|
||||||
|
// Publish disconnect advisory before removing client state
|
||||||
|
if (client.ConnectReceived)
|
||||||
|
PublishDisconnectEvent(client);
|
||||||
|
|
||||||
_clients.TryRemove(client.Id, out _);
|
_clients.TryRemove(client.Id, out _);
|
||||||
_logger.LogDebug("Removed client {ClientId}", client.Id);
|
_logger.LogDebug("Removed client {ClientId}", client.Id);
|
||||||
|
|
||||||
|
|||||||
111
tests/NATS.Server.Tests/SystemEventsTests.cs
Normal file
111
tests/NATS.Server.Tests/SystemEventsTests.cs
Normal file
@@ -0,0 +1,111 @@
|
|||||||
|
using System.Text.Json;
|
||||||
|
using NATS.Server;
|
||||||
|
using NATS.Server.Events;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
public class SystemEventsTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Server_publishes_connect_event_on_client_auth()
|
||||||
|
{
|
||||||
|
using var server = CreateTestServer();
|
||||||
|
_ = server.StartAsync(CancellationToken.None);
|
||||||
|
await server.WaitForReadyAsync();
|
||||||
|
|
||||||
|
var received = new TaskCompletionSource<string>();
|
||||||
|
server.EventSystem!.SysSubscribe("$SYS.ACCOUNT.*.CONNECT", (sub, client, acc, subject, reply, hdr, msg) =>
|
||||||
|
{
|
||||||
|
received.TrySetResult(subject);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Connect a real client
|
||||||
|
using var sock = new System.Net.Sockets.Socket(
|
||||||
|
System.Net.Sockets.AddressFamily.InterNetwork,
|
||||||
|
System.Net.Sockets.SocketType.Stream,
|
||||||
|
System.Net.Sockets.ProtocolType.Tcp);
|
||||||
|
await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port);
|
||||||
|
|
||||||
|
// Read INFO
|
||||||
|
var buf = new byte[4096];
|
||||||
|
await sock.ReceiveAsync(buf);
|
||||||
|
|
||||||
|
// Send CONNECT
|
||||||
|
var connect = System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n");
|
||||||
|
await sock.SendAsync(connect);
|
||||||
|
|
||||||
|
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||||
|
result.ShouldStartWith("$SYS.ACCOUNT.");
|
||||||
|
result.ShouldEndWith(".CONNECT");
|
||||||
|
|
||||||
|
await server.ShutdownAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Server_publishes_disconnect_event_on_client_close()
|
||||||
|
{
|
||||||
|
using var server = CreateTestServer();
|
||||||
|
_ = server.StartAsync(CancellationToken.None);
|
||||||
|
await server.WaitForReadyAsync();
|
||||||
|
|
||||||
|
var received = new TaskCompletionSource<string>();
|
||||||
|
server.EventSystem!.SysSubscribe("$SYS.ACCOUNT.*.DISCONNECT", (sub, client, acc, subject, reply, hdr, msg) =>
|
||||||
|
{
|
||||||
|
received.TrySetResult(subject);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Connect and then disconnect
|
||||||
|
using var sock = new System.Net.Sockets.Socket(
|
||||||
|
System.Net.Sockets.AddressFamily.InterNetwork,
|
||||||
|
System.Net.Sockets.SocketType.Stream,
|
||||||
|
System.Net.Sockets.ProtocolType.Tcp);
|
||||||
|
await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port);
|
||||||
|
var buf = new byte[4096];
|
||||||
|
await sock.ReceiveAsync(buf);
|
||||||
|
await sock.SendAsync(System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n"));
|
||||||
|
await Task.Delay(100);
|
||||||
|
sock.Shutdown(System.Net.Sockets.SocketShutdown.Both);
|
||||||
|
|
||||||
|
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||||
|
result.ShouldStartWith("$SYS.ACCOUNT.");
|
||||||
|
result.ShouldEndWith(".DISCONNECT");
|
||||||
|
|
||||||
|
await server.ShutdownAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Server_publishes_shutdown_event()
|
||||||
|
{
|
||||||
|
using var server = CreateTestServer();
|
||||||
|
_ = server.StartAsync(CancellationToken.None);
|
||||||
|
await server.WaitForReadyAsync();
|
||||||
|
|
||||||
|
var received = new TaskCompletionSource<string>();
|
||||||
|
server.EventSystem!.SysSubscribe("$SYS.SERVER.*.SHUTDOWN", (sub, client, acc, subject, reply, hdr, msg) =>
|
||||||
|
{
|
||||||
|
received.TrySetResult(subject);
|
||||||
|
});
|
||||||
|
|
||||||
|
await server.ShutdownAsync();
|
||||||
|
|
||||||
|
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||||
|
result.ShouldContain(".SHUTDOWN");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static NatsServer CreateTestServer()
|
||||||
|
{
|
||||||
|
var port = GetFreePort();
|
||||||
|
return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int GetFreePort()
|
||||||
|
{
|
||||||
|
using var sock = new System.Net.Sockets.Socket(
|
||||||
|
System.Net.Sockets.AddressFamily.InterNetwork,
|
||||||
|
System.Net.Sockets.SocketType.Stream,
|
||||||
|
System.Net.Sockets.ProtocolType.Tcp);
|
||||||
|
sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
|
||||||
|
return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user