feat: port session 11 — Accounts & Directory JWT Store

- Account: full Account class (200 features) with subject mappings,
  connection counting, export/import checks, expiration timers
- DirJwtStore: directory-based JWT storage with sharding and expiry
- AccountResolver: IAccountResolver, MemoryAccountResolver,
  UrlAccountResolver, DirAccountResolver, CacheDirAccountResolver
- AccountTypes: all supporting types (AccountLimits, SConns, ExportMap,
  ImportMap, ServiceExport, StreamExport, ServiceLatency, etc.)
- 34 unit tests (599 total), 234 features complete (IDs 150-349, 793-826)
This commit is contained in:
Joseph Doherty
2026-02-26 15:37:08 -05:00
parent 06779a1f77
commit 12a14ec476
14 changed files with 5297 additions and 66 deletions

View File

@@ -0,0 +1,525 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/accounts.go in the NATS server Go source.
using System.Collections.Concurrent;
using System.Net;
using System.Net.Http;
namespace ZB.MOM.NatsNet.Server;
// ============================================================================
// IAccountResolver
// Mirrors Go AccountResolver interface (accounts.go ~line 4035).
// ============================================================================
/// <summary>
/// Resolves and stores account JWTs by account public key name.
/// Mirrors Go <c>AccountResolver</c> interface.
/// </summary>
public interface IAccountResolver
{
/// <summary>
/// Fetches the JWT for the named account.
/// Throws <see cref="InvalidOperationException"/> when the account is not found.
/// Mirrors Go <c>AccountResolver.Fetch</c>.
/// </summary>
Task<string> FetchAsync(string name, CancellationToken ct = default);
/// <summary>
/// Stores the JWT for the named account.
/// Read-only implementations throw <see cref="NotSupportedException"/>.
/// Mirrors Go <c>AccountResolver.Store</c>.
/// </summary>
Task StoreAsync(string name, string jwt, CancellationToken ct = default);
/// <summary>Returns true when no writes are permitted. Mirrors Go <c>IsReadOnly</c>.</summary>
bool IsReadOnly();
/// <summary>
/// Starts any background processing needed by the resolver (system subscriptions, timers, etc.).
/// The <paramref name="server"/> parameter accepts an <c>object</c> to avoid a circular assembly
/// reference; implementations should cast it to the concrete server type as needed.
/// Mirrors Go <c>AccountResolver.Start</c>.
/// </summary>
void Start(object server);
/// <summary>Returns true when the resolver reacts to JWT update events. Mirrors Go <c>IsTrackingUpdate</c>.</summary>
bool IsTrackingUpdate();
/// <summary>Reloads state from the backing store. Mirrors Go <c>AccountResolver.Reload</c>.</summary>
void Reload();
/// <summary>Releases resources held by the resolver. Mirrors Go <c>AccountResolver.Close</c>.</summary>
void Close();
}
// ============================================================================
// ResolverDefaultsOps
// Mirrors Go resolverDefaultsOpsImpl (accounts.go ~line 4046).
// ============================================================================
/// <summary>
/// Abstract base that provides sensible no-op / read-only defaults for <see cref="IAccountResolver"/>
/// so concrete implementations only need to override what they change.
/// Mirrors Go <c>resolverDefaultsOpsImpl</c>.
/// </summary>
public abstract class ResolverDefaultsOps : IAccountResolver
{
/// <inheritdoc/>
public abstract Task<string> FetchAsync(string name, CancellationToken ct = default);
/// <summary>
/// Default store implementation — always throws because the base defaults to read-only.
/// Mirrors Go <c>resolverDefaultsOpsImpl.Store</c>.
/// </summary>
public virtual Task StoreAsync(string name, string jwt, CancellationToken ct = default)
=> throw new NotSupportedException("store operation not supported");
/// <summary>Default: the resolver is read-only. Mirrors Go <c>resolverDefaultsOpsImpl.IsReadOnly</c>.</summary>
public virtual bool IsReadOnly() => true;
/// <summary>Default: no-op start. Mirrors Go <c>resolverDefaultsOpsImpl.Start</c>.</summary>
public virtual void Start(object server) { }
/// <summary>Default: does not track updates. Mirrors Go <c>resolverDefaultsOpsImpl.IsTrackingUpdate</c>.</summary>
public virtual bool IsTrackingUpdate() => false;
/// <summary>Default: no-op reload. Mirrors Go <c>resolverDefaultsOpsImpl.Reload</c>.</summary>
public virtual void Reload() { }
/// <summary>Default: no-op close. Mirrors Go <c>resolverDefaultsOpsImpl.Close</c>.</summary>
public virtual void Close() { }
}
// ============================================================================
// MemoryAccountResolver
// Mirrors Go MemAccResolver (accounts.go ~line 4072).
// ============================================================================
/// <summary>
/// An in-memory account resolver backed by a <see cref="ConcurrentDictionary{TKey,TValue}"/>.
/// Primarily intended for testing.
/// Mirrors Go <c>MemAccResolver</c>.
/// </summary>
public sealed class MemoryAccountResolver : ResolverDefaultsOps
{
private readonly ConcurrentDictionary<string, string> _store = new(StringComparer.Ordinal);
/// <summary>In-memory resolver is not read-only.</summary>
public override bool IsReadOnly() => false;
/// <summary>
/// Returns the stored JWT for <paramref name="name"/>, or throws
/// <see cref="InvalidOperationException"/> when the account is unknown.
/// Mirrors Go <c>MemAccResolver.Fetch</c>.
/// </summary>
public override Task<string> FetchAsync(string name, CancellationToken ct = default)
{
if (_store.TryGetValue(name, out var jwt))
{
return Task.FromResult(jwt);
}
throw new InvalidOperationException($"Account not found: {name}");
}
/// <summary>
/// Stores <paramref name="jwt"/> for <paramref name="name"/>.
/// Mirrors Go <c>MemAccResolver.Store</c>.
/// </summary>
public override Task StoreAsync(string name, string jwt, CancellationToken ct = default)
{
_store[name] = jwt;
return Task.CompletedTask;
}
}
// ============================================================================
// UrlAccountResolver
// Mirrors Go URLAccResolver (accounts.go ~line 4097).
// ============================================================================
/// <summary>
/// An HTTP-based account resolver that fetches JWTs by appending the account public key
/// to a configured base URL.
/// Mirrors Go <c>URLAccResolver</c>.
/// </summary>
public sealed class UrlAccountResolver : ResolverDefaultsOps
{
// Mirrors Go DEFAULT_ACCOUNT_FETCH_TIMEOUT.
private static readonly TimeSpan DefaultAccountFetchTimeout = TimeSpan.FromSeconds(2);
private readonly string _url;
private readonly HttpClient _httpClient;
/// <summary>
/// Creates a new URL resolver for the given <paramref name="url"/>.
/// A trailing slash is appended when absent so that account names can be concatenated
/// directly. An <see cref="HttpClient"/> is configured with connection-pooling
/// settings that amortise TLS handshakes across requests, mirroring Go's custom
/// <c>http.Transport</c>.
/// Mirrors Go <c>NewURLAccResolver</c>.
/// </summary>
public UrlAccountResolver(string url)
{
if (!url.EndsWith('/'))
{
url += "/";
}
_url = url;
// Mirror Go: MaxIdleConns=10, IdleConnTimeout=30s on a custom transport.
var handler = new SocketsHttpHandler
{
MaxConnectionsPerServer = 10,
PooledConnectionIdleTimeout = TimeSpan.FromSeconds(30),
};
_httpClient = new HttpClient(handler)
{
Timeout = DefaultAccountFetchTimeout,
};
}
/// <summary>
/// Issues an HTTP GET to the base URL with the account name appended, and returns
/// the response body as the JWT string.
/// Throws <see cref="InvalidOperationException"/> on a non-200 response.
/// Mirrors Go <c>URLAccResolver.Fetch</c>.
/// </summary>
public override async Task<string> FetchAsync(string name, CancellationToken ct = default)
{
var requestUrl = _url + name;
HttpResponseMessage response;
try
{
response = await _httpClient.GetAsync(requestUrl, ct).ConfigureAwait(false);
}
catch (Exception ex)
{
throw new InvalidOperationException($"could not fetch <\"{requestUrl}\">: {ex.Message}", ex);
}
using (response)
{
if (response.StatusCode != HttpStatusCode.OK)
{
throw new InvalidOperationException(
$"could not fetch <\"{requestUrl}\">: {(int)response.StatusCode} {response.ReasonPhrase}");
}
return await response.Content.ReadAsStringAsync(ct).ConfigureAwait(false);
}
}
}
// ============================================================================
// DirResOption — functional option for DirAccountResolver
// Mirrors Go DirResOption func type (accounts.go ~line 4552).
// ============================================================================
/// <summary>
/// A functional option that configures a <see cref="DirAccountResolver"/> instance.
/// Mirrors Go <c>DirResOption</c> function type.
/// </summary>
public delegate void DirResOption(DirAccountResolver resolver);
/// <summary>
/// Factory methods for commonly used <see cref="DirResOption"/> values.
/// </summary>
public static class DirResOptions
{
/// <summary>
/// Returns an option that overrides the default fetch timeout.
/// <paramref name="timeout"/> must be positive.
/// Mirrors Go <c>FetchTimeout</c> option constructor.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown at application time when <paramref name="timeout"/> is not positive.
/// </exception>
public static DirResOption FetchTimeout(TimeSpan timeout)
{
if (timeout <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(nameof(timeout),
$"Fetch timeout {timeout} is too small");
}
return resolver => resolver.FetchTimeout = timeout;
}
}
// ============================================================================
// DirAccountResolver (stub)
// Mirrors Go DirAccResolver (accounts.go ~line 4143).
// Full system-subscription wiring is deferred to session 12.
// ============================================================================
/// <summary>
/// A directory-backed account resolver that stores JWTs in a <see cref="DirJwtStore"/>
/// and synchronises with peers via NATS system subjects.
/// <para>
/// The Start override that wires up system subscriptions and the periodic sync goroutine
/// is a stub in this session; full implementation requires JetStream and system
/// subscription support (session 12+).
/// </para>
/// Mirrors Go <c>DirAccResolver</c>.
/// </summary>
public class DirAccountResolver : ResolverDefaultsOps, IDisposable
{
// Default fetch timeout — mirrors Go DEFAULT_ACCOUNT_FETCH_TIMEOUT (2 s).
private static readonly TimeSpan DefaultFetchTimeout = TimeSpan.FromSeconds(2);
// Default sync interval — mirrors Go's fallback of 1 minute.
private static readonly TimeSpan DefaultSyncInterval = TimeSpan.FromMinutes(1);
/// <summary>The underlying directory JWT store. Mirrors Go <c>DirAccResolver.DirJWTStore</c>.</summary>
public DirJwtStore Store { get; }
/// <summary>Reference to the running server, set during <see cref="Start"/>. Mirrors Go <c>DirAccResolver.Server</c>.</summary>
public object? Server { get; protected set; }
/// <summary>How often the resolver sends a sync (pack) request to peers. Mirrors Go <c>DirAccResolver.syncInterval</c>.</summary>
public TimeSpan SyncInterval { get; protected set; }
/// <summary>Maximum time to wait for a remote JWT fetch. Mirrors Go <c>DirAccResolver.fetchTimeout</c>.</summary>
public TimeSpan FetchTimeout { get; set; }
/// <summary>
/// Creates a new directory account resolver.
/// <para>
/// When <paramref name="limit"/> is zero it is promoted to <see cref="long.MaxValue"/> (unlimited).
/// When <paramref name="syncInterval"/> is non-positive it defaults to one minute.
/// </para>
/// Mirrors Go <c>NewDirAccResolver</c>.
/// </summary>
/// <param name="path">Directory path for the JWT store.</param>
/// <param name="limit">Maximum number of JWTs the store may hold (0 = unlimited).</param>
/// <param name="syncInterval">How often to broadcast a sync/pack request to peers.</param>
/// <param name="deleteType">Controls whether deletes are soft- or hard-deleted.</param>
/// <param name="opts">Zero or more functional options to further configure this instance.</param>
public DirAccountResolver(
string path,
long limit,
TimeSpan syncInterval,
JwtDeleteType deleteType,
params DirResOption[] opts)
{
if (limit == 0)
{
limit = long.MaxValue;
}
if (syncInterval <= TimeSpan.Zero)
{
syncInterval = DefaultSyncInterval;
}
Store = DirJwtStore.NewExpiringDirJwtStore(
path,
shard: false,
create: true,
deleteType,
expireCheck: TimeSpan.Zero,
limit,
evictOnLimit: false,
ttl: TimeSpan.Zero,
changeNotification: null);
SyncInterval = syncInterval;
FetchTimeout = DefaultFetchTimeout;
Apply(opts);
}
// Internal constructor used by CacheDirAccountResolver which supplies its own store.
internal DirAccountResolver(
DirJwtStore store,
TimeSpan syncInterval,
TimeSpan fetchTimeout)
{
Store = store;
SyncInterval = syncInterval;
FetchTimeout = fetchTimeout;
}
/// <summary>
/// Applies a sequence of functional options to this resolver.
/// Mirrors Go <c>DirAccResolver.apply</c>.
/// </summary>
protected void Apply(IEnumerable<DirResOption> opts)
{
foreach (var opt in opts)
{
opt(this);
}
}
// -------------------------------------------------------------------------
// IAccountResolver overrides
// -------------------------------------------------------------------------
/// <summary>
/// DirAccountResolver is not read-only.
/// Mirrors Go: DirAccResolver does not override IsReadOnly, so it inherits false
/// from the concrete behaviour (store is writable).
/// </summary>
public override bool IsReadOnly() => false;
/// <summary>
/// Tracks updates (reacts to JWT change events).
/// Mirrors Go <c>DirAccResolver.IsTrackingUpdate</c>.
/// </summary>
public override bool IsTrackingUpdate() => true;
/// <summary>
/// Reloads state from the backing <see cref="DirJwtStore"/>.
/// Mirrors Go <c>DirAccResolver.Reload</c>.
/// </summary>
public override void Reload() => Store.Reload();
/// <summary>
/// Fetches the JWT for <paramref name="name"/> from the local <see cref="DirJwtStore"/>.
/// Throws <see cref="InvalidOperationException"/> when the account is not found locally.
/// <para>
/// Note: the Go implementation falls back to <c>srv.fetch</c> (a cluster-wide lookup) when
/// the local store misses. That fallback requires system subscriptions and is deferred to
/// session 12. For now this method only consults the local store.
/// </para>
/// Mirrors Go <c>DirAccResolver.Fetch</c> (local path only).
/// </summary>
public override Task<string> FetchAsync(string name, CancellationToken ct = default)
{
var theJwt = Store.LoadAcc(name);
if (!string.IsNullOrEmpty(theJwt))
{
return Task.FromResult(theJwt);
}
throw new InvalidOperationException($"Account not found: {name}");
}
/// <summary>
/// Stores <paramref name="jwt"/> under <paramref name="name"/>, keeping the newer JWT
/// when a conflicting entry already exists.
/// Mirrors Go <c>DirAccResolver.Store</c> (delegates to <c>saveIfNewer</c>).
/// </summary>
public override Task StoreAsync(string name, string jwt, CancellationToken ct = default)
{
// SaveAcc is equivalent to saveIfNewer in the DirJwtStore implementation.
Store.SaveAcc(name, jwt);
return Task.CompletedTask;
}
/// <summary>
/// Starts background system subscriptions and the periodic sync timer.
/// <para>
/// TODO (session 12): wire up system subscriptions for account JWT update/lookup/pack
/// requests, cluster synchronisation, and the periodic pack broadcast goroutine.
/// </para>
/// Mirrors Go <c>DirAccResolver.Start</c>.
/// </summary>
public override void Start(object server)
{
Server = server;
// TODO (session 12): set up system subscriptions and periodic sync timer.
}
/// <summary>
/// Stops background processing and closes the <see cref="DirJwtStore"/>.
/// Mirrors Go <c>AccountResolver.Close</c> (no explicit Go override; store is closed
/// by the server shutdown path).
/// </summary>
public override void Close() => Store.Close();
/// <inheritdoc/>
public void Dispose() => Store.Dispose();
}
// ============================================================================
// CacheDirAccountResolver (stub)
// Mirrors Go CacheDirAccResolver (accounts.go ~line 4594).
// ============================================================================
/// <summary>
/// A caching variant of <see cref="DirAccountResolver"/> that uses a TTL-based expiring
/// store so that fetched JWTs are automatically evicted after <see cref="Ttl"/>.
/// <para>
/// The Start override that wires up system subscriptions is a stub in this session;
/// full implementation requires system subscription support (session 12+).
/// </para>
/// Mirrors Go <c>CacheDirAccResolver</c>.
/// </summary>
public sealed class CacheDirAccountResolver : DirAccountResolver
{
// Default cache limit — mirrors Go's fallback of 1 000 entries.
private const long DefaultCacheLimit = 1_000;
// Default fetch timeout — mirrors Go DEFAULT_ACCOUNT_FETCH_TIMEOUT (2 s).
private static readonly TimeSpan DefaultFetchTimeout = TimeSpan.FromSeconds(2);
/// <summary>The TTL applied to each cached JWT entry. Mirrors Go <c>CacheDirAccResolver.ttl</c>.</summary>
public TimeSpan Ttl { get; }
/// <summary>
/// Creates a new caching directory account resolver.
/// <para>
/// When <paramref name="limit"/> is zero or negative it defaults to 1 000.
/// </para>
/// Mirrors Go <c>NewCacheDirAccResolver</c>.
/// </summary>
/// <param name="path">Directory path for the JWT store.</param>
/// <param name="limit">Maximum number of JWTs to cache (0 = 1 000).</param>
/// <param name="ttl">Time-to-live for each cached JWT.</param>
/// <param name="opts">Zero or more functional options to further configure this instance.</param>
public CacheDirAccountResolver(
string path,
long limit,
TimeSpan ttl,
params DirResOption[] opts)
: base(
store: DirJwtStore.NewExpiringDirJwtStore(
path,
shard: false,
create: true,
JwtDeleteType.HardDelete,
expireCheck: TimeSpan.Zero,
limit: limit <= 0 ? DefaultCacheLimit : limit,
evictOnLimit: true,
ttl: ttl,
changeNotification: null),
syncInterval: TimeSpan.Zero,
fetchTimeout: DefaultFetchTimeout)
{
Ttl = ttl;
Apply(opts);
}
/// <summary>
/// Starts background system subscriptions for cached JWT update notifications.
/// <para>
/// TODO (session 12): wire up system subscriptions for account JWT update events
/// (cache variant — does not include pack/list/delete handling).
/// </para>
/// Mirrors Go <c>CacheDirAccResolver.Start</c>.
/// </summary>
public override void Start(object server)
{
Server = server;
// TODO (session 12): set up system subscriptions for cache-update notifications.
}
}