Phase 6.1 Stream E.3 partial — in-flight counter feeds CurrentBulkheadDepth
Closes the observer half of #162 that was flagged as "persisted as 0 today" in PR #105. The Admin /hosts column refresh + FleetStatusHub SignalR push + red-badge visual still belong to the visual-compliance pass. Core.Resilience: - DriverResilienceStatusTracker gains RecordCallStart + RecordCallComplete + CurrentInFlight field on the snapshot record. Concurrent-safe via the same ConcurrentDictionary.AddOrUpdate pattern as the other recorder methods. Clamps to zero on over-decrement so a stray Complete-without-Start can't drive the counter negative. - CapabilityInvoker gains an optional statusTracker ctor parameter. When wired, every ExecuteAsync / ExecuteAsync(void) wraps the pipeline call in try / finally that records start/complete — so the counter advances cleanly whether the call succeeds, cancels, or throws. Null tracker keeps the pre-Phase-6.1 Stream E.3 behaviour exactly. Server.Hosting: - ResilienceStatusPublisherHostedService persists CurrentInFlight as the DriverInstanceResilienceStatus.CurrentBulkheadDepth column (was 0 before this PR). One-line fix on both the insert + update branches. The in-flight counter is a pragmatic proxy for Polly's internal bulkhead depth — a future PR wiring Polly telemetry would replace it with the real value. The shape of the column + the publisher + the Admin /hosts query doesn't change, so the follow-up is invisible to consumers. Tests (8 new InFlightCounterTests, all pass): - Start+Complete nets to zero. - Nested starts sum; Complete decrements. - Complete-without-Start clamps to zero. - Different hosts track independently. - Concurrent starts (500 parallel) don't lose count. - CapabilityInvoker observed-mid-call depth == 1 during a pending call. - CapabilityInvoker exception path still decrements (try/finally). - CapabilityInvoker without tracker doesn't throw. Full solution dotnet test: 1243 passing (was 1235, +8). Pre-existing Client.CLI Subscribe flake unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,130 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class InFlightCounterTests
|
||||
{
|
||||
[Fact]
|
||||
public void StartThenComplete_NetsToZero()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallComplete("drv", "host-a");
|
||||
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void NestedStarts_SumDepth()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(3);
|
||||
|
||||
tracker.RecordCallComplete("drv", "host-a");
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CompleteBeforeStart_ClampedToZero()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordCallComplete("drv", "host-a");
|
||||
|
||||
// A stray Complete without a matching Start shouldn't drive the counter negative.
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void DifferentHosts_TrackIndependently()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallStart("drv", "host-b");
|
||||
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2);
|
||||
tracker.TryGet("drv", "host-b")!.CurrentInFlight.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ConcurrentStarts_DoNotLose_Count()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
Parallel.For(0, 500, _ => tracker.RecordCallStart("drv", "host-a"));
|
||||
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(500);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CapabilityInvoker_IncrementsTracker_DuringExecution()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
var invoker = new CapabilityInvoker(
|
||||
new DriverResiliencePipelineBuilder(),
|
||||
"drv-live",
|
||||
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||
driverType: "Modbus",
|
||||
statusTracker: tracker);
|
||||
|
||||
var observedMidCall = -1;
|
||||
await invoker.ExecuteAsync(
|
||||
DriverCapability.Read,
|
||||
"plc-1",
|
||||
async _ =>
|
||||
{
|
||||
observedMidCall = tracker.TryGet("drv-live", "plc-1")?.CurrentInFlight ?? -1;
|
||||
await Task.Yield();
|
||||
return 42;
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
observedMidCall.ShouldBe(1, "during call, in-flight == 1");
|
||||
tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0, "post-call, counter decremented");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CapabilityInvoker_ExceptionPath_DecrementsCounter()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
var invoker = new CapabilityInvoker(
|
||||
new DriverResiliencePipelineBuilder(),
|
||||
"drv-live",
|
||||
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||
statusTracker: tracker);
|
||||
|
||||
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await invoker.ExecuteAsync<int>(
|
||||
DriverCapability.Write,
|
||||
"plc-1",
|
||||
_ => throw new InvalidOperationException("boom"),
|
||||
CancellationToken.None));
|
||||
|
||||
tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0,
|
||||
"finally-block must decrement even when call-site throws");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CapabilityInvoker_WithoutTracker_DoesNotThrow()
|
||||
{
|
||||
var invoker = new CapabilityInvoker(
|
||||
new DriverResiliencePipelineBuilder(),
|
||||
"drv-live",
|
||||
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||
statusTracker: null);
|
||||
|
||||
var result = await invoker.ExecuteAsync(
|
||||
DriverCapability.Read, "host-1",
|
||||
_ => ValueTask.FromResult(7),
|
||||
CancellationToken.None);
|
||||
|
||||
result.ShouldBe(7);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user