From bb1ab47b68e471f2aa74f2f16994bd6bb6c92687 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 25 Apr 2026 15:53:57 -0400 Subject: [PATCH] =?UTF-8?q?Auto:=20opcuaclient-4=20=E2=80=94=20diagnostics?= =?UTF-8?q?=20counters?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per-driver counters surfaced via DriverHealth.Diagnostics for the driver-diagnostics RPC. New OpcUaClientDiagnostics tracks PublishRequestCount, NotificationCount, NotificationsPerSecond (5s-half-life EWMA), MissingPublishRequestCount, DroppedNotificationCount, SessionResetCount and LastReconnectUtcTicks via Interlocked on the hot path. DriverHealth gains an optional IReadOnlyDictionary? Diagnostics parameter (defaulted null for back-compat with the seven other drivers' constructors). OpcUaClientDriver wires Session.Notification + Session.PublishError on connect and on reconnect-complete (recording a session-reset there); GetHealth snapshots the counters on every poll so the RPC sees fresh values without a tick source. Tests: 11 new OpcUaClientDiagnosticsTests cover counter increments, EWMA convergence, snapshot shape, GetHealth integration, and DriverHealth back-compat. Full OpcUaClient.Tests 115/115 green. Closes #276 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../DriverHealth.cs | 18 +- .../OpcUaClientDiagnostics.cs | 134 ++++++++++++++ .../OpcUaClientDriver.cs | 105 ++++++++++- .../OpcUaClientDiagnosticsTests.cs | 163 ++++++++++++++++++ 4 files changed, 418 insertions(+), 2 deletions(-) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDiagnostics.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientDiagnosticsTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverHealth.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverHealth.cs index 4ad1b3a..830a5c5 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverHealth.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverHealth.cs @@ -7,10 +7,26 @@ namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions; /// Current driver-instance state. /// Timestamp of the most recent successful equipment read; null if never. /// Most recent error message; null when state is Healthy. +/// +/// Optional driver-attributable counters/metrics surfaced for the driver-diagnostics +/// RPC (introduced for Modbus task #154). Drivers populate the dictionary with stable, +/// well-known keys (e.g. PublishRequestCount, NotificationsPerSecond); +/// Core treats it as opaque metadata. Defaulted to an empty read-only dictionary so +/// existing drivers and call-sites that don't construct this field stay back-compat. +/// public sealed record DriverHealth( DriverState State, DateTime? LastSuccessfulRead, - string? LastError); + string? LastError, + IReadOnlyDictionary? Diagnostics = null) +{ + /// Driver-attributable counters, empty when the driver doesn't surface any. + public IReadOnlyDictionary DiagnosticsOrEmpty + => Diagnostics ?? EmptyDiagnostics; + + private static readonly IReadOnlyDictionary EmptyDiagnostics + = new Dictionary(0); +} /// Driver-instance lifecycle state. public enum DriverState diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDiagnostics.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDiagnostics.cs new file mode 100644 index 0000000..2b4db85 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDiagnostics.cs @@ -0,0 +1,134 @@ +using System.Collections.Generic; +using System.Threading; + +namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient; + +/// +/// Per-driver counters surfaced via +/// for the driver-diagnostics RPC (task #276). Hot-path increments use +/// so they're lock-free; the read path snapshots into a +/// keyed by stable counter names. +/// +/// +/// The counters are operational metrics, not config — they reset to zero when the +/// driver instance is recreated (Reinitialize tear-down + rebuild) and there is no +/// persistence across process restarts. NotificationsPerSecond is a simple decay-EWMA +/// so a quiet subscription doesn't latch the value at the last burst rate. +/// +internal sealed class OpcUaClientDiagnostics +{ + // ---- Hot-path counters (Interlocked) ---- + + private long _publishRequestCount; + private long _notificationCount; + private long _missingPublishRequestCount; + private long _droppedNotificationCount; + private long _sessionResetCount; + + // ---- EWMA state for NotificationsPerSecond ---- + // + // Use ticks (long) for the timestamp so we can swap atomically. The rate is a double + // updated under a tight lock — the EWMA arithmetic (load, blend, store) isn't naturally + // atomic on doubles, and the spinlock is held only for arithmetic so contention is + // bounded. A subscription firing at 10 kHz with one driver instance is dominated by + // the SDK's notification path, not this lock. + private readonly object _ewmaLock = new(); + private double _notificationsPerSecond; + private long _lastNotificationTicks; + + /// Half-life ~5 seconds — recent activity dominates but a paused subscription decays toward zero. + private static readonly TimeSpan EwmaHalfLife = TimeSpan.FromSeconds(5); + + // ---- Reconnect state (lock-free, single-writer in OnReconnectComplete) ---- + private long _lastReconnectUtcTicks; + + public long PublishRequestCount => Interlocked.Read(ref _publishRequestCount); + public long NotificationCount => Interlocked.Read(ref _notificationCount); + public long MissingPublishRequestCount => Interlocked.Read(ref _missingPublishRequestCount); + public long DroppedNotificationCount => Interlocked.Read(ref _droppedNotificationCount); + public long SessionResetCount => Interlocked.Read(ref _sessionResetCount); + + public DateTime? LastReconnectUtc + { + get + { + var ticks = Interlocked.Read(ref _lastReconnectUtcTicks); + return ticks == 0 ? null : new DateTime(ticks, DateTimeKind.Utc); + } + } + + public double NotificationsPerSecond + { + get { lock (_ewmaLock) return _notificationsPerSecond; } + } + + public void IncrementPublishRequest() => Interlocked.Increment(ref _publishRequestCount); + + public void IncrementMissingPublishRequest() => Interlocked.Increment(ref _missingPublishRequestCount); + + public void IncrementDroppedNotification() => Interlocked.Increment(ref _droppedNotificationCount); + + /// Records one delivered notification (any monitored item) + folds the inter-arrival into the EWMA rate. + public void RecordNotification() => RecordNotification(DateTime.UtcNow); + + internal void RecordNotification(DateTime nowUtc) + { + Interlocked.Increment(ref _notificationCount); + + // EWMA over instantaneous rate. instRate = 1 / dt (events per second since last sample). + // Decay factor a = 2^(-dt/halfLife) puts a five-second window on the smoothing — recent + // bursts win, idle periods bleed back to zero. + var nowTicks = nowUtc.Ticks; + lock (_ewmaLock) + { + if (_lastNotificationTicks == 0) + { + _lastNotificationTicks = nowTicks; + // First sample: seed at 0 — we don't know the prior rate. The next sample + // produces a real instRate. + return; + } + var dtTicks = nowTicks - _lastNotificationTicks; + if (dtTicks <= 0) + { + // Same-tick collisions on bursts: treat as no time elapsed for rate purposes + // (count was already incremented above) so we don't divide by zero or feed + // an absurd instRate spike. + return; + } + var dtSeconds = (double)dtTicks / TimeSpan.TicksPerSecond; + var instRate = 1.0 / dtSeconds; + var alpha = System.Math.Pow(0.5, dtSeconds / EwmaHalfLife.TotalSeconds); + _notificationsPerSecond = (alpha * _notificationsPerSecond) + ((1.0 - alpha) * instRate); + _lastNotificationTicks = nowTicks; + } + } + + public void RecordSessionReset(DateTime nowUtc) + { + Interlocked.Increment(ref _sessionResetCount); + Interlocked.Exchange(ref _lastReconnectUtcTicks, nowUtc.Ticks); + } + + /// + /// Snapshot the counters into the dictionary shape + /// surfaces. Numeric-only (so the RPC can render generically); LastReconnectUtc is + /// emitted as ticks to keep the value type uniform. + /// + public IReadOnlyDictionary Snapshot() + { + var dict = new Dictionary(7, System.StringComparer.Ordinal) + { + ["PublishRequestCount"] = PublishRequestCount, + ["NotificationCount"] = NotificationCount, + ["NotificationsPerSecond"] = NotificationsPerSecond, + ["MissingPublishRequestCount"] = MissingPublishRequestCount, + ["DroppedNotificationCount"] = DroppedNotificationCount, + ["SessionResetCount"] = SessionResetCount, + }; + var last = LastReconnectUtc; + if (last is not null) + dict["LastReconnectUtcTicks"] = last.Value.Ticks; + return dict; + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs index cc89175..992ee2a 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs @@ -58,6 +58,23 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d private readonly OpcUaClientDriverOptions _options = options; private readonly SemaphoreSlim _gate = new(1, 1); + /// + /// Per-driver diagnostic counters (publish/notification rates, missing-publish, + /// dropped-notification, session-reset). Surfaced through + /// for the driver-diagnostics RPC. + /// Hot-path increments use ; the read path snapshots. + /// + private readonly OpcUaClientDiagnostics _diagnostics = new(); + + /// Test seam — exposes the live counters for unit tests. + internal OpcUaClientDiagnostics DiagnosticsForTest => _diagnostics; + + /// Wired to in ; cached so we can unwire in + on reconnect. + private NotificationEventHandler? _notificationHandler; + + /// Wired to ; cached so we can unwire on reconnect/shutdown. + private PublishErrorEventHandler? _publishErrorHandler; + /// Active OPC UA session. Null until returns cleanly. internal ISession? Session { get; private set; } @@ -151,6 +168,8 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d _keepAliveHandler = OnKeepAlive; session.KeepAlive += _keepAliveHandler; + WireSessionDiagnostics(session); + Session = session; _connectedEndpointUrl = connectedUrl; _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null); @@ -447,6 +466,8 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d } _keepAliveHandler = null; + UnwireSessionDiagnostics(Session); + try { if (Session is Session s) await s.CloseAsync(cancellationToken).ConfigureAwait(false); } catch { /* best-effort */ } try { Session?.Dispose(); } catch { } @@ -458,7 +479,14 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null); } - public DriverHealth GetHealth() => _health; + public DriverHealth GetHealth() + { + // Snapshot the counters into the optional Diagnostics dictionary on every poll — + // the RPC reads through GetHealth so we can't lazy-cache without a tick source. + // The snapshot is O(7) so the per-poll cost is negligible compared to the RPC plumbing. + var h = _health; + return new DriverHealth(h.State, h.LastSuccessfulRead, h.LastError, _diagnostics.Snapshot()); + } public long GetMemoryFootprint() => 0; public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask; @@ -1562,6 +1590,16 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d newSession.KeepAlive += _keepAliveHandler; } + // Move the diagnostic event hooks (Notification + PublishError) onto the new + // session as well so counters keep flowing post-failover. Record this as a + // session-reset for the operator dashboard. + UnwireSessionDiagnostics(oldSession); + if (newSession is not null) + { + WireSessionDiagnostics(newSession); + _diagnostics.RecordSessionReset(DateTime.UtcNow); + } + Session = newSession; // Drop cached OperationLimits so the next batch op refetches against the (potentially // re-redeployed) upstream server. A zero-cost guard against a server whose published @@ -1593,6 +1631,71 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState)); } + /// + /// Wire the diagnostic counters onto the supplied session — every publish-response + /// notification increments NotificationCount + samples the EWMA; + /// distinguishes missing-publish vs other publish + /// faults so operators can see whether the upstream is starving the client of publish + /// slots vs. failing notifications outright. + /// + private void WireSessionDiagnostics(ISession session) + { + _notificationHandler = OnSessionNotification; + _publishErrorHandler = OnSessionPublishError; + session.Notification += _notificationHandler; + session.PublishError += _publishErrorHandler; + } + + private void UnwireSessionDiagnostics(ISession? session) + { + if (session is null) return; + if (_notificationHandler is not null) + { + try { session.Notification -= _notificationHandler; } catch { } + } + if (_publishErrorHandler is not null) + { + try { session.PublishError -= _publishErrorHandler; } catch { } + } + _notificationHandler = null; + _publishErrorHandler = null; + } + + private void OnSessionNotification(ISession session, NotificationEventArgs e) + { + // Each publish response carries one NotificationMessage with N data-change / + // event notifications. Track both cardinalities: PublishRequestCount counts + // server publish responses delivered to us; NotificationCount counts the + // individual MonitoredItem changes inside them. The difference matters when + // diagnosing "many publishes, few changes" vs "few publishes, large bursts". + _diagnostics.IncrementPublishRequest(); + var msg = e.NotificationMessage; + if (msg?.NotificationData is { Count: > 0 } data) + { + for (var i = 0; i < data.Count; i++) + { + _diagnostics.RecordNotification(); + } + } + } + + private void OnSessionPublishError(ISession session, PublishErrorEventArgs e) + { + // BadNoSubscription / BadSequenceNumberUnknown / BadMessageNotAvailable all surface + // as "the server expected to publish but couldn't" — bucket them as missing-publish + // for the operator. Other status codes (timeout, network) are dropped notifications. + var sc = e.Status?.StatusCode; + if (sc.HasValue && IsMissingPublishStatus(sc.Value)) + _diagnostics.IncrementMissingPublishRequest(); + else + _diagnostics.IncrementDroppedNotification(); + } + + private static bool IsMissingPublishStatus(StatusCode sc) => + sc.Code == StatusCodes.BadNoSubscription + || sc.Code == StatusCodes.BadSequenceNumberUnknown + || sc.Code == StatusCodes.BadMessageNotAvailable; + public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); public async ValueTask DisposeAsync() diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientDiagnosticsTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientDiagnosticsTests.cs new file mode 100644 index 0000000..b1d6448 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientDiagnosticsTests.cs @@ -0,0 +1,163 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests; + +/// +/// Unit tests for the per-driver diagnostic counters surfaced via +/// for the driver-diagnostics RPC +/// (task #276). Counters are exercised directly through the internal helper rather +/// than via a live SDK ISession because the SDK requires a connected upstream +/// to publish events and we want unit-level coverage of the math + the snapshot shape. +/// +[Trait("Category", "Unit")] +public sealed class OpcUaClientDiagnosticsTests +{ + [Fact] + public void Counters_default_to_zero() + { + var d = new OpcUaClientDiagnostics(); + d.PublishRequestCount.ShouldBe(0); + d.NotificationCount.ShouldBe(0); + d.NotificationsPerSecond.ShouldBe(0); + d.MissingPublishRequestCount.ShouldBe(0); + d.DroppedNotificationCount.ShouldBe(0); + d.SessionResetCount.ShouldBe(0); + d.LastReconnectUtc.ShouldBeNull(); + } + + [Fact] + public void IncrementPublishRequest_bumps_total() + { + var d = new OpcUaClientDiagnostics(); + d.IncrementPublishRequest(); + d.IncrementPublishRequest(); + d.IncrementPublishRequest(); + d.PublishRequestCount.ShouldBe(3); + } + + [Fact] + public void IncrementMissingPublishRequest_bumps_total() + { + var d = new OpcUaClientDiagnostics(); + d.IncrementMissingPublishRequest(); + d.IncrementMissingPublishRequest(); + d.MissingPublishRequestCount.ShouldBe(2); + } + + [Fact] + public void IncrementDroppedNotification_bumps_total() + { + var d = new OpcUaClientDiagnostics(); + d.IncrementDroppedNotification(); + d.DroppedNotificationCount.ShouldBe(1); + } + + [Fact] + public void RecordNotification_grows_count_and_then_rate() + { + var d = new OpcUaClientDiagnostics(); + var t0 = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc); + // First sample seeds the EWMA — rate stays 0 until we have a delta. + d.RecordNotification(t0); + d.NotificationCount.ShouldBe(1); + d.NotificationsPerSecond.ShouldBe(0); + + // 1 Hz steady state: 30 samples spaced 1s apart converge toward 1/s. With 5s half-life + // and alpha=0.5^(1/5)≈0.871, the EWMA approaches 1 - alpha^N — after 30 samples that's + // 1 - 0.871^30 ≈ 0.984. + for (var i = 1; i <= 30; i++) + d.RecordNotification(t0.AddSeconds(i)); + + d.NotificationCount.ShouldBe(31); + d.NotificationsPerSecond.ShouldBeInRange(0.95, 1.05, "EWMA at 5s half-life converges to ~1Hz after 30 samples"); + } + + [Fact] + public void RecordSessionReset_bumps_count_and_sets_last_reconnect() + { + var d = new OpcUaClientDiagnostics(); + var t = new DateTime(2026, 4, 25, 12, 34, 56, DateTimeKind.Utc); + d.RecordSessionReset(t); + d.SessionResetCount.ShouldBe(1); + d.LastReconnectUtc.ShouldBe(t); + + // Second reset overwrites timestamp + bumps count. + var t2 = t.AddMinutes(5); + d.RecordSessionReset(t2); + d.SessionResetCount.ShouldBe(2); + d.LastReconnectUtc.ShouldBe(t2); + } + + [Fact] + public void Snapshot_emits_well_known_keys() + { + var d = new OpcUaClientDiagnostics(); + d.IncrementPublishRequest(); + d.RecordNotification(new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc)); + d.IncrementMissingPublishRequest(); + d.IncrementDroppedNotification(); + d.RecordSessionReset(new DateTime(2026, 4, 25, 0, 0, 0, DateTimeKind.Utc)); + + var snap = d.Snapshot(); + + snap.ShouldContainKey("PublishRequestCount"); + snap["PublishRequestCount"].ShouldBe(1); + snap.ShouldContainKey("NotificationCount"); + snap["NotificationCount"].ShouldBe(1); + snap.ShouldContainKey("NotificationsPerSecond"); + snap.ShouldContainKey("MissingPublishRequestCount"); + snap["MissingPublishRequestCount"].ShouldBe(1); + snap.ShouldContainKey("DroppedNotificationCount"); + snap["DroppedNotificationCount"].ShouldBe(1); + snap.ShouldContainKey("SessionResetCount"); + snap["SessionResetCount"].ShouldBe(1); + snap.ShouldContainKey("LastReconnectUtcTicks"); + } + + [Fact] + public void Snapshot_omits_LastReconnectUtcTicks_when_no_reset_recorded() + { + var d = new OpcUaClientDiagnostics(); + d.Snapshot().ShouldNotContainKey("LastReconnectUtcTicks"); + } + + [Fact] + public void Driver_GetHealth_includes_diagnostics_dictionary() + { + // GetHealth must expose the snapshot to the RPC consumer even before any session + // has been opened — operators call it during startup to check counters baseline. + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "diag-test"); + var health = drv.GetHealth(); + health.Diagnostics.ShouldNotBeNull(); + health.Diagnostics!.ShouldContainKey("PublishRequestCount"); + health.Diagnostics["PublishRequestCount"].ShouldBe(0); + health.Diagnostics.ShouldContainKey("NotificationCount"); + health.Diagnostics.ShouldContainKey("SessionResetCount"); + } + + [Fact] + public void Driver_health_diagnostics_reflect_internal_counters_after_increment() + { + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "diag-test-2"); + + // Drive a counter through the test seam to prove the GetHealth snapshot is live, + // not a one-shot at construction. + drv.DiagnosticsForTest.IncrementPublishRequest(); + drv.DiagnosticsForTest.IncrementPublishRequest(); + + var health = drv.GetHealth(); + health.Diagnostics!["PublishRequestCount"].ShouldBe(2); + } + + [Fact] + public void DriverHealth_default_diagnostics_is_null_but_DiagnosticsOrEmpty_is_empty() + { + // Back-compat: pre-existing call sites that construct DriverHealth with the + // 3-arg overload must keep working — the 4th param defaults to null. + var h = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null); + h.Diagnostics.ShouldBeNull(); + h.DiagnosticsOrEmpty.ShouldBeEmpty(); + } +}