Files
scadalink-design/tests/ScadaLink.AuditLog.Tests/Integration/ParentExecutionIdCorrelationTests.cs
Joseph Doherty 479870e40c feat(audit): stamp SourceNode at site SqliteAuditWriter from INodeIdentityProvider
Caller-provided SourceNode wins (preserves reconciled rows from other nodes);
otherwise the writer fills it from the local INodeIdentityProvider.NodeName.
Reads from the provider on every write — singleton lifetime means zero overhead.
2026-05-23 17:08:21 -04:00

621 lines
31 KiB
C#

using System.Text;
using System.Text.Json;
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.TestHost;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using ScadaLink.AuditLog.Central;
using ScadaLink.AuditLog.Site;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.AuditLog.Tests.Integration.Infrastructure;
using ScadaLink.AuditLog.Tests.TestSupport;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Messages.InboundApi;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
using ScadaLink.InboundAPI;
using ScadaLink.InboundAPI.Middleware;
using ScadaLink.NotificationOutbox;
using ScadaLink.NotificationOutbox.Delivery;
using ScadaLink.NotificationOutbox.Messages;
using ScadaLink.SiteRuntime.Scripts;
using ScadaLink.StoreAndForward;
namespace ScadaLink.AuditLog.Tests.Integration;
/// <summary>
/// Audit Log #23 — <b>ParentExecutionId cross-execution correlation</b> headline
/// end-to-end suite. Verifies the inbound-API → routed-site-script bridge: an
/// inbound HTTP request runs an inbound method script that calls
/// <c>Route.Call</c> into a site instance; the routed site script does a sync
/// <c>ExternalSystem.Call</c>, a cached call and a <c>Notify.Send</c>. Every
/// audit row the routed run produces — site + central, sync + cached lifecycle
/// + <c>NotifySend</c>/<c>NotifyDeliver</c> — must carry
/// <see cref="AuditEvent.ParentExecutionId"/> equal to the inbound request's
/// <see cref="AuditEvent.ExecutionId"/>, while the routed run has its own
/// distinct <see cref="AuditEvent.ExecutionId"/> and the inbound
/// <see cref="AuditKind.InboundRequest"/> row is top-level
/// (<c>ParentExecutionId = NULL</c>).
/// </summary>
/// <remarks>
/// <para>
/// This is the integration-level counterpart to <see cref="ExecutionIdCorrelationTests"/>:
/// where that suite drives a single <see cref="ScriptRuntimeContext"/> run and
/// asserts the shared per-run <c>ExecutionId</c>, this suite spans <b>two</b>
/// executions on opposite sides of the inbound→routed bridge and asserts the
/// cross-execution <c>ParentExecutionId</c> linkage plus
/// <see cref="IAuditLogRepository.GetExecutionTreeAsync"/>.
/// </para>
/// <para>
/// The bridge is exercised through the genuine production glue:
/// <list type="bullet">
/// <item><description>the real <see cref="AuditWriteMiddleware"/> in a
/// Microsoft.AspNetCore.TestHost pipeline — mints the inbound request's
/// per-request <c>ExecutionId</c> once, stashes it on
/// <see cref="HttpContext.Items"/>, and emits the top-level
/// <see cref="AuditKind.InboundRequest"/> row via the real
/// <see cref="CentralAuditWriter"/>;</description></item>
/// <item><description>the real <see cref="InboundScriptExecutor"/> +
/// <see cref="RouteHelper"/> — the executor binds the stashed inbound
/// <c>ExecutionId</c> via <see cref="RouteHelper.WithParentExecutionId"/>, so a
/// <c>Route.To(...).Call(...)</c> inside the inbound script builds a
/// <see cref="RouteToCallRequest"/> carrying
/// <see cref="RouteToCallRequest.ParentExecutionId"/>.</description></item>
/// </list>
/// Only the cross-cluster routing transport is substituted: the test
/// <see cref="BridgingInstanceRouter"/> stands in for
/// <c>CommunicationServiceInstanceRouter</c> exactly as the production site
/// (<c>DeploymentManagerActor</c> → <c>ScriptActor</c> → <c>ScriptExecutionActor</c>)
/// would — it reads <see cref="RouteToCallRequest.ParentExecutionId"/> off the
/// wire request and threads it into the routed <see cref="ScriptRuntimeContext"/>
/// as <c>parentExecutionId</c>. A multi-node cluster is out of scope for an
/// in-process test (mirroring <c>SiteAuditPushFlowTests</c>'s relay).
/// </para>
/// <para>
/// The central audit store is the real <see cref="AuditLogRepository"/> over the
/// per-class <see cref="MsSqlMigrationFixture"/> MSSQL database; the routed run's
/// site rows reach it through the real <see cref="SqliteAuditWriter"/> hot-path +
/// <see cref="SiteAuditTelemetryActor"/> drain, the cached lifecycle rows through
/// the production <see cref="CachedCallTelemetryForwarder"/>, and the
/// <c>NotifyDeliver</c> rows through the real central
/// <see cref="NotificationOutboxActor"/> dispatcher.
/// </para>
/// </remarks>
public class ParentExecutionIdCorrelationTests : TestKit, IClassFixture<MsSqlMigrationFixture>
{
private readonly MsSqlMigrationFixture _fixture;
public ParentExecutionIdCorrelationTests(MsSqlMigrationFixture fixture)
{
_fixture = fixture;
}
private const string RoutedInstanceCode = "Plant.Pump42";
private const string RoutedScriptName = "OnInboundRouted";
private const string ExternalSystemName = "ERP";
private const string ExternalMethodName = "GetOrder";
private const string NotifyListName = "ops-team";
/// <summary>Per-run site id (Guid suffix) so concurrent tests sharing the MSSQL fixture stay isolated.</summary>
private static string NewSiteId() =>
"test-parentexec-" + Guid.NewGuid().ToString("N").Substring(0, 8);
private ScadaLinkDbContext CreateContext()
{
var options = new DbContextOptionsBuilder<ScadaLinkDbContext>()
.UseSqlServer(_fixture.ConnectionString)
.ConfigureWarnings(w => w.Ignore(
Microsoft.EntityFrameworkCore.Diagnostics.RelationalEventId.PendingModelChangesWarning))
.Options;
return new ScadaLinkDbContext(options);
}
[SkippableFact]
public async Task InboundRoutedRun_AllRoutedRows_CarryInboundExecutionId_AsParentExecutionId()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
// ── Central — repository + ingest actor + audit writer over the MSSQL fixture ──
var centralServices = new ServiceCollection();
centralServices.AddDbContext<ScadaLinkDbContext>(opts =>
opts.UseSqlServer(_fixture.ConnectionString)
.ConfigureWarnings(w => w.Ignore(
Microsoft.EntityFrameworkCore.Diagnostics.RelationalEventId.PendingModelChangesWarning)));
centralServices.AddScoped<IAuditLogRepository>(sp =>
new AuditLogRepository(sp.GetRequiredService<ScadaLinkDbContext>()));
centralServices.AddScoped<ISiteCallAuditRepository>(sp =>
new SiteCallAuditRepository(sp.GetRequiredService<ScadaLinkDbContext>()));
centralServices.AddScoped<INotificationOutboxRepository>(sp =>
new NotificationOutboxRepository(sp.GetRequiredService<ScadaLinkDbContext>()));
centralServices.AddScoped<INotificationRepository>(sp =>
new NotificationRepository(sp.GetRequiredService<ScadaLinkDbContext>()));
// The NotifyDeliver dispatch path runs through this same long-lived
// provider — a stub adapter that always reports a successful delivery.
centralServices.AddScoped<INotificationDeliveryAdapter>(_ => new AlwaysDeliversAdapter());
await using var centralProvider = centralServices.BuildServiceProvider();
var ingestActor = Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
(IServiceProvider)centralProvider,
NullLogger<AuditLogIngestActor>.Instance)));
var centralAuditWriter = new CentralAuditWriter(
centralProvider, NullLogger<CentralAuditWriter>.Instance);
// ── Site — SQLite audit writer (hot-path) drained to central by the
// real SiteAuditTelemetryActor through the stub gRPC client. The sync
// ApiCall row and the NotifySend row flow through this chain. ──
await using var sqliteWriter = new SqliteAuditWriter(
Options.Create(new SqliteAuditWriterOptions
{
DatabasePath = "ignored",
BatchSize = 64,
ChannelCapacity = 1024,
}),
NullLogger<SqliteAuditWriter>.Instance,
new FakeNodeIdentityProvider(),
connectionStringOverride:
$"Data Source=file:auditlog-parentexec-{Guid.NewGuid():N}?mode=memory&cache=shared");
var ring = new RingBufferFallback();
var siteAuditWriter = new FallbackAuditWriter(
sqliteWriter, ring, new NoOpAuditWriteFailureCounter(),
NullLogger<FallbackAuditWriter>.Instance);
var stubClient = new DirectActorSiteStreamAuditClient(ingestActor);
Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor(
(ISiteAuditQueue)sqliteWriter,
stubClient,
Options.Create(new SiteAuditTelemetryOptions
{
BatchSize = 256,
BusyIntervalSeconds = 1,
IdleIntervalSeconds = 1,
}),
NullLogger<SiteAuditTelemetryActor>.Instance)));
// Cached-call telemetry: production forwarder + dispatcher that also
// pushes each combined packet through the stub client into the central
// dual-write transaction (same wiring CombinedTelemetryHarness uses).
var cachedForwarder = new CombinedTelemetryDispatcher(
new CachedCallTelemetryForwarder(
siteAuditWriter, trackingStore: null,
NullLogger<CachedCallTelemetryForwarder>.Instance),
stubClient);
// Site Store-and-Forward — Notify.Send buffers a NotificationSubmit here.
using var safKeepAlive = new Microsoft.Data.Sqlite.SqliteConnection(
$"Data Source=parentexec-saf-{Guid.NewGuid():N};Mode=Memory;Cache=Shared");
safKeepAlive.Open();
var safStorage = new StoreAndForwardStorage(
safKeepAlive.ConnectionString, NullLogger<StoreAndForwardStorage>.Instance);
await safStorage.InitializeAsync();
var storeAndForward = new StoreAndForwardService(
safStorage,
new StoreAndForwardOptions
{
DefaultRetryInterval = TimeSpan.Zero,
DefaultMaxRetries = 3,
RetryTimerInterval = TimeSpan.FromMinutes(10),
},
NullLogger<StoreAndForwardService>.Instance);
// ── Outbound external-system client (routed run): sync Call succeeds,
// CachedCall completes immediately (WasBuffered=false) so the script
// helper emits the Submit + Attempted + CachedResolve lifecycle. ──
var externalClient = Substitute.For<IExternalSystemClient>();
externalClient
.CallAsync(ExternalSystemName, ExternalMethodName,
Arg.Any<IReadOnlyDictionary<string, object?>?>(), Arg.Any<CancellationToken>())
.Returns(new ExternalCallResult(true, "{\"ok\":true}", null));
externalClient
.CachedCallAsync(ExternalSystemName, ExternalMethodName,
Arg.Any<IReadOnlyDictionary<string, object?>?>(),
Arg.Any<string?>(), Arg.Any<CancellationToken>(),
Arg.Any<ScadaLink.Commons.Types.TrackedOperationId?>(),
Arg.Any<Guid?>(), Arg.Any<string?>(), Arg.Any<Guid?>())
.Returns(new ExternalCallResult(true, "{\"ok\":true}", null, WasBuffered: false));
// ── The routing transport stand-in: builds the routed ScriptRuntimeContext
// carrying RouteToCallRequest.ParentExecutionId — exactly what the
// production site handler (DeploymentManagerActor) does. ──
var router = new BridgingInstanceRouter(
siteId,
externalClient,
siteAuditWriter,
cachedForwarder,
storeAndForward);
// ── The inbound API method script: it calls Route.Call into the site
// instance. The real InboundScriptExecutor binds the inbound request's
// ExecutionId onto the RouteHelper, so the routed call carries it as
// ParentExecutionId. ──
var inboundMethod = new ScadaLink.Commons.Entities.InboundApi.ApiMethod(
"submitOrder",
$"return await Route.To(\"{RoutedInstanceCode}\").Call(\"{RoutedScriptName}\", new {{ order = 7 }});");
var locator = Substitute.For<IInstanceLocator>();
locator.GetSiteIdForInstanceAsync(RoutedInstanceCode, Arg.Any<CancellationToken>())
.Returns(siteId);
var scriptExecutor = new InboundScriptExecutor(
NullLogger<InboundScriptExecutor>.Instance,
new ServiceCollection().BuildServiceProvider());
Assert.True(scriptExecutor.CompileAndRegister(inboundMethod));
// ── Act — issue the inbound HTTP request through a TestHost pipeline
// fronted by the real AuditWriteMiddleware. The endpoint handler reads
// the middleware-stashed inbound ExecutionId and runs the inbound
// method script with it as parentExecutionId. ──
using var host = await BuildInboundHostAsync(centralAuditWriter, async ctx =>
{
var inboundExecutionId = (Guid)ctx.Items[AuditWriteMiddleware.InboundExecutionIdItemKey]!;
var route = new RouteHelper(locator, router);
var result = await scriptExecutor.ExecuteAsync(
inboundMethod,
new Dictionary<string, object?>(),
route,
TimeSpan.FromSeconds(30),
ctx.RequestAborted,
parentExecutionId: inboundExecutionId);
ctx.Response.StatusCode = result.Success ? 200 : 500;
await ctx.Response.WriteAsync(result.Success ? "ok" : "fail");
});
var client = host.GetTestClient();
var response = await client.PostAsync(
"/api/submitOrder",
new StringContent("{}", Encoding.UTF8, "application/json"));
Assert.Equal(System.Net.HttpStatusCode.OK, response.StatusCode);
// The routed run emits its sync-ApiCall and NotifySend audit rows on a
// deliberately fire-and-forget path (alog.md §7 — an audit write must
// never block the user-facing script call). `Notify.Send` therefore
// returns — and the routed `RouteToCallAsync` completes — BEFORE the
// SqliteAuditWriter background loop has flushed the NotifySend row into
// the site hot-path. Wait for all five site rows to be durably present
// in SQLite before the central assertion: this is the production
// durability point (the row IS in SQLite before it is considered
// audited), and pinning it removes the emit-vs-drain race that
// otherwise let the SiteAuditTelemetryADrain forward only four rows on
// its first tick and leave NotifySend stranded for a full drain
// interval under heavy parallel load.
await WaitForSiteRowsPersistedAsync(sqliteWriter);
// The routed run produced a NotifySend that buffered a NotificationSubmit
// into S&F. Drain that genuine site-produced submission to the central
// NotificationOutboxActor so the NotifyDeliver dispatch rows materialise.
await ForwardBufferedNotificationToCentralAsync(
storeAndForward, router.NotificationId!, centralProvider, centralAuditWriter);
// ── Assert ──────────────────────────────────────────────────────────
await AwaitAssertAsync(async () =>
{
await using var readContext = CreateContext();
var repo = new AuditLogRepository(readContext);
// Every audit row this site produced (sync ApiCall + cached lifecycle
// + NotifySend) plus the central NotifyDeliver rows.
var siteRows = await repo.QueryAsync(
new AuditLogQueryFilter(SourceSiteIds: new[] { siteId }),
new AuditLogPaging(PageSize: 100));
// sync ApiCall (1) + cached Submit/Attempted/Resolve (3) + NotifySend (1)
// + NotifyDeliver Attempted/Delivered (2) = 7 rows for the routed run.
Assert.True(siteRows.Count == 7,
"Expected 7 routed-run audit rows; saw: "
+ string.Join(", ", siteRows.Select(r => $"{r.Channel}/{r.Kind}/{r.Status}")));
Assert.Single(siteRows, r => r.Channel == AuditChannel.ApiOutbound && r.Kind == AuditKind.ApiCall);
Assert.Single(siteRows, r => r.Kind == AuditKind.CachedSubmit);
Assert.Single(siteRows, r => r.Kind == AuditKind.CachedResolve);
Assert.Single(siteRows, r => r.Kind == AuditKind.NotifySend);
Assert.Equal(2, siteRows.Count(r => r.Kind == AuditKind.NotifyDeliver));
// CORE PROMISE: every routed-run row carries the SAME non-null
// ParentExecutionId — the inbound request's ExecutionId.
var parentIds = siteRows.Select(r => r.ParentExecutionId).Distinct().ToList();
Assert.Single(parentIds);
Assert.NotNull(parentIds[0]);
var inboundExecutionId = parentIds[0]!.Value;
// The routed run has its OWN distinct ExecutionId — not the parent's.
var routedExecutionIds = siteRows
.Select(r => r.ExecutionId)
.Distinct()
.ToList();
Assert.Single(routedExecutionIds);
Assert.NotNull(routedExecutionIds[0]);
var routedExecutionId = routedExecutionIds[0]!.Value;
Assert.NotEqual(inboundExecutionId, routedExecutionId);
// The inbound request's own InboundRequest row is TOP-LEVEL —
// ExecutionId = the propagated id, ParentExecutionId = NULL.
var inboundRows = await repo.QueryAsync(
new AuditLogQueryFilter(ExecutionId: inboundExecutionId),
new AuditLogPaging(PageSize: 10));
var inboundRow = Assert.Single(inboundRows,
r => r.Channel == AuditChannel.ApiInbound && r.Kind == AuditKind.InboundRequest);
Assert.Equal(AuditStatus.Delivered, inboundRow.Status);
Assert.Null(inboundRow.ParentExecutionId);
// The parentExecutionId filter pulls the routed run's complete
// trust-boundary footprint (all 7 routed rows, none of the inbound).
var byParent = await repo.QueryAsync(
new AuditLogQueryFilter(ParentExecutionId: inboundExecutionId),
new AuditLogPaging(PageSize: 100));
Assert.Equal(7, byParent.Count);
Assert.All(byParent, r => Assert.Equal(routedExecutionId, r.ExecutionId));
// GetExecutionTreeAsync returns BOTH executions in one chain —
// inbound (root) and routed (child), regardless of entry point.
var treeFromChild = await repo.GetExecutionTreeAsync(routedExecutionId);
AssertChain(treeFromChild, inboundExecutionId, routedExecutionId);
var treeFromRoot = await repo.GetExecutionTreeAsync(inboundExecutionId);
AssertChain(treeFromRoot, inboundExecutionId, routedExecutionId);
}, TimeSpan.FromSeconds(90));
}
/// <summary>
/// Asserts the execution tree is the expected two-node inbound→routed chain:
/// the inbound execution is the root (<c>ParentExecutionId = NULL</c>) and the
/// routed execution's <c>ParentExecutionId</c> points back at it.
/// </summary>
private static void AssertChain(
IReadOnlyList<ExecutionTreeNode> tree,
Guid inboundExecutionId,
Guid routedExecutionId)
{
Assert.Equal(2, tree.Count);
var root = Assert.Single(tree, n => n.ExecutionId == inboundExecutionId);
Assert.Null(root.ParentExecutionId);
var child = Assert.Single(tree, n => n.ExecutionId == routedExecutionId);
Assert.Equal(inboundExecutionId, child.ParentExecutionId);
}
/// <summary>
/// Spins up a minimal in-memory ASP.NET host whose pipeline mirrors the
/// production inbound-API arrangement: routing → the real
/// <see cref="AuditWriteMiddleware"/> → the <c>POST /api/{methodName}</c>
/// endpoint. The middleware mints + stashes the inbound request's
/// <c>ExecutionId</c> and emits the top-level <see cref="AuditKind.InboundRequest"/>
/// row via the supplied <see cref="ICentralAuditWriter"/>.
/// </summary>
private static async Task<IHost> BuildInboundHostAsync(
ICentralAuditWriter centralAuditWriter,
RequestDelegate endpointHandler)
{
var hostBuilder = new HostBuilder()
.ConfigureWebHost(webBuilder =>
{
webBuilder
.UseTestServer()
.ConfigureServices(services =>
{
services.AddSingleton(centralAuditWriter);
services.AddRouting();
})
.Configure(app =>
{
app.UseRouting();
app.UseAuditWriteMiddleware();
app.UseEndpoints(endpoints =>
{
endpoints.MapPost("/api/{methodName}", endpointHandler);
});
});
});
return await hostBuilder.StartAsync();
}
/// <summary>
/// Reads the genuine site-produced <see cref="NotificationSubmit"/> the routed
/// <c>Notify.Send</c> buffered into Store-and-Forward, then drives it through
/// a real central <see cref="NotificationOutboxActor"/> so the
/// <see cref="AuditKind.NotifyDeliver"/> dispatch rows materialise. The
/// dispatcher echoes <c>OriginParentExecutionId</c> off the
/// <c>NotificationSubmit</c> onto every <c>NotifyDeliver</c> row — the
/// cross-execution linkage under test on the central side.
/// </summary>
private async Task ForwardBufferedNotificationToCentralAsync(
StoreAndForwardService storeAndForward,
string notificationId,
IServiceProvider centralProvider,
ICentralAuditWriter centralAuditWriter)
{
var buffered = await storeAndForward.GetMessageByIdAsync(notificationId);
Assert.NotNull(buffered);
var submit = JsonSerializer.Deserialize<NotificationSubmit>(buffered!.PayloadJson);
Assert.NotNull(submit);
// The routed Notify.Send stamped the inbound request's ExecutionId as the
// submission's OriginParentExecutionId — proven separately on the
// NotifyDeliver rows, but asserted here too as the central handoff input.
Assert.NotNull(submit!.OriginParentExecutionId);
// The outbox actor runs over the long-lived central provider (which
// carries the AlwaysDeliversAdapter) so the dispatch sweep — launched
// asynchronously by the DispatchTick — still has a live IServiceProvider
// to resolve its per-sweep scope from.
var outboxActor = Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
centralProvider,
new NotificationOutboxOptions
{
// Long timers so PreStart's scheduled ticks never fire — the
// test drives ingest + dispatch explicitly.
DispatchInterval = TimeSpan.FromHours(1),
PurgeInterval = TimeSpan.FromDays(1),
},
centralAuditWriter,
NullLogger<NotificationOutboxActor>.Instance)));
// Ingest the genuine site submission, then run one dispatch sweep.
var ack = await outboxActor.Ask<NotificationSubmitAck>(
submit, TimeSpan.FromSeconds(15));
Assert.True(ack.Accepted, ack.Error);
outboxActor.Tell(InternalMessages.DispatchTick.Instance);
}
/// <summary>
/// Polls the site SQLite hot-path until every audit <see cref="AuditKind"/>
/// the routed run is expected to emit — sync <c>ApiCall</c>, the cached
/// <c>CachedSubmit</c>/<c>ApiCallCached</c>/<c>CachedResolve</c> lifecycle,
/// and <c>NotifySend</c> — is durably present (Pending or Forwarded).
/// </summary>
/// <remarks>
/// The routed run's sync-<c>ApiCall</c> and <c>NotifySend</c> audit rows are
/// written fire-and-forget (the script call must not block on the audit
/// writer — alog.md §7), so the routed <c>RouteToCallAsync</c> returns
/// before the background writer loop has committed those rows.
/// <c>NotifySend</c> is emitted last and therefore settles last. This wait
/// asserts the specific <b>Kinds</b> are present, not merely a row count: a
/// bare count could be satisfied while the last-emitted <c>NotifySend</c>
/// row was still in flight, letting the <c>SiteAuditTelemetryActor</c> drain
/// only a partial snapshot and leave <c>NotifySend</c> stranded for a later
/// tick — the emit-vs-drain race that failed this test under full-suite load.
/// </remarks>
private async Task WaitForSiteRowsPersistedAsync(SqliteAuditWriter sqliteWriter)
{
var expectedKinds = new[]
{
AuditKind.ApiCall, AuditKind.CachedSubmit, AuditKind.ApiCallCached,
AuditKind.CachedResolve, AuditKind.NotifySend,
};
await AwaitAssertAsync(
async () =>
{
var pending = await sqliteWriter.ReadPendingAsync(256);
var forwarded = await sqliteWriter.ReadForwardedAsync(256);
var kinds = pending.Concat(forwarded).Select(r => r.Kind).ToHashSet();
var missing = expectedKinds.Where(k => !kinds.Contains(k)).ToList();
Assert.True(
missing.Count == 0,
"Expected every routed-run audit Kind durably in SQLite; missing: "
+ string.Join(", ", missing)
+ $" (saw {pending.Count} Pending + {forwarded.Count} Forwarded).");
},
TimeSpan.FromSeconds(30),
TimeSpan.FromMilliseconds(50));
}
/// <summary>
/// Stub <see cref="INotificationDeliveryAdapter"/> that always reports a
/// successful delivery — a single dispatch sweep then yields one
/// <see cref="AuditStatus.Attempted"/> + one <see cref="AuditStatus.Delivered"/>
/// <see cref="AuditKind.NotifyDeliver"/> row.
/// </summary>
private sealed class AlwaysDeliversAdapter : INotificationDeliveryAdapter
{
public NotificationType Type => NotificationType.Email;
public Task<DeliveryOutcome> DeliverAsync(
ScadaLink.Commons.Entities.Notifications.Notification notification,
CancellationToken cancellationToken = default)
=> Task.FromResult(DeliveryOutcome.Success("ops@example.com"));
}
/// <summary>
/// In-process stand-in for the cross-cluster routing transport
/// (<c>CommunicationServiceInstanceRouter</c> →
/// <c>CommunicationService</c> → site <c>DeploymentManagerActor</c>). On a
/// routed <c>Call</c> it does exactly what the production site handler does:
/// it reads <see cref="RouteToCallRequest.ParentExecutionId"/> off the wire
/// request and threads it into a fresh routed <see cref="ScriptRuntimeContext"/>
/// as <c>parentExecutionId</c>, then runs the routed script's three
/// trust-boundary actions (sync <c>ExternalSystem.Call</c>, a cached call and
/// a <c>Notify.Send</c>). The routed context still mints its OWN fresh
/// <c>ExecutionId</c> — only the parent pointer is inherited.
/// </summary>
private sealed class BridgingInstanceRouter : IInstanceRouter
{
private readonly string _siteId;
private readonly IExternalSystemClient _externalClient;
private readonly IAuditWriter _auditWriter;
private readonly ICachedCallTelemetryForwarder _cachedForwarder;
private readonly StoreAndForwardService _storeAndForward;
/// <summary>
/// The <c>NotificationId</c> the routed <c>Notify.Send</c> minted, captured
/// so the test can drain the buffered <see cref="NotificationSubmit"/>.
/// </summary>
public string? NotificationId { get; private set; }
public BridgingInstanceRouter(
string siteId,
IExternalSystemClient externalClient,
IAuditWriter auditWriter,
ICachedCallTelemetryForwarder cachedForwarder,
StoreAndForwardService storeAndForward)
{
_siteId = siteId;
_externalClient = externalClient;
_auditWriter = auditWriter;
_cachedForwarder = cachedForwarder;
_storeAndForward = storeAndForward;
}
public async Task<RouteToCallResponse> RouteToCallAsync(
string siteId, RouteToCallRequest request, CancellationToken cancellationToken)
{
var compilationService = new ScriptCompilationService(
NullLogger<ScriptCompilationService>.Instance);
var sharedScriptLibrary = new SharedScriptLibrary(
compilationService, NullLogger<SharedScriptLibrary>.Instance);
// Mirror DeploymentManagerActor → ScriptActor → ScriptExecutionActor:
// the routed script execution gets its OWN fresh ExecutionId, and the
// inbound request's ExecutionId arrives as ParentExecutionId.
var routedContext = new ScriptRuntimeContext(
ActorRefs.Nobody,
ActorRefs.Nobody,
sharedScriptLibrary,
currentCallDepth: 0,
maxCallDepth: 10,
askTimeout: TimeSpan.FromSeconds(5),
instanceName: request.InstanceUniqueName,
logger: NullLogger.Instance,
externalSystemClient: _externalClient,
databaseGateway: null,
storeAndForward: _storeAndForward,
siteCommunicationActor: null,
siteId: _siteId,
sourceScript: $"ScriptActor:{request.ScriptName}",
auditWriter: _auditWriter,
operationTrackingStore: null,
cachedForwarder: _cachedForwarder,
executionId: null,
parentExecutionId: request.ParentExecutionId);
// The routed site script's body: a sync ExternalSystem.Call, a cached
// call, and a Notify.Send — three distinct trust-boundary actions of
// the one routed execution.
await routedContext.ExternalSystem.Call(ExternalSystemName, ExternalMethodName);
await routedContext.ExternalSystem.CachedCall(ExternalSystemName, ExternalMethodName);
NotificationId = await routedContext.Notify
.To(NotifyListName)
.Send("Routed run alert", "inbound-routed script fired");
return new RouteToCallResponse(
request.CorrelationId, true, "routed-ok", null, DateTimeOffset.UtcNow);
}
public Task<RouteToGetAttributesResponse> RouteToGetAttributesAsync(
string siteId, RouteToGetAttributesRequest request, CancellationToken cancellationToken)
=> throw new NotSupportedException();
public Task<RouteToSetAttributesResponse> RouteToSetAttributesAsync(
string siteId, RouteToSetAttributesRequest request, CancellationToken cancellationToken)
=> throw new NotSupportedException();
}
}