alarms-over-gateway: full pipeline (wnwrap consumer + dispatcher + IPC + auto-subscribe + ack-by-name + live smoke) #118

Merged
dohertj2 merged 16 commits from docs/alarm-client-wm-app-finding into main 2026-05-01 12:31:29 -04:00
4 changed files with 420 additions and 0 deletions
Showing only changes of commit 47b1fd422c - Show all commits
@@ -0,0 +1,48 @@
namespace MxGateway.Server.Configuration;
/// <summary>
/// Per-gateway alarm-subsystem configuration. Drives the auto-subscribe
/// hook in <see cref="Sessions.SessionManager"/>: when
/// <see cref="Enabled"/> is true and a session reaches Ready, the
/// manager issues a <c>SubscribeAlarmsCommand</c> to the worker with
/// the resolved subscription expression (either
/// <see cref="SubscriptionExpression"/> or the
/// <see cref="DefaultArea"/>-based fallback).
/// </summary>
/// <remarks>
/// Defaults preserve current behaviour (alarms disabled). Operators
/// opt in by setting <c>MxGateway:Alarms:Enabled = true</c> and
/// supplying a canonical
/// <c>\\&lt;machine&gt;\Galaxy!&lt;area&gt;</c> subscription
/// expression. The literal "Galaxy" provider is correct regardless of
/// the configured Galaxy database name (the wnwrap consumer doesn't
/// accept the database name as the provider).
/// </remarks>
public sealed class AlarmsOptions
{
/// <summary>
/// Gates the auto-subscribe hook on session open. Defaults to
/// <c>false</c>, in which case no alarm subscribe command is issued.
/// </summary>
public bool Enabled { get; init; }
/// <summary>
/// AVEVA alarm-subscription expression. When empty or whitespace and
/// <see cref="Enabled"/> is true, the gateway falls back to
/// <c>\\&lt;MachineName&gt;\Galaxy!&lt;DefaultArea&gt;</c> if
/// <see cref="DefaultArea"/> is set. When neither value is set, no
/// subscribe command is sent: the session open fails with a
/// configuration diagnostic only if
/// <see cref="RequireSubscribeOnOpen"/> is true; otherwise a warning
/// is logged and the session opens without an alarm subscription.
/// </summary>
public string SubscriptionExpression { get; init; } = string.Empty;
/// <summary>
/// Optional area name used to compose a default subscription when
/// <see cref="SubscriptionExpression"/> is empty or whitespace.
/// Combined with <c>Environment.MachineName</c> as
/// <c>\\&lt;MachineName&gt;\Galaxy!&lt;DefaultArea&gt;</c>. The value
/// is inserted verbatim (it is not trimmed or validated).
/// </summary>
public string DefaultArea { get; init; } = string.Empty;
/// <summary>
/// If true, an auto-subscribe failure (missing configuration, a
/// non-Ok worker reply, or an exception from the invoke) propagates
/// out of the session-open path. If false (default), the failure is
/// logged as a warning and the session remains Ready —
/// alarm-side commands return "not subscribed" but data subscriptions
/// work normally.
/// </summary>
public bool RequireSubscribeOnOpen { get; init; }
}
@@ -35,4 +35,11 @@ public sealed class GatewayOptions
/// Gets protocol configuration options.
/// </summary>
public ProtocolOptions Protocol { get; init; } = new();
/// <summary>
/// Gets alarm-subsystem configuration options. Drives the gateway's
/// auto-subscribe-on-session-open hook; default values preserve legacy
/// behaviour (alarms disabled).
/// </summary>
/// <remarks>
/// Initialized to a default <see cref="AlarmsOptions"/> instance, whose
/// <see cref="AlarmsOptions.Enabled"/> flag is <c>false</c>, so callers
/// can read the section even when it is absent from configuration.
/// </remarks>
public AlarmsOptions Alarms { get; init; } = new();
}
@@ -87,6 +87,8 @@ public sealed class SessionManager : ISessionManager
session.MarkReady();
_metrics.SessionOpened();
await TryAutoSubscribeAlarmsAsync(session, cancellationToken).ConfigureAwait(false);
return session;
}
catch (Exception exception)
@@ -396,4 +398,101 @@ public sealed class SessionManager : ISessionManager
return Convert.ToBase64String(bytes);
}
/// <summary>
/// If <c>Alarms.Enabled</c> is configured, issue a
/// <c>SubscribeAlarmsCommand</c> on the freshly-Ready session so the
/// worker's wnwrap consumer starts polling. Failure handling is
/// governed by <c>Alarms.RequireSubscribeOnOpen</c>:
/// <list type="bullet">
/// <item><description><c>true</c> — propagate the failure to fault the session.</description></item>
/// <item><description><c>false</c> (default) — log a warning and let the session continue serving data subscriptions.</description></item>
/// </list>
/// Cancellation requested through <paramref name="cancellationToken"/>
/// is never treated as a best-effort failure: it always propagates so
/// a cancelled open is not reported as a successfully opened session.
/// </summary>
/// <param name="session">The session that has just been marked Ready.</param>
/// <param name="cancellationToken">Token of the session-open request; flowed into the worker invoke.</param>
/// <exception cref="SessionManagerException">
/// <c>RequireSubscribeOnOpen</c> is true and either no subscription
/// expression could be resolved or the worker replied with a non-Ok status.
/// </exception>
/// <exception cref="OperationCanceledException">
/// The caller cancelled <paramref name="cancellationToken"/> while the
/// subscribe command was in flight.
/// </exception>
private async Task TryAutoSubscribeAlarmsAsync(
    GatewaySession session,
    CancellationToken cancellationToken)
{
    AlarmsOptions alarms = _options.Alarms;
    if (!alarms.Enabled)
    {
        return;
    }

    string subscription = ResolveAlarmSubscription(alarms);
    if (string.IsNullOrWhiteSpace(subscription))
    {
        // Misconfiguration: alarms are enabled but there is nothing to subscribe to.
        const string diagnostic =
            "Alarms.Enabled is true but no SubscriptionExpression / DefaultArea is configured.";
        if (alarms.RequireSubscribeOnOpen)
        {
            throw new SessionManagerException(
                SessionManagerErrorCode.OpenFailed, diagnostic);
        }

        _logger.LogWarning(
            "Auto-subscribe skipped for session {SessionId}: {Diagnostic}",
            session.SessionId, diagnostic);
        return;
    }

    WorkerCommand command = new WorkerCommand
    {
        Command = new MxCommand
        {
            Kind = MxCommandKind.SubscribeAlarms,
            SubscribeAlarms = new SubscribeAlarmsCommand
            {
                SubscriptionExpression = subscription,
            },
        },
        EnqueueTimestamp = Timestamp.FromDateTimeOffset(_timeProvider.GetUtcNow()),
    };

    try
    {
        WorkerCommandReply reply = await session.InvokeAsync(command, cancellationToken)
            .ConfigureAwait(false);

        // A missing reply or missing status yields null, which is treated as a rejection.
        ProtocolStatusCode? code = reply.Reply?.ProtocolStatus?.Code;
        if (code != ProtocolStatusCode.Ok)
        {
            string diagnostic = reply.Reply?.DiagnosticMessage
                ?? reply.Reply?.ProtocolStatus?.Message
                ?? "Worker rejected SubscribeAlarms.";
            if (alarms.RequireSubscribeOnOpen)
            {
                throw new SessionManagerException(
                    SessionManagerErrorCode.OpenFailed,
                    $"Auto-subscribe failed for session {session.SessionId}: {diagnostic}");
            }

            _logger.LogWarning(
                "Auto-subscribe failed for session {SessionId} (status {StatusCode}): {Diagnostic}",
                session.SessionId, code, diagnostic);
            return;
        }

        _logger.LogInformation(
            "Alarm auto-subscribe succeeded for session {SessionId} on {Subscription}.",
            session.SessionId, subscription);
    }
    catch (SessionManagerException)
    {
        // Raised above only when RequireSubscribeOnOpen is true; must reach the caller.
        throw;
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // The caller abandoned the open. Swallowing this would hand a Ready
        // session back to a request that has already been cancelled.
        throw;
    }
    catch (Exception ex) when (!alarms.RequireSubscribeOnOpen)
    {
        // Best-effort mode: any other failure (including timeouts) leaves the
        // session usable for data subscriptions.
        _logger.LogWarning(
            ex,
            "Auto-subscribe threw for session {SessionId} on {Subscription}; alarm path remains inactive.",
            session.SessionId, subscription);
    }
}
/// <summary>
/// Picks the alarm subscription expression to send to the worker.
/// An explicit <c>SubscriptionExpression</c> wins; otherwise a
/// <c>\\&lt;MachineName&gt;\Galaxy!&lt;DefaultArea&gt;</c> expression is
/// composed from <c>DefaultArea</c>.
/// </summary>
/// <param name="alarms">The alarm options to resolve from.</param>
/// <returns>
/// The resolved expression, or <see cref="string.Empty"/> when neither
/// value is configured.
/// </returns>
private static string ResolveAlarmSubscription(AlarmsOptions alarms)
{
    bool hasExplicitExpression = !string.IsNullOrWhiteSpace(alarms.SubscriptionExpression);
    if (hasExplicitExpression)
    {
        return alarms.SubscriptionExpression;
    }

    // The machine name is only read when the fallback is actually composed.
    return string.IsNullOrWhiteSpace(alarms.DefaultArea)
        ? string.Empty
        : $@"\\{Environment.MachineName}\Galaxy!{alarms.DefaultArea}";
}
}
@@ -0,0 +1,266 @@
using System.Runtime.CompilerServices;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Options;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Sessions;
/// <summary>
/// Pins the alarm auto-subscribe hook on session open. Runs in
/// its own file because the cases are orthogonal to
/// <see cref="SessionManagerTests"/> (alarms-disabled vs.
/// alarms-enabled lanes), and the fake worker client below verifies
/// the issued <c>SubscribeAlarms</c> command shape directly.
/// </summary>
public sealed class SessionManagerAlarmAutoSubscribeTests
{
[Fact]
public async Task OpenSessionAsync_DoesNotAutoSubscribe_WhenAlarmsDisabled()
{
    // Arrange: alarms explicitly switched off.
    var disabledAlarms = new AlarmsOptions { Enabled = false };
    var fakeWorker = new AlarmAutoSubscribeWorkerClient();
    SessionManager sut = NewManager(fakeWorker, alarms: disabledAlarms);

    // Act
    await sut.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);

