Auto: opcuaclient-2 — per-tag advanced subscription tuning

Closes #274
This commit is contained in:
Joseph Doherty
2026-04-25 15:25:20 -04:00
parent bf200e813e
commit fae00749ca
4 changed files with 400 additions and 25 deletions

View File

@@ -25,7 +25,7 @@ public enum DriverCapability
/// <summary><see cref="ITagDiscovery.DiscoverAsync"/>. Retries by default.</summary>
Discover,
/// <summary><see cref="ISubscribable.SubscribeAsync"/> and unsubscribe. Retries by default.</summary>
/// <summary><see cref="ISubscribable.SubscribeAsync(IReadOnlyList{string}, TimeSpan, CancellationToken)"/> and unsubscribe. Retries by default.</summary>
Subscribe,
/// <summary><see cref="IHostConnectivityProbe"/> probe loop. Retries by default.</summary>

View File

@@ -20,7 +20,29 @@ public interface ISubscribable
TimeSpan publishingInterval,
CancellationToken cancellationToken);
/// <summary>Cancel a subscription returned by <see cref="SubscribeAsync"/>.</summary>
/// <summary>
/// Subscribe to data changes with per-tag advanced tuning (sampling interval, queue
/// size, monitoring mode, deadband filter). Drivers that don't have a native concept
/// of these knobs (e.g. polled drivers like Modbus) MAY ignore the per-tag knobs and
/// delegate to the simple
/// <see cref="SubscribeAsync(IReadOnlyList{string}, TimeSpan, CancellationToken)"/>
/// overload — the default implementation does exactly that, so existing implementers
/// compile unchanged.
/// </summary>
/// <param name="tags">Per-tag subscription specs. <see cref="MonitoredTagSpec.TagName"/> is the driver-side full reference.</param>
/// <param name="publishingInterval">Subscription publishing interval, applied to the whole batch.</param>
/// <param name="cancellationToken">Cancellation.</param>
/// <returns>Opaque subscription handle for <see cref="UnsubscribeAsync"/>.</returns>
Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<MonitoredTagSpec> tags,
TimeSpan publishingInterval,
CancellationToken cancellationToken)
=> SubscribeAsync(
tags.Select(t => t.TagName).ToList(),
publishingInterval,
cancellationToken);
/// <summary>Cancel a subscription returned by either <c>SubscribeAsync</c> overload.</summary>
Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken);
/// <summary>
@@ -30,7 +52,7 @@ public interface ISubscribable
event EventHandler<DataChangeEventArgs>? OnDataChange;
}
/// <summary>Opaque subscription identity returned by <see cref="ISubscribable.SubscribeAsync"/>.</summary>
/// <summary>Opaque subscription identity returned by <see cref="ISubscribable.SubscribeAsync(IReadOnlyList{string}, TimeSpan, CancellationToken)"/>.</summary>
public interface ISubscriptionHandle
{
/// <summary>Driver-internal subscription identifier (for diagnostics + post-mortem).</summary>
@@ -38,10 +60,99 @@ public interface ISubscriptionHandle
}
/// <summary>Event payload for <see cref="ISubscribable.OnDataChange"/>.</summary>
/// <param name="SubscriptionHandle">The handle returned by the original <see cref="ISubscribable.SubscribeAsync"/> call.</param>
/// <param name="SubscriptionHandle">The handle returned by the original <see cref="ISubscribable.SubscribeAsync(IReadOnlyList{string}, TimeSpan, CancellationToken)"/> call.</param>
/// <param name="FullReference">Driver-side full reference of the changed attribute.</param>
/// <param name="Snapshot">New value + quality + timestamps.</param>
public sealed record DataChangeEventArgs(
ISubscriptionHandle SubscriptionHandle,
string FullReference,
DataValueSnapshot Snapshot);
/// <summary>
/// Per-tag subscription tuning. Maps onto OPC UA <c>MonitoredItem</c> properties for the
/// OpcUaClient driver; non-OPC-UA drivers either map a subset (e.g. ADS picks up
/// <see cref="SamplingIntervalMs"/>) or ignore the knobs entirely and fall back to the
/// simple <see cref="ISubscribable.SubscribeAsync(IReadOnlyList{string}, TimeSpan, CancellationToken)"/>.
/// </summary>
/// <param name="TagName">Driver-side full reference (e.g. <c>ns=2;s=Foo</c> for OPC UA).</param>
/// <param name="SamplingIntervalMs">
/// Server-side sampling rate in milliseconds. <c>null</c> = use the publishing interval.
/// Sub-publish-interval values let a server sample faster than it publishes (queue +
/// coalesce), useful for events that change between publish ticks.
/// </param>
/// <param name="QueueSize">Server-side notification queue depth. <c>null</c> = driver default (1).</param>
/// <param name="DiscardOldest">
/// When the server-side queue overflows: <c>true</c> drops oldest, <c>false</c> drops newest.
/// <c>null</c> = driver default (true — preserve recency).
/// </param>
/// <param name="MonitoringMode">
/// Per-item monitoring mode. <c>Reporting</c> = sample + publish, <c>Sampling</c> = sample
/// but suppress publishing (useful with triggering), <c>Disabled</c> = neither.
/// </param>
/// <param name="DataChangeFilter">
/// Optional data-change filter (deadband + trigger semantics). <c>null</c> = no filter
/// (every change publishes regardless of magnitude).
/// </param>
public sealed record MonitoredTagSpec(
string TagName,
double? SamplingIntervalMs = null,
uint? QueueSize = null,
bool? DiscardOldest = null,
SubscriptionMonitoringMode? MonitoringMode = null,
DataChangeFilterSpec? DataChangeFilter = null);
/// <summary>
/// OPC UA <c>DataChangeFilter</c> spec. Mirrors the OPC UA Part 4 §7.17.2 structure but
/// lives in Core.Abstractions so non-OpcUaClient drivers (e.g. Modbus, S7) can accept it
/// as metadata even if they ignore the deadband mechanics.
/// </summary>
/// <param name="Trigger">When to fire: status only / status+value / status+value+timestamp.</param>
/// <param name="DeadbandType">Deadband mode: none / absolute (engineering units) / percent of EURange.</param>
/// <param name="DeadbandValue">
/// Magnitude of the deadband. For <see cref="OtOpcUa.Core.Abstractions.DeadbandType.Absolute"/>
/// this is in the variable's engineering units; for <see cref="OtOpcUa.Core.Abstractions.DeadbandType.Percent"/>
/// it's a 0..100 percentage of EURange (server returns BadFilterNotAllowed if EURange isn't set).
/// </param>
public sealed record DataChangeFilterSpec(
DataChangeTrigger Trigger,
DeadbandType DeadbandType,
double DeadbandValue);
/// <summary>
/// OPC UA <c>DataChangeTrigger</c> values. Wraps the SDK enum so Core.Abstractions doesn't
/// leak an OPC-UA-stack reference into every driver project.
/// </summary>
public enum DataChangeTrigger
{
/// <summary>Fire only when StatusCode changes.</summary>
Status = 0,
/// <summary>Fire when StatusCode or Value changes (the OPC UA default).</summary>
StatusValue = 1,
/// <summary>Fire when StatusCode, Value, or SourceTimestamp changes.</summary>
StatusValueTimestamp = 2,
}
/// <summary>OPC UA deadband-filter modes.</summary>
public enum DeadbandType
{
/// <summary>No deadband — every value change publishes.</summary>
None = 0,
/// <summary>Deadband expressed in the variable's engineering units.</summary>
Absolute = 1,
/// <summary>Deadband expressed as 0..100 percent of the variable's EURange.</summary>
Percent = 2,
}
/// <summary>
/// Per-item subscription monitoring mode. Wraps the OPC UA SDK's <c>MonitoringMode</c>
/// so Core.Abstractions stays SDK-free.
/// </summary>
public enum SubscriptionMonitoringMode
{
/// <summary>Item is created but neither sampling nor publishing.</summary>
Disabled = 0,
/// <summary>Item samples and queues but does not publish (useful with triggering).</summary>
Sampling = 1,
/// <summary>Item samples and publishes — the OPC UA default.</summary>
Reporting = 2,
}

