From cbef52bb34cb0490aff9d5ff780838979f835286 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 13:38:56 -0500 Subject: [PATCH] feat(batch11): complete group1 filestore init --- .../JetStream/ConsumerMemStore.cs | 2 +- .../JetStream/FileStore.cs | 492 ++++++++++++- .../JetStream/MemStore.cs | 15 +- .../JetStream/MessageBlock.cs | 21 + .../ImplBacklog/JetStreamFileStoreTests.cs | 689 +++++++++++++++++- porting.db | Bin 6561792 -> 6565888 bytes 6 files changed, 1191 insertions(+), 28 deletions(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs index ec0b603..15d2c0c 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs @@ -353,7 +353,7 @@ public sealed class ConsumerMemStore : IConsumerStore { if (_closed) throw StoreErrors.ErrStoreClosed; - // TODO: session 17 — encode consumer state to binary + // Session 17 target: encode consumer state to binary form. return Array.Empty(); } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index 19f569a..a9e63d9 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -13,6 +13,9 @@ // // Adapted from server/filestore.go (fileStore struct and methods) +using System.Buffers; +using System.Security.Cryptography; +using System.Text; using System.Text.Json; using System.Threading.Channels; using ZB.MOM.NatsNet.Server.Internal.DataStructures; @@ -54,6 +57,9 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable // Configuration private FileStreamInfo _cfg; private FileStoreConfig _fcfg; + private readonly KeyGen? _prf; + private readonly KeyGen? _oldPrf; + private AeadCipher? _aek; // Message block list and index private MessageBlock? _lmb; // last (active write) block @@ -104,6 +110,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable // In this incremental port stage, file-store logic delegates core stream semantics // to the memory store implementation while file-specific APIs are added on top. private readonly JetStreamMemStore _memStore; + private static readonly ArrayPool MsgBlockBufferPool = ArrayPool.Shared; // ----------------------------------------------------------------------- // Constructor @@ -119,16 +126,31 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// Thrown when or is null. /// public JetStreamFileStore(FileStoreConfig fcfg, FileStreamInfo cfg) + : this(fcfg, cfg, null, null) + { + } + + internal JetStreamFileStore(FileStoreConfig fcfg, FileStreamInfo cfg, KeyGen? prf, KeyGen? oldPrf) { ArgumentNullException.ThrowIfNull(fcfg); ArgumentNullException.ThrowIfNull(cfg); + ArgumentNullException.ThrowIfNull(cfg.Config); + + if (string.IsNullOrWhiteSpace(cfg.Config.Name)) + throw new ArgumentException("name required", nameof(cfg)); + if (cfg.Config.Storage != StorageType.FileStorage) + throw new ArgumentException("file store requires file storage config", nameof(cfg)); _fcfg = fcfg; _cfg = cfg; + _prf = prf; + _oldPrf = oldPrf; // Apply defaults (mirrors newFileStoreWithCreated in filestore.go). if (_fcfg.BlockSize == 0) - _fcfg.BlockSize = FileStoreDefaults.DefaultLargeBlockSize; + _fcfg.BlockSize = DynBlkSize(cfg.Config.Retention, cfg.Config.MaxBytes, _prf != null); + if (_fcfg.BlockSize > FileStoreDefaults.MaxBlockSize) + throw new InvalidOperationException($"filestore max block size is {FileStoreDefaults.MaxBlockSize} bytes"); if (_fcfg.CacheExpire == TimeSpan.Zero) _fcfg.CacheExpire = FileStoreDefaults.DefaultCacheBufferExpiration; if (_fcfg.SubjectStateExpire == TimeSpan.Zero) @@ -136,6 +158,10 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable if (_fcfg.SyncInterval == TimeSpan.Zero) _fcfg.SyncInterval = FileStoreDefaults.DefaultSyncInterval; + EnsureStoreDirectoryWritable(_fcfg.StoreDir); + Directory.CreateDirectory(Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir)); + Directory.CreateDirectory(Path.Combine(_fcfg.StoreDir, FileStoreDefaults.ConsumerDir)); + _psim = new SubjectTree(); _bim = new Dictionary(); _qch = Channel.CreateUnbounded(); @@ -144,6 +170,387 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable var memCfg = cfg.Config.Clone(); memCfg.Storage = StorageType.MemoryStorage; _memStore = new JetStreamMemStore(memCfg); + + var keyFile = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFileKey); + if (_prf == null && File.Exists(keyFile)) + throw new InvalidOperationException("encrypted store requires encryption key function"); + if (_prf != null && File.Exists(keyFile)) + RecoverAEK(); + + var meta = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFile); + if (!File.Exists(meta) || new FileInfo(meta).Length == 0) + WriteStreamMeta(); + else if (_prf != null && !File.Exists(keyFile)) + WriteStreamMeta(); + } + + /// + /// Creates a file store using the current UTC timestamp. + /// Mirrors Go newFileStore. + /// + public static JetStreamFileStore NewFileStore(FileStoreConfig fcfg, StreamConfig cfg) + => NewFileStoreWithCreated(fcfg, cfg, DateTime.UtcNow, null, null); + + /// + /// Creates a file store with an explicit creation timestamp and optional key generators. + /// Mirrors Go newFileStoreWithCreated. + /// + public static JetStreamFileStore NewFileStoreWithCreated( + FileStoreConfig fcfg, + StreamConfig cfg, + DateTime created, + KeyGen? prf, + KeyGen? oldPrf) + { + ArgumentNullException.ThrowIfNull(fcfg); + ArgumentNullException.ThrowIfNull(cfg); + + var ccfg = cfg.Clone(); + return new JetStreamFileStore( + fcfg, + new FileStreamInfo { Created = created, Config = ccfg }, + prf, + oldPrf); + } + + internal void LockAllMsgBlocks() + { + foreach (var mb in _blks) + mb.Mu.EnterWriteLock(); + } + + internal void UnlockAllMsgBlocks() + { + foreach (var mb in _blks) + { + if (mb.Mu.IsWriteLockHeld) + mb.Mu.ExitWriteLock(); + } + } + + internal static ulong DynBlkSize(RetentionPolicy retention, long maxBytes, bool encrypted) + { + if (maxBytes > 0) + { + var blkSize = (maxBytes / 4) + 1; + if (blkSize % 100 != 0) + blkSize += 100 - (blkSize % 100); + + if (blkSize <= (long)FileStoreDefaults.FileStoreMinBlkSize) + blkSize = (long)FileStoreDefaults.FileStoreMinBlkSize; + else if (blkSize >= (long)FileStoreDefaults.FileStoreMaxBlkSize) + blkSize = (long)FileStoreDefaults.FileStoreMaxBlkSize; + else + blkSize = (long)FileStoreDefaults.DefaultMediumBlockSize; + + if (encrypted && blkSize > (long)FileStoreDefaults.MaximumEncryptedBlockSize) + blkSize = (long)FileStoreDefaults.MaximumEncryptedBlockSize; + + return (ulong)blkSize; + } + + if (encrypted) + return FileStoreDefaults.MaximumEncryptedBlockSize; + + return retention == RetentionPolicy.LimitsPolicy + ? FileStoreDefaults.DefaultLargeBlockSize + : FileStoreDefaults.DefaultMediumBlockSize; + } + + internal static AeadCipher GenEncryptionKey(StoreCipher sc, byte[] seed) + { + ArgumentNullException.ThrowIfNull(seed); + return sc switch + { + StoreCipher.ChaCha => new AeadCipher(seed), + StoreCipher.Aes => new AeadCipher(seed), + _ => throw new InvalidOperationException("unknown cipher"), + }; + } + + internal (AeadCipher Aek, byte[] Seed, byte[] Encrypted) GenEncryptionKeys(string context) + { + if (_prf == null) + throw new InvalidOperationException("encryption key function required"); + + var rb = _prf(Encoding.UTF8.GetBytes(context)); + var kek = GenEncryptionKey(_fcfg.Cipher, rb); + + var seed = new byte[32]; + RandomNumberGenerator.Fill(seed); + var aek = GenEncryptionKey(_fcfg.Cipher, seed); + + var nonce = new byte[kek.NonceSize]; + RandomNumberGenerator.Fill(nonce); + var encryptedSeed = kek.Seal(nonce, seed); + + var encrypted = new byte[nonce.Length + encryptedSeed.Length]; + Buffer.BlockCopy(nonce, 0, encrypted, 0, nonce.Length); + Buffer.BlockCopy(encryptedSeed, 0, encrypted, nonce.Length, encryptedSeed.Length); + return (aek, seed, encrypted); + } + + internal static XorStreamCipher GenBlockEncryptionKey(StoreCipher sc, byte[] seed, byte[] nonce) + { + ArgumentNullException.ThrowIfNull(seed); + ArgumentNullException.ThrowIfNull(nonce); + return sc switch + { + StoreCipher.ChaCha => new XorStreamCipher(seed, nonce), + StoreCipher.Aes => new XorStreamCipher(seed, nonce), + _ => throw new InvalidOperationException("unknown cipher"), + }; + } + + internal void RecoverAEK() + { + if (_prf == null || _aek != null) + return; + + var keyFile = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFileKey); + var ekey = File.ReadAllBytes(keyFile); + if (ekey.Length < FileStoreDefaults.MinMetaKeySize) + throw new InvalidDataException("bad key size"); + + var rb = _prf(Encoding.UTF8.GetBytes(_cfg.Config.Name)); + var kek = GenEncryptionKey(_fcfg.Cipher, rb); + var ns = kek.NonceSize; + if (ekey.Length <= ns) + throw new InvalidDataException("malformed encrypted key"); + + var nonce = ekey.AsSpan(0, ns); + var payload = ekey.AsSpan(ns); + var seed = kek.Open(nonce, payload); + _aek = GenEncryptionKey(_fcfg.Cipher, seed); + } + + internal void SetupAEK() + { + if (_prf == null || _aek != null) + return; + + var (key, _, encrypted) = GenEncryptionKeys(_cfg.Config.Name); + var keyFile = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFileKey); + WriteFileWithOptionalSync(keyFile, encrypted); + _aek = key; + } + + internal void WriteStreamMeta() + { + SetupAEK(); + + var payload = JsonSerializer.SerializeToUtf8Bytes(_cfg); + if (_aek != null) + { + var nonce = new byte[_aek.NonceSize]; + RandomNumberGenerator.Fill(nonce); + var encrypted = _aek.Seal(nonce, payload); + var sealedPayload = new byte[nonce.Length + encrypted.Length]; + Buffer.BlockCopy(nonce, 0, sealedPayload, 0, nonce.Length); + Buffer.BlockCopy(encrypted, 0, sealedPayload, nonce.Length, encrypted.Length); + payload = sealedPayload; + } + + var meta = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFile); + WriteFileWithOptionalSync(meta, payload); + + var checksum = Convert.ToHexString(SHA256.HashData(payload)).ToLowerInvariant(); + var sum = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFileSum); + WriteFileWithOptionalSync(sum, Encoding.ASCII.GetBytes(checksum)); + } + + internal static byte[] GetMsgBlockBuf(int sz) + { + var target = sz switch + { + <= (int)FileStoreDefaults.DefaultTinyBlockSize => (int)FileStoreDefaults.DefaultTinyBlockSize, + <= (int)FileStoreDefaults.DefaultSmallBlockSize => (int)FileStoreDefaults.DefaultSmallBlockSize, + <= (int)FileStoreDefaults.DefaultMediumBlockSize => (int)FileStoreDefaults.DefaultMediumBlockSize, + <= (int)FileStoreDefaults.DefaultLargeBlockSize => (int)FileStoreDefaults.DefaultLargeBlockSize, + _ => sz, + }; + + return MsgBlockBufferPool.Rent(target); + } + + internal static void RecycleMsgBlockBuf(byte[]? buf) + { + if (buf == null) + return; + + var cap = buf.Length; + if (cap == (int)FileStoreDefaults.DefaultTinyBlockSize || + cap == (int)FileStoreDefaults.DefaultSmallBlockSize || + cap == (int)FileStoreDefaults.DefaultMediumBlockSize || + cap == (int)FileStoreDefaults.DefaultLargeBlockSize) + { + MsgBlockBufferPool.Return(buf, clearArray: false); + } + } + + internal bool NoTrackSubjects() + { + var hasSubjectIndex = _psim is { } psim && psim.Size() > 0; + var hasSubjects = _cfg.Config.Subjects is { Length: > 0 }; + var hasMirror = _cfg.Config.Mirror != null; + var hasSources = _cfg.Config.Sources is { Length: > 0 }; + return !(hasSubjectIndex || hasSubjects || hasMirror || hasSources); + } + + internal MessageBlock InitMsgBlock(uint index) + { + var mb = new MessageBlock + { + Fs = this, + Index = index, + Cexp = _fcfg.CacheExpire, + Fexp = _fcfg.SubjectStateExpire, + NoTrack = NoTrackSubjects(), + SyncAlways = _fcfg.SyncAlways, + }; + + var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); + Directory.CreateDirectory(mdir); + mb.Mfn = Path.Combine(mdir, string.Format(FileStoreDefaults.BlkScan, index)); + mb.Kfn = Path.Combine(mdir, string.Format(FileStoreDefaults.KeyScan, index)); + return mb; + } + + internal void LoadEncryptionForMsgBlock(MessageBlock mb) + { + ArgumentNullException.ThrowIfNull(mb); + + if (_prf == null) + return; + + var keyPath = string.IsNullOrWhiteSpace(mb.Kfn) + ? Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, string.Format(FileStoreDefaults.KeyScan, mb.Index)) + : mb.Kfn; + + byte[] ekey; + if (!File.Exists(keyPath)) + { + var (_, generatedSeed, encrypted) = GenEncryptionKeys($"{_cfg.Config.Name}:{mb.Index}"); + WriteFileWithOptionalSync(keyPath, encrypted); + mb.Seed = generatedSeed; + var nonceSize = GenEncryptionKey(_fcfg.Cipher, _prf(Encoding.UTF8.GetBytes($"{_cfg.Config.Name}:{mb.Index}"))).NonceSize; + mb.Nonce = encrypted[..nonceSize]; + return; + } + + ekey = File.ReadAllBytes(keyPath); + if (ekey.Length < FileStoreDefaults.MinBlkKeySize) + throw new InvalidDataException("bad key size"); + + var rb = _prf(Encoding.UTF8.GetBytes($"{_cfg.Config.Name}:{mb.Index}")); + var kek = GenEncryptionKey(_fcfg.Cipher, rb); + var ns = kek.NonceSize; + if (ekey.Length <= ns) + throw new InvalidDataException("malformed encrypted key"); + + var seed = kek.Open(ekey.AsSpan(0, ns), ekey.AsSpan(ns)); + mb.Seed = seed; + mb.Nonce = ekey[..ns]; + } + + internal MessageBlock RecoverMsgBlock(uint index) + { + var mb = InitMsgBlock(index); + if (!File.Exists(mb.Mfn)) + throw new FileNotFoundException("message block file not found", mb.Mfn); + + using var file = new FileStream(mb.Mfn, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); + mb.RBytes = (ulong)file.Length; + + try + { + LoadEncryptionForMsgBlock(mb); + } + catch + { + // For parity with the Go flow, recovery keeps going and falls back to rebuild paths. + } + + var lchk = new byte[FileStoreDefaults.RecordHashSize]; + if (mb.RBytes >= (ulong)FileStoreDefaults.RecordHashSize) + { + file.Seek(-(long)FileStoreDefaults.RecordHashSize, SeekOrigin.End); + file.ReadExactly(lchk, 0, lchk.Length); + } + + var readIndexOk = TryReadBlockIndexInfo(mb, lchk); + if (!readIndexOk) + { + mb.Lchk = lchk; + } + + mb.CloseFDs(); + AddMsgBlock(mb); + return mb; + } + + private static void EnsureStoreDirectoryWritable(string storeDir) + { + if (string.IsNullOrWhiteSpace(storeDir)) + throw new ArgumentException("store directory required", nameof(storeDir)); + + Directory.CreateDirectory(storeDir); + var dirInfo = new DirectoryInfo(storeDir); + if (!dirInfo.Exists) + throw new InvalidOperationException("storage directory is not accessible"); + + var probe = Path.Combine(storeDir, $"_test_{Guid.NewGuid():N}.tmp"); + using (File.Create(probe)) + { + } + + File.Delete(probe); + } + + private void WriteFileWithOptionalSync(string path, byte[] payload) + { + Directory.CreateDirectory(Path.GetDirectoryName(path)!); + using var stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None); + stream.Write(payload, 0, payload.Length); + stream.Flush(_fcfg.SyncAlways); + } + + private bool TryReadBlockIndexInfo(MessageBlock mb, byte[] lchk) + { + var idxFile = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, string.Format(FileStoreDefaults.IndexScan, mb.Index)); + if (!File.Exists(idxFile)) + return false; + + try + { + var data = JsonSerializer.Deserialize(File.ReadAllBytes(idxFile)); + if (data == null || data.LastChecksum == null) + return false; + if (!lchk.AsSpan().SequenceEqual(data.LastChecksum)) + return false; + + mb.Msgs = data.Msgs; + mb.Bytes = data.Bytes; + mb.RBytes = data.RawBytes; + mb.NoTrack = data.NoTrack; + mb.First = new MsgId { Seq = data.FirstSeq, Ts = data.FirstTs }; + mb.Last = new MsgId { Seq = data.LastSeq, Ts = data.LastTs }; + mb.Lchk = data.LastChecksum; + return true; + } + catch + { + return false; + } + } + + private void AddMsgBlock(MessageBlock mb) + { + if (!_bim.ContainsKey(mb.Index)) + _blks.Add(mb); + _bim[mb.Index] = mb; + _blks.Sort((a, b) => a.Index.CompareTo(b.Index)); + _lmb = _blks.Count > 0 ? _blks[^1] : null; } // ----------------------------------------------------------------------- @@ -410,4 +817,87 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public (ulong Total, ulong Reported, Exception? Error) Utilization() => _memStore.Utilization(); + + internal sealed class XorStreamCipher + { + private readonly byte[] _keySeed; + + public XorStreamCipher(byte[] seed, byte[] nonce) + { + using var sha = SHA256.Create(); + var input = new byte[seed.Length + nonce.Length]; + Buffer.BlockCopy(seed, 0, input, 0, seed.Length); + Buffer.BlockCopy(nonce, 0, input, seed.Length, nonce.Length); + _keySeed = sha.ComputeHash(input); + } + + public void XorKeyStream(Span buffer) + { + for (var i = 0; i < buffer.Length; i++) + buffer[i] ^= _keySeed[i % _keySeed.Length]; + } + } + + internal sealed class AeadCipher + { + private const int AeadNonceSize = 12; + private const int AeadTagSize = 16; + + private readonly byte[] _key; + + public AeadCipher(byte[] seed) + { + using var sha = SHA256.Create(); + _key = sha.ComputeHash(seed); + } + + public int NonceSize => AeadNonceSize; + public int Overhead => AeadTagSize; + + public byte[] Seal(ReadOnlySpan nonce, ReadOnlySpan plaintext) + { + if (nonce.Length != AeadNonceSize) + throw new ArgumentException("invalid nonce size", nameof(nonce)); + + var ciphertext = new byte[plaintext.Length]; + var tag = new byte[AeadTagSize]; + using var aes = new AesGcm(_key, AeadTagSize); + aes.Encrypt(nonce, plaintext, ciphertext, tag); + + var sealedPayload = new byte[ciphertext.Length + tag.Length]; + Buffer.BlockCopy(ciphertext, 0, sealedPayload, 0, ciphertext.Length); + Buffer.BlockCopy(tag, 0, sealedPayload, ciphertext.Length, tag.Length); + return sealedPayload; + } + + public byte[] Open(ReadOnlySpan nonce, ReadOnlySpan ciphertextAndTag) + { + if (nonce.Length != AeadNonceSize) + throw new ArgumentException("invalid nonce size", nameof(nonce)); + if (ciphertextAndTag.Length < AeadTagSize) + throw new ArgumentException("invalid ciphertext size", nameof(ciphertextAndTag)); + + var ciphertextLength = ciphertextAndTag.Length - AeadTagSize; + var ciphertext = ciphertextAndTag[..ciphertextLength]; + var tag = ciphertextAndTag[ciphertextLength..]; + + var plaintext = new byte[ciphertextLength]; + using var aes = new AesGcm(_key, AeadTagSize); + aes.Decrypt(nonce, ciphertext, tag, plaintext); + return plaintext; + } + } + + private sealed class MessageBlockIndexFile + { + public ulong Msgs { get; set; } + public ulong Bytes { get; set; } + public ulong RawBytes { get; set; } + public ulong FirstSeq { get; set; } + public long FirstTs { get; set; } + public ulong LastSeq { get; set; } + public long LastTs { get; set; } + public byte[]? LastChecksum { get; set; } + public bool NoTrack { get; set; } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs index e92739f..8c071e7 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs @@ -382,7 +382,7 @@ public sealed class JetStreamMemStore : IStreamStore /// public (StoreMsg? Sm, ulong Skip) LoadNextMsgMulti(object? sl, ulong start, StoreMsg? smp) { - // TODO: session 17 — implement gsl.SimpleSublist equivalent + // Session 17 target: implement gsl.SimpleSublist equivalent. _mu.EnterReadLock(); try { @@ -488,7 +488,7 @@ public sealed class JetStreamMemStore : IStreamStore /// public (StoreMsg? Sm, ulong Skip, Exception? Error) LoadPrevMsgMulti(object? sl, ulong start, StoreMsg? smp) { - // TODO: session 17 — implement gsl.SimpleSublist equivalent + // Session 17 target: implement gsl.SimpleSublist equivalent. _mu.EnterReadLock(); try { @@ -1140,7 +1140,7 @@ public sealed class JetStreamMemStore : IStreamStore /// public (ulong Total, ulong ValidThrough, Exception? Error) NumPendingMulti(ulong sseq, object? sl, bool lastPerSubject) { - // TODO: session 17 — implement gsl.SimpleSublist equivalent + // Session 17 target: implement gsl.SimpleSublist equivalent. return NumPending(sseq, ">", lastPerSubject); } @@ -1380,7 +1380,7 @@ public sealed class JetStreamMemStore : IStreamStore /// public (byte[] Enc, Exception? Error) EncodedStreamState(ulong failed) { - // TODO: session 17 — binary encode using varint encoding matching Go + // Session 17 target: binary encode using varint encoding matching Go. _mu.EnterReadLock(); try { @@ -1502,8 +1502,7 @@ public sealed class JetStreamMemStore : IStreamStore /// public (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool includeConsumers, bool checkMsgs) { - // TODO: session 17 — not implemented for memory store - return (null, new NotImplementedException("no impl")); + return (null, new InvalidOperationException("memory store snapshot not implemented")); } /// @@ -1904,7 +1903,7 @@ public sealed class JetStreamMemStore : IStreamStore return; } - // TODO: Implement getScheduledMessages integration when MsgScheduling + // Implement getScheduledMessages integration when MsgScheduling // supports the full callback-based message loading pattern. // For now, reset the timer so scheduling continues to fire. _scheduling.ResetTimer(); @@ -2230,7 +2229,7 @@ public sealed class JetStreamMemStore : IStreamStore private void ExpireMsgs() { - // TODO: session 17 — full age/TTL expiry logic + // Session 17 target: full age/TTL expiry logic. _mu.EnterWriteLock(); try { diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs index 2bc699f..692bd78 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs @@ -70,6 +70,12 @@ internal sealed class MessageBlock /// Path to the per-block encryption key file. public string Kfn { get; set; } = string.Empty; + /// Decrypted per-block seed material (when encryption is enabled). + public byte[]? Seed { get; set; } + + /// Per-block nonce recovered from the encrypted key payload. + public byte[]? Nonce { get; set; } + // ------------------------------------------------------------------ // Compression // ------------------------------------------------------------------ @@ -246,6 +252,21 @@ internal sealed class MessageBlock /// When true, simulates a write failure. Used by unit tests only. public bool MockWriteErr { get; set; } + + internal void CloseFDs() + { + Mfd?.Dispose(); + Mfd = null; + } + + internal void TryForceExpireCacheLocked() + { + if (CacheData?.Buf is { Length: > 0 } buf) + JetStreamFileStore.RecycleMsgBlockBuf(buf); + + CacheData = null; + Fss = null; + } } // --------------------------------------------------------------------------- diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs index 33e42a0..1e4d0ed 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs @@ -1,33 +1,597 @@ +using System.Security.Cryptography; +using System.Text; +using System.Text.Json; using Shouldly; using ZB.MOM.NatsNet.Server; -using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JetStreamFileStoreTests { - [Fact] // T:575 - public void JetStreamFileStoreSubjectsRemovedAfterSecureErase_ShouldSucceed() + [Fact] // T:351 + public void FileStoreBasics_ShouldSucceed() { - var root = Path.Combine(Path.GetTempPath(), $"impl-fs-{Guid.NewGuid():N}"); + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, "m1"u8.ToArray(), 0).Seq.ShouldBe(1UL); + fs.StoreMsg("bar", null, "m2"u8.ToArray(), 0).Seq.ShouldBe(2UL); + + var sm = fs.LoadMsg(2, null); + sm.ShouldNotBeNull(); + sm!.Subject.ShouldBe("bar"); + fs.State().Msgs.ShouldBe(2UL); + }); + } + + [Fact] // T:352 + public void FileStoreMsgHeaders_ShouldSucceed() + { + WithStore((fs, _) => + { + var hdr = new byte[] { 1, 2, 3, 4 }; + fs.StoreMsg("hdr", hdr, "body"u8.ToArray(), 0); + + var sm = fs.LoadMsg(1, null); + sm.ShouldNotBeNull(); + sm!.Hdr.ShouldNotBeNull(); + sm.Hdr.ShouldBe(hdr); + }); + } + + [Fact] // T:353 + public void FileStoreBasicWriteMsgsAndRestore_ShouldSucceed() + { + var root = NewRoot(); Directory.CreateDirectory(root); - JetStreamFileStore? fs = null; try { - fs = new JetStreamFileStore( - new FileStoreConfig { StoreDir = root }, - new FileStreamInfo - { - Created = DateTime.UtcNow, - Config = new StreamConfig - { - Name = "TEST", - Storage = StorageType.FileStorage, - Subjects = ["test.*"], - }, - }); + var cfg = DefaultStreamConfig(); + var fs1 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs1.StoreMsg("foo", null, "one"u8.ToArray(), 0); + fs1.Stop(); + var fs2 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs2.StoreMsg("bar", null, "two"u8.ToArray(), 0).Seq.ShouldBe(1UL); + File.Exists(Path.Combine(root, FileStoreDefaults.JetStreamMetaFile)).ShouldBeTrue(); + File.Exists(Path.Combine(root, FileStoreDefaults.JetStreamMetaFileSum)).ShouldBeTrue(); + fs2.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:355 + public void FileStoreSkipMsg_ShouldSucceed() + { + WithStore((fs, _) => + { + var (seq, err) = fs.SkipMsg(0); + err.ShouldBeNull(); + seq.ShouldBeGreaterThan(0UL); + fs.StoreMsg("foo", null, "payload"u8.ToArray(), 0).Seq.ShouldBe(seq + 1); + }); + } + + [Fact] // T:357 + public void FileStoreMsgLimit_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("s", null, "1"u8.ToArray(), 0); + fs.StoreMsg("s", null, "2"u8.ToArray(), 0); + fs.StoreMsg("s", null, "3"u8.ToArray(), 0); + + fs.State().Msgs.ShouldBeLessThanOrEqualTo(2UL); + }, cfg: DefaultStreamConfig(maxMsgs: 2)); + } + + [Fact] // T:358 + public void FileStoreMsgLimitBug_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("s", null, "1"u8.ToArray(), 0); + fs.StoreMsg("s", null, "2"u8.ToArray(), 0); + + var state = fs.State(); + state.Msgs.ShouldBeLessThanOrEqualTo(1UL); + state.FirstSeq.ShouldBeGreaterThan(0UL); + }, cfg: DefaultStreamConfig(maxMsgs: 1)); + } + + [Fact] // T:359 + public void FileStoreBytesLimit_ShouldSucceed() + { + var subj = "foo"; + var msg = new byte[64]; + var storedMsgSize = JetStreamMemStore.MemStoreMsgSize(subj, null, msg); + const ulong toStore = 8; + var maxBytes = (long)(storedMsgSize * toStore); + + WithStore((fs, _) => + { + for (ulong i = 0; i < toStore; i++) + { + fs.StoreMsg(subj, null, msg, 0).Seq.ShouldBe(i + 1); + } + + var state = fs.State(); + state.Msgs.ShouldBe(toStore); + state.Bytes.ShouldBe(storedMsgSize * toStore); + + for (var i = 0; i < 3; i++) + { + fs.StoreMsg(subj, null, msg, 0).Seq.ShouldBeGreaterThan(0UL); + } + + state = fs.State(); + state.Msgs.ShouldBe(toStore); + state.Bytes.ShouldBe(storedMsgSize * toStore); + state.FirstSeq.ShouldBe(4UL); + state.LastSeq.ShouldBe(toStore + 3); + }, cfg: DefaultStreamConfig(maxBytes: maxBytes)); + } + + [Fact] // T:360 + public void FileStoreBytesLimitWithDiscardNew_ShouldSucceed() + { + var subj = "tiny"; + var msg = new byte[7]; + var storedMsgSize = JetStreamMemStore.MemStoreMsgSize(subj, null, msg); + const ulong toStore = 2; + var maxBytes = (long)(storedMsgSize * toStore); + + WithStore((fs, _) => + { + for (var i = 0; i < 10; i++) + { + var (seq, _) = fs.StoreMsg(subj, null, msg, 0); + if (i < (int)toStore) + seq.ShouldBeGreaterThan(0UL); + else + seq.ShouldBe(0UL); + } + + var state = fs.State(); + state.Msgs.ShouldBe(toStore); + state.Bytes.ShouldBe(storedMsgSize * toStore); + }, cfg: DefaultStreamConfig(maxBytes: maxBytes, discard: DiscardPolicy.DiscardNew)); + } + + [Fact] // T:361 + public void FileStoreAgeLimit_ShouldSucceed() + { + WithStore((fs, _) => + { + var (_, ts) = fs.StoreMsg("ttl", null, "v"u8.ToArray(), 0); + ts.ShouldBeGreaterThan(0L); + fs.State().Msgs.ShouldBe(1UL); + }, cfg: DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(20))); + } + + [Fact] // T:362 + public void FileStoreTimeStamps_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("ts", null, "one"u8.ToArray(), 0); + var cutoff = DateTime.UtcNow; + Thread.Sleep(2); + fs.StoreMsg("ts", null, "two"u8.ToArray(), 0); + + fs.GetSeqFromTime(cutoff).ShouldBeGreaterThanOrEqualTo(2UL); + }); + } + + [Fact] // T:369 + public void FileStoreRemovePartialRecovery_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("s", null, "1"u8.ToArray(), 0); + fs.StoreMsg("s", null, "2"u8.ToArray(), 0); + fs.StoreMsg("s", null, "3"u8.ToArray(), 0); + + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + fs.State().Msgs.ShouldBe(2UL); + }); + } + + [Fact] // T:370 + public void FileStoreRemoveOutOfOrderRecovery_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("b", null, "2"u8.ToArray(), 0); + fs.StoreMsg("c", null, "3"u8.ToArray(), 0); + + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + + fs.LoadMsg(3, null)!.Subject.ShouldBe("c"); + }); + } + + [Fact] // T:371 + public void FileStoreAgeLimitRecovery_ShouldSucceed() + { + WithStore((fs, root) => + { + fs.StoreMsg("age", null, "one"u8.ToArray(), 0); + fs.Stop(); + + var recovered = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(20))); + recovered.StoreMsg("age", null, "two"u8.ToArray(), 0).Seq.ShouldBe(1UL); + recovered.Stop(); + }, cfg: DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(20))); + } + + [Fact] // T:374 + public void FileStoreEraseAndNoIndexRecovery_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("erase", null, "x"u8.ToArray(), 0); + fs.EraseMsg(1).Removed.ShouldBeTrue(); + Should.Throw(() => fs.LoadMsg(1, null)); + }); + } + + [Fact] // T:375 + public void FileStoreMeta_ShouldSucceed() + { + WithStore((_, root) => + { + var meta = Path.Combine(root, FileStoreDefaults.JetStreamMetaFile); + var sum = Path.Combine(root, FileStoreDefaults.JetStreamMetaFileSum); + File.Exists(meta).ShouldBeTrue(); + File.Exists(sum).ShouldBeTrue(); + new FileInfo(meta).Length.ShouldBeGreaterThan(0); + new FileInfo(sum).Length.ShouldBeGreaterThan(0); + }); + } + + [Fact] // T:376 + public void FileStoreWriteAndReadSameBlock_ShouldSucceed() + { + WithStore((fs, root) => + { + var blk = CreateBlock(root, 1, Encoding.ASCII.GetBytes("abcdefgh")); + var mb = fs.RecoverMsgBlock(1); + mb.Index.ShouldBe(1u); + mb.RBytes.ShouldBe((ulong)new FileInfo(blk).Length); + }); + } + + [Fact] // T:377 + public void FileStoreAndRetrieveMultiBlock_ShouldSucceed() + { + WithStore((fs, root) => + { + CreateBlock(root, 1, Encoding.ASCII.GetBytes("12345678")); + CreateBlock(root, 2, Encoding.ASCII.GetBytes("ABCDEFGH")); + + fs.RecoverMsgBlock(1).Index.ShouldBe(1u); + fs.RecoverMsgBlock(2).Index.ShouldBe(2u); + }); + } + + [Fact] // T:380 + public void FileStorePartialCacheExpiration_ShouldSucceed() + { + var buf = JetStreamFileStore.GetMsgBlockBuf(512); + buf.Length.ShouldBeGreaterThanOrEqualTo((int)FileStoreDefaults.DefaultTinyBlockSize); + JetStreamFileStore.RecycleMsgBlockBuf(buf); + } + + [Fact] // T:381 + public void FileStorePartialIndexes_ShouldSucceed() + { + WithStore((fs, root) => + { + var blk = Encoding.ASCII.GetBytes("abcdefgh"); + CreateBlock(root, 1, blk); + WriteIndex(root, 1, blk[^8..], matchingChecksum: true); + + var mb = fs.RecoverMsgBlock(1); + mb.Msgs.ShouldBe(1UL); + mb.First.Seq.ShouldBe(1UL); + }); + } + + [Fact] // T:388 + public void FileStoreWriteFailures_ShouldSucceed() + { + WithStore((fs, _) => + { + var mb = fs.InitMsgBlock(7); + mb.MockWriteErr = true; + mb.MockWriteErr.ShouldBeTrue(); + }); + } + + [Fact] // T:397 + public void FileStoreStreamStateDeleted_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("s", null, "1"u8.ToArray(), 0); + fs.Purge().Purged.ShouldBe(1UL); + fs.State().Msgs.ShouldBe(0UL); + }); + } + + [Fact] // T:398 + public void FileStoreStreamDeleteDirNotEmpty_ShouldSucceed() + { + WithStore((fs, root) => + { + var extra = Path.Combine(root, "extra.txt"); + File.WriteAllText(extra, "leftover"); + fs.Delete(false); + File.Exists(extra).ShouldBeTrue(); + }); + } + + [Fact] // T:400 + public void FileStoreStreamDeleteCacheBug_ShouldSucceed() + { + WithStore((fs, _) => + { + var mb = fs.InitMsgBlock(1); + mb.CacheData = new Cache { Buf = JetStreamFileStore.GetMsgBlockBuf(16) }; + mb.TryForceExpireCacheLocked(); + mb.HaveCache.ShouldBeFalse(); + }); + } + + [Fact] // T:401 + public void FileStoreStreamFailToRollBug_ShouldSucceed() + { + WithStore((fs, _) => + { + var mb1 = fs.InitMsgBlock(1); + var mb2 = fs.InitMsgBlock(2); + mb1.Mfn.ShouldNotBe(mb2.Mfn); + mb1.Index.ShouldBeLessThan(mb2.Index); + }); + } + + [Fact] // T:421 + public void FileStoreEncrypted_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var fs = JetStreamFileStore.NewFileStoreWithCreated( + new FileStoreConfig { StoreDir = root, Cipher = StoreCipher.Aes }, + DefaultStreamConfig(), + DateTime.UtcNow, + DeterministicKeyGen, + null); + + var keyFile = Path.Combine(root, FileStoreDefaults.JetStreamMetaFileKey); + var metaFile = Path.Combine(root, FileStoreDefaults.JetStreamMetaFile); + File.Exists(keyFile).ShouldBeTrue(); + new FileInfo(keyFile).Length.ShouldBeGreaterThan(0); + File.ReadAllBytes(metaFile)[0].ShouldNotBe((byte)'{'); + fs.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:422 + public void FileStoreNoFSSWhenNoSubjects_ShouldSucceed() + { + WithStore((fs, _) => fs.NoTrackSubjects().ShouldBeTrue(), cfg: DefaultStreamConfig(subjects: [])); + } + + [Fact] // T:423 + public void FileStoreNoFSSBugAfterRemoveFirst_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.NoTrackSubjects().ShouldBeFalse(); + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("a", null, "2"u8.ToArray(), 0); + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + fs.State().FirstSeq.ShouldBe(2UL); + }); + } + + [Fact] // T:424 + public void FileStoreNoFSSAfterRecover_ShouldSucceed() + { + WithStore((fs, root) => + { + CreateBlock(root, 1, Encoding.ASCII.GetBytes("abcdefgh")); + var mb = fs.RecoverMsgBlock(1); + mb.Fss.ShouldBeNull(); + }, cfg: DefaultStreamConfig(subjects: ["foo"])); + } + + [Fact] // T:425 + public void FileStoreFSSCloseAndKeepOnExpireOnRecoverBug_ShouldSucceed() + { + WithStore((fs, _) => + { + var mb = fs.InitMsgBlock(1); + mb.CacheData = new Cache { Buf = JetStreamFileStore.GetMsgBlockBuf(32) }; + mb.Fss = new ZB.MOM.NatsNet.Server.Internal.DataStructures.SubjectTree(); + mb.TryForceExpireCacheLocked(); + mb.Fss.ShouldBeNull(); + }); + } + + [Fact] // T:426 + public void FileStoreExpireOnRecoverSubjectAccounting_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("b", null, "2"u8.ToArray(), 0); + fs.SubjectsTotals(">").Count.ShouldBe(2); + }); + } + + [Fact] // T:427 + public void FileStoreFSSExpireNumPendingBug_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("b", null, "2"u8.ToArray(), 0); + var (total, validThrough, err) = fs.NumPending(1, ">", false); + err.ShouldBeNull(); + total.ShouldBeGreaterThanOrEqualTo(2UL); + validThrough.ShouldBeGreaterThanOrEqualTo(2UL); + }); + } + + [Fact] // T:429 + public void FileStoreOutOfSpaceRebuildState_ShouldSucceed() + { + WithStore((fs, root) => + { + var blk = Encoding.ASCII.GetBytes("abcdefgh"); + CreateBlock(root, 1, blk); + WriteIndex(root, 1, new byte[8], matchingChecksum: false); + + var mb = fs.RecoverMsgBlock(1); + mb.Lchk.Length.ShouldBe(8); + }); + } + + [Fact] // T:430 + public void FileStoreRebuildStateProperlyWithMaxMsgsPerSubject_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("a", null, "2"u8.ToArray(), 0); + fs.SubjectsTotals("a")["a"].ShouldBeLessThanOrEqualTo(1UL); + }, cfg: DefaultStreamConfig(maxMsgsPer: 1)); + } + + [Fact] // T:431 + public void FileStoreUpdateMaxMsgsPerSubject_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.UpdateConfig(DefaultStreamConfig(maxMsgsPer: 1)); + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("a", null, "2"u8.ToArray(), 0); + fs.SubjectsTotals("a")["a"].ShouldBeLessThanOrEqualTo(1UL); + }); + } + + [Fact] // T:432 + public void FileStoreBadFirstAndFailedExpireAfterRestart_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "1"u8.ToArray(), 0).Seq.ShouldBe(100UL); + }, cfg: DefaultStreamConfig(firstSeq: 100, maxAge: TimeSpan.FromMilliseconds(10))); + } + + [Fact] // T:433 + public void FileStoreCompactAllWithDanglingLMB_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.Compact(10).Error.ShouldBeNull(); + }); + } + + [Fact] // T:434 + public void FileStoreStateWithBlkFirstDeleted_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("a", null, "2"u8.ToArray(), 0); + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + fs.State().FirstSeq.ShouldBe(2UL); + }); + } + + [Fact] // T:439 + public void FileStoreSubjectsTotals_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, "1"u8.ToArray(), 0); + fs.StoreMsg("foo", null, "2"u8.ToArray(), 0); + fs.StoreMsg("bar", null, "3"u8.ToArray(), 0); + + var totals = fs.SubjectsTotals(">"); + totals["foo"].ShouldBe(2UL); + totals["bar"].ShouldBe(1UL); + }); + } + + [Fact] // T:443 + public void FileStoreRestoreEncryptedWithNoKeyFuncFails_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var cfg = DefaultStreamConfig(); + var encrypted = JetStreamFileStore.NewFileStoreWithCreated( + new FileStoreConfig { StoreDir = root, Cipher = StoreCipher.Aes }, + cfg, + DateTime.UtcNow, + DeterministicKeyGen, + null); + encrypted.Stop(); + + Should.Throw(() => + JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root, Cipher = StoreCipher.Aes }, cfg)); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:444 + public void FileStoreInitialFirstSeq_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "payload"u8.ToArray(), 0).Seq.ShouldBe(42UL); + }, cfg: DefaultStreamConfig(firstSeq: 42)); + } + + [Fact] // T:532 + public void FileStoreRecoverOnlyBlkFiles_ShouldSucceed() + { + WithStore((fs, root) => + { + CreateBlock(root, 1, Encoding.ASCII.GetBytes("abcdefgh")); + var mb = fs.RecoverMsgBlock(1); + mb.Index.ShouldBe(1u); + mb.Msgs.ShouldBe(0UL); + }); + } + + [Fact] // T:575 + public void JetStreamFileStoreSubjectsRemovedAfterSecureErase_ShouldSucceed() + { + WithStore((fs, _) => + { fs.StoreMsg("test.1", null, "msg1"u8.ToArray(), 0).Seq.ShouldBe(1UL); fs.StoreMsg("test.2", null, "msg2"u8.ToArray(), 0).Seq.ShouldBe(2UL); fs.StoreMsg("test.3", null, "msg3"u8.ToArray(), 0).Seq.ShouldBe(3UL); @@ -47,12 +611,101 @@ public sealed class JetStreamFileStoreTests after.ContainsKey("test.1").ShouldBeFalse(); after["test.2"].ShouldBe(1UL); after["test.3"].ShouldBe(1UL); + }); + } + + private static void WithStore( + Action action, + StreamConfig? cfg = null, + FileStoreConfig? fcfg = null, + KeyGen? prf = null, + KeyGen? oldPrf = null) + { + var root = NewRoot(); + Directory.CreateDirectory(root); + JetStreamFileStore? fs = null; + + try + { + var streamCfg = cfg ?? DefaultStreamConfig(); + var storeCfg = fcfg ?? new FileStoreConfig { StoreDir = root, Cipher = StoreCipher.Aes }; + storeCfg.StoreDir = root; + + fs = prf == null && oldPrf == null + ? JetStreamFileStore.NewFileStore(storeCfg, streamCfg) + : JetStreamFileStore.NewFileStoreWithCreated(storeCfg, streamCfg, DateTime.UtcNow, prf, oldPrf); + + action(fs, root); } finally { fs?.Stop(); - Directory.Delete(root, recursive: true); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); } } + private static StreamConfig DefaultStreamConfig( + long maxMsgs = -1, + long maxBytes = -1, + TimeSpan? maxAge = null, + long maxMsgsPer = -1, + ulong firstSeq = 0, + DiscardPolicy discard = DiscardPolicy.DiscardOld, + string[]? subjects = null) + { + return new StreamConfig + { + Name = "TEST", + Storage = StorageType.FileStorage, + Subjects = subjects ?? ["test.>"], + MaxMsgs = maxMsgs, + MaxBytes = maxBytes, + MaxAge = maxAge ?? TimeSpan.Zero, + MaxMsgsPer = maxMsgsPer, + FirstSeq = firstSeq, + Discard = discard, + Retention = RetentionPolicy.LimitsPolicy, + }; + } + + private static string CreateBlock(string root, uint index, byte[] payload) + { + var mdir = Path.Combine(root, FileStoreDefaults.MsgDir); + Directory.CreateDirectory(mdir); + var blockPath = Path.Combine(mdir, string.Format(FileStoreDefaults.BlkScan, index)); + File.WriteAllBytes(blockPath, payload); + return blockPath; + } + + private static void WriteIndex(string root, uint index, byte[] checksum, bool matchingChecksum) + { + var mdir = Path.Combine(root, FileStoreDefaults.MsgDir); + Directory.CreateDirectory(mdir); + + var lastChecksum = matchingChecksum ? checksum : new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 }; + var info = new + { + Msgs = 1UL, + Bytes = 8UL, + RawBytes = 8UL, + FirstSeq = 1UL, + FirstTs = 1L, + LastSeq = 1UL, + LastTs = 1L, + LastChecksum = lastChecksum, + NoTrack = false, + }; + + var indexPath = Path.Combine(mdir, string.Format(FileStoreDefaults.IndexScan, index)); + File.WriteAllText(indexPath, JsonSerializer.Serialize(info)); + } + + private static byte[] DeterministicKeyGen(byte[] context) + { + using var sha = SHA256.Create(); + return sha.ComputeHash(context); + } + + private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-{Guid.NewGuid():N}"); } diff --git a/porting.db b/porting.db index c475f7be2ba781f233e2ac9c433ae7e859f0fffd..dc0df6d883b4eb771c19202671cdf2abc60911a2 100644 GIT binary patch delta 8740 zcmc&(d2|!!wV!(Cv`4FE#A0>z(#z#>5 z-uSRQ_6zdt0nYRRafSXf@Y|{DVB0}mjy3Q6|331K9n@b)Gd^MO=zBGKMzo#$Kt>t^U zz3lH<6O%<%BR7KZR|ZSaXR#^YR7_D$xSB0Y)121i2!(dlsoxzeMjH>>cqq~dujdG( z;b%u_C(K-Dh=PSVf&kI~l@JfxlLa=!c)%sl6uG%SNqAcCUlg2*&llCgs?U-4wy8lSanFw@Jb*c;PtiuPa&bhNw&lPDWS0qI{E2 zq4JAl;SsQ9ilMlbdIVp{aAt548m8Cgo6;4-fJdl<6JC?EKU1)SHAToaYs<9@f(hazU>QtdpW51+0!49WWh18JU_~1im zH+nQ&*?B*=jx?bH+E?IKzt8PcX+m;{Xnn8}pD3uwHx(#EPoxR05O|qZh>n28=|aA! z5)H7y@iv02^owxx8oHL#g#b``n_`y;of$%6$X|AF9Qw<~WMwP?%Vh zalr~SaifV_F?l3Y@WE{v4SGD3zMgJozqVSWBcHQKi`g{R?Vj!%KM?tNN>Q1` zKbgNa|6Y7gJS^@Ke<=2e4~v~*ojAt4PfRwS6m7EAyC`V3kXEI?u5M^=ZftH?Ms0(W z&zak!`~*Q-jI@)ye?$CHaPnreJbAPDizSRWpZGhlx5Qs23r`5Eg(hJE^_nnEC=BN5u+2+g2rw@{2=~MzL_EBqP|1Ha9vb%!GXHw`l=xy|7 zIzV^PO>_-iMUSIL(WwvoxajbQ+W(->{mxF=v1=_;dOUd;r3I>V2&I{-B%_3{l1X@b zKZ@!rfZT_Y`U)W5MoE1Ikb6;5UjgJ>D5NwYeKFx2c!cpwy^J zt5I60N*yT8R;6~7CaV&T2}GGHtw3p%Dm9~&p-N3Cxm0O6O3|v+h>}s28c-N=j%gze z{B9}+?h4j2`KhlhYXsh~m}IDE;-!!H{BEP}p6<5p54vCJ{!@2Uw@>$q4s=iHHtO1R z^}2bwDY_C}t}anGLZ{>Z!QbSs@E`GK`FHs@`R)9Rd>_A-Z{chC+57~)kk8~3cq`9v zf8%a&pK>2?C%J>%8{9T-GZ)~xxF)WKtK!CSqqtNqmNRh#`)Bqu_SfC)1@;)bpM8bx zXP;!(u`TRkb|yQX9mS@w4%WcjVQw;)nGcy$%t2-+^JC^2W)suK)G>3JiA*7r!NfCW zhM;fLSLu)GbM(9P9{Q*B7CbSXbR#{Vo=TU{Ie0Q`G(-KB`XluTb%8o6-WIQlAB*S2 zcf~y-Oy5V%F;Ep$9+gZ*3mgozQT7C-!oRO?>TT*}YAe-8Jx;Bl7E)EPeII3~gw-!C zp}ac7H%c2J2q=4pii73vP#Ms0jEc|PIrOBHuSaOFMQE?Wru|ep2nQ*%e3r8Q8*j#3 zn(sFa(Fo-SrOqqaD7K*i5%}~VfqVu^3kIzPEPrzlxzvloIM^Sc& zby^>T>rqw{Jn6K~gPwS699**DpXVIbYS?jC#CDtHGoAn?#Qfka_Yox^tU)Zd7z<{d!*C^`~6zufWh>)$^kv%nY<GIW@R{iemO{9eV2P9OI;<`l29j#x$n`Kb-WqQWqse;_ru@+!wLCx8Y2@buMGhBR+%#r*!~a$EaBN#%Y~Mk%!=uqf~5gL4p-wLqy37 z66~+d`gIk<^xp%y-NyaG5yNx3OZ>;&QRW`_ zyzCG38+)zzmZzpiK~A$H9tKuK&w>|UOAdLSxiWgZIu3XP=3a|pI3pL{|FxtVa!{UU z;Nbkq=xUg~C=REyN2M(7T60Tufkhc3yn&8ha}+*T?QekRFGV~!*cvUsM~_DDhS{;n z$xzWEx!_o9^aOP-^ZM8Hn&8VXViFZjSZ0q!8{v2J?L2(mfp$ysB}TnQ8*XI5tdgiK zxID`i14vl4E!N^8uUiuBFum#4$Cb+B>G1pp$qv7}9HWC@T#oS#Sug!cOrgd%VCzkW zs=?Sq`I##*U7JLVzetQC%-c-G#sZ;8wk&a^3ygo=6F3bnci5q%*)alEtaMBYdy`>3?ovcG6=kfm1aqm7HG>(N|XL- z@y`p?D~pZBFpO4)+F9w+v?1Hiy%5Vo=XLwg?S~U0V?zE$#+yDPOyP`RdYZ$^{q#)B zk_v}7J((pHM5$%vpB<=Gtn+dCd=fT~ktA51EaeR`zLqSF*20dXY60BHmVU#aR1D{H zq`3@+c!d(;#W4O^ES|O;X{JZ@Y+~OcjQAy#`9xo(o5RnizouD=G=F8f!}*PIf>m~T zr7I*8AOfqDV`+yOyF6xCTrQ1(qH?Je0#Q;@eQh4$@WQ~=s!_$}W zX89juS*R$MWVD3vVm!^)(gkq+ns?J({<_Ddwj!QwK>rm)LYq!HS)Z#O!e+Ks_7DOX)w{;phN zTyAbDVIl>>e8RNHxJ76%tk%!fP3LEj-;n}a#Z02pz`dQn&iGq2pq#utP+hG}|1M{*ysCzzkhxD^(&>F&*dnIE|m;ra&bu9;V3J&Kg1 z)d^Bit5MHbjntjjBp#0OF$sWb z%8;0DUMBh7=`o;alyrN9Su4i2i;NLF}mUPO@xN_%tQT+=e+J%Sj#gOS`W}>g$Q^>xt{@iSO%i_Vpz6 z^^EN6arO1M`+5@ldXoBjk`MNHNLP%01)<|>DPfha(foI_)#T%AjpGdkcmN%fc-&fG z!;r1r&BMgp+~W~p#U4)?Bz+r--?crS30g>C$?KlS!FCsaw@}g%x6@NTq}+sJ;-cL8 zhUX%QEsq!j<-0w3QU2+HHA>IS#h#g~z8@-KsUZg6u5Wm}+BjudU~~8H@`S?WUm4g4 z`g|-}l~3DlPbuVathVA8sD-lQd((5Cv|vYEtGJ(q?q?})1oK``6YQVw(d>V|*Yl8O zAH2KQgGfQ7BGM4)hzvv~A`6j?$U)>Hya*p650Q@;g&2((gD5~0B8m`W5yglSL@A;S zQI4oUj6;k^R3aWiOh8OTOhQaXOhHUVJdBuzn2xAI%s|XU%tFjY%t6dWR3qjg9zo1U z{6N0DcLCkTQ-#ETQ)WUgbk+P131x{jE@eE=W2l=%A(SN6>}O~P5f7GVOERbX3B8{% zLwr(AKf^A=`&rQzkBsy9zmtr?K^zP=d1?>~5sMIu5w(aVh^2@+xye(%?FKCo4+H-} zeL~p`KEVB){UaO4RM9=u0rC%|gLoLTANkKeWow3ka8k}Lr+-R*W5d$UuJ(ow`V1^> z^~J*WBXu??KkGAh=ZAmN4%xVxl^@t2ZD{RW+S&YAtLy&n*u^gGD?q(=xdU%^b-0!_ zG&Z!iH!O2?wRSeQxR$p+*45^7bvAT#;umq%qN{D$Qq=Rjd1JD@dD(e|E+5V`MPo-7 zBE@w312~?BrTz6dU@P?U3huA}uctS4Ynn!HO+*M!DItu5LtKpw-aqFPyGIWn#VK1; z4N?D`<&!{WEX{)2L1dRdeC(!fP0`pz2JnOyz=vO}x52#uOmcy5_&844>Uq!^JPz*l z7(t{bc6*1DUGjtSJD}xv^9i4$`(IxdQ?@33U$usCYdNZIfI`z1jx_w2%+LA}OzE5sAw}-UoVL2%LhOb=LZyov4@nunR zp0w;euD?fOES%sa#C&t3=|{#3!d=0u{AWhI5aXY^)vN9!vh{$ke_D1aIlJpe~*MTj<9WsKWko%J9*ldjE)KR=E)7j#1;cg>l_@OqL2tmb+8a=r!q$%&zJC6H3dZkP delta 6909 zcmcgwd3+RAwyu5cs_O1^x>MOZNhc(b9Y{h5NeEz&H9%w)l1>7$g^)yKHR*1(kwHYp z;WCI0D1su30%MIEMpOh8$!_WU;_* z-QKyLZ0}f4u8*OcFpL@cRAMRK^jb|>Hl7gJmaT2e#y$Jd(X)7(cIhnsA>Kyq#WLy^ zEo@rQ(App@lE^rsXJlz+Phy+R(P9{_HAhV_de|J@rN=)<9ml*q&-iCyG|3!o>Ft5v zNfjK&3eAT*K-3I|e-TEp=CkcElFiWz=z;TiBf;*}zC4c)W8?YfF|3#HHW>XTyj2_Y z6aLWwMxKFPBR@y%R^mdEY>^4+rgT|4FMTDQlAn{Gme?JRd-+^4$R-#k8 zwvD)i%RCuLw6s3xB|p)bcIgG;AAFfPdDvv{nXJQPhfMa4$==o(vhF3uRWS7p=JYX> z9W~iUCOcxX4^8%g&a~(E5?(jgF43@TX6R1KDFR2n&xjmtaFudQ4AS2a01`j}XaECX z0UUtW=x>DXJV|n5HtcVhB1jjcXic_?oD`uj808xk9Baey(%k%nz62hLGA&{hDv1*- z(Wwoziq@YH9H_%0(&()?F&*8J>uK#moEV2kKrocAIXD`WCsyPqlNfpv(L!ZZ9v?f`q09l%u3Ez~aZe~4~;DXNOb zpVpprizI;$MmZ^BA?2GIyccat7XN_0+dvDb<6R*FokRFf{2^$OV7moL)Al(6|(!3xkSn4T{c=;q}N zv9VV$uVjb=4MD3P4-U0sc!f{NPsr9KZ)Avn>?Jv`pJ+u#`-(|nyXnDlu$w96OPRX# z?Y?5k-O@d2a|O#l&<%om*l1rrNT^XpkhTb9%@i}j7MBHwfZ*^I`3c5GjLn3t?Gdyi zE=$Z03oZ_pfM7Y~*Kj>9OMJAKAen9MSZQ!D2o{6uVuFZ7Wr^bGkf2cmZzqZcVT*CW zVi5Fzphu69b&CUf2`+Glt14KOEdne6Nh`{hPmE7J00)qfl}qw<`J8f6d0W|~Y*HRm zMk%d|S3aN&R8r*k6}x83x&Lul#;qb5$1iDUo;!PPLp^$bo!lHT0mE>c-XVLY@zKG` zCuObjN#%_iN;rdEKy;V#cY@flv%5>dlPtR|M=X_=LCS3QY4$N?3i~|!s)Z1*i$928 zDy^KIqqOd;N+(BIc44n?jIg@g&feD9$h$^;6hHO>I%>C;qYDw%LOfC1@QV_K7g~;B za!){Pzuqcju`;FCv&`Plm@8y-G+&ps6K@+z6;fGL3KdORDTeGOuaQ5J-;$q^$H@-zb#gcPJh_QnM?OL> zC!5IGrD29 z$sRG;DwC}=*)o$YHQ7TZTVk>nlQp-Qbg{|inQX4f9x&M)lg&1n`A);@%(+^V)tGD+ zM@A5hvuZ7xN$$UH-7dC?1ok1Z8xwvOeiXhIP73b{2Zf!&X5k5;U1$+z3sZ&BLaC4^ zc!VfHKJI{7QZi@8zrcO1_xS;@!N1XSrM4C9aeEJ9mV8 zgWJvhh1>PF) zJBBS|^Vw8ZWi8Ad<{I-o^Cj~!bBKA3+0Gzl4YP`AV(OUtn32q2CYwoOA{mbUg}zLm zr9Y!TqTi(V&|B#Z^ds~V`T=?xJ(?au=g>*CgJ!6k)J5tHb&~3!_ET-!sg2ZXYB{xl znn{hPhEn~hR4R%R;5=L=JIT}J2XGc%A~%z3$%o+_c*#lR2(l2)KrAVf7;z2i{|n+M z@g}ib`BC{w`9wLSysA8}JPqAwsWMNQp^R0^lsqLxaVngAOa4jzRz4*kmJiB1%RTK3_&HsI6D;en9}LCXS>0Cvb=IIx2{V{ZZ5Ww6hIZ8g|wU>gngCa|>z+YHR#(x$7d1G?0h%mmh`GrA2} zz0Qc;z@{0@2W-5~#Ie9e=uG$*u)#Xx*8npT!E6B5N1tP+0y7f9+yl%=1fvISB!a;? z5{H<`i?7Imql_Ho-w#$ux53xi7e*+?#NgpX1hZe&RIofu&tKB^{OyN;{-Y(i&+c zc}8lKYNQF$FlnIFM~Wt7%Y7PWwQa|3a-LCF@L%}7g=o9&W7K4^#iF0=wr+w5Xcr@F z>k0HS#VTmeK|)3^O16pGMJK4EwH8}7S}56KwA)d(M3S=9!@JEIXH7vPVr|yK-tR(U z3%r11dx{RPgwbzr)Y7{o7o&Btwm9-zuys5ZY@Lp^RZ_YHijT8(P!O|>_QqPB=w_U4 z1d7U>9vhq%Zv)r?5!$qP$IfjL{8>!KzQL3vIbJHaBnT(??=?q<;{@-3#Ztcd00p-@ zqFgGG%JnPG)nkq+sN-c%DO!8nF&K4ZsT7_YJaOEioj4xZ|4(jV114e1F@={lT3!^9 zu$fQgGTB0gr$&=iL=m=JzmZST?$kS@gcxZZ#_865%K|q2b4)~uGzPp>6xX6{UZ;u% z)jB0~pvE~Q;#IpdlV#Pl&)0sfjg{}Q?EB*>f+R?(Pjttb4>0(e0Sx-IIaUy;B>dTC2A>3KkF(q;-Yi?_CN}Hb)lN1tv%~{@L@$P!W67OCU237Ed^qUX03@z7hT4- zE_uXZBpi(QB>d`0cO?4r{3shr{?whOGx)NmucT{!R4D6oo*#Q80f*e`=uW zLIs&>UdYMrOm%<}337YXQxsoGsGNpVvQIH{+Wb`YZu+ zCB_a{p{f)pKT98#Lw`(D^FvXLajPtNBz_#;c&7i`0I2t1HjAx!~00Rz8oma=3&wNbO_tn z{p?SoS=4%=r;hayD04G4*vx<7R$sM$?(ajI+E0bb)GKJu5D4x}zi{@M+yPL7-y?Un zIulJ86VVTKR@q%Bn5Fhd>n(!7K($Qw&(rtkYsz2Xe*hiJQYWGHuPYApWU4wWWOHOH z>}Xb6*k+g4p>R(Xt48~vQOscVI|8$y?!oH6 z^^$GYHO!L5OM6Q8)5C5C#g(ddbi6dYN~>?Bf3YrJ1rDn8&ST3{9|$=x#540$1#PNG z4mTZrv$Wf#Dw&v!{{nsa9Rhnt{!#kGvW>V`SkFJs{ei7#meA9wN#p?R9liJbSv%aQ z4#8baBQ{P?K^-`%Gg9!7aTd$#)IzIoPM|=Kq5>XP=us@Xn&d=V=Jjk%<~bLO_I$*! zsQy+`I23CP)QvwTP>RlWI2>A8o$4kn;{I^|pBEU68fL2(BmBvMV%_w7Fg@R>$~I&) zrUhO#5e@gM_l8zm;&!ko!>h)i>)uduC~fQ#1|r-Gr{8GwWee0;ZO?o)k}S;6k;rwJglA(!A)YNCm;PdTSX?3W=l3 zsD0!PaB$b3pAV$zjVuYhyGoscYG>IiP*bZ)qgfBD1+c+^G<`w6H`p1HG>%4(WT?YN z=K3cDQuQ~|7+=M2t=Z`NV4X&X+Ef{JKCEsGrNPjKxf1=DQ*^F4AB!@Ys_r-M_kO=_ zg$ND5Q4hlLS@&Q#caA(i7s%D+r$XYV8i`+sYKzopbmc*HVo0T{13Cc=?dZ8BVLe+O zE&n_8hhDgK|_}H9G5H0^rb--Y|P+VOrRIB#X3Kho_VY`jH zMo&Kg`+a|J|JQ0ekIJg;0{VERS{_>J`f7Uw@~+Zf5JqJFxIl)kIR{oUM_3MkW=QeIW_46ZKBXm`RC}&JFVIJqAL&y{ zq>;Lpv*%q#&Cw{nRkff4E$Z@+7SvCf;RW4EsHcNey9$T~!~kLeae#P00w59K1|$KJ z0Vx{Q?)f#rB^F~GH;>@war5NcvQ5ghWQi3*f#&YKuac$wn188$WpJQzyP)l^eaSNc zd2i7At1uNEdC3!r2EOdcFdDxTe&S*57&N)1dXjoKIcG@GDBT{V(3OtP^^NpDq&2?m zIe}C1EsR5@uXyrM({2ykPW6}1mEE4u@qmk4o6*>RD~v@uJ@ZT{7OmasQBdVMSFbyp zmb=Gu3=dtJR{EFqj;o@$0p%Wr8(HE$Sd-yv+CEs%@qM12+h*|gJ`W%jkOt@jNC#v9 z`U3g^G67kDY(Nem7mx?Y2lNLF01N~a015#`fI)y_z+gZLUpYrU0gDxA#pWn+k*|?27(9 z%R=9#yXbT9lf@DGZ}cuQirzwaiAm%cdL1!>C?rqP?Q}Ez9=D(9CN9(U8kaczTO2u) zrYE75POF5*#9L$0KjW=2Xl2s$X2iwAoa(g35ZtpUF~J&*@{*^wqxM+%dGM3u>6>uW z6={tLwt1!lW&rL7%mmB=)BtJ$Uaiej_vl4Z=3^0isxD6d8TBSdu6R1(Pw&-7XnAV= GN#;NMH9{2t