diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs b/src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs
new file mode 100644
index 0000000..453a5aa
--- /dev/null
+++ b/src/NATS.Server/JetStream/Cluster/JetStreamClusterMonitor.cs
@@ -0,0 +1,318 @@
+using System.Text.Json;
+using System.Threading.Channels;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+using NATS.Server.Raft;
+
+namespace NATS.Server.JetStream.Cluster;
+
+/// <summary>
+/// Background loop consuming meta RAFT entries and dispatching cluster state changes.
+/// Reads <see cref="RaftLogEntry"/> items from a channel, parses the JSON command payload,
+/// and applies the corresponding mutation to the <see cref="JetStreamMetaGroup"/>.
+/// Go reference: jetstream_cluster.go:1455-1825 (monitorCluster).
+/// </summary>
+public sealed class JetStreamClusterMonitor
+{
+    private readonly JetStreamMetaGroup _meta;
+    private readonly ChannelReader<RaftLogEntry> _entries;
+    private readonly ILogger _logger;
+
+    // Guards _processedCount and _waiters together, so a waiter can never be
+    // registered between a count check and the increment that should release it.
+    private readonly object _processedLock = new();
+
+    // Pending WaitForProcessedAsync callers; each completes once the count reaches its target.
+    private readonly List<(int Target, TaskCompletionSource Completion)> _waiters = new();
+
+    // Monotonic counter incremented after each entry (including malformed ones).
+    private int _processedCount;
+
+    /// <summary>
+    /// Total number of entries dequeued from the channel and processed (including
+    /// malformed entries that were skipped). Useful for test synchronisation.
+    /// </summary>
+    public int ProcessedCount { get { lock (_processedLock) return _processedCount; } }
+
+    /// <summary>
+    /// Creates a monitor that discards its log output.
+    /// </summary>
+    public JetStreamClusterMonitor(JetStreamMetaGroup meta, ChannelReader<RaftLogEntry> entries)
+        : this(meta, entries, NullLogger.Instance)
+    {
+    }
+
+    /// <summary>
+    /// Creates a monitor that writes diagnostics to <paramref name="logger"/>.
+    /// </summary>
+    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
+    public JetStreamClusterMonitor(
+        JetStreamMetaGroup meta,
+        ChannelReader<RaftLogEntry> entries,
+        ILogger logger)
+    {
+        ArgumentNullException.ThrowIfNull(meta);
+        ArgumentNullException.ThrowIfNull(entries);
+        ArgumentNullException.ThrowIfNull(logger);
+
+        _meta = meta;
+        _entries = entries;
+        _logger = logger;
+    }
+
+    /// <summary>
+    /// Starts consuming entries from the channel until the token is cancelled.
+    /// Each entry is applied synchronously before the next is read.
+    /// Returns normally (without throwing) when <paramref name="ct"/> is cancelled.
+    /// </summary>
+    public async Task StartAsync(CancellationToken ct)
+    {
+        try
+        {
+            await foreach (var entry in _entries.ReadAllAsync(ct))
+            {
+                ApplyMetaEntry(entry);
+                MarkEntryProcessed();
+            }
+        }
+        catch (OperationCanceledException) when (ct.IsCancellationRequested)
+        {
+            _logger.LogDebug("JetStreamClusterMonitor stopped via cancellation.");
+        }
+    }
+
+    /// <summary>
+    /// Waits until the cumulative <see cref="ProcessedCount"/> reaches at least
+    /// <paramref name="targetCount"/>. Returns immediately when the target is already met.
+    /// Used by tests to synchronise without sleeping.
+    /// </summary>
+    /// <param name="targetCount">Processed-entry count to wait for.</param>
+    /// <param name="ct">Cancels the wait; the returned task then ends as cancelled.</param>
+    public Task WaitForProcessedAsync(int targetCount, CancellationToken ct)
+    {
+        TaskCompletionSource completion;
+        lock (_processedLock)
+        {
+            // Fast path — already done.
+            if (_processedCount >= targetCount)
+                return Task.CompletedTask;
+
+            if (ct.IsCancellationRequested)
+                return Task.FromCanceled(ct);
+
+            // Register while holding the lock that also guards the increment, so the
+            // count cannot move between the check above and the registration.
+            // Continuations run asynchronously so they never execute inline inside
+            // the monitor loop when it completes the waiter.
+            completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+            _waiters.Add((targetCount, completion));
+        }
+
+        return AwaitWaiterAsync(targetCount, completion, ct);
+    }
+
+    // Awaits a registered waiter and withdraws it from the list if the token fires first.
+    private async Task AwaitWaiterAsync(int targetCount, TaskCompletionSource completion, CancellationToken ct)
+    {
+        using var registration = ct.Register(() =>
+        {
+            lock (_processedLock)
+                _waiters.Remove((targetCount, completion));
+
+            completion.TrySetCanceled(ct);
+        });
+
+        await completion.Task;
+    }
+
+    // Bumps the processed counter and releases every waiter whose target is now met.
+    private void MarkEntryProcessed()
+    {
+        List<TaskCompletionSource>? released = null;
+        lock (_processedLock)
+        {
+            _processedCount++;
+            for (var i = _waiters.Count - 1; i >= 0; i--)
+            {
+                var (target, completion) = _waiters[i];
+                if (target > _processedCount)
+                    continue;
+
+                _waiters.RemoveAt(i);
+                (released ??= new List<TaskCompletionSource>()).Add(completion);
+            }
+        }
+
+        if (released is null)
+            return;
+
+        // Completed outside the lock to keep the critical section short.
+        foreach (var waiter in released)
+            waiter.TrySetResult();
+    }
+
+    // ---------------------------------------------------------------
+    // Entry dispatch
+    // ---------------------------------------------------------------
+
+    // Parses one entry's JSON command and routes it by its "Op" value. Payloads that
+    // are not valid JSON, or whose fields have the wrong shape, are logged and skipped
+    // so a single bad entry cannot terminate the monitor loop.
+    private void ApplyMetaEntry(RaftLogEntry entry)
+    {
+        try
+        {
+            using var doc = JsonDocument.Parse(entry.Command);
+            var root = doc.RootElement;
+
+            // TryGetProperty throws on a non-object root (e.g. a bare number or array),
+            // so such payloads are treated the same as an entry without an "Op".
+            if (root.ValueKind != JsonValueKind.Object || !root.TryGetProperty("Op", out var opElement))
+                return;
+
+            // GetString throws on non-string kinds; a non-string op is simply unknown.
+            var op = opElement.ValueKind == JsonValueKind.String ? opElement.GetString() : null;
+
+            switch (op)
+            {
+                case "assignStream":
+                    ProcessStreamAssignment(root);
+                    break;
+                case "removeStream":
+                    ProcessStreamRemoval(root);
+                    break;
+                case "assignConsumer":
+                    ProcessConsumerAssignment(root);
+                    break;
+                case "removeConsumer":
+                    ProcessConsumerRemoval(root);
+                    break;
+                case "snapshot":
+                    ApplyMetaSnapshot(root);
+                    break;
+                // Unknown ops are silently ignored — forward compatibility.
+            }
+        }
+        catch (JsonException ex)
+        {
+            _logger.LogWarning(
+                ex,
+                "Skipping malformed meta RAFT entry at index {Index}: {Message}",
+                entry.Index,
+                ex.Message);
+        }
+    }
+
+    // ---------------------------------------------------------------
+    // Field readers — report shape problems as JsonException so that
+    // ApplyMetaEntry can skip the entry instead of faulting the loop.
+    // ---------------------------------------------------------------
+
+    // Returns the named property, which must be present and a JSON string.
+    private static string GetRequiredString(JsonElement root, string propertyName)
+    {
+        if (root.TryGetProperty(propertyName, out var value) && value.ValueKind == JsonValueKind.String)
+            return value.GetString()!;
+
+        throw new JsonException($"Meta entry is missing required string property '{propertyName}'.");
+    }
+
+    // Returns the "Peers" property, which must be an array of JSON strings.
+    private static List<string> GetPeers(JsonElement root)
+    {
+        if (!root.TryGetProperty("Peers", out var peersElement) || peersElement.ValueKind != JsonValueKind.Array)
+            throw new JsonException("Meta entry is missing required array property 'Peers'.");
+
+        var peers = new List<string>(peersElement.GetArrayLength());
+        foreach (var peer in peersElement.EnumerateArray())
+        {
+            if (peer.ValueKind != JsonValueKind.String)
+                throw new JsonException("Meta entry property 'Peers' must contain only strings.");
+
+            peers.Add(peer.GetString()!);
+        }
+
+        return peers;
+    }
+
+    // ---------------------------------------------------------------
+    // Per-op processors
+    // ---------------------------------------------------------------
+
+    // "assignStream": builds a StreamAssignment from StreamName/Peers/Config and stores it.
+    private void ProcessStreamAssignment(JsonElement root)
+    {
+        var streamName = GetRequiredString(root, "StreamName");
+        var peers = GetPeers(root);
+
+        // "Config" is optional: absent or JSON null falls back to an empty object.
+        var config = "{}";
+        if (root.TryGetProperty("Config", out var cfg) && cfg.ValueKind != JsonValueKind.Null)
+        {
+            if (cfg.ValueKind != JsonValueKind.String)
+                throw new JsonException("Meta entry property 'Config' must be a JSON string.");
+
+            config = cfg.GetString()!;
+        }
+
+        var sa = new StreamAssignment
+        {
+            StreamName = streamName,
+            Group = new RaftGroup { Name = streamName, Peers = peers },
+            ConfigJson = config,
+        };
+        _meta.AddStreamAssignment(sa);
+        _logger.LogDebug("Applied stream assignment for {StreamName}", streamName);
+    }
+
+    // "removeStream": removes the assignment named by StreamName.
+    private void ProcessStreamRemoval(JsonElement root)
+    {
+        var streamName = GetRequiredString(root, "StreamName");
+        _meta.RemoveStreamAssignment(streamName);
+        _logger.LogDebug("Applied stream removal for {StreamName}", streamName);
+    }
+
+    // "assignConsumer": builds a ConsumerAssignment and attaches it to the named stream.
+    private void ProcessConsumerAssignment(JsonElement root)
+    {
+        var streamName = GetRequiredString(root, "StreamName");
+        var consumerName = GetRequiredString(root, "ConsumerName");
+        var peers = GetPeers(root);
+
+        var ca = new ConsumerAssignment
+        {
+            ConsumerName = consumerName,
+            StreamName = streamName,
+            Group = new RaftGroup { Name = consumerName, Peers = peers },
+        };
+        _meta.AddConsumerAssignment(streamName, ca);
+        _logger.LogDebug("Applied consumer assignment {ConsumerName} on {StreamName}", consumerName, streamName);
+    }
+
+    // "removeConsumer": removes ConsumerName from the stream named by StreamName.
+    private void ProcessConsumerRemoval(JsonElement root)
+    {
+        var streamName = GetRequiredString(root, "StreamName");
+        var consumerName = GetRequiredString(root, "ConsumerName");
+        _meta.RemoveConsumerAssignment(streamName, consumerName);
+        _logger.LogDebug("Applied consumer removal {ConsumerName} from {StreamName}", consumerName, streamName);
+    }
+
+    // "snapshot": decodes the base64 "Data" payload and replaces all assignments with it.
+    private void ApplyMetaSnapshot(JsonElement root)
+    {
+        // TryGetBytesFromBase64 reports invalid base64 by returning false rather than
+        // throwing, but it requires a string element, hence the kind check first.
+        if (!root.TryGetProperty("Data", out var dataElement)
+            || dataElement.ValueKind != JsonValueKind.String
+            || !dataElement.TryGetBytesFromBase64(out var data))
+        {
+            throw new JsonException("Meta snapshot entry is missing a base64 string property 'Data'.");
+        }
+
+        var assignments = MetaSnapshotCodec.Decode(data);
+        _meta.ReplaceAllAssignments(assignments);
+        _logger.LogInformation("Applied meta snapshot: {StreamCount} streams restored", assignments.Count);
+    }
+}
diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
index 1732ec2..eddb230 100644
--- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
+++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs
@@ -264,6 +264,79 @@ public sealed class JetStreamMetaGroup
         return Task.CompletedTask;
     }
 
