1407 lines
49 KiB
C#
1407 lines
49 KiB
C#
// Copyright 2012-2026 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
// Adapted from server/client.go in the NATS server Go source.
|
|
|
|
using System.Net;
|
|
using System.Net.Security;
|
|
using System.Net.Sockets;
|
|
using System.Runtime.CompilerServices;
|
|
using System.Security.Cryptography.X509Certificates;
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using Microsoft.Extensions.Logging;
|
|
using ZB.MOM.NatsNet.Server.Auth;
|
|
using ZB.MOM.NatsNet.Server.Internal;
|
|
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
|
|
using ZB.MOM.NatsNet.Server.Protocol;
|
|
|
|
namespace ZB.MOM.NatsNet.Server;
|
|
|
|
// Protocol literals and tuning values used by ClientConnection.
// The same values also live in ServerConstants; this file-local copy keeps references short.
file static class Wires
{
    // ---- Protocol lines ----------------------------------------------------
    internal const string PingProto = "PING\r\n";
    internal const string PongProto = "PONG\r\n";
    internal const string ErrProto = "-ERR '{0}'\r\n"; // composite format: {0} is the error text
    internal const string OkProto = "+OK\r\n";
    internal const string MsgHead = "RMSG ";
    internal const int MsgScratch = 1024;

    // ---- Buffer sizing -----------------------------------------------------
    internal const int StartBufSize = 512;
    internal const int MinBufSize = 64;
    internal const int MaxBufSize = 64 * 1024;
    internal const int ShortsToShrink = 2;
    internal const int MaxFlushPending = 10;
    internal const int MaxVectorSize = 1024; // IOV_MAX

    // ---- Timing ------------------------------------------------------------
    internal static readonly TimeSpan ReadLoopReport = TimeSpan.FromSeconds(2);
    internal static readonly TimeSpan MaxNoRttPingBeforePong = TimeSpan.FromSeconds(2);
    internal static readonly TimeSpan StallMin = TimeSpan.FromMilliseconds(2);
    internal static readonly TimeSpan StallMax = TimeSpan.FromMilliseconds(5);
    internal static readonly TimeSpan StallTotal = TimeSpan.FromMilliseconds(10);

    // ---- Cache and pruning limits ------------------------------------------
    internal const int MaxResultCacheSize = 512;
    internal const int MaxDenyPermCacheSize = 256;
    internal const int MaxPermCacheSize = 128;
    internal const int PruneSize = 32;
    internal const int RouteTargetInit = 8;
    internal const int ReplyPermLimit = 4 * 1024;
    internal static readonly TimeSpan ReplyPruneTime = TimeSpan.FromSeconds(1);

    // ---- Per-account cache defaults ----------------------------------------
    internal const int MaxPerAccountCacheSize = 8 * 1024;
    internal static readonly TimeSpan ClosedSubsCheckInterval = TimeSpan.FromMinutes(5);

    // ---- TLS handshake client type tags ------------------------------------
    internal const string TlsHandshakeLeaf = "leafnode";
    internal const string TlsHandshakeMqtt = "mqtt";

    // Connection-type group name used in deny-list checks.
    internal const string SysGroup = "_sys_";

    // Header status lines, ASCII-encoded once at type initialisation.
    // The arrays are shared instances, so callers must treat them as read-only.
    internal static readonly byte[] HdrLineBytes = Encoding.ASCII.GetBytes(NatsHeaderConstants.HdrLine);
    internal static readonly byte[] EmptyHdrLineBytes = Encoding.ASCII.GetBytes(NatsHeaderConstants.EmptyHdrLine);
}
|
|
|
|
/// <summary>
|
|
/// Represents an individual client connection to the NATS server.
|
|
/// Mirrors Go <c>client</c> struct and all its methods from server/client.go.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// This is the central networking class — every connected client (NATS, MQTT, WebSocket,
|
|
/// route, gateway, leaf node, or internal) has one instance.
|
|
/// </remarks>
|
|
public sealed partial class ClientConnection
|
|
{
|
|
// =========================================================================
|
|
// Fields — mirrors Go client struct
|
|
// =========================================================================
|
|
|
|
private readonly Lock _mu = new(); // mirrors c.mu sync.Mutex
|
|
|
|
// Connection kind and server references.
|
|
internal ClientKind Kind; // mirrors c.kind
|
|
internal INatsServer? Server; // mirrors c.srv
|
|
internal INatsAccount? Account; // mirrors c.acc
|
|
internal ClientPermissions? Perms; // mirrors c.perms
|
|
internal MsgDeny? MPerms; // mirrors c.mperms
|
|
|
|
// Connection identity.
|
|
internal ulong Cid; // mirrors c.cid
|
|
internal byte[]? Nonce; // mirrors c.nonce
|
|
internal string PubKey = string.Empty; // mirrors c.pubKey
|
|
internal string Host = string.Empty; // mirrors c.host
|
|
internal ushort Port; // mirrors c.port
|
|
internal string NameTag = string.Empty; // mirrors c.nameTag
|
|
internal string ProxyKey = string.Empty; // mirrors c.proxyKey
|
|
|
|
// Client options (from CONNECT message).
|
|
internal ClientOptions Opts = ClientOptions.Default;
|
|
|
|
// Flags and state.
|
|
internal ClientFlags Flags; // mirrors c.flags clientFlag
|
|
internal bool Trace; // mirrors c.trace
|
|
internal bool Echo = true; // mirrors c.echo
|
|
internal bool NoIcb; // mirrors c.noIcb
|
|
internal bool InProc; // mirrors c.iproc (in-process connection)
|
|
internal bool Headers; // mirrors c.headers
|
|
|
|
// Limits (int32 allows atomic access).
|
|
private int _mpay; // mirrors c.mpay — max payload (signed, jwt.NoLimit = -1)
|
|
private int _msubs; // mirrors c.msubs — max subscriptions
|
|
private int _mcl; // mirrors c.mcl — max control line
|
|
|
|
// Subscriptions.
|
|
internal Dictionary<string, Subscription> Subs = new(StringComparer.Ordinal);
|
|
internal Dictionary<string, RespEntry>? Replies;
|
|
internal Dictionary<ClientConnection, bool>? Pcd; // pending clients with data to flush
|
|
internal Dictionary<string, bool>? DArray; // denied subscribe patterns
|
|
|
|
// Outbound state (simplified — full write loop ported when Server is available).
|
|
internal long OutPb; // pending bytes
|
|
internal long OutMp; // max pending snapshot
|
|
internal TimeSpan OutWdl; // write deadline snapshot
|
|
|
|
// Timing.
|
|
internal DateTime Start;
|
|
internal DateTime Last;
|
|
internal DateTime LastIn;
|
|
internal DateTime Expires;
|
|
internal TimeSpan Rtt;
|
|
internal DateTime RttStart;
|
|
internal DateTime LastReplyPrune;
|
|
internal ushort RepliesSincePrune;
|
|
|
|
// Scratch buffer for processMsg calls.
|
|
// Initialised with "RMSG " bytes.
|
|
internal byte[] Msgb = new byte[Wires.MsgScratch];
|
|
|
|
// Auth error override.
|
|
internal Exception? AuthErr;
|
|
|
|
// Network connection (null for in-process).
|
|
private Stream? _nc;
|
|
private string _ncs = string.Empty; // cached string representation (mirrors c.ncs atomic.Value)
|
|
|
|
// Parse state (shared with ProtocolParser).
|
|
internal ParseContext ParseCtx = new();
|
|
|
|
// Remote reply tracking.
|
|
private RrTracking? _rrTracking;
|
|
|
|
// Timers.
|
|
private Timer? _atmr; // auth timer
|
|
private Timer? _pingTimer;
|
|
private Timer? _tlsTo;
|
|
private Timer? _expTimer;
|
|
|
|
// Ping state.
|
|
private int _pingOut; // outstanding pings
|
|
|
|
// Connection string (cached for logging).
|
|
private string _connStr = string.Empty;
|
|
|
|
// Read cache (per-read-loop state).
|
|
private ReadCacheState _in;
|
|
|
|
// =========================================================================
|
|
// Constructor
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Builds a connection object of the given kind, optionally bound to a server and a transport stream.
/// <see cref="InitClient"/> is expected to be invoked afterwards to finish setting up the instance.
/// </summary>
public ClientConnection(ClientKind kind, INatsServer? server = null, Stream? nc = null)
{
    (Kind, Server, _nc) = (kind, server, nc);

    // Seed the message scratch buffer with the "RMSG " prefix.
    "RMSG "u8.CopyTo(Msgb);
}
|
|
|
|
// =========================================================================
|
|
// String / identity (features 398-400)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Yields the identifier string cached for this connection.
/// Counterpart of Go <c>client.String()</c>.
/// </summary>
public override string ToString()
{
    return _ncs;
}
|
|
|
|
/// <summary>
/// Reads, under the connection lock, the nonce stored on this connection.
/// Counterpart of Go <c>client.GetNonce()</c>.
/// </summary>
public byte[]? GetNonce()
{
    byte[]? snapshot;
    lock (_mu)
    {
        snapshot = Nonce;
    }
    return snapshot;
}
|
|
|
|
/// <summary>
/// Reads, under the connection lock, the name recorded in the client options.
/// Counterpart of Go <c>client.GetName()</c>.
/// </summary>
public string GetName()
{
    string name;
    lock (_mu)
    {
        name = Opts.Name;
    }
    return name;
}
|
|
|
|
/// <summary>
/// Hands back the options object held by this connection (no lock is taken).
/// Counterpart of Go <c>client.GetOpts()</c>.
/// </summary>
public ClientOptions GetOpts()
{
    return Opts;
}
|
|
|
|
// =========================================================================
|
|
// TLS (feature 402)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Returns the transport as an <see cref="SslStream"/> when it is one, otherwise <c>null</c>.
/// Read under the connection lock.
/// Counterpart of Go <c>client.GetTLSConnectionState()</c>.
/// </summary>
public SslStream? GetTlsStream()
{
    lock (_mu)
    {
        return _nc is SslStream tls ? tls : null;
    }
}
|
|
|
|
// =========================================================================
|
|
// Client type classification (features 403-404)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Classifies a CLIENT-kind connection as MQTT, WebSocket or plain NATS.
/// Any other kind is reported as <see cref="ClientConnectionType.NonClient"/>.
/// Counterpart of Go <c>client.clientType()</c>.
/// </summary>
public ClientConnectionType ClientType()
{
    if (Kind == ClientKind.Client)
    {
        // MQTT is tested first; WebSocket is only consulted when the connection is not MQTT.
        if (IsMqtt())
        {
            return ClientConnectionType.Mqtt;
        }

        return IsWebSocket() ? ClientConnectionType.WebSocket : ClientConnectionType.Nats;
    }

    return ClientConnectionType.NonClient;
}
|
|
|
|
// Display names for each extended client type; NonClient maps to the empty string.
private static readonly Dictionary<ClientConnectionType, string> ClientTypeStringMap = new()
{
    [ClientConnectionType.NonClient] = string.Empty,
    [ClientConnectionType.Nats] = "nats",
    [ClientConnectionType.WebSocket] = "websocket",
    [ClientConnectionType.Mqtt] = "mqtt",
};

/// <summary>
/// Looks up the display name for <see cref="ClientType"/>; an unmapped type yields the empty string.
/// </summary>
internal string ClientTypeString()
{
    if (ClientTypeStringMap.TryGetValue(ClientType(), out var label))
    {
        return label;
    }

    return string.Empty;
}
|
|
|
|
// =========================================================================
|
|
// Subscription.close / isClosed (features 405-406)
|
|
// (These are on the Subscription type; see Internal/Subscription.cs)
|
|
// =========================================================================
|
|
|
|
// =========================================================================
|
|
// Trace level (feature 407)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Updates the trace flag based on server logging settings.
/// Mirrors Go <c>client.setTraceLevel()</c>: a SYSTEM-kind client is traced only when
/// system-account tracing is switched on, and every client (SYSTEM included) is traced
/// only while the server's global trace flag is on.
/// </summary>
/// <remarks>
/// Without a server reference tracing is always disabled.
/// </remarks>
internal void SetTraceLevel()
{
    if (Server is not { } srv)
    {
        Trace = false;
        return;
    }

    // System-account tracing is a gate on top of the global flag, not a replacement for it.
    bool allowedForKind = Kind != ClientKind.System || srv.TraceSysAcc;
    Trace = allowedForKind && srv.TraceEnabled;
}
|
|
|
|
// =========================================================================
|
|
// initClient (feature 408)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Initialises connection state after the client object is created: assigns the client id,
/// snapshots server options, resets the subscription and pending-flush maps, sets the trace
/// level and caches the identifier strings used for logging.
/// Must be called with <c>_mu</c> held.
/// Mirrors Go <c>client.initClient()</c>.
/// </summary>
/// <remarks>
/// Host and port are read from the endpoint object when it is an <see cref="IPEndPoint"/>.
/// Splitting the textual form at the first ':' breaks for IPv6 peers such as <c>[::1]:4222</c>.
/// </remarks>
internal void InitClient()
{
    if (Server is { } srv)
    {
        Cid = srv.NextClientId();

        // Snapshot options from server.
        var opts = srv.Options;
        OutWdl = opts.WriteDeadline;
        OutMp = opts.MaxPending;
        _mcl = opts.MaxControlLine > 0 ? opts.MaxControlLine : ServerConstants.MaxControlLineSize;
    }
    else
    {
        _mcl = ServerConstants.MaxControlLineSize;
    }

    Subs = new Dictionary<string, Subscription>(StringComparer.Ordinal);
    Pcd = new Dictionary<ClientConnection, bool>();
    Echo = true;

    SetTraceLevel();

    // Scratch buffer "RMSG " prefix.
    Msgb[0] = (byte)'R'; Msgb[1] = (byte)'M';
    Msgb[2] = (byte)'S'; Msgb[3] = (byte)'G';
    Msgb[4] = (byte)' ';

    // Snapshot connection string, host and port.
    if (_nc is not null && GetRemoteEndPoint() is { } addr)
    {
        var conn = addr.ToString() ?? string.Empty;
        if (conn.Length > 0)
        {
            if (addr is IPEndPoint ip)
            {
                // Works for IPv4 and IPv6 alike; the host carries no surrounding brackets.
                Host = ip.Address.ToString();
                Port = (ushort)ip.Port;
            }
            else
            {
                // Other endpoint kinds: split the textual form at the last ':'.
                int sep = conn.LastIndexOf(':');
                if (sep >= 0)
                {
                    Host = conn[..sep];
                    if (ushort.TryParse(
                            conn.AsSpan(sep + 1),
                            System.Globalization.NumberStyles.None,
                            System.Globalization.CultureInfo.InvariantCulture,
                            out var p))
                    {
                        Port = p;
                    }
                }
            }

            // NOTE(review): the '%' doubling is carried over from the Go code, where the string is
            // embedded in a printf format. Confirm it is still wanted before changing it.
            _connStr = conn.Replace("%", "%%");
        }
    }

    _ncs = Kind switch
    {
        // Classify once; anything that is neither plain NATS nor WebSocket gets the "mid" tag.
        ClientKind.Client => ClientType() switch
        {
            ClientConnectionType.Nats => $"{_connStr} - cid:{Cid}",
            ClientConnectionType.WebSocket => $"{_connStr} - wid:{Cid}",
            _ => $"{_connStr} - mid:{Cid}",
        },
        ClientKind.Router => $"{_connStr} - rid:{Cid}",
        ClientKind.Gateway => $"{_connStr} - gid:{Cid}",
        ClientKind.Leaf => $"{_connStr} - lid:{Cid}",
        ClientKind.System => "SYSTEM",
        ClientKind.JetStream => "JETSTREAM",
        ClientKind.Account => "ACCOUNT",
        _ => _connStr,
    };
}
|
|
|
|
// =========================================================================
|
|
// RemoteAddress (feature 409)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Public accessor for the peer's network address; forwards to <c>RemoteAddr()</c>.
/// Counterpart of Go <c>client.RemoteAddress()</c>.
/// </summary>
public EndPoint? RemoteAddress()
{
    return RemoteAddr();
}
|
|
|
|
// Resolves the peer endpoint when the transport is a plain NetworkStream.
// Any other transport, or any failure while querying the socket, yields null.
private EndPoint? GetRemoteEndPoint()
{
    if (_nc is not NetworkStream stream)
    {
        return null;
    }

    try
    {
        return stream.Socket.RemoteEndPoint;
    }
    catch
    {
        return null;
    }
}
|
|
|
|
// =========================================================================
|
|
// Account registration (features 410-417)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Handles a failed account registration. A connection-limit failure is routed to
/// <see cref="MaxAccountConnExceeded"/>; anything else is logged and answered with a
/// generic registration error.
/// Counterpart of Go <c>client.reportErrRegisterAccount()</c>.
/// </summary>
internal void ReportErrRegisterAccount(INatsAccount acc, Exception err)
{
    switch (err)
    {
        case TooManyAccountConnectionsException:
            MaxAccountConnExceeded();
            break;

        default:
            Errorf("Problem registering with account %q: %s", acc.Name, err.Message);
            SendErr("Failed Account Registration");
            break;
    }
}
|
|
|
|
/// <summary>
/// Reads the connection kind under the connection lock.
/// Counterpart of Go <c>client.Kind()</c>.
/// </summary>
public ClientKind GetKind()
{
    ClientKind kind;
    lock (_mu)
    {
        kind = Kind;
    }
    return kind;
}
|
|
|
|
/// <summary>
/// Moves this client onto <paramref name="acc"/>: leaves the previous account (if any),
/// records the new one, applies limits, enforces the account's connection / leaf-node caps
/// and finally adds the client to the account.
/// Counterpart of Go <c>client.registerWithAccount()</c>.
/// </summary>
/// <exception cref="BadAccountException">The account is null or reports itself invalid.</exception>
/// <exception cref="TooManyAccountConnectionsException">The applicable account cap is reached.</exception>
internal void RegisterWithAccount(INatsAccount acc)
{
    if (acc is null || !acc.IsValid)
    {
        throw new BadAccountException();
    }

    // Leave the previous account; a result of 1 from RemoveClient decrements the active-account count.
    if (Account is { } previous && previous.RemoveClient(this) == 1)
    {
        Server?.DecActiveAccounts();
    }

    lock (_mu)
    {
        Account = acc;
        ApplyAccountLimits();
    }

    // The cap that applies depends on the connection kind; other kinds are not capped here.
    bool capReached = Kind switch
    {
        ClientKind.Client => acc.MaxTotalConnectionsReached(),
        ClientKind.Leaf => acc.MaxTotalLeafNodesReached(),
        _ => false,
    };
    if (capReached)
    {
        throw new TooManyAccountConnectionsException();
    }

    // Join the new account; a result of 0 from AddClient increments the active-account count.
    if (acc.AddClient(this) == 0)
    {
        Server?.IncActiveAccounts();
    }
}
|
|
|
|
/// <summary>
/// Tells whether the number of subscriptions has reached the configured maximum.
/// An unlimited setting (<see cref="JwtNoLimit"/>) never counts as reached.
/// Counterpart of Go <c>client.subsAtLimit()</c>.
/// </summary>
internal bool SubsAtLimit()
{
    if (_msubs == JwtNoLimit)
    {
        return false;
    }

    return Subs.Count >= _msubs;
}

// Sentinel meaning "no limit"; same value as jwt.NoLimit in the Go code (-1).
private const int JwtNoLimit = -1;
|
|
|
|
/// <summary>
/// Tightens <paramref name="value"/> to <paramref name="limit"/> when the limit is stricter.
/// An unlimited <paramref name="limit"/> never changes anything; an unlimited current value is
/// replaced by any concrete limit.
/// The read and the write are each volatile, but the pair is not one atomic operation.
/// Counterpart of Go <c>minLimit</c>.
/// </summary>
/// <returns><c>true</c> when <paramref name="value"/> was overwritten.</returns>
private static bool MinLimit(ref int value, int limit)
{
    if (limit == JwtNoLimit)
    {
        return false;
    }

    int current = Volatile.Read(ref value);
    bool tighten = current == JwtNoLimit || limit < current;
    if (tighten)
    {
        Volatile.Write(ref value, limit);
    }

    return tighten;
}
|
|
|
|
/// <summary>
/// Applies connection limits to this client.
/// Lock is held on entry.
/// Mirrors Go <c>client.applyAccountLimits()</c>.
/// </summary>
/// <remarks>
/// Only CLIENT and LEAF connections with an account are affected. Both limits are first reset
/// to "no limit" and then tightened with the server options (a server value of 0 means
/// unlimited). If the subscription count is already at the limit, the violation is reported and
/// the connection is closed from a background task after a short pause.
/// </remarks>
internal void ApplyAccountLimits()
{
    if (Account is null || (Kind != ClientKind.Client && Kind != ClientKind.Leaf))
    {
        return;
    }

    Volatile.Write(ref _mpay, JwtNoLimit);
    _msubs = JwtNoLimit;

    // Apply server-level limits.
    if (Server is not null)
    {
        var sOpts = Server.Options;
        int mPay = sOpts.MaxPayload == 0 ? JwtNoLimit : sOpts.MaxPayload;
        int mSubs = sOpts.MaxSubs == 0 ? JwtNoLimit : sOpts.MaxSubs;
        MinLimit(ref _mpay, mPay);
        MinLimit(ref _msubs, mSubs);
    }

    if (SubsAtLimit())
    {
        // Fire-and-forget. The pause is awaited rather than blocked on, so no thread-pool
        // thread is parked for its duration.
        _ = Task.Run(async () =>
        {
            MaxSubsExceeded();
            await Task.Delay(20).ConfigureAwait(false);
            CloseConnection(ClosedState.MaxSubscriptionsExceeded);
        });
    }
}
|
|
|
|
// =========================================================================
|
|
// RegisterUser / RegisterNkeyUser (features 416-417)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Binds an authenticated user to this connection: registers with the user's account (when it
/// has one), then, under the lock, installs permissions, records the username and arms the
/// expiration timer if the user carries a connection deadline.
/// A failed account registration is reported and ends the call early.
/// Counterpart of Go <c>client.RegisterUser()</c>.
/// </summary>
public void RegisterUser(User user)
{
    if (user.Account is INatsAccount acc)
    {
        try
        {
            RegisterWithAccount(acc);
        }
        catch (Exception ex)
        {
            ReportErrRegisterAccount(acc, ex);
            return;
        }
    }

    lock (_mu)
    {
        Perms = user.Permissions is { } granted ? BuildPermissions(granted) : null;
        MPerms = null;

        if (user.Username.Length > 0)
        {
            Opts.Username = user.Username;
        }

        if (user.ConnectionDeadline != default)
        {
            var remaining = user.ConnectionDeadline - DateTime.UtcNow;
            SetExpirationTimerUnlocked(remaining);
        }
    }
}
|
|
|
|
/// <summary>
/// Binds an NKey-authenticated user to this connection: registers with the user's account
/// (when it has one) and then installs the user's permissions under the lock.
/// A failed account registration is reported and ends the call early.
/// Counterpart of Go <c>client.RegisterNkeyUser()</c>.
/// </summary>
public void RegisterNkeyUser(NkeyUser user)
{
    if (user.Account is INatsAccount acc)
    {
        try
        {
            RegisterWithAccount(acc);
        }
        catch (Exception ex)
        {
            ReportErrRegisterAccount(acc, ex);
            return;
        }
    }

    lock (_mu)
    {
        Perms = user.Permissions is { } granted ? BuildPermissions(granted) : null;
        MPerms = null;
    }
}
|
|
|
|
// =========================================================================
|
|
// splitSubjectQueue (feature 418)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Splits a "subject [queue]" string into subject and optional queue bytes.
/// Mirrors Go <c>splitSubjectQueue</c>.
/// </summary>
/// <param name="sq">Whitespace-separated subject and optional queue name.</param>
/// <returns>
/// The subject bytes and, when a second token is present, the queue bytes (otherwise <c>null</c>).
/// Both are UTF-8 encoded, so non-ASCII characters survive and match how subjects are decoded
/// elsewhere in this class.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="sq"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException"><paramref name="sq"/> contains no token.</exception>
/// <exception cref="FormatException"><paramref name="sq"/> contains more than two tokens.</exception>
public static (byte[] subject, byte[]? queue) SplitSubjectQueue(string sq)
{
    ArgumentNullException.ThrowIfNull(sq);

    // A null separator list splits on any whitespace; empty entries are dropped, so no Trim is needed.
    var vals = sq.Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries);

    switch (vals.Length)
    {
        case 0:
            throw new ArgumentException($"invalid subject-queue \"{sq}\"");
        case 1:
            return (Encoding.UTF8.GetBytes(vals[0]), null);
        case 2:
            return (Encoding.UTF8.GetBytes(vals[0]), Encoding.UTF8.GetBytes(vals[1]));
        default:
            throw new FormatException($"invalid subject-queue \"{sq}\"");
    }
}
|
|
|
|
// =========================================================================
|
|
// setPermissions / publicPermissions / mergeDenyPermissions (features 419-422)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Translates configured <see cref="Permissions"/> into the lookup structures kept on the
/// connection: publish allow/deny lists, subscribe allow/deny lists and response permissions.
/// </summary>
/// <remarks>
/// Side effects: <c>Replies</c> is replaced when response permissions are present, and
/// <c>DArray</c> is replaced when a subscribe deny list is present.
/// Subjects are UTF-8 encoded. A subscribe entry that cannot be split into subject/queue is
/// logged and skipped. A repeated deny entry is accepted instead of aborting the build.
/// </remarks>
private ClientPermissions BuildPermissions(Permissions perms)
{
    var cp = new ClientPermissions();

    if (perms.Publish is { } publish)
    {
        if (publish.Allow is { Count: > 0 } pubAllow)
        {
            var list = SubscriptionIndex.NewSublistWithCache();
            cp.Pub.Allow = list;
            foreach (var s in pubAllow)
            {
                list.Insert(new Subscription { Subject = Encoding.UTF8.GetBytes(s) });
            }
        }

        if (publish.Deny is { Count: > 0 } pubDeny)
        {
            var list = SubscriptionIndex.NewSublistWithCache();
            cp.Pub.Deny = list;
            foreach (var s in pubDeny)
            {
                list.Insert(new Subscription { Subject = Encoding.UTF8.GetBytes(s) });
            }
        }
    }

    if (perms.Response is not null)
    {
        cp.Resp = perms.Response;
        Replies = new Dictionary<string, RespEntry>(StringComparer.Ordinal);
    }

    if (perms.Subscribe is { } subscribe)
    {
        if (subscribe.Allow is { Count: > 0 } subAllow)
        {
            var list = SubscriptionIndex.NewSublistWithCache();
            cp.Sub.Allow = list;
            foreach (var s in subAllow)
            {
                try
                {
                    var (subj, q) = SplitSubjectQueue(s);
                    list.Insert(new Subscription { Subject = subj, Queue = q });
                }
                catch (Exception ex)
                {
                    // Error() logs the text as a value, so it is never parsed as a template.
                    Error(ex.Message);
                }
            }
        }

        if (subscribe.Deny is { Count: > 0 } subDeny)
        {
            var list = SubscriptionIndex.NewSublistWithCache();
            cp.Sub.Deny = list;

            var denied = new Dictionary<string, bool>(StringComparer.Ordinal);
            DArray = denied;

            foreach (var s in subDeny)
            {
                // Indexer assignment: a duplicated pattern in the configuration must not throw.
                denied[s] = true;
                try
                {
                    var (subj, q) = SplitSubjectQueue(s);
                    list.Insert(new Subscription { Subject = subj, Queue = q });
                }
                catch (Exception ex)
                {
                    Error(ex.Message);
                }
            }
        }
    }

    return cp;
}
|
|
|
|
// =========================================================================
|
|
// setExpiration / loadMsgDenyFilter (features 423-424)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Locking wrapper: takes <c>_mu</c> and delegates to <see cref="SetExpirationTimerUnlocked"/>.
/// </summary>
internal void SetExpirationTimer(TimeSpan d)
{
    lock (_mu) SetExpirationTimerUnlocked(d);
}
|
|
|
|
/// <summary>
/// Replaces the expiration timer so that <see cref="ClaimExpiration"/> runs after
/// <paramref name="d"/>. A non-positive duration claims expiration immediately.
/// The name indicates the caller is expected to hold <c>_mu</c>.
/// </summary>
/// <remarks>
/// <see cref="Timer"/> rejects due times beyond its supported maximum (tens of days) with an
/// <see cref="ArgumentOutOfRangeException"/>. A deadline further away than that is therefore
/// reached in hops: the timer re-arms itself with the remainder until the full duration has
/// elapsed. A hop only re-arms if its timer is still the current one, so a newer call wins.
/// </remarks>
internal void SetExpirationTimerUnlocked(TimeSpan d)
{
    Interlocked.Exchange(ref _expTimer, null)?.Dispose();

    if (d <= TimeSpan.Zero)
    {
        ClaimExpiration();
        return;
    }

    Expires = DateTime.UtcNow + d;

    // Stay well inside the range every Timer overload accepts.
    var maxHop = TimeSpan.FromMilliseconds(int.MaxValue);
    var hop = d > maxHop ? maxHop : d;
    var remainder = d - hop;

    Timer? self = null;
    self = new Timer(_ =>
    {
        if (remainder <= TimeSpan.Zero)
        {
            ClaimExpiration();
            return;
        }

        lock (_mu)
        {
            // Superseded timers must not resurrect an old deadline.
            if (ReferenceEquals(_expTimer, self))
            {
                SetExpirationTimerUnlocked(remainder);
            }
        }
    }, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);

    // Publish the timer before starting it so the callback can recognise itself.
    _expTimer = self;
    self.Change(hop, Timeout.InfiniteTimeSpan);
}
|
|
|
|
// =========================================================================
|
|
// msgParts (feature 470)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Splits a message buffer into header and body parts, using the header size recorded in the
/// parse state.
/// Mirrors Go <c>client.msgParts()</c>.
/// </summary>
/// <param name="buf">Buffer holding the header (if any) followed by the body.</param>
/// <returns>
/// Two arrays that are independent copies of the corresponding regions of <paramref name="buf"/>.
/// When the recorded header size is not positive, or is larger than the buffer, the header is
/// empty and the whole buffer is returned as the body.
/// </returns>
public (byte[] hdr, byte[] msg) MsgParts(byte[] buf)
{
    int hdrLen = ParseCtx.Pa.HeaderSize;

    // No header (or a size that cannot apply to this buffer): everything is body.
    if (hdrLen <= 0 || hdrLen > buf.Length)
    {
        return (Array.Empty<byte>(), buf[..]);
    }

    // Range slicing on an array already produces a copy, so the header is isolated from the body.
    return (buf[..hdrLen], buf[hdrLen..]);
}
|
|
|
|
// =========================================================================
|
|
// kindString (feature 533)
|
|
// =========================================================================
|
|
|
|
// Display names for each connection kind.
private static readonly Dictionary<ClientKind, string> KindStringMap = new()
{
    [ClientKind.Client] = "Client",
    [ClientKind.Router] = "Router",
    [ClientKind.Gateway] = "Gateway",
    [ClientKind.Leaf] = "Leafnode",
    [ClientKind.JetStream] = "JetStream",
    [ClientKind.Account] = "Account",
    [ClientKind.System] = "System",
};

