Review notes (text before the first "diff --git" header is ignored by git apply / patch):
* One patch line per text line has been restored; hunk line counts include the added /// summaries.
* Generic type arguments that had been stripped from added lines are restored from their usage;
  the Dictionary context line in the NatsStream.cs hunk is left exactly as received.
* NatsStream.RunCatchup now always clears the catchup-peer entry it registers.
* The "index" blob ids are those of the original commit.

diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStream.ClusterInfo.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStream.ClusterInfo.cs
new file mode 100644
index 0000000..66ea5c8
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStream.ClusterInfo.cs
@@ -0,0 +1,92 @@
+using System.Linq;
+
+namespace ZB.MOM.NatsNet.Server;
+
+internal sealed partial class JetStream
+{
+    /// <summary>Builds a ClusterInfo that reports every peer of the group as offline and not current; returns null when the server or the group is unavailable.</summary>
+    internal ClusterInfo? OfflineClusterInfo(RaftGroup? group)
+    {
+        var server = Server as NatsServer;
+        if (server == null || group == null)
+            return null;
+
+        var clusterInfo = new ClusterInfo
+        {
+            Name = server.ClusterName(),
+            Replicas = group.Peers
+                .Select(peer =>
+                {
+                    var info = server.GetNodeInfo(peer);
+                    return new PeerInfo
+                    {
+                        Name = info?.Name ?? peer,
+                        Current = false,
+                        Offline = true,
+                        Active = TimeSpan.Zero,
+                        Lag = 0,
+                    };
+                })
+                .ToArray(),
+        };
+
+        return clusterInfo;
+    }
+
+    /// <summary>Describes the group while holding the read lock: with no raft node this server is reported as leader; otherwise the other group members are listed, ordered by name.</summary>
+    internal ClusterInfo? ClusterInfo(RaftGroup? group)
+    {
+        var server = Server as NatsServer;
+        if (server == null)
+            return null;
+
+        _mu.EnterReadLock();
+        try
+        {
+            if (group?.Node == null)
+            {
+                return new ClusterInfo
+                {
+                    Name = server.CachedClusterName(),
+                    Leader = server.ServerName(),
+                };
+            }
+
+            var node = group.Node;
+            var leaderNode = node.GroupLeader();
+            var peerInfos = new List<PeerInfo>();
+            var now = DateTime.UtcNow;
+            var ourId = node.ID();
+
+            foreach (var peer in node.Peers())
+            {
+                if (string.Equals(peer.Id, ourId, StringComparison.Ordinal))
+                    continue;
+                if (!group.IsMember(peer.Id))
+                    continue;
+
+                var nodeInfo = server.GetNodeInfo(peer.Id);
+                var active = peer.Last == default || now <= peer.Last ? TimeSpan.Zero : now - peer.Last;
+                peerInfos.Add(new PeerInfo
+                {
+                    Name = nodeInfo?.Name ?? peer.Id,
+                    Current = peer.Current,
+                    Offline = nodeInfo?.Offline ?? true,
+                    Active = active,
+                    Lag = peer.Lag,
+                });
+            }
+
+            return new ClusterInfo
+            {
+                Name = server.CachedClusterName(),
+                Leader = server.ServerNameForNode(leaderNode),
+                Replicas = peerInfos.OrderBy(r => r.Name, StringComparer.Ordinal).ToArray(),
+            };
+        }
+        finally
+        {
+            _mu.ExitReadLock();
+        }
+    }
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs
index 9710ae4..1ff05d6 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs
@@ -888,6 +888,89 @@ internal sealed class JetStreamEngine(JetStream state)
         }
     }
 
