Merge pull request (#107) - in-flight counter
This commit was merged in pull request #107.
This commit is contained in:
@@ -22,6 +22,7 @@ public sealed class CapabilityInvoker
|
|||||||
private readonly string _driverInstanceId;
|
private readonly string _driverInstanceId;
|
||||||
private readonly string _driverType;
|
private readonly string _driverType;
|
||||||
private readonly Func<DriverResilienceOptions> _optionsAccessor;
|
private readonly Func<DriverResilienceOptions> _optionsAccessor;
|
||||||
|
private readonly DriverResilienceStatusTracker? _statusTracker;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Construct an invoker for one driver instance.
|
/// Construct an invoker for one driver instance.
|
||||||
@@ -33,11 +34,13 @@ public sealed class CapabilityInvoker
|
|||||||
/// pipeline-invalidate can take effect without restarting the invoker.
|
/// pipeline-invalidate can take effect without restarting the invoker.
|
||||||
/// </param>
|
/// </param>
|
||||||
/// <param name="driverType">Driver type name for structured-log enrichment (e.g. <c>"Modbus"</c>).</param>
|
/// <param name="driverType">Driver type name for structured-log enrichment (e.g. <c>"Modbus"</c>).</param>
|
||||||
|
/// <param name="statusTracker">Optional resilience-status tracker. When wired, every capability call records start/complete so Admin <c>/hosts</c> can surface <see cref="ResilienceStatusSnapshot.CurrentInFlight"/> as the bulkhead-depth proxy.</param>
|
||||||
public CapabilityInvoker(
|
public CapabilityInvoker(
|
||||||
DriverResiliencePipelineBuilder builder,
|
DriverResiliencePipelineBuilder builder,
|
||||||
string driverInstanceId,
|
string driverInstanceId,
|
||||||
Func<DriverResilienceOptions> optionsAccessor,
|
Func<DriverResilienceOptions> optionsAccessor,
|
||||||
string driverType = "Unknown")
|
string driverType = "Unknown",
|
||||||
|
DriverResilienceStatusTracker? statusTracker = null)
|
||||||
{
|
{
|
||||||
ArgumentNullException.ThrowIfNull(builder);
|
ArgumentNullException.ThrowIfNull(builder);
|
||||||
ArgumentNullException.ThrowIfNull(optionsAccessor);
|
ArgumentNullException.ThrowIfNull(optionsAccessor);
|
||||||
@@ -46,6 +49,7 @@ public sealed class CapabilityInvoker
|
|||||||
_driverInstanceId = driverInstanceId;
|
_driverInstanceId = driverInstanceId;
|
||||||
_driverType = driverType;
|
_driverType = driverType;
|
||||||
_optionsAccessor = optionsAccessor;
|
_optionsAccessor = optionsAccessor;
|
||||||
|
_statusTracker = statusTracker;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>Execute a capability call returning a value, honoring the per-capability pipeline.</summary>
|
/// <summary>Execute a capability call returning a value, honoring the per-capability pipeline.</summary>
|
||||||
@@ -59,11 +63,19 @@ public sealed class CapabilityInvoker
|
|||||||
ArgumentNullException.ThrowIfNull(callSite);
|
ArgumentNullException.ThrowIfNull(callSite);
|
||||||
|
|
||||||
var pipeline = ResolvePipeline(capability, hostName);
|
var pipeline = ResolvePipeline(capability, hostName);
|
||||||
|
_statusTracker?.RecordCallStart(_driverInstanceId, hostName);
|
||||||
|
try
|
||||||
|
{
|
||||||
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
||||||
{
|
{
|
||||||
return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_statusTracker?.RecordCallComplete(_driverInstanceId, hostName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Execute a void-returning capability call, honoring the per-capability pipeline.</summary>
|
/// <summary>Execute a void-returning capability call, honoring the per-capability pipeline.</summary>
|
||||||
public async ValueTask ExecuteAsync(
|
public async ValueTask ExecuteAsync(
|
||||||
@@ -75,11 +87,19 @@ public sealed class CapabilityInvoker
|
|||||||
ArgumentNullException.ThrowIfNull(callSite);
|
ArgumentNullException.ThrowIfNull(callSite);
|
||||||
|
|
||||||
var pipeline = ResolvePipeline(capability, hostName);
|
var pipeline = ResolvePipeline(capability, hostName);
|
||||||
|
_statusTracker?.RecordCallStart(_driverInstanceId, hostName);
|
||||||
|
try
|
||||||
|
{
|
||||||
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
||||||
{
|
{
|
||||||
await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_statusTracker?.RecordCallComplete(_driverInstanceId, hostName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Execute a <see cref="DriverCapability.Write"/> call honoring <see cref="WriteIdempotentAttribute"/>
|
/// Execute a <see cref="DriverCapability.Write"/> call honoring <see cref="WriteIdempotentAttribute"/>
|
||||||
|
|||||||
@@ -81,6 +81,29 @@ public sealed class DriverResilienceStatusTracker
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Record the entry of a capability call for this (instance, host). Increments the
|
||||||
|
/// in-flight counter used as the <see cref="ResilienceStatusSnapshot.CurrentInFlight"/>
|
||||||
|
/// surface (a cheap stand-in for Polly bulkhead depth). Paired with
|
||||||
|
/// <see cref="RecordCallComplete"/>; callers use try/finally.
|
||||||
|
/// </summary>
|
||||||
|
public void RecordCallStart(string driverInstanceId, string hostName)
|
||||||
|
{
|
||||||
|
var key = new StatusKey(driverInstanceId, hostName);
|
||||||
|
_status.AddOrUpdate(key,
|
||||||
|
_ => new ResilienceStatusSnapshot { CurrentInFlight = 1 },
|
||||||
|
(_, existing) => existing with { CurrentInFlight = existing.CurrentInFlight + 1 });
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Paired with <see cref="RecordCallStart"/> — decrements the in-flight counter.</summary>
|
||||||
|
public void RecordCallComplete(string driverInstanceId, string hostName)
|
||||||
|
{
|
||||||
|
var key = new StatusKey(driverInstanceId, hostName);
|
||||||
|
_status.AddOrUpdate(key,
|
||||||
|
_ => new ResilienceStatusSnapshot { CurrentInFlight = 0 }, // start-without-complete shouldn't happen; clamp to 0
|
||||||
|
(_, existing) => existing with { CurrentInFlight = Math.Max(0, existing.CurrentInFlight - 1) });
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Snapshot of a specific (instance, host) pair; null if no counters recorded yet.</summary>
|
/// <summary>Snapshot of a specific (instance, host) pair; null if no counters recorded yet.</summary>
|
||||||
public ResilienceStatusSnapshot? TryGet(string driverInstanceId, string hostName) =>
|
public ResilienceStatusSnapshot? TryGet(string driverInstanceId, string hostName) =>
|
||||||
_status.TryGetValue(new StatusKey(driverInstanceId, hostName), out var snapshot) ? snapshot : null;
|
_status.TryGetValue(new StatusKey(driverInstanceId, hostName), out var snapshot) ? snapshot : null;
|
||||||
@@ -101,4 +124,12 @@ public sealed record ResilienceStatusSnapshot
|
|||||||
public long BaselineFootprintBytes { get; init; }
|
public long BaselineFootprintBytes { get; init; }
|
||||||
public long CurrentFootprintBytes { get; init; }
|
public long CurrentFootprintBytes { get; init; }
|
||||||
public DateTime LastSampledUtc { get; init; }
|
public DateTime LastSampledUtc { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// In-flight capability calls against this (instance, host). Bumped on call entry +
|
||||||
|
/// decremented on completion. Feeds <c>DriverInstanceResilienceStatus.CurrentBulkheadDepth</c>
|
||||||
|
/// for Admin <c>/hosts</c> — a cheap proxy for the Polly bulkhead depth until the full
|
||||||
|
/// telemetry observer lands.
|
||||||
|
/// </summary>
|
||||||
|
public int CurrentInFlight { get; init; }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -108,7 +108,7 @@ public sealed class ResilienceStatusPublisherHostedService : BackgroundService
|
|||||||
HostName = hostName,
|
HostName = hostName,
|
||||||
LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc,
|
LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc,
|
||||||
ConsecutiveFailures = counters.ConsecutiveFailures,
|
ConsecutiveFailures = counters.ConsecutiveFailures,
|
||||||
CurrentBulkheadDepth = 0, // Phase 6.1 Stream A tracker doesn't emit bulkhead depth yet
|
CurrentBulkheadDepth = counters.CurrentInFlight,
|
||||||
LastRecycleUtc = counters.LastRecycleUtc,
|
LastRecycleUtc = counters.LastRecycleUtc,
|
||||||
BaselineFootprintBytes = counters.BaselineFootprintBytes,
|
BaselineFootprintBytes = counters.BaselineFootprintBytes,
|
||||||
CurrentFootprintBytes = counters.CurrentFootprintBytes,
|
CurrentFootprintBytes = counters.CurrentFootprintBytes,
|
||||||
@@ -119,6 +119,7 @@ public sealed class ResilienceStatusPublisherHostedService : BackgroundService
|
|||||||
{
|
{
|
||||||
existing.LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc;
|
existing.LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc;
|
||||||
existing.ConsecutiveFailures = counters.ConsecutiveFailures;
|
existing.ConsecutiveFailures = counters.ConsecutiveFailures;
|
||||||
|
existing.CurrentBulkheadDepth = counters.CurrentInFlight;
|
||||||
existing.LastRecycleUtc = counters.LastRecycleUtc;
|
existing.LastRecycleUtc = counters.LastRecycleUtc;
|
||||||
existing.BaselineFootprintBytes = counters.BaselineFootprintBytes;
|
existing.BaselineFootprintBytes = counters.BaselineFootprintBytes;
|
||||||
existing.CurrentFootprintBytes = counters.CurrentFootprintBytes;
|
existing.CurrentFootprintBytes = counters.CurrentFootprintBytes;
|
||||||
|
|||||||
@@ -0,0 +1,130 @@
|
|||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class InFlightCounterTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void StartThenComplete_NetsToZero()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
tracker.RecordCallStart("drv", "host-a");
|
||||||
|
tracker.RecordCallComplete("drv", "host-a");
|
||||||
|
|
||||||
|
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NestedStarts_SumDepth()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
tracker.RecordCallStart("drv", "host-a");
|
||||||
|
tracker.RecordCallStart("drv", "host-a");
|
||||||
|
tracker.RecordCallStart("drv", "host-a");
|
||||||
|
|
||||||
|
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(3);
|
||||||
|
|
||||||
|
tracker.RecordCallComplete("drv", "host-a");
|
||||||
|
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void CompleteBeforeStart_ClampedToZero()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
tracker.RecordCallComplete("drv", "host-a");
|
||||||
|
|
||||||
|
// A stray Complete without a matching Start shouldn't drive the counter negative.
|
||||||
|
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void DifferentHosts_TrackIndependently()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
tracker.RecordCallStart("drv", "host-a");
|
||||||
|
tracker.RecordCallStart("drv", "host-a");
|
||||||
|
tracker.RecordCallStart("drv", "host-b");
|
||||||
|
|
||||||
|
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2);
|
||||||
|
tracker.TryGet("drv", "host-b")!.CurrentInFlight.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ConcurrentStarts_DoNotLose_Count()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
Parallel.For(0, 500, _ => tracker.RecordCallStart("drv", "host-a"));
|
||||||
|
|
||||||
|
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(500);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CapabilityInvoker_IncrementsTracker_DuringExecution()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
var invoker = new CapabilityInvoker(
|
||||||
|
new DriverResiliencePipelineBuilder(),
|
||||||
|
"drv-live",
|
||||||
|
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||||
|
driverType: "Modbus",
|
||||||
|
statusTracker: tracker);
|
||||||
|
|
||||||
|
var observedMidCall = -1;
|
||||||
|
await invoker.ExecuteAsync(
|
||||||
|
DriverCapability.Read,
|
||||||
|
"plc-1",
|
||||||
|
async _ =>
|
||||||
|
{
|
||||||
|
observedMidCall = tracker.TryGet("drv-live", "plc-1")?.CurrentInFlight ?? -1;
|
||||||
|
await Task.Yield();
|
||||||
|
return 42;
|
||||||
|
},
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
observedMidCall.ShouldBe(1, "during call, in-flight == 1");
|
||||||
|
tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0, "post-call, counter decremented");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CapabilityInvoker_ExceptionPath_DecrementsCounter()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
var invoker = new CapabilityInvoker(
|
||||||
|
new DriverResiliencePipelineBuilder(),
|
||||||
|
"drv-live",
|
||||||
|
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||||
|
statusTracker: tracker);
|
||||||
|
|
||||||
|
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||||
|
await invoker.ExecuteAsync<int>(
|
||||||
|
DriverCapability.Write,
|
||||||
|
"plc-1",
|
||||||
|
_ => throw new InvalidOperationException("boom"),
|
||||||
|
CancellationToken.None));
|
||||||
|
|
||||||
|
tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0,
|
||||||
|
"finally-block must decrement even when call-site throws");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CapabilityInvoker_WithoutTracker_DoesNotThrow()
|
||||||
|
{
|
||||||
|
var invoker = new CapabilityInvoker(
|
||||||
|
new DriverResiliencePipelineBuilder(),
|
||||||
|
"drv-live",
|
||||||
|
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||||
|
statusTracker: null);
|
||||||
|
|
||||||
|
var result = await invoker.ExecuteAsync(
|
||||||
|
DriverCapability.Read, "host-1",
|
||||||
|
_ => ValueTask.FromResult(7),
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
result.ShouldBe(7);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user