// Copyright 2019-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // Adapted from server/filestore.go (fileStore struct and methods) using System.Buffers; using System.Security.Cryptography; using System.Text; using System.Text.Json; using System.Threading.Channels; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; /// /// File-backed implementation of . /// Stores JetStream messages in per-block files on disk with optional /// encryption and compression. /// Mirrors the fileStore struct in filestore.go. /// public sealed class JetStreamFileStore : IStreamStore, IDisposable { // ----------------------------------------------------------------------- // Fields — mirrors fileStore struct fields // ----------------------------------------------------------------------- private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.NoRecursion); // State private StreamState _state = new(); private List? _tombs; private LostStreamData? _ld; // Callbacks private StorageUpdateHandler? _scb; private StorageRemoveMsgHandler? _rmcb; private ProcessJetStreamMsgHandler? _pmsgcb; // Age-check timer private Timer? _ageChk; private bool _ageChkRun; private long _ageChkTime; // Background sync timer private Timer? _syncTmr; // Configuration private FileStreamInfo _cfg; private FileStoreConfig _fcfg; private readonly KeyGen? _prf; private readonly KeyGen? _oldPrf; private AeadCipher? 
_aek; // Message block list and index private MessageBlock? _lmb; // last (active write) block private List _blks = []; private Dictionary _bim = []; // Per-subject index map private SubjectTree? _psim; private HashWheel? _ttls; private MsgScheduling? _scheduling; // Total subject-list length (sum of subject-string lengths) private int _tsl; // writeFullState concurrency guard private readonly object _wfsmu = new(); private long _wfsrun; // Interlocked: is writeFullState running? private int _wfsadml; // Average dmap length (protected by _wfsmu) // Quit / load-done channels (Channel mimics chan struct{}) private Channel? _qch; private Channel? _fsld; // Consumer list private readonly ReaderWriterLockSlim _cmu = new(LockRecursionPolicy.NoRecursion); private List _cfs = []; // Snapshot-in-progress count private int _sips; // Dirty-write counter (incremented when writes are pending flush) private int _dirty; // Lifecycle flags private bool _closing; private volatile bool _closed; // Flush-in-progress flag private bool _fip; // Whether the store has ever received a message private bool _receivedAny; // Whether the first sequence has been moved forward private bool _firstMoved; // Last PurgeEx call time (for throttle logic) private DateTime _lpex; // In this incremental port stage, file-store logic delegates core stream semantics // to the memory store implementation while file-specific APIs are added on top. private readonly JetStreamMemStore _memStore; private static readonly ArrayPool MsgBlockBufferPool = ArrayPool.Shared; private static readonly object InitLock = new(); private static SemaphoreSlim? 
_diskIoSlots; private static int _diskIoCount; private const int ConsumerHeaderLength = 2; private const int MaxVarIntLength = 10; private const long NanosecondsPerSecond = 1_000_000_000L; private const byte FullStateMagic = 11; private const byte FullStateMinVersion = 1; private const byte FullStateVersion = 3; private const int FullStateHeaderLength = 2; private const int FullStateMinimumLength = 32; private const int FullStateChecksumLength = 8; static JetStreamFileStore() { Init(); } // ----------------------------------------------------------------------- // Constructor // ----------------------------------------------------------------------- /// /// Initialises a file-backed stream store using the supplied file-store /// configuration and stream information. /// /// File-store configuration (block size, cipher, paths, etc.). /// Stream metadata (created time and stream config). /// /// Thrown when or is null. /// public JetStreamFileStore(FileStoreConfig fcfg, FileStreamInfo cfg) : this(fcfg, cfg, null, null) { } internal JetStreamFileStore(FileStoreConfig fcfg, FileStreamInfo cfg, KeyGen? prf, KeyGen? oldPrf) { ArgumentNullException.ThrowIfNull(fcfg); ArgumentNullException.ThrowIfNull(cfg); ArgumentNullException.ThrowIfNull(cfg.Config); if (string.IsNullOrWhiteSpace(cfg.Config.Name)) throw new ArgumentException("name required", nameof(cfg)); if (cfg.Config.Storage != StorageType.FileStorage) throw new ArgumentException("file store requires file storage config", nameof(cfg)); _fcfg = fcfg; _cfg = cfg; _prf = prf; _oldPrf = oldPrf; // Apply defaults (mirrors newFileStoreWithCreated in filestore.go). 
if (_fcfg.BlockSize == 0) _fcfg.BlockSize = DynBlkSize(cfg.Config.Retention, cfg.Config.MaxBytes, _prf != null); if (_fcfg.BlockSize > FileStoreDefaults.MaxBlockSize) throw new InvalidOperationException($"filestore max block size is {FileStoreDefaults.MaxBlockSize} bytes"); if (_fcfg.CacheExpire == TimeSpan.Zero) _fcfg.CacheExpire = FileStoreDefaults.DefaultCacheBufferExpiration; if (_fcfg.SubjectStateExpire == TimeSpan.Zero) _fcfg.SubjectStateExpire = FileStoreDefaults.DefaultFssExpiration; if (_fcfg.SyncInterval == TimeSpan.Zero) _fcfg.SyncInterval = FileStoreDefaults.DefaultSyncInterval; EnsureStoreDirectoryWritable(_fcfg.StoreDir); Directory.CreateDirectory(Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir)); Directory.CreateDirectory(Path.Combine(_fcfg.StoreDir, FileStoreDefaults.ConsumerDir)); _psim = new SubjectTree(); _bim = new Dictionary(); _qch = Channel.CreateUnbounded(); _fsld = Channel.CreateUnbounded(); var memCfg = cfg.Config.Clone(); memCfg.Storage = StorageType.MemoryStorage; _memStore = new JetStreamMemStore(memCfg); var keyFile = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFileKey); if (_prf == null && File.Exists(keyFile)) throw new InvalidOperationException("encrypted store requires encryption key function"); if (_prf != null && File.Exists(keyFile)) RecoverAEK(); var meta = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFile); if (!File.Exists(meta) || new FileInfo(meta).Length == 0) WriteStreamMeta(); else if (_prf != null && !File.Exists(keyFile)) WriteStreamMeta(); } /// /// Creates a file store using the current UTC timestamp. /// Mirrors Go newFileStore. /// public static JetStreamFileStore NewFileStore(FileStoreConfig fcfg, StreamConfig cfg) => NewFileStoreWithCreated(fcfg, cfg, DateTime.UtcNow, null, null); /// /// Creates a file store with an explicit creation timestamp and optional key generators. /// Mirrors Go newFileStoreWithCreated. 
/// public static JetStreamFileStore NewFileStoreWithCreated( FileStoreConfig fcfg, StreamConfig cfg, DateTime created, KeyGen? prf, KeyGen? oldPrf) { ArgumentNullException.ThrowIfNull(fcfg); ArgumentNullException.ThrowIfNull(cfg); var ccfg = cfg.Clone(); return new JetStreamFileStore( fcfg, new FileStreamInfo { Created = created, Config = ccfg }, prf, oldPrf); } internal void LockAllMsgBlocks() { foreach (var mb in _blks) mb.Mu.EnterWriteLock(); } internal void UnlockAllMsgBlocks() { foreach (var mb in _blks) { if (mb.Mu.IsWriteLockHeld) mb.Mu.ExitWriteLock(); } } internal static ulong DynBlkSize(RetentionPolicy retention, long maxBytes, bool encrypted) { if (maxBytes > 0) { var blkSize = (maxBytes / 4) + 1; if (blkSize % 100 != 0) blkSize += 100 - (blkSize % 100); if (blkSize <= (long)FileStoreDefaults.FileStoreMinBlkSize) blkSize = (long)FileStoreDefaults.FileStoreMinBlkSize; else if (blkSize >= (long)FileStoreDefaults.FileStoreMaxBlkSize) blkSize = (long)FileStoreDefaults.FileStoreMaxBlkSize; else blkSize = (long)FileStoreDefaults.DefaultMediumBlockSize; if (encrypted && blkSize > (long)FileStoreDefaults.MaximumEncryptedBlockSize) blkSize = (long)FileStoreDefaults.MaximumEncryptedBlockSize; return (ulong)blkSize; } if (encrypted) return FileStoreDefaults.MaximumEncryptedBlockSize; return retention == RetentionPolicy.LimitsPolicy ? FileStoreDefaults.DefaultLargeBlockSize : FileStoreDefaults.DefaultMediumBlockSize; } internal LostStreamData? LostData() { _mu.EnterReadLock(); try { if (_ld == null) return null; return new LostStreamData { Msgs = [.. _ld.Msgs], Bytes = _ld.Bytes, }; } finally { _mu.ExitReadLock(); } } internal void AddLostData(LostStreamData? 
ld) { _mu.EnterWriteLock(); try { AddLostDataLocked(ld); } finally { _mu.ExitWriteLock(); } } internal void RemoveFromLostData(ulong seq) { _mu.EnterWriteLock(); try { if (_ld == null) return; var index = Array.IndexOf(_ld.Msgs, seq); if (index < 0) return; var msgs = new ulong[_ld.Msgs.Length - 1]; if (index > 0) Array.Copy(_ld.Msgs, 0, msgs, 0, index); if (index < _ld.Msgs.Length - 1) Array.Copy(_ld.Msgs, index + 1, msgs, index, _ld.Msgs.Length - index - 1); if (msgs.Length == 0) _ld = null; else _ld.Msgs = msgs; } finally { _mu.ExitWriteLock(); } } internal void RebuildState(LostStreamData? ld) { _mu.EnterWriteLock(); try { RebuildStateLocked(ld); } finally { _mu.ExitWriteLock(); } } internal void RebuildStateLocked(LostStreamData? ld) { AddLostDataLocked(ld); _state.Msgs = 0; _state.Bytes = 0; _state.FirstSeq = 0; _state.LastSeq = 0; _state.FirstTime = default; _state.LastTime = default; foreach (var mb in _blks) { mb.Mu.EnterReadLock(); try { _state.Msgs += mb.Msgs; _state.Bytes += mb.Bytes; var firstSeq = mb.First.Seq; if (_state.FirstSeq == 0 || (firstSeq < _state.FirstSeq && mb.First.Ts != 0)) { _state.FirstSeq = firstSeq; _state.FirstTime = mb.First.Ts == 0 ? default : FromUnixNanosUtc(mb.First.Ts); } // Preserve the highest last-seq timestamp even when a block's terminal record is erased. var lastSeq = mb.Last.Seq; if (lastSeq >= _state.LastSeq) { _state.LastSeq = lastSeq; _state.LastTime = mb.Last.Ts == 0 ? default : FromUnixNanosUtc(mb.Last.Ts); } } finally { mb.Mu.ExitReadLock(); } } _state.Lost = _ld == null ? null : new LostStreamData { Msgs = [.. 
_ld.Msgs], Bytes = _ld.Bytes }; } internal static void UpdateTrackingState(StreamState state, MessageBlock mb) { ArgumentNullException.ThrowIfNull(state); ArgumentNullException.ThrowIfNull(mb); var first = mb.First.Seq; var last = mb.Last.Seq; if (state.FirstSeq == 0) state.FirstSeq = first; else if (first < state.FirstSeq && mb.First.Ts != 0) state.FirstSeq = first; if (last > state.LastSeq) state.LastSeq = last; state.Msgs += mb.Msgs; state.Bytes += mb.Bytes; } internal static bool TrackingStatesEqual(StreamState fs, StreamState mb) { ArgumentNullException.ThrowIfNull(fs); ArgumentNullException.ThrowIfNull(mb); // Brand-new state may use FirstSeq=0 while tracked block state starts at 1. if ((fs.FirstSeq > 1 && mb.FirstSeq > 1) || mb.FirstSeq > 1) return fs.Msgs == mb.Msgs && fs.FirstSeq == mb.FirstSeq && fs.LastSeq == mb.LastSeq && fs.Bytes == mb.Bytes; return fs.Msgs == mb.Msgs && fs.LastSeq == mb.LastSeq && fs.Bytes == mb.Bytes; } internal static List? CopyMsgBlocks(List? src) { if (src == null) return null; return [.. src]; } internal static void KickFlusher(Channel? fch) { if (fch == null) return; _ = fch.Writer.TryWrite(0); } internal static ulong FileStoreMsgSizeRaw(int slen, int hlen, int mlen) { if (hlen == 0) return (ulong)(22 + slen + mlen + 8); return (ulong)(22 + slen + 4 + hlen + mlen + 8); } internal static ulong FileStoreMsgSize(string subj, byte[]? hdr, byte[]? msg) => FileStoreMsgSizeRaw(subj.Length, hdr?.Length ?? 0, msg?.Length ?? 0); internal static ulong FileStoreMsgSizeEstimate(int slen, int maxPayload) => (ulong)(30 + slen + 4 + maxPayload); internal static Exception? CheckNewHeader(byte[]? 
hdr)
{
    // Header must contain at least the magic and version bytes.
    if (hdr == null || hdr.Length < 2) return new InvalidDataException("corrupt state");
    if (hdr[0] != FileStoreDefaults.FileStoreMagic || (hdr[1] != FileStoreDefaults.FileStoreVersion && hdr[1] != FileStoreDefaults.NewVersion))
    {
        return new InvalidDataException("corrupt state");
    }
    return null;
}

