Merge pull request '[opcuaclient] OpcUaClient — Diagnostics counters' (#334) from auto/opcuaclient/4 into auto/driver-gaps

This commit was merged in pull request #334.
This commit is contained in:
2026-04-25 15:56:21 -04:00
4 changed files with 418 additions and 2 deletions

View File

@@ -7,10 +7,26 @@ namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
/// <param name="State">Current driver-instance state.</param>
/// <param name="LastSuccessfulRead">Timestamp of the most recent successful equipment read; null if never.</param>
/// <param name="LastError">Most recent error message; null when state is Healthy.</param>
/// <param name="Diagnostics">
/// Optional driver-attributable counters/metrics surfaced for the <c>driver-diagnostics</c>
/// RPC (introduced for Modbus task #154). Drivers populate the dictionary with stable,
/// well-known keys (e.g. <c>PublishRequestCount</c>, <c>NotificationsPerSecond</c>);
/// Core treats it as opaque metadata. Defaulted to an empty read-only dictionary so
/// existing drivers and call-sites that don't construct this field stay back-compat.
/// </param>
public sealed record DriverHealth(
DriverState State,
DateTime? LastSuccessfulRead,
string? LastError);
string? LastError,
IReadOnlyDictionary<string, double>? Diagnostics = null)
{
/// <summary>Driver-attributable counters, empty when the driver doesn't surface any.</summary>
public IReadOnlyDictionary<string, double> DiagnosticsOrEmpty
=> Diagnostics ?? EmptyDiagnostics;
private static readonly IReadOnlyDictionary<string, double> EmptyDiagnostics
= new Dictionary<string, double>(0);
}
/// <summary>Driver-instance lifecycle state.</summary>
public enum DriverState

View File

@@ -0,0 +1,134 @@
using System.Collections.Generic;
using System.Threading;
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient;
/// <summary>
/// Per-driver counters surfaced via <see cref="Core.Abstractions.DriverHealth.Diagnostics"/>
/// for the <c>driver-diagnostics</c> RPC (task #276). Hot-path increments use
/// <see cref="Interlocked"/> so they're lock-free; the read path snapshots into a
/// <see cref="IReadOnlyDictionary{TKey, TValue}"/> keyed by stable counter names.
/// </summary>
/// <remarks>
/// The counters are operational metrics, not config — they reset to zero when the
/// driver instance is recreated (Reinitialize tear-down + rebuild) and there is no
/// persistence across process restarts. NotificationsPerSecond is a simple decay-EWMA
/// so a quiet subscription doesn't latch the value at the last burst rate.
/// </remarks>
internal sealed class OpcUaClientDiagnostics
{
// ---- Hot-path counters (Interlocked) ----
private long _publishRequestCount;
private long _notificationCount;
private long _missingPublishRequestCount;
private long _droppedNotificationCount;
private long _sessionResetCount;
// ---- EWMA state for NotificationsPerSecond ----
//
// Use ticks (long) for the timestamp so we can swap atomically. The rate is a double
// updated under a tight lock — the EWMA arithmetic (load, blend, store) isn't naturally
// atomic on doubles, and the spinlock is held only for arithmetic so contention is
// bounded. A subscription firing at 10 kHz with one driver instance is dominated by
// the SDK's notification path, not this lock.
private readonly object _ewmaLock = new();
private double _notificationsPerSecond;
private long _lastNotificationTicks;
/// <summary>Half-life ~5 seconds — recent activity dominates but a paused subscription decays toward zero.</summary>
private static readonly TimeSpan EwmaHalfLife = TimeSpan.FromSeconds(5);
// ---- Reconnect state (lock-free, single-writer in OnReconnectComplete) ----
private long _lastReconnectUtcTicks;
public long PublishRequestCount => Interlocked.Read(ref _publishRequestCount);
public long NotificationCount => Interlocked.Read(ref _notificationCount);
public long MissingPublishRequestCount => Interlocked.Read(ref _missingPublishRequestCount);
public long DroppedNotificationCount => Interlocked.Read(ref _droppedNotificationCount);
public long SessionResetCount => Interlocked.Read(ref _sessionResetCount);
public DateTime? LastReconnectUtc
{
get
{
var ticks = Interlocked.Read(ref _lastReconnectUtcTicks);
return ticks == 0 ? null : new DateTime(ticks, DateTimeKind.Utc);
}
}
public double NotificationsPerSecond
{
get { lock (_ewmaLock) return _notificationsPerSecond; }
}
public void IncrementPublishRequest() => Interlocked.Increment(ref _publishRequestCount);
public void IncrementMissingPublishRequest() => Interlocked.Increment(ref _missingPublishRequestCount);
public void IncrementDroppedNotification() => Interlocked.Increment(ref _droppedNotificationCount);
/// <summary>Records one delivered notification (any monitored item) + folds the inter-arrival into the EWMA rate.</summary>
public void RecordNotification() => RecordNotification(DateTime.UtcNow);
internal void RecordNotification(DateTime nowUtc)
{
Interlocked.Increment(ref _notificationCount);
// EWMA over instantaneous rate. instRate = 1 / dt (events per second since last sample).
// Decay factor a = 2^(-dt/halfLife) puts a five-second window on the smoothing — recent
// bursts win, idle periods bleed back to zero.
var nowTicks = nowUtc.Ticks;
lock (_ewmaLock)
{
if (_lastNotificationTicks == 0)
{
_lastNotificationTicks = nowTicks;
// First sample: seed at 0 — we don't know the prior rate. The next sample
// produces a real instRate.
return;
}
var dtTicks = nowTicks - _lastNotificationTicks;
if (dtTicks <= 0)
{
// Same-tick collisions on bursts: treat as no time elapsed for rate purposes
// (count was already incremented above) so we don't divide by zero or feed
// an absurd instRate spike.
return;
}
var dtSeconds = (double)dtTicks / TimeSpan.TicksPerSecond;
var instRate = 1.0 / dtSeconds;
var alpha = System.Math.Pow(0.5, dtSeconds / EwmaHalfLife.TotalSeconds);
_notificationsPerSecond = (alpha * _notificationsPerSecond) + ((1.0 - alpha) * instRate);
_lastNotificationTicks = nowTicks;
}
}
public void RecordSessionReset(DateTime nowUtc)
{
Interlocked.Increment(ref _sessionResetCount);
Interlocked.Exchange(ref _lastReconnectUtcTicks, nowUtc.Ticks);
}
/// <summary>
/// Snapshot the counters into the dictionary shape <see cref="Core.Abstractions.DriverHealth.Diagnostics"/>
/// surfaces. Numeric-only (so the RPC can render generically); LastReconnectUtc is
/// emitted as ticks to keep the value type uniform.
/// </summary>
public IReadOnlyDictionary<string, double> Snapshot()
{
var dict = new Dictionary<string, double>(7, System.StringComparer.Ordinal)
{
["PublishRequestCount"] = PublishRequestCount,
["NotificationCount"] = NotificationCount,
["NotificationsPerSecond"] = NotificationsPerSecond,
["MissingPublishRequestCount"] = MissingPublishRequestCount,
["DroppedNotificationCount"] = DroppedNotificationCount,
["SessionResetCount"] = SessionResetCount,
};
var last = LastReconnectUtc;
if (last is not null)
dict["LastReconnectUtcTicks"] = last.Value.Ticks;
return dict;
}
}

