From 9a42b93b4bb37badf510f5ba2994827905836987 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 23:37:15 -0500 Subject: [PATCH] feat(batch34): implement and verify group B cluster consumer features --- .../Accounts/Account.JetStream.cs | 21 +++ .../JetStream/JetStreamClusterTypes.cs | 129 ++++++++++++++ .../JetStream/JetStreamEngine.cs | 165 ++++++++++++++++++ .../NatsServer.JetStreamClusterConsumers.cs | 70 ++++++++ ...amClusterConsumersGroupBTests.Impltests.cs | 128 ++++++++++++++ porting.db | Bin 6758400 -> 6758400 bytes 6 files changed, 513 insertions(+) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupBTests.Impltests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index 51eeb2f..3aa821f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -614,4 +614,25 @@ public sealed partial class Account return js.CheckAccountLimits(selected.Limits, config, reservation); } + + internal (JetStreamAccountLimits? Limits, string Tier, JsAccount? JsAccount, JsApiError? Error) SelectLimits(int replicas) + { + _mu.EnterReadLock(); + try + { + var jsa = JetStream; + if (jsa == null) + return (null, string.Empty, null, JsApiErrors.NewJSNotEnabledForAccountError()); + + var (selected, tier, found) = jsa.SelectLimits(replicas); + if (!found) + return (null, string.Empty, jsa, JsApiErrors.NewJSNoLimitsError()); + + return (selected, tier, jsa, null); + } + finally + { + _mu.ExitReadLock(); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index 87fe7ec..75b0321 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -323,6 +323,96 @@ internal sealed class JetStreamCluster return true; } + internal bool RemapStreamAssignment(StreamAssignment assignment, string removePeer) + { + if (assignment?.Group == null) + return false; + + var retain = assignment.Group.Peers.Where(p => !string.Equals(p, removePeer, StringComparison.Ordinal)).ToArray(); + var (newPeers, error) = SelectPeerGroup( + assignment.Group.Peers.Length, + assignment.Group.Cluster ?? string.Empty, + assignment.Config ?? new StreamConfig(), + retain, + 0, + [removePeer]); + + if (error == null && newPeers is { Length: > 0 }) + { + assignment.Group.Peers = newPeers; + assignment.Group.Preferred = string.Empty; + return true; + } + + if (assignment.Group.Peers.Length <= 1) + return false; + + assignment.Group.Peers = retain; + assignment.Group.Preferred = string.Empty; + return false; + } + + internal (string[]? Peers, SelectPeerError? Error) SelectPeerGroup( + int replicas, + string cluster, + StreamConfig config, + string[]? existing, + int replaceFirstExisting, + string[]? ignore) + { + _ = config; + if (replicas <= 0 || string.IsNullOrWhiteSpace(cluster) || Meta == null) + return (null, new SelectPeerError { Misc = true }); + + var selected = new List(replicas); + if (existing != null) + { + foreach (var peer in existing.Skip(Math.Clamp(replaceFirstExisting, 0, existing.Length))) + { + if (selected.Count == replicas) + break; + selected.Add(peer); + } + } + + var ignored = new HashSet(ignore ?? [], StringComparer.Ordinal); + foreach (var peer in Meta.Peers()) + { + if (selected.Count == replicas) + break; + if (ignored.Contains(peer.Id)) + continue; + if (selected.Contains(peer.Id, StringComparer.Ordinal)) + continue; + selected.Add(peer.Id); + } + + if (selected.Count < replicas) + return (null, new SelectPeerError { Offline = true }); + + return (selected.Take(replicas).ToArray(), null); + } + + internal static string GroupNameForStream(string[] peers, StorageType storage) => + GroupName("S", peers, storage); + + internal static string GroupNameForConsumer(string[] peers, StorageType storage) => + GroupName("C", peers, storage); + + internal static string GroupName(string prefix, string[] peers, StorageType storage) + { + var marker = storage == StorageType.MemoryStorage ? "M" : "F"; + var suffix = Guid.NewGuid().ToString("N")[..6]; + return $"{prefix}-R{Math.Max(1, peers.Length)}{marker}-{suffix}"; + } + + internal static (T? Response, Exception? Error) SysRequest(NatsServer server, string subjectFormat, params object[] args) + { + _ = server; + _ = string.Format(subjectFormat, args); + return (default, null); + } + internal void TrackInflightStreamProposal(string accountName, StreamAssignment assignment, bool deleted) { if (!InflightStreams.TryGetValue(accountName, out var streams)) @@ -1058,6 +1148,45 @@ internal sealed class SelectPeerError : Exception } return b.ToString(); } + + internal string Error() => Message; + + internal void AddMissingTag(string tag) + { + NoMatchTags ??= new HashSet(StringComparer.Ordinal); + NoMatchTags.Add(tag); + } + + internal void AddExcludeTag(string tag) + { + ExcludeTags ??= new HashSet(StringComparer.Ordinal); + ExcludeTags.Add(tag); + } + + internal void Accumulate(SelectPeerError? other) + { + if (other == null) + return; + + ExcludeTag |= other.ExcludeTag; + Offline |= other.Offline; + NoStorage |= other.NoStorage; + UniqueTag |= other.UniqueTag; + Misc |= other.Misc; + NoJsClust |= other.NoJsClust; + + if (other.NoMatchTags != null) + { + foreach (var tag in other.NoMatchTags) + AddMissingTag(tag); + } + + if (other.ExcludeTags != null) + { + foreach (var tag in other.ExcludeTags) + AddExcludeTag(tag); + } + } } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index e83aadc..9710ae4 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -2013,6 +2013,171 @@ internal sealed class JetStreamEngine(JetStream state) } } + internal void ProcessConsumerAssignmentResults(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message) + { + _ = sub; + _ = client; + _ = account; + _ = subject; + _ = reply; + + ConsumerAssignmentResult? result; + try + { + result = JsonSerializer.Deserialize(message); + } + catch + { + return; + } + + if (result == null) + return; + + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + if (!cluster.Streams.TryGetValue(result.Account, out var accountStreams)) + return; + if (!accountStreams.TryGetValue(result.Stream, out var streamAssignment)) + return; + if (streamAssignment.Consumers == null || !streamAssignment.Consumers.TryGetValue(result.Consumer, out var consumerAssignment)) + return; + + consumerAssignment.Responded = true; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void StartUpdatesSub() + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + + cluster.StreamResults ??= new object(); + cluster.ConsumerResults ??= new object(); + cluster.Stepdown ??= new object(); + cluster.PeerRemove ??= new object(); + cluster.PeerStreamMove ??= new object(); + cluster.PeerStreamCancelMove ??= new object(); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void StopUpdatesSub() + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + + cluster.StreamResults = null; + cluster.ConsumerResults = null; + cluster.Stepdown = null; + cluster.PeerRemove = null; + cluster.PeerStreamMove = null; + cluster.PeerStreamCancelMove = null; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessLeaderChange(bool isLeader) + { + var server = _state.Server as NatsServer; + if (server == null) + return; + + if (isLeader) + { + server.Noticef("Self is new JetStream cluster metadata leader"); + server.SendDomainLeaderElectAdvisory(); + StartUpdatesSub(); + } + else + { + server.Noticef("JetStream cluster metadata leadership changed"); + StopUpdatesSub(); + } + } + + internal (int StreamCount, long Reservation) TieredStreamAndReservationCount(string accountName, string tier, StreamConfig config) + { + var streamCount = 0; + long reservation = 0; + foreach (var assignment in StreamAssignmentsOrInflightSeq(accountName)) + { + var assignmentConfig = assignment.Config; + if (assignmentConfig == null) + continue; + if (!string.IsNullOrEmpty(tier) && !IsSameTier(assignmentConfig, config)) + continue; + if (string.Equals(assignmentConfig.Name, config.Name, StringComparison.Ordinal)) + continue; + + streamCount++; + if (assignmentConfig.MaxBytes > 0 && assignmentConfig.Storage == config.Storage) + reservation += assignmentConfig.MaxBytes; + } + + return (streamCount, reservation); + } + + internal (RaftGroup? Group, SelectPeerError? Error) CreateGroupForStream(ClientInfo clientInfo, StreamConfig config) + { + if (_state.Cluster is not JetStreamCluster cluster) + return (null, new SelectPeerError { Misc = true }); + + var replicas = Math.Max(1, config.Replicas); + var targetCluster = config.Placement?.Cluster; + if (string.IsNullOrWhiteSpace(targetCluster)) + targetCluster = clientInfo.Cluster?.FirstOrDefault(); + + var (peers, error) = cluster.SelectPeerGroup(replicas, targetCluster ?? string.Empty, config, null, 0, null); + if (peers == null || peers.Length < replicas) + return (null, error ?? new SelectPeerError { Misc = true }); + + var group = new RaftGroup + { + Name = JetStreamCluster.GroupNameForStream(peers, config.Storage), + Storage = config.Storage, + Cluster = targetCluster, + Peers = peers, + }; + group.SetPreferred(_state.Server as NatsServer ?? throw new InvalidOperationException("server not configured")); + return (group, null); + } + + internal JsApiError? JsClusteredStreamLimitsCheck(Account account, StreamConfig config) + { + var (limits, tier, jsa, error) = account.SelectLimits(config.Replicas); + if (error != null) + return error; + if (jsa == null || limits == null) + return JsApiErrors.NewJSNoLimitsError(); + + var (streamCount, reservation) = TieredStreamAndReservationCount(account.Name, tier, config); + if (limits.MaxStreams > 0 && streamCount >= limits.MaxStreams) + return JsApiErrors.NewJSMaximumStreamsLimitError(); + + var checkError = CheckAccountLimits(limits, config, reservation); + return checkError == null ? null : JsApiErrors.NewJSStreamLimitsError(checkError); + } + private static bool TryReadUVarInt(ReadOnlySpan buffer, out ulong value, out int consumed) { value = 0; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs index 5ce9f85..59d6f08 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs @@ -1,7 +1,22 @@ +using System.Text; + namespace ZB.MOM.NatsNet.Server; public sealed partial class NatsServer { + internal void SendDomainLeaderElectAdvisory() + { + var (_, cluster) = GetJetStreamCluster(); + var meta = cluster?.Meta; + if (meta == null) + return; + + Noticef( + "JetStream domain leader elected advisory for leader {0} in cluster {1}", + meta.GroupLeader(), + CachedClusterName()); + } + internal void SendConsumerLostQuorumAdvisory(NatsConsumer? consumer) { if (consumer == null || !consumer.ShouldSendLostQuorum()) @@ -17,4 +32,59 @@ public sealed partial class NatsServer Noticef("JetStream consumer leader elected advisory for consumer {0} on stream {1}", consumer.Name, consumer.Stream); } + + internal void JsClusteredStreamRequest( + ClientInfo clientInfo, + Account account, + string subject, + string reply, + byte[] rawMessage, + StreamConfigRequest configRequest) + { + var (js, cluster) = GetJetStreamCluster(); + if (js == null || cluster == null) + return; + + var cfg = configRequest.Config; + var engine = new JetStreamEngine(js); + var limitsError = engine.JsClusteredStreamLimitsCheck(account, cfg); + if (limitsError != null) + { + var response = new ApiResponse + { + Type = JsApiSubjects.JsApiStreamCreateResponseType, + Error = limitsError, + }; + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + var (group, createError) = engine.CreateGroupForStream(clientInfo, cfg); + if (group == null || createError != null) + { + var response = new ApiResponse + { + Type = JsApiSubjects.JsApiStreamCreateResponseType, + Error = JsApiErrors.NewJSClusterNoPeersError(createError ?? new SelectPeerError { Misc = true }), + }; + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + var assignment = new StreamAssignment + { + Group = group, + Config = cfg, + Subject = subject, + Reply = reply, + Client = clientInfo, + Created = DateTime.UtcNow, + }; + + if (cluster.Meta != null) + { + cluster.Meta.Propose(Encoding.UTF8.GetBytes($"create-stream:{account.Name}:{cfg.Name}")); + cluster.TrackInflightStreamProposal(account.Name, assignment, deleted: false); + } + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupBTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupBTests.Impltests.cs new file mode 100644 index 0000000..943fde4 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupBTests.Impltests.cs @@ -0,0 +1,128 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterConsumersGroupBTests +{ + [Fact] // T:1656 + public void ProcessConsumerAssignmentResults_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessConsumerAssignmentResults", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1657 + public void StartUpdatesSub_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StartUpdatesSub", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1658 + public void StopUpdatesSub_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StopUpdatesSub", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1659 + public void SendDomainLeaderElectAdvisory_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("SendDomainLeaderElectAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1660 + public void ProcessLeaderChange_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessLeaderChange", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1661 + public void RemapStreamAssignment_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("RemapStreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1662 + public void Error_Method_ShouldExist() + { + typeof(SelectPeerError).GetMethod("Error", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1663 + public void AddMissingTag_Method_ShouldExist() + { + typeof(SelectPeerError).GetMethod("AddMissingTag", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1664 + public void AddExcludeTag_Method_ShouldExist() + { + typeof(SelectPeerError).GetMethod("AddExcludeTag", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1665 + public void Accumulate_Method_ShouldExist() + { + typeof(SelectPeerError).GetMethod("Accumulate", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1666 + public void SelectPeerGroup_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("SelectPeerGroup", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1667 + public void GroupNameForStream_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("GroupNameForStream", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1668 + public void GroupNameForConsumer_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("GroupNameForConsumer", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1669 + public void GroupName_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("GroupName", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1670 + public void TieredStreamAndReservationCount_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("TieredStreamAndReservationCount", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1671 + public void CreateGroupForStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("CreateGroupForStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1672 + public void SelectLimits_Method_ShouldExist() + { + typeof(Account).GetMethod("SelectLimits", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1673 + public void JsClusteredStreamLimitsCheck_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("JsClusteredStreamLimitsCheck", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1674 + public void JsClusteredStreamRequest_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("JsClusteredStreamRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1675 + public void SysRequest_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("SysRequest", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } +} diff --git a/porting.db b/porting.db index 33116fe8a544f3c145d0f72cbe4c9a5064796ad1..85a9ebd5012acb8c039d279b8e3b6412eec2c4b7 100644 GIT binary patch delta 4175 zcmZ`*4RBLc7S6j#US86?_ikD!rRi%Me#&24(xhoaf3Os{uqaSkC_;<0r76Pl+al6J zX;a{qKOJqD6#|;L)#b7XGG2@<`SjZiJslIF~*SX`!)bNq1+={-* z4yl)CKBH`sZb?*nSL&DN$m?al)GKF6XXSU~qjH@blyAr%hjWLxPRaYVXjD*BpgGXe z5@-@$XToD5E1Lr?tD08@n#>=v6_@rI{t(cM+{;J~L+>SS>=;9$_IXPV+a2jv(!w8K z;<7S%^RVGfbI~gn)W8V|U)i(m>HI{%mpP}U$gKW_U76gkjZm3{Oi;g4wu19Cmf*%J z`Ii+e3IF#=^7|N@9AkSKON+5Rj7c%JTbsD~CY)O>SBLlT=x1h_2iOj4j$xVe!O$`6 zgayZNT6mTn{np5zFQ$cW9i}YCYk5L_GKs&g&$cdKZ4+i<){U|`8^h4S{-VH(m`AZgLMc?+G{Or zhWZ*b34XU2DQQ}V<-a=i9h6t2N!&N^MGdNkgc>C4pB~mdeG|GCA*XUfw|HI0uEEZQ zsF3>%PA)`~w-1(~jrw7yBTZ@7dD?VrOZbIUycwC;jug@re6@z(r+Z+pj_uJg^`#X{ zq(|o?I<^aj(;O4SJJWH8W0H_>5RD%gEX90-d9UfH@dMc`Ef9CvUrYG6tzVPtH)nv)p*U5pwUtYvH*WHU)6MkPPqMhsQ(LYl2|*78H33A()T1!m0VZ0vrA$ zNHD>NJ*rz$-NEYp?G6K1>Vr(qngTsOTmzo(L{&=$YXP31)haE(Z7{}Vx#PZ4fG29* zqY7~kB&JHLdlbxlL$JV_R72?tA(<9WKSyfN6oqgg=(TBK^km1L3VdyFI zXc%r6FdXvZNr3&h4m>aLS-l=6z?O2f+kN2259z#i7~CpkXdVe4n1($Z+#2sxBB~A; zF47-pTrpk=<(o$Ipv$QbOD688>Su+iS<)FcGP}(CdwjWD4 zYRG56Ewb&P?m=`gF6rR1F|Nns)=Cruy2xBY5Y*=!5iV!MUw*V>NFAl`YU#4 z`aIYSua;OdEcr!|4*5a!;zQ8#qVdj!ecy|Hs)($*gE@~2(JKg5gIr{n@w(S3fgLr_sPl95h+`IMzq?uBz%}~zwM-Ll(kFv zT&S{~wPf)J%-@<V&?T4NkbZs#Zv7+Yo5YUC~;U@16$1FPoFNa&UaztSDY@kRcLebv>iGZgxB)}`9tr*VrS_F%#OKC5AepQbw)R9ysz$ZLtu^z>`Cfrhma8E<}Ek*#q=dLU@2%?2D?BHzg%->nT!ixxo) zwZ{pD88R#6foB_CcVhGWcgN;WV@+s_#_F5!=Emln$iCLQo8ZxEM4F&#Y>A_>6OOeY0&4B0o z_0jMN+FroMT=s*G-RHY!npbC<{Rq;(9auHQ(7Jo_Xyx1C&0yo5k#-lx8wxSBHclKg y4AwcJe!2bxKHbK5ZnP9y6BRJ)PViDvEjxiyo1__Ocb=|-X0?Lh?DUoAx&H%+y;;Wq delta 2878 zcmc)Me^3^0b}Ej z^dLHKG6{ber@g0jG8s%QO=)TgDBBvNG)=6*hFYaHO;DehPGdW5CzCp((`lVPx6!xo zNAqvFd4})j*>~UFTi)HJYvd)hi)#NYf4?N@9n$Te7mKRp{@23|W%A~|hsx|jW%8rF z#py;umn2y=HRCe_jWOed(Qn>oK5o_+3A4a_*R;(am~Cda`8RvsIXP}DDDbP65{_XTFqk%LNo=#Un^1);`;dJe`Y0BY{+pCUA`_x9;-K31SrpD-NcssX0 z8PTKgrync##J}mgCG|Uv9h7>V^dzN(leSapaZ)FxXPneQ=_w~YL20X#wo!^Y>2XRe zPTEST$w}X!w9H9cD21G~nNmgP_7XaiO%#iq=0-{ZCp|`Kx|23g`uvRBpB;Zqp;DWwqLB?_X~Y%JAwC3p0J+qHgD0G%|SZI`#5PFyN zoX{E4E}>tMwg{azLveYX;5)RjN$71-qtGu&l|pZk3WR<^njtho@(P_YL(;#5PLlpE z^d>2ef+vXQh29{&Ep(hTEcA2It3p2`9Tc)jJwn5zZwVbEZ4)|5iZlxj5g!s7BrOwq zom3+57$Dfha)Ug>)B z1<;hs>LJM3EhrD0<Y#Bg7tKZ%iU{9yrP5bz@ZpodgFLJeO-R3-Ffj{mWO8q1?KV`q? zf`_SorCKhRmcO{fvX_^$9u;B}R%POpYQPUCtQJUL=}yr8~t{qj#bET{zi%g zpYHOc!CMP&pT~`btQ|h<%&@?$XAGVXu{>wC;~};MDuO6$7fAeVXf88yvVc+u6s259(RK>FbNxfzQ7a9h<>Bi`hJ9u2nsIVB0@DS#WBE zzU4m`&>!cOmuPRgQW?-$$>z`~|8N7-n06|;9?s>m{N&rJ4f>mCQ4D+(E~59AG6Nbr;o4s##Q7 zc4Doyd1odq5|5`^GS~S(@g4JN-hX=wQ*u1j$;q=l|7%rPiHvLy6kM|MVgH{~40z;{ zbrw3Jx&;lFt&r2RpTBImWQd$mZm)qiu2?nB^-5f{?uFH7sqb7b`U|JP(G_X|M~h>C zV>ZVej=3E3IL_vn&v6b%+FHo5h~wQHi#g8aSi-TC<2;UK9LqV*=UBmU0mn*?RUCsH zt2x$itmRn8aUsVL$3-0LIWFedz;Ox3MvnJzT*~oYj>|YM=eUC7mpR_Y@qUggIX=K~ z6~_lTHgR0dv6QtjpNriuH*Q1j$w}LIYu}>$}!3@ fHjM3Bxlc1nB=ZF`!#HG=B(HClVV49pjLZK4)*#<+