// Literal subject comparison; selected by CompareFn for wildcard-free filters.
internal static bool SubjectsEqual(string a, string b) => a == b;

// Match-everything comparison; selected by CompareFn for ">" or an empty filter.
internal static bool SubjectsAll(string a, string b) => true;

// Picks the subject-match strategy for a filter subject: empty or ">" matches
// everything, wildcard filters use subset matching, literal filters use
// plain string equality.
internal static Func CompareFn(string subject)
{
    if (string.IsNullOrEmpty(subject) || subject == ">") return SubjectsAll;
    if (SubscriptionIndex.SubjectHasWildcard(subject)) return SubscriptionIndex.SubjectIsSubsetMatch;
    return SubjectsEqual;
}

// Converts a DateTime to Unix-epoch nanoseconds (0 for default/unset values).
// One tick is 100 ns, hence the *100 scale.
internal static long TimestampNormalized(DateTime t)
{
    if (t == default) return 0;
    var utc = t.Kind == DateTimeKind.Utc ? t : t.ToUniversalTime();
    return (utc - DateTime.UnixEpoch).Ticks * 100L;
}

// Sizes the shared disk-I/O semaphore from the processor count
// (clamped to 4..16 slots; on machines with more than 32 cores,
// half the core count with a floor of 16). Idempotent: re-running
// with an unchanged slot count is a no-op.
internal static void Init()
{
    var mp = Environment.ProcessorCount;
    var nIo = Math.Min(16, Math.Max(4, mp));
    if (mp > 32) nIo = Math.Max(16, Math.Min(mp, mp / 2));
    lock (InitLock)
    {
        if (_diskIoSlots != null && _diskIoCount == nIo) return;
        _diskIoSlots?.Dispose();
        _diskIoSlots = new SemaphoreSlim(nIo, nIo);
        _diskIoCount = nIo;
    }
}

// Validates a consumer snapshot header (magic byte plus a supported version)
// and returns the version, or an error for corrupt / unsupported input.
internal static (byte Version, Exception? Error) CheckConsumerHeader(byte[]? hdr)
{
    if (hdr is not { Length: >= ConsumerHeaderLength } || hdr[0] != FileStoreDefaults.FileStoreMagic)
        return (0, new InvalidDataException("corrupt state"));
    var version = hdr[1];
    return version switch
    {
        FileStoreDefaults.FileStoreVersion or FileStoreDefaults.NewVersion => (version, null),
        _ => (0, new InvalidDataException($"unsupported version: {version}")),
    };
}

// Decodes a serialized consumer state snapshot.
// Mirrors decodeConsumerState in filestore.go.
internal static (ConsumerState? State, Exception? Error) DecodeConsumerState(byte[]?
buf) { if (buf == null) return (null, new InvalidDataException("corrupt state")); var (version, headerErr) = CheckConsumerHeader(buf); if (headerErr != null) return (null, headerErr); var index = ConsumerHeaderLength; if (!TryReadUVarInt(buf, ref index, out var ackConsumer) || !TryReadUVarInt(buf, ref index, out var ackStream) || !TryReadUVarInt(buf, ref index, out var deliveredConsumer) || !TryReadUVarInt(buf, ref index, out var deliveredStream)) { return (null, new InvalidDataException("corrupt state")); } var state = new ConsumerState { AckFloor = new SequencePair { Consumer = ackConsumer, Stream = ackStream }, Delivered = new SequencePair { Consumer = deliveredConsumer, Stream = deliveredStream }, }; if (version == FileStoreDefaults.FileStoreVersion) { if (state.AckFloor.Consumer > 1) state.Delivered.Consumer += state.AckFloor.Consumer - 1; if (state.AckFloor.Stream > 1) state.Delivered.Stream += state.AckFloor.Stream - 1; } const ulong highBit = 1UL << 63; if ((state.AckFloor.Stream & highBit) != 0 || (state.Delivered.Stream & highBit) != 0) return (null, new InvalidDataException("corrupt state")); if (!TryReadUVarInt(buf, ref index, out var pendingCount)) return (null, new InvalidDataException("corrupt state")); if (pendingCount > 0) { if (!TryReadVarInt(buf, ref index, out var minTs)) return (null, new InvalidDataException("corrupt state")); state.Pending = new Dictionary((int)pendingCount); for (var i = 0; i < (int)pendingCount; i++) { if (!TryReadUVarInt(buf, ref index, out var sseq)) return (null, new InvalidDataException("corrupt state")); var dseq = 0UL; if (version == FileStoreDefaults.NewVersion && !TryReadUVarInt(buf, ref index, out dseq)) return (null, new InvalidDataException("corrupt state")); if (!TryReadVarInt(buf, ref index, out var ts)) return (null, new InvalidDataException("corrupt state")); sseq += state.AckFloor.Stream; if (sseq == 0) return (null, new InvalidDataException("corrupt state")); if (version == FileStoreDefaults.NewVersion) 
dseq += state.AckFloor.Consumer; var adjustedTs = version == FileStoreDefaults.FileStoreVersion ? (ts + minTs) * NanosecondsPerSecond : (minTs - ts) * NanosecondsPerSecond; state.Pending[sseq] = new Pending { Sequence = dseq, Timestamp = adjustedTs, }; } } if (!TryReadUVarInt(buf, ref index, out var redeliveredCount)) return (null, new InvalidDataException("corrupt state")); if (redeliveredCount > 0) { state.Redelivered = new Dictionary((int)redeliveredCount); for (var i = 0; i < (int)redeliveredCount; i++) { if (!TryReadUVarInt(buf, ref index, out var seq) || !TryReadUVarInt(buf, ref index, out var count)) { return (null, new InvalidDataException("corrupt state")); } if (seq > 0 && count > 0) { seq += state.AckFloor.Stream; state.Redelivered[seq] = count; } } } return (state, null); } internal static Exception? WriteFileWithSync(string name, byte[] data, UnixFileMode perm) => WriteAtomically(name, data, perm, sync: true); internal static Exception? WriteAtomically(string name, byte[] data, UnixFileMode perm, bool sync) { ArgumentException.ThrowIfNullOrWhiteSpace(name); ArgumentNullException.ThrowIfNull(data); Init(); var slots = _diskIoSlots!; var tmp = name + ".tmp"; slots.Wait(); try { var options = sync ? FileOptions.WriteThrough : FileOptions.None; using (var stream = new FileStream( tmp, FileMode.Create, FileAccess.Write, FileShare.None, bufferSize: 4096, options)) { stream.Write(data, 0, data.Length); stream.Flush(sync); } try { File.SetUnixFileMode(tmp, perm); } catch (PlatformNotSupportedException) { } File.Move(tmp, name, overwrite: true); if (sync) { var dir = Path.GetDirectoryName(Path.GetFullPath(name)); if (!string.IsNullOrEmpty(dir)) { try { using var handle = File.OpenHandle(dir, FileMode.Open, FileAccess.Read, FileShare.ReadWrite | FileShare.Delete); RandomAccess.FlushToDisk(handle); } catch { // Best-effort directory metadata sync. 
} } } return null; } catch (Exception ex) { try { if (File.Exists(tmp)) File.Delete(tmp); } catch { // Best-effort cleanup. } return ex; } finally { slots.Release(); } } internal static AeadCipher GenEncryptionKey(StoreCipher sc, byte[] seed) { ArgumentNullException.ThrowIfNull(seed); return sc switch { StoreCipher.ChaCha => new AeadCipher(seed), StoreCipher.Aes => new AeadCipher(seed), _ => throw new InvalidOperationException("unknown cipher"), }; } internal (AeadCipher Aek, byte[] Seed, byte[] Encrypted) GenEncryptionKeys(string context) { if (_prf == null) throw new InvalidOperationException("encryption key function required"); var rb = _prf(Encoding.UTF8.GetBytes(context)); var kek = GenEncryptionKey(_fcfg.Cipher, rb); var seed = new byte[32]; RandomNumberGenerator.Fill(seed); var aek = GenEncryptionKey(_fcfg.Cipher, seed); var nonce = new byte[kek.NonceSize]; RandomNumberGenerator.Fill(nonce); var encryptedSeed = kek.Seal(nonce, seed); var encrypted = new byte[nonce.Length + encryptedSeed.Length]; Buffer.BlockCopy(nonce, 0, encrypted, 0, nonce.Length); Buffer.BlockCopy(encryptedSeed, 0, encrypted, nonce.Length, encryptedSeed.Length); return (aek, seed, encrypted); } internal static XorStreamCipher GenBlockEncryptionKey(StoreCipher sc, byte[] seed, byte[] nonce) { ArgumentNullException.ThrowIfNull(seed); ArgumentNullException.ThrowIfNull(nonce); return sc switch { StoreCipher.ChaCha => new XorStreamCipher(seed, nonce), StoreCipher.Aes => new XorStreamCipher(seed, nonce), _ => throw new InvalidOperationException("unknown cipher"), }; } private void AddLostDataLocked(LostStreamData? ld) { if (ld == null) return; if (_ld != null) { var known = new HashSet(_ld.Msgs); var merged = new List(_ld.Msgs); var added = false; foreach (var seq in ld.Msgs) { if (known.Add(seq)) { merged.Add(seq); added = true; } } if (added) { merged.Sort(); _ld.Msgs = [.. merged]; _ld.Bytes += ld.Bytes; } } else { _ld = new LostStreamData { Msgs = [.. 
ld.Msgs], Bytes = ld.Bytes, };
    }
}

