Implement ConfigureAuthorization, CheckAuthentication, and full auth dispatch in NatsServer.Auth.cs; add HandleSignals in NatsServer.Signals.cs; extend AuthHandler with GetAuthErrClosedState, ValidateProxies, GetTlsAuthDcs, CheckClientTlsCertSubject, ProcessUserPermissionsTemplate; add ReadOperatorJwt/ValidateTrustedOperators to JwtProcessor; add AuthCallout stub; add auth accessor helpers to ClientConnection; add NATS.NKeys package for NKey signature verification; 12 new tests pass.
1048 lines
37 KiB
C#
1048 lines
37 KiB
C#
// Copyright 2012-2026 The NATS Authors
|
||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
// you may not use this file except in compliance with the License.
|
||
// You may obtain a copy of the License at
|
||
//
|
||
// http://www.apache.org/licenses/LICENSE-2.0
|
||
//
|
||
// Unless required by applicable law or agreed to in writing, software
|
||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
// See the License for the specific language governing permissions and
|
||
// limitations under the License.
|
||
//
|
||
// Adapted from server/server.go in the NATS server Go source (lines 85–2579).
|
||
// Session 09: server initialization, validation, compression helpers.
|
||
|
||
using System.Net;
|
||
using System.Text.RegularExpressions;
|
||
using ZB.MOM.NatsNet.Server.Auth;
|
||
using ZB.MOM.NatsNet.Server.Internal;
|
||
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
|
||
|
||
namespace ZB.MOM.NatsNet.Server;
|
||
|
||
public sealed partial class NatsServer
|
||
{
|
||
// =========================================================================
|
||
// Server protocol version helpers (features 2974–2975)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Resolves the server-to-server protocol level for this server.
/// The default is <c>ServerProtocol.MsgTraceProto</c>; a negative
/// <c>OverrideProto</c> option (see <see cref="SetServerProtoForTest"/>) is
/// decoded back into the protocol level it encodes.
/// Mirrors Go <c>Server.getServerProto()</c>.
/// </summary>
/// <returns>The effective protocol level.</returns>
public int GetServerProto()
{
    var overrideProto = GetOpts().OverrideProto;

    // Non-negative means "no override". A negative value was stored as
    // (wantedProto + 1) * -1, so undo that encoding here.
    return overrideProto < 0
        ? (overrideProto * -1) - 1
        : ServerProtocol.MsgTraceProto;
}
|
||
|
||
/// <summary>
/// Encodes a wanted protocol level as the negative override value kept in the
/// options: <c>(wantedProto + 1) * -1</c>. <see cref="GetServerProto"/> performs
/// the inverse computation.
/// Mirrors Go <c>setServerProtoForTest</c> (test helper).
/// </summary>
/// <param name="wantedProto">Protocol level the test wants the server to report.</param>
/// <returns>The encoded override value.</returns>
public static int SetServerProtoForTest(int wantedProto)
{
    var shifted = wantedProto + 1;
    return shifted * -1;
}
|
||
|
||
// =========================================================================
|
||
// Compression helpers (features 2976–2982)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Validates <paramref name="c"/> and rewrites its <see cref="CompressionOpts.Mode"/>
/// to the matching <see cref="CompressionMode"/> constant. For the S2 auto mode the
/// RTT thresholds are normalised as well.
/// Mirrors Go <c>validateAndNormalizeCompressionOption</c>.
/// </summary>
/// <param name="c">Options to normalise in place; <c>null</c> is a no-op.</param>
/// <param name="chosenModeForOn">
/// Mode substituted when the configured mode is <c>on</c>, <c>enabled</c> or <c>true</c>.
/// </param>
/// <exception cref="InvalidOperationException">
/// The mode is not recognised, or the S2 auto RTT thresholds are out of order,
/// more than four, or all zero.
/// </exception>
public static void ValidateAndNormalizeCompressionOption(CompressionOpts? c, string chosenModeForOn)
{
    if (c is null) return;

    // A missing mode is treated like an empty one and rejected below,
    // rather than failing with a null dereference.
    var cmtl = (c.Mode ?? string.Empty).ToLowerInvariant();

    // Resolve the generic "on" spellings to the caller-chosen default first;
    // the switch below then finishes the setup for whatever mode that is.
    if (cmtl is "on" or "enabled" or "true")
        cmtl = chosenModeForOn;

    switch (cmtl)
    {
        case "not supported":
        case "not_supported":
            c.Mode = CompressionMode.NotSupported;
            break;

        case "disabled":
        case "off":
        case "false":
            c.Mode = CompressionMode.Off;
            break;

        case "accept":
            c.Mode = CompressionMode.Accept;
            break;

        case "auto":
        case "s2_auto":
        {
            // Normalise before touching Mode so error messages still show
            // the mode exactly as it was configured.
            var rtts = NormalizeS2AutoRttThresholds(c);
            c.Mode = CompressionMode.S2Auto;
            c.RttThresholds = rtts;
            break;
        }

        case "fast":
        case "s2_fast":
            c.Mode = CompressionMode.S2Fast;
            break;

        case "better":
        case "s2_better":
            c.Mode = CompressionMode.S2Better;
            break;

        case "best":
        case "s2_best":
            c.Mode = CompressionMode.S2Best;
            break;

        default:
            throw new InvalidOperationException($"Unsupported compression mode \"{c.Mode}\"");
    }
}

/// <summary>
/// Produces the normalised RTT threshold list for S2 auto mode: defaults when none
/// are configured, negatives clamped to zero, ascending order enforced (zeros are
/// exempt), and trailing zeros trimmed.
/// </summary>
private static List<TimeSpan> NormalizeS2AutoRttThresholds(CompressionOpts c)
{
    if (c.RttThresholds.Count == 0)
        return [.. DefaultCompressionS2AutoRttThresholds];

    var rtts = new List<TimeSpan>(c.RttThresholds.Count);
    foreach (var configured in c.RttThresholds)
    {
        // Negative values are not an error; they are simply treated as zero.
        var threshold = configured < TimeSpan.Zero ? TimeSpan.Zero : configured;

        // A zero may appear anywhere in the list, so only non-zero values
        // are checked against everything accepted so far.
        if (threshold != TimeSpan.Zero)
        {
            foreach (var earlier in rtts)
            {
                if (threshold < earlier)
                    throw new InvalidOperationException(
                        $"RTT threshold values [{string.Join(", ", c.RttThresholds)}] should be in ascending order");
            }
        }
        rtts.Add(threshold);
    }

    // Trim trailing zeros.
    var lastNonZero = rtts.FindLastIndex(t => t != TimeSpan.Zero);
    rtts.RemoveRange(lastNonZero + 1, rtts.Count - (lastNonZero + 1));

    if (rtts.Count > 4)
        throw new InvalidOperationException(
            $"Compression mode \"{c.Mode}\" should have no more than 4 RTT thresholds");
    if (rtts.Count == 0)
        throw new InvalidOperationException(
            $"Compression mode \"{c.Mode}\" requires at least one RTT threshold");

    return rtts;
}
|
||
|
||
/// <summary>
/// RTT thresholds used for <see cref="CompressionMode.S2Auto"/> when none are
/// configured: 10 ms, 50 ms and 100 ms.
/// Mirrors Go <c>defaultCompressionS2AutoRTTThresholds</c>.
/// </summary>
public static readonly TimeSpan[] DefaultCompressionS2AutoRttThresholds = new[]
{
    TimeSpan.FromMilliseconds(10),
    TimeSpan.FromMilliseconds(50),
    TimeSpan.FromMilliseconds(100),
};
|
||
|
||
/// <summary>
/// Reports whether compression mode <paramref name="m"/> calls for compression,
/// i.e. it is neither empty, <see cref="CompressionMode.Off"/> nor
/// <see cref="CompressionMode.NotSupported"/>.
/// Mirrors Go <c>needsCompression</c>.
/// </summary>
public static bool NeedsCompression(string m)
{
    if (m == string.Empty)
        return false;

    return !(m == CompressionMode.Off || m == CompressionMode.NotSupported);
}
|
||
|
||
/// <summary>
/// Picks the compression mode to use from the local mode (<paramref name="scm"/>)
/// and the remote mode (<paramref name="rcm"/>).
/// Mirrors Go <c>selectCompressionMode</c>.
/// </summary>
/// <param name="scm">Our configured compression mode.</param>
/// <param name="rcm">The compression mode announced by the remote.</param>
/// <returns>The compression mode both sides end up with.</returns>
/// <exception cref="InvalidOperationException">
/// <paramref name="rcm"/> is not one of the known modes.
/// </exception>
public static string SelectCompressionMode(string scm, string rcm)
{
    // An empty or "not supported" remote means no compression support at all.
    if (rcm == string.Empty || rcm == CompressionMode.NotSupported)
        return CompressionMode.NotSupported;

    var localAccepts = scm == CompressionMode.Accept;

    switch (rcm)
    {
        case CompressionMode.Off:
            // Remote explicitly disabled compression.
            return CompressionMode.Off;

        case CompressionMode.Accept:
            // Both sides merely "accept": nobody initiates, so no compression.
            if (localAccepts)
                return CompressionMode.Off;
            return scm;

        case CompressionMode.S2Auto:
        case CompressionMode.S2Uncompressed:
        case CompressionMode.S2Fast:
        case CompressionMode.S2Better:
        case CompressionMode.S2Best:
            if (!localAccepts)
                return scm;
            // We accept whatever the remote uses, except "auto", which maps to "fast".
            if (rcm == CompressionMode.S2Auto)
                return CompressionMode.S2Fast;
            return rcm;

        default:
            throw new InvalidOperationException($"Unsupported route compression mode \"{rcm}\"");
    }
}
|
||
|
||
/// <summary>
/// Chooses the compression mode to place in the INFO protocol: always
/// <see cref="CompressionMode.S2Auto"/> when that is the configured mode,
/// otherwise the supplied <paramref name="cm"/>.
/// Mirrors Go <c>compressionModeForInfoProtocol</c>.
/// </summary>
public static string CompressionModeForInfoProtocol(CompressionOpts co, string cm)
{
    if (co.Mode == CompressionMode.S2Auto)
        return CompressionMode.S2Auto;

    return cm;
}
|
||
|
||
/// <summary>
/// Maps a measured round-trip time onto an S2 compression level using the
/// configured thresholds.
/// Mirrors Go <c>selectS2AutoModeBasedOnRTT</c>.
/// </summary>
/// <param name="rtt">Measured round-trip time.</param>
/// <param name="rttThresholds">Upper RTT bound for each level, lowest level first.</param>
/// <returns>
/// The level of the first threshold that <paramref name="rtt"/> does not exceed.
/// When every threshold is exceeded: "best" if there are at least three
/// thresholds, otherwise the level of the last threshold.
/// </returns>
public static string SelectS2AutoModeBasedOnRtt(TimeSpan rtt, IReadOnlyList<TimeSpan> rttThresholds)
{
    var count = rttThresholds.Count;

    // Advance past every threshold the RTT exceeds.
    var level = 0;
    while (level < count && rtt > rttThresholds[level])
        level++;

    // Ran off the end: no threshold was large enough.
    if (level == count)
        level = count >= 3 ? 3 : count - 1;

    switch (level)
    {
        case 0: return CompressionMode.S2Uncompressed;
        case 1: return CompressionMode.S2Fast;
        case 2: return CompressionMode.S2Better;
        default: return CompressionMode.S2Best;
    }
}
|
||
|
||
/// <summary>
/// Compares two <see cref="CompressionOpts"/> for logical equality. Modes must
/// match; for S2 auto the RTT thresholds must match too, with an empty list
/// standing for <see cref="DefaultCompressionS2AutoRttThresholds"/>.
/// Mirrors Go <c>compressOptsEqual</c>.
/// </summary>
public static bool CompressOptsEqual(CompressionOpts? c1, CompressionOpts? c2)
{
    if (ReferenceEquals(c1, c2))
        return true;
    if (c1 is null || c2 is null || c1.Mode != c2.Mode)
        return false;

    // Thresholds only matter for the auto mode.
    if (c1.Mode != CompressionMode.S2Auto)
        return true;

    IReadOnlyList<TimeSpan> left = DefaultCompressionS2AutoRttThresholds;
    if (c1.RttThresholds.Count != 0)
        left = c1.RttThresholds;

    IReadOnlyList<TimeSpan> right = DefaultCompressionS2AutoRttThresholds;
    if (c2.RttThresholds.Count != 0)
        right = c2.RttThresholds;

    if (left.Count != right.Count)
        return false;

    var index = 0;
    foreach (var threshold in left)
    {
        if (threshold != right[index])
            return false;
        index++;
    }
    return true;
}
|
||
|
||
// =========================================================================
|
||
// Factory methods (features 2983–2985)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Deprecated factory kept for compatibility; prefer <see cref="NewServer"/>.
/// Returns only the server part of the <see cref="NewServer"/> result, so a
/// failure surfaces as <c>null</c> with the error discarded.
/// Mirrors Go <c>New</c>.
/// </summary>
public static NatsServer? New(ServerOptions opts) => NewServer(opts).Server;
|
||
|
||
/// <summary>
/// Creates a server from <paramref name="opts"/>. When
/// <see cref="ServerOptions.ConfigFile"/> is set and no config digest exists yet,
/// Go re-processes the config file first; that step is not wired up here, so the
/// options are used as supplied.
/// Mirrors Go <c>NewServerFromConfig</c>.
/// </summary>
public static (NatsServer? Server, Exception? Error) NewServerFromConfig(ServerOptions opts)
{
    var configNotYetProcessed =
        !string.IsNullOrEmpty(opts.ConfigFile) && string.IsNullOrEmpty(opts.ConfigDigest());

    if (configNotYetProcessed)
    {
        // opts.ProcessConfigFile(opts.ConfigFile) would go here — full config file
        // parsing is deferred to session 03. Phase 6 tests supply options directly,
        // so re-processing is skipped for now.
    }

    return NewServer(opts);
}
|
||
|
||
/// <summary>
/// Builds a <see cref="NatsServer"/> from <paramref name="opts"/>: applies baseline
/// options, validates them, assembles the INFO block, then sets up trusted keys,
/// cluster naming, route structures, the account resolver, accounts,
/// authorization and signal handling.
/// Mirrors Go <c>NewServer</c>.
/// </summary>
/// <param name="opts">
/// Server options. Baseline defaults are applied to this instance, and its
/// cluster name is filled in for the leaf-node-only case.
/// </param>
/// <returns>
/// The server with a <c>null</c> error on success; a <c>null</c> server with the
/// failure when option validation, trusted-key processing, resolver setup or
/// account setup fails.
/// </returns>
public static (NatsServer? Server, Exception? Error) NewServer(ServerOptions opts)
{
    opts.SetBaselineOptions();

    // Server identity. Go uses nkeys.CreateServer(); a GUID stands in for
    // kp.PublicKey() here.
    var pub = Guid.NewGuid().ToString("N");
    // Curve key (xkp, used for encryption) is still a stub — session 09 crypto.
    var xpub = string.Empty;

    var serverName = string.IsNullOrEmpty(opts.ServerName) ? pub : opts.ServerName;
    var httpBasePath = ServerOptions.NormalizeBasePath(opts.HttpBasePath);

    if (ValidateOptions(opts) is { } invalidOptions)
        return (null, invalidOptions);

    var createdAt = DateTime.UtcNow;
    var tlsConfigured = opts.TlsConfig != null;
    var tlsVerify = tlsConfigured && opts.TlsVerify;

    var info = new ServerInfo
    {
        Id = pub,
        XKey = xpub,
        Version = ServerConstants.Version,
        Proto = ServerConstants.Proto,
        GitCommit = ServerConstants.GitCommit,
        GoVersion = System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription,
        Name = serverName,
        Host = opts.Host,
        Port = opts.Port,
        AuthRequired = false,
        TlsRequired = tlsConfigured && !opts.AllowNonTls,
        TlsVerify = tlsVerify,
        MaxPayload = opts.MaxPayload,
        JetStream = opts.JetStream,
        Headers = !opts.NoHeaderSupport,
        Cluster = opts.Cluster.Name,
        Domain = opts.JetStreamDomain,
        JsApiLevel = 1, // stub — session 19
    };

    // TLS is configured but not mandatory: advertise it as available.
    if (tlsConfigured && !info.TlsRequired)
        info.TlsAvailable = true;

    var server = new NatsServer(opts)
    {
        _info = info,
        _configFile = opts.ConfigFile,
        _start = createdAt,
        _configTime = createdAt,
        _pub = pub,
        _xpub = xpub,
        _httpBasePath = httpBasePath,
    };

    // The sync semaphore needs no filling: SemaphoreSlim already starts at
    // MaxConcurrentSyncRequests.

    if (opts.TlsRateLimit > 0)
        server._connRateCounter = RateCounterFactory.Create(opts.TlsRateLimit);

    // Trusted operator keys.
    if (!server.ProcessTrustedKeys())
        return (null, new InvalidOperationException("Error processing trusted operator keys"));

    // Leaf-node-only setup (remotes but no cluster port or name): a stable
    // cluster name is still needed, so the server name is used.
    var leafOnly = opts.LeafNode.Remotes.Count > 0
        && opts.Cluster.Port == 0
        && string.IsNullOrEmpty(opts.Cluster.Name);
    if (leafOnly)
    {
        server._leafNoCluster = true;
        opts.Cluster.Name = opts.ServerName;
    }

    if (!string.IsNullOrEmpty(opts.Cluster.Name))
    {
        server._cnMu.EnterWriteLock();
        try { server._cn = opts.Cluster.Name; }
        finally { server._cnMu.ExitWriteLock(); }
    }

    server._mu.EnterWriteLock();
    try
    {
        // Proxies' trusted public keys.
        server.ProcessProxiesTrustedKeys();

        // Register this server's own JetStream node info.
        if (opts.JetStream)
        {
            var ourNode = GetHash(serverName);
            server._nodeToInfo.TryAdd(ourNode, new NodeInfo
            {
                Name = serverName,
                Version = ServerConstants.Version,
                Cluster = opts.Cluster.Name,
                Domain = opts.JetStreamDomain,
                Id = info.Id,
                Tags = [.. opts.Tags],
                Js = true,
                BinarySnapshots = true,
                AccountNrg = true,
            });
        }

        // Route resolver: null means default system DNS — session 14.
        server._routeResolver = null;

        // URL maps are initialized via new() in their field declarations.

        // Leaf nodes are enabled by a listen port or by configured remotes.
        server._leafNodeEnabled = opts.LeafNode.Port != 0 || opts.LeafNode.Remotes.Count > 0;

        // OCSP (s.EnableOcsp()) — deferred to session 23.
        // Gateway (s.NewGateway(opts)) — deferred to session 16.

        // Cluster name: generated when clustering is on but unnamed,
        // otherwise taken from the options when present.
        if (opts.Cluster.Port != 0 && string.IsNullOrEmpty(opts.Cluster.Name))
            server._info.Cluster = Guid.NewGuid().ToString("N");
        else if (!string.IsNullOrEmpty(opts.Cluster.Name))
            server._info.Cluster = opts.Cluster.Name;

        // INFO host/port; the returned error is deliberately not inspected here.
        server.SetInfoHostPort();

        // Client tracking is initialized in field declarations.

        // Ring buffer of recently closed clients.
        server._closed = new ClosedRingBuffer(opts.MaxClosedClients);

        server.InitRouteStructures(opts);

        // The quit channel is covered by the CancellationTokenSource.

        // Account resolver. On failure the finally block releases the lock.
        if (server.ConfigureResolver() is { } resolverErr)
            return (null, resolverErr);

        // URL account resolver health check — stub, session 11 (URLAccResolver).
        // Operator-mode bootstrap (temporary system account) — stub, needs the
        // full Account implementation from session 11.

        var (_, accountsErr) = server.ConfigureAccounts(false);
        if (accountsErr != null)
            return (null, accountsErr);

        server.ConfigureAuthorization();
        server.HandleSignals();
    }
    finally
    {
        // Guarded: only release when this thread still holds the write lock.
        if (server._mu.IsWriteLockHeld)
            server._mu.ExitWriteLock();
    }

    return (server, null);
}
|
||
|
||
// =========================================================================
|
||
// Route structures (feature 2986)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Resets the route table, derives the route pool size (at least 1) and, when
/// pinned accounts are configured, creates an empty per-account route entry for
/// each of them.
/// Mirrors Go <c>Server.initRouteStructures</c>.
/// Server lock must be held on entry.
/// </summary>
public void InitRouteStructures(ServerOptions opts)
{
    _routes = [];

    // A pool size of zero or less falls back to a single route.
    if (opts.Cluster.PoolSize > 0)
        _routesPoolSize = opts.Cluster.PoolSize;
    else
        _routesPoolSize = 1;

    if (opts.Cluster.PinnedAccounts.Count == 0)
        return;

    // The presence of an account key marks it as having dedicated routes.
    _accRoutes = [];
    foreach (var pinnedAccount in opts.Cluster.PinnedAccounts)
    {
        _accRoutes[pinnedAccount] = [];
    }
}
|
||
|
||
// =========================================================================
|
||
// logRejectedTLSConns background loop (feature 2987)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Runs until <paramref name="ct"/> is cancelled, waking once per second to warn
/// about connections blocked by the connection rate counter since the last check.
/// Mirrors Go <c>Server.logRejectedTLSConns</c>.
/// </summary>
/// <param name="ct">Cancellation token that ends the loop.</param>
internal async Task LogRejectedTlsConnsAsync(CancellationToken ct)
{
    using var ticker = new PeriodicTimer(TimeSpan.FromSeconds(1));

    while (true)
    {
        if (ct.IsCancellationRequested)
            return;

        try
        {
            await ticker.WaitForNextTickAsync(ct);
        }
        catch (OperationCanceledException)
        {
            // Cancellation while waiting is the normal way out.
            return;
        }

        if (_connRateCounter is null)
            continue;

        var blocked = _connRateCounter.CountBlocked();
        if (blocked > 0)
        {
            Warnf("Rejected {0} connections due to TLS rate limiting", blocked);
        }
    }
}
|
||
|
||
// =========================================================================
|
||
// Cluster name (features 2988–2991)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Reads the cluster name from the server INFO under the server read lock.
/// Returns an empty string when no name is set.
/// </summary>
public string ClusterName()
{
    string? name;

    _mu.EnterReadLock();
    try
    {
        name = _info.Cluster;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return name ?? string.Empty;
}
|
||
|
||
/// <summary>
/// Reads the cached cluster name, which is guarded by its own dedicated lock
/// rather than the main server lock.
/// </summary>
public string CachedClusterName()
{
    _cnMu.EnterReadLock();
    try
    {
        var cached = _cn;
        return cached;
    }
    finally
    {
        _cnMu.ExitReadLock();
    }
}
|
||
|
||
/// <summary>
/// Stores <paramref name="name"/> as the cluster name in the server INFO, the
/// route INFO and the cached cluster-name field, then logs the change.
/// Covers the name-update part of Go <c>Server.setClusterName</c>; no leaf-node
/// notification is performed here.
/// </summary>
/// <param name="name">The new cluster name.</param>
public void SetClusterName(string name)
{
    _mu.EnterWriteLock();
    try
    {
        _info.Cluster = name;
        _routeInfo.Cluster = name;
    }
    finally
    {
        // Release even if an assignment throws, so a failure here cannot
        // leave the server lock held.
        _mu.ExitWriteLock();
    }

    _cnMu.EnterWriteLock();
    try { _cn = name; }
    finally { _cnMu.ExitWriteLock(); }

    Noticef("Cluster name updated to {0}", name);
}
|
||
|
||
/// <summary>
/// Reports whether the options carry no cluster name, meaning the name is
/// assigned dynamically. The options field is read directly under the options
/// read lock.
/// </summary>
public bool IsClusterNameDynamic()
{
    bool isDynamic;

    _optsMu.EnterReadLock();
    try
    {
        isDynamic = string.IsNullOrEmpty(_opts.Cluster.Name);
    }
    finally
    {
        _optsMu.ExitReadLock();
    }

    return isDynamic;
}
|
||
|
||
// =========================================================================
|
||
// Server name / URLs (features 2992–2994)
|
||
// =========================================================================
|
||
|
||
/// <summary>Returns the server name taken from the current options.</summary>
public string ServerName()
{
    var opts = GetOpts();
    return opts.ServerName;
}
|
||
|
||
/// <summary>
/// Builds the URL clients use to connect: scheme <c>tls</c> when a TLS config is
/// present, otherwise <c>nats</c>, followed by the configured host and port.
/// Mirrors Go <c>Server.ClientURL</c>.
/// </summary>
/// <returns>
/// The client URL. A host containing <c>:</c> or <c>%</c> (an IPv6 literal) is
/// wrapped in brackets so the port stays unambiguous.
/// </returns>
public string ClientUrl()
{
    var opts = GetOpts();
    var scheme = opts.TlsConfig != null ? "tls" : "nats";

    // Same rule as Go's net.JoinHostPort: bracket hosts that contain ':' or '%'.
    // A host that is already bracketed is left alone.
    var host = opts.Host ?? string.Empty;
    if ((host.Contains(':') || host.Contains('%')) && !host.StartsWith('['))
        host = $"[{host}]";

    return $"{scheme}://{host}:{opts.Port}";
}
|
||
|
||
/// <summary>
/// Builds the URL WebSocket clients use to connect: scheme <c>wss</c> when the
/// WebSocket TLS config is present, otherwise <c>ws</c>, followed by the
/// configured WebSocket host and port.
/// Mirrors Go <c>Server.WebsocketURL</c>.
/// </summary>
/// <returns>
/// The WebSocket URL. A host containing <c>:</c> or <c>%</c> (an IPv6 literal) is
/// wrapped in brackets so the port stays unambiguous.
/// </returns>
public string WebsocketUrl()
{
    var opts = GetOpts();
    var scheme = opts.Websocket.TlsConfig != null ? "wss" : "ws";

    // Same rule as Go's net.JoinHostPort: bracket hosts that contain ':' or '%'.
    // A host that is already bracketed is left alone.
    var host = opts.Websocket.Host ?? string.Empty;
    if ((host.Contains(':') || host.Contains('%')) && !host.StartsWith('['))
        host = $"[{host}]";

    return $"{scheme}://{host}:{opts.Websocket.Port}";
}
|
||
|
||
// =========================================================================
|
||
// Validation (features 2995–2997)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Validates the cluster section of the options: name without spaces, a valid
/// compression mode, well-formed pinned certificates, agreement between gateway
/// and cluster names, and a sane pinned-accounts list.
/// Mirrors Go <c>validateCluster</c>.
/// </summary>
/// <param name="o">
/// Options to check. The compression options may be normalised, and the cluster
/// name is set from the gateway name when only the latter is configured.
/// </param>
/// <returns><c>null</c> when valid, otherwise the first problem found.</returns>
public static Exception? ValidateCluster(ServerOptions o)
{
    var clusterName = o.Cluster.Name;
    if (!string.IsNullOrEmpty(clusterName) && clusterName.Contains(' '))
        return ServerErrors.ErrClusterNameHasSpaces;

    if (!string.IsNullOrEmpty(o.Cluster.Compression.Mode))
    {
        // The normaliser reports problems by throwing; surface them as a return value.
        try
        {
            ValidateAndNormalizeCompressionOption(o.Cluster.Compression, CompressionMode.S2Fast);
        }
        catch (Exception ex)
        {
            return ex;
        }
    }

    if (ValidatePinnedCerts(o.Cluster.TlsPinnedCerts) is { } pinnedErr)
        return new InvalidOperationException($"cluster: {pinnedErr.Message}", pinnedErr);

    // A gateway name must agree with the cluster name; when only the gateway
    // name is configured, it becomes the cluster name.
    var gatewayName = o.Gateway.Name;
    if (!string.IsNullOrEmpty(gatewayName) && gatewayName != o.Cluster.Name)
    {
        if (!string.IsNullOrEmpty(o.Cluster.Name))
            return ServerErrors.ErrClusterNameConfigConflict;
        o.Cluster.Name = gatewayName;
    }

    if (o.Cluster.PinnedAccounts.Count == 0)
        return null;

    if (o.Cluster.PoolSize < 0)
        return new InvalidOperationException("pool_size cannot be negative if pinned accounts are specified");

    // Reject the first account name that shows up twice.
    var unique = new HashSet<string>(StringComparer.Ordinal);
    foreach (var account in o.Cluster.PinnedAccounts)
    {
        if (unique.Add(account))
            continue;

        return new InvalidOperationException(
            $"found duplicate account name \"{account}\" in pinned accounts list");
    }

    return null;
}
|
||
|
||
/// <summary>
/// Matches exactly 64 lower-case hex characters. <c>\z</c> anchors at the very end
/// of the input; <c>$</c> would also match just before a trailing newline.
/// Built once instead of on every validation call.
/// </summary>
private static readonly Regex PinnedCertFingerprintPattern =
    new(@"^[a-f0-9]{64}\z", RegexOptions.Compiled | RegexOptions.CultureInvariant);

/// <summary>
/// Validates pinned certificate SHA-256 fingerprints: after lower-casing, each
/// entry must be exactly 64 hex characters.
/// Mirrors Go <c>validatePinnedCerts</c>.
/// </summary>
/// <param name="pinned">Fingerprints to check; <c>null</c> is treated as valid.</param>
/// <returns><c>null</c> when every entry is well-formed, otherwise an error naming the first bad entry.</returns>
public static Exception? ValidatePinnedCerts(PinnedCertSet? pinned)
{
    if (pinned is null) return null;

    foreach (var certId in pinned)
    {
        // Upper-case hex is tolerated: the entry is lower-cased before matching.
        if (!PinnedCertFingerprintPattern.IsMatch(certId.ToLowerInvariant()))
            return new InvalidOperationException(
                $"error parsing 'pinned_certs' key {certId} does not look like lower case hex-encoded sha256 of DER encoded SubjectPublicKeyInfo");
    }
    return null;
}
|
||
|
||
/// <summary>
/// Validates the server options: the lame-duck grace period must be strictly
/// below the lame-duck duration, <c>max_payload</c> must not exceed
/// <c>max_pending</c>, the server name must not contain spaces, and the cluster
/// section must pass <see cref="ValidateCluster"/>.
/// Mirrors Go <c>validateOptions</c>.
/// </summary>
/// <returns><c>null</c> when valid, otherwise the first problem found.</returns>
public static Exception? ValidateOptions(ServerOptions o)
{
    var lameDuckConfigured = o.LameDuckDuration > TimeSpan.Zero;
    if (lameDuckConfigured && o.LameDuckGracePeriod >= o.LameDuckDuration)
    {
        return new InvalidOperationException(
            $"lame duck grace period ({o.LameDuckGracePeriod}) should be strictly lower than lame duck duration ({o.LameDuckDuration})");
    }

    if ((long)o.MaxPayload > o.MaxPending)
    {
        return new InvalidOperationException(
            $"max_payload ({o.MaxPayload}) cannot be higher than max_pending ({o.MaxPending})");
    }

    var serverName = o.ServerName;
    if (!string.IsNullOrEmpty(serverName) && serverName.Contains(' '))
    {
        return new InvalidOperationException("server name cannot contain spaces");
    }

    // Trusted operators, leafnode, auth, proxies, gateway, MQTT and websocket
    // validation belong to subsystems that are not ported yet; only the
    // cluster section is checked for now.
    return ValidateCluster(o);
}
|
||
|
||
// =========================================================================
|
||
// Options accessors (features 2998–2999)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Returns the current options instance, read under the options read lock.
/// </summary>
public ServerOptions GetOpts()
{
    ServerOptions current;

    _optsMu.EnterReadLock();
    try
    {
        current = _opts;
    }
    finally
    {
        _optsMu.ExitReadLock();
    }

    return current;
}
|
||
|
||
/// <summary>
/// Swaps in a new options instance under the options write lock
/// (used during config reload).
/// </summary>
/// <param name="opts">The options that replace the current ones.</param>
public void SetOpts(ServerOptions opts)
{
    _optsMu.EnterWriteLock();
    try
    {
        _opts = opts;
    }
    finally
    {
        _optsMu.ExitWriteLock();
    }
}
|
||
|
||
// =========================================================================
|
||
// Global account (feature 3000)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Reads the global account field under the server read lock.
/// Mirrors Go <c>Server.globalAccount</c>.
/// </summary>
/// <returns>The global account, or <c>null</c> when it has not been set.</returns>
internal Account? GlobalAccountInternal()
{
    Account? global;

    _mu.EnterReadLock();
    try
    {
        global = _gacc;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return global;
}
|
||
|
||
// =========================================================================
|
||
// Trusted keys (features 3008–3011)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Decides whether <paramref name="issuer"/> is trusted. With no trusted keys
/// configured only an empty issuer is accepted; otherwise the issuer must be one
/// of the trusted keys.
/// Mirrors Go <c>Server.isTrustedIssuer</c>.
/// </summary>
public bool IsTrustedIssuer(string issuer)
{
    _mu.EnterReadLock();
    try
    {
        var trusted = _trustedKeys;

        // Not in trusted mode: having no issuer is fine, anything else is not.
        if (trusted is null)
            return string.IsNullOrEmpty(issuer);

        return trusted.Contains(issuer);
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
|
||
|
||
/// <summary>
/// Resets strict signing-key tracking, then loads trusted operator keys from the
/// binary-stamped value and from the options.
/// Mirrors Go <c>Server.processTrustedKeys</c>.
/// </summary>
/// <returns>
/// <c>false</c> when stamped keys cannot be initialised or an options key is not
/// a valid public operator key; otherwise <c>true</c>.
/// </returns>
public bool ProcessTrustedKeys()
{
    _strictSigningKeyUsage = [];
    var opts = GetOpts();

    var hasStampedKeys = !string.IsNullOrEmpty(StampedTrustedKeys);
    if (hasStampedKeys && !InitStampedTrustedKeys())
        return false;

    var configured = opts.TrustedKeys;
    if (configured is null || configured.Count == 0)
        return true;

    // Every configured key must be valid before any of them is adopted.
    foreach (var candidate in configured)
    {
        if (!IsValidPublicOperatorKey(candidate))
            return false;
    }

    _trustedKeys = [.. configured];

    foreach (var claim in opts.TrustedOperators)
    {
        // Stub: claim.StrictSigningKeyUsage / claim.SigningKeys (session 06)
        // will be recorded here once TrustedOperator is fully typed.
    }

    return true;
}
|
||
|
||
/// <summary>
/// Parses a whitespace-separated list of public operator NKeys.
/// Mirrors Go <c>checkTrustedKeyString</c>, which splits with
/// <c>strings.Fields</c> (any run of whitespace, not just single spaces).
/// </summary>
/// <param name="keys">The raw key list; may be null, empty or blank.</param>
/// <returns>
/// The parsed keys, or <c>null</c> when the input holds no keys or any key fails
/// <see cref="IsValidPublicOperatorKey"/>.
/// </returns>
public static List<string>? CheckTrustedKeyString(string keys)
{
    if (string.IsNullOrWhiteSpace(keys)) return null;

    // A null separator array makes Split break on every white-space character,
    // so tab- or newline-separated keys are handled like space-separated ones.
    var tks = keys.Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries);
    if (tks.Length == 0) return null;

    foreach (var key in tks)
    {
        if (!IsValidPublicOperatorKey(key)) return null;
    }
    return [.. tks];
}
|
||
|
||
/// <summary>
/// Loads trusted keys from the binary-stamped <see cref="StampedTrustedKeys"/> value.
/// Mirrors Go <c>Server.initStampedTrustedKeys</c>.
/// </summary>
/// <returns>
/// <c>false</c> when the options already define trusted keys or the stamped
/// value does not parse; otherwise <c>true</c>.
/// </returns>
public bool InitStampedTrustedKeys()
{
    // Trusted keys in the options conflict with stamped keys.
    var configured = GetOpts().TrustedKeys;
    if (configured is not null && configured.Count > 0)
        return false;

    if (CheckTrustedKeyString(StampedTrustedKeys) is not { } stamped)
        return false;

    _trustedKeys = stamped;
    return true;
}
|
||
|
||
// =========================================================================
|
||
// CLI helpers (features 3012–3014)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Writes <paramref name="msg"/> to standard error, then terminates the process
/// with exit code 1.
/// </summary>
public static void PrintAndDie(string msg)
{
    var stderr = Console.Error;
    stderr.WriteLine(msg);

    const int failureExitCode = 1;
    Environment.Exit(failureExitCode);
}
|
||
|
||
/// <summary>
/// Writes the server version banner to standard output, then terminates the
/// process with exit code 0.
/// </summary>
public static void PrintServerAndExit()
{
    var banner = $"nats-server: v{ServerConstants.Version}";
    Console.WriteLine(banner);

    const int successExitCode = 0;
    Environment.Exit(successExitCode);
}
|
||
|
||
/// <summary>
/// Interprets the first command-line argument as a subcommand; later arguments
/// are never examined. The match is case-insensitive.
/// Mirrors Go <c>ProcessCommandLineArgs</c>.
/// </summary>
/// <param name="args">The remaining command-line arguments.</param>
/// <returns>
/// <c>ShowVersion</c> for <c>version</c>, <c>ShowHelp</c> for <c>help</c>, an
/// error for any other first argument, and all-clear when there are none.
/// </returns>
public static (bool ShowVersion, bool ShowHelp, Exception? Error)
    ProcessCommandLineArgs(string[] args)
{
    if (args.Length == 0)
        return (false, false, null);

    var first = args[0];
    var command = first.ToLowerInvariant();

    if (command == "version")
        return (true, false, null);

    if (command == "help")
        return (false, true, null);

    // The error reports the argument exactly as typed, not lower-cased.
    return (false, false, new InvalidOperationException($"Unknown argument: {first}"));
}
|
||
|
||
// =========================================================================
|
||
// Running state (features 3015–3016)
|
||
// =========================================================================
|
||
|
||
/// <summary>Reports whether the server's running flag is set.</summary>
public bool Running()
{
    return IsRunning();
}
|
||
|
||
/// <summary>Atomically reads the running flag; any non-zero value means running.</summary>
private bool IsRunning()
{
    // CompareExchange(ref x, 0, 0) serves as an atomic read: it only ever
    // replaces 0 with 0, so the stored value is unchanged.
    var state = Interlocked.CompareExchange(ref _running, 0, 0);
    return state != 0;
}
|
||
|
||
/// <summary>Atomically reads the shutdown flag; any non-zero value means shutting down.</summary>
public bool IsShuttingDown()
{
    // CompareExchange(ref x, 0, 0) serves as an atomic read: it only ever
    // replaces 0 with 0, so the stored value is unchanged.
    var state = Interlocked.CompareExchange(ref _shutdown, 0, 0);
    return state != 0;
}
|
||
|
||
// =========================================================================
|
||
// PID file (feature 3017)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Writes the current process id to the PID file named in the options.
/// Mirrors Go <c>Server.logPid</c>.
/// </summary>
/// <returns>
/// <c>null</c> when no PID file is configured or the write succeeds; otherwise
/// the exception raised by the write.
/// </returns>
public Exception? LogPid()
{
    var path = GetOpts().PidFile;
    if (string.IsNullOrEmpty(path))
        return null;

    var pid = Environment.ProcessId.ToString();
    try
    {
        File.WriteAllText(path, pid);
    }
    catch (Exception ex)
    {
        // Handed back to the caller rather than thrown.
        return ex;
    }

    return null;
}
|
||
|
||
// =========================================================================
|
||
// Active account counters (features 3018–3021)
|
||
// =========================================================================
|
||
|
||
/// <summary>Returns the number of reserved accounts, which is currently always 1.</summary>
public int NumReservedAccounts()
{
    const int reservedAccounts = 1;
    return reservedAccounts;
}
|
||
|
||
/// <summary>Atomically reads the count of active accounts on this server.</summary>
public int NumActiveAccounts()
{
    // CompareExchange(ref x, 0, 0) serves as an atomic read: it only ever
    // replaces 0 with 0, so the stored value is unchanged.
    return Interlocked.CompareExchange(ref _activeAccounts, 0, 0);
}
|
||
|
||
// =========================================================================
|
||
// Misc helpers
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Fills the INFO host and port. When <c>ClientAdvertise</c> is set it is parsed
/// (with the configured port as the default) and used; otherwise the bind host
/// and port are copied over.
/// Mirrors Go <c>Server.setInfoHostPort()</c>.
/// </summary>
/// <returns>
/// <c>null</c> on success, or the parse error for <c>ClientAdvertise</c>, in
/// which case INFO is left untouched.
/// </returns>
internal Exception? SetInfoHostPort()
{
    var opts = GetOpts();

    if (string.IsNullOrEmpty(opts.ClientAdvertise))
    {
        _info.Host = opts.Host;
        _info.Port = opts.Port;
        return null;
    }

    var (advertisedHost, advertisedPort, parseErr) =
        Internal.ServerUtilities.ParseHostPort(opts.ClientAdvertise, opts.Port);
    if (parseErr != null)
        return parseErr;

    _info.Host = advertisedHost;
    _info.Port = advertisedPort;
    return null;
}
|
||
|
||
// ConfigureAuthorization, HandleSignals, ProcessProxiesTrustedKeys
|
||
// are implemented in NatsServer.Auth.cs and NatsServer.Signals.cs.
|
||
|
||
/// <summary>
/// Computes a stable 8-character hash of a string (used for JetStream node names).
/// Each of the first eight bytes of the SHA-256 digest of the UTF-8 encoded input
/// is mapped, modulo 62, onto the alphabet <c>0-9A-Za-z</c>, so the result is
/// always purely alphanumeric.
/// Mirrors Go <c>getHash</c>.
/// NOTE(review): the alphabet and modulus follow the Go <c>getHashSize</c>
/// implementation as recalled — confirm against the Go source.
/// </summary>
/// <param name="s">The name to hash.</param>
/// <returns>An 8-character alphanumeric hash.</returns>
internal static string GetHash(string s)
{
    const string Digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
    const int HashLength = 8;

    var bytes = System.Text.Encoding.UTF8.GetBytes(s);
    var digest = System.Security.Cryptography.SHA256.HashData(bytes);

    Span<char> hash = stackalloc char[HashLength];
    for (var i = 0; i < HashLength; i++)
    {
        hash[i] = Digits[digest[i] % Digits.Length];
    }
    return new string(hash);
}
|
||
|
||
/// <summary>
/// Shallow check that a string looks like a public operator NKey: exactly
/// 56 characters, starting with <c>O</c>. No further decoding is performed.
/// Stands in for Go <c>nkeys.IsValidPublicOperatorKey</c>.
/// </summary>
internal static bool IsValidPublicOperatorKey(string key)
{
    if (string.IsNullOrEmpty(key))
        return false;

    if (key.Length != 56)
        return false;

    return key[0] == 'O';
}
|
||
|
||
// =========================================================================
|
||
// Start (feature 3049)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Starts the server without blocking: logs the startup banner, marks the server
/// as running, writes the PID file when one is configured, sets up the system
/// account and signals that startup has completed.
/// This covers only the bootstrap sequence; subsystems from sessions 10-23
/// (gateway, websocket, leafnode, routes, ...) are not started here.
/// Mirrors Go <c>Server.Start</c>.
/// </summary>
public void Start()
{
    Noticef("Starting nats-server");

    var gitCommit = ServerConstants.GitCommit;
    if (string.IsNullOrEmpty(gitCommit))
        gitCommit = "not set";

    var opts = GetOpts();

    bool leafNoCluster;
    _mu.EnterReadLock();
    try { leafNoCluster = _leafNoCluster; }
    finally { _mu.ExitReadLock(); }

    // In the leaf-node-only case the cluster name is left out of the banner.
    var clusterName = string.Empty;
    if (!leafNoCluster)
        clusterName = ClusterName();

    Noticef(" Version: {0}", ServerConstants.Version);
    Noticef(" Git: [{0}]", gitCommit);
    if (!string.IsNullOrEmpty(clusterName))
    {
        Noticef(" Cluster: {0}", clusterName);
    }
    Noticef(" Name: {0}", _info.Name);
    Noticef(" ID: {0}", _info.Id);

    // Flag as running first to avoid a race between Start() and Shutdown().
    Interlocked.Exchange(ref _running, 1);

    // Recompute from the options read at the start of this method.
    var leafEnabled = opts.LeafNode.Port != 0 || opts.LeafNode.Remotes.Count > 0;
    _mu.EnterWriteLock();
    try { _leafNodeEnabled = leafEnabled; }
    finally { _mu.ExitWriteLock(); }

    lock (_grMu)
    {
        _grRunning = true;
    }

    // PID file.
    if (!string.IsNullOrEmpty(opts.PidFile) && LogPid() is { } pidErr)
    {
        Fatalf("Could not write pidfile: {0}", pidErr);
        return;
    }

    // System account: the configured one, or a default unless disabled.
    if (!string.IsNullOrEmpty(opts.SystemAccount))
    {
        if (SetSystemAccount(opts.SystemAccount) is { } systemAccountErr)
        {
            Fatalf("Can't set system account: {0}", systemAccountErr);
            return;
        }
    }
    else if (!opts.NoSystemAccount)
    {
        SetDefaultSystemAccount();
    }

    // Signal startup complete.
    _startupComplete.TrySetResult();

    Noticef("Server is ready");
}
|
||
|
||
// =========================================================================
|
||
// Account resolver (feature 3002)
|
||
// =========================================================================
|
||
|
||
/// <summary>
/// Installs the account resolver from the options and stores every configured
/// preload JWT in it.
/// Mirrors Go <c>Server.configureResolver</c>.
/// Each store call is waited on synchronously; this method does not itself
/// release or reacquire the server lock.
/// </summary>
/// <returns>
/// <c>null</c> on success, or an error when preloads are configured for a
/// read-only resolver.
/// </returns>
public Exception? ConfigureResolver()
{
    var opts = GetOpts();
    var resolver = opts.AccountResolver;
    _accResolver = resolver;

    // Nothing more to do without a resolver or without preloads.
    if (resolver is null || opts.ResolverPreloads.Count == 0)
        return null;

    if (resolver.IsReadOnly())
    {
        return new InvalidOperationException(
            "resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR");
    }

    foreach (var (account, jwt) in opts.ResolverPreloads)
    {
        // JWT format validation (jwt.DecodeAccountClaims) is a stub here —
        // session 06 has the decoder; the intent is to check again in
        // CheckResolvePreloads.
        resolver.StoreAsync(account, jwt).GetAwaiter().GetResult();
    }

    return null;
}
|
||
|
||
/// <summary>
/// Walks the configured resolver preloads and emits a debug line per account.
/// Actual JWT validation is deferred to the session 06 JWT integration.
/// Mirrors Go <c>Server.checkResolvePreloads</c>.
/// </summary>
public void CheckResolvePreloads()
{
    var preloads = GetOpts().ResolverPreloads;

    foreach (var (account, _) in preloads)
    {
        Debugf("Checking preloaded account [{0}]", account);
    }
}
|
||
|
||
/// <summary>
/// Returns the account resolver currently installed on the server, or
/// <c>null</c> when none is set.
/// </summary>
public IAccountResolver? AccountResolver()
{
    return _accResolver;
}
|
||
}
|