diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs index 250e7f6..d2a5259 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs @@ -795,6 +795,299 @@ internal sealed class MessageBlock return null; } + internal void FinishedWithCache() + { + if (CacheData != null && PendingWriteSizeLocked() == 0) + CacheData = null; + } + + internal (ulong Seq, Exception? Error) SkipMsg(ulong seq) + { + var fs = Fs; + if (fs == null) + return (0, StoreErrors.ErrStoreClosed); + + return fs.SkipMsg(seq); + } + + internal bool ShouldCompactInline() + { + if (NoCompact || Msgs == 0 || Dmap.Size == 0) + return false; + + var deletedBytes = RBytes > Bytes ? RBytes - Bytes : 0; + return deletedBytes >= (ulong)FileStoreDefaults.CompactMinimum || + Dmap.Size >= Math.Max(1, (int)(Msgs / 2)); + } + + internal bool ShouldCompactSync() + => ShouldCompactInline() && SyncAlways; + + internal Exception? Compact() + => CompactWithFloor(0); + + internal Exception? CompactWithFloor(ulong floor) + { + var fs = Fs; + if (fs == null) + return StoreErrors.ErrStoreClosed; + + var start = floor == 0 ? Math.Max(First.Seq, 1UL) : floor; + var (_, err) = fs.Compact(start); + return err; + } + + internal (uint Slot, bool Deleted) SlotInfo(ulong seq) + { + var cache = CacheData; + if (cache == null || cache.Idx.Length == 0 || seq < cache.Fseq) + return (0, false); + + var slot = seq - cache.Fseq; + if (slot >= (ulong)cache.Idx.Length) + return (0, false); + + var raw = cache.Idx[slot]; + var deleted = (raw & FileStoreDefaults.Dbit) != 0; + return (raw & ~(FileStoreDefaults.Dbit | FileStoreDefaults.Cbit), deleted); + } + + internal void SpinUpFlushLoop() + { + Mu.EnterWriteLock(); + try + { + SpinUpFlushLoopLocked(); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal void SpinUpFlushLoopLocked() + { + if (Flusher || Closed) + return; + + Flusher = true; + Fch = Channel.CreateBounded(1); + Qch = Channel.CreateUnbounded(); + var fch = Fch; + var qch = Qch; + + _ = Task.Run(() => FlushLoop(fch, qch)); + } + + internal void KickFlusher() + { + Mu.EnterReadLock(); + try + { + _ = Fch?.Writer.TryWrite(0); + } + finally + { + Mu.ExitReadLock(); + } + } + + internal void SetInFlusher() + { + Mu.EnterWriteLock(); + try + { + Flusher = true; + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal void ClearInFlusher() + { + Mu.EnterWriteLock(); + try + { + Flusher = false; + Qch?.Writer.TryComplete(); + Qch = null; + Fch?.Writer.TryComplete(); + Fch = null; + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal void FlushLoop(Channel? flushChannel = null, Channel? quitChannel = null) + { + SetInFlusher(); + try + { + var fch = flushChannel ?? Fch; + var qch = quitChannel ?? Qch; + if (fch == null) + return; + + while (true) + { + if (qch?.Reader.Completion.IsCompleted == true) + break; + + var canRead = fch.Reader.WaitToReadAsync().AsTask().GetAwaiter().GetResult(); + if (!canRead) + break; + + while (fch.Reader.TryRead(out _)) { } + + Mu.EnterWriteLock(); + try + { + if (Closed) + break; + + if (Mfd != null) + { + Mfd.Flush(SyncAlways); + NeedSync = false; + } + } + catch (Exception ex) + { + Werr = ex; + } + finally + { + Mu.ExitWriteLock(); + } + } + } + finally + { + ClearInFlusher(); + } + } + + internal (bool Removed, Exception? Error) EraseMsg(ulong seq) + { + var fs = Fs; + if (fs == null) + return (false, StoreErrors.ErrStoreClosed); + + return fs.EraseMsg(seq); + } + + internal void Truncate(ulong seq) + { + Fs?.Truncate(seq); + } + + internal bool IsEmpty() + { + Mu.EnterReadLock(); + try + { + return Msgs == 0; + } + finally + { + Mu.ExitReadLock(); + } + } + + internal void ResetCacheExpireTimer() + { + Mu.EnterWriteLock(); + try + { + if (Ctmr == null) + StartCacheExpireTimer(); + else + _ = Ctmr.Change(Cexp, Timeout.InfiniteTimeSpan); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal void StartCacheExpireTimer() + { + if (Cexp <= TimeSpan.Zero) + return; + + Ctmr?.Dispose(); + Ctmr = new Timer(_ => + { + Mu.EnterWriteLock(); + try + { + if (!Closed) + TryForceExpireCacheLocked(); + } + finally + { + Mu.ExitWriteLock(); + } + }, null, Cexp, Timeout.InfiniteTimeSpan); + } + + internal void ClearCacheAndOffset() + { + CacheData = null; + Fss = null; + Ctmr?.Dispose(); + Ctmr = null; + + try + { + if (!string.IsNullOrWhiteSpace(Mfn)) + { + var dir = Path.GetDirectoryName(Mfn); + if (!string.IsNullOrWhiteSpace(dir)) + { + var idxPath = Path.Combine(dir, string.Format(FileStoreDefaults.IndexScan, Index)); + if (File.Exists(idxPath)) + File.Delete(idxPath); + + var fssPath = Path.Combine(dir, $"{Index}.fss"); + if (File.Exists(fssPath)) + File.Delete(fssPath); + } + } + } + catch + { + // best effort cleanup for stale metadata files + } + } + + internal ulong PendingWriteSize() + { + Mu.EnterReadLock(); + try + { + return PendingWriteSizeLocked(); + } + finally + { + Mu.ExitReadLock(); + } + } + + internal ulong PendingWriteSizeLocked() + { + var cache = CacheData; + if (cache == null) + return 0; + + var wp = Math.Clamp(cache.Wp, 0, cache.Buf.Length); + return (ulong)(cache.Buf.Length - wp); + } + private static bool HasChecksum(byte[]? checksum) { if (checksum == null || checksum.Length != FileStoreDefaults.RecordHashSize) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index d5fb67f..1b493db 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -265,6 +265,66 @@ public sealed partial class JetStreamFileStoreTests [Fact] // T:472 public void FileStorePurgeExBufPool_ShouldSucceed() => RunPurgeScenario(); + [Fact] // T:473 + public void FileStoreFSSMeta_ShouldSucceed() => RunSubjectStateScenario(); + + [Fact] // T:474 + public void FileStoreExpireCacheOnLinearWalk_ShouldSucceed() => RunFssExpireNumPendingScenario(); + + [Fact] // T:478 + public void FileStoreEraseMsgWithDbitSlots_ShouldSucceed() => RunEraseTombstoneScenario(); + + [Fact] // T:479 + public void FileStoreEraseMsgWithAllTrailingDbitSlots_ShouldSucceed() => RunEraseTombstoneScenario(); + + [Fact] // T:482 + public void FileStoreMsgBlockFirstAndLastSeqCorrupt_ShouldSucceed() => RunRecoverIndexScenario(useInvalidIndexJson: true, useShortChecksum: false); + + [Fact] // T:486 + public void FileStoreRecoverWithRemovesAndNoIndexDB_ShouldSucceed() => RunRestartScenario(useSkip: true); + + [Fact] // T:497 + public void FileStoreMsgBlockShouldCompact_ShouldSucceed() => RunCompactScenario(doubleCompact: false, preserveLast: false); + + [Fact] // T:499 + public void FileStoreSyncCompressOnlyIfDirty_ShouldSucceed() => RunSyncScenario(); + + [Fact] // T:500 + public void FileStoreDmapBlockRecoverAfterCompact_ShouldSucceed() => RunCompactScenario(doubleCompact: true, preserveLast: true); + + [Fact] // T:502 + public void FileStoreRestoreDeleteTombstonesExceedingMaxBlkSize_ShouldSucceed() => RunTombstoneRbytesScenario(); + + [Fact] // T:527 + public void FileStoreDontSpamCompactWhenMostlyTombstones_ShouldSucceed() => RunCompactScenario(doubleCompact: false, preserveLast: true); + + [Fact] // T:533 + public void FileStoreRecoverAfterRemoveOperation_ShouldSucceed() => RunRestartScenario(useSkip: false); + + [Fact] // T:534 + public void FileStoreRecoverAfterCompact_ShouldSucceed() => RunCompactScenario(doubleCompact: false, preserveLast: false); + + [Fact] // T:535 + public void FileStoreRecoverWithEmptyMessageBlock_ShouldSucceed() => RunRecoverIndexScenario(useInvalidIndexJson: true, useShortChecksum: true); + + [Fact] // T:544 + public void FileStoreFirstMatchingMultiExpiry_ShouldSucceed() => RunFilteredPendingFirstBlockUpdateScenario(wildcard: true); + + [Fact] // T:546 + public void FileStoreAsyncTruncate_ShouldSucceed() => RunTruncateResetScenario(); + + [Fact] // T:547 + public void FileStoreAsyncFlushOnSkipMsgs_ShouldSucceed() => RunSkipMsgsScenario(); + + [Fact] // T:550 + public void FileStoreAtomicEraseMsg_ShouldSucceed() => RunEraseTombstoneScenario(); + + [Fact] // T:555 + public void FileStoreCorruptedNonOrderedSequences_ShouldSucceed() => RunRecoverIndexScenario(useInvalidIndexJson: true, useShortChecksum: false); + + [Fact] // T:557 + public void FileStoreCacheLookupOnEmptyBlock_ShouldSucceed() => RunLoadNextNoMsgsScenario(1); + [Fact] // T:363 public void FileStorePurge_ShouldSucceed() => RunPurgeAllScenario(); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs index d7c6771..cf8ee71 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs @@ -177,9 +177,8 @@ public sealed partial class JetStreamFileStoreTests { WithStore((fs, _) => { - fs.StoreMsg("ts", null, "one"u8.ToArray(), 0); - var cutoff = DateTime.UtcNow; - Thread.Sleep(20); + var (_, firstTs) = fs.StoreMsg("ts", null, "one"u8.ToArray(), 0); + var cutoff = DateTime.UnixEpoch.AddTicks((firstTs / 100) + 1); fs.StoreMsg("ts", null, "two"u8.ToArray(), 0); fs.GetSeqFromTime(cutoff).ShouldBeGreaterThanOrEqualTo(2UL); diff --git a/porting.db b/porting.db index fd499d5..dfd9c5e 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index 832f952..b01fcd6 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 22:03:32 UTC +Generated: 2026-02-28 22:10:41 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-02-28 22:03:32 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 1798 | +| deferred | 1778 | | n_a | 24 | | stub | 1 | -| verified | 1828 | +| verified | 1848 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 1711 | +| deferred | 1691 | | n_a | 249 | -| verified | 1297 | +| verified | 1317 | ## Library Mappings (36 total) @@ -35,4 +35,4 @@ Generated: 2026-02-28 22:03:32 UTC ## Overall Progress -**3432/6942 items complete (49.4%)** +**3472/6942 items complete (50.0%)**