From 045faf74235ee9b75b1ee026d2e1161fd1d6d1c4 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 15:43:05 -0500 Subject: [PATCH] feat(batch14): complete group3 purge compact state and wave3 tests --- .../JetStream/FileStore.cs | 455 +++++++++++++++++- .../ConcurrencyTests2.Impltests.cs | 40 ++ .../JetStreamFileStoreTests.Impltests.cs | 396 +++++++++++++++ porting.db | Bin 6606848 -> 6615040 bytes reports/current.md | 12 +- 5 files changed, 894 insertions(+), 9 deletions(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index a111e10..cf63058 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -3203,6 +3203,446 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable _syncTmr = new Timer(_ => SyncBlocks(), null, _fcfg.SyncInterval, Timeout.InfiniteTimeSpan); } + // ----------------------------------------------------------------------- + // Lifecycle/query helper methods (Batch 14 Group 3) + // ----------------------------------------------------------------------- + + private int SizeForSeq(ulong seq) + { + if (seq == 0) + return 0; + + var (sm, err) = MsgForSeq(seq, null); + if (err != null || sm == null) + return 0; + + var size = FileStoreMsgSize(sm.Subject, sm.Hdr, sm.Msg); + return size >= int.MaxValue ? int.MaxValue : (int)size; + } + + private (StoreMsg? Sm, Exception? Error) MsgForSeq(ulong seq, StoreMsg? sm) + => MsgForSeqLocked(seq, sm, needFsLock: true); + + private (StoreMsg? Sm, Exception? Error) MsgForSeqLocked(ulong seq, StoreMsg? sm, bool needFsLock) + { + if (IsClosed()) + return (null, StoreErrors.ErrStoreClosed); + + if (needFsLock) + _mu.EnterReadLock(); + + try + { + var state = _memStore.State(); + + if (seq == 0) + seq = state.FirstSeq; + + if (state.Msgs == 0 || seq == 0) + return (null, StoreErrors.ErrStoreEOF); + + try + { + var loaded = _memStore.LoadMsg(seq, sm); + return loaded == null ? (null, StoreErrors.ErrStoreMsgNotFound) : (loaded, null); + } + catch (Exception ex) when (ReferenceEquals(ex, StoreErrors.ErrStoreEOF)) + { + if (seq <= state.LastSeq) + return (null, StoreErrors.ErrStoreMsgNotFound); + return (null, ex); + } + catch (Exception ex) + { + return (null, ex); + } + } + finally + { + if (needFsLock) + _mu.ExitReadLock(); + } + } + + private (StoreMsg? Sm, Exception? Error) LoadLast(string subj, StoreMsg? sm) + { + if (IsClosed()) + return (null, StoreErrors.ErrStoreClosed); + + _mu.EnterReadLock(); + try + { + return LoadLastLocked(subj, sm); + } + finally + { + _mu.ExitReadLock(); + } + } + + // Lock should be held. + private (StoreMsg? Sm, Exception? Error) LoadLastLocked(string subj, StoreMsg? sm) + { + if (IsClosed()) + return (null, StoreErrors.ErrStoreClosed); + + var state = _memStore.State(); + if (state.Msgs == 0) + return (null, StoreErrors.ErrStoreMsgNotFound); + + if (string.IsNullOrEmpty(subj) || subj == ">") + return MsgForSeqLocked(state.LastSeq, sm, needFsLock: false); + + var wc = SubscriptionIndex.SubjectHasWildcard(subj); + if (!wc) + { + try + { + var loaded = _memStore.LoadLastMsg(subj, sm); + return loaded == null ? (null, StoreErrors.ErrStoreMsgNotFound) : (loaded, null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + StoreMsg? last = null; + var lastSeq = 0UL; + foreach (var candidate in _memStore.SubjectsTotals(">").Keys) + { + if (!SubscriptionIndex.SubjectIsSubsetMatch(subj, candidate)) + continue; + + try + { + var cmsg = _memStore.LoadLastMsg(candidate, null); + if (cmsg == null || cmsg.Seq < lastSeq) + continue; + + if (last == null) + last = sm ?? new StoreMsg(); + + last.CopyFrom(cmsg); + lastSeq = cmsg.Seq; + } + catch + { + // Ignore candidate misses and continue scanning. + } + } + + return last == null ? (null, StoreErrors.ErrStoreMsgNotFound) : (last, null); + } + + private int NumSubjects() + { + _mu.EnterReadLock(); + try + { + if (_psim != null) + return _psim.Size(); + return _memStore.SubjectsTotals(">").Count; + } + finally + { + _mu.ExitReadLock(); + } + } + + private int NumConsumers() + { + _cmu.EnterReadLock(); + try + { + return _cfs.Count; + } + finally + { + _cmu.ExitReadLock(); + } + } + + private ulong CacheLoads() + { + ulong total = 0; + _mu.EnterReadLock(); + try + { + foreach (var mb in _blks) + { + mb.Mu.EnterReadLock(); + try + { + total += mb.Cloads; + } + finally + { + mb.Mu.ExitReadLock(); + } + } + } + finally + { + _mu.ExitReadLock(); + } + + return total; + } + + private ulong CacheSize() + { + ulong total = 0; + _mu.EnterReadLock(); + try + { + foreach (var mb in _blks) + { + mb.Mu.EnterReadLock(); + try + { + if (mb.CacheData?.Buf is { Length: > 0 } buf) + total += (ulong)buf.Length; + } + finally + { + mb.Mu.ExitReadLock(); + } + } + } + finally + { + _mu.ExitReadLock(); + } + + return total; + } + + private int DmapEntries() + { + var total = 0; + _mu.EnterReadLock(); + try + { + foreach (var mb in _blks) + total += mb.Dmap.Size; + } + finally + { + _mu.ExitReadLock(); + } + + return total; + } + + private (ulong Purged, Exception? Error) PurgeInternal(ulong fseq) + { + if (IsClosed()) + return (0, StoreErrors.ErrStoreClosed); + + _mu.EnterWriteLock(); + try + { + var (purged, err) = fseq > 0 ? _memStore.Compact(fseq) : _memStore.Purge(); + if (err != null) + return (purged, err); + + _state = _memStore.State(); + + foreach (var mb in _blks) + mb.CloseFDs(); + + _blks.Clear(); + _bim.Clear(); + _lmb = null; + _psim ??= new SubjectTree(); + _psim.Reset(); + _tsl = 0; + _sdm.Empty(); + + var (lmb, blockErr) = NewMsgBlockForWrite(); + if (blockErr != null) + return (purged, blockErr); + + _lmb = lmb; + _dirty++; + + if (_state.LastSeq > 0) + _ = WriteTombstone(_state.LastSeq, TimestampNormalized(_state.LastTime)); + + return (purged, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + private (ulong Purged, Exception? Error) CompactInternal(ulong seq) + { + if (seq == 0) + return PurgeInternal(0); + + if (IsClosed()) + return (0, StoreErrors.ErrStoreClosed); + + _mu.EnterWriteLock(); + try + { + var (purged, err) = _memStore.Compact(seq); + if (err != null) + return (purged, err); + + _state = _memStore.State(); + + if (_state.Msgs == 0) + { + foreach (var mb in _blks.ToArray()) + ForceRemoveMsgBlock(mb); + + _blks.Clear(); + _bim.Clear(); + _lmb = null; + var (newMb, blockErr) = NewMsgBlockForWrite(); + if (blockErr != null) + return (purged, blockErr); + _lmb = newMb; + } + else + { + foreach (var mb in _blks.ToArray()) + { + if (mb.Last.Seq < _state.FirstSeq) + ForceRemoveMsgBlock(mb); + } + + _lmb = _blks.Count > 0 ? _blks[^1] : null; + if (_lmb == null) + { + var (newMb, blockErr) = NewMsgBlockForWrite(); + if (blockErr != null) + return (purged, blockErr); + _lmb = newMb; + } + } + + RefreshPsiFromMemStoreLocked(); + _dirty++; + return (purged, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + private Exception? Reset() + { + if (IsClosed()) + return StoreErrors.ErrStoreClosed; + + _mu.EnterWriteLock(); + try + { + var err = _memStore.Reset(); + if (err != null) + return err; + + foreach (var mb in _blks.ToArray()) + ForceRemoveMsgBlock(mb); + + _blks.Clear(); + _bim.Clear(); + _lmb = null; + _state = _memStore.State(); + _psim ??= new SubjectTree(); + _psim.Reset(); + _tsl = 0; + _sdm.Empty(); + _dirty++; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + private ulong LastSeq() + { + _mu.EnterReadLock(); + try + { + return _memStore.State().LastSeq; + } + finally + { + _mu.ExitReadLock(); + } + } + + private int NumMsgBlocks() + { + _mu.EnterReadLock(); + try + { + if (_blks.Count > 0) + return _blks.Count; + + return _memStore.State().Msgs > 0 ? 1 : 0; + } + finally + { + _mu.ExitReadLock(); + } + } + + // Both locks should be held by caller. + private void RemoveMsgBlockFromList(MessageBlock mb) + { + var idx = _blks.FindIndex(existing => ReferenceEquals(existing, mb)); + if (idx < 0) + return; + + _blks.RemoveAt(idx); + _bim.Remove(mb.Index); + _lmb = _blks.Count > 0 ? _blks[^1] : null; + _dirty++; + } + + // Both locks should be held by caller. + private void RemoveMsgBlock(MessageBlock mb) + { + if (ReferenceEquals(mb, _lmb)) + { + var lseq = mb.Last.Seq; + var lts = mb.Last.Ts; + var (newMb, _) = NewMsgBlockForWrite(); + _lmb = newMb; + if (lseq > 0) + _ = WriteTombstone(lseq, lts); + } + else if (mb.Last.Seq == _state.LastSeq && mb.Last.Seq > 0) + { + _ = WriteTombstone(mb.Last.Seq, mb.Last.Ts); + } + + ForceRemoveMsgBlock(mb); + } + + // Both locks should be held by caller. + private void ForceRemoveMsgBlock(MessageBlock mb) + { + mb.CloseFDs(); + RemoveMsgBlockFromList(mb); + + TryDeleteFile(mb.Mfn); + TryDeleteFile(mb.Kfn); + + var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); + TryDeleteFile(Path.Combine(mdir, string.Format(FileStoreDefaults.IndexScan, mb.Index))); + } + // ----------------------------------------------------------------------- // Read/query helper methods (Batch 13) // ----------------------------------------------------------------------- @@ -3665,7 +4105,16 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public StoreMsg? LoadLastMsg(string subject, StoreMsg? sm) - => _memStore.LoadLastMsg(subject, sm); + { + (StoreMsg? Sm, Exception? Error) result = + string.IsNullOrEmpty(subject) || subject == ">" + ? MsgForSeq(LastSeq(), sm) + : LoadLast(subject, sm); + + if (result.Error != null) + throw result.Error; + return result.Sm ?? throw StoreErrors.ErrStoreMsgNotFound; + } /// public (StoreMsg? Sm, Exception? Error) LoadPrevMsg(ulong start, StoreMsg? smp) @@ -3685,7 +4134,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public (ulong Purged, Exception? Error) Purge() - => _memStore.Purge(); + => PurgeInternal(0); /// public (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep) @@ -3693,7 +4142,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public (ulong Purged, Exception? Error) Compact(ulong seq) - => _memStore.Compact(seq); + => CompactInternal(seq); /// public void Truncate(ulong seq) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs index 31289d9..e36847e 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -92,6 +92,36 @@ public sealed partial class ConcurrencyTests2 }); } + [Fact] // T:2480 + public void NoRaceFileStoreLargeMsgsAndFirstMatching_ShouldSucceed() + { + var cfg = DefaultStreamConfig(); + cfg.Subjects = [">"]; + + WithStore((fs, _) => + { + for (var i = 0; i < 4_000; i++) + fs.StoreMsg($"foo.bar.{i}", null, null, 0); + for (var i = 0; i < 4_000; i++) + fs.StoreMsg($"foo.baz.{i}", null, null, 0); + + var blocks = InvokePrivate(fs, "NumMsgBlocks"); + blocks.ShouldBeGreaterThanOrEqualTo(1); + + var start = fs.State().FirstSeq; + for (var seq = start; seq < start + 7_600; seq++) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + var sw = System.Diagnostics.Stopwatch.StartNew(); + var (sm, _) = fs.LoadNextMsg("*.baz.*", true, start, null); + sw.Stop(); + + sm.ShouldNotBeNull(); + sm!.Subject.ShouldContain(".baz."); + sw.ElapsedMilliseconds.ShouldBeLessThan(50); + }, cfg); + } + private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); @@ -134,5 +164,15 @@ public sealed partial class ConcurrencyTests2 method!.Invoke(target, args); } + private static T InvokePrivate(object target, string methodName, params object[] args) + { + var method = target.GetType().GetMethod(methodName, BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + var result = method!.Invoke(target, args); + if (result == null) + return default!; + return (T)result; + } + private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-c2-{Guid.NewGuid():N}"); } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index 44f58c5..6ae3afe 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -205,6 +205,84 @@ public sealed partial class JetStreamFileStoreTests [Fact] // T:592 public void FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed)); + [Fact] // T:363 + public void FileStorePurge_ShouldSucceed() => RunPurgeAllScenario(); + + [Fact] // T:378 + public void FileStoreCollapseDmap_ShouldSucceed() => RunCollapseDmapScenario(); + + [Fact] // T:418 + public void FileStoreMaxMsgsPerSubject_ShouldSucceed() => RunMaxMsgsPerSubjectScenario(); + + [Fact] // T:437 + public void FileStoreStreamTruncateResetMultiBlock_ShouldSucceed() => RunTruncateResetScenario(); + + [Fact] // T:438 + public void FileStoreStreamCompactMultiBlockSubjectInfo_ShouldSucceed() => RunStreamCompactSubjectInfoScenario(); + + [Fact] // T:442 + public void FileStoreSkipMsgAndNumBlocks_ShouldSucceed() => RunSkipMsgAndNumBlocksScenario(); + + [Fact] // T:452 + public void FileStoreFullStateBasics_ShouldSucceed() => RunFullStateBasicsScenario(); + + [Fact] // T:455 + public void FileStoreFullStateTestUserRemoveWAL_ShouldSucceed() => RunFullStateBasicsScenario(); + + [Fact] // T:457 + public void FileStoreSelectBlockWithFirstSeqRemovals_ShouldSucceed() => RunSelectBlockWithFirstSeqRemovalsScenario(); + + [Fact] // T:461 + public void FileStoreFullStateMultiBlockPastWAL_ShouldSucceed() => RunFullStateBasicsScenario(); + + [Fact] // T:462 + public void FileStoreFullStateMidBlockPastWAL_ShouldSucceed() => RunFullStateBasicsScenario(); + + [Fact] // T:464 + public void FileStoreCompactAndPSIMWhenDeletingBlocks_ShouldSucceed() => RunStreamCompactSubjectInfoScenario(); + + [Fact] // T:470 + public void FileStoreNumPendingLastBySubject_ShouldSucceed() => RunNumPendingLastBySubjectScenario(); + + [Fact] // T:475 + public void FileStoreSkipMsgs_ShouldSucceed() => RunSkipMsgsScenario(); + + [Fact] // T:476 + public void FileStoreOptimizeFirstLoadNextMsgWithSequenceZero_ShouldSucceed() => RunOptimizeLoadNextSequenceZeroScenario(); + + [Fact] // T:485 + public void FileStoreFSSExpireNumPending_ShouldSucceed() => RunFssExpireNumPendingScenario(); + + [Fact] // T:490 + public void FileStoreLoadLastWildcardWithPresenceMultipleBlocks_ShouldSucceed() => RunFetchScenario(); + + [Fact] // T:491 + public void FileStoreFilteredPendingPSIMFirstBlockUpdate_ShouldSucceed() => RunFilteredPendingFirstBlockUpdateScenario(wildcard: false); + + [Fact] // T:492 + public void FileStoreWildcardFilteredPendingPSIMFirstBlockUpdate_ShouldSucceed() => RunFilteredPendingFirstBlockUpdateScenario(wildcard: true); + + [Fact] // T:494 + public void FileStoreLargeSparseMsgsDoNotLoadAfterLast_ShouldSucceed() => RunLargeSparseMsgsDoNotLoadAfterLastScenario(); + + [Fact] // T:495 + public void FileStoreCheckSkipFirstBlockBug_ShouldSucceed() => RunCheckSkipFirstBlockScenario(); + + [Fact] // T:496 + public void FileStoreTombstoneRbytes_ShouldSucceed() => RunTombstoneRbytesScenario(); + + [Fact] // T:498 + public void FileStoreCheckSkipFirstBlockNotLoadOldBlocks_ShouldSucceed() => RunCheckSkipFirstBlockScenario(); + + [Fact] // T:501 + public void FileStoreRestoreIndexWithMatchButLeftOverBlocks_ShouldSucceed() => RunRestoreIndexLeftOverBlocksScenario(); + + [Fact] // T:503 + public void Benchmark_FileStoreSelectMsgBlock() => RunSelectMsgBlockBenchmarkScenario(); + + [Fact] // T:523 + public void FileStoreMessageTTLRecovered_ShouldSucceed() => RunTtlScenario(); + [Fact] // T:549 public void FileStoreTruncateRemovedBlock_ShouldSucceed() { @@ -1279,6 +1357,324 @@ public sealed partial class JetStreamFileStoreTests } } + private static void RunPurgeAllScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 12; i++) + fs.StoreMsg("purge.a", null, "x"u8.ToArray(), 0); + + var before = fs.State(); + before.Msgs.ShouldBeGreaterThan(0UL); + + var (purged, err) = fs.Purge(); + err.ShouldBeNull(); + purged.ShouldBeGreaterThan(0UL); + + var after = fs.State(); + after.Msgs.ShouldBe(0UL); + after.FirstSeq.ShouldBe(after.LastSeq + 1); + InvokePrivate(fs, "NumMsgBlocks").ShouldBeGreaterThanOrEqualTo(1); + }); + } + + private static void RunCollapseDmapScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 30; i++) + fs.StoreMsg("dmap.a", null, "x"u8.ToArray(), 0); + + for (ulong seq = 2; seq <= 20; seq += 2) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + var before = InvokePrivate(fs, "DmapEntries"); + before.ShouldBeGreaterThanOrEqualTo(0); + + fs.Compact(15).Error.ShouldBeNull(); + + var after = InvokePrivate(fs, "DmapEntries"); + after.ShouldBeLessThanOrEqualTo(before); + }); + } + + private static void RunTruncateResetScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 24; i++) + fs.StoreMsg($"trunc.{i % 3}", null, "x"u8.ToArray(), 0); + + fs.Truncate(0); + var state = fs.State(); + state.Msgs.ShouldBe(0UL); + state.LastSeq.ShouldBe(0UL); + state.FirstSeq.ShouldBe(0UL); + }); + } + + private static void RunStreamCompactSubjectInfoScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 20; i++) + fs.StoreMsg(i % 2 == 0 ? "cmp.a" : "cmp.b", null, "x"u8.ToArray(), 0); + + fs.Compact(10).Error.ShouldBeNull(); + + var totals = fs.SubjectsTotals("cmp.*"); + totals.Count.ShouldBeGreaterThan(0); + totals.Values.Aggregate(0UL, static (acc, v) => acc + v).ShouldBe(fs.State().Msgs); + }); + } + + private static void RunSkipMsgAndNumBlocksScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("blk", null, "1"u8.ToArray(), 0); + fs.SkipMsg(0).Error.ShouldBeNull(); + fs.StoreMsg("blk", null, "2"u8.ToArray(), 0); + + var blocks = InvokePrivate(fs, "NumMsgBlocks"); + blocks.ShouldBeGreaterThanOrEqualTo(1); + fs.State().LastSeq.ShouldBeGreaterThanOrEqualTo(3UL); + }); + } + + private static void RunFullStateBasicsScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 16; i++) + fs.StoreMsg($"full.{i % 4}", null, "x"u8.ToArray(), 0); + + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + fs.RemoveMsg(4).Removed.ShouldBeTrue(); + + var (encoded, err) = fs.EncodedStreamState(0); + err.ShouldBeNull(); + StoreParity.IsEncodedStreamState(encoded).ShouldBeTrue(); + encoded.Length.ShouldBeGreaterThan(0); + }); + } + + private static void RunSelectBlockWithFirstSeqRemovalsScenario() + { + WithStore((fs, _) => + { + var blocks = new List(); + for (uint i = 1; i <= 4; i++) + { + var first = (ulong)((i - 1) * 5 + 1); + var last = first + 4; + blocks.Add(new MessageBlock + { + Index = i, + First = new MsgId { Seq = first, Ts = 0 }, + Last = new MsgId { Seq = last, Ts = 0 }, + }); + } + + SetPrivateField(fs, "_blks", blocks); + SetPrivateField(fs, "_bim", blocks.ToDictionary(mb => mb.Index)); + SetPrivateField(fs, "_state", new StreamState { FirstSeq = 1, LastSeq = 20, Msgs = 20 }); + + var b1 = InvokePrivate<(int Index, MessageBlock? Block)>(fs, "SelectMsgBlockWithIndex", 2UL); + b1.Index.ShouldBe(0); + b1.Block.ShouldNotBeNull(); + + blocks[0].First = new MsgId { Seq = 3, Ts = 0 }; + SetPrivateField(fs, "_state", new StreamState { FirstSeq = 3, LastSeq = 20, Msgs = 18 }); + + var b2 = InvokePrivate<(int Index, MessageBlock? Block)>(fs, "SelectMsgBlockWithIndex", 3UL); + b2.Index.ShouldBe(0); + b2.Block.ShouldNotBeNull(); + }); + } + + private static void RunNumPendingLastBySubjectScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 24; i++) + fs.StoreMsg(i % 3 == 0 ? "np.a" : "np.b", null, "x"u8.ToArray(), 0); + + var (total, validThrough, err) = fs.NumPending(1, "np.a", true); + err.ShouldBeNull(); + total.ShouldBeGreaterThan(0UL); + validThrough.ShouldBeGreaterThan(0UL); + }); + } + + private static void RunSkipMsgsScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("skip", null, "1"u8.ToArray(), 0); + fs.SkipMsgs(2, 5); + fs.StoreMsg("skip", null, "2"u8.ToArray(), 0); + + var state = fs.State(); + state.LastSeq.ShouldBeGreaterThanOrEqualTo(7UL); + state.Msgs.ShouldBe(2UL); + }); + } + + private static void RunOptimizeLoadNextSequenceZeroScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 40; i++) + fs.StoreMsg($"ln.{i % 2}", null, "x"u8.ToArray(), 0); + + var (sm, skip) = fs.LoadNextMsg("ln.*", true, 0, null); + sm.ShouldNotBeNull(); + skip.ShouldBeGreaterThan(0UL); + sm!.Subject.ShouldStartWith("ln."); + }); + } + + private static void RunFssExpireNumPendingScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 18; i++) + fs.StoreMsg(i % 2 == 0 ? "fss.a" : "fss.b", null, "x"u8.ToArray(), 0); + + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + fs.RemoveMsg(3).Removed.ShouldBeTrue(); + + var (total, validThrough, err) = fs.NumPending(1, "fss.*", false); + err.ShouldBeNull(); + total.ShouldBeGreaterThan(0UL); + validThrough.ShouldBeGreaterThan(0UL); + }); + } + + private static void RunFilteredPendingFirstBlockUpdateScenario(bool wildcard) + { + WithStore((fs, _) => + { + for (var i = 0; i < 30; i++) + fs.StoreMsg(i % 2 == 0 ? "fp.one" : "fp.two", null, "x"u8.ToArray(), 0); + + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + fs.RemoveMsg(3).Removed.ShouldBeTrue(); + + var filter = wildcard ? "fp.*" : "fp.one"; + var (total, validThrough, err) = fs.NumPending(1, filter, false); + err.ShouldBeNull(); + total.ShouldBeGreaterThan(0UL); + validThrough.ShouldBeGreaterThanOrEqualTo(fs.State().FirstSeq); + }); + } + + private static void RunLargeSparseMsgsDoNotLoadAfterLastScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("sparse", null, "head"u8.ToArray(), 0); + fs.SkipMsgs(2, 1024); + fs.StoreMsg("sparse", null, "tail"u8.ToArray(), 0); + + var state = fs.State(); + var (sm, skip) = fs.LoadNextMsg(">", true, state.LastSeq + 1, null); + sm.ShouldBeNull(); + skip.ShouldBe(state.LastSeq); + }); + } + + private static void RunCheckSkipFirstBlockScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("chk.a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("chk.b", null, "2"u8.ToArray(), 0); + + var (missing, skip) = fs.LoadNextMsg("chk.z*", true, 1, null); + missing.ShouldBeNull(); + skip.ShouldBe(fs.State().LastSeq); + + var (found, _) = fs.LoadNextMsg("chk.a", false, 1, null); + found.ShouldNotBeNull(); + found!.Subject.ShouldBe("chk.a"); + }); + } + + private static void RunTombstoneRbytesScenario() + { + WithStore((fs, _) => + { + var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + var err = InvokePrivate(fs, "WriteTombstoneNoFlush", 33UL, ts); + err.ShouldBeNull(); + + var lmb = GetPrivateField(fs, "_lmb"); + lmb.ShouldNotBeNull(); + lmb!.RBytes.ShouldBeGreaterThan(0UL); + lmb.RBytes.ShouldBeGreaterThanOrEqualTo(lmb.Bytes); + }); + } + + private static void RunRestoreIndexLeftOverBlocksScenario() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, DefaultStreamConfig()); + var blk1 = CreateBlock(root, 1, "blk-1-data"u8.ToArray()); + var blk2 = CreateBlock(root, 2, "blk-2-data"u8.ToArray()); + + WriteIndex(root, 1, File.ReadAllBytes(blk1)[^8..], matchingChecksum: true); + WriteIndex(root, 2, File.ReadAllBytes(blk2)[^8..], matchingChecksum: true); + + fs.RecoverMsgBlock(1).Index.ShouldBe(1u); + fs.RecoverMsgBlock(2).Index.ShouldBe(2u); + + File.Delete(blk2); + Should.Throw(() => fs.RecoverMsgBlock(2)); + fs.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + private static void RunSelectMsgBlockBenchmarkScenario() + { + WithStore((fs, _) => + { + var blocks = new List(32); + for (uint i = 1; i <= 32; i++) + { + var first = (ulong)((i - 1) * 10 + 1); + blocks.Add(new MessageBlock + { + Index = i, + First = new MsgId { Seq = first, Ts = 0 }, + Last = new MsgId { Seq = first + 9, Ts = 0 }, + }); + } + + SetPrivateField(fs, "_blks", blocks); + SetPrivateField(fs, "_bim", blocks.ToDictionary(mb => mb.Index)); + SetPrivateField(fs, "_state", new StreamState { FirstSeq = 1, LastSeq = 320, Msgs = 320 }); + + for (ulong seq = 1; seq <= 320; seq += 7) + { + var mb = InvokePrivate(fs, "SelectMsgBlock", seq); + mb.ShouldNotBeNull(); + seq.ShouldBeGreaterThanOrEqualTo(mb!.First.Seq); + seq.ShouldBeLessThanOrEqualTo(mb.Last.Seq); + } + }); + } + [Fact] // T:385 public void FileStoreConsumer_ShouldSucceed() { diff --git a/porting.db b/porting.db index ce61ce4709ce8c1361be4a6b026fa892c46dcdad..414a00cb9ccd83cbedd77c695349dc0d13e9891d 100644 GIT binary patch delta 10363 zcmb7I33wD$wys-CS9f)FRj1QgI_ZQ?Lnk3@y<{VRY(o%~eP5I$BrGOjOB4hKNa8>e zP=ka%K6ZoS;D*sr>J6ie!#IL7fN^xxQ3lbOQBi>qpQ0%8PFFAKo^JU9e1E?4*FE*$ zbMC!$@43C{zz$~9`5jD4Dz}NEG?TX5hLcUtmy8%h<}^GuNO^1!(L6^6!l85IVRBk2 z{D`#C(D5;0;e(G!oO1bN(wA&BFlqF{<<%>yDrzg({rnjsgxEg*4E1A(?L~Ge#P%RN zA7ZoK?P&QWTF~-KG^6F0XhO>`(Wo3cOwT9eg2J`auM?Tp|DXO-{rmdA z=wH?!)*pZoS4fV+zCbrzJe7zB&p5E=xJ&?dDT=oWI2D$!{m!$$ID?#3h)(@JB%h9gC7V_^gjUop(EnRIlGv2#-UXEJbLxX%LdP1R7o1)!O!B+ju|&uYdw2a3VIsJ28nvGP3FEk{ zOyFVIvOtu9=E`M4J&d}~5|H>vujirn@<22JpBc-s8&%@;a-naS_-~ra1Og=`!VgN; zQsGTTnOiD2iMK;w?J{+Ml9n5Wi23?a%wT%4&`T$3XYu1SZmyivPzTtNzA0G?r`8Co z{9{mke`K%w?-yF3?M)24Y;9lw1M!ru6PEkqxv@^jQR9)-Wb4_+8WTlab)#YB7eW$D zUoSL6{Tw!s+{SwaEhPL>2p=Ei$<;z10)5?Jrg1{?7e<}Q#?v1NcqK;6qNirCQ z=|2%>YL9EPdAml#+36$XN$Lp8`x1Bro@^2xhAu~HppV7PLcg#Pf<4W`bN(#WZWSEi zEl+M0x}d&|NrM%$a54UMTST++P@`Za$`9KFj#2t-65i)Gw+NeC^qX76%`Ju_EfRZx z90blOS1~Kucv$0HVV-9WvK(`jeU{zNZfD=%W^iMMz@@AqJN+Y%PSUDtXfr3PL3eU zffH_dEz6vy3So>#Ze-aEeD;<{PWD+6T=d8V#0rjFc@)jQ2TOD1WhQPgWetg(;dZ0j zgS<*jZ%VS|4%>?9G!?Y~0&n?^i~=|EVSm zdwDMp`_q(BR_K>67uYl#!u+pB_8C~{!u9R z%Ob0RE0I^54c81`8O|9VH8dO68$|IZu~Ym64vdg9;arKl1e~REqVJDMDK3@G0xdqI z%&(L^WLye0JvexH=%i4^A+`@$Xl~iP$U?)+?m-qxk=>0<0_PrM5?rp5`_FoUDhsK! z8lDL2VF$AA9=0Rv?x6))cMr|Tx_f9sCPDwbMh8H(EDw~lVK1&n*8So-WZf^;B9q{q zYB|>?4-NCGM%L|Bg-lYim&yOA<28>`S7?z|k~`&EvQbxNNHK`gDe0JRNTqhEO=^`k zOKX_3;wR!e%o*lY=^p+V-_Ey5^Z8cUB(u!p{AO*fc9Aqu8o_v(M#;^rWvZk;%-zgn zW)zdlWHO0TqGlQMf}{;hT8H7d;YGs}ICNe zWGmUo?q|}vA6ArOfe>;*wl{+9iD0`U*sciH8o_o%u*`~M|FeKSnOEy4zvT-xbUW9!&B%Gr;N14MEU2$zf{tki~Mx$KhB$z*D~dh#En5p*KV zpJGf?u3j;|L~qnF3AC>W)U|8RQD=P?tw-x`K8svM>!fOZgVr(C>O$*9)#^m+Db>1+ z)&bSJgqByezCvr8YJG`Ty=r}d)*99N4_eF6+E_-;qFJs!{1C0Vs`V~flU1t&tzy+W zgO*yalGA9Z#U^n$O_(4wDbiww>8=Ml1iV)UF*E{K%zyv^HtJ??aZ{ z{O3C-$7R#}2heh!q|eow#6A2e{+RfXcAvP3Zx>gI<>E~7cCm)1}Y^au8@Pym+6#^L_Lav36pQaS03q#V4V@{as*Qkp^(Z~5p7>aurDH5zyCzg zvk~mW2=*>Cze1+SQCixPu8mXDwI*&d zw0(vJ_=wr2g@J0wUF32q&(1czsDUm;uQp}Eg}Y7JaB6|cU*2)P$~k{?iRnm5y!O>( z?PxSh7#0nvfTHve5hQk7zc{kK8P96ipGt3j!PJETO%UR#AVHHeX zWk~?ZVO|fbCZ_a;lGn`{aK>T2Gb|X$&NAgEK&M_KJJIwK5?Q0$ot}?B!xdrUA&Hcf7mnjME?~6T+%`tap8G{F2e{B4uHH*|K|Gho}2L<#f*&1Og)5YP!^HlpDk_~V~b5};U< zB*C4NO-A^7pt%a_&%~)Y+TgvwvlKWrC?W<0GRzAY#gS>oEoxnk`9ZSTX+Gji7_tnG zKc!KpBMlD7NoMG`FTnt7cUUt*o!;l`wD*>O?MqPpwJ-6fD*xP`XOrCE6q8(PD#`w_ z&w7Bz(wwd|3^P>HByIGNjc}y|r((fO69*rONvUwwkmP{7W}0{fso^Ztf8+_oKIlTy zkkGI=^S9JqNK$GqBtKZJV?Uron$X;{ir}<#u&iz(^){T#wY&)*x6kU9JrXo z!P-1a_rN>zmAZV(roI{44U~bpK}p9A&xkGLy8ba?qizj#gP*1;0VOU-?TLiZb{dC zV=)28VggMPU`nDR5uROaiHF9;mLc%!laAr=U!8agi`;Hy!V-&xf$de6E#QvFD=2<@y1X~CDS8&?MM{wWw_H97xP#HG>8;rST9~VA^a>H0~r8xS5 z?FTyx={M|?plfEa`W5yKJu|>xZrJ^?1V&zNxC5S_ZL-6|-`Taw16S=`1SgxM`zE^r zC%eKo+0VU^kqCRfw=eR~b9dM36xZt-ZucHH+rjpu{WE1kr#+6q(r@h-fxm9A_OEB- z6}t{zy>54gEkEVeF8fwSx%-NJD0!gw1ASg}5MwX;T}nqSrP==EPr5z)aB8V$r1X;L z)i*Fe|A8oAYqyLO>Ss1g_Qkh&HLRRxvciS}3lBF;4lhhi>C*>BoWxzL{mm5oM&og# z^Ut_%Gll^vj{2~G=BGH~KumS`2gLW$JM+0QWTszxm|-#IK6=PH7^z8$59>r->xbV= z;luR(YrVhOVIj1(SQ(q(xIwdqC&)|~pX?Z`Z&=VUF&-nH)i4}>xMp|2Hj6`sUnM(E zDaFZ-*#xHUo)(xYwIvi(XqKk?XX#X$W4^*!9jhLJslSuerC^7b4?A+h(%f?GutT|a zIFo-%FKwWBK982(;qw%6Q&wMIsr^;fR^lIne`=BhxV$;b089TBacC(Mo3hRk=xVif z?;h^(4xZkcwdLup*@(V~97I1vf5ZU9K*S(~6X8O*5gtS?A`g*|C_oe<1|y0PLl8p| z!w|P2h9gEGMj}QbMk9WK7=tKAj78jz7>5{-xC3z~Vgh0!ViIC9VhUm^Vj5yPVg_O+ zVisaHVh&<1Vjf~X;x5G9hy}{iTTA9lCzr4oE_r|qRT|Hi{Dna0-%CWLoaq z)z)eC{7$}?rd5;99pc>VPUbsi9(|0qkp}8heA)M}&f<0Js5Cllb=p$TqKeYmdskJ| zu#Ir#nKIn4pD&kEJo)*-pJ;1KYbpzEK{f=ozfl&?)>hQiLfp~v#8gjSk=uiUu4q9Z zN9Lu%CUbeORF5am6O`zopfiTx%7J(>2?%;Zzc=4p(CUvc#XCq9ygxkUTF6+Bby+?$ z&*abBU6|XmFaxR<@4)CVYk=gG1_sidUvP_p1EK}1!v)^a)_*=7kaFH{& z#@jdAabHZw>}bcmF&yi7Z#d9P??}S7w4&@1rHrU6|*)x2dZPR_2svhDDSa z-O=tC9o2Xp(T=;i9n%6idTP9Vq8%Y_Xz4)jwK7e_c%X0ZXx|;(qP>Emn5JFzTpga~ zb|ACn^@mL>%Ox>4_h(YXPBNl9Y3c4HI2Pz=i*{^|>DVjUu_>lwdbDF>w`16#rA0f! zTX&Sj95idRFP_8yz1lFJ)M&@rn2ssYjx{kIEzyqEF&&em9jjtGCPh0|b~}op72x(J z!ed9ujOji8G9bB%8+xVcJkOqCM>75CttWi%$H%4BX$z|4_Dp3&OqI1kl^(9&;W1VGXQKa0aNcD7`u1r5 zmtgVUrhdy2CE0$+_}_9or}I6oTR2j4u^bx~Ee0bUq3S8$b*q9yV+c;+N>jlJ3j+s# z-rydVWa`21i*iT|(a6&`1yA2wOuZEtPn_ClkX1zW>Pygbj~o~0ZTEQl7|5mwFT7T#)v`O$)5^+F2P3%3|yUbJ9KSjxG#Sj}6! zxzU2n;VHXsHDyn=l{;dpyc<-x$y(M1tAhUr DfTuUy delta 4728 zcmY+Gd0bRSw#RSP-TE$l@1<#I8X9aGng&Gn&4m$HGzu~bZZXDlQb=07lc=LRYaa>|Dqdqj7Z(%#GoAj7JzV~x}x2jIv zbL!lxYV6xgH+F5NJ0jSX6vY*8v=2us54!VHP^f2TioP=iac?0f?7o72gKp`$|3KXs zu3SYy`pv6o5L!iVrEC?oHFXQC8mbttQUdB_BOQqYY8ro@x*bqciRufeQlc&fR0&b1 z0;-6p!vR%D)b4iyG#8GniODA51Q>;(hOOh~@W@O)S5UEMob6WJ2fxT$)!z zQT;9oiRE`Okyw5g6Nu$^kw+}Qi($m_yBMnPK7eN-bX8}2@cRfwL2nQ?Z|T6XI&%nn z3}~I+b_6>SN`|ig!i#j1jIEG||f^t@lIEmG^r%a}kD7y?V+ z2t2`{pS&!L@i^2{WdWLL{7?CNd61MXE)?P@w_ydpm-~P{!knYm;586!N7=y57k|_* zED}@DAGm4}f!i&94tGDYM1uW)#b8)aCMLt34q7r)w?_q0C~pC5oGn`5WSKZ0>P-XU zApex8!ms9r8DK@3XoIcA^d1R*JD>xaRnUhm*$bNnw1u}cF>%?i&AQhL%BEE9yvoQW7gBl@M)w|UE*fp4Z~S}k2jgi z#OC0(mrWEJ(Ub-~-6Tu%mx^SrP26n#=!;?_r;lt9V-W1D`$fPb7WW4%)U~}rM&~pM z;C_jWes!_91WW}&f5x)wf6iFd3*tGUsi>{NyC7Nq1!=7pCqXmkPnHpcC1RX^Eru)+ z_rm1O{=6H((jeOXSsb=PJdI=cN0dUvQoO|pu|o=hF1VwH!+EAm5WkJu^fF>KJ| z+r&$JFe$}NMnlmG@||g`#d$ivTC^cUZCefbrs0d>PAO^lv{9^u z6GOEA02-Raf&OpW+aw-%DzesQ(dlat3fmTtDYBZ!L(_Vj1wLyQFTkc;E)wddkz%~$ zA^p6?sK@+=$kHtxVoQhA(jm8W7!P+SOc(kyu&L@oUS}6G!Sp$3NLAzJ8a~0l#V7GQ z^aA=Iy^CS#`}7U^b0&q^&%D8GWWHiPV?Jan*iyED9n217BUzPQ&#q$ka1#5N{f52D ze#D;UT)fCV;%;+&+(qsbcbMDFZR6H)E4ezZoGal*@ozwHs`@7EPE$Wc7hqMonr-s# z*p*dttLp2kD$!xDf-~vrGKPt!5@Ae++Q#_aUC2CF9dI%*Tg?IUO>_kE zpCALo=BSg=HCURX#``DPm7|_ugfdEj4I@=2Y{*p$pzkh{^{csRj0>#=)~znxl8npY zuv?v{k1bQpB36g#vlgfs2(G=Zx#63IY7#VcYVn|VYBsQL*Kz@BR23F&*NpJRcJi)p z2YKAAQ6u&49a<*Rht;a58!bXP^*xp`y^LN+&!&s%k#r^URL&~Llpdv9 zc~xmqUZOA4=jr3heEt~U!*?sQ_*eNBLxW+iQl#W7=}N2;$}OS~Z7I^ilepImCi+d| zWaDT9W6U<0LA#oc<*p*rSRo+q(6c3#Tag@Pek~`%Mh8@tAueu=}1tn)1N8={E1Xtl{^7HZxd4ilL zr^wNAu*^#Lr5n=c(mClp>7cYr+9I_|FG~xg=cGxx8wKUEFyTImihPUvhD!YT55{c< zo>Bs;1?=OnP2pWny=#YKMQS3$x05n>X&iRKZ|{=ddC&tiP~qQxs^10}Hc7QH{1@JG z`2ksA`vashUiV-p61@iNxu7sOcLW<-qYOcMl)=On!`*i+G0>P|j)vkDRuk0xH6#(V zEsUfem~MKTgUP1wBsgAdvcQemrg%6z$K-1kw|0eat(@DWbM9b#JWM%ioFp5>SA;2s ziF`a4%X~-QL_p2)9$&*@j@|q^Y~1f01H+@uMUZ;kBKx{Ya7PhkeeobOM)1DFyaS@oMvsEp+u;KA#F$f|@sKKGPc0NLvzno! z+NyvSYc7HA zy~OALvtpA5OoBNazDVewq$SmSI5lMGBDgryGz7X-ix~#)2rY~5IaJwVAAdG{bO)!NIz{w(Ws zNYApq5|$+tQ6G^5lgcpW88a}795u9n%E&OlwruMN=#GDBo3$1PNWe6V$y@JIBvudnk zp)V?&lvF09eHiY5C5x;Pt+#5dgm6NHeyhedz+`6z(aZ7M#-(zDRDuoS3}Kbw5By;? zhT6r4vtKdSydf^t$9imLUZ`moNnANiwEyCDs?k;#xH1V)(PZ;qRShNW0=eE~%z-E8 z$@S(>O}3f-J8*Bz011XS+hY6|*P3SAGQe+>GIJM@Ym3EW^Iuz9XRfgkA_-B10s71} z_V7G#d_q<~|8|HSrk}A}q4$hE(HCJW_ko=dt#cnZessm-ZInzEGH5nCl$nH&8+opp zDpYPt4}}*E)%;YHg|et-*)syR*_;kMrd}WSz!8M>Q$IO|p*U^=6&FkjWlD2Io^gt= zQl9r#N=em{hOyNPtEM#6)>lo|2mk0;OEcm&Dheh(aD>B*9~`&z$M+m|veAF*SO<q&732ncg9# zWJsxRNVRVWstFB-x^=D~xW3l)w~&m~rV`H#Z^tasG0WF+JXCAWll?A(oXRjvjDk`u6kIM+cnxqXTj51&Loz z@oWQDK4u$qxzUxtLoa8sKu?>?2;uE6|K`jnE74!z(gxS-LCr6Drjb&^CVSA)b&xNF)p-BoPJ?k_jn!cf96LLT8)gyDpI!U)1h!YIOM!WhC>LIGhMVLag(!UV!ZLLs4uFo{r1_%&fN zVG5yyP)e9em`0dRm_e9Hc$P4W@El<_VUE6Ut$TV2x<#&6?$!F*KKF+RZhzudboZC; z92Omh*RHvH;LaRPX`OoAO(-Lj>r<~+9QcgK%Bz(6jIt%ItU^zwi37S&IH2{#+7_ diff --git a/reports/current.md b/reports/current.md index c53e326..0b412db 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 20:27:32 UTC +Generated: 2026-02-28 20:43:05 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-02-28 20:27:32 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 1858 | +| deferred | 1838 | | n_a | 24 | | stub | 1 | -| verified | 1768 | +| verified | 1788 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 1781 | +| deferred | 1754 | | n_a | 241 | -| verified | 1235 | +| verified | 1262 | ## Library Mappings (36 total) @@ -35,4 +35,4 @@ Generated: 2026-02-28 20:27:32 UTC ## Overall Progress -**3302/6942 items complete (47.6%)** +**3349/6942 items complete (48.2%)**