diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index 4c0cddf..09752d5 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -1412,6 +1412,32 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable return ulong.TryParse(stem, out index); } + private static bool TryParseIndexedFile(string fileName, string format, out uint index) + { + index = 0; + var suffix = Path.GetExtension(string.Format(format, 0)); + if (string.IsNullOrEmpty(suffix) || !fileName.EndsWith(suffix, StringComparison.Ordinal)) + return false; + + var stem = fileName[..^suffix.Length]; + return uint.TryParse(stem, out index); + } + + private void RemoveRecoveredBlockLocked(MessageBlock mb) + { + _blks.Remove(mb); + _bim.Remove(mb.Index); + if (ReferenceEquals(_lmb, mb)) + _lmb = _blks.Count > 0 ? _blks[^1] : null; + + mb.CloseFDs(); + TryDeleteFile(mb.Mfn); + + var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); + TryDeleteFile(Path.Combine(mdir, string.Format(FileStoreDefaults.IndexScan, mb.Index))); + TryDeleteFile(Path.Combine(mdir, string.Format(FileStoreDefaults.KeyScan, mb.Index))); + } + private void RecoverPartialPurge() { var storeDir = _fcfg.StoreDir; @@ -1712,6 +1738,236 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable } } + internal void CleanupOldMeta() + { + string mdir; + _mu.EnterReadLock(); + try + { + mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); + } + finally + { + _mu.ExitReadLock(); + } + + if (!Directory.Exists(mdir)) + return; + + foreach (var file in Directory.EnumerateFiles(mdir, "*", SearchOption.TopDirectoryOnly)) + { + var name = Path.GetFileName(file); + if (!name.EndsWith(".idx", StringComparison.Ordinal) && + !name.EndsWith(".fss", StringComparison.Ordinal)) + { + continue; + } + + TryDeleteFile(file); + } + } + + internal Exception? RecoverMsgs() + { + _mu.EnterWriteLock(); + try + { + RecoverPartialPurge(); + var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); + if (!Directory.Exists(mdir)) + return new InvalidOperationException("message directory is not readable"); + + var indices = new List(); + foreach (var file in Directory.EnumerateFiles(mdir, "*" + FileStoreDefaults.BlkSuffix, SearchOption.TopDirectoryOnly)) + { + if (!TryParseBlockIndex(Path.GetFileName(file), out var index) || index > uint.MaxValue) + continue; + + indices.Add((uint)index); + } + + indices.Sort(); + + _blks.Clear(); + _bim.Clear(); + _lmb = null; + _state = new StreamState(); + + foreach (var index in indices) + { + MessageBlock mb; + try + { + mb = RecoverMsgBlock(index); + } + catch (Exception ex) + { + return ex; + } + + mb.Mu.EnterWriteLock(); + try + { + if (mb.First.Seq == 0) + { + RemoveRecoveredBlockLocked(mb); + continue; + } + + if (_state.FirstSeq == 0 || (mb.First.Seq < _state.FirstSeq && mb.First.Ts != 0)) + { + _state.FirstSeq = mb.First.Seq; + _state.FirstTime = mb.First.Ts == 0 ? default : FromUnixNanosUtc(mb.First.Ts); + } + + if (mb.Last.Seq >= _state.LastSeq) + { + _state.LastSeq = mb.Last.Seq; + _state.LastTime = mb.Last.Ts == 0 ? default : FromUnixNanosUtc(mb.Last.Ts); + } + + _state.Msgs += mb.Msgs; + _state.Bytes += mb.Bytes; + + if (mb.Msgs == 0) + { + mb.First.Seq = _state.LastSeq + 1; + mb.First.Ts = 0; + mb.Last.Seq = _state.LastSeq; + mb.Last.Ts = TimestampNormalized(_state.LastTime); + } + } + finally + { + if (mb.Mu.IsWriteLockHeld) + mb.Mu.ExitWriteLock(); + } + } + + if (_blks.Count == 0) + { + AddMsgBlock(InitMsgBlock(1)); + } + else + { + _lmb = _blks[^1]; + } + + var valid = new HashSet(_blks.Select(b => b.Index)); + foreach (var file in Directory.EnumerateFiles(mdir, "*" + Path.GetExtension(string.Format(FileStoreDefaults.KeyScan, 0)), SearchOption.TopDirectoryOnly)) + { + var name = Path.GetFileName(file); + if (!TryParseIndexedFile(name, FileStoreDefaults.KeyScan, out var index) || !valid.Contains(index)) + TryDeleteFile(file); + } + + return null; + } + catch (Exception ex) + { + return ex; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? ExpireMsgsOnRecover() + { + _mu.EnterWriteLock(); + try + { + if (_state.Msgs == 0) + return null; + if (_cfg.Config.SubjectDeleteMarkerTTL > TimeSpan.Zero) + return null; + if (_cfg.Config.MaxAge <= TimeSpan.Zero) + return null; + + var minAge = (DateTime.UtcNow - DateTime.UnixEpoch).Ticks * 100L - (_cfg.Config.MaxAge.Ticks * 100L); + + ulong purged = 0; + ulong bytes = 0; + long nts = 0; + var removed = new List(); + MsgId last = default; + + foreach (var mb in _blks) + { + mb.Mu.EnterWriteLock(); + try + { + if (minAge < mb.First.Ts) + { + nts = mb.First.Ts; + break; + } + + if (mb.Last.Ts > minAge) + { + nts = mb.First.Ts; + break; + } + + purged += mb.Msgs; + bytes += mb.Bytes; + if (ReferenceEquals(mb, _lmb)) + last = mb.Last; + + RemoveRecoveredBlockLocked(mb); + removed.Add(mb); + } + finally + { + if (mb.Mu.IsWriteLockHeld) + mb.Mu.ExitWriteLock(); + } + } + + if (removed.Count > 0) + { + _blks = [.. _blks.Where(mb => !removed.Contains(mb))]; + if (_bim.Count > 0) + { + foreach (var mb in removed) + _bim.Remove(mb.Index); + } + + _lmb = _blks.Count > 0 ? _blks[^1] : null; + + if (_lmb == null && last.Seq != 0) + { + var tomb = InitMsgBlock(1); + tomb.First = new MsgId { Seq = last.Seq + 1, Ts = 0 }; + tomb.Last = new MsgId { Seq = last.Seq, Ts = last.Ts }; + tomb.Msgs = 0; + tomb.Bytes = 0; + AddMsgBlock(tomb); + _psim?.Reset(); + _tsl = 0; + } + } + + RebuildStateLocked(null); + + if (nts > 0) + ResetAgeChk(nts - minAge); + if (purged > 0 || bytes > 0) + _dirty++; + + return null; + } + catch (Exception ex) + { + return ex; + } + finally + { + _mu.ExitWriteLock(); + } + } + private static void EnsureStoreDirectoryWritable(string storeDir) { if (string.IsNullOrWhiteSpace(storeDir)) diff --git a/porting.db b/porting.db index 49f5c77..266a863 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index 75f620f..594e7eb 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,26 +1,38 @@ -# Current Porting Status +# NATS .NET Porting Status Report -Generated: 2026-02-28 19:08:58Z +Generated: 2026-02-28 19:26:47 UTC -=== Porting Status Summary === +## Modules (12 total) -Modules (12 total): - verified 12 +| Status | Count | +|--------|-------| +| verified | 12 | -Features (3673 total): - complete 14 - deferred 1910 - n_a 24 - stub 1 - verified 1724 +## Features (3673 total) -Unit Tests (3257 total): - deferred 1795 - n_a 241 - verified 1221 +| Status | Count | +|--------|-------| +| complete | 22 | +| deferred | 1902 | +| n_a | 24 | +| stub | 1 | +| verified | 1724 | -Library Mappings (36 total): - mapped 36 +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| deferred | 1795 | +| n_a | 241 | +| verified | 1221 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | -Overall Progress: 3236/6942 (46.6%) +## Overall Progress + +**3244/6942 items complete (46.7%)**