Compare commits

...

6 Commits

Author SHA1 Message Date
Joseph Doherty
ac14ba9664 AB CIP PR 8 — IHostConnectivityProbe + IPerCallHostResolver. Per-device probe loop — when AbCipProbeOptions.Enabled + ProbeTagPath are configured, InitializeAsync kicks off one probe task per device that periodically reads the probe tag (lazy-init on first attempt, re-init on wire failure so destroyed native handles get recreated rather than silently staying broken), transitions Running on status==0 or Stopped on non-zero status / exception, raises OnHostStatusChanged with the device HostAddress as the host-name key. TransitionDeviceState guards against spurious same-state events under a per-device lock. ShutdownAsync cancels + disposes each probe's CTS + its captured runtime. DeviceState record gains ProbeLock / HostState / HostStateChangedUtc / ProbeCts / ProbeInitialized fields. IHostConnectivityProbe.GetHostStatuses returns one HostConnectivityStatus per device with the current state + last-change timestamp, surfaced to Admin /hosts per plan decision #144. IPerCallHostResolver.ResolveHost maps a tag full-reference to its DeviceHostAddress via the _tagsByName dict populated at Initialize time, which means UDT member full-references (Motor1.Speed synthesised by PR 6) resolve to the parent UDT's device without extra bookkeeping. Unknown references fall back to the first configured device's host address (invoker handles the actual mislookup at read time as BadNodeIdUnknown), and when no devices are configured resolver returns DriverInstanceId so the single-host fallback pipeline still works. Matches the plan decision #144 contract — Phase 6.1 resilience keys its bulkhead + breaker on (DriverInstanceId, ResolvedHostName) so a dead PLC trips only its own breaker, healthy siblings keep serving. 10 new unit tests in AbCipHostProbeTests covering GetHostStatuses returning one entry per device, probe success transitioning Unknown → Running, probe exception transitioning to Stopped, Enabled=false skipping the loop (no events + state stays Unknown), null ProbeTagPath skipping the loop, multi-device independent probe behavior (one Running + one Stopped simultaneously), ResolveHost for known tags returning the declared DeviceHostAddress, ResolveHost for unknown ref falling back to first device, ResolveHost falling back to DriverInstanceId when no devices, ResolveHost for UDT member walking to the synthesised member definition. Total AbCip unit tests now 147/147 passing (+10 from PR 7's 137). Full solution builds 0 errors; Modbus + other drivers untouched.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 17:15:10 -04:00
5978ea002d Merge pull request (#114) - AbCip ISubscribable 2026-04-19 17:13:41 -04:00
Joseph Doherty
33780eb64c AB CIP PR 7 — ISubscribable via shared PollGroupEngine. AbCipDriver now implements ISubscribable — Subscribe delegates into the PollGroupEngine extracted in PR 1, Unsubscribe releases the subscription, ShutdownAsync disposes the engine cancelling every active subscription. OnDataChange event wired through the engine's on-change callback so external subscribers see the driver as sender. The engine's reader delegate points at the driver's ReadAsync (already handles lazy runtime init + caching via EnsureTagRuntimeAsync) — each poll tick batch-reads every subscribed tag in one IReadable call. 100ms interval floor inherited from PollGroupEngine.DefaultMinInterval matches Modbus convention. Initial-data push on first poll preserved via forceRaise=true. Exception-tolerant loop preserved — individual read failures show up as DataValueSnapshot with non-Good StatusCode via the status-code mapping PR 3 established. 7 new unit tests in AbCipSubscriptionTests covering initial-poll raising per tag, unchanged value raising only once, value change between polls triggering a new event, Unsubscribe halting the loop, 100ms floor keeping a 5ms request from generating extra events against a stable value, ShutdownAsync cancelling active subscriptions, UDT member subscription routing through the synthesised Motor1.Speed full-reference (proving PR 6's fan-out composes correctly with PR 7's subscription path). Total AbCip unit tests now 137/137 passing (+7 from PR 6's 130). Validates that the shared PollGroupEngine from PR 1 works correctly for a second driver, closing the original motivation for the extraction. Full solution builds 0 errors; Modbus + other drivers untouched.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 17:11:51 -04:00
521bcb2f68 Merge pull request (#113) - AbCip UDT members 2026-04-19 17:11:08 -04:00
Joseph Doherty
b06a1ba607 AB CIP PR 6 — UDT member-declaration support. Declaration-driven UDT member fan-out — users declare a UDT-typed tag once with an explicit Members list and the driver (1) expands member-addressable tags synthetically at Initialize time so Read/Write/Subscribe hit individual native tags per member, (2) emits a folder + one Variable per member in DiscoverAsync instead of a single opaque Structure Variable. Matches the Logix 5000 addressing convention where members are reached via dotted syntax (Motor1.Speed, Motor1.Running) — AbCipTagPath already parsed this shape in PR 2, so PR 6 just had to wire config→TagPath composition. New AbCipStructureMember record — Name / DataType / Writable / WriteIdempotent — plus optional Members list on AbCipTagDefinition that's ignored for atomic types and optional for Structure types. When Structure has null or empty Members the driver falls back to emitting a single opaque Variable so downstream config can address members manually (the "black box" path documented in AbCipTagDefinition's docstring). AbCipDriver.InitializeAsync now iterates tags + for every Structure tag with non-empty Members synthesises a child AbCipTagDefinition per member (composed full-reference Parent.Member + composed TagPath parent.member passed through to libplctag as a normal symbolic read). Per-member Writable/WriteIdempotent metadata propagates so IWritable correctly rejects writes to members flagged non-writable even when the parent tag is writable — each member stands alone from the resilience + authz perspective. DiscoverAsync gains a matching branch — Structure with Members emits an intermediate folder named after the parent tag + one Variable per member under it (browse name = member.Name, FullName = Parent.Member). Members with Writable=false surface SecurityClassification.ViewOnly, WriteIdempotent flag passes through to the DriverAttributeInfo. Structure without Members falls through to the normal single-Variable path. Whole-UDT read optimization (one libplctag call returns the packed buffer + client-side member decode) is deferred — needs the CIP Template Object class 0x6C reader which is blocked on the same libplctag 1.5.2 TagInfoPlcMapper gap that deferred the real @tags walker in PR 5. AbCipTemplateCache shipped in PR 5 is the drop-in point when that reader lands. Per-member reads today are N native round-trips; whole-UDT optimisation is a perf win, not a correctness gap. 7 new unit tests in AbCipUdtMemberTests — UDT fan-out to Variable children under folder with correct SecurityClassification + WriteIdempotent propagation, member reads via synthesised full-reference with correct per-member values, member writes routing to correct TagPath, member Writable=false flag correctly blocking IWritable, Structure without Members falls back to single Variable, empty Members list treated identically to null, UDT tags coexist with flat tags in the discovery output. Total AbCip unit tests now 130/130 passing (+7 from PR 5's 123). Modbus + other drivers untouched; full solution builds 0 errors. Unblocks PR 7 (ISubscribable) — the poll engine already works with member-level full references.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 17:09:06 -04:00
dd1389a8e7 Merge pull request (#112) - AbCip ITagDiscovery 2026-04-19 17:07:03 -04:00
5 changed files with 819 additions and 4 deletions

View File

@@ -20,17 +20,22 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip;
/// from native-heap growth that the CLR allocator can't see; it tears down every
/// <see cref="PlcTagHandle"/> and reconnects each device.</para>
/// </remarks>
public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, IDisposable, IAsyncDisposable
public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, ISubscribable,
IHostConnectivityProbe, IPerCallHostResolver, IDisposable, IAsyncDisposable
{
private readonly AbCipDriverOptions _options;
private readonly string _driverInstanceId;
private readonly IAbCipTagFactory _tagFactory;
private readonly IAbCipTagEnumeratorFactory _enumeratorFactory;
private readonly AbCipTemplateCache _templateCache = new();
private readonly PollGroupEngine _poll;
private readonly Dictionary<string, DeviceState> _devices = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, AbCipTagDefinition> _tagsByName = new(StringComparer.OrdinalIgnoreCase);
private DriverHealth _health = new(DriverState.Unknown, null, null);
public event EventHandler<DataChangeEventArgs>? OnDataChange;
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
public AbCipDriver(AbCipDriverOptions options, string driverInstanceId,
IAbCipTagFactory? tagFactory = null,
IAbCipTagEnumeratorFactory? enumeratorFactory = null)
@@ -40,6 +45,10 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
_driverInstanceId = driverInstanceId;
_tagFactory = tagFactory ?? new LibplctagTagFactory();
_enumeratorFactory = enumeratorFactory ?? new EmptyAbCipTagEnumeratorFactory();
_poll = new PollGroupEngine(
reader: ReadAsync,
onChange: (handle, tagRef, snapshot) =>
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, snapshot)));
}
/// <summary>Shared UDT template cache. Exposed for PR 6 (UDT reader) + diagnostics.</summary>
@@ -61,7 +70,35 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
var profile = AbCipPlcFamilyProfile.ForFamily(device.PlcFamily);
_devices[device.HostAddress] = new DeviceState(addr, device, profile);
}
foreach (var tag in _options.Tags) _tagsByName[tag.Name] = tag;
foreach (var tag in _options.Tags)
{
_tagsByName[tag.Name] = tag;
if (tag.DataType == AbCipDataType.Structure && tag.Members is { Count: > 0 })
{
foreach (var member in tag.Members)
{
var memberTag = new AbCipTagDefinition(
Name: $"{tag.Name}.{member.Name}",
DeviceHostAddress: tag.DeviceHostAddress,
TagPath: $"{tag.TagPath}.{member.Name}",
DataType: member.DataType,
Writable: member.Writable,
WriteIdempotent: member.WriteIdempotent);
_tagsByName[memberTag.Name] = memberTag;
}
}
}
// Probe loops — one per device when enabled + a ProbeTagPath is configured.
if (_options.Probe.Enabled && !string.IsNullOrWhiteSpace(_options.Probe.ProbeTagPath))
{
foreach (var state in _devices.Values)
{
state.ProbeCts = new CancellationTokenSource();
var ct = state.ProbeCts.Token;
_ = Task.Run(() => ProbeLoopAsync(state, ct), ct);
}
}
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
}
catch (Exception ex)
@@ -78,16 +115,117 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
await InitializeAsync(driverConfigJson, cancellationToken).ConfigureAwait(false);
}
public Task ShutdownAsync(CancellationToken cancellationToken)
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
await _poll.DisposeAsync().ConfigureAwait(false);
foreach (var state in _devices.Values)
{
try { state.ProbeCts?.Cancel(); } catch { }
state.ProbeCts?.Dispose();
state.ProbeCts = null;
state.DisposeHandles();
}
_devices.Clear();
_tagsByName.Clear();
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
}
// ---- ISubscribable (polling overlay via shared engine) ----
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) =>
Task.FromResult(_poll.Subscribe(fullReferences, publishingInterval));
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
_poll.Unsubscribe(handle);
return Task.CompletedTask;
}
// ---- IHostConnectivityProbe ----
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses() =>
[.. _devices.Values.Select(s => new HostConnectivityStatus(s.Options.HostAddress, s.HostState, s.HostStateChangedUtc))];
private async Task ProbeLoopAsync(DeviceState state, CancellationToken ct)
{
var probeParams = new AbCipTagCreateParams(
Gateway: state.ParsedAddress.Gateway,
Port: state.ParsedAddress.Port,
CipPath: state.ParsedAddress.CipPath,
LibplctagPlcAttribute: state.Profile.LibplctagPlcAttribute,
TagName: _options.Probe.ProbeTagPath!,
Timeout: _options.Probe.Timeout);
IAbCipTagRuntime? probeRuntime = null;
while (!ct.IsCancellationRequested)
{
var success = false;
try
{
probeRuntime ??= _tagFactory.Create(probeParams);
// Lazy-init on first attempt; re-init after a transport failure has caused the
// native handle to be destroyed.
if (!state.ProbeInitialized)
{
await probeRuntime.InitializeAsync(ct).ConfigureAwait(false);
state.ProbeInitialized = true;
}
await probeRuntime.ReadAsync(ct).ConfigureAwait(false);
success = probeRuntime.GetStatus() == 0;
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
break;
}
catch
{
// Wire / init error — tear down the probe runtime so the next tick re-creates it.
try { probeRuntime?.Dispose(); } catch { }
probeRuntime = null;
state.ProbeInitialized = false;
}
TransitionDeviceState(state, success ? HostState.Running : HostState.Stopped);
try { await Task.Delay(_options.Probe.Interval, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { break; }
}
try { probeRuntime?.Dispose(); } catch { }
}
private void TransitionDeviceState(DeviceState state, HostState newState)
{
HostState old;
lock (state.ProbeLock)
{
old = state.HostState;
if (old == newState) return;
state.HostState = newState;
state.HostStateChangedUtc = DateTime.UtcNow;
}
OnHostStatusChanged?.Invoke(this,
new HostStatusChangedEventArgs(state.Options.HostAddress, old, newState));
}
// ---- IPerCallHostResolver ----
/// <summary>
/// Resolve the device host address for a given tag full-reference. Per plan decision #144
/// the Phase 6.1 resilience pipeline keys its bulkhead + breaker on
/// <c>(DriverInstanceId, hostName)</c> so multi-PLC drivers get per-device isolation —
/// one dead PLC trips only its own breaker. Unknown references fall back to the
/// first configured device's host address rather than throwing — the invoker handles the
/// mislookup at the capability level when the actual read returns BadNodeIdUnknown.
/// </summary>
public string ResolveHost(string fullReference)
{
if (_tagsByName.TryGetValue(fullReference, out var def))
return def.DeviceHostAddress;
return _options.Devices.FirstOrDefault()?.HostAddress ?? DriverInstanceId;
}
// ---- IReadable ----
/// <summary>
@@ -304,12 +442,37 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
var deviceLabel = device.DeviceName ?? device.HostAddress;
var deviceFolder = root.Folder(device.HostAddress, deviceLabel);
// Pre-declared tags — always emitted; the primary config path.
// Pre-declared tags — always emitted; the primary config path. UDT tags with declared
// Members fan out into a sub-folder + one Variable per member instead of a single
// Structure Variable (Structure has no useful scalar value + member-addressable paths
// are what downstream consumers actually want).
var preDeclared = _options.Tags.Where(t =>
string.Equals(t.DeviceHostAddress, device.HostAddress, StringComparison.OrdinalIgnoreCase));
foreach (var tag in preDeclared)
{
if (AbCipSystemTagFilter.IsSystemTag(tag.Name)) continue;
if (tag.DataType == AbCipDataType.Structure && tag.Members is { Count: > 0 })
{
var udtFolder = deviceFolder.Folder(tag.Name, tag.Name);
foreach (var member in tag.Members)
{
var memberFullName = $"{tag.Name}.{member.Name}";
udtFolder.Variable(member.Name, member.Name, new DriverAttributeInfo(
FullName: memberFullName,
DriverDataType: member.DataType.ToDriverDataType(),
IsArray: false,
ArrayDim: null,
SecurityClass: member.Writable
? SecurityClassification.Operate
: SecurityClassification.ViewOnly,
IsHistorized: false,
IsAlarm: false,
WriteIdempotent: member.WriteIdempotent));
}
continue;
}
deviceFolder.Variable(tag.Name, tag.Name, ToAttributeInfo(tag));
}
@@ -393,6 +556,12 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
public AbCipDeviceOptions Options { get; } = options;
public AbCipPlcFamilyProfile Profile { get; } = profile;
public object ProbeLock { get; } = new();
public HostState HostState { get; set; } = HostState.Unknown;
public DateTime HostStateChangedUtc { get; set; } = DateTime.UtcNow;
public CancellationTokenSource? ProbeCts { get; set; }
public bool ProbeInitialized { get; set; }
public Dictionary<string, PlcTagHandle> TagHandles { get; } =
new(StringComparer.OrdinalIgnoreCase);

View File

@@ -54,12 +54,30 @@ public sealed record AbCipDeviceOptions(
/// <param name="DataType">Logix atomic type, or <see cref="AbCipDataType.Structure"/> for UDT-typed tags.</param>
/// <param name="Writable">When <c>true</c> and the tag's ExternalAccess permits writes, IWritable routes writes here.</param>
/// <param name="WriteIdempotent">Per plan decisions #44#45, #143 — safe to replay on write timeout. Default <c>false</c>.</param>
/// <param name="Members">For <see cref="AbCipDataType.Structure"/>-typed tags, the declared UDT
/// member layout. When supplied, discovery fans out the UDT into a folder + one Variable per
/// member (member TagPath = <c>{tag.TagPath}.{member.Name}</c>). When <c>null</c> on a Structure
/// tag, the driver treats it as a black-box and relies on downstream configuration to address
/// members individually via dotted <see cref="AbCipTagPath"/> syntax. Ignored for atomic types.</param>
public sealed record AbCipTagDefinition(
string Name,
string DeviceHostAddress,
string TagPath,
AbCipDataType DataType,
bool Writable = true,
bool WriteIdempotent = false,
IReadOnlyList<AbCipStructureMember>? Members = null);
/// <summary>
/// One declared member of a UDT tag. Name is the member identifier on the PLC (e.g. <c>Speed</c>,
/// <c>Status</c>), DataType is the atomic Logix type, Writable/WriteIdempotent mirror
/// <see cref="AbCipTagDefinition"/>. Declaration-driven — the real CIP Template Object reader
/// (class 0x6C) that would auto-discover member layouts lands as a follow-up PR.
/// </summary>
public sealed record AbCipStructureMember(
string Name,
AbCipDataType DataType,
bool Writable = true,
bool WriteIdempotent = false);
/// <summary>Which AB PLC family the device is — selects the profile applied to connection params.</summary>

View File

@@ -0,0 +1,227 @@
using System.Collections.Concurrent;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.AbCip;
namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests;
[Trait("Category", "Unit")]
public sealed class AbCipHostProbeTests
{
[Fact]
public async Task GetHostStatuses_returns_one_entry_per_device()
{
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices =
[
new AbCipDeviceOptions("ab://10.0.0.5/1,0"),
new AbCipDeviceOptions("ab://10.0.0.6/1,0"),
],
Probe = new AbCipProbeOptions { Enabled = false },
}, "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
var statuses = drv.GetHostStatuses();
statuses.Count.ShouldBe(2);
statuses.Select(s => s.HostName).ShouldBe(["ab://10.0.0.5/1,0", "ab://10.0.0.6/1,0"], ignoreOrder: true);
statuses.ShouldAllBe(s => s.State == HostState.Unknown);
}
[Fact]
public async Task Probe_with_successful_read_transitions_to_Running()
{
var factory = new FakeAbCipTagFactory { Customise = p => new FakeAbCipTag(p) { Status = 0 } };
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
Probe = new AbCipProbeOptions
{
Enabled = true,
Interval = TimeSpan.FromMilliseconds(100),
Timeout = TimeSpan.FromMilliseconds(50),
ProbeTagPath = "@raw_cpu_type",
},
}, "drv-1", factory);
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
await drv.InitializeAsync("{}", CancellationToken.None);
await WaitForAsync(() => transitions.Any(t => t.NewState == HostState.Running), TimeSpan.FromSeconds(2));
transitions.Select(t => t.NewState).ShouldContain(HostState.Running);
drv.GetHostStatuses().Single().State.ShouldBe(HostState.Running);
await drv.ShutdownAsync(CancellationToken.None);
}
[Fact]
public async Task Probe_with_read_failure_transitions_to_Stopped()
{
var factory = new FakeAbCipTagFactory
{
Customise = p => new FakeAbCipTag(p) { ThrowOnRead = true },
};
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
Probe = new AbCipProbeOptions
{
Enabled = true,
Interval = TimeSpan.FromMilliseconds(100),
Timeout = TimeSpan.FromMilliseconds(50),
ProbeTagPath = "@raw_cpu_type",
},
}, "drv-1", factory);
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
await drv.InitializeAsync("{}", CancellationToken.None);
await WaitForAsync(() => transitions.Any(t => t.NewState == HostState.Stopped), TimeSpan.FromSeconds(2));
drv.GetHostStatuses().Single().State.ShouldBe(HostState.Stopped);
await drv.ShutdownAsync(CancellationToken.None);
}
[Fact]
public async Task Probe_disabled_when_Enabled_is_false()
{
var factory = new FakeAbCipTagFactory();
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
Probe = new AbCipProbeOptions { Enabled = false, ProbeTagPath = "@raw_cpu_type" },
}, "drv-1", factory);
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
await drv.InitializeAsync("{}", CancellationToken.None);
await Task.Delay(300);
transitions.ShouldBeEmpty();
drv.GetHostStatuses().Single().State.ShouldBe(HostState.Unknown);
await drv.ShutdownAsync(CancellationToken.None);
}
[Fact]
public async Task Probe_skipped_when_ProbeTagPath_is_null()
{
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
Probe = new AbCipProbeOptions { Enabled = true, ProbeTagPath = null },
}, "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
await Task.Delay(200);
drv.GetHostStatuses().Single().State.ShouldBe(HostState.Unknown);
await drv.ShutdownAsync(CancellationToken.None);
}
[Fact]
public async Task Probe_loops_across_multiple_devices_independently()
{
var factory = new FakeAbCipTagFactory
{
// Device A returns ok, Device B throws on read.
Customise = p => p.Gateway == "10.0.0.5"
? new FakeAbCipTag(p)
: new FakeAbCipTag(p) { ThrowOnRead = true },
};
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices =
[
new AbCipDeviceOptions("ab://10.0.0.5/1,0"),
new AbCipDeviceOptions("ab://10.0.0.6/1,0"),
],
Probe = new AbCipProbeOptions
{
Enabled = true, Interval = TimeSpan.FromMilliseconds(100),
Timeout = TimeSpan.FromMilliseconds(50), ProbeTagPath = "@raw_cpu_type",
},
}, "drv-1", factory);
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
await drv.InitializeAsync("{}", CancellationToken.None);
await WaitForAsync(() => transitions.Count >= 2, TimeSpan.FromSeconds(3));
transitions.ShouldContain(t => t.HostName == "ab://10.0.0.5/1,0" && t.NewState == HostState.Running);
transitions.ShouldContain(t => t.HostName == "ab://10.0.0.6/1,0" && t.NewState == HostState.Stopped);
await drv.ShutdownAsync(CancellationToken.None);
}
// ---- IPerCallHostResolver ----
[Fact]
public async Task ResolveHost_returns_declared_device_for_known_tag()
{
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices =
[
new AbCipDeviceOptions("ab://10.0.0.5/1,0"),
new AbCipDeviceOptions("ab://10.0.0.6/1,0"),
],
Tags =
[
new AbCipTagDefinition("A", "ab://10.0.0.5/1,0", "A", AbCipDataType.DInt),
new AbCipTagDefinition("B", "ab://10.0.0.6/1,0", "B", AbCipDataType.DInt),
],
Probe = new AbCipProbeOptions { Enabled = false },
}, "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
drv.ResolveHost("A").ShouldBe("ab://10.0.0.5/1,0");
drv.ResolveHost("B").ShouldBe("ab://10.0.0.6/1,0");
}
[Fact]
public async Task ResolveHost_falls_back_to_first_device_for_unknown_reference()
{
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
Probe = new AbCipProbeOptions { Enabled = false },
}, "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
drv.ResolveHost("does-not-exist").ShouldBe("ab://10.0.0.5/1,0");
}
[Fact]
public async Task ResolveHost_falls_back_to_DriverInstanceId_when_no_devices()
{
var drv = new AbCipDriver(new AbCipDriverOptions(), "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
drv.ResolveHost("anything").ShouldBe("drv-1");
}
[Fact]
public async Task ResolveHost_for_UDT_member_walks_to_synthesised_definition()
{
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.7/1,0")],
Tags =
[
new AbCipTagDefinition("Motor1", "ab://10.0.0.7/1,0", "Motor1", AbCipDataType.Structure,
Members: [new AbCipStructureMember("Speed", AbCipDataType.DInt)]),
],
Probe = new AbCipProbeOptions { Enabled = false },
}, "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
drv.ResolveHost("Motor1.Speed").ShouldBe("ab://10.0.0.7/1,0");
}
private static async Task WaitForAsync(Func<bool> condition, TimeSpan timeout)
{
var deadline = DateTime.UtcNow + timeout;
while (!condition() && DateTime.UtcNow < deadline)
await Task.Delay(20);
}
}

