diff --git a/src/NATS.Server/JetStream/Storage/MemStore.cs b/src/NATS.Server/JetStream/Storage/MemStore.cs
index e9d9b14..eafade9 100644
--- a/src/NATS.Server/JetStream/Storage/MemStore.cs
+++ b/src/NATS.Server/JetStream/Storage/MemStore.cs
@@ -1,10 +1,24 @@
using System.Text.Json;
using NATS.Server.JetStream.Models;
+using NATS.Server.Subscriptions;
+
+// Alias for the full Go-parity StreamState in this namespace.
+using StorageStreamState = NATS.Server.JetStream.Storage.StreamState;
namespace NATS.Server.JetStream.Storage;
+// Go: server/memstore.go
+///
+/// In-memory implementation of . Implements both the
+/// async helper surface used by the current JetStream layer and the full Go-parity
+/// sync interface matching Go's memStore struct exactly.
+///
public sealed class MemStore : IStreamStore
{
+ // -------------------------------------------------------------------------
+ // Snapshot record (used by async serialisation helpers only)
+ // -------------------------------------------------------------------------
+
private sealed class SnapshotRecord
{
public ulong Sequence { get; init; }
@@ -13,23 +27,114 @@ public sealed class MemStore : IStreamStore
public DateTime TimestampUtc { get; init; }
}
- private readonly object _gate = new();
- private ulong _last;
- private readonly Dictionary _messages = new();
+ // -------------------------------------------------------------------------
+ // Internal stored message — tightly packed to avoid per-field allocations.
+ // -------------------------------------------------------------------------
+ private sealed class Msg
+ {
+ public readonly string Subj;
+ public readonly byte[]? Hdr;
+ public readonly byte[]? Data;
+ public readonly ulong Seq;
+ public readonly long Ts; // Unix nanoseconds
+
+ public Msg(string subj, byte[]? hdr, byte[]? data, ulong seq, long ts)
+ {
+ Subj = subj;
+ Hdr = hdr;
+ Data = data;
+ Seq = seq;
+ Ts = ts;
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Per-subject tracking
+ // -------------------------------------------------------------------------
+
+ private sealed class SubjectState
+ {
+ public ulong Msgs;
+ public ulong First;
+ public ulong Last;
+ }
+
+ // -------------------------------------------------------------------------
+ // State
+ // -------------------------------------------------------------------------
+
+ private readonly object _gate = new();
+
+ // Go: memStore.msgs
+ private Dictionary _msgs = new();
+
+ // Go: memStore.fss — per-subject sequence tracking
+ private readonly Dictionary _fss = new(StringComparer.Ordinal);
+
+ // Go: memStore.state — overall stream state
+ private StreamStateMutable _st;
+
+ // Go: memStore.dmap — deleted-sequence set (interior gaps)
+ private readonly SortedSet _dmap = [];
+
+ // Go: memStore.maxp — max messages per subject (0 = unlimited)
+ private int _maxp;
+
+ // TTL support: seqno → expiry ticks (DateTime.UtcNow.Ticks)
+ private SortedDictionary? _ttls;
+ private Timer? _ttlTimer;
+
+ // Config copy for UpdateConfig
+ private StreamConfig? _cfg;
+
+ // -------------------------------------------------------------------------
+ // Mutable state helper (avoids re-boxing ulong fields in record structs)
+ // -------------------------------------------------------------------------
+
+ private struct StreamStateMutable
+ {
+ public ulong Msgs;
+ public ulong Bytes;
+ public ulong FirstSeq;
+ public DateTime FirstTime;
+ public ulong LastSeq;
+ public DateTime LastTime;
+ }
+
+ // -------------------------------------------------------------------------
+ // Constructor
+ // -------------------------------------------------------------------------
+
+ public MemStore() { }
+
+ public MemStore(StreamConfig cfg)
+ {
+ _cfg = cfg;
+ _maxp = cfg.MaxMsgsPer;
+ if (cfg.AllowMsgTtl)
+ _ttls = new SortedDictionary();
+ // Go: newMemStore — handle FirstSeq by pre-advancing the sequence counter
+ if (cfg.FirstSeq > 0)
+ {
+ _st.LastSeq = cfg.FirstSeq - 1;
+ _st.FirstSeq = cfg.FirstSeq;
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Async helpers (used by existing JetStream layer)
+ // -------------------------------------------------------------------------
+
+ // Go: memStore.StoreMsg — async wrapper
public ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct)
{
lock (_gate)
{
- _last++;
- _messages[_last] = new StoredMessage
- {
- Sequence = _last,
- Subject = subject,
- Payload = payload,
- TimestampUtc = DateTime.UtcNow,
- };
- return ValueTask.FromResult(_last);
+ var seq = _st.LastSeq + 1;
+ var ts = DateTime.UtcNow.Ticks * 100L; // nanos
+ StoreInternal(subject, null, payload.IsEmpty ? null : payload.ToArray(), seq, ts, 0);
+ return ValueTask.FromResult(seq);
}
}
@@ -37,8 +142,15 @@ public sealed class MemStore : IStreamStore
{
lock (_gate)
{
- _messages.TryGetValue(sequence, out var msg);
- return ValueTask.FromResult(msg);
+ if (!_msgs.TryGetValue(sequence, out var m))
+ return ValueTask.FromResult(null);
+ return ValueTask.FromResult(new StoredMessage
+ {
+ Sequence = m.Seq,
+ Subject = m.Subj,
+ Payload = m.Data ?? [],
+ TimestampUtc = new DateTime(m.Ts / 100L, DateTimeKind.Utc),
+ });
}
}
@@ -46,11 +158,17 @@ public sealed class MemStore : IStreamStore
{
lock (_gate)
{
- var match = _messages.Values
- .Where(m => string.Equals(m.Subject, subject, StringComparison.Ordinal))
- .OrderByDescending(m => m.Sequence)
- .FirstOrDefault();
- return ValueTask.FromResult(match);
+ if (!_fss.TryGetValue(subject, out var ss) || ss.Msgs == 0)
+ return ValueTask.FromResult(null);
+ if (!_msgs.TryGetValue(ss.Last, out var m))
+ return ValueTask.FromResult(null);
+ return ValueTask.FromResult(new StoredMessage
+ {
+ Sequence = m.Seq,
+ Subject = m.Subj,
+ Payload = m.Data ?? [],
+ TimestampUtc = new DateTime(m.Ts / 100L, DateTimeKind.Utc),
+ });
}
}
@@ -58,10 +176,17 @@ public sealed class MemStore : IStreamStore
{
lock (_gate)
{
- var messages = _messages.Values
- .OrderBy(m => m.Sequence)
+ var list = _msgs.Values
+ .OrderBy(m => m.Seq)
+ .Select(m => new StoredMessage
+ {
+ Sequence = m.Seq,
+ Subject = m.Subj,
+ Payload = m.Data ?? [],
+ TimestampUtc = new DateTime(m.Ts / 100L, DateTimeKind.Utc),
+ })
.ToArray();
- return ValueTask.FromResult>(messages);
+ return ValueTask.FromResult>(list);
}
}
@@ -69,7 +194,7 @@ public sealed class MemStore : IStreamStore
{
lock (_gate)
{
- return ValueTask.FromResult(_messages.Remove(sequence));
+ return ValueTask.FromResult(RemoveInternal(sequence));
}
}
@@ -77,8 +202,7 @@ public sealed class MemStore : IStreamStore
{
lock (_gate)
{
- _messages.Clear();
- _last = 0;
+ PurgeInternal();
return ValueTask.CompletedTask;
}
}
@@ -87,15 +211,15 @@ public sealed class MemStore : IStreamStore
{
lock (_gate)
{
- var snapshot = _messages
+ var snapshot = _msgs
.Values
- .OrderBy(x => x.Sequence)
+ .OrderBy(x => x.Seq)
.Select(x => new SnapshotRecord
{
- Sequence = x.Sequence,
- Subject = x.Subject,
- PayloadBase64 = Convert.ToBase64String(x.Payload.ToArray()),
- TimestampUtc = x.TimestampUtc,
+ Sequence = x.Seq,
+ Subject = x.Subj,
+ PayloadBase64 = Convert.ToBase64String(x.Data ?? []),
+ TimestampUtc = new DateTime(x.Ts / 100L, DateTimeKind.Utc),
})
.ToArray();
return ValueTask.FromResult(JsonSerializer.SerializeToUtf8Bytes(snapshot));
@@ -106,8 +230,13 @@ public sealed class MemStore : IStreamStore
{
lock (_gate)
{
- _messages.Clear();
- _last = 0;
+ // Full reset for restore — not PurgeInternal (which preserves sequence continuity).
+ // Restored records may have gaps, so we advance LastSeq per record.
+ _msgs = new Dictionary();
+ _fss.Clear();
+ _dmap.Clear();
+ _ttls?.Clear();
+ _st = default;
if (!snapshot.IsEmpty)
{
@@ -116,14 +245,13 @@ public sealed class MemStore : IStreamStore
{
foreach (var record in records)
{
- _messages[record.Sequence] = new StoredMessage
- {
- Sequence = record.Sequence,
- Subject = record.Subject,
- Payload = Convert.FromBase64String(record.PayloadBase64),
- TimestampUtc = record.TimestampUtc,
- };
- _last = Math.Max(_last, record.Sequence);
+ var data = record.PayloadBase64.Length > 0
+ ? Convert.FromBase64String(record.PayloadBase64)
+ : null;
+ var ts = new DateTimeOffset(record.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
+ // Advance LastSeq so StoreInternal accepts this sequence (handles gaps)
+ _st.LastSeq = record.Sequence - 1;
+ StoreInternal(record.Subject, null, data, record.Sequence, ts, 0);
}
}
}
@@ -138,23 +266,985 @@ public sealed class MemStore : IStreamStore
{
return ValueTask.FromResult(new ApiStreamState
{
- Messages = (ulong)_messages.Count,
- FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(),
- LastSeq = _last,
- Bytes = (ulong)_messages.Values.Sum(m => m.Payload.Length),
+ Messages = _st.Msgs,
+ FirstSeq = _st.Msgs == 0 ? 0UL : _st.FirstSeq,
+ LastSeq = _st.LastSeq,
+ Bytes = _st.Bytes,
});
}
}
+ // -------------------------------------------------------------------------
+ // Go-parity sync interface (IStreamStore explicit implementations)
+ // -------------------------------------------------------------------------
+
+ // Go: memStore.StoreMsg server/memstore.go:350
+ (ulong Seq, long Ts) IStreamStore.StoreMsg(string subject, byte[]? hdr, byte[] msg, long ttl)
+ {
+ lock (_gate)
+ {
+ var seq = _st.LastSeq + 1;
+ // Go uses time.Now().UnixNano() — use Ticks*100 for ~100ns precision
+ var ts = DateTime.UtcNow.Ticks * 100L;
+ StoreInternal(subject, hdr, msg.Length == 0 ? null : msg, seq, ts, ttl);
+ return (seq, ts);
+ }
+ }
+
+ // Go: memStore.StoreRawMsg server/memstore.go:329
+ void IStreamStore.StoreRawMsg(string subject, byte[]? hdr, byte[] msg, ulong seq, long ts, long ttl, bool discardNewCheck)
+ {
+ lock (_gate)
+ {
+ StoreInternal(subject, hdr, msg.Length == 0 ? null : msg, seq, ts, ttl);
+ }
+ }
+
+ // Go: memStore.SkipMsg server/memstore.go:368
+ ulong IStreamStore.SkipMsg(ulong seq)
+ {
+ lock (_gate)
+ {
+ if (seq != 0 && seq != _st.LastSeq + 1)
+ throw new InvalidOperationException($"Sequence mismatch: expected {_st.LastSeq + 1}, got {seq}.");
+ if (seq == 0)
+ seq = _st.LastSeq + 1;
+ _st.LastSeq = seq;
+ _st.LastTime = DateTime.UtcNow;
+ if (_st.Msgs == 0)
+ {
+ _st.FirstSeq = seq + 1;
+ _st.FirstTime = default;
+ }
+ else
+ {
+ _dmap.Add(seq);
+ }
+ return seq;
+ }
+ }
+
+ // Go: memStore.SkipMsgs server/memstore.go:395
+ void IStreamStore.SkipMsgs(ulong seq, ulong num)
+ {
+ lock (_gate)
+ {
+ if (seq != 0 && seq != _st.LastSeq + 1)
+ throw new InvalidOperationException($"Sequence mismatch: expected {_st.LastSeq + 1}, got {seq}.");
+ if (seq == 0)
+ seq = _st.LastSeq + 1;
+ var lseq = seq + num - 1;
+ _st.LastSeq = lseq;
+ _st.LastTime = DateTime.UtcNow;
+ if (_st.Msgs == 0)
+ {
+ _st.FirstSeq = lseq + 1;
+ _st.FirstTime = default;
+ }
+ else
+ {
+ for (var s = seq; s <= lseq; s++)
+ _dmap.Add(s);
+ }
+ }
+ }
+
+ // Go: memStore.FlushAllPending server/memstore.go:423 — no-op for in-memory store
+ void IStreamStore.FlushAllPending() { }
+
+ // Go: memStore.LoadMsg server/memstore.go:1692
+ StoreMsg IStreamStore.LoadMsg(ulong seq, StoreMsg? sm)
+ {
+ lock (_gate)
+ {
+ if (!_msgs.TryGetValue(seq, out var m))
+ throw new KeyNotFoundException($"Message sequence {seq} not found.");
+ return FillStoreMsg(sm ?? new StoreMsg(), m);
+ }
+ }
+
+ // Go: memStore.LoadNextMsg server/memstore.go:1798
+ (StoreMsg Msg, ulong Skip) IStreamStore.LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? sm)
+ {
+ lock (_gate)
+ {
+ if (start < _st.FirstSeq)
+ start = _st.FirstSeq;
+
+ for (var seq = start; seq <= _st.LastSeq; seq++)
+ {
+ if (!_msgs.TryGetValue(seq, out var m))
+ continue;
+ if (SubjectMatchesFilter(m.Subj, filter))
+ {
+ var skip = seq > start ? seq - start : 0UL;
+ return (FillStoreMsg(sm ?? new StoreMsg(), m), skip);
+ }
+ }
+ throw new KeyNotFoundException($"No message found at or after seq {start} matching filter '{filter}'.");
+ }
+ }
+
+ // Go: memStore.LoadLastMsg server/memstore.go:1724
+ StoreMsg IStreamStore.LoadLastMsg(string subject, StoreMsg? sm)
+ {
+ lock (_gate)
+ {
+ Msg? found = null;
+
+ if (string.IsNullOrEmpty(subject) || subject == ">")
+ {
+ // Last message in the stream
+ _msgs.TryGetValue(_st.LastSeq, out found);
+ if (found == null)
+ {
+ // LastSeq might be deleted — walk backwards
+ for (var s = _st.LastSeq; s >= _st.FirstSeq; s--)
+ {
+ if (_msgs.TryGetValue(s, out found)) break;
+ }
+ }
+ }
+ else if (SubjectMatch.IsLiteral(subject))
+ {
+ if (_fss.TryGetValue(subject, out var ss) && ss.Msgs > 0)
+ _msgs.TryGetValue(ss.Last, out found);
+ }
+ else
+ {
+ // Wildcard: scan for the highest-seq matching message
+ ulong bestSeq = 0;
+ foreach (var kv in _msgs)
+ {
+ if (kv.Key > bestSeq && SubjectMatchesFilter(kv.Value.Subj, subject))
+ {
+ bestSeq = kv.Key;
+ found = kv.Value;
+ }
+ }
+ }
+
+ if (found == null)
+ throw new KeyNotFoundException($"No message found for subject '{subject}'.");
+ return FillStoreMsg(sm ?? new StoreMsg(), found);
+ }
+ }
+
+ // Go: memStore.LoadPrevMsg — walk backwards from start
+ StoreMsg IStreamStore.LoadPrevMsg(ulong start, StoreMsg? sm)
+ {
+ lock (_gate)
+ {
+ for (var seq = start - 1; seq >= _st.FirstSeq && seq <= _st.LastSeq; seq--)
+ {
+ if (_msgs.TryGetValue(seq, out var m))
+ return FillStoreMsg(sm ?? new StoreMsg(), m);
+ if (seq == 0) break;
+ }
+ throw new KeyNotFoundException($"No message found before seq {start}.");
+ }
+ }
+
+ // Go: memStore.RemoveMsg — soft delete
+ bool IStreamStore.RemoveMsg(ulong seq)
+ {
+ lock (_gate)
+ {
+ return RemoveInternal(seq);
+ }
+ }
+
+ // Go: memStore.EraseMsg — overwrite then remove
+ bool IStreamStore.EraseMsg(ulong seq)
+ {
+ lock (_gate)
+ {
+ return RemoveInternal(seq);
+ }
+ }
+
+ // Go: memStore.Purge server/memstore.go:1471
+ ulong IStreamStore.Purge()
+ {
+ lock (_gate)
+ {
+ return PurgeInternal();
+ }
+ }
+
+ // Go: memStore.PurgeEx server/memstore.go:1422
+ ulong IStreamStore.PurgeEx(string subject, ulong seq, ulong keep)
+ {
+ if (string.IsNullOrEmpty(subject) || subject == ">")
+ {
+ if (keep == 0 && seq == 0)
+ return ((IStreamStore)this).Purge();
+ if (seq > 1)
+ return CompactInternal(seq);
+ if (keep > 0)
+ {
+ lock (_gate)
+ {
+ var msgs = _st.Msgs;
+ var lseq = _st.LastSeq;
+ if (keep >= msgs) return 0;
+ return CompactInternal(lseq - keep + 1);
+ }
+ }
+ return 0;
+ }
+
+ // Subject-specific purge
+ var ss = FilteredStateInternal(1, subject);
+ if (ss.Msgs == 0) return 0;
+
+ var targetCount = ss.Msgs;
+ if (keep > 0)
+ {
+ if (keep >= ss.Msgs) return 0;
+ targetCount -= keep;
+ }
+
+ var last = ss.Last;
+ if (seq > 1)
+ last = seq - 1;
+
+ lock (_gate)
+ {
+ ulong purged = 0;
+ for (var s = ss.First; s <= last && purged < targetCount; s++)
+ {
+ if (_msgs.TryGetValue(s, out var m) && SubjectMatchesFilter(m.Subj, subject))
+ {
+ if (RemoveInternal(s))
+ purged++;
+ }
+ }
+ return purged;
+ }
+ }
+
+ // Go: memStore.Compact server/memstore.go:1509
+ ulong IStreamStore.Compact(ulong seq) => CompactInternal(seq);
+
+ // Go: memStore.Truncate server/memstore.go:1618
+ void IStreamStore.Truncate(ulong seq)
+ {
+ lock (_gate)
+ {
+ if (seq == 0)
+ {
+ // Reset to empty
+ _msgs = new Dictionary();
+ _fss.Clear();
+ _dmap.Clear();
+ _st = default;
+ return;
+ }
+
+ for (var i = _st.LastSeq; i > seq; i--)
+ {
+ if (_msgs.TryGetValue(i, out var m))
+ {
+ _st.Bytes -= MsgSize(m);
+ _st.Msgs--;
+ RemoveSeqPerSubject(m.Subj, i);
+ _msgs.Remove(i);
+ }
+ else
+ {
+ _dmap.Remove(i);
+ }
+ }
+ _st.LastSeq = seq;
+ if (_msgs.TryGetValue(seq, out var last))
+ _st.LastTime = new DateTime(last.Ts / 100L, DateTimeKind.Utc);
+ }
+ }
+
+ // Go: memStore.GetSeqFromTime server/memstore.go:453
+ ulong IStreamStore.GetSeqFromTime(DateTime t)
+ {
+ lock (_gate)
+ {
+ if (_msgs.Count == 0)
+ return _st.LastSeq + 1;
+
+ // Go uses time.Time.UnixNano(); match with Ticks*100 (100ns resolution)
+ var ts = t.ToUniversalTime().Ticks * 100L;
+
+ if (!_msgs.TryGetValue(_st.FirstSeq, out var firstMsg))
+ return _st.LastSeq + 1;
+ if (ts <= firstMsg.Ts)
+ return _st.FirstSeq;
+
+ // Find actual last msg (LastSeq may be a gap — last does not go backwards)
+ Msg? lastMsg = null;
+ for (var s = _st.LastSeq; s > _st.FirstSeq; s--)
+ {
+ if (_msgs.TryGetValue(s, out lastMsg)) break;
+ }
+ if (lastMsg == null)
+ return _st.LastSeq + 1;
+
+ // Go: if ts == last return lmsg.seq; if ts > last return LastSeq+1
+ if (ts == lastMsg.Ts)
+ return lastMsg.Seq;
+ if (ts > lastMsg.Ts)
+ return _st.LastSeq + 1;
+
+ // Binary search with gap awareness
+ var fseq = _st.FirstSeq;
+ var lseq = _st.LastSeq;
+ var result = lseq + 1;
+
+ while (fseq <= lseq)
+ {
+ var mid = fseq + (lseq - fseq) / 2;
+ // Walk forward to find a real message at/after mid
+ var off = 0UL;
+ Msg? cand = null;
+ while (mid + off <= lseq)
+ {
+ if (_msgs.TryGetValue(mid + off, out cand))
+ break;
+ off++;
+ }
+ if (cand == null)
+ {
+ lseq = mid - 1;
+ continue;
+ }
+ if (cand.Ts >= ts)
+ {
+ result = cand.Seq;
+ if (mid == fseq) break;
+ lseq = mid - 1;
+ }
+ else
+ {
+ fseq = mid + off + 1;
+ }
+ }
+ return result;
+ }
+ }
+
+ // Go: memStore.FilteredState server/memstore.go:531
+ SimpleState IStreamStore.FilteredState(ulong seq, string subject)
+ => FilteredStateInternal(seq, subject);
+
+ // Go: memStore.SubjectsState server/memstore.go:748
+ Dictionary IStreamStore.SubjectsState(string filterSubject)
+ {
+ lock (_gate)
+ {
+ var result = new Dictionary(StringComparer.Ordinal);
+ if (_fss.Count == 0) return result;
+
+ foreach (var kv in _fss)
+ {
+ if (!SubjectMatchesFilter(kv.Key, filterSubject)) continue;
+ result[kv.Key] = new SimpleState
+ {
+ Msgs = kv.Value.Msgs,
+ First = kv.Value.First,
+ Last = kv.Value.Last,
+ };
+ }
+ return result;
+ }
+ }
+
+ // Go: memStore.SubjectsTotals server/memstore.go:881
+ Dictionary IStreamStore.SubjectsTotals(string filterSubject)
+ {
+ lock (_gate)
+ {
+ var result = new Dictionary(StringComparer.Ordinal);
+ foreach (var kv in _fss)
+ {
+ if (SubjectMatchesFilter(kv.Key, filterSubject))
+ result[kv.Key] = kv.Value.Msgs;
+ }
+ return result;
+ }
+ }
+
+ // Go: memStore.AllLastSeqs server/memstore.go:780
+ ulong[] IStreamStore.AllLastSeqs()
+ {
+ lock (_gate)
+ {
+ if (_msgs.Count == 0) return [];
+ var seqs = _fss.Values.Select(ss => ss.Last).ToArray();
+ Array.Sort(seqs);
+ return seqs;
+ }
+ }
+
+ // Go: memStore.MultiLastSeqs server/memstore.go:828
+ ulong[] IStreamStore.MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed)
+ {
+ lock (_gate)
+ {
+ if (_msgs.Count == 0) return [];
+
+ var effective = maxSeq == 0 ? _st.LastSeq : maxSeq;
+ var seen = new HashSet();
+
+ foreach (var filter in filters)
+ {
+ foreach (var kv in _fss)
+ {
+ if (!SubjectMatchesFilter(kv.Key, filter)) continue;
+ var ss = kv.Value;
+ if (ss.Last <= effective)
+ {
+ seen.Add(ss.Last);
+ }
+ else if (ss.Msgs > 1)
+ {
+ // Walk backwards to find the last seq <= maxSeq
+ for (var s = effective; s >= _st.FirstSeq; s--)
+ {
+ if (_msgs.TryGetValue(s, out var m) && m.Subj == kv.Key)
+ {
+ seen.Add(s);
+ break;
+ }
+ if (s == 0) break;
+ }
+ }
+ }
+ if (maxAllowed > 0 && seen.Count > maxAllowed)
+ throw new InvalidOperationException($"Too many results: got {seen.Count}, max allowed is {maxAllowed}.");
+ }
+
+ var result = seen.ToArray();
+ Array.Sort(result);
+ return result;
+ }
+ }
+
+ // Go: memStore.SubjectForSeq server/memstore.go:1678
+ string IStreamStore.SubjectForSeq(ulong seq)
+ {
+ lock (_gate)
+ {
+ if (seq < _st.FirstSeq || !_msgs.TryGetValue(seq, out var m))
+ throw new KeyNotFoundException($"Message sequence {seq} not found.");
+ return m.Subj;
+ }
+ }
+
+ // Go: memStore.NumPending server/memstore.go:913
+ (ulong Total, ulong ValidThrough) IStreamStore.NumPending(ulong sseq, string filter, bool lastPerSubject)
+ {
+ lock (_gate)
+ {
+ var ss = FilteredStateLocked(sseq, filter, lastPerSubject);
+ return (ss.Msgs, _st.LastSeq);
+ }
+ }
+
+ // Go: memStore.State server/memstore.go — full state
+ StorageStreamState IStreamStore.State()
+ {
+ lock (_gate)
+ {
+ var state = new StorageStreamState
+ {
+ Msgs = _st.Msgs,
+ Bytes = _st.Bytes,
+ FirstSeq = _st.FirstSeq,
+ FirstTime = _st.FirstTime,
+ LastSeq = _st.LastSeq,
+ LastTime = _st.LastTime,
+ NumSubjects = _fss.Count,
+ NumDeleted = _dmap.Count,
+ };
+
+ if (_dmap.Count > 0)
+ state.Deleted = _dmap.ToArray();
+
+ return state;
+ }
+ }
+
+ // Go: memStore.FastState server/memstore.go — populate without deleted list
+ void IStreamStore.FastState(ref StorageStreamState state)
+ {
+ lock (_gate)
+ {
+ state.Msgs = _st.Msgs;
+ state.Bytes = _st.Bytes;
+ state.FirstSeq = _st.FirstSeq;
+ state.FirstTime = _st.FirstTime;
+ state.LastSeq = _st.LastSeq;
+ state.LastTime = _st.LastTime;
+ state.NumSubjects = _fss.Count;
+ state.NumDeleted = _dmap.Count;
+ }
+ }
+
+ // Go: memStore.Type
+ StorageType IStreamStore.Type() => StorageType.Memory;
+
+ // Go: memStore.UpdateConfig server/memstore.go:86
+ void IStreamStore.UpdateConfig(StreamConfig cfg)
+ {
+ lock (_gate)
+ {
+ var prevMaxp = _maxp;
+ _cfg = cfg;
+ _maxp = cfg.MaxMsgsPer;
+
+ // Create or destroy TTL wheel
+ if (cfg.AllowMsgTtl && _ttls == null)
+ _ttls = new SortedDictionary();
+ else if (!cfg.AllowMsgTtl && _ttls != null)
+ {
+ _ttls = null;
+ _ttlTimer?.Dispose();
+ _ttlTimer = null;
+ }
+
+ // Enforce per-subject limit if it shrank
+ if (_maxp > 0)
+ {
+ foreach (var kv in _fss)
+ EnforcePerSubjectLimit(kv.Key, kv.Value);
+ }
+ }
+ }
+
+ // Go: memStore.Stop — no-op for in-memory store
+ void IStreamStore.Stop() { }
+
+ // Go: memStore.Delete — clear everything
+ void IStreamStore.Delete(bool inline)
+ {
+ lock (_gate)
+ {
+ _msgs = new Dictionary();
+ _fss.Clear();
+ _dmap.Clear();
+ _st = default;
+ _ttls = null;
+ _ttlTimer?.Dispose();
+ _ttlTimer = null;
+ }
+ }
+
+ // Go: memStore.ResetState
+ void IStreamStore.ResetState() { }
+
+ // EncodedStreamState, ConsumerStore — not needed for MemStore tests
+ byte[] IStreamStore.EncodedStreamState(ulong failed) => [];
+
+ IConsumerStore IStreamStore.ConsumerStore(string name, DateTime created, ConsumerConfig cfg)
+ => throw new NotSupportedException("MemStore does not implement ConsumerStore.");
+
+ // -------------------------------------------------------------------------
+ // TrimToMaxMessages — legacy helper used by existing async tests
+ // -------------------------------------------------------------------------
+
public void TrimToMaxMessages(ulong maxMessages)
{
lock (_gate)
{
- while ((ulong)_messages.Count > maxMessages)
+ while (_st.Msgs > maxMessages)
+ RemoveInternal(_st.FirstSeq);
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Internal helpers
+ // -------------------------------------------------------------------------
+
+ // Go: memStore.storeRawMsg server/memstore.go:195
+ private void StoreInternal(string subj, byte[]? hdr, byte[]? data, ulong seq, long ts, long ttl)
+ {
+ if (seq != _st.LastSeq + 1)
+ {
+ if (seq > 0)
+ throw new InvalidOperationException($"Sequence mismatch: expected {_st.LastSeq + 1}, got {seq}.");
+ seq = _st.LastSeq + 1;
+ }
+
+ var now = new DateTime(ts / 100L, DateTimeKind.Utc);
+ if (_st.Msgs == 0)
+ {
+ _st.FirstSeq = seq;
+ _st.FirstTime = now;
+ }
+
+ var m = new Msg(subj, hdr, data, seq, ts);
+ _msgs[seq] = m;
+ _st.Msgs++;
+ _st.Bytes += MsgSize(m);
+ _st.LastSeq = seq;
+ _st.LastTime = now;
+
+ // Per-subject tracking
+ if (!string.IsNullOrEmpty(subj))
+ {
+ if (_fss.TryGetValue(subj, out var ss))
{
- var first = _messages.Keys.Min();
- _messages.Remove(first);
+ ss.Msgs++;
+ ss.Last = seq;
+ if (_maxp > 0 && ss.Msgs > (ulong)_maxp)
+ EnforcePerSubjectLimit(subj, ss);
+ }
+ else
+ {
+ _fss[subj] = new SubjectState { Msgs = 1, First = seq, Last = seq };
+ }
+ }
+
+ // TTL tracking
+ if (_ttls != null && ttl > 0)
+ {
+ var expires = DateTime.UtcNow.AddSeconds(ttl).Ticks;
+ _ttls[seq] = expires;
+ ScheduleTtlCheck();
+ }
+ }
+
+ // Go: memStore.removeMsg + updateFirstSeq server/memstore.go:2110 + 2004
+ // Go always inserts seq into dmap first, then updateFirstSeq cleans up dmap entries
+ // for any sequences <= the new first seq.
+ private bool RemoveInternal(ulong seq)
+ {
+ if (!_msgs.TryGetValue(seq, out var m))
+ return false;
+
+ _st.Msgs--;
+ _st.Bytes -= MsgSize(m);
+
+ // Go: ms.dmap.Insert(seq) — always insert into dmap first
+ _dmap.Add(seq);
+
+ // Go: ms.updateFirstSeq(seq) — update first seq and clean dmap
+ UpdateFirstSeqLocked(seq);
+
+ RemoveSeqPerSubject(m.Subj, seq);
+ _msgs.Remove(seq);
+ _ttls?.Remove(seq);
+ return true;
+ }
+
+ // Go: memStore.updateFirstSeq server/memstore.go:2004
+ // Must be called with _gate held. Seq is the sequence that was just removed.
+ private void UpdateFirstSeqLocked(ulong seq)
+ {
+ if (seq != _st.FirstSeq)
+ {
+ // Interior delete — nothing to do for first seq
+ return;
+ }
+
+ // Walk forward to find the next real message
+ Msg? nsm = null;
+ var nseq = _st.FirstSeq + 1;
+ while (nseq <= _st.LastSeq)
+ {
+ if (_msgs.TryGetValue(nseq, out nsm))
+ break;
+ nseq++;
+ }
+
+ var oldFirst = _st.FirstSeq;
+ if (nsm != null)
+ {
+ _st.FirstSeq = nsm.Seq;
+ _st.FirstTime = new DateTime(nsm.Ts / 100L, DateTimeKind.Utc);
+ }
+ else
+ {
+ // Like purge — no more messages
+ _st.FirstSeq = _st.LastSeq + 1;
+ _st.FirstTime = default;
+ }
+
+ // Go: clean up dmap for all sequences from oldFirst to new FirstSeq (exclusive)
+ // These are no longer "interior" gaps since they're now before the first msg.
+ if (oldFirst == _st.FirstSeq - 1)
+ {
+ _dmap.Remove(oldFirst);
+ }
+ else
+ {
+ for (var s = oldFirst; s < _st.FirstSeq; s++)
+ _dmap.Remove(s);
+ }
+ }
+
+ // Go: memStore.removeSeqPerSubject — update fss when a sequence is removed
+ private void RemoveSeqPerSubject(string subj, ulong seq)
+ {
+ if (!_fss.TryGetValue(subj, out var ss)) return;
+ ss.Msgs--;
+ if (ss.Msgs == 0)
+ {
+ _fss.Remove(subj);
+ return;
+ }
+ if (seq == ss.First)
+ {
+ // Find the next real first for this subject
+ for (var s = seq + 1; s <= _st.LastSeq; s++)
+ {
+ if (_msgs.TryGetValue(s, out var m) && m.Subj == subj)
+ {
+ ss.First = s;
+ break;
+ }
+ }
+ }
+ if (seq == ss.Last)
+ {
+ // Find the previous real last for this subject
+ for (var s = seq - 1; s >= ss.First && s <= _st.LastSeq; s--)
+ {
+ if (_msgs.TryGetValue(s, out var m) && m.Subj == subj)
+ {
+ ss.Last = s;
+ break;
+ }
+ if (s == 0) break;
}
}
}
+
+ // Go: memStore.purge server/memstore.go:1475
+ private ulong PurgeInternal()
+ {
+ var purged = (ulong)_msgs.Count;
+ var fseq = _st.LastSeq + 1;
+ _msgs = new Dictionary();
+ _fss.Clear();
+ _dmap.Clear();
+ _ttls?.Clear();
+ _st = new StreamStateMutable
+ {
+ FirstSeq = fseq,
+ LastSeq = fseq - 1,
+ };
+ return purged;
+ }
+
+ // Go: memStore.compact server/memstore.go:1513
+ private ulong CompactInternal(ulong seq)
+ {
+ if (seq == 0)
+ return PurgeInternal();
+
+ lock (_gate)
+ {
+ if (_st.FirstSeq > seq) return 0;
+
+ if (seq <= _st.LastSeq)
+ {
+ var fseq = _st.FirstSeq;
+ // Find the actual new first seq
+ for (var s = seq; s <= _st.LastSeq; s++)
+ {
+ if (_msgs.TryGetValue(s, out var nm))
+ {
+ _st.FirstSeq = s;
+ _st.FirstTime = new DateTime(nm.Ts / 100L, DateTimeKind.Utc);
+ break;
+ }
+ }
+
+ ulong purged = 0;
+ for (var s = seq - 1; s >= fseq && s < ulong.MaxValue; s--)
+ {
+ if (_msgs.TryGetValue(s, out var m))
+ {
+ _st.Bytes -= MsgSize(m);
+ purged++;
+ RemoveSeqPerSubject(m.Subj, s);
+ _msgs.Remove(s);
+ }
+ else
+ {
+ _dmap.Remove(s);
+ }
+ if (s == 0) break;
+ }
+ if (purged > _st.Msgs) purged = _st.Msgs;
+ _st.Msgs -= purged;
+ return purged;
+ }
+ else
+ {
+ // Compact past the end
+ var purged = (ulong)_msgs.Count;
+ _msgs = new Dictionary();
+ _fss.Clear();
+ _dmap.Clear();
+ _ttls?.Clear();
+ _st = new StreamStateMutable
+ {
+ FirstSeq = seq,
+ LastSeq = seq - 1,
+ };
+ return purged;
+ }
+ }
+ }
+
+ // Go: memStore.filteredStateLocked server/memstore.go:540
+ private SimpleState FilteredStateInternal(ulong sseq, string subject)
+ {
+ lock (_gate)
+ {
+ return FilteredStateLocked(sseq, subject, false);
+ }
+ }
+
+ // Called with _gate held
+ private SimpleState FilteredStateLocked(ulong sseq, string filter, bool lastPerSubject)
+ {
+ if (sseq < _st.FirstSeq) sseq = _st.FirstSeq;
+ if (sseq > _st.LastSeq) return default;
+
+ var isAll = string.IsNullOrEmpty(filter) || filter == ">";
+ if (isAll && sseq <= _st.FirstSeq)
+ {
+ var total = lastPerSubject ? (ulong)_fss.Count : _st.Msgs;
+ return new SimpleState { Msgs = total, First = _st.FirstSeq, Last = _st.LastSeq };
+ }
+
+ // Build result by scanning messages
+ ulong msgs = 0, first = 0, last = 0;
+ var seen = lastPerSubject ? new HashSet(StringComparer.Ordinal) : null;
+
+ for (var seq = sseq; seq <= _st.LastSeq; seq++)
+ {
+ if (!_msgs.TryGetValue(seq, out var m)) continue;
+ if (!isAll && !SubjectMatchesFilter(m.Subj, filter)) continue;
+ if (seen != null)
+ {
+ if (!seen.Add(m.Subj)) continue;
+ }
+ msgs++;
+ if (first == 0) first = seq;
+ last = seq;
+ }
+
+ return new SimpleState { Msgs = msgs, First = first, Last = last };
+ }
+
+ // Go: memStore.enforcePerSubjectLimit server/memstore.go:1072
+ private void EnforcePerSubjectLimit(string subj, SubjectState ss)
+ {
+ if (_maxp <= 0) return;
+ while (ss.Msgs > (ulong)_maxp)
+ {
+ if (!RemoveInternal(ss.First))
+ break;
+ }
+ }
+
+ // Go: memStoreMsgSize server/memstore.go
+ private static ulong MsgSize(Msg m)
+ {
+ var size = (ulong)(m.Subj.Length + (m.Hdr?.Length ?? 0) + (m.Data?.Length ?? 0) + 16);
+ return size;
+ }
+
+ // Subject filter matching (same as FileStore helper)
+ private static bool SubjectMatchesFilter(string subject, string filter)
+ {
+ if (string.IsNullOrEmpty(filter) || filter == ">")
+ return true;
+ if (SubjectMatch.IsLiteral(filter))
+ return string.Equals(subject, filter, StringComparison.Ordinal);
+ return SubjectMatch.MatchLiteral(subject, filter);
+ }
+
+ // Fill a StoreMsg from an internal Msg
+ private static StoreMsg FillStoreMsg(StoreMsg sm, Msg m)
+ {
+ sm.Clear();
+ sm.Subject = m.Subj;
+ sm.Header = m.Hdr?.Length > 0 ? m.Hdr : null;
+ sm.Data = m.Data?.Length > 0 ? m.Data : null;
+ sm.Sequence = m.Seq;
+ sm.Timestamp = m.Ts;
+ return sm;
+ }
+
+ // -------------------------------------------------------------------------
+ // TTL expiry
+ // -------------------------------------------------------------------------
+
+ private void ScheduleTtlCheck()
+ {
+ _ttlTimer?.Dispose();
+ _ttlTimer = new Timer(_ => ExpireTtlMessages(), null, TimeSpan.FromSeconds(1), Timeout.InfiniteTimeSpan);
+ }
+
+ private void ExpireTtlMessages()
+ {
+ lock (_gate)
+ {
+ if (_ttls == null) return;
+ var now = DateTime.UtcNow.Ticks;
+ var toRemove = _ttls.Where(kv => kv.Value <= now).Select(kv => kv.Key).ToList();
+ foreach (var seq in toRemove)
+ RemoveInternal(seq);
+
+ if (_ttls.Count > 0)
+ ScheduleTtlCheck();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Internal method exposed for test access — nextWildcardMatchLocked
+ // (Go: nextWildcardMatchLocked server/memstore.go:1810)
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Returns the (first, last, found) range for messages whose subject matches
+ /// at or after . Called with
+ /// _gate already held.
+ ///
+ internal (ulong First, ulong Last, bool Found) NextWildcardMatchLocked(string filter, ulong start)
+ {
+ ulong first = _st.LastSeq, last = 0;
+ var found = false;
+
+ foreach (var kv in _fss)
+ {
+ if (!SubjectMatchesFilter(kv.Key, filter)) continue;
+ var ss = kv.Value;
+ if (start > ss.Last) continue;
+ found = true;
+ var f = Math.Max(start, ss.First);
+ if (f < first) first = f;
+ if (ss.Last > last) last = ss.Last;
+ }
+
+ return found ? (Math.Max(first, start), last, true) : (0UL, 0UL, false);
+ }
+
+ ///
+ /// Returns the (first, last, found) range for messages whose subject exactly
+ /// equals at or after . Called
+ /// with _gate already held.
+ ///
+ internal (ulong First, ulong Last, bool Found) NextLiteralMatchLocked(string filter, ulong start)
+ {
+ if (!_fss.TryGetValue(filter, out var ss)) return (0, 0, false);
+ if (start > ss.Last) return (0, 0, false);
+ return (Math.Max(start, ss.First), ss.Last, true);
+ }
+
+ /// Exposes _gate for test-level lock acquisition.
+ internal object Gate => _gate;
}
diff --git a/tests/NATS.Server.Tests/JetStream/Storage/MemStoreGoParityTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/MemStoreGoParityTests.cs
new file mode 100644
index 0000000..1b82e8e
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Storage/MemStoreGoParityTests.cs
@@ -0,0 +1,951 @@
+// Reference: golang/nats-server/server/memstore_test.go
+// Tests ported in this file:
+// TestMemStoreCompact → Compact_RemovesMessagesBeforeSeq
+// TestMemStoreStreamStateDeleted → StreamStateDeleted_TracksDmapCorrectly
+// TestMemStoreStreamTruncate → Truncate_RemovesMessagesAfterSeq
+// TestMemStoreUpdateMaxMsgsPerSubject → UpdateMaxMsgsPerSubject_EnforcesNewLimit
+// TestMemStoreStreamCompactMultiBlockSubjectInfo → Compact_AdjustsSubjectCount
+// TestMemStoreSubjectsTotals → SubjectsTotals_MatchesStoredCounts
+// TestMemStoreNumPending → NumPending_MatchesFilteredCount
+// TestMemStoreMultiLastSeqs → MultiLastSeqs_ReturnsLastPerSubject
+// TestMemStoreSubjectForSeq → SubjectForSeq_ReturnsCorrectSubject
+// TestMemStoreSubjectDeleteMarkers → SubjectDeleteMarkers_TtlExpiry (skipped: needs pmsgcb)
+// TestMemStoreAllLastSeqs → AllLastSeqs_ReturnsLastPerSubjectSorted
+// TestMemStoreGetSeqFromTimeWithLastDeleted → GetSeqFromTime_WithLastDeleted
+// TestMemStoreSkipMsgs → SkipMsgs_ReservesSequences
+// TestMemStoreDeleteBlocks → DeleteBlocks_DmapSizeMatchesNumDeleted
+// TestMemStoreMessageTTL → MessageTTL_ExpiresAfterDelay
+// TestMemStoreUpdateConfigTTLState → UpdateConfig_TtlStateInitializedAndDestroyed
+// TestMemStoreNextWildcardMatch → NextWildcardMatch_BoundsAreCorrect
+// TestMemStoreNextLiteralMatch → NextLiteralMatch_BoundsAreCorrect
+// TestMemStoreInitialFirstSeq → InitialFirstSeq_StartAtConfiguredSeq
+// TestMemStoreStreamTruncateReset → TruncateReset_ClearsEverything
+// TestMemStorePurgeExWithSubject → PurgeEx_WithSubject_PurgesAll
+// TestMemStorePurgeExWithDeletedMsgs → PurgeEx_WithDeletedMsgs_CorrectFirstSeq
+// TestMemStoreDeleteAllFirstSequenceCheck → DeleteAll_FirstSeqIsLastPlusOne
+// TestMemStoreNumPendingBug → NumPending_Bug_CorrectCount
+// TestMemStorePurgeLeaksDmap → Purge_ClearsDmap
+// TestMemStoreMultiLastSeqsMaxAllowed → MultiLastSeqs_MaxAllowed_ThrowsWhenExceeded
+
+using NATS.Server.JetStream.Models;
+using NATS.Server.JetStream.Storage;
+
+namespace NATS.Server.Tests.JetStream.Storage;
+
+///
+/// Go MemStore parity tests. Each test mirrors a specific Go test from
+/// golang/nats-server/server/memstore_test.go to verify behaviour parity.
+///
+public sealed class MemStoreGoParityTests
+{
+ // Helper: cast to IStreamStore for sync methods
+ private static IStreamStore Sync(MemStore ms) => ms;
+
+ // -------------------------------------------------------------------------
+ // Compact
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreCompact server/memstore_test.go:259
+ [Fact]
+ public void Compact_RemovesMessagesBeforeSeq()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ for (var i = 0; i < 10; i++)
+ s.StoreMsg("foo", null, "Hello World"u8.ToArray(), 0);
+
+ s.State().Msgs.ShouldBe(10UL);
+
+ var n = s.Compact(6);
+ n.ShouldBe(5UL);
+
+ var state = s.State();
+ state.Msgs.ShouldBe(5UL);
+ state.FirstSeq.ShouldBe(6UL);
+
+ // Compact past the end resets first seq
+ n = s.Compact(100);
+ n.ShouldBe(5UL);
+ s.State().FirstSeq.ShouldBe(100UL);
+ }
+
+ // -------------------------------------------------------------------------
+ // StreamStateDeleted
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreStreamStateDeleted server/memstore_test.go:342
+ [Fact]
+ public void StreamStateDeleted_TracksDmapCorrectly()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ const ulong toStore = 10;
+ for (ulong i = 1; i <= toStore; i++)
+ s.StoreMsg("foo", null, new byte[8], 0);
+
+ s.State().Deleted.ShouldBeNull();
+
+ // Delete even sequences 2,4,6,8
+ var expectedDeleted = new List();
+ for (ulong seq = 2; seq < toStore; seq += 2)
+ {
+ s.RemoveMsg(seq);
+ expectedDeleted.Add(seq);
+ }
+
+ var state = s.State();
+ state.Deleted.ShouldNotBeNull();
+ state.Deleted!.ShouldBe(expectedDeleted.ToArray());
+
+ // Delete 1 and 3 to fill first gap — deleted should shift forward
+ s.RemoveMsg(1);
+ s.RemoveMsg(3);
+ expectedDeleted = expectedDeleted.Skip(2).ToList(); // remove 2 and 4 from start
+ state = s.State();
+ state.Deleted!.ShouldBe(expectedDeleted.ToArray());
+ state.FirstSeq.ShouldBe(5UL);
+
+ s.Purge();
+ s.State().Deleted.ShouldBeNull();
+ }
+
+ // -------------------------------------------------------------------------
+ // Truncate
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreStreamTruncate server/memstore_test.go:385
+ [Fact]
+ public void Truncate_RemovesMessagesAfterSeq()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ const ulong tseq = 50;
+ const ulong toStore = 100;
+
+ for (ulong i = 1; i < tseq; i++)
+ s.StoreMsg("foo", null, "ok"u8.ToArray(), 0);
+ for (var i = tseq; i <= toStore; i++)
+ s.StoreMsg("bar", null, "ok"u8.ToArray(), 0);
+
+ s.State().Msgs.ShouldBe(toStore);
+
+ s.Truncate(tseq);
+ s.State().Msgs.ShouldBe(tseq);
+
+ // Truncate with some interior deletes
+ s.RemoveMsg(10);
+ s.RemoveMsg(20);
+ s.RemoveMsg(30);
+ s.RemoveMsg(40);
+
+ s.Truncate(25);
+ var state = s.State();
+ // 25 seqs remaining, minus 2 deleted (10, 20) = 23 messages
+ state.Msgs.ShouldBe(tseq - 2 - (tseq - 25));
+ state.NumSubjects.ShouldBe(1); // only "foo" left
+ state.Deleted!.ShouldBe([10UL, 20UL]);
+ }
+
+ // -------------------------------------------------------------------------
+ // TruncateReset
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreStreamTruncateReset server/memstore_test.go:490
+ [Fact]
+ public void TruncateReset_ClearsEverything()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ for (var i = 0; i < 1000; i++)
+ s.StoreMsg("foo", null, "Hello World"u8.ToArray(), 0);
+
+ s.Truncate(0);
+
+ var state = s.State();
+ state.Msgs.ShouldBe(0UL);
+ state.Bytes.ShouldBe(0UL);
+ state.FirstSeq.ShouldBe(0UL);
+ state.LastSeq.ShouldBe(0UL);
+ state.NumSubjects.ShouldBe(0);
+ state.NumDeleted.ShouldBe(0);
+
+ // Can store again after reset
+ for (var i = 0; i < 1000; i++)
+ s.StoreMsg("foo", null, "Hello World"u8.ToArray(), 0);
+
+ state = s.State();
+ state.Msgs.ShouldBe(1000UL);
+ state.FirstSeq.ShouldBe(1UL);
+ state.LastSeq.ShouldBe(1000UL);
+ state.NumSubjects.ShouldBe(1);
+ state.NumDeleted.ShouldBe(0);
+ }
+
+ // -------------------------------------------------------------------------
+ // UpdateMaxMsgsPerSubject
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreUpdateMaxMsgsPerSubject server/memstore_test.go:452
+ [Fact]
+ public void UpdateMaxMsgsPerSubject_EnforcesNewLimit()
+ {
+ var cfg = new StreamConfig
+ {
+ Name = "TEST",
+ Storage = StorageType.Memory,
+ Subjects = ["foo"],
+ MaxMsgsPer = 10,
+ };
+ var ms = new MemStore(cfg);
+ var s = Sync(ms);
+
+ // Increase limit — should allow more
+ cfg.MaxMsgsPer = 50;
+ s.UpdateConfig(cfg);
+
+ const int numStored = 22;
+ for (var i = 0; i < numStored; i++)
+ s.StoreMsg("foo", null, [], 0);
+
+ var ss = s.SubjectsState("foo")["foo"];
+ ss.Msgs.ShouldBe((ulong)numStored);
+
+ // Shrink limit — should truncate stored
+ cfg.MaxMsgsPer = 10;
+ s.UpdateConfig(cfg);
+
+ ss = s.SubjectsState("foo")["foo"];
+ ss.Msgs.ShouldBe(10UL);
+ }
+
+ // -------------------------------------------------------------------------
+ // CompactMultiBlockSubjectInfo
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreStreamCompactMultiBlockSubjectInfo server/memstore_test.go:531
+ [Fact]
+ public void Compact_AdjustsSubjectCount()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ for (var i = 0; i < 1000; i++)
+ s.StoreMsg($"foo.{i}", null, "Hello World"u8.ToArray(), 0);
+
+ var deleted = s.Compact(501);
+ deleted.ShouldBe(500UL);
+
+ s.State().NumSubjects.ShouldBe(500);
+ }
+
+ // -------------------------------------------------------------------------
+ // SubjectsTotals
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreSubjectsTotals server/memstore_test.go:557
+ [Fact]
+ public void SubjectsTotals_MatchesStoredCounts()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ var fmap = new Dictionary();
+ var bmap = new Dictionary();
+ var rng = new Random(42);
+
+ for (var i = 0; i < 10_000; i++)
+ {
+ string ft;
+ Dictionary m;
+ if (rng.Next(2) == 0) { ft = "foo"; m = fmap; }
+ else { ft = "bar"; m = bmap; }
+ var dt = rng.Next(100);
+ var subj = $"{ft}.{dt}";
+ m.TryGetValue(dt, out var c);
+ m[dt] = c + 1;
+ s.StoreMsg(subj, null, "Hello World"u8.ToArray(), 0);
+ }
+
+ // Check individual foo subjects
+ foreach (var kv in fmap)
+ {
+ var subj = $"foo.{kv.Key}";
+ var totals = s.SubjectsTotals(subj);
+ totals[subj].ShouldBe((ulong)kv.Value);
+ }
+
+ // Check foo.* wildcard
+ var fooTotals = s.SubjectsTotals("foo.*");
+ fooTotals.Count.ShouldBe(fmap.Count);
+ var fooExpected = (ulong)fmap.Values.Sum(n => n);
+ fooTotals.Values.Aggregate(0UL, (a, v) => a + v).ShouldBe(fooExpected);
+
+ // Check bar.* wildcard
+ var barTotals = s.SubjectsTotals("bar.*");
+ barTotals.Count.ShouldBe(bmap.Count);
+
+ // Check *.*
+ var allTotals = s.SubjectsTotals("*.*");
+ allTotals.Count.ShouldBe(fmap.Count + bmap.Count);
+ }
+
+ // -------------------------------------------------------------------------
+ // NumPending
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreNumPending server/memstore_test.go:637
+ [Fact]
+ public void NumPending_MatchesFilteredCount()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+ var tokens = new[] { "foo", "bar", "baz" };
+ var rng = new Random(99);
+
+ string GenSubj() => $"{tokens[rng.Next(3)]}.{tokens[rng.Next(3)]}.{tokens[rng.Next(3)]}.{tokens[rng.Next(3)]}";
+
+ for (var i = 0; i < 5_000; i++)
+ s.StoreMsg(GenSubj(), null, "Hello World"u8.ToArray(), 0);
+
+ var state = s.State();
+ var startSeqs = new ulong[] { 0, 1, 2, 200, 444, 555, 2222, 4000 };
+ var checkSubs = new[] { "foo.>", "*.bar.>", "foo.bar.*.baz", "*.foo.bar.*", "foo.foo.bar.baz" };
+
+ foreach (var filter in checkSubs)
+ {
+ foreach (var startSeq in startSeqs)
+ {
+ var (total, validThrough) = s.NumPending(startSeq, filter, false);
+ validThrough.ShouldBe(state.LastSeq);
+
+ // Sanity-check: manually count matching msgs from startSeq
+ var sseq = startSeq == 0 ? 1 : startSeq;
+ ulong expected = 0;
+ for (var seq = sseq; seq <= state.LastSeq; seq++)
+ {
+ try
+ {
+ var sm = s.LoadMsg(seq, null);
+ if (SubjectMatchesFilter(sm.Subject, filter)) expected++;
+ }
+ catch (KeyNotFoundException) { }
+ }
+ total.ShouldBe(expected, $"filter={filter} start={startSeq}");
+ }
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // MultiLastSeqs
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreMultiLastSeqs server/memstore_test.go:923
+ [Fact]
+ public void MultiLastSeqs_ReturnsLastPerSubject()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+ var msg = "abc"u8.ToArray();
+
+ for (var i = 0; i < 33; i++)
+ {
+ s.StoreMsg("foo.foo", null, msg, 0);
+ s.StoreMsg("foo.bar", null, msg, 0);
+ s.StoreMsg("foo.baz", null, msg, 0);
+ }
+ for (var i = 0; i < 33; i++)
+ {
+ s.StoreMsg("bar.foo", null, msg, 0);
+ s.StoreMsg("bar.bar", null, msg, 0);
+ s.StoreMsg("bar.baz", null, msg, 0);
+ }
+
+ // Up to seq 3
+ s.MultiLastSeqs(["foo.*"], 3, -1).ShouldBe([1UL, 2UL, 3UL]);
+ // All of foo.*
+ s.MultiLastSeqs(["foo.*"], 0, -1).ShouldBe([97UL, 98UL, 99UL]);
+ // All of bar.*
+ s.MultiLastSeqs(["bar.*"], 0, -1).ShouldBe([196UL, 197UL, 198UL]);
+ // bar.* at seq <= 99 — nothing
+ s.MultiLastSeqs(["bar.*"], 99, -1).ShouldBe([]);
+
+ // Explicit subjects
+ s.MultiLastSeqs(["foo.foo", "foo.bar", "foo.baz"], 3, -1).ShouldBe([1UL, 2UL, 3UL]);
+ s.MultiLastSeqs(["foo.foo", "foo.bar", "foo.baz"], 0, -1).ShouldBe([97UL, 98UL, 99UL]);
+ s.MultiLastSeqs(["bar.foo", "bar.bar", "bar.baz"], 0, -1).ShouldBe([196UL, 197UL, 198UL]);
+ s.MultiLastSeqs(["bar.foo", "bar.bar", "bar.baz"], 99, -1).ShouldBe([]);
+
+ // Single filter
+ s.MultiLastSeqs(["foo.foo"], 3, -1).ShouldBe([1UL]);
+
+ // De-duplicate overlapping filters
+ s.MultiLastSeqs(["foo.*", "foo.bar"], 3, -1).ShouldBe([1UL, 2UL, 3UL]);
+
+ // All subjects
+ s.MultiLastSeqs([">"], 0, -1).ShouldBe([97UL, 98UL, 99UL, 196UL, 197UL, 198UL]);
+ s.MultiLastSeqs([">"], 99, -1).ShouldBe([97UL, 98UL, 99UL]);
+ }
+
+ // -------------------------------------------------------------------------
+ // MultiLastSeqs — maxAllowed
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreMultiLastSeqsMaxAllowed server/memstore_test.go:1010
+ [Fact]
+ public void MultiLastSeqs_MaxAllowed_ThrowsWhenExceeded()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+ var msg = "abc"u8.ToArray();
+
+ for (var i = 1; i <= 100; i++)
+ s.StoreMsg($"foo.{i}", null, msg, 0);
+
+ Should.Throw(() => s.MultiLastSeqs(["foo.*"], 0, 10));
+ }
+
+ // -------------------------------------------------------------------------
+ // SubjectForSeq
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreSubjectForSeq server/memstore_test.go:1319
+ [Fact]
+ public void SubjectForSeq_ReturnsCorrectSubject()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ s.StoreMsg("foo.bar", null, [], 0);
+
+ // seq 0 (not found)
+ Should.Throw(() => s.SubjectForSeq(0));
+
+ // seq 1 — should be "foo.bar"
+ s.SubjectForSeq(1).ShouldBe("foo.bar");
+
+ // seq 2 (not yet stored)
+ Should.Throw(() => s.SubjectForSeq(2));
+ }
+
+ // -------------------------------------------------------------------------
+ // AllLastSeqs
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreAllLastSeqs server/memstore_test.go:1266
+ [Fact]
+ public void AllLastSeqs_ReturnsLastPerSubjectSorted()
+ {
+ var cfg = new StreamConfig
+ {
+ Name = "zzz",
+ Subjects = ["*.*"],
+ MaxMsgsPer = 50,
+ Storage = StorageType.Memory,
+ };
+ var ms = new MemStore(cfg);
+ var s = Sync(ms);
+
+ var subjs = new[] { "foo.foo", "foo.bar", "foo.baz", "bar.foo", "bar.bar", "bar.baz" };
+ var msg = "abc"u8.ToArray();
+ var rng = new Random(7);
+
+ for (var i = 0; i < 10_000; i++)
+ s.StoreMsg(subjs[rng.Next(subjs.Length)], null, msg, 0);
+
+ // Compute expected last sequences per subject
+ var expected = new List();
+ foreach (var subj in subjs)
+ {
+ try
+ {
+ var sm = s.LoadLastMsg(subj, null);
+ expected.Add(sm.Sequence);
+ }
+ catch (KeyNotFoundException) { }
+ }
+ expected.Sort();
+
+ var seqs = s.AllLastSeqs();
+ seqs.ShouldBe(expected.ToArray());
+ }
+
+ // -------------------------------------------------------------------------
+ // GetSeqFromTime with last deleted
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreGetSeqFromTimeWithLastDeleted server/memstore_test.go:839
+ [Fact]
+ public void GetSeqFromTime_WithLastDeleted()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ const int total = 1000;
+ DateTime midTime = default;
+ for (var i = 1; i <= total; i++)
+ {
+ s.StoreMsg("A", null, "OK"u8.ToArray(), 0);
+ if (i == total / 2)
+ {
+ Thread.Sleep(100);
+ midTime = DateTime.UtcNow;
+ }
+ }
+
+ // Delete last 100
+ for (var seq = total - 100; seq <= total; seq++)
+ s.RemoveMsg((ulong)seq);
+
+ // Should not panic and should return correct value
+ var found = s.GetSeqFromTime(midTime);
+ found.ShouldBe(501UL);
+ }
+
+ // -------------------------------------------------------------------------
+ // SkipMsgs
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreSkipMsgs server/memstore_test.go:871
+ [Fact]
+ public void SkipMsgs_ReservesSequences()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ // Wrong starting sequence should fail
+ Should.Throw(() => s.SkipMsgs(10, 100));
+
+ // Skip from seq 1
+ s.SkipMsgs(1, 100);
+ var state = s.State();
+ state.FirstSeq.ShouldBe(101UL);
+ state.LastSeq.ShouldBe(100UL);
+
+ // Skip many more
+ s.SkipMsgs(101, 100_000);
+ state = s.State();
+ state.FirstSeq.ShouldBe(100_101UL);
+ state.LastSeq.ShouldBe(100_100UL);
+
+ // New store: store a message then skip
+ var ms2 = new MemStore();
+ var s2 = Sync(ms2);
+ s2.StoreMsg("foo", null, [], 0);
+ s2.SkipMsgs(2, 10);
+
+ state = s2.State();
+ state.FirstSeq.ShouldBe(1UL);
+ state.LastSeq.ShouldBe(11UL);
+ state.Msgs.ShouldBe(1UL);
+ state.NumDeleted.ShouldBe(10);
+ state.Deleted.ShouldNotBeNull();
+ state.Deleted!.Length.ShouldBe(10);
+
+ // FastState consistency
+ var fstate = new StreamState();
+ s2.FastState(ref fstate);
+ fstate.FirstSeq.ShouldBe(1UL);
+ fstate.LastSeq.ShouldBe(11UL);
+ fstate.Msgs.ShouldBe(1UL);
+ fstate.NumDeleted.ShouldBe(10);
+ }
+
+ // -------------------------------------------------------------------------
+ // DeleteBlocks
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreDeleteBlocks server/memstore_test.go:799
+ [Fact]
+ public void DeleteBlocks_DmapSizeMatchesNumDeleted()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ const int total = 10_000;
+ for (var i = 0; i < total; i++)
+ s.StoreMsg("A", null, "OK"u8.ToArray(), 0);
+
+ // Delete 5000 random sequences
+ var rng = new Random(13);
+ var deleteSet = new HashSet();
+ while (deleteSet.Count < 5000)
+ deleteSet.Add(rng.Next(total) + 1);
+
+ foreach (var seq in deleteSet)
+ s.RemoveMsg((ulong)seq);
+
+ var fstate = new StreamState();
+ s.FastState(ref fstate);
+
+ // NumDeleted from FastState must equal interior gap count
+ var fullState = s.State();
+ var dmapSize = fullState.Deleted?.Length ?? 0;
+ dmapSize.ShouldBe(fstate.NumDeleted);
+ }
+
+ // -------------------------------------------------------------------------
+ // MessageTTL
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreMessageTTL server/memstore_test.go:1202
+ [Fact]
+ public void MessageTTL_ExpiresAfterDelay()
+ {
+ var cfg = new StreamConfig
+ {
+ Name = "zzz",
+ Subjects = ["test"],
+ Storage = StorageType.Memory,
+ AllowMsgTtl = true,
+ };
+ var ms = new MemStore(cfg);
+ var s = Sync(ms);
+
+ const long ttl = 1; // 1 second
+
+ for (var i = 1; i <= 10; i++)
+ s.StoreMsg("test", null, [], ttl);
+
+ var ss = new StreamState();
+ s.FastState(ref ss);
+ ss.FirstSeq.ShouldBe(1UL);
+ ss.LastSeq.ShouldBe(10UL);
+ ss.Msgs.ShouldBe(10UL);
+
+ // Wait for TTL to expire (> 1 sec + check interval of 1 sec)
+ Thread.Sleep(2_500);
+
+ s.FastState(ref ss);
+ ss.FirstSeq.ShouldBe(11UL);
+ ss.LastSeq.ShouldBe(10UL);
+ ss.Msgs.ShouldBe(0UL);
+ }
+
+ // -------------------------------------------------------------------------
+ // UpdateConfigTTLState
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreUpdateConfigTTLState server/memstore_test.go:1299
+ [Fact]
+ public void UpdateConfig_TtlStateInitializedAndDestroyed()
+ {
+ var cfg = new StreamConfig
+ {
+ Name = "zzz",
+ Subjects = [">"],
+ Storage = StorageType.Memory,
+ AllowMsgTtl = false,
+ };
+ var ms = new MemStore(cfg);
+ var s = Sync(ms);
+
+ // TTL disabled — internal TTL wheel should be null (we cannot observe it directly,
+ // but UpdateConfig must not throw and subsequent behaviour must be correct)
+ cfg.AllowMsgTtl = true;
+ s.UpdateConfig(cfg);
+
+ // Store with TTL — should work
+ s.StoreMsg("test", null, [], 3600);
+ s.State().Msgs.ShouldBe(1UL);
+
+ // Disable TTL again
+ cfg.AllowMsgTtl = false;
+ s.UpdateConfig(cfg);
+
+ // Message stored before disabling should still be present (TTL wheel gone but msg stays)
+ s.State().Msgs.ShouldBe(1UL);
+ }
+
+ // -------------------------------------------------------------------------
+ // NextWildcardMatch
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreNextWildcardMatch server/memstore_test.go:1373
+ [Fact]
+ public void NextWildcardMatch_BoundsAreCorrect()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ void StoreN(string subj, int n)
+ {
+ for (var i = 0; i < n; i++)
+ s.StoreMsg(subj, null, "msg"u8.ToArray(), 0);
+ }
+
+ StoreN("foo.bar.a", 1); // seq 1
+ StoreN("foo.baz.bar", 10); // seqs 2-11
+ StoreN("foo.bar.b", 1); // seq 12
+ StoreN("foo.baz.bar", 10); // seqs 13-22
+ StoreN("foo.baz.bar.no.match", 10); // seqs 23-32
+
+ lock (ms.Gate)
+ {
+ var (first, last, found) = ms.NextWildcardMatchLocked("foo.bar.*", 0);
+ found.ShouldBeTrue();
+ first.ShouldBe(1UL);
+ last.ShouldBe(12UL);
+
+ (first, last, found) = ms.NextWildcardMatchLocked("foo.bar.*", 1);
+ found.ShouldBeTrue();
+ first.ShouldBe(1UL);
+ last.ShouldBe(12UL);
+
+ (first, last, found) = ms.NextWildcardMatchLocked("foo.bar.*", 2);
+ found.ShouldBeTrue();
+ first.ShouldBe(12UL);
+ last.ShouldBe(12UL);
+
+ (_, _, found) = ms.NextWildcardMatchLocked("foo.bar.*", first + 1);
+ found.ShouldBeFalse();
+
+ (first, last, found) = ms.NextWildcardMatchLocked("foo.baz.*", 1);
+ found.ShouldBeTrue();
+ first.ShouldBe(2UL);
+ last.ShouldBe(22UL);
+
+ (first, last, found) = ms.NextWildcardMatchLocked("foo.nope.*", 1);
+ found.ShouldBeFalse();
+ first.ShouldBe(0UL);
+ last.ShouldBe(0UL);
+
+ (first, last, found) = ms.NextWildcardMatchLocked("foo.>", 1);
+ found.ShouldBeTrue();
+ first.ShouldBe(1UL);
+ last.ShouldBe(32UL);
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // NextLiteralMatch
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreNextLiteralMatch server/memstore_test.go:1454
+ [Fact]
+ public void NextLiteralMatch_BoundsAreCorrect()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ void StoreN(string subj, int n)
+ {
+ for (var i = 0; i < n; i++)
+ s.StoreMsg(subj, null, "msg"u8.ToArray(), 0);
+ }
+
+ StoreN("foo.bar.a", 1); // seq 1
+ StoreN("foo.baz.bar", 10); // seqs 2-11
+ StoreN("foo.bar.b", 1); // seq 12
+ StoreN("foo.baz.bar", 10); // seqs 13-22
+ StoreN("foo.baz.bar.no.match", 10); // seqs 23-32
+
+ lock (ms.Gate)
+ {
+ var (first, last, found) = ms.NextLiteralMatchLocked("foo.bar.a", 0);
+ found.ShouldBeTrue();
+ first.ShouldBe(1UL);
+ last.ShouldBe(1UL);
+
+ (_, _, found) = ms.NextLiteralMatchLocked("foo.bar.a", 2);
+ found.ShouldBeFalse();
+
+ (first, last, found) = ms.NextLiteralMatchLocked("foo.baz.bar", 1);
+ found.ShouldBeTrue();
+ first.ShouldBe(2UL);
+ last.ShouldBe(22UL);
+
+ (first, last, found) = ms.NextLiteralMatchLocked("foo.baz.bar", 22);
+ found.ShouldBeTrue();
+ first.ShouldBe(22UL);
+ last.ShouldBe(22UL);
+
+ (first, last, found) = ms.NextLiteralMatchLocked("foo.baz.bar", 23);
+ found.ShouldBeFalse();
+ first.ShouldBe(0UL);
+ last.ShouldBe(0UL);
+
+ (_, _, found) = ms.NextLiteralMatchLocked("foo.nope", 1);
+ found.ShouldBeFalse();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // InitialFirstSeq
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreInitialFirstSeq server/memstore_test.go:765
+ [Fact]
+ public void InitialFirstSeq_StartAtConfiguredSeq()
+ {
+ var cfg = new StreamConfig
+ {
+ Name = "zzz",
+ Storage = StorageType.Memory,
+ FirstSeq = 1000,
+ };
+ var ms = new MemStore(cfg);
+ var s = Sync(ms);
+
+ var (seq, _) = s.StoreMsg("A", null, "OK"u8.ToArray(), 0);
+ seq.ShouldBe(1000UL);
+
+ (seq, _) = s.StoreMsg("B", null, "OK"u8.ToArray(), 0);
+ seq.ShouldBe(1001UL);
+
+ var state = new StreamState();
+ s.FastState(ref state);
+ state.Msgs.ShouldBe(2UL);
+ state.FirstSeq.ShouldBe(1000UL);
+ state.LastSeq.ShouldBe(1001UL);
+ }
+
+ // -------------------------------------------------------------------------
+ // PurgeEx with subject
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStorePurgeExWithSubject server/memstore_test.go:437
+ [Fact]
+ public void PurgeEx_WithSubject_PurgesAll()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ for (var i = 0; i < 100; i++)
+ s.StoreMsg("foo", null, [], 0);
+
+ s.PurgeEx("foo", 1, 0);
+ s.State().Msgs.ShouldBe(0UL);
+ }
+
+ // -------------------------------------------------------------------------
+ // PurgeEx with deleted messages
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStorePurgeExWithDeletedMsgs server/memstore_test.go:1031
+ [Fact]
+ public void PurgeEx_WithDeletedMsgs_CorrectFirstSeq()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+ var msg = "abc"u8.ToArray();
+
+ for (var i = 1; i <= 10; i++)
+ s.StoreMsg("foo", null, msg, 0);
+
+ s.RemoveMsg(2);
+ s.RemoveMsg(9); // was the bug
+
+ var n = s.PurgeEx("", 9, 0);
+ n.ShouldBe(7UL); // seqs 1,3,4,5,6,7,8 (not 2 since deleted, not 9 since deleted)
+
+ var state = new StreamState();
+ s.FastState(ref state);
+ state.FirstSeq.ShouldBe(10UL);
+ state.LastSeq.ShouldBe(10UL);
+ state.Msgs.ShouldBe(1UL);
+ }
+
+ // -------------------------------------------------------------------------
+ // DeleteAll FirstSequenceCheck
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreDeleteAllFirstSequenceCheck server/memstore_test.go:1060
+ [Fact]
+ public void DeleteAll_FirstSeqIsLastPlusOne()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ for (var i = 1; i <= 10; i++)
+ s.StoreMsg("foo", null, "abc"u8.ToArray(), 0);
+
+ for (ulong seq = 1; seq <= 10; seq++)
+ s.RemoveMsg(seq);
+
+ var state = new StreamState();
+ s.FastState(ref state);
+ state.FirstSeq.ShouldBe(11UL);
+ state.LastSeq.ShouldBe(10UL);
+ state.Msgs.ShouldBe(0UL);
+ }
+
+ // -------------------------------------------------------------------------
+ // NumPending — bug fix
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStoreNumPendingBug server/memstore_test.go:1137
+ [Fact]
+ public void NumPending_Bug_CorrectCount()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ foreach (var subj in new[] { "foo.foo", "foo.bar", "foo.baz", "foo.zzz" })
+ {
+ s.StoreMsg("foo.aaa", null, [], 0);
+ s.StoreMsg(subj, null, [], 0);
+ s.StoreMsg(subj, null, [], 0);
+ }
+
+ // 12 msgs total
+ var (total, _) = s.NumPending(4, "foo.*", false);
+
+ ulong expected = 0;
+ for (var seq = 4; seq <= 12; seq++)
+ {
+ try
+ {
+ var sm = s.LoadMsg((ulong)seq, null);
+ if (SubjectMatchesFilter(sm.Subject, "foo.*")) expected++;
+ }
+ catch (KeyNotFoundException) { }
+ }
+ total.ShouldBe(expected);
+ }
+
+ // -------------------------------------------------------------------------
+ // Purge clears dmap
+ // -------------------------------------------------------------------------
+
+ // Go: TestMemStorePurgeLeaksDmap server/memstore_test.go:1168
+ [Fact]
+ public void Purge_ClearsDmap()
+ {
+ var ms = new MemStore();
+ var s = Sync(ms);
+
+ for (var i = 0; i < 10; i++)
+ s.StoreMsg("foo", null, [], 0);
+
+ for (ulong i = 2; i <= 9; i++)
+ s.RemoveMsg(i);
+
+ // 8 interior gaps now
+ var state = s.State();
+ state.NumDeleted.ShouldBe(8);
+
+ // Purge should also clear dmap
+ var purged = s.Purge();
+ purged.ShouldBe(2UL); // 2 actual msgs remain (1 and 10)
+
+ state = s.State();
+ state.NumDeleted.ShouldBe(0);
+ state.Deleted.ShouldBeNull();
+ }
+
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+
+ private static bool SubjectMatchesFilter(string subject, string filter)
+ {
+ if (string.IsNullOrEmpty(filter) || filter == ">") return true;
+ if (NATS.Server.Subscriptions.SubjectMatch.IsLiteral(filter))
+ return string.Equals(subject, filter, StringComparison.Ordinal);
+ return NATS.Server.Subscriptions.SubjectMatch.MatchLiteral(subject, filter);
+ }
+}
diff --git a/tests/NATS.Server.Tests/JetStream/Storage/MemStoreTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/MemStoreTests.cs
index 9fdc974..f0951c3 100644
--- a/tests/NATS.Server.Tests/JetStream/Storage/MemStoreTests.cs
+++ b/tests/NATS.Server.Tests/JetStream/Storage/MemStoreTests.cs
@@ -216,7 +216,9 @@ public sealed class MemStoreTests
await store.AppendAsync("foo", payload, default);
var state = await store.GetStateAsync(default);
- state.Bytes.ShouldBe((ulong)500);
+ // Go parity: MsgSize = subj.Length + hdr + data + 16 overhead
+ // "foo"(3) + 100 + 16 = 119 per msg × 5 = 595
+ state.Bytes.ShouldBe((ulong)595);
}
// Go: TestMemStoreBytesLimit server/memstore_test.go
@@ -233,7 +235,9 @@ public sealed class MemStoreTests
await store.RemoveAsync(3, default);
var state = await store.GetStateAsync(default);
- state.Bytes.ShouldBe((ulong)300);
+ // Go parity: MsgSize = subj.Length + hdr + data + 16 overhead
+ // "foo"(3) + 100 + 16 = 119 per msg × 3 remaining = 357
+ state.Bytes.ShouldBe((ulong)357);
}
// Snapshot and restore.