Port config hot-reload (44 tests), opts (1 test), account isolation (5 tests), auth callout (5 tests), and JWT validation (11 tests) from Go reload_test.go, opts_test.go, accounts_test.go, auth_callout_test.go, and jwt_test.go as behavioral blackbox integration tests against the .NET NatsServer using ReloadOptions() and the public NATS client API.
332 lines
11 KiB
C#
332 lines
11 KiB
C#
// Copyright 2012-2026 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
// Mirrors Go cluster struct and createJetStreamCluster* helpers from
|
|
// server/jetstream_helpers_test.go.
|
|
|
|
using ZB.MOM.NatsNet.Server;
|
|
|
|
namespace ZB.MOM.NatsNet.Server.IntegrationTests.Helpers;
|
|
|
|
/// <summary>
|
|
/// Represents a multi-server JetStream cluster for integration tests.
|
|
/// Mirrors Go <c>cluster</c> struct from server/jetstream_helpers_test.go.
|
|
/// </summary>
|
|
internal sealed class TestCluster : IDisposable
|
|
{
|
|
// =========================================================================
|
|
// Properties
|
|
// =========================================================================
|
|
|
|
/// <summary>Running server instances in the cluster.</summary>
|
|
public NatsServer[] Servers { get; }
|
|
|
|
/// <summary>Options used to configure each server.</summary>
|
|
public ServerOptions[] Options { get; }
|
|
|
|
/// <summary>Name of this cluster (e.g. "HUB").</summary>
|
|
public string Name { get; }
|
|
|
|
private bool _disposed;
|
|
|
|
// =========================================================================
|
|
// Constructor
|
|
// =========================================================================
|
|
|
|
private TestCluster(NatsServer[] servers, ServerOptions[] options, string name)
|
|
{
|
|
Servers = servers;
|
|
Options = options;
|
|
Name = name;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Internal factory used by <see cref="TestSuperCluster"/> to wrap pre-started servers.
|
|
/// </summary>
|
|
internal static TestCluster FromServers(NatsServer[] servers, ServerOptions[] options, string name)
|
|
=> new(servers, options, name);
|
|
|
|
// =========================================================================
|
|
// Static factory: standard JetStream cluster
|
|
// =========================================================================
|
|
|
|
/// <summary>
|
|
/// Creates a JetStream cluster using the default <see cref="ConfigHelper.JsClusterTemplate"/>.
|
|
/// Mirrors Go <c>createJetStreamCluster</c>.
|
|
/// </summary>
|
|
public static TestCluster CreateJetStreamCluster(int numServers, string name) =>
|
|
CreateJetStreamClusterWithTemplate(ConfigHelper.JsClusterTemplate, numServers, name);
|
|
|
|
/// <summary>
|
|
/// Creates a JetStream cluster using the provided config <paramref name="template"/>.
|
|
/// Allocates free ports for each server's client and cluster listeners, builds route
|
|
/// URLs, generates per-server config from the template, starts all servers, and
|
|
/// waits for the cluster to form.
|
|
/// Mirrors Go <c>createJetStreamClusterWithTemplate</c>.
|
|
/// </summary>
|
|
public static TestCluster CreateJetStreamClusterWithTemplate(
|
|
string template,
|
|
int numServers,
|
|
string name)
|
|
{
|
|
// Allocate cluster (route) ports — one per server.
|
|
var clusterPorts = new int[numServers];
|
|
for (var i = 0; i < numServers; i++)
|
|
clusterPorts[i] = TestServerHelper.GetFreePort();
|
|
|
|
// Build the routes string shared by all servers in this cluster.
|
|
var routeUrls = string.Join(",", clusterPorts.Select(p => $"nats-route://127.0.0.1:{p}"));
|
|
|
|
var servers = new NatsServer[numServers];
|
|
var opts = new ServerOptions[numServers];
|
|
|
|
for (var i = 0; i < numServers; i++)
|
|
{
|
|
var serverName = $"{name}-S{i + 1}";
|
|
var storeDir = TestServerHelper.CreateTempDir($"js-{name}-{i + 1}-");
|
|
|
|
// Format template: {0}=server_name, {1}=store_dir, {2}=cluster_name,
|
|
// {3}=cluster_port, {4}=routes
|
|
var configContent = string.Format(
|
|
template,
|
|
serverName,
|
|
storeDir,
|
|
name,
|
|
clusterPorts[i],
|
|
routeUrls);
|
|
|
|
var configFile = ConfigHelper.CreateConfigFile(configContent);
|
|
|
|
var serverOpts = new ServerOptions
|
|
{
|
|
ServerName = serverName,
|
|
Host = "127.0.0.1",
|
|
Port = -1,
|
|
NoLog = true,
|
|
NoSigs = true,
|
|
JetStream = true,
|
|
StoreDir = storeDir,
|
|
ConfigFile = configFile,
|
|
Cluster = new ClusterOpts
|
|
{
|
|
Name = name,
|
|
Host = "127.0.0.1",
|
|
Port = clusterPorts[i],
|
|
},
|
|
Routes = clusterPorts
|
|
.Where((_, idx) => idx != i)
|
|
.Select(p => new Uri($"nats-route://127.0.0.1:{p}"))
|
|
.ToList(),
|
|
};
|
|
|
|
var (server, _) = TestServerHelper.RunServer(serverOpts);
|
|
servers[i] = server;
|
|
opts[i] = serverOpts;
|
|
}
|
|
|
|
var cluster = new TestCluster(servers, opts, name);
|
|
cluster.WaitOnClusterReady();
|
|
return cluster;
|
|
}
|
|
|
|
// =========================================================================
|
|
// Wait helpers
|
|
// =========================================================================
|
|
|
|
/// <summary>
|
|
/// Waits until all servers in the cluster have formed routes to one another.
|
|
/// Mirrors Go <c>checkClusterFormed</c>.
|
|
/// </summary>
|
|
public void WaitOnClusterReady()
|
|
{
|
|
CheckHelper.CheckClusterFormed(Servers);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Waits until at least one server in the cluster reports as JetStream meta-leader.
|
|
/// Mirrors Go <c>c.waitOnLeader</c>.
|
|
/// </summary>
|
|
public void WaitOnLeader()
|
|
{
|
|
CheckHelper.CheckFor(TimeSpan.FromSeconds(30), TimeSpan.FromMilliseconds(100), () =>
|
|
{
|
|
var leader = Leader();
|
|
if (leader == null)
|
|
return new Exception($"Cluster {Name}: no JetStream meta-leader elected yet.");
|
|
return null;
|
|
});
|
|
}
|
|
|
|
/// <summary>
|
|
/// Waits until the named stream has an elected leader in the given account.
|
|
/// Mirrors Go <c>c.waitOnStreamLeader</c>.
|
|
/// </summary>
|
|
public void WaitOnStreamLeader(string account, string stream)
|
|
{
|
|
CheckHelper.CheckFor(TimeSpan.FromSeconds(30), TimeSpan.FromMilliseconds(100), () =>
|
|
{
|
|
var leader = StreamLeader(account, stream);
|
|
if (leader == null)
|
|
return new Exception(
|
|
$"Cluster {Name}: no leader for stream '{stream}' in account '{account}'.");
|
|
return null;
|
|
});
|
|
}
|
|
|
|
/// <summary>
|
|
/// Waits until the named consumer has an elected leader.
|
|
/// Mirrors Go <c>c.waitOnConsumerLeader</c>.
|
|
/// </summary>
|
|
public void WaitOnConsumerLeader(string account, string stream, string consumer)
|
|
{
|
|
CheckHelper.CheckFor(TimeSpan.FromSeconds(30), TimeSpan.FromMilliseconds(100), () =>
|
|
{
|
|
var leader = ConsumerLeader(account, stream, consumer);
|
|
if (leader == null)
|
|
return new Exception(
|
|
$"Cluster {Name}: no leader for consumer '{consumer}' in stream '{stream}', account '{account}'.");
|
|
return null;
|
|
});
|
|
}
|
|
|
|
// =========================================================================
|
|
// Accessors
|
|
// =========================================================================
|
|
|
|
/// <summary>
|
|
/// Returns the server that is currently the JetStream meta-leader,
|
|
/// or null if no leader is elected.
|
|
/// Mirrors Go <c>c.leader()</c>.
|
|
/// </summary>
|
|
public NatsServer? Leader()
|
|
{
|
|
foreach (var s in Servers)
|
|
{
|
|
if (s.JetStreamIsLeader())
|
|
return s;
|
|
}
|
|
return null;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Returns the server that is leader for the named stream in the given account,
|
|
/// or null if no leader is elected.
|
|
/// Mirrors Go <c>c.streamLeader</c>.
|
|
/// </summary>
|
|
public NatsServer? StreamLeader(string account, string stream)
|
|
{
|
|
foreach (var s in Servers)
|
|
{
|
|
if (s.JetStreamIsStreamLeader(account, stream))
|
|
return s;
|
|
}
|
|
return null;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Returns the server that is leader for the named consumer,
|
|
/// or null if no leader is elected.
|
|
/// Mirrors Go <c>c.consumerLeader</c>.
|
|
/// </summary>
|
|
public NatsServer? ConsumerLeader(string account, string stream, string consumer)
|
|
{
|
|
foreach (var s in Servers)
|
|
{
|
|
if (s.JetStreamIsConsumerLeader(account, stream, consumer))
|
|
return s;
|
|
}
|
|
return null;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Returns a random running server from the cluster.
|
|
/// Mirrors Go <c>c.randomServer()</c>.
|
|
/// </summary>
|
|
public NatsServer RandomServer()
|
|
{
|
|
var candidates = Servers.Where(s => s.Running()).ToArray();
|
|
if (candidates.Length == 0)
|
|
throw new InvalidOperationException($"Cluster {Name}: no running servers.");
|
|
return candidates[Random.Shared.Next(candidates.Length)];
|
|
}
|
|
|
|
/// <summary>
|
|
/// Finds a server by its <see cref="ServerOptions.ServerName"/>.
|
|
/// Returns null if not found.
|
|
/// Mirrors Go <c>c.serverByName</c>.
|
|
/// </summary>
|
|
public NatsServer? ServerByName(string name)
|
|
{
|
|
foreach (var s in Servers)
|
|
{
|
|
if (s.Options.ServerName == name)
|
|
return s;
|
|
}
|
|
return null;
|
|
}
|
|
|
|
// =========================================================================
|
|
// Lifecycle
|
|
// =========================================================================
|
|
|
|
/// <summary>Stops all servers in the cluster.</summary>
|
|
public void StopAll()
|
|
{
|
|
foreach (var s in Servers)
|
|
{
|
|
try { s.Shutdown(); } catch { /* best effort */ }
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Restarts all stopped servers.
|
|
/// Note: a true restart would re-create the server; here we call Start() if not running.
|
|
/// </summary>
|
|
public void RestartAll()
|
|
{
|
|
foreach (var (server, i) in Servers.Select((s, i) => (s, i)))
|
|
{
|
|
if (!server.Running())
|
|
{
|
|
try { server.Start(); } catch { /* best effort */ }
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>Shuts down and disposes all servers and cleans up temp files.</summary>
|
|
public void Dispose()
|
|
{
|
|
if (_disposed) return;
|
|
_disposed = true;
|
|
|
|
foreach (var (server, i) in Servers.Select((s, i) => (s, i)))
|
|
{
|
|
try { server.Shutdown(); } catch { /* best effort */ }
|
|
|
|
// Clean up temp store dir.
|
|
var dir = Options[i].StoreDir;
|
|
if (!string.IsNullOrEmpty(dir) && Directory.Exists(dir))
|
|
{
|
|
try { Directory.Delete(dir, recursive: true); } catch { /* best effort */ }
|
|
}
|
|
|
|
// Clean up temp config file.
|
|
var cfg = Options[i].ConfigFile;
|
|
if (!string.IsNullOrEmpty(cfg) && File.Exists(cfg))
|
|
{
|
|
try { File.Delete(cfg); } catch { /* best effort */ }
|
|
}
|
|
}
|
|
}
|
|
}
|