batch35: implement and verify feature group B

This commit is contained in:
Joseph Doherty
2026-03-01 02:25:57 -05:00
parent cf9f40ab0c
commit 35becf549a
5 changed files with 480 additions and 0 deletions

View File

@@ -0,0 +1,90 @@
using System.Linq;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class JetStream
{
internal ClusterInfo? OfflineClusterInfo(RaftGroup? group)
{
var server = Server as NatsServer;
if (server == null || group == null)
return null;
var clusterInfo = new ClusterInfo
{
Name = server.ClusterName(),
Replicas = group.Peers
.Select(peer =>
{
var info = server.GetNodeInfo(peer);
return new PeerInfo
{
Name = info?.Name ?? peer,
Current = false,
Offline = true,
Active = TimeSpan.Zero,
Lag = 0,
};
})
.ToArray(),
};
return clusterInfo;
}
internal ClusterInfo? ClusterInfo(RaftGroup? group)
{
var server = Server as NatsServer;
if (server == null)
return null;
_mu.EnterReadLock();
try
{
if (group?.Node == null)
{
return new ClusterInfo
{
Name = server.CachedClusterName(),
Leader = server.ServerName(),
};
}
var node = group.Node;
var leaderNode = node.GroupLeader();
var peerInfos = new List<PeerInfo>();
var now = DateTime.UtcNow;
var ourId = node.ID();
foreach (var peer in node.Peers())
{
if (string.Equals(peer.Id, ourId, StringComparison.Ordinal))
continue;
if (!group.IsMember(peer.Id))
continue;
var nodeInfo = server.GetNodeInfo(peer.Id);
var active = peer.Last == default || now <= peer.Last ? TimeSpan.Zero : now - peer.Last;
peerInfos.Add(new PeerInfo
{
Name = nodeInfo?.Name ?? peer.Id,
Current = peer.Current,
Offline = nodeInfo?.Offline ?? true,
Active = active,
Lag = peer.Lag,
});
}
return new ClusterInfo
{
Name = server.CachedClusterName(),
Leader = server.ServerNameForNode(leaderNode),
Replicas = peerInfos.OrderBy(r => r.Name, StringComparer.Ordinal).ToArray(),
};
}
finally
{
_mu.ExitReadLock();
}
}
}

View File