View File

@@ -0,0 +1,184 @@
using System.Collections.Concurrent;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.AbCip;
namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests;
[Trait("Category", "Unit")]
public sealed class AbCipSubscriptionTests
{
private static (AbCipDriver drv, FakeAbCipTagFactory factory) NewDriver(params AbCipTagDefinition[] tags)
{
var factory = new FakeAbCipTagFactory();
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
Tags = tags,
}, "drv-1", factory);
return (drv, factory);
}
[Fact]
public async Task Initial_poll_raises_OnDataChange_for_every_tag()
{
var (drv, factory) = NewDriver(
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt),
new AbCipTagDefinition("Temp", "ab://10.0.0.5/1,0", "Temp", AbCipDataType.Real));
await drv.InitializeAsync("{}", CancellationToken.None);
factory.Customise = p => p.TagName switch
{
"Speed" => new FakeAbCipTag(p) { Value = 1800 },
"Temp" => new FakeAbCipTag(p) { Value = 72.5f },
_ => new FakeAbCipTag(p),
};
var events = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) => events.Enqueue(e);
var handle = await drv.SubscribeAsync(["Speed", "Temp"], TimeSpan.FromMilliseconds(200), CancellationToken.None);
await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
events.Select(e => e.FullReference).ShouldContain("Speed");
events.Select(e => e.FullReference).ShouldContain("Temp");
await drv.UnsubscribeAsync(handle, CancellationToken.None);
}
[Fact]
public async Task Unchanged_value_raises_only_once()
{
var (drv, factory) = NewDriver(
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
await drv.InitializeAsync("{}", CancellationToken.None);
factory.Customise = p => new FakeAbCipTag(p) { Value = 1800 };
var events = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) => events.Enqueue(e);
var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
await Task.Delay(500);
await drv.UnsubscribeAsync(handle, CancellationToken.None);
events.Count.ShouldBe(1);
}
[Fact]
public async Task Value_change_between_polls_raises_OnDataChange()
{
var (drv, factory) = NewDriver(
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
await drv.InitializeAsync("{}", CancellationToken.None);
var tagRef = new FakeAbCipTag(new AbCipTagCreateParams("10.0.0.5", 44818, "1,0", "controllogix", "Speed", TimeSpan.FromSeconds(2))) { Value = 100 };
factory.Customise = _ => tagRef;
var events = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) => events.Enqueue(e);
var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
tagRef.Value = 200; // simulate PLC change
await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
await drv.UnsubscribeAsync(handle, CancellationToken.None);
events.Count.ShouldBeGreaterThanOrEqualTo(2);
events.Last().Snapshot.Value.ShouldBe(200);
}
[Fact]
public async Task Unsubscribe_halts_polling()
{
var (drv, factory) = NewDriver(
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
await drv.InitializeAsync("{}", CancellationToken.None);
var tagRef = new FakeAbCipTag(new AbCipTagCreateParams("10.0.0.5", 44818, "1,0", "controllogix", "Speed", TimeSpan.FromSeconds(2))) { Value = 1 };
factory.Customise = _ => tagRef;
var events = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) => events.Enqueue(e);
var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
await drv.UnsubscribeAsync(handle, CancellationToken.None);
var afterUnsub = events.Count;
tagRef.Value = 999;
await Task.Delay(400);
events.Count.ShouldBe(afterUnsub);
}
[Fact]
public async Task Interval_below_100ms_is_floored()
{
var (drv, factory) = NewDriver(
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
await drv.InitializeAsync("{}", CancellationToken.None);
factory.Customise = p => new FakeAbCipTag(p) { Value = 1 };
var events = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) => events.Enqueue(e);
var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(5), CancellationToken.None);
await Task.Delay(300);
await drv.UnsubscribeAsync(handle, CancellationToken.None);
// Value is stable → only the initial-data push fires; the 100 ms floor keeps polls sparse enough
// that no extra event is produced against a stable value.
events.Count.ShouldBe(1);
}
[Fact]
public async Task ShutdownAsync_cancels_active_subscriptions()
{
var (drv, factory) = NewDriver(
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
await drv.InitializeAsync("{}", CancellationToken.None);
factory.Customise = p => new FakeAbCipTag(p) { Value = 1 };
var events = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) => events.Enqueue(e);
_ = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
await drv.ShutdownAsync(CancellationToken.None);
var afterShutdown = events.Count;
await Task.Delay(300);
events.Count.ShouldBe(afterShutdown);
}
[Fact]
public async Task Subscription_on_UDT_member_uses_synthesised_full_reference()
{
var factory = new FakeAbCipTagFactory();
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
Tags =
[
new AbCipTagDefinition("Motor1", "ab://10.0.0.5/1,0", "Motor1", AbCipDataType.Structure,
Members: [new AbCipStructureMember("Speed", AbCipDataType.DInt)]),
],
}, "drv-1", factory);
await drv.InitializeAsync("{}", CancellationToken.None);
factory.Customise = p => p.TagName == "Motor1.Speed"
? new FakeAbCipTag(p) { Value = 77 }
: new FakeAbCipTag(p);
var events = new ConcurrentQueue<DataChangeEventArgs>();
drv.OnDataChange += (_, e) => events.Enqueue(e);
var handle = await drv.SubscribeAsync(["Motor1.Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2));
events.First().Snapshot.Value.ShouldBe(77);
await drv.UnsubscribeAsync(handle, CancellationToken.None);
}
private static async Task WaitForAsync(Func<bool> condition, TimeSpan timeout)
{
var deadline = DateTime.UtcNow + timeout;
while (!condition() && DateTime.UtcNow < deadline)
await Task.Delay(20);
}
}

