perf: close Theme 6 — 11 allocation / N+1 / lock-contention findings

Well-localised perf fixes across 8 modules.

Lock decoupling / SQL streaming:
- AuditLog-005: SqliteAuditWriter gains dedicated read-only _readConnection
  (+ _readLock) backed by WAL journal mode. GetBacklogStatsAsync,
  ReadPendingAsync, ReadPendingSinceAsync, ReadForwardedAsync no longer
  contend with the hot-path INSERT lock — backlog probes on a 30s timer
  can't stall the writer under multi-hundred-K Pending backlog.
- SEL-022: dropped Cache=Shared from SiteEventLogger's default connection
  string (single-connection logger; mode was dormant config).

Memory / streaming:
- CLI-019: bundle export streams base64 in 1 MB-aligned chunks via
  Convert.TryFromBase64Chars straight into the FileStream — no more
  full-bundle byte[] allocation.
- CentralUI-031: TransportImport now stages the upload to a per-session
  temp file under Path.GetTempPath() (replaces in-memory byte[] field);
  page implements IDisposable to delete the temp file on reset / new
  upload / dispose. Per-circuit working set drops from ~100 MB to ~80 KB.

N+1 hoisting:
- Transport-008: added ITemplateEngineRepository.GetTemplatesWithChildrenAsync
  bulk method; BundleImporter.PreviewAsync calls it once instead of per-
  template-name. Single query with .Include(...).AsSplitQuery().
- DM-023: BuildDeployArtifactsCommandAsync's per-site loop now references
  a pre-fetched GlobalArtifactSnapshot (shared scripts, external systems,
  DB connections, notification lists, SMTP) instead of re-querying per site.
- MgmtSvc-023: HandleQueryDeployments unfiltered branch uses one
  GetAllInstancesAsync bulk load + Dictionary<int,int?> lookup (was a
  GetInstanceByIdAsync per record).

Small allocations / per-tick rebuilds:
- InboundAPI-019: AuditWriteMiddleware gates EnableBuffering() on
  RequestHasBody() so GET/HEAD/DELETE/TRACE/OPTIONS and Content-Length:0
  requests skip the FileBufferingReadStream allocation.
- NotifOutbox-006: ResolveAdapters dictionary now cached on
  _adaptersCache (built lazily on first sweep) + actor-lifetime
  _adaptersScope; ResolveAdapters no longer rebuilds per dispatch tick.

Verify-only:
- Comm-017: Confirmed _inProgressDeployments was deleted by Comm-016 in
  commit ac96b83 — marked Resolved with that attribution. No code change.

Doc-correction:
- NS-022: Updated MailKitSmtpClientWrapper XML doc to spell out single-
  connection / per-delivery-factory contract (option (b) — transient
  client per Send — rejected because it re-handshakes TLS per email).

