diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs index e049499..32aef58 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs @@ -33,6 +33,11 @@ namespace ZB.MOM.NatsNet.Server; /// internal sealed class MessageBlock { + private const int MaxVarIntLength = 10; + private static readonly Exception ErrNoCache = new InvalidOperationException("no message cache"); + private static readonly Exception ErrPartialCache = new InvalidOperationException("partial cache"); + private static readonly Exception ErrDeletedMsg = new InvalidOperationException("deleted message"); + // ------------------------------------------------------------------ // Identity fields — first/last use volatile-style access in Go via // atomic.LoadUint64 on the embedded msgId structs. @@ -1128,6 +1133,735 @@ internal sealed class MessageBlock return (raw & ~(FileStoreDefaults.Dbit | FileStoreDefaults.Cbit), deleted); } + // Will do a lookup from cache. + // This will copy the msg from the cache. + // Lock should be held. + internal (StoreMsg? Message, Exception? Error) CacheLookup(ulong seq, StoreMsg? sm) + => CacheLookupEx(seq, sm, doCopy: true); + + // Will do a lookup from cache. + // This will NOT copy the msg from the cache. + // Lock should be held. + internal (StoreMsg? Message, Exception? Error) CacheLookupNoCopy(ulong seq, StoreMsg? sm) + => CacheLookupEx(seq, sm, doCopy: false); + + // Will do a lookup from cache. + // Lock should be held. + internal (StoreMsg? Message, Exception? Error) CacheLookupEx(ulong seq, StoreMsg? sm, bool doCopy) + { + var fseq = First.Seq; + var lseq = Last.Seq; + if ((fseq > 0 && lseq == fseq - 1) || seq < fseq || seq > lseq) + return (null, StoreErrors.ErrStoreMsgNotFound); + + if (Llseq == 0 || seq < Llseq || seq == Llseq + 1 || seq + 1 == Llseq) + Llseq = seq; + + if (Dmap.Exists(seq)) + { + Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + return (null, ErrDeletedMsg); + } + + var cache = CacheData; + if (cache == null || cache.Fseq == 0 || cache.Idx.Length == 0 || cache.Buf.Length == 0) + { + if (cache != null) + TryForceExpireCacheLocked(); + return (null, ErrNoCache); + } + + if (seq < cache.Fseq) + return (null, ErrPartialCache); + + var slot = seq - cache.Fseq; + if (slot >= (ulong)cache.Idx.Length) + return (null, StoreErrors.ErrStoreMsgNotFound); + + var raw = cache.Idx[(int)slot]; + var deleted = (raw & FileStoreDefaults.Dbit) != 0; + var hashChecked = (raw & FileStoreDefaults.Cbit) != 0; + if (deleted) + return (null, ErrDeletedMsg); + + Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + + var offset = (int)(raw & ~(FileStoreDefaults.Dbit | FileStoreDefaults.Cbit)); + if ((uint)offset >= (uint)cache.Buf.Length) + return (null, ErrPartialCache); + + var (fsm, parseErr) = MsgFromBufEx(cache.Buf.AsSpan(offset), sm, verifyChecksum: !hashChecked, doCopy: doCopy); + if (parseErr != null || fsm == null) + return (null, parseErr ?? StoreErrors.ErrStoreMsgNotFound); + + if (fsm.Seq == 0) + return (null, ErrDeletedMsg); + + if (seq != fsm.Seq) + { + TryForceExpireCacheLocked(); + return (null, new InvalidOperationException($"sequence numbers for cache load did not match, {seq} vs {fsm.Seq}")); + } + + if (!hashChecked) + cache.Idx[(int)slot] = raw | FileStoreDefaults.Cbit; + + return (fsm, null); + } + + // Internal function to return msg parts from a raw buffer. + // Raw buffer will be copied into sm. + // Lock should be held. + internal (StoreMsg? Message, Exception? Error) MsgFromBuf(ReadOnlySpan buf, StoreMsg? sm, bool verifyChecksum) + => MsgFromBufEx(buf, sm, verifyChecksum, doCopy: true); + + // Internal function to return msg parts from a raw buffer. + // Raw buffer will NOT be copied into sm. + // Lock should be held. + internal (StoreMsg? Message, Exception? Error) MsgFromBufNoCopy(ReadOnlySpan buf, StoreMsg? sm, bool verifyChecksum) + => MsgFromBufEx(buf, sm, verifyChecksum, doCopy: false); + + // Internal function to return msg parts from a raw buffer. + // copy boolean determines if message/header buffers are copied. + // Lock should be held. + internal (StoreMsg? Message, Exception? Error) MsgFromBufEx(ReadOnlySpan buf, StoreMsg? sm, bool verifyChecksum, bool doCopy) + { + if (!TryReadRecordHeader(buf, out var seqRaw, out var ts, out var subjectLength, out var headerLength, out var msgLength, out var recordLength, out var parseErr)) + return (null, parseErr ?? new ErrBadMsg(Mfn, "record too short")); + + if (recordLength > buf.Length) + return (null, new ErrBadMsg(Mfn, "record extends past buffer")); + + var record = buf[..recordLength]; + var payloadLength = recordLength - FileStoreDefaults.RecordHashSize; + var payload = record[..payloadLength]; + var checksum = record[payloadLength..]; + + if (verifyChecksum) + { + var computed = SHA256.HashData(payload); + if (!computed.AsSpan(0, FileStoreDefaults.RecordHashSize).SequenceEqual(checksum)) + return (null, new ErrBadMsg(Mfn, "invalid checksum")); + } + + var seq = seqRaw; + if ((seq & FileStoreDefaults.Ebit) != 0) + seq = 0; + seq &= ~FileStoreDefaults.Tbit; + + sm ??= new StoreMsg(); + sm.Clear(); + sm.Seq = seq; + sm.Ts = ts; + + var subjectOffset = 8 + 8 + 4 + 4 + 4; + var headerOffset = subjectOffset + subjectLength; + var msgOffset = headerOffset + headerLength; + if (msgOffset + msgLength > record.Length) + return (null, new ErrBadMsg(Mfn, "invalid message bounds")); + + sm.Subject = subjectLength > 0 + ? Encoding.UTF8.GetString(record.Slice(subjectOffset, subjectLength)) + : string.Empty; + + var headerSlice = record.Slice(headerOffset, headerLength); + var msgSlice = record.Slice(msgOffset, msgLength); + + if (doCopy) + { + sm.Hdr = headerSlice.ToArray(); + sm.Msg = msgSlice.ToArray(); + } + else + { + // StoreMsg uses byte[] fields, so no-copy mode still exposes slices as arrays. + sm.Hdr = headerSlice.ToArray(); + sm.Msg = msgSlice.ToArray(); + } + + sm.Buf = new byte[sm.Hdr.Length + sm.Msg.Length]; + if (sm.Hdr.Length > 0) + Buffer.BlockCopy(sm.Hdr, 0, sm.Buf, 0, sm.Hdr.Length); + if (sm.Msg.Length > 0) + Buffer.BlockCopy(sm.Msg, 0, sm.Buf, sm.Hdr.Length, sm.Msg.Length); + + return (sm, null); + } + + // readIndexInfo will read in the index information for this message block. + internal Exception? ReadIndexInfo() + { + var dir = Path.GetDirectoryName(Mfn); + if (string.IsNullOrWhiteSpace(dir)) + return new InvalidOperationException("message block path is missing directory"); + + var ifn = Path.Combine(dir, string.Format(FileStoreDefaults.IndexScan, Index)); + byte[] buf; + try + { + buf = File.ReadAllBytes(ifn); + } + catch (Exception ex) + { + return ex; + } + + if (Liwsz == 0) + Liwsz = buf.Length; + + if (Aek != null) + { + if (Nonce == null || Nonce.Length == 0) + return new InvalidOperationException("missing block nonce"); + + try + { + buf = Aek.Open(Nonce, buf); + } + catch (Exception ex) + { + return ex; + } + } + + var headerErr = JetStreamFileStore.CheckNewHeader(buf); + if (headerErr != null) + { + TryDeleteFile(ifn); + return new InvalidDataException("bad index file"); + } + + var bi = FileStoreDefaults.HdrLen; + if (!TryReadUVarInt(buf, ref bi, out var msgs) || + !TryReadUVarInt(buf, ref bi, out var bytes) || + !TryReadUVarInt(buf, ref bi, out var firstSeq) || + !TryReadVarInt(buf, ref bi, out var firstTs) || + !TryReadUVarInt(buf, ref bi, out var lastSeq) || + !TryReadVarInt(buf, ref bi, out var lastTs) || + !TryReadUVarInt(buf, ref bi, out var dmapLen)) + { + TryDeleteFile(ifn); + return new InvalidDataException("short index file"); + } + + Msgs = msgs; + Bytes = bytes; + First = new MsgId { Seq = firstSeq, Ts = firstTs }; + Last = new MsgId { Seq = lastSeq, Ts = lastTs }; + + var expected = Last.Seq >= First.Seq ? Last.Seq - First.Seq + 1 : 0; + var activeExpected = expected >= dmapLen ? expected - dmapLen : 0; + if (Msgs != activeExpected) + { + TryDeleteFile(ifn); + return new InvalidDataException("accounting inconsistent"); + } + + if (bi < 0 || bi + FileStoreDefaults.RecordHashSize > buf.Length) + { + TryDeleteFile(ifn); + return new InvalidDataException("short index file"); + } + + Lchk = buf.AsSpan(bi, FileStoreDefaults.RecordHashSize).ToArray(); + bi += FileStoreDefaults.RecordHashSize; + + if (dmapLen > 0) + { + if (buf[1] == FileStoreDefaults.NewVersion) + { + try + { + var (decoded, consumed) = SequenceSet.Decode(buf.AsSpan(bi)); + if (consumed <= 0) + return new InvalidDataException("could not decode dmap"); + + Dmap = decoded; + } + catch (Exception ex) + { + return ex; + } + } + else + { + var fseq = First.Seq; + for (var i = 0UL; i < dmapLen; i++) + { + if (!TryReadUVarInt(buf, ref bi, out var relSeq)) + break; + + Dmap.Insert(relSeq + fseq); + } + } + } + + return null; + } + + // Return all active tombstones in this block. + internal MsgId[] Tombs() + { + Mu.EnterWriteLock(); + try + { + return TombsLocked(); + } + finally + { + Mu.ExitWriteLock(); + } + } + + // Return all active tombstones in this block. + // Write lock should be held. + internal MsgId[] TombsLocked() + { + if (CacheNotLoaded()) + { + var loadErr = LoadMsgsWithLock(); + if (loadErr != null) + return Array.Empty(); + } + + try + { + var cache = CacheData; + if (cache == null || cache.Buf.Length == 0) + return Array.Empty(); + + var tombs = new List(); + var offset = 0; + while (offset < cache.Buf.Length) + { + if (!TryReadRecordHeader(cache.Buf.AsSpan(offset), out var seqRaw, out var ts, out _, out _, out _, out var recordLength, out _)) + break; + + if (recordLength <= 0 || offset + recordLength > cache.Buf.Length) + break; + + if ((seqRaw & FileStoreDefaults.Tbit) != 0) + tombs.Add(new MsgId { Seq = seqRaw & ~FileStoreDefaults.Tbit, Ts = ts }); + + offset += recordLength; + } + + return tombs.ToArray(); + } + finally + { + FinishedWithCache(); + } + } + + // fs lock should be held. + internal int NumPriorTombs() + { + Mu.EnterWriteLock(); + try + { + return NumPriorTombsLocked(); + } + finally + { + Mu.ExitWriteLock(); + } + } + + // Return number of tombstones for messages prior to this block. + // Write lock should be held for block. + internal int NumPriorTombsLocked() + { + if (CacheNotLoaded()) + { + var loadErr = LoadMsgsWithLock(); + if (loadErr != null) + return 0; + } + + try + { + var cache = CacheData; + if (cache == null || cache.Buf.Length == 0) + return 0; + + var fsFirstSeq = Fs?.State().FirstSeq ?? 0; + ulong blockFirst = 0; + var tombs = 0; + var offset = 0; + while (offset < cache.Buf.Length) + { + if (!TryReadRecordHeader(cache.Buf.AsSpan(offset), out var seqRaw, out _, out _, out _, out _, out var recordLength, out _)) + break; + + if (recordLength <= 0 || offset + recordLength > cache.Buf.Length) + break; + + if ((seqRaw & FileStoreDefaults.Tbit) != 0) + { + var tseq = seqRaw & ~FileStoreDefaults.Tbit; + if (tseq >= fsFirstSeq && (blockFirst == 0 || tseq < blockFirst)) + tombs++; + + offset += recordLength; + continue; + } + + var seq = seqRaw & ~FileStoreDefaults.Ebit; + if (seq != 0 && (seqRaw & FileStoreDefaults.Ebit) == 0 && blockFirst == 0) + blockFirst = seq; + + offset += recordLength; + } + + return tombs; + } + finally + { + FinishedWithCache(); + } + } + + // Called by purge to simply get rid of cache and close fds. + internal void DirtyClose() + { + Mu.EnterWriteLock(); + try + { + _ = DirtyCloseWithRemove(remove: false); + } + finally + { + Mu.ExitWriteLock(); + } + } + + // Should be called with write lock held. + internal Exception? DirtyCloseWithRemove(bool remove) + { + if (Ctmr != null) + { + Ctmr.Dispose(); + Ctmr = null; + } + + ClearCacheAndOffset(); + + Qch?.Writer.TryComplete(); + Qch = null; + + Mfd?.Dispose(); + Mfd = null; + + if (!remove) + return null; + + Fss = null; + if (!string.IsNullOrEmpty(Mfn)) + { + try + { + File.Delete(Mfn); + Mfn = string.Empty; + } + catch (Exception ex) + { + return ex; + } + } + + if (!string.IsNullOrEmpty(Kfn)) + { + try + { + File.Delete(Kfn); + } + catch (Exception ex) + { + return ex; + } + } + + return null; + } + + // Remove a sequence from per-subject state and track whether first/last need recompute. + // Lock should be held. + internal ulong RemoveSeqPerSubject(string subj, ulong seq) + { + _ = EnsurePerSubjectInfoLoaded(); + if (Fss == null || string.IsNullOrEmpty(subj)) + return 0; + + var bsubj = Encoding.UTF8.GetBytes(subj); + var (ss, ok) = Fss.Find(bsubj); + if (!ok || ss == null) + return 0; + + if (ss.Msgs == 1) + { + Fss.Delete(bsubj); + return 0; + } + + ss.Msgs--; + + if (ss.Msgs == 1) + { + if (!ss.LastNeedsUpdate && seq != ss.Last) + { + ss.First = ss.Last; + ss.FirstNeedsUpdate = false; + return 1; + } + + if (!ss.FirstNeedsUpdate && seq != ss.First) + { + ss.Last = ss.First; + ss.LastNeedsUpdate = false; + return 1; + } + } + + ss.FirstNeedsUpdate = seq == ss.First || ss.FirstNeedsUpdate; + ss.LastNeedsUpdate = seq == ss.Last || ss.LastNeedsUpdate; + return ss.Msgs; + } + + // Recalculate first/last sequence values for a subject in this block. + // Lock should be held. + internal void RecalculateForSubj(string subj, SimpleState ss) + { + if (string.IsNullOrEmpty(subj) || ss == null) + return; + + var loadedHere = false; + if (CacheNotLoaded()) + { + if (LoadMsgsWithLock() != null) + return; + loadedHere = true; + } + + try + { + var cache = CacheData; + if (cache == null || cache.Idx.Length == 0 || cache.Buf.Length == 0) + return; + + var startSlot = (int)Math.Max(0, (long)ss.First - (long)cache.Fseq); + if (startSlot >= cache.Idx.Length) + { + ss.First = ss.Last; + ss.FirstNeedsUpdate = false; + ss.LastNeedsUpdate = false; + return; + } + + var endSlot = (int)Math.Max(0, (long)ss.Last - (long)cache.Fseq); + if (endSlot >= cache.Idx.Length) + endSlot = cache.Idx.Length - 1; + if (startSlot > endSlot) + return; + + if (ss.FirstNeedsUpdate) + { + ss.FirstNeedsUpdate = false; + var minSeq = ss.First + 1; + if (minSeq < First.Seq) + minSeq = First.Seq; + + var scratch = new StoreMsg(); + for (var slot = startSlot; slot < cache.Idx.Length; slot++) + { + var raw = cache.Idx[slot] & ~FileStoreDefaults.Cbit; + if (raw == FileStoreDefaults.Dbit) + continue; + + var offset = (int)raw; + if ((uint)offset >= (uint)cache.Buf.Length) + { + ss.First = ss.Last; + ss.LastNeedsUpdate = false; + return; + } + + var (sm, err) = MsgFromBufNoCopy(cache.Buf.AsSpan(offset), scratch, verifyChecksum: false); + if (err != null || sm == null || sm.Seq == 0) + continue; + if (sm.Subject != subj || sm.Seq < minSeq || Dmap.Exists(sm.Seq)) + continue; + + ss.First = sm.Seq; + if (ss.Msgs == 1) + { + ss.Last = sm.Seq; + ss.LastNeedsUpdate = false; + return; + } + + startSlot = slot; + break; + } + } + + if (ss.LastNeedsUpdate) + { + ss.LastNeedsUpdate = false; + var maxSeq = ss.Last > 0 ? ss.Last - 1 : 0; + if (maxSeq > Last.Seq) + maxSeq = Last.Seq; + + var scratch = new StoreMsg(); + for (var slot = endSlot; slot >= startSlot; slot--) + { + var raw = cache.Idx[slot] & ~FileStoreDefaults.Cbit; + if (raw == FileStoreDefaults.Dbit) + continue; + + var offset = (int)raw; + if ((uint)offset >= (uint)cache.Buf.Length) + return; + + var (sm, err) = MsgFromBufNoCopy(cache.Buf.AsSpan(offset), scratch, verifyChecksum: false); + if (err != null || sm == null || sm.Seq == 0) + continue; + if (sm.Subject != subj || sm.Seq > maxSeq || Dmap.Exists(sm.Seq)) + continue; + + ss.Last = sm.Seq < ss.First ? ss.First : sm.Seq; + if (ss.Msgs == 1) + { + ss.First = ss.Last; + ss.FirstNeedsUpdate = false; + } + return; + } + } + } + finally + { + if (loadedHere) + FinishedWithCache(); + } + } + + // Lock should be held. + internal Exception? ResetPerSubjectInfo() + { + Fss = null; + return GeneratePerSubjectInfo(); + } + + // generatePerSubjectInfo rebuilds per-subject state from the cached block data. + // Lock should be held. + internal Exception? GeneratePerSubjectInfo() + { + if (Msgs == 0) + return null; + + var loadedHere = false; + if (CacheNotLoaded()) + { + var loadErr = LoadMsgsWithLock(); + if (loadErr != null) + return loadErr; + + if (Fss != null) + return null; + + loadedHere = true; + } + + try + { + Fss = new SubjectTree(); + var scratch = new StoreMsg(); + for (var seq = First.Seq; seq <= Last.Seq; seq++) + { + if (Dmap.Exists(seq)) + { + if (seq == ulong.MaxValue) + break; + continue; + } + + var (sm, err) = CacheLookupNoCopy(seq, scratch); + if (err != null) + { + if (ReferenceEquals(err, StoreErrors.ErrStoreMsgNotFound) || IsDeletedMsgError(err)) + { + if (seq == ulong.MaxValue) + break; + continue; + } + + if (ReferenceEquals(err, ErrNoCache)) + return null; + + return err; + } + + if (sm != null && !string.IsNullOrEmpty(sm.Subject)) + { + var subjBytes = Encoding.UTF8.GetBytes(sm.Subject); + var (state, exists) = Fss.Find(subjBytes); + if (exists && state != null) + { + state.Msgs++; + state.Last = seq; + state.LastNeedsUpdate = false; + } + else + { + Fss.Insert(subjBytes, new SimpleState + { + Msgs = 1, + First = seq, + Last = seq, + }); + } + } + + if (seq == ulong.MaxValue) + break; + } + + if (Fss.Size() > 0) + { + Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + Lsts = Llts; + StartCacheExpireTimer(); + } + return null; + } + finally + { + if (loadedHere) + FinishedWithCache(); + } + } + + // Helper to make sure per-subject state is loaded if we are tracking subjects. + // Lock should be held. + internal Exception? EnsurePerSubjectInfoLoaded() + { + if (Fss != null || NoTrack) + { + if (Fss != null) + Lsts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + return null; + } + + if (Msgs == 0) + { + Fss = new SubjectTree(); + return null; + } + + return GeneratePerSubjectInfo(); + } + internal void SpinUpFlushLoop() { Mu.EnterWriteLock(); @@ -2195,14 +2929,29 @@ internal sealed class MessageBlock } } - private TimeSpan SinceLastActivity() + internal TimeSpan SinceLastActivity() { + if (Closed) + return TimeSpan.Zero; + var now = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); var last = Math.Max(Math.Max(Lrts, Llts), Math.Max(Lwts, Lsts)); var delta = now - last; return NanosecondsToTimeSpan(delta); } + // Determine time since last write or remove of a message. + // Read lock should be held. + internal TimeSpan SinceLastWriteActivity() + { + if (Closed) + return TimeSpan.Zero; + + var now = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + var last = Math.Max(Lwts, Lrts); + return NanosecondsToTimeSpan(now - last); + } + private static long TimeSpanToNanoseconds(TimeSpan value) => value <= TimeSpan.Zero ? 0 : checked(value.Ticks * 100L); @@ -2360,6 +3109,74 @@ internal sealed class MessageBlock return true; } + private static bool TryReadUVarInt(ReadOnlySpan source, ref int index, out ulong value) + { + value = 0; + var shift = 0; + for (var i = 0; i < MaxVarIntLength; i++) + { + if ((uint)index >= (uint)source.Length) + { + index = -1; + value = 0; + return false; + } + + var b = source[index++]; + if (b < 0x80) + { + if (i == MaxVarIntLength - 1 && b > 1) + { + index = -1; + value = 0; + return false; + } + + value |= (ulong)b << shift; + return true; + } + + value |= (ulong)(b & 0x7F) << shift; + shift += 7; + } + + index = -1; + value = 0; + return false; + } + + private static bool TryReadVarInt(ReadOnlySpan source, ref int index, out long value) + { + if (!TryReadUVarInt(source, ref index, out var unsigned)) + { + value = 0; + return false; + } + + value = (long)(unsigned >> 1); + if ((unsigned & 1) != 0) + value = ~value; + + return true; + } + + private static bool IsDeletedMsgError(Exception? ex) + => ex is InvalidOperationException ioe && + string.Equals(ioe.Message, ErrDeletedMsg.Message, StringComparison.Ordinal); + + private static void TryDeleteFile(string path) + { + try + { + if (File.Exists(path)) + File.Delete(path); + } + catch + { + // best effort + } + } + private static bool TryValidateRecordBuffer(ReadOnlySpan buf, out int validLength) { validLength = 0; diff --git a/porting.db b/porting.db index 0104dd8..9cfc20a 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index bf9aaeb..4d75a1a 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 23:03:57 UTC +Generated: 2026-02-28 23:12:57 UTC ## Modules (12 total) @@ -13,10 +13,10 @@ Generated: 2026-02-28 23:03:57 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 1738 | +| deferred | 1718 | | n_a | 24 | | stub | 1 | -| verified | 1888 | +| verified | 1908 | ## Unit Tests (3257 total) @@ -35,4 +35,4 @@ Generated: 2026-02-28 23:03:57 UTC ## Overall Progress -**3533/6942 items complete (50.9%)** +**3553/6942 items complete (51.2%)**