/// <summary>
/// Looks up the display name of this connection's kind; an unmapped kind yields "Unknown Type".
/// Counterpart of Go <c>client.kindString()</c>.
/// </summary>
internal string KindString()
{
    if (KindStringMap.TryGetValue(Kind, out var label))
    {
        return label;
    }

    return "Unknown Type";
}
|
|
|
|
// =========================================================================
|
|
// isClosed (feature 555)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Tells whether the <c>CloseConnection</c> flag is set on this connection.
/// Counterpart of Go <c>client.isClosed()</c>.
/// </summary>
public bool IsClosed()
{
    var closeBit = Flags & ClientFlags.CloseConnection;
    return closeBit != 0;
}
|
|
|
|
// =========================================================================
|
|
// format / formatNoClientInfo / formatClientSuffix (features 556-558)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Log prefix for this client: the cached identifier string.
/// Counterpart of Go <c>client.format()</c>.
/// </summary>
internal string Format()
{
    return string.Concat(_ncs);
}

/// <summary>Cached connection string without the kind/id suffix.</summary>
internal string FormatNoClientInfo()
{
    return _connStr;
}

/// <summary>Suffix of the form " - Kind:id", built from <see cref="KindString"/> and the client id.</summary>
internal string FormatClientSuffix()
{
    return $" - {KindString()}:{Cid}";
}
|
|
|
|
// =========================================================================
|
|
// Logging helpers (features 559-568)
|
|
// =========================================================================
|
|
|
|
/// <summary>Logs a ready-made message at Error level, prefixed with this connection's identifier.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Error(string msg) => Server?.Logger.LogError("[{Client}] {Msg}", _ncs, msg);

/// <summary>Logs at Error level. <paramref name="fmt"/> is a Go-style printf format (see <see cref="LogPrintf"/>).</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Errorf(string fmt, params object?[] args) => LogPrintf(LogLevel.Error, fmt, args);

/// <summary>Logs at Debug level. <paramref name="fmt"/> is a Go-style printf format (see <see cref="LogPrintf"/>).</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Debugf(string fmt, params object?[] args) => LogPrintf(LogLevel.Debug, fmt, args);

/// <summary>Logs at Information level. <paramref name="fmt"/> is a Go-style printf format (see <see cref="LogPrintf"/>).</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Noticef(string fmt, params object?[] args) => LogPrintf(LogLevel.Information, fmt, args);

/// <summary>Logs at Trace level. <paramref name="fmt"/> is a Go-style printf format (see <see cref="LogPrintf"/>).</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Tracef(string fmt, params object?[] args) => LogPrintf(LogLevel.Trace, fmt, args);

/// <summary>Logs at Warning level. <paramref name="fmt"/> is a Go-style printf format (see <see cref="LogPrintf"/>).</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Warnf(string fmt, params object?[] args) => LogPrintf(LogLevel.Warning, fmt, args);

/// <summary>
/// Shared implementation of the <c>…f</c> helpers.
/// </summary>
/// <remarks>
/// Callers in this class pass Go printf formats (<c>%s</c>, <c>%q</c>, <c>%d</c>). The logger
/// expects <c>{Name}</c> message templates, so passing such a format straight through prints
/// the verbs literally and drops the arguments. The format is therefore rendered here and
/// logged as a single value. A format without any '%' that comes with arguments is still
/// passed through as a message template, exactly as before.
/// </remarks>
private void LogPrintf(LogLevel level, string fmt, object?[]? args)
{
    var logger = Server?.Logger;
    if (logger is null)
    {
        return;
    }

    args ??= [];

    if (args.Length > 0 && !fmt.Contains('%'))
    {
        // Template-style call: keep the previous behaviour.
        logger.Log(level, "[{Client}] " + fmt, [_ncs, .. args]);
        return;
    }

    // The rendered text is logged as a value, so braces inside it are never parsed as a template.
    logger.Log(level, "[{Client}] {Msg}", _ncs, RenderGoFormat(fmt, args));
}

