Files
mxaccessgw/src/MxGateway.Tests/Gateway/Sessions/SessionManagerAlarmAutoSubscribeTests.cs
T
Joseph Doherty 1d9e3afadd Resolve Server-002, -004, -005, -006 code-review findings
Server-002: the gateway never terminated leftover MxGateway.Worker.exe
processes at startup, contradicting gateway.md and CLAUDE.md. Added
IRunningProcessInspector/SystemRunningProcessInspector, OrphanWorkerTerminator,
and OrphanWorkerCleanupHostedService (best-effort, runs before sessions are
accepted); updated gateway.md to describe the implemented behavior.

Server-004: API-key scopes were persisted verbatim with no validation. Added
GatewayScopes.All/IsKnown; the CLI parser and dashboard create path now
reject unknown scope strings.

Server-005: a non-SqlException/InvalidOperationException fault on the initial
Galaxy hierarchy load faulted the BackgroundService. ExecuteAsync now catches
all non-cancellation exceptions on first load and RefreshCoreAsync broadens
its catch so the cache records Stale/Unavailable instead.

Server-006: OpenSessionAsync incremented the open-sessions gauge before
alarm auto-subscribe; an auto-subscribe failure leaked the gauge. The catch
path now calls SessionRemoved() when the gauge was incremented.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 21:31:10 -04:00

306 lines
11 KiB
C#

