batch33 task2 implement group A cluster stream features

This commit is contained in:
Joseph Doherty
2026-02-28 22:47:53 -05:00
parent 3b9d5079cc
commit d6efba6f8a
4 changed files with 745 additions and 0 deletions

View File

@@ -452,6 +452,9 @@ internal sealed class RaftGroup
/// <summary>Internal Raft node — not serialized.</summary>
[JsonIgnore]
public IRaftNode? Node { get; set; }
internal bool IsMember(string id) =>
Peers.Contains(id, StringComparer.Ordinal);
}
// ============================================================================
@@ -482,6 +485,49 @@ internal sealed class StreamAssignment
[JsonIgnore] public bool Resetting { get; set; }
[JsonIgnore] public Exception? Error { get; set; }
[JsonIgnore] public UnsupportedStreamAssignment? Unsupported { get; set; }
internal StreamAssignment CopyGroup()
{
var clone = new StreamAssignment
{
Client = Client,
Created = Created,
ConfigJson = ConfigJson,
Config = Config,
Group = Group == null
? null
: new RaftGroup
{
Name = Group.Name,
Peers = [.. Group.Peers],
Storage = Group.Storage,
Cluster = Group.Cluster,
Preferred = Group.Preferred,
ScaleUp = Group.ScaleUp,
Node = Group.Node,
},
Sync = Sync,
Subject = Subject,
Reply = Reply,
Restore = Restore,
Consumers = Consumers,
Responded = Responded,
Recovering = Recovering,
Reassigning = Reassigning,
Resetting = Resetting,
Error = Error,
Unsupported = Unsupported,
};
return clone;
}
internal bool MissingPeers() =>
Group != null &&
Config != null &&
Group.Peers.Length < Math.Max(1, Config.Replicas);
internal string RecoveryKey() =>
$"{Client?.ServiceAccount() ?? string.Empty}:{Config?.Name ?? Subject ?? string.Empty}";
}
// ============================================================================
@@ -551,6 +597,45 @@ internal sealed class ConsumerAssignment
[JsonIgnore] public bool Recovering { get; set; }
[JsonIgnore] public Exception? Error { get; set; }
[JsonIgnore] public UnsupportedConsumerAssignment? Unsupported { get; set; }
internal ConsumerAssignment CopyGroup()
{
var clone = new ConsumerAssignment
{
Client = Client,
Created = Created,
Name = Name,
Stream = Stream,
ConfigJson = ConfigJson,
Config = Config,
Group = Group == null
? null
: new RaftGroup
{
Name = Group.Name,
Peers = [.. Group.Peers],
Storage = Group.Storage,
Cluster = Group.Cluster,
Preferred = Group.Preferred,
ScaleUp = Group.ScaleUp,
Node = Group.Node,
},
Subject = Subject,
Reply = Reply,
State = State,
Responded = Responded,
Recovering = Recovering,
Error = Error,
Unsupported = Unsupported,
};
return clone;
}
internal string StreamRecoveryKey() =>
$"{Client?.ServiceAccount() ?? string.Empty}:{Stream}";
internal string RecoveryKey() =>
$"{Client?.ServiceAccount() ?? string.Empty}:{Stream}:{Name}";
}
// ============================================================================

View File

