Merge pull request 'gateway: alarm-RPC dispatcher seam (PRs A.6 + A.7)' (#117) from track-a6-a7-alarm-rpc-dispatch into main
This commit was merged in pull request #117.
This commit is contained in:
@@ -20,8 +20,10 @@ public sealed class MxAccessGatewayService(
|
|||||||
MxAccessGrpcMapper mapper,
|
MxAccessGrpcMapper mapper,
|
||||||
IEventStreamService eventStreamService,
|
IEventStreamService eventStreamService,
|
||||||
GatewayMetrics metrics,
|
GatewayMetrics metrics,
|
||||||
ILogger<MxAccessGatewayService> logger) : MxAccessGateway.MxAccessGatewayBase
|
ILogger<MxAccessGatewayService> logger,
|
||||||
|
IAlarmRpcDispatcher? alarmRpcDispatcher = null) : MxAccessGateway.MxAccessGatewayBase
|
||||||
{
|
{
|
||||||
|
private readonly IAlarmRpcDispatcher alarmRpcDispatcher = alarmRpcDispatcher ?? new NotWiredAlarmRpcDispatcher();
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override async Task<OpenSessionReply> OpenSession(
|
public override async Task<OpenSessionReply> OpenSession(
|
||||||
OpenSessionRequest request,
|
OpenSessionRequest request,
|
||||||
@@ -167,7 +169,7 @@ public sealed class MxAccessGatewayService(
|
|||||||
/// UNIMPLEMENTED, so the .NET / Python / Go / Java / Rust SDK call sites land
|
/// UNIMPLEMENTED, so the .NET / Python / Go / Java / Rust SDK call sites land
|
||||||
/// on a stable surface.
|
/// on a stable surface.
|
||||||
/// </remarks>
|
/// </remarks>
|
||||||
public override Task<AcknowledgeAlarmReply> AcknowledgeAlarm(
|
public override async Task<AcknowledgeAlarmReply> AcknowledgeAlarm(
|
||||||
AcknowledgeAlarmRequest request,
|
AcknowledgeAlarmRequest request,
|
||||||
ServerCallContext context)
|
ServerCallContext context)
|
||||||
{
|
{
|
||||||
@@ -187,13 +189,13 @@ public sealed class MxAccessGatewayService(
|
|||||||
// gRPC NotFound by the caller's MapException.
|
// gRPC NotFound by the caller's MapException.
|
||||||
_ = ResolveSession(request.SessionId);
|
_ = ResolveSession(request.SessionId);
|
||||||
|
|
||||||
return Task.FromResult(new AcknowledgeAlarmReply
|
// PR A.6 — delegate to the alarm dispatcher. NotWiredAlarmRpcDispatcher
|
||||||
{
|
// (default) returns OK + a worker-pending diagnostic. Production
|
||||||
SessionId = request.SessionId,
|
// WorkerAlarmRpcDispatcher (dev-rig follow-up) routes through the
|
||||||
CorrelationId = request.ClientCorrelationId,
|
// worker IPC to AlarmClient.AlarmAckByGUID with full operator-identity
|
||||||
ProtocolStatus = MxAccessGrpcMapper.Ok("AcknowledgeAlarm accepted; worker dispatch pending PR A.2."),
|
// fidelity.
|
||||||
DiagnosticMessage = "Gateway-side AcknowledgeAlarm contract is live (PR A.3); worker-side MxAccess Acknowledge call ships in PR A.2.",
|
return await alarmRpcDispatcher.AcknowledgeAsync(request, context.CancellationToken)
|
||||||
});
|
.ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
catch (Exception exception) when (exception is not RpcException)
|
catch (Exception exception) when (exception is not RpcException)
|
||||||
{
|
{
|
||||||
@@ -210,7 +212,7 @@ public sealed class MxAccessGatewayService(
|
|||||||
/// handler will translate the request into a WorkerCommand and stream the
|
/// handler will translate the request into a WorkerCommand and stream the
|
||||||
/// resulting snapshots.
|
/// resulting snapshots.
|
||||||
/// </remarks>
|
/// </remarks>
|
||||||
public override Task QueryActiveAlarms(
|
public override async Task QueryActiveAlarms(
|
||||||
QueryActiveAlarmsRequest request,
|
QueryActiveAlarmsRequest request,
|
||||||
IServerStreamWriter<ActiveAlarmSnapshot> responseStream,
|
IServerStreamWriter<ActiveAlarmSnapshot> responseStream,
|
||||||
ServerCallContext context)
|
ServerCallContext context)
|
||||||
@@ -224,9 +226,18 @@ public sealed class MxAccessGatewayService(
|
|||||||
}
|
}
|
||||||
_ = ResolveSession(request.SessionId);
|
_ = ResolveSession(request.SessionId);
|
||||||
|
|
||||||
// Empty stream — PR A.4 implements ConditionRefresh server-side once the
|
// PR A.7 — delegate to the alarm dispatcher. NotWiredAlarmRpcDispatcher
|
||||||
// worker's QueryActiveAlarmsCommand is available.
|
// (default) yields an empty stream. Production WorkerAlarmRpcDispatcher
|
||||||
return Task.CompletedTask;
|
// (dev-rig follow-up) walks the worker's IMxAccessAlarmConsumer
|
||||||
|
// SnapshotActiveAlarms output and translates each AlarmRecord into an
|
||||||
|
// ActiveAlarmSnapshot.
|
||||||
|
await foreach (ActiveAlarmSnapshot snapshot in alarmRpcDispatcher
|
||||||
|
.QueryActiveAlarmsAsync(request, context.CancellationToken)
|
||||||
|
.WithCancellation(context.CancellationToken)
|
||||||
|
.ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
await responseStream.WriteAsync(snapshot).ConfigureAwait(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (Exception exception) when (exception is not RpcException)
|
catch (Exception exception) when (exception is not RpcException)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -0,0 +1,41 @@
|
|||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// PR A.6 / A.7 — gateway-side dispatcher for the alarm-RPC surface.
|
||||||
|
/// Bridges the public <c>AcknowledgeAlarm</c> + <c>QueryActiveAlarms</c>
|
||||||
|
/// gRPC handlers to the worker process that hosts
|
||||||
|
/// <c>IMxAccessAlarmConsumer</c>.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>
|
||||||
|
/// Production implementations live in <c>WorkerAlarmRpcDispatcher</c>
|
||||||
|
/// (this PR ships a not-yet-wired default that returns a clear
|
||||||
|
/// worker-pending diagnostic) and route through the existing
|
||||||
|
/// worker-pipe IPC. Tests inject a fake to exercise the gateway
|
||||||
|
/// handler shape without spinning up a worker process.
|
||||||
|
/// </para>
|
||||||
|
/// <para>
|
||||||
|
/// The dispatcher is session-scoped: every call resolves the
|
||||||
|
/// session and forwards to that session's worker. The handler
|
||||||
|
/// constructs the <see cref="AcknowledgeAlarmReply"/> /
|
||||||
|
/// <see cref="ActiveAlarmSnapshot"/> stream from the dispatcher's
|
||||||
|
/// output without further translation.
|
||||||
|
/// </para>
|
||||||
|
/// </remarks>
|
||||||
|
public interface IAlarmRpcDispatcher
|
||||||
|
{
|
||||||
|
/// <summary>Forward an Acknowledge to the worker that owns the session.</summary>
|
||||||
|
Task<AcknowledgeAlarmReply> AcknowledgeAsync(
|
||||||
|
AcknowledgeAlarmRequest request,
|
||||||
|
CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
/// <summary>Walk active alarms on the worker that owns the session.</summary>
|
||||||
|
IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
|
||||||
|
QueryActiveAlarmsRequest request,
|
||||||
|
CancellationToken cancellationToken);
|
||||||
|
}
|
||||||
@@ -0,0 +1,52 @@
|
|||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Runtime.CompilerServices;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Grpc;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// PR A.6 / A.7 — default <see cref="IAlarmRpcDispatcher"/> shipped while
|
||||||
|
/// the worker-side AlarmClient event subscription is gated on dev-rig
|
||||||
|
/// validation. Acknowledges with a structured "worker-pending"
|
||||||
|
/// diagnostic and yields an empty active-alarm stream.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>
|
||||||
|
/// Replaces the inline diagnostic strings in
|
||||||
|
/// <c>MxAccessGatewayService.AcknowledgeAlarm</c> /
|
||||||
|
/// <c>QueryActiveAlarms</c> from PR A.3 with an injectable seam.
|
||||||
|
/// When the worker dispatcher (PR A.6/A.7 dev-rig follow-up) lands,
|
||||||
|
/// <c>WorkerAlarmRpcDispatcher</c> replaces this implementation in
|
||||||
|
/// the DI container and the same handler shape comes alive without
|
||||||
|
/// further changes to the public RPC surface.
|
||||||
|
/// </para>
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class NotWiredAlarmRpcDispatcher : IAlarmRpcDispatcher
|
||||||
|
{
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<AcknowledgeAlarmReply> AcknowledgeAsync(
|
||||||
|
AcknowledgeAlarmRequest request,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return Task.FromResult(new AcknowledgeAlarmReply
|
||||||
|
{
|
||||||
|
SessionId = request.SessionId,
|
||||||
|
CorrelationId = request.ClientCorrelationId,
|
||||||
|
ProtocolStatus = MxAccessGrpcMapper.Ok("AcknowledgeAlarm accepted; worker dispatch pending dev-rig wiring."),
|
||||||
|
DiagnosticMessage = "Gateway-side AcknowledgeAlarm accepted; the worker-side AlarmClient consumer (PR A.5) is in place but the dispatcher hookup is gated on validating the AVEVA alarm-provider event subscription on the dev rig.",
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
#pragma warning disable CS1998 // Async method lacks 'await' operators — empty stream is intentional.
|
||||||
|
public async IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
|
||||||
|
QueryActiveAlarmsRequest request,
|
||||||
|
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
yield break;
|
||||||
|
}
|
||||||
|
#pragma warning restore CS1998
|
||||||
|
}
|
||||||
@@ -0,0 +1,54 @@
|
|||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
namespace MxGateway.Tests.Gateway.Sessions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// PR A.6 / A.7 — pins the not-yet-wired dispatcher's behaviour:
|
||||||
|
/// AcknowledgeAsync returns OK with a worker-pending diagnostic and
|
||||||
|
/// QueryActiveAlarmsAsync yields an empty stream. Production
|
||||||
|
/// <c>WorkerAlarmRpcDispatcher</c> (dev-rig follow-up) replaces this
|
||||||
|
/// impl in DI without changing the gateway handler shape.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class NotWiredAlarmRpcDispatcherTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task AcknowledgeAsync_returns_ok_with_worker_pending_diagnostic()
|
||||||
|
{
|
||||||
|
IAlarmRpcDispatcher dispatcher = new NotWiredAlarmRpcDispatcher();
|
||||||
|
|
||||||
|
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||||
|
new AcknowledgeAlarmRequest
|
||||||
|
{
|
||||||
|
SessionId = "session-1",
|
||||||
|
ClientCorrelationId = "corr-1",
|
||||||
|
AlarmFullReference = "Tank01.Level.HiHi",
|
||||||
|
Comment = "investigating",
|
||||||
|
OperatorUser = "alice",
|
||||||
|
},
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||||
|
Assert.Equal("session-1", reply.SessionId);
|
||||||
|
Assert.Equal("corr-1", reply.CorrelationId);
|
||||||
|
Assert.Contains("worker", reply.DiagnosticMessage, System.StringComparison.OrdinalIgnoreCase);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task QueryActiveAlarmsAsync_yields_no_snapshots()
|
||||||
|
{
|
||||||
|
IAlarmRpcDispatcher dispatcher = new NotWiredAlarmRpcDispatcher();
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
await foreach (ActiveAlarmSnapshot _ in dispatcher.QueryActiveAlarmsAsync(
|
||||||
|
new QueryActiveAlarmsRequest { SessionId = "session-1" },
|
||||||
|
CancellationToken.None))
|
||||||
|
{
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.Equal(0, count);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user