feat(batch14): complete group3 purge compact state and wave3 tests

This commit is contained in:
Joseph Doherty
2026-02-28 15:43:05 -05:00
parent 9a35e2dfc5
commit 045faf7423
5 changed files with 894 additions and 9 deletions

View File

@@ -3203,6 +3203,446 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
_syncTmr = new Timer(_ => SyncBlocks(), null, _fcfg.SyncInterval, Timeout.InfiniteTimeSpan);
}
// -----------------------------------------------------------------------
// Lifecycle/query helper methods (Batch 14 Group 3)
// -----------------------------------------------------------------------
private int SizeForSeq(ulong seq)
{
if (seq == 0)
return 0;
var (sm, err) = MsgForSeq(seq, null);
if (err != null || sm == null)
return 0;
var size = FileStoreMsgSize(sm.Subject, sm.Hdr, sm.Msg);
return size >= int.MaxValue ? int.MaxValue : (int)size;
}
private (StoreMsg? Sm, Exception? Error) MsgForSeq(ulong seq, StoreMsg? sm)
=> MsgForSeqLocked(seq, sm, needFsLock: true);
private (StoreMsg? Sm, Exception? Error) MsgForSeqLocked(ulong seq, StoreMsg? sm, bool needFsLock)
{
if (IsClosed())
return (null, StoreErrors.ErrStoreClosed);
if (needFsLock)
_mu.EnterReadLock();
try
{
var state = _memStore.State();
if (seq == 0)
seq = state.FirstSeq;
if (state.Msgs == 0 || seq == 0)
return (null, StoreErrors.ErrStoreEOF);
try
{
var loaded = _memStore.LoadMsg(seq, sm);
return loaded == null ? (null, StoreErrors.ErrStoreMsgNotFound) : (loaded, null);
}
catch (Exception ex) when (ReferenceEquals(ex, StoreErrors.ErrStoreEOF))
{
if (seq <= state.LastSeq)
return (null, StoreErrors.ErrStoreMsgNotFound);
return (null, ex);
}
catch (Exception ex)
{
return (null, ex);
}
}
finally
{
if (needFsLock)
_mu.ExitReadLock();
}
}
private (StoreMsg? Sm, Exception? Error) LoadLast(string subj, StoreMsg? sm)
{
if (IsClosed())
return (null, StoreErrors.ErrStoreClosed);
_mu.EnterReadLock();
try
{
return LoadLastLocked(subj, sm);
}
finally
{
_mu.ExitReadLock();
}
}
// Lock should be held.
private (StoreMsg? Sm, Exception? Error) LoadLastLocked(string subj, StoreMsg? sm)
{
if (IsClosed())
return (null, StoreErrors.ErrStoreClosed);
var state = _memStore.State();
if (state.Msgs == 0)
return (null, StoreErrors.ErrStoreMsgNotFound);
if (string.IsNullOrEmpty(subj) || subj == ">")
return MsgForSeqLocked(state.LastSeq, sm, needFsLock: false);
var wc = SubscriptionIndex.SubjectHasWildcard(subj);
if (!wc)
{
try
{
var loaded = _memStore.LoadLastMsg(subj, sm);
return loaded == null ? (null, StoreErrors.ErrStoreMsgNotFound) : (loaded, null);
}
catch (Exception ex)
{
return (null, ex);
}
}
StoreMsg? last = null;
var lastSeq = 0UL;
foreach (var candidate in _memStore.SubjectsTotals(">").Keys)
{
if (!SubscriptionIndex.SubjectIsSubsetMatch(subj, candidate))
continue;
try
{
var cmsg = _memStore.LoadLastMsg(candidate, null);
if (cmsg == null || cmsg.Seq < lastSeq)
continue;
if (last == null)
last = sm ?? new StoreMsg();
last.CopyFrom(cmsg);
lastSeq = cmsg.Seq;
}
catch
{
// Ignore candidate misses and continue scanning.
}
}
return last == null ? (null, StoreErrors.ErrStoreMsgNotFound) : (last, null);
}
private int NumSubjects()
{
_mu.EnterReadLock();
try
{
if (_psim != null)
return _psim.Size();
return _memStore.SubjectsTotals(">").Count;
}
finally
{
_mu.ExitReadLock();
}
}
private int NumConsumers()
{
_cmu.EnterReadLock();
try
{
return _cfs.Count;
}
finally
{
_cmu.ExitReadLock();
}
}
private ulong CacheLoads()
{
ulong total = 0;
_mu.EnterReadLock();
try
{
foreach (var mb in _blks)
{
mb.Mu.EnterReadLock();
try
{
total += mb.Cloads;
}
finally
{
mb.Mu.ExitReadLock();
}
}
}
finally
{
_mu.ExitReadLock();
}
return total;
}
private ulong CacheSize()
{
ulong total = 0;
_mu.EnterReadLock();
try
{
foreach (var mb in _blks)
{
mb.Mu.EnterReadLock();
try
{
if (mb.CacheData?.Buf is { Length: > 0 } buf)
total += (ulong)buf.Length;
}
finally
{
mb.Mu.ExitReadLock();
}
}
}
finally
{
_mu.ExitReadLock();
}
return total;
}
private int DmapEntries()
{
var total = 0;
_mu.EnterReadLock();
try
{
foreach (var mb in _blks)
total += mb.Dmap.Size;
}
finally
{
_mu.ExitReadLock();
}
return total;
}
private (ulong Purged, Exception? Error) PurgeInternal(ulong fseq)
{
if (IsClosed())
return (0, StoreErrors.ErrStoreClosed);
_mu.EnterWriteLock();
try
{
var (purged, err) = fseq > 0 ? _memStore.Compact(fseq) : _memStore.Purge();
if (err != null)
return (purged, err);
_state = _memStore.State();
foreach (var mb in _blks)
mb.CloseFDs();
_blks.Clear();
_bim.Clear();
_lmb = null;
_psim ??= new SubjectTree<Psi>();
_psim.Reset();
_tsl = 0;
_sdm.Empty();
var (lmb, blockErr) = NewMsgBlockForWrite();
if (blockErr != null)
return (purged, blockErr);
_lmb = lmb;
_dirty++;
if (_state.LastSeq > 0)
_ = WriteTombstone(_state.LastSeq, TimestampNormalized(_state.LastTime));
return (purged, null);
}
finally
{
_mu.ExitWriteLock();
}
}
private (ulong Purged, Exception? Error) CompactInternal(ulong seq)
{
if (seq == 0)
return PurgeInternal(0);
if (IsClosed())
return (0, StoreErrors.ErrStoreClosed);
_mu.EnterWriteLock();
try
{
var (purged, err) = _memStore.Compact(seq);
if (err != null)
return (purged, err);
_state = _memStore.State();
if (_state.Msgs == 0)
{
foreach (var mb in _blks.ToArray())
ForceRemoveMsgBlock(mb);
_blks.Clear();
_bim.Clear();
_lmb = null;
var (newMb, blockErr) = NewMsgBlockForWrite();
if (blockErr != null)
return (purged, blockErr);
_lmb = newMb;
}
else
{
foreach (var mb in _blks.ToArray())
{
if (mb.Last.Seq < _state.FirstSeq)
ForceRemoveMsgBlock(mb);
}
_lmb = _blks.Count > 0 ? _blks[^1] : null;
if (_lmb == null)
{
var (newMb, blockErr) = NewMsgBlockForWrite();
if (blockErr != null)
return (purged, blockErr);
_lmb = newMb;
}
}
RefreshPsiFromMemStoreLocked();
_dirty++;
return (purged, null);
}
finally
{
_mu.ExitWriteLock();
}
}
private Exception? Reset()
{
if (IsClosed())
return StoreErrors.ErrStoreClosed;
_mu.EnterWriteLock();
try
{
var err = _memStore.Reset();
if (err != null)
return err;
foreach (var mb in _blks.ToArray())
ForceRemoveMsgBlock(mb);
_blks.Clear();
_bim.Clear();
_lmb = null;
_state = _memStore.State();
_psim ??= new SubjectTree<Psi>();
_psim.Reset();
_tsl = 0;
_sdm.Empty();
_dirty++;
return null;
}
finally
{
_mu.ExitWriteLock();
}
}
private ulong LastSeq()
{
_mu.EnterReadLock();
try
{
return _memStore.State().LastSeq;
}
finally
{
_mu.ExitReadLock();
}
}
private int NumMsgBlocks()
{
_mu.EnterReadLock();
try
{
if (_blks.Count > 0)
return _blks.Count;
return _memStore.State().Msgs > 0 ? 1 : 0;
}
finally
{
_mu.ExitReadLock();
}
}
// Both locks should be held by caller.
private void RemoveMsgBlockFromList(MessageBlock mb)
{
var idx = _blks.FindIndex(existing => ReferenceEquals(existing, mb));
if (idx < 0)
return;
_blks.RemoveAt(idx);
_bim.Remove(mb.Index);
_lmb = _blks.Count > 0 ? _blks[^1] : null;
_dirty++;
}
// Both locks should be held by caller.
private void RemoveMsgBlock(MessageBlock mb)
{
if (ReferenceEquals(mb, _lmb))
{
var lseq = mb.Last.Seq;
var lts = mb.Last.Ts;
var (newMb, _) = NewMsgBlockForWrite();
_lmb = newMb;
if (lseq > 0)
_ = WriteTombstone(lseq, lts);
}
else if (mb.Last.Seq == _state.LastSeq && mb.Last.Seq > 0)
{
_ = WriteTombstone(mb.Last.Seq, mb.Last.Ts);
}
ForceRemoveMsgBlock(mb);
}
// Both locks should be held by caller.
private void ForceRemoveMsgBlock(MessageBlock mb)
{
mb.CloseFDs();
RemoveMsgBlockFromList(mb);
TryDeleteFile(mb.Mfn);
TryDeleteFile(mb.Kfn);
var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir);
TryDeleteFile(Path.Combine(mdir, string.Format(FileStoreDefaults.IndexScan, mb.Index)));
}
// -----------------------------------------------------------------------
// Read/query helper methods (Batch 13)
// -----------------------------------------------------------------------
@@ -3665,7 +4105,16 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
/// <inheritdoc/>
public StoreMsg? LoadLastMsg(string subject, StoreMsg? sm)
=> _memStore.LoadLastMsg(subject, sm);
{
(StoreMsg? Sm, Exception? Error) result =
string.IsNullOrEmpty(subject) || subject == ">"
? MsgForSeq(LastSeq(), sm)
: LoadLast(subject, sm);
if (result.Error != null)
throw result.Error;
return result.Sm ?? throw StoreErrors.ErrStoreMsgNotFound;
}
/// <inheritdoc/>
public (StoreMsg? Sm, Exception? Error) LoadPrevMsg(ulong start, StoreMsg? smp)
@@ -3685,7 +4134,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
/// <inheritdoc/>
public (ulong Purged, Exception? Error) Purge()
=> _memStore.Purge();
=> PurgeInternal(0);
/// <inheritdoc/>
public (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep)
@@ -3693,7 +4142,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
/// <inheritdoc/>
public (ulong Purged, Exception? Error) Compact(ulong seq)
=> _memStore.Compact(seq);
=> CompactInternal(seq);
/// <inheritdoc/>
public void Truncate(ulong seq)