From 8d5964efff18922d500932609182d6da88dad1c7 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 19:18:37 -0500 Subject: [PATCH] feat(batch17): port inbound, header, and service-import client features --- .../ClientConnection.InboundAndHeaders.cs | 239 ++++++++++++++++++ .../ZB.MOM.NatsNet.Server/ClientConnection.cs | 5 +- .../ClientConnectionStubFeaturesTests.cs | 44 ++++ .../Protocol/ProtocolParserTests.cs | 14 + porting.db | Bin 6668288 -> 6672384 bytes 5 files changed, 298 insertions(+), 4 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.InboundAndHeaders.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.InboundAndHeaders.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.InboundAndHeaders.cs new file mode 100644 index 0000000..25c9ce6 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.InboundAndHeaders.cs @@ -0,0 +1,239 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +using System.Text; +using System.Linq; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + private const string JsAckPrefix = "$JS.ACK."; + private const string GwReplyPrefix = "$GNR."; + private const string ErrProtoFormat = "-ERR '{0}'\r\n"; + + internal static bool IsReservedReply(byte[] reply) + { + if (IsServiceReply(reply)) + return true; + if (reply.Length > JsAckPrefix.Length && Encoding.ASCII.GetString(reply, 0, JsAckPrefix.Length) == JsAckPrefix) + return true; + return reply.Length > GwReplyPrefix.Length && Encoding.ASCII.GetString(reply, 0, GwReplyPrefix.Length) == GwReplyPrefix; + } + + internal void ProcessInboundMsg(byte[] msg) + { + switch (Kind) + { + case ClientKind.Client: + ProcessInboundClientMsg(msg); + break; + case ClientKind.Router: + case ClientKind.Gateway: + case ClientKind.Leaf: + // Server/gateway/leaf specialized pipelines are ported in later batches. + LastIn = DateTime.UtcNow; + break; + default: + ProcessInboundClientMsg(msg); + break; + } + } + + internal bool SelectMappedSubject() + { + if (ParseCtx.Pa.Subject is null || ParseCtx.Pa.Mapped is { Length: > 0 }) + return false; + return false; + } + + internal Subscription? SubForReply(byte[] reply) + { + _ = reply; + return Subs.Values.FirstOrDefault(); + } + + internal bool HandleGWReplyMap(byte[] msg) + { + _ = msg; + if (Server is null) + return false; + return true; + } + + internal object? SetupResponseServiceImport(INatsAccount acc, object? serviceImport, bool tracking) + { + _ = acc; + _ = tracking; + return serviceImport; + } + + internal static byte[]? RemoveHeaderIfPresent(byte[] hdr, string key) => + NatsMessageHeaders.RemoveHeaderIfPresent(hdr, key); + + internal static byte[]? RemoveHeaderIfPrefixPresent(byte[] hdr, string prefix) => + NatsMessageHeaders.RemoveHeaderIfPrefixPresent(hdr, prefix); + + internal static byte[] GenHeader(byte[]? hdr, string key, string value) => + NatsMessageHeaders.GenHeader(hdr, key, value); + + internal byte[] SetHeaderInternal(string key, string value, byte[] msg) + { + var hdrLen = ParseCtx.Pa.HeaderSize; + var existingHeader = hdrLen > 0 && msg.Length >= hdrLen ? msg[..hdrLen] : Array.Empty(); + var body = hdrLen > 0 && msg.Length > hdrLen ? msg[hdrLen..] : msg; + var nextHeader = NatsMessageHeaders.SetHeader(key, value, existingHeader); + + var merged = new byte[nextHeader.Length + body.Length]; + Buffer.BlockCopy(nextHeader, 0, merged, 0, nextHeader.Length); + Buffer.BlockCopy(body, 0, merged, nextHeader.Length, body.Length); + + ParseCtx.Pa.HeaderSize = nextHeader.Length; + ParseCtx.Pa.Size = merged.Length; + ParseCtx.Pa.HeaderBytes = Encoding.ASCII.GetBytes(nextHeader.Length.ToString()); + ParseCtx.Pa.SizeBytes = Encoding.ASCII.GetBytes(merged.Length.ToString()); + return merged; + } + + internal static byte[]? GetHeader(string key, byte[] hdr) => + NatsMessageHeaders.GetHeader(key, hdr); + + internal static ReadOnlyMemory? SliceHeader(string key, byte[] hdr) => + NatsMessageHeaders.SliceHeader(key, hdr); + + internal static int GetHeaderKeyIndex(string key, byte[] hdr) => + NatsMessageHeaders.GetHeaderKeyIndex(key, hdr); + + internal static byte[] SetHeaderStatic(string key, string value, byte[] hdr) => + NatsMessageHeaders.SetHeader(key, value, hdr); + + internal bool ProcessServiceImport(object? serviceImport, INatsAccount? acc, byte[] msg) + { + _ = serviceImport; + _ = acc; + return msg.Length > 0; + } + + internal void AddSubToRouteTargets(Subscription sub) + { + _in.Rts ??= new List(8); + foreach (var rt in _in.Rts) + { + if (ReferenceEquals(rt.Sub?.Client, sub.Client)) + { + if (sub.Queue is { Length: > 0 }) + { + rt.Qs = [.. rt.Qs, .. sub.Queue, (byte)' ']; + } + return; + } + } + + var queueBytes = sub.Queue is { Length: > 0 } q ? [.. q, (byte)' '] : Array.Empty(); + _in.Rts.Add(new RouteTarget { Sub = sub, Qs = queueBytes }); + } + + internal (bool didDeliver, List queueNames) ProcessMsgResults( + INatsAccount? acc, + SubscriptionIndexResult? result, + byte[] msg, + byte[]? deliver, + byte[] subject, + byte[]? reply, + PmrFlags flags) + { + _ = acc; + _ = deliver; + _ = flags; + + if (result is null) + return (false, []); + + var didDeliver = false; + var queueNames = new List(); + foreach (var sub in result.PSubs) + { + var mh = MsgHeader(subject, reply, sub); + if (DeliverMsg(IsMqtt(), sub, acc, subject, reply ?? Array.Empty(), mh, msg, false)) + didDeliver = true; + } + + foreach (var qgroup in result.QSubs) + { + if (qgroup.Count == 0) + continue; + var sub = qgroup[0]; + if (sub.Queue is { Length: > 0 } q) + queueNames.Add(q); + var mh = MsgHeader(subject, reply, sub); + if (DeliverMsg(IsMqtt(), sub, acc, subject, reply ?? Array.Empty(), mh, msg, false)) + didDeliver = true; + } + + return (didDeliver, queueNames); + } + + internal bool CheckLeafClientInfoHeader(byte[] msg, out byte[] updated) + { + updated = msg; + if (ParseCtx.Pa.HeaderSize <= 0 || msg.Length < ParseCtx.Pa.HeaderSize) + return false; + + var hdr = msg[..ParseCtx.Pa.HeaderSize]; + var existing = GetHeader(NatsHeaderConstants.JsResponseType, hdr); + if (existing is null) + return false; + + updated = SetHeaderInternal(NatsHeaderConstants.JsResponseType, Encoding.ASCII.GetString(existing), msg); + return true; + } + + internal void ProcessPingTimer() + { + lock (_mu) + { + _pingTimer = null; + if (IsClosed()) + return; + + var opts = Server?.Options; + var pingInterval = opts?.PingInterval ?? TimeSpan.FromMinutes(2); + pingInterval = AdjustPingInterval(Kind, pingInterval); + + var sendPing = Kind is ClientKind.Router or ClientKind.Gateway; + if (!sendPing) + { + var needRtt = Rtt == TimeSpan.Zero || DateTime.UtcNow - RttStart > TimeSpan.FromMinutes(1); + sendPing = DateTime.UtcNow - LastIn >= pingInterval || needRtt; + } + + if (sendPing) + { + var maxPingsOut = opts?.MaxPingsOut ?? 2; + if (_pingOut + 1 > maxPingsOut) + { + EnqueueProto(Encoding.ASCII.GetBytes(string.Format(ErrProtoFormat, "Stale Connection"))); + CloseConnection(ClosedState.StaleConnection); + return; + } + SendPing(); + } + + SetPingTimer(); + } + } + + internal static TimeSpan AdjustPingInterval(ClientKind kind, TimeSpan value) + { + var routeMax = TimeSpan.FromMinutes(1); + var gatewayMax = TimeSpan.FromMinutes(2); + return kind switch + { + ClientKind.Router when value > routeMax => routeMax, + ClientKind.Gateway when value > gatewayMax => gatewayMax, + _ => value, + }; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 2bb31fc..edecabb 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1758,10 +1758,7 @@ public sealed partial class ClientConnection // features 477-496 and 487-503: see ClientConnection.SubscriptionsAndDelivery.cs - // features 512-514: processServiceImport, addSubToRouteTargets, processMsgResults - - // feature 515: checkLeafClientInfoHeader - // feature 520-522: processPingTimer, adjustPingInterval, watchForStaleConnection + // features 497-515 and 520: see ClientConnection.InboundAndHeaders.cs // feature 534-535: swapAccountAfterReload, processSubsOnConfigReload // feature 537: reconnect // feature 569: setFirstPingTimer diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs index 592898c..19e14f3 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs @@ -239,4 +239,48 @@ public sealed class ClientConnectionStubFeaturesTests c.PubAllowedFullCheck("_R_.x", fullCheck: true, hasLock: true).ShouldBeTrue(); c.PubAllowedFullCheck("_R_.x", fullCheck: true, hasLock: true).ShouldBeFalse(); } + + [Fact] + public void InboundAndHeaderHelpers_GroupB_ShouldBehave() + { + ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("_R_.A.B")).ShouldBeTrue(); + ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("$JS.ACK.A.B")).ShouldBeTrue(); + ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("$GNR.A.B")).ShouldBeTrue(); + ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("foo.bar")).ShouldBeFalse(); + + var c = new ClientConnection(ClientKind.Client) + { + ParseCtx = { Pa = { HeaderSize = 0 } }, + }; + + var before = DateTime.UtcNow; + c.ProcessInboundMsg(Encoding.ASCII.GetBytes("data")); + c.LastIn.ShouldBeGreaterThan(before - TimeSpan.FromMilliseconds(1)); + + c.Subs["sid"] = new Subscription { Sid = Encoding.ASCII.GetBytes("sid"), Subject = Encoding.ASCII.GetBytes("foo") }; + c.SubForReply(Encoding.ASCII.GetBytes("inbox")).ShouldNotBeNull(); + + var header = ClientConnection.GenHeader(null, "X-Test", "one"); + Encoding.ASCII.GetString(ClientConnection.GetHeader("X-Test", header)!).ShouldBe("one"); + ClientConnection.GetHeaderKeyIndex("X-Test", header).ShouldBeGreaterThan(0); + ClientConnection.SliceHeader("X-Test", header).ShouldNotBeNull(); + + var replaced = ClientConnection.SetHeaderStatic("X-Test", "two", header); + Encoding.ASCII.GetString(ClientConnection.GetHeader("X-Test", replaced)!).ShouldBe("two"); + ClientConnection.RemoveHeaderIfPresent(replaced, "X-Test").ShouldBeNull(); + + var prefixed = ClientConnection.GenHeader(header, "Nats-Expected-Last-Sequence", "10"); + ClientConnection.RemoveHeaderIfPrefixPresent(prefixed!, "Nats-Expected-").ShouldNotBeNull(); + + c.ParseCtx.Pa.HeaderSize = header.Length; + var merged = new byte[header.Length + 5]; + Buffer.BlockCopy(header, 0, merged, 0, header.Length); + Buffer.BlockCopy("hello"u8.ToArray(), 0, merged, header.Length, 5); + var next = c.SetHeaderInternal("X-Test", "three", merged); + Encoding.ASCII.GetString(next).ShouldContain("X-Test: three"); + + var result = new SubscriptionIndexResult(); + result.PSubs.Add(new Subscription { Subject = Encoding.ASCII.GetBytes("foo"), Sid = Encoding.ASCII.GetBytes("1") }); + c.ProcessMsgResults(null, result, "hello\r\n"u8.ToArray(), null, Encoding.ASCII.GetBytes("foo"), null, PmrFlags.None).didDeliver.ShouldBeTrue(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Protocol/ProtocolParserTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Protocol/ProtocolParserTests.cs index 70fb822..8d399e2 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Protocol/ProtocolParserTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Protocol/ProtocolParserTests.cs @@ -181,6 +181,20 @@ public class ProtocolParserTests Encoding.ASCII.GetString(c.ArgBuf!).ShouldBe("foo 1"); } + [Fact] + public void ClientConnection_InboundDispatchAndPingIntervalHelpers_ShouldBehave() + { + var c = new ClientConnection(ClientKind.Client); + var before = DateTime.UtcNow; + c.ProcessInboundMsg(Encoding.ASCII.GetBytes("hello")); + c.LastIn.ShouldBeGreaterThan(before - TimeSpan.FromMilliseconds(1)); + + ClientConnection.AdjustPingInterval(ClientKind.Router, TimeSpan.FromHours(1)) + .ShouldBeLessThan(TimeSpan.FromHours(1)); + ClientConnection.AdjustPingInterval(ClientKind.Gateway, TimeSpan.FromHours(1)) + .ShouldBeLessThan(TimeSpan.FromHours(1)); + } + // ===================================================================== // TestParsePub — Go test ID 2602 // ===================================================================== diff --git a/porting.db b/porting.db index a228dc61c15d0ac63189e544dc9d9acf597c2331..6278a21852447c74f9249bd352e9dd2cbf8af694 100644 GIT binary patch delta 3711 zcma);e^6A{702JbZ+~<57a%I^2Gl4>_^}HMEEpn?RxvF|6GLkP=q~%HSVhW4W|~@l z>FC61YsGR8T~F*oDVqT$m2? z`Qy9yp6@;PoO5?+Y68Bg9r)H5WgEw-%G=^LlD3OG^HNFe!H!h1BbB(?nSzTj)Tua* z@Wx2daFy&g5Ca^#N!E}yID3CXj#lgUBVlk*coBxbFg=B>>`abIn9NUXSzYIPg2vU&PwVDsuPl$LUl}1k5M&9>Jh41Nll`1OKJku zHc36yH4bu{B>4arosznbDp6A7sHEExKPu_AWK4bus*&%cynH96@||Iu^gLwVAx9gr zkGi==k!ftz6G=Tmbx%@LsBTH>F{;lc^$67!Nll{alhg#Nvyys<>V%{oaLF8ZOp^C; zu|ZPfsA?tUN97jx+$BDRqL&&tF>;g~CeS)YbfRyJloHDK!l<8Ij#ET(&bcs3QCTEK zP#MI}$H|Lf)YuA>Pl+!aedjl5P#MI|XE^5yqNJ^&x}82E;XFtE^fvtm{gn38ZhDfo zLU<=_fSPGj7X15Tnyr{AF2sF8qr~VB={HQxJlrH^3ATLW2G~kW3D9`UwOPEX@fqu+n;= zu3A*}(o-r*H841C$~2LOGe^%XR81$Wm)^Vu#s*lV_=gYJbrpm!3!e@7xSb`-a`3maDR8ynXItN68d%Z6 z7^HNvmGfJ~ZJq2Go!MgpQx{u1zXAnatOwGj^i~-B1&e^yr&+vgwe&O_30^O+1+V8l zGn;1M^5TbQSOS5T-{8ryrvXtsk*C+3KG@VmoA|YRFcoz;>R6fw7zEz)=t;I}VO^GcVMg=VyyFw>d%PC!A-4 z@Omk32_*F91+2$RFN59MeXt&@zM*A*GBo>@5zupi4Z~%dJ{kV@dwJ6j_prk~`olej zz8)jVThG4W?odXx$FvRFTCH36nQl-Y%@^M@cR4@Mbv2lcuO>q5*u^6fpX2TK3v=K2OW+WVmKq8T4NEEUhS%IuXqD9Aa zjCmN|&%sWN_AwK9?^uEpIos+xmSrS3WPySG_O2!P41xo@;-D8xm*2b*@`o&ukoT>{ z2lOP)g>u-EBx}Gc!746Pz>40Jp9E?5EWZl&DDd9=_bjpw9*G~ZAXelBBo>K7 zRw1jAHOPy|T4Ws(k0c=LkwoMrBne4IQjk={hS-rbBpq=e8Av9Qg*cIHBnR1myo}@` zuOJ(dJR~0}Kwd?@gM1g+glt9%kuAt;$oG)%BSlCtvK1*oet^7=yn$>(-b6}~A0j_O zwj*yLZzE+$IZ}b_Kz1T7#En#n@gtru2NNT>dd?VcDApff=fbXssdNr)rRIJ0U(}o` zRav33e%dAMv}WemooKP=c?&O@f>j4-4BR?S!vj{!Ls~i0 zWKKoCp9G!O%98Pt$Iz^G<%W*W$_~Uw2Qn%0?S4KUhVo`F6BROpEU4`gT*0dp`Kq5c z@RG@;5=dWgfnA~ECx9fnk^)r-okQk=9DW?i@%H#oD><|{IWQM#G zxjrEnuUycH`@tk^KetEGZ21fn`8=V{8(-o)N5+DDD?`Qm1BDM=v$R0XEVGSwhYE+b z&axP2f7P|r1lqG?c@+CxK_mHd@ZvuzGZVPTW5||IK(TLUC<7jvZh(M eo*nje3GTU@)`YaUcx`*oVv$SQf&&n^uPMQcx$H zbQGP5A6hwx*DP;X9cvgOr>UmfkYh78LnKyKDv0b<;stuxb&vALXJ)_qzUTIycek>h zt0Xa5wHQh(Tcxnkw7#7s zvJG^go#iJ;k~G3C$MRxdiO=u%6|+8dqNLenFE3kMD4xwMvs2Qqtex#OBF?h0jGdvH zN@po=V`DH&?2q5Z*{ zId@eUt$(uT*&0`c)MXxa8FkjIF4QTrI#FMkbqV#cSr<_U%({TuVbxtZrNSY*~8s64ZNN6j$nH`GM4&Y@Dx>OkFIm$Rtb>+_be_tJ=o78|`rL6ES z<*9P4qex0MJB4ZkY6|5C)GP}IblTagjx?GB>YfP2*&>yPCt8-#l4xxhz2Mcbv%T6V zYVTFg>Eg7cn^Tq!{5zO6!)UvvUS}+i&fHLQ>3oc)RA<~&K?4s&f)}FTE{KL07y?6K zn2~XFxTlj6PDZ4W|AIa;xPZo<3wjg_?WRi?_0YC<3|Y4Vt2(tRo2GW?QIvB@ucLFD z?ZMTLBwMGR8hWqUo%$APTreVyf_s^F>4~9%ab5bh(EAGhV0yarl;F0iPQI)|EX2VG zh=;o&0TN*(+yhCF3@I=QQsG`04QX&6q{A4vAI8Es$bbi6JWPOzkO^5Z2`0l7co4E7 z2d2U_m<|uY!!QG8!Xq#X9)-u?ahMIcFbC$sJeUs)AP=5^Ct)Ex1y92>@GRuRA}D~x numlRB2$mYjmy3UCv2i6=Qj-