using System.Threading.Channels;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests;
///
/// PR B.2 — pins GalaxyDriver's IAlarmSource implementation. The driver bridges
/// EventPump.OnAlarmTransition (PR B.1) onto IAlarmSource.OnAlarmEvent and
/// forwards Acknowledge through IGalaxyAlarmAcknowledger (production:
/// GatewayGalaxyAlarmAcknowledger calling the gateway's AcknowledgeAlarm RPC
/// from PR E.2).
///
public sealed class GalaxyDriverAlarmSourceTests
{
[Fact]
public async Task SubscribeAlarmsAsync_returns_handle_and_event_fires_after_pump_alarm()
{
var subscriber = new ManualSubscriber();
var ack = new RecordingAcknowledger();
using var driver = NewDriver(subscriber, ack);
// Subscribe so OnAlarmEvent has a registered handle to fire under.
var handle = await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None);
handle.ShouldNotBeNull();
var observed = new List();
driver.OnAlarmEvent += (_, args) => observed.Add(args);
// SubscribeAsync to start the EventPump (alarm wiring is lazy on first sub).
await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);
await subscriber.EmitAlarmAsync(new MxEvent
{
Family = MxEventFamily.OnAlarmTransition,
OnAlarmTransition = new OnAlarmTransitionEvent
{
AlarmFullReference = "Tank01.Level.HiHi",
SourceObjectReference = "Tank01",
AlarmTypeName = "AnalogLimitAlarm.HiHi",
TransitionKind = AlarmTransitionKind.Raise,
Severity = 750,
TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
Description = "Tank 01 high-high level",
},
});
// Drain pump events.
for (var i = 0; i < 20 && observed.Count == 0; i++)
{
await Task.Delay(50);
}
observed.ShouldHaveSingleItem();
observed[0].ConditionId.ShouldBe("Tank01.Level.HiHi");
observed[0].SourceNodeId.ShouldBe("Tank01");
observed[0].AlarmType.ShouldBe("AnalogLimitAlarm.HiHi");
observed[0].Severity.ShouldBe(AlarmSeverity.Critical);
observed[0].SubscriptionHandle.ShouldBe(handle);
}
[Fact]
public async Task OnAlarmEvent_does_not_fire_when_no_subscription_active()
{
var subscriber = new ManualSubscriber();
var ack = new RecordingAcknowledger();
using var driver = NewDriver(subscriber, ack);
var observed = new List();
driver.OnAlarmEvent += (_, args) => observed.Add(args);
// Start the pump via a data subscription so alarm events flow but no alarm
// subscription is registered → OnAlarmEvent is suppressed.
await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);
await subscriber.EmitAlarmAsync(new MxEvent
{
Family = MxEventFamily.OnAlarmTransition,
OnAlarmTransition = new OnAlarmTransitionEvent
{
AlarmFullReference = "Tank01.Level.HiHi",
TransitionKind = AlarmTransitionKind.Raise,
Severity = 600,
TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
},
});
await Task.Delay(150);
observed.ShouldBeEmpty();
}
[Fact]
public async Task UnsubscribeAlarmsAsync_stops_event_flow()
{
var subscriber = new ManualSubscriber();
var ack = new RecordingAcknowledger();
using var driver = NewDriver(subscriber, ack);
var handle = await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None);
var observed = new List();
driver.OnAlarmEvent += (_, args) => observed.Add(args);
await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);
await driver.UnsubscribeAlarmsAsync(handle, CancellationToken.None);
await subscriber.EmitAlarmAsync(new MxEvent
{
Family = MxEventFamily.OnAlarmTransition,
OnAlarmTransition = new OnAlarmTransitionEvent
{
AlarmFullReference = "Tank01.Level.HiHi",
TransitionKind = AlarmTransitionKind.Raise,
Severity = 600,
TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
},
});
await Task.Delay(150);
observed.ShouldBeEmpty();
}
[Fact]
public async Task UnsubscribeAlarmsAsync_throws_for_foreign_handle()
{
var subscriber = new ManualSubscriber();
var ack = new RecordingAcknowledger();
using var driver = NewDriver(subscriber, ack);
var foreignHandle = new ForeignAlarmHandle();
await Should.ThrowAsync(() =>
driver.UnsubscribeAlarmsAsync(foreignHandle, CancellationToken.None));
}
[Fact]
public async Task AcknowledgeAsync_routes_each_request_to_the_acknowledger()
{
var subscriber = new ManualSubscriber();
var ack = new RecordingAcknowledger();
using var driver = NewDriver(subscriber, ack);
var requests = new[]
{
new AlarmAcknowledgeRequest("Tank01", "Tank01.Level.HiHi", "shift handover"),
new AlarmAcknowledgeRequest("Tank02", "Tank02.Level.HiHi", "investigating"),
};
await driver.AcknowledgeAsync(requests, CancellationToken.None);
ack.Calls.Count.ShouldBe(2);
ack.Calls[0].AlarmRef.ShouldBe("Tank01.Level.HiHi");
ack.Calls[0].Comment.ShouldBe("shift handover");
ack.Calls[1].AlarmRef.ShouldBe("Tank02.Level.HiHi");
}
[Fact]
public async Task AcknowledgeAsync_falls_back_to_SourceNodeId_when_ConditionId_empty()
{
var subscriber = new ManualSubscriber();
var ack = new RecordingAcknowledger();
using var driver = NewDriver(subscriber, ack);
await driver.AcknowledgeAsync(
[new AlarmAcknowledgeRequest("Tank01.Level.HiHi", string.Empty, null)],
CancellationToken.None);
ack.Calls[0].AlarmRef.ShouldBe("Tank01.Level.HiHi");
}
[Fact]
public async Task AcknowledgeAsync_throws_NotSupported_without_acknowledger()
{
var subscriber = new ManualSubscriber();
using var driver = NewDriver(subscriber, alarmAcknowledger: null);
await Should.ThrowAsync(() =>
driver.AcknowledgeAsync(
[new AlarmAcknowledgeRequest("Tank01", "Tank01.Level.HiHi", null)],
CancellationToken.None));
}
private static GalaxyDriver NewDriver(
ManualSubscriber subscriber, IGalaxyAlarmAcknowledger? alarmAcknowledger)
{
var options = new GalaxyDriverOptions(
new GalaxyGatewayOptions("http://localhost:5000", "literal-api-key"),
new GalaxyMxAccessOptions("AlarmSourceTest"),
new GalaxyRepositoryOptions(),
new GalaxyReconnectOptions());
return new GalaxyDriver(
driverInstanceId: "drv-1",
options: options,
hierarchySource: null,
dataReader: null,
dataWriter: null,
subscriber: subscriber,
alarmAcknowledger: alarmAcknowledger);
}
private sealed class RecordingAcknowledger : IGalaxyAlarmAcknowledger
{
public List<(string AlarmRef, string Comment, string Operator)> Calls { get; } = [];
public Task AcknowledgeAsync(string alarmFullReference, string comment, string operatorUser, CancellationToken cancellationToken)
{
Calls.Add((alarmFullReference, comment, operatorUser));
return Task.CompletedTask;
}
}
private sealed class ForeignAlarmHandle : IAlarmSubscriptionHandle
{
public string DiagnosticId => "foreign";
}
private sealed class ManualSubscriber : IGalaxySubscriber
{
private readonly Channel _stream =
Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true });
public Task> SubscribeBulkAsync(
IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
{
var results = new List();
var nextHandle = 100;
foreach (var r in fullReferences)
{
results.Add(new SubscribeResult { TagAddress = r, ItemHandle = nextHandle++, WasSuccessful = true });
}
return Task.FromResult>(results);
}
public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken)
=> Task.CompletedTask;
public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken)
=> _stream.Reader.ReadAllAsync(cancellationToken);
public ValueTask EmitAlarmAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev);
}
}