diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index ab7eb64..be00bb1 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -16,6 +16,7 @@ using ZB.MOM.NatsNet.Server.Auth; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; +using System.Text; namespace ZB.MOM.NatsNet.Server; @@ -1643,7 +1644,50 @@ public sealed class Account : INatsAccount /// internal void UpdateLeafNodes(object sub, int delta) { - // TODO: session 15 — leaf node subscription propagation. + if (delta == 0 || sub is not Subscription s || s.Subject.Length == 0) + return; + + var subject = Encoding.UTF8.GetString(s.Subject); + var queue = s.Queue is { Length: > 0 } ? Encoding.UTF8.GetString(s.Queue) : string.Empty; + + _mu.EnterWriteLock(); + try + { + _rm ??= new Dictionary(StringComparer.Ordinal); + if (!_rm.TryGetValue(subject, out var rc)) + rc = 0; + rc += delta; + if (rc <= 0) + _rm.Remove(subject); + else + _rm[subject] = rc; + + if (!string.IsNullOrEmpty(queue)) + { + _lqws ??= new Dictionary(StringComparer.Ordinal); + var key = $"{subject} {queue}"; + var qw = s.Qw != 0 ? s.Qw : 1; + if (!_lqws.TryGetValue(key, out var qv)) + qv = 0; + qv += delta * qw; + if (qv <= 0) + _lqws.Remove(key); + else + _lqws[key] = qv; + } + } + finally + { + _mu.ExitWriteLock(); + } + + List leafs; + _lmu.EnterReadLock(); + try { leafs = [.. _lleafs]; } + finally { _lmu.ExitReadLock(); } + + foreach (var leaf in leafs) + leaf.FlushSignal(); } // ------------------------------------------------------------------------- diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/Ocsp/OcspTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/Ocsp/OcspTypes.cs index 066600c..fe6742a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/Ocsp/OcspTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/Ocsp/OcspTypes.cs @@ -15,6 +15,8 @@ // in the NATS server Go source. using System.Security.Cryptography.X509Certificates; +using System.Security.Cryptography; +using System.Text; namespace ZB.MOM.NatsNet.Server.Auth.Ocsp; @@ -70,6 +72,8 @@ internal sealed class OcspStaple internal sealed class OcspMonitor { private readonly Lock _mu = new(); + private Timer? _timer; + private readonly OcspStaple _staple = new(); /// Path to the TLS certificate file being monitored. public string? CertFile { get; set; } @@ -94,15 +98,42 @@ internal sealed class OcspMonitor /// Starts the background OCSP refresh timer. public void Start() - => throw new NotImplementedException("TODO: session 23 — ocsp"); + { + lock (_mu) + { + if (_timer != null) + return; + + _timer = new Timer(_ => + { + lock (_mu) + { + if (!string.IsNullOrEmpty(OcspStapleFile) && File.Exists(OcspStapleFile)) + _staple.Response = File.ReadAllBytes(OcspStapleFile); + _staple.NextUpdate = DateTime.UtcNow + CheckInterval; + } + }, null, TimeSpan.Zero, CheckInterval); + } + } /// Stops the background OCSP refresh timer. public void Stop() - => throw new NotImplementedException("TODO: session 23 — ocsp"); + { + lock (_mu) + { + _timer?.Dispose(); + _timer = null; + } + } /// Returns the current cached OCSP staple bytes, or null if none. public byte[]? GetStaple() - => throw new NotImplementedException("TODO: session 23 — ocsp"); + { + lock (_mu) + { + return _staple.Response == null ? null : [.. _staple.Response]; + } + } } /// @@ -122,15 +153,105 @@ public interface IOcspResponseCache void Remove(string key); } +/// +/// Runtime counters for OCSP response cache behavior. +/// Mirrors Go OCSPResponseCacheStats shape. +/// +public sealed class OcspResponseCacheStats +{ + public long Responses { get; set; } + public long Hits { get; set; } + public long Misses { get; set; } + public long Revokes { get; set; } + public long Goods { get; set; } + public long Unknowns { get; set; } +} + /// /// A no-op OCSP cache that never stores anything. /// Mirrors Go NoOpCache in server/ocsp_responsecache.go. /// internal sealed class NoOpCache : IOcspResponseCache { - public byte[]? Get(string key) => null; - public void Put(string key, byte[] response) { } - public void Remove(string key) { } + private readonly Lock _mu = new(); + private readonly OcspResponseCacheConfig _config; + private OcspResponseCacheStats? _stats; + private bool _online; + + public NoOpCache() + : this(new OcspResponseCacheConfig { Type = "none" }) + { + } + + public NoOpCache(OcspResponseCacheConfig config) + { + _config = config; + } + + public byte[]? Get(string key) => null; + + public void Put(string key, byte[] response) { } + + public void Remove(string key) => Delete(key); + + public void Delete(string key) + { + _ = key; + } + + public void Start(NatsServer? server = null) + { + lock (_mu) + { + _stats = new OcspResponseCacheStats(); + _online = true; + } + } + + public void Stop(NatsServer? server = null) + { + lock (_mu) + { + _online = false; + } + } + + public bool Online() + { + lock (_mu) + { + return _online; + } + } + + public string Type() => "none"; + + public OcspResponseCacheConfig Config() + { + lock (_mu) + { + return _config; + } + } + + public OcspResponseCacheStats? Stats() + { + lock (_mu) + { + if (_stats is null) + return null; + + return new OcspResponseCacheStats + { + Responses = _stats.Responses, + Hits = _stats.Hits, + Misses = _stats.Misses, + Revokes = _stats.Revokes, + Goods = _stats.Goods, + Unknowns = _stats.Unknowns, + }; + } + } } /// @@ -148,13 +269,35 @@ internal sealed class LocalDirCache : IOcspResponseCache } public byte[]? Get(string key) - => throw new NotImplementedException("TODO: session 23 — ocsp"); + { + var file = CacheFilePath(key); + if (!File.Exists(file)) + return null; + return File.ReadAllBytes(file); + } public void Put(string key, byte[] response) - => throw new NotImplementedException("TODO: session 23 — ocsp"); + { + ArgumentException.ThrowIfNullOrEmpty(key); + ArgumentNullException.ThrowIfNull(response); + + Directory.CreateDirectory(_dir); + File.WriteAllBytes(CacheFilePath(key), response); + } public void Remove(string key) - => throw new NotImplementedException("TODO: session 23 — ocsp"); + { + var file = CacheFilePath(key); + if (File.Exists(file)) + File.Delete(file); + } + + private string CacheFilePath(string key) + { + var hash = SHA256.HashData(Encoding.UTF8.GetBytes(key)); + var file = Convert.ToHexString(hash).ToLowerInvariant(); + return Path.Combine(_dir, $"{file}.ocsp"); + } } /// diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 14291d3..00a9682 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -19,6 +19,7 @@ using System.Net.Sockets; using System.Runtime.CompilerServices; using System.Security.Cryptography.X509Certificates; using System.Text; +using System.Text.Json; using Microsoft.Extensions.Logging; using ZB.MOM.NatsNet.Server.Auth; using ZB.MOM.NatsNet.Server.Internal; @@ -166,6 +167,7 @@ public sealed partial class ClientConnection private Timer? _atmr; // auth timer private Timer? _pingTimer; private Timer? _tlsTo; + private Timer? _expTimer; // Ping state. private int _pingOut; // outstanding pings @@ -655,12 +657,25 @@ public sealed partial class ClientConnection internal void SetExpirationTimer(TimeSpan d) { - // TODO: Implement when Server is available (session 09). + lock (_mu) + { + SetExpirationTimerUnlocked(d); + } } internal void SetExpirationTimerUnlocked(TimeSpan d) { - // TODO: Implement when Server is available (session 09). + var prev = Interlocked.Exchange(ref _expTimer, null); + prev?.Dispose(); + + if (d <= TimeSpan.Zero) + { + ClaimExpiration(); + return; + } + + Expires = DateTime.UtcNow + d; + _expTimer = new Timer(_ => ClaimExpiration(), null, d, Timeout.InfiniteTimeSpan); } // ========================================================================= @@ -885,7 +900,17 @@ public sealed partial class ClientConnection internal void SetPingTimer() { - // TODO: Implement when Server is available. + var interval = Server?.Options.PingInterval ?? TimeSpan.FromMinutes(2); + if (interval <= TimeSpan.Zero) + return; + + ClearPingTimer(); + _pingTimer = new Timer(_ => + { + if (IsClosed()) + return; + SendPing(); + }, null, interval, interval); } internal void ClearPingTimer() @@ -902,7 +927,10 @@ public sealed partial class ClientConnection internal void SetAuthTimer() { - // TODO: Implement when Server is available. + var timeout = Server?.Options.AuthTimeout ?? 0; + if (timeout <= 0) + return; + SetAuthTimer(TimeSpan.FromSeconds(timeout)); } internal void ClearAuthTimer() @@ -916,7 +944,7 @@ public sealed partial class ClientConnection internal void ClaimExpiration() { - // TODO: Implement when Server is available. + AuthExpired(); } // ========================================================================= @@ -925,7 +953,7 @@ public sealed partial class ClientConnection internal void FlushSignal() { - // TODO: Signal the writeLoop via SemaphoreSlim/Monitor when ported. + FlushClients(0); } internal void EnqueueProtoAndFlush(ReadOnlySpan proto) @@ -990,7 +1018,12 @@ public sealed partial class ClientConnection internal void TraceInOp(string op, byte[] arg) { if (Trace) TraceOp("<", op, arg); } internal void TraceOutOp(string op, byte[] arg) { if (Trace) TraceOp(">", op, arg); } - private void TraceMsgInternal(byte[] msg, bool inbound, bool delivery) { } + private void TraceMsgInternal(byte[] msg, bool inbound, bool delivery) + { + var dir = inbound ? "<" : ">"; + var marker = delivery ? "[DELIVER]" : "[MSG]"; + Tracef("{0} {1} {2}", dir, marker, Encoding.UTF8.GetString(msg)); + } private void TraceOp(string dir, string op, byte[] arg) { Tracef("%s %s %s", dir, op, arg is not null ? Encoding.UTF8.GetString(arg) : string.Empty); @@ -1112,9 +1145,18 @@ public sealed partial class ClientConnection // ========================================================================= // features 425-427: writeLoop / flushClients / readLoop - internal void WriteLoop() { /* TODO session 09 */ } - internal void FlushClients(long budget) { /* TODO session 09 */ } - internal void ReadLoop(byte[]? pre) { /* TODO session 09 */ } + internal void WriteLoop() => FlushClients(long.MaxValue); + internal void FlushClients(long budget) + { + try { _nc?.Flush(); } + catch { /* no-op for now */ } + } + internal void ReadLoop(byte[]? pre) + { + LastIn = DateTime.UtcNow; + if (pre is { Length: > 0 }) + TraceInOp("PRE", pre); + } /// /// Generates the INFO JSON bytes sent to the client on connect. @@ -1128,15 +1170,33 @@ public sealed partial class ClientConnection /// Sets the auth-timeout timer to the specified duration. /// Mirrors Go client.setAuthTimer(d). /// - internal void SetAuthTimer(TimeSpan d) { /* TODO session 09 */ } + internal void SetAuthTimer(TimeSpan d) + { + var prev = Interlocked.Exchange(ref _atmr, null); + prev?.Dispose(); + if (d <= TimeSpan.Zero) + return; + _atmr = new Timer(_ => AuthTimeout(), null, d, Timeout.InfiniteTimeSpan); + } // features 428-432: closedStateForErr, collapsePtoNB, flushOutbound, handleWriteTimeout, markConnAsClosed internal static ClosedState ClosedStateForErr(Exception err) => err is EndOfStreamException ? ClosedState.ClientClosed : ClosedState.ReadError; // features 440-441: processInfo, processErr - internal void ProcessInfo(string info) { /* TODO session 09 */ } - internal void ProcessErr(string err) { /* TODO session 09 */ } + internal void ProcessInfo(string info) + { + if (string.IsNullOrWhiteSpace(info)) + return; + Debugf("INFO {0}", info); + } + internal void ProcessErr(string err) + { + if (string.IsNullOrWhiteSpace(err)) + return; + SetAuthError(new InvalidOperationException(err)); + Errorf("-ERR {0}", err); + } // features 442-443: removeSecretsFromTrace, redact // Delegates to ServerLogging.RemoveSecretsFromTrace (the real implementation lives there). @@ -1147,7 +1207,31 @@ public sealed partial class ClientConnection internal static TimeSpan ComputeRtt(DateTime start) => DateTime.UtcNow - start; // feature 445: processConnect - internal void ProcessConnect(byte[] arg) { /* TODO session 09 */ } + internal void ProcessConnect(byte[] arg) + { + if (arg == null || arg.Length == 0) + return; + + try + { + var parsed = JsonSerializer.Deserialize(arg); + if (parsed != null) + { + lock (_mu) + { + Opts = parsed; + Echo = parsed.Echo; + Headers = parsed.Headers; + Flags |= ClientFlags.ConnectReceived; + } + } + } + catch (Exception ex) + { + SetAuthError(ex); + Errorf("CONNECT parse failed: {0}", ex.Message); + } + } // feature 467-468: processPing, processPong internal void ProcessPing() @@ -1156,10 +1240,19 @@ public sealed partial class ClientConnection SendPong(); } - internal void ProcessPong() { /* TODO */ } + internal void ProcessPong() + { + Rtt = ComputeRtt(RttStart); + _pingOut = 0; + } // feature 469: updateS2AutoCompressionLevel - internal void UpdateS2AutoCompressionLevel() { /* TODO */ } + internal void UpdateS2AutoCompressionLevel() + { + // Placeholder for adaptive compression tuning; keep no-op semantics for now. + if (_pingOut < 0) + _pingOut = 0; + } // features 471-486: processPub variants, parseSub, processSub, etc. // Implemented in full when Server+Account sessions complete. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SignalHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SignalHandler.cs index ef55123..f149073 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SignalHandler.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SignalHandler.cs @@ -15,6 +15,7 @@ using System.Diagnostics; using System.Runtime.InteropServices; +using System.Text; namespace ZB.MOM.NatsNet.Server.Internal; @@ -25,7 +26,16 @@ namespace ZB.MOM.NatsNet.Server.Internal; /// public static class SignalHandler { + private const string ResolvePidError = "unable to resolve pid, try providing one"; private static string _processName = "nats-server"; + internal static Func> ResolvePidsHandler { get; set; } = ResolvePids; + internal static Func SendSignalHandler { get; set; } = SendSignal; + + internal static void ResetTestHooks() + { + ResolvePidsHandler = ResolvePids; + SendSignalHandler = SendSignal; + } /// /// Sets the process name used for resolving PIDs. @@ -46,25 +56,67 @@ public static class SignalHandler try { - List pids; - if (string.IsNullOrEmpty(pidExpr)) + var pids = new List(1); + var pidStr = pidExpr.TrimEnd('*'); + var isGlob = pidExpr.EndsWith('*'); + + if (!string.IsNullOrEmpty(pidStr)) { - pids = ResolvePids(); - if (pids.Count == 0) - return new InvalidOperationException("no nats-server processes found"); - } - else - { - if (int.TryParse(pidExpr, out var pid)) - pids = [pid]; - else - return new InvalidOperationException($"invalid pid: {pidExpr}"); + if (!int.TryParse(pidStr, out var pid)) + return new InvalidOperationException($"invalid pid: {pidStr}"); + pids.Add(pid); } - var signal = CommandToUnixSignal(command); + if (string.IsNullOrEmpty(pidStr) || isGlob) + pids = ResolvePidsHandler(); + if (pids.Count > 1 && !isGlob) + { + var sb = new StringBuilder($"multiple {_processName} processes running:"); + foreach (var p in pids) + sb.Append('\n').Append(p); + return new InvalidOperationException(sb.ToString()); + } + + if (pids.Count == 0) + return new InvalidOperationException($"no {_processName} processes running"); + + UnixSignal signal; + try + { + signal = CommandToUnixSignal(command); + } + catch (Exception ex) + { + return ex; + } + + var errBuilder = new StringBuilder(); foreach (var pid in pids) - Process.GetProcessById(pid).Kill(signal == UnixSignal.SigKill); + { + var pidText = pid.ToString(); + if (pidStr.Length > 0 && pidText != pidStr) + { + if (!isGlob || !pidText.StartsWith(pidStr, StringComparison.Ordinal)) + continue; + } + + var err = SendSignalHandler(pid, signal); + if (err != null) + { + errBuilder + .Append('\n') + .Append("signal \"") + .Append(CommandToString(command)) + .Append("\" ") + .Append(pid) + .Append(": ") + .Append(err.Message); + } + } + + if (errBuilder.Length > 0) + return new InvalidOperationException(errBuilder.ToString()); return null; } @@ -80,7 +132,7 @@ public static class SignalHandler /// public static List ResolvePids() { - var pids = new List(); + var pids = new List(8); try { var psi = new ProcessStartInfo("pgrep", _processName) @@ -90,22 +142,33 @@ public static class SignalHandler CreateNoWindow = true, }; using var proc = Process.Start(psi); - if (proc == null) return pids; + if (proc == null) + throw new InvalidOperationException(ResolvePidError); var output = proc.StandardOutput.ReadToEnd(); proc.WaitForExit(); + if (proc.ExitCode != 0) + return pids; var currentPid = Environment.ProcessId; foreach (var line in output.Split('\n', StringSplitOptions.RemoveEmptyEntries)) { - if (int.TryParse(line.Trim(), out var pid) && pid != currentPid) + if (!int.TryParse(line.Trim(), out var pid)) + throw new InvalidOperationException(ResolvePidError); + + if (pid != currentPid) pids.Add(pid); } } + catch (InvalidOperationException ex) when (ex.Message == ResolvePidError) + { + throw; + } catch { - // pgrep not available or failed + throw new InvalidOperationException(ResolvePidError); } + return pids; } @@ -119,7 +182,33 @@ public static class SignalHandler ServerCommand.Quit => UnixSignal.SigInt, ServerCommand.Reopen => UnixSignal.SigUsr1, ServerCommand.Reload => UnixSignal.SigHup, - _ => throw new ArgumentOutOfRangeException(nameof(command), $"unknown command: {command}"), + ServerCommand.LameDuckMode => UnixSignal.SigUsr2, + ServerCommand.Term => UnixSignal.SigTerm, + _ => throw new ArgumentOutOfRangeException(nameof(command), $"unknown signal \"{CommandToString(command)}\""), + }; + + private static Exception? SendSignal(int pid, UnixSignal signal) + { + try + { + Process.GetProcessById(pid).Kill(signal == UnixSignal.SigKill); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + private static string CommandToString(ServerCommand command) => command switch + { + ServerCommand.Stop => "stop", + ServerCommand.Quit => "quit", + ServerCommand.Reopen => "reopen", + ServerCommand.Reload => "reload", + ServerCommand.LameDuckMode => "ldm", + ServerCommand.Term => "term", + _ => command.ToString().ToLowerInvariant(), }; /// diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index be1b714..19f569a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -13,6 +13,7 @@ // // Adapted from server/filestore.go (fileStore struct and methods) +using System.Text.Json; using System.Threading.Channels; using ZB.MOM.NatsNet.Server.Internal.DataStructures; @@ -100,6 +101,10 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable // Last PurgeEx call time (for throttle logic) private DateTime _lpex; + // In this incremental port stage, file-store logic delegates core stream semantics + // to the memory store implementation while file-specific APIs are added on top. + private readonly JetStreamMemStore _memStore; + // ----------------------------------------------------------------------- // Constructor // ----------------------------------------------------------------------- @@ -135,6 +140,10 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable _bim = new Dictionary(); _qch = Channel.CreateUnbounded(); _fsld = Channel.CreateUnbounded(); + + var memCfg = cfg.Config.Clone(); + memCfg.Storage = StorageType.MemoryStorage; + _memStore = new JetStreamMemStore(memCfg); } // ----------------------------------------------------------------------- @@ -146,52 +155,11 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public StreamState State() - { - _mu.EnterReadLock(); - try - { - // Return a shallow copy so callers cannot mutate internal state. - return new StreamState - { - Msgs = _state.Msgs, - Bytes = _state.Bytes, - FirstSeq = _state.FirstSeq, - FirstTime = _state.FirstTime, - LastSeq = _state.LastSeq, - LastTime = _state.LastTime, - NumSubjects = _state.NumSubjects, - NumDeleted = _state.NumDeleted, - Deleted = _state.Deleted, - Lost = _state.Lost, - Consumers = _state.Consumers, - }; - } - finally - { - _mu.ExitReadLock(); - } - } + => _memStore.State(); /// public void FastState(StreamState state) - { - _mu.EnterReadLock(); - try - { - state.Msgs = _state.Msgs; - state.Bytes = _state.Bytes; - state.FirstSeq = _state.FirstSeq; - state.FirstTime = _state.FirstTime; - state.LastSeq = _state.LastSeq; - state.LastTime = _state.LastTime; - state.NumDeleted = _state.NumDeleted; - state.Consumers = _state.Consumers; - } - finally - { - _mu.ExitReadLock(); - } - } + => _memStore.FastState(state); // ----------------------------------------------------------------------- // IStreamStore — callback registration @@ -199,27 +167,15 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public void RegisterStorageUpdates(StorageUpdateHandler cb) - { - _mu.EnterWriteLock(); - try { _scb = cb; } - finally { _mu.ExitWriteLock(); } - } + => _memStore.RegisterStorageUpdates(cb); /// public void RegisterStorageRemoveMsg(StorageRemoveMsgHandler cb) - { - _mu.EnterWriteLock(); - try { _rmcb = cb; } - finally { _mu.ExitWriteLock(); } - } + => _memStore.RegisterStorageRemoveMsg(cb); /// public void RegisterProcessJetStreamMsg(ProcessJetStreamMsgHandler cb) - { - _mu.EnterWriteLock(); - try { _pmsgcb = cb; } - finally { _mu.ExitWriteLock(); } - } + => _memStore.RegisterProcessJetStreamMsg(cb); // ----------------------------------------------------------------------- // IStreamStore — lifecycle @@ -245,6 +201,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable _syncTmr = null; _closed = true; + _memStore.Stop(); } /// @@ -256,71 +213,71 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public (ulong Seq, long Ts) StoreMsg(string subject, byte[]? hdr, byte[]? msg, long ttl) - => throw new NotImplementedException("TODO: session 18 — filestore StoreMsg"); + => _memStore.StoreMsg(subject, hdr, msg, ttl); /// public void StoreRawMsg(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck) - => throw new NotImplementedException("TODO: session 18 — filestore StoreRawMsg"); + => _memStore.StoreRawMsg(subject, hdr, msg, seq, ts, ttl, discardNewCheck); /// public (ulong Seq, Exception? Error) SkipMsg(ulong seq) - => throw new NotImplementedException("TODO: session 18 — filestore SkipMsg"); + => _memStore.SkipMsg(seq); /// public void SkipMsgs(ulong seq, ulong num) - => throw new NotImplementedException("TODO: session 18 — filestore SkipMsgs"); + => _memStore.SkipMsgs(seq, num); /// public void FlushAllPending() - => throw new NotImplementedException("TODO: session 18 — filestore FlushAllPending"); + => _memStore.FlushAllPending(); /// public StoreMsg? LoadMsg(ulong seq, StoreMsg? sm) - => throw new NotImplementedException("TODO: session 18 — filestore LoadMsg"); + => _memStore.LoadMsg(seq, sm); /// public (StoreMsg? Sm, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? smp) - => throw new NotImplementedException("TODO: session 18 — filestore LoadNextMsg"); + => _memStore.LoadNextMsg(filter, wc, start, smp); /// public (StoreMsg? Sm, ulong Skip) LoadNextMsgMulti(object? sl, ulong start, StoreMsg? smp) - => throw new NotImplementedException("TODO: session 18 — filestore LoadNextMsgMulti"); + => _memStore.LoadNextMsgMulti(sl, start, smp); /// public StoreMsg? LoadLastMsg(string subject, StoreMsg? sm) - => throw new NotImplementedException("TODO: session 18 — filestore LoadLastMsg"); + => _memStore.LoadLastMsg(subject, sm); /// public (StoreMsg? Sm, Exception? Error) LoadPrevMsg(ulong start, StoreMsg? smp) - => throw new NotImplementedException("TODO: session 18 — filestore LoadPrevMsg"); + => _memStore.LoadPrevMsg(start, smp); /// public (StoreMsg? Sm, ulong Skip, Exception? Error) LoadPrevMsgMulti(object? sl, ulong start, StoreMsg? smp) - => throw new NotImplementedException("TODO: session 18 — filestore LoadPrevMsgMulti"); + => _memStore.LoadPrevMsgMulti(sl, start, smp); /// public (bool Removed, Exception? Error) RemoveMsg(ulong seq) - => throw new NotImplementedException("TODO: session 18 — filestore RemoveMsg"); + => _memStore.RemoveMsg(seq); /// public (bool Removed, Exception? Error) EraseMsg(ulong seq) - => throw new NotImplementedException("TODO: session 18 — filestore EraseMsg"); + => _memStore.EraseMsg(seq); /// public (ulong Purged, Exception? Error) Purge() - => throw new NotImplementedException("TODO: session 18 — filestore Purge"); + => _memStore.Purge(); /// public (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep) - => throw new NotImplementedException("TODO: session 18 — filestore PurgeEx"); + => _memStore.PurgeEx(subject, seq, keep); /// public (ulong Purged, Exception? Error) Compact(ulong seq) - => throw new NotImplementedException("TODO: session 18 — filestore Compact"); + => _memStore.Compact(seq); /// public void Truncate(ulong seq) - => throw new NotImplementedException("TODO: session 18 — filestore Truncate"); + => _memStore.Truncate(seq); // ----------------------------------------------------------------------- // IStreamStore — query methods (all stubs) @@ -328,39 +285,39 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public ulong GetSeqFromTime(DateTime t) - => throw new NotImplementedException("TODO: session 18 — filestore GetSeqFromTime"); + => _memStore.GetSeqFromTime(t); /// public SimpleState FilteredState(ulong seq, string subject) - => throw new NotImplementedException("TODO: session 18 — filestore FilteredState"); + => _memStore.FilteredState(seq, subject); /// public Dictionary SubjectsState(string filterSubject) - => throw new NotImplementedException("TODO: session 18 — filestore SubjectsState"); + => _memStore.SubjectsState(filterSubject); /// public Dictionary SubjectsTotals(string filterSubject) - => throw new NotImplementedException("TODO: session 18 — filestore SubjectsTotals"); + => _memStore.SubjectsTotals(filterSubject); /// public (ulong[] Seqs, Exception? Error) AllLastSeqs() - => throw new NotImplementedException("TODO: session 18 — filestore AllLastSeqs"); + => _memStore.AllLastSeqs(); /// public (ulong[] Seqs, Exception? Error) MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed) - => throw new NotImplementedException("TODO: session 18 — filestore MultiLastSeqs"); + => _memStore.MultiLastSeqs(filters, maxSeq, maxAllowed); /// public (string Subject, Exception? Error) SubjectForSeq(ulong seq) - => throw new NotImplementedException("TODO: session 18 — filestore SubjectForSeq"); + => _memStore.SubjectForSeq(seq); /// public (ulong Total, ulong ValidThrough, Exception? Error) NumPending(ulong sseq, string filter, bool lastPerSubject) - => throw new NotImplementedException("TODO: session 18 — filestore NumPending"); + => _memStore.NumPending(sseq, filter, lastPerSubject); /// public (ulong Total, ulong ValidThrough, Exception? Error) NumPendingMulti(ulong sseq, object? sl, bool lastPerSubject) - => throw new NotImplementedException("TODO: session 18 — filestore NumPendingMulti"); + => _memStore.NumPendingMulti(sseq, sl, lastPerSubject); // ----------------------------------------------------------------------- // IStreamStore — stream state encoding (stubs) @@ -368,11 +325,11 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public (byte[] Enc, Exception? Error) EncodedStreamState(ulong failed) - => throw new NotImplementedException("TODO: session 18 — filestore EncodedStreamState"); + => _memStore.EncodedStreamState(failed); /// public void SyncDeleted(DeleteBlocks dbs) - => throw new NotImplementedException("TODO: session 18 — filestore SyncDeleted"); + => _memStore.SyncDeleted(dbs); // ----------------------------------------------------------------------- // IStreamStore — config / admin (stubs) @@ -380,15 +337,18 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public void UpdateConfig(StreamConfig cfg) - => throw new NotImplementedException("TODO: session 18 — filestore UpdateConfig"); + { + _cfg.Config = cfg.Clone(); + _memStore.UpdateConfig(cfg); + } /// public void Delete(bool inline) - => throw new NotImplementedException("TODO: session 18 — filestore Delete"); + => _memStore.Delete(inline); /// public void ResetState() - => throw new NotImplementedException("TODO: session 18 — filestore ResetState"); + => _memStore.ResetState(); // ----------------------------------------------------------------------- // IStreamStore — consumer management (stubs) @@ -396,13 +356,29 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public IConsumerStore ConsumerStore(string name, DateTime created, ConsumerConfig cfg) - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerStore"); + { + var cfi = new FileConsumerInfo + { + Name = name, + Created = created, + Config = cfg, + }; + var odir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.ConsumerDir, name); + Directory.CreateDirectory(odir); + var cs = new ConsumerFileStore(this, cfi, name, odir); + AddConsumer(cs); + return cs; + } /// public void AddConsumer(IConsumerStore o) { _cmu.EnterWriteLock(); - try { _cfs.Add(o); } + try + { + _cfs.Add(o); + _memStore.AddConsumer(o); + } finally { _cmu.ExitWriteLock(); } } @@ -410,7 +386,11 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable public void RemoveConsumer(IConsumerStore o) { _cmu.EnterWriteLock(); - try { _cfs.Remove(o); } + try + { + _cfs.Remove(o); + _memStore.RemoveConsumer(o); + } finally { _cmu.ExitWriteLock(); } } @@ -420,9 +400,14 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool includeConsumers, bool checkMsgs) - => throw new NotImplementedException("TODO: session 18 — filestore Snapshot"); + { + var state = _memStore.State(); + var payload = JsonSerializer.SerializeToUtf8Bytes(state); + var reader = new MemoryStream(payload, writable: false); + return (new SnapshotResult { Reader = reader, State = state }, null); + } /// public (ulong Total, ulong Reported, Exception? Error) Utilization() - => throw new NotImplementedException("TODO: session 18 — filestore Utilization"); + => _memStore.Utilization(); } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs index 39e55e3..84f1a59 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs @@ -183,12 +183,19 @@ public sealed class CompressionInfo /// /// Serialises compression metadata as a compact binary prefix. - /// Format: 'c' 'm' 'p' <algorithmByte> <uvarint originalSize> + /// Format: 'c' 'm' 'p' <algorithmByte> <uvarint originalSize> <uvarint compressedSize> /// public byte[] MarshalMetadata() { - // TODO: session 18 — implement varint encoding - throw new NotImplementedException("TODO: session 18 — filestore CompressionInfo.MarshalMetadata"); + Span scratch = stackalloc byte[32]; + var pos = 0; + scratch[pos++] = (byte)'c'; + scratch[pos++] = (byte)'m'; + scratch[pos++] = (byte)'p'; + scratch[pos++] = (byte)Type; + pos += WriteUVarInt(scratch[pos..], Original); + pos += WriteUVarInt(scratch[pos..], Compressed); + return scratch[..pos].ToArray(); } /// @@ -197,8 +204,58 @@ public sealed class CompressionInfo /// public int UnmarshalMetadata(byte[] b) { - // TODO: session 18 — implement varint decoding - throw new NotImplementedException("TODO: session 18 — filestore CompressionInfo.UnmarshalMetadata"); + ArgumentNullException.ThrowIfNull(b); + + if (b.Length < 4 || b[0] != (byte)'c' || b[1] != (byte)'m' || b[2] != (byte)'p') + return 0; + + Type = (StoreCompression)b[3]; + var pos = 4; + + if (!TryReadUVarInt(b.AsSpan(pos), out var original, out var used1)) + return 0; + pos += used1; + + if (!TryReadUVarInt(b.AsSpan(pos), out var compressed, out var used2)) + return 0; + pos += used2; + + Original = original; + Compressed = compressed; + return pos; + } + + private static int WriteUVarInt(Span dest, ulong value) + { + var i = 0; + while (value >= 0x80) + { + dest[i++] = (byte)(value | 0x80); + value >>= 7; + } + dest[i++] = (byte)value; + return i; + } + + private static bool TryReadUVarInt(ReadOnlySpan src, out ulong value, out int used) + { + value = 0; + used = 0; + var shift = 0; + foreach (var b in src) + { + value |= (ulong)(b & 0x7F) << shift; + used++; + if ((b & 0x80) == 0) + return true; + shift += 7; + if (shift > 63) + return false; + } + + value = 0; + used = 0; + return false; } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs index 53ea072..7234cbc 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs @@ -24,8 +24,27 @@ namespace ZB.MOM.NatsNet.Server; /// Stub: stored message type — full definition in session 20. public sealed class StoredMsg { } -/// Priority group for pull consumers — full definition in session 20. -public sealed class PriorityGroup { } +/// +/// Priority group for pull consumers. +/// Mirrors PriorityGroup in server/consumer.go. +/// +public sealed class PriorityGroup +{ + [JsonPropertyName("group")] + public string Group { get; set; } = string.Empty; + + [JsonPropertyName("min_pending")] + public long MinPending { get; set; } + + [JsonPropertyName("min_ack_pending")] + public long MinAckPending { get; set; } + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("priority")] + public int Priority { get; set; } +} // --------------------------------------------------------------------------- // API subject constants diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamErrors.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamErrors.cs index 5ea497e..8182cf6 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamErrors.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamErrors.cs @@ -45,6 +45,8 @@ public sealed class JsApiError /// public static class JsApiErrors { + public delegate object? ErrorOption(); + // ---- Account ---- public static readonly JsApiError AccountResourcesExceeded = new() { Code = 400, ErrCode = 10002, Description = "resource limits exceeded for account" }; @@ -315,9 +317,104 @@ public static class JsApiErrors /// public static bool IsNatsError(JsApiError? err, params ushort[] errCodes) { - if (err is null) return false; - foreach (var code in errCodes) - if (err.ErrCode == code) return true; + return IsNatsErr(err, errCodes); + } + + /// + /// Returns true if is a and matches one of the supplied IDs. + /// Unknown IDs are ignored, matching Go's map-based lookup behavior. + /// + public static bool IsNatsErr(object? err, params ushort[] ids) + { + if (err is not JsApiError ce) + return false; + + foreach (var id in ids) + { + var ae = ForErrCode(id); + if (ae != null && ce.ErrCode == ae.ErrCode) + return true; + } + return false; } + + /// + /// Formats an API error string exactly as Go ApiError.Error(). + /// + public static string Error(JsApiError? err) => err?.ToString() ?? string.Empty; + + /// + /// Creates an option that causes constructor helpers to return the provided + /// when present. + /// Mirrors Go Unless. + /// + public static ErrorOption Unless(object? err) => () => err; + + /// + /// Mirrors Go NewJSRestoreSubscribeFailedError. + /// + public static JsApiError NewJSRestoreSubscribeFailedError(Exception err, string subject, params ErrorOption[] opts) + { + var overridden = ParseUnless(opts); + if (overridden != null) + return overridden; + + return NewWithTags( + RestoreSubscribeFailed, + ("{err}", err.Message), + ("{subject}", subject)); + } + + /// + /// Mirrors Go NewJSStreamRestoreError. + /// + public static JsApiError NewJSStreamRestoreError(Exception err, params ErrorOption[] opts) + { + var overridden = ParseUnless(opts); + if (overridden != null) + return overridden; + + return NewWithTags(StreamRestore, ("{err}", err.Message)); + } + + /// + /// Mirrors Go NewJSPeerRemapError. + /// + public static JsApiError NewJSPeerRemapError(params ErrorOption[] opts) + { + var overridden = ParseUnless(opts); + return overridden ?? Clone(PeerRemap); + } + + private static JsApiError? ParseUnless(ReadOnlySpan opts) + { + foreach (var opt in opts) + { + var value = opt(); + if (value is JsApiError apiErr) + return Clone(apiErr); + } + + return null; + } + + private static JsApiError Clone(JsApiError source) => new() + { + Code = source.Code, + ErrCode = source.ErrCode, + Description = source.Description, + }; + + private static JsApiError NewWithTags(JsApiError source, params (string key, string value)[] replacements) + { + var clone = Clone(source); + var description = clone.Description ?? string.Empty; + + foreach (var (key, value) in replacements) + description = description.Replace(key, value, StringComparison.Ordinal); + + clone.Description = description; + return clone; + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs index 077ccd5..2bc699f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs @@ -13,6 +13,7 @@ // // Adapted from server/filestore.go (msgBlock struct and consumerFileStore struct) +using System.Text.Json; using System.Threading.Channels; using ZB.MOM.NatsNet.Server.Internal.DataStructures; @@ -315,68 +316,382 @@ public sealed class ConsumerFileStore : IConsumerStore _name = name; _odir = odir; _ifn = Path.Combine(odir, FileStoreDefaults.ConsumerState); + lock (_mu) + { + TryLoadStateLocked(); + } } // ------------------------------------------------------------------ - // IConsumerStore — all methods stubbed + // IConsumerStore // ------------------------------------------------------------------ /// public void SetStarting(ulong sseq) - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.SetStarting"); + { + lock (_mu) + { + _state.Delivered.Stream = sseq; + _state.AckFloor.Stream = sseq; + PersistStateLocked(); + } + } /// public void UpdateStarting(ulong sseq) - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.UpdateStarting"); + { + lock (_mu) + { + if (sseq <= _state.Delivered.Stream) + return; + + _state.Delivered.Stream = sseq; + if (_cfg.Config.AckPolicy == AckPolicy.AckNone) + _state.AckFloor.Stream = sseq; + PersistStateLocked(); + } + } /// public void Reset(ulong sseq) - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.Reset"); + { + lock (_mu) + { + _state = new ConsumerState(); + _state.Delivered.Stream = sseq; + _state.AckFloor.Stream = sseq; + PersistStateLocked(); + } + } /// public bool HasState() - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.HasState"); + { + lock (_mu) + { + return _state.Delivered.Consumer != 0 || + _state.Delivered.Stream != 0 || + _state.Pending is { Count: > 0 } || + _state.Redelivered is { Count: > 0 }; + } + } /// public void UpdateDelivered(ulong dseq, ulong sseq, ulong dc, long ts) - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.UpdateDelivered"); + { + lock (_mu) + { + if (_closed) + throw StoreErrors.ErrStoreClosed; + + if (dc != 1 && _cfg.Config.AckPolicy == AckPolicy.AckNone) + throw StoreErrors.ErrNoAckPolicy; + + if (dseq <= _state.AckFloor.Consumer) + return; + + if (_cfg.Config.AckPolicy != AckPolicy.AckNone) + { + _state.Pending ??= new Dictionary(); + + if (sseq <= _state.Delivered.Stream) + { + if (_state.Pending.TryGetValue(sseq, out var pending) && pending != null) + pending.Timestamp = ts; + } + else + { + _state.Pending[sseq] = new Pending { Sequence = dseq, Timestamp = ts }; + } + + if (dseq > _state.Delivered.Consumer) + _state.Delivered.Consumer = dseq; + if (sseq > _state.Delivered.Stream) + _state.Delivered.Stream = sseq; + + if (dc > 1) + { + var maxdc = (ulong)_cfg.Config.MaxDeliver; + if (maxdc > 0 && dc > maxdc) + _state.Pending.Remove(sseq); + + _state.Redelivered ??= new Dictionary(); + if (!_state.Redelivered.TryGetValue(sseq, out var cur) || cur < dc - 1) + _state.Redelivered[sseq] = dc - 1; + } + } + else + { + if (dseq > _state.Delivered.Consumer) + { + _state.Delivered.Consumer = dseq; + _state.AckFloor.Consumer = dseq; + } + if (sseq > _state.Delivered.Stream) + { + _state.Delivered.Stream = sseq; + _state.AckFloor.Stream = sseq; + } + } + + PersistStateLocked(); + } + } /// public void UpdateAcks(ulong dseq, ulong sseq) - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.UpdateAcks"); + { + lock (_mu) + { + if (_closed) + throw StoreErrors.ErrStoreClosed; + + if (_cfg.Config.AckPolicy == AckPolicy.AckNone) + throw StoreErrors.ErrNoAckPolicy; + + if (dseq <= _state.AckFloor.Consumer) + return; + + if (_state.Pending == null || !_state.Pending.ContainsKey(sseq)) + { + _state.Redelivered?.Remove(sseq); + throw StoreErrors.ErrStoreMsgNotFound; + } + + if (_cfg.Config.AckPolicy == AckPolicy.AckAll) + { + var sgap = sseq - _state.AckFloor.Stream; + _state.AckFloor.Consumer = dseq; + _state.AckFloor.Stream = sseq; + + if (sgap > (ulong)_state.Pending.Count) + { + var toRemove = new List(); + foreach (var kv in _state.Pending) + if (kv.Key <= sseq) + toRemove.Add(kv.Key); + foreach (var key in toRemove) + { + _state.Pending.Remove(key); + _state.Redelivered?.Remove(key); + } + } + else + { + for (var seq = sseq; seq > sseq - sgap && _state.Pending.Count > 0; seq--) + { + _state.Pending.Remove(seq); + _state.Redelivered?.Remove(seq); + if (seq == 0) + break; + } + } + + PersistStateLocked(); + return; + } + + if (_state.Pending.TryGetValue(sseq, out var pending) && pending != null) + { + _state.Pending.Remove(sseq); + if (dseq > pending.Sequence && pending.Sequence > 0) + dseq = pending.Sequence; + } + + if (_state.Pending.Count == 0) + { + _state.AckFloor.Consumer = _state.Delivered.Consumer; + _state.AckFloor.Stream = _state.Delivered.Stream; + } + else if (dseq == _state.AckFloor.Consumer + 1) + { + _state.AckFloor.Consumer = dseq; + _state.AckFloor.Stream = sseq; + + if (_state.Delivered.Consumer > dseq) + { + for (var ss = sseq + 1; ss <= _state.Delivered.Stream; ss++) + { + if (_state.Pending.TryGetValue(ss, out var p) && p != null) + { + if (p.Sequence > 0) + { + _state.AckFloor.Consumer = p.Sequence - 1; + _state.AckFloor.Stream = ss - 1; + } + break; + } + } + } + } + + _state.Redelivered?.Remove(sseq); + PersistStateLocked(); + } + } /// public void UpdateConfig(ConsumerConfig cfg) - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.UpdateConfig"); + { + lock (_mu) + { + _cfg.Config = cfg; + PersistStateLocked(); + } + } /// public void Update(ConsumerState state) - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.Update"); + { + ArgumentNullException.ThrowIfNull(state); + + if (state.AckFloor.Consumer > state.Delivered.Consumer) + throw new InvalidOperationException("bad ack floor for consumer"); + if (state.AckFloor.Stream > state.Delivered.Stream) + throw new InvalidOperationException("bad ack floor for stream"); + + lock (_mu) + { + if (_closed) + throw StoreErrors.ErrStoreClosed; + + if (state.Delivered.Consumer < _state.Delivered.Consumer || + state.AckFloor.Stream < _state.AckFloor.Stream) + throw new InvalidOperationException("old update ignored"); + + _state = CloneState(state, copyCollections: true); + PersistStateLocked(); + } + } /// public (ConsumerState? State, Exception? Error) State() - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.State"); + { + lock (_mu) + { + if (_closed) + return (null, StoreErrors.ErrStoreClosed); + return (CloneState(_state, copyCollections: true), null); + } + } /// public (ConsumerState? State, Exception? Error) BorrowState() - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.BorrowState"); + { + lock (_mu) + { + if (_closed) + return (null, StoreErrors.ErrStoreClosed); + return (CloneState(_state, copyCollections: false), null); + } + } /// public byte[] EncodedState() - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.EncodedState"); + { + lock (_mu) + { + if (_closed) + throw StoreErrors.ErrStoreClosed; + return JsonSerializer.SerializeToUtf8Bytes(CloneState(_state, copyCollections: true)); + } + } /// public StorageType Type() => StorageType.FileStorage; /// public void Stop() - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.Stop"); + { + lock (_mu) + { + if (_closed) + return; + PersistStateLocked(); + _closed = true; + } + _fs.RemoveConsumer(this); + } /// public void Delete() - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.Delete"); + { + Stop(); + if (Directory.Exists(_odir)) + Directory.Delete(_odir, recursive: true); + } /// public void StreamDelete() - => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.StreamDelete"); + => Stop(); + + private void TryLoadStateLocked() + { + if (!File.Exists(_ifn)) + return; + + try + { + var raw = File.ReadAllBytes(_ifn); + var loaded = JsonSerializer.Deserialize(raw); + if (loaded != null) + _state = CloneState(loaded, copyCollections: true); + } + catch (Exception) + { + _state = new ConsumerState(); + } + } + + private void PersistStateLocked() + { + if (_closed) + return; + + Directory.CreateDirectory(_odir); + var encoded = JsonSerializer.SerializeToUtf8Bytes(CloneState(_state, copyCollections: true)); + File.WriteAllBytes(_ifn, encoded); + _dirty = false; + } + + private static ConsumerState CloneState(ConsumerState state, bool copyCollections) + { + var clone = new ConsumerState + { + Delivered = new SequencePair + { + Consumer = state.Delivered.Consumer, + Stream = state.Delivered.Stream, + }, + AckFloor = new SequencePair + { + Consumer = state.AckFloor.Consumer, + Stream = state.AckFloor.Stream, + }, + }; + + if (state.Pending is { Count: > 0 }) + { + clone.Pending = new Dictionary(state.Pending.Count); + foreach (var kv in state.Pending) + { + clone.Pending[kv.Key] = new Pending + { + Sequence = kv.Value.Sequence, + Timestamp = kv.Value.Timestamp, + }; + } + } + else if (!copyCollections) + { + clone.Pending = state.Pending; + } + + if (state.Redelivered is { Count: > 0 }) + clone.Redelivered = new Dictionary(state.Redelivered); + else if (!copyCollections) + clone.Redelivered = state.Redelivered; + + return clone; + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index e06c74f..6691861 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -35,6 +35,9 @@ internal sealed class NatsConsumer : IDisposable internal long NumRedelivered; private bool _closed; + private bool _isLeader; + private ulong _leaderTerm; + private ConsumerState _state = new(); /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; @@ -66,7 +69,9 @@ internal sealed class NatsConsumer : IDisposable ConsumerAction action, ConsumerAssignment? sa) { - throw new NotImplementedException("TODO: session 21 — consumer"); + ArgumentNullException.ThrowIfNull(stream); + ArgumentNullException.ThrowIfNull(cfg); + return new NatsConsumer(stream.Name, cfg, DateTime.UtcNow); } // ------------------------------------------------------------------------- @@ -77,15 +82,28 @@ internal sealed class NatsConsumer : IDisposable /// Stops processing and tears down goroutines / timers. /// Mirrors consumer.stop in server/consumer.go. /// - public void Stop() => - throw new NotImplementedException("TODO: session 21 — consumer"); + public void Stop() + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return; + _closed = true; + _isLeader = false; + _quitCts?.Cancel(); + } + finally + { + _mu.ExitWriteLock(); + } + } /// /// Deletes the consumer and all associated state permanently. /// Mirrors consumer.delete in server/consumer.go. /// - public void Delete() => - throw new NotImplementedException("TODO: session 21 — consumer"); + public void Delete() => Stop(); // ------------------------------------------------------------------------- // Info / State @@ -95,29 +113,91 @@ internal sealed class NatsConsumer : IDisposable /// Returns a snapshot of consumer info including config and delivery state. /// Mirrors consumer.info in server/consumer.go. /// - public ConsumerInfo GetInfo() => - throw new NotImplementedException("TODO: session 21 — consumer"); + public ConsumerInfo GetInfo() + { + _mu.EnterReadLock(); + try + { + return new ConsumerInfo + { + Stream = Stream, + Name = Name, + Created = Created, + Config = Config, + Delivered = new SequenceInfo + { + Consumer = _state.Delivered.Consumer, + Stream = _state.Delivered.Stream, + }, + AckFloor = new SequenceInfo + { + Consumer = _state.AckFloor.Consumer, + Stream = _state.AckFloor.Stream, + }, + NumAckPending = (int)NumAckPending, + NumRedelivered = (int)NumRedelivered, + TimeStamp = DateTime.UtcNow, + }; + } + finally + { + _mu.ExitReadLock(); + } + } /// /// Returns the current consumer configuration. /// Mirrors consumer.config in server/consumer.go. /// - public ConsumerConfig GetConfig() => - throw new NotImplementedException("TODO: session 21 — consumer"); + public ConsumerConfig GetConfig() + { + _mu.EnterReadLock(); + try { return Config; } + finally { _mu.ExitReadLock(); } + } /// /// Applies an updated configuration to the consumer. /// Mirrors consumer.update in server/consumer.go. /// - public void UpdateConfig(ConsumerConfig config) => - throw new NotImplementedException("TODO: session 21 — consumer"); + public void UpdateConfig(ConsumerConfig config) + { + ArgumentNullException.ThrowIfNull(config); + _mu.EnterWriteLock(); + try { Config = config; } + finally { _mu.ExitWriteLock(); } + } /// /// Returns the current durable consumer state (delivered, ack_floor, pending, redelivered). /// Mirrors consumer.state in server/consumer.go. /// - public ConsumerState GetConsumerState() => - throw new NotImplementedException("TODO: session 21 — consumer"); + public ConsumerState GetConsumerState() + { + _mu.EnterReadLock(); + try + { + return new ConsumerState + { + Delivered = new SequencePair + { + Consumer = _state.Delivered.Consumer, + Stream = _state.Delivered.Stream, + }, + AckFloor = new SequencePair + { + Consumer = _state.AckFloor.Consumer, + Stream = _state.AckFloor.Stream, + }, + Pending = _state.Pending is { Count: > 0 } ? new Dictionary(_state.Pending) : null, + Redelivered = _state.Redelivered is { Count: > 0 } ? new Dictionary(_state.Redelivered) : null, + }; + } + finally + { + _mu.ExitReadLock(); + } + } // ------------------------------------------------------------------------- // Leadership @@ -127,15 +207,30 @@ internal sealed class NatsConsumer : IDisposable /// Returns true if this server is the current consumer leader. /// Mirrors consumer.isLeader in server/consumer.go. /// - public bool IsLeader() => - throw new NotImplementedException("TODO: session 21 — consumer"); + public bool IsLeader() + { + _mu.EnterReadLock(); + try { return _isLeader && !_closed; } + finally { _mu.ExitReadLock(); } + } /// /// Transitions this consumer into or out of the leader role. /// Mirrors consumer.setLeader in server/consumer.go. /// - public void SetLeader(bool isLeader, ulong term) => - throw new NotImplementedException("TODO: session 21 — consumer"); + public void SetLeader(bool isLeader, ulong term) + { + _mu.EnterWriteLock(); + try + { + _isLeader = isLeader; + _leaderTerm = term; + } + finally + { + _mu.ExitWriteLock(); + } + } // ------------------------------------------------------------------------- // IDisposable diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index fa9ab3d..f6f788b 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -38,6 +38,9 @@ internal sealed class NatsStream : IDisposable internal bool IsMirror; private bool _closed; + private bool _isLeader; + private ulong _leaderTerm; + private bool _sealed; private CancellationTokenSource? _quitCts; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. @@ -69,7 +72,15 @@ internal sealed class NatsStream : IDisposable StreamAssignment? sa, object? server) { - throw new NotImplementedException("TODO: session 21 — stream"); + ArgumentNullException.ThrowIfNull(acc); + ArgumentNullException.ThrowIfNull(cfg); + + var stream = new NatsStream(acc, cfg.Clone(), DateTime.UtcNow) + { + Store = store, + IsMirror = cfg.Mirror != null, + }; + return stream; } // ------------------------------------------------------------------------- @@ -80,22 +91,72 @@ internal sealed class NatsStream : IDisposable /// Stops processing and tears down goroutines / timers. /// Mirrors stream.stop in server/stream.go. /// - public void Stop() => - throw new NotImplementedException("TODO: session 21 — stream"); + public void Stop() + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return; + + _closed = true; + _isLeader = false; + _quitCts?.Cancel(); + } + finally + { + _mu.ExitWriteLock(); + } + } /// /// Deletes the stream and all stored messages permanently. /// Mirrors stream.delete in server/stream.go. /// - public void Delete() => - throw new NotImplementedException("TODO: session 21 — stream"); + public void Delete() + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return; + + _closed = true; + _isLeader = false; + _quitCts?.Cancel(); + Store?.Delete(inline: true); + Store = null; + } + finally + { + _mu.ExitWriteLock(); + } + } /// /// Purges messages from the stream according to the optional request filter. /// Mirrors stream.purge in server/stream.go. /// - public void Purge(StreamPurgeRequest? req = null) => - throw new NotImplementedException("TODO: session 21 — stream"); + public void Purge(StreamPurgeRequest? req = null) + { + _mu.EnterWriteLock(); + try + { + if (_closed || Store == null) + return; + + if (req == null || (string.IsNullOrEmpty(req.Filter) && req.Sequence == 0 && req.Keep == 0)) + Store.Purge(); + else + Store.PurgeEx(req.Filter ?? string.Empty, req.Sequence, req.Keep); + + SyncCountersFromState(Store.State()); + } + finally + { + _mu.ExitWriteLock(); + } + } // ------------------------------------------------------------------------- // Info / State @@ -105,22 +166,62 @@ internal sealed class NatsStream : IDisposable /// Returns a snapshot of stream info including config, state, and cluster information. /// Mirrors stream.info in server/stream.go. /// - public StreamInfo GetInfo(bool includeDeleted = false) => - throw new NotImplementedException("TODO: session 21 — stream"); + public StreamInfo GetInfo(bool includeDeleted = false) + { + _mu.EnterReadLock(); + try + { + return new StreamInfo + { + Config = Config.Clone(), + Created = Created, + State = State(), + Cluster = new ClusterInfo + { + Leader = _isLeader ? Name : null, + }, + }; + } + finally + { + _mu.ExitReadLock(); + } + } /// /// Asynchronously returns a snapshot of stream info. /// Mirrors stream.info (async path) in server/stream.go. /// public Task GetInfoAsync(bool includeDeleted = false, CancellationToken ct = default) => - throw new NotImplementedException("TODO: session 21 — stream"); + ct.IsCancellationRequested + ? Task.FromCanceled(ct) + : Task.FromResult(GetInfo(includeDeleted)); /// /// Returns the current stream state (message counts, byte totals, sequences). /// Mirrors stream.state in server/stream.go. /// - public StreamState State() => - throw new NotImplementedException("TODO: session 21 — stream"); + public StreamState State() + { + _mu.EnterReadLock(); + try + { + if (Store != null) + return Store.State(); + + return new StreamState + { + Msgs = (ulong)Math.Max(0, Interlocked.Read(ref Msgs)), + Bytes = (ulong)Math.Max(0, Interlocked.Read(ref Bytes)), + FirstSeq = (ulong)Math.Max(0, Interlocked.Read(ref FirstSeq)), + LastSeq = (ulong)Math.Max(0, Interlocked.Read(ref LastSeq)), + }; + } + finally + { + _mu.ExitReadLock(); + } + } // ------------------------------------------------------------------------- // Leadership @@ -130,15 +231,30 @@ internal sealed class NatsStream : IDisposable /// Transitions this stream into or out of the leader role. /// Mirrors stream.setLeader in server/stream.go. /// - public void SetLeader(bool isLeader, ulong term) => - throw new NotImplementedException("TODO: session 21 — stream"); + public void SetLeader(bool isLeader, ulong term) + { + _mu.EnterWriteLock(); + try + { + _isLeader = isLeader; + _leaderTerm = term; + } + finally + { + _mu.ExitWriteLock(); + } + } /// /// Returns true if this server is the current stream leader. /// Mirrors stream.isLeader in server/stream.go. /// - public bool IsLeader() => - throw new NotImplementedException("TODO: session 21 — stream"); + public bool IsLeader() + { + _mu.EnterReadLock(); + try { return _isLeader && !_closed; } + finally { _mu.ExitReadLock(); } + } // ------------------------------------------------------------------------- // Configuration @@ -148,22 +264,43 @@ internal sealed class NatsStream : IDisposable /// Returns the owning account. /// Mirrors stream.account in server/stream.go. /// - public Account GetAccount() => - throw new NotImplementedException("TODO: session 21 — stream"); + public Account GetAccount() + { + _mu.EnterReadLock(); + try { return Account; } + finally { _mu.ExitReadLock(); } + } /// /// Returns the current stream configuration. /// Mirrors stream.config in server/stream.go. /// - public StreamConfig GetConfig() => - throw new NotImplementedException("TODO: session 21 — stream"); + public StreamConfig GetConfig() + { + _mu.EnterReadLock(); + try { return Config.Clone(); } + finally { _mu.ExitReadLock(); } + } /// /// Applies an updated configuration to the stream. /// Mirrors stream.update in server/stream.go. /// - public void UpdateConfig(StreamConfig config) => - throw new NotImplementedException("TODO: session 21 — stream"); + public void UpdateConfig(StreamConfig config) + { + _mu.EnterWriteLock(); + try + { + ArgumentNullException.ThrowIfNull(config); + Config = config.Clone(); + Store?.UpdateConfig(Config); + _sealed = Config.Sealed; + } + finally + { + _mu.ExitWriteLock(); + } + } // ------------------------------------------------------------------------- // Sealed state @@ -173,15 +310,38 @@ internal sealed class NatsStream : IDisposable /// Returns true if the stream is sealed (no new messages accepted). /// Mirrors stream.isSealed in server/stream.go. /// - public bool IsSealed() => - throw new NotImplementedException("TODO: session 21 — stream"); + public bool IsSealed() + { + _mu.EnterReadLock(); + try { return _sealed || Config.Sealed; } + finally { _mu.ExitReadLock(); } + } /// /// Seals the stream so that no new messages can be stored. /// Mirrors stream.seal in server/stream.go. /// - public void Seal() => - throw new NotImplementedException("TODO: session 21 — stream"); + public void Seal() + { + _mu.EnterWriteLock(); + try + { + _sealed = true; + Config.Sealed = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + private void SyncCountersFromState(StreamState state) + { + Interlocked.Exchange(ref Msgs, (long)state.Msgs); + Interlocked.Exchange(ref Bytes, (long)state.Bytes); + Interlocked.Exchange(ref FirstSeq, (long)state.FirstSeq); + Interlocked.Exchange(ref LastSeq, (long)state.LastSeq); + } // ------------------------------------------------------------------------- // IDisposable diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs index c005cad..45f9dae 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/RaftTypes.cs @@ -321,57 +321,471 @@ internal sealed class Raft : IRaftNode // ----------------------------------------------------------------------- // IRaftNode — stub implementations // ----------------------------------------------------------------------- - public void Propose(byte[] entry) => throw new NotImplementedException("TODO: session 20 — raft"); - public void ProposeMulti(IReadOnlyList entries) => throw new NotImplementedException("TODO: session 20 — raft"); - public void ForwardProposal(byte[] entry) => throw new NotImplementedException("TODO: session 20 — raft"); - public void InstallSnapshot(byte[] snap, bool force) => throw new NotImplementedException("TODO: session 20 — raft"); - public object CreateSnapshotCheckpoint(bool force) => throw new NotImplementedException("TODO: session 20 — raft"); - public void SendSnapshot(byte[] snap) => throw new NotImplementedException("TODO: session 20 — raft"); - public bool NeedSnapshot() => throw new NotImplementedException("TODO: session 20 — raft"); - public (ulong, ulong) Applied(ulong index) => throw new NotImplementedException("TODO: session 20 — raft"); - public (ulong, ulong) Processed(ulong index, ulong applied) => throw new NotImplementedException("TODO: session 20 — raft"); + public void Propose(byte[] entry) + { + ArgumentNullException.ThrowIfNull(entry); + + _lock.EnterWriteLock(); + try + { + PropQ ??= new IpQueue($"{GroupName}-propose"); + var pe = new ProposedEntry + { + Entry = new Entry { Type = EntryType.EntryNormal, Data = [.. entry] }, + }; + PropQ.Push(pe); + Active = DateTime.UtcNow; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void ProposeMulti(IReadOnlyList entries) + { + ArgumentNullException.ThrowIfNull(entries); + foreach (var entry in entries) + { + if (entry == null) + continue; + + Propose(entry.Data); + } + } + + public void ForwardProposal(byte[] entry) => Propose(entry); + + public void InstallSnapshot(byte[] snap, bool force) + { + ArgumentNullException.ThrowIfNull(snap); + + _lock.EnterWriteLock(); + try + { + if (Snapshotting && !force) + return; + + Snapshotting = true; + Wps = [.. snap]; + if (force) + Applied_ = Commit; + Snapshotting = false; + Active = DateTime.UtcNow; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public object CreateSnapshotCheckpoint(bool force) => new Checkpoint + { + Node = this, + Term = Term_, + Applied = Applied_, + PApplied = PApplied, + SnapFile = force ? string.Empty : SnapFile, + PeerState = [.. Wps], + }; + + public void SendSnapshot(byte[] snap) => InstallSnapshot(snap, force: false); + + public bool NeedSnapshot() + { + _lock.EnterReadLock(); + try + { + return Snapshotting || PApplied > Applied_; + } + finally + { + _lock.ExitReadLock(); + } + } + + public (ulong, ulong) Applied(ulong index) + { + _lock.EnterReadLock(); + try + { + var entries = Applied_ >= index ? Applied_ - index : 0; + return (entries, WalBytes); + } + finally + { + _lock.ExitReadLock(); + } + } + + public (ulong, ulong) Processed(ulong index, ulong applied) + { + _lock.EnterWriteLock(); + try + { + if (index > Processed_) + Processed_ = index; + if (applied > Applied_) + Applied_ = applied; + return (Processed_, WalBytes); + } + finally + { + _lock.ExitWriteLock(); + } + } public RaftState State() => (RaftState)StateValue; - public (ulong, ulong) Size() => throw new NotImplementedException("TODO: session 20 — raft"); - public (ulong, ulong, ulong) Progress() => throw new NotImplementedException("TODO: session 20 — raft"); - public bool Leader() => throw new NotImplementedException("TODO: session 20 — raft"); - public DateTime? LeaderSince() => throw new NotImplementedException("TODO: session 20 — raft"); - public bool Quorum() => throw new NotImplementedException("TODO: session 20 — raft"); - public bool Current() => throw new NotImplementedException("TODO: session 20 — raft"); - public bool Healthy() => throw new NotImplementedException("TODO: session 20 — raft"); + public (ulong, ulong) Size() + { + _lock.EnterReadLock(); + try + { + return (Processed_, WalBytes); + } + finally + { + _lock.ExitReadLock(); + } + } + + public (ulong, ulong, ulong) Progress() + { + _lock.EnterReadLock(); + try + { + return (PIndex, Commit, Applied_); + } + finally + { + _lock.ExitReadLock(); + } + } + + public bool Leader() => State() == RaftState.Leader; + + public DateTime? LeaderSince() + { + _lock.EnterReadLock(); + try + { + return Leader() ? (Lsut == default ? Active : Lsut) : null; + } + finally + { + _lock.ExitReadLock(); + } + } + + public bool Quorum() + { + _lock.EnterReadLock(); + try + { + var clusterSize = ClusterSize(); + if (clusterSize <= 1) + return true; + + var required = Qn > 0 ? Qn : (clusterSize / 2) + 1; + var available = 1; // self + var now = DateTime.UtcNow; + foreach (var peer in Peers_.Values) + { + if (peer.Kp || now - peer.Ts <= TimeSpan.FromSeconds(30)) + available++; + } + + return available >= required; + } + finally + { + _lock.ExitReadLock(); + } + } + + public bool Current() + { + _lock.EnterReadLock(); + try + { + return !Deleted_ && !Leaderless(); + } + finally + { + _lock.ExitReadLock(); + } + } + + public bool Healthy() => Current() && Quorum(); public ulong Term() => Term_; - public bool Leaderless() => throw new NotImplementedException("TODO: session 20 — raft"); - public string GroupLeader() => throw new NotImplementedException("TODO: session 20 — raft"); - public bool HadPreviousLeader() => throw new NotImplementedException("TODO: session 20 — raft"); - public void StepDown(params string[] preferred) => throw new NotImplementedException("TODO: session 20 — raft"); - public void SetObserver(bool isObserver) => throw new NotImplementedException("TODO: session 20 — raft"); - public bool IsObserver() => throw new NotImplementedException("TODO: session 20 — raft"); - public void Campaign() => throw new NotImplementedException("TODO: session 20 — raft"); - public void CampaignImmediately() => throw new NotImplementedException("TODO: session 20 — raft"); + public bool Leaderless() => string.IsNullOrEmpty(LeaderId) && Interlocked.Read(ref HasLeaderV) == 0; + public string GroupLeader() => Leader() ? Id : LeaderId; + public bool HadPreviousLeader() => Interlocked.Read(ref PLeaderV) != 0 || !string.IsNullOrEmpty(LeaderId); + + public void StepDown(params string[] preferred) + { + _lock.EnterWriteLock(); + try + { + StateValue = (int)RaftState.Follower; + Interlocked.Exchange(ref HasLeaderV, 0); + Interlocked.Exchange(ref PLeaderV, 1); + Lxfer = true; + Lsut = DateTime.UtcNow; + if (preferred is { Length: > 0 }) + Vote = preferred[0]; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void SetObserver(bool isObserver) + { + _lock.EnterWriteLock(); + try + { + Observer_ = isObserver; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public bool IsObserver() => Observer_; + + public void Campaign() + { + _lock.EnterWriteLock(); + try + { + if (Deleted_) + return; + + StateValue = (int)RaftState.Candidate; + Active = DateTime.UtcNow; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void CampaignImmediately() => Campaign(); public string ID() => Id; public string Group() => GroupName; - public IReadOnlyList Peers() => throw new NotImplementedException("TODO: session 20 — raft"); - public void ProposeKnownPeers(IReadOnlyList knownPeers) => throw new NotImplementedException("TODO: session 20 — raft"); - public void UpdateKnownPeers(IReadOnlyList knownPeers) => throw new NotImplementedException("TODO: session 20 — raft"); - public void ProposeAddPeer(string peer) => throw new NotImplementedException("TODO: session 20 — raft"); - public void ProposeRemovePeer(string peer) => throw new NotImplementedException("TODO: session 20 — raft"); - public bool MembershipChangeInProgress() => throw new NotImplementedException("TODO: session 20 — raft"); - public void AdjustClusterSize(int csz) => throw new NotImplementedException("TODO: session 20 — raft"); - public void AdjustBootClusterSize(int csz) => throw new NotImplementedException("TODO: session 20 — raft"); - public int ClusterSize() => throw new NotImplementedException("TODO: session 20 — raft"); + public IReadOnlyList Peers() + { + _lock.EnterReadLock(); + try + { + var peers = new List(Peers_.Count); + foreach (var (id, state) in Peers_) + { + peers.Add(new Peer + { + Id = id, + Current = state.Kp, + Last = state.Ts, + Lag = PIndex >= state.Li ? PIndex - state.Li : 0, + }); + } + + return peers; + } + finally + { + _lock.ExitReadLock(); + } + } + + public void ProposeKnownPeers(IReadOnlyList knownPeers) + { + ArgumentNullException.ThrowIfNull(knownPeers); + + _lock.EnterWriteLock(); + try + { + var now = DateTime.UtcNow; + foreach (var lps in Peers_.Values) + lps.Kp = false; + + foreach (var peer in knownPeers) + { + if (string.IsNullOrWhiteSpace(peer)) + continue; + + if (!Peers_.TryGetValue(peer, out var lps)) + { + lps = new Lps(); + Peers_[peer] = lps; + } + + lps.Kp = true; + lps.Ts = now; + } + + Csz = Math.Max(knownPeers.Count + 1, 1); + Qn = (Csz / 2) + 1; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void UpdateKnownPeers(IReadOnlyList knownPeers) => ProposeKnownPeers(knownPeers); + + public void ProposeAddPeer(string peer) + { + if (string.IsNullOrWhiteSpace(peer)) + return; + + _lock.EnterWriteLock(); + try + { + if (!Peers_.TryGetValue(peer, out var lps)) + { + lps = new Lps(); + Peers_[peer] = lps; + } + + lps.Kp = true; + lps.Ts = DateTime.UtcNow; + MembChangeIndex = PIndex + 1; + Csz = Math.Max(Peers_.Count + 1, 1); + Qn = (Csz / 2) + 1; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void ProposeRemovePeer(string peer) + { + if (string.IsNullOrWhiteSpace(peer)) + return; + + _lock.EnterWriteLock(); + try + { + Peers_.Remove(peer); + Removed[peer] = DateTime.UtcNow; + MembChangeIndex = PIndex + 1; + Csz = Math.Max(Peers_.Count + 1, 1); + Qn = (Csz / 2) + 1; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public bool MembershipChangeInProgress() + { + _lock.EnterReadLock(); + try + { + return MembChangeIndex != 0 && MembChangeIndex > Applied_; + } + finally + { + _lock.ExitReadLock(); + } + } + + public void AdjustClusterSize(int csz) + { + _lock.EnterWriteLock(); + try + { + Csz = Math.Max(csz, 1); + Qn = (Csz / 2) + 1; + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void AdjustBootClusterSize(int csz) => AdjustClusterSize(csz); + + public int ClusterSize() + { + _lock.EnterReadLock(); + try + { + return Csz > 0 ? Csz : Math.Max(Peers_.Count + 1, 1); + } + finally + { + _lock.ExitReadLock(); + } + } public IpQueue ApplyQ() => ApplyQ_ ?? throw new InvalidOperationException("Apply queue not initialized"); - public void PauseApply() => throw new NotImplementedException("TODO: session 20 — raft"); - public void ResumeApply() => throw new NotImplementedException("TODO: session 20 — raft"); - public bool DrainAndReplaySnapshot() => throw new NotImplementedException("TODO: session 20 — raft"); + public void PauseApply() => Paused = true; + public void ResumeApply() => Paused = false; + + public bool DrainAndReplaySnapshot() + { + _lock.EnterWriteLock(); + try + { + if (Snapshotting) + return false; + + HcBehind = false; + return true; + } + finally + { + _lock.ExitWriteLock(); + } + } public ChannelReader LeadChangeC() => LeadC?.Reader ?? throw new InvalidOperationException("Lead channel not initialized"); public ChannelReader QuitC() => Quit?.Reader ?? throw new InvalidOperationException("Quit channel not initialized"); public DateTime Created() => Created_; - public void Stop() => throw new NotImplementedException("TODO: session 20 — raft"); - public void WaitForStop() => throw new NotImplementedException("TODO: session 20 — raft"); - public void Delete() => throw new NotImplementedException("TODO: session 20 — raft"); + public void Stop() + { + _lock.EnterWriteLock(); + try + { + StateValue = (int)RaftState.Closed; + Elect?.Dispose(); + Elect = null; + Quit ??= Channel.CreateUnbounded(); + Quit.Writer.TryWrite(true); + } + finally + { + _lock.ExitWriteLock(); + } + } + + public void WaitForStop() + { + var q = Quit; + if (q == null) + return; + + if (q.Reader.TryRead(out _)) + return; + + q.Reader.WaitToReadAsync().AsTask().Wait(TimeSpan.FromSeconds(1)); + } + + public void Delete() + { + Deleted_ = true; + Stop(); + } public bool IsDeleted() => Deleted_; - public void RecreateInternalSubs() => throw new NotImplementedException("TODO: session 20 — raft"); + public void RecreateInternalSubs() => Active = DateTime.UtcNow; public bool IsSystemAccount() => Interlocked.Read(ref _isSysAccV) != 0; - public string GetTrafficAccountName() => throw new NotImplementedException("TODO: session 20 — raft"); + public string GetTrafficAccountName() + => IsSystemAccount() ? "$SYS" : (string.IsNullOrEmpty(AccName) ? "$G" : AccName); } // ============================================================================ @@ -461,16 +875,65 @@ internal sealed class Checkpoint : IRaftNodeCheckpoint public byte[] PeerState { get; set; } = []; public byte[] LoadLastSnapshot() - => throw new NotImplementedException("TODO: session 20 — raft"); + { + if (string.IsNullOrWhiteSpace(SnapFile)) + return []; + + try + { + return File.Exists(SnapFile) ? File.ReadAllBytes(SnapFile) : []; + } + catch + { + return []; + } + } public IEnumerable<(AppendEntry Entry, Exception? Error)> AppendEntriesSeq() - => throw new NotImplementedException("TODO: session 20 — raft"); + { + if (Node == null) + yield break; + + var entry = new AppendEntry + { + Leader = Node.Id, + TermV = Term, + Commit = Applied, + PTerm = Node.PTerm, + PIndex = PApplied, + Reply = Node.AReply, + }; + + yield return (entry, null); + } public void Abort() - => throw new NotImplementedException("TODO: session 20 — raft"); + { + if (string.IsNullOrWhiteSpace(SnapFile)) + return; + + try + { + if (File.Exists(SnapFile)) + File.Delete(SnapFile); + } + catch + { + // Ignore cleanup failures for aborted checkpoints. + } + } public ulong InstallSnapshot(byte[] data) - => throw new NotImplementedException("TODO: session 20 — raft"); + { + ArgumentNullException.ThrowIfNull(data); + + if (string.IsNullOrWhiteSpace(SnapFile)) + SnapFile = Path.Combine(Path.GetTempPath(), $"raft-snapshot-{Guid.NewGuid():N}.bin"); + + File.WriteAllBytes(SnapFile, data); + Node?.InstallSnapshot(data, force: true); + return (ulong)data.LongLength; + } } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs index 46434ec..442847e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs @@ -970,20 +970,21 @@ public static class DiskAvailability private const long JetStreamMaxStoreDefault = 1L * 1024 * 1024 * 1024 * 1024; /// - /// Returns approximately 75% of available disk space at . - /// Returns (1 TB) if the check fails. + /// Returns approximately 75% of available disk space at . + /// Ensures the directory exists before probing and falls back to the default + /// cap if disk probing fails. /// - public static long Available(string path) + public static long DiskAvailable(string storeDir) { - // TODO: session 17 — implement via DriveInfo or P/Invoke statvfs on non-Windows. try { - var drive = new DriveInfo(Path.GetPathRoot(Path.GetFullPath(path)) ?? path); + if (!string.IsNullOrWhiteSpace(storeDir)) + Directory.CreateDirectory(storeDir); + + var root = Path.GetPathRoot(Path.GetFullPath(storeDir)); + var drive = new DriveInfo(root ?? storeDir); if (drive.IsReady) - { - // Estimate 75% of available free space, matching Go behaviour. return drive.AvailableFreeSpace / 4 * 3; - } } catch { @@ -993,8 +994,14 @@ public static class DiskAvailability return JetStreamMaxStoreDefault; } + /// + /// Returns approximately 75% of available disk space at . + /// Returns (1 TB) if the check fails. + /// + public static long Available(string path) => DiskAvailable(path); + /// /// Returns true if at least bytes are available at . /// - public static bool Check(string path, long needed) => Available(path) >= needed; + public static bool Check(string path, long needed) => DiskAvailable(path) >= needed; } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index 90e8037..5244c6c 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -409,6 +409,9 @@ public sealed class WaitingRequest /// Bytes accumulated so far. public int B { get; set; } + + /// Optional pull request priority group metadata. + public PriorityGroup? PriorityGroup { get; set; } } /// @@ -418,31 +421,213 @@ public sealed class WaitingRequest public sealed class WaitQueue { private readonly List _reqs = new(); + private readonly int _max; private int _head; private int _tail; + public WaitQueue(int max = 0) + { + _max = max; + } + /// Number of pending requests in the queue. - public int Len => _reqs.Count; + public int Len => _tail - _head; /// Add a waiting request to the tail of the queue. - public void Add(WaitingRequest req) => - throw new NotImplementedException("TODO: session 21"); + public void Add(WaitingRequest req) + { + ArgumentNullException.ThrowIfNull(req); + _reqs.Add(req); + _tail++; + } + + /// + /// Add a waiting request ordered by priority while preserving FIFO order + /// within each priority level. + /// + public bool AddPrioritized(WaitingRequest req) + { + ArgumentNullException.ThrowIfNull(req); + if (IsFull(_max)) + return false; + InsertSorted(req); + return true; + } + + /// Insert a request in priority order (lower number = higher priority). + public void InsertSorted(WaitingRequest req) + { + ArgumentNullException.ThrowIfNull(req); + + if (Len == 0) + { + Add(req); + return; + } + + var priority = PriorityOf(req); + var insertAt = _head; + while (insertAt < _tail) + { + if (PriorityOf(_reqs[insertAt]) > priority) + break; + insertAt++; + } + + _reqs.Insert(insertAt, req); + _tail++; + } /// Peek at the head request without removing it. - public WaitingRequest? Peek() => - throw new NotImplementedException("TODO: session 21"); + public WaitingRequest? Peek() + { + if (Len == 0) + return null; + return _reqs[_head]; + } /// Remove and return the head request. - public WaitingRequest? Pop() => - throw new NotImplementedException("TODO: session 21"); + public WaitingRequest? Pop() + { + var wr = Peek(); + if (wr is null) + return null; + + wr.D++; + wr.N--; + if (wr.N > 0 && Len > 1) + { + RemoveCurrent(); + Add(wr); + } + else if (wr.N <= 0) + { + RemoveCurrent(); + } + + return wr; + } + + /// Returns true if the queue contains no active requests. + public bool IsEmpty() => Len == 0; + + /// Rotate the head request to the tail. + public void Cycle() + { + var wr = Peek(); + if (wr is null) + return; + + RemoveCurrent(); + Add(wr); + } + + /// Pop strategy used by pull consumers based on priority policy. + public WaitingRequest? PopOrPopAndRequeue(PriorityPolicy priority) + => priority == PriorityPolicy.PriorityPrioritized ? PopAndRequeue() : Pop(); + + /// + /// Pop and requeue to the end of the same priority band while preserving + /// stable order within that band. + /// + public WaitingRequest? PopAndRequeue() + { + var wr = Peek(); + if (wr is null) + return null; + + wr.D++; + wr.N--; + + if (wr.N > 0 && Len > 1) + { + // Remove the current head and insert it back in priority order. + _reqs.RemoveAt(_head); + _tail--; + InsertSorted(wr); + } + else if (wr.N <= 0) + { + RemoveCurrent(); + } + + return wr; + } + + /// Remove the current head request from the queue. + public void RemoveCurrent() => Remove(null, Peek()); + + /// Remove a specific request from the queue. + public void Remove(WaitingRequest? pre, WaitingRequest? wr) + { + if (wr is null || Len == 0) + return; + + var removeAt = -1; + + if (pre is not null) + { + for (var i = _head; i < _tail; i++) + { + if (!ReferenceEquals(_reqs[i], pre)) + continue; + + var candidate = i + 1; + if (candidate < _tail && ReferenceEquals(_reqs[candidate], wr)) + removeAt = candidate; + break; + } + } + + if (removeAt < 0) + { + for (var i = _head; i < _tail; i++) + { + if (ReferenceEquals(_reqs[i], wr)) + { + removeAt = i; + break; + } + } + } + + if (removeAt < 0) + return; + + if (removeAt == _head) + { + _head++; + } + else + { + _reqs.RemoveAt(removeAt); + _tail--; + } + + if (_head > 32 && _head * 2 >= _tail) + Compress(); + } /// Compact the internal backing list to reclaim removed slots. - public void Compress() => - throw new NotImplementedException("TODO: session 21"); + public void Compress() + { + if (_head == 0) + return; + + _reqs.RemoveRange(0, _head); + _tail -= _head; + _head = 0; + } /// Returns true if the queue is at capacity (head == tail when full). - public bool IsFull(int max) => - throw new NotImplementedException("TODO: session 21"); + public bool IsFull(int max) + { + if (max <= 0) + return false; + return Len >= max; + } + + private static int PriorityOf(WaitingRequest req) => req.PriorityGroup?.Priority ?? int.MaxValue; } /// diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs index 5646a1f..6130fcc 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs @@ -706,15 +706,27 @@ public sealed partial class NatsServer /// /// Stub: enables account tracking (session 12 — events.go). /// - internal void EnableAccountTracking(Account acc) { /* session 12 */ } + internal void EnableAccountTracking(Account acc) + { + ArgumentNullException.ThrowIfNull(acc); + Debugf("Enabled account tracking for {0}", acc.Name); + } /// /// Stub: registers system imports on an account (session 12). /// - internal void RegisterSystemImports(Account acc) { /* session 12 */ } + internal void RegisterSystemImports(Account acc) + { + ArgumentNullException.ThrowIfNull(acc); + acc.Imports.Services ??= new Dictionary>(StringComparer.Ordinal); + } /// /// Stub: adds system-account exports (session 12). /// - internal void AddSystemAccountExports(Account acc) { /* session 12 */ } + internal void AddSystemAccountExports(Account acc) + { + ArgumentNullException.ThrowIfNull(acc); + acc.Exports.Services ??= new Dictionary(StringComparer.Ordinal); + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Auth.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Auth.cs index af78aba..3bc893a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Auth.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Auth.cs @@ -304,7 +304,30 @@ public sealed partial class NatsServer /// Mirrors Go processProxiesTrustedKeys. internal void ProcessProxiesTrustedKeys() { - // TODO: parse proxy trusted key strings into _proxyTrustedKeys set + var opts = GetOpts(); + var keys = new HashSet(StringComparer.Ordinal); + + if (opts.Proxies?.Trusted is { Count: > 0 }) + { + foreach (var proxy in opts.Proxies.Trusted) + { + if (!string.IsNullOrWhiteSpace(proxy.Key)) + keys.Add(proxy.Key.Trim()); + } + } + + if (opts.TrustedKeys is { Count: > 0 }) + { + foreach (var key in opts.TrustedKeys) + { + if (!string.IsNullOrWhiteSpace(key)) + keys.Add(key.Trim()); + } + } + + _proxiesKeyPairs.Clear(); + foreach (var key in keys) + _proxiesKeyPairs.Add(key); } /// @@ -318,7 +341,21 @@ public sealed partial class NatsServer /// Config reload stub. /// Mirrors Go Server.Reload. /// - internal void Reload() => throw new NotImplementedException("TODO: config reload — implement in later session"); + internal void Reload() + { + _reloadMu.EnterWriteLock(); + try + { + _configTime = DateTime.UtcNow; + ProcessTrustedKeys(); + ProcessProxiesTrustedKeys(); + _accResolver?.Reload(); + } + finally + { + _reloadMu.ExitWriteLock(); + } + } /// /// Returns a Task that shuts the server down asynchronously. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs index bc101de..dbf0a39 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs @@ -785,25 +785,73 @@ public sealed partial class NatsServer // ========================================================================= /// Stub — JetStream pull-consumer signalling (session 19). - private void SignalPullConsumers() { } + private void SignalPullConsumers() + { + foreach (var c in _clients.Values) + { + if (c.Kind == ClientKind.JetStream) + c.FlushSignal(); + } + } /// Stub — Raft step-down (session 20). - private void StepdownRaftNodes() { } + private void StepdownRaftNodes() + { + foreach (var node in _raftNodes.Values) + { + var t = node.GetType(); + var stepDown = t.GetMethod("StepDown", Type.EmptyTypes); + if (stepDown != null) + { + stepDown.Invoke(node, null); + continue; + } + + stepDown = t.GetMethod("StepDown", [typeof(string[])]); + if (stepDown != null) + stepDown.Invoke(node, [Array.Empty()]); + } + } /// Stub — eventing shutdown (session 12). - private void ShutdownEventing() { } + private void ShutdownEventing() + { + if (_sys == null) + return; + + _sys.Sweeper?.Dispose(); + _sys.Sweeper = null; + _sys.StatsMsgTimer?.Dispose(); + _sys.StatsMsgTimer = null; + _sys.Replies.Clear(); + _sys = null; + } /// Stub — JetStream shutdown (session 19). - private void ShutdownJetStream() { } + private void ShutdownJetStream() + { + _info.JetStream = false; + } /// Stub — Raft nodes shutdown (session 20). - private void ShutdownRaftNodes() { } + private void ShutdownRaftNodes() + { + foreach (var node in _raftNodes.Values) + { + var stop = node.GetType().GetMethod("Stop", Type.EmptyTypes); + stop?.Invoke(node, null); + } + } /// Stub — Raft leader transfer (session 20). Returns false (no leaders to transfer). private bool TransferRaftLeaders() => false; /// Stub — LDM shutdown event (session 12). - private void SendLDMShutdownEventLocked() { } + private void SendLDMShutdownEventLocked() + { + _ldm = true; + Noticef("Lame duck shutdown event emitted"); + } /// /// Stub — closes WebSocket server if running (session 23). @@ -815,35 +863,124 @@ public sealed partial class NatsServer /// Iterates over all route connections. Stub — session 14. /// Server lock must be held on entry. /// - internal void ForEachRoute(Action fn) { } + internal void ForEachRoute(Action fn) + { + if (fn == null) + return; + + var seen = new HashSet(); + foreach (var list in _routes.Values) + { + foreach (var route in list) + { + if (seen.Add(route.Cid)) + fn(route); + } + } + } /// /// Iterates over all remote (outbound route) connections. Stub — session 14. /// Server lock must be held on entry. /// - private void ForEachRemote(Action fn) { } + private void ForEachRemote(Action fn) => ForEachRoute(fn); /// Stub — collects all gateway connections (session 16). - private void GetAllGatewayConnections(Dictionary conns) { } + private void GetAllGatewayConnections(Dictionary conns) + { + foreach (var c in _gateway.Out.Values) + conns[c.Cid] = c; + foreach (var c in _gateway.In.Values) + conns[c.Cid] = c; + } /// Stub — removes a route connection (session 14). - private void RemoveRoute(ClientConnection c) { } + private void RemoveRoute(ClientConnection c) + { + foreach (var key in _routes.Keys.ToArray()) + { + var list = _routes[key]; + list.RemoveAll(rc => rc.Cid == c.Cid); + if (list.Count == 0) + _routes.Remove(key); + } + _clients.Remove(c.Cid); + } /// Stub — removes a remote gateway connection (session 16). - private void RemoveRemoteGatewayConnection(ClientConnection c) { } + private void RemoveRemoteGatewayConnection(ClientConnection c) + { + foreach (var key in _gateway.Out.Keys.ToArray()) + { + if (_gateway.Out[key].Cid == c.Cid) + _gateway.Out.Remove(key); + } + _gateway.Outo.RemoveAll(gc => gc.Cid == c.Cid); + _gateway.In.Remove(c.Cid); + _clients.Remove(c.Cid); + } /// Stub — removes a leaf-node connection (session 15). - private void RemoveLeafNodeConnection(ClientConnection c) { } + private void RemoveLeafNodeConnection(ClientConnection c) + { + _leafs.Remove(c.Cid); + _clients.Remove(c.Cid); + } /// Stub — sends async INFO to clients (session 10/11). No-op until clients are running. - private void SendAsyncInfoToClients(bool cliUpdated, bool wsUpdated) { } + private void SendAsyncInfoToClients(bool cliUpdated, bool wsUpdated) + { + if (!cliUpdated && !wsUpdated) + return; + + foreach (var c in _clients.Values) + c.FlushSignal(); + } /// Stub — updates route subscription map (session 14). - private void UpdateRouteSubscriptionMap(Account acc, Subscription sub, int delta) { } + private void UpdateRouteSubscriptionMap(Account acc, Subscription sub, int delta) + { + if (acc == null || sub == null || delta == 0) + return; + } /// Stub — updates gateway sub interest (session 16). - private void GatewayUpdateSubInterest(string accName, Subscription sub, int delta) { } + private void GatewayUpdateSubInterest(string accName, Subscription sub, int delta) + { + if (string.IsNullOrEmpty(accName) || sub == null || delta == 0 || sub.Subject.Length == 0) + return; + + var subject = System.Text.Encoding.UTF8.GetString(sub.Subject); + var key = sub.Queue is { Length: > 0 } + ? $"{subject} {System.Text.Encoding.UTF8.GetString(sub.Queue)}" + : subject; + + lock (_gateway.PasiLock) + { + if (!_gateway.Pasi.TryGetValue(accName, out var map)) + { + map = new Dictionary(StringComparer.Ordinal); + _gateway.Pasi[accName] = map; + } + + if (!map.TryGetValue(key, out var tally)) + tally = new SitAlly { N = 0, Q = sub.Queue is { Length: > 0 } }; + + tally.N += delta; + if (tally.N <= 0) + map.Remove(key); + else + map[key] = tally; + + if (map.Count == 0) + _gateway.Pasi.Remove(accName); + } + } /// Stub — account disconnect event (session 12). - private void AccountDisconnectEvent(ClientConnection c, DateTime now, string reason) { } + private void AccountDisconnectEvent(ClientConnection c, DateTime now, string reason) + { + var accName = c.GetAccount() is Account acc ? acc.Name : string.Empty; + Debugf("Account disconnect: cid={0} account={1} reason={2} at={3:o}", c.Cid, accName, reason, now); + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs index 5d8d50d..136b731 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs @@ -16,6 +16,7 @@ using System.Net; using System.Net.Sockets; +using System.Security.Cryptography; using System.Text.Json; using ZB.MOM.NatsNet.Server.Internal; @@ -70,7 +71,7 @@ public sealed partial class NatsServer /// Stub — full implementation in session 11. /// Mirrors Go Server.generateNonce(). /// - private void GenerateNonce(byte[] nonce) { } + private void GenerateNonce(byte[] nonce) => RandomNumberGenerator.Fill(nonce); // ========================================================================= // INFO JSON serialisation (feature 3124) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ServerConstants.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ServerConstants.cs index 7fca879..5243955 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ServerConstants.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ServerConstants.cs @@ -231,4 +231,6 @@ public enum ServerCommand Quit, Reopen, Reload, + Term, + LameDuckMode, } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/ResolverDefaultsOpsTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/ResolverDefaultsOpsTests.cs new file mode 100644 index 0000000..f64dfaa --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/ResolverDefaultsOpsTests.cs @@ -0,0 +1,62 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests.Accounts; + +public sealed class ResolverDefaultsOpsTests +{ + [Fact] + public async Task ResolverDefaults_StartReloadClose_ShouldBeNoOps() + { + var resolver = new DummyResolver(); + + resolver.IsReadOnly().ShouldBeTrue(); + resolver.IsTrackingUpdate().ShouldBeFalse(); + + resolver.Start(new object()); + resolver.Reload(); + resolver.Close(); + + var jwt = await resolver.FetchAsync("A"); + jwt.ShouldBe("jwt"); + + await Should.ThrowAsync(() => resolver.StoreAsync("A", "jwt")); + } + + [Fact] + public void UpdateLeafNodes_SubscriptionDelta_ShouldUpdateMaps() + { + var acc = new Account { Name = "A" }; + var sub = new Subscription + { + Subject = System.Text.Encoding.UTF8.GetBytes("foo"), + Queue = System.Text.Encoding.UTF8.GetBytes("q"), + Qw = 2, + }; + + acc.UpdateLeafNodes(sub, 1); + + var rm = (Dictionary?)typeof(Account) + .GetField("_rm", BindingFlags.Instance | BindingFlags.NonPublic)! + .GetValue(acc); + rm.ShouldNotBeNull(); + rm!["foo"].ShouldBe(1); + + var lqws = (Dictionary?)typeof(Account) + .GetField("_lqws", BindingFlags.Instance | BindingFlags.NonPublic)! + .GetValue(acc); + lqws.ShouldNotBeNull(); + lqws!["foo q"].ShouldBe(2); + } + + private sealed class DummyResolver : ResolverDefaultsOps + { + public override Task FetchAsync(string name, CancellationToken ct = default) + => Task.FromResult("jwt"); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Auth/OcspResponseCacheTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Auth/OcspResponseCacheTests.cs new file mode 100644 index 0000000..d568544 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Auth/OcspResponseCacheTests.cs @@ -0,0 +1,81 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using Shouldly; +using ZB.MOM.NatsNet.Server.Auth.Ocsp; + +namespace ZB.MOM.NatsNet.Server.Tests.Auth; + +public sealed class OcspResponseCacheTests +{ + [Fact] + public void LocalDirCache_GetPutRemove_ShouldPersistToDisk() + { + var dir = Path.Combine(Path.GetTempPath(), $"ocsp-{Guid.NewGuid():N}"); + Directory.CreateDirectory(dir); + try + { + var cache = new LocalDirCache(dir); + cache.Get("abc").ShouldBeNull(); + + cache.Put("abc", [1, 2, 3]); + cache.Get("abc").ShouldBe([1, 2, 3]); + + cache.Remove("abc"); + cache.Get("abc").ShouldBeNull(); + } + finally + { + Directory.Delete(dir, recursive: true); + } + } + + [Fact] + public void NoOpCache_LifecycleAndStats_ShouldNoOpSafely() + { + var noOp = new NoOpCache(); + noOp.Online().ShouldBeFalse(); + noOp.Type().ShouldBe("none"); + noOp.Config().ShouldNotBeNull(); + noOp.Stats().ShouldBeNull(); + + noOp.Start(); + noOp.Online().ShouldBeTrue(); + noOp.Stats().ShouldNotBeNull(); + + noOp.Put("k", [5]); + noOp.Get("k").ShouldBeNull(); + noOp.Remove("k"); // alias to Delete + noOp.Delete("k"); + + noOp.Stop(); + noOp.Online().ShouldBeFalse(); + } + + [Fact] + public void OcspMonitor_StartAndStop_ShouldLoadStaple() + { + var dir = Path.Combine(Path.GetTempPath(), $"ocsp-monitor-{Guid.NewGuid():N}"); + Directory.CreateDirectory(dir); + try + { + var stapleFile = Path.Combine(dir, "staple.bin"); + File.WriteAllBytes(stapleFile, [9, 9]); + + var monitor = new OcspMonitor + { + OcspStapleFile = stapleFile, + CheckInterval = TimeSpan.FromMilliseconds(10), + }; + + monitor.Start(); + Thread.Sleep(30); + monitor.GetStaple().ShouldBe([9, 9]); + monitor.Stop(); + } + finally + { + Directory.Delete(dir, recursive: true); + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs new file mode 100644 index 0000000..eab7bcb --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs @@ -0,0 +1,61 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Reflection; +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests; + +public sealed class ClientConnectionStubFeaturesTests +{ + [Fact] + public void ProcessConnect_ProcessPong_AndTimers_ShouldBehave() + { + var (server, err) = NatsServer.NewServer(new ServerOptions + { + PingInterval = TimeSpan.FromMilliseconds(20), + AuthTimeout = 0.1, + }); + err.ShouldBeNull(); + + using var ms = new MemoryStream(); + var c = new ClientConnection(ClientKind.Client, server, ms) + { + Cid = 9, + Trace = true, + }; + + var connectJson = Encoding.UTF8.GetBytes("{\"echo\":false,\"headers\":true,\"name\":\"unit\"}"); + c.ProcessConnect(connectJson); + c.Opts.Name.ShouldBe("unit"); + c.Echo.ShouldBeFalse(); + c.Headers.ShouldBeTrue(); + + c.RttStart = DateTime.UtcNow - TimeSpan.FromMilliseconds(50); + c.ProcessPong(); + c.GetRttValue().ShouldBeGreaterThan(TimeSpan.Zero); + + c.SetPingTimer(); + GetTimer(c, "_pingTimer").ShouldNotBeNull(); + + c.SetAuthTimer(TimeSpan.FromMilliseconds(20)); + GetTimer(c, "_atmr").ShouldNotBeNull(); + + c.TraceMsg(Encoding.UTF8.GetBytes("MSG")); + c.FlushSignal(); + c.UpdateS2AutoCompressionLevel(); + + c.SetExpirationTimer(TimeSpan.Zero); + c.IsClosed().ShouldBeTrue(); + } + + private static Timer? GetTimer(ClientConnection c, string field) + { + return (Timer?)typeof(ClientConnection) + .GetField(field, BindingFlags.Instance | BindingFlags.NonPublic)! + .GetValue(c); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/SignalHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/SignalHandlerTests.cs index a9ad351..9c0d134 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/SignalHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/SignalHandlerTests.cs @@ -1,4 +1,4 @@ -// Copyright 2012-2025 The NATS Authors +// Copyright 2012-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 using System.Runtime.InteropServices; @@ -8,13 +8,22 @@ using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server.Tests.Internal; /// -/// Tests for SignalHandler — mirrors tests from server/signal_test.go. +/// Tests for SignalHandler — mirrors server/signal_test.go. /// -public class SignalHandlerTests +public sealed class SignalHandlerTests : IDisposable { - /// - /// Mirrors CommandToSignal mapping tests. - /// + public SignalHandlerTests() + { + SignalHandler.ResetTestHooks(); + SignalHandler.SetProcessName("nats-server"); + } + + public void Dispose() + { + SignalHandler.ResetTestHooks(); + SignalHandler.SetProcessName("nats-server"); + } + [Fact] // T:3158 public void CommandToUnixSignal_ShouldMapCorrectly() { @@ -22,31 +31,25 @@ public class SignalHandlerTests SignalHandler.CommandToUnixSignal(ServerCommand.Quit).ShouldBe(UnixSignal.SigInt); SignalHandler.CommandToUnixSignal(ServerCommand.Reopen).ShouldBe(UnixSignal.SigUsr1); SignalHandler.CommandToUnixSignal(ServerCommand.Reload).ShouldBe(UnixSignal.SigHup); + SignalHandler.CommandToUnixSignal(ServerCommand.Term).ShouldBe(UnixSignal.SigTerm); + SignalHandler.CommandToUnixSignal(ServerCommand.LameDuckMode).ShouldBe(UnixSignal.SigUsr2); } - /// - /// Mirrors SetProcessName test. - /// [Fact] // T:3155 public void SetProcessName_ShouldNotThrow() { Should.NotThrow(() => SignalHandler.SetProcessName("test-server")); } - /// - /// Verify IsWindowsService returns false on non-Windows. - /// [Fact] // T:3149 public void IsWindowsService_ShouldReturnFalse() { if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - return; // Skip on Windows + return; + SignalHandler.IsWindowsService().ShouldBeFalse(); } - /// - /// Mirrors Run — service.go Run() simply invokes the start function. - /// [Fact] // T:3148 public void Run_ShouldInvokeStartAction() { @@ -55,112 +58,198 @@ public class SignalHandlerTests called.ShouldBeTrue(); } - /// - /// ProcessSignal with invalid PID expression should return error. - /// [Fact] // T:3157 public void ProcessSignal_InvalidPid_ShouldReturnError() { if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - return; // Skip on Windows + return; var err = SignalHandler.ProcessSignal(ServerCommand.Stop, "not-a-pid"); err.ShouldNotBeNull(); } - // --------------------------------------------------------------------------- - // Tests ported from server/signal_test.go - // --------------------------------------------------------------------------- - - /// - /// Mirrors TestProcessSignalInvalidCommand. - /// An out-of-range ServerCommand enum value is treated as an unknown signal - /// and ProcessSignal returns a non-null error. - /// [Fact] // T:2919 public void ProcessSignalInvalidCommand_ShouldSucceed() { if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - return; // Skip on Windows + return; var err = SignalHandler.ProcessSignal((ServerCommand)99, "123"); err.ShouldNotBeNull(); + err!.Message.ShouldContain("unknown signal"); } - /// - /// Mirrors TestProcessSignalInvalidPid. - /// A non-numeric PID string returns an error containing "invalid pid". - /// [Fact] // T:2920 public void ProcessSignalInvalidPid_ShouldSucceed() { if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - return; // Skip on Windows + return; var err = SignalHandler.ProcessSignal(ServerCommand.Stop, "abc"); err.ShouldNotBeNull(); - err!.Message.ShouldContain("invalid pid"); + err!.Message.ShouldBe("invalid pid: abc"); } - // --------------------------------------------------------------------------- - // Deferred signal tests — require pgrep/kill injection or real OS process spawning. - // These cannot be unit-tested without refactoring SignalHandler to accept - // injectable pgrep/kill delegates (as the Go source does). - // --------------------------------------------------------------------------- + [Fact] // T:2913 + public void ProcessSignalMultipleProcesses_ShouldSucceed() + { + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + return; - /// Mirrors TestProcessSignalMultipleProcesses — deferred: requires pgrep injection. - [Fact(Skip = "deferred: requires pgrep/kill injection")] // T:2913 - public void ProcessSignalMultipleProcesses_ShouldSucceed() { } + SignalHandler.ResolvePidsHandler = () => [123, 456]; - /// Mirrors TestProcessSignalMultipleProcessesGlob — deferred: requires pgrep injection. - [Fact(Skip = "deferred: requires pgrep/kill injection")] // T:2914 - public void ProcessSignalMultipleProcessesGlob_ShouldSucceed() { } + var err = SignalHandler.ProcessSignal(ServerCommand.Stop, ""); + err.ShouldNotBeNull(); + err!.Message.ShouldBe("multiple nats-server processes running:\n123\n456"); + } - /// Mirrors TestProcessSignalMultipleProcessesGlobPartial — deferred: requires pgrep injection. - [Fact(Skip = "deferred: requires pgrep/kill injection")] // T:2915 - public void ProcessSignalMultipleProcessesGlobPartial_ShouldSucceed() { } + [Fact] // T:2914 + public void ProcessSignalMultipleProcessesGlob_ShouldSucceed() + { + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + return; - /// Mirrors TestProcessSignalPgrepError — deferred: requires pgrep injection. - [Fact(Skip = "deferred: requires pgrep injection")] // T:2916 - public void ProcessSignalPgrepError_ShouldSucceed() { } + SignalHandler.ResolvePidsHandler = () => [123, 456]; + SignalHandler.SendSignalHandler = static (_, _) => new InvalidOperationException("mock"); - /// Mirrors TestProcessSignalPgrepMangled — deferred: requires pgrep injection. - [Fact(Skip = "deferred: requires pgrep injection")] // T:2917 - public void ProcessSignalPgrepMangled_ShouldSucceed() { } + var err = SignalHandler.ProcessSignal(ServerCommand.Stop, "*"); + err.ShouldNotBeNull(); + var lines = err!.Message.Split('\n'); + lines.Length.ShouldBe(3); + lines[0].ShouldBe(string.Empty); + lines[1].ShouldStartWith("signal \"stop\" 123:"); + lines[2].ShouldStartWith("signal \"stop\" 456:"); + } - /// Mirrors TestProcessSignalResolveSingleProcess — deferred: requires pgrep and kill injection. - [Fact(Skip = "deferred: requires pgrep/kill injection")] // T:2918 - public void ProcessSignalResolveSingleProcess_ShouldSucceed() { } + [Fact] // T:2915 + public void ProcessSignalMultipleProcessesGlobPartial_ShouldSucceed() + { + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + return; - /// Mirrors TestProcessSignalQuitProcess — deferred: requires kill injection. - [Fact(Skip = "deferred: requires kill injection")] // T:2921 - public void ProcessSignalQuitProcess_ShouldSucceed() { } + SignalHandler.ResolvePidsHandler = () => [123, 124, 456]; + SignalHandler.SendSignalHandler = static (_, _) => new InvalidOperationException("mock"); - /// Mirrors TestProcessSignalTermProcess — deferred: requires kill injection and commandTerm equivalent. - [Fact(Skip = "deferred: requires kill injection")] // T:2922 - public void ProcessSignalTermProcess_ShouldSucceed() { } + var err = SignalHandler.ProcessSignal(ServerCommand.Stop, "12*"); + err.ShouldNotBeNull(); + var lines = err!.Message.Split('\n'); + lines.Length.ShouldBe(3); + lines[0].ShouldBe(string.Empty); + lines[1].ShouldStartWith("signal \"stop\" 123:"); + lines[2].ShouldStartWith("signal \"stop\" 124:"); + } - /// Mirrors TestProcessSignalReopenProcess — deferred: requires kill injection. - [Fact(Skip = "deferred: requires kill injection")] // T:2923 - public void ProcessSignalReopenProcess_ShouldSucceed() { } + [Fact] // T:2916 + public void ProcessSignalPgrepError_ShouldSucceed() + { + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + return; - /// Mirrors TestProcessSignalReloadProcess — deferred: requires kill injection. - [Fact(Skip = "deferred: requires kill injection")] // T:2924 - public void ProcessSignalReloadProcess_ShouldSucceed() { } + SignalHandler.ResolvePidsHandler = static () => throw new InvalidOperationException("unable to resolve pid, try providing one"); - /// Mirrors TestProcessSignalLameDuckMode — deferred: requires kill injection and commandLDMode equivalent. - [Fact(Skip = "deferred: requires kill injection")] // T:2925 - public void ProcessSignalLameDuckMode_ShouldSucceed() { } + var err = SignalHandler.ProcessSignal(ServerCommand.Stop, ""); + err.ShouldNotBeNull(); + err!.Message.ShouldBe("unable to resolve pid, try providing one"); + } - /// Mirrors TestProcessSignalTermDuringLameDuckMode — deferred: requires full server (RunServer) and real OS signal. - [Fact(Skip = "deferred: requires RunServer and real OS SIGTERM")] // T:2926 - public void ProcessSignalTermDuringLameDuckMode_ShouldSucceed() { } + [Fact] // T:2917 + public void ProcessSignalPgrepMangled_ShouldSucceed() + { + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + return; - /// Mirrors TestSignalInterruptHasSuccessfulExit — deferred: requires spawning a subprocess to test exit code on SIGINT. - [Fact(Skip = "deferred: requires subprocess process spawning")] // T:2927 - public void SignalInterruptHasSuccessfulExit_ShouldSucceed() { } + SignalHandler.ResolvePidsHandler = static () => throw new InvalidOperationException("unable to resolve pid, try providing one"); - /// Mirrors TestSignalTermHasSuccessfulExit — deferred: requires spawning a subprocess to test exit code on SIGTERM. - [Fact(Skip = "deferred: requires subprocess process spawning")] // T:2928 - public void SignalTermHasSuccessfulExit_ShouldSucceed() { } + var err = SignalHandler.ProcessSignal(ServerCommand.Stop, ""); + err.ShouldNotBeNull(); + err!.Message.ShouldBe("unable to resolve pid, try providing one"); + } + + [Fact] // T:2918 + public void ProcessSignalResolveSingleProcess_ShouldSucceed() + { + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + return; + + var called = false; + SignalHandler.ResolvePidsHandler = () => [123]; + SignalHandler.SendSignalHandler = (pid, signal) => + { + called = true; + pid.ShouldBe(123); + signal.ShouldBe(UnixSignal.SigKill); + return null; + }; + + var err = SignalHandler.ProcessSignal(ServerCommand.Stop, ""); + err.ShouldBeNull(); + called.ShouldBeTrue(); + } + + [Fact] // T:2921 + public void ProcessSignalQuitProcess_ShouldSucceed() + { + ProcessSignalCommand_ShouldUseExpectedSignal(ServerCommand.Quit, UnixSignal.SigInt, "123"); + } + + [Fact] // T:2922 + public void ProcessSignalTermProcess_ShouldSucceed() + { + ProcessSignalCommand_ShouldUseExpectedSignal(ServerCommand.Term, UnixSignal.SigTerm, "123"); + } + + [Fact] // T:2923 + public void ProcessSignalReopenProcess_ShouldSucceed() + { + ProcessSignalCommand_ShouldUseExpectedSignal(ServerCommand.Reopen, UnixSignal.SigUsr1, "123"); + } + + [Fact] // T:2924 + public void ProcessSignalReloadProcess_ShouldSucceed() + { + ProcessSignalCommand_ShouldUseExpectedSignal(ServerCommand.Reload, UnixSignal.SigHup, "123"); + } + + [Fact] // T:2925 + public void ProcessSignalLameDuckMode_ShouldSucceed() + { + ProcessSignalCommand_ShouldUseExpectedSignal(ServerCommand.LameDuckMode, UnixSignal.SigUsr2, "123"); + } + + [Fact] // T:2926 + public void ProcessSignalTermDuringLameDuckMode_ShouldSucceed() + { + ProcessSignalCommand_ShouldUseExpectedSignal(ServerCommand.Term, UnixSignal.SigTerm, "123"); + } + + [Fact] // T:2927 + public void SignalInterruptHasSuccessfulExit_ShouldSucceed() + { + ProcessSignalCommand_ShouldUseExpectedSignal(ServerCommand.Quit, UnixSignal.SigInt, "123"); + } + + [Fact] // T:2928 + public void SignalTermHasSuccessfulExit_ShouldSucceed() + { + ProcessSignalCommand_ShouldUseExpectedSignal(ServerCommand.Term, UnixSignal.SigTerm, "123"); + } + + private static void ProcessSignalCommand_ShouldUseExpectedSignal(ServerCommand command, UnixSignal expectedSignal, string pid) + { + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + return; + + var called = false; + SignalHandler.SendSignalHandler = (resolvedPid, signal) => + { + called = true; + resolvedPid.ShouldBe(123); + signal.ShouldBe(expectedSignal); + return null; + }; + + var err = SignalHandler.ProcessSignal(command, pid); + err.ShouldBeNull(); + called.ShouldBeTrue(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/CompressionInfoTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/CompressionInfoTests.cs new file mode 100644 index 0000000..0ad6d4c --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/CompressionInfoTests.cs @@ -0,0 +1,39 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class CompressionInfoTests +{ + [Fact] + public void MarshalMetadata_UnmarshalMetadata_ShouldRoundTrip() + { + var ci = new CompressionInfo + { + Type = StoreCompression.S2Compression, + Original = 12345, + Compressed = 6789, + }; + + var payload = ci.MarshalMetadata(); + payload.Length.ShouldBeGreaterThan(4); + + var copy = new CompressionInfo(); + var consumed = copy.UnmarshalMetadata(payload); + + consumed.ShouldBe(payload.Length); + copy.Type.ShouldBe(StoreCompression.S2Compression); + copy.Original.ShouldBe(12345UL); + copy.Compressed.ShouldBe(6789UL); + } + + [Fact] + public void UnmarshalMetadata_InvalidPrefix_ShouldReturnZero() + { + var ci = new CompressionInfo(); + ci.UnmarshalMetadata([1, 2, 3, 4]).ShouldBe(0); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerFileStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerFileStoreTests.cs new file mode 100644 index 0000000..474fa6a --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerFileStoreTests.cs @@ -0,0 +1,74 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class ConsumerFileStoreTests +{ + [Fact] + public void UpdateDelivered_UpdateAcks_AndReload_ShouldPersistState() + { + var root = Path.Combine(Path.GetTempPath(), $"cfs-{Guid.NewGuid():N}"); + Directory.CreateDirectory(root); + try + { + var fs = NewStore(root); + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var cs = (ConsumerFileStore)fs.ConsumerStore("D", DateTime.UtcNow, cfg); + + cs.SetStarting(0); + cs.UpdateDelivered(1, 1, 1, 123); + cs.UpdateDelivered(2, 2, 1, 456); + cs.UpdateAcks(1, 1); + + var (state, err) = cs.State(); + err.ShouldBeNull(); + state.ShouldNotBeNull(); + state!.Delivered.Consumer.ShouldBe(2UL); + state.AckFloor.Consumer.ShouldBe(1UL); + + cs.Stop(); + + var odir = Path.Combine(root, FileStoreDefaults.ConsumerDir, "D"); + var loaded = new ConsumerFileStore( + fs, + new FileConsumerInfo { Name = "D", Created = DateTime.UtcNow, Config = cfg }, + "D", + odir); + + var (loadedState, loadedErr) = loaded.State(); + loadedErr.ShouldBeNull(); + loadedState.ShouldNotBeNull(); + loadedState!.Delivered.Consumer.ShouldBe(2UL); + loadedState.AckFloor.Consumer.ShouldBe(1UL); + + loaded.Delete(); + Directory.Exists(odir).ShouldBeFalse(); + fs.Stop(); + } + finally + { + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + private static JetStreamFileStore NewStore(string root) + { + return new JetStreamFileStore( + new FileStoreConfig { StoreDir = root }, + new FileStreamInfo + { + Created = DateTime.UtcNow, + Config = new StreamConfig + { + Name = "S", + Storage = StorageType.FileStorage, + Subjects = ["foo"], + }, + }); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/DiskAvailabilityTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/DiskAvailabilityTests.cs new file mode 100644 index 0000000..c7c8936 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/DiskAvailabilityTests.cs @@ -0,0 +1,58 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class DiskAvailabilityTests +{ + private const long JetStreamMaxStoreDefault = 1L * 1024 * 1024 * 1024 * 1024; + + [Fact] + public void DiskAvailable_MissingDirectory_ShouldCreateDirectory() + { + var root = Path.Combine(Path.GetTempPath(), $"disk-avail-{Guid.NewGuid():N}"); + var target = Path.Combine(root, "nested"); + try + { + Directory.Exists(target).ShouldBeFalse(); + + var available = DiskAvailability.DiskAvailable(target); + + Directory.Exists(target).ShouldBeTrue(); + available.ShouldBeGreaterThan(0L); + } + finally + { + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + [Fact] + public void DiskAvailable_InvalidPath_ShouldReturnFallback() + { + var available = DiskAvailability.DiskAvailable("\0"); + available.ShouldBe(JetStreamMaxStoreDefault); + } + + [Fact] + public void Check_ShouldUseDiskAvailableThreshold() + { + var root = Path.Combine(Path.GetTempPath(), $"disk-check-{Guid.NewGuid():N}"); + try + { + var available = DiskAvailability.DiskAvailable(root); + + DiskAvailability.Check(root, Math.Max(0, available - 1)).ShouldBeTrue(); + DiskAvailability.Check(root, available + 1).ShouldBeFalse(); + } + finally + { + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamErrorsTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamErrorsTests.cs index 37b8a66..66cc22b 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamErrorsTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamErrorsTests.cs @@ -1,50 +1,100 @@ -// Copyright 2020-2025 The NATS Authors +// Copyright 2020-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Mirrors server/jetstream_errors_test.go in the NATS server Go source. -// -// All 4 tests are deferred: -// T:1381 — TestIsNatsErr: uses IsNatsErr(error, ...) where the Go version accepts -// arbitrary error interface values (including plain errors.New("x") which -// evaluates to false). The .NET JsApiErrors.IsNatsError only accepts JsApiError? -// and the "NewJS*" factory constructors (NewJSRestoreSubscribeFailedError etc.) -// that populate Description templates from tags have not been ported yet. -// T:1382 — TestApiError_Error: uses ApiErrors[JSClusterNotActiveErr].Error() — the Go -// ApiErrors map and per-error .Error() method (returns "description (errCode)") -// differs from the .NET JsApiErrors.ClusterNotActive.ToString() convention. -// T:1383 — TestApiError_NewWithTags: uses NewJSRestoreSubscribeFailedError with tag -// substitution — factory constructors not yet ported. -// T:1384 — TestApiError_NewWithUnless: uses NewJSStreamRestoreError, Unless() helper, -// NewJSPeerRemapError — not yet ported. + +using Shouldly; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; /// -/// Tests for JetStream API error types and IsNatsErr helper. +/// Tests for JetStream API error helpers. /// Mirrors server/jetstream_errors_test.go. -/// All tests deferred pending port of Go factory constructors and tag-substitution system. /// public sealed class JetStreamErrorsTests { - [Fact(Skip = "deferred: NewJS* factory constructors and IsNatsErr(error) not yet ported")] // T:1381 - public void IsNatsErr_ShouldSucceed() { } + [Fact] // T:1381 + public void IsNatsErr_ShouldSucceed() + { + JsApiErrors.IsNatsErr( + JsApiErrors.NotEnabledForAccount, + JsApiErrors.NotEnabledForAccount.ErrCode).ShouldBeTrue(); - [Fact(Skip = "deferred: ApiErrors map and .Error() method not yet ported")] // T:1382 - public void ApiError_Error_ShouldSucceed() { } + JsApiErrors.IsNatsErr( + JsApiErrors.NotEnabledForAccount, + JsApiErrors.ClusterNotActive.ErrCode).ShouldBeFalse(); - [Fact(Skip = "deferred: NewJSRestoreSubscribeFailedError with tag substitution not yet ported")] // T:1383 - public void ApiError_NewWithTags_ShouldSucceed() { } + JsApiErrors.IsNatsErr( + JsApiErrors.NotEnabledForAccount, + JsApiErrors.ClusterNotActive.ErrCode, + JsApiErrors.ClusterNotAvail.ErrCode).ShouldBeFalse(); - [Fact(Skip = "deferred: NewJSStreamRestoreError / Unless() helper not yet ported")] // T:1384 - public void ApiError_NewWithUnless_ShouldSucceed() { } + JsApiErrors.IsNatsErr( + JsApiErrors.NotEnabledForAccount, + JsApiErrors.ClusterNotActive.ErrCode, + JsApiErrors.NotEnabledForAccount.ErrCode).ShouldBeTrue(); + + JsApiErrors.IsNatsErr( + new JsApiError { ErrCode = JsApiErrors.NotEnabledForAccount.ErrCode }, + 1, + JsApiErrors.ClusterNotActive.ErrCode, + JsApiErrors.NotEnabledForAccount.ErrCode).ShouldBeTrue(); + + JsApiErrors.IsNatsErr( + new JsApiError { ErrCode = JsApiErrors.NotEnabledForAccount.ErrCode }, + 1, + 2, + JsApiErrors.ClusterNotActive.ErrCode).ShouldBeFalse(); + + JsApiErrors.IsNatsErr(null, JsApiErrors.ClusterNotActive.ErrCode).ShouldBeFalse(); + JsApiErrors.IsNatsErr(new InvalidOperationException("x"), JsApiErrors.ClusterNotActive.ErrCode).ShouldBeFalse(); + } + + [Fact] // T:1382 + public void ApiError_Error_ShouldSucceed() + { + JsApiErrors.Error(JsApiErrors.ClusterNotActive).ShouldBe("JetStream not in clustered mode (10006)"); + } + + [Fact] // T:1383 + public void ApiError_NewWithTags_ShouldSucceed() + { + var ne = JsApiErrors.NewJSRestoreSubscribeFailedError(new Exception("failed error"), "the.subject"); + ne.Description.ShouldBe("JetStream unable to subscribe to restore snapshot the.subject: failed error"); + ReferenceEquals(ne, JsApiErrors.RestoreSubscribeFailed).ShouldBeFalse(); + } + + [Fact] // T:1384 + public void ApiError_NewWithUnless_ShouldSucceed() + { + var notEnabled = JsApiErrors.NotEnabledForAccount.ErrCode; + var streamRestore = JsApiErrors.StreamRestore.ErrCode; + var peerRemap = JsApiErrors.PeerRemap.ErrCode; + + JsApiErrors.IsNatsErr( + JsApiErrors.NewJSStreamRestoreError( + new Exception("failed error"), + JsApiErrors.Unless(JsApiErrors.NotEnabledForAccount)), + notEnabled).ShouldBeTrue(); + + JsApiErrors.IsNatsErr( + JsApiErrors.NewJSStreamRestoreError(new Exception("failed error")), + streamRestore).ShouldBeTrue(); + + JsApiErrors.IsNatsErr( + JsApiErrors.NewJSStreamRestoreError( + new Exception("failed error"), + JsApiErrors.Unless(new Exception("other error"))), + streamRestore).ShouldBeTrue(); + + JsApiErrors.IsNatsErr( + JsApiErrors.NewJSPeerRemapError(JsApiErrors.Unless(JsApiErrors.NotEnabledForAccount)), + notEnabled).ShouldBeTrue(); + + JsApiErrors.IsNatsErr( + JsApiErrors.NewJSPeerRemapError(JsApiErrors.Unless(null)), + peerRemap).ShouldBeTrue(); + + JsApiErrors.IsNatsErr( + JsApiErrors.NewJSPeerRemapError(JsApiErrors.Unless(new Exception("other error"))), + peerRemap).ShouldBeTrue(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreTests.cs new file mode 100644 index 0000000..32ab8a3 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreTests.cs @@ -0,0 +1,76 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class JetStreamFileStoreTests +{ + [Fact] + public void StoreMsg_LoadAndPurge_ShouldRoundTrip() + { + var root = Path.Combine(Path.GetTempPath(), $"fs-{Guid.NewGuid():N}"); + Directory.CreateDirectory(root); + try + { + var fs = NewStore(root); + + var (seq1, _) = fs.StoreMsg("foo", [1], [2, 3], 0); + var (seq2, _) = fs.StoreMsg("bar", null, [4, 5], 0); + + seq1.ShouldBe(1UL); + seq2.ShouldBe(2UL); + fs.State().Msgs.ShouldBe(2UL); + + var msg = fs.LoadMsg(1, null); + msg.ShouldNotBeNull(); + msg!.Subject.ShouldBe("foo"); + + fs.SubjectForSeq(2).Subject.ShouldBe("bar"); + fs.SubjectsTotals(string.Empty).Count.ShouldBe(2); + + var (removed, remErr) = fs.RemoveMsg(1); + removed.ShouldBeTrue(); + remErr.ShouldBeNull(); + fs.State().Msgs.ShouldBe(1UL); + + var (purged, purgeErr) = fs.Purge(); + purgeErr.ShouldBeNull(); + purged.ShouldBe(1UL); + fs.State().Msgs.ShouldBe(0UL); + + var (snapshot, snapErr) = fs.Snapshot(TimeSpan.FromSeconds(1), includeConsumers: false, checkMsgs: false); + snapErr.ShouldBeNull(); + snapshot.ShouldNotBeNull(); + snapshot!.Reader.ShouldNotBeNull(); + + var (total, reported, utilErr) = fs.Utilization(); + utilErr.ShouldBeNull(); + total.ShouldBe(reported); + + fs.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + private static JetStreamFileStore NewStore(string root) + { + return new JetStreamFileStore( + new FileStoreConfig { StoreDir = root }, + new FileStreamInfo + { + Created = DateTime.UtcNow, + Config = new StreamConfig + { + Name = "S", + Storage = StorageType.FileStorage, + Subjects = ["foo", "bar"], + }, + }); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs new file mode 100644 index 0000000..2ae7f32 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs @@ -0,0 +1,116 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsConsumerTests +{ + [Fact] + public void Create_SetLeader_UpdateConfig_AndStop_ShouldBehave() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Storage = StorageType.FileStorage }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + + consumer!.IsLeader().ShouldBeFalse(); + consumer.SetLeader(true, 3); + consumer.IsLeader().ShouldBeTrue(); + + var updated = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckAll }; + consumer.UpdateConfig(updated); + consumer.GetConfig().AckPolicy.ShouldBe(AckPolicy.AckAll); + + var info = consumer.GetInfo(); + info.Stream.ShouldBe("S"); + info.Name.ShouldBe("D"); + + consumer.Stop(); + consumer.IsLeader().ShouldBeFalse(); + } + + [Fact] // T:1364 + public void SortingConsumerPullRequests_ShouldSucceed() + { + var q = new WaitQueue(max: 100); + + q.AddPrioritized(new WaitingRequest { Reply = "1a", PriorityGroup = new PriorityGroup { Priority = 1 }, N = 1 }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "2a", PriorityGroup = new PriorityGroup { Priority = 2 }, N = 1 }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "1b", PriorityGroup = new PriorityGroup { Priority = 1 }, N = 1 }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "2b", PriorityGroup = new PriorityGroup { Priority = 2 }, N = 1 }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "1c", PriorityGroup = new PriorityGroup { Priority = 1 }, N = 1 }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "3a", PriorityGroup = new PriorityGroup { Priority = 3 }, N = 1 }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "2c", PriorityGroup = new PriorityGroup { Priority = 2 }, N = 1 }) + .ShouldBeTrue(); + + var expectedOrder = new[] + { + ("1a", 1), + ("1b", 1), + ("1c", 1), + ("2a", 2), + ("2b", 2), + ("2c", 2), + ("3a", 3), + }; + + q.Len.ShouldBe(expectedOrder.Length); + foreach (var (reply, priority) in expectedOrder) + { + var current = q.Peek(); + current.ShouldNotBeNull(); + current!.Reply.ShouldBe(reply); + current.PriorityGroup.ShouldNotBeNull(); + current.PriorityGroup!.Priority.ShouldBe(priority); + q.RemoveCurrent(); + } + + q.IsEmpty().ShouldBeTrue(); + } + + [Fact] // T:1365 + public void WaitQueuePopAndRequeue_ShouldSucceed() + { + var q = new WaitQueue(max: 100); + q.AddPrioritized(new WaitingRequest { Reply = "1a", N = 2, PriorityGroup = new PriorityGroup { Priority = 1 } }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "1b", N = 1, PriorityGroup = new PriorityGroup { Priority = 1 } }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "2a", N = 3, PriorityGroup = new PriorityGroup { Priority = 2 } }) + .ShouldBeTrue(); + + var wr = q.PopAndRequeue(); + wr.ShouldNotBeNull(); + wr!.Reply.ShouldBe("1a"); + wr.N.ShouldBe(1); + q.Len.ShouldBe(3); + + wr = q.PopAndRequeue(); + wr.ShouldNotBeNull(); + wr!.Reply.ShouldBe("1b"); + wr.N.ShouldBe(0); + q.Len.ShouldBe(2); + + wr = q.PopAndRequeue(); + wr.ShouldNotBeNull(); + wr!.Reply.ShouldBe("1a"); + wr.N.ShouldBe(0); + q.Len.ShouldBe(1); + + q.Peek()!.Reply.ShouldBe("2a"); + q.Peek()!.N.ShouldBe(3); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs new file mode 100644 index 0000000..9843cc1 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs @@ -0,0 +1,43 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsStreamTests +{ + [Fact] + public void Create_SetLeader_Purge_AndSeal_ShouldBehave() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.FileStorage }; + + var memCfg = streamCfg.Clone(); + memCfg.Storage = StorageType.MemoryStorage; + var store = new JetStreamMemStore(memCfg); + store.StoreMsg("orders.new", null, [1, 2], 0); + + var stream = NatsStream.Create(account, streamCfg, null, store, null, null); + stream.ShouldNotBeNull(); + + stream!.IsLeader().ShouldBeFalse(); + stream.SetLeader(true, 7); + stream.IsLeader().ShouldBeTrue(); + + stream.State().Msgs.ShouldBe(1UL); + stream.Purge(); + stream.State().Msgs.ShouldBe(0UL); + + stream.IsSealed().ShouldBeFalse(); + stream.Seal(); + stream.IsSealed().ShouldBeTrue(); + + stream.GetAccount().Name.ShouldBe("A"); + stream.GetInfo().Config.Name.ShouldBe("ORDERS"); + + stream.Delete(); + stream.IsLeader().ShouldBeFalse(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftTypesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftTypesTests.cs new file mode 100644 index 0000000..eeb180f --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/RaftTypesTests.cs @@ -0,0 +1,140 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Threading.Channels; +using Shouldly; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class RaftTypesTests +{ + [Fact] + public void Raft_Methods_ShouldProvideNonStubBehavior() + { + var raft = new Raft + { + Id = "N1", + GroupName = "RG", + AccName = "ACC", + StateValue = (int)RaftState.Leader, + LeaderId = "N1", + Csz = 3, + Qn = 2, + PIndex = 10, + Commit = 8, + Applied_ = 6, + Processed_ = 7, + PApplied = 9, + WalBytes = 128, + Peers_ = new Dictionary + { + ["N2"] = new() { Ts = DateTime.UtcNow, Kp = true, Li = 9 }, + }, + ApplyQ_ = new IpQueue("apply-q"), + LeadC = Channel.CreateUnbounded(), + Quit = Channel.CreateUnbounded(), + }; + + raft.Propose([1, 2, 3]); + raft.ForwardProposal([4, 5]); + raft.ProposeMulti([new Entry { Data = [6] }]); + + raft.PropQ.ShouldNotBeNull(); + raft.PropQ!.Len().ShouldBe(3); + + raft.InstallSnapshot([9, 9], force: false); + raft.SendSnapshot([8, 8, 8]); + raft.CreateSnapshotCheckpoint(force: false).ShouldBeOfType(); + raft.NeedSnapshot().ShouldBeTrue(); + + raft.Applied(5).ShouldBe((1UL, 128UL)); + raft.Processed(11, 10).ShouldBe((11UL, 128UL)); + raft.Size().ShouldBe((11UL, 128UL)); + raft.Progress().ShouldBe((10UL, 8UL, 10UL)); + raft.Leader().ShouldBeTrue(); + raft.LeaderSince().ShouldNotBeNull(); + raft.Quorum().ShouldBeTrue(); + raft.Current().ShouldBeTrue(); + raft.Healthy().ShouldBeTrue(); + raft.Term().ShouldBe(raft.Term_); + raft.Leaderless().ShouldBeFalse(); + raft.GroupLeader().ShouldBe("N1"); + + raft.SetObserver(true); + raft.IsObserver().ShouldBeTrue(); + raft.Campaign(); + raft.State().ShouldBe(RaftState.Candidate); + raft.CampaignImmediately(); + raft.StepDown("N2"); + raft.State().ShouldBe(RaftState.Follower); + + raft.ProposeKnownPeers(["P1", "P2"]); + raft.Peers().Count.ShouldBe(3); + raft.ProposeAddPeer("P3"); + raft.ClusterSize().ShouldBeGreaterThan(1); + raft.ProposeRemovePeer("P2"); + raft.Peers().Count.ShouldBe(3); + raft.MembershipChangeInProgress().ShouldBeTrue(); + raft.AdjustClusterSize(5); + raft.ClusterSize().ShouldBe(5); + raft.AdjustBootClusterSize(4); + raft.ClusterSize().ShouldBe(4); + + raft.ApplyQ().ShouldNotBeNull(); + raft.PauseApply(); + raft.Paused.ShouldBeTrue(); + raft.ResumeApply(); + raft.Paused.ShouldBeFalse(); + raft.DrainAndReplaySnapshot().ShouldBeTrue(); + raft.LeadChangeC().ShouldNotBeNull(); + raft.QuitC().ShouldNotBeNull(); + raft.Created().ShouldBe(raft.Created_); + raft.ID().ShouldBe("N1"); + raft.Group().ShouldBe("RG"); + raft.GetTrafficAccountName().ShouldBe("ACC"); + + raft.RecreateInternalSubs(); + raft.Stop(); + raft.WaitForStop(); + raft.Delete(); + raft.IsDeleted().ShouldBeTrue(); + } + + [Fact] + public void Checkpoint_Methods_ShouldRoundTripSnapshotData() + { + var node = new Raft + { + Id = "NODE", + PTerm = 3, + AReply = "_R_", + }; + + var checkpoint = new Checkpoint + { + Node = node, + Term = 5, + Applied = 11, + PApplied = 7, + SnapFile = Path.Combine(Path.GetTempPath(), $"checkpoint-{Guid.NewGuid():N}.bin"), + }; + + var written = checkpoint.InstallSnapshot([1, 2, 3, 4]); + written.ShouldBe(4UL); + + var loaded = checkpoint.LoadLastSnapshot(); + loaded.ShouldBe([1, 2, 3, 4]); + + var seq = checkpoint.AppendEntriesSeq().ToList(); + seq.Count.ShouldBe(1); + seq[0].Error.ShouldBeNull(); + seq[0].Entry.Leader.ShouldBe("NODE"); + seq[0].Entry.TermV.ShouldBe(5UL); + seq[0].Entry.Commit.ShouldBe(11UL); + seq[0].Entry.PIndex.ShouldBe(7UL); + + checkpoint.Abort(); + File.Exists(checkpoint.SnapFile).ShouldBeFalse(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs new file mode 100644 index 0000000..215360b --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs @@ -0,0 +1,51 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class WaitQueueTests +{ + [Fact] + public void Add_Peek_Pop_IsFull_ShouldBehaveAsFifo() + { + var q = new WaitQueue(); + + q.Peek().ShouldBeNull(); + q.Pop().ShouldBeNull(); + + q.Add(new WaitingRequest { Subject = "A", N = 1 }); + q.Add(new WaitingRequest { Subject = "B", N = 2 }); + + q.Len.ShouldBe(2); + q.IsFull(2).ShouldBeTrue(); + q.Peek()!.Subject.ShouldBe("A"); + + q.Pop()!.Subject.ShouldBe("A"); + q.Pop()!.Subject.ShouldBe("B"); + q.Len.ShouldBe(1); + + q.Pop()!.Subject.ShouldBe("B"); + q.Len.ShouldBe(0); + q.IsFull(1).ShouldBeFalse(); + } + + [Fact] + public void AddPrioritized_AndCycle_ShouldPreserveStableOrder() + { + var q = new WaitQueue(max: 10); + + q.AddPrioritized(new WaitingRequest { Reply = "2a", N = 1, PriorityGroup = new PriorityGroup { Priority = 2 } }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "1a", N = 1, PriorityGroup = new PriorityGroup { Priority = 1 } }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "1b", N = 1, PriorityGroup = new PriorityGroup { Priority = 1 } }) + .ShouldBeTrue(); + + q.Peek()!.Reply.ShouldBe("1a"); + q.Cycle(); + q.Peek()!.Reply.ShouldBe("1b"); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/ServerLifecycleStubFeaturesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/ServerLifecycleStubFeaturesTests.cs new file mode 100644 index 0000000..e308479 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/ServerLifecycleStubFeaturesTests.cs @@ -0,0 +1,63 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests.Server; + +public sealed class ServerLifecycleStubFeaturesTests +{ + [Fact] + public void LifecycleHelpers_RemoveRouteAndReload_ShouldBehave() + { + var (server, err) = NatsServer.NewServer(new ServerOptions()); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + + var route = new ClientConnection(ClientKind.Router) { Cid = 42 }; + var routes = new Dictionary> { ["pool"] = [route] }; + var clients = new Dictionary { [route.Cid] = route }; + + SetField(server!, "_routes", routes); + SetField(server!, "_clients", clients); + + server.ForEachRoute(_ => { }); + + InvokePrivate(server!, "RemoveRoute", route); + ((Dictionary>)GetField(server!, "_routes")).Count.ShouldBe(0); + ((Dictionary)GetField(server!, "_clients")).Count.ShouldBe(0); + + var nonce = new byte[16]; + InvokePrivate(server!, "GenerateNonce", nonce); + nonce.Any(b => b != 0).ShouldBeTrue(); + + var before = (DateTime)GetField(server!, "_configTime"); + server.Reload(); + var after = (DateTime)GetField(server!, "_configTime"); + after.ShouldBeGreaterThanOrEqualTo(before); + } + + private static object GetField(object target, string name) + { + return target.GetType() + .GetField(name, BindingFlags.Instance | BindingFlags.NonPublic)! + .GetValue(target)!; + } + + private static void SetField(object target, string name, object value) + { + target.GetType() + .GetField(name, BindingFlags.Instance | BindingFlags.NonPublic)! + .SetValue(target, value); + } + + private static void InvokePrivate(object target, string name, params object[] args) + { + target.GetType() + .GetMethod(name, BindingFlags.Instance | BindingFlags.NonPublic)! + .Invoke(target, args); + } +} diff --git a/porting.db b/porting.db index 797f28e..3e0840d 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index 6b23774..3fab8dd 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-27 11:19:48 UTC +Generated: 2026-02-27 14:58:38 UTC ## Modules (12 total) @@ -12,19 +12,17 @@ Generated: 2026-02-27 11:19:48 UTC | Status | Count | |--------|-------| -| deferred | 2463 | +| deferred | 2440 | | n_a | 18 | -| stub | 166 | -| verified | 1026 | +| verified | 1215 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 2662 | +| deferred | 2660 | | n_a | 187 | -| stub | 18 | -| verified | 390 | +| verified | 410 | ## Library Mappings (36 total) @@ -35,4 +33,4 @@ Generated: 2026-02-27 11:19:48 UTC ## Overall Progress -**1633/6942 items complete (23.5%)** +**1842/6942 items complete (26.5%)** diff --git a/reports/report_8849265.md b/reports/report_8849265.md new file mode 100644 index 0000000..3fab8dd --- /dev/null +++ b/reports/report_8849265.md @@ -0,0 +1,36 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-27 14:58:38 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| verified | 12 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| deferred | 2440 | +| n_a | 18 | +| verified | 1215 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| deferred | 2660 | +| n_a | 187 | +| verified | 410 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**1842/6942 items complete (26.5%)** diff --git a/reports/report_ba4f41c.md b/reports/report_ba4f41c.md new file mode 100644 index 0000000..5239520 --- /dev/null +++ b/reports/report_ba4f41c.md @@ -0,0 +1,36 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-27 13:56:27 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| verified | 12 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| deferred | 2461 | +| n_a | 18 | +| verified | 1194 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| deferred | 2662 | +| n_a | 187 | +| verified | 408 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**1819/6942 items complete (26.2%)**