diff --git a/CLAUDE.md b/CLAUDE.md index f60e607..4fe1684 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -133,6 +133,7 @@ This project contains design documentation for a distributed SCADA system built - Scope = script trust boundary: outbound API (sync + cached), outbound DB (sync + cached), notifications, inbound API. Framework/internal traffic is explicitly excluded. - One row per lifecycle event; cached calls produce 4+ rows per operation (`Submitted`, `Forwarded`, `Attempted`, `Delivered`/`Parked`/`Discarded`). - `ExecutionId` (`uniqueidentifier NULL`) is the universal per-run correlation value — every audit row emitted by one script execution / inbound request shares it; `CorrelationId` remains the per-operation lifecycle id (NULL for sync one-shots). +- `ParentExecutionId` (`uniqueidentifier NULL`) is the cross-execution spawn pointer — every row of a spawned run carries the spawner's `ExecutionId`; first cut bridges the inbound API → routed-site-script case (the routed run records the inbound request's `ExecutionId`; the inbound row stays top-level / NULL); `IX_AuditLog_ParentExecution` backs the filter + the recursive execution-tree walk; tag cascade deferred. - Site SQLite hot-path first, then gRPC telemetry to central; ingest is idempotent on `EventId`; periodic reconciliation pull as fallback when telemetry is lost. - Cached operations: site emits a single additively-extended `CachedCallTelemetry` packet carrying both audit events and operational state; central writes `AuditLog` + `SiteCalls` in one transaction. - Payload cap 8 KB by default / 64 KB on error rows; auth headers redacted by default; SQL parameter values captured by default; per-target redaction opt-in. diff --git a/docs/requirements/Component-AuditLog.md b/docs/requirements/Component-AuditLog.md index 7fdeb66..1b6debf 100644 --- a/docs/requirements/Component-AuditLog.md +++ b/docs/requirements/Component-AuditLog.md @@ -84,6 +84,7 @@ row per lifecycle event across all channels. | `Kind` | `varchar(32)` | Event kind discriminator (see kinds list below). | | `CorrelationId` | `uniqueidentifier` NULL | Ties multi-event operations together. `TrackedOperationId` for cached calls, `NotificationId` for notifications, request-id for inbound API. NULL for sync one-shot calls. | | `ExecutionId` | `uniqueidentifier` NULL | The originating script execution / inbound request — the universal per-run correlation value; distinct from `CorrelationId`, which is the per-operation lifecycle id. Stamped on *every* audit row emitted by one execution. | +| `ParentExecutionId` | `uniqueidentifier` NULL | The `ExecutionId` of the execution that *spawned* this run — the cross-execution correlation pointer. Set on every row of an inbound-API-routed site script run (= the inbound request's `ExecutionId`); NULL for a top-level run (inbound, tag-change / timer-triggered, un-bridged). | | `SourceSiteId` | `varchar(64)` NULL | NULL for central-originated events. | | `SourceInstanceId` | `varchar(128)` NULL | Instance whose script initiated the action (when applicable). | | `SourceScript` | `varchar(128)` NULL | Script name within the instance. | @@ -105,6 +106,7 @@ row per lifecycle event across all channels. - `IX_AuditLog_Site_Occurred (SourceSiteId, OccurredAtUtc)` — per-site filters. - `IX_AuditLog_CorrelationId (CorrelationId)` — drilldown from a single operation. - `IX_AuditLog_Execution (ExecutionId)` — drilldown to every action of one script execution / inbound request. +- `IX_AuditLog_ParentExecution (ParentExecutionId)` — cross-execution drilldown: the downward leg of the execution-tree walk seeks on it (`ParentExecutionId = ancestor.ExecutionId`), and it backs the `parentExecutionId` filter. - `IX_AuditLog_Channel_Status_Occurred (Channel, Status, OccurredAtUtc)` — KPI / dashboard tiles. - `IX_AuditLog_Target_Occurred (Target, OccurredAtUtc)` — "what did we send to system X". - Monthly partitioning on `OccurredAtUtc` from day one; purge is a partition switch (see Retention & Purge). @@ -149,6 +151,22 @@ The table carries two correlation columns at different granularities: The two are orthogonal: one execution may touch several operations (each with its own `CorrelationId`) yet every resulting row shares the one `ExecutionId`. +**`ParentExecutionId`** adds *cross-execution* correlation on top. `ExecutionId` +is per-run and flat — `WHERE ExecutionId = X` returns everything one run did, but +nothing links a run to the run that *spawned* it. `ParentExecutionId` carries the +spawning execution's `ExecutionId`: a spawned run still gets its own fresh +`ExecutionId`, and every audit row it emits also carries the spawner's id in +`ParentExecutionId`. The first cut bridges the **inbound API → routed-site-script** +case: an inbound request runs a method script that calls `Route.Call`, routing to +a site instance; the routed site script records the inbound request's +`ExecutionId` as its `ParentExecutionId`, while the inbound `InboundRequest` row +itself is top-level (`ParentExecutionId` NULL). The pointer always references the +*immediate* spawner, so a routed run that itself routes onward threads its own +`ExecutionId` — walking `ParentExecutionId → ExecutionId` recursively +reconstructs the call chain as a tree of arbitrary depth. The tag-cascade case +(an attribute write triggering another script) is **deferred** — the model +generalises to it with no schema change once that spawn point is threaded. + ## The Site-Local `AuditLog` (SQLite) A SQLite database file on each site node, alongside the Store-and-Forward diff --git a/tests/ScadaLink.AuditLog.Tests/Integration/ParentExecutionIdCorrelationTests.cs b/tests/ScadaLink.AuditLog.Tests/Integration/ParentExecutionIdCorrelationTests.cs new file mode 100644 index 0000000..0efc6f1 --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Integration/ParentExecutionIdCorrelationTests.cs @@ -0,0 +1,562 @@ +using System.Text; +using System.Text.Json; +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.TestHost; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using NSubstitute; +using ScadaLink.AuditLog.Central; +using ScadaLink.AuditLog.Site; +using ScadaLink.AuditLog.Site.Telemetry; +using ScadaLink.AuditLog.Tests.Integration.Infrastructure; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Messages.InboundApi; +using ScadaLink.Commons.Messages.Notification; +using ScadaLink.Commons.Types.Audit; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.ConfigurationDatabase; +using ScadaLink.ConfigurationDatabase.Repositories; +using ScadaLink.ConfigurationDatabase.Tests.Migrations; +using ScadaLink.InboundAPI; +using ScadaLink.InboundAPI.Middleware; +using ScadaLink.NotificationOutbox; +using ScadaLink.NotificationOutbox.Delivery; +using ScadaLink.NotificationOutbox.Messages; +using ScadaLink.SiteRuntime.Scripts; +using ScadaLink.StoreAndForward; + +namespace ScadaLink.AuditLog.Tests.Integration; + +/// +/// Audit Log #23 — ParentExecutionId cross-execution correlation headline +/// end-to-end suite. Verifies the inbound-API → routed-site-script bridge: an +/// inbound HTTP request runs an inbound method script that calls +/// Route.Call into a site instance; the routed site script does a sync +/// ExternalSystem.Call, a cached call and a Notify.Send. Every +/// audit row the routed run produces — site + central, sync + cached lifecycle +/// + NotifySend/NotifyDeliver — must carry +/// equal to the inbound request's +/// , while the routed run has its own +/// distinct and the inbound +/// row is top-level +/// (ParentExecutionId = NULL). +/// +/// +/// +/// This is the integration-level counterpart to : +/// where that suite drives a single run and +/// asserts the shared per-run ExecutionId, this suite spans two +/// executions on opposite sides of the inbound→routed bridge and asserts the +/// cross-execution ParentExecutionId linkage plus +/// . +/// +/// +/// The bridge is exercised through the genuine production glue: +/// +/// the real in a +/// Microsoft.AspNetCore.TestHost pipeline — mints the inbound request's +/// per-request ExecutionId once, stashes it on +/// , and emits the top-level +/// row via the real +/// ; +/// the real + +/// — the executor binds the stashed inbound +/// ExecutionId via , so a +/// Route.To(...).Call(...) inside the inbound script builds a +/// carrying +/// . +/// +/// Only the cross-cluster routing transport is substituted: the test +/// stands in for +/// CommunicationServiceInstanceRouter exactly as the production site +/// (DeploymentManagerActorScriptActorScriptExecutionActor) +/// would — it reads off the +/// wire request and threads it into the routed +/// as parentExecutionId. A multi-node cluster is out of scope for an +/// in-process test (mirroring SiteAuditPushFlowTests's relay). +/// +/// +/// The central audit store is the real over the +/// per-class MSSQL database; the routed run's +/// site rows reach it through the real hot-path + +/// drain, the cached lifecycle rows through +/// the production , and the +/// NotifyDeliver rows through the real central +/// dispatcher. +/// +/// +public class ParentExecutionIdCorrelationTests : TestKit, IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public ParentExecutionIdCorrelationTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + private const string RoutedInstanceCode = "Plant.Pump42"; + private const string RoutedScriptName = "OnInboundRouted"; + private const string ExternalSystemName = "ERP"; + private const string ExternalMethodName = "GetOrder"; + private const string NotifyListName = "ops-team"; + + /// Per-run site id (Guid suffix) so concurrent tests sharing the MSSQL fixture stay isolated. + private static string NewSiteId() => + "test-parentexec-" + Guid.NewGuid().ToString("N").Substring(0, 8); + + private ScadaLinkDbContext CreateContext() + { + var options = new DbContextOptionsBuilder() + .UseSqlServer(_fixture.ConnectionString) + .ConfigureWarnings(w => w.Ignore( + Microsoft.EntityFrameworkCore.Diagnostics.RelationalEventId.PendingModelChangesWarning)) + .Options; + return new ScadaLinkDbContext(options); + } + + [SkippableFact] + public async Task InboundRoutedRun_AllRoutedRows_CarryInboundExecutionId_AsParentExecutionId() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + + // ── Central — repository + ingest actor + audit writer over the MSSQL fixture ── + var centralServices = new ServiceCollection(); + centralServices.AddDbContext(opts => + opts.UseSqlServer(_fixture.ConnectionString) + .ConfigureWarnings(w => w.Ignore( + Microsoft.EntityFrameworkCore.Diagnostics.RelationalEventId.PendingModelChangesWarning))); + centralServices.AddScoped(sp => + new AuditLogRepository(sp.GetRequiredService())); + centralServices.AddScoped(sp => + new SiteCallAuditRepository(sp.GetRequiredService())); + centralServices.AddScoped(sp => + new NotificationOutboxRepository(sp.GetRequiredService())); + centralServices.AddScoped(sp => + new NotificationRepository(sp.GetRequiredService())); + // The NotifyDeliver dispatch path runs through this same long-lived + // provider — a stub adapter that always reports a successful delivery. + centralServices.AddScoped(_ => new AlwaysDeliversAdapter()); + await using var centralProvider = centralServices.BuildServiceProvider(); + + var ingestActor = Sys.ActorOf(Props.Create(() => new AuditLogIngestActor( + (IServiceProvider)centralProvider, + NullLogger.Instance))); + var centralAuditWriter = new CentralAuditWriter( + centralProvider, NullLogger.Instance); + + // ── Site — SQLite audit writer (hot-path) drained to central by the + // real SiteAuditTelemetryActor through the stub gRPC client. The sync + // ApiCall row and the NotifySend row flow through this chain. ── + await using var sqliteWriter = new SqliteAuditWriter( + Options.Create(new SqliteAuditWriterOptions + { + DatabasePath = "ignored", + BatchSize = 64, + ChannelCapacity = 1024, + }), + NullLogger.Instance, + connectionStringOverride: + $"Data Source=file:auditlog-parentexec-{Guid.NewGuid():N}?mode=memory&cache=shared"); + var ring = new RingBufferFallback(); + var siteAuditWriter = new FallbackAuditWriter( + sqliteWriter, ring, new NoOpAuditWriteFailureCounter(), + NullLogger.Instance); + var stubClient = new DirectActorSiteStreamAuditClient(ingestActor); + Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor( + (ISiteAuditQueue)sqliteWriter, + stubClient, + Options.Create(new SiteAuditTelemetryOptions + { + BatchSize = 256, + BusyIntervalSeconds = 1, + IdleIntervalSeconds = 1, + }), + NullLogger.Instance))); + + // Cached-call telemetry: production forwarder + dispatcher that also + // pushes each combined packet through the stub client into the central + // dual-write transaction (same wiring CombinedTelemetryHarness uses). + var cachedForwarder = new CombinedTelemetryDispatcher( + new CachedCallTelemetryForwarder( + siteAuditWriter, trackingStore: null, + NullLogger.Instance), + stubClient); + + // Site Store-and-Forward — Notify.Send buffers a NotificationSubmit here. + using var safKeepAlive = new Microsoft.Data.Sqlite.SqliteConnection( + $"Data Source=parentexec-saf-{Guid.NewGuid():N};Mode=Memory;Cache=Shared"); + safKeepAlive.Open(); + var safStorage = new StoreAndForwardStorage( + safKeepAlive.ConnectionString, NullLogger.Instance); + await safStorage.InitializeAsync(); + var storeAndForward = new StoreAndForwardService( + safStorage, + new StoreAndForwardOptions + { + DefaultRetryInterval = TimeSpan.Zero, + DefaultMaxRetries = 3, + RetryTimerInterval = TimeSpan.FromMinutes(10), + }, + NullLogger.Instance); + + // ── Outbound external-system client (routed run): sync Call succeeds, + // CachedCall completes immediately (WasBuffered=false) so the script + // helper emits the Submit + Attempted + CachedResolve lifecycle. ── + var externalClient = Substitute.For(); + externalClient + .CallAsync(ExternalSystemName, ExternalMethodName, + Arg.Any?>(), Arg.Any()) + .Returns(new ExternalCallResult(true, "{\"ok\":true}", null)); + externalClient + .CachedCallAsync(ExternalSystemName, ExternalMethodName, + Arg.Any?>(), + Arg.Any(), Arg.Any(), + Arg.Any(), + Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(new ExternalCallResult(true, "{\"ok\":true}", null, WasBuffered: false)); + + // ── The routing transport stand-in: builds the routed ScriptRuntimeContext + // carrying RouteToCallRequest.ParentExecutionId — exactly what the + // production site handler (DeploymentManagerActor) does. ── + var router = new BridgingInstanceRouter( + siteId, + externalClient, + siteAuditWriter, + cachedForwarder, + storeAndForward); + + // ── The inbound API method script: it calls Route.Call into the site + // instance. The real InboundScriptExecutor binds the inbound request's + // ExecutionId onto the RouteHelper, so the routed call carries it as + // ParentExecutionId. ── + var inboundMethod = new ScadaLink.Commons.Entities.InboundApi.ApiMethod( + "submitOrder", + $"return await Route.To(\"{RoutedInstanceCode}\").Call(\"{RoutedScriptName}\", new {{ order = 7 }});"); + var locator = Substitute.For(); + locator.GetSiteIdForInstanceAsync(RoutedInstanceCode, Arg.Any()) + .Returns(siteId); + var scriptExecutor = new InboundScriptExecutor( + NullLogger.Instance, + new ServiceCollection().BuildServiceProvider()); + Assert.True(scriptExecutor.CompileAndRegister(inboundMethod)); + + // ── Act — issue the inbound HTTP request through a TestHost pipeline + // fronted by the real AuditWriteMiddleware. The endpoint handler reads + // the middleware-stashed inbound ExecutionId and runs the inbound + // method script with it as parentExecutionId. ── + using var host = await BuildInboundHostAsync(centralAuditWriter, async ctx => + { + var inboundExecutionId = (Guid)ctx.Items[AuditWriteMiddleware.InboundExecutionIdItemKey]!; + var route = new RouteHelper(locator, router); + var result = await scriptExecutor.ExecuteAsync( + inboundMethod, + new Dictionary(), + route, + TimeSpan.FromSeconds(30), + ctx.RequestAborted, + parentExecutionId: inboundExecutionId); + + ctx.Response.StatusCode = result.Success ? 200 : 500; + await ctx.Response.WriteAsync(result.Success ? "ok" : "fail"); + }); + + var client = host.GetTestClient(); + var response = await client.PostAsync( + "/api/submitOrder", + new StringContent("{}", Encoding.UTF8, "application/json")); + Assert.Equal(System.Net.HttpStatusCode.OK, response.StatusCode); + + // The routed run produced a NotifySend that buffered a NotificationSubmit + // into S&F. Drain that genuine site-produced submission to the central + // NotificationOutboxActor so the NotifyDeliver dispatch rows materialise. + await ForwardBufferedNotificationToCentralAsync( + storeAndForward, router.NotificationId!, centralProvider, centralAuditWriter); + + // ── Assert ────────────────────────────────────────────────────────── + await AwaitAssertAsync(async () => + { + await using var readContext = CreateContext(); + var repo = new AuditLogRepository(readContext); + + // Every audit row this site produced (sync ApiCall + cached lifecycle + // + NotifySend) plus the central NotifyDeliver rows. + var siteRows = await repo.QueryAsync( + new AuditLogQueryFilter(SourceSiteIds: new[] { siteId }), + new AuditLogPaging(PageSize: 100)); + + // sync ApiCall (1) + cached Submit/Attempted/Resolve (3) + NotifySend (1) + // + NotifyDeliver Attempted/Delivered (2) = 7 rows for the routed run. + Assert.True(siteRows.Count == 7, + "Expected 7 routed-run audit rows; saw: " + + string.Join(", ", siteRows.Select(r => $"{r.Channel}/{r.Kind}/{r.Status}"))); + Assert.Single(siteRows, r => r.Channel == AuditChannel.ApiOutbound && r.Kind == AuditKind.ApiCall); + Assert.Single(siteRows, r => r.Kind == AuditKind.CachedSubmit); + Assert.Single(siteRows, r => r.Kind == AuditKind.CachedResolve); + Assert.Single(siteRows, r => r.Kind == AuditKind.NotifySend); + Assert.Equal(2, siteRows.Count(r => r.Kind == AuditKind.NotifyDeliver)); + + // CORE PROMISE: every routed-run row carries the SAME non-null + // ParentExecutionId — the inbound request's ExecutionId. + var parentIds = siteRows.Select(r => r.ParentExecutionId).Distinct().ToList(); + Assert.Single(parentIds); + Assert.NotNull(parentIds[0]); + var inboundExecutionId = parentIds[0]!.Value; + + // The routed run has its OWN distinct ExecutionId — not the parent's. + var routedExecutionIds = siteRows + .Select(r => r.ExecutionId) + .Distinct() + .ToList(); + Assert.Single(routedExecutionIds); + Assert.NotNull(routedExecutionIds[0]); + var routedExecutionId = routedExecutionIds[0]!.Value; + Assert.NotEqual(inboundExecutionId, routedExecutionId); + + // The inbound request's own InboundRequest row is TOP-LEVEL — + // ExecutionId = the propagated id, ParentExecutionId = NULL. + var inboundRows = await repo.QueryAsync( + new AuditLogQueryFilter(ExecutionId: inboundExecutionId), + new AuditLogPaging(PageSize: 10)); + var inboundRow = Assert.Single(inboundRows, + r => r.Channel == AuditChannel.ApiInbound && r.Kind == AuditKind.InboundRequest); + Assert.Equal(AuditStatus.Delivered, inboundRow.Status); + Assert.Null(inboundRow.ParentExecutionId); + + // The parentExecutionId filter pulls the routed run's complete + // trust-boundary footprint (all 7 routed rows, none of the inbound). + var byParent = await repo.QueryAsync( + new AuditLogQueryFilter(ParentExecutionId: inboundExecutionId), + new AuditLogPaging(PageSize: 100)); + Assert.Equal(7, byParent.Count); + Assert.All(byParent, r => Assert.Equal(routedExecutionId, r.ExecutionId)); + + // GetExecutionTreeAsync returns BOTH executions in one chain — + // inbound (root) and routed (child), regardless of entry point. + var treeFromChild = await repo.GetExecutionTreeAsync(routedExecutionId); + AssertChain(treeFromChild, inboundExecutionId, routedExecutionId); + var treeFromRoot = await repo.GetExecutionTreeAsync(inboundExecutionId); + AssertChain(treeFromRoot, inboundExecutionId, routedExecutionId); + }, TimeSpan.FromSeconds(30)); + } + + /// + /// Asserts the execution tree is the expected two-node inbound→routed chain: + /// the inbound execution is the root (ParentExecutionId = NULL) and the + /// routed execution's ParentExecutionId points back at it. + /// + private static void AssertChain( + IReadOnlyList tree, + Guid inboundExecutionId, + Guid routedExecutionId) + { + Assert.Equal(2, tree.Count); + var root = Assert.Single(tree, n => n.ExecutionId == inboundExecutionId); + Assert.Null(root.ParentExecutionId); + var child = Assert.Single(tree, n => n.ExecutionId == routedExecutionId); + Assert.Equal(inboundExecutionId, child.ParentExecutionId); + } + + /// + /// Spins up a minimal in-memory ASP.NET host whose pipeline mirrors the + /// production inbound-API arrangement: routing → the real + /// → the POST /api/{methodName} + /// endpoint. The middleware mints + stashes the inbound request's + /// ExecutionId and emits the top-level + /// row via the supplied . + /// + private static async Task BuildInboundHostAsync( + ICentralAuditWriter centralAuditWriter, + RequestDelegate endpointHandler) + { + var hostBuilder = new HostBuilder() + .ConfigureWebHost(webBuilder => + { + webBuilder + .UseTestServer() + .ConfigureServices(services => + { + services.AddSingleton(centralAuditWriter); + services.AddRouting(); + }) + .Configure(app => + { + app.UseRouting(); + app.UseAuditWriteMiddleware(); + app.UseEndpoints(endpoints => + { + endpoints.MapPost("/api/{methodName}", endpointHandler); + }); + }); + }); + + return await hostBuilder.StartAsync(); + } + + /// + /// Reads the genuine site-produced the routed + /// Notify.Send buffered into Store-and-Forward, then drives it through + /// a real central so the + /// dispatch rows materialise. The + /// dispatcher echoes OriginParentExecutionId off the + /// NotificationSubmit onto every NotifyDeliver row — the + /// cross-execution linkage under test on the central side. + /// + private async Task ForwardBufferedNotificationToCentralAsync( + StoreAndForwardService storeAndForward, + string notificationId, + IServiceProvider centralProvider, + ICentralAuditWriter centralAuditWriter) + { + var buffered = await storeAndForward.GetMessageByIdAsync(notificationId); + Assert.NotNull(buffered); + var submit = JsonSerializer.Deserialize(buffered!.PayloadJson); + Assert.NotNull(submit); + // The routed Notify.Send stamped the inbound request's ExecutionId as the + // submission's OriginParentExecutionId — proven separately on the + // NotifyDeliver rows, but asserted here too as the central handoff input. + Assert.NotNull(submit!.OriginParentExecutionId); + + // The outbox actor runs over the long-lived central provider (which + // carries the AlwaysDeliversAdapter) so the dispatch sweep — launched + // asynchronously by the DispatchTick — still has a live IServiceProvider + // to resolve its per-sweep scope from. + var outboxActor = Sys.ActorOf(Props.Create(() => new NotificationOutboxActor( + centralProvider, + new NotificationOutboxOptions + { + // Long timers so PreStart's scheduled ticks never fire — the + // test drives ingest + dispatch explicitly. + DispatchInterval = TimeSpan.FromHours(1), + PurgeInterval = TimeSpan.FromDays(1), + }, + centralAuditWriter, + NullLogger.Instance))); + + // Ingest the genuine site submission, then run one dispatch sweep. + var ack = await outboxActor.Ask( + submit, TimeSpan.FromSeconds(15)); + Assert.True(ack.Accepted, ack.Error); + outboxActor.Tell(InternalMessages.DispatchTick.Instance); + } + + /// + /// Stub that always reports a + /// successful delivery — a single dispatch sweep then yields one + /// + one + /// row. + /// + private sealed class AlwaysDeliversAdapter : INotificationDeliveryAdapter + { + public NotificationType Type => NotificationType.Email; + + public Task DeliverAsync( + ScadaLink.Commons.Entities.Notifications.Notification notification, + CancellationToken cancellationToken = default) + => Task.FromResult(DeliveryOutcome.Success("ops@example.com")); + } + + /// + /// In-process stand-in for the cross-cluster routing transport + /// (CommunicationServiceInstanceRouter → + /// CommunicationService → site DeploymentManagerActor). On a + /// routed Call it does exactly what the production site handler does: + /// it reads off the wire + /// request and threads it into a fresh routed + /// as parentExecutionId, then runs the routed script's three + /// trust-boundary actions (sync ExternalSystem.Call, a cached call and + /// a Notify.Send). The routed context still mints its OWN fresh + /// ExecutionId — only the parent pointer is inherited. + /// + private sealed class BridgingInstanceRouter : IInstanceRouter + { + private readonly string _siteId; + private readonly IExternalSystemClient _externalClient; + private readonly IAuditWriter _auditWriter; + private readonly ICachedCallTelemetryForwarder _cachedForwarder; + private readonly StoreAndForwardService _storeAndForward; + + /// + /// The NotificationId the routed Notify.Send minted, captured + /// so the test can drain the buffered . + /// + public string? NotificationId { get; private set; } + + public BridgingInstanceRouter( + string siteId, + IExternalSystemClient externalClient, + IAuditWriter auditWriter, + ICachedCallTelemetryForwarder cachedForwarder, + StoreAndForwardService storeAndForward) + { + _siteId = siteId; + _externalClient = externalClient; + _auditWriter = auditWriter; + _cachedForwarder = cachedForwarder; + _storeAndForward = storeAndForward; + } + + public async Task RouteToCallAsync( + string siteId, RouteToCallRequest request, CancellationToken cancellationToken) + { + var compilationService = new ScriptCompilationService( + NullLogger.Instance); + var sharedScriptLibrary = new SharedScriptLibrary( + compilationService, NullLogger.Instance); + + // Mirror DeploymentManagerActor → ScriptActor → ScriptExecutionActor: + // the routed script execution gets its OWN fresh ExecutionId, and the + // inbound request's ExecutionId arrives as ParentExecutionId. + var routedContext = new ScriptRuntimeContext( + ActorRefs.Nobody, + ActorRefs.Nobody, + sharedScriptLibrary, + currentCallDepth: 0, + maxCallDepth: 10, + askTimeout: TimeSpan.FromSeconds(5), + instanceName: request.InstanceUniqueName, + logger: NullLogger.Instance, + externalSystemClient: _externalClient, + databaseGateway: null, + storeAndForward: _storeAndForward, + siteCommunicationActor: null, + siteId: _siteId, + sourceScript: $"ScriptActor:{request.ScriptName}", + auditWriter: _auditWriter, + operationTrackingStore: null, + cachedForwarder: _cachedForwarder, + executionId: null, + parentExecutionId: request.ParentExecutionId); + + // The routed site script's body: a sync ExternalSystem.Call, a cached + // call, and a Notify.Send — three distinct trust-boundary actions of + // the one routed execution. + await routedContext.ExternalSystem.Call(ExternalSystemName, ExternalMethodName); + await routedContext.ExternalSystem.CachedCall(ExternalSystemName, ExternalMethodName); + NotificationId = await routedContext.Notify + .To(NotifyListName) + .Send("Routed run alert", "inbound-routed script fired"); + + return new RouteToCallResponse( + request.CorrelationId, true, "routed-ok", null, DateTimeOffset.UtcNow); + } + + public Task RouteToGetAttributesAsync( + string siteId, RouteToGetAttributesRequest request, CancellationToken cancellationToken) + => throw new NotSupportedException(); + + public Task RouteToSetAttributesAsync( + string siteId, RouteToSetAttributesRequest request, CancellationToken cancellationToken) + => throw new NotSupportedException(); + } +}