diff --git a/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs b/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs index 5b7eae4..1f1e67b 100644 --- a/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs +++ b/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs @@ -11,6 +11,7 @@ public static class SessionServiceCollectionExtensions services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); services.AddHostedService(); services.AddHostedService(); diff --git a/src/MxGateway.Server/Sessions/WorkerAlarmRpcDispatcher.cs b/src/MxGateway.Server/Sessions/WorkerAlarmRpcDispatcher.cs new file mode 100644 index 0000000..89e6382 --- /dev/null +++ b/src/MxGateway.Server/Sessions/WorkerAlarmRpcDispatcher.cs @@ -0,0 +1,172 @@ +using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using Google.Protobuf.WellKnownTypes; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Grpc; + +namespace MxGateway.Server.Sessions; + +/// +/// Production that routes the public +/// AcknowledgeAlarm + QueryActiveAlarms RPCs through the +/// worker pipe IPC. Replaces +/// once the worker AlarmCommandHandler is wired in. +/// +/// +/// +/// QueryActiveAlarms is fully wired: issues a +/// over the pipe and yields +/// each from the +/// . +/// +/// +/// AcknowledgeAlarm is partially wired: the public RPC's +/// is a +/// Provider!Group.Tag string, but the worker's wnwrap consumer +/// acks by GUID. When the supplied reference parses as a GUID +/// directly, the dispatcher forwards it as-is. Otherwise it +/// returns an Unimplemented diagnostic. Resolving +/// reference→GUID requires an additional worker IPC command +/// (e.g. AlarmAckByName wrapping +/// wwAlarmConsumerClass.AlarmAckByName) and is tracked as +/// a follow-up. +/// +/// +public sealed class WorkerAlarmRpcDispatcher : IAlarmRpcDispatcher +{ + private readonly ISessionRegistry sessionRegistry; + private readonly TimeProvider timeProvider; + + public WorkerAlarmRpcDispatcher(ISessionRegistry sessionRegistry, TimeProvider? timeProvider = null) + { + this.sessionRegistry = sessionRegistry ?? throw new System.ArgumentNullException(nameof(sessionRegistry)); + this.timeProvider = timeProvider ?? TimeProvider.System; + } + + /// + public async Task AcknowledgeAsync( + AcknowledgeAlarmRequest request, + CancellationToken cancellationToken) + { + if (request is null) throw new System.ArgumentNullException(nameof(request)); + + if (!sessionRegistry.TryGet(request.SessionId, out GatewaySession session)) + { + return new AcknowledgeAlarmReply + { + SessionId = request.SessionId, + CorrelationId = request.ClientCorrelationId, + ProtocolStatus = MxAccessGrpcMapper.SessionNotFound( + $"Session '{request.SessionId}' not found."), + DiagnosticMessage = "AcknowledgeAlarm: session not found.", + }; + } + + if (!System.Guid.TryParse(request.AlarmFullReference, out System.Guid guid)) + { + // Reference→GUID lookup not yet implemented. Surface a clear + // diagnostic so client teams can plumb the reference parser + // when the worker AlarmAckByName command lands. + return new AcknowledgeAlarmReply + { + SessionId = request.SessionId, + CorrelationId = request.ClientCorrelationId, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.InvalidRequest, + Message = "AlarmFullReference must currently be a canonical GUID; reference→GUID lookup is pending the AlarmAckByName worker command.", + }, + DiagnosticMessage = $"AcknowledgeAlarm received non-GUID reference '{request.AlarmFullReference}'.", + }; + } + + WorkerCommand workerCommand = new WorkerCommand + { + Command = new MxCommand + { + Kind = MxCommandKind.AcknowledgeAlarm, + AcknowledgeAlarmCommand = new AcknowledgeAlarmCommand + { + AlarmGuid = guid.ToString(), + Comment = request.Comment ?? string.Empty, + OperatorUser = request.OperatorUser ?? string.Empty, + // Operator node/domain/full-name are not on the public + // RPC surface today; pass empty strings so the worker + // honours the existing AcknowledgeAlarmCommand schema. + OperatorNode = string.Empty, + OperatorDomain = string.Empty, + OperatorFullName = string.Empty, + }, + }, + EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()), + }; + + WorkerCommandReply workerReply = await session.InvokeAsync(workerCommand, cancellationToken) + .ConfigureAwait(false); + + MxCommandReply mxReply = workerReply.Reply ?? new MxCommandReply + { + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.ProtocolViolation, + Message = "Worker reply did not include an MxCommandReply.", + }, + }; + + AcknowledgeAlarmReply reply = new AcknowledgeAlarmReply + { + SessionId = request.SessionId, + CorrelationId = request.ClientCorrelationId, + ProtocolStatus = mxReply.ProtocolStatus ?? MxAccessGrpcMapper.Ok(), + DiagnosticMessage = mxReply.DiagnosticMessage ?? string.Empty, + }; + if (mxReply.HasHresult) + { + reply.Hresult = mxReply.Hresult; + } + return reply; + } + + /// + public async IAsyncEnumerable QueryActiveAlarmsAsync( + QueryActiveAlarmsRequest request, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + if (request is null) throw new System.ArgumentNullException(nameof(request)); + + if (!sessionRegistry.TryGet(request.SessionId, out GatewaySession session)) + { + yield break; + } + + WorkerCommand workerCommand = new WorkerCommand + { + Command = new MxCommand + { + Kind = MxCommandKind.QueryActiveAlarms, + QueryActiveAlarmsCommand = new QueryActiveAlarmsCommand + { + AlarmFilterPrefix = request.AlarmFilterPrefix ?? string.Empty, + }, + }, + EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()), + }; + + WorkerCommandReply workerReply = await session.InvokeAsync(workerCommand, cancellationToken) + .ConfigureAwait(false); + + MxCommandReply? mxReply = workerReply.Reply; + if (mxReply?.ProtocolStatus?.Code != ProtocolStatusCode.Ok) yield break; + + QueryActiveAlarmsReplyPayload? payload = mxReply.QueryActiveAlarms; + if (payload is null) yield break; + + foreach (ActiveAlarmSnapshot snapshot in payload.Snapshots) + { + cancellationToken.ThrowIfCancellationRequested(); + yield return snapshot; + } + } +} diff --git a/src/MxGateway.Tests/Gateway/Sessions/WorkerAlarmRpcDispatcherTests.cs b/src/MxGateway.Tests/Gateway/Sessions/WorkerAlarmRpcDispatcherTests.cs new file mode 100644 index 0000000..ae56f0b --- /dev/null +++ b/src/MxGateway.Tests/Gateway/Sessions/WorkerAlarmRpcDispatcherTests.cs @@ -0,0 +1,300 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Sessions; +using MxGateway.Server.Workers; + +namespace MxGateway.Tests.Gateway.Sessions; + +/// +/// Pins the production 's behaviour: +/// resolves the session by id, issues the matching MxCommand over the +/// worker pipe, and unwraps the reply into AcknowledgeAlarmReply or the +/// ActiveAlarmSnapshot stream. +/// +public sealed class WorkerAlarmRpcDispatcherTests +{ + [Fact] + public async Task AcknowledgeAsync_returns_session_not_found_when_session_missing() + { + SessionRegistry registry = new SessionRegistry(); + WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry); + + AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync( + new AcknowledgeAlarmRequest + { + SessionId = "missing", + ClientCorrelationId = "c1", + AlarmFullReference = Guid.NewGuid().ToString(), + }, + CancellationToken.None); + + Assert.Equal(ProtocolStatusCode.SessionNotFound, reply.ProtocolStatus.Code); + } + + [Fact] + public async Task AcknowledgeAsync_returns_invalid_request_when_reference_is_not_a_guid() + { + SessionRegistry registry = new SessionRegistry(); + FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient(); + GatewaySession session = NewSession("s1"); + session.AttachWorkerClient(worker); + session.MarkReady(); + registry.TryAdd(session); + + WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry); + + AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync( + new AcknowledgeAlarmRequest + { + SessionId = "s1", + ClientCorrelationId = "c1", + AlarmFullReference = "Galaxy!Area.Tag", // not a GUID + Comment = "x", + OperatorUser = "u", + }, + CancellationToken.None); + + Assert.Equal(ProtocolStatusCode.InvalidRequest, reply.ProtocolStatus.Code); + Assert.Equal(0, worker.InvokeCount); // dispatcher short-circuits before the IPC. + } + + [Fact] + public async Task AcknowledgeAsync_forwards_guid_and_returns_native_status() + { + SessionRegistry registry = new SessionRegistry(); + Guid alarmGuid = Guid.NewGuid(); + FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient + { + ReplyFactory = command => + { + Assert.Equal(MxCommandKind.AcknowledgeAlarm, command.Command.Kind); + Assert.Equal(alarmGuid.ToString(), command.Command.AcknowledgeAlarmCommand.AlarmGuid); + Assert.Equal("ack", command.Command.AcknowledgeAlarmCommand.Comment); + Assert.Equal("alice", command.Command.AcknowledgeAlarmCommand.OperatorUser); + return new MxCommandReply + { + Kind = MxCommandKind.AcknowledgeAlarm, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok, Message = "OK" }, + Hresult = 0, + AcknowledgeAlarm = new AcknowledgeAlarmReplyPayload { NativeStatus = 0 }, + }; + }, + }; + GatewaySession session = NewSession("s1"); + session.AttachWorkerClient(worker); + session.MarkReady(); + registry.TryAdd(session); + + WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry); + + AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync( + new AcknowledgeAlarmRequest + { + SessionId = "s1", + ClientCorrelationId = "c1", + AlarmFullReference = alarmGuid.ToString(), + Comment = "ack", + OperatorUser = "alice", + }, + CancellationToken.None); + + Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code); + Assert.Equal(0, reply.Hresult); + Assert.Equal("s1", reply.SessionId); + Assert.Equal("c1", reply.CorrelationId); + Assert.Equal(1, worker.InvokeCount); + } + + [Fact] + public async Task AcknowledgeAsync_propagates_worker_diagnostic_on_failure() + { + SessionRegistry registry = new SessionRegistry(); + FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient + { + ReplyFactory = _ => new MxCommandReply + { + Kind = MxCommandKind.AcknowledgeAlarm, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.MxaccessFailure, + Message = "AVEVA Acknowledge failed.", + }, + Hresult = -123, + DiagnosticMessage = "AVEVA AlarmAckByGUID returned non-zero status -123.", + }, + }; + GatewaySession session = NewSession("s1"); + session.AttachWorkerClient(worker); + session.MarkReady(); + registry.TryAdd(session); + + WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry); + + AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync( + new AcknowledgeAlarmRequest + { + SessionId = "s1", + AlarmFullReference = Guid.NewGuid().ToString(), + }, + CancellationToken.None); + + Assert.Equal(ProtocolStatusCode.MxaccessFailure, reply.ProtocolStatus.Code); + Assert.Equal(-123, reply.Hresult); + Assert.Contains("-123", reply.DiagnosticMessage); + } + + [Fact] + public async Task QueryActiveAlarmsAsync_yields_each_snapshot_from_payload() + { + SessionRegistry registry = new SessionRegistry(); + FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient + { + ReplyFactory = command => + { + Assert.Equal(MxCommandKind.QueryActiveAlarms, command.Command.Kind); + QueryActiveAlarmsReplyPayload payload = new QueryActiveAlarmsReplyPayload(); + payload.Snapshots.Add(new ActiveAlarmSnapshot + { + AlarmFullReference = "Galaxy!A.T1", + CurrentState = AlarmConditionState.Active, + Severity = 500, + }); + payload.Snapshots.Add(new ActiveAlarmSnapshot + { + AlarmFullReference = "Galaxy!A.T2", + CurrentState = AlarmConditionState.ActiveAcked, + Severity = 100, + }); + return new MxCommandReply + { + Kind = MxCommandKind.QueryActiveAlarms, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok, Message = "OK" }, + QueryActiveAlarms = payload, + }; + }, + }; + GatewaySession session = NewSession("s1"); + session.AttachWorkerClient(worker); + session.MarkReady(); + registry.TryAdd(session); + + WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry); + + List collected = new List(); + await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync( + new QueryActiveAlarmsRequest { SessionId = "s1" }, + CancellationToken.None)) + { + collected.Add(snap); + } + + Assert.Equal(2, collected.Count); + Assert.Equal("Galaxy!A.T1", collected[0].AlarmFullReference); + Assert.Equal("Galaxy!A.T2", collected[1].AlarmFullReference); + } + + [Fact] + public async Task QueryActiveAlarmsAsync_yields_empty_when_session_missing() + { + SessionRegistry registry = new SessionRegistry(); + WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry); + + List collected = new List(); + await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync( + new QueryActiveAlarmsRequest { SessionId = "missing" }, + CancellationToken.None)) + { + collected.Add(snap); + } + + Assert.Empty(collected); + } + + [Fact] + public async Task QueryActiveAlarmsAsync_yields_empty_on_worker_failure() + { + SessionRegistry registry = new SessionRegistry(); + FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient + { + ReplyFactory = _ => new MxCommandReply + { + Kind = MxCommandKind.QueryActiveAlarms, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.MxaccessFailure, + Message = "alarm consumer not subscribed", + }, + }, + }; + GatewaySession session = NewSession("s1"); + session.AttachWorkerClient(worker); + session.MarkReady(); + registry.TryAdd(session); + + WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry); + + List collected = new List(); + await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync( + new QueryActiveAlarmsRequest { SessionId = "s1" }, + CancellationToken.None)) + { + collected.Add(snap); + } + + Assert.Empty(collected); + } + + private static GatewaySession NewSession(string sessionId) + { + return new GatewaySession( + sessionId, + "mxaccess", + $"mxaccess-gateway-1-{sessionId}", + "nonce", + "client-1", + "test-session", + "client-correlation-1", + commandTimeout: TimeSpan.FromSeconds(30), + startupTimeout: TimeSpan.FromSeconds(5), + shutdownTimeout: TimeSpan.FromSeconds(5), + leaseDuration: TimeSpan.FromMinutes(30), + openedAt: DateTimeOffset.UtcNow); + } + + private sealed class FakeAlarmWorkerClient : IWorkerClient + { + public string SessionId { get; } = "session-1"; + public int? ProcessId { get; } = 1; + public WorkerClientState State { get; } = WorkerClientState.Ready; + public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow; + + public Func? ReplyFactory { get; set; } + public int InvokeCount { get; private set; } + + public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask; + + public Task InvokeAsync( + WorkerCommand command, TimeSpan timeout, CancellationToken cancellationToken) + { + InvokeCount++; + MxCommandReply reply = ReplyFactory?.Invoke(command) ?? new MxCommandReply(); + return Task.FromResult(new WorkerCommandReply { Reply = reply }); + } + + public async IAsyncEnumerable ReadEventsAsync( + [EnumeratorCancellation] CancellationToken cancellationToken) + { + await Task.CompletedTask; + yield break; + } + + public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) => Task.CompletedTask; + public void Kill(string reason) { } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + } +}