feat(batch15): complete group 5 msgblock/consumerfilestore

This commit is contained in:
Joseph Doherty
2026-02-28 18:12:57 -05:00
parent 43d914bf83
commit 854f410aad
3 changed files with 822 additions and 5 deletions

View File

@@ -33,6 +33,11 @@ namespace ZB.MOM.NatsNet.Server;
/// </summary>
internal sealed class MessageBlock
{
private const int MaxVarIntLength = 10;
private static readonly Exception ErrNoCache = new InvalidOperationException("no message cache");
private static readonly Exception ErrPartialCache = new InvalidOperationException("partial cache");
private static readonly Exception ErrDeletedMsg = new InvalidOperationException("deleted message");
// ------------------------------------------------------------------
// Identity fields — first/last use volatile-style access in Go via
// atomic.LoadUint64 on the embedded msgId structs.
@@ -1128,6 +1133,735 @@ internal sealed class MessageBlock
return (raw & ~(FileStoreDefaults.Dbit | FileStoreDefaults.Cbit), deleted);
}
// Will do a lookup from cache.
// This will copy the msg from the cache.
// Lock should be held.
internal (StoreMsg? Message, Exception? Error) CacheLookup(ulong seq, StoreMsg? sm)
=> CacheLookupEx(seq, sm, doCopy: true);
// Will do a lookup from cache.
// This will NOT copy the msg from the cache.
// Lock should be held.
internal (StoreMsg? Message, Exception? Error) CacheLookupNoCopy(ulong seq, StoreMsg? sm)
=> CacheLookupEx(seq, sm, doCopy: false);
// Will do a lookup from cache.
// Lock should be held.
internal (StoreMsg? Message, Exception? Error) CacheLookupEx(ulong seq, StoreMsg? sm, bool doCopy)
{
var fseq = First.Seq;
var lseq = Last.Seq;
if ((fseq > 0 && lseq == fseq - 1) || seq < fseq || seq > lseq)
return (null, StoreErrors.ErrStoreMsgNotFound);
if (Llseq == 0 || seq < Llseq || seq == Llseq + 1 || seq + 1 == Llseq)
Llseq = seq;
if (Dmap.Exists(seq))
{
Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
return (null, ErrDeletedMsg);
}
var cache = CacheData;
if (cache == null || cache.Fseq == 0 || cache.Idx.Length == 0 || cache.Buf.Length == 0)
{
if (cache != null)
TryForceExpireCacheLocked();
return (null, ErrNoCache);
}
if (seq < cache.Fseq)
return (null, ErrPartialCache);
var slot = seq - cache.Fseq;
if (slot >= (ulong)cache.Idx.Length)
return (null, StoreErrors.ErrStoreMsgNotFound);
var raw = cache.Idx[(int)slot];
var deleted = (raw & FileStoreDefaults.Dbit) != 0;
var hashChecked = (raw & FileStoreDefaults.Cbit) != 0;
if (deleted)
return (null, ErrDeletedMsg);
Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
var offset = (int)(raw & ~(FileStoreDefaults.Dbit | FileStoreDefaults.Cbit));
if ((uint)offset >= (uint)cache.Buf.Length)
return (null, ErrPartialCache);
var (fsm, parseErr) = MsgFromBufEx(cache.Buf.AsSpan(offset), sm, verifyChecksum: !hashChecked, doCopy: doCopy);
if (parseErr != null || fsm == null)
return (null, parseErr ?? StoreErrors.ErrStoreMsgNotFound);
if (fsm.Seq == 0)
return (null, ErrDeletedMsg);
if (seq != fsm.Seq)
{
TryForceExpireCacheLocked();
return (null, new InvalidOperationException($"sequence numbers for cache load did not match, {seq} vs {fsm.Seq}"));
}
if (!hashChecked)
cache.Idx[(int)slot] = raw | FileStoreDefaults.Cbit;
return (fsm, null);
}
// Internal function to return msg parts from a raw buffer.
// Raw buffer will be copied into sm.
// Lock should be held.
internal (StoreMsg? Message, Exception? Error) MsgFromBuf(ReadOnlySpan<byte> buf, StoreMsg? sm, bool verifyChecksum)
=> MsgFromBufEx(buf, sm, verifyChecksum, doCopy: true);
// Internal function to return msg parts from a raw buffer.
// Raw buffer will NOT be copied into sm.
// Lock should be held.
internal (StoreMsg? Message, Exception? Error) MsgFromBufNoCopy(ReadOnlySpan<byte> buf, StoreMsg? sm, bool verifyChecksum)
=> MsgFromBufEx(buf, sm, verifyChecksum, doCopy: false);
// Internal function to return msg parts from a raw buffer.
// copy boolean determines if message/header buffers are copied.
// Lock should be held.
internal (StoreMsg? Message, Exception? Error) MsgFromBufEx(ReadOnlySpan<byte> buf, StoreMsg? sm, bool verifyChecksum, bool doCopy)
{
if (!TryReadRecordHeader(buf, out var seqRaw, out var ts, out var subjectLength, out var headerLength, out var msgLength, out var recordLength, out var parseErr))
return (null, parseErr ?? new ErrBadMsg(Mfn, "record too short"));
if (recordLength > buf.Length)
return (null, new ErrBadMsg(Mfn, "record extends past buffer"));
var record = buf[..recordLength];
var payloadLength = recordLength - FileStoreDefaults.RecordHashSize;
var payload = record[..payloadLength];
var checksum = record[payloadLength..];
if (verifyChecksum)
{
var computed = SHA256.HashData(payload);
if (!computed.AsSpan(0, FileStoreDefaults.RecordHashSize).SequenceEqual(checksum))
return (null, new ErrBadMsg(Mfn, "invalid checksum"));
}
var seq = seqRaw;
if ((seq & FileStoreDefaults.Ebit) != 0)
seq = 0;
seq &= ~FileStoreDefaults.Tbit;
sm ??= new StoreMsg();
sm.Clear();
sm.Seq = seq;
sm.Ts = ts;
var subjectOffset = 8 + 8 + 4 + 4 + 4;
var headerOffset = subjectOffset + subjectLength;
var msgOffset = headerOffset + headerLength;
if (msgOffset + msgLength > record.Length)
return (null, new ErrBadMsg(Mfn, "invalid message bounds"));
sm.Subject = subjectLength > 0
? Encoding.UTF8.GetString(record.Slice(subjectOffset, subjectLength))
: string.Empty;
var headerSlice = record.Slice(headerOffset, headerLength);
var msgSlice = record.Slice(msgOffset, msgLength);
if (doCopy)
{
sm.Hdr = headerSlice.ToArray();
sm.Msg = msgSlice.ToArray();
}
else
{
// StoreMsg uses byte[] fields, so no-copy mode still exposes slices as arrays.
sm.Hdr = headerSlice.ToArray();
sm.Msg = msgSlice.ToArray();
}
sm.Buf = new byte[sm.Hdr.Length + sm.Msg.Length];
if (sm.Hdr.Length > 0)
Buffer.BlockCopy(sm.Hdr, 0, sm.Buf, 0, sm.Hdr.Length);
if (sm.Msg.Length > 0)
Buffer.BlockCopy(sm.Msg, 0, sm.Buf, sm.Hdr.Length, sm.Msg.Length);
return (sm, null);
}
// readIndexInfo will read in the index information for this message block.
internal Exception? ReadIndexInfo()
{
var dir = Path.GetDirectoryName(Mfn);
if (string.IsNullOrWhiteSpace(dir))
return new InvalidOperationException("message block path is missing directory");
var ifn = Path.Combine(dir, string.Format(FileStoreDefaults.IndexScan, Index));
byte[] buf;
try
{
buf = File.ReadAllBytes(ifn);
}
catch (Exception ex)
{
return ex;
}
if (Liwsz == 0)
Liwsz = buf.Length;
if (Aek != null)
{
if (Nonce == null || Nonce.Length == 0)
return new InvalidOperationException("missing block nonce");
try
{
buf = Aek.Open(Nonce, buf);
}
catch (Exception ex)
{
return ex;
}
}
var headerErr = JetStreamFileStore.CheckNewHeader(buf);
if (headerErr != null)
{
TryDeleteFile(ifn);
return new InvalidDataException("bad index file");
}
var bi = FileStoreDefaults.HdrLen;
if (!TryReadUVarInt(buf, ref bi, out var msgs) ||
!TryReadUVarInt(buf, ref bi, out var bytes) ||
!TryReadUVarInt(buf, ref bi, out var firstSeq) ||
!TryReadVarInt(buf, ref bi, out var firstTs) ||
!TryReadUVarInt(buf, ref bi, out var lastSeq) ||
!TryReadVarInt(buf, ref bi, out var lastTs) ||
!TryReadUVarInt(buf, ref bi, out var dmapLen))
{
TryDeleteFile(ifn);
return new InvalidDataException("short index file");
}
Msgs = msgs;
Bytes = bytes;
First = new MsgId { Seq = firstSeq, Ts = firstTs };
Last = new MsgId { Seq = lastSeq, Ts = lastTs };
var expected = Last.Seq >= First.Seq ? Last.Seq - First.Seq + 1 : 0;
var activeExpected = expected >= dmapLen ? expected - dmapLen : 0;
if (Msgs != activeExpected)
{
TryDeleteFile(ifn);
return new InvalidDataException("accounting inconsistent");
}
if (bi < 0 || bi + FileStoreDefaults.RecordHashSize > buf.Length)
{
TryDeleteFile(ifn);
return new InvalidDataException("short index file");
}
Lchk = buf.AsSpan(bi, FileStoreDefaults.RecordHashSize).ToArray();
bi += FileStoreDefaults.RecordHashSize;
if (dmapLen > 0)
{
if (buf[1] == FileStoreDefaults.NewVersion)
{
try
{
var (decoded, consumed) = SequenceSet.Decode(buf.AsSpan(bi));
if (consumed <= 0)
return new InvalidDataException("could not decode dmap");
Dmap = decoded;
}
catch (Exception ex)
{
return ex;
}
}
else
{
var fseq = First.Seq;
for (var i = 0UL; i < dmapLen; i++)
{
if (!TryReadUVarInt(buf, ref bi, out var relSeq))
break;
Dmap.Insert(relSeq + fseq);
}
}
}
return null;
}
// Return all active tombstones in this block.
internal MsgId[] Tombs()
{
Mu.EnterWriteLock();
try
{
return TombsLocked();
}
finally
{
Mu.ExitWriteLock();
}
}
// Return all active tombstones in this block.
// Write lock should be held.
internal MsgId[] TombsLocked()
{
if (CacheNotLoaded())
{
var loadErr = LoadMsgsWithLock();
if (loadErr != null)
return Array.Empty<MsgId>();
}
try
{
var cache = CacheData;
if (cache == null || cache.Buf.Length == 0)
return Array.Empty<MsgId>();
var tombs = new List<MsgId>();
var offset = 0;
while (offset < cache.Buf.Length)
{
if (!TryReadRecordHeader(cache.Buf.AsSpan(offset), out var seqRaw, out var ts, out _, out _, out _, out var recordLength, out _))
break;
if (recordLength <= 0 || offset + recordLength > cache.Buf.Length)
break;
if ((seqRaw & FileStoreDefaults.Tbit) != 0)
tombs.Add(new MsgId { Seq = seqRaw & ~FileStoreDefaults.Tbit, Ts = ts });
offset += recordLength;
}
return tombs.ToArray();
}
finally
{
FinishedWithCache();
}
}
// fs lock should be held.
internal int NumPriorTombs()
{
Mu.EnterWriteLock();
try
{
return NumPriorTombsLocked();
}
finally
{
Mu.ExitWriteLock();
}
}
// Return number of tombstones for messages prior to this block.
// Write lock should be held for block.
internal int NumPriorTombsLocked()
{
if (CacheNotLoaded())
{
var loadErr = LoadMsgsWithLock();
if (loadErr != null)
return 0;
}
try
{
var cache = CacheData;
if (cache == null || cache.Buf.Length == 0)
return 0;
var fsFirstSeq = Fs?.State().FirstSeq ?? 0;
ulong blockFirst = 0;
var tombs = 0;
var offset = 0;
while (offset < cache.Buf.Length)
{
if (!TryReadRecordHeader(cache.Buf.AsSpan(offset), out var seqRaw, out _, out _, out _, out _, out var recordLength, out _))
break;
if (recordLength <= 0 || offset + recordLength > cache.Buf.Length)
break;
if ((seqRaw & FileStoreDefaults.Tbit) != 0)
{
var tseq = seqRaw & ~FileStoreDefaults.Tbit;
if (tseq >= fsFirstSeq && (blockFirst == 0 || tseq < blockFirst))
tombs++;
offset += recordLength;
continue;
}
var seq = seqRaw & ~FileStoreDefaults.Ebit;
if (seq != 0 && (seqRaw & FileStoreDefaults.Ebit) == 0 && blockFirst == 0)
blockFirst = seq;
offset += recordLength;
}
return tombs;
}
finally
{
FinishedWithCache();
}
}
// Called by purge to simply get rid of cache and close fds.
internal void DirtyClose()
{
Mu.EnterWriteLock();
try
{
_ = DirtyCloseWithRemove(remove: false);
}
finally
{
Mu.ExitWriteLock();
}
}
// Should be called with write lock held.
internal Exception? DirtyCloseWithRemove(bool remove)
{
if (Ctmr != null)
{
Ctmr.Dispose();
Ctmr = null;
}
ClearCacheAndOffset();
Qch?.Writer.TryComplete();
Qch = null;
Mfd?.Dispose();
Mfd = null;
if (!remove)
return null;
Fss = null;
if (!string.IsNullOrEmpty(Mfn))
{
try
{
File.Delete(Mfn);
Mfn = string.Empty;
}
catch (Exception ex)
{
return ex;
}
}
if (!string.IsNullOrEmpty(Kfn))
{
try
{
File.Delete(Kfn);
}
catch (Exception ex)
{
return ex;
}
}
return null;
}
// Remove a sequence from per-subject state and track whether first/last need recompute.
// Lock should be held.
internal ulong RemoveSeqPerSubject(string subj, ulong seq)
{
_ = EnsurePerSubjectInfoLoaded();
if (Fss == null || string.IsNullOrEmpty(subj))
return 0;
var bsubj = Encoding.UTF8.GetBytes(subj);
var (ss, ok) = Fss.Find(bsubj);
if (!ok || ss == null)
return 0;
if (ss.Msgs == 1)
{
Fss.Delete(bsubj);
return 0;
}
ss.Msgs--;
if (ss.Msgs == 1)
{
if (!ss.LastNeedsUpdate && seq != ss.Last)
{
ss.First = ss.Last;
ss.FirstNeedsUpdate = false;
return 1;
}
if (!ss.FirstNeedsUpdate && seq != ss.First)
{
ss.Last = ss.First;
ss.LastNeedsUpdate = false;
return 1;
}
}
ss.FirstNeedsUpdate = seq == ss.First || ss.FirstNeedsUpdate;
ss.LastNeedsUpdate = seq == ss.Last || ss.LastNeedsUpdate;
return ss.Msgs;
}
// Recalculate first/last sequence values for a subject in this block.
// Lock should be held.
internal void RecalculateForSubj(string subj, SimpleState ss)
{
if (string.IsNullOrEmpty(subj) || ss == null)
return;
var loadedHere = false;
if (CacheNotLoaded())
{
if (LoadMsgsWithLock() != null)
return;
loadedHere = true;
}
try
{
var cache = CacheData;
if (cache == null || cache.Idx.Length == 0 || cache.Buf.Length == 0)
return;
var startSlot = (int)Math.Max(0, (long)ss.First - (long)cache.Fseq);
if (startSlot >= cache.Idx.Length)
{
ss.First = ss.Last;
ss.FirstNeedsUpdate = false;
ss.LastNeedsUpdate = false;
return;
}
var endSlot = (int)Math.Max(0, (long)ss.Last - (long)cache.Fseq);
if (endSlot >= cache.Idx.Length)
endSlot = cache.Idx.Length - 1;
if (startSlot > endSlot)
return;
if (ss.FirstNeedsUpdate)
{
ss.FirstNeedsUpdate = false;
var minSeq = ss.First + 1;
if (minSeq < First.Seq)
minSeq = First.Seq;
var scratch = new StoreMsg();
for (var slot = startSlot; slot < cache.Idx.Length; slot++)
{
var raw = cache.Idx[slot] & ~FileStoreDefaults.Cbit;
if (raw == FileStoreDefaults.Dbit)
continue;
var offset = (int)raw;
if ((uint)offset >= (uint)cache.Buf.Length)
{
ss.First = ss.Last;
ss.LastNeedsUpdate = false;
return;
}
var (sm, err) = MsgFromBufNoCopy(cache.Buf.AsSpan(offset), scratch, verifyChecksum: false);
if (err != null || sm == null || sm.Seq == 0)
continue;
if (sm.Subject != subj || sm.Seq < minSeq || Dmap.Exists(sm.Seq))
continue;
ss.First = sm.Seq;
if (ss.Msgs == 1)
{
ss.Last = sm.Seq;
ss.LastNeedsUpdate = false;
return;
}
startSlot = slot;
break;
}
}
if (ss.LastNeedsUpdate)
{
ss.LastNeedsUpdate = false;
var maxSeq = ss.Last > 0 ? ss.Last - 1 : 0;
if (maxSeq > Last.Seq)
maxSeq = Last.Seq;
var scratch = new StoreMsg();
for (var slot = endSlot; slot >= startSlot; slot--)
{
var raw = cache.Idx[slot] & ~FileStoreDefaults.Cbit;
if (raw == FileStoreDefaults.Dbit)
continue;
var offset = (int)raw;
if ((uint)offset >= (uint)cache.Buf.Length)
return;
var (sm, err) = MsgFromBufNoCopy(cache.Buf.AsSpan(offset), scratch, verifyChecksum: false);
if (err != null || sm == null || sm.Seq == 0)
continue;
if (sm.Subject != subj || sm.Seq > maxSeq || Dmap.Exists(sm.Seq))
continue;
ss.Last = sm.Seq < ss.First ? ss.First : sm.Seq;
if (ss.Msgs == 1)
{
ss.First = ss.Last;
ss.FirstNeedsUpdate = false;
}
return;
}
}
}
finally
{
if (loadedHere)
FinishedWithCache();
}
}
// Lock should be held.
internal Exception? ResetPerSubjectInfo()
{
Fss = null;
return GeneratePerSubjectInfo();
}
// generatePerSubjectInfo rebuilds per-subject state from the cached block data.
// Lock should be held.
internal Exception? GeneratePerSubjectInfo()
{
if (Msgs == 0)
return null;
var loadedHere = false;
if (CacheNotLoaded())
{
var loadErr = LoadMsgsWithLock();
if (loadErr != null)
return loadErr;
if (Fss != null)
return null;
loadedHere = true;
}
try
{
Fss = new SubjectTree<SimpleState>();
var scratch = new StoreMsg();
for (var seq = First.Seq; seq <= Last.Seq; seq++)
{
if (Dmap.Exists(seq))
{
if (seq == ulong.MaxValue)
break;
continue;
}
var (sm, err) = CacheLookupNoCopy(seq, scratch);
if (err != null)
{
if (ReferenceEquals(err, StoreErrors.ErrStoreMsgNotFound) || IsDeletedMsgError(err))
{
if (seq == ulong.MaxValue)
break;
continue;
}
if (ReferenceEquals(err, ErrNoCache))
return null;
return err;
}
if (sm != null && !string.IsNullOrEmpty(sm.Subject))
{
var subjBytes = Encoding.UTF8.GetBytes(sm.Subject);
var (state, exists) = Fss.Find(subjBytes);
if (exists && state != null)
{
state.Msgs++;
state.Last = seq;
state.LastNeedsUpdate = false;
}
else
{
Fss.Insert(subjBytes, new SimpleState
{
Msgs = 1,
First = seq,
Last = seq,
});
}
}
if (seq == ulong.MaxValue)
break;
}
if (Fss.Size() > 0)
{
Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
Lsts = Llts;
StartCacheExpireTimer();
}
return null;
}
finally
{
if (loadedHere)
FinishedWithCache();
}
}
// Helper to make sure per-subject state is loaded if we are tracking subjects.
// Lock should be held.
internal Exception? EnsurePerSubjectInfoLoaded()
{
if (Fss != null || NoTrack)
{
if (Fss != null)
Lsts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
return null;
}
if (Msgs == 0)
{
Fss = new SubjectTree<SimpleState>();
return null;
}
return GeneratePerSubjectInfo();
}
internal void SpinUpFlushLoop()
{
Mu.EnterWriteLock();
@@ -2195,14 +2929,29 @@ internal sealed class MessageBlock
}
}
private TimeSpan SinceLastActivity()
internal TimeSpan SinceLastActivity()
{
if (Closed)
return TimeSpan.Zero;
var now = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
var last = Math.Max(Math.Max(Lrts, Llts), Math.Max(Lwts, Lsts));
var delta = now - last;
return NanosecondsToTimeSpan(delta);
}
// Determine time since last write or remove of a message.
// Read lock should be held.
internal TimeSpan SinceLastWriteActivity()
{
if (Closed)
return TimeSpan.Zero;
var now = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
var last = Math.Max(Lwts, Lrts);
return NanosecondsToTimeSpan(now - last);
}
private static long TimeSpanToNanoseconds(TimeSpan value)
=> value <= TimeSpan.Zero ? 0 : checked(value.Ticks * 100L);
@@ -2360,6 +3109,74 @@ internal sealed class MessageBlock
return true;
}
private static bool TryReadUVarInt(ReadOnlySpan<byte> source, ref int index, out ulong value)
{
value = 0;
var shift = 0;
for (var i = 0; i < MaxVarIntLength; i++)
{
if ((uint)index >= (uint)source.Length)
{
index = -1;
value = 0;
return false;
}
var b = source[index++];
if (b < 0x80)
{
if (i == MaxVarIntLength - 1 && b > 1)
{
index = -1;
value = 0;
return false;
}
value |= (ulong)b << shift;
return true;
}
value |= (ulong)(b & 0x7F) << shift;
shift += 7;
}
index = -1;
value = 0;
return false;
}
private static bool TryReadVarInt(ReadOnlySpan<byte> source, ref int index, out long value)
{
if (!TryReadUVarInt(source, ref index, out var unsigned))
{
value = 0;
return false;
}
value = (long)(unsigned >> 1);
if ((unsigned & 1) != 0)
value = ~value;
return true;
}
private static bool IsDeletedMsgError(Exception? ex)
=> ex is InvalidOperationException ioe &&
string.Equals(ioe.Message, ErrDeletedMsg.Message, StringComparison.Ordinal);
private static void TryDeleteFile(string path)
{
try
{
if (File.Exists(path))
File.Delete(path);
}
catch
{
// best effort
}
}
private static bool TryValidateRecordBuffer(ReadOnlySpan<byte> buf, out int validLength)
{
validLength = 0;