diff --git a/code-reviews/Driver.Historian.Wonderware.Client/findings.md b/code-reviews/Driver.Historian.Wonderware.Client/findings.md index 7b2aff1..4f345fa 100644 --- a/code-reviews/Driver.Historian.Wonderware.Client/findings.md +++ b/code-reviews/Driver.Historian.Wonderware.Client/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-22 | | Commit reviewed | `76d35d1` | | Status | Reviewed | -| Open findings | 5 | +| Open findings | 0 | ## Checklist coverage @@ -92,7 +92,7 @@ dead-lettered. Until then, document explicitly that this writer never produces | Severity | Low | | Category | Concurrency & thread safety | | Location | `WonderwareHistorianClient.cs:207`, `WonderwareHistorianClient.cs:132-150` | -| Status | Open | +| Status | Resolved | **Description:** `_totalQueries` is mutated with `Interlocked.Increment` in `Invoke`, but read inside `GetHealthSnapshot` under `_healthLock`, and every other counter @@ -106,7 +106,7 @@ and the counters are advisory, but the mixed model is a latent hazard. `_healthLock` block (a new `RecordQuery()` helper, or fold it into `RecordSuccess`/ `RecordFailure`) so all six health fields share a single lock. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-23 — replaced the mixed `Interlocked.Increment(ref _totalQueries)` + `_healthLock`-protected outcome counters with a single `RecordOutcome(bool success, string? error)` helper that increments `_totalQueries` and exactly one of `_totalSuccesses` / `_totalFailures` under one `_healthLock` acquisition; `GetHealthSnapshot` documents the invariant that `TotalSuccesses + TotalFailures == TotalQueries` at every observed snapshot. Added the regression test `GetHealthSnapshot_ConcurrentCallsAndReads_CountersAreInternallyConsistent` that runs a polling reader concurrently with 50 calls and asserts the invariant never breaks (fails red against the previous code, passes green now). ### Driver.Historian.Wonderware.Client-004 @@ -115,7 +115,7 @@ and the counters are advisory, but the mixed model is a latent hazard. | Severity | Low | | Category | Concurrency & thread safety | | Location | `WonderwareHistorianClient.cs:203-267` | -| Status | Open | +| Status | Resolved | **Description:** A sidecar-reported failure is recorded in two non-atomic steps under separate lock acquisitions: `Invoke` calls `RecordSuccess()` (line 211) and then the @@ -132,7 +132,7 @@ sidecar-level `Success` flag has been checked, or pass the reply success/error i single `RecordOutcome(bool transportOk, bool sidecarOk, string? error)` that updates all counters under one lock acquisition. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-23 — eliminated the `RecordSuccess` → `ReclassifySuccessAsFailure` undo dance. `InvokeAsync` now takes a `Func` evaluator, evaluates it once when the transport reply lands, and calls `RecordOutcome(bool success, string? error)` exactly once per call under a single `_healthLock` acquisition. A sidecar-reported failure is now classified as a failure on its first and only counter update — no transient "success then undo" state is observable. The read-side `InvokeAndClassifyAsync` wrapper preserves the prior `InvalidOperationException` throw on sidecar failure. Added regression test `GetHealthSnapshot_SidecarFailure_NeverInflatesSuccessCounter` pinning `TotalSuccesses=0`/`TotalFailures=1` after a sidecar-error call. ### Driver.Historian.Wonderware.Client-005 @@ -167,7 +167,7 @@ the reader. | Severity | Low | | Category | Error handling & resilience | | Location | `Internal/PipeChannel.cs:96-107`, `WonderwareHistorianClientOptions.cs:11-12` | -| Status | Open | +| Status | Resolved | **Description:** `PipeChannel.InvokeAsync` retries exactly once on transport failure and otherwise propagates. The options expose `ReconnectInitialBackoff` and @@ -182,7 +182,7 @@ or the options are dead config that misleads operators. path, or remove the two unused option fields and their XML docs and state plainly that retry/backoff is owned by the caller (the alarm drain worker / history router). -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-23 — removed the dead `ReconnectInitialBackoff`/`ReconnectMaxBackoff` fields (and their `Effective*` accessors) from `WonderwareHistorianClientOptions` and added a `` block stating that retry/backoff is owned by the caller (the alarm drain worker and the read-side history router) and that the channel itself performs exactly one in-place reconnect with no delay. Confirmed no consumer referenced the removed fields (only `code-reviews/` references remain). Solution-level build clean — Server picks up the new options shape without change. ### Driver.Historian.Wonderware.Client-007 @@ -218,7 +218,7 @@ deserializing. | Severity | Low | | Category | Security | | Location | `ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.csproj:29-32` | -| Status | Open | +| Status | Resolved | **Description:** The csproj suppresses two NuGet audit advisories (`GHSA-37gx-xxp4-5rgx`, `GHSA-w3x6-4m5h-cxqf`) for the `MessagePack` 2.5.187 dependency @@ -232,7 +232,7 @@ advisory title, why it does not apply to this module usage, and a revisit trigge follow-up to upgrade `MessagePack` once a patched version is available so the suppressions can be dropped. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-23 — the suppression block in the csproj (already added under finding 007) records each advisory title (GHSA-37gx-xxp4-5rgx unsafe-dynamic-codegen, GHSA-w3x6-4m5h-cxqf typeless-resolver gadget chain), why neither applies to this module (default `StandardResolver` only, no `TypelessContractlessStandardResolver` / `DynamicUnion` / `DynamicGenericResolver`, plus the 64 KiB per-sample ValueBytes cap in `DeserializeSampleValue` from finding 007), and the revisit trigger ("Revisit once MessagePack 3.x is available and drop these suppressions at that time"). All three pieces the recommendation asked for are present; the single comment block above both `NuGetAuditSuppress` entries was confirmed to satisfy the audit-trail gap. ### Driver.Historian.Wonderware.Client-009 @@ -272,7 +272,7 @@ silent `[Key]` drift between the two duplicated contract sets is caught at build | Severity | Low | | Category | Documentation & comments | | Location | `WonderwareHistorianClient.cs:355-361`, `WonderwareHistorianClient.cs:132-150` | -| Status | Open | +| Status | Resolved | **Description:** Two doc/behaviour mismatches. (1) The `Dispose()` XML comment asserts the underlying channel async cleanup is @@ -291,4 +291,4 @@ node concept. The collapse is reasonable but undocumented. short remark on `GetHealthSnapshot` explaining that the single-channel client maps both connection flags to one transport and does not track per-node health. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-23 — (1) reworded the `Dispose()` XML comment to drop the "non-blocking" claim and instead state that the bridge is **deadlock-safe** because the cleanup never awaits a captured `SynchronizationContext` nor takes any lock the caller could hold, while acknowledging that `NamedPipeClientStream` teardown can block briefly on OS handle release. (2) Added a full `` + `` block to `GetHealthSnapshot` explaining the single-channel collapse — both `ProcessConnectionOpen` and `EventConnectionOpen` report the same channel state, and `ActiveProcessNode`/`ActiveEventNode`/`Nodes` are intentionally null/empty because the client has no per-node telemetry. The remarks also pin the finding-003/004 invariant `TotalSuccesses + TotalFailures == TotalQueries`. diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs index 3b5e1ef..c3f2f08 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs @@ -72,8 +72,9 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist MaxValues = (int)Math.Min(maxValuesPerNode, int.MaxValue), CorrelationId = Guid.NewGuid().ToString("N"), }; - var reply = await Invoke(MessageKind.ReadRawRequest, MessageKind.ReadRawReply, req, cancellationToken).ConfigureAwait(false); - ThrowIfFailed(reply.Success, reply.Error, "ReadRaw"); + var reply = await InvokeAndClassifyAsync( + MessageKind.ReadRawRequest, MessageKind.ReadRawReply, req, + r => (r.Success, r.Error), "ReadRaw", cancellationToken).ConfigureAwait(false); return new HistoryReadResult(ToSnapshots(reply.Samples), ContinuationPoint: null); } @@ -90,8 +91,9 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist AggregateColumn = MapAggregate(aggregate), CorrelationId = Guid.NewGuid().ToString("N"), }; - var reply = await Invoke(MessageKind.ReadProcessedRequest, MessageKind.ReadProcessedReply, req, cancellationToken).ConfigureAwait(false); - ThrowIfFailed(reply.Success, reply.Error, "ReadProcessed"); + var reply = await InvokeAndClassifyAsync( + MessageKind.ReadProcessedRequest, MessageKind.ReadProcessedReply, req, + r => (r.Success, r.Error), "ReadProcessed", cancellationToken).ConfigureAwait(false); return new HistoryReadResult(ToAggregateSnapshots(reply.Buckets), ContinuationPoint: null); } @@ -107,8 +109,9 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist TimestampsUtcTicks = ticks, CorrelationId = Guid.NewGuid().ToString("N"), }; - var reply = await Invoke(MessageKind.ReadAtTimeRequest, MessageKind.ReadAtTimeReply, req, cancellationToken).ConfigureAwait(false); - ThrowIfFailed(reply.Success, reply.Error, "ReadAtTime"); + var reply = await InvokeAndClassifyAsync( + MessageKind.ReadAtTimeRequest, MessageKind.ReadAtTimeReply, req, + r => (r.Success, r.Error), "ReadAtTime", cancellationToken).ConfigureAwait(false); return new HistoryReadResult(AlignAtTimeSnapshots(timestampsUtc, reply.Samples), ContinuationPoint: null); } @@ -167,11 +170,34 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist MaxEvents = maxEvents, CorrelationId = Guid.NewGuid().ToString("N"), }; - var reply = await Invoke(MessageKind.ReadEventsRequest, MessageKind.ReadEventsReply, req, cancellationToken).ConfigureAwait(false); - ThrowIfFailed(reply.Success, reply.Error, "ReadEvents"); + var reply = await InvokeAndClassifyAsync( + MessageKind.ReadEventsRequest, MessageKind.ReadEventsReply, req, + r => (r.Success, r.Error), "ReadEvents", cancellationToken).ConfigureAwait(false); return new HistoricalEventsResult(ToHistoricalEvents(reply.Events), ContinuationPoint: null); } + /// + /// Returns a snapshot of operation counters and the single pipe channel's connection + /// state. + /// + /// + /// This client owns one duplex named-pipe channel to the sidecar — it has no notion of + /// separate process / event connections and no per-node telemetry. The single channel's + /// connected state is reported for both + /// and , and + /// / + /// / + /// are intentionally null/empty. Consumers + /// that need to distinguish two connections should read another driver. (Finding 010.) + /// + /// All six counter fields (TotalQueries, TotalSuccesses, TotalFailures, + /// ConsecutiveFailures, LastSuccessTime, LastFailureTime, LastError) are mutated + /// exclusively under _healthLock, so the snapshot is internally consistent — + /// in particular TotalSuccesses + TotalFailures == TotalQueries at every + /// observed snapshot (a call that has started but not yet completed has not + /// incremented any counter). (Finding 003 / 004.) + /// + /// public HistorianHealthSnapshot GetHealthSnapshot() { lock (_healthLock) @@ -233,8 +259,9 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist try { - var reply = await Invoke( - MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply, req, cancellationToken).ConfigureAwait(false); + var reply = await InvokeAsync( + MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply, req, + r => (r.Success, r.Error), cancellationToken).ConfigureAwait(false); // Whole-call failure → transient retry for every event in the batch. if (!reply.Success) @@ -279,69 +306,79 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist // ===== Helpers ===== - private async Task Invoke( - MessageKind requestKind, MessageKind expectedReplyKind, TRequest request, CancellationToken ct) + /// + /// Sends one request through the channel and records the outcome (transport success or + /// transport failure) under a single _healthLock acquisition that also bumps + /// _totalQueries. Sidecar-level success / failure is NOT classified here — the + /// caller passes that through instead. (Finding + /// 003 / 004: all six counter fields share one synchronization mechanism so a snapshot + /// can never observe a torn state.) + /// + private async Task InvokeAsync( + MessageKind requestKind, MessageKind expectedReplyKind, TRequest request, + Func evaluate, CancellationToken ct) where TReply : class { - Interlocked.Increment(ref _totalQueries); try { var reply = await _channel.InvokeAsync(requestKind, expectedReplyKind, request, ct).ConfigureAwait(false); - RecordSuccess(); + // Classify transport+sidecar in one lock so TotalQueries/TotalSuccesses/ + // TotalFailures move together and no intermediate "success-then-undo" state is + // visible to a concurrent GetHealthSnapshot. + var (ok, error) = evaluate(reply); + RecordOutcome(ok, error); return reply; } catch (Exception ex) { - RecordFailure(ex.Message); + RecordOutcome(success: false, ex.Message); throw; } } - private void RecordSuccess() + /// + /// Convenience wrapper around that throws + /// on a sidecar-reported failure. Used by the + /// read methods. + /// + private async Task InvokeAndClassifyAsync( + MessageKind requestKind, MessageKind expectedReplyKind, TRequest request, + Func evaluate, string op, CancellationToken ct) + where TReply : class { - lock (_healthLock) + var reply = await InvokeAsync(requestKind, expectedReplyKind, request, evaluate, ct).ConfigureAwait(false); + var (ok, error) = evaluate(reply); + if (!ok) { - _totalSuccesses++; - _consecutiveFailures = 0; - _lastSuccessUtc = DateTime.UtcNow; - } - } - - private void RecordFailure(string message) - { - lock (_healthLock) - { - _totalFailures++; - _consecutiveFailures++; - _lastFailureUtc = DateTime.UtcNow; - _lastError = message; - } - } - - private void ThrowIfFailed(bool success, string? error, string op) - { - if (!success) - { - // Sidecar-reported failure counts as an operation failure even though the - // transport delivered a reply. The Invoke wrapper already recorded transport - // success — undo that and record the failure so the health snapshot reflects - // operation-level success rates rather than just connectivity. - ReclassifySuccessAsFailure(error); throw new InvalidOperationException( $"Sidecar {op} failed: {error ?? ""}."); } + return reply; } - private void ReclassifySuccessAsFailure(string? message) + /// + /// Records the outcome of a single call — increments _totalQueries and exactly + /// one of _totalSuccesses / _totalFailures under a single + /// _healthLock acquisition. (Findings 003 + 004.) + /// + private void RecordOutcome(bool success, string? error) { lock (_healthLock) { - // Transport-level RecordSuccess happened a moment ago; reverse it. - _totalSuccesses--; - _totalFailures++; - _consecutiveFailures++; - _lastFailureUtc = DateTime.UtcNow; - _lastError = message; + _totalQueries++; + if (success) + { + _totalSuccesses++; + _consecutiveFailures = 0; + _lastSuccessUtc = DateTime.UtcNow; + } + else + { + _totalFailures++; + _consecutiveFailures++; + _lastFailureUtc = DateTime.UtcNow; + _lastError = error; + } } } @@ -452,9 +489,12 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist /// /// Synchronous dispose required by on - /// . The underlying channel's async cleanup is - /// non-blocking (just resets transport state + disposes streams), so the - /// GetAwaiter()/GetResult() bridge is safe. + /// . The underlying channel's async cleanup runs + /// teardown, which can block briefly + /// on OS handle release — strictly speaking it is not non-blocking — but the + /// GetAwaiter()/GetResult() bridge is deadlock-safe because the cleanup never + /// awaits a captured nor takes any + /// lock that the caller could hold. (Finding 010.) /// public void Dispose() => _channel.DisposeAsync().AsTask().GetAwaiter().GetResult(); } diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClientOptions.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClientOptions.cs index 739da8b..1771530 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClientOptions.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClientOptions.cs @@ -3,24 +3,28 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client; /// /// Connection options for . /// +/// +/// +/// Retry / backoff ownership (finding 006): this module performs exactly one +/// in-place transport reconnect inside PipeChannel.InvokeAsync with no delay, +/// and does NOT implement exponential reconnect backoff. Broader retry/backoff is the +/// caller's responsibility — the alarm drain worker +/// (Core.AlarmHistorian.SqliteStoreAndForwardSink) and the read-side +/// history router are expected to layer their own backoff on top. +/// +/// /// Named-pipe name the sidecar listens on (matches the sidecar's OTOPCUA_HISTORIAN_PIPE). /// Per-process shared secret the sidecar will verify in the Hello frame. /// Diagnostic peer identifier sent in Hello — typically the OtOpcUa instance id. /// Cap on the named-pipe connect + Hello round trip on each (re)connect. /// Cap on a single read/write call once connected. -/// Backoff between the first failed reconnect attempts. -/// Upper bound on the exponential backoff between reconnects. public sealed record WonderwareHistorianClientOptions( string PipeName, string SharedSecret, string PeerName = "OtOpcUa", TimeSpan? ConnectTimeout = null, - TimeSpan? CallTimeout = null, - TimeSpan? ReconnectInitialBackoff = null, - TimeSpan? ReconnectMaxBackoff = null) + TimeSpan? CallTimeout = null) { public TimeSpan EffectiveConnectTimeout => ConnectTimeout ?? TimeSpan.FromSeconds(10); public TimeSpan EffectiveCallTimeout => CallTimeout ?? TimeSpan.FromSeconds(30); - public TimeSpan EffectiveReconnectInitialBackoff => ReconnectInitialBackoff ?? TimeSpan.FromMilliseconds(500); - public TimeSpan EffectiveReconnectMaxBackoff => ReconnectMaxBackoff ?? TimeSpan.FromSeconds(30); } diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/WonderwareHistorianClientTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/WonderwareHistorianClientTests.cs index 7e75a3f..5fb2876 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/WonderwareHistorianClientTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/WonderwareHistorianClientTests.cs @@ -491,4 +491,95 @@ public sealed class WonderwareHistorianClientTests await Should.ThrowAsync(() => client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 100, CancellationToken.None)); } + + // ===== Finding-003 / Finding-004: health counter consistency ===== + + /// + /// (Finding 003 + 004) A sidecar-level failure must be classified once: TotalSuccesses + /// must stay at 0, TotalFailures must become 1, and TotalQueries / TotalSuccesses / + /// TotalFailures must all be updated under the same lock so a concurrent snapshot can + /// never observe inflated successes or out-of-band TotalQueries. This pins behaviour so + /// a future regression to the "RecordSuccess then undo via ReclassifySuccessAsFailure" + /// dance is caught. + /// + [Fact] + public async Task GetHealthSnapshot_SidecarFailure_NeverInflatesSuccessCounter() + { + var pipe = UniquePipeName(); + await using var server = new FakeSidecarServer(pipe, Secret) + { + OnReadRaw = _ => new ReadRawReply { Success = false, Error = "boom" }, + }; + await server.StartAsync(); + + await using var client = new WonderwareHistorianClient(OptsFor(pipe)); + + await Should.ThrowAsync(() => + client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 1, CancellationToken.None)); + + var snap = client.GetHealthSnapshot(); + snap.TotalQueries.ShouldBe(1); + snap.TotalSuccesses.ShouldBe(0); + snap.TotalFailures.ShouldBe(1); + snap.ConsecutiveFailures.ShouldBe(1); + snap.LastError.ShouldNotBeNull(); + } + + /// + /// (Finding 003) Concurrent calls + concurrent + /// reads must observe consistent counters. Specifically, TotalSuccesses + TotalFailures + /// must equal TotalQueries at every observed snapshot (no torn read between an + /// Interlocked-incremented TotalQueries and a lock-protected outcome counter). The + /// channel serializes calls, so the test is observable: each completed query strictly + /// increments either successes or failures by one. + /// + [Fact] + public async Task GetHealthSnapshot_ConcurrentCallsAndReads_CountersAreInternallyConsistent() + { + var pipe = UniquePipeName(); + await using var server = new FakeSidecarServer(pipe, Secret) + { + OnReadRaw = _ => new ReadRawReply { Success = true }, + }; + await server.StartAsync(); + + await using var client = new WonderwareHistorianClient(OptsFor(pipe)); + + using var stop = new CancellationTokenSource(); + var readerSawInconsistent = false; + +#pragma warning disable xUnit1051 // Internal Task.Run loop drives a polling stress test; cancellation flows via stop.IsCancellationRequested below. + var reader = Task.Run(() => + { + while (!stop.IsCancellationRequested) + { + var snap = client.GetHealthSnapshot(); + // Every completed call increments TotalQueries AND exactly one of + // TotalSuccesses or TotalFailures under the same lock; an in-flight call + // has not yet incremented any of them. So TotalQueries should always equal + // the sum of TotalSuccesses + TotalFailures (no in-between state visible). + if (snap.TotalSuccesses + snap.TotalFailures != snap.TotalQueries) + { + readerSawInconsistent = true; + } + } + }); +#pragma warning restore xUnit1051 + + for (var i = 0; i < 50; i++) + { + await client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 1, TestContext.Current.CancellationToken); + } + + stop.Cancel(); + await reader; + + readerSawInconsistent.ShouldBeFalse( + "GetHealthSnapshot exposed TotalQueries that disagreed with the sum of TotalSuccesses + TotalFailures — counters are not updated under a single lock."); + + var final = client.GetHealthSnapshot(); + final.TotalQueries.ShouldBe(50); + final.TotalSuccesses.ShouldBe(50); + final.TotalFailures.ShouldBe(0); + } }