// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/client.go in the NATS server Go source.
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using ZB.MOM.NatsNet.Server.Auth;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
using ZB.MOM.NatsNet.Server.Protocol;
namespace ZB.MOM.NatsNet.Server;
// Wire protocol constants (also in ServerConstants; kept here for local use).
// Declared with the `file` modifier, so the type is only visible inside this source file.
file static class Wires
{
// Protocol lines; each literal ends with CRLF.
internal const string PingProto = "PING\r\n";
internal const string PongProto = "PONG\r\n";
// "{0}" is a composite-format placeholder — presumably filled with the error text by the sender.
internal const string ErrProto = "-ERR '{0}'\r\n";
internal const string OkProto = "+OK\r\n";
// Prefix written at the start of the per-connection scratch buffer.
internal const string MsgHead = "RMSG ";
// Size in bytes of the per-connection scratch buffer.
internal const int MsgScratch = 1024;
// Buffer size tuning.
internal const int StartBufSize = 512;
internal const int MinBufSize = 64;
internal const int MaxBufSize = 65536;
internal const int ShortsToShrink = 2;
internal const int MaxFlushPending = 10;
internal const int MaxVectorSize = 1024; // IOV_MAX
internal static readonly TimeSpan ReadLoopReport = TimeSpan.FromSeconds(2);
internal static readonly TimeSpan MaxNoRttPingBeforePong = TimeSpan.FromSeconds(2);
internal static readonly TimeSpan StallMin = TimeSpan.FromMilliseconds(2);
internal static readonly TimeSpan StallMax = TimeSpan.FromMilliseconds(5);
internal static readonly TimeSpan StallTotal = TimeSpan.FromMilliseconds(10);
// Cache / pruning limits.
internal const int MaxResultCacheSize = 512;
internal const int MaxDenyPermCacheSize = 256;
internal const int MaxPermCacheSize = 128;
internal const int PruneSize = 32;
internal const int RouteTargetInit = 8;
internal const int ReplyPermLimit = 4096;
internal static readonly TimeSpan ReplyPruneTime = TimeSpan.FromSeconds(1);
// Per-account cache defaults.
internal const int MaxPerAccountCacheSize = 8192;
internal static readonly TimeSpan ClosedSubsCheckInterval = TimeSpan.FromMinutes(5);
// TLS handshake client type tags.
internal const string TlsHandshakeLeaf = "leafnode";
internal const string TlsHandshakeMqtt = "mqtt";
// Allowed-connection-type group used in deny-list checks.
internal const string SysGroup = "_sys_";
// Message header status line bytes, ASCII-encoded once when the type is initialised.
// The arrays themselves are mutable, so callers must treat them as read-only.
internal static readonly byte[] HdrLineBytes = Encoding.ASCII.GetBytes(NatsHeaderConstants.HdrLine);
internal static readonly byte[] EmptyHdrLineBytes = Encoding.ASCII.GetBytes(NatsHeaderConstants.EmptyHdrLine);
}
/// <summary>
/// Represents an individual client connection to the NATS server.
/// Mirrors the Go <c>client</c> struct and its methods from server/client.go.
/// </summary>
/// <remarks>
/// This is the central networking class — every connected client (NATS, MQTT, WebSocket,
/// route, gateway, leaf node, or internal) has one instance.
/// </remarks>
public sealed partial class ClientConnection
{
// =========================================================================
// Fields — mirrors Go client struct
// =========================================================================
// Guards the mutable connection state below; methods in this file take it via `lock (_mu)`.
private readonly Lock _mu = new(); // mirrors c.mu sync.Mutex
// Connection kind and server references.
internal ClientKind Kind; // mirrors c.kind
internal INatsServer? Server; // mirrors c.srv
internal INatsAccount? Account; // mirrors c.acc
internal ClientPermissions? Perms; // mirrors c.perms
internal MsgDeny? MPerms; // mirrors c.mperms
// Connection identity.
internal ulong Cid; // mirrors c.cid
internal byte[]? Nonce; // mirrors c.nonce
internal string PubKey = string.Empty; // mirrors c.pubKey
internal string Host = string.Empty; // mirrors c.host
internal ushort Port; // mirrors c.port
internal string NameTag = string.Empty; // mirrors c.nameTag
internal string ProxyKey = string.Empty; // mirrors c.proxyKey
// Client options (from CONNECT message).
internal ClientOptions Opts = ClientOptions.Default;
// Flags and state.
internal ClientFlags Flags; // mirrors c.flags clientFlag
internal bool Trace; // mirrors c.trace
internal bool Echo = true; // mirrors c.echo
internal bool NoIcb; // mirrors c.noIcb
internal bool InProc; // mirrors c.iproc (in-process connection)
internal bool Headers; // mirrors c.headers
// Limits (int32 allows atomic access). The value -1 (JwtNoLimit) means "no limit".
private int _mpay; // mirrors c.mpay — max payload (signed, jwt.NoLimit = -1)
private int _msubs; // mirrors c.msubs — max subscriptions
private int _mcl; // mirrors c.mcl — max control line
// Subscriptions.
internal Dictionary Subs = new(StringComparer.Ordinal);
internal Dictionary? Replies;
internal Dictionary? Pcd; // pending clients with data to flush
internal Dictionary? DArray; // denied subscribe patterns
// Outbound state (simplified — full write loop ported when Server is available).
internal long OutPb; // pending bytes
internal long OutMp; // max pending snapshot
internal TimeSpan OutWdl; // write deadline snapshot
// Timing. Values assigned in this file come from DateTime.UtcNow.
internal DateTime Start;
internal DateTime Last;
internal DateTime LastIn;
internal DateTime Expires;
internal TimeSpan Rtt;
internal DateTime RttStart;
internal DateTime LastReplyPrune;
internal ushort RepliesSincePrune;
// Scratch buffer for processMsg calls.
// The constructor and InitClient write the "RMSG " prefix into the first five bytes.
internal byte[] Msgb = new byte[Wires.MsgScratch];
// Auth error override.
internal Exception? AuthErr;
// Network connection (null for in-process).
private Stream? _nc;
private string _ncs = string.Empty; // cached string representation (mirrors c.ncs atomic.Value)
// Parse state (shared with ProtocolParser).
internal ParseContext ParseCtx = new();
// Remote reply tracking.
private RrTracking? _rrTracking;
// Timers. Each is swapped out with Interlocked.Exchange and disposed when cleared.
private Timer? _atmr; // auth timer
private Timer? _pingTimer;
private Timer? _tlsTo;
private Timer? _expTimer;
// Ping state.
private int _pingOut; // outstanding pings
// Connection string (cached for logging).
private string _connStr = string.Empty;
// Read cache (per-read-loop state).
private ReadCacheState _in;
// =========================================================================
// Constructor
// =========================================================================
/// <summary>
/// Creates a new client connection of the given kind.
/// Connection state is completed separately by <see cref="InitClient"/>, which callers
/// should invoke after creation.
/// </summary>
/// <param name="kind">Kind of connection being represented.</param>
/// <param name="server">Owning server, or null when there is none.</param>
/// <param name="nc">Underlying network stream, or null for an in-process connection.</param>
public ClientConnection(ClientKind kind, INatsServer? server = null, Stream? nc = null)
{
    (Kind, Server, _nc) = (kind, server, nc);

    // Seed the scratch buffer with the "RMSG " prefix.
    "RMSG "u8.CopyTo(Msgb);
}
// =========================================================================
// String / identity (features 398-400)
// =========================================================================
/// <summary>
/// Returns the cached connection string identifier.
/// Mirrors Go client.String().
/// </summary>
public override string ToString()
{
    return _ncs;
}
/// <summary>
/// Go-style alias for <see cref="ToString"/>.
/// Mirrors Go client.String().
/// </summary>
public string String()
{
    return ToString();
}
/// <summary>
/// Returns the nonce presented to the client during connection (read under the client lock).
/// Mirrors Go client.GetNonce().
/// </summary>
public byte[]? GetNonce()
{
    lock (_mu)
    {
        return Nonce;
    }
}
/// <summary>
/// Returns the application-supplied name for this connection (read under the client lock).
/// Mirrors Go client.GetName().
/// </summary>
public string GetName()
{
    lock (_mu)
    {
        return Opts.Name;
    }
}
/// <summary>
/// Returns the client options. No lock is taken.
/// Mirrors Go client.GetOpts().
/// </summary>
public ClientOptions GetOpts()
{
    return Opts;
}
// =========================================================================
// TLS (feature 402)
// =========================================================================
/// <summary>
/// Returns the underlying stream as an <see cref="SslStream"/> when the connection is
/// TLS-secured, otherwise null.
/// Mirrors Go client.GetTLSConnectionState().
/// </summary>
public SslStream? GetTlsStream()
{
    lock (_mu)
    {
        var tls = _nc as SslStream;
        return tls;
    }
}
/// <summary>
/// Go-named alias for <see cref="GetTlsStream"/>.
/// Mirrors Go client.GetTLSConnectionState().
/// </summary>
public SslStream? GetTLSConnectionState()
{
    return GetTlsStream();
}
// =========================================================================
// Client type classification (features 403-404)
// =========================================================================
/// <summary>
/// Returns the extended client type for CLIENT-kind connections; every other kind
/// yields <c>NonClient</c>. MQTT is checked before WebSocket.
/// Mirrors Go client.clientType().
/// </summary>
public ClientConnectionType ClientType()
{
    if (Kind == ClientKind.Client)
    {
        return IsMqtt()
            ? ClientConnectionType.Mqtt
            : IsWebSocket()
                ? ClientConnectionType.WebSocket
                : ClientConnectionType.Nats;
    }
    return ClientConnectionType.NonClient;
}
// Display names for each client connection type; NonClient maps to the empty string.
private static readonly Dictionary ClientTypeStringMap = new()
{
    [ClientConnectionType.NonClient] = string.Empty,
    [ClientConnectionType.Nats] = "nats",
    [ClientConnectionType.WebSocket] = "websocket",
    [ClientConnectionType.Mqtt] = "mqtt",
};
/// <summary>
/// Returns the display name for <see cref="ClientType"/>, or the empty string when the
/// type has no entry in the map.
/// </summary>
internal string ClientTypeString()
{
    if (ClientTypeStringMap.TryGetValue(ClientType(), out var label))
    {
        return label;
    }
    return string.Empty;
}
// =========================================================================
// Subscription.close / isClosed (features 405-406)
// (These are on the Subscription type; see Internal/Subscription.cs)
// =========================================================================
// =========================================================================
// Trace level (feature 407)
// =========================================================================
/// <summary>
/// Updates the trace flag from the server's settings: system-kind connections follow
/// <c>TraceSysAcc</c>, all others follow <c>TraceEnabled</c>. Without a server, tracing is off.
/// Mirrors Go client.setTraceLevel().
/// </summary>
internal void SetTraceLevel()
{
    if (Server is null)
    {
        Trace = false;
        return;
    }

    if (Kind == ClientKind.System)
    {
        Trace = Server.TraceSysAcc;
    }
    else
    {
        Trace = Server.TraceEnabled;
    }
}
// =========================================================================
// initClient (feature 408)
// =========================================================================
/// <summary>
/// Initialises connection state after the client object is created: assigns the client id,
/// snapshots server options, resets the subscription maps, and caches the remote host,
/// port and the connection string used for logging.
/// Must be called with _mu held.
/// Mirrors Go client.initClient().
/// </summary>
internal void InitClient()
{
    if (Server is not null)
    {
        Cid = Server.NextClientId();

        // Snapshot options from server.
        var opts = Server.Options;
        OutWdl = opts.WriteDeadline;
        OutMp = opts.MaxPending;
        _mcl = opts.MaxControlLine > 0 ? opts.MaxControlLine : ServerConstants.MaxControlLineSize;
    }
    else
    {
        _mcl = ServerConstants.MaxControlLineSize;
    }

    Subs = new(StringComparer.Ordinal);
    Pcd = new();
    Echo = true;
    SetTraceLevel();

    // Scratch buffer "RMSG " prefix.
    "RMSG "u8.CopyTo(Msgb);

    // Snapshot connection string.
    if (_nc is not null)
    {
        var addr = GetRemoteEndPoint();
        if (addr is not null)
        {
            var conn = addr.ToString() ?? string.Empty;
            if (conn.Length > 0)
            {
                if (addr is IPEndPoint ip)
                {
                    // Read host and port from the endpoint itself. Splitting the text on the
                    // first ':' breaks for IPv6 literals such as "[::1]:4222".
                    Host = ip.Address.ToString();
                    Port = (ushort)ip.Port;
                }
                else
                {
                    // Non-IP endpoints: best-effort "host:port" split of the textual form.
                    var parts = conn.Split(':', 2);
                    if (parts.Length == 2)
                    {
                        Host = parts[0];
                        if (ushort.TryParse(parts[1], out var p)) Port = p;
                    }
                }
                _connStr = conn.Replace("%", "%%");
            }
        }
    }

    // Cached identifier: "<remote> - <tag>:<cid>" for networked kinds, a fixed label for internal ones.
    _ncs = Kind switch
    {
        ClientKind.Client when ClientType() == ClientConnectionType.Nats =>
            $"{_connStr} - cid:{Cid}",
        ClientKind.Client when ClientType() == ClientConnectionType.WebSocket =>
            $"{_connStr} - wid:{Cid}",
        ClientKind.Client =>
            $"{_connStr} - mid:{Cid}",
        ClientKind.Router => $"{_connStr} - rid:{Cid}",
        ClientKind.Gateway => $"{_connStr} - gid:{Cid}",
        ClientKind.Leaf => $"{_connStr} - lid:{Cid}",
        ClientKind.System => "SYSTEM",
        ClientKind.JetStream => "JETSTREAM",
        ClientKind.Account => "ACCOUNT",
        _ => _connStr,
    };
}
// =========================================================================
// RemoteAddress (feature 409)
// =========================================================================
/// <summary>
/// Returns the remote network address of the connection, or null.
/// Mirrors Go client.RemoteAddress().
/// </summary>
public EndPoint? RemoteAddress()
{
    return RemoteAddr();
}
/// <summary>
/// Returns the peer endpoint when the connection is a <see cref="NetworkStream"/>.
/// Returns null for any other stream type, or when reading the socket's endpoint throws.
/// </summary>
private EndPoint? GetRemoteEndPoint()
{
    if (_nc is not NetworkStream ns)
    {
        return null;
    }

    try
    {
        return ns.Socket.RemoteEndPoint;
    }
    catch
    {
        // Best effort: any failure reading the endpoint is reported as "unknown".
        return null;
    }
}
// =========================================================================
// Account registration (features 410-417)
// =========================================================================
/// <summary>
/// Reports an error raised while registering with an account. Exceeding the account's
/// connection limit is delegated to <see cref="MaxAccountConnExceeded"/>; any other failure
/// is logged and reported to the client as "Failed Account Registration".
/// Mirrors Go client.reportErrRegisterAccount().
/// </summary>
/// <param name="acc">Account the registration was attempted against.</param>
/// <param name="err">Exception raised by the registration.</param>
internal void ReportErrRegisterAccount(INatsAccount acc, Exception err)
{
    if (err is TooManyAccountConnectionsException)
    {
        MaxAccountConnExceeded();
        return;
    }
    // Errorf forwards to ILogger, which substitutes {Named} placeholders only.
    // Go verbs such as %q/%s would be logged literally and the arguments dropped.
    Errorf("Problem registering with account \"{Account}\": {Error}", acc.Name, err.Message);
    SendErr("Failed Account Registration");
}
/// <summary>
/// Returns the client kind, read under the client lock. Mirrors Go client.Kind().
/// </summary>
public ClientKind GetKind()
{
    lock (_mu)
    {
        return Kind;
    }
}
/// <summary>
/// Registers this client with an account, first removing it from any previous account.
/// Mirrors Go client.registerWithAccount().
/// </summary>
/// <param name="acc">Account to join.</param>
/// <exception cref="BadAccountException">The account is null or reports itself invalid.</exception>
/// <exception cref="TooManyAccountConnectionsException">
/// The account's connection limit (client kind) or leaf node limit (leaf kind) is reached.
/// </exception>
internal void RegisterWithAccount(INatsAccount acc)
{
    if (acc is null || !acc.IsValid)
    {
        throw new BadAccountException();
    }

    // Leave the previous account; a return of 1 from RemoveClient decrements the server's active count.
    if (Account is { } previous && previous.RemoveClient(this) == 1)
    {
        Server?.DecActiveAccounts();
    }

    lock (_mu)
    {
        Account = acc;
        ApplyAccountLimits();
    }

    // Check max connection limits for the kinds that count against them.
    var limitReached = Kind switch
    {
        ClientKind.Client => acc.MaxTotalConnectionsReached(),
        ClientKind.Leaf => acc.MaxTotalLeafNodesReached(),
        _ => false,
    };
    if (limitReached)
    {
        throw new TooManyAccountConnectionsException();
    }

    // Join the new account; a return of 0 from AddClient increments the server's active count.
    if (acc.AddClient(this) == 0)
    {
        Server?.IncActiveAccounts();
    }
}
/// <summary>
/// Returns true if a subscription limit is set and the current subscription count has reached it.
/// Mirrors Go client.subsAtLimit().
/// </summary>
internal bool SubsAtLimit()
{
    if (_msubs == JwtNoLimit)
    {
        return false;
    }
    return Subs.Count >= _msubs;
}
// Sentinel meaning "no limit"; mirrors jwt.NoLimit in Go (-1 cast to int32).
private const int JwtNoLimit = -1;
/// <summary>
/// Tightens <paramref name="value"/> to <paramref name="limit"/> when the limit is more
/// restrictive: either no limit was set yet, or the new limit is strictly smaller.
/// The value is accessed with a volatile read and a volatile write; the pair is not a
/// single compare-and-swap.
/// Mirrors Go minLimit.
/// </summary>
/// <param name="value">Limit storage to update in place.</param>
/// <param name="limit">Candidate limit; <c>JwtNoLimit</c> never changes the value.</param>
/// <returns>True when <paramref name="value"/> was overwritten.</returns>
private static bool MinLimit(ref int value, int limit)
{
    int current = Volatile.Read(ref value);

    // An unlimited candidate can never tighten anything.
    if (limit == JwtNoLimit)
    {
        return false;
    }

    // Keep an existing limit that is already at least as strict.
    if (current != JwtNoLimit && limit >= current)
    {
        return false;
    }

    Volatile.Write(ref value, limit);
    return true;
}
/// <summary>
/// Applies connection limits to this client. Only client and leaf kinds with an account
/// are affected: payload and subscription limits are reset to "no limit" and then
/// tightened by the server options (a value of 0 in the options means unlimited).
/// If the connection is already at its subscription limit it is notified and closed
/// shortly afterwards on a background task.
/// Lock is held on entry.
/// Mirrors Go client.applyAccountLimits().
/// </summary>
internal void ApplyAccountLimits()
{
    if (Account is null || (Kind != ClientKind.Client && Kind != ClientKind.Leaf))
        return;

    Volatile.Write(ref _mpay, JwtNoLimit);
    _msubs = JwtNoLimit;

    // Apply server-level limits.
    if (Server is not null)
    {
        var sOpts = Server.Options;
        int mPay = sOpts.MaxPayload == 0 ? JwtNoLimit : sOpts.MaxPayload;
        int mSubs = sOpts.MaxSubs == 0 ? JwtNoLimit : sOpts.MaxSubs;
        MinLimit(ref _mpay, mPay);
        MinLimit(ref _msubs, mSubs);
    }

    if (!SubsAtLimit())
        return;

    // Fire-and-forget: report the violation, give the error a moment to go out, then close.
    // The delay is awaited rather than blocked on, so no pool thread is parked for its duration.
    _ = Task.Run(async () =>
    {
        MaxSubsExceeded();
        await Task.Delay(20).ConfigureAwait(false);
        CloseConnection(ClosedState.MaxSubscriptionsExceeded);
    });
}
// =========================================================================
// RegisterUser / RegisterNkeyUser (features 416-417)
// =========================================================================
/// <summary>
/// Registers an authenticated user with this connection: joins the user's account (if any),
/// then under the client lock replaces the permissions, clears the message deny filter,
/// records a non-empty username, and arms the expiration timer when a deadline is set.
/// A failed account registration is reported and nothing else is applied.
/// Mirrors Go client.RegisterUser().
/// </summary>
public void RegisterUser(User user)
{
    if (user.Account is INatsAccount acc)
    {
        try
        {
            RegisterWithAccount(acc);
        }
        catch (Exception ex)
        {
            ReportErrRegisterAccount(acc, ex);
            return;
        }
    }

    lock (_mu)
    {
        Perms = user.Permissions is { } granted ? BuildPermissions(granted) : null;
        MPerms = null;

        if (user.Username.Length > 0)
        {
            Opts.Username = user.Username;
        }

        if (user.ConnectionDeadline != default)
        {
            var remaining = user.ConnectionDeadline - DateTime.UtcNow;
            SetExpirationTimerUnlocked(remaining);
        }
    }
}
/// <summary>
/// Registers an NKey-authenticated user: joins the user's account (if any), then under the
/// client lock replaces the permissions and clears the message deny filter.
/// A failed account registration is reported and nothing else is applied.
/// Mirrors Go client.RegisterNkeyUser().
/// </summary>
public void RegisterNkeyUser(NkeyUser user)
{
    if (user.Account is INatsAccount acc)
    {
        try
        {
            RegisterWithAccount(acc);
        }
        catch (Exception ex)
        {
            ReportErrRegisterAccount(acc, ex);
            return;
        }
    }

    lock (_mu)
    {
        Perms = user.Permissions is { } granted ? BuildPermissions(granted) : null;
        MPerms = null;
    }
}
// =========================================================================
// splitSubjectQueue (feature 418)
// =========================================================================
/// <summary>
/// Splits a "subject [queue]" string on whitespace into ASCII-encoded subject and optional
/// queue bytes.
/// Mirrors Go splitSubjectQueue.
/// </summary>
/// <param name="sq">Text holding a subject, optionally followed by a queue name.</param>
/// <returns>The subject bytes, and the queue bytes or null when no queue was given.</returns>
/// <exception cref="ArgumentException">The input contains no fields.</exception>
/// <exception cref="FormatException">The input contains more than two fields.</exception>
public static (byte[] subject, byte[]? queue) SplitSubjectQueue(string sq)
{
    var fields = sq.Trim().Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries);
    switch (fields.Length)
    {
        case 0:
            throw new ArgumentException($"invalid subject-queue \"{sq}\"");
        case 1:
            return (Encoding.ASCII.GetBytes(fields[0]), null);
        case 2:
            return (Encoding.ASCII.GetBytes(fields[0]), Encoding.ASCII.GetBytes(fields[1]));
        default:
            throw new FormatException($"invalid subject-queue \"{sq}\"");
    }
}
// =========================================================================
// setPermissions / publicPermissions / mergeDenyPermissions (features 419-422)
// =========================================================================
/// <summary>
/// Builds the internal permission indexes from a public <c>Permissions</c> description.
/// Besides returning the new indexes it has two side effects on this connection:
/// <c>Replies</c> is reset when response permissions are present, and <c>DArray</c> is
/// rebuilt from the subscribe deny list.
/// Subscribe entries that fail to parse as "subject [queue]" are logged and skipped.
/// </summary>
private ClientPermissions BuildPermissions(Permissions perms)
{
    var cp = new ClientPermissions();
    if (perms.Publish is not null)
    {
        if (perms.Publish.Allow is { Count: > 0 })
        {
            cp.Pub.Allow = SubscriptionIndex.NewSublistWithCache();
            foreach (var s in perms.Publish.Allow)
                cp.Pub.Allow.Insert(new Subscription { Subject = Encoding.ASCII.GetBytes(s) });
        }
        if (perms.Publish.Deny is { Count: > 0 })
        {
            cp.Pub.Deny = SubscriptionIndex.NewSublistWithCache();
            foreach (var s in perms.Publish.Deny)
                cp.Pub.Deny.Insert(new Subscription { Subject = Encoding.ASCII.GetBytes(s) });
        }
    }
    if (perms.Response is not null)
    {
        cp.Resp = perms.Response;
        Replies = new(StringComparer.Ordinal);
    }
    if (perms.Subscribe is not null)
    {
        if (perms.Subscribe.Allow is { Count: > 0 })
        {
            cp.Sub.Allow = SubscriptionIndex.NewSublistWithCache();
            foreach (var s in perms.Subscribe.Allow)
            {
                try
                {
                    var (subj, q) = SplitSubjectQueue(s);
                    cp.Sub.Allow.Insert(new Subscription { Subject = subj, Queue = q });
                }
                // Errorf forwards to ILogger: use a {Named} placeholder, Go's "%s" is not substituted.
                catch (Exception ex) { Errorf("{Error}", ex.Message); }
            }
        }
        if (perms.Subscribe.Deny is { Count: > 0 })
        {
            cp.Sub.Deny = SubscriptionIndex.NewSublistWithCache();
            DArray = [];
            foreach (var s in perms.Subscribe.Deny)
            {
                // Indexer assignment tolerates a subject that is listed twice;
                // Add would throw on the duplicate key and abort the whole build.
                DArray[s] = true;
                try
                {
                    var (subj, q) = SplitSubjectQueue(s);
                    cp.Sub.Deny.Insert(new Subscription { Subject = subj, Queue = q });
                }
                catch (Exception ex) { Errorf("{Error}", ex.Message); }
            }
        }
    }
    return cp;
}
/// <summary>
/// Builds public permissions from internal permission indexes, under the client lock.
/// Returns null when the connection has no permissions. Otherwise Publish and Subscribe
/// are always present in the result, and each allow/deny list is populated only when the
/// corresponding internal index exists.
/// Mirrors Go client.publicPermissions().
/// </summary>
internal Permissions? PublicPermissions()
{
lock (_mu)
{
if (Perms is null)
return null;
var perms = new Permissions
{
Publish = new SubjectPermission(),
Subscribe = new SubjectPermission(),
};
// Publish entries carry the subject only.
if (Perms.Pub.Allow is not null)
{
var subs = new List(32);
Perms.Pub.Allow.All(subs);
perms.Publish.Allow = [];
foreach (var sub in subs)
perms.Publish.Allow.Add(Encoding.ASCII.GetString(sub.Subject));
}
if (Perms.Pub.Deny is not null)
{
var subs = new List(32);
Perms.Pub.Deny.All(subs);
perms.Publish.Deny = [];
foreach (var sub in subs)
perms.Publish.Deny.Add(Encoding.ASCII.GetString(sub.Subject));
}
// Subscribe entries are rendered as "subject queue" when a non-empty queue is attached.
if (Perms.Sub.Allow is not null)
{
var subs = new List(32);
Perms.Sub.Allow.All(subs);
perms.Subscribe.Allow = [];
foreach (var sub in subs)
{
if (sub.Queue is { Length: > 0 })
perms.Subscribe.Allow.Add($"{Encoding.ASCII.GetString(sub.Subject)} {Encoding.ASCII.GetString(sub.Queue)}");
else
perms.Subscribe.Allow.Add(Encoding.ASCII.GetString(sub.Subject));
}
}
if (Perms.Sub.Deny is not null)
{
var subs = new List(32);
Perms.Sub.Deny.All(subs);
perms.Subscribe.Deny = [];
foreach (var sub in subs)
{
if (sub.Queue is { Length: > 0 })
perms.Subscribe.Deny.Add($"{Encoding.ASCII.GetString(sub.Subject)} {Encoding.ASCII.GetString(sub.Queue)}");
else
perms.Subscribe.Deny.Add(Encoding.ASCII.GetString(sub.Subject));
}
}
// Response permissions are copied into a fresh object rather than shared.
if (Perms.Resp is not null)
{
perms.Response = new ResponsePermission
{
MaxMsgs = Perms.Resp.MaxMsgs,
Expires = Perms.Resp.Expires,
};
}
return perms;
}
}
/// <summary>
/// Merges deny permissions into publish/subscribe deny lists, creating the permission
/// object and the deny indexes on demand. Subjects already present in a deny index are
/// not inserted again. An empty subject list is a no-op.
/// Lock is expected on entry.
/// Mirrors Go client.mergeDenyPermissions().
/// </summary>
internal void MergeDenyPermissions(DenyType what, IReadOnlyList denySubjects)
{
if (denySubjects.Count == 0)
return;
Perms ??= new ClientPermissions();
// Select which permission sets receive the subjects; unknown values select none.
List targets = what switch
{
DenyType.Pub => [Perms.Pub],
DenyType.Sub => [Perms.Sub],
DenyType.Both => [Perms.Pub, Perms.Sub],
_ => [],
};
foreach (var target in targets)
{
target.Deny ??= SubscriptionIndex.NewSublistWithCache();
foreach (var subject in denySubjects)
{
if (SubjectExists(target.Deny, subject))
continue;
target.Deny.Insert(new Subscription { Subject = Encoding.ASCII.GetBytes(subject) });
}
}
}
/// <summary>
/// Takes the client lock and then merges the deny permissions.
/// Mirrors Go client.mergeDenyPermissionsLocked().
/// </summary>
internal void MergeDenyPermissionsLocked(DenyType what, IReadOnlyList denySubjects)
{
    lock (_mu) MergeDenyPermissions(what, denySubjects);
}
/// <summary>
/// Returns true when the index already holds an entry whose subject text equals
/// <paramref name="subject"/> exactly. The index is queried with Match and the results
/// (queue groups first, then plain subscriptions) are filtered by literal comparison.
/// </summary>
private static bool SubjectExists(SubscriptionIndex index, string subject)
{
    var matches = index.Match(subject);

    foreach (var queueGroup in matches.QSubs)
    {
        foreach (var entry in queueGroup)
        {
            if (string.Equals(Encoding.ASCII.GetString(entry.Subject), subject))
            {
                return true;
            }
        }
    }

    foreach (var entry in matches.PSubs)
    {
        if (string.Equals(Encoding.ASCII.GetString(entry.Subject), subject))
        {
            return true;
        }
    }

    return false;
}
// =========================================================================
// setExpiration / loadMsgDenyFilter (features 423-424)
// =========================================================================
/// <summary>
/// Takes the client lock and (re)arms the expiration timer for the given delay.
/// </summary>
internal void SetExpirationTimer(TimeSpan d)
{
    lock (_mu) SetExpirationTimerUnlocked(d);
}
/// <summary>
/// Replaces any pending expiration timer with one that fires after <paramref name="d"/>.
/// A non-positive delay expires the connection immediately on the calling thread.
/// Takes no lock itself; <see cref="SetExpirationTimer"/> is the locking wrapper.
/// </summary>
/// <param name="d">Time remaining until the connection's authorization expires.</param>
internal void SetExpirationTimerUnlocked(TimeSpan d)
{
    var prev = Interlocked.Exchange(ref _expTimer, null);
    prev?.Dispose();
    if (d <= TimeSpan.Zero)
    {
        ClaimExpiration();
        return;
    }

    var deadline = DateTime.UtcNow + d;
    Expires = deadline;

    // System.Threading.Timer rejects due times beyond its supported maximum (under two
    // months) with ArgumentOutOfRangeException, and credential lifetimes can be far longer.
    var maxDue = TimeSpan.FromMilliseconds(int.MaxValue);
    if (d <= maxDue)
    {
        _expTimer = new Timer(_ => ClaimExpiration(), null, d, Timeout.InfiniteTimeSpan);
        return;
    }

    // Too far out for a single timer: wake up at the cap and re-arm, under the lock,
    // for whatever time is left until the deadline.
    _expTimer = new Timer(
        _ => SetExpirationTimer(deadline - DateTime.UtcNow),
        null,
        maxDue,
        Timeout.InfiniteTimeSpan);
}
/// <summary>
/// Applies JWT expiration with an optional validity cap. With no claim expiry (0) only the
/// cap is used, and nothing is armed when the cap is zero too. Otherwise the timer is set
/// to the time until the claim expires (zero if already past), shortened to the cap when
/// the cap is non-zero and smaller.
/// Mirrors Go client.setExpiration().
/// </summary>
/// <param name="claimsExpiresUnixSeconds">Claim expiry as Unix seconds; 0 means none.</param>
/// <param name="validFor">Maximum validity; <see cref="TimeSpan.Zero"/> means no cap.</param>
internal void SetExpiration(long claimsExpiresUnixSeconds, TimeSpan validFor)
{
    bool hasCap = validFor != TimeSpan.Zero;

    if (claimsExpiresUnixSeconds == 0)
    {
        if (hasCap)
        {
            SetExpirationTimer(validFor);
        }
        return;
    }

    long nowUnix = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
    TimeSpan untilClaim = claimsExpiresUnixSeconds > nowUnix
        ? TimeSpan.FromSeconds(claimsExpiresUnixSeconds - nowUnix)
        : TimeSpan.Zero;

    SetExpirationTimer(hasCap && validFor < untilClaim ? validFor : untilClaim);
}
/// <summary>
/// Replaces the message deny filter with a fresh index populated from the keys of the
/// current deny subject array; the filter is left empty when there is no array.
/// Lock is expected on entry.
/// Mirrors Go client.loadMsgDenyFilter().
/// </summary>
internal void LoadMsgDenyFilter()
{
    var filter = new MsgDeny
    {
        Deny = SubscriptionIndex.NewSublistWithCache(),
    };
    MPerms = filter;

    if (DArray is not null)
    {
        foreach (var denied in DArray.Keys)
        {
            filter.Deny.Insert(new Subscription { Subject = Encoding.ASCII.GetBytes(denied) });
        }
    }
}
// =========================================================================
// msgParts (feature 470)
// =========================================================================
/// <summary>
/// Splits a message buffer into header and body parts at the header size recorded in
/// the current parse state.
/// Mirrors Go client.msgParts().
/// </summary>
/// <param name="buf">Buffer holding the header immediately followed by the body.</param>
/// <returns>
/// Newly allocated copies of the header and of the body; neither aliases <paramref name="buf"/>.
/// </returns>
public (byte[] hdr, byte[] msg) MsgParts(byte[] buf)
{
    int hdrLen = ParseCtx.Pa.HeaderSize;

    // A range expression on an array already allocates an independent copy, so the
    // header can be handed out directly without a second copy.
    return (buf[..hdrLen], buf[hdrLen..]);
}
// =========================================================================
// kindString (feature 533)
// =========================================================================
// Human-readable names for each client kind.
private static readonly Dictionary KindStringMap = new()
{
    [ClientKind.Client] = "Client",
    [ClientKind.Router] = "Router",
    [ClientKind.Gateway] = "Gateway",
    [ClientKind.Leaf] = "Leafnode",
    [ClientKind.JetStream] = "JetStream",
    [ClientKind.Account] = "Account",
    [ClientKind.System] = "System",
};
/// <summary>
/// Returns a human-readable kind name, or "Unknown Type" for a kind with no map entry.
/// Mirrors Go client.kindString().
/// </summary>
internal string KindString()
{
    if (KindStringMap.TryGetValue(Kind, out var label))
    {
        return label;
    }
    return "Unknown Type";
}
// =========================================================================
// isClosed (feature 555)
// =========================================================================
/// <summary>
/// Returns true if the CloseConnection flag is set, i.e. closeConnection has been called.
/// Mirrors Go client.isClosed().
/// </summary>
public bool IsClosed()
{
    return (Flags & ClientFlags.CloseConnection) != 0;
}
// =========================================================================
// format / formatNoClientInfo / formatClientSuffix (features 556-558)
// =========================================================================
/// <summary>
/// Returns a formatted log string for this client (the cached connection identifier).
/// Mirrors Go client.format().
/// </summary>
internal string Format()
{
    return $"{_ncs}";
}
/// <summary>Returns the cached connection string without the client id suffix.</summary>
internal string FormatNoClientInfo()
{
    return _connStr;
}
/// <summary>Returns the suffix " - &lt;kind name&gt;:&lt;cid&gt;" for this client.</summary>
internal string FormatClientSuffix()
{
    return $" - {KindString()}:{Cid}";
}
// =========================================================================
// Logging helpers (features 559-568)
// =========================================================================
// Logging helpers. Each one prefixes the entry with the cached connection identifier and
// does nothing when the connection has no server. The "f" variants pass `fmt` to the
// logger as a message template, so placeholders must use the {Name} form.