View File

@@ -58,6 +58,23 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
private readonly OpcUaClientDriverOptions _options = options;
private readonly SemaphoreSlim _gate = new(1, 1);
/// <summary>
/// Per-driver diagnostic counters (publish/notification rates, missing-publish,
/// dropped-notification, session-reset). Surfaced through
/// <see cref="DriverHealth.Diagnostics"/> for the <c>driver-diagnostics</c> RPC.
/// Hot-path increments use <see cref="Interlocked"/>; the read path snapshots.
/// </summary>
private readonly OpcUaClientDiagnostics _diagnostics = new();
/// <summary>Test seam — exposes the live counters for unit tests.</summary>
internal OpcUaClientDiagnostics DiagnosticsForTest => _diagnostics;
/// <summary>Wired to <see cref="ISession.Notification"/> in <see cref="WireSessionDiagnostics"/>; cached so we can unwire in <see cref="ShutdownAsync"/> + on reconnect.</summary>
private NotificationEventHandler? _notificationHandler;
/// <summary>Wired to <see cref="ISession.PublishError"/>; cached so we can unwire on reconnect/shutdown.</summary>
private PublishErrorEventHandler? _publishErrorHandler;
/// <summary>Active OPC UA session. Null until <see cref="InitializeAsync"/> returns cleanly.</summary>
internal ISession? Session { get; private set; }
@@ -151,6 +168,8 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
_keepAliveHandler = OnKeepAlive;
session.KeepAlive += _keepAliveHandler;
WireSessionDiagnostics(session);
Session = session;
_connectedEndpointUrl = connectedUrl;
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
@@ -447,6 +466,8 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
}
_keepAliveHandler = null;
UnwireSessionDiagnostics(Session);
try { if (Session is Session s) await s.CloseAsync(cancellationToken).ConfigureAwait(false); }
catch { /* best-effort */ }
try { Session?.Dispose(); } catch { }
@@ -458,7 +479,14 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
}
public DriverHealth GetHealth() => _health;
public DriverHealth GetHealth()
{
// Snapshot the counters into the optional Diagnostics dictionary on every poll —
// the RPC reads through GetHealth so we can't lazy-cache without a tick source.
// The snapshot is O(7) so the per-poll cost is negligible compared to the RPC plumbing.
var h = _health;
return new DriverHealth(h.State, h.LastSuccessfulRead, h.LastError, _diagnostics.Snapshot());
}
public long GetMemoryFootprint() => 0;
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
@@ -1562,6 +1590,16 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
newSession.KeepAlive += _keepAliveHandler;
}
// Move the diagnostic event hooks (Notification + PublishError) onto the new
// session as well so counters keep flowing post-failover. Record this as a
// session-reset for the operator dashboard.
UnwireSessionDiagnostics(oldSession);
if (newSession is not null)
{
WireSessionDiagnostics(newSession);
_diagnostics.RecordSessionReset(DateTime.UtcNow);
}
Session = newSession;
// Drop cached OperationLimits so the next batch op refetches against the (potentially
// re-redeployed) upstream server. A zero-cost guard against a server whose published
@@ -1593,6 +1631,71 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState));
}
/// <summary>
/// Wire the diagnostic counters onto the supplied session — every publish-response
/// notification increments <c>NotificationCount</c> + samples the EWMA;
/// <see cref="ISession.PublishError"/> distinguishes missing-publish vs other publish
/// faults so operators can see whether the upstream is starving the client of publish
/// slots vs. failing notifications outright.
/// </summary>
private void WireSessionDiagnostics(ISession session)
{
_notificationHandler = OnSessionNotification;
_publishErrorHandler = OnSessionPublishError;
session.Notification += _notificationHandler;
session.PublishError += _publishErrorHandler;
}
private void UnwireSessionDiagnostics(ISession? session)
{
if (session is null) return;
if (_notificationHandler is not null)
{
try { session.Notification -= _notificationHandler; } catch { }
}
if (_publishErrorHandler is not null)
{
try { session.PublishError -= _publishErrorHandler; } catch { }
}
_notificationHandler = null;
_publishErrorHandler = null;
}
private void OnSessionNotification(ISession session, NotificationEventArgs e)
{
// Each publish response carries one NotificationMessage with N data-change /
// event notifications. Track both cardinalities: PublishRequestCount counts
// server publish responses delivered to us; NotificationCount counts the
// individual MonitoredItem changes inside them. The difference matters when
// diagnosing "many publishes, few changes" vs "few publishes, large bursts".
_diagnostics.IncrementPublishRequest();
var msg = e.NotificationMessage;
if (msg?.NotificationData is { Count: > 0 } data)
{
for (var i = 0; i < data.Count; i++)
{
_diagnostics.RecordNotification();
}
}
}
private void OnSessionPublishError(ISession session, PublishErrorEventArgs e)
{
// BadNoSubscription / BadSequenceNumberUnknown / BadMessageNotAvailable all surface
// as "the server expected to publish but couldn't" — bucket them as missing-publish
// for the operator. Other status codes (timeout, network) are dropped notifications.
var sc = e.Status?.StatusCode;
if (sc.HasValue && IsMissingPublishStatus(sc.Value))
_diagnostics.IncrementMissingPublishRequest();
else
_diagnostics.IncrementDroppedNotification();
}
private static bool IsMissingPublishStatus(StatusCode sc) =>
sc.Code == StatusCodes.BadNoSubscription
|| sc.Code == StatusCodes.BadSequenceNumberUnknown
|| sc.Code == StatusCodes.BadMessageNotAvailable;
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
public async ValueTask DisposeAsync()

