diff --git a/src/NATS.Server/Events/InternalEventSystem.cs b/src/NATS.Server/Events/InternalEventSystem.cs new file mode 100644 index 0000000..96f84d8 --- /dev/null +++ b/src/NATS.Server/Events/InternalEventSystem.cs @@ -0,0 +1,238 @@ +using System.Collections.Concurrent; +using System.Security.Cryptography; +using System.Text; +using System.Text.Json; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using NATS.Server.Auth; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Events; + +/// +/// Internal publish message queued for the send loop. +/// +public sealed class PublishMessage +{ + public InternalClient? Client { get; init; } + public required string Subject { get; init; } + public string? Reply { get; init; } + public byte[]? Headers { get; init; } + public object? Body { get; init; } + public bool Echo { get; init; } + public bool IsLast { get; init; } +} + +/// +/// Internal received message queued for the receive loop. +/// +public sealed class InternalSystemMessage +{ + public required Subscription? Sub { get; init; } + public required INatsClient? Client { get; init; } + public required Account? Account { get; init; } + public required string Subject { get; init; } + public required string? Reply { get; init; } + public required ReadOnlyMemory Headers { get; init; } + public required ReadOnlyMemory Message { get; init; } + public required SystemMessageHandler Callback { get; init; } +} + +/// +/// Manages the server's internal event system with Channel-based send/receive loops. +/// Maps to Go's internal struct in events.go:124-147 and the goroutines +/// internalSendLoop (events.go:495) and internalReceiveLoop (events.go:476). +/// +public sealed class InternalEventSystem : IAsyncDisposable +{ + private readonly ILogger _logger; + private readonly Channel _sendQueue; + private readonly Channel _receiveQueue; + private readonly Channel _receiveQueuePings; + private readonly CancellationTokenSource _cts = new(); + + private Task? _sendLoop; + private Task? _receiveLoop; + private Task? _receiveLoopPings; + private NatsServer? _server; + + private ulong _sequence; + private int _subscriptionId; + + public Account SystemAccount { get; } + public InternalClient SystemClient { get; } + public string ServerHash { get; } + + public InternalEventSystem(Account systemAccount, InternalClient systemClient, string serverName, ILogger logger) + { + _logger = logger; + SystemAccount = systemAccount; + SystemClient = systemClient; + + // Hash server name for inbox routing (matches Go's shash) + ServerHash = Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(serverName)))[..8].ToLowerInvariant(); + + _sendQueue = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + _receiveQueue = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + _receiveQueuePings = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + } + + public void Start(NatsServer server) + { + _server = server; + var ct = _cts.Token; + _sendLoop = Task.Run(() => InternalSendLoopAsync(ct), ct); + _receiveLoop = Task.Run(() => InternalReceiveLoopAsync(_receiveQueue, ct), ct); + _receiveLoopPings = Task.Run(() => InternalReceiveLoopAsync(_receiveQueuePings, ct), ct); + } + + /// + /// Creates a system subscription in the system account's SubList. + /// Maps to Go's sysSubscribe in events.go:2796. + /// + public Subscription SysSubscribe(string subject, SystemMessageHandler callback) + { + var sid = Interlocked.Increment(ref _subscriptionId).ToString(); + var sub = new Subscription + { + Subject = subject, + Sid = sid, + Client = SystemClient, + }; + + // Wrap callback in noInlineCallback pattern: enqueue to receive loop + SystemClient.MessageCallback = (subj, s, reply, hdr, msg) => + { + _receiveQueue.Writer.TryWrite(new InternalSystemMessage + { + Sub = sub, + Client = SystemClient, + Account = SystemAccount, + Subject = subj, + Reply = reply, + Headers = hdr, + Message = msg, + Callback = callback, + }); + }; + + SystemAccount.SubList.Insert(sub); + return sub; + } + + /// + /// Enqueue an internal message for publishing through the send loop. + /// + public void Enqueue(PublishMessage message) + { + _sendQueue.Writer.TryWrite(message); + } + + /// + /// The send loop: serializes messages and delivers them via the server's routing. + /// Maps to Go's internalSendLoop in events.go:495-668. + /// + private async Task InternalSendLoopAsync(CancellationToken ct) + { + try + { + await foreach (var pm in _sendQueue.Reader.ReadAllAsync(ct)) + { + try + { + var seq = Interlocked.Increment(ref _sequence); + + // Serialize body to JSON + byte[] payload; + if (pm.Body is byte[] raw) + { + payload = raw; + } + else if (pm.Body != null) + { + // Try source-generated context first, fall back to reflection-based for unknown types + var bodyType = pm.Body.GetType(); + var typeInfo = EventJsonContext.Default.GetTypeInfo(bodyType); + payload = typeInfo != null + ? JsonSerializer.SerializeToUtf8Bytes(pm.Body, typeInfo) + : JsonSerializer.SerializeToUtf8Bytes(pm.Body, bodyType); + } + else + { + payload = []; + } + + // Deliver via the system account's SubList matching + var result = SystemAccount.SubList.Match(pm.Subject); + + foreach (var sub in result.PlainSubs) + { + sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply, + pm.Headers ?? ReadOnlyMemory.Empty, + payload); + } + + foreach (var queueGroup in result.QueueSubs) + { + if (queueGroup.Length == 0) continue; + var sub = queueGroup[0]; // Simple pick for internal + sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply, + pm.Headers ?? ReadOnlyMemory.Empty, + payload); + } + + if (pm.IsLast) + break; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error in internal send loop processing message on {Subject}", pm.Subject); + } + } + } + catch (OperationCanceledException) + { + // Normal shutdown + } + } + + /// + /// The receive loop: dispatches callbacks for internally-received messages. + /// Maps to Go's internalReceiveLoop in events.go:476-491. + /// + private async Task InternalReceiveLoopAsync(Channel queue, CancellationToken ct) + { + try + { + await foreach (var msg in queue.Reader.ReadAllAsync(ct)) + { + try + { + msg.Callback(msg.Sub, msg.Client, msg.Account, msg.Subject, msg.Reply, msg.Headers, msg.Message); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error in internal receive loop processing {Subject}", msg.Subject); + } + } + } + catch (OperationCanceledException) + { + // Normal shutdown + } + } + + public async ValueTask DisposeAsync() + { + await _cts.CancelAsync(); + _sendQueue.Writer.TryComplete(); + _receiveQueue.Writer.TryComplete(); + _receiveQueuePings.Writer.TryComplete(); + + if (_sendLoop != null) await _sendLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + if (_receiveLoop != null) await _receiveLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + if (_receiveLoopPings != null) await _receiveLoopPings.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + + _cts.Dispose(); + } +} diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 89c3bd4..4c44194 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -9,6 +9,7 @@ using Microsoft.Extensions.Logging; using NATS.NKeys; using NATS.Server.Auth; using NATS.Server.Configuration; +using NATS.Server.Events; using NATS.Server.Monitoring; using NATS.Server.Protocol; using NATS.Server.Subscriptions; @@ -35,6 +36,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private string? _configDigest; private readonly Account _globalAccount; private readonly Account _systemAccount; + private InternalEventSystem? _eventSystem; private readonly SslServerAuthenticationOptions? _sslOptions; private readonly TlsRateLimiter? _tlsRateLimiter; private readonly SubjectTransform[] _subjectTransforms; @@ -70,6 +72,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public int Port => _options.Port; public Account SystemAccount => _systemAccount; public string ServerNKey { get; } + public InternalEventSystem? EventSystem => _eventSystem; public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0; public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0; public Action? ReOpenLogFile { get; set; } @@ -90,6 +93,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _logger.LogInformation("Initiating Shutdown..."); + // Dispose event system before tearing down clients + if (_eventSystem != null) + await _eventSystem.DisposeAsync(); + // Signal all internal loops to stop await _quitCts.CancelAsync(); @@ -265,6 +272,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _systemAccount = new Account("$SYS"); _accounts["$SYS"] = _systemAccount; + // Create system internal client and event system + var sysClientId = Interlocked.Increment(ref _nextClientId); + var sysClient = new InternalClient(sysClientId, ClientKind.System, _systemAccount); + _eventSystem = new InternalEventSystem( + _systemAccount, sysClient, + options.ServerName ?? $"nats-dotnet-{Environment.MachineName}", + _loggerFactory.CreateLogger()); + // Generate Ed25519 server NKey identity using var serverKeyPair = KeyPair.CreatePair(PrefixByte.Server); ServerNKey = serverKeyPair.GetPublicKey(); @@ -371,6 +386,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _listeningStarted.TrySetResult(); + _eventSystem?.Start(this); + _logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port); // Warn about stub features @@ -686,6 +703,16 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable }); } + public void SendInternalMsg(string subject, string? reply, object? msg) + { + _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Reply = reply, Body = msg }); + } + + public void SendInternalAccountMsg(Account account, string subject, object? msg) + { + _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg }); + } + public void RemoveClient(NatsClient client) { _clients.TryRemove(client.Id, out _); diff --git a/tests/NATS.Server.Tests/EventSystemTests.cs b/tests/NATS.Server.Tests/EventSystemTests.cs index ab6e501..4924bc8 100644 --- a/tests/NATS.Server.Tests/EventSystemTests.cs +++ b/tests/NATS.Server.Tests/EventSystemTests.cs @@ -1,4 +1,5 @@ using System.Text.Json; +using Microsoft.Extensions.Logging.Abstractions; using NATS.Server.Events; namespace NATS.Server.Tests; @@ -65,4 +66,56 @@ public class EventSystemTests json.ShouldContain("\"connections\":10"); json.ShouldContain("\"in_msgs\":5000"); } + + [Fact] + public async Task InternalEventSystem_start_and_stop_lifecycle() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var eventSystem = server.EventSystem; + eventSystem.ShouldNotBeNull(); + eventSystem.SystemClient.ShouldNotBeNull(); + eventSystem.SystemClient.Kind.ShouldBe(ClientKind.System); + + await server.ShutdownAsync(); + } + + [Fact] + public async Task SendInternalMsg_delivers_to_system_subscriber() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("test.subject", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + server.SendInternalMsg("test.subject", null, new { Value = "hello" }); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldBe("test.subject"); + + await server.ShutdownAsync(); + } + + private static NatsServer CreateTestServer() + { + var port = GetFreePort(); + return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance); + } + + private static int GetFreePort() + { + using var sock = new System.Net.Sockets.Socket( + System.Net.Sockets.AddressFamily.InterNetwork, + System.Net.Sockets.SocketType.Stream, + System.Net.Sockets.ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } }