driver-galaxy: GalaxyDriver implements IAlarmSource (PR B.2) #413
@@ -26,7 +26,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy;
|
||||
/// "GalaxyMxGateway" so both paths can be live simultaneously during parity testing.
|
||||
/// </remarks>
|
||||
public sealed class GalaxyDriver
|
||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IRediscoverable, IHostConnectivityProbe, IDisposable
|
||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IRediscoverable, IHostConnectivityProbe, IAlarmSource, IDisposable
|
||||
{
|
||||
private readonly string _driverInstanceId;
|
||||
private readonly GalaxyDriverOptions _options;
|
||||
@@ -63,6 +63,16 @@ public sealed class GalaxyDriver
|
||||
private EventPump? _eventPump;
|
||||
private readonly Lock _pumpLock = new();
|
||||
|
||||
// PR B.2 — IAlarmSource implementation. Production-side acks route through
|
||||
// GatewayGalaxyAlarmAcknowledger which calls MxGatewayClient.AcknowledgeAlarmAsync
|
||||
// (PR E.2 SDK). Tests inject IGalaxyAlarmAcknowledger via the internal ctor to
|
||||
// exercise the wiring without a running gateway. The alarm event stream is
|
||||
// delivered by EventPump.OnAlarmTransition (PR B.1) — this driver is the
|
||||
// consumer that bridges it onto IAlarmSource.OnAlarmEvent.
|
||||
private IGalaxyAlarmAcknowledger? _alarmAcknowledger;
|
||||
private readonly Lock _alarmHandlersLock = new();
|
||||
private readonly HashSet<GalaxyAlarmSubscriptionHandle> _alarmSubscriptions = new();
|
||||
|
||||
// PR 4.W — production runtime owned by InitializeAsync. The driver builds these
|
||||
// when it opens a real gw session; tests bypass them by injecting seams via the
|
||||
// internal ctor.
|
||||
@@ -99,12 +109,16 @@ public sealed class GalaxyDriver
|
||||
/// <summary>Fires when a host transitions Running ↔ Stopped (PR 4.7 HostStatusAggregator).</summary>
|
||||
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
|
||||
|
||||
/// <inheritdoc />
|
||||
public event EventHandler<AlarmEventArgs>? OnAlarmEvent;
|
||||
|
||||
public GalaxyDriver(
|
||||
string driverInstanceId,
|
||||
GalaxyDriverOptions options,
|
||||
ILogger<GalaxyDriver>? logger = null)
|
||||
: this(driverInstanceId, options,
|
||||
hierarchySource: null, dataReader: null, dataWriter: null, subscriber: null, logger)
|
||||
hierarchySource: null, dataReader: null, dataWriter: null, subscriber: null,
|
||||
alarmAcknowledger: null, logger)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -121,6 +135,7 @@ public sealed class GalaxyDriver
|
||||
IGalaxyDataReader? dataReader = null,
|
||||
IGalaxyDataWriter? dataWriter = null,
|
||||
IGalaxySubscriber? subscriber = null,
|
||||
IGalaxyAlarmAcknowledger? alarmAcknowledger = null,
|
||||
ILogger<GalaxyDriver>? logger = null)
|
||||
{
|
||||
_driverInstanceId = !string.IsNullOrWhiteSpace(driverInstanceId)
|
||||
@@ -132,6 +147,7 @@ public sealed class GalaxyDriver
|
||||
_dataReader = dataReader;
|
||||
_dataWriter = dataWriter;
|
||||
_subscriber = subscriber;
|
||||
_alarmAcknowledger = alarmAcknowledger;
|
||||
|
||||
// Forward the aggregator's transitions through IHostConnectivityProbe.
|
||||
_hostStatuses.OnHostStatusChanged += (_, args) => OnHostStatusChanged?.Invoke(this, args);
|
||||
@@ -213,6 +229,9 @@ public sealed class GalaxyDriver
|
||||
_probeWatcher = new PerPlatformProbeWatcher(
|
||||
_subscriber, _hostStatuses, _logger,
|
||||
bufferedUpdateIntervalMs: _options.MxAccess.PublishingIntervalMs);
|
||||
|
||||
// PR B.2 — wire the alarm acknowledger to the live gateway client.
|
||||
_alarmAcknowledger ??= new GatewayGalaxyAlarmAcknowledger(_ownedMxClient, _ownedMxSession, _logger);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -705,11 +724,132 @@ public sealed class GalaxyDriver
|
||||
channelCapacity: _options.MxAccess.EventPumpChannelCapacity,
|
||||
clientName: _options.MxAccess.ClientName);
|
||||
_eventPump.OnDataChange += OnPumpDataChange;
|
||||
_eventPump.OnAlarmTransition += OnPumpAlarmTransition;
|
||||
_eventPump.Start();
|
||||
return _eventPump;
|
||||
}
|
||||
}
|
||||
|
||||
// ===== IAlarmSource (PR B.2) =====
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
|
||||
IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
ArgumentNullException.ThrowIfNull(sourceNodeIds);
|
||||
|
||||
// The driver doesn't multiplex alarm subscriptions per source-node-id today —
|
||||
// alarm events arrive on the same gateway StreamEvents channel as data-change
|
||||
// events once the gateway emits the new family (PRs A.2 + A.3). The
|
||||
// subscription handle is a sentinel the server uses for symmetric Unsubscribe;
|
||||
// every active handle receives every alarm transition, and the server filters
|
||||
// by source node before raising Part 9 conditions. Same shape AbCip uses.
|
||||
EnsureEventPumpStarted();
|
||||
var handle = new GalaxyAlarmSubscriptionHandle(Guid.NewGuid().ToString("N"));
|
||||
lock (_alarmHandlersLock)
|
||||
{
|
||||
_alarmSubscriptions.Add(handle);
|
||||
}
|
||||
return Task.FromResult<IAlarmSubscriptionHandle>(handle);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
ArgumentNullException.ThrowIfNull(handle);
|
||||
if (handle is not GalaxyAlarmSubscriptionHandle gash)
|
||||
{
|
||||
throw new ArgumentException(
|
||||
$"Subscription handle was not issued by this driver (expected GalaxyAlarmSubscriptionHandle, got {handle.GetType().Name}).",
|
||||
nameof(handle));
|
||||
}
|
||||
lock (_alarmHandlersLock)
|
||||
{
|
||||
_alarmSubscriptions.Remove(gash);
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task AcknowledgeAsync(
|
||||
IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken)
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
ArgumentNullException.ThrowIfNull(acknowledgements);
|
||||
if (acknowledgements.Count == 0) return;
|
||||
|
||||
if (_alarmAcknowledger is null)
|
||||
{
|
||||
throw new NotSupportedException(
|
||||
"GalaxyDriver.AcknowledgeAsync requires GatewayGalaxyAlarmAcknowledger wired against a connected " +
|
||||
"GalaxyMxSession (PR B.2). InitializeAsync must run before alarm acknowledgements can flow.");
|
||||
}
|
||||
|
||||
// Acks are issued one-by-one — the gateway RPC accepts a single alarm
|
||||
// reference per call. AlarmConditionState's per-condition Acknowledge in the
|
||||
// server-side ACL layer is the natural rate-limit, so issuing in series here
|
||||
// keeps the operator-comment ordering deterministic without bursting the
|
||||
// worker's STA queue.
|
||||
foreach (var ack in acknowledgements)
|
||||
{
|
||||
// ConditionId carries the alarm full reference for the Galaxy driver —
|
||||
// SourceNodeId is the OPC UA browse path, which the gateway can't address.
|
||||
// The server-side condition state pairs them through AlarmConditionService.
|
||||
var alarmFullReference = !string.IsNullOrEmpty(ack.ConditionId)
|
||||
? ack.ConditionId
|
||||
: ack.SourceNodeId;
|
||||
await _alarmAcknowledger.AcknowledgeAsync(
|
||||
alarmFullReference,
|
||||
ack.Comment ?? string.Empty,
|
||||
operatorUser: string.Empty, // server-side ACL fills this from the OPC UA session
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Receives <see cref="GalaxyAlarmTransition"/> events from the EventPump and
|
||||
/// reshapes them into <see cref="AlarmEventArgs"/> for OPC UA-side consumers.
|
||||
/// Fires <see cref="OnAlarmEvent"/> only when at least one alarm subscription is
|
||||
/// active so a server that hasn't called <see cref="SubscribeAlarmsAsync"/> yet
|
||||
/// doesn't surface untracked transitions.
|
||||
/// </summary>
|
||||
private void OnPumpAlarmTransition(object? sender, GalaxyAlarmTransition transition)
|
||||
{
|
||||
GalaxyAlarmSubscriptionHandle? handle;
|
||||
lock (_alarmHandlersLock)
|
||||
{
|
||||
// Pick any active subscription handle as the "owner" of the event. The
|
||||
// server-side state machine doesn't multiplex by handle today; if multiple
|
||||
// alarm subscriptions are active we still only fire the event once and
|
||||
// the AlarmConditionService dispatches per-source-node downstream.
|
||||
handle = _alarmSubscriptions.Count > 0
|
||||
? _alarmSubscriptions.First()
|
||||
: null;
|
||||
}
|
||||
if (handle is null) return;
|
||||
|
||||
var args = new AlarmEventArgs(
|
||||
SubscriptionHandle: handle,
|
||||
SourceNodeId: transition.SourceObjectReference,
|
||||
ConditionId: transition.AlarmFullReference,
|
||||
AlarmType: transition.AlarmTypeName,
|
||||
Message: transition.Description,
|
||||
Severity: transition.SeverityBucket,
|
||||
SourceTimestampUtc: transition.TransitionTimestampUtc);
|
||||
try
|
||||
{
|
||||
OnAlarmEvent?.Invoke(this, args);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex,
|
||||
"GalaxyDriver OnAlarmEvent handler threw for {AlarmRef} — continuing.",
|
||||
transition.AlarmFullReference);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Forwards every fan-out event to the public <see cref="OnDataChange"/> for
|
||||
/// ISubscribable consumers, AND routes ScanState changes to the per-platform
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
|
||||
/// <summary>
|
||||
/// Driver-side handle returned by <see cref="GalaxyDriver.SubscribeAlarmsAsync"/>.
|
||||
/// The driver doesn't multiplex alarm transitions per handle — every active handle
|
||||
/// observes the gateway's alarm-event stream — but the handle is needed for
|
||||
/// symmetric Unsubscribe and for the server-side AlarmConditionService to
|
||||
/// correlate transitions with the originating subscription.
|
||||
/// </summary>
|
||||
internal sealed class GalaxyAlarmSubscriptionHandle : IAlarmSubscriptionHandle
|
||||
{
|
||||
public GalaxyAlarmSubscriptionHandle(string diagnosticId)
|
||||
{
|
||||
DiagnosticId = diagnosticId;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public string DiagnosticId { get; }
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using MxGateway.Client;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
|
||||
/// <summary>
|
||||
/// Production <see cref="IGalaxyAlarmAcknowledger"/> backed by the
|
||||
/// <c>MxGatewayClient.AcknowledgeAlarmAsync</c> RPC (PR E.2). Maps the
|
||||
/// reply's protocol status into a thrown exception when the gateway
|
||||
/// reports a non-OK condition; native MxStatus failures inside the reply
|
||||
/// surface as a logged warning so operator workflows aren't blocked by a
|
||||
/// transient MxAccess hiccup.
|
||||
/// </summary>
|
||||
internal sealed class GatewayGalaxyAlarmAcknowledger : IGalaxyAlarmAcknowledger
|
||||
{
|
||||
private readonly MxGatewayClient _client;
|
||||
private readonly GalaxyMxSession _session;
|
||||
private readonly ILogger _logger;
|
||||
|
||||
public GatewayGalaxyAlarmAcknowledger(
|
||||
MxGatewayClient client,
|
||||
GalaxyMxSession session,
|
||||
ILogger logger)
|
||||
{
|
||||
_client = client ?? throw new ArgumentNullException(nameof(client));
|
||||
_session = session ?? throw new ArgumentNullException(nameof(session));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
public async Task AcknowledgeAsync(
|
||||
string alarmFullReference,
|
||||
string comment,
|
||||
string operatorUser,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrEmpty(alarmFullReference);
|
||||
|
||||
var session = _session.Session
|
||||
?? throw new InvalidOperationException(
|
||||
"GatewayGalaxyAlarmAcknowledger requires a connected GalaxyMxSession; underlying gateway session is null.");
|
||||
var sessionId = session.SessionId;
|
||||
|
||||
var reply = await _client.AcknowledgeAlarmAsync(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = sessionId,
|
||||
ClientCorrelationId = Guid.NewGuid().ToString("N"),
|
||||
AlarmFullReference = alarmFullReference,
|
||||
Comment = comment ?? string.Empty,
|
||||
OperatorUser = operatorUser ?? string.Empty,
|
||||
},
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (reply.Status is { Success: 0 } status)
|
||||
{
|
||||
// Native MxAccess rejected the ack — log but don't throw. Treat as a
|
||||
// best-effort operator workflow; the operator can retry via the OPC UA
|
||||
// session if necessary.
|
||||
_logger.LogWarning(
|
||||
"Galaxy AcknowledgeAlarm for {AlarmRef} returned MxStatus failure: category={Category} detail={Detail} text={Text}",
|
||||
alarmFullReference, status.Category, status.Detail, status.DiagnosticText);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
|
||||
/// <summary>
|
||||
/// Test seam for the gateway-side Acknowledge call. Production wraps the
|
||||
/// <c>MxGatewayClient.AcknowledgeAlarmAsync</c> RPC; tests substitute a fake
|
||||
/// so <see cref="GalaxyDriver.AcknowledgeAsync"/> can be exercised without a
|
||||
/// running gateway.
|
||||
/// </summary>
|
||||
internal interface IGalaxyAlarmAcknowledger
|
||||
{
|
||||
/// <summary>
|
||||
/// Forward a single alarm acknowledgement to the gateway. The gateway
|
||||
/// translates this to an MxAccess Acknowledge call against the worker's
|
||||
/// session and returns the native MxStatus on the reply.
|
||||
/// </summary>
|
||||
/// <param name="alarmFullReference">
|
||||
/// Fully-qualified alarm reference (e.g. <c>"Tank01.Level.HiHi"</c>).
|
||||
/// </param>
|
||||
/// <param name="comment">Operator-supplied comment forwarded to MxAccess.</param>
|
||||
/// <param name="operatorUser">
|
||||
/// Operator principal performing the acknowledgement. Resolved from the
|
||||
/// OPC UA session by the server-side ACL layer before reaching the driver.
|
||||
/// </param>
|
||||
/// <param name="cancellationToken">Cancels the gateway RPC.</param>
|
||||
Task AcknowledgeAsync(
|
||||
string alarmFullReference,
|
||||
string comment,
|
||||
string operatorUser,
|
||||
CancellationToken cancellationToken);
|
||||
}
|
||||
@@ -0,0 +1,244 @@
|
||||
using System.Threading.Channels;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// PR B.2 — pins GalaxyDriver's IAlarmSource implementation. The driver bridges
|
||||
/// EventPump.OnAlarmTransition (PR B.1) onto IAlarmSource.OnAlarmEvent and
|
||||
/// forwards Acknowledge through IGalaxyAlarmAcknowledger (production:
|
||||
/// GatewayGalaxyAlarmAcknowledger calling the gateway's AcknowledgeAlarm RPC
|
||||
/// from PR E.2).
|
||||
/// </summary>
|
||||
public sealed class GalaxyDriverAlarmSourceTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task SubscribeAlarmsAsync_returns_handle_and_event_fires_after_pump_alarm()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var ack = new RecordingAcknowledger();
|
||||
using var driver = NewDriver(subscriber, ack);
|
||||
|
||||
// Subscribe so OnAlarmEvent has a registered handle to fire under.
|
||||
var handle = await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None);
|
||||
handle.ShouldNotBeNull();
|
||||
|
||||
var observed = new List<AlarmEventArgs>();
|
||||
driver.OnAlarmEvent += (_, args) => observed.Add(args);
|
||||
|
||||
// SubscribeAsync to start the EventPump (alarm wiring is lazy on first sub).
|
||||
await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);
|
||||
|
||||
await subscriber.EmitAlarmAsync(new MxEvent
|
||||
{
|
||||
Family = MxEventFamily.OnAlarmTransition,
|
||||
OnAlarmTransition = new OnAlarmTransitionEvent
|
||||
{
|
||||
AlarmFullReference = "Tank01.Level.HiHi",
|
||||
SourceObjectReference = "Tank01",
|
||||
AlarmTypeName = "AnalogLimitAlarm.HiHi",
|
||||
TransitionKind = AlarmTransitionKind.Raise,
|
||||
Severity = 750,
|
||||
TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
|
||||
Description = "Tank 01 high-high level",
|
||||
},
|
||||
});
|
||||
|
||||
// Drain pump events.
|
||||
for (var i = 0; i < 20 && observed.Count == 0; i++)
|
||||
{
|
||||
await Task.Delay(50);
|
||||
}
|
||||
|
||||
observed.ShouldHaveSingleItem();
|
||||
observed[0].ConditionId.ShouldBe("Tank01.Level.HiHi");
|
||||
observed[0].SourceNodeId.ShouldBe("Tank01");
|
||||
observed[0].AlarmType.ShouldBe("AnalogLimitAlarm.HiHi");
|
||||
observed[0].Severity.ShouldBe(AlarmSeverity.Critical);
|
||||
observed[0].SubscriptionHandle.ShouldBe(handle);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OnAlarmEvent_does_not_fire_when_no_subscription_active()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var ack = new RecordingAcknowledger();
|
||||
using var driver = NewDriver(subscriber, ack);
|
||||
|
||||
var observed = new List<AlarmEventArgs>();
|
||||
driver.OnAlarmEvent += (_, args) => observed.Add(args);
|
||||
|
||||
// Start the pump via a data subscription so alarm events flow but no alarm
|
||||
// subscription is registered → OnAlarmEvent is suppressed.
|
||||
await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);
|
||||
await subscriber.EmitAlarmAsync(new MxEvent
|
||||
{
|
||||
Family = MxEventFamily.OnAlarmTransition,
|
||||
OnAlarmTransition = new OnAlarmTransitionEvent
|
||||
{
|
||||
AlarmFullReference = "Tank01.Level.HiHi",
|
||||
TransitionKind = AlarmTransitionKind.Raise,
|
||||
Severity = 600,
|
||||
TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
|
||||
},
|
||||
});
|
||||
await Task.Delay(150);
|
||||
|
||||
observed.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task UnsubscribeAlarmsAsync_stops_event_flow()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var ack = new RecordingAcknowledger();
|
||||
using var driver = NewDriver(subscriber, ack);
|
||||
|
||||
var handle = await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None);
|
||||
var observed = new List<AlarmEventArgs>();
|
||||
driver.OnAlarmEvent += (_, args) => observed.Add(args);
|
||||
await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);
|
||||
|
||||
await driver.UnsubscribeAlarmsAsync(handle, CancellationToken.None);
|
||||
|
||||
await subscriber.EmitAlarmAsync(new MxEvent
|
||||
{
|
||||
Family = MxEventFamily.OnAlarmTransition,
|
||||
OnAlarmTransition = new OnAlarmTransitionEvent
|
||||
{
|
||||
AlarmFullReference = "Tank01.Level.HiHi",
|
||||
TransitionKind = AlarmTransitionKind.Raise,
|
||||
Severity = 600,
|
||||
TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
|
||||
},
|
||||
});
|
||||
await Task.Delay(150);
|
||||
|
||||
observed.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task UnsubscribeAlarmsAsync_throws_for_foreign_handle()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var ack = new RecordingAcknowledger();
|
||||
using var driver = NewDriver(subscriber, ack);
|
||||
|
||||
var foreignHandle = new ForeignAlarmHandle();
|
||||
await Should.ThrowAsync<ArgumentException>(() =>
|
||||
driver.UnsubscribeAlarmsAsync(foreignHandle, CancellationToken.None));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_routes_each_request_to_the_acknowledger()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var ack = new RecordingAcknowledger();
|
||||
using var driver = NewDriver(subscriber, ack);
|
||||
|
||||
var requests = new[]
|
||||
{
|
||||
new AlarmAcknowledgeRequest("Tank01", "Tank01.Level.HiHi", "shift handover"),
|
||||
new AlarmAcknowledgeRequest("Tank02", "Tank02.Level.HiHi", "investigating"),
|
||||
};
|
||||
|
||||
await driver.AcknowledgeAsync(requests, CancellationToken.None);
|
||||
|
||||
ack.Calls.Count.ShouldBe(2);
|
||||
ack.Calls[0].AlarmRef.ShouldBe("Tank01.Level.HiHi");
|
||||
ack.Calls[0].Comment.ShouldBe("shift handover");
|
||||
ack.Calls[1].AlarmRef.ShouldBe("Tank02.Level.HiHi");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_falls_back_to_SourceNodeId_when_ConditionId_empty()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var ack = new RecordingAcknowledger();
|
||||
using var driver = NewDriver(subscriber, ack);
|
||||
|
||||
await driver.AcknowledgeAsync(
|
||||
[new AlarmAcknowledgeRequest("Tank01.Level.HiHi", string.Empty, null)],
|
||||
CancellationToken.None);
|
||||
|
||||
ack.Calls[0].AlarmRef.ShouldBe("Tank01.Level.HiHi");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_throws_NotSupported_without_acknowledger()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
using var driver = NewDriver(subscriber, alarmAcknowledger: null);
|
||||
|
||||
await Should.ThrowAsync<NotSupportedException>(() =>
|
||||
driver.AcknowledgeAsync(
|
||||
[new AlarmAcknowledgeRequest("Tank01", "Tank01.Level.HiHi", null)],
|
||||
CancellationToken.None));
|
||||
}
|
||||
|
||||
private static GalaxyDriver NewDriver(
|
||||
ManualSubscriber subscriber, IGalaxyAlarmAcknowledger? alarmAcknowledger)
|
||||
{
|
||||
var options = new GalaxyDriverOptions(
|
||||
new GalaxyGatewayOptions("http://localhost:5000", "literal-api-key"),
|
||||
new GalaxyMxAccessOptions("AlarmSourceTest"),
|
||||
new GalaxyRepositoryOptions(),
|
||||
new GalaxyReconnectOptions());
|
||||
return new GalaxyDriver(
|
||||
driverInstanceId: "drv-1",
|
||||
options: options,
|
||||
hierarchySource: null,
|
||||
dataReader: null,
|
||||
dataWriter: null,
|
||||
subscriber: subscriber,
|
||||
alarmAcknowledger: alarmAcknowledger);
|
||||
}
|
||||
|
||||
private sealed class RecordingAcknowledger : IGalaxyAlarmAcknowledger
|
||||
{
|
||||
public List<(string AlarmRef, string Comment, string Operator)> Calls { get; } = [];
|
||||
|
||||
public Task AcknowledgeAsync(string alarmFullReference, string comment, string operatorUser, CancellationToken cancellationToken)
|
||||
{
|
||||
Calls.Add((alarmFullReference, comment, operatorUser));
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class ForeignAlarmHandle : IAlarmSubscriptionHandle
|
||||
{
|
||||
public string DiagnosticId => "foreign";
|
||||
}
|
||||
|
||||
private sealed class ManualSubscriber : IGalaxySubscriber
|
||||
{
|
||||
private readonly Channel<MxEvent> _stream =
|
||||
Channel.CreateUnbounded<MxEvent>(new UnboundedChannelOptions { SingleReader = true });
|
||||
|
||||
public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
|
||||
IReadOnlyList<string> fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
|
||||
{
|
||||
var results = new List<SubscribeResult>();
|
||||
var nextHandle = 100;
|
||||
foreach (var r in fullReferences)
|
||||
{
|
||||
results.Add(new SubscribeResult { TagAddress = r, ItemHandle = nextHandle++, WasSuccessful = true });
|
||||
}
|
||||
return Task.FromResult<IReadOnlyList<SubscribeResult>>(results);
|
||||
}
|
||||
|
||||
public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
|
||||
=> Task.CompletedTask;
|
||||
|
||||
public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken cancellationToken)
|
||||
=> _stream.Reader.ReadAllAsync(cancellationToken);
|
||||
|
||||
public ValueTask EmitAlarmAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user