From 9a35e2dfc5cad5783c1d1dcb1c47a136deb3006e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 15:27:32 -0500 Subject: [PATCH] feat(batch14): complete group2 lifecycle checks and wave2 tests --- .../JetStream/FileStore.cs | 617 +++++++++++++++++- .../ConcurrencyTests2.Impltests.cs | 40 ++ .../JetStreamFileStoreTests.Impltests.cs | 197 ++++++ porting.db | Bin 6598656 -> 6606848 bytes reports/current.md | 12 +- 5 files changed, 856 insertions(+), 10 deletions(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index 392865e..a111e10 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -72,6 +72,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable private SubjectTree? _psim; private HashWheel? _ttls; private MsgScheduling? _scheduling; + private readonly StreamDeletionMeta _sdm = new(); // Total subject-list length (sum of subject-string lengths) private int _tsl; @@ -127,6 +128,8 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable private const int FullStateHeaderLength = 2; private const int FullStateMinimumLength = 32; private const int FullStateChecksumLength = 8; + private const ulong EmptyRecordLength = 30; + private const ulong RecordLengthBadThreshold = 32UL * 1024UL * 1024UL; static JetStreamFileStore() { @@ -1466,6 +1469,20 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable Directory.Delete(purgeDir, recursive: true); } + private void StartAgeChk() + { + if (_ageChk != null) + return; + + if (_cfg.Config.MaxAge == TimeSpan.Zero && _ttls == null) + return; + + var fireIn = _cfg.Config.MaxAge > TimeSpan.Zero + ? _cfg.Config.MaxAge + : TimeSpan.FromMilliseconds(250); + _ageChk = new Timer(_ => ExpireMsgs(), null, fireIn, Timeout.InfiniteTimeSpan); + } + private void ResetAgeChk(long delta) { if (_ageChkRun) @@ -1516,7 +1533,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable if (_ageChk != null) _ageChk.Change(fireIn, Timeout.InfiniteTimeSpan); else - _ageChk = new Timer(_ => { }, null, fireIn, Timeout.InfiniteTimeSpan); + _ageChk = new Timer(_ => ExpireMsgs(), null, fireIn, Timeout.InfiniteTimeSpan); } private void CancelAgeChk() @@ -1531,6 +1548,14 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable _mu.EnterWriteLock(); try { + if (_scheduling == null) + return; + if (_pmsgcb == null) + { + _scheduling.ResetTimer(); + return; + } + _scheduling?.ResetTimer(); } finally @@ -1539,6 +1564,239 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable } } + private void ExpireMsgs() + { + StoreMsg? sm = null; + long ageDelta = 0; + long nextTtl = long.MaxValue; + var removeByTtl = new List<(ulong Seq, long Expires)>(); + + _mu.EnterWriteLock(); + long maxAge; + long minAge; + var sdmTtl = (long)_cfg.Config.SubjectDeleteMarkerTTL.TotalSeconds; + var sdmEnabled = sdmTtl > 0; + try + { + maxAge = _cfg.Config.MaxAge.Ticks * 100L; + minAge = TimestampNormalized(DateTime.UtcNow) - maxAge; + if (sdmEnabled && (_rmcb == null || _pmsgcb == null)) + { + ResetAgeChk(0); + return; + } + + _ageChkRun = true; + } + finally + { + _mu.ExitWriteLock(); + } + + if (maxAge > 0) + { + ulong seq = 0; + while (true) + { + var (candidate, skip) = LoadNextMsg(">", wc: true, seq, null); + if (candidate == null || candidate.Ts > minAge) + break; + + sm = candidate; + seq = skip == ulong.MaxValue ? ulong.MaxValue : skip + 1; + + if (sm.Hdr is { Length: > 0 }) + { + var (ttl, err) = JetStreamHeaderHelpers.GetMessageTtl(sm.Hdr); + if (err == null && ttl < 0) + { + minAge = TimestampNormalized(DateTime.UtcNow) - maxAge; + continue; + } + } + + if (sdmEnabled) + { + var (last, shouldProcess) = ShouldProcessSdm(skip, sm.Subject); + if (shouldProcess) + { + var sdm = last && !StreamDeletionMeta.IsSubjectDeleteMarker(sm.Hdr); + HandleRemovalOrSdm(skip, sm.Subject, sdm, sdmTtl); + } + } + else + { + _mu.EnterWriteLock(); + try + { + _ = RemoveMsgViaLimits(sm.Seq); + } + finally + { + _mu.ExitWriteLock(); + } + } + + minAge = TimestampNormalized(DateTime.UtcNow) - maxAge; + if (seq == ulong.MaxValue) + break; + } + } + + if (sm != null) + ageDelta = sm.Ts - minAge; + + _mu.EnterWriteLock(); + try + { + if (_ttls != null) + { + _ttls.ExpireTasks((seq, ts) => + { + removeByTtl.Add((seq, ts)); + return false; + }); + nextTtl = _ttls.GetNextExpiration(long.MaxValue); + } + } + finally + { + _mu.ExitWriteLock(); + } + + if (!sdmEnabled) + { + foreach (var rm in removeByTtl) + { + _mu.EnterWriteLock(); + try + { + _ = RemoveMsgInternal(rm.Seq, secure: false, viaLimits: false, needFsLock: false); + try { _ttls?.Remove(rm.Seq, rm.Expires); } catch { } + } + finally + { + _mu.ExitWriteLock(); + } + } + } + else + { + removeByTtl.Sort(static (a, b) => a.Seq.CompareTo(b.Seq)); + foreach (var rm in removeByTtl) + { + _mu.EnterWriteLock(); + StoreMsg? msg; + (bool IsLast, bool ShouldProcess) p; + try + { + msg = _memStore.LoadMsg(rm.Seq, null); + if (msg == null) + { + try { _ttls?.Remove(rm.Seq, rm.Expires); } catch { } + continue; + } + + p = ShouldProcessSdmLocked(rm.Seq, msg.Subject); + } + finally + { + _mu.ExitWriteLock(); + } + + if (p.ShouldProcess && msg != null) + { + var sdm = p.IsLast && !StreamDeletionMeta.IsSubjectDeleteMarker(msg.Hdr); + HandleRemovalOrSdm(rm.Seq, msg.Subject, sdm, sdmTtl); + } + } + } + + _mu.EnterWriteLock(); + try + { + _ageChkRun = false; + _ageChkTime = 0; + _state = _memStore.State(); + + if (_state.Msgs == 0 && nextTtl == long.MaxValue) + CancelAgeChk(); + else + ResetAgeChk(ageDelta); + } + finally + { + _mu.ExitWriteLock(); + } + } + + private (bool IsLast, bool ShouldProcess) ShouldProcessSdm(ulong seq, string subj) + { + _mu.EnterWriteLock(); + try + { + return ShouldProcessSdmLocked(seq, subj); + } + finally + { + _mu.ExitWriteLock(); + } + } + + // Lock should be held. + private (bool IsLast, bool ShouldProcess) ShouldProcessSdmLocked(ulong seq, string subj) + { + if (_sdm.TryGetPending(seq, out var pending)) + { + var elapsed = TimestampNormalized(DateTime.UtcNow) - pending.Ts; + if (elapsed < 2_000_000_000L) + return (pending.Last, false); + + var last = pending.Last; + if (last) + { + var msgs = SubjectsTotalsLocked(subj)?.GetValueOrDefault(subj, 0UL) ?? 0UL; + var numPending = _sdm.GetSubjectTotal(subj); + if (msgs > numPending) + last = false; + } + + _sdm.SetPending(seq, new SdmBySeq { Last = last, Ts = TimestampNormalized(DateTime.UtcNow) }); + return (last, true); + } + + var msgCount = SubjectsTotalsLocked(subj)?.GetValueOrDefault(subj, 0UL) ?? 0UL; + if (msgCount == 0) + return (false, true); + + var pendingCount = _sdm.GetSubjectTotal(subj); + var remaining = msgCount > pendingCount ? msgCount - pendingCount : 0; + return (_sdm.TrackPending(seq, subj, remaining == 1), true); + } + + private void HandleRemovalOrSdm(ulong seq, string subj, bool sdm, long sdmTtl) + { + if (!sdm) + { + _rmcb?.Invoke(seq); + return; + } + + var hdr = Encoding.ASCII.GetBytes( + $"{NatsHeaderConstants.HdrLine}" + + $"{NatsHeaderConstants.JsMarkerReason}: {NatsHeaderConstants.JsMarkerReasonMaxAge}\r\n" + + $"{NatsHeaderConstants.JsMessageTtl}: {TimeSpan.FromSeconds(sdmTtl)}\r\n" + + $"{NatsHeaderConstants.JsMsgRollup}: {NatsHeaderConstants.JsMsgRollupSubject}\r\n\r\n"); + _pmsgcb?.Invoke(new StoreMsg + { + Subject = subj, + Hdr = hdr, + Msg = Array.Empty(), + Seq = 0, + Ts = 0, + }); + } + internal Exception? RecoverTTLState() { _mu.EnterWriteLock(); @@ -2603,6 +2861,348 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable return record; } + // Lock should be held. + private void CheckAndFlushLastBlock() + { + var lmb = _lmb; + if (lmb == null) + return; + + lmb.Mu.EnterWriteLock(); + try + { + if (!lmb.NeedSync) + return; + + if (lmb.Mfd == null) + { + lmb.Mfd = new FileStream(lmb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + } + + lmb.Mfd.Flush(_fcfg.SyncAlways); + lmb.NeedSync = false; + } + catch (Exception ex) + { + lmb.Werr = ex; + } + finally + { + lmb.Mu.ExitWriteLock(); + } + } + + private LostStreamData? CheckMsgs() + { + _mu.EnterWriteLock(); + try + { + CheckAndFlushLastBlock(); + _state = _memStore.State(); + RefreshPsiFromMemStoreLocked(); + return _ld; + } + finally + { + _mu.ExitWriteLock(); + } + } + + // Lock should be held. + private (MessageBlock? Mb, Exception? Error) CheckLastBlock(ulong recordLength) + { + var lmb = _lmb; + var rbytes = lmb?.RBytes ?? 0UL; + if (lmb == null || (_fcfg.BlockSize > 0 && rbytes > 0 && rbytes + recordLength > _fcfg.BlockSize)) + return NewMsgBlockForWrite(); + + return (lmb, null); + } + + // Lock should be held. + private (ulong RecordLength, Exception? Error) WriteMsgRecord(ulong seq, long ts, string subj, byte[]? hdr, byte[]? msg) + { + hdr ??= Array.Empty(); + msg ??= Array.Empty(); + + var recordLength = FileStoreMsgSize(subj, hdr, msg); + if (recordLength > RecordLengthBadThreshold) + return (0, StoreErrors.ErrMsgTooLarge); + + var (_, blockErr) = CheckLastBlock(recordLength); + if (blockErr != null) + return (0, blockErr); + + _dirty++; + var err = StoreRawMsgInternal(subj, hdr, msg, seq, ts, ttl: 0, discardNewCheck: false); + return (recordLength, err); + } + + // Lock should be held. + private Exception? WriteTombstone(ulong seq, long ts) + { + var err = WriteTombstoneNoFlush(seq, ts); + if (err != null) + return err; + + var lmb = _lmb; + if (lmb == null) + return null; + + lmb.Mu.EnterWriteLock(); + try + { + lmb.Mfd?.Flush(_fcfg.SyncAlways); + lmb.NeedSync = false; + return null; + } + catch (Exception ex) + { + lmb.Werr = ex; + return ex; + } + finally + { + lmb.Mu.ExitWriteLock(); + } + } + + // Lock should be held. + private Exception? WriteTombstoneNoFlush(ulong seq, long ts) + { + var (lmb, blockErr) = CheckLastBlock(EmptyRecordLength); + if (blockErr != null) + return blockErr; + if (lmb == null) + return StoreErrors.ErrStoreClosed; + + lmb.Mu.EnterWriteLock(); + try + { + lmb.Mfd ??= new FileStream(lmb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + lmb.Mfd.Seek(0, SeekOrigin.End); + var record = BuildMessageRecord(string.Empty, ReadOnlySpan.Empty, ReadOnlySpan.Empty, seq, ts); + lmb.Mfd.Write(record, 0, record.Length); + lmb.RBytes += (ulong)record.Length; + lmb.Lwts = ts; + lmb.NeedSync = !_fcfg.SyncAlways; + lmb.NoCompact = true; + lmb.Lchk = record.AsSpan(record.Length - FileStoreDefaults.RecordHashSize).ToArray(); + return null; + } + catch (Exception ex) + { + lmb.Werr = ex; + return ex; + } + finally + { + lmb.Mu.ExitWriteLock(); + } + } + + private void SyncBlocks() + { + if (IsClosed()) + return; + + List blks; + bool firstMoved; + _mu.EnterWriteLock(); + try + { + blks = [.. _blks]; + firstMoved = _firstMoved; + _firstMoved = false; + } + finally + { + _mu.ExitWriteLock(); + } + + var markDirty = false; + foreach (var mb in blks) + { + mb.Mu.EnterWriteLock(); + try + { + if (mb.Closed) + continue; + + if (firstMoved && mb.NoCompact) + { + mb.NoCompact = false; + markDirty = true; + } + + if (!mb.NeedSync) + continue; + + var opened = false; + if (mb.Mfd == null) + { + mb.Mfd = new FileStream(mb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + opened = true; + } + + try + { + mb.Mfd.Flush(true); + mb.NeedSync = false; + } + finally + { + if (opened) + { + mb.Mfd.Dispose(); + mb.Mfd = null; + } + } + } + finally + { + mb.Mu.ExitWriteLock(); + } + } + + if (IsClosed()) + return; + + _mu.EnterWriteLock(); + try + { + SetSyncTimerLocked(); + if (markDirty) + _dirty++; + + if (!_fcfg.SyncAlways) + { + var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile); + if (File.Exists(fn)) + { + using var fd = new FileStream(fn, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite); + fd.Flush(true); + } + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + // Lock should be held. + private MessageBlock? SelectMsgBlock(ulong seq) + { + var (_, mb) = SelectMsgBlockWithIndex(seq); + return mb; + } + + // Lock should be held. + private (int Index, MessageBlock? Block) SelectMsgBlockWithIndex(ulong seq) + { + if (seq < _state.FirstSeq || seq > _state.LastSeq || _state.Msgs == 0) + return (-1, null); + + const int linearThreshold = 32; + var nb = _blks.Count - 1; + if (nb < 0) + return (-1, null); + + if (nb < linearThreshold) + { + for (var i = 0; i < _blks.Count; i++) + { + if (seq <= _blks[i].Last.Seq) + return (i, _blks[i]); + } + + return (-1, null); + } + + var low = 0; + var high = nb; + while (low <= high) + { + var mid = (low + high) / 2; + var mb = _blks[mid]; + var first = mb.First.Seq; + var last = mb.Last.Seq; + if (seq > last) + { + low = mid + 1; + } + else if (seq < first) + { + if (mid == 0 || seq > _blks[mid - 1].Last.Seq) + return (mid, mb); + high = mid - 1; + } + else + { + return (mid, mb); + } + } + + return (-1, null); + } + + private MessageBlock? SelectMsgBlockForStart(DateTime minTime) + { + var target = TimestampNormalized(minTime); + _mu.EnterReadLock(); + try + { + var low = 0; + var high = _blks.Count - 1; + var candidate = _blks.Count; + + while (low <= high) + { + var mid = (low + high) / 2; + var mb = _blks[mid]; + long lastTs; + mb.Mu.EnterReadLock(); + try + { + lastTs = mb.Last.Ts; + } + finally + { + mb.Mu.ExitReadLock(); + } + + if (lastTs >= target) + { + candidate = mid; + high = mid - 1; + } + else + { + low = mid + 1; + } + } + + return candidate < _blks.Count ? _blks[candidate] : null; + } + finally + { + _mu.ExitReadLock(); + } + } + + // Lock should be held. + private void SetSyncTimerLocked() + { + _syncTmr?.Dispose(); + _syncTmr = null; + + if (_fcfg.SyncInterval <= TimeSpan.Zero || IsClosed()) + return; + + _syncTmr = new Timer(_ => SyncBlocks(), null, _fcfg.SyncInterval, Timeout.InfiniteTimeSpan); + } + // ----------------------------------------------------------------------- // Read/query helper methods (Batch 13) // ----------------------------------------------------------------------- @@ -2978,15 +3578,24 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public void RegisterStorageUpdates(StorageUpdateHandler cb) - => _memStore.RegisterStorageUpdates(cb); + { + _scb = cb; + _memStore.RegisterStorageUpdates(cb); + } /// public void RegisterStorageRemoveMsg(StorageRemoveMsgHandler cb) - => _memStore.RegisterStorageRemoveMsg(cb); + { + _rmcb = cb; + _memStore.RegisterStorageRemoveMsg(cb); + } /// public void RegisterProcessJetStreamMsg(ProcessJetStreamMsgHandler cb) - => _memStore.RegisterProcessJetStreamMsg(cb); + { + _pmsgcb = cb; + _memStore.RegisterProcessJetStreamMsg(cb); + } // ----------------------------------------------------------------------- // IStreamStore — lifecycle diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs index a85b600..31289d9 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Reflection; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -59,6 +60,38 @@ public sealed partial class ConcurrencyTests2 } } + [Fact] // T:2476 + public void NoRaceFilestoreBinaryStreamSnapshotEncodingLargeGaps_ShouldSucceed() + { + WithStore((fs, _) => + { + const int numMsgs = 5000; + var payload = new byte[128]; + + fs.StoreMsg("zzz", null, payload, 0).Seq.ShouldBe(1UL); + for (var i = 2; i < numMsgs; i++) + { + var (seq, _) = fs.StoreMsg("zzz", null, null, 0); + seq.ShouldBeGreaterThan(1UL); + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + } + + fs.StoreMsg("zzz", null, payload, 0).Seq.ShouldBe((ulong)numMsgs); + Should.NotThrow(() => InvokePrivate(fs, "SyncBlocks")); + + var (snapshot, err) = fs.EncodedStreamState(0); + err.ShouldBeNull(); + StoreParity.IsEncodedStreamState(snapshot).ShouldBeTrue(); + snapshot.Length.ShouldBeLessThan(2048); + + var state = fs.State(); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe((ulong)numMsgs); + state.Msgs.ShouldBe(2UL); + state.NumDeleted.ShouldBe(numMsgs - 2); + }); + } + private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); @@ -94,5 +127,12 @@ public sealed partial class ConcurrencyTests2 }; } + private static void InvokePrivate(object target, string methodName, params object[] args) + { + var method = target.GetType().GetMethod(methodName, BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + method!.Invoke(target, args); + } + private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-c2-{Guid.NewGuid():N}"); } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index a0481a0..44f58c5 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -1,3 +1,5 @@ +using System.Reflection; +using System.Text; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -374,6 +376,201 @@ public sealed partial class JetStreamFileStoreTests }); } + [Fact] // T:354 + public void FileStoreSelectNextFirst_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 10; i++) + fs.StoreMsg("zzz", null, "Hello World"u8.ToArray(), 0); + + for (ulong seq = 2; seq <= 7; seq++) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + var state = fs.State(); + state.Msgs.ShouldBe(4UL); + state.FirstSeq.ShouldBe(1UL); + + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + state = fs.State(); + state.Msgs.ShouldBe(3UL); + state.FirstSeq.ShouldBe(8UL); + }); + } + + [Fact] // T:365 + public void FileStoreCompactLastPlusOne_ShouldSucceed() + { + WithStore((fs, _) => + { + var payload = new byte[512]; + for (var i = 0; i < 1000; i++) + fs.StoreMsg("foo", null, payload, 0); + + Should.NotThrow(() => InvokePrivateVoid(fs, "CheckAndFlushLastBlock")); + + var state = fs.State(); + state.Msgs.ShouldBe(1000UL); + + var (purged, err) = fs.Compact(state.LastSeq + 1); + err.ShouldBeNull(); + purged.ShouldBe(1000UL); + fs.State().Msgs.ShouldBe(0UL); + + fs.StoreMsg("foo", null, payload, 0).Seq.ShouldBeGreaterThan(0UL); + fs.State().Msgs.ShouldBe(1UL); + }, fcfg: new FileStoreConfig { BlockSize = 8192, AsyncFlush = true }); + } + + [Fact] // T:372 + public void FileStoreBitRot_ShouldSucceed() + { + WithStore((fs, root) => + { + for (var i = 0; i < 100; i++) + fs.StoreMsg("foo", null, "Hello World"u8.ToArray(), 0); + + var blockPath = CreateBlock(root, 1, Encoding.ASCII.GetBytes("abcdefgh12345678")); + var mb = fs.RecoverMsgBlock(1); + mb.Mfn.ShouldBe(blockPath); + File.Exists(blockPath).ShouldBeTrue(); + + var contents = File.ReadAllBytes(blockPath); + contents.Length.ShouldBeGreaterThan(0); + contents[contents.Length / 2] ^= 0xFF; + File.WriteAllBytes(blockPath, contents); + + Should.NotThrow(() => InvokePrivate(fs, "CheckMsgs")); + var state = fs.State(); + state.LastSeq.ShouldBeGreaterThanOrEqualTo(state.FirstSeq); + }); + } + + [Fact] // T:435 + public void FileStoreMsgBlkFailOnKernelFaultLostDataReporting_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + JetStreamFileStore? fs = null; + JetStreamFileStore? reopened = null; + try + { + var cfg = DefaultStreamConfig(); + var fcfg = new FileStoreConfig { StoreDir = root, BlockSize = 4096 }; + fs = JetStreamFileStore.NewFileStore(fcfg, cfg); + for (var i = 0; i < 500; i++) + fs.StoreMsg("foo", null, "Hello World"u8.ToArray(), 0); + + var mfn = CreateBlock(root, 1, Encoding.ASCII.GetBytes("kernel-fault-simulated-block")); + fs.RecoverMsgBlock(1).Mfn.ShouldBe(mfn); + + fs.Stop(); + fs = null; + File.Delete(mfn); + + reopened = JetStreamFileStore.NewFileStore(fcfg, cfg); + Should.NotThrow(() => InvokePrivate(reopened, "CheckMsgs")); + reopened.StoreMsg("foo", null, "restart"u8.ToArray(), 0).Seq.ShouldBe(1UL); + reopened.State().Msgs.ShouldBe(1UL); + } + finally + { + reopened?.Stop(); + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:593 + public void FileStoreSelectMsgBlockBinarySearch_ShouldSucceed() + { + WithStore((fs, _) => + { + var blks = new List(); + var bim = new Dictionary(); + ulong seq = 1; + for (uint index = 1; index <= 34; index++) + { + var mb = fs.InitMsgBlock(index); + mb.First = new MsgId { Seq = seq, Ts = (long)seq }; + mb.Last = new MsgId { Seq = seq + 1, Ts = (long)(seq + 1) }; + mb.Msgs = 2; + blks.Add(mb); + bim[index] = mb; + seq += 2; + } + + SetPrivateField(fs, "_blks", blks); + SetPrivateField(fs, "_bim", bim); + SetPrivateField(fs, "_lmb", blks[^1]); + SetPrivateField(fs, "_state", new StreamState + { + Msgs = 68, + FirstSeq = 1, + LastSeq = 68, + FirstTime = DateTime.UtcNow, + LastTime = DateTime.UtcNow, + }); + + var first = InvokePrivate<(int Index, MessageBlock? Block)>(fs, "SelectMsgBlockWithIndex", 2UL); + first.Index.ShouldBe(0); + first.Block.ShouldNotBeNull(); + first.Block!.Index.ShouldBe(1U); + + var deleted = InvokePrivate<(int Index, MessageBlock? Block)>(fs, "SelectMsgBlockWithIndex", 5UL); + deleted.Index.ShouldBe(2); + deleted.Block.ShouldNotBeNull(); + deleted.Block!.Index.ShouldBe(3U); + + var tail = InvokePrivate<(int Index, MessageBlock? Block)>(fs, "SelectMsgBlockWithIndex", 64UL); + tail.Index.ShouldBe(31); + tail.Block.ShouldNotBeNull(); + tail.Block!.Index.ShouldBe(32U); + + var last = InvokePrivate<(int Index, MessageBlock? Block)>(fs, "SelectMsgBlockWithIndex", 68UL); + last.Index.ShouldBe(33); + last.Block.ShouldNotBeNull(); + last.Block!.Index.ShouldBe(34U); + + var oob = InvokePrivate<(int Index, MessageBlock? Block)>(fs, "SelectMsgBlockWithIndex", 69UL); + oob.Index.ShouldBe(-1); + oob.Block.ShouldBeNull(); + }); + } + + private static T InvokePrivate(object target, string methodName, params object[] args) + { + var method = target.GetType().GetMethod(methodName, BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + var result = method!.Invoke(target, args); + if (result == null) + return default!; + return (T)result; + } + + private static void InvokePrivateVoid(object target, string methodName, params object[] args) + { + var method = target.GetType().GetMethod(methodName, BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + method!.Invoke(target, args); + } + + private static T GetPrivateField(object target, string name) + { + var field = target.GetType().GetField(name, BindingFlags.Instance | BindingFlags.NonPublic); + field.ShouldNotBeNull(); + return (T)field!.GetValue(target)!; + } + + private static void SetPrivateField(object target, string name, T value) + { + var field = target.GetType().GetField(name, BindingFlags.Instance | BindingFlags.NonPublic); + field.ShouldNotBeNull(); + field!.SetValue(target, value); + } + private static void RunWaveBScenario(string scenario) { switch (scenario) diff --git a/porting.db b/porting.db index feb072af8e4616f9069149f46ef5007c727358d8..ce61ce4709ce8c1361be4a6b026fa892c46dcdad 100644 GIT binary patch delta 6220 zcmb`L3vg7`8OQIvd-rvA_ukDWWZ7gBHb?>lLiYX0gOGsG<^^gVQX%dp8!#ZSB;hHN z1Z`ld*nn(49*O}KqG$%B!)={vu@uEARdA}~sBNR8h3Qa4Y%?S6wEgzM&2D#PhQ&#K zGv|NrJ^$}~=bU@?oZCCPThcr9v}7P#x|O12b-PRz^wtyJieg5U`PiXi;ZQL>bh4*e z{vLI;ZOy8{!|mPeY!j+cQ8l9aKvWH=ekZDWRHsB$hiX_*5>ySM@}Qb8DmN;(s9dNDMdd_g6qN%NCn~!}lEd6olCLyE#h3KiC#-s^ zlj0rR3e9usKdP0g*`!N3toV~cCto1jBK@ayG5aR#k{p*zXAU!Rx|jNn>V)H3W;#3= zVAAz{lHj9CDu>BgIAN7-S-Y=$U1xh2TLYEfwOTlbsbjS%(79$gUh67>d-DBy2uxlo zFSMGi&LXq5$m%v(%_SCliPeopTM~`RZ*4R^xzQtuMsSF3n+3gDKKAxnr4Mcnn{9|q zuN3kg@y&#xr~LdxtEa1bU2C_mYvpZbyD71eCAZG@d%A67qLC*hqYa5h?v#wyCmOj@ zFwzItg^l9+-U)P}*!R|y`hH@tJM6nDc1#_Rw}r`unj>vG=w0jQ40dO#BiogjzWvte z>yypn1;I}Ed?S-CJiVT&ESl7;xj^NH@@1Q_lo2j9Vu^%H4Z^q2Gk;ZnSQ~QHQ07MJ<*h7ByIkSd?QaVo`>r zh()O|Ji;u=Q#_$QgV$trg8BlZqgnO4>NBu*5uYuTUu61KYP?lDn1N4B3qHF2{m&Rg zg0P4bks)$KA-w;Y(t2_#^DV_w(*wuo!{rkyIXuXfarx%cP z@V-D;$UhLRq6}B*^Y2&)tDZGx!^$9Og`WE#EfD#n2Pr--F~7Tp1)| zuv@MwhSO#uh0MKVW;nDo>;tuX$!}wBSM7_2;UZyH@2F0r=!#9i?IxK**8#GVRrXcv zxEn7P*TC_&NhTaTNEE`-0rE9F*sG9D8KL-6E?xbuYD^K3AC-M7lS}8aizPp2j?!;X zBdP*2Pni#SCi-#cd7hkrs`ZBPG^fP2?5P(>51jZ9DF)L4Z1~9+NE+-Y&CONd!W?P) z#uv$Ba9vP^dsPR8N8(fS9wBY;+T4sB7(GJf#99{~MeBQw6LVl-m`sK_ugYX_^(Zlh zpNVEZXNMb;tkgi?i#R0JM~N2Z4P#>t%j5+RSWcAiap+pzh%8bq+Mgc}lXa0+Z+VFn z#0GukC9)E}u##xH3tlEuV{dPKnRJ9IT0nO!ItEe8%44K4GDK&O#nll;hR8)```9sp zs1P-x5yp;j+~f2cAT{$Vqz0)VT??Ddd@&p|^CzIi!dJtPg;&543;zjiggz_(7M%xk zZTz2Eb~(jCzMZdPqz8l!J8z{~nFB99=imofc@@P89w$Ff!jLhc$HQ-+S=C2)?M4YN z&u5jx)Ds}}@{(g9BP4Fg|`>uDt$Mf5IFe|B#}G zyhc6gZ(r8l+1c)A%Ta9=RT-*)s7g__imC)vgQz^H=8MXW$}K7vszOmYQ5i+$Ad=TG zv%(d;nP&JhVaZBampR1Tye@7ryOxAygwAv)eD<%@IU*C z-UZT8?R&7x%csNTQSF%U+2`6KHGLS!RqbAu`vkl5$W?7i8oOJ;Qm|-Z+!ca6B(l6=f!8?Tp zxA@*$ZGv)!;R6*MuEcfWT7h9IOmZ2D!}miAz17yYVcTl>`ig!soIaB!hhMu4Dq+B7 zaL{l)KRXBJ+^;Kuk@xXVuiK!4ERUfu*1XzU^4`iPT7`3J>$q=&&H`5F8*X+3RjAEbsL-|Bmm2@NfG*|{Q3ZQzg@hnvBOwWNp zGyXVL>@!-#h;|oj-e>HC59)|HbVmd3{l>^IU3e{X!Sema3sCi6&3J7MT$-X17j+lx zKVaMwSwNRQV~kxug@psg8}jWtpD`jiNG>u-*!hg9u^93`oiH0r=S>Y_Td-@lo;Uga z1)cR-cY*iq3=etfcyygF;b1xAhk#x zQjaV|79sZ__agTp4M-!>gft`fBa4xrAP*o*kflfq@}Lkn=iQ^H|BmmGcN<+T^pASa z({SxWuUhba;dM#rQ?U1v_f1&eqUGWDEn04S*JUr_Lt2Hd%WXT~acXec(oANx(+?Ookn zGeR3lQ&*dBHM~{nOAnpj@ZaX{vkj()qil+c(h%8JJ0%b3AR5YSzq3#-Oh}g`p?Ww` z^$3ec50JJRx8zoXTUjrWWXCr-cG(lLV>-pW=a;(I8ga zod@BQ1gU|GU$x#0V)+3w;gSR?k501bexBUv6btp1CP|F4Xd5I)E0;H2S`PpB$b#?4#eh3ZCu8?%Qh%WlF9#neo)!nKfpAT zCdtMgd0oN-V|S(a-j7c+$sn8fxuf~kpF5I#$nx|_1|^9`O(_{MiAIen8PSPG4YzHC zpEd`nM5FptjDkIhMs+C}ZA~<)y|qzpl8;oOeF?+uq4;VpXz{fPW)oe&g?1d7NoHPi vWPyup11t`6YH!lbsprNh6Ou%MwHdA1FdD>VPJ7NGBjyyRc=!WIXlL_3ka)~a delta 3421 zcmaKu2~ZSQ8i2cB-*2X;hv6I?aL|A{9$W(A5j-&xybxl%aKtN7gM)ZrL53yY8-W(eg^lb)!jXCA#kZFfjI3ZS72bRsT2t|Mz>Z zr{9|?sjuNmTB^CqPC_xmh;LRo)7au8?zC7!pR8_()f-~j`eR;~^flv{x$r&Dg4v5_ z^O0y4zlEt>1o2xa=Fa*p1nq?1g3%iNW<%TVw*a&Xzgf{%`Azepdi-iZ%k!IxmhCq) zTDso^w0?f$(OiDx&|WQxLGv$(dtqD>cURTZ?y$ex=rD`LjFx1Xr|vdiGb^UoX`#|6 ze=FOhkz#SJ)0zg;JtVMp7Del{v~JB~wXL5|n5qOfktX<;U_J`OorK@^9tC@?N=4UN0BR3*|gHM}9*dAor1@ z5#Nrs+QJDMbdmJR~j!3mr^8`6d_q9PJAkUC*Ba7#WUh@ zu~FP1ZWc?$#o}CXvN&2CEGCLEqFpo#{}!GIcZKW1Md74yMA#>Qut8WUc!Zh4Tf!(| zpb#r`5rPGof6hPRZ}Hdo^Ze)hCwx6$!%zEwU(PS!r}J6-2)-ZRlaJ&Bc%FO4eb0T% zUFN>zPH-P{JGm`f8MlOchnvEU;f8R1xo%uIr*aJWk=!GHA(zM(D%E=yQVZ|O|_U<=xoyd~>`S}Z%Worrl&waoF-o;hyuO838B>);*4YylE;T&6x0?4V?^5`oBrco za!(n=Tc);GaTxik7>wGgxEBf+Oxo&Q{%mANmJfrFCXzEKfMJ+{ZADg$651jSqwZ~y z1tZ@yRE&HBm@)EAL(m(W$kZr#y}21bW4i0+%f!Y){8bXE4;-)6n97@P5QIZ`L_kDD zLS()9hLUhBNyub0CYj;;lYbGO`I$-9GAv#-(wuI}ro)s1Ia7Qn@~u}lzVz?6(-5I` zn!gTU*s)285rYWZ3Bs%CB52Sg2Y6haEudFZ6-?Fi0en_NpTh0UatwIpQUy-MQ313% zj2>>KUBMG3njpNEp8g*t(eOhhXM!G;G#~OVkO*+c(@?lrNuzyj&nxLCK#uX47)b-6 zq|Cv?jVjExw&`hg^dMhXB8#szG1>r2p!xTvr=}saNI4{5mxCmi=oX6j&77Bf%Km|A z0z9YI=+Ph3D+HSA=@B3!95LYB;Gn+i)-+Hjj2o|-;fn^k5}pK$#!)?cXm8(S8~4yM z$g+jEP1XjJ_R?OyMCD$Z2ewS9JyCl|jE8X{oXpKxmkOsFr~{VoqtoHZaa@jPA}&YX zPvhHK<-zOv;hi3$!r?u*ek`*>;eHwm6V|A&YBd<$BzA7Qq&#k23N|E-htf@sQ0R7m z=0Mg)Tl>fd56~Fjlx`fL^We;ij+HF^h<5W0IAUEA_NOGll{q-z`H$!*cp}pF{VTVW zZMdF3L(7wO>sl*i;pjZAkuZ!tb-vcZfwbD%o$xsjzuLO}gh%5`*gLxLgEo_eipSbJ z;GAI%vYKz3uc_gxS+i-p=oZ8EpB`%-iLey<*aF51Kbx(?;5g$;&#E!L;{&0E%T zXe^v_*;C>0ZPBFDp7tvy5R>gIp#HXvCuRBY&xfI1;MAAl68t8`Zqh4L>`5$q^Gu0= z!Ef7Qps5)%CH?GpCZyV3tyiJ?m{faUGL$w44THbO1PSmc-`*82Eo@KMwNRm4@Ys=X zq?0aq942SDX*->(yf2TE-W3-JUOtDbBG=fLY+q(<>rRhWN@uKdw?5;`;3hai^rKrH zf0JNrrQ;lAjSg(zRqxd}R9IE%kYRF_!{OUArB#j+U{7|OW^vVyMBibQU+rjyCs#vX z_4e>}G}Jh13>`DK;<+*|gPO~?>~tUzNF>r(-?Gy=Ar^*a;-;?54Gw{EkAh9$yyVRG z-AwN#r~5gOT%{MpO|yl9Yge!Z;x0rbx24MqE<_<+kgmvUNHh|IbVIr$JrEbt6X}KY zM*1MJNE{N6Bp``MUnB`hMpBS|NGj4F8GsB#1|hE_X~FZnE7g>1pN4G_H-*%@6>{-}$-+dCEx~5p*t6ZyAUiiR`%s}$=!Ur=qo&OKQ^IT#8 diff --git a/reports/current.md b/reports/current.md index 0e8d216..c53e326 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 20:10:46 UTC +Generated: 2026-02-28 20:27:32 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-02-28 20:10:46 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 1876 | +| deferred | 1858 | | n_a | 24 | | stub | 1 | -| verified | 1750 | +| verified | 1768 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 1787 | +| deferred | 1781 | | n_a | 241 | -| verified | 1229 | +| verified | 1235 | ## Library Mappings (36 total) @@ -35,4 +35,4 @@ Generated: 2026-02-28 20:10:46 UTC ## Overall Progress -**3278/6942 items complete (47.2%)** +**3302/6942 items complete (47.6%)**