// Go parity: golang/nats-server/server/jetstream_helpers_test.go
// Covers: unified cluster fixture consolidating all per-suite fixtures
// into a single reusable helper used by Tasks 6-10.
// Corresponds to: checkClusterFormed, waitOnStreamLeader,
// waitOnConsumerLeader, restartServerAndWait, shutdownServerAndRemoveStorage,
// streamLeader, consumerLeader helpers in jetstream_helpers_test.go.
using System.Collections.Concurrent;
using System.Reflection;
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
using NATS.Server.JetStream.Validation;

namespace NATS.Server.TestUtilities;

/// <summary>
/// Unified JetStream cluster fixture that consolidates the capabilities of
/// ClusterFormationFixture, ClusterStreamFixture, ClusterMetaFixture,
/// ClusterConsumerFixture, ClusterFailoverFixture, LeaderFailoverFixture, and
/// ConsumerReplicaFixture into a single reusable helper for cluster test suites.
/// </summary>
/// <remarks>
/// Go ref: jetstream_helpers_test.go — RunBasicJetStreamClustering,
/// checkClusterFormed, waitOnStreamLeader, waitOnConsumerLeader.
/// <para>
/// Everything here is an in-process simulation: the fixture wires a meta group,
/// stream manager, consumer manager, API router and publisher together and
/// forwards calls to them.
/// </para>
/// <para>
/// NOTE(review): several members below are declared as returning a plain Task even
/// though they wrap a value with Task.FromResult. Their result types are not visible
/// in this file, so the declarations were left untouched — confirm the intended
/// typed task for each and tighten the signatures.
/// </para>
/// </remarks>
public sealed class JetStreamClusterFixture : IAsyncDisposable
{
    // Name of the private StreamManager field that holds the per-stream replica groups.
    private const string ReplicaGroupsFieldName = "_replicaGroups";

    // Resolved once: the reflected field is the same for every fixture instance.
    // Left nullable so a renamed/removed field surfaces as a descriptive error at
    // the call site instead of a TypeInitializationException.
    private static readonly FieldInfo? s_replicaGroupsField = typeof(StreamManager)
        .GetField(ReplicaGroupsFieldName, BindingFlags.NonPublic | BindingFlags.Instance);

    private readonly JetStreamMetaGroup _metaGroup;
    private readonly StreamManager _streamManager;
    private readonly ConsumerManager _consumerManager;
    private readonly JetStreamApiRouter _router;
    private readonly JetStreamPublisher _publisher;
    private readonly int _nodeCount;

    // Simulated node lifecycle: removed nodes are tracked here.
    // Go ref: shutdownServerAndRemoveStorage, restartServerAndWait
    private readonly HashSet<int> _removedNodes = [];
    private readonly HashSet<int> _restartedNodes = [];

    private JetStreamClusterFixture(
        JetStreamMetaGroup metaGroup,
        StreamManager streamManager,
        ConsumerManager consumerManager,
        JetStreamApiRouter router,
        JetStreamPublisher publisher,
        int nodeCount)
    {
        _metaGroup = metaGroup;
        _streamManager = streamManager;
        _consumerManager = consumerManager;
        _router = router;
        _publisher = publisher;
        _nodeCount = nodeCount;
    }

    // ---------------------------------------------------------------
    // Go ref: checkClusterFormed — cluster size property
    // ---------------------------------------------------------------

    /// <summary>
    /// Total number of nodes in the cluster, as passed to <see cref="StartAsync"/>.
    /// Go ref: checkClusterFormed in jetstream_helpers_test.go.
    /// </summary>
    public int NodeCount => _nodeCount;

    // ---------------------------------------------------------------
    // Factory
    // ---------------------------------------------------------------

    /// <summary>
    /// Creates and returns a cluster fixture with the given number of nodes.
    /// Go ref: RunBasicJetStreamClustering in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="nodes">Node count handed to the meta group and reported by <see cref="NodeCount"/>.</param>
    /// <returns>An already-completed task carrying the new fixture.</returns>
    public static Task<JetStreamClusterFixture> StartAsync(int nodes)
    {
        var meta = new JetStreamMetaGroup(nodes);
        var consumerManager = new ConsumerManager(meta);
        var streamManager = new StreamManager(meta, consumerManager: consumerManager);
        var router = new JetStreamApiRouter(streamManager, consumerManager, meta);
        var publisher = new JetStreamPublisher(streamManager);

        var fixture = new JetStreamClusterFixture(
            meta, streamManager, consumerManager, router, publisher, nodes);
        return Task.FromResult(fixture);
    }

    // ---------------------------------------------------------------
    // Stream operations
    // ---------------------------------------------------------------

    /// <summary>
    /// Creates (or updates) a stream with the given name, subjects, replica count,
    /// and optional storage type. The stream manager's response is returned as-is;
    /// this method does not itself throw on an error response.
    /// Go ref: addStreamWithError in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="name">Stream name.</param>
    /// <param name="subjects">Subjects copied into the stream configuration.</param>
    /// <param name="replicas">Replica count for the stream.</param>
    /// <param name="storage">Storage type; defaults to <see cref="StorageType.Memory"/>.</param>
    /// <returns>An already-completed task carrying the stream manager's response.</returns>
    public Task<JetStreamApiResponse> CreateStreamAsync(
        string name,
        string[] subjects,
        int replicas,
        StorageType storage = StorageType.Memory)
    {
        var response = _streamManager.CreateOrUpdate(new StreamConfig
        {
            Name = name,
            Subjects = [.. subjects],
            Replicas = replicas,
            Storage = storage,
        });

        return Task.FromResult<JetStreamApiResponse>(response);
    }

    /// <summary>
    /// Creates a stream directly from a full StreamConfig. Does not throw on error.
    /// Go ref: addStreamWithError in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="config">Complete stream configuration, passed through unchanged.</param>
    /// <returns>The stream manager's response.</returns>
    public JetStreamApiResponse CreateStreamDirect(StreamConfig config)
        => _streamManager.CreateOrUpdate(config);

    /// <summary>
    /// Updates an existing stream's subjects, replica count, and optional max messages.
    /// Go ref: updateStream in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="name">Name of the stream to update; it must already exist.</param>
    /// <param name="subjects">Replacement subject list.</param>
    /// <param name="replicas">Replacement replica count.</param>
    /// <param name="maxMsgs">Replacement max-message limit; defaults to 0.</param>
    /// <returns>
    /// A 404 "stream not found" error response when the stream is unknown, otherwise
    /// the stream manager's response to the update.
    /// </returns>
    public JetStreamApiResponse UpdateStream(string name, string[] subjects, int replicas, int maxMsgs = 0)
    {
        // Go: stream update rejects unknown stream names — must exist before update.
        if (!_streamManager.TryGet(name, out var existing))
        {
            return JetStreamApiResponse.ErrorResponse(404, "stream not found");
        }

        return _streamManager.CreateOrUpdate(new StreamConfig
        {
            Name = name,
            Subjects = [.. subjects],
            Replicas = replicas,
            MaxMsgs = maxMsgs,
            // Preserve the existing stream's retention policy so ValidateConfigUpdate
            // does not reject the update for changing an immutable field.
            Retention = existing.Config.Retention,
        });
    }

    /// <summary>
    /// Returns the full stream info response.
    /// Go ref: getStreamInfo in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="name">Stream name.</param>
    /// <returns>An already-completed task wrapping the result of the stream manager's GetInfo call.</returns>
    public Task GetStreamInfoAsync(string name)
        => Task.FromResult(_streamManager.GetInfo(name));

    /// <summary>
    /// Returns the stream's current state (message count, sequences, bytes).
    /// Go ref: getStreamInfo().State in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="name">Stream name.</param>
    /// <returns>The stream manager's GetStateAsync operation converted to a task.</returns>
    public Task GetStreamStateAsync(string name)
        => _streamManager.GetStateAsync(name, default).AsTask();

    /// <summary>
    /// Returns the storage backend type string ("memory" or "file") for a stream.
    /// </summary>
    /// <param name="name">Stream name.</param>
    /// <returns>The value reported by the stream manager.</returns>
    public string GetStoreBackendType(string name)
        => _streamManager.GetStoreBackendType(name);

