gateway: alarm-RPC dispatcher seam (PRs A.6 + A.7)

Replaces the inline diagnostic strings in PR A.3's AcknowledgeAlarm
+ QueryActiveAlarms handlers with an IAlarmRpcDispatcher seam.

- IAlarmRpcDispatcher (new) — gateway-side abstraction over the
  worker-RPC path that fronts AlarmClient.AlarmAckByGUID and the
  active-alarm walk. AcknowledgeAsync returns the
  AcknowledgeAlarmReply directly; QueryActiveAlarmsAsync yields an
  IAsyncEnumerable<ActiveAlarmSnapshot>.
- NotWiredAlarmRpcDispatcher (new, default impl) — returns
  PROTOCOL_STATUS_OK with a structured worker-pending diagnostic
  on Acknowledge, yields an empty stream on QueryActiveAlarms.
  Same observable shape as PR A.3, but the integration seam is
  now in code instead of hardcoded inside the handler.
- MxAccessGatewayService — handlers delegate to the dispatcher.
  Constructor accepts an optional IAlarmRpcDispatcher (default
  NotWiredAlarmRpcDispatcher); a future WorkerAlarmRpcDispatcher
  registration in DI swaps in the live worker-IPC routing without
  changing the public RPC surface.
- 2 new dispatcher tests pin the not-wired contract; 279 → 281
  total tests, all green.

