Issue #12: implement session manager and registry
This commit is contained in:
@@ -330,6 +330,20 @@ The worker remains authoritative for MXAccess handles. The gateway may keep a
|
|||||||
shadow state for diagnostics, but it must not invent, rewrite, or recycle
|
shadow state for diagnostics, but it must not invent, rewrite, or recycle
|
||||||
MXAccess handles.
|
MXAccess handles.
|
||||||
|
|
||||||
|
`SessionManager` owns the current in-memory session registry. It allocates a
|
||||||
|
session id, creates the worker pipe name and nonce, registers the session before
|
||||||
|
worker startup, and removes the session if startup fails. A successful
|
||||||
|
`OpenSession` attaches the ready `IWorkerClient` and transitions the session to
|
||||||
|
`Ready`.
|
||||||
|
|
||||||
|
Only `Ready` sessions accept command and event operations. `CloseSession` is
|
||||||
|
idempotent for sessions still known to the registry: the first close shuts down
|
||||||
|
the worker, and later closes return the final `Closed` state. Lease handling is
|
||||||
|
exposed as a session hook so a monitor can close expired sessions without
|
||||||
|
embedding lease policy in the worker client. Gateway shutdown walks the
|
||||||
|
registry, closes each known session, and kills a worker if graceful shutdown
|
||||||
|
fails.
|
||||||
|
|
||||||
## Worker Launch
|
## Worker Launch
|
||||||
|
|
||||||
The gateway should launch the worker using explicit configuration:
|
The gateway should launch the worker using explicit configuration:
|
||||||
|
|||||||
@@ -799,6 +799,12 @@ Core operations:
|
|||||||
- track worker state,
|
- track worker state,
|
||||||
- close or kill worker.
|
- close or kill worker.
|
||||||
|
|
||||||
|
The gateway implementation keeps sessions in an in-memory `SessionRegistry`
|
||||||
|
keyed by session id. `SessionManager` owns the state machine, creates
|
||||||
|
per-session pipe names and nonces, starts the worker through the worker-client
|
||||||
|
factory, gates commands to `Ready` sessions, exposes lease-close hooks, and
|
||||||
|
cleans up workers during gateway shutdown.
|
||||||
|
|
||||||
State machine:
|
State machine:
|
||||||
|
|
||||||
```text
|
```text
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ using MxGateway.Server.Diagnostics;
|
|||||||
using MxGateway.Server.Metrics;
|
using MxGateway.Server.Metrics;
|
||||||
using MxGateway.Server.Security.Authentication;
|
using MxGateway.Server.Security.Authentication;
|
||||||
using MxGateway.Server.Security.Authorization;
|
using MxGateway.Server.Security.Authorization;
|
||||||
|
using MxGateway.Server.Sessions;
|
||||||
using MxGateway.Server.Workers;
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
namespace MxGateway.Server;
|
namespace MxGateway.Server;
|
||||||
@@ -31,6 +32,7 @@ public static class GatewayApplication
|
|||||||
builder.Services.AddHealthChecks();
|
builder.Services.AddHealthChecks();
|
||||||
builder.Services.AddSingleton<GatewayMetrics>();
|
builder.Services.AddSingleton<GatewayMetrics>();
|
||||||
builder.Services.AddWorkerProcessLauncher();
|
builder.Services.AddWorkerProcessLauncher();
|
||||||
|
builder.Services.AddGatewaySessions();
|
||||||
|
|
||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,290 @@
|
|||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
public sealed class GatewaySession
|
||||||
|
{
|
||||||
|
private readonly object _syncRoot = new();
|
||||||
|
private readonly SemaphoreSlim _closeLock = new(1, 1);
|
||||||
|
private IWorkerClient? _workerClient;
|
||||||
|
private SessionState _state = SessionState.Creating;
|
||||||
|
private string? _finalFault;
|
||||||
|
private DateTimeOffset _lastClientActivityAt;
|
||||||
|
private DateTimeOffset? _leaseExpiresAt;
|
||||||
|
private bool _closeStarted;
|
||||||
|
|
||||||
|
public GatewaySession(
|
||||||
|
string sessionId,
|
||||||
|
string backendName,
|
||||||
|
string pipeName,
|
||||||
|
string nonce,
|
||||||
|
string? clientIdentity,
|
||||||
|
string? clientSessionName,
|
||||||
|
string? clientCorrelationId,
|
||||||
|
TimeSpan commandTimeout,
|
||||||
|
TimeSpan startupTimeout,
|
||||||
|
TimeSpan shutdownTimeout,
|
||||||
|
DateTimeOffset openedAt)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(sessionId))
|
||||||
|
{
|
||||||
|
throw new ArgumentException("Session id is required.", nameof(sessionId));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (string.IsNullOrWhiteSpace(backendName))
|
||||||
|
{
|
||||||
|
throw new ArgumentException("Backend name is required.", nameof(backendName));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (string.IsNullOrWhiteSpace(pipeName))
|
||||||
|
{
|
||||||
|
throw new ArgumentException("Pipe name is required.", nameof(pipeName));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (string.IsNullOrWhiteSpace(nonce))
|
||||||
|
{
|
||||||
|
throw new ArgumentException("Nonce is required.", nameof(nonce));
|
||||||
|
}
|
||||||
|
|
||||||
|
SessionId = sessionId;
|
||||||
|
BackendName = backendName;
|
||||||
|
PipeName = pipeName;
|
||||||
|
Nonce = nonce;
|
||||||
|
ClientIdentity = clientIdentity;
|
||||||
|
ClientSessionName = clientSessionName;
|
||||||
|
ClientCorrelationId = clientCorrelationId;
|
||||||
|
CommandTimeout = commandTimeout;
|
||||||
|
StartupTimeout = startupTimeout;
|
||||||
|
ShutdownTimeout = shutdownTimeout;
|
||||||
|
OpenedAt = openedAt;
|
||||||
|
_lastClientActivityAt = openedAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public string SessionId { get; }
|
||||||
|
|
||||||
|
public string BackendName { get; }
|
||||||
|
|
||||||
|
public string PipeName { get; }
|
||||||
|
|
||||||
|
public string Nonce { get; }
|
||||||
|
|
||||||
|
public string? ClientIdentity { get; }
|
||||||
|
|
||||||
|
public string? ClientSessionName { get; }
|
||||||
|
|
||||||
|
public string? ClientCorrelationId { get; }
|
||||||
|
|
||||||
|
public TimeSpan CommandTimeout { get; }
|
||||||
|
|
||||||
|
public TimeSpan StartupTimeout { get; }
|
||||||
|
|
||||||
|
public TimeSpan ShutdownTimeout { get; }
|
||||||
|
|
||||||
|
public DateTimeOffset OpenedAt { get; }
|
||||||
|
|
||||||
|
public int? WorkerProcessId => _workerClient?.ProcessId;
|
||||||
|
|
||||||
|
public IWorkerClient? WorkerClient => _workerClient;
|
||||||
|
|
||||||
|
public SessionState State
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
return _state;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public DateTimeOffset LastClientActivityAt
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
return _lastClientActivityAt;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public DateTimeOffset? LeaseExpiresAt
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
return _leaseExpiresAt;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public string? FinalFault
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
return _finalFault;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void AttachWorkerClient(IWorkerClient workerClient)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(workerClient);
|
||||||
|
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_workerClient = workerClient;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void TransitionTo(SessionState nextState)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
if (_state is SessionState.Closed)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_state is SessionState.Faulted && nextState is not SessionState.Closed)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_state = nextState;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void MarkReady()
|
||||||
|
{
|
||||||
|
TransitionTo(SessionState.Ready);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void MarkFaulted(string reason)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
if (_state is SessionState.Closed)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_finalFault = reason;
|
||||||
|
_state = SessionState.Faulted;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void TouchClientActivity(DateTimeOffset activityAt)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_lastClientActivityAt = activityAt;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void ExtendLease(DateTimeOffset leaseExpiresAt)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
_leaseExpiresAt = leaseExpiresAt;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool IsLeaseExpired(DateTimeOffset now)
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
return _leaseExpiresAt is not null && _leaseExpiresAt <= now;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<WorkerCommandReply> InvokeAsync(
|
||||||
|
WorkerCommand command,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
IWorkerClient workerClient = GetReadyWorkerClient();
|
||||||
|
TouchClientActivity(DateTimeOffset.UtcNow);
|
||||||
|
|
||||||
|
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
IWorkerClient workerClient = GetReadyWorkerClient();
|
||||||
|
TouchClientActivity(DateTimeOffset.UtcNow);
|
||||||
|
|
||||||
|
return workerClient.ReadEventsAsync(cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<SessionCloseResult> CloseAsync(
|
||||||
|
string reason,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (_state is SessionState.Closed)
|
||||||
|
{
|
||||||
|
return new SessionCloseResult(SessionId, SessionState.Closed, AlreadyClosed: true);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool alreadyClosing = _closeStarted;
|
||||||
|
_closeStarted = true;
|
||||||
|
_state = SessionState.Closing;
|
||||||
|
|
||||||
|
if (_workerClient is not null)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
_workerClient.Kill(reason);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_state = SessionState.Closed;
|
||||||
|
return new SessionCloseResult(SessionId, SessionState.Closed, alreadyClosing);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_closeLock.Release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void KillWorker(string reason)
|
||||||
|
{
|
||||||
|
_workerClient?.Kill(reason);
|
||||||
|
TransitionTo(SessionState.Closed);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
_closeLock.Dispose();
|
||||||
|
if (_workerClient is not null)
|
||||||
|
{
|
||||||
|
await _workerClient.DisposeAsync().ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private IWorkerClient GetReadyWorkerClient()
|
||||||
|
{
|
||||||
|
lock (_syncRoot)
|
||||||
|
{
|
||||||
|
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
|
||||||
|
{
|
||||||
|
throw new SessionManagerException(
|
||||||
|
SessionManagerErrorCode.SessionNotReady,
|
||||||
|
$"Session {SessionId} is not ready. Current state is {_state}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
return _workerClient;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,34 @@
|
|||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
public interface ISessionManager
|
||||||
|
{
|
||||||
|
Task<GatewaySession> OpenSessionAsync(
|
||||||
|
SessionOpenRequest request,
|
||||||
|
string? clientIdentity,
|
||||||
|
CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
bool TryGetSession(
|
||||||
|
string sessionId,
|
||||||
|
out GatewaySession session);
|
||||||
|
|
||||||
|
Task<WorkerCommandReply> InvokeAsync(
|
||||||
|
string sessionId,
|
||||||
|
WorkerCommand command,
|
||||||
|
CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
||||||
|
string sessionId,
|
||||||
|
CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
Task<SessionCloseResult> CloseSessionAsync(
|
||||||
|
string sessionId,
|
||||||
|
CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
Task<int> CloseExpiredLeasesAsync(
|
||||||
|
DateTimeOffset now,
|
||||||
|
CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
Task ShutdownAsync(CancellationToken cancellationToken);
|
||||||
|
}
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
public interface ISessionRegistry
|
||||||
|
{
|
||||||
|
int Count { get; }
|
||||||
|
|
||||||
|
int ActiveCount { get; }
|
||||||
|
|
||||||
|
bool TryAdd(GatewaySession session);
|
||||||
|
|
||||||
|
bool TryGet(string sessionId, out GatewaySession session);
|
||||||
|
|
||||||
|
bool TryRemove(string sessionId, out GatewaySession session);
|
||||||
|
|
||||||
|
IReadOnlyCollection<GatewaySession> Snapshot();
|
||||||
|
}
|
||||||
@@ -0,0 +1,8 @@
|
|||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
public interface ISessionWorkerClientFactory
|
||||||
|
{
|
||||||
|
Task<MxGateway.Server.Workers.IWorkerClient> CreateAsync(
|
||||||
|
GatewaySession session,
|
||||||
|
CancellationToken cancellationToken);
|
||||||
|
}
|
||||||
@@ -0,0 +1,8 @@
|
|||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
public sealed record SessionCloseResult(
|
||||||
|
string SessionId,
|
||||||
|
SessionState FinalState,
|
||||||
|
bool AlreadyClosed);
|
||||||
@@ -0,0 +1,287 @@
|
|||||||
|
using System.Security.Cryptography;
|
||||||
|
using Google.Protobuf.WellKnownTypes;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using MxGateway.Contracts;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Configuration;
|
||||||
|
using MxGateway.Server.Metrics;
|
||||||
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
public sealed class SessionManager : ISessionManager
|
||||||
|
{
|
||||||
|
public const string DefaultCloseReason = "client-close";
|
||||||
|
public const string GatewayShutdownReason = "gateway-shutdown";
|
||||||
|
public const string LeaseExpiredReason = "lease-expired";
|
||||||
|
|
||||||
|
private readonly ISessionRegistry _registry;
|
||||||
|
private readonly ISessionWorkerClientFactory _workerClientFactory;
|
||||||
|
private readonly GatewayMetrics _metrics;
|
||||||
|
private readonly TimeProvider _timeProvider;
|
||||||
|
private readonly ILogger<SessionManager> _logger;
|
||||||
|
private readonly GatewayOptions _options;
|
||||||
|
|
||||||
|
public SessionManager(
|
||||||
|
ISessionRegistry registry,
|
||||||
|
ISessionWorkerClientFactory workerClientFactory,
|
||||||
|
IOptions<GatewayOptions> options,
|
||||||
|
GatewayMetrics metrics,
|
||||||
|
TimeProvider? timeProvider = null,
|
||||||
|
ILogger<SessionManager>? logger = null)
|
||||||
|
{
|
||||||
|
_registry = registry ?? throw new ArgumentNullException(nameof(registry));
|
||||||
|
_workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory));
|
||||||
|
ArgumentNullException.ThrowIfNull(options);
|
||||||
|
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
|
||||||
|
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||||
|
_logger = logger ?? NullLogger<SessionManager>.Instance;
|
||||||
|
_options = options.Value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<GatewaySession> OpenSessionAsync(
|
||||||
|
SessionOpenRequest request,
|
||||||
|
string? clientIdentity,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(request);
|
||||||
|
EnsureSessionCapacity();
|
||||||
|
|
||||||
|
GatewaySession session = CreateSession(request, clientIdentity);
|
||||||
|
if (!_registry.TryAdd(session))
|
||||||
|
{
|
||||||
|
throw new SessionManagerException(
|
||||||
|
SessionManagerErrorCode.OpenFailed,
|
||||||
|
$"Session id collision while opening session {session.SessionId}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
session.TransitionTo(SessionState.StartingWorker);
|
||||||
|
IWorkerClient workerClient = await _workerClientFactory
|
||||||
|
.CreateAsync(session, cancellationToken)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
|
||||||
|
session.AttachWorkerClient(workerClient);
|
||||||
|
session.MarkReady();
|
||||||
|
_metrics.SessionOpened();
|
||||||
|
|
||||||
|
return session;
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
session.MarkFaulted(exception.Message);
|
||||||
|
_registry.TryRemove(session.SessionId, out _);
|
||||||
|
await session.DisposeAsync().ConfigureAwait(false);
|
||||||
|
_metrics.Fault(SessionManagerErrorCode.OpenFailed.ToString());
|
||||||
|
_logger.LogWarning(
|
||||||
|
exception,
|
||||||
|
"Failed to open gateway session {SessionId}.",
|
||||||
|
session.SessionId);
|
||||||
|
|
||||||
|
throw new SessionManagerException(
|
||||||
|
SessionManagerErrorCode.OpenFailed,
|
||||||
|
$"Failed to open session {session.SessionId}.",
|
||||||
|
exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool TryGetSession(
|
||||||
|
string sessionId,
|
||||||
|
out GatewaySession session)
|
||||||
|
{
|
||||||
|
return _registry.TryGet(sessionId, out session);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<WorkerCommandReply> InvokeAsync(
|
||||||
|
string sessionId,
|
||||||
|
WorkerCommand command,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
GatewaySession session = GetRequiredSession(sessionId);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return await session.InvokeAsync(command, cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (SessionManagerException)
|
||||||
|
{
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
if (session.WorkerClient?.State == WorkerClientState.Faulted)
|
||||||
|
{
|
||||||
|
session.MarkFaulted(exception.Message);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
||||||
|
string sessionId,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
GatewaySession session = GetRequiredSession(sessionId);
|
||||||
|
|
||||||
|
return session.ReadEventsAsync(cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<SessionCloseResult> CloseSessionAsync(
|
||||||
|
string sessionId,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
GatewaySession session = GetRequiredSession(sessionId);
|
||||||
|
SessionCloseResult result = await CloseSessionCoreAsync(
|
||||||
|
session,
|
||||||
|
DefaultCloseReason,
|
||||||
|
cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<int> CloseExpiredLeasesAsync(
|
||||||
|
DateTimeOffset now,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
int closedCount = 0;
|
||||||
|
foreach (GatewaySession session in _registry.Snapshot())
|
||||||
|
{
|
||||||
|
if (!session.IsLeaseExpired(now))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
await CloseSessionCoreAsync(session, LeaseExpiredReason, cancellationToken).ConfigureAwait(false);
|
||||||
|
closedCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return closedCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task ShutdownAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
foreach (GatewaySession session in _registry.Snapshot())
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await CloseSessionCoreAsync(session, GatewayShutdownReason, cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(
|
||||||
|
exception,
|
||||||
|
"Graceful shutdown failed for session {SessionId}; killing worker.",
|
||||||
|
session.SessionId);
|
||||||
|
session.KillWorker(GatewayShutdownReason);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<SessionCloseResult> CloseSessionCoreAsync(
|
||||||
|
GatewaySession session,
|
||||||
|
string reason,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
bool wasClosed = session.State == SessionState.Closed;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
SessionCloseResult result = await session.CloseAsync(reason, cancellationToken).ConfigureAwait(false);
|
||||||
|
if (!wasClosed && !result.AlreadyClosed)
|
||||||
|
{
|
||||||
|
_metrics.SessionClosed();
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
session.MarkFaulted(exception.Message);
|
||||||
|
_metrics.Fault(SessionManagerErrorCode.CloseFailed.ToString());
|
||||||
|
throw new SessionManagerException(
|
||||||
|
SessionManagerErrorCode.CloseFailed,
|
||||||
|
$"Failed to close session {session.SessionId}.",
|
||||||
|
exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private GatewaySession GetRequiredSession(string sessionId)
|
||||||
|
{
|
||||||
|
if (!_registry.TryGet(sessionId, out GatewaySession session))
|
||||||
|
{
|
||||||
|
throw new SessionManagerException(
|
||||||
|
SessionManagerErrorCode.SessionNotFound,
|
||||||
|
$"Session {sessionId} was not found.");
|
||||||
|
}
|
||||||
|
|
||||||
|
return session;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void EnsureSessionCapacity()
|
||||||
|
{
|
||||||
|
if (_registry.ActiveCount >= _options.Sessions.MaxSessions)
|
||||||
|
{
|
||||||
|
throw new SessionManagerException(
|
||||||
|
SessionManagerErrorCode.SessionLimitExceeded,
|
||||||
|
$"Gateway session limit {_options.Sessions.MaxSessions} has been reached.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private GatewaySession CreateSession(
|
||||||
|
SessionOpenRequest request,
|
||||||
|
string? clientIdentity)
|
||||||
|
{
|
||||||
|
string sessionId = CreateSessionId();
|
||||||
|
string backendName = string.IsNullOrWhiteSpace(request.RequestedBackend)
|
||||||
|
? GatewayContractInfo.DefaultBackendName
|
||||||
|
: request.RequestedBackend!;
|
||||||
|
TimeSpan commandTimeout = ResolveCommandTimeout(request.CommandTimeout);
|
||||||
|
TimeSpan startupTimeout = TimeSpan.FromSeconds(_options.Worker.StartupTimeoutSeconds);
|
||||||
|
TimeSpan shutdownTimeout = TimeSpan.FromSeconds(_options.Worker.ShutdownTimeoutSeconds);
|
||||||
|
string pipeName = $"mxaccess-gateway-{Environment.ProcessId}-{sessionId}";
|
||||||
|
string nonce = CreateNonce();
|
||||||
|
DateTimeOffset openedAt = _timeProvider.GetUtcNow();
|
||||||
|
|
||||||
|
return new GatewaySession(
|
||||||
|
sessionId,
|
||||||
|
backendName,
|
||||||
|
pipeName,
|
||||||
|
nonce,
|
||||||
|
clientIdentity,
|
||||||
|
request.ClientSessionName,
|
||||||
|
request.ClientCorrelationId,
|
||||||
|
commandTimeout,
|
||||||
|
startupTimeout,
|
||||||
|
shutdownTimeout,
|
||||||
|
openedAt);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TimeSpan ResolveCommandTimeout(Duration? requestedTimeout)
|
||||||
|
{
|
||||||
|
if (requestedTimeout is null)
|
||||||
|
{
|
||||||
|
return TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds);
|
||||||
|
}
|
||||||
|
|
||||||
|
TimeSpan timeout = requestedTimeout.ToTimeSpan();
|
||||||
|
return timeout <= TimeSpan.Zero
|
||||||
|
? TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds)
|
||||||
|
: timeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string CreateSessionId()
|
||||||
|
{
|
||||||
|
return $"session-{Guid.NewGuid():N}";
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string CreateNonce()
|
||||||
|
{
|
||||||
|
Span<byte> bytes = stackalloc byte[32];
|
||||||
|
RandomNumberGenerator.Fill(bytes);
|
||||||
|
|
||||||
|
return Convert.ToBase64String(bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
public enum SessionManagerErrorCode
|
||||||
|
{
|
||||||
|
SessionNotFound,
|
||||||
|
SessionNotReady,
|
||||||
|
SessionLimitExceeded,
|
||||||
|
OpenFailed,
|
||||||
|
CloseFailed,
|
||||||
|
}
|
||||||
@@ -0,0 +1,23 @@
|
|||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
public sealed class SessionManagerException : Exception
|
||||||
|
{
|
||||||
|
public SessionManagerException(
|
||||||
|
SessionManagerErrorCode errorCode,
|
||||||
|
string message)
|
||||||
|
: base(message)
|
||||||
|
{
|
||||||
|
ErrorCode = errorCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SessionManagerException(
|
||||||
|
SessionManagerErrorCode errorCode,
|
||||||
|
string message,
|
||||||
|
Exception innerException)
|
||||||
|
: base(message, innerException)
|
||||||
|
{
|
||||||
|
ErrorCode = errorCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SessionManagerErrorCode ErrorCode { get; }
|
||||||
|
}
|
||||||
@@ -0,0 +1,22 @@
|
|||||||
|
using Google.Protobuf.WellKnownTypes;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
public sealed record SessionOpenRequest(
|
||||||
|
string? RequestedBackend,
|
||||||
|
string? ClientSessionName,
|
||||||
|
string? ClientCorrelationId,
|
||||||
|
Duration? CommandTimeout)
|
||||||
|
{
|
||||||
|
public static SessionOpenRequest FromContract(OpenSessionRequest request)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(request);
|
||||||
|
|
||||||
|
return new SessionOpenRequest(
|
||||||
|
request.RequestedBackend,
|
||||||
|
request.ClientSessionName,
|
||||||
|
request.ClientCorrelationId,
|
||||||
|
request.CommandTimeout);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,39 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
public sealed class SessionRegistry : ISessionRegistry
|
||||||
|
{
|
||||||
|
private readonly ConcurrentDictionary<string, GatewaySession> _sessions = new(StringComparer.Ordinal);
|
||||||
|
|
||||||
|
public int Count => _sessions.Count;
|
||||||
|
|
||||||
|
public int ActiveCount => _sessions.Values.Count(session => session.State is not SessionState.Closed);
|
||||||
|
|
||||||
|
public bool TryAdd(GatewaySession session)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(session);
|
||||||
|
|
||||||
|
return _sessions.TryAdd(session.SessionId, session);
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool TryGet(
|
||||||
|
string sessionId,
|
||||||
|
out GatewaySession session)
|
||||||
|
{
|
||||||
|
return _sessions.TryGetValue(sessionId, out session!);
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool TryRemove(
|
||||||
|
string sessionId,
|
||||||
|
out GatewaySession session)
|
||||||
|
{
|
||||||
|
return _sessions.TryRemove(sessionId, out session!);
|
||||||
|
}
|
||||||
|
|
||||||
|
public IReadOnlyCollection<GatewaySession> Snapshot()
|
||||||
|
{
|
||||||
|
return _sessions.Values.ToArray();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
public static class SessionServiceCollectionExtensions
|
||||||
|
{
|
||||||
|
public static IServiceCollection AddGatewaySessions(this IServiceCollection services)
|
||||||
|
{
|
||||||
|
services.AddSingleton<ISessionRegistry, SessionRegistry>();
|
||||||
|
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
|
||||||
|
services.AddSingleton<ISessionManager, SessionManager>();
|
||||||
|
|
||||||
|
return services;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,144 @@
|
|||||||
|
using System.IO.Pipes;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using MxGateway.Contracts;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Configuration;
|
||||||
|
using MxGateway.Server.Metrics;
|
||||||
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
public sealed class SessionWorkerClientFactory : ISessionWorkerClientFactory
|
||||||
|
{
|
||||||
|
private readonly IWorkerProcessLauncher _workerProcessLauncher;
|
||||||
|
private readonly GatewayMetrics _metrics;
|
||||||
|
private readonly TimeProvider _timeProvider;
|
||||||
|
private readonly ILoggerFactory _loggerFactory;
|
||||||
|
private readonly GatewayOptions _options;
|
||||||
|
|
||||||
|
public SessionWorkerClientFactory(
|
||||||
|
IWorkerProcessLauncher workerProcessLauncher,
|
||||||
|
IOptions<GatewayOptions> options,
|
||||||
|
GatewayMetrics metrics,
|
||||||
|
ILoggerFactory loggerFactory,
|
||||||
|
TimeProvider? timeProvider = null)
|
||||||
|
{
|
||||||
|
_workerProcessLauncher = workerProcessLauncher ?? throw new ArgumentNullException(nameof(workerProcessLauncher));
|
||||||
|
ArgumentNullException.ThrowIfNull(options);
|
||||||
|
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
|
||||||
|
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
|
||||||
|
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||||
|
_options = options.Value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<IWorkerClient> CreateAsync(
|
||||||
|
GatewaySession session,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(session);
|
||||||
|
|
||||||
|
NamedPipeServerStream? pipe = CreatePipe(session.PipeName);
|
||||||
|
WorkerProcessHandle? processHandle = null;
|
||||||
|
IWorkerClient? workerClient = null;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
session.TransitionTo(SessionState.StartingWorker);
|
||||||
|
processHandle = await _workerProcessLauncher
|
||||||
|
.LaunchAsync(
|
||||||
|
new WorkerProcessLaunchRequest(
|
||||||
|
session.SessionId,
|
||||||
|
session.PipeName,
|
||||||
|
GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
session.Nonce,
|
||||||
|
pipe),
|
||||||
|
cancellationToken)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
|
||||||
|
session.TransitionTo(SessionState.WaitingForPipe);
|
||||||
|
await WaitForPipeConnectionAsync(pipe, session.StartupTimeout, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
session.TransitionTo(SessionState.Handshaking);
|
||||||
|
WorkerFrameProtocolOptions frameOptions = new(
|
||||||
|
session.SessionId,
|
||||||
|
GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
_options.Worker.MaxMessageBytes);
|
||||||
|
WorkerClientConnection connection = new(
|
||||||
|
session.SessionId,
|
||||||
|
session.Nonce,
|
||||||
|
pipe,
|
||||||
|
frameOptions,
|
||||||
|
processHandle);
|
||||||
|
WorkerClientOptions clientOptions = new()
|
||||||
|
{
|
||||||
|
HeartbeatGrace = TimeSpan.FromSeconds(_options.Worker.HeartbeatGraceSeconds),
|
||||||
|
HeartbeatCheckInterval = TimeSpan.FromSeconds(_options.Worker.HeartbeatIntervalSeconds),
|
||||||
|
EventChannelCapacity = _options.Events.QueueCapacity,
|
||||||
|
};
|
||||||
|
|
||||||
|
workerClient = new WorkerClient(
|
||||||
|
connection,
|
||||||
|
clientOptions,
|
||||||
|
_metrics,
|
||||||
|
_timeProvider,
|
||||||
|
_loggerFactory.CreateLogger<WorkerClient>());
|
||||||
|
|
||||||
|
pipe = null;
|
||||||
|
processHandle = null;
|
||||||
|
|
||||||
|
session.TransitionTo(SessionState.InitializingWorker);
|
||||||
|
await workerClient.StartAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
return workerClient;
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
if (workerClient is not null)
|
||||||
|
{
|
||||||
|
await workerClient.DisposeAsync().ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (processHandle is not null)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (!processHandle.Process.HasExited)
|
||||||
|
{
|
||||||
|
processHandle.Process.Kill(entireProcessTree: true);
|
||||||
|
_metrics.WorkerKilled("OpenSessionFailed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
processHandle.Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pipe?.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static NamedPipeServerStream CreatePipe(string pipeName)
|
||||||
|
{
|
||||||
|
return new NamedPipeServerStream(
|
||||||
|
pipeName,
|
||||||
|
PipeDirection.InOut,
|
||||||
|
maxNumberOfServerInstances: 1,
|
||||||
|
PipeTransmissionMode.Byte,
|
||||||
|
PipeOptions.Asynchronous);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task WaitForPipeConnectionAsync(
|
||||||
|
NamedPipeServerStream pipe,
|
||||||
|
TimeSpan startupTimeout,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
using CancellationTokenSource timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||||
|
timeout.CancelAfter(startupTimeout);
|
||||||
|
await pipe.WaitForConnectionAsync(timeout.Token).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,320 @@
|
|||||||
|
using Google.Protobuf.WellKnownTypes;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Configuration;
|
||||||
|
using MxGateway.Server.Metrics;
|
||||||
|
using MxGateway.Server.Sessions;
|
||||||
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
namespace MxGateway.Tests.Gateway.Sessions;
|
||||||
|
|
||||||
|
public sealed class SessionManagerTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task OpenSessionAsync_WithWorkerReady_RegistersReadySession()
|
||||||
|
{
|
||||||
|
FakeWorkerClient workerClient = new();
|
||||||
|
FakeSessionWorkerClientFactory factory = new(workerClient)
|
||||||
|
{
|
||||||
|
ApplyLifecycleTransitions = true,
|
||||||
|
};
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
SessionManager manager = CreateManager(factory, metrics: metrics);
|
||||||
|
|
||||||
|
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.True(manager.TryGetSession(session.SessionId, out GatewaySession registered));
|
||||||
|
Assert.Same(session, registered);
|
||||||
|
Assert.Equal(SessionState.Ready, session.State);
|
||||||
|
Assert.Equal("client-1", session.ClientIdentity);
|
||||||
|
Assert.Equal(["StartingWorker", "WaitingForPipe", "Handshaking", "InitializingWorker"], factory.ObservedStates);
|
||||||
|
Assert.Equal(1, metrics.GetSnapshot().OpenSessions);
|
||||||
|
Assert.Equal(1, metrics.GetSnapshot().SessionsOpened);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task InvokeAsync_WhenSessionReady_ForwardsCommandToWorker()
|
||||||
|
{
|
||||||
|
FakeWorkerClient workerClient = new();
|
||||||
|
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||||
|
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||||
|
|
||||||
|
WorkerCommandReply reply = await manager.InvokeAsync(
|
||||||
|
session.SessionId,
|
||||||
|
CreateCommand(MxCommandKind.Ping),
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Equal(1, workerClient.InvokeCount);
|
||||||
|
Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task InvokeAsync_WhenSessionFaulted_RejectsCommand()
|
||||||
|
{
|
||||||
|
FakeWorkerClient workerClient = new();
|
||||||
|
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||||
|
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||||
|
session.MarkFaulted("test fault");
|
||||||
|
|
||||||
|
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
|
||||||
|
async () => await manager.InvokeAsync(
|
||||||
|
session.SessionId,
|
||||||
|
CreateCommand(MxCommandKind.Ping),
|
||||||
|
CancellationToken.None));
|
||||||
|
|
||||||
|
Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode);
|
||||||
|
Assert.Equal(0, workerClient.InvokeCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CloseSessionAsync_WhenCalledTwice_IsIdempotent()
|
||||||
|
{
|
||||||
|
FakeWorkerClient workerClient = new();
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient), metrics: metrics);
|
||||||
|
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||||
|
|
||||||
|
SessionCloseResult firstClose = await manager.CloseSessionAsync(session.SessionId, CancellationToken.None);
|
||||||
|
SessionCloseResult secondClose = await manager.CloseSessionAsync(session.SessionId, CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.False(firstClose.AlreadyClosed);
|
||||||
|
Assert.True(secondClose.AlreadyClosed);
|
||||||
|
Assert.Equal(SessionState.Closed, firstClose.FinalState);
|
||||||
|
Assert.Equal(SessionState.Closed, secondClose.FinalState);
|
||||||
|
Assert.Equal(1, workerClient.ShutdownCount);
|
||||||
|
Assert.Equal(1, metrics.GetSnapshot().SessionsClosed);
|
||||||
|
Assert.Equal(0, metrics.GetSnapshot().OpenSessions);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task OpenSessionAsync_WhenWorkerCreationFails_RemovesSessionFromRegistry()
|
||||||
|
{
|
||||||
|
SessionRegistry registry = new();
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
SessionManager manager = CreateManager(
|
||||||
|
new FailingSessionWorkerClientFactory(),
|
||||||
|
registry,
|
||||||
|
metrics);
|
||||||
|
|
||||||
|
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
|
||||||
|
async () => await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None));
|
||||||
|
|
||||||
|
Assert.Equal(SessionManagerErrorCode.OpenFailed, exception.ErrorCode);
|
||||||
|
Assert.Equal(0, registry.Count);
|
||||||
|
Assert.Equal(0, metrics.GetSnapshot().SessionsOpened);
|
||||||
|
Assert.Equal(1, metrics.GetSnapshot().Faults);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CloseExpiredLeasesAsync_ClosesExpiredSessionsOnly()
|
||||||
|
{
|
||||||
|
FakeWorkerClient expiredClient = new();
|
||||||
|
FakeWorkerClient activeClient = new();
|
||||||
|
QueueingSessionWorkerClientFactory factory = new(expiredClient, activeClient);
|
||||||
|
SessionManager manager = CreateManager(factory);
|
||||||
|
GatewaySession expiredSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||||
|
GatewaySession activeSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", CancellationToken.None);
|
||||||
|
DateTimeOffset now = DateTimeOffset.UtcNow;
|
||||||
|
expiredSession.ExtendLease(now.AddSeconds(-1));
|
||||||
|
activeSession.ExtendLease(now.AddMinutes(5));
|
||||||
|
|
||||||
|
int closedCount = await manager.CloseExpiredLeasesAsync(now, CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Equal(1, closedCount);
|
||||||
|
Assert.Equal(SessionState.Closed, expiredSession.State);
|
||||||
|
Assert.Equal(SessionState.Ready, activeSession.State);
|
||||||
|
Assert.Equal(1, expiredClient.ShutdownCount);
|
||||||
|
Assert.Equal(0, activeClient.ShutdownCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ShutdownAsync_ClosesAllRegisteredSessions()
|
||||||
|
{
|
||||||
|
FakeWorkerClient firstClient = new();
|
||||||
|
FakeWorkerClient secondClient = new();
|
||||||
|
QueueingSessionWorkerClientFactory factory = new(firstClient, secondClient);
|
||||||
|
using GatewayMetrics metrics = new();
|
||||||
|
SessionManager manager = CreateManager(factory, metrics: metrics);
|
||||||
|
GatewaySession firstSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||||
|
GatewaySession secondSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", CancellationToken.None);
|
||||||
|
|
||||||
|
await manager.ShutdownAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Equal(SessionState.Closed, firstSession.State);
|
||||||
|
Assert.Equal(SessionState.Closed, secondSession.State);
|
||||||
|
Assert.Equal(1, firstClient.ShutdownCount);
|
||||||
|
Assert.Equal(1, secondClient.ShutdownCount);
|
||||||
|
Assert.Equal(2, metrics.GetSnapshot().SessionsClosed);
|
||||||
|
Assert.Equal(0, metrics.GetSnapshot().OpenSessions);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SessionManager CreateManager(
|
||||||
|
ISessionWorkerClientFactory factory,
|
||||||
|
ISessionRegistry? registry = null,
|
||||||
|
GatewayMetrics? metrics = null,
|
||||||
|
GatewayOptions? options = null)
|
||||||
|
{
|
||||||
|
return new SessionManager(
|
||||||
|
registry ?? new SessionRegistry(),
|
||||||
|
factory,
|
||||||
|
Options.Create(options ?? CreateOptions()),
|
||||||
|
metrics ?? new GatewayMetrics());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static GatewayOptions CreateOptions()
|
||||||
|
{
|
||||||
|
return new GatewayOptions
|
||||||
|
{
|
||||||
|
Sessions = new SessionOptions
|
||||||
|
{
|
||||||
|
DefaultCommandTimeoutSeconds = 30,
|
||||||
|
MaxSessions = 64,
|
||||||
|
},
|
||||||
|
Worker = new WorkerOptions
|
||||||
|
{
|
||||||
|
StartupTimeoutSeconds = 30,
|
||||||
|
ShutdownTimeoutSeconds = 10,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SessionOpenRequest CreateOpenRequest()
|
||||||
|
{
|
||||||
|
return new SessionOpenRequest(
|
||||||
|
RequestedBackend: null,
|
||||||
|
ClientSessionName: "test-session",
|
||||||
|
ClientCorrelationId: "client-correlation-1",
|
||||||
|
CommandTimeout: Duration.FromTimeSpan(TimeSpan.FromSeconds(5)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static WorkerCommand CreateCommand(MxCommandKind kind)
|
||||||
|
{
|
||||||
|
return new WorkerCommand
|
||||||
|
{
|
||||||
|
Command = new MxCommand
|
||||||
|
{
|
||||||
|
Kind = kind,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeSessionWorkerClientFactory(IWorkerClient workerClient) : ISessionWorkerClientFactory
|
||||||
|
{
|
||||||
|
public List<string> ObservedStates { get; } = [];
|
||||||
|
|
||||||
|
public bool ApplyLifecycleTransitions { get; init; }
|
||||||
|
|
||||||
|
public Task<IWorkerClient> CreateAsync(
|
||||||
|
GatewaySession session,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
ObservedStates.Add(session.State.ToString());
|
||||||
|
if (ApplyLifecycleTransitions)
|
||||||
|
{
|
||||||
|
session.TransitionTo(SessionState.WaitingForPipe);
|
||||||
|
ObservedStates.Add(session.State.ToString());
|
||||||
|
session.TransitionTo(SessionState.Handshaking);
|
||||||
|
ObservedStates.Add(session.State.ToString());
|
||||||
|
session.TransitionTo(SessionState.InitializingWorker);
|
||||||
|
ObservedStates.Add(session.State.ToString());
|
||||||
|
}
|
||||||
|
|
||||||
|
return Task.FromResult(workerClient);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class QueueingSessionWorkerClientFactory : ISessionWorkerClientFactory
|
||||||
|
{
|
||||||
|
private readonly Queue<IWorkerClient> _workerClients;
|
||||||
|
|
||||||
|
public QueueingSessionWorkerClientFactory(params IWorkerClient[] workerClients)
|
||||||
|
{
|
||||||
|
_workerClients = new Queue<IWorkerClient>(workerClients);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<IWorkerClient> CreateAsync(
|
||||||
|
GatewaySession session,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return Task.FromResult(_workerClients.Dequeue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FailingSessionWorkerClientFactory : ISessionWorkerClientFactory
|
||||||
|
{
|
||||||
|
public Task<IWorkerClient> CreateAsync(
|
||||||
|
GatewaySession session,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException("worker startup failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeWorkerClient : IWorkerClient
|
||||||
|
{
|
||||||
|
public string SessionId { get; init; } = "session-1";
|
||||||
|
|
||||||
|
public int? ProcessId { get; init; } = 1234;
|
||||||
|
|
||||||
|
public WorkerClientState State { get; set; } = WorkerClientState.Ready;
|
||||||
|
|
||||||
|
public DateTimeOffset LastHeartbeatAt { get; init; } = DateTimeOffset.UtcNow;
|
||||||
|
|
||||||
|
public int InvokeCount { get; private set; }
|
||||||
|
|
||||||
|
public int ShutdownCount { get; private set; }
|
||||||
|
|
||||||
|
public int KillCount { get; private set; }
|
||||||
|
|
||||||
|
public Task StartAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<WorkerCommandReply> InvokeAsync(
|
||||||
|
WorkerCommand command,
|
||||||
|
TimeSpan timeout,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
InvokeCount++;
|
||||||
|
MxCommandKind kind = command.Command?.Kind ?? MxCommandKind.Unspecified;
|
||||||
|
|
||||||
|
return Task.FromResult(new WorkerCommandReply
|
||||||
|
{
|
||||||
|
Reply = new MxCommandReply
|
||||||
|
{
|
||||||
|
SessionId = SessionId,
|
||||||
|
CorrelationId = "correlation-1",
|
||||||
|
Kind = kind,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
||||||
|
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await Task.CompletedTask;
|
||||||
|
yield break;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task ShutdownAsync(
|
||||||
|
TimeSpan timeout,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
ShutdownCount++;
|
||||||
|
State = WorkerClientState.Closed;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Kill(string reason)
|
||||||
|
{
|
||||||
|
KillCount++;
|
||||||
|
State = WorkerClientState.Faulted;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
return ValueTask.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user