// Copyright 2019-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // Adapted from server/memstore.go (consumerMemStore) namespace ZB.MOM.NatsNet.Server; /// /// In-memory implementation of . /// Stores consumer delivery and ack state in memory only. /// public sealed class ConsumerMemStore : IConsumerStore { // ----------------------------------------------------------------------- // Fields // ----------------------------------------------------------------------- private readonly object _mu = new(); private readonly JetStreamMemStore _ms; private ConsumerConfig _cfg; private ConsumerState _state = new(); private bool _closed; // ----------------------------------------------------------------------- // Constructor // ----------------------------------------------------------------------- /// /// Creates a new consumer memory store backed by the given stream store. /// public ConsumerMemStore(JetStreamMemStore ms, ConsumerConfig cfg) { _ms = ms; _cfg = cfg; } // ----------------------------------------------------------------------- // IConsumerStore — starting sequence // ----------------------------------------------------------------------- /// public void SetStarting(ulong sseq) { lock (_mu) { _state.Delivered.Stream = sseq; _state.AckFloor.Stream = sseq; } } /// public void UpdateStarting(ulong sseq) { lock (_mu) { if (sseq > _state.Delivered.Stream) { _state.Delivered.Stream = sseq; // For AckNone just update delivered and ackfloor at the same time. if (_cfg.AckPolicy == AckPolicy.AckNone) _state.AckFloor.Stream = sseq; } } } /// public void Reset(ulong sseq) { lock (_mu) { _state = new ConsumerState(); } SetStarting(sseq); } // ----------------------------------------------------------------------- // IConsumerStore — state query // ----------------------------------------------------------------------- /// public bool HasState() { lock (_mu) { return _state.Delivered.Consumer != 0 || _state.Delivered.Stream != 0; } } // ----------------------------------------------------------------------- // IConsumerStore — delivery tracking // ----------------------------------------------------------------------- /// public void UpdateDelivered(ulong dseq, ulong sseq, ulong dc, long ts) { lock (_mu) { if (dc != 1 && _cfg.AckPolicy == AckPolicy.AckNone) throw StoreErrors.ErrNoAckPolicy; // Replay from old leader — ignore outdated updates. if (dseq <= _state.AckFloor.Consumer) return; if (_cfg.AckPolicy != AckPolicy.AckNone) { _state.Pending ??= new Dictionary(); if (sseq <= _state.Delivered.Stream) { // Update to a previously delivered message. if (_state.Pending.TryGetValue(sseq, out var p) && p != null) p.Timestamp = ts; } else { _state.Pending[sseq] = new Pending { Sequence = dseq, Timestamp = ts }; } if (dseq > _state.Delivered.Consumer) _state.Delivered.Consumer = dseq; if (sseq > _state.Delivered.Stream) _state.Delivered.Stream = sseq; if (dc > 1) { var maxdc = (ulong)_cfg.MaxDeliver; if (maxdc > 0 && dc > maxdc) _state.Pending.Remove(sseq); _state.Redelivered ??= new Dictionary(); if (!_state.Redelivered.TryGetValue(sseq, out var cur) || cur < dc - 1) _state.Redelivered[sseq] = dc - 1; } } else { // AckNone — update delivered and ackfloor together. if (dseq > _state.Delivered.Consumer) { _state.Delivered.Consumer = dseq; _state.AckFloor.Consumer = dseq; } if (sseq > _state.Delivered.Stream) { _state.Delivered.Stream = sseq; _state.AckFloor.Stream = sseq; } } } } /// public void UpdateAcks(ulong dseq, ulong sseq) { lock (_mu) { if (_cfg.AckPolicy == AckPolicy.AckNone) throw StoreErrors.ErrNoAckPolicy; // Ignore outdated acks. if (dseq <= _state.AckFloor.Consumer) return; if (_state.Pending == null || !_state.Pending.ContainsKey(sseq)) { _state.Redelivered?.Remove(sseq); throw StoreErrors.ErrStoreMsgNotFound; } if (_cfg.AckPolicy == AckPolicy.AckAll) { var sgap = sseq - _state.AckFloor.Stream; _state.AckFloor.Consumer = dseq; _state.AckFloor.Stream = sseq; if (sgap > (ulong)_state.Pending.Count) { var toRemove = new List(); foreach (var kv in _state.Pending) if (kv.Key <= sseq) toRemove.Add(kv.Key); foreach (var k in toRemove) { _state.Pending.Remove(k); _state.Redelivered?.Remove(k); } } else { for (var seq = sseq; seq > sseq - sgap && _state.Pending.Count > 0; seq--) { _state.Pending.Remove(seq); _state.Redelivered?.Remove(seq); if (seq == 0) break; } } return; } // AckExplicit if (_state.Pending.TryGetValue(sseq, out var pending) && pending != null) { _state.Pending.Remove(sseq); if (dseq > pending.Sequence && pending.Sequence > 0) dseq = pending.Sequence; // Use the original delivery sequence. } if (_state.Pending.Count == 0) { _state.AckFloor.Consumer = _state.Delivered.Consumer; _state.AckFloor.Stream = _state.Delivered.Stream; } else if (dseq == _state.AckFloor.Consumer + 1) { _state.AckFloor.Consumer = dseq; _state.AckFloor.Stream = sseq; if (_state.Delivered.Consumer > dseq) { for (var ss = sseq + 1; ss <= _state.Delivered.Stream; ss++) { if (_state.Pending.TryGetValue(ss, out var pp) && pp != null) { if (pp.Sequence > 0) { _state.AckFloor.Consumer = pp.Sequence - 1; _state.AckFloor.Stream = ss - 1; } break; } } } } _state.Redelivered?.Remove(sseq); } } // ----------------------------------------------------------------------- // IConsumerStore — config update // ----------------------------------------------------------------------- /// public void UpdateConfig(ConsumerConfig cfg) { lock (_mu) { _cfg = cfg; } } // ----------------------------------------------------------------------- // IConsumerStore — update state // ----------------------------------------------------------------------- /// public void Update(ConsumerState state) { if (state.AckFloor.Consumer > state.Delivered.Consumer) throw new InvalidOperationException("bad ack floor for consumer"); if (state.AckFloor.Stream > state.Delivered.Stream) throw new InvalidOperationException("bad ack floor for stream"); Dictionary? pending = null; Dictionary? redelivered = null; if (state.Pending?.Count > 0) { pending = new Dictionary(state.Pending.Count); foreach (var kv in state.Pending) { if (kv.Key <= state.AckFloor.Stream || kv.Key > state.Delivered.Stream) throw new InvalidOperationException($"bad pending entry, sequence [{kv.Key}] out of range"); pending[kv.Key] = new Pending { Sequence = kv.Value.Sequence, Timestamp = kv.Value.Timestamp }; } } if (state.Redelivered?.Count > 0) { redelivered = new Dictionary(state.Redelivered); } lock (_mu) { // Ignore outdated updates. if (state.Delivered.Consumer < _state.Delivered.Consumer || state.AckFloor.Stream < _state.AckFloor.Stream) throw new InvalidOperationException("old update ignored"); _state.Delivered = new SequencePair { Consumer = state.Delivered.Consumer, Stream = state.Delivered.Stream }; _state.AckFloor = new SequencePair { Consumer = state.AckFloor.Consumer, Stream = state.AckFloor.Stream }; _state.Pending = pending; _state.Redelivered = redelivered; } } // ----------------------------------------------------------------------- // IConsumerStore — state retrieval // ----------------------------------------------------------------------- /// public (ConsumerState? State, Exception? Error) State() => StateWithCopy(doCopy: true); /// public (ConsumerState? State, Exception? Error) BorrowState() => StateWithCopy(doCopy: false); private (ConsumerState? State, Exception? Error) StateWithCopy(bool doCopy) { lock (_mu) { if (_closed) return (null, StoreErrors.ErrStoreClosed); var state = new ConsumerState { Delivered = new SequencePair { Consumer = _state.Delivered.Consumer, Stream = _state.Delivered.Stream }, AckFloor = new SequencePair { Consumer = _state.AckFloor.Consumer, Stream = _state.AckFloor.Stream }, }; if (_state.Pending?.Count > 0) { state.Pending = doCopy ? CopyPending() : _state.Pending; } if (_state.Redelivered?.Count > 0) { state.Redelivered = doCopy ? CopyRedelivered() : _state.Redelivered; } return (state, null); } } // ----------------------------------------------------------------------- // IConsumerStore — encoding // ----------------------------------------------------------------------- /// public byte[] EncodedState() { lock (_mu) { if (_closed) throw StoreErrors.ErrStoreClosed; // Session 17 target: encode consumer state to binary form. return Array.Empty(); } } // ----------------------------------------------------------------------- // IConsumerStore — lifecycle // ----------------------------------------------------------------------- /// public StorageType Type() => StorageType.MemoryStorage; /// public void Stop() { lock (_mu) { _closed = true; } _ms.RemoveConsumer(this); } /// public void Delete() => Stop(); /// public void StreamDelete() => Stop(); // ----------------------------------------------------------------------- // Private helpers // ----------------------------------------------------------------------- private Dictionary CopyPending() { var pending = new Dictionary(_state.Pending!.Count); foreach (var kv in _state.Pending!) pending[kv.Key] = new Pending { Sequence = kv.Value.Sequence, Timestamp = kv.Value.Timestamp }; return pending; } private Dictionary CopyRedelivered() { return new Dictionary(_state.Redelivered!); } }