Files
mxaccessgw/src/MxGateway.Worker/MxAccess/AlarmDispatcher.cs
T
Joseph Doherty 4e02927f01 A.3 (alarm-ack-by-name): public AcknowledgeAlarm now accepts Provider!Group.Tag references
Closes the gap where the public AcknowledgeAlarm RPC required canonical
GUIDs but OnAlarmTransitionEvent.AlarmFullReference is "Provider!Group.Tag".
Adds an AVEVA AlarmAckByName path that wraps wwAlarmConsumerClass.AlarmAckByName
so callers can ack with the natural reference.

Proto:
- New MxCommandKind.AcknowledgeAlarmByName (=29).
- New AcknowledgeAlarmByNameCommand(alarm_name, provider_name, group_name,
  comment, operator_user/node/domain/full_name) on MxCommand oneof.
- AcknowledgeAlarmReplyPayload (existing) carries the AVEVA native
  status; reused for the by-name path.

Worker:
- IMxAccessAlarmConsumer + WnWrapAlarmConsumer + AlarmDispatcher +
  AlarmCommandHandler all gain an AcknowledgeByName(name, provider,
  group, comment, operator-identity) overload that maps to
  wwAlarmConsumerClass.AlarmAckByName.
- MxAccessCommandExecutor: new switch arm routes
  MxCommandKind.AcknowledgeAlarmByName to the handler. Empty alarm_name
  yields InvalidRequest; handler exceptions surface as MxaccessFailure.

Gateway:
- WorkerAlarmRpcDispatcher.TryParseAlarmReference: parses
  "Provider!Group.Tag" with the convention that the FIRST '!' separates
  provider, the FIRST '.' after '!' separates group; tag may contain
  more dots.
- AcknowledgeAsync now branches: GUID input → AcknowledgeAlarm command
  (existing path); reference input → AcknowledgeAlarmByName command
  (new path); neither parses → InvalidRequest with a clear diagnostic.

Tests: 13 new unit tests cover each layer end-to-end:
- WorkerAlarmRpcDispatcher.TryParseAlarmReference (3 valid + 8 invalid
  forms) including the realistic 4-component "Galaxy!TestArea.
  TestMachine_001.TestAlarm001" reference.
- WorkerAlarmRpcDispatcher.AcknowledgeAsync routes references through
  AcknowledgeAlarmByName + propagates the full operator tuple.
- Executor switch arm carries the by-name tuple and rejects empty
  alarm_name.
- AlarmDispatcher.AcknowledgeByName forwards to consumer.
- Existing fakes extended for the new overload.

Counts: server 308/0, worker 195/3 skip / 1 pre-existing structure-fail
(untouched). Solution builds clean.

End-to-end alarms-over-gateway now serves the full lmxopcua flow:
client.AcknowledgeAlarm(reference="Galaxy!TestArea.TestMachine_001.TestAlarm001",
operator_user="alice") → gateway parses → IPC AcknowledgeAlarmByName →
worker AlarmAckByName → AVEVA history. The remaining piece for full
parity is a live dev-rig smoke test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 11:17:15 -04:00

218 lines
8.6 KiB
C#

