From 82eb0ad569419e709fa2809b056d3cd8ad7ad5fc Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 1 May 2026 09:52:35 -0400 Subject: [PATCH] A.3 (in-process slice): AlarmDispatcher wires consumer events onto event queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the in-process plumbing that connects WnWrapAlarmConsumer's AlarmTransitionEmitted stream to the worker's MxAccessEventQueue via MxAccessAlarmEventSink. With this change a transition raised by the consumer lands as an OnAlarmTransitionEvent proto on the queue, SessionId attached, ready for IPC dispatch. Mapping: provider!group.tag → AlarmFullReference, tag → SourceObjectReference, priority → severity, wnwrap STATE → AlarmConditionState (Active / ActiveAcked / Inactive — wnwrap's ack-vs-unack-on-cleared distinction collapses since OPC UA Part 9 doesn't model it). State delta drives AlarmTransitionKind via the existing AlarmRecordTransitionMapper table. Holding off on the proto IPC additions (SubscribeAlarms / AcknowledgeAlarm / QueryActiveAlarms commands + WorkerAlarmRpcDispatcher) for a follow-up — those touch every layer of the worker IPC and warrant their own PR. This slice proves the consumer→sink→queue pipeline end-to-end with unit tests and clears the path for the proto additions to plug in cleanly. Tests: 10 new unit tests cover field-by-field mapping, the "unchanged-state-doesn't-emit" filter, the state→transition kind table, Subscribe / Acknowledge passthrough, SnapshotActiveAlarms → proto ActiveAlarmSnapshot mapping, and Dispose detaches the handler. All passing; total worker test count 172/3 skip / 1 pre-existing structure fail (untouched). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../MxAccess/AlarmDispatcherTests.cs | 288 ++++++++++++++++++ .../MxAccess/AlarmDispatcher.cs | 190 ++++++++++++ 2 files changed, 478 insertions(+) create mode 100644 src/MxGateway.Worker.Tests/MxAccess/AlarmDispatcherTests.cs create mode 100644 src/MxGateway.Worker/MxAccess/AlarmDispatcher.cs diff --git a/src/MxGateway.Worker.Tests/MxAccess/AlarmDispatcherTests.cs b/src/MxGateway.Worker.Tests/MxAccess/AlarmDispatcherTests.cs new file mode 100644 index 0000000..5816d64 --- /dev/null +++ b/src/MxGateway.Worker.Tests/MxAccess/AlarmDispatcherTests.cs @@ -0,0 +1,288 @@ +using System; +using System.Collections.Generic; +using MxGateway.Contracts.Proto; +using MxGateway.Worker.MxAccess; + +namespace MxGateway.Worker.Tests.MxAccess; + +/// +/// Unit tests for the in-process A.3 dispatcher: prove that +/// events +/// fan out to the worker's as proto +/// messages with correctly mapped +/// fields. The fake consumer below stands in for the wnwrap-backed +/// production implementation so this exercise needs no AVEVA install. +/// +public sealed class AlarmDispatcherTests +{ + private const string SessionId = "session-001"; + + [Fact] + public void TransitionEvent_lands_in_queue_with_mapped_fields() + { + FakeAlarmConsumer consumer = new FakeAlarmConsumer(); + MxAccessEventQueue queue = new MxAccessEventQueue(); + MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink(queue, new MxAccessEventMapper()); + using AlarmDispatcher dispatcher = new AlarmDispatcher(consumer, sink, SessionId); + + DateTime ts = new DateTime(2026, 5, 1, 17, 26, 14, 709, DateTimeKind.Utc); + consumer.RaiseTransition(new MxAlarmTransitionEvent + { + PreviousState = MxAlarmStateKind.Unspecified, + Record = new MxAlarmSnapshotRecord + { + AlarmGuid = Guid.NewGuid(), + ProviderName = "Galaxy", + Group = "TestArea", + TagName = "TestMachine_001.TestAlarm001", + Type = "DSC", + Priority = 500, + State = MxAlarmStateKind.UnackAlm, + TransitionTimestampUtc = ts, + AlarmComment = "Test alarm #1", + }, + }); + + Assert.Equal(1, queue.Count); + Assert.True(queue.TryDequeue(out WorkerEvent? workerEvent)); + Assert.NotNull(workerEvent); + MxEvent mxEvent = workerEvent!.Event; + Assert.Equal(MxEventFamily.OnAlarmTransition, mxEvent.Family); + Assert.Equal(SessionId, mxEvent.SessionId); + + OnAlarmTransitionEvent body = mxEvent.OnAlarmTransition; + Assert.NotNull(body); + Assert.Equal("Galaxy!TestArea.TestMachine_001.TestAlarm001", body.AlarmFullReference); + Assert.Equal("TestMachine_001.TestAlarm001", body.SourceObjectReference); + Assert.Equal("DSC", body.AlarmTypeName); + Assert.Equal(AlarmTransitionKind.Raise, body.TransitionKind); + Assert.Equal(500, body.Severity); + Assert.Equal("Test alarm #1", body.OperatorComment); + Assert.Equal("TestArea", body.Category); + Assert.NotNull(body.TransitionTimestamp); + Assert.Equal(ts, body.TransitionTimestamp.ToDateTime()); + } + + [Fact] + public void Consecutive_unchanged_state_does_not_emit_a_transition() + { + // Mapper.MapTransition returns Unspecified when the state didn't + // change; the dispatcher should drop the event before queueing. + FakeAlarmConsumer consumer = new FakeAlarmConsumer(); + MxAccessEventQueue queue = new MxAccessEventQueue(); + MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink(queue, new MxAccessEventMapper()); + using AlarmDispatcher dispatcher = new AlarmDispatcher(consumer, sink, SessionId); + + consumer.RaiseTransition(new MxAlarmTransitionEvent + { + PreviousState = MxAlarmStateKind.UnackAlm, + Record = new MxAlarmSnapshotRecord + { + AlarmGuid = Guid.NewGuid(), + ProviderName = "Galaxy", + Group = "X", + TagName = "Y", + State = MxAlarmStateKind.UnackAlm, + }, + }); + + Assert.Equal(0, queue.Count); + } + + [Theory] + [InlineData(MxAlarmStateKind.Unspecified, MxAlarmStateKind.UnackAlm, AlarmTransitionKind.Raise)] + [InlineData(MxAlarmStateKind.UnackAlm, MxAlarmStateKind.AckAlm, AlarmTransitionKind.Acknowledge)] + [InlineData(MxAlarmStateKind.UnackAlm, MxAlarmStateKind.UnackRtn, AlarmTransitionKind.Clear)] + [InlineData(MxAlarmStateKind.UnackRtn, MxAlarmStateKind.UnackAlm, AlarmTransitionKind.Raise)] + public void Transition_kind_follows_state_table( + MxAlarmStateKind previous, + MxAlarmStateKind current, + AlarmTransitionKind expected) + { + FakeAlarmConsumer consumer = new FakeAlarmConsumer(); + MxAccessEventQueue queue = new MxAccessEventQueue(); + MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink(queue, new MxAccessEventMapper()); + using AlarmDispatcher dispatcher = new AlarmDispatcher(consumer, sink, SessionId); + + consumer.RaiseTransition(new MxAlarmTransitionEvent + { + PreviousState = previous, + Record = new MxAlarmSnapshotRecord + { + AlarmGuid = Guid.NewGuid(), + ProviderName = "Galaxy", + Group = "G", + TagName = "T", + State = current, + }, + }); + + Assert.Equal(1, queue.Count); + queue.TryDequeue(out WorkerEvent? evt); + Assert.Equal(expected, evt!.Event.OnAlarmTransition.TransitionKind); + } + + [Fact] + public void Subscribe_forwards_to_consumer() + { + FakeAlarmConsumer consumer = new FakeAlarmConsumer(); + using AlarmDispatcher dispatcher = new AlarmDispatcher( + consumer, + new MxAccessAlarmEventSink(new MxAccessEventQueue(), new MxAccessEventMapper()), + SessionId); + + dispatcher.Subscribe(@"\\HOST\Galaxy!Area1"); + Assert.Equal(@"\\HOST\Galaxy!Area1", consumer.LastSubscription); + } + + [Fact] + public void Acknowledge_forwards_to_consumer_with_full_operator_identity() + { + FakeAlarmConsumer consumer = new FakeAlarmConsumer(); + consumer.AcknowledgeReturn = 0; + using AlarmDispatcher dispatcher = new AlarmDispatcher( + consumer, + new MxAccessAlarmEventSink(new MxAccessEventQueue(), new MxAccessEventMapper()), + SessionId); + + Guid guid = Guid.NewGuid(); + int rc = dispatcher.Acknowledge( + guid, "Acked", "alice", "WS01", "CORP", "Alice Smith"); + + Assert.Equal(0, rc); + Assert.Equal(guid, consumer.LastAckGuid); + Assert.Equal("Acked", consumer.LastAckComment); + Assert.Equal("alice", consumer.LastAckOperatorName); + Assert.Equal("WS01", consumer.LastAckOperatorNode); + Assert.Equal("CORP", consumer.LastAckOperatorDomain); + Assert.Equal("Alice Smith", consumer.LastAckOperatorFullName); + } + + [Fact] + public void SnapshotActiveAlarms_maps_records_to_protos() + { + FakeAlarmConsumer consumer = new FakeAlarmConsumer(); + DateTime ts = new DateTime(2026, 5, 1, 17, 26, 14, 709, DateTimeKind.Utc); + consumer.SnapshotResult = new[] + { + new MxAlarmSnapshotRecord + { + AlarmGuid = Guid.NewGuid(), + ProviderName = "Galaxy", + Group = "TestArea", + TagName = "Tag1", + Type = "DSC", + Priority = 500, + State = MxAlarmStateKind.UnackAlm, + TransitionTimestampUtc = ts, + AlarmComment = "x", + }, + new MxAlarmSnapshotRecord + { + AlarmGuid = Guid.NewGuid(), + ProviderName = "Galaxy", + Group = "TestArea", + TagName = "Tag2", + Type = "ANL", + Priority = 100, + State = MxAlarmStateKind.AckAlm, + TransitionTimestampUtc = ts, + }, + }; + using AlarmDispatcher dispatcher = new AlarmDispatcher( + consumer, + new MxAccessAlarmEventSink(new MxAccessEventQueue(), new MxAccessEventMapper()), + SessionId); + + IReadOnlyList snapshots = dispatcher.SnapshotActiveAlarms(); + Assert.Equal(2, snapshots.Count); + + Assert.Equal("Galaxy!TestArea.Tag1", snapshots[0].AlarmFullReference); + Assert.Equal(AlarmConditionState.Active, snapshots[0].CurrentState); + Assert.Equal(500, snapshots[0].Severity); + Assert.Equal(ts, snapshots[0].LastTransitionTimestamp.ToDateTime()); + + Assert.Equal("Galaxy!TestArea.Tag2", snapshots[1].AlarmFullReference); + Assert.Equal(AlarmConditionState.ActiveAcked, snapshots[1].CurrentState); + } + + [Fact] + public void Dispose_unsubscribes_handler_and_disposes_consumer() + { + FakeAlarmConsumer consumer = new FakeAlarmConsumer(); + MxAccessEventQueue queue = new MxAccessEventQueue(); + MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink(queue, new MxAccessEventMapper()); + AlarmDispatcher dispatcher = new AlarmDispatcher(consumer, sink, SessionId); + + dispatcher.Dispose(); + + Assert.True(consumer.Disposed); + consumer.RaiseTransition(new MxAlarmTransitionEvent + { + PreviousState = MxAlarmStateKind.Unspecified, + Record = new MxAlarmSnapshotRecord + { + AlarmGuid = Guid.NewGuid(), + ProviderName = "Galaxy", + Group = "G", + TagName = "T", + State = MxAlarmStateKind.UnackAlm, + }, + }); + Assert.Equal(0, queue.Count); + } + + private sealed class FakeAlarmConsumer : IMxAccessAlarmConsumer + { + public event EventHandler? AlarmTransitionEmitted; + + public string? LastSubscription { get; private set; } + public Guid LastAckGuid { get; private set; } + public string? LastAckComment { get; private set; } + public string? LastAckOperatorName { get; private set; } + public string? LastAckOperatorNode { get; private set; } + public string? LastAckOperatorDomain { get; private set; } + public string? LastAckOperatorFullName { get; private set; } + public int AcknowledgeReturn { get; set; } + public IReadOnlyList SnapshotResult { get; set; } = + Array.Empty(); + public bool Disposed { get; private set; } + + public void RaiseTransition(MxAlarmTransitionEvent transition) + { + AlarmTransitionEmitted?.Invoke(this, transition); + } + + public void Subscribe(string subscription) + { + LastSubscription = subscription; + } + + public int AcknowledgeByGuid( + Guid alarmGuid, + string ackComment, + string ackOperatorName, + string ackOperatorNode, + string ackOperatorDomain, + string ackOperatorFullName) + { + LastAckGuid = alarmGuid; + LastAckComment = ackComment; + LastAckOperatorName = ackOperatorName; + LastAckOperatorNode = ackOperatorNode; + LastAckOperatorDomain = ackOperatorDomain; + LastAckOperatorFullName = ackOperatorFullName; + return AcknowledgeReturn; + } + + public IReadOnlyList SnapshotActiveAlarms() + { + return SnapshotResult; + } + + public void Dispose() + { + Disposed = true; + } + } +} diff --git a/src/MxGateway.Worker/MxAccess/AlarmDispatcher.cs b/src/MxGateway.Worker/MxAccess/AlarmDispatcher.cs new file mode 100644 index 0000000..0614531 --- /dev/null +++ b/src/MxGateway.Worker/MxAccess/AlarmDispatcher.cs @@ -0,0 +1,190 @@ +using System; +using System.Collections.Generic; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Worker.MxAccess; + +/// +/// In-process dispatcher that owns the lifetime of an +/// + +/// pair, and wires the consumer's AlarmTransitionEmitted stream +/// onto the sink's EnqueueTransition path so transitions land on +/// the worker's as proto +/// messages ready for IPC dispatch. +/// +/// +/// +/// This is the in-process slice of A.3 — it proves the +/// consumer→sink→queue pipeline end-to-end without touching the +/// worker's IPC command framing. The companion follow-up PR adds +/// SubscribeAlarmsCommand / AcknowledgeAlarmCommand / +/// QueryActiveAlarmsCommand proto entries plus the gateway- +/// side WorkerAlarmRpcDispatcher that issues them. +/// +/// +/// Threading: polls on a +/// thread today; production +/// hosting should marshal the consumer onto the worker's STA via +/// StaRuntime.InvokeAsync. The dispatcher itself is purely +/// a pass-through, so it inherits whatever thread the consumer's +/// event handler fires on. Fan-out into EnqueueTransition +/// uses which is +/// thread-safe. +/// +/// +public sealed class AlarmDispatcher : IDisposable +{ + private readonly IMxAccessAlarmConsumer consumer; + private readonly MxAccessAlarmEventSink sink; + private readonly string sessionId; + private readonly EventHandler handler; + private bool disposed; + + public AlarmDispatcher( + IMxAccessAlarmConsumer consumer, + MxAccessAlarmEventSink sink, + string sessionId) + { + this.consumer = consumer ?? throw new ArgumentNullException(nameof(consumer)); + this.sink = sink ?? throw new ArgumentNullException(nameof(sink)); + this.sessionId = sessionId ?? string.Empty; + // Sink.Attach is the seam that propagates the session id onto the + // proto SessionId field of every emitted MxEvent. Pass the consumer + // as the "associated COM object" — sink ignores the object reference + // for the alarm path, but the existing IMxAccessEventSink contract + // requires a non-null first arg. + this.sink.Attach(this.consumer, this.sessionId); + this.handler = OnTransition; + consumer.AlarmTransitionEmitted += handler; + } + + /// + /// Begin polling the configured AVEVA alarm provider for + /// transitions. The supplied subscription expression follows the + /// canonical \\<machine>\Galaxy!<area> format. + /// + public void Subscribe(string subscription) + { + if (disposed) throw new ObjectDisposedException(nameof(AlarmDispatcher)); + consumer.Subscribe(subscription); + } + + /// + /// Forward an AcknowledgeAlarm request to the underlying + /// consumer's AlarmAckByGUID. Returns the AVEVA-native + /// status code (0 = success). + /// + public int Acknowledge( + Guid alarmGuid, + string ackComment, + string ackOperatorName, + string ackOperatorNode, + string ackOperatorDomain, + string ackOperatorFullName) + { + if (disposed) throw new ObjectDisposedException(nameof(AlarmDispatcher)); + return consumer.AcknowledgeByGuid( + alarmGuid, + ackComment, + ackOperatorName, + ackOperatorNode, + ackOperatorDomain, + ackOperatorFullName); + } + + /// + /// Snapshot the currently-active alarm set as + /// protos for the + /// QueryActiveAlarms RPC's ConditionRefresh stream. + /// + public IReadOnlyList SnapshotActiveAlarms() + { + if (disposed) throw new ObjectDisposedException(nameof(AlarmDispatcher)); + IReadOnlyList records = consumer.SnapshotActiveAlarms(); + if (records.Count == 0) return Array.Empty(); + List snapshots = new List(records.Count); + foreach (MxAlarmSnapshotRecord record in records) + { + snapshots.Add(MapToSnapshot(record)); + } + return snapshots; + } + + private void OnTransition(object? sender, MxAlarmTransitionEvent transition) + { + if (disposed) return; + if (transition is null) return; + + MxAlarmSnapshotRecord record = transition.Record; + AlarmTransitionKind kind = AlarmRecordTransitionMapper.MapTransition( + transition.PreviousState, record.State); + if (kind == AlarmTransitionKind.Unspecified) return; + + string fullReference = AlarmRecordTransitionMapper.ComposeFullReference( + record.ProviderName, record.Group, record.TagName); + + sink.EnqueueTransition( + alarmFullReference: fullReference, + sourceObjectReference: record.TagName, + alarmTypeName: record.Type, + transitionKind: kind, + severity: record.Priority, + originalRaiseTimestampUtc: null, + transitionTimestampUtc: record.TransitionTimestampUtc, + operatorUser: record.OperatorName, + operatorComment: record.AlarmComment, + category: record.Group, + description: string.Empty); + } + + private static ActiveAlarmSnapshot MapToSnapshot(MxAlarmSnapshotRecord record) + { + ActiveAlarmSnapshot snapshot = new ActiveAlarmSnapshot + { + AlarmFullReference = AlarmRecordTransitionMapper.ComposeFullReference( + record.ProviderName, record.Group, record.TagName), + SourceObjectReference = record.TagName, + AlarmTypeName = record.Type, + CurrentState = MapConditionState(record.State), + Severity = record.Priority, + OperatorUser = record.OperatorName, + OperatorComment = record.AlarmComment, + Category = record.Group, + Description = string.Empty, + }; + if (record.TransitionTimestampUtc != DateTime.MinValue) + { + snapshot.LastTransitionTimestamp = + Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime( + DateTime.SpecifyKind(record.TransitionTimestampUtc, DateTimeKind.Utc)); + } + return snapshot; + } + + private static AlarmConditionState MapConditionState(MxAlarmStateKind state) + { + // The proto's AlarmConditionState only distinguishes Active / + // ActiveAcked / Inactive — both Rtn states collapse to Inactive + // (the ack-vs-unack distinction on a cleared alarm is not exposed + // through OPC UA's Part 9 condition state model anyway). + return state switch + { + MxAlarmStateKind.UnackAlm => AlarmConditionState.Active, + MxAlarmStateKind.AckAlm => AlarmConditionState.ActiveAcked, + MxAlarmStateKind.UnackRtn => AlarmConditionState.Inactive, + MxAlarmStateKind.AckRtn => AlarmConditionState.Inactive, + _ => AlarmConditionState.Unspecified, + }; + } + + public string SessionId => sessionId; + + public void Dispose() + { + if (disposed) return; + disposed = true; + try { consumer.AlarmTransitionEmitted -= handler; } catch { /* swallow */ } + try { sink.Detach(); } catch { /* swallow */ } + try { consumer.Dispose(); } catch { /* swallow */ } + } +}