worker(alarms): SubtagAlarmConsumer synthesizing degraded transitions; dispatcher propagates Degraded
This commit is contained in:
@@ -191,7 +191,8 @@ public sealed class AlarmDispatcher : IDisposable
|
||||
operatorUser: record.OperatorName,
|
||||
operatorComment: record.AlarmComment,
|
||||
category: record.Group,
|
||||
description: string.Empty);
|
||||
description: string.Empty,
|
||||
degraded: record.Degraded);
|
||||
}
|
||||
|
||||
private static ActiveAlarmSnapshot MapToSnapshot(MxAlarmSnapshotRecord record)
|
||||
|
||||
@@ -0,0 +1,323 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
||||
|
||||
namespace ZB.MOM.WW.MxGateway.Worker.MxAccess;
|
||||
|
||||
/// <summary>
|
||||
/// Subtag-fallback implementation of <see cref="IMxAccessAlarmConsumer"/>.
|
||||
/// Where <see cref="WnWrapAlarmConsumer"/> polls the native alarmmgr
|
||||
/// (wnwrap) COM stream, this consumer advises a configured set of MXAccess
|
||||
/// alarm subtags via an <see cref="ISubtagAlarmSource"/> and synthesizes
|
||||
/// alarm transitions through a <see cref="SubtagAlarmStateMachine"/>. Every
|
||||
/// emitted record is flagged <see cref="MxAlarmSnapshotRecord.Degraded"/>
|
||||
/// and assigned a deterministic <see cref="SyntheticAlarmGuid"/> derived
|
||||
/// from the alarm's full reference, since the subtag path has no
|
||||
/// alarmmgr-supplied GUID.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Threading: like <see cref="WnWrapAlarmConsumer"/>, this consumer is
|
||||
/// driven on the worker's STA thread. <see cref="Subscribe"/> binds the
|
||||
/// source's <c>ValueChanged</c> event; the source raises that event on
|
||||
/// the same STA in production, so <see cref="AlarmTransitionEmitted"/>
|
||||
/// fires on the STA and subscribers must marshal off it themselves if
|
||||
/// they need another thread.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// The subtag path is event-driven, so <see cref="PollOnce"/> is a
|
||||
/// no-op — value changes arrive through the source's advise stream
|
||||
/// rather than an explicit poll. Acknowledgment in subtag mode writes
|
||||
/// the comment to the target's writable ack-comment subtag rather than
|
||||
/// calling a native <c>AlarmAckByGUID</c> / <c>AlarmAckByName</c>.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class SubtagAlarmConsumer : IMxAccessAlarmConsumer
|
||||
{
|
||||
private readonly ISubtagAlarmSource source;
|
||||
private readonly SubtagAlarmStateMachine stateMachine;
|
||||
private readonly Dictionary<string, AlarmSubtagTarget> targetsByReference;
|
||||
private readonly Dictionary<Guid, string> referencesBySyntheticGuid;
|
||||
private readonly EventHandler<SubtagValueChange> valueChangedHandler;
|
||||
private bool subscribed;
|
||||
private bool disposed;
|
||||
|
||||
/// <summary>Fires once per synthesized alarm-state transition.</summary>
|
||||
public event EventHandler<MxAlarmTransitionEvent>? AlarmTransitionEmitted;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes the consumer over a subtag source and a watch list of
|
||||
/// alarm targets.
|
||||
/// </summary>
|
||||
/// <param name="source">The subtag value source to advise and write through.</param>
|
||||
/// <param name="watchList">The alarm subtag targets to observe.</param>
|
||||
/// <exception cref="ArgumentNullException">
|
||||
/// Thrown when <paramref name="source"/> or <paramref name="watchList"/>
|
||||
/// is <see langword="null"/>.
|
||||
/// </exception>
|
||||
public SubtagAlarmConsumer(ISubtagAlarmSource source, IReadOnlyList<AlarmSubtagTarget> watchList)
|
||||
{
|
||||
this.source = source ?? throw new ArgumentNullException(nameof(source));
|
||||
if (watchList is null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(watchList));
|
||||
}
|
||||
|
||||
this.stateMachine = new SubtagAlarmStateMachine(watchList);
|
||||
this.targetsByReference = new Dictionary<string, AlarmSubtagTarget>(StringComparer.OrdinalIgnoreCase);
|
||||
this.referencesBySyntheticGuid = new Dictionary<Guid, string>();
|
||||
|
||||
foreach (AlarmSubtagTarget target in watchList)
|
||||
{
|
||||
string reference = target.AlarmFullReference ?? string.Empty;
|
||||
this.targetsByReference[reference] = target;
|
||||
this.referencesBySyntheticGuid[SyntheticAlarmGuid.ForReference(reference)] = reference;
|
||||
}
|
||||
|
||||
this.valueChangedHandler = OnValueChanged;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Advises every observable alarm subtag (active / acked / priority —
|
||||
/// the ack-comment subtag is a write-only target and is not advised)
|
||||
/// and begins listening for value changes. The <paramref name="subscription"/>
|
||||
/// expression is ignored: the subtag set is fixed by the watch list.
|
||||
/// </summary>
|
||||
/// <param name="subscription">The subscription expression (unused in subtag mode).</param>
|
||||
public void Subscribe(string subscription)
|
||||
{
|
||||
if (disposed)
|
||||
{
|
||||
throw new ObjectDisposedException(nameof(SubtagAlarmConsumer));
|
||||
}
|
||||
|
||||
List<string> addresses = new List<string>();
|
||||
foreach (AlarmSubtagTarget target in targetsByReference.Values)
|
||||
{
|
||||
AddIfNotEmpty(addresses, target.ActiveSubtag);
|
||||
AddIfNotEmpty(addresses, target.AckedSubtag);
|
||||
AddIfNotEmpty(addresses, target.PrioritySubtag);
|
||||
}
|
||||
|
||||
source.Advise(addresses);
|
||||
|
||||
if (!subscribed)
|
||||
{
|
||||
source.ValueChanged += valueChangedHandler;
|
||||
subscribed = true;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Acknowledges an alarm by its synthetic GUID. Resolves the GUID back
|
||||
/// to its alarm full reference and delegates to the by-name write path.
|
||||
/// </summary>
|
||||
/// <param name="alarmGuid">The synthetic alarm GUID.</param>
|
||||
/// <param name="ackComment">The acknowledgment comment.</param>
|
||||
/// <param name="ackOperatorName">The operator name (unused in subtag mode).</param>
|
||||
/// <param name="ackOperatorNode">The operator node (unused in subtag mode).</param>
|
||||
/// <param name="ackOperatorDomain">The operator domain (unused in subtag mode).</param>
|
||||
/// <param name="ackOperatorFullName">The operator full name (unused in subtag mode).</param>
|
||||
/// <returns>0 on success; non-zero when the GUID or ack-comment subtag is unknown.</returns>
|
||||
public int AcknowledgeByGuid(
|
||||
Guid alarmGuid,
|
||||
string ackComment,
|
||||
string ackOperatorName,
|
||||
string ackOperatorNode,
|
||||
string ackOperatorDomain,
|
||||
string ackOperatorFullName)
|
||||
{
|
||||
if (disposed)
|
||||
{
|
||||
throw new ObjectDisposedException(nameof(SubtagAlarmConsumer));
|
||||
}
|
||||
|
||||
if (!referencesBySyntheticGuid.TryGetValue(alarmGuid, out string reference) ||
|
||||
!targetsByReference.TryGetValue(reference, out AlarmSubtagTarget target))
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
return WriteAckComment(target, ackComment);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Acknowledges an alarm by its (name, provider, group) tuple. In subtag
|
||||
/// mode the comment is written to the target's writable ack-comment
|
||||
/// subtag; the operator-identity arguments are not surfaced through the
|
||||
/// subtag write.
|
||||
/// </summary>
|
||||
/// <param name="alarmName">The alarm name (object-rooted tag name or any reference fragment).</param>
|
||||
/// <param name="providerName">The provider name (unused for matching).</param>
|
||||
/// <param name="groupName">The group name (unused for matching).</param>
|
||||
/// <param name="ackComment">The acknowledgment comment.</param>
|
||||
/// <param name="ackOperatorName">The operator name (unused in subtag mode).</param>
|
||||
/// <param name="ackOperatorNode">The operator node (unused in subtag mode).</param>
|
||||
/// <param name="ackOperatorDomain">The operator domain (unused in subtag mode).</param>
|
||||
/// <param name="ackOperatorFullName">The operator full name (unused in subtag mode).</param>
|
||||
/// <returns>0 on success; non-zero when no target matches or it lacks an ack-comment subtag.</returns>
|
||||
public int AcknowledgeByName(
|
||||
string alarmName,
|
||||
string providerName,
|
||||
string groupName,
|
||||
string ackComment,
|
||||
string ackOperatorName,
|
||||
string ackOperatorNode,
|
||||
string ackOperatorDomain,
|
||||
string ackOperatorFullName)
|
||||
{
|
||||
if (disposed)
|
||||
{
|
||||
throw new ObjectDisposedException(nameof(SubtagAlarmConsumer));
|
||||
}
|
||||
|
||||
AlarmSubtagTarget? target = ResolveTargetByName(alarmName);
|
||||
if (target is null)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
return WriteAckComment(target, ackComment);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the state machine's currently-active alarm snapshot, with
|
||||
/// each record stamped <see cref="MxAlarmSnapshotRecord.Degraded"/> and
|
||||
/// assigned its synthetic GUID.
|
||||
/// </summary>
|
||||
/// <returns>The active alarm snapshot records.</returns>
|
||||
public IReadOnlyList<MxAlarmSnapshotRecord> SnapshotActiveAlarms()
|
||||
{
|
||||
IReadOnlyList<MxAlarmSnapshotRecord> records = stateMachine.SnapshotActive();
|
||||
foreach (MxAlarmSnapshotRecord record in records)
|
||||
{
|
||||
StampSynthetic(record);
|
||||
}
|
||||
|
||||
return records;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// No-op: the subtag path is event-driven and owns no poll cadence.
|
||||
/// </summary>
|
||||
public void PollOnce()
|
||||
{
|
||||
// Subtag mode is event-driven; value changes arrive via the source's
|
||||
// advise stream, so there is nothing to poll.
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void Dispose()
|
||||
{
|
||||
if (disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
disposed = true;
|
||||
|
||||
if (subscribed)
|
||||
{
|
||||
try
|
||||
{
|
||||
source.ValueChanged -= valueChangedHandler;
|
||||
}
|
||||
catch
|
||||
{
|
||||
// swallow — best-effort detach during dispose
|
||||
}
|
||||
|
||||
subscribed = false;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
source.Dispose();
|
||||
}
|
||||
catch
|
||||
{
|
||||
// swallow — best-effort source dispose
|
||||
}
|
||||
}
|
||||
|
||||
private void OnValueChanged(object? sender, SubtagValueChange change)
|
||||
{
|
||||
if (disposed || change is null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
IReadOnlyList<MxAlarmTransitionEvent> transitions =
|
||||
stateMachine.Apply(change.ItemAddress, change.Value, change.TimestampUtc);
|
||||
if (transitions.Count == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
EventHandler<MxAlarmTransitionEvent>? handler = AlarmTransitionEmitted;
|
||||
foreach (MxAlarmTransitionEvent transition in transitions)
|
||||
{
|
||||
StampSynthetic(transition.Record);
|
||||
handler?.Invoke(this, transition);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stamps the degraded flag and synthetic GUID onto a synthesized
|
||||
/// record. The state machine sets only the provider / group / tag-name
|
||||
/// parts, so the alarm reference is recomposed exactly as
|
||||
/// <see cref="AlarmDispatcher"/> composes it — keying the synthetic GUID
|
||||
/// off that recomposed reference keeps it stable across transitions and
|
||||
/// snapshots for the same alarm.
|
||||
/// </summary>
|
||||
private static void StampSynthetic(MxAlarmSnapshotRecord record)
|
||||
{
|
||||
record.Degraded = true;
|
||||
string reference = AlarmRecordTransitionMapper.ComposeFullReference(
|
||||
record.ProviderName, record.Group, record.TagName);
|
||||
record.AlarmGuid = SyntheticAlarmGuid.ForReference(reference);
|
||||
}
|
||||
|
||||
private AlarmSubtagTarget? ResolveTargetByName(string? alarmName)
|
||||
{
|
||||
if (string.IsNullOrEmpty(alarmName))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
// Match a target whose full reference contains the supplied name. The
|
||||
// dispatcher derives (name, provider, group) from the composed
|
||||
// reference, so the object-rooted tag name is always a substring of the
|
||||
// target's AlarmFullReference.
|
||||
foreach (AlarmSubtagTarget target in targetsByReference.Values)
|
||||
{
|
||||
string reference = target.AlarmFullReference ?? string.Empty;
|
||||
if (reference.IndexOf(alarmName!, StringComparison.OrdinalIgnoreCase) >= 0)
|
||||
{
|
||||
return target;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private int WriteAckComment(AlarmSubtagTarget target, string ackComment)
|
||||
{
|
||||
string ackCommentSubtag = target.AckCommentSubtag ?? string.Empty;
|
||||
if (string.IsNullOrEmpty(ackCommentSubtag))
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
source.Write(ackCommentSubtag, ackComment);
|
||||
return 0;
|
||||
}
|
||||
|
||||
private static void AddIfNotEmpty(List<string> addresses, string? address)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(address))
|
||||
{
|
||||
addresses.Add(address!);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user