feat: add reload semantics for cluster and jetstream options
This commit is contained in:
@@ -11,7 +11,8 @@ namespace NATS.Server.Configuration;
|
|||||||
public static class ConfigReloader
|
public static class ConfigReloader
|
||||||
{
|
{
|
||||||
// Non-reloadable options (match Go server — Host, Port, ServerName require restart)
|
// Non-reloadable options (match Go server — Host, Port, ServerName require restart)
|
||||||
private static readonly HashSet<string> NonReloadable = ["Host", "Port", "ServerName"];
|
private static readonly HashSet<string> NonReloadable =
|
||||||
|
["Host", "Port", "ServerName", "Cluster", "JetStream.StoreDir"];
|
||||||
|
|
||||||
// Logging-related options
|
// Logging-related options
|
||||||
private static readonly HashSet<string> LoggingOptions =
|
private static readonly HashSet<string> LoggingOptions =
|
||||||
@@ -102,6 +103,13 @@ public static class ConfigReloader
|
|||||||
CompareAndAdd(changes, "NoSystemAccount", oldOpts.NoSystemAccount, newOpts.NoSystemAccount);
|
CompareAndAdd(changes, "NoSystemAccount", oldOpts.NoSystemAccount, newOpts.NoSystemAccount);
|
||||||
CompareAndAdd(changes, "SystemAccount", oldOpts.SystemAccount, newOpts.SystemAccount);
|
CompareAndAdd(changes, "SystemAccount", oldOpts.SystemAccount, newOpts.SystemAccount);
|
||||||
|
|
||||||
|
// Cluster and JetStream (restart-required boundaries)
|
||||||
|
if (!ClusterEquivalent(oldOpts.Cluster, newOpts.Cluster))
|
||||||
|
changes.Add(new ConfigChange("Cluster", isNonReloadable: true));
|
||||||
|
|
||||||
|
if (JetStreamStoreDirChanged(oldOpts.JetStream, newOpts.JetStream))
|
||||||
|
changes.Add(new ConfigChange("JetStream.StoreDir", isNonReloadable: true));
|
||||||
|
|
||||||
return changes;
|
return changes;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -338,4 +346,35 @@ public static class ConfigReloader
|
|||||||
isNonReloadable: NonReloadable.Contains(name)));
|
isNonReloadable: NonReloadable.Contains(name)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static bool ClusterEquivalent(ClusterOptions? oldCluster, ClusterOptions? newCluster)
|
||||||
|
{
|
||||||
|
if (oldCluster is null && newCluster is null)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if (oldCluster is null || newCluster is null)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (!string.Equals(oldCluster.Name, newCluster.Name, StringComparison.Ordinal))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (!string.Equals(oldCluster.Host, newCluster.Host, StringComparison.Ordinal))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (oldCluster.Port != newCluster.Port)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
return oldCluster.Routes.SequenceEqual(newCluster.Routes, StringComparer.Ordinal);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static bool JetStreamStoreDirChanged(JetStreamOptions? oldJetStream, JetStreamOptions? newJetStream)
|
||||||
|
{
|
||||||
|
if (oldJetStream is null && newJetStream is null)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (oldJetStream is null || newJetStream is null)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
return !string.Equals(oldJetStream.StoreDir, newJetStream.StoreDir, StringComparison.Ordinal);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1025,10 +1025,22 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
/// the changes, and applies reloadable settings. CLI overrides are preserved.
|
/// the changes, and applies reloadable settings. CLI overrides are preserved.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public void ReloadConfig()
|
public void ReloadConfig()
|
||||||
|
{
|
||||||
|
ReloadConfigCore(throwOnError: false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void ReloadConfigOrThrow()
|
||||||
|
{
|
||||||
|
ReloadConfigCore(throwOnError: true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ReloadConfigCore(bool throwOnError)
|
||||||
{
|
{
|
||||||
if (_options.ConfigFile == null)
|
if (_options.ConfigFile == null)
|
||||||
{
|
{
|
||||||
_logger.LogWarning("No config file specified, cannot reload");
|
_logger.LogWarning("No config file specified, cannot reload");
|
||||||
|
if (throwOnError)
|
||||||
|
throw new InvalidOperationException("No config file specified.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1054,6 +1066,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
{
|
{
|
||||||
foreach (var err in errors)
|
foreach (var err in errors)
|
||||||
_logger.LogError("Config reload error: {Error}", err);
|
_logger.LogError("Config reload error: {Error}", err);
|
||||||
|
if (throwOnError)
|
||||||
|
throw new InvalidOperationException(string.Join("; ", errors));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1065,6 +1079,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, "Failed to reload config file: {ConfigFile}", _options.ConfigFile);
|
_logger.LogError(ex, "Failed to reload config file: {ConfigFile}", _options.ConfigFile);
|
||||||
|
if (throwOnError)
|
||||||
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
64
tests/NATS.Server.Tests/JetStreamClusterReloadTests.cs
Normal file
64
tests/NATS.Server.Tests/JetStreamClusterReloadTests.cs
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using NATS.Server.Configuration;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
public class JetStreamClusterReloadTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Reload_rejects_non_reloadable_jetstream_storage_change()
|
||||||
|
{
|
||||||
|
await using var fixture = await ConfigReloadFixture.StartJetStreamAsync();
|
||||||
|
|
||||||
|
var ex = await Should.ThrowAsync<InvalidOperationException>(() => fixture.ReloadAsync("jetstream { store_dir: '/new' }"));
|
||||||
|
ex.Message.ShouldContain("requires restart");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal sealed class ConfigReloadFixture : IAsyncDisposable
|
||||||
|
{
|
||||||
|
private readonly string _configPath;
|
||||||
|
private readonly NatsServer _server;
|
||||||
|
|
||||||
|
private ConfigReloadFixture(string configPath, NatsServer server)
|
||||||
|
{
|
||||||
|
_configPath = configPath;
|
||||||
|
_server = server;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Task<ConfigReloadFixture> StartJetStreamAsync()
|
||||||
|
{
|
||||||
|
var configPath = Path.Combine(Path.GetTempPath(), $"natsdotnet-reload-{Guid.NewGuid():N}.conf");
|
||||||
|
File.WriteAllText(configPath, "jetstream { store_dir: '/old' }");
|
||||||
|
|
||||||
|
var options = new NatsOptions
|
||||||
|
{
|
||||||
|
ConfigFile = configPath,
|
||||||
|
JetStream = new JetStreamOptions
|
||||||
|
{
|
||||||
|
StoreDir = "/old",
|
||||||
|
MaxMemoryStore = 1_024 * 1_024,
|
||||||
|
MaxFileStore = 10 * 1_024 * 1_024,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
var server = new NatsServer(options, NullLoggerFactory.Instance);
|
||||||
|
return Task.FromResult(new ConfigReloadFixture(configPath, server));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task ReloadAsync(string configText)
|
||||||
|
{
|
||||||
|
File.WriteAllText(_configPath, configText);
|
||||||
|
_server.ReloadConfigOrThrow();
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
_server.Dispose();
|
||||||
|
if (File.Exists(_configPath))
|
||||||
|
File.Delete(_configPath);
|
||||||
|
|
||||||
|
return ValueTask.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user