From 47b1fd422c4a42fb71d9c3a9a3c0aed3bb06faed Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 1 May 2026 11:10:13 -0400 Subject: [PATCH] A.3 (auto-subscribe): SessionManager issues SubscribeAlarms on session open MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the missing trigger that activates the worker's wnwrap consumer. Without this, every session opened in OK state but the consumer never started, so AcknowledgeAlarm/QueryActiveAlarms returned "alarm consumer not configured" forever. New AlarmsOptions config block (under MxGateway:Alarms): - Enabled (default false): gates the auto-subscribe path so existing deployments without alarm configuration are unaffected. - SubscriptionExpression: explicit AVEVA expression like \\Galaxy!. - DefaultArea: fallback used when SubscriptionExpression is empty; composes \$(MachineName)\Galaxy!$(DefaultArea). - RequireSubscribeOnOpen (default false): when true, an auto-subscribe failure faults the session; when false, the failure is logged and the session stays Ready (data subscriptions keep working, alarms return "not subscribed" until the operator retries). SessionManager.OpenSessionAsync gains a TryAutoSubscribeAlarmsAsync hook that runs after MarkReady. Skips when alarms are disabled; otherwise builds a SubscribeAlarmsCommand, invokes it on the session's worker client, and either logs the resulting status or escalates per RequireSubscribeOnOpen. SessionManagerException is the failure mode for the strict path so callers in MxAccessGatewayService surface it as session-open-failed. Tests: 7 new unit tests cover the disabled lane, expression-driven subscribe, DefaultArea fallback, success path, soft-failure (require off), strict-failure (require on), and missing-config-strict-throw. Server suite total: 295 pass / 0 fail. Solution builds clean. End-to-end alarms-over-gateway path is now live (with config). Open a session against a gateway with Alarms.Enabled=true + a valid SubscriptionExpression; the worker's wnwrap consumer auto-subscribes; QueryActiveAlarms streams snapshots; AcknowledgeAlarm acks by GUID. Reference→GUID resolution (AlarmAckByName worker command) and the live dev-rig smoke test remain follow-ups. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Configuration/AlarmsOptions.cs | 48 ++++ .../Configuration/GatewayOptions.cs | 7 + .../Sessions/SessionManager.cs | 99 +++++++ .../SessionManagerAlarmAutoSubscribeTests.cs | 266 ++++++++++++++++++ 4 files changed, 420 insertions(+) create mode 100644 src/MxGateway.Server/Configuration/AlarmsOptions.cs create mode 100644 src/MxGateway.Tests/Gateway/Sessions/SessionManagerAlarmAutoSubscribeTests.cs diff --git a/src/MxGateway.Server/Configuration/AlarmsOptions.cs b/src/MxGateway.Server/Configuration/AlarmsOptions.cs new file mode 100644 index 0000000..2722fac --- /dev/null +++ b/src/MxGateway.Server/Configuration/AlarmsOptions.cs @@ -0,0 +1,48 @@ +namespace MxGateway.Server.Configuration; + +/// +/// Per-gateway alarm-subsystem configuration. Drives the auto-subscribe +/// hook in : when +/// is true and a session reaches Ready, the +/// manager issues a SubscribeAlarmsCommand to the worker with +/// the configured . +/// +/// +/// Defaults preserve current behaviour (alarms disabled). Operators +/// opt in by setting MxGateway:Alarms:Enabled = true and +/// supplying a canonical +/// \\<machine>\Galaxy!<area> subscription +/// expression. The literal "Galaxy" provider is correct regardless of +/// the configured Galaxy database name (the wnwrap consumer doesn't +/// accept the database name as the provider). +/// +public sealed class AlarmsOptions +{ + /// Gate the auto-subscribe hook on session open. Default false. + public bool Enabled { get; init; } + + /// + /// AVEVA alarm-subscription expression. When empty and + /// is true, the gateway falls back to + /// \\$(MachineName)\Galaxy!$(DefaultArea) if + /// is set; otherwise the session open + /// fails with a configuration diagnostic. + /// + public string SubscriptionExpression { get; init; } = string.Empty; + + /// + /// Optional area name used to compose a default subscription when + /// is empty. Combined with + /// Environment.MachineName as + /// \\<MachineName>\Galaxy!<DefaultArea>. + /// + public string DefaultArea { get; init; } = string.Empty; + + /// + /// If true, an auto-subscribe failure faults the session. If false + /// (default), the failure is logged and the session remains Ready — + /// alarm-side commands return "not subscribed" but data subscriptions + /// work normally. + /// + public bool RequireSubscribeOnOpen { get; init; } +} diff --git a/src/MxGateway.Server/Configuration/GatewayOptions.cs b/src/MxGateway.Server/Configuration/GatewayOptions.cs index d2a7713..312dd4a 100644 --- a/src/MxGateway.Server/Configuration/GatewayOptions.cs +++ b/src/MxGateway.Server/Configuration/GatewayOptions.cs @@ -35,4 +35,11 @@ public sealed class GatewayOptions /// Gets protocol configuration options. /// public ProtocolOptions Protocol { get; init; } = new(); + + /// + /// Gets alarm-subsystem configuration options. Drives the gateway's + /// auto-subscribe-on-session-open hook; default values preserve legacy + /// behaviour (alarms disabled). + /// + public AlarmsOptions Alarms { get; init; } = new(); } diff --git a/src/MxGateway.Server/Sessions/SessionManager.cs b/src/MxGateway.Server/Sessions/SessionManager.cs index cef157b..3ea5a3f 100644 --- a/src/MxGateway.Server/Sessions/SessionManager.cs +++ b/src/MxGateway.Server/Sessions/SessionManager.cs @@ -87,6 +87,8 @@ public sealed class SessionManager : ISessionManager session.MarkReady(); _metrics.SessionOpened(); + await TryAutoSubscribeAlarmsAsync(session, cancellationToken).ConfigureAwait(false); + return session; } catch (Exception exception) @@ -396,4 +398,101 @@ public sealed class SessionManager : ISessionManager return Convert.ToBase64String(bytes); } + + /// + /// If Alarms.Enabled is configured, issue a + /// SubscribeAlarmsCommand on the freshly-Ready session so the + /// worker's wnwrap consumer starts polling. Failure handling is + /// governed by Alarms.RequireSubscribeOnOpen: + /// + /// true — propagate the failure to fault the session. + /// false (default) — log a warning and let the session continue serving data subscriptions. + /// + /// + private async Task TryAutoSubscribeAlarmsAsync( + GatewaySession session, + CancellationToken cancellationToken) + { + AlarmsOptions alarms = _options.Alarms; + if (!alarms.Enabled) return; + + string subscription = ResolveAlarmSubscription(alarms); + if (string.IsNullOrWhiteSpace(subscription)) + { + const string diagnostic = + "Alarms.Enabled is true but no SubscriptionExpression / DefaultArea is configured."; + if (alarms.RequireSubscribeOnOpen) + { + throw new SessionManagerException( + SessionManagerErrorCode.OpenFailed, diagnostic); + } + _logger.LogWarning( + "Auto-subscribe skipped for session {SessionId}: {Diagnostic}", + session.SessionId, diagnostic); + return; + } + + WorkerCommand command = new WorkerCommand + { + Command = new MxCommand + { + Kind = MxCommandKind.SubscribeAlarms, + SubscribeAlarms = new SubscribeAlarmsCommand + { + SubscriptionExpression = subscription, + }, + }, + EnqueueTimestamp = Timestamp.FromDateTimeOffset(_timeProvider.GetUtcNow()), + }; + + try + { + WorkerCommandReply reply = await session.InvokeAsync(command, cancellationToken) + .ConfigureAwait(false); + ProtocolStatusCode? code = reply.Reply?.ProtocolStatus?.Code; + if (code != ProtocolStatusCode.Ok) + { + string diagnostic = reply.Reply?.DiagnosticMessage + ?? reply.Reply?.ProtocolStatus?.Message + ?? "Worker rejected SubscribeAlarms."; + if (alarms.RequireSubscribeOnOpen) + { + throw new SessionManagerException( + SessionManagerErrorCode.OpenFailed, + $"Auto-subscribe failed for session {session.SessionId}: {diagnostic}"); + } + _logger.LogWarning( + "Auto-subscribe failed for session {SessionId} (status {StatusCode}): {Diagnostic}", + session.SessionId, code, diagnostic); + return; + } + _logger.LogInformation( + "Alarm auto-subscribe succeeded for session {SessionId} on {Subscription}.", + session.SessionId, subscription); + } + catch (SessionManagerException) + { + throw; + } + catch (Exception ex) when (!alarms.RequireSubscribeOnOpen) + { + _logger.LogWarning( + ex, + "Auto-subscribe threw for session {SessionId} on {Subscription}; alarm path remains inactive.", + session.SessionId, subscription); + } + } + + private static string ResolveAlarmSubscription(AlarmsOptions alarms) + { + if (!string.IsNullOrWhiteSpace(alarms.SubscriptionExpression)) + { + return alarms.SubscriptionExpression; + } + if (!string.IsNullOrWhiteSpace(alarms.DefaultArea)) + { + return $@"\\{Environment.MachineName}\Galaxy!{alarms.DefaultArea}"; + } + return string.Empty; + } } diff --git a/src/MxGateway.Tests/Gateway/Sessions/SessionManagerAlarmAutoSubscribeTests.cs b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerAlarmAutoSubscribeTests.cs new file mode 100644 index 0000000..89dfc84 --- /dev/null +++ b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerAlarmAutoSubscribeTests.cs @@ -0,0 +1,266 @@ +using System.Runtime.CompilerServices; +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Options; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Configuration; +using MxGateway.Server.Metrics; +using MxGateway.Server.Sessions; +using MxGateway.Server.Workers; + +namespace MxGateway.Tests.Gateway.Sessions; + +/// +/// Pins the alarm auto-subscribe hook on session open. Runs in +/// its own file because the cases are orthogonal to +/// (alarms-disabled vs. +/// alarms-enabled lanes), and the fake worker client below verifies +/// the issued SubscribeAlarms command shape directly. +/// +public sealed class SessionManagerAlarmAutoSubscribeTests +{ + [Fact] + public async Task OpenSessionAsync_DoesNotAutoSubscribe_WhenAlarmsDisabled() + { + AlarmAutoSubscribeWorkerClient worker = new(); + SessionManager manager = NewManager(worker, alarms: new AlarmsOptions { Enabled = false }); + + await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); + + Assert.Equal(0, worker.SubscribeAlarmsInvokeCount); + } + + [Fact] + public async Task OpenSessionAsync_AutoSubscribes_WhenEnabledWithExpression() + { + AlarmAutoSubscribeWorkerClient worker = new(); + SessionManager manager = NewManager(worker, alarms: new AlarmsOptions + { + Enabled = true, + SubscriptionExpression = @"\\HOST\Galaxy!Area1", + }); + + GatewaySession session = await manager.OpenSessionAsync( + CreateOpenRequest(), "client-1", CancellationToken.None); + + Assert.Equal(SessionState.Ready, session.State); + Assert.Equal(1, worker.SubscribeAlarmsInvokeCount); + Assert.Equal(@"\\HOST\Galaxy!Area1", + worker.LastSubscribeAlarmsCommand!.SubscriptionExpression); + } + + [Fact] + public async Task OpenSessionAsync_FallsBackToDefaultArea_WhenExpressionEmpty() + { + AlarmAutoSubscribeWorkerClient worker = new(); + SessionManager manager = NewManager(worker, alarms: new AlarmsOptions + { + Enabled = true, + DefaultArea = "DEV", + }); + + await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); + + Assert.Equal(1, worker.SubscribeAlarmsInvokeCount); + Assert.Contains(@"\Galaxy!DEV", + worker.LastSubscribeAlarmsCommand!.SubscriptionExpression); + } + + [Fact] + public async Task OpenSessionAsync_Succeeds_WhenAutoSubscribeFailsWithRequireOff() + { + // Worker rejects the SubscribeAlarms command. With RequireSubscribeOnOpen=false + // (the default), the session still opens — alarm-side commands later return + // "not subscribed", but data subscriptions work. + AlarmAutoSubscribeWorkerClient worker = new() + { + SubscribeAlarmsReplyFactory = _ => new MxCommandReply + { + Kind = MxCommandKind.SubscribeAlarms, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.MxaccessFailure, + Message = "wnwrap subscribe failed", + }, + DiagnosticMessage = "alarm provider unavailable", + }, + }; + SessionManager manager = NewManager(worker, alarms: new AlarmsOptions + { + Enabled = true, + SubscriptionExpression = @"\\HOST\Galaxy!Area1", + RequireSubscribeOnOpen = false, + }); + + GatewaySession session = await manager.OpenSessionAsync( + CreateOpenRequest(), "client-1", CancellationToken.None); + + Assert.Equal(SessionState.Ready, session.State); + Assert.Equal(1, worker.SubscribeAlarmsInvokeCount); + } + + [Fact] + public async Task OpenSessionAsync_Throws_WhenAutoSubscribeFailsWithRequireOn() + { + AlarmAutoSubscribeWorkerClient worker = new() + { + SubscribeAlarmsReplyFactory = _ => new MxCommandReply + { + Kind = MxCommandKind.SubscribeAlarms, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.MxaccessFailure, + Message = "wnwrap subscribe failed", + }, + }, + }; + SessionManager manager = NewManager(worker, alarms: new AlarmsOptions + { + Enabled = true, + SubscriptionExpression = @"\\HOST\Galaxy!Area1", + RequireSubscribeOnOpen = true, + }); + + await Assert.ThrowsAsync( + async () => await manager.OpenSessionAsync( + CreateOpenRequest(), "client-1", CancellationToken.None)); + } + + [Fact] + public async Task OpenSessionAsync_Throws_WhenEnabledButNoExpressionAndRequireOn() + { + AlarmAutoSubscribeWorkerClient worker = new(); + SessionManager manager = NewManager(worker, alarms: new AlarmsOptions + { + Enabled = true, + // No SubscriptionExpression and no DefaultArea. + RequireSubscribeOnOpen = true, + }); + + await Assert.ThrowsAsync( + async () => await manager.OpenSessionAsync( + CreateOpenRequest(), "client-1", CancellationToken.None)); + Assert.Equal(0, worker.SubscribeAlarmsInvokeCount); + } + + [Fact] + public async Task OpenSessionAsync_Succeeds_WhenEnabledButNoExpressionAndRequireOff() + { + AlarmAutoSubscribeWorkerClient worker = new(); + SessionManager manager = NewManager(worker, alarms: new AlarmsOptions + { + Enabled = true, + // No SubscriptionExpression and no DefaultArea — default require=false. + }); + + GatewaySession session = await manager.OpenSessionAsync( + CreateOpenRequest(), "client-1", CancellationToken.None); + + Assert.Equal(SessionState.Ready, session.State); + Assert.Equal(0, worker.SubscribeAlarmsInvokeCount); + } + + private static SessionManager NewManager( + AlarmAutoSubscribeWorkerClient worker, + AlarmsOptions alarms) + { + FakeSessionWorkerClientFactory factory = new(worker); + GatewayOptions options = new GatewayOptions + { + Sessions = new SessionOptions + { + DefaultCommandTimeoutSeconds = 30, + MaxSessions = 64, + DefaultLeaseSeconds = 1800, + }, + Worker = new WorkerOptions + { + StartupTimeoutSeconds = 30, + ShutdownTimeoutSeconds = 10, + }, + Alarms = alarms, + }; + return new SessionManager( + new SessionRegistry(), + factory, + Options.Create(options), + new GatewayMetrics()); + } + + private static SessionOpenRequest CreateOpenRequest() + { + return new SessionOpenRequest( + RequestedBackend: null, + ClientSessionName: "test-session", + ClientCorrelationId: "client-correlation-1", + CommandTimeout: Duration.FromTimeSpan(TimeSpan.FromSeconds(5))); + } + + private sealed class FakeSessionWorkerClientFactory(IWorkerClient client) : ISessionWorkerClientFactory + { + public Task CreateAsync( + GatewaySession session, + CancellationToken cancellationToken) + { + return Task.FromResult(client); + } + } + + private sealed class AlarmAutoSubscribeWorkerClient : IWorkerClient + { + public string SessionId { get; } = "session-1"; + public int? ProcessId { get; } = 1234; + public WorkerClientState State { get; set; } = WorkerClientState.Ready; + public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow; + + public int SubscribeAlarmsInvokeCount { get; private set; } + public SubscribeAlarmsCommand? LastSubscribeAlarmsCommand { get; private set; } + public Func? SubscribeAlarmsReplyFactory { get; init; } + + public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask; + + public Task InvokeAsync( + WorkerCommand command, TimeSpan timeout, CancellationToken cancellationToken) + { + if (command.Command?.Kind == MxCommandKind.SubscribeAlarms) + { + SubscribeAlarmsInvokeCount++; + LastSubscribeAlarmsCommand = command.Command.SubscribeAlarms; + MxCommandReply reply = SubscribeAlarmsReplyFactory?.Invoke(command) + ?? new MxCommandReply + { + Kind = MxCommandKind.SubscribeAlarms, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.Ok, + Message = "OK", + }, + }; + return Task.FromResult(new WorkerCommandReply { Reply = reply }); + } + return Task.FromResult(new WorkerCommandReply + { + Reply = new MxCommandReply + { + Kind = command.Command?.Kind ?? MxCommandKind.Unspecified, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.Ok, + Message = "OK", + }, + }, + }); + } + + public async IAsyncEnumerable ReadEventsAsync( + [EnumeratorCancellation] CancellationToken cancellationToken) + { + await Task.CompletedTask; + yield break; + } + + public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) + => Task.CompletedTask; + public void Kill(string reason) { } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + } +}