diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverHealth.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverHealth.cs
index 4ad1b3a..830a5c5 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverHealth.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverHealth.cs
@@ -7,10 +7,26 @@ namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
/// Current driver-instance state.
/// Timestamp of the most recent successful equipment read; null if never.
/// Most recent error message; null when state is Healthy.
+///
+/// Optional driver-attributable counters/metrics surfaced for the driver-diagnostics
+/// RPC (introduced for Modbus task #154). Drivers populate the dictionary with stable,
+/// well-known keys (e.g. PublishRequestCount, NotificationsPerSecond);
+/// Core treats it as opaque metadata. Defaulted to an empty read-only dictionary so
+/// existing drivers and call-sites that don't construct this field stay back-compat.
+///
public sealed record DriverHealth(
DriverState State,
DateTime? LastSuccessfulRead,
- string? LastError);
+ string? LastError,
+ IReadOnlyDictionary? Diagnostics = null)
+{
+ /// Driver-attributable counters, empty when the driver doesn't surface any.
+ public IReadOnlyDictionary DiagnosticsOrEmpty
+ => Diagnostics ?? EmptyDiagnostics;
+
+ private static readonly IReadOnlyDictionary EmptyDiagnostics
+ = new Dictionary(0);
+}
/// Driver-instance lifecycle state.
public enum DriverState
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDiagnostics.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDiagnostics.cs
new file mode 100644
index 0000000..2b4db85
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDiagnostics.cs
@@ -0,0 +1,134 @@
+using System.Collections.Generic;
+using System.Threading;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient;
+
+///
+/// Per-driver counters surfaced via
+/// for the driver-diagnostics RPC (task #276). Hot-path increments use
+/// so they're lock-free; the read path snapshots into a
+/// keyed by stable counter names.
+///
+///
+/// The counters are operational metrics, not config — they reset to zero when the
+/// driver instance is recreated (Reinitialize tear-down + rebuild) and there is no
+/// persistence across process restarts. NotificationsPerSecond is a simple decay-EWMA
+/// so a quiet subscription doesn't latch the value at the last burst rate.
+///
+internal sealed class OpcUaClientDiagnostics
+{
+ // ---- Hot-path counters (Interlocked) ----
+
+ private long _publishRequestCount;
+ private long _notificationCount;
+ private long _missingPublishRequestCount;
+ private long _droppedNotificationCount;
+ private long _sessionResetCount;
+
+ // ---- EWMA state for NotificationsPerSecond ----
+ //
+ // Use ticks (long) for the timestamp so we can swap atomically. The rate is a double
+ // updated under a tight lock — the EWMA arithmetic (load, blend, store) isn't naturally
+ // atomic on doubles, and the spinlock is held only for arithmetic so contention is
+ // bounded. A subscription firing at 10 kHz with one driver instance is dominated by
+ // the SDK's notification path, not this lock.
+ private readonly object _ewmaLock = new();
+ private double _notificationsPerSecond;
+ private long _lastNotificationTicks;
+
+ /// Half-life ~5 seconds — recent activity dominates but a paused subscription decays toward zero.
+ private static readonly TimeSpan EwmaHalfLife = TimeSpan.FromSeconds(5);
+
+ // ---- Reconnect state (lock-free, single-writer in OnReconnectComplete) ----
+ private long _lastReconnectUtcTicks;
+
+ public long PublishRequestCount => Interlocked.Read(ref _publishRequestCount);
+ public long NotificationCount => Interlocked.Read(ref _notificationCount);
+ public long MissingPublishRequestCount => Interlocked.Read(ref _missingPublishRequestCount);
+ public long DroppedNotificationCount => Interlocked.Read(ref _droppedNotificationCount);
+ public long SessionResetCount => Interlocked.Read(ref _sessionResetCount);
+
+ public DateTime? LastReconnectUtc
+ {
+ get
+ {
+ var ticks = Interlocked.Read(ref _lastReconnectUtcTicks);
+ return ticks == 0 ? null : new DateTime(ticks, DateTimeKind.Utc);
+ }
+ }
+
+ public double NotificationsPerSecond
+ {
+ get { lock (_ewmaLock) return _notificationsPerSecond; }
+ }
+
+ public void IncrementPublishRequest() => Interlocked.Increment(ref _publishRequestCount);
+
+ public void IncrementMissingPublishRequest() => Interlocked.Increment(ref _missingPublishRequestCount);
+
+ public void IncrementDroppedNotification() => Interlocked.Increment(ref _droppedNotificationCount);
+
+ /// Records one delivered notification (any monitored item) + folds the inter-arrival into the EWMA rate.
+ public void RecordNotification() => RecordNotification(DateTime.UtcNow);
+
+ internal void RecordNotification(DateTime nowUtc)
+ {
+ Interlocked.Increment(ref _notificationCount);
+
+ // EWMA over instantaneous rate. instRate = 1 / dt (events per second since last sample).
+ // Decay factor a = 2^(-dt/halfLife) puts a five-second window on the smoothing — recent
+ // bursts win, idle periods bleed back to zero.
+ var nowTicks = nowUtc.Ticks;
+ lock (_ewmaLock)
+ {
+ if (_lastNotificationTicks == 0)
+ {
+ _lastNotificationTicks = nowTicks;
+ // First sample: seed at 0 — we don't know the prior rate. The next sample
+ // produces a real instRate.
+ return;
+ }
+ var dtTicks = nowTicks - _lastNotificationTicks;
+ if (dtTicks <= 0)
+ {
+ // Same-tick collisions on bursts: treat as no time elapsed for rate purposes
+ // (count was already incremented above) so we don't divide by zero or feed
+ // an absurd instRate spike.
+ return;
+ }
+ var dtSeconds = (double)dtTicks / TimeSpan.TicksPerSecond;
+ var instRate = 1.0 / dtSeconds;
+ var alpha = System.Math.Pow(0.5, dtSeconds / EwmaHalfLife.TotalSeconds);
+ _notificationsPerSecond = (alpha * _notificationsPerSecond) + ((1.0 - alpha) * instRate);
+ _lastNotificationTicks = nowTicks;
+ }
+ }
+
+ public void RecordSessionReset(DateTime nowUtc)
+ {
+ Interlocked.Increment(ref _sessionResetCount);
+ Interlocked.Exchange(ref _lastReconnectUtcTicks, nowUtc.Ticks);
+ }
+
+ ///
+ /// Snapshot the counters into the dictionary shape
+ /// surfaces. Numeric-only (so the RPC can render generically); LastReconnectUtc is
+ /// emitted as ticks to keep the value type uniform.
+ ///
+ public IReadOnlyDictionary Snapshot()
+ {
+ var dict = new Dictionary(7, System.StringComparer.Ordinal)
+ {
+ ["PublishRequestCount"] = PublishRequestCount,
+ ["NotificationCount"] = NotificationCount,
+ ["NotificationsPerSecond"] = NotificationsPerSecond,
+ ["MissingPublishRequestCount"] = MissingPublishRequestCount,
+ ["DroppedNotificationCount"] = DroppedNotificationCount,
+ ["SessionResetCount"] = SessionResetCount,
+ };
+ var last = LastReconnectUtc;
+ if (last is not null)
+ dict["LastReconnectUtcTicks"] = last.Value.Ticks;
+ return dict;
+ }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs
index cc89175..992ee2a 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs
@@ -58,6 +58,23 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
private readonly OpcUaClientDriverOptions _options = options;
private readonly SemaphoreSlim _gate = new(1, 1);
+ ///
+ /// Per-driver diagnostic counters (publish/notification rates, missing-publish,
+ /// dropped-notification, session-reset). Surfaced through
+ /// for the driver-diagnostics RPC.
+ /// Hot-path increments use ; the read path snapshots.
+ ///
+ private readonly OpcUaClientDiagnostics _diagnostics = new();
+
+ /// Test seam — exposes the live counters for unit tests.
+ internal OpcUaClientDiagnostics DiagnosticsForTest => _diagnostics;
+
+ /// Wired to in ; cached so we can unwire in + on reconnect.
+ private NotificationEventHandler? _notificationHandler;
+
+ /// Wired to ; cached so we can unwire on reconnect/shutdown.
+ private PublishErrorEventHandler? _publishErrorHandler;
+
/// Active OPC UA session. Null until returns cleanly.
internal ISession? Session { get; private set; }
@@ -151,6 +168,8 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
_keepAliveHandler = OnKeepAlive;
session.KeepAlive += _keepAliveHandler;
+ WireSessionDiagnostics(session);
+
Session = session;
_connectedEndpointUrl = connectedUrl;
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
@@ -447,6 +466,8 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
}
_keepAliveHandler = null;
+ UnwireSessionDiagnostics(Session);
+
try { if (Session is Session s) await s.CloseAsync(cancellationToken).ConfigureAwait(false); }
catch { /* best-effort */ }
try { Session?.Dispose(); } catch { }
@@ -458,7 +479,14 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
}
- public DriverHealth GetHealth() => _health;
+ public DriverHealth GetHealth()
+ {
+ // Snapshot the counters into the optional Diagnostics dictionary on every poll —
+ // the RPC reads through GetHealth so we can't lazy-cache without a tick source.
+ // The snapshot is O(7) so the per-poll cost is negligible compared to the RPC plumbing.
+ var h = _health;
+ return new DriverHealth(h.State, h.LastSuccessfulRead, h.LastError, _diagnostics.Snapshot());
+ }
public long GetMemoryFootprint() => 0;
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
@@ -1562,6 +1590,16 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
newSession.KeepAlive += _keepAliveHandler;
}
+ // Move the diagnostic event hooks (Notification + PublishError) onto the new
+ // session as well so counters keep flowing post-failover. Record this as a
+ // session-reset for the operator dashboard.
+ UnwireSessionDiagnostics(oldSession);
+ if (newSession is not null)
+ {
+ WireSessionDiagnostics(newSession);
+ _diagnostics.RecordSessionReset(DateTime.UtcNow);
+ }
+
Session = newSession;
// Drop cached OperationLimits so the next batch op refetches against the (potentially
// re-redeployed) upstream server. A zero-cost guard against a server whose published
@@ -1593,6 +1631,71 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState));
}
+ ///
+ /// Wire the diagnostic counters onto the supplied session — every publish-response
+ /// notification increments NotificationCount + samples the EWMA;
+ /// distinguishes missing-publish vs other publish
+ /// faults so operators can see whether the upstream is starving the client of publish
+ /// slots vs. failing notifications outright.
+ ///
+ private void WireSessionDiagnostics(ISession session)
+ {
+ _notificationHandler = OnSessionNotification;
+ _publishErrorHandler = OnSessionPublishError;
+ session.Notification += _notificationHandler;
+ session.PublishError += _publishErrorHandler;
+ }
+
+ private void UnwireSessionDiagnostics(ISession? session)
+ {
+ if (session is null) return;
+ if (_notificationHandler is not null)
+ {
+ try { session.Notification -= _notificationHandler; } catch { }
+ }
+ if (_publishErrorHandler is not null)
+ {
+ try { session.PublishError -= _publishErrorHandler; } catch { }
+ }
+ _notificationHandler = null;
+ _publishErrorHandler = null;
+ }
+
+ private void OnSessionNotification(ISession session, NotificationEventArgs e)
+ {
+ // Each publish response carries one NotificationMessage with N data-change /
+ // event notifications. Track both cardinalities: PublishRequestCount counts
+ // server publish responses delivered to us; NotificationCount counts the
+ // individual MonitoredItem changes inside them. The difference matters when
+ // diagnosing "many publishes, few changes" vs "few publishes, large bursts".
+ _diagnostics.IncrementPublishRequest();
+ var msg = e.NotificationMessage;
+ if (msg?.NotificationData is { Count: > 0 } data)
+ {
+ for (var i = 0; i < data.Count; i++)
+ {
+ _diagnostics.RecordNotification();
+ }
+ }
+ }
+
+ private void OnSessionPublishError(ISession session, PublishErrorEventArgs e)
+ {
+ // BadNoSubscription / BadSequenceNumberUnknown / BadMessageNotAvailable all surface
+ // as "the server expected to publish but couldn't" — bucket them as missing-publish
+ // for the operator. Other status codes (timeout, network) are dropped notifications.
+ var sc = e.Status?.StatusCode;
+ if (sc.HasValue && IsMissingPublishStatus(sc.Value))
+ _diagnostics.IncrementMissingPublishRequest();
+ else
+ _diagnostics.IncrementDroppedNotification();
+ }
+
+ private static bool IsMissingPublishStatus(StatusCode sc) =>
+ sc.Code == StatusCodes.BadNoSubscription
+ || sc.Code == StatusCodes.BadSequenceNumberUnknown
+ || sc.Code == StatusCodes.BadMessageNotAvailable;
+
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
public async ValueTask DisposeAsync()
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientDiagnosticsTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientDiagnosticsTests.cs
new file mode 100644
index 0000000..b1d6448
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientDiagnosticsTests.cs
@@ -0,0 +1,163 @@
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
+
+///
+/// Unit tests for the per-driver diagnostic counters surfaced via
+/// for the driver-diagnostics RPC
+/// (task #276). Counters are exercised directly through the internal helper rather
+/// than via a live SDK ISession because the SDK requires a connected upstream
+/// to publish events and we want unit-level coverage of the math + the snapshot shape.
+///
+[Trait("Category", "Unit")]
+public sealed class OpcUaClientDiagnosticsTests
+{
+ [Fact]
+ public void Counters_default_to_zero()
+ {
+ var d = new OpcUaClientDiagnostics();
+ d.PublishRequestCount.ShouldBe(0);
+ d.NotificationCount.ShouldBe(0);
+ d.NotificationsPerSecond.ShouldBe(0);
+ d.MissingPublishRequestCount.ShouldBe(0);
+ d.DroppedNotificationCount.ShouldBe(0);
+ d.SessionResetCount.ShouldBe(0);
+ d.LastReconnectUtc.ShouldBeNull();
+ }
+
+ [Fact]
+ public void IncrementPublishRequest_bumps_total()
+ {
+ var d = new OpcUaClientDiagnostics();
+ d.IncrementPublishRequest();
+ d.IncrementPublishRequest();
+ d.IncrementPublishRequest();
+ d.PublishRequestCount.ShouldBe(3);
+ }
+
+ [Fact]
+ public void IncrementMissingPublishRequest_bumps_total()
+ {
+ var d = new OpcUaClientDiagnostics();
+ d.IncrementMissingPublishRequest();
+ d.IncrementMissingPublishRequest();
+ d.MissingPublishRequestCount.ShouldBe(2);
+ }
+
+ [Fact]
+ public void IncrementDroppedNotification_bumps_total()
+ {
+ var d = new OpcUaClientDiagnostics();
+ d.IncrementDroppedNotification();
+ d.DroppedNotificationCount.ShouldBe(1);
+ }
+
+ [Fact]
+ public void RecordNotification_grows_count_and_then_rate()
+ {
+ var d = new OpcUaClientDiagnostics();
+ var t0 = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+ // First sample seeds the EWMA — rate stays 0 until we have a delta.
+ d.RecordNotification(t0);
+ d.NotificationCount.ShouldBe(1);
+ d.NotificationsPerSecond.ShouldBe(0);
+
+ // 1 Hz steady state: 30 samples spaced 1s apart converge toward 1/s. With 5s half-life
+ // and alpha=0.5^(1/5)≈0.871, the EWMA approaches 1 - alpha^N — after 30 samples that's
+ // 1 - 0.871^30 ≈ 0.984.
+ for (var i = 1; i <= 30; i++)
+ d.RecordNotification(t0.AddSeconds(i));
+
+ d.NotificationCount.ShouldBe(31);
+ d.NotificationsPerSecond.ShouldBeInRange(0.95, 1.05, "EWMA at 5s half-life converges to ~1Hz after 30 samples");
+ }
+
+ [Fact]
+ public void RecordSessionReset_bumps_count_and_sets_last_reconnect()
+ {
+ var d = new OpcUaClientDiagnostics();
+ var t = new DateTime(2026, 4, 25, 12, 34, 56, DateTimeKind.Utc);
+ d.RecordSessionReset(t);
+ d.SessionResetCount.ShouldBe(1);
+ d.LastReconnectUtc.ShouldBe(t);
+
+ // Second reset overwrites timestamp + bumps count.
+ var t2 = t.AddMinutes(5);
+ d.RecordSessionReset(t2);
+ d.SessionResetCount.ShouldBe(2);
+ d.LastReconnectUtc.ShouldBe(t2);
+ }
+
+ [Fact]
+ public void Snapshot_emits_well_known_keys()
+ {
+ var d = new OpcUaClientDiagnostics();
+ d.IncrementPublishRequest();
+ d.RecordNotification(new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc));
+ d.IncrementMissingPublishRequest();
+ d.IncrementDroppedNotification();
+ d.RecordSessionReset(new DateTime(2026, 4, 25, 0, 0, 0, DateTimeKind.Utc));
+
+ var snap = d.Snapshot();
+
+ snap.ShouldContainKey("PublishRequestCount");
+ snap["PublishRequestCount"].ShouldBe(1);
+ snap.ShouldContainKey("NotificationCount");
+ snap["NotificationCount"].ShouldBe(1);
+ snap.ShouldContainKey("NotificationsPerSecond");
+ snap.ShouldContainKey("MissingPublishRequestCount");
+ snap["MissingPublishRequestCount"].ShouldBe(1);
+ snap.ShouldContainKey("DroppedNotificationCount");
+ snap["DroppedNotificationCount"].ShouldBe(1);
+ snap.ShouldContainKey("SessionResetCount");
+ snap["SessionResetCount"].ShouldBe(1);
+ snap.ShouldContainKey("LastReconnectUtcTicks");
+ }
+
+ [Fact]
+ public void Snapshot_omits_LastReconnectUtcTicks_when_no_reset_recorded()
+ {
+ var d = new OpcUaClientDiagnostics();
+ d.Snapshot().ShouldNotContainKey("LastReconnectUtcTicks");
+ }
+
+ [Fact]
+ public void Driver_GetHealth_includes_diagnostics_dictionary()
+ {
+ // GetHealth must expose the snapshot to the RPC consumer even before any session
+ // has been opened — operators call it during startup to check counters baseline.
+ using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "diag-test");
+ var health = drv.GetHealth();
+ health.Diagnostics.ShouldNotBeNull();
+ health.Diagnostics!.ShouldContainKey("PublishRequestCount");
+ health.Diagnostics["PublishRequestCount"].ShouldBe(0);
+ health.Diagnostics.ShouldContainKey("NotificationCount");
+ health.Diagnostics.ShouldContainKey("SessionResetCount");
+ }
+
+ [Fact]
+ public void Driver_health_diagnostics_reflect_internal_counters_after_increment()
+ {
+ using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "diag-test-2");
+
+ // Drive a counter through the test seam to prove the GetHealth snapshot is live,
+ // not a one-shot at construction.
+ drv.DiagnosticsForTest.IncrementPublishRequest();
+ drv.DiagnosticsForTest.IncrementPublishRequest();
+
+ var health = drv.GetHealth();
+ health.Diagnostics!["PublishRequestCount"].ShouldBe(2);
+ }
+
+ [Fact]
+ public void DriverHealth_default_diagnostics_is_null_but_DiagnosticsOrEmpty_is_empty()
+ {
+ // Back-compat: pre-existing call sites that construct DriverHealth with the
+ // 3-arg overload must keep working — the 4th param defaults to null.
+ var h = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
+ h.Diagnostics.ShouldBeNull();
+ h.DiagnosticsOrEmpty.ShouldBeEmpty();
+ }
+}