diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs index ee6d8c0..98ae842 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs @@ -45,6 +45,12 @@ internal sealed class EventPump : IAsyncDisposable private static readonly Counter EventsDropped = Meter.CreateCounter("galaxy.events.dropped", unit: "{event}", description: "MxEvents dropped because the bounded channel was full (newest-dropped)."); + private static readonly Counter AlarmTransitionsReceived = + Meter.CreateCounter("galaxy.alarm_transitions.received", unit: "{event}", + description: "OnAlarmTransition events decoded and forwarded to driver-level handlers."); + private static readonly Counter AlarmTransitionsDecodingFailures = + Meter.CreateCounter("galaxy.alarm_transitions.decoding_failures", unit: "{event}", + description: "OnAlarmTransition events that arrived without a populated body or with an unspecified transition kind."); private readonly IGalaxySubscriber _subscriber; private readonly SubscriptionRegistry _registry; @@ -60,6 +66,15 @@ internal sealed class EventPump : IAsyncDisposable public event EventHandler? OnDataChange; + /// + /// Fires for every event the + /// gateway forwards. Decoded into a with + /// the OPC UA severity bucket already mapped via + /// . The driver wraps this onto + /// IAlarmSource.OnAlarmEvent in PR B.2. + /// + internal event EventHandler? OnAlarmTransition; + public EventPump( IGalaxySubscriber subscriber, SubscriptionRegistry registry, @@ -159,11 +174,24 @@ internal sealed class EventPump : IAsyncDisposable private void Dispatch(MxEvent ev) { - // Only OnDataChange events fan out to driver subscriptions today. OnWriteComplete - // / OperationComplete / OnBufferedDataChange are filtered out — write callers get - // their reply via the InvokeAsync round-trip, not via the event stream. - if (ev.Family != MxEventFamily.OnDataChange) return; + switch (ev.Family) + { + case MxEventFamily.OnDataChange: + DispatchDataChange(ev); + break; + case MxEventFamily.OnAlarmTransition: + DispatchAlarmTransition(ev); + break; + default: + // OnWriteComplete / OperationComplete / OnBufferedDataChange are filtered + // out — write callers get their reply via the InvokeAsync round-trip, not + // via the event stream. + return; + } + } + private void DispatchDataChange(MxEvent ev) + { var subscribers = _registry.ResolveSubscribers(ev.ItemHandle); if (subscribers.Count == 0) return; // stale event after unsubscribe — drop quietly @@ -184,6 +212,73 @@ internal sealed class EventPump : IAsyncDisposable } } + private void DispatchAlarmTransition(MxEvent ev) + { + // Body absent (e.g. malformed gateway event or worker version skew) — count and + // drop. The Part 9 sub-attribute fallback path keeps an alarm functional even + // when the rich payload disappears. + if (ev.OnAlarmTransition is not { } body) + { + AlarmTransitionsDecodingFailures.Add(1, _clientTag); + _logger.LogDebug( + "Galaxy OnAlarmTransition event arrived without a populated body (sequence={Sequence}); ignoring.", + ev.WorkerSequence); + return; + } + if (body.TransitionKind == AlarmTransitionKind.Unspecified) + { + AlarmTransitionsDecodingFailures.Add(1, _clientTag); + _logger.LogDebug( + "Galaxy OnAlarmTransition for {AlarmRef} has unspecified transition kind; ignoring.", + body.AlarmFullReference); + return; + } + + var (bucket, opcUaSeverity) = MxAccessSeverityMapper.Map(body.Severity); + var transitionTimestamp = body.TransitionTimestamp is { } tts + ? tts.ToDateTime() + : DateTime.UtcNow; + DateTime? originalRaiseTimestamp = body.OriginalRaiseTimestamp is { } orts + ? orts.ToDateTime() + : null; + + var transition = new GalaxyAlarmTransition( + AlarmFullReference: body.AlarmFullReference, + SourceObjectReference: body.SourceObjectReference, + AlarmTypeName: body.AlarmTypeName, + TransitionKind: MapTransitionKind(body.TransitionKind), + SeverityBucket: bucket, + OpcUaSeverity: opcUaSeverity, + RawMxAccessSeverity: body.Severity, + OriginalRaiseTimestampUtc: originalRaiseTimestamp, + TransitionTimestampUtc: transitionTimestamp, + OperatorUser: body.OperatorUser, + OperatorComment: body.OperatorComment, + Category: body.Category, + Description: body.Description); + + AlarmTransitionsReceived.Add(1, _clientTag); + try + { + OnAlarmTransition?.Invoke(this, transition); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Galaxy OnAlarmTransition handler threw for {AlarmRef} — continuing.", + transition.AlarmFullReference); + } + } + + private static GalaxyAlarmTransitionKind MapTransitionKind(AlarmTransitionKind kind) => kind switch + { + AlarmTransitionKind.Raise => GalaxyAlarmTransitionKind.Raise, + AlarmTransitionKind.Acknowledge => GalaxyAlarmTransitionKind.Acknowledge, + AlarmTransitionKind.Clear => GalaxyAlarmTransitionKind.Clear, + AlarmTransitionKind.Retrigger => GalaxyAlarmTransitionKind.Retrigger, + _ => GalaxyAlarmTransitionKind.Unspecified, + }; + private DataValueSnapshot ToSnapshot(MxEvent ev) { var value = MxValueDecoder.Decode(ev.Value); diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GalaxyAlarmTransition.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GalaxyAlarmTransition.cs new file mode 100644 index 0000000..3323133 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GalaxyAlarmTransition.cs @@ -0,0 +1,36 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +/// +/// Decoded MXAccess alarm transition surfaced by . +/// The driver wraps this into on the +/// path; the richer fields +/// (operator user/comment, original raise time, category) become available +/// on the OPC UA Part 9 condition once AlarmEventArgs is extended in +/// the client-surface refresh PR (E.7). +/// +internal sealed record GalaxyAlarmTransition( + string AlarmFullReference, + string SourceObjectReference, + string AlarmTypeName, + GalaxyAlarmTransitionKind TransitionKind, + AlarmSeverity SeverityBucket, + int OpcUaSeverity, + int RawMxAccessSeverity, + DateTime? OriginalRaiseTimestampUtc, + DateTime TransitionTimestampUtc, + string OperatorUser, + string OperatorComment, + string Category, + string Description); + +/// Kind of alarm state change observed by . +internal enum GalaxyAlarmTransitionKind +{ + Unspecified = 0, + Raise = 1, + Acknowledge = 2, + Clear = 3, + Retrigger = 4, +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/MxAccessSeverityMapper.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/MxAccessSeverityMapper.cs new file mode 100644 index 0000000..ee104ee --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/MxAccessSeverityMapper.cs @@ -0,0 +1,55 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +/// +/// Maps a raw MXAccess alarm severity (0-999, MXAccess scale) onto the +/// ladder + an OPC UA Part 9 numeric severity (1-1000). +/// +/// +/// +/// The four-bucket OPC UA ladder (250 / 500 / 750 / 1000 — Low / Medium / High / +/// Critical) is the same ladder v1's GalaxyAlarmTracker exposed (per +/// docs/v1/AlarmTracking.md). Galaxy templates assign severity values +/// 0-999; the bucket boundaries below match v1 so customers see no +/// surprise re-classification when the v2 path takes over. +/// +/// +/// Out-of-range inputs (negative or >= 1000) are clamped into the nearest +/// bucket rather than rejected. MXAccess occasionally surfaces slightly +/// out-of-range severities for legacy alarm types and we want them to flow +/// through the alarm path rather than disappear at the mapper. +/// +/// +internal static class MxAccessSeverityMapper +{ + /// OPC UA Part 9 numeric severity for the Low bucket (0-249 MxAccess). + public const int OpcUaSeverityLow = 250; + /// OPC UA Part 9 numeric severity for the Medium bucket (250-499 MxAccess). + public const int OpcUaSeverityMedium = 500; + /// OPC UA Part 9 numeric severity for the High bucket (500-749 MxAccess). + public const int OpcUaSeverityHigh = 750; + /// OPC UA Part 9 numeric severity for the Critical bucket (750+ MxAccess). + public const int OpcUaSeverityCritical = 1000; + + /// + /// Translate a raw MXAccess severity into the four-bucket + /// + OPC UA Part 9 numeric severity tuple. + /// + public static (AlarmSeverity Bucket, int OpcUaSeverity) Map(int rawMxAccessSeverity) + { + if (rawMxAccessSeverity < 250) + { + return (AlarmSeverity.Low, OpcUaSeverityLow); + } + if (rawMxAccessSeverity < 500) + { + return (AlarmSeverity.Medium, OpcUaSeverityMedium); + } + if (rawMxAccessSeverity < 750) + { + return (AlarmSeverity.High, OpcUaSeverityHigh); + } + return (AlarmSeverity.Critical, OpcUaSeverityCritical); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpAlarmTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpAlarmTests.cs new file mode 100644 index 0000000..43e712f --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpAlarmTests.cs @@ -0,0 +1,239 @@ +using System.Threading.Channels; +using Google.Protobuf.WellKnownTypes; +using MxGateway.Contracts.Proto; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime; + +/// +/// PR B.1 — pins the EventPump's OnAlarmTransition decode path. Synthetic MxEvents +/// with the new family go in; the pump fires OnAlarmTransition with the +/// decoded payload + mapped severity bucket; data-change subscribers stay +/// untouched. +/// +public sealed class EventPumpAlarmTests +{ + [Fact] + public async Task Dispatches_raise_acknowledge_clear_in_sequence() + { + var subscriber = new ManualSubscriber(); + var registry = new SubscriptionRegistry(); + var transitions = new List(); + var dispatched = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + await using var pump = new EventPump(subscriber, registry, channelCapacity: 16, clientName: "AlarmTest"); + pump.OnAlarmTransition += (_, transition) => + { + lock (transitions) + { + transitions.Add(transition); + if (transitions.Count == 3) dispatched.TrySetResult(true); + } + }; + pump.Start(); + + var raise = new DateTime(2026, 5, 1, 12, 0, 0, DateTimeKind.Utc); + var ack = raise.AddSeconds(30); + var clear = ack.AddSeconds(60); + + await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi", + AlarmTransitionKind.Raise, severity: 750, transitionTime: raise)); + await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi", + AlarmTransitionKind.Acknowledge, severity: 750, transitionTime: ack, + originalRaise: raise, operatorUser: "alice", operatorComment: "investigating")); + await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi", + AlarmTransitionKind.Clear, severity: 750, transitionTime: clear, + originalRaise: raise)); + + var completed = await Task.WhenAny(dispatched.Task, Task.Delay(TimeSpan.FromSeconds(2))); + completed.ShouldBe(dispatched.Task, "all three alarm transitions should dispatch within 2s"); + + transitions.Count.ShouldBe(3); + + transitions[0].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Raise); + transitions[0].SeverityBucket.ShouldBe(AlarmSeverity.Critical); + transitions[0].OpcUaSeverity.ShouldBe(MxAccessSeverityMapper.OpcUaSeverityCritical); + transitions[0].RawMxAccessSeverity.ShouldBe(750); + transitions[0].TransitionTimestampUtc.ShouldBe(raise); + + transitions[1].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Acknowledge); + transitions[1].OperatorUser.ShouldBe("alice"); + transitions[1].OperatorComment.ShouldBe("investigating"); + transitions[1].OriginalRaiseTimestampUtc.ShouldBe(raise); + + transitions[2].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Clear); + transitions[2].OriginalRaiseTimestampUtc.ShouldBe(raise); + } + + [Fact] + public async Task Drops_alarm_event_with_unspecified_transition_kind() + { + var subscriber = new ManualSubscriber(); + var registry = new SubscriptionRegistry(); + var transitions = new List(); + + await using var pump = new EventPump(subscriber, registry, channelCapacity: 4, clientName: "AlarmTest"); + pump.OnAlarmTransition += (_, transition) => transitions.Add(transition); + pump.Start(); + + await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi", + AlarmTransitionKind.Unspecified, severity: 100, + transitionTime: DateTime.UtcNow)); + + // Give the pump a beat to drain the channel. + await Task.Delay(150); + + transitions.ShouldBeEmpty("alarm transitions with Unspecified kind are decoder failures and must not fire OnAlarmTransition"); + } + + [Fact] + public async Task Drops_alarm_event_with_missing_body() + { + var subscriber = new ManualSubscriber(); + var registry = new SubscriptionRegistry(); + var transitions = new List(); + + await using var pump = new EventPump(subscriber, registry, channelCapacity: 4, clientName: "AlarmTest"); + pump.OnAlarmTransition += (_, transition) => transitions.Add(transition); + pump.Start(); + + // Family marked as alarm-transition but body left empty (worker version skew / + // malformed event). Production should count + drop, not throw. + await subscriber.EmitRawAsync(new MxEvent + { + Family = MxEventFamily.OnAlarmTransition, + WorkerSequence = 42, + }); + + await Task.Delay(150); + + transitions.ShouldBeEmpty(); + } + + [Fact] + public async Task Mixed_data_change_and_alarm_events_dispatch_independently() + { + var subscriber = new ManualSubscriber(); + var registry = new SubscriptionRegistry(); + registry.Register(1, [new TagBinding("Tank01.Level", ItemHandle: 7)]); + + var dataChanges = new List(); + var alarms = new List(); + var bothSeen = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + await using var pump = new EventPump(subscriber, registry, channelCapacity: 16, clientName: "MixedTest"); + pump.OnDataChange += (_, args) => + { + lock (dataChanges) + { + dataChanges.Add(args); + if (dataChanges.Count >= 1 && alarms.Count >= 1) bothSeen.TrySetResult(true); + } + }; + pump.OnAlarmTransition += (_, transition) => + { + lock (alarms) + { + alarms.Add(transition); + if (dataChanges.Count >= 1 && alarms.Count >= 1) bothSeen.TrySetResult(true); + } + }; + pump.Start(); + + await subscriber.EmitAsync(itemHandle: 7, value: 41.0); + await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi", + AlarmTransitionKind.Raise, severity: 600, transitionTime: DateTime.UtcNow)); + + var completed = await Task.WhenAny(bothSeen.Task, Task.Delay(TimeSpan.FromSeconds(2))); + completed.ShouldBe(bothSeen.Task); + + dataChanges.Count.ShouldBe(1); + alarms.Count.ShouldBe(1); + alarms[0].SeverityBucket.ShouldBe(AlarmSeverity.High); + } + + [Fact] + public async Task Filters_out_unsupported_event_families() + { + var subscriber = new ManualSubscriber(); + var registry = new SubscriptionRegistry(); + var transitions = new List(); + + await using var pump = new EventPump(subscriber, registry, channelCapacity: 4, clientName: "FilterTest"); + pump.OnAlarmTransition += (_, transition) => transitions.Add(transition); + pump.Start(); + + // OnWriteComplete and OperationComplete should be silently dropped. + await subscriber.EmitRawAsync(new MxEvent { Family = MxEventFamily.OnWriteComplete }); + await subscriber.EmitRawAsync(new MxEvent { Family = MxEventFamily.OperationComplete }); + + await Task.Delay(150); + + transitions.ShouldBeEmpty(); + } + + private static MxEvent NewAlarm( + string fullReference, + AlarmTransitionKind kind, + int severity, + DateTime transitionTime, + DateTime? originalRaise = null, + string operatorUser = "", + string operatorComment = "") + { + var body = new OnAlarmTransitionEvent + { + AlarmFullReference = fullReference, + SourceObjectReference = fullReference.Split('.')[0], + AlarmTypeName = "AnalogLimitAlarm.HiHi", + TransitionKind = kind, + Severity = severity, + TransitionTimestamp = Timestamp.FromDateTime(transitionTime), + OperatorUser = operatorUser, + OperatorComment = operatorComment, + Category = "Process", + Description = "Tank 01 high-high level", + }; + if (originalRaise is { } orts) + { + body.OriginalRaiseTimestamp = Timestamp.FromDateTime(orts); + } + return new MxEvent + { + Family = MxEventFamily.OnAlarmTransition, + OnAlarmTransition = body, + }; + } + + private sealed class ManualSubscriber : IGalaxySubscriber + { + private readonly Channel _stream = + Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + + public Task> SubscribeBulkAsync( + IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) + => Task.FromResult>([]); + + public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) + => Task.CompletedTask; + + public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken) + => _stream.Reader.ReadAllAsync(cancellationToken); + + public ValueTask EmitAsync(int itemHandle, double value) => + _stream.Writer.WriteAsync(new MxEvent + { + Family = MxEventFamily.OnDataChange, + ItemHandle = itemHandle, + Value = new MxValue { DoubleValue = value }, + Quality = 192, + SourceTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), + }); + + public ValueTask EmitAlarmAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev); + public ValueTask EmitRawAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/MxAccessSeverityMapperTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/MxAccessSeverityMapperTests.cs new file mode 100644 index 0000000..1951e31 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/MxAccessSeverityMapperTests.cs @@ -0,0 +1,43 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime; + +/// +/// Pins the four-bucket MxAccess severity → (AlarmSeverity, OPC UA numeric) ladder. +/// Customers see no surprise re-classification when the v2 path takes over from +/// v1's sub-attribute synthesis: the bucket boundaries match v1's +/// GalaxyAlarmTracker per docs/v1/AlarmTracking.md. +/// +public sealed class MxAccessSeverityMapperTests +{ + [Theory] + [InlineData(0, AlarmSeverity.Low, MxAccessSeverityMapper.OpcUaSeverityLow)] + [InlineData(1, AlarmSeverity.Low, MxAccessSeverityMapper.OpcUaSeverityLow)] + [InlineData(249, AlarmSeverity.Low, MxAccessSeverityMapper.OpcUaSeverityLow)] + [InlineData(250, AlarmSeverity.Medium, MxAccessSeverityMapper.OpcUaSeverityMedium)] + [InlineData(499, AlarmSeverity.Medium, MxAccessSeverityMapper.OpcUaSeverityMedium)] + [InlineData(500, AlarmSeverity.High, MxAccessSeverityMapper.OpcUaSeverityHigh)] + [InlineData(749, AlarmSeverity.High, MxAccessSeverityMapper.OpcUaSeverityHigh)] + [InlineData(750, AlarmSeverity.Critical, MxAccessSeverityMapper.OpcUaSeverityCritical)] + [InlineData(999, AlarmSeverity.Critical, MxAccessSeverityMapper.OpcUaSeverityCritical)] + [InlineData(int.MaxValue, AlarmSeverity.Critical, MxAccessSeverityMapper.OpcUaSeverityCritical)] + public void Map_assigns_expected_bucket(int rawMxAccessSeverity, AlarmSeverity expectedBucket, int expectedOpcUaSeverity) + { + var (bucket, opcUa) = MxAccessSeverityMapper.Map(rawMxAccessSeverity); + + bucket.ShouldBe(expectedBucket); + opcUa.ShouldBe(expectedOpcUaSeverity); + } + + [Fact] + public void Map_clamps_negative_severities_into_low_bucket() + { + var (bucket, opcUa) = MxAccessSeverityMapper.Map(-100); + + bucket.ShouldBe(AlarmSeverity.Low); + opcUa.ShouldBe(MxAccessSeverityMapper.OpcUaSeverityLow); + } +}