diff --git a/src/MxGateway.Server/Grpc/MxAccessGatewayService.cs b/src/MxGateway.Server/Grpc/MxAccessGatewayService.cs index 581a19c..87b4ad4 100644 --- a/src/MxGateway.Server/Grpc/MxAccessGatewayService.cs +++ b/src/MxGateway.Server/Grpc/MxAccessGatewayService.cs @@ -20,8 +20,10 @@ public sealed class MxAccessGatewayService( MxAccessGrpcMapper mapper, IEventStreamService eventStreamService, GatewayMetrics metrics, - ILogger logger) : MxAccessGateway.MxAccessGatewayBase + ILogger logger, + IAlarmRpcDispatcher? alarmRpcDispatcher = null) : MxAccessGateway.MxAccessGatewayBase { + private readonly IAlarmRpcDispatcher alarmRpcDispatcher = alarmRpcDispatcher ?? new NotWiredAlarmRpcDispatcher(); /// public override async Task OpenSession( OpenSessionRequest request, @@ -167,7 +169,7 @@ public sealed class MxAccessGatewayService( /// UNIMPLEMENTED, so the .NET / Python / Go / Java / Rust SDK call sites land /// on a stable surface. /// - public override Task AcknowledgeAlarm( + public override async Task AcknowledgeAlarm( AcknowledgeAlarmRequest request, ServerCallContext context) { @@ -187,13 +189,13 @@ public sealed class MxAccessGatewayService( // gRPC NotFound by the caller's MapException. _ = ResolveSession(request.SessionId); - return Task.FromResult(new AcknowledgeAlarmReply - { - SessionId = request.SessionId, - CorrelationId = request.ClientCorrelationId, - ProtocolStatus = MxAccessGrpcMapper.Ok("AcknowledgeAlarm accepted; worker dispatch pending PR A.2."), - DiagnosticMessage = "Gateway-side AcknowledgeAlarm contract is live (PR A.3); worker-side MxAccess Acknowledge call ships in PR A.2.", - }); + // PR A.6 — delegate to the alarm dispatcher. NotWiredAlarmRpcDispatcher + // (default) returns OK + a worker-pending diagnostic. Production + // WorkerAlarmRpcDispatcher (dev-rig follow-up) routes through the + // worker IPC to AlarmClient.AlarmAckByGUID with full operator-identity + // fidelity. + return await alarmRpcDispatcher.AcknowledgeAsync(request, context.CancellationToken) + .ConfigureAwait(false); } catch (Exception exception) when (exception is not RpcException) { @@ -210,7 +212,7 @@ public sealed class MxAccessGatewayService( /// handler will translate the request into a WorkerCommand and stream the /// resulting snapshots. /// - public override Task QueryActiveAlarms( + public override async Task QueryActiveAlarms( QueryActiveAlarmsRequest request, IServerStreamWriter responseStream, ServerCallContext context) @@ -224,9 +226,18 @@ public sealed class MxAccessGatewayService( } _ = ResolveSession(request.SessionId); - // Empty stream — PR A.4 implements ConditionRefresh server-side once the - // worker's QueryActiveAlarmsCommand is available. - return Task.CompletedTask; + // PR A.7 — delegate to the alarm dispatcher. NotWiredAlarmRpcDispatcher + // (default) yields an empty stream. Production WorkerAlarmRpcDispatcher + // (dev-rig follow-up) walks the worker's IMxAccessAlarmConsumer + // SnapshotActiveAlarms output and translates each AlarmRecord into an + // ActiveAlarmSnapshot. + await foreach (ActiveAlarmSnapshot snapshot in alarmRpcDispatcher + .QueryActiveAlarmsAsync(request, context.CancellationToken) + .WithCancellation(context.CancellationToken) + .ConfigureAwait(false)) + { + await responseStream.WriteAsync(snapshot).ConfigureAwait(false); + } } catch (Exception exception) when (exception is not RpcException) { diff --git a/src/MxGateway.Server/Sessions/IAlarmRpcDispatcher.cs b/src/MxGateway.Server/Sessions/IAlarmRpcDispatcher.cs new file mode 100644 index 0000000..328b774 --- /dev/null +++ b/src/MxGateway.Server/Sessions/IAlarmRpcDispatcher.cs @@ -0,0 +1,41 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Server.Sessions; + +/// +/// PR A.6 / A.7 — gateway-side dispatcher for the alarm-RPC surface. +/// Bridges the public AcknowledgeAlarm + QueryActiveAlarms +/// gRPC handlers to the worker process that hosts +/// IMxAccessAlarmConsumer. +/// +/// +/// +/// Production implementations live in WorkerAlarmRpcDispatcher +/// (this PR ships a not-yet-wired default that returns a clear +/// worker-pending diagnostic) and route through the existing +/// worker-pipe IPC. Tests inject a fake to exercise the gateway +/// handler shape without spinning up a worker process. +/// +/// +/// The dispatcher is session-scoped: every call resolves the +/// session and forwards to that session's worker. The handler +/// constructs the / +/// stream from the dispatcher's +/// output without further translation. +/// +/// +public interface IAlarmRpcDispatcher +{ + /// Forward an Acknowledge to the worker that owns the session. + Task AcknowledgeAsync( + AcknowledgeAlarmRequest request, + CancellationToken cancellationToken); + + /// Walk active alarms on the worker that owns the session. + IAsyncEnumerable QueryActiveAlarmsAsync( + QueryActiveAlarmsRequest request, + CancellationToken cancellationToken); +} diff --git a/src/MxGateway.Server/Sessions/NotWiredAlarmRpcDispatcher.cs b/src/MxGateway.Server/Sessions/NotWiredAlarmRpcDispatcher.cs new file mode 100644 index 0000000..78389b4 --- /dev/null +++ b/src/MxGateway.Server/Sessions/NotWiredAlarmRpcDispatcher.cs @@ -0,0 +1,52 @@ +using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Grpc; + +namespace MxGateway.Server.Sessions; + +/// +/// PR A.6 / A.7 — default shipped while +/// the worker-side AlarmClient event subscription is gated on dev-rig +/// validation. Acknowledges with a structured "worker-pending" +/// diagnostic and yields an empty active-alarm stream. +/// +/// +/// +/// Replaces the inline diagnostic strings in +/// MxAccessGatewayService.AcknowledgeAlarm / +/// QueryActiveAlarms from PR A.3 with an injectable seam. +/// When the worker dispatcher (PR A.6/A.7 dev-rig follow-up) lands, +/// WorkerAlarmRpcDispatcher replaces this implementation in +/// the DI container and the same handler shape comes alive without +/// further changes to the public RPC surface. +/// +/// +public sealed class NotWiredAlarmRpcDispatcher : IAlarmRpcDispatcher +{ + /// + public Task AcknowledgeAsync( + AcknowledgeAlarmRequest request, + CancellationToken cancellationToken) + { + return Task.FromResult(new AcknowledgeAlarmReply + { + SessionId = request.SessionId, + CorrelationId = request.ClientCorrelationId, + ProtocolStatus = MxAccessGrpcMapper.Ok("AcknowledgeAlarm accepted; worker dispatch pending dev-rig wiring."), + DiagnosticMessage = "Gateway-side AcknowledgeAlarm accepted; the worker-side AlarmClient consumer (PR A.5) is in place but the dispatcher hookup is gated on validating the AVEVA alarm-provider event subscription on the dev rig.", + }); + } + + /// +#pragma warning disable CS1998 // Async method lacks 'await' operators — empty stream is intentional. + public async IAsyncEnumerable QueryActiveAlarmsAsync( + QueryActiveAlarmsRequest request, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + yield break; + } +#pragma warning restore CS1998 +} diff --git a/src/MxGateway.Tests/Gateway/Sessions/NotWiredAlarmRpcDispatcherTests.cs b/src/MxGateway.Tests/Gateway/Sessions/NotWiredAlarmRpcDispatcherTests.cs new file mode 100644 index 0000000..d416110 --- /dev/null +++ b/src/MxGateway.Tests/Gateway/Sessions/NotWiredAlarmRpcDispatcherTests.cs @@ -0,0 +1,54 @@ +using System.Threading; +using System.Threading.Tasks; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Sessions; + +namespace MxGateway.Tests.Gateway.Sessions; + +/// +/// PR A.6 / A.7 — pins the not-yet-wired dispatcher's behaviour: +/// AcknowledgeAsync returns OK with a worker-pending diagnostic and +/// QueryActiveAlarmsAsync yields an empty stream. Production +/// WorkerAlarmRpcDispatcher (dev-rig follow-up) replaces this +/// impl in DI without changing the gateway handler shape. +/// +public sealed class NotWiredAlarmRpcDispatcherTests +{ + [Fact] + public async Task AcknowledgeAsync_returns_ok_with_worker_pending_diagnostic() + { + IAlarmRpcDispatcher dispatcher = new NotWiredAlarmRpcDispatcher(); + + AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync( + new AcknowledgeAlarmRequest + { + SessionId = "session-1", + ClientCorrelationId = "corr-1", + AlarmFullReference = "Tank01.Level.HiHi", + Comment = "investigating", + OperatorUser = "alice", + }, + CancellationToken.None); + + Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code); + Assert.Equal("session-1", reply.SessionId); + Assert.Equal("corr-1", reply.CorrelationId); + Assert.Contains("worker", reply.DiagnosticMessage, System.StringComparison.OrdinalIgnoreCase); + } + + [Fact] + public async Task QueryActiveAlarmsAsync_yields_no_snapshots() + { + IAlarmRpcDispatcher dispatcher = new NotWiredAlarmRpcDispatcher(); + + int count = 0; + await foreach (ActiveAlarmSnapshot _ in dispatcher.QueryActiveAlarmsAsync( + new QueryActiveAlarmsRequest { SessionId = "session-1" }, + CancellationToken.None)) + { + count++; + } + + Assert.Equal(0, count); + } +}