A.3 (in-process slice): AlarmDispatcher wires consumer events onto event queue
Adds the in-process plumbing that connects WnWrapAlarmConsumer's AlarmTransitionEmitted stream to the worker's MxAccessEventQueue via MxAccessAlarmEventSink. With this change a transition raised by the consumer lands as an OnAlarmTransitionEvent proto on the queue, SessionId attached, ready for IPC dispatch. Mapping: provider!group.tag → AlarmFullReference, tag → SourceObjectReference, priority → severity, wnwrap STATE → AlarmConditionState (Active / ActiveAcked / Inactive — wnwrap's ack-vs-unack-on-cleared distinction collapses since OPC UA Part 9 doesn't model it). State delta drives AlarmTransitionKind via the existing AlarmRecordTransitionMapper table. Holding off on the proto IPC additions (SubscribeAlarms / AcknowledgeAlarm / QueryActiveAlarms commands + WorkerAlarmRpcDispatcher) for a follow-up — those touch every layer of the worker IPC and warrant their own PR. This slice proves the consumer→sink→queue pipeline end-to-end with unit tests and clears the path for the proto additions to plug in cleanly. Tests: 10 new unit tests cover field-by-field mapping, the "unchanged-state-doesn't-emit" filter, the state→transition kind table, Subscribe / Acknowledge passthrough, SnapshotActiveAlarms → proto ActiveAlarmSnapshot mapping, and Dispose detaches the handler. All passing; total worker test count 172/3 skip / 1 pre-existing structure fail (untouched). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,190 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Worker.MxAccess;
|
||||
|
||||
/// <summary>
|
||||
/// In-process dispatcher that owns the lifetime of an
|
||||
/// <see cref="IMxAccessAlarmConsumer"/> + <see cref="MxAccessAlarmEventSink"/>
|
||||
/// pair, and wires the consumer's <c>AlarmTransitionEmitted</c> stream
|
||||
/// onto the sink's <c>EnqueueTransition</c> path so transitions land on
|
||||
/// the worker's <see cref="MxAccessEventQueue"/> as proto
|
||||
/// <see cref="OnAlarmTransitionEvent"/> messages ready for IPC dispatch.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// This is the in-process slice of A.3 — it proves the
|
||||
/// consumer→sink→queue pipeline end-to-end without touching the
|
||||
/// worker's IPC command framing. The companion follow-up PR adds
|
||||
/// <c>SubscribeAlarmsCommand</c> / <c>AcknowledgeAlarmCommand</c> /
|
||||
/// <c>QueryActiveAlarmsCommand</c> proto entries plus the gateway-
|
||||
/// side <c>WorkerAlarmRpcDispatcher</c> that issues them.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Threading: <see cref="WnWrapAlarmConsumer"/> polls on a
|
||||
/// <see cref="System.Threading.Timer"/> thread today; production
|
||||
/// hosting should marshal the consumer onto the worker's STA via
|
||||
/// <c>StaRuntime.InvokeAsync</c>. The dispatcher itself is purely
|
||||
/// a pass-through, so it inherits whatever thread the consumer's
|
||||
/// event handler fires on. Fan-out into <c>EnqueueTransition</c>
|
||||
/// uses <see cref="MxAccessEventQueue.Enqueue"/> which is
|
||||
/// thread-safe.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class AlarmDispatcher : IDisposable
|
||||
{
|
||||
private readonly IMxAccessAlarmConsumer consumer;
|
||||
private readonly MxAccessAlarmEventSink sink;
|
||||
private readonly string sessionId;
|
||||
private readonly EventHandler<MxAlarmTransitionEvent> handler;
|
||||
private bool disposed;
|
||||
|
||||
public AlarmDispatcher(
|
||||
IMxAccessAlarmConsumer consumer,
|
||||
MxAccessAlarmEventSink sink,
|
||||
string sessionId)
|
||||
{
|
||||
this.consumer = consumer ?? throw new ArgumentNullException(nameof(consumer));
|
||||
this.sink = sink ?? throw new ArgumentNullException(nameof(sink));
|
||||
this.sessionId = sessionId ?? string.Empty;
|
||||
// Sink.Attach is the seam that propagates the session id onto the
|
||||
// proto SessionId field of every emitted MxEvent. Pass the consumer
|
||||
// as the "associated COM object" — sink ignores the object reference
|
||||
// for the alarm path, but the existing IMxAccessEventSink contract
|
||||
// requires a non-null first arg.
|
||||
this.sink.Attach(this.consumer, this.sessionId);
|
||||
this.handler = OnTransition;
|
||||
consumer.AlarmTransitionEmitted += handler;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Begin polling the configured AVEVA alarm provider for
|
||||
/// transitions. The supplied subscription expression follows the
|
||||
/// canonical <c>\\<machine>\Galaxy!<area></c> format.
|
||||
/// </summary>
|
||||
public void Subscribe(string subscription)
|
||||
{
|
||||
if (disposed) throw new ObjectDisposedException(nameof(AlarmDispatcher));
|
||||
consumer.Subscribe(subscription);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Forward an <c>AcknowledgeAlarm</c> request to the underlying
|
||||
/// consumer's <c>AlarmAckByGUID</c>. Returns the AVEVA-native
|
||||
/// status code (0 = success).
|
||||
/// </summary>
|
||||
public int Acknowledge(
|
||||
Guid alarmGuid,
|
||||
string ackComment,
|
||||
string ackOperatorName,
|
||||
string ackOperatorNode,
|
||||
string ackOperatorDomain,
|
||||
string ackOperatorFullName)
|
||||
{
|
||||
if (disposed) throw new ObjectDisposedException(nameof(AlarmDispatcher));
|
||||
return consumer.AcknowledgeByGuid(
|
||||
alarmGuid,
|
||||
ackComment,
|
||||
ackOperatorName,
|
||||
ackOperatorNode,
|
||||
ackOperatorDomain,
|
||||
ackOperatorFullName);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Snapshot the currently-active alarm set as
|
||||
/// <see cref="ActiveAlarmSnapshot"/> protos for the
|
||||
/// <c>QueryActiveAlarms</c> RPC's ConditionRefresh stream.
|
||||
/// </summary>
|
||||
public IReadOnlyList<ActiveAlarmSnapshot> SnapshotActiveAlarms()
|
||||
{
|
||||
if (disposed) throw new ObjectDisposedException(nameof(AlarmDispatcher));
|
||||
IReadOnlyList<MxAlarmSnapshotRecord> records = consumer.SnapshotActiveAlarms();
|
||||
if (records.Count == 0) return Array.Empty<ActiveAlarmSnapshot>();
|
||||
List<ActiveAlarmSnapshot> snapshots = new List<ActiveAlarmSnapshot>(records.Count);
|
||||
foreach (MxAlarmSnapshotRecord record in records)
|
||||
{
|
||||
snapshots.Add(MapToSnapshot(record));
|
||||
}
|
||||
return snapshots;
|
||||
}
|
||||
|
||||
private void OnTransition(object? sender, MxAlarmTransitionEvent transition)
|
||||
{
|
||||
if (disposed) return;
|
||||
if (transition is null) return;
|
||||
|
||||
MxAlarmSnapshotRecord record = transition.Record;
|
||||
AlarmTransitionKind kind = AlarmRecordTransitionMapper.MapTransition(
|
||||
transition.PreviousState, record.State);
|
||||
if (kind == AlarmTransitionKind.Unspecified) return;
|
||||
|
||||
string fullReference = AlarmRecordTransitionMapper.ComposeFullReference(
|
||||
record.ProviderName, record.Group, record.TagName);
|
||||
|
||||
sink.EnqueueTransition(
|
||||
alarmFullReference: fullReference,
|
||||
sourceObjectReference: record.TagName,
|
||||
alarmTypeName: record.Type,
|
||||
transitionKind: kind,
|
||||
severity: record.Priority,
|
||||
originalRaiseTimestampUtc: null,
|
||||
transitionTimestampUtc: record.TransitionTimestampUtc,
|
||||
operatorUser: record.OperatorName,
|
||||
operatorComment: record.AlarmComment,
|
||||
category: record.Group,
|
||||
description: string.Empty);
|
||||
}
|
||||
|
||||
private static ActiveAlarmSnapshot MapToSnapshot(MxAlarmSnapshotRecord record)
|
||||
{
|
||||
ActiveAlarmSnapshot snapshot = new ActiveAlarmSnapshot
|
||||
{
|
||||
AlarmFullReference = AlarmRecordTransitionMapper.ComposeFullReference(
|
||||
record.ProviderName, record.Group, record.TagName),
|
||||
SourceObjectReference = record.TagName,
|
||||
AlarmTypeName = record.Type,
|
||||
CurrentState = MapConditionState(record.State),
|
||||
Severity = record.Priority,
|
||||
OperatorUser = record.OperatorName,
|
||||
OperatorComment = record.AlarmComment,
|
||||
Category = record.Group,
|
||||
Description = string.Empty,
|
||||
};
|
||||
if (record.TransitionTimestampUtc != DateTime.MinValue)
|
||||
{
|
||||
snapshot.LastTransitionTimestamp =
|
||||
Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(
|
||||
DateTime.SpecifyKind(record.TransitionTimestampUtc, DateTimeKind.Utc));
|
||||
}
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
private static AlarmConditionState MapConditionState(MxAlarmStateKind state)
|
||||
{
|
||||
// The proto's AlarmConditionState only distinguishes Active /
|
||||
// ActiveAcked / Inactive — both Rtn states collapse to Inactive
|
||||
// (the ack-vs-unack distinction on a cleared alarm is not exposed
|
||||
// through OPC UA's Part 9 condition state model anyway).
|
||||
return state switch
|
||||
{
|
||||
MxAlarmStateKind.UnackAlm => AlarmConditionState.Active,
|
||||
MxAlarmStateKind.AckAlm => AlarmConditionState.ActiveAcked,
|
||||
MxAlarmStateKind.UnackRtn => AlarmConditionState.Inactive,
|
||||
MxAlarmStateKind.AckRtn => AlarmConditionState.Inactive,
|
||||
_ => AlarmConditionState.Unspecified,
|
||||
};
|
||||
}
|
||||
|
||||
public string SessionId => sessionId;
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (disposed) return;
|
||||
disposed = true;
|
||||
try { consumer.AlarmTransitionEmitted -= handler; } catch { /* swallow */ }
|
||||
try { sink.Detach(); } catch { /* swallow */ }
|
||||
try { consumer.Dispose(); } catch { /* swallow */ }
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user