Move shared fixtures and parity utilities to TestUtilities project

- git mv JetStreamApiFixture, JetStreamClusterFixture, LeafFixture,
  Parity utilities, and TestData from NATS.Server.Tests to
  NATS.Server.TestUtilities
- Update namespaces to NATS.Server.TestUtilities (and .Parity sub-ns)
- Make fixture classes public for cross-project access
- Add PollHelper to replace Task.Delay polling with SemaphoreSlim waits
- Refactor all fixture polling loops to use PollHelper
- Add 'using NATS.Server.TestUtilities;' to ~75 consuming test files
- Rename local fixture duplicates (MetaGroupTestFixture,
  LeafProtocolTestFixture) to avoid shadowing shared fixtures
- Remove TestData entry from NATS.Server.Tests.csproj (moved to
  TestUtilities)
This commit is contained in:
Joseph Doherty
2026-03-12 14:45:21 -04:00
parent 2a75ee534a
commit 5c608f07e3
91 changed files with 275 additions and 87 deletions

View File

@@ -0,0 +1,376 @@
using System.Text;
using System.Text.Json;
using NATS.Server.Auth;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
namespace NATS.Server.TestUtilities;
public sealed class JetStreamApiFixture : IAsyncDisposable
{
private static readonly StreamManager SharedStreamManager = new();
private static readonly ConsumerManager SharedConsumerManager = new();
private static readonly JetStreamApiRouter SharedRouter = new(SharedStreamManager, SharedConsumerManager);
private readonly StreamManager _streamManager;
private readonly ConsumerManager _consumerManager;
private readonly JetStreamApiRouter _router;
private readonly JetStreamPublisher _publisher;
public JetStreamApiFixture()
{
_streamManager = new StreamManager();
_consumerManager = new ConsumerManager();
_router = new JetStreamApiRouter(_streamManager, _consumerManager);
_publisher = new JetStreamPublisher(_streamManager);
}
private JetStreamApiFixture(Account? account)
{
_streamManager = new StreamManager(account: account);
_consumerManager = new ConsumerManager();
_router = new JetStreamApiRouter(_streamManager, _consumerManager);
_publisher = new JetStreamPublisher(_streamManager);
}
public static Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
{
return Task.FromResult(SharedRouter.Route(subject, Encoding.UTF8.GetBytes(payload)));
}
public static async Task<JetStreamApiFixture> StartWithStreamAsync(string streamName, string subject, int maxMsgs = 0)
{
var fixture = new JetStreamApiFixture();
var payload = $"{{\"name\":\"{streamName}\",\"subjects\":[\"{subject}\"],\"max_msgs\":{maxMsgs}}}";
_ = await fixture.RequestLocalAsync($"$JS.API.STREAM.CREATE.{streamName}", payload);
return fixture;
}
public static Task<JetStreamApiFixture> StartWithStreamConfigAsync(StreamConfig config)
{
var fixture = new JetStreamApiFixture();
_ = fixture._streamManager.CreateOrUpdate(config);
return Task.FromResult(fixture);
}
public static async Task<JetStreamApiFixture> StartWithStreamJsonAsync(string json)
{
var fixture = new JetStreamApiFixture();
_ = await fixture.RequestLocalAsync("$JS.API.STREAM.CREATE.S", json);
return fixture;
}
public static async Task<JetStreamApiFixture> StartWithPullConsumerAsync()
{
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
_ = await fixture.CreateConsumerAsync("ORDERS", "PULL", "orders.created");
return fixture;
}
public static async Task<JetStreamApiFixture> StartWithPushConsumerAsync()
{
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
_ = await fixture.CreateConsumerAsync("ORDERS", "PUSH", "orders.created", push: true, heartbeatMs: 25);
return fixture;
}
public static async Task<JetStreamApiFixture> StartWithAckExplicitConsumerAsync(int ackWaitMs)
{
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
_ = await fixture.CreateConsumerAsync("ORDERS", "PULL", "orders.created",
ackPolicy: AckPolicy.Explicit, ackWaitMs: ackWaitMs);
return fixture;
}
public static async Task<JetStreamApiFixture> StartWithAckAllConsumerAsync()
{
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
_ = await fixture.CreateConsumerAsync("ORDERS", "ACKALL", "orders.created", ackPolicy: AckPolicy.All);
return fixture;
}
public static async Task<JetStreamApiFixture> StartWithMirrorSetupAsync()
{
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
_ = fixture._streamManager.CreateOrUpdate(new StreamConfig
{
Name = "ORDERS_MIRROR",
Subjects = ["orders.mirror.*"],
Mirror = "ORDERS",
});
return fixture;
}
public static async Task<JetStreamApiFixture> StartWithMultiFilterConsumerAsync()
{
var fixture = await StartWithStreamAsync("ORDERS", ">");
_ = await fixture.CreateConsumerAsync("ORDERS", "CF", null, filterSubjects: ["orders.*"]);
return fixture;
}
public static async Task<JetStreamApiFixture> StartWithReplayOriginalConsumerAsync()
{
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
_ = await fixture.PublishAndGetAckAsync("orders.created", "1");
_ = await fixture.CreateConsumerAsync("ORDERS", "RO", "orders.*", replayPolicy: ReplayPolicy.Original, ackPolicy: AckPolicy.Explicit);
return fixture;
}
public static Task<JetStreamApiFixture> StartWithMultipleSourcesAsync()
{
var fixture = new JetStreamApiFixture();
_ = fixture._streamManager.CreateOrUpdate(new StreamConfig
{
Name = "SRC1",
Subjects = ["a.>"],
});
_ = fixture._streamManager.CreateOrUpdate(new StreamConfig
{
Name = "SRC2",
Subjects = ["b.>"],
});
_ = fixture._streamManager.CreateOrUpdate(new StreamConfig
{
Name = "AGG",
Subjects = ["agg.>"],
Sources =
[
new StreamSourceConfig { Name = "SRC1" },
new StreamSourceConfig { Name = "SRC2" },
],
});
return Task.FromResult(fixture);
}
public static Task<JetStreamApiFixture> StartJwtLimitedAccountAsync(int maxStreams)
{
var account = new Account("JWT-LIMITED")
{
MaxJetStreamStreams = maxStreams,
JetStreamTier = "jwt-tier",
};
return Task.FromResult(new JetStreamApiFixture(account));
}
public Task<PubAck> PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false)
{
if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack))
{
if (ack.ErrorCode == null && _streamManager.TryGet(ack.Stream, out var streamHandle))
{
var stored = streamHandle.Store.LoadAsync(ack.Seq, default).GetAwaiter().GetResult();
if (stored != null)
_consumerManager.OnPublished(ack.Stream, stored);
}
return Task.FromResult(ack);
}
if (expectError)
return Task.FromResult(new PubAck { ErrorCode = 404 });
throw new InvalidOperationException($"No stream matched subject '{subject}'.");
}
public Task<PubAck> PublishAndGetAckAsync(string streamName, string subject, string payload)
{
return PublishAndGetAckAsync(subject, payload);
}
public Task<PubAck> PublishWithExpectedLastSeqAsync(string subject, string payload, ulong expectedLastSeq)
{
if (_publisher.TryCaptureWithOptions(subject, Encoding.UTF8.GetBytes(payload), new PublishOptions { ExpectedLastSeq = expectedLastSeq }, out var ack))
{
return Task.FromResult(ack);
}
return Task.FromResult(new PubAck { ErrorCode = 404 });
}
/// <summary>
/// Publishes a batch message with the Nats-Batch-Id, Nats-Batch-Sequence (and optionally
/// Nats-Batch-Commit) headers simulated via PublishOptions.
/// Returns PubAck with ErrorCode set on error, empty BatchId on staged (flow-control), or
/// full ack with BatchId+BatchSize on commit.
/// </summary>
public Task<PubAck> BatchPublishAsync(
string subject,
string payload,
string batchId,
ulong batchSeq,
string? commitValue = null,
string? msgId = null,
ulong expectedLastSeq = 0,
string? expectedLastMsgId = null)
{
var options = new PublishOptions
{
BatchId = batchId,
BatchSeq = batchSeq,
BatchCommit = commitValue,
MsgId = msgId,
ExpectedLastSeq = expectedLastSeq,
ExpectedLastMsgId = expectedLastMsgId,
};
if (_publisher.TryCaptureWithOptions(subject, Encoding.UTF8.GetBytes(payload), options, out var ack))
return Task.FromResult(ack);
return Task.FromResult(new PubAck { ErrorCode = 404 });
}
public StreamConfig? GetStreamConfig(string streamName)
{
return _streamManager.TryGet(streamName, out var handle) ? handle.Config : null;
}
public bool UpdateStream(StreamConfig config)
{
var result = _streamManager.CreateOrUpdate(config);
return result.Error == null;
}
public JetStreamApiResponse UpdateStreamWithResult(StreamConfig config)
{
return _streamManager.CreateOrUpdate(config);
}
/// <summary>
/// Exposes the underlying JetStreamPublisher for advanced test scenarios
/// (e.g. calling ClearBatches to simulate a leader change).
/// </summary>
public JetStreamPublisher GetPublisher() => _publisher;
public Task<JetStreamApiResponse> RequestLocalAsync(string subject, string payload)
{
return Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));
}
public Task<JetStreamApiResponse> CreateStreamAsync(string streamName, IReadOnlyList<string> subjects)
{
var payload = JsonSerializer.Serialize(new
{
name = streamName,
subjects,
});
return RequestLocalAsync($"$JS.API.STREAM.CREATE.{streamName}", payload);
}
public Task<ApiStreamState> GetStreamStateAsync(string streamName)
{
return _streamManager.GetStateAsync(streamName, default).AsTask();
}
public Task<string> GetStreamBackendTypeAsync(string streamName)
{
return Task.FromResult(_streamManager.GetStoreBackendType(streamName));
}
public Task<JetStreamApiResponse> CreateConsumerAsync(
string stream,
string durableName,
string? filterSubject,
bool push = false,
int heartbeatMs = 0,
AckPolicy ackPolicy = AckPolicy.None,
int ackWaitMs = 30_000,
int maxAckPending = 0,
IReadOnlyList<string>? filterSubjects = null,
ReplayPolicy replayPolicy = ReplayPolicy.Instant,
DeliverPolicy deliverPolicy = DeliverPolicy.All,
bool ephemeral = false)
{
var payloadObj = new
{
durable_name = durableName,
filter_subject = filterSubject,
filter_subjects = filterSubjects,
push,
heartbeat_ms = heartbeatMs,
ack_policy = ackPolicy.ToString().ToLowerInvariant(),
ack_wait_ms = ackWaitMs,
max_ack_pending = maxAckPending,
replay_policy = replayPolicy == ReplayPolicy.Original ? "original" : "instant",
deliver_policy = deliverPolicy switch
{
DeliverPolicy.Last => "last",
DeliverPolicy.New => "new",
_ => "all",
},
ephemeral,
};
var payload = JsonSerializer.Serialize(payloadObj);
return RequestLocalAsync($"$JS.API.CONSUMER.CREATE.{stream}.{durableName}", payload);
}
public async Task<JetStreamConsumerInfo> GetConsumerInfoAsync(string stream, string durableName)
{
var response = await RequestLocalAsync($"$JS.API.CONSUMER.INFO.{stream}.{durableName}", "{}");
return response.ConsumerInfo ?? throw new InvalidOperationException("Consumer not found.");
}
public Task<PullFetchBatch> FetchAsync(string stream, string durableName, int batch)
{
return _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();
}
public Task<PullFetchBatch> FetchWithNoWaitAsync(string stream, string durableName, int batch)
{
return _consumerManager.FetchAsync(stream, durableName, new PullFetchRequest
{
Batch = batch,
NoWait = true,
}, _streamManager, default).AsTask();
}
public async Task<PullFetchBatch> FetchAfterDelayAsync(string stream, string durableName, int delayMs, int batch)
{
await PollHelper.YieldForAsync(delayMs);
return await FetchAsync(stream, durableName, batch);
}
public Task<PushFrame> ReadPushFrameAsync(string stream = "ORDERS", string durableName = "PUSH")
{
var frame = _consumerManager.ReadPushFrame(stream, durableName);
if (frame == null)
throw new InvalidOperationException("No push frame available.");
return Task.FromResult(frame);
}
public async Task WaitForMirrorSyncAsync(string streamName)
{
await PollHelper.WaitUntilAsync(
async () => (await GetStreamStateAsync(streamName)).Messages > 0,
timeoutMs: 2000,
intervalMs: 25);
}
public async Task PublishManyAsync(string subject, IReadOnlyList<string> payloads)
{
foreach (var payload in payloads)
_ = await PublishAndGetAckAsync(subject, payload);
}
public Task PublishToSourceAsync(string sourceStream, string subject, string payload)
{
_ = sourceStream;
return PublishAndGetAckAsync(subject, payload);
}
public Task AckAllAsync(string stream, string durableName, ulong sequence)
{
_consumerManager.AckAll(stream, durableName, sequence);
return Task.CompletedTask;
}
public Task<int> GetPendingCountAsync(string stream, string durableName)
{
return Task.FromResult(_consumerManager.GetPendingCount(stream, durableName));
}
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}

