driver-galaxy: EventPump dispatches OnAlarmTransition family (PR B.1)
Second PR of the alarms-over-gateway epic (docs/plans/alarms-over-gateway.md). Depends on PR A.1 in mxaccessgw (merged) which added the OnAlarmTransitionEvent body + family. No runtime impact yet — the gateway doesn't emit the new family until A.3 ships; this PR just stops dropping it on the floor. EventPump.Dispatch becomes a switch on MxEventFamily. The new DispatchAlarmTransition decodes the proto event, runs the raw severity through MxAccessSeverityMapper (the same four-bucket ladder v1 used — 250/500/750/1000 boundaries per docs/v1/AlarmTracking.md), and fires an internal OnAlarmTransition event with a GalaxyAlarmTransition record carrying the full payload. Body absent or transition-kind unspecified → counted via galaxy.alarm_transitions.decoding_failures and dropped. Gateway version skew or worker malformed event therefore degrades to "fall back to the sub-attribute path" rather than crashing the pump. GalaxyDriver consumes the internal event in PR B.2 (next), wrapping it onto IAlarmSource.OnAlarmEvent. The richer fields (operator user + comment, original raise time, category) become visible on the OPC UA Part 9 condition once AlarmEventArgs gets extended in E.7. Tests: - MxAccessSeverityMapperTests — full bucket ladder + clamp behaviour for negative + out-of-range inputs. - EventPumpAlarmTests — raise/ack/clear sequence dispatches in order with operator metadata + original-raise preserved; unspecified kind drops; missing body drops; mixed data-change + alarm streams dispatch independently; OnWriteComplete / OperationComplete filtered out. Full Driver.Galaxy.Tests suite: 196 passed (was 191 — 5 new tests). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -45,6 +45,12 @@ internal sealed class EventPump : IAsyncDisposable
|
||||
private static readonly Counter<long> EventsDropped =
|
||||
Meter.CreateCounter<long>("galaxy.events.dropped", unit: "{event}",
|
||||
description: "MxEvents dropped because the bounded channel was full (newest-dropped).");
|
||||
private static readonly Counter<long> AlarmTransitionsReceived =
|
||||
Meter.CreateCounter<long>("galaxy.alarm_transitions.received", unit: "{event}",
|
||||
description: "OnAlarmTransition events decoded and forwarded to driver-level handlers.");
|
||||
private static readonly Counter<long> AlarmTransitionsDecodingFailures =
|
||||
Meter.CreateCounter<long>("galaxy.alarm_transitions.decoding_failures", unit: "{event}",
|
||||
description: "OnAlarmTransition events that arrived without a populated body or with an unspecified transition kind.");
|
||||
|
||||
private readonly IGalaxySubscriber _subscriber;
|
||||
private readonly SubscriptionRegistry _registry;
|
||||
@@ -60,6 +66,15 @@ internal sealed class EventPump : IAsyncDisposable
|
||||
|
||||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||
|
||||
/// <summary>
|
||||
/// Fires for every <see cref="MxEventFamily.OnAlarmTransition"/> event the
|
||||
/// gateway forwards. Decoded into a <see cref="GalaxyAlarmTransition"/> with
|
||||
/// the OPC UA severity bucket already mapped via
|
||||
/// <see cref="MxAccessSeverityMapper"/>. The driver wraps this onto
|
||||
/// <c>IAlarmSource.OnAlarmEvent</c> in PR B.2.
|
||||
/// </summary>
|
||||
internal event EventHandler<GalaxyAlarmTransition>? OnAlarmTransition;
|
||||
|
||||
public EventPump(
|
||||
IGalaxySubscriber subscriber,
|
||||
SubscriptionRegistry registry,
|
||||
@@ -159,11 +174,24 @@ internal sealed class EventPump : IAsyncDisposable
|
||||
|
||||
private void Dispatch(MxEvent ev)
|
||||
{
|
||||
// Only OnDataChange events fan out to driver subscriptions today. OnWriteComplete
|
||||
// / OperationComplete / OnBufferedDataChange are filtered out — write callers get
|
||||
// their reply via the InvokeAsync round-trip, not via the event stream.
|
||||
if (ev.Family != MxEventFamily.OnDataChange) return;
|
||||
switch (ev.Family)
|
||||
{
|
||||
case MxEventFamily.OnDataChange:
|
||||
DispatchDataChange(ev);
|
||||
break;
|
||||
case MxEventFamily.OnAlarmTransition:
|
||||
DispatchAlarmTransition(ev);
|
||||
break;
|
||||
default:
|
||||
// OnWriteComplete / OperationComplete / OnBufferedDataChange are filtered
|
||||
// out — write callers get their reply via the InvokeAsync round-trip, not
|
||||
// via the event stream.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private void DispatchDataChange(MxEvent ev)
|
||||
{
|
||||
var subscribers = _registry.ResolveSubscribers(ev.ItemHandle);
|
||||
if (subscribers.Count == 0) return; // stale event after unsubscribe — drop quietly
|
||||
|
||||
@@ -184,6 +212,73 @@ internal sealed class EventPump : IAsyncDisposable
|
||||
}
|
||||
}
|
||||
|
||||
private void DispatchAlarmTransition(MxEvent ev)
|
||||
{
|
||||
// Body absent (e.g. malformed gateway event or worker version skew) — count and
|
||||
// drop. The Part 9 sub-attribute fallback path keeps an alarm functional even
|
||||
// when the rich payload disappears.
|
||||
if (ev.OnAlarmTransition is not { } body)
|
||||
{
|
||||
AlarmTransitionsDecodingFailures.Add(1, _clientTag);
|
||||
_logger.LogDebug(
|
||||
"Galaxy OnAlarmTransition event arrived without a populated body (sequence={Sequence}); ignoring.",
|
||||
ev.WorkerSequence);
|
||||
return;
|
||||
}
|
||||
if (body.TransitionKind == AlarmTransitionKind.Unspecified)
|
||||
{
|
||||
AlarmTransitionsDecodingFailures.Add(1, _clientTag);
|
||||
_logger.LogDebug(
|
||||
"Galaxy OnAlarmTransition for {AlarmRef} has unspecified transition kind; ignoring.",
|
||||
body.AlarmFullReference);
|
||||
return;
|
||||
}
|
||||
|
||||
var (bucket, opcUaSeverity) = MxAccessSeverityMapper.Map(body.Severity);
|
||||
var transitionTimestamp = body.TransitionTimestamp is { } tts
|
||||
? tts.ToDateTime()
|
||||
: DateTime.UtcNow;
|
||||
DateTime? originalRaiseTimestamp = body.OriginalRaiseTimestamp is { } orts
|
||||
? orts.ToDateTime()
|
||||
: null;
|
||||
|
||||
var transition = new GalaxyAlarmTransition(
|
||||
AlarmFullReference: body.AlarmFullReference,
|
||||
SourceObjectReference: body.SourceObjectReference,
|
||||
AlarmTypeName: body.AlarmTypeName,
|
||||
TransitionKind: MapTransitionKind(body.TransitionKind),
|
||||
SeverityBucket: bucket,
|
||||
OpcUaSeverity: opcUaSeverity,
|
||||
RawMxAccessSeverity: body.Severity,
|
||||
OriginalRaiseTimestampUtc: originalRaiseTimestamp,
|
||||
TransitionTimestampUtc: transitionTimestamp,
|
||||
OperatorUser: body.OperatorUser,
|
||||
OperatorComment: body.OperatorComment,
|
||||
Category: body.Category,
|
||||
Description: body.Description);
|
||||
|
||||
AlarmTransitionsReceived.Add(1, _clientTag);
|
||||
try
|
||||
{
|
||||
OnAlarmTransition?.Invoke(this, transition);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex,
|
||||
"Galaxy OnAlarmTransition handler threw for {AlarmRef} — continuing.",
|
||||
transition.AlarmFullReference);
|
||||
}
|
||||
}
|
||||
|
||||
private static GalaxyAlarmTransitionKind MapTransitionKind(AlarmTransitionKind kind) => kind switch
|
||||
{
|
||||
AlarmTransitionKind.Raise => GalaxyAlarmTransitionKind.Raise,
|
||||
AlarmTransitionKind.Acknowledge => GalaxyAlarmTransitionKind.Acknowledge,
|
||||
AlarmTransitionKind.Clear => GalaxyAlarmTransitionKind.Clear,
|
||||
AlarmTransitionKind.Retrigger => GalaxyAlarmTransitionKind.Retrigger,
|
||||
_ => GalaxyAlarmTransitionKind.Unspecified,
|
||||
};
|
||||
|
||||
private DataValueSnapshot ToSnapshot(MxEvent ev)
|
||||
{
|
||||
var value = MxValueDecoder.Decode(ev.Value);
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
|
||||
/// <summary>
|
||||
/// Decoded MXAccess alarm transition surfaced by <see cref="EventPump"/>.
|
||||
/// The driver wraps this into <see cref="AlarmEventArgs"/> on the
|
||||
/// <see cref="IAlarmSource.OnAlarmEvent"/> path; the richer fields
|
||||
/// (operator user/comment, original raise time, category) become available
|
||||
/// on the OPC UA Part 9 condition once <c>AlarmEventArgs</c> is extended in
|
||||
/// the client-surface refresh PR (E.7).
|
||||
/// </summary>
|
||||
internal sealed record GalaxyAlarmTransition(
|
||||
string AlarmFullReference,
|
||||
string SourceObjectReference,
|
||||
string AlarmTypeName,
|
||||
GalaxyAlarmTransitionKind TransitionKind,
|
||||
AlarmSeverity SeverityBucket,
|
||||
int OpcUaSeverity,
|
||||
int RawMxAccessSeverity,
|
||||
DateTime? OriginalRaiseTimestampUtc,
|
||||
DateTime TransitionTimestampUtc,
|
||||
string OperatorUser,
|
||||
string OperatorComment,
|
||||
string Category,
|
||||
string Description);
|
||||
|
||||
/// <summary>Kind of alarm state change observed by <see cref="EventPump"/>.</summary>
|
||||
internal enum GalaxyAlarmTransitionKind
|
||||
{
|
||||
Unspecified = 0,
|
||||
Raise = 1,
|
||||
Acknowledge = 2,
|
||||
Clear = 3,
|
||||
Retrigger = 4,
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
|
||||
/// <summary>
|
||||
/// Maps a raw MXAccess alarm severity (0-999, MXAccess scale) onto the
|
||||
/// <see cref="AlarmSeverity"/> ladder + an OPC UA Part 9 numeric severity (1-1000).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// The four-bucket OPC UA ladder (250 / 500 / 750 / 1000 — Low / Medium / High /
|
||||
/// Critical) is the same ladder v1's <c>GalaxyAlarmTracker</c> exposed (per
|
||||
/// <c>docs/v1/AlarmTracking.md</c>). Galaxy templates assign severity values
|
||||
/// 0-999; the bucket boundaries below match v1 so customers see no
|
||||
/// surprise re-classification when the v2 path takes over.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Out-of-range inputs (negative or >= 1000) are clamped into the nearest
|
||||
/// bucket rather than rejected. MXAccess occasionally surfaces slightly
|
||||
/// out-of-range severities for legacy alarm types and we want them to flow
|
||||
/// through the alarm path rather than disappear at the mapper.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
internal static class MxAccessSeverityMapper
|
||||
{
|
||||
/// <summary>OPC UA Part 9 numeric severity for the Low bucket (0-249 MxAccess).</summary>
|
||||
public const int OpcUaSeverityLow = 250;
|
||||
/// <summary>OPC UA Part 9 numeric severity for the Medium bucket (250-499 MxAccess).</summary>
|
||||
public const int OpcUaSeverityMedium = 500;
|
||||
/// <summary>OPC UA Part 9 numeric severity for the High bucket (500-749 MxAccess).</summary>
|
||||
public const int OpcUaSeverityHigh = 750;
|
||||
/// <summary>OPC UA Part 9 numeric severity for the Critical bucket (750+ MxAccess).</summary>
|
||||
public const int OpcUaSeverityCritical = 1000;
|
||||
|
||||
/// <summary>
|
||||
/// Translate a raw MXAccess severity into the four-bucket
|
||||
/// <see cref="AlarmSeverity"/> + OPC UA Part 9 numeric severity tuple.
|
||||
/// </summary>
|
||||
public static (AlarmSeverity Bucket, int OpcUaSeverity) Map(int rawMxAccessSeverity)
|
||||
{
|
||||
if (rawMxAccessSeverity < 250)
|
||||
{
|
||||
return (AlarmSeverity.Low, OpcUaSeverityLow);
|
||||
}
|
||||
if (rawMxAccessSeverity < 500)
|
||||
{
|
||||
return (AlarmSeverity.Medium, OpcUaSeverityMedium);
|
||||
}
|
||||
if (rawMxAccessSeverity < 750)
|
||||
{
|
||||
return (AlarmSeverity.High, OpcUaSeverityHigh);
|
||||
}
|
||||
return (AlarmSeverity.Critical, OpcUaSeverityCritical);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user