diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index c80f15a..392865e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -14,6 +14,7 @@ // Adapted from server/filestore.go (fileStore struct and methods) using System.Buffers; +using System.Buffers.Binary; using System.Security.Cryptography; using System.Text; using System.Text.Json; @@ -2032,6 +2033,576 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable _lmb = _blks.Count > 0 ? _blks[^1] : null; } + // ----------------------------------------------------------------------- + // Write/lifecycle helper methods (Batch 14 Group 1) + // ----------------------------------------------------------------------- + + // Lock should be held. + private Dictionary? SubjectsTotalsLocked(string filter) + { + if (_psim == null || _psim.Size() == 0) + return null; + + if (string.IsNullOrEmpty(filter)) + filter = ">"; + + var totals = new Dictionary(StringComparer.Ordinal); + _psim.Match(Encoding.UTF8.GetBytes(filter), (subj, psi) => + { + totals[Encoding.UTF8.GetString(subj)] = psi.Total; + return true; + }); + + return totals; + } + + // Lock should be held. + private byte[] HashKeyForBlock(uint index) + => Encoding.UTF8.GetBytes($"{_cfg.Config.Name}-{index}"); + + // Lock should be held. + private (MessageBlock? Mb, Exception? Error) NewMsgBlockForWrite() + { + uint index = 1; + byte[]? recycledBuf = null; + + if (_lmb != null) + { + _lmb.Mu.EnterWriteLock(); + try + { + index = _lmb.Index + 1; + _lmb.Qch = null; + _lmb.CloseFDs(); + if (_lmb.CacheData?.Buf is { Length: > 0 } buf) + { + recycledBuf = buf; + _lmb.CacheData = null; + } + } + finally + { + _lmb.Mu.ExitWriteLock(); + } + } + + var mb = InitMsgBlock(index); + mb.Mu.EnterWriteLock(); + try + { + mb.Fss = new SubjectTree(); + mb.Llts = 0; + mb.Lwts = TimestampNormalized(DateTime.UtcNow); + mb.First = new MsgId { Seq = _state.LastSeq + 1, Ts = 0 }; + mb.Last = new MsgId { Seq = _state.LastSeq, Ts = TimestampNormalized(_state.LastTime) }; + mb.CacheData = new Cache + { + Buf = recycledBuf ?? GetMsgBlockBuf((int)FileStoreDefaults.DefaultTinyBlockSize), + Fseq = mb.First.Seq, + Idx = Array.Empty(), + Nra = false, + Wp = 0, + }; + } + finally + { + mb.Mu.ExitWriteLock(); + } + + try + { + mb.Mfd = new FileStream(mb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + } + catch (Exception ex) + { + return (null, ex); + } + + if (_prf != null) + { + var encErr = GenEncryptionKeysForBlock(mb); + if (encErr != null) + return (null, encErr); + } + + _ = SHA256.HashData(HashKeyForBlock(index)); + AddMsgBlock(mb); + return (mb, null); + } + + // Lock should be held. + private Exception? GenEncryptionKeysForBlock(MessageBlock? mb) + { + if (mb == null) + return null; + + try + { + var (key, seed, encrypted) = GenEncryptionKeys($"{_cfg.Config.Name}:{mb.Index}"); + mb.Seed = seed; + mb.Nonce = encrypted[..key.NonceSize]; + + var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); + var keyFile = Path.Combine(mdir, string.Format(FileStoreDefaults.KeyScan, mb.Index)); + WriteFileWithOptionalSync(keyFile, encrypted); + mb.Kfn = keyFile; + return null; + } + catch (Exception ex) + { + return ex; + } + } + + // Lock should be held. + private Exception? StoreRawMsgInternal(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck) + { + if (IsClosed()) + return StoreErrors.ErrStoreClosed; + + hdr ??= Array.Empty(); + msg ??= Array.Empty(); + + if (_lmb == null) + { + var (newMb, newErr) = NewMsgBlockForWrite(); + if (newErr != null) + return newErr; + _lmb = newMb; + } + + var projectedSize = _lmb!.RBytes + FileStoreMsgSize(subject, hdr, msg); + if (_fcfg.BlockSize > 0 && projectedSize > _fcfg.BlockSize) + { + var (newMb, newErr) = NewMsgBlockForWrite(); + if (newErr != null) + return newErr; + _lmb = newMb; + } + + try + { + _memStore.StoreRawMsg(subject, hdr, msg, seq, ts, ttl, discardNewCheck); + } + catch (Exception ex) + { + return ex; + } + + _state = _memStore.State(); + _receivedAny = true; + + var record = BuildMessageRecord(subject, hdr, msg, seq, ts); + _lmb!.Mu.EnterWriteLock(); + try + { + _lmb.Mfd ??= new FileStream(_lmb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + _lmb.Mfd.Seek(0, SeekOrigin.End); + _lmb.Mfd.Write(record, 0, record.Length); + if (_fcfg.SyncAlways) + _lmb.Mfd.Flush(true); + + var msgSize = FileStoreMsgSize(subject, hdr, msg); + _lmb.Msgs++; + _lmb.Bytes += msgSize; + _lmb.RBytes += (ulong)record.Length; + _lmb.CBytes = _lmb.Bytes; + _lmb.NeedSync = !_fcfg.SyncAlways; + _lmb.Lwts = ts; + _lmb.Lchk = record.AsSpan(record.Length - FileStoreDefaults.RecordHashSize).ToArray(); + + if (_lmb.First.Seq == 0 || _lmb.Msgs == 1 || seq < _lmb.First.Seq) + _lmb.First = new MsgId { Seq = seq, Ts = ts }; + if (seq >= _lmb.Last.Seq) + _lmb.Last = new MsgId { Seq = seq, Ts = ts }; + + _lmb.Fss ??= new SubjectTree(); + if (!string.IsNullOrEmpty(subject)) + { + var subj = Encoding.UTF8.GetBytes(subject); + var (ss, ok) = _lmb.Fss.Find(subj); + if (ok && ss != null) + { + ss.Msgs++; + if (ss.First == 0 || seq < ss.First) + ss.First = seq; + if (seq > ss.Last) + ss.Last = seq; + ss.LastNeedsUpdate = false; + } + else + { + _lmb.Fss.Insert(subj, new SimpleState + { + Msgs = 1, + First = seq, + Last = seq, + LastNeedsUpdate = false, + }); + } + } + } + finally + { + _lmb.Mu.ExitWriteLock(); + } + + RefreshPsiFromMemStoreLocked(); + EnforceMsgLimit(); + EnforceBytesLimit(); + EnforceMsgPerSubjectLimit(fireCallback: false); + return null; + } + + // Lock should be held. + private void RebuildFirst() + { + _state = _memStore.State(); + if (_state.Msgs == 0) + { + _state.FirstSeq = _state.LastSeq + 1; + _state.FirstTime = default; + return; + } + + var ss = _memStore.FilteredState(_state.FirstSeq, ">"); + if (ss.Msgs == 0 || ss.First == 0) + { + _state.FirstSeq = _state.LastSeq + 1; + _state.FirstTime = default; + return; + } + + _state.FirstSeq = ss.First; + var sm = _memStore.LoadMsg(ss.First, null); + if (sm != null) + _state.FirstTime = FromUnixNanosUtc(sm.Ts); + SelectNextFirst(); + } + + // Lock should be held. + private (ulong Seq, Exception? Error) FirstSeqForSubj(string subj) + { + if (string.IsNullOrEmpty(subj)) + return (0, null); + + try + { + var ss = _memStore.FilteredState(_state.FirstSeq, subj); + return (ss.Msgs > 0 ? ss.First : 0, null); + } + catch (Exception ex) + { + return (0, ex); + } + } + + // Lock should be held. + private void EnforceMsgLimit() + { + if (_cfg.Config.MaxMsgs <= 0) + return; + + var maxMsgs = (ulong)_cfg.Config.MaxMsgs; + while (_state.Msgs > maxMsgs) + { + var (removed, err) = DeleteFirstMsg(); + if (err != null || !removed) + { + RebuildFirst(); + break; + } + } + } + + // Lock should be held. + private void EnforceBytesLimit() + { + if (_cfg.Config.MaxBytes <= 0) + return; + + var maxBytes = (ulong)_cfg.Config.MaxBytes; + while (_state.Bytes > maxBytes) + { + var (removed, err) = DeleteFirstMsg(); + if (err != null || !removed) + { + RebuildFirst(); + break; + } + } + } + + // Lock should be held. + private void EnforceMsgPerSubjectLimit(bool fireCallback) + { + if (_cfg.Config.MaxMsgsPer <= 0) + return; + + var limit = (ulong)_cfg.Config.MaxMsgsPer; + var totals = _memStore.SubjectsTotals(">"); + foreach (var entry in totals.ToArray()) + { + var subj = entry.Key; + var remaining = entry.Value; + while (remaining > limit) + { + var (fseq, seqErr) = FirstSeqForSubj(subj); + if (seqErr != null || fseq == 0) + break; + + var (removed, remErr) = RemoveMsgViaLimits(fseq); + if (remErr != null || !removed) + break; + + remaining--; + } + } + + if (fireCallback) + _state = _memStore.State(); + } + + // Lock should be held. + private (bool Removed, Exception? Error) DeleteFirstMsg() + => RemoveMsgViaLimits(_state.FirstSeq); + + // Lock should be held. + private (bool Removed, Exception? Error) RemoveMsgViaLimits(ulong seq) + => RemoveMsgInternal(seq, secure: false, viaLimits: true, needFsLock: false); + + // Lock should be held. + private ulong RemovePerSubject(string subj) + { + if (_psim == null || string.IsNullOrEmpty(subj)) + return 0; + + var subjBytes = Encoding.UTF8.GetBytes(subj); + var (psi, ok) = _psim.Find(subjBytes); + if (!ok || psi == null) + return 0; + + _psim.Delete(subjBytes); + _tsl = Math.Max(0, _tsl - subj.Length); + return psi.Total; + } + + private (bool Removed, Exception? Error) RemoveMsgInternal(ulong seq, bool secure, bool viaLimits, bool needFsLock) + { + if (needFsLock) + _mu.EnterWriteLock(); + + try + { + if (IsClosed()) + return (false, StoreErrors.ErrStoreClosed); + + var sm = _memStore.LoadMsg(seq, null); + var (removed, err) = secure ? _memStore.EraseMsg(seq) : _memStore.RemoveMsg(seq); + if (err != null || !removed) + return (removed, err); + + _state = _memStore.State(); + + var mb = FindMsgBlockForSeq(seq); + if (mb != null) + { + var (_, blockErr) = RemoveMsgFromBlock(mb, seq, secure, viaLimits, sm); + if (blockErr != null) + return (true, blockErr); + } + + RefreshPsiFromMemStoreLocked(); + if (_state.Msgs == 0) + _firstMoved = true; + return (true, null); + } + finally + { + if (needFsLock) + _mu.ExitWriteLock(); + } + } + + // Lock should be held. + private (bool Removed, Exception? Error) RemoveMsgFromBlock(MessageBlock mb, ulong seq, bool secure, bool viaLimits) + => RemoveMsgFromBlock(mb, seq, secure, viaLimits, null); + + // Lock should be held. + private (bool Removed, Exception? Error) RemoveMsgFromBlock(MessageBlock mb, ulong seq, bool secure, bool viaLimits, StoreMsg? sm) + { + ArgumentNullException.ThrowIfNull(mb); + + mb.Mu.EnterWriteLock(); + try + { + if (seq < mb.First.Seq || seq > mb.Last.Seq) + return (false, null); + + if (mb.Msgs > 0) + mb.Msgs--; + + mb.Dmap.Insert(seq); + + if (sm != null) + { + var size = FileStoreMsgSize(sm.Subject, sm.Hdr, sm.Msg); + if (size <= mb.Bytes) + mb.Bytes -= size; + else + mb.Bytes = 0; + + if (!mb.NoTrack && mb.Fss != null && !string.IsNullOrEmpty(sm.Subject)) + { + var subjBytes = Encoding.UTF8.GetBytes(sm.Subject); + var (ss, ok) = mb.Fss.Find(subjBytes); + if (ok && ss != null) + { + if (ss.Msgs > 0) + ss.Msgs--; + if (ss.Msgs == 0) + mb.Fss.Delete(subjBytes); + else if (seq == ss.First || seq == ss.Last) + ss.LastNeedsUpdate = true; + } + } + } + + mb.SelectNextFirst(); + if (mb.Msgs == 0) + { + mb.First = new MsgId { Seq = mb.Last.Seq + 1, Ts = 0 }; + mb.Last = new MsgId { Seq = mb.First.Seq - 1, Ts = 0 }; + } + + mb.NeedSync = mb.NeedSync || viaLimits || secure; + return (true, null); + } + finally + { + mb.Mu.ExitWriteLock(); + } + } + + // Lock should be held. + private void RemoveMsgsInRange(ulong first, ulong last, bool viaLimits) + { + if (first == 0 || last == 0 || last < first) + return; + + for (var seq = first; seq <= last; seq++) + { + _ = RemoveMsgInternal(seq, secure: false, viaLimits: viaLimits, needFsLock: false); + if (seq == ulong.MaxValue) + break; + } + + RebuildFirst(); + } + + private bool IsClosed() + => _closed; + + // Lock should be held. + private void SelectNextFirst() + { + foreach (var mb in _blks) + { + mb.Mu.EnterWriteLock(); + try + { + mb.SelectNextFirst(); + if (mb.Msgs == 0) + continue; + + _state.FirstSeq = mb.First.Seq; + var sm = _memStore.LoadMsg(_state.FirstSeq, null); + _state.FirstTime = sm == null ? default : FromUnixNanosUtc(sm.Ts); + return; + } + finally + { + mb.Mu.ExitWriteLock(); + } + } + + _state.FirstSeq = _state.LastSeq + 1; + _state.FirstTime = default; + } + + // Lock should be held. + private MessageBlock? FindMsgBlockForSeq(ulong seq) + { + foreach (var mb in _blks) + { + if (seq >= mb.First.Seq && seq <= mb.Last.Seq) + return mb; + } + + return null; + } + + // Lock should be held. + private void RefreshPsiFromMemStoreLocked() + { + _psim ??= new SubjectTree(); + _psim.Reset(); + _tsl = 0; + + var totals = _memStore.SubjectsTotals(">"); + if (totals.Count == 0) + return; + + var fblk = _blks.Count > 0 ? _blks[0].Index : 1U; + var lblk = _lmb?.Index ?? fblk; + + foreach (var kvp in totals) + { + if (kvp.Value == 0) + continue; + + var subject = kvp.Key; + _psim.Insert(Encoding.UTF8.GetBytes(subject), new Psi + { + Total = kvp.Value, + Fblk = fblk, + Lblk = lblk, + }); + _tsl += subject.Length; + } + } + + private static byte[] BuildMessageRecord(string subject, ReadOnlySpan hdr, ReadOnlySpan msg, ulong seq, long ts) + { + var subjectBytes = Encoding.UTF8.GetBytes(subject); + var bodyLength = (8 + 8 + 4 + 4 + 4) + subjectBytes.Length + hdr.Length + msg.Length; + var record = new byte[bodyLength + FileStoreDefaults.RecordHashSize]; + var span = record.AsSpan(); + + var index = 0; + BinaryPrimitives.WriteUInt64LittleEndian(span[index..], seq); + index += 8; + BinaryPrimitives.WriteInt64LittleEndian(span[index..], ts); + index += 8; + BinaryPrimitives.WriteInt32LittleEndian(span[index..], subjectBytes.Length); + index += 4; + BinaryPrimitives.WriteInt32LittleEndian(span[index..], hdr.Length); + index += 4; + BinaryPrimitives.WriteInt32LittleEndian(span[index..], msg.Length); + index += 4; + + subjectBytes.CopyTo(span[index..]); + index += subjectBytes.Length; + hdr.CopyTo(span[index..]); + index += hdr.Length; + msg.CopyTo(span[index..]); + index += msg.Length; + + var hash = SHA256.HashData(record.AsSpan(0, index)); + hash.AsSpan(0, FileStoreDefaults.RecordHashSize).CopyTo(span[index..]); + return record; + } + // ----------------------------------------------------------------------- // Read/query helper methods (Batch 13) // ----------------------------------------------------------------------- diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs index 692bd78..33341c4 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs @@ -267,6 +267,26 @@ internal sealed class MessageBlock CacheData = null; Fss = null; } + + // Lock should be held. + internal void SelectNextFirst() + { + if (Msgs == 0 || First.Seq == 0 || Last.Seq == 0) + return; + + var seq = First.Seq; + while (seq <= Last.Seq && Dmap.Exists(seq)) + { + if (seq == ulong.MaxValue) + break; + seq++; + } + + if (seq > Last.Seq) + First = new MsgId { Seq = Last.Seq + 1, Ts = 0 }; + else + First = new MsgId { Seq = seq, Ts = First.Ts }; + } } // --------------------------------------------------------------------------- diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs index 5192cf3..cf6102a 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Diagnostics; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -134,6 +135,47 @@ public sealed partial class ConcurrencyTests1 } } + [Fact] // T:2441 + public void NoRaceJetStreamFileStoreLargeKVAccessTiming_ShouldSucceed() + { + var value = Enumerable.Repeat((byte)'Z', 256).ToArray(); + const int keyCount = 5_000; + + WithStore((fs, _) => + { + for (var i = 1; i <= keyCount; i++) + { + fs.StoreMsg($"KV.STREAM_NAME.{i}", null, value, 0).Seq.ShouldBeGreaterThan(0UL); + } + + var sw = Stopwatch.StartNew(); + var last = fs.LoadLastMsg($"KV.STREAM_NAME.{keyCount}", null); + sw.Stop(); + var lastLookup = sw.Elapsed; + + last.ShouldNotBeNull(); + last!.Msg.ShouldBe(value); + + sw.Restart(); + var first = fs.LoadLastMsg("KV.STREAM_NAME.1", null); + sw.Stop(); + var firstLookup = sw.Elapsed; + + first.ShouldNotBeNull(); + first!.Msg.ShouldBe(value); + + // Keep generous bounds to avoid machine-specific flakiness while still + // asserting access stays fast under a large key set. + lastLookup.ShouldBeLessThan(TimeSpan.FromMilliseconds(250)); + firstLookup.ShouldBeLessThan(TimeSpan.FromMilliseconds(350)); + + var firstState = fs.FilteredState(1, "KV.STREAM_NAME.1"); + var lastState = fs.FilteredState(1, $"KV.STREAM_NAME.{keyCount}"); + firstState.First.ShouldBeGreaterThan(0UL); + lastState.First.ShouldBeGreaterThan(0UL); + }, DefaultStreamConfig()); + } + private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs new file mode 100644 index 0000000..550a468 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterLongTests.Impltests.cs @@ -0,0 +1,60 @@ +using System.Diagnostics; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterLongTests +{ + [Fact] // T:1219 + public void LongFileStoreEnforceMsgPerSubjectLimit_ShouldSucceed() + { + var root = Path.Combine(Path.GetTempPath(), $"impl-fs-long-{Guid.NewGuid():N}"); + Directory.CreateDirectory(root); + + JetStreamFileStore? fs = null; + try + { + fs = JetStreamFileStore.NewFileStore( + new FileStoreConfig { StoreDir = root, BlockSize = 1024 }, + new StreamConfig + { + Name = "zzz", + Storage = StorageType.FileStorage, + Subjects = ["test.>"], + MaxMsgsPer = 1, + MaxMsgs = -1, + MaxBytes = -1, + Discard = DiscardPolicy.DiscardOld, + Retention = RetentionPolicy.LimitsPolicy, + }); + + const int keys = 4_000; + const int rewriteCount = 30_000; + + for (var i = 0; i < keys; i++) + { + fs.StoreMsg($"test.{i:000000}", null, "seed"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL); + } + + var sw = Stopwatch.StartNew(); + for (var i = 0; i < rewriteCount; i++) + { + var n = Random.Shared.Next(keys); + fs.StoreMsg($"test.{n:000000}", null, "rewrite"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL); + } + sw.Stop(); + + var totals = fs.SubjectsTotals("test.*"); + totals.Count.ShouldBe(keys); + totals.Values.All(v => v <= 1UL).ShouldBeTrue(); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(30)); + } + finally + { + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index 368a098..a0481a0 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -203,6 +203,177 @@ public sealed partial class JetStreamFileStoreTests [Fact] // T:592 public void FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed)); + [Fact] // T:549 + public void FileStoreTruncateRemovedBlock_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, "1"u8.ToArray(), 0).Seq.ShouldBe(1UL); + fs.StoreMsg("foo", null, "2"u8.ToArray(), 0).Seq.ShouldBe(2UL); + fs.StoreMsg("foo", null, "3"u8.ToArray(), 0).Seq.ShouldBe(3UL); + + var before = fs.State(); + before.Msgs.ShouldBe(3UL); + before.FirstSeq.ShouldBe(1UL); + before.LastSeq.ShouldBe(3UL); + before.NumDeleted.ShouldBe(0); + + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + var mid = fs.State(); + mid.Msgs.ShouldBe(2UL); + mid.FirstSeq.ShouldBe(1UL); + mid.LastSeq.ShouldBe(3UL); + mid.NumDeleted.ShouldBe(1); + + fs.Truncate(2); + var after = fs.State(); + after.Msgs.ShouldBe(1UL); + after.FirstSeq.ShouldBe(1UL); + after.LastSeq.ShouldBe(2UL); + after.NumDeleted.ShouldBe(1); + }); + } + + [Fact] // T:551 + public void FileStoreRemoveBlockWithStaleStreamState_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + JetStreamFileStore? fs = null; + JetStreamFileStore? reopened = null; + try + { + fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, DefaultStreamConfig()); + var blk1 = CreateBlock(root, 1, "blk1-data"u8.ToArray()); + var blk2 = CreateBlock(root, 2, "blk2-data"u8.ToArray()); + var blk3 = CreateBlock(root, 3, "blk3-data"u8.ToArray()); + + fs.RecoverMsgBlock(1).Index.ShouldBe(1u); + fs.RecoverMsgBlock(2).Index.ShouldBe(2u); + fs.RecoverMsgBlock(3).Index.ShouldBe(3u); + fs.Stop(); + fs = null; + + File.Delete(blk2); + + reopened = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, DefaultStreamConfig()); + reopened.RecoverMsgBlock(1).Index.ShouldBe(1u); + reopened.RecoverMsgBlock(3).Index.ShouldBe(3u); + Should.Throw(() => reopened.RecoverMsgBlock(2)); + } + finally + { + reopened?.Stop(); + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:561 + public void FileStoreTombstonesSelectNextFirstCleanup_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 100; i++) + fs.StoreMsg("foo", null, "x"u8.ToArray(), 0); + + for (ulong seq = 2; seq <= 100; seq++) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + var before = fs.State(); + before.Msgs.ShouldBe(1UL); + before.FirstSeq.ShouldBe(1UL); + before.LastSeq.ShouldBe(100UL); + + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + var after = fs.State(); + after.Msgs.ShouldBe(0UL); + after.FirstSeq.ShouldBe(101UL); + after.LastSeq.ShouldBe(100UL); + }); + } + + [Fact] // T:572 + public void FileStoreEmptyBlockContainsPriorTombstones_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, "1"u8.ToArray(), 0).Seq.ShouldBe(1UL); + fs.StoreMsg("foo", null, "2"u8.ToArray(), 0).Seq.ShouldBe(2UL); + fs.StoreMsg("foo", null, "3"u8.ToArray(), 0).Seq.ShouldBe(3UL); + + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + fs.RemoveMsg(3).Removed.ShouldBeTrue(); + + var mid = fs.State(); + mid.Msgs.ShouldBe(1UL); + mid.FirstSeq.ShouldBe(1UL); + mid.LastSeq.ShouldBe(3UL); + mid.Deleted.ShouldContain(2UL); + mid.Deleted.ShouldContain(3UL); + + fs.StoreMsg("foo", null, "4"u8.ToArray(), 0).Seq.ShouldBe(4UL); + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + + var after = fs.State(); + after.Msgs.ShouldBe(1UL); + after.FirstSeq.ShouldBe(4UL); + after.LastSeq.ShouldBe(4UL); + (after.Deleted ?? Array.Empty()).ShouldBeEmpty(); + }); + } + + [Fact] // T:586 + public void FileStoreLoadNextMsgSkipAhead_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 10; i++) + { + var subj = i % 2 == 0 ? "a" : "c"; + fs.StoreMsg(subj, null, "x"u8.ToArray(), 0); + } + + fs.StoreMsg("b", null, "target"u8.ToArray(), 0).Seq.ShouldBe(11UL); + + var (headSm, headSeq) = fs.LoadNextMsg("b", false, 0, null); + headSm.ShouldNotBeNull(); + headSm!.Subject.ShouldBe("b"); + headSeq.ShouldBe(11UL); + + var (interiorSm, interiorSeq) = fs.LoadNextMsg("b", false, 3, null); + interiorSm.ShouldNotBeNull(); + interiorSm!.Subject.ShouldBe("b"); + interiorSeq.ShouldBe(11UL); + }); + } + + [Fact] // T:587 + public void FileStoreLoadNextMsgMultiSkipAhead_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 10; i++) + { + var subj = i % 2 == 0 ? "a" : "c"; + fs.StoreMsg(subj, null, "x"u8.ToArray(), 0); + } + + fs.StoreMsg("b", null, "target"u8.ToArray(), 0).Seq.ShouldBe(11UL); + + var (headSm, headSeq) = fs.LoadNextMsgMulti(new[] { "b", "d" }, 11, null); + headSm.ShouldNotBeNull(); + headSm!.Subject.ShouldBe("b"); + headSeq.ShouldBe(11UL); + + var (interiorSm, interiorSeq) = fs.LoadNextMsgMulti(new[] { "b", "d" }, 3, null); + interiorSm.ShouldNotBeNull(); + interiorSeq.ShouldBeGreaterThanOrEqualTo(3UL); + }); + } + private static void RunWaveBScenario(string scenario) { switch (scenario) diff --git a/porting.db b/porting.db index bb81d5b..feb072a 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index 1e1d91a..0e8d216 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 19:56:14 UTC +Generated: 2026-02-28 20:10:46 UTC ## Modules (12 total) @@ -15,8 +15,8 @@ Generated: 2026-02-28 19:56:14 UTC | complete | 22 | | deferred | 1876 | | n_a | 24 | -| stub | 19 | -| verified | 1732 | +| stub | 1 | +| verified | 1750 | ## Unit Tests (3257 total) @@ -24,8 +24,7 @@ Generated: 2026-02-28 19:56:14 UTC |--------|-------| | deferred | 1787 | | n_a | 241 | -| stub | 8 | -| verified | 1221 | +| verified | 1229 | ## Library Mappings (36 total) @@ -36,4 +35,4 @@ Generated: 2026-02-28 19:56:14 UTC ## Overall Progress -**3252/6942 items complete (46.8%)** +**3278/6942 items complete (47.2%)**