@@ -77,6 +77,50 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
/// <summary>Wired to <see cref="ISession.PublishError"/>; cached so we can unwire on reconnect/shutdown.</summary>
|
||||
private PublishErrorEventHandler? _publishErrorHandler;
|
||||
|
||||
/// <summary>
|
||||
/// Subscription that watches the upstream <c>Server</c> node (<c>i=2253</c>) for
|
||||
/// <c>BaseModelChangeEventType</c> / <c>GeneralModelChangeEventType</c> notifications.
|
||||
/// Created at the end of <see cref="InitializeAsync"/> when
|
||||
/// <see cref="OpcUaClientDriverOptions.WatchModelChanges"/> is <c>true</c>; null
|
||||
/// when the watch is disabled or before init runs.
|
||||
/// </summary>
|
||||
private Subscription? _modelChangeSubscription;
|
||||
|
||||
/// <summary>
|
||||
/// Debounce timer for upstream model-change events. Created lazily on first event
|
||||
/// arrival; reset (Change) on every subsequent event so a burst of N events triggers
|
||||
/// exactly one <c>ReinitializeAsync</c> after the last event in the window.
|
||||
/// </summary>
|
||||
private Timer? _modelChangeDebounceTimer;
|
||||
|
||||
/// <summary>
|
||||
/// Cached driver-config JSON snapshot from the most recent <see cref="InitializeAsync"/>.
|
||||
/// The debounce timer fire path passes this back into <see cref="ReinitializeAsync"/>
|
||||
/// so the re-import uses the same options the operator originally configured.
|
||||
/// </summary>
|
||||
private string? _lastConfigJson;
|
||||
|
||||
/// <summary>
|
||||
/// Test seam — count of debounced re-import invocations the driver has fired. Lets
|
||||
/// unit tests assert the coalescing window without spying on <see cref="ReinitializeAsync"/>.
|
||||
/// </summary>
|
||||
private long _modelChangeReimportCount;
|
||||
internal long ModelChangeReimportCountForTest => Interlocked.Read(ref _modelChangeReimportCount);
|
||||
|
||||
/// <summary>
|
||||
/// Test seam — fired before the actual re-import call so unit tests can assert "the
|
||||
/// driver decided to re-import N times" without standing up a full Initialize loop.
|
||||
/// When non-null, the handler runs <i>instead of</i> calling <see cref="ReinitializeAsync"/>.
|
||||
/// </summary>
|
||||
internal Func<CancellationToken, Task>? ModelChangeReimportHookForTest { get; set; }
|
||||
|
||||
/// <summary>
/// Test seam — injects a synthetic model-change event into the debounce path,
/// mirroring what the SDK's <c>MonitoredItem.Notification</c> wire-up does when a
/// real <c>BaseModelChangeEventType</c> event arrives.
/// </summary>
internal void InjectModelChangeForTest()
{
    OnModelChangeNotification();
}
|
||||
|
||||
/// <summary>Active OPC UA session. Null until <see cref="InitializeAsync"/> returns cleanly.</summary>
|
||||
internal ISession? Session { get; private set; }
|
||||
|
||||
@@ -125,6 +169,10 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
|
||||
{
|
||||
_health = new DriverHealth(DriverState.Initializing, null, null);
|
||||
// Snapshot the config JSON so the model-change debounce path can hand it back to
|
||||
// ReinitializeAsync without callers needing to re-pass it. Capture before the failover
|
||||
// sweep so a partial-init failure still has the JSON available for the next attempt.
|
||||
_lastConfigJson = driverConfigJson;
|
||||
try
|
||||
{
|
||||
var appConfig = await BuildApplicationConfigurationAsync(cancellationToken).ConfigureAwait(false);
|
||||
@@ -198,6 +246,23 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
_connectedEndpointUrl = connectedUrl;
|
||||
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
|
||||
TransitionTo(HostState.Running);
|
||||
|
||||
// Watch the upstream Server node for ModelChangeEvent notifications. Best-effort
|
||||
// — if the upstream doesn't expose the event types or rejects the EventFilter the
|
||||
// driver still functions for the existing capability surface. Init shouldn't fail
|
||||
// because the operator's upstream doesn't advertise topology change events.
|
||||
if (_options.WatchModelChanges)
|
||||
{
|
||||
try
|
||||
{
|
||||
await SubscribeModelChangesAsync(session, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// best-effort — silently degrade to no-watch; operators see this through
|
||||
// the absence of re-import on topology change rather than a hard init fail.
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -699,6 +764,19 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
}
|
||||
_alarmSubscriptions.Clear();
|
||||
|
||||
// Tear down the model-change subscription + dispose the debounce timer. A pending
|
||||
// debounce fire that races with shutdown is harmless — the timer callback null-checks
|
||||
// the session before doing any work, and ReinitializeAsync re-acquires _gate which
|
||||
// serializes with the caller of ShutdownAsync.
|
||||
if (_modelChangeSubscription is not null)
|
||||
{
|
||||
try { await _modelChangeSubscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
|
||||
catch { /* best-effort */ }
|
||||
_modelChangeSubscription = null;
|
||||
}
|
||||
try { _modelChangeDebounceTimer?.Dispose(); } catch { }
|
||||
_modelChangeDebounceTimer = null;
|
||||
|
||||
// Abort any in-flight reconnect attempts before touching the session — BeginReconnect's
|
||||
// retry loop holds a reference to the current session and would fight Session.CloseAsync
|
||||
// if left spinning.
|
||||
@@ -2199,6 +2277,159 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
_ => AlarmSeverity.Critical,
|
||||
};
|
||||
|
||||
// ---- ModelChangeEvent watch (PR-10) ----

/// <summary>
/// Stands up a dedicated <see cref="Subscription"/> on the upstream session that
/// monitors the <c>Server</c> node (<see cref="ObjectIds.Server"/> = <c>i=2253</c>)
/// for <c>BaseModelChangeEventType</c> and <c>GeneralModelChangeEventType</c>
/// notifications. Each arriving event feeds the debounce path
/// (<see cref="OpcUaClientDriverOptions.ModelChangeDebounce"/>), so a bulk topology
/// edit upstream collapses into a single re-import instead of N back-to-back ones.
/// </summary>
/// <remarks>
/// <para>
/// No <see cref="_gate"/> acquisition here: <see cref="InitializeAsync"/> is
/// single-threaded with respect to driver consumers, so nothing else can touch the
/// session until init returns.
/// </para>
/// <para>
/// The <see cref="EventFilter"/> selects only the standard <c>EventType</c> field —
/// the driver just needs "an event arrived", not its payload. Field-less filters are
/// spec-legal and keep wire chatter minimal.
/// </para>
/// </remarks>
private async Task SubscribeModelChangesAsync(ISession session, CancellationToken cancellationToken)
{
    var defaults = _options.Subscriptions;

    // 1s publish interval is plenty — the debounce window collapses bursts; the
    // upstream only has to advertise change events, not stream them quickly.
    var watch = new Subscription(telemetry: null!, new SubscriptionOptions
    {
        DisplayName = "opcua-modelchange-watch",
        PublishingInterval = 1000,
        KeepAliveCount = (uint)defaults.KeepAliveCount,
        LifetimeCount = defaults.LifetimeCount,
        MaxNotificationsPerPublish = defaults.MaxNotificationsPerPublish,
        PublishingEnabled = true,
        Priority = defaults.Priority,
        TimestampsToReturn = TimestampsToReturn.Both,
    });

    // One SelectClause (EventType) is enough for the handler to know "this is a
    // model-change event". Payload fields such as Changes[] are deliberately ignored:
    // the debounce path always re-imports the full upstream root anyway.
    var eventFilter = new EventFilter();
    eventFilter.SelectClauses.Add(new SimpleAttributeOperand
    {
        TypeDefinitionId = ObjectTypeIds.BaseEventType,
        BrowsePath = [new QualifiedName("EventType")],
        AttributeId = Attributes.Value,
    });

    // WhereClause: EventType OfType BaseModelChangeEventType. The spec makes
    // GeneralModelChangeEventType a subtype of BaseModelChangeEventType, so one
    // OfType element covers both. Omitting the WhereClause would deliver every event
    // the Server node fires (audit, condition, …) and spam the debounce path.
    eventFilter.WhereClause = new ContentFilter();
    var typeOperand = new LiteralOperand { Value = new Variant(ObjectTypeIds.BaseModelChangeEventType) };
    eventFilter.WhereClause.Push(FilterOperator.OfType, typeOperand);

    session.AddSubscription(watch);
    await watch.CreateAsync(cancellationToken).ConfigureAwait(false);

    var monitoredItem = new MonitoredItem(telemetry: null!, new MonitoredItemOptions
    {
        DisplayName = "Server/ModelChangeEvents",
        StartNodeId = ObjectIds.Server,
        AttributeId = Attributes.EventNotifier,
        MonitoringMode = MonitoringMode.Reporting,
        QueueSize = 100,
        DiscardOldest = true,
        Filter = eventFilter,
    });
    monitoredItem.Notification += (_, _) => OnModelChangeNotification();
    watch.AddItem(monitoredItem);
    await watch.CreateItemsAsync(cancellationToken).ConfigureAwait(false);

    _modelChangeSubscription = watch;
}
|
||||
|
||||
/// <summary>
/// Notification entry-point for the upstream ModelChangeEvent watch. Starts the
/// debounce timer (or resets it if one is already pending) so that a burst of N
/// events triggers exactly one re-import after the window elapses.
/// </summary>
private void OnModelChangeNotification()
{
    // Lazy-create the timer on first event so the cost is zero for upstream servers
    // that never advertise topology change events. Timer.Change resets the dueTime
    // on subsequent calls — that's the entire debounce semantics.
    var window = (int)_options.ModelChangeDebounce.TotalMilliseconds;
    if (window < 0) window = 0;

    // Single-instance timer per driver; use lock for create-or-reset transition since
    // the ISession.Notification path is multi-threaded inside the SDK.
    lock (_probeLock)
    {
        if (_modelChangeDebounceTimer is null)
        {
            _modelChangeDebounceTimer = new Timer(
                callback: _ => _ = OnDebounceFiredAsync(),
                state: null,
                dueTime: window,
                period: System.Threading.Timeout.Infinite);
        }
        else
        {
            // ShutdownAsync disposes the timer WITHOUT taking _probeLock, so a
            // notification racing shutdown can see a non-null-but-disposed timer
            // here; Timer.Change would then throw ObjectDisposedException on the
            // SDK's notification thread. Swallow that specific failure — shutdown
            // won the race and the pending re-import is intentionally abandoned.
            try
            {
                _modelChangeDebounceTimer.Change(window, System.Threading.Timeout.Infinite);
            }
            catch (ObjectDisposedException)
            {
                // Benign: driver is shutting down.
            }
        }
    }
}
|
||||
|
||||
/// <summary>
/// Runs when the debounce window elapses with no further events. Invokes the
/// re-import path — the test hook if one is installed, otherwise
/// <see cref="ReinitializeAsync"/> — under the same <see cref="_gate"/>
/// serialization as the rest of the driver, so the re-import never races an
/// in-flight read / write / browse.
/// </summary>
private async Task OnDebounceFiredAsync()
{
    Interlocked.Increment(ref _modelChangeReimportCount);

    // Test hook bypass — unit tests count debounce fires without standing up a full
    // ReinitializeAsync loop. The hook still serializes on _gate, so a test asserting
    // "no parallel re-imports" observes the same invariant the production path gives.
    if (ModelChangeReimportHookForTest is { } hook)
    {
        await _gate.WaitAsync(CancellationToken.None).ConfigureAwait(false);
        try
        {
            await hook(CancellationToken.None).ConfigureAwait(false);
        }
        catch
        {
            // best-effort
        }
        finally
        {
            _gate.Release();
        }

        return;
    }

    // No config snapshot yet means InitializeAsync hasn't run — nothing to re-import.
    if (_lastConfigJson is not { } configJson)
    {
        return;
    }

    // Re-import via ReinitializeAsync (internally ShutdownAsync + InitializeAsync;
    // both take _gate sub-paths, so gated callers see a brief browse-gap of roughly
    // one DiscoverAsync but no data corruption). Failure is best-effort: the next
    // ModelChangeEvent retries, and the keep-alive watchdog covers permanent
    // upstream loss.
    try
    {
        await ReinitializeAsync(configJson, CancellationToken.None).ConfigureAwait(false);
    }
    catch
    {
        // Swallowed — operators see the failure via DriverHealth + diagnostics; the
        // next event re-attempts.
    }
}
|
||||
|
||||
/// <summary>Pairs an upstream alarm <see cref="Subscription"/> with the driver-side <see cref="OpcUaAlarmSubscriptionHandle"/> returned to the subscriber.</summary>
private sealed record RemoteAlarmSubscription(Subscription Subscription, OpcUaAlarmSubscriptionHandle Handle);
|
||||
|
||||
private sealed record OpcUaAlarmSubscriptionHandle(long Id) : IAlarmSubscriptionHandle
|
||||
|
||||
Reference in New Issue
Block a user