Phase 6.1 Stream E.3 partial - in-flight counter feeds CurrentBulkheadDepth #107

Merged
dohertj2 merged 1 commits from phase-6-1-stream-e3-inflight-counter into v2 2026-04-19 15:04:30 -04:00
4 changed files with 188 additions and 6 deletions

View File

@@ -22,6 +22,7 @@ public sealed class CapabilityInvoker
private readonly string _driverInstanceId;
private readonly string _driverType;
private readonly Func<DriverResilienceOptions> _optionsAccessor;
private readonly DriverResilienceStatusTracker? _statusTracker;
/// <summary>
/// Construct an invoker for one driver instance.
@@ -33,11 +34,13 @@ public sealed class CapabilityInvoker
/// pipeline-invalidate can take effect without restarting the invoker.
/// </param>
/// <param name="driverType">Driver type name for structured-log enrichment (e.g. <c>"Modbus"</c>).</param>
/// <param name="statusTracker">Optional resilience-status tracker. When wired, every capability call records start/complete so Admin <c>/hosts</c> can surface <see cref="ResilienceStatusSnapshot.CurrentInFlight"/> as the bulkhead-depth proxy.</param>
public CapabilityInvoker(
DriverResiliencePipelineBuilder builder,
string driverInstanceId,
Func<DriverResilienceOptions> optionsAccessor,
string driverType = "Unknown")
string driverType = "Unknown",
DriverResilienceStatusTracker? statusTracker = null)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(optionsAccessor);
@@ -46,6 +49,7 @@ public sealed class CapabilityInvoker
_driverInstanceId = driverInstanceId;
_driverType = driverType;
_optionsAccessor = optionsAccessor;
_statusTracker = statusTracker;
}
/// <summary>Execute a capability call returning a value, honoring the per-capability pipeline.</summary>
@@ -59,9 +63,17 @@ public sealed class CapabilityInvoker
ArgumentNullException.ThrowIfNull(callSite);
var pipeline = ResolvePipeline(capability, hostName);
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
_statusTracker?.RecordCallStart(_driverInstanceId, hostName);
try
{
return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
{
return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
}
}
finally
{
_statusTracker?.RecordCallComplete(_driverInstanceId, hostName);
}
}
@@ -75,9 +87,17 @@ public sealed class CapabilityInvoker
ArgumentNullException.ThrowIfNull(callSite);
var pipeline = ResolvePipeline(capability, hostName);
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
_statusTracker?.RecordCallStart(_driverInstanceId, hostName);
try
{
await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
{
await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
}
}
finally
{
_statusTracker?.RecordCallComplete(_driverInstanceId, hostName);
}
}

View File

@@ -81,6 +81,29 @@ public sealed class DriverResilienceStatusTracker
});
}
/// <summary>
/// Record the entry of a capability call for this (instance, host). Increments the
/// in-flight counter used as the <see cref="ResilienceStatusSnapshot.CurrentInFlight"/>
/// surface (a cheap stand-in for Polly bulkhead depth). Paired with
/// <see cref="RecordCallComplete"/>; callers use try/finally.
/// </summary>
public void RecordCallStart(string driverInstanceId, string hostName)
{
var key = new StatusKey(driverInstanceId, hostName);
_status.AddOrUpdate(key,
_ => new ResilienceStatusSnapshot { CurrentInFlight = 1 },
(_, existing) => existing with { CurrentInFlight = existing.CurrentInFlight + 1 });
}
/// <summary>Paired with <see cref="RecordCallStart"/> — decrements the in-flight counter.</summary>
public void RecordCallComplete(string driverInstanceId, string hostName)
{
var key = new StatusKey(driverInstanceId, hostName);
_status.AddOrUpdate(key,
_ => new ResilienceStatusSnapshot { CurrentInFlight = 0 }, // start-without-complete shouldn't happen; clamp to 0
(_, existing) => existing with { CurrentInFlight = Math.Max(0, existing.CurrentInFlight - 1) });
}
/// <summary>Snapshot of a specific (instance, host) pair; null if no counters recorded yet.</summary>
public ResilienceStatusSnapshot? TryGet(string driverInstanceId, string hostName) =>
_status.TryGetValue(new StatusKey(driverInstanceId, hostName), out var snapshot) ? snapshot : null;
@@ -101,4 +124,12 @@ public sealed record ResilienceStatusSnapshot
public long BaselineFootprintBytes { get; init; }
public long CurrentFootprintBytes { get; init; }
public DateTime LastSampledUtc { get; init; }
/// <summary>
/// In-flight capability calls against this (instance, host). Bumped on call entry +
/// decremented on completion. Feeds <c>DriverInstanceResilienceStatus.CurrentBulkheadDepth</c>
/// for Admin <c>/hosts</c> — a cheap proxy for the Polly bulkhead depth until the full
/// telemetry observer lands.
/// </summary>
public int CurrentInFlight { get; init; }
}

