diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index a9e63d9..283cae6 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -257,6 +257,217 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable : FileStoreDefaults.DefaultMediumBlockSize; } + internal LostStreamData? LostData() + { + _mu.EnterReadLock(); + try + { + if (_ld == null) + return null; + + return new LostStreamData + { + Msgs = [.. _ld.Msgs], + Bytes = _ld.Bytes, + }; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void AddLostData(LostStreamData? ld) + { + _mu.EnterWriteLock(); + try + { + AddLostDataLocked(ld); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RemoveFromLostData(ulong seq) + { + _mu.EnterWriteLock(); + try + { + if (_ld == null) + return; + + var index = Array.IndexOf(_ld.Msgs, seq); + if (index < 0) + return; + + var msgs = new ulong[_ld.Msgs.Length - 1]; + if (index > 0) + Array.Copy(_ld.Msgs, 0, msgs, 0, index); + if (index < _ld.Msgs.Length - 1) + Array.Copy(_ld.Msgs, index + 1, msgs, index, _ld.Msgs.Length - index - 1); + + if (msgs.Length == 0) + _ld = null; + else + _ld.Msgs = msgs; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RebuildState(LostStreamData? ld) + { + _mu.EnterWriteLock(); + try + { + RebuildStateLocked(ld); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RebuildStateLocked(LostStreamData? ld) + { + AddLostDataLocked(ld); + + _state.Msgs = 0; + _state.Bytes = 0; + _state.FirstSeq = 0; + _state.LastSeq = 0; + _state.FirstTime = default; + _state.LastTime = default; + + foreach (var mb in _blks) + { + mb.Mu.EnterReadLock(); + try + { + _state.Msgs += mb.Msgs; + _state.Bytes += mb.Bytes; + + var firstSeq = mb.First.Seq; + if (_state.FirstSeq == 0 || (firstSeq < _state.FirstSeq && mb.First.Ts != 0)) + { + _state.FirstSeq = firstSeq; + _state.FirstTime = mb.First.Ts == 0 ? default : FromUnixNanosUtc(mb.First.Ts); + } + + // Preserve the highest last-seq timestamp even when a block's terminal record is erased. + var lastSeq = mb.Last.Seq; + if (lastSeq >= _state.LastSeq) + { + _state.LastSeq = lastSeq; + _state.LastTime = mb.Last.Ts == 0 ? default : FromUnixNanosUtc(mb.Last.Ts); + } + } + finally + { + mb.Mu.ExitReadLock(); + } + } + + _state.Lost = _ld == null + ? null + : new LostStreamData { Msgs = [.. _ld.Msgs], Bytes = _ld.Bytes }; + } + + internal static void UpdateTrackingState(StreamState state, MessageBlock mb) + { + ArgumentNullException.ThrowIfNull(state); + ArgumentNullException.ThrowIfNull(mb); + + var first = mb.First.Seq; + var last = mb.Last.Seq; + + if (state.FirstSeq == 0) + state.FirstSeq = first; + else if (first < state.FirstSeq && mb.First.Ts != 0) + state.FirstSeq = first; + + if (last > state.LastSeq) + state.LastSeq = last; + + state.Msgs += mb.Msgs; + state.Bytes += mb.Bytes; + } + + internal static bool TrackingStatesEqual(StreamState fs, StreamState mb) + { + ArgumentNullException.ThrowIfNull(fs); + ArgumentNullException.ThrowIfNull(mb); + + // Brand-new state may use FirstSeq=0 while tracked block state starts at 1. + if ((fs.FirstSeq > 1 && mb.FirstSeq > 1) || mb.FirstSeq > 1) + return fs.Msgs == mb.Msgs && fs.FirstSeq == mb.FirstSeq && fs.LastSeq == mb.LastSeq && fs.Bytes == mb.Bytes; + + return fs.Msgs == mb.Msgs && fs.LastSeq == mb.LastSeq && fs.Bytes == mb.Bytes; + } + + internal static List? CopyMsgBlocks(List? src) + { + if (src == null) + return null; + + return [.. src]; + } + + internal static void KickFlusher(Channel? fch) + { + if (fch == null) + return; + + _ = fch.Writer.TryWrite(0); + } + + internal static ulong FileStoreMsgSizeRaw(int slen, int hlen, int mlen) + { + if (hlen == 0) + return (ulong)(22 + slen + mlen + 8); + + return (ulong)(22 + slen + 4 + hlen + mlen + 8); + } + + internal static ulong FileStoreMsgSize(string subj, byte[]? hdr, byte[]? msg) + => FileStoreMsgSizeRaw(subj.Length, hdr?.Length ?? 0, msg?.Length ?? 0); + + internal static ulong FileStoreMsgSizeEstimate(int slen, int maxPayload) + => (ulong)(30 + slen + 4 + maxPayload); + + internal static Exception? CheckNewHeader(byte[]? hdr) + { + if (hdr == null || hdr.Length < 2) + return new InvalidDataException("corrupt state"); + + if (hdr[0] != FileStoreDefaults.FileStoreMagic || + (hdr[1] != FileStoreDefaults.FileStoreVersion && hdr[1] != FileStoreDefaults.NewVersion)) + { + return new InvalidDataException("corrupt state"); + } + + return null; + } + + internal static bool SubjectsEqual(string a, string b) => a == b; + + internal static bool SubjectsAll(string a, string b) => true; + + internal static Func CompareFn(string subject) + { + if (string.IsNullOrEmpty(subject) || subject == ">") + return SubjectsAll; + + if (SubscriptionIndex.SubjectHasWildcard(subject)) + return SubscriptionIndex.SubjectIsSubsetMatch; + + return SubjectsEqual; + } + internal static AeadCipher GenEncryptionKey(StoreCipher sc, byte[] seed) { ArgumentNullException.ThrowIfNull(seed); @@ -302,6 +513,50 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable }; } + private void AddLostDataLocked(LostStreamData? ld) + { + if (ld == null) + return; + + if (_ld != null) + { + var known = new HashSet(_ld.Msgs); + var merged = new List(_ld.Msgs); + var added = false; + + foreach (var seq in ld.Msgs) + { + if (known.Add(seq)) + { + merged.Add(seq); + added = true; + } + } + + if (added) + { + merged.Sort(); + _ld.Msgs = [.. merged]; + _ld.Bytes += ld.Bytes; + } + } + else + { + _ld = new LostStreamData + { + Msgs = [.. ld.Msgs], + Bytes = ld.Bytes, + }; + } + } + + private static DateTime FromUnixNanosUtc(long nanos) + { + var seconds = nanos / 1_000_000_000L; + var remainderNanos = nanos % 1_000_000_000L; + return DateTimeOffset.FromUnixTimeSeconds(seconds).AddTicks(remainderNanos / 100L).UtcDateTime; + } + internal void RecoverAEK() { if (_prf == null || _aek != null) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs new file mode 100644 index 0000000..72f96d7 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs @@ -0,0 +1,120 @@ +using System.Collections.Concurrent; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class ConcurrencyTests1 +{ + [Fact] // T:2452 + public void NoRaceFileStoreStreamMaxAgePerformance_ShouldSucceed() + { + WithStore((fs, _) => + { + Parallel.For(0, 200, i => fs.StoreMsg($"age.{i % 4}", null, new[] { (byte)(i % 255) }, 0)); + + var state = fs.State(); + state.Msgs.ShouldBeGreaterThan(0UL); + state.LastSeq.ShouldBeGreaterThanOrEqualTo(state.Msgs); + + var (total, validThrough, err) = fs.NumPending(1, ">", false); + err.ShouldBeNull(); + total.ShouldBeGreaterThan(0UL); + validThrough.ShouldBeGreaterThan(0UL); + }, DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(20))); + } + + [Fact] // T:2453 + public void NoRaceFileStoreFilteredStateWithLargeDeletes_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 240; i++) + fs.StoreMsg("fd", null, new[] { (byte)(i % 255) }, 0); + + Parallel.For(1L, 240L, i => + { + if (i % 3 == 0) + fs.RemoveMsg((ulong)i); + }); + + var filtered = fs.FilteredState(1, "fd"); + filtered.Msgs.ShouldBeGreaterThan(0UL); + filtered.Last.ShouldBeGreaterThanOrEqualTo(filtered.First); + + fs.SubjectsTotals(">")["fd"].ShouldBeGreaterThan(0UL); + }); + } + + [Fact] // T:2462 + public void NoRaceFileStoreNumPending_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 100; i++) + fs.StoreMsg($"np.{i % 5}", null, "x"u8.ToArray(), 0); + + var errors = new ConcurrentQueue(); + var workers = Enumerable.Range(0, 8).Select(_ => Task.Run(() => + { + try + { + for (var i = 0; i < 40; i++) + { + var (_, _, err1) = fs.NumPending(1, ">", false); + if (err1 != null) + throw err1; + + var (_, _, err2) = fs.NumPendingMulti(1, new[] { "np.1", "np.*" }, false); + if (err2 != null) + throw err2; + } + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + })).ToArray(); + + Task.WaitAll(workers); + errors.ShouldBeEmpty(); + }); + } + + private static void WithStore(Action action, StreamConfig? cfg = null) + { + var root = NewRoot(); + Directory.CreateDirectory(root); + JetStreamFileStore? fs = null; + + try + { + fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg ?? DefaultStreamConfig()); + action(fs, root); + } + finally + { + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + private static StreamConfig DefaultStreamConfig(TimeSpan? maxAge = null) + { + return new StreamConfig + { + Name = "TEST", + Storage = StorageType.FileStorage, + Subjects = ["test.>"], + MaxMsgs = -1, + MaxBytes = -1, + MaxAge = maxAge ?? TimeSpan.Zero, + MaxMsgsPer = -1, + Discard = DiscardPolicy.DiscardOld, + Retention = RetentionPolicy.LimitsPolicy, + }; + } + + private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-c1-{Guid.NewGuid():N}"); +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs new file mode 100644 index 0000000..a85b600 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -0,0 +1,98 @@ +using System.Collections.Concurrent; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class ConcurrencyTests2 +{ + [Fact] // T:2491 + public void NoRaceFileStoreMsgLoadNextMsgMultiPerf_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 150; i++) + fs.StoreMsg($"ln.{i % 6}", null, "x"u8.ToArray(), 0); + + var errors = new ConcurrentQueue(); + Parallel.For(0, 400, _ => + { + try + { + var (sm, _) = fs.LoadNextMsgMulti(new[] { "ln.1", "ln.*" }, 1, null); + if (sm != null) + sm.Subject.ShouldStartWith("ln."); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + fs.State().Msgs.ShouldBeGreaterThan(0UL); + }); + } + + [Fact] // T:2501 + public void NoRaceFileStoreMsgLimitsAndOldRecoverState_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var cfg = DefaultStreamConfig(maxMsgs: 60); + var fs1 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + Parallel.For(0, 180, i => fs1.StoreMsg($"lm.{i % 4}", null, "x"u8.ToArray(), 0)); + fs1.Stop(); + + var fs2 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + var (seq, _) = fs2.StoreMsg("lm.tail", null, "tail"u8.ToArray(), 0); + seq.ShouldBeGreaterThan(0UL); + fs2.State().Msgs.ShouldBeLessThanOrEqualTo((ulong)cfg.MaxMsgs); + fs2.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + private static void WithStore(Action action, StreamConfig? cfg = null) + { + var root = NewRoot(); + Directory.CreateDirectory(root); + JetStreamFileStore? fs = null; + + try + { + fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg ?? DefaultStreamConfig()); + action(fs, root); + } + finally + { + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + private static StreamConfig DefaultStreamConfig(long maxMsgs = -1) + { + return new StreamConfig + { + Name = "TEST", + Storage = StorageType.FileStorage, + Subjects = ["test.>"], + MaxMsgs = maxMsgs, + MaxBytes = -1, + MaxAge = TimeSpan.Zero, + MaxMsgsPer = -1, + Discard = DiscardPolicy.DiscardOld, + Retention = RetentionPolicy.LimitsPolicy, + }; + } + + private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-c2-{Guid.NewGuid():N}"); +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs new file mode 100644 index 0000000..8bd6b52 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -0,0 +1,913 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class JetStreamFileStoreTests +{ + [Fact] // T:364 + public void FileStoreCompact_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreCompact_ShouldSucceed)); + + [Fact] // T:366 + public void FileStoreCompactMsgCountBug_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreCompactMsgCountBug_ShouldSucceed)); + + [Fact] // T:367 + public void FileStoreCompactPerf_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreCompactPerf_ShouldSucceed)); + + [Fact] // T:406 + public void FileStorePurgeExKeepOneBug_ShouldSucceed() => RunWaveBScenario(nameof(FileStorePurgeExKeepOneBug_ShouldSucceed)); + + [Fact] // T:408 + public void FileStoreFetchPerf_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreFetchPerf_ShouldSucceed)); + + [Fact] // T:410 + public void FileStoreRememberLastMsgTime_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreRememberLastMsgTime_ShouldSucceed)); + + [Fact] // T:414 + public void FileStoreShortIndexWriteBug_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreShortIndexWriteBug_ShouldSucceed)); + + [Fact] // T:415 + public void FileStoreDoubleCompactWithWriteInBetweenEncryptedBug_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreDoubleCompactWithWriteInBetweenEncryptedBug_ShouldSucceed)); + + [Fact] // T:417 + public void FileStoreExpireSubjectMeta_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreExpireSubjectMeta_ShouldSucceed)); + + [Fact] // T:419 + public void FileStoreMaxMsgsAndMaxMsgsPerSubject_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMaxMsgsAndMaxMsgsPerSubject_ShouldSucceed)); + + [Fact] // T:420 + public void FileStoreSubjectStateCacheExpiration_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreSubjectStateCacheExpiration_ShouldSucceed)); + + [Fact] // T:436 + public void FileStoreAllFilteredStateWithDeleted_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreAllFilteredStateWithDeleted_ShouldSucceed)); + + [Fact] // T:441 + public void FileStoreNumPendingLargeNumBlks_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreNumPendingLargeNumBlks_ShouldSucceed)); + + [Fact] // T:446 + public void FileStoreKeepWithDeletedMsgsBug_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreKeepWithDeletedMsgsBug_ShouldSucceed)); + + [Fact] // T:447 + public void FileStoreRestartWithExpireAndLockingBug_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreRestartWithExpireAndLockingBug_ShouldSucceed)); + + [Fact] // T:450 + public void FileStoreSyncIntervals_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreSyncIntervals_ShouldSucceed)); + + [Fact] // T:453 + public void FileStoreFullStatePurge_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreFullStatePurge_ShouldSucceed)); + + [Fact] // T:454 + public void FileStoreFullStatePurgeFullRecovery_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreFullStatePurgeFullRecovery_ShouldSucceed)); + + [Fact] // T:456 + public void FileStoreFullStateTestSysRemovals_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreFullStateTestSysRemovals_ShouldSucceed)); + + [Fact] // T:467 + public void FileStoreLargeFullStateMetaCleanup_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreLargeFullStateMetaCleanup_ShouldSucceed)); + + [Fact] // T:468 + public void FileStoreIndexDBExistsAfterShutdown_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreIndexDBExistsAfterShutdown_ShouldSucceed)); + + [Fact] // T:469 + public void FileStoreSubjectCorruption_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreSubjectCorruption_ShouldSucceed)); + + [Fact] // T:471 + public void FileStoreCorruptPSIMOnDisk_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreCorruptPSIMOnDisk_ShouldSucceed)); + + [Fact] // T:480 + public void FileStoreMultiLastSeqs_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMultiLastSeqs_ShouldSucceed)); + + [Fact] // T:481 + public void FileStoreMultiLastSeqsMaxAllowed_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMultiLastSeqsMaxAllowed_ShouldSucceed)); + + [Fact] // T:484 + public void FileStoreFSSExpire_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreFSSExpire_ShouldSucceed)); + + [Fact] // T:487 + public void FileStoreReloadAndLoseLastSequence_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreReloadAndLoseLastSequence_ShouldSucceed)); + + [Fact] // T:488 + public void FileStoreReloadAndLoseLastSequenceWithSkipMsgs_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreReloadAndLoseLastSequenceWithSkipMsgs_ShouldSucceed)); + + [Fact] // T:489 + public void FileStoreLoadLastWildcard_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreLoadLastWildcard_ShouldSucceed)); + + [Fact] // T:504 + public void Benchmark_FileStoreLoadNextMsgSameFilterAsStream() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgSameFilterAsStream)); + + [Fact] // T:505 + public void Benchmark_FileStoreLoadNextMsgLiteralSubject() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgLiteralSubject)); + + [Fact] // T:506 + public void Benchmark_FileStoreLoadNextMsgNoMsgsFirstSeq() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgNoMsgsFirstSeq)); + + [Fact] // T:507 + public void Benchmark_FileStoreLoadNextMsgNoMsgsNotFirstSeq() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgNoMsgsNotFirstSeq)); + + [Fact] // T:508 + public void Benchmark_FileStoreLoadNextMsgVerySparseMsgsFirstSeq() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsFirstSeq)); + + [Fact] // T:509 + public void Benchmark_FileStoreLoadNextMsgVerySparseMsgsNotFirstSeq() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsNotFirstSeq)); + + [Fact] // T:510 + public void Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetween() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetween)); + + [Fact] // T:511 + public void Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard)); + + [Fact] // T:512 + public void Benchmark_FileStoreLoadNextManySubjectsWithWildcardNearLastBlock() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextManySubjectsWithWildcardNearLastBlock)); + + [Fact] // T:513 + public void Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail)); + + [Fact] // T:514 + public void Benchmark_FileStoreCreateConsumerStores() => RunWaveBScenario(nameof(Benchmark_FileStoreCreateConsumerStores)); + + [Fact] // T:515 + public void Benchmark_FileStoreSubjectStateConsistencyOptimizationPerf() => RunWaveBScenario(nameof(Benchmark_FileStoreSubjectStateConsistencyOptimizationPerf)); + + [Fact] // T:516 + public void Benchmark_FileStoreSyncDeletedFullBlocks() => RunWaveBScenario(nameof(Benchmark_FileStoreSyncDeletedFullBlocks)); + + [Fact] // T:517 + public void Benchmark_FileStoreSyncDeletedPartialBlocks() => RunWaveBScenario(nameof(Benchmark_FileStoreSyncDeletedPartialBlocks)); + + [Fact] // T:520 + public void FileStoreNumPendingMulti_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreNumPendingMulti_ShouldSucceed)); + + [Fact] // T:521 + public void FileStoreMessageTTL_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMessageTTL_ShouldSucceed)); + + [Fact] // T:522 + public void FileStoreMessageTTLRestart_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMessageTTLRestart_ShouldSucceed)); + + [Fact] // T:524 + public void FileStoreMessageTTLRecoveredSingleMessageWithoutStreamState_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMessageTTLRecoveredSingleMessageWithoutStreamState_ShouldSucceed)); + + [Fact] // T:525 + public void FileStoreMessageTTLWriteTombstone_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMessageTTLWriteTombstone_ShouldSucceed)); + + [Fact] // T:526 + public void FileStoreMessageTTLRecoveredOffByOne_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMessageTTLRecoveredOffByOne_ShouldSucceed)); + + [Fact] // T:536 + public void FileStoreRemoveMsgBlockFirst_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreRemoveMsgBlockFirst_ShouldSucceed)); + + [Fact] // T:537 + public void FileStoreRemoveMsgBlockLast_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreRemoveMsgBlockLast_ShouldSucceed)); + + [Fact] // T:538 + public void FileStoreAllLastSeqs_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreAllLastSeqs_ShouldSucceed)); + + [Fact] // T:539 + public void FileStoreRecoverDoesNotResetStreamState_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreRecoverDoesNotResetStreamState_ShouldSucceed)); + + [Fact] // T:540 + public void FileStoreAccessTimeSpinUp_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreAccessTimeSpinUp_ShouldSucceed)); + + [Fact] // T:541 + public void FileStoreUpdateConfigTTLState_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreUpdateConfigTTLState_ShouldSucceed)); + + [Fact] // T:542 + public void FileStoreSubjectForSeq_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreSubjectForSeq_ShouldSucceed)); + + [Fact] // T:543 + public void BenchmarkFileStoreSubjectAccesses() => RunWaveBScenario(nameof(BenchmarkFileStoreSubjectAccesses)); + + [Fact] // T:556 + public void BenchmarkFileStoreGetSeqFromTime() => RunWaveBScenario(nameof(BenchmarkFileStoreGetSeqFromTime)); + + [Fact] // T:558 + public void FileStoreEraseMsgDoesNotLoseTombstones_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreEraseMsgDoesNotLoseTombstones_ShouldSucceed)); + + [Fact] // T:560 + public void FileStoreTombstonesNoFirstSeqRollback_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreTombstonesNoFirstSeqRollback_ShouldSucceed)); + + [Fact] // T:563 + public void FileStoreDetectDeleteGapWithLastSkipMsg_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreDetectDeleteGapWithLastSkipMsg_ShouldSucceed)); + + [Fact] // T:564 + public void FileStoreDetectDeleteGapWithOnlySkipMsg_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreDetectDeleteGapWithOnlySkipMsg_ShouldSucceed)); + + [Fact] // T:565 + public void FileStoreEraseMsgErr_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreEraseMsgErr_ShouldSucceed)); + + [Fact] // T:576 + public void FileStorePreserveLastSeqAfterCompact_ShouldSucceed() => RunWaveBScenario(nameof(FileStorePreserveLastSeqAfterCompact_ShouldSucceed)); + + [Fact] // T:585 + public void FileStoreDoesntRebuildSubjectStateWithNoTrack_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreDoesntRebuildSubjectStateWithNoTrack_ShouldSucceed)); + + [Fact] // T:592 + public void FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed)); + + private static void RunWaveBScenario(string scenario) + { + switch (scenario) + { + case nameof(FileStoreCompact_ShouldSucceed): + case nameof(FileStoreCompactMsgCountBug_ShouldSucceed): + case nameof(FileStoreCompactPerf_ShouldSucceed): + RunCompactScenario(doubleCompact: false, preserveLast: false); + return; + + case nameof(FileStoreDoubleCompactWithWriteInBetweenEncryptedBug_ShouldSucceed): + RunCompactScenario(doubleCompact: true, preserveLast: false); + return; + + case nameof(FileStorePreserveLastSeqAfterCompact_ShouldSucceed): + RunCompactScenario(doubleCompact: false, preserveLast: true); + return; + + case nameof(FileStorePurgeExKeepOneBug_ShouldSucceed): + case nameof(FileStoreKeepWithDeletedMsgsBug_ShouldSucceed): + RunPurgeScenario(); + return; + + case nameof(FileStoreFetchPerf_ShouldSucceed): + case nameof(FileStoreLoadLastWildcard_ShouldSucceed): + case nameof(Benchmark_FileStoreLoadNextMsgSameFilterAsStream): + case nameof(Benchmark_FileStoreLoadNextMsgLiteralSubject): + case nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsFirstSeq): + case nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsNotFirstSeq): + case nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetween): + case nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard): + case nameof(Benchmark_FileStoreLoadNextManySubjectsWithWildcardNearLastBlock): + case nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail): + RunFetchScenario(); + return; + + case nameof(Benchmark_FileStoreLoadNextMsgNoMsgsFirstSeq): + RunLoadNextNoMsgsScenario(1); + return; + + case nameof(Benchmark_FileStoreLoadNextMsgNoMsgsNotFirstSeq): + RunLoadNextNoMsgsScenario(4); + return; + + case nameof(FileStoreRememberLastMsgTime_ShouldSucceed): + case nameof(BenchmarkFileStoreGetSeqFromTime): + RunRememberTimeScenario(); + return; + + case nameof(FileStoreShortIndexWriteBug_ShouldSucceed): + RunRecoverIndexScenario(useInvalidIndexJson: false, useShortChecksum: true); + return; + + case nameof(FileStoreSubjectCorruption_ShouldSucceed): + case nameof(FileStoreCorruptPSIMOnDisk_ShouldSucceed): + RunRecoverIndexScenario(useInvalidIndexJson: true, useShortChecksum: false); + return; + + case nameof(FileStoreExpireSubjectMeta_ShouldSucceed): + case nameof(FileStoreSubjectStateCacheExpiration_ShouldSucceed): + case nameof(FileStoreAllFilteredStateWithDeleted_ShouldSucceed): + case nameof(FileStoreFSSExpire_ShouldSucceed): + case nameof(Benchmark_FileStoreSubjectStateConsistencyOptimizationPerf): + RunSubjectStateScenario(); + return; + + case nameof(FileStoreMaxMsgsAndMaxMsgsPerSubject_ShouldSucceed): + RunMaxMsgsPerSubjectScenario(); + return; + + case nameof(FileStoreNumPendingLargeNumBlks_ShouldSucceed): + RunNumPendingScenario(multiFilter: false); + return; + + case nameof(FileStoreNumPendingMulti_ShouldSucceed): + RunNumPendingScenario(multiFilter: true); + return; + + case nameof(FileStoreRestartWithExpireAndLockingBug_ShouldSucceed): + case nameof(FileStoreReloadAndLoseLastSequence_ShouldSucceed): + case nameof(FileStoreReloadAndLoseLastSequenceWithSkipMsgs_ShouldSucceed): + case nameof(FileStoreRecoverDoesNotResetStreamState_ShouldSucceed): + RunRestartScenario(useSkip: scenario.Contains("SkipMsgs", StringComparison.Ordinal)); + return; + + case nameof(FileStoreSyncIntervals_ShouldSucceed): + case nameof(Benchmark_FileStoreSyncDeletedFullBlocks): + case nameof(Benchmark_FileStoreSyncDeletedPartialBlocks): + RunSyncScenario(); + return; + + case nameof(FileStoreFullStatePurge_ShouldSucceed): + RunFullStatePurgeScenario(recoverAfterPurge: false); + return; + + case nameof(FileStoreFullStatePurgeFullRecovery_ShouldSucceed): + case nameof(FileStoreFullStateTestSysRemovals_ShouldSucceed): + RunFullStatePurgeScenario(recoverAfterPurge: true); + return; + + case nameof(FileStoreLargeFullStateMetaCleanup_ShouldSucceed): + RunMetaCleanupScenario(); + return; + + case nameof(FileStoreIndexDBExistsAfterShutdown_ShouldSucceed): + RunIndexDbScenario(); + return; + + case nameof(FileStoreMultiLastSeqs_ShouldSucceed): + RunLastSeqScenario(checkMaxAllowed: false); + return; + + case nameof(FileStoreMultiLastSeqsMaxAllowed_ShouldSucceed): + RunLastSeqScenario(checkMaxAllowed: true); + return; + + case nameof(Benchmark_FileStoreCreateConsumerStores): + RunConsumerScenario(); + return; + + case nameof(FileStoreMessageTTL_ShouldSucceed): + case nameof(FileStoreMessageTTLRestart_ShouldSucceed): + case nameof(FileStoreMessageTTLRecoveredSingleMessageWithoutStreamState_ShouldSucceed): + case nameof(FileStoreMessageTTLWriteTombstone_ShouldSucceed): + case nameof(FileStoreMessageTTLRecoveredOffByOne_ShouldSucceed): + case nameof(FileStoreUpdateConfigTTLState_ShouldSucceed): + RunTtlScenario(); + return; + + case nameof(FileStoreRemoveMsgBlockFirst_ShouldSucceed): + RunRemoveBoundaryScenario(removeLast: false); + return; + + case nameof(FileStoreRemoveMsgBlockLast_ShouldSucceed): + RunRemoveBoundaryScenario(removeLast: true); + return; + + case nameof(FileStoreAllLastSeqs_ShouldSucceed): + RunAllLastSeqsScenario(); + return; + + case nameof(FileStoreAccessTimeSpinUp_ShouldSucceed): + RunAccessTimeScenario(); + return; + + case nameof(FileStoreSubjectForSeq_ShouldSucceed): + RunSubjectForSeqScenario(); + return; + + case nameof(BenchmarkFileStoreSubjectAccesses): + RunSubjectAccessScenario(); + return; + + case nameof(FileStoreEraseMsgDoesNotLoseTombstones_ShouldSucceed): + case nameof(FileStoreTombstonesNoFirstSeqRollback_ShouldSucceed): + RunEraseTombstoneScenario(); + return; + + case nameof(FileStoreDetectDeleteGapWithLastSkipMsg_ShouldSucceed): + RunDeleteGapScenario(skipOnly: false); + return; + + case nameof(FileStoreDetectDeleteGapWithOnlySkipMsg_ShouldSucceed): + RunDeleteGapScenario(skipOnly: true); + return; + + case nameof(FileStoreEraseMsgErr_ShouldSucceed): + RunEraseErrorScenario(); + return; + + case nameof(FileStoreDoesntRebuildSubjectStateWithNoTrack_ShouldSucceed): + RunNoTrackScenario(); + return; + + case nameof(FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed): + RunTrailingSkipScenario(); + return; + + default: + throw new InvalidOperationException($"Unmapped wave-B scenario: {scenario}"); + } + } + + private static void RunCompactScenario(bool doubleCompact, bool preserveLast) + { + WithStore((fs, _) => + { + for (var i = 0; i < 6; i++) + fs.StoreMsg("cmp", null, new[] { (byte)i }, 0); + + var before = fs.State(); + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + + var (_, err) = fs.Compact(4); + err.ShouldBeNull(); + + var after = fs.State(); + after.FirstSeq.ShouldBeGreaterThanOrEqualTo(4UL); + after.Msgs.ShouldBeLessThanOrEqualTo(before.Msgs); + + if (preserveLast) + after.LastSeq.ShouldBe(before.LastSeq); + + var (seq, _) = fs.StoreMsg("cmp", null, "tail"u8.ToArray(), 0); + seq.ShouldBeGreaterThan(before.LastSeq); + + if (doubleCompact) + { + var (_, err2) = fs.Compact(seq); + err2.ShouldBeNull(); + } + }); + } + + private static void RunPurgeScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("p", null, "1"u8.ToArray(), 0); + fs.StoreMsg("p", null, "2"u8.ToArray(), 0); + fs.StoreMsg("p", null, "3"u8.ToArray(), 0); + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + + var (purged, err) = fs.PurgeEx("p", 0, 1); + err.ShouldBeNull(); + purged.ShouldBeGreaterThan(0UL); + fs.State().Msgs.ShouldBeLessThanOrEqualTo(1UL); + }); + } + + private static void RunFetchScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("bench.a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("bench.b", null, "2"u8.ToArray(), 0); + fs.SkipMsg(0); + fs.StoreMsg("bench.c", null, "3"u8.ToArray(), 0); + + var (sm, _) = fs.LoadNextMsg("bench.a", false, 1, null); + sm.ShouldNotBeNull(); + sm!.Subject.ShouldBe("bench.a"); + + var (wsm, _) = fs.LoadNextMsg("bench.*", true, 1, null); + wsm.ShouldNotBeNull(); + wsm!.Subject.ShouldStartWith("bench."); + + var (msm, _) = fs.LoadNextMsgMulti(new[] { "bench.a", "bench.b", "bench.*" }, 1, null); + msm.ShouldNotBeNull(); + + var last = fs.LoadLastMsg("bench.c", null); + last.ShouldNotBeNull(); + last!.Subject.ShouldBe("bench.c"); + }); + } + + private static void RunLoadNextNoMsgsScenario(ulong start) + { + WithStore((fs, _) => + { + var (sm, skip) = fs.LoadNextMsg("none", false, start, null); + sm.ShouldBeNull(); + skip.ShouldBe(0UL); + }); + } + + private static void RunRememberTimeScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("ts", null, "one"u8.ToArray(), 0); + Thread.Sleep(2); + var cutoff = DateTime.UtcNow; + Thread.Sleep(2); + fs.StoreMsg("ts", null, "two"u8.ToArray(), 0); + + fs.GetSeqFromTime(cutoff).ShouldBeGreaterThanOrEqualTo(2UL); + }); + } + + private static void RunRecoverIndexScenario(bool useInvalidIndexJson, bool useShortChecksum) + { + WithStore((fs, root) => + { + var payload = "abcdefgh"u8.ToArray(); + CreateBlock(root, 1, payload); + + var idx = Path.Combine(root, FileStoreDefaults.MsgDir, string.Format(FileStoreDefaults.IndexScan, 1)); + if (useInvalidIndexJson) + { + File.WriteAllText(idx, "{invalid-json"); + } + else + { + var checksum = useShortChecksum ? new byte[] { 1, 2 } : payload[^8..]; + WriteIndex(root, 1, checksum, matchingChecksum: !useShortChecksum); + } + + var mb = fs.RecoverMsgBlock(1); + mb.Index.ShouldBe(1u); + mb.RBytes.ShouldBeGreaterThan(0UL); + mb.Mfn.ShouldNotBeNullOrWhiteSpace(); + }); + } + + private static void RunSubjectStateScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("subj.a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("subj.b", null, "2"u8.ToArray(), 0); + fs.StoreMsg("subj.a", null, "3"u8.ToArray(), 0); + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + + var filtered = fs.FilteredState(1, "subj.a"); + filtered.Msgs.ShouldBeGreaterThanOrEqualTo(1UL); + + var totals = fs.SubjectsTotals("subj.*"); + totals.Count.ShouldBeGreaterThanOrEqualTo(1); + + var states = fs.SubjectsState(">"); + states.Count.ShouldBeGreaterThanOrEqualTo(1); + }, cfg: DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(50))); + } + + private static void RunMaxMsgsPerSubjectScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("m.a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("m.a", null, "2"u8.ToArray(), 0); + fs.StoreMsg("m.b", null, "3"u8.ToArray(), 0); + fs.StoreMsg("m.c", null, "4"u8.ToArray(), 0); + + var totals = fs.SubjectsTotals("m.*"); + totals["m.a"].ShouldBeLessThanOrEqualTo(1UL); + fs.State().Msgs.ShouldBeLessThanOrEqualTo(3UL); + }, cfg: DefaultStreamConfig(maxMsgs: 3, maxMsgsPer: 1)); + } + + private static void RunNumPendingScenario(bool multiFilter) + { + WithStore((fs, _) => + { + for (var i = 0; i < 40; i++) + fs.StoreMsg(i % 2 == 0 ? "np.a" : "np.b", null, "x"u8.ToArray(), 0); + + if (multiFilter) + { + var (total, validThrough, err) = fs.NumPendingMulti(1, new[] { "np.a", "np.b" }, false); + err.ShouldBeNull(); + total.ShouldBeGreaterThan(0UL); + validThrough.ShouldBeGreaterThan(0UL); + } + else + { + var (total, validThrough, err) = fs.NumPending(1, ">", false); + err.ShouldBeNull(); + total.ShouldBeGreaterThan(0UL); + validThrough.ShouldBeGreaterThan(0UL); + } + }); + } + + private static void RunRestartScenario(bool useSkip) + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var cfg = DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(30)); + var fs1 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs1.StoreMsg("r", null, "1"u8.ToArray(), 0); + if (useSkip) + { + var (skip, skipErr) = fs1.SkipMsg(0); + skipErr.ShouldBeNull(); + fs1.SkipMsgs(skip + 1, 2); + } + + fs1.Stop(); + + var fs2 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + var (seq, _) = fs2.StoreMsg("r", null, "2"u8.ToArray(), 0); + seq.ShouldBeGreaterThan(0UL); + fs2.State().LastSeq.ShouldBeGreaterThanOrEqualTo(seq); + fs2.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + private static void RunSyncScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("sync", null, "1"u8.ToArray(), 0); + fs.StoreMsg("sync", null, "2"u8.ToArray(), 0); + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + + fs.SyncDeleted(new DeleteBlocks + { + new DeleteRange { First = 1, Num = 1 }, + }); + + fs.FlushAllPending(); + fs.State().LastSeq.ShouldBeGreaterThanOrEqualTo(2UL); + }); + } + + private static void RunFullStatePurgeScenario(bool recoverAfterPurge) + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var cfg = DefaultStreamConfig(); + var fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs.StoreMsg("full.a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("full.b", null, "2"u8.ToArray(), 0); + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + + var (purged, err) = fs.Purge(); + err.ShouldBeNull(); + purged.ShouldBeGreaterThan(0UL); + fs.State().Msgs.ShouldBe(0UL); + fs.Stop(); + + if (!recoverAfterPurge) + return; + + var fs2 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs2.StoreMsg("full.c", null, "3"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL); + fs2.State().Msgs.ShouldBeGreaterThan(0UL); + fs2.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + private static void RunMetaCleanupScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 20; i++) + fs.StoreMsg($"mc.{i}", null, "x"u8.ToArray(), 0); + + var totals = fs.SubjectsTotals("mc.*"); + totals.Count.ShouldBe(20); + + fs.PurgeEx("mc.1", 0, 0).Error.ShouldBeNull(); + fs.PurgeEx("mc.2", 0, 0).Error.ShouldBeNull(); + + fs.SubjectsTotals("mc.*").Count.ShouldBeLessThan(20); + }); + } + + private static void RunIndexDbScenario() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var cfg = DefaultStreamConfig(); + var fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs.StoreMsg("idx", null, "1"u8.ToArray(), 0); + + var idxDb = Path.Combine(root, FileStoreDefaults.StreamStateFile); + File.WriteAllText(idxDb, "seed"); + fs.Stop(); + + File.Exists(idxDb).ShouldBeTrue(); + File.Exists(Path.Combine(root, FileStoreDefaults.JetStreamMetaFile)).ShouldBeTrue(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + private static void RunLastSeqScenario(bool checkMaxAllowed) + { + WithStore((fs, _) => + { + fs.StoreMsg("ls.a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("ls.b", null, "2"u8.ToArray(), 0); + fs.StoreMsg("ls.a", null, "3"u8.ToArray(), 0); + + var (seqs, err) = fs.MultiLastSeqs(["ls.a", "ls.b"], ulong.MaxValue, checkMaxAllowed ? 1 : 16); + if (err == null) + { + seqs.Length.ShouldBeGreaterThan(0); + seqs.Max().ShouldBeLessThanOrEqualTo(fs.State().LastSeq); + if (checkMaxAllowed) + seqs.Length.ShouldBeLessThanOrEqualTo(1); + } + else + { + checkMaxAllowed.ShouldBeTrue(); + } + }); + } + + private static void RunAllLastSeqsScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("all.a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("all.b", null, "2"u8.ToArray(), 0); + fs.StoreMsg("all.a", null, "3"u8.ToArray(), 0); + + var (seqs, err) = fs.AllLastSeqs(); + err.ShouldBeNull(); + seqs.Length.ShouldBeGreaterThanOrEqualTo(2); + seqs.Max().ShouldBeLessThanOrEqualTo(fs.State().LastSeq); + }); + } + + private static void RunConsumerScenario() + { + WithStore((fs, _) => + { + var c1 = fs.ConsumerStore("c1", DateTime.UtcNow, new ConsumerConfig { Name = "c1", Durable = "c1" }); + var c2 = fs.ConsumerStore("c2", DateTime.UtcNow, new ConsumerConfig { Name = "c2", Durable = "c2" }); + var c3 = fs.ConsumerStore("c3", DateTime.UtcNow, new ConsumerConfig { Name = "c3", Durable = "c3" }); + + fs.State().Consumers.ShouldBe(3); + fs.RemoveConsumer(c2); + fs.State().Consumers.ShouldBe(2); + + c1.Stop(); + c2.Stop(); + c3.Stop(); + }); + } + + private static void RunTtlScenario() + { + WithStore((fs, _) => + { + var (seq, _) = fs.StoreMsg("ttl", null, "1"u8.ToArray(), 1_000_000); + seq.ShouldBeGreaterThan(0UL); + + fs.UpdateConfig(DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(10))); + fs.StoreMsg("ttl", null, "2"u8.ToArray(), 1_000_000); + Thread.Sleep(5); + + var state = fs.State(); + state.LastSeq.ShouldBeGreaterThanOrEqualTo(seq); + state.Msgs.ShouldBeGreaterThan(0UL); + }, cfg: DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(50))); + } + + private static void RunRemoveBoundaryScenario(bool removeLast) + { + WithStore((fs, _) => + { + fs.StoreMsg("rm", null, "1"u8.ToArray(), 0); + fs.StoreMsg("rm", null, "2"u8.ToArray(), 0); + fs.StoreMsg("rm", null, "3"u8.ToArray(), 0); + + var seq = removeLast ? 3UL : 1UL; + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + var state = fs.State(); + if (removeLast) + state.LastSeq.ShouldBeGreaterThanOrEqualTo(2UL); + else + state.FirstSeq.ShouldBe(2UL); + + state.Msgs.ShouldBe(2UL); + }); + } + + private static void RunAccessTimeScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("at", null, "1"u8.ToArray(), 0); + fs.StoreMsg("at", null, "2"u8.ToArray(), 0); + + fs.LoadMsg(1, null).ShouldNotBeNull(); + fs.LoadMsg(2, null).ShouldNotBeNull(); + + fs.State().LastTime.ShouldBeGreaterThanOrEqualTo(fs.State().FirstTime); + }); + } + + private static void RunSubjectForSeqScenario() + { + WithStore((fs, _) => + { + var (seq, _) = fs.StoreMsg("subject.seq", null, "m"u8.ToArray(), 0); + var (subject, err) = fs.SubjectForSeq(seq); + err.ShouldBeNull(); + subject.ShouldBe("subject.seq"); + }); + } + + private static void RunSubjectAccessScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 25; i++) + fs.StoreMsg($"sa.{i % 5}", null, "x"u8.ToArray(), 0); + + for (ulong seq = 1; seq <= 5; seq++) + { + var (subject, err) = fs.SubjectForSeq(seq); + err.ShouldBeNull(); + subject.ShouldStartWith("sa."); + } + }); + } + + private static void RunEraseTombstoneScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("ts", null, "1"u8.ToArray(), 0); + fs.StoreMsg("ts", null, "2"u8.ToArray(), 0); + fs.StoreMsg("ts", null, "3"u8.ToArray(), 0); + + fs.EraseMsg(1).Removed.ShouldBeTrue(); + fs.EraseMsg(2).Removed.ShouldBeTrue(); + + var state = fs.State(); + state.Msgs.ShouldBe(1UL); + state.FirstSeq.ShouldBeGreaterThanOrEqualTo(2UL); + }); + } + + private static void RunDeleteGapScenario(bool skipOnly) + { + WithStore((fs, _) => + { + fs.StoreMsg("gap", null, "1"u8.ToArray(), 0); + var (skipSeq, skipErr) = fs.SkipMsg(0); + skipErr.ShouldBeNull(); + skipSeq.ShouldBeGreaterThan(0UL); + + fs.StoreMsg("gap", null, "2"u8.ToArray(), 0); + if (!skipOnly) + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + + var state = fs.State(); + state.LastSeq.ShouldBeGreaterThanOrEqualTo(state.FirstSeq); + state.Msgs.ShouldBeGreaterThan(0UL); + }); + } + + private static void RunEraseErrorScenario() + { + WithStore((fs, _) => + { + var (removed, err) = fs.EraseMsg(999); + if (err == null) + removed.ShouldBeFalse(); + else + err.Message.ShouldNotBeNullOrWhiteSpace(); + }); + } + + private static void RunNoTrackScenario() + { + WithStore((fs, _) => + { + fs.NoTrackSubjects().ShouldBeTrue(); + fs.RebuildState(null); + fs.StoreMsg("nt.a", null, "1"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL); + fs.FilteredState(1, "nt.a").Msgs.ShouldBeGreaterThanOrEqualTo(1UL); + }, cfg: DefaultStreamConfig(subjects: [])); + } + + private static void RunTrailingSkipScenario() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var cfg = DefaultStreamConfig(); + var fs1 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs1.StoreMsg("trail", null, "1"u8.ToArray(), 0); + var (skip, err) = fs1.SkipMsg(0); + err.ShouldBeNull(); + skip.ShouldBeGreaterThan(0UL); + fs1.Stop(); + + var fs2 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + var (seq, _) = fs2.StoreMsg("trail", null, "2"u8.ToArray(), 0); + seq.ShouldBeGreaterThan(0UL); + fs2.State().LastSeq.ShouldBeGreaterThanOrEqualTo(seq); + fs2.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs index 1e4d0ed..a2d23df 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs @@ -6,7 +6,7 @@ using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; -public sealed class JetStreamFileStoreTests +public sealed partial class JetStreamFileStoreTests { [Fact] // T:351 public void FileStoreBasics_ShouldSucceed() diff --git a/porting.db b/porting.db index dc0df6d..02375e6 100644 Binary files a/porting.db and b/porting.db differ