Implement Go-parity background flush loop (coalesce 16KB/8ms) in MsgBlock/FileStore, replace O(n) GetStateAsync with incremental counters, skip PruneExpired/LoadAsync/PrunePerSubject when not needed, and bypass RAFT for single-replica streams. Fix counter tracking bugs in RemoveMsg/EraseMsg/TTL expiry and ObjectDisposedException races in flush loop disposal. FileStore optimizations verified with 3112/3112 JetStream tests passing; async publish benchmark remains at ~174 msg/s due to E2E protocol path bottleneck.
409 lines
16 KiB
C#
409 lines
16 KiB
C#
// Go parity: golang/nats-server/server/jetstream_helpers_test.go
|
|
// Covers: unified cluster fixture consolidating all per-suite fixtures
|
|
// into a single reusable helper used by Tasks 6-10.
|
|
// Corresponds to: checkClusterFormed, waitOnStreamLeader,
|
|
// waitOnConsumerLeader, restartServerAndWait, shutdownServerAndRemoveStorage,
|
|
// streamLeader, consumerLeader helpers in jetstream_helpers_test.go.
|
|
using System.Collections.Concurrent;
|
|
using System.Reflection;
|
|
using System.Text;
|
|
using NATS.Server.JetStream;
|
|
using NATS.Server.JetStream.Api;
|
|
using NATS.Server.JetStream.Cluster;
|
|
using NATS.Server.JetStream.Consumers;
|
|
using NATS.Server.JetStream.Models;
|
|
using NATS.Server.JetStream.Publish;
|
|
using NATS.Server.JetStream.Validation;
|
|
|
|
namespace NATS.Server.TestUtilities;
|
|
|
|
/// <summary>
|
|
/// Unified JetStream cluster fixture that consolidates the capabilities of
|
|
/// ClusterFormationFixture, ClusterStreamFixture, ClusterMetaFixture,
|
|
/// ClusterConsumerFixture, ClusterFailoverFixture, LeaderFailoverFixture, and
|
|
/// ConsumerReplicaFixture into a single reusable helper for cluster test suites.
|
|
///
|
|
/// Go ref: jetstream_helpers_test.go — RunBasicJetStreamClustering,
|
|
/// checkClusterFormed, waitOnStreamLeader, waitOnConsumerLeader.
|
|
/// </summary>
|
|
public sealed class JetStreamClusterFixture : IAsyncDisposable
|
|
{
|
|
// Collaborators handed to the private constructor; all are assigned exactly once.
private readonly JetStreamMetaGroup _metaGroup;
private readonly StreamManager _streamManager;
private readonly ConsumerManager _consumerManager;
private readonly JetStreamApiRouter _router;
private readonly JetStreamPublisher _publisher;
private readonly int _nodeCount;

// Simulated node lifecycle bookkeeping: indices of nodes marked as removed or restarted.
// Go ref: shutdownServerAndRemoveStorage, restartServerAndWait
private readonly HashSet<int> _removedNodes = new();
private readonly HashSet<int> _restartedNodes = new();
|
|
|
|
/// <summary>
/// Stores the already-constructed collaborators and the node count.
/// Private: the only visible caller is <see cref="StartAsync"/>.
/// </summary>
private JetStreamClusterFixture(
    JetStreamMetaGroup metaGroup,
    StreamManager streamManager,
    ConsumerManager consumerManager,
    JetStreamApiRouter router,
    JetStreamPublisher publisher,
    int nodeCount)
{
    (_metaGroup, _streamManager, _consumerManager) = (metaGroup, streamManager, consumerManager);
    (_router, _publisher, _nodeCount) = (router, publisher, nodeCount);
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// Go ref: checkClusterFormed — cluster size property
|
|
// ---------------------------------------------------------------
|
|
|
|
/// <summary>
/// Number of nodes the fixture was created with.
/// Go ref: checkClusterFormed in jetstream_helpers_test.go.
/// </summary>
public int NodeCount
{
    get { return _nodeCount; }
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// Factory
|
|
// ---------------------------------------------------------------
|
|
|
|
/// <summary>
/// Builds the meta group, consumer manager, stream manager, API router and
/// publisher for a cluster of <paramref name="nodes"/> nodes and wraps them in a fixture.
/// Go ref: RunBasicJetStreamClustering in jetstream_helpers_test.go.
/// </summary>
/// <param name="nodes">Node count passed to the meta group and reported by <see cref="NodeCount"/>.</param>
/// <returns>An already-completed task holding the new fixture.</returns>
public static Task<JetStreamClusterFixture> StartAsync(int nodes)
{
    var metaGroup = new JetStreamMetaGroup(nodes);
    var consumers = new ConsumerManager(metaGroup);
    var streams = new StreamManager(metaGroup, consumerManager: consumers);

    var fixture = new JetStreamClusterFixture(
        metaGroup,
        streams,
        consumers,
        new JetStreamApiRouter(streams, consumers, metaGroup),
        new JetStreamPublisher(streams),
        nodes);

    return Task.FromResult(fixture);
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// Stream operations
|
|
// ---------------------------------------------------------------
|
|
|
|
/// <summary>
/// Creates (or updates) a stream with the given name, subjects, replica count,
/// and storage type by calling <c>StreamManager.CreateOrUpdate</c>.
/// The manager's response is returned without inspection, so API-level errors
/// are reported through the response rather than raised by this wrapper.
/// Go ref: addStreamWithError in jetstream_helpers_test.go.
/// </summary>
/// <param name="name">Stream name.</param>
/// <param name="subjects">Subjects copied into the stream configuration.</param>
/// <param name="replicas">Replica count for the stream.</param>
/// <param name="storage">Storage type; defaults to <see cref="StorageType.Memory"/>.</param>
/// <returns>An already-completed task holding the manager's response.</returns>
public Task<JetStreamApiResponse> CreateStreamAsync(
    string name,
    string[] subjects,
    int replicas,
    StorageType storage = StorageType.Memory)
{
    var config = new StreamConfig
    {
        Name = name,
        Subjects = [.. subjects],
        Replicas = replicas,
        Storage = storage,
    };

    return Task.FromResult(_streamManager.CreateOrUpdate(config));
}
|
|
|
|
/// <summary>
/// Passes a complete <see cref="StreamConfig"/> straight to
/// <c>StreamManager.CreateOrUpdate</c> and returns its response uninspected.
/// Go ref: addStreamWithError in jetstream_helpers_test.go.
/// </summary>
/// <param name="config">Full stream configuration to create or update.</param>
/// <returns>The manager's response.</returns>
public JetStreamApiResponse CreateStreamDirect(StreamConfig config)
{
    return _streamManager.CreateOrUpdate(config);
}
|
|
|
|
/// <summary>
/// Updates an existing stream's subjects, replica count, and max messages.
/// Returns a 404 "stream not found" error response when the stream is unknown.
/// Go ref: updateStream in jetstream_helpers_test.go.
/// </summary>
/// <param name="name">Name of the stream to update; it must already exist.</param>
/// <param name="subjects">Replacement subject list.</param>
/// <param name="replicas">Replacement replica count.</param>
/// <param name="maxMsgs">Replacement max-messages value; defaults to 0.</param>
/// <returns>The manager's response, or the 404 error response.</returns>
public JetStreamApiResponse UpdateStream(string name, string[] subjects, int replicas, int maxMsgs = 0)
{
    // Go: stream update rejects unknown stream names, so the stream must exist first.
    if (_streamManager.TryGet(name, out var current))
    {
        // Carry the retention policy over so ValidateConfigUpdate does not
        // reject the update for changing an immutable field.
        // NOTE(review): Retention is the only value copied from the existing config;
        // every other StreamConfig property not set below keeps its StreamConfig
        // default. Confirm that is intended (e.g. for Storage).
        var keptRetention = current.Config.Retention;

        var updated = new StreamConfig
        {
            Name = name,
            Subjects = [.. subjects],
            Replicas = replicas,
            MaxMsgs = maxMsgs,
            Retention = keptRetention,
        };

        return _streamManager.CreateOrUpdate(updated);
    }

    return JetStreamApiResponse.ErrorResponse(404, "stream not found");
}
|
|
|
|
/// <summary>
/// Looks up the stream info response for <paramref name="name"/>.
/// Go ref: getStreamInfo in jetstream_helpers_test.go.
/// </summary>
/// <param name="name">Stream name.</param>
/// <returns>An already-completed task holding the manager's info response.</returns>
public Task<JetStreamApiResponse> GetStreamInfoAsync(string name)
{
    var info = _streamManager.GetInfo(name);
    return Task.FromResult(info);
}
|
|
|
|
/// <summary>
/// Asks the stream manager for the stream's current state, using the default
/// cancellation token, and exposes the result as a <see cref="Task{TResult}"/>.
/// Go ref: getStreamInfo().State in jetstream_helpers_test.go.
/// </summary>
/// <param name="name">Stream name.</param>
/// <returns>A task producing the stream state.</returns>
public Task<ApiStreamState> GetStreamStateAsync(string name)
{
    var pending = _streamManager.GetStateAsync(name, default);
    return pending.AsTask();
}
|
|
|
|
/// <summary>
/// Returns the storage backend type string the stream manager reports for a stream.
/// NOTE(review): presumably "memory" or "file"; confirm against StreamManager.
/// </summary>
/// <param name="name">Stream name.</param>
/// <returns>The backend type string from <c>StreamManager.GetStoreBackendType</c>.</returns>
public string GetStoreBackendType(string name)
{
    return _streamManager.GetStoreBackendType(name);
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// Publish
|
|
// ---------------------------------------------------------------
|
|
|
|
/// <summary>
/// Publishes a message to the given subject and notifies the consumer manager
/// about the stored message.
/// Go ref: sendStreamMsg in jetstream_helpers_test.go.
/// </summary>
/// <param name="subject">Subject to publish to.</param>
/// <param name="payload">Message body; encoded as UTF-8 before capture.</param>
/// <returns>
/// A task producing the publish acknowledgement. Consumers are only notified when
/// the ack carries no error code, the stream can be resolved, and the stored
/// message can be loaded back.
/// </returns>
/// <exception cref="InvalidOperationException">
/// Thrown synchronously, before a task is returned, when the subject does not
/// match a stream.
/// </exception>
/// <remarks>
/// The store load is awaited instead of being blocked on with
/// <c>GetAwaiter().GetResult()</c>, so a load that does not complete synchronously
/// no longer ties up the calling thread. Failures while loading or notifying are
/// reported through the returned task.
/// </remarks>
public Task<PubAck> PublishAsync(string subject, string payload)
{
    // Capture eagerly so the "no matching stream" failure is still raised at the call site.
    if (!_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), null, out var ack))
        throw new InvalidOperationException($"Publish to '{subject}' did not match a stream.");

    return NotifyConsumersAsync(ack);

    // Loads the just-stored message and forwards it to the consumer manager.
    async Task<PubAck> NotifyConsumersAsync(PubAck captured)
    {
        if (captured.ErrorCode == null && _streamManager.TryGet(captured.Stream, out var handle))
        {
            var stored = await handle.Store.LoadAsync(captured.Seq, default).ConfigureAwait(false);
            if (stored != null)
                _consumerManager.OnPublished(captured.Stream, stored);
        }

        return captured;
    }
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// Consumer operations
|
|
// ---------------------------------------------------------------
|
|
|
|
/// <summary>
/// Creates (or updates) a durable consumer on the given stream.
/// Go ref: addConsumer in jetstream_helpers_test.go.
/// </summary>
/// <param name="stream">Stream the consumer belongs to.</param>
/// <param name="durableName">Durable name of the consumer.</param>
/// <param name="filterSubject">
/// Optional filter subject; only applied when it is not null, empty, or whitespace.
/// </param>
/// <param name="ackPolicy">Acknowledgement policy; defaults to <see cref="AckPolicy.None"/>.</param>
/// <returns>An already-completed task holding the consumer manager's response.</returns>
public Task<JetStreamApiResponse> CreateConsumerAsync(
    string stream,
    string durableName,
    string? filterSubject = null,
    AckPolicy ackPolicy = AckPolicy.None)
{
    var consumerConfig = new ConsumerConfig
    {
        DurableName = durableName,
        AckPolicy = ackPolicy,
    };

    // Leave FilterSubject at its default unless a meaningful filter was supplied.
    if (!string.IsNullOrWhiteSpace(filterSubject))
    {
        consumerConfig.FilterSubject = filterSubject;
    }

    var response = _consumerManager.CreateOrUpdate(stream, consumerConfig);
    return Task.FromResult(response);
}
|
|
|
|
/// <summary>
/// Requests a batch of <paramref name="batch"/> messages from the named consumer
/// via the consumer manager, using the default cancellation token.
/// Go ref: fetchMsgs in jetstream_helpers_test.go.
/// </summary>
/// <param name="stream">Stream the consumer belongs to.</param>
/// <param name="durableName">Durable name of the consumer.</param>
/// <param name="batch">Batch size passed to the consumer manager.</param>
/// <returns>A task producing the fetched batch.</returns>
public Task<PullFetchBatch> FetchAsync(string stream, string durableName, int batch)
{
    var pending = _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default);
    return pending.AsTask();
}
|
|
|
|
/// <summary>
/// Forwards an ack-all for <paramref name="sequence"/> to the consumer manager.
/// Go ref: sendAck / ackAll in jetstream_helpers_test.go.
/// </summary>
/// <param name="stream">Stream the consumer belongs to.</param>
/// <param name="durableName">Durable name of the consumer.</param>
/// <param name="sequence">Sequence passed to <c>ConsumerManager.AckAll</c>.</param>
public void AckAll(string stream, string durableName, ulong sequence)
{
    _consumerManager.AckAll(stream, durableName, sequence);
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// API routing
|
|
// ---------------------------------------------------------------
|
|
|
|
/// <summary>
/// Routes a raw JetStream API request by subject and returns the response.
/// Go ref: nc.Request() in cluster test helpers.
/// </summary>
/// <param name="subject">API subject to route.</param>
/// <param name="payload">Request body; encoded as UTF-8 before routing.</param>
/// <returns>An already-completed task holding the router's response.</returns>
public Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
{
    var body = Encoding.UTF8.GetBytes(payload);
    var reply = _router.Route(subject, body);

    // A real cluster elects a new leader after a stepdown. Mimic that by making
    // this node the leader again so later mutating requests through the router succeed.
    var wasMetaStepdown = subject.Equals(JetStreamApiSubjects.MetaLeaderStepdown, StringComparison.Ordinal);
    if (wasMetaStepdown && reply.Success)
    {
        _metaGroup.BecomeLeader();
    }

    return Task.FromResult(reply);
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// Leader operations
|
|
// ---------------------------------------------------------------
|
|
|
|
/// <summary>
/// Reads the leader ID from the meta group's current state.
/// Go ref: c.leader() in jetstream_helpers_test.go.
/// </summary>
/// <returns>The <c>LeaderId</c> of the meta-group state.</returns>
public string GetMetaLeaderId()
{
    var state = _metaGroup.GetState();
    return state.LeaderId;
}
|
|
|
|
/// <summary>
/// Steps the meta group down and then immediately makes it leader again.
/// Go ref: c.leader().Shutdown() in jetstream_helpers_test.go.
/// </summary>
public void StepDownMetaLeader()
{
    _metaGroup.StepDown();

    // A real cluster would elect a replacement leader at this point. Simulate
    // that outcome so later mutating operations through the router succeed.
    _metaGroup.BecomeLeader();
}
|
|
|
|
/// <summary>
/// Returns whatever the meta group currently reports as its state.
/// Go ref: getMetaState in tests.
/// </summary>
/// <returns>The result of <c>JetStreamMetaGroup.GetState</c>.</returns>
public MetaGroupState? GetMetaState()
{
    return _metaGroup.GetState();
}
|
|
|
|
/// <summary>
/// Sends a stream-leader step-down request for <paramref name="stream"/> through
/// the API router with an empty JSON object as the body.
/// Go ref: JSApiStreamLeaderStepDownT in jetstream_helpers_test.go.
/// </summary>
/// <param name="stream">Stream name appended to the step-down subject prefix.</param>
/// <returns>An already-completed task holding the router's response.</returns>
public Task<JetStreamApiResponse> StepDownStreamLeaderAsync(string stream)
{
    var subject = $"{JetStreamApiSubjects.StreamLeaderStepdown}{stream}";
    var response = _router.Route(subject, "{}"u8);
    return Task.FromResult(response);
}
|
|
|
|
/// <summary>
/// Returns the leader ID of the named stream's replica group.
/// Go ref: streamLeader in jetstream_helpers_test.go.
/// </summary>
/// <param name="stream">Stream name used as the replica-group key.</param>
/// <returns>The group leader's ID, or an empty string when no group is registered.</returns>
public string GetStreamLeaderId(string stream)
{
    if (GetReplicaGroupDictionary().TryGetValue(stream, out var replicaGroup))
    {
        return replicaGroup.Leader.Id;
    }

    return string.Empty;
}
|
|
|
|
/// <summary>
/// Looks up the replica group registered for the named stream.
/// Go ref: streamLeader / stream replica accessor in jetstream_helpers_test.go.
/// </summary>
/// <param name="streamName">Stream name used as the replica-group key.</param>
/// <returns>The replica group, or null when none is registered.</returns>
public StreamReplicaGroup? GetReplicaGroup(string streamName)
{
    // On a miss TryGetValue leaves the out value at its default (null).
    GetReplicaGroupDictionary().TryGetValue(streamName, out var found);
    return found;
}
|
|
|
|
/// <summary>
/// Returns a simulated consumer leader ID derived from the stream's replica
/// group leader. In Go each consumer has its own RAFT group; here the ID is
/// derived from the stream group leader because per-consumer RAFT groups are
/// not implemented independently.
/// Go ref: consumerLeader in jetstream_helpers_test.go.
/// </summary>
/// <param name="stream">Stream whose leader ID forms the prefix.</param>
/// <param name="consumer">Consumer name appended to the ID; its existence is not checked here.</param>
/// <returns>
/// <c>{streamLeader}/consumer/{consumer}</c>, or an empty string when the stream
/// has no leader ID.
/// </returns>
public string GetConsumerLeaderId(string stream, string consumer)
{
    // Consumers share the stream's RAFT group in this model, so start from its leader.
    var leader = GetStreamLeaderId(stream);

    // Appending the consumer name keeps the ID consumer-scoped, deterministic,
    // and non-empty whenever the stream has a leader.
    return string.IsNullOrEmpty(leader)
        ? string.Empty
        : $"{leader}/consumer/{consumer}";
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// Go ref: waitOnStreamLeader — wait until a stream has a leader
|
|
// ---------------------------------------------------------------
|
|
|
|
/// <summary>
/// Waits, via <c>PollHelper.WaitOrThrowAsync</c>, until the named stream reports
/// a non-empty leader ID.
/// NOTE(review): believed to poll every 10ms and throw TimeoutException on expiry;
/// confirm against PollHelper.
/// Go ref: waitOnStreamLeader in jetstream_helpers_test.go.
/// </summary>
/// <param name="stream">Stream to wait on.</param>
/// <param name="timeoutMs">Timeout in milliseconds passed to the poll helper; defaults to 5000.</param>
public async Task WaitOnStreamLeaderAsync(string stream, int timeoutMs = 5000)
{
    var failureMessage = $"Timed out after {timeoutMs}ms waiting for stream '{stream}' to have a leader.";

    await PollHelper.WaitOrThrowAsync(
        () => !string.IsNullOrEmpty(GetStreamLeaderId(stream)),
        failureMessage,
        timeoutMs);
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// Go ref: waitOnConsumerLeader — wait until a consumer has a leader
|
|
// ---------------------------------------------------------------
|
|
|
|
/// <summary>
/// Waits, via <c>PollHelper.WaitOrThrowAsync</c>, until the consumer is known to
/// the consumer manager and has a non-empty simulated leader ID.
/// NOTE(review): believed to poll every 10ms and throw TimeoutException on expiry;
/// confirm against PollHelper.
/// Go ref: waitOnConsumerLeader in jetstream_helpers_test.go.
/// </summary>
/// <param name="stream">Stream the consumer belongs to.</param>
/// <param name="consumer">Consumer to wait on.</param>
/// <param name="timeoutMs">Timeout in milliseconds passed to the poll helper; defaults to 5000.</param>
public async Task WaitOnConsumerLeaderAsync(string stream, string consumer, int timeoutMs = 5000)
{
    var failureMessage = $"Timed out after {timeoutMs}ms waiting for consumer '{stream}.{consumer}' to have a leader.";

    await PollHelper.WaitOrThrowAsync(
        () => _consumerManager.TryGet(stream, consumer, out _)
              && !string.IsNullOrEmpty(GetConsumerLeaderId(stream, consumer)),
        failureMessage,
        timeoutMs);
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// Go ref: restartServerAndWait — simulate node restart
|
|
// ---------------------------------------------------------------
|
|
|
|
/// <summary>
/// Marks a node as restarted: its index is added to the restarted set and
/// dropped from the removed set. Only these two sets are touched; this is a
/// lifecycle marker for tests that track node restarts.
/// Go ref: restartServerAndWait in jetstream_helpers_test.go.
/// </summary>
/// <param name="nodeIndex">Index of the node to mark as restarted.</param>
public void SimulateNodeRestart(int nodeIndex)
{
    // The two sets are independent, so the order of these updates does not matter.
    _ = _restartedNodes.Add(nodeIndex);
    _ = _removedNodes.Remove(nodeIndex);
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// Go ref: shutdownServerAndRemoveStorage — remove a node
|
|
// ---------------------------------------------------------------
|
|
|
|
/// <summary>
/// Marks a node as removed from the cluster (simulated shutdown + storage removal):
/// its index is added to the removed set and dropped from the restarted set.
/// Go ref: shutdownServerAndRemoveStorage in jetstream_helpers_test.go.
/// </summary>
/// <param name="nodeIndex">Index of the node to mark as removed.</param>
public void RemoveNode(int nodeIndex)
{
    // The two sets are independent, so the order of these updates does not matter.
    _ = _restartedNodes.Remove(nodeIndex);
    _ = _removedNodes.Add(nodeIndex);
}
|
|
|
|
// ---------------------------------------------------------------
|
|
// Helpers
|
|
// ---------------------------------------------------------------
|
|
|
|
// Cached reflection handle to StreamManager's private "_replicaGroups" field.
// Null when no such field exists (for example after a rename); that case is
// reported with a descriptive exception on first use.
private static readonly FieldInfo? s_replicaGroupsField = typeof(StreamManager)
    .GetField("_replicaGroups", BindingFlags.NonPublic | BindingFlags.Instance);

/// <summary>
/// Reads the stream manager's private replica-group dictionary via reflection.
/// The field lookup is done once and cached, because this accessor is called
/// from the leader-wait polling predicates.
/// </summary>
/// <returns>The live dictionary instance held by the stream manager.</returns>
/// <exception cref="InvalidOperationException">
/// The private field is missing, holds null, or is not a
/// <c>ConcurrentDictionary&lt;string, StreamReplicaGroup&gt;</c>.
/// </exception>
private ConcurrentDictionary<string, StreamReplicaGroup> GetReplicaGroupDictionary()
{
    if (s_replicaGroupsField?.GetValue(_streamManager) is ConcurrentDictionary<string, StreamReplicaGroup> groups)
        return groups;

    throw new InvalidOperationException(
        "StreamManager._replicaGroups is missing, null, or not a ConcurrentDictionary<string, StreamReplicaGroup>; "
        + "update the reflection hook in JetStreamClusterFixture.");
}
|
|
|
|
/// <summary>
/// No-op disposal: returns an already-completed <see cref="ValueTask"/> and
/// releases nothing.
/// </summary>
public ValueTask DisposeAsync()
{
    return ValueTask.CompletedTask;
}
|
|
}
|