From 8baa604dce1c99236ce90e3f27e86770bde0108b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 02:30:35 -0500 Subject: [PATCH] batch35: implement and verify feature group C --- .../JetStream/JetStreamClusterTypes.cs | 24 ++++ .../JetStream/JetStreamEngine.cs | 59 ++++++++++ .../JetStream/NatsStream.ClusterRemaining.cs | 56 +++++++++ .../NatsServer.JetStreamClusterRemaining.cs | 109 ++++++++++++++++++ porting.db | Bin 6799360 -> 6799360 bytes 5 files changed, 248 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index 55f0303..6aa676a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -32,9 +32,12 @@ namespace ZB.MOM.NatsNet.Server; /// internal sealed class JetStreamCluster { + internal const string JscAllSubj = "$JSC.>"; private static readonly Exception ErrBadStreamMsg = new("jetstream cluster bad replicated stream msg"); private const int CompressThreshold = 8192; private const ulong MsgFlagFromSourceOrMirror = 1UL; + private const int ReplySuffixLength = 10; + private const string Base62 = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; /// The meta-controller Raft node. public IRaftNode? Meta { get; set; } @@ -865,6 +868,27 @@ internal sealed class JetStreamCluster internal static byte[] EncodeStreamMsgAllowCompress(string subject, string reply, byte[]? header, byte[]? message, ulong sequence, long timestamp, bool sourced) => EncodeStreamMsgAllowCompressAndBatch(subject, reply, header, message, sequence, timestamp, sourced, string.Empty, 0, false); + internal static string SyncSubjForStream() => SyncSubject("$JSC.SYNC"); + + internal static string SyncReplySubject() => SyncSubject("$JSC.R"); + + internal static string InfoReplySubject() => SyncSubject("$JSC.R"); + + internal static string SyncAckSubject() => $"{SyncSubject("$JSC.ACK")}.*"; + + internal static string SyncSubject(string prefix) + { + var suffix = new char[ReplySuffixLength]; + var value = Random.Shared.NextInt64(long.MaxValue); + for (var i = 0; i < suffix.Length; i++) + { + suffix[i] = Base62[(int)(value % Base62.Length)]; + value /= Base62.Length; + } + + return $"{prefix}.{new string(suffix)}"; + } + internal static byte[] EncodeStreamMsgAllowCompressAndBatch( string subject, string reply, diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 1ff05d6..f22a899 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -969,6 +969,65 @@ internal sealed class JetStreamEngine(JetStream state) } } + internal string[] StreamAlternates(ClientInfo clientInfo, string streamName) + { + if (_state.Server is not NatsServer server) + return []; + + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return []; + + var (account, _) = server.LookupAccount(clientInfo.ServiceAccount()); + if (account == null) + return []; + + if (!cluster.Streams.TryGetValue(account.Name, out var accountStreams)) + return []; + + var weights = new Dictionary(StringComparer.Ordinal); + if (clientInfo.Cluster is { Length: > 0 }) + { + for (var i = 0; i < clientInfo.Cluster.Length; i++) + weights[clientInfo.Cluster[i]] = clientInfo.Cluster.Length - i; + } + + if (clientInfo.Alternates is { Count: > 0 }) + { + for (var i = 0; i < clientInfo.Alternates.Count; i++) + weights[clientInfo.Alternates[i]] = clientInfo.Alternates.Count - i; + } + + var candidates = new List<(string Name, string Cluster)>(); + foreach (var assignment in accountStreams.Values) + { + if (assignment.Unsupported != null || assignment.Config == null) + continue; + + if (string.Equals(assignment.Config.Name, streamName, StringComparison.Ordinal) || + string.Equals(assignment.Config.Mirror?.Name, streamName, StringComparison.Ordinal)) + { + candidates.Add((assignment.Config.Name, assignment.Group?.Cluster ?? string.Empty)); + } + } + + if (candidates.Count <= 1) + return []; + + return candidates + .OrderByDescending(c => weights.TryGetValue(c.Cluster, out var weight) ? weight : 0) + .Select(c => c.Name) + .Distinct(StringComparer.Ordinal) + .ToArray(); + } + finally + { + _state.Lock.ExitReadLock(); + } + } + internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) MetaSnapshot() { _state.Lock.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs index 2b7cfbb..aa39009 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs @@ -447,4 +447,60 @@ internal sealed partial class NatsStream ClearCatchupPeer(request.Peer); } } + + internal void CheckClusterInfo(ClusterInfo? clusterInfo) + { + if (clusterInfo?.Replicas == null || clusterInfo.Replicas.Length == 0) + return; + + foreach (var replica in clusterInfo.Replicas) + { + var peer = NatsServer.GetHash(replica.Name); + var lag = LagForCatchupPeer(peer); + if (lag == 0) + continue; + + replica.Current = false; + replica.Lag = lag; + } + } + + internal void HandleClusterStreamInfoRequest(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message) + { + _ = sub; + _ = client; + _ = account; + _ = subject; + _ = message; + _ = Task.Run(() => ProcessClusterStreamInfoRequest(reply)); + } + + internal void ProcessClusterStreamInfoRequest(string reply) + { + _mu.EnterReadLock(); + try + { + if (string.IsNullOrWhiteSpace(reply)) + return; + + var streamInfo = new StreamInfo + { + Created = CreatedTime(), + State = State(), + Config = Config.Clone(), + Cluster = null, + Sources = SourcesInfo(), + Mirror = MirrorInfo(), + }; + + if (HasCatchupPeers()) + CheckClusterInfo(streamInfo.Cluster); + + _outq.SendMsg(reply, JsonSerializer.SerializeToUtf8Bytes(streamInfo)); + } + finally + { + _mu.ExitReadLock(); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs index 965fa1c..0ab8ed1 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs @@ -1,9 +1,13 @@ using System.Text.Json; +using System.Threading.Channels; namespace ZB.MOM.NatsNet.Server; public sealed partial class NatsServer { + private Channel? _gcbKickCh; + private const long DefaultMaxTotalCatchupOutBytes = 64L * 1024 * 1024; + internal void JsClusteredConsumerRequest( ClientInfo clientInfo, Account account, @@ -142,4 +146,109 @@ public sealed partial class NatsServer _ = requestMessage; } + + internal long GcbTotal() + { + _gcbMu.EnterReadLock(); + try + { + return _gcbOut; + } + finally + { + _gcbMu.ExitReadLock(); + } + } + + internal bool GcbBelowMax() + { + _gcbMu.EnterReadLock(); + try + { + var limit = _gcbOutMax > 0 ? _gcbOutMax : DefaultMaxTotalCatchupOutBytes; + return _gcbOut <= limit; + } + finally + { + _gcbMu.ExitReadLock(); + } + } + + internal void GcbAdd(ref long localOutstandingBytes, long size) + { + _gcbMu.EnterWriteLock(); + try + { + localOutstandingBytes += size; + _gcbOut += size; + var limit = _gcbOutMax > 0 ? _gcbOutMax : DefaultMaxTotalCatchupOutBytes; + if (_gcbOut >= limit && _gcbKickCh == null) + _gcbKickCh = Channel.CreateBounded(1); + } + finally + { + _gcbMu.ExitWriteLock(); + } + } + + internal void GcbSubLocked(ref long localOutstandingBytes, long size) + { + if (localOutstandingBytes == 0) + return; + + localOutstandingBytes -= size; + if (localOutstandingBytes < 0) + localOutstandingBytes = 0; + _gcbOut -= size; + if (_gcbOut < 0) + _gcbOut = 0; + + var limit = _gcbOutMax > 0 ? _gcbOutMax : DefaultMaxTotalCatchupOutBytes; + if (_gcbKickCh != null && _gcbOut < limit) + { + _gcbKickCh.Writer.TryWrite(true); + _gcbKickCh.Writer.TryComplete(); + _gcbKickCh = null; + } + } + + internal void GcbSub(ref long localOutstandingBytes, long size) + { + _gcbMu.EnterWriteLock(); + try + { + GcbSubLocked(ref localOutstandingBytes, size); + } + finally + { + _gcbMu.ExitWriteLock(); + } + } + + internal void GcbSubLast(ref long localOutstandingBytes) + { + _gcbMu.EnterWriteLock(); + try + { + GcbSubLocked(ref localOutstandingBytes, localOutstandingBytes); + localOutstandingBytes = 0; + } + finally + { + _gcbMu.ExitWriteLock(); + } + } + + internal ChannelReader? CbKickChan() + { + _gcbMu.EnterReadLock(); + try + { + return _gcbKickCh?.Reader; + } + finally + { + _gcbMu.ExitReadLock(); + } + } } diff --git a/porting.db b/porting.db index 3fbeccc42146b779ac1287d570d3d82790507605..5802cd1878ba2f204ff77cd29f8c7b8c4e4642dc 100644 GIT binary patch delta 3111 zcmaKseM}qo8OQH_oX_XCyEEY>z&X4o7)ZbvV;h?Q4GGCIprm76n=~yLjD=RP1xdtf zahl3~5l!i~@pXxlaeZGG%0KM_+ZA~ zje$Pa_j}Lxo_l^j-_POjPm;pOM3(6cMX@6kMHL|Q%T^bC=FPUcGTMFYWSM-jjBcCW z#?Mh^ev-vppTO{6@}KahguQ}W*dqLqpXAR9ZwRx(W5QA4A3{)&<;SVrwdT<%qwpyQ|yZ$yNwyR5C36nI5L^$<2SEf0e^(Zc{Ui?JX_YM}v5S zw!YF5$jG)a|My>R_Xz_8#QU@n8Uum|&;3p;f(t>RO8$Mfu!9~m+@;3Y z7#n*bexEdz#yxS(F5)+`)@&9X50Aekra^;|@GN^-xzHRG(&UhlOyqOul|Sfh6P@_S+D{2|lq@q4RHLj?usGe5T5~=}3Eu!jG)B>tbMa`qyuc$dx zEsDB=szFiFiK-QK36&jHiI0u0!m3=1u3(d*W>BRl>H;dgqRvM`Boa!BgqTQ36A7`A zkT%oA1u2mlCEN&KZ8}J<8(%kkVW`wUqx)FS3~}!exIN504Dat@EpR&}Ne?|ETt2)P z;dE@3r`&CG$dP{T?~F-TO_|IToo}i)W-+#cdm;MM?_ql6(i!0m#1$xwqX{>R4Xq_~K!&-(xS}(<=+)bG@Y`X|8drRE zgnMpfb&aEO4|U5Uqg)RScBe5HPPb%f!92$0t=$(1bCIwv64pn;hDg{L37g&y^YkZL z+>Z|#8wF3c2@P;s415iBBH8&$UBD`zX^$Pf_P>f9@WUG7ribBj4Jm>xH${m)0tGHo zMEA&h{K8i>T)8O{7p3%>YF6qKU~-+U%+glR8vf!9rV zIuEL`PUpa0H?HJT4dLOR+i~7^KNl_VyobC1`~N2LpudS9MRu%~c}Xi2&5FC=jGG(> z?t*9naFS6Na$@zD4$=Uh;~a9Eoea^0`_0=SfthCR5uO6kOg73nqG-gQygOnZG>fDO z2F>CD`75({Q%9&`=)EV{VJ6F50)MiY`KqPpT4Fz=iwSNa!ObVQIe0JAoK;o_wP zcQL`uCb${6n`L%AYp{rj8A(PqASp;Hl7^%s8<7ko6UmYd7U^Hfl2FVG##_b|L%F_I zcT{_cozT3&Tu+)OKI$widg&ALs8ed9;nN|#1*S8z4e+H)S~(6v^f)}-{bFM>MHe31 zTo1XwkT$>rZs|U}pT-nIXO)z__7=lLm2`UL?GJcXJFFb26CP=BrQr5Txp8-Nd8IM1 z-45b z`5y8J@+h(&IeG~N{7YJFCwab)nT#)!vyxVl%)N1_H z7&3fq7&m0@|4vT!#n5*8(hE59g)>>6uQCtui>}QfHS{*^HH}Q=NsA^2WB1W?2;-Qox^c7b*e%iUEtA-wH3lQ%LgnlG385!1;arNyfj|Urq@4txc`v9 z?;(Frdv|YlZ=kfJ-{Y`5;KoECJ#H89>2~$g7gSGQowQt?onJ3&Q)lPa%a*CLSJumx zsS!K6^Q`^*8xbb6ODYTvIF#V5jZL2zeQPq2OFG|$e+4Zu;>gIB#6bm? zniex1N-@*fx9#MD7NaF8Ruqm=P$VWXsZlY;*oH>hCN`RC(b`HIYWv#*Ji<(8`d6ni zU+4Movb&%>@>)~#qFjR-;=Pt2D3!wSubpYxqIIAsH(i|k^saPkSGri#_@EvwL})<; zA6KqBv|C!Ac1CN~7wWV0N%{v`Q0vh5=x^z(^zZ1`^^MlFb7DZ7c*iRzNF}9fO3TYj zOVlHRAPS-PY%VP?TT@nAqRhYxPT=Q16t{_~*5b?J!6b#%3VW5xHYP0X|p58 z`KQrekaMOdf7Ek*H0JCE&h9|Exk)BIYU152`At_Jb8;Xo9 zYo1SvogcslHg9_JsbZxYQ=>hdz!bEnE=+aybR5%mdpd@x!k#)YZLp^fOeOYo6w^w3 z!ry|h#GYQolxI(eG0n9nd>;tYF-}DS1fE;38hrm2HI;Wwb87s1Th&K- z@%pet-nd1L5IoC&<^nC#}>YCNK-+N)U)Rjcbo-s&Eo zz#HSnyLnGw)ub_St2|Rtbg9 ze1Z2FMlx^kNzuHeUv%@!{km7vZ|NWD7xh#65&aeY1zyxI#`2b)A{TEoq^W^?=l}IB z@V|X|xj>#TXXp+3c8AZA?1*wG@__t|d`<3^&&XYJlgxs0tGrb%vkpI@S4dXh95Ghp zK8NJB3QP5?qND~xG6QZ!`sT>)E3ZQ18NzgV+rAk$w)HC+&kylJ-LFq?e$5q&?72 zNlj24>1R+iX*cB0BJKi@kERE!1I5r>o|;$LjJdE`hHq31~W*G{e< z5&IWZL%IunkOra0NS{IZq<=s;q`wORL4JTZfWt)69mpX44Wb{)?5~0#-KMkK z&=t~Opi87%(A%V&PzR}B5CT#Q@y|GHB;A0XC0&QUOZpVrOu7cq60)lhEg|~^q9tVb zL3(cq*~d_Twvhb+q%CB>hiD7g?;zSj)`u$zZ6Ui1(Gs!`AzDKATZopB^+L3S>;tGZ zKwrggK-xm~YbZ!(mmu0g_C7>g$Sy*(gzPK-xm~CPZ7v z-hgNe+3OH(Av+2A>Cfpkh?bCr?m)DJEOZZ|C1hRDEIh8vzGSk{g=h*rP3Tf&l0w%Z zjdXPMeNDMIM(>T$g)w@U``V27gq|_)JU2#X`8<4x_zqvzX-wq>ZAO~@=9mw7gD-MO z#qECG(df7$X#zLGrC2M;Fgzk(9B%AWhcC#F@S^DOI9@j{+_01g<3gB(AMG3N=sWoH z1G=Bj8PM>4`L*dmVN1T4|@)B6R2rymg+X z?lrC-hc&1vN)^vo^7v1z+#IvZX?6I_0+Dy09J$P7OgB?QcNWc6_Jc*Z!A*X8;=_Ex z%Vs36nQjj97 zk)OYbPsV6gbzX2v#WTz{hwpUTXPZUCpK7P(<~2dh!L^*BFU%WklyMo$HBkaeMB`Bs znt+l~3Q9#2(Ihk(rJ?)L6f_k*fF4BCP&)FV3^X0hK$&PJ%0k&_7MhK6&>S=uJ%r|= z`6w5C4lO_rqesx^Q6BmN%12*B3(=R*B2<7Dqesyav=l8v%h3u{h#o^