diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/StreamDeletionMeta.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/StreamDeletionMeta.cs
index 4e7af96..883e2da 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/StreamDeletionMeta.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/StreamDeletionMeta.cs
@@ -74,6 +74,21 @@ public sealed class StreamDeletionMeta
return false;
}
+ ///
+ /// Tries to get the pending entry for .
+ ///
+ public bool TryGetPending(ulong seq, out SdmBySeq entry) => _pending.TryGetValue(seq, out entry);
+
+ ///
+ /// Sets the pending entry for .
+ ///
+ public void SetPending(ulong seq, SdmBySeq entry) => _pending[seq] = entry;
+
+ ///
+ /// Returns the pending count for , or 0 if not tracked.
+ ///
+ public ulong GetSubjectTotal(string subj) => _totals.TryGetValue(subj, out var cnt) ? cnt : 0;
+
///
/// Clears all tracked data.
/// Mirrors SDMMeta.empty.
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs
index c910a2b..8cb1bdc 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs
@@ -1096,6 +1096,14 @@ public sealed class SubscriptionIndex
return false;
}
+ // Write lock must be held.
+ private Exception? AddInsertNotify(string subject, Action notify)
+ => AddNotify(_notify!.Insert, subject, notify);
+
+ // Write lock must be held.
+ private Exception? AddRemoveNotify(string subject, Action notify)
+ => AddNotify(_notify!.Remove, subject, notify);
+
private static Exception? AddNotify(Dictionary>> m, string subject, Action notify)
{
if (m.TryGetValue(subject, out var chs))
@@ -1531,6 +1539,9 @@ public sealed class SubscriptionIndex
public List? PList;
public SublistLevel? Next;
+ /// Factory method matching Go's newNode().
+ public static SublistNode NewNode() => new();
+
public bool IsEmpty()
{
return PSubs.Count == 0 && (QSubs == null || QSubs.Count == 0) &&
@@ -1544,6 +1555,9 @@ public sealed class SubscriptionIndex
public SublistNode? Pwc;
public SublistNode? Fwc;
+ /// Factory method matching Go's newLevel().
+ public static SublistLevel NewLevel() => new();
+
public int NumNodes()
{
var num = Nodes.Count;
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs
index f1ef9c6..0997c71 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs
@@ -243,6 +243,51 @@ public sealed class SubjectTransform : ISubjectTransformer
public static (SubjectTransform? transform, Exception? err) NewStrict(string src, string dest) =>
NewWithStrict(src, dest, true);
+ ///
+ /// Validates a subject mapping destination. Checks each token for valid syntax,
+ /// validates mustache-style mapping functions against known regexes, then verifies
+ /// the full transform can be created. Mirrors Go's ValidateMapping.
+ ///
+ public static Exception? ValidateMapping(string src, string dest)
+ {
+ if (string.IsNullOrEmpty(dest))
+ return null;
+
+ bool sfwc = false;
+ foreach (var t in dest.Split(SubjectTokens.Btsep))
+ {
+ var length = t.Length;
+ if (length == 0 || sfwc)
+ return new MappingDestinationException(t, ServerErrors.ErrInvalidMappingDestinationSubject);
+
+ // If it looks like a mapping function, validate against known patterns.
+ if (length > 4 && t[0] == '{' && t[1] == '{' && t[length - 2] == '}' && t[length - 1] == '}')
+ {
+ if (!PartitionRe.IsMatch(t) &&
+ !WildcardRe.IsMatch(t) &&
+ !SplitFromLeftRe.IsMatch(t) &&
+ !SplitFromRightRe.IsMatch(t) &&
+ !SliceFromLeftRe.IsMatch(t) &&
+ !SliceFromRightRe.IsMatch(t) &&
+ !SplitRe.IsMatch(t) &&
+ !RandomRe.IsMatch(t))
+ {
+ return new MappingDestinationException(t, ServerErrors.ErrUnknownMappingDestinationFunction);
+ }
+ continue;
+ }
+
+ if (length == 1 && t[0] == SubjectTokens.Fwc)
+ sfwc = true;
+ else if (t.AsSpan().ContainsAny("\t\n\f\r "))
+ return ServerErrors.ErrInvalidMappingDestinationSubject;
+ }
+
+ // Verify that the transform can actually be created.
+ var (_, err) = New(src, dest);
+ return err;
+ }
+
///
/// Attempts to match a published subject against the source pattern.
/// Returns the transformed subject or an error.
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs
index 5eb4aa5..658ffd0 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs
@@ -291,3 +291,109 @@ internal sealed class JsaUsage
/// Mirrors keyGen in server/jetstream.go.
///
public delegate byte[] KeyGen(byte[] context);
+
+// ---------------------------------------------------------------------------
+// JetStream message header helpers
+// ---------------------------------------------------------------------------
+
+///
+/// Static helpers for extracting TTL, scheduling, and scheduler information
+/// from JetStream message headers.
+/// Mirrors getMessageTTL, nextMessageSchedule, getMessageScheduler
+/// in server/stream.go.
+///
+public static class JetStreamHeaderHelpers
+{
+ ///
+ /// Extracts the TTL value (in seconds) from the message header.
+ /// Returns 0 if no TTL header is present. Returns -1 for "never".
+ /// Mirrors Go getMessageTTL.
+ ///
+ public static (long Ttl, Exception? Error) GetMessageTtl(byte[] hdr)
+ {
+ var raw = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsMessageTtl, hdr);
+ if (raw == null || raw.Length == 0)
+ return (0, null);
+
+ return ParseMessageTtl(System.Text.Encoding.ASCII.GetString(raw));
+ }
+
+ ///
+ /// Parses a TTL string value into seconds.
+ /// Supports "never" (-1), Go-style duration strings ("30s", "5m"), or plain integer seconds.
+ /// Mirrors Go parseMessageTTL.
+ ///
+ public static (long Ttl, Exception? Error) ParseMessageTtl(string ttl)
+ {
+ if (string.Equals(ttl, "never", StringComparison.OrdinalIgnoreCase))
+ return (-1, null);
+
+ // Try parsing as a Go-style duration.
+ if (TryParseDuration(ttl, out var dur))
+ {
+ if (dur.TotalSeconds < 1)
+ return (0, new InvalidOperationException("message TTL invalid"));
+ return ((long)dur.TotalSeconds, null);
+ }
+
+ // Try as plain integer (seconds).
+ if (long.TryParse(ttl, out var t))
+ {
+ if (t < 0)
+ return (0, new InvalidOperationException("message TTL invalid"));
+ return (t, null);
+ }
+
+ return (0, new InvalidOperationException("message TTL invalid"));
+ }
+
+ ///
+ /// Extracts the next scheduled fire time from the message header.
+ /// Returns (DateTime, true) if valid, (default, true) if no header, (default, false) on parse error.
+ /// Mirrors Go nextMessageSchedule.
+ ///
+ public static (DateTime Schedule, bool Ok) NextMessageSchedule(byte[] hdr, long ts)
+ {
+ if (hdr.Length == 0)
+ return (default, true);
+
+ var slice = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsSchedulePattern, hdr);
+ if (slice == null || slice.Value.Length == 0)
+ return (default, true);
+
+ var val = System.Text.Encoding.ASCII.GetString(slice.Value.Span);
+ var (schedule, _, ok) = Internal.MsgScheduling.ParseMsgSchedule(val, ts);
+ return (schedule, ok);
+ }
+
+ ///
+ /// Extracts the scheduler identifier from the message header.
+ /// Returns empty string if not present.
+ /// Mirrors Go getMessageScheduler.
+ ///
+ public static string GetMessageScheduler(byte[] hdr)
+ {
+ if (hdr.Length == 0)
+ return string.Empty;
+
+ var raw = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduler, hdr);
+ if (raw == null || raw.Length == 0)
+ return string.Empty;
+
+ return System.Text.Encoding.ASCII.GetString(raw);
+ }
+
+ private static bool TryParseDuration(string s, out TimeSpan result)
+ {
+ result = default;
+ if (s.EndsWith("ms", StringComparison.Ordinal) && double.TryParse(s[..^2], out var ms))
+ { result = TimeSpan.FromMilliseconds(ms); return true; }
+ if (s.EndsWith('s') && double.TryParse(s[..^1], out var sec))
+ { result = TimeSpan.FromSeconds(sec); return true; }
+ if (s.EndsWith('m') && double.TryParse(s[..^1], out var min))
+ { result = TimeSpan.FromMinutes(min); return true; }
+ if (s.EndsWith('h') && double.TryParse(s[..^1], out var hr))
+ { result = TimeSpan.FromHours(hr); return true; }
+ return TimeSpan.TryParse(s, out result);
+ }
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs
index 79f2260..ff828cb 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs
@@ -14,6 +14,7 @@
// Adapted from server/memstore.go
using System.Text;
+using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
@@ -57,6 +58,18 @@ public sealed class JetStreamMemStore : IStreamStore
// Consumer count
private int _consumers;
+ // TTL hash wheel (only created when cfg.AllowMsgTTL)
+ private HashWheel? _ttls;
+
+ // Message scheduling (only created when cfg.AllowMsgSchedules)
+ private MsgScheduling? _scheduling;
+
+ // Subject deletion metadata for cluster consensus
+ private StreamDeletionMeta _sdm = new();
+
+ // Guard against re-entrant age check
+ private bool _ageChkRun;
+
// -----------------------------------------------------------------------
// Constructor
// -----------------------------------------------------------------------
@@ -83,6 +96,11 @@ public sealed class JetStreamMemStore : IStreamStore
_state.LastSeq = cfg.FirstSeq - 1;
_state.FirstSeq = cfg.FirstSeq;
}
+
+ if (cfg.AllowMsgTTL)
+ _ttls = HashWheel.NewHashWheel();
+ if (cfg.AllowMsgSchedules)
+ _scheduling = new MsgScheduling(RunMsgScheduling);
}
// -----------------------------------------------------------------------
@@ -225,9 +243,52 @@ public sealed class JetStreamMemStore : IStreamStore
EnforceMsgLimit();
EnforceBytesLimit();
- // Age check
- if (_ageChk == null && _cfg.MaxAge != TimeSpan.Zero)
+ // Per-message TTL tracking.
+ if (_ttls != null && ttl > 0)
+ {
+ var expires = ts + (ttl * 1_000_000_000L);
+ _ttls.Add(seq, expires);
+ }
+
+ // Age check timer management.
+ if (_ttls != null && ttl > 0)
+ {
+ ResetAgeChk(0);
+ }
+ else if (_ageChk == null && (_cfg.MaxAge > TimeSpan.Zero || _ttls != null))
+ {
StartAgeChk();
+ }
+
+ // Message scheduling.
+ if (_scheduling != null && hdr.Length > 0)
+ {
+ var (schedule, ok) = JetStreamHeaderHelpers.NextMessageSchedule(hdr, ts);
+ if (ok && schedule != default)
+ {
+ _scheduling.Add(seq, subject, schedule.Ticks * 100L);
+ }
+ else
+ {
+ _scheduling.RemoveSubject(subject);
+ }
+
+ // Check for a repeating schedule.
+ var scheduleNextSlice = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsScheduleNext, hdr);
+ if (scheduleNextSlice != null && scheduleNextSlice.Value.Length > 0)
+ {
+ var scheduleNext = Encoding.ASCII.GetString(scheduleNextSlice.Value.Span);
+ if (scheduleNext != NatsHeaderConstants.JsScheduleNextPurge)
+ {
+ var scheduler = JetStreamHeaderHelpers.GetMessageScheduler(hdr);
+ if (DateTime.TryParse(scheduleNext, null, System.Globalization.DateTimeStyles.RoundtripKind, out var next)
+ && !string.IsNullOrEmpty(scheduler))
+ {
+ _scheduling.Update(scheduler, next.ToUniversalTime().Ticks * 100L);
+ }
+ }
+ }
+ }
}
///
@@ -562,6 +623,17 @@ public sealed class JetStreamMemStore : IStreamStore
UpdateFirstSeq(seq);
RemoveSeqPerSubject(sm.Subject, seq);
+ // Remove TTL entry from hash wheel if applicable.
+ if (_ttls != null && sm.Hdr.Length > 0)
+ {
+ var (ttl, err) = JetStreamHeaderHelpers.GetMessageTtl(sm.Hdr);
+ if (err == null && ttl > 0)
+ {
+ var expires = sm.Ts + (ttl * 1_000_000_000L);
+ _ttls.Remove(seq, expires);
+ }
+ }
+
if (secure)
{
if (sm.Hdr.Length > 0)
@@ -1325,7 +1397,15 @@ public sealed class JetStreamMemStore : IStreamStore
///
public void ResetState()
{
- // For memory store, nothing to reset.
+ _mu.EnterWriteLock();
+ try
+ {
+ _scheduling?.ClearInflight();
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
}
// -----------------------------------------------------------------------
@@ -1412,6 +1492,571 @@ public sealed class JetStreamMemStore : IStreamStore
}
}
+ // -----------------------------------------------------------------------
+ // Size helpers (static)
+ // -----------------------------------------------------------------------
+
+ ///
+ /// Computes raw message size from component lengths.
+ /// Mirrors Go memStoreMsgSizeRaw.
+ ///
+ internal static ulong MemStoreMsgSizeRaw(int slen, int hlen, int mlen)
+ => (ulong)(slen + hlen + mlen + 16);
+
+ ///
+ /// Computes message size from actual values.
+ /// Mirrors Go memStoreMsgSize (the package-level function).
+ ///
+ internal static ulong MemStoreMsgSize(string subj, byte[]? hdr, byte[]? msg)
+ => MemStoreMsgSizeRaw(subj.Length, hdr?.Length ?? 0, msg?.Length ?? 0);
+
+ // -----------------------------------------------------------------------
+ // Trivial helpers
+ // -----------------------------------------------------------------------
+
+ // Lock must be held.
+ private bool DeleteFirstMsg() => RemoveMsgLocked(_state.FirstSeq, false);
+
+ // Lock must be held.
+ private void DeleteFirstMsgOrPanic()
+ {
+ if (!DeleteFirstMsg())
+ throw new InvalidOperationException("jetstream memstore has inconsistent state, can't find first seq msg");
+ }
+
+ // Lock must be held.
+ private void CancelAgeChk()
+ {
+ if (_ageChk != null)
+ {
+ _ageChk.Dispose();
+ _ageChk = null;
+ _ageChkTime = 0;
+ }
+ }
+
+ ///
+ /// Returns true if a linear scan is preferable over subject tree lookup.
+ /// Mirrors Go shouldLinearScan.
+ ///
+ // Lock must be held.
+ private bool ShouldLinearScan(string filter, bool wc, ulong start)
+ {
+ const int LinearScanMaxFss = 256;
+ var isAll = filter == ">";
+ return isAll || 2 * (int)(_state.LastSeq - start) < _fss.Size() || (wc && _fss.Size() > LinearScanMaxFss);
+ }
+
+ ///
+ /// Returns true if the store is closed.
+ /// Mirrors Go isClosed.
+ ///
+ public bool IsClosed()
+ {
+ _mu.EnterReadLock();
+ try { return _msgs == null; }
+ finally { _mu.ExitReadLock(); }
+ }
+
+ ///
+ /// Checks if the filter represents all subjects (empty or ">").
+ /// Mirrors Go filterIsAll.
+ ///
+ // Lock must be held.
+ private static bool FilterIsAll(string filter)
+ => string.IsNullOrEmpty(filter) || filter == ">";
+
+ // -----------------------------------------------------------------------
+ // Low-complexity helpers
+ // -----------------------------------------------------------------------
+
+ ///
+ /// Returns per-subject message totals matching the filter.
+ /// Mirrors Go subjectsTotalsLocked.
+ ///
+ // Lock must be held.
+ private Dictionary SubjectsTotalsLocked(string filterSubject)
+ {
+ if (_fss.Size() == 0)
+ return new Dictionary();
+
+ if (string.IsNullOrEmpty(filterSubject))
+ filterSubject = ">";
+
+ var isAll = filterSubject == ">";
+ var result = new Dictionary();
+ _fss.Match(Encoding.UTF8.GetBytes(filterSubject), (subj, ss) =>
+ {
+ result[Encoding.UTF8.GetString(subj)] = ss.Msgs;
+ return true;
+ });
+ return result;
+ }
+
+ ///
+ /// Finds literal subject match sequence bounds.
+ /// Returns (first, last, true) if found, or (0, 0, false) if not.
+ /// Mirrors Go nextLiteralMatchLocked.
+ ///
+ // Lock must be held.
+ private (ulong First, ulong Last, bool Found) NextLiteralMatchLocked(string filter, ulong start)
+ {
+ var (ss, ok) = _fss.Find(Encoding.UTF8.GetBytes(filter));
+ if (!ok || ss == null)
+ return (0, 0, false);
+
+ RecalculateForSubj(filter, ss);
+ if (start > ss.Last)
+ return (0, 0, false);
+
+ return (Math.Max(start, ss.First), ss.Last, true);
+ }
+
+ ///
+ /// Finds wildcard subject match sequence bounds using MatchUntil.
+ /// Mirrors Go nextWildcardMatchLocked.
+ ///
+ // Lock must be held.
+ private (ulong First, ulong Last, bool Found) NextWildcardMatchLocked(string filter, ulong start)
+ {
+ bool found = false;
+ ulong first = _state.LastSeq, last = 0;
+
+ _fss.MatchUntil(Encoding.UTF8.GetBytes(filter), (subj, ss) =>
+ {
+ RecalculateForSubj(Encoding.UTF8.GetString(subj), ss);
+
+ if (start > ss.Last)
+ return true;
+
+ found = true;
+ if (ss.First < first)
+ first = ss.First;
+ if (ss.Last > last)
+ last = ss.Last;
+
+ return first > start;
+ });
+
+ if (!found)
+ return (0, 0, false);
+
+ return (Math.Max(first, start), last, true);
+ }
+
+ // -----------------------------------------------------------------------
+ // SDM methods
+ // -----------------------------------------------------------------------
+
+ ///
+ /// Determines whether this sequence/subject should be processed as a subject deletion marker.
+ /// Returns (isLast, shouldProcess).
+ /// Mirrors Go shouldProcessSdmLocked.
+ ///
+ // Lock must be held.
+ private (bool IsLast, bool ShouldProcess) ShouldProcessSdmLocked(ulong seq, string subj)
+ {
+ if (_sdm.TryGetPending(seq, out var p))
+ {
+ var elapsed = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L - p.Ts;
+ if (elapsed < 2_000_000_000L) // 2 seconds in nanoseconds
+ return (p.Last, false);
+
+ var last = p.Last;
+ if (last)
+ {
+ var msgs = SubjectsTotalsLocked(subj).GetValueOrDefault(subj, 0UL);
+ var numPending = _sdm.GetSubjectTotal(subj);
+ if (msgs > numPending)
+ last = false;
+ }
+ _sdm.SetPending(seq, new SdmBySeq { Last = last, Ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L });
+ return (last, true);
+ }
+
+ var msgCount = SubjectsTotalsLocked(subj).GetValueOrDefault(subj, 0UL);
+ if (msgCount == 0)
+ return (false, true);
+
+ var pending = _sdm.GetSubjectTotal(subj);
+ var remaining = msgCount - pending;
+ return (_sdm.TrackPending(seq, subj, remaining == 1), true);
+ }
+
+ ///
+ /// Lock-wrapping version of ShouldProcessSdmLocked.
+ /// Mirrors Go shouldProcessSdm.
+ ///
+ public (bool IsLast, bool ShouldProcess) ShouldProcessSdm(ulong seq, string subj)
+ {
+ _mu.EnterWriteLock();
+ try { return ShouldProcessSdmLocked(seq, subj); }
+ finally { _mu.ExitWriteLock(); }
+ }
+
+ ///
+ /// Handles message removal: if SDM mode, builds a marker header and invokes _pmsgcb;
+ /// otherwise invokes _rmcb.
+ /// Mirrors Go handleRemovalOrSdm.
+ ///
+ public void HandleRemovalOrSdm(ulong seq, string subj, bool sdm, long sdmTtl)
+ {
+ if (sdm)
+ {
+ var hdr = Encoding.ASCII.GetBytes(
+ $"NATS/1.0\r\n{NatsHeaderConstants.JsMarkerReason}: {NatsHeaderConstants.JsMarkerReasonMaxAge}\r\n" +
+ $"{NatsHeaderConstants.JsMessageTtl}: {TimeSpan.FromSeconds(sdmTtl)}\r\n" +
+ $"{NatsHeaderConstants.JsMsgRollup}: {NatsHeaderConstants.JsMsgRollupSubject}\r\n\r\n");
+
+ // In Go this builds an inMsg and calls pmsgcb. We pass a synthetic StoreMsg.
+ var msg = new StoreMsg { Subject = subj, Hdr = hdr, Msg = Array.Empty(), Seq = 0, Ts = 0 };
+ _pmsgcb?.Invoke(msg);
+ }
+ else
+ {
+ _rmcb?.Invoke(seq);
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // Age/TTL methods
+ // -----------------------------------------------------------------------
+
+ ///
+ /// Resets or arms the age check timer based on TTL and MaxAge.
+ /// Mirrors Go resetAgeChk.
+ ///
+ // Lock must be held.
+ private void ResetAgeChk(long delta)
+ {
+ if (_ageChkRun)
+ return;
+
+ long next = long.MaxValue;
+ if (_ttls != null)
+ next = _ttls.GetNextExpiration(next);
+
+ if (_cfg.MaxAge <= TimeSpan.Zero && next == long.MaxValue)
+ {
+ CancelAgeChk();
+ return;
+ }
+
+ var fireIn = _cfg.MaxAge;
+
+ if (delta == 0 && _state.Msgs > 0)
+ {
+ var until = TimeSpan.FromSeconds(2);
+ if (fireIn == TimeSpan.Zero || until < fireIn)
+ fireIn = until;
+ }
+
+ if (next < long.MaxValue)
+ {
+ var nextTicks = DateTime.UnixEpoch.Ticks + next / 100L;
+ var nextUtc = new DateTime(Math.Max(nextTicks, DateTime.UnixEpoch.Ticks), DateTimeKind.Utc);
+ var until = nextUtc - DateTime.UtcNow;
+ if (fireIn == TimeSpan.Zero || until < fireIn)
+ fireIn = until;
+ }
+
+ if (delta > 0)
+ {
+ var deltaDur = TimeSpan.FromTicks(delta / 100L);
+ if (fireIn == TimeSpan.Zero || deltaDur < fireIn)
+ fireIn = deltaDur;
+ }
+
+ if (fireIn < TimeSpan.FromMilliseconds(250))
+ fireIn = TimeSpan.FromMilliseconds(250);
+
+ var expires = DateTime.UtcNow.Ticks + fireIn.Ticks;
+ if (_ageChkTime > 0 && expires > _ageChkTime)
+ return;
+
+ _ageChkTime = expires;
+ if (_ageChk != null)
+ _ageChk.Change(fireIn, Timeout.InfiniteTimeSpan);
+ else
+ _ageChk = new Timer(_ => ExpireMsgs(), null, fireIn, Timeout.InfiniteTimeSpan);
+ }
+
+ ///
+ /// Recovers TTL state from existing messages after restart.
+ /// Mirrors Go recoverTTLState.
+ ///
+ // Lock must be held.
+ private void RecoverTTLState()
+ {
+ _ttls = HashWheel.NewHashWheel();
+ if (_state.Msgs == 0)
+ return;
+
+ try
+ {
+ var smp = new StoreMsg();
+ var seq = _state.FirstSeq;
+ while (seq <= _state.LastSeq)
+ {
+ if (_msgs != null && _msgs.TryGetValue(seq, out var sm) && sm != null)
+ {
+ if (sm.Hdr.Length > 0)
+ {
+ var (ttl, _) = JetStreamHeaderHelpers.GetMessageTtl(sm.Hdr);
+ if (ttl > 0)
+ {
+ var expires = sm.Ts + (ttl * 1_000_000_000L);
+ _ttls.Add(seq, expires);
+ }
+ }
+ }
+ seq++;
+ }
+ }
+ finally
+ {
+ ResetAgeChk(0);
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // Scheduling methods
+ // -----------------------------------------------------------------------
+
+ ///
+ /// Recovers message scheduling state from existing messages after restart.
+ /// Mirrors Go recoverMsgSchedulingState.
+ ///
+ // Lock must be held.
+ private void RecoverMsgSchedulingState()
+ {
+ _scheduling = new MsgScheduling(RunMsgScheduling);
+ if (_state.Msgs == 0)
+ return;
+
+ try
+ {
+ var seq = _state.FirstSeq;
+ while (seq <= _state.LastSeq)
+ {
+ if (_msgs != null && _msgs.TryGetValue(seq, out var sm) && sm != null)
+ {
+ if (sm.Hdr.Length > 0)
+ {
+ var (schedule, ok) = JetStreamHeaderHelpers.NextMessageSchedule(sm.Hdr, sm.Ts);
+ if (ok && schedule != default)
+ {
+ _scheduling.Init(seq, sm.Subject, schedule.Ticks * 100L);
+ }
+ }
+ }
+ seq++;
+ }
+ }
+ finally
+ {
+ _scheduling.ResetTimer();
+ }
+ }
+
+ ///
+ /// Runs through scheduled messages and fires callbacks.
+ /// Mirrors Go runMsgScheduling.
+ ///
+ private void RunMsgScheduling()
+ {
+ _mu.EnterWriteLock();
+ try
+ {
+ if (_scheduling == null)
+ return;
+ if (_pmsgcb == null)
+ {
+ _scheduling.ResetTimer();
+ return;
+ }
+
+ // TODO: Implement getScheduledMessages integration when MsgScheduling
+ // supports the full callback-based message loading pattern.
+ // For now, reset the timer so scheduling continues to fire.
+ _scheduling.ResetTimer();
+ }
+ finally
+ {
+ if (_mu.IsWriteLockHeld)
+ _mu.ExitWriteLock();
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // Reset / Purge Internal
+ // -----------------------------------------------------------------------
+
+ ///
+ /// Completely resets the store. Clears all messages, state, fss, dmap, and sdm.
+ /// Mirrors Go reset.
+ ///
+ public Exception? Reset()
+ {
+ _mu.EnterWriteLock();
+ ulong purged = 0;
+ ulong bytes = 0;
+ StorageUpdateHandler? cb;
+ try
+ {
+ cb = _scb;
+ if (cb != null && _msgs != null)
+ {
+ foreach (var sm in _msgs.Values)
+ {
+ purged++;
+ bytes += MsgSize(sm.Subject, sm.Hdr, sm.Msg);
+ }
+ }
+
+ _state.FirstSeq = 0;
+ _state.FirstTime = default;
+ _state.LastSeq = 0;
+ _state.LastTime = DateTime.UtcNow;
+ _state.Msgs = 0;
+ _state.Bytes = 0;
+ _msgs = new Dictionary();
+ _fss.Reset();
+ _dmap = new SequenceSet();
+ _sdm.Empty();
+ }
+ finally
+ {
+ _mu.ExitWriteLock();
+ }
+
+ cb?.Invoke(-(long)purged, -(long)bytes, 0, string.Empty);
+ return null;
+ }
+
+ ///
+ /// Internal purge with configurable first-sequence.
+ /// Mirrors Go purge (the internal version).
+ ///
+ // This is the internal purge used by cluster/raft — differs from the public Purge().
+ private (ulong Purged, Exception? Error) PurgeInternal(ulong fseq)
+ {
+ _mu.EnterWriteLock();
+ ulong purged;
+ long bytes;
+ StorageUpdateHandler? cb;
+ try
+ {
+ purged = (ulong)(_msgs?.Count ?? 0);
+ cb = _scb;
+ bytes = (long)_state.Bytes;
+
+ if (fseq == 0)
+ fseq = _state.LastSeq + 1;
+ else if (fseq < _state.LastSeq)
+ {
+ _mu.ExitWriteLock();
+ return (0, new InvalidOperationException("partial purges not supported on memory store"));
+ }
+
+ _state.FirstSeq = fseq;
+ _state.LastSeq = fseq - 1;
+ _state.FirstTime = default;
+ _state.Bytes = 0;
+ _state.Msgs = 0;
+ if (_msgs != null)
+ _msgs = new Dictionary();
+ _fss.Reset();
+ _dmap = new SequenceSet();
+ _sdm.Empty();
+ }
+ finally
+ {
+ if (_mu.IsWriteLockHeld)
+ _mu.ExitWriteLock();
+ }
+
+ cb?.Invoke(-(long)purged, -bytes, 0, string.Empty);
+ return (purged, null);
+ }
+
+ ///
+ /// Internal compact with SDM tracking.
+ /// Mirrors Go compact (the internal version).
+ ///
+ private (ulong Purged, Exception? Error) CompactInternal(ulong seq)
+ {
+ if (seq == 0)
+ return Purge();
+
+ ulong purged = 0;
+ ulong bytes = 0;
+
+ _mu.EnterWriteLock();
+ StorageUpdateHandler? cb;
+ try
+ {
+ if (_state.FirstSeq > seq)
+ return (0, null);
+
+ cb = _scb;
+ if (seq <= _state.LastSeq)
+ {
+ var fseq = _state.FirstSeq;
+ for (var s = seq; s <= _state.LastSeq; s++)
+ {
+ if (_msgs != null && _msgs.TryGetValue(s, out var sm2) && sm2 != null)
+ {
+ _state.FirstSeq = s;
+ _state.FirstTime = DateTimeOffset.FromUnixTimeMilliseconds(sm2.Ts / 1_000_000L).UtcDateTime;
+ break;
+ }
+ }
+ for (var s = seq - 1; s >= fseq; s--)
+ {
+ if (_msgs != null && _msgs.TryGetValue(s, out var sm2) && sm2 != null)
+ {
+ bytes += MsgSize(sm2.Subject, sm2.Hdr, sm2.Msg);
+ purged++;
+ RemoveSeqPerSubject(sm2.Subject, s);
+ _msgs.Remove(s);
+ }
+ else if (!_dmap.IsEmpty)
+ {
+ _dmap.Delete(s);
+ }
+ if (s == 0) break;
+ }
+ if (purged > _state.Msgs) purged = _state.Msgs;
+ _state.Msgs -= purged;
+ if (bytes > _state.Bytes) bytes = _state.Bytes;
+ _state.Bytes -= bytes;
+ }
+ else
+ {
+ purged = (ulong)(_msgs?.Count ?? 0);
+ bytes = _state.Bytes;
+ _state.Bytes = 0;
+ _state.Msgs = 0;
+ _state.FirstSeq = seq;
+ _state.FirstTime = default;
+ _state.LastSeq = seq - 1;
+ _msgs = new Dictionary();
+ _fss.Reset();
+ _dmap = new SequenceSet();
+ _sdm.Empty();
+ }
+ }
+ finally
+ {
+ if (_mu.IsWriteLockHeld)
+ _mu.ExitWriteLock();
+ }
+
+ cb?.Invoke(-(long)purged, -(long)bytes, 0, string.Empty);
+ return (purged, null);
+ }
+
// -----------------------------------------------------------------------
// Private helpers
// -----------------------------------------------------------------------
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsMessageHeaders.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsMessageHeaders.cs
index 6d65536..d329068 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsMessageHeaders.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsMessageHeaders.cs
@@ -38,6 +38,32 @@ public static class NatsHeaderConstants
// Other commonly used headers.
public const string JsMsgId = "Nats-Msg-Id";
public const string JsMsgRollup = "Nats-Rollup";
+ public const string JsMsgSize = "Nats-Msg-Size";
+ public const string JsResponseType = "Nats-Response-Type";
+ public const string JsMessageTtl = "Nats-TTL";
+ public const string JsMarkerReason = "Nats-Marker-Reason";
+ public const string JsMessageIncr = "Nats-Incr";
+ public const string JsBatchId = "Nats-Batch-Id";
+ public const string JsBatchSeq = "Nats-Batch-Sequence";
+ public const string JsBatchCommit = "Nats-Batch-Commit";
+
+ // Scheduling headers.
+ public const string JsSchedulePattern = "Nats-Schedule";
+ public const string JsScheduleTtl = "Nats-Schedule-TTL";
+ public const string JsScheduleTarget = "Nats-Schedule-Target";
+ public const string JsScheduleSource = "Nats-Schedule-Source";
+ public const string JsScheduler = "Nats-Scheduler";
+ public const string JsScheduleNext = "Nats-Schedule-Next";
+ public const string JsScheduleNextPurge = "purge";
+
+ // Rollup values.
+ public const string JsMsgRollupSubject = "sub";
+ public const string JsMsgRollupAll = "all";
+
+ // Marker reasons.
+ public const string JsMarkerReasonMaxAge = "MaxAge";
+ public const string JsMarkerReasonPurge = "Purge";
+ public const string JsMarkerReasonRemove = "Remove";
}
///
diff --git a/porting.db b/porting.db
index 1f330f3..797f28e 100644
Binary files a/porting.db and b/porting.db differ
diff --git a/reports/current.md b/reports/current.md
index 8a1f25d..6b23774 100644
--- a/reports/current.md
+++ b/reports/current.md
@@ -1,6 +1,6 @@
# NATS .NET Porting Status Report
-Generated: 2026-02-27 11:11:12 UTC
+Generated: 2026-02-27 11:19:48 UTC
## Modules (12 total)
@@ -12,10 +12,10 @@ Generated: 2026-02-27 11:11:12 UTC
| Status | Count |
|--------|-------|
-| deferred | 2493 |
+| deferred | 2463 |
| n_a | 18 |
-| stub | 168 |
-| verified | 994 |
+| stub | 166 |
+| verified | 1026 |
## Unit Tests (3257 total)
@@ -35,4 +35,4 @@ Generated: 2026-02-27 11:11:12 UTC
## Overall Progress
-**1601/6942 items complete (23.1%)**
+**1633/6942 items complete (23.5%)**
diff --git a/reports/report_4e61314.md b/reports/report_4e61314.md
new file mode 100644
index 0000000..6b23774
--- /dev/null
+++ b/reports/report_4e61314.md
@@ -0,0 +1,38 @@
+# NATS .NET Porting Status Report
+
+Generated: 2026-02-27 11:19:48 UTC
+
+## Modules (12 total)
+
+| Status | Count |
+|--------|-------|
+| verified | 12 |
+
+## Features (3673 total)
+
+| Status | Count |
+|--------|-------|
+| deferred | 2463 |
+| n_a | 18 |
+| stub | 166 |
+| verified | 1026 |
+
+## Unit Tests (3257 total)
+
+| Status | Count |
+|--------|-------|
+| deferred | 2662 |
+| n_a | 187 |
+| stub | 18 |
+| verified | 390 |
+
+## Library Mappings (36 total)
+
+| Status | Count |
+|--------|-------|
+| mapped | 36 |
+
+
+## Overall Progress
+
+**1633/6942 items complete (23.5%)**