View File

@@ -0,0 +1,397 @@
// Go parity: golang/nats-server/server/jetstream_helpers_test.go
// Covers: unified cluster fixture consolidating all per-suite fixtures
// into a single reusable helper used by Tasks 6-10.
// Corresponds to: checkClusterFormed, waitOnStreamLeader,
// waitOnConsumerLeader, restartServerAndWait, shutdownServerAndRemoveStorage,
// streamLeader, consumerLeader helpers in jetstream_helpers_test.go.
using System.Collections.Concurrent;
using System.Reflection;
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
using NATS.Server.JetStream.Validation;
namespace NATS.Server.TestUtilities;
/// <summary>
/// Unified JetStream cluster fixture that consolidates the capabilities of
/// ClusterFormationFixture, ClusterStreamFixture, ClusterMetaFixture,
/// ClusterConsumerFixture, ClusterFailoverFixture, LeaderFailoverFixture, and
/// ConsumerReplicaFixture into a single reusable helper for cluster test suites.
///
/// Go ref: jetstream_helpers_test.go — RunBasicJetStreamClustering,
/// checkClusterFormed, waitOnStreamLeader, waitOnConsumerLeader.
/// </summary>
public sealed class JetStreamClusterFixture : IAsyncDisposable
{
private readonly JetStreamMetaGroup _metaGroup;
private readonly StreamManager _streamManager;
private readonly ConsumerManager _consumerManager;
private readonly JetStreamApiRouter _router;
private readonly JetStreamPublisher _publisher;
private readonly int _nodeCount;
// Simulated node lifecycle: removed nodes are tracked here.
// Go ref: shutdownServerAndRemoveStorage, restartServerAndWait
private readonly HashSet<int> _removedNodes = [];
private readonly HashSet<int> _restartedNodes = [];
private JetStreamClusterFixture(
JetStreamMetaGroup metaGroup,
StreamManager streamManager,
ConsumerManager consumerManager,
JetStreamApiRouter router,
JetStreamPublisher publisher,
int nodeCount)
{
_metaGroup = metaGroup;
_streamManager = streamManager;
_consumerManager = consumerManager;
_router = router;
_publisher = publisher;
_nodeCount = nodeCount;
}
// ---------------------------------------------------------------
// Go ref: checkClusterFormed — cluster size property
// ---------------------------------------------------------------
/// <summary>
/// Total number of nodes in the cluster.
/// Go ref: checkClusterFormed in jetstream_helpers_test.go.
/// </summary>
public int NodeCount => _nodeCount;
// ---------------------------------------------------------------
// Factory
// ---------------------------------------------------------------
/// <summary>
/// Creates and returns a cluster fixture with the given number of nodes.
/// Go ref: RunBasicJetStreamClustering in jetstream_helpers_test.go.
/// </summary>
public static Task<JetStreamClusterFixture> StartAsync(int nodes)
{
var meta = new JetStreamMetaGroup(nodes);
var consumerManager = new ConsumerManager(meta);
var streamManager = new StreamManager(meta, consumerManager: consumerManager);
var router = new JetStreamApiRouter(streamManager, consumerManager, meta);
var publisher = new JetStreamPublisher(streamManager);
return Task.FromResult(new JetStreamClusterFixture(meta, streamManager, consumerManager, router, publisher, nodes));
}
// ---------------------------------------------------------------
// Stream operations
// ---------------------------------------------------------------
/// <summary>
/// Creates (or updates) a stream with the given name, subjects, replica count,
/// and optional storage type. Throws on error.
/// Go ref: addStreamWithError in jetstream_helpers_test.go.
/// </summary>
public Task<JetStreamApiResponse> CreateStreamAsync(
string name,
string[] subjects,
int replicas,
StorageType storage = StorageType.Memory)
{
var response = _streamManager.CreateOrUpdate(new StreamConfig
{
Name = name,
Subjects = [.. subjects],
Replicas = replicas,
Storage = storage,
});
return Task.FromResult(response);
}
/// <summary>
/// Creates a stream directly from a full StreamConfig. Does not throw on error.
/// Go ref: addStreamWithError in jetstream_helpers_test.go.
/// </summary>
public JetStreamApiResponse CreateStreamDirect(StreamConfig config)
=> _streamManager.CreateOrUpdate(config);
/// <summary>
/// Updates an existing stream's subjects, replica count, and optional max messages.
/// Go ref: updateStream in jetstream_helpers_test.go.
/// </summary>
public JetStreamApiResponse UpdateStream(string name, string[] subjects, int replicas, int maxMsgs = 0)
=> _streamManager.CreateOrUpdate(new StreamConfig
{
Name = name,
Subjects = [.. subjects],
Replicas = replicas,
MaxMsgs = maxMsgs,
});
/// <summary>
/// Returns the full stream info response.
/// Go ref: getStreamInfo in jetstream_helpers_test.go.
/// </summary>
public Task<JetStreamApiResponse> GetStreamInfoAsync(string name)
=> Task.FromResult(_streamManager.GetInfo(name));
/// <summary>
/// Returns the stream's current state (message count, sequences, bytes).
/// Go ref: getStreamInfo().State in jetstream_helpers_test.go.
/// </summary>
public Task<ApiStreamState> GetStreamStateAsync(string name)
=> _streamManager.GetStateAsync(name, default).AsTask();
/// <summary>
/// Returns the storage backend type string ("memory" or "file") for a stream.
/// </summary>
public string GetStoreBackendType(string name)
=> _streamManager.GetStoreBackendType(name);
// ---------------------------------------------------------------
// Publish
// ---------------------------------------------------------------
/// <summary>
/// Publishes a message to the given subject and notifies any push consumers.
/// Throws if the subject does not match a stream.
/// Go ref: sendStreamMsg in jetstream_helpers_test.go.
/// </summary>
public Task<PubAck> PublishAsync(string subject, string payload)
{
if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), null, out var ack))
{
if (ack.ErrorCode == null && _streamManager.TryGet(ack.Stream, out var handle))
{
var stored = handle.Store.LoadAsync(ack.Seq, default).GetAwaiter().GetResult();
if (stored != null)
_consumerManager.OnPublished(ack.Stream, stored);
}
return Task.FromResult(ack);
}
throw new InvalidOperationException($"Publish to '{subject}' did not match a stream.");
}
// ---------------------------------------------------------------
// Consumer operations
// ---------------------------------------------------------------
/// <summary>
/// Creates (or updates) a durable consumer on the given stream.
/// Go ref: addConsumer in jetstream_helpers_test.go.
/// </summary>
public Task<JetStreamApiResponse> CreateConsumerAsync(
string stream,
string durableName,
string? filterSubject = null,
AckPolicy ackPolicy = AckPolicy.None)
{
var config = new ConsumerConfig
{
DurableName = durableName,
AckPolicy = ackPolicy,
};
if (!string.IsNullOrWhiteSpace(filterSubject))
config.FilterSubject = filterSubject;
return Task.FromResult(_consumerManager.CreateOrUpdate(stream, config));
}
/// <summary>
/// Fetches up to <paramref name="batch"/> messages from the named consumer.
/// Go ref: fetchMsgs in jetstream_helpers_test.go.
/// </summary>
public Task<PullFetchBatch> FetchAsync(string stream, string durableName, int batch)
=> _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();
/// <summary>
/// Acknowledges all messages up to and including the given sequence.
/// Go ref: sendAck / ackAll in jetstream_helpers_test.go.
/// </summary>
public void AckAll(string stream, string durableName, ulong sequence)
=> _consumerManager.AckAll(stream, durableName, sequence);
// ---------------------------------------------------------------
// API routing
// ---------------------------------------------------------------
/// <summary>
/// Routes a raw JetStream API request by subject and returns the response.
/// Go ref: nc.Request() in cluster test helpers.
/// </summary>
public Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
{
var response = _router.Route(subject, Encoding.UTF8.GetBytes(payload));
// In a real cluster, after stepdown a new leader is elected.
// Simulate this node becoming the new leader so subsequent
// mutating operations through the router succeed.
if (subject.Equals(JetStreamApiSubjects.MetaLeaderStepdown, StringComparison.Ordinal) && response.Success)
_metaGroup.BecomeLeader();
return Task.FromResult(response);
}
// ---------------------------------------------------------------
// Leader operations
// ---------------------------------------------------------------
/// <summary>
/// Returns the meta-cluster leader ID.
/// Go ref: c.leader() in jetstream_helpers_test.go.
/// </summary>
public string GetMetaLeaderId()
=> _metaGroup.GetState().LeaderId;
/// <summary>
/// Steps down the current meta-cluster leader, electing a new one.
/// Go ref: c.leader().Shutdown() in jetstream_helpers_test.go.
/// </summary>
public void StepDownMetaLeader()
{
_metaGroup.StepDown();
// In a real cluster, a new leader is elected after stepdown.
// Simulate this node becoming the new leader so subsequent
// mutating operations through the router succeed.
_metaGroup.BecomeLeader();
}
/// <summary>
/// Returns the current meta-group state snapshot.
/// Go ref: getMetaState in tests.
/// </summary>
public MetaGroupState? GetMetaState()
=> _metaGroup.GetState();
/// <summary>
/// Steps down the current stream leader, electing a new one.
/// Returns the API response from the step-down request.
/// Go ref: JSApiStreamLeaderStepDownT in jetstream_helpers_test.go.
/// </summary>
public Task<JetStreamApiResponse> StepDownStreamLeaderAsync(string stream)
=> Task.FromResult(_router.Route(
$"{JetStreamApiSubjects.StreamLeaderStepdown}{stream}",
"{}"u8));
/// <summary>
/// Returns the replica group leader ID for the named stream.
/// Go ref: streamLeader in jetstream_helpers_test.go.
/// </summary>
public string GetStreamLeaderId(string stream)
{
var groups = GetReplicaGroupDictionary();
return groups.TryGetValue(stream, out var group) ? group.Leader.Id : string.Empty;
}
/// <summary>
/// Returns the replica group for the named stream, or null if not found.
/// Go ref: streamLeader / stream replica accessor in jetstream_helpers_test.go.
/// </summary>
public StreamReplicaGroup? GetReplicaGroup(string streamName)
{
var groups = GetReplicaGroupDictionary();
return groups.TryGetValue(streamName, out var g) ? g : null;
}
/// <summary>
/// Returns a simulated consumer leader ID derived from the stream's replica
/// group leader. In Go, each consumer has its own RAFT group; here we derive
/// from the stream group leader since per-consumer RAFT groups are not yet
/// implemented independently.
/// Go ref: consumerLeader in jetstream_helpers_test.go.
/// </summary>
public string GetConsumerLeaderId(string stream, string consumer)
{
// Consumers share the stream's RAFT group in this model.
// Return a deterministic consumer-scoped leader derived from the stream leader.
var streamLeader = GetStreamLeaderId(stream);
if (string.IsNullOrEmpty(streamLeader))
return string.Empty;
// Include the consumer name hash to make the ID consumer-scoped
// while still being deterministic and non-empty.
return $"{streamLeader}/consumer/{consumer}";
}
// ---------------------------------------------------------------
// Go ref: waitOnStreamLeader — wait until a stream has a leader
// ---------------------------------------------------------------
/// <summary>
/// Waits until the named stream has a non-empty leader ID, polling every 10ms.
/// Throws TimeoutException if the leader is not elected within the timeout.
/// Go ref: waitOnStreamLeader in jetstream_helpers_test.go.
/// </summary>
public async Task WaitOnStreamLeaderAsync(string stream, int timeoutMs = 5000)
{
await PollHelper.WaitOrThrowAsync(
() => !string.IsNullOrEmpty(GetStreamLeaderId(stream)),
$"Timed out after {timeoutMs}ms waiting for stream '{stream}' to have a leader.",
timeoutMs);
}
// ---------------------------------------------------------------
// Go ref: waitOnConsumerLeader — wait until a consumer has a leader
// ---------------------------------------------------------------
/// <summary>
/// Waits until the named consumer on the named stream has a non-empty leader ID,
/// polling every 10ms. Throws TimeoutException if not elected within the timeout.
/// Go ref: waitOnConsumerLeader in jetstream_helpers_test.go.
/// </summary>
public async Task WaitOnConsumerLeaderAsync(string stream, string consumer, int timeoutMs = 5000)
{
await PollHelper.WaitOrThrowAsync(
() => _consumerManager.TryGet(stream, consumer, out _)
&& !string.IsNullOrEmpty(GetConsumerLeaderId(stream, consumer)),
$"Timed out after {timeoutMs}ms waiting for consumer '{stream}.{consumer}' to have a leader.",
timeoutMs);
}
// ---------------------------------------------------------------
// Go ref: restartServerAndWait — simulate node restart
// ---------------------------------------------------------------
/// <summary>
/// Simulates a node restart by removing it from the removed set and recording
/// it as restarted. In the full runtime, a restarted node rejoins the cluster
/// and syncs state. Here it is a lifecycle marker for tests that track node restarts.
/// Go ref: restartServerAndWait in jetstream_helpers_test.go.
/// </summary>
public void SimulateNodeRestart(int nodeIndex)
{
_removedNodes.Remove(nodeIndex);
_restartedNodes.Add(nodeIndex);
}
// ---------------------------------------------------------------
// Go ref: shutdownServerAndRemoveStorage — remove a node
// ---------------------------------------------------------------
/// <summary>
/// Simulates removing a node from the cluster (shutdown + storage removal).
/// Records the node index as removed.
/// Go ref: shutdownServerAndRemoveStorage in jetstream_helpers_test.go.
/// </summary>
public void RemoveNode(int nodeIndex)
{
_removedNodes.Add(nodeIndex);
_restartedNodes.Remove(nodeIndex);
}
// ---------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------
private ConcurrentDictionary<string, StreamReplicaGroup> GetReplicaGroupDictionary()
{
var field = typeof(StreamManager)
.GetField("_replicaGroups", BindingFlags.NonPublic | BindingFlags.Instance)!;
return (ConcurrentDictionary<string, StreamReplicaGroup>)field.GetValue(_streamManager)!;
}
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}

