driver-galaxy: GalaxyDriver implements IAlarmSource (PR B.2) #413

Merged
dohertj2 merged 1 commit from track-b2-galaxy-driver-ialarmsource into master 2026-04-30 17:18:22 -04:00
5 changed files with 504 additions and 2 deletions

View File

@@ -26,7 +26,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy;
/// "GalaxyMxGateway" so both paths can be live simultaneously during parity testing.
/// </remarks>
public sealed class GalaxyDriver
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IRediscoverable, IHostConnectivityProbe, IDisposable
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IRediscoverable, IHostConnectivityProbe, IAlarmSource, IDisposable
{
private readonly string _driverInstanceId;
private readonly GalaxyDriverOptions _options;
@@ -63,6 +63,16 @@ public sealed class GalaxyDriver
private EventPump? _eventPump;
private readonly Lock _pumpLock = new();
// PR B.2 — IAlarmSource implementation. Production-side acks route through
// GatewayGalaxyAlarmAcknowledger which calls MxGatewayClient.AcknowledgeAlarmAsync
// (PR E.2 SDK). Tests inject IGalaxyAlarmAcknowledger via the internal ctor to
// exercise the wiring without a running gateway. The alarm event stream is
// delivered by EventPump.OnAlarmTransition (PR B.1) — this driver is the
// consumer that bridges it onto IAlarmSource.OnAlarmEvent.
// Deliberately not readonly: when nothing was injected through the ctor, the
// gateway wiring code (presumably InitializeAsync — confirm) fills it in with
// `??=`. AcknowledgeAsync throws NotSupportedException while it is still null.
private IGalaxyAlarmAcknowledger? _alarmAcknowledger;
// Guards every read and write of _alarmSubscriptions.
private readonly Lock _alarmHandlersLock = new();
// Handles issued by SubscribeAlarmsAsync and not yet removed by
// UnsubscribeAlarmsAsync. Alarm transitions are only surfaced through
// OnAlarmEvent while this set is non-empty.
private readonly HashSet<GalaxyAlarmSubscriptionHandle> _alarmSubscriptions = new();
// PR 4.W — production runtime owned by InitializeAsync. The driver builds these
// when it opens a real gw session; tests bypass them by injecting seams via the
// internal ctor.
@@ -99,12 +109,16 @@ public sealed class GalaxyDriver
/// <summary>Fires when a host transitions Running ↔ Stopped (PR 4.7 HostStatusAggregator).</summary>
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
/// <inheritdoc />
/// <remarks>
/// Raised from the driver's <c>OnPumpAlarmTransition</c> handler with this driver as the
/// sender, and only while at least one alarm subscription handle is registered.
/// Exceptions thrown by subscribers are caught there and logged as warnings.
/// </remarks>
public event EventHandler<AlarmEventArgs>? OnAlarmEvent;
/// <summary>
/// Creates a driver without any injected seams. Chains to the overload that accepts
/// them, passing <c>null</c> for every seam argument and forwarding
/// <paramref name="logger"/>.
/// </summary>
/// <param name="driverInstanceId">Driver instance id, forwarded unchanged to the chained constructor.</param>
/// <param name="options">Driver options, forwarded unchanged to the chained constructor.</param>
/// <param name="logger">Optional logger; defaults to <c>null</c>.</param>
public GalaxyDriver(
string driverInstanceId,
GalaxyDriverOptions options,
ILogger<GalaxyDriver>? logger = null)
: this(driverInstanceId, options,
hierarchySource: null, dataReader: null, dataWriter: null, subscriber: null, logger)
hierarchySource: null, dataReader: null, dataWriter: null, subscriber: null,
alarmAcknowledger: null, logger)
{
}
@@ -121,6 +135,7 @@ public sealed class GalaxyDriver
IGalaxyDataReader? dataReader = null,
IGalaxyDataWriter? dataWriter = null,
IGalaxySubscriber? subscriber = null,
IGalaxyAlarmAcknowledger? alarmAcknowledger = null,
ILogger<GalaxyDriver>? logger = null)
{
_driverInstanceId = !string.IsNullOrWhiteSpace(driverInstanceId)
@@ -132,6 +147,7 @@ public sealed class GalaxyDriver
_dataReader = dataReader;
_dataWriter = dataWriter;
_subscriber = subscriber;
_alarmAcknowledger = alarmAcknowledger;
// Forward the aggregator's transitions through IHostConnectivityProbe.
_hostStatuses.OnHostStatusChanged += (_, args) => OnHostStatusChanged?.Invoke(this, args);
@@ -213,6 +229,9 @@ public sealed class GalaxyDriver
_probeWatcher = new PerPlatformProbeWatcher(
_subscriber, _hostStatuses, _logger,
bufferedUpdateIntervalMs: _options.MxAccess.PublishingIntervalMs);
// PR B.2 — wire the alarm acknowledger to the live gateway client.
_alarmAcknowledger ??= new GatewayGalaxyAlarmAcknowledger(_ownedMxClient, _ownedMxSession, _logger);
}
/// <summary>
@@ -705,11 +724,132 @@ public sealed class GalaxyDriver
channelCapacity: _options.MxAccess.EventPumpChannelCapacity,
clientName: _options.MxAccess.ClientName);
_eventPump.OnDataChange += OnPumpDataChange;
_eventPump.OnAlarmTransition += OnPumpAlarmTransition;
_eventPump.Start();
return _eventPump;
}
}
// ===== IAlarmSource (PR B.2) =====
/// <inheritdoc />
/// <remarks>
/// Registers a fresh <see cref="GalaxyAlarmSubscriptionHandle"/> and makes sure the event
/// pump is running. <paramref name="sourceNodeIds"/> is only null-checked: the driver does
/// not multiplex alarm subscriptions per source node today.
/// </remarks>
public Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
    IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
{
    ObjectDisposedException.ThrowIf(_disposed, this);
    ArgumentNullException.ThrowIfNull(sourceNodeIds);

    // Alarm events share the gateway StreamEvents channel with data-change events once
    // the gateway emits the new family (PRs A.2 + A.3), so there is nothing to register
    // per node here. The handle is a sentinel that lets the server call Unsubscribe
    // symmetrically; filtering by source node happens server-side before Part 9
    // conditions are raised. Per the original author, AbCip uses the same shape.
    EnsureEventPumpStarted();

    var diagnosticId = Guid.NewGuid().ToString("N");
    IAlarmSubscriptionHandle issued;
    lock (_alarmHandlersLock)
    {
        var registration = new GalaxyAlarmSubscriptionHandle(diagnosticId);
        _alarmSubscriptions.Add(registration);
        issued = registration;
    }

    return Task.FromResult(issued);
}
/// <inheritdoc />
/// <remarks>
/// Removes <paramref name="handle"/> from the set of active alarm subscriptions. Removing
/// a handle that is not (or no longer) registered is a no-op.
/// </remarks>
/// <exception cref="ArgumentException">
/// <paramref name="handle"/> is not a <see cref="GalaxyAlarmSubscriptionHandle"/>.
/// </exception>
public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
{
    ObjectDisposedException.ThrowIf(_disposed, this);
    ArgumentNullException.ThrowIfNull(handle);

    // Only handles of this driver's own handle type can ever be in the set.
    var issued = handle as GalaxyAlarmSubscriptionHandle
        ?? throw new ArgumentException(
            $"Subscription handle was not issued by this driver (expected GalaxyAlarmSubscriptionHandle, got {handle.GetType().Name}).",
            nameof(handle));

    lock (_alarmHandlersLock)
    {
        _alarmSubscriptions.Remove(issued);
    }

    return Task.CompletedTask;
}
/// <inheritdoc />
/// <remarks>
/// Forwards each request to the configured <see cref="IGalaxyAlarmAcknowledger"/>, one at
/// a time and in list order. An empty list returns immediately, before the acknowledger
/// is checked.
/// </remarks>
/// <exception cref="NotSupportedException">No acknowledger has been wired yet.</exception>
public async Task AcknowledgeAsync(
    IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken)
{
    ObjectDisposedException.ThrowIf(_disposed, this);
    ArgumentNullException.ThrowIfNull(acknowledgements);

    if (acknowledgements.Count == 0)
    {
        return;
    }

    var acknowledger = _alarmAcknowledger
        ?? throw new NotSupportedException(
            "GalaxyDriver.AcknowledgeAsync requires GatewayGalaxyAlarmAcknowledger wired against a connected " +
            "GalaxyMxSession (PR B.2). InitializeAsync must run before alarm acknowledgements can flow.");

    // Strictly sequential: the gateway RPC takes a single alarm reference per call.
    // Per the original author, the server-side per-condition Acknowledge is the natural
    // rate limit, and issuing in series keeps operator-comment ordering deterministic
    // without bursting the worker's STA queue.
    foreach (var request in acknowledgements)
    {
        // ConditionId carries the Galaxy alarm full reference. SourceNodeId is the OPC UA
        // browse path, which the gateway can't address, so it is only a fallback when no
        // ConditionId was supplied.
        var alarmFullReference = string.IsNullOrEmpty(request.ConditionId)
            ? request.SourceNodeId
            : request.ConditionId;
        var comment = request.Comment ?? string.Empty;

        await acknowledger.AcknowledgeAsync(
            alarmFullReference,
            comment,
            operatorUser: string.Empty, // server-side ACL fills this from the OPC UA session
            cancellationToken).ConfigureAwait(false);
    }
}
/// <summary>
/// Receives <see cref="GalaxyAlarmTransition"/> events from the EventPump and
/// reshapes them into <see cref="AlarmEventArgs"/> for OPC UA-side consumers.
/// Fires <see cref="OnAlarmEvent"/> only when at least one alarm subscription is
/// active so a server that hasn't called <see cref="SubscribeAlarmsAsync"/> yet
/// doesn't surface untracked transitions.
/// </summary>
/// <remarks>
/// Each <see cref="OnAlarmEvent"/> subscriber is invoked inside its own try/catch, so a
/// subscriber that throws is logged and skipped without preventing the remaining
/// subscribers from receiving the transition.
/// </remarks>
/// <param name="sender">Event source; not used.</param>
/// <param name="transition">The alarm transition reported by the pump.</param>
private void OnPumpAlarmTransition(object? sender, GalaxyAlarmTransition transition)
{
    GalaxyAlarmSubscriptionHandle? handle;
    lock (_alarmHandlersLock)
    {
        // Pick any active subscription handle as the "owner" of the event. The
        // server-side state machine doesn't multiplex by handle today; if multiple
        // alarm subscriptions are active we still only fire the event once and
        // the AlarmConditionService dispatches per-source-node downstream.
        // FirstOrDefault yields null for an empty set.
        handle = _alarmSubscriptions.FirstOrDefault();
    }

    if (handle is null)
    {
        return;
    }

    // Snapshot the delegate so a concurrent unsubscribe can't null it mid-dispatch.
    var subscribers = OnAlarmEvent;
    if (subscribers is null)
    {
        return;
    }

    var args = new AlarmEventArgs(
        SubscriptionHandle: handle,
        SourceNodeId: transition.SourceObjectReference,
        ConditionId: transition.AlarmFullReference,
        AlarmType: transition.AlarmTypeName,
        Message: transition.Description,
        Severity: transition.SeverityBucket,
        SourceTimestampUtc: transition.TransitionTimestampUtc);

    // Invoking the multicast delegate as a whole would stop at the first subscriber
    // that throws; walking the invocation list isolates each subscriber instead.
    foreach (var subscriber in subscribers.GetInvocationList())
    {
        try
        {
            ((EventHandler<AlarmEventArgs>)subscriber).Invoke(this, args);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "GalaxyDriver OnAlarmEvent handler threw for {AlarmRef} — continuing.",
                transition.AlarmFullReference);
        }
    }
}
/// <summary>
/// Forwards every fan-out event to the public <see cref="OnDataChange"/> for
/// ISubscribable consumers, AND routes ScanState changes to the per-platform

