Improve XML documentation coverage across src modules and sync generated analysis artifacts.

This commit is contained in:
Joseph Doherty
2026-03-14 03:56:58 -04:00
parent ba0d65317a
commit 46ead5ea9f
152 changed files with 2821 additions and 11284 deletions

View File

@@ -2,8 +2,11 @@ namespace NATS.Server.Auth;
public sealed class AccountConfig
{
/// <summary>Maximum concurrent client connections allowed for this account (0 = unlimited).</summary>
public int MaxConnections { get; init; } // 0 = unlimited
/// <summary>Maximum subscriptions per client/account context (0 = unlimited).</summary>
public int MaxSubscriptions { get; init; } // 0 = unlimited
/// <summary>Default publish/subscribe permissions applied to users in this account.</summary>
public Permissions? DefaultPermissions { get; init; }
/// <summary>Service and stream exports from this account.</summary>
@@ -19,7 +22,9 @@ public sealed class AccountConfig
/// </summary>
public sealed class ExportDefinition
{
/// <summary>Service subject exported to other accounts.</summary>
public string? Service { get; init; }
/// <summary>Stream subject exported to other accounts.</summary>
public string? Stream { get; init; }
/// <summary>Optional latency tracking subject (e.g. "latency.svc.echo").</summary>
@@ -36,9 +41,14 @@ public sealed class ExportDefinition
/// </summary>
public sealed class ImportDefinition
{
/// <summary>Remote account name for imported service mappings.</summary>
public string? ServiceAccount { get; init; }
/// <summary>Remote service subject imported from <see cref="ServiceAccount"/>.</summary>
public string? ServiceSubject { get; init; }
/// <summary>Remote account name for imported stream mappings.</summary>
public string? StreamAccount { get; init; }
/// <summary>Remote stream subject imported from <see cref="StreamAccount"/>.</summary>
public string? StreamSubject { get; init; }
/// <summary>Local remapped subject for imported services/streams.</summary>
public string? To { get; init; }
}

View File

@@ -15,6 +15,9 @@ public static class AccountImportExport
/// Returns true if following service imports from <paramref name="from"/>
/// eventually leads back to <paramref name="to"/>.
/// </summary>
/// <param name="from">Starting account whose service-import edges are traversed.</param>
/// <param name="to">Target account that would indicate an import cycle if reached.</param>
/// <param name="visited">Visited account-name set used to avoid infinite graph recursion.</param>
public static bool DetectCycle(Account from, Account to, HashSet<string>? visited = null)
{
ArgumentNullException.ThrowIfNull(from);
@@ -48,6 +51,9 @@ public static class AccountImportExport
/// <summary>
/// Validates that the import is authorized and does not create a cycle.
/// </summary>
/// <param name="importingAccount">Account requesting to import a service.</param>
/// <param name="exportingAccount">Account exporting the requested service subject.</param>
/// <param name="exportSubject">Exported service subject being imported.</param>
/// <exception cref="UnauthorizedAccessException">Thrown when the importing account is not authorized.</exception>
/// <exception cref="InvalidOperationException">Thrown when the import would create a cycle.</exception>
public static void ValidateImport(Account importingAccount, Account exportingAccount, string exportSubject)

View File

@@ -2,6 +2,11 @@ namespace NATS.Server.Auth;
public interface IExternalAuthClient
{
/// <summary>
/// Requests an allow/deny decision from an external authentication provider.
/// </summary>
/// <param name="request">Credential material and identity hints from the client connection.</param>
/// <param name="ct">Cancellation token bound to auth timeout and connection lifecycle.</param>
Task<ExternalAuthDecision> AuthorizeAsync(ExternalAuthRequest request, CancellationToken ct);
}
@@ -19,14 +24,36 @@ public record ExternalAuthDecision(
public sealed class ExternalAuthOptions
{
/// <summary>
/// Gets or sets a value indicating whether external auth callouts are enabled.
/// </summary>
public bool Enabled { get; set; }
/// <summary>
/// Gets or sets the timeout budget for each external auth decision request.
/// </summary>
public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(2);
/// <summary>
/// Gets or sets the client implementation responsible for external auth decisions.
/// </summary>
public IExternalAuthClient? Client { get; set; }
}
public sealed class ProxyAuthOptions
{
/// <summary>
/// Gets or sets a value indicating whether trusted-proxy authentication mode is enabled.
/// </summary>
public bool Enabled { get; set; }
/// <summary>
/// Gets or sets the required username prefix marking identities provided by a trusted proxy.
/// </summary>
public string UsernamePrefix { get; set; } = "proxy:";
/// <summary>
/// Gets or sets the default account to assign when proxy-authenticated users omit one.
/// </summary>
public string? Account { get; set; }
}

View File

@@ -2,10 +2,33 @@ namespace NATS.Server.Auth;
public sealed class AuthResult
{
/// <summary>
/// Gets the resolved client identity that successfully authenticated.
/// </summary>
public required string Identity { get; init; }
/// <summary>
/// Gets the account name assigned to the authenticated identity.
/// </summary>
public string? AccountName { get; init; }
/// <summary>
/// Gets effective publish/subscribe permissions applied to the connection.
/// </summary>
public Permissions? Permissions { get; init; }
/// <summary>
/// Gets the credential expiry timestamp after which the connection should be considered invalid.
/// </summary>
public DateTimeOffset? Expiry { get; init; }
/// <summary>
/// Gets the maximum number of JetStream streams permitted for this identity.
/// </summary>
public int MaxJetStreamStreams { get; init; }
/// <summary>
/// Gets the JetStream tier assigned for quota enforcement.
/// </summary>
public string? JetStreamTier { get; init; }
}

View File

@@ -15,7 +15,14 @@ public sealed class AuthService
private readonly string? _noAuthUser;
private readonly Dictionary<string, User>? _usersMap;
/// <summary>
/// Gets a value indicating whether any authentication mechanism is configured.
/// </summary>
public bool IsAuthRequired { get; }
/// <summary>
/// Gets a value indicating whether the protocol must issue a nonce challenge.
/// </summary>
public bool NonceRequired { get; }
private AuthService(List<IAuthenticator> authenticators, bool authRequired, bool nonceRequired,
@@ -28,6 +35,10 @@ public sealed class AuthService
_usersMap = usersMap;
}
/// <summary>
/// Builds an authentication service from server options and configured auth sources.
/// </summary>
/// <param name="options">Server options containing static users, tokens, NKeys, and auth extensions.</param>
public static AuthService Build(NatsOptions options)
{
var authenticators = new List<IAuthenticator>();
@@ -97,6 +108,10 @@ public sealed class AuthService
return new AuthService(authenticators, authRequired, nonceRequired, options.NoAuthUser, usersMap);
}
/// <summary>
/// Attempts to authenticate a client CONNECT context against configured authenticators.
/// </summary>
/// <param name="context">Client auth context extracted from CONNECT and transport metadata.</param>
public AuthResult? Authenticate(ClientAuthContext context)
{
if (!IsAuthRequired)
@@ -145,6 +160,9 @@ public sealed class AuthService
return new AuthResult { Identity = _noAuthUser };
}
/// <summary>
/// Generates cryptographically strong nonce bytes for NKey/JWT signature challenges.
/// </summary>
public byte[] GenerateNonce()
{
Span<byte> raw = stackalloc byte[11];
@@ -152,6 +170,13 @@ public sealed class AuthService
return raw.ToArray();
}
/// <summary>
/// Validates MQTT username/password fields against configured MQTT auth settings.
/// </summary>
/// <param name="configuredUsername">Username configured on the server for MQTT auth.</param>
/// <param name="configuredPassword">Password configured on the server for MQTT auth.</param>
/// <param name="providedUsername">Username supplied by the connecting MQTT client.</param>
/// <param name="providedPassword">Password supplied by the connecting MQTT client.</param>
public static bool ValidateMqttCredentials(
string? configuredUsername,
string? configuredPassword,
@@ -165,6 +190,10 @@ public sealed class AuthService
&& string.Equals(configuredPassword, providedPassword, StringComparison.Ordinal);
}
/// <summary>
/// Encodes nonce bytes into URL-safe base64 format used by NATS auth challenges.
/// </summary>
/// <param name="nonce">Raw nonce bytes generated for the challenge.</param>
public string EncodeNonce(byte[] nonce)
{
return Convert.ToBase64String(nonce)

View File

@@ -16,6 +16,10 @@ public sealed class ClientPermissions : IDisposable
_responseTracker = responseTracker;
}
/// <summary>
/// Builds a runtime client-permissions evaluator from account/user permission config.
/// </summary>
/// <param name="permissions">Permission configuration from auth claims or static config.</param>
public static ClientPermissions? Build(Permissions? permissions)
{
if (permissions == null)
@@ -33,8 +37,11 @@ public sealed class ClientPermissions : IDisposable
return new ClientPermissions(pub, sub, responseTracker);
}
/// <summary>Optional tracker used to authorize dynamic response subjects.</summary>
public ResponseTracker? ResponseTracker => _responseTracker;
/// <summary>Determines whether publishing to the given subject is permitted.</summary>
/// <param name="subject">Publish subject being authorized for the client.</param>
public bool IsPublishAllowed(string subject)
{
if (_publish == null)
@@ -56,6 +63,9 @@ public sealed class ClientPermissions : IDisposable
return allowed;
}
/// <summary>Determines whether subscribing to the given subject/queue is permitted.</summary>
/// <param name="subject">Subscription subject being authorized.</param>
/// <param name="queue">Optional queue group name for queue-subscription checks.</param>
public bool IsSubscribeAllowed(string subject, string? queue = null)
{
if (_subscribe == null)
@@ -67,6 +77,8 @@ public sealed class ClientPermissions : IDisposable
return true;
}
/// <summary>Determines whether delivering a message on the subject is permitted.</summary>
/// <param name="subject">Delivery subject evaluated against deny rules.</param>
public bool IsDeliveryAllowed(string subject)
{
if (_subscribe == null)
@@ -74,6 +86,7 @@ public sealed class ClientPermissions : IDisposable
return _subscribe.IsDeliveryAllowed(subject);
}
/// <summary>Disposes permission resources used by this evaluator.</summary>
public void Dispose()
{
_publish?.Dispose();
@@ -92,6 +105,10 @@ public sealed class PermissionSet : IDisposable
_deny = deny;
}
/// <summary>
/// Builds allow/deny sublists from a subject-permission definition.
/// </summary>
/// <param name="permission">Allow/deny subject rules.</param>
public static PermissionSet? Build(SubjectPermission? permission)
{
if (permission == null)
@@ -123,6 +140,8 @@ public sealed class PermissionSet : IDisposable
return new PermissionSet(allow, deny);
}
/// <summary>Checks whether a subject passes allow/deny evaluation.</summary>
/// <param name="subject">Subject candidate to evaluate against allow and deny lists.</param>
public bool IsAllowed(string subject)
{
bool allowed = true;
@@ -142,6 +161,8 @@ public sealed class PermissionSet : IDisposable
return allowed;
}
/// <summary>Checks whether a subject is explicitly denied.</summary>
/// <param name="subject">Subject candidate evaluated against deny entries.</param>
public bool IsDenied(string subject)
{
if (_deny == null) return false;
@@ -149,6 +170,8 @@ public sealed class PermissionSet : IDisposable
return result.PlainSubs.Length > 0 || result.QueueSubs.Length > 0;
}
/// <summary>Checks delivery permission using deny-list semantics.</summary>
/// <param name="subject">Subject being delivered to a subscriber.</param>
public bool IsDeliveryAllowed(string subject)
{
if (_deny == null)
@@ -157,6 +180,7 @@ public sealed class PermissionSet : IDisposable
return result.PlainSubs.Length == 0 && result.QueueSubs.Length == 0;
}
/// <summary>Disposes internal allow/deny sublists.</summary>
public void Dispose()
{
_allow?.Dispose();

View File

@@ -6,13 +6,28 @@ namespace NATS.Server.Auth;
public interface IAuthenticator
{
/// <summary>
/// Attempts to authenticate a client connection.
/// </summary>
/// <param name="context">Authentication context containing credentials and transport metadata.</param>
AuthResult? Authenticate(ClientAuthContext context);
}
public sealed class ClientAuthContext
{
/// <summary>
/// Gets CONNECT options and credential fields supplied by the client.
/// </summary>
public required ClientOptions Opts { get; init; }
/// <summary>
/// Gets server-issued nonce bytes used for signature-based auth flows.
/// </summary>
public required byte[] Nonce { get; init; }
/// <summary>
/// Gets the client TLS certificate presented during handshake, when available.
/// </summary>
public X509Certificate2? ClientCertificate { get; init; }
/// <summary>

View File

@@ -17,12 +17,15 @@ public interface IAccountResolver
/// Fetches the JWT for the given account NKey. Returns <c>null</c> when
/// the NKey is not known to this resolver.
/// </summary>
/// <param name="accountNkey">Account public NKey used as resolver lookup key.</param>
Task<string?> FetchAsync(string accountNkey);
/// <summary>
/// Stores (or replaces) the JWT for the given account NKey. Callers that
/// target a read-only resolver should check <see cref="IsReadOnly"/> first.
/// </summary>
/// <param name="accountNkey">Account public NKey used as resolver storage key.</param>
/// <param name="jwt">Account JWT content associated with the key.</param>
Task StoreAsync(string accountNkey, string jwt);
/// <summary>

View File

@@ -19,6 +19,7 @@ public static class NatsJwt
/// <summary>
/// Returns true if the string appears to be a JWT (starts with "eyJ").
/// </summary>
/// <param name="token">Token string to inspect.</param>
public static bool IsJwt(string token)
{
return !string.IsNullOrEmpty(token) && token.StartsWith(JwtPrefix, StringComparison.Ordinal);
@@ -28,6 +29,7 @@ public static class NatsJwt
/// Decodes a JWT token into its constituent parts without verifying the signature.
/// Returns null if the token is structurally invalid.
/// </summary>
/// <param name="token">JWT string in header.payload.signature format.</param>
public static JwtToken? Decode(string token)
{
if (string.IsNullOrEmpty(token))
@@ -68,6 +70,7 @@ public static class NatsJwt
/// Decodes a JWT token and deserializes the payload as <see cref="UserClaims"/>.
/// Returns null if the token is structurally invalid or cannot be deserialized.
/// </summary>
/// <param name="token">JWT string to decode.</param>
public static UserClaims? DecodeUserClaims(string token)
{
var jwt = Decode(token);
@@ -88,6 +91,7 @@ public static class NatsJwt
/// Decodes a JWT token and deserializes the payload as <see cref="AccountClaims"/>.
/// Returns null if the token is structurally invalid or cannot be deserialized.
/// </summary>
/// <param name="token">JWT string to decode.</param>
public static AccountClaims? DecodeAccountClaims(string token)
{
var jwt = Decode(token);
@@ -107,6 +111,8 @@ public static class NatsJwt
/// <summary>
/// Verifies the Ed25519 signature on a JWT token against the given NKey public key.
/// </summary>
/// <param name="token">JWT string to verify.</param>
/// <param name="publicNkey">Expected signer public NKey.</param>
public static bool Verify(string token, string publicNkey)
{
try
@@ -129,6 +135,9 @@ public static class NatsJwt
/// Verifies a nonce signature against the given NKey public key.
/// Tries base64url decoding first, then falls back to standard base64 (Go compatibility).
/// </summary>
/// <param name="nonce">Raw nonce bytes originally issued by the server.</param>
/// <param name="signature">Signature string provided by the client.</param>
/// <param name="publicNkey">Client public NKey used for verification.</param>
public static bool VerifyNonce(byte[] nonce, string signature, string publicNkey)
{
try
@@ -150,6 +159,7 @@ public static class NatsJwt
/// Decodes a base64url-encoded byte array.
/// Replaces URL-safe characters and adds padding as needed.
/// </summary>
/// <param name="input">Base64url-encoded string.</param>
internal static byte[] Base64UrlDecode(string input)
{
var s = input.Replace('-', '+').Replace('_', '/');
@@ -214,8 +224,10 @@ public sealed class JwtToken
public sealed class JwtHeader
{
[System.Text.Json.Serialization.JsonPropertyName("alg")]
/// <summary>JWT signing algorithm identifier (typically <c>ed25519-nkey</c> for NATS).</summary>
public string? Algorithm { get; set; }
[System.Text.Json.Serialization.JsonPropertyName("typ")]
/// <summary>JWT type marker (typically <c>JWT</c>).</summary>
public string? Type { get; set; }
}

View File

@@ -2,11 +2,38 @@ namespace NATS.Server.Auth;
public sealed class NKeyUser
{
/// <summary>
/// Gets the public NKey used for challenge-signature authentication.
/// </summary>
public required string Nkey { get; init; }
/// <summary>
/// Gets publish/subscribe permission rules assigned to this NKey identity.
/// </summary>
public Permissions? Permissions { get; init; }
/// <summary>
/// Gets the account this NKey user is bound to.
/// </summary>
public string? Account { get; init; }
/// <summary>
/// Gets an optional signing key used for delegated user JWT issuance.
/// </summary>
public string? SigningKey { get; init; }
/// <summary>
/// Gets the issuance timestamp associated with this identity claim.
/// </summary>
public DateTimeOffset? Issued { get; init; }
/// <summary>
/// Gets optional connection-type restrictions for this identity.
/// </summary>
public IReadOnlySet<string>? AllowedConnectionTypes { get; init; }
/// <summary>
/// Gets a value indicating whether this identity must be presented through proxy auth.
/// </summary>
public bool ProxyRequired { get; init; }
}

View File

@@ -18,6 +18,10 @@ public sealed class PermissionLruCache
private long _generation;
private long _cacheGeneration;
/// <summary>
/// Creates a fixed-capacity permission LRU cache.
/// </summary>
/// <param name="capacity">Maximum number of cached permission decisions.</param>
public PermissionLruCache(int capacity = 128)
{
_capacity = capacity;
@@ -51,6 +55,8 @@ public sealed class PermissionLruCache
// ── PUB API (backward-compatible) ────────────────────────────────────────
/// <summary>Looks up a PUB permission for <paramref name="key"/>.</summary>
/// <param name="key">Publish subject cache key.</param>
/// <param name="value">Cached allow/deny decision when present.</param>
public bool TryGet(string key, out bool value)
{
var internalKey = "P:" + key;
@@ -71,6 +77,8 @@ public sealed class PermissionLruCache
}
/// <summary>Stores a PUB permission for <paramref name="key"/>.</summary>
/// <param name="key">Publish subject cache key.</param>
/// <param name="value">Allow/deny decision to cache.</param>
public void Set(string key, bool value)
{
var internalKey = "P:" + key;
@@ -84,6 +92,8 @@ public sealed class PermissionLruCache
// ── SUB API ───────────────────────────────────────────────────────────────
/// <summary>Looks up a SUB permission for <paramref name="subject"/>.</summary>
/// <param name="subject">Subscribe subject cache key.</param>
/// <param name="value">Cached allow/deny decision when present.</param>
public bool TryGetSub(string subject, out bool value)
{
var internalKey = "S:" + subject;
@@ -104,6 +114,8 @@ public sealed class PermissionLruCache
}
/// <summary>Stores a SUB permission for <paramref name="subject"/>.</summary>
/// <param name="subject">Subscribe subject cache key.</param>
/// <param name="allowed">Allow/deny decision to cache.</param>
public void SetSub(string subject, bool allowed)
{
var internalKey = "S:" + subject;
@@ -116,6 +128,7 @@ public sealed class PermissionLruCache
// ── Shared ────────────────────────────────────────────────────────────────
/// <summary>Current number of cached entries.</summary>
public int Count
{
get

View File

@@ -2,19 +2,44 @@ namespace NATS.Server.Auth;
public sealed class Permissions
{
/// <summary>
/// Gets publish-side allow/deny subject rules.
/// </summary>
public SubjectPermission? Publish { get; init; }
/// <summary>
/// Gets subscribe-side allow/deny subject rules.
/// </summary>
public SubjectPermission? Subscribe { get; init; }
/// <summary>
/// Gets dynamic reply-publish permissions granted to request responders.
/// </summary>
public ResponsePermission? Response { get; init; }
}
public sealed class SubjectPermission
{
/// <summary>
/// Gets subject patterns explicitly permitted for the operation.
/// </summary>
public IReadOnlyList<string>? Allow { get; init; }
/// <summary>
/// Gets subject patterns explicitly denied for the operation.
/// </summary>
public IReadOnlyList<string>? Deny { get; init; }
}
public sealed class ResponsePermission
{
/// <summary>
/// Gets the maximum number of response messages allowed on auto-generated reply subjects.
/// </summary>
public int MaxMsgs { get; init; }
/// <summary>
/// Gets the expiration window for temporary response permissions.
/// </summary>
public TimeSpan Expires { get; init; }
}

View File

@@ -11,17 +11,29 @@ public sealed class ResponseTracker
private readonly Dictionary<string, (DateTime RegisteredAt, int Count)> _replies = new(StringComparer.Ordinal);
private readonly object _lock = new();
/// <summary>
/// Creates a tracker for temporary response-subject permissions.
/// </summary>
/// <param name="maxMsgs">Maximum allowed publishes per reply subject (0 for unlimited).</param>
/// <param name="expires">TTL for each registered reply subject (<see cref="TimeSpan.Zero"/> for no TTL).</param>
public ResponseTracker(int maxMsgs, TimeSpan expires)
{
_maxMsgs = maxMsgs;
_expires = expires;
}
/// <summary>
/// Gets the number of currently tracked reply subjects.
/// </summary>
public int Count
{
get { lock (_lock) return _replies.Count; }
}
/// <summary>
/// Registers a reply subject for temporary publish authorization.
/// </summary>
/// <param name="replySubject">Reply subject allowed for responder publishes.</param>
public void RegisterReply(string replySubject)
{
lock (_lock)
@@ -30,6 +42,10 @@ public sealed class ResponseTracker
}
}
/// <summary>
/// Determines whether a publish to the reply subject is currently allowed.
/// </summary>
/// <param name="subject">Reply subject being authorized.</param>
public bool IsReplyAllowed(string subject)
{
lock (_lock)
@@ -55,6 +71,9 @@ public sealed class ResponseTracker
}
}
/// <summary>
/// Removes expired or exhausted reply permissions from the tracker.
/// </summary>
public void Prune()
{
lock (_lock)

View File

@@ -11,12 +11,17 @@ public sealed class ServiceLatencyTracker
private readonly int _maxSamples;
private long _totalRequests;
/// <summary>
/// Creates a latency tracker with a bounded in-memory sample window.
/// </summary>
/// <param name="maxSamples">Maximum number of latency samples retained for percentile calculations.</param>
public ServiceLatencyTracker(int maxSamples = 10000)
{
_maxSamples = maxSamples;
}
/// <summary>Records a latency sample in milliseconds.</summary>
/// <param name="latencyMs">Observed end-to-end service latency in milliseconds.</param>
public void RecordLatency(double latencyMs)
{
lock (_lock)
@@ -28,11 +33,15 @@ public sealed class ServiceLatencyTracker
}
}
/// <summary>Returns the 50th percentile (median) latency in milliseconds.</summary>
public double GetP50() => GetPercentile(0.50);
/// <summary>Returns the 90th percentile latency in milliseconds.</summary>
public double GetP90() => GetPercentile(0.90);
/// <summary>Returns the 99th percentile latency in milliseconds.</summary>
public double GetP99() => GetPercentile(0.99);
/// <summary>Returns the value at the given percentile (0.01.0) over recorded samples.</summary>
/// <param name="percentile">Percentile fraction between 0.0 and 1.0.</param>
public double GetPercentile(double percentile)
{
lock (_lock)
@@ -61,16 +70,19 @@ public sealed class ServiceLatencyTracker
return sum / samples.Count;
}
/// <summary>Total number of latency observations recorded.</summary>
public long TotalRequests
{
get { lock (_lock) return _totalRequests; }
}
/// <summary>Arithmetic mean latency across currently retained samples.</summary>
public double AverageLatencyMs
{
get { lock (_lock) return ComputeAverage(_samples); }
}
/// <summary>Minimum latency among currently retained samples.</summary>
public double MinLatencyMs
{
get
@@ -80,6 +92,7 @@ public sealed class ServiceLatencyTracker
}
}
/// <summary>Maximum latency among currently retained samples.</summary>
public double MaxLatencyMs
{
get
@@ -89,6 +102,7 @@ public sealed class ServiceLatencyTracker
}
}
/// <summary>Number of samples currently retained in memory.</summary>
public int SampleCount
{
get { lock (_lock) return _samples.Count; }

View File

@@ -11,6 +11,10 @@ public sealed class TlsMapAuthenticator : IAuthenticator
private readonly Dictionary<string, User> _usersByDn;
private readonly Dictionary<string, User> _usersByCn;
/// <summary>
/// Creates a TLS-map authenticator using configured users keyed by DN/CN-style identities.
/// </summary>
/// <param name="users">Configured users used for DN/CN lookup matches.</param>
public TlsMapAuthenticator(IReadOnlyList<User> users)
{
_usersByDn = new Dictionary<string, User>(StringComparer.OrdinalIgnoreCase);
@@ -22,6 +26,10 @@ public sealed class TlsMapAuthenticator : IAuthenticator
}
}
/// <summary>
/// Authenticates a client by matching certificate subject/SAN data to configured users.
/// </summary>
/// <param name="context">Authentication context containing the client TLS certificate.</param>
public AuthResult? Authenticate(ClientAuthContext context)
{
var cert = context.ClientCertificate;
@@ -65,6 +73,10 @@ public sealed class TlsMapAuthenticator : IAuthenticator
return null;
}
/// <summary>
/// Extracts domain-component RDN elements from a distinguished name.
/// </summary>
/// <param name="dn">Distinguished name to inspect for <c>DC=</c> elements.</param>
internal static string GetTlsAuthDcs(X500DistinguishedName dn)
{
if (string.IsNullOrWhiteSpace(dn.Name))
@@ -82,6 +94,10 @@ public sealed class TlsMapAuthenticator : IAuthenticator
return string.Join(",", dcs);
}
/// <summary>
/// Splits a DNS alternative-name value into normalized lowercase labels.
/// </summary>
/// <param name="dnsAltName">DNS SAN value from a certificate.</param>
internal static string[] DnsAltNameLabels(string dnsAltName)
{
if (string.IsNullOrWhiteSpace(dnsAltName))
@@ -90,6 +106,11 @@ public sealed class TlsMapAuthenticator : IAuthenticator
return dnsAltName.ToLowerInvariant().Split('.', StringSplitOptions.RemoveEmptyEntries);
}
/// <summary>
/// Determines whether SAN DNS labels match any URL host in the provided list.
/// </summary>
/// <param name="dnsAltNameLabels">Normalized SAN label sequence (supports wildcard first label).</param>
/// <param name="urls">Candidate URLs whose hosts are compared against SAN labels.</param>
internal static bool DnsAltNameMatches(string[] dnsAltNameLabels, IReadOnlyList<Uri?> urls)
{
foreach (var url in urls)

View File

@@ -2,11 +2,38 @@ namespace NATS.Server.Auth;
public sealed class User
{
/// <summary>
/// Gets the username used for CONNECT credential authentication.
/// </summary>
public required string Username { get; init; }
/// <summary>
/// Gets the password associated with <see cref="Username"/>.
/// </summary>
public required string Password { get; init; }
/// <summary>
/// Gets publish/subscribe permission rules assigned to this user.
/// </summary>
public Permissions? Permissions { get; init; }
/// <summary>
/// Gets the account this user is bound to for subject and subscription isolation.
/// </summary>
public string? Account { get; init; }
/// <summary>
/// Gets an optional cutoff timestamp after which new connections are rejected.
/// </summary>
public DateTimeOffset? ConnectionDeadline { get; init; }
/// <summary>
/// Gets optional connection-type restrictions (client, route, gateway, leaf, and so on).
/// </summary>
public IReadOnlySet<string>? AllowedConnectionTypes { get; init; }
/// <summary>
/// Gets a value indicating whether this identity must authenticate through trusted proxy headers.
/// </summary>
public bool ProxyRequired { get; init; }
}

View File

@@ -25,16 +25,28 @@ public sealed class ClientFlagHolder
{
private int _flags;
/// <summary>
/// Atomically sets the specified client state flag.
/// </summary>
/// <param name="flag">Flag to set.</param>
public void SetFlag(ClientFlags flag)
{
Interlocked.Or(ref _flags, (int)flag);
}
/// <summary>
/// Atomically clears the specified client state flag.
/// </summary>
/// <param name="flag">Flag to clear.</param>
public void ClearFlag(ClientFlags flag)
{
Interlocked.And(ref _flags, ~(int)flag);
}
/// <summary>
/// Checks whether the specified client state flag is currently set.
/// </summary>
/// <param name="flag">Flag to test.</param>
public bool HasFlag(ClientFlags flag)
{
return (Volatile.Read(ref _flags) & (int)flag) != 0;

View File

@@ -29,6 +29,9 @@ public sealed class ClientTraceInfo
/// Records a message delivery trace if tracing is enabled.
/// Go reference: server/client.go — traceMsg / TraceMsgDelivery.
/// </summary>
/// <param name="subject">Published subject that triggered this delivery path.</param>
/// <param name="destination">Destination descriptor such as a client, queue group, or route hop.</param>
/// <param name="payloadSize">Payload size in bytes used for throughput and fan-out diagnostics.</param>
public void TraceMsgDelivery(string subject, string destination, int payloadSize)
{
if (!TraceEnabled) return;
@@ -50,6 +53,8 @@ public sealed class ClientTraceInfo
/// subscriptions on the same client.
/// Go reference: server/client.go — c.echo check in deliverMsg.
/// </summary>
/// <param name="publisherClientId">Client identifier that originated the publish.</param>
/// <param name="subscriberClientId">Client identifier for the subscription currently being evaluated.</param>
public bool ShouldEcho(string publisherClientId, string subscriberClientId)
{
if (EchoEnabled) return true;
@@ -76,8 +81,23 @@ public sealed class ClientTraceInfo
public sealed record TraceRecord
{
/// <summary>
/// Gets the routed subject for the traced delivery event.
/// </summary>
public string Subject { get; init; } = string.Empty;
/// <summary>
/// Gets the resolved destination where the server sent the message.
/// </summary>
public string Destination { get; init; } = string.Empty;
/// <summary>
/// Gets the payload size in bytes for the traced message.
/// </summary>
public int PayloadSize { get; init; }
/// <summary>
/// Gets the UTC timestamp captured when the trace event was recorded.
/// </summary>
public DateTime TimestampUtc { get; init; }
}

View File

@@ -1,15 +1,48 @@
namespace NATS.Server.Configuration;
/// <summary>
/// Cluster listener and route fan-out settings used for server-to-server mesh links.
/// </summary>
public sealed class ClusterOptions
{
/// <summary>
/// Gets or sets the local cluster name advertised during route handshakes.
/// </summary>
public string? Name { get; set; }
/// <summary>
/// Gets or sets the network interface used to accept inbound route connections.
/// </summary>
public string Host { get; set; } = "0.0.0.0";
/// <summary>
/// Gets or sets the TCP port for the cluster route listener.
/// </summary>
public int Port { get; set; } = 6222;
/// <summary>
/// Gets or sets the number of parallel route connections maintained per remote server.
/// </summary>
public int PoolSize { get; set; } = 3;
/// <summary>
/// Gets or sets the configured outbound route URLs used to join peer servers.
/// </summary>
public List<string> Routes { get; set; } = [];
/// <summary>
/// Gets or sets account names that should use dedicated route handling.
/// </summary>
public List<string> Accounts { get; set; } = [];
/// <summary>
/// Gets or sets compression behavior for inter-server route traffic.
/// </summary>
public RouteCompression Compression { get; set; } = RouteCompression.None;
// Go: opts.go — cluster write_deadline
/// <summary>
/// Gets or sets the write deadline enforced for route protocol socket operations.
/// </summary>
public TimeSpan WriteDeadline { get; set; }
}

View File

@@ -19,6 +19,7 @@ public static class ConfigProcessor
/// <summary>
/// Parses a configuration file and returns the populated options.
/// </summary>
/// <param name="filePath">Absolute or relative path to the NATS configuration file to load.</param>
public static NatsOptions ProcessConfigFile(string filePath)
{
var config = NatsConfParser.ParseFile(filePath);
@@ -30,6 +31,7 @@ public static class ConfigProcessor
/// <summary>
/// Parses configuration text (not from a file) and returns the populated options.
/// </summary>
/// <param name="configText">Raw configuration text in NATS server config format.</param>
public static NatsOptions ProcessConfig(string configText)
{
var config = NatsConfParser.Parse(configText);
@@ -42,6 +44,8 @@ public static class ConfigProcessor
/// Applies a parsed configuration dictionary to existing options.
/// Throws <see cref="ConfigProcessorException"/> if any validation errors are collected.
/// </summary>
/// <param name="config">Parsed config tree keyed by top-level field names.</param>
/// <param name="opts">Options instance that receives normalized values from the parsed config.</param>
public static void ApplyConfig(Dictionary<string, object?> config, NatsOptions opts)
{
var errors = new List<string>();
@@ -423,6 +427,7 @@ public static class ConfigProcessor
/// <item>A number (long/double) treated as seconds</item>
/// </list>
/// </summary>
/// <param name="value">Raw duration token from configuration (string or numeric seconds).</param>
internal static TimeSpan ParseDuration(object? value)
{
return value switch
@@ -1877,7 +1882,14 @@ public static class ConfigProcessor
public sealed class ConfigProcessorException(string message, List<string> errors, List<string>? warnings = null)
: Exception(message)
{
/// <summary>
/// Gets the list of blocking configuration errors that prevented startup.
/// </summary>
public IReadOnlyList<string> Errors => errors;
/// <summary>
/// Gets non-fatal configuration warnings collected during processing.
/// </summary>
public IReadOnlyList<string> Warnings => warnings ?? [];
}
@@ -1887,6 +1899,9 @@ public sealed class ConfigProcessorException(string message, List<string> errors
/// </summary>
public class ConfigWarningException(string message, string? source = null) : Exception(message)
{
/// <summary>
/// Gets the location within the source config where this warning originated, when available.
/// </summary>
public string? SourceLocation { get; } = source;
}
@@ -1897,5 +1912,8 @@ public class ConfigWarningException(string message, string? source = null) : Exc
public sealed class UnknownConfigFieldWarning(string field, string? source = null)
: ConfigWarningException($"unknown field {field}", source)
{
/// <summary>
/// Gets the unknown top-level or nested field name encountered in the configuration file.
/// </summary>
public string Field { get; } = field;
}

View File

@@ -810,6 +810,11 @@ public sealed class ConfigReloadResult
/// <summary>
/// Initializes a config reload result payload.
/// </summary>
/// <param name="Unchanged">Whether reload was skipped because the config digest was unchanged.</param>
/// <param name="NewOptions">Newly parsed options candidate for applying a reload.</param>
/// <param name="NewDigest">Digest string of the candidate config content.</param>
/// <param name="Changes">Detected option differences for this reload attempt.</param>
/// <param name="Errors">Validation errors that block applying the reload.</param>
public ConfigReloadResult(
bool Unchanged,
NatsOptions? NewOptions = null,

View File

@@ -46,9 +46,28 @@ public sealed class ConfigChange(
bool isTlsChange = false,
bool isNonReloadable = false) : IConfigChange
{
/// <summary>
/// Gets the changed option name.
/// </summary>
public string Name => name;
/// <summary>
/// Gets a value indicating whether this change affects logging configuration.
/// </summary>
public bool IsLoggingChange => isLoggingChange;
/// <summary>
/// Gets a value indicating whether this change affects authentication configuration.
/// </summary>
public bool IsAuthChange => isAuthChange;
/// <summary>
/// Gets a value indicating whether this change affects TLS configuration.
/// </summary>
public bool IsTlsChange => isTlsChange;
/// <summary>
/// Gets a value indicating whether this change cannot be applied without restart.
/// </summary>
public bool IsNonReloadable => isNonReloadable;
}

View File

@@ -28,6 +28,7 @@ public static class NatsConfParser
/// <summary>
/// Parses a NATS configuration string into a dictionary.
/// </summary>
/// <param name="data">Raw configuration text.</param>
public static Dictionary<string, object?> Parse(string data)
{
var tokens = NatsConfLexer.Tokenize(data);
@@ -40,11 +41,13 @@ public static class NatsConfParser
/// Pedantic compatibility API (Go: ParseWithChecks).
/// Uses the same parser behavior as <see cref="Parse(string)"/>.
/// </summary>
/// <param name="data">Raw configuration text.</param>
public static Dictionary<string, object?> ParseWithChecks(string data) => Parse(data);
/// <summary>
/// Parses a NATS configuration file into a dictionary.
/// </summary>
/// <param name="filePath">Path to the configuration file.</param>
public static Dictionary<string, object?> ParseFile(string filePath) =>
ParseFile(filePath, includeDepth: 0);
@@ -52,6 +55,7 @@ public static class NatsConfParser
/// Pedantic compatibility API (Go: ParseFileWithChecks).
/// Uses the same parser behavior as <see cref="ParseFile(string)"/>.
/// </summary>
/// <param name="filePath">Path to the configuration file.</param>
public static Dictionary<string, object?> ParseFileWithChecks(string filePath) => ParseFile(filePath);
private static Dictionary<string, object?> ParseFile(string filePath, int includeDepth)
@@ -68,6 +72,7 @@ public static class NatsConfParser
/// Parses a NATS configuration file and returns the parsed config plus a
/// SHA-256 digest of the raw file content formatted as "sha256:&lt;hex&gt;".
/// </summary>
/// <param name="filePath">Path to the configuration file.</param>
public static (Dictionary<string, object?> Config, string Digest) ParseFileWithDigest(string filePath)
{
var rawBytes = File.ReadAllBytes(filePath);
@@ -85,6 +90,7 @@ public static class NatsConfParser
/// <summary>
/// Pedantic compatibility API (Go: ParseFileWithChecksDigest).
/// </summary>
/// <param name="filePath">Path to the configuration file.</param>
public static (Dictionary<string, object?> Config, string Digest) ParseFileWithChecksDigest(string filePath)
{
var data = File.ReadAllText(filePath);
@@ -204,13 +210,26 @@ public static class NatsConfParser
// Pedantic-mode key token stack (Go parser field: ikeys).
private readonly List<Token> _itemKeys = new(4);
/// <summary>Root parsed mapping for the current parser execution.</summary>
public Dictionary<string, object?> Mapping { get; } = new(StringComparer.OrdinalIgnoreCase);
/// <summary>
/// Creates parser state for tokenized config input.
/// </summary>
/// <param name="tokens">Token stream from the config lexer.</param>
/// <param name="baseDir">Base directory used to resolve include paths.</param>
public ParserState(IReadOnlyList<Token> tokens, string baseDir)
: this(tokens, baseDir, [], includeDepth: 0)
{
}
/// <summary>
/// Creates parser state with explicit env-reference tracking and include depth.
/// </summary>
/// <param name="tokens">Token stream from the config lexer.</param>
/// <param name="baseDir">Base directory used to resolve include paths.</param>
/// <param name="envVarReferences">Shared environment-variable recursion guard set.</param>
/// <param name="includeDepth">Current include nesting depth.</param>
public ParserState(IReadOnlyList<Token> tokens, string baseDir, HashSet<string> envVarReferences, int includeDepth)
{
_tokens = tokens;
@@ -219,6 +238,9 @@ public static class NatsConfParser
_includeDepth = includeDepth;
}
/// <summary>
/// Executes the parse loop and builds <see cref="Mapping"/>.
/// </summary>
public void Run()
{
PushContext(Mapping);

View File

@@ -36,6 +36,13 @@ public sealed class PedanticToken
private readonly bool _usedVariable;
private readonly string _sourceFile;
/// <summary>
/// Creates a parser token wrapper that preserves resolved value and source metadata.
/// </summary>
/// <param name="item">Raw lexer token captured from the configuration source.</param>
/// <param name="value">Optional parsed value override when token text has been normalized.</param>
/// <param name="usedVariable">Indicates whether this token originated from variable substitution.</param>
/// <param name="sourceFile">Source file path associated with this token, when available.</param>
public PedanticToken(Token item, object? value = null, bool usedVariable = false, string sourceFile = "")
{
_item = item;
@@ -44,15 +51,33 @@ public sealed class PedanticToken
_sourceFile = sourceFile ?? string.Empty;
}
/// <summary>
/// Serializes the token value into JSON, matching Go parser diagnostics formatting.
/// </summary>
public string MarshalJson() => JsonSerializer.Serialize(Value());
/// <summary>
/// Returns the resolved token value, or raw token text when no typed value is stored.
/// </summary>
public object? Value() => _value ?? _item.Value;
/// <summary>
/// Returns the 1-based source line where the token was parsed.
/// </summary>
public int Line() => _item.Line;
/// <summary>
/// Returns whether variable interpolation contributed to this token.
/// </summary>
public bool IsUsedVariable() => _usedVariable;
/// <summary>
/// Returns the source file path associated with this token.
/// </summary>
public string SourceFile() => _sourceFile;
/// <summary>
/// Returns the 1-based character position of the token on its source line.
/// </summary>
public int Position() => _item.Position;
}

View File

@@ -77,6 +77,8 @@ public static class EventCompressor
/// <summary>
/// Compresses <paramref name="payload"/> using the requested <paramref name="compression"/>.
/// </summary>
/// <param name="payload">Uncompressed event payload bytes.</param>
/// <param name="compression">Compression algorithm to apply for transport.</param>
public static byte[] Compress(ReadOnlySpan<byte> payload, EventCompressionType compression)
{
if (payload.IsEmpty)
@@ -104,6 +106,8 @@ public static class EventCompressor
/// <summary>
/// Decompresses <paramref name="compressed"/> using the selected <paramref name="compression"/>.
/// </summary>
/// <param name="compressed">Compressed event payload bytes.</param>
/// <param name="compression">Encoding that was used when the payload was produced.</param>
public static byte[] Decompress(ReadOnlySpan<byte> compressed, EventCompressionType compression)
{
if (compressed.IsEmpty)
@@ -150,6 +154,9 @@ public static class EventCompressor
/// <summary>
/// Compresses using <paramref name="compression"/> when payload size exceeds threshold.
/// </summary>
/// <param name="payload">Raw event payload that may be compressed.</param>
/// <param name="compression">Preferred compression algorithm for eligible payloads.</param>
/// <param name="thresholdBytes">Minimum payload size required before compression is attempted.</param>
public static (byte[] Data, bool Compressed) CompressIfBeneficial(
ReadOnlySpan<byte> payload,
EventCompressionType compression,
@@ -189,6 +196,7 @@ public static class EventCompressor
/// Parses an HTTP Accept-Encoding value into a supported compression type.
/// Go reference: events.go getAcceptEncoding().
/// </summary>
/// <param name="acceptEncoding">Raw HTTP <c>Accept-Encoding</c> header value from the client.</param>
public static EventCompressionType GetAcceptEncoding(string? acceptEncoding)
{
if (string.IsNullOrWhiteSpace(acceptEncoding))

View File

@@ -74,6 +74,13 @@ public static class EventSubjects
/// Callback signature for system message handlers.
/// Maps to Go's sysMsgHandler type in events.go:109.
/// </summary>
/// <param name="sub">Subscription metadata that matched the incoming system message.</param>
/// <param name="client">Client connection context that delivered the message, when available.</param>
/// <param name="account">Owning account context for account-scoped system events.</param>
/// <param name="subject">System subject that triggered this callback.</param>
/// <param name="reply">Reply inbox subject for request/reply system handlers.</param>
/// <param name="headers">Optional message headers encoded by the publisher.</param>
/// <param name="message">Raw system advisory or request payload bytes.</param>
public delegate void SystemMessageHandler(
Subscription? sub,
INatsClient? client,

View File

@@ -45,6 +45,8 @@ public static class GatewayCommands
/// Wire format: GS+ {account} {subject}\r\n
/// Go reference: gateway.go — sendGatewaySubsToGateway, RS+ propagation.
/// </summary>
/// <param name="account">Origin account used for gateway interest tracking.</param>
/// <param name="subject">Subject pattern being subscribed across clusters.</param>
public static byte[] FormatSub(string account, string subject)
=> Encoding.UTF8.GetBytes($"GS+ {account} {subject}\r\n");
@@ -53,6 +55,8 @@ public static class GatewayCommands
/// Wire format: GS- {account} {subject}\r\n
/// Go reference: gateway.go — sendGatewayUnsubToGateway, RS- propagation.
/// </summary>
/// <param name="account">Origin account used for gateway interest tracking.</param>
/// <param name="subject">Subject pattern being removed from remote interest state.</param>
public static byte[] FormatUnsub(string account, string subject)
=> Encoding.UTF8.GetBytes($"GS- {account} {subject}\r\n");
@@ -62,6 +66,8 @@ public static class GatewayCommands
/// Mode: "O" for Optimistic (send everything), "I" for Interest-only.
/// Go reference: gateway.go — switchAccountToInterestMode, GMODE command.
/// </summary>
/// <param name="account">Account whose cross-cluster routing mode is being updated.</param>
/// <param name="mode">Target gateway interest mode for that account.</param>
public static byte[] FormatMode(string account, GatewayInterestMode mode)
{
var modeStr = mode == GatewayInterestMode.InterestOnly ? "I" : "O";
@@ -73,6 +79,7 @@ public static class GatewayCommands
/// Returns null if the command prefix is unrecognized.
/// Go reference: gateway.go — processGatewayMsg command dispatch.
/// </summary>
/// <param name="line">Raw protocol line prefix read from a gateway connection.</param>
public static GatewayCommandType? ParseCommandType(ReadOnlySpan<byte> line)
{
if (line.StartsWith(InfoPrefix)) return GatewayCommandType.Info;

View File

@@ -42,6 +42,10 @@ public sealed class GatewayInterestTracker
// Per-account state: mode + no-interest set (Optimistic) or positive interest set (InterestOnly)
private readonly ConcurrentDictionary<string, AccountState> _accounts = new(StringComparer.Ordinal);
/// <summary>
/// Creates a gateway interest tracker with a configurable mode-switch threshold.
/// </summary>
/// <param name="noInterestThreshold">No-interest entry count that triggers InterestOnly mode.</param>
public GatewayInterestTracker(int noInterestThreshold = DefaultNoInterestThreshold)
{
_noInterestThreshold = noInterestThreshold;
@@ -51,6 +55,7 @@ public sealed class GatewayInterestTracker
/// Returns the current interest mode for the given account.
/// Accounts default to Optimistic until the no-interest threshold is exceeded.
/// </summary>
/// <param name="account">Account name/identifier.</param>
public GatewayInterestMode GetMode(string account)
=> _accounts.TryGetValue(account, out var state) ? state.Mode : GatewayInterestMode.Optimistic;
@@ -58,6 +63,8 @@ public sealed class GatewayInterestTracker
/// Track a positive interest (RS+ received from remote) for an account/subject.
/// Go: gateway.go:1540 (processGatewayAccountSub — adds to interest set)
/// </summary>
/// <param name="account">Account name/identifier.</param>
/// <param name="subject">Subject or pattern with positive remote interest.</param>
public void TrackInterest(string account, string subject)
{
var state = GetOrCreateState(account);
@@ -83,6 +90,8 @@ public sealed class GatewayInterestTracker
/// When the no-interest set crosses the threshold, switches to InterestOnly mode.
/// Go: gateway.go:1560 (processGatewayAccountUnsub — tracks no-interest, triggers switch)
/// </summary>
/// <param name="account">Account name/identifier.</param>
/// <param name="subject">Subject or pattern that should be treated as no-interest.</param>
public void TrackNoInterest(string account, string subject)
{
var state = GetOrCreateState(account);
@@ -110,6 +119,8 @@ public sealed class GatewayInterestTracker
/// for the given account and subject.
/// Go: gateway.go:2900 (shouldForwardMsg — checks mode and interest)
/// </summary>
/// <param name="account">Account name/identifier.</param>
/// <param name="subject">Subject being considered for forwarding.</param>
public bool ShouldForward(string account, string subject)
{
if (!_accounts.TryGetValue(account, out var state))
@@ -141,6 +152,7 @@ public sealed class GatewayInterestTracker
/// Called when the remote signals it is in interest-only mode.
/// Go: gateway.go:1500 (switchToInterestOnlyMode)
/// </summary>
/// <param name="account">Account name/identifier.</param>
public void SwitchToInterestOnly(string account)
{
var state = GetOrCreateState(account);
@@ -179,6 +191,7 @@ public sealed class GatewayInterestTracker
/// <summary>Per-account mutable state. All access must be under the instance lock.</summary>
private sealed class AccountState
{
/// <summary>Current forwarding mode for this account.</summary>
public GatewayInterestMode Mode { get; set; } = GatewayInterestMode.Optimistic;
/// <summary>Subjects with no remote interest (used in Optimistic mode).</summary>

View File

@@ -5,18 +5,45 @@ namespace NATS.Server;
public interface INatsClient
{
/// <summary>Unique server-assigned client identifier.</summary>
ulong Id { get; }
/// <summary>Client kind (client, route, gateway, leaf, system, etc.).</summary>
ClientKind Kind { get; }
/// <summary>Whether this client is server-internal and not socket-backed.</summary>
bool IsInternal => Kind.IsInternal();
/// <summary>Account context associated with this client.</summary>
Account? Account { get; }
/// <summary>Parsed CONNECT options for this client when available.</summary>
ClientOptions? ClientOpts { get; }
/// <summary>Resolved publish/subscribe permissions for this client.</summary>
ClientPermissions? Permissions { get; }
/// <summary>
/// Sends a protocol message to a subscription with immediate flush semantics.
/// </summary>
/// <param name="subject">Delivery subject sent to the client.</param>
/// <param name="sid">Subscription identifier receiving the message.</param>
/// <param name="replyTo">Optional reply subject for request-reply flows.</param>
/// <param name="headers">Serialized NATS headers payload.</param>
/// <param name="payload">Message payload bytes.</param>
void SendMessage(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload);
/// <summary>
/// Sends a protocol message without forcing an immediate flush.
/// </summary>
/// <param name="subject">Delivery subject sent to the client.</param>
/// <param name="sid">Subscription identifier receiving the message.</param>
/// <param name="replyTo">Optional reply subject for request-reply flows.</param>
/// <param name="headers">Serialized NATS headers payload.</param>
/// <param name="payload">Message payload bytes.</param>
void SendMessageNoFlush(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload);
/// <summary>Signals that queued outbound bytes should be flushed.</summary>
void SignalFlush();
/// <summary>Queues outbound protocol bytes for asynchronous write-loop transmission.</summary>
/// <param name="data">Serialized protocol bytes to queue.</param>
bool QueueOutbound(ReadOnlyMemory<byte> data);
/// <summary>Removes a subscription by subscription identifier.</summary>
/// <param name="sid">Subscription identifier to remove.</param>
void RemoveSubscription(string sid);
}

View File

@@ -23,8 +23,11 @@ public sealed class OutboundBufferPool
private long _returnCount;
private long _broadcastCount;
/// <summary>Total buffer rent operations served by the pool.</summary>
public long RentCount => Interlocked.Read(ref _rentCount);
/// <summary>Total buffer return operations accepted by the pool.</summary>
public long ReturnCount => Interlocked.Read(ref _returnCount);
/// <summary>Total broadcast-drain operations performed.</summary>
public long BroadcastCount => Interlocked.Read(ref _broadcastCount);
// -----------------------------------------------------------------------
@@ -36,6 +39,7 @@ public sealed class OutboundBufferPool
/// <paramref name="size"/> bytes. Tries the internal pool first; falls back to
/// <see cref="MemoryPool{T}.Shared"/>.
/// </summary>
/// <param name="size">Minimum required buffer size.</param>
public IMemoryOwner<byte> Rent(int size)
{
Interlocked.Increment(ref _rentCount);
@@ -70,6 +74,7 @@ public sealed class OutboundBufferPool
/// <paramref name="size"/> bytes. The caller is responsible for calling
/// <see cref="ReturnBuffer"/> when finished.
/// </summary>
/// <param name="size">Minimum required buffer size.</param>
public byte[] RentBuffer(int size)
{
Interlocked.Increment(ref _rentCount);
@@ -94,6 +99,7 @@ public sealed class OutboundBufferPool
/// Returns <paramref name="buffer"/> to the appropriate tier so it can be
/// reused by a subsequent <see cref="RentBuffer"/> call.
/// </summary>
/// <param name="buffer">Buffer previously rented from this pool.</param>
public void ReturnBuffer(byte[] buffer)
{
Interlocked.Increment(ref _returnCount);
@@ -128,6 +134,8 @@ public sealed class OutboundBufferPool
///
/// Go reference: client.go — broadcast flush coalescing for fan-out.
/// </summary>
/// <param name="pendingWrites">Pending write segments to coalesce.</param>
/// <param name="destination">Destination buffer receiving the concatenated payloads.</param>
public int BroadcastDrain(IReadOnlyList<ReadOnlyMemory<byte>> pendingWrites, byte[] destination)
{
var offset = 0;
@@ -144,6 +152,7 @@ public sealed class OutboundBufferPool
/// Returns the total number of bytes needed to coalesce all
/// <paramref name="pendingWrites"/> into a single buffer.
/// </summary>
/// <param name="pendingWrites">Pending write segments to size.</param>
public static int CalculateBroadcastSize(IReadOnlyList<ReadOnlyMemory<byte>> pendingWrites)
{
var total = 0;
@@ -164,15 +173,22 @@ public sealed class OutboundBufferPool
private readonly ConcurrentBag<byte[]> _pool;
private byte[]? _buffer;
/// <summary>
/// Creates a pooled memory owner backed by a reusable byte array.
/// </summary>
/// <param name="buffer">Rented backing buffer.</param>
/// <param name="pool">Pool to return the buffer to on disposal.</param>
public PooledMemoryOwner(byte[] buffer, ConcurrentBag<byte[]> pool)
{
_buffer = buffer;
_pool = pool;
}
/// <summary>Memory view over the currently owned buffer.</summary>
public Memory<byte> Memory =>
_buffer is { } b ? b.AsMemory() : Memory<byte>.Empty;
/// <summary>Returns the owned buffer to the originating pool.</summary>
public void Dispose()
{
if (Interlocked.Exchange(ref _buffer, null) is { } b)

View File

@@ -4,11 +4,30 @@ namespace NATS.Server.Imports;
public sealed class ExportAuth
{
/// <summary>
/// Gets a value indicating whether importers must present a token for access.
/// </summary>
public bool TokenRequired { get; init; }
/// <summary>
/// Gets the account-token subject position used for legacy tokenized export patterns.
/// </summary>
public uint AccountPosition { get; init; }
/// <summary>
/// Gets explicit account names permitted to import this export.
/// </summary>
public HashSet<string>? ApprovedAccounts { get; init; }
/// <summary>
/// Gets accounts revoked from import access, mapped to revocation timestamps.
/// </summary>
public Dictionary<string, long>? RevokedAccounts { get; init; }
/// <summary>
/// Determines whether the specified account is currently authorized for this export.
/// </summary>
/// <param name="account">Importing account requesting access.</param>
public bool IsAuthorized(Account account)
{
if (RevokedAccounts != null && RevokedAccounts.ContainsKey(account.Name))

View File

@@ -2,7 +2,18 @@ namespace NATS.Server.Imports;
public sealed class ExportMap
{
/// <summary>
/// Gets stream exports keyed by exported subject.
/// </summary>
public Dictionary<string, StreamExport> Streams { get; } = new(StringComparer.Ordinal);
/// <summary>
/// Gets service exports keyed by exported subject.
/// </summary>
public Dictionary<string, ServiceExport> Services { get; } = new(StringComparer.Ordinal);
/// <summary>
/// Gets temporary response imports keyed by generated reply prefix.
/// </summary>
public Dictionary<string, ServiceImport> Responses { get; } = new(StringComparer.Ordinal);
}

View File

@@ -4,9 +4,20 @@ namespace NATS.Server.Imports;
public sealed class ImportMap
{
/// <summary>
/// Gets stream import definitions configured for the account.
/// </summary>
public List<StreamImport> Streams { get; } = [];
/// <summary>
/// Gets service import definitions grouped by source subject.
/// </summary>
public Dictionary<string, List<ServiceImport>> Services { get; } = new(StringComparer.Ordinal);
/// <summary>
/// Adds a service import under its source subject key.
/// </summary>
/// <param name="si">Service import definition to add.</param>
public void AddServiceImport(ServiceImport si)
{
if (!Services.TryGetValue(si.From, out var list))

View File

@@ -2,29 +2,57 @@ using System.Text.Json.Serialization;
namespace NATS.Server.Imports;
/// <summary>
/// Serialized payload published for exported-service latency advisories.
/// </summary>
public sealed class ServiceLatencyMsg
{
/// <summary>
/// Gets or sets the schema identifier used by consumers to decode this metric event.
/// </summary>
[JsonPropertyName("type")]
public string Type { get; set; } = "io.nats.server.metric.v1.service_latency";
/// <summary>
/// Gets or sets the account or identity that initiated the service request.
/// </summary>
[JsonPropertyName("requestor")]
public string Requestor { get; set; } = string.Empty;
/// <summary>
/// Gets or sets the service identity that responded to the request.
/// </summary>
[JsonPropertyName("responder")]
public string Responder { get; set; } = string.Empty;
/// <summary>
/// Gets or sets the service response status code reported in the advisory.
/// </summary>
[JsonPropertyName("status")]
public int Status { get; set; } = 200;
/// <summary>
/// Gets or sets service execution latency in nanoseconds on the responder side.
/// </summary>
[JsonPropertyName("svc_latency")]
public long ServiceLatencyNanos { get; set; }
/// <summary>
/// Gets or sets end-to-end request latency in nanoseconds from requestor to response.
/// </summary>
[JsonPropertyName("total_latency")]
public long TotalLatencyNanos { get; set; }
}
/// <summary>
/// Sampling and payload helpers for service import latency metrics.
/// </summary>
public static class LatencyTracker
{
/// <summary>
/// Determines whether a request should emit latency telemetry based on configured sampling.
/// </summary>
/// <param name="latency">Service latency sampling configuration from the export definition.</param>
public static bool ShouldSample(ServiceLatency latency)
{
if (latency.SamplingPercentage <= 0) return false;
@@ -32,6 +60,13 @@ public static class LatencyTracker
return Random.Shared.Next(100) < latency.SamplingPercentage;
}
/// <summary>
/// Builds a service latency advisory payload from measured service and end-to-end durations.
/// </summary>
/// <param name="requestor">Identity of the requesting account or client.</param>
/// <param name="responder">Identity of the exported service that handled the request.</param>
/// <param name="serviceLatency">Time spent processing the request by the service itself.</param>
/// <param name="totalLatency">Full request round-trip latency as observed by the server.</param>
public static ServiceLatencyMsg BuildLatencyMsg(
string requestor, string responder,
TimeSpan serviceLatency, TimeSpan totalLatency)

View File

@@ -30,6 +30,9 @@ public static class ResponseRouter
/// Creates a response service import that maps the generated reply prefix
/// back to the original reply subject on the requesting account.
/// </summary>
/// <param name="exporterAccount">Exporter account that stores temporary response imports.</param>
/// <param name="originalImport">Original service import that triggered the response path.</param>
/// <param name="originalReply">Original reply subject to route responder messages back to.</param>
public static ServiceImport CreateResponseImport(
Account exporterAccount,
ServiceImport originalImport,
@@ -57,6 +60,9 @@ public static class ResponseRouter
/// For Singleton responses, this is called after the first reply is delivered.
/// For Streamed/Chunked, it is called when the response stream ends.
/// </summary>
/// <param name="account">Account that owns the temporary response import map.</param>
/// <param name="replyPrefix">Generated reply prefix key to remove.</param>
/// <param name="responseSi">Response service import being cleaned up.</param>
public static void CleanupResponse(Account account, string replyPrefix, ServiceImport responseSi)
{
account.Exports.Responses.Remove(replyPrefix);

View File

@@ -4,10 +4,33 @@ namespace NATS.Server.Imports;
public sealed class ServiceExport
{
/// <summary>
/// Gets authorization rules controlling which accounts may import this service.
/// </summary>
public ExportAuth Auth { get; init; } = new();
/// <summary>
/// Gets the exporting account that owns this service definition.
/// </summary>
public Account? Account { get; init; }
/// <summary>
/// Gets the response mode expected from service responders (singleton or streamed).
/// </summary>
public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton;
/// <summary>
/// Gets the threshold used for service latency advisories and slow-response tracking.
/// </summary>
public TimeSpan ResponseThreshold { get; init; } = TimeSpan.FromMinutes(2);
/// <summary>
/// Gets optional service latency sampling configuration for exported service calls.
/// </summary>
public ServiceLatency? Latency { get; init; }
/// <summary>
/// Gets a value indicating whether distributed tracing headers are allowed for this service.
/// </summary>
public bool AllowTrace { get; init; }
}

View File

@@ -5,17 +5,30 @@ namespace NATS.Server.Imports;
public sealed class ServiceImport
{
/// <summary>Account that receives requests after the service import mapping is applied.</summary>
public required Account DestinationAccount { get; init; }
/// <summary>Source subject exposed to the importing account.</summary>
public required string From { get; init; }
/// <summary>Destination subject routed to the exporting account/service.</summary>
public required string To { get; init; }
/// <summary>Optional subject transform applied when forwarding imported requests.</summary>
public SubjectTransform? Transform { get; init; }
/// <summary>Export definition backing this import relationship.</summary>
public ServiceExport? Export { get; init; }
/// <summary>Response behavior for imported service replies (singleton/stream/chunked).</summary>
public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton;
/// <summary>Subscription identifier bytes used for internal routing bookkeeping.</summary>
public byte[]? Sid { get; set; }
/// <summary>Whether this import currently represents a generated response mapping.</summary>
public bool IsResponse { get; init; }
/// <summary>Whether forwarding should use PUB semantics instead of request/reply.</summary>
public bool UsePub { get; init; }
/// <summary>Whether the import definition has been invalidated.</summary>
public bool Invalid { get; set; }
/// <summary>Whether this import can be shared across accounts/connections.</summary>
public bool Share { get; init; }
/// <summary>Whether service latency/tracking metrics are enabled for this import.</summary>
public bool Tracking { get; init; }
/// <summary>Last update timestamp stored as UTC ticks.</summary>
public long TimestampTicks { get; set; }
}

View File

@@ -5,10 +5,33 @@ namespace NATS.Server.Imports;
public sealed class StreamImport
{
/// <summary>
/// Gets the exporting account that owns the imported stream subjects.
/// </summary>
public required Account SourceAccount { get; init; }
/// <summary>
/// Gets the source subject pattern in the exporting account.
/// </summary>
public required string From { get; init; }
/// <summary>
/// Gets the destination subject pattern mapped into the importing account.
/// </summary>
public required string To { get; init; }
/// <summary>
/// Gets the optional transform applied while remapping imported stream subjects.
/// </summary>
public SubjectTransform? Transform { get; init; }
/// <summary>
/// Gets a value indicating whether the import is restricted to publish operations.
/// </summary>
public bool UsePub { get; init; }
/// <summary>
/// Gets or sets a value indicating whether this import has been marked invalid by validation logic.
/// </summary>
public bool Invalid { get; set; }
}

View File

@@ -182,11 +182,17 @@ internal sealed class Leaf<T> : INode
=> Parts.MatchPartsAgainstFragment(parts, Suffix);
// These should not be called on a leaf.
/// <inheritdoc />
public void SetPrefix(ReadOnlySpan<byte> pre) => throw new InvalidOperationException("setPrefix called on leaf");
/// <inheritdoc />
public void AddChild(byte c, INode n) => throw new InvalidOperationException("addChild called on leaf");
/// <inheritdoc />
public ChildRef? FindChild(byte c) => throw new InvalidOperationException("findChild called on leaf");
/// <inheritdoc />
public INode Grow() => throw new InvalidOperationException("grow called on leaf");
/// <inheritdoc />
public void DeleteChild(byte c) => throw new InvalidOperationException("deleteChild called on leaf");
/// <inheritdoc />
public INode? Shrink() => throw new InvalidOperationException("shrink called on leaf");
}

View File

@@ -20,6 +20,8 @@ internal static class Parts
/// Returns the pivot byte at the given position, or NoPivot if past end.
/// Go reference: server/stree/util.go:pivot
/// </summary>
/// <param name="subject">Subject bytes being inspected in the ART path.</param>
/// <param name="pos">Zero-based byte position to read from <paramref name="subject"/>.</param>
internal static byte Pivot(ReadOnlySpan<byte> subject, int pos)
{
if (pos >= subject.Length) return NoPivot;
@@ -30,6 +32,8 @@ internal static class Parts
/// Returns the length of the common prefix between two byte spans.
/// Go reference: server/stree/util.go:commonPrefixLen
/// </summary>
/// <param name="s1">First subject fragment.</param>
/// <param name="s2">Second subject fragment.</param>
internal static int CommonPrefixLen(ReadOnlySpan<byte> s1, ReadOnlySpan<byte> s2)
{
var limit = Math.Min(s1.Length, s2.Length);
@@ -44,6 +48,7 @@ internal static class Parts
/// <summary>
/// Copy bytes helper.
/// </summary>
/// <param name="src">Source bytes to clone into a managed array.</param>
internal static byte[] CopyBytes(ReadOnlySpan<byte> src)
{
if (src.Length == 0) return [];
@@ -54,6 +59,7 @@ internal static class Parts
/// Break a filter subject into parts based on wildcards (pwc '*' and fwc '>').
/// Go reference: server/stree/parts.go:genParts
/// </summary>
/// <param name="filter">Subscription filter subject that may include <c>*</c> or <c>&gt;</c> wildcards.</param>
internal static ReadOnlyMemory<byte>[] GenParts(ReadOnlySpan<byte> filter)
{
var parts = new List<ReadOnlyMemory<byte>>();
@@ -142,6 +148,8 @@ internal static class Parts
/// Match parts against a fragment (prefix for nodes or suffix for leaves).
/// Go reference: server/stree/parts.go:matchParts
/// </summary>
/// <param name="parts">Pre-tokenized wildcard and literal parts generated from a subject filter.</param>
/// <param name="frag">Current subject fragment being matched within the ART traversal.</param>
internal static (ReadOnlyMemory<byte>[] RemainingParts, bool Matched) MatchPartsAgainstFragment(
ReadOnlyMemory<byte>[] parts, ReadOnlySpan<byte> frag)
{

View File

@@ -11,6 +11,10 @@ public sealed class AdvisoryPublisher
private readonly Action<string, object> _publishAction;
private long _publishCount;
/// <summary>
/// Creates an advisory publisher that emits advisory payloads through the provided callback.
/// </summary>
/// <param name="publishAction">Callback that publishes advisory objects to subjects.</param>
public AdvisoryPublisher(Action<string, object> publishAction)
{
_publishAction = publishAction;
@@ -25,6 +29,8 @@ public sealed class AdvisoryPublisher
/// Publishes a stream created advisory.
/// Go reference: jetstream_api.go — advisory on stream creation.
/// </summary>
/// <param name="streamName">Name of the created stream.</param>
/// <param name="detail">Optional stream-specific detail payload.</param>
public void StreamCreated(string streamName, object? detail = null)
{
var subject = string.Format(Events.EventSubjects.JsAdvisoryStreamCreated, streamName);
@@ -41,6 +47,7 @@ public sealed class AdvisoryPublisher
/// Publishes a stream deleted advisory.
/// Go reference: jetstream_api.go — advisory on stream deletion.
/// </summary>
/// <param name="streamName">Name of the deleted stream.</param>
public void StreamDeleted(string streamName)
{
var subject = string.Format(Events.EventSubjects.JsAdvisoryStreamDeleted, streamName);
@@ -56,6 +63,8 @@ public sealed class AdvisoryPublisher
/// Publishes a stream updated advisory.
/// Go reference: jetstream_api.go — advisory on stream config update.
/// </summary>
/// <param name="streamName">Name of the updated stream.</param>
/// <param name="detail">Optional update detail payload.</param>
public void StreamUpdated(string streamName, object? detail = null)
{
var subject = string.Format(Events.EventSubjects.JsAdvisoryStreamUpdated, streamName);
@@ -72,6 +81,8 @@ public sealed class AdvisoryPublisher
/// Publishes a consumer created advisory.
/// Go reference: jetstream_api.go — advisory on consumer creation.
/// </summary>
/// <param name="streamName">Parent stream name.</param>
/// <param name="consumerName">Created consumer name.</param>
public void ConsumerCreated(string streamName, string consumerName)
{
var subject = string.Format(Events.EventSubjects.JsAdvisoryConsumerCreated, streamName, consumerName);
@@ -88,6 +99,8 @@ public sealed class AdvisoryPublisher
/// Publishes a consumer deleted advisory.
/// Go reference: jetstream_api.go — advisory on consumer deletion.
/// </summary>
/// <param name="streamName">Parent stream name.</param>
/// <param name="consumerName">Deleted consumer name.</param>
public void ConsumerDeleted(string streamName, string consumerName)
{
var subject = string.Format(Events.EventSubjects.JsAdvisoryConsumerDeleted, streamName, consumerName);

View File

@@ -15,6 +15,11 @@ public sealed class ApiRateLimiter : IDisposable
private readonly TimeSpan _dedupTtl;
private readonly int _maxConcurrent;
/// <summary>
/// Creates a JetStream API limiter for request concurrency and short-window deduplication.
/// </summary>
/// <param name="maxConcurrent">Maximum concurrent API handlers allowed before rejecting new requests.</param>
/// <param name="dedupTtl">TTL window for request-id dedup cache entries.</param>
public ApiRateLimiter(int maxConcurrent = 256, TimeSpan? dedupTtl = null)
{
_maxConcurrent = maxConcurrent;
@@ -33,6 +38,7 @@ public sealed class ApiRateLimiter : IDisposable
/// Go reference: jetstream_api.go — non-blocking semaphore acquire; request is rejected
/// immediately if no slots are available rather than queuing indefinitely.
/// </summary>
/// <param name="ct">Cancellation token for the slot acquisition attempt.</param>
public async Task<bool> TryAcquireAsync(CancellationToken ct = default)
{
return await _semaphore.WaitAsync(0, ct);
@@ -51,6 +57,7 @@ public sealed class ApiRateLimiter : IDisposable
/// Returns the cached response if found and not expired, null otherwise.
/// Go reference: jetstream_api.go — dedup cache is keyed by Nats-Msg-Id header value.
/// </summary>
/// <param name="requestId">Message-id dedup key from the request.</param>
public JetStreamApiResponse? GetCachedResponse(string? requestId)
{
if (string.IsNullOrEmpty(requestId))
@@ -73,6 +80,8 @@ public sealed class ApiRateLimiter : IDisposable
/// Go reference: jetstream_api.go — response is stored with a timestamp so that
/// subsequent requests with the same Nats-Msg-Id within the TTL window get the same result.
/// </summary>
/// <param name="requestId">Message-id dedup key from the request.</param>
/// <param name="response">Response payload to reuse for duplicate requests within the dedup window.</param>
public void CacheResponse(string? requestId, JetStreamApiResponse response)
{
if (string.IsNullOrEmpty(requestId))
@@ -99,6 +108,9 @@ public sealed class ApiRateLimiter : IDisposable
return removed;
}
/// <summary>
/// Releases semaphore resources held by the rate limiter.
/// </summary>
public void Dispose()
{
_semaphore.Dispose();

View File

@@ -18,6 +18,10 @@ public sealed class ClusteredRequestProcessor
private readonly TimeSpan _timeout;
private int _pendingCount;
/// <summary>
/// Creates a clustered request processor with a configurable wait timeout.
/// </summary>
/// <param name="timeout">Optional timeout for waiting on RAFT apply callbacks per request.</param>
public ClusteredRequestProcessor(TimeSpan? timeout = null)
{
_timeout = timeout ?? DefaultTimeout;
@@ -47,6 +51,8 @@ public sealed class ClusteredRequestProcessor
/// Go reference: jetstream_cluster.go:7620 — the goroutine waits on a per-request channel
/// with a context deadline derived from the cluster's JSApiTimeout option.
/// </summary>
/// <param name="requestId">Correlation id returned by <see cref="RegisterPending"/>.</param>
/// <param name="ct">Cancellation token for caller-initiated cancellation.</param>
public async Task<JetStreamApiResponse> WaitForResultAsync(string requestId, CancellationToken ct = default)
{
if (!_pending.TryGetValue(requestId, out var tcs))
@@ -76,6 +82,8 @@ public sealed class ClusteredRequestProcessor
/// Go reference: jetstream_cluster.go:7620 — the RAFT apply callback resolves the pending
/// request channel so the waiting goroutine can return the response to the caller.
/// </summary>
/// <param name="requestId">Pending request correlation id.</param>
/// <param name="response">Response generated after RAFT proposal application.</param>
public bool DeliverResult(string requestId, JetStreamApiResponse response)
{
if (!_pending.TryRemove(requestId, out var tcs))
@@ -91,6 +99,7 @@ public sealed class ClusteredRequestProcessor
/// Go reference: jetstream_cluster.go — when RAFT leadership changes, all in-flight
/// proposals must be failed with a "not leader" or "cancelled" error.
/// </summary>
/// <param name="reason">Reason string returned to callers in the synthesized 503 response.</param>
public void CancelAll(string reason = "leadership changed")
{
foreach (var (key, tcs) in _pending)

View File

@@ -2,9 +2,16 @@ namespace NATS.Server.JetStream.Api.Handlers;
public static class AccountControlApiHandlers
{
/// <summary>
/// Handles account-wide server-removal control requests.
/// </summary>
public static JetStreamApiResponse HandleServerRemove()
=> JetStreamApiResponse.SuccessResponse();
/// <summary>
/// Handles account purge requests routed via API subject.
/// </summary>
/// <param name="subject">API subject containing account purge target.</param>
public static JetStreamApiResponse HandleAccountPurge(string subject)
{
if (!subject.StartsWith(JetStreamApiSubjects.AccountPurge, StringComparison.Ordinal))
@@ -14,6 +21,10 @@ public static class AccountControlApiHandlers
return account.Length == 0 ? JetStreamApiResponse.NotFound(subject) : JetStreamApiResponse.SuccessResponse();
}
/// <summary>
/// Handles account stream-move requests routed via API subject.
/// </summary>
/// <param name="subject">API subject containing account move target.</param>
public static JetStreamApiResponse HandleAccountStreamMove(string subject)
{
if (!subject.StartsWith(JetStreamApiSubjects.AccountStreamMove, StringComparison.Ordinal))
@@ -23,6 +34,10 @@ public static class AccountControlApiHandlers
return account.Length == 0 ? JetStreamApiResponse.NotFound(subject) : JetStreamApiResponse.SuccessResponse();
}
/// <summary>
/// Handles account stream-move cancellation requests routed via API subject.
/// </summary>
/// <param name="subject">API subject containing account move-cancel target.</param>
public static JetStreamApiResponse HandleAccountStreamMoveCancel(string subject)
{
if (!subject.StartsWith(JetStreamApiSubjects.AccountStreamMoveCancel, StringComparison.Ordinal))

View File

@@ -2,12 +2,21 @@ namespace NATS.Server.JetStream.Api.Handlers;
public static class ClusterControlApiHandlers
{
/// <summary>
/// Handles meta-group leader stepdown requests.
/// </summary>
/// <param name="meta">JetStream meta group receiving the stepdown signal.</param>
public static JetStreamApiResponse HandleMetaLeaderStepdown(JetStream.Cluster.JetStreamMetaGroup meta)
{
meta.StepDown();
return JetStreamApiResponse.SuccessResponse();
}
/// <summary>
/// Handles stream leader stepdown requests routed via API subject.
/// </summary>
/// <param name="subject">API subject containing stream leader-stepdown target.</param>
/// <param name="streams">Stream manager used to execute the stepdown action.</param>
public static JetStreamApiResponse HandleStreamLeaderStepdown(string subject, StreamManager streams)
{
if (!subject.StartsWith(JetStreamApiSubjects.StreamLeaderStepdown, StringComparison.Ordinal))
@@ -21,6 +30,10 @@ public static class ClusterControlApiHandlers
return JetStreamApiResponse.SuccessResponse();
}
/// <summary>
/// Handles stream peer removal requests routed via API subject.
/// </summary>
/// <param name="subject">API subject containing stream peer-removal target.</param>
public static JetStreamApiResponse HandleStreamPeerRemove(string subject)
{
if (!subject.StartsWith(JetStreamApiSubjects.StreamPeerRemove, StringComparison.Ordinal))
@@ -30,6 +43,10 @@ public static class ClusterControlApiHandlers
return stream.Length == 0 ? JetStreamApiResponse.NotFound(subject) : JetStreamApiResponse.SuccessResponse();
}
/// <summary>
/// Handles consumer leader stepdown requests routed via API subject.
/// </summary>
/// <param name="subject">API subject containing stream/consumer stepdown target.</param>
public static JetStreamApiResponse HandleConsumerLeaderStepdown(string subject)
{
if (!subject.StartsWith(JetStreamApiSubjects.ConsumerLeaderStepdown, StringComparison.Ordinal))

View File

@@ -49,6 +49,7 @@ public static class AssignmentCodec
/// Go reference: jetstream_cluster.go:8703 encodeAddStreamAssignment —
/// marshals the assignment struct (with ConfigJSON) to JSON.
/// </summary>
/// <param name="sa">Stream assignment payload to encode for meta-cluster replication.</param>
public static byte[] EncodeStreamAssignment(StreamAssignment sa)
=> JsonSerializer.SerializeToUtf8Bytes(sa, SerializerOptions);
@@ -58,6 +59,7 @@ public static class AssignmentCodec
/// Go reference: jetstream_cluster.go:8733 decodeStreamAssignment —
/// json.Unmarshal(buf, &amp;sa); returns nil, err on failure.
/// </summary>
/// <param name="data">UTF-8 encoded stream assignment bytes from RAFT metadata messages.</param>
public static StreamAssignment? DecodeStreamAssignment(ReadOnlySpan<byte> data)
{
if (data.IsEmpty)
@@ -84,6 +86,7 @@ public static class AssignmentCodec
/// Go reference: jetstream_cluster.go:9175 encodeAddConsumerAssignment —
/// marshals the assignment struct to JSON.
/// </summary>
/// <param name="ca">Consumer assignment payload to encode for cluster propagation.</param>
public static byte[] EncodeConsumerAssignment(ConsumerAssignment ca)
=> JsonSerializer.SerializeToUtf8Bytes(ca, SerializerOptions);
@@ -93,6 +96,7 @@ public static class AssignmentCodec
/// Go reference: jetstream_cluster.go:9195 decodeConsumerAssignment —
/// json.Unmarshal(buf, &amp;ca); returns nil, err on failure.
/// </summary>
/// <param name="data">UTF-8 encoded consumer assignment bytes from metadata updates.</param>
public static ConsumerAssignment? DecodeConsumerAssignment(ReadOnlySpan<byte> data)
{
if (data.IsEmpty)
@@ -128,6 +132,8 @@ public static class AssignmentCodec
/// s2.NewWriter used to compress large consumer assignment payloads; the caller
/// prepends the assignCompressedConsumerOp opcode byte as a similar kind of marker.
/// </summary>
/// <param name="data">Uncompressed assignment payload bytes.</param>
/// <param name="threshold">Minimum byte size required before compression is applied.</param>
public static byte[] CompressIfLarge(byte[] data, int threshold = 1024)
{
if (data.Length <= threshold)
@@ -148,6 +154,7 @@ public static class AssignmentCodec
/// s2.NewReader used to decompress consumer assignment payloads that were compressed
/// before being proposed to the meta RAFT group.
/// </summary>
/// <param name="data">Compressed-or-plain payload bytes received from cluster metadata transport.</param>
public static byte[] DecompressIfNeeded(byte[] data)
{
if (data.Length > 0 && data[0] == CompressedMarker)

View File

@@ -30,11 +30,22 @@ public sealed class JetStreamClusterMonitor
/// </summary>
public int ProcessedCount { get { lock (_processedLock) return _processedCount; } }
/// <summary>
/// Creates a cluster monitor with a null logger for lightweight test and host setups.
/// </summary>
/// <param name="meta">Meta-group state container receiving assignment updates.</param>
/// <param name="entries">RAFT entry channel consumed by the monitor loop.</param>
public JetStreamClusterMonitor(JetStreamMetaGroup meta, ChannelReader<RaftLogEntry> entries)
: this(meta, entries, NullLogger<JetStreamClusterMonitor>.Instance)
{
}
/// <summary>
/// Creates a cluster monitor with explicit logger injection.
/// </summary>
/// <param name="meta">Meta-group state container receiving assignment updates.</param>
/// <param name="entries">RAFT entry channel consumed by the monitor loop.</param>
/// <param name="logger">Logger for malformed entry and state-application diagnostics.</param>
public JetStreamClusterMonitor(
JetStreamMetaGroup meta,
ChannelReader<RaftLogEntry> entries,
@@ -50,6 +61,7 @@ public sealed class JetStreamClusterMonitor
/// Each entry is applied synchronously before the next is read.
/// Returns normally (without throwing) when <paramref name="ct"/> is cancelled.
/// </summary>
/// <param name="ct">Cancellation token used to stop the monitor loop.</param>
public async Task StartAsync(CancellationToken ct)
{
try
@@ -75,6 +87,8 @@ public sealed class JetStreamClusterMonitor
/// <paramref name="targetCount"/>. Returns immediately when the target is already met.
/// Used by tests to synchronise without sleeping.
/// </summary>
/// <param name="targetCount">Minimum processed-entry count to wait for.</param>
/// <param name="ct">Cancellation token for aborting the wait.</param>
public Task WaitForProcessedAsync(int targetCount, CancellationToken ct)
{
// Fast path — already done.

View File

@@ -31,6 +31,11 @@ public static class PlacementEngine
/// Overloaded peers are tried only after preferred candidates are exhausted.
/// 8. Throw InvalidOperationException if fewer than replicas peers can be selected.
/// </summary>
/// <param name="groupName">RAFT group name being placed.</param>
/// <param name="replicas">Required number of replicas/peers.</param>
/// <param name="availablePeers">Available cluster peers considered for placement.</param>
/// <param name="policy">Optional placement policy with cluster/tag constraints.</param>
/// <param name="assetCostWeight">Per-asset storage penalty used in scoring.</param>
public static RaftGroup SelectPeerGroup(
string groupName,
int replicas,
@@ -201,10 +206,15 @@ public static class PlacementEngine
/// </summary>
public sealed class PeerInfo
{
/// <summary>Unique peer identifier used in RAFT group membership.</summary>
public required string PeerId { get; init; }
/// <summary>Cluster name/partition where this peer resides.</summary>
public string Cluster { get; set; } = string.Empty;
/// <summary>Capability and topology tags advertised by this peer.</summary>
public HashSet<string> Tags { get; init; } = new(StringComparer.OrdinalIgnoreCase);
/// <summary>Whether this peer is currently eligible for new assignments.</summary>
public bool Available { get; set; } = true;
/// <summary>Approximate remaining storage available for new assets.</summary>
public long AvailableStorage { get; set; } = long.MaxValue;
/// <summary>
@@ -228,8 +238,11 @@ public sealed class PeerInfo
/// </summary>
public sealed class PlacementPolicy
{
/// <summary>Optional cluster affinity constraint.</summary>
public string? Cluster { get; set; }
/// <summary>Required tags that must all be present on a candidate peer.</summary>
public HashSet<string>? Tags { get; set; }
/// <summary>Tags that disqualify a candidate peer when present.</summary>
public HashSet<string>? ExcludeTags { get; set; }
/// <summary>

View File

@@ -17,6 +17,11 @@ public sealed class FilterSkipTracker
private long _matchCount;
private long _skipCount;
/// <summary>
/// Creates a filter-skip tracker for single or multi-subject consumer filters.
/// </summary>
/// <param name="filterSubject">Optional single filter subject for consumer delivery.</param>
/// <param name="filterSubjects">Optional multi-subject filter set for consumer delivery.</param>
public FilterSkipTracker(string? filterSubject = null, IReadOnlyList<string>? filterSubjects = null)
{
_filterSubject = filterSubject;
@@ -46,6 +51,7 @@ public sealed class FilterSkipTracker
/// Uses SubjectMatch.MatchLiteral for NATS token-based matching.
/// Go reference: consumer.go isFilteredMatch.
/// </summary>
/// <param name="subject">Message subject being evaluated for this consumer.</param>
public bool ShouldDeliver(string subject)
{
if (!HasFilter)
@@ -79,6 +85,7 @@ public sealed class FilterSkipTracker
/// <summary>
/// Records a skipped sequence for gap tracking.
/// </summary>
/// <param name="sequence">Stream sequence skipped due to filter mismatch.</param>
public void RecordSkip(ulong sequence)
{
_skippedSequences.Add(sequence);
@@ -88,6 +95,7 @@ public sealed class FilterSkipTracker
/// Returns the next unskipped sequence >= startSeq.
/// Used to find the next deliverable message efficiently.
/// </summary>
/// <param name="startSeq">Candidate sequence to begin searching from.</param>
public ulong NextUnskippedSequence(ulong startSeq)
{
var seq = startSeq;
@@ -100,6 +108,7 @@ public sealed class FilterSkipTracker
/// Clears skipped sequences below the given floor (e.g., ack floor).
/// Prevents unbounded growth.
/// </summary>
/// <param name="floor">Inclusive lower bound; skipped sequences below this value are removed.</param>
public void PurgeBelow(ulong floor)
{
_skippedSequences.RemoveWhere(s => s < floor);

View File

@@ -17,6 +17,8 @@ public sealed class SampleTracker
/// Creates a sample tracker with the given rate (0.0 to 1.0).
/// Use ParseSampleFrequency to convert string like "1%" to rate.
/// </summary>
/// <param name="sampleRate">Sampling probability as a fraction from 0.0 to 1.0.</param>
/// <param name="random">Optional random source for deterministic testability.</param>
public SampleTracker(double sampleRate, Random? random = null)
{
_sampleRate = Math.Clamp(sampleRate, 0.0, 1.0);
@@ -61,6 +63,9 @@ public sealed class SampleTracker
/// Records a latency measurement for a sampled delivery.
/// Returns a LatencySample for advisory publication.
/// </summary>
/// <param name="deliveryLatency">Observed delivery latency for the sampled message.</param>
/// <param name="sequence">Stream sequence number of the sampled message.</param>
/// <param name="subject">Subject delivered to the consumer.</param>
public LatencySample RecordLatency(TimeSpan deliveryLatency, ulong sequence, string subject)
{
return new LatencySample
@@ -78,6 +83,7 @@ public sealed class SampleTracker
/// Returns 0.0 for invalid or empty strings.
/// Go reference: consumer.go parseSampleFrequency.
/// </summary>
/// <param name="frequency">Human-readable percentage string.</param>
public static double ParseSampleFrequency(string? frequency)
{
if (string.IsNullOrWhiteSpace(frequency))
@@ -104,8 +110,12 @@ public sealed class SampleTracker
/// </summary>
public sealed class LatencySample
{
/// <summary>Stream sequence number of the sampled message.</summary>
public ulong Sequence { get; init; }
/// <summary>Subject delivered to the consumer.</summary>
public string Subject { get; init; } = string.Empty;
/// <summary>Observed delivery latency for this sample.</summary>
public TimeSpan DeliveryLatency { get; init; }
/// <summary>UTC timestamp when the sample was captured.</summary>
public DateTime SampledAtUtc { get; init; }
}

View File

@@ -50,6 +50,7 @@ public sealed class TokenBucketRateLimiter
/// Returns true if tokens were available (message can be sent).
/// Returns false if not enough tokens (caller should wait).
/// </summary>
/// <param name="bytes">Number of payload bytes to reserve from the token bucket.</param>
public bool TryConsume(long bytes)
{
if (BytesPerSecond <= 0) return true; // Unlimited
@@ -69,6 +70,7 @@ public sealed class TokenBucketRateLimiter
/// <summary>
/// Returns the estimated wait time until enough tokens are available.
/// </summary>
/// <param name="bytes">Number of bytes needed before send can proceed.</param>
public TimeSpan EstimateWait(long bytes)
{
if (BytesPerSecond <= 0) return TimeSpan.Zero;
@@ -87,6 +89,8 @@ public sealed class TokenBucketRateLimiter
/// <summary>
/// Waits until enough tokens are available, then consumes them.
/// </summary>
/// <param name="bytes">Number of payload bytes that must be available.</param>
/// <param name="ct">Cancellation token to stop waiting when request processing is canceled.</param>
public async ValueTask WaitForTokensAsync(long bytes, CancellationToken ct = default)
{
if (BytesPerSecond <= 0) return;
@@ -107,6 +111,8 @@ public sealed class TokenBucketRateLimiter
/// Updates the rate dynamically.
/// Go reference: consumer.go — rate can change on config update.
/// </summary>
/// <param name="bytesPerSecond">New throughput limit in bytes per second.</param>
/// <param name="burstSize">Optional new burst ceiling; defaults to two seconds of throughput.</param>
public void UpdateRate(long bytesPerSecond, long burstSize = 0)
{
lock (_lock)

View File

@@ -25,6 +25,7 @@ public sealed record PullRequest(
public void ConsumeBatch() => RemainingBatch--;
/// <summary>Subtract delivered bytes from remaining byte budget.</summary>
/// <param name="bytes">Delivered payload bytes to subtract from this pull request budget.</param>
public void ConsumeBytes(long bytes) => RemainingBytes -= bytes;
}
@@ -38,11 +39,25 @@ public sealed class WaitingRequestQueue
{
private readonly LinkedList<PullRequest> _queue = new();
/// <summary>
/// Gets the current number of queued pull requests awaiting delivery.
/// </summary>
public int Count => _queue.Count;
/// <summary>
/// Gets a value indicating whether there are no pending pull requests.
/// </summary>
public bool IsEmpty => _queue.Count == 0;
/// <summary>
/// Enqueues a pull request at the tail of the FIFO wait queue.
/// </summary>
/// <param name="request">Pull request to add.</param>
public void Enqueue(PullRequest request) => _queue.AddLast(request);
/// <summary>
/// Dequeues the oldest pending pull request, or <see langword="null"/> when empty.
/// </summary>
public PullRequest? TryDequeue()
{
if (_queue.Count == 0) return null;
@@ -51,6 +66,10 @@ public sealed class WaitingRequestQueue
return first;
}
/// <summary>
/// Removes expired pull requests whose deadline is at or before <paramref name="now"/>.
/// </summary>
/// <param name="now">Current timestamp used for expiry comparison.</param>
public void RemoveExpired(DateTimeOffset now)
{
var node = _queue.First;

View File

@@ -18,6 +18,8 @@ public sealed class InterestRetentionPolicy
/// <summary>
/// Register a consumer's interest in a subject pattern.
/// </summary>
/// <param name="consumer">Durable or ephemeral consumer name whose interest is tracked.</param>
/// <param name="filterSubject">Consumer filter subject used to determine message relevance.</param>
public void RegisterInterest(string consumer, string filterSubject)
{
_interests[consumer] = filterSubject;
@@ -26,6 +28,7 @@ public sealed class InterestRetentionPolicy
/// <summary>
/// Remove a consumer's interest (e.g., on deletion).
/// </summary>
/// <param name="consumer">Consumer identifier to remove from interest tracking.</param>
public void UnregisterInterest(string consumer)
{
_interests.Remove(consumer);
@@ -34,6 +37,8 @@ public sealed class InterestRetentionPolicy
/// <summary>
/// Record that a consumer has acknowledged delivery of a sequence.
/// </summary>
/// <param name="consumer">Consumer that acknowledged delivery.</param>
/// <param name="seq">Stream sequence number being acknowledged.</param>
public void AcknowledgeDelivery(string consumer, ulong seq)
{
if (!_acks.TryGetValue(seq, out var ackedBy))
@@ -49,6 +54,8 @@ public sealed class InterestRetentionPolicy
/// interested consumer has NOT yet acknowledged it).
/// A consumer is "interested" if its filter subject matches the message subject.
/// </summary>
/// <param name="seq">Stream sequence being evaluated for retention eligibility.</param>
/// <param name="msgSubject">Published subject of the message at <paramref name="seq"/>.</param>
public bool ShouldRetain(ulong seq, string msgSubject)
{
_acks.TryGetValue(seq, out var ackedBy);

View File

@@ -19,16 +19,36 @@ public static class JetStreamProfiler
private static long _ackDeliverTicks;
private static long _totalProcessMessageTicks;
/// <summary>Records ticks spent in stream subject-to-stream resolution.</summary>
/// <param name="ticks">Elapsed stopwatch ticks.</param>
public static void RecordFindBySubject(long ticks) => Interlocked.Add(ref _findBySubjectTicks, ticks);
/// <summary>Records ticks spent loading stream state metadata.</summary>
/// <param name="ticks">Elapsed stopwatch ticks.</param>
public static void RecordGetState(long ticks) => Interlocked.Add(ref _getStateTicks, ticks);
/// <summary>Records ticks spent appending to stream storage.</summary>
/// <param name="ticks">Elapsed stopwatch ticks.</param>
public static void RecordAppend(long ticks) => Interlocked.Add(ref _appendTicks, ticks);
/// <summary>Records ticks spent enforcing stream retention/limit policies.</summary>
/// <param name="ticks">Elapsed stopwatch ticks.</param>
public static void RecordEnforcePolicies(long ticks) => Interlocked.Add(ref _enforcePoliciesTicks, ticks);
/// <summary>Records residual capture overhead not attributed to named stages.</summary>
/// <param name="ticks">Elapsed stopwatch ticks.</param>
public static void RecordCaptureOverhead(long ticks) => Interlocked.Add(ref _captureOverheadTicks, ticks);
/// <summary>Records ticks spent serializing publish-related JSON responses/events.</summary>
/// <param name="ticks">Elapsed stopwatch ticks.</param>
public static void RecordJsonSerialize(long ticks) => Interlocked.Add(ref _jsonSerializeTicks, ticks);
/// <summary>Records ticks spent in ack-delivery and ack-path bookkeeping.</summary>
/// <param name="ticks">Elapsed stopwatch ticks.</param>
public static void RecordAckDeliver(long ticks) => Interlocked.Add(ref _ackDeliverTicks, ticks);
/// <summary>Records total ticks spent processing a message end-to-end.</summary>
/// <param name="ticks">Elapsed stopwatch ticks.</param>
public static void RecordTotalProcessMessage(long ticks) => Interlocked.Add(ref _totalProcessMessageTicks, ticks);
/// <summary>Increments the total processed-call counter.</summary>
public static void IncrementCalls() => Interlocked.Increment(ref _totalCalls);
/// <summary>
/// Returns a formatted profile report and resets all accumulated counters.
/// </summary>
public static string DumpAndReset()
{
var calls = Interlocked.Exchange(ref _totalCalls, 0);

View File

@@ -44,7 +44,14 @@ public sealed class JetStreamService : IAsyncDisposable
private readonly ILogger<JetStreamService> _logger;
private List<string> _registeredApiSubjects = [];
/// <summary>
/// Gets the internal client used for local system publications, when configured.
/// </summary>
public InternalClient? InternalClient { get; }
/// <summary>
/// Gets a value indicating whether JetStream API subjects are currently registered.
/// </summary>
public bool IsRunning { get; private set; }
/// <summary>
@@ -77,11 +84,22 @@ public sealed class JetStreamService : IAsyncDisposable
/// </summary>
public long MaxStore => _options.MaxFileStore;
/// <summary>
/// Creates a JetStream service using null logging for lightweight host wiring.
/// </summary>
/// <param name="options">JetStream configuration limits and storage settings.</param>
/// <param name="internalClient">Optional internal client for server-generated JetStream API traffic.</param>
public JetStreamService(JetStreamOptions options, InternalClient? internalClient = null)
: this(options, internalClient, NullLoggerFactory.Instance)
{
}
/// <summary>
/// Creates a JetStream service with explicit logging factory control.
/// </summary>
/// <param name="options">JetStream configuration limits and storage settings.</param>
/// <param name="internalClient">Optional internal client for server-generated JetStream API traffic.</param>
/// <param name="loggerFactory">Logger factory used to create component loggers.</param>
public JetStreamService(JetStreamOptions options, InternalClient? internalClient, ILoggerFactory loggerFactory)
{
_options = options;
@@ -92,6 +110,10 @@ public sealed class JetStreamService : IAsyncDisposable
// Maps to Go's enableJetStream() in server/jetstream.go:414-523.
// Validates the store directory, creates it if absent, then registers all
// $JS.API.> subjects so inbound API messages can be routed.
/// <summary>
/// Starts JetStream by validating storage and registering all JetStream API subjects.
/// </summary>
/// <param name="ct">Cancellation token for startup flow control.</param>
public Task StartAsync(CancellationToken ct)
{
if (IsRunning)
@@ -138,6 +160,9 @@ public sealed class JetStreamService : IAsyncDisposable
// Maps to Go's shutdown path in jetstream.go.
// Clears registered subjects and marks the service as not running.
/// <summary>
/// Stops JetStream and removes registered API subjects.
/// </summary>
public ValueTask DisposeAsync()
{
_registeredApiSubjects = [];

View File

@@ -34,6 +34,7 @@ public static class JsVersioning
/// Returns the required API level string from metadata, or empty if absent.
/// Go: getRequiredApiLevel (jetstream_versioning.go:28)
/// </summary>
/// <param name="metadata">Metadata dictionary that may contain the required-level key.</param>
public static string GetRequiredApiLevel(Dictionary<string, string>? metadata)
{
if (metadata != null && metadata.TryGetValue(RequiredLevelKey, out var level) && level.Length > 0)
@@ -45,6 +46,7 @@ public static class JsVersioning
/// Returns whether the required API level is supported by this server.
/// Go: supportsRequiredApiLevel (jetstream_versioning.go:36)
/// </summary>
/// <param name="metadata">Metadata dictionary that may contain required-level information.</param>
public static bool SupportsRequiredApiLevel(Dictionary<string, string>? metadata)
{
var level = GetRequiredApiLevel(metadata);
@@ -60,6 +62,7 @@ public static class JsVersioning
/// Clears dynamic fields (server version/level) and sets the required API level.
/// Go: setStaticStreamMetadata (jetstream_versioning.go:44)
/// </summary>
/// <param name="cfg">Stream configuration to mutate with static version metadata.</param>
public static void SetStaticStreamMetadata(StreamConfig cfg)
{
if (cfg.Metadata == null)
@@ -98,6 +101,7 @@ public static class JsVersioning
/// The original config is not modified.
/// Go: setDynamicStreamMetadata (jetstream_versioning.go:88)
/// </summary>
/// <param name="cfg">Source stream configuration.</param>
public static StreamConfig SetDynamicStreamMetadata(StreamConfig cfg)
{
// Shallow copy the config
@@ -118,6 +122,8 @@ public static class JsVersioning
/// Removes dynamic fields. If prevCfg has no metadata, removes the key from cfg.
/// Go: copyStreamMetadata (jetstream_versioning.go:110)
/// </summary>
/// <param name="cfg">Target stream configuration to update.</param>
/// <param name="prevCfg">Previous stream configuration whose required-level metadata is preserved.</param>
public static void CopyStreamMetadata(StreamConfig cfg, StreamConfig? prevCfg)
{
if (cfg.Metadata != null)
@@ -129,6 +135,7 @@ public static class JsVersioning
/// Sets static (stored) versioning metadata on a consumer config.
/// Go: setStaticConsumerMetadata (jetstream_versioning.go:136)
/// </summary>
/// <param name="cfg">Consumer configuration to mutate with static version metadata.</param>
public static void SetStaticConsumerMetadata(ConsumerConfig cfg)
{
if (cfg.Metadata == null)
@@ -154,6 +161,7 @@ public static class JsVersioning
/// Returns a copy of the consumer config with dynamic metadata fields added.
/// Go: setDynamicConsumerMetadata (jetstream_versioning.go:164)
/// </summary>
/// <param name="cfg">Source consumer configuration.</param>
public static ConsumerConfig SetDynamicConsumerMetadata(ConsumerConfig cfg)
{
var newCfg = ShallowCopyConsumer(cfg);
@@ -173,6 +181,8 @@ public static class JsVersioning
/// Removes dynamic fields.
/// Go: copyConsumerMetadata (jetstream_versioning.go:198)
/// </summary>
/// <param name="cfg">Target consumer configuration to update.</param>
/// <param name="prevCfg">Previous consumer configuration whose required-level metadata is preserved.</param>
public static void CopyConsumerMetadata(ConsumerConfig cfg, ConsumerConfig? prevCfg)
{
if (cfg.Metadata != null)
@@ -184,6 +194,7 @@ public static class JsVersioning
/// Removes dynamic metadata fields (server version and level) from a metadata dictionary.
/// Go: deleteDynamicMetadata (jetstream_versioning.go:222)
/// </summary>
/// <param name="metadata">Metadata dictionary to clean.</param>
public static void DeleteDynamicMetadata(Dictionary<string, string> metadata)
{
metadata.Remove(ServerVersionKey);

View File

@@ -7,15 +7,32 @@ namespace NATS.Server.JetStream.Models;
// Reference: golang/nats-server/server/stream.go:20759
public sealed class CounterValue
{
/// <summary>
/// Gets or sets the counter value encoded as a string for wire compatibility.
/// </summary>
[JsonPropertyName("val")]
public string Value { get; set; } = "0";
/// <summary>
/// Parses the string counter value as a signed 64-bit integer.
/// </summary>
public long AsLong() => long.TryParse(Value, out var v) ? v : 0;
/// <summary>
/// Creates a counter payload object from a numeric value.
/// </summary>
/// <param name="value">Numeric counter value to encode.</param>
public static CounterValue FromLong(long value) => new() { Value = value.ToString() };
/// <summary>
/// Serializes the counter payload to JSON UTF-8 bytes.
/// </summary>
public byte[] ToPayload() => JsonSerializer.SerializeToUtf8Bytes(this);
/// <summary>
/// Deserializes a counter payload from JSON bytes.
/// </summary>
/// <param name="payload">JSON payload bytes containing the counter value.</param>
public static CounterValue FromPayload(ReadOnlySpan<byte> payload)
{
if (payload.IsEmpty)

View File

@@ -2,8 +2,23 @@ namespace NATS.Server.JetStream.Models;
public sealed class ApiStreamState
{
/// <summary>
/// Gets or sets current number of messages stored in the stream.
/// </summary>
public ulong Messages { get; set; }
/// <summary>
/// Gets or sets first available stream sequence.
/// </summary>
public ulong FirstSeq { get; set; }
/// <summary>
/// Gets or sets latest stream sequence.
/// </summary>
public ulong LastSeq { get; set; }
/// <summary>
/// Gets or sets total bytes retained by the stream.
/// </summary>
public ulong Bytes { get; set; }
}

View File

@@ -18,6 +18,9 @@ internal static class JetStreamPubAckFormatter
/// Formats a success PubAck directly into a span. Returns bytes written.
/// Caller must ensure dest is large enough (256 bytes is safe for any stream name).
/// </summary>
/// <param name="dest">Destination span receiving UTF-8 formatted JSON bytes.</param>
/// <param name="streamName">Stream name included in the ack payload.</param>
/// <param name="seq">Assigned stream sequence to include in the ack payload.</param>
public static int FormatSuccess(Span<byte> dest, string streamName, ulong seq)
{
var pos = 0;
@@ -36,6 +39,7 @@ internal static class JetStreamPubAckFormatter
/// <summary>
/// Returns true if this PubAck is a simple success that can use the fast formatter.
/// </summary>
/// <param name="ack">Publish acknowledgement to classify.</param>
public static bool IsSimpleSuccess(PubAck ack)
=> ack.ErrorCode == null && !ack.Duplicate && ack.BatchId == null;
}

View File

@@ -9,17 +9,41 @@ public sealed class JetStreamPublisher
// Go reference: server/jetstream_batching.go streamBatches
private readonly AtomicBatchPublishEngine _batchEngine = new();
/// <summary>
/// Creates a JetStream publisher bound to a stream manager.
/// </summary>
/// <param name="streamManager">Stream manager used to resolve and capture published messages.</param>
public JetStreamPublisher(StreamManager streamManager)
{
_streamManager = streamManager;
}
/// <summary>
/// Captures a publish using default publish options.
/// </summary>
/// <param name="subject">Publish subject.</param>
/// <param name="payload">Message payload.</param>
/// <param name="ack">Publish acknowledgement output.</param>
public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
=> TryCaptureWithOptions(subject, payload, new PublishOptions(), out ack);
/// <summary>
/// Captures a publish with an explicit message id for deduplication checks.
/// </summary>
/// <param name="subject">Publish subject.</param>
/// <param name="payload">Message payload.</param>
/// <param name="msgId">Optional message id used for duplicate detection.</param>
/// <param name="ack">Publish acknowledgement output.</param>
public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, string? msgId, out PubAck ack)
=> TryCaptureWithOptions(subject, payload, new PublishOptions { MsgId = msgId }, out ack);
/// <summary>
/// Captures a publish using explicit publish options and precondition checks.
/// </summary>
/// <param name="subject">Publish subject.</param>
/// <param name="payload">Message payload.</param>
/// <param name="options">Publish options including dedupe and expected-last preconditions.</param>
/// <param name="ack">Publish acknowledgement output.</param>
public bool TryCaptureWithOptions(string subject, ReadOnlyMemory<byte> payload, PublishOptions options, out PubAck ack)
{
if (_streamManager.FindBySubject(subject) is not { } stream)

View File

@@ -2,16 +2,37 @@ namespace NATS.Server.JetStream.Publish;
public sealed class PubAck
{
/// <summary>
/// Gets the stream name that accepted the published message.
/// </summary>
public string Stream { get; init; } = string.Empty;
/// <summary>
/// Gets the stream sequence assigned to the accepted message.
/// </summary>
public ulong Seq { get; init; }
/// <summary>
/// Gets a value indicating whether this acknowledgement represents a deduplicated publish.
/// </summary>
public bool Duplicate { get; init; }
/// <summary>
/// Gets the JetStream API error code when the publish was rejected.
/// </summary>
public int? ErrorCode { get; init; }
// Go: JSPubAckResponse.BatchId — identifies which batch this ack belongs to.
// Go reference: server/jetstream_batching.go (JSPubAckResponse struct)
/// <summary>
/// Gets the batch identifier when this ack belongs to an atomic publish batch.
/// </summary>
public string? BatchId { get; init; }
// Go: JSPubAckResponse.BatchSize — total number of messages committed in this batch.
// Go reference: server/jetstream_batching.go (JSPubAckResponse struct)
/// <summary>
/// Gets the number of messages committed in the acknowledged batch.
/// </summary>
public int BatchSize { get; init; }
}

View File

@@ -2,20 +2,47 @@ namespace NATS.Server.JetStream.Publish;
public sealed class PublishOptions
{
/// <summary>
/// Gets the idempotency token used to deduplicate retried publishes on the stream.
/// </summary>
public string? MsgId { get; init; }
/// <summary>
/// Gets the expected stream last sequence precondition for optimistic concurrency checks.
/// </summary>
public ulong ExpectedLastSeq { get; init; }
/// <summary>
/// Gets the expected last sequence for a specific subject when enforcing subject-level ordering.
/// </summary>
public ulong ExpectedLastSubjectSeq { get; init; }
/// <summary>
/// Gets the subject associated with <see cref="ExpectedLastSubjectSeq"/> precondition checks.
/// </summary>
public string? ExpectedLastSubjectSeqSubject { get; init; }
// Go: Nats-Batch-Id header — identifies which atomic batch this message belongs to.
/// <summary>
/// Gets the batch identifier used to group staged messages into a single commit set.
/// </summary>
public string? BatchId { get; init; }
// Go: Nats-Batch-Sequence header — 1-based position within the batch.
/// <summary>
/// Gets the 1-based position of this message within its JetStream publish batch.
/// </summary>
public ulong BatchSeq { get; init; }
// Go: Nats-Batch-Commit header — "1" or "eob" to commit, null/empty to stage only.
/// <summary>
/// Gets the batch commit marker signaling end-of-batch or explicit commit behavior.
/// </summary>
public string? BatchCommit { get; init; }
// Go: Nats-Expected-Last-Msg-Id header — unsupported inside a batch.
/// <summary>
/// Gets the expected last message id precondition used to guard against duplicate writes.
/// </summary>
public string? ExpectedLastMsgId { get; init; }
}

View File

@@ -6,6 +6,12 @@ public sealed class PublishPreconditions
{
private readonly ConcurrentDictionary<string, DedupeEntry> _dedupe = new(StringComparer.Ordinal);
/// <summary>
/// Checks whether a message id is still inside the duplicate window.
/// </summary>
/// <param name="msgId">Message-id header used for deduplication.</param>
/// <param name="duplicateWindowMs">Duplicate window size in milliseconds.</param>
/// <param name="existingSequence">Existing stored sequence when a duplicate is detected.</param>
public bool IsDuplicate(string? msgId, int duplicateWindowMs, out ulong existingSequence)
{
existingSequence = 0;
@@ -26,6 +32,11 @@ public sealed class PublishPreconditions
return true;
}
/// <summary>
/// Records a message id and sequence for future duplicate detection.
/// </summary>
/// <param name="msgId">Message-id header value to track.</param>
/// <param name="sequence">Stream sequence assigned to the message.</param>
public void Record(string? msgId, ulong sequence)
{
if (string.IsNullOrEmpty(msgId))
@@ -34,6 +45,10 @@ public sealed class PublishPreconditions
_dedupe[msgId] = new DedupeEntry(sequence, DateTime.UtcNow);
}
/// <summary>
/// Removes dedupe entries older than the duplicate window.
/// </summary>
/// <param name="duplicateWindowMs">Duplicate window size in milliseconds.</param>
public void TrimOlderThan(int duplicateWindowMs)
{
if (duplicateWindowMs <= 0)
@@ -47,6 +62,11 @@ public sealed class PublishPreconditions
}
}
/// <summary>
/// Validates expected-last-sequence precondition against current stream sequence.
/// </summary>
/// <param name="expectedLastSeq">Expected last sequence from publish precondition header.</param>
/// <param name="actualLastSeq">Current stream last sequence.</param>
public bool CheckExpectedLastSeq(ulong expectedLastSeq, ulong actualLastSeq)
=> expectedLastSeq == 0 || expectedLastSeq == actualLastSeq;

View File

@@ -65,6 +65,9 @@ internal static class AeadEncryptor
/// Encrypts <paramref name="plaintext"/> with the given <paramref name="cipher"/>
/// and <paramref name="key"/>.
/// </summary>
/// <param name="plaintext">Plain JetStream block bytes to protect at rest.</param>
/// <param name="key">32-byte encryption key derived for the store context.</param>
/// <param name="cipher">Configured at-rest cipher selection.</param>
/// <returns>
/// Wire format: <c>[12:nonce][16:tag][N:ciphertext]</c>
/// </returns>
@@ -112,6 +115,9 @@ internal static class AeadEncryptor
/// <summary>
/// Decrypts data produced by <see cref="Encrypt"/>.
/// </summary>
/// <param name="encrypted">Encrypted block bytes in nonce/tag/ciphertext wire format.</param>
/// <param name="key">32-byte encryption key used when the block was encrypted.</param>
/// <param name="cipher">Configured at-rest cipher selection.</param>
/// <returns>Plaintext bytes.</returns>
/// <exception cref="ArgumentException">If key length is not 32 bytes or data is too short.</exception>
/// <exception cref="CryptographicException">If authentication tag verification fails.</exception>

View File

@@ -18,6 +18,8 @@ public static class AtomicFileWriter
/// The data is written to a unique <c>{path}.{random}.tmp</c> sibling, flushed,
/// then renamed over <paramref name="path"/> with overwrite semantics.
/// </summary>
/// <param name="path">Target file path to replace atomically.</param>
/// <param name="data">Binary payload bytes to persist.</param>
public static async Task WriteAtomicallyAsync(string path, byte[] data)
{
var tmpPath = path + "." + Path.GetRandomFileName() + ".tmp";
@@ -35,6 +37,8 @@ public static class AtomicFileWriter
/// The data is written to a unique <c>{path}.{random}.tmp</c> sibling, flushed,
/// then renamed over <paramref name="path"/> with overwrite semantics.
/// </summary>
/// <param name="path">Target file path to replace atomically.</param>
/// <param name="data">Binary payload bytes to persist.</param>
public static async Task WriteAtomicallyAsync(string path, ReadOnlyMemory<byte> data)
{
var tmpPath = path + "." + Path.GetRandomFileName() + ".tmp";

View File

@@ -26,16 +26,28 @@ public record struct Pending(ulong Sequence, long Timestamp);
public sealed class ConsumerState
{
// Go: ConsumerState.Delivered — highest consumer-seq and stream-seq delivered
/// <summary>
/// Gets or sets highest delivered consumer/stream sequence pair.
/// </summary>
public SequencePair Delivered { get; set; }
// Go: ConsumerState.AckFloor — highest consumer-seq and stream-seq fully acknowledged
/// <summary>
/// Gets or sets highest fully acknowledged consumer/stream sequence pair.
/// </summary>
public SequencePair AckFloor { get; set; }
// Go: ConsumerState.Pending — pending acks keyed by stream sequence; only present
// when AckPolicy is Explicit.
/// <summary>
/// Gets or sets pending explicit-ack entries keyed by stream sequence.
/// </summary>
public Dictionary<ulong, Pending>? Pending { get; set; }
// Go: ConsumerState.Redelivered — redelivery counts keyed by stream sequence;
// only present when a message has been delivered more than once.
/// <summary>
/// Gets or sets redelivery counters keyed by stream sequence.
/// </summary>
public Dictionary<ulong, ulong>? Redelivered { get; set; }
}

View File

@@ -44,6 +44,7 @@ public static class ConsumerStateCodec
/// Encodes consumer state into the Go-compatible binary format.
/// Reference: golang/nats-server/server/store.go:397
/// </summary>
/// <param name="state">Consumer state to serialize.</param>
public static byte[] Encode(ConsumerState state)
{
// Upper-bound the buffer size.
@@ -105,6 +106,7 @@ public static class ConsumerStateCodec
/// Decodes consumer state from the Go-compatible binary format.
/// Reference: golang/nats-server/server/filestore.go:12216
/// </summary>
/// <param name="buf">Binary payload bytes containing encoded consumer state.</param>
public static ConsumerState Decode(ReadOnlySpan<byte> buf)
{
// Copy to array first so lambdas can capture without ref-type restrictions.

View File

@@ -2,9 +2,28 @@ namespace NATS.Server.JetStream.Storage;
public sealed class FileStoreBlock
{
/// <summary>
/// Gets the logical block identifier within the stream file-store layout.
/// </summary>
public int Id { get; init; }
/// <summary>
/// Gets the filesystem path to the backing block file.
/// </summary>
public required string Path { get; init; }
/// <summary>
/// Gets the first stream sequence represented by this block.
/// </summary>
public ulong Sequence { get; init; }
/// <summary>
/// Gets the byte offset of this block relative to stream storage bookkeeping.
/// </summary>
public long OffsetBytes { get; init; }
/// <summary>
/// Gets or sets the current block size in bytes.
/// </summary>
public long SizeBytes { get; set; }
}

View File

@@ -11,35 +11,50 @@ namespace NATS.Server.JetStream.Storage;
/// </summary>
public sealed class FileStoreConfig
{
// Go: FileStoreConfig.StoreDir — root directory for all stream block files
/// <summary>
/// Gets or sets the root directory used to persist JetStream stream and consumer state on disk.
/// </summary>
public string StoreDir { get; set; } = string.Empty;
// Go: FileStoreConfig.BlockSize — maximum bytes per message block file.
// 0 means use the engine default (currently 8 MiB in Go).
/// <summary>
/// Gets or sets the maximum size of each JetStream message block file in bytes.
/// Use <c>0</c> to apply the server default block size.
/// </summary>
public ulong BlockSize { get; set; }
// Go: FileStoreConfig.CacheExpire — how long to keep a loaded block in memory
// after the last read before evicting. Default: 10 seconds.
/// <summary>
/// Gets or sets how long a loaded block stays in memory after last access before eviction.
/// </summary>
public TimeSpan CacheExpire { get; set; } = TimeSpan.FromSeconds(10);
// Go: FileStoreConfig.SubjectStateExpire — how long to keep per-subject state cached
// on an idle message block. Zero means use CacheExpire.
/// <summary>
/// Gets or sets how long per-subject accounting metadata remains cached for idle blocks.
/// When set to <see cref="TimeSpan.Zero"/>, <see cref="CacheExpire"/> is used.
/// </summary>
public TimeSpan SubjectStateExpire { get; set; }
// Go: FileStoreConfig.SyncInterval — interval at which dirty blocks are fsynced.
// Default: 2 minutes.
/// <summary>
/// Gets or sets how frequently dirty file-store blocks are synchronized to durable storage.
/// </summary>
public TimeSpan SyncInterval { get; set; } = TimeSpan.FromMinutes(2);
// Go: FileStoreConfig.SyncAlways — when true every write is immediately fsynced
/// <summary>
/// Gets or sets a value indicating whether each write is immediately synced for maximum durability.
/// </summary>
public bool SyncAlways { get; set; }
// Go: FileStoreConfig.AsyncFlush — when true write operations are batched and
// flushed asynchronously for higher throughput
/// <summary>
/// Gets or sets a value indicating whether flushes are performed asynchronously to improve throughput.
/// </summary>
public bool AsyncFlush { get; set; }
// Go: FileStoreConfig.Cipher — cipher used for at-rest encryption; NoCipher disables it
/// <summary>
/// Gets or sets the encryption mode used for JetStream data at rest.
/// </summary>
public StoreCipher Cipher { get; set; } = StoreCipher.NoCipher;
// Go: FileStoreConfig.Compression — compression algorithm applied to block data
/// <summary>
/// Gets or sets the compression mode applied to persisted JetStream block payloads.
/// </summary>
public StoreCompression Compression { get; set; } = StoreCompression.NoCompression;
}

View File

@@ -7,6 +7,10 @@ namespace NATS.Server.JetStream.Validation;
public static class JetStreamConfigValidator
{
/// <summary>
/// Validates stream/consumer names against JetStream naming constraints.
/// </summary>
/// <param name="name">Candidate name string.</param>
public static bool IsValidName(string? name)
{
if (string.IsNullOrWhiteSpace(name))
@@ -25,9 +29,17 @@ public static class JetStreamConfigValidator
return true;
}
/// <summary>
/// Returns true when metadata key/value bytes are within JetStream size limits.
/// </summary>
/// <param name="metadata">Metadata dictionary to measure.</param>
public static bool IsMetadataWithinLimit(Dictionary<string, string>? metadata)
=> MetadataByteSize(metadata) <= JetStreamApiLimits.JSMaxMetadataLen;
/// <summary>
/// Computes UTF-8 byte size for all metadata keys and values.
/// </summary>
/// <param name="metadata">Metadata dictionary to measure.</param>
public static int MetadataByteSize(Dictionary<string, string>? metadata)
{
if (metadata is null || metadata.Count == 0)
@@ -43,6 +55,10 @@ public static class JetStreamConfigValidator
return size;
}
/// <summary>
/// Validates a stream configuration for required fields and basic limit semantics.
/// </summary>
/// <param name="config">Stream configuration to validate.</param>
public static ValidationResult Validate(StreamConfig config)
{
if (string.IsNullOrWhiteSpace(config.Name) || config.Subjects.Count == 0)
@@ -66,6 +82,7 @@ public static class JetStreamConfigValidator
/// both server_name and cluster.name must be set.
/// Reference: Go server/jetstream.go validateOptions (line ~2822-2831).
/// </summary>
/// <param name="options">Server options containing JetStream and cluster settings.</param>
public static ValidationResult ValidateClusterConfig(NatsOptions options)
{
// If JetStream is not enabled or not clustered, no cluster-specific checks needed.
@@ -84,7 +101,9 @@ public static class JetStreamConfigValidator
public sealed class ValidationResult
{
/// <summary>Indicates whether validation succeeded.</summary>
public bool IsValid { get; }
/// <summary>Validation error message when <see cref="IsValid"/> is false.</summary>
public string Message { get; }
private ValidationResult(bool isValid, string message)
@@ -93,6 +112,9 @@ public sealed class ValidationResult
Message = message;
}
/// <summary>Creates a successful validation result.</summary>
public static ValidationResult Valid() => new(true, string.Empty);
/// <summary>Creates a failed validation result with an explanatory message.</summary>
/// <param name="message">Validation failure reason.</param>
public static ValidationResult Invalid(string message) => new(false, message);
}

View File

@@ -8,33 +8,43 @@ namespace NATS.Server.LeafNodes;
/// </summary>
public sealed class LeafConnectInfo
{
/// <summary>Optional user JWT presented during leaf authentication.</summary>
[JsonPropertyName("jwt")]
public string? Jwt { get; init; }
/// <summary>Client public NKey used for nonce-signature authentication.</summary>
[JsonPropertyName("nkey")]
public string? Nkey { get; init; }
/// <summary>Nonce signature proving ownership of <see cref="Nkey"/>.</summary>
[JsonPropertyName("sig")]
public string? Sig { get; init; }
/// <summary>Whether this leaf connection advertises hub mode support.</summary>
[JsonPropertyName("hub")]
public bool Hub { get; init; }
/// <summary>Optional cluster name associated with the remote leaf node.</summary>
[JsonPropertyName("cluster")]
public string? Cluster { get; init; }
/// <summary>Whether protocol headers are supported on this leaf connection.</summary>
[JsonPropertyName("headers")]
public bool Headers { get; init; }
/// <summary>Whether JetStream asset and API forwarding are supported.</summary>
[JsonPropertyName("jetstream")]
public bool JetStream { get; init; }
/// <summary>Negotiated compression mode for leaf traffic.</summary>
[JsonPropertyName("compression")]
public string? Compression { get; init; }
/// <summary>Optional remote account binding for this solicited leaf link.</summary>
[JsonPropertyName("remote_account")]
public string? RemoteAccount { get; init; }
/// <summary>Leaf protocol version number.</summary>
[JsonPropertyName("proto")]
public int Proto { get; init; }
}

View File

@@ -32,6 +32,10 @@ public sealed class LeafHubSpokeMapper
private readonly IReadOnlyList<string> _allowExports;
private readonly IReadOnlyList<string> _allowImports;
/// <summary>
/// Creates a mapper with account mapping only (no subject allow/deny filters).
/// </summary>
/// <param name="hubToSpoke">Mapping from hub account names to spoke account names.</param>
public LeafHubSpokeMapper(IReadOnlyDictionary<string, string> hubToSpoke)
: this(hubToSpoke, [], [], [], [])
{
@@ -40,6 +44,9 @@ public sealed class LeafHubSpokeMapper
/// <summary>
/// Creates a mapper with account mapping and subject deny filters (legacy constructor).
/// </summary>
/// <param name="hubToSpoke">Mapping from hub account names to spoke account names.</param>
/// <param name="denyExports">Subject patterns denied for hub→leaf flow.</param>
/// <param name="denyImports">Subject patterns denied for leaf→hub flow.</param>
public LeafHubSpokeMapper(
IReadOnlyDictionary<string, string> hubToSpoke,
IReadOnlyList<string> denyExports,
@@ -74,6 +81,9 @@ public sealed class LeafHubSpokeMapper
/// <summary>
/// Maps an account from hub→spoke or spoke→hub based on direction.
/// </summary>
/// <param name="account">Account name to map.</param>
/// <param name="subject">Subject associated with the mapping request.</param>
/// <param name="direction">Flow direction determining which map to apply.</param>
public LeafMappingResult Map(string account, string subject, LeafMapDirection direction)
{
if (direction == LeafMapDirection.Outbound && _hubToSpoke.TryGetValue(account, out var spoke))
@@ -89,6 +99,8 @@ public sealed class LeafHubSpokeMapper
/// When an allow-list is set, the subject must also match at least one allow pattern.
/// Deny takes precedence over allow (Go reference: auth.go SubjectPermission semantics).
/// </summary>
/// <param name="subject">Subject to evaluate.</param>
/// <param name="direction">Flow direction used to choose allow/deny lists.</param>
public bool IsSubjectAllowed(string subject, LeafMapDirection direction)
{
var (denyList, allowList) = direction switch

View File

@@ -4,15 +4,34 @@ public static class LeafLoopDetector
{
private const string LeafLoopPrefix = "$LDS.";
/// <summary>
/// Determines whether a subject contains the leaf-loop marker prefix.
/// </summary>
/// <param name="subject">Subject to inspect for loop marker metadata.</param>
public static bool HasLoopMarker(string subject)
=> subject.StartsWith(LeafLoopPrefix, StringComparison.Ordinal);
/// <summary>
/// Prefixes a subject with local loop-detection metadata for leaf forwarding.
/// </summary>
/// <param name="subject">Original subject being forwarded.</param>
/// <param name="serverId">Server identifier appended to the loop marker.</param>
public static string Mark(string subject, string serverId)
=> $"{LeafLoopPrefix}{serverId}.{subject}";
/// <summary>
/// Determines whether the subject indicates a loop back to the local server.
/// </summary>
/// <param name="subject">Forwarded subject containing loop markers.</param>
/// <param name="localServerId">Current server identifier.</param>
public static bool IsLooped(string subject, string localServerId)
=> subject.StartsWith($"{LeafLoopPrefix}{localServerId}.", StringComparison.Ordinal);
/// <summary>
/// Removes all loop markers from a subject and returns the unmarked subject.
/// </summary>
/// <param name="subject">Subject that may contain one or more loop markers.</param>
/// <param name="unmarked">Unmarked subject when removal succeeds.</param>
public static bool TryUnmark(string subject, out string unmarked)
{
unmarked = subject;

View File

@@ -16,6 +16,10 @@ public static class LeafSubKey
public static readonly TimeSpan SharedSysAccDelay = TimeSpan.FromMilliseconds(250);
public static readonly TimeSpan ConnectProcessTimeout = TimeSpan.FromSeconds(2);
/// <summary>
/// Builds canonical subscription key from subject and optional queue group.
/// </summary>
/// <param name="sub">Subscription used to build key components.</param>
public static string KeyFromSub(Subscription sub)
{
ArgumentNullException.ThrowIfNull(sub);
@@ -24,6 +28,11 @@ public static class LeafSubKey
: sub.Subject;
}
/// <summary>
/// Builds canonical subscription key including routed-origin metadata when present.
/// </summary>
/// <param name="sub">Subscription used to build key components.</param>
/// <param name="origin">Optional routed origin identifier.</param>
public static string KeyFromSubWithOrigin(Subscription sub, string? origin = null)
{
ArgumentNullException.ThrowIfNull(sub);

View File

@@ -17,6 +17,11 @@ public sealed class WebSocketStreamAdapter : Stream
private int _readCount;
private bool _disposed;
/// <summary>
/// Creates a stream adapter for a WebSocket-backed leaf-node transport.
/// </summary>
/// <param name="ws">WebSocket transport used for framed binary I/O.</param>
/// <param name="initialBufferSize">Initial receive staging-buffer size.</param>
public WebSocketStreamAdapter(SystemWebSocket ws, int initialBufferSize = 4096)
{
_ws = ws ?? throw new ArgumentNullException(nameof(ws));
@@ -34,10 +39,15 @@ public sealed class WebSocketStreamAdapter : Stream
public override bool CanSeek => false;
// Telemetry properties
/// <summary>Whether the underlying WebSocket is currently open.</summary>
public bool IsConnected => _ws.State == WebSocketState.Open;
/// <summary>Total bytes read from received WebSocket messages.</summary>
public long BytesRead { get; private set; }
/// <summary>Total bytes written to outbound WebSocket messages.</summary>
public long BytesWritten { get; private set; }
/// <summary>Total completed WebSocket messages read.</summary>
public int MessagesRead { get; private set; }
/// <summary>Total completed WebSocket messages written.</summary>
public int MessagesWritten { get; private set; }
/// <inheritdoc />
@@ -196,12 +206,18 @@ public sealed class WebSocketStreamAdapter : Stream
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
/// <inheritdoc />
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
/// <inheritdoc />
public override void SetLength(long value) => throw new NotSupportedException();
/// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException("Use async methods");
/// <inheritdoc />
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException("Use async methods");
/// <inheritdoc />
public override void Flush() { }
/// <inheritdoc />
protected override void Dispose(bool disposing)
{
if (_disposed)

View File

@@ -6,11 +6,18 @@ public sealed class AccountzHandler
{
private readonly NatsServer _server;
/// <summary>
/// Creates handler for account-focused monitoring endpoints.
/// </summary>
/// <param name="server">Server instance providing account snapshots.</param>
public AccountzHandler(NatsServer server)
{
_server = server;
}
/// <summary>
/// Builds account overview payload for <c>/accountz</c>.
/// </summary>
public object Build()
{
var accounts = _server.GetAccounts().Select(ToAccountDto).ToArray();
@@ -21,6 +28,9 @@ public sealed class AccountzHandler
};
}
/// <summary>
/// Builds aggregate account statistics payload for <c>/accstatz</c>.
/// </summary>
public object BuildStats()
{
var accounts = _server.GetAccounts().ToArray();

View File

@@ -13,19 +13,32 @@ public sealed class ClosedConnectionRingBuffer
private int _count; // Current count (up to capacity)
private long _totalClosed; // Running total of all closed connections ever
/// <summary>
/// Creates a fixed-size closed-connection ring buffer.
/// </summary>
/// <param name="capacity">Maximum number of recent closed-client snapshots retained.</param>
public ClosedConnectionRingBuffer(int capacity = 1024)
{
if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity), "Capacity must be greater than zero.");
_buffer = new ClosedClient[capacity];
}
/// <summary>
/// Gets the maximum number of closed-client entries retained before wraparound.
/// </summary>
public int Capacity => _buffer.Length;
/// <summary>
/// Gets the number of currently retained closed-client entries.
/// </summary>
public int Count
{
get { lock (_lock) return _count; }
}
/// <summary>
/// Gets the lifetime total count of closed connections observed by this buffer.
/// </summary>
public long TotalClosed
{
get { lock (_lock) return _totalClosed; }
@@ -34,6 +47,7 @@ public sealed class ClosedConnectionRingBuffer
/// <summary>
/// Adds a closed connection snapshot. If the buffer is full the oldest entry is overwritten.
/// </summary>
/// <param name="info">Closed-client snapshot to append into the ring.</param>
public void Add(ClosedClient info)
{
lock (_lock)
@@ -60,6 +74,7 @@ public sealed class ClosedConnectionRingBuffer
/// <summary>
/// Returns up to <paramref name="count"/> most recent entries, ordered newest-first.
/// </summary>
/// <param name="count">Maximum number of recent entries to return.</param>
public IReadOnlyList<ClosedClient> GetRecent(int count)
{
lock (_lock)

View File

@@ -4,11 +4,18 @@ public sealed class GatewayzHandler
{
private readonly NatsServer _server;
/// <summary>
/// Creates gateway monitoring handler.
/// </summary>
/// <param name="server">Server instance providing gateway metrics.</param>
public GatewayzHandler(NatsServer server)
{
_server = server;
}
/// <summary>
/// Builds gateway metrics payload for <c>/gatewayz</c>.
/// </summary>
public object Build()
{
var gateways = _server.Stats.Gateways;

View File

@@ -8,18 +8,33 @@ namespace NATS.Server.Monitoring;
/// </summary>
public sealed class HealthStatus
{
/// <summary>
/// Gets the overall health status string returned by the monitoring endpoint.
/// </summary>
[JsonPropertyName("status")]
public string Status { get; init; } = "ok";
/// <summary>
/// Gets the HTTP-style status code representing current server health.
/// </summary>
[JsonPropertyName("status_code")]
public int StatusCode { get; init; } = 200;
/// <summary>
/// Gets the top-level error message when health checks fail.
/// </summary>
[JsonPropertyName("error")]
public string? Error { get; init; }
/// <summary>
/// Gets detailed health-check failures contributing to a non-OK status.
/// </summary>
[JsonPropertyName("errors")]
public HealthzError[] Errors { get; init; } = [];
/// <summary>
/// Creates a successful health response payload used by <c>/healthz</c>.
/// </summary>
public static HealthStatus Ok() => new();
}
@@ -29,9 +44,15 @@ public sealed class HealthStatus
/// </summary>
public sealed class HealthzError
{
/// <summary>
/// Gets the subsystem classification for this health failure.
/// </summary>
[JsonPropertyName("type")]
public HealthzErrorType Type { get; init; } = HealthzErrorType.Unknown;
/// <summary>
/// Gets the subsystem-specific failure detail emitted for diagnostics.
/// </summary>
[JsonPropertyName("error")]
public string Error { get; init; } = string.Empty;
}

View File

@@ -7,12 +7,20 @@ public sealed class JszHandler
private readonly NatsServer _server;
private readonly NatsOptions _options;
/// <summary>
/// Creates a JetStream monitoring response builder bound to server runtime state.
/// </summary>
/// <param name="server">Running server instance exposing JetStream counters and IDs.</param>
/// <param name="options">Server options containing JetStream capacity configuration.</param>
public JszHandler(NatsServer server, NatsOptions options)
{
_server = server;
_options = options;
}
/// <summary>
/// Builds a point-in-time <c>/jsz</c> style response from current server state.
/// </summary>
public JszResponse Build()
{
return new JszResponse
@@ -38,33 +46,43 @@ public sealed class JszHandler
public sealed class JszResponse
{
/// <summary>Server identifier for the node producing this response.</summary>
[JsonPropertyName("server_id")]
public string ServerId { get; set; } = string.Empty;
/// <summary>UTC timestamp when this monitoring snapshot was generated.</summary>
[JsonPropertyName("now")]
public DateTime Now { get; set; }
/// <summary>Whether JetStream is enabled on this server.</summary>
[JsonPropertyName("enabled")]
public bool Enabled { get; set; }
/// <summary>JetStream memory usage in bytes.</summary>
[JsonPropertyName("memory")]
public ulong Memory { get; set; }
/// <summary>JetStream file-storage usage in bytes.</summary>
[JsonPropertyName("storage")]
public ulong Storage { get; set; }
/// <summary>Number of JetStream streams currently hosted.</summary>
[JsonPropertyName("streams")]
public int Streams { get; set; }
/// <summary>Number of JetStream consumers currently hosted.</summary>
[JsonPropertyName("consumers")]
public int Consumers { get; set; }
/// <summary>Total number of JetStream API requests handled.</summary>
[JsonPropertyName("api_total")]
public ulong ApiTotal { get; set; }
/// <summary>Total number of JetStream API requests that returned errors.</summary>
[JsonPropertyName("api_errors")]
public ulong ApiErrors { get; set; }
/// <summary>Configured JetStream resource limits and storage directory.</summary>
[JsonPropertyName("config")]
public JetStreamConfig Config { get; set; } = new();
}

View File

@@ -4,11 +4,18 @@ public sealed class LeafzHandler
{
private readonly NatsServer _server;
/// <summary>
/// Creates leaf-node monitoring handler.
/// </summary>
/// <param name="server">Server instance providing leaf metrics.</param>
public LeafzHandler(NatsServer server)
{
_server = server;
}
/// <summary>
/// Builds leaf-node metrics payload for <c>/leafz</c>.
/// </summary>
public object Build()
{
var leafs = _server.Stats.Leafs;

View File

@@ -23,6 +23,13 @@ public sealed class MonitorServer : IAsyncDisposable
private readonly AccountzHandler _accountzHandler;
private readonly PprofHandler _pprofHandler;
/// <summary>
/// Creates monitoring HTTP server wiring and endpoint handlers.
/// </summary>
/// <param name="server">Server runtime state used by monitoring handlers.</param>
/// <param name="options">Monitoring and feature options controlling endpoint behavior.</param>
/// <param name="stats">Shared HTTP request stats counters.</param>
/// <param name="loggerFactory">Logger factory for monitor diagnostics.</param>
public MonitorServer(NatsServer server, NatsOptions options, ServerStats stats, ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<MonitorServer>();
@@ -137,12 +144,19 @@ public sealed class MonitorServer : IAsyncDisposable
}
}
/// <summary>
/// Starts the monitoring web server.
/// </summary>
/// <param name="ct">Cancellation token for server startup.</param>
public async Task StartAsync(CancellationToken ct)
{
await _app.StartAsync(ct);
_logger.LogInformation("Monitoring listening on {Urls}", string.Join(", ", _app.Urls));
}
/// <summary>
/// Stops and disposes monitoring server resources.
/// </summary>
public async ValueTask DisposeAsync()
{
await _app.StopAsync();

View File

@@ -8,6 +8,9 @@ namespace NATS.Server.Monitoring;
/// </summary>
public sealed class PprofHandler
{
/// <summary>
/// Returns index content for pprof-compatible endpoint listing.
/// </summary>
public string Index()
{
return """
@@ -21,6 +24,10 @@ public sealed class PprofHandler
""";
}
/// <summary>
/// Captures lightweight CPU profile metadata payload.
/// </summary>
/// <param name="seconds">Requested capture duration in seconds.</param>
public byte[] CaptureCpuProfile(int seconds)
{
var boundedSeconds = Math.Clamp(seconds, 1, 120);

View File

@@ -4,11 +4,18 @@ public sealed class RoutezHandler
{
private readonly NatsServer _server;
/// <summary>
/// Creates route monitoring handler.
/// </summary>
/// <param name="server">Server instance providing route metrics.</param>
public RoutezHandler(NatsServer server)
{
_server = server;
}
/// <summary>
/// Builds route metrics payload for <c>/routez</c>.
/// </summary>
public object Build()
{
var routes = _server.Stats.Routes;

View File

@@ -7,27 +7,35 @@ namespace NATS.Server.Monitoring;
/// </summary>
public sealed class Subsz
{
/// <summary>Server identifier generating this monitoring snapshot.</summary>
[JsonPropertyName("server_id")]
public string Id { get; set; } = "";
/// <summary>UTC timestamp when the subsz response was produced.</summary>
[JsonPropertyName("now")]
public DateTime Now { get; set; }
/// <summary>Total subscription count currently tracked by the server.</summary>
[JsonPropertyName("num_subscriptions")]
public uint NumSubs { get; set; }
/// <summary>Number of entries currently stored in the subscription match cache.</summary>
[JsonPropertyName("num_cache")]
public int NumCache { get; set; }
/// <summary>Total number of subscription records matching the requested filter.</summary>
[JsonPropertyName("total")]
public int Total { get; set; }
/// <summary>Pagination offset applied to the returned subscription list.</summary>
[JsonPropertyName("offset")]
public int Offset { get; set; }
/// <summary>Pagination limit applied to the returned subscription list.</summary>
[JsonPropertyName("limit")]
public int Limit { get; set; }
/// <summary>Subscription detail records included in this page of results.</summary>
[JsonPropertyName("subscriptions")]
public SubDetail[] Subs { get; set; } = [];
}
@@ -37,9 +45,14 @@ public sealed class Subsz
/// </summary>
public sealed class SubszOptions
{
/// <summary>Zero-based offset into the filtered subscription result set.</summary>
public int Offset { get; set; }
/// <summary>Maximum number of subscription records to return.</summary>
public int Limit { get; set; } = 1024;
/// <summary>Whether detailed subscription entries should be returned.</summary>
public bool Subscriptions { get; set; }
/// <summary>Optional account filter limiting results to one account.</summary>
public string Account { get; set; } = "";
/// <summary>Optional subject-pattern filter used to match subscription subjects.</summary>
public string Test { get; set; } = "";
}

View File

@@ -5,6 +5,10 @@ namespace NATS.Server.Monitoring;
internal static class TlsPeerCertMapper
{
/// <summary>
/// Maps an active TLS certificate to monitor API peer-cert shape.
/// </summary>
/// <param name="cert">Peer certificate from a live connection.</param>
public static TLSPeerCert[] FromCertificate(X509Certificate2? cert)
{
if (cert == null)
@@ -21,6 +25,10 @@ internal static class TlsPeerCertMapper
];
}
/// <summary>
/// Maps persisted closed-client TLS fields to monitor API peer-cert shape.
/// </summary>
/// <param name="closed">Closed client record containing persisted TLS fingerprint data.</param>
public static TLSPeerCert[] FromClosedClient(ClosedClient closed)
{
if (string.IsNullOrEmpty(closed.TlsPeerCertSubject))
@@ -37,6 +45,10 @@ internal static class TlsPeerCertMapper
];
}
/// <summary>
/// Converts a certificate into closed-client TLS field tuple values.
/// </summary>
/// <param name="cert">Peer certificate from a live connection.</param>
public static (string Subject, string SubjectPkSha256, string CertSha256) ToClosedFields(X509Certificate2? cert)
{
if (cert == null)

View File

@@ -22,6 +22,12 @@ public sealed class VarzHandler : IDisposable
private TimeSpan _lastCpuUsage;
private double _cachedCpuPercent;
/// <summary>
/// Creates the handler responsible for constructing <c>/varz</c> responses.
/// </summary>
/// <param name="server">Server runtime state used for counters and topology fields.</param>
/// <param name="options">Effective server options reflected in varz output.</param>
/// <param name="loggerFactory">Logger factory for diagnostics.</param>
public VarzHandler(NatsServer server, NatsOptions options, ILoggerFactory loggerFactory)
{
_server = server;
@@ -33,6 +39,10 @@ public sealed class VarzHandler : IDisposable
_cpuSampleTimer = new Timer(_ => SampleCpuUsage(), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
}
/// <summary>
/// Builds the current <c>/varz</c> payload snapshot.
/// </summary>
/// <param name="ct">Cancellation token for request lifecycle control.</param>
public async Task<Varz> HandleVarzAsync(CancellationToken ct = default)
{
await _varzMu.WaitAsync(ct);
@@ -148,6 +158,9 @@ public sealed class VarzHandler : IDisposable
}
}
/// <summary>
/// Disposes timer and synchronization primitives used by the handler.
/// </summary>
public void Dispose()
{
_cpuSampleTimer.Dispose();

View File

@@ -32,6 +32,7 @@ public static class MqttPacketReader
/// <summary>
/// Parses a complete MQTT control packet from a contiguous span.
/// </summary>
/// <param name="buffer">Contiguous bytes containing a full MQTT control packet.</param>
public static MqttControlPacket Read(ReadOnlySpan<byte> buffer)
{
if (buffer.Length < 2)
@@ -58,6 +59,9 @@ public static class MqttPacketReader
/// past the packet bytes on success.
/// Used with <see cref="System.IO.Pipelines.PipeReader"/> for incremental parsing.
/// </summary>
/// <param name="buffer">Input byte sequence that may contain zero, one, or partial MQTT packets.</param>
/// <param name="packet">Parsed packet result when a full packet is available.</param>
/// <param name="consumed">Sequence position advanced past the parsed packet bytes.</param>
public static bool TryRead(ReadOnlySequence<byte> buffer, out MqttControlPacket? packet, out SequencePosition consumed)
{
packet = null;
@@ -120,6 +124,11 @@ public static class MqttPacketReader
return true;
}
/// <summary>
/// Decodes the MQTT variable-length remaining-length field.
/// </summary>
/// <param name="encoded">Span starting at the first remaining-length byte.</param>
/// <param name="consumed">Number of bytes consumed to decode the remaining length.</param>
internal static int DecodeRemainingLength(ReadOnlySpan<byte> encoded, out int consumed)
{
var multiplier = 1;

View File

@@ -23,12 +23,26 @@ public sealed record MqttPacket(
public sealed class MqttProtocolParser
{
/// <summary>
/// Parses binary MQTT control packet bytes into a structured packet.
/// </summary>
/// <param name="packet">Contiguous MQTT control packet bytes.</param>
public MqttControlPacket ParsePacket(ReadOnlySpan<byte> packet)
=> MqttPacketReader.Read(packet);
/// <summary>
/// Writes a binary MQTT control packet.
/// </summary>
/// <param name="type">MQTT control packet type.</param>
/// <param name="payload">Packet payload bytes.</param>
/// <param name="flags">Low-nibble control flags for the packet type.</param>
public byte[] WritePacket(MqttControlPacketType type, ReadOnlySpan<byte> payload, byte flags = 0)
=> MqttPacketWriter.Write(type, payload, flags);
/// <summary>
/// Parses a simplified text command line into a test-friendly MQTT packet shape.
/// </summary>
/// <param name="line">Text command line representing an MQTT operation.</param>
public MqttPacket ParseLine(string line)
{
var trimmed = line.Trim();

View File

@@ -25,6 +25,9 @@ public sealed class MqttQoS1Tracker
/// Registers an outgoing QoS 1 message and assigns a packet ID.
/// Returns the assigned packet ID.
/// </summary>
/// <param name="topic">MQTT topic for the outbound message.</param>
/// <param name="payload">Outbound payload bytes.</param>
/// <param name="streamSequence">Optional JetStream stream sequence tied to this delivery.</param>
public ushort Register(string topic, byte[] payload, ulong streamSequence = 0)
{
var id = GetNextPacketId();
@@ -44,6 +47,7 @@ public sealed class MqttQoS1Tracker
/// Acknowledges receipt of a PUBACK for the given packet ID.
/// Returns the pending message if found, or null.
/// </summary>
/// <param name="packetId">MQTT packet identifier from PUBACK.</param>
public QoS1PendingMessage? Acknowledge(ushort packetId)
{
return _pending.TryRemove(packetId, out var msg) ? msg : null;
@@ -69,6 +73,7 @@ public sealed class MqttQoS1Tracker
/// <summary>
/// Checks if a packet ID is pending acknowledgment.
/// </summary>
/// <param name="packetId">MQTT packet identifier to check.</param>
public bool IsPending(ushort packetId) => _pending.ContainsKey(packetId);
/// <summary>Clears all pending messages.</summary>
@@ -90,10 +95,15 @@ public sealed class MqttQoS1Tracker
/// </summary>
public sealed class QoS1PendingMessage
{
/// <summary>MQTT packet identifier assigned to this outbound QoS1 message.</summary>
public ushort PacketId { get; init; }
/// <summary>MQTT topic associated with the message.</summary>
public string Topic { get; init; } = string.Empty;
/// <summary>Outbound payload bytes awaiting PUBACK.</summary>
public byte[] Payload { get; init; } = [];
/// <summary>UTC timestamp when this message was last sent.</summary>
public DateTime SentAtUtc { get; set; }
/// <summary>Number of delivery attempts for this packet.</summary>
public int DeliveryCount { get; set; } = 1;
/// <summary>

View File

@@ -34,6 +34,7 @@ public static class MqttTopicMapper
/// Returns the MQTT topic as pre-encoded UTF-8 bytes, using a bounded cache
/// to avoid repeated string translation and encoding on the hot path.
/// </summary>
/// <param name="natsSubject">NATS subject to convert into MQTT topic wire bytes.</param>
public static byte[] NatsToMqttBytes(string natsSubject)
{
if (TopicBytesCache.TryGetValue(natsSubject, out var cached))
@@ -65,6 +66,7 @@ public static class MqttTopicMapper
/// Translates an MQTT topic or filter to a NATS subject.
/// Handles wildcards, dot escaping, empty levels, and '$' prefix protection.
/// </summary>
/// <param name="mqttTopic">MQTT topic or filter received from client protocol operations.</param>
public static string MqttToNats(string mqttTopic)
{
if (mqttTopic.Length == 0)
@@ -102,6 +104,7 @@ public static class MqttTopicMapper
/// Translates a NATS subject back to an MQTT topic.
/// Reverses the mapping: '.' → '/', '*' → '+', '>' → '#', '_DOT_' → '.'.
/// </summary>
/// <param name="natsSubject">NATS subject to map back into MQTT topic syntax.</param>
public static string NatsToMqtt(string natsSubject)
{
if (natsSubject.Length == 0)
@@ -152,6 +155,7 @@ public static class MqttTopicMapper
/// NOT be matched by wildcard subscriptions (MQTT spec [MQTT-4.7.2-1]).
/// Topics starting with '$' are reserved for system/server use.
/// </summary>
/// <param name="mqttTopic">Concrete MQTT topic name to inspect.</param>
public static bool IsDollarTopic(string mqttTopic)
=> mqttTopic.Length > 0 && mqttTopic[0] == '$';
@@ -159,6 +163,7 @@ public static class MqttTopicMapper
/// Returns true if an MQTT topic filter starts with '$', indicating
/// it explicitly targets system topics.
/// </summary>
/// <param name="mqttFilter">MQTT filter expression supplied by a subscription.</param>
public static bool IsDollarFilter(string mqttFilter)
=> mqttFilter.Length > 0 && mqttFilter[0] == '$';
@@ -167,6 +172,8 @@ public static class MqttTopicMapper
/// Per MQTT spec, wildcard filters (starting with '#' or '+') must NOT
/// match topics beginning with '$'. Only explicit '$' filters match '$' topics.
/// </summary>
/// <param name="mqttFilter">MQTT subscription filter being evaluated.</param>
/// <param name="mqttTopic">Topic candidate that may start with a reserved <c>$</c> prefix.</param>
public static bool WildcardMatchesDollarTopic(string mqttFilter, string mqttTopic)
{
if (!IsDollarTopic(mqttTopic))

View File

@@ -6,8 +6,15 @@ public sealed record MessageTraceContext(
string? ClientVersion,
bool HeadersEnabled)
{
/// <summary>
/// Gets shared empty trace context for clients without CONNECT metadata.
/// </summary>
public static MessageTraceContext Empty { get; } = new(null, null, null, false);
/// <summary>
/// Builds trace context from CONNECT options.
/// </summary>
/// <param name="connectOpts">Client CONNECT options payload.</param>
public static MessageTraceContext CreateFromConnect(ClientOptions? connectOpts)
{
if (connectOpts == null)

View File

@@ -5,8 +5,19 @@ namespace NATS.Server.Protocol;
public readonly struct NatsHeaders()
{
/// <summary>
/// Gets parsed status code from the <c>NATS/1.0</c> status line.
/// </summary>
public int Status { get; init; }
/// <summary>
/// Gets optional status description text from the <c>NATS/1.0</c> status line.
/// </summary>
public string Description { get; init; } = string.Empty;
/// <summary>
/// Gets parsed header key/value pairs, preserving repeated header values.
/// </summary>
public IReadOnlyDictionary<string, string[]> Headers { get; init; } = ReadOnlyDictionary<string, string[]>.Empty;
public static readonly NatsHeaders Invalid = new()
@@ -22,6 +33,10 @@ public static class NatsHeaderParser
private static ReadOnlySpan<byte> CrLf => "\r\n"u8;
private static ReadOnlySpan<byte> Prefix => "NATS/1.0"u8;
/// <summary>
/// Parses NATS header block bytes into status and header key/value data.
/// </summary>
/// <param name="data">Raw header bytes beginning with <c>NATS/1.0</c>.</param>
public static NatsHeaders Parse(ReadOnlySpan<byte> data)
{
if (data.Length < Prefix.Length)

View File

@@ -5,24 +5,44 @@ namespace NATS.Server.Protocol;
public readonly struct ParsedCommandView
{
/// <summary>Parsed command type used for protocol dispatch.</summary>
public CommandType Type { get; init; }
/// <summary>Original protocol operation token (for example <c>PUB</c> or <c>SUB</c>).</summary>
public string? Operation { get; init; }
/// <summary>Raw subject bytes from the control line.</summary>
public ReadOnlyMemory<byte> Subject { get; init; }
/// <summary>Raw reply subject bytes for request-reply commands.</summary>
public ReadOnlyMemory<byte> ReplyTo { get; init; }
/// <summary>Raw queue-group name bytes for queue subscriptions.</summary>
public ReadOnlyMemory<byte> Queue { get; init; }
/// <summary>Raw subscription identifier bytes.</summary>
public ReadOnlyMemory<byte> Sid { get; init; }
/// <summary>Maximum messages value for <c>UNSUB</c>; <c>-1</c> when unspecified.</summary>
public int MaxMessages { get; init; }
/// <summary>Header size for <c>HPUB</c>; <c>-1</c> when not applicable.</summary>
public int HeaderSize { get; init; }
/// <summary>Zero-copy payload sequence view from the network buffer.</summary>
public ReadOnlySequence<byte> Payload { get; init; }
/// <summary>
/// Creates a simple command view for control-line-only operations.
/// </summary>
/// <param name="type">Command type.</param>
/// <param name="operation">Operation token used for diagnostics/tracing.</param>
public static ParsedCommandView Simple(CommandType type, string operation) =>
new() { Type = type, Operation = operation, MaxMessages = -1 };
/// <summary>
/// Materializes payload bytes as a contiguous <see cref="ReadOnlyMemory{T}"/>.
/// </summary>
public ReadOnlyMemory<byte> GetPayloadMemory() =>
Payload.IsEmpty ? ReadOnlyMemory<byte>.Empty
: Payload.IsSingleSegment ? Payload.First
: Payload.ToArray();
/// <summary>
/// Converts this view into a materialized <see cref="ParsedCommand"/> object.
/// </summary>
public ParsedCommand Materialize() =>
new()
{

View File

@@ -6,6 +6,10 @@ public static class ProtoWire
public const string ErrProtoOverflow = "too much data for a value";
public const string ErrProtoInvalidFieldNumber = "invalid field number";
/// <summary>
/// Scans a protobuf field (tag and value) and returns field metadata with consumed size.
/// </summary>
/// <param name="buffer">Input bytes beginning at a protobuf field tag.</param>
public static (int Number, int WireType, int Size) ScanField(ReadOnlySpan<byte> buffer)
{
var (number, wireType, tagSize) = ScanTag(buffer);
@@ -13,6 +17,10 @@ public static class ProtoWire
return (number, wireType, tagSize + valueSize);
}
/// <summary>
/// Scans a protobuf tag and returns field number, wire type, and bytes consumed.
/// </summary>
/// <param name="buffer">Input bytes beginning at a protobuf field tag varint.</param>
public static (int Number, int WireType, int Size) ScanTag(ReadOnlySpan<byte> buffer)
{
var (tag, size) = ScanVarint(buffer);
@@ -23,6 +31,11 @@ public static class ProtoWire
return ((int)fieldNumber, (int)(tag & 0x7), size);
}
/// <summary>
/// Scans the encoded value size for a protobuf field using its wire type.
/// </summary>
/// <param name="wireType">Protobuf wire type from the field tag.</param>
/// <param name="buffer">Input bytes beginning at the field value payload.</param>
public static int ScanFieldValue(int wireType, ReadOnlySpan<byte> buffer)
{
return wireType switch
@@ -35,6 +48,10 @@ public static class ProtoWire
};
}
/// <summary>
/// Reads a protobuf varint value and returns decoded value and bytes consumed.
/// </summary>
/// <param name="buffer">Input bytes beginning at a varint.</param>
public static (ulong Value, int Size) ScanVarint(ReadOnlySpan<byte> buffer)
{
ulong value = 0;
@@ -62,6 +79,10 @@ public static class ProtoWire
throw new ProtoWireException(ErrProtoOverflow);
}
/// <summary>
/// Scans a length-delimited protobuf field and returns total encoded size.
/// </summary>
/// <param name="buffer">Input bytes beginning at the field's length varint.</param>
public static int ScanBytes(ReadOnlySpan<byte> buffer)
{
var (length, lenSize) = ScanVarint(buffer);
@@ -71,6 +92,10 @@ public static class ProtoWire
return lenSize + (int)length;
}
/// <summary>
/// Encodes an unsigned integer into protobuf varint format.
/// </summary>
/// <param name="value">Unsigned value to encode.</param>
public static byte[] EncodeVarint(ulong value)
{
Span<byte> scratch = stackalloc byte[10];

View File

@@ -19,18 +19,22 @@ public sealed class CommitQueue<T>
/// <summary>
/// Enqueues an item for state machine application.
/// </summary>
/// <param name="item">Committed item to enqueue for downstream application.</param>
/// <param name="ct">Cancellation token for the enqueue operation.</param>
public ValueTask EnqueueAsync(T item, CancellationToken ct = default)
=> _channel.Writer.WriteAsync(item, ct);
/// <summary>
/// Dequeues the next committed entry, waiting if none are available.
/// </summary>
/// <param name="ct">Cancellation token for the dequeue wait.</param>
public ValueTask<T> DequeueAsync(CancellationToken ct = default)
=> _channel.Reader.ReadAsync(ct);
/// <summary>
/// Attempts a non-blocking dequeue. Returns true if an item was available.
/// </summary>
/// <param name="item">Dequeued item when available; otherwise default value.</param>
public bool TryDequeue(out T? item)
=> _channel.Reader.TryRead(out item);

View File

@@ -7,14 +7,39 @@ namespace NATS.Server.Raft;
/// </summary>
public sealed class RaftConfig
{
/// <summary>
/// Gets or sets the logical name for this RAFT group (meta, stream, or consumer group id).
/// </summary>
public string Name { get; set; } = string.Empty;
// Store/log abstractions are intentionally loose until full WAL/store parity is wired.
/// <summary>
/// Gets or sets the storage backend that persists RAFT snapshots and stable state.
/// </summary>
public object? Store { get; set; }
/// <summary>
/// Gets or sets the write-ahead log implementation used for replicated entries.
/// </summary>
public object? Log { get; set; }
/// <summary>
/// Gets or sets a value indicating whether this node tracks replication progress metrics.
/// </summary>
public bool Track { get; set; }
/// <summary>
/// Gets or sets a value indicating whether this node is a non-voting observer.
/// </summary>
public bool Observer { get; set; }
/// <summary>
/// Gets or sets a value indicating whether the group is replaying log state during recovery.
/// </summary>
public bool Recovering { get; set; }
/// <summary>
/// Gets or sets a value indicating whether this node is participating in a membership scale-up.
/// </summary>
public bool ScaleUp { get; set; }
}

View File

@@ -6,7 +6,14 @@ namespace NATS.Server.Raft;
/// </summary>
public readonly record struct RaftEntry(RaftEntryType Type, byte[] Data)
{
/// <summary>
/// Converts this entry into transport-wire shape.
/// </summary>
public RaftEntryWire ToWire() => new(Type, Data);
/// <summary>
/// Reconstructs an entry from transport-wire shape.
/// </summary>
/// <param name="wire">Wire entry carrying type and payload bytes.</param>
public static RaftEntry FromWire(RaftEntryWire wire) => new(wire.Type, wire.Data);
}

View File

@@ -5,6 +5,7 @@ public sealed class RaftLog
private readonly List<RaftLogEntry> _entries = [];
private long _baseIndex;
/// <summary>Current in-memory RAFT log entries after base-index compaction.</summary>
public IReadOnlyList<RaftLogEntry> Entries => _entries;
/// <summary>
@@ -12,6 +13,11 @@ public sealed class RaftLog
/// </summary>
public long BaseIndex => _baseIndex;
/// <summary>
/// Appends a new local command entry to the RAFT log.
/// </summary>
/// <param name="term">Current leader term for the appended entry.</param>
/// <param name="command">Serialized state-machine command payload.</param>
public RaftLogEntry Append(int term, string command)
{
var entry = new RaftLogEntry(_baseIndex + _entries.Count + 1, term, command);
@@ -23,6 +29,9 @@ public sealed class RaftLog
/// Appends an entry with an explicit timestamp. Used in tests and by the
/// <see cref="CompactionPolicy.ByAge"/> policy to set controlled creation times.
/// </summary>
/// <param name="term">Current leader term for the appended entry.</param>
/// <param name="command">Serialized state-machine command payload.</param>
/// <param name="timestamp">Explicit UTC timestamp assigned to the new entry.</param>
public RaftLogEntry AppendWithTimestamp(int term, string command, DateTime timestamp)
{
var entry = new RaftLogEntry(_baseIndex + _entries.Count + 1, term, command)
@@ -33,6 +42,10 @@ public sealed class RaftLog
return entry;
}
/// <summary>
/// Appends an entry received through replication if it is not already present.
/// </summary>
/// <param name="entry">Replicated RAFT log entry from a leader.</param>
public void AppendReplicated(RaftLogEntry entry)
{
if (_entries.Any(e => e.Index == entry.Index))
@@ -41,6 +54,10 @@ public sealed class RaftLog
_entries.Add(entry);
}
/// <summary>
/// Replaces current log entries with snapshot base state after snapshot install.
/// </summary>
/// <param name="snapshot">Snapshot metadata defining the new base index.</param>
public void ReplaceWithSnapshot(RaftSnapshot snapshot)
{
_entries.Clear();
@@ -52,6 +69,7 @@ public sealed class RaftLog
/// This is log compaction: entries covered by a snapshot are discarded.
/// Go reference: raft.go WAL compact / compactLog.
/// </summary>
/// <param name="upToIndex">Highest included index to compact away.</param>
public void Compact(long upToIndex)
{
var removeCount = _entries.Count(e => e.Index <= upToIndex);
@@ -62,6 +80,11 @@ public sealed class RaftLog
}
}
/// <summary>
/// Persists current RAFT log state to disk.
/// </summary>
/// <param name="path">Destination path for the persisted log file.</param>
/// <param name="ct">Cancellation token for asynchronous I/O.</param>
public async Task PersistAsync(string path, CancellationToken ct)
{
Directory.CreateDirectory(Path.GetDirectoryName(path)!);
@@ -73,6 +96,11 @@ public sealed class RaftLog
await File.WriteAllTextAsync(path, System.Text.Json.JsonSerializer.Serialize(model), ct);
}
/// <summary>
/// Loads RAFT log state from disk when a persisted file exists.
/// </summary>
/// <param name="path">Path to the persisted RAFT log file.</param>
/// <param name="ct">Cancellation token for asynchronous I/O.</param>
public static async Task<RaftLog> LoadAsync(string path, CancellationToken ct)
{
var log = new RaftLog();
@@ -88,7 +116,9 @@ public sealed class RaftLog
private sealed class PersistedLog
{
/// <summary>Base index recorded after compaction.</summary>
public long BaseIndex { get; set; }
/// <summary>Persisted RAFT log entries above <see cref="BaseIndex"/>.</summary>
public List<RaftLogEntry> Entries { get; set; } = [];
}
}

View File

@@ -52,6 +52,7 @@ public sealed class RaftPeerState
/// <summary>
/// Refreshes the cached Current flag using the provided freshness window.
/// </summary>
/// <param name="window">Freshness window used to evaluate last-contact recency.</param>
public void RefreshCurrent(TimeSpan window)
=> Current = DateTime.UtcNow - LastContact < window;
@@ -59,6 +60,7 @@ public sealed class RaftPeerState
/// Returns true if this peer has been contacted within the election timeout window.
/// Go reference: raft.go isCurrent check.
/// </summary>
/// <param name="electionTimeout">Election-timeout recency threshold.</param>
public bool IsCurrent(TimeSpan electionTimeout)
{
RefreshCurrent(electionTimeout);
@@ -68,6 +70,7 @@ public sealed class RaftPeerState
/// <summary>
/// Returns true if this peer is both active and has been contacted within the health threshold.
/// </summary>
/// <param name="healthThreshold">Health recency threshold for considering a peer current.</param>
public bool IsHealthy(TimeSpan healthThreshold)
{
RefreshCurrent(healthThreshold);

Some files were not shown because too many files have changed in this diff Show More