A.3 (auto-subscribe): SessionManager issues SubscribeAlarms on session open

Adds the missing trigger that activates the worker's wnwrap consumer.
Without this, every session opened in OK state but the consumer never
started, so AcknowledgeAlarm/QueryActiveAlarms returned "alarm consumer
not configured" forever.

New AlarmsOptions config block (under MxGateway:Alarms):
  - Enabled (default false): gates the auto-subscribe path so existing
    deployments without alarm configuration are unaffected.
  - SubscriptionExpression: explicit AVEVA expression like
    \\<machine>\Galaxy!<area>.
  - DefaultArea: fallback used when SubscriptionExpression is empty;
    composes \\$(MachineName)\Galaxy!$(DefaultArea).
  - RequireSubscribeOnOpen (default false): when true, an auto-subscribe
    failure faults the session; when false, the failure is logged and
    the session stays Ready (data subscriptions keep working, alarms
    return "not subscribed" until the operator retries).
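
For illustration, the block above might look like the following in an
appsettings.json-style file. This is a sketch that assumes the gateway
binds its options from JSON configuration under the MxGateway section;
the file name, the HOST machine name, and the Area1 area are
placeholders, not values from a real deployment. Backslashes are doubled
because of JSON string escaping, so the effective expression is
\\HOST\Galaxy!Area1.

```json
{
  "MxGateway": {
    "Alarms": {
      "Enabled": true,
      "SubscriptionExpression": "\\\\HOST\\Galaxy!Area1",
      "DefaultArea": "",
      "RequireSubscribeOnOpen": false
    }
  }
}
```

With SubscriptionExpression left empty and DefaultArea set instead, the
gateway would compose the expression from Environment.MachineName as
described above.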

SessionManager.OpenSessionAsync gains a TryAutoSubscribeAlarmsAsync hook
that runs after MarkReady. Skips when alarms are disabled; otherwise
builds a SubscribeAlarmsCommand, invokes it on the session's worker
client, and either logs the resulting status or escalates per
RequireSubscribeOnOpen. SessionManagerException is the failure mode for
the strict path so callers in MxAccessGatewayService surface it as
session-open-failed.

Tests: 7 new unit tests cover the disabled lane, expression-driven
subscribe (the success path), DefaultArea fallback, soft-failure
(require off), strict-failure (require on), missing-config strict
throw, and missing-config soft skip (require off).
Server suite total: 295 pass / 0 fail. Solution builds clean.

End-to-end alarms-over-gateway path is now live (with config). Open a
session against a gateway with Alarms.Enabled=true + a valid
SubscriptionExpression; the worker's wnwrap consumer auto-subscribes;
QueryActiveAlarms streams snapshots; AcknowledgeAlarm acks by GUID.
Reference→GUID resolution (AlarmAckByName worker command) and the live
dev-rig smoke test remain follow-ups.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Joseph Doherty
2026-05-01 11:10:13 -04:00
parent 9b21ca3554
commit 47b1fd422c
4 changed files with 420 additions and 0 deletions
@@ -0,0 +1,48 @@
namespace MxGateway.Server.Configuration;
/// <summary>
/// Per-gateway alarm-subsystem configuration. Drives the auto-subscribe
/// hook in <see cref="Sessions.SessionManager"/>: when
/// <see cref="Enabled"/> is true and a session reaches Ready, the
/// manager issues a <c>SubscribeAlarmsCommand</c> to the worker with
/// the configured <see cref="SubscriptionExpression"/>.
/// </summary>
/// <remarks>
/// Defaults preserve current behaviour (alarms disabled). Operators
/// opt in by setting <c>MxGateway:Alarms:Enabled = true</c> and
/// supplying a canonical
/// <c>\\&lt;machine&gt;\Galaxy!&lt;area&gt;</c> subscription
/// expression. The literal "Galaxy" provider is correct regardless of
/// the configured Galaxy database name (the wnwrap consumer doesn't
/// accept the database name as the provider).
/// </remarks>
public sealed class AlarmsOptions
{
/// <summary>Gate the auto-subscribe hook on session open. Default false.</summary>
public bool Enabled { get; init; }
/// <summary>
/// AVEVA alarm-subscription expression. When empty and
/// <see cref="Enabled"/> is true, the gateway falls back to
/// <c>\\$(MachineName)\Galaxy!$(DefaultArea)</c> if
/// <see cref="DefaultArea"/> is set. If neither is set, the
/// auto-subscribe is skipped with a logged warning, or the session
/// open fails when <see cref="RequireSubscribeOnOpen"/> is true.
/// </summary>
public string SubscriptionExpression { get; init; } = string.Empty;
/// <summary>
/// Optional area name used to compose a default subscription when
/// <see cref="SubscriptionExpression"/> is empty. Combined with
/// <c>Environment.MachineName</c> as
/// <c>\\&lt;MachineName&gt;\Galaxy!&lt;DefaultArea&gt;</c>.
/// </summary>
public string DefaultArea { get; init; } = string.Empty;
/// <summary>
/// If true, an auto-subscribe failure faults the session. If false
/// (default), the failure is logged and the session remains Ready —
/// alarm-side commands return "not subscribed" but data subscriptions
/// work normally.
/// </summary>
public bool RequireSubscribeOnOpen { get; init; }
}
@@ -35,4 +35,11 @@ public sealed class GatewayOptions
/// Gets protocol configuration options.
/// </summary>
public ProtocolOptions Protocol { get; init; } = new();
/// <summary>
/// Gets alarm-subsystem configuration options. Drives the gateway's
/// auto-subscribe-on-session-open hook; default values preserve legacy
/// behaviour (alarms disabled).
/// </summary>
public AlarmsOptions Alarms { get; init; } = new();
}
@@ -87,6 +87,8 @@ public sealed class SessionManager : ISessionManager
session.MarkReady();
_metrics.SessionOpened();
await TryAutoSubscribeAlarmsAsync(session, cancellationToken).ConfigureAwait(false);
return session;
}
catch (Exception exception)
@@ -396,4 +398,101 @@ public sealed class SessionManager : ISessionManager
return Convert.ToBase64String(bytes);
}
/// <summary>
/// If <c>Alarms.Enabled</c> is configured, issue a
/// <c>SubscribeAlarmsCommand</c> on the freshly-Ready session so the
/// worker's wnwrap consumer starts polling. Failure handling is
/// governed by <c>Alarms.RequireSubscribeOnOpen</c>:
/// <list type="bullet">
/// <item><description><c>true</c> — propagate the failure to fault the session.</description></item>
/// <item><description><c>false</c> (default) — log a warning and let the session continue serving data subscriptions.</description></item>
/// </list>
/// </summary>
private async Task TryAutoSubscribeAlarmsAsync(
GatewaySession session,
CancellationToken cancellationToken)
{
AlarmsOptions alarms = _options.Alarms;
if (!alarms.Enabled) return;
string subscription = ResolveAlarmSubscription(alarms);
if (string.IsNullOrWhiteSpace(subscription))
{
const string diagnostic =
"Alarms.Enabled is true but no SubscriptionExpression / DefaultArea is configured.";
if (alarms.RequireSubscribeOnOpen)
{
throw new SessionManagerException(
SessionManagerErrorCode.OpenFailed, diagnostic);
}
_logger.LogWarning(
"Auto-subscribe skipped for session {SessionId}: {Diagnostic}",
session.SessionId, diagnostic);
return;
}
WorkerCommand command = new WorkerCommand
{
Command = new MxCommand
{
Kind = MxCommandKind.SubscribeAlarms,
SubscribeAlarms = new SubscribeAlarmsCommand
{
SubscriptionExpression = subscription,
},
},
EnqueueTimestamp = Timestamp.FromDateTimeOffset(_timeProvider.GetUtcNow()),
};
try
{
WorkerCommandReply reply = await session.InvokeAsync(command, cancellationToken)
.ConfigureAwait(false);
ProtocolStatusCode? code = reply.Reply?.ProtocolStatus?.Code;
if (code != ProtocolStatusCode.Ok)
{
string diagnostic = reply.Reply?.DiagnosticMessage
?? reply.Reply?.ProtocolStatus?.Message
?? "Worker rejected SubscribeAlarms.";
if (alarms.RequireSubscribeOnOpen)
{
throw new SessionManagerException(
SessionManagerErrorCode.OpenFailed,
$"Auto-subscribe failed for session {session.SessionId}: {diagnostic}");
}
_logger.LogWarning(
"Auto-subscribe failed for session {SessionId} (status {StatusCode}): {Diagnostic}",
session.SessionId, code, diagnostic);
return;
}
_logger.LogInformation(
"Alarm auto-subscribe succeeded for session {SessionId} on {Subscription}.",
session.SessionId, subscription);
}
catch (SessionManagerException)
{
throw;
}
catch (Exception ex) when (!alarms.RequireSubscribeOnOpen)
{
_logger.LogWarning(
ex,
"Auto-subscribe threw for session {SessionId} on {Subscription}; alarm path remains inactive.",
session.SessionId, subscription);
}
}
private static string ResolveAlarmSubscription(AlarmsOptions alarms)
{
if (!string.IsNullOrWhiteSpace(alarms.SubscriptionExpression))
{
return alarms.SubscriptionExpression;
}
if (!string.IsNullOrWhiteSpace(alarms.DefaultArea))
{
return $@"\\{Environment.MachineName}\Galaxy!{alarms.DefaultArea}";
}
return string.Empty;
}
}
@@ -0,0 +1,266 @@
using System.Runtime.CompilerServices;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Options;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Sessions;
/// <summary>
/// Pins the alarm auto-subscribe hook on session open. Runs in
/// its own file because the cases are orthogonal to
/// <see cref="SessionManagerTests"/> (alarms-disabled vs.
/// alarms-enabled lanes), and the fake worker client below verifies
/// the issued <c>SubscribeAlarms</c> command shape directly.
/// </summary>
public sealed class SessionManagerAlarmAutoSubscribeTests
{
[Fact]
public async Task OpenSessionAsync_DoesNotAutoSubscribe_WhenAlarmsDisabled()
{
AlarmAutoSubscribeWorkerClient worker = new();
SessionManager manager = NewManager(worker, alarms: new AlarmsOptions { Enabled = false });
await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
Assert.Equal(0, worker.SubscribeAlarmsInvokeCount);
}
[Fact]
public async Task OpenSessionAsync_AutoSubscribes_WhenEnabledWithExpression()
{
AlarmAutoSubscribeWorkerClient worker = new();
SessionManager manager = NewManager(worker, alarms: new AlarmsOptions
{
Enabled = true,
SubscriptionExpression = @"\\HOST\Galaxy!Area1",
});
GatewaySession session = await manager.OpenSessionAsync(
CreateOpenRequest(), "client-1", CancellationToken.None);
Assert.Equal(SessionState.Ready, session.State);
Assert.Equal(1, worker.SubscribeAlarmsInvokeCount);
Assert.Equal(@"\\HOST\Galaxy!Area1",
worker.LastSubscribeAlarmsCommand!.SubscriptionExpression);
}
[Fact]
public async Task OpenSessionAsync_FallsBackToDefaultArea_WhenExpressionEmpty()
{
AlarmAutoSubscribeWorkerClient worker = new();
SessionManager manager = NewManager(worker, alarms: new AlarmsOptions
{
Enabled = true,
DefaultArea = "DEV",
});
await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
Assert.Equal(1, worker.SubscribeAlarmsInvokeCount);
Assert.Contains(@"\Galaxy!DEV",
worker.LastSubscribeAlarmsCommand!.SubscriptionExpression);
}
[Fact]
public async Task OpenSessionAsync_Succeeds_WhenAutoSubscribeFailsWithRequireOff()
{
// Worker rejects the SubscribeAlarms command. With RequireSubscribeOnOpen=false
// (the default), the session still opens — alarm-side commands later return
// "not subscribed", but data subscriptions work.
AlarmAutoSubscribeWorkerClient worker = new()
{
SubscribeAlarmsReplyFactory = _ => new MxCommandReply
{
Kind = MxCommandKind.SubscribeAlarms,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.MxaccessFailure,
Message = "wnwrap subscribe failed",
},
DiagnosticMessage = "alarm provider unavailable",
},
};
SessionManager manager = NewManager(worker, alarms: new AlarmsOptions
{
Enabled = true,
SubscriptionExpression = @"\\HOST\Galaxy!Area1",
RequireSubscribeOnOpen = false,
});
GatewaySession session = await manager.OpenSessionAsync(
CreateOpenRequest(), "client-1", CancellationToken.None);
Assert.Equal(SessionState.Ready, session.State);
Assert.Equal(1, worker.SubscribeAlarmsInvokeCount);
}
[Fact]
public async Task OpenSessionAsync_Throws_WhenAutoSubscribeFailsWithRequireOn()
{
AlarmAutoSubscribeWorkerClient worker = new()
{
SubscribeAlarmsReplyFactory = _ => new MxCommandReply
{
Kind = MxCommandKind.SubscribeAlarms,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.MxaccessFailure,
Message = "wnwrap subscribe failed",
},
},
};
SessionManager manager = NewManager(worker, alarms: new AlarmsOptions
{
Enabled = true,
SubscriptionExpression = @"\\HOST\Galaxy!Area1",
RequireSubscribeOnOpen = true,
});
await Assert.ThrowsAsync<SessionManagerException>(
async () => await manager.OpenSessionAsync(
CreateOpenRequest(), "client-1", CancellationToken.None));
}
[Fact]
public async Task OpenSessionAsync_Throws_WhenEnabledButNoExpressionAndRequireOn()
{
AlarmAutoSubscribeWorkerClient worker = new();
SessionManager manager = NewManager(worker, alarms: new AlarmsOptions
{
Enabled = true,
// No SubscriptionExpression and no DefaultArea.
RequireSubscribeOnOpen = true,
});
await Assert.ThrowsAsync<SessionManagerException>(
async () => await manager.OpenSessionAsync(
CreateOpenRequest(), "client-1", CancellationToken.None));
Assert.Equal(0, worker.SubscribeAlarmsInvokeCount);
}
[Fact]
public async Task OpenSessionAsync_Succeeds_WhenEnabledButNoExpressionAndRequireOff()
{
AlarmAutoSubscribeWorkerClient worker = new();
SessionManager manager = NewManager(worker, alarms: new AlarmsOptions
{
Enabled = true,
// No SubscriptionExpression and no DefaultArea — default require=false.
});
GatewaySession session = await manager.OpenSessionAsync(
CreateOpenRequest(), "client-1", CancellationToken.None);
Assert.Equal(SessionState.Ready, session.State);
Assert.Equal(0, worker.SubscribeAlarmsInvokeCount);
}
private static SessionManager NewManager(
AlarmAutoSubscribeWorkerClient worker,
AlarmsOptions alarms)
{
FakeSessionWorkerClientFactory factory = new(worker);
GatewayOptions options = new GatewayOptions
{
Sessions = new SessionOptions
{
DefaultCommandTimeoutSeconds = 30,
MaxSessions = 64,
DefaultLeaseSeconds = 1800,
},
Worker = new WorkerOptions
{
StartupTimeoutSeconds = 30,
ShutdownTimeoutSeconds = 10,
},
Alarms = alarms,
};
return new SessionManager(
new SessionRegistry(),
factory,
Options.Create(options),
new GatewayMetrics());
}
private static SessionOpenRequest CreateOpenRequest()
{
return new SessionOpenRequest(
RequestedBackend: null,
ClientSessionName: "test-session",
ClientCorrelationId: "client-correlation-1",
CommandTimeout: Duration.FromTimeSpan(TimeSpan.FromSeconds(5)));
}
private sealed class FakeSessionWorkerClientFactory(IWorkerClient client) : ISessionWorkerClientFactory
{
public Task<IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken)
{
return Task.FromResult(client);
}
}
private sealed class AlarmAutoSubscribeWorkerClient : IWorkerClient
{
public string SessionId { get; } = "session-1";
public int? ProcessId { get; } = 1234;
public WorkerClientState State { get; set; } = WorkerClientState.Ready;
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
public int SubscribeAlarmsInvokeCount { get; private set; }
public SubscribeAlarmsCommand? LastSubscribeAlarmsCommand { get; private set; }
public Func<WorkerCommand, MxCommandReply>? SubscribeAlarmsReplyFactory { get; init; }
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command, TimeSpan timeout, CancellationToken cancellationToken)
{
if (command.Command?.Kind == MxCommandKind.SubscribeAlarms)
{
SubscribeAlarmsInvokeCount++;
LastSubscribeAlarmsCommand = command.Command.SubscribeAlarms;
MxCommandReply reply = SubscribeAlarmsReplyFactory?.Invoke(command)
?? new MxCommandReply
{
Kind = MxCommandKind.SubscribeAlarms,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = "OK",
},
};
return Task.FromResult(new WorkerCommandReply { Reply = reply });
}
return Task.FromResult(new WorkerCommandReply
{
Reply = new MxCommandReply
{
Kind = command.Command?.Kind ?? MxCommandKind.Unspecified,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = "OK",
},
},
});
}
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await Task.CompletedTask;
yield break;
}
public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken)
=> Task.CompletedTask;
public void Kill(string reason) { }
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}
}