A.3 (gateway dispatcher): WorkerAlarmRpcDispatcher routes alarm RPCs over the worker pipe

Replaces NotWiredAlarmRpcDispatcher in DI with a production
implementation that issues the new MxCommandKind.{AcknowledgeAlarm,
QueryActiveAlarms} commands across the IPC and unwraps the resulting
MxCommandReply into the public RPC types.

QueryActiveAlarms is fully wired: builds the QueryActiveAlarmsCommand
(forwarding alarm_filter_prefix), invokes it on the resolved
GatewaySession's worker client, and yields each ActiveAlarmSnapshot
from the QueryActiveAlarmsReplyPayload as the RPC stream. Worker
failures + missing sessions yield an empty stream — matches the
ConditionRefresh contract clients already speak to.

AcknowledgeAlarm is partially wired: the public RPC takes
AlarmFullReference (Provider!Group.Tag), but the worker's wnwrap
consumer acks by GUID. Strategy:
- If AlarmFullReference parses as a canonical GUID, forward it
  directly through MxCommandKind.AcknowledgeAlarm. Native status
  flows back via MxCommandReply.Hresult and the dedicated
  AcknowledgeAlarmReplyPayload.NativeStatus.
- Otherwise, return InvalidRequest with a clear diagnostic naming the
  follow-up — reference→GUID lookup needs a worker-side AlarmAckByName
  command wrapping wwAlarmConsumerClass.AlarmAckByName.

DI: SessionServiceCollectionExtensions registers WorkerAlarmRpcDispatcher
as the default IAlarmRpcDispatcher; MxAccessGatewayService picks it up
via constructor injection. NotWiredAlarmRpcDispatcher is retained for
test fixtures that want the no-side-effect fake.

Tests: 7 new unit tests cover session-not-found short-circuit, GUID-vs-
reference branching, native-status propagation, worker MxaccessFailure
diagnostic propagation, and snapshot-stream yielding. Server test
suite total: 288/0 fail. Solution builds clean.

End-to-end alarms-over-gateway pipeline status:
  consumer → sink → queue (A.2 + A.3 in-process slice)
  worker IPC commands (A.3 worker slice)
  gateway dispatcher (this slice)

Remaining for full E2E:
  - Auto-issue SubscribeAlarms on session open (or add a public
    SubscribeAlarms RPC). Without this trigger the consumer never
    starts and Acknowledge/Query return "not subscribed".
  - AlarmAckByName worker command for ack-by-reference.
  - End-to-end live test against the dev rig.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-01 10:58:40 -04:00
