A.3 (in-process slice): AlarmDispatcher wires consumer events onto event queue

Adds the in-process plumbing that connects WnWrapAlarmConsumer's
AlarmTransitionEmitted stream to the worker's MxAccessEventQueue via
MxAccessAlarmEventSink. With this change a transition raised by the
consumer lands as an OnAlarmTransitionEvent proto on the queue,
SessionId attached, ready for IPC dispatch.

Mapping: provider!group.tag → AlarmFullReference, tag → SourceObjectReference,
priority → severity, wnwrap STATE → AlarmConditionState (Active /
ActiveAcked / Inactive — wnwrap's ack-vs-unack-on-cleared distinction
collapses since OPC UA Part 9 doesn't model it). State delta drives
AlarmTransitionKind via the existing AlarmRecordTransitionMapper table.

Holding off on the proto IPC additions (SubscribeAlarms /
AcknowledgeAlarm / QueryActiveAlarms commands + WorkerAlarmRpcDispatcher)
for a follow-up — those touch every layer of the worker IPC and warrant
their own PR. This slice proves the consumer→sink→queue pipeline
end-to-end with unit tests and clears the path for the proto additions
to plug in cleanly.

Tests: 10 new unit tests cover field-by-field mapping, the
"unchanged-state-doesn't-emit" filter, the state→transition kind table,
Subscribe / Acknowledge passthrough, SnapshotActiveAlarms → proto
ActiveAlarmSnapshot mapping, and Dispose detaches the handler. All
passing; total worker test count 172/3 skip / 1 pre-existing structure
fail (untouched).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-01 09:52:35 -04:00
parent f711a55be4
commit 82eb0ad569
2 changed files with 478 additions and 0 deletions
@@ -0,0 +1,288 @@
using System;
using System.Collections.Generic;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.MxAccess;
namespace MxGateway.Worker.Tests.MxAccess;
/// <summary>
/// Unit tests for the in-process A.3 dispatcher: prove that
/// <see cref="IMxAccessAlarmConsumer.AlarmTransitionEmitted"/> events
/// fan out to the worker's <see cref="MxAccessEventQueue"/> as proto
/// <see cref="OnAlarmTransitionEvent"/> messages with correctly mapped
/// fields. The fake consumer below stands in for the wnwrap-backed
/// production implementation so this exercise needs no AVEVA install.
/// </summary>
public sealed class AlarmDispatcherTests
{
private const string SessionId = "session-001";
[Fact]
public void TransitionEvent_lands_in_queue_with_mapped_fields()
{
FakeAlarmConsumer consumer = new FakeAlarmConsumer();
MxAccessEventQueue queue = new MxAccessEventQueue();
MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink(queue, new MxAccessEventMapper());
using AlarmDispatcher dispatcher = new AlarmDispatcher(consumer, sink, SessionId);
DateTime ts = new DateTime(2026, 5, 1, 17, 26, 14, 709, DateTimeKind.Utc);
consumer.RaiseTransition(new MxAlarmTransitionEvent
{
PreviousState = MxAlarmStateKind.Unspecified,
Record = new MxAlarmSnapshotRecord
{
AlarmGuid = Guid.NewGuid(),
ProviderName = "Galaxy",
Group = "TestArea",
TagName = "TestMachine_001.TestAlarm001",
Type = "DSC",
Priority = 500,
State = MxAlarmStateKind.UnackAlm,
TransitionTimestampUtc = ts,
AlarmComment = "Test alarm #1",
},
});
Assert.Equal(1, queue.Count);
Assert.True(queue.TryDequeue(out WorkerEvent? workerEvent));
Assert.NotNull(workerEvent);
MxEvent mxEvent = workerEvent!.Event;
Assert.Equal(MxEventFamily.OnAlarmTransition, mxEvent.Family);
Assert.Equal(SessionId, mxEvent.SessionId);
OnAlarmTransitionEvent body = mxEvent.OnAlarmTransition;
Assert.NotNull(body);
Assert.Equal("Galaxy!TestArea.TestMachine_001.TestAlarm001", body.AlarmFullReference);
Assert.Equal("TestMachine_001.TestAlarm001", body.SourceObjectReference);
Assert.Equal("DSC", body.AlarmTypeName);
Assert.Equal(AlarmTransitionKind.Raise, body.TransitionKind);
Assert.Equal(500, body.Severity);
Assert.Equal("Test alarm #1", body.OperatorComment);
Assert.Equal("TestArea", body.Category);
Assert.NotNull(body.TransitionTimestamp);
Assert.Equal(ts, body.TransitionTimestamp.ToDateTime());
}
[Fact]
public void Consecutive_unchanged_state_does_not_emit_a_transition()
{
// Mapper.MapTransition returns Unspecified when the state didn't
// change; the dispatcher should drop the event before queueing.
FakeAlarmConsumer consumer = new FakeAlarmConsumer();
MxAccessEventQueue queue = new MxAccessEventQueue();
MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink(queue, new MxAccessEventMapper());
using AlarmDispatcher dispatcher = new AlarmDispatcher(consumer, sink, SessionId);
consumer.RaiseTransition(new MxAlarmTransitionEvent
{
PreviousState = MxAlarmStateKind.UnackAlm,
Record = new MxAlarmSnapshotRecord
{
AlarmGuid = Guid.NewGuid(),
ProviderName = "Galaxy",
Group = "X",
TagName = "Y",
State = MxAlarmStateKind.UnackAlm,
},
});
Assert.Equal(0, queue.Count);
}
[Theory]
[InlineData(MxAlarmStateKind.Unspecified, MxAlarmStateKind.UnackAlm, AlarmTransitionKind.Raise)]
[InlineData(MxAlarmStateKind.UnackAlm, MxAlarmStateKind.AckAlm, AlarmTransitionKind.Acknowledge)]
[InlineData(MxAlarmStateKind.UnackAlm, MxAlarmStateKind.UnackRtn, AlarmTransitionKind.Clear)]
[InlineData(MxAlarmStateKind.UnackRtn, MxAlarmStateKind.UnackAlm, AlarmTransitionKind.Raise)]
public void Transition_kind_follows_state_table(
MxAlarmStateKind previous,
MxAlarmStateKind current,
AlarmTransitionKind expected)
{
FakeAlarmConsumer consumer = new FakeAlarmConsumer();
MxAccessEventQueue queue = new MxAccessEventQueue();
MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink(queue, new MxAccessEventMapper());
using AlarmDispatcher dispatcher = new AlarmDispatcher(consumer, sink, SessionId);
consumer.RaiseTransition(new MxAlarmTransitionEvent
{
PreviousState = previous,
Record = new MxAlarmSnapshotRecord
{
AlarmGuid = Guid.NewGuid(),
ProviderName = "Galaxy",
Group = "G",
TagName = "T",
State = current,
},
});
Assert.Equal(1, queue.Count);
queue.TryDequeue(out WorkerEvent? evt);
Assert.Equal(expected, evt!.Event.OnAlarmTransition.TransitionKind);
}
[Fact]
public void Subscribe_forwards_to_consumer()
{
FakeAlarmConsumer consumer = new FakeAlarmConsumer();
using AlarmDispatcher dispatcher = new AlarmDispatcher(
consumer,
new MxAccessAlarmEventSink(new MxAccessEventQueue(), new MxAccessEventMapper()),
SessionId);
dispatcher.Subscribe(@"\\HOST\Galaxy!Area1");
Assert.Equal(@"\\HOST\Galaxy!Area1", consumer.LastSubscription);
}
[Fact]
public void Acknowledge_forwards_to_consumer_with_full_operator_identity()
{
FakeAlarmConsumer consumer = new FakeAlarmConsumer();
consumer.AcknowledgeReturn = 0;
using AlarmDispatcher dispatcher = new AlarmDispatcher(
consumer,
new MxAccessAlarmEventSink(new MxAccessEventQueue(), new MxAccessEventMapper()),
SessionId);
Guid guid = Guid.NewGuid();
int rc = dispatcher.Acknowledge(
guid, "Acked", "alice", "WS01", "CORP", "Alice Smith");
Assert.Equal(0, rc);
Assert.Equal(guid, consumer.LastAckGuid);
Assert.Equal("Acked", consumer.LastAckComment);
Assert.Equal("alice", consumer.LastAckOperatorName);
Assert.Equal("WS01", consumer.LastAckOperatorNode);
Assert.Equal("CORP", consumer.LastAckOperatorDomain);
Assert.Equal("Alice Smith", consumer.LastAckOperatorFullName);
}
[Fact]
public void SnapshotActiveAlarms_maps_records_to_protos()
{
FakeAlarmConsumer consumer = new FakeAlarmConsumer();
DateTime ts = new DateTime(2026, 5, 1, 17, 26, 14, 709, DateTimeKind.Utc);
consumer.SnapshotResult = new[]
{
new MxAlarmSnapshotRecord
{
AlarmGuid = Guid.NewGuid(),
ProviderName = "Galaxy",
Group = "TestArea",
TagName = "Tag1",
Type = "DSC",
Priority = 500,
State = MxAlarmStateKind.UnackAlm,
TransitionTimestampUtc = ts,
AlarmComment = "x",
},
new MxAlarmSnapshotRecord
{
AlarmGuid = Guid.NewGuid(),
ProviderName = "Galaxy",
Group = "TestArea",
TagName = "Tag2",
Type = "ANL",
Priority = 100,
State = MxAlarmStateKind.AckAlm,
TransitionTimestampUtc = ts,
},
};
using AlarmDispatcher dispatcher = new AlarmDispatcher(
consumer,
new MxAccessAlarmEventSink(new MxAccessEventQueue(), new MxAccessEventMapper()),
SessionId);
IReadOnlyList<ActiveAlarmSnapshot> snapshots = dispatcher.SnapshotActiveAlarms();
Assert.Equal(2, snapshots.Count);
Assert.Equal("Galaxy!TestArea.Tag1", snapshots[0].AlarmFullReference);
Assert.Equal(AlarmConditionState.Active, snapshots[0].CurrentState);
Assert.Equal(500, snapshots[0].Severity);
Assert.Equal(ts, snapshots[0].LastTransitionTimestamp.ToDateTime());
Assert.Equal("Galaxy!TestArea.Tag2", snapshots[1].AlarmFullReference);
Assert.Equal(AlarmConditionState.ActiveAcked, snapshots[1].CurrentState);
}
[Fact]
public void Dispose_unsubscribes_handler_and_disposes_consumer()
{
FakeAlarmConsumer consumer = new FakeAlarmConsumer();
MxAccessEventQueue queue = new MxAccessEventQueue();
MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink(queue, new MxAccessEventMapper());
AlarmDispatcher dispatcher = new AlarmDispatcher(consumer, sink, SessionId);
dispatcher.Dispose();
Assert.True(consumer.Disposed);
consumer.RaiseTransition(new MxAlarmTransitionEvent
{
PreviousState = MxAlarmStateKind.Unspecified,
Record = new MxAlarmSnapshotRecord
{
AlarmGuid = Guid.NewGuid(),
ProviderName = "Galaxy",
Group = "G",
TagName = "T",
State = MxAlarmStateKind.UnackAlm,
},
});
Assert.Equal(0, queue.Count);
}
private sealed class FakeAlarmConsumer : IMxAccessAlarmConsumer
{
public event EventHandler<MxAlarmTransitionEvent>? AlarmTransitionEmitted;
public string? LastSubscription { get; private set; }
public Guid LastAckGuid { get; private set; }
public string? LastAckComment { get; private set; }
public string? LastAckOperatorName { get; private set; }
public string? LastAckOperatorNode { get; private set; }
public string? LastAckOperatorDomain { get; private set; }
public string? LastAckOperatorFullName { get; private set; }
public int AcknowledgeReturn { get; set; }
public IReadOnlyList<MxAlarmSnapshotRecord> SnapshotResult { get; set; } =
Array.Empty<MxAlarmSnapshotRecord>();
public bool Disposed { get; private set; }
public void RaiseTransition(MxAlarmTransitionEvent transition)
{
AlarmTransitionEmitted?.Invoke(this, transition);
}
public void Subscribe(string subscription)
{
LastSubscription = subscription;
}
public int AcknowledgeByGuid(
Guid alarmGuid,
string ackComment,
string ackOperatorName,
string ackOperatorNode,
string ackOperatorDomain,
string ackOperatorFullName)
{
LastAckGuid = alarmGuid;
LastAckComment = ackComment;
LastAckOperatorName = ackOperatorName;
LastAckOperatorNode = ackOperatorNode;
LastAckOperatorDomain = ackOperatorDomain;
LastAckOperatorFullName = ackOperatorFullName;
return AcknowledgeReturn;
}
public IReadOnlyList<MxAlarmSnapshotRecord> SnapshotActiveAlarms()
{
return SnapshotResult;
}
public void Dispose()
{
Disposed = true;
}
}
}
@@ -0,0 +1,190 @@
using System;
using System.Collections.Generic;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.MxAccess;
/// <summary>
/// In-process dispatcher that owns the lifetime of an
/// <see cref="IMxAccessAlarmConsumer"/> + <see cref="MxAccessAlarmEventSink"/>
/// pair, and wires the consumer's <c>AlarmTransitionEmitted</c> stream
/// onto the sink's <c>EnqueueTransition</c> path so transitions land on
/// the worker's <see cref="MxAccessEventQueue"/> as proto
/// <see cref="OnAlarmTransitionEvent"/> messages ready for IPC dispatch.
/// </summary>
/// <remarks>
/// <para>
/// This is the in-process slice of A.3 — it proves the
/// consumer→sink→queue pipeline end-to-end without touching the
/// worker's IPC command framing. The companion follow-up PR adds
/// <c>SubscribeAlarmsCommand</c> / <c>AcknowledgeAlarmCommand</c> /
/// <c>QueryActiveAlarmsCommand</c> proto entries plus the gateway-
/// side <c>WorkerAlarmRpcDispatcher</c> that issues them.
/// </para>
/// <para>
/// Threading: <see cref="WnWrapAlarmConsumer"/> polls on a
/// <see cref="System.Threading.Timer"/> thread today; production
/// hosting should marshal the consumer onto the worker's STA via
/// <c>StaRuntime.InvokeAsync</c>. The dispatcher itself is purely
/// a pass-through, so it inherits whatever thread the consumer's
/// event handler fires on. Fan-out into <c>EnqueueTransition</c>
/// uses <see cref="MxAccessEventQueue.Enqueue"/> which is
/// thread-safe.
/// </para>
/// </remarks>
public sealed class AlarmDispatcher : IDisposable
{
private readonly IMxAccessAlarmConsumer consumer;
private readonly MxAccessAlarmEventSink sink;
private readonly string sessionId;
private readonly EventHandler<MxAlarmTransitionEvent> handler;
private bool disposed;
public AlarmDispatcher(
IMxAccessAlarmConsumer consumer,
MxAccessAlarmEventSink sink,
string sessionId)
{
this.consumer = consumer ?? throw new ArgumentNullException(nameof(consumer));
this.sink = sink ?? throw new ArgumentNullException(nameof(sink));
this.sessionId = sessionId ?? string.Empty;
// Sink.Attach is the seam that propagates the session id onto the
// proto SessionId field of every emitted MxEvent. Pass the consumer
// as the "associated COM object" — sink ignores the object reference
// for the alarm path, but the existing IMxAccessEventSink contract
// requires a non-null first arg.
this.sink.Attach(this.consumer, this.sessionId);
this.handler = OnTransition;
consumer.AlarmTransitionEmitted += handler;
}
/// <summary>
/// Begin polling the configured AVEVA alarm provider for
/// transitions. The supplied subscription expression follows the
/// canonical <c>\\&lt;machine&gt;\Galaxy!&lt;area&gt;</c> format.
/// </summary>
public void Subscribe(string subscription)
{
if (disposed) throw new ObjectDisposedException(nameof(AlarmDispatcher));
consumer.Subscribe(subscription);
}
/// <summary>
/// Forward an <c>AcknowledgeAlarm</c> request to the underlying
/// consumer's <c>AlarmAckByGUID</c>. Returns the AVEVA-native
/// status code (0 = success).
/// </summary>
public int Acknowledge(
Guid alarmGuid,
string ackComment,
string ackOperatorName,
string ackOperatorNode,
string ackOperatorDomain,
string ackOperatorFullName)
{
if (disposed) throw new ObjectDisposedException(nameof(AlarmDispatcher));
return consumer.AcknowledgeByGuid(
alarmGuid,
ackComment,
ackOperatorName,
ackOperatorNode,
ackOperatorDomain,
ackOperatorFullName);
}
/// <summary>
/// Snapshot the currently-active alarm set as
/// <see cref="ActiveAlarmSnapshot"/> protos for the
/// <c>QueryActiveAlarms</c> RPC's ConditionRefresh stream.
/// </summary>
public IReadOnlyList<ActiveAlarmSnapshot> SnapshotActiveAlarms()
{
if (disposed) throw new ObjectDisposedException(nameof(AlarmDispatcher));
IReadOnlyList<MxAlarmSnapshotRecord> records = consumer.SnapshotActiveAlarms();
if (records.Count == 0) return Array.Empty<ActiveAlarmSnapshot>();
List<ActiveAlarmSnapshot> snapshots = new List<ActiveAlarmSnapshot>(records.Count);
foreach (MxAlarmSnapshotRecord record in records)
{
snapshots.Add(MapToSnapshot(record));
}
return snapshots;
}
private void OnTransition(object? sender, MxAlarmTransitionEvent transition)
{
if (disposed) return;
if (transition is null) return;
MxAlarmSnapshotRecord record = transition.Record;
AlarmTransitionKind kind = AlarmRecordTransitionMapper.MapTransition(
transition.PreviousState, record.State);
if (kind == AlarmTransitionKind.Unspecified) return;
string fullReference = AlarmRecordTransitionMapper.ComposeFullReference(
record.ProviderName, record.Group, record.TagName);
sink.EnqueueTransition(
alarmFullReference: fullReference,
sourceObjectReference: record.TagName,
alarmTypeName: record.Type,
transitionKind: kind,
severity: record.Priority,
originalRaiseTimestampUtc: null,
transitionTimestampUtc: record.TransitionTimestampUtc,
operatorUser: record.OperatorName,
operatorComment: record.AlarmComment,
category: record.Group,
description: string.Empty);
}
private static ActiveAlarmSnapshot MapToSnapshot(MxAlarmSnapshotRecord record)
{
ActiveAlarmSnapshot snapshot = new ActiveAlarmSnapshot
{
AlarmFullReference = AlarmRecordTransitionMapper.ComposeFullReference(
record.ProviderName, record.Group, record.TagName),
SourceObjectReference = record.TagName,
AlarmTypeName = record.Type,
CurrentState = MapConditionState(record.State),
Severity = record.Priority,
OperatorUser = record.OperatorName,
OperatorComment = record.AlarmComment,
Category = record.Group,
Description = string.Empty,
};
if (record.TransitionTimestampUtc != DateTime.MinValue)
{
snapshot.LastTransitionTimestamp =
Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(
DateTime.SpecifyKind(record.TransitionTimestampUtc, DateTimeKind.Utc));
}
return snapshot;
}
private static AlarmConditionState MapConditionState(MxAlarmStateKind state)
{
// The proto's AlarmConditionState only distinguishes Active /
// ActiveAcked / Inactive — both Rtn states collapse to Inactive
// (the ack-vs-unack distinction on a cleared alarm is not exposed
// through OPC UA's Part 9 condition state model anyway).
return state switch
{
MxAlarmStateKind.UnackAlm => AlarmConditionState.Active,
MxAlarmStateKind.AckAlm => AlarmConditionState.ActiveAcked,
MxAlarmStateKind.UnackRtn => AlarmConditionState.Inactive,
MxAlarmStateKind.AckRtn => AlarmConditionState.Inactive,
_ => AlarmConditionState.Unspecified,
};
}
public string SessionId => sessionId;
public void Dispose()
{
if (disposed) return;
disposed = true;
try { consumer.AlarmTransitionEmitted -= handler; } catch { /* swallow */ }
try { sink.Detach(); } catch { /* swallow */ }
try { consumer.Dispose(); } catch { /* swallow */ }
}
}