/// <summary>
/// Minimal printf renderer: substitutes arguments for verbs in order, writes <c>%%</c> as '%',
/// wraps <c>%q</c> values in double quotes, ignores flags/width/precision, and leaves a verb
/// untouched when no argument remains for it.
/// </summary>
private static string RenderGoFormat(string fmt, object?[] args)
{
    if (!fmt.Contains('%'))
    {
        return fmt;
    }

    const string modifiers = "+-# 0123456789.";
    var sb = new StringBuilder(fmt.Length + (16 * args.Length));
    int next = 0;
    int i = 0;

    while (i < fmt.Length)
    {
        char ch = fmt[i++];
        if (ch != '%')
        {
            sb.Append(ch);
            continue;
        }

        int start = i - 1;
        while (i < fmt.Length && modifiers.Contains(fmt[i]))
        {
            i++;
        }

        if (i >= fmt.Length)
        {
            // Dangling '%' at the end of the format: emit it unchanged.
            sb.Append(fmt, start, fmt.Length - start);
            break;
        }

        char verb = fmt[i++];
        if (verb == '%')
        {
            sb.Append('%');
            continue;
        }

        if (next >= args.Length)
        {
            sb.Append(fmt, start, i - start);
            continue;
        }

        string text = Convert.ToString(args[next++], System.Globalization.CultureInfo.InvariantCulture) ?? string.Empty;
        if (verb == 'q')
        {
            sb.Append('"').Append(text).Append('"');
        }
        else
        {
            sb.Append(text);
        }
    }

    return sb.ToString();
}
|
|
|
|
// =========================================================================
|
|
// Auth-related helpers (features 446-451, 526-531, 570-571)
|
|
// =========================================================================
|
|
|
|
/// <summary>Sends <paramref name="err"/> to the peer, then logs it at Error level.</summary>
internal void SendErrAndErr(string err)
{
    SendErr(err);
    Error(err);
}

