[opcuaclient] OpcUaClient — Per-tag advanced subscription tuning incl. deadband #332
@@ -25,7 +25,7 @@ public enum DriverCapability
|
|||||||
/// <summary><see cref="ITagDiscovery.DiscoverAsync"/>. Retries by default.</summary>
|
/// <summary><see cref="ITagDiscovery.DiscoverAsync"/>. Retries by default.</summary>
|
||||||
Discover,
|
Discover,
|
||||||
|
|
||||||
/// <summary><see cref="ISubscribable.SubscribeAsync"/> and unsubscribe. Retries by default.</summary>
|
/// <summary><see cref="ISubscribable.SubscribeAsync(IReadOnlyList{string}, TimeSpan, CancellationToken)"/> and unsubscribe. Retries by default.</summary>
|
||||||
Subscribe,
|
Subscribe,
|
||||||
|
|
||||||
/// <summary><see cref="IHostConnectivityProbe"/> probe loop. Retries by default.</summary>
|
/// <summary><see cref="IHostConnectivityProbe"/> probe loop. Retries by default.</summary>
|
||||||
|
|||||||
@@ -20,7 +20,29 @@ public interface ISubscribable
|
|||||||
TimeSpan publishingInterval,
|
TimeSpan publishingInterval,
|
||||||
CancellationToken cancellationToken);
|
CancellationToken cancellationToken);
|
||||||
|
|
||||||
/// <summary>Cancel a subscription returned by <see cref="SubscribeAsync"/>.</summary>
|
/// <summary>
|
||||||
|
/// Subscribe to data changes with per-tag advanced tuning (sampling interval, queue
|
||||||
|
/// size, monitoring mode, deadband filter). Drivers that don't have a native concept
|
||||||
|
/// of these knobs (e.g. polled drivers like Modbus) MAY ignore the per-tag knobs and
|
||||||
|
/// delegate to the simple
|
||||||
|
/// <see cref="SubscribeAsync(IReadOnlyList{string}, TimeSpan, CancellationToken)"/>
|
||||||
|
/// overload — the default implementation does exactly that, so existing implementers
|
||||||
|
/// compile unchanged.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="tags">Per-tag subscription specs. <see cref="MonitoredTagSpec.TagName"/> is the driver-side full reference.</param>
|
||||||
|
/// <param name="publishingInterval">Subscription publishing interval, applied to the whole batch.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation.</param>
|
||||||
|
/// <returns>Opaque subscription handle for <see cref="UnsubscribeAsync"/>.</returns>
|
||||||
|
Task<ISubscriptionHandle> SubscribeAsync(
|
||||||
|
IReadOnlyList<MonitoredTagSpec> tags,
|
||||||
|
TimeSpan publishingInterval,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
=> SubscribeAsync(
|
||||||
|
tags.Select(t => t.TagName).ToList(),
|
||||||
|
publishingInterval,
|
||||||
|
cancellationToken);
|
||||||
|
|
||||||
|
/// <summary>Cancel a subscription returned by either <c>SubscribeAsync</c> overload.</summary>
|
||||||
Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken);
|
Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -30,7 +52,7 @@ public interface ISubscribable
|
|||||||
event EventHandler<DataChangeEventArgs>? OnDataChange;
|
event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>Opaque subscription identity returned by <see cref="ISubscribable.SubscribeAsync"/>.</summary>
|
/// <summary>Opaque subscription identity returned by <see cref="ISubscribable.SubscribeAsync(IReadOnlyList{string}, TimeSpan, CancellationToken)"/>.</summary>
|
||||||
public interface ISubscriptionHandle
|
public interface ISubscriptionHandle
|
||||||
{
|
{
|
||||||
/// <summary>Driver-internal subscription identifier (for diagnostics + post-mortem).</summary>
|
/// <summary>Driver-internal subscription identifier (for diagnostics + post-mortem).</summary>
|
||||||
@@ -38,10 +60,99 @@ public interface ISubscriptionHandle
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>Event payload for <see cref="ISubscribable.OnDataChange"/>.</summary>
|
/// <summary>Event payload for <see cref="ISubscribable.OnDataChange"/>.</summary>
|
||||||
/// <param name="SubscriptionHandle">The handle returned by the original <see cref="ISubscribable.SubscribeAsync"/> call.</param>
|
/// <param name="SubscriptionHandle">The handle returned by the original <see cref="ISubscribable.SubscribeAsync(IReadOnlyList{string}, TimeSpan, CancellationToken)"/> call.</param>
|
||||||
/// <param name="FullReference">Driver-side full reference of the changed attribute.</param>
|
/// <param name="FullReference">Driver-side full reference of the changed attribute.</param>
|
||||||
/// <param name="Snapshot">New value + quality + timestamps.</param>
|
/// <param name="Snapshot">New value + quality + timestamps.</param>
|
||||||
public sealed record DataChangeEventArgs(
|
public sealed record DataChangeEventArgs(
|
||||||
ISubscriptionHandle SubscriptionHandle,
|
ISubscriptionHandle SubscriptionHandle,
|
||||||
string FullReference,
|
string FullReference,
|
||||||
DataValueSnapshot Snapshot);
|
DataValueSnapshot Snapshot);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Per-tag subscription tuning. Maps onto OPC UA <c>MonitoredItem</c> properties for the
|
||||||
|
/// OpcUaClient driver; non-OPC-UA drivers either map a subset (e.g. ADS picks up
|
||||||
|
/// <see cref="SamplingIntervalMs"/>) or ignore the knobs entirely and fall back to the
|
||||||
|
/// simple <see cref="ISubscribable.SubscribeAsync(IReadOnlyList{string}, TimeSpan, CancellationToken)"/>.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="TagName">Driver-side full reference (e.g. <c>ns=2;s=Foo</c> for OPC UA).</param>
|
||||||
|
/// <param name="SamplingIntervalMs">
|
||||||
|
/// Server-side sampling rate in milliseconds. <c>null</c> = use the publishing interval.
|
||||||
|
/// Sub-publish-interval values let a server sample faster than it publishes (queue +
|
||||||
|
/// coalesce), useful for events that change between publish ticks.
|
||||||
|
/// </param>
|
||||||
|
/// <param name="QueueSize">Server-side notification queue depth. <c>null</c> = driver default (1).</param>
|
||||||
|
/// <param name="DiscardOldest">
|
||||||
|
/// When the server-side queue overflows: <c>true</c> drops oldest, <c>false</c> drops newest.
|
||||||
|
/// <c>null</c> = driver default (true — preserve recency).
|
||||||
|
/// </param>
|
||||||
|
/// <param name="MonitoringMode">
|
||||||
|
/// Per-item monitoring mode. <c>Reporting</c> = sample + publish, <c>Sampling</c> = sample
|
||||||
|
/// but suppress publishing (useful with triggering), <c>Disabled</c> = neither.
|
||||||
|
/// </param>
|
||||||
|
/// <param name="DataChangeFilter">
|
||||||
|
/// Optional data-change filter (deadband + trigger semantics). <c>null</c> = no filter
|
||||||
|
/// (every change publishes regardless of magnitude).
|
||||||
|
/// </param>
|
||||||
|
public sealed record MonitoredTagSpec(
|
||||||
|
string TagName,
|
||||||
|
double? SamplingIntervalMs = null,
|
||||||
|
uint? QueueSize = null,
|
||||||
|
bool? DiscardOldest = null,
|
||||||
|
SubscriptionMonitoringMode? MonitoringMode = null,
|
||||||
|
DataChangeFilterSpec? DataChangeFilter = null);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// OPC UA <c>DataChangeFilter</c> spec. Mirrors the OPC UA Part 4 §7.17.2 structure but
|
||||||
|
/// lives in Core.Abstractions so non-OpcUaClient drivers (e.g. Modbus, S7) can accept it
|
||||||
|
/// as metadata even if they ignore the deadband mechanics.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="Trigger">When to fire: status only / status+value / status+value+timestamp.</param>
|
||||||
|
/// <param name="DeadbandType">Deadband mode: none / absolute (engineering units) / percent of EURange.</param>
|
||||||
|
/// <param name="DeadbandValue">
|
||||||
|
/// Magnitude of the deadband. For <see cref="OtOpcUa.Core.Abstractions.DeadbandType.Absolute"/>
|
||||||
|
/// this is in the variable's engineering units; for <see cref="OtOpcUa.Core.Abstractions.DeadbandType.Percent"/>
|
||||||
|
/// it's a 0..100 percentage of EURange (server returns BadFilterNotAllowed if EURange isn't set).
|
||||||
|
/// </param>
|
||||||
|
public sealed record DataChangeFilterSpec(
|
||||||
|
DataChangeTrigger Trigger,
|
||||||
|
DeadbandType DeadbandType,
|
||||||
|
double DeadbandValue);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// OPC UA <c>DataChangeTrigger</c> values. Wraps the SDK enum so Core.Abstractions doesn't
|
||||||
|
/// leak an OPC-UA-stack reference into every driver project.
|
||||||
|
/// </summary>
|
||||||
|
public enum DataChangeTrigger
|
||||||
|
{
|
||||||
|
/// <summary>Fire only when StatusCode changes.</summary>
|
||||||
|
Status = 0,
|
||||||
|
/// <summary>Fire when StatusCode or Value changes (the OPC UA default).</summary>
|
||||||
|
StatusValue = 1,
|
||||||
|
/// <summary>Fire when StatusCode, Value, or SourceTimestamp changes.</summary>
|
||||||
|
StatusValueTimestamp = 2,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>OPC UA deadband-filter modes.</summary>
|
||||||
|
public enum DeadbandType
|
||||||
|
{
|
||||||
|
/// <summary>No deadband — every value change publishes.</summary>
|
||||||
|
None = 0,
|
||||||
|
/// <summary>Deadband expressed in the variable's engineering units.</summary>
|
||||||
|
Absolute = 1,
|
||||||
|
/// <summary>Deadband expressed as 0..100 percent of the variable's EURange.</summary>
|
||||||
|
Percent = 2,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Per-item subscription monitoring mode. Wraps the OPC UA SDK's <c>MonitoringMode</c>
|
||||||
|
/// so Core.Abstractions stays SDK-free.
|
||||||
|
/// </summary>
|
||||||
|
public enum SubscriptionMonitoringMode
|
||||||
|
{
|
||||||
|
/// <summary>Item is created but neither sampling nor publishing.</summary>
|
||||||
|
Disabled = 0,
|
||||||
|
/// <summary>Item samples and queues but does not publish (useful with triggering).</summary>
|
||||||
|
Sampling = 1,
|
||||||
|
/// <summary>Item samples and publishes — the OPC UA default.</summary>
|
||||||
|
Reporting = 2,
|
||||||
|
}
|
||||||
|
|||||||
@@ -851,8 +851,20 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
|||||||
|
|
||||||
// ---- ISubscribable ----
|
// ---- ISubscribable ----
|
||||||
|
|
||||||
public async Task<ISubscriptionHandle> SubscribeAsync(
|
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
// Route the simple-string overload through the per-tag overload with all knobs at
|
||||||
|
// their defaults. Single code path for subscription create — keeps the wire-side
|
||||||
|
// identical for callers that don't need per-tag tuning.
|
||||||
|
var specs = new MonitoredTagSpec[fullReferences.Count];
|
||||||
|
for (var i = 0; i < fullReferences.Count; i++)
|
||||||
|
specs[i] = new MonitoredTagSpec(fullReferences[i]);
|
||||||
|
return SubscribeAsync(specs, publishingInterval, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<ISubscriptionHandle> SubscribeAsync(
|
||||||
|
IReadOnlyList<MonitoredTagSpec> tags, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
var session = RequireSession();
|
var session = RequireSession();
|
||||||
var id = Interlocked.Increment(ref _nextSubscriptionId);
|
var id = Interlocked.Increment(ref _nextSubscriptionId);
|
||||||
@@ -885,29 +897,28 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
|||||||
session.AddSubscription(subscription);
|
session.AddSubscription(subscription);
|
||||||
await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
|
await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
foreach (var fullRef in fullReferences)
|
foreach (var spec in tags)
|
||||||
{
|
{
|
||||||
if (!TryParseNodeId(session, fullRef, out var nodeId)) continue;
|
if (!TryParseNodeId(session, spec.TagName, out var nodeId)) continue;
|
||||||
// The tag string is routed through MonitoredItem.Handle so the Notification
|
|
||||||
// handler can identify which tag changed without an extra lookup.
|
var monItem = BuildMonitoredItem(spec, nodeId, intervalMs);
|
||||||
var item = new MonitoredItem(telemetry: null!, new MonitoredItemOptions
|
monItem.Notification += (mi, args) => OnMonitoredItemNotification(handle, mi, args);
|
||||||
{
|
subscription.AddItem(monItem);
|
||||||
DisplayName = fullRef,
|
|
||||||
StartNodeId = nodeId,
|
|
||||||
AttributeId = Attributes.Value,
|
|
||||||
MonitoringMode = MonitoringMode.Reporting,
|
|
||||||
SamplingInterval = intervalMs,
|
|
||||||
QueueSize = 1,
|
|
||||||
DiscardOldest = true,
|
|
||||||
})
|
|
||||||
{
|
|
||||||
Handle = fullRef,
|
|
||||||
};
|
|
||||||
item.Notification += (mi, args) => OnMonitoredItemNotification(handle, mi, args);
|
|
||||||
subscription.AddItem(item);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false);
|
try
|
||||||
|
{
|
||||||
|
await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (Opc.Ua.ServiceResultException sre)
|
||||||
|
{
|
||||||
|
// PercentDeadband requires the server to expose EURange on the variable; if
|
||||||
|
// it isn't set the server returns BadFilterNotAllowed during item creation.
|
||||||
|
// We swallow the exception here so other items in the batch still get created
|
||||||
|
// — per-item failure surfaces through MonitoredItem.Status.Error rather than
|
||||||
|
// tearing down the whole subscription.
|
||||||
|
if (sre.StatusCode != StatusCodes.BadFilterNotAllowed) throw;
|
||||||
|
}
|
||||||
_subscriptions[id] = new RemoteSubscription(subscription, handle);
|
_subscriptions[id] = new RemoteSubscription(subscription, handle);
|
||||||
}
|
}
|
||||||
finally { _gate.Release(); }
|
finally { _gate.Release(); }
|
||||||
@@ -915,6 +926,84 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
|||||||
return handle;
|
return handle;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Map a <see cref="MonitoredTagSpec"/> to a SDK <see cref="MonitoredItem"/> with the
|
||||||
|
/// per-tag knobs applied. Defaults match the original hard-coded values
|
||||||
|
/// (Reporting / SamplingInterval=publishInterval / QueueSize=1 / DiscardOldest=true)
|
||||||
|
/// so a spec with all knobs <c>null</c> behaves identically to the legacy path.
|
||||||
|
/// </summary>
|
||||||
|
internal static MonitoredItem BuildMonitoredItem(MonitoredTagSpec spec, NodeId nodeId, int defaultIntervalMs)
|
||||||
|
{
|
||||||
|
var sampling = spec.SamplingIntervalMs.HasValue ? (int)spec.SamplingIntervalMs.Value : defaultIntervalMs;
|
||||||
|
var queueSize = spec.QueueSize ?? 1u;
|
||||||
|
var discardOldest = spec.DiscardOldest ?? true;
|
||||||
|
var monitoringMode = spec.MonitoringMode is { } mm ? MapMonitoringMode(mm) : MonitoringMode.Reporting;
|
||||||
|
var filter = BuildDataChangeFilter(spec.DataChangeFilter);
|
||||||
|
|
||||||
|
var options = new MonitoredItemOptions
|
||||||
|
{
|
||||||
|
DisplayName = spec.TagName,
|
||||||
|
StartNodeId = nodeId,
|
||||||
|
AttributeId = Attributes.Value,
|
||||||
|
MonitoringMode = monitoringMode,
|
||||||
|
SamplingInterval = sampling,
|
||||||
|
QueueSize = queueSize,
|
||||||
|
DiscardOldest = discardOldest,
|
||||||
|
Filter = filter,
|
||||||
|
};
|
||||||
|
|
||||||
|
return new MonitoredItem(telemetry: null!, options)
|
||||||
|
{
|
||||||
|
// The tag string is routed through MonitoredItem.Handle so the Notification
|
||||||
|
// handler can identify which tag changed without an extra lookup.
|
||||||
|
Handle = spec.TagName,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Build the OPC UA <see cref="DataChangeFilter"/> from a <see cref="DataChangeFilterSpec"/>,
|
||||||
|
/// or return <c>null</c> if the caller didn't supply a filter. PercentDeadband requires
|
||||||
|
/// server-side EURange — if the server rejects with BadFilterNotAllowed, the caller's
|
||||||
|
/// <c>SubscribeAsync</c> swallows it so other items in the batch still get created.
|
||||||
|
/// </summary>
|
||||||
|
internal static DataChangeFilter? BuildDataChangeFilter(DataChangeFilterSpec? spec)
|
||||||
|
{
|
||||||
|
if (spec is null) return null;
|
||||||
|
return new DataChangeFilter
|
||||||
|
{
|
||||||
|
Trigger = MapTrigger(spec.Trigger),
|
||||||
|
DeadbandType = (uint)MapDeadbandType(spec.DeadbandType),
|
||||||
|
DeadbandValue = spec.DeadbandValue,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Map our SDK-free <see cref="SubscriptionMonitoringMode"/> to the OPC UA SDK's enum.</summary>
|
||||||
|
internal static MonitoringMode MapMonitoringMode(SubscriptionMonitoringMode mode) => mode switch
|
||||||
|
{
|
||||||
|
SubscriptionMonitoringMode.Disabled => MonitoringMode.Disabled,
|
||||||
|
SubscriptionMonitoringMode.Sampling => MonitoringMode.Sampling,
|
||||||
|
SubscriptionMonitoringMode.Reporting => MonitoringMode.Reporting,
|
||||||
|
_ => MonitoringMode.Reporting,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// <summary>Map our <see cref="Core.Abstractions.DataChangeTrigger"/> to the SDK enum.</summary>
|
||||||
|
internal static Opc.Ua.DataChangeTrigger MapTrigger(Core.Abstractions.DataChangeTrigger trigger) => trigger switch
|
||||||
|
{
|
||||||
|
Core.Abstractions.DataChangeTrigger.Status => Opc.Ua.DataChangeTrigger.Status,
|
||||||
|
Core.Abstractions.DataChangeTrigger.StatusValue => Opc.Ua.DataChangeTrigger.StatusValue,
|
||||||
|
Core.Abstractions.DataChangeTrigger.StatusValueTimestamp => Opc.Ua.DataChangeTrigger.StatusValueTimestamp,
|
||||||
|
_ => Opc.Ua.DataChangeTrigger.StatusValue,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// <summary>Map our <see cref="Core.Abstractions.DeadbandType"/> to the SDK enum.</summary>
|
||||||
|
internal static Opc.Ua.DeadbandType MapDeadbandType(Core.Abstractions.DeadbandType type) => type switch
|
||||||
|
{
|
||||||
|
Core.Abstractions.DeadbandType.None => Opc.Ua.DeadbandType.None,
|
||||||
|
Core.Abstractions.DeadbandType.Absolute => Opc.Ua.DeadbandType.Absolute,
|
||||||
|
Core.Abstractions.DeadbandType.Percent => Opc.Ua.DeadbandType.Percent,
|
||||||
|
_ => Opc.Ua.DeadbandType.None,
|
||||||
|
};
|
||||||
|
|
||||||
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
if (handle is not OpcUaSubscriptionHandle h) return;
|
if (handle is not OpcUaSubscriptionHandle h) return;
|
||||||
|
|||||||
@@ -0,0 +1,175 @@
|
|||||||
|
using Opc.Ua;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Unit tests for <see cref="MonitoredTagSpec"/> -> SDK <c>MonitoredItem</c> mapping.
|
||||||
|
/// Assertion-only — no live SDK session required, so the tests run on every CI without
|
||||||
|
/// a real OPC UA server fixture.
|
||||||
|
/// </summary>
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class OpcUaClientMonitoredTagSpecTests
|
||||||
|
{
|
||||||
|
private static readonly NodeId SampleNodeId = new("Demo", 2);
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void BuildMonitoredItem_with_all_defaults_matches_legacy_hard_coded_values()
|
||||||
|
{
|
||||||
|
// Spec with every per-tag knob null should behave identically to the legacy
|
||||||
|
// string-only SubscribeAsync path: Reporting / SamplingInterval=publishInterval /
|
||||||
|
// QueueSize=1 / DiscardOldest=true / no filter.
|
||||||
|
var spec = new MonitoredTagSpec("ns=2;s=Demo");
|
||||||
|
var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
|
||||||
|
|
||||||
|
item.SamplingInterval.ShouldBe(250);
|
||||||
|
item.QueueSize.ShouldBe(1u);
|
||||||
|
item.DiscardOldest.ShouldBeTrue();
|
||||||
|
item.MonitoringMode.ShouldBe(MonitoringMode.Reporting);
|
||||||
|
item.Filter.ShouldBeNull();
|
||||||
|
item.Handle.ShouldBe("ns=2;s=Demo",
|
||||||
|
"the tag string is routed through Handle so the Notification callback can identify the changed tag without re-parsing DisplayName");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void BuildMonitoredItem_applies_per_tag_sampling_interval_independent_of_publish_interval()
|
||||||
|
{
|
||||||
|
// Per-tag SamplingInterval lets the server sample faster than it publishes — useful
|
||||||
|
// for events that change between publish ticks. If the spec sets it explicitly, the
|
||||||
|
// mapping uses that value, not the publish-interval default.
|
||||||
|
var spec = new MonitoredTagSpec("ns=2;s=Fast", SamplingIntervalMs: 50);
|
||||||
|
var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 1000);
|
||||||
|
item.SamplingInterval.ShouldBe(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void BuildMonitoredItem_applies_queue_size_and_discard_oldest_overrides()
|
||||||
|
{
|
||||||
|
var spec = new MonitoredTagSpec("ns=2;s=DeepQueue", QueueSize: 100, DiscardOldest: false);
|
||||||
|
var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
|
||||||
|
item.QueueSize.ShouldBe(100u);
|
||||||
|
item.DiscardOldest.ShouldBeFalse(
|
||||||
|
"discard-oldest=false preserves earliest values — useful for audit-trail subscriptions where the first overflow sample is the most diagnostic");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Theory]
|
||||||
|
[InlineData(SubscriptionMonitoringMode.Disabled, MonitoringMode.Disabled)]
|
||||||
|
[InlineData(SubscriptionMonitoringMode.Sampling, MonitoringMode.Sampling)]
|
||||||
|
[InlineData(SubscriptionMonitoringMode.Reporting, MonitoringMode.Reporting)]
|
||||||
|
public void BuildMonitoredItem_maps_each_monitoring_mode(SubscriptionMonitoringMode input, MonitoringMode expected)
|
||||||
|
{
|
||||||
|
var spec = new MonitoredTagSpec("ns=2;s=Mode", MonitoringMode: input);
|
||||||
|
var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
|
||||||
|
item.MonitoringMode.ShouldBe(expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void BuildMonitoredItem_with_absolute_deadband_emits_DataChangeFilter()
|
||||||
|
{
|
||||||
|
var spec = new MonitoredTagSpec(
|
||||||
|
"ns=2;s=Analog",
|
||||||
|
DataChangeFilter: new DataChangeFilterSpec(
|
||||||
|
Core.Abstractions.DataChangeTrigger.StatusValue,
|
||||||
|
Core.Abstractions.DeadbandType.Absolute,
|
||||||
|
DeadbandValue: 0.5));
|
||||||
|
var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
|
||||||
|
var filter = item.Filter.ShouldBeOfType<DataChangeFilter>();
|
||||||
|
filter.Trigger.ShouldBe(Opc.Ua.DataChangeTrigger.StatusValue);
|
||||||
|
filter.DeadbandType.ShouldBe((uint)Opc.Ua.DeadbandType.Absolute);
|
||||||
|
filter.DeadbandValue.ShouldBe(0.5);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void BuildMonitoredItem_with_percent_deadband_emits_percent_filter()
|
||||||
|
{
|
||||||
|
// PercentDeadband is calculated server-side as a fraction of EURange; the driver
|
||||||
|
// emits the filter unconditionally and lets the server return BadFilterNotAllowed
|
||||||
|
// if EURange isn't set on the variable. SubscribeAsync's catch-block swallows that
|
||||||
|
// status so other items in the batch still get created.
|
||||||
|
var spec = new MonitoredTagSpec(
|
||||||
|
"ns=2;s=Pct",
|
||||||
|
DataChangeFilter: new DataChangeFilterSpec(
|
||||||
|
Core.Abstractions.DataChangeTrigger.StatusValueTimestamp,
|
||||||
|
Core.Abstractions.DeadbandType.Percent,
|
||||||
|
DeadbandValue: 5.0));
|
||||||
|
var item = OpcUaClientDriver.BuildMonitoredItem(spec, SampleNodeId, defaultIntervalMs: 250);
|
||||||
|
var filter = item.Filter.ShouldBeOfType<DataChangeFilter>();
|
||||||
|
filter.Trigger.ShouldBe(Opc.Ua.DataChangeTrigger.StatusValueTimestamp);
|
||||||
|
filter.DeadbandType.ShouldBe((uint)Opc.Ua.DeadbandType.Percent);
|
||||||
|
filter.DeadbandValue.ShouldBe(5.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Theory]
|
||||||
|
[InlineData(Core.Abstractions.DataChangeTrigger.Status, Opc.Ua.DataChangeTrigger.Status)]
|
||||||
|
[InlineData(Core.Abstractions.DataChangeTrigger.StatusValue, Opc.Ua.DataChangeTrigger.StatusValue)]
|
||||||
|
[InlineData(Core.Abstractions.DataChangeTrigger.StatusValueTimestamp, Opc.Ua.DataChangeTrigger.StatusValueTimestamp)]
|
||||||
|
public void MapTrigger_round_trips_each_enum_value(
|
||||||
|
Core.Abstractions.DataChangeTrigger input, Opc.Ua.DataChangeTrigger expected)
|
||||||
|
=> OpcUaClientDriver.MapTrigger(input).ShouldBe(expected);
|
||||||
|
|
||||||
|
[Theory]
|
||||||
|
[InlineData(Core.Abstractions.DeadbandType.None, Opc.Ua.DeadbandType.None)]
|
||||||
|
[InlineData(Core.Abstractions.DeadbandType.Absolute, Opc.Ua.DeadbandType.Absolute)]
|
||||||
|
[InlineData(Core.Abstractions.DeadbandType.Percent, Opc.Ua.DeadbandType.Percent)]
|
||||||
|
public void MapDeadbandType_round_trips_each_enum_value(
|
||||||
|
Core.Abstractions.DeadbandType input, Opc.Ua.DeadbandType expected)
|
||||||
|
=> OpcUaClientDriver.MapDeadbandType(input).ShouldBe(expected);
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DefaultInterfaceImplementation_routes_through_legacy_overload()
|
||||||
|
{
|
||||||
|
// ISubscribable's default interface impl of the per-tag overload delegates to the
|
||||||
|
// simple-string overload, ignoring per-tag knobs. Drivers that DON'T override the
|
||||||
|
// new overload (Modbus / S7 / Galaxy / TwinCAT / FOCAS / AbCip / AbLegacy) still
|
||||||
|
// accept MonitoredTagSpec lists and just pass through the tag names — back-compat
|
||||||
|
// for ISubscribable consumers.
|
||||||
|
var stub = new StubSubscribableDriver();
|
||||||
|
var specs = new[]
|
||||||
|
{
|
||||||
|
new MonitoredTagSpec("Tag1", SamplingIntervalMs: 50, QueueSize: 5),
|
||||||
|
new MonitoredTagSpec("Tag2", DataChangeFilter: new DataChangeFilterSpec(
|
||||||
|
Core.Abstractions.DataChangeTrigger.StatusValue,
|
||||||
|
Core.Abstractions.DeadbandType.Absolute,
|
||||||
|
1.0)),
|
||||||
|
};
|
||||||
|
|
||||||
|
ISubscribable iface = stub;
|
||||||
|
_ = await iface.SubscribeAsync(specs, TimeSpan.FromMilliseconds(250), TestContext.Current.CancellationToken);
|
||||||
|
|
||||||
|
stub.LastTagNames.ShouldBe(["Tag1", "Tag2"]);
|
||||||
|
stub.LastPublishingInterval.ShouldBe(TimeSpan.FromMilliseconds(250));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Test-double <see cref="ISubscribable"/> that records whatever the legacy
|
||||||
|
/// <c>SubscribeAsync(IReadOnlyList<string>, ...)</c> overload was called with.
|
||||||
|
/// Used to verify the default-impl per-tag overload routes correctly without needing
|
||||||
|
/// a real OPC UA session.
|
||||||
|
/// </summary>
|
||||||
|
private sealed class StubSubscribableDriver : ISubscribable
|
||||||
|
{
|
||||||
|
public IReadOnlyList<string>? LastTagNames { get; private set; }
|
||||||
|
public TimeSpan LastPublishingInterval { get; private set; }
|
||||||
|
|
||||||
|
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||||
|
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
LastTagNames = fullReferences;
|
||||||
|
LastPublishingInterval = publishingInterval;
|
||||||
|
return Task.FromResult<ISubscriptionHandle>(new StubHandle());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) => Task.CompletedTask;
|
||||||
|
|
||||||
|
#pragma warning disable CS0067 // event never used — the test only asserts the SubscribeAsync call routing
|
||||||
|
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||||
|
#pragma warning restore CS0067
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed record StubHandle() : ISubscriptionHandle
|
||||||
|
{
|
||||||
|
public string DiagnosticId => "stub";
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user