diff --git a/src/MxGateway.Server/Configuration/AlarmsOptions.cs b/src/MxGateway.Server/Configuration/AlarmsOptions.cs new file mode 100644 index 0000000..2722fac --- /dev/null +++ b/src/MxGateway.Server/Configuration/AlarmsOptions.cs @@ -0,0 +1,48 @@ +namespace MxGateway.Server.Configuration; + +/// +/// Per-gateway alarm-subsystem configuration. Drives the auto-subscribe +/// hook in : when +/// is true and a session reaches Ready, the +/// manager issues a SubscribeAlarmsCommand to the worker with +/// the configured . +/// +/// +/// Defaults preserve current behaviour (alarms disabled). Operators +/// opt in by setting MxGateway:Alarms:Enabled = true and +/// supplying a canonical +/// \\<machine>\Galaxy!<area> subscription +/// expression. The literal "Galaxy" provider is correct regardless of +/// the configured Galaxy database name (the wnwrap consumer doesn't +/// accept the database name as the provider). +/// +public sealed class AlarmsOptions +{ + /// Gate the auto-subscribe hook on session open. Default false. + public bool Enabled { get; init; } + + /// + /// AVEVA alarm-subscription expression. When empty and + /// is true, the gateway falls back to + /// \\$(MachineName)\Galaxy!$(DefaultArea) if + /// is set; otherwise the session open + /// fails with a configuration diagnostic. + /// + public string SubscriptionExpression { get; init; } = string.Empty; + + /// + /// Optional area name used to compose a default subscription when + /// is empty. Combined with + /// Environment.MachineName as + /// \\<MachineName>\Galaxy!<DefaultArea>. + /// + public string DefaultArea { get; init; } = string.Empty; + + /// + /// If true, an auto-subscribe failure faults the session. If false + /// (default), the failure is logged and the session remains Ready — + /// alarm-side commands return "not subscribed" but data subscriptions + /// work normally. + /// + public bool RequireSubscribeOnOpen { get; init; } +} diff --git a/src/MxGateway.Server/Configuration/GatewayOptions.cs b/src/MxGateway.Server/Configuration/GatewayOptions.cs index d2a7713..312dd4a 100644 --- a/src/MxGateway.Server/Configuration/GatewayOptions.cs +++ b/src/MxGateway.Server/Configuration/GatewayOptions.cs @@ -35,4 +35,11 @@ public sealed class GatewayOptions /// Gets protocol configuration options. /// public ProtocolOptions Protocol { get; init; } = new(); + + /// + /// Gets alarm-subsystem configuration options. Drives the gateway's + /// auto-subscribe-on-session-open hook; default values preserve legacy + /// behaviour (alarms disabled). + /// + public AlarmsOptions Alarms { get; init; } = new(); } diff --git a/src/MxGateway.Server/Sessions/SessionManager.cs b/src/MxGateway.Server/Sessions/SessionManager.cs index cef157b..3ea5a3f 100644 --- a/src/MxGateway.Server/Sessions/SessionManager.cs +++ b/src/MxGateway.Server/Sessions/SessionManager.cs @@ -87,6 +87,8 @@ public sealed class SessionManager : ISessionManager session.MarkReady(); _metrics.SessionOpened(); + await TryAutoSubscribeAlarmsAsync(session, cancellationToken).ConfigureAwait(false); + return session; } catch (Exception exception) @@ -396,4 +398,101 @@ public sealed class SessionManager : ISessionManager return Convert.ToBase64String(bytes); } + + /// + /// If Alarms.Enabled is configured, issue a + /// SubscribeAlarmsCommand on the freshly-Ready session so the + /// worker's wnwrap consumer starts polling. Failure handling is + /// governed by Alarms.RequireSubscribeOnOpen: + /// + /// true — propagate the failure to fault the session. + /// false (default) — log a warning and let the session continue serving data subscriptions. + /// + /// + private async Task TryAutoSubscribeAlarmsAsync( + GatewaySession session, + CancellationToken cancellationToken) + { + AlarmsOptions alarms = _options.Alarms; + if (!alarms.Enabled) return; + + string subscription = ResolveAlarmSubscription(alarms); + if (string.IsNullOrWhiteSpace(subscription)) + { + const string diagnostic = + "Alarms.Enabled is true but no SubscriptionExpression / DefaultArea is configured."; + if (alarms.RequireSubscribeOnOpen) + { + throw new SessionManagerException( + SessionManagerErrorCode.OpenFailed, diagnostic); + } + _logger.LogWarning( + "Auto-subscribe skipped for session {SessionId}: {Diagnostic}", + session.SessionId, diagnostic); + return; + } + + WorkerCommand command = new WorkerCommand + { + Command = new MxCommand + { + Kind = MxCommandKind.SubscribeAlarms, + SubscribeAlarms = new SubscribeAlarmsCommand + { + SubscriptionExpression = subscription, + }, + }, + EnqueueTimestamp = Timestamp.FromDateTimeOffset(_timeProvider.GetUtcNow()), + }; + + try + { + WorkerCommandReply reply = await session.InvokeAsync(command, cancellationToken) + .ConfigureAwait(false); + ProtocolStatusCode? code = reply.Reply?.ProtocolStatus?.Code; + if (code != ProtocolStatusCode.Ok) + { + string diagnostic = reply.Reply?.DiagnosticMessage + ?? reply.Reply?.ProtocolStatus?.Message + ?? "Worker rejected SubscribeAlarms."; + if (alarms.RequireSubscribeOnOpen) + { + throw new SessionManagerException( + SessionManagerErrorCode.OpenFailed, + $"Auto-subscribe failed for session {session.SessionId}: {diagnostic}"); + } + _logger.LogWarning( + "Auto-subscribe failed for session {SessionId} (status {StatusCode}): {Diagnostic}", + session.SessionId, code, diagnostic); + return; + } + _logger.LogInformation( + "Alarm auto-subscribe succeeded for session {SessionId} on {Subscription}.", + session.SessionId, subscription); + } + catch (SessionManagerException) + { + throw; + } + catch (Exception ex) when (!alarms.RequireSubscribeOnOpen) + { + _logger.LogWarning( + ex, + "Auto-subscribe threw for session {SessionId} on {Subscription}; alarm path remains inactive.", + session.SessionId, subscription); + } + } + + private static string ResolveAlarmSubscription(AlarmsOptions alarms) + { + if (!string.IsNullOrWhiteSpace(alarms.SubscriptionExpression)) + { + return alarms.SubscriptionExpression; + } + if (!string.IsNullOrWhiteSpace(alarms.DefaultArea)) + { + return $@"\\{Environment.MachineName}\Galaxy!{alarms.DefaultArea}"; + } + return string.Empty; + } } diff --git a/src/MxGateway.Tests/Gateway/Sessions/SessionManagerAlarmAutoSubscribeTests.cs b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerAlarmAutoSubscribeTests.cs new file mode 100644 index 0000000..89dfc84 --- /dev/null +++ b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerAlarmAutoSubscribeTests.cs @@ -0,0 +1,266 @@ +using System.Runtime.CompilerServices; +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Options; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Configuration; +using MxGateway.Server.Metrics; +using MxGateway.Server.Sessions; +using MxGateway.Server.Workers; + +namespace MxGateway.Tests.Gateway.Sessions; + +/// +/// Pins the alarm auto-subscribe hook on session open. Runs in +/// its own file because the cases are orthogonal to +/// (alarms-disabled vs. +/// alarms-enabled lanes), and the fake worker client below verifies +/// the issued SubscribeAlarms command shape directly. +/// +public sealed class SessionManagerAlarmAutoSubscribeTests +{ + [Fact] + public async Task OpenSessionAsync_DoesNotAutoSubscribe_WhenAlarmsDisabled() + { + AlarmAutoSubscribeWorkerClient worker = new(); + SessionManager manager = NewManager(worker, alarms: new AlarmsOptions { Enabled = false }); + + await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); + + Assert.Equal(0, worker.SubscribeAlarmsInvokeCount); + } + + [Fact] + public async Task OpenSessionAsync_AutoSubscribes_WhenEnabledWithExpression() + { + AlarmAutoSubscribeWorkerClient worker = new(); + SessionManager manager = NewManager(worker, alarms: new AlarmsOptions + { + Enabled = true, + SubscriptionExpression = @"\\HOST\Galaxy!Area1", + }); + + GatewaySession session = await manager.OpenSessionAsync( + CreateOpenRequest(), "client-1", CancellationToken.None); + + Assert.Equal(SessionState.Ready, session.State); + Assert.Equal(1, worker.SubscribeAlarmsInvokeCount); + Assert.Equal(@"\\HOST\Galaxy!Area1", + worker.LastSubscribeAlarmsCommand!.SubscriptionExpression); + } + + [Fact] + public async Task OpenSessionAsync_FallsBackToDefaultArea_WhenExpressionEmpty() + { + AlarmAutoSubscribeWorkerClient worker = new(); + SessionManager manager = NewManager(worker, alarms: new AlarmsOptions + { + Enabled = true, + DefaultArea = "DEV", + }); + + await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); + + Assert.Equal(1, worker.SubscribeAlarmsInvokeCount); + Assert.Contains(@"\Galaxy!DEV", + worker.LastSubscribeAlarmsCommand!.SubscriptionExpression); + } + + [Fact] + public async Task OpenSessionAsync_Succeeds_WhenAutoSubscribeFailsWithRequireOff() + { + // Worker rejects the SubscribeAlarms command. With RequireSubscribeOnOpen=false + // (the default), the session still opens — alarm-side commands later return + // "not subscribed", but data subscriptions work. + AlarmAutoSubscribeWorkerClient worker = new() + { + SubscribeAlarmsReplyFactory = _ => new MxCommandReply + { + Kind = MxCommandKind.SubscribeAlarms, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.MxaccessFailure, + Message = "wnwrap subscribe failed", + }, + DiagnosticMessage = "alarm provider unavailable", + }, + }; + SessionManager manager = NewManager(worker, alarms: new AlarmsOptions + { + Enabled = true, + SubscriptionExpression = @"\\HOST\Galaxy!Area1", + RequireSubscribeOnOpen = false, + }); + + GatewaySession session = await manager.OpenSessionAsync( + CreateOpenRequest(), "client-1", CancellationToken.None); + + Assert.Equal(SessionState.Ready, session.State); + Assert.Equal(1, worker.SubscribeAlarmsInvokeCount); + } + + [Fact] + public async Task OpenSessionAsync_Throws_WhenAutoSubscribeFailsWithRequireOn() + { + AlarmAutoSubscribeWorkerClient worker = new() + { + SubscribeAlarmsReplyFactory = _ => new MxCommandReply + { + Kind = MxCommandKind.SubscribeAlarms, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.MxaccessFailure, + Message = "wnwrap subscribe failed", + }, + }, + }; + SessionManager manager = NewManager(worker, alarms: new AlarmsOptions + { + Enabled = true, + SubscriptionExpression = @"\\HOST\Galaxy!Area1", + RequireSubscribeOnOpen = true, + }); + + await Assert.ThrowsAsync( + async () => await manager.OpenSessionAsync( + CreateOpenRequest(), "client-1", CancellationToken.None)); + } + + [Fact] + public async Task OpenSessionAsync_Throws_WhenEnabledButNoExpressionAndRequireOn() + { + AlarmAutoSubscribeWorkerClient worker = new(); + SessionManager manager = NewManager(worker, alarms: new AlarmsOptions + { + Enabled = true, + // No SubscriptionExpression and no DefaultArea. + RequireSubscribeOnOpen = true, + }); + + await Assert.ThrowsAsync( + async () => await manager.OpenSessionAsync( + CreateOpenRequest(), "client-1", CancellationToken.None)); + Assert.Equal(0, worker.SubscribeAlarmsInvokeCount); + } + + [Fact] + public async Task OpenSessionAsync_Succeeds_WhenEnabledButNoExpressionAndRequireOff() + { + AlarmAutoSubscribeWorkerClient worker = new(); + SessionManager manager = NewManager(worker, alarms: new AlarmsOptions + { + Enabled = true, + // No SubscriptionExpression and no DefaultArea — default require=false. + }); + + GatewaySession session = await manager.OpenSessionAsync( + CreateOpenRequest(), "client-1", CancellationToken.None); + + Assert.Equal(SessionState.Ready, session.State); + Assert.Equal(0, worker.SubscribeAlarmsInvokeCount); + } + + private static SessionManager NewManager( + AlarmAutoSubscribeWorkerClient worker, + AlarmsOptions alarms) + { + FakeSessionWorkerClientFactory factory = new(worker); + GatewayOptions options = new GatewayOptions + { + Sessions = new SessionOptions + { + DefaultCommandTimeoutSeconds = 30, + MaxSessions = 64, + DefaultLeaseSeconds = 1800, + }, + Worker = new WorkerOptions + { + StartupTimeoutSeconds = 30, + ShutdownTimeoutSeconds = 10, + }, + Alarms = alarms, + }; + return new SessionManager( + new SessionRegistry(), + factory, + Options.Create(options), + new GatewayMetrics()); + } + + private static SessionOpenRequest CreateOpenRequest() + { + return new SessionOpenRequest( + RequestedBackend: null, + ClientSessionName: "test-session", + ClientCorrelationId: "client-correlation-1", + CommandTimeout: Duration.FromTimeSpan(TimeSpan.FromSeconds(5))); + } + + private sealed class FakeSessionWorkerClientFactory(IWorkerClient client) : ISessionWorkerClientFactory + { + public Task CreateAsync( + GatewaySession session, + CancellationToken cancellationToken) + { + return Task.FromResult(client); + } + } + + private sealed class AlarmAutoSubscribeWorkerClient : IWorkerClient + { + public string SessionId { get; } = "session-1"; + public int? ProcessId { get; } = 1234; + public WorkerClientState State { get; set; } = WorkerClientState.Ready; + public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow; + + public int SubscribeAlarmsInvokeCount { get; private set; } + public SubscribeAlarmsCommand? LastSubscribeAlarmsCommand { get; private set; } + public Func? SubscribeAlarmsReplyFactory { get; init; } + + public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask; + + public Task InvokeAsync( + WorkerCommand command, TimeSpan timeout, CancellationToken cancellationToken) + { + if (command.Command?.Kind == MxCommandKind.SubscribeAlarms) + { + SubscribeAlarmsInvokeCount++; + LastSubscribeAlarmsCommand = command.Command.SubscribeAlarms; + MxCommandReply reply = SubscribeAlarmsReplyFactory?.Invoke(command) + ?? new MxCommandReply + { + Kind = MxCommandKind.SubscribeAlarms, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.Ok, + Message = "OK", + }, + }; + return Task.FromResult(new WorkerCommandReply { Reply = reply }); + } + return Task.FromResult(new WorkerCommandReply + { + Reply = new MxCommandReply + { + Kind = command.Command?.Kind ?? MxCommandKind.Unspecified, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.Ok, + Message = "OK", + }, + }, + }); + } + + public async IAsyncEnumerable ReadEventsAsync( + [EnumeratorCancellation] CancellationToken cancellationToken) + { + await Task.CompletedTask; + yield break; + } + + public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) + => Task.CompletedTask; + public void Kill(string reason) { } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + } +}