diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs index 515c063..672fedc 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs @@ -26,7 +26,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy; /// "GalaxyMxGateway" so both paths can be live simultaneously during parity testing. /// public sealed class GalaxyDriver - : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IRediscoverable, IHostConnectivityProbe, IDisposable + : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IRediscoverable, IHostConnectivityProbe, IAlarmSource, IDisposable { private readonly string _driverInstanceId; private readonly GalaxyDriverOptions _options; @@ -63,6 +63,16 @@ public sealed class GalaxyDriver private EventPump? _eventPump; private readonly Lock _pumpLock = new(); + // PR B.2 — IAlarmSource implementation. Production-side acks route through + // GatewayGalaxyAlarmAcknowledger which calls MxGatewayClient.AcknowledgeAlarmAsync + // (PR E.2 SDK). Tests inject IGalaxyAlarmAcknowledger via the internal ctor to + // exercise the wiring without a running gateway. The alarm event stream is + // delivered by EventPump.OnAlarmTransition (PR B.1) — this driver is the + // consumer that bridges it onto IAlarmSource.OnAlarmEvent. + private IGalaxyAlarmAcknowledger? _alarmAcknowledger; + private readonly Lock _alarmHandlersLock = new(); + private readonly HashSet _alarmSubscriptions = new(); + // PR 4.W — production runtime owned by InitializeAsync. The driver builds these // when it opens a real gw session; tests bypass them by injecting seams via the // internal ctor. @@ -99,12 +109,16 @@ public sealed class GalaxyDriver /// Fires when a host transitions Running ↔ Stopped (PR 4.7 HostStatusAggregator). public event EventHandler? OnHostStatusChanged; + /// + public event EventHandler? OnAlarmEvent; + public GalaxyDriver( string driverInstanceId, GalaxyDriverOptions options, ILogger? logger = null) : this(driverInstanceId, options, - hierarchySource: null, dataReader: null, dataWriter: null, subscriber: null, logger) + hierarchySource: null, dataReader: null, dataWriter: null, subscriber: null, + alarmAcknowledger: null, logger) { } @@ -121,6 +135,7 @@ public sealed class GalaxyDriver IGalaxyDataReader? dataReader = null, IGalaxyDataWriter? dataWriter = null, IGalaxySubscriber? subscriber = null, + IGalaxyAlarmAcknowledger? alarmAcknowledger = null, ILogger? logger = null) { _driverInstanceId = !string.IsNullOrWhiteSpace(driverInstanceId) @@ -132,6 +147,7 @@ public sealed class GalaxyDriver _dataReader = dataReader; _dataWriter = dataWriter; _subscriber = subscriber; + _alarmAcknowledger = alarmAcknowledger; // Forward the aggregator's transitions through IHostConnectivityProbe. _hostStatuses.OnHostStatusChanged += (_, args) => OnHostStatusChanged?.Invoke(this, args); @@ -213,6 +229,9 @@ public sealed class GalaxyDriver _probeWatcher = new PerPlatformProbeWatcher( _subscriber, _hostStatuses, _logger, bufferedUpdateIntervalMs: _options.MxAccess.PublishingIntervalMs); + + // PR B.2 — wire the alarm acknowledger to the live gateway client. + _alarmAcknowledger ??= new GatewayGalaxyAlarmAcknowledger(_ownedMxClient, _ownedMxSession, _logger); } /// @@ -705,11 +724,132 @@ public sealed class GalaxyDriver channelCapacity: _options.MxAccess.EventPumpChannelCapacity, clientName: _options.MxAccess.ClientName); _eventPump.OnDataChange += OnPumpDataChange; + _eventPump.OnAlarmTransition += OnPumpAlarmTransition; _eventPump.Start(); return _eventPump; } } + // ===== IAlarmSource (PR B.2) ===== + + /// + public Task SubscribeAlarmsAsync( + IReadOnlyList sourceNodeIds, CancellationToken cancellationToken) + { + ObjectDisposedException.ThrowIf(_disposed, this); + ArgumentNullException.ThrowIfNull(sourceNodeIds); + + // The driver doesn't multiplex alarm subscriptions per source-node-id today — + // alarm events arrive on the same gateway StreamEvents channel as data-change + // events once the gateway emits the new family (PRs A.2 + A.3). The + // subscription handle is a sentinel the server uses for symmetric Unsubscribe; + // every active handle receives every alarm transition, and the server filters + // by source node before raising Part 9 conditions. Same shape AbCip uses. + EnsureEventPumpStarted(); + var handle = new GalaxyAlarmSubscriptionHandle(Guid.NewGuid().ToString("N")); + lock (_alarmHandlersLock) + { + _alarmSubscriptions.Add(handle); + } + return Task.FromResult(handle); + } + + /// + public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken) + { + ObjectDisposedException.ThrowIf(_disposed, this); + ArgumentNullException.ThrowIfNull(handle); + if (handle is not GalaxyAlarmSubscriptionHandle gash) + { + throw new ArgumentException( + $"Subscription handle was not issued by this driver (expected GalaxyAlarmSubscriptionHandle, got {handle.GetType().Name}).", + nameof(handle)); + } + lock (_alarmHandlersLock) + { + _alarmSubscriptions.Remove(gash); + } + return Task.CompletedTask; + } + + /// + public async Task AcknowledgeAsync( + IReadOnlyList acknowledgements, CancellationToken cancellationToken) + { + ObjectDisposedException.ThrowIf(_disposed, this); + ArgumentNullException.ThrowIfNull(acknowledgements); + if (acknowledgements.Count == 0) return; + + if (_alarmAcknowledger is null) + { + throw new NotSupportedException( + "GalaxyDriver.AcknowledgeAsync requires GatewayGalaxyAlarmAcknowledger wired against a connected " + + "GalaxyMxSession (PR B.2). InitializeAsync must run before alarm acknowledgements can flow."); + } + + // Acks are issued one-by-one — the gateway RPC accepts a single alarm + // reference per call. AlarmConditionState's per-condition Acknowledge in the + // server-side ACL layer is the natural rate-limit, so issuing in series here + // keeps the operator-comment ordering deterministic without bursting the + // worker's STA queue. + foreach (var ack in acknowledgements) + { + // ConditionId carries the alarm full reference for the Galaxy driver — + // SourceNodeId is the OPC UA browse path, which the gateway can't address. + // The server-side condition state pairs them through AlarmConditionService. + var alarmFullReference = !string.IsNullOrEmpty(ack.ConditionId) + ? ack.ConditionId + : ack.SourceNodeId; + await _alarmAcknowledger.AcknowledgeAsync( + alarmFullReference, + ack.Comment ?? string.Empty, + operatorUser: string.Empty, // server-side ACL fills this from the OPC UA session + cancellationToken).ConfigureAwait(false); + } + } + + /// + /// Receives events from the EventPump and + /// reshapes them into for OPC UA-side consumers. + /// Fires only when at least one alarm subscription is + /// active so a server that hasn't called yet + /// doesn't surface untracked transitions. + /// + private void OnPumpAlarmTransition(object? sender, GalaxyAlarmTransition transition) + { + GalaxyAlarmSubscriptionHandle? handle; + lock (_alarmHandlersLock) + { + // Pick any active subscription handle as the "owner" of the event. The + // server-side state machine doesn't multiplex by handle today; if multiple + // alarm subscriptions are active we still only fire the event once and + // the AlarmConditionService dispatches per-source-node downstream. + handle = _alarmSubscriptions.Count > 0 + ? _alarmSubscriptions.First() + : null; + } + if (handle is null) return; + + var args = new AlarmEventArgs( + SubscriptionHandle: handle, + SourceNodeId: transition.SourceObjectReference, + ConditionId: transition.AlarmFullReference, + AlarmType: transition.AlarmTypeName, + Message: transition.Description, + Severity: transition.SeverityBucket, + SourceTimestampUtc: transition.TransitionTimestampUtc); + try + { + OnAlarmEvent?.Invoke(this, args); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "GalaxyDriver OnAlarmEvent handler threw for {AlarmRef} — continuing.", + transition.AlarmFullReference); + } + } + /// /// Forwards every fan-out event to the public for /// ISubscribable consumers, AND routes ScanState changes to the per-platform diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GalaxyAlarmSubscriptionHandle.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GalaxyAlarmSubscriptionHandle.cs new file mode 100644 index 0000000..902f5ed --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GalaxyAlarmSubscriptionHandle.cs @@ -0,0 +1,21 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +/// +/// Driver-side handle returned by . +/// The driver doesn't multiplex alarm transitions per handle — every active handle +/// observes the gateway's alarm-event stream — but the handle is needed for +/// symmetric Unsubscribe and for the server-side AlarmConditionService to +/// correlate transitions with the originating subscription. +/// +internal sealed class GalaxyAlarmSubscriptionHandle : IAlarmSubscriptionHandle +{ + public GalaxyAlarmSubscriptionHandle(string diagnosticId) + { + DiagnosticId = diagnosticId; + } + + /// + public string DiagnosticId { get; } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyAlarmAcknowledger.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyAlarmAcknowledger.cs new file mode 100644 index 0000000..8488124 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyAlarmAcknowledger.cs @@ -0,0 +1,65 @@ +using Microsoft.Extensions.Logging; +using MxGateway.Client; +using MxGateway.Contracts.Proto; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +/// +/// Production backed by the +/// MxGatewayClient.AcknowledgeAlarmAsync RPC (PR E.2). Maps the +/// reply's protocol status into a thrown exception when the gateway +/// reports a non-OK condition; native MxStatus failures inside the reply +/// surface as a logged warning so operator workflows aren't blocked by a +/// transient MxAccess hiccup. +/// +internal sealed class GatewayGalaxyAlarmAcknowledger : IGalaxyAlarmAcknowledger +{ + private readonly MxGatewayClient _client; + private readonly GalaxyMxSession _session; + private readonly ILogger _logger; + + public GatewayGalaxyAlarmAcknowledger( + MxGatewayClient client, + GalaxyMxSession session, + ILogger logger) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + _session = session ?? throw new ArgumentNullException(nameof(session)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task AcknowledgeAsync( + string alarmFullReference, + string comment, + string operatorUser, + CancellationToken cancellationToken) + { + ArgumentException.ThrowIfNullOrEmpty(alarmFullReference); + + var session = _session.Session + ?? throw new InvalidOperationException( + "GatewayGalaxyAlarmAcknowledger requires a connected GalaxyMxSession; underlying gateway session is null."); + var sessionId = session.SessionId; + + var reply = await _client.AcknowledgeAlarmAsync( + new AcknowledgeAlarmRequest + { + SessionId = sessionId, + ClientCorrelationId = Guid.NewGuid().ToString("N"), + AlarmFullReference = alarmFullReference, + Comment = comment ?? string.Empty, + OperatorUser = operatorUser ?? string.Empty, + }, + cancellationToken).ConfigureAwait(false); + + if (reply.Status is { Success: 0 } status) + { + // Native MxAccess rejected the ack — log but don't throw. Treat as a + // best-effort operator workflow; the operator can retry via the OPC UA + // session if necessary. + _logger.LogWarning( + "Galaxy AcknowledgeAlarm for {AlarmRef} returned MxStatus failure: category={Category} detail={Detail} text={Text}", + alarmFullReference, status.Category, status.Detail, status.DiagnosticText); + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/IGalaxyAlarmAcknowledger.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/IGalaxyAlarmAcknowledger.cs new file mode 100644 index 0000000..aa9b1c8 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/IGalaxyAlarmAcknowledger.cs @@ -0,0 +1,32 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +/// +/// Test seam for the gateway-side Acknowledge call. Production wraps the +/// MxGatewayClient.AcknowledgeAlarmAsync RPC; tests substitute a fake +/// so can be exercised without a +/// running gateway. +/// +internal interface IGalaxyAlarmAcknowledger +{ + /// + /// Forward a single alarm acknowledgement to the gateway. The gateway + /// translates this to an MxAccess Acknowledge call against the worker's + /// session and returns the native MxStatus on the reply. + /// + /// + /// Fully-qualified alarm reference (e.g. "Tank01.Level.HiHi"). + /// + /// Operator-supplied comment forwarded to MxAccess. + /// + /// Operator principal performing the acknowledgement. Resolved from the + /// OPC UA session by the server-side ACL layer before reaching the driver. + /// + /// Cancels the gateway RPC. + Task AcknowledgeAsync( + string alarmFullReference, + string comment, + string operatorUser, + CancellationToken cancellationToken); +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverAlarmSourceTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverAlarmSourceTests.cs new file mode 100644 index 0000000..dbe6564 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/GalaxyDriverAlarmSourceTests.cs @@ -0,0 +1,244 @@ +using System.Threading.Channels; +using Google.Protobuf.WellKnownTypes; +using MxGateway.Contracts.Proto; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests; + +/// +/// PR B.2 — pins GalaxyDriver's IAlarmSource implementation. The driver bridges +/// EventPump.OnAlarmTransition (PR B.1) onto IAlarmSource.OnAlarmEvent and +/// forwards Acknowledge through IGalaxyAlarmAcknowledger (production: +/// GatewayGalaxyAlarmAcknowledger calling the gateway's AcknowledgeAlarm RPC +/// from PR E.2). +/// +public sealed class GalaxyDriverAlarmSourceTests +{ + [Fact] + public async Task SubscribeAlarmsAsync_returns_handle_and_event_fires_after_pump_alarm() + { + var subscriber = new ManualSubscriber(); + var ack = new RecordingAcknowledger(); + using var driver = NewDriver(subscriber, ack); + + // Subscribe so OnAlarmEvent has a registered handle to fire under. + var handle = await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None); + handle.ShouldNotBeNull(); + + var observed = new List(); + driver.OnAlarmEvent += (_, args) => observed.Add(args); + + // SubscribeAsync to start the EventPump (alarm wiring is lazy on first sub). + await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None); + + await subscriber.EmitAlarmAsync(new MxEvent + { + Family = MxEventFamily.OnAlarmTransition, + OnAlarmTransition = new OnAlarmTransitionEvent + { + AlarmFullReference = "Tank01.Level.HiHi", + SourceObjectReference = "Tank01", + AlarmTypeName = "AnalogLimitAlarm.HiHi", + TransitionKind = AlarmTransitionKind.Raise, + Severity = 750, + TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), + Description = "Tank 01 high-high level", + }, + }); + + // Drain pump events. + for (var i = 0; i < 20 && observed.Count == 0; i++) + { + await Task.Delay(50); + } + + observed.ShouldHaveSingleItem(); + observed[0].ConditionId.ShouldBe("Tank01.Level.HiHi"); + observed[0].SourceNodeId.ShouldBe("Tank01"); + observed[0].AlarmType.ShouldBe("AnalogLimitAlarm.HiHi"); + observed[0].Severity.ShouldBe(AlarmSeverity.Critical); + observed[0].SubscriptionHandle.ShouldBe(handle); + } + + [Fact] + public async Task OnAlarmEvent_does_not_fire_when_no_subscription_active() + { + var subscriber = new ManualSubscriber(); + var ack = new RecordingAcknowledger(); + using var driver = NewDriver(subscriber, ack); + + var observed = new List(); + driver.OnAlarmEvent += (_, args) => observed.Add(args); + + // Start the pump via a data subscription so alarm events flow but no alarm + // subscription is registered → OnAlarmEvent is suppressed. + await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None); + await subscriber.EmitAlarmAsync(new MxEvent + { + Family = MxEventFamily.OnAlarmTransition, + OnAlarmTransition = new OnAlarmTransitionEvent + { + AlarmFullReference = "Tank01.Level.HiHi", + TransitionKind = AlarmTransitionKind.Raise, + Severity = 600, + TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), + }, + }); + await Task.Delay(150); + + observed.ShouldBeEmpty(); + } + + [Fact] + public async Task UnsubscribeAlarmsAsync_stops_event_flow() + { + var subscriber = new ManualSubscriber(); + var ack = new RecordingAcknowledger(); + using var driver = NewDriver(subscriber, ack); + + var handle = await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None); + var observed = new List(); + driver.OnAlarmEvent += (_, args) => observed.Add(args); + await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None); + + await driver.UnsubscribeAlarmsAsync(handle, CancellationToken.None); + + await subscriber.EmitAlarmAsync(new MxEvent + { + Family = MxEventFamily.OnAlarmTransition, + OnAlarmTransition = new OnAlarmTransitionEvent + { + AlarmFullReference = "Tank01.Level.HiHi", + TransitionKind = AlarmTransitionKind.Raise, + Severity = 600, + TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), + }, + }); + await Task.Delay(150); + + observed.ShouldBeEmpty(); + } + + [Fact] + public async Task UnsubscribeAlarmsAsync_throws_for_foreign_handle() + { + var subscriber = new ManualSubscriber(); + var ack = new RecordingAcknowledger(); + using var driver = NewDriver(subscriber, ack); + + var foreignHandle = new ForeignAlarmHandle(); + await Should.ThrowAsync(() => + driver.UnsubscribeAlarmsAsync(foreignHandle, CancellationToken.None)); + } + + [Fact] + public async Task AcknowledgeAsync_routes_each_request_to_the_acknowledger() + { + var subscriber = new ManualSubscriber(); + var ack = new RecordingAcknowledger(); + using var driver = NewDriver(subscriber, ack); + + var requests = new[] + { + new AlarmAcknowledgeRequest("Tank01", "Tank01.Level.HiHi", "shift handover"), + new AlarmAcknowledgeRequest("Tank02", "Tank02.Level.HiHi", "investigating"), + }; + + await driver.AcknowledgeAsync(requests, CancellationToken.None); + + ack.Calls.Count.ShouldBe(2); + ack.Calls[0].AlarmRef.ShouldBe("Tank01.Level.HiHi"); + ack.Calls[0].Comment.ShouldBe("shift handover"); + ack.Calls[1].AlarmRef.ShouldBe("Tank02.Level.HiHi"); + } + + [Fact] + public async Task AcknowledgeAsync_falls_back_to_SourceNodeId_when_ConditionId_empty() + { + var subscriber = new ManualSubscriber(); + var ack = new RecordingAcknowledger(); + using var driver = NewDriver(subscriber, ack); + + await driver.AcknowledgeAsync( + [new AlarmAcknowledgeRequest("Tank01.Level.HiHi", string.Empty, null)], + CancellationToken.None); + + ack.Calls[0].AlarmRef.ShouldBe("Tank01.Level.HiHi"); + } + + [Fact] + public async Task AcknowledgeAsync_throws_NotSupported_without_acknowledger() + { + var subscriber = new ManualSubscriber(); + using var driver = NewDriver(subscriber, alarmAcknowledger: null); + + await Should.ThrowAsync(() => + driver.AcknowledgeAsync( + [new AlarmAcknowledgeRequest("Tank01", "Tank01.Level.HiHi", null)], + CancellationToken.None)); + } + + private static GalaxyDriver NewDriver( + ManualSubscriber subscriber, IGalaxyAlarmAcknowledger? alarmAcknowledger) + { + var options = new GalaxyDriverOptions( + new GalaxyGatewayOptions("http://localhost:5000", "literal-api-key"), + new GalaxyMxAccessOptions("AlarmSourceTest"), + new GalaxyRepositoryOptions(), + new GalaxyReconnectOptions()); + return new GalaxyDriver( + driverInstanceId: "drv-1", + options: options, + hierarchySource: null, + dataReader: null, + dataWriter: null, + subscriber: subscriber, + alarmAcknowledger: alarmAcknowledger); + } + + private sealed class RecordingAcknowledger : IGalaxyAlarmAcknowledger + { + public List<(string AlarmRef, string Comment, string Operator)> Calls { get; } = []; + + public Task AcknowledgeAsync(string alarmFullReference, string comment, string operatorUser, CancellationToken cancellationToken) + { + Calls.Add((alarmFullReference, comment, operatorUser)); + return Task.CompletedTask; + } + } + + private sealed class ForeignAlarmHandle : IAlarmSubscriptionHandle + { + public string DiagnosticId => "foreign"; + } + + private sealed class ManualSubscriber : IGalaxySubscriber + { + private readonly Channel _stream = + Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + + public Task> SubscribeBulkAsync( + IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) + { + var results = new List(); + var nextHandle = 100; + foreach (var r in fullReferences) + { + results.Add(new SubscribeResult { TagAddress = r, ItemHandle = nextHandle++, WasSuccessful = true }); + } + return Task.FromResult>(results); + } + + public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) + => Task.CompletedTask; + + public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken) + => _stream.Reader.ReadAllAsync(cancellationToken); + + public ValueTask EmitAlarmAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev); + } +}