feat: port session 17 — Store Interfaces & Memory Store

- StoreTypes: IStreamStore/IConsumerStore interfaces, StreamConfig/ConsumerConfig,
  all enums (StorageType, RetentionPolicy, DiscardPolicy, AckPolicy, etc.),
  StreamState, SimpleState, LostStreamData, DeleteBlocks/Range/Slice, StoreMsg
- MemStore: JetStreamMemStore with full message CRUD, state tracking, age expiry
- ConsumerMemStore: ConsumerMemStore with delivery/ack state tracking
- DiskAvailability: cross-platform disk space checker
- 135 features complete (IDs 3164-3194, 2068-2165, 827-832)
This commit is contained in:
Joseph Doherty
2026-02-26 16:02:03 -05:00
parent 77403e3d31
commit 5a2c8a3250
6 changed files with 2889 additions and 5 deletions

View File

@@ -0,0 +1,400 @@
// Copyright 2019-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/memstore.go (consumerMemStore)
namespace ZB.MOM.NatsNet.Server;
/// <summary>
/// In-memory implementation of <see cref="IConsumerStore"/>.
/// Stores consumer delivery and ack state in memory only.
/// </summary>
public sealed class ConsumerMemStore : IConsumerStore
{
// -----------------------------------------------------------------------
// Fields
// -----------------------------------------------------------------------
private readonly object _mu = new();
private readonly JetStreamMemStore _ms;
private ConsumerConfig _cfg;
private ConsumerState _state = new();
private bool _closed;
// -----------------------------------------------------------------------
// Constructor
// -----------------------------------------------------------------------
/// <summary>
/// Creates a new consumer memory store backed by the given stream store.
/// </summary>
public ConsumerMemStore(JetStreamMemStore ms, ConsumerConfig cfg)
{
_ms = ms;
_cfg = cfg;
}
// -----------------------------------------------------------------------
// IConsumerStore — starting sequence
// -----------------------------------------------------------------------
/// <inheritdoc/>
public void SetStarting(ulong sseq)
{
lock (_mu)
{
_state.Delivered.Stream = sseq;
_state.AckFloor.Stream = sseq;
}
}
/// <inheritdoc/>
public void UpdateStarting(ulong sseq)
{
lock (_mu)
{
if (sseq > _state.Delivered.Stream)
{
_state.Delivered.Stream = sseq;
// For AckNone just update delivered and ackfloor at the same time.
if (_cfg.AckPolicy == AckPolicy.AckNone)
_state.AckFloor.Stream = sseq;
}
}
}
/// <inheritdoc/>
public void Reset(ulong sseq)
{
lock (_mu)
{
_state = new ConsumerState();
}
SetStarting(sseq);
}
// -----------------------------------------------------------------------
// IConsumerStore — state query
// -----------------------------------------------------------------------
/// <inheritdoc/>
public bool HasState()
{
lock (_mu)
{
return _state.Delivered.Consumer != 0 || _state.Delivered.Stream != 0;
}
}
// -----------------------------------------------------------------------
// IConsumerStore — delivery tracking
// -----------------------------------------------------------------------
/// <inheritdoc/>
public void UpdateDelivered(ulong dseq, ulong sseq, ulong dc, long ts)
{
lock (_mu)
{
if (dc != 1 && _cfg.AckPolicy == AckPolicy.AckNone)
throw StoreErrors.ErrNoAckPolicy;
// Replay from old leader — ignore outdated updates.
if (dseq <= _state.AckFloor.Consumer)
return;
if (_cfg.AckPolicy != AckPolicy.AckNone)
{
_state.Pending ??= new Dictionary<ulong, Pending>();
if (sseq <= _state.Delivered.Stream)
{
// Update to a previously delivered message.
if (_state.Pending.TryGetValue(sseq, out var p) && p != null)
p.Timestamp = ts;
}
else
{
_state.Pending[sseq] = new Pending { Sequence = dseq, Timestamp = ts };
}
if (dseq > _state.Delivered.Consumer)
_state.Delivered.Consumer = dseq;
if (sseq > _state.Delivered.Stream)
_state.Delivered.Stream = sseq;
if (dc > 1)
{
var maxdc = (ulong)_cfg.MaxDeliver;
if (maxdc > 0 && dc > maxdc)
_state.Pending.Remove(sseq);
_state.Redelivered ??= new Dictionary<ulong, ulong>();
if (!_state.Redelivered.TryGetValue(sseq, out var cur) || cur < dc - 1)
_state.Redelivered[sseq] = dc - 1;
}
}
else
{
// AckNone — update delivered and ackfloor together.
if (dseq > _state.Delivered.Consumer)
{
_state.Delivered.Consumer = dseq;
_state.AckFloor.Consumer = dseq;
}
if (sseq > _state.Delivered.Stream)
{
_state.Delivered.Stream = sseq;
_state.AckFloor.Stream = sseq;
}
}
}
}
/// <inheritdoc/>
public void UpdateAcks(ulong dseq, ulong sseq)
{
lock (_mu)
{
if (_cfg.AckPolicy == AckPolicy.AckNone)
throw StoreErrors.ErrNoAckPolicy;
// Ignore outdated acks.
if (dseq <= _state.AckFloor.Consumer)
return;
if (_state.Pending == null || !_state.Pending.ContainsKey(sseq))
{
_state.Redelivered?.Remove(sseq);
throw StoreErrors.ErrStoreMsgNotFound;
}
if (_cfg.AckPolicy == AckPolicy.AckAll)
{
var sgap = sseq - _state.AckFloor.Stream;
_state.AckFloor.Consumer = dseq;
_state.AckFloor.Stream = sseq;
if (sgap > (ulong)_state.Pending.Count)
{
var toRemove = new List<ulong>();
foreach (var kv in _state.Pending)
if (kv.Key <= sseq)
toRemove.Add(kv.Key);
foreach (var k in toRemove)
{
_state.Pending.Remove(k);
_state.Redelivered?.Remove(k);
}
}
else
{
for (var seq = sseq; seq > sseq - sgap && _state.Pending.Count > 0; seq--)
{
_state.Pending.Remove(seq);
_state.Redelivered?.Remove(seq);
if (seq == 0) break;
}
}
return;
}
// AckExplicit
if (_state.Pending.TryGetValue(sseq, out var pending) && pending != null)
{
_state.Pending.Remove(sseq);
if (dseq > pending.Sequence && pending.Sequence > 0)
dseq = pending.Sequence; // Use the original delivery sequence.
}
if (_state.Pending.Count == 0)
{
_state.AckFloor.Consumer = _state.Delivered.Consumer;
_state.AckFloor.Stream = _state.Delivered.Stream;
}
else if (dseq == _state.AckFloor.Consumer + 1)
{
_state.AckFloor.Consumer = dseq;
_state.AckFloor.Stream = sseq;
if (_state.Delivered.Consumer > dseq)
{
for (var ss = sseq + 1; ss <= _state.Delivered.Stream; ss++)
{
if (_state.Pending.TryGetValue(ss, out var pp) && pp != null)
{
if (pp.Sequence > 0)
{
_state.AckFloor.Consumer = pp.Sequence - 1;
_state.AckFloor.Stream = ss - 1;
}
break;
}
}
}
}
_state.Redelivered?.Remove(sseq);
}
}
// -----------------------------------------------------------------------
// IConsumerStore — config update
// -----------------------------------------------------------------------
/// <inheritdoc/>
public void UpdateConfig(ConsumerConfig cfg)
{
lock (_mu)
{
_cfg = cfg;
}
}
// -----------------------------------------------------------------------
// IConsumerStore — update state
// -----------------------------------------------------------------------
/// <inheritdoc/>
public void Update(ConsumerState state)
{
if (state.AckFloor.Consumer > state.Delivered.Consumer)
throw new InvalidOperationException("bad ack floor for consumer");
if (state.AckFloor.Stream > state.Delivered.Stream)
throw new InvalidOperationException("bad ack floor for stream");
Dictionary<ulong, Pending>? pending = null;
Dictionary<ulong, ulong>? redelivered = null;
if (state.Pending?.Count > 0)
{
pending = new Dictionary<ulong, Pending>(state.Pending.Count);
foreach (var kv in state.Pending)
{
if (kv.Key <= state.AckFloor.Stream || kv.Key > state.Delivered.Stream)
throw new InvalidOperationException($"bad pending entry, sequence [{kv.Key}] out of range");
pending[kv.Key] = new Pending { Sequence = kv.Value.Sequence, Timestamp = kv.Value.Timestamp };
}
}
if (state.Redelivered?.Count > 0)
{
redelivered = new Dictionary<ulong, ulong>(state.Redelivered);
}
lock (_mu)
{
// Ignore outdated updates.
if (state.Delivered.Consumer < _state.Delivered.Consumer ||
state.AckFloor.Stream < _state.AckFloor.Stream)
throw new InvalidOperationException("old update ignored");
_state.Delivered = new SequencePair { Consumer = state.Delivered.Consumer, Stream = state.Delivered.Stream };
_state.AckFloor = new SequencePair { Consumer = state.AckFloor.Consumer, Stream = state.AckFloor.Stream };
_state.Pending = pending;
_state.Redelivered = redelivered;
}
}
// -----------------------------------------------------------------------
// IConsumerStore — state retrieval
// -----------------------------------------------------------------------
/// <inheritdoc/>
public (ConsumerState? State, Exception? Error) State() => StateWithCopy(doCopy: true);
/// <inheritdoc/>
public (ConsumerState? State, Exception? Error) BorrowState() => StateWithCopy(doCopy: false);
private (ConsumerState? State, Exception? Error) StateWithCopy(bool doCopy)
{
lock (_mu)
{
if (_closed)
return (null, StoreErrors.ErrStoreClosed);
var state = new ConsumerState
{
Delivered = new SequencePair { Consumer = _state.Delivered.Consumer, Stream = _state.Delivered.Stream },
AckFloor = new SequencePair { Consumer = _state.AckFloor.Consumer, Stream = _state.AckFloor.Stream },
};
if (_state.Pending?.Count > 0)
{
state.Pending = doCopy ? CopyPending() : _state.Pending;
}
if (_state.Redelivered?.Count > 0)
{
state.Redelivered = doCopy ? CopyRedelivered() : _state.Redelivered;
}
return (state, null);
}
}
// -----------------------------------------------------------------------
// IConsumerStore — encoding
// -----------------------------------------------------------------------
/// <inheritdoc/>
public byte[] EncodedState()
{
lock (_mu)
{
if (_closed)
throw StoreErrors.ErrStoreClosed;
// TODO: session 17 — encode consumer state to binary
return Array.Empty<byte>();
}
}
// -----------------------------------------------------------------------
// IConsumerStore — lifecycle
// -----------------------------------------------------------------------
/// <inheritdoc/>
public StorageType Type() => StorageType.MemoryStorage;
/// <inheritdoc/>
public void Stop()
{
lock (_mu)
{
_closed = true;
}
_ms.RemoveConsumer(this);
}
/// <inheritdoc/>
public void Delete() => Stop();
/// <inheritdoc/>
public void StreamDelete() => Stop();
// -----------------------------------------------------------------------
// Private helpers
// -----------------------------------------------------------------------
private Dictionary<ulong, Pending> CopyPending()
{
var pending = new Dictionary<ulong, Pending>(_state.Pending!.Count);
foreach (var kv in _state.Pending!)
pending[kv.Key] = new Pending { Sequence = kv.Value.Sequence, Timestamp = kv.Value.Timestamp };
return pending;
}
private Dictionary<ulong, ulong> CopyRedelivered()
{
return new Dictionary<ulong, ulong>(_state.Redelivered!);
}
}