View File

@@ -851,8 +851,20 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
// ---- ISubscribable ----
public async Task<ISubscriptionHandle> SubscribeAsync(
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
// Route the simple-string overload through the per-tag overload with all knobs at
// their defaults. Single code path for subscription create — keeps the wire-side
// identical for callers that don't need per-tag tuning.
var specs = new MonitoredTagSpec[fullReferences.Count];
for (var i = 0; i < fullReferences.Count; i++)
specs[i] = new MonitoredTagSpec(fullReferences[i]);
return SubscribeAsync(specs, publishingInterval, cancellationToken);
}
public async Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<MonitoredTagSpec> tags, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
var session = RequireSession();
var id = Interlocked.Increment(ref _nextSubscriptionId);
@@ -885,29 +897,28 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
session.AddSubscription(subscription);
await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
foreach (var fullRef in fullReferences)
foreach (var spec in tags)
{
if (!TryParseNodeId(session, fullRef, out var nodeId)) continue;
// The tag string is routed through MonitoredItem.Handle so the Notification
// handler can identify which tag changed without an extra lookup.
var item = new MonitoredItem(telemetry: null!, new MonitoredItemOptions
{
DisplayName = fullRef,
StartNodeId = nodeId,
AttributeId = Attributes.Value,
MonitoringMode = MonitoringMode.Reporting,
SamplingInterval = intervalMs,
QueueSize = 1,
DiscardOldest = true,
})
{
Handle = fullRef,
};
item.Notification += (mi, args) => OnMonitoredItemNotification(handle, mi, args);
subscription.AddItem(item);
if (!TryParseNodeId(session, spec.TagName, out var nodeId)) continue;
var monItem = BuildMonitoredItem(spec, nodeId, intervalMs);
monItem.Notification += (mi, args) => OnMonitoredItemNotification(handle, mi, args);
subscription.AddItem(monItem);
}
await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false);
try
{
await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false);
}
catch (Opc.Ua.ServiceResultException sre)
{
// PercentDeadband requires the server to expose EURange on the variable; if
// it isn't set the server returns BadFilterNotAllowed during item creation.
// We swallow the exception here so other items in the batch still get created
// — per-item failure surfaces through MonitoredItem.Status.Error rather than
// tearing down the whole subscription.
if (sre.StatusCode != StatusCodes.BadFilterNotAllowed) throw;
}
_subscriptions[id] = new RemoteSubscription(subscription, handle);
}
finally { _gate.Release(); }
@@ -915,6 +926,84 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
return handle;
}
/// <summary>
/// Map a <see cref="MonitoredTagSpec"/> to a SDK <see cref="MonitoredItem"/> with the
/// per-tag knobs applied. Defaults match the original hard-coded values
/// (Reporting / SamplingInterval=publishInterval / QueueSize=1 / DiscardOldest=true)
/// so a spec with all knobs <c>null</c> behaves identically to the legacy path.
/// </summary>
internal static MonitoredItem BuildMonitoredItem(MonitoredTagSpec spec, NodeId nodeId, int defaultIntervalMs)
{
var sampling = spec.SamplingIntervalMs.HasValue ? (int)spec.SamplingIntervalMs.Value : defaultIntervalMs;
var queueSize = spec.QueueSize ?? 1u;
var discardOldest = spec.DiscardOldest ?? true;
var monitoringMode = spec.MonitoringMode is { } mm ? MapMonitoringMode(mm) : MonitoringMode.Reporting;
var filter = BuildDataChangeFilter(spec.DataChangeFilter);
var options = new MonitoredItemOptions
{
DisplayName = spec.TagName,
StartNodeId = nodeId,
AttributeId = Attributes.Value,
MonitoringMode = monitoringMode,
SamplingInterval = sampling,
QueueSize = queueSize,
DiscardOldest = discardOldest,
Filter = filter,
};
return new MonitoredItem(telemetry: null!, options)
{
// The tag string is routed through MonitoredItem.Handle so the Notification
// handler can identify which tag changed without an extra lookup.
Handle = spec.TagName,
};
}
/// <summary>
/// Build the OPC UA <see cref="DataChangeFilter"/> from a <see cref="DataChangeFilterSpec"/>,
/// or return <c>null</c> if the caller didn't supply a filter. PercentDeadband requires
/// server-side EURange — if the server rejects with BadFilterNotAllowed, the caller's
/// <c>SubscribeAsync</c> swallows it so other items in the batch still get created.
/// </summary>
internal static DataChangeFilter? BuildDataChangeFilter(DataChangeFilterSpec? spec)
{
if (spec is null) return null;
return new DataChangeFilter
{
Trigger = MapTrigger(spec.Trigger),
DeadbandType = (uint)MapDeadbandType(spec.DeadbandType),
DeadbandValue = spec.DeadbandValue,
};
}
/// <summary>Map our SDK-free <see cref="SubscriptionMonitoringMode"/> to the OPC UA SDK's enum.</summary>
internal static MonitoringMode MapMonitoringMode(SubscriptionMonitoringMode mode) => mode switch
{
SubscriptionMonitoringMode.Disabled => MonitoringMode.Disabled,
SubscriptionMonitoringMode.Sampling => MonitoringMode.Sampling,
SubscriptionMonitoringMode.Reporting => MonitoringMode.Reporting,
_ => MonitoringMode.Reporting,
};
/// <summary>Map our <see cref="Core.Abstractions.DataChangeTrigger"/> to the SDK enum.</summary>
internal static Opc.Ua.DataChangeTrigger MapTrigger(Core.Abstractions.DataChangeTrigger trigger) => trigger switch
{
Core.Abstractions.DataChangeTrigger.Status => Opc.Ua.DataChangeTrigger.Status,
Core.Abstractions.DataChangeTrigger.StatusValue => Opc.Ua.DataChangeTrigger.StatusValue,
Core.Abstractions.DataChangeTrigger.StatusValueTimestamp => Opc.Ua.DataChangeTrigger.StatusValueTimestamp,
_ => Opc.Ua.DataChangeTrigger.StatusValue,
};
/// <summary>Map our <see cref="Core.Abstractions.DeadbandType"/> to the SDK enum.</summary>
internal static Opc.Ua.DeadbandType MapDeadbandType(Core.Abstractions.DeadbandType type) => type switch
{
Core.Abstractions.DeadbandType.None => Opc.Ua.DeadbandType.None,
Core.Abstractions.DeadbandType.Absolute => Opc.Ua.DeadbandType.Absolute,
Core.Abstractions.DeadbandType.Percent => Opc.Ua.DeadbandType.Percent,
_ => Opc.Ua.DeadbandType.None,
};
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
if (handle is not OpcUaSubscriptionHandle h) return;

