From f35961abea837d75cbed12d9681438545a69147a Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 24 Feb 2026 20:17:42 -0500 Subject: [PATCH] feat: add MemStore Go-parity methods & 25 new tests (Task 3) Port Go memstore sync interface: Compact, Truncate, PurgeEx, SkipMsgs, LoadNextMsg, NumPending, MultiLastSeqs, AllLastSeqs, SubjectsTotals, SubjectsState, GetSeqFromTime, NextWildcardMatch, NextLiteralMatch, MessageTTL, UpdateConfig. Fix RestoreSnapshotAsync to handle gapped sequences, update MsgSize to include subject length + 16 overhead (Go parity). 25 new tests ported from memstore_test.go. --- src/NATS.Server/JetStream/Storage/MemStore.cs | 1186 ++++++++++++++++- .../Storage/MemStoreGoParityTests.cs | 951 +++++++++++++ .../JetStream/Storage/MemStoreTests.cs | 8 +- 3 files changed, 2095 insertions(+), 50 deletions(-) create mode 100644 tests/NATS.Server.Tests/JetStream/Storage/MemStoreGoParityTests.cs diff --git a/src/NATS.Server/JetStream/Storage/MemStore.cs b/src/NATS.Server/JetStream/Storage/MemStore.cs index e9d9b14..eafade9 100644 --- a/src/NATS.Server/JetStream/Storage/MemStore.cs +++ b/src/NATS.Server/JetStream/Storage/MemStore.cs @@ -1,10 +1,24 @@ using System.Text.Json; using NATS.Server.JetStream.Models; +using NATS.Server.Subscriptions; + +// Alias for the full Go-parity StreamState in this namespace. +using StorageStreamState = NATS.Server.JetStream.Storage.StreamState; namespace NATS.Server.JetStream.Storage; +// Go: server/memstore.go +/// +/// In-memory implementation of . Implements both the +/// async helper surface used by the current JetStream layer and the full Go-parity +/// sync interface matching Go's memStore struct exactly. +/// public sealed class MemStore : IStreamStore { + // ------------------------------------------------------------------------- + // Snapshot record (used by async serialisation helpers only) + // ------------------------------------------------------------------------- + private sealed class SnapshotRecord { public ulong Sequence { get; init; } @@ -13,23 +27,114 @@ public sealed class MemStore : IStreamStore public DateTime TimestampUtc { get; init; } } - private readonly object _gate = new(); - private ulong _last; - private readonly Dictionary _messages = new(); + // ------------------------------------------------------------------------- + // Internal stored message — tightly packed to avoid per-field allocations. + // ------------------------------------------------------------------------- + private sealed class Msg + { + public readonly string Subj; + public readonly byte[]? Hdr; + public readonly byte[]? Data; + public readonly ulong Seq; + public readonly long Ts; // Unix nanoseconds + + public Msg(string subj, byte[]? hdr, byte[]? data, ulong seq, long ts) + { + Subj = subj; + Hdr = hdr; + Data = data; + Seq = seq; + Ts = ts; + } + } + + // ------------------------------------------------------------------------- + // Per-subject tracking + // ------------------------------------------------------------------------- + + private sealed class SubjectState + { + public ulong Msgs; + public ulong First; + public ulong Last; + } + + // ------------------------------------------------------------------------- + // State + // ------------------------------------------------------------------------- + + private readonly object _gate = new(); + + // Go: memStore.msgs + private Dictionary _msgs = new(); + + // Go: memStore.fss — per-subject sequence tracking + private readonly Dictionary _fss = new(StringComparer.Ordinal); + + // Go: memStore.state — overall stream state + private StreamStateMutable _st; + + // Go: memStore.dmap — deleted-sequence set (interior gaps) + private readonly SortedSet _dmap = []; + + // Go: memStore.maxp — max messages per subject (0 = unlimited) + private int _maxp; + + // TTL support: seqno → expiry ticks (DateTime.UtcNow.Ticks) + private SortedDictionary? _ttls; + private Timer? _ttlTimer; + + // Config copy for UpdateConfig + private StreamConfig? _cfg; + + // ------------------------------------------------------------------------- + // Mutable state helper (avoids re-boxing ulong fields in record structs) + // ------------------------------------------------------------------------- + + private struct StreamStateMutable + { + public ulong Msgs; + public ulong Bytes; + public ulong FirstSeq; + public DateTime FirstTime; + public ulong LastSeq; + public DateTime LastTime; + } + + // ------------------------------------------------------------------------- + // Constructor + // ------------------------------------------------------------------------- + + public MemStore() { } + + public MemStore(StreamConfig cfg) + { + _cfg = cfg; + _maxp = cfg.MaxMsgsPer; + if (cfg.AllowMsgTtl) + _ttls = new SortedDictionary(); + // Go: newMemStore — handle FirstSeq by pre-advancing the sequence counter + if (cfg.FirstSeq > 0) + { + _st.LastSeq = cfg.FirstSeq - 1; + _st.FirstSeq = cfg.FirstSeq; + } + } + + // ------------------------------------------------------------------------- + // Async helpers (used by existing JetStream layer) + // ------------------------------------------------------------------------- + + // Go: memStore.StoreMsg — async wrapper public ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct) { lock (_gate) { - _last++; - _messages[_last] = new StoredMessage - { - Sequence = _last, - Subject = subject, - Payload = payload, - TimestampUtc = DateTime.UtcNow, - }; - return ValueTask.FromResult(_last); + var seq = _st.LastSeq + 1; + var ts = DateTime.UtcNow.Ticks * 100L; // nanos + StoreInternal(subject, null, payload.IsEmpty ? null : payload.ToArray(), seq, ts, 0); + return ValueTask.FromResult(seq); } } @@ -37,8 +142,15 @@ public sealed class MemStore : IStreamStore { lock (_gate) { - _messages.TryGetValue(sequence, out var msg); - return ValueTask.FromResult(msg); + if (!_msgs.TryGetValue(sequence, out var m)) + return ValueTask.FromResult(null); + return ValueTask.FromResult(new StoredMessage + { + Sequence = m.Seq, + Subject = m.Subj, + Payload = m.Data ?? [], + TimestampUtc = new DateTime(m.Ts / 100L, DateTimeKind.Utc), + }); } } @@ -46,11 +158,17 @@ public sealed class MemStore : IStreamStore { lock (_gate) { - var match = _messages.Values - .Where(m => string.Equals(m.Subject, subject, StringComparison.Ordinal)) - .OrderByDescending(m => m.Sequence) - .FirstOrDefault(); - return ValueTask.FromResult(match); + if (!_fss.TryGetValue(subject, out var ss) || ss.Msgs == 0) + return ValueTask.FromResult(null); + if (!_msgs.TryGetValue(ss.Last, out var m)) + return ValueTask.FromResult(null); + return ValueTask.FromResult(new StoredMessage + { + Sequence = m.Seq, + Subject = m.Subj, + Payload = m.Data ?? [], + TimestampUtc = new DateTime(m.Ts / 100L, DateTimeKind.Utc), + }); } } @@ -58,10 +176,17 @@ public sealed class MemStore : IStreamStore { lock (_gate) { - var messages = _messages.Values - .OrderBy(m => m.Sequence) + var list = _msgs.Values + .OrderBy(m => m.Seq) + .Select(m => new StoredMessage + { + Sequence = m.Seq, + Subject = m.Subj, + Payload = m.Data ?? [], + TimestampUtc = new DateTime(m.Ts / 100L, DateTimeKind.Utc), + }) .ToArray(); - return ValueTask.FromResult>(messages); + return ValueTask.FromResult>(list); } } @@ -69,7 +194,7 @@ public sealed class MemStore : IStreamStore { lock (_gate) { - return ValueTask.FromResult(_messages.Remove(sequence)); + return ValueTask.FromResult(RemoveInternal(sequence)); } } @@ -77,8 +202,7 @@ public sealed class MemStore : IStreamStore { lock (_gate) { - _messages.Clear(); - _last = 0; + PurgeInternal(); return ValueTask.CompletedTask; } } @@ -87,15 +211,15 @@ public sealed class MemStore : IStreamStore { lock (_gate) { - var snapshot = _messages + var snapshot = _msgs .Values - .OrderBy(x => x.Sequence) + .OrderBy(x => x.Seq) .Select(x => new SnapshotRecord { - Sequence = x.Sequence, - Subject = x.Subject, - PayloadBase64 = Convert.ToBase64String(x.Payload.ToArray()), - TimestampUtc = x.TimestampUtc, + Sequence = x.Seq, + Subject = x.Subj, + PayloadBase64 = Convert.ToBase64String(x.Data ?? []), + TimestampUtc = new DateTime(x.Ts / 100L, DateTimeKind.Utc), }) .ToArray(); return ValueTask.FromResult(JsonSerializer.SerializeToUtf8Bytes(snapshot)); @@ -106,8 +230,13 @@ public sealed class MemStore : IStreamStore { lock (_gate) { - _messages.Clear(); - _last = 0; + // Full reset for restore — not PurgeInternal (which preserves sequence continuity). + // Restored records may have gaps, so we advance LastSeq per record. + _msgs = new Dictionary(); + _fss.Clear(); + _dmap.Clear(); + _ttls?.Clear(); + _st = default; if (!snapshot.IsEmpty) { @@ -116,14 +245,13 @@ public sealed class MemStore : IStreamStore { foreach (var record in records) { - _messages[record.Sequence] = new StoredMessage - { - Sequence = record.Sequence, - Subject = record.Subject, - Payload = Convert.FromBase64String(record.PayloadBase64), - TimestampUtc = record.TimestampUtc, - }; - _last = Math.Max(_last, record.Sequence); + var data = record.PayloadBase64.Length > 0 + ? Convert.FromBase64String(record.PayloadBase64) + : null; + var ts = new DateTimeOffset(record.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; + // Advance LastSeq so StoreInternal accepts this sequence (handles gaps) + _st.LastSeq = record.Sequence - 1; + StoreInternal(record.Subject, null, data, record.Sequence, ts, 0); } } } @@ -138,23 +266,985 @@ public sealed class MemStore : IStreamStore { return ValueTask.FromResult(new ApiStreamState { - Messages = (ulong)_messages.Count, - FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(), - LastSeq = _last, - Bytes = (ulong)_messages.Values.Sum(m => m.Payload.Length), + Messages = _st.Msgs, + FirstSeq = _st.Msgs == 0 ? 0UL : _st.FirstSeq, + LastSeq = _st.LastSeq, + Bytes = _st.Bytes, }); } } + // ------------------------------------------------------------------------- + // Go-parity sync interface (IStreamStore explicit implementations) + // ------------------------------------------------------------------------- + + // Go: memStore.StoreMsg server/memstore.go:350 + (ulong Seq, long Ts) IStreamStore.StoreMsg(string subject, byte[]? hdr, byte[] msg, long ttl) + { + lock (_gate) + { + var seq = _st.LastSeq + 1; + // Go uses time.Now().UnixNano() — use Ticks*100 for ~100ns precision + var ts = DateTime.UtcNow.Ticks * 100L; + StoreInternal(subject, hdr, msg.Length == 0 ? null : msg, seq, ts, ttl); + return (seq, ts); + } + } + + // Go: memStore.StoreRawMsg server/memstore.go:329 + void IStreamStore.StoreRawMsg(string subject, byte[]? hdr, byte[] msg, ulong seq, long ts, long ttl, bool discardNewCheck) + { + lock (_gate) + { + StoreInternal(subject, hdr, msg.Length == 0 ? null : msg, seq, ts, ttl); + } + } + + // Go: memStore.SkipMsg server/memstore.go:368 + ulong IStreamStore.SkipMsg(ulong seq) + { + lock (_gate) + { + if (seq != 0 && seq != _st.LastSeq + 1) + throw new InvalidOperationException($"Sequence mismatch: expected {_st.LastSeq + 1}, got {seq}."); + if (seq == 0) + seq = _st.LastSeq + 1; + _st.LastSeq = seq; + _st.LastTime = DateTime.UtcNow; + if (_st.Msgs == 0) + { + _st.FirstSeq = seq + 1; + _st.FirstTime = default; + } + else + { + _dmap.Add(seq); + } + return seq; + } + } + + // Go: memStore.SkipMsgs server/memstore.go:395 + void IStreamStore.SkipMsgs(ulong seq, ulong num) + { + lock (_gate) + { + if (seq != 0 && seq != _st.LastSeq + 1) + throw new InvalidOperationException($"Sequence mismatch: expected {_st.LastSeq + 1}, got {seq}."); + if (seq == 0) + seq = _st.LastSeq + 1; + var lseq = seq + num - 1; + _st.LastSeq = lseq; + _st.LastTime = DateTime.UtcNow; + if (_st.Msgs == 0) + { + _st.FirstSeq = lseq + 1; + _st.FirstTime = default; + } + else + { + for (var s = seq; s <= lseq; s++) + _dmap.Add(s); + } + } + } + + // Go: memStore.FlushAllPending server/memstore.go:423 — no-op for in-memory store + void IStreamStore.FlushAllPending() { } + + // Go: memStore.LoadMsg server/memstore.go:1692 + StoreMsg IStreamStore.LoadMsg(ulong seq, StoreMsg? sm) + { + lock (_gate) + { + if (!_msgs.TryGetValue(seq, out var m)) + throw new KeyNotFoundException($"Message sequence {seq} not found."); + return FillStoreMsg(sm ?? new StoreMsg(), m); + } + } + + // Go: memStore.LoadNextMsg server/memstore.go:1798 + (StoreMsg Msg, ulong Skip) IStreamStore.LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? sm) + { + lock (_gate) + { + if (start < _st.FirstSeq) + start = _st.FirstSeq; + + for (var seq = start; seq <= _st.LastSeq; seq++) + { + if (!_msgs.TryGetValue(seq, out var m)) + continue; + if (SubjectMatchesFilter(m.Subj, filter)) + { + var skip = seq > start ? seq - start : 0UL; + return (FillStoreMsg(sm ?? new StoreMsg(), m), skip); + } + } + throw new KeyNotFoundException($"No message found at or after seq {start} matching filter '{filter}'."); + } + } + + // Go: memStore.LoadLastMsg server/memstore.go:1724 + StoreMsg IStreamStore.LoadLastMsg(string subject, StoreMsg? sm) + { + lock (_gate) + { + Msg? found = null; + + if (string.IsNullOrEmpty(subject) || subject == ">") + { + // Last message in the stream + _msgs.TryGetValue(_st.LastSeq, out found); + if (found == null) + { + // LastSeq might be deleted — walk backwards + for (var s = _st.LastSeq; s >= _st.FirstSeq; s--) + { + if (_msgs.TryGetValue(s, out found)) break; + } + } + } + else if (SubjectMatch.IsLiteral(subject)) + { + if (_fss.TryGetValue(subject, out var ss) && ss.Msgs > 0) + _msgs.TryGetValue(ss.Last, out found); + } + else + { + // Wildcard: scan for the highest-seq matching message + ulong bestSeq = 0; + foreach (var kv in _msgs) + { + if (kv.Key > bestSeq && SubjectMatchesFilter(kv.Value.Subj, subject)) + { + bestSeq = kv.Key; + found = kv.Value; + } + } + } + + if (found == null) + throw new KeyNotFoundException($"No message found for subject '{subject}'."); + return FillStoreMsg(sm ?? new StoreMsg(), found); + } + } + + // Go: memStore.LoadPrevMsg — walk backwards from start + StoreMsg IStreamStore.LoadPrevMsg(ulong start, StoreMsg? sm) + { + lock (_gate) + { + for (var seq = start - 1; seq >= _st.FirstSeq && seq <= _st.LastSeq; seq--) + { + if (_msgs.TryGetValue(seq, out var m)) + return FillStoreMsg(sm ?? new StoreMsg(), m); + if (seq == 0) break; + } + throw new KeyNotFoundException($"No message found before seq {start}."); + } + } + + // Go: memStore.RemoveMsg — soft delete + bool IStreamStore.RemoveMsg(ulong seq) + { + lock (_gate) + { + return RemoveInternal(seq); + } + } + + // Go: memStore.EraseMsg — overwrite then remove + bool IStreamStore.EraseMsg(ulong seq) + { + lock (_gate) + { + return RemoveInternal(seq); + } + } + + // Go: memStore.Purge server/memstore.go:1471 + ulong IStreamStore.Purge() + { + lock (_gate) + { + return PurgeInternal(); + } + } + + // Go: memStore.PurgeEx server/memstore.go:1422 + ulong IStreamStore.PurgeEx(string subject, ulong seq, ulong keep) + { + if (string.IsNullOrEmpty(subject) || subject == ">") + { + if (keep == 0 && seq == 0) + return ((IStreamStore)this).Purge(); + if (seq > 1) + return CompactInternal(seq); + if (keep > 0) + { + lock (_gate) + { + var msgs = _st.Msgs; + var lseq = _st.LastSeq; + if (keep >= msgs) return 0; + return CompactInternal(lseq - keep + 1); + } + } + return 0; + } + + // Subject-specific purge + var ss = FilteredStateInternal(1, subject); + if (ss.Msgs == 0) return 0; + + var targetCount = ss.Msgs; + if (keep > 0) + { + if (keep >= ss.Msgs) return 0; + targetCount -= keep; + } + + var last = ss.Last; + if (seq > 1) + last = seq - 1; + + lock (_gate) + { + ulong purged = 0; + for (var s = ss.First; s <= last && purged < targetCount; s++) + { + if (_msgs.TryGetValue(s, out var m) && SubjectMatchesFilter(m.Subj, subject)) + { + if (RemoveInternal(s)) + purged++; + } + } + return purged; + } + } + + // Go: memStore.Compact server/memstore.go:1509 + ulong IStreamStore.Compact(ulong seq) => CompactInternal(seq); + + // Go: memStore.Truncate server/memstore.go:1618 + void IStreamStore.Truncate(ulong seq) + { + lock (_gate) + { + if (seq == 0) + { + // Reset to empty + _msgs = new Dictionary(); + _fss.Clear(); + _dmap.Clear(); + _st = default; + return; + } + + for (var i = _st.LastSeq; i > seq; i--) + { + if (_msgs.TryGetValue(i, out var m)) + { + _st.Bytes -= MsgSize(m); + _st.Msgs--; + RemoveSeqPerSubject(m.Subj, i); + _msgs.Remove(i); + } + else + { + _dmap.Remove(i); + } + } + _st.LastSeq = seq; + if (_msgs.TryGetValue(seq, out var last)) + _st.LastTime = new DateTime(last.Ts / 100L, DateTimeKind.Utc); + } + } + + // Go: memStore.GetSeqFromTime server/memstore.go:453 + ulong IStreamStore.GetSeqFromTime(DateTime t) + { + lock (_gate) + { + if (_msgs.Count == 0) + return _st.LastSeq + 1; + + // Go uses time.Time.UnixNano(); match with Ticks*100 (100ns resolution) + var ts = t.ToUniversalTime().Ticks * 100L; + + if (!_msgs.TryGetValue(_st.FirstSeq, out var firstMsg)) + return _st.LastSeq + 1; + if (ts <= firstMsg.Ts) + return _st.FirstSeq; + + // Find actual last msg (LastSeq may be a gap — last does not go backwards) + Msg? lastMsg = null; + for (var s = _st.LastSeq; s > _st.FirstSeq; s--) + { + if (_msgs.TryGetValue(s, out lastMsg)) break; + } + if (lastMsg == null) + return _st.LastSeq + 1; + + // Go: if ts == last return lmsg.seq; if ts > last return LastSeq+1 + if (ts == lastMsg.Ts) + return lastMsg.Seq; + if (ts > lastMsg.Ts) + return _st.LastSeq + 1; + + // Binary search with gap awareness + var fseq = _st.FirstSeq; + var lseq = _st.LastSeq; + var result = lseq + 1; + + while (fseq <= lseq) + { + var mid = fseq + (lseq - fseq) / 2; + // Walk forward to find a real message at/after mid + var off = 0UL; + Msg? cand = null; + while (mid + off <= lseq) + { + if (_msgs.TryGetValue(mid + off, out cand)) + break; + off++; + } + if (cand == null) + { + lseq = mid - 1; + continue; + } + if (cand.Ts >= ts) + { + result = cand.Seq; + if (mid == fseq) break; + lseq = mid - 1; + } + else + { + fseq = mid + off + 1; + } + } + return result; + } + } + + // Go: memStore.FilteredState server/memstore.go:531 + SimpleState IStreamStore.FilteredState(ulong seq, string subject) + => FilteredStateInternal(seq, subject); + + // Go: memStore.SubjectsState server/memstore.go:748 + Dictionary IStreamStore.SubjectsState(string filterSubject) + { + lock (_gate) + { + var result = new Dictionary(StringComparer.Ordinal); + if (_fss.Count == 0) return result; + + foreach (var kv in _fss) + { + if (!SubjectMatchesFilter(kv.Key, filterSubject)) continue; + result[kv.Key] = new SimpleState + { + Msgs = kv.Value.Msgs, + First = kv.Value.First, + Last = kv.Value.Last, + }; + } + return result; + } + } + + // Go: memStore.SubjectsTotals server/memstore.go:881 + Dictionary IStreamStore.SubjectsTotals(string filterSubject) + { + lock (_gate) + { + var result = new Dictionary(StringComparer.Ordinal); + foreach (var kv in _fss) + { + if (SubjectMatchesFilter(kv.Key, filterSubject)) + result[kv.Key] = kv.Value.Msgs; + } + return result; + } + } + + // Go: memStore.AllLastSeqs server/memstore.go:780 + ulong[] IStreamStore.AllLastSeqs() + { + lock (_gate) + { + if (_msgs.Count == 0) return []; + var seqs = _fss.Values.Select(ss => ss.Last).ToArray(); + Array.Sort(seqs); + return seqs; + } + } + + // Go: memStore.MultiLastSeqs server/memstore.go:828 + ulong[] IStreamStore.MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed) + { + lock (_gate) + { + if (_msgs.Count == 0) return []; + + var effective = maxSeq == 0 ? _st.LastSeq : maxSeq; + var seen = new HashSet(); + + foreach (var filter in filters) + { + foreach (var kv in _fss) + { + if (!SubjectMatchesFilter(kv.Key, filter)) continue; + var ss = kv.Value; + if (ss.Last <= effective) + { + seen.Add(ss.Last); + } + else if (ss.Msgs > 1) + { + // Walk backwards to find the last seq <= maxSeq + for (var s = effective; s >= _st.FirstSeq; s--) + { + if (_msgs.TryGetValue(s, out var m) && m.Subj == kv.Key) + { + seen.Add(s); + break; + } + if (s == 0) break; + } + } + } + if (maxAllowed > 0 && seen.Count > maxAllowed) + throw new InvalidOperationException($"Too many results: got {seen.Count}, max allowed is {maxAllowed}."); + } + + var result = seen.ToArray(); + Array.Sort(result); + return result; + } + } + + // Go: memStore.SubjectForSeq server/memstore.go:1678 + string IStreamStore.SubjectForSeq(ulong seq) + { + lock (_gate) + { + if (seq < _st.FirstSeq || !_msgs.TryGetValue(seq, out var m)) + throw new KeyNotFoundException($"Message sequence {seq} not found."); + return m.Subj; + } + } + + // Go: memStore.NumPending server/memstore.go:913 + (ulong Total, ulong ValidThrough) IStreamStore.NumPending(ulong sseq, string filter, bool lastPerSubject) + { + lock (_gate) + { + var ss = FilteredStateLocked(sseq, filter, lastPerSubject); + return (ss.Msgs, _st.LastSeq); + } + } + + // Go: memStore.State server/memstore.go — full state + StorageStreamState IStreamStore.State() + { + lock (_gate) + { + var state = new StorageStreamState + { + Msgs = _st.Msgs, + Bytes = _st.Bytes, + FirstSeq = _st.FirstSeq, + FirstTime = _st.FirstTime, + LastSeq = _st.LastSeq, + LastTime = _st.LastTime, + NumSubjects = _fss.Count, + NumDeleted = _dmap.Count, + }; + + if (_dmap.Count > 0) + state.Deleted = _dmap.ToArray(); + + return state; + } + } + + // Go: memStore.FastState server/memstore.go — populate without deleted list + void IStreamStore.FastState(ref StorageStreamState state) + { + lock (_gate) + { + state.Msgs = _st.Msgs; + state.Bytes = _st.Bytes; + state.FirstSeq = _st.FirstSeq; + state.FirstTime = _st.FirstTime; + state.LastSeq = _st.LastSeq; + state.LastTime = _st.LastTime; + state.NumSubjects = _fss.Count; + state.NumDeleted = _dmap.Count; + } + } + + // Go: memStore.Type + StorageType IStreamStore.Type() => StorageType.Memory; + + // Go: memStore.UpdateConfig server/memstore.go:86 + void IStreamStore.UpdateConfig(StreamConfig cfg) + { + lock (_gate) + { + var prevMaxp = _maxp; + _cfg = cfg; + _maxp = cfg.MaxMsgsPer; + + // Create or destroy TTL wheel + if (cfg.AllowMsgTtl && _ttls == null) + _ttls = new SortedDictionary(); + else if (!cfg.AllowMsgTtl && _ttls != null) + { + _ttls = null; + _ttlTimer?.Dispose(); + _ttlTimer = null; + } + + // Enforce per-subject limit if it shrank + if (_maxp > 0) + { + foreach (var kv in _fss) + EnforcePerSubjectLimit(kv.Key, kv.Value); + } + } + } + + // Go: memStore.Stop — no-op for in-memory store + void IStreamStore.Stop() { } + + // Go: memStore.Delete — clear everything + void IStreamStore.Delete(bool inline) + { + lock (_gate) + { + _msgs = new Dictionary(); + _fss.Clear(); + _dmap.Clear(); + _st = default; + _ttls = null; + _ttlTimer?.Dispose(); + _ttlTimer = null; + } + } + + // Go: memStore.ResetState + void IStreamStore.ResetState() { } + + // EncodedStreamState, ConsumerStore — not needed for MemStore tests + byte[] IStreamStore.EncodedStreamState(ulong failed) => []; + + IConsumerStore IStreamStore.ConsumerStore(string name, DateTime created, ConsumerConfig cfg) + => throw new NotSupportedException("MemStore does not implement ConsumerStore."); + + // ------------------------------------------------------------------------- + // TrimToMaxMessages — legacy helper used by existing async tests + // ------------------------------------------------------------------------- + public void TrimToMaxMessages(ulong maxMessages) { lock (_gate) { - while ((ulong)_messages.Count > maxMessages) + while (_st.Msgs > maxMessages) + RemoveInternal(_st.FirstSeq); + } + } + + // ------------------------------------------------------------------------- + // Internal helpers + // ------------------------------------------------------------------------- + + // Go: memStore.storeRawMsg server/memstore.go:195 + private void StoreInternal(string subj, byte[]? hdr, byte[]? data, ulong seq, long ts, long ttl) + { + if (seq != _st.LastSeq + 1) + { + if (seq > 0) + throw new InvalidOperationException($"Sequence mismatch: expected {_st.LastSeq + 1}, got {seq}."); + seq = _st.LastSeq + 1; + } + + var now = new DateTime(ts / 100L, DateTimeKind.Utc); + if (_st.Msgs == 0) + { + _st.FirstSeq = seq; + _st.FirstTime = now; + } + + var m = new Msg(subj, hdr, data, seq, ts); + _msgs[seq] = m; + _st.Msgs++; + _st.Bytes += MsgSize(m); + _st.LastSeq = seq; + _st.LastTime = now; + + // Per-subject tracking + if (!string.IsNullOrEmpty(subj)) + { + if (_fss.TryGetValue(subj, out var ss)) { - var first = _messages.Keys.Min(); - _messages.Remove(first); + ss.Msgs++; + ss.Last = seq; + if (_maxp > 0 && ss.Msgs > (ulong)_maxp) + EnforcePerSubjectLimit(subj, ss); + } + else + { + _fss[subj] = new SubjectState { Msgs = 1, First = seq, Last = seq }; + } + } + + // TTL tracking + if (_ttls != null && ttl > 0) + { + var expires = DateTime.UtcNow.AddSeconds(ttl).Ticks; + _ttls[seq] = expires; + ScheduleTtlCheck(); + } + } + + // Go: memStore.removeMsg + updateFirstSeq server/memstore.go:2110 + 2004 + // Go always inserts seq into dmap first, then updateFirstSeq cleans up dmap entries + // for any sequences <= the new first seq. + private bool RemoveInternal(ulong seq) + { + if (!_msgs.TryGetValue(seq, out var m)) + return false; + + _st.Msgs--; + _st.Bytes -= MsgSize(m); + + // Go: ms.dmap.Insert(seq) — always insert into dmap first + _dmap.Add(seq); + + // Go: ms.updateFirstSeq(seq) — update first seq and clean dmap + UpdateFirstSeqLocked(seq); + + RemoveSeqPerSubject(m.Subj, seq); + _msgs.Remove(seq); + _ttls?.Remove(seq); + return true; + } + + // Go: memStore.updateFirstSeq server/memstore.go:2004 + // Must be called with _gate held. Seq is the sequence that was just removed. + private void UpdateFirstSeqLocked(ulong seq) + { + if (seq != _st.FirstSeq) + { + // Interior delete — nothing to do for first seq + return; + } + + // Walk forward to find the next real message + Msg? nsm = null; + var nseq = _st.FirstSeq + 1; + while (nseq <= _st.LastSeq) + { + if (_msgs.TryGetValue(nseq, out nsm)) + break; + nseq++; + } + + var oldFirst = _st.FirstSeq; + if (nsm != null) + { + _st.FirstSeq = nsm.Seq; + _st.FirstTime = new DateTime(nsm.Ts / 100L, DateTimeKind.Utc); + } + else + { + // Like purge — no more messages + _st.FirstSeq = _st.LastSeq + 1; + _st.FirstTime = default; + } + + // Go: clean up dmap for all sequences from oldFirst to new FirstSeq (exclusive) + // These are no longer "interior" gaps since they're now before the first msg. + if (oldFirst == _st.FirstSeq - 1) + { + _dmap.Remove(oldFirst); + } + else + { + for (var s = oldFirst; s < _st.FirstSeq; s++) + _dmap.Remove(s); + } + } + + // Go: memStore.removeSeqPerSubject — update fss when a sequence is removed + private void RemoveSeqPerSubject(string subj, ulong seq) + { + if (!_fss.TryGetValue(subj, out var ss)) return; + ss.Msgs--; + if (ss.Msgs == 0) + { + _fss.Remove(subj); + return; + } + if (seq == ss.First) + { + // Find the next real first for this subject + for (var s = seq + 1; s <= _st.LastSeq; s++) + { + if (_msgs.TryGetValue(s, out var m) && m.Subj == subj) + { + ss.First = s; + break; + } + } + } + if (seq == ss.Last) + { + // Find the previous real last for this subject + for (var s = seq - 1; s >= ss.First && s <= _st.LastSeq; s--) + { + if (_msgs.TryGetValue(s, out var m) && m.Subj == subj) + { + ss.Last = s; + break; + } + if (s == 0) break; } } } + + // Go: memStore.purge server/memstore.go:1475 + private ulong PurgeInternal() + { + var purged = (ulong)_msgs.Count; + var fseq = _st.LastSeq + 1; + _msgs = new Dictionary(); + _fss.Clear(); + _dmap.Clear(); + _ttls?.Clear(); + _st = new StreamStateMutable + { + FirstSeq = fseq, + LastSeq = fseq - 1, + }; + return purged; + } + + // Go: memStore.compact server/memstore.go:1513 + private ulong CompactInternal(ulong seq) + { + if (seq == 0) + return PurgeInternal(); + + lock (_gate) + { + if (_st.FirstSeq > seq) return 0; + + if (seq <= _st.LastSeq) + { + var fseq = _st.FirstSeq; + // Find the actual new first seq + for (var s = seq; s <= _st.LastSeq; s++) + { + if (_msgs.TryGetValue(s, out var nm)) + { + _st.FirstSeq = s; + _st.FirstTime = new DateTime(nm.Ts / 100L, DateTimeKind.Utc); + break; + } + } + + ulong purged = 0; + for (var s = seq - 1; s >= fseq && s < ulong.MaxValue; s--) + { + if (_msgs.TryGetValue(s, out var m)) + { + _st.Bytes -= MsgSize(m); + purged++; + RemoveSeqPerSubject(m.Subj, s); + _msgs.Remove(s); + } + else + { + _dmap.Remove(s); + } + if (s == 0) break; + } + if (purged > _st.Msgs) purged = _st.Msgs; + _st.Msgs -= purged; + return purged; + } + else + { + // Compact past the end + var purged = (ulong)_msgs.Count; + _msgs = new Dictionary(); + _fss.Clear(); + _dmap.Clear(); + _ttls?.Clear(); + _st = new StreamStateMutable + { + FirstSeq = seq, + LastSeq = seq - 1, + }; + return purged; + } + } + } + + // Go: memStore.filteredStateLocked server/memstore.go:540 + private SimpleState FilteredStateInternal(ulong sseq, string subject) + { + lock (_gate) + { + return FilteredStateLocked(sseq, subject, false); + } + } + + // Called with _gate held + private SimpleState FilteredStateLocked(ulong sseq, string filter, bool lastPerSubject) + { + if (sseq < _st.FirstSeq) sseq = _st.FirstSeq; + if (sseq > _st.LastSeq) return default; + + var isAll = string.IsNullOrEmpty(filter) || filter == ">"; + if (isAll && sseq <= _st.FirstSeq) + { + var total = lastPerSubject ? (ulong)_fss.Count : _st.Msgs; + return new SimpleState { Msgs = total, First = _st.FirstSeq, Last = _st.LastSeq }; + } + + // Build result by scanning messages + ulong msgs = 0, first = 0, last = 0; + var seen = lastPerSubject ? new HashSet(StringComparer.Ordinal) : null; + + for (var seq = sseq; seq <= _st.LastSeq; seq++) + { + if (!_msgs.TryGetValue(seq, out var m)) continue; + if (!isAll && !SubjectMatchesFilter(m.Subj, filter)) continue; + if (seen != null) + { + if (!seen.Add(m.Subj)) continue; + } + msgs++; + if (first == 0) first = seq; + last = seq; + } + + return new SimpleState { Msgs = msgs, First = first, Last = last }; + } + + // Go: memStore.enforcePerSubjectLimit server/memstore.go:1072 + private void EnforcePerSubjectLimit(string subj, SubjectState ss) + { + if (_maxp <= 0) return; + while (ss.Msgs > (ulong)_maxp) + { + if (!RemoveInternal(ss.First)) + break; + } + } + + // Go: memStoreMsgSize server/memstore.go + private static ulong MsgSize(Msg m) + { + var size = (ulong)(m.Subj.Length + (m.Hdr?.Length ?? 0) + (m.Data?.Length ?? 0) + 16); + return size; + } + + // Subject filter matching (same as FileStore helper) + private static bool SubjectMatchesFilter(string subject, string filter) + { + if (string.IsNullOrEmpty(filter) || filter == ">") + return true; + if (SubjectMatch.IsLiteral(filter)) + return string.Equals(subject, filter, StringComparison.Ordinal); + return SubjectMatch.MatchLiteral(subject, filter); + } + + // Fill a StoreMsg from an internal Msg + private static StoreMsg FillStoreMsg(StoreMsg sm, Msg m) + { + sm.Clear(); + sm.Subject = m.Subj; + sm.Header = m.Hdr?.Length > 0 ? m.Hdr : null; + sm.Data = m.Data?.Length > 0 ? m.Data : null; + sm.Sequence = m.Seq; + sm.Timestamp = m.Ts; + return sm; + } + + // ------------------------------------------------------------------------- + // TTL expiry + // ------------------------------------------------------------------------- + + private void ScheduleTtlCheck() + { + _ttlTimer?.Dispose(); + _ttlTimer = new Timer(_ => ExpireTtlMessages(), null, TimeSpan.FromSeconds(1), Timeout.InfiniteTimeSpan); + } + + private void ExpireTtlMessages() + { + lock (_gate) + { + if (_ttls == null) return; + var now = DateTime.UtcNow.Ticks; + var toRemove = _ttls.Where(kv => kv.Value <= now).Select(kv => kv.Key).ToList(); + foreach (var seq in toRemove) + RemoveInternal(seq); + + if (_ttls.Count > 0) + ScheduleTtlCheck(); + } + } + + // ------------------------------------------------------------------------- + // Internal method exposed for test access — nextWildcardMatchLocked + // (Go: nextWildcardMatchLocked server/memstore.go:1810) + // ------------------------------------------------------------------------- + + /// + /// Returns the (first, last, found) range for messages whose subject matches + /// at or after . Called with + /// _gate already held. + /// + internal (ulong First, ulong Last, bool Found) NextWildcardMatchLocked(string filter, ulong start) + { + ulong first = _st.LastSeq, last = 0; + var found = false; + + foreach (var kv in _fss) + { + if (!SubjectMatchesFilter(kv.Key, filter)) continue; + var ss = kv.Value; + if (start > ss.Last) continue; + found = true; + var f = Math.Max(start, ss.First); + if (f < first) first = f; + if (ss.Last > last) last = ss.Last; + } + + return found ? (Math.Max(first, start), last, true) : (0UL, 0UL, false); + } + + /// + /// Returns the (first, last, found) range for messages whose subject exactly + /// equals at or after . Called + /// with _gate already held. + /// + internal (ulong First, ulong Last, bool Found) NextLiteralMatchLocked(string filter, ulong start) + { + if (!_fss.TryGetValue(filter, out var ss)) return (0, 0, false); + if (start > ss.Last) return (0, 0, false); + return (Math.Max(start, ss.First), ss.Last, true); + } + + /// Exposes _gate for test-level lock acquisition. + internal object Gate => _gate; } diff --git a/tests/NATS.Server.Tests/JetStream/Storage/MemStoreGoParityTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/MemStoreGoParityTests.cs new file mode 100644 index 0000000..1b82e8e --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Storage/MemStoreGoParityTests.cs @@ -0,0 +1,951 @@ +// Reference: golang/nats-server/server/memstore_test.go +// Tests ported in this file: +// TestMemStoreCompact → Compact_RemovesMessagesBeforeSeq +// TestMemStoreStreamStateDeleted → StreamStateDeleted_TracksDmapCorrectly +// TestMemStoreStreamTruncate → Truncate_RemovesMessagesAfterSeq +// TestMemStoreUpdateMaxMsgsPerSubject → UpdateMaxMsgsPerSubject_EnforcesNewLimit +// TestMemStoreStreamCompactMultiBlockSubjectInfo → Compact_AdjustsSubjectCount +// TestMemStoreSubjectsTotals → SubjectsTotals_MatchesStoredCounts +// TestMemStoreNumPending → NumPending_MatchesFilteredCount +// TestMemStoreMultiLastSeqs → MultiLastSeqs_ReturnsLastPerSubject +// TestMemStoreSubjectForSeq → SubjectForSeq_ReturnsCorrectSubject +// TestMemStoreSubjectDeleteMarkers → SubjectDeleteMarkers_TtlExpiry (skipped: needs pmsgcb) +// TestMemStoreAllLastSeqs → AllLastSeqs_ReturnsLastPerSubjectSorted +// TestMemStoreGetSeqFromTimeWithLastDeleted → GetSeqFromTime_WithLastDeleted +// TestMemStoreSkipMsgs → SkipMsgs_ReservesSequences +// TestMemStoreDeleteBlocks → DeleteBlocks_DmapSizeMatchesNumDeleted +// TestMemStoreMessageTTL → MessageTTL_ExpiresAfterDelay +// TestMemStoreUpdateConfigTTLState → UpdateConfig_TtlStateInitializedAndDestroyed +// TestMemStoreNextWildcardMatch → NextWildcardMatch_BoundsAreCorrect +// TestMemStoreNextLiteralMatch → NextLiteralMatch_BoundsAreCorrect +// TestMemStoreInitialFirstSeq → InitialFirstSeq_StartAtConfiguredSeq +// TestMemStoreStreamTruncateReset → TruncateReset_ClearsEverything +// TestMemStorePurgeExWithSubject → PurgeEx_WithSubject_PurgesAll +// TestMemStorePurgeExWithDeletedMsgs → PurgeEx_WithDeletedMsgs_CorrectFirstSeq +// TestMemStoreDeleteAllFirstSequenceCheck → DeleteAll_FirstSeqIsLastPlusOne +// TestMemStoreNumPendingBug → NumPending_Bug_CorrectCount +// TestMemStorePurgeLeaksDmap → Purge_ClearsDmap +// TestMemStoreMultiLastSeqsMaxAllowed → MultiLastSeqs_MaxAllowed_ThrowsWhenExceeded + +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Storage; + +/// +/// Go MemStore parity tests. Each test mirrors a specific Go test from +/// golang/nats-server/server/memstore_test.go to verify behaviour parity. +/// +public sealed class MemStoreGoParityTests +{ + // Helper: cast to IStreamStore for sync methods + private static IStreamStore Sync(MemStore ms) => ms; + + // ------------------------------------------------------------------------- + // Compact + // ------------------------------------------------------------------------- + + // Go: TestMemStoreCompact server/memstore_test.go:259 + [Fact] + public void Compact_RemovesMessagesBeforeSeq() + { + var ms = new MemStore(); + var s = Sync(ms); + + for (var i = 0; i < 10; i++) + s.StoreMsg("foo", null, "Hello World"u8.ToArray(), 0); + + s.State().Msgs.ShouldBe(10UL); + + var n = s.Compact(6); + n.ShouldBe(5UL); + + var state = s.State(); + state.Msgs.ShouldBe(5UL); + state.FirstSeq.ShouldBe(6UL); + + // Compact past the end resets first seq + n = s.Compact(100); + n.ShouldBe(5UL); + s.State().FirstSeq.ShouldBe(100UL); + } + + // ------------------------------------------------------------------------- + // StreamStateDeleted + // ------------------------------------------------------------------------- + + // Go: TestMemStoreStreamStateDeleted server/memstore_test.go:342 + [Fact] + public void StreamStateDeleted_TracksDmapCorrectly() + { + var ms = new MemStore(); + var s = Sync(ms); + + const ulong toStore = 10; + for (ulong i = 1; i <= toStore; i++) + s.StoreMsg("foo", null, new byte[8], 0); + + s.State().Deleted.ShouldBeNull(); + + // Delete even sequences 2,4,6,8 + var expectedDeleted = new List(); + for (ulong seq = 2; seq < toStore; seq += 2) + { + s.RemoveMsg(seq); + expectedDeleted.Add(seq); + } + + var state = s.State(); + state.Deleted.ShouldNotBeNull(); + state.Deleted!.ShouldBe(expectedDeleted.ToArray()); + + // Delete 1 and 3 to fill first gap — deleted should shift forward + s.RemoveMsg(1); + s.RemoveMsg(3); + expectedDeleted = expectedDeleted.Skip(2).ToList(); // remove 2 and 4 from start + state = s.State(); + state.Deleted!.ShouldBe(expectedDeleted.ToArray()); + state.FirstSeq.ShouldBe(5UL); + + s.Purge(); + s.State().Deleted.ShouldBeNull(); + } + + // ------------------------------------------------------------------------- + // Truncate + // ------------------------------------------------------------------------- + + // Go: TestMemStoreStreamTruncate server/memstore_test.go:385 + [Fact] + public void Truncate_RemovesMessagesAfterSeq() + { + var ms = new MemStore(); + var s = Sync(ms); + + const ulong tseq = 50; + const ulong toStore = 100; + + for (ulong i = 1; i < tseq; i++) + s.StoreMsg("foo", null, "ok"u8.ToArray(), 0); + for (var i = tseq; i <= toStore; i++) + s.StoreMsg("bar", null, "ok"u8.ToArray(), 0); + + s.State().Msgs.ShouldBe(toStore); + + s.Truncate(tseq); + s.State().Msgs.ShouldBe(tseq); + + // Truncate with some interior deletes + s.RemoveMsg(10); + s.RemoveMsg(20); + s.RemoveMsg(30); + s.RemoveMsg(40); + + s.Truncate(25); + var state = s.State(); + // 25 seqs remaining, minus 2 deleted (10, 20) = 23 messages + state.Msgs.ShouldBe(tseq - 2 - (tseq - 25)); + state.NumSubjects.ShouldBe(1); // only "foo" left + state.Deleted!.ShouldBe([10UL, 20UL]); + } + + // ------------------------------------------------------------------------- + // TruncateReset + // ------------------------------------------------------------------------- + + // Go: TestMemStoreStreamTruncateReset server/memstore_test.go:490 + [Fact] + public void TruncateReset_ClearsEverything() + { + var ms = new MemStore(); + var s = Sync(ms); + + for (var i = 0; i < 1000; i++) + s.StoreMsg("foo", null, "Hello World"u8.ToArray(), 0); + + s.Truncate(0); + + var state = s.State(); + state.Msgs.ShouldBe(0UL); + state.Bytes.ShouldBe(0UL); + state.FirstSeq.ShouldBe(0UL); + state.LastSeq.ShouldBe(0UL); + state.NumSubjects.ShouldBe(0); + state.NumDeleted.ShouldBe(0); + + // Can store again after reset + for (var i = 0; i < 1000; i++) + s.StoreMsg("foo", null, "Hello World"u8.ToArray(), 0); + + state = s.State(); + state.Msgs.ShouldBe(1000UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(1000UL); + state.NumSubjects.ShouldBe(1); + state.NumDeleted.ShouldBe(0); + } + + // ------------------------------------------------------------------------- + // UpdateMaxMsgsPerSubject + // ------------------------------------------------------------------------- + + // Go: TestMemStoreUpdateMaxMsgsPerSubject server/memstore_test.go:452 + [Fact] + public void UpdateMaxMsgsPerSubject_EnforcesNewLimit() + { + var cfg = new StreamConfig + { + Name = "TEST", + Storage = StorageType.Memory, + Subjects = ["foo"], + MaxMsgsPer = 10, + }; + var ms = new MemStore(cfg); + var s = Sync(ms); + + // Increase limit — should allow more + cfg.MaxMsgsPer = 50; + s.UpdateConfig(cfg); + + const int numStored = 22; + for (var i = 0; i < numStored; i++) + s.StoreMsg("foo", null, [], 0); + + var ss = s.SubjectsState("foo")["foo"]; + ss.Msgs.ShouldBe((ulong)numStored); + + // Shrink limit — should truncate stored + cfg.MaxMsgsPer = 10; + s.UpdateConfig(cfg); + + ss = s.SubjectsState("foo")["foo"]; + ss.Msgs.ShouldBe(10UL); + } + + // ------------------------------------------------------------------------- + // CompactMultiBlockSubjectInfo + // ------------------------------------------------------------------------- + + // Go: TestMemStoreStreamCompactMultiBlockSubjectInfo server/memstore_test.go:531 + [Fact] + public void Compact_AdjustsSubjectCount() + { + var ms = new MemStore(); + var s = Sync(ms); + + for (var i = 0; i < 1000; i++) + s.StoreMsg($"foo.{i}", null, "Hello World"u8.ToArray(), 0); + + var deleted = s.Compact(501); + deleted.ShouldBe(500UL); + + s.State().NumSubjects.ShouldBe(500); + } + + // ------------------------------------------------------------------------- + // SubjectsTotals + // ------------------------------------------------------------------------- + + // Go: TestMemStoreSubjectsTotals server/memstore_test.go:557 + [Fact] + public void SubjectsTotals_MatchesStoredCounts() + { + var ms = new MemStore(); + var s = Sync(ms); + + var fmap = new Dictionary(); + var bmap = new Dictionary(); + var rng = new Random(42); + + for (var i = 0; i < 10_000; i++) + { + string ft; + Dictionary m; + if (rng.Next(2) == 0) { ft = "foo"; m = fmap; } + else { ft = "bar"; m = bmap; } + var dt = rng.Next(100); + var subj = $"{ft}.{dt}"; + m.TryGetValue(dt, out var c); + m[dt] = c + 1; + s.StoreMsg(subj, null, "Hello World"u8.ToArray(), 0); + } + + // Check individual foo subjects + foreach (var kv in fmap) + { + var subj = $"foo.{kv.Key}"; + var totals = s.SubjectsTotals(subj); + totals[subj].ShouldBe((ulong)kv.Value); + } + + // Check foo.* wildcard + var fooTotals = s.SubjectsTotals("foo.*"); + fooTotals.Count.ShouldBe(fmap.Count); + var fooExpected = (ulong)fmap.Values.Sum(n => n); + fooTotals.Values.Aggregate(0UL, (a, v) => a + v).ShouldBe(fooExpected); + + // Check bar.* wildcard + var barTotals = s.SubjectsTotals("bar.*"); + barTotals.Count.ShouldBe(bmap.Count); + + // Check *.* + var allTotals = s.SubjectsTotals("*.*"); + allTotals.Count.ShouldBe(fmap.Count + bmap.Count); + } + + // ------------------------------------------------------------------------- + // NumPending + // ------------------------------------------------------------------------- + + // Go: TestMemStoreNumPending server/memstore_test.go:637 + [Fact] + public void NumPending_MatchesFilteredCount() + { + var ms = new MemStore(); + var s = Sync(ms); + var tokens = new[] { "foo", "bar", "baz" }; + var rng = new Random(99); + + string GenSubj() => $"{tokens[rng.Next(3)]}.{tokens[rng.Next(3)]}.{tokens[rng.Next(3)]}.{tokens[rng.Next(3)]}"; + + for (var i = 0; i < 5_000; i++) + s.StoreMsg(GenSubj(), null, "Hello World"u8.ToArray(), 0); + + var state = s.State(); + var startSeqs = new ulong[] { 0, 1, 2, 200, 444, 555, 2222, 4000 }; + var checkSubs = new[] { "foo.>", "*.bar.>", "foo.bar.*.baz", "*.foo.bar.*", "foo.foo.bar.baz" }; + + foreach (var filter in checkSubs) + { + foreach (var startSeq in startSeqs) + { + var (total, validThrough) = s.NumPending(startSeq, filter, false); + validThrough.ShouldBe(state.LastSeq); + + // Sanity-check: manually count matching msgs from startSeq + var sseq = startSeq == 0 ? 1 : startSeq; + ulong expected = 0; + for (var seq = sseq; seq <= state.LastSeq; seq++) + { + try + { + var sm = s.LoadMsg(seq, null); + if (SubjectMatchesFilter(sm.Subject, filter)) expected++; + } + catch (KeyNotFoundException) { } + } + total.ShouldBe(expected, $"filter={filter} start={startSeq}"); + } + } + } + + // ------------------------------------------------------------------------- + // MultiLastSeqs + // ------------------------------------------------------------------------- + + // Go: TestMemStoreMultiLastSeqs server/memstore_test.go:923 + [Fact] + public void MultiLastSeqs_ReturnsLastPerSubject() + { + var ms = new MemStore(); + var s = Sync(ms); + var msg = "abc"u8.ToArray(); + + for (var i = 0; i < 33; i++) + { + s.StoreMsg("foo.foo", null, msg, 0); + s.StoreMsg("foo.bar", null, msg, 0); + s.StoreMsg("foo.baz", null, msg, 0); + } + for (var i = 0; i < 33; i++) + { + s.StoreMsg("bar.foo", null, msg, 0); + s.StoreMsg("bar.bar", null, msg, 0); + s.StoreMsg("bar.baz", null, msg, 0); + } + + // Up to seq 3 + s.MultiLastSeqs(["foo.*"], 3, -1).ShouldBe([1UL, 2UL, 3UL]); + // All of foo.* + s.MultiLastSeqs(["foo.*"], 0, -1).ShouldBe([97UL, 98UL, 99UL]); + // All of bar.* + s.MultiLastSeqs(["bar.*"], 0, -1).ShouldBe([196UL, 197UL, 198UL]); + // bar.* at seq <= 99 — nothing + s.MultiLastSeqs(["bar.*"], 99, -1).ShouldBe([]); + + // Explicit subjects + s.MultiLastSeqs(["foo.foo", "foo.bar", "foo.baz"], 3, -1).ShouldBe([1UL, 2UL, 3UL]); + s.MultiLastSeqs(["foo.foo", "foo.bar", "foo.baz"], 0, -1).ShouldBe([97UL, 98UL, 99UL]); + s.MultiLastSeqs(["bar.foo", "bar.bar", "bar.baz"], 0, -1).ShouldBe([196UL, 197UL, 198UL]); + s.MultiLastSeqs(["bar.foo", "bar.bar", "bar.baz"], 99, -1).ShouldBe([]); + + // Single filter + s.MultiLastSeqs(["foo.foo"], 3, -1).ShouldBe([1UL]); + + // De-duplicate overlapping filters + s.MultiLastSeqs(["foo.*", "foo.bar"], 3, -1).ShouldBe([1UL, 2UL, 3UL]); + + // All subjects + s.MultiLastSeqs([">"], 0, -1).ShouldBe([97UL, 98UL, 99UL, 196UL, 197UL, 198UL]); + s.MultiLastSeqs([">"], 99, -1).ShouldBe([97UL, 98UL, 99UL]); + } + + // ------------------------------------------------------------------------- + // MultiLastSeqs — maxAllowed + // ------------------------------------------------------------------------- + + // Go: TestMemStoreMultiLastSeqsMaxAllowed server/memstore_test.go:1010 + [Fact] + public void MultiLastSeqs_MaxAllowed_ThrowsWhenExceeded() + { + var ms = new MemStore(); + var s = Sync(ms); + var msg = "abc"u8.ToArray(); + + for (var i = 1; i <= 100; i++) + s.StoreMsg($"foo.{i}", null, msg, 0); + + Should.Throw(() => s.MultiLastSeqs(["foo.*"], 0, 10)); + } + + // ------------------------------------------------------------------------- + // SubjectForSeq + // ------------------------------------------------------------------------- + + // Go: TestMemStoreSubjectForSeq server/memstore_test.go:1319 + [Fact] + public void SubjectForSeq_ReturnsCorrectSubject() + { + var ms = new MemStore(); + var s = Sync(ms); + + s.StoreMsg("foo.bar", null, [], 0); + + // seq 0 (not found) + Should.Throw(() => s.SubjectForSeq(0)); + + // seq 1 — should be "foo.bar" + s.SubjectForSeq(1).ShouldBe("foo.bar"); + + // seq 2 (not yet stored) + Should.Throw(() => s.SubjectForSeq(2)); + } + + // ------------------------------------------------------------------------- + // AllLastSeqs + // ------------------------------------------------------------------------- + + // Go: TestMemStoreAllLastSeqs server/memstore_test.go:1266 + [Fact] + public void AllLastSeqs_ReturnsLastPerSubjectSorted() + { + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = ["*.*"], + MaxMsgsPer = 50, + Storage = StorageType.Memory, + }; + var ms = new MemStore(cfg); + var s = Sync(ms); + + var subjs = new[] { "foo.foo", "foo.bar", "foo.baz", "bar.foo", "bar.bar", "bar.baz" }; + var msg = "abc"u8.ToArray(); + var rng = new Random(7); + + for (var i = 0; i < 10_000; i++) + s.StoreMsg(subjs[rng.Next(subjs.Length)], null, msg, 0); + + // Compute expected last sequences per subject + var expected = new List(); + foreach (var subj in subjs) + { + try + { + var sm = s.LoadLastMsg(subj, null); + expected.Add(sm.Sequence); + } + catch (KeyNotFoundException) { } + } + expected.Sort(); + + var seqs = s.AllLastSeqs(); + seqs.ShouldBe(expected.ToArray()); + } + + // ------------------------------------------------------------------------- + // GetSeqFromTime with last deleted + // ------------------------------------------------------------------------- + + // Go: TestMemStoreGetSeqFromTimeWithLastDeleted server/memstore_test.go:839 + [Fact] + public void GetSeqFromTime_WithLastDeleted() + { + var ms = new MemStore(); + var s = Sync(ms); + + const int total = 1000; + DateTime midTime = default; + for (var i = 1; i <= total; i++) + { + s.StoreMsg("A", null, "OK"u8.ToArray(), 0); + if (i == total / 2) + { + Thread.Sleep(100); + midTime = DateTime.UtcNow; + } + } + + // Delete last 100 + for (var seq = total - 100; seq <= total; seq++) + s.RemoveMsg((ulong)seq); + + // Should not panic and should return correct value + var found = s.GetSeqFromTime(midTime); + found.ShouldBe(501UL); + } + + // ------------------------------------------------------------------------- + // SkipMsgs + // ------------------------------------------------------------------------- + + // Go: TestMemStoreSkipMsgs server/memstore_test.go:871 + [Fact] + public void SkipMsgs_ReservesSequences() + { + var ms = new MemStore(); + var s = Sync(ms); + + // Wrong starting sequence should fail + Should.Throw(() => s.SkipMsgs(10, 100)); + + // Skip from seq 1 + s.SkipMsgs(1, 100); + var state = s.State(); + state.FirstSeq.ShouldBe(101UL); + state.LastSeq.ShouldBe(100UL); + + // Skip many more + s.SkipMsgs(101, 100_000); + state = s.State(); + state.FirstSeq.ShouldBe(100_101UL); + state.LastSeq.ShouldBe(100_100UL); + + // New store: store a message then skip + var ms2 = new MemStore(); + var s2 = Sync(ms2); + s2.StoreMsg("foo", null, [], 0); + s2.SkipMsgs(2, 10); + + state = s2.State(); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(11UL); + state.Msgs.ShouldBe(1UL); + state.NumDeleted.ShouldBe(10); + state.Deleted.ShouldNotBeNull(); + state.Deleted!.Length.ShouldBe(10); + + // FastState consistency + var fstate = new StreamState(); + s2.FastState(ref fstate); + fstate.FirstSeq.ShouldBe(1UL); + fstate.LastSeq.ShouldBe(11UL); + fstate.Msgs.ShouldBe(1UL); + fstate.NumDeleted.ShouldBe(10); + } + + // ------------------------------------------------------------------------- + // DeleteBlocks + // ------------------------------------------------------------------------- + + // Go: TestMemStoreDeleteBlocks server/memstore_test.go:799 + [Fact] + public void DeleteBlocks_DmapSizeMatchesNumDeleted() + { + var ms = new MemStore(); + var s = Sync(ms); + + const int total = 10_000; + for (var i = 0; i < total; i++) + s.StoreMsg("A", null, "OK"u8.ToArray(), 0); + + // Delete 5000 random sequences + var rng = new Random(13); + var deleteSet = new HashSet(); + while (deleteSet.Count < 5000) + deleteSet.Add(rng.Next(total) + 1); + + foreach (var seq in deleteSet) + s.RemoveMsg((ulong)seq); + + var fstate = new StreamState(); + s.FastState(ref fstate); + + // NumDeleted from FastState must equal interior gap count + var fullState = s.State(); + var dmapSize = fullState.Deleted?.Length ?? 0; + dmapSize.ShouldBe(fstate.NumDeleted); + } + + // ------------------------------------------------------------------------- + // MessageTTL + // ------------------------------------------------------------------------- + + // Go: TestMemStoreMessageTTL server/memstore_test.go:1202 + [Fact] + public void MessageTTL_ExpiresAfterDelay() + { + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = ["test"], + Storage = StorageType.Memory, + AllowMsgTtl = true, + }; + var ms = new MemStore(cfg); + var s = Sync(ms); + + const long ttl = 1; // 1 second + + for (var i = 1; i <= 10; i++) + s.StoreMsg("test", null, [], ttl); + + var ss = new StreamState(); + s.FastState(ref ss); + ss.FirstSeq.ShouldBe(1UL); + ss.LastSeq.ShouldBe(10UL); + ss.Msgs.ShouldBe(10UL); + + // Wait for TTL to expire (> 1 sec + check interval of 1 sec) + Thread.Sleep(2_500); + + s.FastState(ref ss); + ss.FirstSeq.ShouldBe(11UL); + ss.LastSeq.ShouldBe(10UL); + ss.Msgs.ShouldBe(0UL); + } + + // ------------------------------------------------------------------------- + // UpdateConfigTTLState + // ------------------------------------------------------------------------- + + // Go: TestMemStoreUpdateConfigTTLState server/memstore_test.go:1299 + [Fact] + public void UpdateConfig_TtlStateInitializedAndDestroyed() + { + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = [">"], + Storage = StorageType.Memory, + AllowMsgTtl = false, + }; + var ms = new MemStore(cfg); + var s = Sync(ms); + + // TTL disabled — internal TTL wheel should be null (we cannot observe it directly, + // but UpdateConfig must not throw and subsequent behaviour must be correct) + cfg.AllowMsgTtl = true; + s.UpdateConfig(cfg); + + // Store with TTL — should work + s.StoreMsg("test", null, [], 3600); + s.State().Msgs.ShouldBe(1UL); + + // Disable TTL again + cfg.AllowMsgTtl = false; + s.UpdateConfig(cfg); + + // Message stored before disabling should still be present (TTL wheel gone but msg stays) + s.State().Msgs.ShouldBe(1UL); + } + + // ------------------------------------------------------------------------- + // NextWildcardMatch + // ------------------------------------------------------------------------- + + // Go: TestMemStoreNextWildcardMatch server/memstore_test.go:1373 + [Fact] + public void NextWildcardMatch_BoundsAreCorrect() + { + var ms = new MemStore(); + var s = Sync(ms); + + void StoreN(string subj, int n) + { + for (var i = 0; i < n; i++) + s.StoreMsg(subj, null, "msg"u8.ToArray(), 0); + } + + StoreN("foo.bar.a", 1); // seq 1 + StoreN("foo.baz.bar", 10); // seqs 2-11 + StoreN("foo.bar.b", 1); // seq 12 + StoreN("foo.baz.bar", 10); // seqs 13-22 + StoreN("foo.baz.bar.no.match", 10); // seqs 23-32 + + lock (ms.Gate) + { + var (first, last, found) = ms.NextWildcardMatchLocked("foo.bar.*", 0); + found.ShouldBeTrue(); + first.ShouldBe(1UL); + last.ShouldBe(12UL); + + (first, last, found) = ms.NextWildcardMatchLocked("foo.bar.*", 1); + found.ShouldBeTrue(); + first.ShouldBe(1UL); + last.ShouldBe(12UL); + + (first, last, found) = ms.NextWildcardMatchLocked("foo.bar.*", 2); + found.ShouldBeTrue(); + first.ShouldBe(12UL); + last.ShouldBe(12UL); + + (_, _, found) = ms.NextWildcardMatchLocked("foo.bar.*", first + 1); + found.ShouldBeFalse(); + + (first, last, found) = ms.NextWildcardMatchLocked("foo.baz.*", 1); + found.ShouldBeTrue(); + first.ShouldBe(2UL); + last.ShouldBe(22UL); + + (first, last, found) = ms.NextWildcardMatchLocked("foo.nope.*", 1); + found.ShouldBeFalse(); + first.ShouldBe(0UL); + last.ShouldBe(0UL); + + (first, last, found) = ms.NextWildcardMatchLocked("foo.>", 1); + found.ShouldBeTrue(); + first.ShouldBe(1UL); + last.ShouldBe(32UL); + } + } + + // ------------------------------------------------------------------------- + // NextLiteralMatch + // ------------------------------------------------------------------------- + + // Go: TestMemStoreNextLiteralMatch server/memstore_test.go:1454 + [Fact] + public void NextLiteralMatch_BoundsAreCorrect() + { + var ms = new MemStore(); + var s = Sync(ms); + + void StoreN(string subj, int n) + { + for (var i = 0; i < n; i++) + s.StoreMsg(subj, null, "msg"u8.ToArray(), 0); + } + + StoreN("foo.bar.a", 1); // seq 1 + StoreN("foo.baz.bar", 10); // seqs 2-11 + StoreN("foo.bar.b", 1); // seq 12 + StoreN("foo.baz.bar", 10); // seqs 13-22 + StoreN("foo.baz.bar.no.match", 10); // seqs 23-32 + + lock (ms.Gate) + { + var (first, last, found) = ms.NextLiteralMatchLocked("foo.bar.a", 0); + found.ShouldBeTrue(); + first.ShouldBe(1UL); + last.ShouldBe(1UL); + + (_, _, found) = ms.NextLiteralMatchLocked("foo.bar.a", 2); + found.ShouldBeFalse(); + + (first, last, found) = ms.NextLiteralMatchLocked("foo.baz.bar", 1); + found.ShouldBeTrue(); + first.ShouldBe(2UL); + last.ShouldBe(22UL); + + (first, last, found) = ms.NextLiteralMatchLocked("foo.baz.bar", 22); + found.ShouldBeTrue(); + first.ShouldBe(22UL); + last.ShouldBe(22UL); + + (first, last, found) = ms.NextLiteralMatchLocked("foo.baz.bar", 23); + found.ShouldBeFalse(); + first.ShouldBe(0UL); + last.ShouldBe(0UL); + + (_, _, found) = ms.NextLiteralMatchLocked("foo.nope", 1); + found.ShouldBeFalse(); + } + } + + // ------------------------------------------------------------------------- + // InitialFirstSeq + // ------------------------------------------------------------------------- + + // Go: TestMemStoreInitialFirstSeq server/memstore_test.go:765 + [Fact] + public void InitialFirstSeq_StartAtConfiguredSeq() + { + var cfg = new StreamConfig + { + Name = "zzz", + Storage = StorageType.Memory, + FirstSeq = 1000, + }; + var ms = new MemStore(cfg); + var s = Sync(ms); + + var (seq, _) = s.StoreMsg("A", null, "OK"u8.ToArray(), 0); + seq.ShouldBe(1000UL); + + (seq, _) = s.StoreMsg("B", null, "OK"u8.ToArray(), 0); + seq.ShouldBe(1001UL); + + var state = new StreamState(); + s.FastState(ref state); + state.Msgs.ShouldBe(2UL); + state.FirstSeq.ShouldBe(1000UL); + state.LastSeq.ShouldBe(1001UL); + } + + // ------------------------------------------------------------------------- + // PurgeEx with subject + // ------------------------------------------------------------------------- + + // Go: TestMemStorePurgeExWithSubject server/memstore_test.go:437 + [Fact] + public void PurgeEx_WithSubject_PurgesAll() + { + var ms = new MemStore(); + var s = Sync(ms); + + for (var i = 0; i < 100; i++) + s.StoreMsg("foo", null, [], 0); + + s.PurgeEx("foo", 1, 0); + s.State().Msgs.ShouldBe(0UL); + } + + // ------------------------------------------------------------------------- + // PurgeEx with deleted messages + // ------------------------------------------------------------------------- + + // Go: TestMemStorePurgeExWithDeletedMsgs server/memstore_test.go:1031 + [Fact] + public void PurgeEx_WithDeletedMsgs_CorrectFirstSeq() + { + var ms = new MemStore(); + var s = Sync(ms); + var msg = "abc"u8.ToArray(); + + for (var i = 1; i <= 10; i++) + s.StoreMsg("foo", null, msg, 0); + + s.RemoveMsg(2); + s.RemoveMsg(9); // was the bug + + var n = s.PurgeEx("", 9, 0); + n.ShouldBe(7UL); // seqs 1,3,4,5,6,7,8 (not 2 since deleted, not 9 since deleted) + + var state = new StreamState(); + s.FastState(ref state); + state.FirstSeq.ShouldBe(10UL); + state.LastSeq.ShouldBe(10UL); + state.Msgs.ShouldBe(1UL); + } + + // ------------------------------------------------------------------------- + // DeleteAll FirstSequenceCheck + // ------------------------------------------------------------------------- + + // Go: TestMemStoreDeleteAllFirstSequenceCheck server/memstore_test.go:1060 + [Fact] + public void DeleteAll_FirstSeqIsLastPlusOne() + { + var ms = new MemStore(); + var s = Sync(ms); + + for (var i = 1; i <= 10; i++) + s.StoreMsg("foo", null, "abc"u8.ToArray(), 0); + + for (ulong seq = 1; seq <= 10; seq++) + s.RemoveMsg(seq); + + var state = new StreamState(); + s.FastState(ref state); + state.FirstSeq.ShouldBe(11UL); + state.LastSeq.ShouldBe(10UL); + state.Msgs.ShouldBe(0UL); + } + + // ------------------------------------------------------------------------- + // NumPending — bug fix + // ------------------------------------------------------------------------- + + // Go: TestMemStoreNumPendingBug server/memstore_test.go:1137 + [Fact] + public void NumPending_Bug_CorrectCount() + { + var ms = new MemStore(); + var s = Sync(ms); + + foreach (var subj in new[] { "foo.foo", "foo.bar", "foo.baz", "foo.zzz" }) + { + s.StoreMsg("foo.aaa", null, [], 0); + s.StoreMsg(subj, null, [], 0); + s.StoreMsg(subj, null, [], 0); + } + + // 12 msgs total + var (total, _) = s.NumPending(4, "foo.*", false); + + ulong expected = 0; + for (var seq = 4; seq <= 12; seq++) + { + try + { + var sm = s.LoadMsg((ulong)seq, null); + if (SubjectMatchesFilter(sm.Subject, "foo.*")) expected++; + } + catch (KeyNotFoundException) { } + } + total.ShouldBe(expected); + } + + // ------------------------------------------------------------------------- + // Purge clears dmap + // ------------------------------------------------------------------------- + + // Go: TestMemStorePurgeLeaksDmap server/memstore_test.go:1168 + [Fact] + public void Purge_ClearsDmap() + { + var ms = new MemStore(); + var s = Sync(ms); + + for (var i = 0; i < 10; i++) + s.StoreMsg("foo", null, [], 0); + + for (ulong i = 2; i <= 9; i++) + s.RemoveMsg(i); + + // 8 interior gaps now + var state = s.State(); + state.NumDeleted.ShouldBe(8); + + // Purge should also clear dmap + var purged = s.Purge(); + purged.ShouldBe(2UL); // 2 actual msgs remain (1 and 10) + + state = s.State(); + state.NumDeleted.ShouldBe(0); + state.Deleted.ShouldBeNull(); + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + private static bool SubjectMatchesFilter(string subject, string filter) + { + if (string.IsNullOrEmpty(filter) || filter == ">") return true; + if (NATS.Server.Subscriptions.SubjectMatch.IsLiteral(filter)) + return string.Equals(subject, filter, StringComparison.Ordinal); + return NATS.Server.Subscriptions.SubjectMatch.MatchLiteral(subject, filter); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Storage/MemStoreTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/MemStoreTests.cs index 9fdc974..f0951c3 100644 --- a/tests/NATS.Server.Tests/JetStream/Storage/MemStoreTests.cs +++ b/tests/NATS.Server.Tests/JetStream/Storage/MemStoreTests.cs @@ -216,7 +216,9 @@ public sealed class MemStoreTests await store.AppendAsync("foo", payload, default); var state = await store.GetStateAsync(default); - state.Bytes.ShouldBe((ulong)500); + // Go parity: MsgSize = subj.Length + hdr + data + 16 overhead + // "foo"(3) + 100 + 16 = 119 per msg × 5 = 595 + state.Bytes.ShouldBe((ulong)595); } // Go: TestMemStoreBytesLimit server/memstore_test.go @@ -233,7 +235,9 @@ public sealed class MemStoreTests await store.RemoveAsync(3, default); var state = await store.GetStateAsync(default); - state.Bytes.ShouldBe((ulong)300); + // Go parity: MsgSize = subj.Length + hdr + data + 16 overhead + // "foo"(3) + 100 + 16 = 119 per msg × 3 remaining = 357 + state.Bytes.ShouldBe((ulong)357); } // Snapshot and restore.