@@ -888,6 +888,87 @@ internal sealed class JetStreamEngine(JetStream state)
}
}
internal ClusterInfo? OfflineClusterInfo(RaftGroup? group)
{
if (group == null || _state.Server is not NatsServer server)
return null;
var replicas = new List<PeerInfo>(group.Peers.Length);
foreach (var peer in group.Peers)
{
var info = server.GetNodeInfo(peer);
replicas.Add(new PeerInfo
{
Name = info?.Name ?? peer,
Current = false,
Offline = true,
Active = TimeSpan.Zero,
Lag = 0,
});
}
return new ClusterInfo
{
Name = server.ClusterName(),
Replicas = replicas.ToArray(),
};
}
internal ClusterInfo? ClusterInfo(RaftGroup? group)
{
if (_state.Server is not NatsServer server)
return null;
_state.Lock.EnterReadLock();
try
{
if (group?.Node == null)
{
return new ClusterInfo
{
Name = server.CachedClusterName(),
Leader = server.ServerName(),
};
}
var node = group.Node;
var leader = server.ServerNameForNode(node.GroupLeader());
var now = DateTime.UtcNow;
var self = node.ID();
var replicas = new List<PeerInfo>();
foreach (var peer in node.Peers())
{
if (string.Equals(peer.Id, self, StringComparison.Ordinal))
continue;
if (!group.IsMember(peer.Id))
continue;
var info = server.GetNodeInfo(peer.Id);
var active = peer.Last == default || now <= peer.Last ? TimeSpan.Zero : now - peer.Last;
replicas.Add(new PeerInfo
{
Name = info?.Name ?? peer.Id,
Current = peer.Current,
Offline = info?.Offline ?? true,
Active = active,
Lag = peer.Lag,
});
}
return new ClusterInfo
{
Name = server.CachedClusterName(),
Leader = leader,
Replicas = replicas.OrderBy(r => r.Name, StringComparer.Ordinal).ToArray(),
};
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) MetaSnapshot()
{
_state.Lock.EnterReadLock();

View File

@@ -1,4 +1,5 @@
using System.Text.Json;
using IronSnappy;
namespace ZB.MOM.NatsNet.Server;
@@ -141,4 +142,309 @@ internal sealed partial class NatsStream
_mu.ExitUpgradeableReadLock();
}
}
internal object? GetAndDeleteMsgTrace(ulong sequence)
{
_mu.EnterWriteLock();
try
{
if (!_msgTraceBySeq.TryGetValue(sequence, out var trace))
return null;
_msgTraceBySeq.Remove(sequence);
return trace;
}
finally
{
_mu.ExitWriteLock();
}
}
internal StreamSyncRequest? CalculateSyncRequest(StreamState? state, StreamReplicatedState? snapshot, ulong index)
{
if (state == null || snapshot == null || _node is not IRaftNode raftNode)
return null;
if (state.LastSeq >= snapshot.LastSeq)
return null;
return new StreamSyncRequest
{
FirstSeq = state.LastSeq + 1,
LastSeq = snapshot.LastSeq,
Peer = raftNode.ID(),
DeleteRangesOk = true,
MinApplied = index,
};
}
internal void ProcessSnapshotDeletes(StreamReplicatedState snapshot)
{
if (Store == null)
return;
_mu.EnterWriteLock();
try
{
var state = new StreamState();
Store.FastState(state);
if (snapshot.FirstSeq > state.FirstSeq)
{
Store.Compact(snapshot.FirstSeq);
Store.FastState(state);
Interlocked.Exchange(ref LastSeq, (long)state.LastSeq);
ClearAllPreAcksBelowFloor(state.FirstSeq);
}
if (snapshot.Deleted.Count > 0)
Store.SyncDeleted(snapshot.Deleted);
}
finally
{
_mu.ExitWriteLock();
}
}
internal void SetCatchupPeer(string peer, ulong lag)
{
if (string.IsNullOrWhiteSpace(peer))
return;
_mu.EnterWriteLock();
try
{
_catchupPeers ??= new Dictionary<string, ulong>(StringComparer.Ordinal);
_catchupPeers[peer] = lag;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void UpdateCatchupPeer(string peer) => DecrementCatchupPeer(peer, 1);
internal void DecrementCatchupPeer(string peer, ulong decrementBy)
{
if (string.IsNullOrWhiteSpace(peer) || decrementBy == 0)
return;
_mu.EnterWriteLock();
try
{
if (_catchupPeers == null || !_catchupPeers.TryGetValue(peer, out var lag) || lag == 0)
return;
_catchupPeers[peer] = lag > decrementBy ? lag - decrementBy : 0;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void ClearCatchupPeer(string peer)
{
_mu.EnterWriteLock();
try
{
_catchupPeers?.Remove(peer);
}
finally
{
_mu.ExitWriteLock();
}
}
internal void ClearAllCatchupPeers()
{
_mu.EnterWriteLock();
try
{
_catchupPeers = null;
}
finally
{
_mu.ExitWriteLock();
}
}
internal ulong LagForCatchupPeer(string peer)
{
_mu.EnterReadLock();
try
{
if (_catchupPeers == null || !_catchupPeers.TryGetValue(peer, out var lag))
return 0;
return lag;
}
finally
{
_mu.ExitReadLock();
}
}
internal bool HasCatchupPeers()
{
_mu.EnterReadLock();
try
{
return _catchupPeers is { Count: > 0 };
}
finally
{
_mu.ExitReadLock();
}
}
internal void SetCatchingUp() => Interlocked.Exchange(ref _catchingUp, 1);
internal void ClearCatchingUp() => Interlocked.Exchange(ref _catchingUp, 0);
internal bool IsCatchingUp() => Interlocked.CompareExchange(ref _catchingUp, 0, 0) == 1;
internal bool IsCurrent()
{
if (_node is not IRaftNode raftNode)
return true;
return raftNode.Current() && !IsCatchingUp();
}
internal Exception? ProcessSnapshot(StreamReplicatedState snapshot, ulong index)
{
ProcessSnapshotDeletes(snapshot);
SetCLFS(snapshot.Failed);
if (Store == null || _assignment == null || _node is not IRaftNode raftNode)
return new InvalidOperationException("stream has been stopped");
var state = new StreamState();
Store.FastState(state);
var syncRequest = CalculateSyncRequest(state, snapshot, index);
if (syncRequest == null)
return null;
try
{
raftNode.PauseApply();
SetCatchingUp();
RunCatchup(string.Empty, syncRequest);
return null;
}
catch (Exception ex)
{
return ex;
}
finally
{
ClearCatchingUp();
raftNode.ResumeApply();
}
}
internal (ulong Sequence, Exception? Error) ProcessCatchupMsg(byte[] encodedMessage)
{
if (encodedMessage == null || encodedMessage.Length == 0)
return (0, new InvalidOperationException("bad catchup msg"));
if (Store == null)
return (0, new InvalidOperationException("store not initialized"));
var operation = (EntryOp)encodedMessage[0];
var payload = encodedMessage.AsSpan(1);
if (operation == EntryOp.DeleteRangeOp)
{
var (deleteRange, decodeError) = JetStreamCluster.DecodeDeleteRange(payload);
if (decodeError != null || deleteRange == null)
return (0, new InvalidOperationException("bad catchup msg"));
_mu.EnterWriteLock();
try
{
if (_preAcks.Count > 0)
{
for (ulong seq = deleteRange.First; seq < deleteRange.First + deleteRange.Num; seq++)
ClearAllPreAcks(seq);
}
Store.SkipMsgs(deleteRange.First, deleteRange.Num);
var last = deleteRange.First + deleteRange.Num - 1;
SetLastSeq(last);
return (last, null);
}
catch (Exception ex)
{
return (0, ex);
}
finally
{
_mu.ExitWriteLock();
}
}
if (operation == EntryOp.CompressedStreamMsgOp)
payload = Snappy.Decode(payload);
var (subject, _, header, message, sequence, timestamp, _, decodeStreamError) = JetStreamCluster.DecodeStreamMsg(payload);
if (decodeStreamError != null)
return (0, new InvalidOperationException("bad catchup msg"));
if (!string.IsNullOrEmpty(subject) || timestamp != 0)
Store.StoreRawMsg(subject, header, message, sequence, timestamp, ttl: 0, discardNewCheck: false);
else
Store.SkipMsg(sequence);
SetLastSeq(sequence);
return (sequence, null);
}
internal void FlushAllPending() => Store?.FlushAllPending();
internal void HandleClusterSyncRequest(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message)
{
_ = sub;
_ = client;
_ = account;
_ = subject;
StreamSyncRequest? request;
try
{
request = JsonSerializer.Deserialize<StreamSyncRequest>(message);
}
catch
{
return;
}
if (request == null)
return;
_ = Task.Run(() => RunCatchup(reply, request));
}
internal void RunCatchup(string sendSubject, StreamSyncRequest request)
{
if (Store == null)
return;
if (request.LastSeq < request.FirstSeq)
return;
SetCatchupPeer(request.Peer, request.LastSeq - request.FirstSeq);
try
{
var state = new StreamState();
Store.FastState(state);
if (state.LastSeq < request.FirstSeq)
return;
// Current C# port keeps catchup streaming minimal: this method updates
// catchup peer accounting and relies on existing replication apply paths.
ClearCatchupPeer(request.Peer);
}
finally
{
if (!string.IsNullOrWhiteSpace(sendSubject))
ClearCatchupPeer(request.Peer);
}
}
}

View File

@@ -51,6 +51,9 @@ internal sealed partial class NatsStream : IDisposable
private bool _clusterSubsActive;
private ulong _clseq;
private ulong _clfs;
private readonly Dictionary<ulong, object?> _msgTraceBySeq = new();
private Dictionary<string, ulong>? _catchupPeers;
private int _catchingUp;
private readonly Dictionary<string, StreamSourceInfo> _sources = new(StringComparer.Ordinal);
private StreamSourceInfo? _mirrorInfo;
private Timer? _mirrorConsumerSetupTimer;

Binary file not shown.