From 0c4bca90730025f7ebb6343911732a925646916e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:22:58 -0500 Subject: [PATCH] feat: add InternalClient class for socketless internal messaging --- src/NATS.Server/InternalClient.cs | 59 +++++++++++++++++++ .../NATS.Server.Tests/InternalClientTests.cs | 56 ++++++++++++++++++ 2 files changed, 115 insertions(+) create mode 100644 src/NATS.Server/InternalClient.cs diff --git a/src/NATS.Server/InternalClient.cs b/src/NATS.Server/InternalClient.cs new file mode 100644 index 0000000..61532a9 --- /dev/null +++ b/src/NATS.Server/InternalClient.cs @@ -0,0 +1,59 @@ +using NATS.Server.Auth; +using NATS.Server.Protocol; +using NATS.Server.Subscriptions; + +namespace NATS.Server; + +/// +/// Lightweight socketless client for internal messaging (SYSTEM, ACCOUNT, JETSTREAM). +/// Maps to Go's internal client created by createInternalClient() in server.go:1910-1936. +/// No network I/O — messages are delivered via callback. +/// +public sealed class InternalClient : INatsClient +{ + public ulong Id { get; } + public ClientKind Kind { get; } + public bool IsInternal => Kind.IsInternal(); + public Account? Account { get; } + public ClientOptions? ClientOpts => null; + public ClientPermissions? Permissions => null; + + /// + /// Callback invoked when a message is delivered to this internal client. + /// Set by the event system or account import infrastructure. + /// + public Action, ReadOnlyMemory>? MessageCallback { get; set; } + + private readonly Dictionary _subs = new(StringComparer.Ordinal); + + public InternalClient(ulong id, ClientKind kind, Account account) + { + if (!kind.IsInternal()) + throw new ArgumentException($"InternalClient requires an internal ClientKind, got {kind}", nameof(kind)); + + Id = id; + Kind = kind; + Account = account; + } + + public void SendMessage(string subject, string sid, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + MessageCallback?.Invoke(subject, sid, replyTo, headers, payload); + } + + public bool QueueOutbound(ReadOnlyMemory data) => true; // no-op for internal clients + + public void RemoveSubscription(string sid) + { + if (_subs.Remove(sid)) + Account?.DecrementSubscriptions(); + } + + public void AddSubscription(Subscription sub) + { + _subs[sub.Sid] = sub; + } + + public IReadOnlyDictionary Subscriptions => _subs; +} diff --git a/tests/NATS.Server.Tests/InternalClientTests.cs b/tests/NATS.Server.Tests/InternalClientTests.cs index bbfa024..6a068c4 100644 --- a/tests/NATS.Server.Tests/InternalClientTests.cs +++ b/tests/NATS.Server.Tests/InternalClientTests.cs @@ -1,3 +1,5 @@ +using NATS.Server.Auth; + namespace NATS.Server.Tests; public class InternalClientTests @@ -26,4 +28,58 @@ public class InternalClientTests { typeof(NatsClient).GetProperty("Kind")!.PropertyType.ShouldBe(typeof(ClientKind)); } + + [Fact] + public void InternalClient_system_kind() + { + var account = new Account("$SYS"); + var client = new InternalClient(1, ClientKind.System, account); + client.Kind.ShouldBe(ClientKind.System); + client.IsInternal.ShouldBeTrue(); + client.Id.ShouldBe(1UL); + client.Account.ShouldBe(account); + } + + [Fact] + public void InternalClient_account_kind() + { + var account = new Account("myaccount"); + var client = new InternalClient(2, ClientKind.Account, account); + client.Kind.ShouldBe(ClientKind.Account); + client.IsInternal.ShouldBeTrue(); + } + + [Fact] + public void InternalClient_rejects_non_internal_kind() + { + var account = new Account("test"); + Should.Throw(() => new InternalClient(1, ClientKind.Client, account)); + } + + [Fact] + public void InternalClient_SendMessage_invokes_callback() + { + var account = new Account("$SYS"); + var client = new InternalClient(1, ClientKind.System, account); + string? capturedSubject = null; + string? capturedSid = null; + client.MessageCallback = (subject, sid, replyTo, headers, payload) => + { + capturedSubject = subject; + capturedSid = sid; + }; + + client.SendMessage("test.subject", "1", null, ReadOnlyMemory.Empty, ReadOnlyMemory.Empty); + + capturedSubject.ShouldBe("test.subject"); + capturedSid.ShouldBe("1"); + } + + [Fact] + public void InternalClient_QueueOutbound_returns_true_noop() + { + var account = new Account("$SYS"); + var client = new InternalClient(1, ClientKind.System, account); + client.QueueOutbound(ReadOnlyMemory.Empty).ShouldBeTrue(); + } }