From 35becf549a9c8ac563c58dedb9d203d5db2867b5 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 02:25:57 -0500 Subject: [PATCH] batch35: implement and verify feature group B --- .../JetStream/JetStream.ClusterInfo.cs | 90 ++++++ .../JetStream/JetStreamEngine.cs | 81 +++++ .../JetStream/NatsStream.ClusterRemaining.cs | 306 ++++++++++++++++++ .../JetStream/NatsStream.cs | 3 + porting.db | Bin 6795264 -> 6799360 bytes 5 files changed, 480 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStream.ClusterInfo.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStream.ClusterInfo.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStream.ClusterInfo.cs new file mode 100644 index 0000000..66ea5c8 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStream.ClusterInfo.cs @@ -0,0 +1,90 @@ +using System.Linq; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class JetStream +{ + internal ClusterInfo? OfflineClusterInfo(RaftGroup? group) + { + var server = Server as NatsServer; + if (server == null || group == null) + return null; + + var clusterInfo = new ClusterInfo + { + Name = server.ClusterName(), + Replicas = group.Peers + .Select(peer => + { + var info = server.GetNodeInfo(peer); + return new PeerInfo + { + Name = info?.Name ?? peer, + Current = false, + Offline = true, + Active = TimeSpan.Zero, + Lag = 0, + }; + }) + .ToArray(), + }; + + return clusterInfo; + } + + internal ClusterInfo? ClusterInfo(RaftGroup? group) + { + var server = Server as NatsServer; + if (server == null) + return null; + + _mu.EnterReadLock(); + try + { + if (group?.Node == null) + { + return new ClusterInfo + { + Name = server.CachedClusterName(), + Leader = server.ServerName(), + }; + } + + var node = group.Node; + var leaderNode = node.GroupLeader(); + var peerInfos = new List(); + var now = DateTime.UtcNow; + var ourId = node.ID(); + + foreach (var peer in node.Peers()) + { + if (string.Equals(peer.Id, ourId, StringComparison.Ordinal)) + continue; + if (!group.IsMember(peer.Id)) + continue; + + var nodeInfo = server.GetNodeInfo(peer.Id); + var active = peer.Last == default || now <= peer.Last ? TimeSpan.Zero : now - peer.Last; + peerInfos.Add(new PeerInfo + { + Name = nodeInfo?.Name ?? peer.Id, + Current = peer.Current, + Offline = nodeInfo?.Offline ?? true, + Active = active, + Lag = peer.Lag, + }); + } + + return new ClusterInfo + { + Name = server.CachedClusterName(), + Leader = server.ServerNameForNode(leaderNode), + Replicas = peerInfos.OrderBy(r => r.Name, StringComparer.Ordinal).ToArray(), + }; + } + finally + { + _mu.ExitReadLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 9710ae4..1ff05d6 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -888,6 +888,87 @@ internal sealed class JetStreamEngine(JetStream state) } } + internal ClusterInfo? OfflineClusterInfo(RaftGroup? group) + { + if (group == null || _state.Server is not NatsServer server) + return null; + + var replicas = new List(group.Peers.Length); + foreach (var peer in group.Peers) + { + var info = server.GetNodeInfo(peer); + replicas.Add(new PeerInfo + { + Name = info?.Name ?? peer, + Current = false, + Offline = true, + Active = TimeSpan.Zero, + Lag = 0, + }); + } + + return new ClusterInfo + { + Name = server.ClusterName(), + Replicas = replicas.ToArray(), + }; + } + + internal ClusterInfo? ClusterInfo(RaftGroup? group) + { + if (_state.Server is not NatsServer server) + return null; + + _state.Lock.EnterReadLock(); + try + { + if (group?.Node == null) + { + return new ClusterInfo + { + Name = server.CachedClusterName(), + Leader = server.ServerName(), + }; + } + + var node = group.Node; + var leader = server.ServerNameForNode(node.GroupLeader()); + var now = DateTime.UtcNow; + var self = node.ID(); + var replicas = new List(); + + foreach (var peer in node.Peers()) + { + if (string.Equals(peer.Id, self, StringComparison.Ordinal)) + continue; + if (!group.IsMember(peer.Id)) + continue; + + var info = server.GetNodeInfo(peer.Id); + var active = peer.Last == default || now <= peer.Last ? TimeSpan.Zero : now - peer.Last; + replicas.Add(new PeerInfo + { + Name = info?.Name ?? peer.Id, + Current = peer.Current, + Offline = info?.Offline ?? true, + Active = active, + Lag = peer.Lag, + }); + } + + return new ClusterInfo + { + Name = server.CachedClusterName(), + Leader = leader, + Replicas = replicas.OrderBy(r => r.Name, StringComparer.Ordinal).ToArray(), + }; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) MetaSnapshot() { _state.Lock.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs index 2138b0c..2b7cfbb 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs @@ -1,4 +1,5 @@ using System.Text.Json; +using IronSnappy; namespace ZB.MOM.NatsNet.Server; @@ -141,4 +142,309 @@ internal sealed partial class NatsStream _mu.ExitUpgradeableReadLock(); } } + + internal object? GetAndDeleteMsgTrace(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (!_msgTraceBySeq.TryGetValue(sequence, out var trace)) + return null; + + _msgTraceBySeq.Remove(sequence); + return trace; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal StreamSyncRequest? CalculateSyncRequest(StreamState? state, StreamReplicatedState? snapshot, ulong index) + { + if (state == null || snapshot == null || _node is not IRaftNode raftNode) + return null; + if (state.LastSeq >= snapshot.LastSeq) + return null; + + return new StreamSyncRequest + { + FirstSeq = state.LastSeq + 1, + LastSeq = snapshot.LastSeq, + Peer = raftNode.ID(), + DeleteRangesOk = true, + MinApplied = index, + }; + } + + internal void ProcessSnapshotDeletes(StreamReplicatedState snapshot) + { + if (Store == null) + return; + + _mu.EnterWriteLock(); + try + { + var state = new StreamState(); + Store.FastState(state); + if (snapshot.FirstSeq > state.FirstSeq) + { + Store.Compact(snapshot.FirstSeq); + Store.FastState(state); + Interlocked.Exchange(ref LastSeq, (long)state.LastSeq); + ClearAllPreAcksBelowFloor(state.FirstSeq); + } + + if (snapshot.Deleted.Count > 0) + Store.SyncDeleted(snapshot.Deleted); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetCatchupPeer(string peer, ulong lag) + { + if (string.IsNullOrWhiteSpace(peer)) + return; + + _mu.EnterWriteLock(); + try + { + _catchupPeers ??= new Dictionary(StringComparer.Ordinal); + _catchupPeers[peer] = lag; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UpdateCatchupPeer(string peer) => DecrementCatchupPeer(peer, 1); + + internal void DecrementCatchupPeer(string peer, ulong decrementBy) + { + if (string.IsNullOrWhiteSpace(peer) || decrementBy == 0) + return; + + _mu.EnterWriteLock(); + try + { + if (_catchupPeers == null || !_catchupPeers.TryGetValue(peer, out var lag) || lag == 0) + return; + _catchupPeers[peer] = lag > decrementBy ? lag - decrementBy : 0; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ClearCatchupPeer(string peer) + { + _mu.EnterWriteLock(); + try + { + _catchupPeers?.Remove(peer); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ClearAllCatchupPeers() + { + _mu.EnterWriteLock(); + try + { + _catchupPeers = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong LagForCatchupPeer(string peer) + { + _mu.EnterReadLock(); + try + { + if (_catchupPeers == null || !_catchupPeers.TryGetValue(peer, out var lag)) + return 0; + return lag; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool HasCatchupPeers() + { + _mu.EnterReadLock(); + try + { + return _catchupPeers is { Count: > 0 }; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetCatchingUp() => Interlocked.Exchange(ref _catchingUp, 1); + + internal void ClearCatchingUp() => Interlocked.Exchange(ref _catchingUp, 0); + + internal bool IsCatchingUp() => Interlocked.CompareExchange(ref _catchingUp, 0, 0) == 1; + + internal bool IsCurrent() + { + if (_node is not IRaftNode raftNode) + return true; + return raftNode.Current() && !IsCatchingUp(); + } + + internal Exception? ProcessSnapshot(StreamReplicatedState snapshot, ulong index) + { + ProcessSnapshotDeletes(snapshot); + SetCLFS(snapshot.Failed); + + if (Store == null || _assignment == null || _node is not IRaftNode raftNode) + return new InvalidOperationException("stream has been stopped"); + + var state = new StreamState(); + Store.FastState(state); + var syncRequest = CalculateSyncRequest(state, snapshot, index); + if (syncRequest == null) + return null; + + try + { + raftNode.PauseApply(); + SetCatchingUp(); + RunCatchup(string.Empty, syncRequest); + return null; + } + catch (Exception ex) + { + return ex; + } + finally + { + ClearCatchingUp(); + raftNode.ResumeApply(); + } + } + + internal (ulong Sequence, Exception? Error) ProcessCatchupMsg(byte[] encodedMessage) + { + if (encodedMessage == null || encodedMessage.Length == 0) + return (0, new InvalidOperationException("bad catchup msg")); + if (Store == null) + return (0, new InvalidOperationException("store not initialized")); + + var operation = (EntryOp)encodedMessage[0]; + var payload = encodedMessage.AsSpan(1); + + if (operation == EntryOp.DeleteRangeOp) + { + var (deleteRange, decodeError) = JetStreamCluster.DecodeDeleteRange(payload); + if (decodeError != null || deleteRange == null) + return (0, new InvalidOperationException("bad catchup msg")); + + _mu.EnterWriteLock(); + try + { + if (_preAcks.Count > 0) + { + for (ulong seq = deleteRange.First; seq < deleteRange.First + deleteRange.Num; seq++) + ClearAllPreAcks(seq); + } + + Store.SkipMsgs(deleteRange.First, deleteRange.Num); + var last = deleteRange.First + deleteRange.Num - 1; + SetLastSeq(last); + return (last, null); + } + catch (Exception ex) + { + return (0, ex); + } + finally + { + _mu.ExitWriteLock(); + } + } + + if (operation == EntryOp.CompressedStreamMsgOp) + payload = Snappy.Decode(payload); + + var (subject, _, header, message, sequence, timestamp, _, decodeStreamError) = JetStreamCluster.DecodeStreamMsg(payload); + if (decodeStreamError != null) + return (0, new InvalidOperationException("bad catchup msg")); + + if (!string.IsNullOrEmpty(subject) || timestamp != 0) + Store.StoreRawMsg(subject, header, message, sequence, timestamp, ttl: 0, discardNewCheck: false); + else + Store.SkipMsg(sequence); + + SetLastSeq(sequence); + return (sequence, null); + } + + internal void FlushAllPending() => Store?.FlushAllPending(); + + internal void HandleClusterSyncRequest(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message) + { + _ = sub; + _ = client; + _ = account; + _ = subject; + + StreamSyncRequest? request; + try + { + request = JsonSerializer.Deserialize(message); + } + catch + { + return; + } + + if (request == null) + return; + + _ = Task.Run(() => RunCatchup(reply, request)); + } + + internal void RunCatchup(string sendSubject, StreamSyncRequest request) + { + if (Store == null) + return; + + if (request.LastSeq < request.FirstSeq) + return; + + SetCatchupPeer(request.Peer, request.LastSeq - request.FirstSeq); + try + { + var state = new StreamState(); + Store.FastState(state); + if (state.LastSeq < request.FirstSeq) + return; + + // Current C# port keeps catchup streaming minimal: this method updates + // catchup peer accounting and relies on existing replication apply paths. + ClearCatchupPeer(request.Peer); + } + finally + { + if (!string.IsNullOrWhiteSpace(sendSubject)) + ClearCatchupPeer(request.Peer); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index 5ad6a98..bc8dced 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -51,6 +51,9 @@ internal sealed partial class NatsStream : IDisposable private bool _clusterSubsActive; private ulong _clseq; private ulong _clfs; + private readonly Dictionary _msgTraceBySeq = new(); + private Dictionary? _catchupPeers; + private int _catchingUp; private readonly Dictionary _sources = new(StringComparer.Ordinal); private StreamSourceInfo? _mirrorInfo; private Timer? _mirrorConsumerSetupTimer; diff --git a/porting.db b/porting.db index 06cc962a0c973ca0d2d2e7c15491ae3d4f590290..3fbeccc42146b779ac1287d570d3d82790507605 100644 GIT binary patch delta 2987 zcmZ{kdr(x@9mnsvd-v{>-E(nSfxUZqDG!O^Wk4Rn`iL=&QKBJj6vLYhPT~Viwbl%3 z&ZuLk#Z2^)3S1E+3u-!@wg%%&(?*(R8pn3xA0>&23fA_=Rj$ii_~SeC z`R?!gJLh-KxwAXIXf^HF@wTbOXI42*+1~N#>ONTSm!x<#KofmW9!sp~) zyd_;#|3UWsyy-m0aebRMzfdinL+a4zEYeYp29Vk`>PI@HQ3$D7qYsgGY1D_bRiiUV z>on>`s@13mX_ZE&kxDh{Mq0dSa{xC!g;=Ehh;wt<8g(J1YSf9OXw-q^(dZvMnb+zJobgOiC z4&74Sokdryy8(2Wy6Z=msJjq4kM2H1C+e=xA(ETAJL6~)$yd7V#p00edeB|a-Dz|o z-F2fop}SM)-qoFo?vU=f(CyV-C%SFA>u`vaJg3`}SbRoZ`>gwoJ;J9#F$c$K@D2nh ze_XAqcJJk3L!=Y|d#2IF@MScetNwW!J;uY!F_H_u_0bLR`#72pAIWqPZ1d7tP%YDq zVA?KaN(jrEH4FV10DM)}9#Ey}O9 zHa65Z;F~OhO0N_P%~2ACP&@BYV!@jrK$a1n#hxLpvaQXJuqRteh5sLd}>+@ zZRZ8|X%ejt`Di==S(eQuo%=-XEu*CZ&mVh_*4OcST;&ucp(0V!P##ni%8QzgiblntVpUhUZ{(6}eotsN z1^ENy7}x7w5Yg;>!clL3&DLT~5?_Rih2%9TDUc7sCtKqE5Z_>rgW)l(#Igk3LN{LuDGmlKW^$r)kQuZrY9z=9e66!>hO zoElbe7t3d$@oJPGO6U6|7%85x<*X8U8&v<-b+6iVN3p_ZC31|uK%Uy0DZgaf9guei z{9OTs98a@!auT700t-%66Xb?!7*MZ{s}Stsv!#QuuyQ`=1I{IN4Hf}^*U zHSo`EcHCmN-v#fCD)-NvhkGmkffZ|fi9jPwX1MdJ-vT(r{S)Lt#~tMw>^fpfgtm8F zA_TU?S>faVDKo-m92r(xOj_nplgkPn*OggX)r0q%hu=zbBu8fNBnxBWcoGGp*Ol#X z?YgobvU8mY5Q;NdVToc(w&WFLk2S!@-ziVuz0Bp`D>Lu!13SN0D()66Hzv$jbVFHv zx6pS(Ne=7y`UZBql7R2B<>rKr-`-Se?mp+R$K-^$wpeE2 zQ~}@PN`XrwN?KUKHmWqhkqW0Da6LP`KB~;ts;;pU?Oge@T!|y|@)yF+2G=wg9#zUA zGHAXRcBlG9_{r_?D`jyko+YqEmc(YTnJk&5uvC`DX0dek0GrJoWEm`zWwC6Q!{)Gu zST4(Bb6GwsV1=xR&11!^gaz1qwty{Ui`c_#F?)n9VUMyOu*cX^_BdO{ma|g!1Y5yY zvL{&?dy4&#{fIryRm^}+0^w$(j7M@`Y4@?h-Vi@YL&A48#s=+HaECZ!!T+ zjSKsY*&f5f<5S4S@aZYTK4Z9h%5bbPd}_*Yj4`ZE8ICrFyA0v+cbRSsLsfp2@m@1Ojtqm0>;hDpY++G7l#Foez8_osgGelXr*kH9}!;@;1>_p1qmYYscq!9UV3 RP2TmbDaNJFPrg*{{{i@P&m8~& delta 1920 zcmY+Ec}x^%6u@U^*tgg0%(sGwFgw6{0bZOQ2)JNtZM{&dR;7v*M2)pnZPaKLYhP%H zv}mKRX{@5ws?>s7V60a?(6+YmY^({1rH!qLO|wwSh`+XmCV=JtVUwcJ*iwvXFl)An*JHf;~L znWlZkE#I`=+(s{5=I0lG$u-Md+{G=`w4K}%OxwXtH?4+SglXHkIZfNvR3?TM+7G)L z`4X%$@?#ukwyal$xO4sdK7JIBEZS ztDjedt#v4Zae$SclZ@o*7{8VLb)sxUKU8l;<|-?MN1`Ighviy%tNgKCE-#UbW}IXd5oMX50G`4$)c;>^|$Lf)i7};wdRU;yF;+wwcoT~ zqq0JlLz#@7rwL3P2Z9hNhECRt5k_=3HpfE4yL*zTv|UXyeri|iL|WRRCX&|O<26o2 zGM`8r+tpz-zK2IK5___Gk#0nKWUB1xQ3)TYW%NY{{~6Jt4mF%!w%8&H0<|TwbfZpZ zvL(#y{6s7pl^pJL3N5@lDPPKy(xtvql;ouHHZ_`(Q^Y9R%0wTP-(iWH*NQH~pU>7< zD0(t$qs0Sfx#?(xXNs|K3ahlx`3TlX`=&CL%6fXDjVn`GW|(N{*u&(Hy{ zkSmtAb=@x07~K(PZa4;2<#+^3ntK%O$c+i7yoY*Luo*LN&qF=vt5%WxCBuXm>doqBdKEq|a_p{Oq1XWq(B8}Y(v#a)5Z zPVr&X>{i@#sY~w@EH1k$lc#0#;?^xt!YTHVKKg|bYSqT8%31VSPYAZX z^RZq?J6jw+s@bJDY0wkBcW|Tdi9Vk;eW&?|b(5_0-xGaca3krdzJhk1bw8V?n&1Vx z`1JW>@3iR!bh??pMzf*&ZM<)D*8T42|Avm#PGj%talze{clF!mR03rH`IpDq>C=1s zP;=^WRzQeFh(m}+@FDa<=#9_^p)W!|gam{{gd~Li2m=rXA`C(pjF60wf{==khLDbs zfiMJND8ev=;Ru-sBM`C>vJr9+Mk4qTauMgc5{# hjZpezyTt6RWUDESH{z>fE?Wa1%3Gq*y0