feat(batch11): complete group2 filestore state and utility paths

This commit is contained in:
Joseph Doherty
2026-02-28 13:57:10 -05:00
parent cbef52bb34
commit 0b3fe7d78a
6 changed files with 1387 additions and 1 deletions

View File

@@ -257,6 +257,217 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
: FileStoreDefaults.DefaultMediumBlockSize;
}
internal LostStreamData? LostData()
{
_mu.EnterReadLock();
try
{
if (_ld == null)
return null;
return new LostStreamData
{
Msgs = [.. _ld.Msgs],
Bytes = _ld.Bytes,
};
}
finally
{
_mu.ExitReadLock();
}
}
internal void AddLostData(LostStreamData? ld)
{
_mu.EnterWriteLock();
try
{
AddLostDataLocked(ld);
}
finally
{
_mu.ExitWriteLock();
}
}
internal void RemoveFromLostData(ulong seq)
{
_mu.EnterWriteLock();
try
{
if (_ld == null)
return;
var index = Array.IndexOf(_ld.Msgs, seq);
if (index < 0)
return;
var msgs = new ulong[_ld.Msgs.Length - 1];
if (index > 0)
Array.Copy(_ld.Msgs, 0, msgs, 0, index);
if (index < _ld.Msgs.Length - 1)
Array.Copy(_ld.Msgs, index + 1, msgs, index, _ld.Msgs.Length - index - 1);
if (msgs.Length == 0)
_ld = null;
else
_ld.Msgs = msgs;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void RebuildState(LostStreamData? ld)
{
_mu.EnterWriteLock();
try
{
RebuildStateLocked(ld);
}
finally
{
_mu.ExitWriteLock();
}
}
internal void RebuildStateLocked(LostStreamData? ld)
{
AddLostDataLocked(ld);
_state.Msgs = 0;
_state.Bytes = 0;
_state.FirstSeq = 0;
_state.LastSeq = 0;
_state.FirstTime = default;
_state.LastTime = default;
foreach (var mb in _blks)
{
mb.Mu.EnterReadLock();
try
{
_state.Msgs += mb.Msgs;
_state.Bytes += mb.Bytes;
var firstSeq = mb.First.Seq;
if (_state.FirstSeq == 0 || (firstSeq < _state.FirstSeq && mb.First.Ts != 0))
{
_state.FirstSeq = firstSeq;
_state.FirstTime = mb.First.Ts == 0 ? default : FromUnixNanosUtc(mb.First.Ts);
}
// Preserve the highest last-seq timestamp even when a block's terminal record is erased.
var lastSeq = mb.Last.Seq;
if (lastSeq >= _state.LastSeq)
{
_state.LastSeq = lastSeq;
_state.LastTime = mb.Last.Ts == 0 ? default : FromUnixNanosUtc(mb.Last.Ts);
}
}
finally
{
mb.Mu.ExitReadLock();
}
}
_state.Lost = _ld == null
? null
: new LostStreamData { Msgs = [.. _ld.Msgs], Bytes = _ld.Bytes };
}
internal static void UpdateTrackingState(StreamState state, MessageBlock mb)
{
ArgumentNullException.ThrowIfNull(state);
ArgumentNullException.ThrowIfNull(mb);
var first = mb.First.Seq;
var last = mb.Last.Seq;
if (state.FirstSeq == 0)
state.FirstSeq = first;
else if (first < state.FirstSeq && mb.First.Ts != 0)
state.FirstSeq = first;
if (last > state.LastSeq)
state.LastSeq = last;
state.Msgs += mb.Msgs;
state.Bytes += mb.Bytes;
}
internal static bool TrackingStatesEqual(StreamState fs, StreamState mb)
{
ArgumentNullException.ThrowIfNull(fs);
ArgumentNullException.ThrowIfNull(mb);
// Brand-new state may use FirstSeq=0 while tracked block state starts at 1.
if ((fs.FirstSeq > 1 && mb.FirstSeq > 1) || mb.FirstSeq > 1)
return fs.Msgs == mb.Msgs && fs.FirstSeq == mb.FirstSeq && fs.LastSeq == mb.LastSeq && fs.Bytes == mb.Bytes;
return fs.Msgs == mb.Msgs && fs.LastSeq == mb.LastSeq && fs.Bytes == mb.Bytes;
}
internal static List<MessageBlock>? CopyMsgBlocks(List<MessageBlock>? src)
{
if (src == null)
return null;
return [.. src];
}
internal static void KickFlusher(Channel<byte>? fch)
{
if (fch == null)
return;
_ = fch.Writer.TryWrite(0);
}
internal static ulong FileStoreMsgSizeRaw(int slen, int hlen, int mlen)
{
if (hlen == 0)
return (ulong)(22 + slen + mlen + 8);
return (ulong)(22 + slen + 4 + hlen + mlen + 8);
}
internal static ulong FileStoreMsgSize(string subj, byte[]? hdr, byte[]? msg)
=> FileStoreMsgSizeRaw(subj.Length, hdr?.Length ?? 0, msg?.Length ?? 0);
internal static ulong FileStoreMsgSizeEstimate(int slen, int maxPayload)
=> (ulong)(30 + slen + 4 + maxPayload);
internal static Exception? CheckNewHeader(byte[]? hdr)
{
if (hdr == null || hdr.Length < 2)
return new InvalidDataException("corrupt state");
if (hdr[0] != FileStoreDefaults.FileStoreMagic ||
(hdr[1] != FileStoreDefaults.FileStoreVersion && hdr[1] != FileStoreDefaults.NewVersion))
{
return new InvalidDataException("corrupt state");
}
return null;
}
internal static bool SubjectsEqual(string a, string b) => a == b;
internal static bool SubjectsAll(string a, string b) => true;
internal static Func<string, string, bool> CompareFn(string subject)
{
if (string.IsNullOrEmpty(subject) || subject == ">")
return SubjectsAll;
if (SubscriptionIndex.SubjectHasWildcard(subject))
return SubscriptionIndex.SubjectIsSubsetMatch;
return SubjectsEqual;
}
internal static AeadCipher GenEncryptionKey(StoreCipher sc, byte[] seed)
{
ArgumentNullException.ThrowIfNull(seed);
@@ -302,6 +513,50 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
};
}
private void AddLostDataLocked(LostStreamData? ld)
{
if (ld == null)
return;
if (_ld != null)
{
var known = new HashSet<ulong>(_ld.Msgs);
var merged = new List<ulong>(_ld.Msgs);
var added = false;
foreach (var seq in ld.Msgs)
{
if (known.Add(seq))
{
merged.Add(seq);
added = true;
}
}
if (added)
{
merged.Sort();
_ld.Msgs = [.. merged];
_ld.Bytes += ld.Bytes;
}
}
else
{
_ld = new LostStreamData
{
Msgs = [.. ld.Msgs],
Bytes = ld.Bytes,
};
}
}
private static DateTime FromUnixNanosUtc(long nanos)
{
var seconds = nanos / 1_000_000_000L;
var remainderNanos = nanos % 1_000_000_000L;
return DateTimeOffset.FromUnixTimeSeconds(seconds).AddTicks(remainderNanos / 100L).UtcDateTime;
}
internal void RecoverAEK()
{
if (_prf == null || _aek != null)