A.3 (gateway dispatcher): WorkerAlarmRpcDispatcher routes alarm RPCs over the worker pipe

Replaces NotWiredAlarmRpcDispatcher in DI with a production
implementation that issues the new MxCommandKind.{AcknowledgeAlarm,
QueryActiveAlarms} commands across the IPC and unwraps the resulting
MxCommandReply into the public RPC types.

QueryActiveAlarms is fully wired: builds the QueryActiveAlarmsCommand
(forwarding alarm_filter_prefix), invokes it on the resolved
GatewaySession's worker client, and yields each ActiveAlarmSnapshot
from the QueryActiveAlarmsReplyPayload as the RPC stream. Worker
failures and missing sessions yield an empty stream, which matches the
ConditionRefresh contract clients already rely on.
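
A minimal sketch of the empty-stream-on-failure contract described above.
QueryReply and QuerySketch are hypothetical stand-ins for MxCommandReply
and the dispatcher, and snapshots are reduced to strings; only the control
flow is meant to mirror the real QueryActiveAlarmsAsync.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical reply shape standing in for MxCommandReply + payload.
public sealed record QueryReply(bool Ok, IReadOnlyList<string>? Snapshots);

public static class QuerySketch
{
    // Yields every snapshot on success; yields nothing (rather than
    // throwing) when the reply is missing, failed, or has no payload.
    public static async IAsyncEnumerable<string> StreamAsync(
        Func<Task<QueryReply?>> invoke)
    {
        QueryReply? reply = await invoke().ConfigureAwait(false);
        if (reply is null || !reply.Ok || reply.Snapshots is null)
        {
            yield break;
        }

        foreach (string snapshot in reply.Snapshots)
        {
            yield return snapshot;
        }
    }

    // Helper for the demo: drains a stream into a list.
    public static async Task<List<string>> CollectAsync(IAsyncEnumerable<string> source)
    {
        List<string> collected = new List<string>();
        await foreach (string item in source)
        {
            collected.Add(item);
        }
        return collected;
    }
}

public static class Program
{
    public static async Task Main()
    {
        List<string> failed = await QuerySketch.CollectAsync(
            QuerySketch.StreamAsync(() => Task.FromResult<QueryReply?>(new QueryReply(false, null))));
        List<string> ok = await QuerySketch.CollectAsync(
            QuerySketch.StreamAsync(() => Task.FromResult<QueryReply?>(
                new QueryReply(true, new[] { "Galaxy!A.T1", "Galaxy!A.T2" }))));

        Console.WriteLine(failed.Count); // 0
        Console.WriteLine(ok.Count);     // 2
    }
}
```

One consequence of this contract is that a caller cannot tell "no active
alarms" apart from "worker failed" by looking at the stream alone.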

AcknowledgeAlarm is partially wired: the public RPC takes
AlarmFullReference (Provider!Group.Tag), but the worker's wnwrap
consumer acks by GUID. Strategy:
- If AlarmFullReference parses as a canonical GUID, forward it
  directly through MxCommandKind.AcknowledgeAlarm. Native status
  flows back via MxCommandReply.Hresult and the dedicated
  AcknowledgeAlarmReplyPayload.NativeStatus.
- Otherwise, return InvalidRequest with a diagnostic naming the
  follow-up: reference→GUID lookup needs a worker-side AlarmAckByName
  command wrapping wwAlarmConsumerClass.AlarmAckByName.
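
The branching above can be sketched in isolation as follows. AckRoute and
AckRouting are hypothetical names used only for illustration; the real
dispatcher builds AcknowledgeAlarmReply / WorkerCommand messages instead of
returning a tuple.

```csharp
#nullable enable
using System;

// Hypothetical outcome of the routing decision.
public enum AckRoute { ForwardByGuid, RejectInvalidRequest }

public static class AckRouting
{
    // Forward when the reference parses as a GUID, otherwise reject.
    // The forwarded value is guid.ToString(), i.e. the lowercase,
    // hyphenated "D" format, regardless of how the caller formatted it.
    public static (AckRoute Route, string? AlarmGuid) Classify(string? alarmFullReference)
    {
        if (Guid.TryParse(alarmFullReference, out Guid guid))
        {
            return (AckRoute.ForwardByGuid, guid.ToString());
        }

        return (AckRoute.RejectInvalidRequest, null);
    }
}

public static class Program
{
    public static void Main()
    {
        // A Provider!Group.Tag reference is rejected before any IPC.
        Console.WriteLine(AckRouting.Classify("Galaxy!Area.Tag").Route);

        // A GUID is forwarded in normalised form.
        var forwarded = AckRouting.Classify("3F2504E0-4F89-11D3-9A0C-0305E82C3301");
        Console.WriteLine(forwarded.Route);
        Console.WriteLine(forwarded.AlarmGuid);
    }
}
```

Note that Guid.TryParse also accepts the braced, parenthesised, and
unhyphenated forms, so "canonical" here is looser than the hyphenated
format alone.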

DI: SessionServiceCollectionExtensions registers WorkerAlarmRpcDispatcher
as the default IAlarmRpcDispatcher; MxAccessGatewayService picks it up
via constructor injection. NotWiredAlarmRpcDispatcher is retained for
test fixtures that want the no-side-effect fake.

Tests: 7 new unit tests cover session-not-found short-circuit, GUID-vs-
reference branching, native-status propagation, worker MxaccessFailure
diagnostic propagation, and snapshot-stream yielding. Server test
suite: 288 total, 0 failed. Solution builds clean.

End-to-end alarms-over-gateway pipeline status:
  consumer → sink → queue (A.2 + A.3 in-process slice)
  worker IPC commands (A.3 worker slice)
  gateway dispatcher (this slice)

Remaining for full E2E:
  - Auto-issue SubscribeAlarms on session open (or add a public
    SubscribeAlarms RPC). Without this trigger the consumer never
    starts and Acknowledge/Query return "not subscribed".
  - AlarmAckByName worker command for ack-by-reference.
  - End-to-end live test against the dev rig.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Joseph Doherty
2026-05-01 10:58:40 -04:00
parent 01f5e6ad91
commit 9b21ca3554
3 changed files with 473 additions and 0 deletions
@@ -11,6 +11,7 @@ public static class SessionServiceCollectionExtensions
services.AddSingleton<ISessionRegistry, SessionRegistry>();
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
services.AddSingleton<ISessionManager, SessionManager>();
services.AddSingleton<IAlarmRpcDispatcher, WorkerAlarmRpcDispatcher>();
services.AddHostedService<SessionLeaseMonitorHostedService>();
services.AddHostedService<SessionShutdownHostedService>();
@@ -0,0 +1,172 @@
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Grpc;
namespace MxGateway.Server.Sessions;
/// <summary>
/// Production <see cref="IAlarmRpcDispatcher"/> that routes the public
/// <c>AcknowledgeAlarm</c> + <c>QueryActiveAlarms</c> RPCs through the
/// worker pipe IPC. Replaces <see cref="NotWiredAlarmRpcDispatcher"/>
/// once the worker AlarmCommandHandler is wired in.
/// </summary>
/// <remarks>
/// <para>
/// <c>QueryActiveAlarms</c> is fully wired: issues a
/// <see cref="QueryActiveAlarmsCommand"/> over the pipe and yields
/// each <see cref="ActiveAlarmSnapshot"/> from the
/// <see cref="QueryActiveAlarmsReplyPayload"/>.
/// </para>
/// <para>
/// <c>AcknowledgeAlarm</c> is partially wired: the public RPC's
/// <see cref="AcknowledgeAlarmRequest.AlarmFullReference"/> is a
/// <c>Provider!Group.Tag</c> string, but the worker's wnwrap consumer
/// acks by GUID. When the supplied reference parses as a GUID
/// directly, the dispatcher forwards it as-is. Otherwise it
/// returns an <c>InvalidRequest</c> status with a diagnostic. Resolving
/// reference→GUID requires an additional worker IPC command
/// (e.g. <c>AlarmAckByName</c> wrapping
/// <c>wwAlarmConsumerClass.AlarmAckByName</c>) and is tracked as
/// a follow-up.
/// </para>
/// </remarks>
public sealed class WorkerAlarmRpcDispatcher : IAlarmRpcDispatcher
{
private readonly ISessionRegistry sessionRegistry;
private readonly TimeProvider timeProvider;
public WorkerAlarmRpcDispatcher(ISessionRegistry sessionRegistry, TimeProvider? timeProvider = null)
{
this.sessionRegistry = sessionRegistry ?? throw new System.ArgumentNullException(nameof(sessionRegistry));
this.timeProvider = timeProvider ?? TimeProvider.System;
}
/// <inheritdoc />
public async Task<AcknowledgeAlarmReply> AcknowledgeAsync(
AcknowledgeAlarmRequest request,
CancellationToken cancellationToken)
{
if (request is null) throw new System.ArgumentNullException(nameof(request));
if (!sessionRegistry.TryGet(request.SessionId, out GatewaySession session))
{
return new AcknowledgeAlarmReply
{
SessionId = request.SessionId,
CorrelationId = request.ClientCorrelationId,
ProtocolStatus = MxAccessGrpcMapper.SessionNotFound(
$"Session '{request.SessionId}' not found."),
DiagnosticMessage = "AcknowledgeAlarm: session not found.",
};
}
if (!System.Guid.TryParse(request.AlarmFullReference, out System.Guid guid))
{
// Reference→GUID lookup not yet implemented. Surface a clear
// diagnostic so client teams can plumb the reference parser
// when the worker AlarmAckByName command lands.
return new AcknowledgeAlarmReply
{
SessionId = request.SessionId,
CorrelationId = request.ClientCorrelationId,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.InvalidRequest,
Message = "AlarmFullReference must currently be a canonical GUID; reference→GUID lookup is pending the AlarmAckByName worker command.",
},
DiagnosticMessage = $"AcknowledgeAlarm received non-GUID reference '{request.AlarmFullReference}'.",
};
}
WorkerCommand workerCommand = new WorkerCommand
{
Command = new MxCommand
{
Kind = MxCommandKind.AcknowledgeAlarm,
AcknowledgeAlarmCommand = new AcknowledgeAlarmCommand
{
AlarmGuid = guid.ToString(),
Comment = request.Comment ?? string.Empty,
OperatorUser = request.OperatorUser ?? string.Empty,
// Operator node/domain/full-name are not on the public
// RPC surface today; pass empty strings so the worker
// honours the existing AcknowledgeAlarmCommand schema.
OperatorNode = string.Empty,
OperatorDomain = string.Empty,
OperatorFullName = string.Empty,
},
},
EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()),
};
WorkerCommandReply workerReply = await session.InvokeAsync(workerCommand, cancellationToken)
.ConfigureAwait(false);
MxCommandReply mxReply = workerReply.Reply ?? new MxCommandReply
{
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.ProtocolViolation,
Message = "Worker reply did not include an MxCommandReply.",
},
};
AcknowledgeAlarmReply reply = new AcknowledgeAlarmReply
{
SessionId = request.SessionId,
CorrelationId = request.ClientCorrelationId,
ProtocolStatus = mxReply.ProtocolStatus ?? MxAccessGrpcMapper.Ok(),
DiagnosticMessage = mxReply.DiagnosticMessage ?? string.Empty,
};
if (mxReply.HasHresult)
{
reply.Hresult = mxReply.Hresult;
}
return reply;
}
/// <inheritdoc />
public async IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
QueryActiveAlarmsRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
if (request is null) throw new System.ArgumentNullException(nameof(request));
if (!sessionRegistry.TryGet(request.SessionId, out GatewaySession session))
{
yield break;
}
WorkerCommand workerCommand = new WorkerCommand
{
Command = new MxCommand
{
Kind = MxCommandKind.QueryActiveAlarms,
QueryActiveAlarmsCommand = new QueryActiveAlarmsCommand
{
AlarmFilterPrefix = request.AlarmFilterPrefix ?? string.Empty,
},
},
EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()),
};
WorkerCommandReply workerReply = await session.InvokeAsync(workerCommand, cancellationToken)
.ConfigureAwait(false);
MxCommandReply? mxReply = workerReply.Reply;
if (mxReply?.ProtocolStatus?.Code != ProtocolStatusCode.Ok) yield break;
QueryActiveAlarmsReplyPayload? payload = mxReply.QueryActiveAlarms;
if (payload is null) yield break;
foreach (ActiveAlarmSnapshot snapshot in payload.Snapshots)
{
cancellationToken.ThrowIfCancellationRequested();
yield return snapshot;
}
}
}
@@ -0,0 +1,300 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Sessions;
/// <summary>
/// Pins the production <see cref="WorkerAlarmRpcDispatcher"/>'s behaviour:
/// resolves the session by id, issues the matching MxCommand over the
/// worker pipe, and unwraps the reply into AcknowledgeAlarmReply or the
/// ActiveAlarmSnapshot stream.
/// </summary>
public sealed class WorkerAlarmRpcDispatcherTests
{
[Fact]
public async Task AcknowledgeAsync_returns_session_not_found_when_session_missing()
{
SessionRegistry registry = new SessionRegistry();
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
new AcknowledgeAlarmRequest
{
SessionId = "missing",
ClientCorrelationId = "c1",
AlarmFullReference = Guid.NewGuid().ToString(),
},
CancellationToken.None);
Assert.Equal(ProtocolStatusCode.SessionNotFound, reply.ProtocolStatus.Code);
}
[Fact]
public async Task AcknowledgeAsync_returns_invalid_request_when_reference_is_not_a_guid()
{
SessionRegistry registry = new SessionRegistry();
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient();
GatewaySession session = NewSession("s1");
session.AttachWorkerClient(worker);
session.MarkReady();
registry.TryAdd(session);
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
new AcknowledgeAlarmRequest
{
SessionId = "s1",
ClientCorrelationId = "c1",
AlarmFullReference = "Galaxy!Area.Tag", // not a GUID
Comment = "x",
OperatorUser = "u",
},
CancellationToken.None);
Assert.Equal(ProtocolStatusCode.InvalidRequest, reply.ProtocolStatus.Code);
Assert.Equal(0, worker.InvokeCount); // dispatcher short-circuits before the IPC.
}
[Fact]
public async Task AcknowledgeAsync_forwards_guid_and_returns_native_status()
{
SessionRegistry registry = new SessionRegistry();
Guid alarmGuid = Guid.NewGuid();
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
{
ReplyFactory = command =>
{
Assert.Equal(MxCommandKind.AcknowledgeAlarm, command.Command.Kind);
Assert.Equal(alarmGuid.ToString(), command.Command.AcknowledgeAlarmCommand.AlarmGuid);
Assert.Equal("ack", command.Command.AcknowledgeAlarmCommand.Comment);
Assert.Equal("alice", command.Command.AcknowledgeAlarmCommand.OperatorUser);
return new MxCommandReply
{
Kind = MxCommandKind.AcknowledgeAlarm,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok, Message = "OK" },
Hresult = 0,
AcknowledgeAlarm = new AcknowledgeAlarmReplyPayload { NativeStatus = 0 },
};
},
};
GatewaySession session = NewSession("s1");
session.AttachWorkerClient(worker);
session.MarkReady();
registry.TryAdd(session);
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
new AcknowledgeAlarmRequest
{
SessionId = "s1",
ClientCorrelationId = "c1",
AlarmFullReference = alarmGuid.ToString(),
Comment = "ack",
OperatorUser = "alice",
},
CancellationToken.None);
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
Assert.Equal(0, reply.Hresult);
Assert.Equal("s1", reply.SessionId);
Assert.Equal("c1", reply.CorrelationId);
Assert.Equal(1, worker.InvokeCount);
}
[Fact]
public async Task AcknowledgeAsync_propagates_worker_diagnostic_on_failure()
{
SessionRegistry registry = new SessionRegistry();
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
{
ReplyFactory = _ => new MxCommandReply
{
Kind = MxCommandKind.AcknowledgeAlarm,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.MxaccessFailure,
Message = "AVEVA Acknowledge failed.",
},
Hresult = -123,
DiagnosticMessage = "AVEVA AlarmAckByGUID returned non-zero status -123.",
},
};
GatewaySession session = NewSession("s1");
session.AttachWorkerClient(worker);
session.MarkReady();
registry.TryAdd(session);
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
new AcknowledgeAlarmRequest
{
SessionId = "s1",
AlarmFullReference = Guid.NewGuid().ToString(),
},
CancellationToken.None);
Assert.Equal(ProtocolStatusCode.MxaccessFailure, reply.ProtocolStatus.Code);
Assert.Equal(-123, reply.Hresult);
Assert.Contains("-123", reply.DiagnosticMessage);
}
[Fact]
public async Task QueryActiveAlarmsAsync_yields_each_snapshot_from_payload()
{
SessionRegistry registry = new SessionRegistry();
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
{
ReplyFactory = command =>
{
Assert.Equal(MxCommandKind.QueryActiveAlarms, command.Command.Kind);
QueryActiveAlarmsReplyPayload payload = new QueryActiveAlarmsReplyPayload();
payload.Snapshots.Add(new ActiveAlarmSnapshot
{
AlarmFullReference = "Galaxy!A.T1",
CurrentState = AlarmConditionState.Active,
Severity = 500,
});
payload.Snapshots.Add(new ActiveAlarmSnapshot
{
AlarmFullReference = "Galaxy!A.T2",
CurrentState = AlarmConditionState.ActiveAcked,
Severity = 100,
});
return new MxCommandReply
{
Kind = MxCommandKind.QueryActiveAlarms,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok, Message = "OK" },
QueryActiveAlarms = payload,
};
},
};
GatewaySession session = NewSession("s1");
session.AttachWorkerClient(worker);
session.MarkReady();
registry.TryAdd(session);
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
new QueryActiveAlarmsRequest { SessionId = "s1" },
CancellationToken.None))
{
collected.Add(snap);
}
Assert.Equal(2, collected.Count);
Assert.Equal("Galaxy!A.T1", collected[0].AlarmFullReference);
Assert.Equal("Galaxy!A.T2", collected[1].AlarmFullReference);
}
[Fact]
public async Task QueryActiveAlarmsAsync_yields_empty_when_session_missing()
{
SessionRegistry registry = new SessionRegistry();
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
new QueryActiveAlarmsRequest { SessionId = "missing" },
CancellationToken.None))
{
collected.Add(snap);
}
Assert.Empty(collected);
}
[Fact]
public async Task QueryActiveAlarmsAsync_yields_empty_on_worker_failure()
{
SessionRegistry registry = new SessionRegistry();
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
{
ReplyFactory = _ => new MxCommandReply
{
Kind = MxCommandKind.QueryActiveAlarms,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.MxaccessFailure,
Message = "alarm consumer not subscribed",
},
},
};
GatewaySession session = NewSession("s1");
session.AttachWorkerClient(worker);
session.MarkReady();
registry.TryAdd(session);
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
new QueryActiveAlarmsRequest { SessionId = "s1" },
CancellationToken.None))
{
collected.Add(snap);
}
Assert.Empty(collected);
}
private static GatewaySession NewSession(string sessionId)
{
return new GatewaySession(
sessionId,
"mxaccess",
$"mxaccess-gateway-1-{sessionId}",
"nonce",
"client-1",
"test-session",
"client-correlation-1",
commandTimeout: TimeSpan.FromSeconds(30),
startupTimeout: TimeSpan.FromSeconds(5),
shutdownTimeout: TimeSpan.FromSeconds(5),
leaseDuration: TimeSpan.FromMinutes(30),
openedAt: DateTimeOffset.UtcNow);
}
private sealed class FakeAlarmWorkerClient : IWorkerClient
{
public string SessionId { get; } = "session-1";
public int? ProcessId { get; } = 1;
public WorkerClientState State { get; } = WorkerClientState.Ready;
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
public Func<WorkerCommand, MxCommandReply>? ReplyFactory { get; set; }
public int InvokeCount { get; private set; }
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command, TimeSpan timeout, CancellationToken cancellationToken)
{
InvokeCount++;
MxCommandReply reply = ReplyFactory?.Invoke(command) ?? new MxCommandReply();
return Task.FromResult(new WorkerCommandReply { Reply = reply });
}
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await Task.CompletedTask;
yield break;
}
public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) => Task.CompletedTask;
public void Kill(string reason) { }
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}
}