    // Assert: the worker never saw a SubscribeAlarms command.
    Assert.Equal(0, fakeWorker.SubscribeAlarmsInvokeCount);
}
[Fact]
public async Task OpenSessionAsync_AutoSubscribes_WhenEnabledWithExpression()
{
    // Arrange: alarms enabled with an explicit expression.
    const string expression = @"\\HOST\Galaxy!Area1";
    var fakeWorker = new AlarmAutoSubscribeWorkerClient();
    var enabledAlarms = new AlarmsOptions
    {
        Enabled = true,
        SubscriptionExpression = expression,
    };
    SessionManager sut = NewManager(fakeWorker, alarms: enabledAlarms);

    // Act
    GatewaySession opened = await sut.OpenSessionAsync(
        CreateOpenRequest(), "client-1", CancellationToken.None);

    // Assert: session is Ready and exactly one subscribe carried the configured expression.
    Assert.Equal(SessionState.Ready, opened.State);
    Assert.Equal(1, fakeWorker.SubscribeAlarmsInvokeCount);
    Assert.Equal(expression, fakeWorker.LastSubscribeAlarmsCommand!.SubscriptionExpression);
}
[Fact]
public async Task OpenSessionAsync_FallsBackToDefaultArea_WhenExpressionEmpty()
{
    // Arrange: no explicit expression, only a default area.
    AlarmAutoSubscribeWorkerClient worker = new();
    SessionManager manager = NewManager(worker, alarms: new AlarmsOptions
    {
        Enabled = true,
        DefaultArea = "DEV",
    });

    // Act
    await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);

    // Assert: pin the whole composed expression, including the
    // \\<MachineName> node prefix, so a regression that drops or mangles
    // the machine part cannot pass on a substring match alone.
    Assert.Equal(1, worker.SubscribeAlarmsInvokeCount);
    Assert.Equal(
        $@"\\{Environment.MachineName}\Galaxy!DEV",
        worker.LastSubscribeAlarmsCommand!.SubscriptionExpression);
}
[Fact]
public async Task OpenSessionAsync_Succeeds_WhenAutoSubscribeFailsWithRequireOff()
{
    // The worker rejects SubscribeAlarms. Because RequireSubscribeOnOpen is
    // false (the default), the open must still succeed: alarm-side commands
    // later return "not subscribed", but data subscriptions keep working.
    static MxCommandReply RejectSubscribe(WorkerCommand _) => new MxCommandReply
    {
        Kind = MxCommandKind.SubscribeAlarms,
        ProtocolStatus = new ProtocolStatus
        {
            Code = ProtocolStatusCode.MxaccessFailure,
            Message = "wnwrap subscribe failed",
        },
        DiagnosticMessage = "alarm provider unavailable",
    };

    var fakeWorker = new AlarmAutoSubscribeWorkerClient
    {
        SubscribeAlarmsReplyFactory = RejectSubscribe,
    };
    var bestEffortAlarms = new AlarmsOptions
    {
        Enabled = true,
        SubscriptionExpression = @"\\HOST\Galaxy!Area1",
        RequireSubscribeOnOpen = false,
    };
    SessionManager sut = NewManager(fakeWorker, alarms: bestEffortAlarms);

    GatewaySession opened = await sut.OpenSessionAsync(
        CreateOpenRequest(), "client-1", CancellationToken.None);

    Assert.Equal(SessionState.Ready, opened.State);
    Assert.Equal(1, fakeWorker.SubscribeAlarmsInvokeCount);
}
[Fact]
public async Task OpenSessionAsync_Throws_WhenAutoSubscribeFailsWithRequireOn()
{
    // The worker rejects SubscribeAlarms and RequireSubscribeOnOpen is true,
    // so the rejection must surface as a failed session open.
    static MxCommandReply RejectSubscribe(WorkerCommand _) => new MxCommandReply
    {
        Kind = MxCommandKind.SubscribeAlarms,
        ProtocolStatus = new ProtocolStatus
        {
            Code = ProtocolStatusCode.MxaccessFailure,
            Message = "wnwrap subscribe failed",
        },
    };

    var fakeWorker = new AlarmAutoSubscribeWorkerClient
    {
        SubscribeAlarmsReplyFactory = RejectSubscribe,
    };
    var strictAlarms = new AlarmsOptions
    {
        Enabled = true,
        SubscriptionExpression = @"\\HOST\Galaxy!Area1",
        RequireSubscribeOnOpen = true,
    };
    SessionManager sut = NewManager(fakeWorker, alarms: strictAlarms);

    await Assert.ThrowsAsync<SessionManagerException>(
        async () => await sut.OpenSessionAsync(
            CreateOpenRequest(), "client-1", CancellationToken.None));
}
[Fact]
public async Task OpenSessionAsync_Throws_WhenEnabledButNoExpressionAndRequireOn()
{
    // Enabled, but neither SubscriptionExpression nor DefaultArea is set,
    // and the subscribe is mandatory.
    var strictUnconfiguredAlarms = new AlarmsOptions
    {
        Enabled = true,
        RequireSubscribeOnOpen = true,
    };
    var fakeWorker = new AlarmAutoSubscribeWorkerClient();
    SessionManager sut = NewManager(fakeWorker, alarms: strictUnconfiguredAlarms);

    await Assert.ThrowsAsync<SessionManagerException>(
        async () => await sut.OpenSessionAsync(
            CreateOpenRequest(), "client-1", CancellationToken.None));

    // The misconfiguration is detected before any command reaches the worker.
    Assert.Equal(0, fakeWorker.SubscribeAlarmsInvokeCount);
}
[Fact]
public async Task OpenSessionAsync_Succeeds_WhenEnabledButNoExpressionAndRequireOff()
{
    // Enabled, but neither SubscriptionExpression nor DefaultArea is set;
    // RequireSubscribeOnOpen is left at its default (false).
    var lenientUnconfiguredAlarms = new AlarmsOptions
    {
        Enabled = true,
    };
    var fakeWorker = new AlarmAutoSubscribeWorkerClient();
    SessionManager sut = NewManager(fakeWorker, alarms: lenientUnconfiguredAlarms);

    GatewaySession opened = await sut.OpenSessionAsync(
        CreateOpenRequest(), "client-1", CancellationToken.None);

    // The session opens normally and the subscribe is skipped entirely.
    Assert.Equal(SessionState.Ready, opened.State);
    Assert.Equal(0, fakeWorker.SubscribeAlarmsInvokeCount);
}
/// <summary>
/// Builds a <see cref="SessionManager"/> wired to the supplied fake
/// worker and alarm options, with fixed session and worker settings.
/// </summary>
/// <param name="worker">Fake worker client handed out for every session.</param>
/// <param name="alarms">Alarm options under test.</param>
private static SessionManager NewManager(
    AlarmAutoSubscribeWorkerClient worker,
    AlarmsOptions alarms)
{
    var sessionOptions = new SessionOptions
    {
        DefaultCommandTimeoutSeconds = 30,
        MaxSessions = 64,
        DefaultLeaseSeconds = 1800,
    };
    var workerOptions = new WorkerOptions
    {
        StartupTimeoutSeconds = 30,
        ShutdownTimeoutSeconds = 10,
    };
    var gatewayOptions = new GatewayOptions
    {
        Sessions = sessionOptions,
        Worker = workerOptions,
        Alarms = alarms,
    };

    var workerFactory = new FakeSessionWorkerClientFactory(worker);
    return new SessionManager(
        new SessionRegistry(),
        workerFactory,
        Options.Create(gatewayOptions),
        new GatewayMetrics());
}
/// <summary>
/// Creates the canned open request shared by every test: no backend
/// preference and a five-second command timeout.
/// </summary>
private static SessionOpenRequest CreateOpenRequest() =>
    new SessionOpenRequest(
        RequestedBackend: null,
        ClientSessionName: "test-session",
        ClientCorrelationId: "client-correlation-1",
        CommandTimeout: Duration.FromTimeSpan(TimeSpan.FromSeconds(5)));