// Converts Unix-epoch nanoseconds to a UTC DateTime. Whole seconds and the
// nanosecond remainder are handled separately; sub-100ns precision is
// truncated since one tick is 100 ns.
private static DateTime FromUnixNanosUtc(long nanos)
{
    var seconds = nanos / 1_000_000_000L;
    var remainderNanos = nanos % 1_000_000_000L;
    return DateTimeOffset.FromUnixTimeSeconds(seconds).AddTicks(remainderNanos / 100L).UtcDateTime;
}

// Reads a Go binary.Uvarint-style unsigned varint from source at index.
// On success, index is advanced past the varint; on truncated or overlong
// input, index is set to -1 and false is returned.
private static bool TryReadUVarInt(ReadOnlySpan source, ref int index, out ulong value)
{
    value = 0;
    var shift = 0;
    for (var i = 0; i < MaxVarIntLength; i++)
    {
        // Unsigned compare also rejects a negative index from a prior failure.
        if ((uint)index >= (uint)source.Length) { index = -1; value = 0; return false; }
        var b = source[index++];
        if (b < 0x80)
        {
            // The tenth byte may only contribute a single bit; anything
            // larger would overflow 64 bits.
            if (i == MaxVarIntLength - 1 && b > 1) { index = -1; value = 0; return false; }
            value |= (ulong)b << shift;
            return true;
        }
        value |= (ulong)(b & 0x7F) << shift;
        shift += 7;
    }
    index = -1;
    value = 0;
    return false;
}

// Reads a zigzag-encoded signed varint (Go binary.Varint semantics).
private static bool TryReadVarInt(ReadOnlySpan source, ref int index, out long value)
{
    if (!TryReadUVarInt(source, ref index, out var unsigned)) { value = 0; return false; }
    value = (long)(unsigned >> 1);
    if ((unsigned & 1) != 0) value = ~value;
    return true;
}

// Recovers the stream's asset encryption key (AEK) from the on-disk key file.
// The key-encryption key is derived from the PRF over the stream name, then
// used to open the nonce-prefixed sealed seed. No-op when encryption is off
// or the AEK is already loaded; throws InvalidDataException on malformed keys.
internal void RecoverAEK()
{
    if (_prf == null || _aek != null) return;
    var keyFile = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFileKey);
    var ekey = File.ReadAllBytes(keyFile);
    if (ekey.Length < FileStoreDefaults.MinMetaKeySize) throw new InvalidDataException("bad key size");
    var rb = _prf(Encoding.UTF8.GetBytes(_cfg.Config.Name));
    var kek = GenEncryptionKey(_fcfg.Cipher, rb);
    var ns = kek.NonceSize;
    if (ekey.Length <= ns) throw new InvalidDataException("malformed encrypted key");
    var nonce = ekey.AsSpan(0, ns);
    var payload = ekey.AsSpan(ns);
    var seed = kek.Open(nonce, payload);
    _aek = GenEncryptionKey(_fcfg.Cipher, seed);
}

// Generates and persists a new AEK (sealed seed written to the key file)
// when encryption is configured and no AEK has been loaded yet.
internal void SetupAEK()
{
    if (_prf == null || _aek != null) return;
    var (key, _, encrypted) = GenEncryptionKeys(_cfg.Config.Name);
    var keyFile = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFileKey);
    WriteFileWithOptionalSync(keyFile, encrypted);
    _aek = key;
}

// Persists stream metadata (sealed with the AEK when encryption is on)
// together with a SHA-256 hex checksum file.
internal void WriteStreamMeta()
{
SetupAEK(); var payload = JsonSerializer.SerializeToUtf8Bytes(_cfg); if (_aek != null) { var nonce = new byte[_aek.NonceSize]; RandomNumberGenerator.Fill(nonce); var encrypted = _aek.Seal(nonce, payload); var sealedPayload = new byte[nonce.Length + encrypted.Length]; Buffer.BlockCopy(nonce, 0, sealedPayload, 0, nonce.Length); Buffer.BlockCopy(encrypted, 0, sealedPayload, nonce.Length, encrypted.Length); payload = sealedPayload; } var meta = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFile); WriteFileWithOptionalSync(meta, payload); var checksum = Convert.ToHexString(SHA256.HashData(payload)).ToLowerInvariant(); var sum = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.JetStreamMetaFileSum); WriteFileWithOptionalSync(sum, Encoding.ASCII.GetBytes(checksum)); } internal static byte[] GetMsgBlockBuf(int sz) { var target = sz switch { <= (int)FileStoreDefaults.DefaultTinyBlockSize => (int)FileStoreDefaults.DefaultTinyBlockSize, <= (int)FileStoreDefaults.DefaultSmallBlockSize => (int)FileStoreDefaults.DefaultSmallBlockSize, <= (int)FileStoreDefaults.DefaultMediumBlockSize => (int)FileStoreDefaults.DefaultMediumBlockSize, <= (int)FileStoreDefaults.DefaultLargeBlockSize => (int)FileStoreDefaults.DefaultLargeBlockSize, _ => sz, }; return MsgBlockBufferPool.Rent(target); } internal static void RecycleMsgBlockBuf(byte[]? 
buf) { if (buf == null) return; var cap = buf.Length; if (cap == (int)FileStoreDefaults.DefaultTinyBlockSize || cap == (int)FileStoreDefaults.DefaultSmallBlockSize || cap == (int)FileStoreDefaults.DefaultMediumBlockSize || cap == (int)FileStoreDefaults.DefaultLargeBlockSize) { MsgBlockBufferPool.Return(buf, clearArray: false); } } internal bool NoTrackSubjects() { var hasSubjectIndex = _psim is { } psim && psim.Size() > 0; var hasSubjects = _cfg.Config.Subjects is { Length: > 0 }; var hasMirror = _cfg.Config.Mirror != null; var hasSources = _cfg.Config.Sources is { Length: > 0 }; return !(hasSubjectIndex || hasSubjects || hasMirror || hasSources); } internal MessageBlock InitMsgBlock(uint index) { var mb = new MessageBlock { Fs = this, Index = index, Cexp = _fcfg.CacheExpire, Fexp = _fcfg.SubjectStateExpire, NoTrack = NoTrackSubjects(), SyncAlways = _fcfg.SyncAlways, }; var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); Directory.CreateDirectory(mdir); mb.Mfn = Path.Combine(mdir, string.Format(FileStoreDefaults.BlkScan, index)); mb.Kfn = Path.Combine(mdir, string.Format(FileStoreDefaults.KeyScan, index)); return mb; } internal void LoadEncryptionForMsgBlock(MessageBlock mb) { ArgumentNullException.ThrowIfNull(mb); if (_prf == null) return; var keyPath = string.IsNullOrWhiteSpace(mb.Kfn) ? 
Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, string.Format(FileStoreDefaults.KeyScan, mb.Index)) : mb.Kfn; byte[] ekey; if (!File.Exists(keyPath)) { var (_, generatedSeed, encrypted) = GenEncryptionKeys($"{_cfg.Config.Name}:{mb.Index}"); WriteFileWithOptionalSync(keyPath, encrypted); mb.Seed = generatedSeed; var nonceSize = GenEncryptionKey(_fcfg.Cipher, _prf(Encoding.UTF8.GetBytes($"{_cfg.Config.Name}:{mb.Index}"))).NonceSize; mb.Nonce = encrypted[..nonceSize]; return; } ekey = File.ReadAllBytes(keyPath); if (ekey.Length < FileStoreDefaults.MinBlkKeySize) throw new InvalidDataException("bad key size"); var rb = _prf(Encoding.UTF8.GetBytes($"{_cfg.Config.Name}:{mb.Index}")); var kek = GenEncryptionKey(_fcfg.Cipher, rb); var ns = kek.NonceSize; if (ekey.Length <= ns) throw new InvalidDataException("malformed encrypted key"); var seed = kek.Open(ekey.AsSpan(0, ns), ekey.AsSpan(ns)); mb.Seed = seed; mb.Nonce = ekey[..ns]; } internal MessageBlock RecoverMsgBlock(uint index) { var mb = InitMsgBlock(index); if (!File.Exists(mb.Mfn)) throw new FileNotFoundException("message block file not found", mb.Mfn); using var file = new FileStream(mb.Mfn, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); mb.RBytes = (ulong)file.Length; try { LoadEncryptionForMsgBlock(mb); } catch { // For parity with the Go flow, recovery keeps going and falls back to rebuild paths. 
}
    // Capture the block file's trailing checksum (when the file is large
    // enough to contain one) so index recovery can validate against it.
    var lchk = new byte[FileStoreDefaults.RecordHashSize];
    if (mb.RBytes >= (ulong)FileStoreDefaults.RecordHashSize)
    {
        file.Seek(-(long)FileStoreDefaults.RecordHashSize, SeekOrigin.End);
        file.ReadExactly(lchk, 0, lchk.Length);
    }
    var readIndexOk = TryReadBlockIndexInfo(mb, lchk);
    if (!readIndexOk)
    {
        mb.Lchk = lchk;
    }
    mb.CloseFDs();
    AddMsgBlock(mb);
    return mb;
}

// Logs a warning prefixed with the stream name when a server is attached.
// NOTE: the stream name is embedded in the format string directly. Prepending
// it to the args array (the previous approach) does not shift composite-format
// indices, so a caller's "{0}" would have resolved to the stream name instead
// of its first argument.
internal void Warn(string format, params object?[] args)
{
    if (_fcfg.Server is not NatsServer server) return;
    server.Warnf("Filestore [" + _cfg.Config.Name + "] " + format, args);
}

// Logs a debug message prefixed with the stream name when a server is
// attached. Same placeholder-index rationale as Warn above.
internal void Debug(string format, params object?[] args)
{
    if (_fcfg.Server is not NatsServer server) return;
    server.Debugf("Filestore [" + _cfg.Config.Name + "] " + format, args);
}

