From 5a2c8a32509135fed675811ab60c6b2703d35522 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 26 Feb 2026 16:02:03 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20port=20session=2017=20=E2=80=94=20Store?= =?UTF-8?q?=20Interfaces=20&=20Memory=20Store?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - StoreTypes: IStreamStore/IConsumerStore interfaces, StreamConfig/ConsumerConfig, all enums (StorageType, RetentionPolicy, DiscardPolicy, AckPolicy, etc.), StreamState, SimpleState, LostStreamData, DeleteBlocks/Range/Slice, StoreMsg - MemStore: JetStreamMemStore with full message CRUD, state tracking, age expiry - ConsumerMemStore: ConsumerMemStore with delivery/ack state tracking - DiskAvailability: cross-platform disk space checker - 135 features complete (IDs 3164-3194, 2068-2165, 827-832) --- .../JetStream/ConsumerMemStore.cs | 400 +++++ .../JetStream/MemStore.cs | 1445 +++++++++++++++++ .../JetStream/StoreTypes.cs | 1000 ++++++++++++ porting.db | Bin 2473984 -> 2473984 bytes reports/current.md | 10 +- reports/report_77403e3.md | 39 + 6 files changed, 2889 insertions(+), 5 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs create mode 100644 reports/report_77403e3.md diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs new file mode 100644 index 0000000..ec0b603 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/ConsumerMemStore.cs @@ -0,0 +1,400 @@ +// Copyright 2019-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/memstore.go (consumerMemStore) + +namespace ZB.MOM.NatsNet.Server; + +/// +/// In-memory implementation of . +/// Stores consumer delivery and ack state in memory only. +/// +public sealed class ConsumerMemStore : IConsumerStore +{ + // ----------------------------------------------------------------------- + // Fields + // ----------------------------------------------------------------------- + + private readonly object _mu = new(); + private readonly JetStreamMemStore _ms; + private ConsumerConfig _cfg; + private ConsumerState _state = new(); + private bool _closed; + + // ----------------------------------------------------------------------- + // Constructor + // ----------------------------------------------------------------------- + + /// + /// Creates a new consumer memory store backed by the given stream store. + /// + public ConsumerMemStore(JetStreamMemStore ms, ConsumerConfig cfg) + { + _ms = ms; + _cfg = cfg; + } + + // ----------------------------------------------------------------------- + // IConsumerStore — starting sequence + // ----------------------------------------------------------------------- + + /// + public void SetStarting(ulong sseq) + { + lock (_mu) + { + _state.Delivered.Stream = sseq; + _state.AckFloor.Stream = sseq; + } + } + + /// + public void UpdateStarting(ulong sseq) + { + lock (_mu) + { + if (sseq > _state.Delivered.Stream) + { + _state.Delivered.Stream = sseq; + // For AckNone just update delivered and ackfloor at the same time. + if (_cfg.AckPolicy == AckPolicy.AckNone) + _state.AckFloor.Stream = sseq; + } + } + } + + /// + public void Reset(ulong sseq) + { + lock (_mu) + { + _state = new ConsumerState(); + } + SetStarting(sseq); + } + + // ----------------------------------------------------------------------- + // IConsumerStore — state query + // ----------------------------------------------------------------------- + + /// + public bool HasState() + { + lock (_mu) + { + return _state.Delivered.Consumer != 0 || _state.Delivered.Stream != 0; + } + } + + // ----------------------------------------------------------------------- + // IConsumerStore — delivery tracking + // ----------------------------------------------------------------------- + + /// + public void UpdateDelivered(ulong dseq, ulong sseq, ulong dc, long ts) + { + lock (_mu) + { + if (dc != 1 && _cfg.AckPolicy == AckPolicy.AckNone) + throw StoreErrors.ErrNoAckPolicy; + + // Replay from old leader — ignore outdated updates. + if (dseq <= _state.AckFloor.Consumer) + return; + + if (_cfg.AckPolicy != AckPolicy.AckNone) + { + _state.Pending ??= new Dictionary(); + + if (sseq <= _state.Delivered.Stream) + { + // Update to a previously delivered message. + if (_state.Pending.TryGetValue(sseq, out var p) && p != null) + p.Timestamp = ts; + } + else + { + _state.Pending[sseq] = new Pending { Sequence = dseq, Timestamp = ts }; + } + + if (dseq > _state.Delivered.Consumer) + _state.Delivered.Consumer = dseq; + if (sseq > _state.Delivered.Stream) + _state.Delivered.Stream = sseq; + + if (dc > 1) + { + var maxdc = (ulong)_cfg.MaxDeliver; + if (maxdc > 0 && dc > maxdc) + _state.Pending.Remove(sseq); + + _state.Redelivered ??= new Dictionary(); + if (!_state.Redelivered.TryGetValue(sseq, out var cur) || cur < dc - 1) + _state.Redelivered[sseq] = dc - 1; + } + } + else + { + // AckNone — update delivered and ackfloor together. + if (dseq > _state.Delivered.Consumer) + { + _state.Delivered.Consumer = dseq; + _state.AckFloor.Consumer = dseq; + } + if (sseq > _state.Delivered.Stream) + { + _state.Delivered.Stream = sseq; + _state.AckFloor.Stream = sseq; + } + } + } + } + + /// + public void UpdateAcks(ulong dseq, ulong sseq) + { + lock (_mu) + { + if (_cfg.AckPolicy == AckPolicy.AckNone) + throw StoreErrors.ErrNoAckPolicy; + + // Ignore outdated acks. + if (dseq <= _state.AckFloor.Consumer) + return; + + if (_state.Pending == null || !_state.Pending.ContainsKey(sseq)) + { + _state.Redelivered?.Remove(sseq); + throw StoreErrors.ErrStoreMsgNotFound; + } + + if (_cfg.AckPolicy == AckPolicy.AckAll) + { + var sgap = sseq - _state.AckFloor.Stream; + _state.AckFloor.Consumer = dseq; + _state.AckFloor.Stream = sseq; + + if (sgap > (ulong)_state.Pending.Count) + { + var toRemove = new List(); + foreach (var kv in _state.Pending) + if (kv.Key <= sseq) + toRemove.Add(kv.Key); + foreach (var k in toRemove) + { + _state.Pending.Remove(k); + _state.Redelivered?.Remove(k); + } + } + else + { + for (var seq = sseq; seq > sseq - sgap && _state.Pending.Count > 0; seq--) + { + _state.Pending.Remove(seq); + _state.Redelivered?.Remove(seq); + if (seq == 0) break; + } + } + return; + } + + // AckExplicit + if (_state.Pending.TryGetValue(sseq, out var pending) && pending != null) + { + _state.Pending.Remove(sseq); + if (dseq > pending.Sequence && pending.Sequence > 0) + dseq = pending.Sequence; // Use the original delivery sequence. + } + + if (_state.Pending.Count == 0) + { + _state.AckFloor.Consumer = _state.Delivered.Consumer; + _state.AckFloor.Stream = _state.Delivered.Stream; + } + else if (dseq == _state.AckFloor.Consumer + 1) + { + _state.AckFloor.Consumer = dseq; + _state.AckFloor.Stream = sseq; + + if (_state.Delivered.Consumer > dseq) + { + for (var ss = sseq + 1; ss <= _state.Delivered.Stream; ss++) + { + if (_state.Pending.TryGetValue(ss, out var pp) && pp != null) + { + if (pp.Sequence > 0) + { + _state.AckFloor.Consumer = pp.Sequence - 1; + _state.AckFloor.Stream = ss - 1; + } + break; + } + } + } + } + + _state.Redelivered?.Remove(sseq); + } + } + + // ----------------------------------------------------------------------- + // IConsumerStore — config update + // ----------------------------------------------------------------------- + + /// + public void UpdateConfig(ConsumerConfig cfg) + { + lock (_mu) + { + _cfg = cfg; + } + } + + // ----------------------------------------------------------------------- + // IConsumerStore — update state + // ----------------------------------------------------------------------- + + /// + public void Update(ConsumerState state) + { + if (state.AckFloor.Consumer > state.Delivered.Consumer) + throw new InvalidOperationException("bad ack floor for consumer"); + if (state.AckFloor.Stream > state.Delivered.Stream) + throw new InvalidOperationException("bad ack floor for stream"); + + Dictionary? pending = null; + Dictionary? redelivered = null; + + if (state.Pending?.Count > 0) + { + pending = new Dictionary(state.Pending.Count); + foreach (var kv in state.Pending) + { + if (kv.Key <= state.AckFloor.Stream || kv.Key > state.Delivered.Stream) + throw new InvalidOperationException($"bad pending entry, sequence [{kv.Key}] out of range"); + pending[kv.Key] = new Pending { Sequence = kv.Value.Sequence, Timestamp = kv.Value.Timestamp }; + } + } + + if (state.Redelivered?.Count > 0) + { + redelivered = new Dictionary(state.Redelivered); + } + + lock (_mu) + { + // Ignore outdated updates. + if (state.Delivered.Consumer < _state.Delivered.Consumer || + state.AckFloor.Stream < _state.AckFloor.Stream) + throw new InvalidOperationException("old update ignored"); + + _state.Delivered = new SequencePair { Consumer = state.Delivered.Consumer, Stream = state.Delivered.Stream }; + _state.AckFloor = new SequencePair { Consumer = state.AckFloor.Consumer, Stream = state.AckFloor.Stream }; + _state.Pending = pending; + _state.Redelivered = redelivered; + } + } + + // ----------------------------------------------------------------------- + // IConsumerStore — state retrieval + // ----------------------------------------------------------------------- + + /// + public (ConsumerState? State, Exception? Error) State() => StateWithCopy(doCopy: true); + + /// + public (ConsumerState? State, Exception? Error) BorrowState() => StateWithCopy(doCopy: false); + + private (ConsumerState? State, Exception? Error) StateWithCopy(bool doCopy) + { + lock (_mu) + { + if (_closed) + return (null, StoreErrors.ErrStoreClosed); + + var state = new ConsumerState + { + Delivered = new SequencePair { Consumer = _state.Delivered.Consumer, Stream = _state.Delivered.Stream }, + AckFloor = new SequencePair { Consumer = _state.AckFloor.Consumer, Stream = _state.AckFloor.Stream }, + }; + + if (_state.Pending?.Count > 0) + { + state.Pending = doCopy ? CopyPending() : _state.Pending; + } + + if (_state.Redelivered?.Count > 0) + { + state.Redelivered = doCopy ? CopyRedelivered() : _state.Redelivered; + } + + return (state, null); + } + } + + // ----------------------------------------------------------------------- + // IConsumerStore — encoding + // ----------------------------------------------------------------------- + + /// + public byte[] EncodedState() + { + lock (_mu) + { + if (_closed) + throw StoreErrors.ErrStoreClosed; + // TODO: session 17 — encode consumer state to binary + return Array.Empty(); + } + } + + // ----------------------------------------------------------------------- + // IConsumerStore — lifecycle + // ----------------------------------------------------------------------- + + /// + public StorageType Type() => StorageType.MemoryStorage; + + /// + public void Stop() + { + lock (_mu) + { + _closed = true; + } + _ms.RemoveConsumer(this); + } + + /// + public void Delete() => Stop(); + + /// + public void StreamDelete() => Stop(); + + // ----------------------------------------------------------------------- + // Private helpers + // ----------------------------------------------------------------------- + + private Dictionary CopyPending() + { + var pending = new Dictionary(_state.Pending!.Count); + foreach (var kv in _state.Pending!) + pending[kv.Key] = new Pending { Sequence = kv.Value.Sequence, Timestamp = kv.Value.Timestamp }; + return pending; + } + + private Dictionary CopyRedelivered() + { + return new Dictionary(_state.Redelivered!); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs new file mode 100644 index 0000000..1ed7238 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs @@ -0,0 +1,1445 @@ +// Copyright 2019-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/memstore.go + +using System.Text; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +/// +/// In-memory implementation of . +/// Stores all messages in a Dictionary keyed by sequence number. +/// Not production-complete: complex methods are stubbed for later sessions. +/// +public sealed class JetStreamMemStore : IStreamStore +{ + // ----------------------------------------------------------------------- + // Fields + // ----------------------------------------------------------------------- + + private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion); + private StreamConfig _cfg; + private StreamState _state = new(); + + // Primary message store: seq -> StoreMsg + private Dictionary? _msgs; + + // Per-subject state: subject -> SimpleState + private readonly SubjectTree _fss; + + // Deleted sequence set + private SequenceSet _dmap = new(); + + // Max messages per subject (cfg.MaxMsgsPer) + private long _maxp; + + // Registered callbacks + private StorageUpdateHandler? _scb; + private StorageRemoveMsgHandler? _rmcb; + private ProcessJetStreamMsgHandler? _pmsgcb; + + // Age-check timer for MaxAge expiry + private Timer? _ageChk; + private long _ageChkTime; + + // Consumer count + private int _consumers; + + // ----------------------------------------------------------------------- + // Constructor + // ----------------------------------------------------------------------- + + /// + /// Creates a new in-memory store from the supplied . + /// + /// Thrown when cfg is null or Storage != MemoryStorage. + public JetStreamMemStore(StreamConfig cfg) + { + if (cfg == null) + throw new ArgumentException("config required"); + if (cfg.Storage != StorageType.MemoryStorage) + throw new ArgumentException("JetStreamMemStore requires MemoryStorage type in config"); + + _cfg = cfg.Clone(); + _msgs = new Dictionary(); + _fss = new SubjectTree(); + _maxp = cfg.MaxMsgsPer; + + if (cfg.FirstSeq > 0) + Purge(); + } + + // ----------------------------------------------------------------------- + // IStreamStore — store / load + // ----------------------------------------------------------------------- + + /// + public (ulong Seq, long Ts) StoreMsg(string subject, byte[]? hdr, byte[]? msg, long ttl) + { + _mu.EnterWriteLock(); + try + { + var seq = _state.LastSeq + 1; + var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + try + { + StoreRawMsgLocked(subject, hdr, msg, seq, ts, ttl, discardNewCheck: true); + _scb?.Invoke(1, (long)MsgSize(subject, hdr, msg), seq, subject); + return (seq, ts); + } + catch + { + return (0, 0); + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + public void StoreRawMsg(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck) + { + _mu.EnterWriteLock(); + try + { + StoreRawMsgLocked(subject, hdr, msg, seq, ts, ttl, discardNewCheck); + _scb?.Invoke(1, (long)MsgSize(subject, hdr, msg), seq, subject); + } + finally + { + _mu.ExitWriteLock(); + } + } + + // Lock must be held. + private void StoreRawMsgLocked(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck) + { + if (_msgs == null) + throw StoreErrors.ErrStoreClosed; + + hdr ??= Array.Empty(); + msg ??= Array.Empty(); + + // Discard-new enforcement + if (discardNewCheck && _cfg.Discard == DiscardPolicy.DiscardNew) + { + if (_cfg.MaxMsgs > 0 && _state.Msgs >= (ulong)_cfg.MaxMsgs) + throw StoreErrors.ErrMaxMsgs; + if (_cfg.MaxBytes > 0 && _state.Bytes + MsgSize(subject, hdr, msg) > (ulong)_cfg.MaxBytes) + throw StoreErrors.ErrMaxBytes; + } + + if (seq != _state.LastSeq + 1) + { + if (seq > 0) + throw StoreErrors.ErrSequenceMismatch; + seq = _state.LastSeq + 1; + } + + var now = DateTimeOffset.FromUnixTimeMilliseconds(ts / 1_000_000L).UtcDateTime; + + if (_state.Msgs == 0) + { + _state.FirstSeq = seq; + _state.FirstTime = now; + } + + // Build combined buffer + var buf = new byte[hdr.Length + msg.Length]; + if (hdr.Length > 0) + Buffer.BlockCopy(hdr, 0, buf, 0, hdr.Length); + if (msg.Length > 0) + Buffer.BlockCopy(msg, 0, buf, hdr.Length, msg.Length); + + var sm = new StoreMsg + { + Subject = subject, + Hdr = hdr.Length > 0 ? buf[..hdr.Length] : Array.Empty(), + Msg = buf[hdr.Length..], + Buf = buf, + Seq = seq, + Ts = ts, + }; + + _msgs[seq] = sm; + _state.Msgs++; + _state.Bytes += MsgSize(subject, hdr, msg); + _state.LastSeq = seq; + _state.LastTime = now; + + // Per-subject tracking + if (!string.IsNullOrEmpty(subject)) + { + var subjectBytes = Encoding.UTF8.GetBytes(subject); + var (ss, found) = _fss.Find(subjectBytes); + if (found && ss != null) + { + ss.Msgs++; + ss.Last = seq; + ss.LastNeedsUpdate = false; + + // Enforce per-subject limit + if (_maxp > 0 && ss.Msgs > (ulong)_maxp) + EnforcePerSubjectLimit(subject, ss); + } + else + { + _fss.Insert(subjectBytes, new SimpleState { Msgs = 1, First = seq, Last = seq }); + } + } + + // Overall limits + EnforceMsgLimit(); + EnforceBytesLimit(); + + // Age check + if (_ageChk == null && _cfg.MaxAge != TimeSpan.Zero) + StartAgeChk(); + } + + /// + public StoreMsg? LoadMsg(ulong seq, StoreMsg? sm) + { + _mu.EnterReadLock(); + try + { + return LoadMsgLocked(seq, sm); + } + finally + { + _mu.ExitReadLock(); + } + } + + private StoreMsg? LoadMsgLocked(ulong seq, StoreMsg? sm) + { + if (_msgs == null) + throw StoreErrors.ErrStoreClosed; + + if (!_msgs.TryGetValue(seq, out var stored) || stored == null) + { + if (seq <= _state.LastSeq) + throw StoreErrors.ErrStoreMsgNotFound; + throw StoreErrors.ErrStoreEOF; + } + + sm ??= new StoreMsg(); + sm.CopyFrom(stored); + return sm; + } + + /// + public (StoreMsg? Sm, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? smp) + { + _mu.EnterWriteLock(); + try + { + return LoadNextMsgLocked(filter, wc, start, smp); + } + finally + { + _mu.ExitWriteLock(); + } + } + + private (StoreMsg? Sm, ulong Skip) LoadNextMsgLocked(string filter, bool wc, ulong start, StoreMsg? smp) + { + if (_msgs == null || _state.Msgs == 0) + return (null, _state.LastSeq); + + if (start < _state.FirstSeq) + start = _state.FirstSeq; + if (start > _state.LastSeq) + return (null, _state.LastSeq); + + var isAll = string.IsNullOrEmpty(filter) || filter == ">"; + + for (var nseq = start; nseq <= _state.LastSeq; nseq++) + { + if (!_msgs.TryGetValue(nseq, out var sm) || sm == null) + continue; + if (isAll || (wc ? MatchLiteral(sm.Subject, filter) : sm.Subject == filter)) + { + smp ??= new StoreMsg(); + smp.CopyFrom(sm); + return (smp, nseq); + } + } + return (null, _state.LastSeq); + } + + /// + public (StoreMsg? Sm, ulong Skip) LoadNextMsgMulti(object? sl, ulong start, StoreMsg? smp) + { + // TODO: session 17 — implement gsl.SimpleSublist equivalent + _mu.EnterReadLock(); + try + { + if (_msgs == null || _state.Msgs == 0) + return (null, _state.LastSeq); + + if (start < _state.FirstSeq) + start = _state.FirstSeq; + if (start > _state.LastSeq) + return (null, _state.LastSeq); + + for (var nseq = start; nseq <= _state.LastSeq; nseq++) + { + if (_msgs.TryGetValue(nseq, out var sm) && sm != null) + { + smp ??= new StoreMsg(); + smp.CopyFrom(sm); + return (smp, nseq); + } + } + return (null, _state.LastSeq); + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + public StoreMsg? LoadLastMsg(string subject, StoreMsg? sm) + { + _mu.EnterWriteLock(); + try + { + return LoadLastLocked(subject, sm); + } + finally + { + _mu.ExitWriteLock(); + } + } + + private StoreMsg? LoadLastLocked(string subject, StoreMsg? sm) + { + if (_msgs == null) + throw StoreErrors.ErrStoreClosed; + + StoreMsg? stored = null; + if (string.IsNullOrEmpty(subject) || subject == ">") + { + _msgs.TryGetValue(_state.LastSeq, out stored); + } + else + { + var (ss, found) = _fss.Find(Encoding.UTF8.GetBytes(subject)); + if (found && ss != null && ss.Msgs > 0) + _msgs.TryGetValue(ss.Last, out stored); + } + + if (stored == null) + throw StoreErrors.ErrStoreMsgNotFound; + + sm ??= new StoreMsg(); + sm.CopyFrom(stored); + return sm; + } + + /// + public (StoreMsg? Sm, Exception? Error) LoadPrevMsg(ulong start, StoreMsg? smp) + { + _mu.EnterReadLock(); + try + { + if (_msgs == null) + return (null, StoreErrors.ErrStoreClosed); + if (_state.Msgs == 0 || start < _state.FirstSeq) + return (null, StoreErrors.ErrStoreEOF); + if (start > _state.LastSeq) + start = _state.LastSeq; + + for (var seq = start; seq >= _state.FirstSeq; seq--) + { + if (_msgs.TryGetValue(seq, out var sm) && sm != null) + { + smp ??= new StoreMsg(); + smp.CopyFrom(sm); + return (smp, null); + } + if (seq == 0) break; + } + return (null, StoreErrors.ErrStoreEOF); + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + public (StoreMsg? Sm, ulong Skip, Exception? Error) LoadPrevMsgMulti(object? sl, ulong start, StoreMsg? smp) + { + // TODO: session 17 — implement gsl.SimpleSublist equivalent + _mu.EnterReadLock(); + try + { + if (_msgs == null || _state.Msgs == 0 || start < _state.FirstSeq) + return (null, _state.FirstSeq, StoreErrors.ErrStoreEOF); + + if (start > _state.LastSeq) + start = _state.LastSeq; + + for (var nseq = start; nseq >= _state.FirstSeq; nseq--) + { + if (_msgs.TryGetValue(nseq, out var sm) && sm != null) + { + smp ??= new StoreMsg(); + smp.CopyFrom(sm); + return (smp, nseq, null); + } + if (nseq == 0) break; + } + return (null, _state.LastSeq, StoreErrors.ErrStoreEOF); + } + finally + { + _mu.ExitReadLock(); + } + } + + // ----------------------------------------------------------------------- + // IStreamStore — skip + // ----------------------------------------------------------------------- + + /// + public (ulong Seq, Exception? Error) SkipMsg(ulong seq) + { + _mu.EnterWriteLock(); + try + { + if (seq != _state.LastSeq + 1) + { + if (seq > 0) + return (0, StoreErrors.ErrSequenceMismatch); + seq = _state.LastSeq + 1; + } + + _state.LastSeq = seq; + _state.LastTime = DateTime.UtcNow; + + if (_state.Msgs == 0) + { + _state.FirstSeq = seq + 1; + _state.FirstTime = default; + } + else + { + _dmap.Insert(seq); + } + return (seq, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + public void SkipMsgs(ulong seq, ulong num) + { + _mu.EnterWriteLock(); + try + { + if (seq != _state.LastSeq + 1) + { + if (seq > 0) + throw StoreErrors.ErrSequenceMismatch; + seq = _state.LastSeq + 1; + } + var lseq = seq + num - 1; + _state.LastSeq = lseq; + _state.LastTime = DateTime.UtcNow; + + if (_state.Msgs == 0) + { + _state.FirstSeq = lseq + 1; + _state.FirstTime = default; + } + else + { + for (; seq <= lseq; seq++) + _dmap.Insert(seq); + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + // ----------------------------------------------------------------------- + // IStreamStore — remove + // ----------------------------------------------------------------------- + + /// + public (bool Removed, Exception? Error) RemoveMsg(ulong seq) + { + _mu.EnterWriteLock(); + try + { + return (RemoveMsgLocked(seq, secure: false), null); + } + finally + { + if (_mu.IsWriteLockHeld) + _mu.ExitWriteLock(); + } + } + + /// + public (bool Removed, Exception? Error) EraseMsg(ulong seq) + { + _mu.EnterWriteLock(); + try + { + return (RemoveMsgLocked(seq, secure: true), null); + } + finally + { + if (_mu.IsWriteLockHeld) + _mu.ExitWriteLock(); + } + } + + // Lock must be held. + private bool RemoveMsgLocked(ulong seq, bool secure) + { + if (_msgs == null || !_msgs.TryGetValue(seq, out var sm) || sm == null) + return false; + + var size = MsgSize(sm.Subject, sm.Hdr, sm.Msg); + + if (_state.Msgs > 0) + { + _state.Msgs--; + if (size > _state.Bytes) + size = _state.Bytes; + _state.Bytes -= size; + } + + _dmap.Insert(seq); + UpdateFirstSeq(seq); + RemoveSeqPerSubject(sm.Subject, seq); + + if (secure) + { + if (sm.Hdr.Length > 0) + Array.Clear(sm.Hdr); + if (sm.Msg.Length > 0) + Array.Clear(sm.Msg); + sm.Seq = 0; + sm.Ts = 0; + } + + _msgs.Remove(seq); + + // Invoke update callback — we temporarily release/reacquire write lock + var cb = _scb; + if (cb != null) + { + var subj = sm.Subject; + var delta = size; + _mu.ExitWriteLock(); + try { cb(-1, -(long)delta, seq, subj); } + finally { _mu.EnterWriteLock(); } + } + + return true; + } + + // ----------------------------------------------------------------------- + // IStreamStore — purge / compact / truncate + // ----------------------------------------------------------------------- + + /// + public (ulong Purged, Exception? Error) Purge() + { + _mu.EnterWriteLock(); + ulong purged; + long bytes; + ulong fseq; + StorageUpdateHandler? cb; + try + { + purged = (ulong)(_msgs?.Count ?? 0); + bytes = (long)_state.Bytes; + fseq = _state.LastSeq + 1; + _state.FirstSeq = fseq; + _state.LastSeq = fseq - 1; + _state.FirstTime = default; + _state.Bytes = 0; + _state.Msgs = 0; + _msgs = new Dictionary(); + _dmap = new SequenceSet(); + cb = _scb; + } + finally + { + _mu.ExitWriteLock(); + } + + cb?.Invoke(-(long)purged, -bytes, 0, string.Empty); + return (purged, null); + } + + /// + public (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep) + { + // TODO: session 17 — full subject-filtered purge + if (string.IsNullOrEmpty(subject) || subject == ">") + { + if (keep == 0 && seq == 0) + return Purge(); + } + return (0, null); + } + + /// + public (ulong Purged, Exception? Error) Compact(ulong seq) + { + if (seq == 0) + return Purge(); + + ulong purged = 0; + ulong bytes = 0; + StorageUpdateHandler? cb = null; + + _mu.EnterWriteLock(); + try + { + if (_msgs == null || _state.FirstSeq > seq) + return (0, null); + + cb = _scb; + if (seq <= _state.LastSeq) + { + var fseq = _state.FirstSeq; + for (var s = seq; s <= _state.LastSeq; s++) + { + if (_msgs.TryGetValue(s, out var sm2) && sm2 != null) + { + _state.FirstSeq = s; + _state.FirstTime = DateTimeOffset.FromUnixTimeMilliseconds(sm2.Ts / 1_000_000L).UtcDateTime; + break; + } + } + for (var s = seq - 1; s >= fseq; s--) + { + if (_msgs.TryGetValue(s, out var sm2) && sm2 != null) + { + bytes += MsgSize(sm2.Subject, sm2.Hdr, sm2.Msg); + purged++; + RemoveSeqPerSubject(sm2.Subject, s); + _msgs.Remove(s); + } + else if (!_dmap.IsEmpty) + { + _dmap.Delete(s); + } + if (s == 0) break; + } + if (purged > _state.Msgs) purged = _state.Msgs; + _state.Msgs -= purged; + if (bytes > _state.Bytes) bytes = _state.Bytes; + _state.Bytes -= bytes; + } + else + { + purged = (ulong)_msgs.Count; + bytes = _state.Bytes; + _state.Bytes = 0; + _state.Msgs = 0; + _state.FirstSeq = seq; + _state.FirstTime = default; + _state.LastSeq = seq - 1; + _msgs = new Dictionary(); + _dmap = new SequenceSet(); + } + } + finally + { + _mu.ExitWriteLock(); + } + + cb?.Invoke(-(long)purged, -(long)bytes, 0, string.Empty); + return (purged, null); + } + + /// + public void Truncate(ulong seq) + { + StorageUpdateHandler? cb; + ulong purged = 0; + ulong bytes = 0; + + _mu.EnterWriteLock(); + try + { + if (_msgs == null) + return; + + cb = _scb; + + if (seq == 0) + { + // Full reset + purged = (ulong)_msgs.Count; + bytes = _state.Bytes; + _state = new StreamState { LastTime = DateTime.UtcNow }; + _msgs = new Dictionary(); + _dmap = new SequenceSet(); + } + else + { + DateTime lastTime = _state.LastTime; + if (_msgs.TryGetValue(seq, out var lsm) && lsm != null) + lastTime = DateTimeOffset.FromUnixTimeMilliseconds(lsm.Ts / 1_000_000L).UtcDateTime; + + for (var i = _state.LastSeq; i > seq; i--) + { + if (_msgs.TryGetValue(i, out var sm) && sm != null) + { + purged++; + bytes += MsgSize(sm.Subject, sm.Hdr, sm.Msg); + RemoveSeqPerSubject(sm.Subject, i); + _msgs.Remove(i); + } + else if (!_dmap.IsEmpty) + { + _dmap.Delete(i); + } + } + + _state.LastSeq = seq; + _state.LastTime = lastTime; + if (purged > _state.Msgs) purged = _state.Msgs; + _state.Msgs -= purged; + if (bytes > _state.Bytes) bytes = _state.Bytes; + _state.Bytes -= bytes; + } + } + finally + { + _mu.ExitWriteLock(); + } + + cb?.Invoke(-(long)purged, -(long)bytes, 0, string.Empty); + } + + // ----------------------------------------------------------------------- + // IStreamStore — state + // ----------------------------------------------------------------------- + + /// + public StreamState State() + { + _mu.EnterReadLock(); + try + { + var state = new StreamState + { + Msgs = _state.Msgs, + Bytes = _state.Bytes, + FirstSeq = _state.FirstSeq, + FirstTime = _state.FirstTime, + LastSeq = _state.LastSeq, + LastTime = _state.LastTime, + Consumers = _consumers, + NumSubjects = _fss.Size(), + }; + + var numDeleted = (int)((state.LastSeq - state.FirstSeq + 1) - state.Msgs); + if (numDeleted < 0) numDeleted = 0; + + if (numDeleted > 0) + { + var deleted = new List(numDeleted); + var fseq = state.FirstSeq; + var lseq = state.LastSeq; + _dmap.Range(seq => + { + if (seq >= fseq && seq <= lseq) + deleted.Add(seq); + return true; + }); + state.Deleted = deleted.ToArray(); + state.NumDeleted = deleted.Count; + } + + return state; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + public void FastState(StreamState state) + { + _mu.EnterReadLock(); + try + { + state.Msgs = _state.Msgs; + state.Bytes = _state.Bytes; + state.FirstSeq = _state.FirstSeq; + state.FirstTime = _state.FirstTime; + state.LastSeq = _state.LastSeq; + state.LastTime = _state.LastTime; + if (state.LastSeq > state.FirstSeq) + { + state.NumDeleted = (int)((state.LastSeq - state.FirstSeq + 1) - state.Msgs); + if (state.NumDeleted < 0) state.NumDeleted = 0; + } + state.Consumers = _consumers; + state.NumSubjects = _fss.Size(); + } + finally + { + _mu.ExitReadLock(); + } + } + + // ----------------------------------------------------------------------- + // IStreamStore — filtered / pending state + // ----------------------------------------------------------------------- + + /// + public SimpleState FilteredState(ulong seq, string subject) + { + _mu.EnterWriteLock(); + try + { + return FilteredStateLocked(seq, subject); + } + finally + { + _mu.ExitWriteLock(); + } + } + + private SimpleState FilteredStateLocked(ulong sseq, string filter) + { + if (sseq < _state.FirstSeq) sseq = _state.FirstSeq; + if (sseq > _state.LastSeq) return new SimpleState(); + if (string.IsNullOrEmpty(filter)) filter = ">"; + + var isAll = filter == ">"; + if (isAll && sseq <= _state.FirstSeq) + return new SimpleState { Msgs = _state.Msgs, First = _state.FirstSeq, Last = _state.LastSeq }; + + var ss = new SimpleState(); + _fss.Match(Encoding.UTF8.GetBytes(filter), (subj, fss) => + { + if (fss.FirstNeedsUpdate || fss.LastNeedsUpdate) + RecalculateForSubj(Encoding.UTF8.GetString(subj), fss); + + if (sseq <= fss.First) + { + ss.Msgs += fss.Msgs; + if (ss.First == 0 || fss.First < ss.First) ss.First = fss.First; + if (fss.Last > ss.Last) ss.Last = fss.Last; + } + return true; + }); + return ss; + } + + /// + public Dictionary SubjectsState(string filterSubject) + { + _mu.EnterWriteLock(); + try + { + if (_fss.Size() == 0) return new Dictionary(); + if (string.IsNullOrEmpty(filterSubject)) filterSubject = ">"; + var result = new Dictionary(); + _fss.Match(Encoding.UTF8.GetBytes(filterSubject), (subj, ss) => + { + var s = Encoding.UTF8.GetString(subj); + if (ss.FirstNeedsUpdate || ss.LastNeedsUpdate) + RecalculateForSubj(s, ss); + if (!result.TryGetValue(s, out var oss) || oss.First == 0) + result[s] = new SimpleState { Msgs = ss.Msgs, First = ss.First, Last = ss.Last }; + else + { + oss.Last = ss.Last; + oss.Msgs += ss.Msgs; + result[s] = oss; + } + return true; + }); + return result; + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + public Dictionary SubjectsTotals(string filterSubject) + { + _mu.EnterReadLock(); + try + { + if (_fss.Size() == 0) return new Dictionary(); + if (string.IsNullOrEmpty(filterSubject)) filterSubject = ">"; + var result = new Dictionary(); + _fss.Match(Encoding.UTF8.GetBytes(filterSubject), (subj, ss) => + { + result[Encoding.UTF8.GetString(subj)] = ss.Msgs; + return true; + }); + return result; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + public (ulong Total, ulong ValidThrough, Exception? Error) NumPending(ulong sseq, string filter, bool lastPerSubject) + { + _mu.EnterWriteLock(); + try + { + var ss = FilteredStateLocked(sseq, filter); + return (ss.Msgs, _state.LastSeq, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + public (ulong Total, ulong ValidThrough, Exception? Error) NumPendingMulti(ulong sseq, object? sl, bool lastPerSubject) + { + // TODO: session 17 — implement gsl.SimpleSublist equivalent + return NumPending(sseq, ">", lastPerSubject); + } + + /// + public (ulong[] Seqs, Exception? Error) AllLastSeqs() + { + _mu.EnterReadLock(); + try + { + if (_msgs == null || _msgs.Count == 0) return (Array.Empty(), null); + var seqs = new List(_fss.Size()); + _fss.IterFast((_, ss) => + { + seqs.Add(ss.Last); + return true; + }); + seqs.Sort(); + return (seqs.ToArray(), null); + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + public (ulong[] Seqs, Exception? Error) MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed) + { + _mu.EnterReadLock(); + try + { + if (_msgs == null || _msgs.Count == 0) return (Array.Empty(), null); + if (maxSeq == 0) maxSeq = _state.LastSeq; + + var seqs = new List(64); + var seen = new HashSet(); + foreach (var filter in filters) + { + _fss.Match(Encoding.UTF8.GetBytes(filter), (_, ss) => + { + if (ss.Last <= maxSeq && seen.Add(ss.Last)) + seqs.Add(ss.Last); + return true; + }); + if (maxAllowed > 0 && seqs.Count > maxAllowed) + return (Array.Empty(), StoreErrors.ErrTooManyResults); + } + seqs.Sort(); + return (seqs.ToArray(), null); + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + public (string Subject, Exception? Error) SubjectForSeq(ulong seq) + { + _mu.EnterReadLock(); + try + { + if (_msgs == null) return (string.Empty, StoreErrors.ErrStoreClosed); + if (seq < _state.FirstSeq) return (string.Empty, StoreErrors.ErrStoreMsgNotFound); + if (_msgs.TryGetValue(seq, out var sm) && sm != null) + return (sm.Subject, null); + return (string.Empty, StoreErrors.ErrStoreMsgNotFound); + } + finally + { + _mu.ExitReadLock(); + } + } + + // ----------------------------------------------------------------------- + // IStreamStore — time / seq + // ----------------------------------------------------------------------- + + /// + public ulong GetSeqFromTime(DateTime t) + { + var ts = new DateTimeOffset(t).ToUnixTimeMilliseconds() * 1_000_000L; + _mu.EnterReadLock(); + try + { + if (_msgs == null || _msgs.Count == 0) + return _state.LastSeq + 1; + + if (!_msgs.TryGetValue(_state.FirstSeq, out var firstSm) || firstSm == null) + return _state.LastSeq + 1; + + if (ts <= firstSm.Ts) + return _state.FirstSeq; + + // Find actual last stored message + StoreMsg? lastSm = null; + for (var lseq = _state.LastSeq; lseq > _state.FirstSeq; lseq--) + { + if (_msgs.TryGetValue(lseq, out lastSm) && lastSm != null) + break; + } + if (lastSm == null) return _state.LastSeq + 1; + if (ts >= lastSm.Ts) return _state.LastSeq + 1; + + // Linear scan fallback + for (var seq = _state.FirstSeq; seq <= _state.LastSeq; seq++) + { + if (_msgs.TryGetValue(seq, out var sm) && sm != null && sm.Ts >= ts) + return seq; + } + return _state.LastSeq + 1; + } + finally + { + _mu.ExitReadLock(); + } + } + + // ----------------------------------------------------------------------- + // IStreamStore — config, encoding, sync + // ----------------------------------------------------------------------- + + /// + public void UpdateConfig(StreamConfig cfg) + { + if (cfg == null) throw new ArgumentException("config required"); + _mu.EnterWriteLock(); + try + { + _cfg = cfg.Clone(); + _maxp = cfg.MaxMsgsPer; + EnforceMsgLimit(); + EnforceBytesLimit(); + if (_ageChk == null && _cfg.MaxAge != TimeSpan.Zero) + StartAgeChk(); + if (_ageChk != null && _cfg.MaxAge == TimeSpan.Zero) + { + _ageChk?.Dispose(); + _ageChk = null; + _ageChkTime = 0; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + /// + public void RegisterStorageUpdates(StorageUpdateHandler cb) + { + _mu.EnterWriteLock(); + _scb = cb; + _mu.ExitWriteLock(); + } + + /// + public void RegisterStorageRemoveMsg(StorageRemoveMsgHandler cb) + { + _mu.EnterWriteLock(); + _rmcb = cb; + _mu.ExitWriteLock(); + } + + /// + public void RegisterProcessJetStreamMsg(ProcessJetStreamMsgHandler cb) + { + _mu.EnterWriteLock(); + _pmsgcb = cb; + _mu.ExitWriteLock(); + } + + /// + public void FlushAllPending() + { + // No-op: in-memory store doesn't use async applying. + } + + /// + public (byte[] Enc, Exception? Error) EncodedStreamState(ulong failed) + { + // TODO: session 17 — binary encode using varint encoding matching Go + _mu.EnterReadLock(); + try + { + // Minimal encoded representation (stub): magic + version bytes + return (new byte[] { 42, 1 }, null); + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + public void SyncDeleted(DeleteBlocks dbs) + { + _mu.EnterWriteLock(); + try + { + if (dbs.Count == 1) + { + var (min, max, num) = _dmap.State(); + var (pmin, pmax, pnum) = dbs[0].GetState(); + if (pmin == min && pmax == max && pnum == num) + return; + } + + var lseq = _state.LastSeq; + foreach (var db in dbs) + { + var (first, _, _) = db.GetState(); + if (first > lseq) continue; + db.Range(seq => { RemoveMsgLocked(seq, false); return true; }); + } + } + finally + { + if (_mu.IsWriteLockHeld) + _mu.ExitWriteLock(); + } + } + + /// + public void ResetState() + { + // For memory store, nothing to reset. + } + + // ----------------------------------------------------------------------- + // IStreamStore — type / stop / delete + // ----------------------------------------------------------------------- + + /// + public StorageType Type() => StorageType.MemoryStorage; + + /// + public void Stop() + { + _mu.EnterWriteLock(); + try + { + if (_msgs == null) return; + _ageChk?.Dispose(); + _ageChk = null; + _ageChkTime = 0; + _msgs = null; + } + finally + { + _mu.ExitWriteLock(); + } + + Purge(); + } + + /// + public void Delete(bool inline) => Stop(); + + // ----------------------------------------------------------------------- + // IStreamStore — consumers + // ----------------------------------------------------------------------- + + /// + public IConsumerStore ConsumerStore(string name, DateTime created, ConsumerConfig cfg) + { + if (_msgs == null) + throw StoreErrors.ErrStoreClosed; + if (cfg == null || string.IsNullOrEmpty(name)) + throw new ArgumentException("bad consumer config"); + + var o = new ConsumerMemStore(this, cfg); + AddConsumer(o); + return o; + } + + /// + public void AddConsumer(IConsumerStore o) + { + _mu.EnterWriteLock(); + _consumers++; + _mu.ExitWriteLock(); + } + + /// + public void RemoveConsumer(IConsumerStore o) + { + _mu.EnterWriteLock(); + if (_consumers > 0) _consumers--; + _mu.ExitWriteLock(); + } + + /// + public (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool includeConsumers, bool checkMsgs) + { + // TODO: session 17 — not implemented for memory store + return (null, new NotImplementedException("no impl")); + } + + /// + public (ulong Total, ulong Reported, Exception? Error) Utilization() + { + _mu.EnterReadLock(); + try + { + return (_state.Bytes, _state.Bytes, null); + } + finally + { + _mu.ExitReadLock(); + } + } + + // ----------------------------------------------------------------------- + // Private helpers + // ----------------------------------------------------------------------- + + // Lock must be held. + private void EnforceMsgLimit() + { + if (_cfg.Discard != DiscardPolicy.DiscardOld) return; + if (_cfg.MaxMsgs <= 0 || _state.Msgs <= (ulong)_cfg.MaxMsgs) return; + while (_state.Msgs > (ulong)_cfg.MaxMsgs) + RemoveMsgLocked(_state.FirstSeq, false); + } + + // Lock must be held. + private void EnforceBytesLimit() + { + if (_cfg.Discard != DiscardPolicy.DiscardOld) return; + if (_cfg.MaxBytes <= 0 || _state.Bytes <= (ulong)_cfg.MaxBytes) return; + while (_state.Bytes > (ulong)_cfg.MaxBytes) + RemoveMsgLocked(_state.FirstSeq, false); + } + + // Lock must be held. + private void EnforcePerSubjectLimit(string subj, SimpleState ss) + { + if (_maxp <= 0) return; + while (ss.Msgs > (ulong)_maxp) + { + if (ss.FirstNeedsUpdate || ss.LastNeedsUpdate) + RecalculateForSubj(subj, ss); + if (!RemoveMsgLocked(ss.First, false)) + break; + } + } + + // Lock must be held. + private void UpdateFirstSeq(ulong seq) + { + if (seq != _state.FirstSeq) return; + + StoreMsg? nsm = null; + for (var nseq = _state.FirstSeq + 1; nseq <= _state.LastSeq; nseq++) + { + if (_msgs != null && _msgs.TryGetValue(nseq, out nsm) && nsm != null) + break; + } + + var oldFirst = _state.FirstSeq; + if (nsm != null) + { + _state.FirstSeq = nsm.Seq; + _state.FirstTime = DateTimeOffset.FromUnixTimeMilliseconds(nsm.Ts / 1_000_000L).UtcDateTime; + } + else + { + _state.FirstSeq = _state.LastSeq + 1; + _state.FirstTime = default; + } + + if (oldFirst == _state.FirstSeq - 1) + _dmap.Delete(oldFirst); + else + for (var s = oldFirst; s < _state.FirstSeq; s++) + _dmap.Delete(s); + } + + // Lock must be held. + private void RemoveSeqPerSubject(string subj, ulong seq) + { + if (string.IsNullOrEmpty(subj)) return; + var subjectBytes = Encoding.UTF8.GetBytes(subj); + var (ss, found) = _fss.Find(subjectBytes); + if (!found || ss == null) return; + + if (ss.Msgs == 1) + { + _fss.Delete(subjectBytes); + return; + } + ss.Msgs--; + if (ss.Msgs == 1) + { + if (!ss.LastNeedsUpdate && seq != ss.Last) + { + ss.First = ss.Last; + ss.FirstNeedsUpdate = false; + return; + } + if (!ss.FirstNeedsUpdate && seq != ss.First) + { + ss.Last = ss.First; + ss.LastNeedsUpdate = false; + return; + } + } + ss.FirstNeedsUpdate = seq == ss.First || ss.FirstNeedsUpdate; + ss.LastNeedsUpdate = seq == ss.Last || ss.LastNeedsUpdate; + } + + // Lock must be held. + private void RecalculateForSubj(string subj, SimpleState ss) + { + if (_msgs == null) return; + if (ss.FirstNeedsUpdate) + { + var tseq = ss.First + 1; + if (tseq < _state.FirstSeq) tseq = _state.FirstSeq; + for (; tseq <= ss.Last; tseq++) + { + if (_msgs.TryGetValue(tseq, out var sm) && sm != null && sm.Subject == subj) + { + ss.First = tseq; + ss.FirstNeedsUpdate = false; + if (ss.Msgs == 1) { ss.Last = tseq; ss.LastNeedsUpdate = false; return; } + break; + } + } + } + if (ss.LastNeedsUpdate) + { + var tseq = ss.Last - 1; + if (tseq > _state.LastSeq) tseq = _state.LastSeq; + for (; tseq >= ss.First; tseq--) + { + if (_msgs.TryGetValue(tseq, out var sm) && sm != null && sm.Subject == subj) + { + ss.Last = tseq; + ss.LastNeedsUpdate = false; + if (ss.Msgs == 1) { ss.First = tseq; ss.FirstNeedsUpdate = false; } + return; + } + if (tseq == 0) break; + } + } + } + + private void StartAgeChk() + { + if (_ageChk != null) return; + if (_cfg.MaxAge != TimeSpan.Zero) + _ageChk = new Timer(_ => ExpireMsgs(), null, _cfg.MaxAge, _cfg.MaxAge); + } + + private void ExpireMsgs() + { + // TODO: session 17 — full age/TTL expiry logic + _mu.EnterWriteLock(); + try + { + if (_msgs == null || _cfg.MaxAge == TimeSpan.Zero) return; + var minAge = DateTime.UtcNow - _cfg.MaxAge; + var minTs = new DateTimeOffset(minAge).ToUnixTimeMilliseconds() * 1_000_000L; + var toRemove = new List(); + foreach (var kv in _msgs) + { + if (kv.Value.Ts <= minTs) + toRemove.Add(kv.Key); + } + foreach (var seq in toRemove) + RemoveMsgLocked(seq, false); + } + finally + { + if (_mu.IsWriteLockHeld) + _mu.ExitWriteLock(); + } + } + + private static ulong MsgSize(string subj, byte[]? hdr, byte[]? msg) + => (ulong)(subj.Length + (hdr?.Length ?? 0) + (msg?.Length ?? 0) + 16); + + private static bool MatchLiteral(string subject, string filter) + { + var sParts = subject.Split('.'); + var fParts = filter.Split('.'); + return IsSubsetMatch(sParts, fParts); + } + + private static bool IsSubsetMatch(string[] subject, string[] filter) + { + var si = 0; + for (var fi = 0; fi < filter.Length; fi++) + { + if (filter[fi] == ">") + return si < subject.Length; + if (si >= subject.Length) + return false; + if (filter[fi] != "*" && filter[fi] != subject[si]) + return false; + si++; + } + return si == subject.Length; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs new file mode 100644 index 0000000..46434ec --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs @@ -0,0 +1,1000 @@ +// Copyright 2019-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/store.go, server/stream.go, server/consumer.go, server/filestore.go, server/disk_avail.go + +using System.Text.Json.Serialization; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +// --------------------------------------------------------------------------- +// StorageType +// --------------------------------------------------------------------------- + +/// Determines how messages are stored for retention. +public enum StorageType +{ + /// On disk, designated by the JetStream config StoreDir. + FileStorage = 22, + + /// In memory only. + MemoryStorage = 33, +} + +// --------------------------------------------------------------------------- +// Well-known store errors +// --------------------------------------------------------------------------- + +public static class StoreErrors +{ + /// Returned when the store has been closed. + public static readonly Exception ErrStoreClosed = new InvalidOperationException("store is closed"); + + /// Returned when a message was not found but was expected. + public static readonly Exception ErrStoreMsgNotFound = new KeyNotFoundException("no message found"); + + /// Returned when message seq is greater than the last sequence. + public static readonly Exception ErrStoreEOF = new EndOfStreamException("stream store EOF"); + + /// Returned when DiscardNew is set and the message limit is reached. + public static readonly Exception ErrMaxMsgs = new InvalidOperationException("maximum messages exceeded"); + + /// Returned when DiscardNew is set and the bytes limit is reached. + public static readonly Exception ErrMaxBytes = new InvalidOperationException("maximum bytes exceeded"); + + /// Returned when DiscardNew is set and the per-subject message limit is reached. + public static readonly Exception ErrMaxMsgsPerSubject = new InvalidOperationException("maximum messages per subject exceeded"); + + /// Returned when RemoveMsg or EraseMsg is called while a snapshot is in progress. + public static readonly Exception ErrStoreSnapshotInProgress = new InvalidOperationException("snapshot in progress"); + + /// Returned when a message is considered too large. + public static readonly Exception ErrMsgTooLarge = new InvalidOperationException("message too large"); + + /// Returned when accessing the wrong storage type. + public static readonly Exception ErrStoreWrongType = new InvalidOperationException("wrong storage type"); + + /// Returned when trying to update a consumer's acks with no ack policy. + public static readonly Exception ErrNoAckPolicy = new InvalidOperationException("ack policy is none"); + + /// Returned when storing a raw message with a mismatched expected sequence. + public static readonly Exception ErrSequenceMismatch = new InvalidOperationException("expected sequence does not match store"); + + /// Returned when a stream state snapshot is corrupt. + public static readonly Exception ErrCorruptStreamState = new InvalidDataException("stream state snapshot is corrupt"); + + /// Returned when a request yields too many matching results. + public static readonly Exception ErrTooManyResults = new InvalidOperationException("too many matching results for request"); + + /// Returned when a stream state encoding is malformed. + public static readonly Exception ErrBadStreamStateEncoding = new InvalidDataException("bad stream state encoding"); +} + +// --------------------------------------------------------------------------- +// StoreMsg — stored message format +// --------------------------------------------------------------------------- + +/// +/// The stored message format for messages retained by the Store layer. +/// +public sealed class StoreMsg +{ + public string Subject { get; set; } = string.Empty; + public byte[] Hdr { get; set; } = Array.Empty(); + public byte[] Msg { get; set; } = Array.Empty(); + public byte[] Buf { get; set; } = Array.Empty(); + public ulong Seq { get; set; } + public long Ts { get; set; } + + /// Copy fields from another StoreMsg into this one. + public void CopyFrom(StoreMsg src) + { + var newBuf = new byte[src.Buf.Length]; + src.Buf.CopyTo(newBuf, 0); + Buf = newBuf; + Hdr = newBuf[..src.Hdr.Length]; + Msg = newBuf[src.Hdr.Length..]; + Subject = src.Subject; + Seq = src.Seq; + Ts = src.Ts; + } + + /// Clear all fields, resetting to an empty message. + public void Clear() + { + Subject = string.Empty; + Hdr = Array.Empty(); + Msg = Array.Empty(); + Buf = Array.Empty(); + Seq = 0; + Ts = 0; + } +} + +// --------------------------------------------------------------------------- +// Delegate types +// --------------------------------------------------------------------------- + +/// +/// Callback into the upper layers to report on changes in storage resources. +/// For single-message cases, also supplies sequence number and subject. +/// +public delegate void StorageUpdateHandler(long msgs, long bytes, ulong seq, string subj); + +/// Callback into the upper layers to remove a message. +public delegate void StorageRemoveMsgHandler(ulong seq); + +/// +/// Callback into the upper layers to process a JetStream message. +/// Will propose the message if the stream is replicated. +/// +public delegate void ProcessJetStreamMsgHandler(object? msg); + +// --------------------------------------------------------------------------- +// IStreamStore interface +// --------------------------------------------------------------------------- + +/// Abstraction over JetStream stream storage backends. +public interface IStreamStore +{ + (ulong Seq, long Ts) StoreMsg(string subject, byte[]? hdr, byte[]? msg, long ttl); + void StoreRawMsg(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck); + (ulong Seq, Exception? Error) SkipMsg(ulong seq); + void SkipMsgs(ulong seq, ulong num); + void FlushAllPending(); + StoreMsg? LoadMsg(ulong seq, StoreMsg? sm); + (StoreMsg? Sm, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? smp); + (StoreMsg? Sm, ulong Skip) LoadNextMsgMulti(object? sl, ulong start, StoreMsg? smp); + StoreMsg? LoadLastMsg(string subject, StoreMsg? sm); + (StoreMsg? Sm, Exception? Error) LoadPrevMsg(ulong start, StoreMsg? smp); + (StoreMsg? Sm, ulong Skip, Exception? Error) LoadPrevMsgMulti(object? sl, ulong start, StoreMsg? smp); + (bool Removed, Exception? Error) RemoveMsg(ulong seq); + (bool Removed, Exception? Error) EraseMsg(ulong seq); + (ulong Purged, Exception? Error) Purge(); + (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep); + (ulong Purged, Exception? Error) Compact(ulong seq); + void Truncate(ulong seq); + ulong GetSeqFromTime(DateTime t); + SimpleState FilteredState(ulong seq, string subject); + Dictionary SubjectsState(string filterSubject); + Dictionary SubjectsTotals(string filterSubject); + (ulong[] Seqs, Exception? Error) AllLastSeqs(); + (ulong[] Seqs, Exception? Error) MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed); + (string Subject, Exception? Error) SubjectForSeq(ulong seq); + (ulong Total, ulong ValidThrough, Exception? Error) NumPending(ulong sseq, string filter, bool lastPerSubject); + (ulong Total, ulong ValidThrough, Exception? Error) NumPendingMulti(ulong sseq, object? sl, bool lastPerSubject); + StreamState State(); + void FastState(StreamState state); + (byte[] Enc, Exception? Error) EncodedStreamState(ulong failed); + void SyncDeleted(DeleteBlocks dbs); + StorageType Type(); + void RegisterStorageUpdates(StorageUpdateHandler cb); + void RegisterStorageRemoveMsg(StorageRemoveMsgHandler cb); + void RegisterProcessJetStreamMsg(ProcessJetStreamMsgHandler cb); + void UpdateConfig(StreamConfig cfg); + void Delete(bool inline); + void Stop(); + IConsumerStore ConsumerStore(string name, DateTime created, ConsumerConfig cfg); + void AddConsumer(IConsumerStore o); + void RemoveConsumer(IConsumerStore o); + (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool includeConsumers, bool checkMsgs); + (ulong Total, ulong Reported, Exception? Error) Utilization(); + void ResetState(); +} + +// --------------------------------------------------------------------------- +// RetentionPolicy +// --------------------------------------------------------------------------- + +/// Determines how messages in a stream are retained. +public enum RetentionPolicy +{ + /// Messages are retained until any given limit is reached. + LimitsPolicy = 0, + + /// Messages are removed when all known consumers have acknowledged them. + InterestPolicy, + + /// Messages are removed when the first worker acknowledges them. + WorkQueuePolicy, +} + +// --------------------------------------------------------------------------- +// DiscardPolicy +// --------------------------------------------------------------------------- + +/// Determines how the store proceeds when message or byte limits are hit. +public enum DiscardPolicy +{ + /// Remove older messages to return to the limits. + DiscardOld = 0, + + /// Fail the StoreMsg call when limits are exceeded. + DiscardNew, +} + +// --------------------------------------------------------------------------- +// StreamState +// --------------------------------------------------------------------------- + +/// Information about the given stream. +public sealed class StreamState +{ + [JsonPropertyName("messages")] + public ulong Msgs { get; set; } + + [JsonPropertyName("bytes")] + public ulong Bytes { get; set; } + + [JsonPropertyName("first_seq")] + public ulong FirstSeq { get; set; } + + [JsonPropertyName("first_ts")] + public DateTime FirstTime { get; set; } + + [JsonPropertyName("last_seq")] + public ulong LastSeq { get; set; } + + [JsonPropertyName("last_ts")] + public DateTime LastTime { get; set; } + + [JsonPropertyName("num_subjects")] + public int NumSubjects { get; set; } + + [JsonPropertyName("subjects")] + public Dictionary? Subjects { get; set; } + + [JsonPropertyName("num_deleted")] + public int NumDeleted { get; set; } + + [JsonPropertyName("deleted")] + public ulong[]? Deleted { get; set; } + + [JsonPropertyName("lost")] + public LostStreamData? Lost { get; set; } + + [JsonPropertyName("consumer_count")] + public int Consumers { get; set; } +} + +// --------------------------------------------------------------------------- +// SimpleState +// --------------------------------------------------------------------------- + +/// Filtered subject-specific state. +public sealed class SimpleState +{ + [JsonPropertyName("messages")] + public ulong Msgs { get; set; } + + [JsonPropertyName("first_seq")] + public ulong First { get; set; } + + [JsonPropertyName("last_seq")] + public ulong Last { get; set; } + + // Internal: the first/last need to be recalculated before use. + internal bool FirstNeedsUpdate { get; set; } + internal bool LastNeedsUpdate { get; set; } +} + +// --------------------------------------------------------------------------- +// LostStreamData +// --------------------------------------------------------------------------- + +/// Indicates messages that have been lost. +public sealed class LostStreamData +{ + [JsonPropertyName("msgs")] + public ulong[] Msgs { get; set; } = Array.Empty(); + + [JsonPropertyName("bytes")] + public ulong Bytes { get; set; } +} + +// --------------------------------------------------------------------------- +// SnapshotResult +// --------------------------------------------------------------------------- + +/// Contains information about a snapshot operation. +public sealed class SnapshotResult +{ + public Stream? Reader { get; set; } + public StreamState State { get; set; } = new(); +} + +// --------------------------------------------------------------------------- +// DeleteBlock / DeleteBlocks +// --------------------------------------------------------------------------- + +/// +/// Interface for a block of deleted sequences. +/// Implementations include AVL seqsets, run-length encoded ranges, and legacy slices. +/// +public interface IDeleteBlock +{ + (ulong First, ulong Last, ulong Num) GetState(); + void Range(Func f); +} + +/// A list of delete blocks. +public sealed class DeleteBlocks : List +{ + public ulong NumDeleted() + { + ulong total = 0; + foreach (var db in this) + { + var (_, _, num) = db.GetState(); + total += num; + } + return total; + } +} + +// --------------------------------------------------------------------------- +// DeleteRange — run-length encoded delete range +// --------------------------------------------------------------------------- + +/// Run-length encoded delete range. +public sealed class DeleteRange : IDeleteBlock +{ + public ulong First { get; set; } + public ulong Num { get; set; } + + public (ulong First, ulong Last, ulong Num) GetState() + { + var deletesAfterFirst = Num > 0 ? Num - 1 : 0; + return (First, First + deletesAfterFirst, Num); + } + + public void Range(Func f) + { + for (var seq = First; seq < First + Num; seq++) + { + if (!f(seq)) + return; + } + } +} + +// --------------------------------------------------------------------------- +// DeleteSlice — legacy []uint64 equivalent +// --------------------------------------------------------------------------- + +/// Legacy slice-based delete block. +public sealed class DeleteSlice : IDeleteBlock +{ + private readonly ulong[] _seqs; + + public DeleteSlice(ulong[] seqs) + { + _seqs = seqs; + } + + public (ulong First, ulong Last, ulong Num) GetState() + { + if (_seqs.Length == 0) + return (0, 0, 0); + return (_seqs[0], _seqs[^1], (ulong)_seqs.Length); + } + + public void Range(Func f) + { + foreach (var seq in _seqs) + { + if (!f(seq)) + return; + } + } +} + +// --------------------------------------------------------------------------- +// StreamReplicatedState +// --------------------------------------------------------------------------- + +/// +/// Represents what is encoded in a binary stream snapshot used for stream +/// replication in an NRG (NATS Raft Group). +/// +public sealed class StreamReplicatedState +{ + public ulong Msgs { get; set; } + public ulong Bytes { get; set; } + public ulong FirstSeq { get; set; } + public ulong LastSeq { get; set; } + public ulong Failed { get; set; } + public DeleteBlocks Deleted { get; set; } = new(); +} + +// --------------------------------------------------------------------------- +// IConsumerStore interface +// --------------------------------------------------------------------------- + +/// Stores state on consumers for streams. +public interface IConsumerStore +{ + void SetStarting(ulong sseq); + void UpdateStarting(ulong sseq); + void Reset(ulong sseq); + bool HasState(); + void UpdateDelivered(ulong dseq, ulong sseq, ulong dc, long ts); + void UpdateAcks(ulong dseq, ulong sseq); + void UpdateConfig(ConsumerConfig cfg); + void Update(ConsumerState state); + (ConsumerState? State, Exception? Error) State(); + (ConsumerState? State, Exception? Error) BorrowState(); + byte[] EncodedState(); + StorageType Type(); + void Stop(); + void Delete(); + void StreamDelete(); +} + +// --------------------------------------------------------------------------- +// SequencePair +// --------------------------------------------------------------------------- + +/// +/// Has both the consumer and the stream sequence. They point to the same message. +/// +public sealed class SequencePair +{ + [JsonPropertyName("consumer_seq")] + public ulong Consumer { get; set; } + + [JsonPropertyName("stream_seq")] + public ulong Stream { get; set; } +} + +// --------------------------------------------------------------------------- +// Pending +// --------------------------------------------------------------------------- + +/// +/// Represents a pending message for explicit ack or ack-all. +/// Sequence is the original consumer sequence. +/// +public sealed class Pending +{ + public ulong Sequence { get; set; } + public long Timestamp { get; set; } +} + +// --------------------------------------------------------------------------- +// ConsumerState +// --------------------------------------------------------------------------- + +/// Represents a stored state for a consumer. +public sealed class ConsumerState +{ + [JsonPropertyName("delivered")] + public SequencePair Delivered { get; set; } = new(); + + [JsonPropertyName("ack_floor")] + public SequencePair AckFloor { get; set; } = new(); + + /// + /// Pending messages and the timestamp for the delivered time. + /// Only present when AckPolicy is ExplicitAck. + /// Keys are stream sequence numbers. + /// + [JsonPropertyName("pending")] + public Dictionary? Pending { get; set; } + + /// + /// Messages that have been redelivered (count > 1). + /// Keys are stream sequence numbers; values are delivery counts minus one. + /// + [JsonPropertyName("redelivered")] + public Dictionary? Redelivered { get; set; } +} + +// --------------------------------------------------------------------------- +// AckPolicy +// --------------------------------------------------------------------------- + +/// Determines how the consumer should acknowledge delivered messages. +public enum AckPolicy +{ + /// No acks required for delivered messages. + AckNone = 0, + + /// Acking a sequence implicitly acks all sequences below it. + AckAll, + + /// Requires explicit ack or nack for all messages. + AckExplicit, +} + +// --------------------------------------------------------------------------- +// ReplayPolicy +// --------------------------------------------------------------------------- + +/// Determines how the consumer replays messages already queued in the stream. +public enum ReplayPolicy +{ + /// Replay messages as fast as possible. + ReplayInstant = 0, + + /// Maintain the same timing as when the messages were received. + ReplayOriginal, +} + +// --------------------------------------------------------------------------- +// DeliverPolicy +// --------------------------------------------------------------------------- + +/// Determines how the consumer selects the first message to deliver. +public enum DeliverPolicy +{ + /// Deliver all messages (default). + DeliverAll = 0, + + /// Start with the last sequence received. + DeliverLast, + + /// Only deliver new messages sent after the consumer is created. + DeliverNew, + + /// Look for a defined starting sequence. + DeliverByStartSequence, + + /// Select the first message with a timestamp >= StartTime. + DeliverByStartTime, + + /// Start with the last message for all subjects received. + DeliverLastPerSubject, +} + +// --------------------------------------------------------------------------- +// PriorityPolicy +// --------------------------------------------------------------------------- + +/// Policy for selecting messages based on priority. +public enum PriorityPolicy +{ + PriorityNone = 0, + PriorityOverflow, + PriorityPinnedClient, + PriorityPrioritized, +} + +// StoreCipher is already defined in ServerOptionTypes.cs — no duplicate needed here. + +// --------------------------------------------------------------------------- +// StoreCompression +// --------------------------------------------------------------------------- + +/// Compression algorithm used for stream data. +public enum StoreCompression : byte +{ + NoCompression = 0, + S2Compression, +} + +// --------------------------------------------------------------------------- +// PersistModeType +// --------------------------------------------------------------------------- + +/// Determines what persistence mode the stream uses. +public enum PersistModeType +{ + DefaultPersistMode = 0, + AsyncPersistMode, +} + +// --------------------------------------------------------------------------- +// Placement +// --------------------------------------------------------------------------- + +/// +/// Guides placement of streams and meta controllers in clustered JetStream. +/// +public sealed class Placement +{ + [JsonPropertyName("cluster")] + public string? Cluster { get; set; } + + [JsonPropertyName("tags")] + public string[]? Tags { get; set; } + + [JsonPropertyName("preferred")] + public string? Preferred { get; set; } +} + +// --------------------------------------------------------------------------- +// SubjectTransformConfig +// --------------------------------------------------------------------------- + +/// +/// Applies a subject transform to incoming messages before storing. +/// +public sealed class SubjectTransformConfig +{ + [JsonPropertyName("src")] + public string Source { get; set; } = string.Empty; + + [JsonPropertyName("dest")] + public string Destination { get; set; } = string.Empty; +} + +// --------------------------------------------------------------------------- +// RePublish +// --------------------------------------------------------------------------- + +/// Configuration for republishing messages once committed to a stream. +public sealed class RePublish +{ + [JsonPropertyName("src")] + public string? Source { get; set; } + + [JsonPropertyName("dest")] + public string Destination { get; set; } = string.Empty; + + [JsonPropertyName("headers_only")] + public bool HeadersOnly { get; set; } +} + +// --------------------------------------------------------------------------- +// ExternalStream +// --------------------------------------------------------------------------- + +/// Qualifies access to a stream source in another account or domain. +public sealed class ExternalStream +{ + [JsonPropertyName("api")] + public string ApiPrefix { get; set; } = string.Empty; + + [JsonPropertyName("deliver")] + public string DeliverPrefix { get; set; } = string.Empty; +} + +// --------------------------------------------------------------------------- +// StreamSource +// --------------------------------------------------------------------------- + +/// Dictates how streams can source from other streams. +public sealed class StreamSource +{ + [JsonPropertyName("name")] + public string Name { get; set; } = string.Empty; + + [JsonPropertyName("opt_start_seq")] + public ulong OptStartSeq { get; set; } + + [JsonPropertyName("opt_start_time")] + public DateTime? OptStartTime { get; set; } + + [JsonPropertyName("filter_subject")] + public string? FilterSubject { get; set; } + + [JsonPropertyName("subject_transforms")] + public SubjectTransformConfig[]? SubjectTransforms { get; set; } + + [JsonPropertyName("external")] + public ExternalStream? External { get; set; } +} + +// --------------------------------------------------------------------------- +// StreamConsumerLimits +// --------------------------------------------------------------------------- + +/// Limits applied to consumers created against a stream. +public sealed class StreamConsumerLimits +{ + [JsonPropertyName("inactive_threshold")] + public TimeSpan InactiveThreshold { get; set; } + + [JsonPropertyName("max_ack_pending")] + public int MaxAckPending { get; set; } +} + +// --------------------------------------------------------------------------- +// StreamConfig +// --------------------------------------------------------------------------- + +/// +/// Determines the name, subjects, and retention policy for a given stream. +/// If Subjects is empty the Name will be used. +/// +public sealed class StreamConfig +{ + [JsonPropertyName("name")] + public string Name { get; set; } = string.Empty; + + [JsonPropertyName("description")] + public string? Description { get; set; } + + [JsonPropertyName("subjects")] + public string[]? Subjects { get; set; } + + [JsonPropertyName("retention")] + public RetentionPolicy Retention { get; set; } + + [JsonPropertyName("max_consumers")] + public int MaxConsumers { get; set; } + + [JsonPropertyName("max_msgs")] + public long MaxMsgs { get; set; } + + [JsonPropertyName("max_bytes")] + public long MaxBytes { get; set; } + + [JsonPropertyName("max_age")] + public TimeSpan MaxAge { get; set; } + + [JsonPropertyName("max_msgs_per_subject")] + public long MaxMsgsPer { get; set; } + + [JsonPropertyName("max_msg_size")] + public int MaxMsgSize { get; set; } + + [JsonPropertyName("discard")] + public DiscardPolicy Discard { get; set; } + + [JsonPropertyName("storage")] + public StorageType Storage { get; set; } + + [JsonPropertyName("num_replicas")] + public int Replicas { get; set; } + + [JsonPropertyName("no_ack")] + public bool NoAck { get; set; } + + [JsonPropertyName("duplicate_window")] + public TimeSpan Duplicates { get; set; } + + [JsonPropertyName("placement")] + public Placement? Placement { get; set; } + + [JsonPropertyName("mirror")] + public StreamSource? Mirror { get; set; } + + [JsonPropertyName("sources")] + public StreamSource[]? Sources { get; set; } + + [JsonPropertyName("compression")] + public StoreCompression Compression { get; set; } + + [JsonPropertyName("first_seq")] + public ulong FirstSeq { get; set; } + + [JsonPropertyName("subject_transform")] + public SubjectTransformConfig? SubjectTransform { get; set; } + + [JsonPropertyName("republish")] + public RePublish? RePublish { get; set; } + + [JsonPropertyName("allow_direct")] + public bool AllowDirect { get; set; } + + [JsonPropertyName("mirror_direct")] + public bool MirrorDirect { get; set; } + + [JsonPropertyName("discard_new_per_subject")] + public bool DiscardNewPer { get; set; } + + [JsonPropertyName("sealed")] + public bool Sealed { get; set; } + + [JsonPropertyName("deny_delete")] + public bool DenyDelete { get; set; } + + [JsonPropertyName("deny_purge")] + public bool DenyPurge { get; set; } + + [JsonPropertyName("allow_rollup_hdrs")] + public bool AllowRollup { get; set; } + + [JsonPropertyName("consumer_limits")] + public StreamConsumerLimits ConsumerLimits { get; set; } = new(); + + [JsonPropertyName("allow_msg_ttl")] + public bool AllowMsgTTL { get; set; } + + [JsonPropertyName("subject_delete_marker_ttl")] + public TimeSpan SubjectDeleteMarkerTTL { get; set; } + + [JsonPropertyName("allow_msg_counter")] + public bool AllowMsgCounter { get; set; } + + [JsonPropertyName("allow_atomic")] + public bool AllowAtomicPublish { get; set; } + + [JsonPropertyName("allow_msg_schedules")] + public bool AllowMsgSchedules { get; set; } + + [JsonPropertyName("persist_mode")] + public PersistModeType PersistMode { get; set; } + + [JsonPropertyName("metadata")] + public Dictionary? Metadata { get; set; } + + /// Performs a deep copy of this StreamConfig. + public StreamConfig Clone() + { + var clone = (StreamConfig)MemberwiseClone(); + if (Placement != null) + clone.Placement = new Placement { Cluster = Placement.Cluster, Tags = Placement.Tags?.ToArray(), Preferred = Placement.Preferred }; + if (Mirror != null) + clone.Mirror = new StreamSource { Name = Mirror.Name, OptStartSeq = Mirror.OptStartSeq, OptStartTime = Mirror.OptStartTime, FilterSubject = Mirror.FilterSubject, SubjectTransforms = Mirror.SubjectTransforms?.ToArray(), External = Mirror.External }; + if (Sources != null) + clone.Sources = Sources.ToArray(); + if (SubjectTransform != null) + clone.SubjectTransform = new SubjectTransformConfig { Source = SubjectTransform.Source, Destination = SubjectTransform.Destination }; + if (RePublish != null) + clone.RePublish = new RePublish { Source = RePublish.Source, Destination = RePublish.Destination, HeadersOnly = RePublish.HeadersOnly }; + if (Metadata != null) + clone.Metadata = new Dictionary(Metadata); + if (Subjects != null) + clone.Subjects = Subjects.ToArray(); + return clone; + } +} + +// --------------------------------------------------------------------------- +// ConsumerConfig +// --------------------------------------------------------------------------- + +/// Configuration for a JetStream consumer. +public sealed class ConsumerConfig +{ + [JsonPropertyName("durable_name")] + public string? Durable { get; set; } + + [JsonPropertyName("name")] + public string? Name { get; set; } + + [JsonPropertyName("description")] + public string? Description { get; set; } + + [JsonPropertyName("deliver_policy")] + public DeliverPolicy DeliverPolicy { get; set; } + + [JsonPropertyName("opt_start_seq")] + public ulong OptStartSeq { get; set; } + + [JsonPropertyName("opt_start_time")] + public DateTime? OptStartTime { get; set; } + + [JsonPropertyName("ack_policy")] + public AckPolicy AckPolicy { get; set; } + + [JsonPropertyName("ack_wait")] + public TimeSpan AckWait { get; set; } + + [JsonPropertyName("max_deliver")] + public int MaxDeliver { get; set; } + + [JsonPropertyName("backoff")] + public TimeSpan[]? BackOff { get; set; } + + [JsonPropertyName("filter_subject")] + public string? FilterSubject { get; set; } + + [JsonPropertyName("filter_subjects")] + public string[]? FilterSubjects { get; set; } + + [JsonPropertyName("replay_policy")] + public ReplayPolicy ReplayPolicy { get; set; } + + [JsonPropertyName("rate_limit_bps")] + public ulong RateLimit { get; set; } + + [JsonPropertyName("sample_freq")] + public string? SampleFrequency { get; set; } + + [JsonPropertyName("max_waiting")] + public int MaxWaiting { get; set; } + + [JsonPropertyName("max_ack_pending")] + public int MaxAckPending { get; set; } + + [JsonPropertyName("flow_control")] + public bool FlowControl { get; set; } + + [JsonPropertyName("headers_only")] + public bool HeadersOnly { get; set; } + + // Pull-based options + [JsonPropertyName("max_batch")] + public int MaxRequestBatch { get; set; } + + [JsonPropertyName("max_expires")] + public TimeSpan MaxRequestExpires { get; set; } + + [JsonPropertyName("max_bytes")] + public int MaxRequestMaxBytes { get; set; } + + // Push-based consumers + [JsonPropertyName("deliver_subject")] + public string? DeliverSubject { get; set; } + + [JsonPropertyName("deliver_group")] + public string? DeliverGroup { get; set; } + + [JsonPropertyName("idle_heartbeat")] + public TimeSpan Heartbeat { get; set; } + + [JsonPropertyName("inactive_threshold")] + public TimeSpan InactiveThreshold { get; set; } + + [JsonPropertyName("num_replicas")] + public int Replicas { get; set; } + + [JsonPropertyName("mem_storage")] + public bool MemoryStorage { get; set; } + + [JsonPropertyName("direct")] + public bool Direct { get; set; } + + [JsonPropertyName("metadata")] + public Dictionary? Metadata { get; set; } + + [JsonPropertyName("pause_until")] + public DateTime? PauseUntil { get; set; } + + [JsonPropertyName("priority_groups")] + public string[]? PriorityGroups { get; set; } + + [JsonPropertyName("priority_policy")] + public PriorityPolicy PriorityPolicy { get; set; } + + [JsonPropertyName("priority_timeout")] + public TimeSpan PinnedTTL { get; set; } +} + +// --------------------------------------------------------------------------- +// DiskAvailability — stub for disk_avail.go +// --------------------------------------------------------------------------- + +/// +/// Utility for checking disk space availability. +/// Adapted from server/disk_avail.go (uses statfs on Unix). +/// +public static class DiskAvailability +{ + // Default large store limit: 1 TB (used as fallback when statfs fails). + private const long JetStreamMaxStoreDefault = 1L * 1024 * 1024 * 1024 * 1024; + + /// + /// Returns approximately 75% of available disk space at . + /// Returns (1 TB) if the check fails. + /// + public static long Available(string path) + { + // TODO: session 17 — implement via DriveInfo or P/Invoke statvfs on non-Windows. + try + { + var drive = new DriveInfo(Path.GetPathRoot(Path.GetFullPath(path)) ?? path); + if (drive.IsReady) + { + // Estimate 75% of available free space, matching Go behaviour. + return drive.AvailableFreeSpace / 4 * 3; + } + } + catch + { + // Fall through to default. + } + + return JetStreamMaxStoreDefault; + } + + /// + /// Returns true if at least bytes are available at . + /// + public static bool Check(string path, long needed) => Available(path) >= needed; +} diff --git a/porting.db b/porting.db index 0bb05ba26421b36c6fc53470a02f304b40dbef02..28f40dd5b39eb3d9b6c57495ba424b43b606f67c 100644 GIT binary patch delta 10282 zcmc&(d0q8P-IdPQMtRMLPVaXPA#me5~=n-r)(hsYd+ds;$G;U^{M4v+cL{ z+ppNB+W&0#+t=G~v{&1Q+XdUFwzqAkY~9C4?CY`V4T<-hIx=DGo;W^FzoE6Gp}nJN zQ^$(srYpEmrInZT%UajBty|Ht!n7Pe!JqCy$e6YFR3DuSZBOKtB;o6+gI@Eg+xw61 z>_5r{`;Y1!b%I^ceYxAf2QBu)_W!n16s*I(a`%(`cQ(T|!zN=o8BGewz2pQ`o#&Iy z_G|VJ?XTNkush)3q>;a_hwKY{0n8zMZJ}0XgI-Gs z-)Sk7;W@%@;$h7dKA<-U!LwH0$8pWzY2=pf8LGET=Bu=Lp3w$Bj(JzXP-7XY9P+R5 zwFkL4{%j1tNB61Trh_BYIy$)FJa6P+YnwxaF+J8In`x?PtZBF@+Z1mSVy;76kF~JM zb^(obn%^ai6H0_!AyF{K{wj7yY-j9h%Vf(KOOYkh5@&nb)@!@p_G??W?Pl8=+hW`# z{A1;z&!JBYroYeM${FTc;&e7B$<-%A?+5&{vc+04O^X<6Aww--s2hXRKjio6yOlz8qZKnP%<$=$<)Xo8&oP~Abu}pUM_)s zpYUyhEmm!U*ThW(qd|K zcAd{+pKc3S-?Mu0SBv=r@|0;8q;C@>yZ^@SvGF?2xw+cv99dK-?~2Ny-4bKexM5 z*~MBJ!oK)(!3mG-5|SbQ34Ip)c!zL2igk34Q?cQ7RqW|Igfw{dFbRpJ#a0Z$Rz+bM z1KZ2F-Q`1+?Y-*uKDSc`XMQf^K+9c1d;~ASAMX+}V+WW}*TMJh5{Bx9C-~hu=%q2U zgA1up$U{{lHw4;@MmtpPcZ>@fXAilJ+u5*Nz{h}(G1#y>=KjZH%zJew_;wu>oway_ z6?LX&v)<-~rb)>i!K6i|Y+kYE5ZFk}Vz7Rx>F{Ss#vW?zsVN(dEHupv{$eqCSatM>YRKVPM(%(v zQTH{(^b&{Ba4UC0cTTwm&%@Wfe}m$k*j)`vpg=#E@#q2qta%FGlwRV2=C^ENz9YS)Aj&75ApazZ zFX&?R^DX9|n|_RWP*}>h8K2e94<`4KS2)E_+7kRp)P*iuDlhU#IeJd=d2`_08Il6u zAJ13?FKrRy;rtrW6gy+{`nnYxmbYwJ#qYVB&k70p(qD-g-aSb&BOWiWp#>;md8%? zL?fL0Gs%l^Y>%Nskz@S!B#%^#9KrM#NCF4?r}SZ#rWZ+Z1TAl&4rP5gE4==p7s;KP zqpto7DTsjmv9wrOT9!gfL*OL>nxoP)BnH-;Cc`2z0ndv}!;z^%F7QY$$Z>Xg z9IZfW72M#FQWSB1d6vxAoZ+wMhyk)*(Rxr39g7TZyc-z`!`xR$IrwMT!}2%^Gu=^d zTTV+5>)jkSLCmY6 zv=ZS{5bjgNnDZLS9d%@c+u+h`#G%!t+(DJpP%JwREl0fD zpB6rA{_Er!IKI^qK5N<=#KFikl#W4}3Q#6hm-TNDpJw^KH;9W7=N4Lq%NNMw!+8Ep z(xW*t;tlZfn0|CQKQ)W(VIPB@BE8^&)(lH|)_@-XZJ3|CSK8 zGVLd&lGbvSbfh8-uC55<^Sa@)cTkjX`-xj)!C&^1I9U5G$z-;gNlTHpbXF)B!oGLO zGR@(A?~!2)yq1nYxDO?BCv6Y`E#U@fls6!gnJRLMx=@72H%B(Qq4h> zt7g?L$hUoC<1Jsto;80*{$)C7JR&^FFV=T)t-1vW&~cwDVbK%kkEb`F;VNZM*a$1n zlf5A|^!4SPDij;t_X{A{u9hQozEFo!sCr%0$AQh_ik=Mtqt(Tz}Qm; zR#)@rG(63CJdF}9;qLbbbfxkXd_Y{8n$!3f(5Z@i{z}F!3E=;LoQ)_g3DBGdxbs6c z6mDNjry%pR9muRiwczOw$tF%?nN1(DVQu>+IvE*m*iq||$`yvEKO#4CaIGE%Ref^J z1>$A=zKqr?8u4Rri#%MOGyH4z|wtvut9Z7FQ9uM-UVYmNFQU!eN zK-fhdHtd$r`3R5L2(F`zq8ZTk6)9jO%B1rU9!X9!zQW|Bx;5UnBzXKQlFRr~CauR) zd*pVHl%<^di?7HrO|A9z*Tl&zKZVZ4<((~IL!^F#(I%R{kAA~0FK?i8)aCi`@*jVL zg=q7Jfs_Q_l43?2kmd$s?62;&b0I1>NcI zh)0V@BiA9mT*0fiKIwaMmuCIfzb8cuzM0NMe6h@hONsO!u(XNZkW)VlDkY3`2I4(( zb@;sekK|qr{*@nzQ`f47G`&HZoKjL$$)L5BOnS zrVY%e>ltbtL#<_~H4N3lP^%eg6+^9LC~gHT{Dhwk;w63(#DB<7(wpuJPPoWBxZtgO ztld*$x%YK(y0>-u%lZjH%X#bTs?HmqO>3|@16$B*imfiITGxE`4{JG$Ow5y@@}Ojg zMekeP(K>2c;1;}gIsV&RRyB6-d*Av`xK@PyZq-4_0>?hE4vTi=V?VUMut<1KSglhG zu?(I$&*!OT;yZ=a!8KL3K3*{k=ug}4>uS{=$HFMn!YI?iDAU3y)50jz!YI?iDAU3y z)3TD8Oj}kk!YpSfW-w@B27{I+2FXkyEujfycmQc(29TB|R)%r$&h$w(eC+td;WGrE zczhD@NyH}!pJaSe@JYod4WD#;GJ@%o#7k-ViG^Tl6eW1`iKs+FhEyimZxFt5jIq7}hEM%JLPw+?;N>KDxK-+RL3HnxuFR;vZ*n2F4z~SQ=nc5Y$r_)Sa zVcK@Al$BcC=Piq@5CfHe%y=DsTqPEW(h?P~m8H^i5whY}sQ&6T^*fUfrmag6)wu4{H3^y|uH;6vw2z4|W;S~s1 z^*g#%Y=~5?iSSaZ_`l3_E)jc`?N?T?{5JNI`3Eyc5=_Hl78~~(`VAgo8~+q<=Pm_3 zjoe8t!JkFj@C-HbtU-CaaijQKc(sXWGwY`tvEq(Bml16qPp;||q)1d1liB;7FRkw%+J-G*Zx;U-A@MBG| zy@jqsA*P`aO8oP=3p6MGZGo{mvR*P&Y!OX*>{dM!Q8u;utLwxw;>Xd8gR-r_^=h!N z9jG?Z&ywMuE#hiTwRw39I_RidMUCp+bU8Bq#BEeH-f^qw(x~wwGDhkFxsxtaFE{2E zwXuEpXX36<2d>7NRkw+niDU4$SVU)sLo>9>B>_lo*jwO&XDCYk|cJfm>N)yi&a|) z*_b4a3H3jSjD=H4QXbP^EekZLE9Zppdy=Icp@k15|72lh{193A<78Y|?ar%H1|gk+ z(BvUpIPOZ3$~9`Znj#hFXa^Nb0?KH?yUJx%zQ^!6y~>~hUP+Y_VOFX%N3pG{K^y!n zRVvXMD;S$5UGKJXb@;8*o)X<2dxf9_+de6a-@!enOjuUh%mH6XeF=DZAe2h6f~9G#(9+9kIJ0Rsp@6qBw zCvH+8J6xer_a6>O+u;5sIGOrWmcalEeSov3MjoP+Q+G?y!H^$W0tb8FH5!N_+@Kh$U{;d?0r~D z3w?W3{3K7{(8DNO;}I#7St386Bv~IWq3r0(%2OOdS>ozbd>XVJl1$1v1m-^?*A}BN7|iN&-r{MZM*z$2;=~>TMu`7@_j8RLp3jFrXw>6gp2mM@>?z5N%fS|k4m#c%2mVa-ABX9)n4`PUa7k`v%5E|yEnVLH>bNd zx4Sp*L~p*($GsRdv^j=y;N9qugNHXd^2~bc;ovK`I^N-Uyc00Kh7<*T5mY LKWN$K_#*2+*-{S7 delta 7719 zcmZu$33wD$w(jbx?)2?+Rdx1+RD~=gA!$0H2~87{n2oR_n}{ev05NQV5CX{Jm<~?V z8F4NmIX%pq5gip3gaAW&K1Y1Gqap%2ih_V~!Ce+N9Mn1YR&{9d9^Y3V;qssT+_T)Y zwt~=#A5E7NEEekw{BE*XEW*bA0U=>`{UZ++B`RY?JUOSdD6^Ek>Q?0;^`!cZGDh8_ zwyBNk4Qj1gq^7Bo@~!f|^3vWxTQ@3J>6c9}?W`$($Svkrmn?0WwY+6^bIZJ29N!?L zj^2j@@vyzuVNCO;+FjQTR$s9Y{9pd3QWkaV+@+08i|4h>bNmI*#`Gc*%kVI1V|C*C z)4T3Ey-Nt6-et|6DA}4V$2Z%=@aBW!8W$&YAs`091EwpM@#r8k8 z`|OX|L-s$|*V&iY>+O^6!<|lYnS4XekYnnF@C89^6(xHgi?*>kwIk2!f-N?&RU(BJ z1qNnYlf#4U;ty7dY`FrMOvF(V=`sO!>9RZ|0zN{-TScoXf%_qAh;a*_ltqsKano%- zZxh_&kwp7fmJ?RR;uzy7cN94?9q|q+2L40k>EWs~qFqQ8JH)%CYovivwv;H@U87vX zTmxJgF8Aj9#3E>|RwOvEQBH*WJ`@%B8QFZ2eaJjizA` zRbdlR$0;HClcoe+~7c)-d-7xFuBq|9lA%VFi;czDDCS(Wwu|#Q6?`$%$KL!h%N^+3375}9ue53I_4#b!u3Fgp23tAxRdgJs02h5s-v zd6kH4$V9v4v?b9p3}QM-u5KA7Jc9o={{*vWBHEcF`{W$76aKJ|_+ZbLi4gCxRN z`$z#a?RPpv>&+I(sT$ zXIU`}{wz3sn54ijd(#_W;hUtNRon<0YaE4eCD z@;-dfbg+B9ljb3^dVP{lFGgfoeU&5#@NlKeRvQVf~i45vbK8OvZ_ljWAz$h}d< zO?y2OGjup)HIc8}Z3-=Byo0*I8$z!WfXT0Bm|D~yLe|_B3N1olMX7IbqcPbWfpd{m2q$X|)JHseYwF-spNg_H=B5v%qmN=3>kUd!kThX|<(E!^JyU zAg)(~)wBWwQ83=8r!X~2kCJI{>_m1ZwEj$z;rMDT%gBggT8=m`W^6jc{pTntgA4aZ z620LVYRWUGfeu8Re}(DX?Z?O}xW6V6ua1z-$GY#PxXiz)!E#!K*fQkFvux8_WHihf zZsbiG^j&4kQ-hOeC1OS+CcxxQc^mbMWa2ekq`Hc-I#(Rb_31jp6#Sdq1IIcJtCK-hV zwthh7M&;x14x3LILkA%wfbw&hiYrf%=`gvUWXSJ0ZIn2ojHEt<`4Pr1`0&#h;5`pu zDdY)w_%z995*-~!-1hA2Ey^WDbbTLt#Ce&Vb!@Zm#Evn;dWW#oay=`P!bO(Vc)OD5 zc=UH=&}aH_!5Q*kl<{M`No>c5#0H)hZTYH;jzh3NSYfJg=1VNyx1S*~aQln4S#aq? zl4aOd)ouDrnNG)|($%Omt2J<6H=2JXoY^0l^&ox*9mDi!tGvp&3DEWtIda7ru>NCG z#I1C%UqVMC#n?NMg4Koap^r(k5LH5!ero98aIarMM?F3YvCg(Fi&U96zK&eHW?F+lZ!mJ7622k7H!dC*{l+&KGR{$?)tn<1d)}MK zw}ir=m+_+T2L0N%W^d|fwMkOErOg74b9ia9-09QOf+choOHI5P%xeLEj-{l;(KxRW z{&YAa6+YWd65!Qy*b1A!mb0MhHOv}vKGH!;=}e?8MOrR*)Oj*8s&5@QZ(5RCNN+}5 zG2&S53i$PijLv^#B*C9oX|ZtT1>$fnSYEq$>2lmT&cN)w>1M-TegR7xyN}5kbOy4? zM>a{EO%i7V6IW;%jGow9mNq$Fg%4l0XBn2KT_oAYRUSugLQ>DoK0OUdVd_OPyQhZ5 z!PgffZC;=^qAN>SaM&;bdEa5N;}@FW)Gxj>by14wbjA>8BHjZFzbC_b7{T29TL%|X{4RMh$0Q^8|dER#X7`cvF^m;_})az@1#CpvW zk&iW9KSs)M7M+SF_=3xQdS7P3cR!LnQ6s{;KN-b_DC6jL$S{9_;Z8`sg!enrnPOqb zCDV86GCGAb6itSoU&28tey-6zYmF4_OD7|yHdt!b>Qg_HZE*j6MomwKfBpxr6VFvQ zorJ);V3JQSVcH3oG5fEyox{HoJG^-r6NjS(IuX%j49#y}(l2BKN26O@@b)i}t1^#H zKwNQ9FuP-)U$G-cGByF8|25Jb3B4A9zF@6si})Ma5(Px@MkW4-GbU5W4?8lQ$77Xg zbOkypu)ya{vZ9x(R%ofv{}O9KYc;q1y46kdocF?WZ>RSMcahTp8wNPDq26OBabsx< z5{{RBUX~;QuLsV1oJr8>bxQEb0B5CHAKr~|FiIEv@OJv3xJtSliRzGuUr29G4^i3G zc%LDpfi@$4ym6!BmXpCGF;1hA2jzqSl0WXsrSLs zaBN+lT7m6QC9Dq@8;jNI0j_6+xZdEFyG9hb!Xt|0XJ<>h>`N@rDu@}duqn4D{0@~n z_-fW9^3{7yhkV!QSh3sUw!Ck#QlUHSIwQX$sKLU}8jMu2?2&mFp42Uu!Jc#_tK(zY z1zp`T4&^fyqFx(XjftJrcDqOBh3l7Y`4f2fPZ<6)DY^`d6PQwW<&9AAQ}J`-7l&UwehK&`;+KS9GJYxerQ(-{ zUmyI^!+9gLGilap>@)S6E>v%$RMyI)KZO z1HK+-^QggbG{?9PSTJxam@-?tE@}$H<;V`FW@~1_D5Tk3N~775`pwb8QKNYT9Rq1| zv1#xclR~qQq>RnCyv9tPtNlGnK;A9d00S7HnFyR3Eb-~JtaLTrqIE?Lk2B_L4v^<* z39xjY$vb`~?TaK6+7f(v1-obd3TV1TOM(OQw1ZcZs_@{RbOJl}rspNB*J%clja`qU z44aPv{?Z<@7^wVb`aAGj18Q|;a7EdJ4JGYVN79NQj{m$dZJ)12PwR~fG-HQJt)Lpq zD>SB8Pd+oph8y$(Ef*e^W6cr5zfd!JFz&(W$ln`O%^`o~LTz1?w{jLC9Uq>nX&=VF z%&f?B7Gc<$4{IjFvx|_SS>n@>p+A^!&Q1LnYt)$X`S9}jV$GbJ5@;$1VvEUPiQsM2 zw!oCfBMS=xW5M5GD^i2YXbOs+gd#|oG{d)Rw?)6RoW6e!?}noa;%BD*S_EOi!C7CBmP@d ztuqCv?r-%2T_L`o(3VY`P~E!Vq+WGAv03{oJp5dS*(^>qqkXJdC|+RWPZI5UFfqq&!(D1oF9mk5)Y9R^Zj#D7 zA~delTvlfhoX@rw7%dS`Ol`{PLL|nW&dbZx&Xw9Ec>X|Kbi*02O6zZQsj+kcf}{H< zmvql6tpcnwak#kB+Aglb#OD54NarI_oBmRdED3MA}s>w~RIVThSqDtkOUSI5Nb!ws*xAJSELI$rO`A5xVOp}B}FDfP&F zi2NyDp9(cyxI*z}w<}772^G%Y{fMJ9}$<$w~dE0oBE_G zr6HEBwHO4RJCAPFuk8uKP(xs4=r)ApPw>cm@q6nb{W?}EO#e@MSjTpZ0+ZK&b?#2mY>`eEWqOm?5$!pT}Q)M6WVt*u{^qkmCFTne~Zd z*l#Q{+`(C)mB_synCmf~u=YKow?cOezP?}@NO1k5dcL7<-w+$%^SvIvI08JX*F@od zepJshaD^c@ulLMa!yZFd_sm)kKBgBNuzn#np?h#7<9*|^$MgWKAC_TSKYE+)HE<;% zHkntVHQbSOn?A-^#D3hS4>2I6AvTAjuk*PhM?Q|ezR--3Lx>Fj`#3t1=iQJ{6XL2d zkvUF!LLUb;L(tO3Cr~(t3=FaUQGpOH?b#>vQHJjj;)3EQb$oTj6i)SqSpUE~(U*_@ zeR)W#=PrV{Con4?d{R$iPqg^Ei)Tavr=CO&>PN>qQ$nnZ1dxhx5A@~F1dQt3elZQ2 z9@p{L6Hn>{rft_1xNzQ{4R>zW2O64{gji!iKjd)b4{t|5aOE*fF;KoE;#Y5oH5Sw? zhg5~mF@xpZ9F61}!1}FbDkE6X) zZ|}@#@9f*&nc3c%)!v!i-kHv_Fzal>??~`{x;j-Me zaM!ZjG^Z7R1#)z4?g_!`5EHCRX3Z9>B0p3{ez+#`Lq+6=^6-1N=N4t#*bCwNVb|8& H@B021jrH(u diff --git a/reports/current.md b/reports/current.md index ca16166..e551db1 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-26 20:50:51 UTC +Generated: 2026-02-26 21:02:04 UTC ## Modules (12 total) @@ -13,9 +13,9 @@ Generated: 2026-02-26 20:50:51 UTC | Status | Count | |--------|-------| -| complete | 1601 | -| n_a | 82 | -| not_started | 1897 | +| complete | 1736 | +| n_a | 77 | +| not_started | 1767 | | stub | 93 | ## Unit Tests (3257 total) @@ -36,4 +36,4 @@ Generated: 2026-02-26 20:50:51 UTC ## Overall Progress -**2194/6942 items complete (31.6%)** +**2324/6942 items complete (33.5%)** diff --git a/reports/report_77403e3.md b/reports/report_77403e3.md new file mode 100644 index 0000000..e551db1 --- /dev/null +++ b/reports/report_77403e3.md @@ -0,0 +1,39 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-26 21:02:04 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| complete | 11 | +| not_started | 1 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| complete | 1736 | +| n_a | 77 | +| not_started | 1767 | +| stub | 93 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| complete | 319 | +| n_a | 181 | +| not_started | 2533 | +| stub | 224 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**2324/6942 items complete (33.5%)**