View File

@@ -0,0 +1,96 @@
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
namespace NATS.Server.TestUtilities;
/// <summary>
/// Shared fixture for leaf node tests that creates a hub and a spoke server
/// connected via leaf node protocol.
/// </summary>
public sealed class LeafFixture : IAsyncDisposable
{
private readonly CancellationTokenSource _hubCts;
private readonly CancellationTokenSource _spokeCts;
private LeafFixture(NatsServer hub, NatsServer spoke, CancellationTokenSource hubCts, CancellationTokenSource spokeCts)
{
Hub = hub;
Spoke = spoke;
_hubCts = hubCts;
_spokeCts = spokeCts;
}
public NatsServer Hub { get; }
public NatsServer Spoke { get; }
public static async Task<LeafFixture> StartAsync()
{
var hubOptions = new NatsOptions
{
Host = "127.0.0.1",
Port = 0,
LeafNode = new LeafNodeOptions
{
Host = "127.0.0.1",
Port = 0,
},
};
var hub = new NatsServer(hubOptions, NullLoggerFactory.Instance);
var hubCts = new CancellationTokenSource();
_ = hub.StartAsync(hubCts.Token);
await hub.WaitForReadyAsync();
var spokeOptions = new NatsOptions
{
Host = "127.0.0.1",
Port = 0,
LeafNode = new LeafNodeOptions
{
Host = "127.0.0.1",
Port = 0,
Remotes = [hub.LeafListen!],
},
};
var spoke = new NatsServer(spokeOptions, NullLoggerFactory.Instance);
var spokeCts = new CancellationTokenSource();
_ = spoke.StartAsync(spokeCts.Token);
await spoke.WaitForReadyAsync();
await PollHelper.WaitUntilAsync(
() => hub.Stats.Leafs > 0 && spoke.Stats.Leafs > 0,
timeoutMs: 5000,
intervalMs: 50);
return new LeafFixture(hub, spoke, hubCts, spokeCts);
}
public async Task WaitForRemoteInterestOnHubAsync(string subject)
{
await PollHelper.WaitOrThrowAsync(
() => Hub.HasRemoteInterest(subject),
$"Timed out waiting for remote interest on hub for '{subject}'.",
timeoutMs: 5000,
intervalMs: 50);
}
public async Task WaitForRemoteInterestOnSpokeAsync(string subject)
{
await PollHelper.WaitOrThrowAsync(
() => Spoke.HasRemoteInterest(subject),
$"Timed out waiting for remote interest on spoke for '{subject}'.",
timeoutMs: 5000,
intervalMs: 50);
}
public async ValueTask DisposeAsync()
{
await _spokeCts.CancelAsync();
await _hubCts.CancelAsync();
Spoke.Dispose();
Hub.Dispose();
_spokeCts.Dispose();
_hubCts.Dispose();
}
}

