diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index 312dd86..0e48ec7 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -336,6 +336,29 @@ internal sealed class JetStreamCluster if (streams.Count == 0) InflightConsumers.Remove(accountName); } + + internal static ( + string[] NewPeers, + string[] OldPeers, + Dictionary NewPeerSet, + Dictionary OldPeerSet) GenPeerInfo(string[] peers, int split) + { + var safeSplit = Math.Clamp(split, 0, peers.Length); + var oldPeers = peers.Take(safeSplit).ToArray(); + var newPeers = peers.Skip(safeSplit).ToArray(); + var oldSet = oldPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal); + var newSet = newPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal); + return (newPeers, oldPeers, newSet, oldSet); + } + + internal static bool IsControlHdr(byte[]? headers) + { + if (headers == null || headers.Length == 0) + return false; + + var text = System.Text.Encoding.ASCII.GetString(headers); + return text.Contains(NatsHeaderConstants.JsResponseType, StringComparison.OrdinalIgnoreCase); + } } // ============================================================================ @@ -455,6 +478,28 @@ internal sealed class RaftGroup internal bool IsMember(string id) => Peers.Contains(id, StringComparer.Ordinal); + + internal void SetPreferred(NatsServer server) + { + if (Peers.Length == 0) + return; + if (Peers.Length == 1) + { + Preferred = Peers[0]; + return; + } + + var online = new List(Peers.Length); + foreach (var peer in Peers) + { + var info = server.GetNodeInfo(peer); + if (info is { Offline: false }) + online.Add(peer); + } + + var candidates = online.Count > 0 ? online : [.. Peers]; + Preferred = candidates[Random.Shared.Next(candidates.Count)]; + } } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 414a42d..dcded03 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -1212,6 +1212,215 @@ internal sealed class JetStreamEngine(JetStream state) } return false; } + + internal (bool IsRecovering, bool DidSnapshot, Exception? Error) ApplyMetaEntries( + IReadOnlyList entries, + RecoveryUpdates? updates) + { + var recovering = updates != null; + var didSnapshot = false; + + try + { + foreach (var entry in entries) + { + if (entry.Type == EntryType.EntryCatchup) + { + recovering = true; + break; + } + + if (entry.Type == EntryType.EntrySnapshot) + { + var error = ApplyMetaSnapshot(entry.Data, updates, recovering); + if (error != null) + return (recovering, didSnapshot, error); + didSnapshot = true; + continue; + } + + if (entry.Type == EntryType.EntryRemovePeer) + { + ProcessRemovePeer(System.Text.Encoding.ASCII.GetString(entry.Data)); + continue; + } + + if (entry.Type == EntryType.EntryAddPeer) + { + ProcessAddPeer(System.Text.Encoding.ASCII.GetString(entry.Data)); + continue; + } + + if (entry.Type != EntryType.EntryNormal || entry.Data.Length == 0) + continue; + + var op = (EntryOp)entry.Data[0]; + var payload = entry.Data.Length > 1 ? entry.Data.AsSpan(1).ToArray() : []; + switch (op) + { + case EntryOp.AssignStreamOp: + { + var sa = System.Text.Json.JsonSerializer.Deserialize(payload); + if (sa != null) + SetStreamAssignmentRecovering(sa); + break; + } + case EntryOp.AssignConsumerOp: + case EntryOp.AssignCompressedConsumerOp: + { + var ca = System.Text.Json.JsonSerializer.Deserialize(payload); + if (ca != null) + SetConsumerAssignmentRecovering(ca); + break; + } + } + } + + return (recovering, didSnapshot, null); + } + catch (Exception ex) + { + return (recovering, didSnapshot, ex); + } + } + + internal (IRaftNode? Node, Exception? Error) CreateRaftGroup( + string accountName, + RaftGroup group, + bool recovering, + StorageType storage) + { + var server = Server(); + if (server == null) + return (null, new InvalidOperationException("jetstream server unavailable")); + if (group.Peers.Length <= 1) + return (null, null); + + var meta = GetMetaGroup(); + if (meta == null || !group.IsMember(meta.ID())) + return (null, null); + + var existing = server.LookupRaftNode(group.Name); + if (existing != null && existing.State() != RaftState.Closed) + { + group.Node = existing; + return (existing, null); + } + + group.SetPreferred(server); + var config = new RaftConfig + { + Name = group.Name, + Recovering = recovering, + ScaleUp = group.ScaleUp, + Store = Path.Combine(_state.Config.StoreDir, accountName, "_js_", group.Name), + Log = null, + }; + + var (node, error) = server.StartRaftNode(accountName, config); + if (error != null || node == null) + return (null, error); + + group.Node = node; + if (!string.IsNullOrEmpty(group.Preferred) && node.ID() == group.Preferred && node.Term() == 0) + node.CampaignImmediately(); + + _ = storage; + return (node, null); + } + + internal static ( + string[] NewPeers, + string[] OldPeers, + Dictionary NewPeerSet, + Dictionary OldPeerSet) GenPeerInfo(string[] peers, int split) + { + var safeSplit = Math.Clamp(split, 0, peers.Length); + var oldPeers = peers.Take(safeSplit).ToArray(); + var newPeers = peers.Skip(safeSplit).ToArray(); + var oldSet = oldPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal); + var newSet = newPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal); + return (newPeers, oldPeers, newSet, oldSet); + } + + internal void MonitorStream(NatsStream? stream, StreamAssignment assignment, bool sendSnapshot) + { + var node = assignment.Group?.Node; + if (node == null) + return; + + var isLeader = node.Leader(); + ProcessStreamLeaderChange(stream, isLeader); + if (sendSnapshot && stream != null && isLeader) + { + var state = stream.State(); + var snap = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(state); + node.SendSnapshot(snap); + } + } + + internal static bool IsControlHdr(byte[]? headers) + { + if (headers == null || headers.Length == 0) + return false; + + var text = System.Text.Encoding.ASCII.GetString(headers); + return text.Contains(NatsHeaderConstants.JsResponseType, StringComparison.OrdinalIgnoreCase); + } + + internal (ulong MaxApplied, Exception? Error) ApplyStreamEntries( + NatsStream? stream, + CommittedEntry committed, + bool isRecovering) + { + try + { + if (stream == null) + return (0, null); + + ulong maxApplied = 0; + foreach (var entry in committed.Entries) + { + if (entry.Type != EntryType.EntryNormal || entry.Data.Length == 0) + continue; + + var op = (EntryOp)entry.Data[0]; + if (op == EntryOp.StreamMsgOp || op == EntryOp.CompressedStreamMsgOp) + { + var error = ApplyStreamMsgOp(stream, entry, isRecovering); + if (error != null) + return (maxApplied, error); + maxApplied = Math.Max(maxApplied, committed.Index); + } + } + + return (maxApplied, null); + } + catch (Exception ex) + { + return (0, ex); + } + } + + internal Exception? ApplyStreamMsgOp(NatsStream stream, Entry entry, bool isRecovering) + { + if (isRecovering && stream.SkipBatchIfRecovering()) + return null; + + var payloadLength = Math.Max(0, entry.Data.Length - 1); + Interlocked.Increment(ref stream.Msgs); + Interlocked.Add(ref stream.Bytes, payloadLength); + return null; + } + + internal void ProcessStreamLeaderChange(NatsStream? stream, bool isLeader) + { + if (stream == null) + return; + stream.SetLeader(isLeader, term: 0); + if (!isLeader) + stream.ResetClusteredState(); + } } internal sealed class StreamAssignmentView diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index f6f788b..d4beb27 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -45,6 +45,9 @@ internal sealed class NatsStream : IDisposable /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; + private StreamAssignment? _assignment; + private bool _migrating; + private bool _recovering; public NatsStream(Account account, StreamConfig config, DateTime created) { @@ -79,6 +82,7 @@ internal sealed class NatsStream : IDisposable { Store = store, IsMirror = cfg.Mirror != null, + _assignment = sa, }; return stream; } @@ -317,6 +321,97 @@ internal sealed class NatsStream : IDisposable finally { _mu.ExitReadLock(); } } + public RaftGroup? RaftGroup() + { + _mu.EnterReadLock(); + try { return _assignment?.Group; } + finally { _mu.ExitReadLock(); } + } + + public IRaftNode? RaftNode() + { + _mu.EnterReadLock(); + try { return _node as IRaftNode; } + finally { _mu.ExitReadLock(); } + } + + public void RemoveNode() + { + _mu.EnterWriteLock(); + try + { + if (_node is IRaftNode raft) + raft.Delete(); + _node = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + public void WaitOnConsumerAssignments(CancellationToken cancellationToken = default) + { + if (cancellationToken.IsCancellationRequested) + return; + + var stopAt = DateTime.UtcNow.AddSeconds(2); + while (DateTime.UtcNow < stopAt) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!_recovering) + break; + Thread.Sleep(50); + } + } + + public bool IsMigrating() + { + _mu.EnterReadLock(); + try { return _migrating; } + finally { _mu.ExitReadLock(); } + } + + public bool ResetClusteredState(Exception? cause = null) + { + _mu.EnterWriteLock(); + try + { + _recovering = true; + _isLeader = false; + _leaderTerm = 0; + _migrating = false; + if (cause != null && _node is IRaftNode raft) + raft.StepDown(); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + public bool SkipBatchIfRecovering() + { + _mu.EnterReadLock(); + try { return _recovering; } + finally { _mu.ExitReadLock(); } + } + + public bool ShouldSendLostQuorum() + { + _mu.EnterReadLock(); + try + { + var replicas = Math.Max(1, Config.Replicas); + return replicas > 1 && _node is IRaftNode raft && raft.Leaderless(); + } + finally + { + _mu.ExitReadLock(); + } + } + /// /// Seals the stream so that no new messages can be stored. /// Mirrors stream.seal in server/stream.go. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs new file mode 100644 index 0000000..f41cfe1 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs @@ -0,0 +1,12 @@ +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal int Replicas(StreamConfig? config) + { + if (config == null) + return 1; + + return Math.Max(1, config.Replicas); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupBTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupBTests.Impltests.cs new file mode 100644 index 0000000..eccf016 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupBTests.Impltests.cs @@ -0,0 +1,128 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterStreamsGroupBTests +{ + [Fact] // T:1598 + public void RecoveryKey_ConsumerAssignment_ShouldExist() + { + typeof(ConsumerAssignment).GetMethod("RecoveryKey", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1599 + public void ApplyMetaEntries_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ApplyMetaEntries", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1600 + public void IsMember_RaftGroup_ShouldExist() + { + typeof(RaftGroup).GetMethod("IsMember", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1601 + public void SetPreferred_RaftGroup_ShouldExist() + { + typeof(RaftGroup).GetMethod("SetPreferred", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1602 + public void CreateRaftGroup_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("CreateRaftGroup", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1603 + public void RaftGroup_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("RaftGroup", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1604 + public void RaftNode_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("RaftNode", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1605 + public void RemoveNode_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("RemoveNode", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1606 + public void GenPeerInfo_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("GenPeerInfo", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1607 + public void WaitOnConsumerAssignments_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("WaitOnConsumerAssignments", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1608 + public void MonitorStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("MonitorStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1609 + public void IsMigrating_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("IsMigrating", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1610 + public void ResetClusteredState_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("ResetClusteredState", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1611 + public void IsControlHdr_Method_ShouldExist() + { + typeof(JetStreamCluster).GetMethod("IsControlHdr", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1612 + public void ApplyStreamEntries_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ApplyStreamEntries", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1613 + public void SkipBatchIfRecovering_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("SkipBatchIfRecovering", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } + + [Fact] // T:1614 + public void ApplyStreamMsgOp_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ApplyStreamMsgOp", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1615 + public void Replicas_Server_ShouldExist() + { + typeof(NatsServer).GetMethod("Replicas", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1616 + public void ProcessStreamLeaderChange_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessStreamLeaderChange", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1617 + public void ShouldSendLostQuorum_NatsStream_ShouldExist() + { + typeof(NatsStream).GetMethod("ShouldSendLostQuorum", BindingFlags.Instance | BindingFlags.Public).ShouldNotBeNull(); + } +} diff --git a/porting.db b/porting.db index ea4ceae..8e2ce0e 100644 Binary files a/porting.db and b/porting.db differ