    // ---------------------------------------------------------------
    // Publish
    // ---------------------------------------------------------------

    /// <summary>
    /// Publishes a message to the given subject and notifies any push consumers.
    /// Go ref: sendStreamMsg in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="subject">Subject to publish on.</param>
    /// <param name="payload">Message body; encoded as UTF-8 before capture.</param>
    /// <returns>An already-completed task wrapping the publish acknowledgement.</returns>
    /// <exception cref="InvalidOperationException">The publisher did not capture the subject for any stream.</exception>
    public Task PublishAsync(string subject, string payload)
    {
        var body = Encoding.UTF8.GetBytes(payload);
        if (!_publisher.TryCapture(subject, body, null, out var ack))
        {
            throw new InvalidOperationException($"Publish to '{subject}' did not match a stream.");
        }

        // Only a successful ack on a known stream is forwarded to consumers.
        if (ack.ErrorCode == null && _streamManager.TryGet(ack.Stream, out var handle))
        {
            // NOTE(review): this blocks on the store's async load so the method can stay
            // non-async; presumably the test stores complete synchronously — confirm,
            // or make the method async once its typed return is settled.
            var stored = handle.Store.LoadAsync(ack.Seq, default).GetAwaiter().GetResult();
            if (stored != null)
            {
                _consumerManager.OnPublished(ack.Stream, stored);
            }
        }

        return Task.FromResult(ack);
    }

    // ---------------------------------------------------------------
    // Consumer operations
    // ---------------------------------------------------------------

    /// <summary>
    /// Creates (or updates) a durable consumer on the given stream.
    /// Go ref: addConsumer in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="stream">Stream the consumer belongs to.</param>
    /// <param name="durableName">Durable name of the consumer.</param>
    /// <param name="filterSubject">Optional filter subject; ignored when null, empty or whitespace.</param>
    /// <param name="ackPolicy">Acknowledgement policy; defaults to <see cref="AckPolicy.None"/>.</param>
    /// <returns>An already-completed task wrapping the consumer manager's response.</returns>
    public Task CreateConsumerAsync(
        string stream,
        string durableName,
        string? filterSubject = null,
        AckPolicy ackPolicy = AckPolicy.None)
    {
        var config = new ConsumerConfig
        {
            DurableName = durableName,
            AckPolicy = ackPolicy,
        };

        // Leave FilterSubject at its default unless a meaningful filter was supplied.
        if (!string.IsNullOrWhiteSpace(filterSubject))
        {
            config.FilterSubject = filterSubject;
        }

        return Task.FromResult(_consumerManager.CreateOrUpdate(stream, config));
    }

    /// <summary>
    /// Fetches up to <paramref name="batch"/> messages from the named consumer.
    /// Go ref: fetchMsgs in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="stream">Stream the consumer belongs to.</param>
    /// <param name="durableName">Durable name of the consumer.</param>
    /// <param name="batch">Batch size forwarded to the consumer manager.</param>
    /// <returns>The consumer manager's FetchAsync operation converted to a task.</returns>
    public Task FetchAsync(string stream, string durableName, int batch)
        => _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();

    /// <summary>
    /// Acknowledges all messages up to and including the given sequence.
    /// Go ref: sendAck / ackAll in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="stream">Stream the consumer belongs to.</param>
    /// <param name="durableName">Durable name of the consumer.</param>
    /// <param name="sequence">Sequence forwarded to the consumer manager.</param>
    public void AckAll(string stream, string durableName, ulong sequence)
        => _consumerManager.AckAll(stream, durableName, sequence);

    // ---------------------------------------------------------------
    // API routing
    // ---------------------------------------------------------------