View File

@@ -0,0 +1,21 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
/// <summary>
/// Driver-side handle returned by <see cref="GalaxyDriver.SubscribeAlarmsAsync"/>.
/// The driver doesn't multiplex alarm transitions per handle; the handle exists so
/// the caller can Unsubscribe symmetrically and so raised alarm events can carry a
/// subscription handle for the server-side AlarmConditionService.
/// </summary>
/// <param name="diagnosticId">Identifier exposed through <see cref="DiagnosticId"/>.</param>
internal sealed class GalaxyAlarmSubscriptionHandle(string diagnosticId) : IAlarmSubscriptionHandle
{
    /// <inheritdoc />
    public string DiagnosticId { get; } = diagnosticId;
}

View File

@@ -0,0 +1,65 @@
using Microsoft.Extensions.Logging;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
/// <summary>
/// Production <see cref="IGalaxyAlarmAcknowledger"/> backed by the
/// <c>MxGatewayClient.AcknowledgeAlarmAsync</c> RPC (PR E.2). A native MxStatus
/// failure inside the reply is logged as a warning rather than thrown, so operator
/// workflows aren't blocked by a transient MxAccess hiccup.
/// </summary>
/// <remarks>
/// This class performs no explicit protocol-status check of its own. Any exception for
/// a non-OK protocol status would have to come from the client call itself.
/// NOTE(review): confirm that <c>MxGatewayClient</c> throws in that case.
/// </remarks>
/// <param name="client">Gateway client used to issue the RPC; must not be null.</param>
/// <param name="session">Session wrapper whose live gateway session supplies the session id; must not be null.</param>
/// <param name="logger">Receives the warning for native MxStatus failures; must not be null.</param>
internal sealed class GatewayGalaxyAlarmAcknowledger(
    MxGatewayClient client,
    GalaxyMxSession session,
    ILogger logger) : IGalaxyAlarmAcknowledger
{
    // Field initializers run in declaration order, so arguments are validated
    // client -> session -> logger.
    private readonly MxGatewayClient _client = client ?? throw new ArgumentNullException(nameof(client));
    private readonly GalaxyMxSession _session = session ?? throw new ArgumentNullException(nameof(session));
    private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger));

    /// <inheritdoc />
    /// <exception cref="ArgumentException"><paramref name="alarmFullReference"/> is null or empty.</exception>
    /// <exception cref="InvalidOperationException">The wrapped gateway session is null.</exception>
    public async Task AcknowledgeAsync(
        string alarmFullReference,
        string comment,
        string operatorUser,
        CancellationToken cancellationToken)
    {
        ArgumentException.ThrowIfNullOrEmpty(alarmFullReference);

        if (_session.Session is not { } gatewaySession)
        {
            throw new InvalidOperationException(
                "GatewayGalaxyAlarmAcknowledger requires a connected GalaxyMxSession; underlying gateway session is null.");
        }

        var request = new AcknowledgeAlarmRequest
        {
            SessionId = gatewaySession.SessionId,
            ClientCorrelationId = Guid.NewGuid().ToString("N"),
            AlarmFullReference = alarmFullReference,
            Comment = comment ?? string.Empty,
            OperatorUser = operatorUser ?? string.Empty,
        };

        var reply = await _client.AcknowledgeAlarmAsync(request, cancellationToken).ConfigureAwait(false);

        // Only a status whose Success field is 0 counts as a native failure; a missing
        // status or any other Success value needs no further handling.
        if (reply.Status is not { Success: 0 } failure)
        {
            return;
        }

        // Native MxAccess rejected the ack — log but don't throw. This is a best-effort
        // operator workflow; the operator can retry via the OPC UA session if necessary.
        _logger.LogWarning(
            "Galaxy AcknowledgeAlarm for {AlarmRef} returned MxStatus failure: category={Category} detail={Detail} text={Text}",
            alarmFullReference, failure.Category, failure.Detail, failure.DiagnosticText);
    }
}