/// <summary>
/// Factory stub that always hands back the single worker client it was
/// constructed with, regardless of the session.
/// </summary>
private sealed class FakeSessionWorkerClientFactory : ISessionWorkerClientFactory
{
    private readonly IWorkerClient _client;

    public FakeSessionWorkerClientFactory(IWorkerClient client)
    {
        _client = client;
    }

    /// <summary>Returns the pre-supplied client as an already-completed task.</summary>
    public Task<IWorkerClient> CreateAsync(
        GatewaySession session,
        CancellationToken cancellationToken) => Task.FromResult(_client);
}
/// <summary>
/// In-memory <see cref="IWorkerClient"/> that records every
/// <c>SubscribeAlarms</c> command it receives and answers all commands
/// with an Ok status unless <see cref="SubscribeAlarmsReplyFactory"/>
/// supplies a different reply for the subscribe.
/// </summary>
private sealed class AlarmAutoSubscribeWorkerClient : IWorkerClient
{
    public string SessionId { get; } = "session-1";

    public int? ProcessId { get; } = 1234;

    public WorkerClientState State { get; set; } = WorkerClientState.Ready;

    // Captured once at construction time.
    public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;

    /// <summary>Number of SubscribeAlarms commands received so far.</summary>
    public int SubscribeAlarmsInvokeCount { get; private set; }

