Files
natsnet/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestCluster.cs
Joseph Doherty e846cb664a test(batch48): add integration test harness infrastructure
Create 7 helper files under ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/
and add Xunit.SkippableFact package. All tests skip gracefully via
IntegrationTestBase.CanBoot() guard until the .NET server runtime is complete.
2026-03-01 12:06:08 -05:00

332 lines
11 KiB
C#

// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Mirrors Go cluster struct and createJetStreamCluster* helpers from
// server/jetstream_helpers_test.go.
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.IntegrationTests.Helpers;
/// <summary>
/// Represents a multi-server JetStream cluster for integration tests.
/// Mirrors Go <c>cluster</c> struct from server/jetstream_helpers_test.go.
/// </summary>
/// <remarks>
/// <see cref="Dispose"/> shuts every server down and deletes the store directory and
/// config file recorded in the <see cref="Options"/> entry at the same index.
/// </remarks>
internal sealed class TestCluster : IDisposable
{
    // =========================================================================
    // Constants
    // =========================================================================

    /// <summary>Host used for every listener and route URL built by this helper.</summary>
    private const string LoopbackHost = "127.0.0.1";

    /// <summary>Maximum time the <c>WaitOn*Leader</c> helpers poll for a leader.</summary>
    private static readonly TimeSpan LeaderWaitTimeout = TimeSpan.FromSeconds(30);

    /// <summary>Delay between successive leader polls.</summary>
    private static readonly TimeSpan LeaderPollInterval = TimeSpan.FromMilliseconds(100);

    // =========================================================================
    // Properties
    // =========================================================================

    /// <summary>Running server instances in the cluster.</summary>
    public NatsServer[] Servers { get; }

    /// <summary>Options used to configure each server.</summary>
    public ServerOptions[] Options { get; }

    /// <summary>Name of this cluster (e.g. "HUB").</summary>
    public string Name { get; }

    private bool _disposed;

    // =========================================================================
    // Constructor
    // =========================================================================

    /// <summary>Wraps the given servers and their options as a named cluster.</summary>
    /// <param name="servers">Server instances belonging to the cluster.</param>
    /// <param name="options">Options for each server, matched to <paramref name="servers"/> by index.</param>
    /// <param name="name">Cluster name.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    private TestCluster(NatsServer[] servers, ServerOptions[] options, string name)
    {
        // Fail here rather than with a NullReferenceException later (e.g. inside Dispose).
        ArgumentNullException.ThrowIfNull(servers);
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(name);

        Servers = servers;
        Options = options;
        Name = name;
    }

    /// <summary>
    /// Internal factory used by <see cref="TestSuperCluster"/> to wrap pre-started servers.
    /// </summary>
    /// <param name="servers">Already-started server instances.</param>
    /// <param name="options">Options for each server, matched to <paramref name="servers"/> by index.</param>
    /// <param name="name">Cluster name.</param>
    /// <returns>A cluster wrapping the supplied servers; nothing is started or awaited.</returns>
    internal static TestCluster FromServers(NatsServer[] servers, ServerOptions[] options, string name)
        => new(servers, options, name);

    // =========================================================================
    // Static factory: standard JetStream cluster
    // =========================================================================

    /// <summary>
    /// Creates a JetStream cluster using the default <see cref="ConfigHelper.JsClusterTemplate"/>.
    /// Mirrors Go <c>createJetStreamCluster</c>.
    /// </summary>
    /// <param name="numServers">Number of servers to start.</param>
    /// <param name="name">Cluster name; also used as the prefix of each server name.</param>
    public static TestCluster CreateJetStreamCluster(int numServers, string name) =>
        CreateJetStreamClusterWithTemplate(ConfigHelper.JsClusterTemplate, numServers, name);

    /// <summary>
    /// Creates a JetStream cluster using the provided config <paramref name="template"/>.
    /// Allocates a free cluster (route) port per server, builds the route URLs, generates
    /// per-server config from the template, starts all servers, and waits for the cluster
    /// to form. Client ports are not allocated here; each server is configured with
    /// <c>Port = -1</c>.
    /// Mirrors Go <c>createJetStreamClusterWithTemplate</c>.
    /// </summary>
    /// <param name="template">
    /// Composite format string with placeholders {0}=server_name, {1}=store_dir,
    /// {2}=cluster_name, {3}=cluster_port, {4}=routes.
    /// </param>
    /// <param name="numServers">Number of servers to start; must not be negative.</param>
    /// <param name="name">Cluster name; also used as the prefix of each server name.</param>
    /// <returns>The started cluster, after <see cref="WaitOnClusterReady"/> has returned.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="template"/> or <paramref name="name"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="numServers"/> is negative.</exception>
    /// <remarks>
    /// If any step fails, the servers already started are shut down and the temp
    /// directories and config files already created are deleted before the exception
    /// is rethrown.
    /// </remarks>
    public static TestCluster CreateJetStreamClusterWithTemplate(
        string template,
        int numServers,
        string name)
    {
        ArgumentNullException.ThrowIfNull(template);
        ArgumentNullException.ThrowIfNull(name);
        if (numServers < 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(numServers), numServers, "Server count cannot be negative.");
        }

