From b918c654d028f538e2a9ef740d5a82f94de159b0 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 14:27:00 -0500 Subject: [PATCH] feat(batch12): complete filestore recovery --- .../JetStream/FileStore.cs | 256 ++++++++++++++++++ porting.db | Bin 6590464 -> 6590464 bytes reports/current.md | 48 ++-- 3 files changed, 286 insertions(+), 18 deletions(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index 4c0cddf..09752d5 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -1412,6 +1412,32 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable return ulong.TryParse(stem, out index); } + private static bool TryParseIndexedFile(string fileName, string format, out uint index) + { + index = 0; + var suffix = Path.GetExtension(string.Format(format, 0)); + if (string.IsNullOrEmpty(suffix) || !fileName.EndsWith(suffix, StringComparison.Ordinal)) + return false; + + var stem = fileName[..^suffix.Length]; + return uint.TryParse(stem, out index); + } + + private void RemoveRecoveredBlockLocked(MessageBlock mb) + { + _blks.Remove(mb); + _bim.Remove(mb.Index); + if (ReferenceEquals(_lmb, mb)) + _lmb = _blks.Count > 0 ? _blks[^1] : null; + + mb.CloseFDs(); + TryDeleteFile(mb.Mfn); + + var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); + TryDeleteFile(Path.Combine(mdir, string.Format(FileStoreDefaults.IndexScan, mb.Index))); + TryDeleteFile(Path.Combine(mdir, string.Format(FileStoreDefaults.KeyScan, mb.Index))); + } + private void RecoverPartialPurge() { var storeDir = _fcfg.StoreDir; @@ -1712,6 +1738,236 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable } } + internal void CleanupOldMeta() + { + string mdir; + _mu.EnterReadLock(); + try + { + mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); + } + finally + { + _mu.ExitReadLock(); + } + + if (!Directory.Exists(mdir)) + return; + + foreach (var file in Directory.EnumerateFiles(mdir, "*", SearchOption.TopDirectoryOnly)) + { + var name = Path.GetFileName(file); + if (!name.EndsWith(".idx", StringComparison.Ordinal) && + !name.EndsWith(".fss", StringComparison.Ordinal)) + { + continue; + } + + TryDeleteFile(file); + } + } + + internal Exception? RecoverMsgs() + { + _mu.EnterWriteLock(); + try + { + RecoverPartialPurge(); + var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); + if (!Directory.Exists(mdir)) + return new InvalidOperationException("message directory is not readable"); + + var indices = new List(); + foreach (var file in Directory.EnumerateFiles(mdir, "*" + FileStoreDefaults.BlkSuffix, SearchOption.TopDirectoryOnly)) + { + if (!TryParseBlockIndex(Path.GetFileName(file), out var index) || index > uint.MaxValue) + continue; + + indices.Add((uint)index); + } + + indices.Sort(); + + _blks.Clear(); + _bim.Clear(); + _lmb = null; + _state = new StreamState(); + + foreach (var index in indices) + { + MessageBlock mb; + try + { + mb = RecoverMsgBlock(index); + } + catch (Exception ex) + { + return ex; + } + + mb.Mu.EnterWriteLock(); + try + { + if (mb.First.Seq == 0) + { + RemoveRecoveredBlockLocked(mb); + continue; + } + + if (_state.FirstSeq == 0 || (mb.First.Seq < _state.FirstSeq && mb.First.Ts != 0)) + { + _state.FirstSeq = mb.First.Seq; + _state.FirstTime = mb.First.Ts == 0 ? default : FromUnixNanosUtc(mb.First.Ts); + } + + if (mb.Last.Seq >= _state.LastSeq) + { + _state.LastSeq = mb.Last.Seq; + _state.LastTime = mb.Last.Ts == 0 ? default : FromUnixNanosUtc(mb.Last.Ts); + } + + _state.Msgs += mb.Msgs; + _state.Bytes += mb.Bytes; + + if (mb.Msgs == 0) + { + mb.First.Seq = _state.LastSeq + 1; + mb.First.Ts = 0; + mb.Last.Seq = _state.LastSeq; + mb.Last.Ts = TimestampNormalized(_state.LastTime); + } + } + finally + { + if (mb.Mu.IsWriteLockHeld) + mb.Mu.ExitWriteLock(); + } + } + + if (_blks.Count == 0) + { + AddMsgBlock(InitMsgBlock(1)); + } + else + { + _lmb = _blks[^1]; + } + + var valid = new HashSet(_blks.Select(b => b.Index)); + foreach (var file in Directory.EnumerateFiles(mdir, "*" + Path.GetExtension(string.Format(FileStoreDefaults.KeyScan, 0)), SearchOption.TopDirectoryOnly)) + { + var name = Path.GetFileName(file); + if (!TryParseIndexedFile(name, FileStoreDefaults.KeyScan, out var index) || !valid.Contains(index)) + TryDeleteFile(file); + } + + return null; + } + catch (Exception ex) + { + return ex; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? ExpireMsgsOnRecover() + { + _mu.EnterWriteLock(); + try + { + if (_state.Msgs == 0) + return null; + if (_cfg.Config.SubjectDeleteMarkerTTL > TimeSpan.Zero) + return null; + if (_cfg.Config.MaxAge <= TimeSpan.Zero) + return null; + + var minAge = (DateTime.UtcNow - DateTime.UnixEpoch).Ticks * 100L - (_cfg.Config.MaxAge.Ticks * 100L); + + ulong purged = 0; + ulong bytes = 0; + long nts = 0; + var removed = new List(); + MsgId last = default; + + foreach (var mb in _blks) + { + mb.Mu.EnterWriteLock(); + try + { + if (minAge < mb.First.Ts) + { + nts = mb.First.Ts; + break; + } + + if (mb.Last.Ts > minAge) + { + nts = mb.First.Ts; + break; + } + + purged += mb.Msgs; + bytes += mb.Bytes; + if (ReferenceEquals(mb, _lmb)) + last = mb.Last; + + RemoveRecoveredBlockLocked(mb); + removed.Add(mb); + } + finally + { + if (mb.Mu.IsWriteLockHeld) + mb.Mu.ExitWriteLock(); + } + } + + if (removed.Count > 0) + { + _blks = [.. _blks.Where(mb => !removed.Contains(mb))]; + if (_bim.Count > 0) + { + foreach (var mb in removed) + _bim.Remove(mb.Index); + } + + _lmb = _blks.Count > 0 ? _blks[^1] : null; + + if (_lmb == null && last.Seq != 0) + { + var tomb = InitMsgBlock(1); + tomb.First = new MsgId { Seq = last.Seq + 1, Ts = 0 }; + tomb.Last = new MsgId { Seq = last.Seq, Ts = last.Ts }; + tomb.Msgs = 0; + tomb.Bytes = 0; + AddMsgBlock(tomb); + _psim?.Reset(); + _tsl = 0; + } + } + + RebuildStateLocked(null); + + if (nts > 0) + ResetAgeChk(nts - minAge); + if (purged > 0 || bytes > 0) + _dirty++; + + return null; + } + catch (Exception ex) + { + return ex; + } + finally + { + _mu.ExitWriteLock(); + } + } + private static void EnsureStoreDirectoryWritable(string storeDir) { if (string.IsNullOrWhiteSpace(storeDir)) diff --git a/porting.db b/porting.db index 49f5c77a08014d841aebcd748b37cb9eed1e6aac..266a8633d74afae7227c0eb2498d4104dbbe5816 100644 GIT binary patch delta 1289 zcmZwGZ%i9?9LMqd-Ch6O{jR+OMj6nSb^~?5puNHhY_RzkQ3SdvDx+JEw)h8eD=-(- zAaw~Pm_=(seeIzf>o#4+gIDtVUf%c3 zFS*?E#{0rDT;1p--(1Rfn)yx=zv}u~4DjMLrCsqVnm8d2ivwan{F-h2p}Nf0>*NDs zepe(f@~sZR!^h9fo_{5|kQ8?Ano4&~+vY@xT>0KB3AOzE!i6be$2rOEtDH(W$(7S~ z#GPR*6-W9PS@w)v$1YKt`ZtRW$~CN6p@+(? zSZ>W1th9zVGa@Y#^HqU15VmBepK2V(ZB}z+@a?xkO(CAIW!)0JVzx-sM{cbO%GK=6 z&t#b`$+YFaed7w{iIhHr%Z-Arwwb?gV3%52n0uL+S%j8(NzclnX+%Ma=Agw?C` z2FvjD#)?K)G-X9|tZ2@!G*uiUQD%v$K9)YAjN~jKDZwqfJ4v=MJONL_Fgyh#FbZQ3fs=3wo`z>&9G->e;CYyU(=cgn%|>T*GHd!aqC=`? zQ-y-{MEYO<`5A^LcWUyv*%#X?CCxJuAPYP%b!1 zh~2|h3(+3=_JUxL6@QF2Wp)cuI0I)P1_n%-yM_4vZ(Wj<<A{Nn_!{TykDukqP54TTUgMd{3CYmrA_eSot?b*8BIW z{nMu#S?t|}PY8ztESpL!x^Lyj_Q?)3);LYNlDHcD6J#Gs|4vGO?2fTnh>dOK?jO zOpwo;N+YN)DleqzB53KZ=m^zK*#V+1k|-z&g6K^sym;V!`S`z&zh`V%>Vb)|EOx7R zJj)!c9>dCnE{9jGrnQQ#jLup4gjp@~x6CXNxl@!1bA429<5$d!cJ5&md>R=~0iWWe z?yHJIzaqGoz6?oxgm?}0K5)m=&_gAP!Ya2=662nkAyqIwL|ZHVK3%Uwk|y$$nZe~W zg)^rh2oL87tU50@u~=lTd|xqTtK3D?Kdc#K98+8)Q(U3*oIB~sux0M73&($2-IT`p zH_CDFk9`L3Gt@pq>oXiT4PD;C0%Qs39?PiKuXR&TiC#sA1Nwkz-K`%~nS)-OF;;~p zTXisC4mcqaqQC`ih=v%L3$ZW{;vgOpAQ3z;ACe#$7JwHP!Xj7!v-jWjj##Ip#nBTC2WBz z*b3FK4QgOJ)WQzf33adw>dnbk|G^w~!1Rpy%XCB5rDV3ht+AuCy`{4`7&MRG@DCbs zCj{$rYmViT{9V2v{gzHkc5$5jW(S06;RGdx{f#!kSIL+?nFhlC0&Ug;pO4->_9up> o!+vOhMhL)e*kewIn_})LlGZQinT~p`-<