View File

@@ -0,0 +1,32 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
/// <summary>
/// Test seam for the gateway-side Acknowledge call. Production wraps the
/// <c>MxGatewayClient.AcknowledgeAlarmAsync</c> RPC; tests substitute a fake
/// so <see cref="GalaxyDriver.AcknowledgeAsync"/> can be exercised without a
/// running gateway.
/// </summary>
internal interface IGalaxyAlarmAcknowledger
{
/// <summary>
/// Forward a single alarm acknowledgement to the gateway. The gateway
/// translates this to an MxAccess Acknowledge call against the worker's
/// session and returns the native MxStatus on the reply.
/// </summary>
/// <param name="alarmFullReference">
/// Fully-qualified alarm reference (e.g. <c>"Tank01.Level.HiHi"</c>).
/// </param>
/// <param name="comment">Operator-supplied comment forwarded to MxAccess.</param>
/// <param name="operatorUser">
/// Operator principal performing the acknowledgement. Resolved from the
/// OPC UA session by the server-side ACL layer before reaching the driver.
/// NOTE(review): <see cref="GalaxyDriver.AcknowledgeAsync"/> currently passes
/// <c>string.Empty</c> for this argument — confirm where the principal is filled in.
/// </param>
/// <param name="cancellationToken">Cancels the gateway RPC.</param>
/// <returns>
/// A task that completes once the acknowledgement has been handed to the gateway.
/// The signature carries no result, so the native MxStatus is not returned to the caller.
/// </returns>
Task AcknowledgeAsync(
string alarmFullReference,
string comment,
string operatorUser,
CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,244 @@
using System.Threading.Channels;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests;
/// <summary>
/// PR B.2 — pins GalaxyDriver's IAlarmSource implementation. The driver bridges
/// EventPump.OnAlarmTransition (PR B.1) onto IAlarmSource.OnAlarmEvent and
/// forwards Acknowledge through IGalaxyAlarmAcknowledger (production:
/// GatewayGalaxyAlarmAcknowledger calling the gateway's AcknowledgeAlarm RPC
/// from PR E.2).
/// </summary>
public sealed class GalaxyDriverAlarmSourceTests
{
    [Fact]
    public async Task SubscribeAlarmsAsync_returns_handle_and_event_fires_after_pump_alarm()
    {
        var subscriber = new ManualSubscriber();
        using var driver = NewDriver(subscriber, new RecordingAcknowledger());

        // Subscribe so OnAlarmEvent has a registered handle to fire under.
        var handle = await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None);
        handle.ShouldNotBeNull();

        var observed = ObserveAlarmEvents(driver);

        // Add a data subscription as well, mirroring how a server would use the driver.
        await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);

        await subscriber.EmitAlarmAsync(new MxEvent
        {
            Family = MxEventFamily.OnAlarmTransition,
            OnAlarmTransition = new OnAlarmTransitionEvent
            {
                AlarmFullReference = "Tank01.Level.HiHi",
                SourceObjectReference = "Tank01",
                AlarmTypeName = "AnalogLimitAlarm.HiHi",
                TransitionKind = AlarmTransitionKind.Raise,
                Severity = 750,
                TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
                Description = "Tank 01 high-high level",
            },
        });

        // Give the pump up to ~1 s to deliver the event.
        for (var attempt = 0; attempt < 20 && observed.IsEmpty; attempt++)
        {
            await Task.Delay(50);
        }

        var alarm = observed.ShouldHaveSingleItem();
        alarm.ConditionId.ShouldBe("Tank01.Level.HiHi");
        alarm.SourceNodeId.ShouldBe("Tank01");
        alarm.AlarmType.ShouldBe("AnalogLimitAlarm.HiHi");
        alarm.Severity.ShouldBe(AlarmSeverity.Critical);
        alarm.SubscriptionHandle.ShouldBe(handle);
    }

