gateway: alarm-RPC dispatcher seam (PRs A.6 + A.7) #117
@@ -20,8 +20,10 @@ public sealed class MxAccessGatewayService(
|
||||
MxAccessGrpcMapper mapper,
|
||||
IEventStreamService eventStreamService,
|
||||
GatewayMetrics metrics,
|
||||
ILogger<MxAccessGatewayService> logger) : MxAccessGateway.MxAccessGatewayBase
|
||||
ILogger<MxAccessGatewayService> logger,
|
||||
IAlarmRpcDispatcher? alarmRpcDispatcher = null) : MxAccessGateway.MxAccessGatewayBase
|
||||
{
|
||||
private readonly IAlarmRpcDispatcher alarmRpcDispatcher = alarmRpcDispatcher ?? new NotWiredAlarmRpcDispatcher();
|
||||
/// <inheritdoc />
|
||||
public override async Task<OpenSessionReply> OpenSession(
|
||||
OpenSessionRequest request,
|
||||
@@ -167,7 +169,7 @@ public sealed class MxAccessGatewayService(
|
||||
/// UNIMPLEMENTED, so the .NET / Python / Go / Java / Rust SDK call sites land
|
||||
/// on a stable surface.
|
||||
/// </remarks>
|
||||
public override Task<AcknowledgeAlarmReply> AcknowledgeAlarm(
|
||||
public override async Task<AcknowledgeAlarmReply> AcknowledgeAlarm(
|
||||
AcknowledgeAlarmRequest request,
|
||||
ServerCallContext context)
|
||||
{
|
||||
@@ -187,13 +189,13 @@ public sealed class MxAccessGatewayService(
|
||||
// gRPC NotFound by the caller's MapException.
|
||||
_ = ResolveSession(request.SessionId);
|
||||
|
||||
return Task.FromResult(new AcknowledgeAlarmReply
|
||||
{
|
||||
SessionId = request.SessionId,
|
||||
CorrelationId = request.ClientCorrelationId,
|
||||
ProtocolStatus = MxAccessGrpcMapper.Ok("AcknowledgeAlarm accepted; worker dispatch pending PR A.2."),
|
||||
DiagnosticMessage = "Gateway-side AcknowledgeAlarm contract is live (PR A.3); worker-side MxAccess Acknowledge call ships in PR A.2.",
|
||||
});
|
||||
// PR A.6 — delegate to the alarm dispatcher. NotWiredAlarmRpcDispatcher
|
||||
// (default) returns OK + a worker-pending diagnostic. Production
|
||||
// WorkerAlarmRpcDispatcher (dev-rig follow-up) routes through the
|
||||
// worker IPC to AlarmClient.AlarmAckByGUID with full operator-identity
|
||||
// fidelity.
|
||||
return await alarmRpcDispatcher.AcknowledgeAsync(request, context.CancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception exception) when (exception is not RpcException)
|
||||
{
|
||||
@@ -210,7 +212,7 @@ public sealed class MxAccessGatewayService(
|
||||
/// handler will translate the request into a WorkerCommand and stream the
|
||||
/// resulting snapshots.
|
||||
/// </remarks>
|
||||
public override Task QueryActiveAlarms(
|
||||
public override async Task QueryActiveAlarms(
|
||||
QueryActiveAlarmsRequest request,
|
||||
IServerStreamWriter<ActiveAlarmSnapshot> responseStream,
|
||||
ServerCallContext context)
|
||||
@@ -224,9 +226,18 @@ public sealed class MxAccessGatewayService(
|
||||
}
|
||||
_ = ResolveSession(request.SessionId);
|
||||
|
||||
// Empty stream — PR A.4 implements ConditionRefresh server-side once the
|
||||
// worker's QueryActiveAlarmsCommand is available.
|
||||
return Task.CompletedTask;
|
||||
// PR A.7 — delegate to the alarm dispatcher. NotWiredAlarmRpcDispatcher
|
||||
// (default) yields an empty stream. Production WorkerAlarmRpcDispatcher
|
||||
// (dev-rig follow-up) walks the worker's IMxAccessAlarmConsumer
|
||||
// SnapshotActiveAlarms output and translates each AlarmRecord into an
|
||||
// ActiveAlarmSnapshot.
|
||||
await foreach (ActiveAlarmSnapshot snapshot in alarmRpcDispatcher
|
||||
.QueryActiveAlarmsAsync(request, context.CancellationToken)
|
||||
.WithCancellation(context.CancellationToken)
|
||||
.ConfigureAwait(false))
|
||||
{
|
||||
await responseStream.WriteAsync(snapshot).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
catch (Exception exception) when (exception is not RpcException)
|
||||
{
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Server.Sessions;
|
||||
|
||||
/// <summary>
|
||||
/// PR A.6 / A.7 — gateway-side dispatcher for the alarm-RPC surface.
|
||||
/// Bridges the public <c>AcknowledgeAlarm</c> + <c>QueryActiveAlarms</c>
|
||||
/// gRPC handlers to the worker process that hosts
|
||||
/// <c>IMxAccessAlarmConsumer</c>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Production implementations live in <c>WorkerAlarmRpcDispatcher</c>
|
||||
/// (this PR ships a not-yet-wired default that returns a clear
|
||||
/// worker-pending diagnostic) and route through the existing
|
||||
/// worker-pipe IPC. Tests inject a fake to exercise the gateway
|
||||
/// handler shape without spinning up a worker process.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// The dispatcher is session-scoped: every call resolves the
|
||||
/// session and forwards to that session's worker. The handler
|
||||
/// constructs the <see cref="AcknowledgeAlarmReply"/> /
|
||||
/// <see cref="ActiveAlarmSnapshot"/> stream from the dispatcher's
|
||||
/// output without further translation.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public interface IAlarmRpcDispatcher
|
||||
{
|
||||
/// <summary>Forward an Acknowledge to the worker that owns the session.</summary>
|
||||
Task<AcknowledgeAlarmReply> AcknowledgeAsync(
|
||||
AcknowledgeAlarmRequest request,
|
||||
CancellationToken cancellationToken);
|
||||
|
||||
/// <summary>Walk active alarms on the worker that owns the session.</summary>
|
||||
IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
|
||||
QueryActiveAlarmsRequest request,
|
||||
CancellationToken cancellationToken);
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Grpc;
|
||||
|
||||
namespace MxGateway.Server.Sessions;
|
||||
|
||||
/// <summary>
|
||||
/// PR A.6 / A.7 — default <see cref="IAlarmRpcDispatcher"/> shipped while
|
||||
/// the worker-side AlarmClient event subscription is gated on dev-rig
|
||||
/// validation. Acknowledges with a structured "worker-pending"
|
||||
/// diagnostic and yields an empty active-alarm stream.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Replaces the inline diagnostic strings in
|
||||
/// <c>MxAccessGatewayService.AcknowledgeAlarm</c> /
|
||||
/// <c>QueryActiveAlarms</c> from PR A.3 with an injectable seam.
|
||||
/// When the worker dispatcher (PR A.6/A.7 dev-rig follow-up) lands,
|
||||
/// <c>WorkerAlarmRpcDispatcher</c> replaces this implementation in
|
||||
/// the DI container and the same handler shape comes alive without
|
||||
/// further changes to the public RPC surface.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class NotWiredAlarmRpcDispatcher : IAlarmRpcDispatcher
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public Task<AcknowledgeAlarmReply> AcknowledgeAsync(
|
||||
AcknowledgeAlarmRequest request,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.FromResult(new AcknowledgeAlarmReply
|
||||
{
|
||||
SessionId = request.SessionId,
|
||||
CorrelationId = request.ClientCorrelationId,
|
||||
ProtocolStatus = MxAccessGrpcMapper.Ok("AcknowledgeAlarm accepted; worker dispatch pending dev-rig wiring."),
|
||||
DiagnosticMessage = "Gateway-side AcknowledgeAlarm accepted; the worker-side AlarmClient consumer (PR A.5) is in place but the dispatcher hookup is gated on validating the AVEVA alarm-provider event subscription on the dev rig.",
|
||||
});
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
#pragma warning disable CS1998 // Async method lacks 'await' operators — empty stream is intentional.
|
||||
public async IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
|
||||
QueryActiveAlarmsRequest request,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
yield break;
|
||||
}
|
||||
#pragma warning restore CS1998
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Sessions;
|
||||
|
||||
namespace MxGateway.Tests.Gateway.Sessions;
|
||||
|
||||
/// <summary>
|
||||
/// PR A.6 / A.7 — pins the not-yet-wired dispatcher's behaviour:
|
||||
/// AcknowledgeAsync returns OK with a worker-pending diagnostic and
|
||||
/// QueryActiveAlarmsAsync yields an empty stream. Production
|
||||
/// <c>WorkerAlarmRpcDispatcher</c> (dev-rig follow-up) replaces this
|
||||
/// impl in DI without changing the gateway handler shape.
|
||||
/// </summary>
|
||||
public sealed class NotWiredAlarmRpcDispatcherTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_returns_ok_with_worker_pending_diagnostic()
|
||||
{
|
||||
IAlarmRpcDispatcher dispatcher = new NotWiredAlarmRpcDispatcher();
|
||||
|
||||
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "session-1",
|
||||
ClientCorrelationId = "corr-1",
|
||||
AlarmFullReference = "Tank01.Level.HiHi",
|
||||
Comment = "investigating",
|
||||
OperatorUser = "alice",
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||
Assert.Equal("session-1", reply.SessionId);
|
||||
Assert.Equal("corr-1", reply.CorrelationId);
|
||||
Assert.Contains("worker", reply.DiagnosticMessage, System.StringComparison.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task QueryActiveAlarmsAsync_yields_no_snapshots()
|
||||
{
|
||||
IAlarmRpcDispatcher dispatcher = new NotWiredAlarmRpcDispatcher();
|
||||
|
||||
int count = 0;
|
||||
await foreach (ActiveAlarmSnapshot _ in dispatcher.QueryActiveAlarmsAsync(
|
||||
new QueryActiveAlarmsRequest { SessionId = "session-1" },
|
||||
CancellationToken.None))
|
||||
{
|
||||
count++;
|
||||
}
|
||||
|
||||
Assert.Equal(0, count);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user