Worker-side dispatch (translating Acknowledge / QueryActiveAlarms
to the IPC method that calls IMxAccessAlarmConsumer from PR A.5)
is the dev-rig follow-up — it depends on validating the AVEVA
GetAlarmChangesCompleted event subscription against a live alarm
provider before pinning a wire format.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-30 22:47:42 -04:00
parent c7d5b83390
commit 6b3c117d1e
4 changed files with 171 additions and 13 deletions
@@ -20,8 +20,10 @@ public sealed class MxAccessGatewayService(
MxAccessGrpcMapper mapper,
IEventStreamService eventStreamService,
GatewayMetrics metrics,
ILogger<MxAccessGatewayService> logger) : MxAccessGateway.MxAccessGatewayBase
ILogger<MxAccessGatewayService> logger,
IAlarmRpcDispatcher? alarmRpcDispatcher = null) : MxAccessGateway.MxAccessGatewayBase
{
private readonly IAlarmRpcDispatcher alarmRpcDispatcher = alarmRpcDispatcher ?? new NotWiredAlarmRpcDispatcher();
/// <inheritdoc />
public override async Task<OpenSessionReply> OpenSession(
OpenSessionRequest request,
@@ -167,7 +169,7 @@ public sealed class MxAccessGatewayService(
/// UNIMPLEMENTED, so the .NET / Python / Go / Java / Rust SDK call sites land
/// on a stable surface.
/// </remarks>
public override Task<AcknowledgeAlarmReply> AcknowledgeAlarm(
public override async Task<AcknowledgeAlarmReply> AcknowledgeAlarm(
AcknowledgeAlarmRequest request,
ServerCallContext context)
{
@@ -187,13 +189,13 @@ public sealed class MxAccessGatewayService(
// gRPC NotFound by the caller's MapException.
_ = ResolveSession(request.SessionId);
return Task.FromResult(new AcknowledgeAlarmReply
{
SessionId = request.SessionId,
CorrelationId = request.ClientCorrelationId,
ProtocolStatus = MxAccessGrpcMapper.Ok("AcknowledgeAlarm accepted; worker dispatch pending PR A.2."),
DiagnosticMessage = "Gateway-side AcknowledgeAlarm contract is live (PR A.3); worker-side MxAccess Acknowledge call ships in PR A.2.",
});
// PR A.6 — delegate to the alarm dispatcher. NotWiredAlarmRpcDispatcher
// (default) returns OK + a worker-pending diagnostic. Production
// WorkerAlarmRpcDispatcher (dev-rig follow-up) routes through the
// worker IPC to AlarmClient.AlarmAckByGUID with full operator-identity
// fidelity.
return await alarmRpcDispatcher.AcknowledgeAsync(request, context.CancellationToken)
.ConfigureAwait(false);
}
catch (Exception exception) when (exception is not RpcException)
{
@@ -210,7 +212,7 @@ public sealed class MxAccessGatewayService(
/// handler will translate the request into a WorkerCommand and stream the
/// resulting snapshots.
/// </remarks>
public override Task QueryActiveAlarms(
public override async Task QueryActiveAlarms(
QueryActiveAlarmsRequest request,
IServerStreamWriter<ActiveAlarmSnapshot> responseStream,
ServerCallContext context)
@@ -224,9 +226,18 @@ public sealed class MxAccessGatewayService(
}
_ = ResolveSession(request.SessionId);
// Empty stream — PR A.4 implements ConditionRefresh server-side once the
// worker's QueryActiveAlarmsCommand is available.
return Task.CompletedTask;
// PR A.7 — delegate to the alarm dispatcher. NotWiredAlarmRpcDispatcher
// (default) yields an empty stream. Production WorkerAlarmRpcDispatcher
// (dev-rig follow-up) walks the worker's IMxAccessAlarmConsumer
// SnapshotActiveAlarms output and translates each AlarmRecord into an
// ActiveAlarmSnapshot.
await foreach (ActiveAlarmSnapshot snapshot in alarmRpcDispatcher
.QueryActiveAlarmsAsync(request, context.CancellationToken)
.WithCancellation(context.CancellationToken)
.ConfigureAwait(false))
{
await responseStream.WriteAsync(snapshot).ConfigureAwait(false);
}
}
catch (Exception exception) when (exception is not RpcException)
{
@@ -0,0 +1,41 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
/// <summary>
/// PR A.6 / A.7 — gateway-side dispatcher for the alarm-RPC surface.
/// Bridges the public <c>AcknowledgeAlarm</c> + <c>QueryActiveAlarms</c>
/// gRPC handlers to the worker process that hosts
/// <c>IMxAccessAlarmConsumer</c>.
/// </summary>
/// <remarks>
/// <para>
/// Production implementations live in <c>WorkerAlarmRpcDispatcher</c>
/// (this PR ships a not-yet-wired default that returns a clear
/// worker-pending diagnostic) and route through the existing
/// worker-pipe IPC. Tests inject a fake to exercise the gateway
/// handler shape without spinning up a worker process.
/// </para>
/// <para>
/// The dispatcher is session-scoped: every call resolves the
/// session and forwards to that session's worker. The handler
/// constructs the <see cref="AcknowledgeAlarmReply"/> /
/// <see cref="ActiveAlarmSnapshot"/> stream from the dispatcher's
/// output without further translation.
/// </para>
/// </remarks>
public interface IAlarmRpcDispatcher
{
/// <summary>Forward an Acknowledge to the worker that owns the session.</summary>
Task<AcknowledgeAlarmReply> AcknowledgeAsync(
AcknowledgeAlarmRequest request,
CancellationToken cancellationToken);
/// <summary>Walk active alarms on the worker that owns the session.</summary>
IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
QueryActiveAlarmsRequest request,
CancellationToken cancellationToken);
}
@@ -0,0 +1,52 @@
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Grpc;
namespace MxGateway.Server.Sessions;
/// <summary>
/// PR A.6 / A.7 — default <see cref="IAlarmRpcDispatcher"/> shipped while
/// the worker-side AlarmClient event subscription is gated on dev-rig
/// validation. Acknowledges with a structured "worker-pending"
/// diagnostic and yields an empty active-alarm stream.
/// </summary>
/// <remarks>
/// <para>
/// Replaces the inline diagnostic strings in
/// <c>MxAccessGatewayService.AcknowledgeAlarm</c> /
/// <c>QueryActiveAlarms</c> from PR A.3 with an injectable seam.
/// When the worker dispatcher (PR A.6/A.7 dev-rig follow-up) lands,
/// <c>WorkerAlarmRpcDispatcher</c> replaces this implementation in
/// the DI container and the same handler shape comes alive without
/// further changes to the public RPC surface.
/// </para>
/// </remarks>
public sealed class NotWiredAlarmRpcDispatcher : IAlarmRpcDispatcher
{
/// <inheritdoc />
public Task<AcknowledgeAlarmReply> AcknowledgeAsync(
AcknowledgeAlarmRequest request,
CancellationToken cancellationToken)
{
return Task.FromResult(new AcknowledgeAlarmReply
{
SessionId = request.SessionId,
CorrelationId = request.ClientCorrelationId,
ProtocolStatus = MxAccessGrpcMapper.Ok("AcknowledgeAlarm accepted; worker dispatch pending dev-rig wiring."),
DiagnosticMessage = "Gateway-side AcknowledgeAlarm accepted; the worker-side AlarmClient consumer (PR A.5) is in place but the dispatcher hookup is gated on validating the AVEVA alarm-provider event subscription on the dev rig.",
});
}
/// <inheritdoc />
#pragma warning disable CS1998 // Async method lacks 'await' operators — empty stream is intentional.
public async IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
QueryActiveAlarmsRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
yield break;
}
#pragma warning restore CS1998
}
@@ -0,0 +1,54 @@
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Sessions;
namespace MxGateway.Tests.Gateway.Sessions;
/// <summary>
/// PR A.6 / A.7 — pins the not-yet-wired dispatcher's behaviour:
/// AcknowledgeAsync returns OK with a worker-pending diagnostic and
/// QueryActiveAlarmsAsync yields an empty stream. Production
/// <c>WorkerAlarmRpcDispatcher</c> (dev-rig follow-up) replaces this
/// impl in DI without changing the gateway handler shape.
/// </summary>
public sealed class NotWiredAlarmRpcDispatcherTests
{
[Fact]
public async Task AcknowledgeAsync_returns_ok_with_worker_pending_diagnostic()
{
IAlarmRpcDispatcher dispatcher = new NotWiredAlarmRpcDispatcher();
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
new AcknowledgeAlarmRequest
{
SessionId = "session-1",
ClientCorrelationId = "corr-1",
AlarmFullReference = "Tank01.Level.HiHi",
Comment = "investigating",
OperatorUser = "alice",
},
CancellationToken.None);
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
Assert.Equal("session-1", reply.SessionId);
Assert.Equal("corr-1", reply.CorrelationId);
Assert.Contains("worker", reply.DiagnosticMessage, System.StringComparison.OrdinalIgnoreCase);
}
[Fact]
public async Task QueryActiveAlarmsAsync_yields_no_snapshots()
{
IAlarmRpcDispatcher dispatcher = new NotWiredAlarmRpcDispatcher();
int count = 0;
await foreach (ActiveAlarmSnapshot _ in dispatcher.QueryActiveAlarmsAsync(
new QueryActiveAlarmsRequest { SessionId = "session-1" },
CancellationToken.None))
{
count++;
}
Assert.Equal(0, count);
}
}