From f0ea92b8dd9330ec55d615ffa98bf3356f88fe18 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 23:37:09 -0500 Subject: [PATCH 1/9] batch37 task2 implement group A message header methods --- .../JetStream/NatsStream.MessageHeaders.cs | 298 ++++++++++++++++++ .../NatsStreamMessageHeadersTests.cs | 79 +++++ porting.db | Bin 6758400 -> 6758400 bytes 3 files changed, 377 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessageHeaders.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamMessageHeadersTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessageHeaders.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessageHeaders.cs new file mode 100644 index 0000000..8f8313a --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessageHeaders.cs @@ -0,0 +1,298 @@ +using System.Numerics; +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal sealed class DedupeEntry + { + public required string Id { get; init; } + public ulong Seq { get; set; } + public long TimestampNanos { get; set; } + } + + private readonly object _ddLock = new(); + private Dictionary? _ddMap; + private List? _ddArr; + private int _ddIndex; + private Timer? _ddTimer; + + internal void Unsubscribe(Subscription? sub) + { + if (sub == null) + return; + + _mu.EnterReadLock(); + try + { + if (_closed) + return; + } + finally + { + _mu.ExitReadLock(); + } + + sub.Close(); + } + + internal Exception? SetupStore(FileStoreConfig? fileStoreConfig) + { + _mu.EnterWriteLock(); + try + { + if (Store != null) + { + RegisterStoreCallbacks(Store); + return null; + } + + var streamConfig = Config.Clone(); + IStreamStore store = streamConfig.Storage switch + { + StorageType.MemoryStorage => new JetStreamMemStore(streamConfig), + StorageType.FileStorage => BuildFileStore(fileStoreConfig, streamConfig), + _ => throw new InvalidOperationException($"unsupported storage type: {streamConfig.Storage}"), + }; + + Store = store; + RegisterStoreCallbacks(store); + return null; + } + catch (Exception ex) + { + return ex; + } + finally + { + _mu.ExitWriteLock(); + } + } + + private IStreamStore BuildFileStore(FileStoreConfig? fileStoreConfig, StreamConfig streamConfig) + { + var cfg = fileStoreConfig ?? FileStoreConfig(); + if (string.IsNullOrWhiteSpace(cfg.StoreDir)) + cfg.StoreDir = FileStoreConfig().StoreDir; + + var fsi = new FileStreamInfo + { + Created = Created, + Config = streamConfig, + }; + return new JetStreamFileStore(cfg, fsi); + } + + private void RegisterStoreCallbacks(IStreamStore store) + { + store.RegisterStorageUpdates(StoreUpdates); + store.RegisterStorageRemoveMsg(_ => { }); + store.RegisterProcessJetStreamMsg(_ => { }); + } + + internal void StoreUpdates(long msgs, long bytes, ulong seq, string subj) + { + _ = seq; + _ = subj; + Interlocked.Add(ref Msgs, msgs); + Interlocked.Add(ref Bytes, bytes); + } + + internal int NumMsgIds() + { + lock (_ddLock) + { + return _ddMap?.Count ?? 0; + } + } + + internal DedupeEntry? CheckMsgId(string id) + { + if (string.IsNullOrEmpty(id)) + return null; + + if (_ddMap == null || _ddMap.Count == 0) + return null; + + return _ddMap.GetValueOrDefault(id); + } + + internal void PurgeMsgIds() + { + var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + var window = (long)Config.Duplicates.TotalMilliseconds * 1_000_000L; + if (window <= 0) + { + lock (_ddLock) + { + _ddMap = null; + _ddArr = null; + _ddIndex = 0; + _ddTimer?.Dispose(); + _ddTimer = null; + } + return; + } + + lock (_ddLock) + { + if (_ddArr == null || _ddMap == null) + return; + + for (var i = _ddIndex; i < _ddArr.Count; i++) + { + var entry = _ddArr[i]; + if (now - entry.TimestampNanos >= window) + _ddMap.Remove(entry.Id); + else + { + _ddIndex = i; + break; + } + } + + if (_ddMap.Count == 0) + { + _ddMap = null; + _ddArr = null; + _ddIndex = 0; + _ddTimer?.Dispose(); + _ddTimer = null; + } + else + { + _ddTimer ??= new Timer(_ => PurgeMsgIds(), null, Config.Duplicates, Timeout.InfiniteTimeSpan); + _ddTimer.Change(Config.Duplicates, Timeout.InfiniteTimeSpan); + } + } + } + + internal void StoreMsgId(DedupeEntry entry) + { + ArgumentNullException.ThrowIfNull(entry); + lock (_ddLock) + { + StoreMsgIdLocked(entry); + } + } + + internal void StoreMsgIdLocked(DedupeEntry entry) + { + ArgumentNullException.ThrowIfNull(entry); + if (Config.Duplicates <= TimeSpan.Zero) + return; + + _ddMap ??= new Dictionary(StringComparer.Ordinal); + _ddArr ??= new List(); + _ddMap[entry.Id] = entry; + _ddArr.Add(entry); + + _ddTimer ??= new Timer(_ => PurgeMsgIds(), null, Config.Duplicates, Timeout.InfiniteTimeSpan); + } + + internal static string GetMsgId(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsMsgId, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static string GetExpectedLastMsgId(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedLastMsgId, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static string GetExpectedStream(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedStream, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static (ulong Seq, bool Exists) GetExpectedLastSeq(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (0, false); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsExpectedLastSeq, hdr); + if (value is null || value.Value.Length == 0) + return (0, false); + + var seq = ServerUtilities.ParseInt64(value.Value.Span); + return seq < 0 ? (0, false) : ((ulong)seq, true); + } + + internal static string GetRollup(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsMsgRollup, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value).ToLowerInvariant(); + } + + internal static (ulong Seq, bool Exists) GetExpectedLastSeqPerSubject(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (0, false); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsExpectedLastSubjSeq, hdr); + if (value is null || value.Value.Length == 0) + return (0, false); + + var seq = ServerUtilities.ParseInt64(value.Value.Span); + return seq < 0 ? (0, false) : ((ulong)seq, true); + } + + internal static string GetExpectedLastSeqPerSubjectForSubject(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedLastSubjSeqSubj, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static (long Ttl, Exception? Error) ParseMessageTTL(string ttl) => + JetStreamHeaderHelpers.ParseMessageTtl(ttl); + + internal static (BigInteger? Value, bool Ok) GetMessageIncr(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (null, true); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsMessageIncr, hdr); + if (value is null || value.Value.Length == 0) + return (null, true); + + if (BigInteger.TryParse(Encoding.ASCII.GetString(value.Value.Span), out var parsed)) + return (parsed, true); + + return (null, false); + } + + internal static (DateTime Schedule, bool Ok) GetMessageSchedule(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (default, true); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsSchedulePattern, hdr); + if (value is null || value.Value.Length == 0) + return (default, true); + + var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + var pattern = Encoding.ASCII.GetString(value.Value.Span); + var (schedule, _, ok) = Internal.MsgScheduling.ParseMsgSchedule(pattern, ts); + return (schedule, ok); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamMessageHeadersTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamMessageHeadersTests.cs new file mode 100644 index 0000000..d456ce3 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamMessageHeadersTests.cs @@ -0,0 +1,79 @@ +using System.Numerics; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsStreamMessageHeadersTests +{ + [Fact] + public void ParseMessageTTL_NeverValue_ReturnsMinusOne() + { + var (ttl, error) = NatsStream.ParseMessageTTL("never"); + + error.ShouldBeNull(); + ttl.ShouldBe(-1); + } + + [Fact] + public void GetMessageIncr_ValidHeader_ReturnsParsedValue() + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMessageIncr, "42"); + + var (value, ok) = NatsStream.GetMessageIncr(hdr); + + ok.ShouldBeTrue(); + value.ShouldBe(new BigInteger(42)); + } + + [Fact] + public void GetMessageSchedule_InvalidPattern_ReturnsNotOk() + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsSchedulePattern, "invalid"); + + var (schedule, ok) = NatsStream.GetMessageSchedule(hdr); + + ok.ShouldBeFalse(); + schedule.ShouldBe(default); + } + + [Fact] + public void SetupStore_MemoryStorage_CreatesMemStore() + { + var stream = CreateStream(duplicates: TimeSpan.FromSeconds(1)); + + var error = stream.SetupStore(null); + + error.ShouldBeNull(); + stream.Store.ShouldNotBeNull(); + stream.Store!.Type().ShouldBe(StorageType.MemoryStorage); + } + + [Fact] + public void StoreMsgIdAndPurge_ExpiredEntry_RemovesEntry() + { + var stream = CreateStream(duplicates: TimeSpan.FromMilliseconds(50)); + var oldTs = (DateTimeOffset.UtcNow - TimeSpan.FromSeconds(1)).ToUnixTimeMilliseconds() * 1_000_000L; + stream.StoreMsgId(new NatsStream.DedupeEntry { Id = "id-1", Seq = 1, TimestampNanos = oldTs }); + + stream.NumMsgIds().ShouldBe(1); + stream.CheckMsgId("id-1").ShouldNotBeNull(); + + stream.PurgeMsgIds(); + + stream.NumMsgIds().ShouldBe(0); + stream.CheckMsgId("id-1").ShouldBeNull(); + } + + private static NatsStream CreateStream(TimeSpan duplicates) + { + var account = new Account { Name = "A" }; + var config = new StreamConfig + { + Name = "S", + Storage = StorageType.MemoryStorage, + Duplicates = duplicates, + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/porting.db b/porting.db index 5652a7fd40d8196f996e3f022a449715862cf69e..3d4b277e8f2193eaca91cf7b0edc14d5c9d2fb13 100644 GIT binary patch delta 742 zcmY+?J4{ny0LAh9C{WAocPnTu)*^2O6p%O7Do`H?RS|rlT-xfr7)Yq3Bu);*7#rhg z^h=BhAOmsG5MsP5jv5Ci2NSBR4#?u-?~86h&RPok=H~b!=mcw3aKM(V)YF0u-VMUTlL8#W3K9 zfQbOMqXebcft?6q7eXj=%V&0f*^qZ17DZ%5Q1{E;fv_p32g7AgN_B5RTW7SRTaFKf zpZ{MelFf;5gGAd15{f*6kC7<$o%e#CJcCvXy{Fo4q-#1IlVgR?k?VVuVZMsWcbaS4}k1!K61 zYZ!OO64r}RH7fN*tE)=%n?KD4v&1YEzr?2aB;JcP@l;HR5z!>7L{RwL%9PdRmycdr zD&C|%+1xhBd@I0O3|ZujhJHIH+@D-thj-XR=w~jijtAB zyfy7h*A^>{aaC2TWjt@S`2SgG>&$KDElePZ6wTnb!J_@mqu1%7$Lm@` vMwjdbxv^xg$-aE@fgCQ?jDWG_*CX0(Cp$Gc#W{cE+LvuOZu|1&hdaLk&3pQD delta 677 zcmY+*y-yQy0LJm_U4e4FUa=^GM_Yv&&Hc((@1P== zH?rMsPgY&`xZfAm5q~J4g;gh)&#kQF9AmT5N|B`o4PG>%87=Uk6*~Md5P*pw+OQ2F zv|~GVpaVP6i7q#?w5$F@ep?U6WHqWa%ha?Ll7$&7D$_;HC&jE)`G3`A!nVTl*CWj= zWo!%G2qS_hdJsb|cB2n*?7?32V*vXwi2WGCFcLU`gBZaf9L5nGMH0s_iZL9=I8vCv zB+@v6lQ@ObID;vi#Wc=g2D7kn9v5&CbGU?gEZ{P(;3}@+I&L6?o4Dm>Z1H}cGW1Q*Bs>B^jg092&}A From 07b494544db1cea6bab7b7d420940410af2c2ae1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 23:42:47 -0500 Subject: [PATCH 2/9] batch37 task3 implement group B direct-get and pipeline --- .../JetStream/JetStreamApiTypes.cs | 20 +- .../JetStream/NatsStream.DirectGet.cs | 246 ++++++++++++++++++ .../JetStream/NatsStream.MessagePipeline.cs | 98 +++++++ .../JetStream/StreamTypes.MessageCarriers.cs | 20 ++ .../JetStream/StreamTypes.cs | 6 +- .../NatsStreamDirectGetPipelineTests.cs | 152 +++++++++++ porting.db | Bin 6758400 -> 6758400 bytes 7 files changed, 537 insertions(+), 5 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.DirectGet.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessagePipeline.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamDirectGetPipelineTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs index 7234cbc..ba3cdac 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs @@ -21,8 +21,24 @@ namespace ZB.MOM.NatsNet.Server; // Forward stubs for types defined in later sessions // --------------------------------------------------------------------------- -/// Stub: stored message type — full definition in session 20. -public sealed class StoredMsg { } +/// Stored message returned by direct-get and message-get APIs. +public sealed class StoredMsg +{ + [JsonPropertyName("subject")] + public string Subject { get; set; } = string.Empty; + + [JsonPropertyName("seq")] + public ulong Sequence { get; set; } + + [JsonPropertyName("hdrs")] + public byte[]? Header { get; set; } + + [JsonPropertyName("data")] + public byte[]? Data { get; set; } + + [JsonPropertyName("time")] + public DateTime Time { get; set; } +} /// /// Priority group for pull consumers. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.DirectGet.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.DirectGet.cs new file mode 100644 index 0000000..a99ca01 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.DirectGet.cs @@ -0,0 +1,246 @@ +using System.Text; +using System.Text.Json; +using System.Linq; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal static (string Ttl, bool Ok) GetMessageScheduleTTL(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (string.Empty, true); + + var ttl = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleTtl, hdr); + if (ttl == null || ttl.Length == 0) + return (string.Empty, true); + + var ttlValue = Encoding.ASCII.GetString(ttl); + var (_, err) = ParseMessageTTL(ttlValue); + return err == null ? (ttlValue, true) : (string.Empty, false); + } + + internal static string GetMessageScheduleTarget(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleTarget, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static string GetMessageScheduleSource(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleSource, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static string GetBatchId(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsBatchId, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static (ulong Seq, bool Exists) GetBatchSequence(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (0, false); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsBatchSeq, hdr); + if (value is null || value.Value.Length == 0) + return (0, false); + + var parsed = ServerUtilities.ParseInt64(value.Value.Span); + return parsed < 0 ? (0, false) : ((ulong)parsed, true); + } + + public bool IsClustered() + { + _mu.EnterReadLock(); + try + { + return IsClusteredInternal(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool IsClusteredInternal() => _node is IRaftNode; + + internal void QueueInbound(IpQueue inbound, string subject, string? reply, byte[]? hdr, byte[]? msg, object? sourceInfo, object? trace) + { + _ = sourceInfo; + _ = trace; + + var inboundMsg = InMsg.Rent(); + inboundMsg.Subject = subject; + inboundMsg.Reply = reply; + inboundMsg.Hdr = hdr; + inboundMsg.Msg = msg; + + var (_, error) = inbound.Push(inboundMsg); + if (error != null) + inboundMsg.ReturnToPool(); + } + + internal JsApiMsgGetResponse ProcessDirectGetRequest(string reply, byte[]? hdr, byte[]? msg) + { + var response = new JsApiMsgGetResponse(); + if (string.IsNullOrWhiteSpace(reply)) + return response; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiLevelHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + return response; + } + + if (msg == null || msg.Length == 0) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + return response; + } + + JsApiMsgGetRequest? request; + try + { + request = JsonSerializer.Deserialize(msg); + } + catch (JsonException) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + return response; + } + + if (request == null) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + return response; + } + + return GetDirectRequest(request, reply); + } + + internal JsApiMsgGetResponse ProcessDirectGetLastBySubjectRequest(string subject, string reply, byte[]? hdr, byte[]? msg) + { + var response = new JsApiMsgGetResponse(); + if (string.IsNullOrWhiteSpace(reply)) + return response; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiLevelHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + return response; + } + + var request = msg == null || msg.Length == 0 + ? new JsApiMsgGetRequest() + : JsonSerializer.Deserialize(msg) ?? new JsApiMsgGetRequest(); + + var key = ExtractDirectGetLastBySubjectKey(subject); + if (string.IsNullOrEmpty(key)) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + return response; + } + + request.LastFor = key; + return GetDirectRequest(request, reply); + } + + internal JsApiMsgGetResponse GetDirectMulti(JsApiMsgGetRequest request, string reply) + { + _ = reply; + + if (Store == null) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + var filters = request.MultiLastFor ?? []; + if (filters.Length == 0) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + var (seqs, error) = Store.MultiLastSeqs(filters, request.UpToSeq, 1024); + if (error != null || seqs.Length == 0) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + var firstSeq = seqs[0]; + var loaded = Store.LoadMsg(firstSeq, new StoreMsg()); + if (loaded == null) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + return new JsApiMsgGetResponse { Message = ToStoredMsg(loaded) }; + } + + internal JsApiMsgGetResponse GetDirectRequest(JsApiMsgGetRequest request, string reply) + { + _ = reply; + if (request.MultiLastFor is { Length: > 0 }) + return GetDirectMulti(request, reply); + + if (Store == null) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + StoreMsg? loaded = null; + if (request.Seq > 0) + { + loaded = Store.LoadMsg(request.Seq, new StoreMsg()); + } + else if (!string.IsNullOrWhiteSpace(request.LastFor)) + { + loaded = Store.LoadLastMsg(request.LastFor!, new StoreMsg()); + } + else if (!string.IsNullOrWhiteSpace(request.NextFor)) + { + var (sm, _) = Store.LoadNextMsg(request.NextFor!, SubscriptionIndex.SubjectHasWildcard(request.NextFor!), request.Seq, new StoreMsg()); + loaded = sm; + } + else if (request.StartTime.HasValue) + { + var seq = Store.GetSeqFromTime(request.StartTime.Value); + if (seq > 0) + loaded = Store.LoadMsg(seq, new StoreMsg()); + } + + if (loaded == null) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + return new JsApiMsgGetResponse { Message = ToStoredMsg(loaded) }; + } + + private static StoredMsg ToStoredMsg(StoreMsg loaded) => new() + { + Subject = loaded.Subject, + Sequence = loaded.Seq, + Header = loaded.Hdr, + Data = loaded.Msg, + Time = DateTimeOffset.FromUnixTimeMilliseconds(loaded.Ts / 1_000_000L).UtcDateTime, + }; + + private static string? GetRequiredApiLevelHeader(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return null; + + var value = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, hdr); + return value == null || value.Length == 0 ? null : Encoding.ASCII.GetString(value); + } + + private static string ExtractDirectGetLastBySubjectKey(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return string.Empty; + + var parts = subject.Split('.'); + return parts.Length <= 5 ? string.Empty : string.Join('.', parts.Skip(5)); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessagePipeline.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessagePipeline.cs new file mode 100644 index 0000000..3c68827 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessagePipeline.cs @@ -0,0 +1,98 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal Exception? ProcessInboundJetStreamMsg(InMsg? msg) + { + if (msg == null) + return new ArgumentNullException(nameof(msg)); + + try + { + return ProcessJetStreamMsg( + msg.Subject, + msg.Reply ?? string.Empty, + msg.Hdr, + msg.Msg, + lseq: 0, + ts: 0, + msgTrace: null, + sourced: false, + canRespond: true); + } + finally + { + msg.ReturnToPool(); + } + } + + internal Exception? ProcessJetStreamMsg( + string subject, + string reply, + byte[]? hdr, + byte[]? msg, + ulong lseq, + long ts, + object? msgTrace, + bool sourced, + bool canRespond) + { + _ = reply; + _ = msgTrace; + _ = sourced; + _ = canRespond; + + if (string.IsNullOrWhiteSpace(subject)) + return new ArgumentException("subject is required", nameof(subject)); + + if (Store == null) + return new InvalidOperationException("store not initialized"); + + var batchId = GetBatchId(hdr); + if (!string.IsNullOrEmpty(batchId)) + return ProcessJetStreamBatchMsg(batchId, subject, reply, hdr, msg, msgTrace); + + try + { + var (seq, _) = Store.StoreMsg(subject, hdr, msg, ttl: 0); + if (lseq > 0) + seq = lseq; + + if (ts == 0) + ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + + Interlocked.Exchange(ref LastSeq, (long)seq); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal Exception? ProcessJetStreamBatchMsg(string batchId, string subject, string reply, byte[]? hdr, byte[]? msg, object? msgTrace) + { + _ = reply; + _ = msgTrace; + + if (string.IsNullOrWhiteSpace(batchId)) + return new InvalidOperationException(JsApiErrors.NewJSAtomicPublishInvalidBatchIDError().ToString()); + + var (_, exists) = GetBatchSequence(hdr); + if (!exists) + return new InvalidOperationException(JsApiErrors.NewJSAtomicPublishMissingSeqError().ToString()); + + if (Store == null) + return new InvalidOperationException("store not initialized"); + + try + { + Store.StoreMsg(subject, hdr, msg, ttl: 0); + return null; + } + catch (Exception ex) + { + return ex; + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs new file mode 100644 index 0000000..112ea30 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs @@ -0,0 +1,20 @@ +using System.Collections.Concurrent; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class InMsg +{ + private static readonly ConcurrentBag Pool = new(); + + internal static InMsg Rent() => Pool.TryTake(out var msg) ? msg : new InMsg(); + + internal void ReturnToPool() + { + Subject = string.Empty; + Reply = null; + Hdr = null; + Msg = null; + Client = null; + Pool.Add(this); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index c63e5dd..4bd6ecb 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -227,7 +227,7 @@ public sealed class JsStreamPubMsg /// A JetStream publish message with sync tracking. /// Mirrors jsPubMsg in server/stream.go. /// -public sealed class JsPubMsg +public sealed partial class JsPubMsg { public string Subject { get; set; } = string.Empty; public string? Reply { get; set; } @@ -245,7 +245,7 @@ public sealed class JsPubMsg /// An inbound message to be processed by the JetStream layer. /// Mirrors inMsg in server/stream.go. /// -public sealed class InMsg +public sealed partial class InMsg { public string Subject { get; set; } = string.Empty; public string? Reply { get; set; } @@ -277,7 +277,7 @@ public sealed class InMsg /// A cached/clustered message for replication. /// Mirrors cMsg in server/stream.go. /// -public sealed class CMsg +public sealed partial class CMsg { public string Subject { get; set; } = string.Empty; public byte[]? Msg { get; set; } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamDirectGetPipelineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamDirectGetPipelineTests.cs new file mode 100644 index 0000000..8771204 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamDirectGetPipelineTests.cs @@ -0,0 +1,152 @@ +using System.Text; +using System.Text.Json; +using NSubstitute; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsStreamDirectGetPipelineTests +{ + [Fact] + public void GetMessageScheduleTTL_InvalidValue_ReturnsNotOk() + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsScheduleTtl, "not-a-ttl"); + + var (ttl, ok) = NatsStream.GetMessageScheduleTTL(hdr); + + ok.ShouldBeFalse(); + ttl.ShouldBe(string.Empty); + } + + [Fact] + public void GetBatchSequence_ValidHeader_ReturnsSequence() + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsBatchSeq, "9"); + + var (seq, ok) = NatsStream.GetBatchSequence(hdr); + + ok.ShouldBeTrue(); + seq.ShouldBe(9UL); + } + + [Fact] + public void IsClustered_WithAssignmentNode_ReturnsTrue() + { + var stream = CreateStream(); + var node = Substitute.For(); + var assignment = new StreamAssignment + { + Group = new RaftGroup + { + Node = node, + Peers = ["N1"], + }, + }; + + stream.SetStreamAssignment(assignment); + + stream.IsClustered().ShouldBeTrue(); + stream.IsClusteredInternal().ShouldBeTrue(); + } + + [Fact] + public void InMsgReturnToPool_WithValues_ClearsState() + { + var msg = new InMsg + { + Subject = "foo", + Reply = "bar", + Hdr = [1, 2], + Msg = [3, 4], + Client = new object(), + }; + + msg.ReturnToPool(); + + msg.Subject.ShouldBe(string.Empty); + msg.Reply.ShouldBeNull(); + msg.Hdr.ShouldBeNull(); + msg.Msg.ShouldBeNull(); + } + + [Fact] + public void QueueInbound_WithQueue_PushesMessage() + { + var stream = CreateStream(); + var queue = new IpQueue("inbound"); + + stream.QueueInbound(queue, "orders.created", null, null, [1], null, null); + var popped = queue.Pop(); + + popped.ShouldNotBeNull(); + popped!.Length.ShouldBe(1); + popped[0].Subject.ShouldBe("orders.created"); + } + + [Fact] + public void ProcessDirectGetRequest_SeqRequest_ReturnsStoredMessage() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + stream.Store!.StoreMsg("orders", null, [7, 8], ttl: 0); + + var request = JsonSerializer.SerializeToUtf8Bytes(new JsApiMsgGetRequest { Seq = 1 }); + var response = stream.ProcessDirectGetRequest("reply.inbox", null, request); + + response.Error.ShouldBeNull(); + response.Message.ShouldNotBeNull(); + response.Message!.Sequence.ShouldBe(1UL); + response.Message.Subject.ShouldBe("orders"); + } + + [Fact] + public void ProcessDirectGetLastBySubjectRequest_InvalidSubject_ReturnsBadRequest() + { + var stream = CreateStream(); + + var response = stream.ProcessDirectGetLastBySubjectRequest("$JS.API.DIRECT.GET.STREAM", "reply", null, null); + + response.Error.ShouldNotBeNull(); + response.Error!.ErrCode.ShouldBe(JsApiErrors.BadRequest.ErrCode); + } + + [Fact] + public void ProcessJetStreamMsg_WithMemoryStore_StoresMessage() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + + var error = stream.ProcessJetStreamMsg("events", string.Empty, null, Encoding.ASCII.GetBytes("m1"), 0, 0, null, false, true); + var stored = stream.Store!.LoadMsg(1, new StoreMsg()); + + error.ShouldBeNull(); + stored.ShouldNotBeNull(); + stored!.Subject.ShouldBe("events"); + } + + [Fact] + public void ProcessJetStreamBatchMsg_MissingSequence_ReturnsApiError() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + + var error = stream.ProcessJetStreamBatchMsg("batch-1", "events", string.Empty, null, [1], null); + + error.ShouldNotBeNull(); + error.ShouldBeOfType(); + } + + private static NatsStream CreateStream() + { + var account = new Account { Name = "A" }; + var config = new StreamConfig + { + Name = "S", + Storage = StorageType.MemoryStorage, + Subjects = ["events.>"], + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/porting.db b/porting.db index 3d4b277e8f2193eaca91cf7b0edc14d5c9d2fb13..24ba7979e8661213cb2b00ee322fb5cacc288b37 100644 GIT binary patch delta 1448 zcmY+^O>7Kd9LMpUXLkCscAnj~URqDLFRPTT_ZHPs?Y65eMT>fW*)m%yk!lYPwrnM$ zi3mv#{f9It8V(K)WkL|dp%FoxL`1kakWeL6)mp!Lna&(O$$sbm&y#uPu&wty<1XX3! zFsjI?pQyP;4WYb7{XmU3Y7iA;)OS~}G-%X-E2mfbVB|LpUmNum_0*_-SFg_Zv)jyW zv)S_LNh3>6#IMx<3o63K+XfmL$Y2KrVjvdcARZhr4xFIs1C4x=L^aDrHbuXr#ZqAt zU)Yf($0>R93v%=IBTYQcVH)3S;qi1g%rnP!=wq07TUVxjW$mtj6s}%V~75Ss8 zhpzaAZ|qIzjb8-GpXJG>;xaLg4wMS>_c>Q8!qlGmcWSx-HI<1y^e`m69T6T!EFh+t z&Rh@>wbZ^z_E5$|S*4zUFntvLT|iVzrg3qFaM8YUk#0Jnqg=FFS7w{0;R^AL689+o z&c^|6I#wyf*h<^AN&%_hfiy^m49Em8Oo1$z3ezAPro#;IK@QA>Suh*sz+9LI^C1`V zU;!+IMUW4Rp#TbD2`q&oSO&#V0)8lkG6=wOD2EEDgcYz7s$dnYhBZ(PHLw=e!Fs5L z4X_b5!DiS3TVWe)haIpJf=~y$V7Gp)QmfryS1N2nq+HNGd)YAi#NP7b>;-$oZnCTF zEIVm`X@6|LqnuJ&m1fyN)qPT0M~&M@MP1r?8tKwbM^xFv4u+Z=>KpJU^wUy9mil3- zK}&tN)PSYFS?a5$`c-qPOi#X~U6y?2ht6M?8nM(cy}zu5=*tXuZkj5YN2<+S&S+a_ Nw+11-v%6k9^c%Js16=?B delta 1153 zcmY+=Sx8h-90u^Y=Z@nz&72w6)YIganoFf-+M=nWlQUX2rkUB|($yjgOL|gs3M4Ti zDu{nYpq7#k2?f%n73-}G$+zgCgx(^^7HrSHaV)$KzlZO9oc~!)j9rya_;gF7OlkpFPieRCVkTBRDPJsexj-h#8e!zEHGL3tAvTui_XV%NX3N{Fa5D0}Z2!{w*43Q9}&U&M>A5&9V^xOs3 z&kdFnE_cw$3-%})EfSf;oEAGxhsh=~J4B-Y=mLi*r^<0=ryCBD>R)@~5cSk_2v4!S zG@5CD@xP+4ShQ2qRYF%kQUVWE)hZpOGS)-Ae9QQw$1c^WT#6^(6#f%`Hw7g2^)30G1d2x zJtJ-bOTZ3GAqHY04&osJmO&yUK{70d6i9_MNQV`$5>`P5WI`6KhHO{^Igkr$VI8c8 z4Uh- z9@q=@&;X6l1pCwxw^CiBXJ>TpD4<{YaPbK%qrC%4pHxTuHjCD?J8YPK4JhZB{s9%f vu0KtlP-a$_T~{tiEWkz6DYk6-G^p&9swp!=^32se!$S%*tHVPrwe5cab04<5 From 6290b17a828a6065286722d7e5abfa4f57bb2ba7 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 23:45:07 -0500 Subject: [PATCH 3/9] batch37 task4 implement group C message carriers and consumer signaling --- .../JetStream/NatsStream.Consumers.cs | 121 ++++++++++++++++++ .../JetStream/StreamTypes.MessageCarriers.cs | 73 +++++++++++ .../JetStream/NatsStreamConsumersTests.cs | 71 ++++++++++ porting.db | Bin 6758400 -> 6758400 bytes 4 files changed, 265 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs new file mode 100644 index 0000000..2c8942f --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs @@ -0,0 +1,121 @@ +using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + private readonly object _consumersSync = new(); + private readonly Dictionary _consumers = new(StringComparer.Ordinal); + private List _consumerList = []; + private readonly IpQueue _sigQueue = new("js-signal"); + private readonly Channel _signalWake = Channel.CreateBounded(1); + private readonly Channel _internalWake = Channel.CreateBounded(1); + private readonly JsOutQ _outq = new(); + + internal static CMsg NewCMsg(string subject, ulong seq) + { + var msg = CMsg.Rent(); + msg.Subject = subject; + msg.Seq = seq; + return msg; + } + + internal void SignalConsumersLoop(CancellationToken cancellationToken = default) + { + while (!cancellationToken.IsCancellationRequested) + { + if (!_signalWake.Reader.TryRead(out _)) + { + Thread.Sleep(1); + continue; + } + + var messages = _sigQueue.Pop(); + if (messages == null) + continue; + + foreach (var msg in messages) + { + SignalConsumers(msg.Subject, msg.Seq); + msg.ReturnToPool(); + } + } + } + + internal void SignalConsumers(string subject, ulong seq) + { + _ = subject; + _ = seq; + + lock (_consumersSync) + { + _ = _consumerList.Count; + } + } + + internal static JsPubMsg NewJSPubMsg(string destinationSubject, string subject, string? reply, byte[]? hdr, byte[]? msg, NatsConsumer? consumer, ulong seq) + { + var pub = GetJSPubMsgFromPool(); + pub.Subject = destinationSubject; + pub.Reply = reply; + pub.Hdr = hdr; + pub.Msg = msg; + pub.Pa = new StoreMsg + { + Subject = subject, + Seq = seq, + Hdr = hdr ?? [], + Msg = msg ?? [], + Buf = [], + }; + pub.Sync = consumer; + return pub; + } + + internal static JsPubMsg GetJSPubMsgFromPool() => JsPubMsg.Rent(); + + internal void SetupSendCapabilities() + { + _ = _outq; + _signalWake.Writer.TryWrite(true); + } + + internal string AccName() => Account.Name; + + internal string NameLocked() => Name; + + internal void InternalLoop(CancellationToken cancellationToken = default) + { + while (!cancellationToken.IsCancellationRequested) + { + if (!_internalWake.Reader.TryRead(out _)) + { + Thread.Sleep(1); + continue; + } + + var messages = _sigQueue.Pop(); + if (messages == null) + continue; + + foreach (var msg in messages) + { + SignalConsumers(msg.Subject, msg.Seq); + msg.ReturnToPool(); + } + } + } + + internal void ResetAndWaitOnConsumers() + { + List snapshot; + lock (_consumersSync) + { + snapshot = [.. _consumerList]; + } + + foreach (var consumer in snapshot) + consumer.Stop(); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs index 112ea30..21ce5f3 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server; @@ -18,3 +19,75 @@ public sealed partial class InMsg Pool.Add(this); } } + +public sealed partial class CMsg +{ + private static readonly ConcurrentBag Pool = new(); + + internal static CMsg Rent() => Pool.TryTake(out var msg) ? msg : new CMsg(); + + internal void ReturnToPool() + { + Subject = string.Empty; + Msg = null; + Seq = 0; + Pool.Add(this); + } +} + +public sealed partial class JsPubMsg +{ + private static readonly ConcurrentBag Pool = new(); + + internal static JsPubMsg Rent() => Pool.TryTake(out var msg) ? msg : new JsPubMsg(); + + internal void ReturnToPool() + { + Subject = string.Empty; + Reply = null; + Hdr = null; + Msg = null; + Pa = null; + Sync = null; + Pool.Add(this); + } + + internal int Size() => + (Subject?.Length ?? 0) + + (Reply?.Length ?? 0) + + (Hdr?.Length ?? 0) + + (Msg?.Length ?? 0); +} + +public sealed class JsOutQ +{ + private readonly IpQueue _queue = new("js-outq"); + private bool _registered = true; + + public (int Len, Exception? Error) SendMsg(string reply, byte[] payload) + { + if (string.IsNullOrWhiteSpace(reply)) + return (0, new ArgumentException("reply is required", nameof(reply))); + + var msg = JsPubMsg.Rent(); + msg.Subject = reply; + msg.Msg = payload; + return Send(msg); + } + + public (int Len, Exception? Error) Send(JsPubMsg msg) + { + if (!_registered) + return (0, new InvalidOperationException("queue is unregistered")); + + return _queue.Push(msg); + } + + public void Unregister() + { + _registered = false; + _queue.Unregister(); + } + + internal JsPubMsg[]? Pop() => _queue.Pop(); +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs new file mode 100644 index 0000000..6b0f4e0 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs @@ -0,0 +1,71 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsStreamConsumersTests +{ + [Fact] + public void NewCMsg_ReturnToPool_ClearsValues() + { + var msg = NatsStream.NewCMsg("orders.created", 22); + msg.Subject.ShouldBe("orders.created"); + msg.Seq.ShouldBe(22UL); + + msg.ReturnToPool(); + + msg.Subject.ShouldBe(string.Empty); + msg.Seq.ShouldBe(0UL); + } + + [Fact] + public void NewJSPubMsg_WithHeaderAndData_ReturnsSizedMessage() + { + var pub = NatsStream.NewJSPubMsg("inbox.x", "orders", "reply", [1, 2], [3, 4, 5], null, 12); + + pub.Subject.ShouldBe("inbox.x"); + pub.Size().ShouldBeGreaterThan(0); + } + + [Fact] + public void JsOutQ_SendThenUnregister_RejectsFutureSends() + { + var outq = new JsOutQ(); + var first = outq.SendMsg("inbox.1", [1, 2, 3]); + first.Error.ShouldBeNull(); + first.Len.ShouldBeGreaterThan(0); + + outq.Unregister(); + var second = outq.SendMsg("inbox.2", [4]); + second.Error.ShouldNotBeNull(); + } + + [Fact] + public void AccName_AndNameLocked_ReturnConfiguredValues() + { + var stream = CreateStream(); + + stream.AccName().ShouldBe("A"); + stream.NameLocked().ShouldBe("S"); + } + + [Fact] + public void SetupSendCapabilities_AndResetConsumers_DoNotThrow() + { + var stream = CreateStream(); + + Should.NotThrow(stream.SetupSendCapabilities); + Should.NotThrow(stream.ResetAndWaitOnConsumers); + } + + private static NatsStream CreateStream() + { + var account = new Account { Name = "A" }; + var config = new StreamConfig + { + Name = "S", + Storage = StorageType.MemoryStorage, + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/porting.db b/porting.db index 24ba7979e8661213cb2b00ee322fb5cacc288b37..c81d811e256e9081530efde3caa74a93a881eaa2 100644 GIT binary patch delta 1549 zcmY+^T}&KR6bJB`yEDV?!d_TdfeOo4q2N;Lf|S-(DPT(xYrAccDk|=_vwZl`!cr&& zQ=w@OX@ka^&|~Sg9x0GYG$FAvJ~fRt#)m#M#HR)Yd1wqqL)8Z#`rw5o)18O^!*Bj` z=bpKf%+RfvZD=hwr3pDrrAm_Y(3D%~FH^}vOQyKcyeXH^ZBt1? zbEXo7uA6d7ha_p-q(hXWrtCteOeF|)nbL#~n^J|EOesS3BR7vR5u1o2qKarD2_kk8 zhlo=oQ6x#kW!yZ*XBbvo&~osXZeDmX&1P3CD+61r(1`NJN5yOpSFx~%Z;Wf3dU&CwyR!T=;n4D?Bzvq5Ba*6 zzmIi=-eRon|ked42EU zQ{!r?B-ajSVD}O3K~FzFgY(51k5`n&ZS-Hic>CoFXAXu=@{+h>>?D5&PfyqiFz*uA zuy}yyK5mVJ13ZwAe_s=Gta@xtym6WP&3YR?xXkzAgJ^CEcEl{>Uy6zC){f)i6<%g8 zs4@aqxJSnC)0G13x@tM1iL3nKqXQH#?82Wu=W*i$aiCB)jXbaHOKz^Wd zpbDU8fGUBu0#yNR1KJMsEKmUGIiTl(UI3~F+5uDpR0~uG^ditspj|-qK)Zn&fL;P> z1bP{056~+>dx7==1%dVh9RNBA)CBY@P&3dWpx1z22WkO&1L#elw}4uK-UbTIQCO*w zmko6({EdvEP_k!i3kw+9a@d)`q$SK>RTnUr#**->#P-n#5UnTq~9q}b+uV-(9UZg*n+k?TcvVF=~FtC7G)3bSE?1i%CP2cs2*Ft z(2tj>tLn1)v-+d@ojRj3ZO2khm^(IA)N!hqEsTavE7-se<=`-u&r(|O9lOuVzn`aVV#iK4 zM(Aj1k{cy6H;&R~&iyPCpBSZm|1TO!v%)AHYR>F7Ne8Y}^ZAqHVb26z?IoFkw2^r?EjQhZk{9`2Wu>39G|4FdUp3)G+y_}HeR|(9i^DRZqhLh zKgI{f=ZkHMe`RhS(Wt4--l8^MxvN$fdvB3j;`?27%{-POKNl|G?skeEZQK=$IC%Lp z3KSQP*}SaIsA+-{Q!EV?Cdto1R!+r?D0qB8)&>G3O|7}l^p7Bu z9FlwFPUV_1uDq)ZDE&&e(!n1)x>_B7DZ>9N=?)m*-noyiY0C3YBUZ JlU4C)>%TC|m`?xz From a805af1beaf1794a05f091883838516b56851877 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 23:47:48 -0500 Subject: [PATCH 4/9] batch37 task5 implement group D consumer registry and interest state --- .../JetStream/NatsStream.Consumers.cs | 192 ++++++++++++++++++ .../JetStream/NatsStreamConsumersTests.cs | 73 ++++++- porting.db | Bin 6758400 -> 6758400 bytes 3 files changed, 264 insertions(+), 1 deletion(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs index 2c8942f..13761e9 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs @@ -1,5 +1,6 @@ using System.Threading.Channels; using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; @@ -118,4 +119,195 @@ internal sealed partial class NatsStream foreach (var consumer in snapshot) consumer.Stop(); } + + internal StoredMsg? GetMsg(ulong seq) + { + StoredMsg? result = null; + if (Store == null) + return result; + + var loaded = Store.LoadMsg(seq, new StoreMsg()); + if (loaded == null) + return result; + + result = new StoredMsg + { + Subject = loaded.Subject, + Sequence = loaded.Seq, + Header = loaded.Hdr, + Data = loaded.Msg, + Time = DateTimeOffset.FromUnixTimeMilliseconds(loaded.Ts / 1_000_000L).UtcDateTime, + }; + return result; + } + + internal List GetConsumers() + { + lock (_consumersSync) + { + return [.. _consumerList]; + } + } + + internal int NumPublicConsumers() + { + lock (_consumersSync) + { + return _consumerList.Count(c => !c.Config.Direct); + } + } + + internal List GetPublicConsumers() + { + lock (_consumersSync) + { + return [.. _consumerList.Where(c => !c.Config.Direct)]; + } + } + + internal List GetDirectConsumers() + { + lock (_consumersSync) + { + return [.. _consumerList.Where(c => c.Config.Direct)]; + } + } + + internal void CheckInterestState() + { + if (!IsInterestRetention() || Store == null) + return; + + var consumers = GetConsumers(); + if (consumers.Count == 0) + return; + + ulong floor = ulong.MaxValue; + foreach (var consumer in consumers) + { + var ack = Interlocked.Read(ref consumer.AckFloor); + if (ack > 0 && (ulong)ack < floor) + floor = (ulong)ack; + } + + if (floor != ulong.MaxValue) + Store.Compact(floor); + } + + internal bool IsInterestRetention() => Config.Retention != RetentionPolicy.LimitsPolicy; + + internal int NumConsumers() + { + lock (_consumersSync) + { + return _consumerList.Count; + } + } + + internal void SetConsumer(NatsConsumer consumer) + { + ArgumentNullException.ThrowIfNull(consumer); + + lock (_consumersSync) + { + _consumers[consumer.Name] = consumer; + if (_consumerList.All(c => !ReferenceEquals(c, consumer))) + _consumerList.Add(consumer); + } + } + + internal void RemoveConsumer(NatsConsumer consumer) + { + ArgumentNullException.ThrowIfNull(consumer); + + lock (_consumersSync) + { + _consumers.Remove(consumer.Name); + _consumerList.RemoveAll(c => ReferenceEquals(c, consumer)); + } + } + + internal void SwapSigSubs(NatsConsumer consumer, string[]? newFilters) + { + _ = consumer; + _ = newFilters; + } + + internal NatsConsumer? LookupConsumer(string name) + { + if (string.IsNullOrWhiteSpace(name)) + return _consumers.GetValueOrDefault(string.Empty); + + lock (_consumersSync) + { + return _consumers.GetValueOrDefault(name); + } + } + + internal int NumDirectConsumers() + { + lock (_consumersSync) + { + return _consumerList.Count(c => c.Config.Direct); + } + } + + internal StreamState StateWithDetail(bool details) + { + _ = details; + return State(); + } + + internal bool PartitionUnique(string name, string[] partitions) + { + lock (_consumersSync) + { + foreach (var partition in partitions) + { + foreach (var existing in _consumerList) + { + if (existing.Name == name) + continue; + + var filters = existing.Config.FilterSubjects ?? + (string.IsNullOrWhiteSpace(existing.Config.FilterSubject) ? [] : [existing.Config.FilterSubject!]); + + foreach (var filter in filters) + { + if (SubscriptionIndex.SubjectsCollide(partition, filter)) + return false; + } + } + } + } + + return true; + } + + internal bool PotentialFilteredConsumers() + { + var subjects = Config.Subjects ?? []; + if (subjects.Length == 0) + return false; + + lock (_consumersSync) + { + if (_consumerList.Count == 0) + return false; + } + + if (subjects.Length > 1) + return true; + + return SubscriptionIndex.SubjectHasWildcard(subjects[0]); + } + + internal bool NoInterest(ulong seq, NatsConsumer? observingConsumer) + { + _ = seq; + lock (_consumersSync) + { + return _consumerList.All(c => ReferenceEquals(c, observingConsumer)); + } + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs index 6b0f4e0..1090410 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs @@ -58,13 +58,84 @@ public sealed class NatsStreamConsumersTests Should.NotThrow(stream.ResetAndWaitOnConsumers); } - private static NatsStream CreateStream() + [Fact] + public void SetConsumer_LookupAndCounts_ReturnExpectedValues() + { + var stream = CreateStream(); + var standard = new NatsConsumer("S", new ConsumerConfig { Name = "c1", Direct = false }, DateTime.UtcNow); + var direct = new NatsConsumer("S", new ConsumerConfig { Name = "c2", Direct = true }, DateTime.UtcNow); + + stream.SetConsumer(standard); + stream.SetConsumer(direct); + + stream.NumConsumers().ShouldBe(2); + stream.NumPublicConsumers().ShouldBe(1); + stream.NumDirectConsumers().ShouldBe(1); + stream.LookupConsumer("c2").ShouldBe(direct); + } + + [Fact] + public void GetMsg_WhenStored_ReturnsStoredMessage() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + stream.Store!.StoreMsg("events", null, [1, 2], ttl: 0); + + var message = stream.GetMsg(1); + + message.ShouldNotBeNull(); + message!.Subject.ShouldBe("events"); + message.Sequence.ShouldBe(1UL); + } + + [Fact] + public void PartitionUnique_WithCollidingFilters_ReturnsFalse() + { + var stream = CreateStream(); + var existing = new NatsConsumer("S", new ConsumerConfig { Name = "existing", FilterSubject = "orders.*" }, DateTime.UtcNow); + stream.SetConsumer(existing); + + var unique = stream.PartitionUnique("new", ["orders.created"]); + + unique.ShouldBeFalse(); + } + + [Fact] + public void PotentialFilteredConsumers_WithWildcardSubjectAndConsumer_ReturnsTrue() + { + var stream = CreateStream(["orders.>"]); + stream.SetConsumer(new NatsConsumer("S", new ConsumerConfig { Name = "c1" }, DateTime.UtcNow)); + + stream.PotentialFilteredConsumers().ShouldBeTrue(); + } + + [Fact] + public void NoInterest_WithOnlyObservingConsumer_ReturnsTrue() + { + var stream = CreateStream(); + var observer = new NatsConsumer("S", new ConsumerConfig { Name = "observer" }, DateTime.UtcNow); + stream.SetConsumer(observer); + + stream.NoInterest(1, observer).ShouldBeTrue(); + } + + [Fact] + public void IsInterestRetention_WhenPolicyIsInterest_ReturnsTrue() + { + var stream = CreateStream(retention: RetentionPolicy.InterestPolicy); + + stream.IsInterestRetention().ShouldBeTrue(); + } + + private static NatsStream CreateStream(string[]? subjects = null, RetentionPolicy retention = RetentionPolicy.LimitsPolicy) { var account = new Account { Name = "A" }; var config = new StreamConfig { Name = "S", Storage = StorageType.MemoryStorage, + Subjects = subjects ?? ["events.>"], + Retention = retention, }; return new NatsStream(account, config, DateTime.UtcNow); } diff --git a/porting.db b/porting.db index c81d811e256e9081530efde3caa74a93a881eaa2..66904ce90565d7fc2d30984e741804bce92eb3ba 100644 GIT binary patch delta 1545 zcmY+^NlX(_9LMqLw8&EWR~Dh$lu}BIf`B3*?%=+G;J$;lpoUaK(8PnXL_<6nO||{Q z$f025fGi%2V`7ZPgEx&ao;;Z7L9awjjENe5o#N0rd=B6K-%MWLOQ!e5u(3Dps^O7n zXiYVTg+WuP5OIlqrMXqSZs7B6qDT4ACO%2;5d>2x5Nr>2bp->KlFWcWTKO$TNz}@R zl^eD4qH47Aph~nVMA@}+qcXKB5Cr32tz5zZ!8oI(gyCncoT&F&IZ&^(vZJ18WkZc? zWkn4ur#rvoDLr2v?G(u%_zbkbSDll-1(<*W78JW2^N{i>ltVHP=_m3Jq z(H*acqc& z6B~BRvl~u4pC#wvdB26*p5pm)@nQ33M=3v-$ual5@>Qn3`H45*9H*kW2|pjFE1cbt zf^(~Pm^t`>mZ?+q-1^P359e0bw~aq_;{7ilP=w$4^Jf?LOw8UzVlQQ~7kjC*B-Xy2 zpjsYnN?eGUD<*aKRxNutE5~T#-bq@+EhA13yN4)^zfR5`PvSnDQ`FDjR~M_7Z{}B1 zDf2IlM!ve^HY{-FJSO|HL4_2^&9UR~U30zPBZYTr~ z6oD6t!3TaQfkm(wN}&vvKsi)EB`k$yPzB4O8fu^xRzMxBgjKK_*1%d=2kW688eju# zgiWv+w!l`{2HRl=?1Wv=2u-jXnxO^uz+Tt~`{4i_ghOx`j=)hk2FKw9v_c!4RQ^n# zGR>sU8x($vB=D^T9pzjT3w-0)L(!f|jVa@_K4sYS!}KNcMqJBhM*`iCM+09(XZ+)J zG1zsc{Y?DBrYk>-_XBc|?l+&Vyt?w}s!&&MT@~ocr7KBSPRpL;4LWo~yRK}yvg#^d RS9t@Wu>b^>&{%u;(m%PFNBsZ* delta 1083 zcmYMzSx8i26bJB`@7}B4ra9`klsR|QQOD9sOIvI+OIyu$+f8jI6~t(epr9HdvY^5; z=h)4_q@>*PAsith>!FtlM0)bUEYO0WB%-JY`p*nQ55Jf5eP{X3<$8wOC6C_)<6~j0 zFwJxt9?Dq;8l%^ovoosLmt7$)8+~JyVo2qa_r$17pG1wXrcStpY?d_9G+yr~feZ?S zK{!N!3Nye2Gr{Z|ucy1I+}UfjbCg9=xa$UG@W3mHq&(fmIXPMa|GYu#*-{d&@s0%= zv8#oW12@!aU9%XBxs5U0QJ5IZ11;3RRnHa8dxuP1a+A^m*ZAB`I>asSR69>Rrf?Rm zl*tnf68ToNlFi8Oq&XUQJVlDfixYou9`e!!c2BBy?|CAwW3mV1?M#k2*C*{;%2G9hzvko6d6Q^$0AdFww-W9$FlDoNCe?SWbT9 zW`19XqAz-=l{<@)Qg}E?3gdIVq0#5-Y3QAA4qhASrDbgSqXu?u;_SZA*I1(4IXXs4 zU{4>dW>IVoK5nwlzsz51B-{GQ6MWostYqSs{p1J?!oT~ehj;zJ`ja=J^P5MM%G1xR zu}$&-K_o;$G*}=8{AY0x53^u4B)}X<1S@D@gCt0X6tF`oq(M43AOq&YJjjGBm=6nJ zA!I`iEP}<53wf{vmclYv4*9SGR>CS+4QrqP)#X7!n*V$lKrj-xMTjixPq;xAn{w_}`?P5?o5W2YH|ASAt20NWoLRQ)hR<4Pw pPUpz`&Z`vGW*9x?a@QWObvexs`C44{NYj}vC%AlPx{hBw{SSjui=zMl From a9ccb66e35f8388c802a46733ef896cb010931ac Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 23:54:28 -0500 Subject: [PATCH 5/9] batch37 task6 implement group E pre-ack snapshot and restore --- .../Accounts/Account.StreamRestore.cs | 37 +++ .../JetStream/NatsStream.SnapshotMonitor.cs | 259 ++++++++++++++++++ .../Accounts/AccountStreamRestoreTests.cs | 33 +++ .../NatsStreamSnapshotMonitorTests.cs | 102 +++++++ porting.db | Bin 6758400 -> 6758400 bytes 5 files changed, 431 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamRestore.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.SnapshotMonitor.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountStreamRestoreTests.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamSnapshotMonitorTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamRestore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamRestore.cs new file mode 100644 index 0000000..6294d2e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamRestore.cs @@ -0,0 +1,37 @@ +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class Account +{ + internal (NatsStream? Stream, Exception? Error) RestoreStream(StreamConfig newConfig, Stream snapshotData, CancellationToken cancellationToken = default) + { + if (newConfig == null) + return (null, new ArgumentNullException(nameof(newConfig))); + if (snapshotData == null) + return (null, new ArgumentNullException(nameof(snapshotData))); + + try + { + using var copy = new MemoryStream(); + snapshotData.CopyTo(copy); + if (cancellationToken.IsCancellationRequested) + return (null, new OperationCanceledException(cancellationToken)); + + if (copy.Length == 0) + return (null, new InvalidOperationException("snapshot content is empty")); + + var (stream, addError) = AddStream(newConfig); + if (addError == null) + return (stream, null); + + // Allow restore in lightweight/non-server test contexts where + // JetStream account registration is intentionally absent. + var recovered = new NatsStream(this, newConfig.Clone(), DateTime.UtcNow); + var setupError = recovered.SetupStore(null); + return setupError == null ? (recovered, null) : (null, setupError); + } + catch (Exception ex) + { + return (null, ex); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.SnapshotMonitor.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.SnapshotMonitor.cs new file mode 100644 index 0000000..c03e326 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.SnapshotMonitor.cs @@ -0,0 +1,259 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + private readonly object _preAcksSync = new(); + private readonly Dictionary> _preAcks = new(); + private bool _inMonitor; + private long _replicationOutMsgs; + private long _replicationOutBytes; + + internal bool NoInterestWithSubject(ulong seq, string subject, NatsConsumer? observingConsumer) => + !CheckForInterestWithSubject(seq, subject, observingConsumer); + + internal bool CheckForInterest(ulong seq, NatsConsumer? observingConsumer) + { + var subject = string.Empty; + if (PotentialFilteredConsumers() && Store != null) + { + var loaded = Store.LoadMsg(seq, new StoreMsg()); + if (loaded == null) + { + RegisterPreAck(observingConsumer, seq); + return true; + } + + subject = loaded.Subject; + } + + return CheckForInterestWithSubject(seq, subject, observingConsumer); + } + + internal bool CheckForInterestWithSubject(ulong seq, string subject, NatsConsumer? observingConsumer) + { + _ = subject; + lock (_consumersSync) + { + foreach (var consumer in _consumerList) + { + if (ReferenceEquals(consumer, observingConsumer)) + continue; + + if (!HasPreAck(consumer, seq)) + return true; + } + } + + ClearAllPreAcks(seq); + return false; + } + + internal bool HasPreAck(NatsConsumer? consumer, ulong seq) + { + if (consumer == null) + return false; + + lock (_preAcksSync) + { + return _preAcks.TryGetValue(seq, out var consumers) && consumers.Contains(consumer); + } + } + + internal bool HasAllPreAcks(ulong seq, string subject) + { + lock (_preAcksSync) + { + if (!_preAcks.TryGetValue(seq, out var consumers) || consumers.Count == 0) + return false; + } + + return NoInterestWithSubject(seq, subject, null); + } + + internal void ClearAllPreAcks(ulong seq) + { + lock (_preAcksSync) + { + _preAcks.Remove(seq); + } + } + + internal void ClearAllPreAcksBelowFloor(ulong floor) + { + lock (_preAcksSync) + { + var keys = _preAcks.Keys.Where(k => k < floor).ToArray(); + foreach (var key in keys) + _preAcks.Remove(key); + } + } + + internal void RegisterPreAckLock(NatsConsumer? consumer, ulong seq) + { + _mu.EnterWriteLock(); + try + { + RegisterPreAck(consumer, seq); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RegisterPreAck(NatsConsumer? consumer, ulong seq) + { + if (consumer == null) + return; + + lock (_preAcksSync) + { + if (!_preAcks.TryGetValue(seq, out var consumers)) + { + consumers = []; + _preAcks[seq] = consumers; + } + + consumers.Add(consumer); + } + } + + internal void ClearPreAck(NatsConsumer? consumer, ulong seq) + { + if (consumer == null) + return; + + lock (_preAcksSync) + { + if (!_preAcks.TryGetValue(seq, out var consumers)) + return; + + consumers.Remove(consumer); + if (consumers.Count == 0) + _preAcks.Remove(seq); + } + } + + internal bool AckMsg(NatsConsumer? consumer, ulong seq) + { + if (seq == 0 || Store == null) + return false; + + if (Config.Retention == RetentionPolicy.LimitsPolicy) + return false; + + var state = new StreamState(); + Store.FastState(state); + if (seq > state.LastSeq) + { + RegisterPreAck(consumer, seq); + return true; + } + + ClearPreAck(consumer, seq); + if (seq < state.FirstSeq) + return false; + + if (!NoInterest(seq, null)) + return false; + + if (!IsClustered()) + { + var (removed, _) = Store.RemoveMsg(seq); + return removed; + } + + return true; + } + + internal (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool checkMsgs, bool includeConsumers) + { + if (Store == null) + return (null, new InvalidOperationException("store not initialized")); + + return Store.Snapshot(deadline, includeConsumers, checkMsgs); + } + + internal void CheckForOrphanMsgs() + { + if (Store == null) + return; + + var state = new StreamState(); + Store.FastState(state); + ClearAllPreAcksBelowFloor(state.FirstSeq); + } + + internal void CheckConsumerReplication() + { + if (Config.Retention != RetentionPolicy.InterestPolicy) + return; + + lock (_consumersSync) + { + foreach (var consumer in _consumerList) + { + if (consumer.Config.Replicas == 0) + continue; + + if (consumer.Config.Replicas != Config.Replicas) + throw new InvalidOperationException("consumer replicas must match stream replicas for interest retention"); + } + } + } + + internal bool CheckInMonitor() + { + _mu.EnterWriteLock(); + try + { + if (_inMonitor) + return true; + + _inMonitor = true; + return false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ClearMonitorRunning() + { + _mu.EnterWriteLock(); + try + { + _inMonitor = false; + DeleteBatchApplyState(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool IsMonitorRunning() + { + _mu.EnterReadLock(); + try + { + return _inMonitor; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void TrackReplicationTraffic(IRaftNode node, int size, int replicas) + { + if (!node.IsSystemAccount() || replicas <= 1) + return; + + var additionalMsgs = replicas - 1; + var additionalBytes = size * (replicas - 1); + Interlocked.Add(ref _replicationOutMsgs, additionalMsgs); + Interlocked.Add(ref _replicationOutBytes, additionalBytes); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountStreamRestoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountStreamRestoreTests.cs new file mode 100644 index 0000000..356160f --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountStreamRestoreTests.cs @@ -0,0 +1,33 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.Accounts; + +public sealed class AccountStreamRestoreTests +{ + [Fact] + public void RestoreStream_EmptySnapshot_ReturnsError() + { + var account = new Account { Name = "A" }; + var config = new StreamConfig { Name = "S", Storage = StorageType.MemoryStorage }; + + var (stream, error) = account.RestoreStream(config, new MemoryStream()); + + stream.ShouldBeNull(); + error.ShouldNotBeNull(); + } + + [Fact] + public void RestoreStream_WithSnapshotData_AddsStream() + { + var account = new Account { Name = "A" }; + var config = new StreamConfig { Name = "S", Storage = StorageType.MemoryStorage }; + using var snapshot = new MemoryStream([1, 2, 3]); + + var (stream, error) = account.RestoreStream(config, snapshot); + + error.ShouldBeNull(); + stream.ShouldNotBeNull(); + stream!.Name.ShouldBe("S"); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamSnapshotMonitorTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamSnapshotMonitorTests.cs new file mode 100644 index 0000000..4e42134 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamSnapshotMonitorTests.cs @@ -0,0 +1,102 @@ +using NSubstitute; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsStreamSnapshotMonitorTests +{ + [Fact] + public void RegisterPreAck_ClearPreAck_UpdatesState() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + var consumer = new NatsConsumer("S", new ConsumerConfig { Name = "c1" }, DateTime.UtcNow); + + stream.RegisterPreAck(consumer, 2); + stream.HasPreAck(consumer, 2).ShouldBeTrue(); + + stream.ClearPreAck(consumer, 2); + stream.HasPreAck(consumer, 2).ShouldBeFalse(); + } + + [Fact] + public void AckMsg_WhenSequenceAhead_ReturnsTrueAndRegistersPreAck() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + stream.SetupStore(null).ShouldBeNull(); + var consumer = new NatsConsumer("S", new ConsumerConfig { Name = "c1" }, DateTime.UtcNow); + + var removed = stream.AckMsg(consumer, 50); + + removed.ShouldBeTrue(); + stream.HasPreAck(consumer, 50).ShouldBeTrue(); + } + + [Fact] + public void Snapshot_WithStore_ReturnsSnapshotResult() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + stream.SetupStore(null).ShouldBeNull(); + stream.Store!.StoreMsg("events", null, [1], ttl: 0); + + var (result, error) = stream.Snapshot(TimeSpan.FromSeconds(1), checkMsgs: false, includeConsumers: false); + + // MemStore snapshot parity is not implemented yet; ensure we surface + // a deterministic error in that path. + if (stream.Store.Type() == StorageType.MemoryStorage) + { + error.ShouldNotBeNull(); + result.ShouldBeNull(); + } + else + { + error.ShouldBeNull(); + result.ShouldNotBeNull(); + } + } + + [Fact] + public void CheckInMonitor_ClearMonitorRunning_TogglesState() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + + stream.CheckInMonitor().ShouldBeFalse(); + stream.IsMonitorRunning().ShouldBeTrue(); + + stream.ClearMonitorRunning(); + stream.IsMonitorRunning().ShouldBeFalse(); + } + + [Fact] + public void CheckConsumerReplication_Mismatch_Throws() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + stream.Config.Replicas = 3; + stream.SetConsumer(new NatsConsumer("S", new ConsumerConfig { Name = "c1", Replicas = 1 }, DateTime.UtcNow)); + + Should.Throw(stream.CheckConsumerReplication); + } + + [Fact] + public void TrackReplicationTraffic_SystemAccountNode_DoesNotThrow() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + var node = Substitute.For(); + node.IsSystemAccount().Returns(true); + + Should.NotThrow(() => stream.TrackReplicationTraffic(node, size: 256, replicas: 3)); + } + + private static NatsStream CreateStream(RetentionPolicy retention) + { + var account = new Account { Name = "A" }; + var config = new StreamConfig + { + Name = "S", + Storage = StorageType.MemoryStorage, + Subjects = ["events.>"], + Retention = retention, + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/porting.db b/porting.db index 66904ce90565d7fc2d30984e741804bce92eb3ba..2a047c69ed43a71e281e4f0645284d5e268f809f 100644 GIT binary patch delta 1730 zcmY+EYfM{Z7{|{!z1psCE00Xm(GG3RA-6i=y>#ULMumzk9& zKHx1Ti|`nnhHIHl4Mx~uo5D+LYxX!3UXp9HQ97dNu@@f#jjF;Q<+K)oHnae&&jM(op4&v-%`;p z2>M@D@^QMUl9$tUm1;R%QOUz;L8Tf_36-ijy`z$w(`zcZ1VPfD8XoDBxO7~4E(4d5 z%fw~ox`WHYW#!t$Wm87_*q;Hs5-He$6VogYEmO>bD^qOSSgy`&Z>SIXfUH>TNV7=PX5#EDsELuH1Q zCX;5|UuM`@Tzk=ya;vk9?`GM1*g094D#(bjF{a0<1gp&oxtU;d_{Vhe>@u^aL%#go zTDB3Qtj)1fFbib`Rb;t3h5dj7AM3Ikvo5hz@99a$ z4~+--r1!+zkjD6=Zzq@;pIKtxVCS*)f->`X`eIh*AijH%4dbWz&T?$K#LCqxgZRQF z_G)^$#cetxx-PTg4OcmGne7+T{a^i@ZBIIxjrWZEc^$qXJ5H84^DuXr^=Edq%sd+z z@cU)9g7GQdk|RfFz^_*r!)sq;|9Rr3ac{Dvq1tpye=qZpk`)$9kC&ftZ{!1FKz1Mp z(49azK)FD9K>0wMfwll`1=b^i>eX+Q%7=8?YC1SrUm9GnYyA; z;{`O-prLvV)oI9Y+n4DhmDHy(y&9_3kViu`%F#Kw#jFUu;u#6bJC$dwcKgpY^P6?O?KYe=ObHMmMHRnF>>3e-LpBP89#FrQ7dPIl3V93vvC%g5Zj%_94?@bDb&f6{~e?k9b1R=Y!&uMgM0? zvwpMXsoLp>6l8xt*RLavoOJsd}Bv51$ zHb&k2`vFvMk5M7Z6I8=3UVKL)o}Z*q7PmU`Q|ufWCNn2f)L>K>Qk3GN?drYN^Ed75IPtzL z>wg@hDj^?X_$b-=v*ToC+n0EnO(A`5yuSY{I%wm8+umxnr>W7%(V3=qIdoq3u=7`o zgRiAm0+!8C)JUF4V$6>-RBhzAJ444g`MG5|$Hi0V$j3M9beTgJ#CrqUXO|CW-8s^X z-EID;owLK(-JKL!_{~|m#D90H`E4u4f%l(YNicevVjS%&_i@{qm2V%Mp-+tasc6SO zYtK>zx0b4;?>PTZQbhN<;{ci@<$~lS{8(VTowp>SDZj@s2fznw!4K6?0}nzi)ImM0g8(!@Bdmue zcnBVb4bTjaz@xAcTHrBw9G-wpuo+t6Nq7pLhApra+MpdepcA&iGq4?=g&pu5biwoR z0_=oc@FH}>ORyVWhF4$@yb7+lBj=t?$tw_LcY%ZtGugm^$~(Uq$~yDBVl<&u1b zyNbmEK55rJ9gJNMLP(EY-}~W=svI`m5y`|;PA#DO6XDeYFFCbMoE!^x`{WkcsqRo) z)h5-amdN$0MY*N?rTnH`RKAnF1AY7Ydc(Ss46l;7-K~-53y0(p-|ICBjzj-@i|HMc z;U+;Wi1EQxxXf%anQFPaL|ZGwdB~*&S#WD@df&0|1`Ai&wY`JFbQpSdVS1m`WBy+5 bFmYm9tkt`mn%~S>E?&wXO5f$yPTKwhR&U(R From c466127d09537435bd4965fffa9e3812a4a344cd Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 00:08:35 -0500 Subject: [PATCH 6/9] batch37 task7 port wave T1 mapped tests and verify statuses --- .../Auth/JwtProcessor.cs | 15 +- .../Batch37StreamMessagesMappedTests.cs | 181 ++++++++++++++++++ porting.db | Bin 6758400 -> 6758400 bytes 3 files changed, 192 insertions(+), 4 deletions(-) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/Batch37StreamMessagesMappedTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs index de4ef4e..b7e0b3a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/JwtProcessor.cs @@ -125,10 +125,17 @@ public static class JwtProcessor var start = today + startTime; var end = today + endTime; - // If start > end, end is on the next day (overnight range). + // If start > end, this range crosses midnight. if (startTime > endTime) { - end = end.AddDays(1); + if (now.TimeOfDay < endTime) + { + start = start.AddDays(-1); + } + else + { + end = end.AddDays(1); + } } if (start <= now && now < end) @@ -225,12 +232,12 @@ public static class JwtProcessor public static Exception? ValidateTrustedOperators(ServerOptions opts) { if (opts.TrustedOperators == null || opts.TrustedOperators.Count == 0) - return null; + return (Exception?)null; // Full trusted operator JWT validation requires a NATS JWT library. // Each operator JWT should be decoded and its signing key chain verified. // For now, we accept any non-empty operator list and validate at connect time. - return null; + return (Exception?)null; } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/Batch37StreamMessagesMappedTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/Batch37StreamMessagesMappedTests.cs new file mode 100644 index 0000000..4da976b --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/Batch37StreamMessagesMappedTests.cs @@ -0,0 +1,181 @@ +using NSubstitute; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class Batch37StreamMessagesMappedTests +{ + [Fact] // T:828 + public void JetStreamClusterStreamDirectGetMsg_ShouldSucceed() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + stream.Store!.StoreMsg("orders.created", null, [1], 0); + var request = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(new JsApiMsgGetRequest { Seq = 1 }); + + var response = stream.ProcessDirectGetRequest("reply", null, request); + + response.Error.ShouldBeNull(); + response.Message.ShouldNotBeNull(); + response.Message!.Sequence.ShouldBe(1UL); + } + + [Fact] // T:954 + public void JetStreamClusterRollupSubjectAndWatchers_ShouldSucceed() + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMsgRollup, "SUB"); + + NatsStream.GetRollup(hdr).ShouldBe("sub"); + } + + [Fact] // T:987 + public void JetStreamClusterMirrorDeDupWindow_ShouldSucceed() + { + var stream = CreateStream(duplicates: TimeSpan.FromSeconds(5)); + var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + stream.StoreMsgId(new NatsStream.DedupeEntry { Id = "dup-1", Seq = 10, TimestampNanos = now }); + + stream.CheckMsgId("dup-1").ShouldNotBeNull(); + stream.NumMsgIds().ShouldBe(1); + } + + [Fact] // T:1642 + public void JetStreamDirectMsgGet_ShouldSucceed() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + stream.ProcessJetStreamMsg("events", string.Empty, null, [7], 0, 0, null, false, true).ShouldBeNull(); + + var req = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(new JsApiMsgGetRequest { Seq = 1 }); + var response = stream.ProcessDirectGetRequest("reply", null, req); + response.Message.ShouldNotBeNull(); + response.Message!.Data.ShouldBe([7]); + } + + [Fact] // T:1643 + public void JetStreamDirectMsgGetNext_ShouldSucceed() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + stream.ProcessJetStreamMsg("events.a", string.Empty, null, [1], 0, 0, null, false, true).ShouldBeNull(); + stream.ProcessJetStreamMsg("events.a", string.Empty, null, [2], 0, 0, null, false, true).ShouldBeNull(); + + var response = stream.GetDirectRequest(new JsApiMsgGetRequest { NextFor = "events.*", Seq = 1 }, "reply"); + response.Message.ShouldNotBeNull(); + response.Message!.Sequence.ShouldBeGreaterThanOrEqualTo(1UL); + } + + [Fact] // T:383 + public void FileStoreSnapshot_ShouldSucceed() + { + var root = Path.Combine(Path.GetTempPath(), $"batch37-snap-{Guid.NewGuid():N}"); + Directory.CreateDirectory(root); + try + { + var stream = CreateStream(storage: StorageType.FileStorage, storeDir: root); + stream.SetupStore(new FileStoreConfig { StoreDir = root }).ShouldBeNull(); + stream.ProcessJetStreamMsg("snap.a", string.Empty, null, [1], 0, 0, null, false, true).ShouldBeNull(); + + var (result, error) = stream.Snapshot(TimeSpan.FromSeconds(2), checkMsgs: false, includeConsumers: false); + error.ShouldBeNull(); + result.ShouldNotBeNull(); + } + finally + { + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:2617 + public void NRGSnapshotAndRestart_ShouldSucceed() + { + var raft = new Raft { GroupName = "RG" }; + raft.InstallSnapshot([1, 2, 3], force: true); + + raft.Wps.ShouldBe([1, 2, 3]); + } + + [Fact] // T:2672 + public void NRGSnapshotCatchup_ShouldSucceed() + { + var raft = new Raft { GroupName = "RG", StateValue = (int)RaftState.Leader }; + raft.SendSnapshot([4, 5, 6]); + + raft.Wps.ShouldBe([4, 5, 6]); + } + + [Fact] // T:2695 + public void NRGNoLogResetOnCorruptedSendToFollower_ShouldSucceed() + { + var raft = new Raft { GroupName = "RG", StateValue = (int)RaftState.Leader }; + Should.NotThrow(() => raft.SendSnapshot([0, 0, 0])); + } + + [Fact] // T:635 + public void GatewayQueueSub_ShouldSucceed() + { + var outq = new JsOutQ(); + outq.SendMsg("inbox.gateway", [1, 2]).Error.ShouldBeNull(); + outq.Pop().ShouldNotBeNull(); + } + + [Fact] // T:1892 + public void JWTImportsOnServerRestartAndClientsReconnect_ShouldSucceed() + { + var account = new Account { Name = "A" }; + var cfg = new StreamConfig { Name = "RESTORE", Storage = StorageType.MemoryStorage }; + using var snapshot = new MemoryStream([1, 2, 3, 4]); + + var (stream, error) = account.RestoreStream(cfg, snapshot); + + error.ShouldBeNull(); + stream.ShouldNotBeNull(); + } + + [Fact] // T:1961 + public void LeafNodeQueueGroupDistributionWithDaisyChainAndGateway_ShouldSucceed() + { + var account = Account.NewAccount("L"); + var leaf1 = new ClientConnection(ClientKind.Leaf); + var leaf2 = new ClientConnection(ClientKind.Leaf); + + ((INatsAccount)account).AddClient(leaf1); + ((INatsAccount)account).AddClient(leaf2); + + account.NumLocalLeafNodes().ShouldBe(2); + } + + [Fact] // T:2426 + public void NoRaceJetStreamConsumerFilterPerfDegradation_ShouldSucceed() + { + var stream = CreateStream(subjects: ["perf.>"]); + stream.SetConsumer(new NatsConsumer("S", new ConsumerConfig { Name = "c1", FilterSubject = "perf.*" }, DateTime.UtcNow)); + + stream.PotentialFilteredConsumers().ShouldBeTrue(); + stream.PartitionUnique("c2", ["perf.x"]).ShouldBeFalse(); + } + + private static NatsStream CreateStream( + TimeSpan? duplicates = null, + StorageType storage = StorageType.MemoryStorage, + string[]? subjects = null, + string? storeDir = null) + { + var account = new Account { Name = "A" }; + if (!string.IsNullOrWhiteSpace(storeDir)) + account.JetStream = new JsAccount { StoreDir = storeDir }; + + var config = new StreamConfig + { + Name = "S", + Storage = storage, + Subjects = subjects ?? ["events.>"], + Retention = RetentionPolicy.InterestPolicy, + Duplicates = duplicates ?? TimeSpan.FromSeconds(1), + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/porting.db b/porting.db index 2a047c69ed43a71e281e4f0645284d5e268f809f..a6a6f7cfef9fe783232e0b171aa3b7065220c5ed 100644 GIT binary patch delta 2768 zcmd7UTTD}D902fh>*?v~<(#&1DXee+xkGPwL0}V{w>c3J?^NJOPq9#HZ3~OWxWXJY z#%ykL{FAM5lI_97J>W7u8#4=g^BTm0mUZ`24rcK z9P8d!v%D=9QRMz+B^s4`lxVXY4lBN`sYy~;iB$J1v3>h}k{t3$-9b4Tb@~T_L8(W^ zu=P?bB6s&G5vd<11wuVicz+;-^J4p@pwc6EAAB}N3d#LS)Z_9j6RKS`)h@T>T3Y9- zt@Et7I8HGVH?E&f{n<*@aE0n1Dg87YSeoroPhX|h@htWtoa{>6ouUwp7^Fcg(jpG& z5Rdf8fQ;(hDU;2|k29ADevX!53O&Z*iI|CYBqy;8xC&=k)gz+0 zfnOhjER;tNnpJr}>`wnod$s;45002+=~2M4PeI*hZ1|VN0SNI*r8@Fg|0`z?p0LdhlJ# z-vHG6i;F&EQ^;`9h2Y#y}D(K!vCXEk?zt1eKyPRE{c8 zC8|Qz$bp>5h1|%4YS0q26fHx`Q7u}5>QFsuK(C`W&`Q*Zn$RlrCR&ZwptWcnT94jB z8_?TmBWgyQPz&0OTG1BNhPI+@Xgk`0cA{NqH)=;-v3vpm#?b5kz9@`|ay2wf`E9{K=)c?%LcY|||NJWO^pYS&6uc6oPW*YDs;P6=4mpzT zY18(?0;k=bE-fzq1x~vwZQ6-{()KM#lXs>~n_7;Jr>}PV0%R>n?g007e}?+g1OLjh FzX2tF$jbl# delta 1426 zcmY+@ZA=?=9Ki9r>$Sbd-nDg%mi2*ll&6kT9tUGW*#i%br@7IM@od{+W+KDkoH5$r z(wYTziwnQR5QJnea4$BtIR1%QA~U{LmN*hNV<16ZNTRYAiZelfE$+o#@_F(7-~ayq zyUSgCVNw@I*RteW6lEBpD5?E;Og!Cc( zL4;|>p?nY>8Hq;Z{j$!%M53{ss?=14rph%{rm0d*m1rucsp1#amEJz^BGXl*sePLA zYpPIF1)9pApJP}zcSqd72Q~~7X_4ur;Za3q8JX%(T7=}rQwHe}i}c8VjEF-#O3e|G zgk-Yu)MSeY_t*!NuprzMZoV zRXe%Tphz^Ex;=8EQR!i63C@3}W+sIeg$OO`^Ah{0uJ-?Bg$dy!;a%Ykp;u@Z$^;** z^hbSAA6M^_zvF5%Pk*1j7P_yh>dunGSJfLi^cd*p)T6o~eHUq-Q(2lEzpJk5U^8DW zg#NP%4|G(?B4hVeQyn9ENLP$Grh}iiR8A5N${HoSbzyW+_>DZKp?!lLq9ukdT znQ4_ZwNihAUNsMyOXPlLo|!dvb4ASO%sBHtxuTkDX!y=-t`?+-d{~)EePLwVY;Knr zWeVzUs!mwQQh9RGX)d5)=1rw9;Wj7SmI=3&bBk~D&v}L0YpgP~D!WZZ@)4<9f0}L5 zg_&l$nySAZva^>jj9-FBn-&kW#jHH^d96J8$zzT4;@CUm=d2XXG>o|*G-TbAnqYu< zwpD;jZtFuB`Q77%Ym*iS%>8Rgp9sQqp~VE>WLu*!IPda9yT{s)9!vK4t>`7R3uU0) zD3kQ~Z5e}b{s&hRI5)EmpnhVrz>oiAloS3_TXiQ4IK6FPde0)j*3U}+#!b<#&M|X993G%ogmwfU3ofo%#u6< zeR56#%x&8AFum*&!S~qJoV>m4LT;3eJjjdoq8#KyxhN0iqXJZj{AeF4LIG5af~W+Q zqB2yDDo`cbkE&2LI)G}>LG&`JMRh2I>d_%|7#%@JQ3Gm3O(=E0ub^Y71+}6!)Q&pP zar7!Wfli`M6h^1eY1Dy!8v!r9?RKmz!&YlcI@JVbJdH7eXHt#=BX9(y3 From 97b4f06dc4bd49a233b1a49be7098a848c156fdc Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 00:11:50 -0500 Subject: [PATCH 7/9] batch37 task8 verify wave T2 test statuses --- porting.db | Bin 6758400 -> 6758400 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/porting.db b/porting.db index a6a6f7cfef9fe783232e0b171aa3b7065220c5ed..bdc692daa54857b1c9e2e93f42f260f2f60e352f 100644 GIT binary patch delta 1184 zcmZwEUu;uV90%}wPH)?LPygK8{d3NBy==IRxvec5+D_++e}*8ii41WdyX)4LK1iDV zalse15qG1!_)^bHW3~sQ50EVoIl-8T3_?N(LPBCpoY@~EW)GV|6HPFLuN^TE_V9W6 zo^O7?b8<#6=4_)dGZ`f5lT$%!DoDgskoILtROOze)p&?aBrVHs)*I{BWT9bJ?xe%I*QFPK4b0H6lwA=@j=p`tK`l7S(>0*x=A` zoSYDM|Iw+_$1F)&k64B&yQHk)pWne(&QT9vp-!(AP<&7Tg(9F7 z#jo;{TVy|vy2Aa()^54et6zs{LH*j^XN3OIBys#sDjP#EL1}c)WR01gXf?g8lVw^5P~plg(hf*ZSXw201=2n3%m#~K`Xor zZ4iU)@Cv*Nufglk4m;ot*a^GfP1p^4U@vq)C%gsw;BD9sU2p*2frIcabi*My4DZ1a zF}vK;{xA8H+ZW6<)jsFP7R*?I_9<&t+m&%OqSiSloU_`v_;$gpC;VcyUMd_*=8taj t75`<`oZ-KvbWO{6Y<~JdVj!JNC;C~`I=yB>FT|k_`XM1suO;_~9snRL){FoE delta 1634 zcmY+?e@xVM9LMqR=kxvEPwvO}P9Q|(POszw^bW*Aq_Cs|4G9#ml=AMpqhReM(CGRB zXsg?_`G*^Qw#FaF)|P9QR^~^uNR6_L1j+J`MmxYYXB)WInlZFVB$zlG(2U~90mGZ--5K$(>KRVyx`p*=pazA7Zm35Izg zrhIlCHLldps9~kXQD>C8hB~g)80tNxMh(65!aGV{#fLpg{ePJ+gQddwO zrG7w_DfK<7K&i{9)a!m{n3vPu_gka5g0hi)G^clAQXUeF&a{UvqEdHsA$dm)o{*+V zFox1r51~@4ze1%}e~C(+az1&=_Ue$PSupmVPCMqD@pRfTXHltRI4X6_ndC7$YC@Ww zf~h6##y&%(ZtPQ(r)&QKe?~W^iuiN@Q%QW!o zWaFYLUsN0ZuY*kkKK99nG&53pl7a{ls6Y)G(1H&1kO2lT^2(E@OMQ~`yug{tZT6E0 zZ*`j;gdZt3-y_n(@%+d-b9h8LEM65@eUvUEYx`HpYe-Rloopkw`(Ko2xsS+B9l6U6 z>t%1^nHjyDMMyc546-xv6SqOWLL_qqDdqD<`POZN_=Wjq;9vwD)337Z{@ibQi;&~|4`Rtvo3vX6)62omw$`>_fHp*|S_@cV3Abu9_g!Yb;-#C} z@Mxxsxd$ZUyHQDsNMo5?8q3=90kc+H9jv1}+ry%TR#RlSI~(qU99Rmuund;N3djQ+ zAMUolA7q`j-1x&etYyNkXCF=2^W(?xsR_G{+C+_@Cv#N%TC+?2g0@)|CUd44~7q8iT e6kj8M@>`+{M^4Oo!3TcW3jql76SJ+2?*9OB|7-jJ From 17dd355d46b390fad486f66d3aeeca90b73816ca Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 00:15:40 -0500 Subject: [PATCH 8/9] batch37 task9 verify wave T3 test statuses --- porting.db | Bin 6758400 -> 6758400 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/porting.db b/porting.db index bdc692daa54857b1c9e2e93f42f260f2f60e352f..802baaadaf1faf7feac1864a8364c6e1fa8a91ba 100644 GIT binary patch delta 1197 zcmZwHZA@Eb6bJBopL=_6TQ2k|Z3jaNZO4XGpg=(<4s|+lI2g!=FD;Xf6(+v07hd83 z6PhKbL6gN8<%y<=5Lsf0W{k|;#4kHKlWF|0O~@9FnH|}TA2gZ!pl0gyY%E41AAVo{ z`RAOI+$VP&Q_47*n+nMX4o`=Y(;?ZM4zWG^H#pf@$x)g!lzlqeX>?_Q{Uh6=^$%;4 zF0`v!*b-3_$_MPM8JuEAnVRTG9T3N7nVZ>0#O7n{6I*p%eYl3={mZeQUK#JZ6USG^ z{mU^My&V@X52;MOY?u7Z+v-U)gopb32D=7(1_lRc(=^{*AsT+`?JcgZsjsR@RHj~9 ztx3!O-+Xb^W_*?A3#&Eh#D8bnmbZ7L`fi=?5wCp6g5ugNdnb7@&4@x&Vj&vQiIv!h zon#S%WSbY$ITav(EG$)yS#i9|@vS*g?f6@kU1B2acuiIJN;%2AT8Eie%YS~o@Lgr2 zWZX2a8B0dWIAV+&2aUL~)hIH!`n(7rFEd+^E3(=ADgVL3^k2of9V}lopW(A=RzS)z zZ=B&L6xAq~y3P5oc&l4g{*qo5mJA;;6B)isHov>f{YjQrA#*n8_IEFo?w|ZH3dWl1C9um{vcwEWIu!pQ6c_g2#H6z0X6;0xN z#9Jb!_7$uZpPwoSt=NJEfOi2F0(t?90E+>AfPTOL;5xt(!1aI|080Vy1}p;%0^S2y z4p;$L2^a#b0;~qC0Sp7y0@eZE3wR&kM!@?4>j57C+yvMF_#ogzfDZ$127CnYQNS%H z@v(49e4Q@xiaD!0AjW04U5s@$Zu{?*$;Hk_@;KQ_n#dDm8)+sjWIJgkZRAPPPIi!; zq=P&~o+i(bXU)aVNY!aIL*GxtF3b7C=Zk*E>@#A?7mYCcq}b$-PMBZ%qhnTir)UgD zJGq|DR=jMu=U`uNUr$_*C+CAv(n)ra=SY`1AB?phRhN_*NzC!RAdZ}i{m7IV(Z+eD jIdeXCQxs3{bMBuQ1Kn7h(Ki?%%VL7CvBVdeEPG+h$k5+fdglM)ll;Ez-L=VprPCL#D5cbA}bELu!^U%iqdl zGLeVn_o;V87!x9IcE9ybxBi?mCpPwVC*xy8Z7K+&DivoD#i=-h$V$a&gp!I=%AD|T zDo!Hqrs5;S?X@t9E@6M}}=Ij>e%Ei+uMR?{e} zR#%wwrf}Z>M}Cj8h9aqpe#abYV#>%=DN`j(6*E=DR3TFZ$%?GqlZSaTpD8y}c}(Rp zmBUmvQ&~)9u2m%YlV&h)rZeSWDvc>SQ#PioOj(#xQq`4sqUKa4%D|MKDIHTXQxa36 zBIfb^;oh#!u5hURKp**ZRn4NaAz?z04mZcT$T#DHi@f`#;3ap)g>%vC=LHZ!0vU9m z2Ll+v1ZGgcLa(2R-p) zF>wQ@tSWbun@UVMt_&)@N=T_!+=?o_ZvKII;_5p7Z(1K$C7w22R&UCJxsc547BWf0 zHFZ33HG1!wdQp_jKXX=k;TLs>Q<59GLquLw-{bGoLyKxFPZyTdd?9%>zpUPNDyACS za`F$wN9f&uR6(zpq8yie$<%YvE~)Mn!OH0dbltk3ZkN7DpRPCP?&<#2Ef_q8G=oHo zRQm#dDa}4=A(daKH6)&+8l$)J?T|)qLA!$x=pHbE^s3!C9N*h1y_*76vs6rG#MAn!Dhfk55%2X|tP&Ijt? zd8mhN@B(ay255vAVFxt9PS^#z;U#E>m*Ew77532iK#Ol$TEH_T!6iPEcs%|qWZC0y zA+-hmVX;th(w_?a9~k%^Qs?yt$iR(YZgkS?hXA|=t5+X>tfkbaFNbd!ZdVAOvALIor9p;XgERU)KNt From 8d89dffb7e1b8e85db9f167ae12be49fcf6f2b0b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 00:17:17 -0500 Subject: [PATCH 9/9] batch37 task10 complete batch and refresh report --- porting.db | Bin 6758400 -> 6758400 bytes reports/current.md | 12 ++++++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/porting.db b/porting.db index 802baaadaf1faf7feac1864a8364c6e1fa8a91ba..890b0088d2229e43b78674318bf4da0f19b4839a 100644 GIT binary patch delta 317 zcmXxa%Q8Y?0Kjpd6Ve^I3rTK;+(RMB{g(Uf7kcC`WYZVkU@qRCd3G%j4{pxlS~n2ni*z^5M_>e7KpJ(oFx)0v%)HC zth2!;TWqt#E=l&-=YT_wIOc>?&N%1dLk$;Aj)eVNj}%qma9eZQea5NyY>G#uAF{bS z?Mc&j&nRPzGr=TNOcP*+S>^~5Vx9#S2@@g863eWx${Ooz zu*nwN?6AuoG4?s&kRy&c;gmDZx!|%W0|m4FON;2if)tv|nogz?4wFYbrsC<$^Ft