batch33 task4 implement group C cluster stream features

This commit is contained in:
Joseph Doherty
2026-02-28 22:58:12 -05:00
parent fee37d88bf
commit 9ef04cc28a
4 changed files with 406 additions and 0 deletions

View File

@@ -1421,6 +1421,270 @@ internal sealed class JetStreamEngine(JetStream state)
if (!isLeader)
stream.ResetClusteredState();
}
internal StreamAssignment? StreamAssignment(string accountName, string streamName)
{
_state.Lock.EnterReadLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return null;
if (!cluster.Streams.TryGetValue(accountName, out var accountStreams))
return null;
return accountStreams.TryGetValue(streamName, out var assignment) ? assignment : null;
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal StreamAssignment? StreamAssignmentOrInflight(string accountName, string streamName)
{
_state.Lock.EnterReadLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return null;
if (cluster.Streams.TryGetValue(accountName, out var accountStreams) &&
accountStreams.TryGetValue(streamName, out var current))
return current;
if (cluster.InflightStreams.TryGetValue(accountName, out var inflight) &&
inflight.TryGetValue(streamName, out var info))
return info.Assignment;
return null;
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal IEnumerable<StreamAssignment> StreamAssignmentsOrInflightSeq(string accountName)
{
_state.Lock.EnterReadLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return [];
var seen = new HashSet<string>(StringComparer.Ordinal);
var items = new List<StreamAssignment>();
if (cluster.Streams.TryGetValue(accountName, out var accountStreams))
{
foreach (var (streamName, assignment) in accountStreams)
{
seen.Add(streamName);
items.Add(assignment);
}
}
if (cluster.InflightStreams.TryGetValue(accountName, out var inflightStreams))
{
foreach (var (streamName, inflight) in inflightStreams)
{
if (seen.Contains(streamName) || inflight.Assignment == null)
continue;
items.Add(inflight.Assignment);
}
}
return items;
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal IEnumerable<(string Account, StreamAssignment Assignment)> StreamAssignmentsOrInflightSeqAllAccounts()
{
_state.Lock.EnterReadLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return [];
var results = new List<(string Account, StreamAssignment Assignment)>();
foreach (var account in cluster.Streams.Keys.Union(cluster.InflightStreams.Keys, StringComparer.Ordinal))
{
var seen = new HashSet<string>(StringComparer.Ordinal);
if (cluster.Streams.TryGetValue(account, out var accountStreams))
{
foreach (var (streamName, assignment) in accountStreams)
{
seen.Add(streamName);
results.Add((account, assignment));
}
}
if (cluster.InflightStreams.TryGetValue(account, out var inflightStreams))
{
foreach (var (streamName, inflight) in inflightStreams)
{
if (seen.Contains(streamName) || inflight.Assignment == null)
continue;
results.Add((account, inflight.Assignment));
}
}
}
return results;
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal void ProcessStreamAssignment(StreamAssignment assignment)
{
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null)
return;
var accountName = assignment.Client.ServiceAccount();
if (!cluster.Streams.TryGetValue(accountName, out var accountStreams))
{
accountStreams = new Dictionary<string, StreamAssignment>(StringComparer.Ordinal);
cluster.Streams[accountName] = accountStreams;
}
var streamName = assignment.Config?.Name ?? assignment.Subject ?? string.Empty;
assignment.Responded = true;
accountStreams[streamName] = assignment;
cluster.RemoveInflightStreamProposal(accountName, streamName);
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal void ProcessUpdateStreamAssignment(StreamAssignment assignment)
{
ProcessStreamAssignment(assignment);
}
internal void ProcessClusterUpdateStream(StreamAssignment assignment)
{
ProcessUpdateStreamAssignment(assignment);
}
internal void ProcessClusterCreateStream(StreamAssignment assignment)
{
ProcessStreamAssignment(assignment);
}
internal void ProcessStreamRemoval(StreamAssignment assignment)
{
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null)
return;
var accountName = assignment.Client.ServiceAccount();
var streamName = assignment.Config?.Name ?? assignment.Subject ?? string.Empty;
if (cluster.Streams.TryGetValue(accountName, out var accountStreams))
{
accountStreams.Remove(streamName);
if (accountStreams.Count == 0)
cluster.Streams.Remove(accountName);
}
cluster.RemoveInflightStreamProposal(accountName, streamName);
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal void ProcessClusterDeleteStream(StreamAssignment assignment)
{
ProcessStreamRemoval(assignment);
}
internal void ProcessConsumerAssignment(ConsumerAssignment assignment)
{
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null)
return;
var accountName = assignment.Client.ServiceAccount();
if (!cluster.Streams.TryGetValue(accountName, out var accountStreams))
return;
if (!accountStreams.TryGetValue(assignment.Stream, out var streamAssignment))
return;
streamAssignment.Consumers ??= new Dictionary<string, ConsumerAssignment>(StringComparer.Ordinal);
assignment.Responded = true;
streamAssignment.Consumers[assignment.Name] = assignment;
cluster.RemoveInflightConsumerProposal(accountName, assignment.Stream, assignment.Name);
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal void ProcessConsumerRemoval(ConsumerAssignment assignment)
{
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null)
return;
var accountName = assignment.Client.ServiceAccount();
if (cluster.Streams.TryGetValue(accountName, out var accountStreams) &&
accountStreams.TryGetValue(assignment.Stream, out var streamAssignment))
{
streamAssignment.Consumers?.Remove(assignment.Name);
}
cluster.RemoveInflightConsumerProposal(accountName, assignment.Stream, assignment.Name);
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal void ProcessClusterCreateConsumer(ConsumerAssignment assignment)
{
ProcessConsumerAssignment(assignment);
}
internal void ProcessClusterDeleteConsumer(ConsumerAssignment assignment)
{
ProcessConsumerRemoval(assignment);
}
internal ConsumerAssignment? ConsumerAssignment(string accountName, string streamName, string consumerName)
{
_state.Lock.EnterReadLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return null;
if (!cluster.Streams.TryGetValue(accountName, out var accountStreams))
return null;
if (!accountStreams.TryGetValue(streamName, out var streamAssignment))
return null;
if (streamAssignment.Consumers == null)
return null;
return streamAssignment.Consumers.TryGetValue(consumerName, out var assignment) ? assignment : null;
}
finally
{
_state.Lock.ExitReadLock();
}
}
}
internal sealed class StreamAssignmentView

View File

@@ -9,4 +9,30 @@ public sealed partial class NatsServer
return Math.Max(1, config.Replicas);
}
internal void SendStreamLostQuorumAdvisory(Account? account, string stream, string[]? peers = null)
{
_ = account;
_ = peers;
Noticef("JetStream stream lost quorum advisory for stream {0}", stream);
}
internal void SendStreamLeaderElectAdvisory(Account? account, string stream, string leader)
{
_ = account;
Noticef("JetStream stream leader elect advisory for stream {0}, leader {1}", stream, leader);
}
internal bool RemoveStream(Account? account, string streamName)
{
if (account == null)
return false;
var (stream, _) = account.LookupStream(streamName);
if (stream == null)
return false;
stream.Delete();
return true;
}
}