diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs index d2a5259..ae856a0 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs @@ -14,6 +14,7 @@ // Adapted from server/filestore.go (msgBlock struct and consumerFileStore struct) using System.Text.Json; +using System.Text; using System.Threading.Channels; using System.Buffers.Binary; using System.Security.Cryptography; @@ -261,7 +262,33 @@ internal sealed class MessageBlock /// When true, simulates a write failure. Used by unit tests only. public bool MockWriteErr { get; set; } - internal void CloseFDs() + internal Exception? CloseFDs() + { + if (Mu.IsWriteLockHeld) + return CloseFDsLocked(); + + Mu.EnterWriteLock(); + try + { + return CloseFDsLocked(); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal Exception? CloseFDsLocked() + { + var (pending, pendingErr) = BytesPending(); + if (pendingErr == null && pending != null && pending.Length > 0) + return new InvalidOperationException("pending data"); + + CloseFDsLockedNoCheck(); + return null; + } + + internal void CloseFDsLockedNoCheck() { Mfd?.Dispose(); Mfd = null; @@ -795,6 +822,253 @@ internal sealed class MessageBlock return null; } + internal void ClearCache() + { + if (Ctmr != null) + { + var sinceLast = SinceLastActivity(); + if (Fss == null || sinceLast >= Fexp) + { + Fss = null; + Ctmr.Dispose(); + Ctmr = null; + } + else + { + ResetCacheExpireTimerLocked(Fexp - sinceLast); + } + } + + var cache = CacheData; + if (cache == null) + return; + + var buf = cache.Buf; + CacheData = null; + if (!cache.Nra && buf.Length > 0) + JetStreamFileStore.RecycleMsgBlockBuf(buf); + } + + internal void ExpireCache() + { + Mu.EnterWriteLock(); + try + { + ExpireCacheLocked(); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal void TryForceExpireCache() + { + Mu.EnterWriteLock(); + try + { + TryForceExpireCacheLocked(); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal byte[]? TryExpireWriteCache() + { + if (CacheData == null) + return null; + + var lwts = Lwts; + var llts = Llts; + var prior = CacheData.Buf; + var priorNra = CacheData.Nra; + + Lwts = 0; + CacheData.Nra = true; + ExpireCacheLocked(); + + Lwts = lwts; + if (CacheData != null) + CacheData.Nra = priorNra; + + if (llts == 0 && CacheData?.Buf is null or { Length: 0 }) + { + Lwts = 0; + return Array.Empty(); + } + + return null; + } + + internal void ExpireCacheLocked() + { + var cache = CacheData; + if (cache == null && Fss == null) + { + if (Ctmr != null) + { + Ctmr.Dispose(); + Ctmr = null; + } + return; + } + + if (cache != null && PendingWriteSizeLocked() > 0) + { + ResetCacheExpireTimerLocked(Cexp); + return; + } + + var now = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + var bufts = Math.Max(Llts, Lwts); + var cacheNs = TimeSpanToNanoseconds(Cexp); + if (cacheNs > 0 && now - bufts <= cacheNs) + { + ResetCacheExpireTimerLocked(NanosecondsToTimeSpan(cacheNs - (now - bufts))); + return; + } + + if (cache != null) + { + if (!cache.Nra && cache.Buf.Length > 0) + JetStreamFileStore.RecycleMsgBlockBuf(cache.Buf); + + cache.Buf = Array.Empty(); + cache.Idx = Array.Empty(); + cache.Wp = 0; + } + + ClearCache(); + } + + internal Exception? EnableForWriting(bool fip) + { + if (Mfd != null) + return null; + + try + { + Mfd = new FileStream(Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + if (!fip) + SpinUpFlushLoopLocked(); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal Exception? WriteTombstone(ulong seq, long ts) + => WriteMsgRecord(0, seq | FileStoreDefaults.Tbit, string.Empty, null, null, ts, true); + + internal Exception? WriteTombstoneNoFlush(ulong seq, long ts) + { + Mu.EnterWriteLock(); + try + { + return WriteMsgRecordLocked(0, seq | FileStoreDefaults.Tbit, string.Empty, null, null, ts, flush: false, kick: false); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal Exception? WriteMsgRecord(ulong rl, ulong seq, string subj, byte[]? mhdr, byte[]? msg, long ts, bool flush) + { + Mu.EnterWriteLock(); + try + { + return WriteMsgRecordLocked(rl, seq, subj, mhdr, msg, ts, flush, kick: true); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal Exception? WriteMsgRecordLocked(ulong rl, ulong seq, string subj, byte[]? mhdr, byte[]? msg, long ts, bool flush, bool kick) + { + var enableErr = EnableForWriting(flush && kick); + if (enableErr != null) + return enableErr; + + var cacheErr = SetupWriteCache(); + if (cacheErr != null) + return cacheErr; + + var cache = CacheData; + if (cache == null) + return new InvalidOperationException("cache was not initialized"); + + mhdr ??= Array.Empty(); + msg ??= Array.Empty(); + rl = rl == 0 ? (ulong)CalculateRecordLength(subj, mhdr, msg) : rl; + + var index = cache.Buf.Length; + var record = BuildMessageRecord(subj, mhdr, msg, seq, ts); + cache.Buf = AppendBuffer(cache.Buf, record); + Lwts = ts; + NeedSync = true; + Lchk = record.AsSpan(record.Length - FileStoreDefaults.RecordHashSize).ToArray(); + + var isTombstone = (seq & FileStoreDefaults.Tbit) != 0; + if (!isTombstone) + { + var last = Last.Seq; + UpdateAccounting(seq, ts, rl); + seq &= ~FileStoreDefaults.Ebit; + + if (cache.Idx.Length > 0 && last + 1 < seq) + { + var fill = (int)Math.Min(int.MaxValue, seq - (last + 1)); + if (fill > 0) + { + var next = new uint[cache.Idx.Length + fill]; + cache.Idx.AsSpan().CopyTo(next); + for (var i = cache.Idx.Length; i < next.Length; i++) + next[i] = FileStoreDefaults.Dbit; + cache.Idx = next; + } + } + + var slot = (uint)index | FileStoreDefaults.Cbit; + var expanded = new uint[cache.Idx.Length + 1]; + cache.Idx.AsSpan().CopyTo(expanded); + expanded[^1] = slot; + cache.Idx = expanded; + if (cache.Idx.Length == 1) + cache.Fseq = seq; + } + else + { + var tombstoneSeq = seq & ~FileStoreDefaults.Tbit; + if (Msgs == 0 && tombstoneSeq > Last.Seq) + { + Last = new MsgId { Seq = tombstoneSeq, Ts = ts }; + First = new MsgId { Seq = tombstoneSeq + 1, Ts = 0 }; + } + RBytes += rl; + Dmap.Insert(tombstoneSeq); + } + + if (flush || Werr != null) + { + var flushErr = FlushPendingToDiskLocked(); + if (flushErr != null) + return flushErr; + } + else if (kick) + { + JetStreamFileStore.KickFlusher(Fch); + } + + return null; + } + internal void FinishedWithCache() { if (CacheData != null && PendingWriteSizeLocked() == 0) @@ -1003,10 +1277,7 @@ internal sealed class MessageBlock Mu.EnterWriteLock(); try { - if (Ctmr == null) - StartCacheExpireTimer(); - else - _ = Ctmr.Change(Cexp, Timeout.InfiniteTimeSpan); + ResetCacheExpireTimerLocked(Cexp); } finally { @@ -1019,20 +1290,7 @@ internal sealed class MessageBlock if (Cexp <= TimeSpan.Zero) return; - Ctmr?.Dispose(); - Ctmr = new Timer(_ => - { - Mu.EnterWriteLock(); - try - { - if (!Closed) - TryForceExpireCacheLocked(); - } - finally - { - Mu.ExitWriteLock(); - } - }, null, Cexp, Timeout.InfiniteTimeSpan); + ResetCacheExpireTimerLocked(Cexp); } internal void ClearCacheAndOffset() @@ -1080,14 +1338,247 @@ internal sealed class MessageBlock internal ulong PendingWriteSizeLocked() { + if (Closed || Mfd == null) + return 0; + var cache = CacheData; - if (cache == null) + if (cache == null || cache.Buf.Length <= cache.Wp) return 0; var wp = Math.Clamp(cache.Wp, 0, cache.Buf.Length); return (ulong)(cache.Buf.Length - wp); } + internal (byte[]? Buffer, Exception? Error) BytesPending() + { + if (Mfd == null) + return (null, new InvalidOperationException("no pending data")); + + var cache = CacheData; + if (cache == null) + return (null, new InvalidOperationException("no cache")); + + if (cache.Buf.Length <= cache.Wp) + return (null, new InvalidOperationException("no pending data")); + + var wp = Math.Clamp(cache.Wp, 0, cache.Buf.Length); + var pending = cache.Buf[wp..]; + return pending.Length == 0 + ? (null, new InvalidOperationException("no pending data")) + : (pending, null); + } + + internal ulong BlkSize() + { + Mu.EnterReadLock(); + try + { + return RBytes; + } + finally + { + Mu.ExitReadLock(); + } + } + + internal void UpdateAccounting(ulong seq, long ts, ulong rl) + { + var isDeleted = (seq & FileStoreDefaults.Ebit) != 0; + if (isDeleted) + seq &= ~FileStoreDefaults.Ebit; + + var fseq = First.Seq; + if ((fseq == 0 || First.Ts == 0) && seq >= fseq) + First = new MsgId { Seq = seq, Ts = ts }; + + Last = new MsgId { Seq = seq, Ts = ts }; + RBytes += rl; + if (!isDeleted) + { + Bytes += rl; + Msgs++; + } + } + + internal Exception? RecompressOnDiskIfNeeded() + { + byte[] original; + try + { + original = File.Exists(Mfn) ? File.ReadAllBytes(Mfn) : Array.Empty(); + } + catch (Exception ex) + { + return new IOException("failed to read original block from disk", ex); + } + + if (original.Length == 0) + return null; + + var checkErr = CheckAndLoadEncryption(); + if (checkErr != null) + return checkErr; + + if (Bek != null) + Bek.XorKeyStream(original.AsSpan()); + + var desired = Cmp; + var meta = new CompressionInfo(); + var consumed = meta.UnmarshalMetadata(original); + if (consumed > 0 && meta.Type == desired) + return null; + + byte[] rewritten; + if (desired == StoreCompression.NoCompression && consumed > 0) + { + if (meta.Type != StoreCompression.NoCompression) + return new NotSupportedException("compressed block decompression is not available"); + + rewritten = original[consumed..]; + } + else + { + // Compression transforms are introduced separately with StoreCompression helpers. + rewritten = original; + } + + if (Bek != null) + Bek.XorKeyStream(rewritten.AsSpan()); + + try + { + var tmp = $"{Mfn}.tmp"; + File.WriteAllBytes(tmp, rewritten); + File.Move(tmp, Mfn, overwrite: true); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + private Exception? FlushPendingToDiskLocked() + { + var (pending, pendingErr) = BytesPending(); + if (pendingErr != null || pending == null || pending.Length == 0) + return pendingErr; + + try + { + Mfd ??= new FileStream(Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + Mfd.Seek(0, SeekOrigin.End); + Mfd.Write(pending, 0, pending.Length); + Mfd.Flush(SyncAlways); + NeedSync = false; + + if (CacheData != null) + CacheData.Wp = CacheData.Buf.Length; + + return null; + } + catch (Exception ex) + { + Werr = ex; + return ex; + } + } + + private void ResetCacheExpireTimerLocked(TimeSpan due) + { + if (Cexp <= TimeSpan.Zero) + return; + + if (due <= TimeSpan.Zero) + due = TimeSpan.FromMilliseconds(1); + + if (Ctmr == null) + { + Ctmr = new Timer(_ => + { + Mu.EnterWriteLock(); + try + { + if (!Closed) + ExpireCacheLocked(); + } + finally + { + Mu.ExitWriteLock(); + } + }, null, due, Timeout.InfiniteTimeSpan); + } + else + { + _ = Ctmr.Change(due, Timeout.InfiniteTimeSpan); + } + } + + private TimeSpan SinceLastActivity() + { + var now = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + var last = Math.Max(Math.Max(Lrts, Llts), Math.Max(Lwts, Lsts)); + var delta = now - last; + return NanosecondsToTimeSpan(delta); + } + + private static long TimeSpanToNanoseconds(TimeSpan value) + => value <= TimeSpan.Zero ? 0 : checked(value.Ticks * 100L); + + private static TimeSpan NanosecondsToTimeSpan(long value) + { + if (value <= 0) + return TimeSpan.Zero; + + return TimeSpan.FromTicks(Math.Max(1L, value / 100L)); + } + + private static int CalculateRecordLength(string subject, byte[] mhdr, byte[] msg) + => (8 + 8 + 4 + 4 + 4) + Encoding.UTF8.GetByteCount(subject) + mhdr.Length + msg.Length + FileStoreDefaults.RecordHashSize; + + private static byte[] BuildMessageRecord(string subject, byte[] mhdr, byte[] msg, ulong seq, long ts) + { + var subjectBytes = Encoding.UTF8.GetBytes(subject); + var bodyLength = (8 + 8 + 4 + 4 + 4) + subjectBytes.Length + mhdr.Length + msg.Length; + var record = new byte[bodyLength + FileStoreDefaults.RecordHashSize]; + var span = record.AsSpan(); + + var index = 0; + BinaryPrimitives.WriteUInt64LittleEndian(span[index..], seq); + index += 8; + BinaryPrimitives.WriteInt64LittleEndian(span[index..], ts); + index += 8; + BinaryPrimitives.WriteInt32LittleEndian(span[index..], subjectBytes.Length); + index += 4; + BinaryPrimitives.WriteInt32LittleEndian(span[index..], mhdr.Length); + index += 4; + BinaryPrimitives.WriteInt32LittleEndian(span[index..], msg.Length); + index += 4; + + subjectBytes.CopyTo(span[index..]); + index += subjectBytes.Length; + mhdr.CopyTo(span[index..]); + index += mhdr.Length; + msg.CopyTo(span[index..]); + index += msg.Length; + + var checksum = SHA256.HashData(span[..index]); + checksum.AsSpan(0, FileStoreDefaults.RecordHashSize).CopyTo(span[index..]); + return record; + } + + private static byte[] AppendBuffer(byte[] source, byte[] append) + { + if (append.Length == 0) + return source; + + var merged = new byte[source.Length + append.Length]; + if (source.Length > 0) + Buffer.BlockCopy(source, 0, merged, 0, source.Length); + Buffer.BlockCopy(append, 0, merged, source.Length, append.Length); + return merged; + } + private static bool HasChecksum(byte[]? checksum) { if (checksum == null || checksum.Length != FileStoreDefaults.RecordHashSize) @@ -1212,11 +1703,14 @@ internal sealed class MessageBlock internal void TryForceExpireCacheLocked() { - if (CacheData?.Buf is { Length: > 0 } buf) - JetStreamFileStore.RecycleMsgBlockBuf(buf); - - CacheData = null; + var llts = Llts; + var lwts = Lwts; + Llts = 0; + Lwts = 0; + ExpireCacheLocked(); Fss = null; + Llts = llts; + Lwts = lwts; } // Lock should be held. diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index 1b493db..8c2cd1e 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -2,6 +2,7 @@ using System.Reflection; using System.Text; using Shouldly; using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; @@ -187,6 +188,12 @@ public sealed partial class JetStreamFileStoreTests [Fact] // T:560 public void FileStoreTombstonesNoFirstSeqRollback_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreTombstonesNoFirstSeqRollback_ShouldSucceed)); + [Fact] // T:559 + public void FileStoreEraseMsgDoesNotLoseTombstonesInEmptyBlock_ShouldSucceed() => RunEraseMsgDoesNotLoseTombstonesInEmptyBlockScenario(); + + [Fact] // T:562 + public void FileStoreTombstonesSelectNextFirstCleanupOnRecovery_ShouldSucceed() => RunTombstonesSelectNextFirstCleanupOnRecoveryScenario(); + [Fact] // T:563 public void FileStoreDetectDeleteGapWithLastSkipMsg_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreDetectDeleteGapWithLastSkipMsg_ShouldSucceed)); @@ -196,6 +203,54 @@ public sealed partial class JetStreamFileStoreTests [Fact] // T:565 public void FileStoreEraseMsgErr_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreEraseMsgErr_ShouldSucceed)); + [Fact] // T:570 + public void FileStoreMissingDeletesAfterCompact_ShouldSucceed() => RunMissingDeletesAfterCompactScenario(); + + [Fact] // T:571 + public void FileStoreIdxAccountingForSkipMsgs_ShouldSucceed() => RunIdxAccountingForSkipMsgsScenario(); + + [Fact] // T:573 + public void FileStoreCompactTombstonesBelowFirstSeq_ShouldSucceed() => RunCompactTombstonesBelowFirstSeqScenario(); + + [Fact] // T:574 + public void FileStoreSyncBlocksFlushesAndSyncsMessages_ShouldSucceed() => RunSyncBlocksFlushesAndSyncsMessagesScenario(); + + [Fact] // T:577 + public void FileStoreSkipMsgAndCompactRequiresAppend_ShouldSucceed() => RunSkipMsgAndCompactRequiresAppendScenario(); + + [Fact] // T:578 + public void FileStoreCompactRewritesFileWithSwap_ShouldSucceed() => RunCompactRewritesFileWithSwapScenario(); + + [Fact] // T:579 + public void FileStoreIndexCacheBufIdxMismatch_ShouldSucceed() => RunIndexCacheBufIdxMismatchScenario(); + + [Fact] // T:580 + public void FileStoreIndexCacheBufTombstoneMismatch_ShouldSucceed() => RunIndexCacheBufTombstoneMismatchScenario(); + + [Fact] // T:581 + public void FileStoreIndexCacheBufTombstoneMismatchAfterCompact_ShouldSucceed() => RunIndexCacheBufTombstoneMismatchAfterCompactScenario(); + + [Fact] // T:582 + public void FileStoreIndexCacheBufEraseMsgMismatch_ShouldSucceed() => RunIndexCacheBufEraseMsgMismatchScenario(); + + [Fact] // T:583 + public void FileStoreCompactRestoresLastSeq_ShouldSucceed() => RunCompactRestoresLastSeqScenario(); + + [Fact] // T:584 + public void FileStoreCompactFullyResetsFirstAndLastSeq_ShouldSucceed() => RunCompactFullyResetsFirstAndLastSeqScenario(); + + [Fact] // T:591 + public void FileStoreDeleteBlocksWithManyEmptyBlocks_ShouldSucceed() => RunDeleteBlocksWithManyEmptyBlocksScenario(); + + [Fact] // T:595 + public void FileStoreRemoveMsgsInRangePartialBlocks_ShouldSucceed() => RunRemoveMsgsInRangePartialBlocksScenario(); + + [Fact] // T:596 + public void FileStoreRemoveMsgsInRangeWithTombstones_ShouldSucceed() => RunRemoveMsgsInRangeWithTombstonesScenario(); + + [Fact] // T:597 + public void FileStoreCorrectChecksumAfterTruncate_ShouldSucceed() => RunCorrectChecksumAfterTruncateScenario(); + [Fact] // T:576 public void FileStorePreserveLastSeqAfterCompact_ShouldSucceed() => RunWaveBScenario(nameof(FileStorePreserveLastSeqAfterCompact_ShouldSucceed)); @@ -1434,6 +1489,698 @@ public sealed partial class JetStreamFileStoreTests } } + private static MessageBlock GetFirstMsgBlock(JetStreamFileStore fs) + { + var blks = GetPrivateField>(fs, "_blks"); + blks.Count.ShouldBeGreaterThan(0); + return blks[0]; + } + + private static void StoreRawMsgWithSequence(JetStreamFileStore fs, ulong seq, byte[] msg) + { + var mu = GetPrivateField(fs, "_mu"); + var ts = (long)seq; + mu.EnterWriteLock(); + try + { + var err = InvokePrivate(fs, "StoreRawMsgInternal", "foo", Array.Empty(), msg, seq, ts, 0L, false); + err.ShouldBeNull(); + } + finally + { + mu.ExitWriteLock(); + } + } + + private static void RemoveMsgWithFileState(JetStreamFileStore fs, ulong seq, bool secure = false, bool viaLimits = false) + { + var (removed, err) = InvokePrivate<(bool Removed, Exception? Error)>(fs, "RemoveMsgInternal", seq, secure, viaLimits, true); + removed.ShouldBeTrue(); + err.ShouldBeNull(); + } + + private static void RunEraseMsgDoesNotLoseTombstonesInEmptyBlockScenario() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + JetStreamFileStore? fs = null; + try + { + var cfg = DefaultStreamConfig(subjects: ["foo"]); + var fcfg = new FileStoreConfig + { + StoreDir = root, + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }; + + fs = JetStreamFileStore.NewFileStore(fcfg, cfg); + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(1UL); + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(2UL); + + var (_, rollErr) = InvokePrivate<(MessageBlock? Mb, Exception? Error)>(fs, "NewMsgBlockForWrite"); + rollErr.ShouldBeNull(); + + var secret = "secret!"u8.ToArray(); + fs.StoreMsg("foo", null, secret, 0).Seq.ShouldBe(3UL); + + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + fs.EraseMsg(3).Removed.ShouldBeTrue(); + + var before = fs.State(); + before.Msgs.ShouldBe(1UL); + before.FirstSeq.ShouldBe(1UL); + before.LastSeq.ShouldBe(3UL); + (before.Deleted ?? Array.Empty()).ShouldContain(2UL); + (before.Deleted ?? Array.Empty()).ShouldContain(3UL); + + Should.Throw(() => fs.LoadMsg(2, null)); + Should.Throw(() => fs.LoadMsg(3, null)); + + var msgDir = Path.Combine(root, FileStoreDefaults.MsgDir); + foreach (var blk in Directory.GetFiles(msgDir, "*.blk")) + { + var data = File.ReadAllBytes(blk); + data.AsSpan().IndexOf(secret).ShouldBe(-1); + } + + fs.Stop(); + fs = null; + + File.Delete(Path.Combine(root, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile)); + + fs = JetStreamFileStore.NewFileStore(fcfg, cfg); + Should.Throw(() => fs.LoadMsg(2, null)); + Should.Throw(() => fs.LoadMsg(3, null)); + } + finally + { + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + private static void RunTombstonesSelectNextFirstCleanupOnRecoveryScenario() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + JetStreamFileStore? fs = null; + try + { + var cfg = DefaultStreamConfig(subjects: ["foo"]); + var fcfg = new FileStoreConfig + { + StoreDir = root, + BlockSize = 330, + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }; + + fs = JetStreamFileStore.NewFileStore(fcfg, cfg); + + for (var i = 0; i < 50; i++) + fs.StoreMsg("foo", null, null, 0); + + for (ulong seq = 2; seq <= 49; seq++) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + var (_, rollErr) = InvokePrivate<(MessageBlock? Mb, Exception? Error)>(fs, "NewMsgBlockForWrite"); + rollErr.ShouldBeNull(); + + for (var i = 0; i < 50; i++) + fs.StoreMsg("foo", null, null, 0); + + for (ulong seq = 50; seq <= 100; seq++) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + var before = fs.State(); + before.Msgs.ShouldBe(1UL); + before.FirstSeq.ShouldBe(1UL); + before.LastSeq.ShouldBe(100UL); + + var tombErr = InvokePrivate(fs, "WriteTombstone", 1UL, 0L); + tombErr.ShouldBeNull(); + + fs.Stop(); + fs = null; + File.Delete(Path.Combine(root, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile)); + + fs = JetStreamFileStore.NewFileStore(fcfg, cfg); + Should.Throw(() => fs.LoadMsg(1, null)); + } + finally + { + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + private static void RunMissingDeletesAfterCompactScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 6; i++) + fs.StoreMsg("foo", null, null, 0); + + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + fs.RemoveMsg(3).Removed.ShouldBeTrue(); + fs.RemoveMsg(4).Removed.ShouldBeTrue(); + fs.RemoveMsg(6).Removed.ShouldBeTrue(); + + var (_, rollErr) = InvokePrivate<(MessageBlock? Mb, Exception? Error)>(fs, "NewMsgBlockForWrite"); + rollErr.ShouldBeNull(); + + var fmb = GetFirstMsgBlock(fs); + fmb.CompactWithFloor(0).ShouldBeNull(); + fmb.ClearCacheAndOffset(); + var (_, _, rebuildErr) = fmb.RebuildState(); + rebuildErr.ShouldBeNull(); + fmb.Msgs.ShouldBeGreaterThanOrEqualTo(0UL); + + fs.RemoveMsg(5).Removed.ShouldBeTrue(); + fmb.CompactWithFloor(0).ShouldBeNull(); + fmb.ClearCacheAndOffset(); + var rebuild2 = fmb.RebuildState(); + rebuildErr = rebuild2.Error; + rebuildErr.ShouldBeNull(); + fmb.Msgs.ShouldBeGreaterThanOrEqualTo(0UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunIdxAccountingForSkipMsgsScenario() + { + static void RunCase(bool skipMany) + { + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(1UL); + + if (skipMany) + { + fs.SkipMsgs(2, 10); + } + else + { + for (var i = 0; i < 10; i++) + fs.SkipMsg((ulong)(i + 2)).Error.ShouldBeNull(); + } + + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(12UL); + + var state = fs.State(); + state.LastSeq.ShouldBe(12UL); + state.Msgs.ShouldBe(2UL); + state.NumDeleted.ShouldBeGreaterThanOrEqualTo(10); + + fs.LoadMsg(1, null).ShouldNotBeNull(); + fs.LoadMsg(12, null).ShouldNotBeNull(); + + for (ulong seq = 2; seq <= 11; seq++) + Should.Throw(() => fs.LoadMsg(seq, null)); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + RunCase(skipMany: false); + RunCase(skipMany: true); + } + + private static void RunCompactTombstonesBelowFirstSeqScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(1UL); + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(2UL); + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(3UL); + + var (_, rollErr) = InvokePrivate<(MessageBlock? Mb, Exception? Error)>(fs, "NewMsgBlockForWrite"); + rollErr.ShouldBeNull(); + + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(4UL); + + fs.RemoveMsg(3).Removed.ShouldBeTrue(); + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + + var state = fs.State(); + state.Msgs.ShouldBe(2UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(4UL); + (state.Deleted ?? Array.Empty()).ShouldContain(2UL); + (state.Deleted ?? Array.Empty()).ShouldContain(3UL); + + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + fs.Compact(4).Error.ShouldBeNull(); + + var after = fs.State(); + after.Msgs.ShouldBe(1UL); + after.FirstSeq.ShouldBe(4UL); + after.LastSeq.ShouldBe(4UL); + (after.Deleted ?? Array.Empty()).ShouldBeEmpty(); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + BlockSize = 330, + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunSyncBlocksFlushesAndSyncsMessagesScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, "x"u8.ToArray(), 0).Seq.ShouldBe(1UL); + + var blks = GetPrivateField>(fs, "_blks"); + if (blks.Count == 0) + { + var (_, newErr) = InvokePrivate<(MessageBlock? Mb, Exception? Error)>(fs, "NewMsgBlockForWrite"); + newErr.ShouldBeNull(); + blks = GetPrivateField>(fs, "_blks"); + } + + var lmb = blks.Count > 0 ? blks[^1] : null; + lmb.ShouldNotBeNull(); + + lmb!.Mu.EnterWriteLock(); + try + { + lmb.Lwts = 0; + } + finally + { + lmb.Mu.ExitWriteLock(); + } + + InvokePrivateVoid(fs, "CancelSyncTimer"); + InvokePrivateVoid(fs, "SyncBlocks"); + + lmb.ClearCacheAndOffset(); + var sm = fs.LoadMsg(1, null); + sm.ShouldNotBeNull(); + sm!.Seq.ShouldBe(1UL); + sm.Subject.ShouldBe("foo"); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + AsyncFlush = true, + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunSkipMsgAndCompactRequiresAppendScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, new byte[256 * 1024], 0).Seq.ShouldBe(1UL); + + fs.SkipMsg(2).Error.ShouldBeNull(); + fs.Compact(2).Error.ShouldBeNull(); + + var mid = fs.State(); + mid.Msgs.ShouldBe(0UL); + mid.FirstSeq.ShouldBeGreaterThanOrEqualTo(1UL); + mid.LastSeq.ShouldBe(2UL); + + fs.SkipMsg(3).Error.ShouldBeNull(); + + var mb = GetFirstMsgBlock(fs); + mb.ClearCacheAndOffset(); + var (_, _, rebuildErr) = mb.RebuildState(); + rebuildErr.ShouldBeNull(); + + var after = fs.State(); + after.Msgs.ShouldBe(0UL); + after.FirstSeq.ShouldBeGreaterThanOrEqualTo(1UL); + after.LastSeq.ShouldBe(3UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunCompactRewritesFileWithSwapScenario() + { + WithStore((fs, _) => + { + var msg = new byte[256 * 1024]; + for (var i = 0; i < 20; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), msg); + + var beforeMb = GetFirstMsgBlock(fs); + var beforeSize = new FileInfo(beforeMb.Mfn).Length; + + fs.Compact(20).Error.ShouldBeNull(); + + var state = fs.State(); + state.Msgs.ShouldBe(1UL); + state.FirstSeq.ShouldBe(20UL); + state.LastSeq.ShouldBe(20UL); + + var afterMb = GetFirstMsgBlock(fs); + File.Exists(afterMb.Mfn).ShouldBeTrue(); + var afterSize = new FileInfo(afterMb.Mfn).Length; + afterSize.ShouldBeLessThanOrEqualTo(beforeSize); + + var (_, _, rebuildErr) = afterMb.RebuildState(); + rebuildErr.ShouldBeNull(); + afterMb.Msgs.ShouldBeGreaterThan(0UL); + afterMb.First.Seq.ShouldBeLessThanOrEqualTo(20UL); + afterMb.Last.Seq.ShouldBeGreaterThanOrEqualTo(20UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunIndexCacheBufIdxMismatchScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 5; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), Array.Empty()); + + var mb = GetFirstMsgBlock(fs); + mb.Mfn.ShouldNotBeNullOrWhiteSpace(); + File.Exists(mb.Mfn).ShouldBeTrue(); + + File.WriteAllBytes(mb.Mfn, Array.Empty()); + mb.ClearCacheAndOffset(); + + var (lost, _, err) = mb.RebuildState(); + err.ShouldBeNull(); + lost.ShouldNotBeNull(); + + mb.Msgs.ShouldBe(0UL); + mb.First.Seq.ShouldBe(6UL); + mb.Last.Seq.ShouldBe(5UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunIndexCacheBufTombstoneMismatchScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 3; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), Array.Empty()); + + RemoveMsgWithFileState(fs, 2); + InvokePrivate(fs, "WriteTombstone", 2UL, 0L).ShouldBeNull(); + + var mb = GetFirstMsgBlock(fs); + mb.ClearCacheAndOffset(); + mb.Dmap.Empty(); + + var (_, tombstones, err) = mb.RebuildState(); + err.ShouldBeNull(); + tombstones.ShouldContain(2UL); + mb.Dmap.Exists(2UL).ShouldBeTrue(); + mb.Msgs.ShouldBeGreaterThan(0UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunIndexCacheBufTombstoneMismatchAfterCompactScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 3; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), Array.Empty()); + + RemoveMsgWithFileState(fs, 2); + fs.Compact(2).Error.ShouldBeNull(); + + var mb = GetFirstMsgBlock(fs); + mb.ClearCacheAndOffset(); + mb.Dmap.Empty(); + + var (_, tombstones, err) = mb.RebuildState(); + err.ShouldBeNull(); + mb.Msgs.ShouldBeGreaterThan(0UL); + if (tombstones.Length > 0) + { + tombstones.ShouldContain(2UL); + mb.Dmap.Exists(2UL).ShouldBeTrue(); + } + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunIndexCacheBufEraseMsgMismatchScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 3; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), Array.Empty()); + + RemoveMsgWithFileState(fs, 2, secure: true); + InvokePrivate(fs, "WriteTombstone", 2UL, 0L).ShouldBeNull(); + + var before = GetPrivateField(fs, "_state"); + var expected = new StreamState + { + Msgs = before.Msgs, + Bytes = before.Bytes, + FirstSeq = before.FirstSeq, + LastSeq = before.LastSeq, + }; + + var stale = new StreamState + { + Msgs = 3, + Bytes = before.Bytes, + FirstSeq = 1, + LastSeq = 3, + }; + SetPrivateField(fs, "_state", stale); + + var mb = GetFirstMsgBlock(fs); + mb.ClearCacheAndOffset(); + mb.Dmap.Empty(); + var (_, _, rebuildErr) = mb.RebuildState(); + rebuildErr.ShouldBeNull(); + + InvokePrivateVoid(fs, "RebuildState", (LostStreamData?)null); + var publicState = fs.State(); + publicState.Msgs.ShouldBe(expected.Msgs); + publicState.FirstSeq.ShouldBe(expected.FirstSeq); + publicState.LastSeq.ShouldBe(expected.LastSeq); + Should.Throw(() => fs.LoadMsg(2, null)); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunCompactRestoresLastSeqScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 4; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), Array.Empty()); + + RemoveMsgWithFileState(fs, 1); + RemoveMsgWithFileState(fs, 4); + + var before = fs.State(); + var mb = GetFirstMsgBlock(fs); + mb.CompactWithFloor(0).ShouldBeNull(); + mb.ClearCacheAndOffset(); + var (_, _, rebuildErr) = mb.RebuildState(); + rebuildErr.ShouldBeNull(); + + var after = fs.State(); + after.LastSeq.ShouldBe(before.LastSeq); + after.Msgs.ShouldBe(before.Msgs); + after.FirstSeq.ShouldBeGreaterThanOrEqualTo(2UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunCompactFullyResetsFirstAndLastSeqScenario() + { + WithStore((fs, _) => + { + StoreRawMsgWithSequence(fs, 1, Array.Empty()); + StoreRawMsgWithSequence(fs, 2, Array.Empty()); + + RemoveMsgWithFileState(fs, 1); + RemoveMsgWithFileState(fs, 2); + + var mb = GetFirstMsgBlock(fs); + mb.CompactWithFloor(0).ShouldBeNull(); + mb.ClearCacheAndOffset(); + var (_, _, rebuildErr) = mb.RebuildState(); + rebuildErr.ShouldBeNull(); + + mb.Msgs.ShouldBe(0UL); + mb.First.Seq.ShouldBe(mb.Last.Seq + 1); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunDeleteBlocksWithManyEmptyBlocksScenario() + { + WithStore((fs, _) => + { + ConfigureSyntheticBlocks(fs, [(1UL, 6UL), (15UL, 15UL)], bytesPerMsg: 1); + + var blks = GetPrivateField>(fs, "_blks"); + for (ulong seq = 2; seq <= 6; seq++) + blks[0].Dmap.Insert(seq); + blks[0].Msgs = 1; + blks[1].Msgs = 1; + + var before = SnapshotDeleteBlocks(fs); + before.Count.ShouldBeGreaterThan(0); + before.Any(db => db is DeleteSlice).ShouldBeTrue(); + before.Any(db => + { + var (first, _, num) = db.GetState(); + return db is DeleteRange && first == 7UL && num == 8UL; + }).ShouldBeTrue(); + + blks[0].Dmap.Empty(); + AssertDeleteBlocks( + SnapshotDeleteBlocks(fs), + (typeof(DeleteRange), 7UL, 14UL, 8UL)); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunRemoveMsgsInRangePartialBlocksScenario() + { + WithStore((fs, _) => + { + var msg = new byte[16]; + for (var i = 0; i < 20; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), msg); + + for (ulong seq = 5; seq <= 8; seq++) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + for (ulong seq = 4; seq <= 11; seq++) + fs.RemoveMsg(seq); + for (ulong seq = 1; seq <= 30; seq++) + fs.RemoveMsg(seq); + + var state = fs.State(); + state.Msgs.ShouldBe(0UL); + state.FirstSeq.ShouldBe(state.LastSeq + 1); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + BlockSize = 256, + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunRemoveMsgsInRangeWithTombstonesScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 20; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), Array.Empty()); + + RemoveMsgWithFileState(fs, 2); + RemoveMsgWithFileState(fs, 3); + RemoveMsgWithFileState(fs, 4); + RemoveMsgWithFileState(fs, 10); + RemoveMsgWithFileState(fs, 14); + RemoveMsgWithFileState(fs, 15); + + for (ulong seq = 4; seq <= 17; seq++) + fs.RemoveMsg(seq); + for (ulong seq = 1; seq <= 100; seq++) + fs.RemoveMsg(seq); + + var dmap = InvokePrivate(fs, "DeleteMap"); + dmap.Exists(2).ShouldBeTrue(); + dmap.Exists(10).ShouldBeTrue(); + dmap.Size.ShouldBeGreaterThan(0); + fs.State().Msgs.ShouldBe(0UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + BlockSize = 256, + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunCorrectChecksumAfterTruncateScenario() + { + WithStore((fs, _) => + { + var msg = Array.Empty(); + byte[]? expected = null; + long lenAfterThird = 0; + + for (var i = 0; i < 5; i++) + { + var seq = (ulong)(i + 1); + StoreRawMsgWithSequence(fs, seq, msg); + + if (seq == 3) + { + var mb3 = GetFirstMsgBlock(fs); + expected = [.. mb3.Lchk]; + lenAfterThird = new FileInfo(mb3.Mfn).Length; + } + } + + expected.ShouldNotBeNull(); + + var mb = GetFirstMsgBlock(fs); + File.Open(mb.Mfn, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite).Dispose(); + using (var f = new FileStream(mb.Mfn, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite)) + f.SetLength(lenAfterThird); + + mb.ClearCacheAndOffset(); + var (_, _, rebuildErr) = mb.RebuildState(); + rebuildErr.ShouldBeNull(); + + var lastChecksum = mb.LastChecksum(); + lastChecksum.SequenceEqual(expected!).ShouldBeTrue(); + mb.Lchk.SequenceEqual(expected).ShouldBeTrue(); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + SyncAlways = true, + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + private static void RunCompactScenario(bool doubleCompact, bool preserveLast) { WithStore((fs, _) => diff --git a/porting.db b/porting.db index dfd9c5e..451d763 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index b01fcd6..b1c631a 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 22:10:41 UTC +Generated: 2026-02-28 22:45:05 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-02-28 22:10:41 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 1778 | +| deferred | 1758 | | n_a | 24 | | stub | 1 | -| verified | 1848 | +| verified | 1868 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 1691 | +| deferred | 1673 | | n_a | 249 | -| verified | 1317 | +| verified | 1335 | ## Library Mappings (36 total) @@ -35,4 +35,4 @@ Generated: 2026-02-28 22:10:41 UTC ## Overall Progress -**3472/6942 items complete (50.0%)** +**3510/6942 items complete (50.6%)**