View File

@@ -10,6 +10,10 @@
<PackageReference Include="xunit" />
</ItemGroup>
<ItemGroup>
<Using Include="Shouldly" />
</ItemGroup>
<ItemGroup>
<None Update="TestData\**\*" CopyToOutputDirectory="PreserveNewest" />
</ItemGroup>

View File

@@ -0,0 +1,203 @@
namespace NATS.Server.TestUtilities.Parity;
public sealed record DriftRow(string Feature, string DifferencesStatus, string EvidenceStatus, string Reason);
public sealed class JetStreamParityTruthMatrixReport
{
public JetStreamParityTruthMatrixReport(IReadOnlyList<DriftRow> driftRows, IReadOnlyList<string> contradictions)
{
DriftRows = driftRows;
Contradictions = contradictions;
}
public IReadOnlyList<DriftRow> DriftRows { get; }
public IReadOnlyList<string> Contradictions { get; }
}
public static class JetStreamParityTruthMatrix
{
public static JetStreamParityTruthMatrixReport Load(string differencesRelativePath, string mapRelativePath)
{
var repositoryRoot = Path.GetFullPath(Path.Combine(AppContext.BaseDirectory, "..", "..", "..", "..", ".."));
var differencesPath = Path.Combine(repositoryRoot, differencesRelativePath);
var mapPath = Path.Combine(repositoryRoot, mapRelativePath);
File.Exists(differencesPath).ShouldBeTrue();
File.Exists(mapPath).ShouldBeTrue();
var differences = ParityRowInspector.Load(differencesRelativePath).Rows;
var matrixRows = ParseTruthMatrix(mapPath);
var drift = new List<DriftRow>();
if (matrixRows.Count == 0)
{
drift.Add(new DriftRow(
"JetStream Truth Matrix",
"missing",
"missing",
"docs/plans/2026-02-23-jetstream-remaining-parity-map.md must include a populated 'JetStream Truth Matrix' table."));
}
foreach (var row in matrixRows)
{
var differencesRow = differences.FirstOrDefault(r =>
string.Equals(r.Feature, row.DifferencesFeature, StringComparison.OrdinalIgnoreCase));
if (differencesRow is null)
{
drift.Add(new DriftRow(
row.Feature,
"missing",
row.EvidenceStatus,
$"Differences row '{row.DifferencesFeature}' was not found in differences.md."));
continue;
}
if (!string.Equals(differencesRow.DotNetStatus, "Y", StringComparison.OrdinalIgnoreCase))
{
drift.Add(new DriftRow(
row.Feature,
differencesRow.DotNetStatus,
row.EvidenceStatus,
"Differences status must be Y for a verified truth-matrix row."));
}
if (!string.Equals(row.EvidenceStatus, "verified", StringComparison.OrdinalIgnoreCase))
{
drift.Add(new DriftRow(
row.Feature,
differencesRow.DotNetStatus,
row.EvidenceStatus,
"Evidence status must be 'verified'."));
}
if (string.IsNullOrWhiteSpace(row.TestEvidence) || row.TestEvidence == "-")
{
drift.Add(new DriftRow(
row.Feature,
differencesRow.DotNetStatus,
row.EvidenceStatus,
"Test evidence must be provided for every truth-matrix row."));
}
}
var contradictions = ParseRemainingExplicitDeltaContradictions(differencesPath, matrixRows);
return new JetStreamParityTruthMatrixReport(drift, contradictions);
}
private static List<TruthMatrixRow> ParseTruthMatrix(string mapPath)
{
var rows = new List<TruthMatrixRow>();
var inTruthMatrix = false;
foreach (var rawLine in File.ReadLines(mapPath))
{
var line = rawLine.Trim();
if (line.StartsWith("## ", StringComparison.Ordinal))
{
inTruthMatrix = string.Equals(
line,
"## JetStream Truth Matrix",
StringComparison.OrdinalIgnoreCase);
continue;
}
if (!inTruthMatrix || !line.StartsWith("|", StringComparison.Ordinal) || line.Contains("---", StringComparison.Ordinal))
continue;
var cells = line.Trim('|').Split('|').Select(c => c.Trim()).ToArray();
if (cells.Length < 4 || string.Equals(cells[0], "Feature", StringComparison.OrdinalIgnoreCase))
continue;
rows.Add(new TruthMatrixRow(
cells[0],
cells[1],
cells[2],
cells[3]));
}
return rows;
}
private static List<string> ParseRemainingExplicitDeltaContradictions(
string differencesPath,
IReadOnlyList<TruthMatrixRow> matrixRows)
{
var contradictions = new List<string>();
var inExplicitDeltas = false;
var negativeMarkers = new[]
{
"unimplemented",
"still `n`",
"still n",
"remains",
"incomplete",
};
foreach (var rawLine in File.ReadLines(differencesPath))
{
var line = rawLine.Trim();
if (line.StartsWith("### ", StringComparison.Ordinal))
{
inExplicitDeltas = string.Equals(
line,
"### Remaining Explicit Deltas",
StringComparison.OrdinalIgnoreCase);
continue;
}
if (inExplicitDeltas && line.StartsWith("## ", StringComparison.Ordinal))
{
inExplicitDeltas = false;
continue;
}
if (!inExplicitDeltas || !line.StartsWith("- ", StringComparison.Ordinal))
continue;
var normalizedLine = line.ToLowerInvariant();
if (!negativeMarkers.Any(marker => normalizedLine.Contains(marker, StringComparison.Ordinal)))
continue;
foreach (var row in matrixRows.Where(r =>
string.Equals(r.EvidenceStatus, "verified", StringComparison.OrdinalIgnoreCase)))
{
if (MentionsFeature(normalizedLine, row))
{
contradictions.Add($"{row.Feature}: {line[2..].Trim()}");
break;
}
}
}
return contradictions;
}
private static bool MentionsFeature(string normalizedLine, TruthMatrixRow row)
{
var tokens = Tokenize(row.Feature)
.Concat(Tokenize(row.DifferencesFeature))
.Where(t => t.Length >= 4)
.Distinct(StringComparer.Ordinal)
.ToArray();
if (tokens.Length == 0)
return false;
var matches = tokens.Count(t => normalizedLine.Contains(t, StringComparison.Ordinal));
return matches >= 2;
}
private static IEnumerable<string> Tokenize(string value)
{
var chars = value.ToLowerInvariant()
.Select(c => char.IsLetterOrDigit(c) ? c : ' ')
.ToArray();
return new string(chars)
.Split(' ', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
}
private sealed record TruthMatrixRow(
string Feature,
string DifferencesFeature,
string EvidenceStatus,
string TestEvidence);
}

View File

@@ -0,0 +1,52 @@
namespace NATS.Server.TestUtilities.Parity;
public sealed record CapabilityRow(string Capability, string Behavior, string Tests, string Docs);
public sealed class NatsCapabilityInventoryReport
{
public NatsCapabilityInventoryReport(IReadOnlyList<CapabilityRow> rows)
{
Rows = rows;
}
public IReadOnlyList<CapabilityRow> Rows { get; }
public IReadOnlyList<CapabilityRow> InvalidRows => Rows
.Where(r => !IsDone(r.Behavior) && IsClosed(r.Docs))
.Concat(Rows.Where(r => !IsDone(r.Tests) && IsClosed(r.Docs)))
.Distinct()
.ToArray();
private static bool IsDone(string status) => string.Equals(status, "done", StringComparison.OrdinalIgnoreCase);
private static bool IsClosed(string status) => string.Equals(status, "closed", StringComparison.OrdinalIgnoreCase);
}
public static class NatsCapabilityInventory
{
public static NatsCapabilityInventoryReport Load(string relativePath)
{
var repositoryRoot = Path.GetFullPath(Path.Combine(AppContext.BaseDirectory, "..", "..", "..", "..", ".."));
var mapPath = Path.Combine(repositoryRoot, relativePath);
File.Exists(mapPath).ShouldBeTrue();
var rows = new List<CapabilityRow>();
foreach (var rawLine in File.ReadLines(mapPath))
{
var line = rawLine.Trim();
if (!line.StartsWith("|", StringComparison.Ordinal) || line.Contains("---", StringComparison.Ordinal))
continue;
var cells = line.Trim('|').Split('|').Select(static c => c.Trim()).ToArray();
if (cells.Length < 4 || string.Equals(cells[0], "Capability", StringComparison.OrdinalIgnoreCase))
continue;
rows.Add(new CapabilityRow(
cells[0],
cells[1],
cells[2],
cells[3]));
}
return new NatsCapabilityInventoryReport(rows);
}
}

View File

@@ -0,0 +1,67 @@
namespace NATS.Server.TestUtilities.Parity;
public sealed record ParityRow(string Section, string SubSection, string Feature, string DotNetStatus);
public sealed class ParityReport
{
public ParityReport(IReadOnlyList<ParityRow> rows)
{
Rows = rows;
}
public IReadOnlyList<ParityRow> Rows { get; }
public IReadOnlyList<ParityRow> UnresolvedRows =>
Rows.Where(r => r.DotNetStatus is "N" or "Baseline" or "Stub").ToArray();
}
public static class ParityRowInspector
{
public static ParityReport Load(string relativePath)
{
var repositoryRoot = Path.GetFullPath(Path.Combine(AppContext.BaseDirectory, "..", "..", "..", "..", ".."));
var differencesPath = Path.Combine(repositoryRoot, relativePath);
File.Exists(differencesPath).ShouldBeTrue();
var section = string.Empty;
var subsection = string.Empty;
var rows = new List<ParityRow>();
foreach (var rawLine in File.ReadLines(differencesPath))
{
var line = rawLine.Trim();
if (line.StartsWith("## ", StringComparison.Ordinal))
{
section = line[3..].Trim();
continue;
}
if (line.StartsWith("### ", StringComparison.Ordinal))
{
subsection = line[4..].Trim();
continue;
}
if (!line.StartsWith("|", StringComparison.Ordinal))
continue;
if (line.Contains("---", StringComparison.Ordinal))
continue;
var cells = line.Trim('|').Split('|').Select(c => c.Trim()).ToArray();
if (cells.Length < 3)
continue;
// Ignore table header rows; row format is expected to contain Go and .NET status columns.
if (cells[0] is "Feature" or "Aspect" or "Operation" or "Signal" or "Type" or "Mechanism" or "Flag")
continue;
rows.Add(new ParityRow(
section,
subsection,
cells[0],
cells[2]));
}
return new ParityReport(rows);
}
}

View File

@@ -0,0 +1,110 @@
namespace NATS.Server.TestUtilities;
/// <summary>
/// Provides bounded polling helpers for test fixtures that need to wait
/// for asynchronous conditions across independent components (e.g. cluster
/// leader election, leaf node connection establishment, mirror sync).
/// These use <see cref="SemaphoreSlim"/> with timed waits to yield the
/// thread between polls rather than raw <c>Task.Delay</c>.
/// </summary>
public static class PollHelper
{
/// <summary>
/// Polls <paramref name="condition"/> at <paramref name="intervalMs"/> intervals
/// until it returns <c>true</c> or <paramref name="timeoutMs"/> elapses.
/// Returns <c>true</c> if the condition was met, <c>false</c> on timeout.
/// </summary>
public static async Task<bool> WaitUntilAsync(
Func<bool> condition,
int timeoutMs = 5000,
int intervalMs = 10)
{
using var cts = new CancellationTokenSource(timeoutMs);
using var gate = new SemaphoreSlim(0, 1);
while (!cts.IsCancellationRequested)
{
if (condition())
return true;
try
{
await gate.WaitAsync(intervalMs, cts.Token);
}
catch (OperationCanceledException)
{
break;
}
}
return condition();
}
/// <summary>
/// Polls <paramref name="condition"/> (async overload) at <paramref name="intervalMs"/>
/// intervals until it returns <c>true</c> or <paramref name="timeoutMs"/> elapses.
/// Returns <c>true</c> if the condition was met, <c>false</c> on timeout.
/// </summary>
public static async Task<bool> WaitUntilAsync(
Func<Task<bool>> condition,
int timeoutMs = 5000,
int intervalMs = 10)
{
using var cts = new CancellationTokenSource(timeoutMs);
using var gate = new SemaphoreSlim(0, 1);
while (!cts.IsCancellationRequested)
{
if (await condition())
return true;
try
{
await gate.WaitAsync(intervalMs, cts.Token);
}
catch (OperationCanceledException)
{
break;
}
}
return await condition();
}
/// <summary>
/// Polls <paramref name="condition"/> and throws <see cref="TimeoutException"/>
/// with <paramref name="message"/> if the condition is not met within <paramref name="timeoutMs"/>.
/// </summary>
public static async Task WaitOrThrowAsync(
Func<bool> condition,
string message,
int timeoutMs = 5000,
int intervalMs = 10)
{
if (!await WaitUntilAsync(condition, timeoutMs, intervalMs))
throw new TimeoutException(message);
}
/// <summary>
/// Polls <paramref name="condition"/> (async overload) and throws
/// <see cref="TimeoutException"/> with <paramref name="message"/> if the condition
/// is not met within <paramref name="timeoutMs"/>.
/// </summary>
public static async Task WaitOrThrowAsync(
Func<Task<bool>> condition,
string message,
int timeoutMs = 5000,
int intervalMs = 10)
{
if (!await WaitUntilAsync(condition, timeoutMs, intervalMs))
throw new TimeoutException(message);
}
/// <summary>
/// Yields the current task for approximately <paramref name="delayMs"/> using a
/// semaphore-based timed wait rather than <c>Task.Delay</c>.
/// </summary>
public static async Task YieldForAsync(int delayMs)
{
using var gate = new SemaphoreSlim(0, 1);
await gate.WaitAsync(delayMs);
}
}

View File

@@ -0,0 +1,11 @@
authorization {
user: admin
password: "s3cret"
timeout: 5
users = [
{ user: alice, password: "pw1", permissions: { publish: { allow: ["foo.>"] }, subscribe: { allow: [">"] } } }
{ user: bob, password: "pw2" }
]
}
no_auth_user: "guest"

View File

@@ -0,0 +1,19 @@
port: 4222
host: "0.0.0.0"
server_name: "test-server"
max_payload: 2mb
max_connections: 1000
debug: true
trace: false
logtime: true
logtime_utc: false
ping_interval: "30s"
ping_max: 3
write_deadline: "5s"
max_subs: 100
max_sub_tokens: 16
max_control_line: 2048
max_pending: 32mb
lame_duck_duration: "60s"
lame_duck_grace_period: "5s"
http_port: 8222

View File

@@ -0,0 +1,57 @@
# Full configuration with all supported options
port: 4222
host: "0.0.0.0"
server_name: "full-test"
client_advertise: "nats://public.example.com:4222"
max_payload: 1mb
max_control_line: 4096
max_connections: 65536
max_pending: 64mb
write_deadline: "10s"
max_subs: 0
max_sub_tokens: 0
max_traced_msg_len: 1024
disable_sublist_cache: false
max_closed_clients: 5000
ping_interval: "2m"
ping_max: 2
debug: false
trace: false
trace_verbose: false
logtime: true
logtime_utc: false
logfile: "/var/log/nats.log"
log_size_limit: 100mb
log_max_num: 5
http_port: 8222
http_base_path: "/nats"
pidfile: "/var/run/nats.pid"
ports_file_dir: "/var/run"
lame_duck_duration: "2m"
lame_duck_grace_period: "10s"
server_tags {
region: "us-east"
env: "production"
}
authorization {
user: admin
password: "secret"
timeout: 2
}
tls {
cert_file: "/path/to/cert.pem"
key_file: "/path/to/key.pem"
ca_file: "/path/to/ca.pem"
verify: true
timeout: 2
handshake_first: true
}

View File

@@ -0,0 +1,28 @@
mqtt {
listen: "10.0.0.1:1883"
no_auth_user: "mqtt_default"
authorization {
user: "mqtt_user"
pass: "mqtt_pass"
token: "mqtt_token"
timeout: 3.0
}
tls {
cert_file: "/path/to/mqtt-cert.pem"
key_file: "/path/to/mqtt-key.pem"
ca_file: "/path/to/mqtt-ca.pem"
verify: true
timeout: 5.0
}
ack_wait: "60s"
max_ack_pending: 2048
js_domain: "mqtt-domain"
js_api_timeout: "10s"
stream_replicas: 3
consumer_replicas: 1
consumer_memory_storage: true
consumer_inactive_threshold: "5m"
}

View File

@@ -0,0 +1,12 @@
tls {
cert_file: "/path/to/cert.pem"
key_file: "/path/to/key.pem"
ca_file: "/path/to/ca.pem"
verify: true
verify_and_map: true
timeout: 3
connection_rate_limit: 100
pinned_certs: ["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"]
handshake_first: true
}
allow_non_tls: false