docs: add design doc for SYSTEM and ACCOUNT connection types
Covers 6 implementation layers: ClientKind enum + INatsClient interface, event infrastructure with Channel<T>, system event publishing, request-reply monitoring services, import/export model with ACCOUNT client, and response routing with latency tracking.
This commit is contained in:
@@ -8,6 +8,7 @@ using System.Text;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using NATS.NKeys;
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Monitoring;
|
||||
using NATS.Server.Protocol;
|
||||
using NATS.Server.Subscriptions;
|
||||
@@ -20,14 +21,18 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
private readonly NatsOptions _options;
|
||||
private readonly ConcurrentDictionary<ulong, NatsClient> _clients = new();
|
||||
private readonly ConcurrentQueue<ClosedClient> _closedClients = new();
|
||||
private const int MaxClosedClients = 10_000;
|
||||
private readonly ServerInfo _serverInfo;
|
||||
private readonly ILogger<NatsServer> _logger;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly ServerStats _stats = new();
|
||||
private readonly TaskCompletionSource _listeningStarted = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
private readonly AuthService _authService;
|
||||
private AuthService _authService;
|
||||
private readonly ConcurrentDictionary<string, Account> _accounts = new(StringComparer.Ordinal);
|
||||
|
||||
// Config reload state
|
||||
private NatsOptions? _cliSnapshot;
|
||||
private HashSet<string> _cliFlags = [];
|
||||
private string? _configDigest;
|
||||
private readonly Account _globalAccount;
|
||||
private readonly Account _systemAccount;
|
||||
private readonly SslServerAuthenticationOptions? _sslOptions;
|
||||
@@ -224,7 +229,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
_signalRegistrations.Add(PosixSignalRegistration.Create(PosixSignal.SIGHUP, ctx =>
|
||||
{
|
||||
ctx.Cancel = true;
|
||||
_logger.LogWarning("Trapped SIGHUP signal — config reload not yet supported");
|
||||
_logger.LogInformation("Trapped SIGHUP signal — reloading configuration");
|
||||
_ = Task.Run(() => ReloadConfig());
|
||||
}));
|
||||
|
||||
// SIGUSR1 and SIGUSR2 only on non-Windows
|
||||
@@ -320,6 +326,20 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
}
|
||||
|
||||
BuildCachedInfo();
|
||||
|
||||
// Store initial config digest for reload change detection
|
||||
if (options.ConfigFile != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
var (_, digest) = NatsConfParser.ParseFileWithDigest(options.ConfigFile);
|
||||
_configDigest = digest;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Could not compute initial config digest for {ConfigFile}", options.ConfigFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void BuildCachedInfo()
|
||||
@@ -354,8 +374,6 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
_logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port);
|
||||
|
||||
// Warn about stub features
|
||||
if (_options.ConfigFile != null)
|
||||
_logger.LogWarning("Config file parsing not yet supported (file: {ConfigFile})", _options.ConfigFile);
|
||||
if (_options.ProfPort > 0)
|
||||
_logger.LogWarning("Profiling endpoint not yet supported (port: {ProfPort})", _options.ProfPort);
|
||||
|
||||
@@ -696,7 +714,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
});
|
||||
|
||||
// Cap closed clients list
|
||||
while (_closedClients.Count > MaxClosedClients)
|
||||
while (_closedClients.Count > _options.MaxClosedClients)
|
||||
_closedClients.TryDequeue(out _);
|
||||
|
||||
var subList = client.Account?.SubList ?? _globalAccount.SubList;
|
||||
@@ -766,6 +784,155 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stores the CLI snapshot and flags so that command-line overrides
|
||||
/// always take precedence during config reload.
|
||||
/// </summary>
|
||||
public void SetCliSnapshot(NatsOptions cliSnapshot, HashSet<string> cliFlags)
|
||||
{
|
||||
_cliSnapshot = cliSnapshot;
|
||||
_cliFlags = cliFlags;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reloads the configuration file, diffs against current options, validates
|
||||
/// the changes, and applies reloadable settings. CLI overrides are preserved.
|
||||
/// </summary>
|
||||
public void ReloadConfig()
|
||||
{
|
||||
if (_options.ConfigFile == null)
|
||||
{
|
||||
_logger.LogWarning("No config file specified, cannot reload");
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var (newConfig, digest) = NatsConfParser.ParseFileWithDigest(_options.ConfigFile);
|
||||
if (digest == _configDigest)
|
||||
{
|
||||
_logger.LogInformation("Config file unchanged, no reload needed");
|
||||
return;
|
||||
}
|
||||
|
||||
var newOpts = new NatsOptions { ConfigFile = _options.ConfigFile };
|
||||
ConfigProcessor.ApplyConfig(newConfig, newOpts);
|
||||
|
||||
// CLI flags override config
|
||||
if (_cliSnapshot != null)
|
||||
ConfigReloader.MergeCliOverrides(newOpts, _cliSnapshot, _cliFlags);
|
||||
|
||||
var changes = ConfigReloader.Diff(_options, newOpts);
|
||||
var errors = ConfigReloader.Validate(changes);
|
||||
if (errors.Count > 0)
|
||||
{
|
||||
foreach (var err in errors)
|
||||
_logger.LogError("Config reload error: {Error}", err);
|
||||
return;
|
||||
}
|
||||
|
||||
// Apply changes to running options
|
||||
ApplyConfigChanges(changes, newOpts);
|
||||
_configDigest = digest;
|
||||
_logger.LogInformation("Config reloaded successfully ({Count} changes applied)", changes.Count);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to reload config file: {ConfigFile}", _options.ConfigFile);
|
||||
}
|
||||
}
|
||||
|
||||
private void ApplyConfigChanges(List<IConfigChange> changes, NatsOptions newOpts)
|
||||
{
|
||||
bool hasLoggingChanges = false;
|
||||
bool hasAuthChanges = false;
|
||||
|
||||
foreach (var change in changes)
|
||||
{
|
||||
if (change.IsLoggingChange) hasLoggingChanges = true;
|
||||
if (change.IsAuthChange) hasAuthChanges = true;
|
||||
}
|
||||
|
||||
// Copy reloadable values from newOpts to _options
|
||||
CopyReloadableOptions(newOpts);
|
||||
|
||||
// Trigger side effects
|
||||
if (hasLoggingChanges)
|
||||
{
|
||||
ReOpenLogFile?.Invoke();
|
||||
_logger.LogInformation("Logging configuration reloaded");
|
||||
}
|
||||
|
||||
if (hasAuthChanges)
|
||||
{
|
||||
// Rebuild auth service with new options
|
||||
_authService = AuthService.Build(_options);
|
||||
_logger.LogInformation("Authorization configuration reloaded");
|
||||
}
|
||||
}
|
||||
|
||||
private void CopyReloadableOptions(NatsOptions newOpts)
|
||||
{
|
||||
// Logging
|
||||
_options.Debug = newOpts.Debug;
|
||||
_options.Trace = newOpts.Trace;
|
||||
_options.TraceVerbose = newOpts.TraceVerbose;
|
||||
_options.Logtime = newOpts.Logtime;
|
||||
_options.LogtimeUTC = newOpts.LogtimeUTC;
|
||||
_options.LogFile = newOpts.LogFile;
|
||||
_options.LogSizeLimit = newOpts.LogSizeLimit;
|
||||
_options.LogMaxFiles = newOpts.LogMaxFiles;
|
||||
_options.Syslog = newOpts.Syslog;
|
||||
_options.RemoteSyslog = newOpts.RemoteSyslog;
|
||||
|
||||
// Auth
|
||||
_options.Username = newOpts.Username;
|
||||
_options.Password = newOpts.Password;
|
||||
_options.Authorization = newOpts.Authorization;
|
||||
_options.Users = newOpts.Users;
|
||||
_options.NKeys = newOpts.NKeys;
|
||||
_options.NoAuthUser = newOpts.NoAuthUser;
|
||||
_options.AuthTimeout = newOpts.AuthTimeout;
|
||||
|
||||
// Limits
|
||||
_options.MaxConnections = newOpts.MaxConnections;
|
||||
_options.MaxPayload = newOpts.MaxPayload;
|
||||
_options.MaxPending = newOpts.MaxPending;
|
||||
_options.WriteDeadline = newOpts.WriteDeadline;
|
||||
_options.PingInterval = newOpts.PingInterval;
|
||||
_options.MaxPingsOut = newOpts.MaxPingsOut;
|
||||
_options.MaxControlLine = newOpts.MaxControlLine;
|
||||
_options.MaxSubs = newOpts.MaxSubs;
|
||||
_options.MaxSubTokens = newOpts.MaxSubTokens;
|
||||
_options.MaxTracedMsgLen = newOpts.MaxTracedMsgLen;
|
||||
_options.MaxClosedClients = newOpts.MaxClosedClients;
|
||||
|
||||
// TLS
|
||||
_options.TlsCert = newOpts.TlsCert;
|
||||
_options.TlsKey = newOpts.TlsKey;
|
||||
_options.TlsCaCert = newOpts.TlsCaCert;
|
||||
_options.TlsVerify = newOpts.TlsVerify;
|
||||
_options.TlsMap = newOpts.TlsMap;
|
||||
_options.TlsTimeout = newOpts.TlsTimeout;
|
||||
_options.TlsHandshakeFirst = newOpts.TlsHandshakeFirst;
|
||||
_options.TlsHandshakeFirstFallback = newOpts.TlsHandshakeFirstFallback;
|
||||
_options.AllowNonTls = newOpts.AllowNonTls;
|
||||
_options.TlsRateLimit = newOpts.TlsRateLimit;
|
||||
_options.TlsPinnedCerts = newOpts.TlsPinnedCerts;
|
||||
|
||||
// Misc
|
||||
_options.Tags = newOpts.Tags;
|
||||
_options.LameDuckDuration = newOpts.LameDuckDuration;
|
||||
_options.LameDuckGracePeriod = newOpts.LameDuckGracePeriod;
|
||||
_options.ClientAdvertise = newOpts.ClientAdvertise;
|
||||
_options.DisableSublistCache = newOpts.DisableSublistCache;
|
||||
_options.ConnectErrorReports = newOpts.ConnectErrorReports;
|
||||
_options.ReconnectErrorReports = newOpts.ReconnectErrorReports;
|
||||
_options.NoHeaderSupport = newOpts.NoHeaderSupport;
|
||||
_options.NoSystemAccount = newOpts.NoSystemAccount;
|
||||
_options.SystemAccount = newOpts.SystemAccount;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (!IsShuttingDown)
|
||||
|
||||
Reference in New Issue
Block a user