From 9ef04cc28a7ae022220aae52eab105f1818e59cf Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:58:12 -0500 Subject: [PATCH] batch33 task4 implement group C cluster stream features --- .../JetStream/JetStreamEngine.cs | 264 ++++++++++++++++++ .../NatsServer.JetStreamClusterStreams.cs | 26 ++ ...reamClusterStreamsGroupCTests.Impltests.cs | 116 ++++++++ porting.db | Bin 6758400 -> 6762496 bytes 4 files changed, 406 insertions(+) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupCTests.Impltests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index dcded03..6e96f24 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -1421,6 +1421,270 @@ internal sealed class JetStreamEngine(JetStream state) if (!isLeader) stream.ResetClusteredState(); } + + internal StreamAssignment? StreamAssignment(string accountName, string streamName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return null; + return accountStreams.TryGetValue(streamName, out var assignment) ? assignment : null; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal StreamAssignment? StreamAssignmentOrInflight(string accountName, string streamName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + if (cluster.Streams.TryGetValue(accountName, out var accountStreams) && + accountStreams.TryGetValue(streamName, out var current)) + return current; + + if (cluster.InflightStreams.TryGetValue(accountName, out var inflight) && + inflight.TryGetValue(streamName, out var info)) + return info.Assignment; + + return null; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal IEnumerable StreamAssignmentsOrInflightSeq(string accountName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return []; + + var seen = new HashSet(StringComparer.Ordinal); + var items = new List(); + if (cluster.Streams.TryGetValue(accountName, out var accountStreams)) + { + foreach (var (streamName, assignment) in accountStreams) + { + seen.Add(streamName); + items.Add(assignment); + } + } + + if (cluster.InflightStreams.TryGetValue(accountName, out var inflightStreams)) + { + foreach (var (streamName, inflight) in inflightStreams) + { + if (seen.Contains(streamName) || inflight.Assignment == null) + continue; + items.Add(inflight.Assignment); + } + } + + return items; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal IEnumerable<(string Account, StreamAssignment Assignment)> StreamAssignmentsOrInflightSeqAllAccounts() + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return []; + + var results = new List<(string Account, StreamAssignment Assignment)>(); + foreach (var account in cluster.Streams.Keys.Union(cluster.InflightStreams.Keys, StringComparer.Ordinal)) + { + var seen = new HashSet(StringComparer.Ordinal); + if (cluster.Streams.TryGetValue(account, out var accountStreams)) + { + foreach (var (streamName, assignment) in accountStreams) + { + seen.Add(streamName); + results.Add((account, assignment)); + } + } + + if (cluster.InflightStreams.TryGetValue(account, out var inflightStreams)) + { + foreach (var (streamName, inflight) in inflightStreams) + { + if (seen.Contains(streamName) || inflight.Assignment == null) + continue; + results.Add((account, inflight.Assignment)); + } + } + } + + return results; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal void ProcessStreamAssignment(StreamAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + + var accountName = assignment.Client.ServiceAccount(); + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + { + accountStreams = new Dictionary(StringComparer.Ordinal); + cluster.Streams[accountName] = accountStreams; + } + + var streamName = assignment.Config?.Name ?? assignment.Subject ?? string.Empty; + assignment.Responded = true; + accountStreams[streamName] = assignment; + cluster.RemoveInflightStreamProposal(accountName, streamName); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessUpdateStreamAssignment(StreamAssignment assignment) + { + ProcessStreamAssignment(assignment); + } + + internal void ProcessClusterUpdateStream(StreamAssignment assignment) + { + ProcessUpdateStreamAssignment(assignment); + } + + internal void ProcessClusterCreateStream(StreamAssignment assignment) + { + ProcessStreamAssignment(assignment); + } + + internal void ProcessStreamRemoval(StreamAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + var accountName = assignment.Client.ServiceAccount(); + var streamName = assignment.Config?.Name ?? assignment.Subject ?? string.Empty; + if (cluster.Streams.TryGetValue(accountName, out var accountStreams)) + { + accountStreams.Remove(streamName); + if (accountStreams.Count == 0) + cluster.Streams.Remove(accountName); + } + cluster.RemoveInflightStreamProposal(accountName, streamName); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessClusterDeleteStream(StreamAssignment assignment) + { + ProcessStreamRemoval(assignment); + } + + internal void ProcessConsumerAssignment(ConsumerAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + + var accountName = assignment.Client.ServiceAccount(); + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return; + if (!accountStreams.TryGetValue(assignment.Stream, out var streamAssignment)) + return; + + streamAssignment.Consumers ??= new Dictionary(StringComparer.Ordinal); + assignment.Responded = true; + streamAssignment.Consumers[assignment.Name] = assignment; + cluster.RemoveInflightConsumerProposal(accountName, assignment.Stream, assignment.Name); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessConsumerRemoval(ConsumerAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + + var accountName = assignment.Client.ServiceAccount(); + if (cluster.Streams.TryGetValue(accountName, out var accountStreams) && + accountStreams.TryGetValue(assignment.Stream, out var streamAssignment)) + { + streamAssignment.Consumers?.Remove(assignment.Name); + } + cluster.RemoveInflightConsumerProposal(accountName, assignment.Stream, assignment.Name); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessClusterCreateConsumer(ConsumerAssignment assignment) + { + ProcessConsumerAssignment(assignment); + } + + internal void ProcessClusterDeleteConsumer(ConsumerAssignment assignment) + { + ProcessConsumerRemoval(assignment); + } + + internal ConsumerAssignment? ConsumerAssignment(string accountName, string streamName, string consumerName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return null; + if (!accountStreams.TryGetValue(streamName, out var streamAssignment)) + return null; + if (streamAssignment.Consumers == null) + return null; + return streamAssignment.Consumers.TryGetValue(consumerName, out var assignment) ? assignment : null; + } + finally + { + _state.Lock.ExitReadLock(); + } + } } internal sealed class StreamAssignmentView diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs index f41cfe1..70176f6 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs @@ -9,4 +9,30 @@ public sealed partial class NatsServer return Math.Max(1, config.Replicas); } + + internal void SendStreamLostQuorumAdvisory(Account? account, string stream, string[]? peers = null) + { + _ = account; + _ = peers; + Noticef("JetStream stream lost quorum advisory for stream {0}", stream); + } + + internal void SendStreamLeaderElectAdvisory(Account? account, string stream, string leader) + { + _ = account; + Noticef("JetStream stream leader elect advisory for stream {0}, leader {1}", stream, leader); + } + + internal bool RemoveStream(Account? account, string streamName) + { + if (account == null) + return false; + + var (stream, _) = account.LookupStream(streamName); + if (stream == null) + return false; + + stream.Delete(); + return true; + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupCTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupCTests.Impltests.cs new file mode 100644 index 0000000..3fcbf5f --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupCTests.Impltests.cs @@ -0,0 +1,116 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterStreamsGroupCTests +{ + [Fact] // T:1618 + public void SendStreamLostQuorumAdvisory_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("SendStreamLostQuorumAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1619 + public void SendStreamLeaderElectAdvisory_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("SendStreamLeaderElectAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1620 + public void StreamAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1621 + public void StreamAssignmentOrInflight_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignmentOrInflight", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1622 + public void StreamAssignmentsOrInflightSeq_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignmentsOrInflightSeq", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1623 + public void StreamAssignmentsOrInflightSeqAllAccounts_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignmentsOrInflightSeqAllAccounts", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1624 + public void ProcessStreamAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessStreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1625 + public void ProcessUpdateStreamAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessUpdateStreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1626 + public void RemoveStream_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("RemoveStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1627 + public void ProcessClusterUpdateStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterUpdateStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1628 + public void ProcessClusterCreateStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterCreateStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1629 + public void ProcessStreamRemoval_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessStreamRemoval", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1630 + public void ProcessClusterDeleteStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterDeleteStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1631 + public void ProcessConsumerAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessConsumerAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1632 + public void ProcessConsumerRemoval_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessConsumerRemoval", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1633 + public void ProcessClusterCreateConsumer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterCreateConsumer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1634 + public void ProcessClusterDeleteConsumer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterDeleteConsumer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1635 + public void ConsumerAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ConsumerAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } +} diff --git a/porting.db b/porting.db index 8e2ce0e7bc7daf73a2adb83aeceff88db365c992..0e22117c957f92ba2629733d06823ffc539b7360 100644 GIT binary patch delta 3036 zcmaKse^6A{702I?eQ)pn;yn=LH!LB7B?%x1i-Z`AiBV$(`F%iPcVVG4Hc3G{34sGd3{1F@7B-Uw0M@Snx8JlS{VAL*6IFW3T(Gi^QSt9WF8s7l}`dNy^`8 z_NMsJl0AYTN**~=dO6hB+ua-LQZ?iHjF{)OYgIy{^HqUrXT|NbFYK>A5SAtn%t{%? zpeS{SRa%w(U{|O+)Yr$8nz{S2<(jw~uv{Z|{g!LsE@b4(Qj*&y2*NZ~YtnWvR}B`@ z%H2B40e2Pjdrc~Le@)maUlwewwoSIxwA!z2qS|rEY4qt*ib#6A6leS}Ub-o@C|$y= zmEXzTyOs-bH*L89cN3QLb9dBo9o+rQa_!s=8dHfHb`J)l@;Q22B;Tz$(_9bCO&s&=lrOtp=xR#R=|s@7C(T&*)zD_2#f z0ZI!w*yKH{4;#FG{W%8hB4C zHiqAk);Luj=sPIwy>)$l$$Z)PQutDhTi5Lw(>d%Xf=AdOh|dcfv{cV~?ti;RoM#-v z_VvmI+i&Fa(k$h8#eC{L$tFoUo0duXaki9NPO&_y{eUs0%vV|FGk$c6IYc_5#hLM3 znmWZ)+Pz9mjf83rv(t*sucn$Hh|dfxTbsQ?5|b!-gdL^uekGN>y$+o|9%0MKKEh5@ z$&i{yQx|xpzmBj+%t~?d+f@wo4hG`|5j^y}U$SJ%o@9&)?_{YoKFZ^pQMC3m4cH+{nwby5@KjOi(H zS3SRV|J?O!=N}#KvmtxFa>TYn&XW?zdnot2#@(}wiS&!JIrr-nPBS-MskmQ9H($#h zeXx}ub?HFOS_&*tmeA0f>>#~Sqh`{+_f3LQZ8UDk0 z_NEmskBAuZMdlRmRp)m^qf?hy7TvwbTtt`Ti1YN(58h&*(YyJ{X6q*Cizq~Mm)H&J zJ1H-Ob#lJVawD0_xA`Oweas%Cu}gffsr}K1yy`MPWT{Wir}1Ag8_is1C6U=#SJ=0y z-os&bAtk4tq2HucofO~1C1^gH@1s#>Q$t(McFBVCC`TAiLPYi&_K z_*Qx=Jw9D*q1$Qk>9ng+ub0$LYUR&X1)@NIl*^t#1AX?4n(M%{37SeI^>t&QOP&-?Xp47%HXy7eifp#g{^c_)@og zNX``YlU?sAQ^jF1g=_xzu1y}04oVIoQ_8GKdcL$f6xgw|FVwF#(3($!E5wyFl+=?W zuB9z=-AgmdR#vVk=e!up*Du1Sf58i}JiX%x7SistfJ*z5dc5~{P|OVD;e?(Wo;(l# zG&GhcXs^V568D1VBTvwM!`}=4tF&XK~C$vSwDl%=fD;i`2JfP-1zybG!S2 XGuUEtniko^>Pe2}8yDg4eGLBtG9S-E delta 1763 zcma*nX>3$g6bJD4?tAmzo&9#h?h9Rpt#%VyHe1$0fwpYbwkR{D!vr=^>Pja){5>*b$4RkQzE{SyZE9pv%qO)OkkKJ~D>Fj*R&n_GL zWwu(T#%@uh1_v1~g~ixZ=MPdHl{i^LYA{*vUF&UZ#PphHR@K23U3ao6(O0E6l^c?J zQ9Z4;sfX3w>UMR#!3EnY8~Y;I9Y(n-OP~WzmPm59NT6zk3FB%siy&*U^>4|tW$U&& z?M3?iBr9*WJ(iL~>k35Cp{N{D_%ln)N(>9-79+PK6fGLbl_6Ij$}L1x6N(lfstiS? zh~|f)`6C_kkjoF{<{}!0D6LMLgD|{>*@(hhn1v|3g%aatC(BLeUDhEfcteIQz}zyu zHaRxmXkF?owd@%hV4#2sCeVzb0X?xV9SwH5LUO!q-(cHmoni5q1y9#?lc5$W*=!0* zGY`#*Gmd=9h0OBujKL0m-bAg(`FT2LO_@YHj`J+ajZ~uO!U^6)Y=3$Z-S6Q}@}J?+ z^w$Z_scD*)MW3AHzft=d^CW8PK%uDoVqS@osOR3sdWEy zhl6gO;y;b_KB*?2<`%kFs<B&M-c@&OM6J z>JQFQ^t91A4jTNLXNDeQ5<*CAr|OiaF$oGDT5tFaMCzjweo}N4-2PyiwOT{LaW% z!yfy1PPnkGkM~i>t%%XlIyGG43iJx{ALcIl@hXlu z;tkwZ*B4x+rfWPO=jEWDYkUWFN@{Y9~F;tW7YMR*71s;{UON8luIjJ+^8A5fBMc5DhUf2A+dh zh=X`YfJ8`wWJrNjNQ1GE4jGUMS>S?kFdinrM3@BG@I2%|F62Qz6u=8G8KyuX6v0%O z2Gd~%%!C)A7)oFk%!WBI7v{lyD1`;E5XxW?EQTde4i!)dOJNx-hZV3As^BGf8LFWM zY9V-LufVIY8rhshOOHBNK=1REE2Wd^(2zt?OaCUfoP@*!D1-&J9me1(c>+b zgMT|=Sk`z~+vN9Y<0fyx|Gh`K#kbkpy?L$Lro^)h(#5*T#@K^NS)q}=Mn>eipY8tv DZtP40