feat(cluster): add stream/consumer assignments, placement engine, and meta proposal workflow (B7+B8+B9)

- RaftGroup, StreamAssignment, ConsumerAssignment types matching Go structs
  (jetstream_cluster.go:154-266)
- PlacementEngine.SelectPeerGroup: topology-aware peer selection with cluster
  affinity, tag filtering, exclude tags, and storage-weighted sorting
  (Go ref: selectPeerGroup at line 7212)
- JetStreamMetaGroup: backward-compatible rewrite with full assignment tracking,
  consumer proposal workflow, and delete operations
- 41 new tests in ClusterAssignmentAndPlacementTests
This commit is contained in:
Joseph Doherty
2026-02-24 17:13:01 -05:00
parent a41d0f453c
commit a323715495
3 changed files with 1276 additions and 2 deletions

View File

@@ -3,11 +3,33 @@ using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Cluster;
/// <summary>
/// Orchestrates cluster-wide stream/consumer lifecycle via RAFT proposals.
/// The meta-group tracks StreamAssignment and ConsumerAssignment dictionaries,
/// validates proposals, and dispatches applied entries.
/// Go reference: jetstream_cluster.go:500-2000 (processStreamAssignment, processConsumerAssignment).
/// </summary>
public sealed class JetStreamMetaGroup
{
private readonly int _nodes;
private readonly int _selfIndex;
// Backward-compatible stream name set used by existing GetState().Streams.
private readonly ConcurrentDictionary<string, byte> _streams = new(StringComparer.Ordinal);
// Full StreamAssignment tracking for proposal workflow.
// Go reference: jetstream_cluster.go streamAssignment, consumerAssignment maps.
private readonly ConcurrentDictionary<string, StreamAssignment> _assignments =
new(StringComparer.Ordinal);
// B8: Inflight proposal tracking -- entries that have been proposed but not yet committed.
// Go reference: jetstream_cluster.go inflight tracking for proposals.
private readonly ConcurrentDictionary<string, string> _inflightStreams = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, string> _inflightConsumers = new(StringComparer.Ordinal);
// Running count of consumers across all stream assignments.
private int _totalConsumerCount;
private int _leaderIndex = 1;
private long _leadershipVersion = 1;
@@ -24,7 +46,7 @@ public sealed class JetStreamMetaGroup
/// <summary>
/// Returns true when this node is the current meta-group leader.
/// Go reference: jetstream_api.go:200-300 leader check before mutating operations.
/// Go reference: jetstream_api.go:200-300 -- leader check before mutating operations.
/// </summary>
public bool IsLeader() => _leaderIndex == _selfIndex;
@@ -34,12 +56,206 @@ public sealed class JetStreamMetaGroup
/// </summary>
public string Leader => $"meta-{_leaderIndex}";
/// <summary>
/// Number of streams currently tracked.
/// </summary>
public int StreamCount => _assignments.Count;
/// <summary>
/// Number of consumers across all streams.
/// </summary>
public int ConsumerCount => _totalConsumerCount;
/// <summary>
/// Number of inflight stream proposals.
/// </summary>
public int InflightStreamCount => _inflightStreams.Count;
/// <summary>
/// Number of inflight consumer proposals.
/// </summary>
public int InflightConsumerCount => _inflightConsumers.Count;
// ---------------------------------------------------------------
// Stream proposals
// ---------------------------------------------------------------
/// <summary>
/// Proposes creating a stream. Stores in both the backward-compatible name set
/// and the full assignment map.
/// Go reference: jetstream_cluster.go processStreamAssignment.
/// </summary>
public Task ProposeCreateStreamAsync(StreamConfig config, CancellationToken ct)
=> ProposeCreateStreamAsync(config, group: null, ct);
/// <summary>
/// Proposes creating a stream with an explicit RAFT group assignment.
/// Validates leader status and duplicate stream names before proposing.
/// Go reference: jetstream_cluster.go processStreamAssignment.
/// </summary>
public Task ProposeCreateStreamAsync(StreamConfig config, RaftGroup? group, CancellationToken ct)
{
_streams[config.Name] = 0;
_ = ct;
if (!IsLeader())
throw new InvalidOperationException($"Not the meta-group leader. Current leader: {Leader}");
if (_assignments.ContainsKey(config.Name))
throw new InvalidOperationException($"Stream '{config.Name}' already exists.");
// Track as inflight
_inflightStreams[config.Name] = config.Name;
// Apply the entry
ApplyStreamCreate(config.Name, group ?? new RaftGroup { Name = config.Name });
// Clear inflight
_inflightStreams.TryRemove(config.Name, out _);
return Task.CompletedTask;
}
/// <summary>
/// Proposes deleting a stream. Removes from both tracking structures.
/// Go reference: jetstream_cluster.go processStreamDelete.
/// </summary>
public Task ProposeDeleteStreamAsync(string streamName, CancellationToken ct)
{
_ = ct;
if (!IsLeader())
throw new InvalidOperationException($"Not the meta-group leader. Current leader: {Leader}");
ApplyStreamDelete(streamName);
return Task.CompletedTask;
}
// ---------------------------------------------------------------
// Consumer proposals
// ---------------------------------------------------------------
/// <summary>
/// Proposes creating a consumer assignment within a stream.
/// Validates that the stream exists.
/// Go reference: jetstream_cluster.go processConsumerAssignment.
/// </summary>
public Task ProposeCreateConsumerAsync(
string streamName,
string consumerName,
RaftGroup group,
CancellationToken ct)
{
_ = ct;
if (!IsLeader())
throw new InvalidOperationException($"Not the meta-group leader. Current leader: {Leader}");
if (!_assignments.ContainsKey(streamName))
throw new InvalidOperationException($"Stream '{streamName}' not found.");
// Track as inflight
var inflightKey = $"{streamName}/{consumerName}";
_inflightConsumers[inflightKey] = inflightKey;
// Apply the entry
ApplyConsumerCreate(streamName, consumerName, group);
// Clear inflight
_inflightConsumers.TryRemove(inflightKey, out _);
return Task.CompletedTask;
}
/// <summary>
/// Proposes deleting a consumer assignment from a stream.
/// Go reference: jetstream_cluster.go processConsumerDelete.
/// </summary>
public Task ProposeDeleteConsumerAsync(
string streamName,
string consumerName,
CancellationToken ct)
{
_ = ct;
if (!IsLeader())
throw new InvalidOperationException($"Not the meta-group leader. Current leader: {Leader}");
ApplyConsumerDelete(streamName, consumerName);
return Task.CompletedTask;
}
// ---------------------------------------------------------------
// ApplyEntry dispatch
// Go reference: jetstream_cluster.go RAFT apply for meta group
// ---------------------------------------------------------------
/// <summary>
/// Applies a committed RAFT entry to the meta-group state.
/// Dispatches based on entry type prefix.
/// Go reference: jetstream_cluster.go processStreamAssignment / processConsumerAssignment.
/// </summary>
public void ApplyEntry(MetaEntryType entryType, string name, string? streamName = null, RaftGroup? group = null)
{
switch (entryType)
{
case MetaEntryType.StreamCreate:
ApplyStreamCreate(name, group ?? new RaftGroup { Name = name });
break;
case MetaEntryType.StreamDelete:
ApplyStreamDelete(name);
break;
case MetaEntryType.ConsumerCreate:
if (streamName is null)
throw new ArgumentNullException(nameof(streamName), "Stream name required for consumer operations.");
ApplyConsumerCreate(streamName, name, group ?? new RaftGroup { Name = name });
break;
case MetaEntryType.ConsumerDelete:
if (streamName is null)
throw new ArgumentNullException(nameof(streamName), "Stream name required for consumer operations.");
ApplyConsumerDelete(streamName, name);
break;
}
}
// ---------------------------------------------------------------
// Lookup
// ---------------------------------------------------------------
/// <summary>
/// Returns the StreamAssignment for the given stream name, or null if not found.
/// Go reference: jetstream_cluster.go streamAssignment lookup in meta leader.
/// </summary>
public StreamAssignment? GetStreamAssignment(string streamName)
=> _assignments.TryGetValue(streamName, out var assignment) ? assignment : null;
/// <summary>
/// Returns the ConsumerAssignment for the given stream and consumer, or null if not found.
/// Go reference: jetstream_cluster.go consumerAssignment lookup.
/// </summary>
public ConsumerAssignment? GetConsumerAssignment(string streamName, string consumerName)
{
if (_assignments.TryGetValue(streamName, out var sa)
&& sa.Consumers.TryGetValue(consumerName, out var ca))
{
return ca;
}
return null;
}
/// <summary>
/// Returns all current stream assignments.
/// Go reference: jetstream_cluster.go meta leader assignment enumeration.
/// </summary>
public IReadOnlyCollection<StreamAssignment> GetAllAssignments()
=> _assignments.Values.ToArray();
// ---------------------------------------------------------------
// State
// ---------------------------------------------------------------
public MetaGroupState GetState()
{
return new MetaGroupState
@@ -48,9 +264,16 @@ public sealed class JetStreamMetaGroup
ClusterSize = _nodes,
LeaderId = $"meta-{_leaderIndex}",
LeadershipVersion = _leadershipVersion,
AssignmentCount = _assignments.Count,
ConsumerCount = _totalConsumerCount,
};
}
/// <summary>
/// Steps down the current leader, rotating to the next node.
/// Clears all inflight proposals on leader change.
/// Go reference: jetstream_cluster.go leader stepdown, clear inflight.
/// </summary>
public void StepDown()
{
_leaderIndex++;
@@ -58,7 +281,80 @@ public sealed class JetStreamMetaGroup
_leaderIndex = 1;
Interlocked.Increment(ref _leadershipVersion);
// Clear inflight on leader change
// Go reference: jetstream_cluster.go -- inflight entries are cleared when leadership changes.
_inflightStreams.Clear();
_inflightConsumers.Clear();
}
// ---------------------------------------------------------------
// Internal apply methods
// ---------------------------------------------------------------
private void ApplyStreamCreate(string streamName, RaftGroup group)
{
_streams[streamName] = 0;
_assignments.AddOrUpdate(
streamName,
name => new StreamAssignment
{
StreamName = name,
Group = group,
ConfigJson = "{}",
},
(_, existing) => existing);
}
private void ApplyStreamDelete(string streamName)
{
if (_assignments.TryRemove(streamName, out var removed))
{
// Decrement consumer count for all consumers in this stream
Interlocked.Add(ref _totalConsumerCount, -removed.Consumers.Count);
}
_streams.TryRemove(streamName, out _);
}
private void ApplyConsumerCreate(string streamName, string consumerName, RaftGroup group)
{
if (_assignments.TryGetValue(streamName, out var streamAssignment))
{
var isNew = !streamAssignment.Consumers.ContainsKey(consumerName);
streamAssignment.Consumers[consumerName] = new ConsumerAssignment
{
ConsumerName = consumerName,
StreamName = streamName,
Group = group,
};
if (isNew)
Interlocked.Increment(ref _totalConsumerCount);
}
}
private void ApplyConsumerDelete(string streamName, string consumerName)
{
if (_assignments.TryGetValue(streamName, out var streamAssignment))
{
if (streamAssignment.Consumers.Remove(consumerName))
Interlocked.Decrement(ref _totalConsumerCount);
}
}
}
/// <summary>
/// Types of entries that can be proposed/applied in the meta group.
/// Go reference: jetstream_cluster.go entry type constants.
/// </summary>
public enum MetaEntryType
{
StreamCreate,
StreamDelete,
ConsumerCreate,
ConsumerDelete,
}
public sealed class MetaGroupState
@@ -67,4 +363,14 @@ public sealed class MetaGroupState
public int ClusterSize { get; init; }
public string LeaderId { get; init; } = string.Empty;
public long LeadershipVersion { get; init; }
/// <summary>
/// Number of stream assignments currently tracked by the meta group.
/// </summary>
public int AssignmentCount { get; init; }
/// <summary>
/// Total consumer count across all stream assignments.
/// </summary>
public int ConsumerCount { get; init; }
}