View File

@@ -0,0 +1,217 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.AbCip;
namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests;
[Trait("Category", "Unit")]
public sealed class AbCipUdtMemberTests
{
[Fact]
public async Task UDT_with_declared_members_fans_out_to_member_variables()
{
var builder = new RecordingBuilder();
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
Tags =
[
new AbCipTagDefinition(
Name: "Motor1",
DeviceHostAddress: "ab://10.0.0.5/1,0",
TagPath: "Motor1",
DataType: AbCipDataType.Structure,
Members:
[
new AbCipStructureMember("Speed", AbCipDataType.DInt),
new AbCipStructureMember("Running", AbCipDataType.Bool, Writable: false),
new AbCipStructureMember("SetPoint", AbCipDataType.Real, WriteIdempotent: true),
]),
],
}, "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
await drv.DiscoverAsync(builder, CancellationToken.None);
builder.Folders.ShouldContain(f => f.BrowseName == "Motor1");
var variables = builder.Variables.Select(v => (v.BrowseName, v.Info.FullName)).ToList();
variables.ShouldContain(("Speed", "Motor1.Speed"));
variables.ShouldContain(("Running", "Motor1.Running"));
variables.ShouldContain(("SetPoint", "Motor1.SetPoint"));
builder.Variables.Single(v => v.BrowseName == "Running").Info.SecurityClass
.ShouldBe(SecurityClassification.ViewOnly);
builder.Variables.Single(v => v.BrowseName == "SetPoint").Info.WriteIdempotent
.ShouldBeTrue();
}
[Fact]
public async Task UDT_members_resolvable_for_read_via_synthesised_full_reference()
{
var factory = new FakeAbCipTagFactory
{
Customise = p => p.TagName switch
{
"Motor1.Speed" => new FakeAbCipTag(p) { Value = 1800 },
"Motor1.Running" => new FakeAbCipTag(p) { Value = true },
_ => new FakeAbCipTag(p),
},
};
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
Tags =
[
new AbCipTagDefinition("Motor1", "ab://10.0.0.5/1,0", "Motor1", AbCipDataType.Structure,
Members:
[
new AbCipStructureMember("Speed", AbCipDataType.DInt),
new AbCipStructureMember("Running", AbCipDataType.Bool),
]),
],
}, "drv-1", factory);
await drv.InitializeAsync("{}", CancellationToken.None);
var snapshots = await drv.ReadAsync(["Motor1.Speed", "Motor1.Running"], CancellationToken.None);
snapshots[0].Value.ShouldBe(1800);
snapshots[0].StatusCode.ShouldBe(AbCipStatusMapper.Good);
snapshots[1].Value.ShouldBe(true);
snapshots[1].StatusCode.ShouldBe(AbCipStatusMapper.Good);
}
[Fact]
public async Task UDT_member_write_routes_through_synthesised_tagpath()
{
var factory = new FakeAbCipTagFactory();
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
Tags =
[
new AbCipTagDefinition("Motor1", "ab://10.0.0.5/1,0", "Motor1", AbCipDataType.Structure,
Members:
[
new AbCipStructureMember("SetPoint", AbCipDataType.Real),
]),
],
}, "drv-1", factory);
await drv.InitializeAsync("{}", CancellationToken.None);
var results = await drv.WriteAsync(
[new WriteRequest("Motor1.SetPoint", 42.5f)], CancellationToken.None);
results.Single().StatusCode.ShouldBe(AbCipStatusMapper.Good);
factory.Tags["Motor1.SetPoint"].Value.ShouldBe(42.5f);
}
[Fact]
public async Task UDT_member_read_write_honours_member_Writable_flag()
{
var factory = new FakeAbCipTagFactory();
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
Tags =
[
new AbCipTagDefinition("Motor1", "ab://10.0.0.5/1,0", "Motor1", AbCipDataType.Structure,
Members:
[
new AbCipStructureMember("Status", AbCipDataType.DInt, Writable: false),
]),
],
}, "drv-1", factory);
await drv.InitializeAsync("{}", CancellationToken.None);
var results = await drv.WriteAsync(
[new WriteRequest("Motor1.Status", 1)], CancellationToken.None);
results.Single().StatusCode.ShouldBe(AbCipStatusMapper.BadNotWritable);
}
[Fact]
public async Task Structure_tag_without_members_is_emitted_as_single_variable()
{
// Fallback path: a Structure tag with no declared Members still appears as a Variable so
// downstream configuration can address it manually. This matches the "black box" note in
// AbCipTagDefinition's docstring.
var builder = new RecordingBuilder();
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
Tags = [new AbCipTagDefinition("OpaqueUdt", "ab://10.0.0.5/1,0", "OpaqueUdt", AbCipDataType.Structure)],
}, "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
await drv.DiscoverAsync(builder, CancellationToken.None);
builder.Variables.ShouldContain(v => v.BrowseName == "OpaqueUdt");
builder.Folders.ShouldNotContain(f => f.BrowseName == "OpaqueUdt");
}
[Fact]
public async Task Empty_Members_list_is_treated_like_null()
{
var builder = new RecordingBuilder();
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
Tags = [new AbCipTagDefinition("EmptyUdt", "ab://10.0.0.5/1,0", "E", AbCipDataType.Structure, Members: [])],
}, "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
await drv.DiscoverAsync(builder, CancellationToken.None);
builder.Folders.ShouldNotContain(f => f.BrowseName == "EmptyUdt");
builder.Variables.ShouldContain(v => v.BrowseName == "EmptyUdt");
}
[Fact]
public async Task UDT_members_mixed_with_flat_tags_coexist()
{
var builder = new RecordingBuilder();
var drv = new AbCipDriver(new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
Tags =
[
new AbCipTagDefinition("FlatA", "ab://10.0.0.5/1,0", "A", AbCipDataType.DInt),
new AbCipTagDefinition("Motor1", "ab://10.0.0.5/1,0", "Motor1", AbCipDataType.Structure,
Members:
[
new AbCipStructureMember("Speed", AbCipDataType.DInt),
]),
new AbCipTagDefinition("FlatB", "ab://10.0.0.5/1,0", "B", AbCipDataType.Real),
],
}, "drv-1");
await drv.InitializeAsync("{}", CancellationToken.None);
await drv.DiscoverAsync(builder, CancellationToken.None);
builder.Variables.Select(v => v.BrowseName).ShouldBe(["FlatA", "Speed", "FlatB"], ignoreOrder: true);
}
// ---- helpers ----
private sealed class RecordingBuilder : IAddressSpaceBuilder
{
public List<(string BrowseName, string DisplayName)> Folders { get; } = new();
public List<(string BrowseName, DriverAttributeInfo Info)> Variables { get; } = new();
public IAddressSpaceBuilder Folder(string browseName, string displayName)
{ Folders.Add((browseName, displayName)); return this; }
public IVariableHandle Variable(string browseName, string displayName, DriverAttributeInfo info)
{ Variables.Add((browseName, info)); return new Handle(info.FullName); }
public void AddProperty(string _, DriverDataType __, object? ___) { }
private sealed class Handle(string fullRef) : IVariableHandle
{
public string FullReference => fullRef;
public IAlarmConditionSink MarkAsAlarmCondition(AlarmConditionInfo info) => new NullSink();
}
private sealed class NullSink : IAlarmConditionSink { public void OnTransition(AlarmEventArgs args) { } }
}
}