feat(batch14): complete filestore write lifecycle features and tests

This commit is contained in:
Joseph Doherty
2026-02-28 16:41:31 -05:00
parent 045faf7423
commit 5367c3f34d
9 changed files with 1596 additions and 39 deletions

View File

@@ -2246,11 +2246,14 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
}
private void WriteFileWithOptionalSync(string path, byte[] payload)
=> WriteFileWithOptionalSync(path, payload, UnixFileMode.UserRead | UnixFileMode.UserWrite);
private void WriteFileWithOptionalSync(string path, byte[] payload, UnixFileMode perm)
{
Directory.CreateDirectory(Path.GetDirectoryName(path)!);
using var stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None);
stream.Write(payload, 0, payload.Length);
stream.Flush(_fcfg.SyncAlways);
var err = WriteAtomically(path, payload, perm, _fcfg.SyncAlways);
if (err != null)
throw err;
}
private bool TryReadBlockIndexInfo(MessageBlock mb, byte[] lchk)
@@ -3193,14 +3196,65 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
// Lock should be held.
private void SetSyncTimerLocked()
{
_syncTmr?.Dispose();
_syncTmr = null;
=> SetSyncTimer();
// Lock should be held.
private void SetSyncTimer()
{
if (_fcfg.SyncInterval <= TimeSpan.Zero || IsClosed())
return;
_syncTmr = new Timer(_ => SyncBlocks(), null, _fcfg.SyncInterval, Timeout.InfiniteTimeSpan);
if (_syncTmr != null)
{
_syncTmr.Change(_fcfg.SyncInterval, Timeout.InfiniteTimeSpan);
return;
}
var half = TimeSpan.FromTicks(Math.Max(1, _fcfg.SyncInterval.Ticks / 2));
var jitterTicks = Random.Shared.NextInt64(Math.Max(1, half.Ticks));
var start = half + TimeSpan.FromTicks(jitterTicks);
_syncTmr = new Timer(_ => SyncBlocks(), null, start, Timeout.InfiniteTimeSpan);
}
// Lock should be held.
private void CancelSyncTimer()
{
_syncTmr?.Dispose();
_syncTmr = null;
}
private async Task FlushStreamStateLoop(ChannelReader<byte> qch, ChannelWriter<byte> done)
{
try
{
var writeThreshold = TimeSpan.FromMinutes(2);
var writeJitter = TimeSpan.FromSeconds(Random.Shared.Next(0, 30));
using var ticker = new PeriodicTimer(writeThreshold + writeJitter);
while (true)
{
var tickTask = ticker.WaitForNextTickAsync().AsTask();
var quitTask = qch.Completion;
var completed = await Task.WhenAny(tickTask, quitTask).ConfigureAwait(false);
if (ReferenceEquals(completed, quitTask))
break;
if (!await tickTask.ConfigureAwait(false))
break;
var err = WriteFullState();
if (err is UnauthorizedAccessException)
{
Warn("File system permission denied when flushing stream state: {0}", err.Message);
break;
}
}
}
finally
{
done.TryWrite(0);
done.TryComplete();
}
}
// -----------------------------------------------------------------------
@@ -3997,6 +4051,698 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
ss.LastNeedsUpdate = false;
}
// -----------------------------------------------------------------------
// Lifecycle helpers (Batch 14 Group 4)
// -----------------------------------------------------------------------
// Lock should be held.
private void PurgeMsgBlock(MessageBlock mb)
{
ArgumentNullException.ThrowIfNull(mb);
mb.Mu.EnterWriteLock();
ulong msgs;
ulong bytes;
try
{
msgs = Math.Min(mb.Msgs, _state.Msgs);
bytes = Math.Min(mb.Bytes, _state.Bytes);
if (_scheduling != null && mb.Schedules > 0)
{
for (var seq = mb.First.Seq; seq <= mb.Last.Seq; seq++)
{
_scheduling.Remove(seq);
if (seq == ulong.MaxValue)
break;
}
}
_state.Msgs -= msgs;
_state.Bytes -= bytes;
RemoveMsgBlock(mb);
mb.TryForceExpireCacheLocked();
}
finally
{
mb.Mu.ExitWriteLock();
}
SelectNextFirst();
if ((msgs > 0 || bytes > 0) && _scb != null)
_scb(-(long)msgs, -(long)bytes, 0, string.Empty);
}
// Lock should be held.
private void ResetGlobalPerSubjectInfo()
{
_psim ??= new SubjectTree<Psi>();
_psim.Reset();
_tsl = 0;
if (NoTrackSubjects())
return;
foreach (var mb in _blks)
PopulateGlobalPerSubjectInfo(mb);
}
// Lock should be held.
private void PopulateGlobalPerSubjectInfo(MessageBlock mb)
{
ArgumentNullException.ThrowIfNull(mb);
mb.Mu.EnterReadLock();
try
{
var fss = mb.Fss;
if (fss == null)
return;
_psim ??= new SubjectTree<Psi>();
fss.IterFast((bsubj, ss) =>
{
if (bsubj.Length == 0)
return true;
var (psi, ok) = _psim.Find(bsubj);
if (ok && psi != null)
{
psi.Total += ss.Msgs;
if (mb.Index > psi.Lblk)
psi.Lblk = mb.Index;
}
else
{
_psim.Insert(bsubj, new Psi
{
Total = ss.Msgs,
Fblk = mb.Index,
Lblk = mb.Index,
});
_tsl += bsubj.Length;
}
return true;
});
}
finally
{
mb.Mu.ExitReadLock();
}
}
private void CloseAllMsgBlocks(bool sync)
{
foreach (var mb in _blks)
CloseMsgBlock(mb, sync);
}
private static void CloseMsgBlock(MessageBlock mb, bool sync)
{
if (mb == null)
return;
mb.Mu.EnterWriteLock();
try
{
if (mb.Closed)
return;
mb.Ctmr?.Dispose();
mb.Ctmr = null;
mb.Fss = null;
mb.TryForceExpireCacheLocked();
if (mb.Qch != null)
{
mb.Qch.Writer.TryComplete();
mb.Qch = null;
}
if (mb.Mfd != null)
{
if (sync || mb.SyncAlways || mb.NeedSync)
mb.Mfd.Flush(true);
mb.Mfd.Dispose();
mb.Mfd = null;
}
mb.NeedSync = false;
mb.Closed = true;
}
catch (Exception ex)
{
mb.Werr = ex;
}
finally
{
mb.Mu.ExitWriteLock();
}
}
private Exception? WriteFullState()
=> WriteFullStateInternal(force: false);
private Exception? ForceWriteFullState()
=> WriteFullStateInternal(force: true);
private Exception? WriteFullStateInternal(bool force)
{
if (IsClosed())
return null;
if (Interlocked.Increment(ref _wfsrun) > 1 && !force)
{
Interlocked.Decrement(ref _wfsrun);
return null;
}
try
{
lock (_wfsmu)
{
try
{
StreamState state;
StreamState tracked = new();
List<(byte[] Subject, Psi Info)> subjects = [];
List<(uint Index, ulong Bytes, ulong FirstSeq, long FirstTs, ulong LastSeq, long LastTs, ulong NumDeleted, ulong Ttls, ulong Schedules, byte[] Dmap, byte[] Lchk, bool IsLast)> blocks = [];
int priorDirty;
_mu.EnterReadLock();
try
{
if (_dirty == 0 && !force)
return null;
state = _memStore.State();
priorDirty = _dirty;
if (_prf != null && _aek == null)
{
_mu.ExitReadLock();
_mu.EnterWriteLock();
try { SetupAEK(); }
finally { _mu.ExitWriteLock(); }
_mu.EnterReadLock();
}
if (_psim != null && _psim.Size() > 0)
{
_psim.Match(Encoding.UTF8.GetBytes(">"), (subject, psi) =>
{
if (psi != null)
{
subjects.Add((subject.ToArray(), new Psi
{
Total = psi.Total,
Fblk = psi.Fblk,
Lblk = psi.Lblk,
}));
}
return true;
});
}
foreach (var mb in _blks)
{
mb.Mu.EnterReadLock();
try
{
var numDeleted = (ulong)mb.Dmap.Size;
var dmap = numDeleted > 0 ? mb.Dmap.Encode(null) : Array.Empty<byte>();
var lchk = mb.Lchk.Length >= FullStateChecksumLength
? mb.Lchk.AsSpan(0, FullStateChecksumLength).ToArray()
: new byte[FullStateChecksumLength];
blocks.Add((
mb.Index,
mb.Bytes,
mb.First.Seq,
mb.First.Ts,
mb.Last.Seq,
mb.Last.Ts,
numDeleted,
mb.Ttls,
mb.Schedules,
dmap,
lchk,
ReferenceEquals(mb, _lmb)));
UpdateTrackingState(tracked, mb);
}
finally
{
mb.Mu.ExitReadLock();
}
}
}
finally
{
if (_mu.IsReadLockHeld)
_mu.ExitReadLock();
}
if (!TrackingStatesEqual(state, tracked))
{
Warn("Stream state encountered internal inconsistency on write");
RebuildState(null);
state = _memStore.State();
}
var baseTime = TimestampNormalized(state.FirstTime);
var buf = new List<byte>(4096)
{
FullStateMagic,
FullStateVersion,
};
AppendUVarInt(buf, state.Msgs);
AppendUVarInt(buf, state.Bytes);
AppendUVarInt(buf, state.FirstSeq);
AppendVarInt(buf, baseTime);
AppendUVarInt(buf, state.LastSeq);
AppendVarInt(buf, TimestampNormalized(state.LastTime));
AppendUVarInt(buf, (ulong)subjects.Count);
foreach (var (subject, info) in subjects)
{
AppendUVarInt(buf, (ulong)subject.Length);
buf.AddRange(subject);
AppendUVarInt(buf, info.Total);
AppendUVarInt(buf, info.Fblk);
if (info.Total > 1)
AppendUVarInt(buf, info.Lblk);
}
AppendUVarInt(buf, (ulong)blocks.Count);
uint lastBlockIndex = 0;
var lastChecksum = new byte[FullStateChecksumLength];
foreach (var block in blocks)
{
AppendUVarInt(buf, block.Index);
AppendUVarInt(buf, block.Bytes);
AppendUVarInt(buf, block.FirstSeq);
AppendVarInt(buf, block.FirstTs - baseTime);
AppendUVarInt(buf, block.LastSeq);
AppendVarInt(buf, block.LastTs - baseTime);
AppendUVarInt(buf, block.NumDeleted);
AppendUVarInt(buf, block.Ttls);
AppendUVarInt(buf, block.Schedules);
if (block.Dmap.Length > 0)
buf.AddRange(block.Dmap);
if (block.IsLast)
{
lastBlockIndex = block.Index;
Buffer.BlockCopy(block.Lchk, 0, lastChecksum, 0, Math.Min(lastChecksum.Length, block.Lchk.Length));
}
}
AppendUVarInt(buf, lastBlockIndex);
buf.AddRange(lastChecksum);
var payload = buf.ToArray();
if (_aek != null)
{
var nonce = new byte[_aek.NonceSize];
RandomNumberGenerator.Fill(nonce);
var encrypted = _aek.Seal(nonce, payload);
var sealedPayload = new byte[nonce.Length + encrypted.Length];
Buffer.BlockCopy(nonce, 0, sealedPayload, 0, nonce.Length);
Buffer.BlockCopy(encrypted, 0, sealedPayload, nonce.Length, encrypted.Length);
payload = sealedPayload;
}
var key = SHA256.HashData(Encoding.UTF8.GetBytes(_cfg.Config.Name));
using var hmac = new HMACSHA256(key);
var digest = hmac.ComputeHash(payload);
var output = new byte[payload.Length + FullStateChecksumLength];
Buffer.BlockCopy(payload, 0, output, 0, payload.Length);
Buffer.BlockCopy(digest, 0, output, payload.Length, FullStateChecksumLength);
var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile);
WriteFileWithOptionalSync(fn, output);
_mu.EnterWriteLock();
try
{
_dirty = Math.Max(0, _dirty - priorDirty);
}
finally
{
_mu.ExitWriteLock();
}
var ttlErr = WriteTTLState();
if (ttlErr != null)
return ttlErr;
var schedErr = WriteMsgSchedulingState();
if (schedErr != null)
return schedErr;
return null;
}
catch (Exception ex)
{
return ex;
}
}
}
finally
{
Interlocked.Decrement(ref _wfsrun);
}
}
private Exception? WriteTTLState()
{
_mu.EnterReadLock();
try
{
if (_ttls == null)
return null;
var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.TtlStreamStateFile);
var buf = _ttls.Encode(_state.LastSeq + 1);
WriteFileWithOptionalSync(fn, buf);
return null;
}
catch (Exception ex)
{
return ex;
}
finally
{
_mu.ExitReadLock();
}
}
private Exception? WriteMsgSchedulingState()
{
_mu.EnterReadLock();
try
{
if (_scheduling == null)
return null;
var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.MsgSchedulingStreamStateFile);
var buf = _scheduling.Encode(_state.LastSeq + 1);
WriteFileWithOptionalSync(fn, buf);
return null;
}
catch (Exception ex)
{
return ex;
}
finally
{
_mu.ExitReadLock();
}
}
private Exception? StopInternal(bool delete, bool writeState)
{
if (IsClosed())
return StoreErrors.ErrStoreClosed;
_mu.EnterWriteLock();
try
{
if (_closing)
return StoreErrors.ErrStoreClosed;
_closing = true;
if (_qch != null)
{
_qch.Writer.TryComplete();
_qch = null;
}
if (writeState)
CheckAndFlushLastBlock();
CloseAllMsgBlocks(sync: false);
CancelSyncTimer();
CancelAgeChk();
if (writeState)
{
_mu.ExitWriteLock();
_ = ForceWriteFullState();
_mu.EnterWriteLock();
}
_closed = true;
_lmb = null;
var cb = _scb;
var bytes = (long)_state.Bytes;
_mu.ExitWriteLock();
_cmu.EnterWriteLock();
var cfs = _cfs.ToArray();
_cfs.Clear();
_cmu.ExitWriteLock();
foreach (var consumer in cfs)
{
if (delete)
consumer.StreamDelete();
else
consumer.Stop();
}
_memStore.Stop();
if (bytes > 0 && cb != null)
cb(0, -bytes, 0, string.Empty);
return null;
}
finally
{
if (_mu.IsWriteLockHeld)
_mu.ExitWriteLock();
}
}
private (MemoryStream? Snapshot, Exception? Error) StreamSnapshot(bool includeConsumers)
{
try
{
_mu.EnterReadLock();
var state = _memStore.State();
var meta = JsonSerializer.SerializeToUtf8Bytes(_cfg);
var blocks = _blks.ToArray();
_mu.ExitReadLock();
var files = new Dictionary<string, byte[]>(StringComparer.Ordinal)
{
[FileStoreDefaults.JetStreamMetaFile] = meta,
[FileStoreDefaults.JetStreamMetaFileSum] = Encoding.ASCII.GetBytes(Convert.ToHexString(SHA256.HashData(meta)).ToLowerInvariant()),
};
foreach (var mb in blocks)
{
if (!File.Exists(mb.Mfn))
continue;
files[$"{FileStoreDefaults.MsgDir}/{string.Format(FileStoreDefaults.BlkScan, mb.Index)}"] = File.ReadAllBytes(mb.Mfn);
}
_ = ForceWriteFullState();
var stateFile = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile);
if (File.Exists(stateFile))
files[$"{FileStoreDefaults.MsgDir}/{FileStoreDefaults.StreamStateFile}"] = File.ReadAllBytes(stateFile);
if (includeConsumers)
{
_cmu.EnterReadLock();
var consumers = _cfs.ToArray();
_cmu.ExitReadLock();
var i = 0;
foreach (var consumer in consumers)
{
var (consumerState, err) = consumer.State();
if (err == null && consumerState != null)
files[$"{FileStoreDefaults.ConsumerDir}/consumer-{i++}.json"] = JsonSerializer.SerializeToUtf8Bytes(consumerState);
}
}
var payload = JsonSerializer.SerializeToUtf8Bytes(new SnapshotEnvelope
{
State = state,
Files = files,
});
return (new MemoryStream(payload, writable: false), null);
}
catch (Exception ex)
{
return (null, ex);
}
finally
{
if (_mu.IsReadLockHeld)
_mu.ExitReadLock();
if (_cmu.IsReadLockHeld)
_cmu.ExitReadLock();
}
}
private FileStoreConfig FileStoreConfig()
{
_mu.EnterReadLock();
try
{
return new FileStoreConfig
{
StoreDir = _fcfg.StoreDir,
BlockSize = _fcfg.BlockSize,
CacheExpire = _fcfg.CacheExpire,
SubjectStateExpire = _fcfg.SubjectStateExpire,
SyncInterval = _fcfg.SyncInterval,
SyncAlways = _fcfg.SyncAlways,
AsyncFlush = _fcfg.AsyncFlush,
Cipher = _fcfg.Cipher,
Compression = _fcfg.Compression,
Server = _fcfg.Server,
};
}
finally
{
_mu.ExitReadLock();
}
}
// Lock should be held.
private void ReadLockAllMsgBlocks()
{
foreach (var mb in _blks)
mb.Mu.EnterReadLock();
}
// Lock should be held.
private void ReadUnlockAllMsgBlocks()
{
foreach (var mb in _blks)
{
if (mb.Mu.IsReadLockHeld)
mb.Mu.ExitReadLock();
}
}
// All blocks should be at least read-locked.
private DeleteBlocks DeleteBlocks()
{
var dbs = new DeleteBlocks();
ulong prevLast = 0;
DeleteRange? prevRange = null;
var msgsSinceGap = false;
foreach (var mb in _blks)
{
var fseq = mb.First.Seq;
if (prevLast > 0 && prevLast + 1 != fseq)
{
var gap = fseq - prevLast - 1;
if (prevRange != null && !msgsSinceGap)
{
prevRange.Num += gap;
}
else
{
prevRange = new DeleteRange { First = prevLast + 1, Num = gap };
msgsSinceGap = false;
dbs.Add(prevRange);
}
}
if (mb.Dmap.Size > 0)
{
var deleted = new List<ulong>(mb.Dmap.Size);
mb.Dmap.Range(seq =>
{
deleted.Add(seq);
return true;
});
dbs.Add(new DeleteSlice([.. deleted]));
prevRange = null;
}
prevLast = mb.Last.Seq;
msgsSinceGap = msgsSinceGap || mb.Msgs > 0;
}
return dbs;
}
private SequenceSet DeleteMap()
{
var dmap = new SequenceSet();
_mu.EnterReadLock();
try
{
ReadLockAllMsgBlocks();
try
{
foreach (var mb in _blks)
{
if (mb.Dmap.Size == 0)
continue;
mb.Dmap.Range(seq =>
{
dmap.Insert(seq);
return true;
});
}
}
finally
{
ReadUnlockAllMsgBlocks();
}
}
finally
{
_mu.ExitReadLock();
}
return dmap;
}
private static void AppendUVarInt(List<byte> buffer, ulong value)
{
do
{
var b = (byte)(value & 0x7Fu);
value >>= 7;
if (value != 0)
b |= 0x80;
buffer.Add(b);
}
while (value != 0);
}
private static void AppendVarInt(List<byte> buffer, long value)
{
var encoded = (ulong)value << 1;
if (value < 0)
encoded = ~encoded;
AppendUVarInt(buffer, encoded);
}
// -----------------------------------------------------------------------
// IStreamStore — type / state
// -----------------------------------------------------------------------
@@ -4044,24 +4790,9 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
/// <inheritdoc/>
public void Stop()
{
_mu.EnterWriteLock();
try
{
if (_closing) return;
_closing = true;
}
finally
{
_mu.ExitWriteLock();
}
_ageChk?.Dispose();
_ageChk = null;
_syncTmr?.Dispose();
_syncTmr = null;
_closed = true;
_memStore.Stop();
var err = StopInternal(delete: false, writeState: true);
if (err != null && !ReferenceEquals(err, StoreErrors.ErrStoreClosed))
throw err;
}
/// <inheritdoc/>
@@ -4198,7 +4929,23 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
/// <inheritdoc/>
public void SyncDeleted(DeleteBlocks dbs)
=> _memStore.SyncDeleted(dbs);
{
if (dbs.Count == 0)
return;
_mu.EnterReadLock();
try
{
if (_state.LastSeq == 0)
return;
}
finally
{
_mu.ExitReadLock();
}
_memStore.SyncDeleted(dbs);
}
// -----------------------------------------------------------------------
// IStreamStore — config / admin (stubs)
@@ -4213,7 +4960,25 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
/// <inheritdoc/>
public void Delete(bool inline)
=> _memStore.Delete(inline);
{
if (IsClosed())
{
try { Directory.Delete(_fcfg.StoreDir, recursive: true); } catch { }
return;
}
_ = StopInternal(delete: true, writeState: false);
var remove = () =>
{
try { Directory.Delete(_fcfg.StoreDir, recursive: true); } catch { }
};
if (inline)
remove();
else
_ = Task.Run(remove);
}
/// <inheritdoc/>
public void ResetState()
@@ -4270,16 +5035,37 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
/// <inheritdoc/>
public (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool includeConsumers, bool checkMsgs)
{
var state = _memStore.State();
var payload = JsonSerializer.SerializeToUtf8Bytes(state);
var reader = new MemoryStream(payload, writable: false);
return (new SnapshotResult { Reader = reader, State = state }, null);
if (IsClosed())
return (null, StoreErrors.ErrStoreClosed);
if (checkMsgs)
{
var ld = CheckMsgs();
if (ld is { Msgs.Length: > 0 })
return (null, new InvalidDataException($"snapshot check detected {ld.Msgs.Length} bad messages"));
}
var (reader, err) = StreamSnapshot(includeConsumers);
if (err != null || reader == null)
return (null, err ?? new InvalidOperationException("snapshot generation failed"));
return (new SnapshotResult
{
Reader = reader,
State = _memStore.State(),
}, null);
}
/// <inheritdoc/>
public (ulong Total, ulong Reported, Exception? Error) Utilization()
=> _memStore.Utilization();
private sealed class SnapshotEnvelope
{
public StreamState State { get; set; } = new();
public Dictionary<string, byte[]> Files { get; set; } = new(StringComparer.Ordinal);
}
internal sealed class XorStreamCipher
{
private readonly byte[] _keySeed;