diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/CheckHelper.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/CheckHelper.cs new file mode 100644 index 0000000..2c1dee2 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/CheckHelper.cs @@ -0,0 +1,117 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Mirrors Go checkFor from server/test_test.go. + +using System.Diagnostics; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.IntegrationTests.Helpers; + +/// +/// Retry/polling helpers for integration tests. +/// Mirrors Go checkFor from server/test_test.go. +/// +internal static class CheckHelper +{ + /// + /// Polls repeatedly until it returns null (success) + /// or the timeout expires, in which case the last exception is thrown. + /// Mirrors Go checkFor(t, timeout, interval, func() error). + /// + public static void CheckFor(TimeSpan timeout, TimeSpan interval, Func check) + { + var sw = Stopwatch.StartNew(); + Exception? last = null; + while (sw.Elapsed < timeout) + { + last = check(); + if (last == null) return; + Thread.Sleep(interval); + } + + // One final attempt after the sleep boundary. + last = check(); + if (last == null) return; + + throw new TimeoutException( + $"CheckFor timed out after {timeout}: {last.Message}", last); + } + + /// + /// Async version of . Uses Task.Delay instead of + /// Thread.Sleep to avoid blocking the thread pool. + /// + public static async Task CheckForAsync( + TimeSpan timeout, + TimeSpan interval, + Func> check, + CancellationToken cancellationToken = default) + { + var sw = Stopwatch.StartNew(); + Exception? last = null; + while (sw.Elapsed < timeout) + { + last = await check().ConfigureAwait(false); + if (last == null) return; + await Task.Delay(interval, cancellationToken).ConfigureAwait(false); + } + + // One final attempt. + last = await check().ConfigureAwait(false); + if (last == null) return; + + throw new TimeoutException( + $"CheckForAsync timed out after {timeout}: {last.Message}", last); + } + + /// + /// Waits until all servers in have formed a cluster + /// (each server sees at least servers.Length - 1 routes). + /// Uses a 10-second timeout with 100 ms poll interval. + /// Mirrors Go checkClusterFormed. + /// + public static void CheckClusterFormed(params NatsServer[] servers) + { + var expected = servers.Length - 1; + CheckFor(TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(100), () => + { + foreach (var s in servers) + { + var routes = s.NumRoutes(); + if (routes < expected) + return new Exception( + $"Server {s.Options.ServerName} has {routes} routes, expected {expected}."); + } + return null; + }); + } + + /// + /// Waits until the given server has at least + /// leaf node connections. + /// Uses a 10-second timeout with 100 ms poll interval. + /// Mirrors Go checkLeafNodeConnectedCount. + /// + public static void CheckLeafNodeConnectedCount(NatsServer server, int expected) + { + CheckFor(TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(100), () => + { + var count = server.NumLeafNodes(); + if (count < expected) + return new Exception( + $"Server {server.Options.ServerName} has {count} leaf nodes, expected {expected}."); + return null; + }); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/ConfigHelper.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/ConfigHelper.cs new file mode 100644 index 0000000..7f48040 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/ConfigHelper.cs @@ -0,0 +1,124 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Config templates mirror Go templates from server/jetstream_helpers_test.go. +// Note: C# string.Format uses {{ }} to escape literal braces. + +namespace ZB.MOM.NatsNet.Server.IntegrationTests.Helpers; + +/// +/// Config templates and temp config file management for integration tests. +/// Templates mirror the Go originals from server/jetstream_helpers_test.go. +/// +internal static class ConfigHelper +{ + // ========================================================================= + // Config templates + // ========================================================================= + + /// + /// Standard JetStream cluster template. + /// Placeholders: {0}=server_name, {1}=store_dir, {2}=cluster_name, + /// {3}=cluster_port, {4}=routes. + /// Mirrors Go jsClusterTempl. + /// + public const string JsClusterTemplate = @" +listen: 127.0.0.1:-1 +server_name: {0} +jetstream: {{max_mem_store: 2GB, max_file_store: 2GB, store_dir: '{1}'}} + +leaf {{ + listen: 127.0.0.1:-1 +}} + +cluster {{ + name: {2} + listen: 127.0.0.1:{3} + routes = [{4}] +}} + +# For access to system account. +accounts {{ $SYS {{ users = [ {{ user: ""admin"", pass: ""s3cr3t!"" }} ] }} }} +"; + + /// + /// JetStream cluster template with multiple named accounts. + /// Placeholders: {0}=server_name, {1}=store_dir, {2}=cluster_name, + /// {3}=cluster_port, {4}=routes. + /// Mirrors Go jsClusterAccountsTempl. + /// + public const string JsClusterAccountsTemplate = @" +listen: 127.0.0.1:-1 +server_name: {0} +jetstream: {{max_mem_store: 2GB, max_file_store: 2GB, store_dir: '{1}'}} + +leaf {{ + listen: 127.0.0.1:-1 +}} + +cluster {{ + name: {2} + listen: 127.0.0.1:{3} + routes = [{4}] +}} + +no_auth_user: one + +accounts {{ + ONE {{ users = [ {{ user: ""one"", pass: ""p"" }} ]; jetstream: enabled }} + TWO {{ users = [ {{ user: ""two"", pass: ""p"" }} ]; jetstream: enabled }} + NOJS {{ users = [ {{ user: ""nojs"", pass: ""p"" }} ] }} + $SYS {{ users = [ {{ user: ""admin"", pass: ""s3cr3t!"" }} ] }} +}} +"; + + /// + /// Super-cluster gateway wrapper template. + /// Placeholders: {0}=inner_cluster_config, {1}=gateway_name, + /// {2}=gateway_port, {3}=gateway_list. + /// Mirrors Go jsSuperClusterTempl. + /// + public const string JsSuperClusterTemplate = @" +{0} +gateway {{ + name: {1} + listen: 127.0.0.1:{2} + gateways = [{3} + ] +}} + +system_account: ""$SYS"" +"; + + /// + /// Gateway entry template used inside . + /// Placeholders: {0}=prefix_whitespace, {1}=gateway_name, {2}=urls. + /// Mirrors Go jsGWTempl. + /// + public const string JsGatewayEntryTemplate = @"{0}{{name: {1}, urls: [{2}]}}"; + + // ========================================================================= + // File helpers + // ========================================================================= + + /// + /// Writes to a temporary file and returns the path. + /// The caller is responsible for deleting the file when done. + /// + public static string CreateConfigFile(string content) + { + var path = Path.Combine(Path.GetTempPath(), "nats-test-" + Guid.NewGuid().ToString("N")[..8] + ".conf"); + File.WriteAllText(path, content); + return path; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/IntegrationTestBase.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/IntegrationTestBase.cs new file mode 100644 index 0000000..662be71 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/IntegrationTestBase.cs @@ -0,0 +1,57 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Xunit.Abstractions; + +namespace ZB.MOM.NatsNet.Server.IntegrationTests; + +/// +/// Abstract base class for all integration tests. +/// Skips the entire test class if the server cannot boot (i.e., the .NET server +/// runtime is not yet complete). Individual test classes inherit from this class. +/// +[Trait("Category", "Integration")] +public abstract class IntegrationTestBase : IDisposable +{ + // ========================================================================= + // Constructor — Skip guard + // ========================================================================= + + /// + /// Initializes the test base and verifies that the server can boot. + /// If returns false the test + /// is skipped via Xunit.SkippableFact's Skip.If mechanism. + /// + protected IntegrationTestBase(ITestOutputHelper output) + { + Output = output; + Skip.If(!Helpers.TestServerHelper.CanBoot(), "Server cannot boot — skipping integration tests."); + } + + // ========================================================================= + // Protected members + // ========================================================================= + + /// xUnit output helper, available to derived test classes. + protected ITestOutputHelper Output { get; } + + // ========================================================================= + // IDisposable + // ========================================================================= + + /// + /// Override in subclasses to perform per-test cleanup (e.g., shut down servers, + /// delete temp dirs). The base implementation does nothing. + /// + public virtual void Dispose() { } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/NatsTestClient.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/NatsTestClient.cs new file mode 100644 index 0000000..47f153a --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/NatsTestClient.cs @@ -0,0 +1,66 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Mirrors Go natsConnect helpers from test files. + +using NATS.Client.Core; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.IntegrationTests.Helpers; + +/// +/// NATS.Client.Core wrapper helpers for integration test connections. +/// Mirrors Go natsConnect pattern from test helper files. +/// +internal static class NatsTestClient +{ + // Default test connection options applied unless overridden. + private static readonly NatsOpts DefaultTestOpts = new() + { + Name = "test-client", + ConnectTimeout = TimeSpan.FromSeconds(5), + RequestTimeout = TimeSpan.FromSeconds(10), + }; + + /// + /// Creates a to the given with + /// sensible test defaults. Settings in override the defaults. + /// + public static NatsConnection Connect(string url, NatsOpts? opts = null) + { + var effective = opts ?? DefaultTestOpts; + + // Always override the URL; apply default name when not supplied. + effective = effective with { Url = url }; + if (string.IsNullOrEmpty(effective.Name)) + effective = effective with { Name = DefaultTestOpts.Name }; + + return new NatsConnection(effective); + } + + /// + /// Creates a to the given . + /// The URL is derived from the server's client port — uses the value from + /// (resolved during server setup). When the server + /// was configured with port -1 (random), the actual port is stored in + /// after Start(). + /// + public static NatsConnection ConnectToServer(NatsServer server, NatsOpts? opts = null) + { + var port = server.Options.Port; + // Fallback to well-known port if options still show 0 or -1. + if (port <= 0) port = 4222; + + return Connect($"nats://127.0.0.1:{port}", opts); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestCluster.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestCluster.cs new file mode 100644 index 0000000..9bbf86e --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestCluster.cs @@ -0,0 +1,331 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Mirrors Go cluster struct and createJetStreamCluster* helpers from +// server/jetstream_helpers_test.go. + +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.IntegrationTests.Helpers; + +/// +/// Represents a multi-server JetStream cluster for integration tests. +/// Mirrors Go cluster struct from server/jetstream_helpers_test.go. +/// +internal sealed class TestCluster : IDisposable +{ + // ========================================================================= + // Properties + // ========================================================================= + + /// Running server instances in the cluster. + public NatsServer[] Servers { get; } + + /// Options used to configure each server. + public ServerOptions[] Options { get; } + + /// Name of this cluster (e.g. "HUB"). + public string Name { get; } + + private bool _disposed; + + // ========================================================================= + // Constructor + // ========================================================================= + + private TestCluster(NatsServer[] servers, ServerOptions[] options, string name) + { + Servers = servers; + Options = options; + Name = name; + } + + /// + /// Internal factory used by to wrap pre-started servers. + /// + internal static TestCluster FromServers(NatsServer[] servers, ServerOptions[] options, string name) + => new(servers, options, name); + + // ========================================================================= + // Static factory: standard JetStream cluster + // ========================================================================= + + /// + /// Creates a JetStream cluster using the default . + /// Mirrors Go createJetStreamCluster. + /// + public static TestCluster CreateJetStreamCluster(int numServers, string name) => + CreateJetStreamClusterWithTemplate(ConfigHelper.JsClusterTemplate, numServers, name); + + /// + /// Creates a JetStream cluster using the provided config . + /// Allocates free ports for each server's client and cluster listeners, builds route + /// URLs, generates per-server config from the template, starts all servers, and + /// waits for the cluster to form. + /// Mirrors Go createJetStreamClusterWithTemplate. + /// + public static TestCluster CreateJetStreamClusterWithTemplate( + string template, + int numServers, + string name) + { + // Allocate cluster (route) ports — one per server. + var clusterPorts = new int[numServers]; + for (var i = 0; i < numServers; i++) + clusterPorts[i] = TestServerHelper.GetFreePort(); + + // Build the routes string shared by all servers in this cluster. + var routeUrls = string.Join(",", clusterPorts.Select(p => $"nats-route://127.0.0.1:{p}")); + + var servers = new NatsServer[numServers]; + var opts = new ServerOptions[numServers]; + + for (var i = 0; i < numServers; i++) + { + var serverName = $"{name}-S{i + 1}"; + var storeDir = TestServerHelper.CreateTempDir($"js-{name}-{i + 1}-"); + + // Format template: {0}=server_name, {1}=store_dir, {2}=cluster_name, + // {3}=cluster_port, {4}=routes + var configContent = string.Format( + template, + serverName, + storeDir, + name, + clusterPorts[i], + routeUrls); + + var configFile = ConfigHelper.CreateConfigFile(configContent); + + var serverOpts = new ServerOptions + { + ServerName = serverName, + Host = "127.0.0.1", + Port = -1, + NoLog = true, + NoSigs = true, + JetStream = true, + StoreDir = storeDir, + ConfigFile = configFile, + Cluster = new ClusterOpts + { + Name = name, + Host = "127.0.0.1", + Port = clusterPorts[i], + }, + Routes = clusterPorts + .Where((_, idx) => idx != i) + .Select(p => new Uri($"nats-route://127.0.0.1:{p}")) + .ToList(), + }; + + var (server, _) = TestServerHelper.RunServer(serverOpts); + servers[i] = server; + opts[i] = serverOpts; + } + + var cluster = new TestCluster(servers, opts, name); + cluster.WaitOnClusterReady(); + return cluster; + } + + // ========================================================================= + // Wait helpers + // ========================================================================= + + /// + /// Waits until all servers in the cluster have formed routes to one another. + /// Mirrors Go checkClusterFormed. + /// + public void WaitOnClusterReady() + { + CheckHelper.CheckClusterFormed(Servers); + } + + /// + /// Waits until at least one server in the cluster reports as JetStream meta-leader. + /// Mirrors Go c.waitOnLeader. + /// + public void WaitOnLeader() + { + CheckHelper.CheckFor(TimeSpan.FromSeconds(30), TimeSpan.FromMilliseconds(100), () => + { + var leader = Leader(); + if (leader == null) + return new Exception($"Cluster {Name}: no JetStream meta-leader elected yet."); + return null; + }); + } + + /// + /// Waits until the named stream has an elected leader in the given account. + /// Mirrors Go c.waitOnStreamLeader. + /// + public void WaitOnStreamLeader(string account, string stream) + { + CheckHelper.CheckFor(TimeSpan.FromSeconds(30), TimeSpan.FromMilliseconds(100), () => + { + var leader = StreamLeader(account, stream); + if (leader == null) + return new Exception( + $"Cluster {Name}: no leader for stream '{stream}' in account '{account}'."); + return null; + }); + } + + /// + /// Waits until the named consumer has an elected leader. + /// Mirrors Go c.waitOnConsumerLeader. + /// + public void WaitOnConsumerLeader(string account, string stream, string consumer) + { + CheckHelper.CheckFor(TimeSpan.FromSeconds(30), TimeSpan.FromMilliseconds(100), () => + { + var leader = ConsumerLeader(account, stream, consumer); + if (leader == null) + return new Exception( + $"Cluster {Name}: no leader for consumer '{consumer}' in stream '{stream}', account '{account}'."); + return null; + }); + } + + // ========================================================================= + // Accessors + // ========================================================================= + + /// + /// Returns the server that is currently the JetStream meta-leader, + /// or null if no leader is elected. + /// Mirrors Go c.leader(). + /// + public NatsServer? Leader() + { + foreach (var s in Servers) + { + if (s.JetStreamIsLeader()) + return s; + } + return null; + } + + /// + /// Returns the server that is leader for the named stream in the given account, + /// or null if no leader is elected. + /// Mirrors Go c.streamLeader. + /// + public NatsServer? StreamLeader(string account, string stream) + { + foreach (var s in Servers) + { + if (s.JetStreamIsStreamLeader(account, stream)) + return s; + } + return null; + } + + /// + /// Returns the server that is leader for the named consumer, + /// or null if no leader is elected. + /// Mirrors Go c.consumerLeader. + /// + public NatsServer? ConsumerLeader(string account, string stream, string consumer) + { + foreach (var s in Servers) + { + if (s.JetStreamIsConsumerLeader(account, stream, consumer)) + return s; + } + return null; + } + + /// + /// Returns a random running server from the cluster. + /// Mirrors Go c.randomServer(). + /// + public NatsServer RandomServer() + { + var candidates = Servers.Where(s => s.Running()).ToArray(); + if (candidates.Length == 0) + throw new InvalidOperationException($"Cluster {Name}: no running servers."); + return candidates[Random.Shared.Next(candidates.Length)]; + } + + /// + /// Finds a server by its . + /// Returns null if not found. + /// Mirrors Go c.serverByName. + /// + public NatsServer? ServerByName(string name) + { + foreach (var s in Servers) + { + if (s.Options.ServerName == name) + return s; + } + return null; + } + + // ========================================================================= + // Lifecycle + // ========================================================================= + + /// Stops all servers in the cluster. + public void StopAll() + { + foreach (var s in Servers) + { + try { s.Shutdown(); } catch { /* best effort */ } + } + } + + /// + /// Restarts all stopped servers. + /// Note: a true restart would re-create the server; here we call Start() if not running. + /// + public void RestartAll() + { + foreach (var (server, i) in Servers.Select((s, i) => (s, i))) + { + if (!server.Running()) + { + try { server.Start(); } catch { /* best effort */ } + } + } + } + + /// Shuts down and disposes all servers and cleans up temp files. + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + foreach (var (server, i) in Servers.Select((s, i) => (s, i))) + { + try { server.Shutdown(); } catch { /* best effort */ } + + // Clean up temp store dir. + var dir = Options[i].StoreDir; + if (!string.IsNullOrEmpty(dir) && Directory.Exists(dir)) + { + try { Directory.Delete(dir, recursive: true); } catch { /* best effort */ } + } + + // Clean up temp config file. + var cfg = Options[i].ConfigFile; + if (!string.IsNullOrEmpty(cfg) && File.Exists(cfg)) + { + try { File.Delete(cfg); } catch { /* best effort */ } + } + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestServerHelper.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestServerHelper.cs new file mode 100644 index 0000000..beab6c0 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestServerHelper.cs @@ -0,0 +1,135 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Mirrors Go test helpers: RunServer, GetFreePort, etc. from server/test_test.go. + +using System.Net; +using System.Net.Sockets; +using Xunit.Abstractions; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.IntegrationTests.Helpers; + +/// +/// Server lifecycle helpers for integration tests. +/// Mirrors Go patterns from server/test_test.go: RunServer, GetFreePort, etc. +/// +internal static class TestServerHelper +{ + /// + /// Returns true if a NatsServer can be instantiated with basic options. + /// Used as a Skip guard — if the server can't boot, all integration tests skip gracefully. + /// + public static bool CanBoot() + { + try + { + var opts = new ServerOptions + { + Host = "127.0.0.1", + Port = -1, + NoLog = true, + NoSigs = true, + }; + var (server, err) = NatsServer.NewServer(opts); + if (err != null || server == null) + return false; + + server.Shutdown(); + return true; + } + catch + { + return false; + } + } + + /// + /// Creates and starts a NatsServer with the given options. + /// Returns the running server and the options used. + /// Mirrors Go RunServer. + /// + public static (NatsServer Server, ServerOptions Options) RunServer(ServerOptions opts) + { + var (server, err) = NatsServer.NewServer(opts); + if (err != null) + throw new InvalidOperationException($"Failed to create server: {err.Message}", err); + if (server == null) + throw new InvalidOperationException("Failed to create server: NewServer returned null."); + + server.Start(); + return (server, opts); + } + + /// + /// Creates and starts a NatsServer with JetStream enabled and a temp store directory. + /// Mirrors Go RunServer with JetStream options. + /// + public static NatsServer RunBasicJetStreamServer(ITestOutputHelper? output = null) + { + var storeDir = CreateTempDir("js-store-"); + var opts = new ServerOptions + { + Host = "127.0.0.1", + Port = -1, + NoLog = true, + NoSigs = true, + JetStream = true, + StoreDir = storeDir, + }; + + var (server, _) = RunServer(opts); + return server; + } + + /// + /// Creates and starts a NatsServer using the options parsed from a config file path. + /// The config file content is read and minimal parsing extracts key options. + /// Returns the running server and the options. + /// + public static (NatsServer Server, ServerOptions Options) RunServerWithConfig(string configFile) + { + var opts = new ServerOptions + { + ConfigFile = configFile, + NoLog = true, + NoSigs = true, + }; + + return RunServer(opts); + } + + /// + /// Finds a free TCP port on loopback. + /// Mirrors Go GetFreePort. + /// + public static int GetFreePort() + { + var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + listener.Stop(); + return port; + } + + /// + /// Creates a uniquely named temp directory with the given prefix. + /// The caller is responsible for deleting it when done. + /// + public static string CreateTempDir(string prefix = "nats-test-") + { + var path = Path.Combine(Path.GetTempPath(), prefix + Guid.NewGuid().ToString("N")[..8]); + Directory.CreateDirectory(path); + return path; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestSuperCluster.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestSuperCluster.cs new file mode 100644 index 0000000..c411141 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestSuperCluster.cs @@ -0,0 +1,294 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Mirrors Go supercluster struct and createJetStreamSuperCluster* helpers from +// server/jetstream_helpers_test.go. + +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.IntegrationTests.Helpers; + +/// +/// Represents a multi-cluster super-cluster connected via NATS gateways. +/// Mirrors Go supercluster struct from server/jetstream_helpers_test.go. +/// +internal sealed class TestSuperCluster : IDisposable +{ + // ========================================================================= + // Properties + // ========================================================================= + + /// All clusters that form this super-cluster. + public TestCluster[] Clusters { get; } + + private bool _disposed; + + // ========================================================================= + // Constructor + // ========================================================================= + + private TestSuperCluster(TestCluster[] clusters) + { + Clusters = clusters; + } + + // ========================================================================= + // Static factory + // ========================================================================= + + /// + /// Creates a JetStream super-cluster consisting of clusters, + /// each with servers, connected via gateways. + /// Cluster names are C1, C2, … Cn. + /// Mirrors Go createJetStreamSuperCluster. + /// + public static TestSuperCluster CreateJetStreamSuperCluster(int numPerCluster, int numClusters) + { + if (numClusters <= 1) + throw new ArgumentException("numClusters must be > 1.", nameof(numClusters)); + if (numPerCluster < 1) + throw new ArgumentException("numPerCluster must be >= 1.", nameof(numPerCluster)); + + // Allocate gateway ports — one per server across all clusters. + var totalServers = numClusters * numPerCluster; + var gatewayPorts = new int[totalServers]; + for (var i = 0; i < totalServers; i++) + gatewayPorts[i] = TestServerHelper.GetFreePort(); + + // Build gateway remote-entry lines for each cluster. + // Each cluster has numPerCluster gateway ports. + var gwEntries = new string[numClusters]; + for (var ci = 0; ci < numClusters; ci++) + { + var clusterName = $"C{ci + 1}"; + var baseIndex = ci * numPerCluster; + var urls = string.Join( + ",", + Enumerable.Range(baseIndex, numPerCluster) + .Select(idx => $"nats-gw://127.0.0.1:{gatewayPorts[idx]}")); + + gwEntries[ci] = string.Format( + ConfigHelper.JsGatewayEntryTemplate, + "\n\t\t\t", + clusterName, + urls); + } + var allGwConf = string.Join(string.Empty, gwEntries); + + // Create each cluster with the super-cluster gateway wrapper. + var clusters = new TestCluster[numClusters]; + + for (var ci = 0; ci < numClusters; ci++) + { + var clusterName = $"C{ci + 1}"; + var gwBaseIndex = ci * numPerCluster; + + // Allocate cluster-route ports for this sub-cluster. + var clusterPorts = Enumerable.Range(0, numPerCluster) + .Select(_ => TestServerHelper.GetFreePort()) + .ToArray(); + var routeUrls = string.Join( + ",", + clusterPorts.Select(p => $"nats-route://127.0.0.1:{p}")); + + var servers = new NatsServer[numPerCluster]; + var opts = new ServerOptions[numPerCluster]; + + for (var si = 0; si < numPerCluster; si++) + { + var serverName = $"{clusterName}-S{si + 1}"; + var storeDir = TestServerHelper.CreateTempDir($"js-sc-{clusterName}-{si + 1}-"); + var gwPort = gatewayPorts[gwBaseIndex + si]; + + // Inner cluster config (using JsClusterTemplate). + var innerConf = string.Format( + ConfigHelper.JsClusterTemplate, + serverName, + storeDir, + clusterName, + clusterPorts[si], + routeUrls); + + // Wrap with super-cluster template (gateway section). + var fullConf = string.Format( + ConfigHelper.JsSuperClusterTemplate, + innerConf, + clusterName, + gwPort, + allGwConf); + + var configFile = ConfigHelper.CreateConfigFile(fullConf); + + var serverOpts = new ServerOptions + { + ServerName = serverName, + Host = "127.0.0.1", + Port = -1, + NoLog = true, + NoSigs = true, + JetStream = true, + StoreDir = storeDir, + ConfigFile = configFile, + Cluster = new ClusterOpts + { + Name = clusterName, + Host = "127.0.0.1", + Port = clusterPorts[si], + }, + Gateway = new GatewayOpts + { + Name = clusterName, + Host = "127.0.0.1", + Port = gwPort, + Gateways = Enumerable.Range(0, numClusters) + .Where(gci => gci != ci) + .Select(gci => + { + var remoteName = $"C{gci + 1}"; + var remoteBase = gci * numPerCluster; + return new RemoteGatewayOpts + { + Name = remoteName, + Urls = Enumerable.Range(remoteBase, numPerCluster) + .Select(idx => new Uri($"nats-gw://127.0.0.1:{gatewayPorts[idx]}")) + .ToList(), + }; + }) + .ToList(), + }, + Routes = clusterPorts + .Where((_, idx) => idx != si) + .Select(p => new Uri($"nats-route://127.0.0.1:{p}")) + .ToList(), + }; + + var (server, _) = TestServerHelper.RunServer(serverOpts); + servers[si] = server; + opts[si] = serverOpts; + } + + clusters[ci] = TestCluster.FromServers(servers, opts, clusterName); + } + + var sc = new TestSuperCluster(clusters); + sc.WaitOnLeader(); + return sc; + } + + // ========================================================================= + // Accessors + // ========================================================================= + + /// + /// Finds the JetStream meta-leader across all clusters. + /// Returns null if no leader is elected. + /// Mirrors Go sc.leader(). + /// + public NatsServer? Leader() + { + foreach (var c in Clusters) + { + var l = c.Leader(); + if (l != null) return l; + } + return null; + } + + /// + /// Returns a random running server from a random cluster. + /// Mirrors Go sc.randomServer(). + /// + public NatsServer RandomServer() + { + var cluster = Clusters[Random.Shared.Next(Clusters.Length)]; + return cluster.RandomServer(); + } + + /// + /// Searches all clusters for a server with the given name. + /// Mirrors Go sc.serverByName. + /// + public NatsServer? ServerByName(string name) + { + foreach (var c in Clusters) + { + var s = c.ServerByName(name); + if (s != null) return s; + } + return null; + } + + /// + /// Returns the with the given cluster name (e.g. "C1"). + /// Mirrors Go sc.clusterForName. + /// + public TestCluster? ClusterForName(string name) + { + foreach (var c in Clusters) + { + if (c.Name == name) return c; + } + return null; + } + + // ========================================================================= + // Wait helpers + // ========================================================================= + + /// + /// Waits until a JetStream meta-leader is elected across all clusters. + /// Mirrors Go sc.waitOnLeader(). + /// + public void WaitOnLeader() + { + CheckHelper.CheckFor(TimeSpan.FromSeconds(30), TimeSpan.FromMilliseconds(100), () => + { + if (Leader() == null) + return new Exception("SuperCluster: no JetStream meta-leader elected yet."); + return null; + }); + } + + /// + /// Waits until the named stream has an elected leader across all clusters. + /// Mirrors Go sc.waitOnStreamLeader. + /// + public void WaitOnStreamLeader(string account, string stream) + { + CheckHelper.CheckFor(TimeSpan.FromSeconds(30), TimeSpan.FromMilliseconds(100), () => + { + foreach (var c in Clusters) + { + if (c.StreamLeader(account, stream) != null) return null; + } + return new Exception( + $"SuperCluster: no leader for stream '{stream}' in account '{account}'."); + }); + } + + // ========================================================================= + // Lifecycle + // ========================================================================= + + /// + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + foreach (var c in Clusters) + { + try { c.Dispose(); } catch { /* best effort */ } + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ZB.MOM.NatsNet.Server.IntegrationTests.csproj b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ZB.MOM.NatsNet.Server.IntegrationTests.csproj index e8740b2..e48edff 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ZB.MOM.NatsNet.Server.IntegrationTests.csproj +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ZB.MOM.NatsNet.Server.IntegrationTests.csproj @@ -20,6 +20,7 @@ +