feat(cluster): add JetStreamClusterMonitor for meta RAFT entry processing

Implements the background loop (Go: monitorCluster) that reads RaftLogEntry
items from a channel and dispatches assignStream, removeStream, assignConsumer,
removeConsumer, and snapshot operations to JetStreamMetaGroup.

- Add five public mutation helpers to JetStreamMetaGroup (AddStreamAssignment,
  RemoveStreamAssignment, AddConsumerAssignment, RemoveConsumerAssignment,
  ReplaceAllAssignments) used by the monitor path
- JetStreamClusterMonitor: async loop over ChannelReader<RaftLogEntry>, JSON
  dispatch, ILogger injection with NullLogger default, malformed entries logged
  and skipped rather than aborting the loop
- WaitForProcessedAsync uses Monitor.PulseAll for race-free test synchronisation
  with no Task.Delay — satisfies slopwatch SW003/SW004 rules
- 10 new targeted tests all pass; 1101 cluster regression tests unchanged
This commit is contained in:
Joseph Doherty
2026-02-25 01:54:04 -05:00
parent 9fdc931ff5
commit 7f9ee493b6
3 changed files with 620 additions and 0 deletions

View File

@@ -0,0 +1,211 @@
using System.Text.Json;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Raft;
namespace NATS.Server.JetStream.Cluster;
/// <summary>
/// Background loop consuming meta RAFT entries and dispatching cluster state changes.
/// Reads <see cref="RaftLogEntry"/> items from a channel, parses the JSON command payload,
/// and applies the corresponding mutation to the <see cref="JetStreamMetaGroup"/>.
/// Go reference: jetstream_cluster.go:1455-1825 (monitorCluster).
/// </summary>
/// <remarks>
/// An entry whose payload is not valid JSON, is not a JSON object, or is missing / mistypes a
/// field required by its operation is logged at warning level and skipped; it still counts
/// towards <see cref="ProcessedCount"/>. Exceptions of any other type (for example ones raised
/// by the meta group itself) are not swallowed and will fault <see cref="StartAsync"/>.
/// </remarks>
public sealed class JetStreamClusterMonitor
{
    private readonly JetStreamMetaGroup _meta;
    private readonly ChannelReader<RaftLogEntry> _entries;
    private readonly ILogger<JetStreamClusterMonitor> _logger;

    // Monotonic counter incremented after each entry (including malformed ones).
    // Protected by _processedLock so WaitForProcessedAsync can wait for a target
    // without races between the count read and the wait.
    private readonly object _processedLock = new();
    private int _processedCount;

    /// <summary>
    /// Total number of entries dequeued from the channel and processed (including
    /// malformed entries that were skipped). Useful for test synchronisation.
    /// </summary>
    public int ProcessedCount
    {
        get
        {
            lock (_processedLock)
            {
                return _processedCount;
            }
        }
    }

    /// <summary>
    /// Creates a monitor that discards its log output (uses <see cref="NullLogger{T}"/>).
    /// </summary>
    /// <param name="meta">Meta group that receives the decoded mutations.</param>
    /// <param name="entries">Channel reader supplying the RAFT entries to apply.</param>
    public JetStreamClusterMonitor(JetStreamMetaGroup meta, ChannelReader<RaftLogEntry> entries)
        : this(meta, entries, NullLogger<JetStreamClusterMonitor>.Instance)
    {
    }

    /// <summary>
    /// Creates a monitor that reports applied and skipped entries to <paramref name="logger"/>.
    /// </summary>
    /// <param name="meta">Meta group that receives the decoded mutations.</param>
    /// <param name="entries">Channel reader supplying the RAFT entries to apply.</param>
    /// <param name="logger">Logger for diagnostics.</param>
    /// <exception cref="ArgumentNullException">Any argument is <see langword="null"/>.</exception>
    public JetStreamClusterMonitor(
        JetStreamMetaGroup meta,
        ChannelReader<RaftLogEntry> entries,
        ILogger<JetStreamClusterMonitor> logger)
    {
        // Fail at construction time instead of with a NullReferenceException inside the loop.
        ArgumentNullException.ThrowIfNull(meta);
        ArgumentNullException.ThrowIfNull(entries);
        ArgumentNullException.ThrowIfNull(logger);

        _meta = meta;
        _entries = entries;
        _logger = logger;
    }

