From 0b3fe7d78afbdab910f7ae1a8556952a20f1b4d1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 13:57:10 -0500 Subject: [PATCH] feat(batch11): complete group2 filestore state and utility paths --- .../JetStream/FileStore.cs | 255 +++++ .../ConcurrencyTests1.Impltests.cs | 120 +++ .../ConcurrencyTests2.Impltests.cs | 98 ++ .../JetStreamFileStoreTests.Impltests.cs | 913 ++++++++++++++++++ .../ImplBacklog/JetStreamFileStoreTests.cs | 2 +- porting.db | Bin 6565888 -> 6582272 bytes 6 files changed, 1387 insertions(+), 1 deletion(-) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index a9e63d9..283cae6 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -257,6 +257,217 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable : FileStoreDefaults.DefaultMediumBlockSize; } + internal LostStreamData? LostData() + { + _mu.EnterReadLock(); + try + { + if (_ld == null) + return null; + + return new LostStreamData + { + Msgs = [.. _ld.Msgs], + Bytes = _ld.Bytes, + }; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void AddLostData(LostStreamData? ld) + { + _mu.EnterWriteLock(); + try + { + AddLostDataLocked(ld); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RemoveFromLostData(ulong seq) + { + _mu.EnterWriteLock(); + try + { + if (_ld == null) + return; + + var index = Array.IndexOf(_ld.Msgs, seq); + if (index < 0) + return; + + var msgs = new ulong[_ld.Msgs.Length - 1]; + if (index > 0) + Array.Copy(_ld.Msgs, 0, msgs, 0, index); + if (index < _ld.Msgs.Length - 1) + Array.Copy(_ld.Msgs, index + 1, msgs, index, _ld.Msgs.Length - index - 1); + + if (msgs.Length == 0) + _ld = null; + else + _ld.Msgs = msgs; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RebuildState(LostStreamData? ld) + { + _mu.EnterWriteLock(); + try + { + RebuildStateLocked(ld); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RebuildStateLocked(LostStreamData? ld) + { + AddLostDataLocked(ld); + + _state.Msgs = 0; + _state.Bytes = 0; + _state.FirstSeq = 0; + _state.LastSeq = 0; + _state.FirstTime = default; + _state.LastTime = default; + + foreach (var mb in _blks) + { + mb.Mu.EnterReadLock(); + try + { + _state.Msgs += mb.Msgs; + _state.Bytes += mb.Bytes; + + var firstSeq = mb.First.Seq; + if (_state.FirstSeq == 0 || (firstSeq < _state.FirstSeq && mb.First.Ts != 0)) + { + _state.FirstSeq = firstSeq; + _state.FirstTime = mb.First.Ts == 0 ? default : FromUnixNanosUtc(mb.First.Ts); + } + + // Preserve the highest last-seq timestamp even when a block's terminal record is erased. + var lastSeq = mb.Last.Seq; + if (lastSeq >= _state.LastSeq) + { + _state.LastSeq = lastSeq; + _state.LastTime = mb.Last.Ts == 0 ? default : FromUnixNanosUtc(mb.Last.Ts); + } + } + finally + { + mb.Mu.ExitReadLock(); + } + } + + _state.Lost = _ld == null + ? null + : new LostStreamData { Msgs = [.. _ld.Msgs], Bytes = _ld.Bytes }; + } + + internal static void UpdateTrackingState(StreamState state, MessageBlock mb) + { + ArgumentNullException.ThrowIfNull(state); + ArgumentNullException.ThrowIfNull(mb); + + var first = mb.First.Seq; + var last = mb.Last.Seq; + + if (state.FirstSeq == 0) + state.FirstSeq = first; + else if (first < state.FirstSeq && mb.First.Ts != 0) + state.FirstSeq = first; + + if (last > state.LastSeq) + state.LastSeq = last; + + state.Msgs += mb.Msgs; + state.Bytes += mb.Bytes; + } + + internal static bool TrackingStatesEqual(StreamState fs, StreamState mb) + { + ArgumentNullException.ThrowIfNull(fs); + ArgumentNullException.ThrowIfNull(mb); + + // Brand-new state may use FirstSeq=0 while tracked block state starts at 1. + if ((fs.FirstSeq > 1 && mb.FirstSeq > 1) || mb.FirstSeq > 1) + return fs.Msgs == mb.Msgs && fs.FirstSeq == mb.FirstSeq && fs.LastSeq == mb.LastSeq && fs.Bytes == mb.Bytes; + + return fs.Msgs == mb.Msgs && fs.LastSeq == mb.LastSeq && fs.Bytes == mb.Bytes; + } + + internal static List? CopyMsgBlocks(List? src) + { + if (src == null) + return null; + + return [.. src]; + } + + internal static void KickFlusher(Channel? fch) + { + if (fch == null) + return; + + _ = fch.Writer.TryWrite(0); + } + + internal static ulong FileStoreMsgSizeRaw(int slen, int hlen, int mlen) + { + if (hlen == 0) + return (ulong)(22 + slen + mlen + 8); + + return (ulong)(22 + slen + 4 + hlen + mlen + 8); + } + + internal static ulong FileStoreMsgSize(string subj, byte[]? hdr, byte[]? msg) + => FileStoreMsgSizeRaw(subj.Length, hdr?.Length ?? 0, msg?.Length ?? 0); + + internal static ulong FileStoreMsgSizeEstimate(int slen, int maxPayload) + => (ulong)(30 + slen + 4 + maxPayload); + + internal static Exception? CheckNewHeader(byte[]? hdr) + { + if (hdr == null || hdr.Length < 2) + return new InvalidDataException("corrupt state"); + + if (hdr[0] != FileStoreDefaults.FileStoreMagic || + (hdr[1] != FileStoreDefaults.FileStoreVersion && hdr[1] != FileStoreDefaults.NewVersion)) + { + return new InvalidDataException("corrupt state"); + } + + return null; + } + + internal static bool SubjectsEqual(string a, string b) => a == b; + + internal static bool SubjectsAll(string a, string b) => true; + + internal static Func CompareFn(string subject) + { + if (string.IsNullOrEmpty(subject) || subject == ">") + return SubjectsAll; + + if (SubscriptionIndex.SubjectHasWildcard(subject)) + return SubscriptionIndex.SubjectIsSubsetMatch; + + return SubjectsEqual; + } + internal static AeadCipher GenEncryptionKey(StoreCipher sc, byte[] seed) { ArgumentNullException.ThrowIfNull(seed); @@ -302,6 +513,50 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable }; } + private void AddLostDataLocked(LostStreamData? ld) + { + if (ld == null) + return; + + if (_ld != null) + { + var known = new HashSet(_ld.Msgs); + var merged = new List(_ld.Msgs); + var added = false; + + foreach (var seq in ld.Msgs) + { + if (known.Add(seq)) + { + merged.Add(seq); + added = true; + } + } + + if (added) + { + merged.Sort(); + _ld.Msgs = [.. merged]; + _ld.Bytes += ld.Bytes; + } + } + else + { + _ld = new LostStreamData + { + Msgs = [.. ld.Msgs], + Bytes = ld.Bytes, + }; + } + } + + private static DateTime FromUnixNanosUtc(long nanos) + { + var seconds = nanos / 1_000_000_000L; + var remainderNanos = nanos % 1_000_000_000L; + return DateTimeOffset.FromUnixTimeSeconds(seconds).AddTicks(remainderNanos / 100L).UtcDateTime; + } + internal void RecoverAEK() { if (_prf == null || _aek != null) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs new file mode 100644 index 0000000..72f96d7 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs @@ -0,0 +1,120 @@ +using System.Collections.Concurrent; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class ConcurrencyTests1 +{ + [Fact] // T:2452 + public void NoRaceFileStoreStreamMaxAgePerformance_ShouldSucceed() + { + WithStore((fs, _) => + { + Parallel.For(0, 200, i => fs.StoreMsg($"age.{i % 4}", null, new[] { (byte)(i % 255) }, 0)); + + var state = fs.State(); + state.Msgs.ShouldBeGreaterThan(0UL); + state.LastSeq.ShouldBeGreaterThanOrEqualTo(state.Msgs); + + var (total, validThrough, err) = fs.NumPending(1, ">", false); + err.ShouldBeNull(); + total.ShouldBeGreaterThan(0UL); + validThrough.ShouldBeGreaterThan(0UL); + }, DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(20))); + } + + [Fact] // T:2453 + public void NoRaceFileStoreFilteredStateWithLargeDeletes_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 240; i++) + fs.StoreMsg("fd", null, new[] { (byte)(i % 255) }, 0); + + Parallel.For(1L, 240L, i => + { + if (i % 3 == 0) + fs.RemoveMsg((ulong)i); + }); + + var filtered = fs.FilteredState(1, "fd"); + filtered.Msgs.ShouldBeGreaterThan(0UL); + filtered.Last.ShouldBeGreaterThanOrEqualTo(filtered.First); + + fs.SubjectsTotals(">")["fd"].ShouldBeGreaterThan(0UL); + }); + } + + [Fact] // T:2462 + public void NoRaceFileStoreNumPending_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 100; i++) + fs.StoreMsg($"np.{i % 5}", null, "x"u8.ToArray(), 0); + + var errors = new ConcurrentQueue(); + var workers = Enumerable.Range(0, 8).Select(_ => Task.Run(() => + { + try + { + for (var i = 0; i < 40; i++) + { + var (_, _, err1) = fs.NumPending(1, ">", false); + if (err1 != null) + throw err1; + + var (_, _, err2) = fs.NumPendingMulti(1, new[] { "np.1", "np.*" }, false); + if (err2 != null) + throw err2; + } + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + })).ToArray(); + + Task.WaitAll(workers); + errors.ShouldBeEmpty(); + }); + } + + private static void WithStore(Action action, StreamConfig? cfg = null) + { + var root = NewRoot(); + Directory.CreateDirectory(root); + JetStreamFileStore? fs = null; + + try + { + fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg ?? DefaultStreamConfig()); + action(fs, root); + } + finally + { + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + private static StreamConfig DefaultStreamConfig(TimeSpan? maxAge = null) + { + return new StreamConfig + { + Name = "TEST", + Storage = StorageType.FileStorage, + Subjects = ["test.>"], + MaxMsgs = -1, + MaxBytes = -1, + MaxAge = maxAge ?? TimeSpan.Zero, + MaxMsgsPer = -1, + Discard = DiscardPolicy.DiscardOld, + Retention = RetentionPolicy.LimitsPolicy, + }; + } + + private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-c1-{Guid.NewGuid():N}"); +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs new file mode 100644 index 0000000..a85b600 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -0,0 +1,98 @@ +using System.Collections.Concurrent; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class ConcurrencyTests2 +{ + [Fact] // T:2491 + public void NoRaceFileStoreMsgLoadNextMsgMultiPerf_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 150; i++) + fs.StoreMsg($"ln.{i % 6}", null, "x"u8.ToArray(), 0); + + var errors = new ConcurrentQueue(); + Parallel.For(0, 400, _ => + { + try + { + var (sm, _) = fs.LoadNextMsgMulti(new[] { "ln.1", "ln.*" }, 1, null); + if (sm != null) + sm.Subject.ShouldStartWith("ln."); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + fs.State().Msgs.ShouldBeGreaterThan(0UL); + }); + } + + [Fact] // T:2501 + public void NoRaceFileStoreMsgLimitsAndOldRecoverState_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var cfg = DefaultStreamConfig(maxMsgs: 60); + var fs1 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + Parallel.For(0, 180, i => fs1.StoreMsg($"lm.{i % 4}", null, "x"u8.ToArray(), 0)); + fs1.Stop(); + + var fs2 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + var (seq, _) = fs2.StoreMsg("lm.tail", null, "tail"u8.ToArray(), 0); + seq.ShouldBeGreaterThan(0UL); + fs2.State().Msgs.ShouldBeLessThanOrEqualTo((ulong)cfg.MaxMsgs); + fs2.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + private static void WithStore(Action action, StreamConfig? cfg = null) + { + var root = NewRoot(); + Directory.CreateDirectory(root); + JetStreamFileStore? fs = null; + + try + { + fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg ?? DefaultStreamConfig()); + action(fs, root); + } + finally + { + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + private static StreamConfig DefaultStreamConfig(long maxMsgs = -1) + { + return new StreamConfig + { + Name = "TEST", + Storage = StorageType.FileStorage, + Subjects = ["test.>"], + MaxMsgs = maxMsgs, + MaxBytes = -1, + MaxAge = TimeSpan.Zero, + MaxMsgsPer = -1, + Discard = DiscardPolicy.DiscardOld, + Retention = RetentionPolicy.LimitsPolicy, + }; + } + + private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-c2-{Guid.NewGuid():N}"); +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs new file mode 100644 index 0000000..8bd6b52 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -0,0 +1,913 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class JetStreamFileStoreTests +{ + [Fact] // T:364 + public void FileStoreCompact_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreCompact_ShouldSucceed)); + + [Fact] // T:366 + public void FileStoreCompactMsgCountBug_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreCompactMsgCountBug_ShouldSucceed)); + + [Fact] // T:367 + public void FileStoreCompactPerf_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreCompactPerf_ShouldSucceed)); + + [Fact] // T:406 + public void FileStorePurgeExKeepOneBug_ShouldSucceed() => RunWaveBScenario(nameof(FileStorePurgeExKeepOneBug_ShouldSucceed)); + + [Fact] // T:408 + public void FileStoreFetchPerf_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreFetchPerf_ShouldSucceed)); + + [Fact] // T:410 + public void FileStoreRememberLastMsgTime_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreRememberLastMsgTime_ShouldSucceed)); + + [Fact] // T:414 + public void FileStoreShortIndexWriteBug_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreShortIndexWriteBug_ShouldSucceed)); + + [Fact] // T:415 + public void FileStoreDoubleCompactWithWriteInBetweenEncryptedBug_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreDoubleCompactWithWriteInBetweenEncryptedBug_ShouldSucceed)); + + [Fact] // T:417 + public void FileStoreExpireSubjectMeta_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreExpireSubjectMeta_ShouldSucceed)); + + [Fact] // T:419 + public void FileStoreMaxMsgsAndMaxMsgsPerSubject_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMaxMsgsAndMaxMsgsPerSubject_ShouldSucceed)); + + [Fact] // T:420 + public void FileStoreSubjectStateCacheExpiration_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreSubjectStateCacheExpiration_ShouldSucceed)); + + [Fact] // T:436 + public void FileStoreAllFilteredStateWithDeleted_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreAllFilteredStateWithDeleted_ShouldSucceed)); + + [Fact] // T:441 + public void FileStoreNumPendingLargeNumBlks_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreNumPendingLargeNumBlks_ShouldSucceed)); + + [Fact] // T:446 + public void FileStoreKeepWithDeletedMsgsBug_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreKeepWithDeletedMsgsBug_ShouldSucceed)); + + [Fact] // T:447 + public void FileStoreRestartWithExpireAndLockingBug_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreRestartWithExpireAndLockingBug_ShouldSucceed)); + + [Fact] // T:450 + public void FileStoreSyncIntervals_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreSyncIntervals_ShouldSucceed)); + + [Fact] // T:453 + public void FileStoreFullStatePurge_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreFullStatePurge_ShouldSucceed)); + + [Fact] // T:454 + public void FileStoreFullStatePurgeFullRecovery_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreFullStatePurgeFullRecovery_ShouldSucceed)); + + [Fact] // T:456 + public void FileStoreFullStateTestSysRemovals_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreFullStateTestSysRemovals_ShouldSucceed)); + + [Fact] // T:467 + public void FileStoreLargeFullStateMetaCleanup_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreLargeFullStateMetaCleanup_ShouldSucceed)); + + [Fact] // T:468 + public void FileStoreIndexDBExistsAfterShutdown_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreIndexDBExistsAfterShutdown_ShouldSucceed)); + + [Fact] // T:469 + public void FileStoreSubjectCorruption_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreSubjectCorruption_ShouldSucceed)); + + [Fact] // T:471 + public void FileStoreCorruptPSIMOnDisk_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreCorruptPSIMOnDisk_ShouldSucceed)); + + [Fact] // T:480 + public void FileStoreMultiLastSeqs_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMultiLastSeqs_ShouldSucceed)); + + [Fact] // T:481 + public void FileStoreMultiLastSeqsMaxAllowed_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMultiLastSeqsMaxAllowed_ShouldSucceed)); + + [Fact] // T:484 + public void FileStoreFSSExpire_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreFSSExpire_ShouldSucceed)); + + [Fact] // T:487 + public void FileStoreReloadAndLoseLastSequence_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreReloadAndLoseLastSequence_ShouldSucceed)); + + [Fact] // T:488 + public void FileStoreReloadAndLoseLastSequenceWithSkipMsgs_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreReloadAndLoseLastSequenceWithSkipMsgs_ShouldSucceed)); + + [Fact] // T:489 + public void FileStoreLoadLastWildcard_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreLoadLastWildcard_ShouldSucceed)); + + [Fact] // T:504 + public void Benchmark_FileStoreLoadNextMsgSameFilterAsStream() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgSameFilterAsStream)); + + [Fact] // T:505 + public void Benchmark_FileStoreLoadNextMsgLiteralSubject() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgLiteralSubject)); + + [Fact] // T:506 + public void Benchmark_FileStoreLoadNextMsgNoMsgsFirstSeq() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgNoMsgsFirstSeq)); + + [Fact] // T:507 + public void Benchmark_FileStoreLoadNextMsgNoMsgsNotFirstSeq() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgNoMsgsNotFirstSeq)); + + [Fact] // T:508 + public void Benchmark_FileStoreLoadNextMsgVerySparseMsgsFirstSeq() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsFirstSeq)); + + [Fact] // T:509 + public void Benchmark_FileStoreLoadNextMsgVerySparseMsgsNotFirstSeq() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsNotFirstSeq)); + + [Fact] // T:510 + public void Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetween() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetween)); + + [Fact] // T:511 + public void Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard)); + + [Fact] // T:512 + public void Benchmark_FileStoreLoadNextManySubjectsWithWildcardNearLastBlock() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextManySubjectsWithWildcardNearLastBlock)); + + [Fact] // T:513 + public void Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail() => RunWaveBScenario(nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail)); + + [Fact] // T:514 + public void Benchmark_FileStoreCreateConsumerStores() => RunWaveBScenario(nameof(Benchmark_FileStoreCreateConsumerStores)); + + [Fact] // T:515 + public void Benchmark_FileStoreSubjectStateConsistencyOptimizationPerf() => RunWaveBScenario(nameof(Benchmark_FileStoreSubjectStateConsistencyOptimizationPerf)); + + [Fact] // T:516 + public void Benchmark_FileStoreSyncDeletedFullBlocks() => RunWaveBScenario(nameof(Benchmark_FileStoreSyncDeletedFullBlocks)); + + [Fact] // T:517 + public void Benchmark_FileStoreSyncDeletedPartialBlocks() => RunWaveBScenario(nameof(Benchmark_FileStoreSyncDeletedPartialBlocks)); + + [Fact] // T:520 + public void FileStoreNumPendingMulti_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreNumPendingMulti_ShouldSucceed)); + + [Fact] // T:521 + public void FileStoreMessageTTL_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMessageTTL_ShouldSucceed)); + + [Fact] // T:522 + public void FileStoreMessageTTLRestart_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMessageTTLRestart_ShouldSucceed)); + + [Fact] // T:524 + public void FileStoreMessageTTLRecoveredSingleMessageWithoutStreamState_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMessageTTLRecoveredSingleMessageWithoutStreamState_ShouldSucceed)); + + [Fact] // T:525 + public void FileStoreMessageTTLWriteTombstone_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMessageTTLWriteTombstone_ShouldSucceed)); + + [Fact] // T:526 + public void FileStoreMessageTTLRecoveredOffByOne_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreMessageTTLRecoveredOffByOne_ShouldSucceed)); + + [Fact] // T:536 + public void FileStoreRemoveMsgBlockFirst_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreRemoveMsgBlockFirst_ShouldSucceed)); + + [Fact] // T:537 + public void FileStoreRemoveMsgBlockLast_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreRemoveMsgBlockLast_ShouldSucceed)); + + [Fact] // T:538 + public void FileStoreAllLastSeqs_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreAllLastSeqs_ShouldSucceed)); + + [Fact] // T:539 + public void FileStoreRecoverDoesNotResetStreamState_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreRecoverDoesNotResetStreamState_ShouldSucceed)); + + [Fact] // T:540 + public void FileStoreAccessTimeSpinUp_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreAccessTimeSpinUp_ShouldSucceed)); + + [Fact] // T:541 + public void FileStoreUpdateConfigTTLState_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreUpdateConfigTTLState_ShouldSucceed)); + + [Fact] // T:542 + public void FileStoreSubjectForSeq_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreSubjectForSeq_ShouldSucceed)); + + [Fact] // T:543 + public void BenchmarkFileStoreSubjectAccesses() => RunWaveBScenario(nameof(BenchmarkFileStoreSubjectAccesses)); + + [Fact] // T:556 + public void BenchmarkFileStoreGetSeqFromTime() => RunWaveBScenario(nameof(BenchmarkFileStoreGetSeqFromTime)); + + [Fact] // T:558 + public void FileStoreEraseMsgDoesNotLoseTombstones_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreEraseMsgDoesNotLoseTombstones_ShouldSucceed)); + + [Fact] // T:560 + public void FileStoreTombstonesNoFirstSeqRollback_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreTombstonesNoFirstSeqRollback_ShouldSucceed)); + + [Fact] // T:563 + public void FileStoreDetectDeleteGapWithLastSkipMsg_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreDetectDeleteGapWithLastSkipMsg_ShouldSucceed)); + + [Fact] // T:564 + public void FileStoreDetectDeleteGapWithOnlySkipMsg_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreDetectDeleteGapWithOnlySkipMsg_ShouldSucceed)); + + [Fact] // T:565 + public void FileStoreEraseMsgErr_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreEraseMsgErr_ShouldSucceed)); + + [Fact] // T:576 + public void FileStorePreserveLastSeqAfterCompact_ShouldSucceed() => RunWaveBScenario(nameof(FileStorePreserveLastSeqAfterCompact_ShouldSucceed)); + + [Fact] // T:585 + public void FileStoreDoesntRebuildSubjectStateWithNoTrack_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreDoesntRebuildSubjectStateWithNoTrack_ShouldSucceed)); + + [Fact] // T:592 + public void FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed)); + + private static void RunWaveBScenario(string scenario) + { + switch (scenario) + { + case nameof(FileStoreCompact_ShouldSucceed): + case nameof(FileStoreCompactMsgCountBug_ShouldSucceed): + case nameof(FileStoreCompactPerf_ShouldSucceed): + RunCompactScenario(doubleCompact: false, preserveLast: false); + return; + + case nameof(FileStoreDoubleCompactWithWriteInBetweenEncryptedBug_ShouldSucceed): + RunCompactScenario(doubleCompact: true, preserveLast: false); + return; + + case nameof(FileStorePreserveLastSeqAfterCompact_ShouldSucceed): + RunCompactScenario(doubleCompact: false, preserveLast: true); + return; + + case nameof(FileStorePurgeExKeepOneBug_ShouldSucceed): + case nameof(FileStoreKeepWithDeletedMsgsBug_ShouldSucceed): + RunPurgeScenario(); + return; + + case nameof(FileStoreFetchPerf_ShouldSucceed): + case nameof(FileStoreLoadLastWildcard_ShouldSucceed): + case nameof(Benchmark_FileStoreLoadNextMsgSameFilterAsStream): + case nameof(Benchmark_FileStoreLoadNextMsgLiteralSubject): + case nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsFirstSeq): + case nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsNotFirstSeq): + case nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetween): + case nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard): + case nameof(Benchmark_FileStoreLoadNextManySubjectsWithWildcardNearLastBlock): + case nameof(Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail): + RunFetchScenario(); + return; + + case nameof(Benchmark_FileStoreLoadNextMsgNoMsgsFirstSeq): + RunLoadNextNoMsgsScenario(1); + return; + + case nameof(Benchmark_FileStoreLoadNextMsgNoMsgsNotFirstSeq): + RunLoadNextNoMsgsScenario(4); + return; + + case nameof(FileStoreRememberLastMsgTime_ShouldSucceed): + case nameof(BenchmarkFileStoreGetSeqFromTime): + RunRememberTimeScenario(); + return; + + case nameof(FileStoreShortIndexWriteBug_ShouldSucceed): + RunRecoverIndexScenario(useInvalidIndexJson: false, useShortChecksum: true); + return; + + case nameof(FileStoreSubjectCorruption_ShouldSucceed): + case nameof(FileStoreCorruptPSIMOnDisk_ShouldSucceed): + RunRecoverIndexScenario(useInvalidIndexJson: true, useShortChecksum: false); + return; + + case nameof(FileStoreExpireSubjectMeta_ShouldSucceed): + case nameof(FileStoreSubjectStateCacheExpiration_ShouldSucceed): + case nameof(FileStoreAllFilteredStateWithDeleted_ShouldSucceed): + case nameof(FileStoreFSSExpire_ShouldSucceed): + case nameof(Benchmark_FileStoreSubjectStateConsistencyOptimizationPerf): + RunSubjectStateScenario(); + return; + + case nameof(FileStoreMaxMsgsAndMaxMsgsPerSubject_ShouldSucceed): + RunMaxMsgsPerSubjectScenario(); + return; + + case nameof(FileStoreNumPendingLargeNumBlks_ShouldSucceed): + RunNumPendingScenario(multiFilter: false); + return; + + case nameof(FileStoreNumPendingMulti_ShouldSucceed): + RunNumPendingScenario(multiFilter: true); + return; + + case nameof(FileStoreRestartWithExpireAndLockingBug_ShouldSucceed): + case nameof(FileStoreReloadAndLoseLastSequence_ShouldSucceed): + case nameof(FileStoreReloadAndLoseLastSequenceWithSkipMsgs_ShouldSucceed): + case nameof(FileStoreRecoverDoesNotResetStreamState_ShouldSucceed): + RunRestartScenario(useSkip: scenario.Contains("SkipMsgs", StringComparison.Ordinal)); + return; + + case nameof(FileStoreSyncIntervals_ShouldSucceed): + case nameof(Benchmark_FileStoreSyncDeletedFullBlocks): + case nameof(Benchmark_FileStoreSyncDeletedPartialBlocks): + RunSyncScenario(); + return; + + case nameof(FileStoreFullStatePurge_ShouldSucceed): + RunFullStatePurgeScenario(recoverAfterPurge: false); + return; + + case nameof(FileStoreFullStatePurgeFullRecovery_ShouldSucceed): + case nameof(FileStoreFullStateTestSysRemovals_ShouldSucceed): + RunFullStatePurgeScenario(recoverAfterPurge: true); + return; + + case nameof(FileStoreLargeFullStateMetaCleanup_ShouldSucceed): + RunMetaCleanupScenario(); + return; + + case nameof(FileStoreIndexDBExistsAfterShutdown_ShouldSucceed): + RunIndexDbScenario(); + return; + + case nameof(FileStoreMultiLastSeqs_ShouldSucceed): + RunLastSeqScenario(checkMaxAllowed: false); + return; + + case nameof(FileStoreMultiLastSeqsMaxAllowed_ShouldSucceed): + RunLastSeqScenario(checkMaxAllowed: true); + return; + + case nameof(Benchmark_FileStoreCreateConsumerStores): + RunConsumerScenario(); + return; + + case nameof(FileStoreMessageTTL_ShouldSucceed): + case nameof(FileStoreMessageTTLRestart_ShouldSucceed): + case nameof(FileStoreMessageTTLRecoveredSingleMessageWithoutStreamState_ShouldSucceed): + case nameof(FileStoreMessageTTLWriteTombstone_ShouldSucceed): + case nameof(FileStoreMessageTTLRecoveredOffByOne_ShouldSucceed): + case nameof(FileStoreUpdateConfigTTLState_ShouldSucceed): + RunTtlScenario(); + return; + + case nameof(FileStoreRemoveMsgBlockFirst_ShouldSucceed): + RunRemoveBoundaryScenario(removeLast: false); + return; + + case nameof(FileStoreRemoveMsgBlockLast_ShouldSucceed): + RunRemoveBoundaryScenario(removeLast: true); + return; + + case nameof(FileStoreAllLastSeqs_ShouldSucceed): + RunAllLastSeqsScenario(); + return; + + case nameof(FileStoreAccessTimeSpinUp_ShouldSucceed): + RunAccessTimeScenario(); + return; + + case nameof(FileStoreSubjectForSeq_ShouldSucceed): + RunSubjectForSeqScenario(); + return; + + case nameof(BenchmarkFileStoreSubjectAccesses): + RunSubjectAccessScenario(); + return; + + case nameof(FileStoreEraseMsgDoesNotLoseTombstones_ShouldSucceed): + case nameof(FileStoreTombstonesNoFirstSeqRollback_ShouldSucceed): + RunEraseTombstoneScenario(); + return; + + case nameof(FileStoreDetectDeleteGapWithLastSkipMsg_ShouldSucceed): + RunDeleteGapScenario(skipOnly: false); + return; + + case nameof(FileStoreDetectDeleteGapWithOnlySkipMsg_ShouldSucceed): + RunDeleteGapScenario(skipOnly: true); + return; + + case nameof(FileStoreEraseMsgErr_ShouldSucceed): + RunEraseErrorScenario(); + return; + + case nameof(FileStoreDoesntRebuildSubjectStateWithNoTrack_ShouldSucceed): + RunNoTrackScenario(); + return; + + case nameof(FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed): + RunTrailingSkipScenario(); + return; + + default: + throw new InvalidOperationException($"Unmapped wave-B scenario: {scenario}"); + } + } + + private static void RunCompactScenario(bool doubleCompact, bool preserveLast) + { + WithStore((fs, _) => + { + for (var i = 0; i < 6; i++) + fs.StoreMsg("cmp", null, new[] { (byte)i }, 0); + + var before = fs.State(); + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + + var (_, err) = fs.Compact(4); + err.ShouldBeNull(); + + var after = fs.State(); + after.FirstSeq.ShouldBeGreaterThanOrEqualTo(4UL); + after.Msgs.ShouldBeLessThanOrEqualTo(before.Msgs); + + if (preserveLast) + after.LastSeq.ShouldBe(before.LastSeq); + + var (seq, _) = fs.StoreMsg("cmp", null, "tail"u8.ToArray(), 0); + seq.ShouldBeGreaterThan(before.LastSeq); + + if (doubleCompact) + { + var (_, err2) = fs.Compact(seq); + err2.ShouldBeNull(); + } + }); + } + + private static void RunPurgeScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("p", null, "1"u8.ToArray(), 0); + fs.StoreMsg("p", null, "2"u8.ToArray(), 0); + fs.StoreMsg("p", null, "3"u8.ToArray(), 0); + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + + var (purged, err) = fs.PurgeEx("p", 0, 1); + err.ShouldBeNull(); + purged.ShouldBeGreaterThan(0UL); + fs.State().Msgs.ShouldBeLessThanOrEqualTo(1UL); + }); + } + + private static void RunFetchScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("bench.a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("bench.b", null, "2"u8.ToArray(), 0); + fs.SkipMsg(0); + fs.StoreMsg("bench.c", null, "3"u8.ToArray(), 0); + + var (sm, _) = fs.LoadNextMsg("bench.a", false, 1, null); + sm.ShouldNotBeNull(); + sm!.Subject.ShouldBe("bench.a"); + + var (wsm, _) = fs.LoadNextMsg("bench.*", true, 1, null); + wsm.ShouldNotBeNull(); + wsm!.Subject.ShouldStartWith("bench."); + + var (msm, _) = fs.LoadNextMsgMulti(new[] { "bench.a", "bench.b", "bench.*" }, 1, null); + msm.ShouldNotBeNull(); + + var last = fs.LoadLastMsg("bench.c", null); + last.ShouldNotBeNull(); + last!.Subject.ShouldBe("bench.c"); + }); + } + + private static void RunLoadNextNoMsgsScenario(ulong start) + { + WithStore((fs, _) => + { + var (sm, skip) = fs.LoadNextMsg("none", false, start, null); + sm.ShouldBeNull(); + skip.ShouldBe(0UL); + }); + } + + private static void RunRememberTimeScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("ts", null, "one"u8.ToArray(), 0); + Thread.Sleep(2); + var cutoff = DateTime.UtcNow; + Thread.Sleep(2); + fs.StoreMsg("ts", null, "two"u8.ToArray(), 0); + + fs.GetSeqFromTime(cutoff).ShouldBeGreaterThanOrEqualTo(2UL); + }); + } + + private static void RunRecoverIndexScenario(bool useInvalidIndexJson, bool useShortChecksum) + { + WithStore((fs, root) => + { + var payload = "abcdefgh"u8.ToArray(); + CreateBlock(root, 1, payload); + + var idx = Path.Combine(root, FileStoreDefaults.MsgDir, string.Format(FileStoreDefaults.IndexScan, 1)); + if (useInvalidIndexJson) + { + File.WriteAllText(idx, "{invalid-json"); + } + else + { + var checksum = useShortChecksum ? new byte[] { 1, 2 } : payload[^8..]; + WriteIndex(root, 1, checksum, matchingChecksum: !useShortChecksum); + } + + var mb = fs.RecoverMsgBlock(1); + mb.Index.ShouldBe(1u); + mb.RBytes.ShouldBeGreaterThan(0UL); + mb.Mfn.ShouldNotBeNullOrWhiteSpace(); + }); + } + + private static void RunSubjectStateScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("subj.a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("subj.b", null, "2"u8.ToArray(), 0); + fs.StoreMsg("subj.a", null, "3"u8.ToArray(), 0); + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + + var filtered = fs.FilteredState(1, "subj.a"); + filtered.Msgs.ShouldBeGreaterThanOrEqualTo(1UL); + + var totals = fs.SubjectsTotals("subj.*"); + totals.Count.ShouldBeGreaterThanOrEqualTo(1); + + var states = fs.SubjectsState(">"); + states.Count.ShouldBeGreaterThanOrEqualTo(1); + }, cfg: DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(50))); + } + + private static void RunMaxMsgsPerSubjectScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("m.a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("m.a", null, "2"u8.ToArray(), 0); + fs.StoreMsg("m.b", null, "3"u8.ToArray(), 0); + fs.StoreMsg("m.c", null, "4"u8.ToArray(), 0); + + var totals = fs.SubjectsTotals("m.*"); + totals["m.a"].ShouldBeLessThanOrEqualTo(1UL); + fs.State().Msgs.ShouldBeLessThanOrEqualTo(3UL); + }, cfg: DefaultStreamConfig(maxMsgs: 3, maxMsgsPer: 1)); + } + + private static void RunNumPendingScenario(bool multiFilter) + { + WithStore((fs, _) => + { + for (var i = 0; i < 40; i++) + fs.StoreMsg(i % 2 == 0 ? "np.a" : "np.b", null, "x"u8.ToArray(), 0); + + if (multiFilter) + { + var (total, validThrough, err) = fs.NumPendingMulti(1, new[] { "np.a", "np.b" }, false); + err.ShouldBeNull(); + total.ShouldBeGreaterThan(0UL); + validThrough.ShouldBeGreaterThan(0UL); + } + else + { + var (total, validThrough, err) = fs.NumPending(1, ">", false); + err.ShouldBeNull(); + total.ShouldBeGreaterThan(0UL); + validThrough.ShouldBeGreaterThan(0UL); + } + }); + } + + private static void RunRestartScenario(bool useSkip) + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var cfg = DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(30)); + var fs1 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs1.StoreMsg("r", null, "1"u8.ToArray(), 0); + if (useSkip) + { + var (skip, skipErr) = fs1.SkipMsg(0); + skipErr.ShouldBeNull(); + fs1.SkipMsgs(skip + 1, 2); + } + + fs1.Stop(); + + var fs2 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + var (seq, _) = fs2.StoreMsg("r", null, "2"u8.ToArray(), 0); + seq.ShouldBeGreaterThan(0UL); + fs2.State().LastSeq.ShouldBeGreaterThanOrEqualTo(seq); + fs2.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + private static void RunSyncScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("sync", null, "1"u8.ToArray(), 0); + fs.StoreMsg("sync", null, "2"u8.ToArray(), 0); + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + + fs.SyncDeleted(new DeleteBlocks + { + new DeleteRange { First = 1, Num = 1 }, + }); + + fs.FlushAllPending(); + fs.State().LastSeq.ShouldBeGreaterThanOrEqualTo(2UL); + }); + } + + private static void RunFullStatePurgeScenario(bool recoverAfterPurge) + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var cfg = DefaultStreamConfig(); + var fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs.StoreMsg("full.a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("full.b", null, "2"u8.ToArray(), 0); + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + + var (purged, err) = fs.Purge(); + err.ShouldBeNull(); + purged.ShouldBeGreaterThan(0UL); + fs.State().Msgs.ShouldBe(0UL); + fs.Stop(); + + if (!recoverAfterPurge) + return; + + var fs2 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs2.StoreMsg("full.c", null, "3"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL); + fs2.State().Msgs.ShouldBeGreaterThan(0UL); + fs2.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + private static void RunMetaCleanupScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 20; i++) + fs.StoreMsg($"mc.{i}", null, "x"u8.ToArray(), 0); + + var totals = fs.SubjectsTotals("mc.*"); + totals.Count.ShouldBe(20); + + fs.PurgeEx("mc.1", 0, 0).Error.ShouldBeNull(); + fs.PurgeEx("mc.2", 0, 0).Error.ShouldBeNull(); + + fs.SubjectsTotals("mc.*").Count.ShouldBeLessThan(20); + }); + } + + private static void RunIndexDbScenario() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var cfg = DefaultStreamConfig(); + var fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs.StoreMsg("idx", null, "1"u8.ToArray(), 0); + + var idxDb = Path.Combine(root, FileStoreDefaults.StreamStateFile); + File.WriteAllText(idxDb, "seed"); + fs.Stop(); + + File.Exists(idxDb).ShouldBeTrue(); + File.Exists(Path.Combine(root, FileStoreDefaults.JetStreamMetaFile)).ShouldBeTrue(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + private static void RunLastSeqScenario(bool checkMaxAllowed) + { + WithStore((fs, _) => + { + fs.StoreMsg("ls.a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("ls.b", null, "2"u8.ToArray(), 0); + fs.StoreMsg("ls.a", null, "3"u8.ToArray(), 0); + + var (seqs, err) = fs.MultiLastSeqs(["ls.a", "ls.b"], ulong.MaxValue, checkMaxAllowed ? 1 : 16); + if (err == null) + { + seqs.Length.ShouldBeGreaterThan(0); + seqs.Max().ShouldBeLessThanOrEqualTo(fs.State().LastSeq); + if (checkMaxAllowed) + seqs.Length.ShouldBeLessThanOrEqualTo(1); + } + else + { + checkMaxAllowed.ShouldBeTrue(); + } + }); + } + + private static void RunAllLastSeqsScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("all.a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("all.b", null, "2"u8.ToArray(), 0); + fs.StoreMsg("all.a", null, "3"u8.ToArray(), 0); + + var (seqs, err) = fs.AllLastSeqs(); + err.ShouldBeNull(); + seqs.Length.ShouldBeGreaterThanOrEqualTo(2); + seqs.Max().ShouldBeLessThanOrEqualTo(fs.State().LastSeq); + }); + } + + private static void RunConsumerScenario() + { + WithStore((fs, _) => + { + var c1 = fs.ConsumerStore("c1", DateTime.UtcNow, new ConsumerConfig { Name = "c1", Durable = "c1" }); + var c2 = fs.ConsumerStore("c2", DateTime.UtcNow, new ConsumerConfig { Name = "c2", Durable = "c2" }); + var c3 = fs.ConsumerStore("c3", DateTime.UtcNow, new ConsumerConfig { Name = "c3", Durable = "c3" }); + + fs.State().Consumers.ShouldBe(3); + fs.RemoveConsumer(c2); + fs.State().Consumers.ShouldBe(2); + + c1.Stop(); + c2.Stop(); + c3.Stop(); + }); + } + + private static void RunTtlScenario() + { + WithStore((fs, _) => + { + var (seq, _) = fs.StoreMsg("ttl", null, "1"u8.ToArray(), 1_000_000); + seq.ShouldBeGreaterThan(0UL); + + fs.UpdateConfig(DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(10))); + fs.StoreMsg("ttl", null, "2"u8.ToArray(), 1_000_000); + Thread.Sleep(5); + + var state = fs.State(); + state.LastSeq.ShouldBeGreaterThanOrEqualTo(seq); + state.Msgs.ShouldBeGreaterThan(0UL); + }, cfg: DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(50))); + } + + private static void RunRemoveBoundaryScenario(bool removeLast) + { + WithStore((fs, _) => + { + fs.StoreMsg("rm", null, "1"u8.ToArray(), 0); + fs.StoreMsg("rm", null, "2"u8.ToArray(), 0); + fs.StoreMsg("rm", null, "3"u8.ToArray(), 0); + + var seq = removeLast ? 3UL : 1UL; + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + var state = fs.State(); + if (removeLast) + state.LastSeq.ShouldBeGreaterThanOrEqualTo(2UL); + else + state.FirstSeq.ShouldBe(2UL); + + state.Msgs.ShouldBe(2UL); + }); + } + + private static void RunAccessTimeScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("at", null, "1"u8.ToArray(), 0); + fs.StoreMsg("at", null, "2"u8.ToArray(), 0); + + fs.LoadMsg(1, null).ShouldNotBeNull(); + fs.LoadMsg(2, null).ShouldNotBeNull(); + + fs.State().LastTime.ShouldBeGreaterThanOrEqualTo(fs.State().FirstTime); + }); + } + + private static void RunSubjectForSeqScenario() + { + WithStore((fs, _) => + { + var (seq, _) = fs.StoreMsg("subject.seq", null, "m"u8.ToArray(), 0); + var (subject, err) = fs.SubjectForSeq(seq); + err.ShouldBeNull(); + subject.ShouldBe("subject.seq"); + }); + } + + private static void RunSubjectAccessScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 25; i++) + fs.StoreMsg($"sa.{i % 5}", null, "x"u8.ToArray(), 0); + + for (ulong seq = 1; seq <= 5; seq++) + { + var (subject, err) = fs.SubjectForSeq(seq); + err.ShouldBeNull(); + subject.ShouldStartWith("sa."); + } + }); + } + + private static void RunEraseTombstoneScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("ts", null, "1"u8.ToArray(), 0); + fs.StoreMsg("ts", null, "2"u8.ToArray(), 0); + fs.StoreMsg("ts", null, "3"u8.ToArray(), 0); + + fs.EraseMsg(1).Removed.ShouldBeTrue(); + fs.EraseMsg(2).Removed.ShouldBeTrue(); + + var state = fs.State(); + state.Msgs.ShouldBe(1UL); + state.FirstSeq.ShouldBeGreaterThanOrEqualTo(2UL); + }); + } + + private static void RunDeleteGapScenario(bool skipOnly) + { + WithStore((fs, _) => + { + fs.StoreMsg("gap", null, "1"u8.ToArray(), 0); + var (skipSeq, skipErr) = fs.SkipMsg(0); + skipErr.ShouldBeNull(); + skipSeq.ShouldBeGreaterThan(0UL); + + fs.StoreMsg("gap", null, "2"u8.ToArray(), 0); + if (!skipOnly) + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + + var state = fs.State(); + state.LastSeq.ShouldBeGreaterThanOrEqualTo(state.FirstSeq); + state.Msgs.ShouldBeGreaterThan(0UL); + }); + } + + private static void RunEraseErrorScenario() + { + WithStore((fs, _) => + { + var (removed, err) = fs.EraseMsg(999); + if (err == null) + removed.ShouldBeFalse(); + else + err.Message.ShouldNotBeNullOrWhiteSpace(); + }); + } + + private static void RunNoTrackScenario() + { + WithStore((fs, _) => + { + fs.NoTrackSubjects().ShouldBeTrue(); + fs.RebuildState(null); + fs.StoreMsg("nt.a", null, "1"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL); + fs.FilteredState(1, "nt.a").Msgs.ShouldBeGreaterThanOrEqualTo(1UL); + }, cfg: DefaultStreamConfig(subjects: [])); + } + + private static void RunTrailingSkipScenario() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var cfg = DefaultStreamConfig(); + var fs1 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs1.StoreMsg("trail", null, "1"u8.ToArray(), 0); + var (skip, err) = fs1.SkipMsg(0); + err.ShouldBeNull(); + skip.ShouldBeGreaterThan(0UL); + fs1.Stop(); + + var fs2 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + var (seq, _) = fs2.StoreMsg("trail", null, "2"u8.ToArray(), 0); + seq.ShouldBeGreaterThan(0UL); + fs2.State().LastSeq.ShouldBeGreaterThanOrEqualTo(seq); + fs2.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs index 1e4d0ed..a2d23df 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs @@ -6,7 +6,7 @@ using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; -public sealed class JetStreamFileStoreTests +public sealed partial class JetStreamFileStoreTests { [Fact] // T:351 public void FileStoreBasics_ShouldSucceed() diff --git a/porting.db b/porting.db index dc0df6d883b4eb771c19202671cdf2abc60911a2..02375e635f811c7179adee6c20249a2b7aa139af 100644 GIT binary patch delta 20988 zcmb`O33OA{*7$RWf$)|F_oH$lCGl+>`S=d!KXm z-uEOeuiHYE*F8dQX>VL=FqrOnB&IL1^o6{f9t2eF>!I%JK|HvZmbS|YYEmsxA}NNh zA{9m6SG0I;@!X<9@*X&Fo+wucgZ^E{qI$n29-`N1Z_Zk{pmcswSrNT~`Pi^Cp#254 z4FPQ(YO4cUC2FMstpc@zfL4y$xPTUm+Q24_`Vlp6Ks(QL`Ph&YP`|;O?tpd{H7TH- zfjK`AB_zF3{o)7WZYJtegF)A}7PaQKr`7CVh*MRJaM`e&l?;&c0y&W{5%a_WVusjR zbZwhPZssL6@K2=|k-zdC#u$a1__i^WPfTUUJUx1k`RN$ z=H`{!XadbEw9%-$uz5lROWDXJWlIa_4Pvc<^IiQ#{m8$qognXKJ;O&9ijQDzY%|d(~P%{}{``7fVzVkJSLZQ$o zMidhYgJM?ie9d;MgT-f}6QFc9$HJInTq2Zqvl}H(Ms`Yi#qx(@ECyomLRjz;XNNt< zIZ1ut7&odPFQph{!xYLeMRsfx_z~cg`8BX#mZG4x4`+o_GT+13kI~nUnoIEGjebtd zLv9YPg4(S{r@!+$sj+iNk@#}m z=mR&p`=}!JXED5DkWJ!6F_OAKIfNK;1hL)l3ZKdiW;dI&8Jnq#aXJmLb2@MY3?+@1RkM!!XurHQ(!@Uk(fQKG%Dv%}f9b~(MKRwUQ zkA%v}bfbIg-Fzoq=jtm8U&Fw2F8*4zJ&H#WP(;-pB|W={SO{-zlB$SYxM#D}l^Ca% zZI-s|B_@C|SuQ313M-T4l#UD8v1}&Wg>7%XV*bp0G}J$+O_slcZprtH=rfP8t918v1l3w zU5=4ysa`rR)ky24dD1i~bz3rNa|R-oC^OEH>t&P39prjrfpMI1pwVkgGP;ch`Um@Q_ImSuKw~2n!1KpEBnh2DD;+QzmYlg7BCHv*T>MGH^)uVgIXYNlIf^PaA<&> z(2Ks>>_0MD??1AKQPSP(9(1ppu@qF4MWZ$@ zphclJFrc|n^9Ho`s0G43=|Zh}*iO_WcyF+rPI9B*_F#ET4EK<(wS-!8YY{aGN{7gf z7!H$PfWV>F+=@j_f&)Y3gf+qh!`p#YM%0c3G#a%70gXa!XFwxS+YrzQ)K&*H18M=E z5O+`u_=LEPTEHj7EurUxW~UH0@n*m�}H}1`yX#3m8EBiJAo5&&1o1bBADK|1Ip& zwY`X1bK82CSb5ESp&pq+E)JaG947Fy$ zS=1!hG+f@^UCPn*@Lo_4r=;b&&%BG81Y<_X3AD5yuU6t!SLs7ti#Jefjwr{2`Z^|E zgO5kZoug%|uFcV)Hm}INAY!E4nU*Kvs?8ZGzp140$%Z1VS>&to5Ax^odl(cx#Go)u z8X@IKsZu8?QZkFz#ai*CcvyT=+##+PS3;dax*;!5_CUp6Wd!^yPmbN5FPk(uk}s!l z6gmxdtyA8BWz|Z7I)1+FBB=BD-scZ1^R3dnW|OG%s7Y{nf&4%W8XcHitWfsrTAlGZ zwUtsX9shgUF|0Pm0j3F(*R_Y}EIE|w>0Mj3B2^~{%W^RnEDWIlwL zD{`d59A^$O`=x124YST%W-bQr75VK*<7MM_rZJ`grVLYOQ;BJZ*=nYlYpUt0Tu;Ff zo5f91-$1F=(jF48VWekxNU|y!F#nn?k&FN&(Wf3;l<@ba^qm_PUc0c{H zXU$62pk80~AblD*phj>B9@SqMi+9LpLHxf&+Q%G0nu13zx;NxIcbq%zV7L(MoBo6U-w z#DYg=l{}7WjrmJafhG%Rq0C*Lm=WeC4-iCW0MiTDWi$^V9`az2}gfZWUG9&d7H7@ zP?O-x-;_j#JV^{D516z^nY1-KrLEF>sZzRMDwSqSGop+K8|f-In-1vsdA}wYE2^L!VO{||3m(0>SA)6WD{xp-B@q@%J{MIP2*wXQ^uXf zjmC205@WG(vT>NPuQAmaZ*&-W`Zj$9Yq)dt2lO%e5dAp4oqm{JNiU=e=<(#s^dPz? z)^o9R1kF&_sb8r&@&I*)dY3v9{I91#MG;N0skdH9qc$00wFgahchh4l`&v<<98lhY zl>aCXLc=p;f)~S(?;(@jhF89aOtuEEd=HuAD7^ALWRipRfSi~mWpIhHBtWd$vQ&NC zYUW^(+MwmKlr!i}N01f)@4ilEMsY!JvO$^|T*t`HYe-*A4vd-%YC{BR zLDmw!=-u==;8Km6^($-uLQvVFn8iKToL0Wx~_EV7dLy&eMNUICd&cT3V zm^Su&yfVN!;n zhf$q#IT4tXR$xN350!oG(c?O&nPTLL{cdooh1qvJ9+lh_a{+QC3H)WE?{0uqVvVQ+dqaO&Jh9%6cE< zJKA-Hy3tlUoEv4$_8l3}c#@F}uF=+=Ko&&yhd)MJN9t~j9AmA3ejEKCONOpvtx3AO zpO3fZ!&Q}!+CJ5q09(deS$N`DjJ`$bWa}!h-lnt`X;3%W%E3KT8vD==QIb6sbE@(n zHkj1ogjjA{?Az_2>qhJjE))@%a(Dj@6lCt;9te!K~iqZ}xLY*_! zcEO{Wk_{tw+1kPOM{LP~vytQssC~q?8(th~(gqZzHHn63ut{)st1VvNj(XIl!pUcv z+qprhvE}HSOsTOQ4QkTZ)r}gP4K{7Fx%G_?Zwt-E#wEvo(KYz)#z4`vWeQ z0z-7d(4E0u#_X`|3Rv36j(g13d=!M8je5+s6&BBn@J)MTXCFUiv$X1L(N5cjmQ-wL zf`dD4W=MK082#{Hv9;cdgY>rU+hwcq4{YJzY{Ptae9KII`){`1WE3VMp1{5HmQc#4 zvClK5CXNPbr}Q)VAc329FLN6fJZJmNx3Hjmu`LQ#K5rZ4>&CZayp@(mBVhNJw$8Bk zpv?~1n{2T#@>C4#Q%T2OsPGu01E2K0ovZtJ#r=<3Vc$P(?bJSdZP~o~^Fdprg*ERo z2!>b#`;y9?ux&NdnD^}J-uGHrY0K6I!oslkZRySCdewa&*vb{{ej1Fr zWQzd)D_nOgzOwn3t9Ia*>{b8%m94}YXf$5``M+PXJ+)jrW$Xy0j>q)+7n^ayh#5*{NCL-FLK2;xAdAhjfoMr3-_A38;8L?zxn&#dP=I8UeXzb^1!FG_(a;DvX1o8W*f-Ug6*Jee5ZeRC zTFluc*kiW$3M^+k_y!!$+PgJ(odMHWG%BuZoX?vvz;pI2{q65X`$N97)20B>4n|3K z2h>XTpbdLXvRAff7bDwk@TJ_?&Nsk}idAbBXkXxnd%3}mlT&4~-&RL(_8TOOadqkh zbvtZbpo55$bxaKE|A=B&Y89iGy26%`f0|bkyA11?UD^Z{;{;W8(k3VlHe9gJfJ%=s z3Ucm^OoZ;=*$4P+@ki|y-`PLZIdT2>_Pcd=Yk#mWg1D)1T_Jh8)d3AZ*cI6JFS{;? zLPEX$NxwT|-f}s?a?w7vxdU~EP&Z_TIYQ?)+wdwiK*3Ye$Irt1Sk8&2#t{Z{tMISsNWl12TA*aRSW9 zcX*((cWmQKcKgoGK%Zrt3Az+F<^td5$^?6X<34z?e?&((RN$E5gEscMXssLGyf+e8 z?<~gzuzrC}^1gA#!UwY)x>bSC(S?rnerI8Nbi&BbV~wz?&|!y$LWgc2XF}H^M`2Us zOMo*)I701MU)FYcX2r6#PAk^SLq#|d9gklN%yvjHu-MVZH;29?dHQ553$?|LG~dS> z$1l!yl)=^bM#nWZF~X;_9s12sXU^82D+q^NdNTKy&cgRWUtd-68Kz)VW)C| zFYG|+2iQru*ZDCmG#D;IZWmmbzV}vSIbVknSs*J840c;r+?aPz9IvId3$YQW{JJUS=x6< zi^k%X7&XgT04JV~?+byxNsmVzTI7r*=!%l6UXaJyBcMl;Q-aruowAyf@2n@(k+Yr4 z2_RWq$vz-P7!Fp=aS9MKw~^X7#ONw7B%O$jgC1{1nBlH@PCG>XoMB_dVn6<-6}+^tep!(k+u; z&N$ubsWZ;(U$k!=MFJ07FNjBl6*vR^*>2_-hAFD&n^)B2xTE5#` z@6zW?xKr<1@AnW!E(d&c(REL=hrAH|Z@gXE&e>?dhZkM#;P$^={=M2b$S3PuOBl8H z&#vD{NadR*bKxb|z~-JZtKYcfQs21Le&{1{(gSoiY9;xSxJ)SH$B|}kGP}n76mx{Q z+weH!GX9JHNwZ-YBt7Wv2V39ddcdkHoEaW}(A^=>VM;oD_@Fyqce`+%J633k6fIm06{kOFchTri%@Gu^a6L(eQ!kY?jSKlsowGSo&D z0k_t=(X&n8W_r}k>)g93^^5iHP9(eLVG2L*sptk54sdq$k4pCrqB?oA8zl-Q8l?kD zjGDYT_OmQq`Wp0p?C?OR$=>Qc@5icp-jAD|5HXx4W*B?YBgqpYW14FiF8$132iN7e zG`do(8VDn=#l^ts8~E_>;q|x?5I5T15w_fk>jE=x#u>Hu1lZBPqb~2M)33x$p>&z; z#((27d}x0mmHm61g`l~ehG?yC7d(3-ZVoINL;LgEp*wM`*0nA&dn)Csp^$$nHUdsh zuu8DGA8LWdI?3&5VLM-?x|H|? zo)(`p^!E?o6?c3G8pk|NH)ecqTH{_j<%)-6?sz;zcE|g>Zj5I_)mZ5Jdt4lR9UU*h zKcnJ@`JCuWY;1JA6XuMI@_)e56n3@6L{3HXrlNWMdoF~0&fX4uFkj?7amrwv#ODa!X!_d-d;!Sv@I`OZT%H>rP+aYCeW&7UZ zV9kqZv%(EOPMt2ZS!=rZGovsmk%*qo@Vk|)AJVwb0js3x2 z7_8cwdWwdP)rtH4PVz;5Uq$k2bs`7zYGS&X{7-Pzw>BrDbVlidl7w;>N>`L_DBV$d zpd_QDproRtp`@dDP%=<5QM@QwDA_1IQF@{DM(Kmn7bOQJ7o{Icf0O|zccTnMxd&ws z%3zcsC__<(p$tbEfieZZ`aoS?EdiH5%M;YR@A5K?q~Q|m_$lvN zH2aSJB!zt~NZSq4WTi6{|D2ECcKnzxRbQ^pL&--eP%qcdipWfN7(O@1X;QKHnD8Ng zn@{HUa1y)9{EK-i^8u4(I$*LJ*U`Vzg?P-`k9>iQAyybZ*W&VSdqr$bKh_XMMlFq6 zkyc!kU$%5{(Gu!3#IDHi3QD&^5%xqB^4(`IUbwV0HKwUS%v|k)5m#S3Sn z7nY?ZR?^8jX# z>AGo%@iDrd&ZO3npOKx2C5Dq)`rP2E=vFhCGsKcn6&-D})0URZEt~1fhQvPD2a`Rp zb1R;rGcQb*X9ku|MpNQhmVaMSL31YkU(%09D|Hb!DI{)>4ma_$smk=GRn$h>#E>+| zjkjRFtC-9-8n_P)421JGA}54I?$jIjPWTKQACd;K39Stbtm!ryI4&e|yUxHnm6Osm3oKxFF?R+fxd^^HtU`kjF>{>J0kBkl8fb(?*B2JnX zCjCyGrgXSHk8Xc%7_q@;9Aq0p4tXf zpfZoG8QvPDD>%JrS>eG9YmL!Wo!+#zI8JBH(AF4T>FG@k(<&I!8q-{VW?}sq9;Xd% zh0zzG-jpycfkCY?&6TL5jSrdkwC=n`8JdMJvs+D3NZwf;ulit^VvP3eR~Qm^CKwmCQhruQ8st7& zpfBZp28Jun6of>+t26MAUHE7et}v4y68BC^T(}}kUP#=@mbh>Qmzg1PZ|HDOZ4F;W znh}zATu(bcIc)iFdPv$)9gTSt7fQI|%CwNU!#do#Q{gjiYDn4>I-2i9DIArH7FF=q z>>&=Y9uLMTYr+wtwWGF3!(oFwLfR$%Tl5O+__KT*SIQn|1@jc7Kv=dmEWV2qq9KQt%JAgHDL@JmJVInw5qS z#3lIGV$EwtI|B<#UO`;Vnvgi5B`zF!MZ>KQiQ`)0!jV@r+^Uc`wk0kc$E6vFTNx5( z*5R}rk(U;ZnpNYi2+1?H;)Nr(XuJnP^5|B)a6}l5cYjD870gq@^jzG%W_d^)*%B9y z*rHjtEF_L-ftwMIz@p*q3yFi=J(KZIShVkcd}}i$90jX}TN(;S+zGZ$o|zJk$fEJe zLh``!RjceUGbJ41MPn`r&3xgLV3&p~yetlhyQy>OvL~im!f|e+8Mi1T?M5&yJjIKK zD-DUe-U7EKETKiK?X1+k|6gr4%(#!fMi0YALI0^e4h@)1xhfL3+5O|S2i11$J3u7m zcip~!iUP;Jo))H1@2yDK7U{>?f^n}w?yFOJX{fL~N_){RTS&XKuTi^0&XFn6dFkPZ z5E>~WBx##w($KuraO6nMaqU7nuEs|ROBLnCqRr}uv?%mP=@u`8-#h0eha>Q4JWEL4 z;8whFL>`T&gyiMuc;}wNqBJ~n)_t2C66a}&3rAU_Sto_WrMJX|BlKuEF(fWchx27( zZ#W*BHJ%WXm)wFk(;JSuMtg$cL-OECdaF#F=M6`3qcORV%&skL%*zZ%?9zB_NM46l xyl_-E8qXY(7u|{%jsT#EN$-_S;5BxGoJ1Mz4904Zte*ke}#_<3E delta 8943 zcmaiZ2Ygi3^8em*d)a&MZg#UJn~*GoKqv_$p_d>XB?JhFf|3LhiuBH-q)I5#1Tui2 z0ewCZ#9+IMfFU4Fgh+cL7McQr5b=qCl;7-K2-){}|Ia`B`Q|fo&YU^t%$YMY_ul-o z%jx{H%jjhh>_m%&>%2U=BQ9o8y~WAc=#+4NzgB$GO1{P8CFMQmTG4zaW|TXrMW+@vay zYG6{kkcu*?ok#_B>_950<8uh#N-{do2XR3`9S4vK>e!D|P{%%`f;zrLDyU;GQb8Si zjCZz@Ary7qV9QB?M5tF_ozE5l5qGo**tMNBqPiR3Zzs(u!?%N64igHLhd7q&Y$#QP zP!MyRG&S1w)%M6G55Fe}8o?k~1c%@eg7NTsvEErY4^jg3+bP9C^)gx(X3USaS*V`l zVeE&J3*Oizsm8XQQu?*bt8;TL4 zRd^`;E?f{!3j2l6pej*213x8doUyr?mP1otf-OZGXAj)-;W@cElP2X1r^W|3Se~M} znE++Hm!eq-BQCN$GmXAgxe6pm4$F`1URs@){@~B)RV9_q<2jS zw>pG_0B01m)`~d$cGAzH_0rmDO|@u^QE#c2A^j{hWYsdgKWxj?B4J*tX6vr97Um{% zkvYX2V74<`nAe$QOd&Ic$z=vHy_j}PQzn|R)3@l$^mp`OdKX3VcH98T3H!+V{yql95#?o--o(=g!eti2edW_Q;9q1Rq4No6Mh*F{?h^Z%eC z*yKPTly}kUw+oR<4B-+&xTYaolK^MfyJ{3o-7_3Lv^2^WB2XuUi-Ik`Qw?JxLK?jx zoF~A+fu342Q4hm|o?5@?kbX6UvzpQqdTH*^b_og}QVrpYUYZuV9yVk+1Ft`%BH)WZ zsFs9!U{w4{sT8C>q(tM=CCW|3oT@ENoebelm{Tw5t(AtDggOu+wLgU0XO8KYrqzFR zKuDR{tj5c~2$A0v!tK=Ta3oD@5Eat;SqS&3HV7Pjv z4YJ%N3Sl2W_3v7^gTKsw#~8yx)IVHFgJLOCAYOhoT^>4#Me{@Ng#Rk2$wHju%f?t>jpn>$Y; zd6w_VL`E7a4iLMGt;I%Sq^Jph2+M^cVTACU@U)O1#0X&m%m2k+;m`12^I!1W_&51g zd+!}5XH-{U~WpVwvu3Ss5As4|} z*@x`!>;?8DyPy4x9lM!b&n{&P*vV`T`z+g&ZOb-c>##PKX8vR@F{hcYn4Qe~%m!u! z^D;A?8O01{dNUoFW=vhi$?)_Y`YQbceU#o!e@Oq626_QKiyli4rTfyI=@dGS#L|6NT32Q$uHyiTP(FNhpdHgArV8mi#`(n_!>$vLVJs^hKoW@ zt>GeTYmv@{^xb{b2Q8C{pZW+EwjdG4Pon-3s|x?FrQ;s-()1QY)?kXJHvoy#8~Y^B z69L&(dlK0R%QW2!_gGzpcQri&ZZ9S(_*i|ND)+86swsTR>RqCwkc}#Qh%PG}V)Pa; zhSOWYc1HIRnTBF)Ns%5ZUQ5#BZOWwfDCvF(cMm03nUZju)>r3A*Oe}{?DZRRPwaIq zWat%Tp{3R1Nu5D53xx~Cb7*A4$YFScWN#40CG=PuYIbZE_;5R!%p@Puke@>4K!m7o(j}pn)T{GDUw{)m#i;X`&c^xcTPWI{CG~+WojnexQ4~aozbKvQ3aUw zn?4|@|HN;4M~ZqK95?h9!MW3J=zXc1F!74+fzNK}T1a^&t_qkLzPq8v@$@HD8#>N> zTA7(wuDqj^DXW#G$~AjU>P1k#va{ABca7*Tf6rcj7nV z0r3m*6Y)LqEsS5UiVMViak@B8%n^rR&Du-sB(@S0#QK;u9HJ}|;f_!(ToQi7>h-X& zN7ycG6W$Tt5K4u`La{JQm?Vr6GKFV_G@+}I*G@l`@niYn{9wKx-;+<}Tk=gY;rRG4Ug9b4Hg|*jh5La!!F|P5 za-VVKn0j94R&lRzMchBQiQGtTC^wMn&2{10a?LRbMRP7r<=8y-9{UITD|?RpmOaYu zV|TJ2v76bA>>73%Tf)v|r?O+%VeIp4U$zI^k!^vAsU91_+F226)W4YP%+JjC%yH%r zvzz%8Q`FncdS)fF2&>hZ%miix^CB~Vd4@@4l9>ARCVCyclrE&F(j)1?SfzHNlQB8v z#o@pI{zcO@Kk~u<`iZoY1Rg?(1Mkl807dOU>Uop;9I14ZGLY(KQXe2?K9Ew|kTM@g zsdA(mG%~y2N78&ErM4nvK9N%IA!R<1Qkx%52czC4OY^XKf;nh9Qn@BI4XGg}H5sXX zCN&AE9wvpaGAtcTYBW;GCN&DFc#|55RJ2KrKq}m%a*K6Ql5g3Q8zIye=B??>_RtuO8{Y^Fh`gm3=;+l zJ%x5y*!TsfAnqIV7E(zs4sG8e~rIV)#Dr}~9GoyQ(#zhFOL-(pMI`RqT?mtJJk z*{9fKwm$1-CFUM_&{^gfvzPgVd52lcyu!>yzsY7Ym~KofCZ6#zDr2E<(iiDd^Z|N1 zy@h_AUWPt2h0di1(Y@$)bW?N{JI#<=>2~17lEnfYJ|Yn? z;v)>Z_gPy8Ec$@>j5Cr=WQ}A0A`3Ar<6D`>homXck>(r1VBQuFe|?C#V<&HG30}KR zh1LwO!4I4*ix3CA9ck+apZIJ(LW*E^B);RA6^Sxq@t-y`5H25UBvZk6NAnUg3A`vq zjfZ$*8wVMzEfRW1*{sm`3la$iYwHH<7+y7c3bvPtASnYazY?FtXsp{#P=tE`yztin);B2W? z247$MYPdPp-5LJsYwrWq3t|-L*Uvr|`t=TL4CT3Q58UZ%cR}O+_IPuI(Q=@D8iCCI z_Dtw!_cn&I`+hg1&+!PLWS~s+d7EOqKgj+Wfle9rS&)5;jfB)e_WDrfi?PD{8FmT6 z2LzPu50mMUMVXM0<@Upb0rschob2jSI>4^Lt}%|}k`aULh%ki17%|v+J|(cuHc+Oy z!k~1yGcmYCZE1jvq-oY!KpN97K z3U++F>c5rxVZ{b~xntVwa;bAe&^{IEcD2^LH^3Pe6p2~we67}U%~ko=IO_#_`>t^= zhYuFiZ44m_WH_?MX@jTNIv-6mX{~e7|Cq>><6!GrrvUM59t$sg4TVDtAw$J$POL^> zb=8Lr>zpe?7bHXUdZ!)USRYV;XJPDH?`%urewxz>58rfZ#>qFG;j#z&|8!U=%h&>ITkBXtMRraope#vA z0_@s=XEtJkb78=5{NUT*JQ<+sLirS@3mR>7rkmEGZ7Uq``ghI-(C1BTZ?nW11AX>K zizZ9lWsuc`kAkWKXJlZE+1$w-Xk6RsblOC5p+&L8ScG+kRN;J8UVnMa3(?e zb?1QKMCI3=Z_j{9bsl^F+7aQ+gZ@0d$c1&A%WvEp5iSv^^myt*{%@{0NFN=Z92~rK zbok6wKIL`wmi2->Q%Vzu2s8Ntu856g+%%z%TurnVaGMOPwW3hl)zG&|^Ztoqe8 z2AZGp^nx3e0>rHVQ)$YSLL3PhxJs?(ga}3An2Z z%n7i$S#Zlki$#b@@Hn6whjXd~$Eev3d=)$1QwPc!mq=oc!?p!=pNvLBpua%^=Xwdq z9&cxi%dXZzY3*_<7en@K*-oNMzu?IbaFza)DKFJ-R8$f~(-aMP3w8 z?%9!29hgv`=1a>M#$a@pQl*RIggEvf^9_BP%%*ZJ^8)tHg>{!)XVKh6^#bPh!ir!Trp=aP(8KfBIQP!#X!0?%Fw zj**SyS6y$?@aiI*6=Sj&cHD580bDB8ot(i{yWW}r{*xXv2#WAqru&hN8)vykfIkUC zxp{QG&*5S_OqV!lnCnQP=M+amClQy~KHHrR_GfUjW~8plcH?FP@p)pg4|cy4?tsK$ z?tws-N7seSA3QN|XqekPF$J{Y?)kwTrdmP5~#i3#k>d0^9Il>eS zEF1%!8hZS}wppz`J7AwJHXK^D^$ZBQcvV|ZW5d?gQ(Z5H%d%K0*`iijwaBO=GFdjng((VTvu+a7!&r`wf`QLcv zP*C}Y|A{w?u;Z8q&-J(`c;E578`V1RYuIk=KkjLB%}+gLv0Ca9OI`Jpb+u9`XG*=q z_Cgf*9lMJ$=%wTZi*fFVw+`jASg43F%v0m7?XBYjX{v1j9Y|9dIg_V=tY`e67^hI17TKRkEdiwW_Zw_B|W58HYKR)$hZHBw=)=z>><8|P8fypgkVKgCyCi_Gz*i^O+pWNt`QN=i6!>DyUUBRK2_s0$0vf)_m0ZTZOMG)^ z5DI-gpmG)$37OrcFvu_THH0dfk&OY1e02$YROlNV)TpOKDR8&QXN9z)$E%rBQzf$@K*bigh}aHlM2P69+=ks5Pi=-2%4Y9 zDiKTUXo&vXufe{*{lOb&4E^0dh^`f??*7N$(k$PN_J8_SicwZmGW4y+fwTYeUt$Wi zqNicoAiE!y-u4Sd@n8OM8hTgz^WlS7Cq5Na`-cZZ;pkib5jAa1;nq1@a+vk5WjtEk z(iQJYS3>ZvJO`Q8elJwq_jAzhzCR|SP%7#a5P1$oo-?o0*vGf2gum}c)HV3~^|Ua? z`cLIm!>h;Em+?aY)i1cl!;!H=fG#79U+nWW_Hx7DzF0Sm%!o`b`8X;TQQ!DDD(;yqt6*4ob_(Gavs2W$ zhMFIj$1{cXiw3~*lDO9h#;DyM5BhN6Day>I6-(oiA^)ILA;bk|_1Kd_N=j0q#e(k& zKDWgZD&jF4*hLCjC&jc*PqQRG@=Bb?II=LVoQAXoakqn)GVO$ihh+;