/// <summary>Logs <paramref name="msg"/> verbatim at error level.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Error(string msg)
{
    Server?.Logger.LogError("[{Client}] {Msg}", _ncs, msg);
}
/// <summary>Logs a templated message at error level.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Errorf(string fmt, params object?[] args)
{
    Server?.Logger.LogError("[{Client}] " + fmt, [_ncs, ..args]);
}
/// <summary>Logs a templated message at debug level.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Debugf(string fmt, params object?[] args)
{
    Server?.Logger.LogDebug("[{Client}] " + fmt, [_ncs, ..args]);
}
/// <summary>Logs a templated message at information level.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Noticef(string fmt, params object?[] args)
{
    Server?.Logger.LogInformation("[{Client}] " + fmt, [_ncs, ..args]);
}
/// <summary>Logs a templated message at trace level.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Tracef(string fmt, params object?[] args)
{
    Server?.Logger.LogTrace("[{Client}] " + fmt, [_ncs, ..args]);
}
/// <summary>Logs a templated message at warning level.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Warnf(string fmt, params object?[] args)
{
    Server?.Logger.LogWarning("[{Client}] " + fmt, [_ncs, ..args]);
}
// =========================================================================
// Auth-related helpers (features 446-451, 526-531, 570-571)
// =========================================================================
/// <summary>Sends the error text to the client and logs it at error level.</summary>
internal void SendErrAndErr(string err)
{
    SendErr(err);
    Error(err);
}
/// <summary>Sends the error text to the client and logs it at debug level.</summary>
internal void SendErrAndDebug(string msg)
{
    SendErr(msg);
    // Pass the text as an argument, not as the template: Debugf treats its first
    // parameter as a logging template, so braces in the text would be misparsed.
    Debugf("{Msg}", msg);
}
/// <summary>Reports an authentication timeout to the client and closes the connection.</summary>
internal void AuthTimeout()
{
    const string reason = "Authentication Timeout";
    SendErrAndDebug(reason);
    CloseConnection(ClosedState.AuthenticationTimeout);
}
/// <summary>Reports expired authorization to the client and closes the connection.</summary>
internal void AuthExpired()
{
    const string reason = "Authorization Expired";
    SendErrAndDebug(reason);
    CloseConnection(ClosedState.AuthenticationExpired);
}
/// <summary>Reports expired account authorization to the client and closes the connection.</summary>
internal void AccountAuthExpired()
{
    const string reason = "Account authorization expired";
    SendErrAndDebug(reason);
    CloseConnection(ClosedState.AuthenticationExpired);
}
/// <summary>Reports an authorization violation to the client and closes the connection.</summary>
internal void AuthViolation()
{
    var reason = ServerErrors.ErrAuthorization.Message;
    SendErrAndErr(reason);
    CloseConnection(ClosedState.AuthenticationViolation);
}
/// <summary>Reports that the account's connection limit was exceeded and closes the connection.</summary>
internal void MaxAccountConnExceeded()
{
    var reason = ServerErrors.ErrTooManyAccountConnections.Message;
    SendErrAndErr(reason);
    CloseConnection(ClosedState.MaxAccountConnectionsExceeded);
}
/// <summary>Reports that the server's connection limit was exceeded and closes the connection.</summary>
internal void MaxConnExceeded()
{
    var reason = ServerErrors.ErrTooManyConnections.Message;
    SendErrAndErr(reason);
    CloseConnection(ClosedState.MaxConnectionsExceeded);
}
/// <summary>
/// Logs that the subscription limit was exceeded (including the limit) and reports the
/// error to the client. Does not close the connection.
/// </summary>
internal void MaxSubsExceeded()
{
    // Errorf forwards to ILogger: Go's "%d" would be logged literally and the value dropped.
    Errorf("Maximum Subscriptions Exceeded (max={Max})", _msubs);
    SendErr(ServerErrors.ErrTooManySubs.Message);
}
/// <summary>
/// Logs a payload size violation with the offending and maximum sizes, reports
/// "Maximum Payload Violation" to the client, and closes the connection.
/// </summary>
/// <param name="sz">Size of the rejected payload.</param>
/// <param name="max">Maximum payload size in effect.</param>
internal void MaxPayloadViolation(int sz, int max)
{
    // Include the sizes in the log entry; the client still receives the plain error text.
    Errorf("Maximum Payload Violation: {Size} vs {Max}", sz, max);
    SendErr("Maximum Payload Violation");
    CloseConnection(ClosedState.MaxPayloadExceeded);
}
/// <summary>
/// Reports a publish permission violation to the client and logs it with the
/// authenticated user and subject.
/// </summary>
internal void PubPermissionViolation(string subject)
{
    SendErr($"Permissions Violation for Publish to \"{subject}\"");
    // Errorf forwards to ILogger: Go's "%q" would be logged literally and the values dropped.
    Errorf("Publish Violation - User \"{User}\", Subject \"{Subject}\"", GetAuthUser(), subject);
}
/// <summary>
/// Reports a subscription permission violation to the client (mentioning the queue when
/// one is set) and logs it with the authenticated user, subject and subscription id.
/// </summary>
internal void SubPermissionViolation(Subscription sub)
{
    string subj = Encoding.UTF8.GetString(sub.Subject);
    string queue = sub.Queue is { Length: > 0 } ? $" using queue \"{Encoding.UTF8.GetString(sub.Queue)}\"" : string.Empty;
    string sid = sub.Sid is not null ? Encoding.UTF8.GetString(sub.Sid) : string.Empty;
    SendErr($"Permissions Violation for Subscription to \"{subj}\"{queue}");
    // Errorf forwards to ILogger: Go's "%q" would be logged literally and the values dropped.
    Errorf("Subscription Violation - User \"{User}\", Subject \"{Subject}\", SID \"{Sid}\"",
        GetAuthUser(), subj, sid);
}
/// <summary>
/// Reports a reply-subject permission violation to the client and logs it with the
/// authenticated user and reply subject.
/// </summary>
internal void ReplySubjectViolation(string reply)
{
    SendErr($"Permissions Violation for use of Reply subject \"{reply}\"");
    // Errorf forwards to ILogger: Go's "%q" would be logged literally and the values dropped.
    Errorf("Reply Subject Violation - User \"{User}\", Reply \"{Reply}\"", GetAuthUser(), reply);
}
/// <summary>
/// Sends and logs a subscription permission violation for the subscription's subject.
/// </summary>
internal void MaxTokensViolation(Subscription sub)
{
    var subject = Encoding.UTF8.GetString(sub.Subject);
    SendErrAndErr($"Permissions Violation for Subscription to \"{subject}\"");
}
/// <summary>Stores the auth error override under the client lock.</summary>
internal void SetAuthError(Exception err)
{
    lock (_mu)
    {
        AuthErr = err;
    }
}
/// <summary>Returns the auth error override, read under the client lock.</summary>
internal Exception? GetAuthError()
{
    lock (_mu)
    {
        return AuthErr;
    }
}
// Auth credential accessors (used by NatsServer.Auth.cs). Each reads one field of the
// client options under the client lock.
internal string GetAuthToken()
{
    lock (_mu)
    {
        return Opts.Token;
    }
}
internal string GetNkey()
{
    lock (_mu)
    {
        return Opts.Nkey;
    }
}
internal string GetNkeySig()
{
    lock (_mu)
    {
        return Opts.Sig;
    }
}
internal string GetUsername()
{
    lock (_mu)
    {
        return Opts.Username;
    }
}
internal string GetPassword()
{
    lock (_mu)
    {
        return Opts.Password;
    }
}
/// <summary>
/// Returns the TLS peer certificate, or null when the connection is not an
/// <see cref="SslStream"/> or the peer presented no certificate. A certificate that is not
/// already an <see cref="X509Certificate2"/> is wrapped in a new one.
/// </summary>
internal X509Certificate2? GetTlsCertificate()
{
    lock (_mu)
    {
        if (_nc is not SslStream ssl)
        {
            return null;
        }

        return ssl.RemoteCertificate switch
        {
            null => null,
            X509Certificate2 ready => ready,
            var other => new X509Certificate2(other),
        };
    }
}
/// <summary>
/// Returns true when the current TLS peer certificate matches one of the pinned
/// SPKI SHA-256 key identifiers, or when no pinned set is configured.
/// The key identifier is the lowercase hex SHA-256 of the certificate's exported
/// SubjectPublicKeyInfo; if that export throws, the raw public key bytes are hashed instead.
/// Mirrors Go client.matchesPinnedCert.
/// </summary>
internal bool MatchesPinnedCert(PinnedCertSet? tlsPinnedCerts)
{
    if (tlsPinnedCerts == null)
        return true;

    var peerCert = GetTlsCertificate();
    if (peerCert == null)
    {
        Debugf("Failed pinned cert test as client did not provide a certificate");
        return false;
    }

    byte[] spki;
    try
    {
        spki = peerCert.PublicKey.ExportSubjectPublicKeyInfo();
    }
    catch
    {
        spki = peerCert.GetPublicKey();
    }

    var keyId = Convert.ToHexString(SHA256.HashData(spki)).ToLowerInvariant();
    if (tlsPinnedCerts.Contains(keyId))
        return true;

    Debugf("Failed pinned cert test for key id: {0}", keyId);
    return false;
}
/// <summary>Assigns the connection's account under the client lock.</summary>
internal void SetAccount(INatsAccount? acc)
{
    lock (_mu)
    {
        Account = acc;
    }
}
/// <summary>
/// Overload for the concrete account type; forwards it as an <c>INatsAccount</c>.
/// </summary>
internal void SetAccount(Account? acc)
{
    SetAccount(acc as INatsAccount);
}
/// <summary>
/// Rebuilds the connection's permissions from <paramref name="perms"/> under the client
/// lock. A null argument leaves the current permissions untouched.
/// </summary>
internal void SetPermissions(Auth.Permissions? perms)
{
    lock (_mu)
    {
        if (perms == null)
        {
            return;
        }
        Perms = BuildPermissions(perms);
    }
}
// =========================================================================
// Timer helpers (features 523-531)
// =========================================================================
/// <summary>
/// Starts a periodic ping timer using the server's ping interval (two minutes when there
/// is no server). A non-positive interval leaves the timers untouched. Any existing ping
/// timer is cleared first; each tick sends a ping unless the connection is closed.
/// </summary>
internal void SetPingTimer()
{
    var interval = Server?.Options.PingInterval ?? TimeSpan.FromMinutes(2);
    if (interval <= TimeSpan.Zero)
    {
        return;
    }

    ClearPingTimer();
    _pingTimer = new Timer(OnTick, null, interval, interval);

    void OnTick(object? _)
    {
        if (!IsClosed())
        {
            SendPing();
        }
    }
}
/// <summary>Atomically detaches the ping timer and disposes it, if one was set.</summary>
internal void ClearPingTimer()
{
    Interlocked.Exchange(ref _pingTimer, null)?.Dispose();
}
/// <summary>Atomically detaches the TLS timeout timer and disposes it, if one was set.</summary>
internal void ClearTlsToTimer()
{
    Interlocked.Exchange(ref _tlsTo, null)?.Dispose();
}
/// <summary>
/// Starts the auth timer using the server's AuthTimeout option (interpreted as seconds).
/// Does nothing when no server is attached or the configured value is not positive.
/// </summary>
internal void SetAuthTimer()
{
    var seconds = Server?.Options.AuthTimeout ?? 0;
    if (seconds <= 0)
    {
        return;
    }

    var duration = TimeSpan.FromSeconds(seconds);
    SetAuthTimer(duration);
}
/// <summary>
/// Atomically detaches the auth timer field and disposes the timer if one was set.
/// </summary>
internal void ClearAuthTimer()
{
    Interlocked.Exchange(ref _atmr, null)?.Dispose();
}
/// <summary>
/// Returns true while a CONNECT is expected but has not been received yet,
/// i.e. the ExpectConnect flag is set and the ConnectReceived flag is not.
/// </summary>
internal bool AwaitingAuth()
{
    if ((Flags & ClientFlags.ExpectConnect) == 0)
        return false;

    return (Flags & ClientFlags.ConnectReceived) == 0;
}
/// <summary>
/// Handles expiration of the client's claim by delegating to AuthExpired.
/// </summary>
internal void ClaimExpiration() => AuthExpired();
// =========================================================================
// flushSignal / queueOutbound / enqueueProto (features 433, 456-459)
// =========================================================================
/// <summary>
/// Requests a flush of pending output by calling FlushClients with a zero budget.
/// </summary>
internal void FlushSignal() => FlushClients(0);
/// <summary>
/// Records receipt of an inbound client message and traces it when tracing is on.
/// Null or empty payloads are ignored.
/// </summary>
/// <param name="msg">The raw message bytes.</param>
internal void ProcessInboundClientMsg(byte[] msg)
{
    if (msg is not { Length: > 0 })
    {
        return;
    }

    LastIn = DateTime.UtcNow;

    if (!Trace)
    {
        return;
    }

    TraceMsg(msg);
}
/// <summary>
/// Writes a protocol line to the connection and then signals a flush.
/// </summary>
/// <param name="proto">The protocol bytes to send (callers pass ASCII-encoded byte arrays).</param>
internal void EnqueueProtoAndFlush(ReadOnlySpan<byte> proto)
{
    EnqueueProto(proto);
    // The method promises a flush; FlushSignal swallows transport errors itself.
    FlushSignal();
}
/// <summary>
/// Sends a protocol line by handing it to EnqueueProto.
/// </summary>
/// <param name="proto">The protocol bytes to send.</param>
internal void SendProtoNow(ReadOnlySpan<byte> proto)
{
    EnqueueProto(proto);
}
/// <summary>
/// Writes a protocol line directly to the underlying connection, if there is one.
/// Write failures are swallowed here.
/// </summary>
/// <param name="proto">The protocol bytes to write.</param>
internal void EnqueueProto(ReadOnlySpan<byte> proto)
{
    // Deferred: full write-loop queuing will be completed with server integration (session 09).
    // Read the field once so a concurrent close cannot null it between the check and the write.
    var conn = _nc;
    if (conn is null)
    {
        return;
    }

    try { conn.Write(proto); }
    catch { /* connection errors handled by closeConnection */ }
}
// =========================================================================
// sendPong / sendPing / sendRTTPing (features 460-463)
// =========================================================================
/// <summary>
/// Sends the PONG protocol line to the peer.
/// </summary>
internal void SendPong()
{
    var pong = Encoding.ASCII.GetBytes(Wires.PongProto);
    EnqueueProtoAndFlush(pong);
}
/// <summary>
/// Takes the client lock and sends an RTT-measuring ping via SendRttPingLocked.
/// </summary>
internal void SendRttPing()
{
    lock (_mu)
    {
        SendRttPingLocked();
    }
}
/// <summary>
/// Stamps the RTT start time with the current UTC time and sends a ping.
/// SendRttPing calls this while holding the client lock.
/// </summary>
internal void SendRttPingLocked()
{
    var startedAt = DateTime.UtcNow;
    RttStart = startedAt;
    SendPing();
}
/// <summary>
/// Increments the outstanding-ping counter and sends the PING protocol line.
/// </summary>
internal void SendPing()
{
    _pingOut++;

    var ping = Encoding.ASCII.GetBytes(Wires.PingProto);
    EnqueueProtoAndFlush(ping);
}
// =========================================================================
// sendErr / sendOK (features 465-466)
// =========================================================================
/// <summary>
/// Sends an -ERR protocol line carrying the given text, ASCII-encoded.
/// </summary>
/// <param name="err">The error text substituted into the -ERR template.</param>
internal void SendErr(string err)
{
    var line = string.Format(Wires.ErrProto, err);
    var payload = Encoding.ASCII.GetBytes(line);
    EnqueueProtoAndFlush(payload);
}
/// <summary>
/// Sends +OK to the peer, but only when the connect options have Verbose set.
/// </summary>
internal void SendOK()
{
    if (!Opts.Verbose)
    {
        return;
    }

    EnqueueProtoAndFlush(Encoding.ASCII.GetBytes(Wires.OkProto));
}
// =========================================================================
// traceMsg / traceInOp / traceOutOp / traceOp (features 434-439)
// =========================================================================
/// <summary>
/// Traces an inbound message payload when tracing is enabled.
/// </summary>
/// <param name="msg">The raw message bytes.</param>
internal void TraceMsg(byte[] msg)
{
    if (!Trace)
    {
        return;
    }

    // This is the inbound tracer (used by ProcessInboundClientMsg), so mark the
    // direction as inbound; delivery tracing is handled by TraceMsgDelivery.
    TraceMsgInternal(msg, true, false);
}
/// <summary>
/// Traces a delivered message payload when tracing is enabled.
/// </summary>
/// <param name="msg">The raw message bytes.</param>
internal void TraceMsgDelivery(byte[] msg)
{
    if (!Trace)
    {
        return;
    }

    TraceMsgInternal(msg, false, true);
}
/// <summary>
/// Traces an inbound protocol operation (direction marker "&lt;") when tracing is enabled.
/// </summary>
/// <param name="op">The operation name.</param>
/// <param name="arg">The operation argument bytes.</param>
internal void TraceInOp(string op, byte[] arg)
{
    if (!Trace)
    {
        return;
    }

    TraceOp("<", op, arg);
}
/// <summary>
/// Traces an outbound protocol operation (direction marker "&gt;") when tracing is enabled.
/// </summary>
/// <param name="op">The operation name.</param>
/// <param name="arg">The operation argument bytes.</param>
internal void TraceOutOp(string op, byte[] arg)
{
    if (!Trace)
    {
        return;
    }

    TraceOp(">", op, arg);
}
/// <summary>
/// Emits a trace line for a message payload: a direction marker, a kind tag and
/// the payload decoded as UTF-8.
/// </summary>
/// <param name="msg">The raw message bytes.</param>
/// <param name="inbound">True selects the "&lt;" marker, false the "&gt;" marker.</param>
/// <param name="delivery">True selects the "[DELIVER]" tag, false the "[MSG]" tag.</param>
private void TraceMsgInternal(byte[] msg, bool inbound, bool delivery)
{
    string arrow;
    if (inbound)
        arrow = "<";
    else
        arrow = ">";

    string tag;
    if (delivery)
        tag = "[DELIVER]";
    else
        tag = "[MSG]";

    var text = Encoding.UTF8.GetString(msg);
    Tracef("{0} {1} {2}", arrow, tag, text);
}
/// <summary>
/// Emits a trace line for a protocol operation: direction marker, operation name
/// and the argument decoded as UTF-8 (empty when the argument is null).
/// </summary>
/// <param name="dir">The direction marker to print first.</param>
/// <param name="op">The operation name.</param>
/// <param name="arg">The operation argument bytes; may be null.</param>
private void TraceOp(string dir, string op, byte[] arg)
{
    var text = arg is not null ? Encoding.UTF8.GetString(arg) : string.Empty;

    // Use {n} placeholders like the other Tracef/Debugf/Errorf calls in this class;
    // Go-style %s verbs are not substituted.
    Tracef("{0} {1} {2}", dir, op, text);
}
// =========================================================================
// getAuthUser / getAuthUserLabel (features 550-552)
// =========================================================================
/// <summary>
/// Returns the raw auth user (see GetRawAuthUser) while holding the client lock.
/// </summary>
internal string GetRawAuthUserLock()
{
    lock (_mu)
    {
        var user = GetRawAuthUser();
        return user;
    }
}
/// <summary>
/// Returns an identifier for the authenticated user taken from the connect options,
/// in order of preference: the nkey, the username, the literal "Token" when a token
/// is set, and finally "Unknown".
/// </summary>
internal string GetRawAuthUser()
{
    if (Opts.Nkey.Length > 0)
    {
        return Opts.Nkey;
    }

    if (Opts.Username.Length > 0)
    {
        return Opts.Username;
    }

    // The token value itself is never returned, only a fixed label.
    return Opts.Token.Length > 0 ? "Token" : "Unknown";
}
/// <summary>
/// Returns the auth user; currently identical to GetRawAuthUser.
/// </summary>
internal string GetAuthUser()
{
    return GetRawAuthUser();
}
/// <summary>
/// Returns the raw auth user, or "Unknown User" when that value is empty.
/// </summary>
internal string GetAuthUserLabel()
{
    var user = GetRawAuthUser();
    if (user.Length > 0)
    {
        return user;
    }

    return "Unknown User";
}
// =========================================================================
// connectionTypeAllowed (feature 554)
// =========================================================================
/// <summary>
/// Reports whether the given connection type is permitted. Currently always true.
/// </summary>
/// <param name="ct">The connection type; not inspected by this implementation.</param>
// Deferred: full implementation will be completed with JWT integration.
internal bool ConnectionTypeAllowed(string ct) => true;
// =========================================================================
// closeConnection (feature 536)
// =========================================================================
/// <summary>
/// Closes the client connection with the given reason.
/// Mirrors Go client.closeConnection().
/// </summary>
/// <param name="reason">Why the connection is being closed. Not consumed by this implementation yet.</param>
public void CloseConnection(ClosedState reason)
{
    lock (_mu)
    {
        // Bail out if the connection is already reported as closed.
        if (IsClosed()) return;
        Flags |= ClientFlags.CloseConnection;

        // Stop every per-connection timer, including the TLS timeout timer,
        // so none of them outlives the connection.
        ClearAuthTimer();
        ClearPingTimer();
        ClearTlsToTimer();
    }

    // Close the underlying network connection; failures during close are ignored.
    try { _nc?.Close(); } catch { /* ignore */ }
    _nc = null;
}
// =========================================================================
// flushAndClose (feature 532)
// =========================================================================
/// <summary>
/// Closes the connection with the ClientClosed reason.
/// </summary>
/// <param name="deadlineExceeded">Not used by this implementation.</param>
internal void FlushAndClose(bool deadlineExceeded) =>
    CloseConnection(ClosedState.ClientClosed);
