From fae00749caaa103841b3a2c4edabd620f706552d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 25 Apr 2026 15:25:20 -0400 Subject: [PATCH] =?UTF-8?q?Auto:=20opcuaclient-2=20=E2=80=94=20per-tag=20a?= =?UTF-8?q?dvanced=20subscription=20tuning?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #274 --- .../DriverCapability.cs | 2 +- .../ISubscribable.cs | 117 +++++++++++- .../OpcUaClientDriver.cs | 131 ++++++++++--- .../OpcUaClientMonitoredTagSpecTests.cs | 175 ++++++++++++++++++ 4 files changed, 400 insertions(+), 25 deletions(-) create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientMonitoredTagSpecTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs index 79dfeac..2366f28 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs @@ -25,7 +25,7 @@ public enum DriverCapability /// . Retries by default. Discover, - /// and unsubscribe. Retries by default. + /// and unsubscribe. Retries by default. Subscribe, /// probe loop. Retries by default. diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/ISubscribable.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/ISubscribable.cs index 2de7dbe..fdefce4 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/ISubscribable.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/ISubscribable.cs @@ -20,7 +20,29 @@ public interface ISubscribable TimeSpan publishingInterval, CancellationToken cancellationToken); - /// Cancel a subscription returned by . + /// + /// Subscribe to data changes with per-tag advanced tuning (sampling interval, queue + /// size, monitoring mode, deadband filter). Drivers that don't have a native concept + /// of these knobs (e.g. polled drivers like Modbus) MAY ignore the per-tag knobs and + /// delegate to the simple + /// + /// overload — the default implementation does exactly that, so existing implementers + /// compile unchanged. + /// + /// Per-tag subscription specs. is the driver-side full reference. + /// Subscription publishing interval, applied to the whole batch. + /// Cancellation. + /// Opaque subscription handle for . + Task SubscribeAsync( + IReadOnlyList tags, + TimeSpan publishingInterval, + CancellationToken cancellationToken) + => SubscribeAsync( + tags.Select(t => t.TagName).ToList(), + publishingInterval, + cancellationToken); + + /// Cancel a subscription returned by either SubscribeAsync overload. Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken); /// @@ -30,7 +52,7 @@ public interface ISubscribable event EventHandler? OnDataChange; } -/// Opaque subscription identity returned by . +/// Opaque subscription identity returned by . public interface ISubscriptionHandle { /// Driver-internal subscription identifier (for diagnostics + post-mortem). @@ -38,10 +60,99 @@ public interface ISubscriptionHandle } /// Event payload for . -/// The handle returned by the original call. +/// The handle returned by the original call. /// Driver-side full reference of the changed attribute. /// New value + quality + timestamps. public sealed record DataChangeEventArgs( ISubscriptionHandle SubscriptionHandle, string FullReference, DataValueSnapshot Snapshot); + +/// +/// Per-tag subscription tuning. Maps onto OPC UA MonitoredItem properties for the +/// OpcUaClient driver; non-OPC-UA drivers either map a subset (e.g. ADS picks up +/// ) or ignore the knobs entirely and fall back to the +/// simple . +/// +/// Driver-side full reference (e.g. ns=2;s=Foo for OPC UA). +/// +/// Server-side sampling rate in milliseconds. null = use the publishing interval. +/// Sub-publish-interval values let a server sample faster than it publishes (queue + +/// coalesce), useful for events that change between publish ticks. +/// +/// Server-side notification queue depth. null = driver default (1). +/// +/// When the server-side queue overflows: true drops oldest, false drops newest. +/// null = driver default (true — preserve recency). +/// +/// +/// Per-item monitoring mode. Reporting = sample + publish, Sampling = sample +/// but suppress publishing (useful with triggering), Disabled = neither. +/// +/// +/// Optional data-change filter (deadband + trigger semantics). null = no filter +/// (every change publishes regardless of magnitude). +/// +public sealed record MonitoredTagSpec( + string TagName, + double? SamplingIntervalMs = null, + uint? QueueSize = null, + bool? DiscardOldest = null, + SubscriptionMonitoringMode? MonitoringMode = null, + DataChangeFilterSpec? DataChangeFilter = null); + +/// +/// OPC UA DataChangeFilter spec. Mirrors the OPC UA Part 4 §7.17.2 structure but +/// lives in Core.Abstractions so non-OpcUaClient drivers (e.g. Modbus, S7) can accept it +/// as metadata even if they ignore the deadband mechanics. +/// +/// When to fire: status only / status+value / status+value+timestamp. +/// Deadband mode: none / absolute (engineering units) / percent of EURange. +/// +/// Magnitude of the deadband. For +/// this is in the variable's engineering units; for +/// it's a 0..100 percentage of EURange (server returns BadFilterNotAllowed if EURange isn't set). +/// +public sealed record DataChangeFilterSpec( + DataChangeTrigger Trigger, + DeadbandType DeadbandType, + double DeadbandValue); + +/// +/// OPC UA DataChangeTrigger values. Wraps the SDK enum so Core.Abstractions doesn't +/// leak an OPC-UA-stack reference into every driver project. +/// +public enum DataChangeTrigger +{ + /// Fire only when StatusCode changes. + Status = 0, + /// Fire when StatusCode or Value changes (the OPC UA default). + StatusValue = 1, + /// Fire when StatusCode, Value, or SourceTimestamp changes. + StatusValueTimestamp = 2, +} + +/// OPC UA deadband-filter modes. +public enum DeadbandType +{ + /// No deadband — every value change publishes. + None = 0, + /// Deadband expressed in the variable's engineering units. + Absolute = 1, + /// Deadband expressed as 0..100 percent of the variable's EURange. + Percent = 2, +} + +/// +/// Per-item subscription monitoring mode. Wraps the OPC UA SDK's MonitoringMode +/// so Core.Abstractions stays SDK-free. +/// +public enum SubscriptionMonitoringMode +{ + /// Item is created but neither sampling nor publishing. + Disabled = 0, + /// Item samples and queues but does not publish (useful with triggering). + Sampling = 1, + /// Item samples and publishes — the OPC UA default. + Reporting = 2, +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs index b010171..fa977c3 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs @@ -851,8 +851,20 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d // ---- ISubscribable ---- - public async Task SubscribeAsync( + public Task SubscribeAsync( IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) + { + // Route the simple-string overload through the per-tag overload with all knobs at + // their defaults. Single code path for subscription create — keeps the wire-side + // identical for callers that don't need per-tag tuning. + var specs = new MonitoredTagSpec[fullReferences.Count]; + for (var i = 0; i < fullReferences.Count; i++) + specs[i] = new MonitoredTagSpec(fullReferences[i]); + return SubscribeAsync(specs, publishingInterval, cancellationToken); + } + + public async Task SubscribeAsync( + IReadOnlyList tags, TimeSpan publishingInterval, CancellationToken cancellationToken) { var session = RequireSession(); var id = Interlocked.Increment(ref _nextSubscriptionId); @@ -885,29 +897,28 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d session.AddSubscription(subscription); await subscription.CreateAsync(cancellationToken).ConfigureAwait(false); - foreach (var fullRef in fullReferences) + foreach (var spec in tags) { - if (!TryParseNodeId(session, fullRef, out var nodeId)) continue; - // The tag string is routed through MonitoredItem.Handle so the Notification - // handler can identify which tag changed without an extra lookup. - var item = new MonitoredItem(telemetry: null!, new MonitoredItemOptions - { - DisplayName = fullRef, - StartNodeId = nodeId, - AttributeId = Attributes.Value, - MonitoringMode = MonitoringMode.Reporting, - SamplingInterval = intervalMs, - QueueSize = 1, - DiscardOldest = true, - }) - { - Handle = fullRef, - }; - item.Notification += (mi, args) => OnMonitoredItemNotification(handle, mi, args); - subscription.AddItem(item); + if (!TryParseNodeId(session, spec.TagName, out var nodeId)) continue; + + var monItem = BuildMonitoredItem(spec, nodeId, intervalMs); + monItem.Notification += (mi, args) => OnMonitoredItemNotification(handle, mi, args); + subscription.AddItem(monItem); } - await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false); + try + { + await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false); + } + catch (Opc.Ua.ServiceResultException sre) + { + // PercentDeadband requires the server to expose EURange on the variable; if + // it isn't set the server returns BadFilterNotAllowed during item creation. + // We swallow the exception here so other items in the batch still get created + // — per-item failure surfaces through MonitoredItem.Status.Error rather than + // tearing down the whole subscription. + if (sre.StatusCode != StatusCodes.BadFilterNotAllowed) throw; + } _subscriptions[id] = new RemoteSubscription(subscription, handle); } finally { _gate.Release(); } @@ -915,6 +926,84 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d return handle; } + /// + /// Map a to a SDK with the + /// per-tag knobs applied. Defaults match the original hard-coded values + /// (Reporting / SamplingInterval=publishInterval / QueueSize=1 / DiscardOldest=true) + /// so a spec with all knobs null behaves identically to the legacy path. + /// + internal static MonitoredItem BuildMonitoredItem(MonitoredTagSpec spec, NodeId nodeId, int defaultIntervalMs) + { + var sampling = spec.SamplingIntervalMs.HasValue ? (int)spec.SamplingIntervalMs.Value : defaultIntervalMs; + var queueSize = spec.QueueSize ?? 1u; + var discardOldest = spec.DiscardOldest ?? true; + var monitoringMode = spec.MonitoringMode is { } mm ? MapMonitoringMode(mm) : MonitoringMode.Reporting; + var filter = BuildDataChangeFilter(spec.DataChangeFilter); + + var options = new MonitoredItemOptions + { + DisplayName = spec.TagName, + StartNodeId = nodeId, + AttributeId = Attributes.Value, + MonitoringMode = monitoringMode, + SamplingInterval = sampling, + QueueSize = queueSize, + DiscardOldest = discardOldest, + Filter = filter, + }; + + return new MonitoredItem(telemetry: null!, options) + { + // The tag string is routed through MonitoredItem.Handle so the Notification + // handler can identify which tag changed without an extra lookup. + Handle = spec.TagName, + }; + } + + /// + /// Build the OPC UA from a , + /// or return null if the caller didn't supply a filter. PercentDeadband requires + /// server-side EURange — if the server rejects with BadFilterNotAllowed, the caller's + /// SubscribeAsync swallows it so other items in the batch still get created. + /// + internal static DataChangeFilter? BuildDataChangeFilter(DataChangeFilterSpec? spec) + { + if (spec is null) return null; + return new DataChangeFilter + { + Trigger = MapTrigger(spec.Trigger), + DeadbandType = (uint)MapDeadbandType(spec.DeadbandType), + DeadbandValue = spec.DeadbandValue, + }; + } + + /// Map our SDK-free to the OPC UA SDK's enum. + internal static MonitoringMode MapMonitoringMode(SubscriptionMonitoringMode mode) => mode switch + { + SubscriptionMonitoringMode.Disabled => MonitoringMode.Disabled, + SubscriptionMonitoringMode.Sampling => MonitoringMode.Sampling, + SubscriptionMonitoringMode.Reporting => MonitoringMode.Reporting, + _ => MonitoringMode.Reporting, + }; + + /// Map our to the SDK enum. + internal static Opc.Ua.DataChangeTrigger MapTrigger(Core.Abstractions.DataChangeTrigger trigger) => trigger switch + { + Core.Abstractions.DataChangeTrigger.Status => Opc.Ua.DataChangeTrigger.Status, + Core.Abstractions.DataChangeTrigger.StatusValue => Opc.Ua.DataChangeTrigger.StatusValue, + Core.Abstractions.DataChangeTrigger.StatusValueTimestamp => Opc.Ua.DataChangeTrigger.StatusValueTimestamp, + _ => Opc.Ua.DataChangeTrigger.StatusValue, + }; + + /// Map our to the SDK enum. + internal static Opc.Ua.DeadbandType MapDeadbandType(Core.Abstractions.DeadbandType type) => type switch + { + Core.Abstractions.DeadbandType.None => Opc.Ua.DeadbandType.None, + Core.Abstractions.DeadbandType.Absolute => Opc.Ua.DeadbandType.Absolute, + Core.Abstractions.DeadbandType.Percent => Opc.Ua.DeadbandType.Percent, + _ => Opc.Ua.DeadbandType.None, + }; + public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) { if (handle is not OpcUaSubscriptionHandle h) return; diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientMonitoredTagSpecTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientMonitoredTagSpecTests.cs new file mode 100644 index 0000000..476e359 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientMonitoredTagSpecTests.cs @@ -0,0 +1,175 @@ +using Opc.Ua; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests; + +/// +/// Unit tests for -> SDK MonitoredItem mapping. +/// Assertion-only — no live SDK session required, so the tests run on every CI without +/// a real OPC UA server fixture. +/// +[Trait("Category", "Unit")] +public sealed class OpcUaClientMonitoredTagSpecTests +{ + private static readonly NodeId SampleNodeId = new("Demo", 2); + + [Fact] + public void BuildMonitoredItem_with_all_defaults_matches_legacy_hard_coded_values() + { + // Spec with every per-tag knob null should behave identically to the legacy + // string-only SubscribeAsync path: Reporting / SamplingInterval=publishInterval / + // QueueSize=1 / DiscardOldest=true / no filter. + var spec = new MonitoredTagSpec("ns=2;s=Demo"); + var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250); + + item.SamplingInterval.ShouldBe(250); + item.QueueSize.ShouldBe(1u); + item.DiscardOldest.ShouldBeTrue(); + item.MonitoringMode.ShouldBe(MonitoringMode.Reporting); + item.Filter.ShouldBeNull(); + item.Handle.ShouldBe("ns=2;s=Demo", + "the tag string is routed through Handle so the Notification callback can identify the changed tag without re-parsing DisplayName"); + } + + [Fact] + public void BuildMonitoredItem_applies_per_tag_sampling_interval_independent_of_publish_interval() + { + // Per-tag SamplingInterval lets the server sample faster than it publishes — useful + // for events that change between publish ticks. If the spec sets it explicitly, the + // mapping uses that value, not the publish-interval default. + var spec = new MonitoredTagSpec("ns=2;s=Fast", SamplingIntervalMs: 50); + var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 1000); + item.SamplingInterval.ShouldBe(50); + } + + [Fact] + public void BuildMonitoredItem_applies_queue_size_and_discard_oldest_overrides() + { + var spec = new MonitoredTagSpec("ns=2;s=DeepQueue", QueueSize: 100, DiscardOldest: false); + var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250); + item.QueueSize.ShouldBe(100u); + item.DiscardOldest.ShouldBeFalse( + "discard-oldest=false preserves earliest values — useful for audit-trail subscriptions where the first overflow sample is the most diagnostic"); + } + + [Theory] + [InlineData(SubscriptionMonitoringMode.Disabled, MonitoringMode.Disabled)] + [InlineData(SubscriptionMonitoringMode.Sampling, MonitoringMode.Sampling)] + [InlineData(SubscriptionMonitoringMode.Reporting, MonitoringMode.Reporting)] + public void BuildMonitoredItem_maps_each_monitoring_mode(SubscriptionMonitoringMode input, MonitoringMode expected) + { + var spec = new MonitoredTagSpec("ns=2;s=Mode", MonitoringMode: input); + var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250); + item.MonitoringMode.ShouldBe(expected); + } + + [Fact] + public void BuildMonitoredItem_with_absolute_deadband_emits_DataChangeFilter() + { + var spec = new MonitoredTagSpec( + "ns=2;s=Analog", + DataChangeFilter: new DataChangeFilterSpec( + Core.Abstractions.DataChangeTrigger.StatusValue, + Core.Abstractions.DeadbandType.Absolute, + DeadbandValue: 0.5)); + var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250); + var filter = item.Filter.ShouldBeOfType(); + filter.Trigger.ShouldBe(Opc.Ua.DataChangeTrigger.StatusValue); + filter.DeadbandType.ShouldBe((uint)Opc.Ua.DeadbandType.Absolute); + filter.DeadbandValue.ShouldBe(0.5); + } + + [Fact] + public void BuildMonitoredItem_with_percent_deadband_emits_percent_filter() + { + // PercentDeadband is calculated server-side as a fraction of EURange; the driver + // emits the filter unconditionally and lets the server return BadFilterNotAllowed + // if EURange isn't set on the variable. SubscribeAsync's catch-block swallows that + // status so other items in the batch still get created. + var spec = new MonitoredTagSpec( + "ns=2;s=Pct", + DataChangeFilter: new DataChangeFilterSpec( + Core.Abstractions.DataChangeTrigger.StatusValueTimestamp, + Core.Abstractions.DeadbandType.Percent, + DeadbandValue: 5.0)); + var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250); + var filter = item.Filter.ShouldBeOfType(); + filter.Trigger.ShouldBe(Opc.Ua.DataChangeTrigger.StatusValueTimestamp); + filter.DeadbandType.ShouldBe((uint)Opc.Ua.DeadbandType.Percent); + filter.DeadbandValue.ShouldBe(5.0); + } + + [Theory] + [InlineData(Core.Abstractions.DataChangeTrigger.Status, Opc.Ua.DataChangeTrigger.Status)] + [InlineData(Core.Abstractions.DataChangeTrigger.StatusValue, Opc.Ua.DataChangeTrigger.StatusValue)] + [InlineData(Core.Abstractions.DataChangeTrigger.StatusValueTimestamp, Opc.Ua.DataChangeTrigger.StatusValueTimestamp)] + public void MapTrigger_round_trips_each_enum_value( + Core.Abstractions.DataChangeTrigger input, Opc.Ua.DataChangeTrigger expected) + => OpcUaClientDriver.MapTrigger(input).ShouldBe(expected); + + [Theory] + [InlineData(Core.Abstractions.DeadbandType.None, Opc.Ua.DeadbandType.None)] + [InlineData(Core.Abstractions.DeadbandType.Absolute, Opc.Ua.DeadbandType.Absolute)] + [InlineData(Core.Abstractions.DeadbandType.Percent, Opc.Ua.DeadbandType.Percent)] + public void MapDeadbandType_round_trips_each_enum_value( + Core.Abstractions.DeadbandType input, Opc.Ua.DeadbandType expected) + => OpcUaClientDriver.MapDeadbandType(input).ShouldBe(expected); + + [Fact] + public async Task DefaultInterfaceImplementation_routes_through_legacy_overload() + { + // ISubscribable's default interface impl of the per-tag overload delegates to the + // simple-string overload, ignoring per-tag knobs. Drivers that DON'T override the + // new overload (Modbus / S7 / Galaxy / TwinCAT / FOCAS / AbCip / AbLegacy) still + // accept MonitoredTagSpec lists and just pass through the tag names — back-compat + // for ISubscribable consumers. + var stub = new StubSubscribableDriver(); + var specs = new[] + { + new MonitoredTagSpec("Tag1", SamplingIntervalMs: 50, QueueSize: 5), + new MonitoredTagSpec("Tag2", DataChangeFilter: new DataChangeFilterSpec( + Core.Abstractions.DataChangeTrigger.StatusValue, + Core.Abstractions.DeadbandType.Absolute, + 1.0)), + }; + + ISubscribable iface = stub; + _ = await iface.SubscribeAsync(specs, TimeSpan.FromMilliseconds(250), TestContext.Current.CancellationToken); + + stub.LastTagNames.ShouldBe(["Tag1", "Tag2"]); + stub.LastPublishingInterval.ShouldBe(TimeSpan.FromMilliseconds(250)); + } + + /// + /// Test-double that records whatever the legacy + /// SubscribeAsync(IReadOnlyList<string>, ...) overload was called with. + /// Used to verify the default-impl per-tag overload routes correctly without needing + /// a real OPC UA session. + /// + private sealed class StubSubscribableDriver : ISubscribable + { + public IReadOnlyList? LastTagNames { get; private set; } + public TimeSpan LastPublishingInterval { get; private set; } + + public Task SubscribeAsync( + IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) + { + LastTagNames = fullReferences; + LastPublishingInterval = publishingInterval; + return Task.FromResult(new StubHandle()); + } + + public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) => Task.CompletedTask; + +#pragma warning disable CS0067 // event never used — the test only asserts the SubscribeAsync call routing + public event EventHandler? OnDataChange; +#pragma warning restore CS0067 + } + + private sealed record StubHandle() : ISubscriptionHandle + { + public string DiagnosticId => "stub"; + } +} -- 2.49.1