feat(batch14): complete group2 lifecycle checks and wave2 tests

This commit is contained in:
Joseph Doherty
2026-02-28 15:27:32 -05:00
parent 4f0a7f40fc
commit 9a35e2dfc5
5 changed files with 856 additions and 10 deletions

View File

@@ -72,6 +72,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
private SubjectTree<Psi>? _psim;
private HashWheel? _ttls;
private MsgScheduling? _scheduling;
private readonly StreamDeletionMeta _sdm = new();
// Total subject-list length (sum of subject-string lengths)
private int _tsl;
@@ -127,6 +128,8 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
private const int FullStateHeaderLength = 2;
private const int FullStateMinimumLength = 32;
private const int FullStateChecksumLength = 8;
private const ulong EmptyRecordLength = 30;
private const ulong RecordLengthBadThreshold = 32UL * 1024UL * 1024UL;
static JetStreamFileStore()
{
@@ -1466,6 +1469,20 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
Directory.Delete(purgeDir, recursive: true);
}
private void StartAgeChk()
{
if (_ageChk != null)
return;
if (_cfg.Config.MaxAge == TimeSpan.Zero && _ttls == null)
return;
var fireIn = _cfg.Config.MaxAge > TimeSpan.Zero
? _cfg.Config.MaxAge
: TimeSpan.FromMilliseconds(250);
_ageChk = new Timer(_ => ExpireMsgs(), null, fireIn, Timeout.InfiniteTimeSpan);
}
private void ResetAgeChk(long delta)
{
if (_ageChkRun)
@@ -1516,7 +1533,7 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
if (_ageChk != null)
_ageChk.Change(fireIn, Timeout.InfiniteTimeSpan);
else
_ageChk = new Timer(_ => { }, null, fireIn, Timeout.InfiniteTimeSpan);
_ageChk = new Timer(_ => ExpireMsgs(), null, fireIn, Timeout.InfiniteTimeSpan);
}
private void CancelAgeChk()
@@ -1531,6 +1548,14 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
_mu.EnterWriteLock();
try
{
if (_scheduling == null)
return;
if (_pmsgcb == null)
{
_scheduling.ResetTimer();
return;
}
_scheduling?.ResetTimer();
}
finally
@@ -1539,6 +1564,239 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
}
}
private void ExpireMsgs()
{
StoreMsg? sm = null;
long ageDelta = 0;
long nextTtl = long.MaxValue;
var removeByTtl = new List<(ulong Seq, long Expires)>();
_mu.EnterWriteLock();
long maxAge;
long minAge;
var sdmTtl = (long)_cfg.Config.SubjectDeleteMarkerTTL.TotalSeconds;
var sdmEnabled = sdmTtl > 0;
try
{
maxAge = _cfg.Config.MaxAge.Ticks * 100L;
minAge = TimestampNormalized(DateTime.UtcNow) - maxAge;
if (sdmEnabled && (_rmcb == null || _pmsgcb == null))
{
ResetAgeChk(0);
return;
}
_ageChkRun = true;
}
finally
{
_mu.ExitWriteLock();
}
if (maxAge > 0)
{
ulong seq = 0;
while (true)
{
var (candidate, skip) = LoadNextMsg(">", wc: true, seq, null);
if (candidate == null || candidate.Ts > minAge)
break;
sm = candidate;
seq = skip == ulong.MaxValue ? ulong.MaxValue : skip + 1;
if (sm.Hdr is { Length: > 0 })
{
var (ttl, err) = JetStreamHeaderHelpers.GetMessageTtl(sm.Hdr);
if (err == null && ttl < 0)
{
minAge = TimestampNormalized(DateTime.UtcNow) - maxAge;
continue;
}
}
if (sdmEnabled)
{
var (last, shouldProcess) = ShouldProcessSdm(skip, sm.Subject);
if (shouldProcess)
{
var sdm = last && !StreamDeletionMeta.IsSubjectDeleteMarker(sm.Hdr);
HandleRemovalOrSdm(skip, sm.Subject, sdm, sdmTtl);
}
}
else
{
_mu.EnterWriteLock();
try
{
_ = RemoveMsgViaLimits(sm.Seq);
}
finally
{
_mu.ExitWriteLock();
}
}
minAge = TimestampNormalized(DateTime.UtcNow) - maxAge;
if (seq == ulong.MaxValue)
break;
}
}
if (sm != null)
ageDelta = sm.Ts - minAge;
_mu.EnterWriteLock();
try
{
if (_ttls != null)
{
_ttls.ExpireTasks((seq, ts) =>
{
removeByTtl.Add((seq, ts));
return false;
});
nextTtl = _ttls.GetNextExpiration(long.MaxValue);
}
}
finally
{
_mu.ExitWriteLock();
}
if (!sdmEnabled)
{
foreach (var rm in removeByTtl)
{
_mu.EnterWriteLock();
try
{
_ = RemoveMsgInternal(rm.Seq, secure: false, viaLimits: false, needFsLock: false);
try { _ttls?.Remove(rm.Seq, rm.Expires); } catch { }
}
finally
{
_mu.ExitWriteLock();
}
}
}
else
{
removeByTtl.Sort(static (a, b) => a.Seq.CompareTo(b.Seq));
foreach (var rm in removeByTtl)
{
_mu.EnterWriteLock();
StoreMsg? msg;
(bool IsLast, bool ShouldProcess) p;
try
{
msg = _memStore.LoadMsg(rm.Seq, null);
if (msg == null)
{
try { _ttls?.Remove(rm.Seq, rm.Expires); } catch { }
continue;
}
p = ShouldProcessSdmLocked(rm.Seq, msg.Subject);
}
finally
{
_mu.ExitWriteLock();
}
if (p.ShouldProcess && msg != null)
{
var sdm = p.IsLast && !StreamDeletionMeta.IsSubjectDeleteMarker(msg.Hdr);
HandleRemovalOrSdm(rm.Seq, msg.Subject, sdm, sdmTtl);
}
}
}
_mu.EnterWriteLock();
try
{
_ageChkRun = false;
_ageChkTime = 0;
_state = _memStore.State();
if (_state.Msgs == 0 && nextTtl == long.MaxValue)
CancelAgeChk();
else
ResetAgeChk(ageDelta);
}
finally
{
_mu.ExitWriteLock();
}
}
private (bool IsLast, bool ShouldProcess) ShouldProcessSdm(ulong seq, string subj)
{
_mu.EnterWriteLock();
try
{
return ShouldProcessSdmLocked(seq, subj);
}
finally
{
_mu.ExitWriteLock();
}
}
// Lock should be held.
private (bool IsLast, bool ShouldProcess) ShouldProcessSdmLocked(ulong seq, string subj)
{
if (_sdm.TryGetPending(seq, out var pending))
{
var elapsed = TimestampNormalized(DateTime.UtcNow) - pending.Ts;
if (elapsed < 2_000_000_000L)
return (pending.Last, false);
var last = pending.Last;
if (last)
{
var msgs = SubjectsTotalsLocked(subj)?.GetValueOrDefault(subj, 0UL) ?? 0UL;
var numPending = _sdm.GetSubjectTotal(subj);
if (msgs > numPending)
last = false;
}
_sdm.SetPending(seq, new SdmBySeq { Last = last, Ts = TimestampNormalized(DateTime.UtcNow) });
return (last, true);
}
var msgCount = SubjectsTotalsLocked(subj)?.GetValueOrDefault(subj, 0UL) ?? 0UL;
if (msgCount == 0)
return (false, true);
var pendingCount = _sdm.GetSubjectTotal(subj);
var remaining = msgCount > pendingCount ? msgCount - pendingCount : 0;
return (_sdm.TrackPending(seq, subj, remaining == 1), true);
}
private void HandleRemovalOrSdm(ulong seq, string subj, bool sdm, long sdmTtl)
{
if (!sdm)
{
_rmcb?.Invoke(seq);
return;
}
var hdr = Encoding.ASCII.GetBytes(
$"{NatsHeaderConstants.HdrLine}" +
$"{NatsHeaderConstants.JsMarkerReason}: {NatsHeaderConstants.JsMarkerReasonMaxAge}\r\n" +
$"{NatsHeaderConstants.JsMessageTtl}: {TimeSpan.FromSeconds(sdmTtl)}\r\n" +
$"{NatsHeaderConstants.JsMsgRollup}: {NatsHeaderConstants.JsMsgRollupSubject}\r\n\r\n");
_pmsgcb?.Invoke(new StoreMsg
{
Subject = subj,
Hdr = hdr,
Msg = Array.Empty<byte>(),
Seq = 0,
Ts = 0,
});
}
internal Exception? RecoverTTLState()
{
_mu.EnterWriteLock();
@@ -2603,6 +2861,348 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
return record;
}
// Lock should be held.
private void CheckAndFlushLastBlock()
{
var lmb = _lmb;
if (lmb == null)
return;
lmb.Mu.EnterWriteLock();
try
{
if (!lmb.NeedSync)
return;
if (lmb.Mfd == null)
{
lmb.Mfd = new FileStream(lmb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read);
}
lmb.Mfd.Flush(_fcfg.SyncAlways);
lmb.NeedSync = false;
}
catch (Exception ex)
{
lmb.Werr = ex;
}
finally
{
lmb.Mu.ExitWriteLock();
}
}
private LostStreamData? CheckMsgs()
{
_mu.EnterWriteLock();
try
{
CheckAndFlushLastBlock();
_state = _memStore.State();
RefreshPsiFromMemStoreLocked();
return _ld;
}
finally
{
_mu.ExitWriteLock();
}
}
// Lock should be held.
private (MessageBlock? Mb, Exception? Error) CheckLastBlock(ulong recordLength)
{
var lmb = _lmb;
var rbytes = lmb?.RBytes ?? 0UL;
if (lmb == null || (_fcfg.BlockSize > 0 && rbytes > 0 && rbytes + recordLength > _fcfg.BlockSize))
return NewMsgBlockForWrite();
return (lmb, null);
}
// Lock should be held.
private (ulong RecordLength, Exception? Error) WriteMsgRecord(ulong seq, long ts, string subj, byte[]? hdr, byte[]? msg)
{
hdr ??= Array.Empty<byte>();
msg ??= Array.Empty<byte>();
var recordLength = FileStoreMsgSize(subj, hdr, msg);
if (recordLength > RecordLengthBadThreshold)
return (0, StoreErrors.ErrMsgTooLarge);
var (_, blockErr) = CheckLastBlock(recordLength);
if (blockErr != null)
return (0, blockErr);
_dirty++;
var err = StoreRawMsgInternal(subj, hdr, msg, seq, ts, ttl: 0, discardNewCheck: false);
return (recordLength, err);
}
// Lock should be held.
private Exception? WriteTombstone(ulong seq, long ts)
{
var err = WriteTombstoneNoFlush(seq, ts);
if (err != null)
return err;
var lmb = _lmb;
if (lmb == null)
return null;
lmb.Mu.EnterWriteLock();
try
{
lmb.Mfd?.Flush(_fcfg.SyncAlways);
lmb.NeedSync = false;
return null;
}
catch (Exception ex)
{
lmb.Werr = ex;
return ex;
}
finally
{
lmb.Mu.ExitWriteLock();
}
}
// Lock should be held.
private Exception? WriteTombstoneNoFlush(ulong seq, long ts)
{
var (lmb, blockErr) = CheckLastBlock(EmptyRecordLength);
if (blockErr != null)
return blockErr;
if (lmb == null)
return StoreErrors.ErrStoreClosed;
lmb.Mu.EnterWriteLock();
try
{
lmb.Mfd ??= new FileStream(lmb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read);
lmb.Mfd.Seek(0, SeekOrigin.End);
var record = BuildMessageRecord(string.Empty, ReadOnlySpan<byte>.Empty, ReadOnlySpan<byte>.Empty, seq, ts);
lmb.Mfd.Write(record, 0, record.Length);
lmb.RBytes += (ulong)record.Length;
lmb.Lwts = ts;
lmb.NeedSync = !_fcfg.SyncAlways;
lmb.NoCompact = true;
lmb.Lchk = record.AsSpan(record.Length - FileStoreDefaults.RecordHashSize).ToArray();
return null;
}
catch (Exception ex)
{
lmb.Werr = ex;
return ex;
}
finally
{
lmb.Mu.ExitWriteLock();
}
}
private void SyncBlocks()
{
if (IsClosed())
return;
List<MessageBlock> blks;
bool firstMoved;
_mu.EnterWriteLock();
try
{
blks = [.. _blks];
firstMoved = _firstMoved;
_firstMoved = false;
}
finally
{
_mu.ExitWriteLock();
}
var markDirty = false;
foreach (var mb in blks)
{
mb.Mu.EnterWriteLock();
try
{
if (mb.Closed)
continue;
if (firstMoved && mb.NoCompact)
{
mb.NoCompact = false;
markDirty = true;
}
if (!mb.NeedSync)
continue;
var opened = false;
if (mb.Mfd == null)
{
mb.Mfd = new FileStream(mb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read);
opened = true;
}
try
{
mb.Mfd.Flush(true);
mb.NeedSync = false;
}
finally
{
if (opened)
{
mb.Mfd.Dispose();
mb.Mfd = null;
}
}
}
finally
{
mb.Mu.ExitWriteLock();
}
}
if (IsClosed())
return;
_mu.EnterWriteLock();
try
{
SetSyncTimerLocked();
if (markDirty)
_dirty++;
if (!_fcfg.SyncAlways)
{
var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile);
if (File.Exists(fn))
{
using var fd = new FileStream(fn, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite);
fd.Flush(true);
}
}
}
finally
{
_mu.ExitWriteLock();
}
}
// Lock should be held.
private MessageBlock? SelectMsgBlock(ulong seq)
{
var (_, mb) = SelectMsgBlockWithIndex(seq);
return mb;
}
// Lock should be held.
private (int Index, MessageBlock? Block) SelectMsgBlockWithIndex(ulong seq)
{
if (seq < _state.FirstSeq || seq > _state.LastSeq || _state.Msgs == 0)
return (-1, null);
const int linearThreshold = 32;
var nb = _blks.Count - 1;
if (nb < 0)
return (-1, null);
if (nb < linearThreshold)
{
for (var i = 0; i < _blks.Count; i++)
{
if (seq <= _blks[i].Last.Seq)
return (i, _blks[i]);
}
return (-1, null);
}
var low = 0;
var high = nb;
while (low <= high)
{
var mid = (low + high) / 2;
var mb = _blks[mid];
var first = mb.First.Seq;
var last = mb.Last.Seq;
if (seq > last)
{
low = mid + 1;
}
else if (seq < first)
{
if (mid == 0 || seq > _blks[mid - 1].Last.Seq)
return (mid, mb);
high = mid - 1;
}
else
{
return (mid, mb);
}
}
return (-1, null);
}
private MessageBlock? SelectMsgBlockForStart(DateTime minTime)
{
var target = TimestampNormalized(minTime);
_mu.EnterReadLock();
try
{
var low = 0;
var high = _blks.Count - 1;
var candidate = _blks.Count;
while (low <= high)
{
var mid = (low + high) / 2;
var mb = _blks[mid];
long lastTs;
mb.Mu.EnterReadLock();
try
{
lastTs = mb.Last.Ts;
}
finally
{
mb.Mu.ExitReadLock();
}
if (lastTs >= target)
{
candidate = mid;
high = mid - 1;
}
else
{
low = mid + 1;
}
}
return candidate < _blks.Count ? _blks[candidate] : null;
}
finally
{
_mu.ExitReadLock();
}
}
// Lock should be held.
private void SetSyncTimerLocked()
{
_syncTmr?.Dispose();
_syncTmr = null;
if (_fcfg.SyncInterval <= TimeSpan.Zero || IsClosed())
return;
_syncTmr = new Timer(_ => SyncBlocks(), null, _fcfg.SyncInterval, Timeout.InfiniteTimeSpan);
}
// -----------------------------------------------------------------------
// Read/query helper methods (Batch 13)
// -----------------------------------------------------------------------
@@ -2978,15 +3578,24 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
/// <inheritdoc/>
public void RegisterStorageUpdates(StorageUpdateHandler cb)
=> _memStore.RegisterStorageUpdates(cb);
{
_scb = cb;
_memStore.RegisterStorageUpdates(cb);
}
/// <inheritdoc/>
public void RegisterStorageRemoveMsg(StorageRemoveMsgHandler cb)
=> _memStore.RegisterStorageRemoveMsg(cb);
{
_rmcb = cb;
_memStore.RegisterStorageRemoveMsg(cb);
}
/// <inheritdoc/>
public void RegisterProcessJetStreamMsg(ProcessJetStreamMsgHandler cb)
=> _memStore.RegisterProcessJetStreamMsg(cb);
{
_pmsgcb = cb;
_memStore.RegisterProcessJetStreamMsg(cb);
}
// -----------------------------------------------------------------------
// IStreamStore — lifecycle