Implements the background loop (Go: monitorCluster) that reads RaftLogEntry items from a channel and dispatches assignStream, removeStream, assignConsumer, removeConsumer, and snapshot operations to JetStreamMetaGroup. - Add five public mutation helpers to JetStreamMetaGroup (AddStreamAssignment, RemoveStreamAssignment, AddConsumerAssignment, RemoveConsumerAssignment, ReplaceAllAssignments) used by the monitor path - JetStreamClusterMonitor: async loop over ChannelReader<RaftLogEntry>, JSON dispatch, ILogger injection with NullLogger default, malformed entries logged and skipped rather than aborting the loop - WaitForProcessedAsync uses Monitor.PulseAll for race-free test synchronisation with no Task.Delay — satisfies slopwatch SW003/SW004 rules - 10 new targeted tests all pass; 1101 cluster regression tests unchanged
345 lines
12 KiB
C#
345 lines
12 KiB
C#
using System.Threading.Channels;
|
|
using NATS.Server.JetStream.Cluster;
|
|
using NATS.Server.Raft;
|
|
|
|
namespace NATS.Server.Tests.JetStream.Cluster;
|
|
|
|
/// <summary>
|
|
/// Tests for JetStreamClusterMonitor — background meta entry processing.
|
|
/// Go reference: jetstream_cluster.go:1455-1825 (monitorCluster).
|
|
/// </summary>
|
|
public class JetStreamClusterMonitorTests
|
|
{
|
|
// Each test uses a 5-second CancellationToken as a hard upper bound so a
|
|
// hung monitor doesn't stall the test run indefinitely.
|
|
private static CancellationTokenSource TestTimeout() =>
|
|
new(TimeSpan.FromSeconds(5));
|
|
|
|
[Fact]
|
|
public async Task Monitor_processes_stream_assignment_entry()
|
|
{
|
|
// Go reference: jetstream_cluster.go monitorCluster assignStream op
|
|
var meta = new JetStreamMetaGroup(3);
|
|
var channel = Channel.CreateUnbounded<RaftLogEntry>();
|
|
var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
|
|
|
|
using var cts = TestTimeout();
|
|
var monitorTask = monitor.StartAsync(cts.Token);
|
|
|
|
var assignJson = System.Text.Json.JsonSerializer.Serialize(new
|
|
{
|
|
Op = "assignStream",
|
|
StreamName = "test-stream",
|
|
Peers = new[] { "n1", "n2", "n3" },
|
|
Config = """{"subjects":["test.>"]}""",
|
|
});
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, assignJson));
|
|
await monitor.WaitForProcessedAsync(1, cts.Token);
|
|
|
|
meta.StreamCount.ShouldBe(1);
|
|
meta.GetStreamAssignment("test-stream").ShouldNotBeNull();
|
|
|
|
cts.Cancel();
|
|
await monitorTask;
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Monitor_processes_consumer_assignment_entry()
|
|
{
|
|
// Go reference: jetstream_cluster.go monitorCluster assignConsumer op
|
|
var meta = new JetStreamMetaGroup(3);
|
|
var channel = Channel.CreateUnbounded<RaftLogEntry>();
|
|
var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
|
|
|
|
using var cts = TestTimeout();
|
|
var monitorTask = monitor.StartAsync(cts.Token);
|
|
|
|
var streamJson = System.Text.Json.JsonSerializer.Serialize(new
|
|
{
|
|
Op = "assignStream",
|
|
StreamName = "s1",
|
|
Peers = new[] { "n1", "n2", "n3" },
|
|
Config = """{"subjects":["x.>"]}""",
|
|
});
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, streamJson));
|
|
|
|
var consumerJson = System.Text.Json.JsonSerializer.Serialize(new
|
|
{
|
|
Op = "assignConsumer",
|
|
StreamName = "s1",
|
|
ConsumerName = "c1",
|
|
Peers = new[] { "n1", "n2", "n3" },
|
|
});
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, consumerJson));
|
|
await monitor.WaitForProcessedAsync(2, cts.Token);
|
|
|
|
meta.ConsumerCount.ShouldBe(1);
|
|
|
|
cts.Cancel();
|
|
await monitorTask;
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Monitor_processes_stream_removal()
|
|
{
|
|
// Go reference: jetstream_cluster.go monitorCluster removeStream op
|
|
var meta = new JetStreamMetaGroup(3);
|
|
var channel = Channel.CreateUnbounded<RaftLogEntry>();
|
|
var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
|
|
|
|
using var cts = TestTimeout();
|
|
var monitorTask = monitor.StartAsync(cts.Token);
|
|
|
|
var assignJson = System.Text.Json.JsonSerializer.Serialize(new
|
|
{
|
|
Op = "assignStream",
|
|
StreamName = "to-remove",
|
|
Peers = new[] { "n1", "n2", "n3" },
|
|
Config = """{"subjects":["rm.>"]}""",
|
|
});
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, assignJson));
|
|
|
|
var removeJson = System.Text.Json.JsonSerializer.Serialize(new
|
|
{
|
|
Op = "removeStream",
|
|
StreamName = "to-remove",
|
|
});
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, removeJson));
|
|
await monitor.WaitForProcessedAsync(2, cts.Token);
|
|
|
|
meta.StreamCount.ShouldBe(0);
|
|
|
|
cts.Cancel();
|
|
await monitorTask;
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Monitor_applies_meta_snapshot()
|
|
{
|
|
// Go reference: jetstream_cluster.go monitorCluster snapshot op — replaces all state
|
|
var meta = new JetStreamMetaGroup(3);
|
|
var channel = Channel.CreateUnbounded<RaftLogEntry>();
|
|
var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
|
|
|
|
using var cts = TestTimeout();
|
|
var monitorTask = monitor.StartAsync(cts.Token);
|
|
|
|
var assignments = new Dictionary<string, StreamAssignment>
|
|
{
|
|
["snap-stream"] = new StreamAssignment
|
|
{
|
|
StreamName = "snap-stream",
|
|
Group = new RaftGroup { Name = "rg-snap", Peers = ["n1", "n2", "n3"] },
|
|
},
|
|
};
|
|
var snapshotB64 = Convert.ToBase64String(MetaSnapshotCodec.Encode(assignments));
|
|
|
|
var snapshotJson = System.Text.Json.JsonSerializer.Serialize(new
|
|
{
|
|
Op = "snapshot",
|
|
Data = snapshotB64,
|
|
});
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, snapshotJson));
|
|
await monitor.WaitForProcessedAsync(1, cts.Token);
|
|
|
|
meta.StreamCount.ShouldBe(1);
|
|
meta.GetStreamAssignment("snap-stream").ShouldNotBeNull();
|
|
|
|
cts.Cancel();
|
|
await monitorTask;
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Monitor_processes_consumer_removal()
|
|
{
|
|
// Go reference: jetstream_cluster.go monitorCluster removeConsumer op
|
|
var meta = new JetStreamMetaGroup(3);
|
|
var channel = Channel.CreateUnbounded<RaftLogEntry>();
|
|
var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
|
|
|
|
using var cts = TestTimeout();
|
|
var monitorTask = monitor.StartAsync(cts.Token);
|
|
|
|
var streamJson = System.Text.Json.JsonSerializer.Serialize(new
|
|
{
|
|
Op = "assignStream",
|
|
StreamName = "s1",
|
|
Peers = new[] { "n1", "n2", "n3" },
|
|
});
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, streamJson));
|
|
|
|
var consumerJson = System.Text.Json.JsonSerializer.Serialize(new
|
|
{
|
|
Op = "assignConsumer",
|
|
StreamName = "s1",
|
|
ConsumerName = "c1",
|
|
Peers = new[] { "n1", "n2", "n3" },
|
|
});
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, consumerJson));
|
|
await monitor.WaitForProcessedAsync(2, cts.Token);
|
|
|
|
meta.ConsumerCount.ShouldBe(1);
|
|
|
|
var removeJson = System.Text.Json.JsonSerializer.Serialize(new
|
|
{
|
|
Op = "removeConsumer",
|
|
StreamName = "s1",
|
|
ConsumerName = "c1",
|
|
});
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(3, 1, removeJson));
|
|
await monitor.WaitForProcessedAsync(3, cts.Token);
|
|
|
|
meta.ConsumerCount.ShouldBe(0);
|
|
|
|
cts.Cancel();
|
|
await monitorTask;
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Monitor_skips_malformed_entries()
|
|
{
|
|
// Go reference: jetstream_cluster.go monitorCluster — malformed entries must not abort the loop
|
|
var meta = new JetStreamMetaGroup(3);
|
|
var channel = Channel.CreateUnbounded<RaftLogEntry>();
|
|
var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
|
|
|
|
using var cts = TestTimeout();
|
|
var monitorTask = monitor.StartAsync(cts.Token);
|
|
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, "not-json"));
|
|
|
|
var assignJson = System.Text.Json.JsonSerializer.Serialize(new
|
|
{
|
|
Op = "assignStream",
|
|
StreamName = "after-bad",
|
|
Peers = new[] { "n1", "n2", "n3" },
|
|
});
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, assignJson));
|
|
await monitor.WaitForProcessedAsync(2, cts.Token);
|
|
|
|
meta.StreamCount.ShouldBe(1);
|
|
meta.GetStreamAssignment("after-bad").ShouldNotBeNull();
|
|
|
|
cts.Cancel();
|
|
await monitorTask;
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Monitor_stops_on_cancellation()
|
|
{
|
|
// Go reference: jetstream_cluster.go monitorCluster shuts down cleanly when stop channel closes
|
|
var meta = new JetStreamMetaGroup(3);
|
|
var channel = Channel.CreateUnbounded<RaftLogEntry>();
|
|
var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
|
|
|
|
using var cts = TestTimeout();
|
|
var monitorTask = monitor.StartAsync(cts.Token);
|
|
|
|
cts.Cancel();
|
|
await monitorTask; // Should complete without throwing
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Monitor_ignores_entry_with_no_op_field()
|
|
{
|
|
// Entries missing the "Op" property are silently ignored (forward-compat).
|
|
var meta = new JetStreamMetaGroup(3);
|
|
var channel = Channel.CreateUnbounded<RaftLogEntry>();
|
|
var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
|
|
|
|
using var cts = TestTimeout();
|
|
var monitorTask = monitor.StartAsync(cts.Token);
|
|
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, """{"NotOp":"whatever"}"""));
|
|
|
|
var assignJson = System.Text.Json.JsonSerializer.Serialize(new
|
|
{
|
|
Op = "assignStream",
|
|
StreamName = "after-no-op",
|
|
Peers = new[] { "n1" },
|
|
});
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, assignJson));
|
|
await monitor.WaitForProcessedAsync(2, cts.Token);
|
|
|
|
meta.StreamCount.ShouldBe(1);
|
|
|
|
cts.Cancel();
|
|
await monitorTask;
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Monitor_ignores_unknown_op()
|
|
{
|
|
// Unknown op names are silently ignored — forward compatibility.
|
|
var meta = new JetStreamMetaGroup(3);
|
|
var channel = Channel.CreateUnbounded<RaftLogEntry>();
|
|
var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
|
|
|
|
using var cts = TestTimeout();
|
|
var monitorTask = monitor.StartAsync(cts.Token);
|
|
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, """{"Op":"futureFoo","Data":"xyz"}"""));
|
|
|
|
var assignJson = System.Text.Json.JsonSerializer.Serialize(new
|
|
{
|
|
Op = "assignStream",
|
|
StreamName = "after-unknown-op",
|
|
Peers = new[] { "n1" },
|
|
});
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, assignJson));
|
|
await monitor.WaitForProcessedAsync(2, cts.Token);
|
|
|
|
meta.StreamCount.ShouldBe(1);
|
|
|
|
cts.Cancel();
|
|
await monitorTask;
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Monitor_snapshot_replaces_existing_state()
|
|
{
|
|
// Go reference: jetstream_cluster.go — snapshot apply wipes old assignments
|
|
var meta = new JetStreamMetaGroup(3);
|
|
var channel = Channel.CreateUnbounded<RaftLogEntry>();
|
|
var monitor = new JetStreamClusterMonitor(meta, channel.Reader);
|
|
|
|
using var cts = TestTimeout();
|
|
var monitorTask = monitor.StartAsync(cts.Token);
|
|
|
|
var assignJson = System.Text.Json.JsonSerializer.Serialize(new
|
|
{
|
|
Op = "assignStream",
|
|
StreamName = "old-stream",
|
|
Peers = new[] { "n1" },
|
|
});
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(1, 1, assignJson));
|
|
await monitor.WaitForProcessedAsync(1, cts.Token);
|
|
|
|
meta.StreamCount.ShouldBe(1);
|
|
|
|
var newAssignments = new Dictionary<string, StreamAssignment>
|
|
{
|
|
["new-stream"] = new StreamAssignment
|
|
{
|
|
StreamName = "new-stream",
|
|
Group = new RaftGroup { Name = "rg-new", Peers = ["n1", "n2", "n3"] },
|
|
},
|
|
};
|
|
var snapshotB64 = Convert.ToBase64String(MetaSnapshotCodec.Encode(newAssignments));
|
|
var snapshotJson = System.Text.Json.JsonSerializer.Serialize(new
|
|
{
|
|
Op = "snapshot",
|
|
Data = snapshotB64,
|
|
});
|
|
await channel.Writer.WriteAsync(new RaftLogEntry(2, 1, snapshotJson));
|
|
await monitor.WaitForProcessedAsync(2, cts.Token);
|
|
|
|
meta.StreamCount.ShouldBe(1);
|
|
meta.GetStreamAssignment("old-stream").ShouldBeNull();
|
|
meta.GetStreamAssignment("new-stream").ShouldNotBeNull();
|
|
|
|
cts.Cancel();
|
|
await monitorTask;
|
|
}
|
|
}
|