diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index b31ba3c..4c0cddf 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -18,6 +18,7 @@ using System.Security.Cryptography; using System.Text; using System.Text.Json; using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; @@ -68,6 +69,8 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable // Per-subject index map private SubjectTree? _psim; + private HashWheel? _ttls; + private MsgScheduling? _scheduling; // Total subject-list length (sum of subject-string lengths) private int _tsl; @@ -117,6 +120,12 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable private const int ConsumerHeaderLength = 2; private const int MaxVarIntLength = 10; private const long NanosecondsPerSecond = 1_000_000_000L; + private const byte FullStateMagic = 11; + private const byte FullStateMinVersion = 1; + private const byte FullStateVersion = 3; + private const int FullStateHeaderLength = 2; + private const int FullStateMinimumLength = 32; + private const int FullStateChecksumLength = 8; static JetStreamFileStore() { @@ -1024,6 +1033,685 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable return mb; } + internal void Warn(string format, params object?[] args) + { + if (_fcfg.Server is not NatsServer server) + return; + + server.Warnf("Filestore [{0}] " + format, BuildLogArgs(args)); + } + + internal void Debug(string format, params object?[] args) + { + if (_fcfg.Server is not NatsServer server) + return; + + server.Debugf("Filestore [{0}] " + format, BuildLogArgs(args)); + } + + internal Exception? RecoverFullState() + { + _mu.EnterWriteLock(); + try + { + RecoverPartialPurge(); + + var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile); + byte[] raw; + try + { + raw = File.ReadAllBytes(fn); + } + catch (FileNotFoundException ex) + { + return ex; + } + catch (DirectoryNotFoundException ex) + { + return ex; + } + catch (Exception ex) + { + Warn("Could not read stream state file: {0}", ex); + return ex; + } + + if (raw.Length < FullStateMinimumLength) + { + TryDeleteFile(fn); + Warn("Stream state too short ({0} bytes)", raw.Length); + return new InvalidDataException("corrupt state"); + } + + if (!TryValidateFullStateChecksum(raw, out var buf)) + { + TryDeleteFile(fn); + Warn("Stream state checksum did not match"); + return new InvalidDataException("corrupt state"); + } + + if (_prf != null && _aek != null) + { + var ns = _aek.NonceSize; + if (buf.Length <= ns) + { + Warn("Stream state error reading encryption key: malformed ciphertext"); + return new InvalidDataException("corrupt state"); + } + + try + { + buf = _aek.Open(buf.AsSpan(0, ns), buf.AsSpan(ns)); + } + catch (Exception ex) + { + Warn("Stream state error reading encryption key: {0}", ex); + return ex; + } + } + + if (buf.Length < FullStateHeaderLength) + { + TryDeleteFile(fn); + Warn("Stream state missing header"); + return new InvalidDataException("corrupt state"); + } + + var version = buf[1]; + if (buf[0] != FullStateMagic || version < FullStateMinVersion || version > FullStateVersion) + { + TryDeleteFile(fn); + Warn("Stream state magic and version mismatch"); + return new InvalidDataException("corrupt state"); + } + + var bi = FullStateHeaderLength; + if (!TryReadUVarInt(buf, ref bi, out var msgs) || + !TryReadUVarInt(buf, ref bi, out var bytes) || + !TryReadUVarInt(buf, ref bi, out var firstSeq) || + !TryReadVarInt(buf, ref bi, out var baseTime) || + !TryReadUVarInt(buf, ref bi, out var lastSeq) || + !TryReadVarInt(buf, ref bi, out var lastTs)) + { + TryDeleteFile(fn); + Warn("Stream state could not decode stream summary"); + return new InvalidDataException("corrupt state"); + } + + var recoveredState = new StreamState + { + Msgs = msgs, + Bytes = bytes, + FirstSeq = firstSeq, + LastSeq = lastSeq, + FirstTime = baseTime == 0 ? default : FromUnixNanosUtc(baseTime), + LastTime = lastTs == 0 ? default : FromUnixNanosUtc(lastTs), + }; + + _psim ??= new SubjectTree(); + _psim.Reset(); + _tsl = 0; + + if (!TryReadUVarInt(buf, ref bi, out var numSubjects)) + return CorruptStateWithDelete(fn, "Stream state missing subject metadata"); + + for (var i = 0UL; i < numSubjects; i++) + { + if (!TryReadUVarInt(buf, ref bi, out var subjectLength)) + return CorruptStateWithDelete(fn, "Stream state subject length decode failed"); + + if (subjectLength == 0) + continue; + + if (bi < 0 || bi + (int)subjectLength > buf.Length) + return CorruptStateWithDelete(fn, $"Stream state bad subject len ({subjectLength})"); + + var subject = Encoding.Latin1.GetString(buf, bi, (int)subjectLength); + if (!SubscriptionIndex.IsValidLiteralSubject(subject)) + return CorruptStateWithDelete(fn, "Stream state corrupt subject detected"); + + bi += (int)subjectLength; + if (!TryReadUVarInt(buf, ref bi, out var total) || !TryReadUVarInt(buf, ref bi, out var fblk)) + return CorruptStateWithDelete(fn, "Stream state could not decode subject index"); + + ulong lblk = fblk; + if (total > 1) + { + if (!TryReadUVarInt(buf, ref bi, out lblk)) + return CorruptStateWithDelete(fn, "Stream state could not decode subject last block"); + } + + _psim.Insert(Encoding.Latin1.GetBytes(subject), new Psi + { + Total = total, + Fblk = (uint)fblk, + Lblk = (uint)lblk, + }); + _tsl += (int)subjectLength; + } + + if (!TryReadUVarInt(buf, ref bi, out var numBlocks)) + return CorruptStateWithDelete(fn, "Stream state could not decode block count"); + + var parsedBlocks = new List((int)numBlocks); + var parsedMap = new Dictionary((int)numBlocks); + var mstate = new StreamState(); + var lastBlockIndex = numBlocks > 0 ? numBlocks - 1 : 0; + + for (var i = 0UL; i < numBlocks; i++) + { + if (!TryReadUVarInt(buf, ref bi, out var idx) || + !TryReadUVarInt(buf, ref bi, out var nbytes) || + !TryReadUVarInt(buf, ref bi, out var fseq) || + !TryReadVarInt(buf, ref bi, out var fts) || + !TryReadUVarInt(buf, ref bi, out var lseq) || + !TryReadVarInt(buf, ref bi, out var lts) || + !TryReadUVarInt(buf, ref bi, out var numDeleted)) + { + return CorruptStateWithDelete(fn, "Stream state block decode failed"); + } + + ulong ttls = 0; + if (version >= 2) + { + if (!TryReadUVarInt(buf, ref bi, out ttls)) + return CorruptStateWithDelete(fn, "Stream state TTL metadata decode failed"); + } + + ulong schedules = 0; + if (version >= 3) + { + if (!TryReadUVarInt(buf, ref bi, out schedules)) + return CorruptStateWithDelete(fn, "Stream state schedule metadata decode failed"); + } + + var mb = InitMsgBlock((uint)idx); + mb.First = new MsgId { Seq = fseq, Ts = fts + baseTime }; + mb.Last = new MsgId { Seq = lseq, Ts = lts + baseTime }; + mb.Bytes = nbytes; + mb.Msgs = lseq >= fseq ? (lseq - fseq + 1) : 0; + mb.Ttls = ttls; + mb.Schedules = schedules; + mb.Closed = true; + + if (numDeleted > 0) + { + try + { + var (dmap, consumed) = SequenceSet.Decode(buf.AsSpan(bi)); + if (consumed <= 0) + return CorruptStateWithDelete(fn, "Stream state error decoding deleted map"); + + mb.Dmap = dmap; + mb.Msgs = mb.Msgs > numDeleted ? (mb.Msgs - numDeleted) : 0; + bi += consumed; + } + catch (Exception ex) + { + Warn("Stream state error decoding deleted map: {0}", ex); + return CorruptStateWithDelete(fn, "Stream state error decoding deleted map"); + } + } + + if (mb.Msgs > 0 || i == lastBlockIndex) + { + parsedBlocks.Add(mb); + parsedMap[mb.Index] = mb; + UpdateTrackingState(mstate, mb); + } + else + { + _dirty++; + } + } + + if (!TryReadUVarInt(buf, ref bi, out var blkIndex)) + return CorruptStateWithDelete(fn, "Stream state has no block index"); + + if (bi < 0 || bi + FullStateChecksumLength > buf.Length) + return CorruptStateWithDelete(fn, "Stream state has no checksum present"); + + var lchk = buf.AsSpan(bi, FullStateChecksumLength).ToArray(); + + _state = recoveredState; + _blks = parsedBlocks; + _bim = parsedMap; + _lmb = _blks.Count > 0 ? _blks[^1] : null; + + if (_lmb == null || _lmb.Index != (uint)blkIndex) + return CorruptStateWithDelete(fn, "Stream state block does not exist or index mismatch"); + + if (!File.Exists(_lmb.Mfn)) + { + Warn("Stream state detected prior state, could not locate msg block {0}", blkIndex); + return new InvalidOperationException("prior stream state detected"); + } + + if (!LastChecksumMatches(_lmb, lchk)) + { + Warn("Stream state outdated, last block has additional entries, will rebuild"); + return new InvalidOperationException("prior stream state detected"); + } + + var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); + if (Directory.Exists(mdir)) + { + foreach (var path in Directory.EnumerateFiles(mdir, "*" + FileStoreDefaults.BlkSuffix, SearchOption.TopDirectoryOnly)) + { + if (!TryParseBlockIndex(Path.GetFileName(path), out var index)) + continue; + + if (index > blkIndex) + { + Warn("Stream state outdated, found extra blocks, will rebuild"); + return new InvalidOperationException("prior stream state detected"); + } + + if (index <= uint.MaxValue && _bim.TryGetValue((uint)index, out var mb)) + mb.Closed = false; + } + } + + var rebuildRequired = false; + foreach (var mb in _blks) + { + if (!mb.Closed) + continue; + + rebuildRequired = true; + Warn("Stream state detected prior state, could not locate msg block {0}", mb.Index); + } + + if (rebuildRequired) + return new InvalidOperationException("prior stream state detected"); + + if (!TrackingStatesEqual(_state, mstate)) + return CorruptStateWithDelete(fn, "Stream state encountered internal inconsistency on recover"); + + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + private object?[] BuildLogArgs(object?[] args) + { + if (args.Length == 0) + return [_cfg.Config.Name]; + + var formatted = new object?[args.Length + 1]; + formatted[0] = _cfg.Config.Name; + Array.Copy(args, 0, formatted, 1, args.Length); + return formatted; + } + + private Exception CorruptStateWithDelete(string fileName, string message) + { + TryDeleteFile(fileName); + Warn("{0}", message); + return new InvalidDataException("corrupt state"); + } + + private static void TryDeleteFile(string fileName) + { + try + { + if (File.Exists(fileName)) + File.Delete(fileName); + } + catch + { + // Best effort to drop unusable snapshots. + } + } + + private bool TryValidateFullStateChecksum(byte[] raw, out byte[] payload) + { + payload = Array.Empty(); + if (raw.Length <= FullStateChecksumLength) + return false; + + var contentLength = raw.Length - FullStateChecksumLength; + payload = raw[..contentLength]; + var checksum = raw.AsSpan(contentLength, FullStateChecksumLength); + + var key = SHA256.HashData(Encoding.UTF8.GetBytes(_cfg.Config.Name)); + using var hmac = new HMACSHA256(key); + var digest = hmac.ComputeHash(payload); + return checksum.SequenceEqual(digest.AsSpan(0, FullStateChecksumLength)); + } + + private bool LastChecksumMatches(MessageBlock mb, byte[] expected) + { + try + { + using var fs = new FileStream(mb.Mfn, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); + if (fs.Length < FileStoreDefaults.RecordHashSize) + return expected.AsSpan().SequenceEqual(new byte[FileStoreDefaults.RecordHashSize]); + + var lchk = new byte[FileStoreDefaults.RecordHashSize]; + fs.Seek(-FileStoreDefaults.RecordHashSize, SeekOrigin.End); + fs.ReadExactly(lchk, 0, lchk.Length); + return expected.AsSpan().SequenceEqual(lchk); + } + catch + { + return false; + } + } + + private static bool TryParseBlockIndex(string fileName, out ulong index) + { + index = 0; + if (!fileName.EndsWith(FileStoreDefaults.BlkSuffix, StringComparison.Ordinal)) + return false; + + var stem = Path.GetFileNameWithoutExtension(fileName); + return ulong.TryParse(stem, out index); + } + + private void RecoverPartialPurge() + { + var storeDir = _fcfg.StoreDir; + var msgDir = Path.Combine(storeDir, FileStoreDefaults.MsgDir); + var purgeDir = Path.Combine(storeDir, FileStoreDefaults.PurgeDir); + var newMsgDir = Path.Combine(storeDir, FileStoreDefaults.NewMsgDir); + + if (!Directory.Exists(msgDir)) + { + if (Directory.Exists(newMsgDir)) + { + Directory.Move(newMsgDir, msgDir); + } + else if (Directory.Exists(purgeDir)) + { + Directory.Move(purgeDir, msgDir); + } + + return; + } + + if (Directory.Exists(newMsgDir)) + Directory.Delete(newMsgDir, recursive: true); + if (Directory.Exists(purgeDir)) + Directory.Delete(purgeDir, recursive: true); + } + + private void ResetAgeChk(long delta) + { + if (_ageChkRun) + return; + + long next = long.MaxValue; + if (_ttls != null) + next = _ttls.GetNextExpiration(next); + + if (_cfg.Config.MaxAge <= TimeSpan.Zero && next == long.MaxValue) + { + CancelAgeChk(); + return; + } + + var fireIn = _cfg.Config.MaxAge; + if (delta == 0 && _state.Msgs > 0) + { + var until = TimeSpan.FromSeconds(2); + if (fireIn == TimeSpan.Zero || until < fireIn) + fireIn = until; + } + + if (next < long.MaxValue) + { + var nextTicks = DateTime.UnixEpoch.Ticks + next / 100L; + var nextUtc = new DateTime(Math.Max(nextTicks, DateTime.UnixEpoch.Ticks), DateTimeKind.Utc); + var until = nextUtc - DateTime.UtcNow; + if (fireIn == TimeSpan.Zero || until < fireIn) + fireIn = until; + } + + if (delta > 0) + { + var deltaDur = TimeSpan.FromTicks(delta / 100L); + if (fireIn == TimeSpan.Zero || deltaDur < fireIn) + fireIn = deltaDur; + } + + if (fireIn < TimeSpan.FromMilliseconds(250)) + fireIn = TimeSpan.FromMilliseconds(250); + + var expires = DateTime.UtcNow.Ticks + fireIn.Ticks; + if (_ageChkTime > 0 && expires > _ageChkTime) + return; + + _ageChkTime = expires; + if (_ageChk != null) + _ageChk.Change(fireIn, Timeout.InfiniteTimeSpan); + else + _ageChk = new Timer(_ => { }, null, fireIn, Timeout.InfiniteTimeSpan); + } + + private void CancelAgeChk() + { + _ageChk?.Dispose(); + _ageChk = null; + _ageChkTime = 0; + } + + private void RunMsgScheduling() + { + _mu.EnterWriteLock(); + try + { + _scheduling?.ResetTimer(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? RecoverTTLState() + { + _mu.EnterWriteLock(); + try + { + var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.TtlStreamStateFile); + byte[]? buf = null; + + try + { + if (File.Exists(fn)) + buf = File.ReadAllBytes(fn); + } + catch (Exception ex) + { + return ex; + } + + _ttls = HashWheel.NewHashWheel(); + + ulong ttlSeq = 0; + if (buf != null) + { + try + { + ttlSeq = _ttls.Decode(buf); + } + catch (Exception ex) + { + Warn("Error decoding TTL state: {0}", ex); + TryDeleteFile(fn); + } + } + + if (ttlSeq < _state.FirstSeq) + ttlSeq = _state.FirstSeq; + + try + { + if (_state.Msgs > 0 && ttlSeq <= _state.LastSeq) + { + Warn("TTL state is outdated; attempting to recover using linear scan (seq {0} to {1})", ttlSeq, _state.LastSeq); + foreach (var mb in _blks) + { + mb.Mu.EnterReadLock(); + try + { + if (mb.Ttls == 0 || mb.Last.Seq < ttlSeq) + continue; + + var start = Math.Max(ttlSeq, mb.First.Seq); + var end = mb.Last.Seq; + for (var seq = start; seq <= end; seq++) + { + StoreMsg? sm = null; + try + { + sm = LoadMsg(seq, null); + } + catch (Exception ex) + { + Warn("Error loading msg seq {0} for recovering TTL: {1}", seq, ex); + } + + if (sm?.Hdr is not { Length: > 0 }) + { + if (seq == ulong.MaxValue) + break; + continue; + } + + var (ttl, _) = JetStreamHeaderHelpers.GetMessageTtl(sm.Hdr); + if (ttl > 0) + { + var expires = sm.Ts + (ttl * NanosecondsPerSecond); + _ttls.Add(seq, expires); + } + + if (seq == ulong.MaxValue) + break; + } + } + finally + { + mb.Mu.ExitReadLock(); + } + } + } + } + finally + { + ResetAgeChk(0); + } + + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? RecoverMsgSchedulingState() + { + _mu.EnterWriteLock(); + try + { + var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.MsgSchedulingStreamStateFile); + byte[]? buf = null; + + try + { + if (File.Exists(fn)) + buf = File.ReadAllBytes(fn); + } + catch (Exception ex) + { + return ex; + } + + _scheduling = new MsgScheduling(RunMsgScheduling); + + ulong schedSeq = 0; + if (buf != null) + { + var (decodedSeq, err) = _scheduling.Decode(buf); + schedSeq = decodedSeq; + if (err != null) + { + Warn("Error decoding message scheduling state: {0}", err); + TryDeleteFile(fn); + schedSeq = 0; + } + } + + if (schedSeq < _state.FirstSeq) + schedSeq = _state.FirstSeq; + + try + { + if (_state.Msgs > 0 && schedSeq <= _state.LastSeq) + { + Warn("Message scheduling state is outdated; attempting to recover using linear scan (seq {0} to {1})", schedSeq, _state.LastSeq); + foreach (var mb in _blks) + { + mb.Mu.EnterReadLock(); + try + { + if (mb.Schedules == 0 || mb.Last.Seq < schedSeq) + continue; + + var start = Math.Max(schedSeq, mb.First.Seq); + var end = mb.Last.Seq; + for (var seq = start; seq <= end; seq++) + { + StoreMsg? sm = null; + try + { + sm = LoadMsg(seq, null); + } + catch (Exception ex) + { + Warn("Error loading msg seq {0} for recovering message schedules: {1}", seq, ex); + } + + if (sm?.Hdr is not { Length: > 0 }) + { + if (seq == ulong.MaxValue) + break; + continue; + } + + var (schedule, ok) = JetStreamHeaderHelpers.NextMessageSchedule(sm.Hdr, sm.Ts); + if (ok && schedule != default) + _scheduling.Init(seq, sm.Subject, schedule.Ticks * 100L); + + if (seq == ulong.MaxValue) + break; + } + } + finally + { + mb.Mu.ExitReadLock(); + } + } + } + } + finally + { + _scheduling.ResetTimer(); + } + + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + private static void EnsureStoreDirectoryWritable(string storeDir) { if (string.IsNullOrWhiteSpace(storeDir)) diff --git a/porting.db b/porting.db index 448065f..49f5c77 100644 Binary files a/porting.db and b/porting.db differ