diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs index 8cb536c..363c9ad 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs @@ -22,6 +22,7 @@ public sealed class CapabilityInvoker private readonly string _driverInstanceId; private readonly string _driverType; private readonly Func _optionsAccessor; + private readonly DriverResilienceStatusTracker? _statusTracker; /// /// Construct an invoker for one driver instance. @@ -33,11 +34,13 @@ public sealed class CapabilityInvoker /// pipeline-invalidate can take effect without restarting the invoker. /// /// Driver type name for structured-log enrichment (e.g. "Modbus"). + /// Optional resilience-status tracker. When wired, every capability call records start/complete so Admin /hosts can surface as the bulkhead-depth proxy. public CapabilityInvoker( DriverResiliencePipelineBuilder builder, string driverInstanceId, Func optionsAccessor, - string driverType = "Unknown") + string driverType = "Unknown", + DriverResilienceStatusTracker? statusTracker = null) { ArgumentNullException.ThrowIfNull(builder); ArgumentNullException.ThrowIfNull(optionsAccessor); @@ -46,6 +49,7 @@ public sealed class CapabilityInvoker _driverInstanceId = driverInstanceId; _driverType = driverType; _optionsAccessor = optionsAccessor; + _statusTracker = statusTracker; } /// Execute a capability call returning a value, honoring the per-capability pipeline. @@ -59,9 +63,17 @@ public sealed class CapabilityInvoker ArgumentNullException.ThrowIfNull(callSite); var pipeline = ResolvePipeline(capability, hostName); - using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId())) + _statusTracker?.RecordCallStart(_driverInstanceId, hostName); + try { - return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId())) + { + return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + } + } + finally + { + _statusTracker?.RecordCallComplete(_driverInstanceId, hostName); } } @@ -75,9 +87,17 @@ public sealed class CapabilityInvoker ArgumentNullException.ThrowIfNull(callSite); var pipeline = ResolvePipeline(capability, hostName); - using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId())) + _statusTracker?.RecordCallStart(_driverInstanceId, hostName); + try { - await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId())) + { + await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + } + } + finally + { + _statusTracker?.RecordCallComplete(_driverInstanceId, hostName); } } diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceStatusTracker.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceStatusTracker.cs index be4cc07..0ff277d 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceStatusTracker.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceStatusTracker.cs @@ -81,6 +81,29 @@ public sealed class DriverResilienceStatusTracker }); } + /// + /// Record the entry of a capability call for this (instance, host). Increments the + /// in-flight counter used as the + /// surface (a cheap stand-in for Polly bulkhead depth). Paired with + /// ; callers use try/finally. + /// + public void RecordCallStart(string driverInstanceId, string hostName) + { + var key = new StatusKey(driverInstanceId, hostName); + _status.AddOrUpdate(key, + _ => new ResilienceStatusSnapshot { CurrentInFlight = 1 }, + (_, existing) => existing with { CurrentInFlight = existing.CurrentInFlight + 1 }); + } + + /// Paired with — decrements the in-flight counter. + public void RecordCallComplete(string driverInstanceId, string hostName) + { + var key = new StatusKey(driverInstanceId, hostName); + _status.AddOrUpdate(key, + _ => new ResilienceStatusSnapshot { CurrentInFlight = 0 }, // start-without-complete shouldn't happen; clamp to 0 + (_, existing) => existing with { CurrentInFlight = Math.Max(0, existing.CurrentInFlight - 1) }); + } + /// Snapshot of a specific (instance, host) pair; null if no counters recorded yet. public ResilienceStatusSnapshot? TryGet(string driverInstanceId, string hostName) => _status.TryGetValue(new StatusKey(driverInstanceId, hostName), out var snapshot) ? snapshot : null; @@ -101,4 +124,12 @@ public sealed record ResilienceStatusSnapshot public long BaselineFootprintBytes { get; init; } public long CurrentFootprintBytes { get; init; } public DateTime LastSampledUtc { get; init; } + + /// + /// In-flight capability calls against this (instance, host). Bumped on call entry + + /// decremented on completion. Feeds DriverInstanceResilienceStatus.CurrentBulkheadDepth + /// for Admin /hosts — a cheap proxy for the Polly bulkhead depth until the full + /// telemetry observer lands. + /// + public int CurrentInFlight { get; init; } } diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Hosting/ResilienceStatusPublisherHostedService.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Hosting/ResilienceStatusPublisherHostedService.cs index 777be52..d933f5b 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/Hosting/ResilienceStatusPublisherHostedService.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Hosting/ResilienceStatusPublisherHostedService.cs @@ -108,7 +108,7 @@ public sealed class ResilienceStatusPublisherHostedService : BackgroundService HostName = hostName, LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc, ConsecutiveFailures = counters.ConsecutiveFailures, - CurrentBulkheadDepth = 0, // Phase 6.1 Stream A tracker doesn't emit bulkhead depth yet + CurrentBulkheadDepth = counters.CurrentInFlight, LastRecycleUtc = counters.LastRecycleUtc, BaselineFootprintBytes = counters.BaselineFootprintBytes, CurrentFootprintBytes = counters.CurrentFootprintBytes, @@ -119,6 +119,7 @@ public sealed class ResilienceStatusPublisherHostedService : BackgroundService { existing.LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc; existing.ConsecutiveFailures = counters.ConsecutiveFailures; + existing.CurrentBulkheadDepth = counters.CurrentInFlight; existing.LastRecycleUtc = counters.LastRecycleUtc; existing.BaselineFootprintBytes = counters.BaselineFootprintBytes; existing.CurrentFootprintBytes = counters.CurrentFootprintBytes; diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/InFlightCounterTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/InFlightCounterTests.cs new file mode 100644 index 0000000..5f016d0 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/InFlightCounterTests.cs @@ -0,0 +1,130 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; + +namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience; + +[Trait("Category", "Unit")] +public sealed class InFlightCounterTests +{ + [Fact] + public void StartThenComplete_NetsToZero() + { + var tracker = new DriverResilienceStatusTracker(); + tracker.RecordCallStart("drv", "host-a"); + tracker.RecordCallComplete("drv", "host-a"); + + tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0); + } + + [Fact] + public void NestedStarts_SumDepth() + { + var tracker = new DriverResilienceStatusTracker(); + tracker.RecordCallStart("drv", "host-a"); + tracker.RecordCallStart("drv", "host-a"); + tracker.RecordCallStart("drv", "host-a"); + + tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(3); + + tracker.RecordCallComplete("drv", "host-a"); + tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2); + } + + [Fact] + public void CompleteBeforeStart_ClampedToZero() + { + var tracker = new DriverResilienceStatusTracker(); + tracker.RecordCallComplete("drv", "host-a"); + + // A stray Complete without a matching Start shouldn't drive the counter negative. + tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0); + } + + [Fact] + public void DifferentHosts_TrackIndependently() + { + var tracker = new DriverResilienceStatusTracker(); + tracker.RecordCallStart("drv", "host-a"); + tracker.RecordCallStart("drv", "host-a"); + tracker.RecordCallStart("drv", "host-b"); + + tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2); + tracker.TryGet("drv", "host-b")!.CurrentInFlight.ShouldBe(1); + } + + [Fact] + public void ConcurrentStarts_DoNotLose_Count() + { + var tracker = new DriverResilienceStatusTracker(); + Parallel.For(0, 500, _ => tracker.RecordCallStart("drv", "host-a")); + + tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(500); + } + + [Fact] + public async Task CapabilityInvoker_IncrementsTracker_DuringExecution() + { + var tracker = new DriverResilienceStatusTracker(); + var invoker = new CapabilityInvoker( + new DriverResiliencePipelineBuilder(), + "drv-live", + () => new DriverResilienceOptions { Tier = DriverTier.A }, + driverType: "Modbus", + statusTracker: tracker); + + var observedMidCall = -1; + await invoker.ExecuteAsync( + DriverCapability.Read, + "plc-1", + async _ => + { + observedMidCall = tracker.TryGet("drv-live", "plc-1")?.CurrentInFlight ?? -1; + await Task.Yield(); + return 42; + }, + CancellationToken.None); + + observedMidCall.ShouldBe(1, "during call, in-flight == 1"); + tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0, "post-call, counter decremented"); + } + + [Fact] + public async Task CapabilityInvoker_ExceptionPath_DecrementsCounter() + { + var tracker = new DriverResilienceStatusTracker(); + var invoker = new CapabilityInvoker( + new DriverResiliencePipelineBuilder(), + "drv-live", + () => new DriverResilienceOptions { Tier = DriverTier.A }, + statusTracker: tracker); + + await Should.ThrowAsync(async () => + await invoker.ExecuteAsync( + DriverCapability.Write, + "plc-1", + _ => throw new InvalidOperationException("boom"), + CancellationToken.None)); + + tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0, + "finally-block must decrement even when call-site throws"); + } + + [Fact] + public async Task CapabilityInvoker_WithoutTracker_DoesNotThrow() + { + var invoker = new CapabilityInvoker( + new DriverResiliencePipelineBuilder(), + "drv-live", + () => new DriverResilienceOptions { Tier = DriverTier.A }, + statusTracker: null); + + var result = await invoker.ExecuteAsync( + DriverCapability.Read, "host-1", + _ => ValueTask.FromResult(7), + CancellationToken.None); + + result.ShouldBe(7); + } +}