using System.Runtime.CompilerServices;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Options;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Sessions;
/// <summary>
/// Pins the alarm auto-subscribe hook on session open. Runs in
/// its own file because the cases are orthogonal to
/// <see cref="SessionManagerTests"/> (alarms-disabled vs.
/// alarms-enabled lanes), and the fake worker client below verifies
/// the issued <c>SubscribeAlarms</c> command shape directly.
/// </summary>
public sealed class SessionManagerAlarmAutoSubscribeTests
{
/// <summary>
/// Opening a session while alarms are disabled must not send any
/// <c>SubscribeAlarms</c> command to the worker.
/// </summary>
[Fact]
public async Task OpenSessionAsync_DoesNotAutoSubscribe_WhenAlarmsDisabled()
{
    // Arrange: alarms switched off entirely.
    var workerClient = new AlarmAutoSubscribeWorkerClient();
    var alarmsOff = new AlarmsOptions { Enabled = false };
    SessionManager sessionManager = NewManager(workerClient, alarms: alarmsOff);

    // Act
    await sessionManager.OpenSessionAsync(
        CreateOpenRequest(),
        "client-1",
        CancellationToken.None);

    // Assert: the fake never saw a SubscribeAlarms command.
    Assert.Equal(0, workerClient.SubscribeAlarmsInvokeCount);
}
/// <summary>
/// With alarms enabled and an explicit subscription expression configured,
/// session open issues exactly one <c>SubscribeAlarms</c> command carrying
/// that expression, and the session ends up <c>Ready</c>.
/// </summary>
[Fact]
public async Task OpenSessionAsync_AutoSubscribes_WhenEnabledWithExpression()
{
    // Arrange
    const string configuredExpression = @"\\HOST\Galaxy!Area1";
    var workerClient = new AlarmAutoSubscribeWorkerClient();
    var alarmOptions = new AlarmsOptions
    {
        Enabled = true,
        SubscriptionExpression = configuredExpression,
    };
    SessionManager sessionManager = NewManager(workerClient, alarms: alarmOptions);

    // Act
    GatewaySession opened = await sessionManager.OpenSessionAsync(
        CreateOpenRequest(),
        "client-1",
        CancellationToken.None);

    // Assert
    Assert.Equal(SessionState.Ready, opened.State);
    Assert.Equal(1, workerClient.SubscribeAlarmsInvokeCount);
    Assert.Equal(
        configuredExpression,
        workerClient.LastSubscribeAlarmsCommand!.SubscriptionExpression);
}
/// <summary>
/// When no subscription expression is configured but a default area is,
/// the auto-issued <c>SubscribeAlarms</c> command must carry an expression
/// that contains <c>\Galaxy!</c> followed by that area.
/// </summary>
[Fact]
public async Task OpenSessionAsync_FallsBackToDefaultArea_WhenExpressionEmpty()
{
    // Arrange: only DefaultArea is set; SubscriptionExpression is left at its default.
    var workerClient = new AlarmAutoSubscribeWorkerClient();
    var alarmOptions = new AlarmsOptions
    {
        Enabled = true,
        DefaultArea = "DEV",
    };
    SessionManager sessionManager = NewManager(workerClient, alarms: alarmOptions);

    // Act
    await sessionManager.OpenSessionAsync(
        CreateOpenRequest(),
        "client-1",
        CancellationToken.None);

    // Assert
    Assert.Equal(1, workerClient.SubscribeAlarmsInvokeCount);
    string issuedExpression = workerClient.LastSubscribeAlarmsCommand!.SubscriptionExpression;
    Assert.Contains(@"\Galaxy!DEV", issuedExpression);
}
/// <summary>
/// A worker-side rejection of the auto-issued <c>SubscribeAlarms</c> command
/// must not fail the session open when <c>RequireSubscribeOnOpen</c> is false.
/// </summary>
[Fact]
public async Task OpenSessionAsync_Succeeds_WhenAutoSubscribeFailsWithRequireOff()
{
    // Arrange: the fake worker answers SubscribeAlarms with an MXAccess failure.
    // RequireSubscribeOnOpen is false, so the session is still expected to open
    // (alarm-side commands are then expected to report "not subscribed" while
    // data subscriptions keep working).
    var rejection = new MxCommandReply
    {
        Kind = MxCommandKind.SubscribeAlarms,
        ProtocolStatus = new ProtocolStatus
        {
            Code = ProtocolStatusCode.MxaccessFailure,
            Message = "wnwrap subscribe failed",
        },
        DiagnosticMessage = "alarm provider unavailable",
    };
    var workerClient = new AlarmAutoSubscribeWorkerClient
    {
        SubscribeAlarmsReplyFactory = _ => rejection,
    };
    var alarmOptions = new AlarmsOptions
    {
        Enabled = true,
        SubscriptionExpression = @"\\HOST\Galaxy!Area1",
        RequireSubscribeOnOpen = false,
    };
    SessionManager sessionManager = NewManager(workerClient, alarms: alarmOptions);

    // Act
    GatewaySession opened = await sessionManager.OpenSessionAsync(
        CreateOpenRequest(),
        "client-1",
        CancellationToken.None);

    // Assert: the subscribe was attempted once and the session is usable.
    Assert.Equal(SessionState.Ready, opened.State);
    Assert.Equal(1, workerClient.SubscribeAlarmsInvokeCount);
}
/// <summary>
/// With <c>RequireSubscribeOnOpen = true</c>, a worker-side rejection of the
/// auto-issued <c>SubscribeAlarms</c> command must fail the session open with
/// a <see cref="SessionManagerException"/>.
/// </summary>
[Fact]
public async Task OpenSessionAsync_Throws_WhenAutoSubscribeFailsWithRequireOn()
{
    // Arrange: the fake worker answers SubscribeAlarms with an MXAccess failure.
    AlarmAutoSubscribeWorkerClient worker = new()
    {
        SubscribeAlarmsReplyFactory = _ => new MxCommandReply
        {
            Kind = MxCommandKind.SubscribeAlarms,
            ProtocolStatus = new ProtocolStatus
            {
                Code = ProtocolStatusCode.MxaccessFailure,
                Message = "wnwrap subscribe failed",
            },
        },
    };
    SessionManager manager = NewManager(worker, alarms: new AlarmsOptions
    {
        Enabled = true,
        SubscriptionExpression = @"\\HOST\Galaxy!Area1",
        RequireSubscribeOnOpen = true,
    });

    // Act + Assert: the open must surface the failure.
    await Assert.ThrowsAsync<SessionManagerException>(
        async () => await manager.OpenSessionAsync(
            CreateOpenRequest(), "client-1", CancellationToken.None));

    // Pin the failure to the auto-subscribe step. Without this check the test
    // would also pass if the open were rejected before the worker ever
    // received a SubscribeAlarms command.
    Assert.Equal(1, worker.SubscribeAlarmsInvokeCount);
}
/// <summary>
/// Server-006 regression: when auto-subscribe throws after
/// <c>SessionOpened()</c> incremented the open-session gauge, the failed
/// open must not leave <c>mxgateway.sessions.open</c> over-counted.
/// </summary>
[Fact]
public async Task OpenSessionAsync_DoesNotLeakOpenSessionGauge_WhenAutoSubscribeFailsWithRequireOn()
{
    // Arrange: the fake worker answers SubscribeAlarms with an MXAccess failure.
    AlarmAutoSubscribeWorkerClient worker = new()
    {
        SubscribeAlarmsReplyFactory = _ => new MxCommandReply
        {
            Kind = MxCommandKind.SubscribeAlarms,
            ProtocolStatus = new ProtocolStatus
            {
                Code = ProtocolStatusCode.MxaccessFailure,
                Message = "wnwrap subscribe failed",
            },
        },
    };

    // A dedicated metrics instance lets the test read the gauge afterwards.
    using GatewayMetrics metrics = new();
    SessionManager manager = NewManager(
        worker,
        alarms: new AlarmsOptions
        {
            Enabled = true,
            SubscriptionExpression = @"\\HOST\Galaxy!Area1",
            RequireSubscribeOnOpen = true,
        },
        metrics: metrics);

    // Act
    await Assert.ThrowsAsync<SessionManagerException>(
        async () => await manager.OpenSessionAsync(
            CreateOpenRequest(), "client-1", CancellationToken.None));

    // Assert: the open got as far as issuing the auto-subscribe command, so the
    // zero gauge below reflects the failure path rather than an open that was
    // rejected before the worker was ever contacted.
    Assert.Equal(1, worker.SubscribeAlarmsInvokeCount);
    Assert.Equal(0, metrics.GetSnapshot().OpenSessions);
}
/// <summary>
/// Alarms enabled with neither a subscription expression nor a default area,
/// and <c>RequireSubscribeOnOpen = true</c>: the open must throw without ever
/// sending <c>SubscribeAlarms</c> to the worker.
/// </summary>
[Fact]
public async Task OpenSessionAsync_Throws_WhenEnabledButNoExpressionAndRequireOn()
{
    // Arrange: SubscriptionExpression and DefaultArea are both left unset.
    var workerClient = new AlarmAutoSubscribeWorkerClient();
    var alarmOptions = new AlarmsOptions
    {
        Enabled = true,
        RequireSubscribeOnOpen = true,
    };
    SessionManager sessionManager = NewManager(workerClient, alarms: alarmOptions);

    async Task OpenAsync() =>
        await sessionManager.OpenSessionAsync(
            CreateOpenRequest(),
            "client-1",
            CancellationToken.None);

    // Act + Assert
    await Assert.ThrowsAsync<SessionManagerException>(OpenAsync);
    Assert.Equal(0, workerClient.SubscribeAlarmsInvokeCount);
}
/// <summary>
/// Alarms enabled with neither a subscription expression nor a default area,
/// and <c>RequireSubscribeOnOpen</c> left unset: the session still opens as
/// <c>Ready</c> and no <c>SubscribeAlarms</c> command is sent.
/// </summary>
[Fact]
public async Task OpenSessionAsync_Succeeds_WhenEnabledButNoExpressionAndRequireOff()
{
    // Arrange: no SubscriptionExpression, no DefaultArea, and
    // RequireSubscribeOnOpen is not set (the test relies on it defaulting to false).
    var workerClient = new AlarmAutoSubscribeWorkerClient();
    var alarmOptions = new AlarmsOptions { Enabled = true };
    SessionManager sessionManager = NewManager(workerClient, alarms: alarmOptions);

    // Act
    GatewaySession opened = await sessionManager.OpenSessionAsync(
        CreateOpenRequest(),
        "client-1",
        CancellationToken.None);

    // Assert
    Assert.Equal(SessionState.Ready, opened.State);
    Assert.Equal(0, workerClient.SubscribeAlarmsInvokeCount);
}
/// <summary>
/// Builds a <see cref="SessionManager"/> wired to the supplied fake worker
/// client, a fresh <see cref="SessionRegistry"/>, and fixed session/worker
/// timeouts, with the alarm settings under test plugged in.
/// </summary>
/// <param name="worker">Fake worker client handed out by the session worker factory.</param>
/// <param name="alarms">Alarm options for the scenario being exercised.</param>
/// <param name="metrics">
/// Metrics instance to observe; when <see langword="null"/> a new
/// <see cref="GatewayMetrics"/> is created for the manager.
/// </param>
/// <returns>The configured session manager.</returns>
private static SessionManager NewManager(
    AlarmAutoSubscribeWorkerClient worker,
    AlarmsOptions alarms,
    GatewayMetrics? metrics = null)
{
    var sessionSettings = new SessionOptions
    {
        DefaultCommandTimeoutSeconds = 30,
        MaxSessions = 64,
        DefaultLeaseSeconds = 1800,
    };
    var workerSettings = new WorkerOptions
    {
        StartupTimeoutSeconds = 30,
        ShutdownTimeoutSeconds = 10,
    };
    var gatewayOptions = new GatewayOptions
    {
        Sessions = sessionSettings,
        Worker = workerSettings,
        Alarms = alarms,
    };

    // NOTE(review): the fallback GatewayMetrics created here is never disposed
    // by this helper; callers that need deterministic cleanup pass their own.
    GatewayMetrics effectiveMetrics = metrics ?? new GatewayMetrics();

    return new SessionManager(
        new SessionRegistry(),
        new FakeSessionWorkerClientFactory(worker),
        Options.Create(gatewayOptions),
        effectiveMetrics);
}
/// <summary>
/// Creates the canned open request shared by every test: no requested backend,
/// fixed session name and correlation id, and a five-second command timeout.
/// </summary>
/// <returns>A new <see cref="SessionOpenRequest"/>.</returns>
private static SessionOpenRequest CreateOpenRequest()
{
    Duration fiveSeconds = Duration.FromTimeSpan(TimeSpan.FromSeconds(5));

    return new SessionOpenRequest(
        RequestedBackend: null,
        ClientSessionName: "test-session",
        ClientCorrelationId: "client-correlation-1",
        CommandTimeout: fiveSeconds);
}
/// <summary>
/// <see cref="ISessionWorkerClientFactory"/> stub that always hands back the
/// single worker client it was constructed with.
/// </summary>
private sealed class FakeSessionWorkerClientFactory(IWorkerClient client) : ISessionWorkerClientFactory
{
    /// <summary>
    /// Returns the pre-supplied client as an already-completed task; the
    /// session and cancellation token are ignored.
    /// </summary>
    public Task<IWorkerClient> CreateAsync(
        GatewaySession session,
        CancellationToken cancellationToken)
        => Task.FromResult(client);
}
/// <summary>
/// Hand-rolled <see cref="IWorkerClient"/> fake. It counts and captures
/// <c>SubscribeAlarms</c> commands, lets a test override the reply to them,
/// and answers every other command with an OK reply of the same kind.
/// </summary>
private sealed class AlarmAutoSubscribeWorkerClient : IWorkerClient
{
    /// <summary>Fixed session id reported by the fake.</summary>
    public string SessionId => "session-1";