/// <summary>Sends <paramref name="msg"/> to the peer, then logs it at Debug level.</summary>
internal void SendErrAndDebug(string msg)
{
    SendErr(msg);
    Debugf(msg);
}
|
|
|
|
/// <summary>Reports an authentication timeout to the peer and the debug log, then closes the connection.</summary>
internal void AuthTimeout()
{
    const string reason = "Authentication Timeout";
    SendErrAndDebug(reason);
    CloseConnection(ClosedState.AuthenticationTimeout);
}
|
|
|
|
/// <summary>Reports expired authorization to the peer and the debug log, then closes the connection.</summary>
internal void AuthExpired()
{
    const string reason = "Authorization Expired";
    SendErrAndDebug(reason);
    CloseConnection(ClosedState.AuthenticationExpired);
}
|
|
|
|
/// <summary>Reports expired account authorization to the peer and the debug log, then closes the connection.</summary>
internal void AccountAuthExpired()
{
    const string reason = "Account authorization expired";
    SendErrAndDebug(reason);
    CloseConnection(ClosedState.AuthenticationExpired);
}
|
|
|
|
/// <summary>Sends and logs the authorization error text, then closes the connection as an authentication violation.</summary>
internal void AuthViolation()
{
    var text = ServerErrors.ErrAuthorization.Message;
    SendErrAndErr(text);
    CloseConnection(ClosedState.AuthenticationViolation);
}
|
|
|
|
/// <summary>Sends and logs the "too many account connections" error, then closes the connection.</summary>
internal void MaxAccountConnExceeded()
{
    var text = ServerErrors.ErrTooManyAccountConnections.Message;
    SendErrAndErr(text);
    CloseConnection(ClosedState.MaxAccountConnectionsExceeded);
}
|
|
|
|
/// <summary>Sends and logs the "too many connections" error, then closes the connection.</summary>
internal void MaxConnExceeded()
{
    var text = ServerErrors.ErrTooManyConnections.Message;
    SendErrAndErr(text);
    CloseConnection(ClosedState.MaxConnectionsExceeded);
}
|
|
|
|
/// <summary>
/// Logs that the subscription maximum was exceeded (with the current maximum) and sends the
/// "too many subscriptions" error to the peer. The connection is not closed here.
/// </summary>
internal void MaxSubsExceeded()
{
    Errorf("Maximum Subscriptions Exceeded (max=%d)", _msubs);

    var text = ServerErrors.ErrTooManySubs.Message;
    SendErr(text);
}
|
|
|
|
/// <summary>
/// Reports a payload that exceeds the allowed maximum and closes the connection.
/// </summary>
/// <param name="sz">Size of the offending payload.</param>
/// <param name="max">Maximum payload size that was in force.</param>
/// <remarks>
/// The peer receives the fixed text "Maximum Payload Violation". The log entry additionally
/// carries both sizes so the violation can be diagnosed.
/// </remarks>
internal void MaxPayloadViolation(int sz, int max)
{
    const string reason = "Maximum Payload Violation";

    SendErr(reason);
    Error($"{reason}: {sz} vs {max}");
    CloseConnection(ClosedState.MaxPayloadExceeded);
}
|
|
|
|
/// <summary>
/// Tells the peer it may not publish to <paramref name="subject"/> and logs the violation
/// together with the authenticated user.
/// </summary>
internal void PubPermissionViolation(string subject)
{
    var wireText = $"Permissions Violation for Publish to \"{subject}\"";
    SendErr(wireText);

    Errorf("Publish Violation - User %q, Subject %q", GetAuthUser(), subject);
}
|
|
|
|
/// <summary>
/// Tells the peer it may not create the given subscription (mentioning the queue when one is
/// set) and logs the violation with the authenticated user, subject and SID.
/// </summary>
internal void SubPermissionViolation(Subscription sub)
{
    string subj = Encoding.UTF8.GetString(sub.Subject);

    string queue = string.Empty;
    if (sub.Queue is { Length: > 0 })
    {
        queue = $" using queue \"{Encoding.UTF8.GetString(sub.Queue)}\"";
    }

    SendErr($"Permissions Violation for Subscription to \"{subj}\"{queue}");

    Errorf(
        "Subscription Violation - User %q, Subject %q, SID %q",
        GetAuthUser(),
        subj,
        sub.Sid is not null ? Encoding.UTF8.GetString(sub.Sid) : string.Empty);
}
|
|
|
|
/// <summary>
/// Tells the peer it may not use <paramref name="reply"/> as a reply subject and logs the
/// violation together with the authenticated user.
/// </summary>
internal void ReplySubjectViolation(string reply)
{
    var wireText = $"Permissions Violation for use of Reply subject \"{reply}\"";
    SendErr(wireText);

    Errorf("Reply Subject Violation - User %q, Reply %q", GetAuthUser(), reply);
}
|
|
|
|
/// <summary>
/// Sends and logs a permissions violation for the subscription's subject.
/// </summary>
internal void MaxTokensViolation(Subscription sub)
{
    var subject = Encoding.UTF8.GetString(sub.Subject);
    SendErrAndErr($"Permissions Violation for Subscription to \"{subject}\"");
}
|
|
|
|
/// <summary>Stores the authentication error override under the connection lock.</summary>
internal void SetAuthError(Exception err)
{
    lock (_mu)
    {
        AuthErr = err;
    }
}

/// <summary>Reads the authentication error override under the connection lock.</summary>
internal Exception? GetAuthError()
{
    lock (_mu)
    {
        return AuthErr;
    }
}
|
|
|
|
// Credential accessors (used by NatsServer.Auth.cs). Each reads one field of Opts under the lock.

/// <summary>Token from the client options.</summary>
internal string GetAuthToken()
{
    lock (_mu)
    {
        return Opts.Token;
    }
}

/// <summary>NKey from the client options.</summary>
internal string GetNkey()
{
    lock (_mu)
    {
        return Opts.Nkey;
    }
}

/// <summary>NKey signature from the client options.</summary>
internal string GetNkeySig()
{
    lock (_mu)
    {
        return Opts.Sig;
    }
}

/// <summary>Username from the client options.</summary>
internal string GetUsername()
{
    lock (_mu)
    {
        return Opts.Username;
    }
}

/// <summary>Password from the client options.</summary>
internal string GetPassword()
{
    lock (_mu)
    {
        return Opts.Password;
    }
}
|
|
|
|
/// <summary>
/// Returns the peer certificate of a TLS transport as an <see cref="X509Certificate2"/>.
/// Yields <c>null</c> when the transport is not an <see cref="SslStream"/> or no remote
/// certificate is available. Read under the connection lock.
/// </summary>
internal X509Certificate2? GetTlsCertificate()
{
    lock (_mu)
    {
        return (_nc as SslStream)?.RemoteCertificate switch
        {
            null => null,
            X509Certificate2 full => full,
            var basic => new X509Certificate2(basic), // wrap the base certificate type
        };
    }
}
|
|
|
|
/// <summary>Assigns the account reference under the connection lock (no registration is performed).</summary>
internal void SetAccount(INatsAccount? acc)
{
    lock (_mu)
    {
        Account = acc;
    }
}

