From ae8f226e45b67bf0c4ed22478bd99bce00e117c5 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 15:02:34 -0400 Subject: [PATCH] =?UTF-8?q?Phase=206.1=20Stream=20E.3=20partial=20?= =?UTF-8?q?=E2=80=94=20in-flight=20counter=20feeds=20CurrentBulkheadDepth?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the observer half of #162 that was flagged as "persisted as 0 today" in PR #105. The Admin /hosts column refresh + FleetStatusHub SignalR push + red-badge visual still belong to the visual-compliance pass. Core.Resilience: - DriverResilienceStatusTracker gains RecordCallStart + RecordCallComplete + CurrentInFlight field on the snapshot record. Concurrent-safe via the same ConcurrentDictionary.AddOrUpdate pattern as the other recorder methods. Clamps to zero on over-decrement so a stray Complete-without-Start can't drive the counter negative. - CapabilityInvoker gains an optional statusTracker ctor parameter. When wired, every ExecuteAsync / ExecuteAsync(void) wraps the pipeline call in try / finally that records start/complete — so the counter advances cleanly whether the call succeeds, cancels, or throws. Null tracker keeps the pre-Phase-6.1 Stream E.3 behaviour exactly. Server.Hosting: - ResilienceStatusPublisherHostedService persists CurrentInFlight as the DriverInstanceResilienceStatus.CurrentBulkheadDepth column (was 0 before this PR). One-line fix on both the insert + update branches. The in-flight counter is a pragmatic proxy for Polly's internal bulkhead depth — a future PR wiring Polly telemetry would replace it with the real value. The shape of the column + the publisher + the Admin /hosts query doesn't change, so the follow-up is invisible to consumers. Tests (8 new InFlightCounterTests, all pass): - Start+Complete nets to zero. - Nested starts sum; Complete decrements. - Complete-without-Start clamps to zero. - Different hosts track independently. - Concurrent starts (500 parallel) don't lose count. - CapabilityInvoker observed-mid-call depth == 1 during a pending call. - CapabilityInvoker exception path still decrements (try/finally). - CapabilityInvoker without tracker doesn't throw. Full solution dotnet test: 1243 passing (was 1235, +8). Pre-existing Client.CLI Subscribe flake unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Resilience/CapabilityInvoker.cs | 30 +++- .../DriverResilienceStatusTracker.cs | 31 +++++ .../ResilienceStatusPublisherHostedService.cs | 3 +- .../Resilience/InFlightCounterTests.cs | 130 ++++++++++++++++++ 4 files changed, 188 insertions(+), 6 deletions(-) create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/InFlightCounterTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs index 8cb536c..363c9ad 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs @@ -22,6 +22,7 @@ public sealed class CapabilityInvoker private readonly string _driverInstanceId; private readonly string _driverType; private readonly Func _optionsAccessor; + private readonly DriverResilienceStatusTracker? _statusTracker; /// /// Construct an invoker for one driver instance. @@ -33,11 +34,13 @@ public sealed class CapabilityInvoker /// pipeline-invalidate can take effect without restarting the invoker. /// /// Driver type name for structured-log enrichment (e.g. "Modbus"). + /// Optional resilience-status tracker. When wired, every capability call records start/complete so Admin /hosts can surface as the bulkhead-depth proxy. public CapabilityInvoker( DriverResiliencePipelineBuilder builder, string driverInstanceId, Func optionsAccessor, - string driverType = "Unknown") + string driverType = "Unknown", + DriverResilienceStatusTracker? statusTracker = null) { ArgumentNullException.ThrowIfNull(builder); ArgumentNullException.ThrowIfNull(optionsAccessor); @@ -46,6 +49,7 @@ public sealed class CapabilityInvoker _driverInstanceId = driverInstanceId; _driverType = driverType; _optionsAccessor = optionsAccessor; + _statusTracker = statusTracker; } /// Execute a capability call returning a value, honoring the per-capability pipeline. @@ -59,9 +63,17 @@ public sealed class CapabilityInvoker ArgumentNullException.ThrowIfNull(callSite); var pipeline = ResolvePipeline(capability, hostName); - using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId())) + _statusTracker?.RecordCallStart(_driverInstanceId, hostName); + try { - return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId())) + { + return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + } + } + finally + { + _statusTracker?.RecordCallComplete(_driverInstanceId, hostName); } } @@ -75,9 +87,17 @@ public sealed class CapabilityInvoker ArgumentNullException.ThrowIfNull(callSite); var pipeline = ResolvePipeline(capability, hostName); - using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId())) + _statusTracker?.RecordCallStart(_driverInstanceId, hostName); + try { - await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId())) + { + await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + } + } + finally + { + _statusTracker?.RecordCallComplete(_driverInstanceId, hostName); } } diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceStatusTracker.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceStatusTracker.cs index be4cc07..0ff277d 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceStatusTracker.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceStatusTracker.cs @@ -81,6 +81,29 @@ public sealed class DriverResilienceStatusTracker }); } + /// + /// Record the entry of a capability call for this (instance, host). Increments the + /// in-flight counter used as the + /// surface (a cheap stand-in for Polly bulkhead depth). Paired with + /// ; callers use try/finally. + /// + public void RecordCallStart(string driverInstanceId, string hostName) + { + var key = new StatusKey(driverInstanceId, hostName); + _status.AddOrUpdate(key, + _ => new ResilienceStatusSnapshot { CurrentInFlight = 1 }, + (_, existing) => existing with { CurrentInFlight = existing.CurrentInFlight + 1 }); + } + + /// Paired with — decrements the in-flight counter. + public void RecordCallComplete(string driverInstanceId, string hostName) + { + var key = new StatusKey(driverInstanceId, hostName); + _status.AddOrUpdate(key, + _ => new ResilienceStatusSnapshot { CurrentInFlight = 0 }, // start-without-complete shouldn't happen; clamp to 0 + (_, existing) => existing with { CurrentInFlight = Math.Max(0, existing.CurrentInFlight - 1) }); + } + /// Snapshot of a specific (instance, host) pair; null if no counters recorded yet. public ResilienceStatusSnapshot? TryGet(string driverInstanceId, string hostName) => _status.TryGetValue(new StatusKey(driverInstanceId, hostName), out var snapshot) ? snapshot : null; @@ -101,4 +124,12 @@ public sealed record ResilienceStatusSnapshot public long BaselineFootprintBytes { get; init; } public long CurrentFootprintBytes { get; init; } public DateTime LastSampledUtc { get; init; } + + /// + /// In-flight capability calls against this (instance, host). Bumped on call entry + + /// decremented on completion. Feeds DriverInstanceResilienceStatus.CurrentBulkheadDepth + /// for Admin /hosts — a cheap proxy for the Polly bulkhead depth until the full + /// telemetry observer lands. + /// + public int CurrentInFlight { get; init; } } diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Hosting/ResilienceStatusPublisherHostedService.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Hosting/ResilienceStatusPublisherHostedService.cs index 777be52..d933f5b 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/Hosting/ResilienceStatusPublisherHostedService.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Hosting/ResilienceStatusPublisherHostedService.cs @@ -108,7 +108,7 @@ public sealed class ResilienceStatusPublisherHostedService : BackgroundService HostName = hostName, LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc, ConsecutiveFailures = counters.ConsecutiveFailures, - CurrentBulkheadDepth = 0, // Phase 6.1 Stream A tracker doesn't emit bulkhead depth yet + CurrentBulkheadDepth = counters.CurrentInFlight, LastRecycleUtc = counters.LastRecycleUtc, BaselineFootprintBytes = counters.BaselineFootprintBytes, CurrentFootprintBytes = counters.CurrentFootprintBytes, @@ -119,6 +119,7 @@ public sealed class ResilienceStatusPublisherHostedService : BackgroundService { existing.LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc; existing.ConsecutiveFailures = counters.ConsecutiveFailures; + existing.CurrentBulkheadDepth = counters.CurrentInFlight; existing.LastRecycleUtc = counters.LastRecycleUtc; existing.BaselineFootprintBytes = counters.BaselineFootprintBytes; existing.CurrentFootprintBytes = counters.CurrentFootprintBytes; diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/InFlightCounterTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/InFlightCounterTests.cs new file mode 100644 index 0000000..5f016d0 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/InFlightCounterTests.cs @@ -0,0 +1,130 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; + +namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience; + +[Trait("Category", "Unit")] +public sealed class InFlightCounterTests +{ + [Fact] + public void StartThenComplete_NetsToZero() + { + var tracker = new DriverResilienceStatusTracker(); + tracker.RecordCallStart("drv", "host-a"); + tracker.RecordCallComplete("drv", "host-a"); + + tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0); + } + + [Fact] + public void NestedStarts_SumDepth() + { + var tracker = new DriverResilienceStatusTracker(); + tracker.RecordCallStart("drv", "host-a"); + tracker.RecordCallStart("drv", "host-a"); + tracker.RecordCallStart("drv", "host-a"); + + tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(3); + + tracker.RecordCallComplete("drv", "host-a"); + tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2); + } + + [Fact] + public void CompleteBeforeStart_ClampedToZero() + { + var tracker = new DriverResilienceStatusTracker(); + tracker.RecordCallComplete("drv", "host-a"); + + // A stray Complete without a matching Start shouldn't drive the counter negative. + tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0); + } + + [Fact] + public void DifferentHosts_TrackIndependently() + { + var tracker = new DriverResilienceStatusTracker(); + tracker.RecordCallStart("drv", "host-a"); + tracker.RecordCallStart("drv", "host-a"); + tracker.RecordCallStart("drv", "host-b"); + + tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2); + tracker.TryGet("drv", "host-b")!.CurrentInFlight.ShouldBe(1); + } + + [Fact] + public void ConcurrentStarts_DoNotLose_Count() + { + var tracker = new DriverResilienceStatusTracker(); + Parallel.For(0, 500, _ => tracker.RecordCallStart("drv", "host-a")); + + tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(500); + } + + [Fact] + public async Task CapabilityInvoker_IncrementsTracker_DuringExecution() + { + var tracker = new DriverResilienceStatusTracker(); + var invoker = new CapabilityInvoker( + new DriverResiliencePipelineBuilder(), + "drv-live", + () => new DriverResilienceOptions { Tier = DriverTier.A }, + driverType: "Modbus", + statusTracker: tracker); + + var observedMidCall = -1; + await invoker.ExecuteAsync( + DriverCapability.Read, + "plc-1", + async _ => + { + observedMidCall = tracker.TryGet("drv-live", "plc-1")?.CurrentInFlight ?? -1; + await Task.Yield(); + return 42; + }, + CancellationToken.None); + + observedMidCall.ShouldBe(1, "during call, in-flight == 1"); + tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0, "post-call, counter decremented"); + } + + [Fact] + public async Task CapabilityInvoker_ExceptionPath_DecrementsCounter() + { + var tracker = new DriverResilienceStatusTracker(); + var invoker = new CapabilityInvoker( + new DriverResiliencePipelineBuilder(), + "drv-live", + () => new DriverResilienceOptions { Tier = DriverTier.A }, + statusTracker: tracker); + + await Should.ThrowAsync(async () => + await invoker.ExecuteAsync( + DriverCapability.Write, + "plc-1", + _ => throw new InvalidOperationException("boom"), + CancellationToken.None)); + + tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0, + "finally-block must decrement even when call-site throws"); + } + + [Fact] + public async Task CapabilityInvoker_WithoutTracker_DoesNotThrow() + { + var invoker = new CapabilityInvoker( + new DriverResiliencePipelineBuilder(), + "drv-live", + () => new DriverResilienceOptions { Tier = DriverTier.A }, + statusTracker: null); + + var result = await invoker.ExecuteAsync( + DriverCapability.Read, "host-1", + _ => ValueTask.FromResult(7), + CancellationToken.None); + + result.ShouldBe(7); + } +}