    /// <summary>Payload of the most recent SubscribeAlarms command, if any.</summary>
    public SubscribeAlarmsCommand? LastSubscribeAlarmsCommand { get; private set; }

    /// <summary>Optional override for the SubscribeAlarms reply; Ok is used when unset or when it yields null.</summary>
    public Func<WorkerCommand, MxCommandReply>? SubscribeAlarmsReplyFactory { get; init; }

    public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    public Task<WorkerCommandReply> InvokeAsync(
        WorkerCommand command, TimeSpan timeout, CancellationToken cancellationToken)
    {
        MxCommand? inner = command.Command;
        if (inner is null || inner.Kind != MxCommandKind.SubscribeAlarms)
        {
            // Any other command is acknowledged with Ok, echoing its kind.
            MxCommandKind echoedKind = inner?.Kind ?? MxCommandKind.Unspecified;
            return Task.FromResult(new WorkerCommandReply { Reply = CreateOkReply(echoedKind) });
        }

        SubscribeAlarmsInvokeCount += 1;
        LastSubscribeAlarmsCommand = inner.SubscribeAlarms;

        MxCommandReply subscribeReply = SubscribeAlarmsReplyFactory?.Invoke(command)
            ?? CreateOkReply(MxCommandKind.SubscribeAlarms);
        return Task.FromResult(new WorkerCommandReply { Reply = subscribeReply });
    }

    public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // The fake never produces events; the stream completes immediately.
        await Task.CompletedTask;
        yield break;
    }

    public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken)
        => Task.CompletedTask;

    public void Kill(string reason) { }

    public ValueTask DisposeAsync() => ValueTask.CompletedTask;

    // Builds a fresh Ok reply for the given command kind.
    private static MxCommandReply CreateOkReply(MxCommandKind kind) => new MxCommandReply
    {
        Kind = kind,
        ProtocolStatus = new ProtocolStatus
        {
            Code = ProtocolStatusCode.Ok,
            Message = "OK",
        },
    };
}
}