/// <summary>Overload for the concrete account type; forwards it as an <c>INatsAccount</c>.</summary>
internal void SetAccount(Account? acc)
{
    INatsAccount? asInterface = acc as INatsAccount;
    SetAccount(asInterface);
}
|
|
|
|
/// <summary>
/// Installs permissions built from <paramref name="perms"/> under the connection lock.
/// A <c>null</c> argument leaves the current permissions untouched.
/// </summary>
internal void SetPermissions(Auth.Permissions? perms)
{
    lock (_mu)
    {
        if (perms is null)
        {
            return;
        }

        Perms = BuildPermissions(perms);
    }
}
|
|
|
|
// =========================================================================
|
|
// Timer helpers (features 523-531)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// (Re)starts the periodic ping timer using the server's ping interval, or two minutes when
/// no server is attached. A non-positive interval leaves the timer state untouched.
/// Each tick sends a PING unless the connection is already closed.
/// </summary>
internal void SetPingTimer()
{
    TimeSpan interval = Server?.Options.PingInterval ?? TimeSpan.FromMinutes(2);
    if (interval <= TimeSpan.Zero)
    {
        return;
    }

    // Drop any previous timer before installing the new one.
    ClearPingTimer();

    void OnTick(object? _)
    {
        if (!IsClosed())
        {
            SendPing();
        }
    }

    _pingTimer = new Timer(OnTick, null, interval, interval);
}
|
|
|
|
/// <summary>Atomically detaches the ping timer, if any, and disposes it.</summary>
internal void ClearPingTimer()
{
    Interlocked.Exchange(ref _pingTimer, null)?.Dispose();
}
|
|
|
|
/// <summary>Atomically detaches the TLS timeout timer, if any, and disposes it.</summary>
internal void ClearTlsToTimer()
{
    Interlocked.Exchange(ref _tlsTo, null)?.Dispose();
}
|
|
|
|
/// <summary>
/// Starts the authentication timer from the server's <c>AuthTimeout</c> option (in seconds).
/// Nothing happens when there is no server or the configured value is not positive.
/// </summary>
internal void SetAuthTimer()
{
    var seconds = Server?.Options.AuthTimeout ?? 0;
    if (seconds <= 0)
    {
        return;
    }

    var window = TimeSpan.FromSeconds(seconds);
    SetAuthTimer(window);
}
|
|
|
|
/// <summary>Atomically detaches the authentication timer, if any, and disposes it.</summary>
internal void ClearAuthTimer()
{
    Interlocked.Exchange(ref _atmr, null)?.Dispose();
}
|
|
|
|
/// <summary>
/// True when the <c>ExpectConnect</c> flag is set and <c>ConnectReceived</c> is not.
/// </summary>
internal bool AwaitingAuth()
{
    if ((Flags & ClientFlags.ExpectConnect) == 0)
    {
        return false;
    }

    return (Flags & ClientFlags.ConnectReceived) == 0;
}
|
|
|
|
/// <summary>Forwards to <c>AuthExpired()</c>.</summary>
internal void ClaimExpiration() => AuthExpired();
|
|
|
|
// =========================================================================
|
|
// flushSignal / queueOutbound / enqueueProto (features 433, 456-459)
|
|
// =========================================================================
|
|
|
|
/// <summary>Requests a flush by calling <c>FlushClients</c> with a budget of zero.</summary>
internal void FlushSignal() => FlushClients(0);
|
|
|
|
/// <summary>
/// Sends <paramref name="proto"/> via <c>EnqueueProto</c>. No separate flush
/// call is made here at present.
/// </summary>
/// <param name="proto">Protocol bytes to send.</param>
internal void EnqueueProtoAndFlush(ReadOnlySpan<byte> proto) => EnqueueProto(proto);
|
|
|
|
/// <summary>Sends <paramref name="proto"/> via <c>EnqueueProto</c>.</summary>
/// <param name="proto">Protocol bytes to send.</param>
internal void SendProtoNow(ReadOnlySpan<byte> proto) => EnqueueProto(proto);
|
|
|
|
/// <summary>
/// Writes <paramref name="proto"/> straight to the connection stream when one
/// is attached. Any exception raised by the write is swallowed on purpose.
/// </summary>
/// <param name="proto">Protocol bytes to write.</param>
internal void EnqueueProto(ReadOnlySpan<byte> proto)
{
    // TODO: Full write-loop queuing when Server is ported (session 09).
    if (_nc is null)
    {
        return;
    }

    try
    {
        _nc.Write(proto);
    }
    catch
    {
        /* connection errors handled by closeConnection */
    }
}
|
|
|
|
// =========================================================================
|
|
// sendPong / sendPing / sendRTTPing (features 460-463)
|
|
// =========================================================================
|
|
|
|
/// <summary>Sends the ASCII-encoded PONG protocol line.</summary>
internal void SendPong()
{
    var pong = Encoding.ASCII.GetBytes(Wires.PongProto);
    EnqueueProtoAndFlush(pong);
}
|
|
|
|
/// <summary>Acquires <c>_mu</c> and calls <c>SendRttPingLocked</c>.</summary>
internal void SendRttPing()
{
    lock (_mu)
    {
        SendRttPingLocked();
    }
}
|
|
|
|
/// <summary>
/// Stamps <c>RttStart</c> with the current UTC time and then sends a PING.
/// <c>SendRttPing</c> calls this while holding <c>_mu</c>.
/// </summary>
internal void SendRttPingLocked()
{
    var sentAt = DateTime.UtcNow;
    RttStart = sentAt;
    SendPing();
}
|
|
|
|
/// <summary>Increments the outstanding-ping counter and sends the ASCII-encoded PING line.</summary>
internal void SendPing()
{
    _pingOut += 1;
    var ping = Encoding.ASCII.GetBytes(Wires.PingProto);
    EnqueueProtoAndFlush(ping);
}
|
|
|
|
// =========================================================================
|
|
// sendErr / sendOK (features 465-466)
|
|
// =========================================================================
|
|
|
|
/// <summary>Formats <paramref name="err"/> into the -ERR protocol line and sends it.</summary>
/// <param name="err">Error text substituted into <c>Wires.ErrProto</c>.</param>
internal void SendErr(string err)
{
    var line = string.Format(Wires.ErrProto, err);
    EnqueueProtoAndFlush(Encoding.ASCII.GetBytes(line));
}
|
|
|
|
/// <summary>Sends the +OK protocol line, but only when <c>Opts.Verbose</c> is set.</summary>
internal void SendOK()
{
    if (!Opts.Verbose)
    {
        return;
    }

    EnqueueProtoAndFlush(Encoding.ASCII.GetBytes(Wires.OkProto));
}
|
|
|
|
// =========================================================================
|
|
// traceMsg / traceInOp / traceOutOp / traceOp (features 434-439)
|
|
// =========================================================================
|
|
|
|
/// <summary>Traces <paramref name="msg"/> when tracing is enabled (not inbound, not a delivery).</summary>
internal void TraceMsg(byte[] msg)
{
    if (!Trace)
    {
        return;
    }

    TraceMsgInternal(msg, false, false);
}
|
|
/// <summary>Traces <paramref name="msg"/> as a delivery when tracing is enabled.</summary>
internal void TraceMsgDelivery(byte[] msg)
{
    if (!Trace)
    {
        return;
    }

    TraceMsgInternal(msg, false, true);
}
|
|
/// <summary>Traces an operation with the "&lt;" direction marker when tracing is enabled.</summary>
internal void TraceInOp(string op, byte[] arg)
{
    if (!Trace)
    {
        return;
    }

    TraceOp("<", op, arg);
}
|
|
/// <summary>Traces an operation with the "&gt;" direction marker when tracing is enabled.</summary>
internal void TraceOutOp(string op, byte[] arg)
{
    if (!Trace)
    {
        return;
    }

    TraceOp(">", op, arg);
}
|
|
|
|
/// <summary>
/// Emits a trace line made of a direction marker, a message/delivery tag and
/// the UTF-8 decoded payload.
/// </summary>
/// <param name="msg">Raw message bytes, decoded as UTF-8 for the trace.</param>
/// <param name="inbound">Selects "&lt;" when true, "&gt;" otherwise.</param>
/// <param name="delivery">Selects "[DELIVER]" when true, "[MSG]" otherwise.</param>
private void TraceMsgInternal(byte[] msg, bool inbound, bool delivery)
{
    string arrow;
    if (inbound)
    {
        arrow = "<";
    }
    else
    {
        arrow = ">";
    }

    string tag;
    if (delivery)
    {
        tag = "[DELIVER]";
    }
    else
    {
        tag = "[MSG]";
    }

    var payload = Encoding.UTF8.GetString(msg);
    Tracef("{0} {1} {2}", arrow, tag, payload);
}
|
|
/// <summary>
/// Emits a trace line of the form "direction op argument".
/// </summary>
/// <param name="dir">Direction marker supplied by the caller ("&lt;" or "&gt;").</param>
/// <param name="op">Operation name.</param>
/// <param name="arg">Operation argument bytes, decoded as UTF-8; treated as empty when null.</param>
private void TraceOp(string dir, string op, byte[] arg)
{
    var text = arg is not null ? Encoding.UTF8.GetString(arg) : string.Empty;

    // Use .NET composite placeholders like the other Tracef/Debugf/Errorf calls;
    // the Go-style "%s" verbs were never substituted.
    Tracef("{0} {1} {2}", dir, op, text);
}
|
|
|
|
// =========================================================================
|
|
// getAuthUser / getAuthUserLabel (features 550-552)
|
|
// =========================================================================
|
|
|
|
/// <summary>Calls <c>GetRawAuthUser</c> while holding <c>_mu</c>.</summary>
internal string GetRawAuthUserLock()
{
    lock (_mu)
    {
        return GetRawAuthUser();
    }
}
|
|
|
|
/// <summary>
/// Picks an identifier for the authenticated user from the connect options.
/// </summary>
/// <returns>
/// The nkey when non-empty, else the username when non-empty, else the literal
/// "Token" when a token is present, else "Unknown".
/// </returns>
internal string GetRawAuthUser()
{
    return Opts.Nkey.Length > 0 ? Opts.Nkey
        : Opts.Username.Length > 0 ? Opts.Username
        : Opts.Token.Length > 0 ? "Token"
        : "Unknown";
}
|
|
|
|
/// <summary>Returns the value of <c>GetRawAuthUser()</c>.</summary>
internal string GetAuthUser()
{
    return GetRawAuthUser();
}
|
|
|
|
/// <summary>
/// Returns the raw auth user, substituting "Unknown User" when that value is empty.
/// </summary>
internal string GetAuthUserLabel()
{
    var rawUser = GetRawAuthUser();
    if (rawUser.Length > 0)
    {
        return rawUser;
    }

    return "Unknown User";
}
|
|
|
|
// =========================================================================
|
|
// connectionTypeAllowed (feature 554)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Placeholder check: currently allows every connection type and ignores
/// <paramref name="ct"/>.
/// </summary>
/// <param name="ct">Connection type name (unused for now).</param>
// TODO: Full implementation when JWT is integrated.
internal bool ConnectionTypeAllowed(string ct) => true;
|
|
|
|
// =========================================================================
|
|
// closeConnection (feature 536)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Marks the client as closing, stops its auth and ping timers, and closes
/// the underlying network stream. Calls made once the client already reports
/// closed return without doing anything.
/// Mirrors Go <c>client.closeConnection()</c>.
/// </summary>
/// <param name="reason">Why the connection is being closed (not used by this implementation yet).</param>
public void CloseConnection(ClosedState reason)
{
    bool alreadyClosed;
    lock (_mu)
    {
        alreadyClosed = IsClosed();
        if (!alreadyClosed)
        {
            Flags |= ClientFlags.CloseConnection;
            ClearAuthTimer();
            ClearPingTimer();
        }
    }

    if (alreadyClosed)
    {
        return;
    }

    // Close the underlying network connection; failures here are ignored.
    try
    {
        _nc?.Close();
    }
    catch
    {
        /* ignore */
    }

    _nc = null;
}
|
|
|
|
// =========================================================================
|
|
// flushAndClose (feature 532)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Closes the connection with <c>ClosedState.ClientClosed</c>.
/// </summary>
/// <param name="deadlineExceeded">Currently ignored.</param>
internal void FlushAndClose(bool deadlineExceeded)
    => CloseConnection(ClosedState.ClientClosed);