View File

@@ -0,0 +1,175 @@
using Opc.Ua;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
/// <summary>
/// Unit tests for <see cref="MonitoredTagSpec"/> -> SDK <c>MonitoredItem</c> mapping.
/// Assertion-only — no live SDK session required, so the tests run on every CI without
/// a real OPC UA server fixture.
/// </summary>
[Trait("Category", "Unit")]
public sealed class OpcUaClientMonitoredTagSpecTests
{
private static readonly NodeId SampleNodeId = new("Demo", 2);
[Fact]
public void BuildMonitoredItem_with_all_defaults_matches_legacy_hard_coded_values()
{
// Spec with every per-tag knob null should behave identically to the legacy
// string-only SubscribeAsync path: Reporting / SamplingInterval=publishInterval /
// QueueSize=1 / DiscardOldest=true / no filter.
var spec = new MonitoredTagSpec("ns=2;s=Demo");
var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
item.SamplingInterval.ShouldBe(250);
item.QueueSize.ShouldBe(1u);
item.DiscardOldest.ShouldBeTrue();
item.MonitoringMode.ShouldBe(MonitoringMode.Reporting);
item.Filter.ShouldBeNull();
item.Handle.ShouldBe("ns=2;s=Demo",
"the tag string is routed through Handle so the Notification callback can identify the changed tag without re-parsing DisplayName");
}
[Fact]
public void BuildMonitoredItem_applies_per_tag_sampling_interval_independent_of_publish_interval()
{
// Per-tag SamplingInterval lets the server sample faster than it publishes — useful
// for events that change between publish ticks. If the spec sets it explicitly, the
// mapping uses that value, not the publish-interval default.
var spec = new MonitoredTagSpec("ns=2;s=Fast", SamplingIntervalMs: 50);
var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 1000);
item.SamplingInterval.ShouldBe(50);
}
[Fact]
public void BuildMonitoredItem_applies_queue_size_and_discard_oldest_overrides()
{
var spec = new MonitoredTagSpec("ns=2;s=DeepQueue", QueueSize: 100, DiscardOldest: false);
var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
item.QueueSize.ShouldBe(100u);
item.DiscardOldest.ShouldBeFalse(
"discard-oldest=false preserves earliest values — useful for audit-trail subscriptions where the first overflow sample is the most diagnostic");
}
[Theory]
[InlineData(SubscriptionMonitoringMode.Disabled, MonitoringMode.Disabled)]
[InlineData(SubscriptionMonitoringMode.Sampling, MonitoringMode.Sampling)]
[InlineData(SubscriptionMonitoringMode.Reporting, MonitoringMode.Reporting)]
public void BuildMonitoredItem_maps_each_monitoring_mode(SubscriptionMonitoringMode input, MonitoringMode expected)
{
var spec = new MonitoredTagSpec("ns=2;s=Mode", MonitoringMode: input);
var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
item.MonitoringMode.ShouldBe(expected);
}
[Fact]
public void BuildMonitoredItem_with_absolute_deadband_emits_DataChangeFilter()
{
var spec = new MonitoredTagSpec(
"ns=2;s=Analog",
DataChangeFilter: new DataChangeFilterSpec(
Core.Abstractions.DataChangeTrigger.StatusValue,
Core.Abstractions.DeadbandType.Absolute,
DeadbandValue: 0.5));
var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
var filter = item.Filter.ShouldBeOfType<DataChangeFilter>();
filter.Trigger.ShouldBe(Opc.Ua.DataChangeTrigger.StatusValue);
filter.DeadbandType.ShouldBe((uint)Opc.Ua.DeadbandType.Absolute);
filter.DeadbandValue.ShouldBe(0.5);
}
[Fact]
public void BuildMonitoredItem_with_percent_deadband_emits_percent_filter()
{
// PercentDeadband is calculated server-side as a fraction of EURange; the driver
// emits the filter unconditionally and lets the server return BadFilterNotAllowed
// if EURange isn't set on the variable. SubscribeAsync's catch-block swallows that
// status so other items in the batch still get created.
var spec = new MonitoredTagSpec(
"ns=2;s=Pct",
DataChangeFilter: new DataChangeFilterSpec(
Core.Abstractions.DataChangeTrigger.StatusValueTimestamp,
Core.Abstractions.DeadbandType.Percent,
DeadbandValue: 5.0));
var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
var filter = item.Filter.ShouldBeOfType<DataChangeFilter>();
filter.Trigger.ShouldBe(Opc.Ua.DataChangeTrigger.StatusValueTimestamp);
filter.DeadbandType.ShouldBe((uint)Opc.Ua.DeadbandType.Percent);
filter.DeadbandValue.ShouldBe(5.0);
}
[Theory]
[InlineData(Core.Abstractions.DataChangeTrigger.Status, Opc.Ua.DataChangeTrigger.Status)]
[InlineData(Core.Abstractions.DataChangeTrigger.StatusValue, Opc.Ua.DataChangeTrigger.StatusValue)]
[InlineData(Core.Abstractions.DataChangeTrigger.StatusValueTimestamp, Opc.Ua.DataChangeTrigger.StatusValueTimestamp)]
public void MapTrigger_round_trips_each_enum_value(
Core.Abstractions.DataChangeTrigger input, Opc.Ua.DataChangeTrigger expected)
=> OpcUaClientDriver.MapTrigger(input).ShouldBe(expected);
[Theory]
[InlineData(Core.Abstractions.DeadbandType.None, Opc.Ua.DeadbandType.None)]
[InlineData(Core.Abstractions.DeadbandType.Absolute, Opc.Ua.DeadbandType.Absolute)]
[InlineData(Core.Abstractions.DeadbandType.Percent, Opc.Ua.DeadbandType.Percent)]
public void MapDeadbandType_round_trips_each_enum_value(
Core.Abstractions.DeadbandType input, Opc.Ua.DeadbandType expected)
=> OpcUaClientDriver.MapDeadbandType(input).ShouldBe(expected);
[Fact]
public async Task DefaultInterfaceImplementation_routes_through_legacy_overload()
{
// ISubscribable's default interface impl of the per-tag overload delegates to the
// simple-string overload, ignoring per-tag knobs. Drivers that DON'T override the
// new overload (Modbus / S7 / Galaxy / TwinCAT / FOCAS / AbCip / AbLegacy) still
// accept MonitoredTagSpec lists and just pass through the tag names — back-compat
// for ISubscribable consumers.
var stub = new StubSubscribableDriver();
var specs = new[]
{
new MonitoredTagSpec("Tag1", SamplingIntervalMs: 50, QueueSize: 5),
new MonitoredTagSpec("Tag2", DataChangeFilter: new DataChangeFilterSpec(
Core.Abstractions.DataChangeTrigger.StatusValue,
Core.Abstractions.DeadbandType.Absolute,
1.0)),
};
ISubscribable iface = stub;
_ = await iface.SubscribeAsync(specs, TimeSpan.FromMilliseconds(250), TestContext.Current.CancellationToken);
stub.LastTagNames.ShouldBe(["Tag1", "Tag2"]);
stub.LastPublishingInterval.ShouldBe(TimeSpan.FromMilliseconds(250));
}
/// <summary>
/// Test-double <see cref="ISubscribable"/> that records whatever the legacy
/// <c>SubscribeAsync(IReadOnlyList&lt;string&gt;, ...)</c> overload was called with.
/// Used to verify the default-impl per-tag overload routes correctly without needing
/// a real OPC UA session.
/// </summary>
private sealed class StubSubscribableDriver : ISubscribable
{
public IReadOnlyList<string>? LastTagNames { get; private set; }
public TimeSpan LastPublishingInterval { get; private set; }
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
LastTagNames = fullReferences;
LastPublishingInterval = publishingInterval;
return Task.FromResult<ISubscriptionHandle>(new StubHandle());
}
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) => Task.CompletedTask;
#pragma warning disable CS0067 // event never used — the test only asserts the SubscribeAsync call routing
public event EventHandler<DataChangeEventArgs>? OnDataChange;
#pragma warning restore CS0067
}
private sealed record StubHandle() : ISubscriptionHandle
{
public string DiagnosticId => "stub";
}
}