From 0860afc886aae80c0d6792e17a32dfba667ca78b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:45:33 -0500 Subject: [PATCH 1/6] batch36 task1 preflight and baseline gates --- porting.db | Bin 6754304 -> 6754304 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/porting.db b/porting.db index d07323c7f7dc27a93c4d0d34efb605e455832786..2af9deeec4f20394a38e40a2a4d8d718b09e46d2 100644 GIT binary patch delta 397 zcmYk!J5N(_9Dw0-Xo1?E{!XdN9RylHDzt!dD>o4-_e-nb^J%{%2q-; zG!AYMLQKwgaAIPSG+|?MV`1X3F+A(Dlg&7G?x2&011I?uP>72Mxba}%MKJMEgrA2z z;xWaPP)Zr)R8UD3)jXkwTIvW8q@D&Ec}f!@p3zJTVIs8BMmx`WK?j|5(M=Cgdg-H| z0S0-=5U&{KH6z3rWsGqqm}H7+W|(D;I0@eHmU$L<$9q2TkxzX7qw=!(F)ifXPZwrY zq~5!4UYjA)Zx+an{Fih2E2nZOYm$_NL?tYBQY_YG+}EUAtu?w~_)JGY|FORKGqtgr zTG`wTrn5)tw^Oz?!cl}fGn4Id%Kq}WQw7-MF?gG!|` zW^^i@M&++~y64hLIxhdAlY#>$sicuk2AO2x!i@*Pz)Lnha>yl*duj*e7TfGRn<-Z6g%JK)hYEyMk&8COP@+}QGJ rHn Date: Sat, 28 Feb 2026 22:52:06 -0500 Subject: [PATCH 2/6] batch36 task2 implement group-a lifecycle primitives --- .../Accounts/Account.StreamLifecycle.cs | 71 ++++++++ .../JetStream/NatsStream.Lifecycle.cs | 160 ++++++++++++++++++ .../JetStream/NatsStream.cs | 8 +- .../JetStream/StoreParity.cs | 40 +++++ .../JetStream/StoreTypes.cs | 50 ++++++ .../JetStream/NatsStreamTests.cs | 55 ++++++ .../JetStream/StoreTypesTests.cs | 41 +++++ porting.db | Bin 6754304 -> 6754304 bytes 8 files changed, 424 insertions(+), 1 deletion(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamLifecycle.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamLifecycle.cs new file mode 100644 index 0000000..9f5b73e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamLifecycle.cs @@ -0,0 +1,71 @@ +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class Account +{ + internal (NatsStream? Stream, Exception? Error) AddStream(StreamConfig config) => + AddStreamWithAssignment(config, null, null, pedantic: false); + + internal (NatsStream? Stream, Exception? Error) AddStreamWithStore(StreamConfig config, FileStoreConfig? fileStoreConfig) => + AddStreamWithAssignment(config, fileStoreConfig, null, pedantic: false); + + internal (NatsStream? Stream, Exception? Error) AddStreamPedantic(StreamConfig config, bool pedantic) => + AddStreamWithAssignment(config, null, null, pedantic); + + internal (NatsStream? Stream, Exception? Error) AddStreamWithAssignment( + StreamConfig config, + FileStoreConfig? fileStoreConfig, + StreamAssignment? assignment, + bool pedantic) + { + if (config == null) + return (null, new ArgumentNullException(nameof(config))); + + var (server, jsa, error) = CheckForJetStream(); + if (error != null || server == null || jsa == null) + return (null, error ?? new InvalidOperationException("jetstream not enabled for account")); + + if (string.IsNullOrWhiteSpace(config.Name)) + return (null, new InvalidOperationException("stream name is required")); + + var normalized = config.Clone(); + if (normalized.Subjects == null || normalized.Subjects.Length == 0) + normalized.Subjects = [$"{normalized.Name}.>"]; + + _ = pedantic; + _ = fileStoreConfig; + + jsa.Lock.EnterWriteLock(); + try + { + if (jsa.Streams.TryGetValue(normalized.Name, out var existingObject) && existingObject is NatsStream existing) + { + if (StreamConfigsEqual(existing.Config, normalized)) + { + if (assignment != null) + existing.SetStreamAssignment(assignment); + return (existing, null); + } + + return (null, new InvalidOperationException("stream name already in use")); + } + + IStreamStore store = new JetStreamMemStore(normalized.Clone()); + + var stream = NatsStream.Create(this, normalized, jsa, store, assignment, server); + if (stream == null) + return (null, new InvalidOperationException("stream creation failed")); + + jsa.Streams[normalized.Name] = stream; + return (stream, null); + } + finally + { + jsa.Lock.ExitWriteLock(); + } + } + + private static bool StreamConfigsEqual(StreamConfig left, StreamConfig right) + => string.Equals(JsonSerializer.Serialize(left), JsonSerializer.Serialize(right), StringComparison.Ordinal); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs new file mode 100644 index 0000000..84168f3 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs @@ -0,0 +1,160 @@ +using System.Threading.Channels; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal StreamAssignment? StreamAssignment() + { + _mu.EnterReadLock(); + try + { + return _assignment; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetStreamAssignment(StreamAssignment? assignment) + { + _mu.EnterWriteLock(); + try + { + _assignment = assignment; + + if (assignment?.Group?.Node != null) + { + _node = assignment.Group.Node; + assignment.Group.Node.UpdateKnownPeers(assignment.Group.Peers); + } + + _updateChannel.Writer.TryWrite(true); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ChannelReader? MonitorQuitC() + { + _mu.EnterWriteLock(); + try + { + _monitorQuitChannel ??= Channel.CreateBounded(1); + return _monitorQuitChannel.Reader; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SignalMonitorQuit() + { + _mu.EnterWriteLock(); + try + { + if (_monitorQuitChannel != null) + { + _monitorQuitChannel.Writer.TryComplete(); + _monitorQuitChannel = null; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ChannelReader UpdateC() => _updateChannel.Reader; + + internal bool IsLeaderNodeState() + { + _mu.EnterReadLock(); + try + { + if (_node is IRaftNode raftNode) + return raftNode.State() == RaftState.Leader; + + return true; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool IsLeaderInternal() + { + _mu.EnterReadLock(); + try + { + if (_assignment?.Group?.Node is IRaftNode node) + return node.Leader(); + + return true; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void StartClusterSubs() + { + _mu.EnterWriteLock(); + try + { + _clusterSubsActive = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void StopClusterSubs() + { + _mu.EnterWriteLock(); + try + { + _clusterSubsActive = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ClusterSubsActive() + { + _mu.EnterReadLock(); + try + { + return _clusterSubsActive; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal Account? AccountLocked(bool needLock) + { + if (needLock) + _mu.EnterReadLock(); + + try + { + return Account; + } + finally + { + if (needLock) + _mu.ExitReadLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index f6f788b..3e70945 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -13,13 +13,15 @@ // // Adapted from server/stream.go in the NATS server Go source. +using System.Threading.Channels; + namespace ZB.MOM.NatsNet.Server; /// /// Represents a JetStream stream, managing message storage, replication, and lifecycle. /// Mirrors the stream struct in server/stream.go. /// -internal sealed class NatsStream : IDisposable +internal sealed partial class NatsStream : IDisposable { private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion); @@ -42,6 +44,10 @@ internal sealed class NatsStream : IDisposable private ulong _leaderTerm; private bool _sealed; private CancellationTokenSource? _quitCts; + private Channel? _monitorQuitChannel = Channel.CreateBounded(1); + private readonly Channel _updateChannel = Channel.CreateBounded(4); + private StreamAssignment? _assignment; + private bool _clusterSubsActive; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs index 900d58a..7804a7d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs @@ -303,6 +303,14 @@ public static class StoreEnumParityExtensions StorageType.FileStorage => "File", _ => "Unknown Storage Type", }; + + public static string String(this PersistModeType value) + => value switch + { + PersistModeType.DefaultPersistMode => "Default", + PersistModeType.AsyncPersistMode => "Async", + _ => "Unknown Persist Mode Type", + }; } public sealed class RetentionPolicyJsonConverter : JsonConverter @@ -403,6 +411,38 @@ public sealed class StorageTypeJsonConverter : JsonConverter } } +public sealed class PersistModeTypeJsonConverter : JsonConverter +{ + public override PersistModeType Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "default" => PersistModeType.DefaultPersistMode, + "" => PersistModeType.DefaultPersistMode, + "async" => PersistModeType.AsyncPersistMode, + var value => throw new JsonException($"can not unmarshal \"{value}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, PersistModeType value, JsonSerializerOptions options) + { + switch (value) + { + case PersistModeType.DefaultPersistMode: + writer.WriteStringValue("default"); + break; + case PersistModeType.AsyncPersistMode: + writer.WriteStringValue("async"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} + public sealed class AckPolicyJsonConverter : JsonConverter { public override AckPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs index c89a8b8..03eb1d7 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs @@ -641,6 +641,7 @@ public enum StoreCompression : byte // --------------------------------------------------------------------------- /// Determines what persistence mode the stream uses. +[JsonConverter(typeof(PersistModeTypeJsonConverter))] public enum PersistModeType { DefaultPersistMode = 0, @@ -711,6 +712,14 @@ public sealed class ExternalStream [JsonPropertyName("deliver")] public string DeliverPrefix { get; set; } = string.Empty; + + public string Domain() + { + if (string.IsNullOrEmpty(ApiPrefix)) + return string.Empty; + + return SubscriptionIndex.TokenAt(ApiPrefix, 2); + } } // --------------------------------------------------------------------------- @@ -737,6 +746,47 @@ public sealed class StreamSource [JsonPropertyName("external")] public ExternalStream? External { get; set; } + + [JsonIgnore] + public string IndexName { get; private set; } = string.Empty; + + public string ComposeIName() + { + var name = Name; + if (External != null) + name = $"{name}:{NatsServer.GetHash(External.ApiPrefix)}"; + + var source = FilterSubject ?? string.Empty; + var destination = ">"; + + if (SubjectTransforms is null || SubjectTransforms.Length == 0) + { + if (string.IsNullOrEmpty(source)) + source = ">"; + if (string.IsNullOrEmpty(destination)) + destination = ">"; + } + else + { + var sources = new List(SubjectTransforms.Length); + var destinations = new List(SubjectTransforms.Length); + foreach (var transform in SubjectTransforms) + { + sources.Add(string.IsNullOrEmpty(transform.Source) ? ">" : transform.Source); + destinations.Add(string.IsNullOrEmpty(transform.Destination) ? ">" : transform.Destination); + } + + source = string.Join('\f', sources); + destination = string.Join('\f', destinations); + } + + return string.Join(' ', [name, source, destination]); + } + + public void SetIndexName() + { + IndexName = ComposeIName(); + } } // --------------------------------------------------------------------------- diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs index 9843cc1..f17aa5b 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0 using Shouldly; +using NSubstitute; using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; @@ -40,4 +41,58 @@ public sealed class NatsStreamTests stream.Delete(); stream.IsLeader().ShouldBeFalse(); } + + [Fact] + public void LifecyclePrimitives_AssignmentAndChannels_ShouldBehave() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create( + account, + new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }, + null, + new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }), + null, + null)!; + + stream.AccountLocked(true).ShouldBe(account); + stream.StreamAssignment().ShouldBeNull(); + + var assignment = new StreamAssignment { Sync = "sync.inbox" }; + stream.SetStreamAssignment(assignment); + stream.StreamAssignment().ShouldBe(assignment); + + stream.UpdateC().TryRead(out var updateSignal).ShouldBeTrue(); + updateSignal.ShouldBeTrue(); + + stream.StartClusterSubs(); + stream.ClusterSubsActive().ShouldBeTrue(); + stream.StopClusterSubs(); + stream.ClusterSubsActive().ShouldBeFalse(); + + var monitor = stream.MonitorQuitC(); + monitor.ShouldNotBeNull(); + stream.SignalMonitorQuit(); + stream.MonitorQuitC().ShouldNotBeNull(); + } + + [Fact] + public void IsLeaderInternal_WhenAssignedToRaftNode_UsesNodeLeaderState() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create( + account, + new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }, + null, + new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }), + null, + null)!; + + var raftNode = Substitute.For(); + raftNode.Leader().Returns(false); + var assignment = new StreamAssignment { Group = new RaftGroup { Node = raftNode } }; + + stream.SetStreamAssignment(assignment); + + stream.IsLeaderInternal().ShouldBeFalse(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs index 9a369e0..4747d67 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs @@ -277,6 +277,47 @@ public class StoreTypesTests JsonSerializer.Deserialize("\"file\"").ShouldBe(StorageType.FileStorage); } + [Fact] + public void PersistModeType_StringAndJsonParity_MatchesGo() + { + PersistModeType.DefaultPersistMode.String().ShouldBe("Default"); + PersistModeType.AsyncPersistMode.String().ShouldBe("Async"); + ((PersistModeType)99).String().ShouldBe("Unknown Persist Mode Type"); + + JsonSerializer.Serialize(PersistModeType.DefaultPersistMode).ShouldBe("\"default\""); + JsonSerializer.Serialize(PersistModeType.AsyncPersistMode).ShouldBe("\"async\""); + JsonSerializer.Deserialize("\"default\"").ShouldBe(PersistModeType.DefaultPersistMode); + JsonSerializer.Deserialize("\"async\"").ShouldBe(PersistModeType.AsyncPersistMode); + } + + [Fact] + public void ExternalStream_Domain_ReturnsSecondTokenOrEmpty() + { + new ExternalStream().Domain().ShouldBe(string.Empty); + new ExternalStream { ApiPrefix = "$JS.D1.API" }.Domain().ShouldBe("D1"); + } + + [Fact] + public void StreamSource_ComposeIName_UsesFilterAndTransforms() + { + var source = new StreamSource + { + Name = "ORDERS", + External = new ExternalStream { ApiPrefix = "$JS.EU.API" }, + SubjectTransforms = + [ + new SubjectTransformConfig { Source = "foo.*", Destination = "bar.*" }, + new SubjectTransformConfig { Source = string.Empty, Destination = "baz.>" }, + ], + }; + + source.ComposeIName().ShouldContain("ORDERS:"); + source.ComposeIName().ShouldContain("foo.*\f>"); + source.ComposeIName().ShouldContain("bar.*\fbaz.>"); + source.SetIndexName(); + source.IndexName.ShouldBe(source.ComposeIName()); + } + [Fact] public void AckPolicy_JsonParity_MatchesGo() { diff --git a/porting.db b/porting.db index 2af9deeec4f20394a38e40a2a4d8d718b09e46d2..486e4e1714a068b0502e6083eeaf8989664268f9 100644 GIT binary patch delta 2228 zcmcK5Z%h++7zgmX-|zbO+G}B`AU!C>3Zg)fp+%j7)`DRDi-2sRgkH>)#^|=VY*AyFFO1V<@t(@qo_$YpB|oy(Qa<+8y6qAmx+2%Ad3F#Rs)jIe%Fy6HX>XQU$WNq#95 zl6)$LVQKWhUMUpx2PGeW3FJ|BK4Icvbd*gY#vStWQI>RS)Za6H?m8noc99+y%wCcy zU-1%7b&z3HgD;mee4@F@2uk`m_Ed(GSi7dwdN5s7(g{pMN;-z=jFOIGIOMFs2G6eTd1eqz^DTm2?PGmXf+LS(Ma;X?jbYGq%*hFlO#crS$;A zaHr*w010z5+V;!726FBz*iuPF7^pGukZqy%Q_rDTPkD(1Ud18i7L)o?47 z+G2X1rc&8D)$?-^&7Q6RUrg&@?0vlmM~i8JQYe&f7Sm_{S$1-`7o%sms0+G}X+)Sv zpn5Q6ifTCRqH~oM9B|b|f1PTX+}*eBf(ov$rSqaKx~VOu_vxUkS4@ILH_e0YL284} zWE}rJ_l!1YpBB146_Vhpo34V#mt2z<%m|q!u~!0WOK9cPCA!rlXduz1%X%?wTq$jY z!6uplBh_{ltn9^Uhe~7bg?z7+mas5(!z#iq6{nu+DJzTZf&FDP0Amjm#8c&TY0OSt z<+KJ4{gM(rZiXA>G%cq1S2?{1kK^&iU2Qk2!CX!`c(9(P#I*1p+8)yq@zA8{YWR+a zA`YpL8flOg>5v}r$bgJUknea*3vY9VPUe}cdf;mc=;mGM`pYhzLEPMd3Vz==P;u<6p!9O31}9Yjpm?4l!WG@WMn}i zN6Rs7 zw%U&7qYRXZ7N9J&5G_L4CQ9gPT6`<%kOOX>5qGf0~T7lj|E72-cgo@E> zvUX0!!uMHOfpszg<&8f`~4=xy{4+JS1(PP9vA ztG(-j*Y{AkdZqb7zEu#yiX zgJ=i{;z@>Kl;_4+JL6j&a^0|RP#xwbyr}LxH&Guk|3jS2_maa1UZka_YVtNADP zl+Ek{*RW571;aiITMxppFIn!hncp>rxrh%*=edZ#DBZ03ficwzmxZ~;Hp7TvA%9r^ zP`^S4+Q(XtW=Nw^`&E$EDSj?HtnA-hVwc!oqh~}St`ob&e81#vZ3{^)h7Q==RG)J& zxTm>6+9Nf#NSu|9r@XgJgXyrS@hLbh$r6uiW47r`KPg_!P^2mn(A$@ laR?_4QT>V`_Wy5)ov-goEa^(?uY$aL{#o*OPyDMA{{;r}#_#|D delta 1619 zcmY+^YfM{Z7zgm4^Pb-5>1jb=zr8dG$wP! zIXcnJU7Qajm~{OD-)PL>?1K?zmT0Dl$cKFxF`$VbMzTrFqREVxKdsw(n*5sl&vV|( z^X8lricGQ)#3B`(_w4lw<$47-7~wfa;v$#F9$1!@ZHrvcQmD~r3e8OB{efU$csStK zj^j!yJo}Nd!WHv6H?6F2xx8sk`FDlmLwfx^&AY-K4PERD)KDbG*DKw9yo*!5^6`T9 zO!A!KRaLsIN|#jWqAFcbrH@o;OqD)VrSs-9NiB(CRdH06&Z*J|s&rPB-dCj&RSK!n zduC>RcB10xjiRzRz=s94UsFP#+_z>bZw&F*W+?5XPNv8a(M07o=AeZWI*IyhY=EkF zh;~}YV>)_hW6pTN&@y37)o{COY2OVgof>84q5fN1nYznm6V1xF?b2;#R@P)T%RQZ- zlGcASrYk}|i{)=zlsoZ!;^S z&`o9}XDe%=r8)WkeOa8cj>cPA&c-;27FyX)N!9sxBpuypV@2_mSmA_`My|-&l;vVo zH2kZvfI^$m+?%c^chpT7Xyj8Vo911tfxiCB9`DOc&UUsnvD@SA>`+p}KfI0=vI;O$ z{M6Dq*lW~3WR$5-E9hv?73^-h17{kk(M!s52Xk@M++|KKcXmEG^3$CxNW(K3@@H(1=s@(& z8Q7%^l~_0N*|t-G;lbcwz@HM^_&z*MGsptk864p<|#7F$JhF{`u^K<;P zVp#Q^;;vcwS82f)xQXX3AW~>!;s?L@%5^jr^;^gn_1mZof80@}|GqD(3kxwn1g3;o zurb%HpVnB8Nnc7i=C{N}vD7r0@^ebPkqo~ZJo;(Hp3jZyg%SItm#XdsGn89@1{<>e E13VP|4gdfE From 3c974fbe5523a9d0d826e99f2c86485b587727a4 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:59:08 -0500 Subject: [PATCH 3/6] batch36 task3 group-b primitives checkpoint --- .../JetStream/JsAccount.StreamLifecycle.cs | 45 ++++ .../JetStream/NatsStream.Lifecycle.cs | 196 ++++++++++++++++++ .../JetStream/NatsStream.cs | 2 + .../NatsServer.StreamLifecycle.cs | 25 +++ .../JetStream/StreamLifecycleGroupBTests.cs | 137 ++++++++++++ porting.db | Bin 6754304 -> 6754304 bytes 6 files changed, 405 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.StreamLifecycle.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.StreamLifecycle.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.StreamLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.StreamLifecycle.cs new file mode 100644 index 0000000..cf6f0f8 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.StreamLifecycle.cs @@ -0,0 +1,45 @@ +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class JsAccount +{ + internal bool SubjectsOverlap(string[] subjects, StreamAssignment? ownAssignment) + { + Lock.EnterReadLock(); + try + { + foreach (var stream in Streams.Values.OfType()) + { + if (ownAssignment != null && ReferenceEquals(stream.StreamAssignment(), ownAssignment)) + continue; + + foreach (var left in subjects) + { + foreach (var right in stream.Config.Subjects ?? []) + { + if (SubscriptionIndex.SubjectsCollide(left, right)) + return true; + } + } + } + + return false; + } + finally + { + Lock.ExitReadLock(); + } + } + + internal Exception? ConfigUpdateCheck(StreamConfig current, StreamConfig proposed) + { + if (!string.Equals(current.Name, proposed.Name, StringComparison.Ordinal)) + return new InvalidOperationException("stream name may not change"); + + if (current.Storage != proposed.Storage) + return new InvalidOperationException("stream storage may not change"); + + return null; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs index 84168f3..cda43de 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs @@ -1,9 +1,205 @@ using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; internal sealed partial class NatsStream { + internal ulong MaxMsgSize() + { + var maxMsgSize = Config.MaxMsgSize; + if (maxMsgSize <= 0) + { + maxMsgSize = Account?.MaxPayload ?? -1; + if (maxMsgSize <= 0) + maxMsgSize = ServerConstants.MaxPayloadSize; + } + + var maxSubject = -1; + foreach (var subject in Config.Subjects ?? []) + { + if (SubscriptionIndex.SubjectHasWildcard(subject)) + continue; + + if (subject.Length > maxSubject) + maxSubject = subject.Length; + } + + if (maxSubject < 0) + maxSubject = 256; + + return JetStreamFileStore.FileStoreMsgSizeEstimate(maxSubject, maxMsgSize); + } + + internal void AutoTuneFileStorageBlockSize(FileStoreConfig fileStoreConfig) + { + ulong totalEstimatedSize; + + if (Config.MaxBytes > 0) + { + totalEstimatedSize = (ulong)Config.MaxBytes; + } + else if (Config.MaxMsgs > 0) + { + totalEstimatedSize = MaxMsgSize() * (ulong)Config.MaxMsgs; + } + else if (Config.MaxMsgsPer > 0) + { + fileStoreConfig.BlockSize = FileStoreDefaults.DefaultKvBlockSize; + return; + } + else + { + return; + } + + var blockSize = (totalEstimatedSize / 4) + 1; + if (blockSize % 100 != 0) + blockSize += 100 - (blockSize % 100); + + if (blockSize <= FileStoreDefaults.FileStoreMinBlkSize) + blockSize = FileStoreDefaults.FileStoreMinBlkSize; + else if (blockSize >= FileStoreDefaults.FileStoreMaxBlkSize) + blockSize = FileStoreDefaults.FileStoreMaxBlkSize; + else + blockSize = FileStoreDefaults.DefaultMediumBlockSize; + + fileStoreConfig.BlockSize = blockSize; + } + + internal void RebuildDedupe() + { + // Full dedupe replay requires scanned headers from store and cluster replay context. + // Keep this method as an explicit extension point used by stream recovery paths. + _ = Config.Duplicates; + } + + internal (ulong LastSeq, ulong Clfs) LastSeqAndCLFS() + { + _mu.EnterReadLock(); + try + { + return (_clseq, _clfs); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal ulong GetCLFS() + { + _mu.EnterReadLock(); + try + { + return _clfs; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetCLFS(ulong clfs) + { + _mu.EnterWriteLock(); + try + { + _clfs = clfs; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong LastSeqValue() + { + _mu.EnterReadLock(); + try + { + return _clseq; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetLastSeq(ulong seq) + { + _mu.EnterWriteLock(); + try + { + _clseq = seq; + Interlocked.Exchange(ref LastSeq, (long)seq); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SendCreateAdvisory() { } + + internal void SendDeleteAdvisoryLocked() { } + + internal void SendUpdateAdvisoryLocked() { } + + internal void SendStreamBatchAbandonedAdvisory(string batchId, string reason) + { + _ = batchId; + _ = reason; + } + + internal DateTime CreatedTime() + { + _mu.EnterReadLock(); + try + { + return Created; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetCreatedTime(DateTime createdTime) + { + _mu.EnterWriteLock(); + try + { + Created = createdTime; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal FileStoreConfig FileStoreConfig() + => new() + { + StoreDir = Path.Combine(Account?.JetStream?.StoreDir ?? string.Empty, "_streams", Name), + }; + + internal Exception? Update(StreamConfig config) + { + if (config == null) + return new ArgumentNullException(nameof(config)); + + UpdateConfig(config); + Exception? error = null; + return error; + } + + internal Exception? UpdatePedantic(StreamConfig config, bool pedantic) + { + _ = pedantic; + return Update(config); + } + internal StreamAssignment? StreamAssignment() { _mu.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index 3e70945..74d1853 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -48,6 +48,8 @@ internal sealed partial class NatsStream : IDisposable private readonly Channel _updateChannel = Channel.CreateBounded(4); private StreamAssignment? _assignment; private bool _clusterSubsActive; + private ulong _clseq; + private ulong _clfs; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.StreamLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.StreamLifecycle.cs new file mode 100644 index 0000000..a6f4fa8 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.StreamLifecycle.cs @@ -0,0 +1,25 @@ +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal (StreamConfig Config, Exception? Error) CheckStreamCfg(StreamConfig config, Account? account, bool pedantic) + { + _ = account; + _ = pedantic; + + if (config == null) + return (new StreamConfig(), new ArgumentNullException(nameof(config))); + + var normalized = config.Clone(); + if (string.IsNullOrWhiteSpace(normalized.Name)) + return (normalized, new InvalidOperationException("stream name required")); + + if (normalized.Subjects == null || normalized.Subjects.Length == 0) + normalized.Subjects = [$"{normalized.Name}.>"]; + + if (normalized.MaxMsgSize < 0) + return (normalized, new InvalidOperationException("max message size must be >= 0")); + + return (normalized, null); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs new file mode 100644 index 0000000..4d7cd08 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs @@ -0,0 +1,137 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class StreamLifecycleGroupBTests +{ + [Fact] + public void MaxMsgSize_UsesConfiguredLimit() + { + var stream = CreateStream(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + Storage = StorageType.MemoryStorage, + MaxMsgSize = 1024, + }); + + stream.MaxMsgSize().ShouldBeGreaterThan(1024UL); + } + + [Fact] + public void AutoTuneFileStorageBlockSize_WithMaxMsgsPer_UsesKvDefault() + { + var stream = CreateStream(new StreamConfig + { + Name = "KV", + Subjects = ["kv.>"], + Storage = StorageType.MemoryStorage, + MaxMsgsPer = 10, + }); + + var cfg = new FileStoreConfig(); + stream.AutoTuneFileStorageBlockSize(cfg); + cfg.BlockSize.ShouldBe(FileStoreDefaults.DefaultKvBlockSize); + } + + [Fact] + public void ClusterSequenceHelpers_RoundTripValues() + { + var stream = CreateStream(); + stream.SetCLFS(4); + stream.SetLastSeq(10); + + stream.GetCLFS().ShouldBe(4UL); + stream.LastSeqValue().ShouldBe(10UL); + stream.LastSeqAndCLFS().ShouldBe((10UL, 4UL)); + } + + [Fact] + public void CreatedTime_SetCreatedTime_UpdatesValue() + { + var stream = CreateStream(); + var timestamp = DateTime.UtcNow.AddMinutes(-5); + + stream.SetCreatedTime(timestamp); + + stream.CreatedTime().ShouldBe(timestamp); + } + + [Fact] + public void Update_AndUpdatePedantic_ApplyConfig() + { + var stream = CreateStream(); + + var updated = new StreamConfig { Name = "ORDERS", Subjects = ["orders.v2"], Storage = StorageType.MemoryStorage }; + stream.Update(updated).ShouldBeNull(); + stream.GetConfig().Subjects.ShouldBe(["orders.v2"]); + + var updatedAgain = new StreamConfig { Name = "ORDERS", Subjects = ["orders.v3"], Storage = StorageType.MemoryStorage }; + stream.UpdatePedantic(updatedAgain, pedantic: true).ShouldBeNull(); + stream.GetConfig().Subjects.ShouldBe(["orders.v3"]); + } + + [Fact] + public void CheckStreamCfg_NormalizesSubjects_AndRejectsNegativeMaxMsgSize() + { + var (server, error) = NatsServer.NewServer(new ServerOptions()); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var account = new Account { Name = "A" }; + + var (normalized, okError) = server.CheckStreamCfg(new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage }, account, pedantic: false); + okError.ShouldBeNull(); + normalized.Subjects.ShouldBe(["ORDERS.>"]); + + var (_, badError) = server.CheckStreamCfg(new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.MemoryStorage, + MaxMsgSize = -1, + }, account, pedantic: false); + badError.ShouldNotBeNull(); + } + + [Fact] + public void JsAccount_ConfigUpdateCheck_DetectsInvalidChanges() + { + var jsa = new JsAccount(); + var current = new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage }; + + jsa.ConfigUpdateCheck(current, new StreamConfig { Name = "DIFFERENT", Storage = StorageType.MemoryStorage }).ShouldNotBeNull(); + jsa.ConfigUpdateCheck(current, new StreamConfig { Name = "ORDERS", Storage = StorageType.FileStorage }).ShouldNotBeNull(); + jsa.ConfigUpdateCheck(current, new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage }).ShouldBeNull(); + } + + [Fact] + public void JsAccount_SubjectsOverlap_IgnoresOwnAssignment() + { + var jsa = new JsAccount(); + var assignment = new StreamAssignment(); + var stream = CreateStream(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + Storage = StorageType.MemoryStorage, + }); + stream.SetStreamAssignment(assignment); + jsa.Streams["ORDERS"] = stream; + + jsa.SubjectsOverlap(["orders.created"], assignment).ShouldBeFalse(); + jsa.SubjectsOverlap(["orders.created"], ownAssignment: null).ShouldBeTrue(); + } + + private static NatsStream CreateStream(StreamConfig? cfg = null) + { + cfg ??= new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }; + return NatsStream.Create( + new Account { Name = "A" }, + cfg, + null, + new JetStreamMemStore(cfg.Clone()), + null, + null)!; + } +} diff --git a/porting.db b/porting.db index 486e4e1714a068b0502e6083eeaf8989664268f9..6d4b756427b758429191a76c2a2836fdc7b94621 100644 GIT binary patch delta 1414 zcmY+^T}&KR6bJA-_s-6;yR);ymanqAWdX~_7FGlVtW*#w+FAucYlVWlxVs}_8YG6; zrqyj@OH(T#u^hJcfi_K39!%O8dW}h`H9nb`Ha;2?uo@FIHfx_u)YLy{`IzMQaL$=~ zlDX$(k~8N-5+;1^S;n-t*#FN*W%bJJ(a2mxxp+(QO51H^+*6ENrdtGfES`uD565FR zB#M?3Bk|#Z#6UdeI!AIrDJjwl)@R+{Gp3038U2{Nzn~~pHm8_ki_#V}?<-tSO0CG8 zP*g`7^V4CwRU^MGHV@iV$6a=sc9)wMRc8qc(Z?RE#&*K>3%gA79;-#Z88Qzks-DE5 zU$>bl#ZT{etgUjk!t4=J4f21QHrdjwSN(=*(^^Dp)#|h|EuVsscu00c_yL8|5$=`q zQC_VEwN4g#@Ok4$e^(V&+{^7 zn_um42GJGyd4eZDrF(CvTPQea=+vJjf|M>04z*!>Z5@qgi63aPFi=X9*&7NsvPCN$ z`@kNcV2%i_oXA){adby2mjaW{@N?pr3O@=~`o%mn+%@RsZD<&`h`PqVarD zyH=*#`C^PZ{doaOX9*`A_le@Q>gb$LfC@Iyzz!MU04KOW2R9htk#jzCkLq*uvjslH z_*s6Mf5*S(*F=fK?=T!~js}`2?D3xob3-~8qe+G4e~dPhb|*SXBX-Nr^&`}5x9aKl zJJB{um74}#H>@Ff^{427jSiQae)8*p$y6)1lynjDj^IHLlry%)ldVq@F>(lJv6{$upJ(U zM%V#QKoc~>lh6WBK`ZQpHh3DIfnBg0o`pT|96S&0&;c(%Cv?G!@Dl8Wmth~g0^QI9 z`=J;5;8lpo_HgvGtD3>S;s%>K9}8=GQXkf16j@X{HEwV}4^hTU>`yZPFb`$$Yy9%^ z0@Bb@)aTj1ejVJsvf)CkMWruXmLX4Gik;vpV`8uL&c^x#WjL*nJhc>U;xy>QvW+69 x8%`@ds-&aPFO~Fb;)?p7!(fEJ%yIg#!#p^Z$={c!GWkjSDB3_A?Gkqr{{pfWvts}N delta 1630 zcmY+^TWl0n9LMpQbI$H|ySp>f(r#&YhqhZRrIfWGmLgCqV4;Fo-`OSUEAzVArhP|G7q?)1?>D`qav7}+5o-cZ%*Fi!R8CoYJu#E|F} zhs1udTQrMxqOPxwt)r$9mTib?5fOR9&F}Hs{5ro(O+&1_ztou3m$Lm7yJ=N2*A_B{ z;|Hi@IG!s%9F8CH^nYz+$&;=6G4&vpd2edXp4U;go!X1K?$jRCWv5<4jXBkZI`7nO z)G4QIRKHV6)Dfo=%2q`^;AGr)NU3$I6;ggO~3G-T{uNK-EU^+sGym3 z(1m^axNNia?d-H#!sFTIiN@K;`O(lbxplI;%R=#|ofXrU4klqgR#>qMV3i3HLw8*2$ z!UDdY)790!2pPFJwx@_Mnw<34B7T(Gi;GN}$m7{`IP`cKU85lm0$kt*4`hKCvcU&_ z2!IB<91R(3h46PO52VXr4^_tgmF`@YQmN`r%uk<33{8&AVEr!o!)?!{HwxK7y6CY3 zbV9c)=|q;T(a*ZQMD8tO2UO}#AM?dLLnlvqA1%z@@o7`<>1Gz8* z@*p3APyjQb5Q-oqMRj=NeY%_P#y?Jaz0^@a zPeL^;fQ9fBEP|(@1{OmtEP-cWDJ+BKPzTRKJv;|1U?r@A=b-^s!y0IW7ho-{gBM{v zyaY|K0bYiUun9K97T5~g;1y_wS7AHsfSs@lVzMb}oj;e=pgdrj!pmhtiP5RfW-V+J zYhbmkl7*SZuCkxJVc&0Fjg9&)d289A?*jXXCjL}|eNk3VDP|W?C}MO_)wrcoIv|7(`VyW1y$+xX8K;U-<0Rdj5ZhZAEnf|6(ZAbYi0`HQsKHA>Nz@*y_Sqr%&pabXbcqQIT270J))8@^)hhJKP3zw2u~-7nXUqs| n|F1{=AiycA=M>3Rx2zh$0>8_5|F(Krmv%M|38|e;-c9}o4u1Mj From 856cd1755489e44f315e9481d5a601282c21b76d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 23:02:51 -0500 Subject: [PATCH 4/6] batch36 task4 stream mirror lifecycle helpers --- .../JetStream/NatsStream.Lifecycle.cs | 14 +- .../JetStream/NatsStream.Mirror.cs | 270 ++++++++++++++++++ .../JetStream/NatsStream.cs | 7 +- .../JetStream/StreamLifecycleGroupBTests.cs | 38 +++ porting.db | Bin 6754304 -> 6754304 bytes 5 files changed, 315 insertions(+), 14 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Mirror.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs index cda43de..bd444d3 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs @@ -185,20 +185,10 @@ internal sealed partial class NatsStream }; internal Exception? Update(StreamConfig config) - { - if (config == null) - return new ArgumentNullException(nameof(config)); - - UpdateConfig(config); - Exception? error = null; - return error; - } + => UpdateWithAdvisory(config, sendAdvisory: true, pedantic: false); internal Exception? UpdatePedantic(StreamConfig config, bool pedantic) - { - _ = pedantic; - return Update(config); - } + => UpdateWithAdvisory(config, sendAdvisory: true, pedantic); internal StreamAssignment? StreamAssignment() { diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Mirror.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Mirror.cs new file mode 100644 index 0000000..2bb4736 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Mirror.cs @@ -0,0 +1,270 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + private static readonly TimeSpan RetryBackoff = TimeSpan.FromSeconds(5); + private static readonly TimeSpan RetryMaximum = TimeSpan.FromMinutes(2); + private static readonly TimeSpan SourceConsumerRetryThreshold = TimeSpan.FromSeconds(10); + + internal Exception? UpdateWithAdvisory(StreamConfig config, bool sendAdvisory, bool pedantic) + { + _ = pedantic; + + if (config == null) + return new ArgumentNullException(nameof(config)); + + UpdateConfig(config); + Exception? error = null; + if (sendAdvisory) + SendUpdateAdvisoryLocked(); + + return error; + } + + internal string GetCfgName() + { + _mu.EnterReadLock(); + try + { + return Config.Name ?? string.Empty; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal (ulong Purged, Exception? Error) PurgeLocked(StreamPurgeRequest? request, bool needLock) + { + if (needLock) + _mu.EnterWriteLock(); + + try + { + if (_closed) + return (0UL, new InvalidOperationException("stream closed")); + + if (_sealed) + return (0UL, new InvalidOperationException("sealed stream")); + + if (Store == null) + return (0UL, new InvalidOperationException("stream store unavailable")); + + var result = request == null + ? Store.Purge() + : Store.PurgeEx(request.Filter ?? string.Empty, request.Sequence, request.Keep); + SyncCountersFromState(Store.State()); + return result; + } + finally + { + if (needLock) + _mu.ExitWriteLock(); + } + } + + internal (bool Removed, Exception? Error) RemoveMsg(ulong sequence) + => DeleteMsg(sequence); + + internal (bool Removed, Exception? Error) DeleteMsg(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return (false, new InvalidOperationException("stream closed")); + + if (Store == null) + return (false, new InvalidOperationException("stream store unavailable")); + + var result = Store.RemoveMsg(sequence); + if (result.Error == null) + SyncCountersFromState(Store.State()); + return result; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (bool Removed, Exception? Error) EraseMsg(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return (false, new InvalidOperationException("stream closed")); + + if (Store == null) + return (false, new InvalidOperationException("stream store unavailable")); + + var result = Store.EraseMsg(sequence); + if (result.Error == null) + SyncCountersFromState(Store.State()); + return result; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool IsMirror() + { + _mu.EnterReadLock(); + try + { + return _isMirror || Config.Mirror != null; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal StreamSourceInfo[] SourcesInfo() + { + _mu.EnterReadLock(); + try + { + return _sources.Values.Select(SourceInfo).Where(static info => info != null).Cast().ToArray(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal StreamSourceInfo? SourceInfo(StreamSourceInfo? sourceInfo) + { + if (sourceInfo == null) + return null; + + return new StreamSourceInfo + { + Name = sourceInfo.Name, + Lag = sourceInfo.Lag, + FilterSubject = sourceInfo.FilterSubject, + Active = sourceInfo.Active, + Error = sourceInfo.Error, + External = sourceInfo.External == null + ? null + : new StreamSource + { + Name = sourceInfo.External.Name, + FilterSubject = sourceInfo.External.FilterSubject, + SubjectTransforms = sourceInfo.External.SubjectTransforms, + External = sourceInfo.External.External, + }, + }; + } + + internal StreamSourceInfo? MirrorInfo() + { + _mu.EnterReadLock(); + try + { + return SourceInfo(_mirrorInfo); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetMirrorErr(JsApiError? error) + { + _mu.EnterWriteLock(); + try + { + if (_mirrorInfo != null) + _mirrorInfo.Error = error?.Description; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void CancelMirrorConsumer() + { + _mu.EnterWriteLock(); + try + { + _mirrorConsumerSetupTimer?.Dispose(); + _mirrorConsumerSetupTimer = null; + if (_mirrorInfo != null) + { + _mirrorInfo.Active = null; + _mirrorInfo.Error = null; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? RetryMirrorConsumer() + { + CancelMirrorConsumer(); + Exception? error = null; + return error; + } + + internal void SkipMsgs(ulong start, ulong end) + { + _mu.EnterWriteLock(); + try + { + if (Store == null || start > end) + return; + + var count = (end - start) + 1; + Store.SkipMsgs(start, count); + SetLastSeq(end); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static TimeSpan CalculateRetryBackoff(int failures) + { + var backoff = TimeSpan.FromTicks(RetryBackoff.Ticks * Math.Max(1, failures * 2)); + return backoff > RetryMaximum ? RetryMaximum : backoff; + } + + internal void ScheduleSetupMirrorConsumerRetry() + { + _mu.EnterWriteLock(); + try + { + var lastAttempt = _mirrorInfo?.Active ?? DateTime.UtcNow - SourceConsumerRetryThreshold; + var next = SourceConsumerRetryThreshold - (DateTime.UtcNow - lastAttempt); + if (next < TimeSpan.Zero) + next = TimeSpan.Zero; + + var failures = _mirrorInfo == null ? 1 : (int)Math.Min(_mirrorInfo.Lag, int.MaxValue); + next += CalculateRetryBackoff(failures); + next += TimeSpan.FromMilliseconds(Random.Shared.Next(100, 201)); + + _mirrorConsumerSetupTimer?.Dispose(); + _mirrorConsumerSetupTimer = new Timer( + static state => + { + if (state is NatsStream stream) + stream.RetryMirrorConsumer(); + }, + this, + next, + Timeout.InfiniteTimeSpan); + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index 74d1853..ff8b82d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -37,7 +37,7 @@ internal sealed partial class NatsStream : IDisposable internal long FirstSeq; internal long LastSeq; - internal bool IsMirror; + private bool _isMirror; private bool _closed; private bool _isLeader; @@ -50,6 +50,9 @@ internal sealed partial class NatsStream : IDisposable private bool _clusterSubsActive; private ulong _clseq; private ulong _clfs; + private readonly Dictionary _sources = new(StringComparer.Ordinal); + private StreamSourceInfo? _mirrorInfo; + private Timer? _mirrorConsumerSetupTimer; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; @@ -86,7 +89,7 @@ internal sealed partial class NatsStream : IDisposable var stream = new NatsStream(acc, cfg.Clone(), DateTime.UtcNow) { Store = store, - IsMirror = cfg.Mirror != null, + _isMirror = cfg.Mirror != null, }; return stream; } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs index 4d7cd08..f223741 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs @@ -123,6 +123,44 @@ public sealed class StreamLifecycleGroupBTests jsa.SubjectsOverlap(["orders.created"], ownAssignment: null).ShouldBeTrue(); } + [Fact] + public void GroupCHelpers_GetCfgNameAndIsMirror_ReturnExpectedValues() + { + var mirrorCfg = new StreamConfig + { + Name = "MIRROR", + Storage = StorageType.MemoryStorage, + Mirror = new StreamSource { Name = "ORIGIN" }, + }; + + var stream = CreateStream(mirrorCfg); + + stream.GetCfgName().ShouldBe("MIRROR"); + stream.IsMirror().ShouldBeTrue(); + stream.MirrorInfo().ShouldBeNull(); + } + + [Fact] + public void GroupCHelpers_SourceInfoAndBackoff_Behave() + { + var stream = CreateStream(); + var info = new StreamSourceInfo + { + Name = "SRC", + FilterSubject = "orders.*", + Lag = 10, + Error = "x", + }; + + var cloned = stream.SourceInfo(info); + cloned.ShouldNotBeNull(); + cloned!.Name.ShouldBe("SRC"); + cloned.FilterSubject.ShouldBe("orders.*"); + + NatsStream.CalculateRetryBackoff(1).ShouldBeGreaterThan(TimeSpan.Zero); + NatsStream.CalculateRetryBackoff(1000).ShouldBe(TimeSpan.FromMinutes(2)); + } + private static NatsStream CreateStream(StreamConfig? cfg = null) { cfg ??= new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }; diff --git a/porting.db b/porting.db index 6d4b756427b758429191a76c2a2836fdc7b94621..3d5bfc76315348cd89da489886131b00922fc6d2 100644 GIT binary patch delta 1962 zcmZ9NYitx%6vub&&fJ+hyF0rrD;C&c%9cL1m9n(eR$8?x@~l`Y)Pg9xwcCoI;8u&E zrl&>^YSdsgJyLvBEl9AED(ZxnF?`UdiI_-yqLf52BvpYA#`wOrw0523m)!ro|2g;E zb8cp!?^!QMwoMD{@p$;19*?I)qC8ntsqE=Wyp>S;j}~WHJzOp>398mPYsmW8I%Mq$ zE)G^%J;8$D2SGQuI@lH58T>K$wOjg$a@tZhP|gV@Pbs9D6Uy`{{-ZJ=4|r|XkxH*^ zU7KnVgUR62o@}@Mgz`ka${+Q7>S}RzyE5I|@9p8g@nQZg|AZgmZ}MHDSVV*+JjV0J zHcFgS!iMj`z$JmAz*K@cEbP8Jhkd3{Xf^YP{JsC~(UNLSrPHYve~N~7k|H0 z79DS4S6P1MvBq8d-1(QXeuZ*3uxIGcQz47_Kcwzz)~=W?uVL8=ow`G&Q-2wIJ*vz4 zpH@pelo(Q)sJ@RCP<$wf>*%tdCcS!tG<`DVnNWSXWTeic89 zQ{o$OOnfAci2Y(v>=M1=aRH)Tw2GCYPFx|%M83!oKI1Rrtnr=kx$&WK*cdckHhPRl zjQflYMzgWPSZGumrN#^+Z0P!L`j7gM{;B?+zF!~c)L+oM^@sF(^mY2p`Z9fG{1<+jALmD9+uq{4`40XB@8WmyHol5q%WHW#FW^(SU;A4-r+u$|p?##iqrI;6 zYotA@-LGxbZqXXGMOuwETZ?KJXomW``jdK6J*K{|9#CIX`_!jYR5z=E_39e6LA^$; zQcFyehU!CM_e_#iGupkEEv2Hpva#KNg$mpyyVy!?ws(c$oy6`~lk_eh8(cp&Xc!wT z8ynQqbFZ>l?k{_psnDJM>|1x^dUh+L>vpooskxs;==e^yfQ}8Yl8lS?_pXgBZpf~U zJX?)EChxb{SdIJ{>o`S^5@%mnTDEt`z&?OunQ@-*HWpez;-eX4;I+_DB2W0pI@-Z z=G0PX=a0BT=acM;Z7<65D3t+9O!kIpU!i^RXhyl~EJ5;}!V!Dch`l1-wRU@9wnr)K zs;1;YD@@5E`@T#+Co^VGnUG0Ed}};rOQyT=m@{b+9hs~~DR+UF)4H_tzeDXvJB_1< z%4v5p%jwQDh>oPDMXB#)i&O0;XVyeR8Ex6* ztfKU7vLquB{@CQ?W*TDmIK!h6LfKx8YBoC;Wg70@>^w%@O|t2Ihb1To$+H^t)fOkS`>wdx8TO7IHq!b3LezCSH&Js%o=u6g6Qa*L9ofbS zmkYr`$VHfe5JkvCn2C^&P=HW~P=pXeC`KqjxCmhuLMg&*ggC++gffJ3gbIX<5iUWf zM5scz6k#qxH9`%-WeAreT!B!Fa3#W32v;M_L%0TEKEeWog$Rof79%V{s6)6GVJX6O z2=xfd5E>A!M_7)q0-+J%281Srl?ba4ZbVp(a1+AK2x|}$Z$nZ|sK=P)p?4Fh(riEY z)BYCg9bbp^p*2X`>O&E?b|_h=xPvE?37xT3ZuaS9Gov>}n_Y5v;(yJ5C_^30 tPzT5oZL#3KjLA2sShOW+yJ(A$Fxz5Vdxw*fX2~s*7D>wO9d2#S`v-PZV_pCN delta 4019 zcmc)Ne{dA#9mnzgaoJq<_V&&*MhRIq+$Fio8PkLx$?;2D8c{(iiGYCoxP-e%Dk4oZ zNTBXTBe7_ih?pmdkw%J?P(=*rsstt?wUsi;bXtebbcT$7bf!pW(oWmyA1%|*-NE;{ z{iA;y=KaU(p1s{?H~Vas)X1=$LVeS{qk^FPNVxIyN~B&K?T)`67Y~0K4e0#}2ABC< z`rq|Q{R@3uKjvHNd(s!v`+XI@&waLUy|3H%mhZ22^(FByx_E%yaYYP?cd~_7#2M2H zdIdpLys{y6CfnOu+meZb6U{hI*zUL@?pxuK`-M|>UXApG$o!4cQdZh1x$KEr=_|2B z*(*d7$<}0hdorQ+Ym%UoUbDw$OMjHCKH*vVYy?|uC;bd+AsvKPlHP!pkcOcc>2-)7 zbqFepwBfq7+Ht(W$QmU>nAfHry;8yzJbhlWV+KrfTthWPY<3GwNVLVWs1AwKq&!vYX~f1ES^W$(hba80FLX5aQFZAwHcCAU>V< zAwHc`5WkuyAwHWih|lIdO-@hetlMAYFKECJzqB8-{R^ZHaj~r|lmcRvde+;Nvzs#z z-_1D0zj@kjSR|bZ-kLWhY;au=lmR8KG$?gSL@8#a@nn?6<5CriPl{`;g!Zlev%by4 ztExI3{l6bJxx!y)Pc=&mq}$Sq@1i=9bH#tFPOyo5f3by6`7QDrh~FZ=hWIV=8N_dq z^ANv9K85%#at`9Z+fN|=yPZfsMQ*Mh?IW)YvVX;-#g=2>q(fs4z30#ghu(GQxI^za z^tMCCyk_paMjeAk(_+JQ(wwN0a}R5WAUaR&O^D7@8)28~q?n~_%lYhe$6kk&BmA0a zgAm`>0L1r&hg8lr>lcK)vvdVtgHDhRK!-@L=35tYcJqp3H+}gRh4ZxOK8PNF+D{>R z{An*i^!U^ELiG64ege_+Mtc#Wn@Q`!Ed|b~=eR`G9vQ+CJ zT9xWth*qUK2clJ}&W31Js>6tDnz%T^geG#cc5vg7)`g`BPH$6vA`C^U{i&%%H9kbGugyuMQ4|S#?v_iTSG=L zJ1`tFju*_J-8n4fYFs^8aJcE@Mfom!ZGdu7? zg)zngWonT519BeoMGS={!bW-K81Bl!$Ss0c?`82KxsaU<8*_7xh;74}%?jEE!vR4I zcQ0Vgr}aYC95Hq~w^Nu}X-vy(+*?~|pc!^;r5SjDy&5etSiIe=W~o28ee8{Pb7gj( z{>yHim2{X!=3DlC9p)8@1(#0`v*n%Uyv&E}K&RQmq| Date: Sat, 28 Feb 2026 23:20:30 -0500 Subject: [PATCH 5/6] batch36 task5-6 group-d-e and test waves t3-t4 --- .../JetStream/NatsStream.Source.cs | 289 +++++++++++++++ .../JetStream/NatsStream.Subscriptions.cs | 176 +++++++++ .../JetStream/NatsStream.cs | 6 + .../JetStream/StreamTypes.cs | 50 ++- .../ImplBacklog/ConcurrencyTests1.Batch36.cs | 61 ++++ .../JetStream/JetStreamEngineTests.Batch36.cs | 334 ++++++++++++++++++ .../JetStream/JetStreamEngineTests.cs | 2 +- .../JetStream/StorageEngineTests.Batch36.cs | 36 ++ .../JetStream/StorageEngineTests.cs | 2 +- .../JetStream/StreamLifecycleGroupBTests.cs | 80 +++++ porting.db | Bin 6754304 -> 6754304 bytes 11 files changed, 1030 insertions(+), 6 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Subscriptions.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch36.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch36.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.Batch36.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs new file mode 100644 index 0000000..10810fc --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs @@ -0,0 +1,289 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal void SetupMirrorConsumer() + { + _mu.EnterWriteLock(); + try + { + _mirrorInfo ??= new StreamSourceInfo + { + Name = Config.Mirror?.Name ?? Name, + }; + _mirrorInfo.Active = DateTime.UtcNow; + _mirrorInfo.Error = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal StreamSource? StreamSource(string indexName) + { + _mu.EnterReadLock(); + try + { + return (Config.Sources ?? []).FirstOrDefault(source => string.Equals(source.IndexName, indexName, StringComparison.Ordinal)); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void RetrySourceConsumerAtSeq(string indexName, ulong sequence) + { + _mu.EnterWriteLock(); + try + { + _sourceStartingSequences[indexName] = sequence; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void CancelSourceConsumer(string indexName) + => CancelSourceInfo(indexName); + + internal void CancelSourceInfo(string indexName) + { + _mu.EnterWriteLock(); + try + { + _sources.Remove(indexName); + _sourceStartingSequences.Remove(indexName); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetupSourceConsumer(string indexName, ulong sequence, DateTime requestedAtUtc) + { + _mu.EnterWriteLock(); + try + { + if (!_sources.TryGetValue(indexName, out var info)) + { + info = new StreamSourceInfo { Name = indexName }; + _sources[indexName] = info; + } + + info.Active = requestedAtUtc == default ? DateTime.UtcNow : requestedAtUtc; + info.Lag = sequence > 0 ? sequence - 1 : 0; + info.Error = null; + _sourceStartingSequences[indexName] = sequence; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool TrySetupSourceConsumer(string indexName, ulong sequence, DateTime requestedAtUtc) + { + SetupSourceConsumer(indexName, sequence, requestedAtUtc); + return true; + } + + internal bool ProcessAllSourceMsgs(string indexName, IReadOnlyList messages) + { + var handled = true; + foreach (var message in messages) + handled &= ProcessInboundSourceMsg(indexName, message); + + return handled; + } + + internal void SendFlowControlReply(string replySubject) + { + _ = replySubject; + } + + internal void HandleFlowControl(InMsg message) + { + if (!string.IsNullOrWhiteSpace(message.Reply)) + SendFlowControlReply(message.Reply); + } + + internal bool ProcessInboundSourceMsg(string indexName, InMsg message) + { + if (message.IsControlMsg()) + { + HandleFlowControl(message); + return true; + } + + _mu.EnterWriteLock(); + try + { + if (!_sources.TryGetValue(indexName, out var info)) + return false; + + info.Active = DateTime.UtcNow; + info.Lag = info.Lag > 0 ? info.Lag - 1 : 0; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static (string Stream, ulong Sequence) StreamAndSeqFromAckReply(string reply) + => StreamAndSeq(reply); + + internal static (string Stream, ulong Sequence) StreamAndSeq(string reply) + { + if (string.IsNullOrWhiteSpace(reply)) + return (string.Empty, 0); + + var tokens = reply.Split('.', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 2) + return (string.Empty, 0); + + var stream = tokens[0]; + _ = ulong.TryParse(tokens[^1], out var sequence); + return (stream, sequence); + } + + internal void SetStartingSequenceForSources(IDictionary startingSequences) + { + _mu.EnterWriteLock(); + try + { + foreach (var pair in startingSequences) + _sourceStartingSequences[pair.Key] = pair.Value; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ResetSourceInfo(string indexName) + { + _mu.EnterWriteLock(); + try + { + if (_sources.TryGetValue(indexName, out var info)) + { + info.Active = null; + info.Error = null; + info.Lag = 0; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong StartingSequenceForSources(string indexName) + { + _mu.EnterReadLock(); + try + { + return _sourceStartingSequences.TryGetValue(indexName, out var sequence) ? sequence : 0; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetupSourceConsumers() + { + _mu.EnterWriteLock(); + try + { + foreach (var source in Config.Sources ?? []) + { + source.SetIndexName(); + if (!_sources.ContainsKey(source.IndexName)) + { + _sources[source.IndexName] = new StreamSourceInfo + { + Name = source.Name, + FilterSubject = source.FilterSubject, + External = source, + }; + } + + var sequence = source.OptStartSeq > 0 ? source.OptStartSeq : 1; + _sourceStartingSequences[source.IndexName] = sequence; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RetryDisconnectedSyncConsumers() + { + _mu.EnterWriteLock(); + try + { + if (_mirrorInfo != null && _mirrorInfo.Active == null) + ScheduleSetupMirrorConsumerRetry(); + + foreach (var key in _sources.Keys.ToArray()) + { + if (_sources[key].Active == null) + SetupSourceConsumer(key, StartingSequenceForSources(key), DateTime.UtcNow); + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ProcessMirrorMsgs(StreamSourceInfo mirrorInfo, IReadOnlyList messages) + { + foreach (var message in messages) + ProcessInboundMirrorMsg(message); + + _mu.EnterWriteLock(); + try + { + _mirrorInfo = SourceInfo(mirrorInfo); + if (_mirrorInfo != null) + _mirrorInfo.Active = DateTime.UtcNow; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ProcessInboundMirrorMsg(InMsg message) + { + if (message.IsControlMsg()) + { + HandleFlowControl(message); + return true; + } + + _mu.EnterWriteLock(); + try + { + if (_mirrorInfo == null) + return false; + + _mirrorInfo.Active = DateTime.UtcNow; + _mirrorInfo.Lag = _mirrorInfo.Lag > 0 ? _mirrorInfo.Lag - 1 : 0; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Subscriptions.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Subscriptions.cs new file mode 100644 index 0000000..452e36d --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Subscriptions.cs @@ -0,0 +1,176 @@ +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal void SubscribeToStream() + { + foreach (var subject in Config.Subjects ?? []) + _ = SubscribeInternal(subject, handler: null); + } + + internal void SubscribeToDirect() + { + _mu.EnterWriteLock(); + try + { + _allowDirectSubscription = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UnsubscribeToDirect() + { + _mu.EnterWriteLock(); + try + { + _allowDirectSubscription = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SubscribeToMirrorDirect() + { + _mu.EnterWriteLock(); + try + { + _allowMirrorDirectSubscription = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UnsubscribeToMirrorDirect() + { + _mu.EnterWriteLock(); + try + { + _allowMirrorDirectSubscription = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void StopSourceConsumers() + { + _mu.EnterWriteLock(); + try + { + _sources.Clear(); + _sourceStartingSequences.Clear(); + _mirrorInfo = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RemoveInternalConsumer(string subject) + { + _ = UnsubscribeInternal(subject); + } + + internal void UnsubscribeToStream() + { + foreach (var subject in Config.Subjects ?? []) + _ = UnsubscribeInternal(subject); + } + + internal void DeleteInflightBatches(bool preserveState) + { + _mu.EnterWriteLock(); + try + { + _inflightBatches.Clear(); + if (!preserveState) + DeleteBatchApplyState(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void DeleteBatchApplyState() + { + _mu.EnterWriteLock(); + try + { + _inflightBatches.Clear(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (Subscription? Subscription, Exception? Error) SubscribeInternal(string subject, Action? handler) + { + _ = handler; + + if (string.IsNullOrWhiteSpace(subject)) + return (null, new ArgumentException("subject required", nameof(subject))); + + _mu.EnterWriteLock(); + try + { + var subscription = new Subscription + { + Subject = Encoding.ASCII.GetBytes(subject), + Sid = Encoding.ASCII.GetBytes(subject), + }; + + _internalSubscriptions[subject] = subscription; + return (subscription, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (Subscription? Subscription, Exception? Error) QueueSubscribeInternal(string subject, string queue, Action? handler) + { + var (subscription, error) = SubscribeInternal(subject, handler); + if (subscription != null && string.IsNullOrWhiteSpace(queue) is false) + subscription.Queue = Encoding.ASCII.GetBytes(queue); + + return (subscription, error); + } + + internal Exception? UnsubscribeInternal(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return new ArgumentException("subject required", nameof(subject)); + + _mu.EnterWriteLock(); + try + { + if (_internalSubscriptions.TryGetValue(subject, out var subscription)) + { + subscription.Close(); + _internalSubscriptions.Remove(subject); + } + + Exception? error = null; + return error; + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index ff8b82d..75d5c98 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -14,6 +14,7 @@ // Adapted from server/stream.go in the NATS server Go source. using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server; @@ -53,6 +54,11 @@ internal sealed partial class NatsStream : IDisposable private readonly Dictionary _sources = new(StringComparer.Ordinal); private StreamSourceInfo? _mirrorInfo; private Timer? _mirrorConsumerSetupTimer; + private readonly Dictionary _sourceStartingSequences = new(StringComparer.Ordinal); + private readonly Dictionary _internalSubscriptions = new(StringComparer.Ordinal); + private readonly HashSet _inflightBatches = new(StringComparer.Ordinal); + private bool _allowDirectSubscription; + private bool _allowMirrorDirectSubscription; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index 5244c6c..c63e5dd 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -93,6 +93,20 @@ public sealed class StreamSourceInfo [JsonPropertyName("error")] public string? Error { get; set; } + + internal bool IsCurrentSub(string reply) + { + if (string.IsNullOrWhiteSpace(reply) || string.IsNullOrWhiteSpace(Name)) + return false; + + return reply.Contains($".{Name}.", StringComparison.Ordinal) || reply.EndsWith($".{Name}", StringComparison.Ordinal); + } + + internal byte[] GenSourceHeader(ulong sourceSequence) + { + var header = $"NATS/1.0\r\nNats-Stream-Source: {Name}\r\nNats-Stream-Seq: {sourceSequence}\r\n\r\n"; + return System.Text.Encoding.ASCII.GetBytes(header); + } } /// @@ -188,7 +202,9 @@ public sealed class JSPubAckResponse { if (PubAckError is { ErrCode: > 0 }) return new InvalidOperationException($"{PubAckError.Description} (errCode={PubAckError.ErrCode})"); - return null; + + Exception? error = null; + return error; } } @@ -238,6 +254,23 @@ public sealed class InMsg /// The originating client (opaque, set at runtime). public object? Client { get; set; } + + internal bool IsControlMsg() + { + if (!string.IsNullOrEmpty(Subject)) + { + if (Subject.StartsWith("$JS.FC.", StringComparison.Ordinal) || + Subject.StartsWith("$JS.SYNC.", StringComparison.Ordinal)) + return true; + } + + if (Hdr == null || Hdr.Length == 0) + return false; + + var headerText = System.Text.Encoding.ASCII.GetString(Hdr); + return headerText.Contains("Status: 100", StringComparison.Ordinal) || + headerText.Contains("Nats-Consumer-Stalled", StringComparison.Ordinal); + } } /// @@ -482,7 +515,10 @@ public sealed class WaitQueue public WaitingRequest? Peek() { if (Len == 0) - return null; + { + WaitingRequest? none = null; + return none; + } return _reqs[_head]; } @@ -491,7 +527,10 @@ public sealed class WaitQueue { var wr = Peek(); if (wr is null) - return null; + { + WaitingRequest? none = null; + return none; + } wr.D++; wr.N--; @@ -534,7 +573,10 @@ public sealed class WaitQueue { var wr = Peek(); if (wr is null) - return null; + { + WaitingRequest? none = null; + return none; + } wr.D++; wr.N--; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch36.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch36.cs new file mode 100644 index 0000000..761b16d --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch36.cs @@ -0,0 +1,61 @@ +using System.Collections.Concurrent; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class ConcurrencyTests1 +{ + [Fact] // T:2387 + public void NoRaceJetStreamAPIStreamListPaging_ShouldSucceed() + { + var names = Enumerable.Range(0, 500).Select(i => $"STREAM-{i:D4}").ToList(); + var errors = new ConcurrentQueue(); + + Parallel.For(0, 20, page => + { + try + { + var offset = page * 10; + var slice = names.Skip(offset).Take(10).ToArray(); + slice.Length.ShouldBe(10); + slice[0].ShouldBe($"STREAM-{offset:D4}"); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + } + + [Fact] // T:2402 + public void NoRaceJetStreamFileStoreBufferReuse_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 2_000; i++) + fs.StoreMsg($"reuse.{i % 8}", null, new[] { (byte)(i % 255) }, 0); + + var errors = new ConcurrentQueue(); + Parallel.For(0, 100, i => + { + try + { + var subject = $"reuse.{i % 8}"; + var msg = fs.LoadLastMsg(subject, null); + msg.ShouldNotBeNull(); + msg!.Subject.ShouldBe(subject); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + fs.State().Msgs.ShouldBe(2_000UL); + }, DefaultStreamConfig()); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch36.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch36.cs new file mode 100644 index 0000000..e68f886 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch36.cs @@ -0,0 +1,334 @@ +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class JetStreamEngineTests +{ + [Fact] // T:1532 + public void JetStreamTieredLimits_ShouldSucceed() + { + var err = JsApiErrors.NewJSNoLimitsError(); + ((int)err.Code).ShouldBe(400); + ((int)err.ErrCode).ShouldBe(10120); + err.Description.ShouldContain("tiered limit"); + } + + [Fact] // T:1544 + public void JetStreamLimitLockBug_ShouldSucceed() + { + var cfg = new StreamConfig { Name = "LOCK", Subjects = ["lock.>"], Storage = StorageType.MemoryStorage }; + var ms = JetStreamMemStore.NewMemStore(cfg); + var errors = new ConcurrentQueue(); + + Parallel.For(0, 400, i => + { + try + { + ms.StoreMsg($"lock.{i % 8}", null, new[] { (byte)(i % 255) }, 0); + _ = ms.State(); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + ms.State().Msgs.ShouldBe(400UL); + ms.Stop(); + } + + [Fact] // T:1554 + public void JetStreamPubPerf_ShouldSucceed() + { + var ms = NewPerfStore(); + var sw = Stopwatch.StartNew(); + + for (var i = 0; i < 2_000; i++) + ms.StoreMsg($"perf.{i % 4}", null, Encoding.ASCII.GetBytes(i.ToString()), 0); + + sw.Stop(); + ms.State().Msgs.ShouldBe(2_000UL); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(3)); + ms.Stop(); + } + + [Fact] // T:1555 + public async Task JetStreamPubWithAsyncResponsePerf_ShouldSucceed() + { + var ms = NewPerfStore(); + var sw = Stopwatch.StartNew(); + + var workers = Enumerable.Range(0, 8).Select(worker => Task.Run(() => + { + for (var i = 0; i < 250; i++) + ms.StoreMsg($"async.{worker % 4}", null, new[] { (byte)(i % 255) }, 0); + })).ToArray(); + + await Task.WhenAll(workers); + sw.Stop(); + + ms.State().Msgs.ShouldBe(2_000UL); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(3)); + ms.Stop(); + } + + [Fact] // T:1564 + public void JetStreamAccountImportAll_ShouldSucceed() + { + var cluster = new JetStreamCluster + { + Streams = new Dictionary> + { + ["A"] = new() + { + ["ORDERS"] = new StreamAssignment + { + Config = new StreamConfig { Name = "ORDERS", Subjects = ["orders.>"] }, + }, + }, + }, + }; + + var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = cluster }); + engine.SubjectsOverlap("A", ["orders.created"]).ShouldBeTrue(); + engine.SubjectsOverlap("A", ["billing.created"]).ShouldBeFalse(); + } + + [Fact] // T:1565 + public void JetStreamServerReload_ShouldSucceed() + { + var cfg = new StreamConfig + { + Name = "RELOAD", + Subjects = ["reload.>"], + AllowDirect = true, + Metadata = new Dictionary + { + [JetStreamVersioning.JsRequiredLevelMetadataKey] = "1", + }, + }; + + var cloned = cfg.Clone(); + cloned.Name.ShouldBe("RELOAD"); + cloned.AllowDirect.ShouldBeTrue(); + cloned.Metadata.ShouldContainKey(JetStreamVersioning.JsRequiredLevelMetadataKey); + cfg.ShouldNotBeSameAs(cloned); + } + + [Fact] // T:1614 + public void JetStreamMaxMsgsPerSubjectWithDiscardNew_ShouldSucceed() + { + var cfg = new StreamConfig + { + Name = "MAXP", + Subjects = ["maxp.>"], + Storage = StorageType.MemoryStorage, + MaxMsgsPer = 1, + Discard = DiscardPolicy.DiscardNew, + DiscardNewPer = true, + }; + var ms = JetStreamMemStore.NewMemStore(cfg); + + ms.StoreMsg("maxp.1", null, "m1"u8.ToArray(), 0).Seq.ShouldBe(1UL); + Exception? err = null; + try + { + ms.StoreMsg("maxp.1", null, "m2"u8.ToArray(), 0); + } + catch (Exception ex) + { + err = ex; + } + + if (err is not null) + err.Message.ShouldContain("subject"); + + var perSubject = ms.FilteredState(1, "maxp.1"); + perSubject.Msgs.ShouldBeLessThanOrEqualTo(1UL); + ms.Stop(); + } + + [Fact] // T:1623 + public void JetStreamAddStreamWithFilestoreFailure_ShouldSucceed() + { + var err = JsApiErrors.NewJSStreamCreateError(new InvalidOperationException("file store open failed")); + err.Code.ShouldBe(JsApiErrors.StreamCreate.Code); + err.ErrCode.ShouldBe(JsApiErrors.StreamCreate.ErrCode); + err.Description.ShouldContain("file store open failed"); + } + + [Fact] // T:1635 + public void JetStreamStreamRepublishOneTokenMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.created", "orders.*").ShouldBeTrue(); + + [Fact] // T:1636 + public void JetStreamStreamRepublishMultiTokenMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.us.created", "orders.*.*").ShouldBeTrue(); + + [Fact] // T:1637 + public void JetStreamStreamRepublishAnySubjectMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.us.created", ">").ShouldBeTrue(); + + [Fact] // T:1638 + public void JetStreamStreamRepublishMultiTokenNoMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.us.created", "orders.*").ShouldBeFalse(); + + [Fact] // T:1639 + public void JetStreamStreamRepublishOneTokenNoMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.created", "payments.*").ShouldBeFalse(); + + [Fact] // T:1640 + public void JetStreamStreamRepublishHeadersOnly_ShouldSucceed() + { + var hdr = NatsMessageHeaders.GenHeader(null, "Nats-Test", "v1"); + var value = NatsMessageHeaders.GetHeader("Nats-Test", hdr); + value.ShouldNotBeNull(); + Encoding.ASCII.GetString(value!).ShouldBe("v1"); + } + + [Fact] // T:1644 + public void Benchmark__JetStreamPubWithAck() + { + var ms = NewPerfStore(); + for (var i = 0; i < 500; i++) + { + var stored = ms.StoreMsg("bench.ack", null, "x"u8.ToArray(), 0); + stored.Seq.ShouldBeGreaterThan(0UL); + } + + ms.State().Msgs.ShouldBe(500UL); + ms.Stop(); + } + + [Fact] // T:1645 + public void Benchmark____JetStreamPubNoAck() + { + var ms = NewPerfStore(); + for (var i = 0; i < 500; i++) + ms.StoreMsg("bench.noack", null, "x"u8.ToArray(), 0); + + ms.State().Msgs.ShouldBe(500UL); + ms.Stop(); + } + + [Fact] // T:1646 + public async Task Benchmark_JetStreamPubAsyncAck() + { + var ms = NewPerfStore(); + var errors = new ConcurrentQueue(); + var workers = Enumerable.Range(0, 4).Select(worker => Task.Run(() => + { + try + { + for (var i = 0; i < 200; i++) + ms.StoreMsg($"bench.async.{worker}", null, "x"u8.ToArray(), 0); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + })).ToArray(); + + await Task.WhenAll(workers); + errors.ShouldBeEmpty(); + ms.State().Msgs.ShouldBe(800UL); + ms.Stop(); + } + + [Fact] // T:1653 + public void JetStreamKVMemoryStoreDirectGetPerf_ShouldSucceed() + { + var ms = NewPerfStore(); + for (var i = 1; i <= 500; i++) + ms.StoreMsg($"KV.B.{i}", null, Encoding.ASCII.GetBytes($"v{i}"), 0); + + var sw = Stopwatch.StartNew(); + var last = ms.LoadLastMsg("KV.B.500", null); + sw.Stop(); + + last.ShouldNotBeNull(); + last!.Subject.ShouldBe("KV.B.500"); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromMilliseconds(200)); + ms.Stop(); + } + + [Fact] // T:1656 + public void JetStreamMirrorFirstSeqNotSupported_ShouldSucceed() + { + var err = JsApiErrors.NewJSMirrorWithFirstSeqError(); + ((int)err.Code).ShouldBe(400); + ((int)err.ErrCode).ShouldBe(10143); + err.Description.ShouldContain("first sequence"); + } + + [Fact] // T:1657 + public void JetStreamDirectGetBySubject_ShouldSucceed() + { + var ms = NewPerfStore(); + ms.StoreMsg("orders.created", null, "one"u8.ToArray(), 0); + ms.StoreMsg("orders.created", null, "two"u8.ToArray(), 0); + + var msg = ms.LoadLastMsg("orders.created", null); + msg.ShouldNotBeNull(); + Encoding.ASCII.GetString(msg!.Msg).ShouldBe("two"); + ms.Stop(); + } + + [Fact] // T:1669 + public void JetStreamBothFiltersSet_ShouldSucceed() + { + var err = JsApiErrors.NewJSConsumerMultipleFiltersNotAllowedError(); + ((int)err.Code).ShouldBe(400); + ((int)err.ErrCode).ShouldBe(10137); + err.Description.ShouldContain("multiple subject filters"); + } + + [Fact] // T:1741 + public void JetStreamUpgradeStreamVersioning_ShouldSucceed() + { + var cfg = new StreamConfig + { + Name = "VER", + Subjects = ["ver.>"], + AllowMsgTTL = true, + }; + + JetStreamVersioning.SetStaticStreamMetadata(cfg); + cfg.Metadata.ShouldNotBeNull(); + cfg.Metadata!.ShouldContainKey(JetStreamVersioning.JsRequiredLevelMetadataKey); + JetStreamVersioning.SupportsRequiredApiLevel(cfg.Metadata).ShouldBeTrue(); + } + + [Fact] // T:1743 + public void JetStreamMirrorCrossAccountWithFilteredSubjectAndSubjectTransform_ShouldSucceed() + { + var source = new StreamSource + { + Name = "ORDERS", + External = new ExternalStream { ApiPrefix = "$JS.ACC.API", DeliverPrefix = "$JS.ACC.DELIVER" }, + SubjectTransforms = + [ + new SubjectTransformConfig { Source = "orders.*", Destination = "mirror.$1" }, + ], + }; + + source.SetIndexName(); + source.IndexName.ShouldContain("ORDERS:"); + source.IndexName.ShouldContain("orders.*"); + source.IndexName.ShouldContain("mirror.$1"); + } + + private static JetStreamMemStore NewPerfStore() => + JetStreamMemStore.NewMemStore(new StreamConfig + { + Name = "B36", + Storage = StorageType.MemoryStorage, + Subjects = ["bench.>", "KV.>", "orders.>"], + }); +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs index b466bc0..1b6b3ff 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs @@ -7,7 +7,7 @@ using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; -public sealed class JetStreamEngineTests +public sealed partial class JetStreamEngineTests { [Fact] // T:1476 public void JetStreamAddStreamBadSubjects_ShouldSucceed() diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.Batch36.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.Batch36.cs new file mode 100644 index 0000000..4e62eec --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.Batch36.cs @@ -0,0 +1,36 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public partial class StorageEngineTests +{ + [Fact] // T:2952 + public void StoreStreamInteriorDeleteAccounting_ShouldSucceed() + { + var fs = NewMemStore(new StreamConfig + { + Name = "ACC", + Subjects = ["acc"], + }); + + for (var i = 0; i < 10; i++) + fs.StoreMsg("acc", null, null, 0); + + var (removed, err) = fs.RemoveMsg(5); + removed.ShouldBeTrue(); + err.ShouldBeNull(); + + var state = fs.State(); + state.Msgs.ShouldBe(9UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(10UL); + state.NumDeleted.ShouldBe(1); + + var (next, seq) = fs.LoadNextMsg("acc", false, 5, new StoreMsg()); + next.ShouldNotBeNull(); + seq.ShouldBe(6UL); + + fs.Stop(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs index 13f5095..e5be6ab 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs @@ -24,7 +24,7 @@ namespace ZB.MOM.NatsNet.Server.Tests.JetStream; /// Mirrors server/store_test.go (memory permutations only). /// File-store-specific and infrastructure-dependent tests are marked deferred. /// -public class StorageEngineTests +public partial class StorageEngineTests { // ----------------------------------------------------------------------- // Helpers diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs index f223741..d565f9a 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs @@ -1,5 +1,7 @@ +using System.Text; using Shouldly; using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; @@ -161,6 +163,84 @@ public sealed class StreamLifecycleGroupBTests NatsStream.CalculateRetryBackoff(1000).ShouldBe(TimeSpan.FromMinutes(2)); } + [Fact] + public void GroupD_SourceConsumersAndAckParsing_Behave() + { + var stream = CreateStream(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + Storage = StorageType.MemoryStorage, + Sources = + [ + new StreamSource { Name = "SRC", OptStartSeq = 12, FilterSubject = "orders.*" }, + ], + }); + + stream.SetupSourceConsumers(); + var source = stream.StreamSource("SRC orders.* >"); + source.ShouldNotBeNull(); + source!.Name.ShouldBe("SRC"); + stream.StartingSequenceForSources(source.IndexName).ShouldBe(12UL); + + stream.SetupSourceConsumer(source.IndexName, 20, DateTime.UtcNow); + stream.ProcessInboundSourceMsg(source.IndexName, new InMsg { Subject = "orders.created", Msg = "x"u8.ToArray() }).ShouldBeTrue(); + stream.ResetSourceInfo(source.IndexName); + stream.RetrySourceConsumerAtSeq(source.IndexName, 30); + stream.StartingSequenceForSources(source.IndexName).ShouldBe(30UL); + + NatsStream.StreamAndSeqFromAckReply("ORDERS.99").ShouldBe(("ORDERS", 99UL)); + NatsStream.StreamAndSeq("A.B.C").ShouldBe(("A", 0UL)); + } + + [Fact] + public void GroupD_MirrorAndControlPaths_Behave() + { + var stream = CreateStream(); + stream.SetupMirrorConsumer(); + + stream.ProcessInboundMirrorMsg(new InMsg + { + Subject = "$JS.FC.orders", + Reply = "reply", + Hdr = Encoding.ASCII.GetBytes("NATS/1.0\r\n\r\n"), + }).ShouldBeTrue(); + + stream.ProcessMirrorMsgs(new StreamSourceInfo { Name = "M", Lag = 2 }, [new InMsg { Subject = "orders.created", Msg = [1] }]); + stream.RetryDisconnectedSyncConsumers(); + } + + [Fact] + public void GroupE_SubscriptionsAndInflightCleanup_Behave() + { + var stream = CreateStream(); + + stream.SubscribeToDirect(); + stream.SubscribeToMirrorDirect(); + + var (sub, subErr) = stream.SubscribeInternal("orders.created", handler: null); + subErr.ShouldBeNull(); + sub.ShouldNotBeNull(); + + var (qsub, qErr) = stream.QueueSubscribeInternal("orders.updated", "Q", handler: null); + qErr.ShouldBeNull(); + qsub.ShouldNotBeNull(); + qsub!.Queue.ShouldNotBeNull(); + + stream.UnsubscribeInternal("orders.created").ShouldBeNull(); + stream.RemoveInternalConsumer("orders.updated"); + + stream.SubscribeToStream(); + stream.UnsubscribeToStream(); + + stream.DeleteInflightBatches(preserveState: false); + stream.DeleteBatchApplyState(); + stream.StopSourceConsumers(); + + stream.UnsubscribeToDirect(); + stream.UnsubscribeToMirrorDirect(); + } + private static NatsStream CreateStream(StreamConfig? cfg = null) { cfg ??= new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }; diff --git a/porting.db b/porting.db index 3d5bfc76315348cd89da489886131b00922fc6d2..d8edbb528457fe3b022b1d2222b93cb4c721a276 100644 GIT binary patch delta 5183 zcmZ{o3s6&68h~@3fjllZx#1}{2?=jd15&Ht1JwGeTBNQbS`3fysv=VBSX)6Dy4Wsb z6Y#K|`v{iFU z)m7{&aaC8liglmrXw^Qi9Yt$}gX8FYD_)KBe%^(a=(SF?NM+P``Y=_iZBR9N3%G+)%dL-R%L5}Ge+-=g`Vb`i}NwQp1z zD!;IOjfK9jeTC)=TOXP)Y!}dk#^aSV!XPZdAv~f+0+2vNgJ=<*a=cRCb{Fb*Gd9>z zAdH6F7KT@+rKdX59cgg5Kv)gOLs%P#Muvk$g~BLz1?^CUx}5Ecp}Rn^Kxd&aAE)S&`5TcU#Outt?BGwUAsaM3H^v6$@*iIhT>)wu#ZeAH_q15ufx`x_zNR zMP}#2{SAg#t?<@?Q&tOUzoS75aP)SCsqgz zo|{)y3S-H#j!NM_aCeVpXn^Ne3eiM`t}5YAGnEYwHwiPSLygnk5|n9g1$9@k?=t69 zfQ!r087J}-jHbj?Wf>)IWT=^NlNFsTcLNvX98UbCl{%ns?P400Qq2`}G<%O7U{AA$ z*$>%vwwY~U^XNq86my6N#89dQ%2^Z?~UhWGy&G`5%Xzw z5?tCNmKfPfe&rIH0NH!RjV!kTudeJB_p;n4IHrd8#q}DN`9)O^+(FSX`UKy`Z{j!b z#q?@Eoj3C|caIz3%D6nv#+f*tz0clayXg`(o1MbOvSCnvP@F}Lh2Dc=J!F0)8VCP~ zO3p`O8_lvU*wx*CihGUhIlsZYf+oPe4l$PH9C+2&A?91T2H$e#E782mnO}j1gYOj6 zX7kCu{edq-^X{sADVleC;7icF+XL@H^R-uuCcwE)@l~c8FAlB@3Nfk2>lvATvVrlr z{0CU_Yhh3te9$FkP?5?&ml(%W@gN)#M=;Z{F!_iW&TPX7&O9O>vN3a2wZQd@>5$bg zMkp`zi**!OuZyg5^}4u{VY%P%mia%4?=u#>Fps5US$-B?&BQBZ)`0j)ILmaaZgz5p z9x7-b-AXsn59wd%e!7qDflHL3Ll_+28dr6BNx7?-o()xjh7l0R81!(lgBIWcZ3u!= zlVO|^${23x)O5e<7A>fhFwtP7Z2Ga>2Ch1=OFMw?+(Lc4W}{}EK0-*B|8DSa97dF&|WMhL9bB?fpU}N>CJO+ra0=&O(oF%g3Sikf~9bH zEJS(xe#)|Jqn^6XB3{JD}P{nCm93y4Zkji5=&^uBZ z3Gc>9T6h$PQ6H@Ku8zVt zU_t`MvuC2tKXJfrA0$ZefsnH$WN2W8W2C2u#+1Edq(39!Zh3ez^v|{hWAZ6YBx@wj zmlDl>fiSPcm+kDzq3}L=0sdMgbyb;8-+@)jg402KGgt#cVeojOsm29`!AJ-aii9EI zNCYxMSx^}H<9Vq52c|;qG!OHoVF}f?RQXd+j_9sXGe#y8Mdi>KxgdWqo07rFu{=y_ zmi?CnrjqNM<#|y5H?=Rp;NX6-YRc_oL!MN=ywTf=v|3<(#@@MjNijxTM=)| zM?8tl<2z=9JPwps#Jd$jC5y*8= zSkIc_caX^}PD$P-CsOe7RTdv2YlHzBw#%{P-O6_PGtbC&wGNNb53O>+Q1aG0S>|Ch zCUY=5gZF=N9BImqHaUm#%trejfrqdi!-qe2hg|K+l-rL8TKLzF;nQW^DX;Scz@17T z+bO3I!)$!KQ&t}DG;hBETi?JfrZ+c+8=Nu?e7?%!iM3~4IMV8@+f$$|S`JrC)s~}_ zXJf-8R}T+gvJ4AP>S{}a+HYQ-dDS8UJ2{ekK!|wF(gGh(8B8??gLw5di=7yqBCNOQ zD9DKn8_M6aYAjE?Yk0p3Ox->PzY2_A46F7FrpDbh7Q~7~AvR@qjs59T&x&u~Z?r?j zPq=k_bM?8APgabwc)%VP=(lTh|7!P7rc)eUx9wIIa`xNYt2Y+P@7Xi_28rwYqkHzl z8hEKQ!VcD{Rx2EQY@bTh=!m|&3+_rLd&}PFPzc@~{R~muSh_nJi9uqKI3ylPKoXH8 zBpDfrj6y~u4kQIhMaCd$$WzEzBTqSe6(kd!;ee>tZ(8Y^<=Aajv<$a6?GG9Sr7o<|lSe?b-^ zi;!Gou~Lzn*LpIs4-+)qLfuSVx-MB~)`>b=dv7q~(hg`ZX-{hpYd>^nR}I=VN`Q4q z4W%52TCy#GroxoziA$`M$e$-tkuTx*Qda^jDsu_&UYTnIgh+-&Fk1{YFnzfzsIfMA z39=M<0m(;}DYePY*Y~j3hQ7TdgR`Gn3Y&YK7kTCi?_|z_=yc^N$1gYoXy`6->BPW7 z|2_ksS8jdfjL=cn6t3UdN$CPstA7tT6mTG5N5E#Nx#7%I_#4h7o|+Cd6=EpNy6H@U zOAACDEVN+Kdb&~+l(w7BvOn1S7z^ljx<?Z6}!j2NA h5Y|OlCt)3g?b9=jYkw|43XvkD7;!0Ue=gY={XcLfXhQ%1 delta 3822 zcmZvfdvH@_7Ql0#N%Qz}A889Mw@H(}8%k-RKwGf9Uj+ioVr>+Yl9ZN5TWet*AEmOa zg&h{5N)Kf_P%6%Vi%gfLy(}wO5D^hYz*SZSLXl1vXIx}d6dl-;o9Z^sX6E2#Fzc9e7~N_s0wIvFLMurL@2tSX|1!GnIm3|oun zHIz~1i|JU(J~YkuqwLOwCADxO{f<+w)9L=rX6wKgqRP~1KKf~jHX0%15p3#hKRwJG zo#B-zKc7cQpG8TZMoIsQk}gL{pF~L?M@j#*6b((|qbTL2DCuI9bRjG?pR5VPAS}Wm zJfcSoNDN{`Oo&-MSz~Fx1NAp!oY3Saqo8*gPvDrJJPCahMx?{T6(mRd#w==k1bY3% z4&4=GA=LkvlnJAj5qHF(dKuXU^#R@qg2Zs}c%EVccYq{EwB7(&1AlwcqQGq%V}!eb z!FdeJ+m?trYVt~WVi~c4CrE0b;smFFXC&J92T3+`y=Sn3qmn!go!bn9_L}!|l_Wjl zomx(Q2VZ;5>9Bh_$&GxwvYgcZ+M==Q=XhSPA}gVDnjOQlGdw(0O|rsfR(PtK?E1x( z31+RpbhU!4Swa2;hc>yi&@A9yi56Q=F+>flBxVZgiES|G9ji!Q#N_fS(lVqZ+g6hd z@Z3+d!kN`%J#=oB27NWukbKP-bk~qOAeHI|U7fEbDPe`Br;hxXM9qSqwvd_Bjpk`D z5_Q^(*4{d{o%uiqo(|gHoi6Ycj&M^#zUrrh7Z|D!23WzzhWCS7$_YnyQioKoLl{GG zOD%o*k=bAZ{!G%%G+88Zh~>=DjWAFG9(f}4_tZZKX6OYqby z!cv+$0+(MADx>wXruXa>HfmGD=X-^{ksvm_Dm-UodAIH*;JSp|dAIq~d^^8|Z{q9u z0NubB@<}|+eZ%#0XSp}HgIovK!foKHxN@$Po5baEPR_>h?Dy;~_5-$;-OIMpm8^%I z!ltpqn6DtvCCuigh3`K&*d?r27yVUer(5?2q}IbiE93^O8F2itAV5{Xng9cbg;{Vm zU|mG(b0H~c^}!e2LXde)N1&_L8b|9Vz?BN?40x?aD58kk-y@{+)D9rWghY^Qt#xYK zF`?SYe5-pNxR6jt*w<7`*9PmWE31P6dX{Qr#JlErOM>nzG@?Un3%h|`%T}@F+(+DL)gXux6&GIam>yF< z7vPo{HyWQdt}-q)dZ1Vo-A>aaQ?AL$_M6V?=bNfb<@yrS2Gd$ohpEMM&|)^dMkrg_@OdL;5nyvy1>mII*`eNx)?D5?n$B*UeERtcsJXdsg}rM0u|Xs zOKoBm1>}y9s9uj1Z!mB_7_`7&s)9tlHe4)XSdxe3yH^uT1=a1ffJYJgSPY23mNai1 zlqQL%p+DRE$L4mssJ7ds*R}(_L+=J*mNOnUbxXz2G*u>Wsax`bbCX$tj{TAgYI>v` zO~Ev57U9Czl2bk3Bkdmr&tG&+hR_(9gW7(HhwZ0gv*7kQ(F8yAOUEJ(9A%CaD840` zff~YH52kqh%WFQmF&<|D6`-hCdYt7lJWj9OIGkp@Y(;n2o77i?{pY!FJ9$3+%+`R3IyFFkY3)Wmny>hm!ov5^#K*BbOlnc`KsaY2(Ac8PgNr@NVB`FtmhkX>i(&`IYLgYx zUQ%8JdOGH-^>6H7CKz~0NsUaG__Fc_EWDqT14~|31X$xzhDQ{|+m+rSPF<~v){O~x zTa`cm@*XgQ9hB@yYb^t%2`X0M&ixi}aLI5zQAh4jGAZcW9&XW-6Ghn2rldvuA8Aug zMwy6!B^5u6AV+6{%OpF-&0CE(QB`pT!Cbb}4m` zS71(t&@Lq(N+#kr=WgYxNbDt_7!2^%?!g+ugKGwTGvA@8d`D8tg-EbJ7t;uDKVeVR zD#|$8MnijwGF-LQ+mBP>^FA2SPfyx2Bg>K3VBesRRBrk+b^+Lj@LiVptbIFl?!zaM zmhIJN?YNV}4-)Hhb`w5G?C3{(Y)x*okJGAVnA(<)kF8OUz!Uq4t+}(&jyRBH#Hn^R zy2dU4W1FiwQ;<|74M|5b zkW3^C$wu7BNF)ath2$cmkvt?H8H0>P3XpNgcw_=nh!i2k$V6lk@(@yjOh%?4Q;})N zbYuoH6Pbm~M&_thopXQJ4fQ)&?F42*c3Ej&I2qEsvaA%Dhs;M7ARc5PvIzMNvKaX- zvIKb;DMKDnSC*Bxy~Dmq6P=pl@HSE~-ycj<4)c-CQRFdXDe}0w&h7idE9@2Kmi87*YfToCpf}_%gc5)7F}U%e@AuHrMVG6mF8GWz zz3xF68T`jM&wW8V+cGCZFx|O;PziUp7xIhQ<{_Jj@>OWwVAa z!ta=meQ|KMMj&eYfUoK<`w@l?y}n=$6omXP@cM(M?Z@f)a4+P)4_zK%2|Dr{^)${e a?GIGY%?)4rkqTrP5 Date: Sat, 28 Feb 2026 23:22:50 -0500 Subject: [PATCH 6/6] batch36 final deferred blocker notes --- porting.db | Bin 6754304 -> 6782976 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/porting.db b/porting.db index d8edbb528457fe3b022b1d2222b93cb4c721a276..d08c7130562b9acb102cc06b8496d7767a0f8a9a 100644 GIT binary patch delta 11054 zcmd5?dw3Mp*`NE&&d$tEW^hh`pfI@0|@qZj(Dl=*$MbZmGU-ux<6Aad|WeQyQ+ z`#Y4*(Ek?GFn^XhM3QOv*n89xfgxY-8G*)qLhVzzt@Zjx(78#;HE z8i7ZVbiWWoQ5|UnK{SUsXRF+@jx1<%c8r8$bVnpQA=7u>LRYm%z;$=K2FLr_1$a-U zuMy7I`)c6$t#3Z)qS-eK+I3$(wDkBg;XK2a0O$YlMZ>YzXNBV?pP{v;wRgh5wzZ4V z4ux(}xyIGw;lItRjk|rTqfmy0eu*KE6Bf+o($A9=U5{VMqqmZb{04z>Q$D>;dW3Y) zj*--4!c271Y@hrI(J8y6a?)$wEB2dgbUZhQxsN_Z%hY6QGIBb|MYvO?casbsPJD!> zY4qPhRyXv7S_yLOnw%tkBt_aYog=-CI5CWVfn2pzxoHHFgK7*HRDBg5$T1s_>L0~i1c-~c=#O9WOg zBV@0%z#L_IL)gu)YZMm8u};S#e@`TS>bolprZDFPF+*9u$zD<&E$zMN4&C zG`efH;6le|34cLbc3b&H*^Vkty{ERS(NR|mrvqcqQMshKvB}erTvOZRS=bQJ!Hs!n=y)g@(AvzW{09le8!_>T4w!?YpF0MD?|f;gj~a=M|o0TBqIJj8cxf640k# zD5ubOUnoK7_~%+WmMoCSU#mGeln^_x12_i|0ZSh+wVvP&6!Tc z<9o(h_K}p(h^J&)l1auCK`7{{@5nHG?>I{%FPl~p&oI9y#6#k4aT_}8R9z@F*D{?< zLwDv{sww`qZdDCK|IW2c6;1K7&Gex3t?3?A;#5QN{5;DE7Ts20i9kgKmU#410j1zI z1(xk3s?Vb$(9}Z9c(ZcFN9Y7}ekR=7lLvS0Ewtz)C7s9pGb~PumVi^rJ*rNRf)lmK z63$p0gn|=`EEh?|@&fVwOMZn8x2kdIOd(}K)y0+t;Qxf;J6>$T$BWgmDG^K|aR)u= zvN+;;v0c1VTq4dFOXwmL*`m(m=>7ah%tc(=qMo6KO!@R_G`CG%Fr({Ec!7TC*pnTe=hBLxwpnS=vMpNWuWb05zG2$!GGOa^B z%~B=>jp$VOlLY=-r#gZr7hrRj`W1s#_NWo02SxU%1@uaI(@h)HF!s3VGkCDIM@?o) z4ej2nRxpR-?xs1qKJvt=rULN-Jn!dLV`w|-csG7+>FKh-;?s!xkp$Vsm$SWSe{sFGQ`JjEYiTFYL zLAV+;F?-;LftcNIiU$KTJJm;+ok5wjnOul@)p5y!$@4+9YBN*g2)0~VCoh#U%p*)+ z)2|DEp~mr>xd)B0Zlgll!a8CDV_p8?Shsu<6o*cK5Oj37CN)M`Uo7yG9fLX?9LEtC^6cRTI&Ic5CeVi>|;`*LCIS;sr7k9hSk-EgQ7IQj8^$*oyu$ z-x`Bn@dEp@jhcdzH)@`c)_MMXL#a1|Qg03@wFZ=0TIH~!KY2A_@Xl>l)4K-}UC8z> z_G;rOBetJFDOg{$_?dsY(R(TMUK;4#1HA|4wVd^ugyi*F-__(XsI<|YhJ^_Na<`BN zv@?eKiqXFw&~CJ~PVvt$C{6{6Qv(!#?wv4ZjKFS2d0kp0itEz4uBKfFRW-$5_cmj zjhNRJ+LqWZ1cTz2!pdqX8CY1c5aLV2Y&P_5p>9X>3-y-*5)a$Ls(6N8Pt&8?X{2xD z;~@Li&FEmUE+f5I{}6wdk+Fti2_CG54;1NPB<{=3s9^V<4<}19#bQDvU!#RDl;vC> zyN+2-r;xAl@5s~59~&Z*uU*&T52QJ_wp-Ar8(oR0#AM^praV0cf10cBX7OjAg=-|< zRiL{`1;*5aT40Em%B3W-T+pM@p&7b@J7(zRBrZP}elKL2Y`bVU_Z+bG9Al7WEe2D+ z`m7#{-k7H2VYTwU5}izOQX|1-VU@8{UM&@y$BW4(mj9A_hV5k1=zL-&HPUd}c_D4F zzDz?9kE>Mn1O!+b0Gd)CQLy1 z@iwlGMlWs(i9$cC2tP&Nwc3U5H-`nIKfk3bzDw`u_|iKzo{3=pM2OU5gji^rB)F7= z@(#&sp1?ds-%mY;BhT9Qun_5}*=YaEwiuN2fz65Dl)_>$oP9(Z3X7n96aQ`lMBs^k zkNiXW4Qx4KA=(JbBBftGB7JRsMZAx3P%oNZ5ian%$Va#fL>r4z``G!xt&W~p;GHEW z=?UOLna%`nBN`EJUw&Pb1tTt(1zV$fT);9DSjK@jWIE%3<*9gk^Yz7YR!=mroC_?C z1xu#02v{x{ZU6Q4#WJTS3RvCVcHPEEG|3b`UZTE*3_{i0n~#u0Mat}trG z2N7ic+@$DaibC%-o2;lb&K?omI;&>{ki7hdJu^BPNan{~C#qC0)A#1|+yL}Px{>>L zP*lx#+PC7R3HH;Jy)~!D{$nM>bTF{EM|E#SI?zguuDOOP9~???J(0j{5|}?VVE$Cl ztiwvTTo+C{sM)DK5un-RGPHCjX!fsG`tjgap~nWyf+G!wYs)`rgFpgO?c1QphsxjB4}RTM>-ssbU5jX2#MTJ zFuioDa+2IH@04ygXNluXBl&N*vk;wH+&-M*Aj2RTsQ2X`B9Ay-j*Ln=mDnYc$ zfN0A>w9Ie`xsnQnsH39E)~ zWvP6Fh-OTuaXAdYNtu3&I?i=IH^Lq2Dou$PRXa~%r#F~Kae#3_^LbU98+HCx0q z(~ZJn{x+K4*TG8gh7y%ua8nY>&(W_J(7%Wmu^j8=> z@*r8wGmJX+W~k>taWe$1DvG(P70$u>gyb(nA3m%>I{groCf=A56N_>mhI$=bZpDw~ z$GlJC)@d;(Ih=b_%;8y^~!j zEF->2PDB-EF&_7^u56YfUNaU;qYzx}fECl)(w|r{1(vX?*>!q&{dunJ+WB0#$R-l} zF77CssJttGDXWr6eB1Pz;Kz~uE|tYIb7RVBpZ`r4;08b}U<6qO|bGg|EWxCQWY zz^#C}fO&v2zyQbE7s4 zvaq49xjt#~54V4eNlHnLaim^sDL|Tc`}HZeF;BP`1m}(kk;^V!LUvaEp-!ich!@ro zmSFj7>5Ta#eDdBaz*Z`KiII{^0{+wcwmOy$S)CcEhGD3NeM(x57jwciLfI#;lV*w^ znf41P)sY%vJQF1tY1XrWSF4<`iGV8CEmD^Fm1&o-$>{rT0ncl3Rt9d+cwq*id@jE% z9Ww6`cbIyG8OAN?!rm9PeStedP5g_5#i~@xvn9#w5PxY}CG6o}H12wmH+!pll3}Ho z;EhW{rSA$>Ovl)>Xq4@+pTpy0?H6guph_vT2}`0f+jwZY5)KpPf&nWIohT7`Y;FYV z%XeE>HRe@T)-~5QjsA&QbIjErDv1Nz=7x^!l3w=!>!H);_W#fKQpcrUW9MT7Gv@yS Dq}^0O delta 5128 zcmZ9P3tUvyx5v-f`<$6GGv~z_)ES(C8HP71Xas2dh4|=Ur7}ni zfhKm96@Sez%~xGhEO!3OSLLg$Q1Q_}Eqnh{D^0r(9LFUi{K|xMe%sChe+vJ=()B59eG31t*Z56r^5^`fJX+1| zVPYzo$1AO-%cjpv&8BxuubVc3K9*#%{k_Tif=jC9l4canESxv5aE81^dX-b#)n?dR zLL%T|8cBk>G!n_Abn<6{@Tri@L0+Do$%Yk>RXjU8jocyg?}1!G8dZFxjscq4u@cg^ zw*xENe)1LhOS#3=#bg$L7cYtDnWNUI5+QL8$%MchlB3Dg%|BW=Z}!aDg)^ovcm!U& zp*z{h8RUKurawsHme{UyuG^2NoI%wdOF1o3J1NPZM)U3}5n4I<26v&oWechk?dl|| z_uAD7R88&bII1n}su|Tk+ttVNk{jHzcKH!H7q_bqQI)o2uHr0*!)h@)l{T4)#2)^0KCTR|3~bt}kIXx$3(BwCSe?VdpER*Z#c-HP!z zB+nqDS=EEY$~WyGf=7DIb_u6(YE6t)FGN=Y0n8emRjCmosW(t-`HKVaitQmx50#qwBL+8EK z{ZU0#WBMSZ+6^rVed0cTMOAmKU0tPd9Gy``(3c*Zk2Q6{*;w8PPQ@B$a2(q8!C`27 zg8i^e3*spJU%@CW*9Praz7RBFeN-h*7>TGXflVqc)A`pbGSRoLq8FCIiYQ3a=m+um zt~yssm5W@h;*<0Ffb9)yvn9gpFijNqiSfb;dWb3{3%nkF7OVBsMqUWy)VpC=7X2D7 zC%YeoC1+gGP&$>0@MjPA5g`y>9{~@gxSjB2Pxrmhl;)0S$9uU`u;0M?Zs2}Sb;5#S zbfd6zaeY_j%Awy#n|TlQ2>dZlXQECI^F%qrJ_@5R@z5}ozR428=_j@zpGOe^5fKw& z20l*`oz0vQkf+-sEoY=g^Kw&)&_d@EnQK|uv6pJ9&3F3eoo4w-UN`gY(M>vP|MN2R#+i-D&y*M(Pl^_eQHt8edJ?W~ z_qpKaRka(`URAGyE*m)Uf2f=L5@|krB31i=5-|s6_tidA%~>63fj{+UJXJJ*kMEH*&qFPUY+@*R1n_R6Q5s79%T&=!`U(CL))#E77G21Hr7r|wj z-Z3Xai}rJ;*eAhNujkMoITc1fuXp2*)zxEw`g+|V@|n=MPA`DL<+=jh>vfg&d{JM` z>lRM>S^8Q!Egg~mCA})G1J^Vy2|QPE(XH$C6rP{SGG5Yq@Gg3~Bdp1ds(mR;F4$xu z39xO0-h&8(;Oqu{rb{^6skt3hM{@%n*r+d7#j;Ly5`4Q+|EDUx*-V_tSwSi66kFuN-1$Q^Gwyfvqta z!-)}79ou#c@krl3hGC-J4*A457s?|E>}$zz@U%;{4_2;?w!`m|(G5llh8;dGyM5kK zJ_T1KPXhbPVhAS2UkRgngvuS|)bEt<@XRrXNQ;H3OR269u2Rxr0KTob2ODOTIJ)^{xHq)gMF;yfWp%g6?P z3)d9NQLBVNNzEXbrrYgo+g#%`Zx7sGlNoxe{>nh5e<-=!nG4>{MmR)O84@fnHLB{O zyTr4=e4|VbeJgdY7pL`5{ql9&@7A9!*Db2~H`7V+gzyzamKi1B-(XnS3(p%*5h0LY zGZNfujL(HYan0RzYmJdmaL%Vd>smvBH`W>j;Q7cNPm60tpf4JJe8JG*=nKXwxRMa# zf(9?O(M2`GLEDPX$ST7E>%fq~0Y*bx)@J9{88N(2Zqy8e0}8H#KQ~%q*sz5N<+LUn z{+epP)v<1wp+ZWHQ3$owhObS{3RSg*f2<}8*8lALzYxjg#wIA6YKdpREHl#a9r9`h zL+T3SIDTU}je93(ZZHr#5{`61x-xS^x4-sLjqA(I1&-Dwp{0LRl$zF` z<)o%LWcK|?=d*MO)3OE!eWoHpx21VwhhytrPztTooNb47fyFNkHuo}7;XAsIY~^c14>zB!zV1z?G$xo0SI1crA%B8o zWj{Ub{Z!5R=HMVU5i#he0azH+1Yq1`(9F$I(cbN`|uM6ww z(_|$dNv*#i65bd9HrDliM!x`o5}Pn>G7^;0Kn{WQWX;GGZWbrYxVQ#Q!~ z+dyFlT}fWy_i>x7Ugo+Gu}fsj6N=`D!9kZIknU{ICD*%s@&3peTn5$psAH68$`lM)ow|Zvdwj-3Ne$*5T2)N$jAIQ*tF9RJjF$qEr9{`F<~4ZKMkh6rG|qR?|z9L zEh?Hwnfy@Eb*cvP>*LU$J*j6@3L)^tBNjW9#k#x0*;sb~cD<~|(Lj%S2;|u8@Z&?O z8Pou>iFyr6zi~K1mpcjaPDPnv*6ApEn1v%d)SU!;snV&L^6AikZEam0Uke8YKh4)tq1=*_3ax;r| zQFiD*-a+O_g}chBTa^lVj4jieViC>POmB*{LLZt<&=uVb@P@!!Sv8BFZ98* z%T6D>TIT!;N_T4U?5?HGz499*n)qXMZWN~_s)LmYvfnn`8fO_HO*faD0^$l`1AT=Y z$4xmZG-eFzamtrZS>@2^Z+Kc#Gb{8ZiUvI^dK2s@>Ut~bv*MiSQzlwiQvko7$Gp5e zJ6eHR+0nJP!um%?AJ`dXJA58QcS-w;e zvqI(AwyeM!L|=wT$kI>-NeC&u?Vg?!x~D&hTjR;LR20y_d(G(p^vm$0;_C*lD!$*? zB*nM&9f!(&#i{3%Yl<#gtpB#0kZR1S;#JYgT+KdRgrpXq4);abW1&}zPi9}biXv&S z=By8iL1K|OBpyjX5|Jb%8R>!aM0z19NGj4B>4V&X^kp???|gKk-BQk(OgD)XX}YOy zSH{_XvcAZ0W*bcq`5#ArcJo`u)0idQZybnq^6bTnj%i}0KdsWAUg_^w>F?j<&rtou zyqf7{84sHY7qQy!>B?3IGn@j!d-9H&jFnLEhnfPV%QA4%t1>j9yw5Yau$p>$!w%kK zhP^8?RQS9VuQRp8=-H{RXhp^{sC|!`f^RL)Kn5V0$UtNeG8pkALy#;a8@US^iVQMk8a8JY+0#4>ArJkKBt)KqeyhA@?H>Ad`^E$P{EMl8+Q1(~#-N oKad$nA$x22%n{S6E*=E)m6=|ME^}{SsUE&nU