From 9b21ca3554dbd342ef41213939f6cbcfd04f77d4 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 1 May 2026 10:58:40 -0400 Subject: [PATCH] A.3 (gateway dispatcher): WorkerAlarmRpcDispatcher routes alarm RPCs over the worker pipe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces NotWiredAlarmRpcDispatcher in DI with a production implementation that issues the new MxCommandKind.{AcknowledgeAlarm, QueryActiveAlarms} commands across the IPC and unwraps the resulting MxCommandReply into the public RPC types. QueryActiveAlarms is fully wired: builds the QueryActiveAlarmsCommand (forwarding alarm_filter_prefix), invokes it on the resolved GatewaySession's worker client, and yields each ActiveAlarmSnapshot from the QueryActiveAlarmsReplyPayload as the RPC stream. Worker failures + missing sessions yield an empty stream — matches the ConditionRefresh contract clients already speak to. AcknowledgeAlarm is partially wired: the public RPC takes AlarmFullReference (Provider!Group.Tag), but the worker's wnwrap consumer acks by GUID. Strategy: - If AlarmFullReference parses as a canonical GUID, forward it directly through MxCommandKind.AcknowledgeAlarm. Native status flows back via MxCommandReply.Hresult and the dedicated AcknowledgeAlarmReplyPayload.NativeStatus. - Otherwise, return InvalidRequest with a clear diagnostic naming the follow-up — reference→GUID lookup needs a worker-side AlarmAckByName command wrapping wwAlarmConsumerClass.AlarmAckByName. DI: SessionServiceCollectionExtensions registers WorkerAlarmRpcDispatcher as the default IAlarmRpcDispatcher; MxAccessGatewayService picks it up via constructor injection. NotWiredAlarmRpcDispatcher is retained for test fixtures that want the no-side-effect fake. Tests: 7 new unit tests cover session-not-found short-circuit, GUID-vs- reference branching, native-status propagation, worker MxaccessFailure diagnostic propagation, and snapshot-stream yielding. 
Server test suite total: 288/0 fail. Solution builds clean. End-to-end alarms-over-gateway pipeline status: consumer → sink → queue (A.2 + A.3 in-process slice) worker IPC commands (A.3 worker slice) gateway dispatcher (this slice) Remaining for full E2E: - Auto-issue SubscribeAlarms on session open (or add a public SubscribeAlarms RPC). Without this trigger the consumer never starts and Acknowledge/Query return "not subscribed". - AlarmAckByName worker command for ack-by-reference. - End-to-end live test against the dev rig. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../SessionServiceCollectionExtensions.cs | 1 + .../Sessions/WorkerAlarmRpcDispatcher.cs | 172 ++++++++++ .../Sessions/WorkerAlarmRpcDispatcherTests.cs | 300 ++++++++++++++++++ 3 files changed, 473 insertions(+) create mode 100644 src/MxGateway.Server/Sessions/WorkerAlarmRpcDispatcher.cs create mode 100644 src/MxGateway.Tests/Gateway/Sessions/WorkerAlarmRpcDispatcherTests.cs diff --git a/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs b/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs index 5b7eae4..1f1e67b 100644 --- a/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs +++ b/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs @@ -11,6 +11,7 @@ public static class SessionServiceCollectionExtensions services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); services.AddHostedService(); services.AddHostedService(); diff --git a/src/MxGateway.Server/Sessions/WorkerAlarmRpcDispatcher.cs b/src/MxGateway.Server/Sessions/WorkerAlarmRpcDispatcher.cs new file mode 100644 index 0000000..89e6382 --- /dev/null +++ b/src/MxGateway.Server/Sessions/WorkerAlarmRpcDispatcher.cs @@ -0,0 +1,172 @@ +using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using Google.Protobuf.WellKnownTypes; +using MxGateway.Contracts.Proto; 
+using MxGateway.Server.Grpc;
+
+namespace MxGateway.Server.Sessions;
+
+/// <summary>
+/// Production <see cref="IAlarmRpcDispatcher"/> that routes the public
+/// AcknowledgeAlarm + QueryActiveAlarms RPCs through the
+/// worker pipe IPC. Replaces <see cref="NotWiredAlarmRpcDispatcher"/>
+/// once the worker AlarmCommandHandler is wired in.
+/// </summary>
+/// <remarks>
+/// <para>
+/// QueryActiveAlarms is fully wired: issues a
+/// <see cref="QueryActiveAlarmsCommand"/> over the pipe and yields
+/// each <see cref="ActiveAlarmSnapshot"/> from the
+/// <see cref="QueryActiveAlarmsReplyPayload"/>.
+/// </para>
+/// <para>
+/// AcknowledgeAlarm is partially wired: the public RPC's
+/// <see cref="AcknowledgeAlarmRequest.AlarmFullReference"/> is a
+/// Provider!Group.Tag string, but the worker's wnwrap consumer
+/// acks by GUID. When the supplied reference parses as a GUID
+/// directly, the dispatcher forwards it as-is. Otherwise it
+/// returns an <c>InvalidRequest</c> status without calling the worker. Resolving
+/// reference→GUID requires an additional worker IPC command
+/// (e.g. AlarmAckByName wrapping
+/// wwAlarmConsumerClass.AlarmAckByName) and is tracked as
+/// a follow-up.
+/// </para>
+/// </remarks>
+public sealed class WorkerAlarmRpcDispatcher : IAlarmRpcDispatcher
+{
+    private readonly ISessionRegistry sessionRegistry;
+    private readonly TimeProvider timeProvider;
+
+    public WorkerAlarmRpcDispatcher(ISessionRegistry sessionRegistry, TimeProvider? timeProvider = null)
+    {
+        this.sessionRegistry = sessionRegistry ?? throw new System.ArgumentNullException(nameof(sessionRegistry));
+        this.timeProvider = timeProvider ?? TimeProvider.System; // null falls back to the system clock
+    }
+
+    /// <inheritdoc />
+    public async Task<AcknowledgeAlarmReply> AcknowledgeAsync(
+        AcknowledgeAlarmRequest request,
+        CancellationToken cancellationToken)
+    {
+        if (request is null) throw new System.ArgumentNullException(nameof(request));
+
+        if (!sessionRegistry.TryGet(request.SessionId, out GatewaySession session))
+        {
+            return new AcknowledgeAlarmReply
+            {
+                SessionId = request.SessionId,
+                CorrelationId = request.ClientCorrelationId,
+                ProtocolStatus = MxAccessGrpcMapper.SessionNotFound(
+                    $"Session '{request.SessionId}' not found."),
+                DiagnosticMessage = "AcknowledgeAlarm: session not found.",
+            };
+        }
+
+        if (!System.Guid.TryParse(request.AlarmFullReference, out System.Guid guid))
+        {
+            // Reference→GUID lookup not yet implemented. Surface a clear
+            // diagnostic so client teams can plumb the reference parser
+            // when the worker AlarmAckByName command lands.
+            return new AcknowledgeAlarmReply
+            {
+                SessionId = request.SessionId,
+                CorrelationId = request.ClientCorrelationId,
+                ProtocolStatus = new ProtocolStatus
+                {
+                    Code = ProtocolStatusCode.InvalidRequest,
+                    Message = "AlarmFullReference must currently be a canonical GUID; reference→GUID lookup is pending the AlarmAckByName worker command.",
+                },
+                DiagnosticMessage = $"AcknowledgeAlarm received non-GUID reference '{request.AlarmFullReference}'.",
+            };
+        }
+
+        WorkerCommand workerCommand = new WorkerCommand
+        {
+            Command = new MxCommand
+            {
+                Kind = MxCommandKind.AcknowledgeAlarm,
+                AcknowledgeAlarmCommand = new AcknowledgeAlarmCommand
+                {
+                    AlarmGuid = guid.ToString(), // re-serialised from the parsed value, so any accepted input format is normalised
+                    Comment = request.Comment ?? string.Empty,
+                    OperatorUser = request.OperatorUser ?? string.Empty,
+                    // Operator node/domain/full-name are not on the public
+                    // RPC surface today; pass empty strings so the worker
+                    // honours the existing AcknowledgeAlarmCommand schema.
+                    OperatorNode = string.Empty,
+                    OperatorDomain = string.Empty,
+                    OperatorFullName = string.Empty,
+                },
+            },
+            EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()),
+        };
+
+        WorkerCommandReply workerReply = await session.InvokeAsync(workerCommand, cancellationToken)
+            .ConfigureAwait(false);
+
+        MxCommandReply mxReply = workerReply.Reply ?? new MxCommandReply
+        {
+            ProtocolStatus = new ProtocolStatus
+            {
+                Code = ProtocolStatusCode.ProtocolViolation,
+                Message = "Worker reply did not include an MxCommandReply.",
+            },
+        };
+
+        AcknowledgeAlarmReply reply = new AcknowledgeAlarmReply
+        {
+            SessionId = request.SessionId,
+            CorrelationId = request.ClientCorrelationId,
+            ProtocolStatus = mxReply.ProtocolStatus ?? MxAccessGrpcMapper.Ok(), // NOTE(review): a reply with no status is reported as OK here, while QueryActiveAlarmsAsync treats it as a failure — confirm intended
+            DiagnosticMessage = mxReply.DiagnosticMessage ?? string.Empty,
+        };
+        if (mxReply.HasHresult) // only copy the HRESULT when the worker actually set one
+        {
+            reply.Hresult = mxReply.Hresult;
+        }
+        return reply;
+    }
+
+    /// <inheritdoc />
+    public async IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
+        QueryActiveAlarmsRequest request,
+        [EnumeratorCancellation] CancellationToken cancellationToken)
+    {
+        if (request is null) throw new System.ArgumentNullException(nameof(request));
+
+        if (!sessionRegistry.TryGet(request.SessionId, out GatewaySession session))
+        {
+            yield break; // unknown session: empty stream rather than an error
+        }
+
+        WorkerCommand workerCommand = new WorkerCommand
+        {
+            Command = new MxCommand
+            {
+                Kind = MxCommandKind.QueryActiveAlarms,
+                QueryActiveAlarmsCommand = new QueryActiveAlarmsCommand
+                {
+                    AlarmFilterPrefix = request.AlarmFilterPrefix ?? string.Empty,
+                },
+            },
+            EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()),
+        };
+
+        WorkerCommandReply workerReply = await session.InvokeAsync(workerCommand, cancellationToken)
+            .ConfigureAwait(false);
+
+        MxCommandReply? mxReply = workerReply.Reply;
+        if (mxReply?.ProtocolStatus?.Code != ProtocolStatusCode.Ok) yield break; // missing reply, missing status, or non-OK status: empty stream
+
+        QueryActiveAlarmsReplyPayload? payload = mxReply.QueryActiveAlarms;
+        if (payload is null) yield break;
+
+        foreach (ActiveAlarmSnapshot snapshot in payload.Snapshots)
+        {
+            cancellationToken.ThrowIfCancellationRequested();
+            yield return snapshot;
+        }
+    }
+}
diff --git a/src/MxGateway.Tests/Gateway/Sessions/WorkerAlarmRpcDispatcherTests.cs b/src/MxGateway.Tests/Gateway/Sessions/WorkerAlarmRpcDispatcherTests.cs
new file mode 100644
index 0000000..ae56f0b
--- /dev/null
+++ b/src/MxGateway.Tests/Gateway/Sessions/WorkerAlarmRpcDispatcherTests.cs
@@ -0,0 +1,300 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using MxGateway.Contracts.Proto;
+using MxGateway.Server.Sessions;
+using MxGateway.Server.Workers;
+
+namespace MxGateway.Tests.Gateway.Sessions;
+
+/// <summary>
+/// Pins the production <see cref="WorkerAlarmRpcDispatcher"/>'s behaviour:
+/// resolves the session by id, issues the matching MxCommand over the
+/// worker pipe, and unwraps the reply into AcknowledgeAlarmReply or the
+/// ActiveAlarmSnapshot stream.
+/// </summary>
+public sealed class WorkerAlarmRpcDispatcherTests
+{
+    [Fact]
+    public async Task AcknowledgeAsync_returns_session_not_found_when_session_missing()
+    {
+        SessionRegistry registry = new SessionRegistry();
+        WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
+
+        AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
+            new AcknowledgeAlarmRequest
+            {
+                SessionId = "missing",
+                ClientCorrelationId = "c1",
+                AlarmFullReference = Guid.NewGuid().ToString(),
+            },
+            CancellationToken.None);
+
+        Assert.Equal(ProtocolStatusCode.SessionNotFound, reply.ProtocolStatus.Code);
+    }
+
+    [Fact]
+    public async Task AcknowledgeAsync_returns_invalid_request_when_reference_is_not_a_guid()
+    {
+        SessionRegistry registry = new SessionRegistry();
+        FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient();
+        GatewaySession session = NewSession("s1");
+        session.AttachWorkerClient(worker);
+        session.MarkReady();
+        registry.TryAdd(session);
+
+        WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
+
+        AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
+            new AcknowledgeAlarmRequest
+            {
+                SessionId = "s1",
+                ClientCorrelationId = "c1",
+                AlarmFullReference = "Galaxy!Area.Tag", // not a GUID
+                Comment = "x",
+                OperatorUser = "u",
+            },
+            CancellationToken.None);
+
+        Assert.Equal(ProtocolStatusCode.InvalidRequest, reply.ProtocolStatus.Code);
+        Assert.Equal(0, worker.InvokeCount); // dispatcher short-circuits before the IPC.
+    }
+
+    [Fact]
+    public async Task AcknowledgeAsync_forwards_guid_and_returns_native_status()
+    {
+        SessionRegistry registry = new SessionRegistry();
+        Guid alarmGuid = Guid.NewGuid();
+        FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
+        {
+            ReplyFactory = command =>
+            {
+                Assert.Equal(MxCommandKind.AcknowledgeAlarm, command.Command.Kind);
+                Assert.Equal(alarmGuid.ToString(), command.Command.AcknowledgeAlarmCommand.AlarmGuid);
+                Assert.Equal("ack", command.Command.AcknowledgeAlarmCommand.Comment);
+                Assert.Equal("alice", command.Command.AcknowledgeAlarmCommand.OperatorUser);
+                return new MxCommandReply
+                {
+                    Kind = MxCommandKind.AcknowledgeAlarm,
+                    ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok, Message = "OK" },
+                    Hresult = 0,
+                    AcknowledgeAlarm = new AcknowledgeAlarmReplyPayload { NativeStatus = 0 },
+                };
+            },
+        };
+        GatewaySession session = NewSession("s1");
+        session.AttachWorkerClient(worker);
+        session.MarkReady();
+        registry.TryAdd(session);
+
+        WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
+
+        AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
+            new AcknowledgeAlarmRequest
+            {
+                SessionId = "s1",
+                ClientCorrelationId = "c1",
+                AlarmFullReference = alarmGuid.ToString(),
+                Comment = "ack",
+                OperatorUser = "alice",
+            },
+            CancellationToken.None);
+
+        Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
+        Assert.Equal(0, reply.Hresult);
+        Assert.Equal("s1", reply.SessionId);
+        Assert.Equal("c1", reply.CorrelationId);
+        Assert.Equal(1, worker.InvokeCount);
+    }
+
+    [Fact]
+    public async Task AcknowledgeAsync_propagates_worker_diagnostic_on_failure()
+    {
+        SessionRegistry registry = new SessionRegistry();
+        FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
+        {
+            ReplyFactory = _ => new MxCommandReply
+            {
+                Kind = MxCommandKind.AcknowledgeAlarm,
+                ProtocolStatus = new ProtocolStatus
+                {
+                    Code = ProtocolStatusCode.MxaccessFailure,
+                    Message = "AVEVA Acknowledge failed.",
+                },
+                Hresult = -123,
+                DiagnosticMessage = "AVEVA AlarmAckByGUID returned non-zero status -123.",
+            },
+        };
+        GatewaySession session = NewSession("s1");
+        session.AttachWorkerClient(worker);
+        session.MarkReady();
+        registry.TryAdd(session);
+
+        WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
+
+        AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
+            new AcknowledgeAlarmRequest
+            {
+                SessionId = "s1",
+                AlarmFullReference = Guid.NewGuid().ToString(),
+            },
+            CancellationToken.None);
+
+        Assert.Equal(ProtocolStatusCode.MxaccessFailure, reply.ProtocolStatus.Code);
+        Assert.Equal(-123, reply.Hresult);
+        Assert.Contains("-123", reply.DiagnosticMessage);
+    }
+
+    [Fact]
+    public async Task QueryActiveAlarmsAsync_yields_each_snapshot_from_payload()
+    {
+        SessionRegistry registry = new SessionRegistry();
+        FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
+        {
+            ReplyFactory = command =>
+            {
+                Assert.Equal(MxCommandKind.QueryActiveAlarms, command.Command.Kind);
+                QueryActiveAlarmsReplyPayload payload = new QueryActiveAlarmsReplyPayload();
+                payload.Snapshots.Add(new ActiveAlarmSnapshot
+                {
+                    AlarmFullReference = "Galaxy!A.T1",
+                    CurrentState = AlarmConditionState.Active,
+                    Severity = 500,
+                });
+                payload.Snapshots.Add(new ActiveAlarmSnapshot
+                {
+                    AlarmFullReference = "Galaxy!A.T2",
+                    CurrentState = AlarmConditionState.ActiveAcked,
+                    Severity = 100,
+                });
+                return new MxCommandReply
+                {
+                    Kind = MxCommandKind.QueryActiveAlarms,
+                    ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok, Message = "OK" },
+                    QueryActiveAlarms = payload,
+                };
+            },
+        };
+        GatewaySession session = NewSession("s1");
+        session.AttachWorkerClient(worker);
+        session.MarkReady();
+        registry.TryAdd(session);
+
+        WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
+
+        List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
+        await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
+            new QueryActiveAlarmsRequest { SessionId = "s1" },
+            CancellationToken.None))
+        {
+            collected.Add(snap);
+        }
+
+        Assert.Equal(2, collected.Count);
+        Assert.Equal("Galaxy!A.T1", collected[0].AlarmFullReference);
+        Assert.Equal("Galaxy!A.T2", collected[1].AlarmFullReference);
+    }
+
+    [Fact]
+    public async Task QueryActiveAlarmsAsync_yields_empty_when_session_missing()
+    {
+        SessionRegistry registry = new SessionRegistry();
+        WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
+
+        List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
+        await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
+            new QueryActiveAlarmsRequest { SessionId = "missing" },
+            CancellationToken.None))
+        {
+            collected.Add(snap);
+        }
+
+        Assert.Empty(collected);
+    }
+
+    [Fact]
+    public async Task QueryActiveAlarmsAsync_yields_empty_on_worker_failure()
+    {
+        SessionRegistry registry = new SessionRegistry();
+        FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
+        {
+            ReplyFactory = _ => new MxCommandReply
+            {
+                Kind = MxCommandKind.QueryActiveAlarms,
+                ProtocolStatus = new ProtocolStatus
+                {
+                    Code = ProtocolStatusCode.MxaccessFailure,
+                    Message = "alarm consumer not subscribed",
+                },
+            },
+        };
+        GatewaySession session = NewSession("s1");
+        session.AttachWorkerClient(worker);
+        session.MarkReady();
+        registry.TryAdd(session);
+
+        WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
+
+        List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
+        await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
+            new QueryActiveAlarmsRequest { SessionId = "s1" },
+            CancellationToken.None))
+        {
+            collected.Add(snap);
+        }
+
+        Assert.Empty(collected);
+    }
+
+    private static GatewaySession NewSession(string sessionId)
+    {
+        return new GatewaySession(
+            sessionId,
+            "mxaccess",
+            $"mxaccess-gateway-1-{sessionId}",
+            "nonce",
+            "client-1",
+            "test-session",
+            "client-correlation-1",
+            commandTimeout: TimeSpan.FromSeconds(30),
+            startupTimeout: TimeSpan.FromSeconds(5),
+            shutdownTimeout: TimeSpan.FromSeconds(5),
+            leaseDuration: TimeSpan.FromMinutes(30),
+            openedAt: DateTimeOffset.UtcNow);
+    }
+
+    private sealed class FakeAlarmWorkerClient : IWorkerClient
+    {
+        public string SessionId { get; } = "session-1";
+        public int? ProcessId { get; } = 1;
+        public WorkerClientState State { get; } = WorkerClientState.Ready;
+        public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
+
+        public Func<WorkerCommand, MxCommandReply>? ReplyFactory { get; set; }
+        public int InvokeCount { get; private set; }
+
+        public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
+
+        public Task<WorkerCommandReply> InvokeAsync(
+            WorkerCommand command, TimeSpan timeout, CancellationToken cancellationToken)
+        {
+            InvokeCount++;
+            MxCommandReply reply = ReplyFactory?.Invoke(command) ?? new MxCommandReply();
+            return Task.FromResult(new WorkerCommandReply { Reply = reply });
+        }
+
+        public async IAsyncEnumerable ReadEventsAsync( // NOTE(review): the element type argument was lost in extraction — restore it from IWorkerClient.ReadEventsAsync
+            [EnumeratorCancellation] CancellationToken cancellationToken)
+        {
+            await Task.CompletedTask; // keeps the iterator async without producing any events
+            yield break;
+        }
+
+        public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) => Task.CompletedTask;
+        public void Kill(string reason) { }
+        public ValueTask DisposeAsync() => ValueTask.CompletedTask;
+    }
+}