    /// <summary>
    /// Routes a raw JetStream API request by subject and returns the response.
    /// Go ref: nc.Request() in cluster test helpers.
    /// </summary>
    /// <param name="subject">API subject to route.</param>
    /// <param name="payload">Request body; encoded as UTF-8 before routing.</param>
    /// <returns>An already-completed task wrapping the router's response.</returns>
    public Task RequestAsync(string subject, string payload)
    {
        var response = _router.Route(subject, Encoding.UTF8.GetBytes(payload));

        // In a real cluster, after stepdown a new leader is elected.
        // Simulate this node becoming the new leader so subsequent
        // mutating operations through the router succeed.
        var isMetaStepdown = subject.Equals(JetStreamApiSubjects.MetaLeaderStepdown, StringComparison.Ordinal);
        if (isMetaStepdown && response.Success)
        {
            _metaGroup.BecomeLeader();
        }

        return Task.FromResult(response);
    }

    // ---------------------------------------------------------------
    // Leader operations
    // ---------------------------------------------------------------

    /// <summary>
    /// Returns the meta-cluster leader ID.
    /// Go ref: c.leader() in jetstream_helpers_test.go.
    /// </summary>
    public string GetMetaLeaderId() => _metaGroup.GetState().LeaderId;

    /// <summary>
    /// Steps down the current meta-cluster leader, then immediately re-assumes leadership.
    /// Go ref: c.leader().Shutdown() in jetstream_helpers_test.go.
    /// </summary>
    public void StepDownMetaLeader()
    {
        _metaGroup.StepDown();

        // In a real cluster, a new leader is elected after stepdown.
        // Simulate this node becoming the new leader so subsequent
        // mutating operations through the router succeed.
        _metaGroup.BecomeLeader();
    }

    /// <summary>
    /// Returns the current meta-group state snapshot.
    /// Go ref: getMetaState in tests.
    /// </summary>
    public MetaGroupState? GetMetaState() => _metaGroup.GetState();

    /// <summary>
    /// Sends a stream leader step-down request through the API router.
    /// Go ref: JSApiStreamLeaderStepDownT in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="stream">Stream name appended to the step-down subject prefix.</param>
    /// <returns>An already-completed task wrapping the router's response to the request.</returns>
    public Task StepDownStreamLeaderAsync(string stream)
        => Task.FromResult(_router.Route(
            $"{JetStreamApiSubjects.StreamLeaderStepdown}{stream}",
            "{}"u8));

    /// <summary>
    /// Returns the replica group leader ID for the named stream.
    /// Go ref: streamLeader in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <returns>The leader ID, or an empty string when the stream has no replica group.</returns>
    public string GetStreamLeaderId(string stream)
    {
        var groups = GetReplicaGroupDictionary();
        return groups.TryGetValue(stream, out var group) ? group.Leader.Id : string.Empty;
    }

    /// <summary>
    /// Returns the replica group for the named stream, or null if not found.
    /// Go ref: streamLeader / stream replica accessor in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="streamName">Stream name.</param>
    public StreamReplicaGroup? GetReplicaGroup(string streamName)
    {
        var groups = GetReplicaGroupDictionary();
        return groups.TryGetValue(streamName, out var group) ? group : null;
    }

    /// <summary>
    /// Returns a simulated consumer leader ID derived from the stream's replica
    /// group leader. In Go, each consumer has its own RAFT group; here we derive
    /// from the stream group leader since per-consumer RAFT groups are not yet
    /// implemented independently.
    /// Go ref: consumerLeader in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="stream">Stream the consumer belongs to.</param>
    /// <param name="consumer">Consumer name; embedded verbatim in the returned ID.</param>
    /// <returns>
    /// "{streamLeader}/consumer/{consumer}", or an empty string when the stream has no leader.
    /// </returns>
    public string GetConsumerLeaderId(string stream, string consumer)
    {
        // Consumers share the stream's RAFT group in this model.
        var streamLeader = GetStreamLeaderId(stream);
        if (string.IsNullOrEmpty(streamLeader))
        {
            return string.Empty;
        }

        // Append the consumer name so the ID is consumer-scoped while staying
        // deterministic and non-empty.
        return $"{streamLeader}/consumer/{consumer}";
    }

    // ---------------------------------------------------------------
    // Go ref: waitOnStreamLeader — wait until a stream has a leader
    // ---------------------------------------------------------------

