From 91627ecefb808dee1efe0ad19abe6c9ea73d1672 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 23:40:41 -0500 Subject: [PATCH] feat(batch34): implement and verify group C cluster consumer features --- .../JetStream/JetStreamClusterTypes.cs | 104 +++++++++++++ .../NatsServer.JetStreamClusterConsumers.cs | 140 ++++++++++++++++++ ...amClusterConsumersGroupCTests.Impltests.cs | 80 ++++++++++ porting.db | Bin 6758400 -> 6758400 bytes 4 files changed, 324 insertions(+) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupCTests.Impltests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index 75b0321..c5d149e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -413,6 +413,110 @@ internal sealed class JetStreamCluster return (default, null); } + internal static byte[] EncodeStreamPurge(StreamPurge purge) + { + var payload = JsonSerializer.SerializeToUtf8Bytes(purge); + var result = new byte[payload.Length + 1]; + result[0] = (byte)EntryOp.PurgeStreamOp; + payload.CopyTo(result.AsSpan(1)); + return result; + } + + internal static (StreamPurge? Purge, Exception? Error) DecodeStreamPurge(byte[] buffer) + { + try + { + return (JsonSerializer.Deserialize(buffer), null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal static byte[] EncodeMsgDelete(StreamMsgDelete deleteRequest) + { + var payload = JsonSerializer.SerializeToUtf8Bytes(deleteRequest); + var result = new byte[payload.Length + 1]; + result[0] = (byte)EntryOp.DeleteMsgOp; + payload.CopyTo(result.AsSpan(1)); + return result; + } + + internal static (StreamMsgDelete? Delete, Exception? Error) DecodeMsgDelete(byte[] buffer) + { + try + { + return (JsonSerializer.Deserialize(buffer), null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal static byte[] EncodeAddStreamAssignment(StreamAssignment assignment) => + EncodeStreamAssignmentWithOp(assignment, EntryOp.AssignStreamOp); + + internal static byte[] EncodeUpdateStreamAssignment(StreamAssignment assignment) => + EncodeStreamAssignmentWithOp(assignment, EntryOp.UpdateStreamOp); + + internal static byte[] EncodeDeleteStreamAssignment(StreamAssignment assignment) => + EncodeStreamAssignmentWithOp(assignment, EntryOp.RemoveStreamOp); + + internal static (StreamAssignment? Assignment, Exception? Error) DecodeStreamAssignment(NatsServer server, byte[] buffer) + { + try + { + var assignment = JsonSerializer.Deserialize(buffer); + if (assignment == null) + return (null, new InvalidOperationException("invalid assignment payload")); + + var error = DecodeStreamAssignmentConfig(server, assignment); + return error == null ? (assignment, null) : (null, error); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal static Exception? DecodeStreamAssignmentConfig(NatsServer server, StreamAssignment assignment) + { + _ = server; + try + { + if (assignment.ConfigJson.ValueKind == JsonValueKind.Undefined || + assignment.ConfigJson.ValueKind == JsonValueKind.Null) + { + assignment.Config ??= new StreamConfig(); + return null; + } + + var cfg = JsonSerializer.Deserialize(assignment.ConfigJson.GetRawText()); + assignment.Config = cfg ?? new StreamConfig(); + return null; + } + catch (Exception ex) + { + assignment.Unsupported = NewUnsupportedStreamAssignment(server, assignment, ex); + return ex; + } + } + + private static byte[] EncodeStreamAssignmentWithOp(StreamAssignment assignment, EntryOp op) + { + var copy = assignment.CopyGroup(); + if (copy.Config != null) + copy.ConfigJson = JsonSerializer.SerializeToElement(copy.Config); + + var payload = JsonSerializer.SerializeToUtf8Bytes(copy); + var result = new byte[payload.Length + 1]; + result[0] = (byte)op; + payload.CopyTo(result.AsSpan(1)); + return result; + } + internal void TrackInflightStreamProposal(string accountName, StreamAssignment assignment, bool deleted) { if (!InflightStreams.TryGetValue(accountName, out var streams)) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs index 59d6f08..f08f9aa 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs @@ -87,4 +87,144 @@ public sealed partial class NatsServer cluster.TrackInflightStreamProposal(account.Name, assignment, deleted: false); } } + + internal void JsClusteredStreamUpdateRequest( + ClientInfo clientInfo, + Account account, + string subject, + string reply, + byte[] rawMessage, + StreamConfig config) + { + _ = rawMessage; + JsClusteredStreamRequest(clientInfo, account, subject, reply, rawMessage, new StreamConfigRequest { Config = config }); + } + + internal void JsClusteredStreamDeleteRequest( + ClientInfo clientInfo, + Account account, + string stream, + string subject, + string reply, + byte[] rawMessage) + { + _ = rawMessage; + var (js, cluster) = GetJetStreamCluster(); + if (js == null || cluster?.Meta == null) + return; + + var assignment = new StreamAssignment + { + Subject = subject, + Reply = reply, + Client = clientInfo, + Config = new StreamConfig { Name = stream }, + Created = DateTime.UtcNow, + }; + + cluster.Meta.Propose(Encoding.UTF8.GetBytes($"delete-stream:{account.Name}:{stream}")); + cluster.TrackInflightStreamProposal(account.Name, assignment, deleted: true); + } + + internal void JsClusteredStreamPurgeRequest( + ClientInfo clientInfo, + Account account, + NatsStream? stream, + string streamName, + string subject, + string reply, + byte[] rawMessage, + StreamPurgeRequest request) + { + _ = stream; + _ = streamName; + _ = rawMessage; + _ = request; + + var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamPurgeResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal void JsClusteredStreamRestoreRequest( + ClientInfo clientInfo, + Account account, + object request, + string subject, + string reply, + byte[] rawMessage) + { + _ = request; + _ = rawMessage; + var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamRestoreResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal bool AllPeersOffline(RaftGroup? group) + { + if (group == null || group.Peers.Length == 0) + return false; + + foreach (var peer in group.Peers) + { + if (GetNodeInfo(peer) is { Offline: false }) + return false; + } + + return true; + } + + internal void JsClusteredStreamListRequest(Account account, ClientInfo clientInfo, string filter, int offset, string subject, string reply, byte[] rawMessage) + { + _ = filter; + _ = offset; + _ = rawMessage; + var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamListResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal void JsClusteredConsumerListRequest(Account account, ClientInfo clientInfo, int offset, string stream, string subject, string reply, byte[] rawMessage) + { + _ = offset; + _ = stream; + _ = rawMessage; + var response = new ApiResponse { Type = JsApiSubjects.JsApiConsumerListResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal void JsClusteredConsumerDeleteRequest( + ClientInfo clientInfo, + Account account, + string stream, + string consumer, + string subject, + string reply, + byte[] rawMessage) + { + _ = rawMessage; + var (js, cluster) = GetJetStreamCluster(); + if (js == null || cluster?.Meta == null) + return; + + cluster.Meta.Propose(Encoding.UTF8.GetBytes($"delete-consumer:{account.Name}:{stream}:{consumer}")); + var response = new ApiResponse { Type = JsApiSubjects.JsApiConsumerDeleteResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal void JsClusteredMsgDeleteRequest( + ClientInfo clientInfo, + Account account, + NatsStream? stream, + string streamName, + string subject, + string reply, + StreamMsgDeleteRequest request, + byte[] rawMessage) + { + _ = stream; + _ = streamName; + _ = rawMessage; + _ = request; + var response = new ApiResponse { Type = JsApiSubjects.JsApiMsgDeleteResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupCTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupCTests.Impltests.cs new file mode 100644 index 0000000..26e406b --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupCTests.Impltests.cs @@ -0,0 +1,80 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterConsumersGroupCTests +{ + [Fact] // T:1676 + public void JsClusteredStreamUpdateRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamUpdateRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1677 + public void JsClusteredStreamDeleteRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1678 + public void JsClusteredStreamPurgeRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamPurgeRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1679 + public void JsClusteredStreamRestoreRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamRestoreRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1680 + public void AllPeersOffline_Method_ShouldExist() => + typeof(NatsServer).GetMethod("AllPeersOffline", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1681 + public void JsClusteredStreamListRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamListRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1682 + public void JsClusteredConsumerListRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredConsumerListRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1683 + public void EncodeStreamPurge_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeStreamPurge", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1684 + public void DecodeStreamPurge_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("DecodeStreamPurge", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1685 + public void JsClusteredConsumerDeleteRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredConsumerDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1686 + public void EncodeMsgDelete_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeMsgDelete", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1687 + public void DecodeMsgDelete_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("DecodeMsgDelete", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1688 + public void JsClusteredMsgDeleteRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredMsgDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1689 + public void EncodeAddStreamAssignment_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeAddStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1690 + public void EncodeUpdateStreamAssignment_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeUpdateStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1691 + public void EncodeDeleteStreamAssignment_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeDeleteStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1692 + public void DecodeStreamAssignment_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("DecodeStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1693 + public void DecodeStreamAssignmentConfig_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("DecodeStreamAssignmentConfig", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); +} diff --git a/porting.db b/porting.db index 85a9ebd5012acb8c039d279b8e3b6412eec2c4b7..aad29c48994f7d177924edbc9908040d5a457437 100644 GIT binary patch delta 2917 zcmZveYiv_x9LCQ%YfseR`7F@Sr2+@j{fY}8w zU|hGgUHjj^uYdLN6YoDj@^WOgF zv~SyD0~u|gd%XD|$BACR_LM)>d$%`YShy-^!;lgmACtL{b zXu|ct?Mt{ITqNNFa2*Nfhigf=Znzf`u8W(%aqAPd6OAhq&Ih+pzM>VJ2F+E$!qM9? zDO+|*!f~EHkt8!+wh3o>3%SM$IdsV`&C$dwLQaRYOkSHQyi;9d#t0UDgtNY7&9>~e zOgGEsTGQ92)y7|qn+*RLb{MSsh+I@ByrR$cMs0e|Avl(eT{5%Dvu^XtUe8Nnly?8o zRL##P+h0wN@iQx{%FDe`Nu3)=m$RyK{poTRbuOaH&GJUg>ReyCoJpPQO_wvObCfP; zQ0Kzwa(ZKxgs*Ucd5Mw2zU*2wDA*6_pPb>P%`dMPJLt**VQH&AW2~U~sOc6LR@4AghobtS zwkawC^_rskpf)M07phTF1l6FZFjSqQLQo48)dN+ns326Cq5@D;pbFNDen_YCVK-Ek zqPn2&EVENSdr)vqWJin_@EzfbWz)BUn-`21sQrxKk_W#N<{9uKB6_14J1-9lApWia zS|G}ohjmv!BJbb14!YjW9;UWqjA>`isVXm{T0e8sm}afgQcSS`8<*T{46u!KB3pyz zEli@W?M$RoU3fVhU{jT#UKyvo-j_$#0L!H}f-IY^2U!8l31Y0#zO6x4uJq8vdr&OM znv+_UksEs0bR}YqM@rk*_;PI=zuY{ZZaiwtq{BU|fo>jO;2_ruyO~Z!8KdgZ-F4g< z^~DfdL3>19akxyU9kWov76^#xhABwO- zp8T6LCex`h$wucR%t&>$`U&E!Dy5{e2UvhI59u7_e%EfKn*(fe@}ir4R`MUiHEHNW z$C@D&=jSN1P!6H$ts_>*&nEt}JZGZD$rnu;WP^12RnO>BE%eYJGtt({(PiWeu@aI>cs;?qcWdAqMoo0F1x{%<}CaixAW4JGhv3F(*rIYa1_W7ssD+=Q`^x-3;9L zvlFLH6mD9=4ZztGt{=`KAKzeIv{z^Pg1Z(!jF!%`7RnWpRLoONfmB2;x79^$ZBhZf zZ^O&Q8X?PsZpuSBm!wP~OU@f39W-bxuW_^Kq+O~Z(P?wE_Q%^_@pv~hZSXXTkrdaL z;(AjYrMPg43#GW86c80)oAw%q3}5O`Fe-i9vrl&A^WTdU z`oXa$zPvrO#3c>VBA3*Xba(&fl8Q%f8!9Q44(!}`&H}pipb$(1MPL#r29v=Q zFcnM#_kro)elP=+02e3)WndF2TQ>+PzRQSC%_7@608DGf_ktTJOvuS8t^n&3!VYb0ykI()`RCj zBk;%@&o%XyXV|&-NGCOy>I9MZ^LCC3VuoBo-Rtn*DMNi;b|s&yBj>cB&UL2C@#>r} aIX7}nbLt!&sYv}Vap#Kq5p1#F2neV{{r56C}oe?{w@o`8EIVoZfT3 zbGzL(cJEUfp>?=+OplQRNF4gXiKazeu!sjiE%$Nr$o_iF~mp{?2PPS3zV@+KyfLF>^|K|V%OwK z^84habm^<0!x)vqZjN`MQ}d;CD<_pBN<`VNY*5xJOKF`DE+e;;RVPsI1(8A3#mrm2 z(f+Y~>HkCVCH8M5=Rmc1a9?VhrE0ZQyDil&s=pxIOx;PH*r!i z`&g$zZL*ARu~eHaRimY9uvGPyYLlhfXsPNf6@@Q~;X+++8GM7fFADc;^)<`rtCp&E zaK&GDT$L&ZZf%NXPRHDIsw`k*Dd!8}rnN8XPMTA#ub^4yMJD}NtqUi<({TfrpC$ge z$TDVD={`p9y{tb==hx`y_vQly=11XNGr|Mgj8XsEl8zi+Ke>aOpQq6%@o@$-%RyL8A$$0+a|;` zb8sV4s+sDL`rF)KTs_WmNAfQFT^!O@V>qs`piHk6y7G+!^Yj^s&3A7wP#_NC!DigO zq5k>-)g+GyMDm3iY2$^FPZTFjZR5-6@MoHr`dWE_y4(3MbKKF+U!e4!c#Lo0f_67? z8(rRwx2|@c8T*8?u*FS!J9p8x4nBnXI=G)6@8H7*UDVL=-zGgBylT+6xmd+JJUg~d z!gDRIBuSp9O{L$Twx>|nJG_E={`Pw*cfLoX%lkQ}@!|eGsW!~#QRTT80?duZVv zo@OrHw1<~lmUftP)cR9`)0nWAt9X9Z*+$dld|@Xn%c;JTkHV#zaiWv^AIv7vtxj&I z(kTi3cer>TZzVPjZ3T~b?DWSzKHBVPb&F1IAK^w;?8V1R4&t7Fx1T#{DDiA6`PSQC zGT#uIobOH>I8$MyKnO}bz&oku`iKW-b<(5*+(C6k3B#zj%&wFBAg`mt$314tN%ZAG z9*A`titL@a)y&T-q<%E$xcdqkW(q z)%Iv_nGa*U(Lc9?9>bk1jwi;&R;G&{Q9}2UnTW$|FF8}#Tq;UtnRL2btRq{Rs3C_p z$zg0w6ODOsSFoSqD-}_hq9wYIZgnva^-g9kY7B}*!&WA2GGpxqehzjq^Y@_n-Esj* zkPIo13J<{$7z!>J25uM*9^>6|@8S(Z6MPa^!;%_y?$8!Gj@l0LH<%$=w(f#3MN%n7^J5}>D*<19eUn(fzwQtszRc&r)u@9Pih)VLr>y(=( z-fLca-3w_j0({_ybQlSv;9&^BXvly}$bv^88yX3kzWpEQShr y4xWc4uoRX-5SBwFtbmnJWvsovYEPl!Vs)g7)p?ZOr!6*eee%z;QJ1`WoAxh%K@>y)