View File

@@ -0,0 +1,163 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
/// <summary>
/// Unit tests for the per-driver diagnostic counters surfaced via
/// <see cref="DriverHealth.Diagnostics"/> for the <c>driver-diagnostics</c> RPC
/// (task #276). Counters are exercised directly through the internal helper rather
/// than via a live SDK <c>ISession</c> because the SDK requires a connected upstream
/// to publish events and we want unit-level coverage of the math + the snapshot shape.
/// </summary>
[Trait("Category", "Unit")]
public sealed class OpcUaClientDiagnosticsTests
{
[Fact]
public void Counters_default_to_zero()
{
var d = new OpcUaClientDiagnostics();
d.PublishRequestCount.ShouldBe(0);
d.NotificationCount.ShouldBe(0);
d.NotificationsPerSecond.ShouldBe(0);
d.MissingPublishRequestCount.ShouldBe(0);
d.DroppedNotificationCount.ShouldBe(0);
d.SessionResetCount.ShouldBe(0);
d.LastReconnectUtc.ShouldBeNull();
}
[Fact]
public void IncrementPublishRequest_bumps_total()
{
var d = new OpcUaClientDiagnostics();
d.IncrementPublishRequest();
d.IncrementPublishRequest();
d.IncrementPublishRequest();
d.PublishRequestCount.ShouldBe(3);
}
[Fact]
public void IncrementMissingPublishRequest_bumps_total()
{
var d = new OpcUaClientDiagnostics();
d.IncrementMissingPublishRequest();
d.IncrementMissingPublishRequest();
d.MissingPublishRequestCount.ShouldBe(2);
}
[Fact]
public void IncrementDroppedNotification_bumps_total()
{
var d = new OpcUaClientDiagnostics();
d.IncrementDroppedNotification();
d.DroppedNotificationCount.ShouldBe(1);
}
[Fact]
public void RecordNotification_grows_count_and_then_rate()
{
var d = new OpcUaClientDiagnostics();
var t0 = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
// First sample seeds the EWMA — rate stays 0 until we have a delta.
d.RecordNotification(t0);
d.NotificationCount.ShouldBe(1);
d.NotificationsPerSecond.ShouldBe(0);
// 1 Hz steady state: 30 samples spaced 1s apart converge toward 1/s. With 5s half-life
// and alpha=0.5^(1/5)≈0.871, the EWMA approaches 1 - alpha^N — after 30 samples that's
// 1 - 0.871^30 ≈ 0.984.
for (var i = 1; i <= 30; i++)
d.RecordNotification(t0.AddSeconds(i));
d.NotificationCount.ShouldBe(31);
d.NotificationsPerSecond.ShouldBeInRange(0.95, 1.05, "EWMA at 5s half-life converges to ~1Hz after 30 samples");
}
[Fact]
public void RecordSessionReset_bumps_count_and_sets_last_reconnect()
{
var d = new OpcUaClientDiagnostics();
var t = new DateTime(2026, 4, 25, 12, 34, 56, DateTimeKind.Utc);
d.RecordSessionReset(t);
d.SessionResetCount.ShouldBe(1);
d.LastReconnectUtc.ShouldBe(t);
// Second reset overwrites timestamp + bumps count.
var t2 = t.AddMinutes(5);
d.RecordSessionReset(t2);
d.SessionResetCount.ShouldBe(2);
d.LastReconnectUtc.ShouldBe(t2);
}
[Fact]
public void Snapshot_emits_well_known_keys()
{
var d = new OpcUaClientDiagnostics();
d.IncrementPublishRequest();
d.RecordNotification(new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc));
d.IncrementMissingPublishRequest();
d.IncrementDroppedNotification();
d.RecordSessionReset(new DateTime(2026, 4, 25, 0, 0, 0, DateTimeKind.Utc));
var snap = d.Snapshot();
snap.ShouldContainKey("PublishRequestCount");
snap["PublishRequestCount"].ShouldBe(1);
snap.ShouldContainKey("NotificationCount");
snap["NotificationCount"].ShouldBe(1);
snap.ShouldContainKey("NotificationsPerSecond");
snap.ShouldContainKey("MissingPublishRequestCount");
snap["MissingPublishRequestCount"].ShouldBe(1);
snap.ShouldContainKey("DroppedNotificationCount");
snap["DroppedNotificationCount"].ShouldBe(1);
snap.ShouldContainKey("SessionResetCount");
snap["SessionResetCount"].ShouldBe(1);
snap.ShouldContainKey("LastReconnectUtcTicks");
}
[Fact]
public void Snapshot_omits_LastReconnectUtcTicks_when_no_reset_recorded()
{
var d = new OpcUaClientDiagnostics();
d.Snapshot().ShouldNotContainKey("LastReconnectUtcTicks");
}
[Fact]
public void Driver_GetHealth_includes_diagnostics_dictionary()
{
// GetHealth must expose the snapshot to the RPC consumer even before any session
// has been opened — operators call it during startup to check counters baseline.
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "diag-test");
var health = drv.GetHealth();
health.Diagnostics.ShouldNotBeNull();
health.Diagnostics!.ShouldContainKey("PublishRequestCount");
health.Diagnostics["PublishRequestCount"].ShouldBe(0);
health.Diagnostics.ShouldContainKey("NotificationCount");
health.Diagnostics.ShouldContainKey("SessionResetCount");
}
[Fact]
public void Driver_health_diagnostics_reflect_internal_counters_after_increment()
{
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "diag-test-2");
// Drive a counter through the test seam to prove the GetHealth snapshot is live,
// not a one-shot at construction.
drv.DiagnosticsForTest.IncrementPublishRequest();
drv.DiagnosticsForTest.IncrementPublishRequest();
var health = drv.GetHealth();
health.Diagnostics!["PublishRequestCount"].ShouldBe(2);
}
[Fact]
public void DriverHealth_default_diagnostics_is_null_but_DiagnosticsOrEmpty_is_empty()
{
// Back-compat: pre-existing call sites that construct DriverHealth with the
// 3-arg overload must keep working — the 4th param defaults to null.
var h = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
h.Diagnostics.ShouldBeNull();
h.DiagnosticsOrEmpty.ShouldBeEmpty();
}
}