    /// <summary>
    /// Waits until the named stream has a non-empty leader ID, polling through
    /// PollHelper.WaitOrThrowAsync.
    /// Go ref: waitOnStreamLeader in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="timeoutMs">Timeout in milliseconds handed to the polling helper.</param>
    /// <exception cref="TimeoutException">
    /// Expected from the polling helper when no leader appears in time; the helper is
    /// defined outside this file, so verify there.
    /// </exception>
    public async Task WaitOnStreamLeaderAsync(string stream, int timeoutMs = 5000)
    {
        await PollHelper.WaitOrThrowAsync(
            () => !string.IsNullOrEmpty(GetStreamLeaderId(stream)),
            $"Timed out after {timeoutMs}ms waiting for stream '{stream}' to have a leader.",
            timeoutMs);
    }

    // ---------------------------------------------------------------
    // Go ref: waitOnConsumerLeader — wait until a consumer has a leader
    // ---------------------------------------------------------------

    /// <summary>
    /// Waits until the named consumer exists on the named stream and has a non-empty
    /// leader ID, polling through PollHelper.WaitOrThrowAsync.
    /// Go ref: waitOnConsumerLeader in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="stream">Stream the consumer belongs to.</param>
    /// <param name="consumer">Consumer name.</param>
    /// <param name="timeoutMs">Timeout in milliseconds handed to the polling helper.</param>
    /// <exception cref="TimeoutException">
    /// Expected from the polling helper when the condition is not met in time; the
    /// helper is defined outside this file, so verify there.
    /// </exception>
    public async Task WaitOnConsumerLeaderAsync(string stream, string consumer, int timeoutMs = 5000)
    {
        await PollHelper.WaitOrThrowAsync(
            () => _consumerManager.TryGet(stream, consumer, out _)
                  && !string.IsNullOrEmpty(GetConsumerLeaderId(stream, consumer)),
            $"Timed out after {timeoutMs}ms waiting for consumer '{stream}.{consumer}' to have a leader.",
            timeoutMs);
    }

    // ---------------------------------------------------------------
    // Go ref: restartServerAndWait — simulate node restart
    // ---------------------------------------------------------------

    /// <summary>
    /// Simulates a node restart by removing it from the removed set and recording
    /// it as restarted. In the full runtime, a restarted node rejoins the cluster
    /// and syncs state. Here it is a lifecycle marker for tests that track node restarts.
    /// Go ref: restartServerAndWait in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="nodeIndex">Index of the node; not range-checked against <see cref="NodeCount"/>.</param>
    public void SimulateNodeRestart(int nodeIndex)
    {
        _removedNodes.Remove(nodeIndex);
        _restartedNodes.Add(nodeIndex);
    }

    // ---------------------------------------------------------------
    // Go ref: shutdownServerAndRemoveStorage — remove a node
    // ---------------------------------------------------------------

    /// <summary>
    /// Simulates removing a node from the cluster (shutdown + storage removal).
    /// Records the node index as removed and clears any restarted marker for it.
    /// Go ref: shutdownServerAndRemoveStorage in jetstream_helpers_test.go.
    /// </summary>
    /// <param name="nodeIndex">Index of the node; not range-checked against <see cref="NodeCount"/>.</param>
    public void RemoveNode(int nodeIndex)
    {
        _removedNodes.Add(nodeIndex);
        _restartedNodes.Remove(nodeIndex);
    }

    // ---------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------

    /// <summary>
    /// Reads the stream manager's private replica-group dictionary via reflection.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The private field no longer exists on StreamManager, or it currently holds null.
    /// </exception>
    private ConcurrentDictionary<string, StreamReplicaGroup> GetReplicaGroupDictionary()
    {
        var field = s_replicaGroupsField
            ?? throw new InvalidOperationException(
                $"{nameof(StreamManager)} has no private instance field '{ReplicaGroupsFieldName}'.");

        var groups = (ConcurrentDictionary<string, StreamReplicaGroup>?)field.GetValue(_streamManager);
        return groups
            ?? throw new InvalidOperationException(
                $"{nameof(StreamManager)}.{ReplicaGroupsFieldName} is null.");
    }

    /// <summary>
    /// Nothing to release: the fixture holds only in-memory collaborators.
    /// </summary>
    public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}