Twelfth PR of the alarms-over-gateway epic (docs/plans/alarms-over-gateway.md). Depends on PR B.1 (EventPump dispatch, merged) and PR E.2 (.NET SDK alarm methods, merged). Restores the v1 IAlarmSource capability that PR 7.2 retired with the legacy Galaxy.Host / Galaxy.Proxy projects.

GalaxyDriver gains:
- IAlarmSource on the class declaration → eight capabilities total (IDriver / ITagDiscovery / IReadable / IWritable / ISubscribable / IRediscoverable / IHostConnectivityProbe / IAlarmSource).
- SubscribeAlarmsAsync — returns a sentinel handle and starts the shared EventPump (alarm wiring is lazy on first sub). Multiple handles share the same gateway stream; the server-side AlarmConditionService dispatches per-source-node downstream.
- UnsubscribeAlarmsAsync — symmetric handle removal; rejects handles not issued by this driver.
- AcknowledgeAsync — issues one gateway RPC per acknowledgement through IGalaxyAlarmAcknowledger. ConditionId carries the alarm full reference; falls back to SourceNodeId when empty.
- OnAlarmEvent — bridges EventPump.OnAlarmTransition (B.1) onto AlarmEventArgs. Suppressed when no alarm subscription is active so untracked transitions don't leak through.

New runtime types:
- IGalaxyAlarmAcknowledger — test seam.
- GatewayGalaxyAlarmAcknowledger — production wrapper around MxGatewayClient.AcknowledgeAlarmAsync (PR E.2). Maps native MxStatus failures to a logged warning rather than a thrown exception so a transient MxAccess hiccup doesn't fail the operator's Acknowledge.
- GalaxyAlarmSubscriptionHandle — driver-side IAlarmSubscriptionHandle.

Production runtime construction in BuildProductionRuntimeAsync wires the acknowledger when not pre-injected; tests inject a fake via the internal ctor.
Tests:
- 7 new tests in GalaxyDriverAlarmSourceTests — subscribe → event fire path, suppress without subscription, unsubscribe stops flow, foreign-handle rejection, ack routes per-request, ack falls back to SourceNodeId, ack throws NotSupported without acknowledger.
- Full Driver.Galaxy.Tests: 203 passed (was 196; 7 new).

Operates as a "stub-ready" surface — runtime ack calls will return PERMISSION_DENIED until A.3 ships the gateway-side dispatch, and no alarm transitions will arrive until A.2 adds the worker MxAccess subscription. Both will activate this code path automatically when the gateway side lands.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
245 lines
9.3 KiB
C#
245 lines
9.3 KiB
C#
using System.Threading.Channels;
|
|
using Google.Protobuf.WellKnownTypes;
|
|
using MxGateway.Contracts.Proto;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests;
|
|
|
|
/// <summary>
/// PR B.2 — pins GalaxyDriver's IAlarmSource implementation. The driver bridges
/// EventPump.OnAlarmTransition (PR B.1) onto IAlarmSource.OnAlarmEvent and
/// forwards Acknowledge through IGalaxyAlarmAcknowledger (production:
/// GatewayGalaxyAlarmAcknowledger calling the gateway's AcknowledgeAlarm RPC
/// from PR E.2).
/// </summary>
public sealed class GalaxyDriverAlarmSourceTests
{
    /// <summary>
    /// Upper bound on how long a positive test waits for an alarm event to come out of
    /// the pump. The wait ends as soon as the event arrives, so a generous value only
    /// costs time when the test is about to fail anyway.
    /// </summary>
    private static readonly TimeSpan EventArrivalTimeout = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Grace period a negative test gives the pump to (incorrectly) deliver an event
    /// before asserting that nothing was raised.
    /// </summary>
    private static readonly TimeSpan QuietPeriod = TimeSpan.FromMilliseconds(150);

