246 lines
8.4 KiB
C#
246 lines
8.4 KiB
C#
using System.Collections.Concurrent;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests;
|
|
|
|
[Trait("Category", "Unit")]
|
|
public sealed class PollGroupEngineTests
|
|
{
|
|
/// <summary>
/// In-memory stand-in for a tag reader. Tests seed <see cref="Values"/> and hand
/// <see cref="ReadAsync"/> to the engine as its reader delegate.
/// </summary>
private sealed class FakeSource
{
    /// <summary>Current value per tag reference; a missing key yields the "bad" snapshot below.</summary>
    public ConcurrentDictionary<string, object?> Values { get; } = new();

    /// <summary>Number of <see cref="ReadAsync"/> invocations. Kept as a field so it can be passed by ref to Interlocked.</summary>
    public int ReadCount;

    /// <summary>
    /// Produces one snapshot per requested reference, in request order, and completes synchronously.
    /// </summary>
    /// <param name="refs">Tag references to look up in <see cref="Values"/>.</param>
    /// <param name="ct">Not observed by this fake.</param>
    /// <returns>An already-completed task carrying the snapshots.</returns>
    public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
        IReadOnlyList<string> refs, CancellationToken ct)
    {
        Interlocked.Increment(ref ReadCount);

        // One timestamp is shared by every snapshot produced in this read.
        var stamp = DateTime.UtcNow;

        DataValueSnapshot Capture(string tagRef)
        {
            if (Values.TryGetValue(tagRef, out var current))
            {
                return new DataValueSnapshot(current, 0u, stamp, stamp);
            }

            // NOTE(review): 0x80340000 is presumably the OPC UA BadNodeIdUnknown status — confirm.
            return new DataValueSnapshot(null, 0x80340000u, null, stamp);
        }

        IReadOnlyList<DataValueSnapshot> captured = refs.Select(Capture).ToList();
        return Task.FromResult(captured);
    }
}
|
|
|
|
/// <summary>
/// Subscribing to two seeded tags must yield an event for each of them even though
/// neither value changes during the test.
/// </summary>
[Fact]
public async Task Initial_poll_force_raises_every_subscribed_tag()
{
    var source = new FakeSource();
    source.Values["A"] = 1;
    source.Values["B"] = "hello";

    var raised = new ConcurrentQueue<(ISubscriptionHandle Owner, string Ref, DataValueSnapshot Snapshot)>();
    await using var engine = new PollGroupEngine(
        source.ReadAsync,
        (owner, tagRef, snapshot) => raised.Enqueue((owner, tagRef, snapshot)));

    var subscription = engine.Subscribe(["A", "B"], TimeSpan.FromMilliseconds(200));
    await WaitForAsync(() => raised.Count >= 2, TimeSpan.FromSeconds(2));

    // Delivery order between the two tags is not part of the contract being tested.
    var raisedRefs = raised.Select(entry => entry.Ref);
    raisedRefs.ShouldBe(["A", "B"], ignoreOrder: true);
    engine.Unsubscribe(subscription).ShouldBeTrue();
}
|
|
|
|
/// <summary>
/// A tag whose value never changes must produce exactly one event over the whole
/// observation window.
/// </summary>
[Fact]
public async Task Unchanged_value_raises_only_once()
{
    var source = new FakeSource();
    source.Values["X"] = 42;

    var raised = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
    await using var engine = new PollGroupEngine(
        source.ReadAsync,
        (owner, tagRef, snapshot) => raised.Enqueue((owner, tagRef, snapshot)));

    var subscription = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));

    // The window is five times the requested interval.
    await Task.Delay(500);
    engine.Unsubscribe(subscription);

    raised.Count.ShouldBe(1);
}
|
|
|
|
/// <summary>
/// After the first event has been seen, changing the tag's value must raise a second
/// event that carries the new value.
/// </summary>
[Fact]
public async Task Value_change_raises_new_event()
{
    var src = new FakeSource();
    src.Values["X"] = 1;

    var events = new ConcurrentQueue<(ISubscriptionHandle Handle, string Ref, DataValueSnapshot Snapshot)>();
    await using var engine = new PollGroupEngine(src.ReadAsync,
        (h, r, s) => events.Enqueue((h, r, s)));

    var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
    await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
    src.Values["X"] = 2;
    await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));

    engine.Unsubscribe(handle);

    // Guard against a vacuous pass: require that a second event was actually raised
    // instead of only inspecting the value carried by whichever event happens to be last.
    events.Count.ShouldBeGreaterThanOrEqualTo(2);
    events.Last().Snapshot.Value.ShouldBe(2);
}
|
|
|
|
/// <summary>
/// Once a subscription has been removed, later value changes must not produce events.
/// </summary>
[Fact]
public async Task Unsubscribe_halts_the_loop()
{
    var src = new FakeSource();
    src.Values["X"] = 1;

    var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
    await using var engine = new PollGroupEngine(src.ReadAsync,
        (h, r, s) => events.Enqueue((h, r, s)));

    var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
    await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));

    // The "no further events" check below only means something if the subscription
    // was demonstrably delivering events before it was removed.
    events.Count.ShouldBeGreaterThanOrEqualTo(1);

    engine.Unsubscribe(handle).ShouldBeTrue();
    var afterUnsub = events.Count;

    // A change made after unsubscribing must go unnoticed for several requested intervals.
    src.Values["X"] = 999;
    await Task.Delay(400);
    events.Count.ShouldBe(afterUnsub);
}
|
|
|
|
/// <summary>
/// A requested interval below the engine's <c>minInterval</c> must not make the engine
/// poll faster than that floor.
/// </summary>
[Fact]
public async Task Interval_below_floor_is_clamped()
{
    var floor = TimeSpan.FromMilliseconds(200);

    var src = new FakeSource();
    src.Values["X"] = 1;

    var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
    await using var engine = new PollGroupEngine(src.ReadAsync,
        (h, r, s) => events.Enqueue((h, r, s)),
        minInterval: floor);

    // Measure the real window: Task.Delay can overshoot on a loaded machine, and the
    // read-count bound below has to scale with the time that actually passed.
    var window = System.Diagnostics.Stopwatch.StartNew();
    var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(5));
    await Task.Delay(300);
    engine.Unsubscribe(handle);
    window.Stop();
    var reads = Volatile.Read(ref src.ReadCount);

    // 300 ms window, 200 ms floor, stable value → initial push + at most 1 extra poll.
    // With zero changes only the initial-data push fires.
    events.Count.ShouldBe(1);

    // The event count alone is 1 for a stable value at any poll rate, so it cannot detect
    // a missing clamp. The number of reader calls can: at the requested 5 ms it would be
    // far above one read per floor period (plus the initial read and one of slack).
    var maxExpectedReads = (int)(window.Elapsed.TotalMilliseconds / floor.TotalMilliseconds) + 2;
    reads.ShouldBeLessThanOrEqualTo(maxExpectedReads);
}
|
|
|
|
/// <summary>
/// Removing one subscription must silence only that subscription; a sibling on the
/// same engine keeps reporting changes.
/// </summary>
[Fact]
public async Task Multiple_subscriptions_are_independent()
{
    var source = new FakeSource();
    source.Values["A"] = 1;
    source.Values["B"] = 2;

    var eventsForA = new ConcurrentQueue<string>();
    var eventsForB = new ConcurrentQueue<string>();
    await using var engine = new PollGroupEngine(
        source.ReadAsync,
        (_, tagRef, _) =>
        {
            // Route each event to the queue of the tag it belongs to; anything else is dropped.
            switch (tagRef)
            {
                case "A":
                    eventsForA.Enqueue(tagRef);
                    break;
                case "B":
                    eventsForB.Enqueue(tagRef);
                    break;
            }
        });

    var subscriptionA = engine.Subscribe(["A"], TimeSpan.FromMilliseconds(100));
    var subscriptionB = engine.Subscribe(["B"], TimeSpan.FromMilliseconds(100));

    await WaitForAsync(
        () => eventsForA.Count >= 1 && eventsForB.Count >= 1,
        TimeSpan.FromSeconds(2));

    engine.Unsubscribe(subscriptionA);
    var countForAAtUnsubscribe = eventsForA.Count;

    // Only B changes from here on, and only B is still subscribed.
    source.Values["B"] = 77;
    await WaitForAsync(() => eventsForB.Count >= 2, TimeSpan.FromSeconds(2));

    eventsForA.Count.ShouldBe(countForAAtUnsubscribe);
    eventsForB.Count.ShouldBeGreaterThanOrEqualTo(2);
    engine.Unsubscribe(subscriptionB);
}
|
|
|
|
/// <summary>
/// A reader that throws on its first two calls must not stop the subscription: once
/// the reader starts succeeding, an event is still delivered.
/// </summary>
[Fact]
public async Task Reader_exception_does_not_crash_loop()
{
    var failures = 0;
    var attempts = 0;

    // Throws synchronously on calls 1 and 2, then returns a good snapshot per reference.
    Task<IReadOnlyList<DataValueSnapshot>> FlakyReader(IReadOnlyList<string> refs, CancellationToken ct)
    {
        var attempt = Interlocked.Increment(ref attempts);
        if (attempt > 2)
        {
            var stamp = DateTime.UtcNow;
            IReadOnlyList<DataValueSnapshot> snapshots = refs
                .Select(_ => new DataValueSnapshot(1, 0u, stamp, stamp))
                .ToList();
            return Task.FromResult(snapshots);
        }

        Interlocked.Increment(ref failures);
        throw new InvalidOperationException("boom");
    }

    var raised = new ConcurrentQueue<string>();
    await using var engine = new PollGroupEngine(
        FlakyReader,
        (_, tagRef, _) => raised.Enqueue(tagRef));

    var subscription = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
    await WaitForAsync(() => raised.Count >= 1, TimeSpan.FromSeconds(2));
    engine.Unsubscribe(subscription);

    failures.ShouldBe(2);
    raised.Count.ShouldBeGreaterThanOrEqualTo(1);
}
|
|
|
|
/// <summary>
/// Unsubscribing with a handle the engine never issued must report <c>false</c>.
/// </summary>
[Fact]
public async Task Unsubscribe_unknown_handle_returns_false()
{
    var source = new FakeSource();
    await using var engine = new PollGroupEngine(source.ReadAsync, (_, _, _) => { });

    // DummyHandle is created by the test, so the engine has no record of it.
    var removed = engine.Unsubscribe(new DummyHandle());

    removed.ShouldBeFalse();
}
|
|
|
|
/// <summary>
/// <c>ActiveSubscriptionCount</c> must rise by one per Subscribe call and fall by one
/// per Unsubscribe call, including for two subscriptions to the same tag.
/// </summary>
[Fact]
public async Task ActiveSubscriptionCount_tracks_lifecycle()
{
    var source = new FakeSource();
    source.Values["X"] = 1;
    await using var engine = new PollGroupEngine(source.ReadAsync, (_, _, _) => { });

    // Nothing subscribed yet.
    engine.ActiveSubscriptionCount.ShouldBe(0);

    var pollInterval = TimeSpan.FromMilliseconds(200);
    var first = engine.Subscribe(["X"], pollInterval);
    var second = engine.Subscribe(["X"], pollInterval);
    engine.ActiveSubscriptionCount.ShouldBe(2);

    engine.Unsubscribe(first);
    engine.ActiveSubscriptionCount.ShouldBe(1);

    engine.Unsubscribe(second);
    engine.ActiveSubscriptionCount.ShouldBe(0);
}
|
|
|
|
/// <summary>
/// Disposing the engine must drop every subscription and stop all further events.
/// </summary>
[Fact]
public async Task DisposeAsync_cancels_all_subscriptions()
{
    var src = new FakeSource();
    src.Values["X"] = 1;

    var events = new ConcurrentQueue<string>();
    var engine = new PollGroupEngine(src.ReadAsync,
        (h, r, s) => events.Enqueue(r));

    // The engine is disposed explicitly (it is the operation under test), so a finally
    // block makes sure that still happens if the precondition assertion fails.
    try
    {
        _ = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
        _ = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
        await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));

        // Both subscriptions must have been delivering events, otherwise the
        // "no events after dispose" check below would pass without proving anything.
        events.Count.ShouldBeGreaterThanOrEqualTo(2);
    }
    finally
    {
        await engine.DisposeAsync();
    }

    engine.ActiveSubscriptionCount.ShouldBe(0);

    var afterDispose = events.Count;
    await Task.Delay(300);
    // After dispose no more events — everything is cancelled.
    events.Count.ShouldBe(afterDispose);
}
|
|
|
|
/// <summary>
/// Minimal <see cref="ISubscriptionHandle"/> constructed directly by tests rather than
/// obtained from an engine's Subscribe call.
/// </summary>
private sealed record DummyHandle : ISubscriptionHandle
{
    /// <summary>Always the literal <c>"dummy"</c>.</summary>
    public string DiagnosticId
    {
        get { return "dummy"; }
    }
}
|
|
|
|
/// <summary>
/// Re-evaluates <paramref name="condition"/> every 20 ms until it returns <c>true</c>.
/// </summary>
/// <param name="condition">Predicate to poll; it is always evaluated at least once.</param>
/// <param name="timeout">Maximum time to keep polling before giving up.</param>
/// <returns>A task that completes once <paramref name="condition"/> has returned <c>true</c>.</returns>
/// <exception cref="TimeoutException">
/// <paramref name="condition"/> was still <c>false</c> after <paramref name="timeout"/> had elapsed.
/// Failing here keeps a caller from carrying on in a state its later assertions cannot detect.
/// </exception>
private static async Task WaitForAsync(Func<bool> condition, TimeSpan timeout)
{
    // Stopwatch is monotonic, so the wait cannot be cut short or stretched by a
    // system clock adjustment the way a DateTime.UtcNow deadline can.
    var elapsed = System.Diagnostics.Stopwatch.StartNew();

    // The condition is checked before the timeout, so a condition that turns true
    // during the final delay is still honoured.
    while (!condition())
    {
        if (elapsed.Elapsed >= timeout)
        {
            throw new TimeoutException($"Condition was not satisfied within {timeout}.");
        }

        await Task.Delay(20);
    }
}
|
|
}
|