feat: add InternalEventSystem with Channel-based send/receive loops
This commit is contained in:
238
src/NATS.Server/Events/InternalEventSystem.cs
Normal file
238
src/NATS.Server/Events/InternalEventSystem.cs
Normal file
@@ -0,0 +1,238 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using System.Threading.Channels;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.Events;
|
||||
|
||||
/// <summary>
|
||||
/// Internal publish message queued for the send loop.
|
||||
/// </summary>
|
||||
public sealed class PublishMessage
|
||||
{
|
||||
public InternalClient? Client { get; init; }
|
||||
public required string Subject { get; init; }
|
||||
public string? Reply { get; init; }
|
||||
public byte[]? Headers { get; init; }
|
||||
public object? Body { get; init; }
|
||||
public bool Echo { get; init; }
|
||||
public bool IsLast { get; init; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Internal received message queued for the receive loop.
|
||||
/// </summary>
|
||||
public sealed class InternalSystemMessage
|
||||
{
|
||||
public required Subscription? Sub { get; init; }
|
||||
public required INatsClient? Client { get; init; }
|
||||
public required Account? Account { get; init; }
|
||||
public required string Subject { get; init; }
|
||||
public required string? Reply { get; init; }
|
||||
public required ReadOnlyMemory<byte> Headers { get; init; }
|
||||
public required ReadOnlyMemory<byte> Message { get; init; }
|
||||
public required SystemMessageHandler Callback { get; init; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Manages the server's internal event system with Channel-based send/receive loops.
|
||||
/// Maps to Go's internal struct in events.go:124-147 and the goroutines
|
||||
/// internalSendLoop (events.go:495) and internalReceiveLoop (events.go:476).
|
||||
/// </summary>
|
||||
public sealed class InternalEventSystem : IAsyncDisposable
|
||||
{
|
||||
private readonly ILogger _logger;
|
||||
private readonly Channel<PublishMessage> _sendQueue;
|
||||
private readonly Channel<InternalSystemMessage> _receiveQueue;
|
||||
private readonly Channel<InternalSystemMessage> _receiveQueuePings;
|
||||
private readonly CancellationTokenSource _cts = new();
|
||||
|
||||
private Task? _sendLoop;
|
||||
private Task? _receiveLoop;
|
||||
private Task? _receiveLoopPings;
|
||||
private NatsServer? _server;
|
||||
|
||||
private ulong _sequence;
|
||||
private int _subscriptionId;
|
||||
|
||||
public Account SystemAccount { get; }
|
||||
public InternalClient SystemClient { get; }
|
||||
public string ServerHash { get; }
|
||||
|
||||
public InternalEventSystem(Account systemAccount, InternalClient systemClient, string serverName, ILogger logger)
|
||||
{
|
||||
_logger = logger;
|
||||
SystemAccount = systemAccount;
|
||||
SystemClient = systemClient;
|
||||
|
||||
// Hash server name for inbox routing (matches Go's shash)
|
||||
ServerHash = Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(serverName)))[..8].ToLowerInvariant();
|
||||
|
||||
_sendQueue = Channel.CreateUnbounded<PublishMessage>(new UnboundedChannelOptions { SingleReader = true });
|
||||
_receiveQueue = Channel.CreateUnbounded<InternalSystemMessage>(new UnboundedChannelOptions { SingleReader = true });
|
||||
_receiveQueuePings = Channel.CreateUnbounded<InternalSystemMessage>(new UnboundedChannelOptions { SingleReader = true });
|
||||
}
|
||||
|
||||
public void Start(NatsServer server)
|
||||
{
|
||||
_server = server;
|
||||
var ct = _cts.Token;
|
||||
_sendLoop = Task.Run(() => InternalSendLoopAsync(ct), ct);
|
||||
_receiveLoop = Task.Run(() => InternalReceiveLoopAsync(_receiveQueue, ct), ct);
|
||||
_receiveLoopPings = Task.Run(() => InternalReceiveLoopAsync(_receiveQueuePings, ct), ct);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a system subscription in the system account's SubList.
|
||||
/// Maps to Go's sysSubscribe in events.go:2796.
|
||||
/// </summary>
|
||||
public Subscription SysSubscribe(string subject, SystemMessageHandler callback)
|
||||
{
|
||||
var sid = Interlocked.Increment(ref _subscriptionId).ToString();
|
||||
var sub = new Subscription
|
||||
{
|
||||
Subject = subject,
|
||||
Sid = sid,
|
||||
Client = SystemClient,
|
||||
};
|
||||
|
||||
// Wrap callback in noInlineCallback pattern: enqueue to receive loop
|
||||
SystemClient.MessageCallback = (subj, s, reply, hdr, msg) =>
|
||||
{
|
||||
_receiveQueue.Writer.TryWrite(new InternalSystemMessage
|
||||
{
|
||||
Sub = sub,
|
||||
Client = SystemClient,
|
||||
Account = SystemAccount,
|
||||
Subject = subj,
|
||||
Reply = reply,
|
||||
Headers = hdr,
|
||||
Message = msg,
|
||||
Callback = callback,
|
||||
});
|
||||
};
|
||||
|
||||
SystemAccount.SubList.Insert(sub);
|
||||
return sub;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Enqueue an internal message for publishing through the send loop.
|
||||
/// </summary>
|
||||
public void Enqueue(PublishMessage message)
|
||||
{
|
||||
_sendQueue.Writer.TryWrite(message);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The send loop: serializes messages and delivers them via the server's routing.
|
||||
/// Maps to Go's internalSendLoop in events.go:495-668.
|
||||
/// </summary>
|
||||
private async Task InternalSendLoopAsync(CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await foreach (var pm in _sendQueue.Reader.ReadAllAsync(ct))
|
||||
{
|
||||
try
|
||||
{
|
||||
var seq = Interlocked.Increment(ref _sequence);
|
||||
|
||||
// Serialize body to JSON
|
||||
byte[] payload;
|
||||
if (pm.Body is byte[] raw)
|
||||
{
|
||||
payload = raw;
|
||||
}
|
||||
else if (pm.Body != null)
|
||||
{
|
||||
// Try source-generated context first, fall back to reflection-based for unknown types
|
||||
var bodyType = pm.Body.GetType();
|
||||
var typeInfo = EventJsonContext.Default.GetTypeInfo(bodyType);
|
||||
payload = typeInfo != null
|
||||
? JsonSerializer.SerializeToUtf8Bytes(pm.Body, typeInfo)
|
||||
: JsonSerializer.SerializeToUtf8Bytes(pm.Body, bodyType);
|
||||
}
|
||||
else
|
||||
{
|
||||
payload = [];
|
||||
}
|
||||
|
||||
// Deliver via the system account's SubList matching
|
||||
var result = SystemAccount.SubList.Match(pm.Subject);
|
||||
|
||||
foreach (var sub in result.PlainSubs)
|
||||
{
|
||||
sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply,
|
||||
pm.Headers ?? ReadOnlyMemory<byte>.Empty,
|
||||
payload);
|
||||
}
|
||||
|
||||
foreach (var queueGroup in result.QueueSubs)
|
||||
{
|
||||
if (queueGroup.Length == 0) continue;
|
||||
var sub = queueGroup[0]; // Simple pick for internal
|
||||
sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply,
|
||||
pm.Headers ?? ReadOnlyMemory<byte>.Empty,
|
||||
payload);
|
||||
}
|
||||
|
||||
if (pm.IsLast)
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Error in internal send loop processing message on {Subject}", pm.Subject);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Normal shutdown
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The receive loop: dispatches callbacks for internally-received messages.
|
||||
/// Maps to Go's internalReceiveLoop in events.go:476-491.
|
||||
/// </summary>
|
||||
private async Task InternalReceiveLoopAsync(Channel<InternalSystemMessage> queue, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await foreach (var msg in queue.Reader.ReadAllAsync(ct))
|
||||
{
|
||||
try
|
||||
{
|
||||
msg.Callback(msg.Sub, msg.Client, msg.Account, msg.Subject, msg.Reply, msg.Headers, msg.Message);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Error in internal receive loop processing {Subject}", msg.Subject);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Normal shutdown
|
||||
}
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await _cts.CancelAsync();
|
||||
_sendQueue.Writer.TryComplete();
|
||||
_receiveQueue.Writer.TryComplete();
|
||||
_receiveQueuePings.Writer.TryComplete();
|
||||
|
||||
if (_sendLoop != null) await _sendLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||
if (_receiveLoop != null) await _receiveLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||
if (_receiveLoopPings != null) await _receiveLoopPings.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||
|
||||
_cts.Dispose();
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,7 @@ using Microsoft.Extensions.Logging;
|
||||
using NATS.NKeys;
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Events;
|
||||
using NATS.Server.Monitoring;
|
||||
using NATS.Server.Protocol;
|
||||
using NATS.Server.Subscriptions;
|
||||
@@ -35,6 +36,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
private string? _configDigest;
|
||||
private readonly Account _globalAccount;
|
||||
private readonly Account _systemAccount;
|
||||
private InternalEventSystem? _eventSystem;
|
||||
private readonly SslServerAuthenticationOptions? _sslOptions;
|
||||
private readonly TlsRateLimiter? _tlsRateLimiter;
|
||||
private readonly SubjectTransform[] _subjectTransforms;
|
||||
@@ -70,6 +72,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
public int Port => _options.Port;
|
||||
public Account SystemAccount => _systemAccount;
|
||||
public string ServerNKey { get; }
|
||||
public InternalEventSystem? EventSystem => _eventSystem;
|
||||
public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0;
|
||||
public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0;
|
||||
public Action? ReOpenLogFile { get; set; }
|
||||
@@ -90,6 +93,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
|
||||
_logger.LogInformation("Initiating Shutdown...");
|
||||
|
||||
// Dispose event system before tearing down clients
|
||||
if (_eventSystem != null)
|
||||
await _eventSystem.DisposeAsync();
|
||||
|
||||
// Signal all internal loops to stop
|
||||
await _quitCts.CancelAsync();
|
||||
|
||||
@@ -265,6 +272,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
_systemAccount = new Account("$SYS");
|
||||
_accounts["$SYS"] = _systemAccount;
|
||||
|
||||
// Create system internal client and event system
|
||||
var sysClientId = Interlocked.Increment(ref _nextClientId);
|
||||
var sysClient = new InternalClient(sysClientId, ClientKind.System, _systemAccount);
|
||||
_eventSystem = new InternalEventSystem(
|
||||
_systemAccount, sysClient,
|
||||
options.ServerName ?? $"nats-dotnet-{Environment.MachineName}",
|
||||
_loggerFactory.CreateLogger<InternalEventSystem>());
|
||||
|
||||
// Generate Ed25519 server NKey identity
|
||||
using var serverKeyPair = KeyPair.CreatePair(PrefixByte.Server);
|
||||
ServerNKey = serverKeyPair.GetPublicKey();
|
||||
@@ -371,6 +386,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
|
||||
_listeningStarted.TrySetResult();
|
||||
|
||||
_eventSystem?.Start(this);
|
||||
|
||||
_logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port);
|
||||
|
||||
// Warn about stub features
|
||||
@@ -686,6 +703,16 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
});
|
||||
}
|
||||
|
||||
public void SendInternalMsg(string subject, string? reply, object? msg)
|
||||
{
|
||||
_eventSystem?.Enqueue(new PublishMessage { Subject = subject, Reply = reply, Body = msg });
|
||||
}
|
||||
|
||||
public void SendInternalAccountMsg(Account account, string subject, object? msg)
|
||||
{
|
||||
_eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg });
|
||||
}
|
||||
|
||||
public void RemoveClient(NatsClient client)
|
||||
{
|
||||
_clients.TryRemove(client.Id, out _);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
using System.Text.Json;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server.Events;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
@@ -65,4 +66,56 @@ public class EventSystemTests
|
||||
json.ShouldContain("\"connections\":10");
|
||||
json.ShouldContain("\"in_msgs\":5000");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task InternalEventSystem_start_and_stop_lifecycle()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
_ = server.StartAsync(CancellationToken.None);
|
||||
await server.WaitForReadyAsync();
|
||||
|
||||
var eventSystem = server.EventSystem;
|
||||
eventSystem.ShouldNotBeNull();
|
||||
eventSystem.SystemClient.ShouldNotBeNull();
|
||||
eventSystem.SystemClient.Kind.ShouldBe(ClientKind.System);
|
||||
|
||||
await server.ShutdownAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SendInternalMsg_delivers_to_system_subscriber()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
_ = server.StartAsync(CancellationToken.None);
|
||||
await server.WaitForReadyAsync();
|
||||
|
||||
var received = new TaskCompletionSource<string>();
|
||||
server.EventSystem!.SysSubscribe("test.subject", (sub, client, acc, subject, reply, hdr, msg) =>
|
||||
{
|
||||
received.TrySetResult(subject);
|
||||
});
|
||||
|
||||
server.SendInternalMsg("test.subject", null, new { Value = "hello" });
|
||||
|
||||
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||
result.ShouldBe("test.subject");
|
||||
|
||||
await server.ShutdownAsync();
|
||||
}
|
||||
|
||||
private static NatsServer CreateTestServer()
|
||||
{
|
||||
var port = GetFreePort();
|
||||
return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
|
||||
}
|
||||
|
||||
private static int GetFreePort()
|
||||
{
|
||||
using var sock = new System.Net.Sockets.Socket(
|
||||
System.Net.Sockets.AddressFamily.InterNetwork,
|
||||
System.Net.Sockets.SocketType.Stream,
|
||||
System.Net.Sockets.ProtocolType.Tcp);
|
||||
sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
|
||||
return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user