diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs
new file mode 100644
index 0000000..ec0b603
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs
@@ -0,0 +1,400 @@
+// Copyright 2019-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/memstore.go (consumerMemStore)
+
+namespace ZB.MOM.NatsNet.Server;
+
+///
+/// In-memory implementation of .
+/// Stores consumer delivery and ack state in memory only.
+///
+public sealed class ConsumerMemStore : IConsumerStore
+{
+ // -----------------------------------------------------------------------
+ // Fields
+ // -----------------------------------------------------------------------
+
+ private readonly object _mu = new();
+ private readonly JetStreamMemStore _ms;
+ private ConsumerConfig _cfg;
+ private ConsumerState _state = new();
+ private bool _closed;
+
+ // -----------------------------------------------------------------------
+ // Constructor
+ // -----------------------------------------------------------------------
+
+ ///
+ /// Creates a new consumer memory store backed by the given stream store.
+ ///
+ public ConsumerMemStore(JetStreamMemStore ms, ConsumerConfig cfg)
+ {
+ _ms = ms;
+ _cfg = cfg;
+ }
+
+ // -----------------------------------------------------------------------
+ // IConsumerStore — starting sequence
+ // -----------------------------------------------------------------------
+
+ ///
+ public void SetStarting(ulong sseq)
+ {
+ lock (_mu)
+ {
+ _state.Delivered.Stream = sseq;
+ _state.AckFloor.Stream = sseq;
+ }
+ }
+
+ ///
+ public void UpdateStarting(ulong sseq)
+ {
+ lock (_mu)
+ {
+ if (sseq > _state.Delivered.Stream)
+ {
+ _state.Delivered.Stream = sseq;
+ // For AckNone just update delivered and ackfloor at the same time.
+ if (_cfg.AckPolicy == AckPolicy.AckNone)
+ _state.AckFloor.Stream = sseq;
+ }
+ }
+ }
+
+ ///
+ public void Reset(ulong sseq)
+ {
+ lock (_mu)
+ {
+ _state = new ConsumerState();
+ }
+ SetStarting(sseq);
+ }
+
+ // -----------------------------------------------------------------------
+ // IConsumerStore — state query
+ // -----------------------------------------------------------------------
+
+ ///
+ public bool HasState()
+ {
+ lock (_mu)
+ {
+ return _state.Delivered.Consumer != 0 || _state.Delivered.Stream != 0;
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // IConsumerStore — delivery tracking
+ // -----------------------------------------------------------------------
+
+ ///
+ public void UpdateDelivered(ulong dseq, ulong sseq, ulong dc, long ts)
+ {
+ lock (_mu)
+ {
+ if (dc != 1 && _cfg.AckPolicy == AckPolicy.AckNone)
+ throw StoreErrors.ErrNoAckPolicy;
+
+ // Replay from old leader — ignore outdated updates.
+ if (dseq <= _state.AckFloor.Consumer)
+ return;
+
+ if (_cfg.AckPolicy != AckPolicy.AckNone)
+ {
+ _state.Pending ??= new Dictionary();
+
+ if (sseq <= _state.Delivered.Stream)
+ {
+ // Update to a previously delivered message.
+ if (_state.Pending.TryGetValue(sseq, out var p) && p != null)
+ p.Timestamp = ts;
+ }
+ else
+ {
+ _state.Pending[sseq] = new Pending { Sequence = dseq, Timestamp = ts };
+ }
+
+ if (dseq > _state.Delivered.Consumer)
+ _state.Delivered.Consumer = dseq;
+ if (sseq > _state.Delivered.Stream)
+ _state.Delivered.Stream = sseq;
+
+ if (dc > 1)
+ {
+ var maxdc = (ulong)_cfg.MaxDeliver;
+ if (maxdc > 0 && dc > maxdc)
+ _state.Pending.Remove(sseq);
+
+ _state.Redelivered ??= new Dictionary();
+ if (!_state.Redelivered.TryGetValue(sseq, out var cur) || cur < dc - 1)
+ _state.Redelivered[sseq] = dc - 1;
+ }
+ }
+ else
+ {
+ // AckNone — update delivered and ackfloor together.
+ if (dseq > _state.Delivered.Consumer)
+ {
+ _state.Delivered.Consumer = dseq;
+ _state.AckFloor.Consumer = dseq;
+ }
+ if (sseq > _state.Delivered.Stream)
+ {
+ _state.Delivered.Stream = sseq;
+ _state.AckFloor.Stream = sseq;
+ }
+ }
+ }
+ }
+
+ ///
+ public void UpdateAcks(ulong dseq, ulong sseq)
+ {
+ lock (_mu)
+ {
+ if (_cfg.AckPolicy == AckPolicy.AckNone)
+ throw StoreErrors.ErrNoAckPolicy;
+
+ // Ignore outdated acks.
+ if (dseq <= _state.AckFloor.Consumer)
+ return;
+
+ if (_state.Pending == null || !_state.Pending.ContainsKey(sseq))
+ {
+ _state.Redelivered?.Remove(sseq);
+ throw StoreErrors.ErrStoreMsgNotFound;
+ }
+
+ if (_cfg.AckPolicy == AckPolicy.AckAll)
+ {
+ var sgap = sseq - _state.AckFloor.Stream;
+ _state.AckFloor.Consumer = dseq;
+ _state.AckFloor.Stream = sseq;
+
+ if (sgap > (ulong)_state.Pending.Count)
+ {
+ var toRemove = new List();
+ foreach (var kv in _state.Pending)
+ if (kv.Key <= sseq)
+ toRemove.Add(kv.Key);
+ foreach (var k in toRemove)
+ {
+ _state.Pending.Remove(k);
+ _state.Redelivered?.Remove(k);
+ }
+ }
+ else
+ {
+ for (var seq = sseq; seq > sseq - sgap && _state.Pending.Count > 0; seq--)
+ {
+ _state.Pending.Remove(seq);
+ _state.Redelivered?.Remove(seq);
+ if (seq == 0) break;
+ }
+ }
+ return;
+ }
+
+ // AckExplicit
+ if (_state.Pending.TryGetValue(sseq, out var pending) && pending != null)
+ {
+ _state.Pending.Remove(sseq);
+ if (dseq > pending.Sequence && pending.Sequence > 0)
+ dseq = pending.Sequence; // Use the original delivery sequence.
+ }
+
+ if (_state.Pending.Count == 0)
+ {
+ _state.AckFloor.Consumer = _state.Delivered.Consumer;
+ _state.AckFloor.Stream = _state.Delivered.Stream;
+ }
+ else if (dseq == _state.AckFloor.Consumer + 1)
+ {
+ _state.AckFloor.Consumer = dseq;
+ _state.AckFloor.Stream = sseq;
+
+ if (_state.Delivered.Consumer > dseq)
+ {
+ for (var ss = sseq + 1; ss <= _state.Delivered.Stream; ss++)
+ {
+ if (_state.Pending.TryGetValue(ss, out var pp) && pp != null)
+ {
+ if (pp.Sequence > 0)
+ {
+ _state.AckFloor.Consumer = pp.Sequence - 1;
+ _state.AckFloor.Stream = ss - 1;
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ _state.Redelivered?.Remove(sseq);
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // IConsumerStore — config update
+ // -----------------------------------------------------------------------
+
+ ///
+ public void UpdateConfig(ConsumerConfig cfg)
+ {
+ lock (_mu)
+ {
+ _cfg = cfg;
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // IConsumerStore — update state
+ // -----------------------------------------------------------------------
+
+ ///
+ public void Update(ConsumerState state)
+ {
+ if (state.AckFloor.Consumer > state.Delivered.Consumer)
+ throw new InvalidOperationException("bad ack floor for consumer");
+ if (state.AckFloor.Stream > state.Delivered.Stream)
+ throw new InvalidOperationException("bad ack floor for stream");
+
+ Dictionary? pending = null;
+ Dictionary? redelivered = null;
+
+ if (state.Pending?.Count > 0)
+ {
+ pending = new Dictionary(state.Pending.Count);
+ foreach (var kv in state.Pending)
+ {
+ if (kv.Key <= state.AckFloor.Stream || kv.Key > state.Delivered.Stream)
+ throw new InvalidOperationException($"bad pending entry, sequence [{kv.Key}] out of range");
+ pending[kv.Key] = new Pending { Sequence = kv.Value.Sequence, Timestamp = kv.Value.Timestamp };
+ }
+ }
+
+ if (state.Redelivered?.Count > 0)
+ {
+ redelivered = new Dictionary(state.Redelivered);
+ }
+
+ lock (_mu)
+ {
+ // Ignore outdated updates.
+ if (state.Delivered.Consumer < _state.Delivered.Consumer ||
+ state.AckFloor.Stream < _state.AckFloor.Stream)
+ throw new InvalidOperationException("old update ignored");
+
+ _state.Delivered = new SequencePair { Consumer = state.Delivered.Consumer, Stream = state.Delivered.Stream };
+ _state.AckFloor = new SequencePair { Consumer = state.AckFloor.Consumer, Stream = state.AckFloor.Stream };
+ _state.Pending = pending;
+ _state.Redelivered = redelivered;
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // IConsumerStore — state retrieval
+ // -----------------------------------------------------------------------
+
+ ///
+ public (ConsumerState? State, Exception? Error) State() => StateWithCopy(doCopy: true);
+
+ ///
+ public (ConsumerState? State, Exception? Error) BorrowState() => StateWithCopy(doCopy: false);
+
+ private (ConsumerState? State, Exception? Error) StateWithCopy(bool doCopy)
+ {
+ lock (_mu)
+ {
+ if (_closed)
+ return (null, StoreErrors.ErrStoreClosed);
+
+ var state = new ConsumerState
+ {
+ Delivered = new SequencePair { Consumer = _state.Delivered.Consumer, Stream = _state.Delivered.Stream },
+ AckFloor = new SequencePair { Consumer = _state.AckFloor.Consumer, Stream = _state.AckFloor.Stream },
+ };
+
+ if (_state.Pending?.Count > 0)
+ {
+ state.Pending = doCopy ? CopyPending() : _state.Pending;
+ }
+
+ if (_state.Redelivered?.Count > 0)
+ {
+ state.Redelivered = doCopy ? CopyRedelivered() : _state.Redelivered;
+ }
+
+ return (state, null);
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // IConsumerStore — encoding
+ // -----------------------------------------------------------------------
+
+ ///
+ public byte[] EncodedState()
+ {
+ lock (_mu)
+ {
+ if (_closed)
+ throw StoreErrors.ErrStoreClosed;
+ // TODO: session 17 — encode consumer state to binary
+ return Array.Empty();
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // IConsumerStore — lifecycle
+ // -----------------------------------------------------------------------
+
+ ///
+ public StorageType Type() => StorageType.MemoryStorage;
+
+ ///
+ public void Stop()
+ {
+ lock (_mu)
+ {
+ _closed = true;
+ }
+ _ms.RemoveConsumer(this);
+ }
+
+ ///
+ public void Delete() => Stop();
+
+ ///
+ public void StreamDelete() => Stop();
+
+ // -----------------------------------------------------------------------
+ // Private helpers
+ // -----------------------------------------------------------------------
+
+ private Dictionary CopyPending()
+ {
+ var pending = new Dictionary(_state.Pending!.Count);
+ foreach (var kv in _state.Pending!)
+ pending[kv.Key] = new Pending { Sequence = kv.Value.Sequence, Timestamp = kv.Value.Timestamp };
+ return pending;
+ }
+
+ private Dictionary CopyRedelivered()
+ {
+ return new Dictionary(_state.Redelivered!);
+ }
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs
new file mode 100644
index 0000000..1ed7238
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs
@@ -0,0 +1,1445 @@
+// Copyright 2019-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/memstore.go
+
+using System.Text;
+using ZB.MOM.NatsNet.Server.Internal.DataStructures;
+
+namespace ZB.MOM.NatsNet.Server;
+
+///
+/// In-memory implementation of .
+/// Stores all messages in a Dictionary keyed by sequence number.
+/// Not production-complete: complex methods are stubbed for later sessions.
+///
+public sealed class JetStreamMemStore : IStreamStore
+{
+ // -----------------------------------------------------------------------
+ // Fields
+ // -----------------------------------------------------------------------
+
+ private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion);
+ private StreamConfig _cfg;
+ private StreamState _state = new();
+
+ // Primary message store: seq -> StoreMsg
+ private Dictionary? _msgs;
+
+ // Per-subject state: subject -> SimpleState
+ private readonly SubjectTree _fss;
+
+ // Deleted sequence set
+ private SequenceSet _dmap = new();
+
+ // Max messages per subject (cfg.MaxMsgsPer)
+ private long _maxp;
+
+ // Registered callbacks
+ private StorageUpdateHandler? _scb;
+ private StorageRemoveMsgHandler? _rmcb;
+ private ProcessJetStreamMsgHandler? _pmsgcb;
+
+ // Age-check timer for MaxAge expiry
+ private Timer? _ageChk;
+ private long _ageChkTime;
+
+ // Consumer count
+ private int _consumers;
+
+ // -----------------------------------------------------------------------
+ // Constructor
+ // -----------------------------------------------------------------------
+
+ ///
+ /// Creates a new in-memory store from the supplied .
+ ///
+ /// Thrown when cfg is null or Storage != MemoryStorage.
+ public JetStreamMemStore(StreamConfig cfg)
+ {
+ if (cfg == null)
+ throw new ArgumentException("config required");
+ if (cfg.Storage != StorageType.MemoryStorage)
+ throw new ArgumentException("JetStreamMemStore requires MemoryStorage type in config");
+
+ _cfg = cfg.Clone();
+ _msgs = new Dictionary();
+ _fss = new SubjectTree();
+ _maxp = cfg.MaxMsgsPer;
+
+ if (cfg.FirstSeq > 0)
+ Purge();
+ }
+
+ // -----------------------------------------------------------------------
+ // IStreamStore — store / load
+ // -----------------------------------------------------------------------
+
+ ///
+ public (ulong Seq, long Ts) StoreMsg(string subject, byte[]? hdr, byte[]? msg, long ttl)
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ var seq = _state.LastSeq + 1;
+ var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
+ try
+ {
+ StoreRawMsgLocked(subject, hdr, msg, seq, ts, ttl, discardNewCheck: true);
+ _scb?.Invoke(1, (long)MsgSize(subject, hdr, msg), seq, subject);
+ return (seq, ts);
+ }
+ catch
+ {
+ return (0, 0);
+ }
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+ }
+
+ ///
+ public void StoreRawMsg(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck)
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ StoreRawMsgLocked(subject, hdr, msg, seq, ts, ttl, discardNewCheck);
+ _scb?.Invoke(1, (long)MsgSize(subject, hdr, msg), seq, subject);
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+ }
+
+ // Lock must be held.
+ private void StoreRawMsgLocked(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck)
+ {
+ if (_msgs == null)
+ throw StoreErrors.ErrStoreClosed;
+
+ hdr ??= Array.Empty();
+ msg ??= Array.Empty();
+
+ // Discard-new enforcement
+ if (discardNewCheck && _cfg.Discard == DiscardPolicy.DiscardNew)
+ {
+ if (_cfg.MaxMsgs > 0 && _state.Msgs >= (ulong)_cfg.MaxMsgs)
+ throw StoreErrors.ErrMaxMsgs;
+ if (_cfg.MaxBytes > 0 && _state.Bytes + MsgSize(subject, hdr, msg) > (ulong)_cfg.MaxBytes)
+ throw StoreErrors.ErrMaxBytes;
+ }
+
+ if (seq != _state.LastSeq + 1)
+ {
+ if (seq > 0)
+ throw StoreErrors.ErrSequenceMismatch;
+ seq = _state.LastSeq + 1;
+ }
+
+ var now = DateTimeOffset.FromUnixTimeMilliseconds(ts / 1_000_000L).UtcDateTime;
+
+ if (_state.Msgs == 0)
+ {
+ _state.FirstSeq = seq;
+ _state.FirstTime = now;
+ }
+
+ // Build combined buffer
+ var buf = new byte[hdr.Length + msg.Length];
+ if (hdr.Length > 0)
+ Buffer.BlockCopy(hdr, 0, buf, 0, hdr.Length);
+ if (msg.Length > 0)
+ Buffer.BlockCopy(msg, 0, buf, hdr.Length, msg.Length);
+
+ var sm = new StoreMsg
+ {
+ Subject = subject,
+ Hdr = hdr.Length > 0 ? buf[..hdr.Length] : Array.Empty(),
+ Msg = buf[hdr.Length..],
+ Buf = buf,
+ Seq = seq,
+ Ts = ts,
+ };
+
+ _msgs[seq] = sm;
+ _state.Msgs++;
+ _state.Bytes += MsgSize(subject, hdr, msg);
+ _state.LastSeq = seq;
+ _state.LastTime = now;
+
+ // Per-subject tracking
+ if (!string.IsNullOrEmpty(subject))
+ {
+ var subjectBytes = Encoding.UTF8.GetBytes(subject);
+ var (ss, found) = _fss.Find(subjectBytes);
+ if (found && ss != null)
+ {
+ ss.Msgs++;
+ ss.Last = seq;
+ ss.LastNeedsUpdate = false;
+
+ // Enforce per-subject limit
+ if (_maxp > 0 && ss.Msgs > (ulong)_maxp)
+ EnforcePerSubjectLimit(subject, ss);
+ }
+ else
+ {
+ _fss.Insert(subjectBytes, new SimpleState { Msgs = 1, First = seq, Last = seq });
+ }
+ }
+
+ // Overall limits
+ EnforceMsgLimit();
+ EnforceBytesLimit();
+
+ // Age check
+ if (_ageChk == null && _cfg.MaxAge != TimeSpan.Zero)
+ StartAgeChk();
+ }
+
+ ///
+ public StoreMsg? LoadMsg(ulong seq, StoreMsg? sm)
+ {
+ _mu.EnterReadLock();
+ try
+ {
+ return LoadMsgLocked(seq, sm);
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ private StoreMsg? LoadMsgLocked(ulong seq, StoreMsg? sm)
+ {
+ if (_msgs == null)
+ throw StoreErrors.ErrStoreClosed;
+
+ if (!_msgs.TryGetValue(seq, out var stored) || stored == null)
+ {
+ if (seq <= _state.LastSeq)
+ throw StoreErrors.ErrStoreMsgNotFound;
+ throw StoreErrors.ErrStoreEOF;
+ }
+
+ sm ??= new StoreMsg();
+ sm.CopyFrom(stored);
+ return sm;
+ }
+
+ ///
+ public (StoreMsg? Sm, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? smp)
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ return LoadNextMsgLocked(filter, wc, start, smp);
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+ }
+
+ private (StoreMsg? Sm, ulong Skip) LoadNextMsgLocked(string filter, bool wc, ulong start, StoreMsg? smp)
+ {
+ if (_msgs == null || _state.Msgs == 0)
+ return (null, _state.LastSeq);
+
+ if (start < _state.FirstSeq)
+ start = _state.FirstSeq;
+ if (start > _state.LastSeq)
+ return (null, _state.LastSeq);
+
+ var isAll = string.IsNullOrEmpty(filter) || filter == ">";
+
+ for (var nseq = start; nseq <= _state.LastSeq; nseq++)
+ {
+ if (!_msgs.TryGetValue(nseq, out var sm) || sm == null)
+ continue;
+ if (isAll || (wc ? MatchLiteral(sm.Subject, filter) : sm.Subject == filter))
+ {
+ smp ??= new StoreMsg();
+ smp.CopyFrom(sm);
+ return (smp, nseq);
+ }
+ }
+ return (null, _state.LastSeq);
+ }
+
+ ///
+ public (StoreMsg? Sm, ulong Skip) LoadNextMsgMulti(object? sl, ulong start, StoreMsg? smp)
+ {
+ // TODO: session 17 — implement gsl.SimpleSublist equivalent
+ _mu.EnterReadLock();
+ try
+ {
+ if (_msgs == null || _state.Msgs == 0)
+ return (null, _state.LastSeq);
+
+ if (start < _state.FirstSeq)
+ start = _state.FirstSeq;
+ if (start > _state.LastSeq)
+ return (null, _state.LastSeq);
+
+ for (var nseq = start; nseq <= _state.LastSeq; nseq++)
+ {
+ if (_msgs.TryGetValue(nseq, out var sm) && sm != null)
+ {
+ smp ??= new StoreMsg();
+ smp.CopyFrom(sm);
+ return (smp, nseq);
+ }
+ }
+ return (null, _state.LastSeq);
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ ///
+ public StoreMsg? LoadLastMsg(string subject, StoreMsg? sm)
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ return LoadLastLocked(subject, sm);
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+ }
+
+ private StoreMsg? LoadLastLocked(string subject, StoreMsg? sm)
+ {
+ if (_msgs == null)
+ throw StoreErrors.ErrStoreClosed;
+
+ StoreMsg? stored = null;
+ if (string.IsNullOrEmpty(subject) || subject == ">")
+ {
+ _msgs.TryGetValue(_state.LastSeq, out stored);
+ }
+ else
+ {
+ var (ss, found) = _fss.Find(Encoding.UTF8.GetBytes(subject));
+ if (found && ss != null && ss.Msgs > 0)
+ _msgs.TryGetValue(ss.Last, out stored);
+ }
+
+ if (stored == null)
+ throw StoreErrors.ErrStoreMsgNotFound;
+
+ sm ??= new StoreMsg();
+ sm.CopyFrom(stored);
+ return sm;
+ }
+
+ ///
+ public (StoreMsg? Sm, Exception? Error) LoadPrevMsg(ulong start, StoreMsg? smp)
+ {
+ _mu.EnterReadLock();
+ try
+ {
+ if (_msgs == null)
+ return (null, StoreErrors.ErrStoreClosed);
+ if (_state.Msgs == 0 || start < _state.FirstSeq)
+ return (null, StoreErrors.ErrStoreEOF);
+ if (start > _state.LastSeq)
+ start = _state.LastSeq;
+
+ for (var seq = start; seq >= _state.FirstSeq; seq--)
+ {
+ if (_msgs.TryGetValue(seq, out var sm) && sm != null)
+ {
+ smp ??= new StoreMsg();
+ smp.CopyFrom(sm);
+ return (smp, null);
+ }
+ if (seq == 0) break;
+ }
+ return (null, StoreErrors.ErrStoreEOF);
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ ///
+ public (StoreMsg? Sm, ulong Skip, Exception? Error) LoadPrevMsgMulti(object? sl, ulong start, StoreMsg? smp)
+ {
+ // TODO: session 17 — implement gsl.SimpleSublist equivalent
+ _mu.EnterReadLock();
+ try
+ {
+ if (_msgs == null || _state.Msgs == 0 || start < _state.FirstSeq)
+ return (null, _state.FirstSeq, StoreErrors.ErrStoreEOF);
+
+ if (start > _state.LastSeq)
+ start = _state.LastSeq;
+
+ for (var nseq = start; nseq >= _state.FirstSeq; nseq--)
+ {
+ if (_msgs.TryGetValue(nseq, out var sm) && sm != null)
+ {
+ smp ??= new StoreMsg();
+ smp.CopyFrom(sm);
+ return (smp, nseq, null);
+ }
+ if (nseq == 0) break;
+ }
+ return (null, _state.LastSeq, StoreErrors.ErrStoreEOF);
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // IStreamStore — skip
+ // -----------------------------------------------------------------------
+
+ ///
+ public (ulong Seq, Exception? Error) SkipMsg(ulong seq)
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ if (seq != _state.LastSeq + 1)
+ {
+ if (seq > 0)
+ return (0, StoreErrors.ErrSequenceMismatch);
+ seq = _state.LastSeq + 1;
+ }
+
+ _state.LastSeq = seq;
+ _state.LastTime = DateTime.UtcNow;
+
+ if (_state.Msgs == 0)
+ {
+ _state.FirstSeq = seq + 1;
+ _state.FirstTime = default;
+ }
+ else
+ {
+ _dmap.Insert(seq);
+ }
+ return (seq, null);
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+ }
+
+ ///
+ public void SkipMsgs(ulong seq, ulong num)
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ if (seq != _state.LastSeq + 1)
+ {
+ if (seq > 0)
+ throw StoreErrors.ErrSequenceMismatch;
+ seq = _state.LastSeq + 1;
+ }
+ var lseq = seq + num - 1;
+ _state.LastSeq = lseq;
+ _state.LastTime = DateTime.UtcNow;
+
+ if (_state.Msgs == 0)
+ {
+ _state.FirstSeq = lseq + 1;
+ _state.FirstTime = default;
+ }
+ else
+ {
+ for (; seq <= lseq; seq++)
+ _dmap.Insert(seq);
+ }
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // IStreamStore — remove
+ // -----------------------------------------------------------------------
+
+ ///
+ public (bool Removed, Exception? Error) RemoveMsg(ulong seq)
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ return (RemoveMsgLocked(seq, secure: false), null);
+ }
+ finally
+ {
+ if (_mu.IsWriteLockHeld)
+ _mu.ExitWriteLock();
+ }
+ }
+
+ ///
+ public (bool Removed, Exception? Error) EraseMsg(ulong seq)
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ return (RemoveMsgLocked(seq, secure: true), null);
+ }
+ finally
+ {
+ if (_mu.IsWriteLockHeld)
+ _mu.ExitWriteLock();
+ }
+ }
+
+ // Lock must be held.
+ private bool RemoveMsgLocked(ulong seq, bool secure)
+ {
+ if (_msgs == null || !_msgs.TryGetValue(seq, out var sm) || sm == null)
+ return false;
+
+ var size = MsgSize(sm.Subject, sm.Hdr, sm.Msg);
+
+ if (_state.Msgs > 0)
+ {
+ _state.Msgs--;
+ if (size > _state.Bytes)
+ size = _state.Bytes;
+ _state.Bytes -= size;
+ }
+
+ _dmap.Insert(seq);
+ UpdateFirstSeq(seq);
+ RemoveSeqPerSubject(sm.Subject, seq);
+
+ if (secure)
+ {
+ if (sm.Hdr.Length > 0)
+ Array.Clear(sm.Hdr);
+ if (sm.Msg.Length > 0)
+ Array.Clear(sm.Msg);
+ sm.Seq = 0;
+ sm.Ts = 0;
+ }
+
+ _msgs.Remove(seq);
+
+ // Invoke update callback — we temporarily release/reacquire write lock
+ var cb = _scb;
+ if (cb != null)
+ {
+ var subj = sm.Subject;
+ var delta = size;
+ _mu.ExitWriteLock();
+ try { cb(-1, -(long)delta, seq, subj); }
+ finally { _mu.EnterWriteLock(); }
+ }
+
+ return true;
+ }
+
+ // -----------------------------------------------------------------------
+ // IStreamStore — purge / compact / truncate
+ // -----------------------------------------------------------------------
+
+ ///
+ public (ulong Purged, Exception? Error) Purge()
+ {
+ _mu.EnterWriteLock();
+ ulong purged;
+ long bytes;
+ ulong fseq;
+ StorageUpdateHandler? cb;
+ try
+ {
+ purged = (ulong)(_msgs?.Count ?? 0);
+ bytes = (long)_state.Bytes;
+ fseq = _state.LastSeq + 1;
+ _state.FirstSeq = fseq;
+ _state.LastSeq = fseq - 1;
+ _state.FirstTime = default;
+ _state.Bytes = 0;
+ _state.Msgs = 0;
+ _msgs = new Dictionary();
+ _dmap = new SequenceSet();
+ cb = _scb;
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+
+ cb?.Invoke(-(long)purged, -bytes, 0, string.Empty);
+ return (purged, null);
+ }
+
+ ///
+ public (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep)
+ {
+ // TODO: session 17 — full subject-filtered purge
+ if (string.IsNullOrEmpty(subject) || subject == ">")
+ {
+ if (keep == 0 && seq == 0)
+ return Purge();
+ }
+ return (0, null);
+ }
+
+ ///
+ public (ulong Purged, Exception? Error) Compact(ulong seq)
+ {
+ if (seq == 0)
+ return Purge();
+
+ ulong purged = 0;
+ ulong bytes = 0;
+ StorageUpdateHandler? cb = null;
+
+ _mu.EnterWriteLock();
+ try
+ {
+ if (_msgs == null || _state.FirstSeq > seq)
+ return (0, null);
+
+ cb = _scb;
+ if (seq <= _state.LastSeq)
+ {
+ var fseq = _state.FirstSeq;
+ for (var s = seq; s <= _state.LastSeq; s++)
+ {
+ if (_msgs.TryGetValue(s, out var sm2) && sm2 != null)
+ {
+ _state.FirstSeq = s;
+ _state.FirstTime = DateTimeOffset.FromUnixTimeMilliseconds(sm2.Ts / 1_000_000L).UtcDateTime;
+ break;
+ }
+ }
+ for (var s = seq - 1; s >= fseq; s--)
+ {
+ if (_msgs.TryGetValue(s, out var sm2) && sm2 != null)
+ {
+ bytes += MsgSize(sm2.Subject, sm2.Hdr, sm2.Msg);
+ purged++;
+ RemoveSeqPerSubject(sm2.Subject, s);
+ _msgs.Remove(s);
+ }
+ else if (!_dmap.IsEmpty)
+ {
+ _dmap.Delete(s);
+ }
+ if (s == 0) break;
+ }
+ if (purged > _state.Msgs) purged = _state.Msgs;
+ _state.Msgs -= purged;
+ if (bytes > _state.Bytes) bytes = _state.Bytes;
+ _state.Bytes -= bytes;
+ }
+ else
+ {
+ purged = (ulong)_msgs.Count;
+ bytes = _state.Bytes;
+ _state.Bytes = 0;
+ _state.Msgs = 0;
+ _state.FirstSeq = seq;
+ _state.FirstTime = default;
+ _state.LastSeq = seq - 1;
+ _msgs = new Dictionary();
+ _dmap = new SequenceSet();
+ }
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+
+ cb?.Invoke(-(long)purged, -(long)bytes, 0, string.Empty);
+ return (purged, null);
+ }
+
+ ///
+ public void Truncate(ulong seq)
+ {
+ StorageUpdateHandler? cb;
+ ulong purged = 0;
+ ulong bytes = 0;
+
+ _mu.EnterWriteLock();
+ try
+ {
+ if (_msgs == null)
+ return;
+
+ cb = _scb;
+
+ if (seq == 0)
+ {
+ // Full reset
+ purged = (ulong)_msgs.Count;
+ bytes = _state.Bytes;
+ _state = new StreamState { LastTime = DateTime.UtcNow };
+ _msgs = new Dictionary();
+ _dmap = new SequenceSet();
+ }
+ else
+ {
+ DateTime lastTime = _state.LastTime;
+ if (_msgs.TryGetValue(seq, out var lsm) && lsm != null)
+ lastTime = DateTimeOffset.FromUnixTimeMilliseconds(lsm.Ts / 1_000_000L).UtcDateTime;
+
+ for (var i = _state.LastSeq; i > seq; i--)
+ {
+ if (_msgs.TryGetValue(i, out var sm) && sm != null)
+ {
+ purged++;
+ bytes += MsgSize(sm.Subject, sm.Hdr, sm.Msg);
+ RemoveSeqPerSubject(sm.Subject, i);
+ _msgs.Remove(i);
+ }
+ else if (!_dmap.IsEmpty)
+ {
+ _dmap.Delete(i);
+ }
+ }
+
+ _state.LastSeq = seq;
+ _state.LastTime = lastTime;
+ if (purged > _state.Msgs) purged = _state.Msgs;
+ _state.Msgs -= purged;
+ if (bytes > _state.Bytes) bytes = _state.Bytes;
+ _state.Bytes -= bytes;
+ }
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+
+ cb?.Invoke(-(long)purged, -(long)bytes, 0, string.Empty);
+ }
+
+ // -----------------------------------------------------------------------
+ // IStreamStore — state
+ // -----------------------------------------------------------------------
+
+ ///
+ public StreamState State()
+ {
+ _mu.EnterReadLock();
+ try
+ {
+ var state = new StreamState
+ {
+ Msgs = _state.Msgs,
+ Bytes = _state.Bytes,
+ FirstSeq = _state.FirstSeq,
+ FirstTime = _state.FirstTime,
+ LastSeq = _state.LastSeq,
+ LastTime = _state.LastTime,
+ Consumers = _consumers,
+ NumSubjects = _fss.Size(),
+ };
+
+ var numDeleted = (int)((state.LastSeq - state.FirstSeq + 1) - state.Msgs);
+ if (numDeleted < 0) numDeleted = 0;
+
+ if (numDeleted > 0)
+ {
+ var deleted = new List(numDeleted);
+ var fseq = state.FirstSeq;
+ var lseq = state.LastSeq;
+ _dmap.Range(seq =>
+ {
+ if (seq >= fseq && seq <= lseq)
+ deleted.Add(seq);
+ return true;
+ });
+ state.Deleted = deleted.ToArray();
+ state.NumDeleted = deleted.Count;
+ }
+
+ return state;
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ ///
+ public void FastState(StreamState state)
+ {
+ _mu.EnterReadLock();
+ try
+ {
+ state.Msgs = _state.Msgs;
+ state.Bytes = _state.Bytes;
+ state.FirstSeq = _state.FirstSeq;
+ state.FirstTime = _state.FirstTime;
+ state.LastSeq = _state.LastSeq;
+ state.LastTime = _state.LastTime;
+ if (state.LastSeq > state.FirstSeq)
+ {
+ state.NumDeleted = (int)((state.LastSeq - state.FirstSeq + 1) - state.Msgs);
+ if (state.NumDeleted < 0) state.NumDeleted = 0;
+ }
+ state.Consumers = _consumers;
+ state.NumSubjects = _fss.Size();
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // IStreamStore — filtered / pending state
+ // -----------------------------------------------------------------------
+
+ ///
+ public SimpleState FilteredState(ulong seq, string subject)
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ return FilteredStateLocked(seq, subject);
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+ }
+
+ private SimpleState FilteredStateLocked(ulong sseq, string filter)
+ {
+ if (sseq < _state.FirstSeq) sseq = _state.FirstSeq;
+ if (sseq > _state.LastSeq) return new SimpleState();
+ if (string.IsNullOrEmpty(filter)) filter = ">";
+
+ var isAll = filter == ">";
+ if (isAll && sseq <= _state.FirstSeq)
+ return new SimpleState { Msgs = _state.Msgs, First = _state.FirstSeq, Last = _state.LastSeq };
+
+ var ss = new SimpleState();
+ _fss.Match(Encoding.UTF8.GetBytes(filter), (subj, fss) =>
+ {
+ if (fss.FirstNeedsUpdate || fss.LastNeedsUpdate)
+ RecalculateForSubj(Encoding.UTF8.GetString(subj), fss);
+
+ if (sseq <= fss.First)
+ {
+ ss.Msgs += fss.Msgs;
+ if (ss.First == 0 || fss.First < ss.First) ss.First = fss.First;
+ if (fss.Last > ss.Last) ss.Last = fss.Last;
+ }
+ return true;
+ });
+ return ss;
+ }
+
+ ///
+ public Dictionary SubjectsState(string filterSubject)
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ if (_fss.Size() == 0) return new Dictionary();
+ if (string.IsNullOrEmpty(filterSubject)) filterSubject = ">";
+ var result = new Dictionary();
+ _fss.Match(Encoding.UTF8.GetBytes(filterSubject), (subj, ss) =>
+ {
+ var s = Encoding.UTF8.GetString(subj);
+ if (ss.FirstNeedsUpdate || ss.LastNeedsUpdate)
+ RecalculateForSubj(s, ss);
+ if (!result.TryGetValue(s, out var oss) || oss.First == 0)
+ result[s] = new SimpleState { Msgs = ss.Msgs, First = ss.First, Last = ss.Last };
+ else
+ {
+ oss.Last = ss.Last;
+ oss.Msgs += ss.Msgs;
+ result[s] = oss;
+ }
+ return true;
+ });
+ return result;
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+ }
+
+ ///
+ public Dictionary SubjectsTotals(string filterSubject)
+ {
+ _mu.EnterReadLock();
+ try
+ {
+ if (_fss.Size() == 0) return new Dictionary();
+ if (string.IsNullOrEmpty(filterSubject)) filterSubject = ">";
+ var result = new Dictionary();
+ _fss.Match(Encoding.UTF8.GetBytes(filterSubject), (subj, ss) =>
+ {
+ result[Encoding.UTF8.GetString(subj)] = ss.Msgs;
+ return true;
+ });
+ return result;
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ ///
+ public (ulong Total, ulong ValidThrough, Exception? Error) NumPending(ulong sseq, string filter, bool lastPerSubject)
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ var ss = FilteredStateLocked(sseq, filter);
+ return (ss.Msgs, _state.LastSeq, null);
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+ }
+
+ ///
+ public (ulong Total, ulong ValidThrough, Exception? Error) NumPendingMulti(ulong sseq, object? sl, bool lastPerSubject)
+ {
+ // TODO: session 17 — implement gsl.SimpleSublist equivalent
+ return NumPending(sseq, ">", lastPerSubject);
+ }
+
+ ///
+ public (ulong[] Seqs, Exception? Error) AllLastSeqs()
+ {
+ _mu.EnterReadLock();
+ try
+ {
+ if (_msgs == null || _msgs.Count == 0) return (Array.Empty(), null);
+ var seqs = new List(_fss.Size());
+ _fss.IterFast((_, ss) =>
+ {
+ seqs.Add(ss.Last);
+ return true;
+ });
+ seqs.Sort();
+ return (seqs.ToArray(), null);
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ ///
+ public (ulong[] Seqs, Exception? Error) MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed)
+ {
+ _mu.EnterReadLock();
+ try
+ {
+ if (_msgs == null || _msgs.Count == 0) return (Array.Empty(), null);
+ if (maxSeq == 0) maxSeq = _state.LastSeq;
+
+ var seqs = new List(64);
+ var seen = new HashSet();
+ foreach (var filter in filters)
+ {
+ _fss.Match(Encoding.UTF8.GetBytes(filter), (_, ss) =>
+ {
+ if (ss.Last <= maxSeq && seen.Add(ss.Last))
+ seqs.Add(ss.Last);
+ return true;
+ });
+ if (maxAllowed > 0 && seqs.Count > maxAllowed)
+ return (Array.Empty(), StoreErrors.ErrTooManyResults);
+ }
+ seqs.Sort();
+ return (seqs.ToArray(), null);
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ ///
+ public (string Subject, Exception? Error) SubjectForSeq(ulong seq)
+ {
+ _mu.EnterReadLock();
+ try
+ {
+ if (_msgs == null) return (string.Empty, StoreErrors.ErrStoreClosed);
+ if (seq < _state.FirstSeq) return (string.Empty, StoreErrors.ErrStoreMsgNotFound);
+ if (_msgs.TryGetValue(seq, out var sm) && sm != null)
+ return (sm.Subject, null);
+ return (string.Empty, StoreErrors.ErrStoreMsgNotFound);
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // IStreamStore — time / seq
+ // -----------------------------------------------------------------------
+
+ ///
+ public ulong GetSeqFromTime(DateTime t)
+ {
+ var ts = new DateTimeOffset(t).ToUnixTimeMilliseconds() * 1_000_000L;
+ _mu.EnterReadLock();
+ try
+ {
+ if (_msgs == null || _msgs.Count == 0)
+ return _state.LastSeq + 1;
+
+ if (!_msgs.TryGetValue(_state.FirstSeq, out var firstSm) || firstSm == null)
+ return _state.LastSeq + 1;
+
+ if (ts <= firstSm.Ts)
+ return _state.FirstSeq;
+
+ // Find actual last stored message
+ StoreMsg? lastSm = null;
+ for (var lseq = _state.LastSeq; lseq > _state.FirstSeq; lseq--)
+ {
+ if (_msgs.TryGetValue(lseq, out lastSm) && lastSm != null)
+ break;
+ }
+ if (lastSm == null) return _state.LastSeq + 1;
+ if (ts >= lastSm.Ts) return _state.LastSeq + 1;
+
+ // Linear scan fallback
+ for (var seq = _state.FirstSeq; seq <= _state.LastSeq; seq++)
+ {
+ if (_msgs.TryGetValue(seq, out var sm) && sm != null && sm.Ts >= ts)
+ return seq;
+ }
+ return _state.LastSeq + 1;
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // IStreamStore — config, encoding, sync
+ // -----------------------------------------------------------------------
+
+ ///
+ public void UpdateConfig(StreamConfig cfg)
+ {
+ if (cfg == null) throw new ArgumentException("config required");
+ _mu.EnterWriteLock();
+ try
+ {
+ _cfg = cfg.Clone();
+ _maxp = cfg.MaxMsgsPer;
+ EnforceMsgLimit();
+ EnforceBytesLimit();
+ if (_ageChk == null && _cfg.MaxAge != TimeSpan.Zero)
+ StartAgeChk();
+ if (_ageChk != null && _cfg.MaxAge == TimeSpan.Zero)
+ {
+ _ageChk?.Dispose();
+ _ageChk = null;
+ _ageChkTime = 0;
+ }
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+ }
+
+ ///
+ public void RegisterStorageUpdates(StorageUpdateHandler cb)
+ {
+ _mu.EnterWriteLock();
+ _scb = cb;
+ _mu.ExitWriteLock();
+ }
+
+ ///
+ public void RegisterStorageRemoveMsg(StorageRemoveMsgHandler cb)
+ {
+ _mu.EnterWriteLock();
+ _rmcb = cb;
+ _mu.ExitWriteLock();
+ }
+
+ ///
+ public void RegisterProcessJetStreamMsg(ProcessJetStreamMsgHandler cb)
+ {
+ _mu.EnterWriteLock();
+ _pmsgcb = cb;
+ _mu.ExitWriteLock();
+ }
+
+ ///
+ public void FlushAllPending()
+ {
+ // No-op: in-memory store doesn't use async applying.
+ }
+
+ ///
+ public (byte[] Enc, Exception? Error) EncodedStreamState(ulong failed)
+ {
+ // TODO: session 17 — binary encode using varint encoding matching Go
+ _mu.EnterReadLock();
+ try
+ {
+ // Minimal encoded representation (stub): magic + version bytes
+ return (new byte[] { 42, 1 }, null);
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ ///
+ public void SyncDeleted(DeleteBlocks dbs)
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ if (dbs.Count == 1)
+ {
+ var (min, max, num) = _dmap.State();
+ var (pmin, pmax, pnum) = dbs[0].GetState();
+ if (pmin == min && pmax == max && pnum == num)
+ return;
+ }
+
+ var lseq = _state.LastSeq;
+ foreach (var db in dbs)
+ {
+ var (first, _, _) = db.GetState();
+ if (first > lseq) continue;
+ db.Range(seq => { RemoveMsgLocked(seq, false); return true; });
+ }
+ }
+ finally
+ {
+ if (_mu.IsWriteLockHeld)
+ _mu.ExitWriteLock();
+ }
+ }
+
+ ///
+ public void ResetState()
+ {
+ // For memory store, nothing to reset.
+ }
+
+ // -----------------------------------------------------------------------
+ // IStreamStore — type / stop / delete
+ // -----------------------------------------------------------------------
+
+ ///
+ public StorageType Type() => StorageType.MemoryStorage;
+
+ ///
+ public void Stop()
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ if (_msgs == null) return;
+ _ageChk?.Dispose();
+ _ageChk = null;
+ _ageChkTime = 0;
+ _msgs = null;
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+
+ Purge();
+ }
+
+ ///
+ public void Delete(bool inline) => Stop();
+
+ // -----------------------------------------------------------------------
+ // IStreamStore — consumers
+ // -----------------------------------------------------------------------
+
+ ///
+ public IConsumerStore ConsumerStore(string name, DateTime created, ConsumerConfig cfg)
+ {
+ if (_msgs == null)
+ throw StoreErrors.ErrStoreClosed;
+ if (cfg == null || string.IsNullOrEmpty(name))
+ throw new ArgumentException("bad consumer config");
+
+ var o = new ConsumerMemStore(this, cfg);
+ AddConsumer(o);
+ return o;
+ }
+
+ ///
+ public void AddConsumer(IConsumerStore o)
+ {
+ _mu.EnterWriteLock();
+ _consumers++;
+ _mu.ExitWriteLock();
+ }
+
+ ///
+ public void RemoveConsumer(IConsumerStore o)
+ {
+ _mu.EnterWriteLock();
+ if (_consumers > 0) _consumers--;
+ _mu.ExitWriteLock();
+ }
+
+ ///
+ public (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool includeConsumers, bool checkMsgs)
+ {
+ // TODO: session 17 — not implemented for memory store
+ return (null, new NotImplementedException("no impl"));
+ }
+
+ ///
+ public (ulong Total, ulong Reported, Exception? Error) Utilization()
+ {
+ _mu.EnterReadLock();
+ try
+ {
+ return (_state.Bytes, _state.Bytes, null);
+ }
+ finally
+ {
+ _mu.ExitReadLock();
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // Private helpers
+ // -----------------------------------------------------------------------
+
+ // Lock must be held.
+ private void EnforceMsgLimit()
+ {
+ if (_cfg.Discard != DiscardPolicy.DiscardOld) return;
+ if (_cfg.MaxMsgs <= 0 || _state.Msgs <= (ulong)_cfg.MaxMsgs) return;
+ while (_state.Msgs > (ulong)_cfg.MaxMsgs)
+ RemoveMsgLocked(_state.FirstSeq, false);
+ }
+
+ // Lock must be held.
+ private void EnforceBytesLimit()
+ {
+ if (_cfg.Discard != DiscardPolicy.DiscardOld) return;
+ if (_cfg.MaxBytes <= 0 || _state.Bytes <= (ulong)_cfg.MaxBytes) return;
+ while (_state.Bytes > (ulong)_cfg.MaxBytes)
+ RemoveMsgLocked(_state.FirstSeq, false);
+ }
+
+ // Lock must be held.
+ private void EnforcePerSubjectLimit(string subj, SimpleState ss)
+ {
+ if (_maxp <= 0) return;
+ while (ss.Msgs > (ulong)_maxp)
+ {
+ if (ss.FirstNeedsUpdate || ss.LastNeedsUpdate)
+ RecalculateForSubj(subj, ss);
+ if (!RemoveMsgLocked(ss.First, false))
+ break;
+ }
+ }
+
+ // Lock must be held.
+ private void UpdateFirstSeq(ulong seq)
+ {
+ if (seq != _state.FirstSeq) return;
+
+ StoreMsg? nsm = null;
+ for (var nseq = _state.FirstSeq + 1; nseq <= _state.LastSeq; nseq++)
+ {
+ if (_msgs != null && _msgs.TryGetValue(nseq, out nsm) && nsm != null)
+ break;
+ }
+
+ var oldFirst = _state.FirstSeq;
+ if (nsm != null)
+ {
+ _state.FirstSeq = nsm.Seq;
+ _state.FirstTime = DateTimeOffset.FromUnixTimeMilliseconds(nsm.Ts / 1_000_000L).UtcDateTime;
+ }
+ else
+ {
+ _state.FirstSeq = _state.LastSeq + 1;
+ _state.FirstTime = default;
+ }
+
+ if (oldFirst == _state.FirstSeq - 1)
+ _dmap.Delete(oldFirst);
+ else
+ for (var s = oldFirst; s < _state.FirstSeq; s++)
+ _dmap.Delete(s);
+ }
+
+ // Lock must be held.
+ private void RemoveSeqPerSubject(string subj, ulong seq)
+ {
+ if (string.IsNullOrEmpty(subj)) return;
+ var subjectBytes = Encoding.UTF8.GetBytes(subj);
+ var (ss, found) = _fss.Find(subjectBytes);
+ if (!found || ss == null) return;
+
+ if (ss.Msgs == 1)
+ {
+ _fss.Delete(subjectBytes);
+ return;
+ }
+ ss.Msgs--;
+ if (ss.Msgs == 1)
+ {
+ if (!ss.LastNeedsUpdate && seq != ss.Last)
+ {
+ ss.First = ss.Last;
+ ss.FirstNeedsUpdate = false;
+ return;
+ }
+ if (!ss.FirstNeedsUpdate && seq != ss.First)
+ {
+ ss.Last = ss.First;
+ ss.LastNeedsUpdate = false;
+ return;
+ }
+ }
+ ss.FirstNeedsUpdate = seq == ss.First || ss.FirstNeedsUpdate;
+ ss.LastNeedsUpdate = seq == ss.Last || ss.LastNeedsUpdate;
+ }
+
+ // Lock must be held.
+ private void RecalculateForSubj(string subj, SimpleState ss)
+ {
+ if (_msgs == null) return;
+ if (ss.FirstNeedsUpdate)
+ {
+ var tseq = ss.First + 1;
+ if (tseq < _state.FirstSeq) tseq = _state.FirstSeq;
+ for (; tseq <= ss.Last; tseq++)
+ {
+ if (_msgs.TryGetValue(tseq, out var sm) && sm != null && sm.Subject == subj)
+ {
+ ss.First = tseq;
+ ss.FirstNeedsUpdate = false;
+ if (ss.Msgs == 1) { ss.Last = tseq; ss.LastNeedsUpdate = false; return; }
+ break;
+ }
+ }
+ }
+ if (ss.LastNeedsUpdate)
+ {
+ var tseq = ss.Last - 1;
+ if (tseq > _state.LastSeq) tseq = _state.LastSeq;
+ for (; tseq >= ss.First; tseq--)
+ {
+ if (_msgs.TryGetValue(tseq, out var sm) && sm != null && sm.Subject == subj)
+ {
+ ss.Last = tseq;
+ ss.LastNeedsUpdate = false;
+ if (ss.Msgs == 1) { ss.First = tseq; ss.FirstNeedsUpdate = false; }
+ return;
+ }
+ if (tseq == 0) break;
+ }
+ }
+ }
+
+ private void StartAgeChk()
+ {
+ if (_ageChk != null) return;
+ if (_cfg.MaxAge != TimeSpan.Zero)
+ _ageChk = new Timer(_ => ExpireMsgs(), null, _cfg.MaxAge, _cfg.MaxAge);
+ }
+
+ private void ExpireMsgs()
+ {
+ // TODO: session 17 — full age/TTL expiry logic
+ _mu.EnterWriteLock();
+ try
+ {
+ if (_msgs == null || _cfg.MaxAge == TimeSpan.Zero) return;
+ var minAge = DateTime.UtcNow - _cfg.MaxAge;
+ var minTs = new DateTimeOffset(minAge).ToUnixTimeMilliseconds() * 1_000_000L;
+ var toRemove = new List();
+ foreach (var kv in _msgs)
+ {
+ if (kv.Value.Ts <= minTs)
+ toRemove.Add(kv.Key);
+ }
+ foreach (var seq in toRemove)
+ RemoveMsgLocked(seq, false);
+ }
+ finally
+ {
+ if (_mu.IsWriteLockHeld)
+ _mu.ExitWriteLock();
+ }
+ }
+
+ private static ulong MsgSize(string subj, byte[]? hdr, byte[]? msg)
+ => (ulong)(subj.Length + (hdr?.Length ?? 0) + (msg?.Length ?? 0) + 16);
+
+ private static bool MatchLiteral(string subject, string filter)
+ {
+ var sParts = subject.Split('.');
+ var fParts = filter.Split('.');
+ return IsSubsetMatch(sParts, fParts);
+ }
+
+ private static bool IsSubsetMatch(string[] subject, string[] filter)
+ {
+ var si = 0;
+ for (var fi = 0; fi < filter.Length; fi++)
+ {
+ if (filter[fi] == ">")
+ return si < subject.Length;
+ if (si >= subject.Length)
+ return false;
+ if (filter[fi] != "*" && filter[fi] != subject[si])
+ return false;
+ si++;
+ }
+ return si == subject.Length;
+ }
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs
new file mode 100644
index 0000000..46434ec
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs
@@ -0,0 +1,1000 @@
+// Copyright 2019-2026 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/store.go, server/stream.go, server/consumer.go, server/filestore.go, server/disk_avail.go
+
+using System.Text.Json.Serialization;
+using ZB.MOM.NatsNet.Server.Internal.DataStructures;
+
+namespace ZB.MOM.NatsNet.Server;
+
+// ---------------------------------------------------------------------------
+// StorageType
+// ---------------------------------------------------------------------------
+
+/// Determines how messages are stored for retention.
+public enum StorageType
+{
+ /// On disk, designated by the JetStream config StoreDir.
+ FileStorage = 22,
+
+ /// In memory only.
+ MemoryStorage = 33,
+}
+
+// ---------------------------------------------------------------------------
+// Well-known store errors
+// ---------------------------------------------------------------------------
+
+public static class StoreErrors
+{
+ /// Returned when the store has been closed.
+ public static readonly Exception ErrStoreClosed = new InvalidOperationException("store is closed");
+
+ /// Returned when a message was not found but was expected.
+ public static readonly Exception ErrStoreMsgNotFound = new KeyNotFoundException("no message found");
+
+ /// Returned when message seq is greater than the last sequence.
+ public static readonly Exception ErrStoreEOF = new EndOfStreamException("stream store EOF");
+
+ /// Returned when DiscardNew is set and the message limit is reached.
+ public static readonly Exception ErrMaxMsgs = new InvalidOperationException("maximum messages exceeded");
+
+ /// Returned when DiscardNew is set and the bytes limit is reached.
+ public static readonly Exception ErrMaxBytes = new InvalidOperationException("maximum bytes exceeded");
+
+ /// Returned when DiscardNew is set and the per-subject message limit is reached.
+ public static readonly Exception ErrMaxMsgsPerSubject = new InvalidOperationException("maximum messages per subject exceeded");
+
+ /// Returned when RemoveMsg or EraseMsg is called while a snapshot is in progress.
+ public static readonly Exception ErrStoreSnapshotInProgress = new InvalidOperationException("snapshot in progress");
+
+ /// Returned when a message is considered too large.
+ public static readonly Exception ErrMsgTooLarge = new InvalidOperationException("message too large");
+
+ /// Returned when accessing the wrong storage type.
+ public static readonly Exception ErrStoreWrongType = new InvalidOperationException("wrong storage type");
+
+ /// Returned when trying to update a consumer's acks with no ack policy.
+ public static readonly Exception ErrNoAckPolicy = new InvalidOperationException("ack policy is none");
+
+ /// Returned when storing a raw message with a mismatched expected sequence.
+ public static readonly Exception ErrSequenceMismatch = new InvalidOperationException("expected sequence does not match store");
+
+ /// Returned when a stream state snapshot is corrupt.
+ public static readonly Exception ErrCorruptStreamState = new InvalidDataException("stream state snapshot is corrupt");
+
+ /// Returned when a request yields too many matching results.
+ public static readonly Exception ErrTooManyResults = new InvalidOperationException("too many matching results for request");
+
+ /// Returned when a stream state encoding is malformed.
+ public static readonly Exception ErrBadStreamStateEncoding = new InvalidDataException("bad stream state encoding");
+}
+
+// ---------------------------------------------------------------------------
+// StoreMsg — stored message format
+// ---------------------------------------------------------------------------
+
+///
+/// The stored message format for messages retained by the Store layer.
+///
+public sealed class StoreMsg
+{
+ public string Subject { get; set; } = string.Empty;
+ public byte[] Hdr { get; set; } = Array.Empty();
+ public byte[] Msg { get; set; } = Array.Empty();
+ public byte[] Buf { get; set; } = Array.Empty();
+ public ulong Seq { get; set; }
+ public long Ts { get; set; }
+
+ /// Copy fields from another StoreMsg into this one.
+ public void CopyFrom(StoreMsg src)
+ {
+ var newBuf = new byte[src.Buf.Length];
+ src.Buf.CopyTo(newBuf, 0);
+ Buf = newBuf;
+ Hdr = newBuf[..src.Hdr.Length];
+ Msg = newBuf[src.Hdr.Length..];
+ Subject = src.Subject;
+ Seq = src.Seq;
+ Ts = src.Ts;
+ }
+
+ /// Clear all fields, resetting to an empty message.
+ public void Clear()
+ {
+ Subject = string.Empty;
+ Hdr = Array.Empty();
+ Msg = Array.Empty();
+ Buf = Array.Empty();
+ Seq = 0;
+ Ts = 0;
+ }
+}
+
+// ---------------------------------------------------------------------------
+// Delegate types
+// ---------------------------------------------------------------------------
+
+///
+/// Callback into the upper layers to report on changes in storage resources.
+/// For single-message cases, also supplies sequence number and subject.
+///
+public delegate void StorageUpdateHandler(long msgs, long bytes, ulong seq, string subj);
+
+/// Callback into the upper layers to remove a message.
+public delegate void StorageRemoveMsgHandler(ulong seq);
+
+///
+/// Callback into the upper layers to process a JetStream message.
+/// Will propose the message if the stream is replicated.
+///
+public delegate void ProcessJetStreamMsgHandler(object? msg);
+
+// ---------------------------------------------------------------------------
+// IStreamStore interface
+// ---------------------------------------------------------------------------
+
+/// Abstraction over JetStream stream storage backends.
+public interface IStreamStore
+{
+ (ulong Seq, long Ts) StoreMsg(string subject, byte[]? hdr, byte[]? msg, long ttl);
+ void StoreRawMsg(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck);
+ (ulong Seq, Exception? Error) SkipMsg(ulong seq);
+ void SkipMsgs(ulong seq, ulong num);
+ void FlushAllPending();
+ StoreMsg? LoadMsg(ulong seq, StoreMsg? sm);
+ (StoreMsg? Sm, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? smp);
+ (StoreMsg? Sm, ulong Skip) LoadNextMsgMulti(object? sl, ulong start, StoreMsg? smp);
+ StoreMsg? LoadLastMsg(string subject, StoreMsg? sm);
+ (StoreMsg? Sm, Exception? Error) LoadPrevMsg(ulong start, StoreMsg? smp);
+ (StoreMsg? Sm, ulong Skip, Exception? Error) LoadPrevMsgMulti(object? sl, ulong start, StoreMsg? smp);
+ (bool Removed, Exception? Error) RemoveMsg(ulong seq);
+ (bool Removed, Exception? Error) EraseMsg(ulong seq);
+ (ulong Purged, Exception? Error) Purge();
+ (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep);
+ (ulong Purged, Exception? Error) Compact(ulong seq);
+ void Truncate(ulong seq);
+ ulong GetSeqFromTime(DateTime t);
+ SimpleState FilteredState(ulong seq, string subject);
+ Dictionary SubjectsState(string filterSubject);
+ Dictionary SubjectsTotals(string filterSubject);
+ (ulong[] Seqs, Exception? Error) AllLastSeqs();
+ (ulong[] Seqs, Exception? Error) MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed);
+ (string Subject, Exception? Error) SubjectForSeq(ulong seq);
+ (ulong Total, ulong ValidThrough, Exception? Error) NumPending(ulong sseq, string filter, bool lastPerSubject);
+ (ulong Total, ulong ValidThrough, Exception? Error) NumPendingMulti(ulong sseq, object? sl, bool lastPerSubject);
+ StreamState State();
+ void FastState(StreamState state);
+ (byte[] Enc, Exception? Error) EncodedStreamState(ulong failed);
+ void SyncDeleted(DeleteBlocks dbs);
+ StorageType Type();
+ void RegisterStorageUpdates(StorageUpdateHandler cb);
+ void RegisterStorageRemoveMsg(StorageRemoveMsgHandler cb);
+ void RegisterProcessJetStreamMsg(ProcessJetStreamMsgHandler cb);
+ void UpdateConfig(StreamConfig cfg);
+ void Delete(bool inline);
+ void Stop();
+ IConsumerStore ConsumerStore(string name, DateTime created, ConsumerConfig cfg);
+ void AddConsumer(IConsumerStore o);
+ void RemoveConsumer(IConsumerStore o);
+ (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool includeConsumers, bool checkMsgs);
+ (ulong Total, ulong Reported, Exception? Error) Utilization();
+ void ResetState();
+}
+
+// ---------------------------------------------------------------------------
+// RetentionPolicy
+// ---------------------------------------------------------------------------
+
+/// Determines how messages in a stream are retained.
+public enum RetentionPolicy
+{
+ /// Messages are retained until any given limit is reached.
+ LimitsPolicy = 0,
+
+ /// Messages are removed when all known consumers have acknowledged them.
+ InterestPolicy,
+
+ /// Messages are removed when the first worker acknowledges them.
+ WorkQueuePolicy,
+}
+
+// ---------------------------------------------------------------------------
+// DiscardPolicy
+// ---------------------------------------------------------------------------
+
+/// Determines how the store proceeds when message or byte limits are hit.
+public enum DiscardPolicy
+{
+ /// Remove older messages to return to the limits.
+ DiscardOld = 0,
+
+ /// Fail the StoreMsg call when limits are exceeded.
+ DiscardNew,
+}
+
+// ---------------------------------------------------------------------------
+// StreamState
+// ---------------------------------------------------------------------------
+
+/// Information about the given stream.
+public sealed class StreamState
+{
+ [JsonPropertyName("messages")]
+ public ulong Msgs { get; set; }
+
+ [JsonPropertyName("bytes")]
+ public ulong Bytes { get; set; }
+
+ [JsonPropertyName("first_seq")]
+ public ulong FirstSeq { get; set; }
+
+ [JsonPropertyName("first_ts")]
+ public DateTime FirstTime { get; set; }
+
+ [JsonPropertyName("last_seq")]
+ public ulong LastSeq { get; set; }
+
+ [JsonPropertyName("last_ts")]
+ public DateTime LastTime { get; set; }
+
+ [JsonPropertyName("num_subjects")]
+ public int NumSubjects { get; set; }
+
+ [JsonPropertyName("subjects")]
+ public Dictionary? Subjects { get; set; }
+
+ [JsonPropertyName("num_deleted")]
+ public int NumDeleted { get; set; }
+
+ [JsonPropertyName("deleted")]
+ public ulong[]? Deleted { get; set; }
+
+ [JsonPropertyName("lost")]
+ public LostStreamData? Lost { get; set; }
+
+ [JsonPropertyName("consumer_count")]
+ public int Consumers { get; set; }
+}
+
+// ---------------------------------------------------------------------------
+// SimpleState
+// ---------------------------------------------------------------------------
+
+/// Filtered subject-specific state.
+public sealed class SimpleState
+{
+ [JsonPropertyName("messages")]
+ public ulong Msgs { get; set; }
+
+ [JsonPropertyName("first_seq")]
+ public ulong First { get; set; }
+
+ [JsonPropertyName("last_seq")]
+ public ulong Last { get; set; }
+
+ // Internal: the first/last need to be recalculated before use.
+ internal bool FirstNeedsUpdate { get; set; }
+ internal bool LastNeedsUpdate { get; set; }
+}
+
+// ---------------------------------------------------------------------------
+// LostStreamData
+// ---------------------------------------------------------------------------
+
+/// Indicates messages that have been lost.
+public sealed class LostStreamData
+{
+ [JsonPropertyName("msgs")]
+ public ulong[] Msgs { get; set; } = Array.Empty();
+
+ [JsonPropertyName("bytes")]
+ public ulong Bytes { get; set; }
+}
+
+// ---------------------------------------------------------------------------
+// SnapshotResult
+// ---------------------------------------------------------------------------
+
+/// Contains information about a snapshot operation.
+public sealed class SnapshotResult
+{
+ public Stream? Reader { get; set; }
+ public StreamState State { get; set; } = new();
+}
+
+// ---------------------------------------------------------------------------
+// DeleteBlock / DeleteBlocks
+// ---------------------------------------------------------------------------
+
+///
+/// Interface for a block of deleted sequences.
+/// Implementations include AVL seqsets, run-length encoded ranges, and legacy slices.
+///
+public interface IDeleteBlock
+{
+ (ulong First, ulong Last, ulong Num) GetState();
+ void Range(Func f);
+}
+
+/// A list of delete blocks.
+public sealed class DeleteBlocks : List
+{
+ public ulong NumDeleted()
+ {
+ ulong total = 0;
+ foreach (var db in this)
+ {
+ var (_, _, num) = db.GetState();
+ total += num;
+ }
+ return total;
+ }
+}
+
+// ---------------------------------------------------------------------------
+// DeleteRange — run-length encoded delete range
+// ---------------------------------------------------------------------------
+
+/// Run-length encoded delete range.
+public sealed class DeleteRange : IDeleteBlock
+{
+ public ulong First { get; set; }
+ public ulong Num { get; set; }
+
+ public (ulong First, ulong Last, ulong Num) GetState()
+ {
+ var deletesAfterFirst = Num > 0 ? Num - 1 : 0;
+ return (First, First + deletesAfterFirst, Num);
+ }
+
+ public void Range(Func f)
+ {
+ for (var seq = First; seq < First + Num; seq++)
+ {
+ if (!f(seq))
+ return;
+ }
+ }
+}
+
+// ---------------------------------------------------------------------------
+// DeleteSlice — legacy []uint64 equivalent
+// ---------------------------------------------------------------------------
+
+/// Legacy slice-based delete block.
+public sealed class DeleteSlice : IDeleteBlock
+{
+ private readonly ulong[] _seqs;
+
+ public DeleteSlice(ulong[] seqs)
+ {
+ _seqs = seqs;
+ }
+
+ public (ulong First, ulong Last, ulong Num) GetState()
+ {
+ if (_seqs.Length == 0)
+ return (0, 0, 0);
+ return (_seqs[0], _seqs[^1], (ulong)_seqs.Length);
+ }
+
+ public void Range(Func f)
+ {
+ foreach (var seq in _seqs)
+ {
+ if (!f(seq))
+ return;
+ }
+ }
+}
+
+// ---------------------------------------------------------------------------
+// StreamReplicatedState
+// ---------------------------------------------------------------------------
+
+///
+/// Represents what is encoded in a binary stream snapshot used for stream
+/// replication in an NRG (NATS Raft Group).
+///
+public sealed class StreamReplicatedState
+{
+ public ulong Msgs { get; set; }
+ public ulong Bytes { get; set; }
+ public ulong FirstSeq { get; set; }
+ public ulong LastSeq { get; set; }
+ public ulong Failed { get; set; }
+ public DeleteBlocks Deleted { get; set; } = new();
+}
+
+// ---------------------------------------------------------------------------
+// IConsumerStore interface
+// ---------------------------------------------------------------------------
+
+/// Stores state on consumers for streams.
+public interface IConsumerStore
+{
+ void SetStarting(ulong sseq);
+ void UpdateStarting(ulong sseq);
+ void Reset(ulong sseq);
+ bool HasState();
+ void UpdateDelivered(ulong dseq, ulong sseq, ulong dc, long ts);
+ void UpdateAcks(ulong dseq, ulong sseq);
+ void UpdateConfig(ConsumerConfig cfg);
+ void Update(ConsumerState state);
+ (ConsumerState? State, Exception? Error) State();
+ (ConsumerState? State, Exception? Error) BorrowState();
+ byte[] EncodedState();
+ StorageType Type();
+ void Stop();
+ void Delete();
+ void StreamDelete();
+}
+
+// ---------------------------------------------------------------------------
+// SequencePair
+// ---------------------------------------------------------------------------
+
+///
+/// Has both the consumer and the stream sequence. They point to the same message.
+///
+public sealed class SequencePair
+{
+ [JsonPropertyName("consumer_seq")]
+ public ulong Consumer { get; set; }
+
+ [JsonPropertyName("stream_seq")]
+ public ulong Stream { get; set; }
+}
+
+// ---------------------------------------------------------------------------
+// Pending
+// ---------------------------------------------------------------------------
+
+///
+/// Represents a pending message for explicit ack or ack-all.
+/// Sequence is the original consumer sequence.
+///
+public sealed class Pending
+{
+ public ulong Sequence { get; set; }
+ public long Timestamp { get; set; }
+}
+
+// ---------------------------------------------------------------------------
+// ConsumerState
+// ---------------------------------------------------------------------------
+
+/// Represents a stored state for a consumer.
+public sealed class ConsumerState
+{
+ [JsonPropertyName("delivered")]
+ public SequencePair Delivered { get; set; } = new();
+
+ [JsonPropertyName("ack_floor")]
+ public SequencePair AckFloor { get; set; } = new();
+
+ ///
+ /// Pending messages and the timestamp for the delivered time.
+ /// Only present when AckPolicy is ExplicitAck.
+ /// Keys are stream sequence numbers.
+ ///
+ [JsonPropertyName("pending")]
+ public Dictionary? Pending { get; set; }
+
+ ///
+ /// Messages that have been redelivered (count > 1).
+ /// Keys are stream sequence numbers; values are delivery counts minus one.
+ ///
+ [JsonPropertyName("redelivered")]
+ public Dictionary? Redelivered { get; set; }
+}
+
+// ---------------------------------------------------------------------------
+// AckPolicy
+// ---------------------------------------------------------------------------
+
+/// Determines how the consumer should acknowledge delivered messages.
+public enum AckPolicy
+{
+ /// No acks required for delivered messages.
+ AckNone = 0,
+
+ /// Acking a sequence implicitly acks all sequences below it.
+ AckAll,
+
+ /// Requires explicit ack or nack for all messages.
+ AckExplicit,
+}
+
+// ---------------------------------------------------------------------------
+// ReplayPolicy
+// ---------------------------------------------------------------------------
+
+/// Determines how the consumer replays messages already queued in the stream.
+public enum ReplayPolicy
+{
+ /// Replay messages as fast as possible.
+ ReplayInstant = 0,
+
+ /// Maintain the same timing as when the messages were received.
+ ReplayOriginal,
+}
+
+// ---------------------------------------------------------------------------
+// DeliverPolicy
+// ---------------------------------------------------------------------------
+
+/// Determines how the consumer selects the first message to deliver.
+public enum DeliverPolicy
+{
+ /// Deliver all messages (default).
+ DeliverAll = 0,
+
+ /// Start with the last sequence received.
+ DeliverLast,
+
+ /// Only deliver new messages sent after the consumer is created.
+ DeliverNew,
+
+ /// Look for a defined starting sequence.
+ DeliverByStartSequence,
+
+ /// Select the first message with a timestamp >= StartTime.
+ DeliverByStartTime,
+
+ /// Start with the last message for all subjects received.
+ DeliverLastPerSubject,
+}
+
+// ---------------------------------------------------------------------------
+// PriorityPolicy
+// ---------------------------------------------------------------------------
+
+/// Policy for selecting messages based on priority.
+public enum PriorityPolicy
+{
+ PriorityNone = 0,
+ PriorityOverflow,
+ PriorityPinnedClient,
+ PriorityPrioritized,
+}
+
+// StoreCipher is already defined in ServerOptionTypes.cs — no duplicate needed here.
+
+// ---------------------------------------------------------------------------
+// StoreCompression
+// ---------------------------------------------------------------------------
+
+/// Compression algorithm used for stream data.
+public enum StoreCompression : byte
+{
+ NoCompression = 0,
+ S2Compression,
+}
+
+// ---------------------------------------------------------------------------
+// PersistModeType
+// ---------------------------------------------------------------------------
+
+/// Determines what persistence mode the stream uses.
+public enum PersistModeType
+{
+ DefaultPersistMode = 0,
+ AsyncPersistMode,
+}
+
+// ---------------------------------------------------------------------------
+// Placement
+// ---------------------------------------------------------------------------
+
+///
+/// Guides placement of streams and meta controllers in clustered JetStream.
+///
+public sealed class Placement
+{
+ [JsonPropertyName("cluster")]
+ public string? Cluster { get; set; }
+
+ [JsonPropertyName("tags")]
+ public string[]? Tags { get; set; }
+
+ [JsonPropertyName("preferred")]
+ public string? Preferred { get; set; }
+}
+
+// ---------------------------------------------------------------------------
+// SubjectTransformConfig
+// ---------------------------------------------------------------------------
+
+///
+/// Applies a subject transform to incoming messages before storing.
+///
+public sealed class SubjectTransformConfig
+{
+ [JsonPropertyName("src")]
+ public string Source { get; set; } = string.Empty;
+
+ [JsonPropertyName("dest")]
+ public string Destination { get; set; } = string.Empty;
+}
+
+// ---------------------------------------------------------------------------
+// RePublish
+// ---------------------------------------------------------------------------
+
+/// Configuration for republishing messages once committed to a stream.
+public sealed class RePublish
+{
+ [JsonPropertyName("src")]
+ public string? Source { get; set; }
+
+ [JsonPropertyName("dest")]
+ public string Destination { get; set; } = string.Empty;
+
+ [JsonPropertyName("headers_only")]
+ public bool HeadersOnly { get; set; }
+}
+
+// ---------------------------------------------------------------------------
+// ExternalStream
+// ---------------------------------------------------------------------------
+
+/// Qualifies access to a stream source in another account or domain.
+public sealed class ExternalStream
+{
+ [JsonPropertyName("api")]
+ public string ApiPrefix { get; set; } = string.Empty;
+
+ [JsonPropertyName("deliver")]
+ public string DeliverPrefix { get; set; } = string.Empty;
+}
+
+// ---------------------------------------------------------------------------
+// StreamSource
+// ---------------------------------------------------------------------------
+
+/// Dictates how streams can source from other streams.
+public sealed class StreamSource
+{
+ [JsonPropertyName("name")]
+ public string Name { get; set; } = string.Empty;
+
+ [JsonPropertyName("opt_start_seq")]
+ public ulong OptStartSeq { get; set; }
+
+ [JsonPropertyName("opt_start_time")]
+ public DateTime? OptStartTime { get; set; }
+
+ [JsonPropertyName("filter_subject")]
+ public string? FilterSubject { get; set; }
+
+ [JsonPropertyName("subject_transforms")]
+ public SubjectTransformConfig[]? SubjectTransforms { get; set; }
+
+ [JsonPropertyName("external")]
+ public ExternalStream? External { get; set; }
+}
+
+// ---------------------------------------------------------------------------
+// StreamConsumerLimits
+// ---------------------------------------------------------------------------
+
+/// Limits applied to consumers created against a stream.
+public sealed class StreamConsumerLimits
+{
+ [JsonPropertyName("inactive_threshold")]
+ public TimeSpan InactiveThreshold { get; set; }
+
+ [JsonPropertyName("max_ack_pending")]
+ public int MaxAckPending { get; set; }
+}
+
+// ---------------------------------------------------------------------------
+// StreamConfig
+// ---------------------------------------------------------------------------
+
+///
+/// Determines the name, subjects, and retention policy for a given stream.
+/// If Subjects is empty the Name will be used.
+///
+public sealed class StreamConfig
+{
+ [JsonPropertyName("name")]
+ public string Name { get; set; } = string.Empty;
+
+ [JsonPropertyName("description")]
+ public string? Description { get; set; }
+
+ [JsonPropertyName("subjects")]
+ public string[]? Subjects { get; set; }
+
+ [JsonPropertyName("retention")]
+ public RetentionPolicy Retention { get; set; }
+
+ [JsonPropertyName("max_consumers")]
+ public int MaxConsumers { get; set; }
+
+ [JsonPropertyName("max_msgs")]
+ public long MaxMsgs { get; set; }
+
+ [JsonPropertyName("max_bytes")]
+ public long MaxBytes { get; set; }
+
+ [JsonPropertyName("max_age")]
+ public TimeSpan MaxAge { get; set; }
+
+ [JsonPropertyName("max_msgs_per_subject")]
+ public long MaxMsgsPer { get; set; }
+
+ [JsonPropertyName("max_msg_size")]
+ public int MaxMsgSize { get; set; }
+
+ [JsonPropertyName("discard")]
+ public DiscardPolicy Discard { get; set; }
+
+ [JsonPropertyName("storage")]
+ public StorageType Storage { get; set; }
+
+ [JsonPropertyName("num_replicas")]
+ public int Replicas { get; set; }
+
+ [JsonPropertyName("no_ack")]
+ public bool NoAck { get; set; }
+
+ [JsonPropertyName("duplicate_window")]
+ public TimeSpan Duplicates { get; set; }
+
+ [JsonPropertyName("placement")]
+ public Placement? Placement { get; set; }
+
+ [JsonPropertyName("mirror")]
+ public StreamSource? Mirror { get; set; }
+
+ [JsonPropertyName("sources")]
+ public StreamSource[]? Sources { get; set; }
+
+ [JsonPropertyName("compression")]
+ public StoreCompression Compression { get; set; }
+
+ [JsonPropertyName("first_seq")]
+ public ulong FirstSeq { get; set; }
+
+ [JsonPropertyName("subject_transform")]
+ public SubjectTransformConfig? SubjectTransform { get; set; }
+
+ [JsonPropertyName("republish")]
+ public RePublish? RePublish { get; set; }
+
+ [JsonPropertyName("allow_direct")]
+ public bool AllowDirect { get; set; }
+
+ [JsonPropertyName("mirror_direct")]
+ public bool MirrorDirect { get; set; }
+
+ [JsonPropertyName("discard_new_per_subject")]
+ public bool DiscardNewPer { get; set; }
+
+ [JsonPropertyName("sealed")]
+ public bool Sealed { get; set; }
+
+ [JsonPropertyName("deny_delete")]
+ public bool DenyDelete { get; set; }
+
+ [JsonPropertyName("deny_purge")]
+ public bool DenyPurge { get; set; }
+
+ [JsonPropertyName("allow_rollup_hdrs")]
+ public bool AllowRollup { get; set; }
+
+ [JsonPropertyName("consumer_limits")]
+ public StreamConsumerLimits ConsumerLimits { get; set; } = new();
+
+ [JsonPropertyName("allow_msg_ttl")]
+ public bool AllowMsgTTL { get; set; }
+
+ [JsonPropertyName("subject_delete_marker_ttl")]
+ public TimeSpan SubjectDeleteMarkerTTL { get; set; }
+
+ [JsonPropertyName("allow_msg_counter")]
+ public bool AllowMsgCounter { get; set; }
+
+ [JsonPropertyName("allow_atomic")]
+ public bool AllowAtomicPublish { get; set; }
+
+ [JsonPropertyName("allow_msg_schedules")]
+ public bool AllowMsgSchedules { get; set; }
+
+ [JsonPropertyName("persist_mode")]
+ public PersistModeType PersistMode { get; set; }
+
+ [JsonPropertyName("metadata")]
+ public Dictionary? Metadata { get; set; }
+
+ /// Performs a deep copy of this StreamConfig.
+ public StreamConfig Clone()
+ {
+ var clone = (StreamConfig)MemberwiseClone();
+ if (Placement != null)
+ clone.Placement = new Placement { Cluster = Placement.Cluster, Tags = Placement.Tags?.ToArray(), Preferred = Placement.Preferred };
+ if (Mirror != null)
+ clone.Mirror = new StreamSource { Name = Mirror.Name, OptStartSeq = Mirror.OptStartSeq, OptStartTime = Mirror.OptStartTime, FilterSubject = Mirror.FilterSubject, SubjectTransforms = Mirror.SubjectTransforms?.ToArray(), External = Mirror.External };
+ if (Sources != null)
+ clone.Sources = Sources.ToArray();
+ if (SubjectTransform != null)
+ clone.SubjectTransform = new SubjectTransformConfig { Source = SubjectTransform.Source, Destination = SubjectTransform.Destination };
+ if (RePublish != null)
+ clone.RePublish = new RePublish { Source = RePublish.Source, Destination = RePublish.Destination, HeadersOnly = RePublish.HeadersOnly };
+ if (Metadata != null)
+ clone.Metadata = new Dictionary(Metadata);
+ if (Subjects != null)
+ clone.Subjects = Subjects.ToArray();
+ return clone;
+ }
+}
+
+// ---------------------------------------------------------------------------
+// ConsumerConfig
+// ---------------------------------------------------------------------------
+
+/// Configuration for a JetStream consumer.
+public sealed class ConsumerConfig
+{
+ [JsonPropertyName("durable_name")]
+ public string? Durable { get; set; }
+
+ [JsonPropertyName("name")]
+ public string? Name { get; set; }
+
+ [JsonPropertyName("description")]
+ public string? Description { get; set; }
+
+ [JsonPropertyName("deliver_policy")]
+ public DeliverPolicy DeliverPolicy { get; set; }
+
+ [JsonPropertyName("opt_start_seq")]
+ public ulong OptStartSeq { get; set; }
+
+ [JsonPropertyName("opt_start_time")]
+ public DateTime? OptStartTime { get; set; }
+
+ [JsonPropertyName("ack_policy")]
+ public AckPolicy AckPolicy { get; set; }
+
+ [JsonPropertyName("ack_wait")]
+ public TimeSpan AckWait { get; set; }
+
+ [JsonPropertyName("max_deliver")]
+ public int MaxDeliver { get; set; }
+
+ [JsonPropertyName("backoff")]
+ public TimeSpan[]? BackOff { get; set; }
+
+ [JsonPropertyName("filter_subject")]
+ public string? FilterSubject { get; set; }
+
+ [JsonPropertyName("filter_subjects")]
+ public string[]? FilterSubjects { get; set; }
+
+ [JsonPropertyName("replay_policy")]
+ public ReplayPolicy ReplayPolicy { get; set; }
+
+ [JsonPropertyName("rate_limit_bps")]
+ public ulong RateLimit { get; set; }
+
+ [JsonPropertyName("sample_freq")]
+ public string? SampleFrequency { get; set; }
+
+ [JsonPropertyName("max_waiting")]
+ public int MaxWaiting { get; set; }
+
+ [JsonPropertyName("max_ack_pending")]
+ public int MaxAckPending { get; set; }
+
+ [JsonPropertyName("flow_control")]
+ public bool FlowControl { get; set; }
+
+ [JsonPropertyName("headers_only")]
+ public bool HeadersOnly { get; set; }
+
+ // Pull-based options
+ [JsonPropertyName("max_batch")]
+ public int MaxRequestBatch { get; set; }
+
+ [JsonPropertyName("max_expires")]
+ public TimeSpan MaxRequestExpires { get; set; }
+
+ [JsonPropertyName("max_bytes")]
+ public int MaxRequestMaxBytes { get; set; }
+
+ // Push-based consumers
+ [JsonPropertyName("deliver_subject")]
+ public string? DeliverSubject { get; set; }
+
+ [JsonPropertyName("deliver_group")]
+ public string? DeliverGroup { get; set; }
+
+ [JsonPropertyName("idle_heartbeat")]
+ public TimeSpan Heartbeat { get; set; }
+
+ [JsonPropertyName("inactive_threshold")]
+ public TimeSpan InactiveThreshold { get; set; }
+
+ [JsonPropertyName("num_replicas")]
+ public int Replicas { get; set; }
+
+ [JsonPropertyName("mem_storage")]
+ public bool MemoryStorage { get; set; }
+
+ [JsonPropertyName("direct")]
+ public bool Direct { get; set; }
+
+ [JsonPropertyName("metadata")]
+ public Dictionary? Metadata { get; set; }
+
+ [JsonPropertyName("pause_until")]
+ public DateTime? PauseUntil { get; set; }
+
+ [JsonPropertyName("priority_groups")]
+ public string[]? PriorityGroups { get; set; }
+
+ [JsonPropertyName("priority_policy")]
+ public PriorityPolicy PriorityPolicy { get; set; }
+
+ [JsonPropertyName("priority_timeout")]
+ public TimeSpan PinnedTTL { get; set; }
+}
+
+// ---------------------------------------------------------------------------
+// DiskAvailability — stub for disk_avail.go
+// ---------------------------------------------------------------------------
+
+///
+/// Utility for checking disk space availability.
+/// Adapted from server/disk_avail.go (uses statfs on Unix).
+///
+public static class DiskAvailability
+{
+ // Default large store limit: 1 TB (used as fallback when statfs fails).
+ private const long JetStreamMaxStoreDefault = 1L * 1024 * 1024 * 1024 * 1024;
+
+ ///
+ /// Returns approximately 75% of available disk space at .
+ /// Returns (1 TB) if the check fails.
+ ///
+ public static long Available(string path)
+ {
+ // TODO: session 17 — implement via DriveInfo or P/Invoke statvfs on non-Windows.
+ try
+ {
+ var drive = new DriveInfo(Path.GetPathRoot(Path.GetFullPath(path)) ?? path);
+ if (drive.IsReady)
+ {
+ // Estimate 75% of available free space, matching Go behaviour.
+ return drive.AvailableFreeSpace / 4 * 3;
+ }
+ }
+ catch
+ {
+ // Fall through to default.
+ }
+
+ return JetStreamMaxStoreDefault;
+ }
+
+ ///
+ /// Returns true if at least bytes are available at .
+ ///
+ public static bool Check(string path, long needed) => Available(path) >= needed;
+}
diff --git a/porting.db b/porting.db
index 0bb05ba..28f40dd 100644
Binary files a/porting.db and b/porting.db differ
diff --git a/reports/current.md b/reports/current.md
index ca16166..e551db1 100644
--- a/reports/current.md
+++ b/reports/current.md
@@ -1,6 +1,6 @@
# NATS .NET Porting Status Report
-Generated: 2026-02-26 20:50:51 UTC
+Generated: 2026-02-26 21:02:04 UTC
## Modules (12 total)
@@ -13,9 +13,9 @@ Generated: 2026-02-26 20:50:51 UTC
| Status | Count |
|--------|-------|
-| complete | 1601 |
-| n_a | 82 |
-| not_started | 1897 |
+| complete | 1736 |
+| n_a | 77 |
+| not_started | 1767 |
| stub | 93 |
## Unit Tests (3257 total)
@@ -36,4 +36,4 @@ Generated: 2026-02-26 20:50:51 UTC
## Overall Progress
-**2194/6942 items complete (31.6%)**
+**2324/6942 items complete (33.5%)**
diff --git a/reports/report_77403e3.md b/reports/report_77403e3.md
new file mode 100644
index 0000000..e551db1
--- /dev/null
+++ b/reports/report_77403e3.md
@@ -0,0 +1,39 @@
+# NATS .NET Porting Status Report
+
+Generated: 2026-02-26 21:02:04 UTC
+
+## Modules (12 total)
+
+| Status | Count |
+|--------|-------|
+| complete | 11 |
+| not_started | 1 |
+
+## Features (3673 total)
+
+| Status | Count |
+|--------|-------|
+| complete | 1736 |
+| n_a | 77 |
+| not_started | 1767 |
+| stub | 93 |
+
+## Unit Tests (3257 total)
+
+| Status | Count |
+|--------|-------|
+| complete | 319 |
+| n_a | 181 |
+| not_started | 2533 |
+| stub | 224 |
+
+## Library Mappings (36 total)
+
+| Status | Count |
+|--------|-------|
+| mapped | 36 |
+
+
+## Overall Progress
+
+**2324/6942 items complete (33.5%)**