parent 01f5e6ad91
commit 9b21ca3554
3 changed files with 473 additions and 0 deletions
@@ -11,6 +11,7 @@ public static class SessionServiceCollectionExtensions
services.AddSingleton<ISessionRegistry, SessionRegistry>();
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
services.AddSingleton<ISessionManager, SessionManager>();
services.AddSingleton<IAlarmRpcDispatcher, WorkerAlarmRpcDispatcher>();
services.AddHostedService<SessionLeaseMonitorHostedService>();
services.AddHostedService<SessionShutdownHostedService>();
@@ -0,0 +1,172 @@
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Grpc;
namespace MxGateway.Server.Sessions;
/// <summary>
/// Production <see cref="IAlarmRpcDispatcher"/> that routes the public
/// <c>AcknowledgeAlarm</c> + <c>QueryActiveAlarms</c> RPCs through the
/// worker pipe IPC. Replaces <see cref="NotWiredAlarmRpcDispatcher"/>
/// once the worker AlarmCommandHandler is wired in.
/// </summary>
/// <remarks>
/// <para>
/// <c>QueryActiveAlarms</c> is fully wired: issues a
/// <see cref="QueryActiveAlarmsCommand"/> over the pipe and yields
/// each <see cref="ActiveAlarmSnapshot"/> from the
/// <see cref="QueryActiveAlarmsReplyPayload"/>.
/// </para>
/// <para>
/// <c>AcknowledgeAlarm</c> is partially wired: the public RPC's
/// <see cref="AcknowledgeAlarmRequest.AlarmFullReference"/> is a
/// <c>Provider!Group.Tag</c> string, but the worker's wnwrap consumer
/// acks by GUID. When the supplied reference parses as a GUID
/// directly, the dispatcher forwards it as-is. Otherwise it
/// returns an <c>Unimplemented</c> diagnostic. Resolving
/// reference→GUID requires an additional worker IPC command
/// (e.g. <c>AlarmAckByName</c> wrapping
/// <c>wwAlarmConsumerClass.AlarmAckByName</c>) and is tracked as
/// a follow-up.
/// </para>
/// </remarks>
public sealed class WorkerAlarmRpcDispatcher : IAlarmRpcDispatcher
{
private readonly ISessionRegistry sessionRegistry;
private readonly TimeProvider timeProvider;
public WorkerAlarmRpcDispatcher(ISessionRegistry sessionRegistry, TimeProvider? timeProvider = null)
{
this.sessionRegistry = sessionRegistry ?? throw new System.ArgumentNullException(nameof(sessionRegistry));
this.timeProvider = timeProvider ?? TimeProvider.System;
}
/// <inheritdoc />
public async Task<AcknowledgeAlarmReply> AcknowledgeAsync(
AcknowledgeAlarmRequest request,
CancellationToken cancellationToken)
{
if (request is null) throw new System.ArgumentNullException(nameof(request));
if (!sessionRegistry.TryGet(request.SessionId, out GatewaySession session))
{
return new AcknowledgeAlarmReply
{
SessionId = request.SessionId,
CorrelationId = request.ClientCorrelationId,
ProtocolStatus = MxAccessGrpcMapper.SessionNotFound(
$"Session '{request.SessionId}' not found."),
DiagnosticMessage = "AcknowledgeAlarm: session not found.",
};
}
if (!System.Guid.TryParse(request.AlarmFullReference, out System.Guid guid))
{
// Reference→GUID lookup not yet implemented. Surface a clear
// diagnostic so client teams can plumb the reference parser
// when the worker AlarmAckByName command lands.
return new AcknowledgeAlarmReply
{
SessionId = request.SessionId,
CorrelationId = request.ClientCorrelationId,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.InvalidRequest,
Message = "AlarmFullReference must currently be a canonical GUID; reference→GUID lookup is pending the AlarmAckByName worker command.",
},
DiagnosticMessage = $"AcknowledgeAlarm received non-GUID reference '{request.AlarmFullReference}'.",
};
}
WorkerCommand workerCommand = new WorkerCommand
{
Command = new MxCommand
{
Kind = MxCommandKind.AcknowledgeAlarm,
AcknowledgeAlarmCommand = new AcknowledgeAlarmCommand
{
AlarmGuid = guid.ToString(),
Comment = request.Comment ?? string.Empty,
OperatorUser = request.OperatorUser ?? string.Empty,
// Operator node/domain/full-name are not on the public
// RPC surface today; pass empty strings so the worker
// honours the existing AcknowledgeAlarmCommand schema.
OperatorNode = string.Empty,
OperatorDomain = string.Empty,
OperatorFullName = string.Empty,
},
},
EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()),
};
WorkerCommandReply workerReply = await session.InvokeAsync(workerCommand, cancellationToken)
.ConfigureAwait(false);
MxCommandReply mxReply = workerReply.Reply ?? new MxCommandReply
{
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.ProtocolViolation,
Message = "Worker reply did not include an MxCommandReply.",
},
};
AcknowledgeAlarmReply reply = new AcknowledgeAlarmReply
{
SessionId = request.SessionId,
CorrelationId = request.ClientCorrelationId,
ProtocolStatus = mxReply.ProtocolStatus ?? MxAccessGrpcMapper.Ok(),
DiagnosticMessage = mxReply.DiagnosticMessage ?? string.Empty,
};
if (mxReply.HasHresult)
{
reply.Hresult = mxReply.Hresult;
}
return reply;
}
/// <inheritdoc />
public async IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
QueryActiveAlarmsRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
if (request is null) throw new System.ArgumentNullException(nameof(request));
if (!sessionRegistry.TryGet(request.SessionId, out GatewaySession session))
{
yield break;
}
WorkerCommand workerCommand = new WorkerCommand
{
Command = new MxCommand
{
Kind = MxCommandKind.QueryActiveAlarms,
QueryActiveAlarmsCommand = new QueryActiveAlarmsCommand
{
AlarmFilterPrefix = request.AlarmFilterPrefix ?? string.Empty,
},
},
EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()),
};
WorkerCommandReply workerReply = await session.InvokeAsync(workerCommand, cancellationToken)
.ConfigureAwait(false);
MxCommandReply? mxReply = workerReply.Reply;
if (mxReply?.ProtocolStatus?.Code != ProtocolStatusCode.Ok) yield break;
QueryActiveAlarmsReplyPayload? payload = mxReply.QueryActiveAlarms;
if (payload is null) yield break;
foreach (ActiveAlarmSnapshot snapshot in payload.Snapshots)
{
cancellationToken.ThrowIfCancellationRequested();
yield return snapshot;
}
}
}