+    /// <summary>Builds a ClusterInfo that reports every peer of the group as offline and not current; returns null when the group or the server is unavailable.</summary>
+    internal ClusterInfo? OfflineClusterInfo(RaftGroup? group)
+    {
+        if (group == null || _state.Server is not NatsServer server)
+            return null;
+
+        var replicas = new List<PeerInfo>(group.Peers.Length);
+        foreach (var peer in group.Peers)
+        {
+            var info = server.GetNodeInfo(peer);
+            replicas.Add(new PeerInfo
+            {
+                Name = info?.Name ?? peer,
+                Current = false,
+                Offline = true,
+                Active = TimeSpan.Zero,
+                Lag = 0,
+            });
+        }
+
+        return new ClusterInfo
+        {
+            Name = server.ClusterName(),
+            Replicas = replicas.ToArray(),
+        };
+    }
+
+    /// <summary>Describes the group while holding the state read lock: with no raft node this server is reported as leader; otherwise the other group members are listed, ordered by name.</summary>
+    internal ClusterInfo? ClusterInfo(RaftGroup? group)
+    {
+        if (_state.Server is not NatsServer server)
+            return null;
+
+        _state.Lock.EnterReadLock();
+        try
+        {
+            if (group?.Node == null)
+            {
+                return new ClusterInfo
+                {
+                    Name = server.CachedClusterName(),
+                    Leader = server.ServerName(),
+                };
+            }
+
+            var node = group.Node;
+            var leader = server.ServerNameForNode(node.GroupLeader());
+            var now = DateTime.UtcNow;
+            var self = node.ID();
+            var replicas = new List<PeerInfo>();
+
+            foreach (var peer in node.Peers())
+            {
+                if (string.Equals(peer.Id, self, StringComparison.Ordinal))
+                    continue;
+                if (!group.IsMember(peer.Id))
+                    continue;
+
+                var info = server.GetNodeInfo(peer.Id);
+                var active = peer.Last == default || now <= peer.Last ? TimeSpan.Zero : now - peer.Last;
+                replicas.Add(new PeerInfo
+                {
+                    Name = info?.Name ?? peer.Id,
+                    Current = peer.Current,
+                    Offline = info?.Offline ?? true,
+                    Active = active,
+                    Lag = peer.Lag,
+                });
+            }
+
+            return new ClusterInfo
+            {
+                Name = server.CachedClusterName(),
+                Leader = leader,
+                Replicas = replicas.OrderBy(r => r.Name, StringComparer.Ordinal).ToArray(),
+            };
+        }
+        finally
+        {
+            _state.Lock.ExitReadLock();
+        }
+    }
+
     internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) MetaSnapshot()
     {
         _state.Lock.EnterReadLock();
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs
index 2138b0c..2b7cfbb 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs
@@ -1,4 +1,5 @@
 using System.Text.Json;
+using IronSnappy;
 
 namespace ZB.MOM.NatsNet.Server;
 
@@ -141,4 +142,328 @@ internal sealed partial class NatsStream
             _mu.ExitUpgradeableReadLock();
         }
     }