@@ -727,6 +727,491 @@ internal sealed class JetStreamEngine(JetStream state)
!string.IsNullOrWhiteSpace(value) &&
value.Length >= 10 &&
value.StartsWith("A", StringComparison.Ordinal);
// -------------------------------------------------------------------------
// JetStream cluster stream methods (Batch 33 Group A)
// -------------------------------------------------------------------------
internal void MonitorCluster()
{
var meta = GetMetaGroup();
if (meta == null)
return;
SetMetaRecovering();
try
{
CheckClusterSize();
var (_, _, _, error) = MetaSnapshot();
if (error != null)
Server()?.Warnf("JetStream meta snapshot failed in MonitorCluster: {0}", error.Message);
}
finally
{
ClearMetaRecovering();
ClusterStoppedC()?.Writer.TryWrite(true);
}
}
internal void CheckClusterSize()
{
var server = Server();
var meta = GetMetaGroup();
if (server == null || meta == null)
return;
var activePeers = server.ActivePeers();
if (activePeers.Count == 0)
return;
var jsPeers = 0;
foreach (var peer in activePeers)
{
var info = server.GetNodeInfo(peer);
if (info?.Js == true)
jsPeers++;
}
if (jsPeers > 0 && jsPeers < meta.ClusterSize())
meta.AdjustClusterSize(jsPeers);
}
internal (StreamConfig Config, bool Ok) ClusterStreamConfig(string accountName, string streamName)
{
_state.Lock.EnterReadLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return (new StreamConfig(), false);
if (!cluster.Streams.TryGetValue(accountName, out var accountStreams))
return (new StreamConfig(), false);
if (!accountStreams.TryGetValue(streamName, out var assignment))
return (new StreamConfig(), false);
if (assignment.Config == null)
return (new StreamConfig(), false);
return (assignment.Config.Clone(), true);
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) MetaSnapshot()
{
_state.Lock.EnterReadLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return ([], 0, 0, null);
return EncodeMetaSnapshot(cluster.Streams);
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal Exception? ApplyMetaSnapshot(byte[] buffer, RecoveryUpdates? updates, bool isRecovering)
{
var (decoded, error) = DecodeMetaSnapshot(buffer);
if (error != null || decoded == null)
return error;
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return null;
foreach (var accountStreams in decoded.Values)
{
foreach (var assignment in accountStreams.Values)
{
SetStreamAssignmentRecovering(assignment);
if (assignment.Consumers == null)
continue;
foreach (var consumer in assignment.Consumers.Values)
{
SetConsumerAssignmentRecovering(consumer);
}
}
}
cluster.Streams = decoded;
if (!isRecovering && updates != null)
{
updates.AddStreams.Clear();
updates.UpdateStreams.Clear();
updates.RemoveStreams.Clear();
updates.UpdateConsumers.Clear();
updates.RemoveConsumers.Clear();
}
return null;
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal (Dictionary<string, Dictionary<string, StreamAssignment>>? Streams, Exception? Error) DecodeMetaSnapshot(byte[] buffer)
{
if (buffer.Length == 0)
{
return (
new Dictionary<string, Dictionary<string, StreamAssignment>>(StringComparer.Ordinal),
null
);
}
try
{
var writeable = System.Text.Json.JsonSerializer.Deserialize<List<WriteableStreamAssignment>>(buffer) ?? [];
var streams = new Dictionary<string, Dictionary<string, StreamAssignment>>(StringComparer.Ordinal);
foreach (var item in writeable)
{
var accountName = item.Client?.ServiceAccount() ?? string.Empty;
if (!streams.TryGetValue(accountName, out var accountStreams))
{
accountStreams = new Dictionary<string, StreamAssignment>(StringComparer.Ordinal);
streams[accountName] = accountStreams;
}
var assignment = new StreamAssignment
{
Client = item.Client,
Created = item.Created,
ConfigJson = item.ConfigJson,
Group = item.Group,
Sync = item.Sync,
Config = item.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined
? null
: System.Text.Json.JsonSerializer.Deserialize<StreamConfig>(item.ConfigJson.GetRawText()),
Consumers = new Dictionary<string, ConsumerAssignment>(StringComparer.Ordinal),
};
if (item.Consumers.Count > 0)
{
foreach (var consumer in item.Consumers)
{
var streamName = !string.IsNullOrWhiteSpace(consumer.Stream)
? consumer.Stream
: assignment.Config?.Name ?? string.Empty;
var assignmentConsumer = new ConsumerAssignment
{
Client = consumer.Client,
Created = consumer.Created,
Name = consumer.Name,
Stream = streamName,
ConfigJson = consumer.ConfigJson,
Group = consumer.Group,
State = consumer.State,
Config = consumer.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined
? null
: System.Text.Json.JsonSerializer.Deserialize<ConsumerConfig>(consumer.ConfigJson.GetRawText()),
};
assignment.Consumers[assignmentConsumer.Name] = assignmentConsumer;
}
}
var streamKey = assignment.Config?.Name ?? string.Empty;
accountStreams[streamKey] = assignment;
}
return (streams, null);
}
catch (Exception ex)
{
return (null, ex);
}
}
internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) EncodeMetaSnapshot(
Dictionary<string, Dictionary<string, StreamAssignment>> streams)
{
try
{
var streamCount = 0;
var consumerCount = 0;
var writeable = new List<WriteableStreamAssignment>();
foreach (var accountStreams in streams.Values)
{
foreach (var assignment in accountStreams.Values)
{
streamCount++;
var configJson = assignment.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined &&
assignment.Config != null
? System.Text.Json.JsonSerializer.SerializeToElement(assignment.Config)
: assignment.ConfigJson;
var ws = new WriteableStreamAssignment
{
Client = assignment.Client,
Created = assignment.Created,
ConfigJson = configJson,
Group = assignment.Group,
Sync = assignment.Sync,
};
if (assignment.Consumers != null)
{
foreach (var consumer in assignment.Consumers.Values)
{
consumerCount++;
var consumerConfigJson =
consumer.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined &&
consumer.Config != null
? System.Text.Json.JsonSerializer.SerializeToElement(consumer.Config)
: consumer.ConfigJson;
ws.Consumers.Add(new WriteableConsumerAssignment
{
Client = consumer.Client,
Created = consumer.Created,
Name = consumer.Name,
Stream = consumer.Stream,
ConfigJson = consumerConfigJson,
Group = consumer.Group,
State = consumer.State,
});
}
}
writeable.Add(ws);
}
}
if (writeable.Count == 0)
return ([], 0, 0, null);
var snapshot = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(writeable);
return (snapshot, streamCount, consumerCount, null);
}
catch (Exception ex)
{
return ([], 0, 0, ex);
}
}
internal Exception? CollectStreamAndConsumerChanges(
IRaftNodeCheckpoint checkpoint,
Dictionary<string, Dictionary<string, StreamAssignment>> streams)
{
try
{
foreach (var (appendEntry, error) in checkpoint.AppendEntriesSeq())
{
if (error != null)
return error;
foreach (var entry in appendEntry.Entries)
{
if (entry.Type != EntryType.EntryNormal || entry.Data.Length == 0)
continue;
var op = (EntryOp)entry.Data[0];
var payload = entry.Data.Length > 1 ? entry.Data.AsSpan(1).ToArray() : [];
switch (op)
{
case EntryOp.AssignStreamOp:
case EntryOp.UpdateStreamOp:
{
var sa = System.Text.Json.JsonSerializer.Deserialize<StreamAssignment>(payload);
if (sa?.Client == null || sa.Config == null)
break;
var account = sa.Client.ServiceAccount();
if (!streams.TryGetValue(account, out var accountStreams))
{
accountStreams = new Dictionary<string, StreamAssignment>(StringComparer.Ordinal);
streams[account] = accountStreams;
}
accountStreams[sa.Config.Name] = sa;
break;
}
case EntryOp.RemoveStreamOp:
{
var sa = System.Text.Json.JsonSerializer.Deserialize<StreamAssignment>(payload);
if (sa?.Client == null || sa.Config == null)
break;
if (streams.TryGetValue(sa.Client.ServiceAccount(), out var accountStreams))
{
accountStreams.Remove(sa.Config.Name);
}
break;
}
case EntryOp.AssignConsumerOp:
case EntryOp.AssignCompressedConsumerOp:
{
var ca = System.Text.Json.JsonSerializer.Deserialize<ConsumerAssignment>(payload);
if (ca?.Client == null || string.IsNullOrWhiteSpace(ca.Stream))
break;
var account = ca.Client.ServiceAccount();
if (!streams.TryGetValue(account, out var accountStreams))
break;
if (!accountStreams.TryGetValue(ca.Stream, out var streamAssignment))
break;
streamAssignment.Consumers ??= new Dictionary<string, ConsumerAssignment>(StringComparer.Ordinal);
streamAssignment.Consumers[ca.Name] = ca;
break;
}
case EntryOp.RemoveConsumerOp:
{
var ca = System.Text.Json.JsonSerializer.Deserialize<ConsumerAssignment>(payload);
if (ca?.Client == null || string.IsNullOrWhiteSpace(ca.Stream))
break;
var account = ca.Client.ServiceAccount();
if (!streams.TryGetValue(account, out var accountStreams))
break;
if (!accountStreams.TryGetValue(ca.Stream, out var streamAssignment))
break;
streamAssignment.Consumers?.Remove(ca.Name);
break;
}
}
}
}
return null;
}
catch (Exception ex)
{
return ex;
}
}
internal void SetStreamAssignmentRecovering(StreamAssignment assignment)
{
assignment.Responded = true;
assignment.Recovering = true;
assignment.Restore = null;
if (assignment.Group != null)
{
assignment.Group.Preferred = string.Empty;
assignment.Group.ScaleUp = false;
}
}
internal void SetConsumerAssignmentRecovering(ConsumerAssignment assignment)
{
assignment.Responded = true;
assignment.Recovering = true;
if (assignment.Group != null)
{
assignment.Group.Preferred = string.Empty;
assignment.Group.ScaleUp = false;
}
}
internal void ProcessAddPeer(string peer)
{
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return;
if (!cluster.IsLeader())
return;
foreach (var accountStreams in cluster.Streams.Values)
{
foreach (var assignment in accountStreams.Values)
{
if (!assignment.MissingPeers())
continue;
var copy = assignment.CopyGroup();
copy.Group!.Peers = [.. copy.Group.Peers, peer];
assignment.Group = copy.Group;
if (assignment.Consumers == null)
continue;
foreach (var consumer in assignment.Consumers.Values)
{
if (consumer.Config?.Durable is { Length: > 0 } || (consumer.Group?.Peers.Length ?? 0) > 1)
consumer.Group!.Peers = assignment.Group.Peers;
}
}
}
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal void ProcessRemovePeer(string peer)
{
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return;
foreach (var accountStreams in cluster.Streams.Values)
{
foreach (var assignment in accountStreams.Values)
{
if (assignment.Group == null || !assignment.Group.IsMember(peer))
continue;
RemovePeerFromStreamLocked(assignment, peer);
}
}
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal bool RemovePeerFromStream(StreamAssignment assignment, string peer)
{
_state.Lock.EnterWriteLock();
try
{
return RemovePeerFromStreamLocked(assignment, peer);
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal bool RemovePeerFromStreamLocked(StreamAssignment assignment, string peer)
{
if (assignment.Group == null || !assignment.Group.IsMember(peer))
return false;
assignment.Group.Peers = [.. assignment.Group.Peers.Where(p => !string.Equals(p, peer, StringComparison.Ordinal))];
assignment.Group.Preferred = string.Empty;
if (assignment.Consumers == null)
return true;
foreach (var consumer in assignment.Consumers.Values)
{
if (consumer.Group == null)
continue;
consumer.Group.Peers = [.. consumer.Group.Peers.Where(p => !string.Equals(p, peer, StringComparison.Ordinal))];
consumer.Group.Preferred = string.Empty;
}
return true;
}
internal bool HasPeerEntries(IEnumerable<Entry> entries)
{
foreach (var entry in entries)
{
if (entry.Type == EntryType.EntryAddPeer || entry.Type == EntryType.EntryRemovePeer)
return true;
}
return false;
}
}
internal sealed class StreamAssignmentView