// Attempts to recover stream state from the full-state snapshot file.
// Returns null on success, or the recovery error; callers fall back to a
// block-by-block rebuild on error. Mirrors recoverFullState in filestore.go.
internal Exception? RecoverFullState()
{
    _mu.EnterWriteLock();
    try
    {
        // Finish any purge/compact that was interrupted mid-flight first.
        RecoverPartialPurge();
        var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile);
        byte[] raw;
        try
        {
            raw = File.ReadAllBytes(fn);
        }
        catch (FileNotFoundException ex)
        {
            return ex;
        }
        catch (DirectoryNotFoundException ex)
        {
            return ex;
        }
        catch (Exception ex)
        {
            Warn("Could not read stream state file: {0}", ex);
            return ex;
        }
        if (raw.Length < FullStateMinimumLength)
        {
            // Too short to hold even the fixed header + checksum: drop it.
            TryDeleteFile(fn);
            Warn("Stream state too short ({0} bytes)", raw.Length);
            return new InvalidDataException("corrupt state");
        }
        if (!TryValidateFullStateChecksum(raw, out var buf))
        {
            TryDeleteFile(fn);
            Warn("Stream state checksum did not match");
            return new InvalidDataException("corrupt state");
        }
        // Decrypt the snapshot payload when the store is encrypted.
        if (_prf != null && _aek != null)
        {
            var ns = _aek.NonceSize;
            if (buf.Length <= ns)
            {
                Warn("Stream state error reading encryption key: malformed ciphertext");
                return new InvalidDataException("corrupt state");
            }
            try
            {
                buf = _aek.Open(buf.AsSpan(0, ns), buf.AsSpan(ns));
            }
            catch (Exception ex)
            {
                Warn("Stream state error reading encryption key: {0}", ex);
                return ex;
            }
        }
        if (buf.Length < FullStateHeaderLength)
        {
            TryDeleteFile(fn);
            Warn("Stream state missing header");
            return new InvalidDataException("corrupt state");
} var version = buf[1]; if (buf[0] != FullStateMagic || version < FullStateMinVersion || version > FullStateVersion) { TryDeleteFile(fn); Warn("Stream state magic and version mismatch"); return new InvalidDataException("corrupt state"); } var bi = FullStateHeaderLength; if (!TryReadUVarInt(buf, ref bi, out var msgs) || !TryReadUVarInt(buf, ref bi, out var bytes) || !TryReadUVarInt(buf, ref bi, out var firstSeq) || !TryReadVarInt(buf, ref bi, out var baseTime) || !TryReadUVarInt(buf, ref bi, out var lastSeq) || !TryReadVarInt(buf, ref bi, out var lastTs)) { TryDeleteFile(fn); Warn("Stream state could not decode stream summary"); return new InvalidDataException("corrupt state"); } var recoveredState = new StreamState { Msgs = msgs, Bytes = bytes, FirstSeq = firstSeq, LastSeq = lastSeq, FirstTime = baseTime == 0 ? default : FromUnixNanosUtc(baseTime), LastTime = lastTs == 0 ? default : FromUnixNanosUtc(lastTs), }; _psim ??= new SubjectTree(); _psim.Reset(); _tsl = 0; if (!TryReadUVarInt(buf, ref bi, out var numSubjects)) return CorruptStateWithDelete(fn, "Stream state missing subject metadata"); for (var i = 0UL; i < numSubjects; i++) { if (!TryReadUVarInt(buf, ref bi, out var subjectLength)) return CorruptStateWithDelete(fn, "Stream state subject length decode failed"); if (subjectLength == 0) continue; if (bi < 0 || bi + (int)subjectLength > buf.Length) return CorruptStateWithDelete(fn, $"Stream state bad subject len ({subjectLength})"); var subject = Encoding.Latin1.GetString(buf, bi, (int)subjectLength); if (!SubscriptionIndex.IsValidLiteralSubject(subject)) return CorruptStateWithDelete(fn, "Stream state corrupt subject detected"); bi += (int)subjectLength; if (!TryReadUVarInt(buf, ref bi, out var total) || !TryReadUVarInt(buf, ref bi, out var fblk)) return CorruptStateWithDelete(fn, "Stream state could not decode subject index"); ulong lblk = fblk; if (total > 1) { if (!TryReadUVarInt(buf, ref bi, out lblk)) return CorruptStateWithDelete(fn, "Stream state could 
not decode subject last block"); } _psim.Insert(Encoding.Latin1.GetBytes(subject), new Psi { Total = total, Fblk = (uint)fblk, Lblk = (uint)lblk, }); _tsl += (int)subjectLength; } if (!TryReadUVarInt(buf, ref bi, out var numBlocks)) return CorruptStateWithDelete(fn, "Stream state could not decode block count"); var parsedBlocks = new List((int)numBlocks); var parsedMap = new Dictionary((int)numBlocks); var mstate = new StreamState(); var lastBlockIndex = numBlocks > 0 ? numBlocks - 1 : 0; for (var i = 0UL; i < numBlocks; i++) { if (!TryReadUVarInt(buf, ref bi, out var idx) || !TryReadUVarInt(buf, ref bi, out var nbytes) || !TryReadUVarInt(buf, ref bi, out var fseq) || !TryReadVarInt(buf, ref bi, out var fts) || !TryReadUVarInt(buf, ref bi, out var lseq) || !TryReadVarInt(buf, ref bi, out var lts) || !TryReadUVarInt(buf, ref bi, out var numDeleted)) { return CorruptStateWithDelete(fn, "Stream state block decode failed"); } ulong ttls = 0; if (version >= 2) { if (!TryReadUVarInt(buf, ref bi, out ttls)) return CorruptStateWithDelete(fn, "Stream state TTL metadata decode failed"); } ulong schedules = 0; if (version >= 3) { if (!TryReadUVarInt(buf, ref bi, out schedules)) return CorruptStateWithDelete(fn, "Stream state schedule metadata decode failed"); } var mb = InitMsgBlock((uint)idx); mb.First = new MsgId { Seq = fseq, Ts = fts + baseTime }; mb.Last = new MsgId { Seq = lseq, Ts = lts + baseTime }; mb.Bytes = nbytes; mb.Msgs = lseq >= fseq ? (lseq - fseq + 1) : 0; mb.Ttls = ttls; mb.Schedules = schedules; mb.Closed = true; if (numDeleted > 0) { try { var (dmap, consumed) = SequenceSet.Decode(buf.AsSpan(bi)); if (consumed <= 0) return CorruptStateWithDelete(fn, "Stream state error decoding deleted map"); mb.Dmap = dmap; mb.Msgs = mb.Msgs > numDeleted ? 
(mb.Msgs - numDeleted) : 0; bi += consumed; } catch (Exception ex) { Warn("Stream state error decoding deleted map: {0}", ex); return CorruptStateWithDelete(fn, "Stream state error decoding deleted map"); } } if (mb.Msgs > 0 || i == lastBlockIndex) { parsedBlocks.Add(mb); parsedMap[mb.Index] = mb; UpdateTrackingState(mstate, mb); } else { _dirty++; } } if (!TryReadUVarInt(buf, ref bi, out var blkIndex)) return CorruptStateWithDelete(fn, "Stream state has no block index"); if (bi < 0 || bi + FullStateChecksumLength > buf.Length) return CorruptStateWithDelete(fn, "Stream state has no checksum present"); var lchk = buf.AsSpan(bi, FullStateChecksumLength).ToArray(); _state = recoveredState; _blks = parsedBlocks; _bim = parsedMap; _lmb = _blks.Count > 0 ? _blks[^1] : null; if (_lmb == null || _lmb.Index != (uint)blkIndex) return CorruptStateWithDelete(fn, "Stream state block does not exist or index mismatch"); if (!File.Exists(_lmb.Mfn)) { Warn("Stream state detected prior state, could not locate msg block {0}", blkIndex); return new InvalidOperationException("prior stream state detected"); } if (!LastChecksumMatches(_lmb, lchk)) { Warn("Stream state outdated, last block has additional entries, will rebuild"); return new InvalidOperationException("prior stream state detected"); } var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); if (Directory.Exists(mdir)) { foreach (var path in Directory.EnumerateFiles(mdir, "*" + FileStoreDefaults.BlkSuffix, SearchOption.TopDirectoryOnly)) { if (!TryParseBlockIndex(Path.GetFileName(path), out var index)) continue; if (index > blkIndex) { Warn("Stream state outdated, found extra blocks, will rebuild"); return new InvalidOperationException("prior stream state detected"); } if (index <= uint.MaxValue && _bim.TryGetValue((uint)index, out var mb)) mb.Closed = false; } } var rebuildRequired = false; foreach (var mb in _blks) { if (!mb.Closed) continue; rebuildRequired = true; Warn("Stream state detected prior state, could 
not locate msg block {0}", mb.Index); }
    if (rebuildRequired) return new InvalidOperationException("prior stream state detected");
    // Final sanity check: the per-block tracking totals must agree with the
    // recovered header state, otherwise the snapshot is internally corrupt.
    if (!TrackingStatesEqual(_state, mstate)) return CorruptStateWithDelete(fn, "Stream state encountered internal inconsistency on recover");
    return null; } finally { _mu.ExitWriteLock(); } }

    // Prefixes the stream name to a log-argument array so Warn-style format
    // strings can reference the stream as {0} followed by the caller's args.
    private object?[] BuildLogArgs(object?[] args)
    {
        if (args.Length == 0) return [_cfg.Config.Name];
        var formatted = new object?[args.Length + 1];
        formatted[0] = _cfg.Config.Name;
        Array.Copy(args, 0, formatted, 1, args.Length);
        return formatted;
    }

    // Drops an unusable state snapshot file, logs the reason, and returns the
    // canonical "corrupt state" error for the caller to propagate.
    private Exception CorruptStateWithDelete(string fileName, string message)
    {
        TryDeleteFile(fileName);
        Warn("{0}", message);
        return new InvalidDataException("corrupt state");
    }

    // Best-effort delete; I/O errors are swallowed by design.
    private static void TryDeleteFile(string fileName)
    {
        try
        {
            if (File.Exists(fileName)) File.Delete(fileName);
        }
        catch
        {
            // Best effort to drop unusable snapshots.
        }
    }

    // Splits a raw full-state buffer into payload + trailing checksum and
    // verifies the checksum: HMAC-SHA256 keyed by SHA256(stream name),
    // truncated to FullStateChecksumLength bytes.
    // NOTE(review): generic type arguments appear stripped in this chunk
    // (e.g. Array.Empty() — presumably Array.Empty<byte>()); confirm against
    // the original source.
    private bool TryValidateFullStateChecksum(byte[] raw, out byte[] payload)
    {
        payload = Array.Empty();
        if (raw.Length <= FullStateChecksumLength) return false;
        var contentLength = raw.Length - FullStateChecksumLength;
        payload = raw[..contentLength];
        var checksum = raw.AsSpan(contentLength, FullStateChecksumLength);
        var key = SHA256.HashData(Encoding.UTF8.GetBytes(_cfg.Config.Name));
        using var hmac = new HMACSHA256(key);
        var digest = hmac.ComputeHash(payload);
        return checksum.SequenceEqual(digest.AsSpan(0, FullStateChecksumLength));
    }

    // Compares the expected last-record checksum against the trailing
    // RecordHashSize bytes of the block file; an undersized file matches only
    // an all-zero checksum. Any I/O failure counts as a mismatch.
    private bool LastChecksumMatches(MessageBlock mb, byte[] expected)
    {
        try
        {
            using var fs = new FileStream(mb.Mfn, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
            if (fs.Length < FileStoreDefaults.RecordHashSize)
                return expected.AsSpan().SequenceEqual(new byte[FileStoreDefaults.RecordHashSize]);
            var lchk = new byte[FileStoreDefaults.RecordHashSize];
            fs.Seek(-FileStoreDefaults.RecordHashSize, SeekOrigin.End);
            fs.ReadExactly(lchk, 0, lchk.Length);
            return expected.AsSpan().SequenceEqual(lchk);
        }
        catch
        {
            return false;
        }
    }

    private static bool
TryParseBlockIndex(string fileName, out ulong index)
    {
        // Parses "<n><BlkSuffix>" style names; rejects anything without the suffix.
        index = 0;
        if (!fileName.EndsWith(FileStoreDefaults.BlkSuffix, StringComparison.Ordinal)) return false;
        var stem = Path.GetFileNameWithoutExtension(fileName);
        return ulong.TryParse(stem, out index);
    }

    // Parses names produced by an indexed-scan format (index/key files); the
    // expected suffix is derived by formatting the pattern with a dummy index.
    private static bool TryParseIndexedFile(string fileName, string format, out uint index)
    {
        index = 0;
        var suffix = Path.GetExtension(string.Format(format, 0));
        if (string.IsNullOrEmpty(suffix) || !fileName.EndsWith(suffix, StringComparison.Ordinal)) return false;
        var stem = fileName[..^suffix.Length];
        return uint.TryParse(stem, out index);
    }

    // Detaches a recovered block from _blks/_bim and deletes its backing
    // files (block, index sidecar, key file). Caller holds the fs write lock.
    private void RemoveRecoveredBlockLocked(MessageBlock mb)
    {
        _blks.Remove(mb);
        _bim.Remove(mb.Index);
        if (ReferenceEquals(_lmb, mb)) _lmb = _blks.Count > 0 ? _blks[^1] : null;
        mb.CloseFDs();
        TryDeleteFile(mb.Mfn);
        var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir);
        TryDeleteFile(Path.Combine(mdir, string.Format(FileStoreDefaults.IndexScan, mb.Index)));
        TryDeleteFile(Path.Combine(mdir, string.Format(FileStoreDefaults.KeyScan, mb.Index)));
    }

    // Completes or rolls back a purge interrupted mid-rename: when the msg
    // dir is missing, promote whichever staged directory exists; otherwise
    // drop any leftover staging directories.
    private void RecoverPartialPurge()
    {
        var storeDir = _fcfg.StoreDir;
        var msgDir = Path.Combine(storeDir, FileStoreDefaults.MsgDir);
        var purgeDir = Path.Combine(storeDir, FileStoreDefaults.PurgeDir);
        var newMsgDir = Path.Combine(storeDir, FileStoreDefaults.NewMsgDir);
        if (!Directory.Exists(msgDir))
        {
            if (Directory.Exists(newMsgDir))
            {
                Directory.Move(newMsgDir, msgDir);
            }
            else if (Directory.Exists(purgeDir))
            {
                Directory.Move(purgeDir, msgDir);
            }
            return;
        }
        if (Directory.Exists(newMsgDir)) Directory.Delete(newMsgDir, recursive: true);
        if (Directory.Exists(purgeDir)) Directory.Delete(purgeDir, recursive: true);
    }

    // (Re)arms the age-check timer so the earliest of MaxAge expiry, the next
    // per-message TTL, and an explicit delta (nanoseconds) fires next.
    // Caller holds the fs write lock.
    private void ResetAgeChk(long delta)
    {
        if (_ageChkRun) return;
        long next = long.MaxValue;
        if (_ttls != null) next = _ttls.GetNextExpiration(next);
        // Nothing age-based can ever expire: tear the timer down.
        if (_cfg.Config.MaxAge <= TimeSpan.Zero && next == long.MaxValue)
        {
            CancelAgeChk();
            return;
        }
        var fireIn = _cfg.Config.MaxAge;
        if (delta == 0 && _state.Msgs > 0)
        {
            var until =
TimeSpan.FromSeconds(2);
            if (fireIn == TimeSpan.Zero || until < fireIn) fireIn = until;
        }
        if (next < long.MaxValue)
        {
            // next is a UNIX-epoch nanosecond timestamp; 1 tick == 100 ns.
            var nextTicks = DateTime.UnixEpoch.Ticks + next / 100L;
            var nextUtc = new DateTime(Math.Max(nextTicks, DateTime.UnixEpoch.Ticks), DateTimeKind.Utc);
            var until = nextUtc - DateTime.UtcNow;
            if (fireIn == TimeSpan.Zero || until < fireIn) fireIn = until;
        }
        if (delta > 0)
        {
            // delta is a nanosecond duration; 1 tick == 100 ns.
            var deltaDur = TimeSpan.FromTicks(delta / 100L);
            if (fireIn == TimeSpan.Zero || deltaDur < fireIn) fireIn = deltaDur;
        }
        // Clamp to avoid hot re-arming.
        if (fireIn < TimeSpan.FromMilliseconds(250)) fireIn = TimeSpan.FromMilliseconds(250);
        var expires = DateTime.UtcNow.Ticks + fireIn.Ticks;
        // Keep whichever deadline fires earlier.
        if (_ageChkTime > 0 && expires > _ageChkTime) return;
        _ageChkTime = expires;
        if (_ageChk != null)
            _ageChk.Change(fireIn, Timeout.InfiniteTimeSpan);
        else
            // NOTE(review): the timer callback is a no-op at this port stage —
            // presumably the expiry pass gets wired in later; confirm.
            _ageChk = new Timer(_ => { }, null, fireIn, Timeout.InfiniteTimeSpan);
    }

    // Stops and clears the age-check timer state.
    private void CancelAgeChk()
    {
        _ageChk?.Dispose();
        _ageChk = null;
        _ageChkTime = 0;
    }

    // Scheduling-timer callback: re-arms the scheduler under the fs write lock.
    private void RunMsgScheduling()
    {
        _mu.EnterWriteLock();
        try
        {
            _scheduling?.ResetTimer();
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    // Restores per-message TTL state from its on-disk snapshot, falling back
    // to a linear header scan when the snapshot is stale or undecodable.
    // Returns null on success (Go-style error return).
    internal Exception? RecoverTTLState()
    {
        _mu.EnterWriteLock();
        try
        {
            var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.TtlStreamStateFile);
            byte[]?
buf = null;
            try
            {
                if (File.Exists(fn)) buf = File.ReadAllBytes(fn);
            }
            catch (Exception ex)
            {
                return ex;
            }
            _ttls = HashWheel.NewHashWheel();
            ulong ttlSeq = 0;
            if (buf != null)
            {
                try
                {
                    ttlSeq = _ttls.Decode(buf);
                }
                catch (Exception ex)
                {
                    // Undecodable snapshot: drop it and rebuild via the scan below.
                    Warn("Error decoding TTL state: {0}", ex);
                    TryDeleteFile(fn);
                }
            }
            if (ttlSeq < _state.FirstSeq) ttlSeq = _state.FirstSeq;
            try
            {
                if (_state.Msgs > 0 && ttlSeq <= _state.LastSeq)
                {
                    Warn("TTL state is outdated; attempting to recover using linear scan (seq {0} to {1})", ttlSeq, _state.LastSeq);
                    foreach (var mb in _blks)
                    {
                        mb.Mu.EnterReadLock();
                        try
                        {
                            // Skip blocks with no TTL'd messages or entirely before the resume point.
                            if (mb.Ttls == 0 || mb.Last.Seq < ttlSeq) continue;
                            var start = Math.Max(ttlSeq, mb.First.Seq);
                            var end = mb.Last.Seq;
                            for (var seq = start; seq <= end; seq++)
                            {
                                StoreMsg? sm = null;
                                try
                                {
                                    sm = LoadMsg(seq, null);
                                }
                                catch (Exception ex)
                                {
                                    Warn("Error loading msg seq {0} for recovering TTL: {1}", seq, ex);
                                }
                                // Only messages that carry headers can have a TTL.
                                if (sm?.Hdr is not { Length: > 0 })
                                {
                                    // Guard against ulong wraparound at the loop increment.
                                    if (seq == ulong.MaxValue) break;
                                    continue;
                                }
                                var (ttl, _) = JetStreamHeaderHelpers.GetMessageTtl(sm.Hdr);
                                if (ttl > 0)
                                {
                                    var expires = sm.Ts + (ttl * NanosecondsPerSecond);
                                    _ttls.Add(seq, expires);
                                }
                                if (seq == ulong.MaxValue) break;
                            }
                        }
                        finally
                        {
                            mb.Mu.ExitReadLock();
                        }
                    }
                }
            }
            finally
            {
                // Always re-arm the age checker with the recovered TTL info.
                ResetAgeChk(0);
            }
            return null;
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    // Restores message-scheduling state from its on-disk snapshot, with a
    // linear-scan fallback mirroring RecoverTTLState.
    internal Exception? RecoverMsgSchedulingState()
    {
        _mu.EnterWriteLock();
        try
        {
            var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.MsgSchedulingStreamStateFile);
            byte[]?
buf = null;
            try
            {
                if (File.Exists(fn)) buf = File.ReadAllBytes(fn);
            }
            catch (Exception ex)
            {
                return ex;
            }
            _scheduling = new MsgScheduling(RunMsgScheduling);
            ulong schedSeq = 0;
            if (buf != null)
            {
                var (decodedSeq, err) = _scheduling.Decode(buf);
                schedSeq = decodedSeq;
                if (err != null)
                {
                    // Undecodable snapshot: drop it and rebuild via the scan below.
                    Warn("Error decoding message scheduling state: {0}", err);
                    TryDeleteFile(fn);
                    schedSeq = 0;
                }
            }
            if (schedSeq < _state.FirstSeq) schedSeq = _state.FirstSeq;
            try
            {
                if (_state.Msgs > 0 && schedSeq <= _state.LastSeq)
                {
                    Warn("Message scheduling state is outdated; attempting to recover using linear scan (seq {0} to {1})", schedSeq, _state.LastSeq);
                    foreach (var mb in _blks)
                    {
                        mb.Mu.EnterReadLock();
                        try
                        {
                            // Skip blocks with no schedules or entirely before the resume point.
                            if (mb.Schedules == 0 || mb.Last.Seq < schedSeq) continue;
                            var start = Math.Max(schedSeq, mb.First.Seq);
                            var end = mb.Last.Seq;
                            for (var seq = start; seq <= end; seq++)
                            {
                                StoreMsg? sm = null;
                                try
                                {
                                    sm = LoadMsg(seq, null);
                                }
                                catch (Exception ex)
                                {
                                    Warn("Error loading msg seq {0} for recovering message schedules: {1}", seq, ex);
                                }
                                if (sm?.Hdr is not { Length: > 0 })
                                {
                                    // Guard against ulong wraparound at the loop increment.
                                    if (seq == ulong.MaxValue) break;
                                    continue;
                                }
                                var (schedule, ok) = JetStreamHeaderHelpers.NextMessageSchedule(sm.Hdr, sm.Ts);
                                // Ticks * 100 converts to nanoseconds.
                                if (ok && schedule != default) _scheduling.Init(seq, sm.Subject, schedule.Ticks * 100L);
                                if (seq == ulong.MaxValue) break;
                            }
                        }
                        finally
                        {
                            mb.Mu.ExitReadLock();
                        }
                    }
                }
            }
            finally
            {
                _scheduling.ResetTimer();
            }
            return null;
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    // Deletes legacy .idx/.fss metadata files left behind by older layouts.
    internal void CleanupOldMeta()
    {
        string mdir;
        _mu.EnterReadLock();
        try
        {
            mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir);
        }
        finally
        {
            _mu.ExitReadLock();
        }
        if (!Directory.Exists(mdir)) return;
        foreach (var file in Directory.EnumerateFiles(mdir, "*", SearchOption.TopDirectoryOnly))
        {
            var name = Path.GetFileName(file);
            if (!name.EndsWith(".idx", StringComparison.Ordinal) && !name.EndsWith(".fss", StringComparison.Ordinal))
            {
                continue;
            }
            TryDeleteFile(file);
        }
    }

    internal Exception?
RecoverMsgs()
    {
        // Rebuilds the in-memory block list and stream state from the on-disk
        // message blocks. Returns null on success (Go-style error return).
        _mu.EnterWriteLock();
        try
        {
            RecoverPartialPurge();
            var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir);
            if (!Directory.Exists(mdir)) return new InvalidOperationException("message directory is not readable");
            // NOTE(review): generic type arguments appear stripped in this
            // chunk (new List(), new HashSet(...)); confirm against the
            // original source.
            var indices = new List();
            foreach (var file in Directory.EnumerateFiles(mdir, "*" + FileStoreDefaults.BlkSuffix, SearchOption.TopDirectoryOnly))
            {
                if (!TryParseBlockIndex(Path.GetFileName(file), out var index) || index > uint.MaxValue) continue;
                indices.Add((uint)index);
            }
            indices.Sort();
            _blks.Clear();
            _bim.Clear();
            _lmb = null;
            _state = new StreamState();
            foreach (var index in indices)
            {
                MessageBlock mb;
                try
                {
                    mb = RecoverMsgBlock(index);
                }
                catch (Exception ex)
                {
                    return ex;
                }
                mb.Mu.EnterWriteLock();
                try
                {
                    // Empty/unused block: discard it along with its files.
                    if (mb.First.Seq == 0)
                    {
                        RemoveRecoveredBlockLocked(mb);
                        continue;
                    }
                    if (_state.FirstSeq == 0 || (mb.First.Seq < _state.FirstSeq && mb.First.Ts != 0))
                    {
                        _state.FirstSeq = mb.First.Seq;
                        _state.FirstTime = mb.First.Ts == 0 ? default : FromUnixNanosUtc(mb.First.Ts);
                    }
                    if (mb.Last.Seq >= _state.LastSeq)
                    {
                        _state.LastSeq = mb.Last.Seq;
                        _state.LastTime = mb.Last.Ts == 0 ? default : FromUnixNanosUtc(mb.Last.Ts);
                    }
                    _state.Msgs += mb.Msgs;
                    _state.Bytes += mb.Bytes;
                    if (mb.Msgs == 0)
                    {
                        // Keep empty tail blocks consistent with global state.
                        mb.First.Seq = _state.LastSeq + 1;
                        mb.First.Ts = 0;
                        mb.Last.Seq = _state.LastSeq;
                        mb.Last.Ts = TimestampNormalized(_state.LastTime);
                    }
                }
                finally
                {
                    if (mb.Mu.IsWriteLockHeld) mb.Mu.ExitWriteLock();
                }
            }
            if (_blks.Count == 0)
            {
                // Fresh store: seed the initial write block.
                AddMsgBlock(InitMsgBlock(1));
            }
            else
            {
                _lmb = _blks[^1];
            }
            // Drop key files that no longer correspond to a live block.
            var valid = new HashSet(_blks.Select(b => b.Index));
            foreach (var file in Directory.EnumerateFiles(mdir, "*" + Path.GetExtension(string.Format(FileStoreDefaults.KeyScan, 0)), SearchOption.TopDirectoryOnly))
            {
                var name = Path.GetFileName(file);
                if (!TryParseIndexedFile(name, FileStoreDefaults.KeyScan, out var index) || !valid.Contains(index)) TryDeleteFile(file);
            }
            return null;
        }
        catch (Exception ex)
        {
            return ex;
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    internal Exception?
ExpireMsgsOnRecover()
    {
        // Removes whole message blocks whose newest message is already past
        // the stream's MaxAge cutoff (run once at recovery). Mirrors
        // expireMsgsOnRecover in filestore.go. Returns null on success,
        // otherwise the failure as an Exception (Go-style error return).
        _mu.EnterWriteLock();
        try
        {
            if (_state.Msgs == 0) return null;
            // Subject-delete markers change expiry semantics; skip wholesale expiry.
            if (_cfg.Config.SubjectDeleteMarkerTTL > TimeSpan.Zero) return null;
            if (_cfg.Config.MaxAge <= TimeSpan.Zero) return null;
            // Timestamps are UNIX-epoch nanoseconds; 1 tick == 100 ns.
            var minAge = (DateTime.UtcNow - DateTime.UnixEpoch).Ticks * 100L - (_cfg.Config.MaxAge.Ticks * 100L);
            ulong purged = 0;
            ulong bytes = 0;
            long nts = 0;
            var removed = new List<MessageBlock>();
            MsgId last = default;
            // BUGFIX: iterate a snapshot — RemoveRecoveredBlockLocked mutates
            // _blks, and mutating a List<T> while foreach-ing it throws
            // InvalidOperationException ("Collection was modified").
            foreach (var mb in _blks.ToArray())
            {
                mb.Mu.EnterWriteLock();
                try
                {
                    // Block starts after the cutoff: nothing older remains.
                    if (minAge < mb.First.Ts) { nts = mb.First.Ts; break; }
                    // Block only partially expired: leave it for the age timer.
                    if (mb.Last.Ts > minAge) { nts = mb.First.Ts; break; }
                    purged += mb.Msgs;
                    bytes += mb.Bytes;
                    if (ReferenceEquals(mb, _lmb)) last = mb.Last;
                    RemoveRecoveredBlockLocked(mb);
                    removed.Add(mb);
                }
                finally
                {
                    if (mb.Mu.IsWriteLockHeld) mb.Mu.ExitWriteLock();
                }
            }
            if (removed.Count > 0)
            {
                // RemoveRecoveredBlockLocked already detached each block; this
                // pass is defensive in case any removal was short-circuited.
                _blks = [.. _blks.Where(mb => !removed.Contains(mb))];
                if (_bim.Count > 0)
                {
                    foreach (var mb in removed) _bim.Remove(mb.Index);
                }
                _lmb = _blks.Count > 0 ? _blks[^1] : null;
                if (_lmb == null && last.Seq != 0)
                {
                    // Everything expired: seed a fresh empty block preserving
                    // the last sequence so new writes continue from it.
                    var tomb = InitMsgBlock(1);
                    tomb.First = new MsgId { Seq = last.Seq + 1, Ts = 0 };
                    tomb.Last = new MsgId { Seq = last.Seq, Ts = last.Ts };
                    tomb.Msgs = 0;
                    tomb.Bytes = 0;
                    AddMsgBlock(tomb);
                    _psim?.Reset();
                    _tsl = 0;
                }
            }
            RebuildStateLocked(null);
            // Re-arm the age timer relative to the oldest surviving message.
            if (nts > 0) ResetAgeChk(nts - minAge);
            if (purged > 0 || bytes > 0) _dirty++;
            return null;
        }
        catch (Exception ex)
        {
            return ex;
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    // Verifies the store directory exists and is writable by creating and
    // deleting a probe file. Throws ArgumentException for a blank path and
    // InvalidOperationException when the directory is inaccessible.
    private static void EnsureStoreDirectoryWritable(string storeDir)
    {
        if (string.IsNullOrWhiteSpace(storeDir)) throw new ArgumentException("store directory required", nameof(storeDir));
        Directory.CreateDirectory(storeDir);
        var dirInfo = new DirectoryInfo(storeDir);
        if (!dirInfo.Exists) throw new InvalidOperationException("storage directory is not accessible");
        var probe = Path.Combine(storeDir, $"_test_{Guid.NewGuid():N}.tmp");
        using (File.Create(probe)) { }
        File.Delete(probe);
    }

    private void WriteFileWithOptionalSync(string path, byte[] payload) {
Directory.CreateDirectory(Path.GetDirectoryName(path)!);
        using var stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None);
        stream.Write(payload, 0, payload.Length);
        // Flush(true) forces a flush-to-disk when SyncAlways is configured.
        stream.Flush(_fcfg.SyncAlways);
    }

    // Loads a block's JSON index sidecar and applies it to the block when its
    // recorded last-checksum matches the expected one; returns false (and
    // leaves mb untouched) on any mismatch or read/parse failure.
    // NOTE(review): the Deserialize call appears to have lost its generic
    // type argument in this chunk (presumably MessageBlockIndexFile) —
    // confirm against the original source.
    private bool TryReadBlockIndexInfo(MessageBlock mb, byte[] lchk)
    {
        var idxFile = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, string.Format(FileStoreDefaults.IndexScan, mb.Index));
        if (!File.Exists(idxFile)) return false;
        try
        {
            var data = JsonSerializer.Deserialize(File.ReadAllBytes(idxFile));
            if (data == null || data.LastChecksum == null) return false;
            if (!lchk.AsSpan().SequenceEqual(data.LastChecksum)) return false;
            mb.Msgs = data.Msgs;
            mb.Bytes = data.Bytes;
            mb.RBytes = data.RawBytes;
            mb.NoTrack = data.NoTrack;
            mb.First = new MsgId { Seq = data.FirstSeq, Ts = data.FirstTs };
            mb.Last = new MsgId { Seq = data.LastSeq, Ts = data.LastTs };
            mb.Lchk = data.LastChecksum;
            return true;
        }
        catch
        {
            return false;
        }
    }

    // Inserts a block keeping _blks sorted by index and _lmb pointing at the
    // highest-index (active write) block. Idempotent for an existing index.
    private void AddMsgBlock(MessageBlock mb)
    {
        if (!_bim.ContainsKey(mb.Index)) _blks.Add(mb);
        _bim[mb.Index] = mb;
        _blks.Sort((a, b) => a.Index.CompareTo(b.Index));
        _lmb = _blks.Count > 0 ? _blks[^1] : null;
    }

    // -----------------------------------------------------------------------
    // Read/query helper methods (Batch 13)
    // -----------------------------------------------------------------------

    // Lock should be held by caller.
    private (int Next, Exception?
Error) CheckSkipFirstBlock(string filter, bool wc, int bi)
    {
        // Wide-open filter: the per-subject index cannot help, just advance.
        if (string.IsNullOrEmpty(filter) || filter == ">") return (bi + 1, null);
        var start = uint.MaxValue;
        uint stop = 0;
        if (_psim is { } psim)
        {
            if (wc)
            {
                psim.Match(Encoding.UTF8.GetBytes(filter), (_, psi) =>
                {
                    if (psi.Fblk < start) start = psi.Fblk;
                    if (psi.Lblk > stop) stop = psi.Lblk;
                    return true;
                });
            }
            else
            {
                var (psi, ok) = psim.Find(Encoding.UTF8.GetBytes(filter));
                if (ok && psi != null)
                {
                    start = psi.Fblk;
                    stop = psi.Lblk;
                }
            }
        }
        // No subject matched the filter at all.
        if (start == uint.MaxValue) return (-1, StoreErrors.ErrStoreEOF);
        return SelectSkipFirstBlock(bi, start, stop);
    }

    // Same as CheckSkipFirstBlock but for a multi-filter sublist.
    // Lock should be held by caller.
    private (int Next, Exception? Error) CheckSkipFirstBlockMulti(SimpleSublist? sl, int bi)
    {
        if (_psim == null || sl == null) return (-1, StoreErrors.ErrStoreEOF);
        var start = uint.MaxValue;
        uint stop = 0;
        _psim.IterFast((subj, psi) =>
        {
            var matched = false;
            sl.Match(Encoding.UTF8.GetString(subj), _ => matched = true);
            if (matched)
            {
                if (psi.Fblk < start) start = psi.Fblk;
                if (psi.Lblk > stop) stop = psi.Lblk;
            }
            return true;
        });
        if (start == uint.MaxValue) return (-1, StoreErrors.ErrStoreEOF);
        return SelectSkipFirstBlock(bi, start, stop);
    }

    // Maps a [start, stop] block-index window onto the _blks list position to
    // resume scanning from; (-1, EOF) when no further block can match.
    // Lock should be held by caller.
    private (int Next, Exception? Error) SelectSkipFirstBlock(int bi, uint start, uint stop)
    {
        if (bi < 0 || bi >= _blks.Count) return (-1, StoreErrors.ErrStoreEOF);
        var mbi = _blks[bi].Index;
        if (stop <= mbi) return (-1, StoreErrors.ErrStoreEOF);
        if (start > mbi && _bim.TryGetValue(start, out var mb) && mb != null)
        {
            // Find the list position of the first block reaching mb's range.
            var ni = -1;
            for (var i = 0; i < _blks.Count; i++)
            {
                if (mb.Last.Seq <= _blks[i].Last.Seq)
                {
                    ni = i;
                    break;
                }
            }
            return (ni, null);
        }
        return (bi + 1, null);
    }

    // Lock should be held by caller.
    private void NumFilteredPending(string filter, SimpleState ss) => NumFilteredPendingWithLast(filter, includeLast: true, ss);

    // Lock should be held by caller.
private void NumFilteredPendingNoLast(string filter, SimpleState ss) => NumFilteredPendingWithLast(filter, includeLast: false, ss);

    // Fills ss with msgs/first/last for the given subject filter using the
    // per-subject index; optionally resolves the exact last sequence too.
    // Lock should be held by caller.
    private void NumFilteredPendingWithLast(string filter, bool includeLast, SimpleState ss)
    {
        ArgumentNullException.ThrowIfNull(ss);
        var isAll = string.IsNullOrEmpty(filter) || filter == ">";
        if (isAll)
        {
            // Whole-stream totals are already tracked in _state.
            ss.First = _state.FirstSeq;
            ss.Last = _state.LastSeq;
            ss.Msgs = _state.Msgs;
            return;
        }
        ss.First = 0;
        ss.Last = 0;
        ss.Msgs = 0;
        if (_psim == null) return;
        var wc = SubscriptionIndex.SubjectHasWildcard(filter);
        var filterBytes = Encoding.UTF8.GetBytes(filter);
        var start = uint.MaxValue;
        uint stop = 0;
        if (wc)
        {
            _psim.Match(filterBytes, (_, psi) =>
            {
                ss.Msgs += psi.Total;
                if (psi.Fblk < start) start = psi.Fblk;
                if (psi.Lblk > stop) stop = psi.Lblk;
                return true;
            });
        }
        else
        {
            var (psi, ok) = _psim.Find(filterBytes);
            if (ok && psi != null)
            {
                ss.Msgs += psi.Total;
                start = psi.Fblk;
                stop = psi.Lblk;
            }
        }
        if (stop == 0 || start == uint.MaxValue) return;
        if (_bim.TryGetValue(start, out var mb) && mb != null)
        {
            var (_, first, _) = FilteredPendingInBlock(mb, filter, wc);
            ss.First = first;
        }
        if (ss.First == 0)
        {
            // The recorded first block was stale; scan forward for the true
            // first match and schedule an async correction of the psi index.
            uint? correctedFirstBlock = null;
            for (ulong i = (ulong)start + 1; i <= stop; i++)
            {
                if (!_bim.TryGetValue((uint)i, out var candidate) || candidate == null) continue;
                var (_, first, _) = FilteredPendingInBlock(candidate, filter, wc);
                if (first > 0)
                {
                    ss.First = first;
                    correctedFirstBlock = (uint)i;
                    break;
                }
            }
            if (correctedFirstBlock is { } corrected) SchedulePsiFirstBlockCorrection(filter, wc, corrected);
        }
        if (includeLast && _bim.TryGetValue(stop, out var lastMb) && lastMb != null)
        {
            var (_, _, last) = FilteredPendingInBlock(lastMb, filter, wc);
            ss.Last = last;
        }
    }

    // Caller should hold a read lock on fs + message block lock if concurrent mutations are possible.
private static (ulong Total, ulong First, ulong Last) FilteredPendingInBlock(MessageBlock mb, string filter, bool wc)
    {
        // Aggregates a single block's per-subject state for the filter.
        if (mb.Fss == null) return (0, 0, 0);
        var filterBytes = Encoding.UTF8.GetBytes(filter);
        ulong total = 0;
        ulong first = 0;
        ulong last = 0;
        if (wc)
        {
            mb.Fss.Match(filterBytes, (_, ss) =>
            {
                total += ss.Msgs;
                if (first == 0 || (ss.First > 0 && ss.First < first)) first = ss.First;
                if (ss.Last > last) last = ss.Last;
                return true;
            });
        }
        else
        {
            var (ss, ok) = mb.Fss.Find(filterBytes);
            if (ok && ss != null)
            {
                total = ss.Msgs;
                first = ss.First;
                last = ss.Last;
            }
        }
        return (total, first, last);
    }

    // Asynchronously bumps stale Fblk entries in the per-subject index up to
    // firstBlock. Runs on the thread pool so it can acquire the write lock
    // even though callers may hold a read lock (_mu is NoRecursion).
    private void SchedulePsiFirstBlockCorrection(string filter, bool wc, uint firstBlock)
    {
        if (_psim == null) return;
        var filterBytes = Encoding.UTF8.GetBytes(filter);
        _ = Task.Run(() =>
        {
            _mu.EnterWriteLock();
            try
            {
                if (_psim == null) return;
                if (!wc)
                {
                    var (info, ok) = _psim.Find(filterBytes);
                    if (ok && info != null && firstBlock > info.Fblk) info.Fblk = firstBlock;
                    return;
                }
                _psim.Match(filterBytes, (_, psi) =>
                {
                    if (firstBlock > psi.Fblk) psi.Fblk = firstBlock;
                    return true;
                });
            }
            finally
            {
                _mu.ExitWriteLock();
            }
        });
    }

    // Collects the last sequence for every tracked subject by scanning blocks
    // newest-first; result is sorted ascending. Lock should be held by caller.
    // NOTE(review): several generic type arguments appear stripped in this
    // chunk (Array.Empty(), new List(...), new HashSet(...)); confirm
    // against the original source.
    private (ulong[] Seqs, Exception? Error) AllLastSeqsLocked()
    {
        if (_state.Msgs == 0 || NoTrackSubjects() || _psim == null) return (Array.Empty(), null);
        var numSubjects = _psim.Size();
        if (numSubjects == 0) return (Array.Empty(), null);
        var seqs = new List(numSubjects);
        var seen = new HashSet(numSubjects, StringComparer.Ordinal);
        for (var i = _blks.Count - 1; i >= 0; i--)
        {
            // Stop early once every subject has been seen.
            if (seen.Count == numSubjects) break;
            var mb = _blks[i];
            if (mb.Fss == null) continue;
            mb.Fss.IterFast((subjectBytes, ss) =>
            {
                var subject = Encoding.UTF8.GetString(subjectBytes);
                // Only the newest occurrence (scanning backwards) counts.
                if (!seen.Add(subject)) return true;
                if (ss.LastNeedsUpdate) RecalculateLastForSubject(subject, ss);
                seqs.Add(ss.Last);
                return true;
            });
        }
        seqs.Sort();
        return (seqs.ToArray(), null);
    }

    // Lock should be held by caller.
private bool FilterIsAll(string[] filters)
    {
        // True when the filter set exactly covers the stream's configured
        // subjects (order-insensitive, subset-match per sorted pair).
        ArgumentNullException.ThrowIfNull(filters);
        var streamSubjects = _cfg.Config.Subjects ?? [];
        if (filters.Length != streamSubjects.Length) return false;
        var sortedFilters = filters.ToArray();
        var sortedSubjects = streamSubjects.ToArray();
        Array.Sort(sortedFilters, StringComparer.Ordinal);
        Array.Sort(sortedSubjects, StringComparer.Ordinal);
        for (var i = 0; i < sortedFilters.Length; i++)
        {
            if (!SubscriptionIndex.SubjectIsSubsetMatch(sortedSubjects[i], sortedFilters[i])) return false;
        }
        return true;
    }

    // Walks blocks newest-first to refresh a stale per-subject Last value.
    // Lock should be held by caller.
    private void RecalculateLastForSubject(string subject, SimpleState ss)
    {
        var subjectBytes = Encoding.UTF8.GetBytes(subject);
        for (var i = _blks.Count - 1; i >= 0; i--)
        {
            var fss = _blks[i].Fss;
            if (fss == null) continue;
            var (candidate, ok) = fss.Find(subjectBytes);
            if (!ok || candidate == null || candidate.Last == 0) continue;
            ss.Last = candidate.Last;
            ss.LastNeedsUpdate = false;
            return;
        }
        // Subject no longer present in any block.
        ss.Last = 0;
        ss.LastNeedsUpdate = false;
    }

    // -----------------------------------------------------------------------
    // IStreamStore — type / state
    // -----------------------------------------------------------------------

    /// <inheritdoc/>
    public StorageType Type() => StorageType.FileStorage;

    /// <inheritdoc/>
    public StreamState State() => _memStore.State();

    /// <inheritdoc/>
    public void FastState(StreamState state) => _memStore.FastState(state);

    // -----------------------------------------------------------------------
    // IStreamStore — callback registration
    // -----------------------------------------------------------------------

    /// <inheritdoc/>
    public void RegisterStorageUpdates(StorageUpdateHandler cb) => _memStore.RegisterStorageUpdates(cb);

    /// <inheritdoc/>
    public void RegisterStorageRemoveMsg(StorageRemoveMsgHandler cb) => _memStore.RegisterStorageRemoveMsg(cb);

    /// <inheritdoc/>
    public void RegisterProcessJetStreamMsg(ProcessJetStreamMsgHandler cb) => _memStore.RegisterProcessJetStreamMsg(cb);

    // -----------------------------------------------------------------------
    // IStreamStore — lifecycle
    // -----------------------------------------------------------------------

    /// <inheritdoc/>
    public void Stop()
    {
        _mu.EnterWriteLock();
        try
        {
            // Idempotent: only the first caller proceeds to tear down.
            if (_closing) return;
            _closing = true;
        }
        finally
        {
            _mu.ExitWriteLock();
        }
        _ageChk?.Dispose();
        _ageChk = null;
        _syncTmr?.Dispose();
        _syncTmr = null;
        _closed = true;
        _memStore.Stop();
    }

    /// <inheritdoc/>
    public void Dispose() => Stop();

    // -----------------------------------------------------------------------
    // IStreamStore — store / load (all stubs)
    // -----------------------------------------------------------------------
    // At this port stage these delegate to the in-memory store; file-backed
    // implementations replace them incrementally.

    /// <inheritdoc/>
    public (ulong Seq, long Ts) StoreMsg(string subject, byte[]? hdr, byte[]? msg, long ttl) => _memStore.StoreMsg(subject, hdr, msg, ttl);

    /// <inheritdoc/>
    public void StoreRawMsg(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck) => _memStore.StoreRawMsg(subject, hdr, msg, seq, ts, ttl, discardNewCheck);

    /// <inheritdoc/>
    public (ulong Seq, Exception? Error) SkipMsg(ulong seq) => _memStore.SkipMsg(seq);

    /// <inheritdoc/>
    public void SkipMsgs(ulong seq, ulong num) => _memStore.SkipMsgs(seq, num);

    /// <inheritdoc/>
    public void FlushAllPending() => _memStore.FlushAllPending();

    /// <inheritdoc/>
    public StoreMsg? LoadMsg(ulong seq, StoreMsg? sm) => _memStore.LoadMsg(seq, sm);

    /// <inheritdoc/>
    public (StoreMsg? Sm, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? smp) => _memStore.LoadNextMsg(filter, wc, start, smp);

    /// <inheritdoc/>
    public (StoreMsg? Sm, ulong Skip) LoadNextMsgMulti(object? sl, ulong start, StoreMsg? smp) => _memStore.LoadNextMsgMulti(sl, start, smp);

    /// <inheritdoc/>
    public StoreMsg? LoadLastMsg(string subject, StoreMsg? sm) => _memStore.LoadLastMsg(subject, sm);

    /// <inheritdoc/>
    public (StoreMsg? Sm, Exception? Error) LoadPrevMsg(ulong start, StoreMsg? smp) => _memStore.LoadPrevMsg(start, smp);

    /// <inheritdoc/>
    public (StoreMsg? Sm, ulong Skip, Exception? Error) LoadPrevMsgMulti(object? sl, ulong start, StoreMsg? smp) => _memStore.LoadPrevMsgMulti(sl, start, smp);

    /// <inheritdoc/>
    public (bool Removed, Exception?
Error) RemoveMsg(ulong seq) => _memStore.RemoveMsg(seq);

    /// <inheritdoc/>
    public (bool Removed, Exception? Error) EraseMsg(ulong seq) => _memStore.EraseMsg(seq);

    /// <inheritdoc/>
    public (ulong Purged, Exception? Error) Purge() => _memStore.Purge();

    /// <inheritdoc/>
    public (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep) => _memStore.PurgeEx(subject, seq, keep);

    /// <inheritdoc/>
    public (ulong Purged, Exception? Error) Compact(ulong seq) => _memStore.Compact(seq);

    /// <inheritdoc/>
    public void Truncate(ulong seq) => _memStore.Truncate(seq);

    // -----------------------------------------------------------------------
    // IStreamStore — query methods (all stubs)
    // -----------------------------------------------------------------------
    // Delegated to the in-memory store at this port stage.
    // NOTE(review): the Dictionary return types below appear to have lost
    // their generic type arguments in this chunk — confirm against the
    // original source.

    /// <inheritdoc/>
    public ulong GetSeqFromTime(DateTime t) => _memStore.GetSeqFromTime(t);

    /// <inheritdoc/>
    public SimpleState FilteredState(ulong seq, string subject) => _memStore.FilteredState(seq, subject);

    /// <inheritdoc/>
    public Dictionary SubjectsState(string filterSubject) => _memStore.SubjectsState(filterSubject);

    /// <inheritdoc/>
    public Dictionary SubjectsTotals(string filterSubject) => _memStore.SubjectsTotals(filterSubject);

    /// <inheritdoc/>
    public (ulong[] Seqs, Exception? Error) AllLastSeqs() => _memStore.AllLastSeqs();

    /// <inheritdoc/>
    public (ulong[] Seqs, Exception? Error) MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed) => _memStore.MultiLastSeqs(filters, maxSeq, maxAllowed);

    /// <inheritdoc/>
    public (string Subject, Exception? Error) SubjectForSeq(ulong seq) => _memStore.SubjectForSeq(seq);

    /// <inheritdoc/>
    public (ulong Total, ulong ValidThrough, Exception? Error) NumPending(ulong sseq, string filter, bool lastPerSubject) => _memStore.NumPending(sseq, filter, lastPerSubject);

    /// <inheritdoc/>
    public (ulong Total, ulong ValidThrough, Exception? Error) NumPendingMulti(ulong sseq, object?
sl, bool lastPerSubject) => _memStore.NumPendingMulti(sseq, sl, lastPerSubject);

    // -----------------------------------------------------------------------
    // IStreamStore — stream state encoding (stubs)
    // -----------------------------------------------------------------------

    /// <inheritdoc/>
    public (byte[] Enc, Exception? Error) EncodedStreamState(ulong failed) => _memStore.EncodedStreamState(failed);

    /// <inheritdoc/>
    public void SyncDeleted(DeleteBlocks dbs) => _memStore.SyncDeleted(dbs);

    // -----------------------------------------------------------------------
    // IStreamStore — config / admin (stubs)
    // -----------------------------------------------------------------------

    // NOTE(review): _cfg.Config is replaced without taking _mu — confirm the
    // intended locking discipline for config updates.
    /// <inheritdoc/>
    public void UpdateConfig(StreamConfig cfg)
    {
        _cfg.Config = cfg.Clone();
        _memStore.UpdateConfig(cfg);
    }

    /// <inheritdoc/>
    public void Delete(bool inline) => _memStore.Delete(inline);

    /// <inheritdoc/>
    public void ResetState() => _memStore.ResetState();

    // -----------------------------------------------------------------------
    // IStreamStore — consumer management (stubs)
    // -----------------------------------------------------------------------

    /// <inheritdoc/>
    public IConsumerStore ConsumerStore(string name, DateTime created, ConsumerConfig cfg)
    {
        // Creates the consumer's on-disk directory and registers the store.
        var cfi = new FileConsumerInfo { Name = name, Created = created, Config = cfg, };
        var odir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.ConsumerDir, name);
        Directory.CreateDirectory(odir);
        var cs = new ConsumerFileStore(this, cfi, name, odir);
        AddConsumer(cs);
        return cs;
    }

    /// <inheritdoc/>
    public void AddConsumer(IConsumerStore o)
    {
        _cmu.EnterWriteLock();
        try
        {
            _cfs.Add(o);
            _memStore.AddConsumer(o);
        }
        finally
        {
            _cmu.ExitWriteLock();
        }
    }

    /// <inheritdoc/>
    public void RemoveConsumer(IConsumerStore o)
    {
        _cmu.EnterWriteLock();
        try
        {
            _cfs.Remove(o);
            _memStore.RemoveConsumer(o);
        }
        finally
        {
            _cmu.ExitWriteLock();
        }
    }

    // -----------------------------------------------------------------------
    // IStreamStore — snapshot / utilization (stubs)
    // -----------------------------------------------------------------------

    /// <inheritdoc/>
    public
(SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool includeConsumers, bool checkMsgs)
    {
        // Stub: serializes the in-memory state to JSON; the deadline,
        // includeConsumers and checkMsgs parameters are not honored yet.
        var state = _memStore.State();
        var payload = JsonSerializer.SerializeToUtf8Bytes(state);
        var reader = new MemoryStream(payload, writable: false);
        return (new SnapshotResult { Reader = reader, State = state }, null);
    }

    /// <inheritdoc/>
    public (ulong Total, ulong Reported, Exception? Error) Utilization() => _memStore.Utilization();

    // Repeating-key XOR "cipher" keyed by SHA256(seed || nonce).
    // NOTE(review): this is NOT a secure stream cipher — the 32-byte
    // keystream repeats. Presumably retained only for compatibility with
    // older block encodings; confirm before using for new data.
    // NOTE(review): Span/ReadOnlySpan element types appear stripped in this
    // chunk (presumably Span<byte>); confirm against the original source.
    internal sealed class XorStreamCipher
    {
        private readonly byte[] _keySeed;

        public XorStreamCipher(byte[] seed, byte[] nonce)
        {
            using var sha = SHA256.Create();
            var input = new byte[seed.Length + nonce.Length];
            Buffer.BlockCopy(seed, 0, input, 0, seed.Length);
            Buffer.BlockCopy(nonce, 0, input, seed.Length, nonce.Length);
            _keySeed = sha.ComputeHash(input);
        }

        // XORs the buffer in place with the repeating 32-byte keystream.
        public void XorKeyStream(Span buffer)
        {
            for (var i = 0; i < buffer.Length; i++) buffer[i] ^= _keySeed[i % _keySeed.Length];
        }
    }

    // AES-GCM AEAD wrapper: key = SHA256(seed), 12-byte nonce, 16-byte tag
    // appended after the ciphertext.
    internal sealed class AeadCipher
    {
        private const int AeadNonceSize = 12;
        private const int AeadTagSize = 16;
        private readonly byte[] _key;

        public AeadCipher(byte[] seed)
        {
            using var sha = SHA256.Create();
            _key = sha.ComputeHash(seed);
        }

        public int NonceSize => AeadNonceSize;
        public int Overhead => AeadTagSize;

        // Encrypts plaintext and returns ciphertext || tag.
        public byte[] Seal(ReadOnlySpan nonce, ReadOnlySpan plaintext)
        {
            if (nonce.Length != AeadNonceSize) throw new ArgumentException("invalid nonce size", nameof(nonce));
            var ciphertext = new byte[plaintext.Length];
            var tag = new byte[AeadTagSize];
            using var aes = new AesGcm(_key, AeadTagSize);
            aes.Encrypt(nonce, plaintext, ciphertext, tag);
            var sealedPayload = new byte[ciphertext.Length + tag.Length];
            Buffer.BlockCopy(ciphertext, 0, sealedPayload, 0, ciphertext.Length);
            Buffer.BlockCopy(tag, 0, sealedPayload, ciphertext.Length, tag.Length);
            return sealedPayload;
        }

        // Decrypts ciphertext || tag; AesGcm.Decrypt throws when the
        // authentication tag does not verify.
        public byte[] Open(ReadOnlySpan nonce, ReadOnlySpan ciphertextAndTag)
        {
            if (nonce.Length != AeadNonceSize) throw new ArgumentException("invalid nonce size", nameof(nonce));
            if (ciphertextAndTag.Length < AeadTagSize) throw new ArgumentException("invalid ciphertext size", nameof(ciphertextAndTag));
            var ciphertextLength = ciphertextAndTag.Length - AeadTagSize;
            var ciphertext = ciphertextAndTag[..ciphertextLength];
            var tag = ciphertextAndTag[ciphertextLength..];
            var plaintext = new byte[ciphertextLength];
            using var aes = new AesGcm(_key, AeadTagSize);
            aes.Decrypt(nonce, ciphertext, tag, plaintext);
            return plaintext;
        }
    }

    // On-disk JSON shape of a block's index sidecar file
    // (see TryReadBlockIndexInfo).
    private sealed class MessageBlockIndexFile
    {
        public ulong Msgs { get; set; }
        public ulong Bytes { get; set; }
        public ulong RawBytes { get; set; }
        public ulong FirstSeq { get; set; }
        public long FirstTs { get; set; }
        public ulong LastSeq { get; set; }
        public long LastTs { get; set; }
        public byte[]? LastChecksum { get; set; }
        public bool NoTrack { get; set; }
    }
}