    [Fact]
    public async Task OnAlarmEvent_does_not_fire_when_no_subscription_active()
    {
        var subscriber = new ManualSubscriber();
        using var driver = NewDriver(subscriber, new RecordingAcknowledger());
        var observed = ObserveAlarmEvents(driver);

        // Start the pump via a data subscription so alarm events flow but no alarm
        // subscription is registered → OnAlarmEvent is suppressed.
        await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);
        await subscriber.EmitAlarmAsync(MinimalRaiseEvent());

        await Task.Delay(150);
        observed.ShouldBeEmpty();
    }

    [Fact]
    public async Task UnsubscribeAlarmsAsync_stops_event_flow()
    {
        var subscriber = new ManualSubscriber();
        using var driver = NewDriver(subscriber, new RecordingAcknowledger());

        var handle = await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None);
        var observed = ObserveAlarmEvents(driver);
        await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);

        await driver.UnsubscribeAlarmsAsync(handle, CancellationToken.None);
        await subscriber.EmitAlarmAsync(MinimalRaiseEvent());

        await Task.Delay(150);
        observed.ShouldBeEmpty();
    }

    [Fact]
    public async Task UnsubscribeAlarmsAsync_throws_for_foreign_handle()
    {
        var subscriber = new ManualSubscriber();
        using var driver = NewDriver(subscriber, new RecordingAcknowledger());

        var foreignHandle = new ForeignAlarmHandle();

        await Should.ThrowAsync<ArgumentException>(() =>
            driver.UnsubscribeAlarmsAsync(foreignHandle, CancellationToken.None));
    }

    [Fact]
    public async Task AcknowledgeAsync_routes_each_request_to_the_acknowledger()
    {
        var subscriber = new ManualSubscriber();
        var ack = new RecordingAcknowledger();
        using var driver = NewDriver(subscriber, ack);

        var requests = new[]
        {
            new AlarmAcknowledgeRequest("Tank01", "Tank01.Level.HiHi", "shift handover"),
            new AlarmAcknowledgeRequest("Tank02", "Tank02.Level.HiHi", "investigating"),
        };

        await driver.AcknowledgeAsync(requests, CancellationToken.None);

        ack.Calls.Count.ShouldBe(2);
        ack.Calls[0].AlarmRef.ShouldBe("Tank01.Level.HiHi");
        ack.Calls[0].Comment.ShouldBe("shift handover");
        ack.Calls[1].AlarmRef.ShouldBe("Tank02.Level.HiHi");
    }

    [Fact]
    public async Task AcknowledgeAsync_falls_back_to_SourceNodeId_when_ConditionId_empty()
    {
        var subscriber = new ManualSubscriber();
        var ack = new RecordingAcknowledger();
        using var driver = NewDriver(subscriber, ack);

        await driver.AcknowledgeAsync(
            [new AlarmAcknowledgeRequest("Tank01.Level.HiHi", string.Empty, null)],
            CancellationToken.None);

        ack.Calls[0].AlarmRef.ShouldBe("Tank01.Level.HiHi");
    }

    [Fact]
    public async Task AcknowledgeAsync_throws_NotSupported_without_acknowledger()
    {
        var subscriber = new ManualSubscriber();
        using var driver = NewDriver(subscriber, alarmAcknowledger: null);

        await Should.ThrowAsync<NotSupportedException>(() =>
            driver.AcknowledgeAsync(
                [new AlarmAcknowledgeRequest("Tank01", "Tank01.Level.HiHi", null)],
                CancellationToken.None));
    }

    /// <summary>Builds a driver wired to the given fake subscriber and (optional) fake acknowledger.</summary>
    private static GalaxyDriver NewDriver(
        ManualSubscriber subscriber, IGalaxyAlarmAcknowledger? alarmAcknowledger)
    {
        var options = new GalaxyDriverOptions(
            new GalaxyGatewayOptions("http://localhost:5000", "literal-api-key"),
            new GalaxyMxAccessOptions("AlarmSourceTest"),
            new GalaxyRepositoryOptions(),
            new GalaxyReconnectOptions());

        return new GalaxyDriver(
            driverInstanceId: "drv-1",
            options: options,
            hierarchySource: null,
            dataReader: null,
            dataWriter: null,
            subscriber: subscriber,
            alarmAcknowledger: alarmAcknowledger);
    }

    /// <summary>
    /// Attaches a recording handler to <see cref="GalaxyDriver.OnAlarmEvent"/>. The handler
    /// presumably runs on the event pump's thread while the test thread polls, so the
    /// events are collected in a thread-safe queue rather than a plain list.
    /// </summary>
    private static System.Collections.Concurrent.ConcurrentQueue<AlarmEventArgs> ObserveAlarmEvents(GalaxyDriver driver)
    {
        var observed = new System.Collections.Concurrent.ConcurrentQueue<AlarmEventArgs>();
        driver.OnAlarmEvent += (_, args) => observed.Enqueue(args);
        return observed;
    }

    /// <summary>Minimal Raise transition used by the tests that expect no event to surface.</summary>
    private static MxEvent MinimalRaiseEvent() => new()
    {
        Family = MxEventFamily.OnAlarmTransition,
        OnAlarmTransition = new OnAlarmTransitionEvent
        {
            AlarmFullReference = "Tank01.Level.HiHi",
            TransitionKind = AlarmTransitionKind.Raise,
            Severity = 600,
            TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
        },
    };

    /// <summary>Fake acknowledger that records every call in order.</summary>
    private sealed class RecordingAcknowledger : IGalaxyAlarmAcknowledger
    {
        public List<(string AlarmRef, string Comment, string Operator)> Calls { get; } = [];

        public Task AcknowledgeAsync(string alarmFullReference, string comment, string operatorUser, CancellationToken cancellationToken)
        {
            Calls.Add((alarmFullReference, comment, operatorUser));
            return Task.CompletedTask;
        }
    }

    /// <summary>A handle type the driver did not issue, used to provoke the ArgumentException path.</summary>
    private sealed class ForeignAlarmHandle : IAlarmSubscriptionHandle
    {
        public string DiagnosticId => "foreign";
    }

    /// <summary>
    /// Fake subscriber: reports every subscribe as successful and lets the test push
    /// events into the stream the driver's pump reads from.
    /// </summary>
    private sealed class ManualSubscriber : IGalaxySubscriber
    {
        private readonly Channel<MxEvent> _stream =
            Channel.CreateUnbounded<MxEvent>(new UnboundedChannelOptions { SingleReader = true });

        // Per-instance counter so item handles stay unique across SubscribeBulkAsync calls.
        // Holds the last handle handed out; the first one issued is 100.
        private int _lastItemHandle = 99;

        public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
            IReadOnlyList<string> fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
        {
            var results = new List<SubscribeResult>(fullReferences.Count);
            foreach (var reference in fullReferences)
            {
                results.Add(new SubscribeResult
                {
                    TagAddress = reference,
                    ItemHandle = Interlocked.Increment(ref _lastItemHandle),
                    WasSuccessful = true,
                });
            }

            return Task.FromResult<IReadOnlyList<SubscribeResult>>(results);
        }

        public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
            => Task.CompletedTask;

        public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken cancellationToken)
            => _stream.Reader.ReadAllAsync(cancellationToken);

        public ValueTask EmitAlarmAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev);
    }
}