diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/IMxGatewayClient.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/IMxGatewayClient.cs
index db9eefca..da59744c 100644
--- a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/IMxGatewayClient.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/IMxGatewayClient.cs
@@ -1,4 +1,5 @@
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol;
+using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms;
namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters;
@@ -69,6 +70,17 @@ public interface IMxGatewayClient : IAsyncDisposable
/// Callback invoked per advised-tag value change.
/// Cancellation token; ends the loop when cancelled.
Task RunEventLoopAsync(Action onUpdate, CancellationToken ct = default);
+
+ ///
+ /// Long-running consumer of the gateway's session-less StreamAlarms feed. Emits a
+ /// Snapshot…SnapshotComplete replay of active alarms then live transitions. Re-opens
+ /// the stream internally on transport faults (the source replays a fresh snapshot).
+ /// Completes only when is cancelled.
+ ///
+ /// Optional source-reference prefix to scope the feed; null = gateway-wide.
+ /// Callback invoked per native alarm transition.
+ /// Cancellation token; ends the loop when cancelled.
+ Task RunAlarmStreamAsync(string? alarmFilterPrefix, Action onTransition, CancellationToken ct = default);
}
/// Builds instances.
diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayAlarmMapper.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayAlarmMapper.cs
new file mode 100644
index 00000000..17e8ade8
--- /dev/null
+++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayAlarmMapper.cs
@@ -0,0 +1,102 @@
+using ZB.MOM.WW.MxGateway.Contracts.Proto;
+using ProtoConditionState = ZB.MOM.WW.MxGateway.Contracts.Proto.AlarmConditionState;
+using ProtoTransitionKind = ZB.MOM.WW.MxGateway.Contracts.Proto.AlarmTransitionKind;
+// Alias the Commons alarm types so their simple names bind here, unambiguous
+// against the colliding gateway proto enums above.
+using NativeAlarmTransition = ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms.NativeAlarmTransition;
+using AlarmConditionState = ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms.AlarmConditionState;
+using AlarmTransitionKind = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmTransitionKind;
+using AlarmShelveState = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmShelveState;
+
+namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters;
+
+///
+/// Pure mapping from MxAccess Gateway alarm-feed proto messages to the
+/// protocol-neutral shape. The gateway proto
+/// enums (AlarmConditionState / AlarmTransitionKind) collide with the Commons
+/// alarm enums, so they are aliased here. Unit-tested without a live gateway.
+///
+public static class MxGatewayAlarmMapper
+{
+ /// Clamps the gateway severity onto the unified 0–1000 scale.
+ public static int NormalizeSeverity(int severity) => Math.Clamp(severity, 0, 1000);
+
+ /// Maps a gateway condition-state + severity to the unified condition.
+ public static AlarmConditionState MapConditionState(ProtoConditionState state, int severity)
+ {
+ var (active, acked) = state switch
+ {
+ ProtoConditionState.Active => (true, false),
+ ProtoConditionState.ActiveAcked => (true, true),
+ ProtoConditionState.Inactive => (false, true),
+ _ => (false, true)
+ };
+ return new AlarmConditionState(active, acked, Confirmed: null,
+ Shelve: AlarmShelveState.Unshelved, Suppressed: false, Severity: NormalizeSeverity(severity));
+ }
+
+ /// Maps a gateway transition kind to the unified transition kind.
+ public static AlarmTransitionKind MapKind(ProtoTransitionKind kind) => kind switch
+ {
+ ProtoTransitionKind.Raise => AlarmTransitionKind.Raise,
+ ProtoTransitionKind.Acknowledge => AlarmTransitionKind.Acknowledge,
+ ProtoTransitionKind.Clear => AlarmTransitionKind.Clear,
+ ProtoTransitionKind.Retrigger => AlarmTransitionKind.Retrigger,
+ _ => AlarmTransitionKind.StateChange
+ };
+
+ /// Derives the mirrored condition from a transition kind + severity.
+ private static AlarmConditionState ConditionFromKind(ProtoTransitionKind kind, int severity)
+ {
+ var (active, acked) = kind switch
+ {
+ ProtoTransitionKind.Raise => (true, false),
+ ProtoTransitionKind.Acknowledge => (true, true),
+ ProtoTransitionKind.Retrigger => (true, false),
+ ProtoTransitionKind.Clear => (false, true),
+ _ => (false, true)
+ };
+ return new AlarmConditionState(active, acked, Confirmed: null,
+ Shelve: AlarmShelveState.Unshelved, Suppressed: false, Severity: NormalizeSeverity(severity));
+ }
+
+ /// Maps a live to a transition.
+ public static NativeAlarmTransition MapTransition(OnAlarmTransitionEvent body) => new(
+ SourceReference: body.AlarmFullReference,
+ SourceObjectReference: body.SourceObjectReference,
+ AlarmTypeName: body.AlarmTypeName,
+ Kind: MapKind(body.TransitionKind),
+ Condition: ConditionFromKind(body.TransitionKind, body.Severity),
+ Category: body.Category,
+ Description: body.Description,
+ Message: body.Description,
+ OperatorUser: body.OperatorUser,
+ OperatorComment: body.OperatorComment,
+ OriginalRaiseTime: body.OriginalRaiseTimestamp?.ToDateTimeOffset(),
+ TransitionTime: body.TransitionTimestamp?.ToDateTimeOffset() ?? DateTimeOffset.UtcNow,
+ CurrentValue: "",
+ LimitValue: "");
+
+ /// The end-of-snapshot sentinel transition (no condition payload).
+ public static NativeAlarmTransition SnapshotComplete() => new(
+ "", "", "", AlarmTransitionKind.SnapshotComplete,
+ new AlarmConditionState(false, true, null, AlarmShelveState.Unshelved, false, 0),
+ "", "", "", "", "", null, DateTimeOffset.UtcNow, "", "");
+
+ /// Maps one initial-snapshot entry to a Snapshot transition.
+ public static NativeAlarmTransition MapSnapshot(ActiveAlarmSnapshot snapshot) => new(
+ SourceReference: snapshot.AlarmFullReference,
+ SourceObjectReference: snapshot.SourceObjectReference,
+ AlarmTypeName: snapshot.AlarmTypeName,
+ Kind: AlarmTransitionKind.Snapshot,
+ Condition: MapConditionState(snapshot.CurrentState, snapshot.Severity),
+ Category: snapshot.Category,
+ Description: snapshot.Description,
+ Message: snapshot.Description,
+ OperatorUser: snapshot.OperatorUser,
+ OperatorComment: snapshot.OperatorComment,
+ OriginalRaiseTime: snapshot.OriginalRaiseTimestamp?.ToDateTimeOffset(),
+ TransitionTime: snapshot.LastTransitionTimestamp?.ToDateTimeOffset() ?? DateTimeOffset.UtcNow,
+ CurrentValue: "",
+ LimitValue: "");
+}
diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayDataConnection.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayDataConnection.cs
index 79d9b8e2..8a5f501c 100644
--- a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayDataConnection.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayDataConnection.cs
@@ -20,7 +20,7 @@ namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters;
/// , the actor disposes this adapter, creates a fresh one,
/// reconnects and re-subscribes all tags.
///
-public class MxGatewayDataConnection : IDataConnection, IBrowsableDataConnection
+public class MxGatewayDataConnection : IDataConnection, IBrowsableDataConnection, IAlarmSubscribableConnection
{
private readonly IMxGatewayClientFactory _clientFactory;
private readonly ILogger _logger;
@@ -28,6 +28,15 @@ public class MxGatewayDataConnection : IDataConnection, IBrowsableDataConnection
private ConnectionHealth _status = ConnectionHealth.Disconnected;
private CancellationTokenSource? _eventLoopCts;
+ // Native alarm feed: the gateway StreamAlarms RPC is session-less and
+ // gateway-wide, so one shared feed serves the whole connection. The
+ // DataConnectionActor routes transitions to instances by source reference,
+ // so a single shared callback (the first registered) suffices; subscriptions
+ // are ref-counted so the feed stops when the last one is removed.
+ private CancellationTokenSource? _alarmCts;
+ private int _alarmSubCount;
+ private readonly object _alarmLock = new();
+
// subscriptionId → (tagPath, callback) so the event loop can route updates by tag,
// plus tagPath → subscriptionId for reverse lookup. Concurrent because the event
// loop reads from a background thread while Subscribe/Unsubscribe mutate.
@@ -112,6 +121,13 @@ public class MxGatewayDataConnection : IDataConnection, IBrowsableDataConnection
public async Task DisconnectAsync(CancellationToken cancellationToken = default)
{
_eventLoopCts?.Cancel();
+ lock (_alarmLock)
+ {
+ _alarmCts?.Cancel();
+ _alarmCts?.Dispose();
+ _alarmCts = null;
+ _alarmSubCount = 0;
+ }
if (_client is not null)
await _client.DisconnectAsync(cancellationToken);
_status = ConnectionHealth.Disconnected;
@@ -134,6 +150,43 @@ public class MxGatewayDataConnection : IDataConnection, IBrowsableDataConnection
await _client!.UnsubscribeAsync(subscriptionId, cancellationToken);
}
+ ///
+ public Task SubscribeAlarmsAsync(
+ string sourceReference, string? conditionFilter,
+ AlarmTransitionCallback callback, CancellationToken cancellationToken = default)
+ {
+ lock (_alarmLock)
+ {
+ _alarmSubCount++;
+ if (_alarmCts == null)
+ {
+ _alarmCts = new CancellationTokenSource();
+ var token = _alarmCts.Token;
+ var client = _client!;
+ // Gateway-wide feed (null prefix); the actor filters per source reference.
+ _ = Task.Run(() => client.RunAlarmStreamAsync(null, t => callback(t), token), token);
+ }
+ }
+ return Task.FromResult(Guid.NewGuid().ToString());
+ }
+
+ ///
+ public Task UnsubscribeAlarmsAsync(string subscriptionId, CancellationToken cancellationToken = default)
+ {
+ lock (_alarmLock)
+ {
+ if (_alarmSubCount > 0)
+ _alarmSubCount--;
+ if (_alarmSubCount == 0)
+ {
+ _alarmCts?.Cancel();
+ _alarmCts?.Dispose();
+ _alarmCts = null;
+ }
+ }
+ return Task.CompletedTask;
+ }
+
///
public async Task ReadAsync(string tagPath, CancellationToken cancellationToken = default)
{
diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs
index d208f6ee..e38caa0b 100644
--- a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/RealMxGatewayClient.cs
@@ -246,6 +246,61 @@ public sealed class RealMxGatewayClient : IMxGatewayClient
}
}
+ ///
+ public async Task RunAlarmStreamAsync(
+ string? alarmFilterPrefix, Action onTransition,
+ CancellationToken ct = default)
+ {
+ var reconnectDelay = TimeSpan.FromSeconds(5);
+ while (!ct.IsCancellationRequested)
+ {
+ try
+ {
+ var request = new StreamAlarmsRequest
+ {
+ ClientCorrelationId = Guid.NewGuid().ToString("N"),
+ AlarmFilterPrefix = alarmFilterPrefix ?? string.Empty,
+ };
+
+ await foreach (var message in _client!.StreamAlarmsAsync(request, ct)
+ .WithCancellation(ct).ConfigureAwait(false))
+ {
+ if (ct.IsCancellationRequested) break;
+ switch (message.PayloadCase)
+ {
+ case AlarmFeedMessage.PayloadOneofCase.ActiveAlarm:
+ onTransition(MxGatewayAlarmMapper.MapSnapshot(message.ActiveAlarm));
+ break;
+ case AlarmFeedMessage.PayloadOneofCase.Transition:
+ onTransition(MxGatewayAlarmMapper.MapTransition(message.Transition));
+ break;
+ case AlarmFeedMessage.PayloadOneofCase.SnapshotComplete:
+ onTransition(MxGatewayAlarmMapper.SnapshotComplete());
+ break;
+ }
+ }
+ }
+ catch (OperationCanceledException) when (ct.IsCancellationRequested)
+ {
+ return; // clean shutdown
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex,
+ "MxGateway alarm stream faulted; reopening in {DelaySeconds}s", reconnectDelay.TotalSeconds);
+ }
+
+ try
+ {
+ await Task.Delay(reconnectDelay, ct).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException) when (ct.IsCancellationRequested)
+ {
+ return;
+ }
+ }
+ }
+
///
public async ValueTask DisposeAsync()
{
diff --git a/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/Adapters/FakeMxGatewayClient.cs b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/Adapters/FakeMxGatewayClient.cs
index 1118dedc..b1d1982e 100644
--- a/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/Adapters/FakeMxGatewayClient.cs
+++ b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/Adapters/FakeMxGatewayClient.cs
@@ -58,6 +58,12 @@ public sealed class FakeMxGatewayClient : IMxGatewayClient, IMxGatewayClientFact
ct.ThrowIfCancellationRequested(); // …or FaultEventLoop() faults it to simulate a stream break
}
+ public Task RunAlarmStreamAsync(
+ string? alarmFilterPrefix,
+ Action onTransition,
+ CancellationToken ct = default)
+ => Task.CompletedTask; // no alarm feed in the fake
+
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
/// Simulate a stream break so the adapter raises Disconnected.
diff --git a/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/MxGatewayAlarmMapperTests.cs b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/MxGatewayAlarmMapperTests.cs
new file mode 100644
index 00000000..74c117af
--- /dev/null
+++ b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/MxGatewayAlarmMapperTests.cs
@@ -0,0 +1,66 @@
+using ZB.MOM.WW.MxGateway.Contracts.Proto;
+using ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters;
+using CommonsTransitionKind = ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmTransitionKind;
+using ProtoConditionState = ZB.MOM.WW.MxGateway.Contracts.Proto.AlarmConditionState;
+using ProtoTransitionKind = ZB.MOM.WW.MxGateway.Contracts.Proto.AlarmTransitionKind;
+
+namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests;
+
+/// Task-12: pure MxGateway alarm-feed proto → NativeAlarmTransition mapping.
+public class MxGatewayAlarmMapperTests
+{
+ [Fact]
+ public void MapTransition_AckTransition_IsAcknowledgedWithOperator()
+ {
+ var ev = new OnAlarmTransitionEvent
+ {
+ AlarmFullReference = "Tank01.Level.HiHi",
+ SourceObjectReference = "Tank01",
+ AlarmTypeName = "AnalogLimitAlarm.HiHi",
+ TransitionKind = ProtoTransitionKind.Acknowledge,
+ Severity = 600,
+ OperatorUser = "operator1",
+ OperatorComment = "ack",
+ Category = "Process",
+ Description = "hi"
+ };
+
+ var t = MxGatewayAlarmMapper.MapTransition(ev);
+
+ Assert.Equal(CommonsTransitionKind.Acknowledge, t.Kind);
+ Assert.True(t.Condition.Active);
+ Assert.True(t.Condition.Acknowledged);
+ Assert.Equal(600, t.Condition.Severity);
+ Assert.Equal("operator1", t.OperatorUser);
+ Assert.Equal("Tank01", t.SourceObjectReference);
+ }
+
+ [Fact]
+ public void MapConditionState_ActiveAcked_To_ActiveTrue_AckTrue()
+ {
+ var c = MxGatewayAlarmMapper.MapConditionState(ProtoConditionState.ActiveAcked, severity: 600);
+ Assert.True(c.Active);
+ Assert.True(c.Acknowledged);
+ Assert.Equal(600, c.Severity);
+ }
+
+ [Fact]
+ public void MapSnapshot_ActiveUnacked_IsSnapshotKind()
+ {
+ var snap = new ActiveAlarmSnapshot
+ {
+ AlarmFullReference = "Tank01.Level.Hi",
+ SourceObjectReference = "Tank01",
+ AlarmTypeName = "AnalogLimitAlarm.Hi",
+ CurrentState = ProtoConditionState.Active,
+ Severity = 1500 // out of range — must clamp
+ };
+
+ var t = MxGatewayAlarmMapper.MapSnapshot(snap);
+
+ Assert.Equal(CommonsTransitionKind.Snapshot, t.Kind);
+ Assert.True(t.Condition.Active);
+ Assert.False(t.Condition.Acknowledged);
+ Assert.Equal(1000, t.Condition.Severity);
+ }
+}