batch33 task3 implement group B cluster stream features

This commit is contained in:
Joseph Doherty
2026-02-28 22:54:06 -05:00
parent d6efba6f8a
commit fee37d88bf
6 changed files with 489 additions and 0 deletions

View File

@@ -336,6 +336,29 @@ internal sealed class JetStreamCluster
if (streams.Count == 0)
InflightConsumers.Remove(accountName);
}
internal static (
string[] NewPeers,
string[] OldPeers,
Dictionary<string, bool> NewPeerSet,
Dictionary<string, bool> OldPeerSet) GenPeerInfo(string[] peers, int split)
{
var safeSplit = Math.Clamp(split, 0, peers.Length);
var oldPeers = peers.Take(safeSplit).ToArray();
var newPeers = peers.Skip(safeSplit).ToArray();
var oldSet = oldPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal);
var newSet = newPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal);
return (newPeers, oldPeers, newSet, oldSet);
}
internal static bool IsControlHdr(byte[]? headers)
{
if (headers == null || headers.Length == 0)
return false;
var text = System.Text.Encoding.ASCII.GetString(headers);
return text.Contains(NatsHeaderConstants.JsResponseType, StringComparison.OrdinalIgnoreCase);
}
}
// ============================================================================
@@ -455,6 +478,28 @@ internal sealed class RaftGroup
internal bool IsMember(string id) =>
Peers.Contains(id, StringComparer.Ordinal);
internal void SetPreferred(NatsServer server)
{
if (Peers.Length == 0)
return;
if (Peers.Length == 1)
{
Preferred = Peers[0];
return;
}
var online = new List<string>(Peers.Length);
foreach (var peer in Peers)
{
var info = server.GetNodeInfo(peer);
if (info is { Offline: false })
online.Add(peer);
}
var candidates = online.Count > 0 ? online : [.. Peers];
Preferred = candidates[Random.Shared.Next(candidates.Count)];
}
}
// ============================================================================

View File

@@ -1212,6 +1212,215 @@ internal sealed class JetStreamEngine(JetStream state)
}
return false;
}
internal (bool IsRecovering, bool DidSnapshot, Exception? Error) ApplyMetaEntries(
IReadOnlyList<Entry> entries,
RecoveryUpdates? updates)
{
var recovering = updates != null;
var didSnapshot = false;
try
{
foreach (var entry in entries)
{
if (entry.Type == EntryType.EntryCatchup)
{
recovering = true;
break;
}
if (entry.Type == EntryType.EntrySnapshot)
{
var error = ApplyMetaSnapshot(entry.Data, updates, recovering);
if (error != null)
return (recovering, didSnapshot, error);
didSnapshot = true;
continue;
}
if (entry.Type == EntryType.EntryRemovePeer)
{
ProcessRemovePeer(System.Text.Encoding.ASCII.GetString(entry.Data));
continue;
}
if (entry.Type == EntryType.EntryAddPeer)
{
ProcessAddPeer(System.Text.Encoding.ASCII.GetString(entry.Data));
continue;
}
if (entry.Type != EntryType.EntryNormal || entry.Data.Length == 0)
continue;
var op = (EntryOp)entry.Data[0];
var payload = entry.Data.Length > 1 ? entry.Data.AsSpan(1).ToArray() : [];
switch (op)
{
case EntryOp.AssignStreamOp:
{
var sa = System.Text.Json.JsonSerializer.Deserialize<StreamAssignment>(payload);
if (sa != null)
SetStreamAssignmentRecovering(sa);
break;
}
case EntryOp.AssignConsumerOp:
case EntryOp.AssignCompressedConsumerOp:
{
var ca = System.Text.Json.JsonSerializer.Deserialize<ConsumerAssignment>(payload);
if (ca != null)
SetConsumerAssignmentRecovering(ca);
break;
}
}
}
return (recovering, didSnapshot, null);
}
catch (Exception ex)
{
return (recovering, didSnapshot, ex);
}
}
internal (IRaftNode? Node, Exception? Error) CreateRaftGroup(
string accountName,
RaftGroup group,
bool recovering,
StorageType storage)
{
var server = Server();
if (server == null)
return (null, new InvalidOperationException("jetstream server unavailable"));
if (group.Peers.Length <= 1)
return (null, null);
var meta = GetMetaGroup();
if (meta == null || !group.IsMember(meta.ID()))
return (null, null);
var existing = server.LookupRaftNode(group.Name);
if (existing != null && existing.State() != RaftState.Closed)
{
group.Node = existing;
return (existing, null);
}
group.SetPreferred(server);
var config = new RaftConfig
{
Name = group.Name,
Recovering = recovering,
ScaleUp = group.ScaleUp,
Store = Path.Combine(_state.Config.StoreDir, accountName, "_js_", group.Name),
Log = null,
};
var (node, error) = server.StartRaftNode(accountName, config);
if (error != null || node == null)
return (null, error);
group.Node = node;
if (!string.IsNullOrEmpty(group.Preferred) && node.ID() == group.Preferred && node.Term() == 0)
node.CampaignImmediately();
_ = storage;
return (node, null);
}
internal static (
string[] NewPeers,
string[] OldPeers,
Dictionary<string, bool> NewPeerSet,
Dictionary<string, bool> OldPeerSet) GenPeerInfo(string[] peers, int split)
{
var safeSplit = Math.Clamp(split, 0, peers.Length);
var oldPeers = peers.Take(safeSplit).ToArray();
var newPeers = peers.Skip(safeSplit).ToArray();
var oldSet = oldPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal);
var newSet = newPeers.ToDictionary(p => p, _ => true, StringComparer.Ordinal);
return (newPeers, oldPeers, newSet, oldSet);
}
internal void MonitorStream(NatsStream? stream, StreamAssignment assignment, bool sendSnapshot)
{
var node = assignment.Group?.Node;
if (node == null)
return;
var isLeader = node.Leader();
ProcessStreamLeaderChange(stream, isLeader);
if (sendSnapshot && stream != null && isLeader)
{
var state = stream.State();
var snap = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(state);
node.SendSnapshot(snap);
}
}
internal static bool IsControlHdr(byte[]? headers)
{
if (headers == null || headers.Length == 0)
return false;
var text = System.Text.Encoding.ASCII.GetString(headers);
return text.Contains(NatsHeaderConstants.JsResponseType, StringComparison.OrdinalIgnoreCase);
}
internal (ulong MaxApplied, Exception? Error) ApplyStreamEntries(
NatsStream? stream,
CommittedEntry committed,
bool isRecovering)
{
try
{
if (stream == null)
return (0, null);
ulong maxApplied = 0;
foreach (var entry in committed.Entries)
{
if (entry.Type != EntryType.EntryNormal || entry.Data.Length == 0)
continue;
var op = (EntryOp)entry.Data[0];
if (op == EntryOp.StreamMsgOp || op == EntryOp.CompressedStreamMsgOp)
{
var error = ApplyStreamMsgOp(stream, entry, isRecovering);
if (error != null)
return (maxApplied, error);
maxApplied = Math.Max(maxApplied, committed.Index);
}
}
return (maxApplied, null);
}
catch (Exception ex)
{
return (0, ex);
}
}
internal Exception? ApplyStreamMsgOp(NatsStream stream, Entry entry, bool isRecovering)
{
if (isRecovering && stream.SkipBatchIfRecovering())
return null;
var payloadLength = Math.Max(0, entry.Data.Length - 1);
Interlocked.Increment(ref stream.Msgs);
Interlocked.Add(ref stream.Bytes, payloadLength);
return null;
}
internal void ProcessStreamLeaderChange(NatsStream? stream, bool isLeader)
{
if (stream == null)
return;
stream.SetLeader(isLeader, term: 0);
if (!isLeader)
stream.ResetClusteredState();
}
}
internal sealed class StreamAssignmentView

