diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs index e06d976..e10bedf 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs @@ -63,14 +63,18 @@ public sealed class GalaxyDriver private EventPump? _eventPump; private readonly Lock _pumpLock = new(); - // PR B.2 — IAlarmSource implementation. Production-side acks route through - // GatewayGalaxyAlarmAcknowledger which calls MxGatewayClient.AcknowledgeAlarmAsync - // (PR E.2 SDK). Tests inject IGalaxyAlarmAcknowledger via the internal ctor to - // exercise the wiring without a running gateway. The alarm event stream is - // delivered by EventPump.OnAlarmTransition (PR B.1) — this driver is the - // consumer that bridges it onto IAlarmSource.OnAlarmEvent. + // IAlarmSource implementation. Production-side acks route through + // GatewayGalaxyAlarmAcknowledger which calls the session-less + // MxGatewayClient.AcknowledgeAlarmAsync RPC; alarm transitions arrive on the + // gateway's session-less StreamAlarms feed via GatewayGalaxyAlarmFeed. Tests inject + // IGalaxyAlarmAcknowledger + IGalaxyAlarmFeed via the internal ctor to exercise the + // wiring without a running gateway. This driver bridges the feed's OnAlarmTransition + // onto IAlarmSource.OnAlarmEvent. private IGalaxyAlarmAcknowledger? _alarmAcknowledger; + private IGalaxyAlarmFeed? _alarmFeed; private readonly Lock _alarmHandlersLock = new(); + private readonly Lock _alarmFeedLock = new(); + private bool _alarmFeedWired; private readonly HashSet _alarmSubscriptions = new(); // PR 4.W — production runtime owned by InitializeAsync. The driver builds these @@ -118,7 +122,7 @@ public sealed class GalaxyDriver ILogger? logger = null) : this(driverInstanceId, options, hierarchySource: null, dataReader: null, dataWriter: null, subscriber: null, - alarmAcknowledger: null, logger) + alarmAcknowledger: null, alarmFeed: null, logger) { } @@ -136,6 +140,7 @@ public sealed class GalaxyDriver IGalaxyDataWriter? dataWriter = null, IGalaxySubscriber? subscriber = null, IGalaxyAlarmAcknowledger? alarmAcknowledger = null, + IGalaxyAlarmFeed? alarmFeed = null, ILogger? logger = null) { _driverInstanceId = !string.IsNullOrWhiteSpace(driverInstanceId) @@ -148,6 +153,7 @@ public sealed class GalaxyDriver _dataWriter = dataWriter; _subscriber = subscriber; _alarmAcknowledger = alarmAcknowledger; + _alarmFeed = alarmFeed; // Forward the aggregator's transitions through IHostConnectivityProbe. _hostStatuses.OnHostStatusChanged += (_, args) => OnHostStatusChanged?.Invoke(this, args); @@ -230,8 +236,12 @@ public sealed class GalaxyDriver _subscriber, _hostStatuses, _logger, bufferedUpdateIntervalMs: _options.MxAccess.PublishingIntervalMs); - // PR B.2 — wire the alarm acknowledger to the live gateway client. - _alarmAcknowledger ??= new GatewayGalaxyAlarmAcknowledger(_ownedMxClient, _ownedMxSession, _logger); + // Wire the alarm acknowledger + feed to the live gateway client. Both are + // session-less — the gateway serves alarms from an always-on central monitor — + // so they hang off the owned MxGatewayClient, not the worker session. + _alarmAcknowledger ??= new GatewayGalaxyAlarmAcknowledger(_ownedMxClient, _logger); + _alarmFeed ??= new GatewayGalaxyAlarmFeed( + _ownedMxClient.StreamAlarmsAsync, _logger, _options.MxAccess.ClientName); } /// @@ -724,13 +734,34 @@ public sealed class GalaxyDriver channelCapacity: _options.MxAccess.EventPumpChannelCapacity, clientName: _options.MxAccess.ClientName); _eventPump.OnDataChange += OnPumpDataChange; - _eventPump.OnAlarmTransition += OnPumpAlarmTransition; _eventPump.Start(); return _eventPump; } } - // ===== IAlarmSource (PR B.2) ===== + // ===== IAlarmSource ===== + + /// + /// Start the gateway alarm feed (idempotent) and wire its transitions onto this + /// driver's bridge. The feed is session-less — it does + /// not depend on a data subscription or the . + /// + private void EnsureAlarmFeedStarted() + { + lock (_alarmFeedLock) + { + if (_alarmFeed is null) + { + throw new InvalidOperationException( + "GalaxyDriver alarm feed is not wired. InitializeAsync must run (or a feed " + + "seam must be injected via the internal ctor) before subscribing to alarms."); + } + if (_alarmFeedWired) return; + _alarmFeed.OnAlarmTransition += OnAlarmFeedTransition; + _alarmFeed.Start(); + _alarmFeedWired = true; + } + } /// public Task SubscribeAlarmsAsync( @@ -740,12 +771,11 @@ public sealed class GalaxyDriver ArgumentNullException.ThrowIfNull(sourceNodeIds); // The driver doesn't multiplex alarm subscriptions per source-node-id today — - // alarm events arrive on the same gateway StreamEvents channel as data-change - // events once the gateway emits the new family (PRs A.2 + A.3). The - // subscription handle is a sentinel the server uses for symmetric Unsubscribe; - // every active handle receives every alarm transition, and the server filters - // by source node before raising Part 9 conditions. Same shape AbCip uses. - EnsureEventPumpStarted(); + // every active handle receives every transition off the gateway's session-less + // StreamAlarms feed, and the server filters by source node before raising Part 9 + // conditions. The subscription handle is a sentinel the server uses for + // symmetric Unsubscribe. Same shape AbCip uses. + EnsureAlarmFeedStarted(); var handle = new GalaxyAlarmSubscriptionHandle(Guid.NewGuid().ToString("N")); lock (_alarmHandlersLock) { @@ -809,13 +839,13 @@ public sealed class GalaxyDriver } /// - /// Receives events from the EventPump and - /// reshapes them into for OPC UA-side consumers. - /// Fires only when at least one alarm subscription is - /// active so a server that hasn't called yet - /// doesn't surface untracked transitions. + /// Receives events from the gateway alarm + /// feed and reshapes them into for OPC UA-side + /// consumers. Fires only when at least one alarm + /// subscription is active so a server that hasn't called + /// yet doesn't surface untracked transitions. /// - private void OnPumpAlarmTransition(object? sender, GalaxyAlarmTransition transition) + private void OnAlarmFeedTransition(object? sender, GalaxyAlarmTransition transition) { GalaxyAlarmSubscriptionHandle? handle; lock (_alarmHandlersLock) @@ -921,6 +951,11 @@ public sealed class GalaxyDriver lock (_pumpLock) { pump = _eventPump; _eventPump = null; } pump?.DisposeAsync().AsTask().GetAwaiter().GetResult(); + IGalaxyAlarmFeed? alarmFeed; + lock (_alarmFeedLock) { alarmFeed = _alarmFeed; _alarmFeed = null; } + try { alarmFeed?.DisposeAsync().AsTask().GetAwaiter().GetResult(); } + catch (Exception ex) { _logger.LogWarning(ex, "Alarm feed dispose failed"); } + _ownedMxSession?.DisposeAsync().AsTask().GetAwaiter().GetResult(); _ownedMxSession = null; diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs index 98ae842..5cfe796 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs @@ -45,12 +45,6 @@ internal sealed class EventPump : IAsyncDisposable private static readonly Counter EventsDropped = Meter.CreateCounter("galaxy.events.dropped", unit: "{event}", description: "MxEvents dropped because the bounded channel was full (newest-dropped)."); - private static readonly Counter AlarmTransitionsReceived = - Meter.CreateCounter("galaxy.alarm_transitions.received", unit: "{event}", - description: "OnAlarmTransition events decoded and forwarded to driver-level handlers."); - private static readonly Counter AlarmTransitionsDecodingFailures = - Meter.CreateCounter("galaxy.alarm_transitions.decoding_failures", unit: "{event}", - description: "OnAlarmTransition events that arrived without a populated body or with an unspecified transition kind."); private readonly IGalaxySubscriber _subscriber; private readonly SubscriptionRegistry _registry; @@ -66,15 +60,6 @@ internal sealed class EventPump : IAsyncDisposable public event EventHandler? OnDataChange; - /// - /// Fires for every event the - /// gateway forwards. Decoded into a with - /// the OPC UA severity bucket already mapped via - /// . The driver wraps this onto - /// IAlarmSource.OnAlarmEvent in PR B.2. - /// - internal event EventHandler? OnAlarmTransition; - public EventPump( IGalaxySubscriber subscriber, SubscriptionRegistry registry, @@ -179,13 +164,12 @@ internal sealed class EventPump : IAsyncDisposable case MxEventFamily.OnDataChange: DispatchDataChange(ev); break; - case MxEventFamily.OnAlarmTransition: - DispatchAlarmTransition(ev); - break; default: - // OnWriteComplete / OperationComplete / OnBufferedDataChange are filtered - // out — write callers get their reply via the InvokeAsync round-trip, not - // via the event stream. + // OnAlarmTransition is no longer carried on the per-session event stream + // — alarms come from the gateway's session-less StreamAlarms feed + // (GatewayGalaxyAlarmFeed). OnWriteComplete / OperationComplete / + // OnBufferedDataChange are filtered out: write callers get their reply + // via the InvokeAsync round-trip, not via the event stream. return; } } @@ -212,73 +196,6 @@ internal sealed class EventPump : IAsyncDisposable } } - private void DispatchAlarmTransition(MxEvent ev) - { - // Body absent (e.g. malformed gateway event or worker version skew) — count and - // drop. The Part 9 sub-attribute fallback path keeps an alarm functional even - // when the rich payload disappears. - if (ev.OnAlarmTransition is not { } body) - { - AlarmTransitionsDecodingFailures.Add(1, _clientTag); - _logger.LogDebug( - "Galaxy OnAlarmTransition event arrived without a populated body (sequence={Sequence}); ignoring.", - ev.WorkerSequence); - return; - } - if (body.TransitionKind == AlarmTransitionKind.Unspecified) - { - AlarmTransitionsDecodingFailures.Add(1, _clientTag); - _logger.LogDebug( - "Galaxy OnAlarmTransition for {AlarmRef} has unspecified transition kind; ignoring.", - body.AlarmFullReference); - return; - } - - var (bucket, opcUaSeverity) = MxAccessSeverityMapper.Map(body.Severity); - var transitionTimestamp = body.TransitionTimestamp is { } tts - ? tts.ToDateTime() - : DateTime.UtcNow; - DateTime? originalRaiseTimestamp = body.OriginalRaiseTimestamp is { } orts - ? orts.ToDateTime() - : null; - - var transition = new GalaxyAlarmTransition( - AlarmFullReference: body.AlarmFullReference, - SourceObjectReference: body.SourceObjectReference, - AlarmTypeName: body.AlarmTypeName, - TransitionKind: MapTransitionKind(body.TransitionKind), - SeverityBucket: bucket, - OpcUaSeverity: opcUaSeverity, - RawMxAccessSeverity: body.Severity, - OriginalRaiseTimestampUtc: originalRaiseTimestamp, - TransitionTimestampUtc: transitionTimestamp, - OperatorUser: body.OperatorUser, - OperatorComment: body.OperatorComment, - Category: body.Category, - Description: body.Description); - - AlarmTransitionsReceived.Add(1, _clientTag); - try - { - OnAlarmTransition?.Invoke(this, transition); - } - catch (Exception ex) - { - _logger.LogWarning(ex, - "Galaxy OnAlarmTransition handler threw for {AlarmRef} — continuing.", - transition.AlarmFullReference); - } - } - - private static GalaxyAlarmTransitionKind MapTransitionKind(AlarmTransitionKind kind) => kind switch - { - AlarmTransitionKind.Raise => GalaxyAlarmTransitionKind.Raise, - AlarmTransitionKind.Acknowledge => GalaxyAlarmTransitionKind.Acknowledge, - AlarmTransitionKind.Clear => GalaxyAlarmTransitionKind.Clear, - AlarmTransitionKind.Retrigger => GalaxyAlarmTransitionKind.Retrigger, - _ => GalaxyAlarmTransitionKind.Unspecified, - }; - private DataValueSnapshot ToSnapshot(MxEvent ev) { var value = MxValueDecoder.Decode(ev.Value); diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyAlarmAcknowledger.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyAlarmAcknowledger.cs index 8488124..1f0e0c0 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyAlarmAcknowledger.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyAlarmAcknowledger.cs @@ -5,26 +5,27 @@ using MxGateway.Contracts.Proto; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; /// -/// Production backed by the -/// MxGatewayClient.AcknowledgeAlarmAsync RPC (PR E.2). Maps the -/// reply's protocol status into a thrown exception when the gateway -/// reports a non-OK condition; native MxStatus failures inside the reply -/// surface as a logged warning so operator workflows aren't blocked by a -/// transient MxAccess hiccup. +/// Production backed by the session-less +/// MxGatewayClient.AcknowledgeAlarmAsync RPC. The updated gateway routes +/// acknowledgement through its always-on central alarm monitor, so no worker +/// session is involved — the driver supplies only the alarm reference, comment, +/// and operator principal. /// +/// +/// A non-OK means the gateway never reached MXAccess +/// (transport / dispatch failure) and is surfaced as a thrown exception. A non-zero +/// native ack return code (hresult) means MXAccess itself rejected the ack; +/// that is logged as a warning rather than thrown so a transient MXAccess hiccup +/// doesn't block the operator workflow — the operator can retry. +/// internal sealed class GatewayGalaxyAlarmAcknowledger : IGalaxyAlarmAcknowledger { private readonly MxGatewayClient _client; - private readonly GalaxyMxSession _session; private readonly ILogger _logger; - public GatewayGalaxyAlarmAcknowledger( - MxGatewayClient client, - GalaxyMxSession session, - ILogger logger) + public GatewayGalaxyAlarmAcknowledger(MxGatewayClient client, ILogger logger) { _client = client ?? throw new ArgumentNullException(nameof(client)); - _session = session ?? throw new ArgumentNullException(nameof(session)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } @@ -36,15 +37,9 @@ internal sealed class GatewayGalaxyAlarmAcknowledger : IGalaxyAlarmAcknowledger { ArgumentException.ThrowIfNullOrEmpty(alarmFullReference); - var session = _session.Session - ?? throw new InvalidOperationException( - "GatewayGalaxyAlarmAcknowledger requires a connected GalaxyMxSession; underlying gateway session is null."); - var sessionId = session.SessionId; - var reply = await _client.AcknowledgeAlarmAsync( new AcknowledgeAlarmRequest { - SessionId = sessionId, ClientCorrelationId = Guid.NewGuid().ToString("N"), AlarmFullReference = alarmFullReference, Comment = comment ?? string.Empty, @@ -52,14 +47,23 @@ internal sealed class GatewayGalaxyAlarmAcknowledger : IGalaxyAlarmAcknowledger }, cancellationToken).ConfigureAwait(false); - if (reply.Status is { Success: 0 } status) + // Protocol status — the gateway failed before MXAccess saw the ack. This is a + // hard failure: the operator's request was not delivered at all. + if (reply.ProtocolStatus is { } proto && proto.Code != ProtocolStatusCode.Ok) + { + throw new InvalidOperationException( + $"Galaxy AcknowledgeAlarm for '{alarmFullReference}' failed at the gateway: " + + $"{proto.Code} {proto.Message}"); + } + + // hresult is the authoritative native ack return code (0 = success). It is + // absent only on a worker protocol violation; with an OK protocol status a + // missing value is treated as success. + if (reply.HasHresult && reply.Hresult != 0) { - // Native MxAccess rejected the ack — log but don't throw. Treat as a - // best-effort operator workflow; the operator can retry via the OPC UA - // session if necessary. _logger.LogWarning( - "Galaxy AcknowledgeAlarm for {AlarmRef} returned MxStatus failure: category={Category} detail={Detail} text={Text}", - alarmFullReference, status.Category, status.Detail, status.DiagnosticText); + "Galaxy AcknowledgeAlarm for {AlarmRef} returned native ack failure code {Hresult}.", + alarmFullReference, reply.Hresult); } } } diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyAlarmFeed.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyAlarmFeed.cs new file mode 100644 index 0000000..85aa67a --- /dev/null +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyAlarmFeed.cs @@ -0,0 +1,264 @@ +using System.Diagnostics.Metrics; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using MxGateway.Contracts.Proto; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +/// +/// Production over the gateway's session-less +/// StreamAlarms RPC. The stream opens with one +/// per currently-active alarm (the ConditionRefresh snapshot), then a +/// snapshot_complete sentinel, then a live +/// for every subsequent raise / acknowledge / clear. Each message is decoded into a +/// (severity already bucketed via +/// ) and surfaced on . +/// +/// +/// +/// The feed is independent of any worker session — the gateway's always-on central +/// alarm monitor owns the AVEVA subscription. The driver previously decoded alarm +/// transitions off the per-session StreamEvents stream (); +/// that path was retired when the gateway moved to the session-less alarm model. +/// +/// +/// The stream is supplied as a factory delegate (production passes +/// MxGatewayClient.StreamAlarmsAsync) so tests can drive synthetic feeds. +/// Streaming RPCs are not covered by the client's unary retry pipeline, so the feed +/// owns its reconnect: on any non-cancellation stream fault it logs, waits +/// reconnectDelay, and re-opens. The gateway re-sends the active-alarm +/// snapshot on every re-open, so the OPC UA condition layer sees current state +/// after a reconnect. +/// +/// +internal sealed class GatewayGalaxyAlarmFeed : IGalaxyAlarmFeed +{ + /// + /// Opens a StreamAlarms feed. Matches the method group + /// MxGatewayClient.StreamAlarmsAsync. + /// + internal delegate IAsyncEnumerable AlarmStreamFactory( + StreamAlarmsRequest request, CancellationToken cancellationToken); + + private static readonly TimeSpan DefaultReconnectDelay = TimeSpan.FromSeconds(5); + + // Shares the driver meter name so a host-level MeterListener catches feed counters + // alongside the EventPump's. Distinct Meter instance — same name is intentional. + private static readonly Meter Meter = new(EventPump.MeterName); + private static readonly Counter AlarmTransitionsReceived = + Meter.CreateCounter("galaxy.alarm_feed.transitions.received", unit: "{event}", + description: "Alarm feed messages decoded and forwarded to driver-level handlers."); + private static readonly Counter AlarmTransitionsDecodingFailures = + Meter.CreateCounter("galaxy.alarm_feed.transitions.decoding_failures", unit: "{event}", + description: "Alarm feed messages dropped for a missing body or unspecified transition kind."); + private static readonly Counter AlarmFeedReconnects = + Meter.CreateCounter("galaxy.alarm_feed.reconnects", unit: "{reconnect}", + description: "Times the alarm feed re-opened its StreamAlarms stream after a transport fault."); + + private readonly AlarmStreamFactory _streamFactory; + private readonly ILogger _logger; + private readonly string _alarmFilterPrefix; + private readonly TimeSpan _reconnectDelay; + private readonly KeyValuePair _clientTag; + private readonly CancellationTokenSource _cts = new(); + + private Task? _loop; + private bool _disposed; + + public event EventHandler? OnAlarmTransition; + + public GatewayGalaxyAlarmFeed( + AlarmStreamFactory streamFactory, + ILogger? logger = null, + string? clientName = null, + string? alarmFilterPrefix = null, + TimeSpan? reconnectDelay = null) + { + _streamFactory = streamFactory ?? throw new ArgumentNullException(nameof(streamFactory)); + _logger = logger ?? NullLogger.Instance; + _alarmFilterPrefix = alarmFilterPrefix ?? string.Empty; + _reconnectDelay = reconnectDelay ?? DefaultReconnectDelay; + _clientTag = new KeyValuePair("galaxy.client", clientName ?? ""); + } + + public void Start() + { + ObjectDisposedException.ThrowIf(_disposed, this); + if (_loop is not null) return; + _loop = Task.Run(() => RunAsync(_cts.Token)); + } + + private async Task RunAsync(CancellationToken ct) + { + var firstAttempt = true; + while (!ct.IsCancellationRequested) + { + if (!firstAttempt) + { + AlarmFeedReconnects.Add(1, _clientTag); + } + firstAttempt = false; + + try + { + var request = new StreamAlarmsRequest + { + ClientCorrelationId = Guid.NewGuid().ToString("N"), + AlarmFilterPrefix = _alarmFilterPrefix, + }; + + await foreach (var message in _streamFactory(request, ct) + .WithCancellation(ct).ConfigureAwait(false)) + { + if (ct.IsCancellationRequested) break; + Dispatch(message); + } + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + return; // clean shutdown + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Galaxy alarm feed stream faulted — reopening in {DelaySeconds}s.", + _reconnectDelay.TotalSeconds); + } + + try + { + await Task.Delay(_reconnectDelay, ct).ConfigureAwait(false); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + return; + } + } + } + + private void Dispatch(AlarmFeedMessage message) + { + switch (message.PayloadCase) + { + case AlarmFeedMessage.PayloadOneofCase.ActiveAlarm: + DispatchSnapshotEntry(message.ActiveAlarm); + break; + case AlarmFeedMessage.PayloadOneofCase.Transition: + DispatchTransition(message.Transition); + break; + case AlarmFeedMessage.PayloadOneofCase.SnapshotComplete: + _logger.LogDebug("Galaxy alarm feed active-alarm snapshot complete."); + break; + default: + // Empty oneof — worker / gateway version skew. Count and drop. + AlarmTransitionsDecodingFailures.Add(1, _clientTag); + break; + } + } + + /// + /// Decode one entry of the initial active-alarm snapshot. Each currently-active + /// alarm is surfaced as a transition so the OPC UA Part 9 condition layer sees + /// the alarm's present state on (re)connect: an unacknowledged active alarm as + /// a , an acknowledged one as a + /// . + /// + private void DispatchSnapshotEntry(ActiveAlarmSnapshot snapshot) + { + var kind = snapshot.CurrentState switch + { + AlarmConditionState.Active => GalaxyAlarmTransitionKind.Raise, + AlarmConditionState.ActiveAcked => GalaxyAlarmTransitionKind.Acknowledge, + AlarmConditionState.Inactive => GalaxyAlarmTransitionKind.Clear, + _ => GalaxyAlarmTransitionKind.Unspecified, + }; + if (kind == GalaxyAlarmTransitionKind.Unspecified) + { + AlarmTransitionsDecodingFailures.Add(1, _clientTag); + _logger.LogDebug( + "Galaxy alarm feed snapshot entry for {AlarmRef} has unspecified condition state; ignoring.", + snapshot.AlarmFullReference); + return; + } + + var (bucket, opcUaSeverity) = MxAccessSeverityMapper.Map(snapshot.Severity); + Raise(new GalaxyAlarmTransition( + AlarmFullReference: snapshot.AlarmFullReference, + SourceObjectReference: snapshot.SourceObjectReference, + AlarmTypeName: snapshot.AlarmTypeName, + TransitionKind: kind, + SeverityBucket: bucket, + OpcUaSeverity: opcUaSeverity, + RawMxAccessSeverity: snapshot.Severity, + OriginalRaiseTimestampUtc: snapshot.OriginalRaiseTimestamp?.ToDateTime(), + TransitionTimestampUtc: snapshot.LastTransitionTimestamp?.ToDateTime() ?? DateTime.UtcNow, + OperatorUser: snapshot.OperatorUser, + OperatorComment: snapshot.OperatorComment, + Category: snapshot.Category, + Description: snapshot.Description)); + } + + private void DispatchTransition(OnAlarmTransitionEvent body) + { + if (body.TransitionKind == AlarmTransitionKind.Unspecified) + { + AlarmTransitionsDecodingFailures.Add(1, _clientTag); + _logger.LogDebug( + "Galaxy alarm feed transition for {AlarmRef} has unspecified transition kind; ignoring.", + body.AlarmFullReference); + return; + } + + var (bucket, opcUaSeverity) = MxAccessSeverityMapper.Map(body.Severity); + Raise(new GalaxyAlarmTransition( + AlarmFullReference: body.AlarmFullReference, + SourceObjectReference: body.SourceObjectReference, + AlarmTypeName: body.AlarmTypeName, + TransitionKind: MapTransitionKind(body.TransitionKind), + SeverityBucket: bucket, + OpcUaSeverity: opcUaSeverity, + RawMxAccessSeverity: body.Severity, + OriginalRaiseTimestampUtc: body.OriginalRaiseTimestamp?.ToDateTime(), + TransitionTimestampUtc: body.TransitionTimestamp?.ToDateTime() ?? DateTime.UtcNow, + OperatorUser: body.OperatorUser, + OperatorComment: body.OperatorComment, + Category: body.Category, + Description: body.Description)); + } + + private void Raise(GalaxyAlarmTransition transition) + { + AlarmTransitionsReceived.Add(1, _clientTag); + try + { + OnAlarmTransition?.Invoke(this, transition); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Galaxy alarm feed OnAlarmTransition handler threw for {AlarmRef} — continuing.", + transition.AlarmFullReference); + } + } + + private static GalaxyAlarmTransitionKind MapTransitionKind(AlarmTransitionKind kind) => kind switch + { + AlarmTransitionKind.Raise => GalaxyAlarmTransitionKind.Raise, + AlarmTransitionKind.Acknowledge => GalaxyAlarmTransitionKind.Acknowledge, + AlarmTransitionKind.Clear => GalaxyAlarmTransitionKind.Clear, + AlarmTransitionKind.Retrigger => GalaxyAlarmTransitionKind.Retrigger, + _ => GalaxyAlarmTransitionKind.Unspecified, + }; + + public async ValueTask DisposeAsync() + { + if (_disposed) return; + _disposed = true; + _cts.Cancel(); + if (_loop is not null) + { + try { await _loop.ConfigureAwait(false); } catch { /* shutdown */ } + } + _cts.Dispose(); + } +} diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/IGalaxyAlarmFeed.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/IGalaxyAlarmFeed.cs new file mode 100644 index 0000000..e0cb4fe --- /dev/null +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/IGalaxyAlarmFeed.cs @@ -0,0 +1,29 @@ +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +/// +/// Driver-side seam for the gateway's session-less alarm feed. Production wraps +/// MxGatewayClient.StreamAlarmsAsync (); +/// tests substitute a fake to drive synthetic +/// events through 's IAlarmSource bridge without a +/// running gateway. +/// +/// +/// The feed is independent of any worker session — the updated gateway serves +/// alarms from an always-on central monitor, so the feed survives subscription +/// churn and reconnects its own stream on transient transport failures. +/// +internal interface IGalaxyAlarmFeed : IAsyncDisposable +{ + /// + /// Fires for every alarm transition the gateway feed delivers — both the + /// entries of the initial active-alarm snapshot and every subsequent live + /// raise / acknowledge / clear. The OPC UA severity bucket is already mapped. + /// + event EventHandler? OnAlarmTransition; + + /// + /// Start consuming the alarm feed on a background task. Idempotent — second + /// calls are no-ops while the loop is running. + /// + void Start(); +} diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverAlarmEventArgsExtensionTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverAlarmEventArgsExtensionTests.cs index ce451c3..2aa04d5 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverAlarmEventArgsExtensionTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverAlarmEventArgsExtensionTests.cs @@ -1,6 +1,3 @@ -using System.Threading.Channels; -using Google.Protobuf.WellKnownTypes; -using MxGateway.Contracts.Proto; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; @@ -10,49 +7,40 @@ using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests; /// -/// PR E.7 — pins that the GalaxyDriver populates the extended AlarmEventArgs -/// fields (OperatorComment, OriginalRaiseTimestampUtc, AlarmCategory) when the -/// gateway emits a transition with the rich payload, and leaves them null on -/// events that don't carry them. +/// Pins that the GalaxyDriver populates the extended AlarmEventArgs fields +/// (OperatorComment, OriginalRaiseTimestampUtc, AlarmCategory) when the gateway +/// alarm feed delivers a transition with the rich payload, and leaves them null on +/// transitions that don't carry them. /// public sealed class GalaxyDriverAlarmEventArgsExtensionTests { [Fact] public async Task Acknowledge_transition_with_full_payload_populates_extended_fields() { - var subscriber = new ManualSubscriber(); - using var driver = NewDriver(subscriber); + var feed = new FakeAlarmFeed(); + using var driver = NewDriver(feed); await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None); var observed = new List(); driver.OnAlarmEvent += (_, args) => observed.Add(args); - await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None); var raise = new DateTime(2026, 5, 1, 12, 0, 0, DateTimeKind.Utc); var ack = raise.AddSeconds(45); - await subscriber.EmitAlarmAsync(new MxEvent - { - Family = MxEventFamily.OnAlarmTransition, - OnAlarmTransition = new OnAlarmTransitionEvent - { - AlarmFullReference = "Tank01.Level.HiHi", - SourceObjectReference = "Tank01", - AlarmTypeName = "AnalogLimitAlarm.HiHi", - TransitionKind = AlarmTransitionKind.Acknowledge, - Severity = 750, - OriginalRaiseTimestamp = Timestamp.FromDateTime(raise), - TransitionTimestamp = Timestamp.FromDateTime(ack), - OperatorUser = "alice", - OperatorComment = "investigating", - Category = "Process", - Description = "Tank 01 high-high level", - }, - }); + feed.Emit(new GalaxyAlarmTransition( + AlarmFullReference: "Tank01.Level.HiHi", + SourceObjectReference: "Tank01", + AlarmTypeName: "AnalogLimitAlarm.HiHi", + TransitionKind: GalaxyAlarmTransitionKind.Acknowledge, + SeverityBucket: AlarmSeverity.Critical, + OpcUaSeverity: 800, + RawMxAccessSeverity: 750, + OriginalRaiseTimestampUtc: raise, + TransitionTimestampUtc: ack, + OperatorUser: "alice", + OperatorComment: "investigating", + Category: "Process", + Description: "Tank 01 high-high level")); - for (var i = 0; i < 20 && observed.Count == 0; i++) - { - await Task.Delay(50); - } observed.ShouldHaveSingleItem(); observed[0].OperatorComment.ShouldBe("investigating"); observed[0].OriginalRaiseTimestampUtc.ShouldBe(raise); @@ -62,38 +50,35 @@ public sealed class GalaxyDriverAlarmEventArgsExtensionTests [Fact] public async Task Raise_transition_without_optional_fields_leaves_them_null() { - var subscriber = new ManualSubscriber(); - using var driver = NewDriver(subscriber); + var feed = new FakeAlarmFeed(); + using var driver = NewDriver(feed); await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None); var observed = new List(); driver.OnAlarmEvent += (_, args) => observed.Add(args); - await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None); - await subscriber.EmitAlarmAsync(new MxEvent - { - Family = MxEventFamily.OnAlarmTransition, - OnAlarmTransition = new OnAlarmTransitionEvent - { - AlarmFullReference = "Tank01.Level.HiHi", - AlarmTypeName = "AnalogLimitAlarm.HiHi", - TransitionKind = AlarmTransitionKind.Raise, - Severity = 750, - TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), - }, - }); + feed.Emit(new GalaxyAlarmTransition( + AlarmFullReference: "Tank01.Level.HiHi", + SourceObjectReference: string.Empty, + AlarmTypeName: "AnalogLimitAlarm.HiHi", + TransitionKind: GalaxyAlarmTransitionKind.Raise, + SeverityBucket: AlarmSeverity.Critical, + OpcUaSeverity: 800, + RawMxAccessSeverity: 750, + OriginalRaiseTimestampUtc: null, + TransitionTimestampUtc: DateTime.UtcNow, + OperatorUser: string.Empty, + OperatorComment: string.Empty, + Category: string.Empty, + Description: string.Empty)); - for (var i = 0; i < 20 && observed.Count == 0; i++) - { - await Task.Delay(50); - } observed.ShouldHaveSingleItem(); observed[0].OperatorComment.ShouldBeNull(); observed[0].OriginalRaiseTimestampUtc.ShouldBeNull(); observed[0].AlarmCategory.ShouldBeNull(); } - private static GalaxyDriver NewDriver(ManualSubscriber subscriber) + private static GalaxyDriver NewDriver(IGalaxyAlarmFeed feed) { var options = new GalaxyDriverOptions( new GalaxyGatewayOptions("http://localhost:5000", "literal-api-key"), @@ -104,35 +89,19 @@ public sealed class GalaxyDriverAlarmEventArgsExtensionTests driverInstanceId: "drv-1", options: options, hierarchySource: null, - dataReader: null, - dataWriter: null, - subscriber: subscriber, - alarmAcknowledger: null); + alarmFeed: feed); } - private sealed class ManualSubscriber : IGalaxySubscriber + /// In-memory the test drives directly. + private sealed class FakeAlarmFeed : IGalaxyAlarmFeed { - private readonly Channel _stream = - Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + public event EventHandler? OnAlarmTransition; - public Task> SubscribeBulkAsync( - IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) - { - var results = new List(); - var nextHandle = 100; - foreach (var r in fullReferences) - { - results.Add(new SubscribeResult { TagAddress = r, ItemHandle = nextHandle++, WasSuccessful = true }); - } - return Task.FromResult>(results); - } + public void Start() { } - public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) - => Task.CompletedTask; + public void Emit(GalaxyAlarmTransition transition) + => OnAlarmTransition?.Invoke(this, transition); - public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken) - => _stream.Reader.ReadAllAsync(cancellationToken); - - public ValueTask EmitAlarmAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev); + public ValueTask DisposeAsync() => ValueTask.CompletedTask; } } diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverAlarmSourceTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverAlarmSourceTests.cs index dbe6564..89d45bf 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverAlarmSourceTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverAlarmSourceTests.cs @@ -1,6 +1,3 @@ -using System.Threading.Channels; -using Google.Protobuf.WellKnownTypes; -using MxGateway.Contracts.Proto; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; @@ -10,51 +7,31 @@ using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests; /// -/// PR B.2 — pins GalaxyDriver's IAlarmSource implementation. The driver bridges -/// EventPump.OnAlarmTransition (PR B.1) onto IAlarmSource.OnAlarmEvent and -/// forwards Acknowledge through IGalaxyAlarmAcknowledger (production: -/// GatewayGalaxyAlarmAcknowledger calling the gateway's AcknowledgeAlarm RPC -/// from PR E.2). +/// Pins GalaxyDriver's IAlarmSource implementation. The driver bridges the +/// gateway's session-less alarm feed (, production: +/// GatewayGalaxyAlarmFeed) onto IAlarmSource.OnAlarmEvent and forwards +/// Acknowledge through (production: +/// GatewayGalaxyAlarmAcknowledger calling the session-less +/// AcknowledgeAlarm RPC). /// public sealed class GalaxyDriverAlarmSourceTests { [Fact] - public async Task SubscribeAlarmsAsync_returns_handle_and_event_fires_after_pump_alarm() + public async Task SubscribeAlarmsAsync_starts_feed_and_event_fires_on_transition() { - var subscriber = new ManualSubscriber(); + var feed = new FakeAlarmFeed(); var ack = new RecordingAcknowledger(); - using var driver = NewDriver(subscriber, ack); + using var driver = NewDriver(feed, ack); - // Subscribe so OnAlarmEvent has a registered handle to fire under. var handle = await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None); handle.ShouldNotBeNull(); + feed.Started.ShouldBeTrue("SubscribeAlarmsAsync must start the alarm feed"); var observed = new List(); driver.OnAlarmEvent += (_, args) => observed.Add(args); - // SubscribeAsync to start the EventPump (alarm wiring is lazy on first sub). - await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None); - - await subscriber.EmitAlarmAsync(new MxEvent - { - Family = MxEventFamily.OnAlarmTransition, - OnAlarmTransition = new OnAlarmTransitionEvent - { - AlarmFullReference = "Tank01.Level.HiHi", - SourceObjectReference = "Tank01", - AlarmTypeName = "AnalogLimitAlarm.HiHi", - TransitionKind = AlarmTransitionKind.Raise, - Severity = 750, - TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), - Description = "Tank 01 high-high level", - }, - }); - - // Drain pump events. - for (var i = 0; i < 20 && observed.Count == 0; i++) - { - await Task.Delay(50); - } + feed.Emit(NewTransition("Tank01.Level.HiHi", "Tank01", + GalaxyAlarmTransitionKind.Raise, AlarmSeverity.Critical)); observed.ShouldHaveSingleItem(); observed[0].ConditionId.ShouldBe("Tank01.Level.HiHi"); @@ -65,30 +42,19 @@ public sealed class GalaxyDriverAlarmSourceTests } [Fact] - public async Task OnAlarmEvent_does_not_fire_when_no_subscription_active() + public void OnAlarmEvent_does_not_fire_before_any_alarm_subscription() { - var subscriber = new ManualSubscriber(); + var feed = new FakeAlarmFeed(); var ack = new RecordingAcknowledger(); - using var driver = NewDriver(subscriber, ack); + using var driver = NewDriver(feed, ack); var observed = new List(); driver.OnAlarmEvent += (_, args) => observed.Add(args); - // Start the pump via a data subscription so alarm events flow but no alarm - // subscription is registered → OnAlarmEvent is suppressed. - await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None); - await subscriber.EmitAlarmAsync(new MxEvent - { - Family = MxEventFamily.OnAlarmTransition, - OnAlarmTransition = new OnAlarmTransitionEvent - { - AlarmFullReference = "Tank01.Level.HiHi", - TransitionKind = AlarmTransitionKind.Raise, - Severity = 600, - TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), - }, - }); - await Task.Delay(150); + // No SubscribeAlarmsAsync → the feed is never wired onto the driver, so a + // transition surfaces nowhere. + feed.Emit(NewTransition("Tank01.Level.HiHi", "Tank01", + GalaxyAlarmTransitionKind.Raise, AlarmSeverity.High)); observed.ShouldBeEmpty(); } @@ -96,29 +62,20 @@ public sealed class GalaxyDriverAlarmSourceTests [Fact] public async Task UnsubscribeAlarmsAsync_stops_event_flow() { - var subscriber = new ManualSubscriber(); + var feed = new FakeAlarmFeed(); var ack = new RecordingAcknowledger(); - using var driver = NewDriver(subscriber, ack); + using var driver = NewDriver(feed, ack); var handle = await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None); var observed = new List(); driver.OnAlarmEvent += (_, args) => observed.Add(args); - await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None); await driver.UnsubscribeAlarmsAsync(handle, CancellationToken.None); - await subscriber.EmitAlarmAsync(new MxEvent - { - Family = MxEventFamily.OnAlarmTransition, - OnAlarmTransition = new OnAlarmTransitionEvent - { - AlarmFullReference = "Tank01.Level.HiHi", - TransitionKind = AlarmTransitionKind.Raise, - Severity = 600, - TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), - }, - }); - await Task.Delay(150); + // The feed keeps running (it is session-less and shared), but with no active + // subscription the driver suppresses the bridged event. + feed.Emit(NewTransition("Tank01.Level.HiHi", "Tank01", + GalaxyAlarmTransitionKind.Raise, AlarmSeverity.High)); observed.ShouldBeEmpty(); } @@ -126,9 +83,9 @@ public sealed class GalaxyDriverAlarmSourceTests [Fact] public async Task UnsubscribeAlarmsAsync_throws_for_foreign_handle() { - var subscriber = new ManualSubscriber(); + var feed = new FakeAlarmFeed(); var ack = new RecordingAcknowledger(); - using var driver = NewDriver(subscriber, ack); + using var driver = NewDriver(feed, ack); var foreignHandle = new ForeignAlarmHandle(); await Should.ThrowAsync(() => @@ -138,9 +95,9 @@ public sealed class GalaxyDriverAlarmSourceTests [Fact] public async Task AcknowledgeAsync_routes_each_request_to_the_acknowledger() { - var subscriber = new ManualSubscriber(); + var feed = new FakeAlarmFeed(); var ack = new RecordingAcknowledger(); - using var driver = NewDriver(subscriber, ack); + using var driver = NewDriver(feed, ack); var requests = new[] { @@ -159,9 +116,9 @@ public sealed class GalaxyDriverAlarmSourceTests [Fact] public async Task AcknowledgeAsync_falls_back_to_SourceNodeId_when_ConditionId_empty() { - var subscriber = new ManualSubscriber(); + var feed = new FakeAlarmFeed(); var ack = new RecordingAcknowledger(); - using var driver = NewDriver(subscriber, ack); + using var driver = NewDriver(feed, ack); await driver.AcknowledgeAsync( [new AlarmAcknowledgeRequest("Tank01.Level.HiHi", string.Empty, null)], @@ -173,8 +130,8 @@ public sealed class GalaxyDriverAlarmSourceTests [Fact] public async Task AcknowledgeAsync_throws_NotSupported_without_acknowledger() { - var subscriber = new ManualSubscriber(); - using var driver = NewDriver(subscriber, alarmAcknowledger: null); + var feed = new FakeAlarmFeed(); + using var driver = NewDriver(feed, alarmAcknowledger: null); await Should.ThrowAsync(() => driver.AcknowledgeAsync( @@ -183,7 +140,7 @@ public sealed class GalaxyDriverAlarmSourceTests } private static GalaxyDriver NewDriver( - ManualSubscriber subscriber, IGalaxyAlarmAcknowledger? alarmAcknowledger) + IGalaxyAlarmFeed alarmFeed, IGalaxyAlarmAcknowledger? alarmAcknowledger) { var options = new GalaxyDriverOptions( new GalaxyGatewayOptions("http://localhost:5000", "literal-api-key"), @@ -194,10 +151,43 @@ public sealed class GalaxyDriverAlarmSourceTests driverInstanceId: "drv-1", options: options, hierarchySource: null, - dataReader: null, - dataWriter: null, - subscriber: subscriber, - alarmAcknowledger: alarmAcknowledger); + alarmAcknowledger: alarmAcknowledger, + alarmFeed: alarmFeed); + } + + private static GalaxyAlarmTransition NewTransition( + string alarmFullReference, + string sourceObjectReference, + GalaxyAlarmTransitionKind kind, + AlarmSeverity severity) + => new( + AlarmFullReference: alarmFullReference, + SourceObjectReference: sourceObjectReference, + AlarmTypeName: "AnalogLimitAlarm.HiHi", + TransitionKind: kind, + SeverityBucket: severity, + OpcUaSeverity: 800, + RawMxAccessSeverity: 750, + OriginalRaiseTimestampUtc: null, + TransitionTimestampUtc: DateTime.UtcNow, + OperatorUser: string.Empty, + OperatorComment: string.Empty, + Category: "Process", + Description: "Tank 01 high-high level"); + + /// In-memory the test drives directly. + private sealed class FakeAlarmFeed : IGalaxyAlarmFeed + { + public bool Started { get; private set; } + + public event EventHandler? OnAlarmTransition; + + public void Start() => Started = true; + + public void Emit(GalaxyAlarmTransition transition) + => OnAlarmTransition?.Invoke(this, transition); + + public ValueTask DisposeAsync() => ValueTask.CompletedTask; } private sealed class RecordingAcknowledger : IGalaxyAlarmAcknowledger @@ -215,30 +205,4 @@ public sealed class GalaxyDriverAlarmSourceTests { public string DiagnosticId => "foreign"; } - - private sealed class ManualSubscriber : IGalaxySubscriber - { - private readonly Channel _stream = - Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); - - public Task> SubscribeBulkAsync( - IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) - { - var results = new List(); - var nextHandle = 100; - foreach (var r in fullReferences) - { - results.Add(new SubscribeResult { TagAddress = r, ItemHandle = nextHandle++, WasSuccessful = true }); - } - return Task.FromResult>(results); - } - - public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) - => Task.CompletedTask; - - public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken) - => _stream.Reader.ReadAllAsync(cancellationToken); - - public ValueTask EmitAlarmAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev); - } } diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpAlarmTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpAlarmTests.cs deleted file mode 100644 index 43e712f..0000000 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpAlarmTests.cs +++ /dev/null @@ -1,239 +0,0 @@ -using System.Threading.Channels; -using Google.Protobuf.WellKnownTypes; -using MxGateway.Contracts.Proto; -using Shouldly; -using Xunit; -using ZB.MOM.WW.OtOpcUa.Core.Abstractions; -using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; - -namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime; - -/// -/// PR B.1 — pins the EventPump's OnAlarmTransition decode path. Synthetic MxEvents -/// with the new family go in; the pump fires OnAlarmTransition with the -/// decoded payload + mapped severity bucket; data-change subscribers stay -/// untouched. -/// -public sealed class EventPumpAlarmTests -{ - [Fact] - public async Task Dispatches_raise_acknowledge_clear_in_sequence() - { - var subscriber = new ManualSubscriber(); - var registry = new SubscriptionRegistry(); - var transitions = new List(); - var dispatched = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - - await using var pump = new EventPump(subscriber, registry, channelCapacity: 16, clientName: "AlarmTest"); - pump.OnAlarmTransition += (_, transition) => - { - lock (transitions) - { - transitions.Add(transition); - if (transitions.Count == 3) dispatched.TrySetResult(true); - } - }; - pump.Start(); - - var raise = new DateTime(2026, 5, 1, 12, 0, 0, DateTimeKind.Utc); - var ack = raise.AddSeconds(30); - var clear = ack.AddSeconds(60); - - await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi", - AlarmTransitionKind.Raise, severity: 750, transitionTime: raise)); - await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi", - AlarmTransitionKind.Acknowledge, severity: 750, transitionTime: ack, - originalRaise: raise, operatorUser: "alice", operatorComment: "investigating")); - await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi", - AlarmTransitionKind.Clear, severity: 750, transitionTime: clear, - originalRaise: raise)); - - var completed = await Task.WhenAny(dispatched.Task, Task.Delay(TimeSpan.FromSeconds(2))); - completed.ShouldBe(dispatched.Task, "all three alarm transitions should dispatch within 2s"); - - transitions.Count.ShouldBe(3); - - transitions[0].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Raise); - transitions[0].SeverityBucket.ShouldBe(AlarmSeverity.Critical); - transitions[0].OpcUaSeverity.ShouldBe(MxAccessSeverityMapper.OpcUaSeverityCritical); - transitions[0].RawMxAccessSeverity.ShouldBe(750); - transitions[0].TransitionTimestampUtc.ShouldBe(raise); - - transitions[1].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Acknowledge); - transitions[1].OperatorUser.ShouldBe("alice"); - transitions[1].OperatorComment.ShouldBe("investigating"); - transitions[1].OriginalRaiseTimestampUtc.ShouldBe(raise); - - transitions[2].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Clear); - transitions[2].OriginalRaiseTimestampUtc.ShouldBe(raise); - } - - [Fact] - public async Task Drops_alarm_event_with_unspecified_transition_kind() - { - var subscriber = new ManualSubscriber(); - var registry = new SubscriptionRegistry(); - var transitions = new List(); - - await using var pump = new EventPump(subscriber, registry, channelCapacity: 4, clientName: "AlarmTest"); - pump.OnAlarmTransition += (_, transition) => transitions.Add(transition); - pump.Start(); - - await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi", - AlarmTransitionKind.Unspecified, severity: 100, - transitionTime: DateTime.UtcNow)); - - // Give the pump a beat to drain the channel. - await Task.Delay(150); - - transitions.ShouldBeEmpty("alarm transitions with Unspecified kind are decoder failures and must not fire OnAlarmTransition"); - } - - [Fact] - public async Task Drops_alarm_event_with_missing_body() - { - var subscriber = new ManualSubscriber(); - var registry = new SubscriptionRegistry(); - var transitions = new List(); - - await using var pump = new EventPump(subscriber, registry, channelCapacity: 4, clientName: "AlarmTest"); - pump.OnAlarmTransition += (_, transition) => transitions.Add(transition); - pump.Start(); - - // Family marked as alarm-transition but body left empty (worker version skew / - // malformed event). Production should count + drop, not throw. - await subscriber.EmitRawAsync(new MxEvent - { - Family = MxEventFamily.OnAlarmTransition, - WorkerSequence = 42, - }); - - await Task.Delay(150); - - transitions.ShouldBeEmpty(); - } - - [Fact] - public async Task Mixed_data_change_and_alarm_events_dispatch_independently() - { - var subscriber = new ManualSubscriber(); - var registry = new SubscriptionRegistry(); - registry.Register(1, [new TagBinding("Tank01.Level", ItemHandle: 7)]); - - var dataChanges = new List(); - var alarms = new List(); - var bothSeen = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - - await using var pump = new EventPump(subscriber, registry, channelCapacity: 16, clientName: "MixedTest"); - pump.OnDataChange += (_, args) => - { - lock (dataChanges) - { - dataChanges.Add(args); - if (dataChanges.Count >= 1 && alarms.Count >= 1) bothSeen.TrySetResult(true); - } - }; - pump.OnAlarmTransition += (_, transition) => - { - lock (alarms) - { - alarms.Add(transition); - if (dataChanges.Count >= 1 && alarms.Count >= 1) bothSeen.TrySetResult(true); - } - }; - pump.Start(); - - await subscriber.EmitAsync(itemHandle: 7, value: 41.0); - await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi", - AlarmTransitionKind.Raise, severity: 600, transitionTime: DateTime.UtcNow)); - - var completed = await Task.WhenAny(bothSeen.Task, Task.Delay(TimeSpan.FromSeconds(2))); - completed.ShouldBe(bothSeen.Task); - - dataChanges.Count.ShouldBe(1); - alarms.Count.ShouldBe(1); - alarms[0].SeverityBucket.ShouldBe(AlarmSeverity.High); - } - - [Fact] - public async Task Filters_out_unsupported_event_families() - { - var subscriber = new ManualSubscriber(); - var registry = new SubscriptionRegistry(); - var transitions = new List(); - - await using var pump = new EventPump(subscriber, registry, channelCapacity: 4, clientName: "FilterTest"); - pump.OnAlarmTransition += (_, transition) => transitions.Add(transition); - pump.Start(); - - // OnWriteComplete and OperationComplete should be silently dropped. - await subscriber.EmitRawAsync(new MxEvent { Family = MxEventFamily.OnWriteComplete }); - await subscriber.EmitRawAsync(new MxEvent { Family = MxEventFamily.OperationComplete }); - - await Task.Delay(150); - - transitions.ShouldBeEmpty(); - } - - private static MxEvent NewAlarm( - string fullReference, - AlarmTransitionKind kind, - int severity, - DateTime transitionTime, - DateTime? originalRaise = null, - string operatorUser = "", - string operatorComment = "") - { - var body = new OnAlarmTransitionEvent - { - AlarmFullReference = fullReference, - SourceObjectReference = fullReference.Split('.')[0], - AlarmTypeName = "AnalogLimitAlarm.HiHi", - TransitionKind = kind, - Severity = severity, - TransitionTimestamp = Timestamp.FromDateTime(transitionTime), - OperatorUser = operatorUser, - OperatorComment = operatorComment, - Category = "Process", - Description = "Tank 01 high-high level", - }; - if (originalRaise is { } orts) - { - body.OriginalRaiseTimestamp = Timestamp.FromDateTime(orts); - } - return new MxEvent - { - Family = MxEventFamily.OnAlarmTransition, - OnAlarmTransition = body, - }; - } - - private sealed class ManualSubscriber : IGalaxySubscriber - { - private readonly Channel _stream = - Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); - - public Task> SubscribeBulkAsync( - IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) - => Task.FromResult>([]); - - public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) - => Task.CompletedTask; - - public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken) - => _stream.Reader.ReadAllAsync(cancellationToken); - - public ValueTask EmitAsync(int itemHandle, double value) => - _stream.Writer.WriteAsync(new MxEvent - { - Family = MxEventFamily.OnDataChange, - ItemHandle = itemHandle, - Value = new MxValue { DoubleValue = value }, - Quality = 192, - SourceTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), - }); - - public ValueTask EmitAlarmAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev); - public ValueTask EmitRawAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev); - } -} diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GatewayGalaxyAlarmFeedTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GatewayGalaxyAlarmFeedTests.cs new file mode 100644 index 0000000..5c2e11b --- /dev/null +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GatewayGalaxyAlarmFeedTests.cs @@ -0,0 +1,213 @@ +using System.Runtime.CompilerServices; +using Google.Protobuf.WellKnownTypes; +using MxGateway.Contracts.Proto; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime; + +/// +/// Pins — the session-less consumer of the +/// gateway's StreamAlarms feed. Synthetic s go +/// in through the stream-factory seam; the feed fires OnAlarmTransition with +/// decoded payloads and mapped severity buckets, drops malformed messages, and +/// re-opens the stream after a transport fault. +/// +public sealed class GatewayGalaxyAlarmFeedTests +{ + [Fact] + public async Task Decodes_active_alarm_snapshot_then_live_transition() + { + var raise = new DateTime(2026, 5, 1, 12, 0, 0, DateTimeKind.Utc); + var messages = new[] + { + SnapshotMessage("Tank01.Level.HiHi", AlarmConditionState.Active, severity: 750, + lastTransition: raise), + SnapshotMessage("Tank02.Level.HiHi", AlarmConditionState.ActiveAcked, severity: 500, + lastTransition: raise, operatorUser: "alice", operatorComment: "investigating"), + new AlarmFeedMessage { SnapshotComplete = true }, + TransitionMessage("Tank01.Level.HiHi", AlarmTransitionKind.Clear, severity: 750, + transitionTime: raise.AddMinutes(5), originalRaise: raise), + }; + + var observed = new List(); + var got3 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + await using var feed = new GatewayGalaxyAlarmFeed( + (_, ct) => OpenStream(messages, ct), clientName: "FeedTest"); + feed.OnAlarmTransition += (_, t) => + { + lock (observed) + { + observed.Add(t); + if (observed.Count == 3) got3.TrySetResult(true); + } + }; + feed.Start(); + + (await Task.WhenAny(got3.Task, Task.Delay(TimeSpan.FromSeconds(2)))) + .ShouldBe(got3.Task, "snapshot + transition should dispatch within 2s"); + + observed.Count.ShouldBe(3); + + // Active snapshot entry → Raise. + observed[0].AlarmFullReference.ShouldBe("Tank01.Level.HiHi"); + observed[0].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Raise); + observed[0].SeverityBucket.ShouldBe(AlarmSeverity.Critical); + observed[0].RawMxAccessSeverity.ShouldBe(750); + + // Acknowledged snapshot entry → Acknowledge, operator fields preserved. + observed[1].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Acknowledge); + observed[1].OperatorUser.ShouldBe("alice"); + observed[1].OperatorComment.ShouldBe("investigating"); + + // Live transition after snapshot_complete. + observed[2].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Clear); + observed[2].OriginalRaiseTimestampUtc.ShouldBe(raise); + } + + [Fact] + public async Task Drops_transition_with_unspecified_kind_and_empty_message() + { + var messages = new[] + { + TransitionMessage("Tank01.Level.HiHi", AlarmTransitionKind.Unspecified, severity: 100, + transitionTime: DateTime.UtcNow), + new AlarmFeedMessage(), // empty oneof — version skew + TransitionMessage("Tank01.Level.HiHi", AlarmTransitionKind.Raise, severity: 600, + transitionTime: DateTime.UtcNow), + }; + + var observed = new List(); + var gotOne = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + await using var feed = new GatewayGalaxyAlarmFeed( + (_, ct) => OpenStream(messages, ct), clientName: "FeedTest"); + feed.OnAlarmTransition += (_, t) => + { + lock (observed) + { + observed.Add(t); + gotOne.TrySetResult(true); + } + }; + feed.Start(); + + (await Task.WhenAny(gotOne.Task, Task.Delay(TimeSpan.FromSeconds(2)))) + .ShouldBe(gotOne.Task); + + // Only the well-formed Raise survives; the Unspecified + empty messages drop. + observed.ShouldHaveSingleItem(); + observed[0].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Raise); + observed[0].SeverityBucket.ShouldBe(AlarmSeverity.High); + } + + [Fact] + public async Task Reopens_stream_after_a_transport_fault() + { + var calls = 0; + var liveTransition = new[] + { + TransitionMessage("Tank01.Level.HiHi", AlarmTransitionKind.Raise, severity: 750, + transitionTime: DateTime.UtcNow), + }; + + var observed = new List(); + var gotOne = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + await using var feed = new GatewayGalaxyAlarmFeed( + (_, ct) => + { + // First open faults; the feed must reconnect and succeed on the retry. + if (Interlocked.Increment(ref calls) == 1) + { + throw new InvalidOperationException("synthetic stream fault"); + } + return OpenStream(liveTransition, ct); + }, + clientName: "ReconnectTest", + reconnectDelay: TimeSpan.FromMilliseconds(20)); + feed.OnAlarmTransition += (_, t) => + { + observed.Add(t); + gotOne.TrySetResult(true); + }; + feed.Start(); + + (await Task.WhenAny(gotOne.Task, Task.Delay(TimeSpan.FromSeconds(3)))) + .ShouldBe(gotOne.Task, "the feed should reopen the stream and deliver after a fault"); + + calls.ShouldBeGreaterThanOrEqualTo(2); + observed.ShouldHaveSingleItem(); + observed[0].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Raise); + } + + /// + /// Yields each message in order, then holds the stream open until the feed is + /// disposed — mirrors a live server-streaming RPC that does not complete on its + /// own. + /// + private static async IAsyncEnumerable OpenStream( + IEnumerable messages, + [EnumeratorCancellation] CancellationToken ct = default) + { + foreach (var message in messages) + { + ct.ThrowIfCancellationRequested(); + yield return message; + await Task.Yield(); + } + await Task.Delay(Timeout.Infinite, ct); + } + + private static AlarmFeedMessage SnapshotMessage( + string fullReference, + AlarmConditionState state, + int severity, + DateTime lastTransition, + string operatorUser = "", + string operatorComment = "") + => new() + { + ActiveAlarm = new ActiveAlarmSnapshot + { + AlarmFullReference = fullReference, + SourceObjectReference = fullReference.Split('.')[0], + AlarmTypeName = "AnalogLimitAlarm.HiHi", + Severity = severity, + CurrentState = state, + Category = "Process", + Description = "Tank high-high level", + LastTransitionTimestamp = Timestamp.FromDateTime(lastTransition), + OperatorUser = operatorUser, + OperatorComment = operatorComment, + }, + }; + + private static AlarmFeedMessage TransitionMessage( + string fullReference, + AlarmTransitionKind kind, + int severity, + DateTime transitionTime, + DateTime? originalRaise = null) + { + var body = new OnAlarmTransitionEvent + { + AlarmFullReference = fullReference, + SourceObjectReference = fullReference.Split('.')[0], + AlarmTypeName = "AnalogLimitAlarm.HiHi", + TransitionKind = kind, + Severity = severity, + TransitionTimestamp = Timestamp.FromDateTime(transitionTime), + Category = "Process", + Description = "Tank high-high level", + }; + if (originalRaise is { } orts) + { + body.OriginalRaiseTimestamp = Timestamp.FromDateTime(orts); + } + return new AlarmFeedMessage { Transition = body }; + } +}