From c7411700dc23e244ad26a33f44ca724b85db2acc Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 29 May 2026 16:49:25 -0400 Subject: [PATCH] feat(dcl): MxGateway StreamAlarms adapter (snapshot + live transitions, reconnecting) Adds IAlarmSubscribableConnection to MxGatewayDataConnection (shared session-less feed, ref-counted), IMxGatewayClient.RunAlarmStreamAsync over the package StreamAlarmsAsync with internal reconnect, and MxGatewayAlarmMapper (AlarmFeedMessage/OnAlarmTransitionEvent -> NativeAlarmTransition). Behavior verified against a live gateway in Task 28; mapper unit-tested. --- .../Adapters/IMxGatewayClient.cs | 12 +++ .../Adapters/MxGatewayAlarmMapper.cs | 102 ++++++++++++++++++ .../Adapters/MxGatewayDataConnection.cs | 55 +++++++++- .../Adapters/RealMxGatewayClient.cs | 55 ++++++++++ .../Adapters/FakeMxGatewayClient.cs | 6 ++ .../MxGatewayAlarmMapperTests.cs | 66 ++++++++++++ 6 files changed, 295 insertions(+), 1 deletion(-) create mode 100644 src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayAlarmMapper.cs create mode 100644 tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/MxGatewayAlarmMapperTests.cs diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/IMxGatewayClient.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/IMxGatewayClient.cs index db9eefca..da59744c 100644 --- a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/IMxGatewayClient.cs +++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/IMxGatewayClient.cs @@ -1,4 +1,5 @@ using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms; namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters; @@ -69,6 +70,17 @@ public interface IMxGatewayClient : IAsyncDisposable /// Callback invoked per advised-tag value change. /// Cancellation token; ends the loop when cancelled. Task RunEventLoopAsync(Action onUpdate, CancellationToken ct = default); + + /// + /// Long-running consumer of the gateway's session-less StreamAlarms feed. Emits a + /// Snapshot…SnapshotComplete replay of active alarms then live transitions. Re-opens + /// the stream internally on transport faults (the source replays a fresh snapshot). + /// Completes only when is cancelled. + /// + /// Optional source-reference prefix to scope the feed; null = gateway-wide. + /// Callback invoked per native alarm transition. + /// Cancellation token; ends the loop when cancelled. + Task RunAlarmStreamAsync(string? alarmFilterPrefix, Action onTransition, CancellationToken ct = default); } /// Builds instances. diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayAlarmMapper.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayAlarmMapper.cs new file mode 100644 index 00000000..17e8ade8 --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayAlarmMapper.cs @@ -0,0 +1,102 @@ +using ZB.MOM.WW.MxGateway.Contracts.Proto; +using ProtoConditionState = ZB.MOM.WW.MxGateway.Contracts.Proto.AlarmConditionState; +using ProtoTransitionKind = ZB.MOM.WW.MxGateway.Contracts.Proto.AlarmTransitionKind; +// Alias the Commons alarm types so their simple names bind here, unambiguous +// against the colliding gateway proto enums above. +using NativeAlarmTransition = ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms.NativeAlarmTransition; +using AlarmConditionState = ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms.AlarmConditionState; +using AlarmTransitionKind = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmTransitionKind; +using AlarmShelveState = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmShelveState; + +namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters; + +/// +/// Pure mapping from MxAccess Gateway alarm-feed proto messages to the +/// protocol-neutral shape. The gateway proto +/// enums (AlarmConditionState / AlarmTransitionKind) collide with the Commons +/// alarm enums, so they are aliased here. Unit-tested without a live gateway. +/// +public static class MxGatewayAlarmMapper +{ + /// Clamps the gateway severity onto the unified 0–1000 scale. + public static int NormalizeSeverity(int severity) => Math.Clamp(severity, 0, 1000); + + /// Maps a gateway condition-state + severity to the unified condition. + public static AlarmConditionState MapConditionState(ProtoConditionState state, int severity) + { + var (active, acked) = state switch + { + ProtoConditionState.Active => (true, false), + ProtoConditionState.ActiveAcked => (true, true), + ProtoConditionState.Inactive => (false, true), + _ => (false, true) + }; + return new AlarmConditionState(active, acked, Confirmed: null, + Shelve: AlarmShelveState.Unshelved, Suppressed: false, Severity: NormalizeSeverity(severity)); + } + + /// Maps a gateway transition kind to the unified transition kind. + public static AlarmTransitionKind MapKind(ProtoTransitionKind kind) => kind switch + { + ProtoTransitionKind.Raise => AlarmTransitionKind.Raise, + ProtoTransitionKind.Acknowledge => AlarmTransitionKind.Acknowledge, + ProtoTransitionKind.Clear => AlarmTransitionKind.Clear, + ProtoTransitionKind.Retrigger => AlarmTransitionKind.Retrigger, + _ => AlarmTransitionKind.StateChange + }; + + /// Derives the mirrored condition from a transition kind + severity. + private static AlarmConditionState ConditionFromKind(ProtoTransitionKind kind, int severity) + { + var (active, acked) = kind switch + { + ProtoTransitionKind.Raise => (true, false), + ProtoTransitionKind.Acknowledge => (true, true), + ProtoTransitionKind.Retrigger => (true, false), + ProtoTransitionKind.Clear => (false, true), + _ => (false, true) + }; + return new AlarmConditionState(active, acked, Confirmed: null, + Shelve: AlarmShelveState.Unshelved, Suppressed: false, Severity: NormalizeSeverity(severity)); + } + + /// Maps a live to a transition. + public static NativeAlarmTransition MapTransition(OnAlarmTransitionEvent body) => new( + SourceReference: body.AlarmFullReference, + SourceObjectReference: body.SourceObjectReference, + AlarmTypeName: body.AlarmTypeName, + Kind: MapKind(body.TransitionKind), + Condition: ConditionFromKind(body.TransitionKind, body.Severity), + Category: body.Category, + Description: body.Description, + Message: body.Description, + OperatorUser: body.OperatorUser, + OperatorComment: body.OperatorComment, + OriginalRaiseTime: body.OriginalRaiseTimestamp?.ToDateTimeOffset(), + TransitionTime: body.TransitionTimestamp?.ToDateTimeOffset() ?? DateTimeOffset.UtcNow, + CurrentValue: "", + LimitValue: ""); + + /// The end-of-snapshot sentinel transition (no condition payload). + public static NativeAlarmTransition SnapshotComplete() => new( + "", "", "", AlarmTransitionKind.SnapshotComplete, + new AlarmConditionState(false, true, null, AlarmShelveState.Unshelved, false, 0), + "", "", "", "", "", null, DateTimeOffset.UtcNow, "", ""); + + /// Maps one initial-snapshot entry to a Snapshot transition. + public static NativeAlarmTransition MapSnapshot(ActiveAlarmSnapshot snapshot) => new( + SourceReference: snapshot.AlarmFullReference, + SourceObjectReference: snapshot.SourceObjectReference, + AlarmTypeName: snapshot.AlarmTypeName, + Kind: AlarmTransitionKind.Snapshot, + Condition: MapConditionState(snapshot.CurrentState, snapshot.Severity), + Category: snapshot.Category, + Description: snapshot.Description, + Message: snapshot.Description, + OperatorUser: snapshot.OperatorUser, + OperatorComment: snapshot.OperatorComment, + OriginalRaiseTime: snapshot.OriginalRaiseTimestamp?.ToDateTimeOffset(), + TransitionTime: snapshot.LastTransitionTimestamp?.ToDateTimeOffset() ?? DateTimeOffset.UtcNow, + CurrentValue: "", + LimitValue: ""); +} diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayDataConnection.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayDataConnection.cs index 79d9b8e2..8a5f501c 100644 --- a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayDataConnection.cs +++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayDataConnection.cs @@ -20,7 +20,7 @@ namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters; /// , the actor disposes this adapter, creates a fresh one, /// reconnects and re-subscribes all tags. /// -public class MxGatewayDataConnection : IDataConnection, IBrowsableDataConnection +public class MxGatewayDataConnection : IDataConnection, IBrowsableDataConnection, IAlarmSubscribableConnection { private readonly IMxGatewayClientFactory _clientFactory; private readonly ILogger _logger; @@ -28,6 +28,15 @@ public class MxGatewayDataConnection : IDataConnection, IBrowsableDataConnection private ConnectionHealth _status = ConnectionHealth.Disconnected; private CancellationTokenSource? _eventLoopCts; + // Native alarm feed: the gateway StreamAlarms RPC is session-less and + // gateway-wide, so one shared feed serves the whole connection. The + // DataConnectionActor routes transitions to instances by source reference, + // so a single shared callback (the first registered) suffices; subscriptions + // are ref-counted so the feed stops when the last one is removed. + private CancellationTokenSource? _alarmCts; + private int _alarmSubCount; + private readonly object _alarmLock = new(); + // subscriptionId → (tagPath, callback) so the event loop can route updates by tag, // plus tagPath → subscriptionId for reverse lookup. Concurrent because the event // loop reads from a background thread while Subscribe/Unsubscribe mutate. @@ -112,6 +121,13 @@ public class MxGatewayDataConnection : IDataConnection, IBrowsableDataConnection public async Task DisconnectAsync(CancellationToken cancellationToken = default) { _eventLoopCts?.Cancel(); + lock (_alarmLock) + { + _alarmCts?.Cancel(); + _alarmCts?.Dispose(); + _alarmCts = null; + _alarmSubCount = 0; + } if (_client is not null) await _client.DisconnectAsync(cancellationToken); _status = ConnectionHealth.Disconnected; @@ -134,6 +150,43 @@ public class MxGatewayDataConnection : IDataConnection, IBrowsableDataConnection await _client!.UnsubscribeAsync(subscriptionId, cancellationToken); } + /// + public Task SubscribeAlarmsAsync( + string sourceReference, string? conditionFilter, + AlarmTransitionCallback callback, CancellationToken cancellationToken = default) + { + lock (_alarmLock) + { + _alarmSubCount++; + if (_alarmCts == null) + { + _alarmCts = new CancellationTokenSource(); + var token = _alarmCts.Token; + var client = _client!; + // Gateway-wide feed (null prefix); the actor filters per source reference. + _ = Task.Run(() => client.RunAlarmStreamAsync(null, t => callback(t), token), token); + } + } + return Task.FromResult(Guid.NewGuid().ToString()); + } + + /// + public Task UnsubscribeAlarmsAsync(string subscriptionId, CancellationToken cancellationToken = default) + { + lock (_alarmLock) + { + if (_alarmSubCount > 0) + _alarmSubCount--; + if (_alarmSubCount == 0) + { + _alarmCts?.Cancel(); + _alarmCts?.Dispose(); + _alarmCts = null; + } + } + return Task.CompletedTask; + } + /// public async Task ReadAsync(string tagPath, CancellationToken cancellationToken = default) { diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs index d208f6ee..e38caa0b 100644 --- a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs +++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs @@ -246,6 +246,61 @@ public sealed class RealMxGatewayClient : IMxGatewayClient } } + /// + public async Task RunAlarmStreamAsync( + string? alarmFilterPrefix, Action onTransition, + CancellationToken ct = default) + { + var reconnectDelay = TimeSpan.FromSeconds(5); + while (!ct.IsCancellationRequested) + { + try + { + var request = new StreamAlarmsRequest + { + ClientCorrelationId = Guid.NewGuid().ToString("N"), + AlarmFilterPrefix = alarmFilterPrefix ?? string.Empty, + }; + + await foreach (var message in _client!.StreamAlarmsAsync(request, ct) + .WithCancellation(ct).ConfigureAwait(false)) + { + if (ct.IsCancellationRequested) break; + switch (message.PayloadCase) + { + case AlarmFeedMessage.PayloadOneofCase.ActiveAlarm: + onTransition(MxGatewayAlarmMapper.MapSnapshot(message.ActiveAlarm)); + break; + case AlarmFeedMessage.PayloadOneofCase.Transition: + onTransition(MxGatewayAlarmMapper.MapTransition(message.Transition)); + break; + case AlarmFeedMessage.PayloadOneofCase.SnapshotComplete: + onTransition(MxGatewayAlarmMapper.SnapshotComplete()); + break; + } + } + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + return; // clean shutdown + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "MxGateway alarm stream faulted; reopening in {DelaySeconds}s", reconnectDelay.TotalSeconds); + } + + try + { + await Task.Delay(reconnectDelay, ct).ConfigureAwait(false); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + return; + } + } + } + /// public async ValueTask DisposeAsync() { diff --git a/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/Adapters/FakeMxGatewayClient.cs b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/Adapters/FakeMxGatewayClient.cs index 1118dedc..b1d1982e 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/Adapters/FakeMxGatewayClient.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/Adapters/FakeMxGatewayClient.cs @@ -58,6 +58,12 @@ public sealed class FakeMxGatewayClient : IMxGatewayClient, IMxGatewayClientFact ct.ThrowIfCancellationRequested(); // …or FaultEventLoop() faults it to simulate a stream break } + public Task RunAlarmStreamAsync( + string? alarmFilterPrefix, + Action onTransition, + CancellationToken ct = default) + => Task.CompletedTask; // no alarm feed in the fake + public ValueTask DisposeAsync() => ValueTask.CompletedTask; /// Simulate a stream break so the adapter raises Disconnected. diff --git a/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/MxGatewayAlarmMapperTests.cs b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/MxGatewayAlarmMapperTests.cs new file mode 100644 index 00000000..74c117af --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/MxGatewayAlarmMapperTests.cs @@ -0,0 +1,66 @@ +using ZB.MOM.WW.MxGateway.Contracts.Proto; +using ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters; +using CommonsTransitionKind = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmTransitionKind; +using ProtoConditionState = ZB.MOM.WW.MxGateway.Contracts.Proto.AlarmConditionState; +using ProtoTransitionKind = ZB.MOM.WW.MxGateway.Contracts.Proto.AlarmTransitionKind; + +namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests; + +/// Task-12: pure MxGateway alarm-feed proto → NativeAlarmTransition mapping. +public class MxGatewayAlarmMapperTests +{ + [Fact] + public void MapTransition_AckTransition_IsAcknowledgedWithOperator() + { + var ev = new OnAlarmTransitionEvent + { + AlarmFullReference = "Tank01.Level.HiHi", + SourceObjectReference = "Tank01", + AlarmTypeName = "AnalogLimitAlarm.HiHi", + TransitionKind = ProtoTransitionKind.Acknowledge, + Severity = 600, + OperatorUser = "operator1", + OperatorComment = "ack", + Category = "Process", + Description = "hi" + }; + + var t = MxGatewayAlarmMapper.MapTransition(ev); + + Assert.Equal(CommonsTransitionKind.Acknowledge, t.Kind); + Assert.True(t.Condition.Active); + Assert.True(t.Condition.Acknowledged); + Assert.Equal(600, t.Condition.Severity); + Assert.Equal("operator1", t.OperatorUser); + Assert.Equal("Tank01", t.SourceObjectReference); + } + + [Fact] + public void MapConditionState_ActiveAcked_To_ActiveTrue_AckTrue() + { + var c = MxGatewayAlarmMapper.MapConditionState(ProtoConditionState.ActiveAcked, severity: 600); + Assert.True(c.Active); + Assert.True(c.Acknowledged); + Assert.Equal(600, c.Severity); + } + + [Fact] + public void MapSnapshot_ActiveUnacked_IsSnapshotKind() + { + var snap = new ActiveAlarmSnapshot + { + AlarmFullReference = "Tank01.Level.Hi", + SourceObjectReference = "Tank01", + AlarmTypeName = "AnalogLimitAlarm.Hi", + CurrentState = ProtoConditionState.Active, + Severity = 1500 // out of range — must clamp + }; + + var t = MxGatewayAlarmMapper.MapSnapshot(snap); + + Assert.Equal(CommonsTransitionKind.Snapshot, t.Kind); + Assert.True(t.Condition.Active); + Assert.False(t.Condition.Acknowledged); + Assert.Equal(1000, t.Condition.Severity); + } +}