View File

@@ -45,6 +45,9 @@ internal sealed class NatsStream : IDisposable
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
private object? _node;
private StreamAssignment? _assignment;
private bool _migrating;
private bool _recovering;
public NatsStream(Account account, StreamConfig config, DateTime created)
{
@@ -79,6 +82,7 @@ internal sealed class NatsStream : IDisposable
{
Store = store,
IsMirror = cfg.Mirror != null,
_assignment = sa,
};
return stream;
}
@@ -317,6 +321,97 @@ internal sealed class NatsStream : IDisposable
finally { _mu.ExitReadLock(); }
}
public RaftGroup? RaftGroup()
{
_mu.EnterReadLock();
try { return _assignment?.Group; }
finally { _mu.ExitReadLock(); }
}
public IRaftNode? RaftNode()
{
_mu.EnterReadLock();
try { return _node as IRaftNode; }
finally { _mu.ExitReadLock(); }
}
public void RemoveNode()
{
_mu.EnterWriteLock();
try
{
if (_node is IRaftNode raft)
raft.Delete();
_node = null;
}
finally
{
_mu.ExitWriteLock();
}
}
public void WaitOnConsumerAssignments(CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
return;
var stopAt = DateTime.UtcNow.AddSeconds(2);
while (DateTime.UtcNow < stopAt)
{
cancellationToken.ThrowIfCancellationRequested();
if (!_recovering)
break;
Thread.Sleep(50);
}
}
public bool IsMigrating()
{
_mu.EnterReadLock();
try { return _migrating; }
finally { _mu.ExitReadLock(); }
}
public bool ResetClusteredState(Exception? cause = null)
{
_mu.EnterWriteLock();
try
{
_recovering = true;
_isLeader = false;
_leaderTerm = 0;
_migrating = false;
if (cause != null && _node is IRaftNode raft)
raft.StepDown();
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
public bool SkipBatchIfRecovering()
{
_mu.EnterReadLock();
try { return _recovering; }
finally { _mu.ExitReadLock(); }
}
public bool ShouldSendLostQuorum()
{
_mu.EnterReadLock();
try
{
var replicas = Math.Max(1, Config.Replicas);
return replicas > 1 && _node is IRaftNode raft && raft.Leaderless();
}
finally
{
_mu.ExitReadLock();
}
}
/// <summary>
/// Seals the stream so that no new messages can be stored.
/// Mirrors <c>stream.seal</c> in server/stream.go.

View File

@@ -0,0 +1,12 @@
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
internal int Replicas(StreamConfig? config)
{
if (config == null)
return 1;
return Math.Max(1, config.Replicas);
}
}