feat(batch12): complete filestore recovery

This commit is contained in:
Joseph Doherty
2026-02-28 14:27:00 -05:00
parent d55bb3ef19
commit b918c654d0
3 changed files with 286 additions and 18 deletions

View File

@@ -1412,6 +1412,32 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
return ulong.TryParse(stem, out index);
}
private static bool TryParseIndexedFile(string fileName, string format, out uint index)
{
index = 0;
var suffix = Path.GetExtension(string.Format(format, 0));
if (string.IsNullOrEmpty(suffix) || !fileName.EndsWith(suffix, StringComparison.Ordinal))
return false;
var stem = fileName[..^suffix.Length];
return uint.TryParse(stem, out index);
}
private void RemoveRecoveredBlockLocked(MessageBlock mb)
{
_blks.Remove(mb);
_bim.Remove(mb.Index);
if (ReferenceEquals(_lmb, mb))
_lmb = _blks.Count > 0 ? _blks[^1] : null;
mb.CloseFDs();
TryDeleteFile(mb.Mfn);
var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir);
TryDeleteFile(Path.Combine(mdir, string.Format(FileStoreDefaults.IndexScan, mb.Index)));
TryDeleteFile(Path.Combine(mdir, string.Format(FileStoreDefaults.KeyScan, mb.Index)));
}
private void RecoverPartialPurge()
{
var storeDir = _fcfg.StoreDir;
@@ -1712,6 +1738,236 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
}
}
internal void CleanupOldMeta()
{
string mdir;
_mu.EnterReadLock();
try
{
mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir);
}
finally
{
_mu.ExitReadLock();
}
if (!Directory.Exists(mdir))
return;
foreach (var file in Directory.EnumerateFiles(mdir, "*", SearchOption.TopDirectoryOnly))
{
var name = Path.GetFileName(file);
if (!name.EndsWith(".idx", StringComparison.Ordinal) &&
!name.EndsWith(".fss", StringComparison.Ordinal))
{
continue;
}
TryDeleteFile(file);
}
}
internal Exception? RecoverMsgs()
{
_mu.EnterWriteLock();
try
{
RecoverPartialPurge();
var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir);
if (!Directory.Exists(mdir))
return new InvalidOperationException("message directory is not readable");
var indices = new List<uint>();
foreach (var file in Directory.EnumerateFiles(mdir, "*" + FileStoreDefaults.BlkSuffix, SearchOption.TopDirectoryOnly))
{
if (!TryParseBlockIndex(Path.GetFileName(file), out var index) || index > uint.MaxValue)
continue;
indices.Add((uint)index);
}
indices.Sort();
_blks.Clear();
_bim.Clear();
_lmb = null;
_state = new StreamState();
foreach (var index in indices)
{
MessageBlock mb;
try
{
mb = RecoverMsgBlock(index);
}
catch (Exception ex)
{
return ex;
}
mb.Mu.EnterWriteLock();
try
{
if (mb.First.Seq == 0)
{
RemoveRecoveredBlockLocked(mb);
continue;
}
if (_state.FirstSeq == 0 || (mb.First.Seq < _state.FirstSeq && mb.First.Ts != 0))
{
_state.FirstSeq = mb.First.Seq;
_state.FirstTime = mb.First.Ts == 0 ? default : FromUnixNanosUtc(mb.First.Ts);
}
if (mb.Last.Seq >= _state.LastSeq)
{
_state.LastSeq = mb.Last.Seq;
_state.LastTime = mb.Last.Ts == 0 ? default : FromUnixNanosUtc(mb.Last.Ts);
}
_state.Msgs += mb.Msgs;
_state.Bytes += mb.Bytes;
if (mb.Msgs == 0)
{
mb.First.Seq = _state.LastSeq + 1;
mb.First.Ts = 0;
mb.Last.Seq = _state.LastSeq;
mb.Last.Ts = TimestampNormalized(_state.LastTime);
}
}
finally
{
if (mb.Mu.IsWriteLockHeld)
mb.Mu.ExitWriteLock();
}
}
if (_blks.Count == 0)
{
AddMsgBlock(InitMsgBlock(1));
}
else
{
_lmb = _blks[^1];
}
var valid = new HashSet<uint>(_blks.Select(b => b.Index));
foreach (var file in Directory.EnumerateFiles(mdir, "*" + Path.GetExtension(string.Format(FileStoreDefaults.KeyScan, 0)), SearchOption.TopDirectoryOnly))
{
var name = Path.GetFileName(file);
if (!TryParseIndexedFile(name, FileStoreDefaults.KeyScan, out var index) || !valid.Contains(index))
TryDeleteFile(file);
}
return null;
}
catch (Exception ex)
{
return ex;
}
finally
{
_mu.ExitWriteLock();
}
}
internal Exception? ExpireMsgsOnRecover()
{
_mu.EnterWriteLock();
try
{
if (_state.Msgs == 0)
return null;
if (_cfg.Config.SubjectDeleteMarkerTTL > TimeSpan.Zero)
return null;
if (_cfg.Config.MaxAge <= TimeSpan.Zero)
return null;
var minAge = (DateTime.UtcNow - DateTime.UnixEpoch).Ticks * 100L - (_cfg.Config.MaxAge.Ticks * 100L);
ulong purged = 0;
ulong bytes = 0;
long nts = 0;
var removed = new List<MessageBlock>();
MsgId last = default;
foreach (var mb in _blks)
{
mb.Mu.EnterWriteLock();
try
{
if (minAge < mb.First.Ts)
{
nts = mb.First.Ts;
break;
}
if (mb.Last.Ts > minAge)
{
nts = mb.First.Ts;
break;
}
purged += mb.Msgs;
bytes += mb.Bytes;
if (ReferenceEquals(mb, _lmb))
last = mb.Last;
RemoveRecoveredBlockLocked(mb);
removed.Add(mb);
}
finally
{
if (mb.Mu.IsWriteLockHeld)
mb.Mu.ExitWriteLock();
}
}
if (removed.Count > 0)
{
_blks = [.. _blks.Where(mb => !removed.Contains(mb))];
if (_bim.Count > 0)
{
foreach (var mb in removed)
_bim.Remove(mb.Index);
}
_lmb = _blks.Count > 0 ? _blks[^1] : null;
if (_lmb == null && last.Seq != 0)
{
var tomb = InitMsgBlock(1);
tomb.First = new MsgId { Seq = last.Seq + 1, Ts = 0 };
tomb.Last = new MsgId { Seq = last.Seq, Ts = last.Ts };
tomb.Msgs = 0;
tomb.Bytes = 0;
AddMsgBlock(tomb);
_psim?.Reset();
_tsl = 0;
}
}
RebuildStateLocked(null);
if (nts > 0)
ResetAgeChk(nts - minAge);
if (purged > 0 || bytes > 0)
_dirty++;
return null;
}
catch (Exception ex)
{
return ex;
}
finally
{
_mu.ExitWriteLock();
}
}
private static void EnsureStoreDirectoryWritable(string storeDir)
{
if (string.IsNullOrWhiteSpace(storeDir))