// =========================================================================
// setNoReconnect (feature 538)
// =========================================================================
/// <summary>
/// Sets the NoReconnect flag on this client, under the client lock.
/// </summary>
internal void SetNoReconnect()
{
    lock (_mu)
    {
        Flags |= ClientFlags.NoReconnect;
    }
}
// =========================================================================
// getRTTValue (feature 539)
// =========================================================================
/// <summary>
/// Returns the last stored round-trip time, read under the client lock.
/// </summary>
internal TimeSpan GetRttValue()
{
    lock (_mu)
    {
        var rtt = Rtt;
        return rtt;
    }
}
// =========================================================================
// Account / server helpers (features 540-545)
// =========================================================================
/// <summary>
/// Returns the account currently associated with this client (may be null),
/// read under the client lock.
/// </summary>
internal INatsAccount? GetAccount()
{
    lock (_mu)
    {
        var current = Account;
        return current;
    }
}
// =========================================================================
// TLS handshake helpers (features 546-548)
// =========================================================================
/// <summary>
/// Placeholder for the server-side TLS handshake. Performs no I/O and always
/// completes with false.
/// </summary>
/// <param name="opts">Server authentication options; not used yet.</param>
/// <param name="ct">Cancellation token; not used yet.</param>
/// <returns>A completed task whose result is false.</returns>
internal Task<bool> DoTlsServerHandshakeAsync(SslServerAuthenticationOptions opts, CancellationToken ct = default)
{
    // Deferred: full TLS flow will be completed with server integration.
    // Nothing is awaited, so return a completed task rather than using an async state machine.
    return Task.FromResult(false);
}
/// <summary>
/// Placeholder for the client-side TLS handshake. Performs no I/O and always
/// completes with false.
/// </summary>
/// <param name="opts">Client authentication options; not used yet.</param>
/// <param name="ct">Cancellation token; not used yet.</param>
/// <returns>A completed task whose result is false.</returns>
internal Task<bool> DoTlsClientHandshakeAsync(SslClientAuthenticationOptions opts, CancellationToken ct = default)
{
    // Deferred: full TLS flow will be completed with server integration.
    // Nothing is awaited, so return a completed task rather than using an async state machine.
    return Task.FromResult(false);
}
// =========================================================================
// Stub methods for server-dependent features
// (Fully implemented when Server/Account sessions are complete)
// =========================================================================
// features 425-427: writeLoop / flushClients / readLoop
/// <summary>
/// Stub write loop: performs a single FlushClients call with the maximum budget.
/// </summary>
internal void WriteLoop()
{
    FlushClients(long.MaxValue);
}
/// <summary>
/// Flushes the underlying connection if there is one. Flush errors are swallowed.
/// </summary>
/// <param name="budget">Not used by this implementation.</param>
internal void FlushClients(long budget)
{
    try
    {
        if (_nc is { } conn)
        {
            conn.Flush();
        }
    }
    catch
    {
        /* no-op for now */
    }
}
/// <summary>
/// Stub read loop: stamps the last-inbound time and, when pre-read bytes are
/// supplied, traces them as an inbound "PRE" operation.
/// </summary>
/// <param name="pre">Optional bytes that were read before the loop started.</param>
internal void ReadLoop(byte[]? pre)
{
    LastIn = DateTime.UtcNow;

    if (pre is null || pre.Length <= 0)
    {
        return;
    }

    TraceInOp("PRE", pre);
}
// =========================================================================
// Parser compatibility wrappers (features 2588, 2590, 2591)
// =========================================================================
/// <summary>
/// Parses protocol bytes using the shared parser state for this connection.
/// Mirrors Go client.parse.
/// </summary>
/// <param name="buf">The bytes to parse; must not be null.</param>
/// <param name="handler">The protocol handler passed through to the parser; must not be null.</param>
/// <returns>The value returned by ProtocolParser.Parse.</returns>
/// <exception cref="ArgumentNullException">Either argument is null.</exception>
internal Exception? Parse(byte[] buf, IProtocolHandler handler)
{
    ArgumentNullException.ThrowIfNull(buf);
    ArgumentNullException.ThrowIfNull(handler);

    // Refresh the parse context from the connection's current settings.
    ParseCtx.Kind = Kind;
    ParseCtx.HasHeaders = Headers;

    // Per-connection limits override the context only when they are set.
    if (_mcl > 0)
    {
        ParseCtx.MaxControlLine = _mcl;
    }

    if (_mpay != 0)
    {
        ParseCtx.MaxPayload = _mpay;
    }

    var outcome = ProtocolParser.Parse(ParseCtx, handler, buf);
    return outcome;
}
/// <summary>
/// Checks max control line enforcement for the current connection kind.
/// Mirrors Go client.overMaxControlLineLimit.
/// </summary>
/// <param name="arg">The control-line argument bytes; must not be null.</param>
/// <param name="mcl">The control-line limit forwarded to the parser.</param>
/// <param name="handler">The protocol handler passed through to the parser; must not be null.</param>
/// <returns>The value returned by ProtocolParser.OverMaxControlLineLimit.</returns>
/// <exception cref="ArgumentNullException"><paramref name="arg"/> or <paramref name="handler"/> is null.</exception>
internal Exception? OverMaxControlLineLimit(byte[] arg, int mcl, IProtocolHandler handler)
{
    ArgumentNullException.ThrowIfNull(arg);
    ArgumentNullException.ThrowIfNull(handler);

    ParseCtx.Kind = Kind;

    var outcome = ProtocolParser.OverMaxControlLineLimit(ParseCtx, handler, arg, mcl);
    return outcome;
}
/// <summary>
/// Re-processes stored pub args for split-buffer message payload handling.
/// Mirrors Go client.clonePubArg.
/// </summary>
/// <param name="handler">The protocol handler passed through to the parser; must not be null.</param>
/// <param name="lmsg">Forwarded unchanged to ProtocolParser.ClonePubArg.</param>
/// <returns>The value returned by ProtocolParser.ClonePubArg.</returns>
/// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
internal Exception? ClonePubArg(IProtocolHandler handler, bool lmsg)
{
    ArgumentNullException.ThrowIfNull(handler);

    ParseCtx.Kind = Kind;
    ParseCtx.HasHeaders = Headers;

    // Only override the payload limit when a per-connection value is set.
    if (_mpay != 0)
    {
        ParseCtx.MaxPayload = _mpay;
    }

    var outcome = ProtocolParser.ClonePubArg(ParseCtx, handler, lmsg);
    return outcome;
}
/// <summary>
/// Generates the INFO JSON bytes sent to the client on connect.
/// Stub — full implementation in session 09; currently always returns an empty buffer.
/// Mirrors Go client.generateClientInfoJSON().
/// </summary>
/// <param name="info">The server info to serialize; not used yet.</param>
/// <param name="includeClientIp">Whether to include the client IP; not used yet.</param>
/// <returns>An empty byte buffer.</returns>
internal ReadOnlyMemory<byte> GenerateClientInfoJSON(ServerInfo info, bool includeClientIp)
    => ReadOnlyMemory<byte>.Empty;