+
+    /// <summary>Removes and returns the trace stored for the sequence, or null when none is stored.</summary>
+    internal object? GetAndDeleteMsgTrace(ulong sequence)
+    {
+        _mu.EnterWriteLock();
+        try
+        {
+            if (!_msgTraceBySeq.TryGetValue(sequence, out var trace))
+                return null;
+
+            _msgTraceBySeq.Remove(sequence);
+            return trace;
+        }
+        finally
+        {
+            _mu.ExitWriteLock();
+        }
+    }
+
+    /// <summary>Returns a sync request for state.LastSeq + 1 through snapshot.LastSeq, or null when an input is missing, there is no raft node, or the store is not behind the snapshot.</summary>
+    internal StreamSyncRequest? CalculateSyncRequest(StreamState? state, StreamReplicatedState? snapshot, ulong index)
+    {
+        if (state == null || snapshot == null || _node is not IRaftNode raftNode)
+            return null;
+        if (state.LastSeq >= snapshot.LastSeq)
+            return null;
+
+        return new StreamSyncRequest
+        {
+            FirstSeq = state.LastSeq + 1,
+            LastSeq = snapshot.LastSeq,
+            Peer = raftNode.ID(),
+            DeleteRangesOk = true,
+            MinApplied = index,
+        };
+    }
+
+    /// <summary>Compacts the store to the snapshot's first sequence when the store starts earlier, then syncs the snapshot's deleted set; does nothing without a store.</summary>
+    internal void ProcessSnapshotDeletes(StreamReplicatedState snapshot)
+    {
+        if (Store == null)
+            return;
+
+        _mu.EnterWriteLock();
+        try
+        {
+            var state = new StreamState();
+            Store.FastState(state);
+            if (snapshot.FirstSeq > state.FirstSeq)
+            {
+                Store.Compact(snapshot.FirstSeq);
+                Store.FastState(state);
+                Interlocked.Exchange(ref LastSeq, (long)state.LastSeq);
+                ClearAllPreAcksBelowFloor(state.FirstSeq);
+            }
+
+            if (snapshot.Deleted.Count > 0)
+                Store.SyncDeleted(snapshot.Deleted);
+        }
+        finally
+        {
+            _mu.ExitWriteLock();
+        }
+    }
+
+    /// <summary>Records the lag for a catchup peer, creating the table on first use; blank peer names are ignored.</summary>
+    internal void SetCatchupPeer(string peer, ulong lag)
+    {
+        if (string.IsNullOrWhiteSpace(peer))
+            return;
+
+        _mu.EnterWriteLock();
+        try
+        {
+            _catchupPeers ??= new Dictionary<string, ulong>(StringComparer.Ordinal);
+            _catchupPeers[peer] = lag;
+        }
+        finally
+        {
+            _mu.ExitWriteLock();
+        }
+    }
+
+    /// <summary>Decrements the peer's recorded lag by one.</summary>
+    internal void UpdateCatchupPeer(string peer) => DecrementCatchupPeer(peer, 1);
+
+    /// <summary>Decrements the peer's recorded lag, clamping at zero; untracked peers, blank names and a zero decrement are ignored.</summary>
+    internal void DecrementCatchupPeer(string peer, ulong decrementBy)
+    {
+        if (string.IsNullOrWhiteSpace(peer) || decrementBy == 0)
+            return;
+
+        _mu.EnterWriteLock();
+        try
+        {
+            if (_catchupPeers == null || !_catchupPeers.TryGetValue(peer, out var lag) || lag == 0)
+                return;
+            _catchupPeers[peer] = lag > decrementBy ? lag - decrementBy : 0;
+        }
+        finally
+        {
+            _mu.ExitWriteLock();
+        }
+    }
+
+    /// <summary>Removes the peer from the catchup table, if the table exists.</summary>
+    internal void ClearCatchupPeer(string peer)
+    {
+        _mu.EnterWriteLock();
+        try
+        {
+            _catchupPeers?.Remove(peer);
+        }
+        finally
+        {
+            _mu.ExitWriteLock();
+        }
+    }
+
+    /// <summary>Drops the whole catchup table.</summary>
+    internal void ClearAllCatchupPeers()
+    {
+        _mu.EnterWriteLock();
+        try
+        {
+            _catchupPeers = null;
+        }
+        finally
+        {
+            _mu.ExitWriteLock();
+        }
+    }
+
+    /// <summary>Returns the recorded lag for the peer, or 0 when the peer is not tracked.</summary>
+    internal ulong LagForCatchupPeer(string peer)
+    {
+        _mu.EnterReadLock();
+        try
+        {
+            if (_catchupPeers == null || !_catchupPeers.TryGetValue(peer, out var lag))
+                return 0;
+            return lag;
+        }
+        finally
+        {
+            _mu.ExitReadLock();
+        }
+    }
+
+    /// <summary>Returns true when at least one catchup peer is tracked.</summary>
+    internal bool HasCatchupPeers()
+    {
+        _mu.EnterReadLock();
+        try
+        {
+            return _catchupPeers is { Count: > 0 };
+        }
+        finally
+        {
+            _mu.ExitReadLock();
+        }
+    }
+
+    /// <summary>Sets the catching-up flag.</summary>
+    internal void SetCatchingUp() => Interlocked.Exchange(ref _catchingUp, 1);
+
+    /// <summary>Resets the catching-up flag.</summary>
+    internal void ClearCatchingUp() => Interlocked.Exchange(ref _catchingUp, 0);
+
+    /// <summary>Reads the catching-up flag.</summary>
+    internal bool IsCatchingUp() => Interlocked.CompareExchange(ref _catchingUp, 0, 0) == 1;
+
+    /// <summary>Returns true when there is no raft node, or when the node reports current and no catchup is in progress.</summary>
+    internal bool IsCurrent()
+    {
+        if (_node is not IRaftNode raftNode)
+            return true;
+        return raftNode.Current() && !IsCatchingUp();
+    }
+
+    /// <summary>Applies the snapshot's deletes and passes snapshot.Failed to SetCLFS, then, if the store is behind, runs a catchup with raft apply paused; exceptions raised during that catchup are returned rather than thrown.</summary>
+    internal Exception? ProcessSnapshot(StreamReplicatedState snapshot, ulong index)
+    {
+        ProcessSnapshotDeletes(snapshot);
+        SetCLFS(snapshot.Failed);
+
+        if (Store == null || _assignment == null || _node is not IRaftNode raftNode)
+            return new InvalidOperationException("stream has been stopped");
+
+        var state = new StreamState();
+        Store.FastState(state);
+        var syncRequest = CalculateSyncRequest(state, snapshot, index);
+        if (syncRequest == null)
+            return null;
+
+        try
+        {
+            raftNode.PauseApply();
+            SetCatchingUp();
+            RunCatchup(string.Empty, syncRequest);
+            return null;
+        }
+        catch (Exception ex)
+        {
+            return ex;
+        }
+        finally
+        {
+            ClearCatchingUp();
+            raftNode.ResumeApply();
+        }
+    }
+
+    /// <summary>Applies one encoded catchup entry to the store; the first byte selects the operation (delete range, or a stream message that may be Snappy-compressed). Returns the last sequence applied, or an error.</summary>
+    internal (ulong Sequence, Exception? Error) ProcessCatchupMsg(byte[] encodedMessage)
+    {
+        if (encodedMessage == null || encodedMessage.Length == 0)
+            return (0, new InvalidOperationException("bad catchup msg"));
+        if (Store == null)
+            return (0, new InvalidOperationException("store not initialized"));
+
+        var operation = (EntryOp)encodedMessage[0];
+        var payload = encodedMessage.AsSpan(1);
+
+        if (operation == EntryOp.DeleteRangeOp)
+        {
+            var (deleteRange, decodeError) = JetStreamCluster.DecodeDeleteRange(payload);
+            if (decodeError != null || deleteRange == null)
+                return (0, new InvalidOperationException("bad catchup msg"));
+
+            _mu.EnterWriteLock();
+            try
+            {
+                if (_preAcks.Count > 0)
+                {
+                    for (ulong seq = deleteRange.First; seq < deleteRange.First + deleteRange.Num; seq++)
+                        ClearAllPreAcks(seq);
+                }
+
+                Store.SkipMsgs(deleteRange.First, deleteRange.Num);
+                var last = deleteRange.First + deleteRange.Num - 1;
+                SetLastSeq(last);
+                return (last, null);
+            }
+            catch (Exception ex)
+            {
+                return (0, ex);
+            }
+            finally
+            {
+                _mu.ExitWriteLock();
+            }
+        }
+
+        if (operation == EntryOp.CompressedStreamMsgOp)
+            payload = Snappy.Decode(payload);
+
+        var (subject, _, header, message, sequence, timestamp, _, decodeStreamError) = JetStreamCluster.DecodeStreamMsg(payload);
+        if (decodeStreamError != null)
+            return (0, new InvalidOperationException("bad catchup msg"));
+
+        if (!string.IsNullOrEmpty(subject) || timestamp != 0)
+            Store.StoreRawMsg(subject, header, message, sequence, timestamp, ttl: 0, discardNewCheck: false);
+        else
+            Store.SkipMsg(sequence);
+
+        SetLastSeq(sequence);
+        return (sequence, null);
+    }
+
+    /// <summary>Asks the store, when present, to flush all pending writes.</summary>
+    internal void FlushAllPending() => Store?.FlushAllPending();
+
+    /// <summary>Deserializes a StreamSyncRequest from the message body and runs the catchup on a background task; payloads that fail to deserialize are dropped.</summary>
+    internal void HandleClusterSyncRequest(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message)
+    {
+        _ = sub;
+        _ = client;
+        _ = account;
+        _ = subject;
+
+        StreamSyncRequest? request;
+        try
+        {
+            request = JsonSerializer.Deserialize<StreamSyncRequest>(message);
+        }
+        catch
+        {
+            return;
+        }
+
+        if (request == null)
+            return;
+
+        _ = Task.Run(() => RunCatchup(reply, request));
+    }
+
+    /// <summary>Registers the requesting peer in the catchup table for the duration of the call and always removes it again; sendSubject is currently unused because no messages are streamed yet.</summary>
+    internal void RunCatchup(string sendSubject, StreamSyncRequest request)
+    {
+        if (Store == null)
+            return;
+
+        if (request.LastSeq < request.FirstSeq)
+            return;
+
+        SetCatchupPeer(request.Peer, request.LastSeq - request.FirstSeq);
+        try
+        {
+            var state = new StreamState();
+            Store.FastState(state);
+            if (state.LastSeq < request.FirstSeq)
+                return;
+
+            // Current C# port keeps catchup streaming minimal: this method updates
+            // catchup peer accounting and relies on existing replication apply paths.
+        }
+        finally
+        {
+            // Always clear: the early return above and local catchups (empty sendSubject)
+            // previously left the entry behind, so HasCatchupPeers() stayed true.
+            ClearCatchupPeer(request.Peer);
+        }
+    }
 }
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs
index 5ad6a98..bc8dced 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs
@@ -51,6 +51,9 @@ internal sealed partial class NatsStream : IDisposable
     private bool _clusterSubsActive;
     private ulong _clseq;
     private ulong _clfs;
+    private readonly Dictionary<ulong, object> _msgTraceBySeq = new(); // NOTE(review): value type inferred from GetAndDeleteMsgTrace returning object? - confirm
+    private Dictionary<string, ulong>? _catchupPeers;
+    private int _catchingUp;
     private readonly Dictionary _sources = new(StringComparer.Ordinal);
     private StreamSourceInfo? _mirrorInfo;
     private Timer? _mirrorConsumerSetupTimer;
diff --git a/porting.db b/porting.db
index 06cc962..3fbeccc 100644
Binary files a/porting.db and b/porting.db differ