Compare commits
4 Commits
abcip-pr6-
...
abcip-pr8-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ac14ba9664 | ||
| 5978ea002d | |||
|
|
33780eb64c | ||
| 521bcb2f68 |
@@ -20,17 +20,22 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip;
|
||||
/// from native-heap growth that the CLR allocator can't see; it tears down every
|
||||
/// <see cref="PlcTagHandle"/> and reconnects each device.</para>
|
||||
/// </remarks>
|
||||
public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, IDisposable, IAsyncDisposable
|
||||
public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, ISubscribable,
|
||||
IHostConnectivityProbe, IPerCallHostResolver, IDisposable, IAsyncDisposable
|
||||
{
|
||||
private readonly AbCipDriverOptions _options;
|
||||
private readonly string _driverInstanceId;
|
||||
private readonly IAbCipTagFactory _tagFactory;
|
||||
private readonly IAbCipTagEnumeratorFactory _enumeratorFactory;
|
||||
private readonly AbCipTemplateCache _templateCache = new();
|
||||
private readonly PollGroupEngine _poll;
|
||||
private readonly Dictionary<string, DeviceState> _devices = new(StringComparer.OrdinalIgnoreCase);
|
||||
private readonly Dictionary<string, AbCipTagDefinition> _tagsByName = new(StringComparer.OrdinalIgnoreCase);
|
||||
private DriverHealth _health = new(DriverState.Unknown, null, null);
|
||||
|
||||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
|
||||
|
||||
public AbCipDriver(AbCipDriverOptions options, string driverInstanceId,
|
||||
IAbCipTagFactory? tagFactory = null,
|
||||
IAbCipTagEnumeratorFactory? enumeratorFactory = null)
|
||||
@@ -40,6 +45,10 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
|
||||
_driverInstanceId = driverInstanceId;
|
||||
_tagFactory = tagFactory ?? new LibplctagTagFactory();
|
||||
_enumeratorFactory = enumeratorFactory ?? new EmptyAbCipTagEnumeratorFactory();
|
||||
_poll = new PollGroupEngine(
|
||||
reader: ReadAsync,
|
||||
onChange: (handle, tagRef, snapshot) =>
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, snapshot)));
|
||||
}
|
||||
|
||||
/// <summary>Shared UDT template cache. Exposed for PR 6 (UDT reader) + diagnostics.</summary>
|
||||
@@ -63,9 +72,6 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
|
||||
}
|
||||
foreach (var tag in _options.Tags)
|
||||
{
|
||||
// UDT tags with declared Members fan out into synthetic member-tag entries addressable
|
||||
// by composed full-reference. Parent structure tag also stored so discovery can emit a
|
||||
// folder for it.
|
||||
_tagsByName[tag.Name] = tag;
|
||||
if (tag.DataType == AbCipDataType.Structure && tag.Members is { Count: > 0 })
|
||||
{
|
||||
@@ -82,6 +88,17 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Probe loops — one per device when enabled + a ProbeTagPath is configured.
|
||||
if (_options.Probe.Enabled && !string.IsNullOrWhiteSpace(_options.Probe.ProbeTagPath))
|
||||
{
|
||||
foreach (var state in _devices.Values)
|
||||
{
|
||||
state.ProbeCts = new CancellationTokenSource();
|
||||
var ct = state.ProbeCts.Token;
|
||||
_ = Task.Run(() => ProbeLoopAsync(state, ct), ct);
|
||||
}
|
||||
}
|
||||
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
|
||||
}
|
||||
catch (Exception ex)
|
||||
@@ -98,16 +115,117 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
|
||||
await InitializeAsync(driverConfigJson, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public Task ShutdownAsync(CancellationToken cancellationToken)
|
||||
public async Task ShutdownAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _poll.DisposeAsync().ConfigureAwait(false);
|
||||
foreach (var state in _devices.Values)
|
||||
{
|
||||
try { state.ProbeCts?.Cancel(); } catch { }
|
||||
state.ProbeCts?.Dispose();
|
||||
state.ProbeCts = null;
|
||||
state.DisposeHandles();
|
||||
}
|
||||
_devices.Clear();
|
||||
_tagsByName.Clear();
|
||||
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
|
||||
}
|
||||
|
||||
// ---- ISubscribable (polling overlay via shared engine) ----
|
||||
|
||||
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) =>
|
||||
Task.FromResult(_poll.Subscribe(fullReferences, publishingInterval));
|
||||
|
||||
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||
{
|
||||
_poll.Unsubscribe(handle);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
// ---- IHostConnectivityProbe ----
|
||||
|
||||
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses() =>
|
||||
[.. _devices.Values.Select(s => new HostConnectivityStatus(s.Options.HostAddress, s.HostState, s.HostStateChangedUtc))];
|
||||
|
||||
private async Task ProbeLoopAsync(DeviceState state, CancellationToken ct)
|
||||
{
|
||||
var probeParams = new AbCipTagCreateParams(
|
||||
Gateway: state.ParsedAddress.Gateway,
|
||||
Port: state.ParsedAddress.Port,
|
||||
CipPath: state.ParsedAddress.CipPath,
|
||||
LibplctagPlcAttribute: state.Profile.LibplctagPlcAttribute,
|
||||
TagName: _options.Probe.ProbeTagPath!,
|
||||
Timeout: _options.Probe.Timeout);
|
||||
|
||||
IAbCipTagRuntime? probeRuntime = null;
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
var success = false;
|
||||
try
|
||||
{
|
||||
probeRuntime ??= _tagFactory.Create(probeParams);
|
||||
// Lazy-init on first attempt; re-init after a transport failure has caused the
|
||||
// native handle to be destroyed.
|
||||
if (!state.ProbeInitialized)
|
||||
{
|
||||
await probeRuntime.InitializeAsync(ct).ConfigureAwait(false);
|
||||
state.ProbeInitialized = true;
|
||||
}
|
||||
await probeRuntime.ReadAsync(ct).ConfigureAwait(false);
|
||||
success = probeRuntime.GetStatus() == 0;
|
||||
}
|
||||
catch (OperationCanceledException) when (ct.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Wire / init error — tear down the probe runtime so the next tick re-creates it.
|
||||
try { probeRuntime?.Dispose(); } catch { }
|
||||
probeRuntime = null;
|
||||
state.ProbeInitialized = false;
|
||||
}
|
||||
|
||||
TransitionDeviceState(state, success ? HostState.Running : HostState.Stopped);
|
||||
|
||||
try { await Task.Delay(_options.Probe.Interval, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { break; }
|
||||
}
|
||||
|
||||
try { probeRuntime?.Dispose(); } catch { }
|
||||
}
|
||||
|
||||
private void TransitionDeviceState(DeviceState state, HostState newState)
|
||||
{
|
||||
HostState old;
|
||||
lock (state.ProbeLock)
|
||||
{
|
||||
old = state.HostState;
|
||||
if (old == newState) return;
|
||||
state.HostState = newState;
|
||||
state.HostStateChangedUtc = DateTime.UtcNow;
|
||||
}
|
||||
OnHostStatusChanged?.Invoke(this,
|
||||
new HostStatusChangedEventArgs(state.Options.HostAddress, old, newState));
|
||||
}
|
||||
|
||||
// ---- IPerCallHostResolver ----
|
||||
|
||||
/// <summary>
|
||||
/// Resolve the device host address for a given tag full-reference. Per plan decision #144
|
||||
/// the Phase 6.1 resilience pipeline keys its bulkhead + breaker on
|
||||
/// <c>(DriverInstanceId, hostName)</c> so multi-PLC drivers get per-device isolation —
|
||||
/// one dead PLC trips only its own breaker. Unknown references fall back to the
|
||||
/// first configured device's host address rather than throwing — the invoker handles the
|
||||
/// mislookup at the capability level when the actual read returns BadNodeIdUnknown.
|
||||
/// </summary>
|
||||
public string ResolveHost(string fullReference)
|
||||
{
|
||||
if (_tagsByName.TryGetValue(fullReference, out var def))
|
||||
return def.DeviceHostAddress;
|
||||
return _options.Devices.FirstOrDefault()?.HostAddress ?? DriverInstanceId;
|
||||
}
|
||||
|
||||
// ---- IReadable ----
|
||||
|
||||
/// <summary>
|
||||
@@ -438,6 +556,12 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
|
||||
public AbCipDeviceOptions Options { get; } = options;
|
||||
public AbCipPlcFamilyProfile Profile { get; } = profile;
|
||||
|
||||
public object ProbeLock { get; } = new();
|
||||
public HostState HostState { get; set; } = HostState.Unknown;
|
||||
public DateTime HostStateChangedUtc { get; set; } = DateTime.UtcNow;
|
||||
public CancellationTokenSource? ProbeCts { get; set; }
|
||||
public bool ProbeInitialized { get; set; }
|
||||
|
||||
public Dictionary<string, PlcTagHandle> TagHandles { get; } =
|
||||
new(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
|
||||
@@ -0,0 +1,227 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.AbCip;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class AbCipHostProbeTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task GetHostStatuses_returns_one_entry_per_device()
|
||||
{
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices =
|
||||
[
|
||||
new AbCipDeviceOptions("ab://10.0.0.5/1,0"),
|
||||
new AbCipDeviceOptions("ab://10.0.0.6/1,0"),
|
||||
],
|
||||
Probe = new AbCipProbeOptions { Enabled = false },
|
||||
}, "drv-1");
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var statuses = drv.GetHostStatuses();
|
||||
statuses.Count.ShouldBe(2);
|
||||
statuses.Select(s => s.HostName).ShouldBe(["ab://10.0.0.5/1,0", "ab://10.0.0.6/1,0"], ignoreOrder: true);
|
||||
statuses.ShouldAllBe(s => s.State == HostState.Unknown);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Probe_with_successful_read_transitions_to_Running()
|
||||
{
|
||||
var factory = new FakeAbCipTagFactory { Customise = p => new FakeAbCipTag(p) { Status = 0 } };
|
||||
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
|
||||
Probe = new AbCipProbeOptions
|
||||
{
|
||||
Enabled = true,
|
||||
Interval = TimeSpan.FromMilliseconds(100),
|
||||
Timeout = TimeSpan.FromMilliseconds(50),
|
||||
ProbeTagPath = "@raw_cpu_type",
|
||||
},
|
||||
}, "drv-1", factory);
|
||||
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
|
||||
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
await WaitForAsync(() => transitions.Any(t => t.NewState == HostState.Running), TimeSpan.FromSeconds(2));
|
||||
|
||||
transitions.Select(t => t.NewState).ShouldContain(HostState.Running);
|
||||
drv.GetHostStatuses().Single().State.ShouldBe(HostState.Running);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Probe_with_read_failure_transitions_to_Stopped()
|
||||
{
|
||||
var factory = new FakeAbCipTagFactory
|
||||
{
|
||||
Customise = p => new FakeAbCipTag(p) { ThrowOnRead = true },
|
||||
};
|
||||
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
|
||||
Probe = new AbCipProbeOptions
|
||||
{
|
||||
Enabled = true,
|
||||
Interval = TimeSpan.FromMilliseconds(100),
|
||||
Timeout = TimeSpan.FromMilliseconds(50),
|
||||
ProbeTagPath = "@raw_cpu_type",
|
||||
},
|
||||
}, "drv-1", factory);
|
||||
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
|
||||
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
await WaitForAsync(() => transitions.Any(t => t.NewState == HostState.Stopped), TimeSpan.FromSeconds(2));
|
||||
|
||||
drv.GetHostStatuses().Single().State.ShouldBe(HostState.Stopped);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Probe_disabled_when_Enabled_is_false()
|
||||
{
|
||||
var factory = new FakeAbCipTagFactory();
|
||||
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
|
||||
Probe = new AbCipProbeOptions { Enabled = false, ProbeTagPath = "@raw_cpu_type" },
|
||||
}, "drv-1", factory);
|
||||
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
|
||||
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
await Task.Delay(300);
|
||||
|
||||
transitions.ShouldBeEmpty();
|
||||
drv.GetHostStatuses().Single().State.ShouldBe(HostState.Unknown);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Probe_skipped_when_ProbeTagPath_is_null()
|
||||
{
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
|
||||
Probe = new AbCipProbeOptions { Enabled = true, ProbeTagPath = null },
|
||||
}, "drv-1");
|
||||
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
await Task.Delay(200);
|
||||
|
||||
drv.GetHostStatuses().Single().State.ShouldBe(HostState.Unknown);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Probe_loops_across_multiple_devices_independently()
|
||||
{
|
||||
var factory = new FakeAbCipTagFactory
|
||||
{
|
||||
// Device A returns ok, Device B throws on read.
|
||||
Customise = p => p.Gateway == "10.0.0.5"
|
||||
? new FakeAbCipTag(p)
|
||||
: new FakeAbCipTag(p) { ThrowOnRead = true },
|
||||
};
|
||||
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices =
|
||||
[
|
||||
new AbCipDeviceOptions("ab://10.0.0.5/1,0"),
|
||||
new AbCipDeviceOptions("ab://10.0.0.6/1,0"),
|
||||
],
|
||||
Probe = new AbCipProbeOptions
|
||||
{
|
||||
Enabled = true, Interval = TimeSpan.FromMilliseconds(100),
|
||||
Timeout = TimeSpan.FromMilliseconds(50), ProbeTagPath = "@raw_cpu_type",
|
||||
},
|
||||
}, "drv-1", factory);
|
||||
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
|
||||
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
await WaitForAsync(() => transitions.Count >= 2, TimeSpan.FromSeconds(3));
|
||||
|
||||
transitions.ShouldContain(t => t.HostName == "ab://10.0.0.5/1,0" && t.NewState == HostState.Running);
|
||||
transitions.ShouldContain(t => t.HostName == "ab://10.0.0.6/1,0" && t.NewState == HostState.Stopped);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
// ---- IPerCallHostResolver ----
|
||||
|
||||
[Fact]
|
||||
public async Task ResolveHost_returns_declared_device_for_known_tag()
|
||||
{
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices =
|
||||
[
|
||||
new AbCipDeviceOptions("ab://10.0.0.5/1,0"),
|
||||
new AbCipDeviceOptions("ab://10.0.0.6/1,0"),
|
||||
],
|
||||
Tags =
|
||||
[
|
||||
new AbCipTagDefinition("A", "ab://10.0.0.5/1,0", "A", AbCipDataType.DInt),
|
||||
new AbCipTagDefinition("B", "ab://10.0.0.6/1,0", "B", AbCipDataType.DInt),
|
||||
],
|
||||
Probe = new AbCipProbeOptions { Enabled = false },
|
||||
}, "drv-1");
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
drv.ResolveHost("A").ShouldBe("ab://10.0.0.5/1,0");
|
||||
drv.ResolveHost("B").ShouldBe("ab://10.0.0.6/1,0");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ResolveHost_falls_back_to_first_device_for_unknown_reference()
|
||||
{
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
|
||||
Probe = new AbCipProbeOptions { Enabled = false },
|
||||
}, "drv-1");
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
drv.ResolveHost("does-not-exist").ShouldBe("ab://10.0.0.5/1,0");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ResolveHost_falls_back_to_DriverInstanceId_when_no_devices()
|
||||
{
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions(), "drv-1");
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
drv.ResolveHost("anything").ShouldBe("drv-1");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ResolveHost_for_UDT_member_walks_to_synthesised_definition()
|
||||
{
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices = [new AbCipDeviceOptions("ab://10.0.0.7/1,0")],
|
||||
Tags =
|
||||
[
|
||||
new AbCipTagDefinition("Motor1", "ab://10.0.0.7/1,0", "Motor1", AbCipDataType.Structure,
|
||||
Members: [new AbCipStructureMember("Speed", AbCipDataType.DInt)]),
|
||||
],
|
||||
Probe = new AbCipProbeOptions { Enabled = false },
|
||||
}, "drv-1");
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
drv.ResolveHost("Motor1.Speed").ShouldBe("ab://10.0.0.7/1,0");
|
||||
}
|
||||
|
||||
private static async Task WaitForAsync(Func<bool> condition, TimeSpan timeout)
|
||||
{
|
||||
var deadline = DateTime.UtcNow + timeout;
|
||||
while (!condition() && DateTime.UtcNow < deadline)
|
||||
await Task.Delay(20);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,184 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.AbCip;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class AbCipSubscriptionTests
|
||||
{
|
||||
private static (AbCipDriver drv, FakeAbCipTagFactory factory) NewDriver(params AbCipTagDefinition[] tags)
|
||||
{
|
||||
var factory = new FakeAbCipTagFactory();
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
|
||||
Tags = tags,
|
||||
}, "drv-1", factory);
|
||||
return (drv, factory);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Initial_poll_raises_OnDataChange_for_every_tag()
|
||||
{
|
||||
var (drv, factory) = NewDriver(
|
||||
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt),
|
||||
new AbCipTagDefinition("Temp", "ab://10.0.0.5/1,0", "Temp", AbCipDataType.Real));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
factory.Customise = p => p.TagName switch
|
||||
{
|
||||
"Speed" => new FakeAbCipTag(p) { Value = 1800 },
|
||||
"Temp" => new FakeAbCipTag(p) { Value = 72.5f },
|
||||
_ => new FakeAbCipTag(p),
|
||||
};
|
||||
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Speed", "Temp"], TimeSpan.FromMilliseconds(200), CancellationToken.None);
|
||||
await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
|
||||
|
||||
events.Select(e => e.FullReference).ShouldContain("Speed");
|
||||
events.Select(e => e.FullReference).ShouldContain("Temp");
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Unchanged_value_raises_only_once()
|
||||
{
|
||||
var (drv, factory) = NewDriver(
|
||||
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
factory.Customise = p => new FakeAbCipTag(p) { Value = 1800 };
|
||||
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
await Task.Delay(500);
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
|
||||
events.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Value_change_between_polls_raises_OnDataChange()
|
||||
{
|
||||
var (drv, factory) = NewDriver(
|
||||
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
var tagRef = new FakeAbCipTag(new AbCipTagCreateParams("10.0.0.5", 44818, "1,0", "controllogix", "Speed", TimeSpan.FromSeconds(2))) { Value = 100 };
|
||||
factory.Customise = _ => tagRef;
|
||||
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
|
||||
tagRef.Value = 200; // simulate PLC change
|
||||
await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
|
||||
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
events.Count.ShouldBeGreaterThanOrEqualTo(2);
|
||||
events.Last().Snapshot.Value.ShouldBe(200);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Unsubscribe_halts_polling()
|
||||
{
|
||||
var (drv, factory) = NewDriver(
|
||||
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
var tagRef = new FakeAbCipTag(new AbCipTagCreateParams("10.0.0.5", 44818, "1,0", "controllogix", "Speed", TimeSpan.FromSeconds(2))) { Value = 1 };
|
||||
factory.Customise = _ => tagRef;
|
||||
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
|
||||
var afterUnsub = events.Count;
|
||||
tagRef.Value = 999;
|
||||
await Task.Delay(400);
|
||||
events.Count.ShouldBe(afterUnsub);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Interval_below_100ms_is_floored()
|
||||
{
|
||||
var (drv, factory) = NewDriver(
|
||||
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
factory.Customise = p => new FakeAbCipTag(p) { Value = 1 };
|
||||
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(5), CancellationToken.None);
|
||||
await Task.Delay(300);
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
|
||||
// Value is stable → only the initial-data push fires; the 100 ms floor keeps polls sparse enough
|
||||
// that no extra event is produced against a stable value.
|
||||
events.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ShutdownAsync_cancels_active_subscriptions()
|
||||
{
|
||||
var (drv, factory) = NewDriver(
|
||||
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
factory.Customise = p => new FakeAbCipTag(p) { Value = 1 };
|
||||
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
_ = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
|
||||
var afterShutdown = events.Count;
|
||||
await Task.Delay(300);
|
||||
events.Count.ShouldBe(afterShutdown);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Subscription_on_UDT_member_uses_synthesised_full_reference()
|
||||
{
|
||||
var factory = new FakeAbCipTagFactory();
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
|
||||
Tags =
|
||||
[
|
||||
new AbCipTagDefinition("Motor1", "ab://10.0.0.5/1,0", "Motor1", AbCipDataType.Structure,
|
||||
Members: [new AbCipStructureMember("Speed", AbCipDataType.DInt)]),
|
||||
],
|
||||
}, "drv-1", factory);
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
factory.Customise = p => p.TagName == "Motor1.Speed"
|
||||
? new FakeAbCipTag(p) { Value = 77 }
|
||||
: new FakeAbCipTag(p);
|
||||
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Motor1.Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2));
|
||||
|
||||
events.First().Snapshot.Value.ShouldBe(77);
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
}
|
||||
|
||||
private static async Task WaitForAsync(Func<bool> condition, TimeSpan timeout)
|
||||
{
|
||||
var deadline = DateTime.UtcNow + timeout;
|
||||
while (!condition() && DateTime.UtcNow < deadline)
|
||||
await Task.Delay(20);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user