/// <summary>
/// Sets the auth-timeout timer to the specified duration.
/// Mirrors Go client.setAuthTimer(d).
/// </summary>
/// <param name="d">
/// Delay before AuthTimeout is invoked once. A non-positive value only cancels
/// any existing timer.
/// </param>
internal void SetAuthTimer(TimeSpan d)
{
    // Always cancel whatever timer was armed before.
    Interlocked.Exchange(ref _atmr, null)?.Dispose();

    if (d > TimeSpan.Zero)
    {
        // One-shot: an infinite period means the callback fires a single time.
        _atmr = new Timer(_ => AuthTimeout(), null, d, Timeout.InfiniteTimeSpan);
    }
}
// features 428-432: closedStateForErr, collapsePtoNB, flushOutbound, handleWriteTimeout, markConnAsClosed
/// <summary>
/// Maps a read error to a closed state: end-of-stream becomes ClientClosed,
/// anything else becomes ReadError.
/// </summary>
/// <param name="err">The exception observed while reading.</param>
internal static ClosedState ClosedStateForErr(Exception err)
{
    if (err is EndOfStreamException)
    {
        return ClosedState.ClientClosed;
    }

    return ClosedState.ReadError;
}
// features 440-441: processInfo, processErr
/// <summary>
/// Logs a received INFO payload at debug level. Null, empty or whitespace-only
/// input is ignored.
/// </summary>
/// <param name="info">The INFO payload text.</param>
internal void ProcessInfo(string info)
{
    if (!string.IsNullOrWhiteSpace(info))
    {
        Debugf("INFO {0}", info);
    }
}
/// <summary>
/// Handles a received -ERR: records it as the auth error (wrapped in an
/// InvalidOperationException) and logs it. Null, empty or whitespace-only input is ignored.
/// </summary>
/// <param name="err">The error text.</param>
internal void ProcessErr(string err)
{
    if (!string.IsNullOrWhiteSpace(err))
    {
        var failure = new InvalidOperationException(err);
        SetAuthError(failure);
        Errorf("-ERR {0}", err);
    }
}
// features 442-443: removeSecretsFromTrace, redact
// Delegates to ServerLogging.RemoveSecretsFromTrace (the real implementation lives there).
/// <summary>
/// Forwards to ServerLogging.RemoveSecretsFromTrace and returns its result.
/// </summary>
/// <param name="s">The trace text to process.</param>
internal static string RemoveSecretsFromTrace(string s)
{
    return ServerLogging.RemoveSecretsFromTrace(s);
}
/// <summary>
/// Returns the input unchanged; no redaction is applied by this implementation.
/// </summary>
/// <param name="s">The text to pass through.</param>
internal static string Redact(string s)
{
    return s;
}
// feature 444: computeRTT
/// <summary>
/// Returns the time elapsed between <paramref name="start"/> and the current UTC time.
/// </summary>
/// <param name="start">The moment the measurement began.</param>
internal static TimeSpan ComputeRtt(DateTime start)
{
    var now = DateTime.UtcNow;
    return now - start;
}
// feature 445: processConnect
/// <summary>
/// Processes the argument of a CONNECT: deserializes it from JSON and, when that
/// yields a non-null value, stores it as the connect options, copies the Echo and
/// Headers settings, and sets the ConnectReceived flag.
/// Null or empty input is ignored. Any exception is recorded via SetAuthError and logged.
/// </summary>
/// <param name="arg">The raw CONNECT argument bytes.</param>
internal void ProcessConnect(byte[] arg)
{
if (arg == null || arg.Length == 0)
return;
try
{
// NOTE(review): Deserialize appears here without a type argument — confirm the
// target type against the declaration of Opts.
var parsed = JsonSerializer.Deserialize(arg);
if (parsed != null)
{
// Apply all option-derived state together under the client lock.
lock (_mu)
{
Opts = parsed;
Echo = parsed.Echo;
Headers = parsed.Headers;
Flags |= ClientFlags.ConnectReceived;
}
}
}
catch (Exception ex)
{
// A failed parse is surfaced as an auth error rather than thrown to the caller.
SetAuthError(ex);
Errorf("CONNECT parse failed: {0}", ex.Message);
}
}
// feature 467-468: processPing, processPong
/// <summary>
/// Handles a received PING: resets the outstanding-ping counter and replies with PONG.
/// </summary>
internal void ProcessPing()
{
    _pingOut = 0;

    SendPong();
}
/// <summary>
/// Handles a received PONG: stores the time elapsed since the recorded RTT start
/// and resets the outstanding-ping counter.
/// </summary>
internal void ProcessPong()
{
    var elapsed = ComputeRtt(RttStart);
    Rtt = elapsed;

    _pingOut = 0;
}
// feature 469: updateS2AutoCompressionLevel
/// <summary>
/// Placeholder for adaptive compression tuning. The only effect today is to reset
/// a negative outstanding-ping counter to zero.
/// </summary>
internal void UpdateS2AutoCompressionLevel()
{
    // Placeholder for adaptive compression tuning; keep no-op semantics for now.
    if (_pingOut >= 0)
    {
        return;
    }

    _pingOut = 0;
}
// features 471-486: processPub variants, parseSub, processSub, etc.
// Implemented in full when Server+Account sessions complete.
// features 487-503: deliverMsg, addToPCD, trackRemoteReply, pruning, pubAllowed, etc.
// features 512-514: processServiceImport, addSubToRouteTargets, processMsgResults
// feature 515: checkLeafClientInfoHeader
// feature 520-522: processPingTimer, adjustPingInterval, watchForStaleConnection
// feature 534-535: swapAccountAfterReload, processSubsOnConfigReload
// feature 537: reconnect
// feature 569: setFirstPingTimer
// =========================================================================
// IsMqtt / IsWebSocket helpers (used by clientType, not separately tracked)
// =========================================================================
/// <summary>Reports whether this is an MQTT client. Always false for now.</summary>
internal bool IsMqtt()
{
    // Deferred to session 22 (MQTT).
    return false;
}