using System;
using System.Collections.Generic;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.MxAccess;
/// <summary>
/// In-process dispatcher that owns the lifetime of an
/// <see cref="IMxAccessAlarmConsumer"/> + <see cref="MxAccessAlarmEventSink"/>
/// pair, and wires the consumer's <c>AlarmTransitionEmitted</c> stream
/// onto the sink's <c>EnqueueTransition</c> path so transitions land on
/// the worker's <see cref="MxAccessEventQueue"/> as proto
/// <see cref="OnAlarmTransitionEvent"/> messages ready for IPC dispatch.
/// </summary>
/// <remarks>
/// <para>
/// This is the in-process slice of A.3 — it proves the
/// consumer→sink→queue pipeline end-to-end without touching the
/// worker's IPC command framing. The companion follow-up PR adds
/// <c>SubscribeAlarmsCommand</c> / <c>AcknowledgeAlarmCommand</c> /
/// <c>QueryActiveAlarmsCommand</c> proto entries plus the gateway-
/// side <c>WorkerAlarmRpcDispatcher</c> that issues them.
/// </para>
/// <para>
/// Threading: <see cref="WnWrapAlarmConsumer"/> polls on a
/// <see cref="System.Threading.Timer"/> thread today; production
/// hosting should marshal the consumer onto the worker's STA via
/// <c>StaRuntime.InvokeAsync</c>. The dispatcher itself is purely
/// a pass-through, so it inherits whatever thread the consumer's
/// event handler fires on. Fan-out into <c>EnqueueTransition</c>
/// uses <see cref="MxAccessEventQueue.Enqueue"/> which is
/// thread-safe.
/// </para>
/// </remarks>
public sealed class AlarmDispatcher : IDisposable
{
private readonly IMxAccessAlarmConsumer consumer;
private readonly MxAccessAlarmEventSink sink;
private readonly string sessionId;
private readonly EventHandler<MxAlarmTransitionEvent> handler;
private bool disposed;
public AlarmDispatcher(
IMxAccessAlarmConsumer consumer,
MxAccessAlarmEventSink sink,
string sessionId)
{
this.consumer = consumer ?? throw new ArgumentNullException(nameof(consumer));
this.sink = sink ?? throw new ArgumentNullException(nameof(sink));
this.sessionId = sessionId ?? string.Empty;
// Sink.Attach is the seam that propagates the session id onto the
// proto SessionId field of every emitted MxEvent. Pass the consumer
// as the "associated COM object" — sink ignores the object reference
// for the alarm path, but the existing IMxAccessEventSink contract
// requires a non-null first arg.
this.sink.Attach(this.consumer, this.sessionId);
this.handler = OnTransition;
consumer.AlarmTransitionEmitted += handler;
}
/// <summary>
/// Begin polling the configured AVEVA alarm provider for
/// transitions. The supplied subscription expression follows the
/// canonical <c>\\&lt;machine&gt;\Galaxy!&lt;area&gt;</c> format.
/// </summary>
public void Subscribe(string subscription)
{
if (disposed) throw new ObjectDisposedException(nameof(AlarmDispatcher));
consumer.Subscribe(subscription);
}
/// <summary>
/// Forward an <c>AcknowledgeAlarm</c> request to the underlying
/// consumer's <c>AlarmAckByGUID</c>. Returns the AVEVA-native
/// status code (0 = success).
/// </summary>
public int Acknowledge(
Guid alarmGuid,
string ackComment,
string ackOperatorName,
string ackOperatorNode,
string ackOperatorDomain,
string ackOperatorFullName)
{
if (disposed) throw new ObjectDisposedException(nameof(AlarmDispatcher));
return consumer.AcknowledgeByGuid(
alarmGuid,
ackComment,
ackOperatorName,
ackOperatorNode,
ackOperatorDomain,
ackOperatorFullName);
}
/// <summary>
/// Acknowledge an alarm by its (name, provider, group) tuple.
/// Routes to the consumer's <c>AcknowledgeByName</c> path which
/// maps to <c>wwAlarmConsumerClass.AlarmAckByName</c>.
/// </summary>
public int AcknowledgeByName(
string alarmName,
string providerName,
string groupName,
string ackComment,
string ackOperatorName,
string ackOperatorNode,
string ackOperatorDomain,
string ackOperatorFullName)
{
if (disposed) throw new ObjectDisposedException(nameof(AlarmDispatcher));
return consumer.AcknowledgeByName(
alarmName,
providerName,
groupName,
ackComment,
ackOperatorName,
ackOperatorNode,
ackOperatorDomain,
ackOperatorFullName);
}
/// <summary>
/// Snapshot the currently-active alarm set as
/// <see cref="ActiveAlarmSnapshot"/> protos for the
/// <c>QueryActiveAlarms</c> RPC's ConditionRefresh stream.
/// </summary>
public IReadOnlyList<ActiveAlarmSnapshot> SnapshotActiveAlarms()
{
if (disposed) throw new ObjectDisposedException(nameof(AlarmDispatcher));
IReadOnlyList<MxAlarmSnapshotRecord> records = consumer.SnapshotActiveAlarms();
if (records.Count == 0) return Array.Empty<ActiveAlarmSnapshot>();
List<ActiveAlarmSnapshot> snapshots = new List<ActiveAlarmSnapshot>(records.Count);
foreach (MxAlarmSnapshotRecord record in records)
{
snapshots.Add(MapToSnapshot(record));
}
return snapshots;
}
private void OnTransition(object? sender, MxAlarmTransitionEvent transition)
{
if (disposed) return;
if (transition is null) return;
MxAlarmSnapshotRecord record = transition.Record;
AlarmTransitionKind kind = AlarmRecordTransitionMapper.MapTransition(
transition.PreviousState, record.State);
if (kind == AlarmTransitionKind.Unspecified) return;
string fullReference = AlarmRecordTransitionMapper.ComposeFullReference(
record.ProviderName, record.Group, record.TagName);
sink.EnqueueTransition(
alarmFullReference: fullReference,
sourceObjectReference: record.TagName,
alarmTypeName: record.Type,
transitionKind: kind,
severity: record.Priority,
originalRaiseTimestampUtc: null,
transitionTimestampUtc: record.TransitionTimestampUtc,
operatorUser: record.OperatorName,
operatorComment: record.AlarmComment,
category: record.Group,
description: string.Empty);
}
private static ActiveAlarmSnapshot MapToSnapshot(MxAlarmSnapshotRecord record)
{
ActiveAlarmSnapshot snapshot = new ActiveAlarmSnapshot
{
AlarmFullReference = AlarmRecordTransitionMapper.ComposeFullReference(
record.ProviderName, record.Group, record.TagName),
SourceObjectReference = record.TagName,
AlarmTypeName = record.Type,
CurrentState = MapConditionState(record.State),
Severity = record.Priority,
OperatorUser = record.OperatorName,
OperatorComment = record.AlarmComment,
Category = record.Group,
Description = string.Empty,
};
if (record.TransitionTimestampUtc != DateTime.MinValue)
{
snapshot.LastTransitionTimestamp =
Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(
DateTime.SpecifyKind(record.TransitionTimestampUtc, DateTimeKind.Utc));
}
return snapshot;
}
private static AlarmConditionState MapConditionState(MxAlarmStateKind state)
{
// The proto's AlarmConditionState only distinguishes Active /
// ActiveAcked / Inactive — both Rtn states collapse to Inactive
// (the ack-vs-unack distinction on a cleared alarm is not exposed
// through OPC UA's Part 9 condition state model anyway).
return state switch
{
MxAlarmStateKind.UnackAlm => AlarmConditionState.Active,
MxAlarmStateKind.AckAlm => AlarmConditionState.ActiveAcked,
MxAlarmStateKind.UnackRtn => AlarmConditionState.Inactive,
MxAlarmStateKind.AckRtn => AlarmConditionState.Inactive,
_ => AlarmConditionState.Unspecified,
};
}
public string SessionId => sessionId;
public void Dispose()
{
if (disposed) return;
disposed = true;
try { consumer.AlarmTransitionEmitted -= handler; } catch { /* swallow */ }
try { sink.Detach(); } catch { /* swallow */ }
try { consumer.Dispose(); } catch { /* swallow */ }
}
}