diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs
index 79dfeac..2366f28 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs
@@ -25,7 +25,7 @@ public enum DriverCapability
/// . Retries by default.
Discover,
- /// and unsubscribe. Retries by default.
+ /// and unsubscribe. Retries by default.
Subscribe,
/// probe loop. Retries by default.
diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/ISubscribable.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/ISubscribable.cs
index 2de7dbe..fdefce4 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/ISubscribable.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/ISubscribable.cs
@@ -20,7 +20,29 @@ public interface ISubscribable
TimeSpan publishingInterval,
CancellationToken cancellationToken);
- /// Cancel a subscription returned by .
+ ///
+ /// Subscribe to data changes with per-tag advanced tuning (sampling interval, queue
+ /// size, monitoring mode, deadband filter). Drivers that don't have a native concept
+ /// of these knobs (e.g. polled drivers like Modbus) MAY ignore the per-tag knobs and
+ /// delegate to the simple
+ ///
+ /// overload — the default implementation does exactly that, so existing implementers
+ /// compile unchanged.
+ ///
+ /// Per-tag subscription specs. is the driver-side full reference.
+ /// Subscription publishing interval, applied to the whole batch.
+ /// Cancellation.
+ /// Opaque subscription handle for .
+ Task SubscribeAsync(
+ IReadOnlyList tags,
+ TimeSpan publishingInterval,
+ CancellationToken cancellationToken)
+ => SubscribeAsync(
+ tags.Select(t => t.TagName).ToList(),
+ publishingInterval,
+ cancellationToken);
+
+ /// Cancel a subscription returned by either SubscribeAsync overload.
Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken);
///
@@ -30,7 +52,7 @@ public interface ISubscribable
event EventHandler? OnDataChange;
}
-/// Opaque subscription identity returned by .
+/// Opaque subscription identity returned by .
public interface ISubscriptionHandle
{
/// Driver-internal subscription identifier (for diagnostics + post-mortem).
@@ -38,10 +60,99 @@ public interface ISubscriptionHandle
}
/// Event payload for .
-/// The handle returned by the original call.
+/// The handle returned by the original call.
/// Driver-side full reference of the changed attribute.
/// New value + quality + timestamps.
public sealed record DataChangeEventArgs(
ISubscriptionHandle SubscriptionHandle,
string FullReference,
DataValueSnapshot Snapshot);
+
+///
+/// Per-tag subscription tuning. Maps onto OPC UA MonitoredItem properties for the
+/// OpcUaClient driver; non-OPC-UA drivers either map a subset (e.g. ADS picks up
+/// ) or ignore the knobs entirely and fall back to the
+/// simple .
+///
+/// Driver-side full reference (e.g. ns=2;s=Foo for OPC UA).
+///
+/// Server-side sampling rate in milliseconds. null = use the publishing interval.
+/// Sub-publish-interval values let a server sample faster than it publishes (queue +
+/// coalesce), useful for events that change between publish ticks.
+///
+/// Server-side notification queue depth. null = driver default (1).
+///
+/// When the server-side queue overflows: true drops oldest, false drops newest.
+/// null = driver default (true — preserve recency).
+///
+///
+/// Per-item monitoring mode. Reporting = sample + publish, Sampling = sample
+/// but suppress publishing (useful with triggering), Disabled = neither.
+///
+///
+/// Optional data-change filter (deadband + trigger semantics). null = no filter
+/// (every change publishes regardless of magnitude).
+///
+public sealed record MonitoredTagSpec(
+ string TagName,
+ double? SamplingIntervalMs = null,
+ uint? QueueSize = null,
+ bool? DiscardOldest = null,
+ SubscriptionMonitoringMode? MonitoringMode = null,
+ DataChangeFilterSpec? DataChangeFilter = null);
+
+///
+/// OPC UA DataChangeFilter spec. Mirrors the OPC UA Part 4 §7.17.2 structure but
+/// lives in Core.Abstractions so non-OpcUaClient drivers (e.g. Modbus, S7) can accept it
+/// as metadata even if they ignore the deadband mechanics.
+///
+/// When to fire: status only / status+value / status+value+timestamp.
+/// Deadband mode: none / absolute (engineering units) / percent of EURange.
+///
+/// Magnitude of the deadband. For
+/// this is in the variable's engineering units; for
+/// it's a 0..100 percentage of EURange (server returns BadFilterNotAllowed if EURange isn't set).
+///
+public sealed record DataChangeFilterSpec(
+ DataChangeTrigger Trigger,
+ DeadbandType DeadbandType,
+ double DeadbandValue);
+
+///
+/// OPC UA DataChangeTrigger values. Wraps the SDK enum so Core.Abstractions doesn't
+/// leak an OPC-UA-stack reference into every driver project.
+///
+public enum DataChangeTrigger
+{
+ /// Fire only when StatusCode changes.
+ Status = 0,
+ /// Fire when StatusCode or Value changes (the OPC UA default).
+ StatusValue = 1,
+ /// Fire when StatusCode, Value, or SourceTimestamp changes.
+ StatusValueTimestamp = 2,
+}
+
+/// OPC UA deadband-filter modes.
+public enum DeadbandType
+{
+ /// No deadband — every value change publishes.
+ None = 0,
+ /// Deadband expressed in the variable's engineering units.
+ Absolute = 1,
+ /// Deadband expressed as 0..100 percent of the variable's EURange.
+ Percent = 2,
+}
+
+///
+/// Per-item subscription monitoring mode. Wraps the OPC UA SDK's MonitoringMode
+/// so Core.Abstractions stays SDK-free.
+///
+public enum SubscriptionMonitoringMode
+{
+ /// Item is created but neither sampling nor publishing.
+ Disabled = 0,
+ /// Item samples and queues but does not publish (useful with triggering).
+ Sampling = 1,
+ /// Item samples and publishes — the OPC UA default.
+ Reporting = 2,
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs
index b010171..fa977c3 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs
@@ -851,8 +851,20 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
// ---- ISubscribable ----
- public async Task SubscribeAsync(
+ public Task SubscribeAsync(
IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
+ {
+ // Route the simple-string overload through the per-tag overload with all knobs at
+ // their defaults. Single code path for subscription create — keeps the wire-side
+ // identical for callers that don't need per-tag tuning.
+ var specs = new MonitoredTagSpec[fullReferences.Count];
+ for (var i = 0; i < fullReferences.Count; i++)
+ specs[i] = new MonitoredTagSpec(fullReferences[i]);
+ return SubscribeAsync(specs, publishingInterval, cancellationToken);
+ }
+
+ public async Task SubscribeAsync(
+ IReadOnlyList tags, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
var session = RequireSession();
var id = Interlocked.Increment(ref _nextSubscriptionId);
@@ -885,29 +897,28 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
session.AddSubscription(subscription);
await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
- foreach (var fullRef in fullReferences)
+ foreach (var spec in tags)
{
- if (!TryParseNodeId(session, fullRef, out var nodeId)) continue;
- // The tag string is routed through MonitoredItem.Handle so the Notification
- // handler can identify which tag changed without an extra lookup.
- var item = new MonitoredItem(telemetry: null!, new MonitoredItemOptions
- {
- DisplayName = fullRef,
- StartNodeId = nodeId,
- AttributeId = Attributes.Value,
- MonitoringMode = MonitoringMode.Reporting,
- SamplingInterval = intervalMs,
- QueueSize = 1,
- DiscardOldest = true,
- })
- {
- Handle = fullRef,
- };
- item.Notification += (mi, args) => OnMonitoredItemNotification(handle, mi, args);
- subscription.AddItem(item);
+ if (!TryParseNodeId(session, spec.TagName, out var nodeId)) continue;
+
+ var monItem = BuildMonitoredItem(spec, nodeId, intervalMs);
+ monItem.Notification += (mi, args) => OnMonitoredItemNotification(handle, mi, args);
+ subscription.AddItem(monItem);
}
- await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false);
+ try
+ {
+ await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false);
+ }
+ catch (Opc.Ua.ServiceResultException sre)
+ {
+ // PercentDeadband requires the server to expose EURange on the variable; if
+ // it isn't set the server returns BadFilterNotAllowed during item creation.
+ // We swallow the exception here so other items in the batch still get created
+ // — per-item failure surfaces through MonitoredItem.Status.Error rather than
+ // tearing down the whole subscription.
+ if (sre.StatusCode != StatusCodes.BadFilterNotAllowed) throw;
+ }
_subscriptions[id] = new RemoteSubscription(subscription, handle);
}
finally { _gate.Release(); }
@@ -915,6 +926,84 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
return handle;
}
+ ///
+ /// Map a to a SDK with the
+ /// per-tag knobs applied. Defaults match the original hard-coded values
+ /// (Reporting / SamplingInterval=publishInterval / QueueSize=1 / DiscardOldest=true)
+ /// so a spec with all knobs null behaves identically to the legacy path.
+ ///
+ internal static MonitoredItem BuildMonitoredItem(MonitoredTagSpec spec, NodeId nodeId, int defaultIntervalMs)
+ {
+ var sampling = spec.SamplingIntervalMs.HasValue ? (int)spec.SamplingIntervalMs.Value : defaultIntervalMs;
+ var queueSize = spec.QueueSize ?? 1u;
+ var discardOldest = spec.DiscardOldest ?? true;
+ var monitoringMode = spec.MonitoringMode is { } mm ? MapMonitoringMode(mm) : MonitoringMode.Reporting;
+ var filter = BuildDataChangeFilter(spec.DataChangeFilter);
+
+ var options = new MonitoredItemOptions
+ {
+ DisplayName = spec.TagName,
+ StartNodeId = nodeId,
+ AttributeId = Attributes.Value,
+ MonitoringMode = monitoringMode,
+ SamplingInterval = sampling,
+ QueueSize = queueSize,
+ DiscardOldest = discardOldest,
+ Filter = filter,
+ };
+
+ return new MonitoredItem(telemetry: null!, options)
+ {
+ // The tag string is routed through MonitoredItem.Handle so the Notification
+ // handler can identify which tag changed without an extra lookup.
+ Handle = spec.TagName,
+ };
+ }
+
+ ///
+ /// Build the OPC UA from a ,
+ /// or return null if the caller didn't supply a filter. PercentDeadband requires
+ /// server-side EURange — if the server rejects with BadFilterNotAllowed, the caller's
+ /// SubscribeAsync swallows it so other items in the batch still get created.
+ ///
+ internal static DataChangeFilter? BuildDataChangeFilter(DataChangeFilterSpec? spec)
+ {
+ if (spec is null) return null;
+ return new DataChangeFilter
+ {
+ Trigger = MapTrigger(spec.Trigger),
+ DeadbandType = (uint)MapDeadbandType(spec.DeadbandType),
+ DeadbandValue = spec.DeadbandValue,
+ };
+ }
+
+ /// Map our SDK-free to the OPC UA SDK's enum.
+ internal static MonitoringMode MapMonitoringMode(SubscriptionMonitoringMode mode) => mode switch
+ {
+ SubscriptionMonitoringMode.Disabled => MonitoringMode.Disabled,
+ SubscriptionMonitoringMode.Sampling => MonitoringMode.Sampling,
+ SubscriptionMonitoringMode.Reporting => MonitoringMode.Reporting,
+ _ => MonitoringMode.Reporting,
+ };
+
+ /// Map our to the SDK enum.
+ internal static Opc.Ua.DataChangeTrigger MapTrigger(Core.Abstractions.DataChangeTrigger trigger) => trigger switch
+ {
+ Core.Abstractions.DataChangeTrigger.Status => Opc.Ua.DataChangeTrigger.Status,
+ Core.Abstractions.DataChangeTrigger.StatusValue => Opc.Ua.DataChangeTrigger.StatusValue,
+ Core.Abstractions.DataChangeTrigger.StatusValueTimestamp => Opc.Ua.DataChangeTrigger.StatusValueTimestamp,
+ _ => Opc.Ua.DataChangeTrigger.StatusValue,
+ };
+
+ /// Map our to the SDK enum.
+ internal static Opc.Ua.DeadbandType MapDeadbandType(Core.Abstractions.DeadbandType type) => type switch
+ {
+ Core.Abstractions.DeadbandType.None => Opc.Ua.DeadbandType.None,
+ Core.Abstractions.DeadbandType.Absolute => Opc.Ua.DeadbandType.Absolute,
+ Core.Abstractions.DeadbandType.Percent => Opc.Ua.DeadbandType.Percent,
+ _ => Opc.Ua.DeadbandType.None,
+ };
+
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
if (handle is not OpcUaSubscriptionHandle h) return;
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientMonitoredTagSpecTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientMonitoredTagSpecTests.cs
new file mode 100644
index 0000000..476e359
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientMonitoredTagSpecTests.cs
@@ -0,0 +1,175 @@
+using Opc.Ua;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
+
+///
+/// Unit tests for -> SDK MonitoredItem mapping.
+/// Assertion-only — no live SDK session required, so the tests run on every CI without
+/// a real OPC UA server fixture.
+///
+[Trait("Category", "Unit")]
+public sealed class OpcUaClientMonitoredTagSpecTests
+{
+ private static readonly NodeId SampleNodeId = new("Demo", 2);
+
+ [Fact]
+ public void BuildMonitoredItem_with_all_defaults_matches_legacy_hard_coded_values()
+ {
+ // Spec with every per-tag knob null should behave identically to the legacy
+ // string-only SubscribeAsync path: Reporting / SamplingInterval=publishInterval /
+ // QueueSize=1 / DiscardOldest=true / no filter.
+ var spec = new MonitoredTagSpec("ns=2;s=Demo");
+ var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
+
+ item.SamplingInterval.ShouldBe(250);
+ item.QueueSize.ShouldBe(1u);
+ item.DiscardOldest.ShouldBeTrue();
+ item.MonitoringMode.ShouldBe(MonitoringMode.Reporting);
+ item.Filter.ShouldBeNull();
+ item.Handle.ShouldBe("ns=2;s=Demo",
+ "the tag string is routed through Handle so the Notification callback can identify the changed tag without re-parsing DisplayName");
+ }
+
+ [Fact]
+ public void BuildMonitoredItem_applies_per_tag_sampling_interval_independent_of_publish_interval()
+ {
+ // Per-tag SamplingInterval lets the server sample faster than it publishes — useful
+ // for events that change between publish ticks. If the spec sets it explicitly, the
+ // mapping uses that value, not the publish-interval default.
+ var spec = new MonitoredTagSpec("ns=2;s=Fast", SamplingIntervalMs: 50);
+ var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 1000);
+ item.SamplingInterval.ShouldBe(50);
+ }
+
+ [Fact]
+ public void BuildMonitoredItem_applies_queue_size_and_discard_oldest_overrides()
+ {
+ var spec = new MonitoredTagSpec("ns=2;s=DeepQueue", QueueSize: 100, DiscardOldest: false);
+ var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
+ item.QueueSize.ShouldBe(100u);
+ item.DiscardOldest.ShouldBeFalse(
+ "discard-oldest=false preserves earliest values — useful for audit-trail subscriptions where the first overflow sample is the most diagnostic");
+ }
+
+ [Theory]
+ [InlineData(SubscriptionMonitoringMode.Disabled, MonitoringMode.Disabled)]
+ [InlineData(SubscriptionMonitoringMode.Sampling, MonitoringMode.Sampling)]
+ [InlineData(SubscriptionMonitoringMode.Reporting, MonitoringMode.Reporting)]
+ public void BuildMonitoredItem_maps_each_monitoring_mode(SubscriptionMonitoringMode input, MonitoringMode expected)
+ {
+ var spec = new MonitoredTagSpec("ns=2;s=Mode", MonitoringMode: input);
+ var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
+ item.MonitoringMode.ShouldBe(expected);
+ }
+
+ [Fact]
+ public void BuildMonitoredItem_with_absolute_deadband_emits_DataChangeFilter()
+ {
+ var spec = new MonitoredTagSpec(
+ "ns=2;s=Analog",
+ DataChangeFilter: new DataChangeFilterSpec(
+ Core.Abstractions.DataChangeTrigger.StatusValue,
+ Core.Abstractions.DeadbandType.Absolute,
+ DeadbandValue: 0.5));
+ var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
+ var filter = item.Filter.ShouldBeOfType();
+ filter.Trigger.ShouldBe(Opc.Ua.DataChangeTrigger.StatusValue);
+ filter.DeadbandType.ShouldBe((uint)Opc.Ua.DeadbandType.Absolute);
+ filter.DeadbandValue.ShouldBe(0.5);
+ }
+
+ [Fact]
+ public void BuildMonitoredItem_with_percent_deadband_emits_percent_filter()
+ {
+ // PercentDeadband is calculated server-side as a fraction of EURange; the driver
+ // emits the filter unconditionally and lets the server return BadFilterNotAllowed
+ // if EURange isn't set on the variable. SubscribeAsync's catch-block swallows that
+ // status so other items in the batch still get created.
+ var spec = new MonitoredTagSpec(
+ "ns=2;s=Pct",
+ DataChangeFilter: new DataChangeFilterSpec(
+ Core.Abstractions.DataChangeTrigger.StatusValueTimestamp,
+ Core.Abstractions.DeadbandType.Percent,
+ DeadbandValue: 5.0));
+ var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
+ var filter = item.Filter.ShouldBeOfType();
+ filter.Trigger.ShouldBe(Opc.Ua.DataChangeTrigger.StatusValueTimestamp);
+ filter.DeadbandType.ShouldBe((uint)Opc.Ua.DeadbandType.Percent);
+ filter.DeadbandValue.ShouldBe(5.0);
+ }
+
+ [Theory]
+ [InlineData(Core.Abstractions.DataChangeTrigger.Status, Opc.Ua.DataChangeTrigger.Status)]
+ [InlineData(Core.Abstractions.DataChangeTrigger.StatusValue, Opc.Ua.DataChangeTrigger.StatusValue)]
+ [InlineData(Core.Abstractions.DataChangeTrigger.StatusValueTimestamp, Opc.Ua.DataChangeTrigger.StatusValueTimestamp)]
+ public void MapTrigger_round_trips_each_enum_value(
+ Core.Abstractions.DataChangeTrigger input, Opc.Ua.DataChangeTrigger expected)
+ => OpcUaClientDriver.MapTrigger(input).ShouldBe(expected);
+
+ [Theory]
+ [InlineData(Core.Abstractions.DeadbandType.None, Opc.Ua.DeadbandType.None)]
+ [InlineData(Core.Abstractions.DeadbandType.Absolute, Opc.Ua.DeadbandType.Absolute)]
+ [InlineData(Core.Abstractions.DeadbandType.Percent, Opc.Ua.DeadbandType.Percent)]
+ public void MapDeadbandType_round_trips_each_enum_value(
+ Core.Abstractions.DeadbandType input, Opc.Ua.DeadbandType expected)
+ => OpcUaClientDriver.MapDeadbandType(input).ShouldBe(expected);
+
+ [Fact]
+ public async Task DefaultInterfaceImplementation_routes_through_legacy_overload()
+ {
+ // ISubscribable's default interface impl of the per-tag overload delegates to the
+ // simple-string overload, ignoring per-tag knobs. Drivers that DON'T override the
+ // new overload (Modbus / S7 / Galaxy / TwinCAT / FOCAS / AbCip / AbLegacy) still
+ // accept MonitoredTagSpec lists and just pass through the tag names — back-compat
+ // for ISubscribable consumers.
+ var stub = new StubSubscribableDriver();
+ var specs = new[]
+ {
+ new MonitoredTagSpec("Tag1", SamplingIntervalMs: 50, QueueSize: 5),
+ new MonitoredTagSpec("Tag2", DataChangeFilter: new DataChangeFilterSpec(
+ Core.Abstractions.DataChangeTrigger.StatusValue,
+ Core.Abstractions.DeadbandType.Absolute,
+ 1.0)),
+ };
+
+ ISubscribable iface = stub;
+ _ = await iface.SubscribeAsync(specs, TimeSpan.FromMilliseconds(250), TestContext.Current.CancellationToken);
+
+ stub.LastTagNames.ShouldBe(["Tag1", "Tag2"]);
+ stub.LastPublishingInterval.ShouldBe(TimeSpan.FromMilliseconds(250));
+ }
+
+ ///
+ /// Test-double that records whatever the legacy
+ /// SubscribeAsync(IReadOnlyList<string>, ...) overload was called with.
+ /// Used to verify the default-impl per-tag overload routes correctly without needing
+ /// a real OPC UA session.
+ ///
+ private sealed class StubSubscribableDriver : ISubscribable
+ {
+ public IReadOnlyList? LastTagNames { get; private set; }
+ public TimeSpan LastPublishingInterval { get; private set; }
+
+ public Task SubscribeAsync(
+ IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
+ {
+ LastTagNames = fullReferences;
+ LastPublishingInterval = publishingInterval;
+ return Task.FromResult(new StubHandle());
+ }
+
+ public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) => Task.CompletedTask;
+
+#pragma warning disable CS0067 // event never used — the test only asserts the SubscribeAsync call routing
+ public event EventHandler? OnDataChange;
+#pragma warning restore CS0067
+ }
+
+ private sealed record StubHandle() : ISubscriptionHandle
+ {
+ public string DiagnosticId => "stub";
+ }
+}