From 4f0a7f40fc038f3488d97a527e7cedb722841336 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 15:10:46 -0500 Subject: [PATCH] feat(batch14): complete group1 write path and wave1 tests --- .../JetStream/FileStore.cs | 571 ++++++++++++++++++ .../JetStream/MessageBlock.cs | 20 + .../ConcurrencyTests1.Impltests.cs | 42 ++ .../JetStreamClusterLongTests.Impltests.cs | 60 ++ .../JetStreamFileStoreTests.Impltests.cs | 171 ++++++ porting.db | Bin 6594560 -> 6598656 bytes reports/current.md | 11 +- 7 files changed, 869 insertions(+), 6 deletions(-) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index c80f15a..392865e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -14,6 +14,7 @@ // Adapted from server/filestore.go (fileStore struct and methods) using System.Buffers; +using System.Buffers.Binary; using System.Security.Cryptography; using System.Text; using System.Text.Json; @@ -2032,6 +2033,576 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable _lmb = _blks.Count > 0 ? _blks[^1] : null; } + // ----------------------------------------------------------------------- + // Write/lifecycle helper methods (Batch 14 Group 1) + // ----------------------------------------------------------------------- + + // Lock should be held. + private Dictionary? SubjectsTotalsLocked(string filter) + { + if (_psim == null || _psim.Size() == 0) + return null; + + if (string.IsNullOrEmpty(filter)) + filter = ">"; + + var totals = new Dictionary(StringComparer.Ordinal); + _psim.Match(Encoding.UTF8.GetBytes(filter), (subj, psi) => + { + totals[Encoding.UTF8.GetString(subj)] = psi.Total; + return true; + }); + + return totals; + } + + // Lock should be held. + private byte[] HashKeyForBlock(uint index) + => Encoding.UTF8.GetBytes($"{_cfg.Config.Name}-{index}"); + + // Lock should be held. + private (MessageBlock? Mb, Exception? Error) NewMsgBlockForWrite() + { + uint index = 1; + byte[]? recycledBuf = null; + + if (_lmb != null) + { + _lmb.Mu.EnterWriteLock(); + try + { + index = _lmb.Index + 1; + _lmb.Qch = null; + _lmb.CloseFDs(); + if (_lmb.CacheData?.Buf is { Length: > 0 } buf) + { + recycledBuf = buf; + _lmb.CacheData = null; + } + } + finally + { + _lmb.Mu.ExitWriteLock(); + } + } + + var mb = InitMsgBlock(index); + mb.Mu.EnterWriteLock(); + try + { + mb.Fss = new SubjectTree(); + mb.Llts = 0; + mb.Lwts = TimestampNormalized(DateTime.UtcNow); + mb.First = new MsgId { Seq = _state.LastSeq + 1, Ts = 0 }; + mb.Last = new MsgId { Seq = _state.LastSeq, Ts = TimestampNormalized(_state.LastTime) }; + mb.CacheData = new Cache + { + Buf = recycledBuf ?? GetMsgBlockBuf((int)FileStoreDefaults.DefaultTinyBlockSize), + Fseq = mb.First.Seq, + Idx = Array.Empty(), + Nra = false, + Wp = 0, + }; + } + finally + { + mb.Mu.ExitWriteLock(); + } + + try + { + mb.Mfd = new FileStream(mb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + } + catch (Exception ex) + { + return (null, ex); + } + + if (_prf != null) + { + var encErr = GenEncryptionKeysForBlock(mb); + if (encErr != null) + return (null, encErr); + } + + _ = SHA256.HashData(HashKeyForBlock(index)); + AddMsgBlock(mb); + return (mb, null); + } + + // Lock should be held. + private Exception? GenEncryptionKeysForBlock(MessageBlock? mb) + { + if (mb == null) + return null; + + try + { + var (key, seed, encrypted) = GenEncryptionKeys($"{_cfg.Config.Name}:{mb.Index}"); + mb.Seed = seed; + mb.Nonce = encrypted[..key.NonceSize]; + + var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); + var keyFile = Path.Combine(mdir, string.Format(FileStoreDefaults.KeyScan, mb.Index)); + WriteFileWithOptionalSync(keyFile, encrypted); + mb.Kfn = keyFile; + return null; + } + catch (Exception ex) + { + return ex; + } + } + + // Lock should be held. + private Exception? StoreRawMsgInternal(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck) + { + if (IsClosed()) + return StoreErrors.ErrStoreClosed; + + hdr ??= Array.Empty(); + msg ??= Array.Empty(); + + if (_lmb == null) + { + var (newMb, newErr) = NewMsgBlockForWrite(); + if (newErr != null) + return newErr; + _lmb = newMb; + } + + var projectedSize = _lmb!.RBytes + FileStoreMsgSize(subject, hdr, msg); + if (_fcfg.BlockSize > 0 && projectedSize > _fcfg.BlockSize) + { + var (newMb, newErr) = NewMsgBlockForWrite(); + if (newErr != null) + return newErr; + _lmb = newMb; + } + + try + { + _memStore.StoreRawMsg(subject, hdr, msg, seq, ts, ttl, discardNewCheck); + } + catch (Exception ex) + { + return ex; + } + + _state = _memStore.State(); + _receivedAny = true; + + var record = BuildMessageRecord(subject, hdr, msg, seq, ts); + _lmb!.Mu.EnterWriteLock(); + try + { + _lmb.Mfd ??= new FileStream(_lmb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + _lmb.Mfd.Seek(0, SeekOrigin.End); + _lmb.Mfd.Write(record, 0, record.Length); + if (_fcfg.SyncAlways) + _lmb.Mfd.Flush(true); + + var msgSize = FileStoreMsgSize(subject, hdr, msg); + _lmb.Msgs++; + _lmb.Bytes += msgSize; + _lmb.RBytes += (ulong)record.Length; + _lmb.CBytes = _lmb.Bytes; + _lmb.NeedSync = !_fcfg.SyncAlways; + _lmb.Lwts = ts; + _lmb.Lchk = record.AsSpan(record.Length - FileStoreDefaults.RecordHashSize).ToArray(); + + if (_lmb.First.Seq == 0 || _lmb.Msgs == 1 || seq < _lmb.First.Seq) + _lmb.First = new MsgId { Seq = seq, Ts = ts }; + if (seq >= _lmb.Last.Seq) + _lmb.Last = new MsgId { Seq = seq, Ts = ts }; + + _lmb.Fss ??= new SubjectTree(); + if (!string.IsNullOrEmpty(subject)) + { + var subj = Encoding.UTF8.GetBytes(subject); + var (ss, ok) = _lmb.Fss.Find(subj); + if (ok && ss != null) + { + ss.Msgs++; + if (ss.First == 0 || seq < ss.First) + ss.First = seq; + if (seq > ss.Last) + ss.Last = seq; + ss.LastNeedsUpdate = false; + } + else + { + _lmb.Fss.Insert(subj, new SimpleState + { + Msgs = 1, + First = seq, + Last = seq, + LastNeedsUpdate = false, + }); + } + } + } + finally + { + _lmb.Mu.ExitWriteLock(); + } + + RefreshPsiFromMemStoreLocked(); + EnforceMsgLimit(); + EnforceBytesLimit(); + EnforceMsgPerSubjectLimit(fireCallback: false); + return null; + } + + // Lock should be held. + private void RebuildFirst() + { + _state = _memStore.State(); + if (_state.Msgs == 0) + { + _state.FirstSeq = _state.LastSeq + 1; + _state.FirstTime = default; + return; + } + + var ss = _memStore.FilteredState(_state.FirstSeq, ">"); + if (ss.Msgs == 0 || ss.First == 0) + { + _state.FirstSeq = _state.LastSeq + 1; + _state.FirstTime = default; + return; + } + + _state.FirstSeq = ss.First; + var sm = _memStore.LoadMsg(ss.First, null); + if (sm != null) + _state.FirstTime = FromUnixNanosUtc(sm.Ts); + SelectNextFirst(); + } + + // Lock should be held. + private (ulong Seq, Exception? Error) FirstSeqForSubj(string subj) + { + if (string.IsNullOrEmpty(subj)) + return (0, null); + + try + { + var ss = _memStore.FilteredState(_state.FirstSeq, subj); + return (ss.Msgs > 0 ? ss.First : 0, null); + } + catch (Exception ex) + { + return (0, ex); + } + } + + // Lock should be held. + private void EnforceMsgLimit() + { + if (_cfg.Config.MaxMsgs <= 0) + return; + + var maxMsgs = (ulong)_cfg.Config.MaxMsgs; + while (_state.Msgs > maxMsgs) + { + var (removed, err) = DeleteFirstMsg(); + if (err != null || !removed) + { + RebuildFirst(); + break; + } + } + } + + // Lock should be held. + private void EnforceBytesLimit() + { + if (_cfg.Config.MaxBytes <= 0) + return; + + var maxBytes = (ulong)_cfg.Config.MaxBytes; + while (_state.Bytes > maxBytes) + { + var (removed, err) = DeleteFirstMsg(); + if (err != null || !removed) + { + RebuildFirst(); + break; + } + } + } + + // Lock should be held. + private void EnforceMsgPerSubjectLimit(bool fireCallback) + { + if (_cfg.Config.MaxMsgsPer <= 0) + return; + + var limit = (ulong)_cfg.Config.MaxMsgsPer; + var totals = _memStore.SubjectsTotals(">"); + foreach (var entry in totals.ToArray()) + { + var subj = entry.Key; + var remaining = entry.Value; + while (remaining > limit) + { + var (fseq, seqErr) = FirstSeqForSubj(subj); + if (seqErr != null || fseq == 0) + break; + + var (removed, remErr) = RemoveMsgViaLimits(fseq); + if (remErr != null || !removed) + break; + + remaining--; + } + } + + if (fireCallback) + _state = _memStore.State(); + } + + // Lock should be held. + private (bool Removed, Exception? Error) DeleteFirstMsg() + => RemoveMsgViaLimits(_state.FirstSeq); + + // Lock should be held. + private (bool Removed, Exception? Error) RemoveMsgViaLimits(ulong seq) + => RemoveMsgInternal(seq, secure: false, viaLimits: true, needFsLock: false); + + // Lock should be held. + private ulong RemovePerSubject(string subj) + { + if (_psim == null || string.IsNullOrEmpty(subj)) + return 0; + + var subjBytes = Encoding.UTF8.GetBytes(subj); + var (psi, ok) = _psim.Find(subjBytes); + if (!ok || psi == null) + return 0; + + _psim.Delete(subjBytes); + _tsl = Math.Max(0, _tsl - subj.Length); + return psi.Total; + } + + private (bool Removed, Exception? Error) RemoveMsgInternal(ulong seq, bool secure, bool viaLimits, bool needFsLock) + { + if (needFsLock) + _mu.EnterWriteLock(); + + try + { + if (IsClosed()) + return (false, StoreErrors.ErrStoreClosed); + + var sm = _memStore.LoadMsg(seq, null); + var (removed, err) = secure ? _memStore.EraseMsg(seq) : _memStore.RemoveMsg(seq); + if (err != null || !removed) + return (removed, err); + + _state = _memStore.State(); + + var mb = FindMsgBlockForSeq(seq); + if (mb != null) + { + var (_, blockErr) = RemoveMsgFromBlock(mb, seq, secure, viaLimits, sm); + if (blockErr != null) + return (true, blockErr); + } + + RefreshPsiFromMemStoreLocked(); + if (_state.Msgs == 0) + _firstMoved = true; + return (true, null); + } + finally + { + if (needFsLock) + _mu.ExitWriteLock(); + } + } + + // Lock should be held. + private (bool Removed, Exception? Error) RemoveMsgFromBlock(MessageBlock mb, ulong seq, bool secure, bool viaLimits) + => RemoveMsgFromBlock(mb, seq, secure, viaLimits, null); + + // Lock should be held. + private (bool Removed, Exception? Error) RemoveMsgFromBlock(MessageBlock mb, ulong seq, bool secure, bool viaLimits, StoreMsg? sm) + { + ArgumentNullException.ThrowIfNull(mb); + + mb.Mu.EnterWriteLock(); + try + { + if (seq < mb.First.Seq || seq > mb.Last.Seq) + return (false, null); + + if (mb.Msgs > 0) + mb.Msgs--; + + mb.Dmap.Insert(seq); + + if (sm != null) + { + var size = FileStoreMsgSize(sm.Subject, sm.Hdr, sm.Msg); + if (size <= mb.Bytes) + mb.Bytes -= size; + else + mb.Bytes = 0; + + if (!mb.NoTrack && mb.Fss != null && !string.IsNullOrEmpty(sm.Subject)) + { + var subjBytes = Encoding.UTF8.GetBytes(sm.Subject); + var (ss, ok) = mb.Fss.Find(subjBytes); + if (ok && ss != null) + { + if (ss.Msgs > 0) + ss.Msgs--; + if (ss.Msgs == 0) + mb.Fss.Delete(subjBytes); + else if (seq == ss.First || seq == ss.Last) + ss.LastNeedsUpdate = true; + } + } + } + + mb.SelectNextFirst(); + if (mb.Msgs == 0) + { + mb.First = new MsgId { Seq = mb.Last.Seq + 1, Ts = 0 }; + mb.Last = new MsgId { Seq = mb.First.Seq - 1, Ts = 0 }; + } + + mb.NeedSync = mb.NeedSync || viaLimits || secure; + return (true, null); + } + finally + { + mb.Mu.ExitWriteLock(); + } + } + + // Lock should be held. + private void RemoveMsgsInRange(ulong first, ulong last, bool viaLimits) + { + if (first == 0 || last == 0 || last < first) + return; + + for (var seq = first; seq <= last; seq++) + { + _ = RemoveMsgInternal(seq, secure: false, viaLimits: viaLimits, needFsLock: false); + if (seq == ulong.MaxValue) + break; + } + + RebuildFirst(); + } + + private bool IsClosed() + => _closed; + + // Lock should be held. + private void SelectNextFirst() + { + foreach (var mb in _blks) + { + mb.Mu.EnterWriteLock(); + try + { + mb.SelectNextFirst(); + if (mb.Msgs == 0) + continue; + + _state.FirstSeq = mb.First.Seq; + var sm = _memStore.LoadMsg(_state.FirstSeq, null); + _state.FirstTime = sm == null ? default : FromUnixNanosUtc(sm.Ts); + return; + } + finally + { + mb.Mu.ExitWriteLock(); + } + } + + _state.FirstSeq = _state.LastSeq + 1; + _state.FirstTime = default; + } + + // Lock should be held. + private MessageBlock? FindMsgBlockForSeq(ulong seq) + { + foreach (var mb in _blks) + { + if (seq >= mb.First.Seq && seq <= mb.Last.Seq) + return mb; + } + + return null; + } + + // Lock should be held. + private void RefreshPsiFromMemStoreLocked() + { + _psim ??= new SubjectTree(); + _psim.Reset(); + _tsl = 0; + + var totals = _memStore.SubjectsTotals(">"); + if (totals.Count == 0) + return; + + var fblk = _blks.Count > 0 ? _blks[0].Index : 1U; + var lblk = _lmb?.Index ?? fblk; + + foreach (var kvp in totals) + { + if (kvp.Value == 0) + continue; + + var subject = kvp.Key; + _psim.Insert(Encoding.UTF8.GetBytes(subject), new Psi + { + Total = kvp.Value, + Fblk = fblk, + Lblk = lblk, + }); + _tsl += subject.Length; + } + } + + private static byte[] BuildMessageRecord(string subject, ReadOnlySpan hdr, ReadOnlySpan msg, ulong seq, long ts) + { + var subjectBytes = Encoding.UTF8.GetBytes(subject); + var bodyLength = (8 + 8 + 4 + 4 + 4) + subjectBytes.Length + hdr.Length + msg.Length; + var record = new byte[bodyLength + FileStoreDefaults.RecordHashSize]; + var span = record.AsSpan(); + + var index = 0; + BinaryPrimitives.WriteUInt64LittleEndian(span[index..], seq); + index += 8; + BinaryPrimitives.WriteInt64LittleEndian(span[index..], ts); + index += 8; + BinaryPrimitives.WriteInt32LittleEndian(span[index..], subjectBytes.Length); + index += 4; + BinaryPrimitives.WriteInt32LittleEndian(span[index..], hdr.Length); + index += 4; + BinaryPrimitives.WriteInt32LittleEndian(span[index..], msg.Length); + index += 4; + + subjectBytes.CopyTo(span[index..]); + index += subjectBytes.Length; + hdr.CopyTo(span[index..]); + index += hdr.Length; + msg.CopyTo(span[index..]); + index += msg.Length; + + var hash = SHA256.HashData(record.AsSpan(0, index)); + hash.AsSpan(0, FileStoreDefaults.RecordHashSize).CopyTo(span[index..]); + return record; + } + // ----------------------------------------------------------------------- // Read/query helper methods (Batch 13) // ----------------------------------------------------------------------- diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs index 692bd78..33341c4 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs @@ -267,6 +267,26 @@ internal sealed class MessageBlock CacheData = null; Fss = null; } + + // Lock should be held. + internal void SelectNextFirst() + { + if (Msgs == 0 || First.Seq == 0 || Last.Seq == 0) + return; + + var seq = First.Seq; + while (seq <= Last.Seq && Dmap.Exists(seq)) + { + if (seq == ulong.MaxValue) + break; + seq++; + } + + if (seq > Last.Seq) + First = new MsgId { Seq = Last.Seq + 1, Ts = 0 }; + else + First = new MsgId { Seq = seq, Ts = First.Ts }; + } } // --------------------------------------------------------------------------- diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs index 5192cf3..cf6102a 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Diagnostics; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -134,6 +135,47 @@ public sealed partial class ConcurrencyTests1 } } + [Fact] // T:2441 + public void NoRaceJetStreamFileStoreLargeKVAccessTiming_ShouldSucceed() + { + var value = Enumerable.Repeat((byte)'Z', 256).ToArray(); + const int keyCount = 5_000; + + WithStore((fs, _) => + { + for (var i = 1; i <= keyCount; i++) + { + fs.StoreMsg($"KV.STREAM_NAME.{i}", null, value, 0).Seq.ShouldBeGreaterThan(0UL); + } + + var sw = Stopwatch.StartNew(); + var last = fs.LoadLastMsg($"KV.STREAM_NAME.{keyCount}", null); + sw.Stop(); + var lastLookup = sw.Elapsed; + + last.ShouldNotBeNull(); + last!.Msg.ShouldBe(value); + + sw.Restart(); + var first = fs.LoadLastMsg("KV.STREAM_NAME.1", null); + sw.Stop(); + var firstLookup = sw.Elapsed; + + first.ShouldNotBeNull(); + first!.Msg.ShouldBe(value); + + // Keep generous bounds to avoid machine-specific flakiness while still + // asserting access stays fast under a large key set. + lastLookup.ShouldBeLessThan(TimeSpan.FromMilliseconds(250)); + firstLookup.ShouldBeLessThan(TimeSpan.FromMilliseconds(350)); + + var firstState = fs.FilteredState(1, "KV.STREAM_NAME.1"); + var lastState = fs.FilteredState(1, $"KV.STREAM_NAME.{keyCount}"); + firstState.First.ShouldBeGreaterThan(0UL); + lastState.First.ShouldBeGreaterThan(0UL); + }, DefaultStreamConfig()); + } + private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs new file mode 100644 index 0000000..550a468 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs @@ -0,0 +1,60 @@ +using System.Diagnostics; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterLongTests +{ + [Fact] // T:1219 + public void LongFileStoreEnforceMsgPerSubjectLimit_ShouldSucceed() + { + var root = Path.Combine(Path.GetTempPath(), $"impl-fs-long-{Guid.NewGuid():N}"); + Directory.CreateDirectory(root); + + JetStreamFileStore? fs = null; + try + { + fs = JetStreamFileStore.NewFileStore( + new FileStoreConfig { StoreDir = root, BlockSize = 1024 }, + new StreamConfig + { + Name = "zzz", + Storage = StorageType.FileStorage, + Subjects = ["test.>"], + MaxMsgsPer = 1, + MaxMsgs = -1, + MaxBytes = -1, + Discard = DiscardPolicy.DiscardOld, + Retention = RetentionPolicy.LimitsPolicy, + }); + + const int keys = 4_000; + const int rewriteCount = 30_000; + + for (var i = 0; i < keys; i++) + { + fs.StoreMsg($"test.{i:000000}", null, "seed"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL); + } + + var sw = Stopwatch.StartNew(); + for (var i = 0; i < rewriteCount; i++) + { + var n = Random.Shared.Next(keys); + fs.StoreMsg($"test.{n:000000}", null, "rewrite"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL); + } + sw.Stop(); + + var totals = fs.SubjectsTotals("test.*"); + totals.Count.ShouldBe(keys); + totals.Values.All(v => v <= 1UL).ShouldBeTrue(); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(30)); + } + finally + { + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index 368a098..a0481a0 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -203,6 +203,177 @@ public sealed partial class JetStreamFileStoreTests [Fact] // T:592 public void FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed)); + [Fact] // T:549 + public void FileStoreTruncateRemovedBlock_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, "1"u8.ToArray(), 0).Seq.ShouldBe(1UL); + fs.StoreMsg("foo", null, "2"u8.ToArray(), 0).Seq.ShouldBe(2UL); + fs.StoreMsg("foo", null, "3"u8.ToArray(), 0).Seq.ShouldBe(3UL); + + var before = fs.State(); + before.Msgs.ShouldBe(3UL); + before.FirstSeq.ShouldBe(1UL); + before.LastSeq.ShouldBe(3UL); + before.NumDeleted.ShouldBe(0); + + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + var mid = fs.State(); + mid.Msgs.ShouldBe(2UL); + mid.FirstSeq.ShouldBe(1UL); + mid.LastSeq.ShouldBe(3UL); + mid.NumDeleted.ShouldBe(1); + + fs.Truncate(2); + var after = fs.State(); + after.Msgs.ShouldBe(1UL); + after.FirstSeq.ShouldBe(1UL); + after.LastSeq.ShouldBe(2UL); + after.NumDeleted.ShouldBe(1); + }); + } + + [Fact] // T:551 + public void FileStoreRemoveBlockWithStaleStreamState_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + JetStreamFileStore? fs = null; + JetStreamFileStore? reopened = null; + try + { + fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, DefaultStreamConfig()); + var blk1 = CreateBlock(root, 1, "blk1-data"u8.ToArray()); + var blk2 = CreateBlock(root, 2, "blk2-data"u8.ToArray()); + var blk3 = CreateBlock(root, 3, "blk3-data"u8.ToArray()); + + fs.RecoverMsgBlock(1).Index.ShouldBe(1u); + fs.RecoverMsgBlock(2).Index.ShouldBe(2u); + fs.RecoverMsgBlock(3).Index.ShouldBe(3u); + fs.Stop(); + fs = null; + + File.Delete(blk2); + + reopened = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, DefaultStreamConfig()); + reopened.RecoverMsgBlock(1).Index.ShouldBe(1u); + reopened.RecoverMsgBlock(3).Index.ShouldBe(3u); + Should.Throw(() => reopened.RecoverMsgBlock(2)); + } + finally + { + reopened?.Stop(); + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:561 + public void FileStoreTombstonesSelectNextFirstCleanup_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 100; i++) + fs.StoreMsg("foo", null, "x"u8.ToArray(), 0); + + for (ulong seq = 2; seq <= 100; seq++) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + var before = fs.State(); + before.Msgs.ShouldBe(1UL); + before.FirstSeq.ShouldBe(1UL); + before.LastSeq.ShouldBe(100UL); + + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + var after = fs.State(); + after.Msgs.ShouldBe(0UL); + after.FirstSeq.ShouldBe(101UL); + after.LastSeq.ShouldBe(100UL); + }); + } + + [Fact] // T:572 + public void FileStoreEmptyBlockContainsPriorTombstones_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, "1"u8.ToArray(), 0).Seq.ShouldBe(1UL); + fs.StoreMsg("foo", null, "2"u8.ToArray(), 0).Seq.ShouldBe(2UL); + fs.StoreMsg("foo", null, "3"u8.ToArray(), 0).Seq.ShouldBe(3UL); + + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + fs.RemoveMsg(3).Removed.ShouldBeTrue(); + + var mid = fs.State(); + mid.Msgs.ShouldBe(1UL); + mid.FirstSeq.ShouldBe(1UL); + mid.LastSeq.ShouldBe(3UL); + mid.Deleted.ShouldContain(2UL); + mid.Deleted.ShouldContain(3UL); + + fs.StoreMsg("foo", null, "4"u8.ToArray(), 0).Seq.ShouldBe(4UL); + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + + var after = fs.State(); + after.Msgs.ShouldBe(1UL); + after.FirstSeq.ShouldBe(4UL); + after.LastSeq.ShouldBe(4UL); + (after.Deleted ?? Array.Empty()).ShouldBeEmpty(); + }); + } + + [Fact] // T:586 + public void FileStoreLoadNextMsgSkipAhead_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 10; i++) + { + var subj = i % 2 == 0 ? "a" : "c"; + fs.StoreMsg(subj, null, "x"u8.ToArray(), 0); + } + + fs.StoreMsg("b", null, "target"u8.ToArray(), 0).Seq.ShouldBe(11UL); + + var (headSm, headSeq) = fs.LoadNextMsg("b", false, 0, null); + headSm.ShouldNotBeNull(); + headSm!.Subject.ShouldBe("b"); + headSeq.ShouldBe(11UL); + + var (interiorSm, interiorSeq) = fs.LoadNextMsg("b", false, 3, null); + interiorSm.ShouldNotBeNull(); + interiorSm!.Subject.ShouldBe("b"); + interiorSeq.ShouldBe(11UL); + }); + } + + [Fact] // T:587 + public void FileStoreLoadNextMsgMultiSkipAhead_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 10; i++) + { + var subj = i % 2 == 0 ? "a" : "c"; + fs.StoreMsg(subj, null, "x"u8.ToArray(), 0); + } + + fs.StoreMsg("b", null, "target"u8.ToArray(), 0).Seq.ShouldBe(11UL); + + var (headSm, headSeq) = fs.LoadNextMsgMulti(new[] { "b", "d" }, 11, null); + headSm.ShouldNotBeNull(); + headSm!.Subject.ShouldBe("b"); + headSeq.ShouldBe(11UL); + + var (interiorSm, interiorSeq) = fs.LoadNextMsgMulti(new[] { "b", "d" }, 3, null); + interiorSm.ShouldNotBeNull(); + interiorSeq.ShouldBeGreaterThanOrEqualTo(3UL); + }); + } + private static void RunWaveBScenario(string scenario) { switch (scenario) diff --git a/porting.db b/porting.db index bb81d5b59872bb029b880d8942877cae20a56d43..feb072af8e4616f9069149f46ef5007c727358d8 100644 GIT binary patch delta 6952 zcmchbdt4M(mdC4ZRd-iix~r-intssTbOQpmsNFOQDn^JC-x6amK5!gBz_$`alTo5J zkl0R`S>vpp=*(m^yQ@wnJ~LXgF`1}QKchp`B)XGz6P1ivF`}YwjJhPwX&x~COG)Ef=#M0TZDcam|;T(qEkeraVX??NR-lr>yAqAYmt zNJPb=+8p=Lt;=xhQzgW9uoJJK^=C9J@l&Efbr66s~CBJG}rK3{TaO&^#olkZWq1~OuCWU zD%D+1rfaRik?Sic<0Q@(oWbFNveKf;B^9NMc`vZuqBJf#?n(j*<*w>fdH$BUp?i=hIFxa1tZam+O8!qs_?2ot+=*f-AKY$zM-M9g4uW4~` z+FKHDPtVATB4Yc$Rm}jldw^<8Wg^H*T0A z_+&x*+z4b2cpPrb@5T*Nrzsl9^gj+a=5^zSv9lWkdt9)Ox>CY(V{SKYK+*N7ad0ZG z$kg`i2n2d`<%gK2bGmUw?zCD0{y~o;F|)gIL+b2?@3G8}S>3o{=(K3lvod;$M$RKX zN&Wj*5!S6WoMsxh+o0P_u0YHda^59v|3@FSX$!f+TCtXEapjU?J{GN*R?CbQRj>UE z`J0#{O=m{5UgXhgE6AbMR**r9hVODMB4;M8<-C!*uJBz@c9N%uk1HvS6bSDzvRjKP zXr;GWGFqvvmV{PYH{o|`TQ_>NsQTtkQp~aqs%AHdCGabUF-{%0m*fd1y^F2uyiFm? z)D7>F5X)mVNlkcJjQ(Vs%aq^kPUL}7{k6!z) za~wQw_BZPB-E=B1R{KI;6T=P*!k#tM3M2Q?U2ymNUuT2Ese(Dm! zFX=&p2Mko7+fTn2_+VW|01;^c(Rr-BRtg*K#eiS{JgS3&Vra(Cm$}z_k+W`QUN79Evj%ozQ%h#=tMi zs16pLpvl&bwyRS^7OeGDaC{)%Plyu^ouK<*&-c{dR`{*I&=h+|;b2)P*2u7d#ew*E z!a6hk{oz6hwE(H71u*e>^t-MMJ3V#5l@!yq7YPru+7mKk7rDXl5d1d`PD4F?11ffK z?LNKz5lu67^iVx9WD2`eod`cRMcLp+2|7$q(xc(9WC3@IrWiZQ2J=Ej^k(P)cx?tc zv2!BT!rR-l0u-L2xou&rKSk3XhP(L?eNz_>cT8)z)u{(5u=`I_R`Ik(_%I_3*Y%UW zlNfzz)G|6t{E4=hKgs=x>|_pylf7A~nH^Hly=Q9^1M$N!T58i*bRioKSATfaNnUP;uAVnvU#;6T=wNU~8GdL4o+h@Wi-$jsB)>q`R(Df27xH zYlz2k&jvLh$Ky!F8}#RG&9x44Zd#NAAK#!U4=eDYsMe9%+D3Xur<<-0YoKcgt+9jz zyw$g6B|QCW8ms0v(dDfA=f6>w#AAl^%wdMV!2!F6IV?L2FX^51lK8opsGF%R28!Ys8Oo<5Qoi-46=dv0M1ibbce2mTic?YFku;uhCbg?U&Ni7>=41ixwQs(@O$jD z=36?IP;3I91DK9>C)xY7rfo{?x=;H)oKlzW(=1*L7Ot+snK*g?^h=1hK+XeP4sXhm z6UrxJcPnmE1LQrx+1XOBbPl?6MC88(XGHyOMgPXuT$z^cK zT&&iv)oTm2Khsv}1nqrollHv!wDz#}58AD|J3>FfEld#^;g)bo_zzqq2SI9<8wcx0 z@mlUK_qATp6a8Q@i=+}e(Q|@yNA&8`#1UdX-An(KZjc(Kb2LPk(fOKY&6m3RsQ&@| zO?fu>3gsC1Y`-*8J0pnlksWIJv5WyZIdwx&fk+ajp8j`O2&3k<}S+AMH#y& z1*E+sRc*X4TUi(H?Q)Onq8MuU$vl(a@zYLw12&nAcD1EN z7D)KL=1_$P@;$|@OJ_bHUWRNZn@Am5LCQ%nH(^6AiHBj$vIQ=>3lzd9fR82i1R=Y1 zPM8E zaMC2LE=hQs*$xwu3M??*qHKgzlW`Jz52|olDLSTeEOjg>7;hMOPsSZ;i|o({bT4xR zc3YJRJX+-{Z&P-$Tq@I|?zbxygqLqI*CEZR91L4twb`kV)__auop>ecv`#qBe5hH+ zsMcoX6;{H<)#;=D`lj+3XT>6IJD3mAYTdz*v1n1X?i*zyC&`!EFQ%YH)uy}3|7tys zaHc!9NF*CG+`6wKev0PLg)t1|>>(r8=NY%Bh0`To6h9G<2$kY~al5!lC==_%6=FH2 zbBtIl=HnvH61}2B#N7{TOx2C(91=3NiyG5KWp`2jE-D3F?-9HDmSCKwW4{B9VjNAR z95qid?qyYCG6wWEsbjvf2Rc{fKU|&XG9o4<8i`T!T;_L*T(TP<+d0J0q<DWs~5>f7&L%#~;QFhZW~+IgzrfkDarr$IjV@XcDn&iP_3X9~idk zgHc1t@7X$LD}6@H5*#{{<^g|;I}v_$RjXSs+LJiGI;l1r-v7v+0D~`KWJ1)^D{Lh} zMRY>gtCz#EtB28JX2$?T`(JV-i&BSN1tL6i=-mCY-^cUbMrQ|4M9_+om+z=SJysjYz*uiK+nH(s+N zafkzPA}+P@T6}L48@tcsUZ5d{eQHqSUe77jDQkv@b68}UY$yu9c)giP3N?ucfRsz2y?QWHhPwWk`Y^|fQ+*b-X;tUT(O))wHGKJr9? zyOL_)pN@J&c(LBo+4*2)?L=7BXh~3CJ?2T&u(nP~!ZxB{Q9V|A{iw&+X1VG;Tf@m5 z^N-7s{s-Whf0vV>@K}r+(1o0bkvG>)>J+ZDlO6{Q3fOvqvL=OJ?K|l~5|BhB3F(C- zBPmEK(i`z2eULPyFVYX`kN6Njl8ywB3}gT@5XnTckU_{`WC)Ut3`L$mh9NmfF7hNY z9QhIQ6fy!CiHt%fDQkjVv^uE!3-pUlwL+*dy@z<-&S6F+-78FK#SEijZP; zapTO_wsCf*0Ca0hdh7XoZVh>#y~QTu6(?qv;e12kQq|{X*XddpDE!f?Z*OBSz^Bl`%>mm>1p;&WgcmLt_*Ny9Xqj5(ih43nX?YIBZH07;Jd+Rg+=> syGe1vGfr&trEotBoY>r-?8;GC9*cGEIV!Z delta 5054 zcmZ9P3tSY}y~k(H%H(TluR|ucn`iP2w}RaoQ%WghU$?Lb$Cqjn^nqlVC+8Nv}aKjrTv`E@Sz8zHojs z|JV7SnKOsF_MJ>!(@thbyu6O0EE6b-N=N8d(nrwiPL>yC)9srMW$TBs>B@t&1GqMN zkaeCCRBx&Oqvy2IKT+s)@T<%8c4Flr{|cQ-XaeO|=<(hZjPlUWV3|oE{;Gvlt5;X~ znN#|?EA)LDE`CY7^e?`o{dDcmnFdO!Tv5GpdDWUKcDrF7QF|V`xAUQUM;=nYeMtSD zJxHAjM-f-C$Ef^>T4}aqT5HXc-DYj2_J&Q_t*Ev|)Miv0BdQ+N%7_Y}s)(pMR5K&W zhpHs3n6J(Gn6G5L@H+k8RzBP_C^|`I>+@HV1WWCTpVU}aOH4djTeGaj5Ub2$rCF>n ziyE`2HjCwEvCJ%%n#GbnQTrh4V)Ly%%XOdS#x7SqjQnpsRWiz#L?*(@eSL@loP z#E3#M!7Rp`#W=Hg+AK=UVysy_Wfo)1;>msy)Bn&Fn{O4NfGLaAOoy-I~gZsVFb*)>6T{SXGE^5@>t0G#I~F{S$n$R~y1g-DuFe z7wsp$s`bL{<*Glal*)|6|2N~MiKuOIDVmf0+8Aa%4#w@*QZw=W88EL$HBejtl@f8T zgSP!zhW=@_`Up8p*AcHpnMZxkd<@ou)#zB*XUS*agi6+tgT?9BO{kF(r^8eToZ~S7um>&iHxZ5i0KWF z*v!Z%(HMbtj7%M4@mEDI+an__GEyTWB{GsDBgq))uN@;5H2n{qJ3*oed~#2X*B^U} zOyOBb~y=%K^8!5glY~#;cZ(36<&v2>i z7{*WhbR%^Rd|rCLe(r=ghjpqmDYiZrP0+TJi-VQ3#V9D; z{H@D_$aKF4Gan;+9^6$&Pz%#oGVX@Mg7iAr^Wot+=9m+ z9sYQ3zAWsdWU7EtACcRsuc!j)wC!{8q|jh(wv6U{Ia2lOxIlS^A9je&3odj4@VK8xLHi=o2pU~ipKfny^xob^dRrre>VQ~ zO5z~!iZ;&GH`n@vkPEXE8<~e1K1<&hPndkhe;^Kv4VPIzCFC$ls!uV>L}M)JiphBY z88N4?5c`CX9SeEl(Y&M#v>TC|0S8M(CmcC!;ek0T7WX@~jmB$h&x)CRIK!znk=DH2 zhuHsSO6ib4W6v-jO2f$)^hZ>)ZMkrXZ?k^Df$v1zA$|T0(ZZbS5^0k9Xr^SH-;#8(U%f zH8I`np`b@$AmhOk*?LXf_ppZu|M|YfXKMN;DI^;{n{xYo^vl;p?}M7Wf5XRk*w~!&iMR&NBt?{o2$!y{c1um3HB?#AEOg^>TrE-WH^dR?VJF#AMoG9* z>}>|I0<{Pi?YkG6HxQBBJm+;M?LJTH(IzZgfxEI|%kX?m^Ex?~1?Z57XX= z6FYfN91?k?^y~M;e767npSNW_c5^&-^HC%Fko)5DNI#-rxCvGA!ymCkBP%c_qVDY!= z;(Y5J{#j-nv)syC?pZo5mo06UX3MWFjm-1RH0DWWIFrgm3l#qye_i@f>gN9nOh|XwXTeA#ur&s6 zblz3B!S9^fEEwX_;`NSuYAg-gW3>dDfy7>{q2(TWroTlLsOVK!!{ul#UGMExc>;~b zFzdd0L9>!B>JCvT*sE!saO4CL9kzqEowlIuMTM8|$(@$00kNKhlp3QQNOCk$7#E1*mBI@2K$fu^YwS)wQ3sH z4=t0S`Y2kuj^VHB1g#d14JwoLo&@b44ebuQr2m+tF>NvGb?ReoDaCj2m(@FJCl}&2 za%;GyI^V5rrlCmF5}~Pmfe7QKYq|Ol-)b9}fm6`?;Zp~h*mvsK_P<&wJ_~OW-a^v) zu5|Wj|7Y>>Mk^z>qaveR5$CprKR_A(29@lZV*gb`vFe4ob~;ucM%z8sTt7J~45jkJ z$2sx_s>s2Q{0FMY!H?`irND_EItJvsw4@*4?F+4RFL0WDygp5{zeP7Bm$`Mf!yYf$ zBCAWk1KTCLd&}HJJK{j15T`yj(RFx1BAtx;U=fi!r31Fj;woV{Z?XJ_eT!K@O6XJy zd`)g26xF$|Ca7NAzC8af;Pyq9$s^@uxDSoVuBGzwc;+1l1YFs$G~gN!O|K75f)i_8 z8E`+~DvWGqTT1F(NHh|I#OfvWaqmoyvHlmOQp+gid-*--g6+I`RyfCBu zD1~aIl+$vf^s?;<@+Q5FY80mn`Mk&K;O?{EG5AGiHtCJ+?m!6II^MHnXJctM&c0< zl7J-Y9k-G)9JKpwhqsy|LbM*>yt^2u{5Gae`RN!V^#Xaeyue7EDtKQuQl~@tF4+k; z1@DxBJ{5~zy;$^p_%`z)H%FJ6d{1(T#*u=|!OG{5dOkRDKpB0n?r+6QXMA(}=d~XBC_n4;0!ADS(;AaxINsuO#ZDQg{ri^WpMvx~N$6iRusE*Z zEK^srT74cU$cq~c+E3HiEjg_|BpFFT1|fryR3r^aM>3F1WC)Ul3`K?^*+>qOi#&ql zA;XbJk;jk`NIp`4JdTV+Mj=liqme?S2q{LMM8+UbA!Cseh|()Xz037 zuIS}g%SUkZpJ4YlE&&6b57<;Vi0LSNWXdEhYP