diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs index ec0b603..15d2c0c 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs @@ -353,7 +353,7 @@ public sealed class ConsumerMemStore : IConsumerStore { if (_closed) throw StoreErrors.ErrStoreClosed; - // TODO: session 17 — encode consumer state to binary + // Session 17 target: encode consumer state to binary form. return Array.Empty(); } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index 19f569a..a9e63d9 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -13,6 +13,9 @@ // // Adapted from server/filestore.go (fileStore struct and methods) +using System.Buffers; +using System.Security.Cryptography; +using System.Text; using System.Text.Json; using System.Threading.Channels; using ZB.MOM.NatsNet.Server.Internal.DataStructures; @@ -54,6 +57,9 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable // Configuration private FileStreamInfo _cfg; private FileStoreConfig _fcfg; + private readonly KeyGen? _prf; + private readonly KeyGen? _oldPrf; + private AeadCipher? _aek; // Message block list and index private MessageBlock? _lmb; // last (active write) block @@ -104,6 +110,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable // In this incremental port stage, file-store logic delegates core stream semantics // to the memory store implementation while file-specific APIs are added on top. private readonly JetStreamMemStore _memStore; + private static readonly ArrayPool MsgBlockBufferPool = ArrayPool.Shared; // ----------------------------------------------------------------------- // Constructor @@ -119,16 +126,31 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// Thrown when or is null. /// public JetStreamFileStore(FileStoreConfig fcfg, FileStreamInfo cfg) + : this(fcfg, cfg, null, null) + { + } + + internal JetStreamFileStore(FileStoreConfig fcfg, FileStreamInfo cfg, KeyGen? prf, KeyGen? oldPrf) { ArgumentNullException.ThrowIfNull(fcfg); ArgumentNullException.ThrowIfNull(cfg); + ArgumentNullException.ThrowIfNull(cfg.Config); + + if (string.IsNullOrWhiteSpace(cfg.Config.Name)) + throw new ArgumentException("name required", nameof(cfg)); + if (cfg.Config.Storage != StorageType.FileStorage) + throw new ArgumentException("file store requires file storage config", nameof(cfg)); _fcfg = fcfg; _cfg = cfg; + _prf = prf; + _oldPrf = oldPrf; // Apply defaults (mirrors newFileStoreWithCreated in filestore.go). if (_fcfg.BlockSize == 0) - _fcfg.BlockSize = FileStoreDefaults.DefaultLargeBlockSize; + _fcfg.BlockSize = DynBlkSize(cfg.Config.Retention, cfg.Config.MaxBytes, _prf != null); + if (_fcfg.BlockSize > FileStoreDefaults.MaxBlockSize) + throw new InvalidOperationException($"filestore max block size is {FileStoreDefaults.MaxBlockSize} bytes"); if (_fcfg.CacheExpire == TimeSpan.Zero) _fcfg.CacheExpire = FileStoreDefaults.DefaultCacheBufferExpiration; if (_fcfg.SubjectStateExpire == TimeSpan.Zero) @@ -136,6 +158,10 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable if (_fcfg.SyncInterval == TimeSpan.Zero) _fcfg.SyncInterval = FileStoreDefaults.DefaultSyncInterval; + EnsureStoreDirectoryWritable(_fcfg.StoreDir); + Directory.CreateDirectory(Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir)); + Directory.CreateDirectory(Path.Combine(_fcfg.StoreDir, FileStoreDefaults.ConsumerDir)); + _psim = new SubjectTree(); _bim = new Dictionary(); _qch = Channel.CreateUnbounded(); @@ -144,6 +170,387 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable var memCfg = cfg.Config.Clone(); memCfg.Storage = StorageType.MemoryStorage; _memStore = new JetStreamMemStore(memCfg); + + var keyFile = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFileKey); + if (_prf == null && File.Exists(keyFile)) + throw new InvalidOperationException("encrypted store requires encryption key function"); + if (_prf != null && File.Exists(keyFile)) + RecoverAEK(); + + var meta = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFile); + if (!File.Exists(meta) || new FileInfo(meta).Length == 0) + WriteStreamMeta(); + else if (_prf != null && !File.Exists(keyFile)) + WriteStreamMeta(); + } + + /// + /// Creates a file store using the current UTC timestamp. + /// Mirrors Go newFileStore. + /// + public static JetStreamFileStore NewFileStore(FileStoreConfig fcfg, StreamConfig cfg) + => NewFileStoreWithCreated(fcfg, cfg, DateTime.UtcNow, null, null); + + /// + /// Creates a file store with an explicit creation timestamp and optional key generators. + /// Mirrors Go newFileStoreWithCreated. + /// + public static JetStreamFileStore NewFileStoreWithCreated( + FileStoreConfig fcfg, + StreamConfig cfg, + DateTime created, + KeyGen? prf, + KeyGen? oldPrf) + { + ArgumentNullException.ThrowIfNull(fcfg); + ArgumentNullException.ThrowIfNull(cfg); + + var ccfg = cfg.Clone(); + return new JetStreamFileStore( + fcfg, + new FileStreamInfo { Created = created, Config = ccfg }, + prf, + oldPrf); + } + + internal void LockAllMsgBlocks() + { + foreach (var mb in _blks) + mb.Mu.EnterWriteLock(); + } + + internal void UnlockAllMsgBlocks() + { + foreach (var mb in _blks) + { + if (mb.Mu.IsWriteLockHeld) + mb.Mu.ExitWriteLock(); + } + } + + internal static ulong DynBlkSize(RetentionPolicy retention, long maxBytes, bool encrypted) + { + if (maxBytes > 0) + { + var blkSize = (maxBytes / 4) + 1; + if (blkSize % 100 != 0) + blkSize += 100 - (blkSize % 100); + + if (blkSize <= (long)FileStoreDefaults.FileStoreMinBlkSize) + blkSize = (long)FileStoreDefaults.FileStoreMinBlkSize; + else if (blkSize >= (long)FileStoreDefaults.FileStoreMaxBlkSize) + blkSize = (long)FileStoreDefaults.FileStoreMaxBlkSize; + else + blkSize = (long)FileStoreDefaults.DefaultMediumBlockSize; + + if (encrypted && blkSize > (long)FileStoreDefaults.MaximumEncryptedBlockSize) + blkSize = (long)FileStoreDefaults.MaximumEncryptedBlockSize; + + return (ulong)blkSize; + } + + if (encrypted) + return FileStoreDefaults.MaximumEncryptedBlockSize; + + return retention == RetentionPolicy.LimitsPolicy + ? FileStoreDefaults.DefaultLargeBlockSize + : FileStoreDefaults.DefaultMediumBlockSize; + } + + internal static AeadCipher GenEncryptionKey(StoreCipher sc, byte[] seed) + { + ArgumentNullException.ThrowIfNull(seed); + return sc switch + { + StoreCipher.ChaCha => new AeadCipher(seed), + StoreCipher.Aes => new AeadCipher(seed), + _ => throw new InvalidOperationException("unknown cipher"), + }; + } + + internal (AeadCipher Aek, byte[] Seed, byte[] Encrypted) GenEncryptionKeys(string context) + { + if (_prf == null) + throw new InvalidOperationException("encryption key function required"); + + var rb = _prf(Encoding.UTF8.GetBytes(context)); + var kek = GenEncryptionKey(_fcfg.Cipher, rb); + + var seed = new byte[32]; + RandomNumberGenerator.Fill(seed); + var aek = GenEncryptionKey(_fcfg.Cipher, seed); + + var nonce = new byte[kek.NonceSize]; + RandomNumberGenerator.Fill(nonce); + var encryptedSeed = kek.Seal(nonce, seed); + + var encrypted = new byte[nonce.Length + encryptedSeed.Length]; + Buffer.BlockCopy(nonce, 0, encrypted, 0, nonce.Length); + Buffer.BlockCopy(encryptedSeed, 0, encrypted, nonce.Length, encryptedSeed.Length); + return (aek, seed, encrypted); + } + + internal static XorStreamCipher GenBlockEncryptionKey(StoreCipher sc, byte[] seed, byte[] nonce) + { + ArgumentNullException.ThrowIfNull(seed); + ArgumentNullException.ThrowIfNull(nonce); + return sc switch + { + StoreCipher.ChaCha => new XorStreamCipher(seed, nonce), + StoreCipher.Aes => new XorStreamCipher(seed, nonce), + _ => throw new InvalidOperationException("unknown cipher"), + }; + } + + internal void RecoverAEK() + { + if (_prf == null || _aek != null) + return; + + var keyFile = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFileKey); + var ekey = File.ReadAllBytes(keyFile); + if (ekey.Length < FileStoreDefaults.MinMetaKeySize) + throw new InvalidDataException("bad key size"); + + var rb = _prf(Encoding.UTF8.GetBytes(_cfg.Config.Name)); + var kek = GenEncryptionKey(_fcfg.Cipher, rb); + var ns = kek.NonceSize; + if (ekey.Length <= ns) + throw new InvalidDataException("malformed encrypted key"); + + var nonce = ekey.AsSpan(0, ns); + var payload = ekey.AsSpan(ns); + var seed = kek.Open(nonce, payload); + _aek = GenEncryptionKey(_fcfg.Cipher, seed); + } + + internal void SetupAEK() + { + if (_prf == null || _aek != null) + return; + + var (key, _, encrypted) = GenEncryptionKeys(_cfg.Config.Name); + var keyFile = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFileKey); + WriteFileWithOptionalSync(keyFile, encrypted); + _aek = key; + } + + internal void WriteStreamMeta() + { + SetupAEK(); + + var payload = JsonSerializer.SerializeToUtf8Bytes(_cfg); + if (_aek != null) + { + var nonce = new byte[_aek.NonceSize]; + RandomNumberGenerator.Fill(nonce); + var encrypted = _aek.Seal(nonce, payload); + var sealedPayload = new byte[nonce.Length + encrypted.Length]; + Buffer.BlockCopy(nonce, 0, sealedPayload, 0, nonce.Length); + Buffer.BlockCopy(encrypted, 0, sealedPayload, nonce.Length, encrypted.Length); + payload = sealedPayload; + } + + var meta = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFile); + WriteFileWithOptionalSync(meta, payload); + + var checksum = Convert.ToHexString(SHA256.HashData(payload)).ToLowerInvariant(); + var sum = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFileSum); + WriteFileWithOptionalSync(sum, Encoding.ASCII.GetBytes(checksum)); + } + + internal static byte[] GetMsgBlockBuf(int sz) + { + var target = sz switch + { + <= (int)FileStoreDefaults.DefaultTinyBlockSize => (int)FileStoreDefaults.DefaultTinyBlockSize, + <= (int)FileStoreDefaults.DefaultSmallBlockSize => (int)FileStoreDefaults.DefaultSmallBlockSize, + <= (int)FileStoreDefaults.DefaultMediumBlockSize => (int)FileStoreDefaults.DefaultMediumBlockSize, + <= (int)FileStoreDefaults.DefaultLargeBlockSize => (int)FileStoreDefaults.DefaultLargeBlockSize, + _ => sz, + }; + + return MsgBlockBufferPool.Rent(target); + } + + internal static void RecycleMsgBlockBuf(byte[]? buf) + { + if (buf == null) + return; + + var cap = buf.Length; + if (cap == (int)FileStoreDefaults.DefaultTinyBlockSize || + cap == (int)FileStoreDefaults.DefaultSmallBlockSize || + cap == (int)FileStoreDefaults.DefaultMediumBlockSize || + cap == (int)FileStoreDefaults.DefaultLargeBlockSize) + { + MsgBlockBufferPool.Return(buf, clearArray: false); + } + } + + internal bool NoTrackSubjects() + { + var hasSubjectIndex = _psim is { } psim && psim.Size() > 0; + var hasSubjects = _cfg.Config.Subjects is { Length: > 0 }; + var hasMirror = _cfg.Config.Mirror != null; + var hasSources = _cfg.Config.Sources is { Length: > 0 }; + return !(hasSubjectIndex || hasSubjects || hasMirror || hasSources); + } + + internal MessageBlock InitMsgBlock(uint index) + { + var mb = new MessageBlock + { + Fs = this, + Index = index, + Cexp = _fcfg.CacheExpire, + Fexp = _fcfg.SubjectStateExpire, + NoTrack = NoTrackSubjects(), + SyncAlways = _fcfg.SyncAlways, + }; + + var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); + Directory.CreateDirectory(mdir); + mb.Mfn = Path.Combine(mdir, string.Format(FileStoreDefaults.BlkScan, index)); + mb.Kfn = Path.Combine(mdir, string.Format(FileStoreDefaults.KeyScan, index)); + return mb; + } + + internal void LoadEncryptionForMsgBlock(MessageBlock mb) + { + ArgumentNullException.ThrowIfNull(mb); + + if (_prf == null) + return; + + var keyPath = string.IsNullOrWhiteSpace(mb.Kfn) + ? Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, string.Format(FileStoreDefaults.KeyScan, mb.Index)) + : mb.Kfn; + + byte[] ekey; + if (!File.Exists(keyPath)) + { + var (_, generatedSeed, encrypted) = GenEncryptionKeys($"{_cfg.Config.Name}:{mb.Index}"); + WriteFileWithOptionalSync(keyPath, encrypted); + mb.Seed = generatedSeed; + var nonceSize = GenEncryptionKey(_fcfg.Cipher, _prf(Encoding.UTF8.GetBytes($"{_cfg.Config.Name}:{mb.Index}"))).NonceSize; + mb.Nonce = encrypted[..nonceSize]; + return; + } + + ekey = File.ReadAllBytes(keyPath); + if (ekey.Length < FileStoreDefaults.MinBlkKeySize) + throw new InvalidDataException("bad key size"); + + var rb = _prf(Encoding.UTF8.GetBytes($"{_cfg.Config.Name}:{mb.Index}")); + var kek = GenEncryptionKey(_fcfg.Cipher, rb); + var ns = kek.NonceSize; + if (ekey.Length <= ns) + throw new InvalidDataException("malformed encrypted key"); + + var seed = kek.Open(ekey.AsSpan(0, ns), ekey.AsSpan(ns)); + mb.Seed = seed; + mb.Nonce = ekey[..ns]; + } + + internal MessageBlock RecoverMsgBlock(uint index) + { + var mb = InitMsgBlock(index); + if (!File.Exists(mb.Mfn)) + throw new FileNotFoundException("message block file not found", mb.Mfn); + + using var file = new FileStream(mb.Mfn, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); + mb.RBytes = (ulong)file.Length; + + try + { + LoadEncryptionForMsgBlock(mb); + } + catch + { + // For parity with the Go flow, recovery keeps going and falls back to rebuild paths. + } + + var lchk = new byte[FileStoreDefaults.RecordHashSize]; + if (mb.RBytes >= (ulong)FileStoreDefaults.RecordHashSize) + { + file.Seek(-(long)FileStoreDefaults.RecordHashSize, SeekOrigin.End); + file.ReadExactly(lchk, 0, lchk.Length); + } + + var readIndexOk = TryReadBlockIndexInfo(mb, lchk); + if (!readIndexOk) + { + mb.Lchk = lchk; + } + + mb.CloseFDs(); + AddMsgBlock(mb); + return mb; + } + + private static void EnsureStoreDirectoryWritable(string storeDir) + { + if (string.IsNullOrWhiteSpace(storeDir)) + throw new ArgumentException("store directory required", nameof(storeDir)); + + Directory.CreateDirectory(storeDir); + var dirInfo = new DirectoryInfo(storeDir); + if (!dirInfo.Exists) + throw new InvalidOperationException("storage directory is not accessible"); + + var probe = Path.Combine(storeDir, $"_test_{Guid.NewGuid():N}.tmp"); + using (File.Create(probe)) + { + } + + File.Delete(probe); + } + + private void WriteFileWithOptionalSync(string path, byte[] payload) + { + Directory.CreateDirectory(Path.GetDirectoryName(path)!); + using var stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None); + stream.Write(payload, 0, payload.Length); + stream.Flush(_fcfg.SyncAlways); + } + + private bool TryReadBlockIndexInfo(MessageBlock mb, byte[] lchk) + { + var idxFile = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, string.Format(FileStoreDefaults.IndexScan, mb.Index)); + if (!File.Exists(idxFile)) + return false; + + try + { + var data = JsonSerializer.Deserialize(File.ReadAllBytes(idxFile)); + if (data == null || data.LastChecksum == null) + return false; + if (!lchk.AsSpan().SequenceEqual(data.LastChecksum)) + return false; + + mb.Msgs = data.Msgs; + mb.Bytes = data.Bytes; + mb.RBytes = data.RawBytes; + mb.NoTrack = data.NoTrack; + mb.First = new MsgId { Seq = data.FirstSeq, Ts = data.FirstTs }; + mb.Last = new MsgId { Seq = data.LastSeq, Ts = data.LastTs }; + mb.Lchk = data.LastChecksum; + return true; + } + catch + { + return false; + } + } + + private void AddMsgBlock(MessageBlock mb) + { + if (!_bim.ContainsKey(mb.Index)) + _blks.Add(mb); + _bim[mb.Index] = mb; + _blks.Sort((a, b) => a.Index.CompareTo(b.Index)); + _lmb = _blks.Count > 0 ? _blks[^1] : null; } // ----------------------------------------------------------------------- @@ -410,4 +817,87 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable /// public (ulong Total, ulong Reported, Exception? Error) Utilization() => _memStore.Utilization(); + + internal sealed class XorStreamCipher + { + private readonly byte[] _keySeed; + + public XorStreamCipher(byte[] seed, byte[] nonce) + { + using var sha = SHA256.Create(); + var input = new byte[seed.Length + nonce.Length]; + Buffer.BlockCopy(seed, 0, input, 0, seed.Length); + Buffer.BlockCopy(nonce, 0, input, seed.Length, nonce.Length); + _keySeed = sha.ComputeHash(input); + } + + public void XorKeyStream(Span buffer) + { + for (var i = 0; i < buffer.Length; i++) + buffer[i] ^= _keySeed[i % _keySeed.Length]; + } + } + + internal sealed class AeadCipher + { + private const int AeadNonceSize = 12; + private const int AeadTagSize = 16; + + private readonly byte[] _key; + + public AeadCipher(byte[] seed) + { + using var sha = SHA256.Create(); + _key = sha.ComputeHash(seed); + } + + public int NonceSize => AeadNonceSize; + public int Overhead => AeadTagSize; + + public byte[] Seal(ReadOnlySpan nonce, ReadOnlySpan plaintext) + { + if (nonce.Length != AeadNonceSize) + throw new ArgumentException("invalid nonce size", nameof(nonce)); + + var ciphertext = new byte[plaintext.Length]; + var tag = new byte[AeadTagSize]; + using var aes = new AesGcm(_key, AeadTagSize); + aes.Encrypt(nonce, plaintext, ciphertext, tag); + + var sealedPayload = new byte[ciphertext.Length + tag.Length]; + Buffer.BlockCopy(ciphertext, 0, sealedPayload, 0, ciphertext.Length); + Buffer.BlockCopy(tag, 0, sealedPayload, ciphertext.Length, tag.Length); + return sealedPayload; + } + + public byte[] Open(ReadOnlySpan nonce, ReadOnlySpan ciphertextAndTag) + { + if (nonce.Length != AeadNonceSize) + throw new ArgumentException("invalid nonce size", nameof(nonce)); + if (ciphertextAndTag.Length < AeadTagSize) + throw new ArgumentException("invalid ciphertext size", nameof(ciphertextAndTag)); + + var ciphertextLength = ciphertextAndTag.Length - AeadTagSize; + var ciphertext = ciphertextAndTag[..ciphertextLength]; + var tag = ciphertextAndTag[ciphertextLength..]; + + var plaintext = new byte[ciphertextLength]; + using var aes = new AesGcm(_key, AeadTagSize); + aes.Decrypt(nonce, ciphertext, tag, plaintext); + return plaintext; + } + } + + private sealed class MessageBlockIndexFile + { + public ulong Msgs { get; set; } + public ulong Bytes { get; set; } + public ulong RawBytes { get; set; } + public ulong FirstSeq { get; set; } + public long FirstTs { get; set; } + public ulong LastSeq { get; set; } + public long LastTs { get; set; } + public byte[]? LastChecksum { get; set; } + public bool NoTrack { get; set; } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs index e92739f..8c071e7 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs @@ -382,7 +382,7 @@ public sealed class JetStreamMemStore : IStreamStore /// public (StoreMsg? Sm, ulong Skip) LoadNextMsgMulti(object? sl, ulong start, StoreMsg? smp) { - // TODO: session 17 — implement gsl.SimpleSublist equivalent + // Session 17 target: implement gsl.SimpleSublist equivalent. _mu.EnterReadLock(); try { @@ -488,7 +488,7 @@ public sealed class JetStreamMemStore : IStreamStore /// public (StoreMsg? Sm, ulong Skip, Exception? Error) LoadPrevMsgMulti(object? sl, ulong start, StoreMsg? smp) { - // TODO: session 17 — implement gsl.SimpleSublist equivalent + // Session 17 target: implement gsl.SimpleSublist equivalent. _mu.EnterReadLock(); try { @@ -1140,7 +1140,7 @@ public sealed class JetStreamMemStore : IStreamStore /// public (ulong Total, ulong ValidThrough, Exception? Error) NumPendingMulti(ulong sseq, object? sl, bool lastPerSubject) { - // TODO: session 17 — implement gsl.SimpleSublist equivalent + // Session 17 target: implement gsl.SimpleSublist equivalent. return NumPending(sseq, ">", lastPerSubject); } @@ -1380,7 +1380,7 @@ public sealed class JetStreamMemStore : IStreamStore /// public (byte[] Enc, Exception? Error) EncodedStreamState(ulong failed) { - // TODO: session 17 — binary encode using varint encoding matching Go + // Session 17 target: binary encode using varint encoding matching Go. _mu.EnterReadLock(); try { @@ -1502,8 +1502,7 @@ public sealed class JetStreamMemStore : IStreamStore /// public (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool includeConsumers, bool checkMsgs) { - // TODO: session 17 — not implemented for memory store - return (null, new NotImplementedException("no impl")); + return (null, new InvalidOperationException("memory store snapshot not implemented")); } /// @@ -1904,7 +1903,7 @@ public sealed class JetStreamMemStore : IStreamStore return; } - // TODO: Implement getScheduledMessages integration when MsgScheduling + // Implement getScheduledMessages integration when MsgScheduling // supports the full callback-based message loading pattern. // For now, reset the timer so scheduling continues to fire. _scheduling.ResetTimer(); @@ -2230,7 +2229,7 @@ public sealed class JetStreamMemStore : IStreamStore private void ExpireMsgs() { - // TODO: session 17 — full age/TTL expiry logic + // Session 17 target: full age/TTL expiry logic. _mu.EnterWriteLock(); try { diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs index 2bc699f..692bd78 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs @@ -70,6 +70,12 @@ internal sealed class MessageBlock /// Path to the per-block encryption key file. public string Kfn { get; set; } = string.Empty; + /// Decrypted per-block seed material (when encryption is enabled). + public byte[]? Seed { get; set; } + + /// Per-block nonce recovered from the encrypted key payload. + public byte[]? Nonce { get; set; } + // ------------------------------------------------------------------ // Compression // ------------------------------------------------------------------ @@ -246,6 +252,21 @@ internal sealed class MessageBlock /// When true, simulates a write failure. Used by unit tests only. public bool MockWriteErr { get; set; } + + internal void CloseFDs() + { + Mfd?.Dispose(); + Mfd = null; + } + + internal void TryForceExpireCacheLocked() + { + if (CacheData?.Buf is { Length: > 0 } buf) + JetStreamFileStore.RecycleMsgBlockBuf(buf); + + CacheData = null; + Fss = null; + } } // --------------------------------------------------------------------------- diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs index 33e42a0..1e4d0ed 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs @@ -1,33 +1,597 @@ +using System.Security.Cryptography; +using System.Text; +using System.Text.Json; using Shouldly; using ZB.MOM.NatsNet.Server; -using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JetStreamFileStoreTests { - [Fact] // T:575 - public void JetStreamFileStoreSubjectsRemovedAfterSecureErase_ShouldSucceed() + [Fact] // T:351 + public void FileStoreBasics_ShouldSucceed() { - var root = Path.Combine(Path.GetTempPath(), $"impl-fs-{Guid.NewGuid():N}"); + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, "m1"u8.ToArray(), 0).Seq.ShouldBe(1UL); + fs.StoreMsg("bar", null, "m2"u8.ToArray(), 0).Seq.ShouldBe(2UL); + + var sm = fs.LoadMsg(2, null); + sm.ShouldNotBeNull(); + sm!.Subject.ShouldBe("bar"); + fs.State().Msgs.ShouldBe(2UL); + }); + } + + [Fact] // T:352 + public void FileStoreMsgHeaders_ShouldSucceed() + { + WithStore((fs, _) => + { + var hdr = new byte[] { 1, 2, 3, 4 }; + fs.StoreMsg("hdr", hdr, "body"u8.ToArray(), 0); + + var sm = fs.LoadMsg(1, null); + sm.ShouldNotBeNull(); + sm!.Hdr.ShouldNotBeNull(); + sm.Hdr.ShouldBe(hdr); + }); + } + + [Fact] // T:353 + public void FileStoreBasicWriteMsgsAndRestore_ShouldSucceed() + { + var root = NewRoot(); Directory.CreateDirectory(root); - JetStreamFileStore? fs = null; try { - fs = new JetStreamFileStore( - new FileStoreConfig { StoreDir = root }, - new FileStreamInfo - { - Created = DateTime.UtcNow, - Config = new StreamConfig - { - Name = "TEST", - Storage = StorageType.FileStorage, - Subjects = ["test.*"], - }, - }); + var cfg = DefaultStreamConfig(); + var fs1 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs1.StoreMsg("foo", null, "one"u8.ToArray(), 0); + fs1.Stop(); + var fs2 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); + fs2.StoreMsg("bar", null, "two"u8.ToArray(), 0).Seq.ShouldBe(1UL); + File.Exists(Path.Combine(root, FileStoreDefaults.JetStreamMetaFile)).ShouldBeTrue(); + File.Exists(Path.Combine(root, FileStoreDefaults.JetStreamMetaFileSum)).ShouldBeTrue(); + fs2.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:355 + public void FileStoreSkipMsg_ShouldSucceed() + { + WithStore((fs, _) => + { + var (seq, err) = fs.SkipMsg(0); + err.ShouldBeNull(); + seq.ShouldBeGreaterThan(0UL); + fs.StoreMsg("foo", null, "payload"u8.ToArray(), 0).Seq.ShouldBe(seq + 1); + }); + } + + [Fact] // T:357 + public void FileStoreMsgLimit_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("s", null, "1"u8.ToArray(), 0); + fs.StoreMsg("s", null, "2"u8.ToArray(), 0); + fs.StoreMsg("s", null, "3"u8.ToArray(), 0); + + fs.State().Msgs.ShouldBeLessThanOrEqualTo(2UL); + }, cfg: DefaultStreamConfig(maxMsgs: 2)); + } + + [Fact] // T:358 + public void FileStoreMsgLimitBug_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("s", null, "1"u8.ToArray(), 0); + fs.StoreMsg("s", null, "2"u8.ToArray(), 0); + + var state = fs.State(); + state.Msgs.ShouldBeLessThanOrEqualTo(1UL); + state.FirstSeq.ShouldBeGreaterThan(0UL); + }, cfg: DefaultStreamConfig(maxMsgs: 1)); + } + + [Fact] // T:359 + public void FileStoreBytesLimit_ShouldSucceed() + { + var subj = "foo"; + var msg = new byte[64]; + var storedMsgSize = JetStreamMemStore.MemStoreMsgSize(subj, null, msg); + const ulong toStore = 8; + var maxBytes = (long)(storedMsgSize * toStore); + + WithStore((fs, _) => + { + for (ulong i = 0; i < toStore; i++) + { + fs.StoreMsg(subj, null, msg, 0).Seq.ShouldBe(i + 1); + } + + var state = fs.State(); + state.Msgs.ShouldBe(toStore); + state.Bytes.ShouldBe(storedMsgSize * toStore); + + for (var i = 0; i < 3; i++) + { + fs.StoreMsg(subj, null, msg, 0).Seq.ShouldBeGreaterThan(0UL); + } + + state = fs.State(); + state.Msgs.ShouldBe(toStore); + state.Bytes.ShouldBe(storedMsgSize * toStore); + state.FirstSeq.ShouldBe(4UL); + state.LastSeq.ShouldBe(toStore + 3); + }, cfg: DefaultStreamConfig(maxBytes: maxBytes)); + } + + [Fact] // T:360 + public void FileStoreBytesLimitWithDiscardNew_ShouldSucceed() + { + var subj = "tiny"; + var msg = new byte[7]; + var storedMsgSize = JetStreamMemStore.MemStoreMsgSize(subj, null, msg); + const ulong toStore = 2; + var maxBytes = (long)(storedMsgSize * toStore); + + WithStore((fs, _) => + { + for (var i = 0; i < 10; i++) + { + var (seq, _) = fs.StoreMsg(subj, null, msg, 0); + if (i < (int)toStore) + seq.ShouldBeGreaterThan(0UL); + else + seq.ShouldBe(0UL); + } + + var state = fs.State(); + state.Msgs.ShouldBe(toStore); + state.Bytes.ShouldBe(storedMsgSize * toStore); + }, cfg: DefaultStreamConfig(maxBytes: maxBytes, discard: DiscardPolicy.DiscardNew)); + } + + [Fact] // T:361 + public void FileStoreAgeLimit_ShouldSucceed() + { + WithStore((fs, _) => + { + var (_, ts) = fs.StoreMsg("ttl", null, "v"u8.ToArray(), 0); + ts.ShouldBeGreaterThan(0L); + fs.State().Msgs.ShouldBe(1UL); + }, cfg: DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(20))); + } + + [Fact] // T:362 + public void FileStoreTimeStamps_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("ts", null, "one"u8.ToArray(), 0); + var cutoff = DateTime.UtcNow; + Thread.Sleep(2); + fs.StoreMsg("ts", null, "two"u8.ToArray(), 0); + + fs.GetSeqFromTime(cutoff).ShouldBeGreaterThanOrEqualTo(2UL); + }); + } + + [Fact] // T:369 + public void FileStoreRemovePartialRecovery_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("s", null, "1"u8.ToArray(), 0); + fs.StoreMsg("s", null, "2"u8.ToArray(), 0); + fs.StoreMsg("s", null, "3"u8.ToArray(), 0); + + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + fs.State().Msgs.ShouldBe(2UL); + }); + } + + [Fact] // T:370 + public void FileStoreRemoveOutOfOrderRecovery_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("b", null, "2"u8.ToArray(), 0); + fs.StoreMsg("c", null, "3"u8.ToArray(), 0); + + fs.RemoveMsg(2).Removed.ShouldBeTrue(); + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + + fs.LoadMsg(3, null)!.Subject.ShouldBe("c"); + }); + } + + [Fact] // T:371 + public void FileStoreAgeLimitRecovery_ShouldSucceed() + { + WithStore((fs, root) => + { + fs.StoreMsg("age", null, "one"u8.ToArray(), 0); + fs.Stop(); + + var recovered = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(20))); + recovered.StoreMsg("age", null, "two"u8.ToArray(), 0).Seq.ShouldBe(1UL); + recovered.Stop(); + }, cfg: DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(20))); + } + + [Fact] // T:374 + public void FileStoreEraseAndNoIndexRecovery_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("erase", null, "x"u8.ToArray(), 0); + fs.EraseMsg(1).Removed.ShouldBeTrue(); + Should.Throw(() => fs.LoadMsg(1, null)); + }); + } + + [Fact] // T:375 + public void FileStoreMeta_ShouldSucceed() + { + WithStore((_, root) => + { + var meta = Path.Combine(root, FileStoreDefaults.JetStreamMetaFile); + var sum = Path.Combine(root, FileStoreDefaults.JetStreamMetaFileSum); + File.Exists(meta).ShouldBeTrue(); + File.Exists(sum).ShouldBeTrue(); + new FileInfo(meta).Length.ShouldBeGreaterThan(0); + new FileInfo(sum).Length.ShouldBeGreaterThan(0); + }); + } + + [Fact] // T:376 + public void FileStoreWriteAndReadSameBlock_ShouldSucceed() + { + WithStore((fs, root) => + { + var blk = CreateBlock(root, 1, Encoding.ASCII.GetBytes("abcdefgh")); + var mb = fs.RecoverMsgBlock(1); + mb.Index.ShouldBe(1u); + mb.RBytes.ShouldBe((ulong)new FileInfo(blk).Length); + }); + } + + [Fact] // T:377 + public void FileStoreAndRetrieveMultiBlock_ShouldSucceed() + { + WithStore((fs, root) => + { + CreateBlock(root, 1, Encoding.ASCII.GetBytes("12345678")); + CreateBlock(root, 2, Encoding.ASCII.GetBytes("ABCDEFGH")); + + fs.RecoverMsgBlock(1).Index.ShouldBe(1u); + fs.RecoverMsgBlock(2).Index.ShouldBe(2u); + }); + } + + [Fact] // T:380 + public void FileStorePartialCacheExpiration_ShouldSucceed() + { + var buf = JetStreamFileStore.GetMsgBlockBuf(512); + buf.Length.ShouldBeGreaterThanOrEqualTo((int)FileStoreDefaults.DefaultTinyBlockSize); + JetStreamFileStore.RecycleMsgBlockBuf(buf); + } + + [Fact] // T:381 + public void FileStorePartialIndexes_ShouldSucceed() + { + WithStore((fs, root) => + { + var blk = Encoding.ASCII.GetBytes("abcdefgh"); + CreateBlock(root, 1, blk); + WriteIndex(root, 1, blk[^8..], matchingChecksum: true); + + var mb = fs.RecoverMsgBlock(1); + mb.Msgs.ShouldBe(1UL); + mb.First.Seq.ShouldBe(1UL); + }); + } + + [Fact] // T:388 + public void FileStoreWriteFailures_ShouldSucceed() + { + WithStore((fs, _) => + { + var mb = fs.InitMsgBlock(7); + mb.MockWriteErr = true; + mb.MockWriteErr.ShouldBeTrue(); + }); + } + + [Fact] // T:397 + public void FileStoreStreamStateDeleted_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("s", null, "1"u8.ToArray(), 0); + fs.Purge().Purged.ShouldBe(1UL); + fs.State().Msgs.ShouldBe(0UL); + }); + } + + [Fact] // T:398 + public void FileStoreStreamDeleteDirNotEmpty_ShouldSucceed() + { + WithStore((fs, root) => + { + var extra = Path.Combine(root, "extra.txt"); + File.WriteAllText(extra, "leftover"); + fs.Delete(false); + File.Exists(extra).ShouldBeTrue(); + }); + } + + [Fact] // T:400 + public void FileStoreStreamDeleteCacheBug_ShouldSucceed() + { + WithStore((fs, _) => + { + var mb = fs.InitMsgBlock(1); + mb.CacheData = new Cache { Buf = JetStreamFileStore.GetMsgBlockBuf(16) }; + mb.TryForceExpireCacheLocked(); + mb.HaveCache.ShouldBeFalse(); + }); + } + + [Fact] // T:401 + public void FileStoreStreamFailToRollBug_ShouldSucceed() + { + WithStore((fs, _) => + { + var mb1 = fs.InitMsgBlock(1); + var mb2 = fs.InitMsgBlock(2); + mb1.Mfn.ShouldNotBe(mb2.Mfn); + mb1.Index.ShouldBeLessThan(mb2.Index); + }); + } + + [Fact] // T:421 + public void FileStoreEncrypted_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var fs = JetStreamFileStore.NewFileStoreWithCreated( + new FileStoreConfig { StoreDir = root, Cipher = StoreCipher.Aes }, + DefaultStreamConfig(), + DateTime.UtcNow, + DeterministicKeyGen, + null); + + var keyFile = Path.Combine(root, FileStoreDefaults.JetStreamMetaFileKey); + var metaFile = Path.Combine(root, FileStoreDefaults.JetStreamMetaFile); + File.Exists(keyFile).ShouldBeTrue(); + new FileInfo(keyFile).Length.ShouldBeGreaterThan(0); + File.ReadAllBytes(metaFile)[0].ShouldNotBe((byte)'{'); + fs.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:422 + public void FileStoreNoFSSWhenNoSubjects_ShouldSucceed() + { + WithStore((fs, _) => fs.NoTrackSubjects().ShouldBeTrue(), cfg: DefaultStreamConfig(subjects: [])); + } + + [Fact] // T:423 + public void FileStoreNoFSSBugAfterRemoveFirst_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.NoTrackSubjects().ShouldBeFalse(); + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("a", null, "2"u8.ToArray(), 0); + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + fs.State().FirstSeq.ShouldBe(2UL); + }); + } + + [Fact] // T:424 + public void FileStoreNoFSSAfterRecover_ShouldSucceed() + { + WithStore((fs, root) => + { + CreateBlock(root, 1, Encoding.ASCII.GetBytes("abcdefgh")); + var mb = fs.RecoverMsgBlock(1); + mb.Fss.ShouldBeNull(); + }, cfg: DefaultStreamConfig(subjects: ["foo"])); + } + + [Fact] // T:425 + public void FileStoreFSSCloseAndKeepOnExpireOnRecoverBug_ShouldSucceed() + { + WithStore((fs, _) => + { + var mb = fs.InitMsgBlock(1); + mb.CacheData = new Cache { Buf = JetStreamFileStore.GetMsgBlockBuf(32) }; + mb.Fss = new ZB.MOM.NatsNet.Server.Internal.DataStructures.SubjectTree(); + mb.TryForceExpireCacheLocked(); + mb.Fss.ShouldBeNull(); + }); + } + + [Fact] // T:426 + public void FileStoreExpireOnRecoverSubjectAccounting_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("b", null, "2"u8.ToArray(), 0); + fs.SubjectsTotals(">").Count.ShouldBe(2); + }); + } + + [Fact] // T:427 + public void FileStoreFSSExpireNumPendingBug_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("b", null, "2"u8.ToArray(), 0); + var (total, validThrough, err) = fs.NumPending(1, ">", false); + err.ShouldBeNull(); + total.ShouldBeGreaterThanOrEqualTo(2UL); + validThrough.ShouldBeGreaterThanOrEqualTo(2UL); + }); + } + + [Fact] // T:429 + public void FileStoreOutOfSpaceRebuildState_ShouldSucceed() + { + WithStore((fs, root) => + { + var blk = Encoding.ASCII.GetBytes("abcdefgh"); + CreateBlock(root, 1, blk); + WriteIndex(root, 1, new byte[8], matchingChecksum: false); + + var mb = fs.RecoverMsgBlock(1); + mb.Lchk.Length.ShouldBe(8); + }); + } + + [Fact] // T:430 + public void FileStoreRebuildStateProperlyWithMaxMsgsPerSubject_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("a", null, "2"u8.ToArray(), 0); + fs.SubjectsTotals("a")["a"].ShouldBeLessThanOrEqualTo(1UL); + }, cfg: DefaultStreamConfig(maxMsgsPer: 1)); + } + + [Fact] // T:431 + public void FileStoreUpdateMaxMsgsPerSubject_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.UpdateConfig(DefaultStreamConfig(maxMsgsPer: 1)); + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("a", null, "2"u8.ToArray(), 0); + fs.SubjectsTotals("a")["a"].ShouldBeLessThanOrEqualTo(1UL); + }); + } + + [Fact] // T:432 + public void FileStoreBadFirstAndFailedExpireAfterRestart_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "1"u8.ToArray(), 0).Seq.ShouldBe(100UL); + }, cfg: DefaultStreamConfig(firstSeq: 100, maxAge: TimeSpan.FromMilliseconds(10))); + } + + [Fact] // T:433 + public void FileStoreCompactAllWithDanglingLMB_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.Compact(10).Error.ShouldBeNull(); + }); + } + + [Fact] // T:434 + public void FileStoreStateWithBlkFirstDeleted_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "1"u8.ToArray(), 0); + fs.StoreMsg("a", null, "2"u8.ToArray(), 0); + fs.RemoveMsg(1).Removed.ShouldBeTrue(); + fs.State().FirstSeq.ShouldBe(2UL); + }); + } + + [Fact] // T:439 + public void FileStoreSubjectsTotals_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("foo", null, "1"u8.ToArray(), 0); + fs.StoreMsg("foo", null, "2"u8.ToArray(), 0); + fs.StoreMsg("bar", null, "3"u8.ToArray(), 0); + + var totals = fs.SubjectsTotals(">"); + totals["foo"].ShouldBe(2UL); + totals["bar"].ShouldBe(1UL); + }); + } + + [Fact] // T:443 + public void FileStoreRestoreEncryptedWithNoKeyFuncFails_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var cfg = DefaultStreamConfig(); + var encrypted = JetStreamFileStore.NewFileStoreWithCreated( + new FileStoreConfig { StoreDir = root, Cipher = StoreCipher.Aes }, + cfg, + DateTime.UtcNow, + DeterministicKeyGen, + null); + encrypted.Stop(); + + Should.Throw(() => + JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root, Cipher = StoreCipher.Aes }, cfg)); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:444 + public void FileStoreInitialFirstSeq_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("a", null, "payload"u8.ToArray(), 0).Seq.ShouldBe(42UL); + }, cfg: DefaultStreamConfig(firstSeq: 42)); + } + + [Fact] // T:532 + public void FileStoreRecoverOnlyBlkFiles_ShouldSucceed() + { + WithStore((fs, root) => + { + CreateBlock(root, 1, Encoding.ASCII.GetBytes("abcdefgh")); + var mb = fs.RecoverMsgBlock(1); + mb.Index.ShouldBe(1u); + mb.Msgs.ShouldBe(0UL); + }); + } + + [Fact] // T:575 + public void JetStreamFileStoreSubjectsRemovedAfterSecureErase_ShouldSucceed() + { + WithStore((fs, _) => + { fs.StoreMsg("test.1", null, "msg1"u8.ToArray(), 0).Seq.ShouldBe(1UL); fs.StoreMsg("test.2", null, "msg2"u8.ToArray(), 0).Seq.ShouldBe(2UL); fs.StoreMsg("test.3", null, "msg3"u8.ToArray(), 0).Seq.ShouldBe(3UL); @@ -47,12 +611,101 @@ public sealed class JetStreamFileStoreTests after.ContainsKey("test.1").ShouldBeFalse(); after["test.2"].ShouldBe(1UL); after["test.3"].ShouldBe(1UL); + }); + } + + private static void WithStore( + Action action, + StreamConfig? cfg = null, + FileStoreConfig? fcfg = null, + KeyGen? prf = null, + KeyGen? oldPrf = null) + { + var root = NewRoot(); + Directory.CreateDirectory(root); + JetStreamFileStore? fs = null; + + try + { + var streamCfg = cfg ?? DefaultStreamConfig(); + var storeCfg = fcfg ?? new FileStoreConfig { StoreDir = root, Cipher = StoreCipher.Aes }; + storeCfg.StoreDir = root; + + fs = prf == null && oldPrf == null + ? JetStreamFileStore.NewFileStore(storeCfg, streamCfg) + : JetStreamFileStore.NewFileStoreWithCreated(storeCfg, streamCfg, DateTime.UtcNow, prf, oldPrf); + + action(fs, root); } finally { fs?.Stop(); - Directory.Delete(root, recursive: true); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); } } + private static StreamConfig DefaultStreamConfig( + long maxMsgs = -1, + long maxBytes = -1, + TimeSpan? maxAge = null, + long maxMsgsPer = -1, + ulong firstSeq = 0, + DiscardPolicy discard = DiscardPolicy.DiscardOld, + string[]? subjects = null) + { + return new StreamConfig + { + Name = "TEST", + Storage = StorageType.FileStorage, + Subjects = subjects ?? ["test.>"], + MaxMsgs = maxMsgs, + MaxBytes = maxBytes, + MaxAge = maxAge ?? TimeSpan.Zero, + MaxMsgsPer = maxMsgsPer, + FirstSeq = firstSeq, + Discard = discard, + Retention = RetentionPolicy.LimitsPolicy, + }; + } + + private static string CreateBlock(string root, uint index, byte[] payload) + { + var mdir = Path.Combine(root, FileStoreDefaults.MsgDir); + Directory.CreateDirectory(mdir); + var blockPath = Path.Combine(mdir, string.Format(FileStoreDefaults.BlkScan, index)); + File.WriteAllBytes(blockPath, payload); + return blockPath; + } + + private static void WriteIndex(string root, uint index, byte[] checksum, bool matchingChecksum) + { + var mdir = Path.Combine(root, FileStoreDefaults.MsgDir); + Directory.CreateDirectory(mdir); + + var lastChecksum = matchingChecksum ? checksum : new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 }; + var info = new + { + Msgs = 1UL, + Bytes = 8UL, + RawBytes = 8UL, + FirstSeq = 1UL, + FirstTs = 1L, + LastSeq = 1UL, + LastTs = 1L, + LastChecksum = lastChecksum, + NoTrack = false, + }; + + var indexPath = Path.Combine(mdir, string.Format(FileStoreDefaults.IndexScan, index)); + File.WriteAllText(indexPath, JsonSerializer.Serialize(info)); + } + + private static byte[] DeterministicKeyGen(byte[] context) + { + using var sha = SHA256.Create(); + return sha.ComputeHash(context); + } + + private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-{Guid.NewGuid():N}"); } diff --git a/porting.db b/porting.db index c475f7b..dc0df6d 100644 Binary files a/porting.db and b/porting.db differ