    /// <summary>
    /// Starts consuming entries from the channel until the token is cancelled or the
    /// channel enumeration ends. Each entry is applied synchronously before the next is read.
    /// Returns normally (without throwing) when <paramref name="ct"/> is cancelled.
    /// </summary>
    /// <param name="ct">Token that stops the loop.</param>
    public async Task StartAsync(CancellationToken ct)
    {
        try
        {
            await foreach (var entry in _entries.ReadAllAsync(ct))
            {
                ApplyMetaEntry(entry);

                // Publish progress and wake every thread parked in WaitForProcessedAsync.
                lock (_processedLock)
                {
                    _processedCount++;
                    Monitor.PulseAll(_processedLock);
                }
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            _logger.LogDebug("JetStreamClusterMonitor stopped via cancellation.");
        }
    }

    /// <summary>
    /// Waits until the cumulative <see cref="ProcessedCount"/> reaches at least
    /// <paramref name="targetCount"/>. Returns immediately when the target is already met.
    /// Used by tests to synchronise without sleeping.
    /// </summary>
    /// <param name="targetCount">Minimum processed-entry count to wait for.</param>
    /// <param name="ct">Token that abandons the wait.</param>
    /// <returns>A task that completes once the target is reached, or is cancelled via <paramref name="ct"/>.</returns>
    public Task WaitForProcessedAsync(int targetCount, CancellationToken ct)
    {
        // Fast path — already done.
        lock (_processedLock)
        {
            if (_processedCount >= targetCount)
            {
                return Task.CompletedTask;
            }
        }

        // Slow path — offload the blocking Monitor.Wait to a thread-pool thread so the
        // calling async context is not blocked. Monitor.Wait releases the lock atomically
        // while waiting, eliminating the TOCTOU race between reading the count and waiting.
        return Task.Run(
            () =>
            {
                lock (_processedLock)
                {
                    while (_processedCount < targetCount)
                    {
                        ct.ThrowIfCancellationRequested();

                        // Wait up to 50 ms so we can check cancellation periodically.
                        Monitor.Wait(_processedLock, millisecondsTimeout: 50);
                    }
                }
            },
            ct);
    }

    // ---------------------------------------------------------------
    // Entry dispatch
    // ---------------------------------------------------------------

    /// <summary>
    /// Parses the entry's command as JSON and routes it by its "Op" property.
    /// Entries without an "Op" property, with a null "Op", or with an unrecognised
    /// "Op" value are ignored; structurally invalid entries are logged and skipped.
    /// </summary>
    private void ApplyMetaEntry(RaftLogEntry entry)
    {
        try
        {
            using var doc = JsonDocument.Parse(entry.Command);
            var root = doc.RootElement;

            // TryGetProperty throws InvalidOperationException on a non-object root, so
            // report that shape problem through the same "malformed" path instead.
            if (root.ValueKind != JsonValueKind.Object)
            {
                throw new JsonException("Meta entry payload must be a JSON object.");
            }

            if (!root.TryGetProperty("Op", out var opElement))
            {
                return;
            }

            var op = opElement.ValueKind switch
            {
                JsonValueKind.String => opElement.GetString(),
                JsonValueKind.Null => null,
                _ => throw new JsonException("Property 'Op' must be a JSON string."),
            };

            switch (op)
            {
                case "assignStream":
                    ProcessStreamAssignment(root);
                    break;
                case "removeStream":
                    ProcessStreamRemoval(root);
                    break;
                case "assignConsumer":
                    ProcessConsumerAssignment(root);
                    break;
                case "removeConsumer":
                    ProcessConsumerRemoval(root);
                    break;
                case "snapshot":
                    ApplyMetaSnapshot(root);
                    break;

                // Unknown ops are silently ignored — forward compatibility.
            }
        }
        catch (Exception ex) when (ex is JsonException or FormatException)
        {
            // JsonException: invalid JSON or a payload-shape violation detected by the helpers below.
            // FormatException: the snapshot "Data" field is not valid base64.
            _logger.LogWarning(
                ex,
                "Skipping malformed meta RAFT entry at index {Index}: {Message}",
                entry.Index,
                ex.Message);
        }
    }

    // ---------------------------------------------------------------
    // Per-op processors
    // ---------------------------------------------------------------

    /// <summary>Applies an "assignStream" entry: StreamName, Peers and an optional Config string.</summary>
    private void ProcessStreamAssignment(JsonElement root)
    {
        var streamName = RequireString(root, "StreamName");
        var peers = RequirePeers(root);
        var config = ReadConfigJson(root);

        var sa = new StreamAssignment
        {
            StreamName = streamName,
            Group = new RaftGroup { Name = streamName, Peers = peers },
            ConfigJson = config,
        };

        _meta.AddStreamAssignment(sa);
        _logger.LogDebug("Applied stream assignment for {StreamName}", streamName);
    }

    /// <summary>Applies a "removeStream" entry identified by StreamName.</summary>
    private void ProcessStreamRemoval(JsonElement root)
    {
        var streamName = RequireString(root, "StreamName");

        _meta.RemoveStreamAssignment(streamName);
        _logger.LogDebug("Applied stream removal for {StreamName}", streamName);
    }

    /// <summary>Applies an "assignConsumer" entry: StreamName, ConsumerName and Peers.</summary>
    private void ProcessConsumerAssignment(JsonElement root)
    {
        var streamName = RequireString(root, "StreamName");
        var consumerName = RequireString(root, "ConsumerName");
        var peers = RequirePeers(root);

        var ca = new ConsumerAssignment
        {
            ConsumerName = consumerName,
            StreamName = streamName,
            Group = new RaftGroup { Name = consumerName, Peers = peers },
        };

        _meta.AddConsumerAssignment(streamName, ca);
        _logger.LogDebug("Applied consumer assignment {ConsumerName} on {StreamName}", consumerName, streamName);
    }

    /// <summary>Applies a "removeConsumer" entry identified by StreamName and ConsumerName.</summary>
    private void ProcessConsumerRemoval(JsonElement root)
    {
        var streamName = RequireString(root, "StreamName");
        var consumerName = RequireString(root, "ConsumerName");

        _meta.RemoveConsumerAssignment(streamName, consumerName);
        _logger.LogDebug("Applied consumer removal {ConsumerName} from {StreamName}", consumerName, streamName);
    }

    /// <summary>
    /// Applies a "snapshot" entry: base64-decodes the Data property, decodes it with
    /// <c>MetaSnapshotCodec</c> and replaces the meta group's assignments with the result.
    /// </summary>
    private void ApplyMetaSnapshot(JsonElement root)
    {
        var dataB64 = RequireString(root, "Data");
        var data = Convert.FromBase64String(dataB64);

        // NOTE(review): which exception types MetaSnapshotCodec.Decode raises for corrupt
        // bytes is not visible here; anything other than JsonException/FormatException
        // will still propagate and stop the loop — confirm against the codec.
        var assignments = MetaSnapshotCodec.Decode(data);

        _meta.ReplaceAllAssignments(assignments);
        _logger.LogInformation("Applied meta snapshot: {StreamCount} streams restored", assignments.Count);
    }

    // ---------------------------------------------------------------
    // Payload validation helpers
    // ---------------------------------------------------------------

    /// <summary>
    /// Returns the string value of a required property, or throws <see cref="JsonException"/>
    /// when the property is absent or is not a JSON string (including JSON null).
    /// </summary>
    private static string RequireString(JsonElement root, string propertyName)
    {
        if (root.TryGetProperty(propertyName, out var value) && value.ValueKind == JsonValueKind.String)
        {
            return value.GetString()!;
        }

        throw new JsonException(
            $"Required property '{propertyName}' is missing or is not a JSON string.");
    }

    /// <summary>
    /// Returns the required "Peers" array as a list of strings, or throws
    /// <see cref="JsonException"/> when it is absent, not an array, or has a non-string element.
    /// </summary>
    private static List<string> RequirePeers(JsonElement root)
    {
        if (!root.TryGetProperty("Peers", out var peersElement)
            || peersElement.ValueKind != JsonValueKind.Array)
        {
            throw new JsonException("Required property 'Peers' is missing or is not a JSON array.");
        }

        var peers = new List<string>(peersElement.GetArrayLength());
        foreach (var peer in peersElement.EnumerateArray())
        {
            if (peer.ValueKind != JsonValueKind.String)
            {
                throw new JsonException("Every element of 'Peers' must be a JSON string.");
            }

            peers.Add(peer.GetString()!);
        }

        return peers;
    }

    /// <summary>
    /// Returns the optional "Config" string, defaulting to "{}" when it is absent or JSON null.
    /// Throws <see cref="JsonException"/> when it is present with any other non-string kind.
    /// </summary>
    private static string ReadConfigJson(JsonElement root)
    {
        if (!root.TryGetProperty("Config", out var cfg) || cfg.ValueKind == JsonValueKind.Null)
        {
            return "{}";
        }

        if (cfg.ValueKind != JsonValueKind.String)
        {
            throw new JsonException("Property 'Config' must be a JSON string when present.");
        }

        return cfg.GetString()!;
    }
}

View File

@@ -264,6 +264,71 @@ public sealed class JetStreamMetaGroup
return Task.CompletedTask;
}
// ---------------------------------------------------------------
// Monitor-facing mutation methods
// Called by JetStreamClusterMonitor when processing committed RAFT entries.
// ---------------------------------------------------------------
/// <summary>
/// Stores <paramref name="sa"/> in the meta-group state under its stream name,
/// overwriting any assignment already held for that name, and sets the matching
/// <c>_streams</c> entry to 0. Called by the cluster monitor for committed RAFT entries.
/// </summary>
/// <param name="sa">Stream assignment to record; keyed by its <c>StreamName</c>.</param>
public void AddStreamAssignment(StreamAssignment sa)
{
    var streamName = sa.StreamName;

    // Same write order as before: the stream marker first, then the assignment itself.
    _streams[streamName] = 0;
    _assignments[streamName] = sa;
}
/// <summary>
/// Drops the assignment for <paramref name="streamName"/> by delegating to
/// <c>ApplyStreamDelete</c>. Called by the cluster monitor for committed RAFT entries.
/// </summary>
/// <param name="streamName">Name of the stream whose assignment is removed.</param>
public void RemoveStreamAssignment(string streamName) => ApplyStreamDelete(streamName);
/// <summary>
/// Adds or replaces a consumer assignment on the stream named <paramref name="streamName"/>.
/// The total consumer count is incremented only when the consumer name was not already
/// present on that stream; replacing an existing consumer leaves the count unchanged.
/// </summary>
/// <remarks>
/// When no assignment exists for <paramref name="streamName"/> the call is a no-op:
/// nothing is stored and nothing is thrown.
/// </remarks>
/// <param name="streamName">Stream that owns the consumer.</param>
/// <param name="ca">Consumer assignment to store, keyed by its <c>ConsumerName</c>.</param>
public void AddConsumerAssignment(string streamName, ConsumerAssignment ca)
{
    if (!_assignments.TryGetValue(streamName, out var sa))
    {
        return;
    }

    // TryAdd answers "is this consumer new?" and inserts in one lookup, replacing the
    // previous ContainsKey-then-indexer check-then-act sequence.
    if (sa.Consumers.TryAdd(ca.ConsumerName, ca))
    {
        Interlocked.Increment(ref _totalConsumerCount);
    }
    else
    {
        // Name already present: overwrite with the newer assignment, count unchanged.
        sa.Consumers[ca.ConsumerName] = ca;
    }
}
/// <summary>
/// Drops the consumer <paramref name="consumerName"/> from the stream
/// <paramref name="streamName"/> by delegating to <c>ApplyConsumerDelete</c>.
/// </summary>
/// <param name="streamName">Stream that owns the consumer.</param>
/// <param name="consumerName">Name of the consumer whose assignment is removed.</param>
public void RemoveConsumerAssignment(string streamName, string consumerName) =>
    ApplyConsumerDelete(streamName, consumerName);
