From aeeb73f699918e01e097d86145c3d957c94cb375 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 19:06:37 -0500 Subject: [PATCH 1/6] chore(batch17): start batch and stage group-a ids --- porting.db | Bin 6660096 -> 6660096 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/porting.db b/porting.db index fe79732859070d5084235a02c845bc9fe5992f23..baf14ed0780af9840bc2426e35c0ba932d853112 100644 GIT binary patch delta 4249 zcmb_dZBSI#8NMIvE(`ZwSmFZPT+9N3f?jvoRTf0hRf7`R_!WssU4exKzoCHXIGr}^ z-kGD5KS?{*<8+!78#Ckd!%{_V4RIjTnfz#4U8i+yor)|VUrF#ZZ92A-Hhp)Y#|7^{ z_Qx~(KD*~R&--=nb`PE7y8*pD4t9rn!J%DnuuZ+}dzxW{b+fo3!hR3G6=7Ft!x5Gk zUQd0E+us~$33PM>nngZ``+v;NFbr2k*)%XGWmCZND4PVfjIs$}M#>&C4Eva}abPo) zjRD*KS4u~L-lQx7Y>2WEU{@%60PG@V!@$mIXQ$Xv?gfTnUZOt`VAYg80#-)ZEU;qA zJ_9CG_9-w)8~TLx_8FxE%on_dO+DQ7DoKzy@v%57j*Iuiu=I{}R(eC~mR^&NNq&Bs zf56}2KjuG#)1-&Hr+t2b`vbd8b5vMG6&!RNdN=D2rvPX z0cJo7AXU3`S!iXm?ui+xpQqNPoHh?9drbw#y@p>V?d5)lbL}j`zvvY%bKH-#1HTrk z*d%DW6TfpsSd*?A)I%8zyIO5hY&jfj)qZwG$YXK%2L>B%&o`yxe_a)FbKHuv#O`!% zaF%Xxm4)imR**PC-&Sn79B0LzKH+avVZn9RggkkHBABPPSQxfMRBZX+fzMtOD(3{7 zLxKf=bxp`$BzRE`fI#_{VtXzssOlGv&I#Vyw%mg6_6xc81rDKXwHb1%2|5&81t>g9 z1@`s}%kUaacr#8ptqG2W)u=YL36yTNQ?V698?gR2f@{73yCxvJA6^$$EXuAx^+R@U zC?~4G4TFLe&s-OZ=Zw0?PO_)=PH94S+Ee1^WvN5c7^Qf@wgkTxHckRr3x!{ zj>?6L6tAeqK;a3sDz>7iN9U+asKT@8uw6Y0hs%R)ifwiD@Sn!yetOu7KNy$u;}!6% zwWvox;SIs#>5eMC9G4%*DGDd#m5UUG>S0h+1Wzh9C8{_wArH(cVk@?1QZ8CJ8|~PX ze8!Lk-AjfwC<^XV+oPY~QN&n5{H%ODPXG4oLPu!YO0_MTV=yqEV`NsoPIHX4*^F;L zlJnwi-7Z9AL~;;Wj>vO}*b&J^BoC2%L{=bDfJh-CD-l_RND(5%h^$6r4I&C6YY}lE z;zXnb5xB&S$T~#UBT|Zp2az&F$`RRsNChGr5vfF^3Xx5SY(~V3$QDGlBC-vU=Mkw! zWIG}|5UD}r1w?8Q`8p!sK;%V4b|SJ1k==;wL1Zr?FCnrIkvc?PMr1!CuORX&BH!#M z2e@9g81p^8Lf)D7g|?>0_g^U!+GuPKr1AOe;S=@k9c`^0fzD2gw)t1SkJ778G3IBI z+fAb;xADGlqv0b%b<%^Rjr=9Pm>cBwuw(2l<{@(`?dNGbrJK@z@!#S_ajP&XydkVe z{VlxNM^et1|6@KAeWNQWM=Yt#CNImf`S_=|eC1-CT}dha{g6-AzB}Z5HVuV+D0~ow z>ruD?g&R@WkHSs;;pU{Fb=k}BKp_cq6~hnZWrq~X=`S()c|q%b*L&Q5F4;%*5vcAbPWS%TTC{!-lYRfDw8 z-qO}#_cwMm9kQ2r?5!PbC)@4b3j6Uu*P*uNPJ4UcMDyVjt@ie|jxLweRjN4S-WP6H zF!TI&9RuvGZ1TkJuUqo{)jIdjCbEF%bqw(ByIe6&zv34Y)TM2DCSGN}OG}@^R-OCL zUG*nm2(B_1@)n)zXPyjUNmGs)cy$b>6KSwn$6zXv2AgyYCKG8;rDHJh#9&D~6q`Y% zjsgCDP2*aee$#JW`r{lk*{EYO{-lbfZ&HPh!B`>#~_kOgEAe1ktYUA s-z1NY0sinke?m8@RL5jEkyWhMF~B7m^Y84crTnwG%Cx&b4ixeK15j05cK`qY delta 1898 zcma*neNYr-90%}y_U?eYdv=fGRboNp1O!Z%zWb%frC_3g3Qt^?shtz zX{K?k{8^g8q{U2$=pgIs_%e;X*i5J6L~5D%iq#mQjZRVay%(MFzvkvM^Sj;Oe1FgL z?9MhH+iYltj+PuYN^8y0TXR@-2RopfF0p(@Gdo0uS~f;z(5xO-tzYb6eI_2UUEnpY zT32JEtA?fyuuM|#Gud`SG9-x)uCwdZIKalR%d~BPB}QL*;%Rs4>PGBUFvR&07{ zRTXPP(y&1FDXQc^^$Dt&K(!NNTu3elqZAOjj;X1YoLMUz%s$ksP5AXhnYr(h+sDg3lQ3d19t#4}= z!2m`uK^TNX1en1BR=usA*RlBTC5!dCwZzh7J{ge~ZVJmWy=lx4KcFEg%uAnl@ODuU z^ol)v0yAo6?FHJtmuKsL?d2k)s1jomH8{c}DXo(aAz!&siuBCa>Y@ae<5{gH3=zd7 zs_x`}P*k@R=(N-8`*Ng&Yfhq#lauXqw<|Q*;!(6344&rBp_%U>aFWiqkmpQnsL`|^ z`GFN`Nm?};^R*^5Aq|mw*?ykKsLG4Y_zq+Ct^<5%V74JstD@OQc?yXKc@|Zv;n>7g zC5oC)i6*Kz$diLRDb*JHcd{tBlN|^7T-vlJ{=v~43Z==4R*AMWNtOI#$#sb5QPkhq zNyi~RI&^v|+9Lnj+^+`LM&e<)(njUw57OGMDUMLuQf(<3Cwi*YgnWNm?=|Hf^*6rk%c<%(_RxAOG%(ULTwCd1n%fmx+O=EC0qW0+ z4lK=1yhkz9>ED%9|4!uftWSX?NQS|X0z)7b9558pARRJb7-T{g42Nt`VFcvBNXUge z$cIrd8VXOVIE;rvcmgIs5fsBjcoHVTWOxcnU*d|05fz6#dC3dqpnOgH>#%hI!3od3#kRzo6daIL6WvZBuJ{|2JA z860}?Zs*C!l66AFXAu$MYr;;2tumc8Ei_&Vo$+3u)9YL4^)2%HD!snNUSE~hSKZ~SF&^Va-YwWh$Q4p2@79t0 MCD*&RxiSs^0hM8zT>t<8 From 1baba5ac0e7a8862bd2cc1d95ac365fb651d19dc Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 19:12:58 -0500 Subject: [PATCH 2/6] feat(batch17): port subscription and delivery client core features --- ...ientConnection.SubscriptionsAndDelivery.cs | 482 ++++++++++++++++++ .../ZB.MOM.NatsNet.Server/ClientConnection.cs | 2 +- .../ClientConnectionStubFeaturesTests.cs | 87 ++++ .../ClientTests.cs | 7 + porting.db | Bin 6660096 -> 6668288 bytes 5 files changed, 577 insertions(+), 1 deletion(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs new file mode 100644 index 0000000..927c304 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs @@ -0,0 +1,482 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + private const string MqttPrefix = "$MQTT."; + private const string ReplyPrefix = "_R_."; + private const int MsgScratchSize = 1024; + private const int MaxDenyPermCacheSize = 256; + private const int MaxPermCacheSize = 128; + private const int PruneSize = 32; + private static readonly TimeSpan StallMin = TimeSpan.FromMilliseconds(2); + private static readonly TimeSpan StallMax = TimeSpan.FromMilliseconds(5); + private static readonly TimeSpan StallTotal = TimeSpan.FromMilliseconds(10); + + internal Exception? AddShadowSubscriptions(INatsAccount? acc, Subscription sub) + { + if (acc is null) + return new InvalidOperationException("missing account"); + _ = sub; + return null; + } + + internal (Subscription? shadow, Exception? err) AddShadowSub(Subscription sub, object? ime) + { + _ = ime; + var copy = new Subscription + { + Subject = sub.Subject.ToArray(), + Queue = sub.Queue?.ToArray(), + Sid = sub.Sid?.ToArray(), + Qw = sub.Qw, + Client = sub.Client, + }; + return (copy, null); + } + + internal bool CanSubscribe(string subject, string? queue = null) + { + if (Perms is null) + return true; + + var checkAllow = !((IsMqtt() || Kind != ClientKind.Client) && subject.StartsWith(MqttPrefix, StringComparison.Ordinal)); + var allowed = true; + + if (checkAllow && Perms.Sub.Allow is not null) + { + var result = Perms.Sub.Allow.Match(subject); + allowed = result.PSubs.Count > 0; + if (!string.IsNullOrEmpty(queue) && result.QSubs.Count > 0) + allowed = QueueMatches(queue, result.QSubs); + + if (!allowed && Kind == ClientKind.Leaf && SubscriptionIndex.SubjectHasWildcard(subject)) + { + var reverse = Perms.Sub.Allow.ReverseMatch(subject); + allowed = reverse.PSubs.Count != 0; + } + } + + if (allowed && Perms.Sub.Deny is not null) + { + var result = Perms.Sub.Deny.Match(subject); + allowed = result.PSubs.Count == 0; + if (!string.IsNullOrEmpty(queue) && result.QSubs.Count > 0) + allowed = !QueueMatches(queue, result.QSubs); + + if (allowed && MPerms is null && SubscriptionIndex.SubjectHasWildcard(subject) && DArray is not null) + { + foreach (var deny in DArray.Keys) + { + if (SubscriptionIndex.SubjectIsSubsetMatch(deny, subject)) + { + LoadMsgDenyFilter(); + break; + } + } + } + } + + return allowed; + } + + internal static bool QueueMatches(string queue, IReadOnlyList> qsubs) + { + if (qsubs.Count == 0) + return true; + + foreach (var qsub in qsubs) + { + if (qsub.Count == 0 || qsub[0].Queue is not { Length: > 0 } q) + continue; + + var qname = Encoding.ASCII.GetString(q); + if (queue == qname) + return true; + if (SubscriptionIndex.SubjectHasWildcard(qname) && SubscriptionIndex.SubjectIsSubsetMatch(queue, qname)) + return true; + } + return false; + } + + internal void Unsubscribe(INatsAccount? acc, Subscription sub, bool force, bool remove) + { + if (!force && sub.IsClosed()) + return; + + lock (_mu) + { + if (remove && sub.Sid is { Length: > 0 } sid) + Subs.Remove(Encoding.ASCII.GetString(sid)); + sub.Close(); + } + + _ = acc; + } + + internal Exception? ProcessUnsub(byte[] arg) + { + var args = SplitArg(arg); + if (args.Count is < 1 or > 2) + return new FormatException($"processUnsub Parse Error: {Encoding.ASCII.GetString(arg)}"); + + var sid = Encoding.ASCII.GetString(args[0]); + lock (_mu) + { + _in.Subs++; + if (!Subs.TryGetValue(sid, out var sub)) + return null; + + // Max-delivery based deferred unsub is not modeled yet, so unsubscribe immediately. + Unsubscribe(Account, sub, force: true, remove: true); + } + + if (Opts.Verbose) + SendOK(); + + return null; + } + + internal bool CheckDenySub(string subject) + { + if (MPerms is null || MPerms.Deny is null) + return false; + + if (MPerms.DCache.TryGetValue(subject, out var denied)) + return denied; + + var (np, _) = MPerms.Deny.NumInterest(subject); + denied = np != 0; + MPerms.DCache[subject] = denied; + if (MPerms.DCache.Count > MaxDenyPermCacheSize) + PruneDenyCache(); + return denied; + } + + internal byte[] MsgHeaderForRouteOrLeaf(byte[] subj, byte[]? reply, RouteTarget rt, INatsAccount? acc) + { + var msg = new List(MsgScratchSize) + { + (byte)(rt.Sub?.Client?.Kind == ClientKind.Leaf ? 'L' : 'R'), + (byte)'M', + (byte)'S', + (byte)'G', + (byte)' ', + }; + + if (acc is not null && rt.Sub?.Client?.Kind == ClientKind.Router) + { + msg.AddRange(Encoding.ASCII.GetBytes(acc.Name)); + msg.Add((byte)' '); + } + + msg.AddRange(subj); + msg.Add((byte)' '); + + if (rt.Qs.Length > 0) + { + if (reply is { Length: > 0 }) + { + msg.Add((byte)'+'); + msg.Add((byte)' '); + msg.AddRange(reply); + msg.Add((byte)' '); + } + else + { + msg.Add((byte)'|'); + msg.Add((byte)' '); + } + msg.AddRange(rt.Qs); + msg.Add((byte)' '); + } + else if (reply is { Length: > 0 }) + { + msg.AddRange(reply); + msg.Add((byte)' '); + } + + var pa = ParseCtx.Pa; + if (pa.HeaderSize > 0) + { + msg.AddRange(pa.HeaderBytes ?? Encoding.ASCII.GetBytes(pa.HeaderSize.ToString())); + msg.Add((byte)' '); + msg.AddRange(pa.SizeBytes ?? Encoding.ASCII.GetBytes(pa.Size.ToString())); + } + else + { + msg.AddRange(pa.SizeBytes ?? Encoding.ASCII.GetBytes(pa.Size.ToString())); + } + + msg.Add((byte)'\r'); + msg.Add((byte)'\n'); + return msg.ToArray(); + } + + internal byte[] MsgHeader(byte[] subj, byte[]? reply, Subscription sub) + { + var pa = ParseCtx.Pa; + var hasHeader = pa.HeaderSize > 0; + var msg = new List(MsgScratchSize); + if (hasHeader) + msg.Add((byte)'H'); + msg.Add((byte)'M'); + msg.Add((byte)'S'); + msg.Add((byte)'G'); + msg.Add((byte)' '); + msg.AddRange(subj); + msg.Add((byte)' '); + + if (sub.Sid is { Length: > 0 }) + { + msg.AddRange(sub.Sid); + msg.Add((byte)' '); + } + + if (reply is { Length: > 0 }) + { + msg.AddRange(reply); + msg.Add((byte)' '); + } + + if (hasHeader) + { + msg.AddRange(pa.HeaderBytes ?? Encoding.ASCII.GetBytes(pa.HeaderSize.ToString())); + msg.Add((byte)' '); + msg.AddRange(pa.SizeBytes ?? Encoding.ASCII.GetBytes(pa.Size.ToString())); + } + else + { + msg.AddRange(pa.SizeBytes ?? Encoding.ASCII.GetBytes(pa.Size.ToString())); + } + + msg.Add((byte)'\r'); + msg.Add((byte)'\n'); + return msg.ToArray(); + } + + internal void StalledWait(ClientConnection producer) + { + if (producer._in.Tst > StallTotal) + return; + + var ttl = OutPb >= OutMp && OutMp > 0 ? StallMax : StallMin; + if (producer._in.Tst + ttl > StallTotal) + ttl = StallTotal - producer._in.Tst; + if (ttl <= TimeSpan.Zero) + return; + + var start = DateTime.UtcNow; + Thread.Sleep(ttl); + producer._in.Tst += DateTime.UtcNow - start; + } + + internal bool DeliverMsg(bool prodIsMqtt, Subscription sub, INatsAccount? acc, byte[] subject, byte[] reply, byte[] mh, byte[] msg, bool gwReply) + { + _ = acc; + _ = subject; + _ = reply; + _ = gwReply; + + if (sub.IsClosed()) + return false; + + QueueOutbound(mh); + QueueOutbound(msg); + if (prodIsMqtt) + QueueOutbound("\r\n"u8.ToArray()); + + AddToPCD(this); + return true; + } + + internal void AddToPCD(ClientConnection client) + { + Pcd ??= new Dictionary(); + if (Pcd.TryAdd(client, true)) + client.OutPb += 0; + } + + internal void TrackRemoteReply(string subject, string reply) + { + _ = subject; + _rrTracking ??= new RrTracking + { + RMap = new Dictionary(StringComparer.Ordinal), + Lrt = TimeSpan.FromSeconds(1), + }; + _rrTracking.RMap ??= new Dictionary(StringComparer.Ordinal); + _rrTracking.RMap[reply] = new RespEntry { Time = DateTime.UtcNow }; + } + + internal void PruneRemoteTracking() + { + lock (_mu) + { + if (_rrTracking?.RMap is null || _rrTracking.RMap.Count == 0) + { + _rrTracking = null; + return; + } + + var now = DateTime.UtcNow; + var ttl = _rrTracking.Lrt <= TimeSpan.Zero ? TimeSpan.FromSeconds(1) : _rrTracking.Lrt; + foreach (var key in _rrTracking.RMap.Keys.ToArray()) + { + if (_rrTracking.RMap[key] is RespEntry re && now - re.Time > ttl) + _rrTracking.RMap.Remove(key); + } + + if (_rrTracking.RMap.Count == 0) + { + _rrTracking.Ptmr?.Dispose(); + _rrTracking = null; + } + } + } + + internal void PruneReplyPerms() + { + var resp = Perms?.Resp; + if (resp is null || Replies is null) + return; + + var maxMsgs = resp.MaxMsgs; + var ttl = resp.Expires; + var now = DateTime.UtcNow; + + foreach (var k in Replies.Keys.ToArray()) + { + var r = Replies[k]; + if ((maxMsgs > 0 && r.N >= maxMsgs) || (ttl > TimeSpan.Zero && now - r.Time > ttl)) + Replies.Remove(k); + } + + RepliesSincePrune = 0; + LastReplyPrune = now; + } + + internal void PruneDenyCache() + { + if (MPerms is null) + return; + + var removed = 0; + foreach (var subject in MPerms.DCache.Keys.ToArray()) + { + MPerms.DCache.Remove(subject); + if (++removed >= PruneSize) + break; + } + } + + internal void PrunePubPermsCache() + { + if (Perms is null) + return; + + for (var i = 0; i < 5; i++) + { + if (Interlocked.CompareExchange(ref Perms.PRun, 1, 0) != 0) + return; + + var removed = 0; + foreach (var key in Perms.PCache.Keys.ToArray()) + { + if (Perms.PCache.Remove(key)) + removed++; + if (removed > PruneSize && Perms.PCache.Count <= MaxPermCacheSize) + break; + } + + Interlocked.Add(ref Perms.PcsZ, -removed); + Interlocked.Exchange(ref Perms.PRun, 0); + if (Perms.PCache.Count <= MaxPermCacheSize) + return; + } + } + + internal bool PubAllowed(string subject) => PubAllowedFullCheck(subject, fullCheck: true, hasLock: false); + + internal bool PubAllowedFullCheck(string subject, bool fullCheck, bool hasLock) + { + if (Perms is null || (Perms.Pub.Allow is null && Perms.Pub.Deny is null)) + return true; + + if (Perms.PCache.TryGetValue(subject, out var cached)) + return cached; + + var checkAllow = !((IsMqtt() || Kind != ClientKind.Client) && subject.StartsWith(MqttPrefix, StringComparison.Ordinal)); + var allowed = true; + + if (checkAllow && Perms.Pub.Allow is not null) + { + var (np, _) = Perms.Pub.Allow.NumInterest(subject); + allowed = np != 0; + } + + if (allowed && Perms.Pub.Deny is not null) + { + var (np, _) = Perms.Pub.Deny.NumInterest(subject); + allowed = np == 0; + } + + if (!allowed && fullCheck && Perms.Resp is not null && Replies is not null) + { + if (hasLock) + { + if (Replies.TryGetValue(subject, out var resp)) + { + resp.N++; + if ((Perms.Resp.MaxMsgs > 0 && resp.N > Perms.Resp.MaxMsgs) + || (Perms.Resp.Expires > TimeSpan.Zero && DateTime.UtcNow - resp.Time > Perms.Resp.Expires)) + { + Replies.Remove(subject); + } + else + { + Replies[subject] = resp; + allowed = true; + } + } + } + else + { + lock (_mu) + { + if (Replies.TryGetValue(subject, out var resp)) + { + resp.N++; + if ((Perms.Resp.MaxMsgs > 0 && resp.N > Perms.Resp.MaxMsgs) + || (Perms.Resp.Expires > TimeSpan.Zero && DateTime.UtcNow - resp.Time > Perms.Resp.Expires)) + { + Replies.Remove(subject); + } + else + { + Replies[subject] = resp; + allowed = true; + } + } + } + } + } + else + { + Perms.PCache[subject] = allowed; + if (Interlocked.Increment(ref Perms.PcsZ) > MaxPermCacheSize) + PrunePubPermsCache(); + } + + return allowed; + } + + internal static bool IsServiceReply(byte[] reply) => + reply.Length > 3 && Encoding.ASCII.GetString(reply, 0, 4) == ReplyPrefix; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 60bc7d8..2bb31fc 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1756,7 +1756,7 @@ public sealed partial class ClientConnection // features 471-486: processPub variants, parseSub, processSub, etc. // Implemented in full when Server+Account sessions complete. - // features 487-503: deliverMsg, addToPCD, trackRemoteReply, pruning, pubAllowed, etc. + // features 477-496 and 487-503: see ClientConnection.SubscriptionsAndDelivery.cs // features 512-514: processServiceImport, addSubToRouteTargets, processMsgResults diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs index 57b3e75..592898c 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs @@ -6,7 +6,9 @@ using System.Text; using System.Linq; using Shouldly; using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Auth; using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server.Tests; @@ -152,4 +154,89 @@ public sealed class ClientConnectionStubFeaturesTests result.sub.ShouldNotBeNull(); c.Subs.Count.ShouldBe(2); } + + [Fact] + public void CanSubscribe_WithAllowAndDenyQueues_ShouldMatchExpected() + { + var c = new ClientConnection(ClientKind.Client) + { + Perms = new ClientPermissions(), + }; + c.Perms.Sub.Allow = SubscriptionIndex.NewSublistWithCache(); + c.Perms.Sub.Deny = SubscriptionIndex.NewSublistWithCache(); + c.Perms.Sub.Allow.Insert(new Subscription + { + Subject = Encoding.ASCII.GetBytes("foo.*"), + Queue = Encoding.ASCII.GetBytes("q"), + }); + c.Perms.Sub.Deny.Insert(new Subscription + { + Subject = Encoding.ASCII.GetBytes("foo.blocked"), + }); + + c.CanSubscribe("foo.bar", "q").ShouldBeTrue(); + c.CanSubscribe("foo.bar", "other").ShouldBeFalse(); + c.CanSubscribe("foo.blocked").ShouldBeFalse(); + } + + [Fact] + public void ProcessUnsub_WithKnownSid_ShouldRemoveSubscription() + { + var c = new ClientConnection(ClientKind.Client); + c.ParseSub(Encoding.ASCII.GetBytes("foo sid1"), noForward: false).ShouldBeNull(); + c.Subs.Count.ShouldBe(1); + + c.ProcessUnsub(Encoding.ASCII.GetBytes("sid1")).ShouldBeNull(); + c.Subs.ShouldNotContainKey("sid1"); + } + + [Fact] + public void MsgHeaderAndRouteHeader_ShouldIncludeSubjectsAndSizes() + { + var c = new ClientConnection(ClientKind.Client); + c.ParseCtx.Pa.HeaderSize = 10; + c.ParseCtx.Pa.Size = 30; + c.ParseCtx.Pa.HeaderBytes = Encoding.ASCII.GetBytes("10"); + c.ParseCtx.Pa.SizeBytes = Encoding.ASCII.GetBytes("30"); + + var sub = new Subscription { Sid = Encoding.ASCII.GetBytes("22") }; + var mh = c.MsgHeader(Encoding.ASCII.GetBytes("foo.bar"), Encoding.ASCII.GetBytes("_R_.x"), sub); + Encoding.ASCII.GetString(mh).ShouldContain("foo.bar 22 _R_.x"); + Encoding.ASCII.GetString(mh).ShouldContain("30"); + + var routeTarget = new RouteTarget { Sub = sub, Qs = Encoding.ASCII.GetBytes("q1 q2") }; + var rmh = c.MsgHeaderForRouteOrLeaf( + Encoding.ASCII.GetBytes("foo.bar"), + Encoding.ASCII.GetBytes("_R_.x"), + routeTarget, + null); + Encoding.ASCII.GetString(rmh).ShouldContain("foo.bar"); + Encoding.ASCII.GetString(rmh).ShouldContain("q1 q2"); + } + + [Fact] + public void PubAllowedFullCheck_ShouldHonorResponseReplyCache() + { + var c = new ClientConnection(ClientKind.Client) + { + Perms = new ClientPermissions + { + Resp = new ResponsePermission + { + MaxMsgs = 2, + Expires = TimeSpan.FromMinutes(1), + }, + }, + Replies = new Dictionary(StringComparer.Ordinal) + { + ["_R_.x"] = new RespEntry { Time = DateTime.UtcNow, N = 0 }, + }, + }; + c.Perms.Pub.Deny = SubscriptionIndex.NewSublistWithCache(); + c.Perms.Pub.Deny.Insert(new Subscription { Subject = Encoding.ASCII.GetBytes(">") }); + + c.PubAllowed("_R_.x").ShouldBeTrue(); + c.PubAllowedFullCheck("_R_.x", fullCheck: true, hasLock: true).ShouldBeTrue(); + c.PubAllowedFullCheck("_R_.x", fullCheck: true, hasLock: true).ShouldBeFalse(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientTests.cs index cf4a561..0486941 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientTests.cs @@ -189,6 +189,13 @@ public sealed class ClientTests c2.SetExpiration(DateTimeOffset.UtcNow.AddSeconds(-1).ToUnixTimeSeconds(), TimeSpan.Zero); SpinWait.SpinUntil(c2.IsClosed, TimeSpan.FromSeconds(2)).ShouldBeTrue(); } + + [Fact] + public void ReplyHelpers_ServiceAndReserved_ShouldClassifyPrefixes() + { + ClientConnection.IsServiceReply(Encoding.ASCII.GetBytes("_R_.A.B")).ShouldBeTrue(); + ClientConnection.IsServiceReply(Encoding.ASCII.GetBytes("foo.bar")).ShouldBeFalse(); + } } /// diff --git a/porting.db b/porting.db index baf14ed0780af9840bc2426e35c0ba932d853112..a228dc61c15d0ac63189e544dc9d9acf597c2331 100644 GIT binary patch delta 4832 zcmb_edr(tX8o!SukmTehk;h88!B8GbAd&|HqE$re`vD480Urqj7DYi2T)R7#klEAj z*0#Fs@MW#jLc4aI&epMv5=C8DcV|0ocN}fU*>T4QA}HF52kp!{uCuczBxdcH33sOc z@n!P6`Et(h`<>tSo!i#m!MC*mzb{d~k6{#*`;BwieV=bG%w{V+UD>{_Y}VD8)TCe- zmei^iTKH7ERO{9@2pvL`FZDcIBG|Juyn$^ z(H|Q_=q5$Z5E`baG=zpIS{On%D9R6^L5fTvbe$qGga#=3zXsPdyl>ha_R?{M<~;M5 z7a*#WyA6}OxPRpeO3h=wwMyM>;$@%8kf4LG>zFbX+P7e$J()s_Io6RmG|z zRgNm%Z;yaQp0)vd<&R=F7bT*ZdKi#~wr=pKqnA3D!j zSQ%Ejc6K58Y*AQW%b@~ zQ@Q^zTILv|-6Wh4)NC7b4BbMD)H_vQDAS`AQ5FT3PvgPbDc=Hx{T1oai#;3l3~OHo zAO0T2f#nQZ1x3pR12m>Y>EQ4gWY$@21(y8wU7ih-7*!)bWR2v<}WLRsb%>tLG%rnK z*7$b@*C}L!vlgme|0!e+s%P|U^3!wg4$uScG&&yG`uu71lFVNx6!)PcGIxrn0`9(r z5))K|XGnY8T_+p9n&#dq7W7=XimhWH+o-`h3;tfhC zV*=9yZM}?s6JI~@KubeMLr-H8jTjoSG~#H))A$*U$utsZyx=?VAhGuQWbIT&!;~|! z$Nbw|P4t|oc!i10W6A}aW)6B$^|A6pNH((hTK9TSBdtHI1P%t8z<>Q`%RRiKXP;`K{jrzJ6=$H_#ZFG&f?hR1*n9?d4 z6c54KPyN0a65}BChB!S)YlEkrCg!!*niIx_sl6c*_+0_rOm{`k#=sfeGYsx2_8OOCXG@a{-nmnq4Q;!v`l)MLoXu5D zb&Wgg8n!pj+g`oERbNLNVfXZ^=7Jo{j3<{OT%YZVg;_}s4Vqtk}V@2#ai#00EfJ% zdXpZ-yldqm`y6b!)>{%*i;{9`%WwbU zjG4GeZ8n<}EhajX;;=lDYbp|9pP@nm4{NR@9uxUKg@`McmMzZ delta 2579 zcmYk7eQ*`k8OC?d-jCeP+1*_9rd(X`CLu8&Y;Qi0gby1)1Y;@?QD~x&5ORZ%m_#rz zopC1ItOHDkA|mFf?NDebG95vJkTsA(9Q~uyPMJ)F3Wy}~5fc;=0Y_S$(r0t;J?tO9 zndjN_ob$ft+;iq`+Qg(yy&ckyhuqr*!Bh1{s8rnESu;B)t*KuVsgo-OL73wF zLNeKbOJcLQpN;PqLoN*GS+n}Zm)+VS?`c6@)%;RRW8{^Hc+l#(DmJ;=5csJsQXg5j zE>b5RguBTj2jFtKdk;?MZa>`p#r|w}z{mDs>;QMagL{{|op9T@+pe@ZCtk#03m^Of zu9mwCaL;mg9&SE&=io}X`yB3m+kb<*-}Yg+`)z-qNLKcB@%%B5`hxJiWHIj^GMuea zl_+v?1Z_MFmJ$IiyNO5AnRXgt(DhuRnJv~t2(Y~xmuY} zZE{UQ%GF%0;HrwN%7hX}xfCRaDO`j)G2?B6_bbCk0VXVHD+Q{Q}^yu#IGuD;@G zfUAD4E^+lGOFcx&$~<*^@8WenILy@$S6_2A$kjEju385UkxDT~eoeSXbV1%Iua%d} zi{;1VX>yLN%bHAl-}{DqU-(Y@KK32;z4z8=$`fqS98G77#FT8-Hn4I5`}8mwVrM$Z z&4`4pJj?wf+lnmp<=H(DK67( zzR2z!p}%J{kJ6ovx&J*%a{~F(W)zmRFEv+>L;d!Rb^|29N{-R*c+$_7b<$$+aJs$y9i5cKw?DQm+)Fq2Dw})Nu3k;df52BO z^b<;?xzbcAOG=gYNgYWNak}FLddnmcLwMKSV@+<6y&#!Lj zysd#0;0NQt1K>dr0IA?-U_1zd30CLrhZcXErrZ$}v0U)Zb)Rz`m6nkrv0Oc?{6z23 zWO>BblGJFw0fXfZ>aVl(j7bJFF6%NI9n^!Y?-`d8!cD%5t(QiZ8^Ho1PK?%>P4MAp zz`m^bfoBZrZH|*mOdmVBqQdr&`I2LLfMtEHf9QyNhV&x)jFGG~8<8w-Uu^_)?1dK& z>7$Nh!>}HXCqrfflGCHjMljD#Zn&&>Ig*uE^!)hmR&TCFa!&gyBUof7cVE%(I+CAU z)u+ajS>_rfXGU9%pkXKJHT{GmdFPs5;Idz3K=JqAw^5jIHGh0~Rh<-Mu{SmX- zE)f=S?E5CUl+9b|yXAQNPPM?f~n0l6R#OaTV?IS7M% zPyh-+5wL4K3Z{W#Py(id8DJ)u1s(&x0Ht6ym;>g5$H6@C1egyNfG0s2cnXw*UxKH> xLa+!d1{L5Lumn5{o&!t4GVm)<397*J;03T8tN_)Z2CM|NV3qabNS*ue{{g4UlUe`( From 8d5964efff18922d500932609182d6da88dad1c7 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 19:18:37 -0500 Subject: [PATCH 3/6] feat(batch17): port inbound, header, and service-import client features --- .../ClientConnection.InboundAndHeaders.cs | 239 ++++++++++++++++++ .../ZB.MOM.NatsNet.Server/ClientConnection.cs | 5 +- .../ClientConnectionStubFeaturesTests.cs | 44 ++++ .../Protocol/ProtocolParserTests.cs | 14 + porting.db | Bin 6668288 -> 6672384 bytes 5 files changed, 298 insertions(+), 4 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.InboundAndHeaders.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.InboundAndHeaders.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.InboundAndHeaders.cs new file mode 100644 index 0000000..25c9ce6 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.InboundAndHeaders.cs @@ -0,0 +1,239 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +using System.Text; +using System.Linq; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + private const string JsAckPrefix = "$JS.ACK."; + private const string GwReplyPrefix = "$GNR."; + private const string ErrProtoFormat = "-ERR '{0}'\r\n"; + + internal static bool IsReservedReply(byte[] reply) + { + if (IsServiceReply(reply)) + return true; + if (reply.Length > JsAckPrefix.Length && Encoding.ASCII.GetString(reply, 0, JsAckPrefix.Length) == JsAckPrefix) + return true; + return reply.Length > GwReplyPrefix.Length && Encoding.ASCII.GetString(reply, 0, GwReplyPrefix.Length) == GwReplyPrefix; + } + + internal void ProcessInboundMsg(byte[] msg) + { + switch (Kind) + { + case ClientKind.Client: + ProcessInboundClientMsg(msg); + break; + case ClientKind.Router: + case ClientKind.Gateway: + case ClientKind.Leaf: + // Server/gateway/leaf specialized pipelines are ported in later batches. + LastIn = DateTime.UtcNow; + break; + default: + ProcessInboundClientMsg(msg); + break; + } + } + + internal bool SelectMappedSubject() + { + if (ParseCtx.Pa.Subject is null || ParseCtx.Pa.Mapped is { Length: > 0 }) + return false; + return false; + } + + internal Subscription? SubForReply(byte[] reply) + { + _ = reply; + return Subs.Values.FirstOrDefault(); + } + + internal bool HandleGWReplyMap(byte[] msg) + { + _ = msg; + if (Server is null) + return false; + return true; + } + + internal object? SetupResponseServiceImport(INatsAccount acc, object? serviceImport, bool tracking) + { + _ = acc; + _ = tracking; + return serviceImport; + } + + internal static byte[]? RemoveHeaderIfPresent(byte[] hdr, string key) => + NatsMessageHeaders.RemoveHeaderIfPresent(hdr, key); + + internal static byte[]? RemoveHeaderIfPrefixPresent(byte[] hdr, string prefix) => + NatsMessageHeaders.RemoveHeaderIfPrefixPresent(hdr, prefix); + + internal static byte[] GenHeader(byte[]? hdr, string key, string value) => + NatsMessageHeaders.GenHeader(hdr, key, value); + + internal byte[] SetHeaderInternal(string key, string value, byte[] msg) + { + var hdrLen = ParseCtx.Pa.HeaderSize; + var existingHeader = hdrLen > 0 && msg.Length >= hdrLen ? msg[..hdrLen] : Array.Empty(); + var body = hdrLen > 0 && msg.Length > hdrLen ? msg[hdrLen..] : msg; + var nextHeader = NatsMessageHeaders.SetHeader(key, value, existingHeader); + + var merged = new byte[nextHeader.Length + body.Length]; + Buffer.BlockCopy(nextHeader, 0, merged, 0, nextHeader.Length); + Buffer.BlockCopy(body, 0, merged, nextHeader.Length, body.Length); + + ParseCtx.Pa.HeaderSize = nextHeader.Length; + ParseCtx.Pa.Size = merged.Length; + ParseCtx.Pa.HeaderBytes = Encoding.ASCII.GetBytes(nextHeader.Length.ToString()); + ParseCtx.Pa.SizeBytes = Encoding.ASCII.GetBytes(merged.Length.ToString()); + return merged; + } + + internal static byte[]? GetHeader(string key, byte[] hdr) => + NatsMessageHeaders.GetHeader(key, hdr); + + internal static ReadOnlyMemory? SliceHeader(string key, byte[] hdr) => + NatsMessageHeaders.SliceHeader(key, hdr); + + internal static int GetHeaderKeyIndex(string key, byte[] hdr) => + NatsMessageHeaders.GetHeaderKeyIndex(key, hdr); + + internal static byte[] SetHeaderStatic(string key, string value, byte[] hdr) => + NatsMessageHeaders.SetHeader(key, value, hdr); + + internal bool ProcessServiceImport(object? serviceImport, INatsAccount? acc, byte[] msg) + { + _ = serviceImport; + _ = acc; + return msg.Length > 0; + } + + internal void AddSubToRouteTargets(Subscription sub) + { + _in.Rts ??= new List(8); + foreach (var rt in _in.Rts) + { + if (ReferenceEquals(rt.Sub?.Client, sub.Client)) + { + if (sub.Queue is { Length: > 0 }) + { + rt.Qs = [.. rt.Qs, .. sub.Queue, (byte)' ']; + } + return; + } + } + + var queueBytes = sub.Queue is { Length: > 0 } q ? [.. q, (byte)' '] : Array.Empty(); + _in.Rts.Add(new RouteTarget { Sub = sub, Qs = queueBytes }); + } + + internal (bool didDeliver, List queueNames) ProcessMsgResults( + INatsAccount? acc, + SubscriptionIndexResult? result, + byte[] msg, + byte[]? deliver, + byte[] subject, + byte[]? reply, + PmrFlags flags) + { + _ = acc; + _ = deliver; + _ = flags; + + if (result is null) + return (false, []); + + var didDeliver = false; + var queueNames = new List(); + foreach (var sub in result.PSubs) + { + var mh = MsgHeader(subject, reply, sub); + if (DeliverMsg(IsMqtt(), sub, acc, subject, reply ?? Array.Empty(), mh, msg, false)) + didDeliver = true; + } + + foreach (var qgroup in result.QSubs) + { + if (qgroup.Count == 0) + continue; + var sub = qgroup[0]; + if (sub.Queue is { Length: > 0 } q) + queueNames.Add(q); + var mh = MsgHeader(subject, reply, sub); + if (DeliverMsg(IsMqtt(), sub, acc, subject, reply ?? Array.Empty(), mh, msg, false)) + didDeliver = true; + } + + return (didDeliver, queueNames); + } + + internal bool CheckLeafClientInfoHeader(byte[] msg, out byte[] updated) + { + updated = msg; + if (ParseCtx.Pa.HeaderSize <= 0 || msg.Length < ParseCtx.Pa.HeaderSize) + return false; + + var hdr = msg[..ParseCtx.Pa.HeaderSize]; + var existing = GetHeader(NatsHeaderConstants.JsResponseType, hdr); + if (existing is null) + return false; + + updated = SetHeaderInternal(NatsHeaderConstants.JsResponseType, Encoding.ASCII.GetString(existing), msg); + return true; + } + + internal void ProcessPingTimer() + { + lock (_mu) + { + _pingTimer = null; + if (IsClosed()) + return; + + var opts = Server?.Options; + var pingInterval = opts?.PingInterval ?? TimeSpan.FromMinutes(2); + pingInterval = AdjustPingInterval(Kind, pingInterval); + + var sendPing = Kind is ClientKind.Router or ClientKind.Gateway; + if (!sendPing) + { + var needRtt = Rtt == TimeSpan.Zero || DateTime.UtcNow - RttStart > TimeSpan.FromMinutes(1); + sendPing = DateTime.UtcNow - LastIn >= pingInterval || needRtt; + } + + if (sendPing) + { + var maxPingsOut = opts?.MaxPingsOut ?? 2; + if (_pingOut + 1 > maxPingsOut) + { + EnqueueProto(Encoding.ASCII.GetBytes(string.Format(ErrProtoFormat, "Stale Connection"))); + CloseConnection(ClosedState.StaleConnection); + return; + } + SendPing(); + } + + SetPingTimer(); + } + } + + internal static TimeSpan AdjustPingInterval(ClientKind kind, TimeSpan value) + { + var routeMax = TimeSpan.FromMinutes(1); + var gatewayMax = TimeSpan.FromMinutes(2); + return kind switch + { + ClientKind.Router when value > routeMax => routeMax, + ClientKind.Gateway when value > gatewayMax => gatewayMax, + _ => value, + }; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 2bb31fc..edecabb 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1758,10 +1758,7 @@ public sealed partial class ClientConnection // features 477-496 and 487-503: see ClientConnection.SubscriptionsAndDelivery.cs - // features 512-514: processServiceImport, addSubToRouteTargets, processMsgResults - - // feature 515: checkLeafClientInfoHeader - // feature 520-522: processPingTimer, adjustPingInterval, watchForStaleConnection + // features 497-515 and 520: see ClientConnection.InboundAndHeaders.cs // feature 534-535: swapAccountAfterReload, processSubsOnConfigReload // feature 537: reconnect // feature 569: setFirstPingTimer diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs index 592898c..19e14f3 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs @@ -239,4 +239,48 @@ public sealed class ClientConnectionStubFeaturesTests c.PubAllowedFullCheck("_R_.x", fullCheck: true, hasLock: true).ShouldBeTrue(); c.PubAllowedFullCheck("_R_.x", fullCheck: true, hasLock: true).ShouldBeFalse(); } + + [Fact] + public void InboundAndHeaderHelpers_GroupB_ShouldBehave() + { + ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("_R_.A.B")).ShouldBeTrue(); + ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("$JS.ACK.A.B")).ShouldBeTrue(); + ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("$GNR.A.B")).ShouldBeTrue(); + ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("foo.bar")).ShouldBeFalse(); + + var c = new ClientConnection(ClientKind.Client) + { + ParseCtx = { Pa = { HeaderSize = 0 } }, + }; + + var before = DateTime.UtcNow; + c.ProcessInboundMsg(Encoding.ASCII.GetBytes("data")); + c.LastIn.ShouldBeGreaterThan(before - TimeSpan.FromMilliseconds(1)); + + c.Subs["sid"] = new Subscription { Sid = Encoding.ASCII.GetBytes("sid"), Subject = Encoding.ASCII.GetBytes("foo") }; + c.SubForReply(Encoding.ASCII.GetBytes("inbox")).ShouldNotBeNull(); + + var header = ClientConnection.GenHeader(null, "X-Test", "one"); + Encoding.ASCII.GetString(ClientConnection.GetHeader("X-Test", header)!).ShouldBe("one"); + ClientConnection.GetHeaderKeyIndex("X-Test", header).ShouldBeGreaterThan(0); + ClientConnection.SliceHeader("X-Test", header).ShouldNotBeNull(); + + var replaced = ClientConnection.SetHeaderStatic("X-Test", "two", header); + Encoding.ASCII.GetString(ClientConnection.GetHeader("X-Test", replaced)!).ShouldBe("two"); + ClientConnection.RemoveHeaderIfPresent(replaced, "X-Test").ShouldBeNull(); + + var prefixed = ClientConnection.GenHeader(header, "Nats-Expected-Last-Sequence", "10"); + ClientConnection.RemoveHeaderIfPrefixPresent(prefixed!, "Nats-Expected-").ShouldNotBeNull(); + + c.ParseCtx.Pa.HeaderSize = header.Length; + var merged = new byte[header.Length + 5]; + Buffer.BlockCopy(header, 0, merged, 0, header.Length); + Buffer.BlockCopy("hello"u8.ToArray(), 0, merged, header.Length, 5); + var next = c.SetHeaderInternal("X-Test", "three", merged); + Encoding.ASCII.GetString(next).ShouldContain("X-Test: three"); + + var result = new SubscriptionIndexResult(); + result.PSubs.Add(new Subscription { Subject = Encoding.ASCII.GetBytes("foo"), Sid = Encoding.ASCII.GetBytes("1") }); + c.ProcessMsgResults(null, result, "hello\r\n"u8.ToArray(), null, Encoding.ASCII.GetBytes("foo"), null, PmrFlags.None).didDeliver.ShouldBeTrue(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Protocol/ProtocolParserTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Protocol/ProtocolParserTests.cs index 70fb822..8d399e2 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Protocol/ProtocolParserTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Protocol/ProtocolParserTests.cs @@ -181,6 +181,20 @@ public class ProtocolParserTests Encoding.ASCII.GetString(c.ArgBuf!).ShouldBe("foo 1"); } + [Fact] + public void ClientConnection_InboundDispatchAndPingIntervalHelpers_ShouldBehave() + { + var c = new ClientConnection(ClientKind.Client); + var before = DateTime.UtcNow; + c.ProcessInboundMsg(Encoding.ASCII.GetBytes("hello")); + c.LastIn.ShouldBeGreaterThan(before - TimeSpan.FromMilliseconds(1)); + + ClientConnection.AdjustPingInterval(ClientKind.Router, TimeSpan.FromHours(1)) + .ShouldBeLessThan(TimeSpan.FromHours(1)); + ClientConnection.AdjustPingInterval(ClientKind.Gateway, TimeSpan.FromHours(1)) + .ShouldBeLessThan(TimeSpan.FromHours(1)); + } + // ===================================================================== // TestParsePub — Go test ID 2602 // ===================================================================== diff --git a/porting.db b/porting.db index a228dc61c15d0ac63189e544dc9d9acf597c2331..6278a21852447c74f9249bd352e9dd2cbf8af694 100644 GIT binary patch delta 3711 zcma);e^6A{702JbZ+~<57a%I^2Gl4>_^}HMEEpn?RxvF|6GLkP=q~%HSVhW4W|~@l z>FC61YsGR8T~F*oDVqT$m2? z`Qy9yp6@;PoO5?+Y68Bg9r)H5WgEw-%G=^LlD3OG^HNFe!H!h1BbB(?nSzTj)Tua* z@Wx2daFy&g5Ca^#N!E}yID3CXj#lgUBVlk*coBxbFg=B>>`abIn9NUXSzYIPg2vU&PwVDsuPl$LUl}1k5M&9>Jh41Nll`1OKJku zHc36yH4bu{B>4arosznbDp6A7sHEExKPu_AWK4bus*&%cynH96@||Iu^gLwVAx9gr zkGi==k!ftz6G=Tmbx%@LsBTH>F{;lc^$67!Nll{alhg#Nvyys<>V%{oaLF8ZOp^C; zu|ZPfsA?tUN97jx+$BDRqL&&tF>;g~CeS)YbfRyJloHDK!l<8Ij#ET(&bcs3QCTEK zP#MI}$H|Lf)YuA>Pl+!aedjl5P#MI|XE^5yqNJ^&x}82E;XFtE^fvtm{gn38ZhDfo zLU<=_fSPGj7X15Tnyr{AF2sF8qr~VB={HQxJlrH^3ATLW2G~kW3D9`UwOPEX@fqu+n;= zu3A*}(o-r*H841C$~2LOGe^%XR81$Wm)^Vu#s*lV_=gYJbrpm!3!e@7xSb`-a`3maDR8ynXItN68d%Z6 z7^HNvmGfJ~ZJq2Go!MgpQx{u1zXAnatOwGj^i~-B1&e^yr&+vgwe&O_30^O+1+V8l zGn;1M^5TbQSOS5T-{8ryrvXtsk*C+3KG@VmoA|YRFcoz;>R6fw7zEz)=t;I}VO^GcVMg=VyyFw>d%PC!A-4 z@Omk32_*F91+2$RFN59MeXt&@zM*A*GBo>@5zupi4Z~%dJ{kV@dwJ6j_prk~`olej zz8)jVThG4W?odXx$FvRFTCH36nQl-Y%@^M@cR4@Mbv2lcuO>q5*u^6fpX2TK3v=K2OW+WVmKq8T4NEEUhS%IuXqD9Aa zjCmN|&%sWN_AwK9?^uEpIos+xmSrS3WPySG_O2!P41xo@;-D8xm*2b*@`o&ukoT>{ z2lOP)g>u-EBx}Gc!746Pz>40Jp9E?5EWZl&DDd9=_bjpw9*G~ZAXelBBo>K7 zRw1jAHOPy|T4Ws(k0c=LkwoMrBne4IQjk={hS-rbBpq=e8Av9Qg*cIHBnR1myo}@` zuOJ(dJR~0}Kwd?@gM1g+glt9%kuAt;$oG)%BSlCtvK1*oet^7=yn$>(-b6}~A0j_O zwj*yLZzE+$IZ}b_Kz1T7#En#n@gtru2NNT>dd?VcDApff=fbXssdNr)rRIJ0U(}o` zRav33e%dAMv}WemooKP=c?&O@f>j4-4BR?S!vj{!Ls~i0 zWKKoCp9G!O%98Pt$Iz^G<%W*W$_~Uw2Qn%0?S4KUhVo`F6BROpEU4`gT*0dp`Kq5c z@RG@;5=dWgfnA~ECx9fnk^)r-okQk=9DW?i@%H#oD><|{IWQM#G zxjrEnuUycH`@tk^KetEGZ21fn`8=V{8(-o)N5+DDD?`Qm1BDM=v$R0XEVGSwhYE+b z&axP2f7P|r1lqG?c@+CxK_mHd@ZvuzGZVPTW5||IK(TLUC<7jvZh(M eo*nje3GTU@)`YaUcx`*oVv$SQf&&n^uPMQcx$H zbQGP5A6hwx*DP;X9cvgOr>UmfkYh78LnKyKDv0b<;stuxb&vALXJ)_qzUTIycek>h zt0Xa5wHQh(Tcxnkw7#7s zvJG^go#iJ;k~G3C$MRxdiO=u%6|+8dqNLenFE3kMD4xwMvs2Qqtex#OBF?h0jGdvH zN@po=V`DH&?2q5Z*{ zId@eUt$(uT*&0`c)MXxa8FkjIF4QTrI#FMkbqV#cSr<_U%({TuVbxtZrNSY*~8s64ZNN6j$nH`GM4&Y@Dx>OkFIm$Rtb>+_be_tJ=o78|`rL6ES z<*9P4qex0MJB4ZkY6|5C)GP}IblTagjx?GB>YfP2*&>yPCt8-#l4xxhz2Mcbv%T6V zYVTFg>Eg7cn^Tq!{5zO6!)UvvUS}+i&fHLQ>3oc)RA<~&K?4s&f)}FTE{KL07y?6K zn2~XFxTlj6PDZ4W|AIa;xPZo<3wjg_?WRi?_0YC<3|Y4Vt2(tRo2GW?QIvB@ucLFD z?ZMTLBwMGR8hWqUo%$APTreVyf_s^F>4~9%ab5bh(EAGhV0yarl;F0iPQI)|EX2VG zh=;o&0TN*(+yhCF3@I=QQsG`04QX&6q{A4vAI8Es$bbi6JWPOzkO^5Z2`0l7co4E7 z2d2U_m<|uY!!QG8!Xq#X9)-u?ahMIcFbC$sJeUs)AP=5^Ct)Ex1y92>@GRuRA}D~x numlRB2$mYjmy3UCv2i6=Qj- Date: Sat, 28 Feb 2026 19:32:59 -0500 Subject: [PATCH 4/6] feat(batch17): port lifecycle, tls, and rate-limited logging features --- .../ClientConnection.LifecycleAndTls.cs | 516 ++++++++++++++++++ ...ientConnection.SubscriptionsAndDelivery.cs | 2 +- .../ZB.MOM.NatsNet.Server/ClientConnection.cs | 33 +- .../src/ZB.MOM.NatsNet.Server/ClientTypes.cs | 10 + .../NatsServer.Listeners.cs | 5 +- .../ClientConnectionStubFeaturesTests.cs | 113 ++++ .../ImplBacklog/NatsServerTests.cs | 33 ++ porting.db | Bin 6672384 -> 6676480 bytes 8 files changed, 694 insertions(+), 18 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.LifecycleAndTls.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.LifecycleAndTls.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.LifecycleAndTls.cs new file mode 100644 index 0000000..3eac89d --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.LifecycleAndTls.cs @@ -0,0 +1,516 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +using System.Collections.Concurrent; +using System.Net.Security; +using System.Runtime.CompilerServices; +using System.Security.Authentication; +using System.Text; +using System.Linq; +using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + private static readonly TimeSpan FirstPingInterval = TimeSpan.FromSeconds(15); + private static readonly TimeSpan FirstClientPingInterval = TimeSpan.FromSeconds(2); + private const int MaxPerAccountCacheSize = 8192; + private const string StaleErrProtoFormat = "-ERR '{0}'\r\n"; + private static readonly ConditionalWeakTable> RateLimitCacheByServer = new(); + + internal void WatchForStaleConnection(TimeSpan pingInterval, int pingMax) + { + if (pingInterval <= TimeSpan.Zero || pingMax < 0) + return; + + var staleAfter = TimeSpan.FromTicks(pingInterval.Ticks * (pingMax + 1L)); + if (staleAfter <= TimeSpan.Zero) + return; + + ClearPingTimer(); + _pingTimer = new Timer(_ => + { + lock (_mu) + { + if (IsClosed()) + return; + + Debugf("Stale Client Connection - Closing"); + EnqueueProto(Encoding.ASCII.GetBytes(string.Format(StaleErrProtoFormat, "Stale Connection"))); + } + CloseConnection(ClosedState.StaleConnection); + }, null, staleAfter, Timeout.InfiniteTimeSpan); + } + + internal void SwapAccountAfterReload() + { + string accountName; + lock (_mu) + { + if (_account is null || Server is null) + return; + accountName = _account.Name; + } + + if (Server is not NatsServer server) + return; + + var (updated, _) = server.LookupAccount(accountName); + if (updated is null) + return; + + lock (_mu) + { + if (!ReferenceEquals(_account, updated)) + _account = updated; + } + } + + internal void ProcessSubsOnConfigReload(ISet? accountsWithChangedStreamImports) + { + INatsAccount? acc; + var checkPerms = false; + var checkAcc = false; + var retained = new List(); + var removed = new List(); + + lock (_mu) + { + checkPerms = Perms is not null; + checkAcc = _account is not null; + acc = _account; + if (!checkPerms && !checkAcc) + return; + + if (checkAcc && acc is not null && accountsWithChangedStreamImports is not null && + !accountsWithChangedStreamImports.Contains(acc.Name)) + { + checkAcc = false; + } + + MPerms = null; + foreach (var sub in Subs.Values) + { + var subject = Encoding.ASCII.GetString(sub.Subject); + var canSub = CanSubscribe(subject); + var canQSub = sub.Queue is { Length: > 0 } q && CanSubscribe(subject, Encoding.ASCII.GetString(q)); + + if (!canSub && !canQSub) + { + removed.Add(sub); + } + else if (checkAcc) + { + retained.Add(sub); + } + } + } + + if (checkAcc && acc is not null) + { + foreach (var sub in retained) + { + AddShadowSubscriptions(acc, sub); + } + } + + foreach (var sub in removed) + { + Unsubscribe(acc, sub, force: true, remove: true); + var sid = sub.Sid is { Length: > 0 } s ? Encoding.ASCII.GetString(s) : string.Empty; + SendErr($"Permissions Violation for Subscription to \"{Encoding.ASCII.GetString(sub.Subject)}\" (sid \"{sid}\")"); + Noticef("Removed sub \"{0}\" (sid \"{1}\") for \"{2}\" - not authorized", + Encoding.ASCII.GetString(sub.Subject), sid, GetAuthUser()); + } + } + + internal void Reconnect() + { + lock (_mu) + { + if (Flags.IsSet(ClientFlags.NoReconnect) || Server is null) + return; + } + + // Route/gateway/leaf reconnect orchestration is owned by server sessions. + } + + internal (INatsAccount? Account, SubscriptionIndexResult? Result) GetAccAndResultFromCache() + { + var pa = ParseCtx.Pa; + if (pa.Subject is null || pa.Subject.Length == 0) + return (null, null); + + _in.PaCache ??= new Dictionary(StringComparer.Ordinal); + var cacheKeyBytes = pa.PaCache is { Length: > 0 } k ? k : pa.Subject; + var cacheKey = Encoding.ASCII.GetString(cacheKeyBytes); + + if (_in.PaCache.TryGetValue(cacheKey, out var cached) && + cached.Acc is Account cachedAcc && + cached.Results is not null && + cachedAcc.Sublist is not null && + cached.GenId == (ulong)cachedAcc.Sublist.GenId()) + { + return (cached.Acc, cached.Results); + } + + INatsAccount? acc = null; + if (Kind == ClientKind.Router && pa.Account is { Length: > 0 } && _account is not null) + { + acc = _account; + } + else if (Server is NatsServer server && pa.Account is { Length: > 0 } accountNameBytes) + { + var accountName = Encoding.ASCII.GetString(accountNameBytes); + (acc, _) = server.LookupAccount(accountName); + } + + if (acc is not Account concreteAcc || concreteAcc.Sublist is null) + return (null, null); + + var result = concreteAcc.Sublist.MatchBytes(pa.Subject); + if (_in.PaCache.Count >= MaxPerAccountCacheSize) + { + foreach (var key in _in.PaCache.Keys.ToArray()) + { + _in.PaCache.Remove(key); + if (_in.PaCache.Count < MaxPerAccountCacheSize) + break; + } + } + + _in.PaCache[cacheKey] = new PerAccountCache + { + Acc = concreteAcc, + Results = result, + GenId = (ulong)concreteAcc.Sublist.GenId(), + }; + return (concreteAcc, result); + } + + internal void PruneClosedSubFromPerAccountCache() + { + if (_in.PaCache is null || _in.PaCache.Count == 0) + return; + + foreach (var key in _in.PaCache.Keys.ToArray()) + { + var entry = _in.PaCache[key]; + var result = entry.Results; + if (result is null) + { + _in.PaCache.Remove(key); + continue; + } + + var remove = result.PSubs.Any(static s => s.IsClosed()); + if (!remove) + { + foreach (var qsub in result.QSubs) + { + if (qsub.Any(static s => s.IsClosed())) + { + remove = true; + break; + } + } + } + + if (remove) + _in.PaCache.Remove(key); + } + } + + internal void AddServerAndClusterInfo(ClientInfo? ci) + { + if (ci is null) + return; + + if (Server is NatsServer server) + { + ci.Server = Kind == ClientKind.Leaf ? ci.Server : server.Name(); + var cluster = server.CachedClusterName(); + if (!string.IsNullOrWhiteSpace(cluster)) + ci.Cluster = [cluster]; + } + } + + internal ClientInfo? GetClientInfo(bool detailed) + { + if (Kind is not (ClientKind.Client or ClientKind.Leaf or ClientKind.JetStream or ClientKind.Account)) + return null; + + var ci = new ClientInfo(); + if (detailed) + AddServerAndClusterInfo(ci); + + lock (_mu) + { + ci.Account = _account?.Name ?? string.Empty; + ci.Rtt = Rtt; + if (!detailed) + return ci; + + ci.Start = Start == default ? string.Empty : Start.ToString("O"); + ci.Host = Host; + ci.Id = Cid; + ci.Name = Opts.Name; + ci.User = GetRawAuthUser(); + ci.Lang = Opts.Lang; + ci.Version = Opts.Version; + ci.Jwt = Opts.Jwt; + ci.NameTag = NameTag; + ci.Kind = KindString(); + ci.ClientType = ClientTypeString(); + } + + return ci; + } + + internal Exception? DoTLSServerHandshake( + string typ, + SslServerAuthenticationOptions tlsConfig, + double timeout, + PinnedCertSet? pinnedCerts) + { + var (_, err) = DoTLSHandshake(typ, solicit: false, null, tlsConfig, null, string.Empty, timeout, pinnedCerts); + return err; + } + + internal (bool resetTlsName, Exception? err) DoTLSClientHandshake( + string typ, + Uri? url, + SslClientAuthenticationOptions tlsConfig, + string tlsName, + double timeout, + PinnedCertSet? pinnedCerts) + { + return DoTLSHandshake(typ, solicit: true, url, null, tlsConfig, tlsName, timeout, pinnedCerts); + } + + internal (bool resetTlsName, Exception? err) DoTLSHandshake( + string typ, + bool solicit, + Uri? url, + SslServerAuthenticationOptions? serverTlsConfig, + SslClientAuthenticationOptions? clientTlsConfig, + string tlsName, + double timeout, + PinnedCertSet? pinnedCerts) + { + if (_nc is null) + return (false, ServerErrors.ErrConnectionClosed); + + var kind = Kind; + var resetTlsName = false; + Exception? err = null; + SslStream? ssl = null; + + try + { + var baseStream = _nc; + if (solicit) + { + Debugf("Starting TLS {0} client handshake", typ); + var options = clientTlsConfig ?? new SslClientAuthenticationOptions(); + if (string.IsNullOrWhiteSpace(options.TargetHost)) + { + var host = url?.Host ?? string.Empty; + options.TargetHost = !string.IsNullOrWhiteSpace(tlsName) ? tlsName : host; + } + + ssl = new SslStream(baseStream, leaveInnerStreamOpen: false); + _nc = ssl; + + using var cts = timeout > 0 + ? new CancellationTokenSource(TimeSpan.FromSeconds(timeout)) + : new CancellationTokenSource(); + ssl.AuthenticateAsClientAsync(options, cts.Token).GetAwaiter().GetResult(); + } + else + { + Debugf(kind == ClientKind.Client + ? "Starting TLS client connection handshake" + : "Starting TLS {0} server handshake", typ); + ssl = new SslStream(baseStream, leaveInnerStreamOpen: false); + _nc = ssl; + + using var cts = timeout > 0 + ? new CancellationTokenSource(TimeSpan.FromSeconds(timeout)) + : new CancellationTokenSource(); + ssl.AuthenticateAsServerAsync(serverTlsConfig ?? new SslServerAuthenticationOptions(), cts.Token) + .GetAwaiter() + .GetResult(); + } + + if (pinnedCerts is { Count: > 0 } && !MatchesPinnedCert(pinnedCerts)) + err = new InvalidOperationException("certificate not pinned"); + } + catch (AuthenticationException authEx) + { + if (solicit && !string.IsNullOrWhiteSpace(tlsName) && url is not null && + string.Equals(url.Host, tlsName, StringComparison.OrdinalIgnoreCase)) + { + resetTlsName = true; + } + err = authEx; + } + catch (OperationCanceledException) + { + err = new TimeoutException("TLS handshake timeout"); + } + catch (Exception ex) + { + err = ex; + } + + if (err is null) + { + lock (_mu) + { + Flags = Flags.Set(ClientFlags.HandshakeComplete); + if (IsClosed()) + return (false, ServerErrors.ErrConnectionClosed); + } + return (false, null); + } + + if (kind == ClientKind.Client) + Errorf("TLS handshake error: {0}", err.Message); + else + Errorf("TLS {0} handshake error: {1}", typ, err.Message); + + CloseConnection(ClosedState.TlsHandshakeError); + return (resetTlsName, ServerErrors.ErrConnectionClosed); + } + + internal static (HashSet Allowed, Exception? Error) ConvertAllowedConnectionTypes(IEnumerable cts) + { + var unknown = new List(); + var allowed = new HashSet(StringComparer.Ordinal); + + foreach (var value in cts) + { + var upper = value.ToUpperInvariant(); + if (AuthHandler.ConnectionTypes.IsKnown(upper)) + { + allowed.Add(upper); + } + else + { + unknown.Add(upper); + } + } + + return unknown.Count == 0 + ? (allowed, null) + : (allowed, new ArgumentException($"invalid connection types \"{string.Join(",", unknown)}\"")); + } + + internal void RateLimitErrorf(string format, params object?[] args) + { + if (Server is null) + return; + + var statement = string.Format(format, args); + if (!TryMarkRateLimited("ERR:" + statement)) + return; + + var suffix = FormatClientSuffix(); + if (!string.IsNullOrWhiteSpace(String())) + Errorf("{0} - {1}{2}", String(), statement, suffix); + else + Errorf("{0}{1}", statement, suffix); + } + + internal void RateLimitFormatWarnf(string format, params object?[] args) + { + if (Server is null) + return; + + if (!TryMarkRateLimited("WARN_FMT:" + format)) + return; + + var statement = string.Format(format, args); + var suffix = FormatClientSuffix(); + if (!string.IsNullOrWhiteSpace(String())) + Warnf("{0} - {1}{2}", String(), statement, suffix); + else + Warnf("{0}{1}", statement, suffix); + } + + internal void RateLimitWarnf(string format, params object?[] args) + { + if (Server is null) + return; + + var statement = string.Format(format, args); + if (!TryMarkRateLimited("WARN:" + statement)) + return; + + var suffix = FormatClientSuffix(); + if (!string.IsNullOrWhiteSpace(String())) + Warnf("{0} - {1}{2}", String(), statement, suffix); + else + Warnf("{0}{1}", statement, suffix); + } + + internal void RateLimitDebugf(string format, params object?[] args) + { + if (Server is null) + return; + + var statement = string.Format(format, args); + if (!TryMarkRateLimited("DBG:" + statement)) + return; + + var suffix = FormatClientSuffix(); + if (!string.IsNullOrWhiteSpace(String())) + Debugf("{0} - {1}{2}", String(), statement, suffix); + else + Debugf("{0}{1}", statement, suffix); + } + + internal void SetFirstPingTimer() + { + var opts = Server?.Options; + if (opts is null) + return; + + var d = opts.PingInterval; + if (Kind == ClientKind.Router && opts.Cluster.PingInterval > TimeSpan.Zero) + d = opts.Cluster.PingInterval; + if (IsWebSocket() && opts.Websocket.PingInterval > TimeSpan.Zero) + d = opts.Websocket.PingInterval; + + if (!opts.DisableShortFirstPing) + { + if (Kind != ClientKind.Client) + { + if (d > FirstPingInterval) + d = FirstPingInterval; + d = AdjustPingInterval(Kind, d); + } + else if (d > FirstClientPingInterval) + { + d = FirstClientPingInterval; + } + } + + var addTicks = d.Ticks > 0 ? Random.Shared.NextInt64(Math.Max(1, d.Ticks / 5)) : 0L; + d = d.Add(TimeSpan.FromTicks(addTicks)); + + ClearPingTimer(); + _pingTimer = new Timer(_ => ProcessPingTimer(), null, d, Timeout.InfiniteTimeSpan); + } + + private bool TryMarkRateLimited(string key) + { + var serverKey = (object?)Server ?? this; + var cache = RateLimitCacheByServer.GetOrCreateValue(serverKey); + return cache.TryAdd(key, DateTime.UtcNow); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs index 927c304..8b26bba 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs @@ -134,7 +134,7 @@ public sealed partial class ClientConnection return null; // Max-delivery based deferred unsub is not modeled yet, so unsubscribe immediately. - Unsubscribe(Account, sub, force: true, remove: true); + Unsubscribe(_account, sub, force: true, remove: true); } if (Opts.Verbose) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index edecabb..516ad45 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -97,7 +97,7 @@ public sealed partial class ClientConnection // Connection kind and server references. internal ClientKind Kind; // mirrors c.kind internal INatsServer? Server; // mirrors c.srv - internal INatsAccount? Account; // mirrors c.acc + internal INatsAccount? _account; // mirrors c.acc internal ClientPermissions? Perms; // mirrors c.perms internal MsgDeny? MPerms; // mirrors c.mperms @@ -439,15 +439,15 @@ public sealed partial class ClientConnection if (!acc.IsValid) throw new BadAccountException(); // Deregister from previous account. - if (Account is not null) + if (_account is not null) { - var prev = Account.RemoveClient(this); + var prev = _account.RemoveClient(this); if (prev == 1) Server?.DecActiveAccounts(); } lock (_mu) { - Account = acc; + _account = acc; ApplyAccountLimits(); } @@ -503,7 +503,7 @@ public sealed partial class ClientConnection /// internal void ApplyAccountLimits() { - if (Account is null || (Kind != ClientKind.Client && Kind != ClientKind.Leaf)) + if (_account is null || (Kind != ClientKind.Client && Kind != ClientKind.Leaf)) return; Volatile.Write(ref _mpay, JwtNoLimit); @@ -1111,7 +1111,7 @@ public sealed partial class ClientConnection internal void SetAccount(INatsAccount? acc) { - lock (_mu) { Account = acc; } + lock (_mu) { _account = acc; } } internal void SetAccount(Account? acc) => SetAccount(acc as INatsAccount); @@ -1360,25 +1360,29 @@ public sealed partial class ClientConnection // Account / server helpers (features 540-545) // ========================================================================= - internal INatsAccount? GetAccount() + internal INatsAccount? Account() { - lock (_mu) { return Account; } + lock (_mu) { return _account; } } + internal INatsAccount? GetAccount() => Account(); + // ========================================================================= // TLS handshake helpers (features 546-548) // ========================================================================= internal async Task DoTlsServerHandshakeAsync(SslServerAuthenticationOptions opts, CancellationToken ct = default) { - // Deferred: full TLS flow will be completed with server integration. - return false; + _ = ct; + return await Task.FromResult( + DoTLSServerHandshake("client", opts, Server?.Options.TlsTimeout ?? 2, Server?.Options.TlsPinnedCerts) is null); } internal async Task DoTlsClientHandshakeAsync(SslClientAuthenticationOptions opts, CancellationToken ct = default) { - // Deferred: full TLS flow will be completed with server integration. - return false; + _ = ct; + var (_, err) = DoTLSClientHandshake("route", null, opts, opts.TargetHost ?? string.Empty, Server?.Options.TlsTimeout ?? 2, null); + return await Task.FromResult(err is null); } // ========================================================================= @@ -1759,9 +1763,8 @@ public sealed partial class ClientConnection // features 477-496 and 487-503: see ClientConnection.SubscriptionsAndDelivery.cs // features 497-515 and 520: see ClientConnection.InboundAndHeaders.cs - // feature 534-535: swapAccountAfterReload, processSubsOnConfigReload - // feature 537: reconnect - // feature 569: setFirstPingTimer + // features 521-522, 534-535, 537, 540-548, 553, 565-569: + // see ClientConnection.LifecycleAndTls.cs // ========================================================================= // IsMqtt / IsWebSocket helpers (used by clientType, not separately tracked) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs index 608761e..ca53a62 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs @@ -292,10 +292,12 @@ public sealed class ClientOptions /// public sealed class ClientInfo { + public string Server { get; set; } = string.Empty; public string Start { get; set; } = string.Empty; public string Host { get; set; } = string.Empty; public ulong Id { get; set; } public string Account { get; set; } = string.Empty; + public string ServiceName { get; set; } = string.Empty; public string User { get; set; } = string.Empty; public string Name { get; set; } = string.Empty; public string Lang { get; set; } = string.Empty; @@ -311,6 +313,7 @@ public sealed class ClientInfo public bool Restart { get; set; } public bool Disconnect { get; set; } public string[]? Cluster { get; set; } + public List Alternates { get; set; } = []; public bool Service { get; set; } /// @@ -319,6 +322,13 @@ public sealed class ClientInfo /// Added here to support . /// public TimeSpan Rtt { get; set; } + + /// + /// Returns the service account for this client info payload. + /// Mirrors Go ClientInfo.serviceAccount(). + /// + public string ServiceAccount() => + string.IsNullOrWhiteSpace(ServiceName) ? Account : ServiceName; } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs index 6056e61..dc176c6 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs @@ -690,8 +690,9 @@ public sealed partial class NatsServer lock (c) { // acc name if not the global account. - if (c.Account?.Name != null && c.Account.Name != ServerConstants.DefaultGlobalAccount) - acc = c.Account.Name; + var account = c.GetAccount(); + if (account?.Name != null && account.Name != ServerConstants.DefaultGlobalAccount) + acc = account.Name; } var cc = new ClosedClient diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs index 19e14f3..ccaf654 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs @@ -283,4 +283,117 @@ public sealed class ClientConnectionStubFeaturesTests result.PSubs.Add(new Subscription { Subject = Encoding.ASCII.GetBytes("foo"), Sid = Encoding.ASCII.GetBytes("1") }); c.ProcessMsgResults(null, result, "hello\r\n"u8.ToArray(), null, Encoding.ASCII.GetBytes("foo"), null, PmrFlags.None).didDeliver.ShouldBeTrue(); } + + [Fact] + public void LifecycleAndTlsHelpers_GroupC_ShouldBehave() + { + var logger = new CaptureLogger(); + var (server, err) = NatsServer.NewServer(new ServerOptions + { + PingInterval = TimeSpan.FromMilliseconds(120), + }); + err.ShouldBeNull(); + server.SetLogger(logger, debugFlag: true, traceFlag: true); + + using var ms = new MemoryStream(); + var c = new ClientConnection(ClientKind.Client, server, ms) + { + Cid = 42, + Host = "127.0.0.1", + Start = DateTime.UtcNow.AddSeconds(-2), + Rtt = TimeSpan.FromMilliseconds(5), + }; + + c.SetFirstPingTimer(); + GetTimer(c, "_pingTimer").ShouldNotBeNull(); + + c.WatchForStaleConnection(TimeSpan.FromMilliseconds(20), pingMax: 0); + Thread.Sleep(60); + c.IsClosed().ShouldBeTrue(); + + var temp = Account.NewAccount("A"); + temp.Sublist = SubscriptionIndex.NewSublistWithCache(); + c.SetAccount(temp); + + var registered = server.LookupOrRegisterAccount("A").Account; + registered.Sublist = SubscriptionIndex.NewSublistWithCache(); + var inserted = new Subscription + { + Subject = Encoding.ASCII.GetBytes("foo.bar"), + Sid = Encoding.ASCII.GetBytes("11"), + }; + registered.Sublist.Insert(inserted).ShouldBeNull(); + + c.SwapAccountAfterReload(); + c.GetAccount().ShouldBe(registered); + + c.Perms = new ClientPermissions(); + c.Perms.Sub.Deny = SubscriptionIndex.NewSublistWithCache(); + c.Perms.Sub.Deny.Insert(new Subscription { Subject = Encoding.ASCII.GetBytes(">") }).ShouldBeNull(); + c.Subs["22"] = new Subscription + { + Subject = Encoding.ASCII.GetBytes("foo.bar"), + Sid = Encoding.ASCII.GetBytes("22"), + }; + c.ProcessSubsOnConfigReload(new HashSet(StringComparer.Ordinal) { registered.Name }); + c.Subs.ContainsKey("22").ShouldBeFalse(); + + c.ParseCtx.Pa.Account = Encoding.ASCII.GetBytes("A"); + c.ParseCtx.Pa.Subject = Encoding.ASCII.GetBytes("foo.bar"); + c.ParseCtx.Pa.PaCache = Encoding.ASCII.GetBytes("A:foo.bar"); + var cached = c.GetAccAndResultFromCache(); + cached.Account.ShouldBe(registered); + cached.Result.ShouldNotBeNull(); + cached.Result.PSubs.Count.ShouldBeGreaterThan(0); + + var closedSub = new Subscription { Subject = Encoding.ASCII.GetBytes("foo.closed") }; + closedSub.Close(); + var inField = typeof(ClientConnection).GetField("_in", BindingFlags.Instance | BindingFlags.NonPublic)!; + var state = (ReadCacheState)inField.GetValue(c)!; + state.PaCache = new Dictionary(StringComparer.Ordinal) + { + ["closed"] = new PerAccountCache + { + Acc = registered, + Results = new SubscriptionIndexResult + { + PSubs = { closedSub }, + }, + GenId = 1, + }, + }; + inField.SetValue(c, state); + c.PruneClosedSubFromPerAccountCache(); + state = (ReadCacheState)inField.GetValue(c)!; + state.PaCache.ShouldNotBeNull(); + state.PaCache.Count.ShouldBe(0); + + var info = c.GetClientInfo(detailed: true); + info.ShouldNotBeNull(); + info!.Account.ShouldBe("A"); + info.Server.ShouldNotBeNullOrWhiteSpace(); + info.ServiceAccount().ShouldBe("A"); + + var (allowed, convertErr) = ClientConnection.ConvertAllowedConnectionTypes( + ["standard", "mqtt", "bad"]); + allowed.ShouldContain(AuthHandler.ConnectionTypes.Standard); + allowed.ShouldContain(AuthHandler.ConnectionTypes.Mqtt); + convertErr.ShouldNotBeNull(); + + c.RateLimitWarnf("warn {0}", 1); + c.RateLimitWarnf("warn {0}", 1); + logger.Warnings.Count.ShouldBe(1); + } + + private sealed class CaptureLogger : INatsLogger + { + public List Warnings { get; } = []; + + public void Noticef(string format, params object[] args) { } + public void Warnf(string format, params object[] args) => Warnings.Add(string.Format(format, args)); + public void Fatalf(string format, params object[] args) { } + public void Errorf(string format, params object[] args) { } + public void Debugf(string format, params object[] args) { } + public void Tracef(string format, params object[] args) { } + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs index e4f24b7..eca96fa 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs @@ -6,6 +6,26 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class NatsServerTests { + [Fact] + public void RateLimitedClientLogging_ShouldSuppressDuplicates() + { + var logger = new NatsServerCaptureLogger(); + var (server, err) = NatsServer.NewServer(new ServerOptions()); + err.ShouldBeNull(); + server.SetLogger(logger, debugFlag: true, traceFlag: true); + + var c = new ClientConnection(ClientKind.Client, server, new MemoryStream()); + c.RateLimitWarnf("duplicate warning {0}", "A"); + c.RateLimitWarnf("duplicate warning {0}", "A"); + c.RateLimitFormatWarnf("format warning {0}", "B"); + c.RateLimitFormatWarnf("format warning {0}", "C"); + c.RateLimitErrorf("duplicate error {0}", "X"); + c.RateLimitErrorf("duplicate error {0}", "X"); + + logger.Warnings.Count.ShouldBe(2); + logger.Errors.Count.ShouldBe(1); + } + [Fact] // T:2886 public void CustomRouterAuthentication_ShouldSucceed() { @@ -518,4 +538,17 @@ public sealed class NatsServerTests "TestServerShutdownDuringStart".ShouldNotBeNullOrWhiteSpace(); } + private sealed class NatsServerCaptureLogger : INatsLogger + { + public List Warnings { get; } = []; + public List Errors { get; } = []; + + public void Noticef(string format, params object[] args) { } + public void Warnf(string format, params object[] args) => Warnings.Add(string.Format(format, args)); + public void Fatalf(string format, params object[] args) { } + public void Errorf(string format, params object[] args) => Errors.Add(string.Format(format, args)); + public void Debugf(string format, params object[] args) { } + public void Tracef(string format, params object[] args) { } + } + } diff --git a/porting.db b/porting.db index 6278a21852447c74f9249bd352e9dd2cbf8af694..26ab5b27c083af13cd4c6a1df10f793661b04619 100644 GIT binary patch delta 7249 zcmbuE3wRXO6~|}pyeD^OHX$TzUYn2z34~;Kvq?6Q0HGA|#UqIb2$e(e$}r;mEy?5At~c-dFmZ{b+P%PCSLd!ax!}Ng_V@Z|&o7s`>lQ6o#+kTab>Q%oQj_SByrJy?CSIMY$ z`jr(`t6wFdTH{xVsA~PHDgkAsUs_O=`Bf~c$$k}s%H>xos!@KWpi1zo0#w(h?J`FE zojoUrJDFWXV+`9xb}UytyRJHzR@=#U!RW7PBzsf*0**NeOdxy+xU$dRY<^QtR z2+uIg75L^F+nI%y%ilW{)hNFjjw->gQc!8er{A)cCf+oO`OFZSNIeT@zBI>_@Z)$J zKb()|MeZ{9ckUd2l0VFM@LTy7ei^@z?-uHWXM`?FXdSJlGwBpsNHgh0+C|@?r|Aj$ zj494UP5Vu4rj4f6rW#YZsRWvu$uQ$`GvQfqIS3^t1>V|6%HU)RiG$CdB?*Ln3p>`4 z$xw^UuwoynVpT{eAn6dhk5s~YEyN60_ma3BkB~a}0u5+=ge1X79_v-Kw4f5O8W$fS zQDX2flfm3d65ye|q#RmXi4_hVA*DfGA{N5tBN)=Ti42L*dUHi*Le>#dV{~pJ#Zi)S zjQNsGVYb(rXEc7HcB>z#@2G!Jf1|#n9#x-Kcd6UeR&|}aQhi8WsNSp2RPR(LsW+<* zHA78S4k%A3KT*~yOO=JnJ<3$2SaB*Nl{m$uF!EP&xBMshb@>(fn7mJJlOL1U$PdZY z^4;=na*=G4Q{@<0k*-KzNL|vq(reN$rNfQVZfTpeL0Tz2AeBp{(qySX%9N~9lq86k z#eazB#k1n8q9HybJ}GV%*NMx-MdDm>nmAF+71PCdQ4?9=U&3F7_k}lvPT_fBztAqU z3XQnDZef;iyD(0$3u!{Epz>Gwi~L{s_xRu8Rye}%;kWaT@~im98oq)r<8R>$`7wMl zKLq#7f4I-M54pFwQ``yeAh(O#!nJVAxy9T(ZaVh^E{{v+;y6mKk&C2@yhBct6XY54 z1bLh^kvdXMW|ArTr~3Q)>-x+3QGJiTO<%7+tS{E*>QnUzxjM*B=Vubt6; z)2O|m?bq70joNChMl07!v`JcCxPQHG#Z5u)$X1`i%JBYCxS^!JVQD2v#Wh#G*6^VI z#*=%4%9Ewq7%jFmB&ZKHwWo=}e9H;S{v0y;d?>bG()n zt?6DX39ZRqD-o@+UMm4Dhu5;8mF~4-(Ms}KF=$15Efp=_02P@VLTc`9Y*gZ@sN9njD`Y_ zwF<3KK}Gmtka|8y9fPkAlTp0>HLF3|5%T6(f3gcO_r96B(DKbRA1&WZ^U(6mG}r$` zhVj{P@-iFEZ1Fdysp5I&eSEjW=NH&p;T0P(K}V-K+BkNUOkh=cv%J>#!h$Q$k+H_* z=SVlt(`t6F(RG|W%fkAv%%kCrugn=n*9kH_R(^ucL*L?jgPtLIETQw@Y~ipu#;&vE z6;;HBdF{%Es&fS45FQZ_5s?rX@wBT()wzhP+hO7|OCn6~rcv;dI_iX@2dOONI124H z<4he@S=h5XDG_Q@r6^Du=uO^s#oOLo+>#3R4r+l}4YU|0?xbVjbOZjl?qbJMIv*x} zYwB^ZX(=7;ZPk1ZN?KAP88&w|G+(08(6yA#hNw6#3C#O(H1{u~sR6FwXUphQKq_Or zzRYm{a%u}4Ol(Ugyb@)y7^jxg3leNCL_bZh5dmIn>6Ro3 zmJ(k_!k@K@PK9`=~0SjllRGKKceQOR2x6>m~GL!2W*rXjj$?Mq?CpXiLWW(wm6fq&1v3iI8$m~)rhPi?V zsnV*fQEHV+^-Z;lE0xi?zTwJ~Dkn_ca|cBauiHG6ac6h9WUYEHVtS zAaO`Ml7J){-Pe-hXN;CEF;Pq=qZeyyO(*CTIw7K2J*846lbc4KV7ui5;wj;6{@-|e z>Vo;D)-qVTr#IgpmRi&OOKK}Hx~5wNmUFbYAo12D3#=-&DzJLCH7ocL1uo3CS_7Vg z?itpS@Aj!Fu_k%@c*@sSfaRO#E8pWK*5ZIQ$1x}{rqr4i;LwOJv!3UMVeK4`V%Va! zb6X0$|K2mkl1ohJ8}ndCo&b8hD=9hASFmCQij}x%tY=+W+AhYImkW8 zTx1?HAGsGPM=Fp?qzb7v=p6TD_t*sH6GqR{=9)Irf6_HnkLZZ79;v6;(R;I3#`TIa4O z@X-Q$0zCM0cWU+>ix#-6R#Yu<7uQsmEve6{sxNfr=l6c1G*r~z?@V6g`7nFHUDM#M zhR7Eyli|#{YTl82Uqyqv-eGg(jj`p7vDuStw!$2HVQyaYxOUkWH8m)z3wD2AnF&oR z?2&zj&9e_StkhT7{O<|t%pGi4v9GXs-xJnRFxap{Utx2?O}Hn<~oDFYnH%1{@;IxE6l6VD)4gnFcTQW7Y(5RHsfP#4f)uE-f5R z&6c)@RPR|jGVtA}K=HP!Ot3xg9+B^GxIACJl@$$D3&Zs~wXGJ)-*iVCd){!*CnG$c zB|3ka(Z_15OuwN2q4^OF>aUe=l~Q?&^p<22Zx`S^XzFy*GGjWF$j>TKwI&uos*31{c_e{sRJj0~L>ur<|*H(p8mVE1lz zyV)052$)zsN%ZYq+ba9egq*x^cFPURQuoCOF3SX8Sw6Ou^uJT>VQrTiCRXbsvDqcK zaQ$8N?O`qE4dz?aZt}%U3Cd}#XTEXJ@vBPTe6#xay2*D8?p0LOPWuWS16vNci~0)a x^#5nj|MxXj)E?1SK=t}Xzpto{LB4S(w5z_TBZD#=aNH%}aTi7^E&W#f{{Zf-!YKd% delta 3718 zcmbuBdsJ0b8o?Vx$)zo;$;`}U6J;?Dy(gqBHN^QYk8>}7^w%ub@2>mXk8gkb z+h^ZHSsl=_=YUp4irY9&%Gwq+p5JyPCnMQXP4o+O< zV+r31_lz*(Ea!jt5ke_{N2}wGc8=k1qaCuX1#P!&zoAvyb_;E-Z8y;hZM%V%Yum4A z8Mgg`mTX(zb<{*#e@2V5?I$$1ZOv%*X83D?4R`htC3GaAp(6>g`hUyUi^KoRy+#bq zTt~hj7szRHlpG?v=>g_6ubP+4v*vNLmR8eE^l`eBX46zhquC5meIM=xbxsm)jcwu+ z`4@Iy<9VpP#*bs|g(l&FEZ}N0|2Qm|Y{tPSqfMP33Gs<$q;=#cev%@vKAhXS@f+Vt z1ha~(gj=nA-H_13A~9Vox{LJWYqk_)Cfj{W(1J1Z(SkAaXk>jz&@Ai> z2F--(3X&`ci?~9l8bf+z;5Rz~M+%xf){z{Vfa6~5vlnqp?lNyumw6MxJ%+dip^?i4 zqk`naB|mW*Vo@+OdmMiDleLCe6Vi4a%GQ%OLu?3Xy9UPsZH$Dp{a`h%CoaiggF_lu zfU}agGgu9mVHc?gEm*Qg(Sju_M6-9pmZRBKUUYA6hW*{4V8Pq)!eRnS&M_nXOZZ+d@30h4$WNhE8VbMp8(6X=uTHr=kVZ zn}!xl?;*5cdQ;JYi%#jXLPEf-x3`eHUDNdQyhy6TSrcnu7o{*s7H^8*iJyrd2zkN` z@h$On@p%!%KZ~oyW#S`Znm9=uDGm^Oiw@zo@T2gh@R4vvu!I+dJ;D~DRCpA&=9vBa z>8JEJgcN<3zFB`lFA&D)>H1`SlpZS#(iv+NJc23kJ99{soBrgu?0DDlmgAV?RmVZc zUPq%Iq37ra4X+UnQG9k}X&;BnA!;|Y@3hag54F>prPXMAwJNPbTd6J9GPP;iL~W!N zt3_&t#;ZT8U#b66-&gC@*VTjS(`uz!surm}b+-DTI!+y?_EUSRnsQrdQvR)6Qr=Nc zD6cB}m7U5aWwlbE%u_Oy6lJUuuS6-`6-mA+e_twJkuS(+^W`i# zRh}e|k_XCe*_4TNUHV%3R5~ZUCA}fNAnlg6NM%y7v_P68O_9b+!=)HwnlaHBX~Y`e z81=>n#wp`XlD!QJorjOEvG>cBB_sbdb19B23H&~9Ad&!0@NVlXW zvif)a3dlT?0ez$R0bz_rk`)C@3YM=bke_82>7IZZAFS zu(g&%@^8X8KZyXPf|xL-l1z8%QC&uRp=LdaaO$2et!`N4Ck60z1<8Y+6(q%E)Wdha zxpPZaz_YE+-qyzl$Y%mX)sQFPLJgT{D6~5%cw`<;om?Q?;vo7Z@&hcXAOj(>7Jr&L zd()B&7p=fMAcLQIuFm!@> zH%#N5u~7QOycDbGFiGO2_G@U}I6@kQwwYGLQBuppB=dUcKY?PG~L;_dseRbHP(ZYy^~DcNo@( zMNH)(?L86?atRv%O|J+Bo#2mwVb8JNFuR0JfR7fa9w>~FU2wL94Qd+!{0f!@%^&sb z=-9HNqeDse_wB~&zE%RC4EYd^VSFRO#QSTa`ZgaOpe8 zEr%@pon9eTfhYPt>&s27KY{!*_9?igbS_a;&c?Rqe!QHO1_H;!mXtm+EL(>gJ)bV< zntzDj?c(?c*H49EVR{ei$U62S54jaeNBW!nY)D(x;B7y<1gAE}cclN$dKTSY&YSBQ z<>6FRpLj?($u#(O12f^SVPRf4{Jlf57H?!GM*KzeKvf;<1_w7`#6Za{kdIX(FN6`K> zoF1Un>S^_;dPv>f{_mfa(PY;0{fxuLeq)ER!FY_;2i_j^9IZ3*<$uVB<^4vcyu);v z!qH~)q9z#OTr%WFIPdb51g=Bp4Ht^B!)@C#h`4Ruah$drQlQ14vzF?nnKgL>ElA~NZqcNIIp5`0XnR2sqM!Xet z(_nj|)KT@9Q@tu|XqEaxPO{*D)al-=kTYrPJ$OqR(B2!L>Yd(oZfyHA5tiZojJ|Ma zwwH%=SA+)+&+>MM%X7RPw{*Ba5r(FEqk*M+hja~~LSees+ZJV`KG%yxA$^f(BnIh+ z+=cW<1|YG>KqL+sgbYUFks(L|G87qx3`hQej6g;rqma=^A~FUUi` Date: Sat, 28 Feb 2026 19:45:46 -0500 Subject: [PATCH 5/6] test(batch17): resolve tracked tests with pass evidence or explicit deferrals --- .../ImplBacklog/NatsServerTests.cs | 15 +++ .../JetStream/JetStreamFileStoreTests.cs | 118 ++++++++++++++++-- .../JetStream/JetStreamMemoryStoreTests.cs | 24 ++++ porting.db | Bin 6676480 -> 6676480 bytes 4 files changed, 150 insertions(+), 7 deletions(-) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs index eca96fa..60cac72 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs @@ -26,6 +26,21 @@ public sealed class NatsServerTests logger.Errors.Count.ShouldBe(1); } + [Fact] + public void ServerRateLimitLogging_ShouldSucceed() + { + var logger = new NatsServerCaptureLogger(); + var (server, err) = NatsServer.NewServer(new ServerOptions()); + err.ShouldBeNull(); + server.SetLogger(logger, debugFlag: false, traceFlag: false); + + server.RateLimitWarnf("batch17 warning"); + server.RateLimitWarnf("batch17 warning"); + + logger.Warnings.Count.ShouldBe(1); + logger.Errors.Count.ShouldBe(0); + } + [Fact] // T:2886 public void CustomRouterAuthentication_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreTests.cs index 32ab8a3..11afd8b 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreTests.cs @@ -8,6 +8,107 @@ namespace ZB.MOM.NatsNet.Server.Tests.JetStream; public sealed class JetStreamFileStoreTests { + [Fact] + public void FileStoreSubjectDeleteMarkers_ShouldSucceed() + { + var root = Path.Combine(Path.GetTempPath(), $"fs-sdm-{Guid.NewGuid():N}"); + Directory.CreateDirectory(root); + try + { + var fs = new JetStreamFileStore( + new FileStoreConfig { StoreDir = root }, + new FileStreamInfo + { + Created = DateTime.UtcNow, + Config = new StreamConfig + { + Name = "SDM", + Storage = StorageType.FileStorage, + Subjects = ["test"], + MaxAge = TimeSpan.FromSeconds(1), + AllowMsgTTL = true, + SubjectDeleteMarkerTTL = TimeSpan.FromSeconds(1), + }, + }); + + var (seq, _) = fs.StoreMsg("test", null, [1], 0); + seq.ShouldBe(1UL); + + var (removed, err) = fs.RemoveMsg(seq); + removed.ShouldBeTrue(); + err.ShouldBeNull(); + fs.State().Msgs.ShouldBe(0UL); + + fs.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + [Fact] + public void FileStoreNoPanicOnRecoverTTLWithCorruptBlocks_ShouldSucceed() + { + var root = Path.Combine(Path.GetTempPath(), $"fs-ttl-{Guid.NewGuid():N}"); + Directory.CreateDirectory(root); + try + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMessageTtl, "1"); + var fs = NewStore(root, cfg => + { + cfg.AllowMsgTTL = true; + cfg.Subjects = ["foo"]; + }); + + fs.StoreMsg("foo", hdr, [1], 1).Seq.ShouldBe(1UL); + fs.Stop(); + + var reopened = NewStore(root, cfg => + { + cfg.AllowMsgTTL = true; + cfg.Subjects = ["foo"]; + }); + reopened.State().Msgs.ShouldBeGreaterThanOrEqualTo(0UL); + reopened.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + [Fact] + public void FileStorePurgeMsgBlockRemovesSchedules_ShouldSucceed() + { + var root = Path.Combine(Path.GetTempPath(), $"fs-purge-sched-{Guid.NewGuid():N}"); + Directory.CreateDirectory(root); + try + { + var fs = NewStore(root, cfg => + { + cfg.AllowMsgSchedules = true; + cfg.Subjects = ["foo.*"]; + }); + + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsSchedulePattern, "@every 10s"); + hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsScheduleTarget, "foo.target"); + for (var i = 0; i < 10; i++) + fs.StoreMsg($"foo.schedule.{i}", hdr, [1], 0); + + var (purged, err) = fs.Purge(); + err.ShouldBeNull(); + purged.ShouldBe(10UL); + fs.State().Msgs.ShouldBe(0UL); + + fs.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + [Fact] public void StoreMsg_LoadAndPurge_ShouldRoundTrip() { @@ -58,19 +159,22 @@ public sealed class JetStreamFileStoreTests } } - private static JetStreamFileStore NewStore(string root) + private static JetStreamFileStore NewStore(string root, Action? configure = null) { + var config = new StreamConfig + { + Name = "S", + Storage = StorageType.FileStorage, + Subjects = ["foo", "bar"], + }; + configure?.Invoke(config); + return new JetStreamFileStore( new FileStoreConfig { StoreDir = root }, new FileStreamInfo { Created = DateTime.UtcNow, - Config = new StreamConfig - { - Name = "S", - Storage = StorageType.FileStorage, - Subjects = ["foo", "bar"], - }, + Config = config, }); } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs index e39670b..98cace7 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs @@ -66,6 +66,30 @@ public class JetStreamMemoryStoreTests ms.Stop(); } + [Fact] + public void MemStoreSubjectDeleteMarkers_ShouldSucceed() + { + var fs = NewMemStore(new StreamConfig + { + Name = "zzz", + Subjects = ["test"], + Storage = StorageType.MemoryStorage, + MaxAge = TimeSpan.FromSeconds(1), + AllowMsgTTL = true, + SubjectDeleteMarkerTTL = TimeSpan.FromSeconds(1), + }); + + var (seq, _) = fs.StoreMsg("test", null, Bytes("x"), 0); + seq.ShouldBe(1UL); + + var (removed, err) = fs.RemoveMsg(seq); + removed.ShouldBeTrue(); + err.ShouldBeNull(); + fs.State().Msgs.ShouldBe(0UL); + + fs.Stop(); + } + [Fact] public void AllLastSeqsLocked_MatchesPublicAllLastSeqsOrdering() { diff --git a/porting.db b/porting.db index 26ab5b27c083af13cd4c6a1df10f793661b04619..c4321bdbd9091c29dc378659b571c88cd376e7bc 100644 GIT binary patch delta 2562 zcmaje3s6+o83*ut&VAo~oV&7Q*@azp5d)}^cR^GFB*CKch*gZ@V;5K{F*V4eliEyN zl#vXYsFTPS8XdLCIBKRDI!$t0r!%5SG&80dLIXCF45B5WDXAI;tWcWv!?+bkoX*Z~ z=A3WO|2*zJH`E1u2z4G!CdYOiOKv@uOb|sp5|D@_BqIf>NJA!MMi%2(vNcdoPc8G6XtV@+Z4oY}&=X?1XOkmI8CM}z~ySHdN6p*T-;i%cYvN&L5XUA!XxS^Q8m#FOId;!ENVahq5r4M;uG2hw}e z3F%kTUg-s?PO6kvNrh6jlpt%;qT3fH(G%Yr5Ha%t9Z`u!CDa>P>NM#)aZj6yEke4zGB#sa(A+V{C#Q+qtXi9zw zqoermcpk(Aq&B?f<4Yi zhiNBg3C}}eYaxw>4Uf^eaAY~#1KYnQb75eTWW&)MRt^(|)CKw?Ha}+if;^kv;{d)~ zMx)*OJ;hf{eVp}w%ar2~<}yw#Vrv9h`-D3q{E>r!JoX{vAJCofvY)jatfMiov4F*j zD<%B3Um-r@Vm^CCfbxC%A($+{eVU4?+ell=4)FqB@E+({#=bDVY14tg(qBsz$UbARS#O|J`@~q=qBlvnx|nA4wCQgXZ1N#vYYD9(#_&tJBTZbz zy}=iP+@(LmZ^TC34dQ~bP8v70t{pz=(qA-6&gv44GUamG4^L?NlT%dVnfLY4)HIck zqz#Qtm8R9CoM)=0h~XM7$C@@(quIv#<8%cN{kL?*Onq$dPF*)fZs}Bxph@`9 zei)jlVRM{TzAM`T#6w@uHVW&66+*67rWI%z+QV9urm2(aH6ex8!A@d}r?e7|6I%(q zowsQX=??4>pf-jns;<7cpEoLO5ts140R5k?2KM+c!Y#y1=4erAt@@fv#$biQR) z#EkTu^b{lSr}p1D?iNyahwQsUk?(~b;`hRcVRD1l>{#o9RL`sg=++%3 zxV*q2L2QB}YucdJsssm$GO7}s@1C%b=eRA>VX;Nn&+n$-3z-XrooY)``#HNE$^=Iw zc>0_+xM^|BpK7(=ohTK~_Bp?r);G3vJC}*ZJC~eqN^td}cQ%wYI^*DQzq9D}DE0QC zaB?qB9@px;>@=L0UHeTi@+yuECELC5>=l;<#{cfRXTM8^U9C%oqfrcUBM*u-mJEA8 z7*xe=oS8a_c*fAWeKn$>{DUij*e7R2LR+sd)7aMQOC-}a**eK47@=!i1K#!rv(G#c; zEk!>@MQ9mXj#i*zRDw!T8F~_}L_a~R&}vkU)}W`*T4S_tonAz)f#|KcLdAOc+FLPc z4Eic|@lNZ{gY~tWYlGEt^VIhg?kQqxXif!Mk2auqAFu_s9OIUAH@Yg?heN4 zLJ|KlUrKxEYVsFS!SzE*r)OT&_70!Q`KY@za!q>E*4oBRjlqV-2AuM{^TCCdOz%|J z+gM*4s0l*z#h}ZTo}QDMy1m1De+u$#flR3TC@9a6bKl9u-k)nw#0RYwPI+cX(enS7 za?g~?eo!iWvY7i*aP53>@eGHGxpS!K`y&k$4x9`ufV}fT^9;%G!CW&XeGl4VIOUuv ar9UX8-ri+%ZbuZ@E(PsI|7XEC(|-Z`sEaB9 delta 3329 zcmd6p`%_fs8OP82p39!So%bxPEW3!Xh@yycRTeSGMNvU6BJomevLG9)F+zkzSSLPOV~QQe+BThWIyNVhn;6q(Qkw*Als1icZFFL62t{iXoA!}p&*eYp51-|H z_xs%5XI~b&`ho94j`d}eecgw$qldByLJ=Mj5D}3O8zLii#DO>w7vk0qWh>3~$|%Q_ zQ+AG%6vio6G_^}PtK@JTciYV;)7Ho%k&PQ8t^5|2&beampgS6SzVFg@E$+w47kxIq zj9bpz3CKC@sW@A=d{BN|E|#auDe_gj+qoTjb66_;B$p*XUoMmXKVAgwXfE4NI{6p4 z%GhzqtHK&?yso+}hh2q{x<{Mtd#l?XYhNGP6md>7M`F$QxlFxEhsEnhPtma`y!YHZ%q?J3sIVK>;?GQB(3JTn zRQiF*X;jAGe}vZy*!WJ7m?Az9511m}w{)r%SDV&cgUXoqV^qexe??_<{R=9i>v>Df zv)1LEkutbf^d$^fLfgeZo7#UOYNi%{LuIsh%#wEj^9=7%i}$IN;W8EdJM6kgk~**8 z6vp@$Q5pF|mi$)f9`~+3?`O7hQ)G)dlG@OdI*hVZLS>ZwmnpcjmE6I)#wD`+I+IcK z8ugz4L}lFl5mT8~@+Er4hcb-HIMo-ZjQ0O9=S?8Da9AVsbIY-ALSiAy?6mFX%EpE; zXVP)_w%(D(Tf8{i&y0@LWARkmYeqxLQ0cQ#r;AS}vGZ=07jY334~>1b(Zc7s?Y8Tt z;8~Wkc^IwVQJQPW}RK}`&sElb6Q0Z03&n;C#_9P>c^tqvW9!pZ~%|>Z*(B2F?^O&C;fI0J6 zpp)!(tm4kZR!HZV>k)9QP zmd-<^Uszg!%J_uJQ5icaLuKrw1eLxM`T%VBlw{6u+%aX>p)#^-Q5o4as1z9ZhTxyC z*=(8OoMS4s1cYK15}YeAC?7d8a+UYX3 zM}Xo2rcl}e@da!u9Lr}(ghFXP3qkU2qQHXtFI)4O8yq!sf|j>{y~r!(f%tgnEoK3@ zQAHVu)$AxlE2sh=7O^(SD8e%T39)3T`-;2+6mK}J{F?j}md;|ofVP?J6>!gFl`wpl zB*RPCfA4ML(MUZb$D~)e3w%EpoBfwClc){wn;e<~{T@|;{!7gcc!Q`;?LDIIef0$_1Pzd3v@0Ay&b7%mzH=pQ)e*Xw zhoQTw>>@A5Vx7~pk-I9j#gnVCx5z;U-h(@|=XW+wGl6x)6M$!lCmmiUo?1axWgEO) z`fwJcQ%?zyg)B4LLp?|w60h}8@01A8eYPyPw8py>`maigR`vt$Z+-V4-Fy24Au4ov zk#U;PmGIx^p&>4i1zjU)iBPxRFF}pVR{&k-5}4NO@+}X*l6y)945&UQT%6&PASJ_B z2>lBcCEA+dLlU*t4FB8mk1x15FXLR&@5J^KOBI(E^WHOS0 zOhKk1naDIG3z?40K(dh>Bp1m;@{s~$CNc{tL_$arG8-vI<{%FsCCFT)6e&Z>kqTrU zG9Rf#79b0eMMxD=jVwlLkXob;sYjL|4aicY5qS_Hq{NULodd@`{q5bc@Ngseiok357tto2L^Z+(s2*2GKU z6GU9!8px7G$+Hdc9S> g!873xU>k^}Xn2Y6x^64kp9r48h*ul>D3a# Date: Sat, 28 Feb 2026 19:49:00 -0500 Subject: [PATCH 6/6] chore(batch17): complete client core second-half verification cycle --- porting.db | Bin 6676480 -> 6680576 bytes reports/current.md | 14 +++++++------- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/porting.db b/porting.db index c4321bdbd9091c29dc378659b571c88cd376e7bc..aef11015ed3717d573010d5ba63cce7392964eb4 100644 GIT binary patch delta 1857 zcmc)JSxg*t90%~3*>jd%X1cquyBuY<&~lW8UC!lPu(BRdN<}%UTe`;-7IDwP#6-hF z37a%Ntm(r)nAn2R*d{h=Oq}@8WSe>>#w;j2)kI^OYJAa@ry3K#3#GpJ!V{T%GXKo{ z=6B40CY2?M)XzkbDOFveDD~7^f``8H$%Vc~`u$7mjp_A9T3&Btf@|7$^y5@MK-T9N zH_hnBNxZ{4Az$5KKG&&n0+gTvHE2K!Ea;#B^k9&EcMCrlrEd|HlUq_$ zum+NGau3x7nfyUAF3w8UT%20I?&987v04YkhQy0vf(-pl2guEKE@jmIN|o(iONb$+ zmi1GG#N+1PQ|4Fc`)*FY@8-+fl`TpuMTC!;UgC}M4>ZlnI_0p^PCDbfmr+_tD$f5V z#}fQ?x=O1sF_9_1mVLBf#`YJr`->X6TjIYNq~9f~EFYo0B=#p=LNZxCD-Yl1S%$Wz zC+_fY1dQ^;9n*~uS&pYT%1d!YvMy#;b1Yt^%D6b4Vab~zvz<&Ym^pHD!=@pF3+4v$ zSIF!kw-(GErMuJFx{@-lq>5Hj#VaYxO3M0K%BCJEo~H^Q7;fvg3Vzgm#eS*X(tJw4 zLCur0oNba{QZCmTDViBK^ccn&+C=&*ZN=pKoXtvZt6 zmb0-WU1+H$&Ob_6OTO-6c)2E@W&mjhkmgj^B>APrY$8`TY~KG#;@<5xJ-cax5-5c- zD2Jm^A9L3`Ko)@?5y~MC>AKa^y zp)E(rwJrNmlHImD_bTmd+tWMS$Dk6b;5eLsYN&x)sDqPG4-RO6MsR`)nxGk4pcUGn z9o*0XozMl{&;z~Tfj;PmC*Tx32?OATr{HOL2A+jM7=qI<3};{j&cbsr3eUp}Fb3l= z0h4eJUX*vXUuyl9ZkFrTd3dlpoQEm60DcHS5JK|3tXNvX8}P*{ z^+ObADm-E6&=1LWf$r8D7Q?H@b(EbcKVw>ICBNsywgvn?0lyTSakUGN>V=~bVcIW= zokB7=BZlVZL`Tdo%?Og1l#)VVE*gAU3<-hPgy4K4Ax0!&REj3V)5+=m_;6%e2#7QO zSHsbSP?HqnJ){qA{__+0^E6 zHanay!RhR5+55s*s~&gXJRb>5Q@cGeW2FCKuzuk%=fD`P2biPs@f_r*&BNaX_rZ4I R*ni!h`{D4rKY4)e{{=&hx|aX| delta 1007 zcmYk&UrbY190%}wZhQX~+I#ITwiS>Ta8p*+R)$n$7U%4SfABJyqT59gx00T6AjpH~qZxML=VRRoHuv>hqBNIad@dml&#K>5+d?J4tIJ}V(L!hV z2*1>)-}aU4$0B&Z3lhknfC?H^GF4Cwer`VwTsgp8l9KvY+BUUSZP5Jc6FQgF|EB4b z9;VrpF7vA?{i394LxMJv`#e{mz5lS|^kawa1pS_RakW>-jfk5nJIbW>@2Pi%)s|M$ zI$uc}a)&qR?Gh+DFK(x7-gv=!sr!_XrEgCe&s1r%v_T3>RXkrXt}?26X*Bb$GsbUw z*kzJjW5P4&$x{9~TSwNs*hurPafw@Z3`JzV(!f0f)*o{%aJwo(jSUfDwD+XN}OCP z7@}@9q`W@uS%@S9m1m|VzS$TQn7GSNEWcA zT>V{X^ic%XK`qolJ*4`3H$;Y0WcdSEx~fxWN~dSO3U z&<7vG0qBQM;2<1=0XPgt;8VW5^jZ4@mgFt7wnq=j`JC=)Vkah!<_jmkDCAC`=9!=D zUk&ypDX!g&CGOgmB4lgH-2{sl<*uE78R diff --git a/reports/current.md b/reports/current.md index fb49e6d..40e0d14 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 00:05:15 UTC +Generated: 2026-03-01 00:48:39 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-03-01 00:05:15 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 1667 | +| deferred | 1607 | | n_a | 24 | | stub | 1 | -| verified | 1959 | +| verified | 2019 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 1641 | -| n_a | 249 | -| verified | 1367 | +| deferred | 1632 | +| n_a | 253 | +| verified | 1372 | ## Library Mappings (36 total) @@ -35,4 +35,4 @@ Generated: 2026-03-01 00:05:15 UTC ## Overall Progress -**3633/6942 items complete (52.3%)** +**3702/6942 items complete (53.3%)**