fix(review): remediate re-review findings — DCL-029/InboundAPI-031/SiteRuntime-032/StoreAndForward-028 + Low doc/test
Fixes the 8 findings from the 2026-06-24 re-review (commit c42bb485), with a
regression test per Medium finding:
- DataConnectionLayer-029 (Med): HandleAlarmSubscribeCompleted now mirrors the
tag-path re-check — if a feed is already stored for the source, release the
redundant just-created subscription instead of overwriting + leaking the first
one (the double-subscribe window DCL-023 reopened). +regression test.
- InboundAPI-031 (Med): remove WaitForAttribute's local 5s grace backstop (tighter
than the CommunicationService Ask's timeout+IntegrationTimeout round-trip budget,
so a slow-but-valid timed-out 'false' got cancelled into a 500). Link only the
client-abort + explicit caller tokens; the lower layer owns the backstop. +test.
- SiteRuntime-032 (Med): derive the deployed count from an authoritative set of
deployed config names (HashSet) instead of a map-presence-gated int, so deleting
a DISABLED instance decrements correctly (SiteRuntime-029's gate leaked it).
+deploy->disable->delete regression test.
- StoreAndForward-028 (Med): reset _bufferedCount in StopAsync alongside the
register-guard so a same-instance Stop->Start re-seeds from a clean base (no ~2N
gauge double-count). +restart regression test.
- AuditLog-017 (Low): test the OnIngestAsync scope-resolution guard (actor survives,
replies empty, counts the failure) — no longer unpinned.
- CentralUI-037 / ScriptAnalysis-009 / SiteRuntime-033 (Low): doc-comment + spec
fixes (Database-throws in the inbound sandbox; baseReferences param wording;
native-alarm cap return-to-normal + per-condition NativeAlarmDropped eviction).
Targeted suites green: SiteRuntime 5, StoreAndForward 6, InboundAPI 31,
DataConnectionLayer 10, AuditLog 5, ScriptAnalysis 40, CentralUI ScriptAnalysis 52.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
|
||||
using ZB.MOM.WW.Audit;
|
||||
@@ -184,6 +185,51 @@ public class AuditLogIngestActorTests : TestKit, IClassFixture<MsSqlMigrationFix
|
||||
Assert.DoesNotContain(rows, r => r.EventId == poisonId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Receive_WhenRepositoryResolutionThrows_ActorSurvives_RepliesEmpty_CountsFailure()
|
||||
{
|
||||
// AuditLog-017 (covers the AuditLog-014 guard): the production ctor resolves the
|
||||
// scoped repository per message. If scope creation / repository resolution throws
|
||||
// (transient DI or DbContext-factory fault, pooled-context init, a resolution race
|
||||
// during host churn), the outer guard must keep the singleton ALIVE, increment the
|
||||
// failure counter, and still reply with whatever was accepted (empty here) so the
|
||||
// site keeps its rows Pending and retries — rather than letting the throw restart
|
||||
// the singleton and drop the captured reply (the site's Ask would then time out).
|
||||
var counter = new CountingFailureCounter();
|
||||
|
||||
// A provider with NO IAuditLogRepository registered → GetRequiredService throws
|
||||
// inside the per-message scope; the failure counter IS registered so the guard's
|
||||
// catch can surface the fault.
|
||||
var services = new ServiceCollection();
|
||||
services.AddSingleton<ICentralAuditWriteFailureCounter>(counter);
|
||||
await using var provider = services.BuildServiceProvider();
|
||||
|
||||
var actor = Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
|
||||
(IServiceProvider)provider, NullLogger<AuditLogIngestActor>.Instance)));
|
||||
|
||||
// First batch: resolution throws → empty reply, one counted failure, no restart.
|
||||
actor.Tell(new IngestAuditEventsCommand(
|
||||
Enumerable.Range(0, 3).Select(_ => NewEvent(NewSiteId())).ToList()), TestActor);
|
||||
var reply = ExpectMsg<IngestAuditEventsReply>(TimeSpan.FromSeconds(10));
|
||||
Assert.Empty(reply.AcceptedEventIds);
|
||||
Assert.Equal(1, counter.Count);
|
||||
|
||||
// Second batch proves the actor was not restarted/wedged: it still processes
|
||||
// messages and the guard fires again.
|
||||
actor.Tell(new IngestAuditEventsCommand(
|
||||
new List<AuditEvent> { NewEvent(NewSiteId()) }), TestActor);
|
||||
var reply2 = ExpectMsg<IngestAuditEventsReply>(TimeSpan.FromSeconds(10));
|
||||
Assert.Empty(reply2.AcceptedEventIds);
|
||||
Assert.Equal(2, counter.Count);
|
||||
}
|
||||
|
||||
/// <summary>Counts how many times the guard's catch surfaced a write failure.</summary>
|
||||
private sealed class CountingFailureCounter : ICentralAuditWriteFailureCounter
|
||||
{
|
||||
public int Count { get; private set; }
|
||||
public void Increment() => Count++;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tiny test double that delegates to a real repository but throws on a
|
||||
/// specified EventId. Used to verify per-row failure isolation: one bad
|
||||
|
||||
+80
@@ -306,4 +306,84 @@ public class DataConnectionActorAlarmTests : TestKit
|
||||
await alarmable.Received(2).SubscribeAlarmsAsync(
|
||||
"Tank01", Arg.Any<string?>(), Arg.Any<AlarmTransitionCallback>(), Arg.Any<CancellationToken>());
|
||||
}
|
||||
|
||||
// ── DataConnectionLayer-029: a re-subscribe during an orphaned in-flight subscribe
|
||||
// must not leak a duplicate adapter feed ──
|
||||
|
||||
[Fact]
|
||||
public async Task DCL029_ResubscribeDuringOrphanedInFlightSubscribe_ReleasesDuplicateFeed_NoLeak()
|
||||
{
|
||||
// Regression test for DataConnectionLayer-029. The DCL-023 fix clears the in-flight
|
||||
// marker on unsubscribe, which reopens a double-subscribe window: unsubscribe (last
|
||||
// subscriber, subId not stored yet) → a fresh subscribe for the SAME source sees
|
||||
// neither a stored id nor an in-flight marker, so it issues a SECOND adapter feed →
|
||||
// both completions fire. The DCL-023 orphan guard does NOT trigger on either
|
||||
// completion (the re-subscribe re-added the subscriber), so the alarm completion
|
||||
// handler used to OVERWRITE _alarmSubscriptionIds with the second id — leaking the
|
||||
// first feed (never unsubscribed, kept streaming). After DCL-029 the handler mirrors
|
||||
// the tag-path re-check: when a feed is already stored, the redundant completion
|
||||
// releases its just-created feed instead of overwriting + leaking.
|
||||
var sub1Started = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
var sub1Release = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
var sub2Started = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
var sub2Release = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
var adapter = Substitute.For<IDataConnection, IAlarmSubscribableConnection>();
|
||||
adapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
||||
.Returns(Task.CompletedTask);
|
||||
var alarmable = (IAlarmSubscribableConnection)adapter;
|
||||
|
||||
var calls = 0;
|
||||
alarmable.SubscribeAlarmsAsync(Arg.Any<string>(), Arg.Any<string?>(),
|
||||
Arg.Any<AlarmTransitionCallback>(), Arg.Any<CancellationToken>())
|
||||
.Returns(_ =>
|
||||
{
|
||||
if (Interlocked.Increment(ref calls) == 1)
|
||||
{
|
||||
sub1Started.TrySetResult();
|
||||
return sub1Release.Task;
|
||||
}
|
||||
sub2Started.TrySetResult();
|
||||
return sub2Release.Task;
|
||||
});
|
||||
alarmable.UnsubscribeAlarmsAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
||||
.Returns(Task.CompletedTask);
|
||||
|
||||
var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor(
|
||||
"conn", adapter, _options, _health, _factory, "OpcUa")));
|
||||
|
||||
// (1) Subscribe A — adapter subscribe #1 parks, in-flight={Tank01}.
|
||||
actor.Tell(new SubscribeAlarmsRequest("c1", "instA", "conn", "Tank01", null, DateTimeOffset.UtcNow));
|
||||
await sub1Started.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||
|
||||
// (2) Last subscriber unsubscribes while subscribe #1 is in flight — clears the
|
||||
// in-flight marker (DCL-023); subId#1 is not stored yet so no teardown happens.
|
||||
actor.Tell(new UnsubscribeAlarmsRequest("unsub-c1", "instA", "conn", "Tank01", DateTimeOffset.UtcNow));
|
||||
await Task.Delay(150);
|
||||
|
||||
// (3) Fresh subscribe for the SAME source before #1 completes — neither a stored id
|
||||
// nor an in-flight marker exists, so the actor issues a SECOND adapter subscribe.
|
||||
actor.Tell(new SubscribeAlarmsRequest("c2", "instB", "conn", "Tank01", null, DateTimeOffset.UtcNow));
|
||||
await sub2Started.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||
|
||||
// (4) Complete subscribe #1 → a subscriber exists again (B re-added), so the orphan
|
||||
// guard does NOT fire and subId#1 is stored as the live feed.
|
||||
sub1Release.SetResult("alarm-sub-1");
|
||||
await Task.Delay(150);
|
||||
|
||||
// (5) Complete subscribe #2 → a feed is already stored, so this redundant completion
|
||||
// releases its just-created feed (#2) instead of overwriting + leaking subId#1.
|
||||
sub2Release.SetResult("alarm-sub-2");
|
||||
await Task.Delay(300);
|
||||
|
||||
// The duplicate feed (#2) is released exactly once; the first feed (#1) is retained.
|
||||
await alarmable.Received(1).UnsubscribeAlarmsAsync("alarm-sub-2", Arg.Any<CancellationToken>());
|
||||
await alarmable.DidNotReceive().UnsubscribeAlarmsAsync("alarm-sub-1", Arg.Any<CancellationToken>());
|
||||
|
||||
// The retained feed (#1) is what a later unsubscribe tears down — proving subId#1,
|
||||
// not the duplicate, is the id _alarmSubscriptionIds actually tracks (no leak).
|
||||
actor.Tell(new UnsubscribeAlarmsRequest("unsub-c2", "instB", "conn", "Tank01", DateTimeOffset.UtcNow));
|
||||
await Task.Delay(200);
|
||||
await alarmable.Received(1).UnsubscribeAlarmsAsync("alarm-sub-1", Arg.Any<CancellationToken>());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -279,6 +279,41 @@ public class RouteHelperTests
|
||||
Assert.True(seen.IsCancellationRequested); // the client-abort token cancels the wait
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WaitForAttribute_SlowTimedOutResponse_NotPreemptedByLocalBackstop()
|
||||
{
|
||||
// InboundAPI-031: WaitForAttribute must NOT impose a local wait-timeout backstop.
|
||||
// The site enforces the wait timeout and returns Matched=false; the round trip is
|
||||
// bounded by CommunicationService's Ask (timeout + IntegrationTimeout). A local CTS
|
||||
// of `timeout + small grace` (the prior InboundAPI-029 approach) was TIGHTER than
|
||||
// that round-trip budget, so a slow-but-valid timed-out response would be cancelled
|
||||
// into an exception (a 500) instead of the spec §6 `false`. With the backstop
|
||||
// removed, a response arriving well after the (tiny) wait timeout still returns a
|
||||
// clean `false`, and the token the router observed was never cancelled by RouteHelper
|
||||
// — if a tight local backstop were reintroduced, the honoured token below would throw.
|
||||
SiteResolves("inst-1", "SiteA");
|
||||
CancellationToken seen = default;
|
||||
_router.RouteToWaitForAttributeAsync("SiteA", Arg.Any<RouteToWaitForAttributeRequest>(), Arg.Do<CancellationToken>(t => seen = t))
|
||||
.Returns(async ci =>
|
||||
{
|
||||
var token = (CancellationToken)ci[2];
|
||||
// Simulate the site enforcing the wait + a round trip far longer than the
|
||||
// tiny wait timeout; honour the token so a (re)introduced local backstop
|
||||
// would surface here as an OperationCanceledException.
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(400), token);
|
||||
return new RouteToWaitForAttributeResponse(
|
||||
((RouteToWaitForAttributeRequest)ci[1]).CorrelationId,
|
||||
Matched: false, Value: null, Quality: null, TimedOut: true,
|
||||
Success: true, ErrorMessage: null, DateTimeOffset.UtcNow);
|
||||
});
|
||||
|
||||
var matched = await CreateHelper().To("inst-1")
|
||||
.WaitForAttribute("Flag", true, TimeSpan.FromMilliseconds(20));
|
||||
|
||||
Assert.False(matched); // clean spec §6 false, not a cancellation
|
||||
Assert.False(seen.IsCancellationRequested); // no local wait-timeout backstop fired
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WaitForAttribute_WithParentExecutionId_CarriesItOnRequest()
|
||||
{
|
||||
|
||||
+41
@@ -240,6 +240,47 @@ public class DeploymentManagerRedeployTests : TestKit, IDisposable
|
||||
Assert.Equal(0, health.LastDeployedCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SR032_DeleteDisabledInstance_DecrementsDeployedCount()
|
||||
{
|
||||
// Regression test for SiteRuntime-032. SiteRuntime-029 gated the deployed-count
|
||||
// decrement on the instance being present in _instanceActors OR mid-redeploy in
|
||||
// _terminatingActorsByName. A DISABLED instance is in NEITHER map (disable removes
|
||||
// it from _instanceActors and never adds it to the terminating shadow) yet still has
|
||||
// a deployed-config row counted as deployed — so deleting a disabled instance
|
||||
// skipped the decrement and leaked the deployed/disabled tally on the health
|
||||
// dashboard. After the fix the count is derived from the authoritative set of
|
||||
// deployed config names, so a delete decrements for a disabled instance too.
|
||||
var health = new CountCapturingHealthCollector();
|
||||
var actor = CreateDeploymentManager(health);
|
||||
await Task.Delay(500);
|
||||
|
||||
// Deploy → deployed count 1.
|
||||
actor.Tell(new DeployInstanceCommand(
|
||||
"dep-1", "DisablePump", "h1", MakeConfigJson("DisablePump"), "admin", DateTimeOffset.UtcNow));
|
||||
var deploy = ExpectMsg<DeploymentStatusResponse>(TimeSpan.FromSeconds(5));
|
||||
Assert.Equal(DeploymentStatus.Success, deploy.Status);
|
||||
await Task.Delay(300);
|
||||
Assert.Equal(1, health.LastDeployedCount);
|
||||
|
||||
// Disable → the instance is still deployed (count stays 1), just not enabled.
|
||||
actor.Tell(new DisableInstanceCommand("cmd-1", "DisablePump", DateTimeOffset.UtcNow));
|
||||
var disable = ExpectMsg<InstanceLifecycleResponse>(TimeSpan.FromSeconds(5));
|
||||
Assert.True(disable.Success);
|
||||
Assert.Equal(1, health.LastDeployedCount);
|
||||
|
||||
// Delete the DISABLED instance → the deployed count must return to 0.
|
||||
// (The SiteRuntime-029 regression left it stuck at 1.)
|
||||
actor.Tell(new DeleteInstanceCommand("del-1", "DisablePump", DateTimeOffset.UtcNow));
|
||||
var delete = ExpectMsg<InstanceLifecycleResponse>(TimeSpan.FromSeconds(5));
|
||||
Assert.True(delete.Success);
|
||||
Assert.Equal(0, health.LastDeployedCount);
|
||||
|
||||
// No deployed-config row remains.
|
||||
var configs = await _storage.GetAllDeployedConfigsAsync();
|
||||
Assert.DoesNotContain(configs, c => c.InstanceUniqueName == "DisablePump");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Redeploy_ExistingInstance_DoesNotOverCountDeployedInstances()
|
||||
{
|
||||
|
||||
@@ -209,6 +209,50 @@ public class QueueDepthGaugeTests : IAsyncLifetime, IDisposable
|
||||
Assert.Equal(0, ReadQueueDepthGauge());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// StoreAndForward-028: a same-instance Stop→Start must re-seed the cached depth from a
|
||||
/// clean base. <see cref="StoreAndForwardService.StopAsync"/> resets the one-time
|
||||
/// registration guard so a later <see cref="StoreAndForwardService.StartAsync"/>
|
||||
/// re-registers and re-seeds <c>_bufferedCount</c> from the durable Pending count; if
|
||||
/// StopAsync did not also reset <c>_bufferedCount</c>, the restart would ADD the re-read
|
||||
/// count on top of the leftover in-memory value, double-counting the gauge to ~2N.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task StartAsync_AfterStop_ReseedsFromCleanBase_NoDoubleCount()
|
||||
{
|
||||
// One durable Pending row that survives the stop/restart in SQLite.
|
||||
await _storage.EnqueueAsync(new StoreAndForwardMessage
|
||||
{
|
||||
Id = Guid.NewGuid().ToString("N"),
|
||||
Category = StoreAndForwardCategory.ExternalSystem,
|
||||
Target = "api",
|
||||
PayloadJson = "{}",
|
||||
Status = StoreAndForwardMessageStatus.Pending,
|
||||
CreatedAt = DateTimeOffset.UtcNow,
|
||||
MaxRetries = 3
|
||||
});
|
||||
|
||||
var svc = new StoreAndForwardService(
|
||||
_storage,
|
||||
new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
|
||||
NullLogger<StoreAndForwardService>.Instance);
|
||||
|
||||
// First start seeds the cached count from the durable store → 1.
|
||||
await svc.StartAsync();
|
||||
Assert.Equal(1, ReadQueueDepthGauge());
|
||||
|
||||
// Graceful stop resets the registration guard AND the cached count (the fix).
|
||||
await svc.StopAsync();
|
||||
|
||||
// Restart the SAME instance: the guard was reset so StartAsync re-seeds from the
|
||||
// store (still 1 Pending). With the _bufferedCount reset the gauge reports 1, not 2;
|
||||
// without it the seed would ADD onto the leftover 1 → 2 (the double-count bug).
|
||||
await svc.StartAsync();
|
||||
Assert.Equal(1, ReadQueueDepthGauge());
|
||||
|
||||
await svc.StopAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Gauge_SeedsFromExistingPendingRows_OnStart()
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user