feat(cluster): add stream/consumer assignment processing with validation
Add 5 validated Process* methods to JetStreamMetaGroup for stream and consumer assignment processing: ProcessStreamAssignment, ProcessUpdateStreamAssignment, ProcessStreamRemoval, ProcessConsumerAssignment, and ProcessConsumerRemoval. Each returns a bool indicating success, with validation guards matching Go reference jetstream_cluster.go:4541-5925. Includes 12 new unit tests.
This commit is contained in:
@@ -264,6 +264,101 @@ public sealed class JetStreamMetaGroup
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Validated assignment processing
|
||||
// Go reference: jetstream_cluster.go:4541-5925
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Validates and processes a stream assignment.
|
||||
/// Returns false if the assignment is invalid (empty name, null group).
|
||||
/// Idempotent: duplicate assignments for the same stream name are accepted.
|
||||
/// Go reference: jetstream_cluster.go:4541 processStreamAssignment.
|
||||
/// </summary>
|
||||
public bool ProcessStreamAssignment(StreamAssignment sa)
|
||||
{
|
||||
if (string.IsNullOrEmpty(sa.StreamName) || sa.Group == null)
|
||||
return false;
|
||||
|
||||
AddStreamAssignment(sa);
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Updates an existing stream assignment's configuration.
|
||||
/// Returns false if the stream does not exist.
|
||||
/// Go reference: jetstream_cluster.go processUpdateStreamAssignment.
|
||||
/// </summary>
|
||||
public bool ProcessUpdateStreamAssignment(StreamAssignment sa)
|
||||
{
|
||||
if (!_assignments.TryGetValue(sa.StreamName, out var existing))
|
||||
return false;
|
||||
|
||||
// Update the config while preserving consumers and other state
|
||||
var updated = new StreamAssignment
|
||||
{
|
||||
StreamName = sa.StreamName,
|
||||
Group = sa.Group,
|
||||
ConfigJson = sa.ConfigJson,
|
||||
Created = existing.Created,
|
||||
SyncSubject = existing.SyncSubject,
|
||||
};
|
||||
// Copy consumers from old to new
|
||||
foreach (var (name, ca) in existing.Consumers)
|
||||
updated.Consumers[name] = ca;
|
||||
|
||||
_assignments[sa.StreamName] = updated;
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes a stream and all its consumers.
|
||||
/// Returns false if stream didn't exist. Returns true if removed.
|
||||
/// Go reference: jetstream_cluster.go processStreamRemoval.
|
||||
/// </summary>
|
||||
public bool ProcessStreamRemoval(string streamName)
|
||||
{
|
||||
if (!_assignments.ContainsKey(streamName))
|
||||
return false;
|
||||
|
||||
RemoveStreamAssignment(streamName);
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Validates and processes a consumer assignment.
|
||||
/// Returns false if the parent stream does not exist or consumer name is empty.
|
||||
/// Go reference: jetstream_cluster.go:5300 processConsumerAssignment.
|
||||
/// </summary>
|
||||
public bool ProcessConsumerAssignment(ConsumerAssignment ca)
|
||||
{
|
||||
if (string.IsNullOrEmpty(ca.ConsumerName) || string.IsNullOrEmpty(ca.StreamName))
|
||||
return false;
|
||||
|
||||
if (!_assignments.ContainsKey(ca.StreamName))
|
||||
return false;
|
||||
|
||||
AddConsumerAssignment(ca.StreamName, ca);
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes a consumer assignment.
|
||||
/// Returns false if stream or consumer doesn't exist.
|
||||
/// Go reference: jetstream_cluster.go processConsumerRemoval.
|
||||
/// </summary>
|
||||
public bool ProcessConsumerRemoval(string streamName, string consumerName)
|
||||
{
|
||||
if (!_assignments.TryGetValue(streamName, out var sa))
|
||||
return false;
|
||||
|
||||
if (!sa.Consumers.ContainsKey(consumerName))
|
||||
return false;
|
||||
|
||||
RemoveConsumerAssignment(streamName, consumerName);
|
||||
return true;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Monitor-facing mutation methods
|
||||
// Called by JetStreamClusterMonitor when processing committed RAFT entries.
|
||||
|
||||
Reference in New Issue
Block a user