From 5367c3f34d8f10c4f48f9f3ce65c801f0eb6710c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 16:41:31 -0500 Subject: [PATCH] feat(batch14): complete filestore write lifecycle features and tests --- .../JetStream/FileStore.cs | 848 +++++++++++++++++- .../ConcurrencyTests2.Impltests.cs | 69 ++ .../JetStreamFileStoreTests.Impltests.cs | 482 ++++++++++ .../LeafNodeHandlerTests.Impltests.cs | 74 ++ .../LeafNodeProxyTests.Impltests.cs | 69 ++ .../ImplBacklog/LeafNodeProxyTests.cs | 2 +- .../RouteHandlerTests.Impltests.cs | 77 ++ porting.db | Bin 6615040 -> 6627328 bytes reports/current.md | 14 +- 9 files changed, 1596 insertions(+), 39 deletions(-) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.Impltests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index cf63058..10f6a7f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -2246,11 +2246,14 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable } private void WriteFileWithOptionalSync(string path, byte[] payload) + => WriteFileWithOptionalSync(path, payload, UnixFileMode.UserRead | UnixFileMode.UserWrite); + + private void WriteFileWithOptionalSync(string path, byte[] payload, UnixFileMode perm) { Directory.CreateDirectory(Path.GetDirectoryName(path)!); - using var stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None); - stream.Write(payload, 0, payload.Length); - stream.Flush(_fcfg.SyncAlways); + var err = WriteAtomically(path, payload, perm, _fcfg.SyncAlways); + if (err != null) + throw err; } private bool TryReadBlockIndexInfo(MessageBlock mb, byte[] lchk) @@ -3193,14 +3196,65 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable // Lock should be held. private void SetSyncTimerLocked() - { - _syncTmr?.Dispose(); - _syncTmr = null; + => SetSyncTimer(); + // Lock should be held. + private void SetSyncTimer() + { if (_fcfg.SyncInterval <= TimeSpan.Zero || IsClosed()) return; - _syncTmr = new Timer(_ => SyncBlocks(), null, _fcfg.SyncInterval, Timeout.InfiniteTimeSpan); + if (_syncTmr != null) + { + _syncTmr.Change(_fcfg.SyncInterval, Timeout.InfiniteTimeSpan); + return; + } + + var half = TimeSpan.FromTicks(Math.Max(1, _fcfg.SyncInterval.Ticks / 2)); + var jitterTicks = Random.Shared.NextInt64(Math.Max(1, half.Ticks)); + var start = half + TimeSpan.FromTicks(jitterTicks); + _syncTmr = new Timer(_ => SyncBlocks(), null, start, Timeout.InfiniteTimeSpan); + } + + // Lock should be held. + private void CancelSyncTimer() + { + _syncTmr?.Dispose(); + _syncTmr = null; + } + + private async Task FlushStreamStateLoop(ChannelReader qch, ChannelWriter done) + { + try + { + var writeThreshold = TimeSpan.FromMinutes(2); + var writeJitter = TimeSpan.FromSeconds(Random.Shared.Next(0, 30)); + using var ticker = new PeriodicTimer(writeThreshold + writeJitter); + + while (true) + { + var tickTask = ticker.WaitForNextTickAsync().AsTask(); + var quitTask = qch.Completion; + var completed = await Task.WhenAny(tickTask, quitTask).ConfigureAwait(false); + if (ReferenceEquals(completed, quitTask)) + break; + + if (!await tickTask.ConfigureAwait(false)) + break; + + var err = WriteFullState(); + if (err is UnauthorizedAccessException) + { + Warn("File system permission denied when flushing stream state: {0}", err.Message); + break; + } + } + } + finally + { + done.TryWrite(0); + done.TryComplete(); + } } // ----------------------------------------------------------------------- @@ -3997,6 +4051,698 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable ss.LastNeedsUpdate = false; } + // ----------------------------------------------------------------------- + // Lifecycle helpers (Batch 14 Group 4) + // ----------------------------------------------------------------------- + + // Lock should be held. + private void PurgeMsgBlock(MessageBlock mb) + { + ArgumentNullException.ThrowIfNull(mb); + + mb.Mu.EnterWriteLock(); + ulong msgs; + ulong bytes; + try + { + msgs = Math.Min(mb.Msgs, _state.Msgs); + bytes = Math.Min(mb.Bytes, _state.Bytes); + + if (_scheduling != null && mb.Schedules > 0) + { + for (var seq = mb.First.Seq; seq <= mb.Last.Seq; seq++) + { + _scheduling.Remove(seq); + if (seq == ulong.MaxValue) + break; + } + } + + _state.Msgs -= msgs; + _state.Bytes -= bytes; + RemoveMsgBlock(mb); + mb.TryForceExpireCacheLocked(); + } + finally + { + mb.Mu.ExitWriteLock(); + } + + SelectNextFirst(); + + if ((msgs > 0 || bytes > 0) && _scb != null) + _scb(-(long)msgs, -(long)bytes, 0, string.Empty); + } + + // Lock should be held. + private void ResetGlobalPerSubjectInfo() + { + _psim ??= new SubjectTree(); + _psim.Reset(); + _tsl = 0; + + if (NoTrackSubjects()) + return; + + foreach (var mb in _blks) + PopulateGlobalPerSubjectInfo(mb); + } + + // Lock should be held. + private void PopulateGlobalPerSubjectInfo(MessageBlock mb) + { + ArgumentNullException.ThrowIfNull(mb); + + mb.Mu.EnterReadLock(); + try + { + var fss = mb.Fss; + if (fss == null) + return; + + _psim ??= new SubjectTree(); + + fss.IterFast((bsubj, ss) => + { + if (bsubj.Length == 0) + return true; + + var (psi, ok) = _psim.Find(bsubj); + if (ok && psi != null) + { + psi.Total += ss.Msgs; + if (mb.Index > psi.Lblk) + psi.Lblk = mb.Index; + } + else + { + _psim.Insert(bsubj, new Psi + { + Total = ss.Msgs, + Fblk = mb.Index, + Lblk = mb.Index, + }); + _tsl += bsubj.Length; + } + + return true; + }); + } + finally + { + mb.Mu.ExitReadLock(); + } + } + + private void CloseAllMsgBlocks(bool sync) + { + foreach (var mb in _blks) + CloseMsgBlock(mb, sync); + } + + private static void CloseMsgBlock(MessageBlock mb, bool sync) + { + if (mb == null) + return; + + mb.Mu.EnterWriteLock(); + try + { + if (mb.Closed) + return; + + mb.Ctmr?.Dispose(); + mb.Ctmr = null; + mb.Fss = null; + mb.TryForceExpireCacheLocked(); + + if (mb.Qch != null) + { + mb.Qch.Writer.TryComplete(); + mb.Qch = null; + } + + if (mb.Mfd != null) + { + if (sync || mb.SyncAlways || mb.NeedSync) + mb.Mfd.Flush(true); + mb.Mfd.Dispose(); + mb.Mfd = null; + } + + mb.NeedSync = false; + mb.Closed = true; + } + catch (Exception ex) + { + mb.Werr = ex; + } + finally + { + mb.Mu.ExitWriteLock(); + } + } + + private Exception? WriteFullState() + => WriteFullStateInternal(force: false); + + private Exception? ForceWriteFullState() + => WriteFullStateInternal(force: true); + + private Exception? WriteFullStateInternal(bool force) + { + if (IsClosed()) + return null; + + if (Interlocked.Increment(ref _wfsrun) > 1 && !force) + { + Interlocked.Decrement(ref _wfsrun); + return null; + } + + try + { + lock (_wfsmu) + { + try + { + StreamState state; + StreamState tracked = new(); + List<(byte[] Subject, Psi Info)> subjects = []; + List<(uint Index, ulong Bytes, ulong FirstSeq, long FirstTs, ulong LastSeq, long LastTs, ulong NumDeleted, ulong Ttls, ulong Schedules, byte[] Dmap, byte[] Lchk, bool IsLast)> blocks = []; + int priorDirty; + + _mu.EnterReadLock(); + try + { + if (_dirty == 0 && !force) + return null; + + state = _memStore.State(); + priorDirty = _dirty; + + if (_prf != null && _aek == null) + { + _mu.ExitReadLock(); + _mu.EnterWriteLock(); + try { SetupAEK(); } + finally { _mu.ExitWriteLock(); } + _mu.EnterReadLock(); + } + + if (_psim != null && _psim.Size() > 0) + { + _psim.Match(Encoding.UTF8.GetBytes(">"), (subject, psi) => + { + if (psi != null) + { + subjects.Add((subject.ToArray(), new Psi + { + Total = psi.Total, + Fblk = psi.Fblk, + Lblk = psi.Lblk, + })); + } + + return true; + }); + } + + foreach (var mb in _blks) + { + mb.Mu.EnterReadLock(); + try + { + var numDeleted = (ulong)mb.Dmap.Size; + var dmap = numDeleted > 0 ? mb.Dmap.Encode(null) : Array.Empty(); + var lchk = mb.Lchk.Length >= FullStateChecksumLength + ? mb.Lchk.AsSpan(0, FullStateChecksumLength).ToArray() + : new byte[FullStateChecksumLength]; + + blocks.Add(( + mb.Index, + mb.Bytes, + mb.First.Seq, + mb.First.Ts, + mb.Last.Seq, + mb.Last.Ts, + numDeleted, + mb.Ttls, + mb.Schedules, + dmap, + lchk, + ReferenceEquals(mb, _lmb))); + + UpdateTrackingState(tracked, mb); + } + finally + { + mb.Mu.ExitReadLock(); + } + } + } + finally + { + if (_mu.IsReadLockHeld) + _mu.ExitReadLock(); + } + + if (!TrackingStatesEqual(state, tracked)) + { + Warn("Stream state encountered internal inconsistency on write"); + RebuildState(null); + state = _memStore.State(); + } + + var baseTime = TimestampNormalized(state.FirstTime); + var buf = new List(4096) + { + FullStateMagic, + FullStateVersion, + }; + + AppendUVarInt(buf, state.Msgs); + AppendUVarInt(buf, state.Bytes); + AppendUVarInt(buf, state.FirstSeq); + AppendVarInt(buf, baseTime); + AppendUVarInt(buf, state.LastSeq); + AppendVarInt(buf, TimestampNormalized(state.LastTime)); + + AppendUVarInt(buf, (ulong)subjects.Count); + foreach (var (subject, info) in subjects) + { + AppendUVarInt(buf, (ulong)subject.Length); + buf.AddRange(subject); + AppendUVarInt(buf, info.Total); + AppendUVarInt(buf, info.Fblk); + if (info.Total > 1) + AppendUVarInt(buf, info.Lblk); + } + + AppendUVarInt(buf, (ulong)blocks.Count); + + uint lastBlockIndex = 0; + var lastChecksum = new byte[FullStateChecksumLength]; + + foreach (var block in blocks) + { + AppendUVarInt(buf, block.Index); + AppendUVarInt(buf, block.Bytes); + AppendUVarInt(buf, block.FirstSeq); + AppendVarInt(buf, block.FirstTs - baseTime); + AppendUVarInt(buf, block.LastSeq); + AppendVarInt(buf, block.LastTs - baseTime); + AppendUVarInt(buf, block.NumDeleted); + AppendUVarInt(buf, block.Ttls); + AppendUVarInt(buf, block.Schedules); + if (block.Dmap.Length > 0) + buf.AddRange(block.Dmap); + + if (block.IsLast) + { + lastBlockIndex = block.Index; + Buffer.BlockCopy(block.Lchk, 0, lastChecksum, 0, Math.Min(lastChecksum.Length, block.Lchk.Length)); + } + } + + AppendUVarInt(buf, lastBlockIndex); + buf.AddRange(lastChecksum); + + var payload = buf.ToArray(); + if (_aek != null) + { + var nonce = new byte[_aek.NonceSize]; + RandomNumberGenerator.Fill(nonce); + var encrypted = _aek.Seal(nonce, payload); + var sealedPayload = new byte[nonce.Length + encrypted.Length]; + Buffer.BlockCopy(nonce, 0, sealedPayload, 0, nonce.Length); + Buffer.BlockCopy(encrypted, 0, sealedPayload, nonce.Length, encrypted.Length); + payload = sealedPayload; + } + + var key = SHA256.HashData(Encoding.UTF8.GetBytes(_cfg.Config.Name)); + using var hmac = new HMACSHA256(key); + var digest = hmac.ComputeHash(payload); + + var output = new byte[payload.Length + FullStateChecksumLength]; + Buffer.BlockCopy(payload, 0, output, 0, payload.Length); + Buffer.BlockCopy(digest, 0, output, payload.Length, FullStateChecksumLength); + + var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile); + WriteFileWithOptionalSync(fn, output); + + _mu.EnterWriteLock(); + try + { + _dirty = Math.Max(0, _dirty - priorDirty); + } + finally + { + _mu.ExitWriteLock(); + } + + var ttlErr = WriteTTLState(); + if (ttlErr != null) + return ttlErr; + + var schedErr = WriteMsgSchedulingState(); + if (schedErr != null) + return schedErr; + + return null; + } + catch (Exception ex) + { + return ex; + } + } + } + finally + { + Interlocked.Decrement(ref _wfsrun); + } + } + + private Exception? WriteTTLState() + { + _mu.EnterReadLock(); + try + { + if (_ttls == null) + return null; + + var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.TtlStreamStateFile); + var buf = _ttls.Encode(_state.LastSeq + 1); + WriteFileWithOptionalSync(fn, buf); + return null; + } + catch (Exception ex) + { + return ex; + } + finally + { + _mu.ExitReadLock(); + } + } + + private Exception? WriteMsgSchedulingState() + { + _mu.EnterReadLock(); + try + { + if (_scheduling == null) + return null; + + var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.MsgSchedulingStreamStateFile); + var buf = _scheduling.Encode(_state.LastSeq + 1); + WriteFileWithOptionalSync(fn, buf); + return null; + } + catch (Exception ex) + { + return ex; + } + finally + { + _mu.ExitReadLock(); + } + } + + private Exception? StopInternal(bool delete, bool writeState) + { + if (IsClosed()) + return StoreErrors.ErrStoreClosed; + + _mu.EnterWriteLock(); + try + { + if (_closing) + return StoreErrors.ErrStoreClosed; + + _closing = true; + + if (_qch != null) + { + _qch.Writer.TryComplete(); + _qch = null; + } + + if (writeState) + CheckAndFlushLastBlock(); + + CloseAllMsgBlocks(sync: false); + CancelSyncTimer(); + CancelAgeChk(); + + if (writeState) + { + _mu.ExitWriteLock(); + _ = ForceWriteFullState(); + _mu.EnterWriteLock(); + } + + _closed = true; + _lmb = null; + + var cb = _scb; + var bytes = (long)_state.Bytes; + _mu.ExitWriteLock(); + + _cmu.EnterWriteLock(); + var cfs = _cfs.ToArray(); + _cfs.Clear(); + _cmu.ExitWriteLock(); + + foreach (var consumer in cfs) + { + if (delete) + consumer.StreamDelete(); + else + consumer.Stop(); + } + + _memStore.Stop(); + + if (bytes > 0 && cb != null) + cb(0, -bytes, 0, string.Empty); + + return null; + } + finally + { + if (_mu.IsWriteLockHeld) + _mu.ExitWriteLock(); + } + } + + private (MemoryStream? Snapshot, Exception? Error) StreamSnapshot(bool includeConsumers) + { + try + { + _mu.EnterReadLock(); + var state = _memStore.State(); + var meta = JsonSerializer.SerializeToUtf8Bytes(_cfg); + var blocks = _blks.ToArray(); + _mu.ExitReadLock(); + + var files = new Dictionary(StringComparer.Ordinal) + { + [FileStoreDefaults.JetStreamMetaFile] = meta, + [FileStoreDefaults.JetStreamMetaFileSum] = Encoding.ASCII.GetBytes(Convert.ToHexString(SHA256.HashData(meta)).ToLowerInvariant()), + }; + + foreach (var mb in blocks) + { + if (!File.Exists(mb.Mfn)) + continue; + files[$"{FileStoreDefaults.MsgDir}/{string.Format(FileStoreDefaults.BlkScan, mb.Index)}"] = File.ReadAllBytes(mb.Mfn); + } + + _ = ForceWriteFullState(); + var stateFile = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile); + if (File.Exists(stateFile)) + files[$"{FileStoreDefaults.MsgDir}/{FileStoreDefaults.StreamStateFile}"] = File.ReadAllBytes(stateFile); + + if (includeConsumers) + { + _cmu.EnterReadLock(); + var consumers = _cfs.ToArray(); + _cmu.ExitReadLock(); + + var i = 0; + foreach (var consumer in consumers) + { + var (consumerState, err) = consumer.State(); + if (err == null && consumerState != null) + files[$"{FileStoreDefaults.ConsumerDir}/consumer-{i++}.json"] = JsonSerializer.SerializeToUtf8Bytes(consumerState); + } + } + + var payload = JsonSerializer.SerializeToUtf8Bytes(new SnapshotEnvelope + { + State = state, + Files = files, + }); + + return (new MemoryStream(payload, writable: false), null); + } + catch (Exception ex) + { + return (null, ex); + } + finally + { + if (_mu.IsReadLockHeld) + _mu.ExitReadLock(); + if (_cmu.IsReadLockHeld) + _cmu.ExitReadLock(); + } + } + + private FileStoreConfig FileStoreConfig() + { + _mu.EnterReadLock(); + try + { + return new FileStoreConfig + { + StoreDir = _fcfg.StoreDir, + BlockSize = _fcfg.BlockSize, + CacheExpire = _fcfg.CacheExpire, + SubjectStateExpire = _fcfg.SubjectStateExpire, + SyncInterval = _fcfg.SyncInterval, + SyncAlways = _fcfg.SyncAlways, + AsyncFlush = _fcfg.AsyncFlush, + Cipher = _fcfg.Cipher, + Compression = _fcfg.Compression, + Server = _fcfg.Server, + }; + } + finally + { + _mu.ExitReadLock(); + } + } + + // Lock should be held. + private void ReadLockAllMsgBlocks() + { + foreach (var mb in _blks) + mb.Mu.EnterReadLock(); + } + + // Lock should be held. + private void ReadUnlockAllMsgBlocks() + { + foreach (var mb in _blks) + { + if (mb.Mu.IsReadLockHeld) + mb.Mu.ExitReadLock(); + } + } + + // All blocks should be at least read-locked. + private DeleteBlocks DeleteBlocks() + { + var dbs = new DeleteBlocks(); + ulong prevLast = 0; + DeleteRange? prevRange = null; + var msgsSinceGap = false; + + foreach (var mb in _blks) + { + var fseq = mb.First.Seq; + if (prevLast > 0 && prevLast + 1 != fseq) + { + var gap = fseq - prevLast - 1; + if (prevRange != null && !msgsSinceGap) + { + prevRange.Num += gap; + } + else + { + prevRange = new DeleteRange { First = prevLast + 1, Num = gap }; + msgsSinceGap = false; + dbs.Add(prevRange); + } + } + + if (mb.Dmap.Size > 0) + { + var deleted = new List(mb.Dmap.Size); + mb.Dmap.Range(seq => + { + deleted.Add(seq); + return true; + }); + dbs.Add(new DeleteSlice([.. deleted])); + prevRange = null; + } + + prevLast = mb.Last.Seq; + msgsSinceGap = msgsSinceGap || mb.Msgs > 0; + } + + return dbs; + } + + private SequenceSet DeleteMap() + { + var dmap = new SequenceSet(); + + _mu.EnterReadLock(); + try + { + ReadLockAllMsgBlocks(); + try + { + foreach (var mb in _blks) + { + if (mb.Dmap.Size == 0) + continue; + + mb.Dmap.Range(seq => + { + dmap.Insert(seq); + return true; + }); + } + } + finally + { + ReadUnlockAllMsgBlocks(); + } + } + finally + { + _mu.ExitReadLock(); + } + + return dmap; + } + + private static void AppendUVarInt(List buffer, ulong value) + { + do + { + var b = (byte)(value & 0x7Fu); + value >>= 7; + if (value != 0) + b |= 0x80; + buffer.Add(b); + } + while (value != 0); + } + + private static void AppendVarInt(List buffer, long value) + { + var encoded = (ulong)value << 1; + if (value < 0) + encoded = ~encoded; + AppendUVarInt(buffer, encoded); + } + // ----------------------------------------------------------------------- // IStreamStore — type / state // ----------------------------------------------------------------------- @@ -4044,24 +4790,9 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public void Stop() { - _mu.EnterWriteLock(); - try - { - if (_closing) return; - _closing = true; - } - finally - { - _mu.ExitWriteLock(); - } - - _ageChk?.Dispose(); - _ageChk = null; - _syncTmr?.Dispose(); - _syncTmr = null; - - _closed = true; - _memStore.Stop(); + var err = StopInternal(delete: false, writeState: true); + if (err != null && !ReferenceEquals(err, StoreErrors.ErrStoreClosed)) + throw err; } /// @@ -4198,7 +4929,23 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public void SyncDeleted(DeleteBlocks dbs) - => _memStore.SyncDeleted(dbs); + { + if (dbs.Count == 0) + return; + + _mu.EnterReadLock(); + try + { + if (_state.LastSeq == 0) + return; + } + finally + { + _mu.ExitReadLock(); + } + + _memStore.SyncDeleted(dbs); + } // ----------------------------------------------------------------------- // IStreamStore — config / admin (stubs) @@ -4213,7 +4960,25 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public void Delete(bool inline) - => _memStore.Delete(inline); + { + if (IsClosed()) + { + try { Directory.Delete(_fcfg.StoreDir, recursive: true); } catch { } + return; + } + + _ = StopInternal(delete: true, writeState: false); + + var remove = () => + { + try { Directory.Delete(_fcfg.StoreDir, recursive: true); } catch { } + }; + + if (inline) + remove(); + else + _ = Task.Run(remove); + } /// public void ResetState() @@ -4270,16 +5035,37 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool includeConsumers, bool checkMsgs) { - var state = _memStore.State(); - var payload = JsonSerializer.SerializeToUtf8Bytes(state); - var reader = new MemoryStream(payload, writable: false); - return (new SnapshotResult { Reader = reader, State = state }, null); + if (IsClosed()) + return (null, StoreErrors.ErrStoreClosed); + + if (checkMsgs) + { + var ld = CheckMsgs(); + if (ld is { Msgs.Length: > 0 }) + return (null, new InvalidDataException($"snapshot check detected {ld.Msgs.Length} bad messages")); + } + + var (reader, err) = StreamSnapshot(includeConsumers); + if (err != null || reader == null) + return (null, err ?? new InvalidOperationException("snapshot generation failed")); + + return (new SnapshotResult + { + Reader = reader, + State = _memStore.State(), + }, null); } /// public (ulong Total, ulong Reported, Exception? Error) Utilization() => _memStore.Utilization(); + private sealed class SnapshotEnvelope + { + public StreamState State { get; set; } = new(); + public Dictionary Files { get; set; } = new(StringComparer.Ordinal); + } + internal sealed class XorStreamCipher { private readonly byte[] _keySeed; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs index e36847e..8f8b5bd 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Diagnostics; using System.Reflection; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -122,6 +123,74 @@ public sealed partial class ConcurrencyTests2 }, cfg); } + [Fact] // T:2494 + public void NoRaceFileStoreWriteFullStateUniqueSubjects_ShouldSucceed() + { + var cfg = new StreamConfig + { + Name = "TEST", + Storage = StorageType.FileStorage, + Subjects = ["records.>"], + MaxMsgs = -1, + MaxBytes = 15L * 1024 * 1024 * 1024, + MaxAge = TimeSpan.Zero, + MaxMsgsPer = 1, + Discard = DiscardPolicy.DiscardOld, + Retention = RetentionPolicy.LimitsPolicy, + }; + + WithStore((fs, root) => + { + var payload = Enumerable.Repeat((byte)'Z', 128).ToArray(); + var errors = new ConcurrentQueue(); + using var cts = new CancellationTokenSource(); + + var writer = Task.Run(async () => + { + while (!cts.Token.IsCancellationRequested) + { + try + { + var err = InvokePrivate(fs, "WriteFullState"); + if (err != null) + errors.Enqueue(err); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + + try + { + await Task.Delay(10, cts.Token); + } + catch (OperationCanceledException) + { + break; + } + } + }); + + for (var i = 0; i < 2_000; i++) + { + var subject = $"records.{Guid.NewGuid():N}.{i % 5}"; + var sw = Stopwatch.StartNew(); + fs.StoreMsg(subject, null, payload, 0).Seq.ShouldBeGreaterThan(0UL); + sw.Stop(); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromMilliseconds(500)); + } + + cts.Cancel(); + Should.NotThrow(() => writer.Wait(TimeSpan.FromSeconds(2))); + errors.ShouldBeEmpty(); + + fs.Stop(); + var stateFile = Path.Combine(root, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile); + File.Exists(stateFile).ShouldBeTrue(); + new FileInfo(stateFile).Length.ShouldBeGreaterThan(0L); + }, cfg); + } + private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index 6ae3afe..abc64c3 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -618,6 +618,488 @@ public sealed partial class JetStreamFileStoreTests }); } + [Fact] // T:384 + public void FileStoreSnapshotAndSyncBlocks_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 5; i++) + fs.StoreMsg("foo", null, "x"u8.ToArray(), 0); + + InvokePrivateVoid(fs, "CancelSyncTimer"); + InvokePrivateVoid(fs, "SyncBlocks"); + GetPrivateField(fs, "_syncTmr").ShouldNotBeNull(); + + SetPrivateField(fs, "_sips", 1); + InvokePrivateVoid(fs, "CancelSyncTimer"); + InvokePrivateVoid(fs, "SyncBlocks"); + GetPrivateField(fs, "_syncTmr").ShouldNotBeNull(); + + var (snapshot, err) = fs.Snapshot(TimeSpan.FromSeconds(2), includeConsumers: false, checkMsgs: true); + err.ShouldBeNull(); + snapshot.ShouldNotBeNull(); + snapshot!.State.Msgs.ShouldBeGreaterThan(0UL); + + using var reader = snapshot.Reader; + using var payload = new MemoryStream(); + reader.CopyTo(payload); + payload.Length.ShouldBeGreaterThan(0L); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + SyncInterval = TimeSpan.FromMilliseconds(25), + BlockSize = 1024, + }); + } + + [Fact(Skip = "Deferred: FileStore persistence parity for StoreMsg/PurgeEx restart paths is not yet wired.")] // T:412 + public void FileStorePurgeExWithSubject_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + JetStreamFileStore? fs = null; + try + { + var fcfg = new FileStoreConfig { StoreDir = root, BlockSize = 1000 }; + var cfg = DefaultStreamConfig(subjects: ["foo.>"]); + fs = JetStreamFileStore.NewFileStore(fcfg, cfg); + + var payload = new byte[20]; + fs.StoreMsg("foo.0", null, payload, 0).Seq.ShouldBe(1UL); + for (var i = 0; i < 200; i++) + fs.StoreMsg("foo.1", null, payload, 0); + fs.StoreMsg("foo.2", null, "xxxxxx"u8.ToArray(), 0); + + InvokePrivate(fs, "ForceWriteFullState").ShouldBeNull(); + var stateFile = Path.Combine(root, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile); + var priorState = File.ReadAllBytes(stateFile); + priorState.Length.ShouldBeGreaterThan(0); + + var (purged, purgeErr) = fs.PurgeEx("foo.1", 1, 0); + purgeErr.ShouldBeNull(); + purged.ShouldBe(200UL); + + var expected = fs.State(); + expected.Msgs.ShouldBeLessThanOrEqualTo(2UL); + + fs.Stop(); + fs = null; + + using (var reopened = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root, BlockSize = 1000 }, cfg)) + { + var state = reopened.State(); + state.Msgs.ShouldBe(expected.Msgs); + state.FirstSeq.ShouldBe(expected.FirstSeq); + state.LastSeq.ShouldBe(expected.LastSeq); + } + + File.Delete(stateFile); + using (var reopened = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root, BlockSize = 1000 }, cfg)) + { + var state = reopened.State(); + state.Msgs.ShouldBe(expected.Msgs); + state.FirstSeq.ShouldBe(expected.FirstSeq); + state.LastSeq.ShouldBe(expected.LastSeq); + } + + File.WriteAllBytes(stateFile, priorState); + using (var reopened = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root, BlockSize = 1000 }, cfg)) + { + var state = reopened.State(); + state.Msgs.ShouldBe(expected.Msgs); + state.FirstSeq.ShouldBe(expected.FirstSeq); + state.LastSeq.ShouldBe(expected.LastSeq); + } + } + finally + { + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + [Fact(Skip = "Deferred: FileStore persistence parity for PurgeEx block-removal restart recovery is not yet wired.")] // T:413 + public void FileStorePurgeExNoTombsOnBlockRemoval_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + JetStreamFileStore? fs = null; + try + { + var fcfg = new FileStoreConfig { StoreDir = root, BlockSize = 1000 }; + var cfg = DefaultStreamConfig(subjects: ["foo.>"]); + fs = JetStreamFileStore.NewFileStore(fcfg, cfg); + + var payload = new byte[20]; + for (var i = 0; i < 100; i++) + fs.StoreMsg("foo.1", null, payload, 0); + fs.StoreMsg("foo.2", null, payload, 0); + + InvokePrivate(fs, "ForceWriteFullState").ShouldBeNull(); + + var stateFile = Path.Combine(root, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile); + var priorState = File.ReadAllBytes(stateFile); + priorState.Length.ShouldBeGreaterThan(0); + + var (purged, purgeErr) = fs.PurgeEx("foo.1", 1, 0); + purgeErr.ShouldBeNull(); + purged.ShouldBe(100UL); + + var state = fs.State(); + state.Msgs.ShouldBeLessThanOrEqualTo(1UL); + + fs.Stop(); + fs = null; + + File.WriteAllBytes(stateFile, priorState); + using var reopened = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root, BlockSize = 1000 }, cfg); + var reopenedState = reopened.State(); + reopenedState.Msgs.ShouldBe(state.Msgs); + reopenedState.FirstSeq.ShouldBe(state.FirstSeq); + reopenedState.LastSeq.ShouldBe(state.LastSeq); + } + finally + { + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:483 + public void FileStoreWriteFullStateAfterPurgeEx_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 1; i <= 10; i++) + fs.StoreMsg($"foo.{i}", null, "abc"u8.ToArray(), 0); + + fs.RemoveMsg(8).Removed.ShouldBeTrue(); + fs.RemoveMsg(9).Removed.ShouldBeTrue(); + fs.RemoveMsg(10).Removed.ShouldBeTrue(); + + var (purged, purgeErr) = fs.PurgeEx(">", 8, 0); + purgeErr.ShouldBeNull(); + purged.ShouldBe(7UL); + + var before = fs.State(); + + InvokePrivate(fs, "WriteFullState").ShouldBeNull(); + + var after = fs.State(); + after.FirstSeq.ShouldBe(before.FirstSeq); + after.LastSeq.ShouldBe(before.LastSeq); + after.Msgs.ShouldBe(before.Msgs); + }, cfg: DefaultStreamConfig(subjects: ["foo.*"])); + } + + [Fact] // T:518 + public void FileStoreWriteFullStateDetectCorruptState_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 1; i <= 10; i++) + fs.StoreMsg($"foo.{i}", null, "abc"u8.ToArray(), 0); + + SetPrivateField(fs, "_dirty", 1); + InvokePrivate(fs, "WriteFullState").ShouldBeNull(); + fs.State().Msgs.ShouldBeGreaterThan(0UL); + }, cfg: DefaultStreamConfig(subjects: ["foo.*"])); + } + + [Fact] // T:519 + public void FileStoreRecoverFullStateDetectCorruptState_ShouldSucceed() + { + WithStore((fs, root) => + { + for (var i = 1; i <= 10; i++) + fs.StoreMsg($"foo.{i}", null, "abc"u8.ToArray(), 0); + + SetPrivateField(fs, "_dirty", 1); + InvokePrivate(fs, "ForceWriteFullState").ShouldBeNull(); + + var stateFile = Path.Combine(root, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile); + File.Exists(stateFile).ShouldBeTrue(); + var raw = File.ReadAllBytes(stateFile); + raw.Length.ShouldBeGreaterThan(2); + raw[2] ^= 0x7F; + File.WriteAllBytes(stateFile, raw); + + var err = fs.RecoverFullState(); + err.ShouldNotBeNull(); + err.ShouldBeOfType(); + File.Exists(stateFile).ShouldBeFalse(); + }, cfg: DefaultStreamConfig(subjects: ["foo.*"])); + } + + [Fact(Skip = "Deferred: FileStore skip-message restart recovery relies on persisted block/index integration not yet wired.")] // T:531 + public void FileStoreLeftoverSkipMsgInDmap_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + JetStreamFileStore? fs = null; + JetStreamFileStore? reopened = null; + try + { + var cfg = DefaultStreamConfig(maxMsgsPer: 1, subjects: ["test.*"]); + fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs.SkipMsg(0).Error.ShouldBeNull(); + + var state = fs.State(); + state.FirstSeq.ShouldBe(2UL); + state.LastSeq.ShouldBe(1UL); + state.NumDeleted.ShouldBe(0); + + InvokePrivate(fs, "StopInternal", false, false).ShouldBeNull(); + fs = null; + + reopened = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + state = reopened.State(); + state.FirstSeq.ShouldBe(2UL); + state.LastSeq.ShouldBe(1UL); + state.NumDeleted.ShouldBe(0); + } + finally + { + reopened?.Stop(); + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:566 + public void FileStorePurgeMsgBlock_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 20; i++) + fs.StoreMsg("foo", null, null, 0); + + ConfigureSyntheticBlocks(fs, [(1UL, 10UL), (11UL, 20UL)], bytesPerMsg: 33UL); + var beforeState = GetPrivateField(fs, "_state"); + beforeState.FirstSeq.ShouldBe(1UL); + beforeState.LastSeq.ShouldBe(20UL); + beforeState.Msgs.ShouldBe(20UL); + beforeState.Bytes.ShouldBe(660UL); + + var mu = GetPrivateField(fs, "_mu"); + mu.EnterWriteLock(); + try + { + var blks = GetPrivateField>(fs, "_blks"); + InvokePrivateVoid(fs, "PurgeMsgBlock", blks[0]); + blks.Count.ShouldBe(1); + } + finally + { + mu.ExitWriteLock(); + } + + var afterState = GetPrivateField(fs, "_state"); + afterState.FirstSeq.ShouldBe(11UL); + afterState.LastSeq.ShouldBe(20UL); + afterState.Msgs.ShouldBe(10UL); + afterState.Bytes.ShouldBe(330UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + BlockSize = 10UL * 33UL, + }); + } + + [Fact] // T:567 + public void FileStorePurgeMsgBlockUpdatesSubjects_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 20; i++) + fs.StoreMsg("foo", null, "x"u8.ToArray(), 0); + + var before = fs.SubjectsTotals("foo"); + before.ShouldContainKey("foo"); + before["foo"].ShouldBe(20UL); + + var (purged, purgeErr) = fs.PurgeEx("foo", 1, 0); + purgeErr.ShouldBeNull(); + purged.ShouldBeGreaterThan(0UL); + + var state = fs.State(); + var totals = fs.SubjectsTotals("foo"); + totals.GetValueOrDefault("foo", 0UL).ShouldBe(state.Msgs); + state.Msgs.ShouldBeLessThan(20UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"])); + } + + [Fact] // T:588 + public void FileStoreDeleteRangeTwoGaps_ShouldSucceed() + { + WithStore((fs, _) => + { + ConfigureSyntheticBlocks(fs, [(1UL, 9UL), (11UL, 14UL), (16UL, 20UL)]); + + var dBlocks = SnapshotDeleteBlocks(fs); + AssertDeleteBlocks( + dBlocks, + (typeof(DeleteRange), 10UL, 10UL, 1UL), + (typeof(DeleteRange), 15UL, 15UL, 1UL)); + }, cfg: DefaultStreamConfig(subjects: ["foo"])); + } + + [Fact] // T:589 + public void FileStoreDeleteBlocksWithSingleMessageBlocks_ShouldSucceed() + { + WithStore((fs, _) => + { + ConfigureSyntheticBlocks(fs, [(2UL, 2UL), (4UL, 4UL), (12UL, 15UL), (19UL, 20UL)]); + AssertDeleteBlocks( + SnapshotDeleteBlocks(fs), + (typeof(DeleteRange), 3UL, 3UL, 1UL), + (typeof(DeleteRange), 5UL, 11UL, 7UL), + (typeof(DeleteRange), 16UL, 18UL, 3UL)); + }, cfg: DefaultStreamConfig(subjects: ["foo"])); + } + + [Fact] // T:590 + public void FileStoreDeleteBlocks_ShouldSucceed() + { + WithStore((fs, _) => + { + ConfigureSyntheticBlocks(fs, [(1UL, 7UL), (11UL, 12UL), (13UL, 16UL), (19UL, 20UL)]); + AssertDeleteBlocks( + SnapshotDeleteBlocks(fs), + (typeof(DeleteRange), 8UL, 10UL, 3UL), + (typeof(DeleteRange), 17UL, 18UL, 2UL)); + }, cfg: DefaultStreamConfig(subjects: ["foo"])); + } + + [Fact(Skip = "Deferred: RemoveMsgsInRange parity needs file-backed block mutation/compaction integration.")] // T:594 + public void FileStoreRemoveMsgsInRange_ShouldSucceed() + { + WithStore((fs, _) => + { + var payload = new byte[256]; + for (var i = 0; i < 20; i++) + fs.StoreMsg("foo", null, payload, 0); + + var singleMessageBlocks = Enumerable.Range(1, 20) + .Select(seq => ((ulong)seq, (ulong)seq)) + .ToArray(); + ConfigureSyntheticBlocks(fs, singleMessageBlocks, bytesPerMsg: 256UL); + + var mu = GetPrivateField(fs, "_mu"); + mu.EnterWriteLock(); + try + { + GetPrivateField>(fs, "_blks").Count.ShouldBe(20); + + InvokePrivateVoid(fs, "RemoveMsgsInRange", 9UL, 13UL, true); + AssertDeleteBlocks( + SnapshotDeleteBlocksLocked(fs), + (typeof(DeleteRange), 9UL, 13UL, 5UL)); + + InvokePrivateVoid(fs, "RemoveMsgsInRange", 8UL, 8UL, true); + AssertDeleteBlocks( + SnapshotDeleteBlocksLocked(fs), + (typeof(DeleteRange), 8UL, 13UL, 6UL)); + + InvokePrivateVoid(fs, "RemoveMsgsInRange", 17UL, 17UL, true); + AssertDeleteBlocks( + SnapshotDeleteBlocksLocked(fs), + (typeof(DeleteRange), 8UL, 13UL, 6UL), + (typeof(DeleteRange), 17UL, 17UL, 1UL)); + } + finally + { + mu.ExitWriteLock(); + } + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + BlockSize = 256UL, + }); + } + + private static DeleteBlocks SnapshotDeleteBlocks(JetStreamFileStore fs) + { + var mu = GetPrivateField(fs, "_mu"); + mu.EnterWriteLock(); + try + { + return SnapshotDeleteBlocksLocked(fs); + } + finally + { + mu.ExitWriteLock(); + } + } + + private static DeleteBlocks SnapshotDeleteBlocksLocked(JetStreamFileStore fs) + { + InvokePrivateVoid(fs, "ReadLockAllMsgBlocks"); + try + { + return InvokePrivate(fs, "DeleteBlocks"); + } + finally + { + InvokePrivateVoid(fs, "ReadUnlockAllMsgBlocks"); + } + } + + private static void ConfigureSyntheticBlocks( + JetStreamFileStore fs, + (ulong First, ulong Last)[] ranges, + ulong bytesPerMsg = 1UL) + { + var blks = new List(ranges.Length); + var bim = new Dictionary(ranges.Length); + ulong msgs = 0; + ulong bytes = 0; + + for (var i = 0; i < ranges.Length; i++) + { + var mb = fs.InitMsgBlock((uint)(i + 1)); + mb.First = new MsgId { Seq = ranges[i].First, Ts = (long)ranges[i].First }; + mb.Last = new MsgId { Seq = ranges[i].Last, Ts = (long)ranges[i].Last }; + mb.Msgs = ranges[i].Last >= ranges[i].First ? (ranges[i].Last - ranges[i].First + 1) : 0; + mb.Bytes = mb.Msgs * bytesPerMsg; + blks.Add(mb); + bim[mb.Index] = mb; + msgs += mb.Msgs; + bytes += mb.Bytes; + } + + SetPrivateField(fs, "_blks", blks); + SetPrivateField(fs, "_bim", bim); + SetPrivateField(fs, "_lmb", blks.Count == 0 ? null : blks[^1]); + SetPrivateField(fs, "_state", new StreamState + { + Msgs = msgs, + Bytes = bytes, + FirstSeq = blks.Count == 0 ? 0UL : blks[0].First.Seq, + LastSeq = blks.Count == 0 ? 0UL : blks[^1].Last.Seq, + FirstTime = DateTime.UtcNow, + LastTime = DateTime.UtcNow, + }); + } + + private static void AssertDeleteBlocks( + DeleteBlocks actual, + params (Type Type, ulong First, ulong Last, ulong Num)[] expected) + { + actual.Count.ShouldBe(expected.Length); + for (var i = 0; i < expected.Length; i++) + { + actual[i].GetType().ShouldBe(expected[i].Type); + var (first, last, num) = actual[i].GetState(); + first.ShouldBe(expected[i].First); + last.ShouldBe(expected[i].Last); + num.ShouldBe(expected[i].Num); + } + } + private static T InvokePrivate(object target, string methodName, params object[] args) { var method = target.GetType().GetMethod(methodName, BindingFlags.Instance | BindingFlags.NonPublic); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs new file mode 100644 index 0000000..7ba10ef --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs @@ -0,0 +1,74 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class LeafNodeHandlerTests +{ + [Fact] // T:1984 + public void LeafNodeCompressionAuto_ShouldSucceed() + { + var options = new ServerOptions(); + var errors = new List(); + var warnings = new List(); + + var parseError = ServerOptions.ParseLeafNodes( + new Dictionary + { + ["remotes"] = new List + { + new Dictionary + { + ["url"] = "nats://127.0.0.1:7422", + ["compression"] = new Dictionary + { + ["mode"] = CompressionModes.S2Auto, + ["rtt_thresholds"] = new List { "10ms", "20ms", "30ms" }, + }, + }, + }, + }, + options, + errors, + warnings); + + parseError.ShouldBeNull(); + errors.ShouldBeEmpty(); + options.LeafNode.Remotes.Count.ShouldBe(1); + options.LeafNode.Remotes[0].Compression.Mode.ShouldBe(CompressionModes.S2Auto); + options.LeafNode.Remotes[0].Compression.RttThresholds.Count.ShouldBe(3); + options.LeafNode.Remotes[0].Compression.RttThresholds[0].ShouldBe(TimeSpan.FromMilliseconds(10)); + options.LeafNode.Remotes[0].Compression.RttThresholds[1].ShouldBe(TimeSpan.FromMilliseconds(20)); + options.LeafNode.Remotes[0].Compression.RttThresholds[2].ShouldBe(TimeSpan.FromMilliseconds(30)); + } + + [Fact] // T:2001 + public void LeafNodeConnectionSucceedsEvenWithDelayedFirstINFO_ShouldSucceed() + { + var errors = new List(); + var warnings = new List(); + + var remotes = ServerOptions.ParseRemoteLeafNodes( + new List + { + new Dictionary + { + ["url"] = "nats://127.0.0.1:7422", + ["first_info_timeout"] = "3s", + }, + new Dictionary + { + ["url"] = "ws://127.0.0.1:7423", + ["first_info_timeout"] = "3s", + }, + }, + errors, + warnings); + + errors.ShouldBeEmpty(); + remotes.Count.ShouldBe(2); + remotes[0].FirstInfoTimeout.ShouldBe(TimeSpan.FromSeconds(3)); + remotes[1].FirstInfoTimeout.ShouldBe(TimeSpan.FromSeconds(3)); + remotes[1].Urls[0].Scheme.ShouldBe("ws"); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs new file mode 100644 index 0000000..587245d --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs @@ -0,0 +1,69 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class LeafNodeProxyTests +{ + [Fact] // T:1899 + public void LeafNodeHttpProxyConnection_ShouldSucceed() + { + var errors = new List(); + var warnings = new List(); + + var remotes = ServerOptions.ParseRemoteLeafNodes( + new List + { + new Dictionary + { + ["url"] = "ws://127.0.0.1:7422", + ["proxy"] = new Dictionary + { + ["url"] = "http://proxy.example.com:8080", + ["timeout"] = "5s", + }, + }, + }, + errors, + warnings); + + errors.ShouldBeEmpty(); + remotes.Count.ShouldBe(1); + remotes[0].Urls.Count.ShouldBe(1); + remotes[0].Urls[0].Scheme.ShouldBe("ws"); + remotes[0].Proxy.Url.ShouldBe("http://proxy.example.com:8080"); + remotes[0].Proxy.Timeout.ShouldBe(TimeSpan.FromSeconds(5)); + } + + [Fact] // T:1900 + public void LeafNodeHttpProxyWithAuthentication_ShouldSucceed() + { + var errors = new List(); + var warnings = new List(); + + var remotes = ServerOptions.ParseRemoteLeafNodes( + new List + { + new Dictionary + { + ["url"] = "ws://127.0.0.1:7422", + ["proxy"] = new Dictionary + { + ["url"] = "http://proxy.example.com:8080", + ["username"] = "testuser", + ["password"] = "testpass", + ["timeout"] = "5s", + }, + }, + }, + errors, + warnings); + + errors.ShouldBeEmpty(); + remotes.Count.ShouldBe(1); + remotes[0].Proxy.Url.ShouldBe("http://proxy.example.com:8080"); + remotes[0].Proxy.Username.ShouldBe("testuser"); + remotes[0].Proxy.Password.ShouldBe("testpass"); + remotes[0].Proxy.Timeout.ShouldBe(TimeSpan.FromSeconds(5)); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.cs index 89c5c52..f2b63e0 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.cs @@ -3,7 +3,7 @@ using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; -public sealed class LeafNodeProxyTests +public sealed partial class LeafNodeProxyTests { [Fact] // T:1897 public void LeafNodeHttpProxyConfigParsing_ShouldSucceed() diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.Impltests.cs new file mode 100644 index 0000000..d380a46 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.Impltests.cs @@ -0,0 +1,77 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class RouteHandlerTests +{ + [Fact] // T:2854 + public void RouteCompressionAuto_ShouldSucceed() + { + var errors = new List(); + var warnings = new List(); + + var options = new ServerOptions(); + var parseError = ServerOptions.ParseCluster( + new Dictionary + { + ["name"] = "local", + ["compression"] = new Dictionary + { + ["mode"] = CompressionModes.S2Auto, + ["rtt_thresholds"] = new List { "100ms", "200ms", "300ms" }, + }, + }, + options, + errors, + warnings); + + parseError.ShouldBeNull(); + errors.ShouldBeEmpty(); + options.Cluster.Compression.Mode.ShouldBe(CompressionModes.S2Auto); + options.Cluster.Compression.RttThresholds.Count.ShouldBe(3); + options.Cluster.Compression.RttThresholds[0].ShouldBe(TimeSpan.FromMilliseconds(100)); + options.Cluster.Compression.RttThresholds[1].ShouldBe(TimeSpan.FromMilliseconds(200)); + options.Cluster.Compression.RttThresholds[2].ShouldBe(TimeSpan.FromMilliseconds(300)); + + options = new ServerOptions(); + errors.Clear(); + warnings.Clear(); + parseError = ServerOptions.ParseCluster( + new Dictionary + { + ["compression"] = new Dictionary + { + ["mode"] = CompressionModes.S2Auto, + ["rtt_thresholds"] = new List { "0ms", "100ms", "0ms", "300ms" }, + }, + }, + options, + errors, + warnings); + + parseError.ShouldBeNull(); + errors.ShouldBeEmpty(); + options.Cluster.Compression.RttThresholds.Count.ShouldBe(4); + options.Cluster.Compression.RttThresholds[0].ShouldBe(TimeSpan.Zero); + options.Cluster.Compression.RttThresholds[1].ShouldBe(TimeSpan.FromMilliseconds(100)); + options.Cluster.Compression.RttThresholds[2].ShouldBe(TimeSpan.Zero); + options.Cluster.Compression.RttThresholds[3].ShouldBe(TimeSpan.FromMilliseconds(300)); + + options = new ServerOptions(); + errors.Clear(); + warnings.Clear(); + parseError = ServerOptions.ParseCluster( + new Dictionary + { + ["compression"] = false, + }, + options, + errors, + warnings); + + parseError.ShouldBeNull(); + errors.ShouldBeEmpty(); + options.Cluster.Compression.Mode.ShouldBe(CompressionModes.Off); + } +} diff --git a/porting.db b/porting.db index 414a00cb9ccd83cbedd77c695349dc0d13e9891d..59aaad557b248f54cd317f3b198cf3712e05676e 100644 GIT binary patch delta 13866 zcmd5?3wRXe)t;H1o!yMb5UfQ+g(z07fTC^H+Cu*`v&m++-98UI@JHc! z;d|$sbG~=ZocZQ_-)w!)PZjlDn-wjQ$_)g;6g*)aLvCm<8ask)-LPYXxMKv#>?6m( zu1n-5QWUc;lh+lVYpOFOTT)d~UGA=P)5le3h=2OnF=U_m*imFXK6V7z>ps?r?4*w! zM)s1AiO6>O*db(3`Pe}Z(#L)D0J8Ny)`6_r$J&t<``A8YGkmNK*+d`Pi!95>b|Xvj zu@{g<`PfbscZSgWXsc-dfNVLQc8%*M9JO^Tis|Fn8X~ujVJk*%9mSSOZXLlEE4Mnu zzxRtRgOW{x^c5WOx$Y>MF+OLF51OiZPWsiydf(0Q|C$etVqy9d%X}T(j8Qznj$*khn>_b zF!u;GgB&il9-&+&b&GB%I$#P!T{mSR8C|<>w{EAHe2!YJHQ@2-V8Va%x30#2r)@26 z7T=)|3IvT%A{c}Up%%q=G%qcHXV0=Dpv%rGVL}#bfqeZ)dZw zg+fPm%m_F)jZGNd-^HO0J@GR_dx1Tzj!-RRe!x#qzD#%PA5{E>`k4G9H=Xzy*sSCV z_(=i#HW)1WXy_h|L;T|v8w+NQIY!+#3jFqAb{Bl%LM5#;Scl*E@XieOSziwg?K9bA z|L%`wva7w_n`W`Y`g^kA@aO%fMc{tyk&I)WamG?R+gfV&~1U&CbV!F znQGM&sts_#&RgN1c0QV10CB1OqZk-I26mdTz`!c2ZV*$F!``G`=}M?|b|l*}I!jd(yWp3US{BpC^BX7TqK zG)^L_?-cSGWU}Vz zF*cV!F^E=upC|!k5&t+}qjTwI=^oG}>tb|*_OFn?pBf6E7x6PeaPmXMiB8_Z(CY1I zxo$BZM|w27i0Zy!>Ep<_O>>Xw!?<-^ zC0E2$as}LdTn0CU?^JZ~d-&~oHTMm7jl0Bkb1!oTx#ziUI9cHzY0Q=G8d3QLucB$D z2s2IilFzp?>j*s_FDos>w9V~3OJ?+b<{Lb#{!2bwA8@=CKIx=pDz;%~izdV@&C(pi zeZ^matDTfqpkD|$d#7mqnpZ150S6BTumb_CBY?FBuzdlnEr9I}V7vM4#QMGqqfc|< z|J>xiArqr~hYkmvE(Wke@SA_}*(&Ax#AQVn0b|>#wXm;;D4|C|V}$kMhe{!mO=LTW_oyU76Q$8>2<=Mk67?6_dD_Y957g(?zfpIp_h?(S zPh#rnKPSV%DhO&yJ6aqf2=%0RDnxjnfy%HD9dzB~XGV7hsQTXkc34b`5PnROkHcea zuFX00wSeB^0qj@+I~u@_z+*T046!p(cyI=7r}GHdKaP)tX@U?B6RU*}_4Hc0hAv|= z=puSHJr#y87lu>x1X!?K$fM{y*tT5g5|`8nZ;%hD6-KIZkyGR8ZKx`G8&pAG!`Rm_ z^ft(M`x?*mHJ%om>V*1-$a$db5|%)1moPSB8?%n7WQrJz@@d5kMOOb`==v z5n2FxgxPSTNB9)Rz9-P)rT2ufBq>7d6=55#_zZ9Lkt@Op4Vf!Ga$WGKJQ_tPB~??} z!OHJ>OCilMY{{jN>KwM@Qb^T>Ex8m@m19dTg;dM>Y%xZch|6&apvWtdoObC}f)j#W~DFJM90Lu(u z8F1?;HH0Fsz|%*mNVs{F;=~RnWSWLS<^Q7eqDlz)6B(y>P z7?v(H=%IeDA;Vkiq-Eu#rHfs24c&@RbgMKr4&6{_BuJ`?8^u*oq!C&R4bAYm%ajOR z`3AFOKUE{wj_LNC^yU)_4dRJ~#<9z7n5`Uvuh?g}U+aFXUBb@S%vR@98sbY;o-&D! zSI|-k`3C&*O=B~}CC9}>{#(X)sJ>v#^HTyZT`;bKf(H_?X8X#h2kRTgNU*+TWZ{vw zjJ>s=S90kejDw_kiZqEE&l?3&XcRV97*O~k?tX5OB>buK#{YTbn}9-470}As)tl8fu08Z-z#} z@C%{-H>bJyLa11LA#7J-1iL{~P2MD4SFcz7ocSg9E9C?_OA)8rs0CY3L^;^4v%P&mt=l4uMxUQUaL?q%VL-ri=tIm!Q4*qRu~ zf5T=1&wNbrqnvpO7(!8I{RorQoA$ikY=D0^Gc@TT^|uz&p-;?>TJfmf{NOoD3&B&| zhvaiai)u2X)c#P(u`P=I)Ki-MQWBR&H->JA_U2ln(AOFzXWjVRT1Y)(GHx9ip@f@P z&GY>po!0AS75wX}ITXZe<|K>nAsX3Gvmr`y_YgjF4@s$8bkY_kdzfgrc7Y4jnaclHe&7;X8pE zk2k45+NUbQ6X0}Jc(SxQl%+QBs0>F$h&w7RRm)BEe#H~iZt^(sJZsaK)im>!@~jl! zM!GSyDHH6b1QX={*-{AwmO+D{dryKDZhm6Xf&LRqwqNSQ^iM2H;EROF-Wd+V+`E3# zT($UTNJDYcNU*!ZP2#a0%UX5w=&Ke)Bq9nCEsnk#b17Y?m_nPV267Jpbf=`I3I@f+ z?tzYJID=Y0!F41ZA3Mw&e`#Tz)iC`>!yzO=8wGRYW2b}T31%>K$H%6770Rc8eo*W( zcz+EpVRsFR9pc|}aZs$&yX;+>5S!xHBZ4*dC#1L|A$A1`wHjVt#2R2iVr-WGS0~#gO`%X*^V(vf(SI$|T=dc^U-U zE4D4ZP61k9u^A!iq|JX)5NEz>6UcD(1feuER5cYysW%D7nIug{xrOlg8QUOu`q#E> zZ}#mk4YvY!)~1G`ui29N-#Fre*KBSDt-VCV!Q>M*oZL^^Y9MZYT(bB5gyI-jcM=tv zU$yl&tN}82jr>mV##|n#x#@RthyW%7>gK(_#R?B z;y%O##6-mXhzAgp5Dy|ALQFFm5eoiD)W%T4;kdt<>$T zTT$b#r8{8U--}bzQd6z9b)xGU?*Di>Q9*Ho>jPPdkpmAh-}s%qTU z^0HEQ$?B4FH?*HGHd-B6x};&*hVGp~+_&F5_eDY6@4I*I7X@+Oc6aV${erVFhRp?k))8{)M}9?;oG}LEJ;gj*@#9pLsz>XuZ1&Q1{(n zao3+*bJ%og^V&?TCHOM^>-t>oIgZn<)Lzy;z_zm%O_h4T>T{KYsZ}1N|3+shR#At^ zzmqv&y}{id-jKU>wXAPdbi=Uzr}Ge;Tjfj!A;qQlq^D>1%{_I_+GY5}x>uID+?6Hn z9P9m6)*58)Rc=?ZwRlBYxy!oPS?9J^J8Nr``ybvBB{fyGwG~ya73FSg9X`;t@)-`h zBO}G`NO8zzl~g6zQP1}S^)T{uanN6oBV#}XtqLaS0EA?^ay$3D(oOF5Z(mu(7EM z)U!e<6-+y!J;jynNw;VFzAq2RbX*xAM&+&ET7jHBUw%LEo{m+Wi@apjulX7sb zLHiL`_85mFJxkhc?G4ObRhHRUS(eLV1s1Ny@9x#zS9?hAmj!VTw!htF?cqPd(x4-} zdzT~hm1oj5lm!{#$lZm=-~Eyx?mO>|dui#gIEeeh_s)H35clHUx$nEXl6rR#_u$NS z-n-h>6=Z~icXt7OcUQWAk|6F6+@1T0eWkHf6Gswl1V2vS$i2eRx(V7x+1FUL<}r;< zy-_Wwo>ZBbtxUM`DPr8`|NXyrwzws+?weJvOzJ5tmCG)LUFsX-o6b?&x?Wc!EQ(&aKys;=8I^HVB# zhbzA$yR@~yd+nCpshwVq3U!?Fqf_gWJCojCV-pPKQ*Yoe4kt-O1C0caWCN zp1D&qg>3fpotjayS^aI9Nk1*~ZkFZ(DbHqs3+ZdURpd$v3;&U+16J0iHsk^ksK;CQ zAD1g7PkKi7z^y`1%vMi;nBFq-nBOWy-fPH6AGl?sq-uSDsyc6oj(S%i8EFHn<&j`o zj`V7A3|x^d#bsSEEuDUQFd6o=0n24+Q)q24L5HQ_sHGxvRt23D5sj7D8;&xz3RgDXsIBTu=yCG{`m9-YFYD-sJi=A~P zORT+VBl%hBEH868ad+Rl5t!Jw6DWTtMX|BL#HuR3i(=P6j8=+lLqcF^DL7#+j)bD` zxw1T&Ss0LSFUGjj>bKkKjqw_Y4ldgW@4Z``3@sV1!Jf3tZ&mlOAiAeQRIuE@pDO>z zndJFa<=P!X|0B#(Au^a;5>gE%-@4qS|B$-|EJ3q^gy~9WB(!YDjJ`3!6^&b&-)g9n zgJi}Ui1;tspSb8skrFz=lQA;=+ti&B_}#<%1C|g7@6{dM81FYX`%NHJW)_S+RTAmJ zZ=xy1PI;<+So$_PrFQX%{1r8e-4nh)qQG5IwNg4#JE^k3S-IGaGqSu7HcXy*Ql#nE zSt3v4)y|r-I-Hi>tE(zqz8Ss3UE%%UDt*JLTY{fq7ca4{DyzYdvc0+!dq3FVhyHYX yV+fdo3-_BssI<)Qgz~}vraUaD@_$cr%HjAV1i>z(oH}DA)PN{r>=Z%l8rh delta 4774 zcmZXX3s_X;y2sbIuCvz6nziP}FvH+51E^pkf(j}k<~6k~D5i)AUa-7y3GH;z5|KpB z1WbLsm6|<9yHawO-}97vZ#1>7hNAA&c1B=3)4Zhav#Zm^&O2*#4(A+reh>fucX_{W zeRrx`HgMG)uW$`9LN&vP6B^@&lIr(La{7>h6}$S-U44kSkEB3TJER$SZK?}oQ4EH4;LmVe$YLfvZ;*|W4AEYI(3QiH$|2EnVyvn zh)2kJ;amO^cbwV6LZHL5urXJmc}2>964;Wfd;#ocscSUtL_mandD!C4B-IS(^OQHC z>Kl{MDHWP0DkT5*FTUnz)`kYQWC(#t3YXY!SG zFziY=rVQ&QE6L%Ek0&cL;q~qDu~5=ivBRXtl%CsqQ?O&9F@oYL%G53y*QY26#^_MbR3(?~(ylSU6*KHI^eTGrxvkTb-CgEj zN1@Wk_+0vVq2eN4a{f={(C{B9QQnZd#BW&b6QJU<*90B&Fn(@ytfkI0UqQ@BNUPK| z-}>uOem6Tw0#mBCsOVWPolE50oXp;2-()M;nd~IFLY^dN$VqaH=?=Svdym`A3*3+V zyZjEmkv}d>7Dfxbgjhin-V)8iZ^HLNr*KrfB_0sBifhG6F<%@mHA*X_$E5-|ma4@i zODCldO<&8Z>34aglAuH>Ci!RihTJZ<$%p0l7eYk-!-t6VG#QwM0*y-zW+Q9G3KAPnm=@{2OG;pAJ`m7#fzgh+PAOH56u1U;Rh zNrcRV3z^yonDH-S5y`7Y)%rBe4KH0IDvTMSS)q8K<|jYEvVq!lNE@yVfSrS|m*?T9 zLD~qTFEnScb|`_Do0z9TDACr!t{g29a+=v;aIQp~Ku68g(nM$mJp=YVu6+;6R$Ybk zt-242w(7~S5fxSEX^*n3WgSdkp!I|sE3riDRy`9O+pxsAZTfavzffDW!nv8b!&+F; zD_X=7agDfCGKoJ^({1e_Nw-Fc<=lR&N&Q*9p|-1S>S6VLb&tAP?WujpZQ@pG?`W^# z$$~$jul$eFs(Ev%a95M~@PO(+Xt};f&b-n&?BK4BZ-((7u|BJbXRJnDEWw|H?>=N_ zMvgEQf6b-b+0VQHmXt#5#anZFSxq@5$FP&^%0wf%=Y_@i3}Vzruisd-tUg4Oqoy+Gk zp{+&t(vBAW1kVe9#{u7aRR4(=4>MN!$76aTPo99b6MA`EXbK-JeW~n!xrHG|XPngc zI7tHt9eM+yu%Sbr%Bp+dMu&cjws-0|gnSSFul3EavYgGN-+!&YER){!`J4JOX_={= z*%Z24$qiI%4Rsw=jiJ6n^^BpuK~-+3OQ_};>I+oU40Rq=uA$mcjWX0JRK~4MT2UES zE;)|MxN^xSs6tn+dlor@(ztWUhp3D@mmHw$|E*8v%+G26yq~gw-u+4EB*zE1u)<|t zS5UqqZe}2NCmXl1r!95kG+WKac3UPi?9@ewBesW9OmpS6@(XgMX{u?gJj=w(`SK_^ zQ%;tBvei7yoNi7uyUnuli*i%>N;yNTh^>i-)*r|V;1zKx|8|g7>3-2RK^CoP%x|oZ zO1kYE0=I3pC@*fldDsQxr|ULjw@{zm7G<$wPGp~vkLq%|*=SBT`=P$(PzKYTu^v_% zs3q4PFT>es_Tg}fbz7mjz&-#P&M0PjsK9=jbJnD+C~`1_tx$)k6)cGzK6p6?ojsin zC@+h)!iggLEC`;r$3t}=XIwZ0rbWTJ8TRB&X3<>t@eDitc!neU$#_;`ROStqc|&ej zZ&;3-53;kA4@?K7-QpTyE&n1nmw6rnaijpAx#j3VUu}2HCT{sXrqIq1X0|$7ogHev z@6$|rx(SAzcSOOGa}F1bIp^@uDW5w8LWOgVc#f}ett+DsopcW&S{2&vCfpx2ymwQ zp-}I12B3E=CO5}8d3g8Jpq&ot=4`jImd(_g>YQt4)z4_nU}qHJYX;WmL96Ht(!XRm z@zT#(?-s~lcw~_CZK!FDy5C0gke~`VgPj8)x7A^$wxP}!>%;T-`6=hnaAN#vXM3}( z#W5BphgEM-rP_5_m#?wcTA}Z8uageq+%;ydN38^8mdCmA zJNI*N*yD+Zwgu5WHvaC8fm^KGhz|FN`(+4z<}nuQ*wECOY9S9bO0g+TnJ?*$(#z zI6U0*5VYjl-B8i#6~Y;no!B}q8+L^cre}D9oBDeMINj+U4UO+64i6VZtHP(n!G840 zO?OO{YTeFgOh3lDOC4;zw>vw6QPl);g`>|=lq_(l(>oW={)i9%=4;+`c>1z8Jm1;Zy>;2wV-Oz_OS7+cYwu^_|G@xnIk&0OB<6tE?leeh|(p9lNR{`>!gj1T(nKiX@8 z{`tWEiNF6gkBRs9gpR2`3rvXj7k7E)e;M!ZZN#LnC;AuTllg*rzskrS{><=WdQ}hq zehyx5#Qy+Fdik@B#Jab7`H?vKRIov=s|fI{9coC;alowG+YijXMYp#{y$RGcywQoepS_^odc$ zOlN!&n9V`f$AQOT-=V-HXsd8`hw9@lm3W}h>~_MJpW|FFIT|oSN^3xdd!Gif;PAi) zJ$=44kZlzActVG5F|NU4V+k$_9k%()pRdJM|9=>YVU&BYygn)cZXXJ|F?LW6pL;Xh zmb#TEgGfA*fFvT_kse4-Bne4IdLa)Xy^$2850Z-XMbeOTq#x2B8GvLUnaDt75Hc9a zLWUsO$WSB)8HPNJ3`a&FBau