10+ new regression tests across 8 test projects. Build clean; affected
suites all green. README regenerated: 54 open (was 65).
This commit is contained in:
Joseph Doherty
2026-05-28 07:47:24 -04:00
parent 2ed5c6c379
commit 55f46e7c92
34 changed files with 1131 additions and 149 deletions
@@ -112,6 +112,59 @@ public class SqliteAuditWriterBacklogStatsTests : IDisposable
Assert.Equal(t1, snapshot.OldestPendingUtc!.Value);
}
[Fact]
public async Task GetBacklogStatsAsync_DoesNotBlockOnConcurrentWriteLoad()
{
// AuditLog-005: GetBacklogStatsAsync previously took _writeLock, the
// same lock that serialises every batch INSERT in FlushBatch. Under a
// backlog growing to hundreds of thousands of rows a COUNT(*)+MIN
// index scan could park the hot-path writer for hundreds of ms. The
// fix adds a dedicated read-only connection in WAL mode so the probe
// never contends with the writer.
//
// This test demonstrates the lock decoupling by saturating the writer
// with a burst of concurrent writes and asserting that a probe issued
// while those writes are in flight returns inside a tight time bound.
// Without the fix the probe would be queued behind FlushBatch under
// the same _writeLock; with the fix it reads through _readConnection
// and is not gated by the writer.
await using var writer = CreateWriter();
// Seed a baseline so MIN(OccurredAtUtc) has a row to find — the
// important assertion is timing, but a non-empty result also confirms
// the read connection sees the writer's commits via WAL.
for (var i = 0; i < 100; i++)
{
await writer.WriteAsync(NewEvent());
}
// Kick off a sustained write burst on a background task. The writes
// are fire-and-forget — we only need the writer to be busy enough
// that any reuse of _writeLock by the probe would be observable.
var burst = Task.Run(async () =>
{
for (var i = 0; i < 2_000; i++)
{
await writer.WriteAsync(NewEvent()).ConfigureAwait(false);
}
});
// Race the probe against the write burst. The probe must return
// promptly even though the writer is actively flushing batches.
var sw = System.Diagnostics.Stopwatch.StartNew();
var snapshot = await writer.GetBacklogStatsAsync();
sw.Stop();
// Drain the burst before disposing so we don't observe a flake when
// pending writes race with dispose.
await burst;
Assert.True(sw.ElapsedMilliseconds < 1_000,
$"GetBacklogStatsAsync must not block on the writer's _writeLock; took {sw.ElapsedMilliseconds} ms");
Assert.True(snapshot.PendingCount >= 100,
$"backlog probe should see at least the seeded rows; got {snapshot.PendingCount}");
}
[Fact]
public async Task OnDiskBytes_ReturnsFileSize()
{
@@ -0,0 +1,94 @@
using ScadaLink.CLI.Commands;
namespace ScadaLink.CLI.Tests.Commands;
/// <summary>
/// CLI-019 regression tests for <see cref="BundleCommands.StreamBase64ToFile"/>.
/// The pre-fix code did <c>Convert.FromBase64String(...) → File.WriteAllBytes(...)</c>,
/// doubling the bundle's bytes onto the LOH and writing synchronously. The new
/// streaming helper decodes the base64 string in fixed-size chunks straight into
/// a <see cref="FileStream"/>, so peak working set is bounded by the chunk size
/// regardless of how large the bundle is.
/// </summary>
public class BundleCommandsStreamingTests : IDisposable
{
private readonly string _tempPath;
public BundleCommandsStreamingTests()
{
_tempPath = Path.Combine(Path.GetTempPath(), $"bundle-stream-test-{Guid.NewGuid():N}.bin");
}
public void Dispose()
{
if (File.Exists(_tempPath))
{
File.Delete(_tempPath);
}
}
[Fact]
public void StreamBase64ToFile_SmallPayload_RoundTrips()
{
var bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
var base64 = Convert.ToBase64String(bytes);
var written = BundleCommands.StreamBase64ToFile(base64, _tempPath);
Assert.Equal(bytes.Length, written);
var roundTripped = File.ReadAllBytes(_tempPath);
Assert.Equal(bytes, roundTripped);
}
[Fact]
public void StreamBase64ToFile_PayloadCrossesChunkBoundary_RoundTrips()
{
// Build a payload several chunks wide so the slicing loop runs more than
// once, with enough trailing bytes that the final slice is short and
// exercises the padding/short-final-chunk path.
var size = (BundleCommands.Base64StreamChunkChars / 4 * 3) * 3 + 17;
var bytes = new byte[size];
for (var i = 0; i < size; i++) bytes[i] = (byte)(i & 0xFF);
var base64 = Convert.ToBase64String(bytes);
var written = BundleCommands.StreamBase64ToFile(base64, _tempPath);
Assert.Equal(size, written);
var roundTripped = File.ReadAllBytes(_tempPath);
Assert.Equal(bytes, roundTripped);
}
[Fact]
public void StreamBase64ToFile_EmptyString_WritesEmptyFile()
{
var written = BundleCommands.StreamBase64ToFile(string.Empty, _tempPath);
Assert.Equal(0, written);
Assert.True(File.Exists(_tempPath));
Assert.Empty(File.ReadAllBytes(_tempPath));
}
[Fact]
public void StreamBase64ToFile_InvalidBase64_ThrowsFormatException()
{
// '*' is not a valid base64 character, so TryFromBase64Chars returns
// false and the helper throws — the pre-fix code threw FormatException
// from Convert.FromBase64String, so the contract is preserved.
var invalid = "this is not valid base64 !!!*";
Assert.Throws<FormatException>(() => BundleCommands.StreamBase64ToFile(invalid, _tempPath));
}
[Fact]
public void StreamBase64ToFile_NullBase64_Throws()
{
Assert.Throws<ArgumentNullException>(() => BundleCommands.StreamBase64ToFile(null!, _tempPath));
}
[Fact]
public void StreamBase64ToFile_EmptyOutputPath_Throws()
{
Assert.Throws<ArgumentException>(() => BundleCommands.StreamBase64ToFile("AAAA", string.Empty));
}
}
@@ -370,13 +370,20 @@ public class TransportImportPageTests : BunitContext
}
/// <summary>
/// Seeds the wizard at Step 2 (Passphrase) with cached bundle bytes — the
/// Seeds the wizard at Step 2 (Passphrase) with a staged bundle file — the
/// shape after an encrypted-bundle upload completed Step 1's peek and
/// surfaced an ArgumentException ("passphrase required").
/// surfaced an ArgumentException ("passphrase required"). CentralUI-031:
/// the wizard now stages the upload to a temp file and only retains the
/// path on the component, so the test helper writes the bytes to a per-
/// test temp file and sets the path field instead of the byte[] field.
/// </summary>
private static void SeedAtPassphraseStep(TransportImportPage instance, byte[] bytes)
{
SetField(instance, "_bundleBytes", bytes);
var dir = Path.Combine(Path.GetTempPath(), "scadalink-transport-staging");
Directory.CreateDirectory(dir);
var path = Path.Combine(dir, $"test-{Guid.NewGuid():N}.scadabundle");
File.WriteAllBytes(path, bytes);
SetField(instance, "_bundleTempPath", path);
SetField(instance, "_session", null);
SetField(instance, "_step", TransportImportPage.ImportWizardStep.Passphrase);
SetField(instance, "_failedUnlockAttempts", 0);
@@ -69,6 +69,66 @@ public class TemplateEngineRepositoryTests : IDisposable
Assert.Null(loaded);
}
[Fact]
public async Task GetTemplatesWithChildrenAsync_BulkVariant_FetchesEveryMatchingNameInOneQuery()
{
// Transport-008 regression: BundleImporter.PreviewAsync previously
// called GetTemplateWithChildrenAsync(stub.Id) per matching template
// name. The bulk variant returns every match in a single query.
var a = new Template("Alpha");
a.Attributes.Add(new TemplateAttribute("A1"));
a.Scripts.Add(new TemplateScript("AS1", "return 1;"));
var b = new Template("Beta");
b.Alarms.Add(new TemplateAlarm("BAlarm"));
var c = new Template("Gamma");
_context.Templates.AddRange(a, b, c);
await _context.SaveChangesAsync();
var result = await _repository.GetTemplatesWithChildrenAsync(new[] { "Alpha", "Beta", "DoesNotExist" });
Assert.Equal(2, result.Count);
var loadedA = Assert.Single(result, t => t.Name == "Alpha");
var loadedB = Assert.Single(result, t => t.Name == "Beta");
Assert.Single(loadedA.Attributes);
Assert.Equal("A1", loadedA.Attributes.First().Name);
Assert.Single(loadedA.Scripts);
Assert.Single(loadedB.Alarms);
Assert.Equal("BAlarm", loadedB.Alarms.First().Name);
Assert.DoesNotContain(result, t => t.Name == "Gamma");
}
[Fact]
public async Task GetTemplatesWithChildrenAsync_EmptyNames_ReturnsEmpty()
{
var result = await _repository.GetTemplatesWithChildrenAsync(Array.Empty<string>());
Assert.Empty(result);
}
[Fact]
public async Task GetTemplatesWithChildrenAsync_NullEnumerable_ReturnsEmpty()
{
var result = await _repository.GetTemplatesWithChildrenAsync(null!);
Assert.Empty(result);
}
[Fact]
public async Task GetTemplatesWithChildrenAsync_FiltersOutDuplicatesAndEmptyStrings()
{
var a = new Template("Alpha");
_context.Templates.Add(a);
await _context.SaveChangesAsync();
// Duplicate names + empty / null entries should not throw; the helper
// deduplicates and filters them out before the SQL IN clause.
var result = await _repository.GetTemplatesWithChildrenAsync(
new[] { "Alpha", "Alpha", "", null!, "Alpha" });
Assert.Single(result);
Assert.Equal("Alpha", result[0].Name);
}
[Fact]
public async Task GetTemplateWithChildrenAsync_PreservesParentTemplateId_ForInheritanceChainWalk()
{
@@ -140,6 +140,63 @@ public class ArtifactDeploymentServiceTests : TestKit
Assert.Contains(result.Value.SiteResults, r => r.SiteId == "fail-site" && !r.Success);
}
// ── DeploymentManager-023: global artifact queries hoisted out of the per-site loop ──
[Fact]
public async Task DeployToAllSitesAsync_HoistsGlobalArtifactQueriesOutOfPerSiteLoop()
{
// DeploymentManager-023: previously each per-site iteration of the deploy-many
// loop re-issued the global artifact queries (shared scripts, external systems,
// DB connections, notification lists, SMTP configs) — a textbook N+1 over the
// global sets. With three sites the queries must now be issued ONCE in total,
// regardless of site count.
var sites = new List<Site>
{
new("Site One", "site-1") { Id = 1 },
new("Site Two", "site-2") { Id = 2 },
new("Site Three", "site-3") { Id = 3 },
};
_siteRepo.GetAllSitesAsync(Arg.Any<CancellationToken>()).Returns(sites);
var probe = Sys.ActorOf(Props.Create(() => new ArtifactProbeActor()));
var service = CreateServiceWithCommActor(probe);
var result = await service.DeployToAllSitesAsync("admin");
Assert.True(result.IsSuccess);
// Each global query must be called EXACTLY ONCE for the whole multi-site sweep.
await _templateRepo.Received(1).GetAllSharedScriptsAsync(Arg.Any<CancellationToken>());
await _externalSystemRepo.Received(1).GetAllExternalSystemsAsync(Arg.Any<CancellationToken>());
await _externalSystemRepo.Received(1).GetAllDatabaseConnectionsAsync(Arg.Any<CancellationToken>());
await _notificationRepo.Received(1).GetAllNotificationListsAsync(Arg.Any<CancellationToken>());
await _notificationRepo.Received(1).GetAllSmtpConfigurationsAsync(Arg.Any<CancellationToken>());
// The per-site query (data connections) DOES vary per site and must still run
// once per site.
await _siteRepo.Received(1).GetDataConnectionsBySiteIdAsync(1, Arg.Any<CancellationToken>());
await _siteRepo.Received(1).GetDataConnectionsBySiteIdAsync(2, Arg.Any<CancellationToken>());
await _siteRepo.Received(1).GetDataConnectionsBySiteIdAsync(3, Arg.Any<CancellationToken>());
}
[Fact]
public async Task RetryForSiteAsync_SingleSitePath_StillRunsTheGlobalQueriesOnce()
{
// DeploymentManager-023: the single-site convenience overload still owns its
// own global-fetch (it cannot inherit from a sweep), so for one site every
// global query is issued exactly once. Pin this so a future refactor cannot
// accidentally route RetryForSiteAsync through the multi-site loop and lose
// the audit row's deploymentId guarantee.
var probe = Sys.ActorOf(Props.Create(() => new ArtifactProbeActor()));
var service = CreateServiceWithCommActor(probe);
var result = await service.RetryForSiteAsync(1, "retry-site", "admin");
Assert.True(result.IsSuccess);
await _templateRepo.Received(1).GetAllSharedScriptsAsync(Arg.Any<CancellationToken>());
await _externalSystemRepo.Received(1).GetAllExternalSystemsAsync(Arg.Any<CancellationToken>());
await _externalSystemRepo.Received(1).GetAllDatabaseConnectionsAsync(Arg.Any<CancellationToken>());
await _notificationRepo.Received(1).GetAllNotificationListsAsync(Arg.Any<CancellationToken>());
await _notificationRepo.Received(1).GetAllSmtpConfigurationsAsync(Arg.Any<CancellationToken>());
}
[Fact]
public async Task RetryForSiteAsync_SiteSucceeds_ReturnsSuccessAndAudits()
{
@@ -800,4 +800,109 @@ public class AuditWriteMiddlewareTests
"Expected a Warning log entry observing the async audit-write fault — none found. " +
$"Entries: [{string.Join(", ", snapshot)}]");
}
// ---------------------------------------------------------------------
// InboundAPI-019 — bodyless requests skip EnableBuffering so the
// FileBufferingReadStream allocation is avoided on GET/HEAD/DELETE
// and any request whose Content-Length is 0. The audit row still emits
// with a null RequestSummary, mirroring the bodyless-POST contract.
// ---------------------------------------------------------------------
[Theory]
[InlineData("GET")]
[InlineData("HEAD")]
[InlineData("DELETE")]
public async Task BodylessMethod_SkipsEnableBuffering_RequestStreamIsNotReplaced(string method)
{
// The middleware previously called EnableBuffering on every request,
// installing a FileBufferingReadStream wrapper even when the request
// had no body. The bodyless-method short-circuit must leave
// Request.Body untouched (still the original empty stream the test
// assigns below), proving the buffering wrapper allocation is avoided.
var writer = new RecordingAuditWriter();
var ctx = new DefaultHttpContext();
ctx.Request.Method = method;
ctx.Request.Path = "/api/echo";
ctx.Request.RouteValues["methodName"] = "echo";
ctx.Connection.RemoteIpAddress = IPAddress.Parse("10.0.0.5");
// Distinct sentinel stream — the production code path that called
// EnableBuffering would replace this with FileBufferingReadStream.
// After the fix the original stream survives untouched.
var sentinel = new MemoryStream();
ctx.Request.Body = sentinel;
Stream? observedDuringHandler = null;
var mw = CreateMiddleware(hc =>
{
observedDuringHandler = hc.Request.Body;
hc.Response.StatusCode = 200;
return Task.CompletedTask;
}, writer);
await mw.InvokeAsync(ctx);
Assert.Same(sentinel, observedDuringHandler);
var evt = Assert.Single(writer.Events);
// No body → RequestSummary stays null, matching the bodyless-POST contract.
Assert.Null(evt.RequestSummary);
}
[Fact]
public async Task BodylessPost_ContentLengthZero_SkipsEnableBuffering()
{
// A POST with an explicit Content-Length of 0 is also bodyless — even
// though POST is conventionally a body-carrying method, the explicit
// zero short-circuits buffering. This pins the ContentLength branch of
// the RequestHasBody guard.
var writer = new RecordingAuditWriter();
var ctx = new DefaultHttpContext();
ctx.Request.Method = "POST";
ctx.Request.Path = "/api/echo";
ctx.Request.RouteValues["methodName"] = "echo";
ctx.Request.ContentLength = 0;
ctx.Connection.RemoteIpAddress = IPAddress.Parse("10.0.0.5");
var sentinel = new MemoryStream();
ctx.Request.Body = sentinel;
Stream? observedDuringHandler = null;
var mw = CreateMiddleware(hc =>
{
observedDuringHandler = hc.Request.Body;
hc.Response.StatusCode = 200;
return Task.CompletedTask;
}, writer);
await mw.InvokeAsync(ctx);
Assert.Same(sentinel, observedDuringHandler);
var evt = Assert.Single(writer.Events);
Assert.Null(evt.RequestSummary);
}
[Fact]
public async Task PostWithBody_StillEnablesBuffering_AndCapturesRequestSummary()
{
// Regression: the bodyless short-circuit must NOT regress the existing
// body-capture contract for normal POSTs — we still need to buffer +
// capture the request body for the audit row.
var writer = new RecordingAuditWriter();
var requestJson = "{\"a\":42}";
var ctx = BuildContext(body: requestJson);
string? observedAfterMiddleware = null;
var mw = CreateMiddleware(async hc =>
{
using var reader = new StreamReader(hc.Request.Body);
observedAfterMiddleware = await reader.ReadToEndAsync();
hc.Response.StatusCode = 200;
}, writer);
await mw.InvokeAsync(ctx);
Assert.Equal(requestJson, observedAfterMiddleware);
var evt = Assert.Single(writer.Events);
Assert.Equal(requestJson, evt.RequestSummary);
}
}
@@ -896,11 +896,15 @@ public class ManagementActorTests : TestKit, IDisposable
[Fact]
public void QueryDeployments_UnfilteredForSiteScopedUser_DropsOutOfScopeRecords()
{
// Records for instances 1 (site 1, in scope) and 2 (site 2, out of scope).
_templateRepo.GetInstanceByIdAsync(1, Arg.Any<CancellationToken>())
.Returns(new Instance("Pump1") { Id = 1, SiteId = 1 });
_templateRepo.GetInstanceByIdAsync(2, Arg.Any<CancellationToken>())
.Returns(new Instance("Pump2") { Id = 2, SiteId = 2 });
// ManagementService-023: the unfiltered branch now bulk-loads instances
// once via GetAllInstancesAsync (instead of N+1 GetInstanceByIdAsync per
// distinct InstanceId). Mock the bulk path accordingly.
_templateRepo.GetAllInstancesAsync(Arg.Any<CancellationToken>())
.Returns(new List<Instance>
{
new("Pump1") { Id = 1, SiteId = 1 },
new("Pump2") { Id = 2, SiteId = 2 },
});
var deployRepo = Substitute.For<IDeploymentManagerRepository>();
deployRepo.GetAllDeploymentRecordsAsync(Arg.Any<CancellationToken>())
.Returns(new List<Commons.Entities.Deployment.DeploymentRecord>
@@ -917,6 +921,43 @@ public class ManagementActorTests : TestKit, IDisposable
var response = ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
Assert.Contains("deploy-1", response.JsonData);
Assert.DoesNotContain("deploy-2", response.JsonData);
// The per-instance lookup must NOT have been used for the unfiltered
// branch — that was the N+1 the bulk load replaced.
_templateRepo.DidNotReceiveWithAnyArgs().GetInstanceByIdAsync(default);
}
[Fact]
public void QueryDeployments_UnfilteredForSiteScopedUser_UsesBulkInstanceLoad_NotPerRecordLookup()
{
// ManagementService-023 regression pin: the unfiltered branch must issue
// GetAllInstancesAsync ONCE and never call GetInstanceByIdAsync, regardless
// of how many DeploymentRecords reference distinct InstanceIds. Before the
// fix, three distinct instance ids would have produced three per-instance
// lookups (textbook N+1).
_templateRepo.GetAllInstancesAsync(Arg.Any<CancellationToken>())
.Returns(new List<Instance>
{
new("Pump1") { Id = 1, SiteId = 1 },
new("Pump2") { Id = 2, SiteId = 2 },
new("Pump3") { Id = 3, SiteId = 1 },
});
var deployRepo = Substitute.For<IDeploymentManagerRepository>();
deployRepo.GetAllDeploymentRecordsAsync(Arg.Any<CancellationToken>())
.Returns(new List<Commons.Entities.Deployment.DeploymentRecord>
{
DeploymentRecordFor(1), DeploymentRecordFor(2), DeploymentRecordFor(3),
DeploymentRecordFor(1), DeploymentRecordFor(3) // duplicates: still no extra lookups
});
_services.AddScoped(_ => deployRepo);
var actor = CreateActor();
var envelope = ScopedEnvelope(new QueryDeploymentsCommand(), new[] { "1" }, "Deployment");
actor.Tell(envelope);
ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
_templateRepo.Received(1).GetAllInstancesAsync(Arg.Any<CancellationToken>());
_templateRepo.DidNotReceiveWithAnyArgs().GetInstanceByIdAsync(default);
}
[Fact]
@@ -395,6 +395,63 @@ public class NotificationOutboxActorDispatchTests : TestKit
"PostStop did not cancel the in-flight delivery promptly.");
}
// ── NotificationOutbox-006: adapter dictionary cached for the actor's lifetime ──
[Fact]
public void Dispatch_ResolvesAdaptersOnce_AcrossMultipleSweeps()
{
// NotificationOutbox-006: adapter registration is static per process lifetime,
// so the NotificationType -> adapter lookup must be built ONCE for the actor's
// lifetime, not per dispatch sweep. The cache is paired with an actor-lifetime
// DI scope (see _adaptersScope) so scoped adapter instances are reused safely.
SetupSmtpRetryPolicy(maxRetries: 5, retryDelay: TimeSpan.FromMinutes(1));
// Isolated substitutes for this test — we replace the dispatcher's per-sweep
// INotificationOutboxRepository registration with a private counting factory,
// so we don't mutate the shared _outboxRepository field that other tests in
// this class configure differently.
var outboxRepository = Substitute.For<INotificationOutboxRepository>();
outboxRepository.GetDueAsync(Arg.Any<DateTimeOffset>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(_ => new[] { MakeNotification() });
// Counting factory: increments each time the DI container resolves an
// INotificationDeliveryAdapter. Pre-fix this would have ticked once per
// sweep; post-fix it ticks exactly once for the actor's lifetime.
var resolutionCount = 0;
var services = new ServiceCollection();
services.AddScoped(_ => outboxRepository);
services.AddScoped(_ => _notificationRepository);
services.AddScoped<INotificationDeliveryAdapter>(_ =>
{
Interlocked.Increment(ref resolutionCount);
return new StubAdapter(() => DeliveryOutcome.Success("ops@example.com"));
});
var sp = services.BuildServiceProvider();
var actor = Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
sp,
new NotificationOutboxOptions { DispatchInterval = TimeSpan.FromHours(1) },
new NoOpCentralAuditWriter(),
NullLogger<NotificationOutboxActor>.Instance)));
// Fire three sweeps end-to-end. Each waits on the previous via the
// in-flight guard, so the UpdateAsync count climbs monotonically.
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() => outboxRepository.Received(1).UpdateAsync(
Arg.Any<Notification>(), Arg.Any<CancellationToken>()));
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() => outboxRepository.Received(2).UpdateAsync(
Arg.Any<Notification>(), Arg.Any<CancellationToken>()));
actor.Tell(InternalMessages.DispatchTick.Instance);
AwaitAssert(() => outboxRepository.Received(3).UpdateAsync(
Arg.Any<Notification>(), Arg.Any<CancellationToken>()));
// The adapter resolution must have happened EXACTLY ONCE despite three
// dispatch sweeps. Pre-fix this would have been 3 (or more).
Assert.Equal(1, resolutionCount);
}
[Fact]
public void OverlappingTicks_WhileDispatchInFlight_DoNotClaimConcurrently()
{
@@ -338,4 +338,52 @@ public sealed class BundleImporterPreviewTests : IDisposable
i.Kind == ConflictKind.Blocker && i.Name == name);
}
}
[Fact]
public async Task PreviewAsync_multiple_templates_with_children_diffs_each_correctly()
{
// Transport-008 regression: PreviewAsync previously fetched each matching
// template's children via a per-name GetTemplateWithChildrenAsync call
// (N+1). The bulk variant returns every match in a single query — this
// test seeds three templates with distinct child collections and asserts
// the preview hydrates each one so the per-child diff sees the right
// attribute / alarm / script counts (i.e. the bulk fetch did not lose
// any child rows compared to the per-name fetch).
await using (var scope = _provider.CreateAsyncScope())
{
var ctx = scope.ServiceProvider.GetRequiredService<ScadaLinkDbContext>();
var pump = new Template("Pump") { Description = "p1" };
pump.Attributes.Add(new TemplateAttribute("Flow"));
pump.Scripts.Add(new TemplateScript("init", "return 1;"));
var valve = new Template("Valve") { Description = "v1" };
valve.Alarms.Add(new TemplateAlarm("HighPressure"));
var tank = new Template("Tank") { Description = "t1" };
tank.Attributes.Add(new TemplateAttribute("Level"));
tank.Attributes.Add(new TemplateAttribute("Temperature"));
ctx.Templates.AddRange(pump, valve, tank);
await ctx.SaveChangesAsync();
}
var bundleStream = await ExportTemplatesAsync();
var bytes = await StreamToBytes(bundleStream);
ImportPreview preview;
await using (var scope = _provider.CreateAsyncScope())
{
var importer = scope.ServiceProvider.GetRequiredService<IBundleImporter>();
var session = await importer.LoadAsync(new MemoryStream(bytes), passphrase: null);
preview = await importer.PreviewAsync(session.SessionId);
}
// Each template should be diff-classified (Identical, since the bundle
// is the literal projection of the target). Critically, the diff must
// succeed for ALL three — a bulk-fetch bug that silently drops rows
// would surface here as a missing item or a wrong (New) classification.
var pumpItem = Assert.Single(preview.Items, i => i.EntityType == "Template" && i.Name == "Pump");
var valveItem = Assert.Single(preview.Items, i => i.EntityType == "Template" && i.Name == "Valve");
var tankItem = Assert.Single(preview.Items, i => i.EntityType == "Template" && i.Name == "Tank");
Assert.Equal(ConflictKind.Identical, pumpItem.Kind);
Assert.Equal(ConflictKind.Identical, valveItem.Kind);
Assert.Equal(ConflictKind.Identical, tankItem.Kind);
}
}