Files
natsnet/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs
2026-02-28 07:17:01 -05:00

2284 lines
69 KiB
C#

// Copyright 2019-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/memstore.go
using System.Text;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
/// <summary>
/// In-memory implementation of <see cref="IStreamStore"/>.
/// Stores all messages in a Dictionary keyed by sequence number.
/// Not production-complete: complex methods are stubbed for later sessions.
/// </summary>
public sealed class JetStreamMemStore : IStreamStore
{
// -----------------------------------------------------------------------
// Fields
// -----------------------------------------------------------------------
// Lock protecting the mutable state below. Created with
// LockRecursionPolicy.SupportsRecursion, so a thread may re-enter a lock it already holds.
private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion);
// Private copy of the stream configuration (cloned in the constructor and in UpdateConfig).
private StreamConfig _cfg;
// Running message/byte totals plus first/last sequence and timestamps.
private StreamState _state = new();
// Primary message store: seq -> StoreMsg.
// Stop() sets this to null; several helpers throw ErrStoreClosed when it is null.
private Dictionary<ulong, StoreMsg>? _msgs;
// Per-subject state: subject (UTF-8 bytes) -> SimpleState
private readonly SubjectTree<SimpleState> _fss;
// Deleted sequence set
private SequenceSet _dmap = new();
// Max messages per subject (cfg.MaxMsgsPer)
private long _maxp;
// Registered callbacks (set through the Register* methods).
private StorageUpdateHandler? _scb;
private StorageRemoveMsgHandler? _rmcb;
private ProcessJetStreamMsgHandler? _pmsgcb;
// Age-check timer for MaxAge expiry
private Timer? _ageChk;
// Reset to 0 whenever the age-check timer is cancelled.
// NOTE(review): presumably the next scheduled fire time — confirm against StartAgeChk/ResetAgeChk.
private long _ageChkTime;
// Consumer count
private int _consumers;
// TTL hash wheel (only created when cfg.AllowMsgTTL)
private HashWheel? _ttls;
// Message scheduling (only created when cfg.AllowMsgSchedules)
private MsgScheduling? _scheduling;
// Subject deletion metadata for cluster consensus
private StreamDeletionMeta _sdm = new();
// Guard against re-entrant age check
private bool _ageChkRun;
// -----------------------------------------------------------------------
// Constructor
// -----------------------------------------------------------------------
/// <summary>
/// Builds an empty in-memory store that works on a private clone of <paramref name="cfg"/>.
/// </summary>
/// <param name="cfg">Stream configuration; its storage type must be memory.</param>
/// <exception cref="ArgumentException">Thrown when cfg is null or Storage != MemoryStorage.</exception>
public JetStreamMemStore(StreamConfig cfg)
{
    if (cfg == null)
    {
        throw new ArgumentException("config required");
    }

    if (cfg.Storage != StorageType.MemoryStorage)
    {
        throw new ArgumentException("JetStreamMemStore requires MemoryStorage type in config");
    }

    _cfg = cfg.Clone();
    _msgs = new Dictionary<ulong, StoreMsg>();
    _fss = new SubjectTree<SimpleState>();
    _maxp = cfg.MaxMsgsPer;

    // Optional subsystems are only allocated when the config asks for them.
    if (cfg.AllowMsgTTL)
    {
        _ttls = HashWheel.NewHashWheel();
    }

    if (cfg.AllowMsgSchedules)
    {
        _scheduling = new MsgScheduling(RunMsgScheduling);
    }

    // Seed the sequence counters so the first stored message receives cfg.FirstSeq.
    if (cfg.FirstSeq > 0)
    {
        _state.FirstSeq = cfg.FirstSeq;
        _state.LastSeq = cfg.FirstSeq - 1;
    }
}
/// <summary>
/// Factory counterpart of Go <c>newMemStore</c>: constructs the store and, when the
/// config specifies a positive first sequence, runs an internal purge to that sequence.
/// </summary>
/// <param name="cfg">Stream configuration handed to the constructor.</param>
/// <returns>The newly created store.</returns>
public static JetStreamMemStore NewMemStore(StreamConfig cfg)
{
    var store = new JetStreamMemStore(cfg);
    if (!(cfg.FirstSeq > 0))
    {
        return store;
    }

    // Surface a purge failure to the caller as an exception.
    var (_, purgeError) = store.PurgeInternal(cfg.FirstSeq);
    if (purgeError != null)
    {
        throw purgeError;
    }

    return store;
}
// -----------------------------------------------------------------------
// IStreamStore — store / load
// -----------------------------------------------------------------------
/// <inheritdoc/>
public (ulong Seq, long Ts) StoreMsg(string subject, byte[]? hdr, byte[]? msg, long ttl)
{
    // Ticks (100 ns units) between 0001-01-01 and the Unix epoch.
    const long EpochTicks = 621355968000000000L;

    _mu.EnterWriteLock();
    try
    {
        var nextSeq = _state.LastSeq + 1;
        // Nanoseconds since the Unix epoch, at tick (100 ns) resolution.
        var ticksSinceEpoch = DateTimeOffset.UtcNow.UtcTicks - EpochTicks;
        var stamp = ticksSinceEpoch * 100L;

        try
        {
            StoreRawMsgLocked(subject, hdr, msg, nextSeq, stamp, ttl, discardNewCheck: true);
            _scb?.Invoke(1, (long)MsgSize(subject, hdr, msg), nextSeq, subject);
        }
        catch
        {
            // Any failure (store or callback) is reported to the caller as (0, 0).
            return (0, 0);
        }

        return (nextSeq, stamp);
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <inheritdoc/>
public void StoreRawMsg(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck)
{
    _mu.EnterWriteLock();
    try
    {
        StoreRawMsgLocked(subject, hdr, msg, seq, ts, ttl, discardNewCheck);

        // Notify the storage-update listener (if any) about the single added message.
        if (_scb is { } onUpdate)
        {
            onUpdate(1, (long)MsgSize(subject, hdr, msg), seq, subject);
        }
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
// Stores one message at sequence `seq` with timestamp `ts`, updating totals,
// per-subject state, limits, TTL tracking, the age-check timer and message schedules.
// Throws ErrStoreClosed when the store has been stopped, one of the ErrMax* errors when
// a discard-new limit rejects the message, and ErrSequenceMismatch when a non-zero `seq`
// is not the next expected sequence.
// Lock must be held.
private void StoreRawMsgLocked(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck)
{
if (_msgs == null)
throw StoreErrors.ErrStoreClosed;
// Normalize null header/payload to empty arrays so the length math below is safe.
hdr ??= Array.Empty<byte>();
msg ??= Array.Empty<byte>();
// Determine if we are at the per-subject limit.
bool atSubjectLimit = false;
if (_maxp > 0 && !string.IsNullOrEmpty(subject))
{
var subjectBytesCheck = Encoding.UTF8.GetBytes(subject);
var (ssCheck, foundCheck) = _fss.Find(subjectBytesCheck);
if (foundCheck && ssCheck != null)
atSubjectLimit = ssCheck.Msgs >= (ulong)_maxp;
}
// Discard-new enforcement
if (discardNewCheck && _cfg.Discard == DiscardPolicy.DiscardNew)
{
if (atSubjectLimit && _cfg.DiscardNewPer)
throw StoreErrors.ErrMaxMsgsPerSubject;
// The global limits are skipped when the subject is already at its own limit;
// in that case the per-subject enforcement below removes a message to make room.
if (_cfg.MaxMsgs > 0 && _state.Msgs >= (ulong)_cfg.MaxMsgs && !atSubjectLimit)
throw StoreErrors.ErrMaxMsgs;
if (_cfg.MaxBytes > 0 && _state.Bytes + MsgSize(subject, hdr, msg) > (ulong)_cfg.MaxBytes && !atSubjectLimit)
throw StoreErrors.ErrMaxBytes;
}
// A seq of 0 means "assign the next sequence"; any other value must be exactly next.
if (seq != _state.LastSeq + 1)
{
if (seq > 0)
throw StoreErrors.ErrSequenceMismatch;
seq = _state.LastSeq + 1;
}
// ts is in nanoseconds; the DateTime used for state is truncated to milliseconds.
var now = DateTimeOffset.FromUnixTimeMilliseconds(ts / 1_000_000L).UtcDateTime;
if (_state.Msgs == 0)
{
_state.FirstSeq = seq;
_state.FirstTime = now;
}
// Build combined buffer
var buf = new byte[hdr.Length + msg.Length];
if (hdr.Length > 0)
Buffer.BlockCopy(hdr, 0, buf, 0, hdr.Length);
if (msg.Length > 0)
Buffer.BlockCopy(msg, 0, buf, hdr.Length, msg.Length);
// Note: range-slicing an array (buf[..n]) allocates a new array, so Hdr and Msg
// are copies of the corresponding regions of Buf rather than views over it.
var sm = new StoreMsg
{
Subject = subject,
Hdr = hdr.Length > 0 ? buf[..hdr.Length] : Array.Empty<byte>(),
Msg = buf[hdr.Length..],
Buf = buf,
Seq = seq,
Ts = ts,
};
_msgs[seq] = sm;
_state.Msgs++;
_state.Bytes += MsgSize(subject, hdr, msg);
_state.LastSeq = seq;
_state.LastTime = now;
// Per-subject tracking
if (!string.IsNullOrEmpty(subject))
{
var subjectBytes = Encoding.UTF8.GetBytes(subject);
var (ss, found) = _fss.Find(subjectBytes);
if (found && ss != null)
{
ss.Msgs++;
ss.Last = seq;
ss.LastNeedsUpdate = false;
// Enforce per-subject limit
if (_maxp > 0 && ss.Msgs > (ulong)_maxp)
EnforcePerSubjectLimit(subject, ss);
}
else
{
_fss.Insert(subjectBytes, new SimpleState { Msgs = 1, First = seq, Last = seq });
}
}
// Overall limits
EnforceMsgLimit();
EnforceBytesLimit();
// Per-message TTL tracking.
// ttl is multiplied by 1e9 here, i.e. it is treated as seconds and added to the ns timestamp.
if (_ttls != null && ttl > 0)
{
var expires = ts + (ttl * 1_000_000_000L);
_ttls.Add(seq, expires);
}
// Age check timer management.
if (_ttls != null && ttl > 0)
{
ResetAgeChk(0);
}
else if (_ageChk == null && (_cfg.MaxAge > TimeSpan.Zero || _ttls != null))
{
StartAgeChk();
}
// Message scheduling.
if (_scheduling != null && hdr.Length > 0)
{
var (schedule, ok) = JetStreamHeaderHelpers.NextMessageSchedule(hdr, ts);
if (ok && schedule != default)
{
// NOTE(review): if `schedule` is a DateTime, Ticks counts from 0001-01-01 and
// Ticks * 100 exceeds the Int64 range for present-day dates (it wraps in an
// unchecked context). `ts` elsewhere is nanoseconds since the Unix epoch —
// confirm which unit MsgScheduling.Add expects.
_scheduling.Add(seq, subject, schedule.Ticks * 100L);
}
else
{
_scheduling.RemoveSubject(subject);
}
// Check for a repeating schedule.
var scheduleNextSlice = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsScheduleNext, hdr);
if (scheduleNextSlice != null && scheduleNextSlice.Value.Length > 0)
{
var scheduleNext = Encoding.ASCII.GetString(scheduleNextSlice.Value.Span);
if (scheduleNext != NatsHeaderConstants.JsScheduleNextPurge)
{
var scheduler = JetStreamHeaderHelpers.GetMessageScheduler(hdr);
// NOTE(review): a null format provider makes TryParse use the current culture;
// consider CultureInfo.InvariantCulture for this machine-readable header.
if (DateTime.TryParse(scheduleNext, null, System.Globalization.DateTimeStyles.RoundtripKind, out var next)
&& !string.IsNullOrEmpty(scheduler))
{
// NOTE(review): same Ticks * 100 concern as above — DateTime.Ticks * 100 does
// not fit in Int64 for current dates.
_scheduling.Update(scheduler, next.ToUniversalTime().Ticks * 100L);
}
}
}
}
}
/// <inheritdoc/>
public StoreMsg? LoadMsg(ulong seq, StoreMsg? sm)
{
    // Read-only lookup: a shared (read) lock is sufficient.
    _mu.EnterReadLock();
    try
    {
        var loaded = LoadMsgLocked(seq, sm);
        return loaded;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
// Copies the message stored at `seq` into `sm` (allocating one when `sm` is null).
// Throws ErrStoreClosed when the store is stopped, ErrStoreMsgNotFound when `seq` is
// within the assigned range but absent, and ErrStoreEOF when `seq` is past the last sequence.
// Lock must be held.
private StoreMsg? LoadMsgLocked(ulong seq, StoreMsg? sm)
{
    var messages = _msgs ?? throw StoreErrors.ErrStoreClosed;

    if (messages.TryGetValue(seq, out var existing) && existing != null)
    {
        var target = sm ?? new StoreMsg();
        target.CopyFrom(existing);
        return target;
    }

    if (seq > _state.LastSeq)
    {
        throw StoreErrors.ErrStoreEOF;
    }

    throw StoreErrors.ErrStoreMsgNotFound;
}
/// <inheritdoc/>
public (StoreMsg? Sm, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? smp)
{
    // Taken as a write lock, matching the original implementation.
    _mu.EnterWriteLock();
    try
    {
        var next = LoadNextMsgLocked(filter, wc, start, smp);
        return next;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
// Scans forward from `start` for the first stored message whose subject matches `filter`.
// An empty filter or ">" matches everything; with `wc` set the filter is matched via
// MatchLiteral, otherwise by exact string equality.
// Returns (message, its sequence) on a hit, or (null, LastSeq) when nothing matches.
// Lock must be held.
private (StoreMsg? Sm, ulong Skip) LoadNextMsgLocked(string filter, bool wc, ulong start, StoreMsg? smp)
{
    if (_msgs == null || _state.Msgs == 0)
    {
        return (null, _state.LastSeq);
    }

    // Never start before the first retained sequence.
    var cursor = start < _state.FirstSeq ? _state.FirstSeq : start;
    if (cursor > _state.LastSeq)
    {
        return (null, _state.LastSeq);
    }

    var matchEverything = string.IsNullOrEmpty(filter) || filter == ">";
    while (cursor <= _state.LastSeq)
    {
        if (_msgs.TryGetValue(cursor, out var candidate) && candidate != null)
        {
            bool matches;
            if (matchEverything)
            {
                matches = true;
            }
            else if (wc)
            {
                matches = MatchLiteral(candidate.Subject, filter);
            }
            else
            {
                matches = candidate.Subject == filter;
            }

            if (matches)
            {
                smp ??= new StoreMsg();
                smp.CopyFrom(candidate);
                return (smp, cursor);
            }
        }

        cursor++;
    }

    return (null, _state.LastSeq);
}
/// <inheritdoc/>
public (StoreMsg? Sm, ulong Skip) LoadNextMsgMulti(object? sl, ulong start, StoreMsg? smp)
{
    // TODO: session 17 — implement gsl.SimpleSublist equivalent
    // Until then `sl` is ignored and the first stored message at or after `start` is returned.
    _mu.EnterReadLock();
    try
    {
        if (_msgs == null || _state.Msgs == 0)
        {
            return (null, _state.LastSeq);
        }

        var cursor = start < _state.FirstSeq ? _state.FirstSeq : start;
        if (cursor > _state.LastSeq)
        {
            return (null, _state.LastSeq);
        }

        while (cursor <= _state.LastSeq)
        {
            if (_msgs.TryGetValue(cursor, out var candidate) && candidate != null)
            {
                smp ??= new StoreMsg();
                smp.CopyFrom(candidate);
                return (smp, cursor);
            }

            cursor++;
        }

        return (null, _state.LastSeq);
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <inheritdoc/>
public StoreMsg? LoadLastMsg(string subject, StoreMsg? sm)
{
    // Write lock: the locked helper may recalculate per-subject state.
    _mu.EnterWriteLock();
    try
    {
        var last = LoadLastLocked(subject, sm);
        return last;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
// Copies the most recent message for `subject` into `sm` (allocating when `sm` is null).
// An empty subject or ">" selects the message at the store's last sequence.
// Throws ErrStoreClosed when the store is stopped and ErrStoreMsgNotFound when no
// message can be located.
// Lock must be held.
private StoreMsg? LoadLastLocked(string subject, StoreMsg? sm)
{
    if (_msgs == null)
    {
        throw StoreErrors.ErrStoreClosed;
    }

    StoreMsg? newest = null;
    var anySubject = string.IsNullOrEmpty(subject) || subject == ">";
    if (anySubject)
    {
        _msgs.TryGetValue(_state.LastSeq, out newest);
    }
    else
    {
        var (subjectState, exists) = _fss.Find(Encoding.UTF8.GetBytes(subject));
        if (exists && subjectState != null && subjectState.Msgs > 0)
        {
            // The cached Last may be stale after removals; refresh it before use.
            if (subjectState.LastNeedsUpdate)
            {
                RecalculateForSubj(subject, subjectState);
            }

            _msgs.TryGetValue(subjectState.Last, out newest);
        }
    }

    if (newest == null)
    {
        throw StoreErrors.ErrStoreMsgNotFound;
    }

    var target = sm ?? new StoreMsg();
    target.CopyFrom(newest);
    return target;
}
/// <inheritdoc/>
public (StoreMsg? Sm, Exception? Error) LoadPrevMsg(ulong start, StoreMsg? smp)
{
    _mu.EnterReadLock();
    try
    {
        if (_msgs == null)
        {
            return (null, StoreErrors.ErrStoreClosed);
        }

        if (_state.Msgs == 0 || start < _state.FirstSeq)
        {
            return (null, StoreErrors.ErrStoreEOF);
        }

        // Clamp to the last assigned sequence, then walk backwards.
        var cursor = start > _state.LastSeq ? _state.LastSeq : start;
        while (cursor >= _state.FirstSeq)
        {
            if (_msgs.TryGetValue(cursor, out var candidate) && candidate != null)
            {
                smp ??= new StoreMsg();
                smp.CopyFrom(candidate);
                return (smp, null);
            }

            // Unsigned counter: stop before decrementing below zero.
            if (cursor == 0)
            {
                break;
            }

            cursor--;
        }

        return (null, StoreErrors.ErrStoreEOF);
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <inheritdoc/>
public (StoreMsg? Sm, ulong Skip, Exception? Error) LoadPrevMsgMulti(object? sl, ulong start, StoreMsg? smp)
{
    // TODO: session 17 — implement gsl.SimpleSublist equivalent
    // Until then `sl` is ignored and the nearest stored message at or before `start` is returned.
    _mu.EnterReadLock();
    try
    {
        var nothingToScan = _msgs == null || _state.Msgs == 0 || start < _state.FirstSeq;
        if (nothingToScan)
        {
            return (null, _state.FirstSeq, StoreErrors.ErrStoreEOF);
        }

        var cursor = start > _state.LastSeq ? _state.LastSeq : start;
        while (cursor >= _state.FirstSeq)
        {
            if (_msgs!.TryGetValue(cursor, out var candidate) && candidate != null)
            {
                smp ??= new StoreMsg();
                smp.CopyFrom(candidate);
                return (smp, cursor, null);
            }

            // Unsigned counter: stop before decrementing below zero.
            if (cursor == 0)
            {
                break;
            }

            cursor--;
        }

        return (null, _state.LastSeq, StoreErrors.ErrStoreEOF);
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
// -----------------------------------------------------------------------
// IStreamStore — skip
// -----------------------------------------------------------------------
/// <inheritdoc/>
public (ulong Seq, Exception? Error) SkipMsg(ulong seq)
{
    _mu.EnterWriteLock();
    try
    {
        // seq == 0 means "use the next sequence"; any other value must be exactly next.
        var expected = _state.LastSeq + 1;
        if (seq != expected)
        {
            if (seq > 0)
            {
                return (0, StoreErrors.ErrSequenceMismatch);
            }

            seq = expected;
        }

        _state.LastSeq = seq;
        _state.LastTime = DateTime.UtcNow;

        if (_state.Msgs != 0)
        {
            // Messages exist, so the skipped sequence is recorded as a delete.
            _dmap.Insert(seq);
        }
        else
        {
            // Empty store: simply move the first sequence past the skipped one.
            _state.FirstSeq = seq + 1;
            _state.FirstTime = default;
        }

        return (seq, null);
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <inheritdoc/>
public void SkipMsgs(ulong seq, ulong num)
{
    _mu.EnterWriteLock();
    try
    {
        // seq == 0 means "use the next sequence"; any other value must be exactly next.
        var expected = _state.LastSeq + 1;
        if (seq != expected)
        {
            if (seq > 0)
            {
                throw StoreErrors.ErrSequenceMismatch;
            }

            seq = expected;
        }

        var lastSkipped = seq + num - 1;
        _state.LastSeq = lastSkipped;
        _state.LastTime = DateTime.UtcNow;

        if (_state.Msgs == 0)
        {
            // Empty store: move the first sequence past the skipped range.
            _state.FirstSeq = lastSkipped + 1;
            _state.FirstTime = default;
            return;
        }

        // Otherwise every skipped sequence is recorded as a delete.
        var current = seq;
        while (current <= lastSkipped)
        {
            _dmap.Insert(current);
            current++;
        }
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
// -----------------------------------------------------------------------
// IStreamStore — remove
// -----------------------------------------------------------------------
/// <inheritdoc/>
public (bool Removed, Exception? Error) RemoveMsg(ulong seq)
{
    _mu.EnterWriteLock();
    try
    {
        var removed = RemoveMsgLocked(seq, secure: false);
        return (removed, null);
    }
    finally
    {
        // RemoveMsgLocked releases and re-acquires the lock around its callback,
        // so only exit when this thread actually holds it.
        if (_mu.IsWriteLockHeld)
        {
            _mu.ExitWriteLock();
        }
    }
}
/// <inheritdoc/>
public (bool Removed, Exception? Error) EraseMsg(ulong seq)
{
    _mu.EnterWriteLock();
    try
    {
        // Same as RemoveMsg, but asks the helper to wipe the message contents.
        var removed = RemoveMsgLocked(seq, secure: true);
        return (removed, null);
    }
    finally
    {
        // RemoveMsgLocked releases and re-acquires the lock around its callback,
        // so only exit when this thread actually holds it.
        if (_mu.IsWriteLockHeld)
        {
            _mu.ExitWriteLock();
        }
    }
}
// Removes the message at `seq`, updating totals, the delete map, first-sequence
// bookkeeping, per-subject state and the TTL wheel. With `secure` set, the stored
// header/payload bytes are zeroed before the message is dropped.
// Returns false when the store is closed or no message exists at `seq`.
// The write lock must be held on entry; it is temporarily released while the
// storage-update callback runs and is re-acquired before returning.
private bool RemoveMsgLocked(ulong seq, bool secure)
{
    if (_msgs == null || !_msgs.TryGetValue(seq, out var sm) || sm == null)
        return false;

    var size = MsgSize(sm.Subject, sm.Hdr, sm.Msg);
    if (_state.Msgs > 0)
    {
        _state.Msgs--;
        // Never let the unsigned byte counter underflow.
        if (size > _state.Bytes)
            size = _state.Bytes;
        _state.Bytes -= size;
    }

    _dmap.Insert(seq);
    UpdateFirstSeq(seq);
    RemoveSeqPerSubject(sm.Subject, seq);

    // Remove TTL entry from hash wheel if applicable.
    if (_ttls != null && sm.Hdr.Length > 0)
    {
        var (ttl, err) = JetStreamHeaderHelpers.GetMessageTtl(sm.Hdr);
        if (err == null && ttl > 0)
        {
            var expires = sm.Ts + (ttl * 1_000_000_000L);
            _ttls.Remove(seq, expires);
        }
    }

    if (secure)
    {
        if (sm.Hdr.Length > 0)
            Array.Clear(sm.Hdr);
        if (sm.Msg.Length > 0)
            Array.Clear(sm.Msg);
        // Hdr and Msg are produced by range-slicing the combined buffer, which copies.
        // The original bytes therefore still live in Buf and must be wiped as well,
        // otherwise an "erased" message keeps its full contents in memory.
        if (sm.Buf is { Length: > 0 } combined)
            Array.Clear(combined);
        sm.Seq = 0;
        sm.Ts = 0;
    }

    _msgs.Remove(seq);

    // Invoke update callback — we temporarily release/reacquire write lock
    var cb = _scb;
    if (cb != null)
    {
        var subj = sm.Subject;
        var delta = size;
        _mu.ExitWriteLock();
        try { cb(-1, -(long)delta, seq, subj); }
        finally { _mu.EnterWriteLock(); }
    }

    return true;
}
// -----------------------------------------------------------------------
// IStreamStore — purge / compact / truncate
// -----------------------------------------------------------------------
/// <inheritdoc/>
/// <remarks>
/// Drops every stored message, advances the first sequence to one past the current last
/// sequence, and clears the delete map and the per-subject state tree. The storage-update
/// callback (if registered) is invoked after the lock has been released.
/// </remarks>
public (ulong Purged, Exception? Error) Purge()
{
    ulong purged;
    long bytes;
    StorageUpdateHandler? cb;

    _mu.EnterWriteLock();
    try
    {
        purged = (ulong)(_msgs?.Count ?? 0);
        bytes = (long)_state.Bytes;

        // The next stored message continues the existing sequence numbering.
        var fseq = _state.LastSeq + 1;
        _state.FirstSeq = fseq;
        _state.LastSeq = fseq - 1;
        _state.FirstTime = default;
        _state.Bytes = 0;
        _state.Msgs = 0;

        _msgs = new Dictionary<ulong, StoreMsg>();
        _dmap = new SequenceSet();
        // Per-subject counters must be cleared together with the messages; otherwise
        // subject totals, NumSubjects and per-subject limit checks keep counting
        // messages that no longer exist.
        _fss.Reset();

        cb = _scb;
    }
    finally
    {
        _mu.ExitWriteLock();
    }

    cb?.Invoke(-(long)purged, -bytes, 0, string.Empty);
    return (purged, null);
}
/// <inheritdoc/>
/// <remarks>
/// With an empty subject or ">" this delegates to <see cref="Purge"/> or
/// <see cref="Compact"/>. Otherwise it removes messages matching <paramref name="subject"/>
/// (literal or wildcard), optionally only those below <paramref name="seq"/> and/or
/// leaving the newest <paramref name="keep"/> messages in place.
/// </remarks>
public (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep)
{
    var isAll = string.IsNullOrEmpty(subject) || subject == ">";
    if (isAll)
    {
        if (keep == 0 && seq == 0)
            return Purge();
        if (seq > 1)
            return Compact(seq);
        if (keep > 0)
        {
            ulong msgs, lseq;
            _mu.EnterReadLock();
            try
            {
                msgs = _state.Msgs;
                lseq = _state.LastSeq;
            }
            finally
            {
                _mu.ExitReadLock();
            }

            if (keep >= msgs)
                return (0, null);
            return Compact(lseq - keep + 1);
        }

        return (0, null);
    }

    // Subject-filtered purge
    var ss = FilteredState(1, subject);
    if (ss.Msgs == 0)
        return (0, null);

    if (keep > 0)
    {
        if (keep >= ss.Msgs)
            return (0, null);
        // ss.Msgs now holds the number of messages that have to go.
        ss.Msgs -= keep;
    }

    var last = ss.Last;
    if (seq > 1)
        last = seq - 1;

    ulong purged = 0;
    _mu.EnterWriteLock();
    try
    {
        if (_msgs == null)
            return (0, null);

        for (var s = ss.First; s <= last; s++)
        {
            if (!_msgs.TryGetValue(s, out var sm) || sm == null)
                continue;

            // FilteredState above counts wildcard matches, so removal has to use the
            // same matching rule; a plain string comparison would never remove anything
            // for a subject such as "orders.*".
            if (sm.Subject != subject && !MatchLiteral(sm.Subject, subject))
                continue;

            if (RemoveMsgLocked(s, false))
            {
                purged++;
                if (purged >= ss.Msgs)
                    break;
            }
        }
    }
    finally
    {
        // RemoveMsgLocked releases and re-acquires the lock around its callback,
        // so only exit when this thread actually holds it.
        if (_mu.IsWriteLockHeld)
            _mu.ExitWriteLock();
    }

    return (purged, null);
}
/// <inheritdoc/>
/// <remarks>
/// Removes all messages with a sequence lower than <paramref name="seq"/>. A value of 0
/// is treated as a full <see cref="Purge"/>. When <paramref name="seq"/> lies beyond the
/// last sequence, the store is emptied and the next stored message will receive
/// <paramref name="seq"/>. The storage-update callback runs after the lock is released.
/// </remarks>
public (ulong Purged, Exception? Error) Compact(ulong seq)
{
    if (seq == 0)
        return Purge();

    ulong purged = 0;
    ulong bytes = 0;
    StorageUpdateHandler? cb = null;

    _mu.EnterWriteLock();
    try
    {
        if (_msgs == null || _state.FirstSeq > seq)
            return (0, null);

        cb = _scb;
        if (seq <= _state.LastSeq)
        {
            var fseq = _state.FirstSeq;

            // Find the first surviving message at or after seq; it becomes the new first.
            for (var s = seq; s <= _state.LastSeq; s++)
            {
                if (_msgs.TryGetValue(s, out var sm2) && sm2 != null)
                {
                    _state.FirstSeq = s;
                    _state.FirstTime = DateTimeOffset.FromUnixTimeMilliseconds(sm2.Ts / 1_000_000L).UtcDateTime;
                    break;
                }
            }

            // Walk backwards from seq - 1 down to the old first sequence, removing messages
            // and dropping delete-map entries that fall below the new first sequence.
            for (var s = seq - 1; s >= fseq; s--)
            {
                if (_msgs.TryGetValue(s, out var sm2) && sm2 != null)
                {
                    bytes += MsgSize(sm2.Subject, sm2.Hdr, sm2.Msg);
                    purged++;
                    RemoveSeqPerSubject(sm2.Subject, s);
                    _msgs.Remove(s);
                }
                else if (!_dmap.IsEmpty)
                {
                    _dmap.Delete(s);
                }

                // Unsigned counter: stop before decrementing below zero.
                if (s == 0) break;
            }

            // Clamp so the unsigned totals cannot underflow.
            if (purged > _state.Msgs) purged = _state.Msgs;
            _state.Msgs -= purged;
            if (bytes > _state.Bytes) bytes = _state.Bytes;
            _state.Bytes -= bytes;
        }
        else
        {
            // Compacting past the end: empty the store and position the sequences so
            // that the next stored message is assigned seq.
            purged = (ulong)_msgs.Count;
            bytes = _state.Bytes;
            _state.Bytes = 0;
            _state.Msgs = 0;
            _state.FirstSeq = seq;
            _state.FirstTime = default;
            _state.LastSeq = seq - 1;
            _msgs = new Dictionary<ulong, StoreMsg>();
            _dmap = new SequenceSet();
            // All messages are gone, so the per-subject counters must go too; otherwise
            // subject totals and per-subject limits keep counting removed messages.
            _fss.Reset();
        }
    }
    finally
    {
        _mu.ExitWriteLock();
    }

    cb?.Invoke(-(long)purged, -(long)bytes, 0, string.Empty);
    return (purged, null);
}
/// <inheritdoc/>
/// <remarks>
/// Removes every message with a sequence greater than <paramref name="seq"/> and makes
/// <paramref name="seq"/> the new last sequence. A value of 0 resets the store completely.
/// Does nothing when the store has been stopped.
/// </remarks>
public void Truncate(ulong seq)
{
StorageUpdateHandler? cb;
ulong purged = 0;
ulong bytes = 0;
_mu.EnterWriteLock();
try
{
if (_msgs == null)
return;
cb = _scb;
if (seq == 0)
{
// Full reset
purged = (ulong)_msgs.Count;
bytes = _state.Bytes;
_state = new StreamState();
_msgs = new Dictionary<ulong, StoreMsg>();
_dmap = new SequenceSet();
_fss.Reset();
}
else
{
// Use the timestamp of the message at seq as the new last time when that message
// exists; otherwise the previous last time is kept.
DateTime lastTime = _state.LastTime;
if (_msgs.TryGetValue(seq, out var lsm) && lsm != null)
lastTime = DateTimeOffset.FromUnixTimeMilliseconds(lsm.Ts / 1_000_000L).UtcDateTime;
// Walk down from the current last sequence to just above seq.
for (var i = _state.LastSeq; i > seq; i--)
{
if (_msgs.TryGetValue(i, out var sm) && sm != null)
{
purged++;
bytes += MsgSize(sm.Subject, sm.Hdr, sm.Msg);
RemoveSeqPerSubject(sm.Subject, i);
_msgs.Remove(i);
}
else if (!_dmap.IsEmpty)
{
// No message here: drop any delete-map entry that is now past the end.
_dmap.Delete(i);
}
}
// NOTE(review): when seq is greater than the current LastSeq the loop above does
// not run and LastSeq is still moved forward to seq — confirm this is intended.
_state.LastSeq = seq;
_state.LastTime = lastTime;
// Clamp so the unsigned totals cannot underflow.
if (purged > _state.Msgs) purged = _state.Msgs;
_state.Msgs -= purged;
if (bytes > _state.Bytes) bytes = _state.Bytes;
_state.Bytes -= bytes;
}
}
finally
{
_mu.ExitWriteLock();
}
// The callback is invoked after the lock has been released.
cb?.Invoke(-(long)purged, -(long)bytes, 0, string.Empty);
}
// -----------------------------------------------------------------------
// IStreamStore — state
// -----------------------------------------------------------------------
/// <inheritdoc/>
public StreamState State()
{
    _mu.EnterReadLock();
    try
    {
        var snapshot = new StreamState
        {
            Msgs = _state.Msgs,
            Bytes = _state.Bytes,
            FirstSeq = _state.FirstSeq,
            FirstTime = _state.FirstTime,
            LastSeq = _state.LastSeq,
            LastTime = _state.LastTime,
            Consumers = _consumers,
            NumSubjects = _fss.Size(),
        };

        // Size of the sequence range minus the live messages gives the expected
        // number of interior deletes; nothing more to do when that is not positive.
        var expectedDeletes = (int)((snapshot.LastSeq - snapshot.FirstSeq + 1) - snapshot.Msgs);
        if (expectedDeletes <= 0)
        {
            return snapshot;
        }

        var holes = new List<ulong>(expectedDeletes);
        var lowest = snapshot.FirstSeq;
        var highest = snapshot.LastSeq;
        _dmap.Range(deletedSeq =>
        {
            // Only report deletes that fall inside the current sequence range.
            if (deletedSeq >= lowest && deletedSeq <= highest)
            {
                holes.Add(deletedSeq);
            }

            return true;
        });

        snapshot.Deleted = holes.ToArray();
        snapshot.NumDeleted = holes.Count;
        return snapshot;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <inheritdoc/>
public void FastState(StreamState state)
{
    _mu.EnterReadLock();
    try
    {
        state.Msgs = _state.Msgs;
        state.Bytes = _state.Bytes;
        state.FirstSeq = _state.FirstSeq;
        state.FirstTime = _state.FirstTime;
        state.LastSeq = _state.LastSeq;
        state.LastTime = _state.LastTime;

        // The delete count is derived arithmetically; the individual deleted
        // sequences are not enumerated here.
        if (state.LastSeq > state.FirstSeq)
        {
            var derived = (int)((state.LastSeq - state.FirstSeq + 1) - state.Msgs);
            state.NumDeleted = derived;
            if (state.NumDeleted < 0)
            {
                state.NumDeleted = 0;
            }
        }

        state.Consumers = _consumers;
        state.NumSubjects = _fss.Size();
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
// -----------------------------------------------------------------------
// IStreamStore — filtered / pending state
// -----------------------------------------------------------------------
/// <inheritdoc/>
public SimpleState FilteredState(ulong seq, string subject)
{
    // Write lock: the locked helper may recalculate per-subject state.
    _mu.EnterWriteLock();
    try
    {
        var filtered = FilteredStateLocked(seq, subject);
        return filtered;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
// Computes message count plus first/last sequence for messages at or after `sseq`
// whose subject matches `filter` (an empty filter is treated as ">").
// Uses the per-subject tree when every matching subject lies entirely at or after
// `sseq`; otherwise falls back to scanning the stored messages.
// Lock must be held (subject state may be recalculated here).
private SimpleState FilteredStateLocked(ulong sseq, string filter)
{
if (sseq < _state.FirstSeq) sseq = _state.FirstSeq;
if (sseq > _state.LastSeq) return new SimpleState();
if (string.IsNullOrEmpty(filter)) filter = ">";
var isAll = filter == ">";
// Fast path: everything from the very first sequence is just the store totals.
if (isAll && sseq <= _state.FirstSeq)
return new SimpleState { Msgs = _state.Msgs, First = _state.FirstSeq, Last = _state.LastSeq };
var ss = new SimpleState();
var havePartial = false;
_fss.Match(Encoding.UTF8.GetBytes(filter), (subj, fss) =>
{
// Refresh cached first/last markers that were invalidated by removals.
if (fss.FirstNeedsUpdate || fss.LastNeedsUpdate)
RecalculateForSubj(Encoding.UTF8.GetString(subj), fss);
if (sseq <= fss.First)
{
// All messages in this subject are at or after sseq
ss.Msgs += fss.Msgs;
if (ss.First == 0 || fss.First < ss.First) ss.First = fss.First;
if (fss.Last > ss.Last) ss.Last = fss.Last;
}
else if (sseq <= fss.Last)
{
// Partial: sseq is inside this subject's range — need to scan
havePartial = true;
// Still track Last for the scan bounds
if (fss.Last > ss.Last) ss.Last = fss.Last;
}
// else sseq > fss.Last: all messages before sseq, skip
return true;
});
if (!havePartial)
return ss;
// Need to scan messages from sseq to ss.Last
if (_msgs == null)
return ss;
var scanFirst = sseq;
var scanLast = ss.Last;
if (scanLast == 0) scanLast = _state.LastSeq;
// Reset and rescan
// The tree-derived totals are discarded; the scan below recounts every match in range.
ss = new SimpleState();
for (var seq = scanFirst; seq <= scanLast; seq++)
{
if (!_msgs.TryGetValue(seq, out var sm) || sm == null)
continue;
if (isAll || MatchLiteral(sm.Subject, filter))
{
ss.Msgs++;
if (ss.First == 0) ss.First = seq;
ss.Last = seq;
}
}
return ss;
}
/// <inheritdoc/>
/// <remarks>
/// Returns a copy of the per-subject state for every subject matching
/// <paramref name="filterSubject"/> (an empty filter is treated as ">").
/// </remarks>
public Dictionary<string, SimpleState> SubjectsState(string filterSubject)
{
// Write lock: stale per-subject markers may be recalculated below.
_mu.EnterWriteLock();
try
{
if (_fss.Size() == 0) return new Dictionary<string, SimpleState>();
if (string.IsNullOrEmpty(filterSubject)) filterSubject = ">";
var result = new Dictionary<string, SimpleState>();
_fss.Match(Encoding.UTF8.GetBytes(filterSubject), (subj, ss) =>
{
var s = Encoding.UTF8.GetString(subj);
// Refresh cached first/last markers that were invalidated by removals.
if (ss.FirstNeedsUpdate || ss.LastNeedsUpdate)
RecalculateForSubj(s, ss);
// First sighting of this subject (or a placeholder with First == 0): store a fresh copy.
if (!result.TryGetValue(s, out var oss) || oss.First == 0)
result[s] = new SimpleState { Msgs = ss.Msgs, First = ss.First, Last = ss.Last };
else
{
// Subject already present: extend the existing entry.
oss.Last = ss.Last;
oss.Msgs += ss.Msgs;
result[s] = oss;
}
return true;
});
return result;
}
finally
{
_mu.ExitWriteLock();
}
}
/// <inheritdoc/>
public Dictionary<string, ulong> SubjectsTotals(string filterSubject)
{
    _mu.EnterReadLock();
    try
    {
        var totals = new Dictionary<string, ulong>();
        if (_fss.Size() == 0)
        {
            return totals;
        }

        // An empty filter means "all subjects".
        var pattern = string.IsNullOrEmpty(filterSubject) ? ">" : filterSubject;
        _fss.Match(Encoding.UTF8.GetBytes(pattern), (subjectBytes, subjectState) =>
        {
            var name = Encoding.UTF8.GetString(subjectBytes);
            totals[name] = subjectState.Msgs;
            return true;
        });

        return totals;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <inheritdoc/>
public (ulong Total, ulong ValidThrough, Exception? Error) NumPending(ulong sseq, string filter, bool lastPerSubject)
{
    // Write lock: the filtered-state helper may recalculate per-subject state.
    // Note that lastPerSubject is not consulted by this implementation.
    _mu.EnterWriteLock();
    try
    {
        var filtered = FilteredStateLocked(sseq, filter);
        var validThrough = _state.LastSeq;
        return (filtered.Msgs, validThrough, null);
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <inheritdoc/>
// TODO: session 17 — implement gsl.SimpleSublist equivalent
// Until then `sl` is ignored and the count covers all subjects (">").
public (ulong Total, ulong ValidThrough, Exception? Error) NumPendingMulti(ulong sseq, object? sl, bool lastPerSubject)
    => NumPending(sseq, ">", lastPerSubject);
/// <inheritdoc/>
public (ulong[] Seqs, Exception? Error) AllLastSeqs()
{
    _mu.EnterReadLock();
    try
    {
        var lastSeqs = AllLastSeqsLocked();
        return lastSeqs;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Collects the last sequence of every tracked subject and returns them in ascending
/// order. Does not acquire any lock itself. Mirrors Go <c>allLastSeqsLocked</c>.
/// </summary>
/// <returns>The sorted sequences (empty when the store is closed or has no messages) and a null error.</returns>
private (ulong[] Seqs, Exception? Error) AllLastSeqsLocked()
{
    if (_msgs == null || _msgs.Count == 0)
    {
        return (Array.Empty<ulong>(), null);
    }

    var collected = new List<ulong>(_fss.Size());
    _fss.IterFast((subjectBytes, subjectState) =>
    {
        // Refresh a stale "last" marker before reading it.
        if (subjectState.LastNeedsUpdate)
        {
            RecalculateForSubj(Encoding.UTF8.GetString(subjectBytes), subjectState);
        }

        collected.Add(subjectState.Last);
        return true;
    });

    var ordered = collected.ToArray();
    Array.Sort(ordered);
    return (ordered, null);
}
/// <inheritdoc/>
/// <remarks>
/// For every subject matching any of <paramref name="filters"/>, collects the sequence of
/// its most recent message that is not beyond <paramref name="maxSeq"/> (0 means "up to the
/// last sequence"). The result is de-duplicated and sorted ascending. Returns
/// ErrTooManyResults when <paramref name="maxAllowed"/> is positive and exceeded.
/// </remarks>
public (ulong[] Seqs, Exception? Error) MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed)
{
// NOTE(review): RecalculateForSubj is called below while only the read lock is held;
// SubjectsState takes the write lock before recalculating — confirm this is safe.
_mu.EnterReadLock();
try
{
if (_msgs == null || _msgs.Count == 0) return (Array.Empty<ulong>(), null);
if (maxSeq == 0) maxSeq = _state.LastSeq;
var seqs = new List<ulong>(64);
// Tracks sequences already added so overlapping filters do not produce duplicates.
var seen = new HashSet<ulong>();
foreach (var filter in filters)
{
_fss.Match(Encoding.UTF8.GetBytes(filter), (subj, ss) =>
{
if (ss.LastNeedsUpdate)
RecalculateForSubj(Encoding.UTF8.GetString(subj), ss);
if (ss.Last <= maxSeq)
{
if (seen.Add(ss.Last))
seqs.Add(ss.Last);
}
else if (ss.Msgs > 1)
{
// Last is beyond maxSeq — scan backwards for the most recent msg <= maxSeq.
var s = Encoding.UTF8.GetString(subj);
for (var seq = maxSeq; seq > 0; seq--)
{
if (_msgs.TryGetValue(seq, out var sm) && sm != null && sm.Subject == s)
{
if (seen.Add(seq)) seqs.Add(seq);
break;
}
}
}
return true;
});
// The limit is checked once per filter, after all of its matches were added.
if (maxAllowed > 0 && seqs.Count > maxAllowed)
return (null!, StoreErrors.ErrTooManyResults);
}
seqs.Sort();
return (seqs.ToArray(), null);
}
finally
{
_mu.ExitReadLock();
}
}
/// <inheritdoc/>
public (string Subject, Exception? Error) SubjectForSeq(ulong seq)
{
    _mu.EnterReadLock();
    try
    {
        if (_msgs == null)
        {
            return (string.Empty, StoreErrors.ErrStoreClosed);
        }

        // Sequences below the first retained one can never be present.
        var inRange = seq >= _state.FirstSeq;
        if (inRange && _msgs.TryGetValue(seq, out var stored) && stored != null)
        {
            return (stored.Subject, null);
        }

        return (string.Empty, StoreErrors.ErrStoreMsgNotFound);
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
// -----------------------------------------------------------------------
// IStreamStore — time / seq
// -----------------------------------------------------------------------
/// <inheritdoc/>
/// <remarks>
/// Returns the sequence of the first stored message whose timestamp is at or after
/// <paramref name="t"/>, or <c>LastSeq + 1</c> when there is no such message.
/// A <see cref="DateTimeKind.Local"/> value is converted to UTC; Utc and Unspecified
/// values are interpreted as UTC.
/// </remarks>
public ulong GetSeqFromTime(DateTime t)
{
    // Use same 100-nanosecond precision as StoreMsg timestamps.
    const long UnixEpochTicksGsft = 621355968000000000L;

    // Constructing a DateTimeOffset with a zero offset throws ArgumentException for a
    // Local-kind DateTime on machines whose local offset is not zero, so normalise
    // the value by hand instead.
    var utcTicks = t.Kind == DateTimeKind.Local ? t.ToUniversalTime().Ticks : t.Ticks;
    var ticksSinceEpoch = utcTicks - UnixEpochTicksGsft;

    // Convert to nanoseconds, saturating rather than wrapping for dates whose
    // nanosecond value does not fit in a signed 64-bit integer.
    long ts;
    if (ticksSinceEpoch > long.MaxValue / 100L)
        ts = long.MaxValue;
    else if (ticksSinceEpoch < long.MinValue / 100L)
        ts = long.MinValue;
    else
        ts = ticksSinceEpoch * 100L;

    _mu.EnterReadLock();
    try
    {
        if (_msgs == null || _msgs.Count == 0)
            return _state.LastSeq + 1;
        if (!_msgs.TryGetValue(_state.FirstSeq, out var firstSm) || firstSm == null)
            return _state.LastSeq + 1;
        if (ts <= firstSm.Ts)
            return _state.FirstSeq;

        // Find actual last stored message
        StoreMsg? lastSm = null;
        for (var lseq = _state.LastSeq; lseq > _state.FirstSeq; lseq--)
        {
            if (_msgs.TryGetValue(lseq, out lastSm) && lastSm != null)
                break;
        }

        if (lastSm == null) return _state.LastSeq + 1;

        // Mirror Go: if ts == last ts return that seq; if ts > last ts return pastEnd.
        if (ts == lastSm.Ts) return lastSm.Seq;
        if (ts > lastSm.Ts) return _state.LastSeq + 1;

        // Linear scan fallback
        for (var seq = _state.FirstSeq; seq <= _state.LastSeq; seq++)
        {
            if (_msgs.TryGetValue(seq, out var sm) && sm != null && sm.Ts >= ts)
                return seq;
        }

        return _state.LastSeq + 1;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
// -----------------------------------------------------------------------
// IStreamStore — config, encoding, sync
// -----------------------------------------------------------------------
/// <inheritdoc/>
public void UpdateConfig(StreamConfig cfg)
{
    if (cfg == null)
    {
        throw new ArgumentException("config required");
    }

    _mu.EnterWriteLock();
    try
    {
        _cfg = cfg.Clone();

        // Values below -1 are normalised to -1, both in our copy and in the caller's config.
        if (_cfg.MaxMsgsPer < -1)
        {
            _cfg.MaxMsgsPer = -1;
            cfg.MaxMsgsPer = -1;
        }

        var previousPerSubject = _maxp;
        _maxp = _cfg.MaxMsgsPer;

        EnforceMsgLimit();
        EnforceBytesLimit();

        // Trim existing subjects only when the per-subject limit was introduced or lowered.
        var limitTightened = _maxp > 0 && (previousPerSubject == 0 || _maxp < previousPerSubject);
        if (limitTightened)
        {
            var perSubjectMax = (ulong)_maxp;
            _fss.IterFast((subjectBytes, subjectState) =>
            {
                if (subjectState.Msgs > perSubjectMax)
                {
                    EnforcePerSubjectLimit(Encoding.UTF8.GetString(subjectBytes), subjectState);
                }

                return true;
            });
        }

        // Start the age-check timer when MaxAge is set and no timer exists yet.
        if (_ageChk == null && _cfg.MaxAge != TimeSpan.Zero)
        {
            StartAgeChk();
        }

        // Tear the timer down when MaxAge has been cleared.
        if (_ageChk != null && _cfg.MaxAge == TimeSpan.Zero)
        {
            _ageChk.Dispose();
            _ageChk = null;
            _ageChkTime = 0;
        }
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <inheritdoc/>
public void RegisterStorageUpdates(StorageUpdateHandler cb)
{
    // Swap the handler under the write lock.
    _mu.EnterWriteLock();
    try
    {
        _scb = cb;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <inheritdoc/>
public void RegisterStorageRemoveMsg(StorageRemoveMsgHandler cb)
{
    // Swap the handler under the write lock.
    _mu.EnterWriteLock();
    try
    {
        _rmcb = cb;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <inheritdoc/>
public void RegisterProcessJetStreamMsg(ProcessJetStreamMsgHandler cb)
{
    // Swap the handler under the write lock.
    _mu.EnterWriteLock();
    try
    {
        _pmsgcb = cb;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <inheritdoc/>
public void FlushAllPending()
{
    // Intentionally empty: the in-memory store has no asynchronous apply queue to flush.
}
/// <inheritdoc/>
public (byte[] Enc, Exception? Error) EncodedStreamState(ulong failed)
{
    // TODO: session 17 — binary encode using varint encoding matching Go
    _mu.EnterReadLock();
    try
    {
        // Placeholder encoding: just the two leading bytes (magic, version).
        byte[] stub = { 42, 1 };
        return (stub, null);
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <inheritdoc/>
public void SyncDeleted(DeleteBlocks dbs)
{
    _mu.EnterWriteLock();
    try
    {
        // With a single block, skip all work when it already describes our delete map.
        if (dbs.Count == 1)
        {
            var (ourMin, ourMax, ourNum) = _dmap.State();
            var (theirMin, theirMax, theirNum) = dbs[0].GetState();
            var identical = theirMin == ourMin && theirMax == ourMax && theirNum == ourNum;
            if (identical)
            {
                return;
            }
        }

        var lastSeq = _state.LastSeq;
        foreach (var block in dbs)
        {
            // Only blocks that start within our sequence range are applied.
            var (blockFirst, _, _) = block.GetState();
            if (blockFirst <= lastSeq)
            {
                block.Range(deletedSeq =>
                {
                    RemoveMsgLocked(deletedSeq, false);
                    return true;
                });
            }
        }
    }
    finally
    {
        // RemoveMsgLocked releases and re-acquires the lock around its callback,
        // so only exit when this thread actually holds it.
        if (_mu.IsWriteLockHeld)
        {
            _mu.ExitWriteLock();
        }
    }
}
/// <inheritdoc/>
public void ResetState()
{
    _mu.EnterWriteLock();
    try
    {
        // Only the scheduler's in-flight tracking is cleared, and only when scheduling is enabled.
        if (_scheduling is { } scheduler)
        {
            scheduler.ClearInflight();
        }
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
// -----------------------------------------------------------------------
// IStreamStore — type / stop / delete
// -----------------------------------------------------------------------
/// <inheritdoc/>
public StorageType Type()
{
    // This store is always memory-backed.
    return StorageType.MemoryStorage;
}
/// <inheritdoc/>
/// <remarks>
/// Cancels the age-check timer, purges all messages and then marks the store as closed
/// (<c>_msgs == null</c>). Calling it again on a closed store is a no-op.
/// </remarks>
public void Stop()
{
    _mu.EnterWriteLock();
    try
    {
        if (_msgs == null) return;

        _ageChk?.Dispose();
        _ageChk = null;
        _ageChkTime = 0;
    }
    finally
    {
        _mu.ExitWriteLock();
    }

    // Purge before marking the store closed. Purge() allocates a fresh message
    // dictionary, so running it after setting _msgs to null would silently reopen the
    // store and defeat every "store closed" check. Purging first also lets the
    // storage-update callback report the real number of removed messages.
    Purge();

    _mu.EnterWriteLock();
    try
    {
        _msgs = null;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <inheritdoc/>
public void Delete(bool inline)
{
    // Deleting an in-memory store is the same as stopping it; `inline` is not used.
    Stop();
}
// -----------------------------------------------------------------------
// IStreamStore — consumers
// -----------------------------------------------------------------------
/// <inheritdoc/>
public IConsumerStore ConsumerStore(string name, DateTime created, ConsumerConfig cfg)
{
    if (_msgs == null)
    {
        throw StoreErrors.ErrStoreClosed;
    }

    if (cfg == null || string.IsNullOrEmpty(name))
    {
        throw new ArgumentException("bad consumer config");
    }

    // Create the consumer-side store and count it against this stream.
    var consumerStore = new ConsumerMemStore(this, cfg);
    AddConsumer(consumerStore);
    return consumerStore;
}
/// <inheritdoc/>
public void AddConsumer(IConsumerStore o)
{
    // Only the count is tracked; the consumer store itself is not retained.
    _mu.EnterWriteLock();
    try
    {
        _consumers++;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <inheritdoc/>
public void RemoveConsumer(IConsumerStore o)
{
    _mu.EnterWriteLock();
    try
    {
        // The count never drops below zero.
        if (_consumers > 0)
        {
            _consumers--;
        }
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <inheritdoc/>
public (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool includeConsumers, bool checkMsgs)
{
    // TODO: session 17 — not implemented for memory store
    // Every call reports the missing implementation through the error slot.
    Exception notImplemented = new NotImplementedException("no impl");
    return (null, notImplemented);
}
/// <inheritdoc/>
public (ulong Total, ulong Reported, Exception? Error) Utilization()
{
    ulong bytes;

    _mu.EnterReadLock();
    try
    {
        bytes = _state.Bytes;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    // Total and reported usage are the same figure for this store.
    return (bytes, bytes, null);
}
// -----------------------------------------------------------------------
// Size helpers (static)
// -----------------------------------------------------------------------
/// <summary>
/// Computes raw message size from component lengths: the three lengths plus a fixed
/// 16-byte overhead.
/// Mirrors Go <c>memStoreMsgSizeRaw</c>.
/// </summary>
/// <param name="slen">Subject length.</param>
/// <param name="hlen">Header length.</param>
/// <param name="mlen">Payload length.</param>
/// <returns>The accounted size of the message.</returns>
internal static ulong MemStoreMsgSizeRaw(int slen, int hlen, int mlen)
    // Widen before adding so the sum cannot wrap at 32 bits.
    => (ulong)((long)slen + hlen + mlen + 16L);
/// <summary>
/// Computes the accounted size of a message from its actual subject, header and payload.
/// A null header or payload counts as zero bytes.
/// Mirrors Go <c>memStoreMsgSize</c> (the package-level function).
/// </summary>
internal static ulong MemStoreMsgSize(string subj, byte[]? hdr, byte[]? msg)
{
    var subjectLength = subj.Length;
    var headerLength = hdr == null ? 0 : hdr.Length;
    var payloadLength = msg == null ? 0 : msg.Length;
    return MemStoreMsgSizeRaw(subjectLength, headerLength, payloadLength);
}
// -----------------------------------------------------------------------
// Trivial helpers
// -----------------------------------------------------------------------
// Removes the message at the current first sequence and reports whether it was removed.
// Lock must be held.
private bool DeleteFirstMsg()
{
    var firstSeq = _state.FirstSeq;
    return RemoveMsgLocked(firstSeq, false);
}
// Removes the first message and throws when that removal fails.
// Lock must be held.
private void DeleteFirstMsgOrPanic()
{
    if (DeleteFirstMsg())
    {
        return;
    }

    throw new InvalidOperationException("jetstream memstore has inconsistent state, can't find first seq msg");
}
// Disposes the age-check timer, if any, and clears its recorded fire time.
// Lock must be held.
private void CancelAgeChk()
{
    var timer = _ageChk;
    if (timer == null)
    {
        return;
    }

    timer.Dispose();
    _ageChk = null;
    _ageChkTime = 0;
}
/// <summary>
/// Returns true if a linear scan is preferable over subject tree lookup.
/// Mirrors Go <c>shouldLinearScan</c>.
/// </summary>
/// <param name="filter">Subject filter; the full wildcard "&gt;" always selects a linear scan.</param>
/// <param name="wc">Whether the filter contains wildcards.</param>
/// <param name="start">Sequence the scan would start from.</param>
// Lock must be held.
private bool ShouldLinearScan(string filter, bool wc, ulong start)
{
    const int LinearScanMaxFss = 256;
    if (filter == ">")
        return true;

    var subjects = _fss.Size();

    // Keep the sequence distance in 64 bits. Casting it to int drops the high bits,
    // so a very large distance could look tiny and wrongly select a linear scan.
    // The unchecked conversion keeps the wrap-around result when start is past LastSeq.
    var doubledSpan = unchecked(2 * (long)(_state.LastSeq - start));

    return doubledSpan < subjects || (wc && subjects > LinearScanMaxFss);
}
/// <summary>
/// Reports whether the store is closed, which is the case once the message map is null.
/// Mirrors Go <c>isClosed</c>.
/// </summary>
public bool IsClosed()
{
    _mu.EnterReadLock();
    try
    {
        return _msgs is null;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Reports whether the filter selects every subject: null, empty, or "&gt;".
/// Mirrors Go <c>filterIsAll</c>.
/// </summary>
// Static and stateless: touches no store fields.
private static bool FilterIsAll(string filter)
{
    if (filter == ">")
    {
        return true;
    }

    return string.IsNullOrEmpty(filter);
}
// -----------------------------------------------------------------------
// Low-complexity helpers
// -----------------------------------------------------------------------
/// <summary>
/// Returns per-subject message totals matching the filter.
/// Mirrors Go <c>subjectsTotalsLocked</c>.
/// </summary>
/// <param name="filterSubject">Subject filter; null or empty is treated as "&gt;".</param>
/// <returns>A new dictionary from subject to message count; empty when nothing is tracked.</returns>
// Lock must be held.
private Dictionary<string, ulong> SubjectsTotalsLocked(string filterSubject)
{
    var totals = new Dictionary<string, ulong>();
    if (_fss.Size() == 0)
        return totals;

    if (string.IsNullOrEmpty(filterSubject))
        filterSubject = ">";

    // The subject tree performs the filter match itself, so every visited entry is recorded.
    _fss.Match(Encoding.UTF8.GetBytes(filterSubject), (subj, ss) =>
    {
        totals[Encoding.UTF8.GetString(subj)] = ss.Msgs;
        return true;
    });
    return totals;
}
/// <summary>
/// Looks up a literal subject and returns the sequence bounds to consider from
/// <paramref name="start"/> onwards.
/// Returns (first, last, true) if found, or (0, 0, false) if the subject is unknown
/// or <paramref name="start"/> lies beyond its last sequence.
/// Mirrors Go <c>nextLiteralMatchLocked</c>.
/// </summary>
// Lock must be held.
private (ulong First, ulong Last, bool Found) NextLiteralMatchLocked(string filter, ulong start)
{
    var key = Encoding.UTF8.GetBytes(filter);
    var (state, found) = _fss.Find(key);
    if (found && state != null)
    {
        // Refresh the first/last markers before relying on them.
        RecalculateForSubj(filter, state);
        if (start <= state.Last)
        {
            var lower = start > state.First ? start : state.First;
            return (lower, state.Last, true);
        }
    }

    return (0, 0, false);
}
/// <summary>
/// Walks subjects matching a wildcard filter via MatchUntil and returns the combined
/// sequence bounds of those whose last sequence is at or after <paramref name="start"/>.
/// Returns (0, 0, false) when no such subject exists.
/// Mirrors Go <c>nextWildcardMatchLocked</c>.
/// </summary>
// Lock must be held.
private (ulong First, ulong Last, bool Found) NextWildcardMatchLocked(string filter, ulong start)
{
    var matched = false;
    var lowest = _state.LastSeq;
    ulong highest = 0;

    _fss.MatchUntil(Encoding.UTF8.GetBytes(filter), (subj, ss) =>
    {
        RecalculateForSubj(Encoding.UTF8.GetString(subj), ss);

        if (ss.Last >= start)
        {
            matched = true;
            if (ss.First < lowest)
            {
                lowest = ss.First;
            }
            if (ss.Last > highest)
            {
                highest = ss.Last;
            }

            // Keep walking only while the lower bound is still above start.
            return lowest > start;
        }

        // Subject ends before start: skip it and continue.
        return true;
    });

    if (matched)
    {
        return (Math.Max(lowest, start), highest, true);
    }

    return (0, 0, false);
}
// -----------------------------------------------------------------------
// SDM methods
// -----------------------------------------------------------------------
/// <summary>
/// Decides whether a subject delete marker should be processed for this sequence/subject.
/// Returns (isLast, shouldProcess). A sequence that was tracked less than two seconds ago
/// is reported with shouldProcess = false.
/// Mirrors Go <c>shouldProcessSdmLocked</c>.
/// </summary>
// Lock must be held.
private (bool IsLast, bool ShouldProcess) ShouldProcessSdmLocked(ulong seq, string subj)
{
    const long RetryWindowNanos = 2_000_000_000L; // 2 seconds in nanoseconds

    static long NowUnixNanos() => DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;

    if (!_sdm.TryGetPending(seq, out var tracked))
    {
        // First time this sequence is seen: start tracking it unless the subject has no messages.
        var total = SubjectsTotalsLocked(subj).GetValueOrDefault(subj, 0UL);
        if (total == 0)
        {
            return (false, true);
        }

        var outstanding = total - _sdm.GetSubjectTotal(subj);
        return (_sdm.TrackPending(seq, subj, outstanding == 1), true);
    }

    if (NowUnixNanos() - tracked.Ts < RetryWindowNanos)
    {
        return (tracked.Last, false);
    }

    // Re-validate a cached "last" verdict: if the subject now holds more messages than
    // are pending, this sequence is no longer the last one.
    var stillLast = tracked.Last;
    if (stillLast)
    {
        var current = SubjectsTotalsLocked(subj).GetValueOrDefault(subj, 0UL);
        var pendingForSubject = _sdm.GetSubjectTotal(subj);
        if (current > pendingForSubject)
        {
            stillLast = false;
        }
    }

    _sdm.SetPending(seq, new SdmBySeq { Last = stillLast, Ts = NowUnixNanos() });
    return (stillLast, true);
}
/// <summary>
/// Takes the write lock and delegates to <c>ShouldProcessSdmLocked</c>.
/// Mirrors Go <c>shouldProcessSdm</c>.
/// </summary>
public (bool IsLast, bool ShouldProcess) ShouldProcessSdm(ulong seq, string subj)
{
    _mu.EnterWriteLock();
    try
    {
        var decision = ShouldProcessSdmLocked(seq, subj);
        return decision;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>
/// Handles message removal: if SDM mode, builds a marker header and invokes _pmsgcb;
/// otherwise invokes _rmcb.
/// Mirrors Go <c>handleRemovalOrSdm</c>.
/// </summary>
/// <param name="seq">Sequence passed to the removal callback when <paramref name="sdm"/> is false.</param>
/// <param name="subj">Subject placed on the synthetic marker message.</param>
/// <param name="sdm">True to emit a subject delete marker instead of a removal.</param>
/// <param name="sdmTtl">Marker TTL in seconds, written into the TTL header.</param>
public void HandleRemovalOrSdm(ulong seq, string subj, bool sdm, long sdmTtl)
{
    if (!sdm)
    {
        _rmcb?.Invoke(seq);
        return;
    }

    // The TTL header value is written as a Go-style duration string ("5s", "1m0s",
    // "1h0m0s"), which is what Go's time.Duration produces in the reference
    // implementation. TimeSpan.ToString() would emit "00:00:05" instead.
    var hdr = Encoding.ASCII.GetBytes(
        $"NATS/1.0\r\n{NatsHeaderConstants.JsMarkerReason}: {NatsHeaderConstants.JsMarkerReasonMaxAge}\r\n" +
        $"{NatsHeaderConstants.JsMessageTtl}: {FormatTtl(sdmTtl)}\r\n" +
        $"{NatsHeaderConstants.JsMsgRollup}: {NatsHeaderConstants.JsMsgRollupSubject}\r\n\r\n");

    // In Go this builds an inMsg and calls pmsgcb. We pass a synthetic StoreMsg.
    var msg = new StoreMsg { Subject = subj, Hdr = hdr, Msg = Array.Empty<byte>(), Seq = 0, Ts = 0 };
    _pmsgcb?.Invoke(msg);

    // Renders whole seconds the way Go prints a time.Duration.
    static string FormatTtl(long seconds)
    {
        var sign = seconds < 0 ? "-" : string.Empty;
        // Magnitude computed without overflowing on long.MinValue.
        var total = seconds < 0 ? (ulong)(-(seconds + 1)) + 1UL : (ulong)seconds;
        var h = total / 3600UL;
        var m = total % 3600UL / 60UL;
        var s = total % 60UL;

        if (h > 0)
            return FormattableString.Invariant($"{sign}{h}h{m}m{s}s");
        if (m > 0)
            return FormattableString.Invariant($"{sign}{m}m{s}s");
        return FormattableString.Invariant($"{sign}{s}s");
    }
}
// -----------------------------------------------------------------------
// Age/TTL methods
// -----------------------------------------------------------------------
/// <summary>
/// Resets or arms the age check timer based on TTL and MaxAge.
/// Mirrors Go <c>resetAgeChk</c>.
/// </summary>
/// <param name="delta">
/// Delay hint in nanoseconds. A positive value can shorten the delay; a value of 0 while
/// messages are still stored caps the delay at two seconds.
/// </param>
// Lock must be held.
private void ResetAgeChk(long delta)
{
    // Nothing to do while the age-check-running flag is set.
    if (_ageChkRun)
        return;

    long next = long.MaxValue;
    if (_ttls != null)
        next = _ttls.GetNextExpiration(next);

    // No MaxAge and no pending TTL expiration: the timer is not needed.
    if (_cfg.MaxAge <= TimeSpan.Zero && next == long.MaxValue)
    {
        CancelAgeChk();
        return;
    }

    var fireIn = _cfg.MaxAge;
    if (delta == 0 && _state.Msgs > 0)
    {
        var until = TimeSpan.FromSeconds(2);
        if (fireIn == TimeSpan.Zero || until < fireIn)
            fireIn = until;
    }
    if (next < long.MaxValue)
    {
        // Convert the next expiration (Unix nanoseconds) into a UTC DateTime.
        var nextTicks = DateTime.UnixEpoch.Ticks + next / 100L;
        var nextUtc = new DateTime(Math.Max(nextTicks, DateTime.UnixEpoch.Ticks), DateTimeKind.Utc);
        var until = nextUtc - DateTime.UtcNow;
        if (fireIn == TimeSpan.Zero || until < fireIn)
            fireIn = until;
    }
    if (delta > 0)
    {
        var deltaDur = TimeSpan.FromTicks(delta / 100L);
        if (fireIn == TimeSpan.Zero || deltaDur < fireIn)
            fireIn = deltaDur;
    }

    // Never fire more often than every 250 ms.
    if (fireIn < TimeSpan.FromMilliseconds(250))
        fireIn = TimeSpan.FromMilliseconds(250);

    // System.Threading.Timer rejects delays beyond its supported maximum, which a long
    // MaxAge can exceed. Cap the delay so arming the timer cannot throw.
    var maxTimerDelay = TimeSpan.FromMilliseconds(int.MaxValue);
    if (fireIn > maxTimerDelay)
        fireIn = maxTimerDelay;

    // Do not push an already scheduled check further into the future.
    var expires = DateTime.UtcNow.Ticks + fireIn.Ticks;
    if (_ageChkTime > 0 && expires > _ageChkTime)
        return;

    _ageChkTime = expires;
    if (_ageChk != null)
        _ageChk.Change(fireIn, Timeout.InfiniteTimeSpan);
    else
        _ageChk = new Timer(_ => ExpireMsgs(), null, fireIn, Timeout.InfiniteTimeSpan);
}
/// <summary>
/// Rebuilds the TTL hash wheel from the headers of the stored messages, then re-arms
/// the age check.
/// Mirrors Go <c>recoverTTLState</c>.
/// </summary>
// Lock must be held.
private void RecoverTTLState()
{
    _ttls = HashWheel.NewHashWheel();
    if (_state.Msgs == 0)
        return;

    try
    {
        var msgs = _msgs;
        if (msgs == null)
            return;

        for (var seq = _state.FirstSeq; seq <= _state.LastSeq; seq++)
        {
            if (!msgs.TryGetValue(seq, out var sm) || sm == null)
                continue;

            // Only messages that carry headers can specify a TTL; tolerate a missing header.
            var hdr = sm.Hdr;
            if (hdr == null || hdr.Length == 0)
                continue;

            var (ttl, _) = JetStreamHeaderHelpers.GetMessageTtl(hdr);
            if (ttl > 0)
            {
                // Expiry is the message timestamp plus the TTL converted from seconds to nanoseconds.
                var expires = sm.Ts + (ttl * 1_000_000_000L);
                _ttls.Add(seq, expires);
            }
        }
    }
    finally
    {
        ResetAgeChk(0);
    }
}
// -----------------------------------------------------------------------
// Scheduling methods
// -----------------------------------------------------------------------
/// <summary>
/// Rebuilds message scheduling state from the headers of the stored messages, then
/// resets the scheduling timer.
/// Mirrors Go <c>recoverMsgSchedulingState</c>.
/// </summary>
// Lock must be held.
private void RecoverMsgSchedulingState()
{
    var scheduling = new MsgScheduling(RunMsgScheduling);
    _scheduling = scheduling;
    if (_state.Msgs == 0)
        return;

    try
    {
        var msgs = _msgs;
        if (msgs == null)
            return;

        for (var seq = _state.FirstSeq; seq <= _state.LastSeq; seq++)
        {
            if (!msgs.TryGetValue(seq, out var sm) || sm == null)
                continue;

            // Only messages that carry headers can specify a schedule; tolerate a missing header.
            var hdr = sm.Hdr;
            if (hdr == null || hdr.Length == 0)
                continue;

            var (schedule, ok) = JetStreamHeaderHelpers.NextMessageSchedule(hdr, sm.Ts);
            if (ok && schedule != default)
            {
                // Convert to Unix nanoseconds by subtracting the Unix epoch before scaling
                // ticks (100 ns each). Scaling raw ticks overflows long.
                // NOTE(review): assumes schedule is a UTC DateTime and that Init expects
                // Unix nanoseconds, as the Go original passes UnixNano() — confirm.
                var scheduleUnixNanos = (schedule.Ticks - DateTime.UnixEpoch.Ticks) * 100L;
                scheduling.Init(seq, sm.Subject, scheduleUnixNanos);
            }
        }
    }
    finally
    {
        scheduling.ResetTimer();
    }
}
/// <summary>
/// Scheduler callback. Currently it only re-arms the scheduling timer under the write lock;
/// loading and delivering scheduled messages is not implemented yet.
/// Mirrors Go <c>runMsgScheduling</c>.
/// </summary>
private void RunMsgScheduling()
{
    _mu.EnterWriteLock();
    try
    {
        if (_scheduling is null)
        {
            return;
        }

        // TODO: Implement getScheduledMessages integration when MsgScheduling
        // supports the full callback-based message loading pattern.
        // Until then the timer is reset whether or not a _pmsgcb callback is
        // registered, so scheduling continues to fire.
        _scheduling.ResetTimer();
    }
    finally
    {
        if (_mu.IsWriteLockHeld)
        {
            _mu.ExitWriteLock();
        }
    }
}
// -----------------------------------------------------------------------
// Reset / Purge Internal
// -----------------------------------------------------------------------
/// <summary>
/// Resets the store to an empty state: sequences and counters go to zero and the message
/// map, fss, dmap and sdm are cleared. The storage-update callback, if registered, is
/// invoked after the lock is released with the negated message and byte totals.
/// Mirrors Go <c>reset</c>.
/// </summary>
/// <returns>Always null.</returns>
public Exception? Reset()
{
    ulong removedMsgs = 0;
    ulong removedBytes = 0;
    StorageUpdateHandler? notify;

    _mu.EnterWriteLock();
    try
    {
        notify = _scb;

        // Totals are only needed when there is a callback to report them to.
        if (notify != null && _msgs != null)
        {
            foreach (var stored in _msgs.Values)
            {
                removedMsgs += 1;
                removedBytes += MsgSize(stored.Subject, stored.Hdr, stored.Msg);
            }
        }

        // Sequence and time markers.
        _state.FirstSeq = 0;
        _state.FirstTime = default;
        _state.LastSeq = 0;
        _state.LastTime = DateTime.UtcNow;

        // Counters.
        _state.Msgs = 0;
        _state.Bytes = 0;

        // Backing collections.
        _msgs = new Dictionary<ulong, StoreMsg>();
        _fss.Reset();
        _dmap = new SequenceSet();
        _sdm.Empty();
    }
    finally
    {
        _mu.ExitWriteLock();
    }

    if (notify != null)
    {
        notify(-(long)removedMsgs, -(long)removedBytes, 0, string.Empty);
    }

    return null;
}
/// <summary>
/// Internal purge with configurable first-sequence.
/// Mirrors Go <c>purge</c> (the internal version).
/// </summary>
/// <param name="fseq">
/// Sequence the store should start at afterwards; 0 means "last sequence + 1".
/// A non-zero value below the current last sequence is rejected.
/// </param>
/// <returns>
/// The number of purged messages, or an error when a partial purge was requested.
/// </returns>
// This is the internal purge used by cluster/raft — differs from the public Purge().
private (ulong Purged, Exception? Error) PurgeInternal(ulong fseq)
{
    ulong purged;
    long bytes;
    StorageUpdateHandler? cb;

    _mu.EnterWriteLock();
    try
    {
        if (fseq == 0)
        {
            fseq = _state.LastSeq + 1;
        }
        else if (fseq < _state.LastSeq)
        {
            // The finally block is the only place that releases the lock. Releasing it
            // here as well would drop an outer hold too when the lock is held recursively.
            return (0, new InvalidOperationException("partial purges not supported on memory store"));
        }

        purged = (ulong)(_msgs?.Count ?? 0);
        cb = _scb;
        bytes = (long)_state.Bytes;

        _state.FirstSeq = fseq;
        _state.LastSeq = fseq - 1;
        _state.FirstTime = default;
        _state.Bytes = 0;
        _state.Msgs = 0;

        // A null map means the store is stopped; leave it that way.
        if (_msgs != null)
            _msgs = new Dictionary<ulong, StoreMsg>();
        _fss.Reset();
        _dmap = new SequenceSet();
        _sdm.Empty();
    }
    finally
    {
        _mu.ExitWriteLock();
    }

    // Notify outside the lock.
    cb?.Invoke(-(long)purged, -bytes, 0, string.Empty);
    return (purged, null);
}
/// <summary>
/// Internal compact with SDM tracking: removes every message below <paramref name="seq"/>.
/// Mirrors Go <c>compact</c> (the internal version).
/// </summary>
/// <param name="seq">
/// First sequence to keep. 0 delegates to <c>Purge()</c>; a value beyond the last sequence
/// empties the store and positions it so the next sequence is <paramref name="seq"/>.
/// </param>
/// <returns>The number of messages removed; the error slot is null on the paths in this method.</returns>
private (ulong Purged, Exception? Error) CompactInternal(ulong seq)
{
    if (seq == 0)
        return Purge();

    ulong purged = 0;
    ulong bytes = 0;
    _mu.EnterWriteLock();
    StorageUpdateHandler? cb;
    try
    {
        // Already compacted past this point.
        if (_state.FirstSeq > seq)
            return (0, null);

        cb = _scb;
        if (seq <= _state.LastSeq)
        {
            var fseq = _state.FirstSeq;

            // Find the first stored message at or after seq; it becomes the new first.
            // newFirst ends at LastSeq + 1 when no such message exists.
            var newFirst = seq;
            for (; newFirst <= _state.LastSeq; newFirst++)
            {
                if (_msgs != null && _msgs.TryGetValue(newFirst, out var kept) && kept != null)
                {
                    _state.FirstSeq = newFirst;
                    _state.FirstTime = DateTimeOffset.FromUnixTimeMilliseconds(kept.Ts / 1_000_000L).UtcDateTime;
                    break;
                }
            }

            // Clean up everything below the new first sequence. Starting from newFirst - 1
            // (not seq - 1) also clears deleted-set entries for the gap between seq and the
            // message that was found, which would otherwise stay behind below FirstSeq.
            for (var s = newFirst - 1; s >= fseq; s--)
            {
                if (_msgs != null && _msgs.TryGetValue(s, out var sm2) && sm2 != null)
                {
                    bytes += MsgSize(sm2.Subject, sm2.Hdr, sm2.Msg);
                    purged++;
                    // Update per-subject info before dropping the message itself.
                    RemoveSeqPerSubject(sm2.Subject, s);
                    _msgs.Remove(s);
                }
                else if (!_dmap.IsEmpty)
                {
                    _dmap.Delete(s);
                }

                // Guard against unsigned wrap-around when fseq is 0.
                if (s == 0) break;
            }

            // Clamp so the counters cannot underflow.
            if (purged > _state.Msgs) purged = _state.Msgs;
            _state.Msgs -= purged;
            if (bytes > _state.Bytes) bytes = _state.Bytes;
            _state.Bytes -= bytes;
        }
        else
        {
            // Compacting past the end: drop everything and position the sequences so the
            // next stored message gets seq.
            purged = (ulong)(_msgs?.Count ?? 0);
            bytes = _state.Bytes;
            _state.Bytes = 0;
            _state.Msgs = 0;
            _state.FirstSeq = seq;
            _state.FirstTime = default;
            _state.LastSeq = seq - 1;
            _msgs = new Dictionary<ulong, StoreMsg>();
            _fss.Reset();
            _dmap = new SequenceSet();
            _sdm.Empty();
        }
    }
    finally
    {
        if (_mu.IsWriteLockHeld)
            _mu.ExitWriteLock();
    }

    // Notify outside the lock.
    cb?.Invoke(-(long)purged, -(long)bytes, 0, string.Empty);
    return (purged, null);
}
// -----------------------------------------------------------------------
// Private helpers
// -----------------------------------------------------------------------
// Drops the oldest messages until the count is within cfg.MaxMsgs.
// Only applies with the DiscardOld policy and a positive limit.
// Throws InvalidOperationException (via DeleteFirstMsgOrPanic) when the first message
// cannot be removed, instead of spinning forever while holding the lock.
// Lock must be held.
private void EnforceMsgLimit()
{
    if (_cfg.Discard != DiscardPolicy.DiscardOld) return;
    if (_cfg.MaxMsgs <= 0) return;

    var limit = (ulong)_cfg.MaxMsgs;
    while (_state.Msgs > limit)
        DeleteFirstMsgOrPanic();
}
// Drops the oldest messages until the byte total is within cfg.MaxBytes.
// Only applies with the DiscardOld policy and a positive limit.
// Throws InvalidOperationException (via DeleteFirstMsgOrPanic) when the first message
// cannot be removed, instead of spinning forever while holding the lock.
// Lock must be held.
private void EnforceBytesLimit()
{
    if (_cfg.Discard != DiscardPolicy.DiscardOld) return;
    if (_cfg.MaxBytes <= 0) return;

    var limit = (ulong)_cfg.MaxBytes;
    while (_state.Bytes > limit)
        DeleteFirstMsgOrPanic();
}
// Removes the oldest messages of one subject until it holds at most _maxp messages.
// Does nothing when _maxp is not positive, and stops early if a removal fails.
// Lock must be held.
private void EnforcePerSubjectLimit(string subj, SimpleState ss)
{
    if (_maxp <= 0)
    {
        return;
    }

    var cap = (ulong)_maxp;
    while (ss.Msgs > cap)
    {
        // Make sure ss.First is current before using it as the removal target.
        var stale = ss.FirstNeedsUpdate || ss.LastNeedsUpdate;
        if (stale)
        {
            RecalculateForSubj(subj, ss);
        }

        var removed = RemoveMsgLocked(ss.First, false);
        if (!removed)
        {
            return;
        }
    }
}
// Advances the first sequence after the message at seq was removed.
// Only acts when seq is the current first sequence; interior removals are ignored.
// Lock must be held.
private void UpdateFirstSeq(ulong seq)
{
    var previousFirst = _state.FirstSeq;
    if (seq != previousFirst)
    {
        return;
    }

    // Look for the next stored message after the old first sequence.
    StoreMsg? next = null;
    var probe = previousFirst + 1;
    while (probe <= _state.LastSeq)
    {
        if (_msgs != null && _msgs.TryGetValue(probe, out next) && next != null)
        {
            break;
        }
        probe++;
    }

    if (next == null)
    {
        // Nothing left: position first just past last.
        _state.FirstSeq = _state.LastSeq + 1;
        _state.FirstTime = default;
    }
    else
    {
        _state.FirstSeq = next.Seq;
        _state.FirstTime = DateTimeOffset.FromUnixTimeMilliseconds(next.Ts / 1_000_000L).UtcDateTime;
    }

    // Drop deleted-set entries that now fall below the first sequence.
    if (previousFirst == _state.FirstSeq - 1)
    {
        _dmap.Delete(previousFirst);
        return;
    }

    for (var skipped = previousFirst; skipped < _state.FirstSeq; skipped++)
    {
        _dmap.Delete(skipped);
    }
}
// Adjusts the per-subject state in _fss to account for the removal of sequence seq
// on subject subj. First/last markers are not rescanned here; they are flagged for
// lazy recalculation (see RecalculateForSubj) unless they can be fixed up directly.
// Lock must be held.
private void RemoveSeqPerSubject(string subj, ulong seq)
{
// Nothing to do for a null or empty subject.
if (string.IsNullOrEmpty(subj)) return;
var subjectBytes = Encoding.UTF8.GetBytes(subj);
var (ss, found) = _fss.Find(subjectBytes);
if (!found || ss == null) return;
// Removing the only message for this subject drops the subject entry entirely.
if (ss.Msgs == 1)
{
_fss.Delete(subjectBytes);
return;
}
ss.Msgs--;
// Exactly one message left: first and last must be the same sequence.
if (ss.Msgs == 1)
{
// Last is trustworthy and was not the one removed, so it is the survivor.
if (!ss.LastNeedsUpdate && seq != ss.Last)
{
ss.First = ss.Last;
ss.FirstNeedsUpdate = false;
return;
}
// First is trustworthy and was not the one removed, so it is the survivor.
if (!ss.FirstNeedsUpdate && seq != ss.First)
{
ss.Last = ss.First;
ss.LastNeedsUpdate = false;
return;
}
}
// Otherwise mark whichever end was removed as stale; an already-set flag stays set.
ss.FirstNeedsUpdate = seq == ss.First || ss.FirstNeedsUpdate;
ss.LastNeedsUpdate = seq == ss.Last || ss.LastNeedsUpdate;
}
// Recomputes ss.First and/or ss.Last for subject subj when they are flagged as stale,
// by scanning _msgs for the nearest message with a matching subject.
// Does nothing when the message map is null.
// Lock must be held.
private void RecalculateForSubj(string subj, SimpleState ss)
{
if (_msgs == null) return;
if (ss.FirstNeedsUpdate)
{
// Scan forward from just after the stale first, but never below the store's first sequence.
var tseq = ss.First + 1;
if (tseq < _state.FirstSeq) tseq = _state.FirstSeq;
for (; tseq <= ss.Last; tseq++)
{
if (_msgs.TryGetValue(tseq, out var sm) && sm != null && sm.Subject == subj)
{
ss.First = tseq;
ss.FirstNeedsUpdate = false;
// With a single message, last is the same sequence and needs no separate scan.
if (ss.Msgs == 1) { ss.Last = tseq; ss.LastNeedsUpdate = false; return; }
break;
}
}
}
if (ss.LastNeedsUpdate)
{
// Scan backward from just before the stale last, but never above the store's last sequence.
var tseq = ss.Last - 1;
if (tseq > _state.LastSeq) tseq = _state.LastSeq;
for (; tseq >= ss.First; tseq--)
{
if (_msgs.TryGetValue(tseq, out var sm) && sm != null && sm.Subject == subj)
{
ss.Last = tseq;
ss.LastNeedsUpdate = false;
// With a single message, first is the same sequence.
if (ss.Msgs == 1) { ss.First = tseq; ss.FirstNeedsUpdate = false; }
return;
}
// tseq is unsigned: stop at 0 instead of wrapping around on the decrement.
if (tseq == 0) break;
}
}
}
// Starts the periodic age-check timer when a positive MaxAge is configured and no timer
// exists yet. The timer invokes ExpireMsgs.
private void StartAgeChk()
{
    if (_ageChk != null) return;

    // A zero or negative MaxAge means no age limit. A negative value would also be
    // rejected by the Timer constructor.
    if (_cfg.MaxAge <= TimeSpan.Zero) return;

    // System.Threading.Timer rejects intervals beyond its supported maximum, which a long
    // MaxAge can exceed. Cap the interval so the constructor cannot throw.
    var maxTimerInterval = TimeSpan.FromMilliseconds(int.MaxValue);
    var interval = _cfg.MaxAge < maxTimerInterval ? _cfg.MaxAge : maxTimerInterval;

    _ageChk = new Timer(_ => ExpireMsgs(), null, interval, interval);
}
// Timer callback: removes every stored message whose timestamp is at or before
// (now - MaxAge). Does nothing when the store is closed or MaxAge is zero.
private void ExpireMsgs()
{
    // TODO: session 17 — full age/TTL expiry logic
    _mu.EnterWriteLock();
    try
    {
        if (_msgs == null || _cfg.MaxAge == TimeSpan.Zero)
        {
            return;
        }

        // Cut-off as Unix nanoseconds, derived from ticks so it has the same
        // 100-nanosecond precision as StoreMsg timestamps.
        var cutoffUtc = DateTime.UtcNow - _cfg.MaxAge;
        var cutoffNanos = (cutoffUtc.Ticks - DateTime.UnixEpoch.Ticks) * 100L;

        // Collect first, remove afterwards: the removal loop runs only after
        // enumeration of the map has finished.
        var expired = new List<ulong>();
        foreach (var entry in _msgs)
        {
            if (entry.Value.Ts <= cutoffNanos)
            {
                expired.Add(entry.Key);
            }
        }

        foreach (var seq in expired)
        {
            RemoveMsgLocked(seq, false);
        }
    }
    finally
    {
        if (_mu.IsWriteLockHeld)
        {
            _mu.ExitWriteLock();
        }
    }
}
// Accounted size of a message: subject length plus header and payload lengths
// (null counts as zero) plus a fixed 16-byte overhead.
private static ulong MsgSize(string subj, byte[]? hdr, byte[]? msg)
{
    var total = subj.Length + 16;
    if (hdr != null)
    {
        total += hdr.Length;
    }
    if (msg != null)
    {
        total += msg.Length;
    }
    return (ulong)total;
}
// Splits subject and filter on '.' and checks the subject tokens against the filter tokens.
private static bool MatchLiteral(string subject, string filter)
    => IsSubsetMatch(subject.Split('.'), filter.Split('.'));
// Token-wise subject match: "*" matches exactly one token, ">" matches one or more
// remaining tokens, anything else must be equal. Without ">" the token counts must agree.
private static bool IsSubsetMatch(string[] subject, string[] filter)
{
    var consumed = 0;
    foreach (var token in filter)
    {
        if (token == ">")
        {
            // ">" needs at least one subject token left to match.
            return consumed < subject.Length;
        }

        if (consumed >= subject.Length)
        {
            return false;
        }

        var tokenMatches = token == "*" || token == subject[consumed];
        if (!tokenMatches)
        {
            return false;
        }

        consumed++;
    }

    return consumed == subject.Length;
}
}