    /// <summary>Fixed process id reported by the fake.</summary>
    public int? ProcessId => 1234;

    /// <summary>Worker state; starts as <c>Ready</c> and can be changed by a test.</summary>
    public WorkerClientState State { get; set; } = WorkerClientState.Ready;

    /// <summary>Timestamp captured once, when the fake is constructed.</summary>
    public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;

    /// <summary>Number of <c>SubscribeAlarms</c> commands received so far.</summary>
    public int SubscribeAlarmsInvokeCount { get; private set; }

    /// <summary>Payload of the most recent <c>SubscribeAlarms</c> command, if any.</summary>
    public SubscribeAlarmsCommand? LastSubscribeAlarmsCommand { get; private set; }

    /// <summary>
    /// Optional override that produces the reply to a <c>SubscribeAlarms</c>
    /// command; when absent (or when it yields null) an OK reply is used.
    /// </summary>
    public Func<WorkerCommand, MxCommandReply>? SubscribeAlarmsReplyFactory { get; init; }

    /// <summary>No-op start.</summary>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Answers the command synchronously. <c>SubscribeAlarms</c> commands are
    /// recorded first; the timeout and cancellation token are ignored.
    /// </summary>
    public Task<WorkerCommandReply> InvokeAsync(
        WorkerCommand command, TimeSpan timeout, CancellationToken cancellationToken)
    {
        MxCommandReply reply;

        if (command.Command is { Kind: MxCommandKind.SubscribeAlarms } subscribeRequest)
        {
            // Record the call before building the reply so assertions can see it
            // even when the configured reply represents a failure.
            SubscribeAlarmsInvokeCount++;
            LastSubscribeAlarmsCommand = subscribeRequest.SubscribeAlarms;

            reply = SubscribeAlarmsReplyFactory?.Invoke(command)
                ?? OkReply(MxCommandKind.SubscribeAlarms);
        }
        else
        {
            reply = OkReply(command.Command?.Kind ?? MxCommandKind.Unspecified);
        }

        return Task.FromResult(new WorkerCommandReply { Reply = reply });
    }

    /// <summary>Yields no events; the sequence completes immediately.</summary>
    public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // The await only exists to make this a valid async iterator.
        await Task.CompletedTask;
        yield break;
    }

    /// <summary>No-op shutdown.</summary>
    public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>No-op kill.</summary>
    public void Kill(string reason)
    {
    }

    /// <summary>No-op dispose.</summary>
    public ValueTask DisposeAsync()
    {
        return ValueTask.CompletedTask;
    }

    /// <summary>Builds an OK-status reply for the given command kind.</summary>
    private static MxCommandReply OkReply(MxCommandKind kind) => new()
    {
        Kind = kind,
        ProtocolStatus = new ProtocolStatus
        {
            Code = ProtocolStatusCode.Ok,
            Message = "OK",
        },
    };
}
}