From 06779a1f77c2e3c8b5775c3634d222a3d40cca9b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 26 Feb 2026 15:08:23 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20port=20session=2010=20=E2=80=94=20Serve?= =?UTF-8?q?r=20Core=20Runtime,=20Accept=20Loops=20&=20Listeners?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ports server/server.go lines 2577–4782 (~1,881 Go LOC), implementing ~97 features (IDs 3051–3147) across three new partial-class files. New files: - NatsServer.Lifecycle.cs: Shutdown, WaitForShutdown, RemoveClient, SendLDMToClients, LameDuckMode, LDMClientByID, rate-limit logging, DisconnectClientByID, SendAsyncInfoToClients - NatsServer.Listeners.cs: AcceptLoop, GetServerListener, InProcessConn, AcceptConnections, GenerateInfoJson, CopyInfo, CreateClient/Ex/InProcess, StartMonitoring (HTTP/HTTPS), AddConnectURLs/RemoveConnectURLs, TlsVersion/TlsVersionFromString, GetClientConnectURLs, ResolveHostPorts, PortsInfo/PortFile/LogPorts, ReadyForListeners, GetRandomIP, AcceptError - Internal/WaitGroup.cs: Go-style WaitGroup using TaskCompletionSource Modified: - Auth/AuthTypes.cs: Account now implements INatsAccount (stub) - NatsServerTypes.cs: ServerInfo.ShallowClone(), removed duplicate RefCountedUrlSet - NatsServer.cs: _info promoted to internal for test access - Properties/AssemblyInfo.cs: InternalsVisibleTo(DynamicProxyGenAssembly2) - ServerTests.cs: 20 new session-10 unit tests (GenerateInfoJson, TlsVersion, CopyInfo, GetRandomIP — Test IDs 2895, 2906) All 565 unit tests + 1 integration test pass. --- .../ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs | 25 +- .../ZB.MOM.NatsNet.Server/ClientConnection.cs | 15 + .../Internal/WaitGroup.cs | 64 + .../ZB.MOM.NatsNet.Server/NatsServer.Init.cs | 23 +- .../NatsServer.Lifecycle.cs | 847 ++++++++++++ .../NatsServer.Listeners.cs | 1164 +++++++++++++++++ .../src/ZB.MOM.NatsNet.Server/NatsServer.cs | 32 +- .../ZB.MOM.NatsNet.Server/NatsServerTypes.cs | 119 +- .../Properties/AssemblyInfo.cs | 1 + .../ServerTests.cs | 219 +++- porting.db | Bin 2473984 -> 2473984 bytes reports/current.md | 12 +- reports/report_0df93c2.md | 39 + 13 files changed, 2539 insertions(+), 21 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Internal/WaitGroup.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs create mode 100644 reports/report_0df93c2.md diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs index d43cc55..74a97fd 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/AuthTypes.cs @@ -13,6 +13,8 @@ // // Adapted from server/auth.go (type definitions) in the NATS server Go source. +using ZB.MOM.NatsNet.Server; + namespace ZB.MOM.NatsNet.Server.Auth; /// @@ -170,7 +172,7 @@ public class RoutePermissions /// Stub for Account type. Full implementation in session 11. /// Mirrors Go Account struct. /// -public class Account +public class Account : INatsAccount { public string Name { get; set; } = string.Empty; @@ -184,6 +186,27 @@ public class Account internal ZB.MOM.NatsNet.Server.Internal.DataStructures.SubscriptionIndex? Sublist { get; set; } internal object? Server { get; set; } // INatsServer — avoids circular reference + // INatsAccount — stub implementations until session 11 (accounts.go). + bool INatsAccount.IsValid => true; + bool INatsAccount.MaxTotalConnectionsReached() => false; + bool INatsAccount.MaxTotalLeafNodesReached() => false; + int INatsAccount.AddClient(ClientConnection c) => 0; + int INatsAccount.RemoveClient(ClientConnection c) => 0; + /// Returns true if this account's JWT has expired. Stub — full impl in session 11. public bool IsExpired() => false; + + /// + /// Returns the total number of subscriptions across all clients in this account. + /// Stub — full implementation in session 11. + /// Mirrors Go Account.TotalSubs(). + /// + public int TotalSubs() => 0; + + /// + /// Notifies leaf nodes of a subscription change. + /// Stub — full implementation in session 15. + /// Mirrors Go Account.updateLeafNodes(). + /// + internal void UpdateLeafNodes(object sub, int delta) { } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index dfa891b..0d54c95 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1077,6 +1077,21 @@ public sealed partial class ClientConnection // features 425-427: writeLoop / flushClients / readLoop internal void WriteLoop() { /* TODO session 09 */ } internal void FlushClients(long budget) { /* TODO session 09 */ } + internal void ReadLoop(byte[]? pre) { /* TODO session 09 */ } + + /// + /// Generates the INFO JSON bytes sent to the client on connect. + /// Stub — full implementation in session 09. + /// Mirrors Go client.generateClientInfoJSON(). + /// + internal ReadOnlyMemory GenerateClientInfoJSON(ServerInfo info, bool includeClientIp) + => ReadOnlyMemory.Empty; + + /// + /// Sets the auth-timeout timer to the specified duration. + /// Mirrors Go client.setAuthTimer(d). + /// + internal void SetAuthTimer(TimeSpan d) { /* TODO session 09 */ } // features 428-432: closedStateForErr, collapsePtoNB, flushOutbound, handleWriteTimeout, markConnAsClosed internal static ClosedState ClosedStateForErr(Exception err) => diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/WaitGroup.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/WaitGroup.cs new file mode 100644 index 0000000..e5f8a08 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/WaitGroup.cs @@ -0,0 +1,64 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace ZB.MOM.NatsNet.Server.Internal; + +/// +/// A Go-like WaitGroup: tracks a set of in-flight operations and lets callers +/// block until all of them complete. +/// +internal sealed class WaitGroup +{ + private int _count; + private volatile TaskCompletionSource _tcs; + + public WaitGroup() + { + _tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _tcs.SetResult(true); // starts at zero, so "done" immediately + } + + /// + /// Increment the counter by (usually 1). + /// Must be called before starting the goroutine it tracks. + /// + public void Add(int delta = 1) + { + var newCount = Interlocked.Add(ref _count, delta); + if (newCount < 0) + throw new InvalidOperationException("WaitGroup counter went negative"); + + if (newCount == 0) + { + // All goroutines done — signal any waiters. + Volatile.Read(ref _tcs).TrySetResult(true); + } + else if (delta > 0 && newCount == delta) + { + // Transitioning from 0 to positive — replace the completed TCS + // with a fresh unsignaled one so Wait() will block correctly. + Volatile.Write(ref _tcs, + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously)); + } + } + + /// Decrement the counter by 1. Called when a goroutine finishes. + public void Done() => Add(-1); + + /// Block synchronously until the counter reaches 0. + public void Wait() + { + if (Volatile.Read(ref _count) == 0) return; + Volatile.Read(ref _tcs).Task.GetAwaiter().GetResult(); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs index f13ec5e..c12a926 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs @@ -882,15 +882,26 @@ public sealed partial class NatsServer // ========================================================================= /// - /// Sets the INFO host/port from the current listener endpoint. - /// Full implementation in session 10 (listener startup). - /// Stub sets to opts values. + /// Sets the INFO host/port from either ClientAdvertise or the bind options. + /// Mirrors Go Server.setInfoHostPort(). + /// Returns non-null on parse error. /// - internal void SetInfoHostPort() + internal Exception? SetInfoHostPort() { var opts = GetOpts(); - _info.Host = opts.Host; - _info.Port = opts.Port; + if (!string.IsNullOrEmpty(opts.ClientAdvertise)) + { + var (h, p, err) = Internal.ServerUtilities.ParseHostPort(opts.ClientAdvertise, opts.Port); + if (err != null) return err; + _info.Host = h; + _info.Port = p; + } + else + { + _info.Host = opts.Host; + _info.Port = opts.Port; + } + return null; } /// diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs new file mode 100644 index 0000000..093d773 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs @@ -0,0 +1,847 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/server.go (lines 2577–4782) in the NATS server Go source. +// Session 10: shutdown, goroutine tracking, query helpers, lame duck mode. + +using System.Net; +using System.Net.Sockets; +using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + // ========================================================================= + // Shutdown / WaitForShutdown (features 3051, 3053) + // ========================================================================= + + /// + /// Shuts the server down gracefully: closes all listeners, kicks all + /// connections, waits for goroutines, then signals . + /// Mirrors Go Server.Shutdown(). + /// + public void Shutdown() + { + // Stubs for JetStream / Raft / eventing — implemented in later sessions. + SignalPullConsumers(); + StepdownRaftNodes(); + ShutdownEventing(); + + if (IsShuttingDown()) return; + + _mu.EnterWriteLock(); + Noticef("Initiating Shutdown..."); + + var accRes = _accResolver; + GetOpts(); // snapshot opts (not used below but mirrors Go pattern) + + Interlocked.Exchange(ref _shutdown, 1); + Interlocked.Exchange(ref _running, 0); + lock (_grMu) { _grRunning = false; } + + _mu.ExitWriteLock(); + + accRes?.Close(); + + ShutdownJetStream(); + ShutdownRaftNodes(); + + // ---- Collect all connections ---- + var conns = new Dictionary(); + + _mu.EnterWriteLock(); + foreach (var kvp in _clients) conns[kvp.Key] = kvp.Value; + lock (_grMu) { foreach (var kvp in _grTmpClients) conns[kvp.Key] = kvp.Value; } + ForEachRoute(r => { conns[r.Cid] = r; }); + GetAllGatewayConnections(conns); + foreach (var kvp in _leafs) conns[kvp.Key] = kvp.Value; + + // ---- Count & close listeners ---- + int doneExpected = 0; + + if (_listener != null) + { + doneExpected++; + _listener.Stop(); + _listener = null; + } + doneExpected += CloseWebsocketServer(); + if (_gateway.Enabled) + { + // mqtt listener managed by session 22 + } + if (_leafNodeListener != null) + { + doneExpected++; + _leafNodeListener.Stop(); + _leafNodeListener = null; + } + if (_routeListener != null) + { + doneExpected++; + _routeListener.Stop(); + _routeListener = null; + } + if (_gatewayListener != null) + { + doneExpected++; + _gatewayListener.Stop(); + _gatewayListener = null; + } + if (_http != null) + { + doneExpected++; + _http.Stop(); + _http = null; + } + if (_profiler != null) + { + doneExpected++; + _profiler.Stop(); + // profiler is not nulled — see Go code: it keeps _profiler ref for ProfilerAddr() + } + + _mu.ExitWriteLock(); + + // Release all goroutines waiting on quitCts. + _quitCts.Cancel(); + + // Close all client / route / gateway / leaf connections. + foreach (var c in conns.Values) + { + c.SetNoReconnect(); + c.CloseConnection(ClosedState.ServerShutdown); + } + + // Wait for accept loops to exit. + for (int i = 0; i < doneExpected; i++) + _done.Reader.ReadAsync().GetAwaiter().GetResult(); + + // Wait for all goroutines. + _grWg.Wait(); + + var opts = GetOpts(); + if (!string.IsNullOrEmpty(opts.PortsFileDir)) + DeletePortsFile(opts.PortsFileDir); + + Noticef("Server Exiting.."); + + if (_ocsprc != null) { /* stub — stop OCSP cache in session 23 */ } + + _shutdownComplete.TrySetResult(); + } + + /// + /// Blocks until has fully completed. + /// Mirrors Go Server.WaitForShutdown(). + /// + public void WaitForShutdown() => + _shutdownComplete.Task.GetAwaiter().GetResult(); + + // ========================================================================= + // Goroutine tracking (features 3119–3120) + // ========================================================================= + + /// + /// Starts a background Task that counts toward the server wait group. + /// Returns true if the goroutine was started, false if the server is already stopped. + /// Mirrors Go Server.startGoRoutine(f). + /// + internal bool StartGoRoutine(Action f) + { + lock (_grMu) + { + if (!_grRunning) return false; + _grWg.Add(1); + Task.Run(() => + { + try { f(); } + finally { _grWg.Done(); } + }); + return true; + } + } + + // ========================================================================= + // Client / connection management (features 3081–3084) + // ========================================================================= + + /// + /// Removes a client, route, gateway, or leaf from server accounting. + /// Mirrors Go Server.removeClient(). + /// + internal void RemoveClient(ClientConnection c) + { + switch (c.Kind) + { + case ClientKind.Client: + { + bool updateProto; + string proxyKey; + lock (c) + { + updateProto = c.Kind == ClientKind.Client && + c.Opts.Protocol >= ClientProtocol.Info; + proxyKey = c.ProxyKey; + } + _mu.EnterWriteLock(); + try + { + _clients.Remove(c.Cid); + if (updateProto) _cproto--; + if (!string.IsNullOrEmpty(proxyKey)) + RemoveProxiedConn(proxyKey, c.Cid); + } + finally { _mu.ExitWriteLock(); } + break; + } + case ClientKind.Router: + RemoveRoute(c); + break; + case ClientKind.Gateway: + RemoveRemoteGatewayConnection(c); + break; + case ClientKind.Leaf: + RemoveLeafNodeConnection(c); + break; + } + } + + /// + /// Removes a proxied connection entry. + /// Server write lock must be held on entry. + /// Mirrors Go Server.removeProxiedConn(). + /// + private void RemoveProxiedConn(string key, ulong cid) + { + if (!_proxiedConns.TryGetValue(key, out var conns)) return; + conns.Remove(cid); + if (conns.Count == 0) _proxiedConns.Remove(key); + } + + /// + /// Removes a client from the temporary goroutine client map. + /// Mirrors Go Server.removeFromTempClients(). + /// + internal void RemoveFromTempClients(ulong cid) + { + lock (_grMu) { _grTmpClients.Remove(cid); } + } + + /// + /// Adds a client to the temporary goroutine client map. + /// Returns false if the server is no longer running goroutines. + /// Mirrors Go Server.addToTempClients(). + /// + internal bool AddToTempClients(ulong cid, ClientConnection c) + { + lock (_grMu) + { + if (!_grRunning) return false; + _grTmpClients[cid] = c; + return true; + } + } + + // ========================================================================= + // Query helpers (features 3085–3118, 3121–3123) + // ========================================================================= + + /// Returns the number of registered routes. Mirrors Go Server.NumRoutes(). + public int NumRoutes() + { + _mu.EnterReadLock(); + try { return NumRoutesInternal(); } + finally { _mu.ExitReadLock(); } + } + + private int NumRoutesInternal() + { + int nr = 0; + ForEachRoute(_ => nr++); + return nr; + } + + /// Returns the number of registered remotes. Mirrors Go Server.NumRemotes(). + public int NumRemotes() + { + _mu.EnterReadLock(); + try { return _routes.Count; } + finally { _mu.ExitReadLock(); } + } + + /// Returns the number of leaf-node connections. Mirrors Go Server.NumLeafNodes(). + public int NumLeafNodes() + { + _mu.EnterReadLock(); + try { return _leafs.Count; } + finally { _mu.ExitReadLock(); } + } + + /// Returns the number of registered clients. Mirrors Go Server.NumClients(). + public int NumClients() + { + _mu.EnterReadLock(); + try { return _clients.Count; } + finally { _mu.ExitReadLock(); } + } + + /// Returns the client with the given connection ID. Mirrors Go Server.GetClient(). + public ClientConnection? GetClient(ulong cid) => GetClientInternal(cid); + + private ClientConnection? GetClientInternal(ulong cid) + { + _mu.EnterReadLock(); + try + { + _clients.TryGetValue(cid, out var c); + return c; + } + finally { _mu.ExitReadLock(); } + } + + /// Returns the leaf node with the given connection ID. Mirrors Go Server.GetLeafNode(). + public ClientConnection? GetLeafNode(ulong cid) + { + _mu.EnterReadLock(); + try + { + _leafs.TryGetValue(cid, out var c); + return c; + } + finally { _mu.ExitReadLock(); } + } + + /// + /// Returns total subscriptions across all accounts. + /// Mirrors Go Server.NumSubscriptions(). + /// + public uint NumSubscriptions() + { + _mu.EnterReadLock(); + try { return NumSubscriptionsInternal(); } + finally { _mu.ExitReadLock(); } + } + + private uint NumSubscriptionsInternal() + { + int subs = 0; + foreach (var kvp in _accounts) + subs += kvp.Value.TotalSubs(); + return (uint)subs; + } + + /// Returns the number of slow consumers. Mirrors Go Server.NumSlowConsumers(). + public long NumSlowConsumers() => Interlocked.Read(ref _stats.SlowConsumers); + + /// Returns the number of times clients were stalled. Mirrors Go Server.NumStalledClients(). + public long NumStalledClients() => Interlocked.Read(ref _stats.Stalls); + + /// Mirrors Go Server.NumSlowConsumersClients(). + public long NumSlowConsumersClients() => Interlocked.Read(ref _scStats.Clients); + + /// Mirrors Go Server.NumSlowConsumersRoutes(). + public long NumSlowConsumersRoutes() => Interlocked.Read(ref _scStats.Routes); + + /// Mirrors Go Server.NumSlowConsumersGateways(). + public long NumSlowConsumersGateways() => Interlocked.Read(ref _scStats.Gateways); + + /// Mirrors Go Server.NumSlowConsumersLeafs(). + public long NumSlowConsumersLeafs() => Interlocked.Read(ref _scStats.Leafs); + + /// Mirrors Go Server.NumStaleConnections(). + public long NumStaleConnections() => Interlocked.Read(ref _stats.StaleConnections); + + /// Mirrors Go Server.NumStaleConnectionsClients(). + public long NumStaleConnectionsClients() => Interlocked.Read(ref _staleStats.Clients); + + /// Mirrors Go Server.NumStaleConnectionsRoutes(). + public long NumStaleConnectionsRoutes() => Interlocked.Read(ref _staleStats.Routes); + + /// Mirrors Go Server.NumStaleConnectionsGateways(). + public long NumStaleConnectionsGateways() => Interlocked.Read(ref _staleStats.Gateways); + + /// Mirrors Go Server.NumStaleConnectionsLeafs(). + public long NumStaleConnectionsLeafs() => Interlocked.Read(ref _staleStats.Leafs); + + /// Returns the time the current configuration was loaded. Mirrors Go Server.ConfigTime(). + public DateTime ConfigTime() + { + _mu.EnterReadLock(); + try { return _configTime; } + finally { _mu.ExitReadLock(); } + } + + /// Returns the client listener address. Mirrors Go Server.Addr(). + public EndPoint? Addr() + { + _mu.EnterReadLock(); + try { return _listener?.LocalEndpoint; } + finally { _mu.ExitReadLock(); } + } + + /// Returns the monitoring listener address. Mirrors Go Server.MonitorAddr(). + public IPEndPoint? MonitorAddr() + { + _mu.EnterReadLock(); + try { return _http?.LocalEndpoint as IPEndPoint; } + finally { _mu.ExitReadLock(); } + } + + /// Returns the cluster (route) listener address. Mirrors Go Server.ClusterAddr(). + public IPEndPoint? ClusterAddr() + { + _mu.EnterReadLock(); + try { return _routeListener?.LocalEndpoint as IPEndPoint; } + finally { _mu.ExitReadLock(); } + } + + /// Returns the profiler listener address. Mirrors Go Server.ProfilerAddr(). + public IPEndPoint? ProfilerAddr() + { + _mu.EnterReadLock(); + try { return _profiler?.LocalEndpoint as IPEndPoint; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Polls until all expected listeners are up or the deadline expires. + /// Returns an error description if not ready within . + /// Mirrors Go Server.readyForConnections(). + /// + public Exception? ReadyForConnectionsError(TimeSpan d) + { + var opts = GetOpts(); + var end = DateTime.UtcNow.Add(d); + + while (DateTime.UtcNow < end) + { + bool serverOk, routeOk, gatewayOk, leafOk, wsOk; + _mu.EnterReadLock(); + serverOk = _listener != null || opts.DontListen; + routeOk = opts.Cluster.Port == 0 || _routeListener != null; + gatewayOk = string.IsNullOrEmpty(opts.Gateway.Name) || _gatewayListener != null; + leafOk = opts.LeafNode.Port == 0 || _leafNodeListener != null; + wsOk = opts.Websocket.Port == 0; // stub — websocket listener not tracked until session 23 + _mu.ExitReadLock(); + + if (serverOk && routeOk && gatewayOk && leafOk && wsOk) + { + if (opts.DontListen) + { + try { _startupComplete.Task.Wait((int)d.TotalMilliseconds); } + catch { /* timeout */ } + } + return null; + } + + if (d > TimeSpan.FromMilliseconds(25)) + Thread.Sleep(25); + } + + return new InvalidOperationException( + $"failed to be ready for connections after {d}"); + } + + /// + /// Returns true if the server is ready to accept connections. + /// Mirrors Go Server.ReadyForConnections(). + /// + public bool ReadyForConnections(TimeSpan dur) => + ReadyForConnectionsError(dur) == null; + + /// Returns true if the server supports headers. Mirrors Go Server.supportsHeaders(). + internal bool SupportsHeaders() => !(GetOpts().NoHeaderSupport); + + /// Returns the server ID. Mirrors Go Server.ID(). + public string ID() => _info.Id; + + /// Returns the JetStream node name (hash of server name). Mirrors Go Server.NodeName(). + public string NodeName() => GetHash(_info.Name); + + /// Returns the server name. Mirrors Go Server.Name(). + public string Name() => _info.Name; + + /// Returns the server name as a string. Mirrors Go Server.String(). + public override string ToString() => _info.Name; + + /// Returns the number of currently-stored closed connections. Mirrors Go Server.numClosedConns(). + internal int NumClosedConns() + { + _mu.EnterReadLock(); + try { return _closed.Len(); } + finally { _mu.ExitReadLock(); } + } + + /// Returns total closed connections ever recorded. Mirrors Go Server.totalClosedConns(). + internal ulong TotalClosedConns() + { + _mu.EnterReadLock(); + try { return _closed.TotalConns(); } + finally { _mu.ExitReadLock(); } + } + + /// Returns a snapshot of recently closed connections. Mirrors Go Server.closedClients(). + internal Internal.ClosedClient?[] ClosedClients() + { + _mu.EnterReadLock(); + try { return _closed.ClosedClients(); } + finally { _mu.ExitReadLock(); } + } + + // ========================================================================= + // Lame duck mode (features 3135–3139) + // ========================================================================= + + /// Returns true if the server is in lame duck mode. Mirrors Go Server.isLameDuckMode(). + public bool IsLameDuckMode() + { + _mu.EnterReadLock(); + try { return _ldm; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Performs a lame-duck shutdown: stops accepting new clients, notifies + /// existing clients to reconnect elsewhere, then shuts down. + /// Mirrors Go Server.LameDuckShutdown(). + /// + public void LameDuckShutdown() => LameDuckMode(); + + /// + /// Core lame-duck implementation. + /// Mirrors Go Server.lameDuckMode(). + /// + internal void LameDuckMode() + { + _mu.EnterWriteLock(); + if (IsShuttingDown() || _ldm || _listener == null) + { + _mu.ExitWriteLock(); + return; + } + Noticef("Entering lame duck mode, stop accepting new clients"); + _ldm = true; + SendLDMShutdownEventLocked(); + + int expected = 1; + _listener.Stop(); + _listener = null; + expected += CloseWebsocketServer(); + _ldmCh = System.Threading.Channels.Channel.CreateBounded( + new System.Threading.Channels.BoundedChannelOptions(expected) + { FullMode = System.Threading.Channels.BoundedChannelFullMode.Wait }); + + var opts = GetOpts(); + var gp = opts.LameDuckGracePeriod; + if (gp < TimeSpan.Zero) gp = gp.Negate(); + _mu.ExitWriteLock(); + + // Transfer Raft leaders (stub returns false). + if (TransferRaftLeaders()) + Thread.Sleep(1000); + + ShutdownJetStream(); + ShutdownRaftNodes(); + + // Wait for accept loops. + for (int i = 0; i < expected; i++) + _ldmCh.Reader.ReadAsync().GetAwaiter().GetResult(); + + _mu.EnterWriteLock(); + var clients = new List(_clients.Values); + + if (IsShuttingDown() || clients.Count == 0) + { + _mu.ExitWriteLock(); + Shutdown(); + return; + } + + var dur = opts.LameDuckDuration - gp; + if (dur <= TimeSpan.Zero) dur = TimeSpan.FromSeconds(1); + + long numClients = clients.Count; + var si = dur / numClients; + int batch = 1; + + if (si < TimeSpan.FromTicks(1)) + { + si = TimeSpan.FromTicks(1); + batch = (int)(numClients / dur.Ticks); + } + else if (si > TimeSpan.FromSeconds(1)) + { + si = TimeSpan.FromSeconds(1); + } + + SendLDMToRoutes(); + SendLDMToClients(); + _mu.ExitWriteLock(); + + // Grace-period delay. + var token = _quitCts.Token; + try { Task.Delay(gp, token).GetAwaiter().GetResult(); } + catch (OperationCanceledException) { return; } + + Noticef("Closing existing clients"); + for (int i = 0; i < clients.Count; i++) + { + clients[i].CloseConnection(ClosedState.ServerShutdown); + if (i == clients.Count - 1) break; + if (batch == 1 || i % batch == 0) + { + var jitter = (long)(Random.Shared.NextDouble() * si.Ticks); + if (jitter < si.Ticks / 2) jitter = si.Ticks / 2; + try { Task.Delay(TimeSpan.FromTicks(jitter), token).GetAwaiter().GetResult(); } + catch (OperationCanceledException) { return; } + } + } + + Shutdown(); + WaitForShutdown(); + } + + /// + /// Sends an LDM INFO to all routes. + /// Server lock must be held on entry. + /// Mirrors Go Server.sendLDMToRoutes(). + /// + private void SendLDMToRoutes() + { + _routeInfo.LameDuckMode = true; + var infoJson = GenerateInfoJson(_routeInfo); + ForEachRemote(r => + { + lock (r) { r.EnqueueProto(infoJson); } + }); + _routeInfo.LameDuckMode = false; + } + + /// + /// Sends an LDM INFO to all connected clients. + /// Server lock must be held on entry. + /// Mirrors Go Server.sendLDMToClients(). + /// + private void SendLDMToClients() + { + _info.LameDuckMode = true; + _clientConnectUrls.Clear(); + + _info.ClientConnectUrls = null; + _info.WsConnectUrls = null; + + if (!GetOpts().Cluster.NoAdvertise) + { + var cUrls = _clientConnectUrlsMap.GetAsStringSlice(); + _info.ClientConnectUrls = cUrls.Length > 0 ? cUrls : null; + } + + SendAsyncInfoToClients(true, true); + _info.LameDuckMode = false; + } + + // ========================================================================= + // Rate-limit logging (features 3144–3145) + // ========================================================================= + + /// + /// Starts the background goroutine that expires rate-limit log entries. + /// Mirrors Go Server.startRateLimitLogExpiration(). + /// + internal void StartRateLimitLogExpiration() + { + StartGoRoutine(() => + { + var interval = TimeSpan.FromSeconds(1); + var token = _quitCts.Token; + + while (!token.IsCancellationRequested) + { + try { Task.Delay(interval, token).GetAwaiter().GetResult(); } + catch (OperationCanceledException) { return; } + + var now = DateTime.UtcNow; + foreach (var key in _rateLimitLogging.Keys) + { + if (_rateLimitLogging.TryGetValue(key, out var val) && + val is DateTime ts && now - ts >= interval) + { + _rateLimitLogging.TryRemove(key, out _); + } + } + + // Check for a new interval value. + if (_rateLimitLoggingCh.Reader.TryRead(out var newInterval)) + interval = newInterval; + } + }); + } + + /// + /// Updates the rate-limit logging interval. + /// Mirrors Go Server.changeRateLimitLogInterval(). + /// + internal void ChangeRateLimitLogInterval(TimeSpan d) + { + if (d <= TimeSpan.Zero) return; + _rateLimitLoggingCh.Writer.TryWrite(d); + } + + // ========================================================================= + // DisconnectClientByID / LDMClientByID (features 3146–3147) + // ========================================================================= + + /// + /// Forcibly disconnects the client or leaf node with the given ID. + /// Mirrors Go Server.DisconnectClientByID(). + /// + public Exception? DisconnectClientByID(ulong id) + { + var c = GetClientInternal(id); + if (c != null) { c.CloseConnection(ClosedState.Kicked); return null; } + c = GetLeafNode(id); + if (c != null) { c.CloseConnection(ClosedState.Kicked); return null; } + return new InvalidOperationException("no such client or leafnode id"); + } + + /// + /// Sends a Lame Duck Mode INFO message to the specified client. + /// Mirrors Go Server.LDMClientByID(). + /// + public Exception? LDMClientByID(ulong id) + { + ClientConnection? c; + ServerInfo info; + _mu.EnterReadLock(); + _clients.TryGetValue(id, out c); + if (c == null) + { + _mu.ExitReadLock(); + return new InvalidOperationException("no such client id"); + } + info = CopyInfo(); + info.LameDuckMode = true; + _mu.ExitReadLock(); + + lock (c) + { + if (c.Opts.Protocol >= ClientProtocol.Info && + (c.Flags & ClientFlags.FirstPongSent) != 0) + { + c.Debugf("Sending Lame Duck Mode info to client"); + c.EnqueueProto(c.GenerateClientInfoJSON(info, true).Span); + return null; + } + } + return new InvalidOperationException( + "client does not support Lame Duck Mode or is not ready to receive the notification"); + } + + // ========================================================================= + // updateRemoteSubscription / shouldReportConnectErr (features 3142–3143) + // ========================================================================= + + /// + /// Notifies routes, gateways, and leaf nodes about a subscription change. + /// Mirrors Go Server.updateRemoteSubscription(). + /// + internal void UpdateRemoteSubscription(Account acc, Subscription sub, int delta) + { + UpdateRouteSubscriptionMap(acc, sub, delta); + if (_gateway.Enabled) + GatewayUpdateSubInterest(acc.Name, sub, delta); + acc.UpdateLeafNodes(sub, delta); + } + + /// + /// Returns true if a connect error at this attempt count should be reported. + /// Mirrors Go Server.shouldReportConnectErr(). + /// + internal bool ShouldReportConnectErr(bool firstConnect, int attempts) + { + var opts = GetOpts(); + int threshold = firstConnect ? opts.ConnectErrorReports : opts.ReconnectErrorReports; + return attempts == 1 || attempts % threshold == 0; + } + + // ========================================================================= + // Session 10 stubs for cross-session calls + // ========================================================================= + + /// Stub — JetStream pull-consumer signalling (session 19). + private void SignalPullConsumers() { } + + /// Stub — Raft step-down (session 20). + private void StepdownRaftNodes() { } + + /// Stub — eventing shutdown (session 12). + private void ShutdownEventing() { } + + /// Stub — JetStream shutdown (session 19). + private void ShutdownJetStream() { } + + /// Stub — Raft nodes shutdown (session 20). + private void ShutdownRaftNodes() { } + + /// Stub — Raft leader transfer (session 20). Returns false (no leaders to transfer). + private bool TransferRaftLeaders() => false; + + /// Stub — LDM shutdown event (session 12). + private void SendLDMShutdownEventLocked() { } + + /// + /// Stub — closes WebSocket server if running (session 23). + /// Returns the number of done-channel signals to expect. + /// + private int CloseWebsocketServer() => 0; + + /// + /// Iterates over all route connections. Stub — session 14. + /// Server lock must be held on entry. + /// + internal void ForEachRoute(Action fn) { } + + /// + /// Iterates over all remote (outbound route) connections. Stub — session 14. + /// Server lock must be held on entry. + /// + private void ForEachRemote(Action fn) { } + + /// Stub — collects all gateway connections (session 16). + private void GetAllGatewayConnections(Dictionary conns) { } + + /// Stub — removes a route connection (session 14). + private void RemoveRoute(ClientConnection c) { } + + /// Stub — removes a remote gateway connection (session 16). + private void RemoveRemoteGatewayConnection(ClientConnection c) { } + + /// Stub — removes a leaf-node connection (session 15). + private void RemoveLeafNodeConnection(ClientConnection c) { } + + /// Stub — sends async INFO to clients (session 10/11). No-op until clients are running. + private void SendAsyncInfoToClients(bool cliUpdated, bool wsUpdated) { } + + /// Stub — updates route subscription map (session 14). + private void UpdateRouteSubscriptionMap(Account acc, Subscription sub, int delta) { } + + /// Stub — updates gateway sub interest (session 16). + private void GatewayUpdateSubInterest(string accName, Subscription sub, int delta) { } + + /// Stub — account disconnect event (session 12). + private void AccountDisconnectEvent(ClientConnection c, DateTime now, string reason) { } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs new file mode 100644 index 0000000..5d8d50d --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs @@ -0,0 +1,1164 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/server.go (lines 2779–4407) in the NATS server Go source. +// Session 10: accept loops, client creation, connect URL management, port helpers. + +using System.Net; +using System.Net.Sockets; +using System.Text.Json; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + // ========================================================================= + // Accept min/max sleep constants (mirrors const.go ACCEPT_MIN_SLEEP / ACCEPT_MAX_SLEEP) + // ========================================================================= + + private static readonly TimeSpan AcceptMinSleep = TimeSpan.FromMilliseconds(10); + private static readonly TimeSpan AcceptMaxSleep = TimeSpan.FromSeconds(1); + + // ========================================================================= + // HTTP monitoring path constants (mirrors RootPath / VarzPath / ... in server.go) + // ========================================================================= + + /// HTTP monitoring path constants. Mirrors the Go const block in server.go. + public static class MonitorPaths + { + public const string Root = "/"; + public const string Varz = "/varz"; + public const string Connz = "/connz"; + public const string Routez = "/routez"; + public const string Gatewayz = "/gatewayz"; + public const string Leafz = "/leafz"; + public const string Subsz = "/subsz"; + public const string Stacksz = "/stacksz"; + public const string Accountz = "/accountz"; + public const string AccountStatz = "/accstatz"; + public const string Jsz = "/jsz"; + public const string Healthz = "/healthz"; + public const string IPQueuesz = "/ipqueuesz"; + public const string Raftz = "/raftz"; + public const string Expvarz = "/debug/vars"; + } + + // ========================================================================= + // Nonce (session 10 stubs — full implementation in session 11 auth.go) + // ========================================================================= + + /// + /// Returns true if this server requires clients to send a nonce for auth. + /// Stub — full implementation in session 11. + /// Mirrors Go Server.nonceRequired(). + /// + private bool NonceRequired() => false; + + /// + /// Fills with random bytes. + /// Stub — full implementation in session 11. + /// Mirrors Go Server.generateNonce(). + /// + private void GenerateNonce(byte[] nonce) { } + + // ========================================================================= + // INFO JSON serialisation (feature 3124) + // ========================================================================= + + /// + /// Serialises to a JSON INFO line (INFO {...}\r\n). + /// Mirrors Go generateInfoJSON(). + /// + internal static byte[] GenerateInfoJson(ServerInfo info) + { + var json = JsonSerializer.Serialize(info); + var line = $"INFO {json}\r\n"; + return System.Text.Encoding.UTF8.GetBytes(line); + } + + // ========================================================================= + // copyInfo — deep copy INFO struct (feature 3071) + // ========================================================================= + + /// + /// Deep-copies the server's INFO struct. + /// Server lock must be held on entry. + /// Mirrors Go Server.copyInfo(). + /// + internal ServerInfo CopyInfo() + { + // Shallow clone (copies all value-type and reference fields). + // ServerInfo is a class, so we must clone the object rather than copy the reference. + var info = _info.ShallowClone(); + + // Deep-copy the slice fields that may be modified independently. + if (info.ClientConnectUrls is { Length: > 0 }) + info.ClientConnectUrls = [.. info.ClientConnectUrls]; + + if (info.WsConnectUrls is { Length: > 0 }) + info.WsConnectUrls = [.. info.WsConnectUrls]; + + return info; + } + + // ========================================================================= + // AcceptLoop — exported for easier testing (feature 3052) + // ========================================================================= + + /// + /// Starts the main client accept loop. The optional + /// TCS is completed once the listener is bound (mirrors Go chan struct{}). + /// Mirrors Go Server.AcceptLoop(). + /// + public void AcceptLoop(TaskCompletionSource? ready = null) + { + // If we were to exit before the listener is set up, signal the TCS. + try + { + if (IsShuttingDown()) return; + + var opts = GetOpts(); + + _mu.EnterWriteLock(); + var hp = $"{opts.Host}:{opts.Port}"; + var result = GetServerListener(hp); + _listenerErr = result.err; + + if (result.err != null) + { + _mu.ExitWriteLock(); + Fatalf("Error listening on port: {0}, {1}", hp, result.err.Message); + return; + } + + var l = result.listener!; + Noticef("Listening for client connections on {0}", l.LocalEndpoint); + + if (opts.TlsConfig != null) + { + Noticef("TLS required for client connections"); + } + + // If opts.Port was 0, write resolved port back. + if (opts.Port == 0) + opts.Port = ((IPEndPoint)l.LocalEndpoint!).Port; + + // Set the server's info Host/Port. + var infoErr = SetInfoHostPort(); + if (infoErr != null) + { + Fatalf("Error setting server INFO with ClientAdvertise value of {0}, err={1}", + opts.ClientAdvertise, infoErr.Message); + l.Stop(); + _mu.ExitWriteLock(); + return; + } + + _clientConnectUrls.Clear(); + _clientConnectUrls.AddRange(GetClientConnectURLs()); + _listener = l; + + // Start the accept goroutine. + _ = Task.Run(() => + { + _reloadMu.EnterReadLock(); + AcceptConnections(l, "Client", + tc => CreateClient(tc), + _ => + { + if (IsLameDuckMode()) + { + // Signal that we are not accepting new clients. + _ldmCh?.Writer.TryWrite(default); + // Wait for quit signal. + _quitCts.Token.WaitHandle.WaitOne(); + return true; + } + return false; + }); + _reloadMu.ExitReadLock(); + }); + + _mu.ExitWriteLock(); + } + finally + { + // Signal that we are ready (or exiting early). + ready?.TrySetResult(); + } + } + + // ========================================================================= + // GetServerListener (feature 3054) + // ========================================================================= + + /// + /// Returns the existing client listener (if any) or creates a new TCP listener on . + /// Server lock must be held on entry. + /// Mirrors Go Server.getServerListener(). + /// + private (TcpListener? listener, Exception? err) GetServerListener(string hp) + { + if (_listener != null) + return (_listener, _listenerErr); + + try + { + var parts = hp.Split(':', 2); + var host = parts[0]; + var port = parts.Length > 1 ? int.Parse(parts[1]) : 0; + var addr = string.IsNullOrEmpty(host) || host == "0.0.0.0" + ? IPAddress.Any + : (host == "::" ? IPAddress.IPv6Any : IPAddress.Parse(host)); + + var l = new TcpListener(addr, port); + l.Start(); + return (l, null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + // ========================================================================= + // InProcessConn (feature 3055) + // ========================================================================= + + /// + /// Returns an in-process connection to the server, bypassing the need for + /// a TCP listener for local connectivity within the same process. + /// Mirrors Go Server.InProcessConn(). + /// + public (Stream? conn, Exception? err) InProcessConn() + { + // Create a loopback TCP socket pair to simulate net.Pipe(). + TcpListener? loopback = null; + TcpClient? clientTcp = null; + TcpClient? serverTcp = null; + + try + { + loopback = new TcpListener(IPAddress.Loopback, 0); + loopback.Start(); + var ep = (IPEndPoint)loopback.LocalEndpoint!; + + clientTcp = new TcpClient(); + clientTcp.Connect(ep); + serverTcp = loopback.AcceptTcpClient(); + } + catch (Exception ex) + { + clientTcp?.Dispose(); + serverTcp?.Dispose(); + return (null, ex); + } + finally + { + loopback?.Stop(); + } + + var serverStream = serverTcp.GetStream(); + var clientStream = clientTcp.GetStream(); + + if (!StartGoRoutine(() => CreateClientInProcess(serverStream))) + { + serverStream.Dispose(); + clientStream.Dispose(); + return (null, new InvalidOperationException("failed to create connection")); + } + + return (clientStream, null); + } + + // ========================================================================= + // AcceptConnections — inner accept loop (feature 3056) + // ========================================================================= + + /// + /// Runs the TCP accept loop for , calling + /// for each accepted connection. On error, calls and backs off. + /// Signals _done when the loop exits. + /// Mirrors Go Server.acceptConnections(). + /// + private void AcceptConnections( + TcpListener l, + string acceptName, + Action createFunc, + Func? errFunc = null) + { + var tmpDelay = AcceptMinSleep; + + while (true) + { + TcpClient tc; + try + { + tc = l.AcceptTcpClient(); + } + catch (Exception err) + { + if (errFunc != null && errFunc(err)) + break; + + var newDelay = AcceptError(acceptName, err, tmpDelay); + if (newDelay < TimeSpan.Zero) + break; + + tmpDelay = newDelay; + continue; + } + + tmpDelay = AcceptMinSleep; + + var captured = tc; + if (!StartGoRoutine(() => + { + try + { + _reloadMu.EnterReadLock(); + createFunc(captured); + _reloadMu.ExitReadLock(); + } + catch + { + _reloadMu.ExitReadLock(); + captured.Dispose(); + } + })) + { + tc.Dispose(); + } + } + + Debugf("{0} accept loop exiting..", acceptName); + _done.Writer.TryWrite(default); + } + + // ========================================================================= + // Profiler (feature 3057) + // ========================================================================= + + /// + /// Starts the HTTP profiling server. + /// Stub — full pprof integration deferred to session 17. + /// Mirrors Go Server.StartProfiler(). + /// + public void StartProfiler() + { + if (IsShuttingDown()) return; + + var opts = GetOpts(); + var port = opts.ProfPort == -1 ? 0 : opts.ProfPort; + + _mu.EnterWriteLock(); + try + { + var hp = $"{opts.Host}:{port}"; + var parts = hp.Split(':', 2); + var addr = string.IsNullOrEmpty(parts[0]) ? IPAddress.Any : IPAddress.Parse(parts[0]); + var l = new TcpListener(addr, port); + l.Start(); + Noticef("profiling port: {0}", ((IPEndPoint)l.LocalEndpoint!).Port); + _profiler = l; + } + catch (Exception ex) + { + Fatalf("error starting profiler: {0}", ex); + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + /// Sets the block profiling rate. Stub — mirrors Go Server.setBlockProfileRate(). + /// + private void SetBlockProfileRate(int rate) + { + if (rate > 0) + Warnf("Block profiling is enabled (rate {0}), this may have a performance impact", rate); + // runtime.SetBlockProfileRate — not applicable in .NET; no-op. + } + + // ========================================================================= + // HTTP Monitoring (features 3058–3061) + // ========================================================================= + + /// + /// Enables the HTTP monitoring port. + /// Deprecated: use instead. + /// Mirrors Go Server.StartHTTPMonitoring(). + /// + public void StartHTTPMonitoring() => StartMonitoringInternal(false); + + /// + /// Enables the HTTPS monitoring port. + /// Deprecated: use instead. + /// Mirrors Go Server.StartHTTPSMonitoring(). + /// + public void StartHTTPSMonitoring() => StartMonitoringInternal(true); + + /// + /// Starts the HTTP or HTTPS monitoring server as configured. + /// Mirrors Go Server.StartMonitoring(). + /// + public Exception? StartMonitoring() + { + var opts = GetOpts(); + if (opts.HttpPort != 0 && opts.HttpsPort != 0) + return new InvalidOperationException( + $"can't specify both HTTP ({opts.HttpPort}) and HTTPs ({opts.HttpsPort}) ports"); + + Exception? err = null; + if (opts.HttpPort != 0) + err = StartMonitoringInternal(false); + else if (opts.HttpsPort != 0) + { + if (opts.TlsConfig == null) + return new InvalidOperationException("TLS cert and key required for HTTPS"); + err = StartMonitoringInternal(true); + } + return err; + } + + /// + /// Returns the base URL path for the monitoring server. + /// Mirrors Go Server.basePath(). + /// + private string BasePath(string p) => + string.IsNullOrEmpty(_httpBasePath) ? p : System.IO.Path.Combine(_httpBasePath, p.TrimStart('/')); + + /// + /// Starts the HTTP/HTTPS monitoring listener. + /// Stub — full monitoring handler registration deferred to session 17. + /// Mirrors Go Server.startMonitoring(). + /// + private Exception? StartMonitoringInternal(bool secure) + { + if (IsShuttingDown()) return null; + + var opts = GetOpts(); + int port; + string proto; + + if (secure) + { + proto = "https"; + port = opts.HttpsPort == -1 ? 0 : opts.HttpsPort; + } + else + { + proto = "http"; + port = opts.HttpPort == -1 ? 0 : opts.HttpPort; + } + + var hp = $"{opts.HttpHost}:{port}"; + TcpListener httpListener; + try + { + var parts = hp.Split(':', 2); + var bindAddr = string.IsNullOrEmpty(parts[0]) ? IPAddress.Any : IPAddress.Parse(parts[0]); + httpListener = new TcpListener(bindAddr, port); + httpListener.Start(); + } + catch (Exception ex) + { + return new InvalidOperationException($"can't listen to the monitor port: {ex.Message}", ex); + } + + var rport = ((IPEndPoint)httpListener.LocalEndpoint!).Port; + Noticef("Starting {0} monitor on {1}", proto, $"{opts.HttpHost}:{rport}"); + + _mu.EnterWriteLock(); + _http = httpListener; + _mu.ExitWriteLock(); + + // Full HTTP handler registration in session 17; for now just drain the listener. + _ = Task.Run(() => + { + // Accept and immediately close connections until shutdown. + while (!IsShuttingDown()) + { + try + { + var tc = httpListener.AcceptTcpClient(); + tc.Dispose(); + } + catch + { + break; + } + } + + _mu.EnterWriteLock(); + // Don't null _http — ProfilerAddr etc. still read it. + _mu.ExitWriteLock(); + + _done.Writer.TryWrite(default); + }); + + return null; + } + + // ========================================================================= + // CreateClient / CreateClientInProcess / CreateClientEx (features 3063–3065) + // ========================================================================= + + /// + /// Creates and registers a new client connection from the given TCP client. + /// Mirrors Go Server.createClient(). + /// + private ClientConnection? CreateClient(TcpClient tc) => + CreateClientEx(tc.GetStream(), false); + + /// + /// Creates and registers an in-process client connection. + /// Mirrors Go Server.createClientInProcess(). + /// + private ClientConnection? CreateClientInProcess(Stream nc) => + CreateClientEx(nc, true); + + /// + /// Full client-creation factory: registers with account, sends INFO, starts read/write loops. + /// Mirrors Go Server.createClientEx(). + /// + private ClientConnection? CreateClientEx(Stream nc, bool inProcess) + { + var opts = GetOpts(); + var now = DateTime.UtcNow; + + var c = new ClientConnection(ClientKind.Client, this, nc) + { + InProc = inProcess, + Start = now, + Last = now, + Opts = ClientOptions.Default, + }; + + var globalAcc = GlobalAccount(); + if (globalAcc != null) + { + try { c.RegisterWithAccount(globalAcc); } + catch (Exception ex) + { + c.ReportErrRegisterAccount(globalAcc, ex); + return null; + } + } + + ServerInfo info; + bool authRequired; + + _mu.EnterWriteLock(); + info = CopyInfo(); + + if (NonceRequired()) + { + var raw = new byte[24]; // nonceLen + GenerateNonce(raw); + info.Nonce = System.Text.Encoding.ASCII.GetString(raw); + } + + c.Nonce = info.Nonce != null ? System.Text.Encoding.ASCII.GetBytes(info.Nonce) : null; + authRequired = info.AuthRequired; + + // If auth is required but there is a no_auth_user, don't require it. + if (info.AuthRequired + && !string.IsNullOrEmpty(opts.NoAuthUser) + && opts.NoAuthUser != _sysAccOnlyNoAuthUser) + { + info.AuthRequired = false; + } + + // For in-process connections with TLS required, mark as available but not required. + if (inProcess && info.TlsRequired) + { + info.TlsRequired = false; + info.TlsAvailable = true; + } + + _totalClients++; + _mu.ExitWriteLock(); + + lock (c) + { + if (authRequired) + c.Flags |= ClientFlags.ExpectConnect; + + c.InitClient(); + c.Debugf("Client connection created"); + + // Send INFO to the client immediately (TLS-first handling omitted — stub). + var infoBytes = c.GenerateClientInfoJSON(info, !opts.ProxyProtocol); + c.SendProtoNow(infoBytes.Span); + } + + // Register with the server. + _mu.EnterWriteLock(); + if (!IsRunning() || _ldm) + { + if (IsShuttingDown()) + nc.Close(); + _mu.ExitWriteLock(); + return c; + } + + if (opts.MaxConn > 0 && _clients.Count >= opts.MaxConn) + { + _mu.ExitWriteLock(); + c.MaxConnExceeded(); + return null; + } + + _clients[c.Cid] = c; + _mu.ExitWriteLock(); + + lock (c) + { + if (c.IsClosed()) + { + c.CloseConnection(ClosedState.WriteError); + return null; + } + + if (authRequired) + c.SetAuthTimer(TimeSpan.FromSeconds(opts.AuthTimeout)); + + c.SetPingTimer(); + + // Start read/write loops as goroutines. + byte[]? pre = null; + StartGoRoutine(() => c.ReadLoop(pre)); + StartGoRoutine(() => c.WriteLoop()); + } + + return c; + } + + // ========================================================================= + // SaveClosedClient (feature 3066) + // ========================================================================= + + /// + /// Records a closed client in the ring buffer for the /connz monitoring endpoint. + /// Mirrors Go Server.saveClosedClient(). + /// + internal void SaveClosedClient(ClientConnection c, Stream? nc, ClosedState reason) + { + var now = DateTime.UtcNow; + + // Stub: full implementation requires connInfo.Fill() from session 17. + // For now create a minimal ClosedClient record. + AccountDisconnectEvent(c, now, reason.ToString()); + + string user = string.Empty; + string acc = string.Empty; + + lock (c) + { + // acc name if not the global account. + if (c.Account?.Name != null && c.Account.Name != ServerConstants.DefaultGlobalAccount) + acc = c.Account.Name; + } + + var cc = new ClosedClient + { + User = user, + Account = acc, + }; + + _mu.EnterWriteLock(); + _closed.Append(cc); + _mu.ExitWriteLock(); + } + + // ========================================================================= + // Connect URL management (features 3067–3070) + // ========================================================================= + + /// + /// Adds URLs to the connect-URL maps and sends async INFO to clients. + /// Server lock must be held on entry. + /// Mirrors Go Server.addConnectURLsAndSendINFOToClients(). + /// + internal void AddConnectURLsAndSendINFOToClients(string[] curls, string[] wsurls) => + UpdateServerINFOAndSendINFOToClients(curls, wsurls, add: true); + + /// + /// Removes URLs from the connect-URL maps and sends async INFO to clients. + /// Server lock must be held on entry. + /// Mirrors Go Server.removeConnectURLsAndSendINFOToClients(). + /// + internal void RemoveConnectURLsAndSendINFOToClients(string[] curls, string[] wsurls) => + UpdateServerINFOAndSendINFOToClients(curls, wsurls, add: false); + + /// + /// Updates the connect-URL maps and, if changed, sends async INFO to clients. + /// Server lock must be held on entry. + /// Mirrors Go Server.updateServerINFOAndSendINFOToClients(). + /// + private void UpdateServerINFOAndSendINFOToClients(string[] curls, string[] wsurls, bool add) + { + bool UpdateMap(string[] urls, RefCountedUrlSet m) + { + bool wasUpdated = false; + foreach (var url in urls) + { + if (add && m.AddUrl(url)) wasUpdated = true; + if (!add && m.RemoveUrl(url)) wasUpdated = true; + } + return wasUpdated; + } + + bool cliUpdated = UpdateMap(curls, _clientConnectUrlsMap); + bool wsUpdated = UpdateMap(wsurls, _websocket.ConnectUrlsMap); + + void UpdateInfo(ref string[]? infoUrls, List localUrls, RefCountedUrlSet m) + { + var list = new List(localUrls); + list.AddRange(m.GetAsStringSlice()); + infoUrls = list.Count > 0 ? list.ToArray() : null; + } + + if (cliUpdated) + { + var infoUrls = _info.ClientConnectUrls; + UpdateInfo(ref infoUrls, _clientConnectUrls, _clientConnectUrlsMap); + _info.ClientConnectUrls = infoUrls; + } + + if (wsUpdated) + { + // WebSocket connect URLs stub — session 23. + } + + if (cliUpdated || wsUpdated) + SendAsyncInfoToClients(cliUpdated, wsUpdated); + } + + // ========================================================================= + // TLS version helpers (features 3072–3073) + // ========================================================================= + + /// + /// Returns the TLS version string for the given TLS version number. + /// Mirrors Go tlsVersion(). + /// + internal static string TlsVersion(ushort ver) => ver switch + { + 0x0301 => "1.0", + 0x0302 => "1.1", + 0x0303 => "1.2", + 0x0304 => "1.3", + _ => $"Unknown [0x{ver:x}]", + }; + + /// + /// Parses a TLS version string to a version number. + /// Mirrors Go tlsVersionFromString(). + /// + internal static (ushort ver, Exception? err) TlsVersionFromString(string ver) => ver switch + { + "1.0" => (0x0301, null), + "1.1" => (0x0302, null), + "1.2" => (0x0303, null), + "1.3" => (0x0304, null), + _ => (0, new ArgumentException($"unknown version: {ver}")), + }; + + // ========================================================================= + // Connect URL helpers (features 3074–3076) + // ========================================================================= + + /// + /// Returns client connect URLs based on server options. + /// Server lock must be held on entry. + /// Mirrors Go Server.getClientConnectURLs(). + /// + private List GetClientConnectURLs() + { + var opts = GetOpts(); + var (urls, _) = GetConnectURLs(opts.ClientAdvertise, opts.Host, opts.Port); + return urls; + } + + /// + /// Returns connect URLs for the given advertise/host/port. + /// Mirrors Go Server.getConnectURLs(). + /// + private (List urls, Exception? err) GetConnectURLs(string advertise, string host, int port) + { + var urls = new List(); + + if (!string.IsNullOrEmpty(advertise)) + { + var (h, p, err) = ServerUtilities.ParseHostPort(advertise, port); + if (err != null) return (urls, err); + urls.Add($"{h}:{p}"); + } + else + { + var sPort = port.ToString(); + var (isAny, ips, _) = GetNonLocalIPsIfHostIsIPAny(host, all: true); + foreach (var ip in ips) + urls.Add($"{ip}:{sPort}"); + + if (!isAny || urls.Count == 0) + { + if (host != "0.0.0.0" && host != "::") + urls.Add($"{host}:{sPort}"); + else + Errorf("Address {0} can not be resolved properly", host); + } + } + + return (urls, null); + } + + /// + /// Returns the non-local IPs if is 0.0.0.0 or ::. + /// Returns (isAny, ips, err). + /// Mirrors Go Server.getNonLocalIPsIfHostIsIPAny(). + /// + private (bool isAny, List ips, Exception? err) GetNonLocalIPsIfHostIsIPAny(string host, bool all) + { + if (!IPAddress.TryParse(host, out var ip)) + return (false, [], null); + + if (!ip.Equals(IPAddress.Any) && !ip.Equals(IPAddress.IPv6Any)) + return (false, [], null); + + Debugf("Get non local IPs for \"{0}\"", host); + + var ips = new List(); + foreach (var iface in System.Net.NetworkInformation.NetworkInterface.GetAllNetworkInterfaces()) + { + foreach (var addr in iface.GetIPProperties().UnicastAddresses) + { + var a = addr.Address; + if (!a.IsIPv6LinkLocal && !a.Equals(IPAddress.Loopback) && + !a.Equals(IPAddress.IPv6Loopback) && + a.AddressFamily is System.Net.Sockets.AddressFamily.InterNetwork + or System.Net.Sockets.AddressFamily.InterNetworkV6) + { + Debugf(" ip={0}", a); + ips.Add(a.ToString()); + if (!all) return (true, ips, null); + } + } + } + + return (true, ips, null); + } + + // ========================================================================= + // Port helpers (features 3077–3080) + // ========================================================================= + + /// + /// Resolves host-port strings from a TcpListener, expanding unspecified addresses. + /// Mirrors Go resolveHostPorts(). + /// + private static List ResolveHostPorts(TcpListener l) + { + var result = new List(); + if (l.LocalEndpoint is not IPEndPoint ep) return result; + + var port = ep.Port.ToString(); + + if (ep.Address.Equals(IPAddress.Any) || ep.Address.Equals(IPAddress.IPv6Any)) + { + foreach (var iface in System.Net.NetworkInformation.NetworkInterface.GetAllNetworkInterfaces()) + foreach (var addr in iface.GetIPProperties().UnicastAddresses) + result.Add($"{addr.Address}:{port}"); + } + else + { + result.Add($"{ep.Address}:{port}"); + } + + return result; + } + + /// + /// Formats a listener's addresses as protocol URLs. + /// Mirrors Go formatURL(). + /// + private static string[] FormatUrl(string protocol, TcpListener l) + { + var hostPorts = ResolveHostPorts(l); + for (int i = 0; i < hostPorts.Count; i++) + hostPorts[i] = $"{protocol}://{hostPorts[i]}"; + return hostPorts.ToArray(); + } + + /// + /// Returns port info once all expected listeners are up, or null if timeout. + /// Mirrors Go Server.PortsInfo(). + /// + public Ports? PortsInfo(TimeSpan maxWait) + { + if (!ReadyForListeners(maxWait)) return null; + + var opts = GetOpts(); + + _mu.EnterReadLock(); + bool tlsRequired = _info.TlsRequired; + var listener = _listener; + var httpListener = _http; + var clusterListener = _routeListener; + var profileListener = _profiler; + _mu.ExitReadLock(); + + var ports = new Ports(); + + if (listener != null) + { + var natsProto = tlsRequired ? "tls" : "nats"; + ports.Nats = FormatUrl(natsProto, listener); + } + + if (httpListener != null) + { + var monProto = opts.HttpsPort != 0 ? "https" : "http"; + ports.Monitoring = FormatUrl(monProto, httpListener); + } + + if (clusterListener != null) + { + var clusterProto = opts.Cluster.TlsConfig != null ? "tls" : "nats"; + ports.Cluster = FormatUrl(clusterProto, clusterListener); + } + + if (profileListener != null) + ports.Profile = FormatUrl("http", profileListener); + + return ports; + } + + /// + /// Returns the path of the ports file. + /// Mirrors Go Server.portFile(). + /// + private string PortFile(string dirHint) + { + var dirname = GetOpts().PortsFileDir; + if (!string.IsNullOrEmpty(dirHint)) dirname = dirHint; + if (string.IsNullOrEmpty(dirname)) return string.Empty; + + var exe = System.Diagnostics.Process.GetCurrentProcess().ProcessName; + var pid = System.Diagnostics.Process.GetCurrentProcess().Id; + return System.IO.Path.Combine(dirname, $"{exe}_{pid}.ports"); + } + + /// + /// Deletes the ports file. + /// Mirrors Go Server.deletePortsFile(). + /// + private void DeletePortsFile(string hintDir) + { + var file = PortFile(hintDir); + if (string.IsNullOrEmpty(file)) return; + try { System.IO.File.Delete(file); } + catch (Exception ex) { Errorf("Error cleaning up ports file {0}: {1}", file, ex.Message); } + } + + /// + /// Writes a Ports JSON file if a ports-file directory is configured. + /// Mirrors Go Server.logPorts(). + /// + private void LogPorts() + { + var opts = GetOpts(); + var portsFile = PortFile(opts.PortsFileDir); + if (string.IsNullOrEmpty(portsFile)) return; + + _ = Task.Run(() => + { + var info = PortsInfo(TimeSpan.FromSeconds(5)); + if (info == null) + { + Errorf("Unable to resolve the ports in the specified time"); + return; + } + + try + { + var data = JsonSerializer.SerializeToUtf8Bytes(info); + System.IO.File.WriteAllBytes(portsFile, data); + } + catch (Exception ex) + { + Errorf("Error writing ports file ({0}): {1}", portsFile, ex.Message); + } + }); + } + + // ========================================================================= + // readyForListeners / serviceListeners (features 3127–3128) + // ========================================================================= + + /// + /// Waits until all expected listeners are resolved or expires. + /// Mirrors Go Server.readyForListeners(). + /// + private bool ReadyForListeners(TimeSpan dur) + { + var end = DateTime.UtcNow.Add(dur); + while (DateTime.UtcNow < end) + { + _mu.EnterReadLock(); + var listeners = ServiceListeners(); + _mu.ExitReadLock(); + + if (listeners.Length == 0) return false; + + bool ok = true; + foreach (var l in listeners) + { + if (l == null) { ok = false; break; } + } + + if (ok) return true; + + try + { + Task.Delay(TimeSpan.FromMilliseconds(25), _quitCts.Token) + .GetAwaiter().GetResult(); + } + catch (OperationCanceledException) + { + return false; + } + } + + return false; + } + + /// + /// Returns the list of expected listeners (may include nulls for not-yet-bound listeners). + /// Server lock must be held on entry. + /// Mirrors Go Server.serviceListeners(). + /// + private TcpListener?[] ServiceListeners() + { + var opts = GetOpts(); + var list = new List { _listener }; + + if (opts.Cluster.Port != 0) list.Add(_routeListener); + if (opts.HttpPort != 0 || opts.HttpsPort != 0) list.Add(_http); + if (opts.ProfPort != 0) list.Add(_profiler); + // WebSocket listener — session 23. + + return list.ToArray(); + } + + // ========================================================================= + // acceptError (feature 3129) + // ========================================================================= + + /// + /// Handles a transient accept error by sleeping and doubling the delay. + /// Returns a negative TimeSpan if the server has stopped. + /// Mirrors Go Server.acceptError(). + /// + private TimeSpan AcceptError(string acceptName, Exception err, TimeSpan tmpDelay) + { + if (!IsRunning()) return TimeSpan.FromSeconds(-1); + + if (err is SocketException se && se.SocketErrorCode == SocketError.WouldBlock) + { + Errorf("Temporary {0} Accept Error({1}), sleeping {2}ms", + acceptName, err.Message, (int)tmpDelay.TotalMilliseconds); + + try + { + Task.Delay(tmpDelay, _quitCts.Token).GetAwaiter().GetResult(); + } + catch (OperationCanceledException) + { + return TimeSpan.FromSeconds(-1); + } + + tmpDelay *= 2; + if (tmpDelay > AcceptMaxSleep) tmpDelay = AcceptMaxSleep; + } + else + { + Errorf("{0} Accept error: {1}", acceptName, err.Message); + } + + return tmpDelay; + } + + // ========================================================================= + // GetRandomIP (feature 3130) + // ========================================================================= + + private static readonly Exception ErrNoIPAvail = new InvalidOperationException("no IP available"); + + /// + /// Resolves a hostname to a random IP address, excluding addresses in . + /// Mirrors Go Server.getRandomIP(). + /// + internal async Task<(string address, Exception? err)> GetRandomIP( + INetResolver resolver, + string url, + HashSet? excluded = null) + { + var colonIdx = url.LastIndexOf(':'); + if (colonIdx < 0) return (url, new FormatException($"invalid host:port: {url}")); + + var host = url[..colonIdx]; + var port = url[(colonIdx + 1)..]; + + // If already an IP address, return as-is. + if (IPAddress.TryParse(host, out _)) return (url, null); + + string[] ips; + try + { + ips = await resolver.LookupHostAsync(host, _quitCts.Token); + } + catch (Exception ex) + { + return (string.Empty, new InvalidOperationException($"lookup for host \"{host}\": {ex.Message}", ex)); + } + + if (excluded != null && excluded.Count > 0) + { + var filtered = new List(ips.Length); + foreach (var ip in ips) + { + var addr = $"{ip}:{port}"; + if (!excluded.Contains(addr)) + filtered.Add(ip); + } + + ips = filtered.ToArray(); + if (ips.Length == 0) return (string.Empty, ErrNoIPAvail); + } + + if (ips.Length == 0) + { + Warnf("Unable to get IP for {0}, will try with {1}", host, url); + return (url, null); + } + + var chosen = ips.Length == 1 ? ips[0] : ips[Random.Shared.Next(ips.Length)]; + return ($"{chosen}:{port}", null); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs index b3995ef..bcef42a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs @@ -76,7 +76,7 @@ public sealed partial class NatsServer : INatsServer private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion); private readonly ReaderWriterLockSlim _reloadMu = new(LockRecursionPolicy.SupportsRecursion); - private ServerInfo _info = new(); + internal ServerInfo _info = new(); private string _configFile = string.Empty; // ========================================================================= @@ -104,6 +104,34 @@ public sealed partial class NatsServer : INatsServer private System.Net.Sockets.TcpListener? _listener; private Exception? _listenerErr; + // HTTP monitoring listener + private System.Net.Sockets.TcpListener? _http; + + // Route listener + private System.Net.Sockets.TcpListener? _routeListener; + private Exception? _routeListenerErr; + + // Gateway listener + private System.Net.Sockets.TcpListener? _gatewayListener; + private Exception? _gatewayListenerErr; + + // Leaf-node listener + private System.Net.Sockets.TcpListener? _leafNodeListener; + private Exception? _leafNodeListenerErr; + + // Profiling listener + private System.Net.Sockets.TcpListener? _profiler; + + // Accept-loop done channel — each accept loop sends true when it exits. + private readonly System.Threading.Channels.Channel _done = + System.Threading.Channels.Channel.CreateUnbounded(); + + // Lame-duck channel — created in lameDuckMode, receives one signal per accept loop. + private System.Threading.Channels.Channel? _ldmCh; + + // The no-auth user that only the system account can use (auth session). + private string _sysAccOnlyNoAuthUser = string.Empty; + // ========================================================================= // Accounts // ========================================================================= @@ -153,7 +181,7 @@ public sealed partial class NatsServer : INatsServer private readonly object _grMu = new(); private bool _grRunning; private readonly Dictionary _grTmpClients = []; - private readonly SemaphoreSlim _grWg = new(1, 1); // simplified wg + private readonly WaitGroup _grWg = new(); // ========================================================================= // Cluster name (separate lock) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs index 55b8c0f..5f73d89 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServerTypes.cs @@ -15,6 +15,7 @@ using System.Text.Json.Serialization; using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; @@ -84,6 +85,9 @@ public sealed class ServerInfo // LeafNode-specific [JsonPropertyName("leafnode_urls")] public string[]? LeafNodeUrls { get; set; } + + /// Returns a shallow clone of this . + internal ServerInfo ShallowClone() => (ServerInfo)MemberwiseClone(); } // ============================================================================ @@ -200,9 +204,6 @@ public static class CompressionMode // These stubs will be replaced with full implementations in later sessions. // They are declared here to allow the NatsServer class to compile. -/// Stub for reference-counted URL set (session 09/12). -internal sealed class RefCountedUrlSet : Dictionary { } - /// Stub for the system/internal messaging state (session 12). internal sealed class InternalState { @@ -254,8 +255,14 @@ internal interface IOcspResponseCache { } /// Stub for leaf node config (session 15). internal sealed class LeafNodeCfg { } -/// Stub for network resolver (session 09). -internal interface INetResolver { } +/// +/// Network resolver used by . +/// Mirrors Go netResolver interface in server.go. +/// +internal interface INetResolver +{ + Task LookupHostAsync(string host, CancellationToken ct = default); +} /// Factory for rate counters. internal static class RateCounterFactory @@ -267,6 +274,108 @@ internal static class RateCounterFactory /// Stub for RaftNode (session 20). public interface IRaftNode { } +// ============================================================================ +// Session 10: Ports, TlsMixConn, CaptureHttpServerLog +// ============================================================================ + +/// +/// Describes the URLs at which this server can be contacted. +/// Mirrors Go Ports struct in server.go. +/// +public sealed class Ports +{ + public string[]? Nats { get; set; } + public string[]? Monitoring { get; set; } + public string[]? Cluster { get; set; } + public string[]? Profile { get; set; } + public string[]? WebSocket { get; set; } +} + +/// +/// Wraps a with an optional "pre-read" buffer that is +/// drained first, then falls through to the underlying stream. +/// Used when we peek at the first bytes of a connection to detect TLS. +/// Mirrors Go tlsMixConn. +/// +internal sealed class TlsMixConn : Stream +{ + private readonly Stream _inner; + private System.IO.MemoryStream? _pre; + + public TlsMixConn(Stream inner, byte[] preRead) + { + _inner = inner; + if (preRead.Length > 0) + _pre = new System.IO.MemoryStream(preRead, writable: false); + } + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => _inner.CanWrite; + public override long Length => throw new NotSupportedException(); + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + if (_pre is { } pre) + { + var n = pre.Read(buffer, offset, count); + if (pre.Position >= pre.Length) + _pre = null; + return n; + } + return _inner.Read(buffer, offset, count); + } + + public override void Write(byte[] buffer, int offset, int count) => + _inner.Write(buffer, offset, count); + + public override void Flush() => _inner.Flush(); + + public override long Seek(long offset, SeekOrigin origin) => + throw new NotSupportedException(); + + public override void SetLength(long value) => + throw new NotSupportedException(); + + protected override void Dispose(bool disposing) + { + if (disposing) { _pre?.Dispose(); _inner.Dispose(); } + base.Dispose(disposing); + } +} + +/// +/// Captures HTTP server log lines and routes them through the server's +/// error logger. +/// Mirrors Go captureHTTPServerLog in server.go. +/// +internal sealed class CaptureHttpServerLog : System.IO.TextWriter +{ + private readonly NatsServer _server; + private readonly string _prefix; + + public CaptureHttpServerLog(NatsServer server, string prefix) + { + _server = server; + _prefix = prefix; + } + + public override System.Text.Encoding Encoding => System.Text.Encoding.UTF8; + + public override void Write(string? value) + { + if (value is null) return; + // Strip leading "http:" prefix that .NET's HttpListener sometimes emits. + var msg = value.StartsWith("http:", StringComparison.Ordinal) ? value[6..] : value; + _server.Errorf("{0}{1}", _prefix, msg.TrimEnd('\r', '\n')); + } +} + /// /// Stub for JWT account claims (session 06/11). /// Mirrors Go jwt.AccountClaims from nats.io/jwt/v2. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Properties/AssemblyInfo.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Properties/AssemblyInfo.cs index f2b3639..69622f0 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Properties/AssemblyInfo.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Properties/AssemblyInfo.cs @@ -2,3 +2,4 @@ using System.Runtime.CompilerServices; [assembly: InternalsVisibleTo("ZB.MOM.NatsNet.Server.Tests")] [assembly: InternalsVisibleTo("ZB.MOM.NatsNet.Server.IntegrationTests")] +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] // required for NSubstitute to proxy internal interfaces diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs index ddabf16..5f36584 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs @@ -12,11 +12,16 @@ // limitations under the License. // // Adapted from server/server_test.go in the NATS server Go source. -// Session 09: standalone unit tests for NatsServer helpers. +// Session 09–10: standalone unit tests for NatsServer helpers. +using System.Text; +using System.Text.Json; using System.Text.RegularExpressions; +using NSubstitute; +using NSubstitute.ExceptionExtensions; using Shouldly; using Xunit; +using ZB.MOM.NatsNet.Server.Auth; namespace ZB.MOM.NatsNet.Server.Tests; @@ -249,3 +254,215 @@ public sealed class ServerTests public void NeedsCompression_S2Fast_ReturnsTrue() => NatsServer.NeedsCompression(CompressionMode.S2Fast).ShouldBeTrue(); } + +// ============================================================================= +// Session 10: Listeners, INFO JSON, TLS helpers, GetRandomIP +// ============================================================================= + +/// +/// Tests for session 10 features: GenerateInfoJson, TlsVersion helpers, +/// CopyInfo, and GetRandomIP. +/// +public sealed class ServerListenersTests +{ + // ========================================================================= + // GenerateInfoJson (feature 3069) — Test ID 2906 + // Mirrors Go TestServerJsonMarshalNestedStructsPanic (guards against + // marshaller panics with nested/nullable structs). + // ========================================================================= + + [Fact] + public void GenerateInfoJson_MinimalInfo_ProducesInfoFrame() + { + var info = new ServerInfo { Id = "TEST", Version = "1.0.0", Host = "0.0.0.0", Port = 4222 }; + var bytes = NatsServer.GenerateInfoJson(info); + var text = Encoding.UTF8.GetString(bytes); + text.ShouldStartWith("INFO {"); + text.ShouldEndWith("}\r\n"); + } + + [Fact] + public void GenerateInfoJson_WithConnectUrls_IncludesUrls() + { + var info = new ServerInfo + { + Id = "TEST", + Version = "1.0.0", + ClientConnectUrls = ["nats://127.0.0.1:4222", "nats://127.0.0.1:4223"], + }; + var text = Encoding.UTF8.GetString(NatsServer.GenerateInfoJson(info)); + text.ShouldContain("connect_urls"); + text.ShouldContain("4222"); + } + + [Fact] + public void GenerateInfoJson_WithSubjectPermissions_DoesNotThrow() + { + // Mirrors Go TestServerJsonMarshalNestedStructsPanic — guards against + // JSON marshaller failures with nested nullable structs. + var info = new ServerInfo + { + Id = "TEST", + Version = "1.0.0", + Import = new SubjectPermission { Allow = ["foo.>"], Deny = ["bar"] }, + Export = new SubjectPermission { Allow = ["pub.>"] }, + }; + var bytes = NatsServer.GenerateInfoJson(info); + bytes.ShouldNotBeEmpty(); + // Strip the "INFO " prefix (5 bytes) and the trailing "\r\n" (2 bytes) to get pure JSON. + var json = Encoding.UTF8.GetString(bytes, 5, bytes.Length - 7); + var doc = JsonDocument.Parse(json); + doc.RootElement.GetProperty("import").ValueKind.ShouldBe(JsonValueKind.Object); + } + + // ========================================================================= + // TlsVersion helpers (features 3079–3080) + // ========================================================================= + + [Theory] + [InlineData(0x0301u, "1.0")] + [InlineData(0x0302u, "1.1")] + [InlineData(0x0303u, "1.2")] + [InlineData(0x0304u, "1.3")] + public void TlsVersion_KnownCodes_ReturnsVersionString(uint ver, string expected) + => NatsServer.TlsVersion((ushort)ver).ShouldBe(expected); + + [Fact] + public void TlsVersion_UnknownCode_ReturnsUnknownLabel() + => NatsServer.TlsVersion(0xFFFF).ShouldStartWith("Unknown"); + + [Theory] + [InlineData("1.0", (ushort)0x0301)] + [InlineData("1.1", (ushort)0x0302)] + [InlineData("1.2", (ushort)0x0303)] + [InlineData("1.3", (ushort)0x0304)] + public void TlsVersionFromString_KnownStrings_ReturnsCode(string input, ushort expected) + { + var (ver, err) = NatsServer.TlsVersionFromString(input); + err.ShouldBeNull(); + ver.ShouldBe(expected); + } + + [Fact] + public void TlsVersionFromString_UnknownString_ReturnsError() + { + var (_, err) = NatsServer.TlsVersionFromString("9.9"); + err.ShouldNotBeNull(); + } + + // ========================================================================= + // CopyInfo (feature 3069) + // ========================================================================= + + [Fact] + public void CopyInfo_DeepCopiesSlices() + { + var (s, err) = NatsServer.NewServer(new ServerOptions()); + err.ShouldBeNull(); + s.ShouldNotBeNull(); + s!._info.ClientConnectUrls = ["nats://127.0.0.1:4222"]; + + var copy = s.CopyInfo(); + copy.ClientConnectUrls.ShouldNotBeNull(); + copy.ClientConnectUrls!.ShouldBe(["nats://127.0.0.1:4222"]); + // Mutating original slice shouldn't affect the copy. + s._info.ClientConnectUrls = ["nats://10.0.0.1:4222"]; + copy.ClientConnectUrls[0].ShouldBe("nats://127.0.0.1:4222"); + } + + // ========================================================================= + // GetRandomIP (feature 3141) — Test ID 2895 + // Mirrors Go TestGetRandomIP. + // ========================================================================= + + private static NatsServer MakeServer() + { + var (s, err) = NatsServer.NewServer(new ServerOptions()); + err.ShouldBeNull(); + return s!; + } + + [Fact] + public async Task GetRandomIP_NoPort_ReturnsFormatError() + { + var s = MakeServer(); + var resolver = Substitute.For(); + var (_, err) = await s.GetRandomIP(resolver, "noport"); + err.ShouldNotBeNull(); + err!.Message.ShouldContain("port"); + } + + [Fact] + public async Task GetRandomIP_ResolverThrows_PropagatesError() + { + var s = MakeServer(); + var resolver = Substitute.For(); + resolver.LookupHostAsync(Arg.Any(), Arg.Any()) + .ThrowsAsync(new InvalidOperationException("on purpose")); + var (_, err) = await s.GetRandomIP(resolver, "localhost:4222"); + err.ShouldNotBeNull(); + err!.Message.ShouldContain("on purpose"); + } + + [Fact] + public async Task GetRandomIP_EmptyIps_ReturnsFallbackUrl() + { + var s = MakeServer(); + var resolver = Substitute.For(); + resolver.LookupHostAsync("localhost", Arg.Any()) + .Returns(Task.FromResult(Array.Empty())); + var (addr, err) = await s.GetRandomIP(resolver, "localhost:4222"); + err.ShouldBeNull(); + addr.ShouldBe("localhost:4222"); + } + + [Fact] + public async Task GetRandomIP_SingleIp_ReturnsMappedAddress() + { + var s = MakeServer(); + var resolver = Substitute.For(); + resolver.LookupHostAsync("localhost", Arg.Any()) + .Returns(Task.FromResult(new[] { "1.2.3.4" })); + var (addr, err) = await s.GetRandomIP(resolver, "localhost:4222"); + err.ShouldBeNull(); + addr.ShouldBe("1.2.3.4:4222"); + } + + [Fact] + public async Task GetRandomIP_MultipleIps_AllSelectedWithinRange() + { + var s = MakeServer(); + var resolver = Substitute.For(); + resolver.LookupHostAsync("localhost", Arg.Any()) + .Returns(Task.FromResult(new[] { "1.2.3.4", "2.2.3.4", "3.2.3.4" })); + + var dist = new int[3]; + for (int i = 0; i < 100; i++) + { + var (addr, err) = await s.GetRandomIP(resolver, "localhost:4222"); + err.ShouldBeNull(); + dist[int.Parse(addr[..1]) - 1]++; + } + + // Each IP should appear at least once and no single IP should dominate. + foreach (var d in dist) + d.ShouldBeGreaterThan(0); + } + + [Fact] + public async Task GetRandomIP_ExcludedIp_NeverReturned() + { + var s = MakeServer(); + var resolver = Substitute.For(); + resolver.LookupHostAsync("localhost", Arg.Any()) + .Returns(Task.FromResult(new[] { "1.2.3.4", "2.2.3.4", "3.2.3.4" })); + + var excluded = new HashSet { "1.2.3.4:4222" }; + for (int i = 0; i < 100; i++) + { + var (addr, err) = await s.GetRandomIP(resolver, "localhost:4222", excluded); + err.ShouldBeNull(); + addr.ShouldNotBe("1.2.3.4:4222"); + } + } +} diff --git a/porting.db b/porting.db index aac4c6e7f6b109460899dec6c7a86aa3af565680..c19893cd8d1841b99ee3979901c4e320b4d874b1 100644 GIT binary patch delta 7181 zcmZ8m349b~vhQnVGBaJ>6A}nR(n*Fu?jaW;36PKwkRx0XSrkztw;%=x1W+OANdOhm zfUea~R}H~cGfx*LN)vhd|^o0DaG*Ny>S}yOA zQ4YyV7Q2eZ|W8 z^WaaUY7p=WH@9)=vODLmobUaVKB1rY{(~uY9y+lvpOuSZ$?#QZ|Ka_t(8~T@cNB6T zvQBZlI9$xvyNkFQ4=Z6e&nMybDo)XVH_t$YpzY57)!xaw(jIkForCe5!~W+0S~8m@0Kik4hV* zHPT{fwlw9%{(K@^C5ZgL_)C_Zf)b0uTf7##2k);o)Pv<5{{~*r<;IpDZQoq6p&xAM zdmFlJL*LoZw>I>R4gKAQ{-$5z`5zrE;=tIC#efZ^+fbSfrP`2ULn$_tY(oQVsKsxC z{cR}8hWgo1UmHrap#&T1V?*&aHE@<)G!H<)Rfw z%T3Ee%d78~#QN7Y=mZZXT=W`ef{`lLm6)le*&x*vMkrBXCXszmLS1IsNNLVLr<- z3~qlMn(*9n{yw<*Q}F`cvDYu-wnt$BUieI$iKjTf%!#Xwe{axx0^$PpaJR^|4rW6P zFq0W(vOJw*rE;hh*Lb$NrMMfNPdi?7IQXxqTh26EOO`&l0ZLhc_b@F6zQ-4G;F3NJ zo2SkX`W22-Fl8w`gP$LDXJg|hf(t!w3#npp;qbzu;U#6%p*}OjKJkSH;3^WUQKts| zB^;~Zw%cJJ7LSH3^vweYHqL?kh?rWAE3xR86h9uH1EY|8h*utazzvu`7ltJD>Q-lA zQ#`|#t_u3I428?*!Ya#-<9`(e>^l!K6MDIArMQBgUeT@uQPtfE`t9>z6l<_^@!k25 z8{bP*i*Y$Atpq$!1BsZj07hW_0e2Rv3!tEXFDul7%Sc^mTc!1E{p14hvsgTa%xK-> z77-RfPM=i|C5iDWxgXP-INq8smYSr4HztqroZ3*nc15+Ry z&)o?{alJO;^hVe>){)CdObNrUXWw8-V7~Z*_g8Pa`*88+JzuZ8(xwp$VFYjIIn-SjpPh1ZORq8&*O^RP9Zg zo79%9#nx$*(nnXqLMtYwgJ-OQK~ZamXmLa>g&>&8(y@)fs5it z+*Piaoyly_iyGJ`SWigO29Wj%Y8rk%hJ50F7&vR3rV0zv$C3nXK|Hkccc3_owc zpB<^un9bJWjhMAqG5bdwEVi4^&wipx^V339-tm&9=fj|fULY;JG>thReD74EX001r^|?NcGfnyFesEO*uY#5GfdxLJ{W(x1vs=UKO)DYV(KYLaof67lCP z;KQ#r!y0=W3L?P^`Lq-gs8f}ozmhgv7O1p>w|5I2@?Rc=wRTAn>5I%rMG3jJWD*~z zQe~?!eD$|2@c4g99D4```Fh=9gtE&~6`dWDFs+*p&?w& z3Mvg|d&VbsP++#Tki_LYmlvy_f&8dvTeKV^*R=VqNkVUb1`@(u;LA=(z}>&HF$K6< z%O(qIr~sQ|M`I^Uwu%mMD?rvTZLmqEG&0E-pMwn+`FM7k7a!gkOP;6=B65c6 z4txLoov_p*o9$xZ^ALz?UZ`adxj;<|AG6Nqq1`$wzYPZ`-mnV>M#!N)+CU=LsR1jS z@u6L?J(7+I_`)tKsvn$Vqs1!}&;le;Z?wc(id|Wt7Psz#R6MyG9*mLPx+ga4leBaa znyd~E`bQZ7e}51BF@}734;f}O$cW=ZDOwsyOjUERbzxc}{$UqIP}&FgMWo6jn^4zk zsl=^OdxPnFA*KRbfgjp?oPh13Vdg5sf5hiG&oWo#$ix8Q$zd# z_!&pvO|q0VUMxKrdla&@0@7YYy|u{DzTzNMS*`NSK^Pv9+9mByX-|Gn}djH z>iEMjIC{8qw8B_Nv>zr%%sE6xH_V}C@{hxi6)}fgxW(|s{jyr4m6KH3svwxon4v?1 zHI7W1PQydD9_x(hS{ZSJjX^NSBIO7yu}wtu{-+`?8Frb5FWAr_%h^1Dto)E4JC5zh7xNgE(?^RPlZJN z+RLz+wHlz=ZC-gbwy9-mC1f(qM%Hzs=T)e&PTRtxkQen`fmTek$iVmfQDfjU2Dkyo zU`SOYJH0P3B9p~Pk3*-}?CFj>>00KT>WJgxxuwhx^o8}Kj?>1GwmN#K(SuMw4l`py zOjETBN4*w%Gg+-&M|&F=q6TWT9ev|#5XV}1c6@1?50Cyi_6n7*jV1MzlU8jy@+M82 z+dh`VUJJYxGa|P&X=6xp(FW=aMj_K1-hzi&D`@^xkP``7s8*{b;)oEr*9`8gQ*a)y z-$q)gm`gb4G~`Aqe@M|r6S+cN8cyPar{Ou{UY3Imr)dCt@NF0r)qI0iL*$YWb)CV+ z)$`=r@SR~nHh%F=j0Nq9TD4)ph8dOxdd9o3JKQ0DdM|eWb=oM}e_4GvYbxKT%a$3e zcixY=j#4b%GIIQNUno>C;@C{bo$d0mr>`Vb66lxpIM23M0{X1_sFLG;KVY zUr*sPDm0c~gj>Ql;EPZm9e}Q#f%O6>&eqp|2p62jYvXQXJD+;gRazZAzm$rvJQMoj zJ)c6W<84cs+y?`2?2a_*AaddQ@tW6-Jrh-w=`(o1t zm}DKPuP;EkVC28va}k7w$W0Q{>*I4a6;(@mQMG)ok^NP7?3Z~~a8x-8G46o;Pn|y~ zU*hoKJa4V}qFbI7`+CMZbM=G0t2fe{l&7}(%Up)dWy6c~Pq+HBC83>f zVz9efD#z+hzdvCd41>Wy?_q z@%xiYU3UnmWm&EQNA2j`Nc9JKm|`C{d5SqVm}H9v?kn_YvfPQ`mD zOcnHAz})y%R%x(urB@4o#?=HhvLLVf|4H1mnw}R0qfSAh*nC@blc*F&yUkEs zOZsWLElo1gw@XweW1lDK-J=hbVUa4UF2vQOw1N(>HD7xqWjS^o^M*V6E=kEYkLIQx zS)r9mD~(pV-XjM*-Tf9bo=NT%a(bNVdfK_%Q74S$*RYHAl*@rS*0{p8dmfvWK*#9j z_T)0S*(hXdz7MQqV-C`ke+HbGbtN#N_cU!aIn0^u9N-jx-@~WrS^Ed8yipiL{0aEO c*dy{ioK&lS;O0~LLrZoP^7^7z25bDk0q&&csQ>@~ delta 5484 zcmZ8ld3+RAwyv$KyQ=F}b~;(2nG-elqcZ0FvvO%ipcikjf_8dk06Zt-WvwrxmATP5BS}GD&M{5+;hHj z&Ud@13D_oFs}qREFx(i%`M0J^A*=1KdTV2DtXf3B-BzE~ZmC7;5bcCILwipHEuu}; zp47^;?pi<-)Hd~k`kC6QzOlDwi(A#ggb#8PYNt-0Qa!U;En7EaSHIrh`}j0&TJ6kd z>t~Lun^`@P9K>@kJoHi7fFDmi^wHbA_p@EySS`K%soram*8Q!l(8S&`UAf#jRz&lg z)mO}`VXjnWiy&OZ_JrJh8ZjsI=Jv9J`Z1G5KW17V;d?JXfb(92ZoRoI0ge8S{;k~J zkkpU6b|j9w$K2$K7-5&tBCHisg&0BP|BJt|?>ZmDcA%!zWom<3qmEHWs3mF-HC^oh zuZOvTs*+68D3;h~7*Kc#-$ zqJG@4{rH9F|L!s~_h&@&rf*+{hlNQWWbEh2-)+GEchCF;H@qPlU`f%399-DYTY!JZ47DDm!?y#Fw~@gOUk8&HURp%14|){=i-| z!@k7xto0RETvV=bYCen`*r6USUs6Ux+c7N}&U1kQ=DieU61)GZ!Zu~79T~z5XPDty zB~iS$xLi%1gYo-2CPRryG?+1oWI|P}Ajw5ghh~eH1VgYW+Z}5jo<#bv!2KI%pO-nA zWLf9om1p32eMl&f!?9`b++;EsHWP0$ln;;zT$qgR(E}w8rsIM6i~CrH&2E1nUKTcQ zR5PIbp5TJ9H6+^tYLlQAfqj-twNErR)sRLO{<1%q1*KC+j%{@aOhcAuc60=0@)N;Ih8D@(aQpU8b+$+pR6Ru!xY4n>183%Psx6G zVjB4)aGwV=ptV5~A-;3=Ku*vIXu}K{P4pca zgtob)t96t##*oltUSkNxrF;Bv|Cn2ZvUy}2jKAYfhSl*&K45F*kQz2Th|4D-l25Oo z%qRUJdO>^^%$iSn*cr?>+z3gj2oWoTAI~S{HY9%m>0&2ulp&&9DZ1uhW-Y)QT8WuH zpLjrBh*_rGiuM^U1Wt&I3WK7Eei!yP$#SBGpJ7d_w*Cd36N7d=SG z?+gvElrzi0ot*Xm5;AGFLrSUC@FH#i@~`44IR0u^{I^W}OHa$LPOla$chd9sLb2wTd1hW}RXElpzXyIWnQ;s)*q_M&=|e5ZWs zJ;?pU^@CxmkvuqKx+QisZLt%55zUCRp!uOdh;r<|?m&q6-7TpLh3I)MZHL4M4QH!D~JT|zeo~o4nJu`BQ9rwlkmaIX%v2|!0XFt!jVbqqLSbAM%+da z9Uh>^g&;gj^21Ln$qVgb_Q6Xe&8kM(Z*)T7P#s(E*_EOn{`C@B(+)^pM(&?{8GWF& z8?6`#h#sUTgaXAD_lx1iN|FF6tH}IzyZ*Wg+X3AmMH%t9VX&SA;S)Fk&ryV2{ZXc+_#*Uc>N!UAHz(e;bUeGo>Rj;NR*@#Oj4F#g8 z@u8c^06XO7W|9oaCj?nFve05N_B(r6wQVNLU>$FdND(${ab~H^8C?-rx;S9Zdb4c{ zaXF14VJl86st4p6qcei?biY%71#gp?mLLFMr5pw4-$uSspfIx~hZ%q(A-> zCsI2WE0C)Lydu1{-zg5hn~@6(Wmi13?I%4U zdcVgKqy<*{go12uqNYD;6w&&b6(W`_ia$WAY>l$%fJ0FA38N4pq1q6!M#7Z?Fj@In2IG78YGiY1!O67#czq@0DdFZ+_AHQ%d1-;W+#Alk@BWT8Gf1Pg1#4)Eqj z1YqqA$pMZ%gfjT{h6G8DHhLnkY-ye?BF$Zg$SbTpk6K=EJ1!?=~E6oVF~K?^|iZj#?_@XXka zw&+YtCt@?BFxwbcqL9MYU$OkSR+vbt_|bzOJ(@hIs4nMcXrY;J`= z!E9Pnr>oHyxAoE+>16K!>rarK)@uT)PLd+Ky)+tq5Yh)5mi?4yJxNwVrtBOT#wMO!*U8g(;jZX2Ca~+7Xqcfkqj46PP^ZJ+|f3MXUIZxz9IzvXbuFc^R)eM(tU)8ezG+nJ4&w{dB`-Si8$~Cz?+D(ofPrKJx>xYBp z-|y3K`EW(yipJ%~6@x1lR|i~i=I{4)+fLec6DT0!gKmdxbiUDXXccsGWmcSvi zd{ZFD&8eKJZwqu1V8cb_yXJe(Xo$N)UGfOV^>_}bmb~8h;8x_EFvp*XSARUKXXA#<9Osq zlEFpOvaF`eiiJyj@GaQW8>h$ZYFlZTT_dGMC z1W&TCXRH{4bQRsE5l{=E5nV)anIA`8oT3JYL)~NeEdE->FrbT)6b}vb7BomyDz21+ zUE!-^_^UVk{_ov}S0*MG%SeSPys6{^18R*~M;$sW2PSWqPGu zey{$3IsCsLh~~0E4K8RHNLv2u&+x8hmoJDb5myqfWYgtKVXt<0j**8+by~7VcW)GD zxP}S?_y+blv(xRAp={I=vi>PQ!!l1VNhl;wuNA@!x0Oek3v&|t(bDI#>f zn=;sTb&md~w%vYB?47$Mg*PW3$}sr< E1#$X-$N&HU diff --git a/reports/current.md b/reports/current.md index 1894b68..96d6544 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-26 19:18:19 UTC +Generated: 2026-02-26 20:08:24 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-02-26 19:18:19 UTC | Status | Count | |--------|-------| -| complete | 744 | +| complete | 841 | | n_a | 82 | -| not_started | 2754 | +| not_started | 2657 | | stub | 93 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| complete | 276 | +| complete | 278 | | n_a | 181 | -| not_started | 2576 | +| not_started | 2574 | | stub | 224 | ## Library Mappings (36 total) @@ -36,4 +36,4 @@ Generated: 2026-02-26 19:18:19 UTC ## Overall Progress -**1294/6942 items complete (18.6%)** +**1393/6942 items complete (20.1%)** diff --git a/reports/report_0df93c2.md b/reports/report_0df93c2.md new file mode 100644 index 0000000..96d6544 --- /dev/null +++ b/reports/report_0df93c2.md @@ -0,0 +1,39 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-26 20:08:24 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| complete | 11 | +| not_started | 1 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| complete | 841 | +| n_a | 82 | +| not_started | 2657 | +| stub | 93 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| complete | 278 | +| n_a | 181 | +| not_started | 2574 | +| stub | 224 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**1393/6942 items complete (20.1%)**