View File

@@ -108,7 +108,7 @@ public sealed class ResilienceStatusPublisherHostedService : BackgroundService
HostName = hostName,
LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc,
ConsecutiveFailures = counters.ConsecutiveFailures,
CurrentBulkheadDepth = 0, // Phase 6.1 Stream A tracker doesn't emit bulkhead depth yet
CurrentBulkheadDepth = counters.CurrentInFlight,
LastRecycleUtc = counters.LastRecycleUtc,
BaselineFootprintBytes = counters.BaselineFootprintBytes,
CurrentFootprintBytes = counters.CurrentFootprintBytes,
@@ -119,6 +119,7 @@ public sealed class ResilienceStatusPublisherHostedService : BackgroundService
{
existing.LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc;
existing.ConsecutiveFailures = counters.ConsecutiveFailures;
existing.CurrentBulkheadDepth = counters.CurrentInFlight;
existing.LastRecycleUtc = counters.LastRecycleUtc;
existing.BaselineFootprintBytes = counters.BaselineFootprintBytes;
existing.CurrentFootprintBytes = counters.CurrentFootprintBytes;

View File

@@ -0,0 +1,130 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
[Trait("Category", "Unit")]
public sealed class InFlightCounterTests
{
[Fact]
public void StartThenComplete_NetsToZero()
{
var tracker = new DriverResilienceStatusTracker();
tracker.RecordCallStart("drv", "host-a");
tracker.RecordCallComplete("drv", "host-a");
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0);
}
[Fact]
public void NestedStarts_SumDepth()
{
var tracker = new DriverResilienceStatusTracker();
tracker.RecordCallStart("drv", "host-a");
tracker.RecordCallStart("drv", "host-a");
tracker.RecordCallStart("drv", "host-a");
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(3);
tracker.RecordCallComplete("drv", "host-a");
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2);
}
[Fact]
public void CompleteBeforeStart_ClampedToZero()
{
var tracker = new DriverResilienceStatusTracker();
tracker.RecordCallComplete("drv", "host-a");
// A stray Complete without a matching Start shouldn't drive the counter negative.
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0);
}
[Fact]
public void DifferentHosts_TrackIndependently()
{
var tracker = new DriverResilienceStatusTracker();
tracker.RecordCallStart("drv", "host-a");
tracker.RecordCallStart("drv", "host-a");
tracker.RecordCallStart("drv", "host-b");
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2);
tracker.TryGet("drv", "host-b")!.CurrentInFlight.ShouldBe(1);
}
[Fact]
public void ConcurrentStarts_DoNotLose_Count()
{
var tracker = new DriverResilienceStatusTracker();
Parallel.For(0, 500, _ => tracker.RecordCallStart("drv", "host-a"));
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(500);
}
[Fact]
public async Task CapabilityInvoker_IncrementsTracker_DuringExecution()
{
var tracker = new DriverResilienceStatusTracker();
var invoker = new CapabilityInvoker(
new DriverResiliencePipelineBuilder(),
"drv-live",
() => new DriverResilienceOptions { Tier = DriverTier.A },
driverType: "Modbus",
statusTracker: tracker);
var observedMidCall = -1;
await invoker.ExecuteAsync(
DriverCapability.Read,
"plc-1",
async _ =>
{
observedMidCall = tracker.TryGet("drv-live", "plc-1")?.CurrentInFlight ?? -1;
await Task.Yield();
return 42;
},
CancellationToken.None);
observedMidCall.ShouldBe(1, "during call, in-flight == 1");
tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0, "post-call, counter decremented");
}
[Fact]
public async Task CapabilityInvoker_ExceptionPath_DecrementsCounter()
{
var tracker = new DriverResilienceStatusTracker();
var invoker = new CapabilityInvoker(
new DriverResiliencePipelineBuilder(),
"drv-live",
() => new DriverResilienceOptions { Tier = DriverTier.A },
statusTracker: tracker);
await Should.ThrowAsync<InvalidOperationException>(async () =>
await invoker.ExecuteAsync<int>(
DriverCapability.Write,
"plc-1",
_ => throw new InvalidOperationException("boom"),
CancellationToken.None));
tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0,
"finally-block must decrement even when call-site throws");
}
[Fact]
public async Task CapabilityInvoker_WithoutTracker_DoesNotThrow()
{
var invoker = new CapabilityInvoker(
new DriverResiliencePipelineBuilder(),
"drv-live",
() => new DriverResilienceOptions { Tier = DriverTier.A },
statusTracker: null);
var result = await invoker.ExecuteAsync(
DriverCapability.Read, "host-1",
_ => ValueTask.FromResult(7),
CancellationToken.None);
result.ShouldBe(7);
}
}