|
|
|
|
// =========================================================================
|
|
// setNoReconnect (feature 538)
|
|
// =========================================================================
|
|
|
|
/// <summary>Sets the <c>NoReconnect</c> flag while holding <c>_mu</c>.</summary>
internal void SetNoReconnect()
{
    lock (_mu)
    {
        Flags = Flags | ClientFlags.NoReconnect;
    }
}
|
|
|
|
// =========================================================================
|
|
// getRTTValue (feature 539)
|
|
// =========================================================================
|
|
|
|
/// <summary>Returns the stored <c>Rtt</c> value, read while holding <c>_mu</c>.</summary>
internal TimeSpan GetRttValue()
{
    lock (_mu)
    {
        var current = Rtt;
        return current;
    }
}
|
|
|
|
// =========================================================================
|
|
// Account / server helpers (features 540-545)
|
|
// =========================================================================
|
|
|
|
/// <summary>Returns the current <c>Account</c>, read while holding <c>_mu</c>.</summary>
internal INatsAccount? GetAccount()
{
    lock (_mu)
    {
        var current = Account;
        return current;
    }
}
|
|
|
|
// =========================================================================
|
|
// TLS handshake helpers (features 546-548)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Placeholder for the server-side TLS handshake. No handshake is performed yet.
/// </summary>
/// <param name="opts">Server authentication options (currently unused).</param>
/// <param name="ct">Cancellation token (currently unused).</param>
/// <returns>A completed task whose result is always <c>false</c>.</returns>
internal Task<bool> DoTlsServerHandshakeAsync(SslServerAuthenticationOptions opts, CancellationToken ct = default)
{
    // TODO: Full TLS when Server is ported.
    // Nothing is awaited, so hand back a completed task rather than declaring
    // the method async (avoids CS1998 and a needless state machine).
    return Task.FromResult(false);
}
|
|
|
|
/// <summary>
/// Placeholder for the client-side TLS handshake. No handshake is performed yet.
/// </summary>
/// <param name="opts">Client authentication options (currently unused).</param>
/// <param name="ct">Cancellation token (currently unused).</param>
/// <returns>A completed task whose result is always <c>false</c>.</returns>
internal Task<bool> DoTlsClientHandshakeAsync(SslClientAuthenticationOptions opts, CancellationToken ct = default)
{
    // TODO: Full TLS when Server is ported.
    // Nothing is awaited, so hand back a completed task rather than declaring
    // the method async (avoids CS1998 and a needless state machine).
    return Task.FromResult(false);
}
|
|
|
|
// =========================================================================
|
|
// Stub methods for server-dependent features
|
|
// (Fully implemented when Server/Account sessions are complete)
|
|
// =========================================================================
|
|
|
|
// features 425-427: writeLoop / flushClients / readLoop
|
|
/// <summary>Calls <c>FlushClients</c> with the maximum budget.</summary>
internal void WriteLoop()
{
    FlushClients(long.MaxValue);
}
|
|
/// <summary>
/// Flushes the connection stream when one is attached; exceptions from the
/// flush are swallowed.
/// </summary>
/// <param name="budget">Currently ignored.</param>
internal void FlushClients(long budget)
{
    try
    {
        var conn = _nc;
        if (conn is not null)
        {
            conn.Flush();
        }
    }
    catch
    {
        /* no-op for now */
    }
}
|
|
/// <summary>
/// Records the current UTC time in <c>LastIn</c> and, when
/// <paramref name="pre"/> is non-empty, traces it as a "PRE" inbound operation.
/// </summary>
/// <param name="pre">Optional bytes handed to the loop up front; may be <c>null</c> or empty.</param>
internal void ReadLoop(byte[]? pre)
{
    LastIn = DateTime.UtcNow;

    if (pre is null || pre.Length <= 0)
    {
        return;
    }

    TraceInOp("PRE", pre);
}
|
|
|
|
// =========================================================================
|
|
// Parser compatibility wrappers (features 2588, 2590, 2591)
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Feeds <paramref name="buf"/> to <c>ProtocolParser.Parse</c> after syncing the
/// connection's kind, header support and (when configured) limits into
/// <c>ParseCtx</c>.
/// Mirrors Go <c>client.parse</c>.
/// </summary>
/// <param name="buf">Protocol bytes to parse; must not be <c>null</c>.</param>
/// <param name="handler">Handler passed through to the parser; must not be <c>null</c>.</param>
/// <returns>Whatever <c>ProtocolParser.Parse</c> returns.</returns>
/// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
internal Exception? Parse(byte[] buf, IProtocolHandler handler)
{
    ArgumentNullException.ThrowIfNull(buf);
    ArgumentNullException.ThrowIfNull(handler);

    ParseCtx.Kind = Kind;
    ParseCtx.HasHeaders = Headers;

    // Only override the context limits when this connection has its own values.
    if (_mcl > 0)
    {
        ParseCtx.MaxControlLine = _mcl;
    }

    if (_mpay != 0)
    {
        ParseCtx.MaxPayload = _mpay;
    }

    var outcome = ProtocolParser.Parse(ParseCtx, handler, buf);
    return outcome;
}
|
|
|
|
/// <summary>
/// Delegates the max-control-line check to
/// <c>ProtocolParser.OverMaxControlLineLimit</c> after syncing the connection
/// kind into <c>ParseCtx</c>.
/// Mirrors Go <c>client.overMaxControlLineLimit</c>.
/// </summary>
/// <param name="arg">Argument bytes being checked; must not be <c>null</c>.</param>
/// <param name="mcl">Max control line value passed through to the parser.</param>
/// <param name="handler">Handler passed through to the parser; must not be <c>null</c>.</param>
/// <returns>Whatever the parser-level check returns.</returns>
/// <exception cref="ArgumentNullException"><paramref name="arg"/> or <paramref name="handler"/> is <c>null</c>.</exception>
internal Exception? OverMaxControlLineLimit(byte[] arg, int mcl, IProtocolHandler handler)
{
    ArgumentNullException.ThrowIfNull(arg);
    ArgumentNullException.ThrowIfNull(handler);

    ParseCtx.Kind = Kind;

    var outcome = ProtocolParser.OverMaxControlLineLimit(ParseCtx, handler, arg, mcl);
    return outcome;
}
|
|
|
|
/// <summary>
/// Delegates to <c>ProtocolParser.ClonePubArg</c> after syncing the connection's
/// kind, header support and (when configured) max payload into <c>ParseCtx</c>.
/// Used for split-buffer message payload handling.
/// Mirrors Go <c>client.clonePubArg</c>.
/// </summary>
/// <param name="handler">Handler passed through to the parser; must not be <c>null</c>.</param>
/// <param name="lmsg">Flag passed through to the parser unchanged.</param>
/// <returns>Whatever <c>ProtocolParser.ClonePubArg</c> returns.</returns>
/// <exception cref="ArgumentNullException"><paramref name="handler"/> is <c>null</c>.</exception>
internal Exception? ClonePubArg(IProtocolHandler handler, bool lmsg)
{
    ArgumentNullException.ThrowIfNull(handler);

    ParseCtx.Kind = Kind;
    ParseCtx.HasHeaders = Headers;

    if (_mpay != 0)
    {
        ParseCtx.MaxPayload = _mpay;
    }

    var outcome = ProtocolParser.ClonePubArg(ParseCtx, handler, lmsg);
    return outcome;
}
|
|
|
|
/// <summary>
/// Produces the INFO JSON bytes sent to the client on connect.
/// Stub — full implementation in session 09; for now it always returns an
/// empty buffer and ignores both arguments.
/// Mirrors Go <c>client.generateClientInfoJSON()</c>.
/// </summary>
/// <param name="info">Server info (currently unused).</param>
/// <param name="includeClientIp">Currently unused.</param>
internal ReadOnlyMemory<byte> GenerateClientInfoJSON(ServerInfo info, bool includeClientIp)
{
    return ReadOnlyMemory<byte>.Empty;
}
|
|
|
|
/// <summary>
/// Replaces the auth-timeout timer. Any existing timer is detached and
/// disposed first; a new one-shot timer that calls <c>AuthTimeout()</c> is
/// installed only when <paramref name="d"/> is positive.
/// Mirrors Go <c>client.setAuthTimer(d)</c>.
/// </summary>
/// <param name="d">Delay before the timer fires; non-positive values only clear the timer.</param>
internal void SetAuthTimer(TimeSpan d)
{
    Interlocked.Exchange(ref _atmr, null)?.Dispose();

    if (d > TimeSpan.Zero)
    {
        // One-shot: the infinite period disables periodic signalling.
        _atmr = new Timer(_ => AuthTimeout(), null, d, Timeout.InfiniteTimeSpan);
    }
}
|
|
|
|
// features 428-432: closedStateForErr, collapsePtoNB, flushOutbound, handleWriteTimeout, markConnAsClosed
|
|
/// <summary>
/// Maps an exception to a closed state: <c>EndOfStreamException</c> becomes
/// <c>ClientClosed</c>, anything else becomes <c>ReadError</c>.
/// </summary>
/// <param name="err">The exception to classify.</param>
internal static ClosedState ClosedStateForErr(Exception err)
{
    if (err is EndOfStreamException)
    {
        return ClosedState.ClientClosed;
    }

    return ClosedState.ReadError;
}
|
|
|
|
// features 440-441: processInfo, processErr
|
|
/// <summary>Logs <paramref name="info"/> at debug level unless it is null, empty or whitespace.</summary>
/// <param name="info">INFO payload text.</param>
internal void ProcessInfo(string info)
{
    if (!string.IsNullOrWhiteSpace(info))
    {
        Debugf("INFO {0}", info);
    }
}
|
|
/// <summary>
/// Records <paramref name="err"/> as the auth error (wrapped in an
/// <see cref="InvalidOperationException"/>) and logs it at error level.
/// Null, empty or whitespace input is ignored.
/// </summary>
/// <param name="err">Error text.</param>
internal void ProcessErr(string err)
{
    if (!string.IsNullOrWhiteSpace(err))
    {
        var wrapped = new InvalidOperationException(err);
        SetAuthError(wrapped);
        Errorf("-ERR {0}", err);
    }
}
|
|
|
|
// features 442-443: removeSecretsFromTrace, redact
|
|
// Delegates to ServerLogging.RemoveSecretsFromTrace (the real implementation lives there).
|
|
/// <summary>Forwards to <c>ServerLogging.RemoveSecretsFromTrace</c>.</summary>
/// <param name="s">Trace text to sanitize.</param>
internal static string RemoveSecretsFromTrace(string s)
{
    return ServerLogging.RemoveSecretsFromTrace(s);
}
|
|
/// <summary>Returns <paramref name="s"/> unchanged; no redaction is applied here.</summary>
/// <param name="s">Input text.</param>
internal static string Redact(string s)
{
    return s;
}
|
|
|
|
// feature 444: computeRTT
|
|
/// <summary>
/// Computes the round-trip time elapsed since <paramref name="start"/>.
/// </summary>
/// <param name="start">UTC timestamp taken when the ping was sent.</param>
/// <returns>
/// The elapsed time, never zero or negative: if the wall clock stepped backwards
/// (or <paramref name="start"/> lies in the future) the result is clamped to one
/// tick, matching the Go implementation's minimum of one nanosecond.
/// </returns>
internal static TimeSpan ComputeRtt(DateTime start)
{
    var elapsed = DateTime.UtcNow - start;

    // One tick (100 ns) is the smallest positive TimeSpan.
    return elapsed > TimeSpan.Zero ? elapsed : TimeSpan.FromTicks(1);
}
|
|
|
|
// feature 445: processConnect
|
|
/// <summary>
/// Deserializes the CONNECT JSON payload into <c>ClientOptions</c> and, on
/// success, stores it along with the echo/headers settings and the
/// <c>ConnectReceived</c> flag (all under <c>_mu</c>). Any exception is recorded
/// via <c>SetAuthError</c> and logged instead of being rethrown.
/// </summary>
/// <param name="arg">CONNECT payload bytes; null or empty input is ignored.</param>
internal void ProcessConnect(byte[] arg)
{
    if (arg == null || arg.Length == 0)
    {
        return;
    }

    try
    {
        var incoming = JsonSerializer.Deserialize<ClientOptions>(arg);
        if (incoming == null)
        {
            // Nothing usable was produced (e.g. a JSON null literal); nothing to apply.
            return;
        }

        lock (_mu)
        {
            Opts = incoming;
            Echo = incoming.Echo;
            Headers = incoming.Headers;
            Flags |= ClientFlags.ConnectReceived;
        }
    }
    catch (Exception ex)
    {
        SetAuthError(ex);
        Errorf("CONNECT parse failed: {0}", ex.Message);
    }
}
|
|
|
|
// features 467-468: processPing, processPong
|
|
/// <summary>Handles an incoming PING: resets the outstanding-ping counter, then replies with PONG.</summary>
internal void ProcessPing()
{
    const int noneOutstanding = 0;
    _pingOut = noneOutstanding;
    SendPong();
}
|
|
|
|
/// <summary>
/// Handles an incoming PONG: stores the round-trip time measured from
/// <c>RttStart</c> and resets the outstanding-ping counter.
/// </summary>
internal void ProcessPong()
{
    var measured = ComputeRtt(RttStart);
    Rtt = measured;
    _pingOut = 0;
}
|
|
|
|
// feature 469: updateS2AutoCompressionLevel
|
|
/// <summary>
/// Placeholder for adaptive compression tuning. The only effect today is
/// resetting a negative outstanding-ping counter to zero.
/// </summary>
internal void UpdateS2AutoCompressionLevel()
{
    // Placeholder for adaptive compression tuning; keep no-op semantics for now.
    if (_pingOut >= 0)
    {
        return;
    }

    _pingOut = 0;
}
|
|
|
|
// features 471-486: processPub variants, parseSub, processSub, etc.
|
|
// Implemented in full when Server+Account sessions complete.
|
|
|
|
// features 487-503: deliverMsg, addToPCD, trackRemoteReply, pruning, pubAllowed, etc.
|
|
|
|
// features 512-514: processServiceImport, addSubToRouteTargets, processMsgResults
|
|
|
|
// feature 515: checkLeafClientInfoHeader
|
|
// features 520-522: processPingTimer, adjustPingInterval, watchForStaleConnection
|
|
// features 534-535: swapAccountAfterReload, processSubsOnConfigReload
|
|
// feature 537: reconnect
|
|
// feature 569: setFirstPingTimer
|
|
|
|
// =========================================================================
|
|
// IsMqtt / IsWebSocket helpers (used by clientType, not separately tracked)
|
|
// =========================================================================
|
|
|
|
/// <summary>Always <c>false</c> for now.</summary>
// TODO: set in session 22 (MQTT)
internal bool IsMqtt()
{
    return false;
}
|
|
/// <summary>Always <c>false</c> for now.</summary>
// TODO: set in session 23 (WebSocket)
internal bool IsWebSocket()
{
    return false;
}
|
|
/// <summary>Always <c>false</c> for now.</summary>
// TODO: set in session 15 (leaf nodes)
internal bool IsHubLeafNode()
{
    return false;
}
|
|
/// <summary>Always returns the empty string for now.</summary>
// TODO: session 14/15
internal string RemoteCluster()
{
    return string.Empty;
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Private read-cache state (per-readLoop invocation)
|
|
// ============================================================================
|
|
|
|
/// <summary>
/// Mutable read-cache state kept per read-loop invocation.
/// </summary>
internal struct ReadCacheState
{
    // NOTE(review): presumably a generation id used to invalidate the cached
    // results below — confirm against the read loop once it is ported.
    public ulong GenId;

    // Cached lookups, keyed by string.
    public Dictionary<string, SubscriptionIndexResult>? Results;
    public Dictionary<string, PerAccountCache>? PaCache;

    public List<RouteTarget>? Rts;

    // Counters.
    public int Msgs;
    public int Bytes;
    public int Subs;

    /// <summary>Read buffer size.</summary>
    public int Rsz;

    /// <summary>Short reads.</summary>
    public int Srs;

    public ReadCacheFlags Flags;

    public DateTime Start;

    /// <summary>Total stall time.</summary>
    public TimeSpan Tst;
}
|
|
|
|
/// <summary>
/// Cache entry pairing an account with a subscription-index result and a
/// generation id.
/// </summary>
internal sealed class PerAccountCache
{
    /// <summary>The account this entry belongs to, if set.</summary>
    public INatsAccount? Acc { get; set; }

    /// <summary>The cached subscription-index result, if set.</summary>
    public SubscriptionIndexResult? Results { get; set; }

    // NOTE(review): presumably the generation the cached result was taken at — confirm.
    public ulong GenId { get; set; }
}
|
|
|
|
/// <summary>
/// Holder for a string-keyed map, a timer and a duration.
/// NOTE(review): presumably remote-reply tracking state — confirm against the Go source.
/// </summary>
internal sealed class RrTracking
{
    /// <summary>String-keyed map; <c>null</c> until assigned.</summary>
    public Dictionary<string, object>? RMap { get; set; }

    /// <summary>Associated timer; <c>null</c> until assigned.</summary>
    public Timer? Ptmr { get; set; }

    /// <summary>Associated duration; defaults to <see cref="TimeSpan.Zero"/>.</summary>
    public TimeSpan Lrt { get; set; }
}
|
|
|
|
// ============================================================================
|
|
// Server / account interfaces (stubs until sessions 09 and 11)
|
|
// ============================================================================
|
|
|
|
/// <summary>
/// The small slice of server functionality that ClientConnection depends on.
/// The complete server arrives in session 09 (server.go).
/// </summary>
public interface INatsServer
{
    /// <summary>Returns a client id as an unsigned 64-bit value.</summary>
    ulong NextClientId();

    /// <summary>The server's options.</summary>
    ServerOptions Options { get; }

    /// <summary>Trace switch exposed by the server.</summary>
    bool TraceEnabled { get; }

    /// <summary>Second trace switch exposed by the server.</summary>
    // NOTE(review): presumably controls tracing of system-account traffic — confirm.
    bool TraceSysAcc { get; }

    /// <summary>The server's logger.</summary>
    ILogger Logger { get; }

    void DecActiveAccounts();

    void IncActiveAccounts();
}
|
|
|
|
/// <summary>
/// The small slice of account functionality that ClientConnection depends on.
/// The complete account arrives in session 11 (accounts.go).
/// </summary>
public interface INatsAccount
{
    /// <summary>The account name.</summary>
    string Name { get; }

    /// <summary>Validity indicator for the account.</summary>
    bool IsValid { get; }

    bool MaxTotalConnectionsReached();

    bool MaxTotalLeafNodesReached();

    // NOTE(review): the int results of the two methods below are presumably
    // client counts — confirm against the account implementation.
    int AddClient(ClientConnection c);

    int RemoveClient(ClientConnection c);
}
|
|
|
|
/// <summary>
/// Thrown when account connection limits are exceeded.
/// The message is fixed: "Too Many Account Connections".
/// </summary>
public sealed class TooManyAccountConnectionsException : Exception
{
    private const string FixedMessage = "Too Many Account Connections";

    /// <summary>Creates the exception with its fixed message.</summary>
    public TooManyAccountConnectionsException()
        : base(FixedMessage)
    {
    }
}
|
|
|
|
/// <summary>
/// Thrown when an account is invalid or null.
/// The message is fixed: "Bad Account".
/// </summary>
public sealed class BadAccountException : Exception
{
    private const string FixedMessage = "Bad Account";

    /// <summary>Creates the exception with its fixed message.</summary>
    public BadAccountException()
        : base(FixedMessage)
    {
    }
}
|