        // Allocate cluster (route) ports — one per server.
        var clusterPorts = new int[numServers];
        for (var i = 0; i < numServers; i++)
        {
            clusterPorts[i] = TestServerHelper.GetFreePort();
        }

        // Build the routes string shared by all servers in this cluster.
        var routeUrls = string.Join(",", clusterPorts.Select(p => RouteUrl(p)));

        var servers = new NatsServer[numServers];
        var opts = new ServerOptions[numServers];

        // Tracked separately from opts so that artifacts created before the options
        // object exists can still be cleaned up on failure.
        var storeDirs = new string?[numServers];
        var configFiles = new string?[numServers];

        try
        {
            for (var i = 0; i < numServers; i++)
            {
                var serverName = $"{name}-S{i + 1}";
                var storeDir = TestServerHelper.CreateTempDir($"js-{name}-{i + 1}-");
                storeDirs[i] = storeDir;

                // Format template: {0}=server_name, {1}=store_dir, {2}=cluster_name,
                // {3}=cluster_port, {4}=routes
                var configContent = string.Format(
                    System.Globalization.CultureInfo.InvariantCulture,
                    template,
                    serverName,
                    storeDir,
                    name,
                    clusterPorts[i],
                    routeUrls);
                var configFile = ConfigHelper.CreateConfigFile(configContent);
                configFiles[i] = configFile;

                var serverOpts = new ServerOptions
                {
                    ServerName = serverName,
                    Host = LoopbackHost,
                    Port = -1,
                    NoLog = true,
                    NoSigs = true,
                    JetStream = true,
                    StoreDir = storeDir,
                    ConfigFile = configFile,
                    Cluster = new ClusterOpts
                    {
                        Name = name,
                        Host = LoopbackHost,
                        Port = clusterPorts[i],
                    },
                    // Every peer's route URL except this server's own.
                    Routes = clusterPorts
                        .Where((_, idx) => idx != i)
                        .Select(p => new Uri(RouteUrl(p)))
                        .ToList(),
                };
                opts[i] = serverOpts;

                var (server, _) = TestServerHelper.RunServer(serverOpts);
                servers[i] = server;
            }

            var cluster = new TestCluster(servers, opts, name);
            cluster.WaitOnClusterReady();
            return cluster;
        }
        catch
        {
            // Do not leak running servers or temp artifacts from a half-built cluster.
            for (var i = 0; i < numServers; i++)
            {
                ReleaseServer(servers[i], storeDirs[i], configFiles[i]);
            }

            throw;
        }
    }

    // =========================================================================
    // Wait helpers
    // =========================================================================

    /// <summary>
    /// Waits until all servers in the cluster have formed routes to one another.
    /// Mirrors Go <c>checkClusterFormed</c>.
    /// </summary>
    public void WaitOnClusterReady() => CheckHelper.CheckClusterFormed(Servers);

    /// <summary>
    /// Waits until at least one server in the cluster reports as JetStream meta-leader.
    /// Mirrors Go <c>c.waitOnLeader</c>.
    /// </summary>
    public void WaitOnLeader() =>
        WaitForLeader(Leader, "no JetStream meta-leader elected yet.");

    /// <summary>
    /// Waits until the named stream has an elected leader in the given account.
    /// Mirrors Go <c>c.waitOnStreamLeader</c>.
    /// </summary>
    /// <param name="account">Account that holds the stream.</param>
    /// <param name="stream">Stream name.</param>
    public void WaitOnStreamLeader(string account, string stream) =>
        WaitForLeader(
            () => StreamLeader(account, stream),
            $"no leader for stream '{stream}' in account '{account}'.");

    /// <summary>
    /// Waits until the named consumer has an elected leader.
    /// Mirrors Go <c>c.waitOnConsumerLeader</c>.
    /// </summary>
    /// <param name="account">Account that holds the stream.</param>
    /// <param name="stream">Stream name.</param>
    /// <param name="consumer">Consumer name.</param>
    public void WaitOnConsumerLeader(string account, string stream, string consumer) =>
        WaitForLeader(
            () => ConsumerLeader(account, stream, consumer),
            $"no leader for consumer '{consumer}' in stream '{stream}', account '{account}'.");

    /// <summary>
    /// Polls <paramref name="findLeader"/> through <see cref="CheckHelper.CheckFor"/> until it
    /// yields a server, reporting <paramref name="failure"/> (prefixed with the cluster name)
    /// for every poll that finds none.
    /// </summary>
    private void WaitForLeader(Func<NatsServer?> findLeader, string failure)
    {
        var message = $"Cluster {Name}: {failure}";
        CheckHelper.CheckFor(
            LeaderWaitTimeout,
            LeaderPollInterval,
            () => findLeader() is null ? new Exception(message) : null);
    }

    // =========================================================================
    // Accessors
    // =========================================================================

    /// <summary>
    /// Returns the server that is currently the JetStream meta-leader,
    /// or null if no leader is elected.
    /// Mirrors Go <c>c.leader()</c>.
    /// </summary>
    public NatsServer? Leader() =>
        Servers.FirstOrDefault(s => s.JetStreamIsLeader());

    /// <summary>
    /// Returns the server that is leader for the named stream in the given account,
    /// or null if no leader is elected.
    /// Mirrors Go <c>c.streamLeader</c>.
    /// </summary>
    public NatsServer? StreamLeader(string account, string stream) =>
        Servers.FirstOrDefault(s => s.JetStreamIsStreamLeader(account, stream));

    /// <summary>
    /// Returns the server that is leader for the named consumer,
    /// or null if no leader is elected.
    /// Mirrors Go <c>c.consumerLeader</c>.
    /// </summary>
    public NatsServer? ConsumerLeader(string account, string stream, string consumer) =>
        Servers.FirstOrDefault(s => s.JetStreamIsConsumerLeader(account, stream, consumer));

    /// <summary>
    /// Returns a random running server from the cluster.
    /// Mirrors Go <c>c.randomServer()</c>.
    /// </summary>
    /// <exception cref="InvalidOperationException">No server reports as running.</exception>
    public NatsServer RandomServer()
    {
        var running = Servers.Where(s => s.Running()).ToArray();
        if (running.Length == 0)
        {
            throw new InvalidOperationException($"Cluster {Name}: no running servers.");
        }

        return running[Random.Shared.Next(running.Length)];
    }

    /// <summary>
    /// Finds a server by its <see cref="ServerOptions.ServerName"/>.
    /// Returns null if not found.
    /// Mirrors Go <c>c.serverByName</c>.
    /// </summary>
    public NatsServer? ServerByName(string name) =>
        Servers.FirstOrDefault(s => s.Options.ServerName == name);

    // =========================================================================
    // Lifecycle
    // =========================================================================

    /// <summary>Stops all servers in the cluster; shutdown failures are ignored.</summary>
    public void StopAll()
    {
        foreach (var server in Servers)
        {
            try { server.Shutdown(); } catch { /* best effort */ }
        }
    }

    /// <summary>
    /// Restarts all stopped servers.
    /// Note: a true restart would re-create the server; here we call Start() if not running.
    /// Failures from Start() are ignored.
    /// </summary>
    public void RestartAll()
    {
        foreach (var server in Servers)
        {
            if (server.Running())
            {
                continue;
            }

            try { server.Start(); } catch { /* best effort */ }
        }
    }

    /// <summary>
    /// Shuts down all servers and deletes each server's temp store directory and config
    /// file. Safe to call more than once; never throws for cleanup failures.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // Servers and Options are matched by index but, for clusters built through
        // FromServers, are not guaranteed to have the same length. Walk the longer one
        // so a mismatch can neither throw nor skip a shutdown or a temp artifact.
        var count = Math.Max(Servers.Length, Options.Length);
        for (var i = 0; i < count; i++)
        {
            NatsServer? server = i < Servers.Length ? Servers[i] : null;
            ServerOptions? options = i < Options.Length ? Options[i] : null;
            ReleaseServer(server, options?.StoreDir, options?.ConfigFile);
        }
    }

    // =========================================================================
    // Private helpers
    // =========================================================================

    /// <summary>Builds the route URL for a cluster listener on the loopback host.</summary>
    private static string RouteUrl(int port) => $"nats-route://{LoopbackHost}:{port}";

    /// <summary>
    /// Best-effort teardown of one server slot: shuts the server down (if any), then
    /// deletes its store directory and config file (if they exist). Never throws.
    /// </summary>
    private static void ReleaseServer(NatsServer? server, string? storeDir, string? configFile)
    {
        if (server is not null)
        {
            try { server.Shutdown(); } catch { /* best effort */ }
        }

        // Clean up temp store dir.
        if (!string.IsNullOrEmpty(storeDir) && Directory.Exists(storeDir))
        {
            try { Directory.Delete(storeDir, recursive: true); } catch { /* best effort */ }
        }

        // Clean up temp config file.
        if (!string.IsNullOrEmpty(configFile) && File.Exists(configFile))
        {
            try { File.Delete(configFile); } catch { /* best effort */ }
        }
    }
}