From ba4f41cf71ac26aef4c3040bd3220f00810abca1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 27 Feb 2026 06:19:47 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20implement=20SubscriptionIndex=20+=20Jet?= =?UTF-8?q?StreamMemStore=20cluster=20=E2=80=94=2039=20features=20verified?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add SubscriptionIndex factory methods, notification wrappers, and ValidateMapping. Implement 24 MemStore methods (TTL, scheduling, SDM, age-check, purge/compact/reset) with JetStream header helpers and constants. Verified features: 987 → 1026. --- .../DataStructures/StreamDeletionMeta.cs | 15 + .../DataStructures/SubscriptionIndex.cs | 14 + .../Internal/SubjectTransform.cs | 45 ++ .../JetStream/JetStreamTypes.cs | 106 +++ .../JetStream/MemStore.cs | 651 +++++++++++++++++- .../NatsMessageHeaders.cs | 26 + porting.db | Bin 3403776 -> 3403776 bytes reports/current.md | 10 +- reports/report_4e61314.md | 38 + 9 files changed, 897 insertions(+), 8 deletions(-) create mode 100644 reports/report_4e61314.md diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/StreamDeletionMeta.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/StreamDeletionMeta.cs index 4e7af96..883e2da 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/StreamDeletionMeta.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/StreamDeletionMeta.cs @@ -74,6 +74,21 @@ public sealed class StreamDeletionMeta return false; } + /// + /// Tries to get the pending entry for . + /// + public bool TryGetPending(ulong seq, out SdmBySeq entry) => _pending.TryGetValue(seq, out entry); + + /// + /// Sets the pending entry for . + /// + public void SetPending(ulong seq, SdmBySeq entry) => _pending[seq] = entry; + + /// + /// Returns the pending count for , or 0 if not tracked. + /// + public ulong GetSubjectTotal(string subj) => _totals.TryGetValue(subj, out var cnt) ? cnt : 0; + /// /// Clears all tracked data. /// Mirrors SDMMeta.empty. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs index c910a2b..8cb1bdc 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs @@ -1096,6 +1096,14 @@ public sealed class SubscriptionIndex return false; } + // Write lock must be held. + private Exception? AddInsertNotify(string subject, Action notify) + => AddNotify(_notify!.Insert, subject, notify); + + // Write lock must be held. + private Exception? AddRemoveNotify(string subject, Action notify) + => AddNotify(_notify!.Remove, subject, notify); + private static Exception? AddNotify(Dictionary>> m, string subject, Action notify) { if (m.TryGetValue(subject, out var chs)) @@ -1531,6 +1539,9 @@ public sealed class SubscriptionIndex public List? PList; public SublistLevel? Next; + /// Factory method matching Go's newNode(). + public static SublistNode NewNode() => new(); + public bool IsEmpty() { return PSubs.Count == 0 && (QSubs == null || QSubs.Count == 0) && @@ -1544,6 +1555,9 @@ public sealed class SubscriptionIndex public SublistNode? Pwc; public SublistNode? Fwc; + /// Factory method matching Go's newLevel(). + public static SublistLevel NewLevel() => new(); + public int NumNodes() { var num = Nodes.Count; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs index f1ef9c6..0997c71 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs @@ -243,6 +243,51 @@ public sealed class SubjectTransform : ISubjectTransformer public static (SubjectTransform? transform, Exception? err) NewStrict(string src, string dest) => NewWithStrict(src, dest, true); + /// + /// Validates a subject mapping destination. Checks each token for valid syntax, + /// validates mustache-style mapping functions against known regexes, then verifies + /// the full transform can be created. Mirrors Go's ValidateMapping. + /// + public static Exception? ValidateMapping(string src, string dest) + { + if (string.IsNullOrEmpty(dest)) + return null; + + bool sfwc = false; + foreach (var t in dest.Split(SubjectTokens.Btsep)) + { + var length = t.Length; + if (length == 0 || sfwc) + return new MappingDestinationException(t, ServerErrors.ErrInvalidMappingDestinationSubject); + + // If it looks like a mapping function, validate against known patterns. + if (length > 4 && t[0] == '{' && t[1] == '{' && t[length - 2] == '}' && t[length - 1] == '}') + { + if (!PartitionRe.IsMatch(t) && + !WildcardRe.IsMatch(t) && + !SplitFromLeftRe.IsMatch(t) && + !SplitFromRightRe.IsMatch(t) && + !SliceFromLeftRe.IsMatch(t) && + !SliceFromRightRe.IsMatch(t) && + !SplitRe.IsMatch(t) && + !RandomRe.IsMatch(t)) + { + return new MappingDestinationException(t, ServerErrors.ErrUnknownMappingDestinationFunction); + } + continue; + } + + if (length == 1 && t[0] == SubjectTokens.Fwc) + sfwc = true; + else if (t.AsSpan().ContainsAny("\t\n\f\r ")) + return ServerErrors.ErrInvalidMappingDestinationSubject; + } + + // Verify that the transform can actually be created. + var (_, err) = New(src, dest); + return err; + } + /// /// Attempts to match a published subject against the source pattern. /// Returns the transformed subject or an error. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs index 5eb4aa5..658ffd0 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs @@ -291,3 +291,109 @@ internal sealed class JsaUsage /// Mirrors keyGen in server/jetstream.go. /// public delegate byte[] KeyGen(byte[] context); + +// --------------------------------------------------------------------------- +// JetStream message header helpers +// --------------------------------------------------------------------------- + +/// +/// Static helpers for extracting TTL, scheduling, and scheduler information +/// from JetStream message headers. +/// Mirrors getMessageTTL, nextMessageSchedule, getMessageScheduler +/// in server/stream.go. +/// +public static class JetStreamHeaderHelpers +{ + /// + /// Extracts the TTL value (in seconds) from the message header. + /// Returns 0 if no TTL header is present. Returns -1 for "never". + /// Mirrors Go getMessageTTL. + /// + public static (long Ttl, Exception? Error) GetMessageTtl(byte[] hdr) + { + var raw = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsMessageTtl, hdr); + if (raw == null || raw.Length == 0) + return (0, null); + + return ParseMessageTtl(System.Text.Encoding.ASCII.GetString(raw)); + } + + /// + /// Parses a TTL string value into seconds. + /// Supports "never" (-1), Go-style duration strings ("30s", "5m"), or plain integer seconds. + /// Mirrors Go parseMessageTTL. + /// + public static (long Ttl, Exception? Error) ParseMessageTtl(string ttl) + { + if (string.Equals(ttl, "never", StringComparison.OrdinalIgnoreCase)) + return (-1, null); + + // Try parsing as a Go-style duration. + if (TryParseDuration(ttl, out var dur)) + { + if (dur.TotalSeconds < 1) + return (0, new InvalidOperationException("message TTL invalid")); + return ((long)dur.TotalSeconds, null); + } + + // Try as plain integer (seconds). + if (long.TryParse(ttl, out var t)) + { + if (t < 0) + return (0, new InvalidOperationException("message TTL invalid")); + return (t, null); + } + + return (0, new InvalidOperationException("message TTL invalid")); + } + + /// + /// Extracts the next scheduled fire time from the message header. + /// Returns (DateTime, true) if valid, (default, true) if no header, (default, false) on parse error. + /// Mirrors Go nextMessageSchedule. + /// + public static (DateTime Schedule, bool Ok) NextMessageSchedule(byte[] hdr, long ts) + { + if (hdr.Length == 0) + return (default, true); + + var slice = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsSchedulePattern, hdr); + if (slice == null || slice.Value.Length == 0) + return (default, true); + + var val = System.Text.Encoding.ASCII.GetString(slice.Value.Span); + var (schedule, _, ok) = Internal.MsgScheduling.ParseMsgSchedule(val, ts); + return (schedule, ok); + } + + /// + /// Extracts the scheduler identifier from the message header. + /// Returns empty string if not present. + /// Mirrors Go getMessageScheduler. + /// + public static string GetMessageScheduler(byte[] hdr) + { + if (hdr.Length == 0) + return string.Empty; + + var raw = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduler, hdr); + if (raw == null || raw.Length == 0) + return string.Empty; + + return System.Text.Encoding.ASCII.GetString(raw); + } + + private static bool TryParseDuration(string s, out TimeSpan result) + { + result = default; + if (s.EndsWith("ms", StringComparison.Ordinal) && double.TryParse(s[..^2], out var ms)) + { result = TimeSpan.FromMilliseconds(ms); return true; } + if (s.EndsWith('s') && double.TryParse(s[..^1], out var sec)) + { result = TimeSpan.FromSeconds(sec); return true; } + if (s.EndsWith('m') && double.TryParse(s[..^1], out var min)) + { result = TimeSpan.FromMinutes(min); return true; } + if (s.EndsWith('h') && double.TryParse(s[..^1], out var hr)) + { result = TimeSpan.FromHours(hr); return true; } + return TimeSpan.TryParse(s, out result); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs index 79f2260..ff828cb 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs @@ -14,6 +14,7 @@ // Adapted from server/memstore.go using System.Text; +using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; @@ -57,6 +58,18 @@ public sealed class JetStreamMemStore : IStreamStore // Consumer count private int _consumers; + // TTL hash wheel (only created when cfg.AllowMsgTTL) + private HashWheel? _ttls; + + // Message scheduling (only created when cfg.AllowMsgSchedules) + private MsgScheduling? _scheduling; + + // Subject deletion metadata for cluster consensus + private StreamDeletionMeta _sdm = new(); + + // Guard against re-entrant age check + private bool _ageChkRun; + // ----------------------------------------------------------------------- // Constructor // ----------------------------------------------------------------------- @@ -83,6 +96,11 @@ public sealed class JetStreamMemStore : IStreamStore _state.LastSeq = cfg.FirstSeq - 1; _state.FirstSeq = cfg.FirstSeq; } + + if (cfg.AllowMsgTTL) + _ttls = HashWheel.NewHashWheel(); + if (cfg.AllowMsgSchedules) + _scheduling = new MsgScheduling(RunMsgScheduling); } // ----------------------------------------------------------------------- @@ -225,9 +243,52 @@ public sealed class JetStreamMemStore : IStreamStore EnforceMsgLimit(); EnforceBytesLimit(); - // Age check - if (_ageChk == null && _cfg.MaxAge != TimeSpan.Zero) + // Per-message TTL tracking. + if (_ttls != null && ttl > 0) + { + var expires = ts + (ttl * 1_000_000_000L); + _ttls.Add(seq, expires); + } + + // Age check timer management. + if (_ttls != null && ttl > 0) + { + ResetAgeChk(0); + } + else if (_ageChk == null && (_cfg.MaxAge > TimeSpan.Zero || _ttls != null)) + { StartAgeChk(); + } + + // Message scheduling. + if (_scheduling != null && hdr.Length > 0) + { + var (schedule, ok) = JetStreamHeaderHelpers.NextMessageSchedule(hdr, ts); + if (ok && schedule != default) + { + _scheduling.Add(seq, subject, schedule.Ticks * 100L); + } + else + { + _scheduling.RemoveSubject(subject); + } + + // Check for a repeating schedule. + var scheduleNextSlice = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsScheduleNext, hdr); + if (scheduleNextSlice != null && scheduleNextSlice.Value.Length > 0) + { + var scheduleNext = Encoding.ASCII.GetString(scheduleNextSlice.Value.Span); + if (scheduleNext != NatsHeaderConstants.JsScheduleNextPurge) + { + var scheduler = JetStreamHeaderHelpers.GetMessageScheduler(hdr); + if (DateTime.TryParse(scheduleNext, null, System.Globalization.DateTimeStyles.RoundtripKind, out var next) + && !string.IsNullOrEmpty(scheduler)) + { + _scheduling.Update(scheduler, next.ToUniversalTime().Ticks * 100L); + } + } + } + } } /// @@ -562,6 +623,17 @@ public sealed class JetStreamMemStore : IStreamStore UpdateFirstSeq(seq); RemoveSeqPerSubject(sm.Subject, seq); + // Remove TTL entry from hash wheel if applicable. + if (_ttls != null && sm.Hdr.Length > 0) + { + var (ttl, err) = JetStreamHeaderHelpers.GetMessageTtl(sm.Hdr); + if (err == null && ttl > 0) + { + var expires = sm.Ts + (ttl * 1_000_000_000L); + _ttls.Remove(seq, expires); + } + } + if (secure) { if (sm.Hdr.Length > 0) @@ -1325,7 +1397,15 @@ public sealed class JetStreamMemStore : IStreamStore /// public void ResetState() { - // For memory store, nothing to reset. + _mu.EnterWriteLock(); + try + { + _scheduling?.ClearInflight(); + } + finally + { + _mu.ExitWriteLock(); + } } // ----------------------------------------------------------------------- @@ -1412,6 +1492,571 @@ public sealed class JetStreamMemStore : IStreamStore } } + // ----------------------------------------------------------------------- + // Size helpers (static) + // ----------------------------------------------------------------------- + + /// + /// Computes raw message size from component lengths. + /// Mirrors Go memStoreMsgSizeRaw. + /// + internal static ulong MemStoreMsgSizeRaw(int slen, int hlen, int mlen) + => (ulong)(slen + hlen + mlen + 16); + + /// + /// Computes message size from actual values. + /// Mirrors Go memStoreMsgSize (the package-level function). + /// + internal static ulong MemStoreMsgSize(string subj, byte[]? hdr, byte[]? msg) + => MemStoreMsgSizeRaw(subj.Length, hdr?.Length ?? 0, msg?.Length ?? 0); + + // ----------------------------------------------------------------------- + // Trivial helpers + // ----------------------------------------------------------------------- + + // Lock must be held. + private bool DeleteFirstMsg() => RemoveMsgLocked(_state.FirstSeq, false); + + // Lock must be held. + private void DeleteFirstMsgOrPanic() + { + if (!DeleteFirstMsg()) + throw new InvalidOperationException("jetstream memstore has inconsistent state, can't find first seq msg"); + } + + // Lock must be held. + private void CancelAgeChk() + { + if (_ageChk != null) + { + _ageChk.Dispose(); + _ageChk = null; + _ageChkTime = 0; + } + } + + /// + /// Returns true if a linear scan is preferable over subject tree lookup. + /// Mirrors Go shouldLinearScan. + /// + // Lock must be held. + private bool ShouldLinearScan(string filter, bool wc, ulong start) + { + const int LinearScanMaxFss = 256; + var isAll = filter == ">"; + return isAll || 2 * (int)(_state.LastSeq - start) < _fss.Size() || (wc && _fss.Size() > LinearScanMaxFss); + } + + /// + /// Returns true if the store is closed. + /// Mirrors Go isClosed. + /// + public bool IsClosed() + { + _mu.EnterReadLock(); + try { return _msgs == null; } + finally { _mu.ExitReadLock(); } + } + + /// + /// Checks if the filter represents all subjects (empty or ">"). + /// Mirrors Go filterIsAll. + /// + // Lock must be held. + private static bool FilterIsAll(string filter) + => string.IsNullOrEmpty(filter) || filter == ">"; + + // ----------------------------------------------------------------------- + // Low-complexity helpers + // ----------------------------------------------------------------------- + + /// + /// Returns per-subject message totals matching the filter. + /// Mirrors Go subjectsTotalsLocked. + /// + // Lock must be held. + private Dictionary SubjectsTotalsLocked(string filterSubject) + { + if (_fss.Size() == 0) + return new Dictionary(); + + if (string.IsNullOrEmpty(filterSubject)) + filterSubject = ">"; + + var isAll = filterSubject == ">"; + var result = new Dictionary(); + _fss.Match(Encoding.UTF8.GetBytes(filterSubject), (subj, ss) => + { + result[Encoding.UTF8.GetString(subj)] = ss.Msgs; + return true; + }); + return result; + } + + /// + /// Finds literal subject match sequence bounds. + /// Returns (first, last, true) if found, or (0, 0, false) if not. + /// Mirrors Go nextLiteralMatchLocked. + /// + // Lock must be held. + private (ulong First, ulong Last, bool Found) NextLiteralMatchLocked(string filter, ulong start) + { + var (ss, ok) = _fss.Find(Encoding.UTF8.GetBytes(filter)); + if (!ok || ss == null) + return (0, 0, false); + + RecalculateForSubj(filter, ss); + if (start > ss.Last) + return (0, 0, false); + + return (Math.Max(start, ss.First), ss.Last, true); + } + + /// + /// Finds wildcard subject match sequence bounds using MatchUntil. + /// Mirrors Go nextWildcardMatchLocked. + /// + // Lock must be held. + private (ulong First, ulong Last, bool Found) NextWildcardMatchLocked(string filter, ulong start) + { + bool found = false; + ulong first = _state.LastSeq, last = 0; + + _fss.MatchUntil(Encoding.UTF8.GetBytes(filter), (subj, ss) => + { + RecalculateForSubj(Encoding.UTF8.GetString(subj), ss); + + if (start > ss.Last) + return true; + + found = true; + if (ss.First < first) + first = ss.First; + if (ss.Last > last) + last = ss.Last; + + return first > start; + }); + + if (!found) + return (0, 0, false); + + return (Math.Max(first, start), last, true); + } + + // ----------------------------------------------------------------------- + // SDM methods + // ----------------------------------------------------------------------- + + /// + /// Determines whether this sequence/subject should be processed as a subject deletion marker. + /// Returns (isLast, shouldProcess). + /// Mirrors Go shouldProcessSdmLocked. + /// + // Lock must be held. + private (bool IsLast, bool ShouldProcess) ShouldProcessSdmLocked(ulong seq, string subj) + { + if (_sdm.TryGetPending(seq, out var p)) + { + var elapsed = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L - p.Ts; + if (elapsed < 2_000_000_000L) // 2 seconds in nanoseconds + return (p.Last, false); + + var last = p.Last; + if (last) + { + var msgs = SubjectsTotalsLocked(subj).GetValueOrDefault(subj, 0UL); + var numPending = _sdm.GetSubjectTotal(subj); + if (msgs > numPending) + last = false; + } + _sdm.SetPending(seq, new SdmBySeq { Last = last, Ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L }); + return (last, true); + } + + var msgCount = SubjectsTotalsLocked(subj).GetValueOrDefault(subj, 0UL); + if (msgCount == 0) + return (false, true); + + var pending = _sdm.GetSubjectTotal(subj); + var remaining = msgCount - pending; + return (_sdm.TrackPending(seq, subj, remaining == 1), true); + } + + /// + /// Lock-wrapping version of ShouldProcessSdmLocked. + /// Mirrors Go shouldProcessSdm. + /// + public (bool IsLast, bool ShouldProcess) ShouldProcessSdm(ulong seq, string subj) + { + _mu.EnterWriteLock(); + try { return ShouldProcessSdmLocked(seq, subj); } + finally { _mu.ExitWriteLock(); } + } + + /// + /// Handles message removal: if SDM mode, builds a marker header and invokes _pmsgcb; + /// otherwise invokes _rmcb. + /// Mirrors Go handleRemovalOrSdm. + /// + public void HandleRemovalOrSdm(ulong seq, string subj, bool sdm, long sdmTtl) + { + if (sdm) + { + var hdr = Encoding.ASCII.GetBytes( + $"NATS/1.0\r\n{NatsHeaderConstants.JsMarkerReason}: {NatsHeaderConstants.JsMarkerReasonMaxAge}\r\n" + + $"{NatsHeaderConstants.JsMessageTtl}: {TimeSpan.FromSeconds(sdmTtl)}\r\n" + + $"{NatsHeaderConstants.JsMsgRollup}: {NatsHeaderConstants.JsMsgRollupSubject}\r\n\r\n"); + + // In Go this builds an inMsg and calls pmsgcb. We pass a synthetic StoreMsg. + var msg = new StoreMsg { Subject = subj, Hdr = hdr, Msg = Array.Empty(), Seq = 0, Ts = 0 }; + _pmsgcb?.Invoke(msg); + } + else + { + _rmcb?.Invoke(seq); + } + } + + // ----------------------------------------------------------------------- + // Age/TTL methods + // ----------------------------------------------------------------------- + + /// + /// Resets or arms the age check timer based on TTL and MaxAge. + /// Mirrors Go resetAgeChk. + /// + // Lock must be held. + private void ResetAgeChk(long delta) + { + if (_ageChkRun) + return; + + long next = long.MaxValue; + if (_ttls != null) + next = _ttls.GetNextExpiration(next); + + if (_cfg.MaxAge <= TimeSpan.Zero && next == long.MaxValue) + { + CancelAgeChk(); + return; + } + + var fireIn = _cfg.MaxAge; + + if (delta == 0 && _state.Msgs > 0) + { + var until = TimeSpan.FromSeconds(2); + if (fireIn == TimeSpan.Zero || until < fireIn) + fireIn = until; + } + + if (next < long.MaxValue) + { + var nextTicks = DateTime.UnixEpoch.Ticks + next / 100L; + var nextUtc = new DateTime(Math.Max(nextTicks, DateTime.UnixEpoch.Ticks), DateTimeKind.Utc); + var until = nextUtc - DateTime.UtcNow; + if (fireIn == TimeSpan.Zero || until < fireIn) + fireIn = until; + } + + if (delta > 0) + { + var deltaDur = TimeSpan.FromTicks(delta / 100L); + if (fireIn == TimeSpan.Zero || deltaDur < fireIn) + fireIn = deltaDur; + } + + if (fireIn < TimeSpan.FromMilliseconds(250)) + fireIn = TimeSpan.FromMilliseconds(250); + + var expires = DateTime.UtcNow.Ticks + fireIn.Ticks; + if (_ageChkTime > 0 && expires > _ageChkTime) + return; + + _ageChkTime = expires; + if (_ageChk != null) + _ageChk.Change(fireIn, Timeout.InfiniteTimeSpan); + else + _ageChk = new Timer(_ => ExpireMsgs(), null, fireIn, Timeout.InfiniteTimeSpan); + } + + /// + /// Recovers TTL state from existing messages after restart. + /// Mirrors Go recoverTTLState. + /// + // Lock must be held. + private void RecoverTTLState() + { + _ttls = HashWheel.NewHashWheel(); + if (_state.Msgs == 0) + return; + + try + { + var smp = new StoreMsg(); + var seq = _state.FirstSeq; + while (seq <= _state.LastSeq) + { + if (_msgs != null && _msgs.TryGetValue(seq, out var sm) && sm != null) + { + if (sm.Hdr.Length > 0) + { + var (ttl, _) = JetStreamHeaderHelpers.GetMessageTtl(sm.Hdr); + if (ttl > 0) + { + var expires = sm.Ts + (ttl * 1_000_000_000L); + _ttls.Add(seq, expires); + } + } + } + seq++; + } + } + finally + { + ResetAgeChk(0); + } + } + + // ----------------------------------------------------------------------- + // Scheduling methods + // ----------------------------------------------------------------------- + + /// + /// Recovers message scheduling state from existing messages after restart. + /// Mirrors Go recoverMsgSchedulingState. + /// + // Lock must be held. + private void RecoverMsgSchedulingState() + { + _scheduling = new MsgScheduling(RunMsgScheduling); + if (_state.Msgs == 0) + return; + + try + { + var seq = _state.FirstSeq; + while (seq <= _state.LastSeq) + { + if (_msgs != null && _msgs.TryGetValue(seq, out var sm) && sm != null) + { + if (sm.Hdr.Length > 0) + { + var (schedule, ok) = JetStreamHeaderHelpers.NextMessageSchedule(sm.Hdr, sm.Ts); + if (ok && schedule != default) + { + _scheduling.Init(seq, sm.Subject, schedule.Ticks * 100L); + } + } + } + seq++; + } + } + finally + { + _scheduling.ResetTimer(); + } + } + + /// + /// Runs through scheduled messages and fires callbacks. + /// Mirrors Go runMsgScheduling. + /// + private void RunMsgScheduling() + { + _mu.EnterWriteLock(); + try + { + if (_scheduling == null) + return; + if (_pmsgcb == null) + { + _scheduling.ResetTimer(); + return; + } + + // TODO: Implement getScheduledMessages integration when MsgScheduling + // supports the full callback-based message loading pattern. + // For now, reset the timer so scheduling continues to fire. + _scheduling.ResetTimer(); + } + finally + { + if (_mu.IsWriteLockHeld) + _mu.ExitWriteLock(); + } + } + + // ----------------------------------------------------------------------- + // Reset / Purge Internal + // ----------------------------------------------------------------------- + + /// + /// Completely resets the store. Clears all messages, state, fss, dmap, and sdm. + /// Mirrors Go reset. + /// + public Exception? Reset() + { + _mu.EnterWriteLock(); + ulong purged = 0; + ulong bytes = 0; + StorageUpdateHandler? cb; + try + { + cb = _scb; + if (cb != null && _msgs != null) + { + foreach (var sm in _msgs.Values) + { + purged++; + bytes += MsgSize(sm.Subject, sm.Hdr, sm.Msg); + } + } + + _state.FirstSeq = 0; + _state.FirstTime = default; + _state.LastSeq = 0; + _state.LastTime = DateTime.UtcNow; + _state.Msgs = 0; + _state.Bytes = 0; + _msgs = new Dictionary(); + _fss.Reset(); + _dmap = new SequenceSet(); + _sdm.Empty(); + } + finally + { + _mu.ExitWriteLock(); + } + + cb?.Invoke(-(long)purged, -(long)bytes, 0, string.Empty); + return null; + } + + /// + /// Internal purge with configurable first-sequence. + /// Mirrors Go purge (the internal version). + /// + // This is the internal purge used by cluster/raft — differs from the public Purge(). + private (ulong Purged, Exception? Error) PurgeInternal(ulong fseq) + { + _mu.EnterWriteLock(); + ulong purged; + long bytes; + StorageUpdateHandler? cb; + try + { + purged = (ulong)(_msgs?.Count ?? 0); + cb = _scb; + bytes = (long)_state.Bytes; + + if (fseq == 0) + fseq = _state.LastSeq + 1; + else if (fseq < _state.LastSeq) + { + _mu.ExitWriteLock(); + return (0, new InvalidOperationException("partial purges not supported on memory store")); + } + + _state.FirstSeq = fseq; + _state.LastSeq = fseq - 1; + _state.FirstTime = default; + _state.Bytes = 0; + _state.Msgs = 0; + if (_msgs != null) + _msgs = new Dictionary(); + _fss.Reset(); + _dmap = new SequenceSet(); + _sdm.Empty(); + } + finally + { + if (_mu.IsWriteLockHeld) + _mu.ExitWriteLock(); + } + + cb?.Invoke(-(long)purged, -bytes, 0, string.Empty); + return (purged, null); + } + + /// + /// Internal compact with SDM tracking. + /// Mirrors Go compact (the internal version). + /// + private (ulong Purged, Exception? Error) CompactInternal(ulong seq) + { + if (seq == 0) + return Purge(); + + ulong purged = 0; + ulong bytes = 0; + + _mu.EnterWriteLock(); + StorageUpdateHandler? cb; + try + { + if (_state.FirstSeq > seq) + return (0, null); + + cb = _scb; + if (seq <= _state.LastSeq) + { + var fseq = _state.FirstSeq; + for (var s = seq; s <= _state.LastSeq; s++) + { + if (_msgs != null && _msgs.TryGetValue(s, out var sm2) && sm2 != null) + { + _state.FirstSeq = s; + _state.FirstTime = DateTimeOffset.FromUnixTimeMilliseconds(sm2.Ts / 1_000_000L).UtcDateTime; + break; + } + } + for (var s = seq - 1; s >= fseq; s--) + { + if (_msgs != null && _msgs.TryGetValue(s, out var sm2) && sm2 != null) + { + bytes += MsgSize(sm2.Subject, sm2.Hdr, sm2.Msg); + purged++; + RemoveSeqPerSubject(sm2.Subject, s); + _msgs.Remove(s); + } + else if (!_dmap.IsEmpty) + { + _dmap.Delete(s); + } + if (s == 0) break; + } + if (purged > _state.Msgs) purged = _state.Msgs; + _state.Msgs -= purged; + if (bytes > _state.Bytes) bytes = _state.Bytes; + _state.Bytes -= bytes; + } + else + { + purged = (ulong)(_msgs?.Count ?? 0); + bytes = _state.Bytes; + _state.Bytes = 0; + _state.Msgs = 0; + _state.FirstSeq = seq; + _state.FirstTime = default; + _state.LastSeq = seq - 1; + _msgs = new Dictionary(); + _fss.Reset(); + _dmap = new SequenceSet(); + _sdm.Empty(); + } + } + finally + { + if (_mu.IsWriteLockHeld) + _mu.ExitWriteLock(); + } + + cb?.Invoke(-(long)purged, -(long)bytes, 0, string.Empty); + return (purged, null); + } + // ----------------------------------------------------------------------- // Private helpers // ----------------------------------------------------------------------- diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsMessageHeaders.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsMessageHeaders.cs index 6d65536..d329068 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsMessageHeaders.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsMessageHeaders.cs @@ -38,6 +38,32 @@ public static class NatsHeaderConstants // Other commonly used headers. public const string JsMsgId = "Nats-Msg-Id"; public const string JsMsgRollup = "Nats-Rollup"; + public const string JsMsgSize = "Nats-Msg-Size"; + public const string JsResponseType = "Nats-Response-Type"; + public const string JsMessageTtl = "Nats-TTL"; + public const string JsMarkerReason = "Nats-Marker-Reason"; + public const string JsMessageIncr = "Nats-Incr"; + public const string JsBatchId = "Nats-Batch-Id"; + public const string JsBatchSeq = "Nats-Batch-Sequence"; + public const string JsBatchCommit = "Nats-Batch-Commit"; + + // Scheduling headers. + public const string JsSchedulePattern = "Nats-Schedule"; + public const string JsScheduleTtl = "Nats-Schedule-TTL"; + public const string JsScheduleTarget = "Nats-Schedule-Target"; + public const string JsScheduleSource = "Nats-Schedule-Source"; + public const string JsScheduler = "Nats-Scheduler"; + public const string JsScheduleNext = "Nats-Schedule-Next"; + public const string JsScheduleNextPurge = "purge"; + + // Rollup values. + public const string JsMsgRollupSubject = "sub"; + public const string JsMsgRollupAll = "all"; + + // Marker reasons. + public const string JsMarkerReasonMaxAge = "MaxAge"; + public const string JsMarkerReasonPurge = "Purge"; + public const string JsMarkerReasonRemove = "Remove"; } /// diff --git a/porting.db b/porting.db index 1f330f322e6a03ec7bdbfc85f2fed400d235aa42..797f28e15bb69f027a25f7a24bd9f9fde72a2c35 100644 GIT binary patch delta 5800 zcmcIo4R91i7T%f7^vv#NdS*fhfv}qihVYl4%^$=F7*J9`kqAjZz#!RVCuAYnU3WJS zOLVgc!Gh9@XY?tG7g8J|2INPWJGt|6;3>xeiJa$IfFz>jDTPKr(r^(E63>(f;PbO{CiP8E5 zy570M>00IFbUKRW^=6tVaa21LJNp{sbyLYPq#yO~H~k)*-Bezb@TKmKXOb?Cf@9rO z%n-J}2w$bs>0;DN-IVBN>65ynG}eJNNt_CWbLnF=dj+P?qn}9Uqi^L09eboi_(76{ zXHU@>Pzil3T_3N@vn$oaY{kyaBCYX}*4?BvGSZqvTKKtAI$mZ^^%Ke>tv;b)T)Pgd zd8Kq)R16o@Z(zJO0B?1R@e2(-)QO1Jc+%>Kv?da%K4rPuoXA*?=AKAv$^gm9LG9%W z>G-JNQ^i?vnEI=Q^gJrY^kF}ZOGpc=mlx47DN$c*9=%v0#H(^6y`M^YpLvJ*9rF_N z6w}DqnFY*DCZ9=VER4v|QD134^-6&D&@|&wm#?KcDvH@bELypa&WU2qk*8hS6InE@ zezuObjG%geX{9|(1742H)s|M;O2LsWw0Zar^nZTpxh?ecsBw4eI4zzG`k}PfS6^w^ z7kS!?9@52XecWR5WK@f`(Ti9bC&Tug^y+ln)VGj)3oh<+9sgv1*YWF#B!-e*bWSy) z>khk=w+)bbfQ&^#fnQ8ok%i%lG&3Xf<_wrOb%5j)O;WwEi;ho97Ma63!Gk}=3-}{^ za&)6bC~4Ti;O=oOfu@swD~@Bz?5CBkj(TbUM@cT#(QdQ zUazb)IJ`cuTv_jv>l9y&%Pu=zKDpXe@3e>WZ-%qR=B!feAz6oWnOy2~d6W{{sv_^Q zIj*V|WN4McS0k5|&B^dq)hPD*S|a6_z4dOl%j3(kW=%}9W~F6Kk~1^&GxPGZGXtq& zm|AXxTJ+bqOl?-3yH=@FoIXOq;go$f4zKK0yk3XPDf`bgw)EQMm2GxA5%Ia?BBd_K zUbZ>y@@&Od>hmbJx|xd2u6SlDwQj}Zy^e2|HIOO{!uQe*`T7U*<&6m6R=(Ki?_K+T z4*v3ngl{un|E)Qgzuv)I>CfTozb+zfr-KWkHNHdr=0Kl+IkUE{On2EImPs7?eKj7} zD!JI@>$m7L8mbhxPYX|*FA|ICRo?u}?3_vAgzG2&Gi>nxcvKHJg`F-$vXGy8%TAwG zOL9mvx?eWhyneY@S(QORQp{21)1K}pyF{oZYbOOqRX9JdNx97Y+}yx$;~?aw+&npd z@9|-9c0ZS#VGweY2axmM%-;HokSpRea(->&U)8s+>j$vA8FJUeVQGEka;3^w=CL`w z)h@%q7#s>KZDP@JWDpI;04u@TDS+J3!5ei2Vqs1<} zqQ&YQWu;OplSBD7@~~UEwnkX}- zg`vLBlP$N*aj>{0M+U_s+a1;97)(mCUrsNcQ5HE>*y@ylD{H+oKRYX6in^Z8ledMA z{~A6wM@N4>ou;2g$A8U^_><_+*V7>C+a{C z^T&g*myR++&wG{_xbPxMXM;z(EUTLI5?Vt+Whpj8$7WO*>E~ZYPf!puBGf+=9)2vA zhp8_k1gCz362oE#T2VI(SzA#$@Y_*ZxErQzN7c}V*kCsX=XS)x`R#r4Kzx-5Y%vgl z&estGD^?gII-&DbG#gG@GR@#_K@wDKC6kMHpfM5ct9PKn(Bz9dP^#E#N}!^7lBA>V zo)7PBLUAztHDrU!s)^Y38Y+m)W!q5&>eWk$x1x1-e|y;F7}5ejw^YybU< zF){Ln$;~<18+h#vBO!mgQGzdaqLh%OWKKpzm3J4a3AsFIgW+{FAu@K?>-}sWAMQS| z8@)L)^!Cq>p=_zwsx>9eV`6kv?wa|q`yUo!)^YStXcNe?1QmB0SOi5UP;yv}+HeBR zND1l5_zyC{m9N5{%g$7jzedUW(BR6eC@(w-_FqNLQ1`rV`vphSw`d)rKG+}4zQx7~ zgvOnLx=iyJnp&f4Hk0O*z;HNmh&cNJ%mzYXU=Y|yaIHVZu>>4}fxt*$B8Vp7)l(d{ zZHJRzU>O=8H}jCb7Y~6)_TbSPDeXuBuk68#2MT0G#+L2HbuewMIW07=c`r7CWgkw| z+@?)lquX>BAvi0bho=h?V%5rh_`~6vvUI39j)M%X`8nCJ`8eJR_q?q)L&0ZQ0@sOt zV_Q$)ve1+wBThDkzK68h0cD+dUTEa+oj64!rF9?Z!d())_a)ANA3nyxWXR0V$%X85 z_#mA8Gl@j)9nYP|EX;gv7>1J^qs4S^XI;Pa6Ra-%L}*K8*(Tjt`VATyrwL0xXaw9+y~qd?oAGotf?ExeNHvF4PrEOREi5k?ZY&DG-b+&VI!7^ zrSR@TkymS%h`AJ%0WA-T7vS7NF%KG-ijBDc3_4A9R*MM~m7;D~CZxcP zA(njB9$`cs^jM{-7IIM_vc+rxo5d!xaZD=9q3_TIbQX1?LySc&m&vQt7m~zPdYJmQ zlt8t>`NL8IEP7kQ>ip4S`*=3`CEZurbwu-AG1*U#q%l8{dzowI)^I*zWHq;to6Sw; z(m9zUw<@@ty~2Kui`i4`F%pcs**3NX7vLIyO zjw)#qEV4@@;c@MsS}S4rxK_%8F?FN~brPn=z<=AN!|<*^R%vshSOxBJ5)Y5QDox;w z2CVDQJSU6vOlT3s9#Jy|Iq4qyr18A*jPV3m9unfX=%>hq!zfcMbhXm4&?wT3gp5~B zM@@e)?J~8Rer>Xv)r{cW7+prcGv zsSa=-mPV_cDdJWNn1j-I_~;$Nb}^*9DUD-nxJ6e8?c=0uwLU>i)LcQ;!UZfIj~HTK zPq{?`i6Dkx2th2tP=YvuVFd96!&S;XV(tf!^)xcW#bJ6QTzmmX!Ro?<$o8V{DNOL| z;ZT!)6tu?b8MuFTLU4!3%O{1cd3Hh%8B`KRfjlQcfcZrUqqI!b2G<`gPEd~)k39AR z#7q!m=(yJ`z+Z~Z+8!NDNBE}Lyg5`ANzTT>yZ4zj&B3t)bIomkf;EdsISuX^U4ms? zWK6W6jr<9oV9ES6@kBAk x0+}F*U=+a}1jz)W2~r5g5R4@lM=+jX0zoQ48bLZi27#3zlORj|zB^kl{txK-Zpr`v delta 2648 zcmZvdeNa`$6~OP^d)d41>)w4lK;Us7k8kChK%!|>h+`~TK-958Vk!`^exyhd#Y9Yj zRxpG#kE$H2*1$B4poj`JcS0tOL!FpZ<-=rbLn7}P8)IziplJX3$M4Rb zd-j|?yL`GGMy=+e&4L|mt=Lqt zxu$}s9LFV_1(jz`HmuuFzIHLpar3c(%A1_cBB*F>`8wvke63|BRJi#$BAXtwCu%$z zF1YzX8`7stSz4asI38=a-8`+7NH=#C$sBw}>3mpJh%TfHSwcJ!rMuD%X%yWTLP;ud zl0bBe6pK23ILf-8YJ4z{}AEDB7=>;pP>pK+PDH z;Y2eH)T2s-I8NL6D@1rvw}x%I zP?m4hGd0q#??R4n{uTtZqpE$`C`D^&M^0YEi*Yt|cOX^!q61}%vwqGI{Wc6jFxpbR zwan1>aMU|BBE|Yh%YPHClGwt~c^uWI`bTZqJ573Zn$+ZtaCr~PS~u-~=``t?X;Set zDSw)jH%*#9O`4}m+CO_xXcXe;RKj*PdQhWBX!MA0c>>;j2L^Uv6)N{)EBxaiP6O8f zB3kqzT+ajQ6ID3#JG{vEQ$CEFc&O?p-k-tn!b?FGn12{!7=0Z_>T{&FN)cf{3S-0o81BN00o}4rhFbClvnn`>6MT+$9>q_=ozuoi$KywFvR6S$T(hklpS==B z4&V@2a158io%ii*1IKWV&y;lIHE`+WfJw*ux^ae2G1!fFz(A(c=9d$0mI$JuUg z9Wh3GgCcWXS-N+N?mYs2&}p>7SI2Su!!3HPQhn~Kd+<;F&f0JSC;1d}Pq0Jzyp0{T zcgumtj7hq8@vW1%KLX0k>SSa$F5pZ^k01nMF5=}XI0WMu<)`e!t2-_#{W8OeWzAJ?7X5ORVk)#m#cmgj4 z^i;&;v=#T*k$N38-ov{vKYH3M{Yi4#0y+Ni`k!T}35d7PPD`k-b*b#vy!x;VH3{*- z@FQ7jh2L~J!h9)aP^3(h`DoK9kr;$wU>F%DhM8f}Mu}Y73B4^Q2aLxk1YC`(1gX8U z4R)N8&Y_=GWSsr46j_zN%k3tDa*eLGQND?LDo98~6@Aa+&c4*5H|4D=LrBJu$^DN079 z9|})eBcSd!9|*1tD}|C_Wh1;_NUyV&I1?%j)T}+WoVqYAH*6}S1_Heg|DIw z0>nOTT?n4lv?Xjjs$ZybND=n^YK|AsAu&w=%oU zMRs2{YQ2l>FA31_iljo}e%S;?OY9lmJk?s4*xdr4cjN@0&A|fuOmAv8oiDI!=L;gv z{}&pL>*Yt!*0Tj_vM=!`3e-cs7QhrCaBivUeO*%98G+?-p?c^&h#p{N(`z=YIA%>H z`4LPVXA&9&lgaRacw`$1!QbGbfFqWln0pl@UJ;)(8--@{35w)vIEbS5mD