using Google.Protobuf.WellKnownTypes; using Microsoft.Extensions.Options; using ZB.MOM.WW.MxGateway.Contracts.Proto; using ZB.MOM.WW.MxGateway.Server.Configuration; using ZB.MOM.WW.MxGateway.Server.Metrics; using ZB.MOM.WW.MxGateway.Server.Sessions; using ZB.MOM.WW.MxGateway.Server.Workers; using ZB.MOM.WW.MxGateway.Tests.TestSupport; namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions; public sealed class SessionManagerTests { /// Verifies that opening a session with a ready worker registers the session in ready state. [Fact] public async Task OpenSessionAsync_WithWorkerReady_RegistersReadySession() { FakeWorkerClient workerClient = new(); FakeSessionWorkerClientFactory factory = new(workerClient) { ApplyLifecycleTransitions = true, }; using GatewayMetrics metrics = new(); SessionManager manager = CreateManager(factory, metrics: metrics); GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); Assert.True(manager.TryGetSession(session.SessionId, out GatewaySession? registered)); Assert.Same(session, registered); Assert.Equal(SessionState.Ready, session.State); Assert.Equal("client-1", session.ClientIdentity); Assert.Equal(["StartingWorker", "WaitingForPipe", "Handshaking", "InitializingWorker"], factory.ObservedStates); Assert.Equal(1, metrics.GetSnapshot().OpenSessions); Assert.Equal(1, metrics.GetSnapshot().SessionsOpened); } /// Verifies that opening a session sets the initial lease expiry from the configured default lease. [Fact] public async Task OpenSessionAsync_SetsInitialDefaultLease() { ManualTimeProvider clock = new(DateTimeOffset.Parse("2026-04-29T10:00:00Z", System.Globalization.CultureInfo.InvariantCulture)); GatewayOptions options = CreateOptions(defaultLeaseSeconds: 1800); SessionManager manager = CreateManager( new FakeSessionWorkerClientFactory(new FakeWorkerClient()), options: options, timeProvider: clock); GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); Assert.Equal(clock.GetUtcNow() + TimeSpan.FromMinutes(30), session.LeaseExpiresAt); } [Fact] public async Task OpenSessionAsync_GeneratesClientCorrelationIdFromClientNameAndSessionId() { SessionOpenRequest request = CreateOpenRequest() with { ClientSessionName = "rust-load-client", ClientCorrelationId = "caller-provided-correlation", }; SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(new FakeWorkerClient())); GatewaySession session = await manager.OpenSessionAsync(request, "client-1", CancellationToken.None); Assert.Equal($"rust-load-client-{session.SessionId}", session.ClientCorrelationId); } /// Verifies that opening a session without a client session name uses the client correlation prefix. [Fact] public async Task OpenSessionAsync_WhenClientSessionNameMissing_UsesClientCorrelationPrefix() { SessionOpenRequest request = CreateOpenRequest() with { ClientSessionName = "", }; SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(new FakeWorkerClient())); GatewaySession session = await manager.OpenSessionAsync(request, "client-1", CancellationToken.None); Assert.Equal($"client-{session.SessionId}", session.ClientCorrelationId); } /// Verifies that invoking a command on a ready session forwards the command to the worker. [Fact] public async Task InvokeAsync_WhenSessionReady_ForwardsCommandToWorker() { FakeWorkerClient workerClient = new(); SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient)); GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); WorkerCommandReply reply = await manager.InvokeAsync( session.SessionId, CreateCommand(MxCommandKind.Ping), CancellationToken.None); Assert.Equal(1, workerClient.InvokeCount); Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind); } /// Verifies that invoking a command on a ready session refreshes its lease expiry. [Fact] public async Task InvokeAsync_WhenSessionReady_RefreshesLease() { GatewaySession session = new( "session-lease-refresh", "mxaccess", "mxaccess-gateway-1-session-lease-refresh", "nonce", "client-1", "test-session", "client-correlation-1", TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), TimeSpan.FromMinutes(30), DateTimeOffset.UtcNow - TimeSpan.FromHours(1)); session.AttachWorkerClient(new FakeWorkerClient()); session.MarkReady(); DateTimeOffset? initialLease = session.LeaseExpiresAt; await session.InvokeAsync(CreateCommand(MxCommandKind.Ping), CancellationToken.None); Assert.True(session.LeaseExpiresAt > initialLease); Assert.True(session.LeaseExpiresAt > DateTimeOffset.UtcNow); } [Fact] public async Task GatewaySessionSubscribeBulkAsync_ForwardsOneBulkCommandAndReturnsResults() { FakeWorkerClient workerClient = new() { InvokeReply = new WorkerCommandReply { Reply = new MxCommandReply { SessionId = "session-1", CorrelationId = "correlation-1", Kind = MxCommandKind.SubscribeBulk, ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, SubscribeBulk = new BulkSubscribeReply { Results = { new SubscribeResult { ServerHandle = 12, TagAddress = "Galaxy.Tag.Value", ItemHandle = 512, WasSuccessful = true, }, }, }, }, }, }; SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient)); GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); IReadOnlyList results = await session.SubscribeBulkAsync( 12, ["Galaxy.Tag.Value"], CancellationToken.None); SubscribeResult result = Assert.Single(results); Assert.Equal(512, result.ItemHandle); Assert.Equal(1, workerClient.InvokeCount); Assert.Equal(MxCommandKind.SubscribeBulk, workerClient.LastCommand?.Command.Kind); Assert.Equal(["Galaxy.Tag.Value"], workerClient.LastCommand?.Command.SubscribeBulk.TagAddresses); } [Fact] public async Task GatewaySessionWriteBulkAsync_ForwardsOneBulkCommandAndReturnsResults() { FakeWorkerClient workerClient = new() { InvokeReply = new WorkerCommandReply { Reply = new MxCommandReply { SessionId = "session-1", CorrelationId = "correlation-1", Kind = MxCommandKind.WriteBulk, ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, WriteBulk = new BulkWriteReply { Results = { new BulkWriteResult { ServerHandle = 12, ItemHandle = 901, WasSuccessful = true, }, new BulkWriteResult { ServerHandle = 12, ItemHandle = 902, WasSuccessful = false, ErrorMessage = "MXAccess invalid handle", }, }, }, }, }, }; SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient)); GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); IReadOnlyList results = await session.WriteBulkAsync( 12, new[] { new WriteBulkEntry { ItemHandle = 901, UserId = 5, Value = new MxValue { DataType = MxDataType.Integer, Int32Value = 11 }, }, new WriteBulkEntry { ItemHandle = 902, UserId = 5, Value = new MxValue { DataType = MxDataType.Integer, Int32Value = 22 }, }, }, CancellationToken.None); Assert.Equal(2, results.Count); Assert.True(results[0].WasSuccessful); Assert.False(results[1].WasSuccessful); Assert.Equal(MxCommandKind.WriteBulk, workerClient.LastCommand?.Command.Kind); Assert.Equal(2, workerClient.LastCommand?.Command.WriteBulk.Entries.Count); } [Fact] public async Task GatewaySessionReadBulkAsync_ForwardsOneBulkCommandAndReturnsResults() { FakeWorkerClient workerClient = new() { InvokeReply = new WorkerCommandReply { Reply = new MxCommandReply { SessionId = "session-1", CorrelationId = "correlation-1", Kind = MxCommandKind.ReadBulk, ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, ReadBulk = new BulkReadReply { Results = { new BulkReadResult { ServerHandle = 12, TagAddress = "Galaxy.Tag.Value", ItemHandle = 512, WasSuccessful = true, WasCached = true, Value = new MxValue { DataType = MxDataType.Integer, Int32Value = 42 }, }, }, }, }, }, }; SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient)); GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); IReadOnlyList results = await session.ReadBulkAsync( 12, ["Galaxy.Tag.Value"], TimeSpan.FromMilliseconds(500), CancellationToken.None); BulkReadResult result = Assert.Single(results); Assert.True(result.WasSuccessful); Assert.True(result.WasCached); Assert.Equal(42, result.Value.Int32Value); Assert.Equal(MxCommandKind.ReadBulk, workerClient.LastCommand?.Command.Kind); Assert.Equal(["Galaxy.Tag.Value"], workerClient.LastCommand?.Command.ReadBulk.TagAddresses); Assert.Equal(500u, workerClient.LastCommand?.Command.ReadBulk.TimeoutMs); } /// Verifies that invoking a command on a faulted session rejects the command. [Fact] public async Task InvokeAsync_WhenSessionFaulted_RejectsCommand() { FakeWorkerClient workerClient = new(); SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient)); GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); session.MarkFaulted("test fault"); SessionManagerException exception = await Assert.ThrowsAsync( async () => await manager.InvokeAsync( session.SessionId, CreateCommand(MxCommandKind.Ping), CancellationToken.None)); Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode); Assert.Equal(0, workerClient.InvokeCount); } /// /// Server-030 regression: when the gateway-side SessionState is /// Ready but the worker client's own state is not, the diagnostic /// must surface both states so the mismatch is actionable instead of /// producing a self-contradictory "Session ... is not ready. Current /// state is Ready." message. /// [Fact] public async Task InvokeAsync_WhenWorkerNotReadyButSessionReady_DiagnosticIncludesBothStates() { FakeWorkerClient workerClient = new(); SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient)); GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); // Force a state mismatch: session stays Ready, worker transitions out. workerClient.State = WorkerClientState.Handshaking; Assert.Equal(SessionState.Ready, session.State); SessionManagerException exception = await Assert.ThrowsAsync( async () => await manager.InvokeAsync( session.SessionId, CreateCommand(MxCommandKind.Ping), CancellationToken.None)); Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode); Assert.Contains("Session state is Ready", exception.Message); Assert.Contains("worker state is Handshaking", exception.Message); Assert.Equal(0, workerClient.InvokeCount); } /// Verifies that closing a session removes it from the registry. [Fact] public async Task CloseSessionAsync_RemovesClosedSession() { FakeWorkerClient workerClient = new(); using GatewayMetrics metrics = new(); SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient), metrics: metrics); GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); SessionCloseResult firstClose = await manager.CloseSessionAsync(session.SessionId, CancellationToken.None); SessionManagerException secondClose = await Assert.ThrowsAsync( async () => await manager.CloseSessionAsync(session.SessionId, CancellationToken.None)); Assert.False(firstClose.AlreadyClosed); Assert.Equal(SessionState.Closed, firstClose.FinalState); Assert.Equal(SessionManagerErrorCode.SessionNotFound, secondClose.ErrorCode); Assert.Equal(1, workerClient.ShutdownCount); Assert.Equal(1, metrics.GetSnapshot().SessionsClosed); Assert.Equal(0, metrics.GetSnapshot().OpenSessions); } /// Verifies that closing a session kills the worker when shutdown fails. [Fact] public async Task CloseSessionAsync_WhenWorkerShutdownFails_KillsWorker() { FakeWorkerClient workerClient = new() { ShutdownException = new WorkerClientException( WorkerClientErrorCode.ShutdownTimeout, "Worker shutdown timed out."), }; SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient)); GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); SessionManagerException exception = await Assert.ThrowsAsync( async () => await manager.CloseSessionAsync(session.SessionId, CancellationToken.None)); Assert.Equal(SessionManagerErrorCode.CloseFailed, exception.ErrorCode); Assert.Equal(1, workerClient.ShutdownCount); Assert.Equal(1, workerClient.KillCount); } /// Verifies that when worker shutdown fails, the session is removed and the slot is released. [Fact] public async Task CloseSessionAsync_WhenWorkerShutdownFails_RemovesSessionAndReleasesSlot() { FakeWorkerClient failingWorkerClient = new() { ShutdownException = new WorkerClientException( WorkerClientErrorCode.ShutdownTimeout, "Worker shutdown timed out."), }; FakeWorkerClient replacementWorkerClient = new(); SessionRegistry registry = new(); using GatewayMetrics metrics = new(); SessionManager manager = CreateManager( new QueueingSessionWorkerClientFactory(failingWorkerClient, replacementWorkerClient), registry, metrics, CreateOptions(maxSessions: 1)); GatewaySession firstSession = await manager.OpenSessionAsync( CreateOpenRequest(), "client-1", CancellationToken.None); metrics.EventReceived(firstSession.SessionId, MxEventFamily.OnDataChange.ToString()); SessionManagerException exception = await Assert.ThrowsAsync( async () => await manager.CloseSessionAsync(firstSession.SessionId, CancellationToken.None)); GatewaySession secondSession = await manager.OpenSessionAsync( CreateOpenRequest(), "client-2", CancellationToken.None); Assert.Equal(SessionManagerErrorCode.CloseFailed, exception.ErrorCode); Assert.False(manager.TryGetSession(firstSession.SessionId, out _)); Assert.True(manager.TryGetSession(secondSession.SessionId, out _)); Assert.Equal(1, registry.Count); Assert.Equal(1, failingWorkerClient.KillCount); Assert.Equal(1, failingWorkerClient.DisposeCount); GatewayMetricsSnapshot snapshot = metrics.GetSnapshot(); // Server-046: a close-that-failed now accounts as SessionClosed (counter += 1) rather // than SessionRemoved (gauge -= 1, counter unchanged). The session is being removed // from the registry on this path, so it must show up in the closed count. Assert.Equal(1, snapshot.SessionsClosed); Assert.False(snapshot.EventsBySession.ContainsKey(firstSession.SessionId)); Assert.Equal(1, snapshot.OpenSessions); } /// Verifies that when the second close is canceled, the session is not removed if owned by the first close. [Fact] public async Task CloseSessionAsync_WhenSecondCloseIsCanceled_DoesNotRemoveSessionOwnedByFirstClose() { FakeWorkerClient workerClient = new() { BlockShutdown = true, }; SessionRegistry registry = new(); using GatewayMetrics metrics = new(); SessionManager manager = CreateManager( new FakeSessionWorkerClientFactory(workerClient), registry, metrics, CreateOptions(maxSessions: 1)); GatewaySession session = await manager.OpenSessionAsync( CreateOpenRequest(), "client-1", CancellationToken.None); Task firstClose = manager.CloseSessionAsync(session.SessionId, CancellationToken.None); await workerClient.WaitForShutdownStartAsync(); using CancellationTokenSource secondCloseCancellation = new(); Task secondClose = manager.CloseSessionAsync( session.SessionId, secondCloseCancellation.Token); await secondCloseCancellation.CancelAsync(); await Assert.ThrowsAnyAsync( async () => await secondClose); Assert.True(manager.TryGetSession(session.SessionId, out _)); Assert.Equal(1, registry.Count); Assert.Equal(0, workerClient.DisposeCount); Assert.Equal(0, metrics.GetSnapshot().SessionsClosed); Assert.Equal(1, metrics.GetSnapshot().OpenSessions); workerClient.ReleaseShutdown(); SessionCloseResult closeResult = await firstClose; Assert.Equal(SessionState.Closed, closeResult.FinalState); Assert.False(manager.TryGetSession(session.SessionId, out _)); Assert.Equal(0, registry.Count); Assert.Equal(1, workerClient.DisposeCount); Assert.Equal(1, metrics.GetSnapshot().SessionsClosed); Assert.Equal(0, metrics.GetSnapshot().OpenSessions); } /// Verifies that killing a worker removes the session from the registry without calling shutdown. [Fact] public async Task KillWorkerAsync_KillsWorkerAndRemovesSession() { FakeWorkerClient workerClient = new(); using GatewayMetrics metrics = new(); SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient), metrics: metrics); GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); SessionCloseResult result = await manager.KillWorkerAsync(session.SessionId, "test-kill", CancellationToken.None); Assert.False(result.AlreadyClosed); Assert.Equal(SessionState.Closed, result.FinalState); Assert.Equal(1, workerClient.KillCount); Assert.Equal(0, workerClient.ShutdownCount); Assert.False(manager.TryGetSession(session.SessionId, out _)); Assert.Equal(1, metrics.GetSnapshot().SessionsClosed); Assert.Equal(0, metrics.GetSnapshot().OpenSessions); } /// Verifies that killing the worker for an unknown session raises SessionNotFound. [Fact] public async Task KillWorkerAsync_WhenSessionMissing_ThrowsSessionNotFound() { SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(new FakeWorkerClient())); SessionManagerException exception = await Assert.ThrowsAsync( async () => await manager.KillWorkerAsync("session-missing", "test-kill", CancellationToken.None)); Assert.Equal(SessionManagerErrorCode.SessionNotFound, exception.ErrorCode); } /// /// Regression for Server-044: when session.KillWorker throws, the catch path must still /// decrement mxgateway.sessions.open (parity with the Server-006 fix in /// OpenSessionAsync). Without the fix the gauge leaks one open session per failed kill. /// [Fact] public async Task KillWorkerAsync_WhenSessionKillThrows_DecrementsOpenSessionGauge() { FakeWorkerClient workerClient = new() { KillException = new InvalidOperationException("worker kill failed"), }; using GatewayMetrics metrics = new(); SessionManager manager = CreateManager( new FakeSessionWorkerClientFactory(workerClient), metrics: metrics); GatewaySession session = await manager.OpenSessionAsync( CreateOpenRequest(), "client-1", CancellationToken.None); Assert.Equal(1, metrics.GetSnapshot().OpenSessions); SessionManagerException exception = await Assert.ThrowsAsync( async () => await manager.KillWorkerAsync(session.SessionId, "test-kill", CancellationToken.None)); Assert.Equal(SessionManagerErrorCode.CloseFailed, exception.ErrorCode); Assert.False(manager.TryGetSession(session.SessionId, out _)); Assert.Equal(0, metrics.GetSnapshot().OpenSessions); Assert.True(metrics.GetSnapshot().Faults > 0); } /// /// Regression for Server-045 / Server-048: concurrent kills on the same session must not /// double-increment mxgateway.sessions.closed. The first kill wins, the second /// observes wasClosed == true (or a missing session after removal) and short-circuits. /// [Fact] public async Task KillWorkerAsync_ConcurrentCallsOnSameSession_CountClosedExactlyOnce() { FakeWorkerClient workerClient = new(); using GatewayMetrics metrics = new(); SessionManager manager = CreateManager( new FakeSessionWorkerClientFactory(workerClient), metrics: metrics); GatewaySession session = await manager.OpenSessionAsync( CreateOpenRequest(), "client-1", CancellationToken.None); Task first = manager.KillWorkerAsync(session.SessionId, "kill-a", CancellationToken.None); Task second = Task.Run(async () => { try { return await manager.KillWorkerAsync(session.SessionId, "kill-b", CancellationToken.None); } catch (SessionManagerException missing) when (missing.ErrorCode == SessionManagerErrorCode.SessionNotFound) { return new SessionCloseResult(session.SessionId, SessionState.Closed, AlreadyClosed: true); } }); await Task.WhenAll(first, second); Assert.Equal(1, metrics.GetSnapshot().SessionsClosed); Assert.Equal(0, metrics.GetSnapshot().OpenSessions); Assert.False(manager.TryGetSession(session.SessionId, out _)); } /// /// Regression for Server-046: ShutdownAsync's graceful-close fallback (which calls /// KillWorker + RemoveSessionAsync when CloseSessionCoreAsync throws) /// must still account a successful close: both the open-session gauge must drop to zero AND /// the mxgateway.sessions.closed counter must increment. Without the fix, the /// graceful-close failure path under-counts the closed counter. /// [Fact] public async Task ShutdownAsync_WhenSessionCloseThrows_StillDecrementsOpenSessionGaugeAndIncrementsClosedCounter() { FakeWorkerClient throwingClient = new() { ShutdownException = new InvalidOperationException("worker shutdown failed"), }; using GatewayMetrics metrics = new(); SessionManager manager = CreateManager( new FakeSessionWorkerClientFactory(throwingClient), metrics: metrics); GatewaySession session = await manager.OpenSessionAsync( CreateOpenRequest(), "client-1", CancellationToken.None); Assert.Equal(1, metrics.GetSnapshot().OpenSessions); await manager.ShutdownAsync(CancellationToken.None); // After shutdown, regardless of whether the graceful close path or the kill fallback ran, // the open-session gauge must be zero and the closed counter must be incremented. Assert.Equal(0, metrics.GetSnapshot().OpenSessions); Assert.Equal(1, metrics.GetSnapshot().SessionsClosed); Assert.False(manager.TryGetSession(session.SessionId, out _)); } /// Verifies that when worker creation fails, the session is removed from the registry. [Fact] public async Task OpenSessionAsync_WhenWorkerCreationFails_RemovesSessionFromRegistry() { SessionRegistry registry = new(); using GatewayMetrics metrics = new(); SessionManager manager = CreateManager( new FailingSessionWorkerClientFactory(), registry, metrics); SessionManagerException exception = await Assert.ThrowsAsync( async () => await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None)); Assert.Equal(SessionManagerErrorCode.OpenFailed, exception.ErrorCode); Assert.Equal(0, registry.Count); Assert.Equal(0, metrics.GetSnapshot().SessionsOpened); Assert.Equal(1, metrics.GetSnapshot().Faults); } /// Verifies that closing expired leases only closes expired sessions. [Fact] public async Task CloseExpiredLeasesAsync_ClosesExpiredSessionsOnly() { FakeWorkerClient expiredClient = new(); FakeWorkerClient activeClient = new(); QueueingSessionWorkerClientFactory factory = new(expiredClient, activeClient); SessionManager manager = CreateManager(factory); GatewaySession expiredSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); GatewaySession activeSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", CancellationToken.None); DateTimeOffset now = DateTimeOffset.UtcNow; expiredSession.ExtendLease(now.AddSeconds(-1)); activeSession.ExtendLease(now.AddMinutes(5)); int closedCount = await manager.CloseExpiredLeasesAsync(now, CancellationToken.None); Assert.Equal(1, closedCount); Assert.Equal(SessionState.Closed, expiredSession.State); Assert.Equal(SessionState.Ready, activeSession.State); Assert.Equal(1, expiredClient.ShutdownCount); Assert.Equal(0, activeClient.ShutdownCount); } /// Verifies that an expired-lease sweep leaves a session with an active event subscriber open. [Fact] public async Task CloseExpiredLeasesAsync_DoesNotCloseActiveEventSubscriber() { FakeWorkerClient workerClient = new(); SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient)); GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); DateTimeOffset now = DateTimeOffset.UtcNow; session.ExtendLease(now.AddSeconds(-1)); using IDisposable eventSubscriber = session.AttachEventSubscriber(allowMultipleSubscribers: false); int closedCount = await manager.CloseExpiredLeasesAsync(now, CancellationToken.None); Assert.Equal(0, closedCount); Assert.Equal(SessionState.Ready, session.State); Assert.Equal(0, workerClient.ShutdownCount); } [Fact] public async Task ShutdownAsync_ClosesAllRegisteredSessions() { FakeWorkerClient firstClient = new(); FakeWorkerClient secondClient = new(); QueueingSessionWorkerClientFactory factory = new(firstClient, secondClient); using GatewayMetrics metrics = new(); SessionManager manager = CreateManager(factory, metrics: metrics); GatewaySession firstSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); GatewaySession secondSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", CancellationToken.None); await manager.ShutdownAsync(CancellationToken.None); Assert.Equal(SessionState.Closed, firstSession.State); Assert.Equal(SessionState.Closed, secondSession.State); Assert.Equal(1, firstClient.ShutdownCount); Assert.Equal(1, secondClient.ShutdownCount); Assert.Equal(2, metrics.GetSnapshot().SessionsClosed); Assert.Equal(0, metrics.GetSnapshot().OpenSessions); } /// Creates a session manager for testing. /// Worker client factory. /// Session registry; defaults to a new registry. /// Metrics collector; defaults to a new instance. /// Gateway options; defaults to test defaults. /// Configured session manager. private static SessionManager CreateManager( ISessionWorkerClientFactory factory, ISessionRegistry? registry = null, GatewayMetrics? metrics = null, GatewayOptions? options = null, TimeProvider? timeProvider = null) { return new SessionManager( registry ?? new SessionRegistry(), factory, Options.Create(options ?? CreateOptions()), metrics ?? new GatewayMetrics(), timeProvider); } private static GatewayOptions CreateOptions( int maxSessions = 64, int defaultLeaseSeconds = 1800) { return new GatewayOptions { Sessions = new SessionOptions { DefaultCommandTimeoutSeconds = 30, MaxSessions = maxSessions, DefaultLeaseSeconds = defaultLeaseSeconds, }, Worker = new WorkerOptions { StartupTimeoutSeconds = 30, ShutdownTimeoutSeconds = 10, }, }; } private static SessionOpenRequest CreateOpenRequest() { return new SessionOpenRequest( RequestedBackend: null, ClientSessionName: "test-session", ClientCorrelationId: "client-correlation-1", CommandTimeout: Duration.FromTimeSpan(TimeSpan.FromSeconds(5))); } private static WorkerCommand CreateCommand(MxCommandKind kind) { return new WorkerCommand { Command = new MxCommand { Kind = kind, }, }; } private sealed class FakeSessionWorkerClientFactory(IWorkerClient workerClient) : ISessionWorkerClientFactory { /// Gets the list of observed session states during worker creation. public List ObservedStates { get; } = []; /// Gets or sets a value indicating whether to apply lifecycle transitions during worker creation. public bool ApplyLifecycleTransitions { get; init; } /// public Task CreateAsync( GatewaySession session, CancellationToken cancellationToken) { ObservedStates.Add(session.State.ToString()); if (ApplyLifecycleTransitions) { session.TransitionTo(SessionState.WaitingForPipe); ObservedStates.Add(session.State.ToString()); session.TransitionTo(SessionState.Handshaking); ObservedStates.Add(session.State.ToString()); session.TransitionTo(SessionState.InitializingWorker); ObservedStates.Add(session.State.ToString()); } return Task.FromResult(workerClient); } } private sealed class QueueingSessionWorkerClientFactory : ISessionWorkerClientFactory { private readonly Queue _workerClients; /// Initializes a new instance of the class. /// Array of worker clients to queue. public QueueingSessionWorkerClientFactory(params IWorkerClient[] workerClients) { _workerClients = new Queue(workerClients); } /// public Task CreateAsync( GatewaySession session, CancellationToken cancellationToken) { return Task.FromResult(_workerClients.Dequeue()); } } private sealed class FailingSessionWorkerClientFactory : ISessionWorkerClientFactory { /// public Task CreateAsync( GatewaySession session, CancellationToken cancellationToken) { throw new InvalidOperationException("worker startup failed"); } } private sealed class FakeWorkerClient : IWorkerClient { /// Gets the session ID for the fake worker client. public string SessionId { get; init; } = "session-1"; /// Gets the process ID for the fake worker client. public int? ProcessId { get; init; } = 1234; /// Gets or sets the state of the fake worker client. public WorkerClientState State { get; set; } = WorkerClientState.Ready; /// Gets the last heartbeat timestamp for the fake worker client. public DateTimeOffset LastHeartbeatAt { get; init; } = DateTimeOffset.UtcNow; /// Gets the number of times invoke was called on the fake worker client. public int InvokeCount { get; private set; } /// Gets the number of times shutdown was called on the fake worker client. public int ShutdownCount { get; private set; } /// Gets the number of times kill was called on the fake worker client. public int KillCount { get; private set; } /// Gets the number of times dispose was called on the fake worker client. public int DisposeCount { get; private set; } /// Gets the exception to throw when shutdown is called, if any. public Exception? ShutdownException { get; init; } /// Gets the exception to throw when kill is called, if any. public Exception? KillException { get; init; } /// Gets a value indicating whether to block shutdown on the fake worker client. public bool BlockShutdown { get; init; } /// Gets the last command invoked on the fake worker client. public WorkerCommand? LastCommand { get; private set; } /// Gets the reply to return for invoke calls on the fake worker client. public WorkerCommandReply? InvokeReply { get; init; } private TaskCompletionSource ShutdownStarted { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); private TaskCompletionSource ShutdownReleased { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); /// public Task StartAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } /// public Task InvokeAsync( WorkerCommand command, TimeSpan timeout, CancellationToken cancellationToken) { InvokeCount++; LastCommand = command; if (InvokeReply is not null) { return Task.FromResult(InvokeReply); } MxCommandKind kind = command.Command?.Kind ?? MxCommandKind.Unspecified; return Task.FromResult(new WorkerCommandReply { Reply = new MxCommandReply { SessionId = SessionId, CorrelationId = "correlation-1", Kind = kind, }, }); } /// public async IAsyncEnumerable ReadEventsAsync( [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) { await Task.CompletedTask; yield break; } /// public async Task ShutdownAsync( TimeSpan timeout, CancellationToken cancellationToken) { ShutdownCount++; if (ShutdownException is not null) { throw ShutdownException; } if (BlockShutdown) { ShutdownStarted.TrySetResult(); await ShutdownReleased.Task.WaitAsync(cancellationToken); } State = WorkerClientState.Closed; } /// public void Kill(string reason) { KillCount++; if (KillException is not null) { throw KillException; } State = WorkerClientState.Faulted; } /// public ValueTask DisposeAsync() { DisposeCount++; return ValueTask.CompletedTask; } /// Waits for shutdown to start on the fake worker client. public Task WaitForShutdownStartAsync() { return ShutdownStarted.Task.WaitAsync(TimeSpan.FromSeconds(5)); } /// Releases the shutdown block on the fake worker client. public void ReleaseShutdown() { ShutdownReleased.TrySetResult(); } } }