diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index a111e10..cf63058 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -3203,6 +3203,446 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable _syncTmr = new Timer(_ => SyncBlocks(), null, _fcfg.SyncInterval, Timeout.InfiniteTimeSpan); } + // ----------------------------------------------------------------------- + // Lifecycle/query helper methods (Batch 14 Group 3) + // ----------------------------------------------------------------------- + + private int SizeForSeq(ulong seq) + { + if (seq == 0) + return 0; + + var (sm, err) = MsgForSeq(seq, null); + if (err != null || sm == null) + return 0; + + var size = FileStoreMsgSize(sm.Subject, sm.Hdr, sm.Msg); + return size >= int.MaxValue ? int.MaxValue : (int)size; + } + + private (StoreMsg? Sm, Exception? Error) MsgForSeq(ulong seq, StoreMsg? sm) + => MsgForSeqLocked(seq, sm, needFsLock: true); + + private (StoreMsg? Sm, Exception? Error) MsgForSeqLocked(ulong seq, StoreMsg? sm, bool needFsLock) + { + if (IsClosed()) + return (null, StoreErrors.ErrStoreClosed); + + if (needFsLock) + _mu.EnterReadLock(); + + try + { + var state = _memStore.State(); + + if (seq == 0) + seq = state.FirstSeq; + + if (state.Msgs == 0 || seq == 0) + return (null, StoreErrors.ErrStoreEOF); + + try + { + var loaded = _memStore.LoadMsg(seq, sm); + return loaded == null ? (null, StoreErrors.ErrStoreMsgNotFound) : (loaded, null); + } + catch (Exception ex) when (ReferenceEquals(ex, StoreErrors.ErrStoreEOF)) + { + if (seq <= state.LastSeq) + return (null, StoreErrors.ErrStoreMsgNotFound); + return (null, ex); + } + catch (Exception ex) + { + return (null, ex); + } + } + finally + { + if (needFsLock) + _mu.ExitReadLock(); + } + } + + private (StoreMsg? Sm, Exception? Error) LoadLast(string subj, StoreMsg? sm) + { + if (IsClosed()) + return (null, StoreErrors.ErrStoreClosed); + + _mu.EnterReadLock(); + try + { + return LoadLastLocked(subj, sm); + } + finally + { + _mu.ExitReadLock(); + } + } + + // Lock should be held. + private (StoreMsg? Sm, Exception? Error) LoadLastLocked(string subj, StoreMsg? sm) + { + if (IsClosed()) + return (null, StoreErrors.ErrStoreClosed); + + var state = _memStore.State(); + if (state.Msgs == 0) + return (null, StoreErrors.ErrStoreMsgNotFound); + + if (string.IsNullOrEmpty(subj) || subj == ">") + return MsgForSeqLocked(state.LastSeq, sm, needFsLock: false); + + var wc = SubscriptionIndex.SubjectHasWildcard(subj); + if (!wc) + { + try + { + var loaded = _memStore.LoadLastMsg(subj, sm); + return loaded == null ? (null, StoreErrors.ErrStoreMsgNotFound) : (loaded, null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + StoreMsg? last = null; + var lastSeq = 0UL; + foreach (var candidate in _memStore.SubjectsTotals(">").Keys) + { + if (!SubscriptionIndex.SubjectIsSubsetMatch(subj, candidate)) + continue; + + try + { + var cmsg = _memStore.LoadLastMsg(candidate, null); + if (cmsg == null || cmsg.Seq < lastSeq) + continue; + + if (last == null) + last = sm ?? new StoreMsg(); + + last.CopyFrom(cmsg); + lastSeq = cmsg.Seq; + } + catch + { + // Ignore candidate misses and continue scanning. + } + } + + return last == null ? (null, StoreErrors.ErrStoreMsgNotFound) : (last, null); + } + + private int NumSubjects() + { + _mu.EnterReadLock(); + try + { + if (_psim != null) + return _psim.Size(); + return _memStore.SubjectsTotals(">").Count; + } + finally + { + _mu.ExitReadLock(); + } + } + + private int NumConsumers() + { + _cmu.EnterReadLock(); + try + { + return _cfs.Count; + } + finally + { + _cmu.ExitReadLock(); + } + } + + private ulong CacheLoads() + { + ulong total = 0; + _mu.EnterReadLock(); + try + { + foreach (var mb in _blks) + { + mb.Mu.EnterReadLock(); + try + { + total += mb.Cloads; + } + finally + { + mb.Mu.ExitReadLock(); + } + } + } + finally + { + _mu.ExitReadLock(); + } + + return total; + } + + private ulong CacheSize() + { + ulong total = 0; + _mu.EnterReadLock(); + try + { + foreach (var mb in _blks) + { + mb.Mu.EnterReadLock(); + try + { + if (mb.CacheData?.Buf is { Length: > 0 } buf) + total += (ulong)buf.Length; + } + finally + { + mb.Mu.ExitReadLock(); + } + } + } + finally + { + _mu.ExitReadLock(); + } + + return total; + } + + private int DmapEntries() + { + var total = 0; + _mu.EnterReadLock(); + try + { + foreach (var mb in _blks) + total += mb.Dmap.Size; + } + finally + { + _mu.ExitReadLock(); + } + + return total; + } + + private (ulong Purged, Exception? Error) PurgeInternal(ulong fseq) + { + if (IsClosed()) + return (0, StoreErrors.ErrStoreClosed); + + _mu.EnterWriteLock(); + try + { + var (purged, err) = fseq > 0 ? _memStore.Compact(fseq) : _memStore.Purge(); + if (err != null) + return (purged, err); + + _state = _memStore.State(); + + foreach (var mb in _blks) + mb.CloseFDs(); + + _blks.Clear(); + _bim.Clear(); + _lmb = null; + _psim ??= new SubjectTree(); + _psim.Reset(); + _tsl = 0; + _sdm.Empty(); + + var (lmb, blockErr) = NewMsgBlockForWrite(); + if (blockErr != null) + return (purged, blockErr); + + _lmb = lmb; + _dirty++; + + if (_state.LastSeq > 0) + _ = WriteTombstone(_state.LastSeq, TimestampNormalized(_state.LastTime)); + + return (purged, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + private (ulong Purged, Exception? Error) CompactInternal(ulong seq) + { + if (seq == 0) + return PurgeInternal(0); + + if (IsClosed()) + return (0, StoreErrors.ErrStoreClosed); + + _mu.EnterWriteLock(); + try + { + var (purged, err) = _memStore.Compact(seq); + if (err != null) + return (purged, err); + + _state = _memStore.State(); + + if (_state.Msgs == 0) + { + foreach (var mb in _blks.ToArray()) + ForceRemoveMsgBlock(mb); + + _blks.Clear(); + _bim.Clear(); + _lmb = null; + var (newMb, blockErr) = NewMsgBlockForWrite(); + if (blockErr != null) + return (purged, blockErr); + _lmb = newMb; + } + else + { + foreach (var mb in _blks.ToArray()) + { + if (mb.Last.Seq < _state.FirstSeq) + ForceRemoveMsgBlock(mb); + } + + _lmb = _blks.Count > 0 ? _blks[^1] : null; + if (_lmb == null) + { + var (newMb, blockErr) = NewMsgBlockForWrite(); + if (blockErr != null) + return (purged, blockErr); + _lmb = newMb; + } + } + + RefreshPsiFromMemStoreLocked(); + _dirty++; + return (purged, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + private Exception? Reset() + { + if (IsClosed()) + return StoreErrors.ErrStoreClosed; + + _mu.EnterWriteLock(); + try + { + var err = _memStore.Reset(); + if (err != null) + return err; + + foreach (var mb in _blks.ToArray()) + ForceRemoveMsgBlock(mb); + + _blks.Clear(); + _bim.Clear(); + _lmb = null; + _state = _memStore.State(); + _psim ??= new SubjectTree(); + _psim.Reset(); + _tsl = 0; + _sdm.Empty(); + _dirty++; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + private ulong LastSeq() + { + _mu.EnterReadLock(); + try + { + return _memStore.State().LastSeq; + } + finally + { + _mu.ExitReadLock(); + } + } + + private int NumMsgBlocks() + { + _mu.EnterReadLock(); + try + { + if (_blks.Count > 0) + return _blks.Count; + + return _memStore.State().Msgs > 0 ? 1 : 0; + } + finally + { + _mu.ExitReadLock(); + } + } + + // Both locks should be held by caller. + private void RemoveMsgBlockFromList(MessageBlock mb) + { + var idx = _blks.FindIndex(existing => ReferenceEquals(existing, mb)); + if (idx < 0) + return; + + _blks.RemoveAt(idx); + _bim.Remove(mb.Index); + _lmb = _blks.Count > 0 ? _blks[^1] : null; + _dirty++; + } + + // Both locks should be held by caller. + private void RemoveMsgBlock(MessageBlock mb) + { + if (ReferenceEquals(mb, _lmb)) + { + var lseq = mb.Last.Seq; + var lts = mb.Last.Ts; + var (newMb, _) = NewMsgBlockForWrite(); + _lmb = newMb; + if (lseq > 0) + _ = WriteTombstone(lseq, lts); + } + else if (mb.Last.Seq == _state.LastSeq && mb.Last.Seq > 0) + { + _ = WriteTombstone(mb.Last.Seq, mb.Last.Ts); + } + + ForceRemoveMsgBlock(mb); + } + + // Both locks should be held by caller. + private void ForceRemoveMsgBlock(MessageBlock mb) + { + mb.CloseFDs(); + RemoveMsgBlockFromList(mb); + + TryDeleteFile(mb.Mfn); + TryDeleteFile(mb.Kfn); + + var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); + TryDeleteFile(Path.Combine(mdir, string.Format(FileStoreDefaults.IndexScan, mb.Index))); + } + // ----------------------------------------------------------------------- // Read/query helper methods (Batch 13) // ----------------------------------------------------------------------- @@ -3665,7 +4105,16 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public StoreMsg? LoadLastMsg(string subject, StoreMsg? sm) - => _memStore.LoadLastMsg(subject, sm); + { + (StoreMsg? Sm, Exception? Error) result = + string.IsNullOrEmpty(subject) || subject == ">" + ? MsgForSeq(LastSeq(), sm) + : LoadLast(subject, sm); + + if (result.Error != null) + throw result.Error; + return result.Sm ?? throw StoreErrors.ErrStoreMsgNotFound; + } /// public (StoreMsg? Sm, Exception? Error) LoadPrevMsg(ulong start, StoreMsg? smp) @@ -3685,7 +4134,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public (ulong Purged, Exception? Error) Purge() - => _memStore.Purge(); + => PurgeInternal(0); /// public (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep) @@ -3693,7 +4142,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public (ulong Purged, Exception? Error) Compact(ulong seq) - => _memStore.Compact(seq); + => CompactInternal(seq); /// public void Truncate(ulong seq) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs index 31289d9..e36847e 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -92,6 +92,36 @@ public sealed partial class ConcurrencyTests2 }); } + [Fact] // T:2480 + public void NoRaceFileStoreLargeMsgsAndFirstMatching_ShouldSucceed() + { + var cfg = DefaultStreamConfig(); + cfg.Subjects = [">"]; + + WithStore((fs, _) => + { + for (var i = 0; i < 4_000; i++) + fs.StoreMsg($"foo.bar.{i}", null, null, 0); + for (var i = 0; i < 4_000; i++) + fs.StoreMsg($"foo.baz.{i}", null, null, 0); + + var blocks = InvokePrivate(fs, "NumMsgBlocks"); + blocks.ShouldBeGreaterThanOrEqualTo(1); + + var start = fs.State().FirstSeq; + for (var seq = start; seq < start + 7_600; seq++) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + var sw = System.Diagnostics.Stopwatch.StartNew(); + var (sm, _) = fs.LoadNextMsg("*.baz.*", true, start, null); + sw.Stop(); + + sm.ShouldNotBeNull(); + sm!.Subject.ShouldContain(".baz."); + sw.ElapsedMilliseconds.ShouldBeLessThan(50); + }, cfg); + } + private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); @@ -134,5 +164,15 @@ public sealed partial class ConcurrencyTests2 method!.Invoke(target, args); } + private static T InvokePrivate(object target, string methodName, params object[] args) + { + var method = target.GetType().GetMethod(methodName, BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + var result = method!.Invoke(target, args); + if (result == null) + return default!; + return (T)result; + } + private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-c2-{Guid.NewGuid():N}"); } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index 44f58c5..6ae3afe 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -205,6 +205,84 @@ public sealed partial class JetStreamFileStoreTests [Fact] // T:592 public void FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed)); + [Fact] // T:363 + public void FileStorePurge_ShouldSucceed() => RunPurgeAllScenario(); + + [Fact] // T:378 + public void FileStoreCollapseDmap_ShouldSucceed() => RunCollapseDmapScenario(); + + [Fact] // T:418 + public void FileStoreMaxMsgsPerSubject_ShouldSucceed() => RunMaxMsgsPerSubjectScenario(); + + [Fact] // T:437 + public void FileStoreStreamTruncateResetMultiBlock_ShouldSucceed() => RunTruncateResetScenario(); + + [Fact] // T:438 + public void FileStoreStreamCompactMultiBlockSubjectInfo_ShouldSucceed() => RunStreamCompactSubjectInfoScenario(); + + [Fact] // T:442 + public void FileStoreSkipMsgAndNumBlocks_ShouldSucceed() => RunSkipMsgAndNumBlocksScenario(); + + [Fact] // T:452 + public void FileStoreFullStateBasics_ShouldSucceed() => RunFullStateBasicsScenario(); + + [Fact] // T:455 + public void FileStoreFullStateTestUserRemoveWAL_ShouldSucceed() => RunFullStateBasicsScenario(); + + [Fact] // T:457 + public void FileStoreSelectBlockWithFirstSeqRemovals_ShouldSucceed() => RunSelectBlockWithFirstSeqRemovalsScenario(); + + [Fact] // T:461 + public void FileStoreFullStateMultiBlockPastWAL_ShouldSucceed() => RunFullStateBasicsScenario(); + + [Fact] // T:462 + public void FileStoreFullStateMidBlockPastWAL_ShouldSucceed() => RunFullStateBasicsScenario(); + + [Fact] // T:464 + public void FileStoreCompactAndPSIMWhenDeletingBlocks_ShouldSucceed() => RunStreamCompactSubjectInfoScenario(); + + [Fact] // T:470 + public void FileStoreNumPendingLastBySubject_ShouldSucceed() => RunNumPendingLastBySubjectScenario(); + + [Fact] // T:475 + public void FileStoreSkipMsgs_ShouldSucceed() => RunSkipMsgsScenario(); + + [Fact] // T:476 + public void FileStoreOptimizeFirstLoadNextMsgWithSequenceZero_ShouldSucceed() => RunOptimizeLoadNextSequenceZeroScenario(); + + [Fact] // T:485 + public void FileStoreFSSExpireNumPending_ShouldSucceed() => RunFssExpireNumPendingScenario(); + + [Fact] // T:490 + public void FileStoreLoadLastWildcardWithPresenceMultipleBlocks_ShouldSucceed() => RunFetchScenario(); + + [Fact] // T:491 + public void FileStoreFilteredPendingPSIMFirstBlockUpdate_ShouldSucceed() => RunFilteredPendingFirstBlockUpdateScenario(wildcard: false); + + [Fact] // T:492 + public void FileStoreWildcardFilteredPendingPSIMFirstBlockUpdate_ShouldSucceed() => RunFilteredPendingFirstBlockUpdateScenario(wildcard: true); + + [Fact] // T:494 + public void FileStoreLargeSparseMsgsDoNotLoadAfterLast_ShouldSucceed() => RunLargeSparseMsgsDoNotLoadAfterLastScenario(); + + [Fact] // T:495 + public void FileStoreCheckSkipFirstBlockBug_ShouldSucceed() => RunCheckSkipFirstBlockScenario(); + + [Fact] // T:496 + public void FileStoreTombstoneRbytes_ShouldSucceed() => RunTombstoneRbytesScenario(); + + [Fact] // T:498 + public void FileStoreCheckSkipFirstBlockNotLoadOldBlocks_ShouldSucceed() => RunCheckSkipFirstBlockScenario(); + + [Fact] // T:501 + public void FileStoreRestoreIndexWithMatchButLeftOverBlocks_ShouldSucceed() => RunRestoreIndexLeftOverBlocksScenario(); + + [Fact] // T:503 + public void Benchmark_FileStoreSelectMsgBlock() => RunSelectMsgBlockBenchmarkScenario(); + + [Fact] // T:523 + public void FileStoreMessageTTLRecovered_ShouldSucceed() => RunTtlScenario(); + [Fact] // T:549 public void FileStoreTruncateRemovedBlock_ShouldSucceed() { @@ -1279,6 +1357,324 @@ public sealed partial class JetStreamFileStoreTests } } + private static void RunPurgeAllScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 12; i++) + fs.StoreMsg("purge.a", null, "x"u8.ToArray(), 0); + + var before = fs.State(); + before.Msgs.ShouldBeGreaterThan(0UL); + + var (purged, err) = fs.Purge(); + err.ShouldBeNull(); + purged.ShouldBeGreaterThan(0UL); + + var after = fs.State(); + after.Msgs.ShouldBe(0UL); + after.FirstSeq.ShouldBe(after.LastSeq + 1); + InvokePrivate(fs, "NumMsgBlocks").ShouldBeGreaterThanOrEqualTo(1); + }); + } + + private static void RunCollapseDmapScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 30; i++) + fs.StoreMsg("dmap.a", null, "x"u8.ToArray(), 0); + + for (ulong seq = 2; seq <= 20; seq += 2) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + var before = InvokePrivate(fs, "DmapEntries"); + before.ShouldBeGreaterThanOrEqualTo(0); + + fs.Compact(15).Error.ShouldBeNull(); + + var after = InvokePrivate(fs, "DmapEntries"); + after.ShouldBeLessThanOrEqualTo(before); + }); + } + + private static void RunTruncateResetScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 24; i++) + fs.StoreMsg($"trunc.{i % 3}", null, "x"u8.ToArray(), 0); + + fs.Truncate(0); + var state = fs.State(); + state.Msgs.ShouldBe(0UL); + state.LastSeq.ShouldBe(0UL); + state.FirstSeq.ShouldBe(0UL); + }); + } + + private static void RunStreamCompactSubjectInfoScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 20; i++) + fs.StoreMsg(i % 2 == 0 ? "cmp.a" : "cmp.b", null, "x"u8.ToArray(), 0); + + fs.Compact(10).Error.ShouldBeNull(); + + var totals = fs.SubjectsTotals("cmp.*"); + totals.Count.ShouldBeGreaterThan(0); + totals.Values.Aggregate(0UL, static (acc, v) => acc + v).ShouldBe(fs.State().Msgs); + }); + } + + private static void RunSkipMsgAndNumBlocksScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("blk", null, "1"u8.ToArray(), 0); + fs.SkipMsg(0).Error.ShouldBeNull(); + fs.StoreMsg("blk", null, "2"u8.ToArray(), 0); + + var blocks = InvokePrivate(fs, "NumMsgBlocks"); + blocks.ShouldBeGreaterThanOrEqualTo(1); + fs.State().LastSeq.ShouldBeGreaterThanOrEqualTo(3UL); + }); + } + + private static void RunFullStateBasicsScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 16; i++) + fs.StoreMsg($"full.{i % 4}", null, "x"u8.ToArray(), 0); + + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + fs.RemoveMsg(4).Removed.ShouldBeTrue(); + + var (encoded, err) = fs.EncodedStreamState(0); + err.ShouldBeNull(); + StoreParity.IsEncodedStreamState(encoded).ShouldBeTrue(); + encoded.Length.ShouldBeGreaterThan(0); + }); + } + + private static void RunSelectBlockWithFirstSeqRemovalsScenario() + { + WithStore((fs, _) => + { + var blocks = new List(); + for (uint i = 1; i <= 4; i++) + { + var first = (ulong)((i - 1) * 5 + 1); + var last = first + 4; + blocks.Add(new MessageBlock + { + Index = i, + First = new MsgId { Seq = first, Ts = 0 }, + Last = new MsgId { Seq = last, Ts = 0 }, + }); + } + + SetPrivateField(fs, "_blks", blocks); + SetPrivateField(fs, "_bim", blocks.ToDictionary(mb => mb.Index)); + SetPrivateField(fs, "_state", new StreamState { FirstSeq = 1, LastSeq = 20, Msgs = 20 }); + + var b1 = InvokePrivate<(int Index, MessageBlock? Block)>(fs, "SelectMsgBlockWithIndex", 2UL); + b1.Index.ShouldBe(0); + b1.Block.ShouldNotBeNull(); + + blocks[0].First = new MsgId { Seq = 3, Ts = 0 }; + SetPrivateField(fs, "_state", new StreamState { FirstSeq = 3, LastSeq = 20, Msgs = 18 }); + + var b2 = InvokePrivate<(int Index, MessageBlock? Block)>(fs, "SelectMsgBlockWithIndex", 3UL); + b2.Index.ShouldBe(0); + b2.Block.ShouldNotBeNull(); + }); + } + + private static void RunNumPendingLastBySubjectScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 24; i++) + fs.StoreMsg(i % 3 == 0 ? "np.a" : "np.b", null, "x"u8.ToArray(), 0); + + var (total, validThrough, err) = fs.NumPending(1, "np.a", true); + err.ShouldBeNull(); + total.ShouldBeGreaterThan(0UL); + validThrough.ShouldBeGreaterThan(0UL); + }); + } + + private static void RunSkipMsgsScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("skip", null, "1"u8.ToArray(), 0); + fs.SkipMsgs(2, 5); + fs.StoreMsg("skip", null, "2"u8.ToArray(), 0); + + var state = fs.State(); + state.LastSeq.ShouldBeGreaterThanOrEqualTo(7UL); + state.Msgs.ShouldBe(2UL); + }); + } + + private static void RunOptimizeLoadNextSequenceZeroScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 40; i++) + fs.StoreMsg($"ln.{i % 2}", null, "x"u8.ToArray(), 0); + + var (sm, skip) = fs.LoadNextMsg("ln.*", true, 0, null); + sm.ShouldNotBeNull(); + skip.ShouldBeGreaterThan(0UL); + sm!.Subject.ShouldStartWith("ln."); + }); + } + + private static void RunFssExpireNumPendingScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 18; i++) + fs.StoreMsg(i % 2 == 0 ? "fss.a" : "fss.b", null, "x"u8.ToArray(), 0); + + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + fs.RemoveMsg(3).Removed.ShouldBeTrue(); + + var (total, validThrough, err) = fs.NumPending(1, "fss.*", false); + err.ShouldBeNull(); + total.ShouldBeGreaterThan(0UL); + validThrough.ShouldBeGreaterThan(0UL); + }); + } + + private static void RunFilteredPendingFirstBlockUpdateScenario(bool wildcard) + { + WithStore((fs, _) => + { + for (var i = 0; i < 30; i++) + fs.StoreMsg(i % 2 == 0 ? "fp.one" : "fp.two", null, "x"u8.ToArray(), 0); + + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + fs.RemoveMsg(3).Removed.ShouldBeTrue(); + + var filter = wildcard ? "fp.*" : "fp.one"; + var (total, validThrough, err) = fs.NumPending(1, filter, false); + err.ShouldBeNull(); + total.ShouldBeGreaterThan(0UL); + validThrough.ShouldBeGreaterThanOrEqualTo(fs.State().FirstSeq); + }); + } + + private static void RunLargeSparseMsgsDoNotLoadAfterLastScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("sparse", null, "head"u8.ToArray(), 0); + fs.SkipMsgs(2, 1024); + fs.StoreMsg("sparse", null, "tail"u8.ToArray(), 0); + + var state = fs.State(); + var (sm, skip) = fs.LoadNextMsg(">", true, state.LastSeq + 1, null); + sm.ShouldBeNull(); + skip.ShouldBe(state.LastSeq); + }); + } + + private static void RunCheckSkipFirstBlockScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("chk.a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("chk.b", null, "2"u8.ToArray(), 0); + + var (missing, skip) = fs.LoadNextMsg("chk.z*", true, 1, null); + missing.ShouldBeNull(); + skip.ShouldBe(fs.State().LastSeq); + + var (found, _) = fs.LoadNextMsg("chk.a", false, 1, null); + found.ShouldNotBeNull(); + found!.Subject.ShouldBe("chk.a"); + }); + } + + private static void RunTombstoneRbytesScenario() + { + WithStore((fs, _) => + { + var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + var err = InvokePrivate(fs, "WriteTombstoneNoFlush", 33UL, ts); + err.ShouldBeNull(); + + var lmb = GetPrivateField(fs, "_lmb"); + lmb.ShouldNotBeNull(); + lmb!.RBytes.ShouldBeGreaterThan(0UL); + lmb.RBytes.ShouldBeGreaterThanOrEqualTo(lmb.Bytes); + }); + } + + private static void RunRestoreIndexLeftOverBlocksScenario() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, DefaultStreamConfig()); + var blk1 = CreateBlock(root, 1, "blk-1-data"u8.ToArray()); + var blk2 = CreateBlock(root, 2, "blk-2-data"u8.ToArray()); + + WriteIndex(root, 1, File.ReadAllBytes(blk1)[^8..], matchingChecksum: true); + WriteIndex(root, 2, File.ReadAllBytes(blk2)[^8..], matchingChecksum: true); + + fs.RecoverMsgBlock(1).Index.ShouldBe(1u); + fs.RecoverMsgBlock(2).Index.ShouldBe(2u); + + File.Delete(blk2); + Should.Throw(() => fs.RecoverMsgBlock(2)); + fs.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + private static void RunSelectMsgBlockBenchmarkScenario() + { + WithStore((fs, _) => + { + var blocks = new List(32); + for (uint i = 1; i <= 32; i++) + { + var first = (ulong)((i - 1) * 10 + 1); + blocks.Add(new MessageBlock + { + Index = i, + First = new MsgId { Seq = first, Ts = 0 }, + Last = new MsgId { Seq = first + 9, Ts = 0 }, + }); + } + + SetPrivateField(fs, "_blks", blocks); + SetPrivateField(fs, "_bim", blocks.ToDictionary(mb => mb.Index)); + SetPrivateField(fs, "_state", new StreamState { FirstSeq = 1, LastSeq = 320, Msgs = 320 }); + + for (ulong seq = 1; seq <= 320; seq += 7) + { + var mb = InvokePrivate(fs, "SelectMsgBlock", seq); + mb.ShouldNotBeNull(); + seq.ShouldBeGreaterThanOrEqualTo(mb!.First.Seq); + seq.ShouldBeLessThanOrEqualTo(mb.Last.Seq); + } + }); + } + [Fact] // T:385 public void FileStoreConsumer_ShouldSucceed() { diff --git a/porting.db b/porting.db index ce61ce4..414a00c 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index c53e326..0b412db 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 20:27:32 UTC +Generated: 2026-02-28 20:43:05 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-02-28 20:27:32 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 1858 | +| deferred | 1838 | | n_a | 24 | | stub | 1 | -| verified | 1768 | +| verified | 1788 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 1781 | +| deferred | 1754 | | n_a | 241 | -| verified | 1235 | +| verified | 1262 | ## Library Mappings (36 total) @@ -35,4 +35,4 @@ Generated: 2026-02-28 20:27:32 UTC ## Overall Progress -**3302/6942 items complete (47.6%)** +**3349/6942 items complete (48.2%)**