/// <summary>Reports whether this is a WebSocket client. Always false for now.</summary>
internal bool IsWebSocket()
{
    // Deferred to session 23 (WebSocket).
    return false;
}

/// <summary>Reports whether this is a hub leaf node connection. Always false for now.</summary>
internal bool IsHubLeafNode()
{
    // Deferred to session 15 (leaf nodes).
    return false;
}

/// <summary>Returns the remote cluster name. Always the empty string for now.</summary>
internal string RemoteCluster()
{
    // Deferred to sessions 14/15.
    return string.Empty;
}
}
// ============================================================================
// Private read-cache state (per-readLoop invocation)
// ============================================================================
/// <summary>
/// Mutable bag of per-read-loop cache and counter fields.
/// </summary>
// NOTE(review): the Dictionary/List fields appear here without type arguments —
// confirm the element types against the original declaration.
internal struct ReadCacheState
{
// Generation identifier — presumably used to detect stale cached results; verify against callers.
public ulong GenId;
// Cached lookup results; null until populated.
public Dictionary? Results;
// NOTE(review): name suggests a per-account cache (see PerAccountCache) — confirm.
public Dictionary? PaCache;
// NOTE(review): meaning of "Rts" is not evident from this file section — confirm.
public List? Rts;
// Counters; presumably messages, bytes and subscriptions seen by the read loop — confirm.
public int Msgs;
public int Bytes;
public int Subs;
public int Rsz; // read buffer size
public int Srs; // short reads
public ReadCacheFlags Flags;
// Timestamp field; presumably when the current read pass began — confirm.
public DateTime Start;
public TimeSpan Tst; // total stall time
}
/// <summary>
/// Holder pairing an account with a subscription index result and a generation id.
/// </summary>
internal sealed class PerAccountCache
{
/// <summary>The account this entry belongs to; may be null.</summary>
public INatsAccount? Acc { get; set; }
/// <summary>The subscription index result stored for the account; may be null.</summary>
public SubscriptionIndexResult? Results { get; set; }
/// <summary>Generation identifier — presumably used to invalidate the entry; verify against callers.</summary>
public ulong GenId { get; set; }
}
/// <summary>
/// Holder for a map, a timer and a time span.
/// NOTE(review): the name suggests remote-reply tracking — confirm against callers.
/// </summary>
internal sealed class RrTracking
{
// NOTE(review): Dictionary appears here without type arguments — confirm key/value types.
public Dictionary? RMap { get; set; }
// Timer associated with the tracking state; presumably drives pruning — confirm.
public Timer? Ptmr { get; set; }
// NOTE(review): meaning of "Lrt" is not evident from this file section — confirm.
public TimeSpan Lrt { get; set; }
}
// ============================================================================
// Server / account interfaces (stubs until sessions 09 and 11)
// ============================================================================
/// <summary>
/// Minimal server interface used by ClientConnection.
/// Full implementation in session 09 (server.go).
/// </summary>
public interface INatsServer
{
/// <summary>Returns the next client identifier.</summary>
ulong NextClientId();
/// <summary>The server options (read by ClientConnection for PingInterval and AuthTimeout).</summary>
ServerOptions Options { get; }
/// <summary>Whether tracing is enabled on the server.</summary>
bool TraceEnabled { get; }
/// <summary>Trace setting for the system account — presumably whether its traffic is traced; confirm.</summary>
bool TraceSysAcc { get; }
/// <summary>The logger exposed by the server.</summary>
ILogger Logger { get; }
/// <summary>Decrements the server's active-account count.</summary>
void DecActiveAccounts();
/// <summary>Increments the server's active-account count.</summary>
void IncActiveAccounts();
}
/// <summary>
/// Minimal account interface used by ClientConnection.
/// Full implementation in session 11 (accounts.go).
/// </summary>
public interface INatsAccount
{
/// <summary>The account name.</summary>
string Name { get; }
/// <summary>Whether the account is valid.</summary>
bool IsValid { get; }
/// <summary>Returns true when the account's total connection limit has been reached.</summary>
bool MaxTotalConnectionsReached();
/// <summary>Returns true when the account's total leaf node limit has been reached.</summary>
bool MaxTotalLeafNodesReached();
/// <summary>Adds a client to the account. NOTE(review): meaning of the int result is not visible here — confirm.</summary>
int AddClient(ClientConnection c);
/// <summary>Removes a client from the account. NOTE(review): meaning of the int result is not visible here — confirm.</summary>
int RemoveClient(ClientConnection c);
}
/// <summary>Thrown when account connection limits are exceeded.</summary>
public sealed class TooManyAccountConnectionsException : Exception
{
    // Fixed message carried by every instance.
    private const string DefaultMessage = "Too Many Account Connections";

    /// <summary>Creates the exception with its fixed message.</summary>
    public TooManyAccountConnectionsException()
        : base(DefaultMessage)
    {
    }
}
/// <summary>Thrown when an account is invalid or null.</summary>
public sealed class BadAccountException : Exception
{
    // Fixed message carried by every instance.
    private const string DefaultMessage = "Bad Account";

    /// <summary>Creates the exception with its fixed message.</summary>
    public BadAccountException()
        : base(DefaultMessage)
    {
    }
}