TwinCAT follow-up — Native ADS notifications for ISubscribable. Closes task #189 — upgrades TwinCATDriver's subscription path from polling (shared PollGroupEngine) to native AdsClient.AddDeviceNotificationExAsync so the PLC pushes changes on its own cycle rather than the driver polling. Strictly better for latency + CPU — TC2 and TC3 runtimes notify on value change with sub-millisecond latency from the PLC cycle. ITwinCATClient gains AddNotificationAsync — takes symbolPath + TwinCATDataType + optional bitIndex + cycleTime + onChange callback + CancellationToken; returns an ITwinCATNotificationHandle whose Dispose tears the notification down on the wire. Bit-within-word reads supported — the parent word value arrives via the notification, driver extracts the bit before invoking the callback (same ExtractBit path as the read surface from PR 2). AdsTwinCATClient — subscribes to AdsClient.AdsNotificationEx in the ctor, maintains a ConcurrentDictionary<uint, NotificationRegistration> keyed on the server-side notification handle. AddDeviceNotificationExAsync returns Task<ResultHandle> with Handle + ErrorCode; non-NoError throws InvalidOperationException so the driver can catch + retry. Notification event args carry Handle + Value + DataType; lookup in _notifications dict routes the value through any bit-extraction + calls the consumer callback. Consumer-side exceptions are swallowed so a misbehaving callback can't crash the ADS notification thread. Dispose unsubscribes from AdsNotificationEx + clears the dict + disposes AdsClient. NotificationRegistration is ITwinCATNotificationHandle — Dispose fires DeleteDeviceNotificationAsync as fire-and-forget with CancellationToken.None (caller has already committed to teardown; blocking would slow shutdown). TwinCATDriverOptions.UseNativeNotifications — new bool, default true. When true the driver uses native notifications; when false it falls through to the shared PollGroupEngine (same semantics as other libplctag-backed drivers, also a safety valve for targets with notification limits). TwinCATDriver.SubscribeAsync dual-path — if UseNativeNotifications false delegate into _poll.Subscribe (unchanged behavior from PR 3). If true, iterate fullReferences, resolve each to its device's client via EnsureConnectedAsync (reuses PR 2's per-device connection cache), parse the SymbolPath via TwinCATSymbolPath (preserves bit-in-word support), call ITwinCATClient.AddNotificationAsync with a closure over the FullReference (not the ADS symbol — OPC UA subscribers addressed the driver-side name). Per-registration callback bridges (_, value) → OnDataChange event with a fresh DataValueSnapshot (Good status, current UtcNow timestamps). Any mid-registration failure triggers a try/catch that disposes every already-registered handle before rethrowing, keeping the driver in a clean never-existed state rather than half-registered. UnsubscribeAsync dispatches on handle type — NativeSubscriptionHandle disposes all its cached ITwinCATNotificationHandles; anything else delegates to _poll.Unsubscribe for the poll fallback. ShutdownAsync tears down native subs first (so AdsClient-level cleanup happens before the client itself disposes), then PollGroupEngine, then per-device probe CTS + client. NativeSubscriptionHandle DiagnosticId prefixes with twincat-native-sub- so Admin UI + logs can distinguish the paths. 9 new unit tests in TwinCATNativeNotificationTests — native subscribe registers one notification per tag, pushed value via FireNotification fires OnDataChange with the right FullReference (driver-side, not ADS symbol), unsubscribe disposes all notifications, unsubscribe halts future notifications, partial-failure cleanup via FailAfterNAddsFake (first succeeds, second throws → first gets torn down + Notifications count returns to 0 + AddCallCount=2 proving the test actually exercised both calls), shutdown disposes subscriptions, poll fallback works when UseNativeNotifications=false (no native handles created + initial-data push still fires), handle DiagnosticId distinguishes native vs poll. Existing poll-mode ISubscribable tests in TwinCATCapabilityTests updated with UseNativeNotifications=false so they continue testing the poll path specifically — both poll + native paths have test coverage now. TwinCATDriverTests got Probe.Enabled=false added because the default factory creates a real AdsClient which was flakily affected by parallel test execution sharing AMS router state. Total TwinCAT unit tests now 93/93 passing (+8 from PR 3's 85 counting the new native tests + 2 existing tests that got options tweaks). Full solution builds 0 errors; Modbus / AbCip / AbLegacy / other drivers untouched. TwinCAT driver is now feature-complete end-to-end — read / write / discover / native-subscribe / probe / host-resolve, with poll-mode as a safety valve. Unblocks closing task #120 for TwinCAT; remaining sub-task: FOCAS + task #188 (symbol-browsing — lower priority than FOCAS since real config flows still use pre-declared tags).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -56,6 +56,48 @@ internal class FakeTwinCATClient : ITwinCATClient
|
||||
DisposeCount++;
|
||||
IsConnected = false;
|
||||
}
|
||||
|
||||
// ---- notification fake ----
|
||||
|
||||
public List<FakeNotification> Notifications { get; } = new();
|
||||
public bool ThrowOnAddNotification { get; set; }
|
||||
|
||||
public virtual Task<ITwinCATNotificationHandle> AddNotificationAsync(
|
||||
string symbolPath, TwinCATDataType type, int? bitIndex, TimeSpan cycleTime,
|
||||
Action<string, object?> onChange, CancellationToken cancellationToken)
|
||||
{
|
||||
if (ThrowOnAddNotification)
|
||||
throw Exception ?? new InvalidOperationException("fake AddNotification failure");
|
||||
|
||||
var reg = new FakeNotification(symbolPath, type, bitIndex, onChange, this);
|
||||
Notifications.Add(reg);
|
||||
return Task.FromResult<ITwinCATNotificationHandle>(reg);
|
||||
}
|
||||
|
||||
/// <summary>Fire a change event through the registered callback for <paramref name="symbolPath"/>.</summary>
|
||||
public void FireNotification(string symbolPath, object? value)
|
||||
{
|
||||
foreach (var n in Notifications)
|
||||
if (!n.Disposed && string.Equals(n.SymbolPath, symbolPath, StringComparison.OrdinalIgnoreCase))
|
||||
n.OnChange(symbolPath, value);
|
||||
}
|
||||
|
||||
public sealed class FakeNotification(
|
||||
string symbolPath, TwinCATDataType type, int? bitIndex,
|
||||
Action<string, object?> onChange, FakeTwinCATClient owner) : ITwinCATNotificationHandle
|
||||
{
|
||||
public string SymbolPath { get; } = symbolPath;
|
||||
public TwinCATDataType Type { get; } = type;
|
||||
public int? BitIndex { get; } = bitIndex;
|
||||
public Action<string, object?> OnChange { get; } = onChange;
|
||||
public bool Disposed { get; private set; }
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Disposed = true;
|
||||
owner.Notifications.Remove(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class FakeTwinCATClientFactory : ITwinCATClientFactory
|
||||
|
||||
@@ -49,6 +49,7 @@ public sealed class TwinCATCapabilityTests
|
||||
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851")],
|
||||
Tags = [new TwinCATTagDefinition("X", "ads://5.23.91.23.1.1:851", "MAIN.X", TwinCATDataType.DInt)],
|
||||
Probe = new TwinCATProbeOptions { Enabled = false },
|
||||
UseNativeNotifications = false, // poll-mode test
|
||||
}, "drv-1", factory);
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
@@ -74,6 +75,7 @@ public sealed class TwinCATCapabilityTests
|
||||
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851")],
|
||||
Tags = [new TwinCATTagDefinition("X", "ads://5.23.91.23.1.1:851", "MAIN.X", TwinCATDataType.DInt)],
|
||||
Probe = new TwinCATProbeOptions { Enabled = false },
|
||||
UseNativeNotifications = false, // poll-mode test
|
||||
}, "drv-1", factory);
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
|
||||
@@ -54,6 +54,7 @@ public sealed class TwinCATDriverTests
|
||||
var drv = new TwinCATDriver(new TwinCATDriverOptions
|
||||
{
|
||||
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851")],
|
||||
Probe = new TwinCATProbeOptions { Enabled = false },
|
||||
}, "drv-1");
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
@@ -68,6 +69,7 @@ public sealed class TwinCATDriverTests
|
||||
var drv = new TwinCATDriver(new TwinCATDriverOptions
|
||||
{
|
||||
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851")],
|
||||
Probe = new TwinCATProbeOptions { Enabled = false },
|
||||
}, "drv-1");
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
await drv.ReinitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
@@ -0,0 +1,221 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.TwinCAT;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.TwinCAT.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class TwinCATNativeNotificationTests
|
||||
{
|
||||
private static (TwinCATDriver drv, FakeTwinCATClientFactory factory) NewNativeDriver(params TwinCATTagDefinition[] tags)
|
||||
{
|
||||
var factory = new FakeTwinCATClientFactory();
|
||||
var drv = new TwinCATDriver(new TwinCATDriverOptions
|
||||
{
|
||||
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851")],
|
||||
Tags = tags,
|
||||
Probe = new TwinCATProbeOptions { Enabled = false },
|
||||
UseNativeNotifications = true,
|
||||
}, "drv-1", factory);
|
||||
return (drv, factory);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Native_subscribe_registers_one_notification_per_tag()
|
||||
{
|
||||
var (drv, factory) = NewNativeDriver(
|
||||
new TwinCATTagDefinition("A", "ads://5.23.91.23.1.1:851", "MAIN.A", TwinCATDataType.DInt),
|
||||
new TwinCATTagDefinition("B", "ads://5.23.91.23.1.1:851", "MAIN.B", TwinCATDataType.Real));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["A", "B"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
handle.DiagnosticId.ShouldStartWith("twincat-native-sub-");
|
||||
|
||||
factory.Clients[0].Notifications.Count.ShouldBe(2);
|
||||
factory.Clients[0].Notifications.Select(n => n.SymbolPath).ShouldBe(["MAIN.A", "MAIN.B"], ignoreOrder: true);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Native_notification_fires_OnDataChange_with_pushed_value()
|
||||
{
|
||||
var (drv, factory) = NewNativeDriver(
|
||||
new TwinCATTagDefinition("Speed", "ads://5.23.91.23.1.1:851", "MAIN.Speed", TwinCATDataType.DInt));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
_ = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
|
||||
factory.Clients[0].FireNotification("MAIN.Speed", 4200);
|
||||
factory.Clients[0].FireNotification("MAIN.Speed", 4201);
|
||||
|
||||
events.Count.ShouldBe(2);
|
||||
events.Last().Snapshot.Value.ShouldBe(4201);
|
||||
events.Last().FullReference.ShouldBe("Speed"); // driver-side reference, not ADS symbol
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Native_unsubscribe_disposes_all_notifications()
|
||||
{
|
||||
var (drv, factory) = NewNativeDriver(
|
||||
new TwinCATTagDefinition("A", "ads://5.23.91.23.1.1:851", "MAIN.A", TwinCATDataType.DInt),
|
||||
new TwinCATTagDefinition("B", "ads://5.23.91.23.1.1:851", "MAIN.B", TwinCATDataType.DInt));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["A", "B"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
factory.Clients[0].Notifications.Count.ShouldBe(2);
|
||||
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
factory.Clients[0].Notifications.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Native_unsubscribe_halts_future_notifications()
|
||||
{
|
||||
var (drv, factory) = NewNativeDriver(
|
||||
new TwinCATTagDefinition("X", "ads://5.23.91.23.1.1:851", "MAIN.X", TwinCATDataType.DInt));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["X"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
factory.Clients[0].FireNotification("MAIN.X", 1);
|
||||
var snapshotFake = factory.Clients[0];
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
|
||||
var afterUnsub = events.Count;
|
||||
// After unsubscribe the fake's Notifications list is empty so FireNotification finds nothing
|
||||
// to invoke. This mirrors the production contract — disposed handles no longer deliver.
|
||||
snapshotFake.FireNotification("MAIN.X", 999);
|
||||
events.Count.ShouldBe(afterUnsub);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Native_subscribe_failure_mid_registration_cleans_up_partial_state()
|
||||
{
|
||||
// Fail-on-second-call fake — first AddNotificationAsync succeeds, second throws.
|
||||
// Subscribe's catch block must tear the first one down before rethrowing so no zombie
|
||||
// notification lingers.
|
||||
var fake = new FailAfterNAddsFake(new AbTagParamsIrrelevant(), succeedBefore: 1);
|
||||
var factory = new FakeTwinCATClientFactory { Customise = () => fake };
|
||||
var drv = new TwinCATDriver(new TwinCATDriverOptions
|
||||
{
|
||||
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851")],
|
||||
Tags =
|
||||
[
|
||||
new TwinCATTagDefinition("A", "ads://5.23.91.23.1.1:851", "MAIN.A", TwinCATDataType.DInt),
|
||||
new TwinCATTagDefinition("B", "ads://5.23.91.23.1.1:851", "MAIN.B", TwinCATDataType.DInt),
|
||||
],
|
||||
Probe = new TwinCATProbeOptions { Enabled = false },
|
||||
UseNativeNotifications = true,
|
||||
}, "drv-1", factory);
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
await Should.ThrowAsync<InvalidOperationException>(() =>
|
||||
drv.SubscribeAsync(["A", "B"], TimeSpan.FromMilliseconds(100), CancellationToken.None));
|
||||
|
||||
// First registration succeeded then got torn down by the catch; second threw.
|
||||
fake.AddCallCount.ShouldBe(2);
|
||||
fake.Notifications.Count.ShouldBe(0); // partial handle cleaned up
|
||||
}
|
||||
|
||||
private sealed class AbTagParamsIrrelevant { }
|
||||
|
||||
private sealed class FailAfterNAddsFake : FakeTwinCATClient
|
||||
{
|
||||
private readonly int _succeedBefore;
|
||||
public int AddCallCount { get; private set; }
|
||||
|
||||
public FailAfterNAddsFake(AbTagParamsIrrelevant _, int succeedBefore) : base()
|
||||
{
|
||||
_succeedBefore = succeedBefore;
|
||||
}
|
||||
|
||||
public override Task<ITwinCATNotificationHandle> AddNotificationAsync(
|
||||
string symbolPath, TwinCATDataType type, int? bitIndex, TimeSpan cycleTime,
|
||||
Action<string, object?> onChange, CancellationToken cancellationToken)
|
||||
{
|
||||
AddCallCount++;
|
||||
if (AddCallCount > _succeedBefore)
|
||||
throw new InvalidOperationException($"fake fail on call #{AddCallCount}");
|
||||
return base.AddNotificationAsync(symbolPath, type, bitIndex, cycleTime, onChange, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Native_shutdown_disposes_subscriptions()
|
||||
{
|
||||
var (drv, factory) = NewNativeDriver(
|
||||
new TwinCATTagDefinition("X", "ads://5.23.91.23.1.1:851", "MAIN.X", TwinCATDataType.DInt));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
_ = await drv.SubscribeAsync(["X"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
factory.Clients[0].Notifications.Count.ShouldBe(1);
|
||||
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
factory.Clients[0].Notifications.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Poll_path_still_works_when_UseNativeNotifications_false()
|
||||
{
|
||||
var factory = new FakeTwinCATClientFactory
|
||||
{
|
||||
Customise = () => new FakeTwinCATClient { Values = { ["MAIN.X"] = 7 } },
|
||||
};
|
||||
var drv = new TwinCATDriver(new TwinCATDriverOptions
|
||||
{
|
||||
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851")],
|
||||
Tags = [new TwinCATTagDefinition("X", "ads://5.23.91.23.1.1:851", "MAIN.X", TwinCATDataType.DInt)],
|
||||
Probe = new TwinCATProbeOptions { Enabled = false },
|
||||
UseNativeNotifications = false,
|
||||
}, "drv-1", factory);
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["X"], TimeSpan.FromMilliseconds(150), CancellationToken.None);
|
||||
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2));
|
||||
|
||||
events.First().Snapshot.Value.ShouldBe(7);
|
||||
factory.Clients[0].Notifications.ShouldBeEmpty(); // no native notifications on poll path
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Subscribe_handle_DiagnosticId_indicates_native_vs_poll()
|
||||
{
|
||||
var (drvNative, _) = NewNativeDriver(
|
||||
new TwinCATTagDefinition("X", "ads://5.23.91.23.1.1:851", "MAIN.X", TwinCATDataType.DInt));
|
||||
await drvNative.InitializeAsync("{}", CancellationToken.None);
|
||||
var nativeHandle = await drvNative.SubscribeAsync(["X"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
nativeHandle.DiagnosticId.ShouldContain("native");
|
||||
|
||||
var factoryPoll = new FakeTwinCATClientFactory
|
||||
{
|
||||
Customise = () => new FakeTwinCATClient { Values = { ["MAIN.X"] = 1 } },
|
||||
};
|
||||
var drvPoll = new TwinCATDriver(new TwinCATDriverOptions
|
||||
{
|
||||
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851")],
|
||||
Tags = [new TwinCATTagDefinition("X", "ads://5.23.91.23.1.1:851", "MAIN.X", TwinCATDataType.DInt)],
|
||||
Probe = new TwinCATProbeOptions { Enabled = false },
|
||||
UseNativeNotifications = false,
|
||||
}, "drv-1", factoryPoll);
|
||||
await drvPoll.InitializeAsync("{}", CancellationToken.None);
|
||||
var pollHandle = await drvPoll.SubscribeAsync(["X"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
pollHandle.DiagnosticId.ShouldNotContain("native");
|
||||
}
|
||||
|
||||
private static async Task WaitForAsync(Func<bool> condition, TimeSpan timeout)
|
||||
{
|
||||
var deadline = DateTime.UtcNow + timeout;
|
||||
while (!condition() && DateTime.UtcNow < deadline)
|
||||
await Task.Delay(20);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user