diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index 392865e..a111e10 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -72,6 +72,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable private SubjectTree? _psim; private HashWheel? _ttls; private MsgScheduling? _scheduling; + private readonly StreamDeletionMeta _sdm = new(); // Total subject-list length (sum of subject-string lengths) private int _tsl; @@ -127,6 +128,8 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable private const int FullStateHeaderLength = 2; private const int FullStateMinimumLength = 32; private const int FullStateChecksumLength = 8; + private const ulong EmptyRecordLength = 30; + private const ulong RecordLengthBadThreshold = 32UL * 1024UL * 1024UL; static JetStreamFileStore() { @@ -1466,6 +1469,20 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable Directory.Delete(purgeDir, recursive: true); } + private void StartAgeChk() + { + if (_ageChk != null) + return; + + if (_cfg.Config.MaxAge == TimeSpan.Zero && _ttls == null) + return; + + var fireIn = _cfg.Config.MaxAge > TimeSpan.Zero + ? _cfg.Config.MaxAge + : TimeSpan.FromMilliseconds(250); + _ageChk = new Timer(_ => ExpireMsgs(), null, fireIn, Timeout.InfiniteTimeSpan); + } + private void ResetAgeChk(long delta) { if (_ageChkRun) @@ -1516,7 +1533,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable if (_ageChk != null) _ageChk.Change(fireIn, Timeout.InfiniteTimeSpan); else - _ageChk = new Timer(_ => { }, null, fireIn, Timeout.InfiniteTimeSpan); + _ageChk = new Timer(_ => ExpireMsgs(), null, fireIn, Timeout.InfiniteTimeSpan); } private void CancelAgeChk() @@ -1531,6 +1548,14 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable _mu.EnterWriteLock(); try { + if (_scheduling == null) + return; + if (_pmsgcb == null) + { + _scheduling.ResetTimer(); + return; + } + _scheduling?.ResetTimer(); } finally @@ -1539,6 +1564,239 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable } } + private void ExpireMsgs() + { + StoreMsg? sm = null; + long ageDelta = 0; + long nextTtl = long.MaxValue; + var removeByTtl = new List<(ulong Seq, long Expires)>(); + + _mu.EnterWriteLock(); + long maxAge; + long minAge; + var sdmTtl = (long)_cfg.Config.SubjectDeleteMarkerTTL.TotalSeconds; + var sdmEnabled = sdmTtl > 0; + try + { + maxAge = _cfg.Config.MaxAge.Ticks * 100L; + minAge = TimestampNormalized(DateTime.UtcNow) - maxAge; + if (sdmEnabled && (_rmcb == null || _pmsgcb == null)) + { + ResetAgeChk(0); + return; + } + + _ageChkRun = true; + } + finally + { + _mu.ExitWriteLock(); + } + + if (maxAge > 0) + { + ulong seq = 0; + while (true) + { + var (candidate, skip) = LoadNextMsg(">", wc: true, seq, null); + if (candidate == null || candidate.Ts > minAge) + break; + + sm = candidate; + seq = skip == ulong.MaxValue ? ulong.MaxValue : skip + 1; + + if (sm.Hdr is { Length: > 0 }) + { + var (ttl, err) = JetStreamHeaderHelpers.GetMessageTtl(sm.Hdr); + if (err == null && ttl < 0) + { + minAge = TimestampNormalized(DateTime.UtcNow) - maxAge; + continue; + } + } + + if (sdmEnabled) + { + var (last, shouldProcess) = ShouldProcessSdm(skip, sm.Subject); + if (shouldProcess) + { + var sdm = last && !StreamDeletionMeta.IsSubjectDeleteMarker(sm.Hdr); + HandleRemovalOrSdm(skip, sm.Subject, sdm, sdmTtl); + } + } + else + { + _mu.EnterWriteLock(); + try + { + _ = RemoveMsgViaLimits(sm.Seq); + } + finally + { + _mu.ExitWriteLock(); + } + } + + minAge = TimestampNormalized(DateTime.UtcNow) - maxAge; + if (seq == ulong.MaxValue) + break; + } + } + + if (sm != null) + ageDelta = sm.Ts - minAge; + + _mu.EnterWriteLock(); + try + { + if (_ttls != null) + { + _ttls.ExpireTasks((seq, ts) => + { + removeByTtl.Add((seq, ts)); + return false; + }); + nextTtl = _ttls.GetNextExpiration(long.MaxValue); + } + } + finally + { + _mu.ExitWriteLock(); + } + + if (!sdmEnabled) + { + foreach (var rm in removeByTtl) + { + _mu.EnterWriteLock(); + try + { + _ = RemoveMsgInternal(rm.Seq, secure: false, viaLimits: false, needFsLock: false); + try { _ttls?.Remove(rm.Seq, rm.Expires); } catch { } + } + finally + { + _mu.ExitWriteLock(); + } + } + } + else + { + removeByTtl.Sort(static (a, b) => a.Seq.CompareTo(b.Seq)); + foreach (var rm in removeByTtl) + { + _mu.EnterWriteLock(); + StoreMsg? msg; + (bool IsLast, bool ShouldProcess) p; + try + { + msg = _memStore.LoadMsg(rm.Seq, null); + if (msg == null) + { + try { _ttls?.Remove(rm.Seq, rm.Expires); } catch { } + continue; + } + + p = ShouldProcessSdmLocked(rm.Seq, msg.Subject); + } + finally + { + _mu.ExitWriteLock(); + } + + if (p.ShouldProcess && msg != null) + { + var sdm = p.IsLast && !StreamDeletionMeta.IsSubjectDeleteMarker(msg.Hdr); + HandleRemovalOrSdm(rm.Seq, msg.Subject, sdm, sdmTtl); + } + } + } + + _mu.EnterWriteLock(); + try + { + _ageChkRun = false; + _ageChkTime = 0; + _state = _memStore.State(); + + if (_state.Msgs == 0 && nextTtl == long.MaxValue) + CancelAgeChk(); + else + ResetAgeChk(ageDelta); + } + finally + { + _mu.ExitWriteLock(); + } + } + + private (bool IsLast, bool ShouldProcess) ShouldProcessSdm(ulong seq, string subj) + { + _mu.EnterWriteLock(); + try + { + return ShouldProcessSdmLocked(seq, subj); + } + finally + { + _mu.ExitWriteLock(); + } + } + + // Lock should be held. + private (bool IsLast, bool ShouldProcess) ShouldProcessSdmLocked(ulong seq, string subj) + { + if (_sdm.TryGetPending(seq, out var pending)) + { + var elapsed = TimestampNormalized(DateTime.UtcNow) - pending.Ts; + if (elapsed < 2_000_000_000L) + return (pending.Last, false); + + var last = pending.Last; + if (last) + { + var msgs = SubjectsTotalsLocked(subj)?.GetValueOrDefault(subj, 0UL) ?? 0UL; + var numPending = _sdm.GetSubjectTotal(subj); + if (msgs > numPending) + last = false; + } + + _sdm.SetPending(seq, new SdmBySeq { Last = last, Ts = TimestampNormalized(DateTime.UtcNow) }); + return (last, true); + } + + var msgCount = SubjectsTotalsLocked(subj)?.GetValueOrDefault(subj, 0UL) ?? 0UL; + if (msgCount == 0) + return (false, true); + + var pendingCount = _sdm.GetSubjectTotal(subj); + var remaining = msgCount > pendingCount ? msgCount - pendingCount : 0; + return (_sdm.TrackPending(seq, subj, remaining == 1), true); + } + + private void HandleRemovalOrSdm(ulong seq, string subj, bool sdm, long sdmTtl) + { + if (!sdm) + { + _rmcb?.Invoke(seq); + return; + } + + var hdr = Encoding.ASCII.GetBytes( + $"{NatsHeaderConstants.HdrLine}" + + $"{NatsHeaderConstants.JsMarkerReason}: {NatsHeaderConstants.JsMarkerReasonMaxAge}\r\n" + + $"{NatsHeaderConstants.JsMessageTtl}: {TimeSpan.FromSeconds(sdmTtl)}\r\n" + + $"{NatsHeaderConstants.JsMsgRollup}: {NatsHeaderConstants.JsMsgRollupSubject}\r\n\r\n"); + _pmsgcb?.Invoke(new StoreMsg + { + Subject = subj, + Hdr = hdr, + Msg = Array.Empty(), + Seq = 0, + Ts = 0, + }); + } + internal Exception? RecoverTTLState() { _mu.EnterWriteLock(); @@ -2603,6 +2861,348 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable return record; } + // Lock should be held. + private void CheckAndFlushLastBlock() + { + var lmb = _lmb; + if (lmb == null) + return; + + lmb.Mu.EnterWriteLock(); + try + { + if (!lmb.NeedSync) + return; + + if (lmb.Mfd == null) + { + lmb.Mfd = new FileStream(lmb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + } + + lmb.Mfd.Flush(_fcfg.SyncAlways); + lmb.NeedSync = false; + } + catch (Exception ex) + { + lmb.Werr = ex; + } + finally + { + lmb.Mu.ExitWriteLock(); + } + } + + private LostStreamData? CheckMsgs() + { + _mu.EnterWriteLock(); + try + { + CheckAndFlushLastBlock(); + _state = _memStore.State(); + RefreshPsiFromMemStoreLocked(); + return _ld; + } + finally + { + _mu.ExitWriteLock(); + } + } + + // Lock should be held. + private (MessageBlock? Mb, Exception? Error) CheckLastBlock(ulong recordLength) + { + var lmb = _lmb; + var rbytes = lmb?.RBytes ?? 0UL; + if (lmb == null || (_fcfg.BlockSize > 0 && rbytes > 0 && rbytes + recordLength > _fcfg.BlockSize)) + return NewMsgBlockForWrite(); + + return (lmb, null); + } + + // Lock should be held. + private (ulong RecordLength, Exception? Error) WriteMsgRecord(ulong seq, long ts, string subj, byte[]? hdr, byte[]? msg) + { + hdr ??= Array.Empty(); + msg ??= Array.Empty(); + + var recordLength = FileStoreMsgSize(subj, hdr, msg); + if (recordLength > RecordLengthBadThreshold) + return (0, StoreErrors.ErrMsgTooLarge); + + var (_, blockErr) = CheckLastBlock(recordLength); + if (blockErr != null) + return (0, blockErr); + + _dirty++; + var err = StoreRawMsgInternal(subj, hdr, msg, seq, ts, ttl: 0, discardNewCheck: false); + return (recordLength, err); + } + + // Lock should be held. + private Exception? WriteTombstone(ulong seq, long ts) + { + var err = WriteTombstoneNoFlush(seq, ts); + if (err != null) + return err; + + var lmb = _lmb; + if (lmb == null) + return null; + + lmb.Mu.EnterWriteLock(); + try + { + lmb.Mfd?.Flush(_fcfg.SyncAlways); + lmb.NeedSync = false; + return null; + } + catch (Exception ex) + { + lmb.Werr = ex; + return ex; + } + finally + { + lmb.Mu.ExitWriteLock(); + } + } + + // Lock should be held. + private Exception? WriteTombstoneNoFlush(ulong seq, long ts) + { + var (lmb, blockErr) = CheckLastBlock(EmptyRecordLength); + if (blockErr != null) + return blockErr; + if (lmb == null) + return StoreErrors.ErrStoreClosed; + + lmb.Mu.EnterWriteLock(); + try + { + lmb.Mfd ??= new FileStream(lmb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + lmb.Mfd.Seek(0, SeekOrigin.End); + var record = BuildMessageRecord(string.Empty, ReadOnlySpan.Empty, ReadOnlySpan.Empty, seq, ts); + lmb.Mfd.Write(record, 0, record.Length); + lmb.RBytes += (ulong)record.Length; + lmb.Lwts = ts; + lmb.NeedSync = !_fcfg.SyncAlways; + lmb.NoCompact = true; + lmb.Lchk = record.AsSpan(record.Length - FileStoreDefaults.RecordHashSize).ToArray(); + return null; + } + catch (Exception ex) + { + lmb.Werr = ex; + return ex; + } + finally + { + lmb.Mu.ExitWriteLock(); + } + } + + private void SyncBlocks() + { + if (IsClosed()) + return; + + List blks; + bool firstMoved; + _mu.EnterWriteLock(); + try + { + blks = [.. _blks]; + firstMoved = _firstMoved; + _firstMoved = false; + } + finally + { + _mu.ExitWriteLock(); + } + + var markDirty = false; + foreach (var mb in blks) + { + mb.Mu.EnterWriteLock(); + try + { + if (mb.Closed) + continue; + + if (firstMoved && mb.NoCompact) + { + mb.NoCompact = false; + markDirty = true; + } + + if (!mb.NeedSync) + continue; + + var opened = false; + if (mb.Mfd == null) + { + mb.Mfd = new FileStream(mb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + opened = true; + } + + try + { + mb.Mfd.Flush(true); + mb.NeedSync = false; + } + finally + { + if (opened) + { + mb.Mfd.Dispose(); + mb.Mfd = null; + } + } + } + finally + { + mb.Mu.ExitWriteLock(); + } + } + + if (IsClosed()) + return; + + _mu.EnterWriteLock(); + try + { + SetSyncTimerLocked(); + if (markDirty) + _dirty++; + + if (!_fcfg.SyncAlways) + { + var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile); + if (File.Exists(fn)) + { + using var fd = new FileStream(fn, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite); + fd.Flush(true); + } + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + // Lock should be held. + private MessageBlock? SelectMsgBlock(ulong seq) + { + var (_, mb) = SelectMsgBlockWithIndex(seq); + return mb; + } + + // Lock should be held. + private (int Index, MessageBlock? Block) SelectMsgBlockWithIndex(ulong seq) + { + if (seq < _state.FirstSeq || seq > _state.LastSeq || _state.Msgs == 0) + return (-1, null); + + const int linearThreshold = 32; + var nb = _blks.Count - 1; + if (nb < 0) + return (-1, null); + + if (nb < linearThreshold) + { + for (var i = 0; i < _blks.Count; i++) + { + if (seq <= _blks[i].Last.Seq) + return (i, _blks[i]); + } + + return (-1, null); + } + + var low = 0; + var high = nb; + while (low <= high) + { + var mid = (low + high) / 2; + var mb = _blks[mid]; + var first = mb.First.Seq; + var last = mb.Last.Seq; + if (seq > last) + { + low = mid + 1; + } + else if (seq < first) + { + if (mid == 0 || seq > _blks[mid - 1].Last.Seq) + return (mid, mb); + high = mid - 1; + } + else + { + return (mid, mb); + } + } + + return (-1, null); + } + + private MessageBlock? SelectMsgBlockForStart(DateTime minTime) + { + var target = TimestampNormalized(minTime); + _mu.EnterReadLock(); + try + { + var low = 0; + var high = _blks.Count - 1; + var candidate = _blks.Count; + + while (low <= high) + { + var mid = (low + high) / 2; + var mb = _blks[mid]; + long lastTs; + mb.Mu.EnterReadLock(); + try + { + lastTs = mb.Last.Ts; + } + finally + { + mb.Mu.ExitReadLock(); + } + + if (lastTs >= target) + { + candidate = mid; + high = mid - 1; + } + else + { + low = mid + 1; + } + } + + return candidate < _blks.Count ? _blks[candidate] : null; + } + finally + { + _mu.ExitReadLock(); + } + } + + // Lock should be held. + private void SetSyncTimerLocked() + { + _syncTmr?.Dispose(); + _syncTmr = null; + + if (_fcfg.SyncInterval <= TimeSpan.Zero || IsClosed()) + return; + + _syncTmr = new Timer(_ => SyncBlocks(), null, _fcfg.SyncInterval, Timeout.InfiniteTimeSpan); + } + // ----------------------------------------------------------------------- // Read/query helper methods (Batch 13) // ----------------------------------------------------------------------- @@ -2978,15 +3578,24 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public void RegisterStorageUpdates(StorageUpdateHandler cb) - => _memStore.RegisterStorageUpdates(cb); + { + _scb = cb; + _memStore.RegisterStorageUpdates(cb); + } /// public void RegisterStorageRemoveMsg(StorageRemoveMsgHandler cb) - => _memStore.RegisterStorageRemoveMsg(cb); + { + _rmcb = cb; + _memStore.RegisterStorageRemoveMsg(cb); + } /// public void RegisterProcessJetStreamMsg(ProcessJetStreamMsgHandler cb) - => _memStore.RegisterProcessJetStreamMsg(cb); + { + _pmsgcb = cb; + _memStore.RegisterProcessJetStreamMsg(cb); + } // ----------------------------------------------------------------------- // IStreamStore — lifecycle diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs index a85b600..31289d9 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Reflection; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -59,6 +60,38 @@ public sealed partial class ConcurrencyTests2 } } + [Fact] // T:2476 + public void NoRaceFilestoreBinaryStreamSnapshotEncodingLargeGaps_ShouldSucceed() + { + WithStore((fs, _) => + { + const int numMsgs = 5000; + var payload = new byte[128]; + + fs.StoreMsg("zzz", null, payload, 0).Seq.ShouldBe(1UL); + for (var i = 2; i < numMsgs; i++) + { + var (seq, _) = fs.StoreMsg("zzz", null, null, 0); + seq.ShouldBeGreaterThan(1UL); + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + } + + fs.StoreMsg("zzz", null, payload, 0).Seq.ShouldBe((ulong)numMsgs); + Should.NotThrow(() => InvokePrivate(fs, "SyncBlocks")); + + var (snapshot, err) = fs.EncodedStreamState(0); + err.ShouldBeNull(); + StoreParity.IsEncodedStreamState(snapshot).ShouldBeTrue(); + snapshot.Length.ShouldBeLessThan(2048); + + var state = fs.State(); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe((ulong)numMsgs); + state.Msgs.ShouldBe(2UL); + state.NumDeleted.ShouldBe(numMsgs - 2); + }); + } + private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); @@ -94,5 +127,12 @@ public sealed partial class ConcurrencyTests2 }; } + private static void InvokePrivate(object target, string methodName, params object[] args) + { + var method = target.GetType().GetMethod(methodName, BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + method!.Invoke(target, args); + } + private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-c2-{Guid.NewGuid():N}"); } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index a0481a0..44f58c5 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -1,3 +1,5 @@ +using System.Reflection; +using System.Text; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -374,6 +376,201 @@ public sealed partial class JetStreamFileStoreTests }); } + [Fact] // T:354 + public void FileStoreSelectNextFirst_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 10; i++) + fs.StoreMsg("zzz", null, "Hello World"u8.ToArray(), 0); + + for (ulong seq = 2; seq <= 7; seq++) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + var state = fs.State(); + state.Msgs.ShouldBe(4UL); + state.FirstSeq.ShouldBe(1UL); + + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + state = fs.State(); + state.Msgs.ShouldBe(3UL); + state.FirstSeq.ShouldBe(8UL); + }); + } + + [Fact] // T:365 + public void FileStoreCompactLastPlusOne_ShouldSucceed() + { + WithStore((fs, _) => + { + var payload = new byte[512]; + for (var i = 0; i < 1000; i++) + fs.StoreMsg("foo", null, payload, 0); + + Should.NotThrow(() => InvokePrivateVoid(fs, "CheckAndFlushLastBlock")); + + var state = fs.State(); + state.Msgs.ShouldBe(1000UL); + + var (purged, err) = fs.Compact(state.LastSeq + 1); + err.ShouldBeNull(); + purged.ShouldBe(1000UL); + fs.State().Msgs.ShouldBe(0UL); + + fs.StoreMsg("foo", null, payload, 0).Seq.ShouldBeGreaterThan(0UL); + fs.State().Msgs.ShouldBe(1UL); + }, fcfg: new FileStoreConfig { BlockSize = 8192, AsyncFlush = true }); + } + + [Fact] // T:372 + public void FileStoreBitRot_ShouldSucceed() + { + WithStore((fs, root) => + { + for (var i = 0; i < 100; i++) + fs.StoreMsg("foo", null, "Hello World"u8.ToArray(), 0); + + var blockPath = CreateBlock(root, 1, Encoding.ASCII.GetBytes("abcdefgh12345678")); + var mb = fs.RecoverMsgBlock(1); + mb.Mfn.ShouldBe(blockPath); + File.Exists(blockPath).ShouldBeTrue(); + + var contents = File.ReadAllBytes(blockPath); + contents.Length.ShouldBeGreaterThan(0); + contents[contents.Length / 2] ^= 0xFF; + File.WriteAllBytes(blockPath, contents); + + Should.NotThrow(() => InvokePrivate(fs, "CheckMsgs")); + var state = fs.State(); + state.LastSeq.ShouldBeGreaterThanOrEqualTo(state.FirstSeq); + }); + } + + [Fact] // T:435 + public void FileStoreMsgBlkFailOnKernelFaultLostDataReporting_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + JetStreamFileStore? fs = null; + JetStreamFileStore? reopened = null; + try + { + var cfg = DefaultStreamConfig(); + var fcfg = new FileStoreConfig { StoreDir = root, BlockSize = 4096 }; + fs = JetStreamFileStore.NewFileStore(fcfg, cfg); + for (var i = 0; i < 500; i++) + fs.StoreMsg("foo", null, "Hello World"u8.ToArray(), 0); + + var mfn = CreateBlock(root, 1, Encoding.ASCII.GetBytes("kernel-fault-simulated-block")); + fs.RecoverMsgBlock(1).Mfn.ShouldBe(mfn); + + fs.Stop(); + fs = null; + File.Delete(mfn); + + reopened = JetStreamFileStore.NewFileStore(fcfg, cfg); + Should.NotThrow(() => InvokePrivate(reopened, "CheckMsgs")); + reopened.StoreMsg("foo", null, "restart"u8.ToArray(), 0).Seq.ShouldBe(1UL); + reopened.State().Msgs.ShouldBe(1UL); + } + finally + { + reopened?.Stop(); + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:593 + public void FileStoreSelectMsgBlockBinarySearch_ShouldSucceed() + { + WithStore((fs, _) => + { + var blks = new List(); + var bim = new Dictionary(); + ulong seq = 1; + for (uint index = 1; index <= 34; index++) + { + var mb = fs.InitMsgBlock(index); + mb.First = new MsgId { Seq = seq, Ts = (long)seq }; + mb.Last = new MsgId { Seq = seq + 1, Ts = (long)(seq + 1) }; + mb.Msgs = 2; + blks.Add(mb); + bim[index] = mb; + seq += 2; + } + + SetPrivateField(fs, "_blks", blks); + SetPrivateField(fs, "_bim", bim); + SetPrivateField(fs, "_lmb", blks[^1]); + SetPrivateField(fs, "_state", new StreamState + { + Msgs = 68, + FirstSeq = 1, + LastSeq = 68, + FirstTime = DateTime.UtcNow, + LastTime = DateTime.UtcNow, + }); + + var first = InvokePrivate<(int Index, MessageBlock? Block)>(fs, "SelectMsgBlockWithIndex", 2UL); + first.Index.ShouldBe(0); + first.Block.ShouldNotBeNull(); + first.Block!.Index.ShouldBe(1U); + + var deleted = InvokePrivate<(int Index, MessageBlock? Block)>(fs, "SelectMsgBlockWithIndex", 5UL); + deleted.Index.ShouldBe(2); + deleted.Block.ShouldNotBeNull(); + deleted.Block!.Index.ShouldBe(3U); + + var tail = InvokePrivate<(int Index, MessageBlock? Block)>(fs, "SelectMsgBlockWithIndex", 64UL); + tail.Index.ShouldBe(31); + tail.Block.ShouldNotBeNull(); + tail.Block!.Index.ShouldBe(32U); + + var last = InvokePrivate<(int Index, MessageBlock? Block)>(fs, "SelectMsgBlockWithIndex", 68UL); + last.Index.ShouldBe(33); + last.Block.ShouldNotBeNull(); + last.Block!.Index.ShouldBe(34U); + + var oob = InvokePrivate<(int Index, MessageBlock? Block)>(fs, "SelectMsgBlockWithIndex", 69UL); + oob.Index.ShouldBe(-1); + oob.Block.ShouldBeNull(); + }); + } + + private static T InvokePrivate(object target, string methodName, params object[] args) + { + var method = target.GetType().GetMethod(methodName, BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + var result = method!.Invoke(target, args); + if (result == null) + return default!; + return (T)result; + } + + private static void InvokePrivateVoid(object target, string methodName, params object[] args) + { + var method = target.GetType().GetMethod(methodName, BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + method!.Invoke(target, args); + } + + private static T GetPrivateField(object target, string name) + { + var field = target.GetType().GetField(name, BindingFlags.Instance | BindingFlags.NonPublic); + field.ShouldNotBeNull(); + return (T)field!.GetValue(target)!; + } + + private static void SetPrivateField(object target, string name, T value) + { + var field = target.GetType().GetField(name, BindingFlags.Instance | BindingFlags.NonPublic); + field.ShouldNotBeNull(); + field!.SetValue(target, value); + } + private static void RunWaveBScenario(string scenario) { switch (scenario) diff --git a/porting.db b/porting.db index feb072a..ce61ce4 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index 0e8d216..c53e326 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 20:10:46 UTC +Generated: 2026-02-28 20:27:32 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-02-28 20:10:46 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 1876 | +| deferred | 1858 | | n_a | 24 | | stub | 1 | -| verified | 1750 | +| verified | 1768 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 1787 | +| deferred | 1781 | | n_a | 241 | -| verified | 1229 | +| verified | 1235 | ## Library Mappings (36 total) @@ -35,4 +35,4 @@ Generated: 2026-02-28 20:10:46 UTC ## Overall Progress -**3278/6942 items complete (47.2%)** +**3302/6942 items complete (47.6%)**