feat: add InternalClient class for socketless internal messaging

This commit is contained in:
Joseph Doherty
2026-02-23 05:22:58 -05:00
parent 0e7db5615e
commit 0c4bca9073
2 changed files with 115 additions and 0 deletions

View File

@@ -0,0 +1,59 @@
using NATS.Server.Auth;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;
namespace NATS.Server;
/// <summary>
/// Lightweight socketless client for internal messaging (SYSTEM, ACCOUNT, JETSTREAM).
/// Maps to Go's internal client created by createInternalClient() in server.go:1910-1936.
/// No network I/O — messages are delivered via callback.
/// </summary>
public sealed class InternalClient : INatsClient
{
public ulong Id { get; }
public ClientKind Kind { get; }
public bool IsInternal => Kind.IsInternal();
public Account? Account { get; }
public ClientOptions? ClientOpts => null;
public ClientPermissions? Permissions => null;
/// <summary>
/// Callback invoked when a message is delivered to this internal client.
/// Set by the event system or account import infrastructure.
/// </summary>
public Action<string, string, string?, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>>? MessageCallback { get; set; }
private readonly Dictionary<string, Subscription> _subs = new(StringComparer.Ordinal);
public InternalClient(ulong id, ClientKind kind, Account account)
{
if (!kind.IsInternal())
throw new ArgumentException($"InternalClient requires an internal ClientKind, got {kind}", nameof(kind));
Id = id;
Kind = kind;
Account = account;
}
public void SendMessage(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
MessageCallback?.Invoke(subject, sid, replyTo, headers, payload);
}
public bool QueueOutbound(ReadOnlyMemory<byte> data) => true; // no-op for internal clients
public void RemoveSubscription(string sid)
{
if (_subs.Remove(sid))
Account?.DecrementSubscriptions();
}
public void AddSubscription(Subscription sub)
{
_subs[sub.Sid] = sub;
}
public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
}

View File

@@ -1,3 +1,5 @@
using NATS.Server.Auth;
namespace NATS.Server.Tests;
public class InternalClientTests
@@ -26,4 +28,58 @@ public class InternalClientTests
{
typeof(NatsClient).GetProperty("Kind")!.PropertyType.ShouldBe(typeof(ClientKind));
}
[Fact]
public void InternalClient_system_kind()
{
var account = new Account("$SYS");
var client = new InternalClient(1, ClientKind.System, account);
client.Kind.ShouldBe(ClientKind.System);
client.IsInternal.ShouldBeTrue();
client.Id.ShouldBe(1UL);
client.Account.ShouldBe(account);
}
[Fact]
public void InternalClient_account_kind()
{
var account = new Account("myaccount");
var client = new InternalClient(2, ClientKind.Account, account);
client.Kind.ShouldBe(ClientKind.Account);
client.IsInternal.ShouldBeTrue();
}
[Fact]
public void InternalClient_rejects_non_internal_kind()
{
var account = new Account("test");
Should.Throw<ArgumentException>(() => new InternalClient(1, ClientKind.Client, account));
}
[Fact]
public void InternalClient_SendMessage_invokes_callback()
{
var account = new Account("$SYS");
var client = new InternalClient(1, ClientKind.System, account);
string? capturedSubject = null;
string? capturedSid = null;
client.MessageCallback = (subject, sid, replyTo, headers, payload) =>
{
capturedSubject = subject;
capturedSid = sid;
};
client.SendMessage("test.subject", "1", null, ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);
capturedSubject.ShouldBe("test.subject");
capturedSid.ShouldBe("1");
}
[Fact]
public void InternalClient_QueueOutbound_returns_true_noop()
{
var account = new Account("$SYS");
var client = new InternalClient(1, ClientKind.System, account);
client.QueueOutbound(ReadOnlyMemory<byte>.Empty).ShouldBeTrue();
}
}