+    // ---------------------------------------------------------------
+    // Monitor-facing mutation methods
+    // Called by JetStreamClusterMonitor when processing committed RAFT entries.
+    // ---------------------------------------------------------------
+
+    /// <summary>
+    /// Directly adds a stream assignment to the meta-group state, overwriting any
+    /// entry already stored under the same stream name.
+    /// Used by the cluster monitor when processing RAFT entries.
+    /// </summary>
+    public void AddStreamAssignment(StreamAssignment sa)
+    {
+        _streams[sa.StreamName] = 0;
+        _assignments[sa.StreamName] = sa;
+    }
+
+    /// <summary>
+    /// Removes a stream assignment from the meta-group state.
+    /// Used by the cluster monitor when processing RAFT entries.
+    /// </summary>
+    public void RemoveStreamAssignment(string streamName)
+    {
+        ApplyStreamDelete(streamName);
+    }
+
+    /// <summary>
+    /// Adds a consumer assignment to a stream's assignment.
+    /// Increments the total consumer count if the consumer is new.
+    /// Does nothing when no assignment exists for <paramref name="streamName"/>.
+    /// </summary>
+    public void AddConsumerAssignment(string streamName, ConsumerAssignment ca)
+    {
+        if (_assignments.TryGetValue(streamName, out var sa))
+        {
+            var isNew = !sa.Consumers.ContainsKey(ca.ConsumerName);
+            sa.Consumers[ca.ConsumerName] = ca;
+            if (isNew)
+                Interlocked.Increment(ref _totalConsumerCount);
+        }
+    }
+
+    /// <summary>
+    /// Removes a consumer assignment from a stream.
+    /// </summary>
+    public void RemoveConsumerAssignment(string streamName, string consumerName)
+    {
+        ApplyConsumerDelete(streamName, consumerName);
+    }
+
+    /// <summary>
+    /// Replaces all assignments with <paramref name="newState"/> (used for snapshot apply).
+    /// The existing state is cleared and then refilled entry by entry, so this is not
+    /// a single atomic swap.
+    /// Go reference: jetstream_cluster.go meta snapshot restore.
+    /// </summary>
+    public void ReplaceAllAssignments(Dictionary<string, StreamAssignment> newState)
+    {
+        _assignments.Clear();
+        _streams.Clear();
+
+        var consumerTotal = 0;
+        foreach (var (name, sa) in newState)
+        {
+            _assignments[name] = sa;
+            _streams[name] = 0;
+            consumerTotal += sa.Consumers.Count;
+        }
+
+        // Publish the recomputed total with one interlocked write, matching the
+        // Interlocked.Increment used by AddConsumerAssignment.
+        Interlocked.Exchange(ref _totalConsumerCount, consumerTotal);
+    }
+
     // ---------------------------------------------------------------
     // ApplyEntry dispatch
     // Go reference: jetstream_cluster.go RAFT apply for meta group
diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamClusterMonitorTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamClusterMonitorTests.cs
new file mode 100644
index 0000000..3ca4672
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamClusterMonitorTests.cs
@@ -0,0 +1,344 @@
+using System.Threading.Channels;
+using NATS.Server.JetStream.Cluster;
+using NATS.Server.Raft;
+
+namespace NATS.Server.Tests.JetStream.Cluster;
+
+/// <summary>
+/// Tests for JetStreamClusterMonitor — background meta entry processing.
+/// Go reference: jetstream_cluster.go:1455-1825 (monitorCluster).
+/// </summary>
+public class JetStreamClusterMonitorTests
+{
+    // Each test uses a 5-second CancellationToken as a hard upper bound so a
+    // hung monitor doesn't stall the test run indefinitely.
+    private static CancellationTokenSource TestTimeout() =>
+        new(TimeSpan.FromSeconds(5));
+
+    [Fact]
+    public async Task Monitor_processes_stream_assignment_entry()
+    {
+        // Go reference: jetstream_cluster.go monitorCluster assignStream op
+        var meta = new JetStreamMetaGroup(3);
+        var channel = Channel.CreateUnbounded<RaftLogEntry>();
+        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
+
+        using var cts = TestTimeout();
+        var monitorTask = monitor.StartAsync(cts.Token);
+
+        var assignJson = System.Text.Json.JsonSerializer.Serialize(new
+        {
+            Op = "assignStream",
+            StreamName = "test-stream",
+            Peers = new[] { "n1", "n2", "n3" },
+            Config = """{"subjects":["test.>"]}""",
+        });
+        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, assignJson));
+        await monitor.WaitForProcessedAsync(1, cts.Token);
+
+        meta.StreamCount.ShouldBe(1);
+        meta.GetStreamAssignment("test-stream").ShouldNotBeNull();
+
+        cts.Cancel();
+        await monitorTask;
+    }
+
+    [Fact]
+    public async Task Monitor_processes_consumer_assignment_entry()
+    {
+        // Go reference: jetstream_cluster.go monitorCluster assignConsumer op
+        var meta = new JetStreamMetaGroup(3);
+        var channel = Channel.CreateUnbounded<RaftLogEntry>();
+        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
+
+        using var cts = TestTimeout();
+        var monitorTask = monitor.StartAsync(cts.Token);
+
+        var streamJson = System.Text.Json.JsonSerializer.Serialize(new
+        {
+            Op = "assignStream",
+            StreamName = "s1",
+            Peers = new[] { "n1", "n2", "n3" },
+            Config = """{"subjects":["x.>"]}""",
+        });
+        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, streamJson));
+
+        var consumerJson = System.Text.Json.JsonSerializer.Serialize(new
+        {
+            Op = "assignConsumer",
+            StreamName = "s1",
+            ConsumerName = "c1",
+            Peers = new[] { "n1", "n2", "n3" },
+        });
+        await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, consumerJson));
+        await monitor.WaitForProcessedAsync(2, cts.Token);
+
+        meta.ConsumerCount.ShouldBe(1);
+
+        cts.Cancel();
+        await monitorTask;
+    }
+
+    [Fact]
+    public async Task Monitor_processes_stream_removal()
+    {
+        // Go reference: jetstream_cluster.go monitorCluster removeStream op
+        var meta = new JetStreamMetaGroup(3);
+        var channel = Channel.CreateUnbounded<RaftLogEntry>();
+        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
+
+        using var cts = TestTimeout();
+        var monitorTask = monitor.StartAsync(cts.Token);
+
+        var assignJson = System.Text.Json.JsonSerializer.Serialize(new
+        {
+            Op = "assignStream",
+            StreamName = "to-remove",
+            Peers = new[] { "n1", "n2", "n3" },
+            Config = """{"subjects":["rm.>"]}""",
+        });
+        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, assignJson));
+
+        var removeJson = System.Text.Json.JsonSerializer.Serialize(new
+        {
+            Op = "removeStream",
+            StreamName = "to-remove",
+        });
+        await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, removeJson));
+        await monitor.WaitForProcessedAsync(2, cts.Token);
+
+        meta.StreamCount.ShouldBe(0);
+
+        cts.Cancel();
+        await monitorTask;
+    }
+
+    [Fact]
+    public async Task Monitor_applies_meta_snapshot()
+    {
+        // Go reference: jetstream_cluster.go monitorCluster snapshot op — replaces all state
+        var meta = new JetStreamMetaGroup(3);
+        var channel = Channel.CreateUnbounded<RaftLogEntry>();
+        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
+
+        using var cts = TestTimeout();
+        var monitorTask = monitor.StartAsync(cts.Token);
+
+        var assignments = new Dictionary<string, StreamAssignment>
+        {
+            ["snap-stream"] = new StreamAssignment
+            {
+                StreamName = "snap-stream",
+                Group = new RaftGroup { Name = "rg-snap", Peers = ["n1", "n2", "n3"] },
+            },
+        };
+        var snapshotB64 = Convert.ToBase64String(MetaSnapshotCodec.Encode(assignments));
+
+        var snapshotJson = System.Text.Json.JsonSerializer.Serialize(new
+        {
+            Op = "snapshot",
+            Data = snapshotB64,
+        });
+        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, snapshotJson));
+        await monitor.WaitForProcessedAsync(1, cts.Token);
+
+        meta.StreamCount.ShouldBe(1);
+        meta.GetStreamAssignment("snap-stream").ShouldNotBeNull();
+
+        cts.Cancel();
+        await monitorTask;
+    }
+
+    [Fact]
+    public async Task Monitor_processes_consumer_removal()
+    {
+        // Go reference: jetstream_cluster.go monitorCluster removeConsumer op
+        var meta = new JetStreamMetaGroup(3);
+        var channel = Channel.CreateUnbounded<RaftLogEntry>();
+        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
+
+        using var cts = TestTimeout();
+        var monitorTask = monitor.StartAsync(cts.Token);
+
+        var streamJson = System.Text.Json.JsonSerializer.Serialize(new
+        {
+            Op = "assignStream",
+            StreamName = "s1",
+            Peers = new[] { "n1", "n2", "n3" },
+        });
+        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, streamJson));
+
+        var consumerJson = System.Text.Json.JsonSerializer.Serialize(new
+        {
+            Op = "assignConsumer",
+            StreamName = "s1",
+            ConsumerName = "c1",
+            Peers = new[] { "n1", "n2", "n3" },
+        });
+        await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, consumerJson));
+        await monitor.WaitForProcessedAsync(2, cts.Token);
+
+        meta.ConsumerCount.ShouldBe(1);
+
+        var removeJson = System.Text.Json.JsonSerializer.Serialize(new
+        {
+            Op = "removeConsumer",
+            StreamName = "s1",
+            ConsumerName = "c1",
+        });
+        await channel.Writer.WriteAsync(new RaftLogEntry(3, 1, removeJson));
+        await monitor.WaitForProcessedAsync(3, cts.Token);
+
+        meta.ConsumerCount.ShouldBe(0);
+
+        cts.Cancel();
+        await monitorTask;
+    }
+
+    [Fact]
+    public async Task Monitor_skips_malformed_entries()
+    {
+        // Go reference: jetstream_cluster.go monitorCluster — malformed entries must not abort the loop
+        var meta = new JetStreamMetaGroup(3);
+        var channel = Channel.CreateUnbounded<RaftLogEntry>();
+        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
+
+        using var cts = TestTimeout();
+        var monitorTask = monitor.StartAsync(cts.Token);
+
+        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, "not-json"));
+
+        var assignJson = System.Text.Json.JsonSerializer.Serialize(new
+        {
+            Op = "assignStream",
+            StreamName = "after-bad",
+            Peers = new[] { "n1", "n2", "n3" },
+        });
+        await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, assignJson));
+        await monitor.WaitForProcessedAsync(2, cts.Token);
+
+        meta.StreamCount.ShouldBe(1);
+        meta.GetStreamAssignment("after-bad").ShouldNotBeNull();
+
+        cts.Cancel();
+        await monitorTask;
+    }
+
+    [Fact]
+    public async Task Monitor_stops_on_cancellation()
+    {
+        // Go reference: jetstream_cluster.go monitorCluster shuts down cleanly when stop channel closes
+        var meta = new JetStreamMetaGroup(3);
+        var channel = Channel.CreateUnbounded<RaftLogEntry>();
+        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
+
+        using var cts = TestTimeout();
+        var monitorTask = monitor.StartAsync(cts.Token);
+
+        cts.Cancel();
+        await monitorTask; // Should complete without throwing
+    }
+
+    [Fact]
+    public async Task Monitor_ignores_entry_with_no_op_field()
+    {
+        // Entries missing the "Op" property are silently ignored (forward-compat).
+        var meta = new JetStreamMetaGroup(3);
+        var channel = Channel.CreateUnbounded<RaftLogEntry>();
+        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
+
+        using var cts = TestTimeout();
+        var monitorTask = monitor.StartAsync(cts.Token);
+
+        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, """{"NotOp":"whatever"}"""));
+
+        var assignJson = System.Text.Json.JsonSerializer.Serialize(new
+        {
+            Op = "assignStream",
+            StreamName = "after-no-op",
+            Peers = new[] { "n1" },
+        });
+        await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, assignJson));
+        await monitor.WaitForProcessedAsync(2, cts.Token);
+
+        meta.StreamCount.ShouldBe(1);
+
+        cts.Cancel();
+        await monitorTask;
+    }
+
+    [Fact]
+    public async Task Monitor_ignores_unknown_op()
+    {
+        // Unknown op names are silently ignored — forward compatibility.
+        var meta = new JetStreamMetaGroup(3);
+        var channel = Channel.CreateUnbounded<RaftLogEntry>();
+        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
+
+        using var cts = TestTimeout();
+        var monitorTask = monitor.StartAsync(cts.Token);
+
+        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, """{"Op":"futureFoo","Data":"xyz"}"""));
+
+        var assignJson = System.Text.Json.JsonSerializer.Serialize(new
+        {
+            Op = "assignStream",
+            StreamName = "after-unknown-op",
+            Peers = new[] { "n1" },
+        });
+        await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, assignJson));
+        await monitor.WaitForProcessedAsync(2, cts.Token);
+
+        meta.StreamCount.ShouldBe(1);
+
+        cts.Cancel();
+        await monitorTask;
+    }
+
+    [Fact]
+    public async Task Monitor_snapshot_replaces_existing_state()
+    {
+        // Go reference: jetstream_cluster.go — snapshot apply wipes old assignments
+        var meta = new JetStreamMetaGroup(3);
+        var channel = Channel.CreateUnbounded<RaftLogEntry>();
+        var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
+
+        using var cts = TestTimeout();
+        var monitorTask = monitor.StartAsync(cts.Token);
+
+        var assignJson = System.Text.Json.JsonSerializer.Serialize(new
+        {
+            Op = "assignStream",
+            StreamName = "old-stream",
+            Peers = new[] { "n1" },
+        });
+        await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, assignJson));
+        await monitor.WaitForProcessedAsync(1, cts.Token);
+
+        meta.StreamCount.ShouldBe(1);
+
+        var newAssignments = new Dictionary<string, StreamAssignment>
+        {
+            ["new-stream"] = new StreamAssignment
+            {
+                StreamName = "new-stream",
+                Group = new RaftGroup { Name = "rg-new", Peers = ["n1", "n2", "n3"] },
+            },
+        };
+        var snapshotB64 = Convert.ToBase64String(MetaSnapshotCodec.Encode(newAssignments));
+        var snapshotJson = System.Text.Json.JsonSerializer.Serialize(new
+        {
+            Op = "snapshot",
+            Data = snapshotB64,
+        });
+        await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, snapshotJson));
+        await monitor.WaitForProcessedAsync(2, cts.Token);
+
+        meta.StreamCount.ShouldBe(1);
+        meta.GetStreamAssignment("old-stream").ShouldBeNull();
+        meta.GetStreamAssignment("new-stream").ShouldNotBeNull();
+
+        cts.Cancel();
+        await monitorTask;
+    }
+}