/// <summary>
/// Replaces every stream assignment with the contents of <paramref name="newState"/>
/// (used for snapshot apply) and recomputes the total consumer count from it.
/// Go reference: jetstream_cluster.go meta snapshot restore.
/// </summary>
/// <remarks>
/// The replacement is clear-then-repopulate, not a single atomic swap: a concurrent reader
/// may observe an empty or partially restored state while this method runs.
/// </remarks>
/// <param name="newState">Stream assignments keyed by stream name.</param>
/// <exception cref="ArgumentNullException"><paramref name="newState"/> is <see langword="null"/>.</exception>
public void ReplaceAllAssignments(Dictionary<string, StreamAssignment> newState)
{
    // Reject null before anything is cleared, so a bad call cannot wipe the current state.
    ArgumentNullException.ThrowIfNull(newState);

    _assignments.Clear();
    _streams.Clear();

    // Accumulate locally and publish once: the counter is updated with Interlocked
    // elsewhere, so avoid a plain read-modify-write on the shared field here.
    var consumerTotal = 0;
    foreach (var (name, sa) in newState)
    {
        _assignments[name] = sa;
        _streams[name] = 0;
        consumerTotal += sa.Consumers.Count;
    }

    Interlocked.Exchange(ref _totalConsumerCount, consumerTotal);
}
// ---------------------------------------------------------------
// ApplyEntry dispatch
// Go reference: jetstream_cluster.go RAFT apply for meta group