From ced3e7e9b8f5c6b13317c1ff49c9317d1e01b6ff Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 17:45:04 -0500 Subject: [PATCH] feat(batch15): complete group 3 msgblock/consumerfilestore --- .../JetStream/MessageBlock.cs | 542 ++++++++++++- .../JetStreamFileStoreTests.Impltests.cs | 747 ++++++++++++++++++ porting.db | Bin 6639616 -> 6643712 bytes reports/current.md | 12 +- 4 files changed, 1271 insertions(+), 30 deletions(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs index d2a5259..ae856a0 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs @@ -14,6 +14,7 @@ // Adapted from server/filestore.go (msgBlock struct and consumerFileStore struct) using System.Text.Json; +using System.Text; using System.Threading.Channels; using System.Buffers.Binary; using System.Security.Cryptography; @@ -261,7 +262,33 @@ internal sealed class MessageBlock /// When true, simulates a write failure. Used by unit tests only. public bool MockWriteErr { get; set; } - internal void CloseFDs() + internal Exception? CloseFDs() + { + if (Mu.IsWriteLockHeld) + return CloseFDsLocked(); + + Mu.EnterWriteLock(); + try + { + return CloseFDsLocked(); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal Exception? CloseFDsLocked() + { + var (pending, pendingErr) = BytesPending(); + if (pendingErr == null && pending != null && pending.Length > 0) + return new InvalidOperationException("pending data"); + + CloseFDsLockedNoCheck(); + return null; + } + + internal void CloseFDsLockedNoCheck() { Mfd?.Dispose(); Mfd = null; @@ -795,6 +822,253 @@ internal sealed class MessageBlock return null; } + internal void ClearCache() + { + if (Ctmr != null) + { + var sinceLast = SinceLastActivity(); + if (Fss == null || sinceLast >= Fexp) + { + Fss = null; + Ctmr.Dispose(); + Ctmr = null; + } + else + { + ResetCacheExpireTimerLocked(Fexp - sinceLast); + } + } + + var cache = CacheData; + if (cache == null) + return; + + var buf = cache.Buf; + CacheData = null; + if (!cache.Nra && buf.Length > 0) + JetStreamFileStore.RecycleMsgBlockBuf(buf); + } + + internal void ExpireCache() + { + Mu.EnterWriteLock(); + try + { + ExpireCacheLocked(); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal void TryForceExpireCache() + { + Mu.EnterWriteLock(); + try + { + TryForceExpireCacheLocked(); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal byte[]? TryExpireWriteCache() + { + if (CacheData == null) + return null; + + var lwts = Lwts; + var llts = Llts; + var prior = CacheData.Buf; + var priorNra = CacheData.Nra; + + Lwts = 0; + CacheData.Nra = true; + ExpireCacheLocked(); + + Lwts = lwts; + if (CacheData != null) + CacheData.Nra = priorNra; + + if (llts == 0 && CacheData?.Buf is null or { Length: 0 }) + { + Lwts = 0; + return Array.Empty(); + } + + return null; + } + + internal void ExpireCacheLocked() + { + var cache = CacheData; + if (cache == null && Fss == null) + { + if (Ctmr != null) + { + Ctmr.Dispose(); + Ctmr = null; + } + return; + } + + if (cache != null && PendingWriteSizeLocked() > 0) + { + ResetCacheExpireTimerLocked(Cexp); + return; + } + + var now = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + var bufts = Math.Max(Llts, Lwts); + var cacheNs = TimeSpanToNanoseconds(Cexp); + if (cacheNs > 0 && now - bufts <= cacheNs) + { + ResetCacheExpireTimerLocked(NanosecondsToTimeSpan(cacheNs - (now - bufts))); + return; + } + + if (cache != null) + { + if (!cache.Nra && cache.Buf.Length > 0) + JetStreamFileStore.RecycleMsgBlockBuf(cache.Buf); + + cache.Buf = Array.Empty(); + cache.Idx = Array.Empty(); + cache.Wp = 0; + } + + ClearCache(); + } + + internal Exception? EnableForWriting(bool fip) + { + if (Mfd != null) + return null; + + try + { + Mfd = new FileStream(Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + if (!fip) + SpinUpFlushLoopLocked(); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal Exception? WriteTombstone(ulong seq, long ts) + => WriteMsgRecord(0, seq | FileStoreDefaults.Tbit, string.Empty, null, null, ts, true); + + internal Exception? WriteTombstoneNoFlush(ulong seq, long ts) + { + Mu.EnterWriteLock(); + try + { + return WriteMsgRecordLocked(0, seq | FileStoreDefaults.Tbit, string.Empty, null, null, ts, flush: false, kick: false); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal Exception? WriteMsgRecord(ulong rl, ulong seq, string subj, byte[]? mhdr, byte[]? msg, long ts, bool flush) + { + Mu.EnterWriteLock(); + try + { + return WriteMsgRecordLocked(rl, seq, subj, mhdr, msg, ts, flush, kick: true); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal Exception? WriteMsgRecordLocked(ulong rl, ulong seq, string subj, byte[]? mhdr, byte[]? msg, long ts, bool flush, bool kick) + { + var enableErr = EnableForWriting(flush && kick); + if (enableErr != null) + return enableErr; + + var cacheErr = SetupWriteCache(); + if (cacheErr != null) + return cacheErr; + + var cache = CacheData; + if (cache == null) + return new InvalidOperationException("cache was not initialized"); + + mhdr ??= Array.Empty(); + msg ??= Array.Empty(); + rl = rl == 0 ? (ulong)CalculateRecordLength(subj, mhdr, msg) : rl; + + var index = cache.Buf.Length; + var record = BuildMessageRecord(subj, mhdr, msg, seq, ts); + cache.Buf = AppendBuffer(cache.Buf, record); + Lwts = ts; + NeedSync = true; + Lchk = record.AsSpan(record.Length - FileStoreDefaults.RecordHashSize).ToArray(); + + var isTombstone = (seq & FileStoreDefaults.Tbit) != 0; + if (!isTombstone) + { + var last = Last.Seq; + UpdateAccounting(seq, ts, rl); + seq &= ~FileStoreDefaults.Ebit; + + if (cache.Idx.Length > 0 && last + 1 < seq) + { + var fill = (int)Math.Min(int.MaxValue, seq - (last + 1)); + if (fill > 0) + { + var next = new uint[cache.Idx.Length + fill]; + cache.Idx.AsSpan().CopyTo(next); + for (var i = cache.Idx.Length; i < next.Length; i++) + next[i] = FileStoreDefaults.Dbit; + cache.Idx = next; + } + } + + var slot = (uint)index | FileStoreDefaults.Cbit; + var expanded = new uint[cache.Idx.Length + 1]; + cache.Idx.AsSpan().CopyTo(expanded); + expanded[^1] = slot; + cache.Idx = expanded; + if (cache.Idx.Length == 1) + cache.Fseq = seq; + } + else + { + var tombstoneSeq = seq & ~FileStoreDefaults.Tbit; + if (Msgs == 0 && tombstoneSeq > Last.Seq) + { + Last = new MsgId { Seq = tombstoneSeq, Ts = ts }; + First = new MsgId { Seq = tombstoneSeq + 1, Ts = 0 }; + } + RBytes += rl; + Dmap.Insert(tombstoneSeq); + } + + if (flush || Werr != null) + { + var flushErr = FlushPendingToDiskLocked(); + if (flushErr != null) + return flushErr; + } + else if (kick) + { + JetStreamFileStore.KickFlusher(Fch); + } + + return null; + } + internal void FinishedWithCache() { if (CacheData != null && PendingWriteSizeLocked() == 0) @@ -1003,10 +1277,7 @@ internal sealed class MessageBlock Mu.EnterWriteLock(); try { - if (Ctmr == null) - StartCacheExpireTimer(); - else - _ = Ctmr.Change(Cexp, Timeout.InfiniteTimeSpan); + ResetCacheExpireTimerLocked(Cexp); } finally { @@ -1019,20 +1290,7 @@ internal sealed class MessageBlock if (Cexp <= TimeSpan.Zero) return; - Ctmr?.Dispose(); - Ctmr = new Timer(_ => - { - Mu.EnterWriteLock(); - try - { - if (!Closed) - TryForceExpireCacheLocked(); - } - finally - { - Mu.ExitWriteLock(); - } - }, null, Cexp, Timeout.InfiniteTimeSpan); + ResetCacheExpireTimerLocked(Cexp); } internal void ClearCacheAndOffset() @@ -1080,14 +1338,247 @@ internal sealed class MessageBlock internal ulong PendingWriteSizeLocked() { + if (Closed || Mfd == null) + return 0; + var cache = CacheData; - if (cache == null) + if (cache == null || cache.Buf.Length <= cache.Wp) return 0; var wp = Math.Clamp(cache.Wp, 0, cache.Buf.Length); return (ulong)(cache.Buf.Length - wp); } + internal (byte[]? Buffer, Exception? Error) BytesPending() + { + if (Mfd == null) + return (null, new InvalidOperationException("no pending data")); + + var cache = CacheData; + if (cache == null) + return (null, new InvalidOperationException("no cache")); + + if (cache.Buf.Length <= cache.Wp) + return (null, new InvalidOperationException("no pending data")); + + var wp = Math.Clamp(cache.Wp, 0, cache.Buf.Length); + var pending = cache.Buf[wp..]; + return pending.Length == 0 + ? (null, new InvalidOperationException("no pending data")) + : (pending, null); + } + + internal ulong BlkSize() + { + Mu.EnterReadLock(); + try + { + return RBytes; + } + finally + { + Mu.ExitReadLock(); + } + } + + internal void UpdateAccounting(ulong seq, long ts, ulong rl) + { + var isDeleted = (seq & FileStoreDefaults.Ebit) != 0; + if (isDeleted) + seq &= ~FileStoreDefaults.Ebit; + + var fseq = First.Seq; + if ((fseq == 0 || First.Ts == 0) && seq >= fseq) + First = new MsgId { Seq = seq, Ts = ts }; + + Last = new MsgId { Seq = seq, Ts = ts }; + RBytes += rl; + if (!isDeleted) + { + Bytes += rl; + Msgs++; + } + } + + internal Exception? RecompressOnDiskIfNeeded() + { + byte[] original; + try + { + original = File.Exists(Mfn) ? File.ReadAllBytes(Mfn) : Array.Empty(); + } + catch (Exception ex) + { + return new IOException("failed to read original block from disk", ex); + } + + if (original.Length == 0) + return null; + + var checkErr = CheckAndLoadEncryption(); + if (checkErr != null) + return checkErr; + + if (Bek != null) + Bek.XorKeyStream(original.AsSpan()); + + var desired = Cmp; + var meta = new CompressionInfo(); + var consumed = meta.UnmarshalMetadata(original); + if (consumed > 0 && meta.Type == desired) + return null; + + byte[] rewritten; + if (desired == StoreCompression.NoCompression && consumed > 0) + { + if (meta.Type != StoreCompression.NoCompression) + return new NotSupportedException("compressed block decompression is not available"); + + rewritten = original[consumed..]; + } + else + { + // Compression transforms are introduced separately with StoreCompression helpers. + rewritten = original; + } + + if (Bek != null) + Bek.XorKeyStream(rewritten.AsSpan()); + + try + { + var tmp = $"{Mfn}.tmp"; + File.WriteAllBytes(tmp, rewritten); + File.Move(tmp, Mfn, overwrite: true); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + private Exception? FlushPendingToDiskLocked() + { + var (pending, pendingErr) = BytesPending(); + if (pendingErr != null || pending == null || pending.Length == 0) + return pendingErr; + + try + { + Mfd ??= new FileStream(Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); + Mfd.Seek(0, SeekOrigin.End); + Mfd.Write(pending, 0, pending.Length); + Mfd.Flush(SyncAlways); + NeedSync = false; + + if (CacheData != null) + CacheData.Wp = CacheData.Buf.Length; + + return null; + } + catch (Exception ex) + { + Werr = ex; + return ex; + } + } + + private void ResetCacheExpireTimerLocked(TimeSpan due) + { + if (Cexp <= TimeSpan.Zero) + return; + + if (due <= TimeSpan.Zero) + due = TimeSpan.FromMilliseconds(1); + + if (Ctmr == null) + { + Ctmr = new Timer(_ => + { + Mu.EnterWriteLock(); + try + { + if (!Closed) + ExpireCacheLocked(); + } + finally + { + Mu.ExitWriteLock(); + } + }, null, due, Timeout.InfiniteTimeSpan); + } + else + { + _ = Ctmr.Change(due, Timeout.InfiniteTimeSpan); + } + } + + private TimeSpan SinceLastActivity() + { + var now = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + var last = Math.Max(Math.Max(Lrts, Llts), Math.Max(Lwts, Lsts)); + var delta = now - last; + return NanosecondsToTimeSpan(delta); + } + + private static long TimeSpanToNanoseconds(TimeSpan value) + => value <= TimeSpan.Zero ? 0 : checked(value.Ticks * 100L); + + private static TimeSpan NanosecondsToTimeSpan(long value) + { + if (value <= 0) + return TimeSpan.Zero; + + return TimeSpan.FromTicks(Math.Max(1L, value / 100L)); + } + + private static int CalculateRecordLength(string subject, byte[] mhdr, byte[] msg) + => (8 + 8 + 4 + 4 + 4) + Encoding.UTF8.GetByteCount(subject) + mhdr.Length + msg.Length + FileStoreDefaults.RecordHashSize; + + private static byte[] BuildMessageRecord(string subject, byte[] mhdr, byte[] msg, ulong seq, long ts) + { + var subjectBytes = Encoding.UTF8.GetBytes(subject); + var bodyLength = (8 + 8 + 4 + 4 + 4) + subjectBytes.Length + mhdr.Length + msg.Length; + var record = new byte[bodyLength + FileStoreDefaults.RecordHashSize]; + var span = record.AsSpan(); + + var index = 0; + BinaryPrimitives.WriteUInt64LittleEndian(span[index..], seq); + index += 8; + BinaryPrimitives.WriteInt64LittleEndian(span[index..], ts); + index += 8; + BinaryPrimitives.WriteInt32LittleEndian(span[index..], subjectBytes.Length); + index += 4; + BinaryPrimitives.WriteInt32LittleEndian(span[index..], mhdr.Length); + index += 4; + BinaryPrimitives.WriteInt32LittleEndian(span[index..], msg.Length); + index += 4; + + subjectBytes.CopyTo(span[index..]); + index += subjectBytes.Length; + mhdr.CopyTo(span[index..]); + index += mhdr.Length; + msg.CopyTo(span[index..]); + index += msg.Length; + + var checksum = SHA256.HashData(span[..index]); + checksum.AsSpan(0, FileStoreDefaults.RecordHashSize).CopyTo(span[index..]); + return record; + } + + private static byte[] AppendBuffer(byte[] source, byte[] append) + { + if (append.Length == 0) + return source; + + var merged = new byte[source.Length + append.Length]; + if (source.Length > 0) + Buffer.BlockCopy(source, 0, merged, 0, source.Length); + Buffer.BlockCopy(append, 0, merged, source.Length, append.Length); + return merged; + } + private static bool HasChecksum(byte[]? checksum) { if (checksum == null || checksum.Length != FileStoreDefaults.RecordHashSize) @@ -1212,11 +1703,14 @@ internal sealed class MessageBlock internal void TryForceExpireCacheLocked() { - if (CacheData?.Buf is { Length: > 0 } buf) - JetStreamFileStore.RecycleMsgBlockBuf(buf); - - CacheData = null; + var llts = Llts; + var lwts = Lwts; + Llts = 0; + Lwts = 0; + ExpireCacheLocked(); Fss = null; + Llts = llts; + Lwts = lwts; } // Lock should be held. diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index 1b493db..8c2cd1e 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -2,6 +2,7 @@ using System.Reflection; using System.Text; using Shouldly; using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; @@ -187,6 +188,12 @@ public sealed partial class JetStreamFileStoreTests [Fact] // T:560 public void FileStoreTombstonesNoFirstSeqRollback_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreTombstonesNoFirstSeqRollback_ShouldSucceed)); + [Fact] // T:559 + public void FileStoreEraseMsgDoesNotLoseTombstonesInEmptyBlock_ShouldSucceed() => RunEraseMsgDoesNotLoseTombstonesInEmptyBlockScenario(); + + [Fact] // T:562 + public void FileStoreTombstonesSelectNextFirstCleanupOnRecovery_ShouldSucceed() => RunTombstonesSelectNextFirstCleanupOnRecoveryScenario(); + [Fact] // T:563 public void FileStoreDetectDeleteGapWithLastSkipMsg_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreDetectDeleteGapWithLastSkipMsg_ShouldSucceed)); @@ -196,6 +203,54 @@ public sealed partial class JetStreamFileStoreTests [Fact] // T:565 public void FileStoreEraseMsgErr_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreEraseMsgErr_ShouldSucceed)); + [Fact] // T:570 + public void FileStoreMissingDeletesAfterCompact_ShouldSucceed() => RunMissingDeletesAfterCompactScenario(); + + [Fact] // T:571 + public void FileStoreIdxAccountingForSkipMsgs_ShouldSucceed() => RunIdxAccountingForSkipMsgsScenario(); + + [Fact] // T:573 + public void FileStoreCompactTombstonesBelowFirstSeq_ShouldSucceed() => RunCompactTombstonesBelowFirstSeqScenario(); + + [Fact] // T:574 + public void FileStoreSyncBlocksFlushesAndSyncsMessages_ShouldSucceed() => RunSyncBlocksFlushesAndSyncsMessagesScenario(); + + [Fact] // T:577 + public void FileStoreSkipMsgAndCompactRequiresAppend_ShouldSucceed() => RunSkipMsgAndCompactRequiresAppendScenario(); + + [Fact] // T:578 + public void FileStoreCompactRewritesFileWithSwap_ShouldSucceed() => RunCompactRewritesFileWithSwapScenario(); + + [Fact] // T:579 + public void FileStoreIndexCacheBufIdxMismatch_ShouldSucceed() => RunIndexCacheBufIdxMismatchScenario(); + + [Fact] // T:580 + public void FileStoreIndexCacheBufTombstoneMismatch_ShouldSucceed() => RunIndexCacheBufTombstoneMismatchScenario(); + + [Fact] // T:581 + public void FileStoreIndexCacheBufTombstoneMismatchAfterCompact_ShouldSucceed() => RunIndexCacheBufTombstoneMismatchAfterCompactScenario(); + + [Fact] // T:582 + public void FileStoreIndexCacheBufEraseMsgMismatch_ShouldSucceed() => RunIndexCacheBufEraseMsgMismatchScenario(); + + [Fact] // T:583 + public void FileStoreCompactRestoresLastSeq_ShouldSucceed() => RunCompactRestoresLastSeqScenario(); + + [Fact] // T:584 + public void FileStoreCompactFullyResetsFirstAndLastSeq_ShouldSucceed() => RunCompactFullyResetsFirstAndLastSeqScenario(); + + [Fact] // T:591 + public void FileStoreDeleteBlocksWithManyEmptyBlocks_ShouldSucceed() => RunDeleteBlocksWithManyEmptyBlocksScenario(); + + [Fact] // T:595 + public void FileStoreRemoveMsgsInRangePartialBlocks_ShouldSucceed() => RunRemoveMsgsInRangePartialBlocksScenario(); + + [Fact] // T:596 + public void FileStoreRemoveMsgsInRangeWithTombstones_ShouldSucceed() => RunRemoveMsgsInRangeWithTombstonesScenario(); + + [Fact] // T:597 + public void FileStoreCorrectChecksumAfterTruncate_ShouldSucceed() => RunCorrectChecksumAfterTruncateScenario(); + [Fact] // T:576 public void FileStorePreserveLastSeqAfterCompact_ShouldSucceed() => RunWaveBScenario(nameof(FileStorePreserveLastSeqAfterCompact_ShouldSucceed)); @@ -1434,6 +1489,698 @@ public sealed partial class JetStreamFileStoreTests } } + private static MessageBlock GetFirstMsgBlock(JetStreamFileStore fs) + { + var blks = GetPrivateField>(fs, "_blks"); + blks.Count.ShouldBeGreaterThan(0); + return blks[0]; + } + + private static void StoreRawMsgWithSequence(JetStreamFileStore fs, ulong seq, byte[] msg) + { + var mu = GetPrivateField(fs, "_mu"); + var ts = (long)seq; + mu.EnterWriteLock(); + try + { + var err = InvokePrivate(fs, "StoreRawMsgInternal", "foo", Array.Empty(), msg, seq, ts, 0L, false); + err.ShouldBeNull(); + } + finally + { + mu.ExitWriteLock(); + } + } + + private static void RemoveMsgWithFileState(JetStreamFileStore fs, ulong seq, bool secure = false, bool viaLimits = false) + { + var (removed, err) = InvokePrivate<(bool Removed, Exception? Error)>(fs, "RemoveMsgInternal", seq, secure, viaLimits, true); + removed.ShouldBeTrue(); + err.ShouldBeNull(); + } + + private static void RunEraseMsgDoesNotLoseTombstonesInEmptyBlockScenario() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + JetStreamFileStore? fs = null; + try + { + var cfg = DefaultStreamConfig(subjects: ["foo"]); + var fcfg = new FileStoreConfig + { + StoreDir = root, + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }; + + fs = JetStreamFileStore.NewFileStore(fcfg, cfg); + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(1UL); + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(2UL); + + var (_, rollErr) = InvokePrivate<(MessageBlock? Mb, Exception? Error)>(fs, "NewMsgBlockForWrite"); + rollErr.ShouldBeNull(); + + var secret = "secret!"u8.ToArray(); + fs.StoreMsg("foo", null, secret, 0).Seq.ShouldBe(3UL); + + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + fs.EraseMsg(3).Removed.ShouldBeTrue(); + + var before = fs.State(); + before.Msgs.ShouldBe(1UL); + before.FirstSeq.ShouldBe(1UL); + before.LastSeq.ShouldBe(3UL); + (before.Deleted ?? Array.Empty()).ShouldContain(2UL); + (before.Deleted ?? Array.Empty()).ShouldContain(3UL); + + Should.Throw(() => fs.LoadMsg(2, null)); + Should.Throw(() => fs.LoadMsg(3, null)); + + var msgDir = Path.Combine(root, FileStoreDefaults.MsgDir); + foreach (var blk in Directory.GetFiles(msgDir, "*.blk")) + { + var data = File.ReadAllBytes(blk); + data.AsSpan().IndexOf(secret).ShouldBe(-1); + } + + fs.Stop(); + fs = null; + + File.Delete(Path.Combine(root, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile)); + + fs = JetStreamFileStore.NewFileStore(fcfg, cfg); + Should.Throw(() => fs.LoadMsg(2, null)); + Should.Throw(() => fs.LoadMsg(3, null)); + } + finally + { + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + private static void RunTombstonesSelectNextFirstCleanupOnRecoveryScenario() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + JetStreamFileStore? fs = null; + try + { + var cfg = DefaultStreamConfig(subjects: ["foo"]); + var fcfg = new FileStoreConfig + { + StoreDir = root, + BlockSize = 330, + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }; + + fs = JetStreamFileStore.NewFileStore(fcfg, cfg); + + for (var i = 0; i < 50; i++) + fs.StoreMsg("foo", null, null, 0); + + for (ulong seq = 2; seq <= 49; seq++) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + var (_, rollErr) = InvokePrivate<(MessageBlock? Mb, Exception? Error)>(fs, "NewMsgBlockForWrite"); + rollErr.ShouldBeNull(); + + for (var i = 0; i < 50; i++) + fs.StoreMsg("foo", null, null, 0); + + for (ulong seq = 50; seq <= 100; seq++) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + var before = fs.State(); + before.Msgs.ShouldBe(1UL); + before.FirstSeq.ShouldBe(1UL); + before.LastSeq.ShouldBe(100UL); + + var tombErr = InvokePrivate(fs, "WriteTombstone", 1UL, 0L); + tombErr.ShouldBeNull(); + + fs.Stop(); + fs = null; + File.Delete(Path.Combine(root, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile)); + + fs = JetStreamFileStore.NewFileStore(fcfg, cfg); + Should.Throw(() => fs.LoadMsg(1, null)); + } + finally + { + fs?.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + private static void RunMissingDeletesAfterCompactScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 6; i++) + fs.StoreMsg("foo", null, null, 0); + + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + fs.RemoveMsg(3).Removed.ShouldBeTrue(); + fs.RemoveMsg(4).Removed.ShouldBeTrue(); + fs.RemoveMsg(6).Removed.ShouldBeTrue(); + + var (_, rollErr) = InvokePrivate<(MessageBlock? Mb, Exception? Error)>(fs, "NewMsgBlockForWrite"); + rollErr.ShouldBeNull(); + + var fmb = GetFirstMsgBlock(fs); + fmb.CompactWithFloor(0).ShouldBeNull(); + fmb.ClearCacheAndOffset(); + var (_, _, rebuildErr) = fmb.RebuildState(); + rebuildErr.ShouldBeNull(); + fmb.Msgs.ShouldBeGreaterThanOrEqualTo(0UL); + + fs.RemoveMsg(5).Removed.ShouldBeTrue(); + fmb.CompactWithFloor(0).ShouldBeNull(); + fmb.ClearCacheAndOffset(); + var rebuild2 = fmb.RebuildState(); + rebuildErr = rebuild2.Error; + rebuildErr.ShouldBeNull(); + fmb.Msgs.ShouldBeGreaterThanOrEqualTo(0UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunIdxAccountingForSkipMsgsScenario() + { + static void RunCase(bool skipMany) + { + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(1UL); + + if (skipMany) + { + fs.SkipMsgs(2, 10); + } + else + { + for (var i = 0; i < 10; i++) + fs.SkipMsg((ulong)(i + 2)).Error.ShouldBeNull(); + } + + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(12UL); + + var state = fs.State(); + state.LastSeq.ShouldBe(12UL); + state.Msgs.ShouldBe(2UL); + state.NumDeleted.ShouldBeGreaterThanOrEqualTo(10); + + fs.LoadMsg(1, null).ShouldNotBeNull(); + fs.LoadMsg(12, null).ShouldNotBeNull(); + + for (ulong seq = 2; seq <= 11; seq++) + Should.Throw(() => fs.LoadMsg(seq, null)); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + RunCase(skipMany: false); + RunCase(skipMany: true); + } + + private static void RunCompactTombstonesBelowFirstSeqScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(1UL); + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(2UL); + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(3UL); + + var (_, rollErr) = InvokePrivate<(MessageBlock? Mb, Exception? Error)>(fs, "NewMsgBlockForWrite"); + rollErr.ShouldBeNull(); + + fs.StoreMsg("foo", null, null, 0).Seq.ShouldBe(4UL); + + fs.RemoveMsg(3).Removed.ShouldBeTrue(); + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + + var state = fs.State(); + state.Msgs.ShouldBe(2UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(4UL); + (state.Deleted ?? Array.Empty()).ShouldContain(2UL); + (state.Deleted ?? Array.Empty()).ShouldContain(3UL); + + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + fs.Compact(4).Error.ShouldBeNull(); + + var after = fs.State(); + after.Msgs.ShouldBe(1UL); + after.FirstSeq.ShouldBe(4UL); + after.LastSeq.ShouldBe(4UL); + (after.Deleted ?? Array.Empty()).ShouldBeEmpty(); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + BlockSize = 330, + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunSyncBlocksFlushesAndSyncsMessagesScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, "x"u8.ToArray(), 0).Seq.ShouldBe(1UL); + + var blks = GetPrivateField>(fs, "_blks"); + if (blks.Count == 0) + { + var (_, newErr) = InvokePrivate<(MessageBlock? Mb, Exception? Error)>(fs, "NewMsgBlockForWrite"); + newErr.ShouldBeNull(); + blks = GetPrivateField>(fs, "_blks"); + } + + var lmb = blks.Count > 0 ? blks[^1] : null; + lmb.ShouldNotBeNull(); + + lmb!.Mu.EnterWriteLock(); + try + { + lmb.Lwts = 0; + } + finally + { + lmb.Mu.ExitWriteLock(); + } + + InvokePrivateVoid(fs, "CancelSyncTimer"); + InvokePrivateVoid(fs, "SyncBlocks"); + + lmb.ClearCacheAndOffset(); + var sm = fs.LoadMsg(1, null); + sm.ShouldNotBeNull(); + sm!.Seq.ShouldBe(1UL); + sm.Subject.ShouldBe("foo"); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + AsyncFlush = true, + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunSkipMsgAndCompactRequiresAppendScenario() + { + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, new byte[256 * 1024], 0).Seq.ShouldBe(1UL); + + fs.SkipMsg(2).Error.ShouldBeNull(); + fs.Compact(2).Error.ShouldBeNull(); + + var mid = fs.State(); + mid.Msgs.ShouldBe(0UL); + mid.FirstSeq.ShouldBeGreaterThanOrEqualTo(1UL); + mid.LastSeq.ShouldBe(2UL); + + fs.SkipMsg(3).Error.ShouldBeNull(); + + var mb = GetFirstMsgBlock(fs); + mb.ClearCacheAndOffset(); + var (_, _, rebuildErr) = mb.RebuildState(); + rebuildErr.ShouldBeNull(); + + var after = fs.State(); + after.Msgs.ShouldBe(0UL); + after.FirstSeq.ShouldBeGreaterThanOrEqualTo(1UL); + after.LastSeq.ShouldBe(3UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunCompactRewritesFileWithSwapScenario() + { + WithStore((fs, _) => + { + var msg = new byte[256 * 1024]; + for (var i = 0; i < 20; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), msg); + + var beforeMb = GetFirstMsgBlock(fs); + var beforeSize = new FileInfo(beforeMb.Mfn).Length; + + fs.Compact(20).Error.ShouldBeNull(); + + var state = fs.State(); + state.Msgs.ShouldBe(1UL); + state.FirstSeq.ShouldBe(20UL); + state.LastSeq.ShouldBe(20UL); + + var afterMb = GetFirstMsgBlock(fs); + File.Exists(afterMb.Mfn).ShouldBeTrue(); + var afterSize = new FileInfo(afterMb.Mfn).Length; + afterSize.ShouldBeLessThanOrEqualTo(beforeSize); + + var (_, _, rebuildErr) = afterMb.RebuildState(); + rebuildErr.ShouldBeNull(); + afterMb.Msgs.ShouldBeGreaterThan(0UL); + afterMb.First.Seq.ShouldBeLessThanOrEqualTo(20UL); + afterMb.Last.Seq.ShouldBeGreaterThanOrEqualTo(20UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunIndexCacheBufIdxMismatchScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 5; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), Array.Empty()); + + var mb = GetFirstMsgBlock(fs); + mb.Mfn.ShouldNotBeNullOrWhiteSpace(); + File.Exists(mb.Mfn).ShouldBeTrue(); + + File.WriteAllBytes(mb.Mfn, Array.Empty()); + mb.ClearCacheAndOffset(); + + var (lost, _, err) = mb.RebuildState(); + err.ShouldBeNull(); + lost.ShouldNotBeNull(); + + mb.Msgs.ShouldBe(0UL); + mb.First.Seq.ShouldBe(6UL); + mb.Last.Seq.ShouldBe(5UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunIndexCacheBufTombstoneMismatchScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 3; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), Array.Empty()); + + RemoveMsgWithFileState(fs, 2); + InvokePrivate(fs, "WriteTombstone", 2UL, 0L).ShouldBeNull(); + + var mb = GetFirstMsgBlock(fs); + mb.ClearCacheAndOffset(); + mb.Dmap.Empty(); + + var (_, tombstones, err) = mb.RebuildState(); + err.ShouldBeNull(); + tombstones.ShouldContain(2UL); + mb.Dmap.Exists(2UL).ShouldBeTrue(); + mb.Msgs.ShouldBeGreaterThan(0UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunIndexCacheBufTombstoneMismatchAfterCompactScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 3; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), Array.Empty()); + + RemoveMsgWithFileState(fs, 2); + fs.Compact(2).Error.ShouldBeNull(); + + var mb = GetFirstMsgBlock(fs); + mb.ClearCacheAndOffset(); + mb.Dmap.Empty(); + + var (_, tombstones, err) = mb.RebuildState(); + err.ShouldBeNull(); + mb.Msgs.ShouldBeGreaterThan(0UL); + if (tombstones.Length > 0) + { + tombstones.ShouldContain(2UL); + mb.Dmap.Exists(2UL).ShouldBeTrue(); + } + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunIndexCacheBufEraseMsgMismatchScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 3; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), Array.Empty()); + + RemoveMsgWithFileState(fs, 2, secure: true); + InvokePrivate(fs, "WriteTombstone", 2UL, 0L).ShouldBeNull(); + + var before = GetPrivateField(fs, "_state"); + var expected = new StreamState + { + Msgs = before.Msgs, + Bytes = before.Bytes, + FirstSeq = before.FirstSeq, + LastSeq = before.LastSeq, + }; + + var stale = new StreamState + { + Msgs = 3, + Bytes = before.Bytes, + FirstSeq = 1, + LastSeq = 3, + }; + SetPrivateField(fs, "_state", stale); + + var mb = GetFirstMsgBlock(fs); + mb.ClearCacheAndOffset(); + mb.Dmap.Empty(); + var (_, _, rebuildErr) = mb.RebuildState(); + rebuildErr.ShouldBeNull(); + + InvokePrivateVoid(fs, "RebuildState", (LostStreamData?)null); + var publicState = fs.State(); + publicState.Msgs.ShouldBe(expected.Msgs); + publicState.FirstSeq.ShouldBe(expected.FirstSeq); + publicState.LastSeq.ShouldBe(expected.LastSeq); + Should.Throw(() => fs.LoadMsg(2, null)); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunCompactRestoresLastSeqScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 4; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), Array.Empty()); + + RemoveMsgWithFileState(fs, 1); + RemoveMsgWithFileState(fs, 4); + + var before = fs.State(); + var mb = GetFirstMsgBlock(fs); + mb.CompactWithFloor(0).ShouldBeNull(); + mb.ClearCacheAndOffset(); + var (_, _, rebuildErr) = mb.RebuildState(); + rebuildErr.ShouldBeNull(); + + var after = fs.State(); + after.LastSeq.ShouldBe(before.LastSeq); + after.Msgs.ShouldBe(before.Msgs); + after.FirstSeq.ShouldBeGreaterThanOrEqualTo(2UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunCompactFullyResetsFirstAndLastSeqScenario() + { + WithStore((fs, _) => + { + StoreRawMsgWithSequence(fs, 1, Array.Empty()); + StoreRawMsgWithSequence(fs, 2, Array.Empty()); + + RemoveMsgWithFileState(fs, 1); + RemoveMsgWithFileState(fs, 2); + + var mb = GetFirstMsgBlock(fs); + mb.CompactWithFloor(0).ShouldBeNull(); + mb.ClearCacheAndOffset(); + var (_, _, rebuildErr) = mb.RebuildState(); + rebuildErr.ShouldBeNull(); + + mb.Msgs.ShouldBe(0UL); + mb.First.Seq.ShouldBe(mb.Last.Seq + 1); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunDeleteBlocksWithManyEmptyBlocksScenario() + { + WithStore((fs, _) => + { + ConfigureSyntheticBlocks(fs, [(1UL, 6UL), (15UL, 15UL)], bytesPerMsg: 1); + + var blks = GetPrivateField>(fs, "_blks"); + for (ulong seq = 2; seq <= 6; seq++) + blks[0].Dmap.Insert(seq); + blks[0].Msgs = 1; + blks[1].Msgs = 1; + + var before = SnapshotDeleteBlocks(fs); + before.Count.ShouldBeGreaterThan(0); + before.Any(db => db is DeleteSlice).ShouldBeTrue(); + before.Any(db => + { + var (first, _, num) = db.GetState(); + return db is DeleteRange && first == 7UL && num == 8UL; + }).ShouldBeTrue(); + + blks[0].Dmap.Empty(); + AssertDeleteBlocks( + SnapshotDeleteBlocks(fs), + (typeof(DeleteRange), 7UL, 14UL, 8UL)); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunRemoveMsgsInRangePartialBlocksScenario() + { + WithStore((fs, _) => + { + var msg = new byte[16]; + for (var i = 0; i < 20; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), msg); + + for (ulong seq = 5; seq <= 8; seq++) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + for (ulong seq = 4; seq <= 11; seq++) + fs.RemoveMsg(seq); + for (ulong seq = 1; seq <= 30; seq++) + fs.RemoveMsg(seq); + + var state = fs.State(); + state.Msgs.ShouldBe(0UL); + state.FirstSeq.ShouldBe(state.LastSeq + 1); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + BlockSize = 256, + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunRemoveMsgsInRangeWithTombstonesScenario() + { + WithStore((fs, _) => + { + for (var i = 0; i < 20; i++) + StoreRawMsgWithSequence(fs, (ulong)(i + 1), Array.Empty()); + + RemoveMsgWithFileState(fs, 2); + RemoveMsgWithFileState(fs, 3); + RemoveMsgWithFileState(fs, 4); + RemoveMsgWithFileState(fs, 10); + RemoveMsgWithFileState(fs, 14); + RemoveMsgWithFileState(fs, 15); + + for (ulong seq = 4; seq <= 17; seq++) + fs.RemoveMsg(seq); + for (ulong seq = 1; seq <= 100; seq++) + fs.RemoveMsg(seq); + + var dmap = InvokePrivate(fs, "DeleteMap"); + dmap.Exists(2).ShouldBeTrue(); + dmap.Exists(10).ShouldBeTrue(); + dmap.Size.ShouldBeGreaterThan(0); + fs.State().Msgs.ShouldBe(0UL); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + BlockSize = 256, + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + + private static void RunCorrectChecksumAfterTruncateScenario() + { + WithStore((fs, _) => + { + var msg = Array.Empty(); + byte[]? expected = null; + long lenAfterThird = 0; + + for (var i = 0; i < 5; i++) + { + var seq = (ulong)(i + 1); + StoreRawMsgWithSequence(fs, seq, msg); + + if (seq == 3) + { + var mb3 = GetFirstMsgBlock(fs); + expected = [.. mb3.Lchk]; + lenAfterThird = new FileInfo(mb3.Mfn).Length; + } + } + + expected.ShouldNotBeNull(); + + var mb = GetFirstMsgBlock(fs); + File.Open(mb.Mfn, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite).Dispose(); + using (var f = new FileStream(mb.Mfn, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite)) + f.SetLength(lenAfterThird); + + mb.ClearCacheAndOffset(); + var (_, _, rebuildErr) = mb.RebuildState(); + rebuildErr.ShouldBeNull(); + + var lastChecksum = mb.LastChecksum(); + lastChecksum.SequenceEqual(expected!).ShouldBeTrue(); + mb.Lchk.SequenceEqual(expected).ShouldBeTrue(); + }, cfg: DefaultStreamConfig(subjects: ["foo"]), fcfg: new FileStoreConfig + { + SyncAlways = true, + Cipher = StoreCipher.NoCipher, + Compression = StoreCompression.NoCompression, + }); + } + private static void RunCompactScenario(bool doubleCompact, bool preserveLast) { WithStore((fs, _) => diff --git a/porting.db b/porting.db index dfd9c5e3ba330d5574e5f212b9124244679ecb4b..451d763437338ac6e1b6616f4f10cfe844f00ab7 100644 GIT binary patch delta 6235 zcmcJTdr%xj8o+mEmVGq4JM4lyCM;RVgGYdU@mSylmd64ykpxj=5+koAA&G!Mv`%9d za#ROnau;)#dt$issp60O zXN&r%`qkId{pVj8Ms&8j+}& z(vU<&lm;X!q|`5w(?@ZKgbqquB(hU#m&it`K_V-qT8T0#c_gZ)!~gRqHn`ajIiJSF z#5Q@Gy&WCiCS|q8q;N^Jm{OWVRg~f-s-$F)s6x+~S&2L}HzH9vr6GyRC=Ezds_~f= z{SubYB1#Ps6;i5|$Vtf~k%Q9Al0#fh zvPCP_DR`HmT|c1vURN5v3w?}IwNGfyYa-Qks$*e4g;gnm`zPmQyP5YGL9q_%PO(|B zJ(@8Z%0dWOJjXkOIxd@ah<=e7*tk5wz{?oGY%K|-ko0fXvsllcsu5(ad~VOm(pn$=7;IEBVlx;!2}a-TBkf`C}lFCZI*T7o|d6?I+E;l`0`w~ z&|0+4Vo@wvz~N-C)0dZ%8(N7+XOSqHk=VZ9?w804t;EJzBnoFF0(T-Sq$@NNHq0X7 zoRL_sSv4=PI&C4|Mb=M>$afKkB;tk3n;$a5=`?SW&y_1Z$3nA-w9KMnmsB8aGi!u{ zZ_}S`U{lMrg?OQ?o25p!X&?A=vNy};3hgGJ32SF@uueNHg{N+>GzRUgu53qWe>=^y z=w#05{FFs?Y|uDWB9|kii`NF;Ngbc;5m-H=LsyIm7MZ7Y>{+&uZuM4Zn)H$Xm=;g_ zfKa=70Sxc8^Ynr23hCOlLgOqZRg%f2wN3H|a;`m;>&6NVv*=VxI&s-EHzw38xPH<^ z{?=9@nfQOyYaDr@yc^=cOYCYB_X`^@s!y`tF~HYHs99Zts2pUwHA;m-p@h9}vboH1 zarjMk4Few;j5?NmQcQn~{V-CqN4JeOQ9U5)2%Ewn-8OG$W@Bf5ZZ0hU8(Sytf1mvl zsc4Hd=-jsR8cTztp`;N;Lq$VPLnH3Irv2p&C|P1K!;Q#r1Bknk1B%8_SXfq8ezsG* zy&GNUpyL>4hV~bc4u0H=(u4B!SB|#A)`ye{KmcV1Wkk*MNX3BZ=jkhR_o0-aJhbdX zYhm{ae>yr`58W>yHJs^3Dp2>Md6NwdlL|eKK95ns6zHL6PkanKvk&Q^=0_9>C-sG9}p}IzUD74qJB7s1DSYu_9YY_ zoIdpu`UrNX#+YIF021KbetEy+`_UWz5mVvv0hAVW;m|>JLmS9w4VWT1sXmwRiLSk ztNWa^V|O-0U=$5^9YwY9*ehWPa08=o*nJdb1o^?}QS=4edIgzb;yB{r<74t-M;=GR zfs)a*JNUwa6R0sTu{+0UX+An3yJ15o(Gn(*-gF8j2hTgdFO%vyv%%O)_0FF{9w^xv zIkhIzUqf0b9GvQl4@`V>UxrD+l=m=TI}jhet#=U72&bW=p%;4x4c$*OD-|G4;6#szcp_)>1Q;(hSu z@zp-Ge&#^RR#1`-q^yUMbRcERDM<%Xwv3W=AZ1G_Ne5E4gpzb1W!;pdGd5dDNjhV* zck98K&!;NU_X-!(<##wuNca<{Cb=VY`GHs0n_q53wTX8w~bb5KSHU?FWM-ZV$V{ayJDJFb9b2U*J- zDaQ3oEjOqg(R6XI`b(zmSJkM>{N>a%`-`jHq21yydZ4geEQi=T+IGzf&0F+q*D0i3?!u|&4aoN zQAu#}iVy||&ItlEzas=TTWyvz-)`T^^;W>LC*zV}_#L4ZdKN^LLYr}c8rF;mc~g6r zza6|hBJ@GhiYWTMW|=7l#x4qa`1N@qKe)}?=Y?0`mKbgZn>uEpzfA(1y&&WSw_z>{ z+hOY`PI_W%rlXRJf+sj_c~{sHu$UZS*SkVEJac|ZIc!oHGDd}7sELmap4(5OLg3jn zHLZkRH(dUfPXG8ND$#dIr~uPlrg_k9jj}-7v&m{`za+%MIbCA9j(#Qz+zV50G1y0i zB=On53TN1W#*Q)Jo?yXG$Eg1w-E{M}T#ZYHcH;sS7_JC0aQTYNa)=0GRI_zR% z;^4%UL>_LOOUw%3{H)>6Dq4iQ?RdD|7rNr&@CFsP0QuysyqO7Xd z#}aqIbL(Qu{XJI%U4%&@CP|pg!(=`t3ou!TNirrWn51HohRGsK(lN=vBoh-WCN@m$ zm^d(TVv>bPHYPck3iyRo@zz$SF$p2o8BMghnubwpP4BX}Q)lPDv%h!l zJ@{aZd7r+qYz*B|3^ps|ntJ^zW8 z`Yr!2@%^!eULGx5^Xt-*;`PP4ctbDuG(ck*-3w40qy7N77+nd_XhvrOG>TDMfJQQE z3{Whiy#b1OpW)5`IvKqeplC*=0g7T&7$7^Nya3r4Wd|sNQEGrxM(zNGGqMFpGAO;A zIe>zec$5F@n+D0sH4A4r$jGtS;8CVvhNKMP90=d>DR6+dM1V6>P_?2=;hG4)-NLwM zi>1=0h<2f<*i*cAZE=xq>I;+(`#!J~Xp6IjL$A6-rIEv_=2E2*cjJ>XhnsLcu96<& z&3HZbNH=l5bWQ4(PS!+Q(x*vHQoXWA*)CN{o2B)VN6J?=D65qMC0CiDOq6CxQxun! zpr}%`Vlq85c}+JYtE4jw7;c;XZn$nbXZlmkYGI>ne*V!M(R`yGyjKf!Q0BohKNAwx z;4C<@2Ad(i+G2)R*cYD)-OKR;NLhhx5VQ)r;pJ6W5p>U>VHH-P!h^49r6rgM*&N4Z zKQB7yHAblcn#;%?pgD|e0h-Oo9H3b+uo}m~Sf7V=Yj7%j?)#PIGm-*nC0G@LlA(S% zP65*&EM_rI5w*ts!Xm5qSa55G7UAE#+<{ur$EX%nXu4LROwySmxhW8NN{E1(lWftY zlR^$}_8t10;*+!m@Q~DJhWiH=#hQ z&%yW6*2=F1CXq?U6vU)wGB8P6<=67hpFvV*upRnCjAqcPQ4+LFuq)a?HTp_~!fwG1 zrTdTxp46gv@K%XN7!PO@IJ<&|0$%}&frd*aqbBb`K|B-=1VsV`wk5R=jrMO@S%*Ae z&DOCkp+*JDK12p+-;4BM*vmHeyA_k6B0HGlCo8b2#uf(K>W~7tPf!T7)S*y8?S#&E z^-&Q0DSP~%I+psDRU8d>_M%l#*y%Gel+|wpnmo(Z2AK3=2L+gI@CH#2WYKM*kNQJI?w!>Ew+^5y5;%w-)xx!}CU zP9AJ?$C2lG=2c8!c`}p1LQp6SZ$f$S-S@1u{RA1Ix(SW-8|-O9x4=5bU&KX#!IBbvCAFYz_WB0<>3idE9UppqQd&g< zvoawWg3Fnev&T_3rNQU&*p~B=16=SU&pS6jH!~?d_t>vwV$;f14Qnd0` zbJ<3|1cDNArOWV>;Y;bCvLgK~XQ5aWiX{1od{^$1soW)>kPo3nMvpPySOv9-_@34{ z9$WP~!!0ff+EcJw6gjhYCxwjx-V8sb;yB(7QIqg=aV3`p#gp(n*gYK=f^`OVL3$dF zgr|>r6D&`|SNUs@KADYzzG*B}HW^!B(KBAv4o}7lPlT95xo3h&kn&|tdMMqLuAyE{ z^5XS8&@#&?=y`yr*u+|~!ki?&BbJzB#l_-$ahmv&I7$rF@8LL2{}9`G@qlQDF1sZT z-nCoYu*7b$;a$G5`uqy(sbAvmTj^b z>NA&2ghzbv0En~kQ)u{t&AAI-2QRy_#0R+hJ8zBSAjjfBo~D!cK%sBm8023_IdG2r#E^z^)% z?GHG*)s3+0=*a2*d{y155w@LWcKRb#6O6l{Zicl{ezVc=-wW!L!EIt>c_v(Z;EaLh z9(GPek2({aSHq{nnjNDJP}-x8fTYIIWYJol0pl;IPHo3u)OG=!+e~(7aEFhE%3gJr z-@uJt=Ir3Un4x}{FRNj2@iH^Oe4{q1z7^*0!(df!++vyOEq~M~;_@_Ddfwp#_ic3% ze6bMHQ1v-z&tCxEb%`4~Kj zP7V)(JNMLBxOjia4DqVO8$rA_;qcrV~1q&