TwinCAT PR 3 � Discovery + Subscribe + Probe + HostResolver #122

Merged
dohertj2 merged 1 commits from twincat-pr3-remaining-capabilities into v2 2026-04-19 18:38:46 -04:00
2 changed files with 383 additions and 4 deletions

View File

@@ -7,15 +7,20 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.TwinCAT;
/// the <see cref="IDriver"/> skeleton; read / write / discover / subscribe / probe / host-
/// resolver land in PRs 2 and 3.
/// </summary>
public sealed class TwinCATDriver : IDriver, IReadable, IWritable, IDisposable, IAsyncDisposable
public sealed class TwinCATDriver : IDriver, IReadable, IWritable, ITagDiscovery, ISubscribable,
IHostConnectivityProbe, IPerCallHostResolver, IDisposable, IAsyncDisposable
{
private readonly TwinCATDriverOptions _options;
private readonly string _driverInstanceId;
private readonly ITwinCATClientFactory _clientFactory;
private readonly PollGroupEngine _poll;
private readonly Dictionary<string, DeviceState> _devices = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, TwinCATTagDefinition> _tagsByName = new(StringComparer.OrdinalIgnoreCase);
private DriverHealth _health = new(DriverState.Unknown, null, null);
public event EventHandler<DataChangeEventArgs>? OnDataChange;
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
public TwinCATDriver(TwinCATDriverOptions options, string driverInstanceId,
ITwinCATClientFactory? clientFactory = null)
{
@@ -23,6 +28,10 @@ public sealed class TwinCATDriver : IDriver, IReadable, IWritable, IDisposable,
_options = options;
_driverInstanceId = driverInstanceId;
_clientFactory = clientFactory ?? new AdsTwinCATClientFactory();
_poll = new PollGroupEngine(
reader: ReadAsync,
onChange: (handle, tagRef, snapshot) =>
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, snapshot)));
}
public string DriverInstanceId => _driverInstanceId;
@@ -41,6 +50,16 @@ public sealed class TwinCATDriver : IDriver, IReadable, IWritable, IDisposable,
_devices[device.HostAddress] = new DeviceState(addr, device);
}
foreach (var tag in _options.Tags) _tagsByName[tag.Name] = tag;
if (_options.Probe.Enabled)
{
foreach (var state in _devices.Values)
{
state.ProbeCts = new CancellationTokenSource();
var ct = state.ProbeCts.Token;
_ = Task.Run(() => ProbeLoopAsync(state, ct), ct);
}
}
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
}
catch (Exception ex)
@@ -57,13 +76,19 @@ public sealed class TwinCATDriver : IDriver, IReadable, IWritable, IDisposable,
await InitializeAsync(driverConfigJson, cancellationToken).ConfigureAwait(false);
}
public Task ShutdownAsync(CancellationToken cancellationToken)
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
foreach (var state in _devices.Values) state.DisposeClient();
await _poll.DisposeAsync().ConfigureAwait(false);
foreach (var state in _devices.Values)
{
try { state.ProbeCts?.Cancel(); } catch { }
state.ProbeCts?.Dispose();
state.ProbeCts = null;
state.DisposeClient();
}
_devices.Clear();
_tagsByName.Clear();
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
return Task.CompletedTask;
}
public DriverHealth GetHealth() => _health;
@@ -183,6 +208,100 @@ public sealed class TwinCATDriver : IDriver, IReadable, IWritable, IDisposable,
return results;
}
// ---- ITagDiscovery ----
public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(builder);
var root = builder.Folder("TwinCAT", "TwinCAT");
foreach (var device in _options.Devices)
{
var label = device.DeviceName ?? device.HostAddress;
var deviceFolder = root.Folder(device.HostAddress, label);
var tagsForDevice = _options.Tags.Where(t =>
string.Equals(t.DeviceHostAddress, device.HostAddress, StringComparison.OrdinalIgnoreCase));
foreach (var tag in tagsForDevice)
{
deviceFolder.Variable(tag.Name, tag.Name, new DriverAttributeInfo(
FullName: tag.Name,
DriverDataType: tag.DataType.ToDriverDataType(),
IsArray: false,
ArrayDim: null,
SecurityClass: tag.Writable
? SecurityClassification.Operate
: SecurityClassification.ViewOnly,
IsHistorized: false,
IsAlarm: false,
WriteIdempotent: tag.WriteIdempotent));
}
}
return Task.CompletedTask;
}
// ---- ISubscribable (polling overlay via shared engine) ----
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) =>
Task.FromResult(_poll.Subscribe(fullReferences, publishingInterval));
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
_poll.Unsubscribe(handle);
return Task.CompletedTask;
}
// ---- IHostConnectivityProbe ----
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses() =>
[.. _devices.Values.Select(s => new HostConnectivityStatus(s.Options.HostAddress, s.HostState, s.HostStateChangedUtc))];
private async Task ProbeLoopAsync(DeviceState state, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var success = false;
try
{
var client = await EnsureConnectedAsync(state, ct).ConfigureAwait(false);
success = await client.ProbeAsync(ct).ConfigureAwait(false);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested) { break; }
catch
{
// Probe failure — EnsureConnectedAsync's connect-failure path already disposed
// + cleared the client, so next tick will reconnect.
}
TransitionDeviceState(state, success ? HostState.Running : HostState.Stopped);
try { await Task.Delay(_options.Probe.Interval, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { break; }
}
}
private void TransitionDeviceState(DeviceState state, HostState newState)
{
HostState old;
lock (state.ProbeLock)
{
old = state.HostState;
if (old == newState) return;
state.HostState = newState;
state.HostStateChangedUtc = DateTime.UtcNow;
}
OnHostStatusChanged?.Invoke(this,
new HostStatusChangedEventArgs(state.Options.HostAddress, old, newState));
}
// ---- IPerCallHostResolver ----
public string ResolveHost(string fullReference)
{
if (_tagsByName.TryGetValue(fullReference, out var def))
return def.DeviceHostAddress;
return _options.Devices.FirstOrDefault()?.HostAddress ?? DriverInstanceId;
}
private async Task<ITwinCATClient> EnsureConnectedAsync(DeviceState device, CancellationToken ct)
{
if (device.Client is { IsConnected: true } c) return c;
@@ -210,6 +329,11 @@ public sealed class TwinCATDriver : IDriver, IReadable, IWritable, IDisposable,
public TwinCATDeviceOptions Options { get; } = options;
public ITwinCATClient? Client { get; set; }
public object ProbeLock { get; } = new();
public HostState HostState { get; set; } = HostState.Unknown;
public DateTime HostStateChangedUtc { get; set; } = DateTime.UtcNow;
public CancellationTokenSource? ProbeCts { get; set; }
public void DisposeClient()
{
Client?.Dispose();

View File

@@ -0,0 +1,255 @@
using System.Collections.Concurrent;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.TwinCAT;
namespace ZB.MOM.WW.OtOpcUa.Driver.TwinCAT.Tests;
[Trait("Category", "Unit")]
public sealed class TwinCATCapabilityTests
{
// ---- ITagDiscovery ----
[Fact]
public async Task DiscoverAsync_emits_pre_declared_tags()
{
var builder = new RecordingBuilder();
var drv = new TwinCATDriver(new TwinCATDriverOptions
{
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851", DeviceName: "Mach1")],
Tags =
[
new TwinCATTagDefinition("Speed", "ads://5.23.91.23.1.1:851", "MAIN.Speed", TwinCATDataType.DInt),
new TwinCATTagDefinition("Status", "ads://5.23.91.23.1.1:851", "GVL.Status", TwinCATDataType.Bool, Writable: false),
],
Probe = new TwinCATProbeOptions { Enabled = false },
}, "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
await drv.DiscoverAsync(builder, CancellationToken.None);
builder.Folders.ShouldContain(f => f.BrowseName == "TwinCAT");
builder.Folders.ShouldContain(f => f.BrowseName == "ads://5.23.91.23.1.1:851" && f.DisplayName == "Mach1");
builder.Variables.Single(v => v.BrowseName == "Speed").Info.SecurityClass.ShouldBe(SecurityClassification.Operate);
builder.Variables.Single(v => v.BrowseName == "Status").Info.SecurityClass.ShouldBe(SecurityClassification.ViewOnly);
}
// ---- ISubscribable ----
[Fact]
public async Task Subscribe_initial_poll_raises_OnDataChange()
{
var factory = new FakeTwinCATClientFactory
{
Customise = () => new FakeTwinCATClient { Values = { ["MAIN.X"] = 42 } },
};
var drv = new TwinCATDriver(new TwinCATDriverOptions
{
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851")],
Tags = [new TwinCATTagDefinition("X", "ads://5.23.91.23.1.1:851", "MAIN.X", TwinCATDataType.DInt)],
Probe = new TwinCATProbeOptions { Enabled = false },
}, "drv-1", factory);
await drv.InitializeAsync("{}", CancellationToken.None);
var events = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) => events.Enqueue(e);
var handle = await drv.SubscribeAsync(["X"], TimeSpan.FromMilliseconds(200), CancellationToken.None);
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2));
events.First().Snapshot.Value.ShouldBe(42);
await drv.UnsubscribeAsync(handle, CancellationToken.None);
}
[Fact]
public async Task ShutdownAsync_cancels_active_subscriptions()
{
var factory = new FakeTwinCATClientFactory
{
Customise = () => new FakeTwinCATClient { Values = { ["MAIN.X"] = 1 } },
};
var drv = new TwinCATDriver(new TwinCATDriverOptions
{
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851")],
Tags = [new TwinCATTagDefinition("X", "ads://5.23.91.23.1.1:851", "MAIN.X", TwinCATDataType.DInt)],
Probe = new TwinCATProbeOptions { Enabled = false },
}, "drv-1", factory);
await drv.InitializeAsync("{}", CancellationToken.None);
var events = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) => events.Enqueue(e);
_ = await drv.SubscribeAsync(["X"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
await drv.ShutdownAsync(CancellationToken.None);
var afterShutdown = events.Count;
await Task.Delay(200);
events.Count.ShouldBe(afterShutdown);
}
// ---- IHostConnectivityProbe ----
[Fact]
public async Task GetHostStatuses_returns_entry_per_device()
{
var drv = new TwinCATDriver(new TwinCATDriverOptions
{
Devices =
[
new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851"),
new TwinCATDeviceOptions("ads://5.23.91.24.1.1:851"),
],
Probe = new TwinCATProbeOptions { Enabled = false },
}, "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
drv.GetHostStatuses().Count.ShouldBe(2);
}
[Fact]
public async Task Probe_transitions_to_Running_on_successful_probe()
{
var factory = new FakeTwinCATClientFactory
{
Customise = () => new FakeTwinCATClient { ProbeResult = true },
};
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
var drv = new TwinCATDriver(new TwinCATDriverOptions
{
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851")],
Probe = new TwinCATProbeOptions
{
Enabled = true, Interval = TimeSpan.FromMilliseconds(100),
Timeout = TimeSpan.FromMilliseconds(50),
},
}, "drv-1", factory);
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
await drv.InitializeAsync("{}", CancellationToken.None);
await WaitForAsync(() => transitions.Any(t => t.NewState == HostState.Running), TimeSpan.FromSeconds(2));
drv.GetHostStatuses().Single().State.ShouldBe(HostState.Running);
await drv.ShutdownAsync(CancellationToken.None);
}
[Fact]
public async Task Probe_transitions_to_Stopped_on_probe_failure()
{
var factory = new FakeTwinCATClientFactory
{
Customise = () => new FakeTwinCATClient { ProbeResult = false },
};
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
var drv = new TwinCATDriver(new TwinCATDriverOptions
{
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851")],
Probe = new TwinCATProbeOptions
{
Enabled = true, Interval = TimeSpan.FromMilliseconds(100),
Timeout = TimeSpan.FromMilliseconds(50),
},
}, "drv-1", factory);
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
await drv.InitializeAsync("{}", CancellationToken.None);
await WaitForAsync(() => transitions.Any(t => t.NewState == HostState.Stopped), TimeSpan.FromSeconds(2));
drv.GetHostStatuses().Single().State.ShouldBe(HostState.Stopped);
await drv.ShutdownAsync(CancellationToken.None);
}
[Fact]
public async Task Probe_disabled_when_Enabled_is_false()
{
var factory = new FakeTwinCATClientFactory();
var drv = new TwinCATDriver(new TwinCATDriverOptions
{
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851")],
Probe = new TwinCATProbeOptions { Enabled = false },
}, "drv-1", factory);
await drv.InitializeAsync("{}", CancellationToken.None);
await Task.Delay(200);
drv.GetHostStatuses().Single().State.ShouldBe(HostState.Unknown);
await drv.ShutdownAsync(CancellationToken.None);
}
// ---- IPerCallHostResolver ----
[Fact]
public async Task ResolveHost_returns_declared_device_for_known_tag()
{
var drv = new TwinCATDriver(new TwinCATDriverOptions
{
Devices =
[
new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851"),
new TwinCATDeviceOptions("ads://5.23.91.24.1.1:851"),
],
Tags =
[
new TwinCATTagDefinition("A", "ads://5.23.91.23.1.1:851", "MAIN.A", TwinCATDataType.DInt),
new TwinCATTagDefinition("B", "ads://5.23.91.24.1.1:851", "MAIN.B", TwinCATDataType.DInt),
],
Probe = new TwinCATProbeOptions { Enabled = false },
}, "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
drv.ResolveHost("A").ShouldBe("ads://5.23.91.23.1.1:851");
drv.ResolveHost("B").ShouldBe("ads://5.23.91.24.1.1:851");
}
[Fact]
public async Task ResolveHost_falls_back_to_first_device_for_unknown_ref()
{
var drv = new TwinCATDriver(new TwinCATDriverOptions
{
Devices = [new TwinCATDeviceOptions("ads://5.23.91.23.1.1:851")],
Probe = new TwinCATProbeOptions { Enabled = false },
}, "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
drv.ResolveHost("missing").ShouldBe("ads://5.23.91.23.1.1:851");
}
[Fact]
public async Task ResolveHost_falls_back_to_DriverInstanceId_when_no_devices()
{
var drv = new TwinCATDriver(new TwinCATDriverOptions(), "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
drv.ResolveHost("anything").ShouldBe("drv-1");
}
// ---- helpers ----
private static async Task WaitForAsync(Func<bool> condition, TimeSpan timeout)
{
var deadline = DateTime.UtcNow + timeout;
while (!condition() && DateTime.UtcNow < deadline)
await Task.Delay(20);
}
private sealed class RecordingBuilder : IAddressSpaceBuilder
{
public List<(string BrowseName, string DisplayName)> Folders { get; } = new();
public List<(string BrowseName, DriverAttributeInfo Info)> Variables { get; } = new();
public IAddressSpaceBuilder Folder(string browseName, string displayName)
{ Folders.Add((browseName, displayName)); return this; }
public IVariableHandle Variable(string browseName, string displayName, DriverAttributeInfo info)
{ Variables.Add((browseName, info)); return new Handle(info.FullName); }
public void AddProperty(string _, DriverDataType __, object? ___) { }
private sealed class Handle(string fullRef) : IVariableHandle
{
public string FullReference => fullRef;
public IAlarmConditionSink MarkAsAlarmCondition(AlarmConditionInfo info) => new NullSink();
}
private sealed class NullSink : IAlarmConditionSink { public void OnTransition(AlarmEventArgs args) { } }
}
}