    /// <summary>
    /// With an alarm subscription registered, an alarm transition emitted on the
    /// subscriber's stream surfaces exactly once on <c>OnAlarmEvent</c>, carrying the
    /// transition's references, type, mapped severity and the handle returned by
    /// <c>SubscribeAlarmsAsync</c>.
    /// </summary>
    [Fact]
    public async Task SubscribeAlarmsAsync_returns_handle_and_event_fires_after_pump_alarm()
    {
        var subscriber = new ManualSubscriber();
        var ack = new RecordingAcknowledger();
        using var driver = NewDriver(subscriber, ack);

        // Subscribe so OnAlarmEvent has a registered handle to fire under.
        var handle = await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None);
        handle.ShouldNotBeNull();

        var collector = new AlarmEventCollector();
        driver.OnAlarmEvent += (_, args) => collector.Add(args);

        // SubscribeAsync to start the EventPump (alarm wiring is lazy on first sub).
        await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);

        await subscriber.EmitAlarmAsync(new MxEvent
        {
            Family = MxEventFamily.OnAlarmTransition,
            OnAlarmTransition = new OnAlarmTransitionEvent
            {
                AlarmFullReference = "Tank01.Level.HiHi",
                SourceObjectReference = "Tank01",
                AlarmTypeName = "AnalogLimitAlarm.HiHi",
                TransitionKind = AlarmTransitionKind.Raise,
                Severity = 750,
                TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
                Description = "Tank 01 high-high level",
            },
        });

        // Wait for the pump to deliver instead of polling on a fixed cadence.
        await collector.WaitForFirstAsync(EventArrivalTimeout);

        var alarm = collector.Snapshot().ShouldHaveSingleItem();
        alarm.ConditionId.ShouldBe("Tank01.Level.HiHi");
        alarm.SourceNodeId.ShouldBe("Tank01");
        alarm.AlarmType.ShouldBe("AnalogLimitAlarm.HiHi");
        alarm.Severity.ShouldBe(AlarmSeverity.Critical);
        alarm.SubscriptionHandle.ShouldBe(handle);
    }

    /// <summary>
    /// Without any alarm subscription, an alarm transition flowing through the pump
    /// must not reach <c>OnAlarmEvent</c>.
    /// </summary>
    [Fact]
    public async Task OnAlarmEvent_does_not_fire_when_no_subscription_active()
    {
        var subscriber = new ManualSubscriber();
        var ack = new RecordingAcknowledger();
        using var driver = NewDriver(subscriber, ack);

        var collector = new AlarmEventCollector();
        driver.OnAlarmEvent += (_, args) => collector.Add(args);

        // Start the pump via a data subscription so alarm events flow but no alarm
        // subscription is registered → OnAlarmEvent is suppressed.
        await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);
        await subscriber.EmitAlarmAsync(NewRaiseTransition());

        // Absence can only be shown by giving the pump time to misbehave.
        await Task.Delay(QuietPeriod);

        collector.Snapshot().ShouldBeEmpty();
    }

    /// <summary>
    /// After the only alarm handle is unsubscribed, later alarm transitions are no
    /// longer surfaced on <c>OnAlarmEvent</c>.
    /// </summary>
    [Fact]
    public async Task UnsubscribeAlarmsAsync_stops_event_flow()
    {
        var subscriber = new ManualSubscriber();
        var ack = new RecordingAcknowledger();
        using var driver = NewDriver(subscriber, ack);

        var handle = await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None);
        var collector = new AlarmEventCollector();
        driver.OnAlarmEvent += (_, args) => collector.Add(args);
        await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);

        await driver.UnsubscribeAlarmsAsync(handle, CancellationToken.None);

        await subscriber.EmitAlarmAsync(NewRaiseTransition());
        await Task.Delay(QuietPeriod);

        collector.Snapshot().ShouldBeEmpty();
    }

    /// <summary>
    /// A handle that was not issued by this driver is rejected with
    /// <see cref="ArgumentException"/>.
    /// </summary>
    [Fact]
    public async Task UnsubscribeAlarmsAsync_throws_for_foreign_handle()
    {
        var subscriber = new ManualSubscriber();
        var ack = new RecordingAcknowledger();
        using var driver = NewDriver(subscriber, ack);

        var foreignHandle = new ForeignAlarmHandle();

        await Should.ThrowAsync<ArgumentException>(() =>
            driver.UnsubscribeAlarmsAsync(foreignHandle, CancellationToken.None));
    }

    /// <summary>
    /// Every acknowledge request results in its own acknowledger call, in request
    /// order, carrying the request's condition id and comment.
    /// </summary>
    [Fact]
    public async Task AcknowledgeAsync_routes_each_request_to_the_acknowledger()
    {
        var subscriber = new ManualSubscriber();
        var ack = new RecordingAcknowledger();
        using var driver = NewDriver(subscriber, ack);

        var requests = new[]
        {
            new AlarmAcknowledgeRequest("Tank01", "Tank01.Level.HiHi", "shift handover"),
            new AlarmAcknowledgeRequest("Tank02", "Tank02.Level.HiHi", "investigating"),
        };

        await driver.AcknowledgeAsync(requests, CancellationToken.None);

        ack.Calls.Count.ShouldBe(2);
        ack.Calls[0].AlarmRef.ShouldBe("Tank01.Level.HiHi");
        ack.Calls[0].Comment.ShouldBe("shift handover");
        ack.Calls[1].AlarmRef.ShouldBe("Tank02.Level.HiHi");
        ack.Calls[1].Comment.ShouldBe("investigating");
    }

    /// <summary>
    /// When the request's condition id is empty, the source node id is used as the
    /// alarm reference passed to the acknowledger.
    /// </summary>
    [Fact]
    public async Task AcknowledgeAsync_falls_back_to_SourceNodeId_when_ConditionId_empty()
    {
        var subscriber = new ManualSubscriber();
        var ack = new RecordingAcknowledger();
        using var driver = NewDriver(subscriber, ack);

        await driver.AcknowledgeAsync(
            [new AlarmAcknowledgeRequest("Tank01.Level.HiHi", string.Empty, null)],
            CancellationToken.None);

        // Assert the call count first so a missing call fails as an assertion
        // rather than as an index-out-of-range exception.
        ack.Calls.ShouldHaveSingleItem().AlarmRef.ShouldBe("Tank01.Level.HiHi");
    }

    /// <summary>
    /// A driver constructed without an acknowledger rejects acknowledge calls with
    /// <see cref="NotSupportedException"/>.
    /// </summary>
    [Fact]
    public async Task AcknowledgeAsync_throws_NotSupported_without_acknowledger()
    {
        var subscriber = new ManualSubscriber();
        using var driver = NewDriver(subscriber, alarmAcknowledger: null);

        await Should.ThrowAsync<NotSupportedException>(() =>
            driver.AcknowledgeAsync(
                [new AlarmAcknowledgeRequest("Tank01", "Tank01.Level.HiHi", null)],
                CancellationToken.None));
    }

    /// <summary>
    /// Builds a driver wired to the supplied fake subscriber and optional acknowledger;
    /// the hierarchy source, data reader and data writer are left <c>null</c>.
    /// </summary>
    /// <param name="subscriber">Fake event source the driver's pump reads from.</param>
    /// <param name="alarmAcknowledger">Acknowledger fake, or <c>null</c> to build a driver without one.</param>
    /// <returns>A new driver instance; the caller is responsible for disposing it.</returns>
    private static GalaxyDriver NewDriver(
        ManualSubscriber subscriber, IGalaxyAlarmAcknowledger? alarmAcknowledger)
    {
        var options = new GalaxyDriverOptions(
            new GalaxyGatewayOptions("http://localhost:5000", "literal-api-key"),
            new GalaxyMxAccessOptions("AlarmSourceTest"),
            new GalaxyRepositoryOptions(),
            new GalaxyReconnectOptions());

        return new GalaxyDriver(
            driverInstanceId: "drv-1",
            options: options,
            hierarchySource: null,
            dataReader: null,
            dataWriter: null,
            subscriber: subscriber,
            alarmAcknowledger: alarmAcknowledger);
    }

    /// <summary>
    /// Minimal raise transition used by the suppression tests: only the alarm
    /// reference, transition kind, severity and timestamp are populated.
    /// </summary>
    private static MxEvent NewRaiseTransition() => new()
    {
        Family = MxEventFamily.OnAlarmTransition,
        OnAlarmTransition = new OnAlarmTransitionEvent
        {
            AlarmFullReference = "Tank01.Level.HiHi",
            TransitionKind = AlarmTransitionKind.Raise,
            Severity = 600,
            TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
        },
    };

    /// <summary>
    /// Collects alarm events raised by the driver. Writes and reads are serialised
    /// with a lock because the tests treat delivery as asynchronous.
    /// NOTE(review): presumably the handler runs on the pump's dispatch loop rather
    /// than the test's own flow — confirm against EventPump.
    /// </summary>
    private sealed class AlarmEventCollector
    {
        private readonly object _gate = new();
        private readonly List<AlarmEventArgs> _events = [];
        private readonly TaskCompletionSource _firstEvent =
            new(TaskCreationOptions.RunContinuationsAsynchronously);

        /// <summary>Records one event and signals anyone waiting for the first arrival.</summary>
        public void Add(AlarmEventArgs args)
        {
            lock (_gate)
            {
                _events.Add(args);
            }

            _firstEvent.TrySetResult();
        }

        /// <summary>Returns a copy of the events recorded so far.</summary>
        public AlarmEventArgs[] Snapshot()
        {
            lock (_gate)
            {
                return _events.ToArray();
            }
        }

        /// <summary>
        /// Completes when the first event has been recorded or <paramref name="timeout"/>
        /// elapses, whichever comes first. Never throws on timeout; callers assert on
        /// <see cref="Snapshot"/> afterwards so a miss fails as an assertion.
        /// </summary>
        public async Task WaitForFirstAsync(TimeSpan timeout)
        {
            using var timeoutCancellation = new CancellationTokenSource();
            await Task.WhenAny(_firstEvent.Task, Task.Delay(timeout, timeoutCancellation.Token));

            // Release the timer promptly when the event won the race.
            timeoutCancellation.Cancel();
        }
    }

    /// <summary>Acknowledger fake that records every call it receives, in order.</summary>
    private sealed class RecordingAcknowledger : IGalaxyAlarmAcknowledger
    {
        /// <summary>Calls observed so far: alarm reference, comment and operator user.</summary>
        public List<(string AlarmRef, string Comment, string Operator)> Calls { get; } = [];

        /// <summary>Records the arguments and completes synchronously.</summary>
        public Task AcknowledgeAsync(string alarmFullReference, string comment, string operatorUser, CancellationToken cancellationToken)
        {
            Calls.Add((alarmFullReference, comment, operatorUser));
            return Task.CompletedTask;
        }
    }

    /// <summary>Alarm subscription handle of a type the driver did not issue.</summary>
    private sealed class ForeignAlarmHandle : IAlarmSubscriptionHandle
    {
        public string DiagnosticId => "foreign";
    }

    /// <summary>
    /// Subscriber fake whose event stream is fed by hand through
    /// <see cref="EmitAlarmAsync"/>; subscribe calls always succeed.
    /// </summary>
    private sealed class ManualSubscriber : IGalaxySubscriber
    {
        private readonly Channel<MxEvent> _stream =
            Channel.CreateUnbounded<MxEvent>(new UnboundedChannelOptions { SingleReader = true });

        // Kept on the instance so handles stay unique across SubscribeBulkAsync calls.
        private int _nextHandle = 100;

        /// <summary>
        /// Reports success for every requested reference, assigning each a fresh
        /// item handle (starting at 100 for the first reference ever subscribed).
        /// </summary>
        public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
            IReadOnlyList<string> fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
        {
            var results = new List<SubscribeResult>(fullReferences.Count);
            foreach (var reference in fullReferences)
            {
                results.Add(new SubscribeResult
                {
                    TagAddress = reference,
                    ItemHandle = _nextHandle++,
                    WasSuccessful = true,
                });
            }

            return Task.FromResult<IReadOnlyList<SubscribeResult>>(results);
        }

        /// <summary>No-op: the fake keeps no per-handle state.</summary>
        public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
            => Task.CompletedTask;

        /// <summary>Streams whatever has been written through <see cref="EmitAlarmAsync"/>.</summary>
        public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken cancellationToken)
            => _stream.Reader.ReadAllAsync(cancellationToken);

        /// <summary>Pushes one event onto the stream read by the driver.</summary>
        public ValueTask EmitAlarmAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev);
    }
}
|