185 lines
7.7 KiB
C#
185 lines
7.7 KiB
C#
using System.Collections.Concurrent;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.AbCip;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests;
|
|
|
|
[Trait("Category", "Unit")]
|
|
public sealed class AbCipSubscriptionTests
|
|
{
|
|
private static (AbCipDriver drv, FakeAbCipTagFactory factory) NewDriver(params AbCipTagDefinition[] tags)
|
|
{
|
|
var factory = new FakeAbCipTagFactory();
|
|
var drv = new AbCipDriver(new AbCipDriverOptions
|
|
{
|
|
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
|
|
Tags = tags,
|
|
}, "drv-1", factory);
|
|
return (drv, factory);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Initial_poll_raises_OnDataChange_for_every_tag()
|
|
{
|
|
var (drv, factory) = NewDriver(
|
|
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt),
|
|
new AbCipTagDefinition("Temp", "ab://10.0.0.5/1,0", "Temp", AbCipDataType.Real));
|
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
|
factory.Customise = p => p.TagName switch
|
|
{
|
|
"Speed" => new FakeAbCipTag(p) { Value = 1800 },
|
|
"Temp" => new FakeAbCipTag(p) { Value = 72.5f },
|
|
_ => new FakeAbCipTag(p),
|
|
};
|
|
|
|
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
|
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
|
|
|
var handle = await drv.SubscribeAsync(["Speed", "Temp"], TimeSpan.FromMilliseconds(200), CancellationToken.None);
|
|
await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
|
|
|
|
events.Select(e => e.FullReference).ShouldContain("Speed");
|
|
events.Select(e => e.FullReference).ShouldContain("Temp");
|
|
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Unchanged_value_raises_only_once()
|
|
{
|
|
var (drv, factory) = NewDriver(
|
|
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
|
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
|
factory.Customise = p => new FakeAbCipTag(p) { Value = 1800 };
|
|
|
|
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
|
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
|
|
|
var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
|
await Task.Delay(500);
|
|
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
|
|
|
events.Count.ShouldBe(1);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Value_change_between_polls_raises_OnDataChange()
|
|
{
|
|
var (drv, factory) = NewDriver(
|
|
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
|
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
|
var tagRef = new FakeAbCipTag(new AbCipTagCreateParams("10.0.0.5", 44818, "1,0", "controllogix", "Speed", TimeSpan.FromSeconds(2))) { Value = 100 };
|
|
factory.Customise = _ => tagRef;
|
|
|
|
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
|
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
|
|
|
var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
|
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
|
|
tagRef.Value = 200; // simulate PLC change
|
|
await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
|
|
|
|
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
|
events.Count.ShouldBeGreaterThanOrEqualTo(2);
|
|
events.Last().Snapshot.Value.ShouldBe(200);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Unsubscribe_halts_polling()
|
|
{
|
|
var (drv, factory) = NewDriver(
|
|
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
|
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
|
var tagRef = new FakeAbCipTag(new AbCipTagCreateParams("10.0.0.5", 44818, "1,0", "controllogix", "Speed", TimeSpan.FromSeconds(2))) { Value = 1 };
|
|
factory.Customise = _ => tagRef;
|
|
|
|
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
|
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
|
|
|
var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
|
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
|
|
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
|
|
|
var afterUnsub = events.Count;
|
|
tagRef.Value = 999;
|
|
await Task.Delay(400);
|
|
events.Count.ShouldBe(afterUnsub);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Interval_below_100ms_is_floored()
|
|
{
|
|
var (drv, factory) = NewDriver(
|
|
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
|
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
|
factory.Customise = p => new FakeAbCipTag(p) { Value = 1 };
|
|
|
|
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
|
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
|
|
|
var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(5), CancellationToken.None);
|
|
await Task.Delay(300);
|
|
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
|
|
|
// Value is stable → only the initial-data push fires; the 100 ms floor keeps polls sparse enough
|
|
// that no extra event is produced against a stable value.
|
|
events.Count.ShouldBe(1);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ShutdownAsync_cancels_active_subscriptions()
|
|
{
|
|
var (drv, factory) = NewDriver(
|
|
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
|
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
|
factory.Customise = p => new FakeAbCipTag(p) { Value = 1 };
|
|
|
|
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
|
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
|
|
|
_ = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
|
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
|
|
await drv.ShutdownAsync(CancellationToken.None);
|
|
|
|
var afterShutdown = events.Count;
|
|
await Task.Delay(300);
|
|
events.Count.ShouldBe(afterShutdown);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Subscription_on_UDT_member_uses_synthesised_full_reference()
|
|
{
|
|
var factory = new FakeAbCipTagFactory();
|
|
var drv = new AbCipDriver(new AbCipDriverOptions
|
|
{
|
|
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
|
|
Tags =
|
|
[
|
|
new AbCipTagDefinition("Motor1", "ab://10.0.0.5/1,0", "Motor1", AbCipDataType.Structure,
|
|
Members: [new AbCipStructureMember("Speed", AbCipDataType.DInt)]),
|
|
],
|
|
}, "drv-1", factory);
|
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
|
factory.Customise = p => p.TagName == "Motor1.Speed"
|
|
? new FakeAbCipTag(p) { Value = 77 }
|
|
: new FakeAbCipTag(p);
|
|
|
|
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
|
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
|
|
|
var handle = await drv.SubscribeAsync(["Motor1.Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
|
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2));
|
|
|
|
events.First().Snapshot.Value.ShouldBe(77);
|
|
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
|
}
|
|
|
|
private static async Task WaitForAsync(Func<bool> condition, TimeSpan timeout)
|
|
{
|
|
var deadline = DateTime.UtcNow + timeout;
|
|
while (!condition() && DateTime.UtcNow < deadline)
|
|
await Task.Delay(20);
|
|
}
|
|
}
|