Auto: opcuaclient-4 — diagnostics counters

Per-driver counters surfaced via DriverHealth.Diagnostics for the
driver-diagnostics RPC. New OpcUaClientDiagnostics tracks
PublishRequestCount, NotificationCount, NotificationsPerSecond (5s-half-life
EWMA), MissingPublishRequestCount, DroppedNotificationCount,
SessionResetCount and LastReconnectUtcTicks via Interlocked on the hot path.

DriverHealth gains an optional IReadOnlyDictionary<string,double>?
Diagnostics parameter (defaulted null for back-compat with the seven other
drivers' constructors). OpcUaClientDriver wires Session.Notification +
Session.PublishError on connect and on reconnect-complete (recording a
session-reset there); GetHealth snapshots the counters on every poll so the
RPC sees fresh values without a tick source.

Tests: 11 new OpcUaClientDiagnosticsTests cover counter increments, EWMA
convergence, snapshot shape, GetHealth integration, and DriverHealth
back-compat. Full OpcUaClient.Tests 115/115 green.

Closes #276

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-25 15:53:57 -04:00
parent a04ba2af7a
commit bb1ab47b68
4 changed files with 418 additions and 2 deletions

View File

@@ -58,6 +58,23 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
private readonly OpcUaClientDriverOptions _options = options;
private readonly SemaphoreSlim _gate = new(1, 1);
/// <summary>
/// Per-driver diagnostic counters (publish/notification rates, missing-publish,
/// dropped-notification, session-reset). Surfaced through
/// <see cref="DriverHealth.Diagnostics"/> for the <c>driver-diagnostics</c> RPC.
/// Hot-path increments use <see cref="Interlocked"/>; the read path snapshots.
/// </summary>
private readonly OpcUaClientDiagnostics _diagnostics = new();
/// <summary>Test seam — exposes the live counters for unit tests.</summary>
internal OpcUaClientDiagnostics DiagnosticsForTest => _diagnostics;
/// <summary>Wired to <see cref="ISession.Notification"/> in <see cref="WireSessionDiagnostics"/>; cached so we can unwire in <see cref="ShutdownAsync"/> + on reconnect.</summary>
private NotificationEventHandler? _notificationHandler;
/// <summary>Wired to <see cref="ISession.PublishError"/>; cached so we can unwire on reconnect/shutdown.</summary>
private PublishErrorEventHandler? _publishErrorHandler;
/// <summary>Active OPC UA session. Null until <see cref="InitializeAsync"/> returns cleanly.</summary>
internal ISession? Session { get; private set; }
@@ -151,6 +168,8 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
_keepAliveHandler = OnKeepAlive;
session.KeepAlive += _keepAliveHandler;
WireSessionDiagnostics(session);
Session = session;
_connectedEndpointUrl = connectedUrl;
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
@@ -447,6 +466,8 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
}
_keepAliveHandler = null;
UnwireSessionDiagnostics(Session);
try { if (Session is Session s) await s.CloseAsync(cancellationToken).ConfigureAwait(false); }
catch { /* best-effort */ }
try { Session?.Dispose(); } catch { }
@@ -458,7 +479,14 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
}
public DriverHealth GetHealth() => _health;
public DriverHealth GetHealth()
{
// Snapshot the counters into the optional Diagnostics dictionary on every poll —
// the RPC reads through GetHealth so we can't lazy-cache without a tick source.
// The snapshot is O(7) so the per-poll cost is negligible compared to the RPC plumbing.
var h = _health;
return new DriverHealth(h.State, h.LastSuccessfulRead, h.LastError, _diagnostics.Snapshot());
}
public long GetMemoryFootprint() => 0;
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
@@ -1562,6 +1590,16 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
newSession.KeepAlive += _keepAliveHandler;
}
// Move the diagnostic event hooks (Notification + PublishError) onto the new
// session as well so counters keep flowing post-failover. Record this as a
// session-reset for the operator dashboard.
UnwireSessionDiagnostics(oldSession);
if (newSession is not null)
{
WireSessionDiagnostics(newSession);
_diagnostics.RecordSessionReset(DateTime.UtcNow);
}
Session = newSession;
// Drop cached OperationLimits so the next batch op refetches against the (potentially
// re-redeployed) upstream server. A zero-cost guard against a server whose published
@@ -1593,6 +1631,71 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState));
}
/// <summary>
/// Wire the diagnostic counters onto the supplied session — every publish-response
/// notification increments <c>NotificationCount</c> + samples the EWMA;
/// <see cref="ISession.PublishError"/> distinguishes missing-publish vs other publish
/// faults so operators can see whether the upstream is starving the client of publish
/// slots vs. failing notifications outright.
/// </summary>
private void WireSessionDiagnostics(ISession session)
{
_notificationHandler = OnSessionNotification;
_publishErrorHandler = OnSessionPublishError;
session.Notification += _notificationHandler;
session.PublishError += _publishErrorHandler;
}
private void UnwireSessionDiagnostics(ISession? session)
{
if (session is null) return;
if (_notificationHandler is not null)
{
try { session.Notification -= _notificationHandler; } catch { }
}
if (_publishErrorHandler is not null)
{
try { session.PublishError -= _publishErrorHandler; } catch { }
}
_notificationHandler = null;
_publishErrorHandler = null;
}
private void OnSessionNotification(ISession session, NotificationEventArgs e)
{
// Each publish response carries one NotificationMessage with N data-change /
// event notifications. Track both cardinalities: PublishRequestCount counts
// server publish responses delivered to us; NotificationCount counts the
// individual MonitoredItem changes inside them. The difference matters when
// diagnosing "many publishes, few changes" vs "few publishes, large bursts".
_diagnostics.IncrementPublishRequest();
var msg = e.NotificationMessage;
if (msg?.NotificationData is { Count: > 0 } data)
{
for (var i = 0; i < data.Count; i++)
{
_diagnostics.RecordNotification();
}
}
}
private void OnSessionPublishError(ISession session, PublishErrorEventArgs e)
{
// BadNoSubscription / BadSequenceNumberUnknown / BadMessageNotAvailable all surface
// as "the server expected to publish but couldn't" — bucket them as missing-publish
// for the operator. Other status codes (timeout, network) are dropped notifications.
var sc = e.Status?.StatusCode;
if (sc.HasValue && IsMissingPublishStatus(sc.Value))
_diagnostics.IncrementMissingPublishRequest();
else
_diagnostics.IncrementDroppedNotification();
}
private static bool IsMissingPublishStatus(StatusCode sc) =>
sc.Code == StatusCodes.BadNoSubscription
|| sc.Code == StatusCodes.BadSequenceNumberUnknown
|| sc.Code == StatusCodes.BadMessageNotAvailable;
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
public async ValueTask DisposeAsync()